1
0
mirror of synced 2025-12-19 18:14:56 -05:00

java CDK: hoist top-level gradle projects into CDK (#31960)

Co-authored-by: postamar <postamar@users.noreply.github.com>
This commit is contained in:
Marius Posta
2023-10-30 12:03:06 -07:00
committed by GitHub
parent 96954c7cad
commit 7cd8020ac8
503 changed files with 309 additions and 1715 deletions

2
.github/CODEOWNERS vendored
View File

@@ -9,7 +9,7 @@
# CDK and Connector Acceptance Tests
/airbyte-cdk/python @airbytehq/connector-extensibility
/airbyte-integrations/connector-templates/ @airbytehq/connector-extensibility
/airbyte-integrations/bases/connector-acceptance-tests/ @airbytehq/connector-operations
/airbyte-integrations/bases/connector-acceptance-test/ @airbytehq/connector-operations
# Protocol related items
/docs/understanding-airbyte/airbyte-protocol.md @airbytehq/protocol-reviewers

4
.gitignore vendored
View File

@@ -70,8 +70,8 @@ docs/SUMMARY.md
**/specs_secrets_mask.yaml
# Files generated when downloading connector registry
airbyte-config-oss/**/seed/oss_registry.json
airbyte-config-oss/**/seed/oss_catalog.json
airbyte-cdk/java/airbyte-cdk/init-oss/src/main/resources/seed/oss_registry.json
airbyte-cdk/java/airbyte-cdk/init-oss/src/main/resources/seed/oss_catalog.json
# Output Files generated by scripts
lowcode_connector_names.txt

View File

@@ -1,25 +0,0 @@
#!/bin/bash
# This script is used to rebuild the java-cdk-010-d2 branch from scratch.
# TODO: Delete this script once the migration is complete.
# Check with the use to confirm the action
read -p "This script will delete and recreate the java-cdk-010-d2 branch. You should merge in the latest from from master before running this. Are you sure? (y/n) " -n 1 -r
# Switch to the base branch
git checkout java-cdk-010-d
# Make sure the base branch is up-to-date
# git merge origin/master
# Delete and recreate the old branch locally
git branch -D java-cdk-010-d2
git checkout -b java-cdk-010-d2
# Run the migration script
python ./airbyte-cdk/java/airbyte-cdk/_temp_migration_script.py
# Stage the changes and commit
git add .
git commit -m "commit the migration script result"
# Force push to the origin (this PR)
git push -uf origin java-cdk-010-d2
# git cherry-pick c125f14cdce8be61da85ebedffd3c4a576dc1fc5 # delete dead code (incorrectly annotated as override)
# git cherry-pick 89d81ec62fcfa44fff36196e2d34571b46593818 # disable GlobalStateManagerTest.testToState

View File

@@ -1,341 +0,0 @@
#!/usr/bin/env python3
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
"""Migration script. TODO: Delete this script once the migration is complete.
Usage:
python3 ./airbyte-cdk/java/airbyte-cdk/_temp_migration_script.py
python3 ./airbyte-cdk/java/airbyte-cdk/_temp_migration_script.py test
"""
import os
import re
import shutil
import sys
from pathlib import Path
REPO_ROOT = "."
CDK_ROOT = f"{REPO_ROOT}/airbyte-cdk/java/airbyte-cdk"
EXCLUDE_DIRS = [
"target",
"out",
"build",
"dist",
".git",
"docs",
".venv",
"sample_files",
"node_modules",
"lib",
"bin",
"__pycache__",
".gradle",
".symlinks",
]
EXCLUDE_FILES = [
"pom\.xml",
"README\.md",
"LICENSE",
"build",
".coverage\..*",
".*\.zip",
".*\.gz",
"_temp_.*",
".*\.dat",
".*\.bin",
".*\.csv",
".*\.jsonl",
".*\.png",
".*\.db",
".*\.pyc",
".*\.jar",
".*\.archive",
".*\.coverage",
]
CORE_FEATURE = "core"
DB_SOURCES_FEATURE = "db-sources"
DB_DESTINATIONS_FEATURE = "db-destinations"
MAIN_PACKAGES = {
CORE_FEATURE: [
"airbyte-db/db-lib", # Jooq is fragile and reliant on manual code generation steps
"airbyte-integrations/bases/base-java",
"airbyte-integrations/bases/base-java-s3",
],
DB_SOURCES_FEATURE: [
"airbyte-integrations/bases/debezium",
"airbyte-integrations/connectors/source-jdbc",
"airbyte-integrations/connectors/source-relational-db",
],
DB_DESTINATIONS_FEATURE: [
"airbyte-integrations/bases/bases-destination-jdbc",
# "airbyte-integrations/bases/base-typing-deduping", # Excluded by request
],
}
TEST_FIXTURE_PACKAGES = {
CORE_FEATURE: [],
DB_SOURCES_FEATURE: [
"airbyte-test-utils",
"airbyte-integrations/bases/base-standard-source-test-file",
"airbyte-integrations/bases/standard-source-test",
],
DB_DESTINATIONS_FEATURE: [
# "airbyte-integrations/bases/base-typing-deduping-test", # Excluded by request
"airbyte-integrations/bases/s3-destination-base-integration-test",
"airbyte-integrations/bases/standard-destination-test",
],
}
TEST_CMDS = [
# These should pass:
# f"{REPO_ROOT}/./gradlew :airbyte-cdk:java:airbyte-cdk:build",
# f"{REPO_ROOT}/./gradlew :airbyte-integrations:connectors:source-postgres:test --fail-fast",
# f"{REPO_ROOT}/./gradlew :airbyte-integrations:connectors:source-bigquery:test --fail-fast",
# f"{REPO_ROOT}/./gradlew :airbyte-integrations:connectors:destination-bigquery:test --fail-fast",
# f"{REPO_ROOT}/./gradlew :airbyte-integrations:connectors:destination-snowflake:test --fail-fast",
# f"{REPO_ROOT}/./gradlew :airbyte-integrations:connectors:destination-gcs:test --fail-fast",
# f"{REPO_ROOT}/./gradlew :airbyte-integrations:bases:base-typing-deduping:build",
# f"{REPO_ROOT}/./gradlew :airbyte-integrations:bases:base-typing-deduping-test:build",
# Working on:
# Failing:
f"{REPO_ROOT}/./gradlew :airbyte-integrations:connectors:destination-postgres:test --fail-fast", # Needs cdk plugin and extension settings.
f"{REPO_ROOT}/./gradlew :airbyte-cdk:java:airbyte-cdk:integrationTest", # Missing image for source-jdbc
f"{REPO_ROOT}/./gradlew :airbyte-integrations:connectors:source-postgres:integrationTestJava", # org.testcontainers.containers.ContainerLaunchException: Container startup failed for image postgres:13-alpine
# "java.io.StreamCorruptedException: Overriding the global section with a specific one at line 3: Host *":
f"{REPO_ROOT}/./gradlew :airbyte-integrations:connectors:source-postgres:integrationTestJava --tests=SshKeyPostgresSourceAcceptanceTest.testEntrypointEnvVar",
# SshKeyPostgresSourceAcceptanceTest.testIdenticalFullRefreshes
# SshKeyPostgresSourceAcceptanceTest.testIncrementalSyncWithState
# SshPasswordPostgresSourceAcceptanceTest.testEntrypointEnvVar
# SshPasswordPostgresSourceAcceptanceTest.testIdenticalFullRefreshes
# SshPasswordPostgresSourceAcceptanceTest.testIncrementalSyncWithState
]
def move_files(source_dir, dest_dir, path_desc):
if os.path.isdir(source_dir):
print(f"Moving '{path_desc}' files (ignoring existing)...\n" f" - From: {source_dir}\n" f" - To: {dest_dir}")
os.makedirs(dest_dir, exist_ok=True)
for root, dirs, files in os.walk(source_dir):
for file in files:
src_file = os.path.join(root, file)
sub_dir = os.path.relpath(root, source_dir)
dst_file = os.path.join(dest_dir, sub_dir, file)
os.makedirs(os.path.dirname(dst_file), exist_ok=True)
shutil.move(src_file, dst_file)
else:
pass
# print(f"The source directory does not exist: {source_dir} ('{path_desc}')")
def remove_empty_dirs(root_dir):
for root, dirs, files in os.walk(root_dir, topdown=False):
for dir in dirs:
path = os.path.join(root, dir)
if not os.listdir(path):
os.rmdir(path)
def list_remnant_files(from_dir: str):
# List remnant files in the OLD_PACKAGE_ROOT
print(f"Files remaining in {from_dir}:")
for root, dirs, files in os.walk(from_dir):
for f in files:
print(os.path.join(root, f))
def move_package(old_package_root: str, feature_name: str, as_test_fixture: bool):
# Define source and destination directories
old_main_path = os.path.join(old_package_root, "src/main/java/io/airbyte")
old_test_path = os.path.join(old_package_root, "src/test/java/io/airbyte")
old_integtest_path = os.path.join(old_package_root, "src/test-integration/java/io/airbyte")
old_perftest_path = os.path.join(old_package_root, "src/test-performance/java/io/airbyte")
old_testfixture_path = os.path.join(old_package_root, "src/testfixtures/java/io/airbyte")
old_main_resources_path = os.path.join(old_package_root, "src/main/resources")
old_test_resources_path = os.path.join(old_package_root, "src/test/resources")
old_integtest_resources_path = os.path.join(old_package_root, "src/test-integration/resources")
old_perftest_resources_path = os.path.join(old_package_root, "src/test-performance/resources")
old_testfixture_resources_path = os.path.join(old_package_root, "src/testfixtures/resources")
dest_main_path = os.path.join(CDK_ROOT, feature_name, "src/main/java/io/airbyte/cdk")
dest_test_path = os.path.join(CDK_ROOT, feature_name, "src/test/java/io/airbyte/cdk")
dest_integtest_path = os.path.join(CDK_ROOT, feature_name, "src/test-integration/java/io/airbyte/cdk")
dest_perftest_path = os.path.join(CDK_ROOT, feature_name, "src/test-performance/java/io/airbyte/cdk")
dest_testfixture_path = os.path.join(CDK_ROOT, feature_name, "src/testFixtures/java/io/airbyte/cdk")
old_project_name = str(Path(old_package_root).parts[-1])
remnants_archive_path = os.path.join(CDK_ROOT, "archive", old_project_name)
dest_main_resources_path = os.path.join(CDK_ROOT, feature_name, "src/main/resources")
dest_test_resources_path = os.path.join(CDK_ROOT, feature_name, "src/test/resources")
dest_integtest_resources_path = os.path.join(CDK_ROOT, feature_name, "src/test-integration/resources")
dest_perftest_resources_path = os.path.join(CDK_ROOT, feature_name, "src/test-performance/resources")
dest_testfixture_resources_path = os.path.join(CDK_ROOT, feature_name, "src/testFixtures/resources")
if as_test_fixture:
# Move the test project's 'main' files to the test fixtures directory
dest_main_path = dest_testfixture_path
dest_main_resources_path = dest_testfixture_resources_path
# Define source and destination directories as list of tuples
paths = [
("main classes", old_main_path, dest_main_path),
("main test classes", old_test_path, dest_test_path),
("integ test classes", old_integtest_path, dest_integtest_path),
("perf test classes", old_perftest_path, dest_perftest_path),
("test fixtures", old_testfixture_path, dest_testfixture_path),
("main resources", old_main_resources_path, dest_main_resources_path),
("test resources", old_test_resources_path, dest_test_resources_path),
("integ test resources", old_integtest_resources_path, dest_integtest_resources_path),
("perf test resources", old_perftest_resources_path, dest_perftest_resources_path),
("test fixtures resources", old_testfixture_resources_path, dest_testfixture_resources_path),
("remnants to archive", old_package_root, remnants_archive_path),
]
for path_desc, source_dir, dest_dir in paths:
move_files(source_dir, dest_dir, path_desc)
remove_empty_dirs(old_package_root)
def migrate_package_refs(
text_pattern: str,
text_replacement: str,
within_dir: str,
exclude_files: list,
exclude_dirs: list,
):
"""
Migrates a Java package to the new CDK package structure.
Args:
package_root (str): The root directory of the package to migrate.
exclude_files (list): A list of file patterns to exclude from the migration.
exclude_dirs (list): A list of directory patterns to exclude from the migration.
Returns:
None
"""
# Define the files to exclude from the search
exclude_files_pattern = "|".join(exclude_files)
exclude_files_regex = re.compile(exclude_files_pattern)
# Walk the directory tree and perform the find and replace operation on each file
for root, dirs, files in os.walk(within_dir):
# Exclude files that match the exclude_files pattern
files = [f for f in files if not exclude_files_regex.match(f)]
for file in files:
file_path = os.path.join(root, file)
if any([exclude_dir in file_path.split("/") for exclude_dir in exclude_dirs]):
continue
# print("Scanning file: ", file_path)
# Exclude files that match the exclude_files pattern
if exclude_files_regex.match(file):
continue
# Read the file contents
with open(file_path, "r") as f:
try:
contents = f.read()
except UnicodeDecodeError:
print(f"Skipping file {file_path} due to UnicodeDecodeError")
continue
# Perform the find and replace operation
new_contents = re.sub(text_pattern, text_replacement, contents)
# Write back the file if it has changed
if contents != new_contents:
# Write the updated contents back to the file
with open(file_path, "w") as f:
f.write(new_contents)
# else:
# print(f"No files found to scan within {within_dir}")
def update_cdk_package_defs() -> None:
"""Within CDK_ROOT, packages should be declared as 'package io.airbyte.cdk...'"""
migrate_package_refs(
text_pattern=r"package io\.airbyte\.(?!cdk\.)(?!cdk$)",
text_replacement=r"package io.airbyte.cdk.",
within_dir=CDK_ROOT,
exclude_files=EXCLUDE_FILES,
exclude_dirs=EXCLUDE_DIRS,
)
# Undo any dupes if they exist.
migrate_package_refs(
text_pattern=r"package io\.airbyte\.cdk\.cdk",
text_replacement=r"package io.airbyte.cdk",
within_dir=CDK_ROOT,
exclude_files=EXCLUDE_FILES,
exclude_dirs=EXCLUDE_DIRS,
)
def refactor_cdk_package_refs() -> None:
for text_pattern, text_replacement, within_dir in [
(
r"(?<!package )io\.airbyte\.(db)",
r"io.airbyte.cdk.\1",
REPO_ROOT,
),
(
r"(?<!package )io\.airbyte\.(?!.*typing_deduping)(integrations\.base|integrations\.debezium|integrations\.standardtest|integrations\.destination\.NamingConventionTransformer|integrations\.destination\.StandardNameTransformer|integrations\.destination\.jdbc|integrations\.destination\.record_buffer|integrations\.destination\.normalization|integrations\.destination\.buffered_stream_consumer|integrations\.destination\.dest_state_lifecycle_manager|integrations\.destination\.staging|integrations\.destination_async|integrations\.source\.jdbc|integrations\.source\.relationaldb|integrations\.util|integrations\.BaseConnector|test\.utils)",
r"io.airbyte.cdk.\1",
REPO_ROOT,
),
(
r"(?<!package )io\.airbyte\.integrations\.destination\.s3\.(avro|constant|credential|csv|jsonl|parquet|S3BaseChecks|S3ConsumerFactory|S3DestinationConfig|S3DestinationConstants|S3Format|S3FormatConfig|S3FormatConfigs|SerializedBufferFactory|StorageProvider|S3StorageOperations|template|util|writer)\b",
r"io.airbyte.cdk.integrations.destination.s3.\1",
REPO_ROOT,
),
]:
migrate_package_refs(
text_pattern,
text_replacement,
within_dir=within_dir,
exclude_files=EXCLUDE_FILES,
exclude_dirs=EXCLUDE_DIRS,
)
def main() -> None:
# Remove empty directories in CDK_ROOT
remove_empty_dirs(CDK_ROOT)
if len(sys.argv) > 1:
if sys.argv[1] == "test":
for cmd in TEST_CMDS:
print(f"Running test command: {cmd}")
exit_code = os.system(cmd)
if exit_code != 0:
print(f"Error running command: {cmd}")
sys.exit(exit_code)
return
else:
raise ValueError(f"Unknown argument: {sys.argv[1]}")
for feature_name in MAIN_PACKAGES.keys():
paths_to_migrate = MAIN_PACKAGES[feature_name] + TEST_FIXTURE_PACKAGES[feature_name]
for old_package_root in paths_to_migrate:
# Remove empty directories in the OLD_PACKAGE_ROOT
as_test_fixture = old_package_root in TEST_FIXTURE_PACKAGES[feature_name]
move_package(old_package_root, feature_name, as_test_fixture)
remove_empty_dirs(old_package_root)
update_cdk_package_defs()
refactor_cdk_package_refs()
# Move the base-java folder back, as base docker image definition for java connectors:
move_files(
source_dir="airbyte-cdk/java/airbyte-cdk/archive/base-java",
dest_dir="airbyte-integrations/bases/base-java",
path_desc="base java dockerfile definitions (moving back)",
)
print("Migration operation complete!")
if __name__ == "__main__":
main()

View File

@@ -9,6 +9,7 @@ dependencies {
implementation platform(libs.micronaut.bom)
implementation libs.bundles.micronaut
implementation group: 'joda-time', name: 'joda-time', version: '2.12.5'
implementation 'io.fabric8:kubernetes-client:5.12.2'
implementation 'com.auth0:java-jwt:3.19.2'
implementation libs.guava
@@ -19,12 +20,11 @@ dependencies {
implementation 'org.apache.commons:commons-text:1.10.0'
implementation libs.bundles.datadog
implementation project(':airbyte-api')
implementation project(':airbyte-commons')
implementation project(':airbyte-commons-protocol')
implementation project(':airbyte-config-oss:config-models-oss')
implementation project(':airbyte-json-validation')
implementation libs.airbyte.protocol
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-api')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons-protocol')
implementation project(':airbyte-cdk:java:airbyte-cdk:config-models-oss')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')
testAnnotationProcessor platform(libs.micronaut.bom)
testAnnotationProcessor libs.bundles.micronaut.test.annotation.processor

View File

@@ -14,7 +14,6 @@ import com.google.common.base.Strings;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.configoss.OperatorDbt;
@@ -37,7 +36,7 @@ public class DbtTransformationRunner implements AutoCloseable {
private static final Logger LOGGER = LoggerFactory.getLogger(DbtTransformationRunner.class);
private static final String DBT_ENTRYPOINT_SH = "entrypoint.sh";
private static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
private static final Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("dbt")
.setPrefixColor(Color.PURPLE_BACKGROUND);

View File

@@ -49,8 +49,8 @@ public class CatalogClientConverters {
}
@SuppressWarnings("PMD.AvoidLiteralsInIfCondition")
private static io.airbyte.protocol.models.AirbyteStream toConfiguredProtocol(final io.airbyte.api.client.model.generated.AirbyteStream stream,
AirbyteStreamConfiguration config)
private static AirbyteStream toConfiguredProtocol(final io.airbyte.api.client.model.generated.AirbyteStream stream,
AirbyteStreamConfiguration config)
throws JsonValidationException {
if (config.getFieldSelectionEnabled() != null && config.getFieldSelectionEnabled()) {
// Validate the selected field paths.
@@ -95,7 +95,7 @@ public class CatalogClientConverters {
}
((ObjectNode) properties).retain(selectedFieldNames);
}
return new io.airbyte.protocol.models.AirbyteStream()
return new AirbyteStream()
.withName(stream.getName())
.withJsonSchema(stream.getJsonSchema())
.withSupportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(), io.airbyte.protocol.models.SyncMode.class))
@@ -121,20 +121,20 @@ public class CatalogClientConverters {
.collect(Collectors.toList()));
}
private static io.airbyte.api.client.model.generated.AirbyteStreamConfiguration generateDefaultConfiguration(
final io.airbyte.api.client.model.generated.AirbyteStream stream) {
final io.airbyte.api.client.model.generated.AirbyteStreamConfiguration result =
new io.airbyte.api.client.model.generated.AirbyteStreamConfiguration()
private static AirbyteStreamConfiguration generateDefaultConfiguration(
final io.airbyte.api.client.model.generated.AirbyteStream stream) {
final AirbyteStreamConfiguration result =
new AirbyteStreamConfiguration()
.aliasName(Names.toAlphanumericAndUnderscore(stream.getName()))
.cursorField(stream.getDefaultCursorField())
.destinationSyncMode(io.airbyte.api.client.model.generated.DestinationSyncMode.APPEND)
.destinationSyncMode(DestinationSyncMode.APPEND)
.primaryKey(stream.getSourceDefinedPrimaryKey())
.selected(true);
if (stream.getSupportedSyncModes().size() > 0) {
result.setSyncMode(Enums.convertTo(stream.getSupportedSyncModes().get(0),
io.airbyte.api.client.model.generated.SyncMode.class));
SyncMode.class));
} else {
result.setSyncMode(io.airbyte.api.client.model.generated.SyncMode.INCREMENTAL);
result.setSyncMode(SyncMode.INCREMENTAL);
}
return result;
}
@@ -145,7 +145,7 @@ public class CatalogClientConverters {
.name(stream.getName())
.jsonSchema(stream.getJsonSchema())
.supportedSyncModes(Enums.convertListTo(stream.getSupportedSyncModes(),
io.airbyte.api.client.model.generated.SyncMode.class))
SyncMode.class))
.sourceDefinedCursor(stream.getSourceDefinedCursor())
.defaultCursorField(stream.getDefaultCursorField())
.sourceDefinedPrimaryKey(stream.getSourceDefinedPrimaryKey())

View File

@@ -64,7 +64,7 @@ public class FailureHelper {
} else {
try {
final String traceMessageError = m.getError().getFailureType().toString();
failureType = FailureReason.FailureType.fromValue(traceMessageError);
failureType = FailureType.fromValue(traceMessageError);
} catch (final IllegalArgumentException e) {
// the trace message error does not exist as a FailureReason failure type,
// so set the failure type to null
@@ -125,10 +125,10 @@ public class FailureHelper {
public static FailureReason checkFailure(final Throwable t,
final Long jobId,
final Integer attemptNumber,
final FailureReason.FailureOrigin origin) {
final FailureOrigin origin) {
return connectorCommandFailure(t, jobId, attemptNumber, ConnectorCommand.CHECK)
.withFailureOrigin(origin)
.withFailureType(FailureReason.FailureType.CONFIG_ERROR)
.withFailureType(FailureType.CONFIG_ERROR)
.withRetryable(false)
.withExternalMessage(String
.format("Checking %s connection failed - please review this connection's configuration to prevent future syncs from failing", origin));

View File

@@ -10,7 +10,6 @@ import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.protocol.DefaultProtocolSerializer;
import io.airbyte.commons.protocol.ProtocolSerializer;
@@ -38,7 +37,7 @@ import org.slf4j.LoggerFactory;
public class DefaultAirbyteDestination implements AirbyteDestination {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultAirbyteDestination.class);
public static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
public static final Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("destination")
.setPrefixColor(Color.YELLOW_BACKGROUND);
static final Set<Integer> IGNORED_EXIT_CODES = Set.of(

View File

@@ -11,7 +11,6 @@ import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.commons.protocol.DefaultProtocolSerializer;
import io.airbyte.commons.protocol.ProtocolSerializer;
@@ -44,7 +43,7 @@ public class DefaultAirbyteSource implements AirbyteSource {
143 // SIGTERM
);
public static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
public static final Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("source")
.setPrefixColor(Color.BLUE_BACKGROUND);

View File

@@ -16,7 +16,6 @@ import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.logging.LoggingHelper.Color;
import io.airbyte.commons.logging.MdcScope;
import io.airbyte.commons.logging.MdcScope.Builder;
import io.airbyte.configoss.OperatorDbt;
import io.airbyte.configoss.ResourceRequirements;
@@ -44,7 +43,7 @@ import org.slf4j.LoggerFactory;
public class DefaultNormalizationRunner implements NormalizationRunner {
private static final Logger LOGGER = LoggerFactory.getLogger(DefaultNormalizationRunner.class);
private static final MdcScope.Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
private static final Builder CONTAINER_LOG_MDC_BUILDER = new Builder()
.setLogPrefix("normalization")
.setPrefixColor(Color.GREEN_BACKGROUND);

View File

@@ -34,7 +34,7 @@ public class AirbyteMessageUtils {
final Instant timeExtracted) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage()
.withData(record)
.withStream(tableName)
@@ -45,7 +45,7 @@ public class AirbyteMessageUtils {
final String message) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.LOG)
.withType(Type.LOG)
.withLog(new AirbyteLogMessage()
.withLevel(level)
.withMessage(message));
@@ -70,13 +70,13 @@ public class AirbyteMessageUtils {
public static AirbyteMessage createRecordMessage(final String streamName, final int recordData) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.RECORD)
.withType(Type.RECORD)
.withRecord(new AirbyteRecordMessage().withStream(streamName).withData(Jsons.jsonNode(recordData)));
}
public static AirbyteMessage createStateMessage(final int stateData) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.STATE)
.withType(Type.STATE)
.withState(new AirbyteStateMessage().withData(Jsons.jsonNode(stateData)));
}
@@ -139,7 +139,7 @@ public class AirbyteMessageUtils {
public static AirbyteMessage createErrorMessage(final String message, final Double emittedAt) {
return new AirbyteMessage()
.withType(AirbyteMessage.Type.TRACE)
.withType(Type.TRACE)
.withTrace(createErrorTraceMessage(message, emittedAt));
}
@@ -151,7 +151,7 @@ public class AirbyteMessageUtils {
final Double emittedAt,
final AirbyteErrorTraceMessage.FailureType failureType) {
final var msg = new AirbyteTraceMessage()
.withType(io.airbyte.protocol.models.AirbyteTraceMessage.Type.ERROR)
.withType(AirbyteTraceMessage.Type.ERROR)
.withError(new AirbyteErrorTraceMessage().withMessage(message))
.withEmittedAt(emittedAt);

View File

@@ -5,7 +5,6 @@ dependencies {
implementation libs.bundles.micronaut.annotation
testImplementation libs.bundles.micronaut.test
implementation libs.airbyte.protocol
implementation project(':airbyte-commons')
implementation project(':airbyte-json-validation')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-commons')
implementation project(':airbyte-cdk:java:airbyte-cdk:airbyte-json-validation')
}

Some files were not shown because too many files have changed in this diff Show More