
🎉 New Source: Dynamodb (#18750)

* dynamodb source connector

* code cleanup

* code cleanup

* add date type filtering

* add doc

* revert integer schema primitive

* add dynamodb to source def

* auto-bump connector version

Co-authored-by: itaseski <itaseski@debian-BULLSEYE-live-builder-AMD64>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
This commit is contained in:
Ivica Taseski
2022-11-14 16:21:50 +01:00
committed by GitHub
parent d7e2b76d4a
commit a2d24b5abf
21 changed files with 1831 additions and 0 deletions


@@ -367,6 +367,13 @@
documentationUrl: https://docs.airbyte.com/integrations/sources/dv-360
sourceType: api
releaseStage: alpha
- name: DynamoDB
sourceDefinitionId: 50401137-8871-4c5a-abb7-1f5fda35545a
dockerRepository: airbyte/source-dynamodb
dockerImageTag: 0.1.0
documentationUrl: https://docs.airbyte.com/integrations/sources/dynamodb
sourceType: api
releaseStage: alpha
- name: E2E Testing
sourceDefinitionId: d53f9084-fa6b-4a5a-976c-5b8392f4ad8a
dockerRepository: airbyte/source-e2e-test


@@ -3000,6 +3000,75 @@
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-dynamodb:0.1.0"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/dynamodb"
connectionSpecification:
$schema: "http://json-schema.org/draft-07/schema#"
title: "Dynamodb Source Spec"
type: "object"
required:
- "access_key_id"
- "secret_access_key"
additionalProperties: false
properties:
endpoint:
title: "Dynamodb Endpoint"
type: "string"
default: ""
description: "the URL of the Dynamodb database"
examples:
- "https://{aws_dynamo_db_url}.com"
region:
title: "Dynamodb Region"
type: "string"
default: ""
description: "The region of the Dynamodb database"
enum:
- ""
- "us-east-1"
- "us-east-2"
- "us-west-1"
- "us-west-2"
- "af-south-1"
- "ap-east-1"
- "ap-south-1"
- "ap-northeast-1"
- "ap-northeast-2"
- "ap-northeast-3"
- "ap-southeast-1"
- "ap-southeast-2"
- "ca-central-1"
- "cn-north-1"
- "cn-northwest-1"
- "eu-central-1"
- "eu-north-1"
- "eu-south-1"
- "eu-west-1"
- "eu-west-2"
- "eu-west-3"
- "sa-east-1"
- "me-south-1"
- "us-gov-east-1"
- "us-gov-west-1"
access_key_id:
title: "Dynamodb Key Id"
type: "string"
description: "The access key id to access Dynamodb. Airbyte requires read\
\ permissions to the database"
airbyte_secret: true
examples:
- "A012345678910EXAMPLE"
secret_access_key:
title: "Dynamodb Access Key"
type: "string"
description: "The corresponding secret to the access key id."
airbyte_secret: true
examples:
- "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"
supportsNormalization: false
supportsDBT: false
supported_destination_sync_modes: []
- dockerImage: "airbyte/source-e2e-test:2.1.3"
spec:
documentationUrl: "https://docs.airbyte.com/integrations/sources/e2e-test"


@@ -0,0 +1,21 @@
FROM airbyte/integration-base-java:dev AS build
WORKDIR /airbyte
ENV APPLICATION source-dynamodb
COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1 && rm -rf ${APPLICATION}.tar
FROM airbyte/integration-base-java:dev
WORKDIR /airbyte
ENV APPLICATION source-dynamodb
COPY --from=build /airbyte /airbyte
# Airbyte's build system uses these labels to know what to name and tag the docker images produced by this Dockerfile.
LABEL io.airbyte.version=0.1.0
LABEL io.airbyte.name=airbyte/source-dynamodb


@@ -0,0 +1,69 @@
# Source Dynamodb
This is the repository for the Dynamodb source connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/sources/dynamodb).
## Local development
#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-dynamodb:build
```
#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.
**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.
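As a rough illustration, a `secrets/config.json` for local development might look like the snippet below. The field names come from `src/main/resources/spec.json`; all values are placeholders to replace with your own credentials, and `endpoint` may be left empty to use the default AWS endpoint for the selected region.
```
{
  "endpoint": "",
  "region": "us-east-2",
  "access_key_id": "A012345678910EXAMPLE",
  "secret_access_key": "a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"
}
```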
### Locally running the connector docker image
#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:source-dynamodb:airbyteDocker
```
When building via Gradle, the docker image name and tag, respectively, are the values of the `io.airbyte.name` and `io.airbyte.version` `LABEL`s in
the Dockerfile.
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/source-dynamodb:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-dynamodb:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-dynamodb:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-dynamodb:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
We use `JUnit` for Java tests.
### Unit and Integration Tests
Place unit tests under `src/test/...`
Place integration tests in `src/test-integration/...`
#### Acceptance Tests
Airbyte has a standard test suite that all source connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/source/dynamodb/DynamodbSourceAcceptanceTest.java`.
### Using gradle to run tests
All commands should be run from the Airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:source-dynamodb:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:source-dynamodb:integrationTest
```
## Dependency Management
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing unit and integration tests.
1. Bump the connector version in `Dockerfile` -- just increment the value of the `LABEL io.airbyte.version` appropriately (we use [SemVer](https://semver.org/)).
1. Create a Pull Request.
1. Pat yourself on the back for being an awesome contributor.
1. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.


@@ -0,0 +1,43 @@
plugins {
id 'application'
id 'airbyte-docker'
id 'airbyte-integration-test-java'
}
application {
mainClass = 'io.airbyte.integrations.source.dynamodb.DynamodbSource'
}
def testContainersVersion = '1.17.5'
def assertVersion = '3.23.1'
dependencies {
implementation project(':airbyte-db:db-lib')
implementation project(':airbyte-integrations:bases:base-java')
implementation project(':airbyte-protocol:protocol-models')
implementation project(':airbyte-integrations:connectors:source-relational-db')
implementation project(':airbyte-config:config-models')
implementation platform('software.amazon.awssdk:bom:2.18.1')
// https://mvnrepository.com/artifact/software.amazon.awssdk/dynamodb
implementation 'software.amazon.awssdk:dynamodb'
testImplementation 'org.skyscreamer:jsonassert:1.5.1'
// https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
implementation 'com.fasterxml.jackson.core:jackson-databind:2.13.4.2'
// https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-core
implementation 'com.fasterxml.jackson.core:jackson-core:2.13.4'
testImplementation "org.assertj:assertj-core:${assertVersion}"
testImplementation "org.testcontainers:localstack:${testContainersVersion}"
integrationTestJavaImplementation project(':airbyte-integrations:connectors:source-dynamodb')
integrationTestJavaImplementation project(':airbyte-integrations:bases:standard-source-test')
implementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
integrationTestJavaImplementation files(project(':airbyte-integrations:bases:base-java').airbyteDocker.outputs)
}


@@ -0,0 +1,67 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
public class DynamodbAttributeSerializer extends JsonSerializer<AttributeValue> {
@Override
public void serialize(AttributeValue value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
switch (value.type()) {
case S -> gen.writeString(value.s());
case N -> {
try {
Long.parseLong(value.n());
gen.writeNumber(Long.parseLong(value.n()));
} catch (NumberFormatException e) {
gen.writeNumber(Double.parseDouble(value.n()));
}
}
case B -> gen.writeBinary(value.b().asByteArray());
case SS -> {
gen.writeStartArray();
for (var str : value.ss()) {
gen.writeString(str);
}
gen.writeEndArray();
}
case NS -> {
gen.writeStartArray();
for (var str : value.ns()) {
gen.writeNumber(str);
}
gen.writeEndArray();
}
case BS -> {
gen.writeStartArray();
for (var sb : value.bs()) {
gen.writeBinary(sb.asByteArray());
}
gen.writeEndArray();
}
case M -> {
gen.writeStartObject();
for (var attr : value.m().entrySet()) {
gen.writeFieldName(attr.getKey());
serialize(attr.getValue(), gen, serializers);
}
gen.writeEndObject();
}
case L -> {
gen.writeStartArray();
for (var attr : value.l()) {
serialize(attr, gen, serializers);
}
gen.writeEndArray();
}
case BOOL -> gen.writeBoolean(value.bool());
case NUL -> gen.writeNull();
case UNKNOWN_TO_SDK_VERSION -> {
// ignore unknown fields
}
}
}
}


@@ -0,0 +1,30 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.databind.JsonNode;
import java.net.URI;
import software.amazon.awssdk.regions.Region;
public record DynamodbConfig(
URI endpoint,
Region region,
String accessKey,
String secretKey
) {
public static DynamodbConfig createDynamodbConfig(JsonNode jsonNode) {
JsonNode endpoint = jsonNode.get("endpoint");
JsonNode region = jsonNode.get("region");
return new DynamodbConfig(
endpoint != null && !endpoint.asText().isBlank() ? URI.create(endpoint.asText()) : null,
region != null && !region.asText().isBlank() ? Region.of(region.asText()) : null,
jsonNode.get("access_key_id").asText(),
jsonNode.get("secret_access_key").asText()
);
}
}


@@ -0,0 +1,167 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.module.SimpleModule;
import io.airbyte.db.AbstractDatabase;
import java.io.Closeable;
import java.time.LocalDate;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.DescribeTableRequest;
import software.amazon.awssdk.services.dynamodb.model.ScanRequest;
public class DynamodbOperations extends AbstractDatabase implements Closeable {
private final DynamoDbClient dynamoDbClient;
private ObjectMapper attributeObjectMapper;
private ObjectMapper schemaObjectMapper;
public DynamodbOperations(DynamodbConfig dynamodbConfig) {
this.dynamoDbClient = DynamodbUtils.createDynamoDbClient(dynamodbConfig);
initMappers();
}
public DynamodbOperations(DynamoDbClient dynamoDbClient) {
this.dynamoDbClient = dynamoDbClient;
initMappers();
}
private void initMappers() {
SimpleModule attributeModule = new SimpleModule();
attributeModule.addSerializer(AttributeValue.class, new DynamodbAttributeSerializer());
this.attributeObjectMapper = new ObjectMapper().registerModule(attributeModule);
SimpleModule schemaModule = new SimpleModule();
schemaModule.addSerializer(AttributeValue.class, new DynamodbSchemaSerializer());
this.schemaObjectMapper = new ObjectMapper().registerModule(schemaModule);
}
public List<String> listTables() {
return dynamoDbClient.listTables()
// filter on table status?
.tableNames();
}
public List<String> primaryKey(String tableName) {
DescribeTableRequest describeTableRequest = DescribeTableRequest.builder().tableName(tableName).build();
return dynamoDbClient.describeTable(describeTableRequest).table().attributeDefinitions().stream()
.map(AttributeDefinition::attributeName)
.toList();
}
public JsonNode inferSchema(String tableName, int sampleSize) {
List<Map<String, AttributeValue>> items = new ArrayList<>();
ScanRequest scanRequest = ScanRequest.builder()
.limit(sampleSize)
.tableName(tableName)
.build();
var scanIterable = dynamoDbClient.scanPaginator(scanRequest);
int scannedItems = 0;
for (var scanResponse : scanIterable) {
if (scannedItems >= sampleSize) {
break;
}
// can scan a 'bit' more items than 'sampleSize' if response is > 1MB since every
// new page request on the iterator will return new 'sampleSize' amount of items.
scannedItems += scanResponse.count();
items.addAll(scanResponse.items());
}
/*
* schema inference with combining only the top level attributes of different items.
* for complete schema inference the implementation should do full traversal of each item object graph
* and merge different nested attributes at the same level
* */
Map<String, AttributeValue> mergedItems = items.stream()
.reduce(new HashMap<>(), (merged, current) -> {
merged.putAll(current);
return merged;
});
return schemaObjectMapper.convertValue(mergedItems, JsonNode.class);
}
public List<JsonNode> scanTable(String tableName, Set<String> attributes, FilterAttribute filterAttribute) {
List<JsonNode> items = new ArrayList<>();
var projectionAttributes = String.join(", ", attributes);
ScanRequest.Builder scanRequestBuilder = ScanRequest.builder()
.tableName(tableName)
.projectionExpression(projectionAttributes);
if (filterAttribute != null && filterAttribute.name() != null &&
filterAttribute.value() != null && filterAttribute.type() != null) {
var filterName = filterAttribute.name();
var filterValue = filterAttribute.value();
// Dynamodb supports timestamp filtering based on ISO format as string and Epoch format as number type
AttributeValue attributeValue = switch (filterAttribute.type()) {
case S -> AttributeValue.builder().s(filterValue).build();
case N -> AttributeValue.builder().n(filterValue).build();
};
String comparator;
try {
// if date is of format 2016-02-15 we should use gr-eq in order to not skip records
// from the same date after first replication
LocalDate.parse(filterValue);
comparator = ">=";
} catch (DateTimeParseException e) {
comparator = ">";
}
scanRequestBuilder
.filterExpression(filterName + " " + comparator + " :timestamp")
.expressionAttributeValues(Map.of(":timestamp", attributeValue));
}
var scanIterable = dynamoDbClient.scanPaginator(scanRequestBuilder.build());
for (var scanResponse : scanIterable) {
scanResponse.items().stream()
.map(attr -> attributeObjectMapper.convertValue(attr, JsonNode.class))
.forEach(items::add);
}
return items;
}
@Override
public void close() {
dynamoDbClient.close();
}
public record FilterAttribute(String name, String value, FilterType type) {
public enum FilterType {
S, N
}
}
}


@@ -0,0 +1,156 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.core.JsonGenerator;
import com.fasterxml.jackson.databind.JsonSerializer;
import com.fasterxml.jackson.databind.SerializerProvider;
import java.io.IOException;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
public class DynamodbSchemaSerializer extends JsonSerializer<AttributeValue> {
@Override
public void serialize(AttributeValue value, JsonGenerator gen, SerializerProvider serializers) throws IOException {
switch (value.type()) {
case S -> {
gen.writeStartObject();
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "string"}, 0, 2);
gen.writeEndObject();
}
case N -> {
gen.writeStartObject();
gen.writeFieldName("type");
try {
Long.parseLong(value.n());
gen.writeArray(new String[] {"null", "integer"}, 0, 2);
} catch (NumberFormatException e) {
gen.writeArray(new String[] {"null", "number"}, 0, 2);
}
gen.writeEndObject();
}
case B -> {
gen.writeStartObject();
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "string"}, 0, 2);
gen.writeStringField("contentEncoding", "base64");
gen.writeEndObject();
}
case SS -> {
gen.writeStartObject();
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "array"}, 0, 2);
gen.writeObjectFieldStart("items");
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "string"}, 0, 2);
gen.writeEndObject();
gen.writeEndObject();
}
case NS -> {
gen.writeStartObject();
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "array"}, 0, 2);
gen.writeObjectFieldStart("items");
gen.writeFieldName("type");
//array can contain mixed integer and decimal values
gen.writeArray(new String[] {"null", "number"}, 0, 2);
gen.writeEndObject();
gen.writeEndObject();
}
case BS -> {
gen.writeStartObject();
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "array"}, 0, 2);
gen.writeObjectFieldStart("items");
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "string"}, 0, 2);
gen.writeStringField("contentEncoding", "base64");
gen.writeEndObject();
gen.writeEndObject();
}
case M -> {
gen.writeStartObject();
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "object"}, 0, 2);
gen.writeObjectFieldStart("properties");
for (var attr : value.m().entrySet()) {
gen.writeFieldName(attr.getKey());
// recursively iterate over nested attributes and create json schema fields
serialize(attr.getValue(), gen, serializers);
}
gen.writeEndObject();
gen.writeEndObject();
}
case L -> {
//TODO (itaseski) perform deduplication on same type schema elements
gen.writeStartObject();
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "array"}, 0, 2);
gen.writeObjectFieldStart("items");
gen.writeArrayFieldStart("anyOf");
// recursively iterate over nested attributes and create json schema fields
for (var attr : value.l()) {
serialize(attr, gen, serializers);
}
gen.writeEndArray();
gen.writeEndObject();
gen.writeEndObject();
}
case BOOL -> {
gen.writeStartObject();
gen.writeFieldName("type");
gen.writeArray(new String[] {"null", "boolean"}, 0, 2);
gen.writeEndObject();
}
case NUL -> {
gen.writeStartObject();
gen.writeStringField("type", "null");
gen.writeEndObject();
}
case UNKNOWN_TO_SDK_VERSION -> {
// ignore unknown fields
}
}
}
}


@@ -0,0 +1,188 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.core.type.TypeReference;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.features.EnvVariableFeatureFlags;
import io.airbyte.commons.features.FeatureFlags;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.integrations.BaseConnector;
import io.airbyte.integrations.base.AirbyteStreamNameNamespacePair;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.relationaldb.CursorInfo;
import io.airbyte.integrations.source.relationaldb.StateDecoratingIterator;
import io.airbyte.integrations.source.relationaldb.state.StateManager;
import io.airbyte.integrations.source.relationaldb.state.StateManagerFactory;
import io.airbyte.protocol.models.AirbyteCatalog;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteStream;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.Set;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class DynamodbSource extends BaseConnector implements Source {
private static final Logger LOGGER = LoggerFactory.getLogger(DynamodbSource.class);
private final FeatureFlags featureFlags = new EnvVariableFeatureFlags();
private final ObjectMapper objectMapper = new ObjectMapper();
public static void main(String[] args) throws Exception {
Source source = new DynamodbSource();
LOGGER.info("starting Source: {}", DynamodbSource.class);
new IntegrationRunner(source).run(args);
LOGGER.info("completed Source: {}", DynamodbSource.class);
}
@Override
public AirbyteConnectionStatus check(JsonNode config) {
var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);
try (var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {
dynamodbOperations.listTables();
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.SUCCEEDED);
} catch (Exception e) {
LOGGER.error("Error while listing Dynamodb tables with reason: ", e);
return new AirbyteConnectionStatus()
.withStatus(AirbyteConnectionStatus.Status.FAILED);
}
}
@Override
public AirbyteCatalog discover(JsonNode config) {
var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);
try (var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {
var airbyteStreams = dynamodbOperations.listTables().stream()
.map(tb -> new AirbyteStream()
.withName(tb)
.withJsonSchema(Jsons.jsonNode(ImmutableMap.builder()
.put("type", "object")
.put("properties", dynamodbOperations.inferSchema(tb, 1000))
.build()))
.withSourceDefinedPrimaryKey(Collections.singletonList(dynamodbOperations.primaryKey(tb)))
.withSupportedSyncModes(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
.toList();
return new AirbyteCatalog().withStreams(airbyteStreams);
}
}
@Override
public AutoCloseableIterator<AirbyteMessage> read(JsonNode config, ConfiguredAirbyteCatalog catalog,
JsonNode state) {
var streamState = DynamodbUtils.deserializeStreamState(state, featureFlags.useStreamCapableState());
StateManager stateManager = StateManagerFactory
.createStateManager(streamState.airbyteStateType(), streamState.airbyteStateMessages(), catalog);
var dynamodbConfig = DynamodbConfig.createDynamodbConfig(config);
try (var dynamodbOperations = new DynamodbOperations(dynamodbConfig)) {
var streamIterators = catalog.getStreams().stream()
.map(str -> switch (str.getSyncMode()) {
case INCREMENTAL -> scanIncremental(dynamodbOperations, str.getStream(), str.getCursorField().get(0), stateManager);
case FULL_REFRESH -> scanFullRefresh(dynamodbOperations, str.getStream());
})
.toList();
return AutoCloseableIterators.concatWithEagerClose(streamIterators);
}
}
private AutoCloseableIterator<AirbyteMessage> scanIncremental(DynamodbOperations dynamodbOperations,
AirbyteStream airbyteStream,
String cursorField, StateManager stateManager) {
var streamPair = new AirbyteStreamNameNamespacePair(airbyteStream.getName(), airbyteStream.getNamespace());
Optional<CursorInfo> cursorInfo = stateManager.getCursorInfo(streamPair);
Map<String, JsonNode> properties = objectMapper.convertValue(airbyteStream.getJsonSchema().get("properties"), new TypeReference<>() {});
Set<String> selectedAttributes = properties.keySet();
//cursor type will be retrieved from the json schema to save time on db schema crawling reading large amount of items
String cursorType = properties.get(cursorField).get("type").asText();
var messageStream = cursorInfo.map(cursor -> {
var filterType = switch (cursorType) {
case "string" -> DynamodbOperations.FilterAttribute.FilterType.S;
case "integer" -> DynamodbOperations.FilterAttribute.FilterType.N;
case "number" -> {
JsonNode airbyteType = properties.get(cursorField).get("airbyte_type");
if (airbyteType != null && airbyteType.asText().equals("integer")) {
yield DynamodbOperations.FilterAttribute.FilterType.N;
} else {
throw new UnsupportedOperationException("Unsupported attribute type for filtering");
}
}
default -> throw new UnsupportedOperationException("Unsupported attribute type for filtering");
};
DynamodbOperations.FilterAttribute filterAttribute = new DynamodbOperations.FilterAttribute(
cursor.getCursorField(),
cursor.getCursor(),
filterType
);
return dynamodbOperations.scanTable(airbyteStream.getName(), selectedAttributes, filterAttribute);
})
// perform full refresh if cursor is not present
.orElse(dynamodbOperations.scanTable(airbyteStream.getName(), selectedAttributes, null))
.stream()
.map(jn -> DynamodbUtils.mapAirbyteMessage(airbyteStream.getName(), jn));
// wrap stream in state emission iterator
return AutoCloseableIterators.transform(autoCloseableIterator -> new StateDecoratingIterator(
autoCloseableIterator,
stateManager,
streamPair,
cursorField,
cursorInfo.map(CursorInfo::getCursor).orElse(null),
JsonSchemaPrimitive.valueOf(cursorType.toUpperCase()),
//emit state after full stream has been processed
0),
AutoCloseableIterators.fromStream(messageStream));
}
private AutoCloseableIterator<AirbyteMessage> scanFullRefresh(DynamodbOperations dynamodbOperations,
AirbyteStream airbyteStream) {
Map<String, JsonNode> properties = objectMapper.convertValue(airbyteStream.getJsonSchema().get("properties"), new TypeReference<>() {});
Set<String> selectedAttributes = properties.keySet();
var messageStream = dynamodbOperations
.scanTable(airbyteStream.getName(), selectedAttributes, null)
.stream()
.map(jn -> DynamodbUtils.mapAirbyteMessage(airbyteStream.getName(), jn));
return AutoCloseableIterators.fromStream(messageStream);
}
}


@@ -0,0 +1,89 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StateWrapper;
import io.airbyte.config.helpers.StateMessageHelper;
import io.airbyte.integrations.source.relationaldb.models.DbState;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.AirbyteStreamState;
import java.time.Instant;
import java.util.List;
import java.util.Optional;
import software.amazon.awssdk.auth.credentials.AwsBasicCredentials;
import software.amazon.awssdk.auth.credentials.StaticCredentialsProvider;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
public class DynamodbUtils {
private DynamodbUtils() {
}
public static DynamoDbClient createDynamoDbClient(DynamodbConfig dynamodbConfig) {
var dynamoDbClientBuilder = DynamoDbClient.builder();
// configure access credentials
dynamoDbClientBuilder.credentialsProvider(StaticCredentialsProvider.create(
AwsBasicCredentials.create(dynamodbConfig.accessKey(), dynamodbConfig.secretKey())));
if (dynamodbConfig.region() != null) {
dynamoDbClientBuilder.region(dynamodbConfig.region());
}
if (dynamodbConfig.endpoint() != null) {
dynamoDbClientBuilder.endpointOverride(dynamodbConfig.endpoint());
}
return dynamoDbClientBuilder.build();
}
public static AirbyteMessage mapAirbyteMessage(String stream, JsonNode data) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withStream(stream)
.withEmittedAt(Instant.now().toEpochMilli())
.withData(data));
}
public static StreamState deserializeStreamState(JsonNode state, boolean useStreamCapableState) {
Optional<StateWrapper> typedState =
StateMessageHelper.getTypedState(state, useStreamCapableState);
return typedState.map(stateWrapper -> switch (stateWrapper.getStateType()) {
case STREAM:
yield new StreamState(AirbyteStateMessage.AirbyteStateType.STREAM, stateWrapper.getStateMessages());
case LEGACY:
yield new StreamState(AirbyteStateMessage.AirbyteStateType.LEGACY, List.of(
new AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.LEGACY)
.withData(stateWrapper.getLegacyState())));
case GLOBAL:
throw new UnsupportedOperationException("Unsupported stream state");
}).orElseGet(() -> {
//create empty initial state
if (useStreamCapableState) {
return new StreamState(AirbyteStateMessage.AirbyteStateType.STREAM, List.of(
new AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.STREAM)
.withStream(new AirbyteStreamState())));
} else {
return new StreamState(AirbyteStateMessage.AirbyteStateType.LEGACY, List.of(
new AirbyteStateMessage().withType(AirbyteStateMessage.AirbyteStateType.LEGACY)
.withData(Jsons.jsonNode(new DbState()))));
}
});
}
record StreamState(
AirbyteStateMessage.AirbyteStateType airbyteStateType,
List<AirbyteStateMessage> airbyteStateMessages) {
}
}


@@ -0,0 +1,76 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/sources/dynamodb",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Dynamodb Source Spec",
"type": "object",
"required": [
"access_key_id",
"secret_access_key"
],
"additionalProperties": false,
"properties": {
"endpoint": {
"title": "Dynamodb Endpoint",
"type": "string",
"default": "",
"description": "the URL of the Dynamodb database",
"examples": [
"https://{aws_dynamo_db_url}.com"
]
},
"region": {
"title": "Dynamodb Region",
"type": "string",
"default": "",
"description": "The region of the Dynamodb database",
"enum": [
"",
"us-east-1",
"us-east-2",
"us-west-1",
"us-west-2",
"af-south-1",
"ap-east-1",
"ap-south-1",
"ap-northeast-1",
"ap-northeast-2",
"ap-northeast-3",
"ap-southeast-1",
"ap-southeast-2",
"ca-central-1",
"cn-north-1",
"cn-northwest-1",
"eu-central-1",
"eu-north-1",
"eu-south-1",
"eu-west-1",
"eu-west-2",
"eu-west-3",
"sa-east-1",
"me-south-1",
"us-gov-east-1",
"us-gov-west-1"
]
},
"access_key_id": {
"title": "Dynamodb Key Id",
"type": "string",
"description": "The access key id to access Dynamodb. Airbyte requires read permissions to the database",
"airbyte_secret": true,
"examples": [
"A012345678910EXAMPLE"
]
},
"secret_access_key": {
"title": "Dynamodb Access Key",
"type": "string",
"description": "The corresponding secret to the access key id.",
"airbyte_secret": true,
"examples": [
"a012345678910ABCDEFGH/AbCdEfGhEXAMPLEKEY"
]
}
}
}
}


@@ -0,0 +1,28 @@
package io.airbyte.integrations.source.dynamodb;
import java.net.URI;
import org.testcontainers.containers.localstack.LocalStackContainer;
import org.testcontainers.utility.DockerImageName;
public class DynamodbContainer extends LocalStackContainer {
public static DynamodbContainer createWithStart() {
var dynamodbContainer = (DynamodbContainer) new DynamodbContainer()
.withServices(Service.DYNAMODB);
dynamodbContainer.start();
return dynamodbContainer;
}
public static DynamodbContainer create() {
return (DynamodbContainer) new DynamodbContainer()
.withServices(Service.DYNAMODB);
}
public DynamodbContainer() {
super(DockerImageName.parse("localstack/localstack:1.2.0"));
}
public URI getEndpointOverride() {
return super.getEndpointOverride(Service.DYNAMODB);
}
}


@@ -0,0 +1,101 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.CatalogHelpers;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConfiguredAirbyteStream;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.SyncMode;
import java.util.List;
import java.util.Map;
import java.util.stream.IntStream;
import software.amazon.awssdk.services.dynamodb.model.AttributeDefinition;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.CreateTableRequest;
import software.amazon.awssdk.services.dynamodb.model.KeySchemaElement;
import software.amazon.awssdk.services.dynamodb.model.KeyType;
import software.amazon.awssdk.services.dynamodb.model.ProvisionedThroughput;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
import software.amazon.awssdk.services.dynamodb.model.ScalarAttributeType;
import software.amazon.awssdk.services.dynamodb.model.TableClass;
public class DynamodbDataFactory {
private DynamodbDataFactory() {
}
public static List<CreateTableRequest> createTables(String tablePrefix, int tables) {
return IntStream.range(0, tables).mapToObj(range -> CreateTableRequest
.builder()
.tableClass(TableClass.STANDARD)
.tableName(tablePrefix + (range + 1))
.attributeDefinitions(
AttributeDefinition.builder()
.attributeName("attr_1")
.attributeType(ScalarAttributeType.S)
.build(),
AttributeDefinition.builder()
.attributeName("attr_2")
.attributeType(ScalarAttributeType.S)
.build()
)
.keySchema(
KeySchemaElement.builder()
.attributeName("attr_1")
.keyType(KeyType.HASH)
.build(),
KeySchemaElement.builder()
.attributeName("attr_2")
.keyType(KeyType.RANGE)
.build()
)
.provisionedThroughput(ProvisionedThroughput.builder()
.readCapacityUnits(10L)
.writeCapacityUnits(10L).build())
.build())
.toList();
}
public static PutItemRequest putItemRequest(String tableName, Map<String, AttributeValue> item) {
return PutItemRequest
.builder()
.tableName(tableName)
.item(item)
.build();
}
public static JsonNode createJsonConfig(DynamodbContainer dynamodbContainer) {
return Jsons.jsonNode(ImmutableMap.builder()
.put("endpoint", dynamodbContainer.getEndpointOverride().toString())
.put("region", dynamodbContainer.getRegion())
.put("access_key_id", dynamodbContainer.getAccessKey())
.put("secret_access_key", dynamodbContainer.getSecretKey())
.build());
}
public static ConfiguredAirbyteCatalog createConfiguredAirbyteCatalog(String streamName) {
return new ConfiguredAirbyteCatalog().withStreams(Lists.newArrayList(
new ConfiguredAirbyteStream()
.withSyncMode(SyncMode.INCREMENTAL)
.withCursorField(Lists.newArrayList("attr_timestamp"))
.withPrimaryKey(List.of(List.of("attr_1", "attr_2")))
.withDestinationSyncMode(DestinationSyncMode.APPEND)
.withStream(CatalogHelpers.createAirbyteStream(
streamName,
Field.of("attr_1", JsonSchemaType.STRING),
Field.of("attr_2", JsonSchemaType.STRING),
Field.of("attr_3", JsonSchemaType.NUMBER),
Field.of("attr_timestamp", JsonSchemaType.INTEGER))
.withSupportedSyncModes(Lists.newArrayList(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))));
}
}


@@ -0,0 +1,184 @@
package io.airbyte.integrations.source.dynamodb;
import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import java.util.List;
import java.util.Map;
import java.util.Set;
import org.json.JSONException;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
public class DynamodbOperationsTest {
private static final String TABLE_NAME = "airbyte_table";
private DynamodbOperations dynamodbOperations;
private DynamoDbClient dynamoDbClient;
private DynamodbContainer dynamodbContainer;
private ObjectMapper objectMapper;
@BeforeEach
void setup() {
dynamodbContainer = DynamodbContainer.createWithStart();
var jsonConfig = DynamodbDataFactory.createJsonConfig(dynamodbContainer);
this.dynamodbOperations = new DynamodbOperations(DynamodbConfig.createDynamodbConfig(jsonConfig));
this.dynamoDbClient = DynamodbUtils.createDynamoDbClient(DynamodbConfig.createDynamodbConfig(jsonConfig));
this.objectMapper = new ObjectMapper()
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
.configure(SerializationFeature.INDENT_OUTPUT, true);
}
@AfterEach
void shutdown() {
dynamoDbClient.close();
dynamodbOperations.close();
dynamodbContainer.stop();
dynamodbContainer.close();
}
@Test
void testListTables() {
var createTableRequests = DynamodbDataFactory.createTables(TABLE_NAME, 5);
createTableRequests.forEach(dynamoDbClient::createTable);
List<String> tables = dynamodbOperations.listTables();
assertThat(tables).hasSize(5)
.anyMatch(t -> t.equals(TABLE_NAME + 1))
.anyMatch(t -> t.equals(TABLE_NAME + 2))
.anyMatch(t -> t.equals(TABLE_NAME + 3))
.anyMatch(t -> t.equals(TABLE_NAME + 4))
.anyMatch(t -> t.equals(TABLE_NAME + 5));
}
@Test
void testPrimaryKey() {
var createTableRequests = DynamodbDataFactory.createTables(TABLE_NAME, 1);
var createTableResponse = dynamoDbClient.createTable(createTableRequests.get(0));
var primaryKey = dynamodbOperations.primaryKey(createTableResponse.tableDescription().tableName());
assertThat(primaryKey).hasSize(2)
.anyMatch(t -> t.equals("attr_1"))
.anyMatch(t -> t.equals("attr_2"));
}
@Test
void testInferSchema() throws JsonProcessingException, JSONException {
var createTableRequests = DynamodbDataFactory.createTables(TABLE_NAME, 1);
var createTableResponse = dynamoDbClient.createTable(createTableRequests.get(0));
String tableName = createTableResponse.tableDescription().tableName();
PutItemRequest putItemRequest1 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_4").build(),
"attr_2", AttributeValue.builder().s("str_5").build(),
"attr_3", AttributeValue.builder().n("1234").build(),
"attr_4", AttributeValue.builder().ns("12.5", "74.5").build()));
dynamoDbClient.putItem(putItemRequest1);
PutItemRequest putItemRequest2 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_6").build(),
"attr_2", AttributeValue.builder().s("str_7").build(),
"attr_5", AttributeValue.builder().bool(true).build(),
"attr_6", AttributeValue.builder().ss("str_1", "str_2").build()));
dynamoDbClient.putItem(putItemRequest2);
var schema = dynamodbOperations.inferSchema(tableName, 1000);
JSONAssert.assertEquals(objectMapper.writeValueAsString(schema), """
{
"attr_5": {
"type": ["null","boolean"]
},
"attr_4": {
"type": ["null","array"],
"items": {
"type": ["null","number"]
}
},
"attr_3": {
"type": ["null","integer"]
},
"attr_2": {
"type": ["null","string"]
},
"attr_1": {
"type": ["null","string"]
},
"attr_6": {
"type": ["null","array"],
"items": {
"type": ["null","string"]
}
}
}
""", true);
}
@Test
void testScanTable() throws JsonProcessingException, JSONException {
var createTableRequests = DynamodbDataFactory.createTables(TABLE_NAME, 1);
var createTableResponse = dynamoDbClient.createTable(createTableRequests.get(0));
String tableName = createTableResponse.tableDescription().tableName();
PutItemRequest putItemRequest1 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_4").build(),
"attr_2", AttributeValue.builder().s("str_5").build(),
"attr_3", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
"attr_4", AttributeValue.builder().ns("12.5", "74.5").build()));
dynamoDbClient.putItem(putItemRequest1);
PutItemRequest putItemRequest2 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_6").build(),
"attr_2", AttributeValue.builder().s("str_7").build(),
"attr_3", AttributeValue.builder().s("2019-12-21T17:42:34Z").build(),
"attr_6", AttributeValue.builder().ss("str_1", "str_2").build()));
dynamoDbClient.putItem(putItemRequest2);
var response = dynamodbOperations.scanTable(tableName, Set.of("attr_1", "attr_2", "attr_3"),
new DynamodbOperations.FilterAttribute("attr_3", "2018-12-21T17:42:34Z",
DynamodbOperations.FilterAttribute.FilterType.S));
assertThat(response)
.hasSize(1);
JSONAssert.assertEquals(objectMapper.writeValueAsString(response.get(0)), """
{
"attr_3": "2019-12-21T17:42:34Z",
"attr_2": "str_7",
"attr_1": "str_6"
}
""", true);
}
}


@@ -0,0 +1,85 @@
/*
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
import io.airbyte.integrations.standardtest.source.TestDestinationEnv;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.protocol.models.ConnectorSpecification;
import java.util.HashMap;
import java.util.Map;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
public class DynamodbSourceAcceptanceTest extends SourceAcceptanceTest {
private static final String TABLE_NAME = "airbyte_table";
private JsonNode config;
private DynamodbContainer dynamodbContainer;
private DynamoDbClient dynamoDbClient;
@Override
protected void setupEnvironment(final TestDestinationEnv testEnv) {
dynamodbContainer = DynamodbContainer.createWithStart();
config = DynamodbDataFactory.createJsonConfig(dynamodbContainer);
dynamoDbClient = DynamodbUtils.createDynamoDbClient(DynamodbConfig.createDynamodbConfig(config));
var createTableRequests = DynamodbDataFactory.createTables(TABLE_NAME, 1);
var createTableResponse = dynamoDbClient.createTable(createTableRequests.get(0));
String tableName = createTableResponse.tableDescription().tableName();
PutItemRequest putItemRequest = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_4").build(),
"attr_2", AttributeValue.builder().s("str_5").build(),
"attr_3", AttributeValue.builder().n("1234.25").build(),
"attr_timestamp", AttributeValue.builder().n("1572268323").build()));
dynamoDbClient.putItem(putItemRequest);
}
@Override
protected void tearDown(final TestDestinationEnv testEnv) {
dynamoDbClient.close();
dynamodbContainer.stop();
dynamodbContainer.close();
}
@Override
protected String getImageName() {
return "airbyte/source-dynamodb:dev";
}
@Override
protected ConnectorSpecification getSpec() throws Exception {
return Jsons.deserialize(MoreResources.readResource("spec.json"), ConnectorSpecification.class);
}
@Override
protected JsonNode getConfig() {
return config;
}
@Override
protected ConfiguredAirbyteCatalog getConfiguredCatalog() {
return DynamodbDataFactory.createConfiguredAirbyteCatalog(TABLE_NAME + 1);
}
@Override
protected JsonNode getState() {
return Jsons.jsonNode(new HashMap<>());
}
}


@@ -0,0 +1,158 @@
package io.airbyte.integrations.source.dynamodb;
import static org.assertj.core.api.Assertions.assertThat;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.SyncMode;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.stream.Stream;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.services.dynamodb.DynamoDbClient;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
import software.amazon.awssdk.services.dynamodb.model.PutItemRequest;
public class DynamodbSourceTest {
private static final String TABLE_NAME = "airbyte_table";
private DynamodbSource dynamodbSource;
private DynamoDbClient dynamoDbClient;
private DynamodbContainer dynamodbContainer;
@BeforeEach
void setup() {
dynamodbContainer = DynamodbContainer.createWithStart();
var jsonConfig = DynamodbDataFactory.createJsonConfig(dynamodbContainer);
this.dynamodbSource = new DynamodbSource();
this.dynamoDbClient = DynamodbUtils.createDynamoDbClient(DynamodbConfig.createDynamodbConfig(jsonConfig));
}
@AfterEach
void shutdown() {
dynamoDbClient.close();
dynamodbContainer.stop();
dynamodbContainer.close();
}
@Test
void testCheckWithSucceeded() {
var jsonConfig = DynamodbDataFactory.createJsonConfig(dynamodbContainer);
DynamodbDataFactory.createTables(TABLE_NAME, 1).forEach(dynamoDbClient::createTable);
var connectionStatus = dynamodbSource.check(jsonConfig);
assertThat(connectionStatus.getStatus()).isEqualTo(AirbyteConnectionStatus.Status.SUCCEEDED);
}
@Test
void testCheckWithFailed() {
var jsonConfig = DynamodbDataFactory.createJsonConfig(dynamodbContainer);
((ObjectNode) jsonConfig).replace("endpoint", Jsons.jsonNode("localhost:8080"));
DynamodbDataFactory.createTables(TABLE_NAME, 1).forEach(dynamoDbClient::createTable);
var connectionStatus = dynamodbSource.check(jsonConfig);
assertThat(connectionStatus.getStatus()).isEqualTo(AirbyteConnectionStatus.Status.FAILED);
}
@Test
void testDiscover() {
var jsonConfig = DynamodbDataFactory.createJsonConfig(dynamodbContainer);
var createTableRequests = DynamodbDataFactory.createTables(TABLE_NAME, 2);
var createTableResponses = createTableRequests.stream().map(dynamoDbClient::createTable).toList();
DynamodbDataFactory.putItemRequest(createTableResponses.get(0).tableDescription().tableName(), Map.of(
"attr_1", AttributeValue.builder().s("str_4").build(),
"attr_2", AttributeValue.builder().s("str_5").build(),
"attr_3", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
"attr_4", AttributeValue.builder().ns("12.5", "74.5").build()));
DynamodbDataFactory.putItemRequest(createTableResponses.get(1).tableDescription().tableName(), Map.of(
"attr_1", AttributeValue.builder().s("str_4").build(),
"attr_2", AttributeValue.builder().s("str_5").build(),
"attr_4", AttributeValue.builder().s("2017-12-21T17:42:34Z").build(),
"attr_5", AttributeValue.builder().ns("12.5", "74.5").build()));
var airbyteCatalog = dynamodbSource.discover(jsonConfig);
assertThat(airbyteCatalog.getStreams())
.anyMatch(as -> as.getName().equals(createTableResponses.get(0).tableDescription().tableName()) &&
as.getJsonSchema().isObject() &&
as.getSourceDefinedPrimaryKey().get(0).containsAll(List.of("attr_1", "attr_2")) &&
as.getSupportedSyncModes().containsAll(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)))
.anyMatch(as -> as.getName().equals(createTableResponses.get(1).tableDescription().tableName()) &&
as.getJsonSchema().isObject() &&
as.getSourceDefinedPrimaryKey().get(0).containsAll(List.of("attr_1", "attr_2")) &&
as.getSupportedSyncModes().containsAll(List.of(SyncMode.FULL_REFRESH, SyncMode.INCREMENTAL)));
}
@Test
void testRead() {
var jsonConfig = DynamodbDataFactory.createJsonConfig(dynamodbContainer);
var createTableRequests = DynamodbDataFactory.createTables(TABLE_NAME, 1);
var createTableResponses = createTableRequests.stream().map(dynamoDbClient::createTable).toList();
String tableName = createTableResponses.get(0).tableDescription().tableName();
var configuredCatalog = DynamodbDataFactory.createConfiguredAirbyteCatalog(tableName);
PutItemRequest putItemRequest1 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_4").build(),
"attr_2", AttributeValue.builder().s("str_5").build(),
"attr_3", AttributeValue.builder().n("1234.25").build(),
"attr_timestamp", AttributeValue.builder().n("1572268323").build()));
dynamoDbClient.putItem(putItemRequest1);
PutItemRequest putItemRequest2 = DynamodbDataFactory.putItemRequest(tableName, Map.of(
"attr_1", AttributeValue.builder().s("str_6").build(),
"attr_2", AttributeValue.builder().s("str_7").build(),
"attr_3", AttributeValue.builder().n("1234.25").build(),
"attr_timestamp", AttributeValue.builder().n("1672228343").build()));
dynamoDbClient.putItem(putItemRequest2);
Iterator<AirbyteMessage> iterator = dynamodbSource.read(jsonConfig, configuredCatalog, Jsons.emptyObject());
var airbyteRecordMessages = Stream.generate(() -> null)
.takeWhile(x -> iterator.hasNext())
.map(n -> iterator.next())
.filter(am -> am.getType() == AirbyteMessage.Type.RECORD)
.map(AirbyteMessage::getRecord)
.toList();
assertThat(airbyteRecordMessages)
.anyMatch(arm -> arm.getStream().equals(tableName) &&
Jsons.serialize(arm.getData()).equals(
"{\"attr_timestamp\":1572268323,\"attr_3\":1234.25,\"attr_2\":\"str_5\",\"attr_1\":\"str_4\"}"))
.anyMatch(arm -> arm.getStream().equals(tableName) &&
Jsons.serialize(arm.getData()).equals(
"{\"attr_timestamp\":1672228343,\"attr_3\":1234.25,\"attr_2\":\"str_7\",\"attr_1\":\"str_6\"}"));
}
}


@@ -0,0 +1,77 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.json.JSONException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
class DynamodbAttributeSerializerTest {
private ObjectMapper attributeObjectMapper;
@BeforeEach
void setup() {
SimpleModule module = new SimpleModule();
module.addSerializer(AttributeValue.class, new DynamodbAttributeSerializer());
this.attributeObjectMapper = new ObjectMapper()
.registerModule(module)
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
.configure(SerializationFeature.INDENT_OUTPUT, true);
}
@Test
void serializeAttributeValueToJson() throws JSONException, JsonProcessingException {
Map<String, AttributeValue> items = Map.of(
"sAttribute", AttributeValue.builder().s("string").build(),
"nAttribute", AttributeValue.builder().n("123").build(),
"bAttribute",
AttributeValue.builder().b(SdkBytes.fromByteArray("byteArray".getBytes(StandardCharsets.UTF_8))).build(),
"ssAttribute", AttributeValue.builder().ss("string1", "string2").build(),
"nsAttribute", AttributeValue.builder().ns("12.5", "25.5").build(),
"bsAttribute", AttributeValue.builder().bs(
SdkBytes.fromByteArray("byteArray1".getBytes(StandardCharsets.UTF_8)),
SdkBytes.fromByteArray("byteArray2".getBytes(StandardCharsets.UTF_8))).build(),
"lAttribute", AttributeValue.builder().l(
AttributeValue.builder().s("string3").build(),
AttributeValue.builder().n("125").build()).build(),
"mAttribute", AttributeValue.builder().m(Map.of(
"attr1", AttributeValue.builder().s("string4").build(),
"attr2", AttributeValue.builder().s("string5").build())).build(),
"boolAttribute", AttributeValue.builder().bool(false).build(),
"nulAttribute", AttributeValue.builder().nul(true).build()
);
var jsonNode = attributeObjectMapper.writeValueAsString(items);
JSONAssert.assertEquals(jsonNode, """
{
"bAttribute": "Ynl0ZUFycmF5",
"boolAttribute": false,
"bsAttribute": ["Ynl0ZUFycmF5MQ==", "Ynl0ZUFycmF5Mg=="],
"lAttribute": ["string3", 125],
"mAttribute": {
"attr1": "string4",
"attr2": "string5"
},
"nAttribute": 123,
"nsAttribute": [12.5, 25.5],
"nulAttribute": null,
"sAttribute": "string",
"ssAttribute": ["string1", "string2"]
}
""", true);
}
}


@@ -0,0 +1,33 @@
package io.airbyte.integrations.source.dynamodb;
import static org.assertj.core.api.Assertions.assertThat;
import io.airbyte.commons.json.Jsons;
import java.net.URI;
import java.util.Map;
import org.junit.jupiter.api.Test;
import software.amazon.awssdk.regions.Region;
class DynamodbConfigTest {
@Test
void testDynamodbConfig() {
var jsonConfig = Jsons.jsonNode(Map.of(
"endpoint", "http://localhost:8080",
"region", "us-east-1",
"access_key_id", "A012345678910EXAMPLE",
"secret_access_key", "a012345678910ABCDEFGH/AbCdEfGhLEKEY"
));
var dynamodbConfig = DynamodbConfig.createDynamodbConfig(jsonConfig);
assertThat(dynamodbConfig)
.hasFieldOrPropertyWithValue("endpoint", URI.create("http://localhost:8080"))
.hasFieldOrPropertyWithValue("region", Region.of("us-east-1"))
.hasFieldOrPropertyWithValue("accessKey", "A012345678910EXAMPLE")
.hasFieldOrPropertyWithValue("secretKey", "a012345678910ABCDEFGH/AbCdEfGhLEKEY");
}
}


@@ -0,0 +1,119 @@
package io.airbyte.integrations.source.dynamodb;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.SerializationFeature;
import com.fasterxml.jackson.databind.module.SimpleModule;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import org.json.JSONException;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.skyscreamer.jsonassert.JSONAssert;
import software.amazon.awssdk.core.SdkBytes;
import software.amazon.awssdk.services.dynamodb.model.AttributeValue;
class DynamodbSchemaSerializerTest {
private ObjectMapper schemaObjectMapper;
@BeforeEach
void setup() {
SimpleModule module = new SimpleModule();
module.addSerializer(AttributeValue.class, new DynamodbSchemaSerializer());
this.schemaObjectMapper = new ObjectMapper()
.registerModule(module)
.configure(SerializationFeature.ORDER_MAP_ENTRIES_BY_KEYS, true)
.configure(SerializationFeature.INDENT_OUTPUT, true);
}
@Test
void serializeAttributeValueToJsonSchema() throws JsonProcessingException, JSONException {
Map<String, AttributeValue> items = Map.of(
"sAttribute", AttributeValue.builder().s("string").build(),
"nAttribute", AttributeValue.builder().n("123").build(),
"bAttribute",
AttributeValue.builder().b(SdkBytes.fromByteArray("byteArray".getBytes(StandardCharsets.UTF_8))).build(),
"ssAttribute", AttributeValue.builder().ss("string1", "string2").build(),
"nsAttribute", AttributeValue.builder().ns("125", "126").build(),
"bsAttribute", AttributeValue.builder().bs(
SdkBytes.fromByteArray("byteArray1".getBytes(StandardCharsets.UTF_8)),
SdkBytes.fromByteArray("byteArray2".getBytes(StandardCharsets.UTF_8))).build(),
"lAttribute", AttributeValue.builder().l(
AttributeValue.builder().s("string3").build(),
AttributeValue.builder().n("12.5").build()).build(),
"mAttribute", AttributeValue.builder().m(Map.of(
"attr1", AttributeValue.builder().s("string4").build(),
"attr2", AttributeValue.builder().s("number4").build())).build(),
"boolAttribute", AttributeValue.builder().bool(false).build(),
"nulAttribute", AttributeValue.builder().nul(true).build()
);
var jsonSchema = schemaObjectMapper.writeValueAsString(items);
JSONAssert.assertEquals(jsonSchema, """
{
"bAttribute": {
"type": ["null", "string"],
"contentEncoding": "base64"
},
"boolAttribute": {
"type": ["null", "boolean"]
},
"bsAttribute": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"],
"contentEncoding": "base64"
}
},
"lAttribute": {
"type": ["null", "array"],
"items": {
"anyOf": [{
"type": ["null", "string"]
}, {
"type": ["null", "number"]
}]
}
},
"mAttribute": {
"type": ["null", "object"],
"properties": {
"attr2": {
"type": ["null", "string"]
},
"attr1": {
"type": ["null", "string"]
}
}
},
"nAttribute": {
"type": ["null", "integer"]
},
"nsAttribute": {
"type": ["null", "array"],
"items": {
"type": ["null", "number"]
}
},
"nulAttribute": {
"type": "null"
},
"sAttribute": {
"type": ["null", "string"]
},
"ssAttribute": {
"type": ["null", "array"],
"items": {
"type": ["null", "string"]
}
}
}
""", true);
}
}


@@ -0,0 +1,64 @@
# Dynamodb
The Dynamodb source allows you to sync data from Dynamodb. The source supports Full Refresh and Incremental sync strategies.
## Resulting schema
Dynamodb doesn't have table schemas. The discover phase has three steps:
### Step 1. Retrieve items
The connector scans the table with a scan limit of 1,000 items. If a single scan response hits DynamoDB's 1 MB response cap before reaching that limit, the connector issues further scans with the same limit until at least 1,000 items have been retrieved.
### Step 2. Combining attributes
After retrieving the items, the connector combines all the distinct top-level attributes found across them. The implementation assumes that an attribute with the same name has the same type (including the values of any nested attributes) in every item where it appears.
### Step 3. Determine property types
For each attribute found, the connector determines its type by calling `AttributeValue.type()` and, depending on the returned type, maps the attribute to one of the supported Airbyte types in the schema.
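As a hypothetical illustration (the attribute names are invented), suppose the two sampled items were the following, with `tags` stored as a DynamoDB string set:
```
{ "id": "user-1", "age": 42, "tags": ["a", "b"] }
{ "id": "user-2", "active": true }
```
Merging the top-level attributes and mapping each DynamoDB type to an Airbyte type would produce these schema properties:
```
{
  "id":     { "type": ["null", "string"] },
  "age":    { "type": ["null", "integer"] },
  "tags":   { "type": ["null", "array"], "items": { "type": ["null", "string"] } },
  "active": { "type": ["null", "boolean"] }
}
```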
## Features
| Feature | Supported |
| :--- | :--- |
| Full Refresh Sync | Yes |
| Incremental - Append Sync | Yes |
| Replicate Incremental Deletes | No |
| Namespaces | No |
### Full Refresh sync
This works like a standard full refresh sync.
### Incremental sync
The cursor field can't be nested; it needs to be a top-level attribute of the item.
The cursor should **never** be blank, and it needs to be either a string or an integer type - otherwise the incremental sync results may be unpredictable, since they rely entirely on DynamoDB's comparison of the cursor values.
Only `ISO 8601` and `epoch` cursor types are supported. The cursor type is determined by the property type in the previously generated schema (see the filter sketch below):
* `ISO 8601` - if the schema type is string
* `epoch` - if the schema type is integer
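Internally the saved cursor value is turned into a DynamoDB scan filter (see `DynamodbOperations.scanTable` in this PR). A sketch of the two cases, using a hypothetical cursor field `attr_timestamp`:
```
# ISO 8601 cursor (string attribute)
FilterExpression:          attr_timestamp > :timestamp
ExpressionAttributeValues: { ":timestamp": { "S": "2019-12-21T17:42:34Z" } }

# epoch cursor (number attribute)
FilterExpression:          attr_timestamp > :timestamp
ExpressionAttributeValues: { ":timestamp": { "N": "1572268323" } }
```
For date-only cursor values (e.g. `2016-02-15`) the connector uses `>=` instead of `>` so that records from the same date are not skipped after the first replication.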
## Getting started
This guide describes in detail how to configure the connector to connect to Dynamodb.
### Configuration Parameters
* endpoint: the AWS endpoint of the Dynamodb instance
* region: the region code of the Dynamodb instance
* access_key_id: the access key ID for the IAM user with the required permissions (see the minimal policy sketch below)
* secret_access_key: the secret key for the IAM user with the required permissions
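The connector only calls the `ListTables`, `DescribeTable` and `Scan` DynamoDB APIs, so a minimal IAM policy for that user could look like the sketch below; this is an assumption based on those calls rather than an official policy, and the broad `Resource` should be narrowed to your own tables:
```
{
  "Version": "2012-10-17",
  "Statement": [
    {
      "Effect": "Allow",
      "Action": ["dynamodb:ListTables", "dynamodb:DescribeTable", "dynamodb:Scan"],
      "Resource": "*"
    }
  ]
}
```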
## Changelog
| Version | Date | Pull Request | Subject |
|:--------|:-----|:-------------|:--------|
| 0.1.0 | 2022-11-14 | [18750](https://github.com/airbytehq/airbyte/pull/18750) | New Source: Dynamodb |