1
0
mirror of synced 2025-12-25 02:09:19 -05:00

correct timestamps in standard destination tests and assume millis input in BQ (#981)

This commit is contained in:
Sherif A. Nada
2020-11-14 18:59:50 -08:00
committed by GitHub
parent 0e8a315bc0
commit fba7f1f8bc
6 changed files with 31 additions and 22 deletions

View File

@@ -2,6 +2,6 @@
"destinationDefinitionId": "22f6c74f-5699-40ff-833c-4a879ea40133",
"name": "BigQuery",
"dockerRepository": "airbyte/destination-bigquery",
"dockerImageTag": "0.1.1",
"dockerImageTag": "0.1.2",
"documentationUrl": "https://hub.docker.com/r/airbyte/integration-singer-bigquery-destination"
}

View File

@@ -1,4 +1,4 @@
{"type": "RECORD", "record": {"stream": "streamWithCamelCase", "emitted_at": 1602637589, "data": { "data" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_underscores", "emitted_at": 1602637589, "data": { "data" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_edge_case_field_names", "emitted_at": 1602637589, "data": { "fieldWithCamelCase" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_edge_case_field_names", "emitted_at": 1602637589, "data": { "field_with_underscore" : "one" }}}
{"type": "RECORD", "record": {"stream": "streamWithCamelCase", "emitted_at": 1602637589000, "data": { "data" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_underscores", "emitted_at": 1602637589000, "data": { "data" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_edge_case_field_names", "emitted_at": 1602637589000, "data": { "fieldWithCamelCase" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_edge_case_field_names", "emitted_at": 1602637589000, "data": { "field_with_underscore" : "one" }}}

View File

@@ -1,8 +1,8 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589, "data": { "date": "2020-08-29T00:00:00Z", "NZD": 0.12, "HKD": 2.13}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-29T00:00:00Z", "NZD": 0.12, "HKD": 2.13}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-08-31"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589, "data": { "date": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD": 7.15}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD": 7.15}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-01"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.99, "HKD": 7.99, "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589, "data": { "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.99, "HKD": 7.99, "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}

View File

@@ -8,5 +8,5 @@ COPY build/distributions/${APPLICATION}*.tar ${APPLICATION}.tar
RUN tar xf ${APPLICATION}.tar --strip-components=1
LABEL io.airbyte.version=0.1.1
LABEL io.airbyte.version=0.1.2
LABEL io.airbyte.name=airbyte/destination-bigquery

View File

@@ -38,6 +38,7 @@ import com.google.cloud.bigquery.JobInfo;
import com.google.cloud.bigquery.JobInfo.CreateDisposition;
import com.google.cloud.bigquery.JobInfo.WriteDisposition;
import com.google.cloud.bigquery.QueryJobConfiguration;
import com.google.cloud.bigquery.QueryParameterValue;
import com.google.cloud.bigquery.StandardSQLTypeName;
import com.google.cloud.bigquery.StandardTableDefinition;
import com.google.cloud.bigquery.TableDataWriteChannel;
@@ -68,6 +69,7 @@ import java.time.Instant;
import java.util.HashMap;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -266,10 +268,16 @@ public class BigQueryDestination implements Destination {
String.format("Message contained record from a stream that was not in the catalog. \ncatalog: %s , \nmessage: %s",
Jsons.serialize(catalog), Jsons.serialize(message)));
}
// Bigquery represents TIMESTAMP to the microsecond precision, so we convert to microseconds then
// use BQ helpers to string-format correctly.
long emittedAtMicroseconds = TimeUnit.MICROSECONDS.convert(message.getRecord().getEmittedAt(), TimeUnit.MILLISECONDS);
String formattedEmittedAt = QueryParameterValue.timestamp(emittedAtMicroseconds).getValue();
final JsonNode data = Jsons.jsonNode(ImmutableMap.of(
COLUMN_AB_ID, UUID.randomUUID().toString(),
COLUMN_DATA, Jsons.serialize(message.getRecord().getData()),
COLUMN_EMITTED_AT, message.getRecord().getEmittedAt()));
COLUMN_EMITTED_AT, formattedEmittedAt));
try {
writeConfigs.get(message.getRecord().getStream()).getWriter()
.write(ByteBuffer.wrap((Jsons.serialize(data) + "\n").getBytes(Charsets.UTF_8)));

View File

@@ -30,6 +30,7 @@ import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.integrations.standardtest.destination.TestDestination;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
@@ -40,7 +41,7 @@ public class PostgresIntegrationTest extends TestDestination {
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
private static final String COLUMN_NAME = "data";
private static final String RAW_DATA_COLUMN = "data";
private PostgreSQLContainer<?> db;
@Override
@@ -74,14 +75,10 @@ public class PostgresIntegrationTest extends TestDestination {
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName) throws Exception {
return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(), db.getJdbcUrl()).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM \"%s\" ORDER BY emitted_at ASC;", NamingHelper.getRawTableName(streamName)))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)
.map(r -> Jsons.deserialize(r.get(COLUMN_NAME).asText()))
.collect(Collectors.toList()));
return retrieveRecordsFromTable(NamingHelper.getRawTableName(streamName))
.stream()
.map(r -> Jsons.deserialize(r.get(RAW_DATA_COLUMN).asText()))
.collect(Collectors.toList());
}
@Override
@@ -92,10 +89,14 @@ public class PostgresIntegrationTest extends TestDestination {
@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv env, String streamName)
throws Exception {
return retrieveRecordsFromTable(streamName);
}
private List<JsonNode> retrieveRecordsFromTable(String tableName) throws SQLException {
return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(),
db.getJdbcUrl()).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM \"%s\" ORDER BY emitted_at ASC;", streamName))
.fetch(String.format("SELECT * FROM \"%s\" ORDER BY emitted_at ASC;", tableName))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)