This reverts commit a5cd384a40.
This commit is contained in:
@@ -17,5 +17,5 @@ ENV APPLICATION source-dynamodb
|
||||
COPY --from=build /airbyte /airbyte
|
||||
|
||||
# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
|
||||
LABEL io.airbyte.version=0.1.1
|
||||
LABEL io.airbyte.version=0.1.2
|
||||
LABEL io.airbyte.name=airbyte/source-dynamodb
|
||||
|
||||
@@ -6,6 +6,8 @@ package io.airbyte.integrations.source.dynamodb;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import java.net.URI;
|
||||
import java.util.Arrays;
|
||||
import java.util.List;
|
||||
import software.amazon.awssdk.regions.Region;
|
||||
|
||||
public record DynamodbConfig(
|
||||
@@ -16,18 +18,22 @@ public record DynamodbConfig(
|
||||
|
||||
String accessKey,
|
||||
|
||||
String secretKey
|
||||
String secretKey,
|
||||
|
||||
List<String> reservedAttributeNames
|
||||
|
||||
) {
|
||||
|
||||
public static DynamodbConfig createDynamodbConfig(JsonNode jsonNode) {
|
||||
JsonNode endpoint = jsonNode.get("endpoint");
|
||||
JsonNode region = jsonNode.get("region");
|
||||
JsonNode attributeNames = jsonNode.get("reserved_attribute_names");
|
||||
return new DynamodbConfig(
|
||||
endpoint != null && !endpoint.asText().isBlank() ? URI.create(endpoint.asText()) : null,
|
||||
region != null && !region.asText().isBlank() ? Region.of(region.asText()) : null,
|
||||
jsonNode.get("access_key_id").asText(),
|
||||
jsonNode.get("secret_access_key").asText());
|
||||
jsonNode.get("secret_access_key").asText(),
|
||||
attributeNames != null ? Arrays.asList(attributeNames.asText().split("\\s*,\\s*")) : List.of());
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -13,9 +13,11 @@ import java.time.LocalDate;
|
||||
import java.time.format.DateTimeParseException;
|
||||
import java.util.ArrayList;
|
||||
import java.util.HashMap;
|
||||
import java.util.HashSet;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.stream.Collectors;
|
||||
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
|
||||
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
|
||||
@@ -30,7 +32,10 @@ public class DynamodbOperations extends AbstractDatabase implements Closeable {
|
||||
|
||||
private ObjectMapper schemaObjectMapper;
|
||||
|
||||
private DynamodbConfig dynamodbConfig;
|
||||
|
||||
public DynamodbOperations(DynamodbConfig dynamodbConfig) {
|
||||
this.dynamodbConfig = dynamodbConfig;
|
||||
this.dynamoDbClient = DynamodbUtils.createDynamoDbClient(dynamodbConfig);
|
||||
initMappers();
|
||||
}
|
||||
@@ -105,12 +110,31 @@ public class DynamodbOperations extends AbstractDatabase implements Closeable {
|
||||
public List<JsonNode> scanTable(String tableName, Set<String> attributes, FilterAttribute filterAttribute) {
|
||||
List<JsonNode> items = new ArrayList<>();
|
||||
|
||||
var projectionAttributes = String.join(", ", attributes);
|
||||
String prefix = "dyndb";
|
||||
// remove and replace reserved attribute names
|
||||
Set<String> copyAttributes = new HashSet<>(attributes);
|
||||
dynamodbConfig.reservedAttributeNames().forEach(copyAttributes::remove);
|
||||
dynamodbConfig.reservedAttributeNames().stream()
|
||||
.filter(attributes::contains)
|
||||
.map(str -> str.replaceAll("[-.]", ""))
|
||||
.forEach(attr -> copyAttributes.add("#" + prefix + "_" + attr));
|
||||
|
||||
Map<String, String> mappingAttributes = dynamodbConfig.reservedAttributeNames().stream()
|
||||
.filter(attributes::contains)
|
||||
.collect(Collectors.toUnmodifiableMap(k -> "#" + prefix + "_" + k.replaceAll("[-.]", ""), k -> k));
|
||||
|
||||
var projectionAttributes = String.join(", ", copyAttributes);
|
||||
|
||||
|
||||
ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
|
||||
.tableName(tableName)
|
||||
.projectionExpression(projectionAttributes);
|
||||
|
||||
if (!mappingAttributes.isEmpty()) {
|
||||
scanRequestBuilder
|
||||
.expressionAttributeNames(mappingAttributes);
|
||||
}
|
||||
|
||||
if (filterAttribute != null && filterAttribute.name() != null &&
|
||||
filterAttribute.value() != null && filterAttribute.type() != null) {
|
||||
|
||||
@@ -134,8 +158,10 @@ public class DynamodbOperations extends AbstractDatabase implements Closeable {
|
||||
comparator = ">";
|
||||
}
|
||||
|
||||
String filterPlaceholder = dynamodbConfig.reservedAttributeNames().contains(filterName) ?
|
||||
"#" + prefix + "_" + filterName.replaceAll("[-.]", "") : filterName;
|
||||
scanRequestBuilder
|
||||
.filterExpression(filterName + " " + comparator + " :timestamp")
|
||||
.filterExpression(filterPlaceholder + " " + comparator + " :timestamp")
|
||||
.expressionAttributeValues(Map.of(":timestamp", attributeValue));
|
||||
|
||||
}
|
||||
|
||||
@@ -86,9 +86,9 @@ public class DynamodbUtils {
|
||||
|
||||
record StreamState(
|
||||
|
||||
AirbyteStateMessage.AirbyteStateType airbyteStateType,
|
||||
AirbyteStateMessage.AirbyteStateType airbyteStateType,
|
||||
|
||||
List<AirbyteStateMessage> airbyteStateMessages) {
|
||||
List<AirbyteStateMessage> airbyteStateMessages) {
|
||||
|
||||
}
|
||||
|
||||
|
||||
@@ -61,6 +61,13 @@
|
||||
"description": "The corresponding secret to the access key id.",
|
||||
"airbyte_secret": true,
|
||||
"examples": ["a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"]
|
||||
},
|
||||
"reserved_attribute_names": {
|
||||
"title": "Reserved attribute names",
|
||||
"type": "string",
|
||||
"description": "Comma separated reserved attribute names present in your tables",
|
||||
"airbyte_secret": true,
|
||||
"examples": ["name, field_name, field-name"]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -80,6 +80,7 @@ public class DynamodbDataFactory {
|
||||
.put("region", dynamodbContainer.getRegion())
|
||||
.put("access_key_id", dynamodbContainer.getAccessKey())
|
||||
.put("secret_access_key", dynamodbContainer.getSecretKey())
|
||||
.put("reserved_attribute_names", "name, field.name, field-name")
|
||||
.build());
|
||||
}
|
||||
|
||||
|
||||
@@ -153,7 +153,7 @@ public class DynamodbOperationsTest {
|
||||
PutItemRequest putItemRequest1 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
|
||||
"attr_1", AttributeValue.builder().s("str_4").build(),
|
||||
"attr_2", AttributeValue.builder().s("str_5").build(),
|
||||
"attr_3", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
|
||||
"name", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
|
||||
"attr_4", AttributeValue.builder().ns("12.5", "74.5").build()));
|
||||
|
||||
dynamoDbClient.putItem(putItemRequest1);
|
||||
@@ -161,13 +161,13 @@ public class DynamodbOperationsTest {
|
||||
PutItemRequest putItemRequest2 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
|
||||
"attr_1", AttributeValue.builder().s("str_6").build(),
|
||||
"attr_2", AttributeValue.builder().s("str_7").build(),
|
||||
"attr_3", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(),
|
||||
"name", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(),
|
||||
"attr_6", AttributeValue.builder().ss("str_1", "str_2").build()));
|
||||
|
||||
dynamoDbClient.putItem(putItemRequest2);
|
||||
|
||||
var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "attr_3"),
|
||||
new DynamodbOperations.FilterAttribute("attr_3", "2018-12-21T17:42:34Z",
|
||||
var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "name"),
|
||||
new DynamodbOperations.FilterAttribute("name", "2018-12-21T17:42:34Z",
|
||||
DynamodbOperations.FilterAttribute.FilterType.S));
|
||||
|
||||
assertThat(response)
|
||||
@@ -175,7 +175,7 @@ public class DynamodbOperationsTest {
|
||||
|
||||
JSONAssert.assertEquals(objectMapper.writeValueAsString(response.get(0)), """
|
||||
{
|
||||
"attr_3": "2019-12-21T17:42:34Z",
|
||||
"name": "2019-12-21T17:42:34Z",
|
||||
"attr_2": "str_7",
|
||||
"attr_1": "str_6"
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user