Prepare Database Access Layer for Dependency Injection (#12546)
* Prepare database access objects for dependency injection * Replace duplicate code * Remove unused imports * Remove redundant validation call * Remove unused imports * Use constants * Disable fast fail during connection pool initialization * Remove typo * Add missing test dependency * Add missing test dependency * Add missing test dependency * Fix issue caused by rebase * Add method for cloud * Autoclose DSL context during migration * Better connection close handling * Fix typo in dependency * Fix SpotBugs issue * React to rebase * Fix typo * Update JavaDoc * Fix database close calls * Pass configs to getServer * Fix typo * Fix call to removed method * Fix typo * Use catalog to manage versions * PR feedback * Centralize shutdown hook * Fix rebase issues * Document test cases * Document test cases * Formatting * Properly close database resources * Rebase cleanup
This commit is contained in:
@@ -21,7 +21,7 @@ dependencies {
|
||||
|
||||
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
|
||||
testImplementation project(':airbyte-test-utils')
|
||||
testImplementation "org.testcontainers:db2:1.15.3"
|
||||
testImplementation libs.testcontainers.db2
|
||||
|
||||
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
|
||||
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-db2')
|
||||
|
||||
@@ -8,6 +8,7 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.airbyte.commons.functional.CheckedFunction;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.db.factory.DatabaseDriver;
|
||||
import io.airbyte.db.jdbc.JdbcDatabase;
|
||||
import io.airbyte.db.jdbc.streaming.AdaptiveStreamingQueryConfig;
|
||||
import io.airbyte.integrations.base.IntegrationRunner;
|
||||
@@ -34,7 +35,7 @@ import org.slf4j.LoggerFactory;
|
||||
public class Db2Source extends AbstractJdbcSource<JDBCType> implements Source {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(Db2Source.class);
|
||||
public static final String DRIVER_CLASS = "com.ibm.db2.jcc.DB2Driver";
|
||||
public static final String DRIVER_CLASS = DatabaseDriver.DB2.getDriverClassName();
|
||||
public static final String USERNAME = "username";
|
||||
public static final String PASSWORD = "password";
|
||||
|
||||
@@ -54,9 +55,9 @@ public class Db2Source extends AbstractJdbcSource<JDBCType> implements Source {
|
||||
|
||||
@Override
|
||||
public JsonNode toDatabaseConfig(final JsonNode config) {
|
||||
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:db2://%s:%s/%s",
|
||||
final StringBuilder jdbcUrl = new StringBuilder(String.format(DatabaseDriver.DB2.getUrlFormatString(),
|
||||
config.get("host").asText(),
|
||||
config.get("port").asText(),
|
||||
config.get("port").asInt(),
|
||||
config.get("db").asText()));
|
||||
|
||||
var result = Jsons.jsonNode(ImmutableMap.builder()
|
||||
|
||||
@@ -11,7 +11,9 @@ import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.commons.resources.MoreResources;
|
||||
import io.airbyte.db.Databases;
|
||||
import io.airbyte.db.factory.DataSourceFactory;
|
||||
import io.airbyte.db.factory.DatabaseDriver;
|
||||
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
|
||||
import io.airbyte.db.jdbc.JdbcDatabase;
|
||||
import io.airbyte.integrations.source.db2.Db2Source;
|
||||
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
|
||||
@@ -62,7 +64,7 @@ public class Db2SourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
return config;
|
||||
}
|
||||
|
||||
private JsonNode getConfig(String userName, String password) {
|
||||
private JsonNode getConfig(final String userName, final String password) {
|
||||
return Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put("host", db.getHost())
|
||||
.put("port", db.getFirstMappedPort())
|
||||
@@ -111,14 +113,17 @@ public class Db2SourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
|
||||
config = getConfig(db.getUsername(), db.getPassword());
|
||||
|
||||
database = Databases.createJdbcDatabase(
|
||||
config.get("username").asText(),
|
||||
config.get("password").asText(),
|
||||
String.format("jdbc:db2://%s:%s/%s",
|
||||
config.get("host").asText(),
|
||||
config.get("port").asText(),
|
||||
config.get("db").asText()),
|
||||
Db2Source.DRIVER_CLASS);
|
||||
database = new DefaultJdbcDatabase(
|
||||
DataSourceFactory.create(
|
||||
config.get("username").asText(),
|
||||
config.get("password").asText(),
|
||||
Db2Source.DRIVER_CLASS,
|
||||
String.format(DatabaseDriver.DB2.getUrlFormatString(),
|
||||
config.get("host").asText(),
|
||||
config.get("port").asInt(),
|
||||
config.get("db").asText())
|
||||
)
|
||||
);
|
||||
|
||||
final String createSchemaQuery = String.format("CREATE SCHEMA %s", SCHEMA_NAME);
|
||||
final String createTableQuery1 = String
|
||||
@@ -160,10 +165,10 @@ public class Db2SourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
@Test
|
||||
public void testCheckPrivilegesForUserWithLessPerm() throws Exception {
|
||||
createUser(LESS_PERMITTED_USER);
|
||||
JsonNode config = getConfig(LESS_PERMITTED_USER, PASSWORD);
|
||||
final JsonNode config = getConfig(LESS_PERMITTED_USER, PASSWORD);
|
||||
|
||||
final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
|
||||
List<String> expected = List.of(STREAM_NAME3, STREAM_NAME1);
|
||||
final List<String> expected = List.of(STREAM_NAME3, STREAM_NAME1);
|
||||
assertEquals(expected.size(), actualNamesWithPermission.size());
|
||||
assertEquals(expected, actualNamesWithPermission);
|
||||
}
|
||||
@@ -172,21 +177,21 @@ public class Db2SourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
public void testCheckPrivilegesForUserWithoutPerm() throws Exception {
|
||||
createUser(USER_WITH_OUT_PERMISSIONS);
|
||||
|
||||
JsonNode config = getConfig(USER_WITH_OUT_PERMISSIONS, PASSWORD);
|
||||
final JsonNode config = getConfig(USER_WITH_OUT_PERMISSIONS, PASSWORD);
|
||||
|
||||
final List<String> actualNamesWithPermission = getActualNamesWithPermission(config);
|
||||
List<String> expected = Collections.emptyList();
|
||||
final List<String> expected = Collections.emptyList();
|
||||
assertEquals(0, actualNamesWithPermission.size());
|
||||
assertEquals(expected, actualNamesWithPermission);
|
||||
}
|
||||
|
||||
private void createUser(String lessPermittedUser) throws IOException, InterruptedException {
|
||||
String encryptedPassword = db.execInContainer("openssl", "passwd", PASSWORD).getStdout().replaceAll("\n", "");
|
||||
private void createUser(final String lessPermittedUser) throws IOException, InterruptedException {
|
||||
final String encryptedPassword = db.execInContainer("openssl", "passwd", PASSWORD).getStdout().replaceAll("\n", "");
|
||||
db.execInContainer("useradd", lessPermittedUser, "-p", encryptedPassword);
|
||||
}
|
||||
|
||||
private List<String> getActualNamesWithPermission(JsonNode config) throws Exception {
|
||||
AirbyteCatalog airbyteCatalog = new Db2Source().discover(config);
|
||||
private List<String> getActualNamesWithPermission(final JsonNode config) throws Exception {
|
||||
final AirbyteCatalog airbyteCatalog = new Db2Source().discover(config);
|
||||
return airbyteCatalog
|
||||
.getStreams()
|
||||
.stream()
|
||||
|
||||
@@ -9,7 +9,8 @@ import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.commons.resources.MoreResources;
|
||||
import io.airbyte.db.Databases;
|
||||
import io.airbyte.db.factory.DataSourceFactory;
|
||||
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
|
||||
import io.airbyte.db.jdbc.JdbcDatabase;
|
||||
import io.airbyte.integrations.source.db2.Db2Source;
|
||||
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
|
||||
@@ -119,11 +120,14 @@ public class Db2SourceCertificateAcceptanceTest extends SourceAcceptanceTest {
|
||||
config.get("db").asText()) + ":sslConnection=true;sslTrustStoreLocation=" + KEY_STORE_FILE_PATH +
|
||||
";sslTrustStorePassword=" + TEST_KEY_STORE_PASS + ";";
|
||||
|
||||
database = Databases.createJdbcDatabase(
|
||||
config.get("username").asText(),
|
||||
config.get("password").asText(),
|
||||
jdbcUrl,
|
||||
Db2Source.DRIVER_CLASS);
|
||||
database = new DefaultJdbcDatabase(
|
||||
DataSourceFactory.create(
|
||||
config.get("username").asText(),
|
||||
config.get("password").asText(),
|
||||
Db2Source.DRIVER_CLASS,
|
||||
jdbcUrl
|
||||
)
|
||||
);
|
||||
|
||||
final String createSchemaQuery = String.format("CREATE SCHEMA %s", SCHEMA_NAME);
|
||||
final String createTableQuery1 = String
|
||||
|
||||
@@ -8,12 +8,16 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.db.Database;
|
||||
import io.airbyte.db.Databases;
|
||||
import io.airbyte.db.factory.DSLContextFactory;
|
||||
import io.airbyte.db.factory.DataSourceFactory;
|
||||
import io.airbyte.db.factory.DatabaseDriver;
|
||||
import io.airbyte.integrations.source.db2.Db2Source;
|
||||
import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTest;
|
||||
import io.airbyte.integrations.standardtest.source.TestDataHolder;
|
||||
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
|
||||
import io.airbyte.protocol.models.JsonSchemaType;
|
||||
import javax.sql.DataSource;
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.testcontainers.containers.Db2Container;
|
||||
|
||||
@@ -56,15 +60,15 @@ public class Db2SourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
.build()))
|
||||
.build());
|
||||
|
||||
final Database database = Databases.createDatabase(
|
||||
final DSLContext dslContext = DSLContextFactory.create(
|
||||
config.get("username").asText(),
|
||||
config.get("password").asText(),
|
||||
String.format("jdbc:db2://%s:%s/%s",
|
||||
config.get("host").asText(),
|
||||
config.get("port").asText(),
|
||||
config.get("db").asText()),
|
||||
Db2Source.DRIVER_CLASS,
|
||||
SQLDialect.DEFAULT);
|
||||
String.format(DatabaseDriver.DB2.getUrlFormatString(),
|
||||
config.get("host").asText(),
|
||||
config.get("port").asInt(),
|
||||
config.get("db").asText()), SQLDialect.DEFAULT);
|
||||
final Database database = new Database(dslContext);
|
||||
|
||||
database.query(ctx -> ctx.fetch("CREATE SCHEMA TEST"));
|
||||
|
||||
|
||||
Reference in New Issue
Block a user