From e15ae56389dd63c8161df342b732b4fccf736003 Mon Sep 17 00:00:00 2001 From: LiRen Tu Date: Tue, 3 May 2022 13:45:02 -0700 Subject: [PATCH] Close all unsafe queries (#12495) * Add helper methods to return lists of json nodes * Close all unsafe queries * Add one more helper method * Simplify helper names * Format code --- .../java/io/airbyte/db/jdbc/JdbcDatabase.java | 46 ++++++++++++++--- .../db/jdbc/TestDefaultJdbcDatabase.java | 29 +++++------ .../db/jdbc/TestStreamingJdbcDatabase.java | 49 ++++++++---------- ...estinationStrictEncryptAcceptanceTest.java | 17 +++---- .../ClickhouseDestinationAcceptanceTest.java | 17 +++---- ...shClickhouseDestinationAcceptanceTest.java | 22 ++++---- .../MariadbColumnstoreSqlOperations.java | 25 ++++------ ...bColumnstoreDestinationAcceptanceTest.java | 33 +++++------- .../destination/mysql/MySQLSqlOperations.java | 17 +++---- ...trictEncryptDestinationAcceptanceTest.java | 16 +++--- .../NneOracleDestinationAcceptanceTest.java | 9 ++-- ...ryptedOracleDestinationAcceptanceTest.java | 4 +- .../source/clickhouse/ClickHouseSource.java | 8 ++- .../source/cockroachdb/CockroachDbSource.java | 8 +-- .../Db2Source.java | 8 +-- .../source/mssql/MssqlSource.java | 43 ++++++++-------- .../source/mysql/MySqlCdcTargetPosition.java | 28 +++++------ .../mysql/helpers/CdcConfigurationHelper.java | 50 +++++++++---------- .../OracleSourceNneAcceptanceTest.java | 8 +-- .../oracle/OracleSourceNneAcceptanceTest.java | 19 +++---- .../source/postgres/PostgresSource.java | 24 ++++----- 21 files changed, 235 insertions(+), 245 deletions(-) diff --git a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java index 2170c847bd4..2ace4f22321 100644 --- a/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java +++ b/airbyte-db/lib/src/main/java/io/airbyte/db/jdbc/JdbcDatabase.java @@ -44,9 +44,7 @@ public abstract class JdbcDatabase extends SqlDatabase { @Override public void execute(final String sql) throws SQLException { - execute(connection -> { - connection.createStatement().execute(sql); - }); + execute(connection -> connection.createStatement().execute(sql)); } public void executeWithinTransaction(final List queries) throws SQLException { @@ -127,6 +125,18 @@ public abstract class JdbcDatabase extends SqlDatabase { CheckedFunction recordTransform) throws SQLException; + /** + * String query is a common use case for {@link JdbcDatabase#unsafeResultSetQuery}. So this method + * is created as syntactic sugar. + */ + public List queryStrings(final CheckedFunction query, + final CheckedFunction recordTransform) + throws SQLException { + try (final Stream stream = unsafeResultSetQuery(query, recordTransform)) { + return stream.toList(); + } + } + /** * Use a connection to create a {@link PreparedStatement} and map it into a stream. You CANNOT * assume that data will be returned from this method before the entire {@link ResultSet} is @@ -148,8 +158,21 @@ public abstract class JdbcDatabase extends SqlDatabase { CheckedFunction recordTransform) throws SQLException; + /** + * Json query is a common use case for + * {@link JdbcDatabase#unsafeQuery(CheckedFunction, CheckedFunction)}. So this method is created as + * syntactic sugar. + */ + public List queryJsons(final CheckedFunction statementCreator, + final CheckedFunction recordTransform) + throws SQLException { + try (final Stream stream = unsafeQuery(statementCreator, recordTransform)) { + return stream.toList(); + } + } + public int queryInt(final String sql, final String... params) throws SQLException { - try (final Stream q = unsafeQuery(c -> { + try (final Stream stream = unsafeQuery(c -> { PreparedStatement statement = c.prepareStatement(sql); int i = 1; for (String param : params) { @@ -157,9 +180,8 @@ public abstract class JdbcDatabase extends SqlDatabase { ++i; } return statement; - }, - rs -> rs.getInt(1))) { - return q.findFirst().get(); + }, rs -> rs.getInt(1))) { + return stream.findFirst().get(); } } @@ -181,6 +203,16 @@ public abstract class JdbcDatabase extends SqlDatabase { }, sourceOperations::rowToJson); } + /** + * Json query is a common use case for {@link JdbcDatabase#unsafeQuery(String, String...)}. So this + * method is created as syntactic sugar. + */ + public List queryJsons(final String sql, final String... params) throws SQLException { + try (final Stream stream = unsafeQuery(sql, params)) { + return stream.toList(); + } + } + public ResultSetMetaData queryMetadata(final String sql, final String... params) throws SQLException { try (final Stream q = unsafeQuery(c -> { PreparedStatement statement = c.prepareStatement(sql); diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java index 43448e3d44d..0661c5b29af 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestDefaultJdbcDatabase.java @@ -16,7 +16,6 @@ import io.airbyte.db.Databases; import io.airbyte.test.utils.PostgreSQLContainerHelper; import java.sql.SQLException; import java.util.List; -import java.util.stream.Collectors; import java.util.stream.Stream; import org.junit.jupiter.api.AfterAll; import org.junit.jupiter.api.AfterEach; @@ -34,9 +33,8 @@ public class TestDefaultJdbcDatabase { Jsons.jsonNode(ImmutableMap.of("id", 3, "name", "vash"))); private static PostgreSQLContainer PSQL_DB; - - private JdbcDatabase database; private final JdbcSourceOperations sourceOperations = JdbcUtils.getDefaultSourceOperations(); + private JdbcDatabase database; @BeforeAll static void init() { @@ -44,6 +42,11 @@ public class TestDefaultJdbcDatabase { PSQL_DB.start(); } + @AfterAll + static void cleanUp() { + PSQL_DB.close(); + } + @BeforeEach void setup() throws Exception { final String dbName = Strings.addRandomSuffix("db", "_", 10); @@ -65,11 +68,6 @@ public class TestDefaultJdbcDatabase { database.close(); } - @AfterAll - static void cleanUp() { - PSQL_DB.close(); - } - @Test void testBufferedResultQuery() throws SQLException { final List actual = database.bufferedResultSetQuery( @@ -81,22 +79,19 @@ public class TestDefaultJdbcDatabase { @Test void testResultSetQuery() throws SQLException { - final Stream actual = database.unsafeResultSetQuery( + try (final Stream actual = database.unsafeResultSetQuery( connection -> connection.createStatement().executeQuery("SELECT * FROM id_and_name;"), - sourceOperations::rowToJson); - final List actualAsList = actual.collect(Collectors.toList()); - actual.close(); - - assertEquals(RECORDS_AS_JSON, actualAsList); + sourceOperations::rowToJson)) { + assertEquals(RECORDS_AS_JSON, actual.toList()); + } } @Test void testQuery() throws SQLException { - final Stream actual = database.unsafeQuery( + final List actual = database.queryJsons( connection -> connection.prepareStatement("SELECT * FROM id_and_name;"), sourceOperations::rowToJson); - - assertEquals(RECORDS_AS_JSON, actual.collect(Collectors.toList())); + assertEquals(RECORDS_AS_JSON, actual); } private JdbcDatabase getDatabaseFromConfig(final JsonNode config) { diff --git a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java index e5e8b350755..cac812dd71c 100644 --- a/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java +++ b/airbyte-db/lib/src/test/java/io/airbyte/db/jdbc/TestStreamingJdbcDatabase.java @@ -24,7 +24,6 @@ import java.util.HashSet; import java.util.List; import java.util.Set; import java.util.concurrent.atomic.AtomicReference; -import java.util.stream.Stream; import org.apache.commons.dbcp2.BasicDataSource; import org.elasticsearch.common.collect.Map; import org.junit.jupiter.api.AfterAll; @@ -95,20 +94,17 @@ public class TestStreamingJdbcDatabase { // invoked. final AtomicReference connection1 = new AtomicReference<>(); final AtomicReference ps1 = new AtomicReference<>(); - try (final Stream actual = streamingJdbcDatabase.unsafeQuery( - connection -> { - connection1.set(connection); - final PreparedStatement ps = connection.prepareStatement("SELECT * FROM id_and_name;"); - ps1.set(ps); - return ps; - }, - sourceOperations::rowToJson)) { - final List expectedRecords = Lists.newArrayList( - Jsons.jsonNode(Map.of("id", 1, "name", "picard")), - Jsons.jsonNode(Map.of("id", 2, "name", "crusher")), - Jsons.jsonNode(Map.of("id", 3, "name", "vash"))); - assertEquals(expectedRecords, actual.toList()); - } + final List actual = streamingJdbcDatabase.queryJsons(connection -> { + connection1.set(connection); + final PreparedStatement ps = connection.prepareStatement("SELECT * FROM id_and_name;"); + ps1.set(ps); + return ps; + }, sourceOperations::rowToJson); + final List expectedRecords = Lists.newArrayList( + Jsons.jsonNode(Map.of("id", 1, "name", "picard")), + Jsons.jsonNode(Map.of("id", 2, "name", "crusher")), + Jsons.jsonNode(Map.of("id", 3, "name", "vash"))); + assertEquals(expectedRecords, actual); } /** @@ -131,7 +127,7 @@ public class TestStreamingJdbcDatabase { final AtomicReference connection1 = new AtomicReference<>(); final AtomicReference ps1 = new AtomicReference<>(); final Set fetchSizes = new HashSet<>(); - try (final Stream actual = streamingJdbcDatabase.unsafeQuery( + final List actual = streamingJdbcDatabase.queryJsons( connection -> { connection1.set(connection); final PreparedStatement ps = connection.prepareStatement("SELECT * FROM id_and_name;"); @@ -141,18 +137,17 @@ public class TestStreamingJdbcDatabase { resultSet -> { fetchSizes.add(resultSet.getFetchSize()); return sourceOperations.rowToJson(resultSet); - })) { - assertEquals(20, actual.count()); + }); + assertEquals(20, actual.size()); - // Two fetch sizes should be set on the result set, one is the initial sample size, - // and the other is smaller than the initial value because of the large row. - // This check assumes that FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE = 200 MB. - // Update this check if the buffer size constant is changed. - assertEquals(2, fetchSizes.size()); - final List sortedSizes = fetchSizes.stream().sorted().toList(); - assertTrue(sortedSizes.get(0) < FetchSizeConstants.INITIAL_SAMPLE_SIZE); - assertEquals(FetchSizeConstants.INITIAL_SAMPLE_SIZE, sortedSizes.get(1)); - } + // Two fetch sizes should be set on the result set, one is the initial sample size, + // and the other is smaller than the initial value because of the large row. + // This check assumes that FetchSizeConstants.TARGET_BUFFER_BYTE_SIZE = 200 MB. + // Update this check if the buffer size constant is changed. + assertEquals(2, fetchSizes.size()); + final List sortedSizes = fetchSizes.stream().sorted().toList(); + assertTrue(sortedSizes.get(0) < FetchSizeConstants.INITIAL_SAMPLE_SIZE); + assertEquals(FetchSizeConstants.INITIAL_SAMPLE_SIZE, sortedSizes.get(1)); } private JsonNode getConfig(final PostgreSQLContainer psqlDb, final String dbName) { diff --git a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java index d423eb2f9a6..ee22f15849f 100644 --- a/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationStrictEncryptAcceptanceTest.java @@ -97,10 +97,10 @@ public class ClickhouseDestinationStrictEncryptAcceptanceTest extends Destinatio } @Override - protected List retrieveRecords(TestDestinationEnv testEnv, - String streamName, - String namespace, - JsonNode streamSchema) + protected List retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() @@ -110,9 +110,8 @@ public class ClickhouseDestinationStrictEncryptAcceptanceTest extends Destinatio private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { final JdbcDatabase jdbcDB = getDatabase(getConfig()); - return jdbcDB.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .collect(Collectors.toList()); + final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + return jdbcDB.queryJsons(query); } @Override @@ -141,7 +140,7 @@ public class ClickhouseDestinationStrictEncryptAcceptanceTest extends Destinatio } @Override - protected void setup(TestDestinationEnv testEnv) { + protected void setup(final TestDestinationEnv testEnv) { db = (ClickHouseContainer) new ClickHouseContainer("yandex/clickhouse-server") .withExposedPorts(HTTP_PORT, NATIVE_PORT, HTTPS_PORT, NATIVE_SECURE_PORT) .withClasspathResourceMapping("config.xml", "/etc/clickhouse-server/config.xml", BindMode.READ_ONLY) @@ -156,7 +155,7 @@ public class ClickhouseDestinationStrictEncryptAcceptanceTest extends Destinatio } @Override - protected void tearDown(TestDestinationEnv testEnv) { + protected void tearDown(final TestDestinationEnv testEnv) { db.stop(); db.close(); } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java index d897950a23e..a1a16aff1bc 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/ClickhouseDestinationAcceptanceTest.java @@ -88,10 +88,10 @@ public class ClickhouseDestinationAcceptanceTest extends DestinationAcceptanceTe } @Override - protected List retrieveRecords(TestDestinationEnv testEnv, - String streamName, - String namespace, - JsonNode streamSchema) + protected List retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() @@ -101,9 +101,8 @@ public class ClickhouseDestinationAcceptanceTest extends DestinationAcceptanceTe private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { final JdbcDatabase jdbcDB = getDatabase(getConfig()); - return jdbcDB.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .collect(Collectors.toList()); + final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + return jdbcDB.queryJsons(query); } @Override @@ -131,13 +130,13 @@ public class ClickhouseDestinationAcceptanceTest extends DestinationAcceptanceTe } @Override - protected void setup(TestDestinationEnv testEnv) { + protected void setup(final TestDestinationEnv testEnv) { db = new ClickHouseContainer("yandex/clickhouse-server"); db.start(); } @Override - protected void tearDown(TestDestinationEnv testEnv) { + protected void tearDown(final TestDestinationEnv testEnv) { db.stop(); db.close(); } diff --git a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java index dd6b640fdfb..a6ab946dfdb 100644 --- a/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-clickhouse/src/test-integration/java/io/airbyte/integrations/destination/clickhouse/SshClickhouseDestinationAcceptanceTest.java @@ -6,7 +6,6 @@ package io.airbyte.integrations.destination.clickhouse; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.node.ObjectNode; -import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.db.Databases; import io.airbyte.db.jdbc.JdbcDatabase; @@ -86,10 +85,10 @@ public abstract class SshClickhouseDestinationAcceptanceTest extends Destination } @Override - protected List retrieveRecords(TestDestinationEnv testEnv, - String streamName, - String namespace, - JsonNode streamSchema) + protected List retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() @@ -102,10 +101,11 @@ public abstract class SshClickhouseDestinationAcceptanceTest extends Destination getConfig(), ClickhouseDestination.HOST_KEY, ClickhouseDestination.PORT_KEY, - (CheckedFunction, Exception>) mangledConfig -> getDatabase(mangledConfig) - .unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .collect(Collectors.toList())); + mangledConfig -> { + final JdbcDatabase database = getDatabase(mangledConfig); + final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + return database.queryJsons(query); + }); } @Override @@ -133,14 +133,14 @@ public abstract class SshClickhouseDestinationAcceptanceTest extends Destination } @Override - protected void setup(TestDestinationEnv testEnv) { + protected void setup(final TestDestinationEnv testEnv) { bastion.initAndStartBastion(); db = (ClickHouseContainer) new ClickHouseContainer("yandex/clickhouse-server").withNetwork(bastion.getNetWork()); db.start(); } @Override - protected void tearDown(TestDestinationEnv testEnv) { + protected void tearDown(final TestDestinationEnv testEnv) { bastion.stopAndCloseContainers(db); } diff --git a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java index 4460ca17e08..6dd6c3abc75 100644 --- a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java +++ b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/main/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreSqlOperations.java @@ -17,13 +17,9 @@ import java.sql.Statement; import java.util.List; import java.util.regex.Matcher; import java.util.regex.Pattern; -import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public class MariadbColumnstoreSqlOperations extends JdbcSqlOperations { - private static final Logger LOGGER = LoggerFactory.getLogger(MariadbColumnstoreSqlOperations.class); private final String MINIMUM_VERSION = "5.5.3"; Pattern VERSION_PATTERN = Pattern.compile("^(\\d+\\.\\d+\\.\\d+)-MariaDB"); private boolean isLocalFileEnabled = false; @@ -70,7 +66,7 @@ public class MariadbColumnstoreSqlOperations extends JdbcSqlOperations { @Override public void executeTransaction(final JdbcDatabase database, final List queries) throws Exception { database.execute(connection -> { - try (Statement stmt = connection.createStatement()) { + try (final Statement stmt = connection.createStatement()) { stmt.addBatch("BEGIN;"); for (final String query : queries) { stmt.addBatch(query); @@ -103,13 +99,15 @@ public class MariadbColumnstoreSqlOperations extends JdbcSqlOperations { } private Semver getVersion(final JdbcDatabase database) throws SQLException { - final List value = database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SELECT version()"), - resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); - Matcher matcher = VERSION_PATTERN.matcher(value.get(0)); + final List versions = database.queryStrings( + connection -> connection.createStatement().executeQuery("SELECT version()"), + resultSet -> resultSet.getString("version()")); + + final Matcher matcher = VERSION_PATTERN.matcher(versions.get(0)); if (matcher.find()) { return new Semver(matcher.group(1)); } else { - throw new RuntimeException(String.format("Unexpected version string: %s\nExpected version format is X.X.X-MariaDB", value.get(0))); + throw new RuntimeException(String.format("Unexpected version string: %s\nExpected version format is X.X.X-MariaDB", versions.get(0))); } } @@ -122,11 +120,10 @@ public class MariadbColumnstoreSqlOperations extends JdbcSqlOperations { } private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQLException { - final List value = - database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), - resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); - - return value.get(0).equalsIgnoreCase("on"); + final List localFiles = database.queryStrings( + connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), + resultSet -> resultSet.getString("Value")); + return localFiles.get(0).equalsIgnoreCase("on"); } private void tryEnableLocalFile(final JdbcDatabase database) throws SQLException { diff --git a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/test-integration/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/test-integration/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestinationAcceptanceTest.java index 8098ab53ae4..c2b193d2280 100644 --- a/airbyte-integrations/connectors/destination-mariadb-columnstore/src/test-integration/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-mariadb-columnstore/src/test-integration/java/io/airbyte/integrations/destination/mariadb_columnstore/MariadbColumnstoreDestinationAcceptanceTest.java @@ -17,19 +17,13 @@ import io.airbyte.integrations.standardtest.destination.comparator.TestDataCompa import java.sql.SQLException; import java.util.List; import java.util.stream.Collectors; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; import org.testcontainers.containers.MariaDBContainer; import org.testcontainers.utility.DockerImageName; public class MariadbColumnstoreDestinationAcceptanceTest extends DestinationAcceptanceTest { - private static final Logger LOGGER = LoggerFactory.getLogger(MariadbColumnstoreDestinationAcceptanceTest.class); - private final ExtendedNameTransformer namingResolver = new MariadbColumnstoreNameTransformer(); - private JsonNode configJson; - private MariaDBContainer db; @Override @@ -89,10 +83,10 @@ public class MariadbColumnstoreDestinationAcceptanceTest extends DestinationAcce } @Override - protected List retrieveRecords(TestDestinationEnv testEnv, - String streamName, - String namespace, - JsonNode streamSchema) + protected List retrieveRecords(final TestDestinationEnv testEnv, + final String streamName, + final String namespace, + final JsonNode streamSchema) throws Exception { return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName), namespace) .stream() @@ -101,10 +95,9 @@ public class MariadbColumnstoreDestinationAcceptanceTest extends DestinationAcce } private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { - JdbcDatabase database = getDatabase(getConfig()); - return database.unsafeQuery(String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, - JavaBaseConstants.COLUMN_NAME_EMITTED_AT)) - .collect(Collectors.toList()); + final JdbcDatabase database = getDatabase(getConfig()); + final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC;", schemaName, tableName, JavaBaseConstants.COLUMN_NAME_EMITTED_AT); + return database.queryJsons(query); } private static JdbcDatabase getDatabase(final JsonNode config) { @@ -119,19 +112,19 @@ public class MariadbColumnstoreDestinationAcceptanceTest extends DestinationAcce } @Override - protected void setup(TestDestinationEnv testEnv) throws Exception { - DockerImageName mcsImage = DockerImageName.parse("fengdi/columnstore:1.5.2").asCompatibleSubstituteFor("mariadb"); + protected void setup(final TestDestinationEnv testEnv) throws Exception { + final DockerImageName mcsImage = DockerImageName.parse("fengdi/columnstore:1.5.2").asCompatibleSubstituteFor("mariadb"); db = new MariaDBContainer(mcsImage); db.start(); - String createUser = String.format("CREATE USER '%s'@'%%' IDENTIFIED BY '%s';", db.getUsername(), db.getPassword()); - String grantAll = String.format("GRANT ALL PRIVILEGES ON *.* TO '%s'@'%%' IDENTIFIED BY '%s';", db.getUsername(), db.getPassword()); - String createDb = String.format("CREATE DATABASE %s DEFAULT CHARSET = utf8;", db.getDatabaseName()); + final String createUser = String.format("CREATE USER '%s'@'%%' IDENTIFIED BY '%s';", db.getUsername(), db.getPassword()); + final String grantAll = String.format("GRANT ALL PRIVILEGES ON *.* TO '%s'@'%%' IDENTIFIED BY '%s';", db.getUsername(), db.getPassword()); + final String createDb = String.format("CREATE DATABASE %s DEFAULT CHARSET = utf8;", db.getDatabaseName()); db.execInContainer("mariadb", "-e", createUser + grantAll + createDb); } @Override - protected void tearDown(TestDestinationEnv testEnv) { + protected void tearDown(final TestDestinationEnv testEnv) { db.stop(); db.close(); } diff --git a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java index 181fb8d8bf9..78957bf88ac 100644 --- a/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java +++ b/airbyte-integrations/connectors/destination-mysql/src/main/java/io/airbyte/integrations/destination/mysql/MySQLSqlOperations.java @@ -16,7 +16,6 @@ import java.nio.file.Files; import java.sql.SQLException; import java.sql.Statement; import java.util.List; -import java.util.stream.Collectors; public class MySQLSqlOperations extends JdbcSqlOperations { @@ -100,9 +99,10 @@ public class MySQLSqlOperations extends JdbcSqlOperations { } private double getVersion(final JdbcDatabase database) throws SQLException { - final List value = database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("select version()"), - resultSet -> resultSet.getString("version()")).collect(Collectors.toList()); - return Double.parseDouble(value.get(0).substring(0, 3)); + final List versions = database.queryStrings( + connection -> connection.createStatement().executeQuery("select version()"), + resultSet -> resultSet.getString("version()")); + return Double.parseDouble(versions.get(0).substring(0, 3)); } VersionCompatibility isCompatibleVersion(final JdbcDatabase database) throws SQLException { @@ -116,11 +116,10 @@ public class MySQLSqlOperations extends JdbcSqlOperations { } private boolean checkIfLocalFileIsEnabled(final JdbcDatabase database) throws SQLException { - final List value = - database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), - resultSet -> resultSet.getString("Value")).collect(Collectors.toList()); - - return value.get(0).equalsIgnoreCase("on"); + final List localFiles = database.queryStrings( + connection -> connection.createStatement().executeQuery("SHOW GLOBAL VARIABLES LIKE 'local_infile'"), + resultSet -> resultSet.getString("Value")); + return localFiles.get(0).equalsIgnoreCase("on"); } @Override diff --git a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java index 6cfde501324..be74f4c06cf 100644 --- a/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/destination/oracle_strict_encrypt/OracleStrictEncryptDestinationAcceptanceTest.java @@ -112,12 +112,8 @@ public class OracleStrictEncryptDestinationAcceptanceTest extends DestinationAcc private List retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException { - final List result = getDatabase(config) - .query(ctx -> ctx.fetch( - String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, - OracleDestination.COLUMN_NAME_EMITTED_AT)) - .stream() - .collect(Collectors.toList())); + final String query = String.format("SELECT * FROM %s.%s ORDER BY %s ASC", schemaName, tableName, OracleDestination.COLUMN_NAME_EMITTED_AT); + final List result = getDatabase(config).query(ctx -> ctx.fetch(query).stream().toList()); return result .stream() .map(r -> r.formatJSON(JSON_FORMAT)) @@ -180,9 +176,9 @@ public class OracleStrictEncryptDestinationAcceptanceTest extends DestinationAcc JdbcUtils.parseJdbcParameters("oracle.net.encryption_client=REQUIRED;" + "oracle.net.encryption_types_client=( " + algorithm + " )", ";")); - final String network_service_banner = + final String networkServiceBanner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); + final List collect = database.queryJsons(networkServiceBanner); assertThat(collect.get(2).get("NETWORK_SERVICE_BANNER").asText(), equals("Oracle Advanced Security: " + algorithm + " encryption")); @@ -205,8 +201,8 @@ public class OracleStrictEncryptDestinationAcceptanceTest extends DestinationAcc JdbcUtils.parseJdbcParameters("oracle.net.encryption_client=REQUIRED;" + "oracle.net.encryption_types_client=( " + algorithm + " )", ";")); - final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; - final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); + final String networkServiceBanner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; + final List collect = database.queryJsons(networkServiceBanner); assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); } diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java index 8a65723e2a0..69794489206 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/NneOracleDestinationAcceptanceTest.java @@ -16,7 +16,6 @@ import io.airbyte.db.jdbc.JdbcDatabase; import java.sql.SQLException; import java.util.List; import java.util.Map; -import java.util.stream.Collectors; import org.junit.Test; public class NneOracleDestinationAcceptanceTest extends UnencryptedOracleDestinationAcceptanceTest { @@ -40,9 +39,9 @@ public class NneOracleDestinationAcceptanceTest extends UnencryptedOracleDestina "oracle.jdbc.driver.OracleDriver", getAdditionalProperties(algorithm)); - final String network_service_banner = + final String networkServiceBanner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.unsafeQuery(network_service_banner).toList(); + final List collect = database.queryJsons(networkServiceBanner); assertThat(collect.get(2).get("NETWORK_SERVICE_BANNER").asText(), equals("Oracle Advanced Security: " + algorithm + " encryption")); @@ -73,8 +72,8 @@ public class NneOracleDestinationAcceptanceTest extends UnencryptedOracleDestina "oracle.jdbc.driver.OracleDriver", getAdditionalProperties(algorithm)); - final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; - final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); + final String networkServiceBanner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; + final List collect = database.queryJsons(networkServiceBanner); assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); } diff --git a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java index 1342c57dafd..1ec98f5aab5 100644 --- a/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java +++ b/airbyte-integrations/connectors/destination-oracle/src/test-integration/java/io/airbyte/integrations/destination/oracle/UnencryptedOracleDestinationAcceptanceTest.java @@ -180,9 +180,9 @@ public class UnencryptedOracleDestinationAcceptanceTest extends DestinationAccep config.get("sid").asText()), "oracle.jdbc.driver.OracleDriver"); - final String network_service_banner = + final String networkServiceBanner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); + final List collect = database.queryJsons(networkServiceBanner); assertTrue(collect.get(1).get("NETWORK_SERVICE_BANNER").asText() .contains("Oracle Advanced Security: encryption")); diff --git a/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java b/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java index 3fd083e4387..abc10ec86ef 100644 --- a/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java +++ b/airbyte-integrations/connectors/source-clickhouse/src/main/java/io/airbyte/integrations/source/clickhouse/ClickHouseSource.java @@ -46,18 +46,16 @@ public class ClickHouseSource extends AbstractJdbcSource implements So final List>> tableInfos) { return tableInfos.stream() .collect(Collectors.toMap( - tableInfo -> sourceOperations - .getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()), + tableInfo -> sourceOperations.getFullyQualifiedTableName(tableInfo.getNameSpace(), tableInfo.getName()), tableInfo -> { try { - return database.unsafeResultSetQuery(connection -> { + return database.queryStrings(connection -> { final String sql = "SELECT name FROM system.columns WHERE database = ? AND table = ? AND is_in_primary_key = 1"; final PreparedStatement preparedStatement = connection.prepareStatement(sql); preparedStatement.setString(1, tableInfo.getNameSpace()); preparedStatement.setString(2, tableInfo.getName()); return preparedStatement.executeQuery(); - - }, resultSet -> resultSet.getString("name")).collect(Collectors.toList()); + }, resultSet -> resultSet.getString("name")); } catch (final SQLException e) { throw new RuntimeException(e); } diff --git a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java index dfe64063cf5..571091d4046 100644 --- a/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java +++ b/airbyte-integrations/connectors/source-cockroachdb/src/main/java/io/airbyte/integrations/source/cockroachdb/CockroachDbSource.java @@ -29,6 +29,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Set; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -101,10 +102,9 @@ public class CockroachDbSource extends AbstractJdbcSource { @Override public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException { - return database - .unsafeQuery(getPrivileges(database), sourceOperations::rowToJson) - .map(this::getPrivilegeDto) - .collect(Collectors.toSet()); + try (final Stream stream = database.unsafeQuery(getPrivileges(database), sourceOperations::rowToJson)) { + return stream.map(this::getPrivilegeDto).collect(Collectors.toSet()); + } } @Override diff --git a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java index f48de8c9e2d..d817b10f6ac 100644 --- a/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java +++ b/airbyte-integrations/connectors/source-db2/src/main/java/io.airbyte.integrations.source.db2/Db2Source.java @@ -26,6 +26,7 @@ import java.util.List; import java.util.Set; import java.util.concurrent.TimeUnit; import java.util.stream.Collectors; +import java.util.stream.Stream; import org.apache.commons.lang3.RandomStringUtils; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -89,10 +90,9 @@ public class Db2Source extends AbstractJdbcSource implements Source { @Override public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException { - return database - .unsafeQuery(getPrivileges(), sourceOperations::rowToJson) - .map(this::getPrivilegeDto) - .collect(Collectors.toSet()); + try (final Stream stream = database.unsafeQuery(getPrivileges(), sourceOperations::rowToJson)) { + return stream.map(this::getPrivilegeDto).collect(Collectors.toSet()); + } } @Override diff --git a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java index 3af72f59dea..d8b97821ea4 100644 --- a/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java +++ b/airbyte-integrations/connectors/source-mssql/src/main/java/io/airbyte/integrations/source/mssql/MssqlSource.java @@ -100,15 +100,12 @@ public class MssqlSource extends AbstractJdbcSource implements Source connection -> { LOGGER.info("Preparing query for table: {}", tableName); - final String identifierQuoteString = connection.getMetaData() - .getIdentifierQuoteString(); - final List newColumnNames = getWrappedColumn(database, - columnNames, schemaName, tableName, identifierQuoteString); + final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString(); + final List newColumnNames = getWrappedColumn(database, columnNames, schemaName, tableName, identifierQuoteString); final String sql = String.format("SELECT %s FROM %s WHERE %s > ?", String.join(",", newColumnNames), - sourceOperations - .getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), + sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName), sourceOperations.enquoteIdentifier(connection, cursorField)); LOGGER.info("Prepared SQL query for queryTableIncremental is: " + sql); @@ -251,15 +248,15 @@ public class MssqlSource extends AbstractJdbcSource implements Source protected void assertCdcEnabledInDb(final JsonNode config, final JdbcDatabase database) throws SQLException { - final List queryResponse = database.unsafeQuery(connection -> { + final List queryResponse = database.queryJsons(connection -> { final String sql = "SELECT name, is_cdc_enabled FROM sys.databases WHERE name = ?"; final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("database").asText()); - LOGGER - .info(String.format("Checking that cdc is enabled on database '%s' using the query: '%s'", - config.get("database").asText(), sql)); + LOGGER.info(String.format("Checking that cdc is enabled on database '%s' using the query: '%s'", + config.get("database").asText(), sql)); return ps; - }, sourceOperations::rowToJson).collect(toList()); + }, sourceOperations::rowToJson); + if (queryResponse.size() < 1) { throw new RuntimeException(String.format( "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).", @@ -274,15 +271,15 @@ public class MssqlSource extends AbstractJdbcSource implements Source protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabase database) throws SQLException { - final List queryResponse = database.unsafeQuery(connection -> { - final String sql = - "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; + final List queryResponse = database.queryJsons(connection -> { + final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; final PreparedStatement ps = connection.prepareStatement(sql); LOGGER.info(String.format( "Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'", config.get("username").asText(), sql)); return ps; - }, sourceOperations::rowToJson).collect(toList()); + }, sourceOperations::rowToJson); + // Ensure at least one available CDC table if (queryResponse.size() < 1) { throw new RuntimeException( @@ -293,22 +290,21 @@ public class MssqlSource extends AbstractJdbcSource implements Source // todo: ensure this works for Azure managed SQL (since it uses different sql server agent) protected void assertSqlServerAgentRunning(final JdbcDatabase database) throws SQLException { try { - final List queryResponse = database.unsafeQuery(connection -> { + final List queryResponse = database.queryJsons(connection -> { final String sql = "SELECT status_desc FROM sys.dm_server_services WHERE [servicename] LIKE 'SQL Server Agent%' OR [servicename] LIKE 'SQL Server 代理%' "; final PreparedStatement ps = connection.prepareStatement(sql); - LOGGER.info(String - .format("Checking that the SQL Server Agent is running using the query: '%s'", sql)); + LOGGER.info(String.format("Checking that the SQL Server Agent is running using the query: '%s'", sql)); return ps; - }, sourceOperations::rowToJson).collect(toList()); + }, sourceOperations::rowToJson); + if (!(queryResponse.get(0).get("status_desc").toString().contains("Running"))) { throw new RuntimeException(String.format( "The SQL Server Agent is not running. Current state: '%s'. Please check the documentation on ensuring SQL Server Agent is running.", queryResponse.get(0).get("status_desc").toString())); } } catch (final Exception e) { - if (e.getCause() != null && e.getCause().getClass() - .equals(com.microsoft.sqlserver.jdbc.SQLServerException.class)) { + if (e.getCause() != null && e.getCause().getClass().equals(com.microsoft.sqlserver.jdbc.SQLServerException.class)) { LOGGER.warn(String.format( "Skipping check for whether the SQL Server Agent is running, SQLServerException thrown: '%s'", e.getMessage())); @@ -320,7 +316,7 @@ public class MssqlSource extends AbstractJdbcSource implements Source protected void assertSnapshotIsolationAllowed(final JsonNode config, final JdbcDatabase database) throws SQLException { - final List queryResponse = database.unsafeQuery(connection -> { + final List queryResponse = database.queryJsons(connection -> { final String sql = "SELECT name, snapshot_isolation_state FROM sys.databases WHERE name = ?"; final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("database").asText()); @@ -328,7 +324,8 @@ public class MssqlSource extends AbstractJdbcSource implements Source "Checking that snapshot isolation is enabled on database '%s' using the query: '%s'", config.get("database").asText(), sql)); return ps; - }, sourceOperations::rowToJson).collect(toList()); + }, sourceOperations::rowToJson); + if (queryResponse.size() < 1) { throw new RuntimeException(String.format( "Couldn't find '%s' in sys.databases table. Please check the spelling and that the user has relevant permissions (see docs).", diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java index de02f827e0e..4c5901676e1 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/MySqlCdcTargetPosition.java @@ -11,7 +11,7 @@ import io.airbyte.integrations.debezium.internals.SnapshotMetadata; import java.sql.SQLException; import java.util.List; import java.util.Objects; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -28,8 +28,7 @@ public class MySqlCdcTargetPosition implements CdcTargetPosition { @Override public boolean equals(final Object obj) { - if (obj instanceof MySqlCdcTargetPosition) { - final MySqlCdcTargetPosition cdcTargetPosition = (MySqlCdcTargetPosition) obj; + if (obj instanceof final MySqlCdcTargetPosition cdcTargetPosition) { return fileName.equals(cdcTargetPosition.fileName) && cdcTargetPosition.position.equals(position); } return false; @@ -46,20 +45,19 @@ public class MySqlCdcTargetPosition implements CdcTargetPosition { } public static MySqlCdcTargetPosition targetPosition(final JdbcDatabase database) { - try { - final List masterStatus = database.unsafeResultSetQuery( - connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"), - resultSet -> { - final String file = resultSet.getString("File"); - final int position = resultSet.getInt("Position"); - if (file == null || position == 0) { - return new MySqlCdcTargetPosition(null, null); - } - return new MySqlCdcTargetPosition(file, position); - }).collect(Collectors.toList()); + try (final Stream stream = database.unsafeResultSetQuery( + connection -> connection.createStatement().executeQuery("SHOW MASTER STATUS"), + resultSet -> { + final String file = resultSet.getString("File"); + final int position = resultSet.getInt("Position"); + if (file == null || position == 0) { + return new MySqlCdcTargetPosition(null, null); + } + return new MySqlCdcTargetPosition(file, position); + })) { + final List masterStatus = stream.toList(); final MySqlCdcTargetPosition targetPosition = masterStatus.get(0); LOGGER.info("Target File position : " + targetPosition); - return targetPosition; } catch (final SQLException e) { throw new RuntimeException(e); diff --git a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java index b2df3e91225..f4c55d6257d 100644 --- a/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java +++ b/airbyte-integrations/connectors/source-mysql/src/main/java/io/airbyte/integrations/source/mysql/helpers/CdcConfigurationHelper.java @@ -4,8 +4,6 @@ package io.airbyte.integrations.source.mysql.helpers; -import static java.util.stream.Collectors.toList; - import com.fasterxml.jackson.databind.JsonNode; import io.airbyte.commons.functional.CheckedConsumer; import io.airbyte.commons.json.Jsons; @@ -15,7 +13,7 @@ import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Optional; -import java.util.stream.Collectors; +import java.util.stream.Stream; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -43,14 +41,14 @@ public class CdcConfigurationHelper { * @param offset - saved cdc offset with required binlog file * @param database - database */ - public static void checkBinlog(JsonNode offset, JdbcDatabase database) { - Optional binlogOptional = getBinlog(offset); + public static void checkBinlog(final JsonNode offset, final JdbcDatabase database) { + final Optional binlogOptional = getBinlog(offset); binlogOptional.ifPresent(binlog -> { if (isBinlogAvailable(binlog, database)) { LOGGER.info(""" Binlog %s is available""".formatted(binlog)); } else { - String error = + final String error = """ Binlog %s is not available. This is a critical error, it means that requested binlog is not present on mysql server. To fix data synchronization you need to reset your data. Please check binlog retention policy configurations.""" .formatted(binlog); @@ -73,46 +71,44 @@ public class CdcConfigurationHelper { } - private static boolean isBinlogAvailable(String binlog, JdbcDatabase database) { - try { - List binlogs = database.unsafeResultSetQuery(connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"), - resultSet -> resultSet.getString("Log_name")).collect(Collectors.toList()); + private static boolean isBinlogAvailable(final String binlog, final JdbcDatabase database) { + if (binlog.isEmpty()) { + return false; + } - return !binlog.isEmpty() && binlogs.stream().anyMatch(e -> e.equals(binlog)); - } catch (SQLException e) { + try (final Stream binlogs = database.unsafeResultSetQuery( + connection -> connection.createStatement().executeQuery("SHOW BINARY LOGS"), + resultSet -> resultSet.getString("Log_name"))) { + return binlogs.anyMatch(e -> e.equals(binlog)); + } catch (final SQLException e) { LOGGER.error("Can not get binlog list. Error: ", e); throw new RuntimeException(e); } } - private static Optional getBinlog(JsonNode offset) { - JsonNode node = offset.get(CDC_OFFSET); - Iterator> fields = node.fields(); + private static Optional getBinlog(final JsonNode offset) { + final JsonNode node = offset.get(CDC_OFFSET); + final Iterator> fields = node.fields(); while (fields.hasNext()) { - Map.Entry jsonField = fields.next(); + final Map.Entry jsonField = fields.next(); return Optional.ofNullable(Jsons.deserialize(jsonField.getValue().asText()).path("file").asText()); } return Optional.empty(); } - private static CheckedConsumer getCheckOperation(String name, String value) { + private static CheckedConsumer getCheckOperation(final String name, final String value) { return database -> { - final List result = database.unsafeResultSetQuery(connection -> { - final String sql = """ - show variables where Variable_name = '%s'""".formatted(name); - - return connection.createStatement().executeQuery(sql); - }, resultSet -> resultSet.getString("Value")).collect(toList()); + final List result = database.queryStrings( + connection -> connection.createStatement().executeQuery(String.format("show variables where Variable_name = '%s'", name)), + resultSet -> resultSet.getString("Value")); if (result.size() != 1) { - throw new RuntimeException(""" - Could not query the variable %s""".formatted(name)); + throw new RuntimeException("Could not query the variable " + name); } final String resultValue = result.get(0); if (!resultValue.equalsIgnoreCase(value)) { - throw new RuntimeException(""" - The variable %s should be set to %s, but it is : %s""".formatted(name, value, resultValue)); + throw new RuntimeException(String.format("The variable \"%s\" should be set to \"%s\", but it is \"%s\"", name, value, resultValue)); } }; } diff --git a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java index f012936d464..720226fffbc 100644 --- a/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle-strict-encrypt/src/test-integration/java/io/airbyte/integrations/source/oracle_strict_encrypt/OracleSourceNneAcceptanceTest.java @@ -42,9 +42,9 @@ public class OracleSourceNneAcceptanceTest extends OracleStrictEncryptSourceAcce "oracle.net.encryption_types_client=( " + algorithm + " )")); - final String network_service_banner = + final String networkServiceBanner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.unsafeQuery(network_service_banner).toList(); + final List collect = database.queryJsons(networkServiceBanner); assertTrue(collect.get(2).get("NETWORK_SERVICE_BANNER").asText() .contains(algorithm + " Encryption")); @@ -71,8 +71,8 @@ public class OracleSourceNneAcceptanceTest extends OracleStrictEncryptSourceAcce JdbcUtils.parseJdbcParameters("oracle.net.encryption_client=REQUIRED;" + "oracle.net.encryption_types_client=( " + algorithm + " )", ";")); - final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; - final List collect = database.unsafeQuery(network_service_banner).toList(); + final String networkServiceBanner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; + final List collect = database.queryJsons(networkServiceBanner); assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); } diff --git a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceNneAcceptanceTest.java b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceNneAcceptanceTest.java index e67716e4550..aaabf381fcf 100644 --- a/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceNneAcceptanceTest.java +++ b/airbyte-integrations/connectors/source-oracle/src/test-integration/java/io/airbyte/integrations/source/oracle/OracleSourceNneAcceptanceTest.java @@ -16,7 +16,6 @@ import io.airbyte.db.jdbc.JdbcDatabase; import io.airbyte.db.jdbc.JdbcUtils; import java.sql.SQLException; import java.util.List; -import java.util.stream.Collectors; import org.junit.jupiter.api.Test; public class OracleSourceNneAcceptanceTest extends OracleSourceAcceptanceTest { @@ -43,12 +42,11 @@ public class OracleSourceNneAcceptanceTest extends OracleSourceAcceptanceTest { "oracle.net.encryption_types_client=( " + algorithm + " )")); - final String network_service_banner = + final String networkServiceBanner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); + final List collect = database.queryJsons(networkServiceBanner); - assertTrue(collect.get(2).get("NETWORK_SERVICE_BANNER").asText() - .contains(algorithm + " Encryption")); + assertTrue(collect.get(2).get("NETWORK_SERVICE_BANNER").asText().contains(algorithm + " Encryption")); } @Test @@ -62,12 +60,11 @@ public class OracleSourceNneAcceptanceTest extends OracleSourceAcceptanceTest { config.get("sid").asText()), "oracle.jdbc.driver.OracleDriver"); - final String network_service_banner = + final String networkServiceBanner = "select network_service_banner from v$session_connect_info where sid in (select distinct sid from v$mystat)"; - final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); + final List collect = database.queryJsons(networkServiceBanner); - assertTrue(collect.get(1).get("NETWORK_SERVICE_BANNER").asText() - .contains("Encryption service")); + assertTrue(collect.get(1).get("NETWORK_SERVICE_BANNER").asText().contains("Encryption service")); } @Test @@ -92,8 +89,8 @@ public class OracleSourceNneAcceptanceTest extends OracleSourceAcceptanceTest { "oracle.net.encryption_types_client=( " + algorithm + " )")); - final String network_service_banner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; - final List collect = database.unsafeQuery(network_service_banner).collect(Collectors.toList()); + final String networkServiceBanner = "SELECT sys_context('USERENV', 'NETWORK_PROTOCOL') as network_protocol FROM dual"; + final List collect = database.queryJsons(networkServiceBanner); assertEquals("tcp", collect.get(0).get("NETWORK_PROTOCOL").asText()); } diff --git a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java index bbc397b0cd2..8ea0f7dacd7 100644 --- a/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java +++ b/airbyte-integrations/connectors/source-postgres/src/main/java/io/airbyte/integrations/source/postgres/PostgresSource.java @@ -14,6 +14,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; import com.google.common.collect.ImmutableMap; import io.airbyte.commons.functional.CheckedConsumer; +import io.airbyte.commons.functional.CheckedFunction; import io.airbyte.commons.json.Jsons; import io.airbyte.commons.util.AutoCloseableIterator; import io.airbyte.db.jdbc.JdbcDatabase; @@ -33,6 +34,7 @@ import io.airbyte.protocol.models.AirbyteStream; import io.airbyte.protocol.models.CommonField; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.SyncMode; +import java.sql.Connection; import java.sql.JDBCType; import java.sql.PreparedStatement; import java.sql.SQLException; @@ -155,7 +157,7 @@ public class PostgresSource extends AbstractJdbcSource implements Sour if (isCdc(config)) { checkOperations.add(database -> { - final List matchingSlots = database.unsafeQuery(connection -> { + final List matchingSlots = database.queryJsons(connection -> { final String sql = "SELECT * FROM pg_replication_slots WHERE slot_name = ? AND plugin = ? AND database = ?"; final PreparedStatement ps = connection.prepareStatement(sql); ps.setString(1, config.get("replication_method").get("replication_slot").asText()); @@ -166,7 +168,7 @@ public class PostgresSource extends AbstractJdbcSource implements Sour "Attempting to find the named replication slot using the query: " + ps.toString()); return ps; - }, sourceOperations::rowToJson).collect(toList()); + }, sourceOperations::rowToJson); if (matchingSlots.size() != 1) { throw new RuntimeException( @@ -177,15 +179,12 @@ public class PostgresSource extends AbstractJdbcSource implements Sour }); checkOperations.add(database -> { - final List matchingPublications = database.unsafeQuery(connection -> { - final PreparedStatement ps = connection - .prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?"); + final List matchingPublications = database.queryJsons(connection -> { + final PreparedStatement ps = connection.prepareStatement("SELECT * FROM pg_publication WHERE pubname = ?"); ps.setString(1, config.get("replication_method").get("publication").asText()); - - LOGGER.info("Attempting to find the publication using the query: " + ps.toString()); - + LOGGER.info("Attempting to find the publication using the query: " + ps); return ps; - }, sourceOperations::rowToJson).collect(toList()); + }, sourceOperations::rowToJson); if (matchingPublications.size() != 1) { throw new RuntimeException( @@ -274,7 +273,7 @@ public class PostgresSource extends AbstractJdbcSource implements Sour public Set getPrivilegesTableForCurrentUser(final JdbcDatabase database, final String schema) throws SQLException { - return database.unsafeQuery(connection -> { + final CheckedFunction statementCreator = connection -> { final PreparedStatement ps = connection.prepareStatement( """ SELECT DISTINCT table_catalog, @@ -316,8 +315,9 @@ public class PostgresSource extends AbstractJdbcSource implements Sour ps.setString(2, username); ps.setString(3, username); return ps; - }, sourceOperations::rowToJson) - .collect(toSet()) + }; + + return database.queryJsons(statementCreator, sourceOperations::rowToJson) .stream() .map(e -> JdbcPrivilegeDto.builder() .schemaName(e.get("table_schema").asText())