Compare commits

...

2 Commits

Author SHA1 Message Date
François Delbrayelle
a5b004e1e1 fix(tests): on HttpClientTest 2025-09-12 19:16:58 +02:00
François Delbrayelle
b74d09accb Reapply "feat(retry): use the retry policy on HttpClient (#10922)" (#11263)
This reverts commit 01e8e46b77.
2025-09-12 19:16:58 +02:00
4 changed files with 100 additions and 188 deletions

View File

@@ -39,8 +39,6 @@ public class NamespaceFilesUpdateCommand extends AbstractApiCommand {
@Inject
private TenantIdSelectorService tenantService;
private static final String KESTRA_IGNORE_FILE = ".kestraignore";
@Override
public Integer call() throws Exception {
super.call();

View File

@@ -4,11 +4,15 @@ import com.fasterxml.jackson.core.type.TypeReference;
import io.kestra.core.exceptions.IllegalVariableEvaluationException;
import io.kestra.core.http.HttpRequest;
import io.kestra.core.http.HttpResponse;
import io.kestra.core.http.client.apache.*;
import io.kestra.core.http.client.apache.FailedResponseInterceptor;
import io.kestra.core.http.client.apache.LoggingRequestInterceptor;
import io.kestra.core.http.client.apache.LoggingResponseInterceptor;
import io.kestra.core.http.client.apache.RunContextResponseInterceptor;
import io.kestra.core.http.client.configurations.HttpConfiguration;
import io.kestra.core.runners.DefaultRunContext;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.RetryUtils;
import io.micrometer.common.KeyValues;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.ApacheHttpClientContext;
import io.micrometer.core.instrument.binder.httpcomponents.hc5.DefaultApacheHttpClientObservationConvention;
@@ -21,7 +25,8 @@ import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.ArrayUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.hc.client5.http.ContextBuilder;
import org.apache.hc.client5.http.auth.*;
import org.apache.hc.client5.http.auth.AuthScope;
import org.apache.hc.client5.http.auth.UsernamePasswordCredentials;
import org.apache.hc.client5.http.config.ConnectionConfig;
import org.apache.hc.client5.http.impl.ChainElement;
import org.apache.hc.client5.http.impl.DefaultAuthenticationStrategy;
@@ -39,6 +44,8 @@ import org.apache.hc.core5.http.io.entity.EntityUtils;
import org.apache.hc.core5.ssl.SSLContexts;
import org.apache.hc.core5.util.Timeout;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
import java.io.Closeable;
import java.io.IOException;
import java.io.InputStream;
@@ -46,11 +53,8 @@ import java.net.*;
import java.security.KeyManagementException;
import java.security.KeyStoreException;
import java.security.NoSuchAlgorithmException;
import java.time.Duration;
import java.util.List;
import java.util.function.Consumer;
import javax.net.ssl.SSLContext;
import javax.net.ssl.SSLHandshakeException;
@Slf4j
public class HttpClient implements Closeable {
@@ -75,7 +79,7 @@ public class HttpClient implements Closeable {
throw new IllegalStateException("Client has already been created");
}
org.apache.hc.client5.http.impl.classic.HttpClientBuilder builder = HttpClients.custom()
var builder = HttpClients.custom()
.disableDefaultUserAgent()
.setUserAgent("Kestra");
@@ -89,49 +93,37 @@ public class HttpClient implements Closeable {
// logger
if (this.configuration.getLogs() != null && this.configuration.getLogs().length > 0) {
if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_HEADERS) ||
ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_BODY)
) {
ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.REQUEST_BODY)) {
builder.addRequestInterceptorLast(new LoggingRequestInterceptor(runContext.logger(), this.configuration.getLogs()));
}
if (ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.RESPONSE_HEADERS) ||
ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.RESPONSE_BODY)
) {
ArrayUtils.contains(this.configuration.getLogs(), HttpConfiguration.LoggingType.RESPONSE_BODY)) {
builder.addResponseInterceptorLast(new LoggingResponseInterceptor(runContext.logger(), this.configuration.getLogs()));
}
}
// Object dependencies
PoolingHttpClientConnectionManagerBuilder connectionManagerBuilder = PoolingHttpClientConnectionManagerBuilder.create();
ConnectionConfig.Builder connectionConfig = ConnectionConfig.custom();
BasicCredentialsProvider credentialsStore = new BasicCredentialsProvider();
var connectionManagerBuilder = PoolingHttpClientConnectionManagerBuilder.create();
var connectionConfig = ConnectionConfig.custom();
var credentialsStore = new BasicCredentialsProvider();
// Timeout
if (this.configuration.getTimeout() != null) {
var connectTimeout = runContext.render(this.configuration.getTimeout().getConnectTimeout()).as(Duration.class);
var connectTimeout = runContext.render(this.configuration.getTimeout().getConnectTimeout()).as(java.time.Duration.class);
connectTimeout.ifPresent(duration -> connectionConfig.setConnectTimeout(Timeout.of(duration)));
var readIdleTimeout = runContext.render(this.configuration.getTimeout().getReadIdleTimeout()).as(Duration.class);
var readIdleTimeout = runContext.render(this.configuration.getTimeout().getReadIdleTimeout()).as(java.time.Duration.class);
readIdleTimeout.ifPresent(duration -> connectionConfig.setSocketTimeout(Timeout.of(duration)));
}
// proxy
if (this.configuration.getProxy() != null && configuration.getProxy().getAddress() != null) {
String proxyAddress = runContext.render(configuration.getProxy().getAddress()).as(String.class).orElse(null);
var proxyAddress = runContext.render(configuration.getProxy().getAddress()).as(String.class).orElse(null);
if (StringUtils.isNotEmpty(proxyAddress)) {
int port = runContext.render(configuration.getProxy().getPort()).as(Integer.class).orElseThrow();
SocketAddress proxyAddr = new InetSocketAddress(
proxyAddress,
port
);
Proxy proxy = new Proxy(runContext.render(configuration.getProxy().getType()).as(Proxy.Type.class).orElse(null), proxyAddr);
var port = runContext.render(configuration.getProxy().getPort()).as(Integer.class).orElseThrow();
var proxyAddr = new InetSocketAddress(proxyAddress, port);
var proxy = new Proxy(runContext.render(configuration.getProxy().getType()).as(Proxy.Type.class).orElse(null), proxyAddr);
builder.setProxySelector(new ProxySelector() {
@Override
public void connectFailed(URI uri, SocketAddress sa, IOException e) {
/* ignore */
}
@Override
@@ -142,7 +134,6 @@ public class HttpClient implements Closeable {
if (this.configuration.getProxy().getUsername() != null && this.configuration.getProxy().getPassword() != null) {
builder.setProxyAuthenticationStrategy(new DefaultAuthenticationStrategy());
credentialsStore.setCredentials(
new AuthScope(proxyAddress, port),
new UsernamePasswordCredentials(
@@ -154,19 +145,16 @@ public class HttpClient implements Closeable {
}
}
// ssl
if (this.configuration.getSsl() != null) {
if (this.configuration.getSsl().getInsecureTrustAllCertificates() != null) {
connectionManagerBuilder.setSSLSocketFactory(this.selfSignedConnectionSocketFactory());
}
}
// auth
if (this.configuration.getAuth() != null) {
this.configuration.getAuth().configure(builder, runContext);
}
// root options
if (!runContext.render(this.configuration.getFollowRedirects()).as(Boolean.class).orElseThrow()) {
builder.disableRedirectHandling();
}
@@ -176,8 +164,7 @@ public class HttpClient implements Closeable {
}
if (this.configuration.getAllowedResponseCodes() != null) {
List<Integer> list = runContext.render(this.configuration.getAllowedResponseCodes()).asList(Integer.class);
var list = runContext.render(this.configuration.getAllowedResponseCodes()).asList(Integer.class);
if (!list.isEmpty()) {
builder.addResponseInterceptorLast(new FailedResponseInterceptor(list));
}
@@ -185,91 +172,51 @@ public class HttpClient implements Closeable {
builder.addResponseInterceptorLast(new RunContextResponseInterceptor(this.runContext));
// builder object
connectionManagerBuilder.setDefaultConnectionConfig(connectionConfig.build());
builder.setConnectionManager(connectionManagerBuilder.build());
builder.setDefaultCredentialsProvider(credentialsStore);
this.client = builder.build();
return client;
}
private SSLConnectionSocketFactory selfSignedConnectionSocketFactory() {
try {
SSLContext sslContext = SSLContexts
.custom()
.loadTrustMaterial(null, (chain, authType) -> true)
.build();
SSLContext sslContext = SSLContexts.custom().loadTrustMaterial(null, (chain, authType) -> true).build();
return new SSLConnectionSocketFactory(sslContext, NoopHostnameVerifier.INSTANCE);
} catch (NoSuchAlgorithmException | KeyStoreException | KeyManagementException e) {
throw new IllegalArgumentException(e);
}
}
/**
* Send a request
*
* @param request the request
* @param cls the class of the response
* @param <T> the type of response expected
* @return the response
*/
public <T> HttpResponse<T> request(HttpRequest request, Class<T> cls) throws HttpClientException, IllegalVariableEvaluationException {
HttpClientContext httpClientContext = this.clientContext(request);
var httpClientContext = this.clientContext();
return this.request(request, httpClientContext, r -> {
T body = bodyHandler(cls, r.getEntity());
return HttpResponse.from(r, body, request, httpClientContext);
});
}
/**
* Send a request, getting the response with body as input stream
*
* @param request the request
* @param consumer the consumer of the response
* @return the response without the body
*/
public HttpResponse<Void> request(HttpRequest request, Consumer<HttpResponse<InputStream>> consumer) throws HttpClientException, IllegalVariableEvaluationException {
HttpClientContext httpClientContext = this.clientContext(request);
var httpClientContext = this.clientContext();
return this.request(request, httpClientContext, r -> {
HttpResponse<InputStream> from = HttpResponse.from(
r,
r.getEntity() != null ? r.getEntity().getContent() : null,
request,
httpClientContext
);
var from = HttpResponse.from(r, r.getEntity() != null ? r.getEntity().getContent() : null, request, httpClientContext);
consumer.accept(from);
return HttpResponse.from(r, null, request, httpClientContext);
});
}
/**
* Send a request and expect a json response
*
* @param request the request
* @param <T> the type of response expected
* @return the response
*/
public <T> HttpResponse<T> request(HttpRequest request) throws HttpClientException, IllegalVariableEvaluationException {
HttpClientContext httpClientContext = this.clientContext(request);
var httpClientContext = this.clientContext();
return this.request(request, httpClientContext, response -> {
T body = JacksonMapper.ofJson().readValue(response.getEntity().getContent(), new TypeReference<>() {});
T body = JacksonMapper.ofJson().readValue(response.getEntity().getContent(), new TypeReference<>() {
});
return HttpResponse.from(response, body, request, httpClientContext);
});
}
private HttpClientContext clientContext(HttpRequest request) {
ContextBuilder contextBuilder = ContextBuilder.create();
private HttpClientContext clientContext() {
var contextBuilder = ContextBuilder.create();
return contextBuilder.build();
}
@@ -277,22 +224,31 @@ public class HttpClient implements Closeable {
HttpRequest request,
HttpClientContext httpClientContext,
HttpClientResponseHandler<HttpResponse<T>> responseHandler
) throws HttpClientException {
try {
return this.client.execute(request.to(runContext), httpClientContext, responseHandler);
} catch (SocketException e) {
throw new HttpClientRequestException(e.getMessage(), request, e);
} catch (IOException e) {
if (e instanceof SSLHandshakeException) {
throw new HttpClientRequestException(e.getMessage(), request, e);
}
) throws HttpClientException, IllegalVariableEvaluationException {
if (e.getCause() instanceof HttpClientException httpClientException) {
throw httpClientException;
}
var retryableCodes = runContext.render(configuration.getRetryOnStatusCodes()).asList(Integer.class);
throw new RuntimeException(e);
}
return new RetryUtils().<HttpResponse<T>, HttpClientException>of(configuration.getRetry())
.run(
(res, throwable) -> {
if (throwable instanceof HttpClientResponseException ex) {
return retryableCodes.contains(ex.getResponse().getStatus().getCode());
}
return throwable instanceof HttpClientRequestException
|| throwable instanceof SocketException
|| throwable instanceof SSLHandshakeException;
},
() -> {
try {
return this.client.execute(request.to(runContext), httpClientContext, responseHandler);
} catch (org.apache.hc.client5.http.ClientProtocolException e) {
if (e.getCause() instanceof HttpClientException ex) {
throw ex;
}
throw new RuntimeException(e);
}
}
);
}
@SuppressWarnings("unchecked")

View File

@@ -2,6 +2,8 @@ package io.kestra.core.http.client.configurations;
import io.kestra.core.models.annotations.PluginProperty;
import io.kestra.core.models.property.Property;
import io.kestra.core.models.tasks.retrys.AbstractRetry;
import io.kestra.core.models.tasks.retrys.Exponential;
import io.micronaut.logging.LogLevel;
import io.swagger.v3.oas.annotations.media.Schema;
import lombok.Builder;
@@ -55,6 +57,19 @@ public class HttpConfiguration {
@PluginProperty
private LoggingType[] logs;
@Schema(title = "Retry strategy for HTTP requests.")
@Builder.Default
private AbstractRetry retry = Exponential.builder()
.interval(Duration.ofMillis(1000))
.maxInterval(Duration.ofSeconds(30))
.maxAttempts(3)
.build();
@Setter
@Schema(title = "HTTP status codes that should be retried.")
@Builder.Default
private Property<List<Integer>> retryOnStatusCodes = Property.ofValue(List.of(502, 503, 504));
public enum LoggingType {
REQUEST_HEADERS,
REQUEST_BODY,
@@ -62,7 +77,6 @@ public class HttpConfiguration {
RESPONSE_BODY
}
// Deprecated properties
@Schema(title = "The time allowed to establish a connection to the server before failing.")
@Deprecated
private final Duration connectTimeout;
@@ -104,7 +118,6 @@ public class HttpConfiguration {
@Deprecated
private final LogLevel logLevel;
// Deprecated properties with no equivalent value to be kept, silently ignore
@Schema(title = "The time allowed for a read connection to remain idle before closing it.")
@Deprecated
private final Duration readIdleTimeout;
@@ -121,115 +134,73 @@ public class HttpConfiguration {
@Deprecated
public HttpConfigurationBuilder connectTimeout(Duration connectTimeout) {
if (this.timeout == null) {
this.timeout = TimeoutConfiguration.builder()
.build();
this.timeout = TimeoutConfiguration.builder().build();
}
this.timeout = this.timeout.toBuilder()
.connectTimeout(Property.ofValue(connectTimeout))
.build();
this.timeout = this.timeout.toBuilder().connectTimeout(Property.ofValue(connectTimeout)).build();
return this;
}
@Deprecated
public HttpConfigurationBuilder readTimeout(Duration readTimeout) {
if (this.timeout == null) {
this.timeout = TimeoutConfiguration.builder()
.build();
this.timeout = TimeoutConfiguration.builder().build();
}
this.timeout = this.timeout.toBuilder()
.readIdleTimeout(Property.ofValue(readTimeout))
.build();
this.timeout = this.timeout.toBuilder().readIdleTimeout(Property.ofValue(readTimeout)).build();
return this;
}
@Deprecated
public HttpConfigurationBuilder proxyType(Proxy.Type proxyType) {
if (this.proxy == null) {
this.proxy = ProxyConfiguration.builder()
.build();
this.proxy = ProxyConfiguration.builder().build();
}
this.proxy = this.proxy.toBuilder()
.type(Property.ofValue(proxyType))
.build();
this.proxy = this.proxy.toBuilder().type(Property.ofValue(proxyType)).build();
return this;
}
@Deprecated
public HttpConfigurationBuilder proxyAddress(String proxyAddress) {
if (this.proxy == null) {
this.proxy = ProxyConfiguration.builder()
.build();
this.proxy = ProxyConfiguration.builder().build();
}
this.proxy = this.proxy.toBuilder()
.address(Property.ofValue(proxyAddress))
.build();
this.proxy = this.proxy.toBuilder().address(Property.ofValue(proxyAddress)).build();
return this;
}
@Deprecated
public HttpConfigurationBuilder proxyPort(Integer proxyPort) {
if (this.proxy == null) {
this.proxy = ProxyConfiguration.builder()
.build();
this.proxy = ProxyConfiguration.builder().build();
}
this.proxy = this.proxy.toBuilder()
.port(Property.ofValue(proxyPort))
.build();
this.proxy = this.proxy.toBuilder().port(Property.ofValue(proxyPort)).build();
return this;
}
@Deprecated
public HttpConfigurationBuilder proxyUsername(String proxyUsername) {
if (this.proxy == null) {
this.proxy = ProxyConfiguration.builder()
.build();
this.proxy = ProxyConfiguration.builder().build();
}
this.proxy = this.proxy.toBuilder()
.username(Property.ofValue(proxyUsername))
.build();
this.proxy = this.proxy.toBuilder().username(Property.ofValue(proxyUsername)).build();
return this;
}
@Deprecated
public HttpConfigurationBuilder proxyPassword(String proxyPassword) {
if (this.proxy == null) {
this.proxy = ProxyConfiguration.builder()
.build();
this.proxy = ProxyConfiguration.builder().build();
}
this.proxy = this.proxy.toBuilder()
.password(Property.ofValue(proxyPassword))
.build();
this.proxy = this.proxy.toBuilder().password(Property.ofValue(proxyPassword)).build();
return this;
}
@SuppressWarnings("DeprecatedIsStillUsed")
@Deprecated
public HttpConfigurationBuilder basicAuthUser(String basicAuthUser) {
if (this.auth == null || !(this.auth instanceof BasicAuthConfiguration)) {
this.auth = BasicAuthConfiguration.builder()
.build();
this.auth = BasicAuthConfiguration.builder().build();
}
this.auth = ((BasicAuthConfiguration) this.auth).toBuilder()
.username(Property.ofValue(basicAuthUser))
.build();
this.auth = ((BasicAuthConfiguration) this.auth).toBuilder().username(Property.ofValue(basicAuthUser)).build();
return this;
}
@@ -237,37 +208,21 @@ public class HttpConfiguration {
@Deprecated
public HttpConfigurationBuilder basicAuthPassword(String basicAuthPassword) {
if (this.auth == null || !(this.auth instanceof BasicAuthConfiguration)) {
this.auth = BasicAuthConfiguration.builder()
.build();
this.auth = BasicAuthConfiguration.builder().build();
}
this.auth = ((BasicAuthConfiguration) this.auth).toBuilder()
.password(Property.ofValue(basicAuthPassword))
.build();
this.auth = ((BasicAuthConfiguration) this.auth).toBuilder().password(Property.ofValue(basicAuthPassword)).build();
return this;
}
@Deprecated
public HttpConfigurationBuilder logLevel(LogLevel logLevel) {
if (logLevel == LogLevel.TRACE) {
this.logs = new LoggingType[]{
LoggingType.REQUEST_HEADERS,
LoggingType.REQUEST_BODY,
LoggingType.RESPONSE_HEADERS,
LoggingType.RESPONSE_BODY
};
this.logs = new LoggingType[]{LoggingType.REQUEST_HEADERS, LoggingType.REQUEST_BODY, LoggingType.RESPONSE_HEADERS, LoggingType.RESPONSE_BODY};
} else if (logLevel == LogLevel.DEBUG) {
this.logs = new LoggingType[]{
LoggingType.REQUEST_HEADERS,
LoggingType.RESPONSE_HEADERS,
};
this.logs = new LoggingType[]{LoggingType.REQUEST_HEADERS, LoggingType.RESPONSE_HEADERS};
} else if (logLevel == LogLevel.INFO) {
this.logs = new LoggingType[]{
LoggingType.RESPONSE_HEADERS,
};
this.logs = new LoggingType[]{LoggingType.RESPONSE_HEADERS};
}
return this;
}
}

View File

@@ -19,6 +19,7 @@ import io.kestra.core.queues.QueueInterface;
import io.kestra.core.runners.RunContext;
import io.kestra.core.serializers.JacksonMapper;
import io.kestra.core.utils.IdUtils;
import io.kestra.core.utils.RetryUtils;
import io.kestra.core.utils.TestsUtils;
import io.micronaut.context.ApplicationContext;
import io.micronaut.http.HttpStatus;
@@ -109,7 +110,8 @@ class HttpClientTest {
return builder.build();
}
@RetryingTest(5) // Flaky on CI but never locally even with 100 repetitions
@RetryingTest(5)
// Flaky on CI but never locally even with 100 repetitions
void getText() throws IllegalVariableEvaluationException, HttpClientException, IOException {
Flow flow = TestsUtils.mockFlow();
Execution execution = TestsUtils.mockExecution(flow, Map.of());
@@ -292,7 +294,7 @@ class HttpClientTest {
Map<String, Object> multipart = Map.of(
"ping", "pong",
"int", 1,
"file", new File(Objects.requireNonNull(this.getClass().getClassLoader().getResource("logback.xml")).toURI()),
"file", new File(Objects.requireNonNull(this.getClass().getClassLoader().getResource("logback.xml")).toURI()),
"inputStream", new ByteArrayInputStream(IOUtils.toString(
Objects.requireNonNull(this.getClass().getClassLoader().getResourceAsStream("logback.xml")),
StandardCharsets.UTF_8
@@ -310,8 +312,7 @@ class HttpClientTest {
assertThat(response.getBody().get("ping")).isEqualTo("pong");
assertThat(response.getBody().get("int")).isEqualTo("1");
assertThat((String) response.getBody().get("file")).contains("logback");
// @FIXME: Request seems to be correct, but not returned by micronaut
// assertThat((String) response.getBody().get("inputStream"), containsString("logback"));
assertThat((String) response.getBody().get("inputStream")).contains("logback");
assertThat(response.getHeaders().firstValue(HttpHeaders.CONTENT_TYPE).orElseThrow()).isEqualTo(MediaType.APPLICATION_JSON);
}
}
@@ -321,12 +322,14 @@ class HttpClientTest {
try (HttpClient client = client()) {
URI uri = URI.create("http://localhost:1234");
HttpClientRequestException e = assertThrows(HttpClientRequestException.class, () -> {
RetryUtils.RetryFailed retryFailed = assertThrows(RetryUtils.RetryFailed.class, () -> {
client.request(HttpRequest.of(uri));
});
assertThat(e.getRequest().getUri()).isEqualTo(uri);
assertThat(e.getMessage()).contains("Connection refused");
Throwable cause = retryFailed.getCause();
assertThat(cause).isInstanceOf(org.apache.hc.client5.http.HttpHostConnectException.class);
var e = (org.apache.hc.client5.http.HttpHostConnectException) cause;
assertThat(e.getMessage()).contains("Connect to http://localhost:1234 failed");
}
}