mirror of synced 2025-12-25 02:09:19 -05:00

Kube Queueing POC (#3464)

* Use CDK to generate source that can be configured to emit a certain number of records and always works.

* Checkpoint: socat works from inside the docker container.

* Override the entry point.

* Clean up and add ReadMe.

* Clean up socat.

* Checkpoint: connect to Kube cluster and list all the pods.

* Checkpoint: Sync worker pod is able to send output to the destination pod.

* Checkpoint: Sync worker creates Dest pod if none existed previously. It also waits for the pod to be ready before doing anything else. Sync worker will also remove the pod on termination.
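Waiting for the Dest pod to become ready before doing anything else is, at its core, a bounded polling loop against the Kubernetes API. A minimal, dependency-free sketch of that pattern (the `isReady` supplier is a stand-in for a real pod-readiness check, which is an assumption here, not this commit's code):

```java
import java.time.Duration;
import java.time.Instant;
import java.util.function.Supplier;

public class ReadinessWaiter {

    /** Polls the supplied readiness check until it returns true or the timeout elapses. */
    public static boolean waitForReady(Supplier<Boolean> isReady, Duration timeout, Duration interval)
            throws InterruptedException {
        Instant deadline = Instant.now().plus(timeout);
        while (Instant.now().isBefore(deadline)) {
            if (isReady.get()) {
                return true;
            }
            Thread.sleep(interval.toMillis());
        }
        return false;
    }
}
```

The real worker would plug a Kubernetes client call into the supplier; the loop shape stays the same.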

* update readme

* Checkpoint: Dest pod does not restart after finishing. Comment out delete command in Sync worker.

* working towards named pipes

* named pipes working

* update readme

* WIP named pipe / socat sidecar kube port forwarding (#3518)
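The plumbing the named-pipe/socat sidecar provides — one process's stdout feeding another's stdin — can be pictured locally with Java's process pipelines. A rough local analogue (using `echo` and `cat` as stand-ins for the source and destination containers; illustrative only, not this commit's code):

```java
import java.io.BufferedReader;
import java.io.InputStreamReader;
import java.util.List;

public class PipeSketch {

    /** Pipes the output of one child process into another and returns the final output line. */
    public static String run() {
        try {
            List<Process> pipeline = ProcessBuilder.startPipeline(List.of(
                    new ProcessBuilder("echo", "hello from source"),
                    new ProcessBuilder("cat")));
            Process last = pipeline.get(pipeline.size() - 1);
            try (BufferedReader reader = new BufferedReader(new InputStreamReader(last.getInputStream()))) {
                return reader.readLine();
            }
        } catch (Exception e) {
            throw new RuntimeException(e);
        }
    }
}
```

In the Kube setup the same stream crosses pod boundaries via socat and named pipes rather than an in-JVM pipeline.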

* nearly working sources

* update

* stdin example

* move all kube testing yamls into the airbyte-workers directories. sort the airbyte-workers resource folder; place all the poc yamls together.

* Format.

* Put back the original KubeProcessBuilderFactory.

* Fix slight errors.

* Checkpoint: Worker pod knows its own IP. Successfully starts and writes to Dest pod after refactor.

* remove unused file and update readme

* Dest pod loops back into worker pod. However, the right messages do not seem to be passing in.

* Switch back to worker ip.

* SWEET VICTORY!.

* wrap kube pod in process (#3540)

also clean up kubernetes deploys.

* More clean up. (#3586)

The first 6 points of #3464.

The only interesting thing about this PR is the kube pod shutdown. For whatever reason, the OkHttpPool isn't respecting the evictAll call and 1 idle thread remains. So instead of shutting down immediately, the worker pod shuts down after 5 mins when the idle thread is reaped. There isn't an easy way to modify the pool's idle reap configuration right now. I do not think this issue is blocking since it's relatively benign, so I vote we create a ticket and come back to this once we do an e2e test.

* Implements redirecting standard error as well. (#3623)

* Clean up before next implementation.

* kube process launching (#3790)

* processes must handle file mounting

* remove comment

* default to base entrypoint

* use process builder factory / select stdin / use a pool of ports

* fix up

* add super hacky copying example

* Checkpoint: Works end to end!

* Checkpoint: Use API to make sure init container is ready instead of blind sleep. Propagate exception in DefaultCheckConnectionWorker.

* Refactor KubePodProcess. Checked to make sure everything still works.

* Format.

* Clean up code. Begin putting this into variables and breaking up long constructor function.

* Add comments to explain what is happening.

* fix normalization test

* increase timeout for initcontainer

Co-authored-by: Davin Chia <davinchia@gmail.com>

* facepalm moment

* clean up kube poc pr (#3834)

* clean up

* remove source-always-works

* create separate commons-docker

* fix test

* enable kube e2e tests (#3866)

* enable kube e2e tests

* use more generally accepted env definition

* use new runners

* use its own runner and install minikube differently

* update name

* use kubectl alias

* use link instead of alias that doesn't propagate

* start minikube

* use driver=none

* go back to using action

* mess with versions

* revert runner

* install socat

* print logs after run

* also try re-running tasks

* always wait for file transfer

* use ports

* increase wait timeout for kube

* use different localhost ips and bump normalization to include an entrypoint

* proposed fix

* all working locally

* revert temporary changes

* revert normalization image change that's happening in a separate pr

* readability

* final comment

* Working Kube Cancel. (#3983)

* Port over the basic changes.

* Add logic to return proper exit code in the event of termination. Add comments to explain why.

* revert envs change and merge master to fix kube acceptance tests (#4012)

* use older env format

* fix build

Co-authored-by: jrhizor <me@jaredrhizor.com>
Co-authored-by: Jared Rhizor <jared@dataline.io>
This commit is contained in:
Davin Chia
2021-06-10 09:12:39 +08:00
committed by GitHub
parent 36488ef282
commit b04c080c95
72 changed files with 1143 additions and 315 deletions

View File

@@ -286,41 +286,37 @@ jobs:
- name: Run End-to-End Frontend Tests
run: ./tools/bin/e2e_test.sh
test_kube:
runs-on: ubuntu-latest
name: Run Acceptance Tests (Kube)
steps:
- name: Checkout Airbyte
uses: actions/checkout@v2
# DISABLED UNTIL WE HAVE TEMPORAL ON KUBE
# test_kube:
# runs-on: ubuntu-latest
# steps:
# - name: Checkout Airbyte
# uses: actions/checkout@v2
#
# - uses: actions/setup-java@v1
# with:
# java-version: '14'
#
# - uses: actions/setup-node@v1
# with:
# node-version: '14.7'
#
# - uses: actions/setup-python@v2
# with:
# python-version: '3.7'
#
# - name: Setup Minikube
# uses: manusa/actions-setup-minikube@v2.3.0
# with:
# minikube version: 'v1.16.0'
# kubernetes version: 'v1.19.2'
#
# - name: Install socat
# run: sudo apt-get install socat
#
# - name: Build Core Docker Images and Run Tests
# run: CORE_ONLY=true ./gradlew --no-daemon composeBuild test -x :airbyte-webapp:test --scan
# env:
# GIT_REVISION: ${{ github.sha }}
# CORE_ONLY: true
#
# - name: Run Kubernetes End-to-End Acceptance Tests
# run: |
# ./tools/bin/acceptance_test_kube.sh
- uses: actions/setup-java@v1
with:
java-version: '14'
- uses: actions/setup-node@v1
with:
node-version: '14.7'
- uses: actions/setup-python@v2
with:
python-version: '3.7'
- name: Setup Minikube
uses: manusa/actions-setup-minikube@v2.4.1
with:
minikube version: 'v1.21.0-beta.0'
kubernetes version: 'v1.20.7'
- name: Install socat (required for port forwarding)
run: sudo apt-get install socat
- name: Build Core Docker Images and Run Tests
run: CORE_ONLY=true ./gradlew --no-daemon build --scan --rerun-tasks
- name: Run Kubernetes End-to-End Acceptance Tests
run: |
IS_MINIKUBE=true ./tools/bin/acceptance_test_kube.sh

View File

@@ -0,0 +1,11 @@
plugins {
id "java-library"
}
dependencies {
implementation 'org.apache.commons:commons-compress:1.20'
implementation 'com.github.docker-java:docker-java:3.2.8'
implementation 'com.github.docker-java:docker-java-transport-httpclient5:3.2.8'
testImplementation 'org.apache.commons:commons-lang3:3.11'
}

View File

@@ -24,10 +24,36 @@
package io.airbyte.commons.docker;
import com.github.dockerjava.api.DockerClient;
import com.github.dockerjava.api.command.BuildImageResultCallback;
import com.github.dockerjava.core.DefaultDockerClientConfig;
import com.github.dockerjava.core.DockerClientConfig;
import com.github.dockerjava.core.DockerClientImpl;
import com.github.dockerjava.httpclient5.ApacheDockerHttpClient;
import com.github.dockerjava.transport.DockerHttpClient;
import java.io.File;
import java.util.Set;
public class DockerUtils {
private static final DockerClientConfig CONFIG = DefaultDockerClientConfig.createDefaultConfigBuilder().build();
private static final DockerHttpClient HTTP_CLIENT = new ApacheDockerHttpClient.Builder()
.dockerHost(CONFIG.getDockerHost())
.sslConfig(CONFIG.getSSLConfig())
.maxConnections(100)
.build();
private static final DockerClient DOCKER_CLIENT = DockerClientImpl.getInstance(CONFIG, HTTP_CLIENT);
public static String getTaggedImageName(String dockerRepository, String tag) {
return String.join(":", dockerRepository, tag);
}
public static String buildImage(String dockerFilePath, String tag) {
return DOCKER_CLIENT.buildImageCmd()
.withDockerfile(new File(dockerFilePath))
.withTags(Set.of(tag))
.exec(new BuildImageResultCallback())
.awaitImageId();
}
}
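`buildImage` itself needs a Docker daemon and docker-java on the classpath, but the tag-construction half of `DockerUtils` is just a colon join. A stdlib-only restatement of that logic (purely illustrative; the image name below is a made-up example):

```java
public class TaggedImageName {

    // Mirrors DockerUtils.getTaggedImageName: repository and tag joined by a colon.
    public static String of(String dockerRepository, String tag) {
        return String.join(":", dockerRepository, tag);
    }
}
```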

View File

@@ -3,7 +3,6 @@ plugins {
}
dependencies {
testImplementation 'org.apache.commons:commons-lang3:3.11'
implementation group: 'org.apache.commons', name: 'commons-compress', version: '1.20'
implementation 'org.apache.commons:commons-compress:1.20'
implementation 'org.apache.commons:commons-lang3:3.11'
}

View File

@@ -26,6 +26,7 @@ package io.airbyte.commons.string;
import com.google.common.collect.Streams;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
public class Strings {
@@ -35,4 +36,8 @@ public class Strings {
.collect(Collectors.joining(separator));
}
public static String addRandomSuffix(String base, String separator, int suffixLength) {
return base + separator + RandomStringUtils.randomAlphabetic(suffixLength).toLowerCase();
}
}
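The helper added above concatenates the base, the separator, and a lowercased random alphabetic suffix. A stdlib-only sketch of equivalent behavior (avoiding the commons-lang3 dependency; illustrative, not the code in this commit):

```java
import java.util.Random;

public class RandomSuffix {

    private static final Random RANDOM = new Random();

    // Same shape as Strings.addRandomSuffix: base + separator + random lowercase letters.
    public static String addRandomSuffix(String base, String separator, int suffixLength) {
        StringBuilder suffix = new StringBuilder(suffixLength);
        for (int i = 0; i < suffixLength; i++) {
            suffix.append((char) ('a' + RANDOM.nextInt(26)));
        }
        return base + separator + suffix;
    }
}
```

This is what lets each test below create a uniquely named database or schema, e.g. `db_kqzjtmwopa`.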

View File

@@ -25,6 +25,7 @@
package io.airbyte.config;
import java.nio.file.Path;
import java.util.Set;
public interface Configs {
@@ -62,6 +63,8 @@ public interface Configs {
String getTemporalHost();
Set<Integer> getTemporalWorkerPorts();
enum TrackingStrategy {
SEGMENT,
LOGGING

View File

@@ -26,8 +26,11 @@ package io.airbyte.config;
import com.google.common.base.Preconditions;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.Optional;
import java.util.Set;
import java.util.function.Function;
import java.util.stream.Collectors;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -53,6 +56,7 @@ public class EnvConfigs implements Configs {
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final String TEMPORAL_WORKER_PORTS = "TEMPORAL_WORKER_PORTS";
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
@@ -166,6 +170,13 @@ public class EnvConfigs implements Configs {
return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233");
}
@Override
public Set<Integer> getTemporalWorkerPorts() {
return Arrays.stream(getEnvOrDefault(TEMPORAL_WORKER_PORTS, "").split(","))
.map(Integer::valueOf)
.collect(Collectors.toSet());
}
private String getEnvOrDefault(String key, String defaultValue) {
return getEnvOrDefault(key, defaultValue, Function.identity());
}
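One edge case worth noting in `getTemporalWorkerPorts`: `"".split(",")` returns a single empty string, so an unset `TEMPORAL_WORKER_PORTS` would make `Integer::valueOf` throw a `NumberFormatException`. A defensive variant of the same parse (illustrative only, not the code in this commit):

```java
import java.util.Arrays;
import java.util.Set;
import java.util.stream.Collectors;

public class PortParser {

    // Parses a comma-separated port list; an empty or blank value yields an empty set.
    public static Set<Integer> parsePorts(String raw) {
        return Arrays.stream(raw.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .map(Integer::valueOf)
                .collect(Collectors.toSet());
    }
}
```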

View File

@@ -31,12 +31,12 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.SQLException;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -58,7 +58,7 @@ class PostgresUtilsTest {
@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);
final JsonNode config = getConfig(PSQL_DB, dbName);

View File

@@ -31,13 +31,13 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.SQLException;
import java.util.List;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -65,7 +65,7 @@ public class TestDefaultJdbcDatabase {
@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);
config = getConfig(PSQL_DB, dbName);

View File

@@ -34,6 +34,7 @@ import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.stream.MoreStreams;
import io.airbyte.commons.string.Strings;
import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.math.BigDecimal;
@@ -48,7 +49,6 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -75,7 +75,7 @@ public class TestJdbcUtils {
@BeforeEach
void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);
final JsonNode config = getConfig(PSQL_DB, dbName);

View File

@@ -38,6 +38,7 @@ import com.google.common.collect.Lists;
import io.airbyte.commons.functional.CheckedConsumer;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.sql.Connection;
import java.sql.PreparedStatement;
@@ -47,7 +48,6 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.dbcp2.BasicDataSource;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -79,7 +79,7 @@ public class TestStreamingJdbcDatabase {
void setup() throws Exception {
jdbcStreamingQueryConfiguration = mock(JdbcStreamingQueryConfiguration.class);
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);
final JsonNode config = getConfig(PSQL_DB, dbName);

View File

@@ -43,6 +43,7 @@ import com.google.cloud.bigquery.TableResult;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Maps;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.StandardNameTransformer;
import io.airbyte.integrations.standardtest.destination.DestinationAcceptanceTest;
@@ -55,7 +56,6 @@ import java.util.Map;
import java.util.UUID;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.apache.commons.lang3.tuple.ImmutablePair;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -190,7 +190,8 @@ public class BigQueryDestinationAcceptanceTest extends DestinationAcceptanceTest
final String projectId = credentialsJson.get(CONFIG_PROJECT_ID).asText();
final String datasetLocation = "US";
final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
config = Jsons.jsonNode(ImmutableMap.builder()
.put(CONFIG_PROJECT_ID, projectId)
.put(CONFIG_CREDS, credentialsJsonString)

View File

@@ -43,6 +43,7 @@ import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.base.AirbyteMessageConsumer;
import io.airbyte.integrations.base.Destination;
import io.airbyte.integrations.base.JavaBaseConstants;
@@ -69,7 +70,6 @@ import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;
import java.util.stream.StreamSupport;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -136,7 +136,7 @@ class BigQueryDestinationTest {
.build()
.getService();
final String datasetId = "airbyte_tests_" + RandomStringUtils.randomAlphanumeric(8);
final String datasetId = Strings.addRandomSuffix("airbyte_tests", "_", 8);
final String datasetLocation = "EU";
MESSAGE_USERS1.getRecord().setNamespace(datasetId);
MESSAGE_USERS2.getRecord().setNamespace(datasetId);

View File

@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
@@ -37,7 +38,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
@@ -166,7 +166,7 @@ public class MSSQLDestinationAcceptanceTest extends DestinationAcceptanceTest {
@Override
protected void setup(TestDestinationEnv testEnv) throws SQLException {
configWithoutDbName = getConfig(db);
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);
final Database database = getDatabase(configWithoutDbName);
database.query(ctx -> {

View File

@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
@@ -37,7 +38,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
import org.junit.jupiter.api.AfterAll;
@@ -175,7 +175,7 @@ public class MSSQLDestinationAcceptanceTestSSL extends DestinationAcceptanceTest
@Override
protected void setup(TestDestinationEnv testEnv) throws SQLException {
configWithoutDbName = getConfig(db);
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);
final Database database = getDatabase(configWithoutDbName);
database.query(ctx -> {

View File

@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.base.JavaBaseConstants;
@@ -37,7 +38,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.JSONFormat;
import org.jooq.JSONFormat.RecordFormat;
@@ -133,7 +133,7 @@ public class RedshiftCopyDestinationAcceptanceTest extends DestinationAcceptance
// for each test we create a new schema in the database. run the test in there and then remove it.
@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
final String schemaName = ("integration_test_" + RandomStringUtils.randomAlphanumeric(5));
final String schemaName = Strings.addRandomSuffix("integration_test", "_", 5);
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
baseConfig = getStaticConfig();
getDatabase().query(ctx -> ctx.execute(createSchemaQuery));

View File

@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.base.Preconditions;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.JdbcUtils;
import io.airbyte.integrations.base.JavaBaseConstants;
import io.airbyte.integrations.destination.ExtendedNameTransformer;
@@ -38,7 +39,6 @@ import java.sql.SQLException;
import java.util.ArrayList;
import java.util.List;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAcceptanceTest {
@@ -130,7 +130,7 @@ public class SnowflakeInsertDestinationAcceptanceTest extends DestinationAccepta
// for each test we create a new schema in the database. run the test in there and then remove it.
@Override
protected void setup(TestDestinationEnv testEnv) throws Exception {
final String schemaName = ("integration_test_" + RandomStringUtils.randomAlphanumeric(5));
final String schemaName = Strings.addRandomSuffix("integration_test", "_", 5);
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
baseConfig = getStaticConfig();

View File

@@ -28,13 +28,13 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -62,7 +62,7 @@ class AbstractJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
@BeforeEach
public void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", PSQL_DB.getHost())

View File

@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
@@ -35,7 +36,6 @@ import io.airbyte.integrations.source.jdbc.test.JdbcStressTest;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -65,7 +65,7 @@ class DefaultJdbcStressTest extends JdbcStressTest {
@BeforeEach
public void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);
config = Jsons.jsonNode(ImmutableMap.of("host", "localhost", "port", 5432, "database", "charles", "username", "postgres", "password", ""));

View File

@@ -35,7 +35,6 @@ import io.airbyte.integrations.source.jdbc.test.JdbcStressTest;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -65,7 +64,7 @@ class JdbcSourceStressTest extends JdbcStressTest {
@BeforeEach
public void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10);
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", PSQL_DB.getHost())

View File

@@ -29,6 +29,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.standardtest.source.SourceAcceptanceTest;
@@ -42,7 +43,6 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.testcontainers.containers.MSSQLServerContainer;
public class MssqlSourceAcceptanceTest extends SourceAcceptanceTest {
@@ -63,7 +63,7 @@ public class MssqlSourceAcceptanceTest extends SourceAcceptanceTest {
.put("username", db.getUsername())
.put("password", db.getPassword())
.build());
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
final Database database = getDatabase(configWithoutDbName);
database.query(ctx -> {

View File

@@ -28,11 +28,11 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -67,7 +67,7 @@ public class MssqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
configWithoutDbName.get("port").asInt()),
"com.microsoft.sqlserver.jdbc.SQLServerDriver");
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
database.execute(ctx -> ctx.createStatement().execute(String.format("CREATE DATABASE %s;", dbName)));

View File

@@ -31,6 +31,7 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.protocol.models.AirbyteCatalog;
@@ -40,7 +41,6 @@ import io.airbyte.protocol.models.JsonSchemaPrimitive;
import io.airbyte.protocol.models.SyncMode;
import java.sql.SQLException;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -77,7 +77,7 @@ class MssqlSourceTest {
@BeforeEach
void setup() throws SQLException {
configWithoutDbName = getConfig(db);
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
final Database database = getDatabase(configWithoutDbName);
database.query(ctx -> {

View File

@@ -28,12 +28,12 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcStressTest;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -70,7 +70,7 @@ public class MssqlStressTest extends JdbcStressTest {
configWithoutDbName.get("port").asInt()),
"com.microsoft.sqlserver.jdbc.SQLServerDriver");
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
database.execute(ctx -> ctx.createStatement().execute(String.format("CREATE DATABASE %s;", dbName)));

View File

@@ -27,6 +27,7 @@ package io.airbyte.integrations.source.mysql;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
@@ -34,7 +35,6 @@ import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -68,7 +68,7 @@ class MySqlJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", "db_" + RandomStringUtils.randomAlphabetic(10))
.put("database", Strings.addRandomSuffix("db", "_", 10))
.put("username", TEST_USER)
.put("password", TEST_PASSWORD)
.build());

View File

@@ -29,12 +29,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.protocol.models.AirbyteConnectionStatus;
import java.sql.Connection;
import java.sql.DriverManager;
import java.util.Properties;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.MySQLContainer;
@@ -60,7 +60,7 @@ public class MySqlSourceTests {
Properties properties = new Properties();
properties.putAll(ImmutableMap.of("user", "root", "password", TEST_PASSWORD, "serverTimezone", "Europe/Moscow"));
DriverManager.getConnection(container.getJdbcUrl(), properties);
String dbName = "db_" + RandomStringUtils.randomAlphabetic(10);
final String dbName = Strings.addRandomSuffix("db", "_", 10);
config = getConfig(container, dbName, "serverTimezone=Europe/Moscow");
try (Connection connection = DriverManager.getConnection(container.getJdbcUrl(), properties)) {

View File

@@ -27,6 +27,7 @@ package io.airbyte.integrations.source.mysql;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
@@ -35,7 +36,6 @@ import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.SQLException;
import java.util.Optional;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
@@ -71,7 +71,7 @@ class MySqlStressTest extends JdbcStressTest {
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", container.getHost())
.put("port", container.getFirstMappedPort())
.put("database", "db_" + RandomStringUtils.randomAlphabetic(10))
.put("database", Strings.addRandomSuffix("db", "_", 10))
.put("username", TEST_USER)
.put("password", TEST_PASSWORD)
.build());

View File

@@ -41,6 +41,7 @@ import com.google.common.collect.Streams;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.AutoCloseableIterator;
import io.airbyte.commons.util.AutoCloseableIterators;
import io.airbyte.db.Database;
@@ -72,7 +73,6 @@ import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.DSLContext;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
@@ -159,7 +159,7 @@ class CdcPostgresSourceTest {
void setup() throws Exception {
source = new PostgresSource();
dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
final String initScriptName = "init_" + dbName.concat(".sql");
final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";");


@@ -28,10 +28,10 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.integrations.source.jdbc.AbstractJdbcSource;
import io.airbyte.integrations.source.jdbc.test.JdbcSourceAcceptanceTest;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -52,7 +52,7 @@ class PostgresJdbcSourceAcceptanceTest extends JdbcSourceAcceptanceTest {
@BeforeEach
public void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", PSQL_DB.getHost())


@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
@@ -56,7 +57,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -113,7 +113,7 @@ class PostgresSourceSSLTest {
@BeforeEach
void setup() throws Exception {
dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
final String initScriptName = "init_" + dbName.concat(".sql");
final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";");


@@ -35,6 +35,7 @@ import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.commons.util.MoreIterators;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
@@ -56,7 +57,6 @@ import java.util.Map;
import java.util.Optional;
import java.util.Set;
import java.util.stream.Collectors;
import org.apache.commons.lang3.RandomStringUtils;
import org.jooq.SQLDialect;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
@@ -115,7 +115,7 @@ class PostgresSourceTest {
@BeforeEach
void setup() throws Exception {
dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
final String initScriptName = "init_" + dbName.concat(".sql");
final String tmpFilePath = IOs.writeFileToRandomTmpDir(initScriptName, "CREATE DATABASE " + dbName + ";");


@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.jdbc.PostgresJdbcStreamingQueryConfiguration;
import io.airbyte.integrations.base.IntegrationRunner;
import io.airbyte.integrations.base.Source;
@@ -36,7 +37,6 @@ import io.airbyte.integrations.source.jdbc.test.JdbcStressTest;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.util.Optional;
import java.util.Set;
import org.apache.commons.lang3.RandomStringUtils;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
@@ -66,7 +66,7 @@ class PostgresStressTest extends JdbcStressTest {
@BeforeEach
public void setup() throws Exception {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
config = Jsons.jsonNode(ImmutableMap.builder()
.put("host", PSQL_DB.getHost())


@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
import io.airbyte.db.jdbc.JdbcUtils;
@@ -44,7 +45,6 @@ import java.sql.SQLException;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import org.apache.commons.lang3.RandomStringUtils;
public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
@@ -71,7 +71,7 @@ public class RedshiftSourceAcceptanceTest extends SourceAcceptanceTest {
config.get("database").asText()),
RedshiftSource.DRIVER_CLASS);
schemaName = ("integration_test_" + RandomStringUtils.randomAlphanumeric(5)).toLowerCase();
schemaName = Strings.addRandomSuffix("integration_test", "_", 5).toLowerCase();
final String createSchemaQuery = String.format("CREATE SCHEMA %s", schemaName);
database.execute(connection -> {
connection.createStatement().execute(createSchemaQuery);


@@ -3,6 +3,10 @@ plugins {
}
dependencies {
implementation 'io.fabric8:kubernetes-client:5.3.1'
implementation 'io.kubernetes:client-java-api:10.0.0'
implementation 'io.kubernetes:client-java:10.0.0'
implementation 'io.kubernetes:client-java-extended:10.0.0'
implementation 'io.temporal:temporal-sdk:1.0.4'
implementation project(':airbyte-analytics')


@@ -49,6 +49,8 @@ import io.airbyte.workers.process.ProcessFactory;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalPool;
import io.airbyte.workers.temporal.TemporalUtils;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
@@ -56,8 +58,10 @@ import java.time.Duration;
import java.time.Instant;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
@@ -161,7 +165,9 @@ public class SchedulerApp {
private static ProcessFactory getProcessBuilderFactory(Configs configs) {
if (configs.getWorkerEnvironment() == Configs.WorkerEnvironment.KUBERNETES) {
return new KubeProcessFactory(configs.getWorkspaceRoot());
final KubernetesClient kubeClient = new DefaultKubernetesClient();
final BlockingQueue<Integer> ports = new LinkedBlockingDeque<>(configs.getTemporalWorkerPorts());
return new KubeProcessFactory("default", kubeClient, ports);
} else {
return new DockerProcessFactory(
configs.getWorkspaceRoot(),
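The scheduler change above hands `KubeProcessFactory` a bounded set of worker ports wrapped in a `BlockingQueue`, so concurrent pod processes cannot collide on stdin ports. A minimal sketch of that pattern (the `PortPool` class and method names are illustrative, not the actual worker API):

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingDeque;

// Illustrative port pool: take() blocks when every configured port is leased,
// which naturally throttles how many pod processes can run at once.
public class PortPool {

  private final BlockingQueue<Integer> ports;

  public PortPool(final Set<Integer> configuredPorts) {
    this.ports = new LinkedBlockingDeque<>(configuredPorts);
  }

  public int take() throws InterruptedException {
    return ports.take();
  }

  public void release(final int port) {
    ports.offer(port);
  }
}
```

A `LinkedBlockingDeque` seeded from config gives the same back-pressure behavior as the queue passed into `KubeProcessFactory` above.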


@@ -4,6 +4,7 @@ plugins {
dependencies {
implementation project(':airbyte-analytics')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-db')


@@ -20,6 +20,7 @@ dependencies {
implementation project(':airbyte-analytics')
implementation project(':airbyte-api')
implementation project(':airbyte-commons-docker')
implementation project(':airbyte-config:models')
implementation project(':airbyte-config:persistence')
implementation project(':airbyte-config:init')


@@ -28,6 +28,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.string.Strings;
import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.db.jdbc.JdbcDatabase;
@@ -35,7 +36,6 @@ import java.io.IOException;
import java.util.UUID;
import org.jooq.SQLDialect;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.shaded.org.apache.commons.lang.RandomStringUtils;
import org.testcontainers.utility.MountableFile;
public class PostgreSQLContainerHelper {
@@ -53,7 +53,7 @@ public class PostgreSQLContainerHelper {
}
public static JsonNode createDatabaseWithRandomNameAndGetPostgresConfig(PostgreSQLContainer<?> psqlDb) {
final String dbName = "db_" + RandomStringUtils.randomAlphabetic(10).toLowerCase();
final String dbName = Strings.addRandomSuffix("db", "_", 10).toLowerCase();
return createDatabaseAndGetPostgresConfig(psqlDb, dbName);
}


@@ -26,7 +26,6 @@ package io.airbyte.test.acceptance;
import static io.airbyte.api.client.model.ConnectionSchedule.TimeUnitEnum.MINUTES;
import static java.lang.Thread.sleep;
import static java.time.temporal.ChronoUnit.SECONDS;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
@@ -86,6 +85,7 @@ import io.airbyte.db.Database;
import io.airbyte.db.Databases;
import io.airbyte.test.utils.PostgreSQLContainerHelper;
import java.io.IOException;
import java.net.Inet4Address;
import java.net.URI;
import java.nio.charset.Charset;
import java.sql.SQLException;
@@ -102,7 +102,6 @@ import java.util.stream.Collectors;
import org.jooq.JSONB;
import org.jooq.Record;
import org.jooq.Result;
import org.junit.Assume;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeAll;
@@ -125,8 +124,8 @@ public class AcceptanceTests {
private static final Logger LOGGER = LoggerFactory.getLogger(AcceptanceTests.class);
// Skip networking related failures on kube using this flag
private static final boolean IS_KUBE = System.getenv().containsKey("KUBE");
private static final boolean IS_MINIKUBE = System.getenv().containsKey("IS_MINIKUBE");
private static final String OUTPUT_NAMESPACE_PREFIX = "output_namespace_";
private static final String OUTPUT_NAMESPACE = OUTPUT_NAMESPACE_PREFIX + "${SOURCE_NAMESPACE}";
@@ -438,9 +437,6 @@ public class AcceptanceTests {
@Test
@Order(9)
public void testScheduledSync() throws Exception {
// skip with Kube. HTTP client error with port forwarding
Assume.assumeFalse(IS_KUBE);
final String connectionName = "test-connection";
final UUID sourceId = createPostgresSource().getSourceId();
final UUID destinationId = createDestination().getDestinationId();
@@ -455,7 +451,10 @@ public class AcceptanceTests {
// When a new connection is created, Airbyte might sync it immediately (before the sync interval).
// Then it will wait the sync interval.
sleep(Duration.of(30, SECONDS).toMillis());
// todo: wait for two attempts in the UI
// if the wait isn't long enough, failures say "Connection refused" because the assert kills the
// syncs in progress
sleep(Duration.ofMinutes(2).toMillis());
assertSourceAndTargetDbInSync(sourcePsql, false);
}
@@ -826,11 +825,18 @@ public class AcceptanceTests {
try {
final Map<Object, Object> dbConfig = new HashMap<>();
// todo (cgardens) - hack to get building passing in CI. need to follow up on why this was necessary
// (and affect on the k8s version of these tests).
dbConfig.put("host", "localhost");
// necessary for minikube tests on Github Actions instead of psql.getHost()
// dbConfig.put("host", Inet4Address.getLocalHost().getHostAddress());
// don't use psql.getHost() directly since the ip we need differs depending on environment
if (IS_KUBE) {
if (IS_MINIKUBE) {
// used with minikube driver=none instance
dbConfig.put("host", Inet4Address.getLocalHost().getHostAddress());
} else {
// used on a single node with docker driver
dbConfig.put("host", "host.docker.internal");
}
} else {
dbConfig.put("host", "localhost");
}
if (hiddenPassword) {
dbConfig.put("password", "**********");
@@ -919,7 +925,7 @@ public class AcceptanceTests {
private static JobRead waitForJob(JobsApi jobsApi, JobRead originalJob, Set<JobStatus> jobStatuses) throws InterruptedException, ApiException {
JobRead job = originalJob;
int count = 0;
while (count < 60 && jobStatuses.contains(job.getStatus())) {
while (count < 200 && jobStatuses.contains(job.getStatus())) {
sleep(1000);
count++;


@@ -10,13 +10,13 @@ configurations {
}
dependencies {
implementation 'org.apache.commons:commons-lang3:3.11'
implementation 'io.fabric8:kubernetes-client:5.3.1'
implementation 'io.kubernetes:client-java-api:10.0.0'
implementation 'io.kubernetes:client-java:10.0.0'
implementation 'io.kubernetes:client-java-extended:10.0.0'
implementation 'io.temporal:temporal-sdk:1.0.4'
implementation 'org.apache.ant:ant:1.10.10'
implementation 'org.apache.commons:commons-lang3:3.11'
implementation project(':airbyte-config:models')
implementation project(':airbyte-db')
@@ -24,9 +24,11 @@ dependencies {
implementation project(':airbyte-protocol:models')
testImplementation 'org.mockito:mockito-inline:2.13.0'
testImplementation 'org.testcontainers:testcontainers:1.14.3'
testImplementation 'org.testcontainers:testcontainers:1.15.3'
testImplementation 'org.testcontainers:postgresql:1.15.1'
testImplementation 'org.postgresql:postgresql:42.2.18'
testImplementation project(':airbyte-commons-docker')
}
jsonSchema2Pojo {


@@ -25,7 +25,7 @@
package io.airbyte.workers;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.config.OperatorDbt;
@@ -35,6 +35,7 @@ import java.nio.file.Path;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.tools.ant.types.Commandline;
import org.slf4j.Logger;
@@ -76,8 +77,8 @@ public class DbtTransformationRunner implements AutoCloseable {
}
public boolean transform(String jobId, int attempt, Path jobRoot, JsonNode config, OperatorDbt dbtConfig) throws Exception {
IOs.writeFile(jobRoot, DBT_ENTRYPOINT_SH, MoreResources.readResource("dbt_transformation_entrypoint.sh"));
try {
final Map<String, String> files = ImmutableMap.of(DBT_ENTRYPOINT_SH, MoreResources.readResource("dbt_transformation_entrypoint.sh"));
final List<String> dbtArguments = new ArrayList<>();
dbtArguments.add("/data/job/transform/" + DBT_ENTRYPOINT_SH);
Collections.addAll(dbtArguments, Commandline.translateCommandline(dbtConfig.getDbtArguments()));
@@ -87,7 +88,7 @@ public class DbtTransformationRunner implements AutoCloseable {
if (!dbtConfig.getDbtArguments().contains("--project-dir=")) {
dbtArguments.add("--project-dir=/data/job/transform/git_repo/");
}
process = processFactory.create(jobId, attempt, jobRoot, dbtConfig.getDockerImage(), "/bin/bash", dbtArguments);
process = processFactory.create(jobId, attempt, jobRoot, dbtConfig.getDockerImage(), false, files, "/bin/bash", dbtArguments);
LineGobbler.gobble(process.getInputStream(), LOGGER::info);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);


@@ -24,7 +24,6 @@
package io.airbyte.workers;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.enums.Enums;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
@@ -66,11 +65,11 @@ public class DefaultCheckConnectionWorker implements CheckConnectionWorker {
@Override
public StandardCheckConnectionOutput run(StandardCheckConnectionInput input, Path jobRoot) throws WorkerException {
final JsonNode configDotJson = input.getConnectionConfiguration();
IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(configDotJson));
try {
process = integrationLauncher.check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME);
process = integrationLauncher.check(
jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(input.getConnectionConfiguration()));
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);
@@ -98,7 +97,7 @@ public class DefaultCheckConnectionWorker implements CheckConnectionWorker {
}
} catch (Exception e) {
throw new WorkerException("Error while getting checking connection.");
throw new WorkerException("Error while checking the connection.", e);
}
}


@@ -24,7 +24,6 @@
package io.airbyte.workers;
import com.fasterxml.jackson.databind.JsonNode;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
@@ -62,12 +61,11 @@ public class DefaultDiscoverCatalogWorker implements DiscoverCatalogWorker {
@Override
public AirbyteCatalog run(final StandardDiscoverCatalogInput discoverSchemaInput, final Path jobRoot) throws WorkerException {
final JsonNode configDotJson = discoverSchemaInput.getConnectionConfiguration();
IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(configDotJson));
try {
process = integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME);
process = integrationLauncher.discover(
jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(discoverSchemaInput.getConnectionConfiguration()));
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);


@@ -113,7 +113,7 @@ public class WorkerUtils {
try {
process.waitFor(gracefulShutdownDuration.toMillis(), TimeUnit.MILLISECONDS);
} catch (InterruptedException e) {
LOGGER.error("Exception during grace period for process to finish", e);
LOGGER.error("Exception during grace period for process to finish. This can happen when cancelling jobs.");
}
}


@@ -26,7 +26,7 @@ package io.airbyte.workers.normalization;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.io.IOs;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.LineGobbler;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.OperatorDbt;
@@ -36,6 +36,7 @@ import io.airbyte.workers.WorkerException;
import io.airbyte.workers.WorkerUtils;
import io.airbyte.workers.process.ProcessFactory;
import java.nio.file.Path;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -65,8 +66,10 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
@Override
public boolean configureDbt(String jobId, int attempt, Path jobRoot, JsonNode config, OperatorDbt dbtConfig) throws Exception {
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config));
return runProcess(jobId, attempt, jobRoot, "configure-dbt",
final Map<String, String> files = ImmutableMap.of(
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config));
return runProcess(jobId, attempt, jobRoot, files, "configure-dbt",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--git-repo", dbtConfig.getGitRepoUrl(),
@@ -75,18 +78,19 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
@Override
public boolean normalize(String jobId, int attempt, Path jobRoot, JsonNode config, ConfiguredAirbyteCatalog catalog) throws Exception {
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config));
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));
final Map<String, String> files = ImmutableMap.of(
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));
return runProcess(jobId, attempt, jobRoot, "run",
return runProcess(jobId, attempt, jobRoot, files, "run",
"--integration-type", destinationType.toString().toLowerCase(),
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
}
private boolean runProcess(String jobId, int attempt, Path jobRoot, final String... args) throws Exception {
private boolean runProcess(String jobId, int attempt, Path jobRoot, Map<String, String> files, final String... args) throws Exception {
try {
process = processFactory.create(jobId, attempt, jobRoot, NORMALIZATION_IMAGE_NAME, null, args);
process = processFactory.create(jobId, attempt, jobRoot, NORMALIZATION_IMAGE_NAME, false, files, null, args);
LineGobbler.gobble(process.getInputStream(), LOGGER::info);
LineGobbler.gobble(process.getErrorStream(), LOGGER::error);


@@ -24,10 +24,15 @@
package io.airbyte.workers.process;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.workers.WorkerException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -58,29 +63,35 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
attempt,
jobRoot,
imageName,
false,
Collections.emptyMap(),
null,
"spec");
}
@Override
public Process check(final Path jobRoot, final String configFilename) throws WorkerException {
public Process check(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException {
return processFactory.create(
jobId,
attempt,
jobRoot,
imageName,
false,
ImmutableMap.of(configFilename, configContents),
null,
"check",
"--config", configFilename);
}
@Override
public Process discover(final Path jobRoot, final String configFilename) throws WorkerException {
public Process discover(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException {
return processFactory.create(
jobId,
attempt,
jobRoot,
imageName,
false,
ImmutableMap.of(configFilename, configContents),
null,
"discover",
"--config", configFilename);
@@ -89,17 +100,27 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
@Override
public Process read(final Path jobRoot,
final String configFilename,
final String configContents,
final String catalogFilename,
final String stateFilename)
final String catalogContents,
final String stateFilename,
final String stateContents)
throws WorkerException {
final List<String> arguments = Lists.newArrayList(
"read",
"--config", configFilename,
"--catalog", catalogFilename);
final Map<String, String> files = new HashMap<>();
files.put(configFilename, configContents);
files.put(catalogFilename, catalogContents);
if (stateFilename != null) {
arguments.add("--state");
arguments.add(stateFilename);
Preconditions.checkNotNull(stateContents);
files.put(stateFilename, stateContents);
}
return processFactory.create(
@@ -107,17 +128,30 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
attempt,
jobRoot,
imageName,
false,
files,
null,
arguments);
}
@Override
public Process write(Path jobRoot, String configFilename, String catalogFilename) throws WorkerException {
public Process write(final Path jobRoot,
final String configFilename,
final String configContents,
final String catalogFilename,
final String catalogContents)
throws WorkerException {
final Map<String, String> files = ImmutableMap.of(
configFilename, configContents,
catalogFilename, catalogContents);
return processFactory.create(
jobId,
attempt,
jobRoot,
imageName,
true,
files,
null,
"write",
"--config", configFilename,


@@ -38,6 +38,7 @@ import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -79,38 +80,52 @@ public class DockerProcessFactory implements ProcessFactory {
}
@Override
public Process create(String jobId, int attempt, final Path jobRoot, final String imageName, final String entrypoint, final String... args)
public Process create(String jobId,
int attempt,
final Path jobRoot,
final String imageName,
final boolean usesStdin,
final Map<String, String> files,
final String entrypoint,
final String... args)
throws WorkerException {
if (!checkImageExists(imageName)) {
throw new WorkerException("Could not find image: " + imageName);
}
final List<String> cmd =
Lists.newArrayList(
"docker",
"run",
"--rm",
"--init",
"-i",
"-v",
String.format("%s:%s", workspaceMountSource, DATA_MOUNT_DESTINATION),
"-v",
String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION),
"-w",
rebasePath(jobRoot).toString(),
"--network",
networkName);
if (!Strings.isNullOrEmpty(entrypoint)) {
cmd.add("--entrypoint");
cmd.add(entrypoint);
}
cmd.add(imageName);
cmd.addAll(Arrays.asList(args));
LOGGER.info("Preparing command: {}", Joiner.on(" ").join(cmd));
try {
if (!checkImageExists(imageName)) {
throw new WorkerException("Could not find image: " + imageName);
}
if (!jobRoot.toFile().exists()) {
Files.createDirectory(jobRoot);
}
for (Map.Entry<String, String> file : files.entrySet()) {
IOs.writeFile(jobRoot, file.getKey(), file.getValue());
}
final List<String> cmd =
Lists.newArrayList(
"docker",
"run",
"--rm",
"--init",
"-i",
"-v",
String.format("%s:%s", workspaceMountSource, DATA_MOUNT_DESTINATION),
"-v",
String.format("%s:%s", localMountSource, LOCAL_MOUNT_DESTINATION),
"-w",
rebasePath(jobRoot).toString(),
"--network",
networkName);
if (!Strings.isNullOrEmpty(entrypoint)) {
cmd.add("--entrypoint");
cmd.add(entrypoint);
}
cmd.add(imageName);
cmd.addAll(Arrays.asList(args));
LOGGER.info("Preparing command: {}", Joiner.on(" ").join(cmd));
return new ProcessBuilder(cmd).start();
} catch (IOException e) {
throw new WorkerException(e.getMessage(), e);


@@ -31,33 +31,33 @@ public interface IntegrationLauncher {
Process spec(final Path jobRoot) throws WorkerException;
Process check(final Path jobRoot, final String configFilename) throws WorkerException;
Process check(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException;
Process discover(final Path jobRoot, final String configFilename) throws WorkerException;
Process discover(final Path jobRoot, final String configFilename, final String configContents) throws WorkerException;
Process read(final Path jobRoot,
final String configFilename,
final String configContents,
final String catalogFilename,
final String stateFilename)
final String catalogContents,
final String stateFilename,
final String stateContents)
throws WorkerException;
default Process read(final Path jobRoot,
final String configFilename,
final String catalogFilename)
final String configContents,
final String catalogFilename,
final String catalogContents)
throws WorkerException {
return read(jobRoot, configFilename, catalogFilename, null);
return read(jobRoot, configFilename, configContents, catalogFilename, catalogContents, null, null);
}
Process write(final Path jobRoot,
final String configFilename,
final String catalogFilename)
final String configContents,
final String catalogFilename,
final String catalogContents)
throws WorkerException;
// TODO: this version should be removed once we've moved away from singer protocol
default Process write(final Path jobRoot,
final String configFilename)
throws WorkerException {
return write(jobRoot, configFilename, null);
}
}


@@ -0,0 +1,507 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.workers.process;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.lang.Exceptions;
import io.airbyte.commons.string.Strings;
import io.fabric8.kubernetes.api.model.Container;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.DeletionPropagation;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.api.model.Volume;
import io.fabric8.kubernetes.api.model.VolumeBuilder;
import io.fabric8.kubernetes.api.model.VolumeMount;
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.InetAddress;
import java.net.ServerSocket;
import java.net.Socket;
import java.nio.file.Path;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;
import org.apache.commons.io.output.NullOutputStream;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
* A Process abstraction backed by a Kube Pod running in a Kubernetes cluster 'somewhere'. The
* parent process starting a Kube Pod Process needs to exist within the Kube networking space. This
* is so the parent process can forward data into the child's stdin and read the child's stdout and
* stderr streams and copy configuration files over.
*
* This is made possible by:
* <li>1) An init container that creates 3 named pipes corresponding to stdin, stdout and stderr on
* a shared volume.</li>
* <li>2) Config files (e.g. config.json, catalog.json etc) are copied from the parent process into
* a shared volume.</li>
* <li>3) Redirecting the stdin named pipe to the original image's entrypoint and its output into
* the respective named pipes for stdout and stderr.</li>
* <li>4) Each named pipe has a corresponding sidecar. Each sidecar forwards its stream accordingly
* using socat. e.g. stderr/stdout is forwarded to the parent process while input from the parent
* process is forwarded into stdin.</li>
* <li>5) The parent process listens on the stdout and stderr sockets for an incoming TCP
* connection. It also initiates a TCP connection to the child process, aka the Kube pod, on the
* specified stdin socket.</li>
* <li>6) The child process is able to access configuration data via the shared volume. Its inputs
* and outputs - stdin, stdout and stderr - are forwarded to the parent process via the sidecars.</li>
*
*
* See the constructor for more information.
*/
// TODO(Davin): Better test for this. See https://github.com/airbytehq/airbyte/issues/3700.
public class KubePodProcess extends Process {
private static final Logger LOGGER = LoggerFactory.getLogger(KubePodProcess.class);
private static final String INIT_CONTAINER_NAME = "init";
private static final String PIPES_DIR = "/pipes";
private static final String STDIN_PIPE_FILE = PIPES_DIR + "/stdin";
private static final String STDOUT_PIPE_FILE = PIPES_DIR + "/stdout";
private static final String STDERR_PIPE_FILE = PIPES_DIR + "/stderr";
private static final String CONFIG_DIR = "/config";
private static final String SUCCESS_FILE_NAME = "FINISHED_UPLOADING";
// 143 is the typical SIGTERM exit code.
private static final int KILLED_EXIT_CODE = 143;
private static final int STDIN_REMOTE_PORT = 9001;
private final KubernetesClient client;
private final Pod podDefinition;
// Necessary since it is not possible to retrieve the pod's actual exit code after it has been
// forcefully terminated. This is because the Kube API server does not keep history for deleted
// pods the way it does for pods that complete on their own.
// This variable should be set in functions where the pod is forcefully terminated. See
// getReturnCode() for more info.
private final AtomicBoolean wasKilled = new AtomicBoolean(false);
private final OutputStream stdin;
private InputStream stdout;
private InputStream stderr;
private final Consumer<Integer> portReleaser;
private final ServerSocket stdoutServerSocket;
private final int stdoutLocalPort;
private final ServerSocket stderrServerSocket;
private final int stderrLocalPort;
private final ExecutorService executorService;
// TODO(Davin): Cache this result.
public static String getCommandFromImage(KubernetesClient client, String imageName, String namespace) throws InterruptedException {
final String podName = Strings.addRandomSuffix("airbyte-command-fetcher", "-", 5);
Container commandFetcher = new ContainerBuilder()
.withName("airbyte-command-fetcher")
.withImage(imageName)
.withCommand("sh", "-c", "echo \"AIRBYTE_ENTRYPOINT=$AIRBYTE_ENTRYPOINT\"")
.build();
Pod pod = new PodBuilder()
.withApiVersion("v1")
.withNewMetadata()
.withName(podName)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
.withContainers(commandFetcher)
.endSpec()
.build();
LOGGER.info("Creating pod...");
Pod podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod);
LOGGER.info("Waiting until command fetcher pod completes...");
// TODO(Davin): If a pod is missing, this will wait for up to 2 minutes before error-ing out.
// Figure out a better way.
client.resource(podDefinition).waitUntilCondition(p -> p.getStatus().getPhase().equals("Succeeded"), 2, TimeUnit.MINUTES);
var logs = client.pods().inNamespace(namespace).withName(podName).getLog();
if (!logs.contains("AIRBYTE_ENTRYPOINT")) {
throw new RuntimeException(
"Missing AIRBYTE_ENTRYPOINT from command fetcher logs. This should not happen. Check that the echo command has not been changed.");
}
var envVal = logs.split("=")[1].strip();
if (envVal.isEmpty()) {
// default to returning default entrypoint in bases
return "/airbyte/base.sh";
}
return envVal;
}
public static String getPodIP(KubernetesClient client, String podName, String namespace) {
var pod = client.pods().inNamespace(namespace).withName(podName).get();
if (pod == null) {
throw new RuntimeException("Error: unable to find pod!");
}
return pod.getStatus().getPodIP();
}
private static Container getInit(boolean usesStdin, List<VolumeMount> mainVolumeMounts) {
var initEntrypointStr = String.format("mkfifo %s && mkfifo %s", STDOUT_PIPE_FILE, STDERR_PIPE_FILE);
if (usesStdin) {
initEntrypointStr = String.format("mkfifo %s && ", STDIN_PIPE_FILE) + initEntrypointStr;
}
initEntrypointStr = initEntrypointStr + String.format(" && until [ -f %s ]; do sleep 5; done;", SUCCESS_FILE_NAME);
return new ContainerBuilder()
.withName(INIT_CONTAINER_NAME)
.withImage("busybox:1.28")
.withWorkingDir(CONFIG_DIR)
.withCommand("sh", "-c", initEntrypointStr)
.withVolumeMounts(mainVolumeMounts)
.build();
}
private static Container getMain(String image, boolean usesStdin, String entrypoint, List<VolumeMount> mainVolumeMounts, String[] args) {
var argsStr = String.join(" ", args);
var entrypointStr = entrypoint + " " + argsStr + " ";
var entrypointStrWithPipes = entrypointStr + String.format(" 2> %s > %s", STDERR_PIPE_FILE, STDOUT_PIPE_FILE);
if (usesStdin) {
entrypointStrWithPipes = String.format("cat %s | ", STDIN_PIPE_FILE) + entrypointStrWithPipes;
}
return new ContainerBuilder()
.withName("main")
.withImage(image)
.withCommand("sh", "-c", entrypointStrWithPipes)
.withWorkingDir(CONFIG_DIR)
.withVolumeMounts(mainVolumeMounts)
.build();
}
private static void copyFilesToKubeConfigVolume(KubernetesClient client, String podName, String namespace, Map<String, String> files) {
List<Map.Entry<String, String>> fileEntries = new ArrayList<>(files.entrySet());
for (Map.Entry<String, String> file : fileEntries) {
Path tmpFile = null;
try {
tmpFile = Path.of(IOs.writeFileToRandomTmpDir(file.getKey(), file.getValue()));
LOGGER.info("Uploading file: " + file.getKey());
client.pods().inNamespace(namespace).withName(podName).inContainer(INIT_CONTAINER_NAME)
.file(CONFIG_DIR + "/" + file.getKey())
.upload(tmpFile);
} finally {
if (tmpFile != null) {
tmpFile.toFile().delete();
}
}
}
}
/**
 * The calls in this function aren't straightforward due to api limitations. There is no proper way
 * to directly look for containers within a pod or query if a container is in a running state
 * besides checking if the getRunning field is set. We could put this behind an interface, but that
 * seems heavy-handed compared to the 10 lines here.
 */
private static void waitForInitPodToRun(KubernetesClient client, Pod podDefinition) throws InterruptedException {
LOGGER.info("Waiting for init container to be ready before copying files...");
client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName())
.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES);
LOGGER.info("Init container present...");
client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName())
.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().get(0).getState().getRunning() != null, 5, TimeUnit.MINUTES);
LOGGER.info("Init container ready...");
}
public KubePodProcess(KubernetesClient client,
Consumer<Integer> portReleaser,
String podName,
String namespace,
String image,
int stdoutLocalPort,
int stderrLocalPort,
boolean usesStdin,
final Map<String, String> files,
final String entrypointOverride,
final String... args)
throws IOException, InterruptedException {
this.client = client;
this.portReleaser = portReleaser;
this.stdoutLocalPort = stdoutLocalPort;
this.stderrLocalPort = stderrLocalPort;
stdoutServerSocket = new ServerSocket(stdoutLocalPort);
stderrServerSocket = new ServerSocket(stderrLocalPort);
executorService = Executors.newFixedThreadPool(2);
setupStdOutAndStdErrListeners();
String entrypoint = entrypointOverride == null ? getCommandFromImage(client, image, namespace) : entrypointOverride;
LOGGER.info("Found entrypoint: {}", entrypoint);
Volume pipeVolume = new VolumeBuilder()
.withName("airbyte-pipes")
.withNewEmptyDir()
.endEmptyDir()
.build();
VolumeMount pipeVolumeMount = new VolumeMountBuilder()
.withName("airbyte-pipes")
.withMountPath(PIPES_DIR)
.build();
Volume configVolume = new VolumeBuilder()
.withName("airbyte-config")
.withNewEmptyDir()
.endEmptyDir()
.build();
VolumeMount configVolumeMount = new VolumeMountBuilder()
.withName("airbyte-config")
.withMountPath(CONFIG_DIR)
.build();
var volumes = List.of(pipeVolume, configVolume);
var mainVolumeMounts = List.of(pipeVolumeMount, configVolumeMount);
Container init = getInit(usesStdin, mainVolumeMounts);
Container main = getMain(image, usesStdin, entrypoint, mainVolumeMounts, args);
Container remoteStdin = new ContainerBuilder()
.withName("remote-stdin")
.withImage("alpine/socat:1.7.4.1-r1")
.withCommand("sh", "-c", "socat -d -d -d TCP-L:9001 STDOUT > " + STDIN_PIPE_FILE)
.withVolumeMounts(pipeVolumeMount)
.build();
var localIp = InetAddress.getLocalHost().getHostAddress();
Container relayStdout = new ContainerBuilder()
.withName("relay-stdout")
.withImage("alpine/socat:1.7.4.1-r1")
.withCommand("sh", "-c", String.format("cat %s | socat -d -d -d - TCP:%s:%s", STDOUT_PIPE_FILE, localIp, stdoutLocalPort))
.withVolumeMounts(pipeVolumeMount)
.build();
Container relayStderr = new ContainerBuilder()
.withName("relay-stderr")
.withImage("alpine/socat:1.7.4.1-r1")
.withCommand("sh", "-c", String.format("cat %s | socat -d -d -d - TCP:%s:%s", STDERR_PIPE_FILE, localIp, stderrLocalPort))
.withVolumeMounts(pipeVolumeMount)
.build();
List<Container> containers = usesStdin ? List.of(main, remoteStdin, relayStdout, relayStderr) : List.of(main, relayStdout, relayStderr);
Pod pod = new PodBuilder()
.withApiVersion("v1")
.withNewMetadata()
.withName(podName)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
.withInitContainers(init)
.withContainers(containers)
.withVolumes(volumes)
.endSpec()
.build();
LOGGER.info("Creating pod...");
this.podDefinition = client.pods().inNamespace(namespace).createOrReplace(pod);
waitForInitPodToRun(client, podDefinition);
LOGGER.info("Copying files...");
Map<String, String> filesWithSuccess = new HashMap<>(files);
// We always copy the empty success file to ensure our waiting step can detect the init container in
// RUNNING. Otherwise, the container can complete and exit before we are able to detect it.
filesWithSuccess.put(SUCCESS_FILE_NAME, "");
copyFilesToKubeConfigVolume(client, podName, namespace, filesWithSuccess);
LOGGER.info("Waiting until pod is ready...");
client.resource(podDefinition).waitUntilReady(30, TimeUnit.MINUTES);
// allow writing stdin to pod
LOGGER.info("Reading pod IP...");
var podIp = getPodIP(client, podName, namespace);
LOGGER.info("Pod IP: {}", podIp);
if (usesStdin) {
LOGGER.info("Creating stdin socket...");
var socketToDestStdIo = new Socket(podIp, STDIN_REMOTE_PORT);
this.stdin = socketToDestStdIo.getOutputStream();
} else {
LOGGER.info("Using null stdin output stream...");
this.stdin = NullOutputStream.NULL_OUTPUT_STREAM;
}
}
private void setupStdOutAndStdErrListeners() {
executorService.submit(() -> {
try {
LOGGER.info("Creating stdout socket server...");
var socket = stdoutServerSocket.accept(); // blocks until connected
LOGGER.info("Setting stdout...");
this.stdout = socket.getInputStream();
} catch (IOException e) {
e.printStackTrace(); // todo: propagate exception / join at the end of constructor
}
});
executorService.submit(() -> {
try {
LOGGER.info("Creating stderr socket server...");
var socket = stderrServerSocket.accept(); // blocks until connected
LOGGER.info("Setting stderr...");
this.stderr = socket.getInputStream();
} catch (IOException e) {
e.printStackTrace(); // todo: propagate exception / join at the end of constructor
}
});
}
@Override
public OutputStream getOutputStream() {
return this.stdin;
}
@Override
public InputStream getInputStream() {
return this.stdout;
}
@Override
public InputStream getErrorStream() {
return this.stderr;
}
/**
* Immediately terminates the Kube Pod backing this process and cleans up IO resources.
*/
@Override
public int waitFor() throws InterruptedException {
try {
Pod refreshedPod = client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName()).get();
client.resource(refreshedPod).waitUntilCondition(this::isTerminal, 10, TimeUnit.DAYS);
wasKilled.set(true);
return exitValue();
} finally {
close();
}
}
/**
* Intended to gracefully clean up after a completed Kube Pod. This should only be called if the
* process is successful.
*/
@Override
public boolean waitFor(long timeout, TimeUnit unit) throws InterruptedException {
try {
return super.waitFor(timeout, unit);
} finally {
close();
}
}
/**
* Immediately terminates the Kube Pod backing this process and cleans up IO resources.
*/
@Override
public void destroy() {
LOGGER.info("Destroying Kube process: {}", podDefinition.getMetadata().getName());
try {
client.resource(podDefinition).withPropagationPolicy(DeletionPropagation.FOREGROUND).delete();
wasKilled.set(true);
} finally {
close();
LOGGER.info("Destroyed Kube process: {}", podDefinition.getMetadata().getName());
}
}
/**
 * Close all open resources in the opposite order of resource creation.
*/
private void close() {
Exceptions.swallow(this.stdin::close);
Exceptions.swallow(this.stdout::close);
Exceptions.swallow(this.stderr::close);
Exceptions.swallow(this.stdoutServerSocket::close);
Exceptions.swallow(this.stderrServerSocket::close);
Exceptions.swallow(this.executorService::shutdownNow);
Exceptions.swallow(() -> portReleaser.accept(stdoutLocalPort));
Exceptions.swallow(() -> portReleaser.accept(stderrLocalPort));
}
private boolean isTerminal(Pod pod) {
if (pod.getStatus() != null) {
return pod.getStatus()
.getContainerStatuses()
.stream()
.anyMatch(e -> e.getState() != null && e.getState().getTerminated() != null);
} else {
return false;
}
}
private int getReturnCode(Pod pod) {
var name = pod.getMetadata().getName();
Pod refreshedPod = client.pods().inNamespace(pod.getMetadata().getNamespace()).withName(name).get();
if (refreshedPod == null) {
if (wasKilled.get()) {
LOGGER.info("Unable to find pod {} to retrieve exit value. Defaulting to value {}. This is expected if the job was cancelled.", name,
KILLED_EXIT_CODE);
return KILLED_EXIT_CODE;
}
// If the pod cannot be found and was not killed, it either means 1) the pod was not created
// properly, or 2) this method was incorrectly called.
throw new RuntimeException("Cannot find pod while trying to retrieve exit code. This probably means the Pod was not correctly created.");
}
if (!isTerminal(refreshedPod)) {
throw new IllegalThreadStateException("Kube pod process has not exited yet.");
}
return refreshedPod.getStatus().getContainerStatuses()
.stream()
.filter(containerStatus -> containerStatus.getState() != null && containerStatus.getState().getTerminated() != null)
.map(containerStatus -> {
int statusCode = containerStatus.getState().getTerminated().getExitCode();
LOGGER.info("Exit code for pod {}, container {} is {}", name, containerStatus.getName(), statusCode);
return statusCode;
})
.reduce(Integer::sum)
.orElseThrow();
}
@Override
public int exitValue() {
return getReturnCode(podDefinition);
}
}
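The pipe wiring built by getInit and getMain above reduces to composing small shell one-liners. A standalone sketch of that string composition (the pipe paths and the `/airbyte/base.sh read` entrypoint below are illustrative, not the exact strings the class emits):

```java
public class EntrypointSketch {

  // Mirrors getInit: create the named pipes, then wait for the success marker file.
  public static String initCommand(boolean usesStdin) {
    String cmd = "mkfifo /pipes/stdout && mkfifo /pipes/stderr";
    if (usesStdin) {
      cmd = "mkfifo /pipes/stdin && " + cmd;
    }
    return cmd + " && until [ -f FINISHED_UPLOADING ]; do sleep 5; done;";
  }

  // Mirrors getMain: redirect the entrypoint's stderr/stdout into the pipes,
  // optionally feeding stdin from its pipe.
  public static String mainCommand(String entrypoint, boolean usesStdin) {
    String cmd = entrypoint + " 2> /pipes/stderr > /pipes/stdout";
    if (usesStdin) {
      cmd = "cat /pipes/stdin | " + cmd;
    }
    return cmd;
  }

  public static void main(String[] args) {
    System.out.println(initCommand(true));
    System.out.println(mainCommand("/airbyte/base.sh read", true));
  }

}
```

The sidecars then bridge each pipe to a TCP socket with socat, which is why the parent only ever touches sockets.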


@@ -24,17 +24,14 @@
package io.airbyte.workers.process;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.dataformat.yaml.YAMLFactory;
import com.google.common.base.Joiner;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.workers.WorkerException;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.nio.file.Path;
import java.util.Arrays;
import java.util.List;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.function.Consumer;
import org.apache.commons.lang3.RandomStringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,54 +40,61 @@ public class KubeProcessFactory implements ProcessFactory {
private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);
private static final Path WORKSPACE_MOUNT_DESTINATION = Path.of("/workspace");
private final String namespace;
private final KubernetesClient kubeClient;
private final BlockingQueue<Integer> ports;
private final Set<Integer> claimedPorts = new HashSet<>();
private final Path workspaceRoot;
public KubeProcessFactory(Path workspaceRoot) {
this.workspaceRoot = workspaceRoot;
public KubeProcessFactory(String namespace, KubernetesClient kubeClient, BlockingQueue<Integer> ports) {
this.namespace = namespace;
this.kubeClient = kubeClient;
this.ports = ports;
}
@Override
public Process create(String jobId, int attempt, final Path jobRoot, final String imageName, final String entrypoint, final String... args)
public Process create(String jobId,
int attempt,
final Path jobRoot,
final String imageName,
final boolean usesStdin,
final Map<String, String> files,
final String entrypoint,
final String... args)
throws WorkerException {
try {
final String template = MoreResources.readResource("kube_runner_template.yaml");
// used to differentiate source and destination processes with the same id and attempt
final String suffix = RandomStringUtils.randomAlphabetic(5).toLowerCase();
ObjectMapper yamlMapper = new ObjectMapper(new YAMLFactory());
final String rendered = template.replaceAll("JOBID", jobId)
.replaceAll("ATTEMPTID", String.valueOf(attempt))
.replaceAll("IMAGE", imageName)
.replaceAll("SUFFIX", suffix)
.replaceAll("ARGS", Jsons.serialize(Arrays.asList(args)))
.replaceAll("WORKDIR", jobRoot.toString());
final JsonNode node = yamlMapper.readTree(rendered);
final String overrides = Jsons.serialize(node);
final String podName = "airbyte-worker-" + jobId + "-" + attempt + "-" + suffix;
final List<String> cmd =
Lists.newArrayList(
"kubectl",
"run",
"--generator=run-pod/v1",
"--rm",
"-i",
"--pod-running-timeout=24h",
"--image=" + imageName,
"--restart=Never",
"--overrides=" + overrides, // fails if you add quotes around the overrides string
podName);
// TODO handle entrypoint override (to run DbtTransformationRunner for example)
LOGGER.debug("Preparing command: {}", Joiner.on(" ").join(cmd));
final int stdoutLocalPort = ports.take();
claimedPorts.add(stdoutLocalPort);
LOGGER.info("stdoutLocalPort = " + stdoutLocalPort);
return new ProcessBuilder(cmd).start();
final int stderrLocalPort = ports.take();
claimedPorts.add(stderrLocalPort);
LOGGER.info("stderrLocalPort = " + stderrLocalPort);
final Consumer<Integer> portReleaser = port -> {
if (!ports.contains(port)) {
ports.add(port);
LOGGER.info("Port consumer releasing: " + port);
} else {
LOGGER.info("Port consumer skipping releasing: " + port);
}
};
return new KubePodProcess(
kubeClient,
portReleaser,
podName,
namespace,
imageName,
stdoutLocalPort,
stderrLocalPort,
usesStdin,
files,
entrypoint,
args);
} catch (Exception e) {
throw new WorkerException(e.getMessage());
}
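The port claim/release pattern in the diff above, taking from a shared BlockingQueue and handing the pod an idempotent releaser, can be sketched on its own (a minimal sketch; the real factory shares one queue of preallocated ports across workers):

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

public class PortPool {

  private final BlockingQueue<Integer> ports = new LinkedBlockingQueue<>();

  public PortPool(int... available) {
    for (int p : available) {
      ports.add(p);
    }
  }

  // Blocks until a port is free, like ports.take() in the factory.
  public int claim() throws InterruptedException {
    return ports.take();
  }

  // Idempotent release: a port already back in the pool is skipped, so a
  // double-release (e.g. waitFor and destroy both closing) is harmless.
  public Consumer<Integer> releaser() {
    return port -> {
      if (!ports.contains(port)) {
        ports.add(port);
      }
    };
  }

}
```

The contains-check matters because close() in KubePodProcess can run more than once for the same ports.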


@@ -27,6 +27,7 @@ package io.airbyte.workers.process;
import io.airbyte.workers.WorkerException;
import java.nio.file.Path;
import java.util.List;
import java.util.Map;
public interface ProcessFactory {
@@ -37,23 +38,34 @@ public interface ProcessFactory {
* @param attempt attempt Id
* @param jobPath Workspace directory to run the process from
* @param imageName Docker image name to start the process from
* @param files file name to contents map that will be written into the working dir of the process
* prior to execution
* @param entrypoint If not null, the default entrypoint program of the docker image can be changed
* by this argument
* @param args arguments to pass to the docker image being run in the new process
* @return the ProcessBuilder object to run the process
* @throws WorkerException
*/
Process create(String jobId, int attempt, final Path jobPath, final String imageName, final String entrypoint, final String... args)
Process create(String jobId,
int attempt,
final Path jobPath,
final String imageName,
final boolean usesStdin,
final Map<String, String> files,
final String entrypoint,
final String... args)
throws WorkerException;
default Process create(String jobId,
int attempt,
final Path jobPath,
final String imageName,
final boolean usesStdin,
final Map<String, String> files,
final String entrypoint,
final List<String> args)
throws WorkerException {
return create(jobId, attempt, jobPath, imageName, entrypoint, args.toArray(new String[0]));
return create(jobId, attempt, jobPath, imageName, usesStdin, files, entrypoint, args.toArray(new String[0]));
}
}


@@ -74,15 +74,14 @@ public class DefaultAirbyteDestination implements AirbyteDestination {
@Override
public void start(StandardTargetConfig destinationConfig, Path jobRoot) throws IOException, WorkerException {
Preconditions.checkState(destinationProcess == null);
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
Jsons.serialize(destinationConfig.getDestinationConnectionConfiguration()));
IOs.writeFile(jobRoot, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(destinationConfig.getCatalog()));
LOGGER.info("Running destination...");
destinationProcess = integrationLauncher.write(
jobRoot,
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME);
Jsons.serialize(destinationConfig.getDestinationConnectionConfiguration()),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME,
Jsons.serialize(destinationConfig.getCatalog()));
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(destinationProcess.getErrorStream(), LOGGER::error, "airbyte-destination");


@@ -78,16 +78,13 @@ public class DefaultAirbyteSource implements AirbyteSource {
public void start(StandardTapConfig input, Path jobRoot) throws Exception {
Preconditions.checkState(sourceProcess == null);
IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(input.getSourceConnectionConfiguration()));
IOs.writeFile(jobRoot, WorkerConstants.SOURCE_CATALOG_JSON_FILENAME, Jsons.serialize(input.getCatalog()));
if (input.getState() != null) {
IOs.writeFile(jobRoot, WorkerConstants.INPUT_STATE_JSON_FILENAME, Jsons.serialize(input.getState().getState()));
}
sourceProcess = integrationLauncher.read(jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(input.getSourceConnectionConfiguration()),
WorkerConstants.SOURCE_CATALOG_JSON_FILENAME,
input.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME);
Jsons.serialize(input.getCatalog()),
input.getState() == null ? null : WorkerConstants.INPUT_STATE_JSON_FILENAME,
input.getState() == null ? null : Jsons.serialize(input.getState().getState()));
// stdout logs are logged elsewhere since stdout also contains data
LineGobbler.gobble(sourceProcess.getErrorStream(), LOGGER::error, "airbyte-source");


@@ -1,30 +0,0 @@
apiVersion: v1
kind: Pod
metadata:
name: airbyte-worker-JOBID-ATTEMPTID-SUFFIX
spec:
affinity:
podAffinity:
requiredDuringSchedulingIgnoredDuringExecution:
- labelSelector:
matchExpressions:
- key: airbyte
operator: In
values:
- scheduler
topologyKey: kubernetes.io/hostname
restartPolicy: Never
containers:
- name: worker
image: IMAGE
workingDir: WORKDIR
args: ARGS
stdin: true
stdinOnce: true
volumeMounts:
- name: airbyte-volume-workspace
mountPath: /workspace
volumes:
- name: airbyte-volume-workspace
persistentVolumeClaim:
claimName: airbyte-volume-workspace


@@ -74,7 +74,7 @@ public class DefaultCheckConnectionWorkerTest {
integrationLauncher = mock(IntegrationLauncher.class, RETURNS_DEEP_STUBS);
process = mock(Process.class);
when(integrationLauncher.check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME)).thenReturn(process);
when(integrationLauncher.check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDS))).thenReturn(process);
final InputStream inputStream = mock(InputStream.class);
when(process.getInputStream()).thenReturn(inputStream);
when(process.getErrorStream()).thenReturn(new ByteArrayInputStream(new byte[0]));
@@ -124,7 +124,7 @@ public class DefaultCheckConnectionWorkerTest {
@Test
public void testExceptionThrownInRun() throws WorkerException {
doThrow(new RuntimeException()).when(integrationLauncher).check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME);
doThrow(new RuntimeException()).when(integrationLauncher).check(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDS));
final DefaultCheckConnectionWorker worker = new DefaultCheckConnectionWorker(integrationLauncher, failureStreamFactory);
assertThrows(WorkerException.class, () -> worker.run(input, jobRoot));


@@ -83,7 +83,7 @@ public class DefaultDiscoverCatalogWorkerTest {
integrationLauncher = mock(IntegrationLauncher.class, RETURNS_DEEP_STUBS);
process = mock(Process.class);
when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME)).thenReturn(process);
when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDENTIALS))).thenReturn(process);
final InputStream inputStream = mock(InputStream.class);
when(process.getInputStream()).thenReturn(inputStream);
when(process.getErrorStream()).thenReturn(new ByteArrayInputStream(new byte[0]));
@@ -101,11 +101,6 @@ public class DefaultDiscoverCatalogWorkerTest {
assertEquals(CATALOG, output);
// test that config is written to correct location on disk.
assertEquals(
Jsons.jsonNode(INPUT.getConnectionConfiguration()),
Jsons.deserialize(IOs.readFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME)));
Assertions.assertTimeout(Duration.ofSeconds(5), () -> {
while (process.getErrorStream().available() != 0) {
Thread.sleep(50);
@@ -134,7 +129,7 @@ public class DefaultDiscoverCatalogWorkerTest {
@Test
public void testDiscoverSchemaException() throws WorkerException {
when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME))
when(integrationLauncher.discover(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME, Jsons.serialize(CREDENTIALS)))
.thenThrow(new RuntimeException());
final DefaultDiscoverCatalogWorker worker = new DefaultDiscoverCatalogWorker(integrationLauncher, streamFactory);


@@ -31,6 +31,8 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.WorkerConstants;
import io.airbyte.workers.WorkerException;
@@ -40,6 +42,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -63,7 +66,11 @@ class DefaultNormalizationRunnerTest {
config = mock(JsonNode.class);
catalog = mock(ConfiguredAirbyteCatalog.class);
when(processFactory.create(JOB_ID, JOB_ATTEMPT, jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME, null, "run",
final Map<String, String> files = ImmutableMap.of(
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));
when(processFactory.create(JOB_ID, JOB_ATTEMPT, jobRoot, DefaultNormalizationRunner.NORMALIZATION_IMAGE_NAME, false, files, null, "run",
"--integration-type", "bigquery",
"--config", WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
"--catalog", WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME))


@@ -24,9 +24,12 @@
package io.airbyte.workers.process;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.workers.WorkerException;
import java.nio.file.Path;
import java.util.Collections;
import java.util.Map;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.mockito.Mockito;
@@ -37,6 +40,15 @@ class AirbyteIntegrationLauncherTest {
private static final int JOB_ATTEMPT = 0;
private static final Path JOB_ROOT = Path.of("abc");
public static final String FAKE_IMAGE = "fake_image";
private static final Map<String, String> CONFIG_FILES = ImmutableMap.of(
"config", "{}");
private static final Map<String, String> CONFIG_CATALOG_FILES = ImmutableMap.of(
"config", "{}",
"catalog", "{}");
private static final Map<String, String> CONFIG_CATALOG_STATE_FILES = ImmutableMap.of(
"config", "{}",
"catalog", "{}",
"state", "{}");
private ProcessFactory processFactory;
private AirbyteIntegrationLauncher launcher;
@@ -51,32 +63,32 @@ class AirbyteIntegrationLauncherTest {
void spec() throws WorkerException {
launcher.spec(JOB_ROOT);
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null, "spec");
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, Collections.emptyMap(), null, "spec");
}
@Test
void check() throws WorkerException {
launcher.check(JOB_ROOT, "config");
launcher.check(JOB_ROOT, "config", "{}");
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null,
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
"check",
"--config", "config");
}
@Test
void discover() throws WorkerException {
launcher.discover(JOB_ROOT, "config");
launcher.discover(JOB_ROOT, "config", "{}");
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null,
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_FILES, null,
"discover",
"--config", "config");
}
@Test
void read() throws WorkerException {
launcher.read(JOB_ROOT, "config", "catalog", "state");
launcher.read(JOB_ROOT, "config", "{}", "catalog", "{}", "state", "{}");
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null,
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, CONFIG_CATALOG_STATE_FILES, null,
Lists.newArrayList(
"read",
"--config", "config",
@@ -86,9 +98,9 @@ class AirbyteIntegrationLauncherTest {
@Test
void write() throws WorkerException {
launcher.write(JOB_ROOT, "config", "catalog");
launcher.write(JOB_ROOT, "config", "{}", "catalog", "{}");
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, null,
Mockito.verify(processFactory).create(JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, true, CONFIG_CATALOG_FILES, null,
"write",
"--config", "config",
"--catalog", "catalog");


@@ -26,6 +26,9 @@ package io.airbyte.workers.process;
import static org.junit.jupiter.api.Assertions.*;
import com.google.common.collect.ImmutableMap;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.workers.WorkerException;
import java.io.IOException;
import java.nio.file.Files;
@@ -54,4 +57,17 @@ class DockerProcessFactoryTest {
assertFalse(processFactory.checkImageExists("airbyte/fake:0.1.2"));
}
@Test
public void testFileWriting() throws IOException, WorkerException {
Path workspaceRoot = Files.createTempDirectory(Files.createDirectories(TEST_ROOT), "process_factory");
Path jobRoot = workspaceRoot.resolve("job");
final DockerProcessFactory processFactory = new DockerProcessFactory(workspaceRoot, "", "", "");
processFactory.create("job_id", 0, jobRoot, "airbyte/scheduler:dev", false, ImmutableMap.of("config.json", "{\"data\": 2}"), "echo hi");
assertEquals(
Jsons.jsonNode(ImmutableMap.of("data", 2)),
Jsons.deserialize(IOs.readFile(jobRoot, "config.json")));
}
}
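The test above relies on the factory writing the files map into the job root before launch; the same tmp-file staging step used by copyFilesToKubeConfigVolume can be sketched without a Kube client (file names and contents here are illustrative):

```java
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Map;

public class FileStaging {

  // Write each (name, contents) entry to a temp dir, as the upload step does
  // before copying the files into the pod's shared config volume.
  public static Path stage(Map<String, String> files) throws IOException {
    Path dir = Files.createTempDirectory("airbyte-staging");
    for (Map.Entry<String, String> e : files.entrySet()) {
      Files.writeString(dir.resolve(e.getKey()), e.getValue());
    }
    return dir;
  }

}
```

In the Kube case the staged files are then streamed into the init container and a FINISHED_UPLOADING marker is written last, releasing the init container's wait loop.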


@@ -0,0 +1,136 @@
/*
* MIT License
*
* Copyright (c) 2020 Airbyte
*
* Permission is hereby granted, free of charge, to any person obtaining a copy
* of this software and associated documentation files (the "Software"), to deal
* in the Software without restriction, including without limitation the rights
* to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
* copies of the Software, and to permit persons to whom the Software is
* furnished to do so, subject to the following conditions:
*
* The above copyright notice and this permission notice shall be included in all
* copies or substantial portions of the Software.
*
* THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
* IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
* FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
* AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
* LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
* OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
* SOFTWARE.
*/
package io.airbyte.workers.process;
import static org.junit.jupiter.api.Assertions.assertEquals;
import static org.junit.jupiter.api.Assertions.assertThrows;
import io.airbyte.commons.docker.DockerUtils;
import io.airbyte.commons.string.Strings;
import io.fabric8.kubernetes.api.model.ContainerBuilder;
import io.fabric8.kubernetes.api.model.Pod;
import io.fabric8.kubernetes.api.model.PodBuilder;
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
import io.fabric8.kubernetes.client.KubernetesClient;
import java.io.IOException;
import java.util.concurrent.TimeUnit;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.DisplayName;
import org.junit.jupiter.api.Nested;
import org.junit.jupiter.api.Test;
import org.testcontainers.shaded.com.google.common.io.Resources;
// Disabled until we start minikube on the node.
@Disabled
public class KubePodProcessTest {
private static final KubernetesClient K8s = new DefaultKubernetesClient();
private static final String ENTRYPOINT = "sh";
private static final String TEST_IMAGE_WITH_VAR_PATH = "Dockerfile.with_var";
private static final String TEST_IMAGE_WITH_VAR_NAME = "worker-test:with-var";
private static final String TEST_IMAGE_NO_VAR_PATH = "Dockerfile.no_var";
private static final String TEST_IMAGE_NO_VAR_NAME = "worker-test:no-var";
@BeforeAll
public static void setup() {
var varDockerfile = Resources.getResource(TEST_IMAGE_WITH_VAR_PATH);
DockerUtils.buildImage(varDockerfile.getPath(), TEST_IMAGE_WITH_VAR_NAME);
var noVarDockerfile = Resources.getResource(TEST_IMAGE_NO_VAR_PATH);
DockerUtils.buildImage(noVarDockerfile.getPath(), TEST_IMAGE_NO_VAR_NAME);
}
@Nested
class GetCommand {
@Test
@DisplayName("Should error if image does not have the right env var set.")
public void testGetCommandFromImageNoCommand() {
assertThrows(RuntimeException.class, () -> KubePodProcess.getCommandFromImage(K8s, TEST_IMAGE_NO_VAR_NAME, "default"));
}
@Test
@DisplayName("Should error if the image does not exist.")
public void testGetCommandFromImageMissingImage() {
assertThrows(RuntimeException.class, () -> KubePodProcess.getCommandFromImage(K8s, "bad_missing_image", "default"));
}
@Test
@DisplayName("Should retrieve the right command if image has the right env var set.")
public void testGetCommandFromImageCommandPresent() throws IOException, InterruptedException {
var command = KubePodProcess.getCommandFromImage(K8s, TEST_IMAGE_WITH_VAR_NAME, "default");
assertEquals(ENTRYPOINT, command);
}
}
@Nested
class GetPodIp {
@Test
@DisplayName("Should error when the given pod does not exist.")
public void testGetPodIpNoPod() {
assertThrows(RuntimeException.class, () -> KubePodProcess.getPodIP(K8s, "pod-does-not-exist", "default"));
}
@Test
@DisplayName("Should return the correct pod IP.")
public void testGetPodIpGoodPod() throws InterruptedException {
var sleep = new ContainerBuilder()
.withImage("busybox")
.withName("sleep")
.withCommand("sleep", "100000")
.build();
var podName = Strings.addRandomSuffix("test-get-pod-good-pod", "-", 5);
Pod podDef = new PodBuilder()
.withApiVersion("v1")
.withNewMetadata()
.withName(podName)
.endMetadata()
.withNewSpec()
.withRestartPolicy("Never")
.withContainers(sleep)
.endSpec()
.build();
String namespace = "default";
Pod pod = K8s.pods().inNamespace(namespace).createOrReplace(podDef);
K8s.resource(pod).waitUntilReady(20, TimeUnit.SECONDS);
var ip = KubePodProcess.getPodIP(K8s, podName, namespace);
var exp = K8s.pods().inNamespace(namespace).withName(podName).get().getStatus().getPodIP();
assertEquals(exp, ip);
K8s.resource(podDef).inNamespace(namespace).delete();
}
}
}

View File

@@ -36,6 +36,7 @@ import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.google.common.collect.Lists;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardTargetConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.workers.TestConfigHelpers;
@@ -87,8 +88,13 @@ class DefaultAirbyteDestinationTest {
integrationLauncher = mock(IntegrationLauncher.class, RETURNS_DEEP_STUBS);
final InputStream inputStream = mock(InputStream.class);
when(integrationLauncher.write(jobRoot, WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME))
.thenReturn(process);
when(integrationLauncher.write(
jobRoot,
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME,
Jsons.serialize(DESTINATION_CONFIG.getDestinationConnectionConfiguration()),
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME,
Jsons.serialize(DESTINATION_CONFIG.getCatalog())))
.thenReturn(process);
when(process.isAlive()).thenReturn(true);
when(process.getInputStream()).thenReturn(inputStream);

View File

@@ -33,9 +33,9 @@ import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.config.StandardTapConfig;
import io.airbyte.config.State;
@@ -67,17 +67,19 @@ class DefaultAirbyteSourceTest {
private static final String STREAM_NAME = "user_preferences";
private static final String FIELD_NAME = "favorite_color";
private static final JsonNode STATE = Jsons.jsonNode(ImmutableMap.of("checkpoint", "the future."));
private static final JsonNode CONFIG = Jsons.jsonNode(Map.of(
"apiKey", "123",
"region", "us-east"));
private static final ConfiguredAirbyteCatalog CATALOG = CatalogHelpers.createConfiguredAirbyteCatalog(
"hudi:latest",
NAMESPACE,
Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING));
private static final StandardTapConfig SOURCE_CONFIG = new StandardTapConfig()
.withState(new State().withState(Jsons.jsonNode(ImmutableMap.of("checkpoint", "the future."))))
.withSourceConnectionConfiguration(Jsons.jsonNode(Map.of(
"apiKey", "123",
"region", "us-east")))
.withCatalog(CatalogHelpers.createConfiguredAirbyteCatalog("hudi:latest", NAMESPACE, Field.of(FIELD_NAME, JsonSchemaPrimitive.STRING)));
.withState(new State().withState(STATE))
.withSourceConnectionConfiguration(CONFIG)
.withCatalog(CATALOG);
private static final List<AirbyteMessage> MESSAGES = Lists.newArrayList(
AirbyteMessageUtils.createRecordMessage(STREAM_NAME, FIELD_NAME, "blue"),
@@ -100,8 +102,11 @@ class DefaultAirbyteSourceTest {
when(integrationLauncher.read(
jobRoot,
WorkerConstants.SOURCE_CONFIG_JSON_FILENAME,
Jsons.serialize(CONFIG),
WorkerConstants.SOURCE_CATALOG_JSON_FILENAME,
WorkerConstants.INPUT_STATE_JSON_FILENAME)).thenReturn(process);
Jsons.serialize(CATALOG),
WorkerConstants.INPUT_STATE_JSON_FILENAME,
Jsons.serialize(STATE))).thenReturn(process);
when(process.isAlive()).thenReturn(true);
when(process.getInputStream()).thenReturn(inputStream);
when(process.getErrorStream()).thenReturn(new ByteArrayInputStream("qwer".getBytes(StandardCharsets.UTF_8)));
@@ -131,16 +136,6 @@ class DefaultAirbyteSourceTest {
source.close();
assertEquals(
Jsons.jsonNode(SOURCE_CONFIG.getSourceConnectionConfiguration()),
Jsons.deserialize(IOs.readFile(jobRoot, WorkerConstants.SOURCE_CONFIG_JSON_FILENAME)));
assertEquals(
Jsons.jsonNode(SOURCE_CONFIG.getState().getState()),
Jsons.deserialize(IOs.readFile(jobRoot, WorkerConstants.INPUT_STATE_JSON_FILENAME)));
assertEquals(
Jsons.jsonNode(CATALOG),
Jsons.deserialize(IOs.readFile(jobRoot, WorkerConstants.SOURCE_CATALOG_JSON_FILENAME)));
assertEquals(MESSAGES, messages);
Assertions.assertTimeout(Duration.ofSeconds(5), () -> {

View File

@@ -0,0 +1,3 @@
FROM alpine:3
ENTRYPOINT "sh"

View File

@@ -0,0 +1,5 @@
FROM alpine:3
ENV AIRBYTE_ENTRYPOINT="sh"
ENTRYPOINT "sh"

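The `AIRBYTE_ENTRYPOINT` env var baked into `Dockerfile.with_var` is what `getCommandFromImage` looks for. A minimal sketch of extracting it from an env dump (e.g. what `docker inspect` or `kubectl exec ... env` would print); the `env_dump` value here is a made-up example, not output from the real images:

```shell
# Hypothetical sketch: pull AIRBYTE_ENTRYPOINT out of an image's env listing.
env_dump='PATH=/usr/local/bin
AIRBYTE_ENTRYPOINT=sh'
printf '%s\n' "$env_dump" | sed -n 's/^AIRBYTE_ENTRYPOINT=//p'
```

For `Dockerfile.no_var`, which sets no such variable, the same extraction yields nothing, which is the case the "no env var" test above exercises.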
View File

@@ -17,3 +17,4 @@ PAPERCUPS_STORYTIME=disabled
IS_DEMO=false
TEMPORAL_HOST=airbyte-temporal-svc:7233
INTERNAL_API_HOST=airbyte-server-svc:8001
TEMPORAL_WORKER_PORTS=9000,9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029

View File

@@ -20,5 +20,4 @@ images:
configMapGenerator:
- name: airbyte-env
envs:
- .env
env: .env

View File

@@ -17,3 +17,4 @@ PAPERCUPS_STORYTIME=enabled
IS_DEMO=false
TEMPORAL_HOST=airbyte-temporal-svc:7233
INTERNAL_API_HOST=airbyte-server-svc:8001
TEMPORAL_WORKER_PORTS=9000,9001,9002,9003,9004,9005,9006,9007,9008,9009,9010,9011,9012,9013,9014,9015,9016,9017,9018,9019,9020,9021,9022,9023,9024,9025,9026,9027,9028,9029

View File

@@ -20,5 +20,4 @@ images:
configMapGenerator:
- name: airbyte-env
envs:
- .env
env: .env

View File

@@ -81,8 +81,42 @@ spec:
configMapKeyRef:
name: airbyte-env
key: TEMPORAL_HOST
- name: TEMPORAL_WORKER_PORTS
valueFrom:
configMapKeyRef:
name: airbyte-env
key: TEMPORAL_WORKER_PORTS
ports:
- containerPort: 8001
- containerPort: 9000
- containerPort: 9001
- containerPort: 9002
- containerPort: 9003
- containerPort: 9004
- containerPort: 9005
- containerPort: 9006
- containerPort: 9007
- containerPort: 9008
- containerPort: 9009
- containerPort: 9010
- containerPort: 9011
- containerPort: 9012
- containerPort: 9013
- containerPort: 9014
- containerPort: 9015
- containerPort: 9016
- containerPort: 9017
- containerPort: 9018
- containerPort: 9019
- containerPort: 9020
- containerPort: 9021
- containerPort: 9022
- containerPort: 9023
- containerPort: 9024
- containerPort: 9025
- containerPort: 9026
- containerPort: 9027
- containerPort: 9028
- containerPort: 9029
volumeMounts:
- name: airbyte-volume-configs
mountPath: /configs

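The deployment spec above lists all 30 `containerPort` entries by hand, mirroring `TEMPORAL_WORKER_PORTS`. A sketch of generating that block instead; the indentation is an assumption about how the fragment sits in the full manifest:

```shell
# Hypothetical helper: print the 30 containerPort lines for ports 9000-9029.
for p in $(seq 9000 9029); do
  printf -- '        - containerPort: %s\n' "$p"
done
```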
View File

@@ -18,6 +18,7 @@ include ':airbyte-api'
include ':airbyte-cli'
include ':airbyte-cdk:python'
include ':airbyte-commons'
include ':airbyte-commons-docker'
include ':airbyte-config:models'
include ':airbyte-config:init'
include ':airbyte-config:persistence'

View File

@@ -14,7 +14,14 @@ kubectl apply -k kube/overlays/dev
kubectl wait --for=condition=Available deployment/airbyte-server --timeout=300s || (kubectl describe pods && exit 1)
kubectl wait --for=condition=Available deployment/airbyte-scheduler --timeout=300s || (kubectl describe pods && exit 1)
sleep 20s
# allocate plenty of time for kube to start; it takes a while for postgres+temporal to work things out
sleep 120s
server_logs () { echo "server logs:" && kubectl logs deployment.apps/airbyte-server; }
scheduler_logs () { echo "scheduler logs:" && kubectl logs deployment.apps/airbyte-scheduler; }
print_all_logs () { server_logs; scheduler_logs; }
trap "echo 'kube logs:' && print_all_logs" EXIT
kubectl port-forward svc/airbyte-server-svc 8001:8001 &