1
0
mirror of synced 2026-02-02 16:02:07 -05:00
Files
airbyte/docs/release_notes/destinations_v2.js
2023-08-28 13:36:02 -07:00

153 lines
5.5 KiB
JavaScript

import React, {useState} from 'react';
import CodeBlock from '@theme/CodeBlock';
/**
 * Builds the v2 raw table name for a stream, shaped as
 * `<namespace>_raw<separator>stream_<name>`.
 *
 * The separator is a run of underscores one longer than the longest run of
 * consecutive underscores found in `namespace + name`.
 *
 * @param {string} namespace - Stream namespace.
 * @param {string} name - Stream name.
 * @returns {string} The concatenated raw table name.
 */
function concatenateRawTableName(namespace, name) {
  const combined = `${namespace}${name}`;
  // Every maximal run of consecutive underscores in the combined identifier.
  const underscoreRuns = combined.match(/_+/g) || [];
  // Pretend we always have at least one underscore, so that we never generate `_raw_stream_`
  const longestRun = underscoreRuns.reduce(
    (longest, run) => Math.max(longest, run.length),
    1,
  );
  const separator = "_".repeat(longestRun + 1);
  return namespace + "_raw" + separator + "stream_" + name;
}
// Taken from StandardNameTransformer
/**
 * Normalizes an arbitrary name into one made only of ASCII letters, digits
 * and underscores.
 *
 * @param {string} str - Raw stream or namespace name.
 * @returns {string} The converted name.
 */
function convertStreamName(str) {
  // Compatibility-decompose so that accents become separate combining marks...
  const decomposed = str.normalize('NFKD');
  // ...which are then dropped, leaving the base characters behind.
  const withoutMarks = decomposed.replace(/\p{M}/gu, "");
  // Each run of whitespace collapses into a single underscore.
  const withoutWhitespace = withoutMarks.replace(/\s+/g, "_");
  // Any other character outside [A-Za-z0-9_] becomes an underscore, one for one.
  return withoutWhitespace.replace(/[^A-Za-z0-9_]/g, "_");
}
export const BigQueryMigrationGenerator = () => {
// See BigQuerySQLNameTransformer
function bigqueryConvertStreamName(str) {
str = convertStreamName(str);
if (str.charAt(0).match(/[A-Za-z_]/)) {
return str;
} else {
return "_" + str;
}
}
function escapeNamespace(namespace) {
namespace = convertStreamName(namespace);
if (!namespace.charAt(0).match(/[A-Za-z0-9]/)) {
namespace = "n" + namespace;
}
return namespace;
}
function generateSql(og_namespace, new_namespace, name, raw_dataset) {
let v2RawTableName = '`' + bigqueryConvertStreamName(concatenateRawTableName(new_namespace, name)) + '`';
let v1namespace = '`' + escapeNamespace(og_namespace) + '`';
let v1name = '`' + bigqueryConvertStreamName("_airbyte_raw_" + name) + '`';
return `CREATE SCHEMA IF NOT EXISTS ${raw_dataset};
CREATE OR REPLACE TABLE \`${raw_dataset}\`.${v2RawTableName} (
_airbyte_raw_id STRING,
_airbyte_extracted_at TIMESTAMP,
_airbyte_loaded_at TIMESTAMP,
_airbyte_data JSON)
PARTITION BY DATE(_airbyte_extracted_at)
CLUSTER BY _airbyte_extracted_at
AS (
SELECT
_airbyte_ab_id AS _airbyte_raw_id,
_airbyte_emitted_at AS _airbyte_extracted_at,
CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at,
PARSE_JSON(_airbyte_data) AS _airbyte_data
FROM ${v1namespace}.${v1name}
)`;
}
return (
<MigrationGenerator destination="bigquery" generateSql={generateSql}/>
);
}
export const SnowflakeMigrationGenerator = () => {
// See SnowflakeSQLNameTransformer
function snowflakeConvertStreamName(str) {
str = convertStreamName(str);
if (str.charAt(0).match(/[A-Za-z_]/)) {
return str;
} else {
return "_" + str;
}
}
function generateSql(og_namespace, new_namespace, name, raw_schema) {
let v2RawTableName = '"' + concatenateRawTableName(new_namespace, name) + '"';
let v1namespace = snowflakeConvertStreamName(og_namespace);
let v1name = snowflakeConvertStreamName("_airbyte_raw_" + name);
return `CREATE SCHEMA IF NOT EXISTS "${raw_schema}";
CREATE OR REPLACE TABLE "${raw_schema}".${v2RawTableName} (
"_airbyte_raw_id" STRING NOT NULL PRIMARY KEY,
"_airbyte_extracted_at" TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP(),
"_airbyte_loaded_at" TIMESTAMP_TZ,
"_airbyte_data" VARIANT)
AS (
SELECT
_airbyte_ab_id AS "_airbyte_raw_id",
_airbyte_emitted_at AS "_airbyte_extracted_at",
CAST(NULL AS TIMESTAMP_TZ) AS "_airbyte_loaded_at",
_airbyte_data AS "_airbyte_data"
FROM ${v1namespace}.${v1name}
)`;
}
return (
<MigrationGenerator destination="snowflake" generateSql={generateSql}/>
);
}
export const MigrationGenerator = ({destination, generateSql}) => {
const defaultMessage =
`Enter your stream's name and namespace to see the SQL output.
If your stream has no namespace, take the default value from the destination connector's settings.`;
const [message, updateMessage] = useState({
'message': defaultMessage,
'language': 'text'
});
function updateSql(event) {
let og_namespace = document.getElementById("og_stream_namespace_" + destination).value;
let new_namespace = document.getElementById("new_stream_namespace_" + destination).value;
let name = document.getElementById("stream_name_" + destination).value;
var raw_dataset = document.getElementById("raw_dataset_" + destination).value;
if (raw_dataset === '') {
raw_dataset = 'airbyte_internal';
}
let sql = generateSql(og_namespace, new_namespace, name, raw_dataset);
if ([og_namespace, new_namespace, name].every(text => text != "")) {
updateMessage({
'message': sql,
'language': 'sql'
});
} else {
updateMessage({
'message': defaultMessage,
'language': 'text'
});
}
}
return (
<div>
<label>Original Stream namespace </label>
<input type="text" id={"og_stream_namespace_" + destination} onChange={ updateSql }/><br/>
<label>New Stream namespace (to avoid overwriting)</label>
<input type="text" id={"new_stream_namespace_" + destination} onChange={ updateSql }/><br/>
<label>Stream name </label>
<input type="text" id={"stream_name_" + destination} onChange={ updateSql }/><br/>
<label>Raw table dataset/schema (defaults to <code>airbyte_internal</code>) </label>
<input type="text" id={"raw_dataset_" + destination} onChange={ updateSql }/><br/>
<CodeBlock id={ "sql_output_block_" + destination } language={ message['language'] }>
{ message['message'] }
</CodeBlock>
</div>
);
}