Add orchestrator label. (#20904)
Add the orchestrator label to orchestrators so we can better differentiate orchestrator pods. This is useful since orchestrator pods are the only pods in the job namespace with a need to talk to the main Airbyte application pods. These labels allow us to apply more granular network filtering. Also took the chance to do some clean up of labels.
This commit is contained in:
@@ -4,6 +4,11 @@
|
||||
|
||||
package io.airbyte.workers.general;
|
||||
|
||||
import static io.airbyte.workers.process.Metadata.CUSTOM_STEP;
|
||||
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
@@ -17,7 +22,6 @@ import io.airbyte.config.ResourceRequirements;
|
||||
import io.airbyte.workers.WorkerUtils;
|
||||
import io.airbyte.workers.exception.WorkerException;
|
||||
import io.airbyte.workers.normalization.NormalizationRunner;
|
||||
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
|
||||
import io.airbyte.workers.process.ProcessFactory;
|
||||
import java.nio.file.Path;
|
||||
import java.util.ArrayList;
|
||||
@@ -93,7 +97,7 @@ public class DbtTransformationRunner implements AutoCloseable {
|
||||
Collections.addAll(dbtArguments, Commandline.translateCommandline(dbtConfig.getDbtArguments()));
|
||||
process =
|
||||
processFactory.create(
|
||||
AirbyteIntegrationLauncher.CUSTOM_STEP,
|
||||
CUSTOM_STEP,
|
||||
jobId,
|
||||
attempt,
|
||||
jobRoot,
|
||||
@@ -103,8 +107,7 @@ public class DbtTransformationRunner implements AutoCloseable {
|
||||
files,
|
||||
"/bin/bash",
|
||||
resourceRequirements,
|
||||
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
|
||||
AirbyteIntegrationLauncher.CUSTOM_STEP),
|
||||
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, CUSTOM_STEP),
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
dbtArguments.toArray(new String[0]));
|
||||
|
||||
@@ -4,6 +4,11 @@
|
||||
|
||||
package io.airbyte.workers.normalization;
|
||||
|
||||
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
|
||||
import static io.airbyte.workers.process.Metadata.NORMALIZE_STEP;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
|
||||
|
||||
import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.google.common.base.Strings;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
@@ -25,7 +30,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
|
||||
import io.airbyte.workers.WorkerConstants;
|
||||
import io.airbyte.workers.WorkerUtils;
|
||||
import io.airbyte.workers.exception.WorkerException;
|
||||
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
|
||||
import io.airbyte.workers.process.ProcessFactory;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Path;
|
||||
@@ -120,7 +124,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
|
||||
try {
|
||||
LOGGER.info("Running with normalization version: {}", normalizationImageName);
|
||||
process = processFactory.create(
|
||||
AirbyteIntegrationLauncher.NORMALIZE_STEP,
|
||||
NORMALIZE_STEP,
|
||||
jobId,
|
||||
attempt,
|
||||
jobRoot,
|
||||
@@ -130,8 +134,7 @@ public class DefaultNormalizationRunner implements NormalizationRunner {
|
||||
false, files,
|
||||
null,
|
||||
resourceRequirements,
|
||||
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
|
||||
AirbyteIntegrationLauncher.NORMALIZE_STEP),
|
||||
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP),
|
||||
Collections.emptyMap(),
|
||||
Collections.emptyMap(),
|
||||
args);
|
||||
|
||||
@@ -8,6 +8,14 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.DOCKER_IMAGE_KEY;
|
||||
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
|
||||
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
|
||||
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
|
||||
import static io.airbyte.workers.process.Metadata.CHECK_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.DISCOVER_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
|
||||
import static io.airbyte.workers.process.Metadata.READ_STEP;
|
||||
import static io.airbyte.workers.process.Metadata.SPEC_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
|
||||
import static io.airbyte.workers.process.Metadata.WRITE_STEP;
|
||||
|
||||
import com.google.common.base.Preconditions;
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
@@ -27,29 +35,8 @@ import java.util.Map;
|
||||
|
||||
public class AirbyteIntegrationLauncher implements IntegrationLauncher {
|
||||
|
||||
/**
|
||||
* The following variables help, either via names or labels, add metadata to processes actually
|
||||
* running operations. These are more readable forms of
|
||||
* {@link io.airbyte.config.JobTypeResourceLimit.JobType}.
|
||||
*/
|
||||
public static final String JOB_TYPE = "job_type";
|
||||
public static final String SYNC_JOB = "sync";
|
||||
public static final String SPEC_JOB = "spec";
|
||||
public static final String CHECK_JOB = "check";
|
||||
public static final String DISCOVER_JOB = "discover";
|
||||
|
||||
private static final String CONFIG = "--config";
|
||||
|
||||
/**
|
||||
* A sync job can actually be broken down into the following steps. Try to be as precise as possible
|
||||
* with naming/labels to help operations.
|
||||
*/
|
||||
public static final String SYNC_STEP = "sync_step";
|
||||
public static final String READ_STEP = "read";
|
||||
public static final String WRITE_STEP = "write";
|
||||
public static final String NORMALIZE_STEP = "normalize";
|
||||
public static final String CUSTOM_STEP = "custom";
|
||||
|
||||
private final String jobId;
|
||||
private final int attempt;
|
||||
private final String imageName;
|
||||
@@ -94,7 +81,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
|
||||
Collections.emptyMap(),
|
||||
null,
|
||||
resourceRequirement,
|
||||
Map.of(JOB_TYPE, SPEC_JOB),
|
||||
Map.of(JOB_TYPE_KEY, SPEC_JOB),
|
||||
getWorkerMetadata(),
|
||||
Collections.emptyMap(),
|
||||
"spec");
|
||||
@@ -115,7 +102,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
|
||||
ImmutableMap.of(configFilename, configContents),
|
||||
null,
|
||||
resourceRequirement,
|
||||
Map.of(JOB_TYPE, CHECK_JOB),
|
||||
Map.of(JOB_TYPE_KEY, CHECK_JOB),
|
||||
getWorkerMetadata(),
|
||||
Collections.emptyMap(),
|
||||
"check",
|
||||
@@ -137,7 +124,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
|
||||
ImmutableMap.of(configFilename, configContents),
|
||||
null,
|
||||
resourceRequirement,
|
||||
Map.of(JOB_TYPE, DISCOVER_JOB),
|
||||
Map.of(JOB_TYPE_KEY, DISCOVER_JOB),
|
||||
getWorkerMetadata(),
|
||||
Collections.emptyMap(),
|
||||
"discover",
|
||||
@@ -183,7 +170,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
|
||||
files,
|
||||
null,
|
||||
resourceRequirement,
|
||||
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, READ_STEP),
|
||||
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP),
|
||||
getWorkerMetadata(),
|
||||
Collections.emptyMap(),
|
||||
arguments.toArray(new String[arguments.size()]));
|
||||
@@ -213,7 +200,7 @@ public class AirbyteIntegrationLauncher implements IntegrationLauncher {
|
||||
files,
|
||||
null,
|
||||
resourceRequirement,
|
||||
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, WRITE_STEP),
|
||||
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP),
|
||||
getWorkerMetadata(),
|
||||
Collections.emptyMap(),
|
||||
"write",
|
||||
|
||||
@@ -25,11 +25,6 @@ public class KubeProcessFactory implements ProcessFactory {
|
||||
|
||||
private static final Logger LOGGER = LoggerFactory.getLogger(KubeProcessFactory.class);
|
||||
|
||||
private static final String JOB_LABEL_KEY = "job_id";
|
||||
private static final String ATTEMPT_LABEL_KEY = "attempt_id";
|
||||
private static final String WORKER_POD_LABEL_KEY = "airbyte";
|
||||
private static final String WORKER_POD_LABEL_VALUE = "worker-pod";
|
||||
|
||||
private final WorkerConfigs workerConfigs;
|
||||
private final String namespace;
|
||||
private final KubernetesClient fabricClient;
|
||||
@@ -146,13 +141,17 @@ public class KubeProcessFactory implements ProcessFactory {
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Returns general labels to be applied to all Kubernetes pods. All general labels should be added
|
||||
* here.
|
||||
*/
|
||||
public static Map<String, String> getLabels(final String jobId, final int attemptId, final Map<String, String> customLabels) {
|
||||
final var allLabels = new HashMap<>(customLabels);
|
||||
|
||||
final var generalKubeLabels = Map.of(
|
||||
JOB_LABEL_KEY, jobId,
|
||||
ATTEMPT_LABEL_KEY, String.valueOf(attemptId),
|
||||
WORKER_POD_LABEL_KEY, WORKER_POD_LABEL_VALUE);
|
||||
Metadata.JOB_LABEL_KEY, jobId,
|
||||
Metadata.ATTEMPT_LABEL_KEY, String.valueOf(attemptId),
|
||||
Metadata.WORKER_POD_LABEL_KEY, Metadata.WORKER_POD_LABEL_VALUE);
|
||||
|
||||
allLabels.putAll(generalKubeLabels);
|
||||
|
||||
|
||||
@@ -0,0 +1,42 @@
|
||||
/*
|
||||
* Copyright (c) 2022 Airbyte, Inc., all rights reserved.
|
||||
*/
|
||||
|
||||
package io.airbyte.workers.process;
|
||||
|
||||
/**
|
||||
* The following variables help, either via names or labels, add metadata to processes actually
|
||||
* running operations to ease operations.
|
||||
*/
|
||||
public final class Metadata {
|
||||
|
||||
/**
|
||||
* General Metadata
|
||||
*/
|
||||
static final String JOB_LABEL_KEY = "job_id";
|
||||
static final String ATTEMPT_LABEL_KEY = "attempt_id";
|
||||
static final String WORKER_POD_LABEL_KEY = "airbyte";
|
||||
static final String WORKER_POD_LABEL_VALUE = "job-pod";
|
||||
public static final String CONNECTION_ID_LABEL_KEY = "connection_id";
|
||||
|
||||
/**
|
||||
* These are more readable forms of {@link io.airbyte.config.JobTypeResourceLimit.JobType}.
|
||||
*/
|
||||
public static final String JOB_TYPE_KEY = "job_type";
|
||||
public static final String SYNC_JOB = "sync";
|
||||
public static final String SPEC_JOB = "spec";
|
||||
public static final String CHECK_JOB = "check";
|
||||
public static final String DISCOVER_JOB = "discover";
|
||||
|
||||
/**
|
||||
* A sync job can actually be broken down into the following steps. Try to be as precise as possible
|
||||
* with naming/labels to help operations.
|
||||
*/
|
||||
public static final String SYNC_STEP_KEY = "sync_step";
|
||||
public static final String READ_STEP = "read";
|
||||
public static final String WRITE_STEP = "write";
|
||||
public static final String NORMALIZE_STEP = "normalize";
|
||||
public static final String CUSTOM_STEP = "custom";
|
||||
public static final String ORCHESTRATOR_STEP = "orchestrator";
|
||||
|
||||
}
|
||||
@@ -9,6 +9,9 @@ import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ID_KEY;
|
||||
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.JOB_ROOT_KEY;
|
||||
import static io.airbyte.metrics.lib.ApmTraceConstants.Tags.PROCESS_EXIT_VALUE_KEY;
|
||||
import static io.airbyte.metrics.lib.ApmTraceConstants.WORKER_OPERATION_NAME;
|
||||
import static io.airbyte.workers.process.Metadata.CONNECTION_ID_LABEL_KEY;
|
||||
import static io.airbyte.workers.process.Metadata.ORCHESTRATOR_STEP;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
|
||||
|
||||
import com.google.common.base.Stopwatch;
|
||||
import datadog.trace.api.Trace;
|
||||
@@ -57,7 +60,6 @@ import lombok.extern.slf4j.Slf4j;
|
||||
@Slf4j
|
||||
public class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUTPUT> {
|
||||
|
||||
private static final String CONNECTION_ID_LABEL_KEY = "connection_id";
|
||||
private static final Duration MAX_DELETION_TIMEOUT = Duration.ofSeconds(45);
|
||||
|
||||
private final UUID connectionId;
|
||||
@@ -138,7 +140,7 @@ public class LauncherWorker<INPUT, OUTPUT> implements Worker<INPUT, OUTPUT> {
|
||||
final var allLabels = KubeProcessFactory.getLabels(
|
||||
jobRunConfig.getJobId(),
|
||||
Math.toIntExact(jobRunConfig.getAttemptId()),
|
||||
Map.of(CONNECTION_ID_LABEL_KEY, connectionId.toString()));
|
||||
Map.of(CONNECTION_ID_LABEL_KEY, connectionId.toString(), SYNC_STEP_KEY, ORCHESTRATOR_STEP));
|
||||
|
||||
final var podNameAndJobPrefix = podNamePrefix + "-job-" + jobRunConfig.getJobId() + "-attempt-";
|
||||
final var podName = podNameAndJobPrefix + jobRunConfig.getAttemptId();
|
||||
|
||||
@@ -5,6 +5,10 @@
|
||||
package io.airbyte.workers.normalization;
|
||||
|
||||
import static io.airbyte.commons.logging.LoggingHelper.RESET;
|
||||
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
|
||||
import static io.airbyte.workers.process.Metadata.NORMALIZE_STEP;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertFalse;
|
||||
import static org.junit.jupiter.api.Assertions.assertThrows;
|
||||
@@ -27,7 +31,6 @@ import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
|
||||
import io.airbyte.workers.WorkerConfigs;
|
||||
import io.airbyte.workers.WorkerConstants;
|
||||
import io.airbyte.workers.exception.WorkerException;
|
||||
import io.airbyte.workers.process.AirbyteIntegrationLauncher;
|
||||
import io.airbyte.workers.process.ProcessFactory;
|
||||
import java.io.ByteArrayInputStream;
|
||||
import java.io.IOException;
|
||||
@@ -85,11 +88,10 @@ class DefaultNormalizationRunnerTest {
|
||||
WorkerConstants.DESTINATION_CONFIG_JSON_FILENAME, Jsons.serialize(config),
|
||||
WorkerConstants.DESTINATION_CATALOG_JSON_FILENAME, Jsons.serialize(catalog));
|
||||
|
||||
when(processFactory.create(AirbyteIntegrationLauncher.NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
|
||||
when(processFactory.create(NORMALIZE_STEP, JOB_ID, JOB_ATTEMPT, jobRoot,
|
||||
DockerUtils.getTaggedImageName(NORMALIZATION_IMAGE, NORMALIZATION_TAG), false, false, files, null,
|
||||
workerConfigs.getResourceRequirements(),
|
||||
Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SYNC_JOB, AirbyteIntegrationLauncher.SYNC_STEP,
|
||||
AirbyteIntegrationLauncher.NORMALIZE_STEP),
|
||||
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, NORMALIZE_STEP),
|
||||
Map.of(),
|
||||
Map.of(),
|
||||
"run",
|
||||
|
||||
@@ -4,14 +4,14 @@
|
||||
|
||||
package io.airbyte.workers.process;
|
||||
|
||||
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.CHECK_JOB;
|
||||
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.DISCOVER_JOB;
|
||||
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.JOB_TYPE;
|
||||
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.READ_STEP;
|
||||
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SPEC_JOB;
|
||||
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_JOB;
|
||||
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.SYNC_STEP;
|
||||
import static io.airbyte.workers.process.AirbyteIntegrationLauncher.WRITE_STEP;
|
||||
import static io.airbyte.workers.process.Metadata.CHECK_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.DISCOVER_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.JOB_TYPE_KEY;
|
||||
import static io.airbyte.workers.process.Metadata.READ_STEP;
|
||||
import static io.airbyte.workers.process.Metadata.SPEC_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_STEP_KEY;
|
||||
import static io.airbyte.workers.process.Metadata.WRITE_STEP;
|
||||
|
||||
import com.google.common.collect.ImmutableMap;
|
||||
import com.google.common.collect.Lists;
|
||||
@@ -72,7 +72,7 @@ class AirbyteIntegrationLauncherTest {
|
||||
launcher.spec(JOB_ROOT);
|
||||
|
||||
Mockito.verify(processFactory).create(SPEC_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, Collections.emptyMap(), null,
|
||||
workerConfigs.getResourceRequirements(), Map.of(AirbyteIntegrationLauncher.JOB_TYPE, AirbyteIntegrationLauncher.SPEC_JOB), JOB_METADATA,
|
||||
workerConfigs.getResourceRequirements(), Map.of(JOB_TYPE_KEY, SPEC_JOB), JOB_METADATA,
|
||||
Map.of(),
|
||||
"spec");
|
||||
}
|
||||
@@ -83,7 +83,7 @@ class AirbyteIntegrationLauncherTest {
|
||||
|
||||
Mockito.verify(processFactory).create(CHECK_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null,
|
||||
workerConfigs.getResourceRequirements(),
|
||||
Map.of(JOB_TYPE, CHECK_JOB),
|
||||
Map.of(JOB_TYPE_KEY, CHECK_JOB),
|
||||
JOB_METADATA,
|
||||
Map.of(),
|
||||
"check",
|
||||
@@ -96,7 +96,7 @@ class AirbyteIntegrationLauncherTest {
|
||||
|
||||
Mockito.verify(processFactory).create(DISCOVER_JOB, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_FILES, null,
|
||||
workerConfigs.getResourceRequirements(),
|
||||
Map.of(JOB_TYPE, DISCOVER_JOB),
|
||||
Map.of(JOB_TYPE_KEY, DISCOVER_JOB),
|
||||
JOB_METADATA,
|
||||
Map.of(),
|
||||
"discover",
|
||||
@@ -109,7 +109,7 @@ class AirbyteIntegrationLauncherTest {
|
||||
|
||||
Mockito.verify(processFactory).create(READ_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, false, CONFIG_CATALOG_STATE_FILES, null,
|
||||
workerConfigs.getResourceRequirements(),
|
||||
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, READ_STEP),
|
||||
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, READ_STEP),
|
||||
JOB_METADATA,
|
||||
Map.of(),
|
||||
Lists.newArrayList(
|
||||
@@ -125,7 +125,7 @@ class AirbyteIntegrationLauncherTest {
|
||||
|
||||
Mockito.verify(processFactory).create(WRITE_STEP, JOB_ID, JOB_ATTEMPT, JOB_ROOT, FAKE_IMAGE, false, true, CONFIG_CATALOG_FILES, null,
|
||||
workerConfigs.getResourceRequirements(),
|
||||
Map.of(JOB_TYPE, SYNC_JOB, SYNC_STEP, WRITE_STEP),
|
||||
Map.of(JOB_TYPE_KEY, SYNC_JOB, SYNC_STEP_KEY, WRITE_STEP),
|
||||
JOB_METADATA,
|
||||
Map.of(),
|
||||
"write",
|
||||
|
||||
@@ -4,6 +4,8 @@
|
||||
|
||||
package io.airbyte.workers.process;
|
||||
|
||||
import static io.airbyte.workers.process.Metadata.SYNC_JOB;
|
||||
|
||||
import org.junit.jupiter.api.Assertions;
|
||||
import org.junit.jupiter.api.Test;
|
||||
|
||||
@@ -11,7 +13,7 @@ class ProcessFactoryTest {
|
||||
|
||||
@Test
|
||||
void getPodNameNormal() {
|
||||
final var name = ProcessFactory.createProcessName("airbyte/tester:1", AirbyteIntegrationLauncher.SYNC_JOB, "1", 10,
|
||||
final var name = ProcessFactory.createProcessName("airbyte/tester:1", SYNC_JOB, "1", 10,
|
||||
KubeProcessFactory.KUBE_NAME_LEN_LIMIT);
|
||||
final var withoutRandSuffix = name.substring(0, name.length() - 5);
|
||||
Assertions.assertEquals("tester-sync-1-10-", withoutRandSuffix);
|
||||
@@ -21,7 +23,7 @@ class ProcessFactoryTest {
|
||||
void getPodNameTruncated() {
|
||||
final var name =
|
||||
ProcessFactory.createProcessName("airbyte/very-very-very-long-name-longer-than-63-chars:2",
|
||||
AirbyteIntegrationLauncher.SYNC_JOB, "1", 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT);
|
||||
SYNC_JOB, "1", 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT);
|
||||
final var withoutRandSuffix = name.substring(0, name.length() - 5);
|
||||
Assertions.assertEquals("very-very-very-long-name-longer-than-63-chars-sync-1-10-", withoutRandSuffix);
|
||||
}
|
||||
@@ -29,7 +31,7 @@ class ProcessFactoryTest {
|
||||
@Test
|
||||
void testHandlingDashAsFirstCharacter() {
|
||||
final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330";
|
||||
final var name = ProcessFactory.createProcessName("airbyte/source-google-adwordsv2:latest", AirbyteIntegrationLauncher.SYNC_JOB,
|
||||
final var name = ProcessFactory.createProcessName("airbyte/source-google-adwordsv2:latest", SYNC_JOB,
|
||||
uuid, 10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT);
|
||||
|
||||
final var withoutRandSuffix = name.substring(0, name.length() - 5);
|
||||
@@ -39,7 +41,7 @@ class ProcessFactoryTest {
|
||||
@Test
|
||||
void testOnlyDashes() {
|
||||
final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330";
|
||||
final var name = ProcessFactory.createProcessName("--------", AirbyteIntegrationLauncher.SYNC_JOB, uuid,
|
||||
final var name = ProcessFactory.createProcessName("--------", SYNC_JOB, uuid,
|
||||
10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT);
|
||||
|
||||
final var withoutRandSuffix = name.substring(0, name.length() - 5);
|
||||
@@ -49,7 +51,7 @@ class ProcessFactoryTest {
|
||||
@Test
|
||||
void testOnlyNumeric() {
|
||||
final var uuid = "7339ba3b-cb53-4210-9591-c70d4a372330";
|
||||
final var name = ProcessFactory.createProcessName("0000000000", AirbyteIntegrationLauncher.SYNC_JOB, uuid,
|
||||
final var name = ProcessFactory.createProcessName("0000000000", SYNC_JOB, uuid,
|
||||
10, KubeProcessFactory.KUBE_NAME_LEN_LIMIT);
|
||||
|
||||
final var withoutRandSuffix = name.substring(0, name.length() - 5);
|
||||
|
||||
@@ -105,7 +105,7 @@ public class ReplicationJobOrchestrator implements JobOrchestrator<StandardSyncI
|
||||
|
||||
// At this moment, if either source or destination is from custom connector image, we will put all
|
||||
// jobs into isolated pool to run.
|
||||
boolean useIsolatedPool = sourceLauncherConfig.getIsCustomConnector() || destinationLauncherConfig.getIsCustomConnector();
|
||||
final boolean useIsolatedPool = sourceLauncherConfig.getIsCustomConnector() || destinationLauncherConfig.getIsCustomConnector();
|
||||
log.info("Setting up source launcher...");
|
||||
final var sourceLauncher = new AirbyteIntegrationLauncher(
|
||||
sourceLauncherConfig.getJobId(),
|
||||
|
||||
Reference in New Issue
Block a user