1
0
mirror of synced 2025-12-25 02:09:19 -05:00

6339: error when attempting to use azure sql database within an elastic pool as source for cdc based replication (#14121)

* 6339: implementation

* 6339: changelog updated

* 6339: definitions updated

* 6339: definitions reverted

* 6339: still struggling with publishing

* auto-bump connector version

* 6339: definitions reverted - correct

* auto-bump connector version

Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
Tuhai Maksym
2022-06-29 14:44:38 +03:00
committed by GitHub
parent 0a6a630bb4
commit a147b6f453
5 changed files with 17 additions and 7 deletions

View File

@@ -16,5 +16,5 @@ ENV APPLICATION source-mssql
COPY --from=build /airbyte /airbyte
LABEL io.airbyte.version=0.4.5
LABEL io.airbyte.version=0.4.8
LABEL io.airbyte.name=airbyte/source-mssql

View File

@@ -33,12 +33,13 @@ import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.CommonField;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.SyncMode;
import java.io.File;
import java.sql.JDBCType;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.time.Instant;
import java.util.ArrayList;
import java.util.List;
@@ -276,8 +277,16 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
protected void assertCdcSchemaQueryable(final JsonNode config, final JdbcDatabase database)
throws SQLException {
final List<JsonNode> queryResponse = database.queryJsons(connection -> {
final String sql = "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables";
final PreparedStatement ps = connection.prepareStatement(sql);
boolean isAzureSQL = false;
try (Statement stmt = connection.createStatement();
ResultSet editionRS = stmt.executeQuery("SELECT ServerProperty('Edition')")) {
isAzureSQL = editionRS.next() && "SQL Azure".equals(editionRS.getString(1));
}
// Azure SQL does not support USE clause
final String sql =
isAzureSQL ? "SELECT * FROM cdc.change_tables" : "USE " + config.get("database").asText() + "; SELECT * FROM cdc.change_tables"; final PreparedStatement ps = connection.prepareStatement(sql);
LOGGER.info(String.format(
"Checking user '%s' can query the cdc schema and that we have at least 1 cdc enabled table using the query: '%s'",
config.get("username").asText(), sql));