🐛 Source MySQL: transform binary data base64 format (#8047)
* Source-MySql: transform binary data base64 format, add integration tests * Source-MySql: fix code style * Source-MySql: bump versions * Source-MySql: bump versions in source_specs.yaml * Source-MySql: added test for stream with binary data for DestinationAbstractTest * Source-MySql: added format
This commit is contained in:
@@ -34,6 +34,9 @@ public class MySqlCdcProperties {
|
||||
props.setProperty("snapshot.locking.mode", "none");
|
||||
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-include-schema-changes
|
||||
props.setProperty("include.schema.changes", "false");
|
||||
// This to make sure that binary data represented as a base64-encoded String.
|
||||
// https://debezium.io/documentation/reference/1.4/connectors/mysql.html#mysql-property-binary-handling-mode
|
||||
props.setProperty("binary.handling.mode", "base64");
|
||||
|
||||
return props;
|
||||
}
|
||||
|
||||
@@ -13,12 +13,20 @@ import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTes
|
||||
import io.airbyte.integrations.standardtest.source.TestDataHolder;
|
||||
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
|
||||
import io.airbyte.protocol.models.JsonSchemaPrimitive;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
|
||||
public class CdcMySqlSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(CdcMySqlSourceDatatypeTest.class);
|
||||
|
||||
private MySQLContainer<?> container;
|
||||
private JsonNode config;
|
||||
|
||||
@@ -288,18 +296,18 @@ public class CdcMySqlSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
addDataTypeTestData(
|
||||
TestDataHolder.builder()
|
||||
.sourceType("varbinary")
|
||||
.airbyteType(JsonSchemaPrimitive.STRING)
|
||||
.fullSourceDataType("varbinary(256)")
|
||||
.addInsertValues("null", "'test'", "'тест'")
|
||||
.addExpectedValues(null, "test", "тест")
|
||||
.airbyteType(JsonSchemaPrimitive.STRING_BINARY)
|
||||
.fullSourceDataType("varbinary(20000)") //// size should be enough to save test.png
|
||||
.addInsertValues("null", "'test'", "'тест'", String.format("FROM_BASE64('%s')", getFileDataInBase64()))
|
||||
.addExpectedValues(null, "dGVzdA==", "0YLQtdGB0YI=", getFileDataInBase64())
|
||||
.build());
|
||||
|
||||
addDataTypeTestData(
|
||||
TestDataHolder.builder()
|
||||
.sourceType("blob")
|
||||
.airbyteType(JsonSchemaPrimitive.STRING)
|
||||
.addInsertValues("null", "'test'", "'тест'")
|
||||
.addExpectedValues(null, "test", "тест")
|
||||
.airbyteType(JsonSchemaPrimitive.STRING_BINARY)
|
||||
.addInsertValues("null", "'test'", "'тест'", String.format("FROM_BASE64('%s')", getFileDataInBase64()))
|
||||
.addExpectedValues(null, "dGVzdA==", "0YLQtdGB0YI=", getFileDataInBase64())
|
||||
.build());
|
||||
|
||||
addDataTypeTestData(
|
||||
@@ -370,4 +378,14 @@ public class CdcMySqlSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
|
||||
private String getFileDataInBase64() {
|
||||
File file = new File(getClass().getClassLoader().getResource("test.png").getFile());
|
||||
try {
|
||||
return Base64.encodeBase64String(FileUtils.readFileToByteArray(file));
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(String.format("Fail to read the file: %s. Error: %s", file.getAbsoluteFile(), e.getMessage()));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -14,12 +14,20 @@ import io.airbyte.integrations.standardtest.source.AbstractSourceDatabaseTypeTes
|
||||
import io.airbyte.integrations.standardtest.source.TestDataHolder;
|
||||
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
|
||||
import io.airbyte.protocol.models.JsonSchemaPrimitive;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import org.apache.commons.codec.binary.Base64;
|
||||
import org.apache.commons.io.FileUtils;
|
||||
import org.apache.commons.lang3.StringUtils;
|
||||
import org.jooq.SQLDialect;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
import org.testcontainers.containers.MySQLContainer;
|
||||
|
||||
public class MySqlSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(MySqlSourceDatatypeTest.class);
|
||||
|
||||
private MySQLContainer<?> container;
|
||||
private JsonNode config;
|
||||
|
||||
@@ -267,22 +275,18 @@ public class MySqlSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
addDataTypeTestData(
|
||||
TestDataHolder.builder()
|
||||
.sourceType("varbinary")
|
||||
.airbyteType(JsonSchemaPrimitive.STRING)
|
||||
.fullSourceDataType("varbinary(256)")
|
||||
.addInsertValues("null", "'test'")
|
||||
// @TODO Returns binary value instead of text
|
||||
// #5878 binary value issue
|
||||
// .addExpectedValues(null, "test")
|
||||
.airbyteType(JsonSchemaPrimitive.STRING_BINARY)
|
||||
.fullSourceDataType("varbinary(20000)")// size should be enough to save test.png
|
||||
.addInsertValues("null", "'test'", "'тест'", String.format("FROM_BASE64('%s')", getFileDataInBase64()))
|
||||
.addExpectedValues(null, "dGVzdA==", "0YLQtdGB0YI=", getFileDataInBase64())
|
||||
.build());
|
||||
|
||||
addDataTypeTestData(
|
||||
TestDataHolder.builder()
|
||||
.sourceType("blob")
|
||||
.airbyteType(JsonSchemaPrimitive.STRING)
|
||||
.addInsertValues("null", "'test'")
|
||||
// @TODO Returns binary value instead of text
|
||||
// #5878 binary value issue
|
||||
// .addExpectedValues(null, "test")
|
||||
.airbyteType(JsonSchemaPrimitive.STRING_BINARY)
|
||||
.addInsertValues("null", "'test'", "'тест'", String.format("FROM_BASE64('%s')", getFileDataInBase64()))
|
||||
.addExpectedValues(null, "dGVzdA==", "0YLQtdGB0YI=", getFileDataInBase64())
|
||||
.build());
|
||||
|
||||
addDataTypeTestData(
|
||||
@@ -354,4 +358,14 @@ public class MySqlSourceDatatypeTest extends AbstractSourceDatabaseTypeTest {
|
||||
return stringBuilder.toString();
|
||||
}
|
||||
|
||||
private String getFileDataInBase64() {
|
||||
File file = new File(getClass().getClassLoader().getResource("test.png").getFile());
|
||||
try {
|
||||
return Base64.encodeBase64String(FileUtils.readFileToByteArray(file));
|
||||
} catch (IOException e) {
|
||||
LOGGER.error(String.format("Fail to read the file: %s. Error: %s", file.getAbsoluteFile(), e.getMessage()));
|
||||
}
|
||||
return null;
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
Binary file not shown.
|
After Width: | Height: | Size: 17 KiB |
Reference in New Issue
Block a user