Apply Dataline schema to discovered singer catalog (#93)
This commit is contained in:
@@ -12,102 +12,209 @@
|
||||
"streams": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"additionalProperties": true,
|
||||
"required": [
|
||||
"tap_stream_id",
|
||||
"stream",
|
||||
"schema"
|
||||
],
|
||||
"properties": {
|
||||
"tap_stream_id": {
|
||||
"type": "string"
|
||||
},
|
||||
"stream": {
|
||||
"type": "string"
|
||||
},
|
||||
"schema": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string"
|
||||
},
|
||||
"properties": {
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"properties": {
|
||||
"type": {
|
||||
"$ref": "SingerCatalog.json#/definitions/SingerType"
|
||||
},
|
||||
"format": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"metadata": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "object",
|
||||
"required": [
|
||||
"metadata"
|
||||
],
|
||||
"properties": {
|
||||
"metadata": {
|
||||
"type": "object",
|
||||
"properties": {
|
||||
"inclusion": {
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"available",
|
||||
"?"
|
||||
]
|
||||
},
|
||||
"table-key-properties": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"selected": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"validation-replication-keys": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"schema-name": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
},
|
||||
"breadcrumbs": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
"$ref": "SingerCatalog.json#/definitions/SingerStream"
|
||||
}
|
||||
}
|
||||
},
|
||||
"definitions": {
|
||||
"SingerStream": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"required": [
|
||||
"tap_stream_id",
|
||||
"stream",
|
||||
"schema"
|
||||
],
|
||||
"properties": {
|
||||
"stream": {
|
||||
"type": "string",
|
||||
"description": "The name of the stream."
|
||||
},
|
||||
"tap_stream_id": {
|
||||
"type": "string",
|
||||
"description": "The unique identifier for the stream. This is allowed to be different from the name of the stream in order to allow for sources that have duplicate stream names."
|
||||
},
|
||||
"schema": {
|
||||
"$ref": "SingerCatalog.json#/definitions/SingerTableSchema"
|
||||
},
|
||||
"table_name": {
|
||||
"type": "string",
|
||||
"description": "For a database source, the name of the table."
|
||||
},
|
||||
"metadata": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"$ref": "SingerCatalog.json#/definitions/SingerMetadata"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"SingerTableSchema": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"required": [
|
||||
"properties"
|
||||
],
|
||||
"description": "The JSON schema for the stream. This struct is weird because it's essentially modeling the properties field of a jsonschema object.",
|
||||
"properties": {
|
||||
"type": {
|
||||
"type": "string"
|
||||
},
|
||||
"properties": {
|
||||
"$ref": "SingerCatalog.json#/definitions/SingerColumnMap"
|
||||
},
|
||||
"definitions": {
|
||||
"description": "placeholder for definitions that are included since it is all jsonchema."
|
||||
}
|
||||
}
|
||||
},
|
||||
"SingerColumnMap": {
|
||||
"type": "object",
|
||||
"additionalProperties": {
|
||||
"$ref": "SingerCatalog.json#/definitions/SingerColumn"
|
||||
}
|
||||
},
|
||||
"SingerColumn": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"required": [
|
||||
"type"
|
||||
],
|
||||
"properties": {
|
||||
"type": {
|
||||
"$ref": "SingerCatalog.json#/definitions/SingerType"
|
||||
},
|
||||
"format": {
|
||||
"type": "string"
|
||||
},
|
||||
"selected": {
|
||||
"description": "Deprecated: Some legacy Taps handle stream and field selection by looking for \"selected\": true directly in the stream's schema in the catalog.json file (called properties.json in legacy taps).",
|
||||
"type": "boolean"
|
||||
},
|
||||
"minimum": {
|
||||
"type": "integer"
|
||||
},
|
||||
"maximum": {
|
||||
"type": "integer"
|
||||
},
|
||||
"maxLength": {
|
||||
"type": "integer"
|
||||
}
|
||||
}
|
||||
},
|
||||
"SingerMetadata": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"required": [
|
||||
"metadata",
|
||||
"breadcrumb"
|
||||
],
|
||||
"properties": {
|
||||
"metadata": {
|
||||
"$ref": "SingerCatalog.json#/definitions/SingerMetadataChild"
|
||||
},
|
||||
"breadcrumb": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
}
|
||||
}
|
||||
},
|
||||
"SingerMetadataChild": {
|
||||
"type": "object",
|
||||
"additionalProperties": false,
|
||||
"required": [],
|
||||
"properties": {
|
||||
"selected": {
|
||||
"description": "Non-Discoverable: Indicates that this node in the schema has been selected by the user for replication.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"replication-method": {
|
||||
"description": "Non-Discoverable: Either FULL_TABLE, INCREMENTAL, or LOG_BASED. The replication method to use for a stream.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"FULL_TABLE",
|
||||
"INCREMENTAL",
|
||||
"LOG_BASED"
|
||||
]
|
||||
},
|
||||
"replication-key": {
|
||||
"description": "Non-Discoverable: The name of a property in the source to use as a \"bookmark\". For example, this will often be an \"updated-at\" field or an auto-incrementing primary key (requires replication-method",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"view-key-properties": {
|
||||
"description": "Non Discoverable: List of key properties for a database view.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"inclusion": {
|
||||
"description": "Discoverable: available means the field is available for selection, and the tap will only emit values for that field if it is marked with \"selected\": true. automatic means that the tap will emit values for the field. unsupported means that the field exists in the source data but the tap is unable to provide it.",
|
||||
"type": "string",
|
||||
"enum": [
|
||||
"available",
|
||||
"automatic",
|
||||
"unsupported"
|
||||
]
|
||||
},
|
||||
"selected-by-default": {
|
||||
"description": "Discoverable: Indicates if a node in the schema should be replicated if a user has not expressed any opinion on whether or not to replicate it.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"valid-replication-keys": {
|
||||
"description": "Discoverable: List of the fields that could be used as replication keys.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"forced-replication-method": {
|
||||
"description": "Discoverable: Used to force the replication method to either FULL_TABLE or INCREMENTAL.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"table-key-properties": {
|
||||
"description": "Discoverable: List of key properties for a database table.",
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string"
|
||||
}
|
||||
},
|
||||
"schema-name": {
|
||||
"description": "Discoverable: The name of the stream.",
|
||||
"type": "string"
|
||||
},
|
||||
"is-view": {
|
||||
"description": "Discoverable: Indicates whether a stream corresponds to a database view.",
|
||||
"type": "boolean"
|
||||
},
|
||||
"row-count": {
|
||||
"description": "Discoverable: Number of rows in a database table/view.",
|
||||
"type": "integer"
|
||||
},
|
||||
"database-name": {
|
||||
"description": "Discoverable: Name of database.",
|
||||
"type": "string"
|
||||
},
|
||||
"sql-datatype": {
|
||||
"description": "Discoverable: Represents the datatype of a database column."
|
||||
}
|
||||
}
|
||||
},
|
||||
"SingerType": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
"type": "string",
|
||||
"enum": ["string", "integer", "null", "boolean"]
|
||||
"enum": [
|
||||
"string",
|
||||
"integer",
|
||||
"null",
|
||||
"boolean"
|
||||
]
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -29,6 +29,9 @@
|
||||
"name": {
|
||||
"type": "string"
|
||||
},
|
||||
"selected": {
|
||||
"type": "boolean"
|
||||
},
|
||||
"columns": {
|
||||
"type": "array",
|
||||
"items": {
|
||||
@@ -41,7 +44,8 @@
|
||||
"type": "object",
|
||||
"required": [
|
||||
"name",
|
||||
"dataType"
|
||||
"dataType",
|
||||
"selected"
|
||||
],
|
||||
"additionalProperties": false,
|
||||
"properties": {
|
||||
@@ -50,6 +54,10 @@
|
||||
},
|
||||
"dataType": {
|
||||
"$ref": "DataType.json"
|
||||
},
|
||||
"selected": {
|
||||
"description": "whether or not the column will be replicated.",
|
||||
"type": "boolean"
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,276 @@
|
||||
/*
|
||||
* MIT License
|
||||
*
|
||||
* Copyright (c) 2020 Dataline
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to deal
|
||||
* in the Software without restriction, including without limitation the rights
|
||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*/
|
||||
|
||||
package io.dataline.workers.singer;
|
||||
|
||||
import io.dataline.config.Column;
|
||||
import io.dataline.config.DataType;
|
||||
import io.dataline.config.Schema;
|
||||
import io.dataline.config.SingerCatalog;
|
||||
import io.dataline.config.SingerColumn;
|
||||
import io.dataline.config.SingerMetadata;
|
||||
import io.dataline.config.SingerMetadataChild;
|
||||
import io.dataline.config.SingerStream;
|
||||
import io.dataline.config.SingerType;
|
||||
import io.dataline.config.Table;
|
||||
import java.util.ArrayList;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.stream.Collectors;
|
||||
|
||||
public class SingerCatalogConverters {
|
||||
|
||||
/**
|
||||
* Takes in a singer catalog and a dataline schema. It then applies the dataline configuration to
|
||||
* that catalog. e.g. If dataline says that a certain column should or should not be included in
|
||||
* the sync, this method applies that to the catalog. Thus we produce a valid singer catalog that
|
||||
* contains configurations stored in dataline.
|
||||
*
|
||||
* @param catalog - singer catalog
|
||||
* @param schema - dataline schema
|
||||
* @return singer catalog with dataline schema applied to it.
|
||||
*/
|
||||
public static SingerCatalog applySchemaToDiscoveredCatalog(SingerCatalog catalog, Schema schema) {
|
||||
Map<String, Table> tableNameToTable =
|
||||
schema.getTables().stream().collect(Collectors.toMap(Table::getName, table -> table));
|
||||
|
||||
final List<SingerStream> updatedStreams =
|
||||
catalog.getStreams().stream()
|
||||
.map(
|
||||
stream -> {
|
||||
// recourse here is probably to run discovery again and update sync
|
||||
// configuration. this method just outputs the original metadata.
|
||||
if (!tableNameToTable.containsKey(stream.getStream())) {
|
||||
return stream;
|
||||
}
|
||||
final Table table = tableNameToTable.get(stream.getStream());
|
||||
final Map<String, Column> columnNameToColumn =
|
||||
table.getColumns().stream()
|
||||
.collect(Collectors.toMap(Column::getName, column -> column));
|
||||
|
||||
final List<SingerMetadata> newMetadata =
|
||||
stream.getMetadata().stream()
|
||||
.map(
|
||||
metadata -> {
|
||||
final SingerMetadata newSingerMetadata =
|
||||
cloneSingerMetadata(metadata);
|
||||
if (isColumnMetadata(metadata)) {
|
||||
// column metadata
|
||||
final String columnName = getColumnName(metadata);
|
||||
// recourse here is probably to run discovery again and update
|
||||
// sync configuration. this method just outputs the original
|
||||
// metadata.
|
||||
if (!columnNameToColumn.containsKey(columnName)) {
|
||||
return metadata;
|
||||
}
|
||||
final Column column = columnNameToColumn.get(columnName);
|
||||
|
||||
newSingerMetadata.getMetadata().setSelected(column.getSelected());
|
||||
} else {
|
||||
// table metadata
|
||||
newSingerMetadata.getMetadata().setSelected(table.getSelected());
|
||||
}
|
||||
return newSingerMetadata;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final SingerStream newSingerStream = new SingerStream();
|
||||
newSingerStream.setStream(stream.getStream());
|
||||
newSingerStream.setTableName(stream.getTableName());
|
||||
newSingerStream.setTapStreamId(stream.getTapStreamId());
|
||||
newSingerStream.setMetadata(newMetadata);
|
||||
// todo (cgardens) - this will not work for legacy catalogs. want to handle this
|
||||
// in a subsequent PR, because handling this is going to require doing another
|
||||
// one of these monster map tasks.
|
||||
newSingerStream.setSchema(stream.getSchema());
|
||||
|
||||
return newSingerStream;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final SingerCatalog outputCatalog = new SingerCatalog();
|
||||
outputCatalog.setStreams(updatedStreams);
|
||||
|
||||
return outputCatalog;
|
||||
}
|
||||
|
||||
// assumes discoverable input only.
|
||||
public static Schema toDatalineSchema(SingerCatalog catalog) {
|
||||
Map<String, List<SingerMetadata>> tableNameToMetadata =
|
||||
getTableNameToMetadataList(catalog.getStreams());
|
||||
|
||||
List<Table> tables =
|
||||
catalog.getStreams().stream()
|
||||
.map(
|
||||
stream -> {
|
||||
final Map<String, SingerMetadataChild> columnNameToMetadata =
|
||||
getColumnMetadataForTable(tableNameToMetadata, stream.getStream());
|
||||
final SingerMetadata tableMetadata =
|
||||
tableNameToMetadata.get(stream.getStream()).stream()
|
||||
.filter(metadata -> metadata.getBreadcrumb().equals(new ArrayList<>()))
|
||||
.findFirst()
|
||||
.orElseThrow(() -> new RuntimeException("Could not find table metadata"));
|
||||
final Table table = new Table();
|
||||
table.setName(stream.getStream());
|
||||
table.setSelected(
|
||||
tableMetadata.getMetadata().getSelectedByDefault() == null
|
||||
? false
|
||||
: tableMetadata.getMetadata().getSelectedByDefault());
|
||||
table.setColumns(
|
||||
stream
|
||||
.getSchema()
|
||||
.getProperties()
|
||||
.getAdditionalProperties()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(
|
||||
entry -> {
|
||||
final String columnName = entry.getKey();
|
||||
final SingerColumn singerColumn = entry.getValue();
|
||||
final SingerMetadataChild singerColumnMetadata =
|
||||
columnNameToMetadata.get(columnName);
|
||||
|
||||
final Column column = new Column();
|
||||
column.setName(columnName);
|
||||
column.setDataType(singerTypesToDataType(singerColumn.getType()));
|
||||
// in discovery, you can find columns that are replicated by
|
||||
// default. we set those to selected. the rest are not.
|
||||
column.setSelected(singerColumnMetadata.getSelectedByDefault());
|
||||
return column;
|
||||
})
|
||||
.collect(Collectors.toList()));
|
||||
return table;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final Schema schema = new Schema();
|
||||
schema.setTables(tables);
|
||||
return schema;
|
||||
}
|
||||
|
||||
private static Map<String, List<SingerMetadata>> getTableNameToMetadataList(
|
||||
List<SingerStream> streams) {
|
||||
// todo (cgardens) - figure out if it's stream or stream id or table name.
|
||||
return streams.stream()
|
||||
.collect(Collectors.toMap(SingerStream::getStream, SingerStream::getMetadata));
|
||||
}
|
||||
|
||||
private static Map<String, SingerMetadataChild> getColumnMetadataForTable(
|
||||
Map<String, List<SingerMetadata>> tableNameToMetadata, String tableName) {
|
||||
if (!tableNameToMetadata.containsKey(tableName)) {
|
||||
throw new RuntimeException("could not find metadata for table: " + tableName);
|
||||
}
|
||||
return tableNameToMetadata.get(tableName).stream()
|
||||
// singer breadcrumb is empty if it is table metadata and it it has two
|
||||
// items if it is column metadata. the first item is "properties" and
|
||||
// the second item is the column name.
|
||||
.filter(SingerCatalogConverters::isColumnMetadata)
|
||||
.collect(
|
||||
Collectors.toMap(
|
||||
metadata -> metadata.getBreadcrumb().get(1), SingerMetadata::getMetadata));
|
||||
}
|
||||
|
||||
private static boolean isColumnMetadata(SingerMetadata metadata) {
|
||||
// column metadata must have 2 breadcrumb entries
|
||||
if (metadata.getBreadcrumb().size() != 2) {
|
||||
return false;
|
||||
}
|
||||
// column metadata must have first breadcrumb be property
|
||||
return !metadata.getBreadcrumb().get(0).equals("property");
|
||||
}
|
||||
|
||||
private static String getColumnName(SingerMetadata metadata) {
|
||||
if (!isColumnMetadata(metadata)) {
|
||||
throw new RuntimeException("Cannot get column name for non-column metadata");
|
||||
}
|
||||
|
||||
return metadata.getBreadcrumb().get(1);
|
||||
}
|
||||
|
||||
/**
|
||||
* Singer tends to have 2 types for columns one of which is null. The null is pretty irrelevant,
|
||||
* so look at types and find the first non-null one and use that.
|
||||
*
|
||||
* @param singerTypes - list of types discovered by singer.
|
||||
* @return reduce down to one type which best matches the column's data type
|
||||
*/
|
||||
private static DataType singerTypesToDataType(List<SingerType> singerTypes) {
|
||||
return singerTypes.stream()
|
||||
.filter(singerType -> !SingerType.NULL.equals(singerType))
|
||||
.map(SingerCatalogConverters::singerTypeToDataType)
|
||||
.findFirst()
|
||||
.orElse(DataType.STRING);
|
||||
}
|
||||
|
||||
/**
|
||||
* Singer doesn't seem to have an official list of the data types that they support, so we will
|
||||
* have to do our best here as we discover them. If it becomes too awful, we can just map types we
|
||||
* don't recognize to string.
|
||||
*
|
||||
* @param singerType - singer's column data type
|
||||
* @return best match for our own data type
|
||||
*/
|
||||
private static DataType singerTypeToDataType(SingerType singerType) {
|
||||
switch (singerType) {
|
||||
case STRING:
|
||||
return DataType.STRING;
|
||||
case INTEGER:
|
||||
return DataType.NUMBER;
|
||||
case NULL:
|
||||
//noinspection DuplicateBranchesInSwitch
|
||||
return DataType.STRING; // todo (cgardens) - hackasaurus rex
|
||||
case BOOLEAN:
|
||||
return DataType.BOOLEAN;
|
||||
default:
|
||||
throw new RuntimeException(
|
||||
String.format("could not map SingerType: %s to DataType", singerType));
|
||||
}
|
||||
}
|
||||
|
||||
private static SingerMetadata cloneSingerMetadata(SingerMetadata toClone) {
|
||||
// bad variable name. tradeoff to keep stuff on one line.
|
||||
SingerMetadataChild toClone2 = toClone.getMetadata();
|
||||
final SingerMetadataChild singerMetadataChild = new SingerMetadataChild();
|
||||
singerMetadataChild.setSelected(toClone2.getSelected());
|
||||
singerMetadataChild.setReplicationMethod(toClone2.getReplicationMethod());
|
||||
singerMetadataChild.setReplicationKey(toClone2.getReplicationKey());
|
||||
singerMetadataChild.setViewKeyProperties(toClone2.getViewKeyProperties());
|
||||
singerMetadataChild.setInclusion(toClone2.getInclusion());
|
||||
singerMetadataChild.setSelectedByDefault(toClone2.getSelectedByDefault());
|
||||
singerMetadataChild.setValidReplicationKeys(toClone2.getValidReplicationKeys());
|
||||
singerMetadataChild.setForcedReplicationMethod(toClone2.getForcedReplicationMethod());
|
||||
singerMetadataChild.setTableKeyProperties(toClone2.getTableKeyProperties());
|
||||
singerMetadataChild.setSchemaName(toClone2.getSchemaName());
|
||||
singerMetadataChild.setIsView(toClone2.getIsView());
|
||||
singerMetadataChild.setRowCount(toClone2.getRowCount());
|
||||
singerMetadataChild.setDatabaseName(toClone2.getDatabaseName());
|
||||
singerMetadataChild.setSqlDatatype(toClone2.getSqlDatatype());
|
||||
|
||||
final SingerMetadata singerMetadata = new SingerMetadata();
|
||||
singerMetadata.setBreadcrumb(new ArrayList<>(toClone.getBreadcrumb()));
|
||||
singerMetadata.setMetadata(singerMetadataChild);
|
||||
|
||||
return singerMetadata;
|
||||
}
|
||||
}
|
||||
@@ -29,22 +29,15 @@ import static io.dataline.workers.JobStatus.SUCCESSFUL;
|
||||
|
||||
import com.fasterxml.jackson.core.JsonProcessingException;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.dataline.config.Column;
|
||||
import io.dataline.config.ConnectionImplementation;
|
||||
import io.dataline.config.DataType;
|
||||
import io.dataline.config.PropertiesProperty;
|
||||
import io.dataline.config.Schema;
|
||||
import io.dataline.config.SingerCatalog;
|
||||
import io.dataline.config.SingerType;
|
||||
import io.dataline.config.StandardDiscoveryOutput;
|
||||
import io.dataline.config.Table;
|
||||
import io.dataline.workers.DiscoverSchemaWorker;
|
||||
import io.dataline.workers.OutputAndStatus;
|
||||
import java.io.IOException;
|
||||
import java.nio.file.Path;
|
||||
import java.util.List;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.stream.Collectors;
|
||||
import org.slf4j.Logger;
|
||||
import org.slf4j.LoggerFactory;
|
||||
|
||||
@@ -134,82 +127,13 @@ public class SingerDiscoveryWorker
|
||||
}
|
||||
|
||||
private static StandardDiscoveryOutput toDiscoveryOutput(SingerCatalog catalog) {
|
||||
List<Table> tableStream =
|
||||
catalog.getStreams().stream()
|
||||
.map(
|
||||
stream -> {
|
||||
final Table table = new Table();
|
||||
table.setName(
|
||||
stream.getStream()); // todo (cgardens) - is stream the same as table name?
|
||||
table.setColumns(
|
||||
stream
|
||||
.getSchema()
|
||||
.getProperties()
|
||||
.getAdditionalProperties()
|
||||
.entrySet()
|
||||
.stream()
|
||||
.map(
|
||||
entry -> {
|
||||
final String columnName = entry.getKey();
|
||||
final PropertiesProperty columnMetadata = entry.getValue();
|
||||
final Column column = new Column();
|
||||
column.setName(columnName);
|
||||
column.setDataType(singerTypesToDataType(columnMetadata.getType()));
|
||||
return column;
|
||||
})
|
||||
.collect(Collectors.toList()));
|
||||
return table;
|
||||
})
|
||||
.collect(Collectors.toList());
|
||||
|
||||
final Schema schema = new Schema();
|
||||
schema.setTables(tableStream);
|
||||
final Schema schema = SingerCatalogConverters.toDatalineSchema(catalog);
|
||||
final StandardDiscoveryOutput discoveryOutput = new StandardDiscoveryOutput();
|
||||
discoveryOutput.setSchema(schema);
|
||||
|
||||
return discoveryOutput;
|
||||
}
|
||||
|
||||
/**
|
||||
* Singer tends to have 2 types for columns one of which is null. The null is pretty irrelevant,
|
||||
* so look at types and find the first non-null one and use that.
|
||||
*
|
||||
* @param singerTypes - list of types discovered by singer.
|
||||
* @return reduce down to one type which best matches the column's data type
|
||||
*/
|
||||
private static DataType singerTypesToDataType(List<SingerType> singerTypes) {
|
||||
return singerTypes.stream()
|
||||
.filter(singerType -> !SingerType.NULL.equals(singerType))
|
||||
.map(SingerDiscoveryWorker::singerTypeToDataType)
|
||||
.findFirst()
|
||||
.orElse(DataType.STRING);
|
||||
}
|
||||
|
||||
/**
|
||||
* Singer doesn't seem to have an official list of the data types that they support, so we will
|
||||
* have to do our best here as we discover them. If it becomes too awful, we can just map types we
|
||||
* don't recognize to string.
|
||||
*
|
||||
* @param singerType - singer's column data type
|
||||
* @return best match for our own data type
|
||||
*/
|
||||
private static DataType singerTypeToDataType(SingerType singerType) {
|
||||
switch (singerType) {
|
||||
case STRING:
|
||||
return DataType.STRING;
|
||||
case INTEGER:
|
||||
return DataType.NUMBER;
|
||||
case NULL:
|
||||
//noinspection DuplicateBranchesInSwitch
|
||||
return DataType.STRING; // todo (cgardens) - hackasaurus rex
|
||||
case BOOLEAN:
|
||||
return DataType.BOOLEAN;
|
||||
default:
|
||||
throw new RuntimeException(
|
||||
String.format("could not map SingerType: %s to DataType", singerType));
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void cancel() {
|
||||
cancelHelper(workerProcess);
|
||||
|
||||
@@ -24,7 +24,12 @@
|
||||
|
||||
package io.dataline.workers;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.io.Resources;
|
||||
import java.io.File;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.Charset;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import org.junit.jupiter.api.BeforeAll;
|
||||
@@ -55,4 +60,23 @@ public abstract class BaseWorkerTestCase {
|
||||
}
|
||||
return workspacePath;
|
||||
}
|
||||
|
||||
protected String readResource(String name) {
|
||||
URL resource = Resources.getResource(name);
|
||||
try {
|
||||
return Resources.toString(resource, Charset.defaultCharset());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
protected <T> T getJsonAsTyped(String file, Class<T> clazz) {
|
||||
final URL resource = Resources.getResource(file);
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
try {
|
||||
return objectMapper.readValue(new File(resource.getFile()), clazz);
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
@@ -0,0 +1,70 @@
|
||||
/*
|
||||
* MIT License
|
||||
*
|
||||
* Copyright (c) 2020 Dataline
|
||||
*
|
||||
* Permission is hereby granted, free of charge, to any person obtaining a copy
|
||||
* of this software and associated documentation files (the "Software"), to deal
|
||||
* in the Software without restriction, including without limitation the rights
|
||||
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
|
||||
* copies of the Software, and to permit persons to whom the Software is
|
||||
* furnished to do so, subject to the following conditions:
|
||||
*
|
||||
* The above copyright notice and this permission notice shall be included in all
|
||||
* copies or substantial portions of the Software.
|
||||
*
|
||||
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
|
||||
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
|
||||
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
|
||||
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
|
||||
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
|
||||
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
|
||||
* SOFTWARE.
|
||||
*/
|
||||
|
||||
package io.dataline.workers.singer;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.*;
|
||||
|
||||
import io.dataline.config.Schema;
|
||||
import io.dataline.config.SingerCatalog;
|
||||
import io.dataline.config.StandardDiscoveryOutput;
|
||||
import io.dataline.workers.BaseWorkerTestCase;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
class SingerCatalogConvertersTest extends BaseWorkerTestCase {
|
||||
|
||||
@Test
|
||||
void applySchemaToDiscoveredCatalog() {
|
||||
final SingerCatalog catalog =
|
||||
getJsonAsTyped("simple_postgres_singer_catalog.json", SingerCatalog.class);
|
||||
final SingerCatalog expectedCatalog =
|
||||
getJsonAsTyped("simple_postgres_singer_catalog.json", SingerCatalog.class);
|
||||
final Schema datalineSchema =
|
||||
getJsonAsTyped("simple_postgres_schema.json", StandardDiscoveryOutput.class).getSchema();
|
||||
|
||||
final SingerCatalog actualCatalog =
|
||||
SingerCatalogConverters.applySchemaToDiscoveredCatalog(catalog, datalineSchema);
|
||||
|
||||
expectedCatalog.getStreams().get(0).getMetadata().get(0).getMetadata().setSelected(false);
|
||||
expectedCatalog.getStreams().get(0).getMetadata().get(1).getMetadata().setSelected(true);
|
||||
expectedCatalog.getStreams().get(0).getMetadata().get(2).getMetadata().setSelected(true);
|
||||
|
||||
assertEquals(expectedCatalog, actualCatalog);
|
||||
}
|
||||
|
||||
@Test
|
||||
void toDatalineSchema() {
|
||||
final SingerCatalog catalog =
|
||||
getJsonAsTyped("simple_postgres_singer_catalog.json", SingerCatalog.class);
|
||||
final Schema expectedSchema =
|
||||
getJsonAsTyped("simple_postgres_schema.json", StandardDiscoveryOutput.class).getSchema();
|
||||
expectedSchema.getTables().get(0).setSelected(false);
|
||||
expectedSchema.getTables().get(0).getColumns().get(0).setSelected(true);
|
||||
expectedSchema.getTables().get(0).getColumns().get(1).setSelected(true);
|
||||
|
||||
final Schema actualSchema = SingerCatalogConverters.toDatalineSchema(catalog);
|
||||
|
||||
assertEquals(expectedSchema, actualSchema);
|
||||
}
|
||||
}
|
||||
@@ -30,15 +30,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import com.google.common.io.Resources;
|
||||
import io.dataline.config.ConnectionImplementation;
|
||||
import io.dataline.config.StandardDiscoveryOutput;
|
||||
import io.dataline.workers.BaseWorkerTestCase;
|
||||
import io.dataline.workers.OutputAndStatus;
|
||||
import io.dataline.workers.PostgreSQLContainerHelper;
|
||||
import java.io.IOException;
|
||||
import java.net.URL;
|
||||
import java.nio.charset.Charset;
|
||||
import java.sql.Connection;
|
||||
import java.sql.DriverManager;
|
||||
import java.sql.SQLException;
|
||||
@@ -80,7 +77,7 @@ public class SingerDiscoveryWorkerTest extends BaseWorkerTestCase {
|
||||
|
||||
assertEquals(SUCCESSFUL, run.getStatus());
|
||||
|
||||
String expectedSchema = readResource("simple_postgres_schema.json");
|
||||
String expectedSchema = readResource("simple_discovered_postgres_schema.json");
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
final String actualSchema = objectMapper.writeValueAsString(run.getOutput().get());
|
||||
|
||||
@@ -113,15 +110,6 @@ public class SingerDiscoveryWorkerTest extends BaseWorkerTestCase {
|
||||
workerWasCancelled.get();
|
||||
}
|
||||
|
||||
private String readResource(String name) {
|
||||
URL resource = Resources.getResource(name);
|
||||
try {
|
||||
return Resources.toString(resource, Charset.defaultCharset());
|
||||
} catch (IOException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
}
|
||||
|
||||
private void assertJsonEquals(String s1, String s2) throws IOException {
|
||||
ObjectMapper mapper = new ObjectMapper();
|
||||
assertEquals(mapper.readTree(s1), mapper.readTree(s2));
|
||||
|
||||
@@ -0,0 +1,22 @@
|
||||
{
|
||||
"schema": {
|
||||
"tables": [
|
||||
{
|
||||
"name": "id_and_name",
|
||||
"selected": false,
|
||||
"columns": [
|
||||
{
|
||||
"name": "name",
|
||||
"dataType": "string",
|
||||
"selected": true
|
||||
},
|
||||
{
|
||||
"name": "id",
|
||||
"dataType": "number",
|
||||
"selected": true
|
||||
}
|
||||
]
|
||||
}
|
||||
]
|
||||
}
|
||||
}
|
||||
@@ -3,14 +3,17 @@
|
||||
"tables": [
|
||||
{
|
||||
"name": "id_and_name",
|
||||
"selected": false,
|
||||
"columns": [
|
||||
{
|
||||
"name": "name",
|
||||
"dataType": "string"
|
||||
"dataType": "string",
|
||||
"selected": true
|
||||
},
|
||||
{
|
||||
"name": "id",
|
||||
"dataType": "number"
|
||||
"dataType": "number",
|
||||
"selected": true
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user