1
0
mirror of synced 2025-12-23 21:03:15 -05:00

remove naming transformer from destination interface (#1953)

This commit is contained in:
Charles
2021-02-04 15:46:53 -08:00
committed by GitHub
parent aadfae24bd
commit 285c176a93
8 changed files with 40 additions and 60 deletions

View File

@@ -41,6 +41,7 @@ import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.NamingConventionTransformer;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -101,6 +102,8 @@ class PostgresDestinationTest {
Field.of("id", JsonSchemaPrimitive.STRING)),
CatalogHelpers.createConfiguredAirbyteStream(TASKS_STREAM_NAME, Field.of("goal", JsonSchemaPrimitive.STRING))));
private static final NamingConventionTransformer NAMING_TRANSFORMER = new PostgresSQLNameTransformer();
private PostgreSQLContainer<?> container;
private JsonNode config;
private Database database;
@@ -165,11 +168,11 @@ class PostgresDestinationTest {
consumer.accept(MESSAGE_STATE);
consumer.close();
Set<JsonNode> usersActual = recordRetriever(destination.getNamingTransformer().getRawTableName(USERS_STREAM_NAME));
Set<JsonNode> usersActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME));
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
Set<JsonNode> tasksActual = recordRetriever(destination.getNamingTransformer().getRawTableName(TASKS_STREAM_NAME));
Set<JsonNode> tasksActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME));
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
@@ -206,14 +209,14 @@ class PostgresDestinationTest {
consumer2.accept(messageUser3);
consumer2.close();
Set<JsonNode> usersActual = recordRetriever(destination.getNamingTransformer().getRawTableName(USERS_STREAM_NAME));
Set<JsonNode> usersActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME));
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(
MESSAGE_USERS1.getRecord().getData(),
MESSAGE_USERS2.getRecord().getData(),
messageUser3.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
Set<JsonNode> tasksActual = recordRetriever(destination.getNamingTransformer().getRawTableName(TASKS_STREAM_NAME));
Set<JsonNode> tasksActual = recordRetriever(NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME));
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
@@ -245,13 +248,13 @@ class PostgresDestinationTest {
consumer.accept(MESSAGE_STATE);
consumer.close();
final String schemaName = destination.getNamingTransformer().getIdentifier("new_schema");
String streamName = schemaName + "." + destination.getNamingTransformer().getRawTableName(USERS_STREAM_NAME);
final String schemaName = NAMING_TRANSFORMER.getIdentifier("new_schema");
String streamName = schemaName + "." + NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME);
Set<JsonNode> usersActual = recordRetriever(streamName);
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
streamName = schemaName + "." + destination.getNamingTransformer().getRawTableName(TASKS_STREAM_NAME);
streamName = schemaName + "." + NAMING_TRANSFORMER.getRawTableName(TASKS_STREAM_NAME);
Set<JsonNode> tasksActual = recordRetriever(streamName);
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
@@ -262,7 +265,7 @@ class PostgresDestinationTest {
.map(AirbyteStream::getName)
.collect(Collectors.toList()));
assertThrows(RuntimeException.class, () -> recordRetriever(destination.getNamingTransformer().getRawTableName(USERS_STREAM_NAME)));
assertThrows(RuntimeException.class, () -> recordRetriever(NAMING_TRANSFORMER.getRawTableName(USERS_STREAM_NAME)));
}
@SuppressWarnings("ResultOfMethodCallIgnored")
@@ -283,7 +286,7 @@ class PostgresDestinationTest {
final List<String> tableNames = CATALOG.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(s -> destination.getNamingTransformer().getRawTableName(s.getName()))
.map(s -> NAMING_TRANSFORMER.getRawTableName(s.getName()))
.collect(Collectors.toList());
assertTmpTablesNotPresent(CATALOG.getStreams()
.stream()