🎉 JDBC sources: store cursor record count in db state (#15535)
* Add cursor_record_count to db stream state * Add cursor record count to cursor info * Emit max cursor record count * Add original cursor record count * Unify logging format * Add backward compatible methods * Update unit tests for state decorating iterator * Update test (not done yet) * Fix one more unit test * Change where clause operator according to record count * Add branch for null cursor * Skip saving record count when it is 0 * Fix log wording * Set mock record count in test * Check cursor value instead of cursor info * Fix source jdbc test * Read record count from state * Fix tests * Add an acceptance test case * Fix npe * Change record count from int to long to avoid type conversion * Fix references * Fix oracle container * Use uppercase for snowflake * Use uppercase for db2 * Fix and use uppercase * Update test case to include the edge case * Format code * Remove extra assertion in clickhouse * Merge ms sql incremental query method * Log query for debugging * Clean up name_and_timestamp table * Fix db2 tests * Fix mssql tests * Fix oracle tests * Fix oracle tests * Fix cockroachdb tests * Fix snowflake tests * Add changelog * Fix mssql tests * Fix db2-strict-encrypt tests * Fix oracle-strict-encrypt tests * Bump postgres version * Fix oracle-strict-encrypt tests * auto-bump connector version [ci skip] Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
@@ -17,7 +17,6 @@ import com.microsoft.sqlserver.jdbc.SQLServerResultSetMetaData;
|
||||
import io.airbyte.commons.functional.CheckedConsumer;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.commons.util.AutoCloseableIterator;
|
||||
import io.airbyte.commons.util.AutoCloseableIterators;
|
||||
import io.airbyte.db.factory.DatabaseDriver;
|
||||
import io.airbyte.db.jdbc.JdbcDatabase;
|
||||
import io.airbyte.db.jdbc.JdbcUtils;
|
||||
@@ -37,6 +36,7 @@ import io.airbyte.protocol.models.CommonField;
|
||||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
|
||||
import io.airbyte.protocol.models.SyncMode;
|
||||
import java.io.File;
|
||||
import java.sql.Connection;
|
||||
import java.sql.JDBCType;
|
||||
import java.sql.PreparedStatement;
|
||||
import java.sql.ResultSet;
|
||||
@@ -49,7 +49,6 @@ import java.util.Map;
|
||||
import java.util.Optional;
|
||||
import java.util.Properties;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Stream;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -80,54 +79,13 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
|
||||
final String tableName) {
|
||||
LOGGER.info("Queueing query for table: {}", tableName);
|
||||
|
||||
final List<String> newIdentifiersList = getWrappedColumn(database,
|
||||
columnNames,
|
||||
schemaName, tableName, "\"");
|
||||
final String preparedSqlQuery = String
|
||||
.format("SELECT %s FROM %s", String.join(",", newIdentifiersList),
|
||||
getFullTableName(schemaName, tableName));
|
||||
final String newIdentifiers = getWrappedColumnNames(database, null, columnNames, schemaName, tableName);
|
||||
final String preparedSqlQuery = String.format("SELECT %s FROM %s", newIdentifiers, getFullTableName(schemaName, tableName));
|
||||
|
||||
LOGGER.info("Prepared SQL query for TableFullRefresh is: " + preparedSqlQuery);
|
||||
return queryTable(database, preparedSqlQuery);
|
||||
}
|
||||
|
||||
@Override
|
||||
public AutoCloseableIterator<JsonNode> queryTableIncremental(final JdbcDatabase database,
|
||||
final List<String> columnNames,
|
||||
final String schemaName,
|
||||
final String tableName,
|
||||
final String cursorField,
|
||||
final JDBCType cursorFieldType,
|
||||
final String cursorValue) {
|
||||
LOGGER.info("Queueing query for table: {}", tableName);
|
||||
return AutoCloseableIterators.lazyIterator(() -> {
|
||||
try {
|
||||
final Stream<JsonNode> stream = database.unsafeQuery(
|
||||
connection -> {
|
||||
LOGGER.info("Preparing query for table: {}", tableName);
|
||||
|
||||
final String identifierQuoteString = connection.getMetaData().getIdentifierQuoteString();
|
||||
final List<String> newColumnNames = getWrappedColumn(database, columnNames, schemaName, tableName, identifierQuoteString);
|
||||
|
||||
final String sql = String.format("SELECT %s FROM %s WHERE %s > ?",
|
||||
String.join(",", newColumnNames),
|
||||
sourceOperations.getFullyQualifiedTableNameWithQuoting(connection, schemaName, tableName),
|
||||
sourceOperations.enquoteIdentifier(connection, cursorField));
|
||||
LOGGER.info("Prepared SQL query for queryTableIncremental is: " + sql);
|
||||
|
||||
final PreparedStatement preparedStatement = connection.prepareStatement(sql);
|
||||
sourceOperations.setStatementField(preparedStatement, 1, cursorFieldType, cursorValue);
|
||||
LOGGER.info("Executing query for table: {}", tableName);
|
||||
return preparedStatement;
|
||||
},
|
||||
sourceOperations::rowToJson);
|
||||
return AutoCloseableIterators.fromStream(stream);
|
||||
} catch (final SQLException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
/**
|
||||
* There is no support for hierarchyid even in the native SQL Server JDBC driver. Its value can be
|
||||
* converted to a nvarchar(4000) data type by calling the ToString() method. So we make a separate
|
||||
@@ -137,13 +95,15 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
|
||||
*
|
||||
* @return the list with Column names updated to handle functions (if any) properly
|
||||
*/
|
||||
private List<String> getWrappedColumn(final JdbcDatabase database,
|
||||
final List<String> columnNames,
|
||||
final String schemaName,
|
||||
final String tableName,
|
||||
final String enquoteSymbol) {
|
||||
@Override
|
||||
protected String getWrappedColumnNames(final JdbcDatabase database,
|
||||
final Connection connection,
|
||||
final List<String> columnNames,
|
||||
final String schemaName,
|
||||
final String tableName) {
|
||||
final List<String> hierarchyIdColumns = new ArrayList<>();
|
||||
try {
|
||||
final String identifierQuoteString = database.getMetaData().getIdentifierQuoteString();
|
||||
final SQLServerResultSetMetaData sqlServerResultSetMetaData = (SQLServerResultSetMetaData) database
|
||||
.queryMetadata(String
|
||||
.format("SELECT TOP 1 %s FROM %s", // only first row is enough to get field's type
|
||||
@@ -159,20 +119,20 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
|
||||
}
|
||||
}
|
||||
|
||||
// iterate through the names and replace each hierarchyid field in the query with a ToString() call
|
||||
// Eventually we get columns like this: testColumn.ToString() as "testColumn"
|
||||
// toString function in SQL server is the only way to get human readable value, but not mssql
|
||||
// specific HEX value
|
||||
return String.join(", ", columnNames.stream()
|
||||
.map(
|
||||
el -> hierarchyIdColumns.contains(el) ? String
|
||||
.format("%s.ToString() as %s%s%s", el, identifierQuoteString, el, identifierQuoteString)
|
||||
: getIdentifierWithQuoting(el))
|
||||
.toList());
|
||||
} catch (final SQLException e) {
|
||||
LOGGER.error("Failed to fetch metadata to prepare a proper request.", e);
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
// iterate through the names and replace each hierarchyid field in the query with a ToString() call
|
||||
// Eventually we get columns like this: testColumn.ToString() as "testColumn"
|
||||
// toString function in SQL server is the only way to get human readable value, but not mssql
|
||||
// specific HEX value
|
||||
return columnNames.stream()
|
||||
.map(
|
||||
el -> hierarchyIdColumns.contains(el) ? String
|
||||
.format("%s.ToString() as %s%s%s", el, enquoteSymbol, el, enquoteSymbol)
|
||||
: getIdentifierWithQuoting(el))
|
||||
.collect(toList());
|
||||
}
|
||||
|
||||
@Override
|
||||
@@ -245,15 +205,15 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
|
||||
}
|
||||
|
||||
@Override
|
||||
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(JdbcDatabase database) throws Exception {
|
||||
public List<TableInfo<CommonField<JDBCType>>> discoverInternal(final JdbcDatabase database) throws Exception {
|
||||
final List<TableInfo<CommonField<JDBCType>>> internals = super.discoverInternal(database);
|
||||
if (schemas != null && !schemas.isEmpty()) {
|
||||
// process explicitly filtered (from UI) schemas
|
||||
List<TableInfo<CommonField<JDBCType>>> resultInternals = internals
|
||||
final List<TableInfo<CommonField<JDBCType>>> resultInternals = internals
|
||||
.stream()
|
||||
.filter(this::isTableInRequestedSchema)
|
||||
.toList();
|
||||
for (TableInfo<CommonField<JDBCType>> info : resultInternals) {
|
||||
for (final TableInfo<CommonField<JDBCType>> info : resultInternals) {
|
||||
LOGGER.debug("Found table (schema: {}): {}", info.getNameSpace(), info.getName());
|
||||
}
|
||||
return resultInternals;
|
||||
@@ -263,7 +223,7 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
|
||||
}
|
||||
}
|
||||
|
||||
private boolean isTableInRequestedSchema(TableInfo<CommonField<JDBCType>> tableInfo) {
|
||||
private boolean isTableInRequestedSchema(final TableInfo<CommonField<JDBCType>> tableInfo) {
|
||||
return schemas
|
||||
.stream()
|
||||
.anyMatch(schema -> schema.equals(tableInfo.getNameSpace()));
|
||||
|
||||
@@ -38,6 +38,10 @@ public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
|
||||
|
||||
@BeforeAll
|
||||
static void init() {
|
||||
// In mssql, timestamp is generated automatically, so we need to use
|
||||
// the datetime type instead so that we can set the value manually.
|
||||
COL_TIMESTAMP_TYPE = "DATETIME";
|
||||
|
||||
dbContainer = new MSSQLServerContainer<>("mcr.microsoft.com/mssql/server:2019-latest").acceptLicense();
|
||||
dbContainer.start();
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user