1
0
mirror of synced 2026-01-06 06:04:16 -05:00

Move Temporal Client to .env (#2675)

This commit is contained in:
Charles
2021-04-08 14:37:06 -07:00
committed by GitHub
parent 6ba62ca0ea
commit b5bf9df2eb
13 changed files with 121 additions and 71 deletions

1
.env
View File

@@ -17,3 +17,4 @@ TRACKING_STRATEGY=segment
# already exist on the host filesystem and MUST be parents of *_ROOT.
# Issue: https://github.com/airbytehq/airbyte/issues/577
HACK_LOCAL_ROOT_PARENT=/tmp
TEMPORAL_HOST=airbyte-temporal:7233

View File

@@ -58,6 +58,8 @@ public interface Configs {
WorkspaceRetentionConfig getWorkspaceRetentionConfig();
String getTemporalHost();
enum TrackingStrategy {
SEGMENT,
LOGGING

View File

@@ -51,6 +51,7 @@ public class EnvConfigs implements Configs {
private static final String MINIMUM_WORKSPACE_RETENTION_DAYS = "MINIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_RETENTION_DAYS = "MAXIMUM_WORKSPACE_RETENTION_DAYS";
private static final String MAXIMUM_WORKSPACE_SIZE_MB = "MAXIMUM_WORKSPACE_SIZE_MB";
private static final String TEMPORAL_HOST = "TEMPORAL_HOST";
private static final long DEFAULT_MINIMUM_WORKSPACE_RETENTION_DAYS = 1;
private static final long DEFAULT_MAXIMUM_WORKSPACE_RETENTION_DAYS = 60;
@@ -115,62 +116,34 @@ public class EnvConfigs implements Configs {
@Override
public String getWorkspaceDockerMount() {
final String mount = getEnv.apply(WORKSPACE_DOCKER_MOUNT);
if (mount != null) {
return mount;
}
LOGGER.info(WORKSPACE_DOCKER_MOUNT + " not found, defaulting to " + WORKSPACE_ROOT);
return getWorkspaceRoot().toString();
return getEnvOrDefault(WORKSPACE_DOCKER_MOUNT, WORKSPACE_ROOT);
}
@Override
public String getLocalDockerMount() {
final String mount = getEnv.apply(LOCAL_DOCKER_MOUNT);
if (mount != null) {
return mount;
}
LOGGER.info(LOCAL_DOCKER_MOUNT + " not found, defaulting to " + LOCAL_ROOT);
return getLocalRoot().toString();
return getEnvOrDefault(LOCAL_DOCKER_MOUNT, LOCAL_ROOT);
}
@Override
public String getDockerNetwork() {
final String network = getEnv.apply(DOCKER_NETWORK);
if (network != null) {
return network;
}
LOGGER.info(DOCKER_NETWORK + " not found, defaulting to " + DEFAULT_NETWORK);
return DEFAULT_NETWORK;
return getEnvOrDefault(DOCKER_NETWORK, DEFAULT_NETWORK);
}
@Override
public TrackingStrategy getTrackingStrategy() {
final String trackingStrategy = getEnv.apply(TRACKING_STRATEGY);
if (trackingStrategy == null) {
LOGGER.info("TRACKING_STRATEGY not set, defaulting to " + TrackingStrategy.LOGGING);
return TrackingStrategy.LOGGING;
}
try {
return TrackingStrategy.valueOf(trackingStrategy.toUpperCase());
} catch (IllegalArgumentException e) {
LOGGER.info(trackingStrategy + " not recognized, defaulting to " + TrackingStrategy.LOGGING);
return TrackingStrategy.LOGGING;
}
return getEnvOrDefault(TRACKING_STRATEGY, TrackingStrategy.LOGGING, s -> {
try {
return TrackingStrategy.valueOf(s.toUpperCase());
} catch (IllegalArgumentException e) {
LOGGER.info(s + " not recognized, defaulting to " + TrackingStrategy.LOGGING);
return TrackingStrategy.LOGGING;
}
});
}
@Override
public WorkerEnvironment getWorkerEnvironment() {
final String workerEnvironment = getEnv.apply(WORKER_ENVIRONMENT);
if (workerEnvironment != null) {
return WorkerEnvironment.valueOf(workerEnvironment.toUpperCase());
}
LOGGER.info(WORKER_ENVIRONMENT + " not found, defaulting to " + WorkerEnvironment.DOCKER);
return WorkerEnvironment.DOCKER;
return getEnvOrDefault(WORKER_ENVIRONMENT, WorkerEnvironment.DOCKER, s -> WorkerEnvironment.valueOf(s.toUpperCase()));
}
@Override
@@ -182,10 +155,23 @@ public class EnvConfigs implements Configs {
return new WorkspaceRetentionConfig(minDays, maxDays, maxSizeMb);
}
public long getEnvOrDefault(String key, long defaultValue) {
@Override
public String getTemporalHost() {
return getEnvOrDefault(TEMPORAL_HOST, "airbyte-temporal:7233");
}
private String getEnvOrDefault(String key, String defaultValue) {
return getEnvOrDefault(key, defaultValue, Function.identity());
}
private long getEnvOrDefault(String key, long defaultValue) {
return getEnvOrDefault(key, defaultValue, Long::parseLong);
}
private <T> T getEnvOrDefault(String key, T defaultValue, Function<String, T> parser) {
final String value = getEnv.apply(key);
if (value != null) {
return Long.parseLong(value);
return parser.apply(value);
} else {
LOGGER.info(key + " not found, defaulting to " + defaultValue);
return defaultValue;

View File

@@ -47,6 +47,8 @@ import io.airbyte.workers.process.KubeProcessBuilderFactory;
import io.airbyte.workers.process.ProcessBuilderFactory;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalPool;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.time.Duration;
@@ -83,26 +85,33 @@ public class SchedulerApp {
private final JobPersistence jobPersistence;
private final ConfigRepository configRepository;
private final JobCleaner jobCleaner;
private final TemporalClient temporalClient;
private final WorkflowServiceStubs temporalService;
public SchedulerApp(Path workspaceRoot,
ProcessBuilderFactory pbf,
JobPersistence jobPersistence,
ConfigRepository configRepository,
JobCleaner jobCleaner) {
JobCleaner jobCleaner,
TemporalClient temporalClient,
WorkflowServiceStubs temporalService) {
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
this.jobPersistence = jobPersistence;
this.configRepository = configRepository;
this.jobCleaner = jobCleaner;
this.temporalClient = temporalClient;
this.temporalService = temporalService;
}
public void start() throws IOException {
final TemporalPool temporalPool = new TemporalPool(workspaceRoot, pbf);
final TemporalPool temporalPool = new TemporalPool(temporalService, workspaceRoot, pbf);
temporalPool.run();
final ExecutorService workerThreadPool = Executors.newFixedThreadPool(MAX_WORKERS, THREAD_FACTORY);
final ScheduledExecutorService scheduledPool = Executors.newSingleThreadScheduledExecutor();
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(TemporalClient.production(workspaceRoot), workspaceRoot);
final TemporalWorkerRunFactory temporalWorkerRunFactory = new TemporalWorkerRunFactory(temporalClient, workspaceRoot);
final JobRetrier jobRetrier = new JobRetrier(jobPersistence, Instant::now);
final JobScheduler jobScheduler = new JobScheduler(jobPersistence, configRepository);
final JobSubmitter jobSubmitter = new JobSubmitter(
@@ -170,6 +179,9 @@ public class SchedulerApp {
final Path workspaceRoot = configs.getWorkspaceRoot();
LOGGER.info("workspaceRoot = " + workspaceRoot);
final String temporalHost = configs.getTemporalHost();
LOGGER.info("temporalHost = " + temporalHost);
LOGGER.info("Creating DB connection pool...");
final Database database = Databases.createPostgresDatabase(
configs.getDatabaseUser(),
@@ -206,8 +218,11 @@ public class SchedulerApp {
throw new IllegalStateException("Unable to retrieve Airbyte Version, aborting...");
}
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(temporalHost);
final TemporalClient temporalClient = TemporalClient.production(temporalHost, workspaceRoot);
LOGGER.info("Launching scheduler...");
new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, jobCleaner).start();
new SchedulerApp(workspaceRoot, pbf, jobPersistence, configRepository, jobCleaner, temporalClient, temporalService).start();
}
}

View File

@@ -31,12 +31,14 @@ import io.airbyte.scheduler.client.CachingSynchronousSchedulerClient;
import io.airbyte.scheduler.client.SchedulerJobClient;
import io.airbyte.scheduler.persistence.JobPersistence;
import io.airbyte.server.apis.ConfigurationApi;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.util.Map;
import org.glassfish.hk2.api.Factory;
import org.slf4j.MDC;
public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
private static WorkflowServiceStubs temporalService;
private static ConfigRepository configRepository;
private static JobPersistence jobPersistence;
private static SchedulerJobClient schedulerJobClient;
@@ -73,6 +75,10 @@ public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
ConfigurationApiFactory.mdc = mdc;
}
public static void setTemporalService(final WorkflowServiceStubs temporalService) {
ConfigurationApiFactory.temporalService = temporalService;
}
@Override
public ConfigurationApi provide() {
MDC.setContextMap(mdc);
@@ -83,7 +89,8 @@ public class ConfigurationApiFactory implements Factory<ConfigurationApi> {
ConfigurationApiFactory.schedulerJobClient,
ConfigurationApiFactory.synchronousSchedulerClient,
ConfigurationApiFactory.configs,
ConfigurationApiFactory.archiveTtlManager);
ConfigurationApiFactory.archiveTtlManager,
ConfigurationApiFactory.temporalService);
}
@Override

View File

@@ -54,6 +54,8 @@ import io.airbyte.server.errors.NotFoundExceptionMapper;
import io.airbyte.server.errors.UncaughtExceptionMapper;
import io.airbyte.validation.json.JsonValidationException;
import io.airbyte.workers.temporal.TemporalClient;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.util.Map;
@@ -99,10 +101,12 @@ public class ServerApp {
ConfigurationApiFactory.setSchedulerJobClient(new DefaultSchedulerJobClient(jobPersistence, new DefaultJobCreator(jobPersistence)));
final JobTracker jobTracker = new JobTracker(configRepository, jobPersistence);
final TemporalClient temporalClient = TemporalClient.production(configs.getWorkspaceRoot());
final WorkflowServiceStubs temporalService = TemporalUtils.createTemporalService(configs.getTemporalHost());
final TemporalClient temporalClient = TemporalClient.production(configs.getTemporalHost(), configs.getWorkspaceRoot());
ConfigurationApiFactory
.setSynchronousSchedulerClient(new SpecCachingSynchronousSchedulerClient(new DefaultSynchronousSchedulerClient(temporalClient, jobTracker)));
ConfigurationApiFactory.setTemporalService(temporalService);
ConfigurationApiFactory.setConfigRepository(configRepository);
ConfigurationApiFactory.setJobPersistence(jobPersistence);
ConfigurationApiFactory.setConfigs(configs);

View File

@@ -100,6 +100,7 @@ import io.airbyte.server.handlers.WorkspacesHandler;
import io.airbyte.server.validators.DockerImageValidator;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.File;
import java.io.IOException;
import javax.validation.Valid;
@@ -124,17 +125,25 @@ public class ConfigurationApi implements io.airbyte.api.V1Api {
private final LogsHandler logsHandler;
private final OpenApiConfigHandler openApiConfigHandler;
private final Configs configs;
private final WorkflowServiceStubs temporalService;
public ConfigurationApi(final ConfigRepository configRepository,
final JobPersistence jobPersistence,
final SchedulerJobClient schedulerJobClient,
final CachingSynchronousSchedulerClient synchronousSchedulerClient,
final Configs configs,
final FileTtlManager archiveTtlManager) {
final FileTtlManager archiveTtlManager,
final WorkflowServiceStubs temporalService) {
this.temporalService = temporalService;
final SpecFetcher specFetcher = new SpecFetcher(synchronousSchedulerClient);
final JsonSchemaValidator schemaValidator = new JsonSchemaValidator();
schedulerHandler =
new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, jobPersistence, configs.getWorkspaceRoot());
schedulerHandler = new SchedulerHandler(
configRepository,
schedulerJobClient,
synchronousSchedulerClient,
jobPersistence,
configs.getWorkspaceRoot(),
temporalService);
final DockerImageValidator dockerImageValidator = new DockerImageValidator(synchronousSchedulerClient);
sourceDefinitionsHandler = new SourceDefinitionsHandler(configRepository, dockerImageValidator, synchronousSchedulerClient);
connectionsHandler = new ConnectionsHandler(configRepository);

View File

@@ -71,6 +71,7 @@ import io.airbyte.workers.temporal.TemporalAttemptExecution;
import io.airbyte.workers.temporal.TemporalUtils;
import io.temporal.api.common.v1.WorkflowExecution;
import io.temporal.api.workflowservice.v1.RequestCancelWorkflowExecutionRequest;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.nio.file.Path;
import java.util.UUID;
@@ -85,12 +86,14 @@ public class SchedulerHandler {
private final JsonSchemaValidator jsonSchemaValidator;
private final JobPersistence jobPersistence;
private final Path workspaceRoot;
private final WorkflowServiceStubs temporalService;
public SchedulerHandler(ConfigRepository configRepository,
SchedulerJobClient schedulerJobClient,
SynchronousSchedulerClient synchronousSchedulerClient,
JobPersistence jobPersistence,
Path workspaceRoot) {
Path workspaceRoot,
WorkflowServiceStubs temporalService) {
this(
configRepository,
schedulerJobClient,
@@ -99,7 +102,8 @@ public class SchedulerHandler {
new JsonSchemaValidator(),
new SpecFetcher(synchronousSchedulerClient),
jobPersistence,
workspaceRoot);
workspaceRoot,
temporalService);
}
@VisibleForTesting
@@ -110,7 +114,8 @@ public class SchedulerHandler {
JsonSchemaValidator jsonSchemaValidator,
SpecFetcher specFetcher,
JobPersistence jobPersistence,
Path workspaceRoot) {
Path workspaceRoot,
WorkflowServiceStubs temporalService) {
this.configRepository = configRepository;
this.schedulerJobClient = schedulerJobClient;
this.synchronousSchedulerClient = synchronousSchedulerClient;
@@ -119,6 +124,7 @@ public class SchedulerHandler {
this.specFetcher = specFetcher;
this.jobPersistence = jobPersistence;
this.workspaceRoot = workspaceRoot;
this.temporalService = temporalService;
}
public CheckConnectionRead checkSourceConnectionFromSourceId(SourceIdRequestBody sourceIdRequestBody)
@@ -316,7 +322,7 @@ public class SchedulerHandler {
.setNamespace(TemporalUtils.DEFAULT_NAMESPACE)
.build();
TemporalUtils.TEMPORAL_SERVICE.blockingStub().requestCancelWorkflowExecution(cancelRequest);
temporalService.blockingStub().requestCancelWorkflowExecution(cancelRequest);
return JobConverter.getJobInfoRead(jobPersistence.getJob(jobId));
}

View File

@@ -80,6 +80,7 @@ import io.airbyte.server.helpers.DestinationHelpers;
import io.airbyte.server.helpers.SourceHelpers;
import io.airbyte.validation.json.JsonSchemaValidator;
import io.airbyte.validation.json.JsonValidationException;
import io.temporal.serviceclient.WorkflowServiceStubs;
import java.io.IOException;
import java.net.URI;
import java.nio.file.Path;
@@ -148,8 +149,16 @@ class SchedulerHandlerTest {
configRepository = mock(ConfigRepository.class);
jobPersistence = mock(JobPersistence.class);
schedulerHandler = new SchedulerHandler(configRepository, schedulerJobClient, synchronousSchedulerClient, configurationUpdate,
jsonSchemaValidator, specFetcher, jobPersistence, mock(Path.class));
schedulerHandler = new SchedulerHandler(
configRepository,
schedulerJobClient,
synchronousSchedulerClient,
configurationUpdate,
jsonSchemaValidator,
specFetcher,
jobPersistence,
mock(Path.class),
mock(WorkflowServiceStubs.class));
}
@Test

View File

@@ -49,8 +49,8 @@ public class TemporalClient {
private final Path workspaceRoot;
private final WorkflowClient client;
public static TemporalClient production(Path workspaceRoot) {
return new TemporalClient(TemporalUtils.TEMPORAL_CLIENT, workspaceRoot);
public static TemporalClient production(String temporalHost, Path workspaceRoot) {
return new TemporalClient(TemporalUtils.createTemporalClient(temporalHost), workspaceRoot);
}
// todo (cgardens) - there are two sources of truth on workspace root. we need to get this down to

View File

@@ -30,6 +30,8 @@ import io.airbyte.workers.process.ProcessBuilderFactory;
import io.temporal.api.namespace.v1.NamespaceInfo;
import io.temporal.api.workflowservice.v1.DescribeNamespaceResponse;
import io.temporal.api.workflowservice.v1.ListNamespacesRequest;
import io.temporal.client.WorkflowClient;
import io.temporal.serviceclient.WorkflowServiceStubs;
import io.temporal.worker.Worker;
import io.temporal.worker.WorkerFactory;
import java.nio.file.Path;
@@ -41,10 +43,12 @@ public class TemporalPool implements Runnable {
private static final Logger LOGGER = LoggerFactory.getLogger(TemporalPool.class);
private final WorkflowServiceStubs temporalService;
private final Path workspaceRoot;
private final ProcessBuilderFactory pbf;
public TemporalPool(Path workspaceRoot, ProcessBuilderFactory pbf) {
public TemporalPool(WorkflowServiceStubs temporalService, Path workspaceRoot, ProcessBuilderFactory pbf) {
this.temporalService = temporalService;
this.workspaceRoot = workspaceRoot;
this.pbf = pbf;
}
@@ -53,7 +57,7 @@ public class TemporalPool implements Runnable {
public void run() {
waitForTemporalServerAndLog();
final WorkerFactory factory = WorkerFactory.newInstance(TemporalUtils.TEMPORAL_CLIENT);
final WorkerFactory factory = WorkerFactory.newInstance(WorkflowClient.newInstance(temporalService));
final Worker specWorker = factory.newWorker(TemporalJobType.GET_SPEC.name());
specWorker.registerWorkflowImplementationTypes(SpecWorkflow.WorkflowImpl.class);
@@ -74,10 +78,10 @@ public class TemporalPool implements Runnable {
factory.start();
}
private static void waitForTemporalServerAndLog() {
private void waitForTemporalServerAndLog() {
LOGGER.info("Waiting for temporal server...");
while (!getNamespaces().contains("default")) {
while (!getNamespaces(temporalService).contains("default")) {
LOGGER.warn("Waiting for default namespace to be initialized in temporal...");
wait(2);
}
@@ -96,8 +100,8 @@ public class TemporalPool implements Runnable {
}
}
private static Set<String> getNamespaces() {
return TemporalUtils.TEMPORAL_SERVICE.blockingStub()
private static Set<String> getNamespaces(WorkflowServiceStubs temporalService) {
return temporalService.blockingStub()
.listNamespaces(ListNamespacesRequest.newBuilder().build())
.getNamespacesList()
.stream()

View File

@@ -35,14 +35,19 @@ import java.util.UUID;
public class TemporalUtils {
private static final WorkflowServiceStubsOptions TEMPORAL_OPTIONS = WorkflowServiceStubsOptions.newBuilder()
// todo move to env.
.setTarget("airbyte-temporal:7233")
.build();
public static WorkflowServiceStubs createTemporalService(String temporalHost) {
final WorkflowServiceStubsOptions options = WorkflowServiceStubsOptions.newBuilder()
// todo move to env.
.setTarget(temporalHost)
.build();
public static final WorkflowServiceStubs TEMPORAL_SERVICE = WorkflowServiceStubs.newInstance(TEMPORAL_OPTIONS);
return WorkflowServiceStubs.newInstance(options);
}
public static final WorkflowClient TEMPORAL_CLIENT = WorkflowClient.newInstance(TEMPORAL_SERVICE);
public static WorkflowClient createTemporalClient(String temporalHost) {
final WorkflowServiceStubs temporalService = createTemporalService(temporalHost);
return WorkflowClient.newInstance(temporalService);
}
public static final RetryOptions NO_RETRY = RetryOptions.newBuilder().setMaximumAttempts(1).build();

View File

@@ -45,6 +45,7 @@ services:
- TRACKING_STRATEGY=${TRACKING_STRATEGY}
- AIRBYTE_VERSION=${VERSION}
- AIRBYTE_ROLE=${AIRBYTE_ROLE:-}
- TEMPORAL_HOST=${TEMPORAL_HOST}
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- workspace:${WORKSPACE_ROOT}
@@ -65,6 +66,7 @@ services:
- AIRBYTE_VERSION=${VERSION}
- AIRBYTE_ROLE=${AIRBYTE_ROLE:-}
- WORKSPACE_ROOT=${WORKSPACE_ROOT}
- TEMPORAL_HOST=${TEMPORAL_HOST}
ports:
- 8001:8001
volumes: