🐛 Connector exit code should still be detected if resourceVersion is updated (#11861)
This commit is contained in:
@@ -3,7 +3,7 @@ plugins {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation 'io.fabric8:kubernetes-client:5.3.1'
|
||||
implementation 'io.fabric8:kubernetes-client:5.12.2'
|
||||
implementation 'org.apache.commons:commons-lang3:3.11'
|
||||
implementation 'org.apache.commons:commons-text:1.9'
|
||||
implementation 'org.eclipse.jetty:jetty-server:9.4.31.v20200723'
|
||||
|
||||
@@ -74,14 +74,14 @@ public class ContainerOrchestratorApp {
|
||||
}
|
||||
|
||||
private void configureLogging() {
|
||||
for (String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) {
|
||||
for (final String envVar : OrchestratorConstants.ENV_VARS_TO_TRANSFER) {
|
||||
if (envMap.containsKey(envVar)) {
|
||||
System.setProperty(envVar, envMap.get(envVar));
|
||||
}
|
||||
}
|
||||
|
||||
// make sure the new configuration is picked up
|
||||
LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
|
||||
final LoggerContext ctx = (LoggerContext) LogManager.getContext(false);
|
||||
ctx.reconfigure();
|
||||
|
||||
final var logClient = LogClientSingleton.getInstance();
|
||||
@@ -119,7 +119,7 @@ public class ContainerOrchestratorApp {
|
||||
|
||||
// required to kill clients with thread pools
|
||||
System.exit(0);
|
||||
} catch (Throwable t) {
|
||||
} catch (final Throwable t) {
|
||||
asyncStateManager.write(kubePodInfo, AsyncKubePodStatus.FAILED);
|
||||
System.exit(1);
|
||||
}
|
||||
@@ -177,7 +177,7 @@ public class ContainerOrchestratorApp {
|
||||
|
||||
final var app = new ContainerOrchestratorApp(applicationName, envMap, jobRunConfig, kubePodInfo);
|
||||
app.run();
|
||||
} catch (Throwable t) {
|
||||
} catch (final Throwable t) {
|
||||
log.error("Orchestrator failed...", t);
|
||||
// otherwise the pod hangs on closing
|
||||
System.exit(1);
|
||||
@@ -212,7 +212,11 @@ public class ContainerOrchestratorApp {
|
||||
// exposed)
|
||||
KubePortManagerSingleton.init(OrchestratorConstants.PORTS);
|
||||
|
||||
return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
|
||||
return new KubeProcessFactory(workerConfigs,
|
||||
configs.getJobKubeNamespace(),
|
||||
fabricClient,
|
||||
kubeHeartbeatUrl,
|
||||
false);
|
||||
} else {
|
||||
return new DockerProcessFactory(
|
||||
workerConfigs,
|
||||
|
||||
@@ -3,7 +3,7 @@ plugins {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation 'io.fabric8:kubernetes-client:5.9.0'
|
||||
implementation 'io.fabric8:kubernetes-client:5.12.2'
|
||||
implementation 'io.temporal:temporal-sdk:1.8.1'
|
||||
|
||||
implementation project(':airbyte-analytics')
|
||||
|
||||
@@ -38,7 +38,7 @@ dependencies {
|
||||
implementation project(':airbyte-api')
|
||||
implementation project(':airbyte-container-orchestrator')
|
||||
|
||||
implementation 'io.fabric8:kubernetes-client:5.3.1'
|
||||
implementation 'io.fabric8:kubernetes-client:5.12.2'
|
||||
implementation 'org.testcontainers:testcontainers:1.15.3'
|
||||
|
||||
acceptanceTestsImplementation project(':airbyte-api')
|
||||
|
||||
@@ -11,7 +11,7 @@ configurations {
|
||||
}
|
||||
|
||||
dependencies {
|
||||
implementation 'io.fabric8:kubernetes-client:5.3.1'
|
||||
implementation 'io.fabric8:kubernetes-client:5.12.2'
|
||||
implementation 'io.temporal:temporal-sdk:1.8.1'
|
||||
implementation 'org.apache.ant:ant:1.10.10'
|
||||
implementation 'org.apache.commons:commons-lang3:3.11'
|
||||
|
||||
@@ -281,7 +281,11 @@ public class WorkerApp {
|
||||
final String localIp = InetAddress.getLocalHost().getHostAddress();
|
||||
final String kubeHeartbeatUrl = localIp + ":" + KUBE_HEARTBEAT_PORT;
|
||||
LOGGER.info("Using Kubernetes namespace: {}", configs.getJobKubeNamespace());
|
||||
return new KubeProcessFactory(workerConfigs, configs.getJobKubeNamespace(), fabricClient, kubeHeartbeatUrl, false);
|
||||
return new KubeProcessFactory(workerConfigs,
|
||||
configs.getJobKubeNamespace(),
|
||||
fabricClient,
|
||||
kubeHeartbeatUrl,
|
||||
false);
|
||||
} else {
|
||||
return new DockerProcessFactory(
|
||||
workerConfigs,
|
||||
|
||||
@@ -310,16 +310,12 @@ public class AsyncOrchestratorPodProcess implements KubePod {
|
||||
.createOrReplace(podToCreate);
|
||||
|
||||
log.info("Waiting for pod to be running...");
|
||||
try {
|
||||
kubernetesClient.pods()
|
||||
.inNamespace(kubePodInfo.namespace())
|
||||
.withName(kubePodInfo.name())
|
||||
.waitUntilCondition(p -> {
|
||||
return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null;
|
||||
}, 5, TimeUnit.MINUTES);
|
||||
} catch (final InterruptedException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
kubernetesClient.pods()
|
||||
.inNamespace(kubePodInfo.namespace())
|
||||
.withName(kubePodInfo.name())
|
||||
.waitUntilCondition(p -> {
|
||||
return !p.getStatus().getContainerStatuses().isEmpty() && p.getStatus().getContainerStatuses().get(0).getState().getWaiting() == null;
|
||||
}, 5, TimeUnit.MINUTES);
|
||||
|
||||
final var podStatus = kubernetesClient.pods()
|
||||
.inNamespace(kubePodInfo.namespace())
|
||||
|
||||
@@ -7,8 +7,9 @@ package io.airbyte.workers.process;
|
||||
import com.google.common.collect.MoreCollectors;
|
||||
import io.fabric8.kubernetes.api.model.ContainerStatus;
|
||||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
import io.fabric8.kubernetes.client.Watcher;
|
||||
import io.fabric8.kubernetes.client.WatcherException;
|
||||
import io.fabric8.kubernetes.client.informers.ResourceEventHandler;
|
||||
import java.util.Optional;
|
||||
import java.util.concurrent.atomic.AtomicBoolean;
|
||||
import java.util.function.Consumer;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
@@ -20,52 +21,111 @@ import lombok.extern.slf4j.Slf4j;
|
||||
* be able to retrieve the exit code).
|
||||
*/
|
||||
@Slf4j
|
||||
public class ExitCodeWatcher implements Watcher<Pod> {
|
||||
public class ExitCodeWatcher implements ResourceEventHandler<Pod> {
|
||||
|
||||
private final String podName;
|
||||
private final String podNamespace;
|
||||
private final Consumer<Integer> onExitCode;
|
||||
private final Consumer<WatcherException> onWatchFailure;
|
||||
private boolean exitCodeRetrieved = false;
|
||||
private final Runnable onWatchFailure;
|
||||
/**
|
||||
* This flag is set to false when either (a) we find the pod's exit code, or (b) the pod is
|
||||
* deleted. This is so that we call exactly one of onExitCode and onWatchFailure, and we make that
|
||||
* call exactly once.
|
||||
* <p>
|
||||
* We rely on this class being side-effect-free, outside of persistExitCode() and persistFailure().
|
||||
* Those two methods use compareAndSet to prevent race conditions. Everywhere else, we can be sloppy
|
||||
* because we won't actually emit any output.
|
||||
*/
|
||||
private final AtomicBoolean active = new AtomicBoolean(true);
|
||||
|
||||
/**
|
||||
*
|
||||
* @param onExitCode callback used to store the exit code
|
||||
* @param onWatchFailure callback that's triggered when the watch fails; it should report some failed exit
|
||||
* code.
|
||||
*/
|
||||
public ExitCodeWatcher(final Consumer<Integer> onExitCode, final Consumer<WatcherException> onWatchFailure) {
|
||||
public ExitCodeWatcher(final String podName,
|
||||
final String podNamespace,
|
||||
final Consumer<Integer> onExitCode,
|
||||
final Runnable onWatchFailure) {
|
||||
this.podName = podName;
|
||||
this.podNamespace = podNamespace;
|
||||
this.onExitCode = onExitCode;
|
||||
this.onWatchFailure = onWatchFailure;
|
||||
}
|
||||
|
||||
@Override
|
||||
public void eventReceived(Action action, Pod resource) {
|
||||
try {
|
||||
if (!exitCodeRetrieved && KubePodResourceHelper.isTerminal(resource)) {
|
||||
final ContainerStatus mainContainerStatus = resource.getStatus().getContainerStatuses()
|
||||
.stream()
|
||||
.filter(containerStatus -> containerStatus.getName().equals(KubePodProcess.MAIN_CONTAINER_NAME))
|
||||
.collect(MoreCollectors.onlyElement());
|
||||
|
||||
if (mainContainerStatus.getState() != null && mainContainerStatus.getState().getTerminated() != null) {
|
||||
final int exitCode = mainContainerStatus.getState().getTerminated().getExitCode();
|
||||
log.info("Processing event with exit code " + exitCode + " for pod: " + resource.getMetadata().getName());
|
||||
onExitCode.accept(exitCode);
|
||||
exitCodeRetrieved = true;
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
String podName = "<unknown_name>";
|
||||
if (resource.getMetadata() != null) {
|
||||
podName = resource.getMetadata().getName();
|
||||
}
|
||||
|
||||
log.error("ExitCodeWatcher event handling failed for pod: " + podName, e);
|
||||
public void onAdd(final Pod pod) {
|
||||
if (shouldCheckPod(pod)) {
|
||||
final Optional<Integer> exitCode = getExitCode(pod);
|
||||
exitCode.ifPresent(this::persistExitCode);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onClose(WatcherException cause) {
|
||||
onWatchFailure.accept(cause);
|
||||
public void onUpdate(final Pod oldPod, final Pod newPod) {
|
||||
if (shouldCheckPod(newPod)) {
|
||||
final Optional<Integer> exitCode = getExitCode(newPod);
|
||||
exitCode.ifPresent(this::persistExitCode);
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void onDelete(final Pod pod, final boolean deletedFinalStateUnknown) {
|
||||
if (shouldCheckPod(pod)) {
|
||||
if (!deletedFinalStateUnknown) {
|
||||
final Optional<Integer> exitCode = getExitCode(pod);
|
||||
exitCode.ifPresentOrElse(
|
||||
this::persistExitCode,
|
||||
this::persistFailure);
|
||||
} else {
|
||||
persistFailure();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Informers without an OperationContext will monitor ALL pods in ALL namespaces; filter down to the
|
||||
* one pod that we care about. If it's still running, then we obviously can't fetch its exit code.
|
||||
* <p>
|
||||
* Also, if we've already found the exit code, or the pod has been deleted, then stop doing anything
|
||||
* at all.
|
||||
*/
|
||||
private boolean shouldCheckPod(final Pod pod) {
|
||||
final boolean correctName = podName.equals(pod.getMetadata().getName());
|
||||
final boolean correctNamespace = podNamespace.equals(pod.getMetadata().getNamespace());
|
||||
return active.get() && correctName && correctNamespace && KubePodResourceHelper.isTerminal(pod);
|
||||
}
|
||||
|
||||
private Optional<Integer> getExitCode(final Pod pod) {
|
||||
final ContainerStatus mainContainerStatus = pod.getStatus().getContainerStatuses()
|
||||
.stream()
|
||||
.filter(containerStatus -> containerStatus.getName().equals(KubePodProcess.MAIN_CONTAINER_NAME))
|
||||
.collect(MoreCollectors.onlyElement());
|
||||
|
||||
if (mainContainerStatus.getState() != null && mainContainerStatus.getState().getTerminated() != null) {
|
||||
return Optional.of(mainContainerStatus.getState().getTerminated().getExitCode());
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
|
||||
private void persistExitCode(final int exitCode) {
|
||||
if (active.compareAndSet(true, false)) {
|
||||
log.info("Received exit code {} for pod {}", exitCode, podName);
|
||||
onExitCode.accept(exitCode);
|
||||
}
|
||||
}
|
||||
|
||||
private void persistFailure() {
|
||||
if (active.compareAndSet(true, false)) {
|
||||
// Log an error. The pod is completely gone, and we have no way to retrieve its exit code.
|
||||
// In theory, this shouldn't really happen. From
|
||||
// https://pkg.go.dev/k8s.io/client-go/tools/cache#DeletedFinalStateUnknown:
|
||||
// "in the case where an object was deleted but the watch deletion event was missed while
|
||||
// disconnected from apiserver"
|
||||
// But we have this handler just in case.
|
||||
log.error("Pod {} was deleted before we could retrieve its exit code", podName);
|
||||
onWatchFailure.run();
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -30,8 +30,8 @@ import io.fabric8.kubernetes.api.model.VolumeBuilder;
|
||||
import io.fabric8.kubernetes.api.model.VolumeMount;
|
||||
import io.fabric8.kubernetes.api.model.VolumeMountBuilder;
|
||||
import io.fabric8.kubernetes.client.KubernetesClient;
|
||||
import io.fabric8.kubernetes.client.Watch;
|
||||
import io.fabric8.kubernetes.client.dsl.PodResource;
|
||||
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
|
||||
import io.fabric8.kubernetes.client.internal.readiness.Readiness;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
@@ -147,7 +147,7 @@ public class KubePodProcess extends Process implements KubePod {
|
||||
private final int stderrLocalPort;
|
||||
private final ExecutorService executorService;
|
||||
private final CompletableFuture<Integer> exitCodeFuture;
|
||||
private final Watch podWatch;
|
||||
private final SharedIndexInformer<Pod> podInformer;
|
||||
|
||||
public static String getPodIP(final KubernetesClient client, final String podName, final String podNamespace) {
|
||||
final var pod = client.pods().inNamespace(podNamespace).withName(podName).get();
|
||||
@@ -305,16 +305,7 @@ public class KubePodProcess extends Process implements KubePod {
|
||||
LOGGER.info("Waiting for init container to be ready before copying files...");
|
||||
final PodResource<Pod> pod =
|
||||
client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName());
|
||||
try {
|
||||
pod.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES);
|
||||
} catch (final InterruptedException e) {
|
||||
LOGGER.error("Init pod not found after 5 minutes");
|
||||
LOGGER.error("Pod search executed in namespace {} for pod name {} resulted in: {}",
|
||||
podDefinition.getMetadata().getNamespace(),
|
||||
podDefinition.getMetadata().getName(),
|
||||
pod.get().toString());
|
||||
throw e;
|
||||
}
|
||||
pod.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().size() != 0, 5, TimeUnit.MINUTES);
|
||||
LOGGER.info("Init container present..");
|
||||
client.pods().inNamespace(podDefinition.getMetadata().getNamespace()).withName(podDefinition.getMetadata().getName())
|
||||
.waitUntilCondition(p -> p.getStatus().getInitContainerStatuses().get(0).getState().getRunning() != null, 5, TimeUnit.MINUTES);
|
||||
@@ -528,15 +519,21 @@ public class KubePodProcess extends Process implements KubePod {
|
||||
// This is safe only because we are blocking the init pod until we copy files onto it.
|
||||
// See the ExitCodeWatcher comments for more info.
|
||||
exitCodeFuture = new CompletableFuture<>();
|
||||
podWatch = fabricClient.resource(podDefinition).watch(new ExitCodeWatcher(
|
||||
podInformer = fabricClient.pods()
|
||||
.inNamespace(namespace)
|
||||
.withName(pod.getMetadata().getName())
|
||||
.inform();
|
||||
podInformer.addEventHandler(new ExitCodeWatcher(
|
||||
pod.getMetadata().getName(),
|
||||
namespace,
|
||||
exitCodeFuture::complete,
|
||||
exception -> {
|
||||
() -> {
|
||||
LOGGER.info(prependPodInfo(
|
||||
String.format(
|
||||
"Exit code watcher failed to retrieve the exit code. Defaulting to %s. This is expected if the job was cancelled. Error: %s",
|
||||
KILLED_EXIT_CODE,
|
||||
exception.getMessage()),
|
||||
namespace, podName));
|
||||
"Exit code watcher failed to retrieve the exit code. Defaulting to %s. This is expected if the job was cancelled.",
|
||||
KILLED_EXIT_CODE),
|
||||
namespace,
|
||||
podName));
|
||||
|
||||
exitCodeFuture.complete(KILLED_EXIT_CODE);
|
||||
}));
|
||||
@@ -631,7 +628,7 @@ public class KubePodProcess extends Process implements KubePod {
|
||||
public int waitFor() throws InterruptedException {
|
||||
try {
|
||||
exitCodeFuture.get();
|
||||
} catch (ExecutionException e) {
|
||||
} catch (final ExecutionException e) {
|
||||
throw new RuntimeException(e);
|
||||
}
|
||||
|
||||
@@ -704,7 +701,7 @@ public class KubePodProcess extends Process implements KubePod {
|
||||
|
||||
Exceptions.swallow(this.stdoutServerSocket::close);
|
||||
Exceptions.swallow(this.stderrServerSocket::close);
|
||||
Exceptions.swallow(this.podWatch::close);
|
||||
Exceptions.swallow(this.podInformer::close);
|
||||
Exceptions.swallow(this.executorService::shutdownNow);
|
||||
|
||||
KubePortManagerSingleton.getInstance().offer(stdoutLocalPort);
|
||||
@@ -717,7 +714,7 @@ public class KubePodProcess extends Process implements KubePod {
|
||||
if (exitCodeFuture.isDone()) {
|
||||
try {
|
||||
return exitCodeFuture.get();
|
||||
} catch (InterruptedException | ExecutionException e) {
|
||||
} catch (final InterruptedException | ExecutionException e) {
|
||||
throw new RuntimeException(
|
||||
prependPodInfo("Cannot find pod %s : %s while trying to retrieve exit code. This probably means the pod was not correctly created.",
|
||||
podDefinition.getMetadata().getNamespace(),
|
||||
|
||||
@@ -39,7 +39,7 @@ public class KubeProcessFactory implements ProcessFactory {
|
||||
public static final String NORMALISE_STEP = "normalise";
|
||||
public static final String CUSTOM_STEP = "custom";
|
||||
|
||||
private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+");;
|
||||
private static final Pattern ALPHABETIC = Pattern.compile("[a-zA-Z]+");
|
||||
private static final String JOB_LABEL_KEY = "job_id";
|
||||
private static final String ATTEMPT_LABEL_KEY = "attempt_id";
|
||||
private static final String WORKER_POD_LABEL_KEY = "airbyte";
|
||||
|
||||
@@ -18,8 +18,10 @@ import io.airbyte.config.EnvConfigs;
|
||||
import io.airbyte.config.ResourceRequirements;
|
||||
import io.airbyte.workers.WorkerConfigs;
|
||||
import io.airbyte.workers.WorkerException;
|
||||
import io.fabric8.kubernetes.api.model.Pod;
|
||||
import io.fabric8.kubernetes.client.DefaultKubernetesClient;
|
||||
import io.fabric8.kubernetes.client.KubernetesClient;
|
||||
import io.fabric8.kubernetes.client.informers.SharedIndexInformer;
|
||||
import java.io.IOException;
|
||||
import java.net.Inet4Address;
|
||||
import java.net.ServerSocket;
|
||||
@@ -297,11 +299,17 @@ public class KubePodProcessIntegrationTest {
|
||||
.filter(p -> p.getMetadata() != null && p.getMetadata().getLabels() != null)
|
||||
.filter(p -> p.getMetadata().getLabels().containsKey("uuid") && p.getMetadata().getLabels().get("uuid").equals(uuid.toString()))
|
||||
.collect(Collectors.toList()).get(0);
|
||||
fabricClient.resource(pod).watch(new ExitCodeWatcher(
|
||||
final SharedIndexInformer<Pod> podInformer = fabricClient.pods()
|
||||
.inNamespace(pod.getMetadata().getNamespace())
|
||||
.withName(pod.getMetadata().getName())
|
||||
.inform();
|
||||
podInformer.addEventHandler(new ExitCodeWatcher(
|
||||
pod.getMetadata().getName(),
|
||||
pod.getMetadata().getNamespace(),
|
||||
exitCode -> {
|
||||
fabricClient.pods().delete(pod);
|
||||
},
|
||||
exception -> {}));
|
||||
() -> {}));
|
||||
|
||||
process.waitFor();
|
||||
|
||||
|
||||
Reference in New Issue
Block a user