fix(runner-kafka): fix detection of evicted worker

The previous implementation are not working due to partition. The worker instance are read for each partition and so don't detect evicted worker for other partition.
Now we use a globalStateStore to do it.
This commit is contained in:
tchiotludo
2020-11-20 18:44:27 +01:00
parent a34afcb414
commit 87d81f8b8b
6 changed files with 163 additions and 73 deletions

View File

@@ -113,6 +113,11 @@ kestra:
properties:
cleanup.policy: "compact"
executorworkerinstance:
name: "kestra_executor_workerinstance"
properties:
cleanup.policy: "compact"
workertaskrunning:
cls: org.kestra.core.runners.WorkerTaskRunning
name: "kestra_workertaskrunning"

View File

@@ -14,10 +14,8 @@ import org.apache.kafka.streams.Topology;
import org.apache.kafka.streams.kstream.*;
import org.apache.kafka.streams.state.KeyValueStore;
import org.apache.kafka.streams.state.QueryableStoreTypes;
import org.apache.kafka.streams.state.ReadOnlyKeyValueStore;
import org.apache.kafka.streams.state.Stores;
import org.kestra.core.metrics.MetricRegistry;
import org.kestra.core.models.conditions.Condition;
import org.kestra.core.models.executions.Execution;
import org.kestra.core.models.executions.ExecutionKilled;
import org.kestra.core.models.executions.LogEntry;
@@ -50,8 +48,10 @@ public class KafkaExecutor extends AbstractExecutor {
private static final String WORKERTASK_DEDUPLICATION_STATE_STORE_NAME = "workertask_deduplication";
private static final String TRIGGER_DEDUPLICATION_STATE_STORE_NAME = "trigger_deduplication";
private static final String NEXTS_DEDUPLICATION_STATE_STORE_NAME = "next_deduplication";
private static final String WORKERINSTANCE_STATE_STORE_NAME = "worker_instance";
public static final String WORKER_RUNNING_STATE_STORE_NAME = "worker_running";
public static final String WORKERINSTANCE_STATE_STORE_NAME = "worker_instance";
private static final String TOPIC_EXECUTOR = "executor";
private static final String TOPIC_EXECUTOR_WORKERINSTANCE = "executorworkerinstance";
ApplicationContext applicationContext;
KafkaStreamService kafkaStreamService;
@@ -106,12 +106,17 @@ public class KafkaExecutor extends AbstractExecutor {
Serdes.String()
));
// worker instance
builder.addStateStore(Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(WORKERINSTANCE_STATE_STORE_NAME),
Serdes.String(),
JsonSerde.of(WorkerInstance.class)
));
// worker instance global state store
builder.addGlobalStore(
Stores.keyValueStoreBuilder(
Stores.persistentKeyValueStore(WORKERINSTANCE_STATE_STORE_NAME),
Serdes.String(),
JsonSerde.of(WorkerInstance.class)
),
kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE),
Consumed.with(Serdes.String(), JsonSerde.of(WorkerInstance.class)),
() -> new GlobalStateProcessor<>(WORKERINSTANCE_STATE_STORE_NAME)
);
// declare ktable & kstream
KStream<String, WorkerTaskResult> workerTaskResultKStream = this.workerTaskResultKStream(builder);
@@ -153,7 +158,7 @@ public class KafkaExecutor extends AbstractExecutor {
// handle worker
this.purgeWorkerRunning(workerTaskResultKStream);
this.detectNewWorker(workerInstanceKStream, workerTaskRunningKTable);
this.detectNewWorker(workerInstanceKStream);
// build
Topology topology = builder.build();
@@ -231,7 +236,7 @@ public class KafkaExecutor extends AbstractExecutor {
.globalTable(
kafkaAdminService.getTopicName(WorkerTaskRunning.class),
Consumed.with(Serdes.String(), JsonSerde.of(WorkerTaskRunning.class)),
Materialized.<String, WorkerTaskRunning, KeyValueStore<Bytes, byte[]>>as("worker_running")
Materialized.<String, WorkerTaskRunning, KeyValueStore<Bytes, byte[]>>as(WORKER_RUNNING_STATE_STORE_NAME)
.withKeySerde(Serdes.String())
.withValueSerde(JsonSerde.of(WorkerTaskRunning.class))
);
@@ -643,17 +648,20 @@ public class KafkaExecutor extends AbstractExecutor {
);
}
private void detectNewWorker(
KStream<String, WorkerInstance> workerInstanceKStream,
@SuppressWarnings("unused") GlobalKTable<String, WorkerTaskRunning> workerTaskRunningKTable // used as a state store on WorkerInstanceTransformer
) {
/**
* GlobalKTable<String, WorkerTaskRunning> is used as a state store on WorkerInstanceTransformer
*/
private void detectNewWorker(KStream<String, WorkerInstance> workerInstanceKStream) {
workerInstanceKStream
.to(
kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE),
Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class))
);
KStream<String, WorkerInstanceTransformer.Result> stream = workerInstanceKStream
.transformValues(
() -> new WorkerInstanceTransformer(
WORKERINSTANCE_STATE_STORE_NAME
),
Named.as("detectNewWorker-transformValues"),
WORKERINSTANCE_STATE_STORE_NAME
WorkerInstanceTransformer::new,
Named.as("detectNewWorker-transformValues")
)
.flatMapValues((readOnlyKey, value) -> value, Named.as("detectNewWorker-listToItem-flatMap"));
@@ -678,7 +686,7 @@ public class KafkaExecutor extends AbstractExecutor {
);
// we resend the WorkerInstance update
logIfEnabled(
KStream<String, WorkerInstance> updatedStream = logIfEnabled(
stream,
(key, value) -> log.debug(
"Instance updated: {}",
@@ -689,7 +697,17 @@ public class KafkaExecutor extends AbstractExecutor {
.map(
(key, value) -> value.getWorkerInstanceUpdated(),
Named.as("detectNewWorker-instanceUpdate-map")
)
);
// cleanup executor workerinstance state store
updatedStream
.filter((key, value) -> value != null, Named.as("detectNewWorker-null-filter"))
.to(
kafkaAdminService.getTopicName(TOPIC_EXECUTOR_WORKERINSTANCE),
Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class))
);
updatedStream
.to(
kafkaAdminService.getTopicName(WorkerInstance.class),
Produced.with(Serdes.String(), JsonSerde.of(WorkerInstance.class))
@@ -847,6 +865,7 @@ public class KafkaExecutor extends AbstractExecutor {
kafkaAdminService.createIfNotExist(Execution.class);
kafkaAdminService.createIfNotExist(Flow.class);
kafkaAdminService.createIfNotExist(TOPIC_EXECUTOR);
kafkaAdminService.createIfNotExist(TOPIC_EXECUTOR_WORKERINSTANCE);
kafkaAdminService.createIfNotExist(ExecutionKilled.class);
kafkaAdminService.createIfNotExist(WorkerTaskRunning.class);
kafkaAdminService.createIfNotExist(WorkerInstance.class);

View File

@@ -0,0 +1,36 @@
package org.kestra.runner.kafka.streams;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.processor.Processor;
import org.apache.kafka.streams.processor.ProcessorContext;
import org.apache.kafka.streams.state.KeyValueStore;
@Slf4j
public class GlobalStateProcessor <T> implements Processor<String, T> {
private final String storeName;
private KeyValueStore<String, T> store;
public GlobalStateProcessor(String storeName) {
this.storeName = storeName;
}
@SuppressWarnings("unchecked")
@Override
public void init(ProcessorContext context) {
this.store = (KeyValueStore<String, T>) context.getStateStore(this.storeName);
}
@Override
public void process(String key, T value) {
if (value == null) {
this.store.delete(key);
} else {
this.store.put(key, value);
}
}
@Override
public void close() {
}
}

View File

@@ -3,6 +3,7 @@ package org.kestra.runner.kafka.streams;
import com.google.common.collect.Streams;
import lombok.AllArgsConstructor;
import lombok.Getter;
import lombok.ToString;
import lombok.extern.slf4j.Slf4j;
import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.kstream.ValueTransformerWithKey;
@@ -14,6 +15,7 @@ import org.kestra.core.runners.WorkerInstance;
import org.kestra.core.runners.WorkerTask;
import org.kestra.core.runners.WorkerTaskRunning;
import org.kestra.core.services.WorkerInstanceService;
import org.kestra.runner.kafka.KafkaExecutor;
import java.util.Collections;
import java.util.List;
@@ -22,78 +24,100 @@ import java.util.stream.Collectors;
@Slf4j
@SuppressWarnings("UnstableApiUsage")
public class WorkerInstanceTransformer implements ValueTransformerWithKey<String, WorkerInstance, List<WorkerInstanceTransformer.Result>> {
private final String instanceStoreName;
private KeyValueStore<String, WorkerInstance> instanceStore;
private KeyValueStore<String, ValueAndTimestamp<WorkerTaskRunning>> runningStore;
public WorkerInstanceTransformer(String instanceStoreName) {
this.instanceStoreName = instanceStoreName;
public WorkerInstanceTransformer() {
}
@Override
@SuppressWarnings("unchecked")
public void init(final ProcessorContext context) {
this.instanceStore = (KeyValueStore<String, WorkerInstance>) context.getStateStore(this.instanceStoreName);
this.runningStore = (KeyValueStore<String, ValueAndTimestamp<WorkerTaskRunning>>) context.getStateStore("worker_running");
this.instanceStore = (KeyValueStore<String, WorkerInstance>) context.getStateStore(KafkaExecutor.WORKERINSTANCE_STATE_STORE_NAME);
this.runningStore = (KeyValueStore<String, ValueAndTimestamp<WorkerTaskRunning>>) context.getStateStore(KafkaExecutor.WORKER_RUNNING_STATE_STORE_NAME);
}
@Override
public List<Result> transform(final String key, final WorkerInstance value) {
if (value == null) {
this.instanceStore.delete(key);
log.debug("Incoming instance: {} {}", key, value);
if (value == null) {
return Collections.emptyList();
}
this.instanceStore.put(key, value);
List<WorkerInstance> allInstances;
try (KeyValueIterator<String, WorkerInstance> all = this.instanceStore.all()) {
List<WorkerInstance> updatedInstances = WorkerInstanceService.removeEvictedPartitions(
Streams.stream(all).map(r -> r.value),
value
);
return updatedInstances
.stream()
.map(updated -> {
String finalInstanceKey = updated.getWorkerUuid().toString();
if (updated.getPartitions().size() > 0) {
return new Result(
Collections.emptyList(),
KeyValue.pair(finalInstanceKey, updated)
);
} else {
// no more partitions for this WorkerInstance, this one doesn't exist any more.
// we delete this one and resend all the running tasks
log.warn("Detected evicted worker: {}", updated);
List<WorkerTask> workerTasks = this.listRunningForWorkerInstance(updated);
workerTasks.forEach(workerTask ->
log.info(
"[namespace: {}] [flow: {}] [execution: {}] [taskrun: {}] WorkerTask is being resend",
workerTask.getTaskRun().getNamespace(),
workerTask.getTaskRun().getFlowId(),
workerTask.getTaskRun().getId(),
workerTask.getTaskRun().getExecutionId()
)
);
return new Result(
workerTasks,
KeyValue.pair(finalInstanceKey, null)
);
}
})
allInstances = Streams.stream(all)
.map(r -> r.value)
.collect(Collectors.toList());
}
log.trace("All instance defined: {}", allInstances);
List<WorkerInstance> updatedInstances = WorkerInstanceService.removeEvictedPartitions(
allInstances.stream(),
value
);
log.trace("Updated instances: {}", updatedInstances);
return updatedInstances
.stream()
.map(updated -> {
String finalInstanceKey = updated.getWorkerUuid().toString();
if (updated.getPartitions().size() > 0) {
return new Result(
Collections.emptyList(),
KeyValue.pair(finalInstanceKey, updated)
);
} else {
// no more partitions for this WorkerInstance, this one doesn't exist any more.
// we delete this one and resend all the running tasks
log.warn("Detected evicted worker: {}", updated);
List<WorkerTask> workerTasks = this.listRunningForWorkerInstance(updated);
workerTasks.forEach(workerTask ->
log.warn(
"[namespace: {}] [flow: {}] [execution: {}] [taskrun: {}] WorkerTask is being resend",
workerTask.getTaskRun().getNamespace(),
workerTask.getTaskRun().getFlowId(),
workerTask.getTaskRun().getId(),
workerTask.getTaskRun().getExecutionId()
)
);
return new Result(
workerTasks,
KeyValue.pair(finalInstanceKey, null)
);
}
})
.collect(Collectors.toList());
}
private List<WorkerTask> listRunningForWorkerInstance(WorkerInstance workerInstance) {
try (KeyValueIterator<String, ValueAndTimestamp<WorkerTaskRunning>> all = this.runningStore.all()) {
return Streams.stream(all)
List<KeyValue<String, ValueAndTimestamp<WorkerTaskRunning>>> runnings = Streams
.stream(all)
.collect(Collectors.toList());
if (log.isDebugEnabled()) {
runnings
.forEach(kv -> {
log.debug(
"Current running tasks: {}",
runnings.stream()
.map(s -> kv.value.value())
.collect(Collectors.toList())
);
});
}
return runnings
.stream()
.map(r -> r.value.value())
.filter(r -> r.getWorkerInstance().getWorkerUuid().toString().equals(workerInstance.getWorkerUuid().toString()))
.map(r -> WorkerTask.builder()
@@ -112,6 +136,7 @@ public class WorkerInstanceTransformer implements ValueTransformerWithKey<String
@AllArgsConstructor
@Getter
@ToString
public static class Result {
List<WorkerTask> workerTasksToSend;
KeyValue<String, WorkerInstance> workerInstanceUpdated;

View File

@@ -47,7 +47,7 @@ import static org.hamcrest.Matchers.*;
class KafkaExecutorTest {
@Inject
KafkaExecutor stream;
@Inject
ClientConfig clientConfig;
@@ -59,11 +59,11 @@ class KafkaExecutorTest {
@Inject
FlowRepositoryInterface flowRepository;
TestTopology<String, String> testTopology;
static WorkerInstance workerInstance = workerInstance();
@BeforeEach
void init() throws IOException, URISyntaxException {
TestsUtils.loads(repositoryLoader);

View File

@@ -92,6 +92,11 @@ kestra:
properties:
cleanup.policy: "compact"
executorworkerinstance:
name: "kestra_executor_workerinstance"
properties:
cleanup.policy: "compact"
workertaskrunning:
cls: org.kestra.core.runners.WorkerTaskRunning
name: "kestra_workertaskrunning"