import React, { useState } from "react";
import CodeBlock from "@theme/CodeBlock";
/**
 * Builds the raw table name `<namespace>_raw<separator>stream_<name>`.
 *
 * The separator is a run of underscores one longer than the longest run of
 * consecutive underscores found in `namespace + name`. The longest run is
 * treated as at least 1, so the separator is never shorter than `__` and the
 * result never contains the ambiguous `_raw_stream_`.
 *
 * @param {string} namespace - Stream namespace.
 * @param {string} name - Stream name.
 * @returns {string} The concatenated raw table name.
 */
function concatenateRawTableName(namespace, name) {
  const combined = namespace + name;
  // Start at 1 so that the separator always has at least two underscores.
  let longestRun = 1;
  let currentRun = 0;
  for (let idx = 0; idx < combined.length; idx += 1) {
    if (combined.charAt(idx) === "_") {
      currentRun += 1;
      if (currentRun > longestRun) {
        longestRun = currentRun;
      }
    } else {
      // Any other character ends the current run of underscores.
      currentRun = 0;
    }
  }
  const separator = "_".repeat(longestRun + 1);
  return namespace + "_raw" + separator + "stream_" + name;
}
/**
 * Normalizes a stream name into an identifier made only of ASCII letters,
 * digits and underscores. Taken from StandardNameTransformer.
 *
 * Steps, in order:
 *  1. NFKD-normalize the string.
 *  2. Remove every Unicode mark character (e.g. combining accents).
 *  3. Collapse each run of whitespace into a single underscore.
 *  4. Replace every remaining character outside [A-Za-z0-9_] with an underscore.
 *
 * @param {string} str - Name to normalize.
 * @returns {string} The normalized name.
 */
function convertStreamName(str) {
  const decomposed = str.normalize("NFKD");
  const withoutMarks = decomposed.replaceAll(/\p{M}/gu, "");
  const whitespaceCollapsed = withoutMarks.replaceAll(/\s+/g, "_");
  // This pattern has no `u` flag, so it works per UTF-16 code unit: an
  // astral character (surrogate pair) becomes two underscores.
  return whitespaceCollapsed.replaceAll(/[^A-Za-z0-9_]/g, "_");
}
export const BigQueryMigrationGenerator = () => {
/**
 * Normalizes a name with convertStreamName and makes sure the result starts
 * with an ASCII letter or an underscore. See BigQuerySQLNameTransformer.
 *
 * @param {string} str - Name to convert.
 * @returns {string} The normalized name, prefixed with "_" when its first
 *   character is not in [A-Za-z_] (this includes the empty string).
 */
function bigqueryConvertStreamName(str) {
  const normalized = convertStreamName(str);
  const hasValidStart = /^[A-Za-z_]/.test(normalized);
  return hasValidStart ? normalized : "_" + normalized;
}
/**
 * Normalizes a namespace with convertStreamName and prefixes it with "n"
 * when its first character is not an ASCII letter or digit (this includes
 * a leading underscore and the empty string).
 *
 * @param {string} namespace - Namespace to escape.
 * @returns {string} The escaped namespace.
 */
function escapeNamespace(namespace) {
  const cleaned = convertStreamName(namespace);
  if (/^[A-Za-z0-9]/.test(cleaned)) {
    return cleaned;
  }
  return "n" + cleaned;
}
/**
 * Produces the BigQuery SQL that creates the raw dataset (if missing) and
 * (re)creates the new-format raw table from the old `_airbyte_raw_<name>` table.
 *
 * @param {string} og_namespace - Namespace (dataset) holding the old raw table.
 * @param {string} new_namespace - Namespace used to build the new raw table name.
 * @param {string} name - Stream name.
 * @param {string} raw_dataset - Dataset that receives the new raw table.
 * @returns {string} The SQL script.
 */
function generateSql(og_namespace, new_namespace, name, raw_dataset) {
  const targetTable = bigqueryConvertStreamName(
    concatenateRawTableName(new_namespace, name)
  );
  const sourceDataset = escapeNamespace(og_namespace);
  const sourceTable = bigqueryConvertStreamName("_airbyte_raw_" + name);
  // Identifiers are backtick-quoted directly inside the template.
  return `CREATE SCHEMA IF NOT EXISTS ${raw_dataset};
CREATE OR REPLACE TABLE \`${raw_dataset}\`.\`${targetTable}\` (
_airbyte_raw_id STRING,
_airbyte_extracted_at TIMESTAMP,
_airbyte_loaded_at TIMESTAMP,
_airbyte_data JSON)
PARTITION BY DATE(_airbyte_extracted_at)
CLUSTER BY _airbyte_extracted_at
AS (
SELECT
_airbyte_ab_id AS _airbyte_raw_id,
_airbyte_emitted_at AS _airbyte_extracted_at,
CAST(NULL AS TIMESTAMP) AS _airbyte_loaded_at,
PARSE_JSON(_airbyte_data) AS _airbyte_data
FROM \`${sourceDataset}\`.\`${sourceTable}\`
)`;
}
return (
);
};
export const SnowflakeMigrationGenerator = () => {
/**
 * Normalizes a name with convertStreamName and makes sure the result starts
 * with an ASCII letter or an underscore. See SnowflakeSQLNameTransformer.
 *
 * @param {string} str - Name to convert.
 * @returns {string} The normalized name, prefixed with "_" when its first
 *   character is not in [A-Za-z_].
 */
function snowflakeConvertStreamName(str) {
  const normalized = convertStreamName(str);
  if (!/^[A-Za-z_]/.test(normalized)) {
    return "_" + normalized;
  }
  return normalized;
}
/**
 * Produces the Snowflake SQL that creates the raw schema (if missing) and
 * (re)creates the new-format raw table from the old `_airbyte_raw_<name>` table.
 *
 * @param {string} og_namespace - Namespace (schema) holding the old raw table.
 * @param {string} new_namespace - Namespace used to build the new raw table name.
 * @param {string} name - Stream name.
 * @param {string} raw_schema - Schema that receives the new raw table.
 * @returns {string} The SQL script.
 */
function generateSql(og_namespace, new_namespace, name, raw_schema) {
  const targetTable = concatenateRawTableName(new_namespace, name);
  const sourceSchema = snowflakeConvertStreamName(og_namespace);
  const sourceTable = snowflakeConvertStreamName("_airbyte_raw_" + name);
  // The target table name is double-quoted as-is; the source identifiers
  // are emitted unquoted.
  return `CREATE SCHEMA IF NOT EXISTS "${raw_schema}";
CREATE OR REPLACE TABLE "${raw_schema}"."${targetTable}" (
"_airbyte_raw_id" STRING NOT NULL PRIMARY KEY,
"_airbyte_extracted_at" TIMESTAMP_TZ DEFAULT CURRENT_TIMESTAMP(),
"_airbyte_loaded_at" TIMESTAMP_TZ,
"_airbyte_data" VARIANT)
AS (
SELECT
_airbyte_ab_id AS "_airbyte_raw_id",
_airbyte_emitted_at AS "_airbyte_extracted_at",
CAST(NULL AS TIMESTAMP_TZ) AS "_airbyte_loaded_at",
_airbyte_data AS "_airbyte_data"
FROM ${sourceSchema}.${sourceTable}
)`;
}
return (
);
};
export const RedshiftMigrationGenerator = () => {
/**
 * Normalizes a name with convertStreamName and makes sure the result starts
 * with an ASCII letter or an underscore. See RedshiftSQLNameTransformer.
 *
 * @param {string} str - Name to convert.
 * @returns {string} The normalized name, prefixed with "_" when its first
 *   character is not in [A-Za-z_].
 */
function redshiftConvertStreamName(str) {
  const normalized = convertStreamName(str);
  const prefix = /^[A-Za-z_]/.test(normalized) ? "" : "_";
  return prefix + normalized;
}
/**
 * Produces the Redshift SQL that creates the raw schema (if missing), drops
 * and recreates the new-format raw table, and copies the rows of the old
 * `_airbyte_raw_<name>` table into it.
 *
 * @param {string} og_namespace - Namespace (schema) holding the old raw table.
 * @param {string} new_namespace - Namespace used to build the new raw table name.
 * @param {string} name - Stream name.
 * @param {string} raw_schema - Schema that receives the new raw table.
 * @returns {string} The SQL script.
 */
function generateSql(og_namespace, new_namespace, name, raw_schema) {
  const targetTable = concatenateRawTableName(new_namespace, name);
  const sourceSchema = redshiftConvertStreamName(og_namespace);
  const sourceTable = redshiftConvertStreamName("_airbyte_raw_" + name);
  // Fully qualified, double-quoted target; used by DROP, CREATE and INSERT.
  const qualifiedTarget = `"${raw_schema}"."${targetTable}"`;
  return `CREATE SCHEMA IF NOT EXISTS "${raw_schema}";
DROP TABLE IF EXISTS ${qualifiedTarget};
CREATE TABLE ${qualifiedTarget} (
"_airbyte_raw_id" VARCHAR(36) NOT NULL PRIMARY KEY
, "_airbyte_extracted_at" TIMESTAMPTZ DEFAULT NOW()
, "_airbyte_loaded_at" TIMESTAMPTZ
, "_airbyte_data" SUPER
);
INSERT INTO ${qualifiedTarget} (
SELECT
_airbyte_ab_id AS "_airbyte_raw_id",
_airbyte_emitted_at AS "_airbyte_extracted_at",
CAST(NULL AS TIMESTAMPTZ) AS "_airbyte_loaded_at",
_airbyte_data AS "_airbyte_data"
FROM ${sourceSchema}.${sourceTable}
);`;
}
return (
);
};
export const PostgresMigrationGenerator = () => {
/**
 * StandardNameTransformer behavior (via convertStreamName) plus the rule
 * that an identifier should start with a letter or an underscore.
 *
 * @param {string} str - Name to convert.
 * @returns {string} The normalized name, prefixed with "_" when its first
 *   character is not in [A-Za-z_].
 */
function postgresConvertStreamName(str) {
  const normalized = convertStreamName(str);
  return /^[A-Za-z_]/.test(normalized) ? normalized : "_" + normalized;
}
/**
 * Produces the Postgres SQL that creates the raw schema (if missing), drops
 * and recreates the new-format raw table, and copies the rows of the old
 * `_airbyte_raw_<name>` table into it.
 *
 * @param {string} og_namespace - Namespace (schema) holding the old raw table.
 * @param {string} new_namespace - Namespace used to build the new raw table name.
 * @param {string} name - Stream name.
 * @param {string} raw_schema - Schema that receives the new raw table.
 * @returns {string} The SQL script.
 */
function generateSql(og_namespace, new_namespace, name, raw_schema) {
  // The new raw table name is lowercased and emitted without quotes.
  const targetTable = concatenateRawTableName(new_namespace, name).toLowerCase();
  const sourceSchema = postgresConvertStreamName(og_namespace);
  const sourceTable = postgresConvertStreamName(
    "_airbyte_raw_" + name
  ).toLowerCase();
  // Only the schema part of the target is double-quoted.
  const qualifiedTarget = `"${raw_schema}".${targetTable}`;
  return `CREATE SCHEMA IF NOT EXISTS "${raw_schema}";
DROP TABLE IF EXISTS ${qualifiedTarget};
CREATE TABLE ${qualifiedTarget} (
"_airbyte_raw_id" VARCHAR(36) NOT NULL PRIMARY KEY
, "_airbyte_extracted_at" TIMESTAMP WITH TIME ZONE DEFAULT CURRENT_TIMESTAMP
, "_airbyte_loaded_at" TIMESTAMP WITH TIME ZONE DEFAULT NULL
, "_airbyte_data" JSONB
);
INSERT INTO ${qualifiedTarget} (
SELECT
_airbyte_ab_id AS "_airbyte_raw_id",
_airbyte_emitted_at AS "_airbyte_extracted_at",
CAST(NULL AS TIMESTAMP WITH TIME ZONE) AS "_airbyte_loaded_at",
_airbyte_data AS "_airbyte_data"
FROM ${sourceSchema}.${sourceTable}
);`;
}
return (
);
};
export const MigrationGenerator = ({ destination, generateSql }) => {
const defaultMessage = `Enter your stream's name and namespace to see the SQL output.
If your stream has no namespace, take the default value from the destination connector's settings.`;
const [message, updateMessage] = useState({
message: defaultMessage,
language: "text",
});
function updateSql(event) {
let og_namespace = document.getElementById(
"og_stream_namespace_" + destination
).value;
let new_namespace = document.getElementById(
"new_stream_namespace_" + destination
).value;
let name = document.getElementById("stream_name_" + destination).value;
var raw_dataset = document.getElementById(
"raw_dataset_" + destination
).value;
if (raw_dataset === "") {
raw_dataset = "airbyte_internal";
}
let sql = generateSql(og_namespace, new_namespace, name, raw_dataset);
if ([og_namespace, new_namespace, name].every((text) => text != "")) {
updateMessage({
message: sql,
language: "sql",
});
} else {
updateMessage({
message: defaultMessage,
language: "text",
});
}
}
return (