1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🎉Source MySQL - added option to connect using SSL (#6510)

* Source MySQL - added option to connect using SSL
This commit is contained in:
Eugene
2021-09-30 18:23:51 +03:00
committed by GitHub
parent 37542c0bae
commit 7698a50c74
10 changed files with 181 additions and 14 deletions

View File

@@ -48,6 +48,10 @@ public class MySqlSource extends AbstractJdbcSource implements Source {
public static final String MYSQL_DB_HISTORY = "mysql_db_history";
public static final String CDC_LOG_FILE = "_ab_cdc_log_file";
public static final String CDC_LOG_POS = "_ab_cdc_log_pos";
public static final List<String> SSL_PARAMETERS = List.of(
"useSSL=true",
"requireSSL=true",
"verifyServerCertificate=false");
public MySqlSource() {
super(DRIVER_CLASS, new MySqlJdbcStreamingQueryConfiguration());
@@ -163,18 +167,24 @@ public class MySqlSource extends AbstractJdbcSource implements Source {
@Override
public JsonNode toDatabaseConfig(JsonNode config) {
final StringBuilder jdbc_url = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
final StringBuilder jdbcUrl = new StringBuilder(String.format("jdbc:mysql://%s:%s/%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText()));
// see MySqlJdbcStreamingQueryConfiguration for more context on why useCursorFetch=true is needed.
jdbc_url.append("?useCursorFetch=true");
jdbcUrl.append("?useCursorFetch=true");
if (config.get("jdbc_url_params") != null && !config.get("jdbc_url_params").asText().isEmpty()) {
jdbc_url.append("&").append(config.get("jdbc_url_params").asText());
jdbcUrl.append("&").append(config.get("jdbc_url_params").asText());
}
if (config.has("ssl") && config.get("ssl").asBoolean()) {
jdbcUrl.append("&").append(String.join("&", SSL_PARAMETERS));
}
ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
.put("username", config.get("username").asText())
.put("jdbc_url", jdbc_url.toString());
.put("jdbc_url", jdbcUrl.toString());
if (config.has("password")) {
configBuilder.put("password", config.get("password").asText());

View File

@@ -49,6 +49,13 @@
"order": 6,
"default": "STANDARD",
"enum": ["STANDARD", "CDC"]
},
"ssl": {
"title": "SSL Connection",
"description": "Encrypt data using SSL.",
"type": "boolean",
"default": true,
"order": 7
}
}
}

View File

@@ -33,8 +33,8 @@ public class MySqlSourceAcceptanceTest extends SourceAcceptanceTest {
private static final String STREAM_NAME = "id_and_name";
private static final String STREAM_NAME2 = "public.starships";
private MySQLContainer<?> container;
private JsonNode config;
protected MySQLContainer<?> container;
protected JsonNode config;
@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {

View File

@@ -0,0 +1,79 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.source.mysql.MySqlSource.ReplicationMethod;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import org.jooq.SQLDialect;
import org.testcontainers.containers.MySQLContainer;
public class MySqlSslSourceAcceptanceTest extends MySqlSourceAcceptanceTest {
@Override
protected void setupEnvironment(TestDestinationEnv environment) throws Exception {
container = new MySQLContainer<>("mysql:8.0");
container.start();
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", container.getDatabaseName())
.put("username", container.getUsername())
.put("password", container.getPassword())
.put("ssl", true)
.put("replication_method", ReplicationMethod.STANDARD)
.build());
final Database database = Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:mysql://%s:%s/%s?%s",
config.get("host").asText(),
config.get("port").asText(),
config.get("database").asText(),
String.join("&", SSL_PARAMETERS)),
"com.mysql.cj.jdbc.Driver",
SQLDialect.MYSQL);
database.query(ctx -> {
ctx.fetch("CREATE TABLE id_and_name(id INTEGER, name VARCHAR(200));");
ctx.fetch(
"INSERT INTO id_and_name (id, name) VALUES (1,'picard'), (2, 'crusher'), (3, 'vash');");
ctx.fetch("CREATE TABLE starships(id INTEGER, name VARCHAR(200));");
ctx.fetch(
"INSERT INTO starships (id, name) VALUES (1,'enterprise-d'), (2, 'defiant'), (3, 'yamato');");
return null;
});
database.close();
}
}

View File

@@ -29,12 +29,12 @@ import org.testcontainers.containers.MySQLContainer;
class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
private static final String TEST_USER = "test";
private static final String TEST_PASSWORD = "test";
private static MySQLContainer<?> container;
protected static final String TEST_USER = "test";
protected static final String TEST_PASSWORD = "test";
protected static MySQLContainer<?> container;
private JsonNode config;
private Database database;
protected JsonNode config;
protected Database database;
@BeforeAll
static void init() throws SQLException {

View File

@@ -0,0 +1,70 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.source.mysql;
import static io.airbyte.integrations.source.mysql.MySqlSource.SSL_PARAMETERS;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.BeforeEach;
class MySqlSslJdbcSourceAcceptanceTest extends MySqlJdbcSourceAcceptanceTest {
@BeforeEach
public void setup() throws Exception {
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", Strings.addRandomSuffix("db", "_", 10))
.put("username", TEST_USER)
.put("password", TEST_PASSWORD)
.put("ssl", true)
.build());
database = Databases.createDatabase(
config.get("username").asText(),
config.get("password").asText(),
String.format("jdbc:mysql://%s:%s?%s",
config.get("host").asText(),
config.get("port").asText(),
String.join("&", SSL_PARAMETERS)),
MySqlSource.DRIVER_CLASS,
SQLDialect.MYSQL);
database.query(ctx -> {
ctx.fetch("CREATE DATABASE " + config.get("database").asText());
ctx.fetch("SHOW STATUS LIKE 'Ssl_cipher'");
return null;
});
database.close();
super.setup();
}
}