MySQL Source: fixed unencrypted CDC connections (#18851)
* MySQL Source: fixed unencrypted CDC connections * updated changelog * bump version * auto-bump connector version Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
This commit is contained in:
@@ -813,7 +813,7 @@
|
||||
- name: MySQL
|
||||
sourceDefinitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
|
||||
dockerRepository: airbyte/source-mysql
|
||||
dockerImageTag: 1.0.10
|
||||
dockerImageTag: 1.0.11
|
||||
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
|
||||
icon: mysql.svg
|
||||
sourceType: database
|
||||
|
||||
@@ -7727,7 +7727,7 @@
|
||||
supportsNormalization: false
|
||||
supportsDBT: false
|
||||
supported_destination_sync_modes: []
|
||||
- dockerImage: "airbyte/source-mysql:1.0.10"
|
||||
- dockerImage: "airbyte/source-mysql:1.0.11"
|
||||
spec:
|
||||
documentationUrl: "https://docs.airbyte.com/integrations/sources/mysql"
|
||||
connectionSpecification:
|
||||
|
||||
@@ -16,6 +16,6 @@ ENV APPLICATION source-mysql-strict-encrypt
|
||||
|
||||
COPY --from=build /airbyte /airbyte
|
||||
|
||||
LABEL io.airbyte.version=1.0.10
|
||||
LABEL io.airbyte.version=1.0.11
|
||||
|
||||
LABEL io.airbyte.name=airbyte/source-mysql-strict-encrypt
|
||||
|
||||
@@ -16,6 +16,6 @@ ENV APPLICATION source-mysql
|
||||
|
||||
COPY --from=build /airbyte /airbyte
|
||||
|
||||
LABEL io.airbyte.version=1.0.10
|
||||
LABEL io.airbyte.version=1.0.11
|
||||
|
||||
LABEL io.airbyte.name=airbyte/source-mysql
|
||||
|
||||
@@ -67,7 +67,7 @@ public class MySqlCdcProperties {
|
||||
// https://debezium.io/documentation/reference/stable/connectors/mysql.html#mysql-property-database-ssl-mode
|
||||
if (!sourceConfig.has(JdbcUtils.SSL_KEY) || sourceConfig.get(JdbcUtils.SSL_KEY).asBoolean()) {
|
||||
if (dbConfig.has(SSL_MODE) && !dbConfig.get(SSL_MODE).asText().isEmpty()) {
|
||||
props.setProperty("database.sslmode", MySqlSource.toSslJdbcParamInternal(SslMode.valueOf(dbConfig.get(SSL_MODE).asText())));
|
||||
props.setProperty("database.ssl.mode", MySqlSource.toSslJdbcParamInternal(SslMode.valueOf(dbConfig.get(SSL_MODE).asText())));
|
||||
props.setProperty("database.history.producer.security.protocol", "SSL");
|
||||
props.setProperty("database.history.consumer.security.protocol", "SSL");
|
||||
|
||||
|
||||
@@ -0,0 +1,198 @@
|
||||
package io.airbyte.integrations.io.airbyte.integration_tests.sources;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Iterables;
|
||||
import com.google.common.collect.Lists;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.db.Database;
|
||||
import io.airbyte.db.factory.DSLContextFactory;
|
||||
import io.airbyte.db.factory.DatabaseDriver;
|
||||
import io.airbyte.db.jdbc.JdbcUtils;
|
||||
import io.airbyte.integrations.base.ssh.SshHelpers;
|
||||
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
|
||||
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
|
||||
import io.airbyte.protocol.models.AirbyteMessage;
|
||||
import io.airbyte.protocol.models.AirbyteRecordMessage;
|
||||
import io.airbyte.protocol.models.AirbyteStateMessage;
|
||||
import io.airbyte.protocol.models.CatalogHelpers;
|
||||
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
|
||||
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
|
||||
import io.airbyte.protocol.models.ConnectorSpecification;
|
||||
import io.airbyte.protocol.models.DestinationSyncMode;
|
||||
import io.airbyte.protocol.models.Field;
|
||||
import io.airbyte.protocol.models.JsonSchemaType;
|
||||
import io.airbyte.protocol.models.SyncMode;
|
||||
import org.jooq.DSLContext;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
|
||||
import java.util.List;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
import static io.airbyte.integrations.io.airbyte.integration_tests.sources.utils.TestConstants.INITIAL_CDC_WAITING_SECONDS;
|
||||
import static io.airbyte.protocol.models.SyncMode.INCREMENTAL;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
|
||||
public class CdcMySqlSslRequiredSourceAcceptanceTest extends SourceAcceptanceTest {
|
||||
private static final String STREAM_NAME = "id_and_name";
|
||||
private static final String STREAM_NAME2 = "starships";
|
||||
private MySQLContainer<?> container;
|
||||
private JsonNode config;
|
||||
|
||||
@Override
|
||||
protected String getImageName() {
|
||||
return "airbyte/source-mysql:dev";
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ConnectorSpecification getSpec() throws Exception {
|
||||
return SshHelpers.getSpecAndInjectSsh();
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JsonNode getConfig() {
|
||||
return config;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
|
||||
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
|
||||
new ConfiguredAirbyteStream()
|
||||
.withSyncMode(INCREMENTAL)
|
||||
.withDestinationSyncMode(DestinationSyncMode.APPEND)
|
||||
.withStream(CatalogHelpers.createAirbyteStream(
|
||||
String.format("%s", STREAM_NAME),
|
||||
String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
|
||||
Field.of("id", JsonSchemaType.NUMBER),
|
||||
Field.of("name", JsonSchemaType.STRING))
|
||||
.withSourceDefinedCursor(true)
|
||||
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
|
||||
.withSupportedSyncModes(
|
||||
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL))),
|
||||
new ConfiguredAirbyteStream()
|
||||
.withSyncMode(INCREMENTAL)
|
||||
.withDestinationSyncMode(DestinationSyncMode.APPEND)
|
||||
.withStream(CatalogHelpers.createAirbyteStream(
|
||||
String.format("%s", STREAM_NAME2),
|
||||
String.format("%s", config.get(JdbcUtils.DATABASE_KEY).asText()),
|
||||
Field.of("id", JsonSchemaType.NUMBER),
|
||||
Field.of("name", JsonSchemaType.STRING))
|
||||
.withSourceDefinedCursor(true)
|
||||
.withSourceDefinedPrimaryKey(List.of(List.of("id")))
|
||||
.withSupportedSyncModes(
|
||||
Lists.newArrayList(SyncMode.FULL_REFRESH, INCREMENTAL)))));
|
||||
}
|
||||
|
||||
@Override
|
||||
protected JsonNode getState() {
|
||||
return null;
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void setupEnvironment(final TestDestinationEnv environment) throws Exception {
|
||||
container = new MySQLContainer<>("mysql:8.0");
|
||||
container.start();
|
||||
|
||||
final var sslMode = ImmutableMap.builder()
|
||||
.put(JdbcUtils.MODE_KEY, "required")
|
||||
.build();
|
||||
final JsonNode replicationMethod = Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put("method", "CDC")
|
||||
.put("initial_waiting_seconds", INITIAL_CDC_WAITING_SECONDS)
|
||||
.build());
|
||||
|
||||
config = Jsons.jsonNode(ImmutableMap.builder()
|
||||
.put(JdbcUtils.HOST_KEY, container.getHost())
|
||||
.put(JdbcUtils.PORT_KEY, container.getFirstMappedPort())
|
||||
.put(JdbcUtils.DATABASE_KEY, container.getDatabaseName())
|
||||
.put(JdbcUtils.USERNAME_KEY, container.getUsername())
|
||||
.put(JdbcUtils.PASSWORD_KEY, container.getPassword())
|
||||
.put(JdbcUtils.SSL_KEY, true)
|
||||
.put(JdbcUtils.SSL_MODE_KEY, sslMode)
|
||||
.put("replication_method", replicationMethod)
|
||||
.put("is_test", true)
|
||||
.build());
|
||||
|
||||
revokeAllPermissions();
|
||||
grantCorrectPermissions();
|
||||
alterUserRequireSsl();
|
||||
createAndPopulateTables();
|
||||
}
|
||||
|
||||
private void alterUserRequireSsl() {
|
||||
executeQuery("ALTER USER " + container.getUsername() + " REQUIRE SSL;");
|
||||
}
|
||||
|
||||
private void createAndPopulateTables() {
|
||||
executeQuery("CREATE TABLE id_and_name(id INTEGER PRIMARY KEY, name VARCHAR(200));");
|
||||
executeQuery(
|
||||
"INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
|
||||
executeQuery("CREATE TABLE starships(id INTEGER PRIMARY KEY, name VARCHAR(200));");
|
||||
executeQuery(
|
||||
"INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
|
||||
}
|
||||
|
||||
private void revokeAllPermissions() {
|
||||
executeQuery("REVOKE ALL PRIVILEGES, GRANT OPTION FROM " + container.getUsername() + "@'%';");
|
||||
}
|
||||
|
||||
private void grantCorrectPermissions() {
|
||||
executeQuery(
|
||||
"GRANT SELECT, RELOAD, SHOW DATABASES, REPLICATION SLAVE, REPLICATION CLIENT ON *.* TO "
|
||||
+ container.getUsername() + "@'%';");
|
||||
}
|
||||
|
||||
private void executeQuery(final String query) {
|
||||
try (final DSLContext dslContext = DSLContextFactory.create(
|
||||
"root",
|
||||
"test",
|
||||
DatabaseDriver.MYSQL.getDriverClassName(),
|
||||
String.format(DatabaseDriver.MYSQL.getUrlFormatString(),
|
||||
container.getHost(),
|
||||
container.getFirstMappedPort(),
|
||||
container.getDatabaseName()),
|
||||
SQLDialect.MYSQL)) {
|
||||
final Database database = new Database(dslContext);
|
||||
database.query(
|
||||
ctx -> ctx
|
||||
.execute(query));
|
||||
} catch (final Exception e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
protected void tearDown(final TestDestinationEnv testEnv) {
|
||||
container.close();
|
||||
}
|
||||
|
||||
@Test
|
||||
public void testIncrementalSyncShouldNotFailIfBinlogIsDeleted() throws Exception {
|
||||
final ConfiguredAirbyteCatalog configuredCatalog = withSourceDefinedCursors(getConfiguredCatalog());
|
||||
// only sync incremental streams
|
||||
configuredCatalog.setStreams(
|
||||
configuredCatalog.getStreams().stream().filter(s -> s.getSyncMode() == INCREMENTAL).collect(Collectors.toList()));
|
||||
|
||||
final List<AirbyteMessage> airbyteMessages = runRead(configuredCatalog, getState());
|
||||
final List<AirbyteRecordMessage> recordMessages = filterRecords(airbyteMessages);
|
||||
final List<AirbyteStateMessage> stateMessages = airbyteMessages
|
||||
.stream()
|
||||
.filter(m -> m.getType() == AirbyteMessage.Type.STATE)
|
||||
.map(AirbyteMessage::getState)
|
||||
.collect(Collectors.toList());
|
||||
assertFalse(recordMessages.isEmpty(), "Expected the first incremental sync to produce records");
|
||||
assertFalse(stateMessages.isEmpty(), "Expected incremental sync to produce STATE messages");
|
||||
|
||||
// when we run incremental sync again there should be no new records. Run a sync with the latest
|
||||
// state message and assert no records were emitted.
|
||||
final JsonNode latestState = Jsons.jsonNode(supportsPerStream() ? stateMessages : List.of(Iterables.getLast(stateMessages)));
|
||||
// RESET MASTER removes all binary log files that are listed in the index file,
|
||||
// leaving only a single, empty binary log file with a numeric suffix of .000001
|
||||
executeQuery("RESET MASTER;");
|
||||
|
||||
assertEquals(6, filterRecords(runRead(configuredCatalog, latestState)).size());
|
||||
}
|
||||
}
|
||||
@@ -252,6 +252,7 @@ WHERE actor_definition_id ='435bb9a5-7887-4809-aa58-28c27df0d7ad' AND (configura
|
||||
## Changelog
|
||||
| Version | Date | Pull Request | Subject |
|
||||
|:--------|:-----------|:-----------------------------------------------------------|:-------------------------------------------------------------------------------------------------------------------------------------------------|
|
||||
| 1.0.11 | 2022-11-03 | [18851](https://github.com/airbytehq/airbyte/pull/18851) | Fix bug with unencrypted CDC connections |
|
||||
| 1.0.10 | 2022-11-02 | [18619](https://github.com/airbytehq/airbyte/pull/18619) | Fix bug with handling Tinyint(1) Unsigned values as boolean |
|
||||
| 1.0.9 | 2022-10-31 | [18538](https://github.com/airbytehq/airbyte/pull/18538) | Encode database name |
|
||||
| 1.0.8 | 2022-10-25 | [18383](https://github.com/airbytehq/airbyte/pull/18383) | Better SSH error handling + messages |
|
||||
|
||||
Reference in New Issue
Block a user