1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Migrate Postgres and MySql to use new JdbcSource (#1307)

This commit is contained in:
Charles
2021-01-08 14:15:34 -08:00
committed by GitHub
parent d89673be0f
commit 102b432a5b
18 changed files with 592 additions and 328 deletions

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.1.5
LABEL io.airbyte.version=0.1.6
LABEL io.airbyte.name=airbyte/source-mysql

View File

@@ -2,7 +2,6 @@ plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
// todo: needs standard source test
}
application {
@@ -16,13 +15,15 @@ dependencies {
implementation project(':airbyte-integrations:connectors:source-jdbc')
implementation 'mysql:mysql-connector-java:8.0.22'
testImplementation 'org.testcontainers:mysql:1.15.1'
implementation 'org.apache.commons:commons-lang3:3.11'
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
testImplementation testFixtures(project(':airbyte-integrations:connectors:source-jdbc'))
testImplementation 'org.apache.commons:commons-lang3:3.11'
testImplementation 'org.testcontainers:mysql:1.15.1'
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}

View File

@@ -0,0 +1,53 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import io.airbyte.db.jdbc.JdbcStreamingQueryConfiguration;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
/**
 * Streaming configuration applied to MySql connections and statements before a query is executed,
 * so that results are fetched in batches rather than loaded all at once.
 *
 * <p>
 * The fetch size set here is only respected when {@code useCursorFetch=true} is part of the
 * connection URL (MySqlSource adds it to the JDBC url), e.g.
 * {@code DriverManager.getConnection("jdbc:mysql://localhost/?useCursorFetch=true", "user", "s3cr3t")}.
 * See the "ResultSet" section of the MySql Connector/J implementation notes:
 * https://dev.mysql.com/doc/connector-j/8.0/en/connector-j-reference-implementation-notes.html.
 * With this approach MySql creates a temporary table, which may have some effect on db
 * performance.
 *
 * <p>
 * If {@code useCursorFetch} cannot be set on the connection, the fallback is to keep auto-commit
 * disabled and call {@code preparedStatement.setFetchSize(Integer.MIN_VALUE)} instead. That fetches
 * records one at a time, which is inefficient but at least does not risk OOM.
 */
public class MySqlJdbcStreamingQueryConfiguration implements JdbcStreamingQueryConfiguration {

  // Fetch size handed to the prepared statement.
  private static final int FETCH_SIZE = 1000;

  /**
   * Disables auto-commit on the connection and then sets the fetch size on the statement.
   *
   * @param connection connection the statement belongs to
   * @param preparedStatement statement about to be executed
   * @throws SQLException if either JDBC call fails
   */
  @Override
  public void accept(Connection connection, PreparedStatement preparedStatement) throws SQLException {
    connection.setAutoCommit(false);
    preparedStatement.setFetchSize(FETCH_SIZE);
  }

}

View File

@@ -29,25 +29,27 @@ import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.AbstractJooqSource;
import java.util.List;
import org.jooq.SQLDialect;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class MySqlSource extends AbstractJooqSource implements Source {
public class MySqlSource extends AbstractJdbcSource implements Source {
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSource.class);
public static final String DRIVER_CLASS = "com.mysql.cj.jdbc.Driver";
public MySqlSource() {
super("com.mysql.cj.jdbc.Driver", SQLDialect.MYSQL);
super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration());
}
@Override
public JsonNode toJdbcConfig(JsonNode config) {
ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", String.format("jdbc:mysql://%s:%s/%s",
// see MySqlJdbcStreamingQueryConfiguration for more context on why useCursorFetch=true is needed.
.put("jdbc_url", String.format("jdbc:mysql://%s:%s/%s?useCursorFetch=true",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));
@@ -60,8 +62,8 @@ public class MySqlSource extends AbstractJooqSource implements Source {
}
@Override
protected List<String> getExcludedInternalSchemas() {
return List.of(
public Set<String> getExcludedInternalSchemas() {
return Set.of(
"information_schema",
"mysql",
"performance_schema",

View File

@@ -45,7 +45,7 @@ import java.util.List;
import org.jooq.SQLDialect;
import org.testcontainers.containers.MySQLContainer;
public class MySqlIntegrationTest extends StandardSourceTest {
public class MySqlSourceStandardTest extends StandardSourceTest {
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";

View File

@@ -1,207 +0,0 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import static org.mockito.Mockito.doCallRealMethod;
import static org.mockito.Mockito.spy;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.ConnectorSpecification;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.Field.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;
/**
 * Tests for {@link MySqlSource} that run against a MySql test container. Every test gets its own
 * freshly created database holding a single three-row {@code id_and_name} table.
 */
class MySqlSourceTest {

  private static final String TEST_USER = "test";
  private static final String TEST_PASSWORD = "test";
  private static final String STREAM_NAME = "id_and_name";

  private static MySQLContainer<?> container;

  private JsonNode config;

  /**
   * Writes the init script that creates the test user and starts the shared container.
   */
  @BeforeAll
  static void init() {
    // test containers withInitScript only accepts scripts that are mounted as resources.
    final String createUserSql = "CREATE USER '" + TEST_USER + "'@'%' IDENTIFIED BY '" + TEST_PASSWORD + "';\n";
    final String grantSql = "GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n";
    MoreResources.writeResource("init.sql", createUserSql + grantSql);

    container = new MySQLContainer<>("mysql:8.0")
        .withInitScript("init.sql")
        .withUsername("root")
        .withPassword("");
    container.start();
  }

  /**
   * Builds a config that points at a randomly named database, then creates that database and seeds
   * the table the tests read from.
   */
  @BeforeEach
  void setup() throws Exception {
    config = Jsons.jsonNode(ImmutableMap.builder()
        .put("host", container.getHost())
        .put("port", container.getFirstMappedPort())
        .put("database", "db_" + RandomStringUtils.randomAlphabetic(10))
        .put("username", TEST_USER)
        .put("password", TEST_PASSWORD)
        .build());

    final String databaseName = config.get("database").asText();
    // The url deliberately carries no database name: the database does not exist yet.
    final String serverUrl = String.format("jdbc:mysql://%s:%s",
        config.get("host").asText(),
        config.get("port").asText());

    final Database database = Databases.createDatabase(
        config.get("username").asText(),
        config.get("password").asText(),
        serverUrl,
        "com.mysql.cj.jdbc.Driver",
        SQLDialect.MYSQL);

    database.query(ctx -> {
      ctx.fetch("CREATE DATABASE " + databaseName);
      ctx.fetch("USE " + databaseName);
      ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
      ctx.fetch("INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
      return null;
    });
    database.close();
  }

  @AfterAll
  static void cleanUp() {
    container.close();
  }

  @Test
  void testSpec() throws Exception {
    final ConnectorSpecification expected =
        Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
    final ConnectorSpecification actual = new MySqlSource().spec();

    assertEquals(expected, actual);
  }

  @Test
  void testCheckSuccess() {
    final AirbyteConnectionStatus expected = new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    final AirbyteConnectionStatus actual = new MySqlSource().check(config);

    assertEquals(expected, actual);
  }

  @Test
  void testCheckFailure() {
    // Break the credentials so the connection attempt is rejected.
    ((ObjectNode) config).put("password", "fake");

    final AirbyteConnectionStatus expected = new AirbyteConnectionStatus()
        .withStatus(Status.FAILED)
        .withMessage("Can't connect with provided configuration.");
    final AirbyteConnectionStatus actual = new MySqlSource().check(config);

    assertEquals(expected, actual);
  }

  @Test
  void testDiscover() throws Exception {
    final AirbyteCatalog discovered = new MySqlSource().discover(config);

    // Filter out streams not related to this test case (from other tests running in parallel)
    final AirbyteCatalog actual = new AirbyteCatalog().withStreams(
        discovered.getStreams()
            .stream()
            .filter(stream -> stream.getName().equals(getStreamName()))
            .collect(Collectors.toList()));

    assertEquals(generateExpectedCatalog(), actual);
  }

  @Test
  void testReadSuccess() throws Exception {
    final Set<AirbyteMessage> actualMessages = new MySqlSource()
        .read(config, generateConfiguredCatalog(), null)
        .collect(Collectors.toSet());

    // emittedAt differs on every run, so blank it out before comparing.
    for (final AirbyteMessage message : actualMessages) {
      if (message.getRecord() != null) {
        message.getRecord().setEmittedAt(null);
      }
    }

    assertEquals(generateExpectedMessages(), actualMessages);
  }

  @SuppressWarnings("ResultOfMethodCallIgnored")
  @Test
  void testReadFailure() throws Exception {
    final ConfiguredAirbyteStream spiedAbStream = spy(generateConfiguredCatalog().getStreams().get(0));
    final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(spiedAbStream));
    // The first two getStream() calls behave normally; the third one throws.
    doCallRealMethod().doCallRealMethod().doThrow(new RuntimeException()).when(spiedAbStream).getStream();

    final MySqlSource source = new MySqlSource();

    assertThrows(RuntimeException.class, () -> source.read(config, catalog, null));
  }

  // Catalog containing only the stream backed by the table created in setup().
  private AirbyteCatalog generateExpectedCatalog() {
    return new AirbyteCatalog().withStreams(Lists.newArrayList(
        CatalogHelpers.createAirbyteStream(
            getStreamName(),
            Field.of("id", JsonSchemaPrimitive.NUMBER),
            Field.of("name", JsonSchemaPrimitive.STRING))
            .withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL))));
  }

  private ConfiguredAirbyteCatalog generateConfiguredCatalog() {
    return CatalogHelpers.toDefaultConfiguredCatalog(generateExpectedCatalog());
  }

  // One RECORD message per row inserted in setup().
  private java.util.HashSet<AirbyteMessage> generateExpectedMessages() {
    final String streamName = getStreamName();
    return Sets.newHashSet(
        expectedRecord(streamName, 1, "picard"),
        expectedRecord(streamName, 2, "crusher"),
        expectedRecord(streamName, 3, "vash"));
  }

  // Builds the RECORD message expected for a single id_and_name row.
  private static AirbyteMessage expectedRecord(final String streamName, final int id, final String name) {
    return new AirbyteMessage()
        .withType(Type.RECORD)
        .withRecord(new AirbyteRecordMessage()
            .withStream(streamName)
            .withData(Jsons.jsonNode(ImmutableMap.of("id", id, "name", name))));
  }

  // Stream names are qualified with the database name.
  private String getStreamName() {
    return String.format("%s.%s", config.get("database").asText(), STREAM_NAME);
  }

}

View File

@@ -0,0 +1,122 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceStandardTest;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.testcontainers.containers.MySQLContainer;
/**
 * Runs the shared {@link JdbcSourceStandardTest} suite against {@link MySqlSource}, backed by a
 * MySql test container. A new, randomly named database is created before every test.
 */
class MySqlStandardSourceTest extends JdbcSourceStandardTest {

  private static final String TEST_USER = "test";
  private static final String TEST_PASSWORD = "test";

  private static MySQLContainer<?> container;

  private JsonNode config;
  private Database database;

  /**
   * Writes the init script that creates the test user and starts the shared container.
   */
  @BeforeAll
  static void init() {
    // test containers withInitScript only accepts scripts that are mounted as resources.
    final String createUserSql = "CREATE USER '" + TEST_USER + "'@'%' IDENTIFIED BY '" + TEST_PASSWORD + "';\n";
    final String grantSql = "GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n";
    MoreResources.writeResource("init.sql", createUserSql + grantSql);

    container = new MySQLContainer<>("mysql:8.0")
        .withInitScript("init.sql")
        .withUsername("root")
        .withPassword("");
    container.start();
  }

  /**
   * Builds the per-test config, creates the database it names, and then hands over to the parent
   * class' setup.
   */
  @BeforeEach
  public void setup() throws Exception {
    config = Jsons.jsonNode(ImmutableMap.builder()
        .put("host", container.getHost())
        .put("port", container.getFirstMappedPort())
        .put("database", "db_" + RandomStringUtils.randomAlphabetic(10))
        .put("username", TEST_USER)
        .put("password", TEST_PASSWORD)
        .build());

    // The url deliberately carries no database name: the database does not exist yet.
    final String serverUrl = String.format("jdbc:mysql://%s:%s",
        config.get("host").asText(),
        config.get("port").asText());

    database = Databases.createDatabase(
        config.get("username").asText(),
        config.get("password").asText(),
        serverUrl,
        MySqlSource.DRIVER_CLASS,
        SQLDialect.MYSQL);

    final String createDatabaseSql = "CREATE DATABASE " + config.get("database").asText();
    database.query(ctx -> {
      ctx.fetch(createDatabaseSql);
      return null;
    });
    // NOTE(review): this same Database instance is closed again in tearDown() -- confirm that a
    // second close() is harmless.
    database.close();

    super.setup();
  }

  @AfterEach
  void tearDown() throws Exception {
    database.close();
  }

  @AfterAll
  static void cleanUp() {
    container.close();
  }

  // MySql does not support schemas in the way most dbs do. Instead we namespace by db name.
  @Override
  public Optional<String> getDefaultSchemaName() {
    final String databaseName = config.get("database").asText();
    return Optional.of(databaseName);
  }

  @Override
  public AbstractJdbcSource getSource() {
    return new MySqlSource();
  }

  @Override
  public String getDriverClass() {
    return MySqlSource.DRIVER_CLASS;
  }

  // Returns a clone of the per-test config rather than the field itself.
  @Override
  public JsonNode getConfig() {
    return Jsons.clone(config);
  }

}

View File

@@ -0,0 +1,124 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcStressTest;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Disabled;
import org.testcontainers.containers.MySQLContainer;
/**
 * Runs the shared {@link JdbcStressTest} suite against {@link MySqlSource}, backed by a MySql test
 * container. The class is annotated {@code @Disabled}, so it does not run as part of the regular
 * test suite.
 */
@Disabled
class MySqlStressTest extends JdbcStressTest {

  private static final String TEST_USER = "test";
  private static final String TEST_PASSWORD = "test";

  private static MySQLContainer<?> container;

  private JsonNode config;
  private Database database;

  /**
   * Writes the init script that creates the test user and starts the shared container.
   */
  @BeforeAll
  static void init() {
    // test containers withInitScript only accepts scripts that are mounted as resources.
    final String createUserSql = "CREATE USER '" + TEST_USER + "'@'%' IDENTIFIED BY '" + TEST_PASSWORD + "';\n";
    final String grantSql = "GRANT ALL PRIVILEGES ON *.* TO '" + TEST_USER + "'@'%';\n";
    MoreResources.writeResource("init.sql", createUserSql + grantSql);

    container = new MySQLContainer<>("mysql:8.0")
        .withInitScript("init.sql")
        .withUsername("root")
        .withPassword("");
    container.start();
  }

  /**
   * Builds the per-test config, creates the database it names, and then hands over to the parent
   * class' setup.
   */
  @BeforeEach
  public void setup() throws Exception {
    config = Jsons.jsonNode(ImmutableMap.builder()
        .put("host", container.getHost())
        .put("port", container.getFirstMappedPort())
        .put("database", "db_" + RandomStringUtils.randomAlphabetic(10))
        .put("username", TEST_USER)
        .put("password", TEST_PASSWORD)
        .build());

    // The url deliberately carries no database name: the database does not exist yet.
    final String serverUrl = String.format("jdbc:mysql://%s:%s",
        config.get("host").asText(),
        config.get("port").asText());

    database = Databases.createDatabase(
        config.get("username").asText(),
        config.get("password").asText(),
        serverUrl,
        MySqlSource.DRIVER_CLASS,
        SQLDialect.MYSQL);

    final String createDatabaseSql = "CREATE DATABASE " + config.get("database").asText();
    database.query(ctx -> {
      ctx.fetch(createDatabaseSql);
      return null;
    });
    // NOTE(review): this same Database instance is closed again in tearDown() -- confirm that a
    // second close() is harmless.
    database.close();

    super.setup();
  }

  @AfterEach
  void tearDown() throws Exception {
    database.close();
  }

  @AfterAll
  static void cleanUp() {
    container.close();
  }

  // MySql does not support schemas in the way most dbs do. Instead we namespace by db name.
  @Override
  public Optional<String> getDefaultSchemaName() {
    final String databaseName = config.get("database").asText();
    return Optional.of(databaseName);
  }

  @Override
  public AbstractJdbcSource getSource() {
    return new MySqlSource();
  }

  @Override
  public String getDriverClass() {
    return MySqlSource.DRIVER_CLASS;
  }

  // Returns a clone of the per-test config rather than the field itself.
  @Override
  public JsonNode getConfig() {
    return Jsons.clone(config);
  }

}