Better database connection handling for connectors (#12743)
* Better database connection handling for connectors * Log connection error * Properly close connection * Remove unused method * Close data source * Use utility to close data source * Use utility to close data source * PR feedback * Add Databricks driver * Use driver class enum * Use correct config * Ensure config created before use * Fix failing integration test * Create DSLContext before use * Address integration test failures * Ensure DSLContext is closed * Fix compile error * Use correct datasource * Use correct connection properties * Close DSLContext * Close DSLContext * Fix integration test failures * Properly close datasource * Fix compilation issues * Use existing database object * Wrap close in try/finally * Update test * Wrap close in try/finally * Ensure DSLContext is created * Revert change to test * Use correct data source * Remove unused import * More cleanup * Add missing annotation * Only initialize data source once * Remove unused import * Force testcontainers version * Fix testcontainer issue * Fix failing test * Properly close all data sources * Clear data sources after closing * Fix compile error * Fix compilation error * Add missing method
This commit is contained in:
@@ -17,6 +17,7 @@ import io.airbyte.integrations.standardtest.destination.comparator.TestDataCompa
|
||||
import java.sql.SQLException;
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.testcontainers.containers.PostgreSQLContainer;
|
||||
|
||||
@@ -111,21 +112,21 @@ public class PostgresDestinationAcceptanceTest extends JdbcDestinationAcceptance
|
||||
}
|
||||
|
||||
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
|
||||
return new Database(
|
||||
DSLContextFactory.create(
|
||||
db.getUsername(),
|
||||
db.getPassword(),
|
||||
DatabaseDriver.POSTGRESQL.getDriverClassName(),
|
||||
db.getJdbcUrl(),
|
||||
SQLDialect.POSTGRES
|
||||
)
|
||||
).query(ctx -> {
|
||||
ctx.execute("set time zone 'UTC';");
|
||||
return ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
|
||||
.stream()
|
||||
.map(this::getJsonFromRecord)
|
||||
.collect(Collectors.toList());
|
||||
});
|
||||
try (final DSLContext dslContext = DSLContextFactory.create(
|
||||
db.getUsername(),
|
||||
db.getPassword(),
|
||||
DatabaseDriver.POSTGRESQL.getDriverClassName(),
|
||||
db.getJdbcUrl(),
|
||||
SQLDialect.POSTGRES)) {
|
||||
return new Database(dslContext)
|
||||
.query(ctx -> {
|
||||
ctx.execute("set time zone 'UTC';");
|
||||
return ctx.fetch(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT))
|
||||
.stream()
|
||||
.map(this::getJsonFromRecord)
|
||||
.collect(Collectors.toList());
|
||||
});
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
|
||||
Reference in New Issue
Block a user