1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Set isResumeable flag in catalog (#38584)

Co-authored-by: Dhroov Makwana <pabloescoder@gmail.com>
Co-authored-by: Alexandre Girard <alexandre@airbyte.io>
Co-authored-by: btkcodedev <btk.codedev@gmail.com>
Co-authored-by: Gireesh Sreepathi <gisripa@gmail.com>
Co-authored-by: Yue Li <61070669+theyueli@users.noreply.github.com>
Co-authored-by: Augustin <augustin@airbyte.io>
Co-authored-by: Natik Gadzhi <natik@respawn.io>
Co-authored-by: Danylo Jablonski <150933663+DanyloGL@users.noreply.github.com>
Co-authored-by: Maxime Carbonneau-Leclerc <3360483+maxi297@users.noreply.github.com>
Co-authored-by: Oleksandr Bazarnov <oleksandr.bazarnov@globallogic.com>
Co-authored-by: Ben Church <ben@airbyte.io>
Co-authored-by: alafanechere <augustin.lafanechere@gmail.com>
Co-authored-by: Christo Grabowski <108154848+ChristoGrab@users.noreply.github.com>
Co-authored-by: Edward Gao <edward.gao@airbyte.io>
Co-authored-by: Catherine Noll <clnoll@users.noreply.github.com>
Co-authored-by: Audrey Maldonado <audrey.maldonado@gmail.com>
Co-authored-by: Rodi Reich Zilberman <867491+rodireich@users.noreply.github.com>
Co-authored-by: Serhii Lazebnyi <serhii.lazebnyi@globallogic.com>
Co-authored-by: Serhii Lazebnyi <53845333+lazebnyi@users.noreply.github.com>
This commit is contained in:
Xiaohan Song
2024-05-30 10:23:09 -07:00
committed by GitHub
parent 06ac9a0d33
commit 2d194fa359
34 changed files with 201 additions and 44 deletions

View File

@@ -3,7 +3,7 @@ plugins {
}
airbyteJavaConnector {
cdkVersionRequired = '0.36.2'
cdkVersionRequired = '0.36.3'
features = ['db-sources']
useLocalCdk = false
}

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: b5ea17b1-f170-46dc-bc31-cc744ca984c1
dockerImageTag: 4.0.26
dockerImageTag: 4.0.27
dockerRepository: airbyte/source-mssql
documentationUrl: https://docs.airbyte.com/integrations/sources/mssql
githubIssueLabel: source-mssql

View File

@@ -216,7 +216,7 @@ public class MssqlSource extends AbstractJdbcSource<JDBCType> implements Source
}
@Override
public AirbyteCatalog discover(final JsonNode config) throws Exception {
public AirbyteCatalog discover(final JsonNode config) {
final AirbyteCatalog catalog = super.discover(config);
if (MssqlCdcHelper.isCdc(config)) {

View File

@@ -185,6 +185,11 @@ public class CdcMssqlSourceTest extends CdcSourceTest<MssqlSource, MsSQLTestData
// Do nothing
}
@Override
protected void addIsResumableFlagForNonPkTable(final AirbyteStream stream) {
stream.setIsResumable(false);
}
// Utilize the setup to do test on MssqlDebeziumStateUtil.
@Test
public void testCdcSnapshot() {

View File

@@ -385,7 +385,8 @@ public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest<Mssq
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID))),
.withSourceDefinedPrimaryKey(List.of(List.of(COL_ID)))
.withIsResumable(true),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_WITHOUT_PK,
defaultNamespace,
@@ -393,7 +394,8 @@ public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest<Mssq
Field.of(COL_NAME, JsonSchemaType.STRING),
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(Collections.emptyList()),
.withSourceDefinedPrimaryKey(Collections.emptyList())
.withIsResumable(false),
CatalogHelpers.createAirbyteStream(
TABLE_NAME_COMPOSITE_PK,
defaultNamespace,
@@ -402,7 +404,8 @@ public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest<Mssq
Field.of(COL_UPDATED_AT, JsonSchemaType.STRING_DATE))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))));
List.of(List.of(COL_FIRST_NAME), List.of(COL_LAST_NAME)))
.withIsResumable(true)));
}
@Override

View File

@@ -35,7 +35,8 @@ class MssqlSourceTest {
Field.of("name", JsonSchemaType.STRING),
Field.of("born", JsonSchemaType.STRING_TIMESTAMP_WITH_TIMEZONE))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))
.withSourceDefinedPrimaryKey(List.of(List.of("id")))));
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
.withIsResumable(true)));
private MsSQLTestDatabase testdb;
@@ -69,7 +70,7 @@ class MssqlSourceTest {
// the column twice. we now de-duplicate it (pr: https://github.com/airbytehq/airbyte/pull/983).
// this tests that this de-duplication is successful.
@Test
void testDiscoverWithPk() throws Exception {
void testDiscoverWithPk() {
testdb
.with("ALTER TABLE id_and_name ADD CONSTRAINT i3pk PRIMARY KEY CLUSTERED (id);")
.with("CREATE INDEX i1 ON id_and_name (id);");
@@ -77,6 +78,13 @@ class MssqlSourceTest {
assertEquals(CATALOG, actual);
}
@Test
void testDiscoverWithoutPk() {
final AirbyteCatalog actual = source().discover(getConfig());
assertEquals(STREAM_NAME, actual.getStreams().get(0).getName());
assertEquals(false, actual.getStreams().get(0).getIsResumable());
}
@Test
@Disabled("See https://github.com/airbytehq/airbyte/pull/23908#issuecomment-1463753684, enable once communication is out")
public void testTableWithNullCursorValueShouldThrowException() throws Exception {