
Naming conventions managed in destinations (#1060)

Handled field and table identifier conversions as necessary in order to write successfully to destinations.
Christophe Duong
2020-11-25 18:53:23 +01:00
committed by GitHub
parent 45c5cc0f84
commit 206d3cbea8
38 changed files with 850 additions and 244 deletions

View File

@@ -30,6 +30,14 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
public interface Destination extends Integration {
/**
* Return which naming conventions this destination uses to handle invalid characters
* in identifiers.
*
* @return SQLNamingResolvable conventions used when creating tables.
*/
SQLNamingResolvable getNamingResolver();
/**
* Return a consumer that writes messages to the destination.
*

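To illustrate the new hook, here is a minimal, hedged sketch (not part of the commit) of how a destination's write path can derive table names from the resolver it exposes. The helper class and method names below are hypothetical; only types defined elsewhere in this diff are used.

import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.SQLNamingResolvable;

// Hypothetical, compile-only sketch mirroring the pattern the BigQuery and Postgres
// destinations in this diff follow when preparing raw/tmp tables for each stream.
public class DestinationNamingSketch {

  static String rawTableFor(Destination destination, String streamName) {
    final SQLNamingResolvable naming = destination.getNamingResolver();
    return naming.getRawTableName(streamName);
  }

  static String tmpTableFor(Destination destination, String streamName) {
    return destination.getNamingResolver().getTmpTableName(streamName);
  }

}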
View File

@@ -0,0 +1,52 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
public class ExtendedSQLNaming extends StandardSQLNaming {
@Override
protected String convertStreamName(String input) {
if (useExtendedIdentifiers(input)) {
return "\"" + input + "\"";
} else {
return applyDefaultCase(input);
}
}
protected String applyDefaultCase(String input) {
return input;
}
protected boolean useExtendedIdentifiers(String input) {
boolean result = false;
if (input.matches("[^\\p{Alpha}_].*")) {
result = true;
} else if (input.matches(".*[^\\p{Alnum}_].*")) {
result = true;
}
return result;
}
}

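A hedged sketch (not in the commit) of what ExtendedSQLNaming does in practice; the expected outputs in the comments are taken from NamingResolverTest further down in this diff, and the sketch class name is hypothetical.

import io.airbyte.integrations.base.ExtendedSQLNaming;

public class ExtendedNamingSketch {

  public static void main(String[] args) {
    final ExtendedSQLNaming naming = new ExtendedSQLNaming();
    // Valid identifiers pass through unchanged.
    System.out.println(naming.getIdentifier("identifier_name"));    // identifier_name
    // A leading digit or any special character triggers extended (double-quoted) identifiers.
    System.out.println(naming.getIdentifier("123identifier_name")); // "123identifier_name"
    System.out.println(naming.getIdentifier("identifier name"));    // "identifier name"
  }

}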
View File

@@ -24,10 +24,11 @@
package io.airbyte.integrations.base;
public class NamingHelper {
public class RedshiftSQLNaming extends ExtendedSQLNaming {
public static String getRawTableName(String streamName) {
return streamName + "_raw";
@Override
protected String convertStreamName(String input) {
return super.convertStreamName(input).toLowerCase();
}
}

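The Redshift override above only adds lower-casing on top of the extended quoting. A hedged, self-contained sketch of that behavior using an inline subclass equivalent to RedshiftSQLNaming (the real class's package is not shown unambiguously in this hunk, so it is not imported here):

import io.airbyte.integrations.base.ExtendedSQLNaming;

public class LowercaseNamingSketch {

  // Same override as RedshiftSQLNaming above.
  static class LowercaseNaming extends ExtendedSQLNaming {
    @Override
    protected String convertStreamName(String input) {
      return super.convertStreamName(input).toLowerCase();
    }
  }

  public static void main(String[] args) {
    final LowercaseNaming naming = new LowercaseNaming();
    System.out.println(naming.getIdentifier("IDENTIFIER_NAME")); // identifier_name
    System.out.println(naming.getIdentifier("Identifier Name")); // "identifier name" (quoted, then lower-cased)
  }

}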
View File

@@ -0,0 +1,57 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
public interface SQLNamingResolvable {
/**
* Handle Naming Conversions of an input name to output a valid identifier name for the desired SQL
* dialect.
*
* @param name of the identifier to check proper naming conventions
* @return modified name with invalid characters replaced by '_' and adapted for the chosen SQL
* dialect.
*/
String getIdentifier(String name);
/**
* Same as getIdentifier but also returns the name of the table for storing raw data
*
* @param name of the identifier to check proper naming conventions
* @return modified name with invalid characters replaced by '_' and adapted for the chosen SQL
* dialect.
*/
String getRawTableName(String name);
/**
* Same as getIdentifier but also returns the name of the table for storing tmp data
*
* @param name of the identifier to check proper naming conventions
* @return modified name with invalid characters replaced by '_' and adapted for the chosen SQL
* dialect.
*/
String getTmpTableName(String name);
}

View File

@@ -0,0 +1,59 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import java.text.Normalizer;
import java.time.Instant;
public class StandardSQLNaming implements SQLNamingResolvable {
@Override
public String getIdentifier(String name) {
return convertStreamName(name);
}
@Override
public String getRawTableName(String streamName) {
return convertStreamName(streamName + "_raw");
}
@Override
public String getTmpTableName(String streamName) {
return convertStreamName(streamName + "_" + Instant.now().toEpochMilli());
}
protected String convertStreamName(String input) {
final String value = Normalizer.normalize(input, Normalizer.Form.NFKD);
return value
.replaceAll("\\p{M}", "")
.replaceAll("\\s+", "_")
.replaceAll(getNonValidCharacterPattern(), "_");
}
protected String getNonValidCharacterPattern() {
return "[^\\p{Alnum}_]";
}
}

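Taken together with the interface above, a hedged usage sketch (not part of the commit) showing the three methods on one stream name; the values in the comments follow from convertStreamName above and from NamingResolverTest below, except the timestamp suffix, which varies per run. The class and variable names are illustrative only.

import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.integrations.base.StandardSQLNaming;

public class StandardNamingSketch {

  public static void main(String[] args) {
    // In a real connector this resolver comes from Destination#getNamingResolver().
    final SQLNamingResolvable resolver = new StandardSQLNaming();
    final String streamName = "identifiêr name"; // hypothetical stream name with accent and space

    System.out.println(resolver.getIdentifier(streamName));   // identifier_name
    System.out.println(resolver.getRawTableName(streamName)); // identifier_name_raw
    System.out.println(resolver.getTmpTableName(streamName)); // identifier_name_<epoch millis>
  }

}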
View File

@@ -0,0 +1,78 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.base;
import static org.junit.jupiter.api.Assertions.assertEquals;
import org.junit.jupiter.api.Test;
class NamingResolverTest {
@Test
void testStandardSQLNaming() {
final SQLNamingResolvable namingResolver = new StandardSQLNaming();
assertEquals("identifier_name", namingResolver.getIdentifier("identifier_name"));
assertEquals("iDenTiFieR_name", namingResolver.getIdentifier("iDenTiFieR_name"));
assertEquals("__identifier_name", namingResolver.getIdentifier("__identifier_name"));
assertEquals("IDENTIFIER_NAME", namingResolver.getIdentifier("IDENTIFIER_NAME"));
assertEquals("123identifier_name", namingResolver.getIdentifier("123identifier_name"));
assertEquals("i0d0e0n0t0i0f0i0e0r0n0a0m0e", namingResolver.getIdentifier("i0d0e0n0t0i0f0i0e0r0n0a0m0e"));
assertEquals("_identifier_name", namingResolver.getIdentifier(",identifier+name"));
assertEquals("identifier_name", namingResolver.getIdentifier("identifiêr name"));
assertEquals("a_unicode_name__", namingResolver.getIdentifier("a_unicode_name_文"));
assertEquals("identifier__name__", namingResolver.getIdentifier("identifier__name__"));
assertEquals("identifier_name_weee", namingResolver.getIdentifier("identifier-name.weee"));
assertEquals("_identifier_name_", namingResolver.getIdentifier("\"identifier name\""));
assertEquals("identifier_name", namingResolver.getIdentifier("identifier name"));
assertEquals("identifier_", namingResolver.getIdentifier("identifier%"));
assertEquals("_identifier_", namingResolver.getIdentifier("`identifier`"));
assertEquals("identifier_name_raw", namingResolver.getRawTableName("identifier_name"));
}
@Test
void testExtendedSQLNaming() {
final SQLNamingResolvable namingResolver = new ExtendedSQLNaming();
assertEquals("identifier_name", namingResolver.getIdentifier("identifier_name"));
assertEquals("iDenTiFieR_name", namingResolver.getIdentifier("iDenTiFieR_name"));
assertEquals("__identifier_name", namingResolver.getIdentifier("__identifier_name"));
assertEquals("IDENTIFIER_NAME", namingResolver.getIdentifier("IDENTIFIER_NAME"));
assertEquals("\"123identifier_name\"", namingResolver.getIdentifier("123identifier_name"));
assertEquals("i0d0e0n0t0i0f0i0e0r0n0a0m0e", namingResolver.getIdentifier("i0d0e0n0t0i0f0i0e0r0n0a0m0e"));
assertEquals("\",identifier+name\"", namingResolver.getIdentifier(",identifier+name"));
assertEquals("\"identifiêr name\"", namingResolver.getIdentifier("identifiêr name"));
assertEquals("\"a_unicode_name_文\"", namingResolver.getIdentifier("a_unicode_name_文"));
assertEquals("identifier__name__", namingResolver.getIdentifier("identifier__name__"));
assertEquals("\"identifier-name.weee\"", namingResolver.getIdentifier("identifier-name.weee"));
assertEquals("\"\"identifier name\"\"", namingResolver.getIdentifier("\"identifier name\""));
assertEquals("\"identifier name\"", namingResolver.getIdentifier("identifier name"));
assertEquals("\"identifier%\"", namingResolver.getIdentifier("identifier%"));
assertEquals("\"`identifier`\"", namingResolver.getIdentifier("`identifier`"));
assertEquals("identifier_name_raw", namingResolver.getRawTableName("identifier_name"));
assertEquals("\"identifiêr name_raw\"", namingResolver.getRawTableName("identifiêr name"));
}
}

View File

@@ -0,0 +1,4 @@
build/
logs/
dbt-project-template/models/generated/
dbt_modules/

View File

@@ -1,4 +0,0 @@
build/
logs/
models/generated/
dbt_modules/

View File

@@ -1,7 +1,7 @@
# Name your package! Package names should contain only lowercase characters
# and underscores. A good package name should reflect your organization's
# name or the intended use of these models
name: 'airbyte'
name: 'airbyte_utils'
version: '1.0'
config-version: 2
@@ -19,12 +19,14 @@ test-paths: ["tests"]
data-paths: ["data"]
macro-paths: ["macros"]
target-path: "build" # directory which will store compiled SQL files
target-path: "../build" # directory which will store compiled SQL files
log-path: "../logs" # directory which will store DBT logs
modules-path: "../dbt_modules" # directory which will store external DBT dependencies
clean-targets: # directories to be removed by `dbt clean`
- "build"
- "dbt_modules"
# https://docs.getdbt.com/reference/project-configs/quoting/
quoting:
database: true
schema: true
@@ -34,5 +36,7 @@ quoting:
# Using these configurations, you can enable or disable models, change how they
# are materialized, and more!
models:
airbyte:
+materialized: table
+materialized: table
vars:
dbt_utils_dispatch_list: ['airbyte_utils']

View File

@@ -6,28 +6,9 @@
#}
{% macro concat(fields) -%}
{{ adapter.dispatch('concat')(fields) }}
{{ adapter.dispatch('concat', packages = ['airbyte_utils', 'dbt_utils'])(fields) }}
{%- endmacro %}
{% macro default__concat(fields) -%}
concat({{ fields|join(', ') }})
{%- endmacro %}
{% macro alternative_concat(fields) %}
{{ fields|join(' || ') }}
{% endmacro %}
{% macro postgres__concat(fields) %}
{{ dbt_utils.alternative_concat(fields) }}
{% endmacro %}
{% macro redshift__concat(fields) %}
{{ dbt_utils.alternative_concat(fields) }}
{% endmacro %}
{% macro snowflake__concat(fields) %}
{{ dbt_utils.alternative_concat(fields) }}
{% endmacro %}

View File

@@ -1,51 +0,0 @@
{#
Overriding the following macro from dbt-utils:
https://github.com/fishtown-analytics/dbt-utils/blob/0.6.2/macros/sql/surrogate_key.sql
To implement our own version of concat
Because on postgres, we cannot pass more than 100 arguments to a function
#}
{%- macro surrogate_key(field_list) -%}
{%- if varargs|length >= 1 or field_list is string %}
{%- set error_message = '
Warning: the `surrogate_key` macro now takes a single list argument instead of \
multiple string arguments. Support for multiple string arguments will be \
deprecated in a future release of dbt-utils. The {}.{} model triggered this warning. \
'.format(model.package_name, model.name) -%}
{%- do exceptions.warn(error_message) -%}
{# first argument is not included in varargs, so add first element to field_list_xf #}
{%- set field_list_xf = [field_list] -%}
{%- for field in varargs %}
{%- set _ = field_list_xf.append(field) -%}
{%- endfor -%}
{%- else -%}
{# if using list, just set field_list_xf as field_list #}
{%- set field_list_xf = field_list -%}
{%- endif -%}
{%- set fields = [] -%}
{%- for field in field_list_xf -%}
{%- set _ = fields.append(
"coalesce(cast(" ~ field ~ " as " ~ dbt_utils.type_string() ~ "), '')"
) -%}
{%- if not loop.last %}
{%- set _ = fields.append("'-'") -%}
{%- endif -%}
{%- endfor -%}
{{dbt_utils.hash(concat(fields))}}
{%- endmacro -%}

View File

@@ -43,7 +43,7 @@ function main() {
run)
cp -r /airbyte/normalization_code/dbt-template/* $PROJECT_DIR
transform-config --config "$CONFIG_FILE" --integration-type "$INTEGRATION_TYPE" --out $PROJECT_DIR
transform-catalog --profile-config-dir $PROJECT_DIR --catalog "$CATALOG_FILE" --out $PROJECT_DIR/models/generated/ --json-column data
transform-catalog --integration-type "$INTEGRATION_TYPE" --profile-config-dir $PROJECT_DIR --catalog "$CATALOG_FILE" --out $PROJECT_DIR/models/generated/ --json-column data
dbt deps --profiles-dir $PROJECT_DIR --project-dir $PROJECT_DIR
dbt run --profiles-dir $PROJECT_DIR --project-dir $PROJECT_DIR
;;

View File

@@ -25,6 +25,8 @@ SOFTWARE.
import argparse
import json
import os
import unicodedata as ud
from re import match, sub
from typing import List, Optional, Set, Tuple, Union
import yaml
@@ -50,6 +52,7 @@ python3 main_dev_transform_catalog.py \
def parse(self, args) -> None:
parser = argparse.ArgumentParser(add_help=False)
parser.add_argument("--integration-type", type=str, required=True, help="type of integration dialect to use")
parser.add_argument("--profile-config-dir", type=str, required=True, help="path to directory containing DBT profiles.yml")
parser.add_argument("--catalog", nargs="+", type=str, required=True, help="path to Catalog (JSON Schema) file")
parser.add_argument("--out", type=str, required=True, help="path to output generated DBT Models to")
@@ -57,6 +60,7 @@ python3 main_dev_transform_catalog.py \
parsed_args = parser.parse_args(args)
profiles_yml = read_profiles_yml(parsed_args.profile_config_dir)
self.config = {
"integration_type": parsed_args.integration_type,
"schema": extract_schema(profiles_yml),
"catalog": parsed_args.catalog,
"output_path": parsed_args.out,
@@ -65,15 +69,18 @@ python3 main_dev_transform_catalog.py \
def process_catalog(self) -> None:
source_tables: set = set()
integration_type = self.config["integration_type"]
schema = self.config["schema"]
output = self.config["output_path"]
for catalog_file in self.config["catalog"]:
print(f"Processing {catalog_file}...")
catalog = read_json_catalog(catalog_file)
result, tables = generate_dbt_model(catalog=catalog, json_col=self.config["json_column"], schema=schema)
result, tables = generate_dbt_model(
integration_type=integration_type, catalog=catalog, json_col=self.config["json_column"], schema=schema
)
self.output_sql_models(output, result)
source_tables = source_tables.union(tables)
self.write_yaml_sources(output, schema, source_tables)
self.write_yaml_sources(output, schema, source_tables, integration_type)
@staticmethod
def output_sql_models(output: str, result: dict) -> None:
@@ -86,25 +93,30 @@ python3 main_dev_transform_catalog.py \
f.write(sql)
@staticmethod
def write_yaml_sources(output: str, schema: str, sources: set) -> None:
tables = [{"name": source} for source in sources]
def write_yaml_sources(output: str, schema: str, sources: set, integration_type: str) -> None:
quoted_schema = schema[0] == '"'
tables = [
{
"name": source,
"quoting": {"identifier": True},
}
for source in sources
if table_name(source, integration_type)[0] == '"'
] + [{"name": source} for source in sources if table_name(source, integration_type)[0] != '"']
source_config = {
"version": 2,
"sources": [
{
"name": schema,
"tables": tables,
"quoting": {
"database": True,
"schema": True,
"identifier": True,
"schema": quoted_schema,
"identifier": False,
},
"tables": tables,
},
],
}
# Quoting options are hardcoded and passed down to the sources instead of
# inheriting them from dbt_project.yml (does not work well for some reason)
# Apparently, Snowflake needs this quoting configuration to work properly...
source_path = os.path.join(output, "sources.yml")
if not os.path.exists(source_path):
with open(source_path, "w") as fh:
@@ -163,7 +175,43 @@ def jinja_call(command: str) -> str:
return "{{ " + command + " }}"
def json_extract_base_property(path: List[str], json_col: str, name: str, definition: dict) -> Optional[str]:
def strip_accents(s):
return "".join(c for c in ud.normalize("NFD", s) if ud.category(c) != "Mn")
def resolve_identifier(input_name: str, integration_type: str) -> str:
if integration_type == "bigquery":
input_name = strip_accents(input_name)
input_name = sub(r"\s+", "_", input_name)
return sub(r"[^a-zA-Z0-9_]", "_", input_name)
else:
return input_name
def table_name(input_name: str, integration_type) -> str:
if integration_type == "bigquery":
return resolve_identifier(input_name, integration_type)
elif match("[^A-Za-z_]", input_name[0]) or match(".*[^A-Za-z0-9_].*", input_name):
return '"' + input_name + '"'
else:
return input_name
def quote(input_name: str, integration_type: str, in_jinja=False) -> str:
if integration_type == "bigquery":
input_name = resolve_identifier(input_name, integration_type)
if match("[^A-Za-z_]", input_name[0]) or match(".*[^A-Za-z0-9_].*", input_name):
result = f"adapter.quote('{input_name}')"
elif in_jinja:
result = f"'{input_name}'"
else:
return input_name
if not in_jinja:
return jinja_call(result)
return result
def json_extract_base_property(path: List[str], json_col: str, name: str, definition: dict, integration_type: str) -> Optional[str]:
current = path + [name]
if "type" not in definition:
return None
@@ -171,30 +219,32 @@ def json_extract_base_property(path: List[str], json_col: str, name: str, defini
return "cast({} as {}) as {}".format(
jinja_call(f"json_extract_scalar('{json_col}', {current})"),
jinja_call("dbt_utils.type_string()"),
jinja_call(f"adapter.quote_as_configured('{name}', 'identifier')"),
quote(name, integration_type),
)
elif is_integer(definition["type"]):
return "cast({} as {}) as {}".format(
jinja_call(f"json_extract_scalar('{json_col}', {current})"),
jinja_call("dbt_utils.type_int()"),
jinja_call(f"adapter.quote_as_configured('{name}', 'identifier')"),
quote(name, integration_type),
)
elif is_number(definition["type"]):
return "cast({} as {}) as {}".format(
jinja_call(f"json_extract_scalar('{json_col}', {current})"),
jinja_call("dbt_utils.type_float()"),
jinja_call(f"adapter.quote_as_configured('{name}', 'identifier')"),
quote(name, integration_type),
)
elif is_boolean(definition["type"]):
return "cast({} as boolean) as {}".format(
jinja_call(f"json_extract_scalar('{json_col}', {current})"),
jinja_call(f"adapter.quote_as_configured('{name}', 'identifier')"),
quote(name, integration_type),
)
else:
return None
def json_extract_nested_property(path: List[str], json_col: str, name: str, definition: dict) -> Union[Tuple[None, None], Tuple[str, str]]:
def json_extract_nested_property(
path: List[str], json_col: str, name: str, definition: dict, integration_type: str
) -> Union[Tuple[None, None], Tuple[str, str]]:
current = path + [name]
if definition is None or "type" not in definition:
return None, None
@@ -202,17 +252,15 @@ def json_extract_nested_property(path: List[str], json_col: str, name: str, defi
return (
"{} as {}".format(
jinja_call(f"json_extract_array('{json_col}', {current})"),
jinja_call(f"adapter.quote_as_configured('{name}', 'identifier')"),
),
"cross join {} as {}".format(
jinja_call(f"unnest('{name}')"), jinja_call(f"adapter.quote_as_configured('{name}', 'identifier')")
quote(name, integration_type),
),
"cross join {} as {}".format(jinja_call(f"unnest('{name}')"), quote(name, integration_type)),
)
elif is_object(definition["type"]):
return (
"{} as {}".format(
jinja_call(f"json_extract('{json_col}', {current})"),
jinja_call(f"adapter.quote_as_configured('{name}', 'identifier')"),
quote(name, integration_type),
),
"",
)
@@ -220,47 +268,51 @@ def json_extract_nested_property(path: List[str], json_col: str, name: str, defi
return None, None
def select_table(table: str, columns="*"):
return f"""\nselect {columns} from {jinja_call(f"adapter.quote_as_configured('{table}', 'identifier')")}"""
def select_table(table: str, integration_type: str, columns="*"):
return f"""\nselect {columns} from {table_name(table, integration_type)}"""
def extract_node_properties(path: List[str], json_col: str, properties: dict) -> dict:
def extract_node_properties(path: List[str], json_col: str, properties: dict, integration_type: str) -> dict:
result = {}
if properties:
for field in properties.keys():
sql_field = json_extract_base_property(path=path, json_col=json_col, name=field, definition=properties[field])
sql_field = json_extract_base_property(
path=path, json_col=json_col, name=field, definition=properties[field], integration_type=integration_type
)
if sql_field:
result[field] = sql_field
return result
def find_properties_object(path: List[str], field: str, properties) -> dict:
def find_properties_object(path: List[str], field: str, properties, integration_type: str) -> dict:
if isinstance(properties, str) or isinstance(properties, int):
return {}
else:
if "items" in properties:
return find_properties_object(path, field, properties["items"])
return find_properties_object(path, field, properties["items"], integration_type=integration_type)
elif "properties" in properties:
# we found a properties object
return {field: properties["properties"]}
elif "type" in properties and json_extract_base_property(path=path, json_col="", name="", definition=properties):
elif "type" in properties and json_extract_base_property(
path=path, json_col="", name="", definition=properties, integration_type=integration_type
):
# we found a basic type
return {field: None}
elif isinstance(properties, dict):
for key in properties.keys():
if not json_extract_base_property(path, "", key, properties[key]):
child = find_properties_object(path, key, properties[key])
if not json_extract_base_property(path, "", key, properties[key], integration_type=integration_type):
child = find_properties_object(path, key, properties[key], integration_type=integration_type)
if child:
return child
elif isinstance(properties, list):
for item in properties:
child = find_properties_object(path=path, field=field, properties=item)
child = find_properties_object(path=path, field=field, properties=item, integration_type=integration_type)
if child:
return child
return {}
def extract_nested_properties(path: List[str], field: str, properties: dict) -> dict:
def extract_nested_properties(path: List[str], field: str, properties: dict, integration_type: str) -> dict:
result = {}
if properties:
for key in properties.keys():
@@ -268,7 +320,9 @@ def extract_nested_properties(path: List[str], field: str, properties: dict) ->
if combining:
# skip combining schemas
for combo in combining:
found = find_properties_object(path=path + [field, key], field=key, properties=properties[key][combo])
found = find_properties_object(
path=path + [field, key], field=key, properties=properties[key][combo], integration_type=integration_type
)
result.update(found)
elif "type" not in properties[key]:
pass
@@ -277,74 +331,87 @@ def extract_nested_properties(path: List[str], field: str, properties: dict) ->
if combining:
# skip combining schemas
for combo in combining:
found = find_properties_object(path=path + [field, key], field=key, properties=properties[key]["items"][combo])
found = find_properties_object(
path=path + [field, key],
field=key,
properties=properties[key]["items"][combo],
integration_type=integration_type,
)
result.update(found)
else:
found = find_properties_object(path=path + [field, key], field=key, properties=properties[key]["items"])
found = find_properties_object(
path=path + [field, key], field=key, properties=properties[key]["items"], integration_type=integration_type
)
result.update(found)
elif is_object(properties[key]["type"]):
found = find_properties_object(path=path + [field, key], field=key, properties=properties[key])
found = find_properties_object(
path=path + [field, key], field=key, properties=properties[key], integration_type=integration_type
)
result.update(found)
return result
def process_node(
path: List[str], json_col: str, name: str, properties: dict, from_table: str = "", previous="with ", inject_cols=""
path: List[str],
json_col: str,
name: str,
properties: dict,
integration_type: str,
from_table: str = "",
previous="with ",
inject_cols="",
) -> dict:
result = {}
if previous == "with ":
prefix = previous
else:
prefix = previous + ","
node_properties = extract_node_properties(path=path, json_col=json_col, properties=properties)
node_properties = extract_node_properties(path=path, json_col=json_col, properties=properties, integration_type=integration_type)
node_columns = ",\n ".join([sql for sql in node_properties.values()])
hash_node_columns = ",\n ".join([f"adapter.quote_as_configured('{column}', 'identifier')" for column in node_properties.keys()])
# Disable dbt_utils.surrogate_key for own version to fix a bug with Postgres (#913).
# hash_node_columns = jinja_call(f"dbt_utils.surrogate_key([\n {hash_node_columns}\n ])")
# We should re-enable it when our PR to dbt_utils is merged
hash_node_columns = jinja_call(f"surrogate_key([\n {hash_node_columns}\n ])")
hash_id = jinja_call(f"adapter.quote_as_configured('_{name}_hashid', 'identifier')")
foreign_hash_id = jinja_call(f"adapter.quote_as_configured('_{name}_foreign_hashid', 'identifier')")
emitted_col = "{},\n {} as {}".format(
jinja_call("adapter.quote_as_configured('emitted_at', 'identifier')"),
hash_node_columns = ",\n ".join([quote(column, integration_type, in_jinja=True) for column in node_properties.keys()])
hash_node_columns = jinja_call(f"dbt_utils.surrogate_key([\n {hash_node_columns}\n ])")
hash_id = quote(f"_{name}_hashid", integration_type)
foreign_hash_id = quote(f"_{name}_foreign_hashid", integration_type)
emitted_col = "emitted_at,\n {} as normalized_at".format(
jinja_call("dbt_utils.current_timestamp_in_utc()"),
jinja_call("adapter.quote_as_configured('normalized_at', 'identifier')"),
)
node_sql = f"""{prefix}
{jinja_call(f"adapter.quote_as_configured('{name}_node', 'identifier')")} as (
{table_name(f"{name}_node", integration_type)} as (
select {inject_cols}
{emitted_col},
{node_columns}
from {from_table}
),
{jinja_call(f"adapter.quote_as_configured('{name}_with_id', 'identifier')")} as (
{table_name(f"{name}_with_id", integration_type)} as (
select
*,
{hash_node_columns} as {hash_id}
from {jinja_call(f"adapter.quote_as_configured('{name}_node', 'identifier')")}
from {table_name(f"{name}_node", integration_type)}
)"""
# SQL Query for current node's basic properties
result[name] = node_sql + select_table(f"{name}_with_id")
result[resolve_identifier(name, integration_type)] = node_sql + select_table(f"{name}_with_id", integration_type)
children_columns = extract_nested_properties(path=path, field=name, properties=properties)
children_columns = extract_nested_properties(path=path, field=name, properties=properties, integration_type=integration_type)
if children_columns:
for col in children_columns.keys():
child_col, join_child_table = json_extract_nested_property(path=path, json_col=json_col, name=col, definition=properties[col])
column_name = jinja_call(f"adapter.quote_as_configured('{col}', 'identifier')")
child_col, join_child_table = json_extract_nested_property(
path=path, json_col=json_col, name=col, definition=properties[col], integration_type=integration_type
)
column_name = quote(col, integration_type)
child_sql = f"""{prefix}
{jinja_call(f"adapter.quote_as_configured('{name}_node', 'identifier')")} as (
{table_name(f"{name}_node", integration_type)} as (
select
{emitted_col},
{child_col},
{node_columns}
from {from_table}
),
{jinja_call(f"adapter.quote_as_configured('{name}_with_id', 'identifier')")} as (
{table_name(f"{name}_with_id", integration_type)} as (
select
{hash_node_columns} as {hash_id},
{column_name}
from {jinja_call(f"adapter.quote_as_configured('{name}_node', 'identifier')")}
from {table_name(f"{name}_node", integration_type)}
{join_child_table}
)"""
if children_columns[col]:
@@ -360,8 +427,9 @@ def process_node(
result.update(children)
else:
# SQL Query for current node's basic properties
result[f"{name}_{col}"] = child_sql + select_table(
result[resolve_identifier(f"{name}_{col}", integration_type)] = child_sql + select_table(
f"{name}_with_id",
integration_type,
columns=f"""
{hash_id} as {foreign_hash_id},
{col}
@@ -370,7 +438,7 @@ def process_node(
return result
def generate_dbt_model(catalog: dict, json_col: str, schema: str) -> Tuple[dict, Set[Union[str]]]:
def generate_dbt_model(integration_type: str, catalog: dict, json_col: str, schema: str) -> Tuple[dict, Set[Union[str]]]:
result = {}
source_tables = set()
for configuredStream in catalog["streams"]:
@@ -390,9 +458,13 @@ def generate_dbt_model(catalog: dict, json_col: str, schema: str) -> Tuple[dict,
# TODO Replace {name}_raw by an argument like we do for the json_blob column
# This would enable destination to freely choose where to store intermediate data before notifying
# normalization step
table = jinja_call(f"source('{schema}', '{name}_raw')")
result.update(process_node(path=[], json_col=json_col, name=name, properties=properties, from_table=table))
source_tables.add(f"{name}_raw")
table = jinja_call(
f"source('{resolve_identifier(schema, integration_type)}', '{resolve_identifier(name + '_raw', integration_type)}')"
)
result.update(
process_node(path=[], json_col=json_col, name=name, properties=properties, from_table=table, integration_type=integration_type)
)
source_tables.add(resolve_identifier(name + "_raw", integration_type))
return result, source_tables

View File

@@ -64,7 +64,9 @@ import io.airbyte.workers.protocols.airbyte.DefaultAirbyteDestination;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.List;
import java.util.Map.Entry;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.Stream;
@@ -179,6 +181,12 @@ public abstract class TestDestination {
*/
protected abstract void tearDown(TestDestinationEnv testEnv) throws Exception;
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
result.add(identifier);
return result;
}
@BeforeEach
void setUpInternal() throws Exception {
Path testDir = Path.of("/tmp/airbyte_tests/");
@@ -311,7 +319,9 @@ public abstract class TestDestination {
.map(record -> Jsons.deserialize(record, AirbyteMessage.class)).collect(Collectors.toList());
runSync(getConfigWithBasicNormalization(), messages, configuredCatalog);
LOGGER.info("Comparing retrieveRecordsForCatalog for {} and {}", messagesFilename, catalogFilename);
assertSameMessages(messages, retrieveRecordsForCatalog(catalog));
LOGGER.info("Comparing retrieveNormalizedRecordsForCatalog for {} and {}", messagesFilename, catalogFilename);
assertSameMessages(messages, retrieveNormalizedRecordsForCatalog(catalog), true);
}
@@ -411,27 +421,52 @@ public abstract class TestDestination {
// ignores emitted at.
private void assertSameMessages(List<AirbyteMessage> expected, List<AirbyteRecordMessage> actual, boolean pruneAirbyteInternalFields) {
final List<AirbyteRecordMessage> expectedProcessed = expected.stream()
final List<JsonNode> expectedProcessed = expected.stream()
.filter(message -> message.getType() == AirbyteMessage.Type.RECORD)
.map(AirbyteMessage::getRecord)
.peek(recordMessage -> recordMessage.setEmittedAt(null))
.map(recordMessage -> pruneAirbyteInternalFields ? this.safePrune(recordMessage) : recordMessage)
.map(recordMessage -> recordMessage.getData())
.collect(Collectors.toList());
final List<AirbyteRecordMessage> actualProcessed = actual.stream()
final List<JsonNode> actualProcessed = actual.stream()
.map(recordMessage -> pruneAirbyteInternalFields ? this.safePrune(recordMessage) : recordMessage)
.map(recordMessage -> recordMessage.getData())
.collect(Collectors.toList());
assertSameData(expectedProcessed, actualProcessed);
}
private void assertSameData(List<AirbyteRecordMessage> expected, List<AirbyteRecordMessage> actual) {
LOGGER.info("expected: {}", expected);
LOGGER.info("actual: {}", actual);
// we want to ignore order in this comparison.
private void assertSameData(List<JsonNode> expected, List<JsonNode> actual) {
LOGGER.info("Expected data {}", expected);
LOGGER.info("Actual data {}", actual);
assertEquals(expected.size(), actual.size());
assertTrue(expected.containsAll(actual));
assertTrue(actual.containsAll(expected));
final Iterator<JsonNode> expectedIterator = expected.iterator();
final Iterator<JsonNode> actualIterator = actual.iterator();
while (expectedIterator.hasNext() && actualIterator.hasNext()) {
final JsonNode expectedData = expectedIterator.next();
final JsonNode actualData = actualIterator.next();
final Iterator<Entry<String, JsonNode>> expectedDataIterator = expectedData.fields();
LOGGER.info("Expected row {}", expectedData);
LOGGER.info("Actual row {}", actualData);
assertEquals(expectedData.size(), actualData.size());
while (expectedDataIterator.hasNext()) {
final Entry<String, JsonNode> expectedEntry = expectedDataIterator.next();
final JsonNode expectedValue = expectedEntry.getValue();
JsonNode actualValue = null;
String key = expectedEntry.getKey();
for (String tmpKey : resolveIdentifier(expectedEntry.getKey())) {
actualValue = actualData.get(tmpKey);
if (actualValue != null) {
key = tmpKey;
break;
}
}
LOGGER.info("For {} Expected {} vs Actual {}", key, expectedValue, actualValue);
assertTrue(actualData.has(key));
assertEquals(expectedValue, actualValue);
}
}
}
/**
@@ -469,7 +504,8 @@ public abstract class TestDestination {
// likely did not exist in the original message. the most consistent thing to do is always remove
// the null fields (this choice does decrease our ability to check that normalization creates
// columns even if all the values in that column are null)
if (Sets.newHashSet("emitted_at", "ab_id", "normalized_at").contains(key) || key.matches("^_.*_hashid$") || json.get(key).isNull()) {
if (Sets.newHashSet("emitted_at", "ab_id", "normalized_at", "EMITTED_AT", "AB_ID", "NORMALIZED_AT").contains(key) || key.matches("^_.*_hashid$")
|| json.get(key).isNull()) {
((ObjectNode) json).remove(key);
}
}

View File

@@ -45,6 +45,16 @@
}
}
}
},
{
"name": "stream-with:spécial:character_names",
"json_schema": {
"properties": {
"field_with_spécial_character": {
"type": "string"
}
}
}
}
]
}

View File

@@ -1,4 +1,5 @@
{"type": "RECORD", "record": {"stream": "streamWithCamelCase", "emitted_at": 1602637589000, "data": { "data" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_underscores", "emitted_at": 1602637589000, "data": { "data" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_edge_case_field_names", "emitted_at": 1602637589000, "data": { "fieldWithCamelCase" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_edge_case_field_names", "emitted_at": 1602637589000, "data": { "field_with_underscore" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_underscores", "emitted_at": 1602637589100, "data": { "data" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_edge_case_field_names", "emitted_at": 1602637589200, "data": { "fieldWithCamelCase" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream_with_edge_case_field_names", "emitted_at": 1602637589300, "data": { "field_with_underscore" : "one" }}}
{"type": "RECORD", "record": {"stream": "stream-with:spécial:character_names", "emitted_at": 1602637589400, "data": { "field_with_spécial_character" : "one" }}}

View File

@@ -1,8 +1,8 @@
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-29T00:00:00Z", "NZD": 0.12, "HKD": 2.13}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-08-31"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD": 7.15}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589100, "data": { "date": "2020-08-30T00:00:00Z", "NZD": 1.14, "HKD": 7.15}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-01"}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.99, "HKD": 7.99, "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589000, "data": { "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589200, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589300, "data": { "date": "2020-08-31T00:00:00Z", "NZD": 1.99, "HKD": 7.99, "USD": 10.99}}}
{"type": "RECORD", "record": {"stream": "exchange_rate", "emitted_at": 1602637589400, "data": { "date": "2020-09-01T00:00:00Z", "NZD": 1.14, "HKD": 7.15, "USD": 10.16}}}
{"type": "STATE", "state": { "data": {"start_date": "2020-09-02"}}}

View File

@@ -1,3 +1,3 @@
{"type": "RECORD", "record": { "stream": "customers", "emitted_at": 1602637450, "data": {"id": "cus_I1l0XHrjLYwLR2", "object": "customer", "account_balance": 0, "created": "2020-09-15T16:58:52.000000Z", "currency": null, "default_source": null, "delinquent": false, "description": "Customer 3", "discount": null, "email": "customer3@test.com", "invoice_prefix": "EC156D8F", "livemode": false, "metadata": {}, "shipping": null, "sources": [], "subscriptions": null, "tax_info": null, "tax_info_verification": null, "updated": "2020-09-15T16:58:52.000000Z"}, "time_extracted": "2020-09-15T18:01:23.634272Z"}}
{"type": "RECORD", "record": { "stream": "customers", "emitted_at": 1602637450, "data": {"id": "cus_I1l0INzfeSf2MM", "object": "customer", "account_balance": 0, "created": "2020-09-15T16:58:52.000000Z", "currency": null, "default_source": null, "delinquent": false, "description": "Customer 2", "discount": null, "email": "customer2@test.com", "invoice_prefix": "D4564D22", "livemode": false, "metadata": {}, "shipping": null, "sources": [], "subscriptions": null, "tax_info": null, "tax_info_verification": null, "updated": "2020-09-15T16:58:52.000000Z"}, "time_extracted": "2020-09-15T18:01:23.634272Z"}}
{"type": "RECORD", "record": { "stream": "customers", "emitted_at": 1602637450, "data": {"id": "cus_I1l0cRVFy4ZhwQ", "object": "customer", "account_balance": 0, "created": "2020-09-15T16:58:52.000000Z", "currency": null, "default_source": null, "delinquent": false, "description": "Customer 1", "discount": null, "email": "customer1@test.com", "invoice_prefix": "92A8C396", "livemode": false, "metadata": {}, "shipping": null, "sources": [], "subscriptions": null, "tax_info": null, "tax_info_verification": null, "updated": "2020-09-15T16:58:52.000000Z"}, "time_extracted": "2020-09-15T18:01:23.634272Z"}}
{"type": "RECORD", "record": { "stream": "customers", "emitted_at": 1602637460, "data": {"id": "cus_I1l0INzfeSf2MM", "object": "customer", "account_balance": 0, "created": "2020-09-15T16:58:52.000000Z", "currency": null, "default_source": null, "delinquent": false, "description": "Customer 2", "discount": null, "email": "customer2@test.com", "invoice_prefix": "D4564D22", "livemode": false, "metadata": {}, "shipping": null, "sources": [], "subscriptions": null, "tax_info": null, "tax_info_verification": null, "updated": "2020-09-15T16:58:52.000000Z"}, "time_extracted": "2020-09-15T18:01:23.634272Z"}}
{"type": "RECORD", "record": { "stream": "customers", "emitted_at": 1602637470, "data": {"id": "cus_I1l0cRVFy4ZhwQ", "object": "customer", "account_balance": 0, "created": "2020-09-15T16:58:52.000000Z", "currency": null, "default_source": null, "delinquent": false, "description": "Customer 1", "discount": null, "email": "customer1@test.com", "invoice_prefix": "92A8C396", "livemode": false, "metadata": {}, "shipping": null, "sources": [], "subscriptions": null, "tax_info": null, "tax_info_verification": null, "updated": "2020-09-15T16:58:52.000000Z"}, "time_extracted": "2020-09-15T18:01:23.634272Z"}}

View File

@@ -10,7 +10,7 @@ As a community contributor, you will need access to a GCP project and BigQuery t
1. Click on the `+ Create Service Account` button
1. Fill out a descriptive name/id/description
1. Click the edit icon next to the service account you created on the `IAM` page
1. Add the `BigQuery User` role
1. Add the `BigQuery Data Editor` and `BigQuery User` roles
1. Go back to the `Service Accounts` page and use the actions modal to `Create Key`
1. Download this key as a JSON file
1. Move and rename this file to `secrets/credentials.json`

View File

@@ -30,6 +30,7 @@ import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryException;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.bigquery.CopyJobConfiguration;
import com.google.cloud.bigquery.DatasetInfo;
import com.google.cloud.bigquery.Field;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
@@ -55,7 +56,8 @@ import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.integrations.base.StandardSQLNaming;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -65,9 +67,10 @@ import io.airbyte.protocol.models.ConnectorSpecification;
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.commons.lang3.tuple.ImmutablePair;
@@ -90,6 +93,12 @@ public class BigQueryDestination implements Destination {
Field.of(COLUMN_DATA, StandardSQLTypeName.STRING),
Field.of(COLUMN_EMITTED_AT, StandardSQLTypeName.TIMESTAMP));
private final SQLNamingResolvable namingResolver;
public BigQueryDestination() {
namingResolver = new StandardSQLNaming();
}
@Override
public ConnectorSpecification spec() throws IOException {
// return a jsonschema representation of the spec for the integration.
@@ -101,13 +110,18 @@ public class BigQueryDestination implements Destination {
public AirbyteConnectionStatus check(JsonNode config) {
try {
final String datasetId = config.get(CONFIG_DATASET_ID).asText();
final BigQuery bigquery = getBigQuery(config);
if (!bigquery.getDataset(datasetId).exists()) {
final DatasetInfo datasetInfo = DatasetInfo.newBuilder(datasetId).build();
bigquery.create(datasetInfo);
}
QueryJobConfiguration queryConfig = QueryJobConfiguration
.newBuilder(String.format("SELECT * FROM %s.INFORMATION_SCHEMA.TABLES LIMIT 1;", datasetId))
.setUseLegacySql(false)
.build();
final ImmutablePair<Job, String> result = executeQuery(getBigQuery(config), queryConfig);
final ImmutablePair<Job, String> result = executeQuery(bigquery, queryConfig);
if (result.getLeft() != null) {
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
} else {
@@ -163,6 +177,11 @@ public class BigQueryDestination implements Destination {
}
}
@Override
public SQLNamingResolvable getNamingResolver() {
return namingResolver;
}
/**
* Strategy:
* <p>
@@ -190,23 +209,33 @@ public class BigQueryDestination implements Destination {
final BigQuery bigquery = getBigQuery(config);
Map<String, WriteConfig> writeConfigs = new HashMap<>();
final String datasetId = config.get(CONFIG_DATASET_ID).asText();
Set<String> schemaSet = new HashSet<>();
// create tmp tables if not exist
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String tableName = NamingHelper.getRawTableName(stream.getStream().getName());
final String tmpTableName = stream.getStream().getName() + "_" + Instant.now().toEpochMilli();
createTable(bigquery, datasetId, tmpTableName);
final String streamName = stream.getStream().getName();
final String schemaName = getNamingResolver().getIdentifier(datasetId);
final String tableName = getNamingResolver().getRawTableName(streamName);
final String tmpTableName = getNamingResolver().getTmpTableName(streamName);
if (!schemaSet.contains(schemaName)) {
if (!bigquery.getDataset(schemaName).exists()) {
final DatasetInfo datasetInfo = DatasetInfo.newBuilder(schemaName).build();
bigquery.create(datasetInfo);
}
schemaSet.add(schemaName);
}
createTable(bigquery, schemaName, tmpTableName);
// https://cloud.google.com/bigquery/docs/loading-data-local#loading_data_from_a_local_data_source
final WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration
.newBuilder(TableId.of(datasetId, tmpTableName))
.newBuilder(TableId.of(schemaName, tmpTableName))
.setCreateDisposition(CreateDisposition.CREATE_IF_NEEDED)
.setSchema(SCHEMA)
.setFormatOptions(FormatOptions.json()).build(); // new-line delimited json.
final TableDataWriteChannel writer = bigquery.writer(JobId.of(UUID.randomUUID().toString()), writeChannelConfiguration);
writeConfigs.put(stream.getStream().getName(), new WriteConfig(TableId.of(datasetId, tableName), TableId.of(datasetId, tmpTableName), writer));
writeConfigs.put(stream.getStream().getName(),
new WriteConfig(TableId.of(schemaName, tableName), TableId.of(schemaName, tmpTableName), writer));
}
// write to tmp tables

View File

@@ -43,11 +43,12 @@ import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.integrations.base.StandardSQLNaming;
import io.airbyte.integrations.standardtest.destination.TestDestination;
import java.io.ByteArrayInputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.UUID;
@@ -72,6 +73,7 @@ public class BigQueryIntegrationTest extends TestDestination {
private Dataset dataset;
private boolean tornDown;
private JsonNode config;
private StandardSQLNaming namingResolver = new StandardSQLNaming();
@Override
protected String getImageName() {
@@ -96,23 +98,32 @@ public class BigQueryIntegrationTest extends TestDestination {
@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName) throws Exception {
return retrieveRecordsFromTable(testEnv, streamName);
String tableName = namingResolver.getIdentifier(streamName);
return retrieveRecordsFromTable(testEnv, tableName);
}
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName) throws Exception {
return retrieveRecordsFromTable(env, NamingHelper.getRawTableName(streamName))
return retrieveRecordsFromTable(env, namingResolver.getRawTableName(streamName))
.stream()
.map(node -> node.get("data").asText())
.map(Jsons::deserialize)
.collect(Collectors.toList());
}
@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
result.add(identifier);
result.add(namingResolver.getIdentifier(identifier));
return result;
}
private List<JsonNode> retrieveRecordsFromTable(TestDestinationEnv env, String tableName) throws InterruptedException {
final QueryJobConfiguration queryConfig =
QueryJobConfiguration
.newBuilder(
String.format("SELECT * FROM `%s`.`%s`;", dataset.getDatasetId().getDataset(), tableName))
String.format("SELECT * FROM `%s`.`%s` order by emitted_at asc;", dataset.getDatasetId().getDataset(), tableName))
.setUseLegacySql(false).build();
TableResult queryResults = executeQuery(bigquery, queryConfig).getLeft().getQueryResults();

View File

@@ -44,7 +44,6 @@ import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -210,7 +209,8 @@ class BigQueryDestinationTest {
// @Test
void testWriteSuccess() throws Exception {
final DestinationConsumer<AirbyteMessage> consumer = new BigQueryDestination().write(config, CATALOG);
final BigQueryDestination destination = new BigQueryDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, CATALOG);
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
@@ -219,12 +219,12 @@ class BigQueryDestinationTest {
consumer.accept(MESSAGE_STATE);
consumer.close();
final List<JsonNode> usersActual = retrieveRecords(NamingHelper.getRawTableName(USERS_STREAM_NAME));
final List<JsonNode> usersActual = retrieveRecords(destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME));
final List<JsonNode> expectedUsersJson = Lists.newArrayList(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson.size(), usersActual.size());
assertTrue(expectedUsersJson.containsAll(usersActual) && usersActual.containsAll(expectedUsersJson));
final List<JsonNode> tasksActual = retrieveRecords(NamingHelper.getRawTableName(TASKS_STREAM_NAME));
final List<JsonNode> tasksActual = retrieveRecords(destination.getNamingResolver().getRawTableName(TASKS_STREAM_NAME));
final List<JsonNode> expectedTasksJson = Lists.newArrayList(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson.size(), tasksActual.size());
assertTrue(expectedTasksJson.containsAll(tasksActual) && tasksActual.containsAll(expectedTasksJson));

View File

@@ -30,8 +30,10 @@ import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.ExtendedSQLNaming;
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -62,6 +64,12 @@ public class CsvDestination implements Destination {
static final String COLUMN_EMITTED_AT = "emitted_at"; // we output all data as a blob to a single column.
static final String DESTINATION_PATH_FIELD = "destination_path";
private final SQLNamingResolvable namingResolver;
public CsvDestination() {
namingResolver = new ExtendedSQLNaming();
}
@Override
public ConnectorSpecification spec() throws IOException {
final String resourceString = MoreResources.readResource("spec.json");
@@ -78,6 +86,11 @@ public class CsvDestination implements Destination {
return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
}
@Override
public SQLNamingResolvable getNamingResolver() {
return namingResolver;
}
/**
* @param config - csv destination config.
* @param catalog - schema of the incoming messages.
@@ -93,8 +106,11 @@ public class CsvDestination implements Destination {
final long now = Instant.now().toEpochMilli();
final Map<String, WriteConfig> writeConfigs = new HashMap<>();
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final Path tmpPath = destinationDir.resolve(stream.getStream().getName() + "_" + now + ".csv");
final Path finalPath = destinationDir.resolve(stream.getStream().getName() + ".csv");
final String streamName = stream.getStream().getName();
final String tableName = getNamingResolver().getRawTableName(streamName);
final String tmpTableName = getNamingResolver().getTmpTableName(streamName);
final Path tmpPath = destinationDir.resolve(tmpTableName + ".csv");
final Path finalPath = destinationDir.resolve(tableName + ".csv");
final FileWriter fileWriter = new FileWriter(tmpPath.toFile());
final CSVPrinter printer = new CSVPrinter(fileWriter, CSVFormat.DEFAULT.withHeader(COLUMN_AB_ID, COLUMN_EMITTED_AT, COLUMN_DATA));
writeConfigs.put(stream.getStream().getName(), new WriteConfig(printer, tmpPath, finalPath));

View File

@@ -68,8 +68,8 @@ class CsvDestinationTest {
private static final Path TEST_ROOT = Path.of("/tmp/airbyte_tests");
private static final String USERS_STREAM_NAME = "users";
private static final String TASKS_STREAM_NAME = "tasks";
private static final String USERS_FILE = USERS_STREAM_NAME + ".csv";
private static final String TASKS_FILE = TASKS_STREAM_NAME + ".csv";
private static final String USERS_FILE = USERS_STREAM_NAME + "_raw.csv";
private static final String TASKS_FILE = TASKS_STREAM_NAME + "_raw.csv";
private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build()))

View File

@@ -25,8 +25,8 @@
package io.airbyte.integrations.destination.postgres;
import static org.jooq.impl.DSL.field;
import static org.jooq.impl.DSL.name;
import static org.jooq.impl.DSL.table;
import static org.jooq.impl.DSL.unquotedName;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.base.Charsets;
@@ -40,7 +40,7 @@ import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.FailureTrackingConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -55,7 +55,6 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.SQLException;
import java.time.Duration;
import java.time.Instant;
import java.time.LocalDateTime;
import java.time.OffsetDateTime;
import java.time.ZoneOffset;
@@ -79,6 +78,12 @@ public class PostgresDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(PostgresDestination.class);
static final String COLUMN_NAME = "data";
private final SQLNamingResolvable namingResolver;
public PostgresDestination() {
namingResolver = new PostgresSQLNaming();
}
@Override
public ConnectorSpecification spec() throws IOException {
// return a jsonschema representation of the spec for the integration.
@@ -101,6 +106,11 @@ public class PostgresDestination implements Destination {
}
}
@Override
public SQLNamingResolvable getNamingResolver() {
return namingResolver;
}
/**
* Strategy:
* <p>
@@ -136,10 +146,10 @@ public class PostgresDestination implements Destination {
final Set<String> schemaSet = new HashSet<>();
// create tmp tables if not exist
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String schemaName = getSchemaName(config);
final String streamName = stream.getStream().getName();
final String tableName = NamingHelper.getRawTableName(streamName);
final String tmpTableName = streamName + "_" + Instant.now().toEpochMilli();
final String schemaName = getNamingResolver().getIdentifier(getConfigSchemaName(config));
final String tableName = getNamingResolver().getRawTableName(streamName);
final String tmpTableName = getNamingResolver().getTmpTableName(streamName);
if (!schemaSet.contains(schemaName)) {
database.query(ctx -> ctx.execute(createSchemaQuery(schemaName)));
schemaSet.add(schemaName);
@@ -158,15 +168,15 @@ public class PostgresDestination implements Destination {
}
static String createSchemaQuery(String schemaName) {
return String.format("CREATE SCHEMA IF NOT EXISTS \"%s\";\n", schemaName);
return String.format("CREATE SCHEMA IF NOT EXISTS %s;\n", schemaName);
}
static String createRawTableQuery(String schemaName, String streamName) {
return String.format(
"CREATE TABLE IF NOT EXISTS \"%s\".\"%s\" ( \n"
+ "\"ab_id\" VARCHAR PRIMARY KEY,\n"
+ "\"%s\" JSONB,\n"
+ "\"emitted_at\" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
"CREATE TABLE IF NOT EXISTS %s.%s ( \n"
+ "ab_id VARCHAR PRIMARY KEY,\n"
+ "%s JSONB,\n"
+ "emitted_at TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP\n"
+ ");\n",
schemaName, streamName, COLUMN_NAME);
}
@@ -237,7 +247,7 @@ public class PostgresDestination implements Destination {
String schemaName,
String tmpTableName) {
InsertValuesStep3<Record, String, JSONB, OffsetDateTime> step =
ctx.insertInto(table(name(schemaName, tmpTableName)), field("ab_id", String.class),
ctx.insertInto(table(unquotedName(schemaName, tmpTableName)), field("ab_id", String.class),
field(COLUMN_NAME, JSONB.class), field("emitted_at", OffsetDateTime.class));
for (int i = 0; i < batchSize; i++) {
@@ -296,13 +306,13 @@ public class PostgresDestination implements Destination {
switch (writeConfig.getSyncMode()) {
case FULL_REFRESH -> {
// truncate table if already exist.
query.append(String.format("TRUNCATE TABLE \"%s\".\"%s\";\n", writeConfig.getSchemaName(), writeConfig.getTableName()));
query.append(String.format("TRUNCATE TABLE %s.%s;\n", writeConfig.getSchemaName(), writeConfig.getTableName()));
}
case INCREMENTAL -> {}
default -> throw new IllegalStateException("Unrecognized sync mode: " + writeConfig.getSyncMode());
}
// always copy data from tmp table into "main" table.
query.append(String.format("INSERT INTO \"%s\".\"%s\" SELECT * FROM \"%s\".\"%s\";\n", writeConfig.getSchemaName(),
query.append(String.format("INSERT INTO %s.%s SELECT * FROM %s.%s;\n", writeConfig.getSchemaName(),
writeConfig.getTableName(), writeConfig.getSchemaName(), writeConfig.getTmpTableName()));
}
return ctx.execute(query.toString());
@@ -321,7 +331,7 @@ public class PostgresDestination implements Destination {
for (WriteConfig writeConfig : writeConfigs.values()) {
try {
database.query(
ctx -> ctx.execute(String.format("DROP TABLE IF EXISTS \"%s\".\"%s\";", writeConfig.getSchemaName(), writeConfig.getTmpTableName())));
ctx -> ctx.execute(String.format("DROP TABLE IF EXISTS %s.%s;", writeConfig.getSchemaName(), writeConfig.getTmpTableName())));
} catch (SQLException e) {
throw new RuntimeException(e);
}
@@ -378,7 +388,7 @@ public class PostgresDestination implements Destination {
config.get("database").asText()));
}
private static String getSchemaName(JsonNode config) {
private static String getConfigSchemaName(JsonNode config) {
if (config.has("schema")) {
return config.get("schema").asText();
} else {


@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.postgres;
import io.airbyte.integrations.base.ExtendedSQLNaming;
public class PostgresSQLNaming extends ExtendedSQLNaming {
@Override
protected String applyDefaultCase(String input) {
return input.toLowerCase();
}
}


@@ -28,9 +28,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.integrations.base.ExtendedSQLNaming;
import io.airbyte.integrations.standardtest.destination.TestDestination;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.jooq.JSONFormat;
@@ -43,6 +44,7 @@ public class PostgresIntegrationTest extends TestDestination {
private static final String RAW_DATA_COLUMN = "data";
private PostgreSQLContainer<?> db;
private ExtendedSQLNaming namingResolver = new ExtendedSQLNaming();
@Override
protected String getImageName() {
@@ -75,7 +77,7 @@ public class PostgresIntegrationTest extends TestDestination {
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName) throws Exception {
return retrieveRecordsFromTable(NamingHelper.getRawTableName(streamName))
return retrieveRecordsFromTable(namingResolver.getRawTableName(streamName))
.stream()
.map(r -> Jsons.deserialize(r.get(RAW_DATA_COLUMN).asText()))
.collect(Collectors.toList());
@@ -94,14 +96,32 @@ public class PostgresIntegrationTest extends TestDestination {
@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv env, String streamName)
throws Exception {
return retrieveRecordsFromTable(streamName);
String tableName = namingResolver.getIdentifier(streamName);
if (!tableName.startsWith("\"")) {
// Currently, normalization always quotes table identifiers
tableName = "\"" + tableName + "\"";
}
return retrieveRecordsFromTable(tableName);
}
@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}
private List<JsonNode> retrieveRecordsFromTable(String tableName) throws SQLException {
return Databases.createPostgresDatabase(db.getUsername(), db.getPassword(),
db.getJdbcUrl()).query(
ctx -> ctx
.fetch(String.format("SELECT * FROM \"%s\" ORDER BY emitted_at ASC;", tableName))
.fetch(String.format("SELECT * FROM %s ORDER BY emitted_at ASC;", tableName))
.stream()
.map(r -> r.formatJSON(JSON_FORMAT))
.map(Jsons::deserialize)


@@ -40,7 +40,6 @@ import io.airbyte.commons.resources.MoreResources;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -75,7 +74,7 @@ class PostgresDestinationTest {
private static final JSONFormat JSON_FORMAT = new JSONFormat().recordFormat(RecordFormat.OBJECT);
private static final Instant NOW = Instant.now();
private static final String USERS_STREAM_NAME = "users";
private static final String TASKS_STREAM_NAME = "tasks";
private static final String TASKS_STREAM_NAME = "tasks-list";
private static final AirbyteMessage MESSAGE_USERS1 = new AirbyteMessage().withType(AirbyteMessage.Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
.withData(Jsons.jsonNode(ImmutableMap.builder().put("name", "john").put("id", "10").build()))
@@ -158,7 +157,8 @@ class PostgresDestinationTest {
@Test
void testWriteSuccess() throws Exception {
final DestinationConsumer<AirbyteMessage> consumer = new PostgresDestination().write(config, CATALOG);
final PostgresDestination destination = new PostgresDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, CATALOG);
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
@@ -167,11 +167,11 @@ class PostgresDestinationTest {
consumer.accept(MESSAGE_STATE);
consumer.close();
Set<JsonNode> usersActual = recordRetriever(NamingHelper.getRawTableName(USERS_STREAM_NAME));
Set<JsonNode> usersActual = recordRetriever(destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME));
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
Set<JsonNode> tasksActual = recordRetriever(NamingHelper.getRawTableName(TASKS_STREAM_NAME));
Set<JsonNode> tasksActual = recordRetriever(destination.getNamingResolver().getRawTableName(TASKS_STREAM_NAME));
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
@@ -184,7 +184,8 @@ class PostgresDestinationTest {
final ConfiguredAirbyteCatalog catalog = Jsons.clone(CATALOG);
catalog.getStreams().forEach(stream -> stream.withSyncMode(SyncMode.INCREMENTAL));
final DestinationConsumer<AirbyteMessage> consumer = new PostgresDestination().write(config, catalog);
final PostgresDestination destination = new PostgresDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(config, catalog);
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
@@ -193,7 +194,7 @@ class PostgresDestinationTest {
consumer.accept(MESSAGE_STATE);
consumer.close();
final DestinationConsumer<AirbyteMessage> consumer2 = new PostgresDestination().write(config, catalog);
final DestinationConsumer<AirbyteMessage> consumer2 = destination.write(config, catalog);
final AirbyteMessage messageUser3 = new AirbyteMessage().withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(USERS_STREAM_NAME)
@@ -202,14 +203,14 @@ class PostgresDestinationTest {
consumer2.accept(messageUser3);
consumer2.close();
Set<JsonNode> usersActual = recordRetriever(NamingHelper.getRawTableName(USERS_STREAM_NAME));
Set<JsonNode> usersActual = recordRetriever(destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME));
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(
MESSAGE_USERS1.getRecord().getData(),
MESSAGE_USERS2.getRecord().getData(),
messageUser3.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
Set<JsonNode> tasksActual = recordRetriever(NamingHelper.getRawTableName(TASKS_STREAM_NAME));
Set<JsonNode> tasksActual = recordRetriever(destination.getNamingResolver().getRawTableName(TASKS_STREAM_NAME));
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
@@ -227,7 +228,8 @@ class PostgresDestinationTest {
.put("port", container.getFirstMappedPort())
.put("database", container.getDatabaseName())
.build());
final DestinationConsumer<AirbyteMessage> consumer = new PostgresDestination().write(newConfig, CATALOG);
final PostgresDestination destination = new PostgresDestination();
final DestinationConsumer<AirbyteMessage> consumer = destination.write(newConfig, CATALOG);
consumer.accept(MESSAGE_USERS1);
consumer.accept(MESSAGE_TASKS1);
@@ -236,18 +238,21 @@ class PostgresDestinationTest {
consumer.accept(MESSAGE_STATE);
consumer.close();
Set<JsonNode> usersActual = recordRetriever(NamingHelper.getRawTableName("new_schema." + USERS_STREAM_NAME));
final String schemaName = destination.getNamingResolver().getIdentifier("new_schema");
String streamName = schemaName + "." + destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME);
Set<JsonNode> usersActual = recordRetriever(streamName);
final Set<JsonNode> expectedUsersJson = Sets.newHashSet(MESSAGE_USERS1.getRecord().getData(), MESSAGE_USERS2.getRecord().getData());
assertEquals(expectedUsersJson, usersActual);
Set<JsonNode> tasksActual = recordRetriever(NamingHelper.getRawTableName("new_schema." + TASKS_STREAM_NAME));
streamName = schemaName + "." + destination.getNamingResolver().getRawTableName(TASKS_STREAM_NAME);
Set<JsonNode> tasksActual = recordRetriever(streamName);
final Set<JsonNode> expectedTasksJson = Sets.newHashSet(MESSAGE_TASKS1.getRecord().getData(), MESSAGE_TASKS2.getRecord().getData());
assertEquals(expectedTasksJson, tasksActual);
assertTmpTablesNotPresent(
CATALOG.getStreams().stream().map(ConfiguredAirbyteStream::getStream).map(AirbyteStream::getName).collect(Collectors.toList()));
assertThrows(RuntimeException.class, () -> recordRetriever(NamingHelper.getRawTableName(USERS_STREAM_NAME)));
assertThrows(RuntimeException.class, () -> recordRetriever(destination.getNamingResolver().getRawTableName(USERS_STREAM_NAME)));
}
@SuppressWarnings("ResultOfMethodCallIgnored")
@@ -257,7 +262,8 @@ class PostgresDestinationTest {
final AirbyteMessage spiedMessage = spy(MESSAGE_USERS1);
doThrow(new RuntimeException()).when(spiedMessage).getRecord();
final DestinationConsumer<AirbyteMessage> consumer = spy(new PostgresDestination().write(config, CATALOG));
final PostgresDestination destination = new PostgresDestination();
final DestinationConsumer<AirbyteMessage> consumer = spy(destination.write(config, CATALOG));
assertThrows(RuntimeException.class, () -> consumer.accept(spiedMessage));
consumer.accept(MESSAGE_USERS2);
@@ -266,7 +272,7 @@ class PostgresDestinationTest {
final List<String> tableNames = CATALOG.getStreams()
.stream()
.map(ConfiguredAirbyteStream::getStream)
.map(s -> NamingHelper.getRawTableName(s.getName()))
.map(s -> destination.getNamingResolver().getRawTableName(s.getName()))
.collect(Collectors.toList());
assertTmpTablesNotPresent(CATALOG.getStreams()
.stream()


@@ -30,7 +30,7 @@ import io.airbyte.commons.resources.MoreResources;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.DestinationConsumer;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import io.airbyte.protocol.models.AirbyteConnectionStatus.Status;
import io.airbyte.protocol.models.AirbyteMessage;
@@ -42,9 +42,10 @@ import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.sql.Connection;
import java.time.Instant;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.function.Supplier;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,6 +56,12 @@ public class SnowflakeDestination implements Destination {
protected static final String COLUMN_NAME = "data";
private final SQLNamingResolvable namingResolver;
public SnowflakeDestination() {
namingResolver = new SnowflakeSQLNaming();
}
@Override
public ConnectorSpecification spec() throws IOException {
final String resourceString = MoreResources.readResource("spec.json");
@@ -72,6 +79,11 @@ public class SnowflakeDestination implements Destination {
}
}
@Override
public SQLNamingResolvable getNamingResolver() {
return namingResolver;
}
/**
* Strategy:
* <p>
@@ -103,26 +115,34 @@ public class SnowflakeDestination implements Destination {
// connect to snowflake
final Supplier<Connection> connectionFactory = SnowflakeDatabase.getConnectionFactory(config);
Map<String, SnowflakeWriteContext> writeBuffers = new HashMap<>();
Set<String> schemaSet = new HashSet<>();
// create temporary tables if they do not exist
// we don't use temporary/transient since we want to control the lifecycle
for (final ConfiguredAirbyteStream stream : catalog.getStreams()) {
final String tableName = NamingHelper.getRawTableName(stream.getStream().getName());
final String tmpTableName = stream.getStream().getName() + "_" + Instant.now().toEpochMilli();
final String streamName = stream.getStream().getName();
final String schemaName = getNamingResolver().getIdentifier(config.get("schema").asText());
final String tableName = getNamingResolver().getRawTableName(streamName);
final String tmpTableName = getNamingResolver().getTmpTableName(streamName);
if (!schemaSet.contains(schemaName)) {
final String query = String.format("CREATE SCHEMA IF NOT EXISTS %s;", schemaName);
SnowflakeDatabase.executeSync(connectionFactory, query);
schemaSet.add(schemaName);
}
final String query = String.format(
"CREATE TABLE IF NOT EXISTS \"%s\" ( \n"
+ "\"ab_id\" VARCHAR PRIMARY KEY,\n"
"CREATE TABLE IF NOT EXISTS %s.%s ( \n"
+ "ab_id VARCHAR PRIMARY KEY,\n"
+ "\"%s\" VARIANT,\n"
+ "\"emitted_at\" TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp()\n"
+ "emitted_at TIMESTAMP WITH TIME ZONE DEFAULT current_timestamp()\n"
+ ") data_retention_time_in_days = 0;",
tmpTableName, COLUMN_NAME);
schemaName, tmpTableName, COLUMN_NAME);
SnowflakeDatabase.executeSync(connectionFactory, query);
final Path queueRoot = Files.createTempDirectory("queues");
final BigQueue writeBuffer = new BigQueue(queueRoot.resolve(stream.getStream().getName()), stream.getStream().getName());
writeBuffers.put(stream.getStream().getName(), new SnowflakeWriteContext(tableName, tmpTableName, writeBuffer));
writeBuffers.put(stream.getStream().getName(), new SnowflakeWriteContext(schemaName, tableName, tmpTableName, writeBuffer));
}
// write to transient tables


@@ -95,10 +95,11 @@ public class SnowflakeRecordConsumer extends FailureTrackingConsumer<AirbyteMess
Map<String, SnowflakeWriteContext> writeBuffers,
Supplier<Connection> connectionFactory) {
for (final Map.Entry<String, SnowflakeWriteContext> entry : writeBuffers.entrySet()) {
final String schemaName = entry.getValue().getSchemaName();
final String tmpTableName = entry.getValue().getTmpTableName();
final CloseableQueue<byte[]> writeBuffer = entry.getValue().getWriteBuffer();
while (writeBuffer.size() > minRecords) {
executeWriteQuery(connectionFactory, batchSize, writeBuffer, tmpTableName);
executeWriteQuery(connectionFactory, batchSize, writeBuffer, schemaName, tmpTableName);
}
}
}
@@ -106,6 +107,7 @@ public class SnowflakeRecordConsumer extends FailureTrackingConsumer<AirbyteMess
private static void executeWriteQuery(Supplier<Connection> connectionFactory,
int batchSize,
CloseableQueue<byte[]> writeBuffer,
String schemaName,
String tmpTableName) {
final List<AirbyteRecordMessage> records = accumulateRecordsFromBuffer(writeBuffer, batchSize);
@@ -124,7 +126,8 @@ public class SnowflakeRecordConsumer extends FailureTrackingConsumer<AirbyteMess
// 1) Loop over records to build the full string.
// 2) Loop over the records and bind the appropriate values to the string.
final StringBuilder sql = new StringBuilder().append(String.format(
"INSERT INTO \"%s\" (\"ab_id\", \"%s\", \"emitted_at\") SELECT column1, parse_json(column2), column3 FROM VALUES\n",
"INSERT INTO %s.%s (ab_id, \"%s\", emitted_at) SELECT column1, parse_json(column2), column3 FROM VALUES\n",
schemaName,
tmpTableName,
SnowflakeDestination.COLUMN_NAME));
@@ -201,8 +204,9 @@ public class SnowflakeRecordConsumer extends FailureTrackingConsumer<AirbyteMess
final StringBuilder query = new StringBuilder();
query.append("BEGIN;");
for (final SnowflakeWriteContext writeContext : writeContexts.values()) {
query.append(String.format("DROP TABLE IF EXISTS \"%s\";\n", writeContext.getTableName()));
query.append(String.format("ALTER TABLE \"%s\" RENAME TO \"%s\";\n", writeContext.getTmpTableName(), writeContext.getTableName()));
query.append(String.format("DROP TABLE IF EXISTS %s.%s;\n", writeContext.getSchemaName(), writeContext.getTableName()));
query.append(String.format("ALTER TABLE %s.%s RENAME TO %s.%s;\n", writeContext.getSchemaName(), writeContext.getTmpTableName(),
writeContext.getSchemaName(), writeContext.getTableName()));
}
query.append("COMMIT;");
@@ -221,7 +225,8 @@ public class SnowflakeRecordConsumer extends FailureTrackingConsumer<AirbyteMess
private static void cleanupTmpTables(Supplier<Connection> connectionFactory, Map<String, SnowflakeWriteContext> writeContexts) {
for (SnowflakeWriteContext writeContext : writeContexts.values()) {
try {
SnowflakeDatabase.executeSync(connectionFactory, String.format("DROP TABLE IF EXISTS \"%s\";", writeContext.getTmpTableName()));
SnowflakeDatabase.executeSync(connectionFactory,
String.format("DROP TABLE IF EXISTS %s.%s;", writeContext.getSchemaName(), writeContext.getTmpTableName()));
} catch (SQLException | InterruptedException e) {
throw new RuntimeException(e);
}


@@ -0,0 +1,36 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.integrations.destination.snowflake;
import io.airbyte.integrations.base.ExtendedSQLNaming;
public class SnowflakeSQLNaming extends ExtendedSQLNaming {
@Override
protected String applyDefaultCase(String input) {
return input.toUpperCase();
}
}


@@ -28,16 +28,22 @@ import io.airbyte.commons.lang.CloseableQueue;
public class SnowflakeWriteContext {
private final String schemaName;
private final String tableName;
private final String tmpTableName;
private final CloseableQueue<byte[]> writeBuffer;
SnowflakeWriteContext(String tableName, String tmpTableName, CloseableQueue<byte[]> writeBuffer) {
SnowflakeWriteContext(String schemaName, String tableName, String tmpTableName, CloseableQueue<byte[]> writeBuffer) {
this.schemaName = schemaName;
this.tableName = tableName;
this.tmpTableName = tmpTableName;
this.writeBuffer = writeBuffer;
}
public String getSchemaName() {
return schemaName;
}
public String getTableName() {
return tableName;
}


@@ -28,10 +28,11 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.NamingHelper;
import io.airbyte.integrations.base.ExtendedSQLNaming;
import io.airbyte.integrations.standardtest.destination.TestDestination;
import java.nio.file.Path;
import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
@@ -43,6 +44,7 @@ public class SnowflakeIntegrationTest extends TestDestination {
private JsonNode baseConfig;
// config which refers to the schema that the test is being run in.
private JsonNode config;
private ExtendedSQLNaming namingResolver = new ExtendedSQLNaming();
@Override
protected String getImageName() {
@@ -67,7 +69,7 @@ public class SnowflakeIntegrationTest extends TestDestination {
@Override
protected List<JsonNode> retrieveRecords(TestDestinationEnv env, String streamName) throws Exception {
return retrieveRecordsFromTable(env, NamingHelper.getRawTableName(streamName))
return retrieveRecordsFromTable(env, namingResolver.getRawTableName(streamName))
.stream()
.map(j -> Jsons.deserialize(j.get(COLUMN_NAME).asText()))
.collect(Collectors.toList());
@@ -80,13 +82,31 @@ public class SnowflakeIntegrationTest extends TestDestination {
@Override
protected List<JsonNode> retrieveNormalizedRecords(TestDestinationEnv testEnv, String streamName) throws Exception {
return retrieveRecordsFromTable(testEnv, streamName);
String tableName = namingResolver.getIdentifier(streamName);
if (!tableName.startsWith("\"")) {
// Currently, normalization always quotes table identifiers
tableName = "\"" + tableName + "\"";
}
return retrieveRecordsFromTable(testEnv, tableName);
}
@Override
protected List<String> resolveIdentifier(String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}
private List<JsonNode> retrieveRecordsFromTable(TestDestinationEnv env, String tableName) throws SQLException, InterruptedException {
return SnowflakeDatabase.executeSync(
SnowflakeDatabase.getConnectionFactory(getConfig()),
String.format("SELECT * FROM \"%s\" ORDER BY \"emitted_at\" ASC;", tableName),
String.format("SELECT * FROM %s ORDER BY emitted_at ASC;", tableName),
false,
rs -> {
try {


@@ -22,6 +22,8 @@ OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import os
def test_example_method():
assert True
assert os.path.commonpath(["/usr/lib", "/usr/local/lib"]) == "/usr"


@@ -59,7 +59,7 @@ Note that queries written in BigQueries can only reference Datasets in the same
#### Service account
In order for Airbyte to sync data into BigQuery, it needs credentials for a [Service Account](https://cloud.google.com/iam/docs/service-accounts) with the "BigQuery User" role, which grants permissions to run BigQuery jobs, write to BigQuery Datasets, and read table metadata.We highly recommend that this Service Account is exclusive to Airbyte for ease of permissioning and auditing. However, you can use a pre-existing Service Account if you already have one with the correct permissions.
In order for Airbyte to sync data into BigQuery, it needs credentials for a [Service Account](https://cloud.google.com/iam/docs/service-accounts) with the "BigQuery User" and "BigQuery Data Editor" roles, which grant permissions to run BigQuery jobs, write to BigQuery Datasets, and read table metadata. We highly recommend that this Service Account be exclusive to Airbyte for ease of permissioning and auditing. However, you can use a pre-existing Service Account if you already have one with the correct permissions.
The easiest way to create a Service Account is to follow GCP's guide for [Creating a Service Account](https://cloud.google.com/iam/docs/creating-managing-service-accounts). Once you've created the Service Account, make sure to keep its ID handy as you will need to reference it when granting roles. Service Account IDs typically take the form `<account-name>@<project-name>.iam.gserviceaccount.com`
@@ -84,3 +84,17 @@ You should now have all the requirements needed to configure BigQuery as a desti
Once you've configured BigQuery as a destination, delete the Service Account Key from your computer.
## Notes about BigQuery Naming Conventions
From [BigQuery Datasets Naming](https://cloud.google.com/bigquery/docs/datasets#dataset-naming):
When you create a dataset in BigQuery, the dataset name must be unique for each project. The dataset name can contain the following:
- Up to 1,024 characters.
- Letters (uppercase or lowercase), numbers, and underscores.
Note: In the Cloud Console, datasets that begin with an underscore are hidden from the navigation pane. You can query tables and views in these datasets even though these datasets aren't visible.
- Dataset names are case-sensitive: mydataset and MyDataset can coexist in the same project.
- Dataset names cannot contain spaces or special characters such as -, &, @, or %.
Therefore, the Airbyte BigQuery destination converts any invalid characters into '_' characters when writing data.
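For illustration only, the character rule described above can be approximated with a single regex replacement. This is a minimal sketch under that assumption; the class and method names are hypothetical and this is not the actual Airbyte BigQuery naming code.

```java
// Hypothetical sketch of the character rule described above; not the Airbyte implementation.
public class BigQueryNameSketch {

  // Replace every character that is not an ASCII letter, digit, or underscore with '_'.
  static String toDatasetSafeName(String input) {
    return input.replaceAll("[^\\p{Alnum}_]", "_");
  }

  public static void main(String[] args) {
    System.out.println(toDatasetSafeName("sales-2020 report")); // sales_2020_report
    System.out.println(toDatasetSafeName("orders&invoices"));   // orders_invoices
  }
}
```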


@@ -56,3 +56,18 @@ You should now have all the requirements needed to configure Postgres as a desti
* **Database**
* This database needs to exist within the schema provided.
## Notes about Postgres Naming Conventions
From [Postgres SQL Identifiers syntax](https://www.postgresql.org/docs/9.0/sql-syntax-lexical.html#SQL-SYNTAX-IDENTIFIERS):
- SQL identifiers and key words must begin with a letter (a-z, but also letters with diacritical marks and non-Latin letters) or an underscore (_).
- Subsequent characters in an identifier or key word can be letters, underscores, digits (0-9), or dollar signs ($).
Note that dollar signs are not allowed in identifiers according to the letter of the SQL standard, so their use might render applications less portable. The SQL standard will not define a key word that contains digits or starts or ends with an underscore, so identifiers of this form are safe against possible conflict with future extensions of the standard.
- The system uses no more than NAMEDATALEN-1 bytes of an identifier; longer names can be written in commands, but they will be truncated. By default, NAMEDATALEN is 64, so the maximum identifier length is 63 bytes.
- Quoted identifiers can contain any character, except the character with code zero. (To include a double quote, write two double quotes.) This allows constructing table or column names that would otherwise not be possible, such as ones containing spaces or ampersands. The length limitation still applies.
- Quoting an identifier also makes it case-sensitive, whereas unquoted names are always folded to lower case.
- If you want to write portable applications you are advised to always quote a particular name or never quote it.
Therefore, the Airbyte Postgres destination creates tables and schemas using unquoted identifiers when possible, falling back to quoted identifiers when the names contain special characters.
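As a minimal usage sketch of the resolver introduced by this change: the `PostgresSQLNaming` class and the resolver methods below appear in this commit, but `PostgresNamingExample` is a hypothetical wrapper and the expected outputs are inferred from the rules described above rather than asserted by the commit's tests.

```java
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.integrations.destination.postgres.PostgresSQLNaming;

// Hypothetical example class; expected outputs are inferred from the naming rules above.
public class PostgresNamingExample {

  public static void main(String[] args) {
    SQLNamingResolvable resolver = new PostgresSQLNaming();
    System.out.println(resolver.getIdentifier("Users"));       // expected: users (safe names are folded to lower case)
    System.out.println(resolver.getIdentifier("tasks-list"));  // expected: "tasks-list" (quoted because of the '-')
    System.out.println(resolver.getRawTableName("users"));     // expected: users_raw
  }
}
```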


@@ -0,0 +1,22 @@
# Redshift
## Overview
Not released yet
## Notes about Redshift Naming Conventions
From [Redshift Names & Identifiers](https://docs.aws.amazon.com/redshift/latest/dg/r_names.html):
### Standard Identifiers
- Begin with an ASCII single-byte alphabetic character or underscore character, or a UTF-8 multibyte character two to four bytes long.
- Subsequent characters can be ASCII single-byte alphanumeric characters, underscores, or dollar signs, or UTF-8 multibyte characters two to four bytes long.
- Be between 1 and 127 bytes in length, not including quotation marks for delimited identifiers.
- Contain no quotation marks and no spaces.
### Delimited Identifiers
Delimited identifiers (also known as quoted identifiers) begin and end with double quotation marks ("). If you use a delimited identifier, you must use the double quotation marks for every reference to that object. The identifier can contain any standard UTF-8 printable characters other than the double quotation mark itself. Therefore, you can create column or table names that include otherwise illegal characters, such as spaces or the percent symbol.
ASCII letters in delimited identifiers are case-insensitive and are folded to lowercase. To use a double quotation mark in a string, you must precede it with another double quotation mark character.
Therefore, the Airbyte Redshift destination creates tables and schemas using standard (unquoted) identifiers when possible, falling back to delimited (quoted) identifiers when the names contain special characters.
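Note the Redshift-specific nuance above: ASCII letters are folded to lowercase even inside delimited identifiers, so case alone cannot distinguish two names. A hedged usage sketch follows; `RedshiftSQLNaming` ships with this change, but `RedshiftNamingExample` is hypothetical and the expected outputs are inferred from the rules above.

```java
import io.airbyte.integrations.base.RedshiftSQLNaming;
import io.airbyte.integrations.base.SQLNamingResolvable;

// Hypothetical example class; expected outputs are inferred from the Redshift rules above.
public class RedshiftNamingExample {

  public static void main(String[] args) {
    SQLNamingResolvable resolver = new RedshiftSQLNaming();
    System.out.println(resolver.getIdentifier("Users"));       // expected: users (folded to lower case)
    System.out.println(resolver.getIdentifier("tasks-list"));  // expected: "tasks-list" (delimited form, still lower case)
  }
}
```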


@@ -101,3 +101,33 @@ You should now have all the requirements needed to configure Snowflake as a dest
* **Username**
* **Password**
## Notes about Snowflake Naming Conventions
From [Snowflake Identifiers syntax](https://docs.snowflake.com/en/sql-reference/identifiers-syntax.html):
### Unquoted Identifiers:
- Start with a letter (A-Z, a-z) or an underscore (“_”).
- Contain only letters, underscores, decimal digits (0-9), and dollar signs (“$”).
- Are case-insensitive.
When an identifier is unquoted, it is stored and resolved in uppercase.
### Quoted Identifiers:
- The identifier is case-sensitive.
- Delimited identifiers (i.e. identifiers enclosed in double quotes) can start with and contain any valid characters, including:
- Numbers
- Special characters (., ', !, @, #, $, %, ^, &, *, etc.)
- Extended ASCII and non-ASCII characters
- Blank spaces
When an identifier is double-quoted, it is stored and resolved exactly as entered, including case.
### Note
- Regardless of whether an identifier is unquoted or double-quoted, the maximum number of characters allowed is 255 (including blank spaces).
- Identifiers can also be specified using string literals, session variables or bind variables. For details, see SQL Variables.
- If an object is created using a double-quoted identifier, when referenced in a query or any other SQL statement, the identifier must be specified exactly as created, including the double quotes. Failure to include the quotes might result in an Object does not exist error (or similar type of error).
- Also, note that the entire identifier must be enclosed in quotes when referenced in a query/SQL statement. This is particularly important if periods (.) are used in identifiers because periods are also used in fully-qualified object names to separate each object.
Therefore, the Airbyte Snowflake destination creates tables and schemas using unquoted identifiers when possible, falling back to quoted identifiers when the names contain special characters.
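A hedged usage sketch of the Snowflake resolver added in this change: `SnowflakeSQLNaming` and the resolver methods appear in this commit, while `SnowflakeNamingExample` is hypothetical and the expected outputs are inferred from the Snowflake rules above (unquoted identifiers resolve in uppercase, quoted identifiers are kept as entered).

```java
import io.airbyte.integrations.base.SQLNamingResolvable;
import io.airbyte.integrations.destination.snowflake.SnowflakeSQLNaming;

// Hypothetical example class; expected outputs are inferred from the Snowflake rules above.
public class SnowflakeNamingExample {

  public static void main(String[] args) {
    SQLNamingResolvable resolver = new SnowflakeSQLNaming();
    System.out.println(resolver.getIdentifier("users"));       // expected: USERS (unquoted, resolved in upper case)
    System.out.println(resolver.getIdentifier("tasks-list"));  // expected: "tasks-list" (quoted, stored exactly as entered)
  }
}
```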