mirror of
https://github.com/kestra-io/kestra.git
synced 2025-12-19 18:05:41 -05:00
feat(triggers): add inputs property to webhook trigger
Add a new `inputs` property to the Webhook trigger, allowing input data to be passed to the triggered flow. If no inputs are defined on the trigger, the flow will not receive any inputs, even if some have default values. This behavior ensures backward compatibility with how the Webhook trigger currently works.
This commit is contained in:
committed by
Florian Hussonnois
parent
ad13a64ccc
commit
78c01999ad
@@ -156,6 +156,13 @@ public class Webhook extends AbstractTrigger implements TriggerOutput<Webhook.Ou
|
||||
"""
|
||||
)
|
||||
private Boolean wait = false;
|
||||
|
||||
|
||||
@Schema(
|
||||
title = "The inputs to pass to the triggered flow"
|
||||
)
|
||||
@PluginProperty(dynamic = true)
|
||||
private Map<String, Object> inputs;
|
||||
|
||||
@PluginProperty
|
||||
@Builder.Default
|
||||
@@ -174,6 +181,7 @@ public class Webhook extends AbstractTrigger implements TriggerOutput<Webhook.Ou
|
||||
.namespace(flow.getNamespace())
|
||||
.flowId(flow.getId())
|
||||
.flowRevision(flow.getRevision())
|
||||
.inputs(inputs)
|
||||
.state(new State())
|
||||
.trigger(ExecutionTrigger.of(
|
||||
this,
|
||||
|
||||
15
core/src/test/resources/flows/valids/webhook-inputs.yaml
Normal file
15
core/src/test/resources/flows/valids/webhook-inputs.yaml
Normal file
@@ -0,0 +1,15 @@
|
||||
id: webhook-inputs
|
||||
namespace: io.kestra.tests
|
||||
inputs:
|
||||
- id: body
|
||||
type: STRING
|
||||
tasks:
|
||||
- id: out
|
||||
type: io.kestra.plugin.core.debug.Return
|
||||
format: "{{ inputs.body }}"
|
||||
triggers:
|
||||
- id: webhook
|
||||
type: io.kestra.plugin.core.trigger.Webhook
|
||||
key: webhookKey
|
||||
inputs:
|
||||
body: "{{ trigger.body }}"
|
||||
@@ -544,7 +544,7 @@ public class ExecutionController {
|
||||
throw new IllegalStateException("Cannot execute an invalid flow: " + fwe.getException());
|
||||
}
|
||||
|
||||
Optional<Webhook> webhook = (flow.getTriggers() == null ? new ArrayList<AbstractTrigger>() : flow
|
||||
Optional<Webhook> maybeWebhook = (flow.getTriggers() == null ? new ArrayList<AbstractTrigger>() : flow
|
||||
.getTriggers())
|
||||
.stream()
|
||||
.filter(o -> o instanceof Webhook)
|
||||
@@ -562,11 +562,12 @@ public class ExecutionController {
|
||||
})
|
||||
.findFirst();
|
||||
|
||||
if (webhook.isEmpty()) {
|
||||
if (maybeWebhook.isEmpty()) {
|
||||
throw new HttpStatusException(HttpStatus.NOT_FOUND, "Webhook not found");
|
||||
}
|
||||
|
||||
Optional<Execution> execution = webhook.get().evaluate(request, flow);
|
||||
|
||||
final Webhook webhook = maybeWebhook.get();
|
||||
Optional<Execution> execution = webhook.evaluate(request, flow);
|
||||
|
||||
if (execution.isEmpty()) {
|
||||
throw new HttpStatusException(HttpStatus.NOT_FOUND, "No execution triggered");
|
||||
@@ -576,12 +577,25 @@ public class ExecutionController {
|
||||
if (flow.getLabels() != null) {
|
||||
result = result.withLabels(LabelService.labelsExcludingSystem(flow));
|
||||
}
|
||||
|
||||
|
||||
// we check conditions here as it's easier as the execution is created we have the body and headers available for the runContext
|
||||
var conditionContext = conditionService.conditionContext(runContextFactory.of(flow, result), flow, result);
|
||||
if (!conditionService.isValid(flow, webhook.get(), conditionContext)) {
|
||||
if (!conditionService.isValid(flow, webhook, conditionContext)) {
|
||||
return Mono.just(HttpResponse.noContent());
|
||||
}
|
||||
|
||||
// inject trigger inputs
|
||||
if (webhook.getInputs() != null) {
|
||||
RunContext runContext = runContextFactory.of(flow, result);
|
||||
try {
|
||||
Map<String, Object> inputs = runContext.render(webhook.getInputs());
|
||||
inputs = flowInputOutput.readExecutionInputs(flow, result, inputs);
|
||||
result = result.withInputs(inputs);
|
||||
} catch (Exception e) {
|
||||
log.warn("Unable to render the webhook inputs. Webhook will be ignored", e);
|
||||
throw new HttpStatusException(HttpStatus.NOT_FOUND, "No execution triggered");
|
||||
}
|
||||
}
|
||||
|
||||
try {
|
||||
// inject the traceparent into the execution
|
||||
@@ -596,7 +610,7 @@ public class ExecutionController {
|
||||
executionQueue.emit(result);
|
||||
eventPublisher.publishEvent(new CrudEvent<>(result, CrudEventType.CREATE));
|
||||
|
||||
if (webhook.get().getWait()) {
|
||||
if (webhook.getWait()) {
|
||||
var subscriberId = UUID.randomUUID().toString();
|
||||
var executionId = result.getId();
|
||||
return Flux.<Event<Execution>>create(emitter -> {
|
||||
@@ -609,7 +623,7 @@ public class ExecutionController {
|
||||
})
|
||||
.last()
|
||||
.map(event -> {
|
||||
if (webhook.get().getReturnOutputs()) {
|
||||
if (webhook.getReturnOutputs()) {
|
||||
return HttpResponse.ok(event.getData().getOutputs());
|
||||
|
||||
} else {
|
||||
|
||||
@@ -227,7 +227,24 @@ class ExecutionControllerTest {
|
||||
assertThat(response.getStatus().getCode()).isEqualTo(HttpStatus.NO_CONTENT.getCode());
|
||||
assertThat(response.body()).isNull();
|
||||
}
|
||||
|
||||
|
||||
@Test
|
||||
void webhookWithInputs() {
|
||||
record Hello(String hello) {}
|
||||
|
||||
Execution execution = client.toBlocking().retrieve(
|
||||
HttpRequest
|
||||
.POST(
|
||||
"/api/v1/main/executions/webhook/" + TESTS_FLOW_NS + "/webhook-inputs/webhookKey",
|
||||
new Hello("world")
|
||||
),
|
||||
Execution.class
|
||||
);
|
||||
|
||||
assertThat(execution).isNotNull();
|
||||
assertThat(execution.getId()).isNotNull();
|
||||
}
|
||||
|
||||
@Test
|
||||
void resolveAbsoluteDateTime() {
|
||||
final ZonedDateTime absoluteTimestamp = ZonedDateTime.of(2023, 2, 3, 4, 6,10, 0, ZoneId.systemDefault());
|
||||
|
||||
Reference in New Issue
Block a user