1
0
mirror of synced 2025-12-23 21:03:15 -05:00

Destination redshift dv2: Update type mapping (#33307)

This commit is contained in:
Edward Gao
2023-12-11 11:57:46 -08:00
committed by GitHub
parent f5ae28fc0c
commit 382463764c
4 changed files with 19 additions and 16 deletions

View File

@@ -62,8 +62,10 @@ public abstract class JdbcSqlGenerator implements SqlGenerator<TableDefinition>
protected DataType<?> toDialectType(final AirbyteProtocolType airbyteProtocolType) {
return switch (airbyteProtocolType) {
case STRING -> SQLDataType.VARCHAR;
case NUMBER -> SQLDataType.FLOAT;
// Many destinations default to a very short length (e.g. Redshift defaults to 256)
case STRING -> SQLDataType.VARCHAR(65535);
// We default to precision=38, scale=9 across destinations
case NUMBER -> SQLDataType.DECIMAL(38, 9);
case INTEGER -> SQLDataType.BIGINT;
case BOOLEAN -> SQLDataType.BOOLEAN;
case TIMESTAMP_WITH_TIMEZONE -> SQLDataType.TIMESTAMPWITHTIMEZONE;

View File

@@ -5,7 +5,7 @@ data:
connectorSubtype: database
connectorType: destination
definitionId: f7a7d195-377f-cf5b-70a5-be6b819019dc
dockerImageTag: 0.7.0
dockerImageTag: 0.7.1
dockerRepository: airbyte/destination-redshift
documentationUrl: https://docs.airbyte.com/integrations/destinations/redshift
githubIssueLabel: destination-redshift

View File

@@ -224,7 +224,7 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
* @param arrays
* @return
*/
Field<?> arrayConcatStmt(List<Field<?>> arrays) {
Field<?> arrayConcatStmt(final List<Field<?>> arrays) {
if (arrays.isEmpty()) {
return field("ARRAY()"); // Return an empty array if the list is empty
}
@@ -235,8 +235,8 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
}
// Recursive case: construct ARRAY_CONCAT function call
Field<?> lastValue = arrays.get(arrays.size() - 1);
Field<?> recursiveCall = arrayConcatStmt(arrays.subList(0, arrays.size() - 1));
final Field<?> lastValue = arrays.get(arrays.size() - 1);
final Field<?> recursiveCall = arrayConcatStmt(arrays.subList(0, arrays.size() - 1));
return function("ARRAY_CONCAT", getSuperType(), recursiveCall, lastValue);
}
@@ -268,7 +268,7 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
* @param includeMetaColumn
* @return
*/
LinkedHashMap<String, DataType<?>> getFinalTableMetaColumns(boolean includeMetaColumn) {
LinkedHashMap<String, DataType<?>> getFinalTableMetaColumns(final boolean includeMetaColumn) {
final LinkedHashMap<String, DataType<?>> metaColumns = new LinkedHashMap<>();
metaColumns.put(COLUMN_NAME_AB_RAW_ID, SQLDataType.VARCHAR(36).nullable(false));
metaColumns.put(COLUMN_NAME_AB_EXTRACTED_AT, SQLDataType.TIMESTAMPWITHTIMEZONE.nullable(false));
@@ -279,12 +279,12 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
@Override
public String createTable(final StreamConfig stream, final String suffix, final boolean force) {
DSLContext dsl = getDslContext();
CreateSchemaFinalStep createSchemaSql = createSchemaIfNotExists(quotedName(stream.id().finalNamespace()));
final DSLContext dsl = getDslContext();
final CreateSchemaFinalStep createSchemaSql = createSchemaIfNotExists(quotedName(stream.id().finalNamespace()));
// TODO: Use Naming transformer to sanitize these strings with redshift restrictions.
String finalTableIdentifier = stream.id().finalName() + suffix.toLowerCase();
CreateTableColumnStep createTableSql = dsl
final String finalTableIdentifier = stream.id().finalName() + suffix.toLowerCase();
final CreateTableColumnStep createTableSql = dsl
.createTable(quotedName(stream.id().finalNamespace(), finalTableIdentifier))
.columns(buildFinalTableFields(stream.columns(), getFinalTableMetaColumns(true)));
if (!force) {
@@ -423,7 +423,7 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
* @param cursor
* @return
*/
Field<Integer> getRowNumber(List<ColumnId> primaryKeys, Optional<ColumnId> cursor) {
Field<Integer> getRowNumber(final List<ColumnId> primaryKeys, final Optional<ColumnId> cursor) {
final List<Field<?>> primaryKeyFields =
primaryKeys != null ? primaryKeys.stream().map(columnId -> field(quotedName(columnId.name()))).collect(Collectors.toList())
: new ArrayList<>();
@@ -452,7 +452,7 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
.where(condition);
}
Condition rawTableCondition(DestinationSyncMode syncMode, boolean isCdcDeletedAtPresent, Optional<Instant> minRawTimestamp) {
Condition rawTableCondition(final DestinationSyncMode syncMode, final boolean isCdcDeletedAtPresent, final Optional<Instant> minRawTimestamp) {
Condition condition = field(name(COLUMN_NAME_AB_LOADED_AT)).isNull();
if (syncMode == DestinationSyncMode.APPEND_DEDUP) {
if (isCdcDeletedAtPresent) {
@@ -478,7 +478,7 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
.columns(buildFinalTableFields(columns, metaFields));
}
String deleteFromFinalTable(final String schemaName, final String tableName, List<ColumnId> primaryKeys, Optional<ColumnId> cursor) {
String deleteFromFinalTable(final String schemaName, final String tableName, final List<ColumnId> primaryKeys, final Optional<ColumnId> cursor) {
final DSLContext dsl = getDslContext();
// Unknown type doesn't play well with where .. in (select..)
final Field<Object> airbyteRawId = field(quotedName(COLUMN_NAME_AB_RAW_ID));
@@ -552,7 +552,7 @@ public class RedshiftSqlGenerator extends JdbcSqlGenerator {
}
@Override
public boolean shouldRetry(Exception e) {
public boolean shouldRetry(final Exception e) {
return false;
}

View File

@@ -156,6 +156,7 @@ Each stream will be output into its own raw table in Redshift. Each table will c
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------|
| 0.7.1 | 2023-12-11 | [33307](https://github.com/airbytehq/airbyte/pull/33307) | DV2: improve data type mapping |
| 0.7.0 | 2023-12-05 | [32326](https://github.com/airbytehq/airbyte/pull/32326) | Opt in beta for v2 destination |
| 0.6.11 | 2023-11-29 | [#32888](https://github.com/airbytehq/airbyte/pull/32888) | Use the new async framework. |
| 0.6.10 | 2023-11-06 | [#32193](https://github.com/airbytehq/airbyte/pull/32193) | Adopt java CDK version 0.4.1. |
@@ -219,4 +220,4 @@ Each stream will be output into its own raw table in Redshift. Each table will c
| 0.3.14 | 2021-10-08 | [\#5924](https://github.com/airbytehq/airbyte/pull/5924) | Fixed AWS S3 Staging COPY is writing records from different table in the same raw table |
| 0.3.13 | 2021-09-02 | [\#5745](https://github.com/airbytehq/airbyte/pull/5745) | Disable STATUPDATE flag when using S3 staging to speed up performance |
| 0.3.12 | 2021-07-21 | [\#3555](https://github.com/airbytehq/airbyte/pull/3555) | Enable partial checkpointing for halfway syncs |
| 0.3.11 | 2021-07-20 | [\#4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec |
| 0.3.11 | 2021-07-20 | [\#4874](https://github.com/airbytehq/airbyte/pull/4874) | allow `additionalProperties` in connector spec |