1
0
mirror of synced 2025-12-25 02:09:19 -05:00

🐛 Source Dynamodb: Fix reserved words in expression (#20172)

* fix reserved words in expression

* chore: bump version

* fix state message

* auto-bump connector version

* fix reserved words in projection expression

* bump dockerfile version

* auto-bump connector version

---------

Co-authored-by: Vincent Koc <vincentkoc@ieee.org>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Sajarin <sajarindider@gmail.com>
Co-authored-by: Sunny <6833405+sh4sh@users.noreply.github.com>
This commit is contained in:
Ivica Taseski
2023-02-27 22:13:03 +01:00
committed by GitHub
parent 32ae1b0c94
commit b9b8cb0638
10 changed files with 73 additions and 23 deletions

View File

@@ -17,5 +17,5 @@ ENV APPLICATION source-dynamodb
COPY --from=build /airbyte /airbyte
# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/source-dynamodb

View File

@@ -6,6 +6,8 @@ package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.databind.JsonNode;
import java.net.URI;
import java.util.Arrays;
import java.util.List;
import software.amazon.awssdk.regions.Region;
public record DynamodbConfig(
@@ -16,18 +18,22 @@ public record DynamodbConfig(
String accessKey,
String secretKey
String secretKey,
List<String> reservedAttributeNames
) {
public static DynamodbConfig createDynamodbConfig(JsonNode jsonNode) {
JsonNode endpoint = jsonNode.get("endpoint");
JsonNode region = jsonNode.get("region");
JsonNode attributeNames = jsonNode.get("reserved_attribute_names");
return new DynamodbConfig(
endpoint != null && !endpoint.asText().isBlank() ? URI.create(endpoint.asText()) : null,
region != null && !region.asText().isBlank() ? Region.of(region.asText()) : null,
jsonNode.get("access_key_id").asText(),
jsonNode.get("secret_access_key").asText());
jsonNode.get("secret_access_key").asText(),
attributeNames != null ? Arrays.asList(attributeNames.asText().split("\\s*,\\s*")) : List.of());
}
}

View File

@@ -13,9 +13,11 @@ import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
@@ -30,7 +32,10 @@ public class DynamodbOperations extends AbstractDatabase implements Closeable {
private ObjectMapper schemaObjectMapper;
private DynamodbConfig dynamodbConfig;
public DynamodbOperations(DynamodbConfig dynamodbConfig) {
this.dynamodbConfig = dynamodbConfig;
this.dynamoDbClient = DynamodbUtils.createDynamoDbClient(dynamodbConfig);
initMappers();
}
@@ -105,12 +110,31 @@ public class DynamodbOperations extends AbstractDatabase implements Closeable {
public List<JsonNode> scanTable(String tableName, Set<String> attributes, FilterAttribute filterAttribute) {
List<JsonNode> items = new ArrayList<>();
var projectionAttributes = String.join(", ", attributes);
String prefix = "dyndb";
// remove and replace reserved attribute names
Set<String> copyAttributes = new HashSet<>(attributes);
dynamodbConfig.reservedAttributeNames().forEach(copyAttributes::remove);
dynamodbConfig.reservedAttributeNames().stream()
.filter(attributes::contains)
.map(str -> str.replaceAll("[-.]", ""))
.forEach(attr -> copyAttributes.add("#" + prefix + "_" + attr));
Map<String, String> mappingAttributes = dynamodbConfig.reservedAttributeNames().stream()
.filter(attributes::contains)
.collect(Collectors.toUnmodifiableMap(k -> "#" + prefix + "_" + k.replaceAll("[-.]", ""), k -> k));
var projectionAttributes = String.join(", ", copyAttributes);
ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
.tableName(tableName)
.projectionExpression(projectionAttributes);
if (!mappingAttributes.isEmpty()) {
scanRequestBuilder
.expressionAttributeNames(mappingAttributes);
}
if (filterAttribute != null && filterAttribute.name() != null &&
filterAttribute.value() != null && filterAttribute.type() != null) {
@@ -134,8 +158,10 @@ public class DynamodbOperations extends AbstractDatabase implements Closeable {
comparator = ">";
}
String filterPlaceholder = dynamodbConfig.reservedAttributeNames().contains(filterName) ?
"#" + prefix + "_" + filterName.replaceAll("[-.]", "") : filterName;
scanRequestBuilder
.filterExpression(filterName + " " + comparator + " :timestamp")
.filterExpression(filterPlaceholder + " " + comparator + " :timestamp")
.expressionAttributeValues(Map.of(":timestamp", attributeValue));
}

View File

@@ -86,9 +86,9 @@ public class DynamodbUtils {
record StreamState(
AirbyteStateMessage.AirbyteStateType airbyteStateType,
AirbyteStateMessage.AirbyteStateType airbyteStateType,
List<AirbyteStateMessage> airbyteStateMessages) {
List<AirbyteStateMessage> airbyteStateMessages) {
}

View File

@@ -61,6 +61,13 @@
"description": "The corresponding secret to the access key id.",
"airbyte_secret": true,
"examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"]
},
"reserved_attribute_names": {
"title": "Reserved attribute names",
"type": "string",
"description": "Comma separated reserved attribute names present in your tables",
"airbyte_secret": true,
"examples": ["name, field_name, field-name"]
}
}
}

View File

@@ -80,6 +80,7 @@ public class DynamodbDataFactory {
.put("region", dynamodbContainer.getRegion())
.put("access_key_id", dynamodbContainer.getAccessKey())
.put("secret_access_key", dynamodbContainer.getSecretKey())
.put("reserved_attribute_names", "name, field.name, field-name")
.build());
}

View File

@@ -153,7 +153,7 @@ public class DynamodbOperationsTest {
PutItemRequest putItemRequest1 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_4").build(),
"attr_2", AttributeValue.builder().s("str_5").build(),
"attr_3", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
"name", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
"attr_4", AttributeValue.builder().ns("12.5", "74.5").build()));
dynamoDbClient.putItem(putItemRequest1);
@@ -161,13 +161,13 @@ public class DynamodbOperationsTest {
PutItemRequest putItemRequest2 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_6").build(),
"attr_2", AttributeValue.builder().s("str_7").build(),
"attr_3", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(),
"name", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(),
"attr_6", AttributeValue.builder().ss("str_1", "str_2").build()));
dynamoDbClient.putItem(putItemRequest2);
var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "attr_3"),
new DynamodbOperations.FilterAttribute("attr_3", "2018-12-21T17:42:34Z",
var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "name"),
new DynamodbOperations.FilterAttribute("name", "2018-12-21T17:42:34Z",
DynamodbOperations.FilterAttribute.FilterType.S));
assertThat(response)
@@ -175,7 +175,7 @@ public class DynamodbOperationsTest {
JSONAssert.assertEquals(objectMapper.writeValueAsString(response.get(0)), """
{
"attr_3": "2019-12-21T17:42:34Z",
"name": "2019-12-21T17:42:34Z",
"attr_2": "str_7",
"attr_1": "str_6"
}