Integrate source performance harness with datadog (#31410)
Co-authored-by: xiaohansong <xiaohansong@users.noreply.github.com>

@@ -183,13 +183,10 @@ jobs:
  connector_name=$(echo $CONN | cut -d / -f 2)
  kind load docker-image airbyte/$connector_name:dev --name chart-testing
  kind load docker-image airbyte/$HARNESS_TYPE:dev --name chart-testing
  # envsubst requires variables to be exported
  # envsubst requires variables to be exported or setup in the env field in this step.
  export CONNECTOR_IMAGE_NAME=${CONN/connectors/airbyte}:dev
  export DATASET=$DS
  export STREAM_NUMBER=$STREAM_NUMBER
  export SYNC_MODE=$SYNC_MODE
  export HARNESS=$HARNESS_TYPE
  export DD_API_KEY=$DD_API_KEY
  envsubst < ./tools/bin/run-harness-process.yaml | kubectl create -f -
  echo "harness is ${{ steps.which-harness.outputs.harness_type }}"
  POD=$(kubectl get pod -l app=performance-harness -o jsonpath="{.items[0].metadata.name}")
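
The step above templates tools/bin/run-harness-process.yaml with envsubst: each exported variable referenced as $VAR or ${VAR} in the manifest is replaced with its value before the result is piped into kubectl create, which is why the comment insists on exporting. As a rough illustration of that substitution mechanism only (a Java sketch, not the actual GNU envsubst binary):

import java.util.regex.Matcher;
import java.util.regex.Pattern;

public class EnvSubstSketch {

  // Matches ${VAR} and $VAR placeholders, roughly as envsubst does.
  private static final Pattern VAR = Pattern.compile("\\$\\{?([A-Za-z_][A-Za-z0-9_]*)\\}?");

  // Replaces each placeholder with the process environment value, or "" when unset.
  static String substitute(final String template) {
    final Matcher m = VAR.matcher(template);
    final StringBuilder out = new StringBuilder();
    while (m.find()) {
      final String value = System.getenv(m.group(1));
      m.appendReplacement(out, Matcher.quoteReplacement(value == null ? "" : value));
    }
    m.appendTail(out);
    return out.toString();
  }

  public static void main(final String[] args) {
    // With DD_API_KEY exported, the placeholder below resolves to its value.
    System.out.println(substitute("value: $DD_API_KEY"));
  }
}

Note that an unset variable is substituted with an empty string rather than raising an error, so forgetting to export DD_API_KEY would silently produce a pod with no key.
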

@@ -16,4 +16,5 @@ dependencies {
  implementation 'org.apache.commons:commons-lang3:3.11'
  implementation 'io.airbyte:airbyte-commons-worker:0.42.0'
  implementation 'io.airbyte.airbyte-config:config-models:0.42.0'
  implementation 'com.datadoghq:datadog-api-client:2.16.0'
}
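
com.datadoghq:datadog-api-client is Datadog's official Java client and supplies the v2 MetricsApi used in PerformanceTest below. ApiClient.getDefaultApiClient() resolves credentials from the environment (DD_API_KEY, with the site taken from DD_SITE), which is why the pod spec at the end of this diff injects both. A minimal sketch of configuring the key explicitly instead, assuming the map-based configureApiKeys form the client documents:

import com.datadog.api.client.ApiClient;
import com.datadog.api.client.v2.api.MetricsApi;
import java.util.HashMap;

public class DatadogClientSetup {

  // Builds a MetricsApi with an explicitly supplied key instead of relying on
  // the DD_API_KEY environment variable. "apiKeyAuth" is the auth scheme name
  // the client documents for API-key authentication.
  static MetricsApi buildMetricsApi(final String apiKey) {
    final ApiClient client = ApiClient.getDefaultApiClient();
    final HashMap<String, String> secrets = new HashMap<>();
    secrets.put("apiKeyAuth", apiKey);
    client.configureApiKeys(secrets);
    return new MetricsApi(client);
  }

  public static void main(final String[] args) {
    final MetricsApi api = buildMetricsApi(System.getenv("DD_API_KEY"));
    System.out.println("MetricsApi ready: " + (api != null));
  }
}
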

@@ -78,6 +78,7 @@ public class Main {
    try {
      final PerformanceTest test = new PerformanceTest(
          image,
          dataset,
          config.toString(),
          catalog.toString());
      test.runTest();

@@ -4,6 +4,15 @@

package io.airbyte.integrations.source_performance;

import com.datadog.api.client.ApiClient;
import com.datadog.api.client.ApiException;
import com.datadog.api.client.v2.api.MetricsApi;
import com.datadog.api.client.v2.model.IntakePayloadAccepted;
import com.datadog.api.client.v2.model.MetricIntakeType;
import com.datadog.api.client.v2.model.MetricPayload;
import com.datadog.api.client.v2.model.MetricPoint;
import com.datadog.api.client.v2.model.MetricResource;
import com.datadog.api.client.v2.model.MetricSeries;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
@@ -26,6 +35,8 @@ import io.fabric8.kubernetes.client.KubernetesClient;
import java.net.InetAddress;
import java.nio.file.Path;
import java.time.Duration;
import java.time.OffsetDateTime;
import java.util.Collections;
import java.util.List;
import java.util.Optional;
import java.util.Set;

@@ -43,17 +54,24 @@ public class PerformanceTest {

  public static final double MEGABYTE = Math.pow(1024, 2);
  private final String imageName;
  private final String dataset;
  private final JsonNode config;
  private final ConfiguredAirbyteCatalog catalog;

  PerformanceTest(final String imageName, final String config, final String catalog) throws JsonProcessingException {
  PerformanceTest(final String imageName, final String dataset, final String config, final String catalog) throws JsonProcessingException {
    final ObjectMapper mapper = new ObjectMapper();
    this.imageName = imageName;
    this.dataset = dataset;
    this.config = mapper.readTree(config);
    this.catalog = Jsons.deserialize(catalog, ConfiguredAirbyteCatalog.class);
  }

  void runTest() throws Exception {

    // Initialize datadog.
    ApiClient defaultClient = ApiClient.getDefaultApiClient();
    MetricsApi apiInstance = new MetricsApi(defaultClient);

    KubePortManagerSingleton.init(PORTS);

    final KubernetesClient fabricClient = new DefaultKubernetesClient();
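
The constructor receives config and catalog as serialized JSON strings and re-parses them on its side: Jackson's readTree keeps the free-form connector config as a JsonNode, while Airbyte's Jsons.deserialize maps the catalog string onto the typed ConfiguredAirbyteCatalog. A minimal sketch of the Jackson half with a toy config (the host/port payload is invented for illustration):

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

public class ConfigParseSketch {
  public static void main(final String[] args) throws Exception {
    final ObjectMapper mapper = new ObjectMapper();
    // The connector config is schema-less, so readTree keeps it as a tree.
    final JsonNode config = mapper.readTree("{\"host\":\"localhost\",\"port\":5432}");
    System.out.println(config.get("host").asText()); // prints: localhost
  }
}
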

@@ -105,8 +123,44 @@ public class PerformanceTest {
    final var totalMB = totalBytes / MEGABYTE;
    final var totalTimeSecs = (end - start) / 1000.0;
    final var rps = counter / totalTimeSecs;
    log.info("total secs: {}. total MB read: {}, rps: {}, throughput: {}", totalTimeSecs, totalMB, rps, totalMB / totalTimeSecs);
    final var throughput = totalMB / totalTimeSecs;
    log.info("total secs: {}. total MB read: {}, rps: {}, throughput: {}", totalTimeSecs, totalMB, rps, throughput);
    source.close();

    final long reportingTimeInEpochSeconds = OffsetDateTime.now().toInstant().getEpochSecond();

    List<MetricResource> metricResources = List.of(
        new MetricResource().name("github").type("runner"),
        new MetricResource().name(imageName).type("image"),
        new MetricResource().name(dataset).type("dataset"));
    MetricPayload body =
        new MetricPayload()
            .series(
                List.of(
                    new MetricSeries()
                        .metric("connectors.performance.rps")
                        .type(MetricIntakeType.GAUGE)
                        .points(
                            Collections.singletonList(
                                new MetricPoint()
                                    .timestamp(reportingTimeInEpochSeconds)
                                    .value(rps)))
                        .resources(metricResources),
                    new MetricSeries()
                        .metric("connectors.performance.throughput")
                        .type(MetricIntakeType.GAUGE)
                        .points(
                            Collections.singletonList(
                                new MetricPoint()
                                    .timestamp(reportingTimeInEpochSeconds)
                                    .value(throughput)))
                        .resources(metricResources)));
    try {
      IntakePayloadAccepted result = apiInstance.submitMetrics(body);
      System.out.println(result);
    } catch (ApiException e) {
      log.error("Exception when calling MetricsApi#submitMetrics.", e);
    }
  }

  private static <V0, V1> V0 convertProtocolObject(final V1 v1, final Class<V0> klass) {
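
To make the two gauges concrete, a worked example with invented measurements (5 GiB read, ten million records, 250 s wall time) under the same MEGABYTE = 1024² convention:

public class ThroughputExample {

  public static final double MEGABYTE = Math.pow(1024, 2);

  public static void main(final String[] args) {
    // Assumed measurements, for illustration only.
    final long totalBytes = 5L * 1024 * 1024 * 1024; // 5 GiB
    final long counter = 10_000_000L;                // records read
    final double totalTimeSecs = 250.0;

    final double totalMB = totalBytes / MEGABYTE;      // 5120.0 MB
    final double rps = counter / totalTimeSecs;        // 40000.0 records/sec
    final double throughput = totalMB / totalTimeSecs; // 20.48 MB/sec

    System.out.printf("rps=%.1f throughput=%.2f MB/s%n", rps, throughput);
  }
}

Both series share the same epoch-second timestamp and the same runner/image/dataset resources, so dashboards can slice the gauges per connector image and dataset.
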

@@ -20,6 +20,11 @@ spec:
        requests:
          cpu: "2.5"
          memory: "2Gi"
      env:
        - name: DD_API_KEY
          value: $DD_API_KEY
        - name: DD_SITE
          value: "datadoghq.com"
  volumes:
    - name: secrets-volume
      hostPath:
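
The pod spec injects DD_API_KEY (filled in by the envsubst step above) and pins DD_SITE to datadoghq.com, so the harness authenticates purely through its environment. A small defensive check one might run before building the client (an assumption about desirable behavior, not part of the commit):

public class DatadogEnvCheck {

  // Fails fast when the API key was not injected into the pod.
  static void requireDatadogEnv() {
    final String apiKey = System.getenv("DD_API_KEY");
    if (apiKey == null || apiKey.isBlank()) {
      throw new IllegalStateException("DD_API_KEY is not set; metrics submission would fail.");
    }
  }

  public static void main(final String[] args) {
    requireDatadogEnv();
    System.out.println("Datadog environment looks sane.");
  }
}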