Support incremental sync in performance harness tool (#31588)
This commit is contained in:
@@ -44,7 +44,15 @@ on:
|
||||
sync-mode:
|
||||
description: "Sync mode to use for destination performance measurement."
|
||||
required: false
|
||||
type: choice
|
||||
options:
|
||||
- full_refresh
|
||||
- incremental
|
||||
default: "full_refresh"
|
||||
reportToDatadog:
|
||||
description: "Whether to report the performance test results to Datadog."
|
||||
required: false
|
||||
default: "false"
|
||||
jobs:
|
||||
uuid:
|
||||
name: "Custom UUID of workflow run"
|
||||
@@ -174,6 +182,7 @@ jobs:
|
||||
DS: ${{ github.event.inputs.dataset }}
|
||||
STREAM_NUMBER: ${{ github.event.inputs.stream-number }}
|
||||
SYNC_MODE: ${{ github.event.inputs.sync-mode }}
|
||||
REPORT_TO_DATADOG: ${{ inputs.reportToDatadog }}
|
||||
PREFIX: '{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.p.PerformanceTest(runTest):165'
|
||||
SUFFIX: '"}}'
|
||||
HARNESS_TYPE: ${{ steps.which-harness.outputs.harness_type }}
|
||||
|
||||
@@ -8,8 +8,8 @@ import com.fasterxml.jackson.databind.JsonNode;
|
||||
import com.fasterxml.jackson.databind.ObjectMapper;
|
||||
import io.airbyte.commons.io.IOs;
|
||||
import io.airbyte.commons.json.Jsons;
|
||||
import io.airbyte.commons.resources.MoreResources;
|
||||
import java.io.IOException;
|
||||
import java.io.InputStream;
|
||||
import java.nio.file.Files;
|
||||
import java.nio.file.Path;
|
||||
import java.util.Arrays;
|
||||
@@ -28,6 +28,7 @@ public class Main {
|
||||
// TODO: (ryankfu) add function parity with destination_performance
|
||||
int numOfParallelStreams = 1;
|
||||
String syncMode = "full_refresh";
|
||||
boolean reportToDatadog = false;
|
||||
|
||||
// TODO: (ryankfu) Integrated something akin to {@link Clis} for parsing arguments.
|
||||
switch (args.length) {
|
||||
@@ -47,6 +48,13 @@ public class Main {
|
||||
numOfParallelStreams = Integer.parseInt(args[2]);
|
||||
syncMode = args[3];
|
||||
}
|
||||
case 5 -> {
|
||||
image = args[0];
|
||||
dataset = args[1];
|
||||
numOfParallelStreams = Integer.parseInt(args[2]);
|
||||
syncMode = args[3];
|
||||
reportToDatadog = Boolean.parseBoolean(args[4]);
|
||||
}
|
||||
default -> {
|
||||
log.info("unexpected arguments");
|
||||
System.exit(1);
|
||||
@@ -65,7 +73,7 @@ public class Main {
|
||||
|
||||
final JsonNode catalog;
|
||||
try {
|
||||
catalog = getCatalog(dataset, connector);
|
||||
catalog = getCatalog(dataset, connector, syncMode);
|
||||
} catch (final IOException ex) {
|
||||
throw new IllegalStateException("Failed to read catalog", ex);
|
||||
}
|
||||
@@ -79,6 +87,8 @@ public class Main {
|
||||
final PerformanceTest test = new PerformanceTest(
|
||||
image,
|
||||
dataset,
|
||||
syncMode,
|
||||
reportToDatadog,
|
||||
config.toString(),
|
||||
catalog.toString());
|
||||
test.runTest();
|
||||
@@ -90,11 +100,11 @@ public class Main {
|
||||
System.exit(0);
|
||||
}
|
||||
|
||||
static JsonNode getCatalog(final String dataset, final String connector) throws IOException {
|
||||
static JsonNode getCatalog(final String dataset, final String connector, final String syncMode) throws IOException {
|
||||
final ObjectMapper objectMapper = new ObjectMapper();
|
||||
final String catalogFilename = "catalogs/%s/%s_catalog.json".formatted(connector, dataset);
|
||||
final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(catalogFilename);
|
||||
return objectMapper.readTree(is);
|
||||
final String template = MoreResources.readResource(catalogFilename);
|
||||
return objectMapper.readTree(String.format(template, syncMode));
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
@@ -55,13 +55,23 @@ public class PerformanceTest {
|
||||
public static final double MEGABYTE = Math.pow(1024, 2);
|
||||
private final String imageName;
|
||||
private final String dataset;
|
||||
private final String syncMode;
|
||||
private final boolean reportToDatadog;
|
||||
private final JsonNode config;
|
||||
private final ConfiguredAirbyteCatalog catalog;
|
||||
|
||||
PerformanceTest(final String imageName, final String dataset, final String config, final String catalog) throws JsonProcessingException {
|
||||
PerformanceTest(final String imageName,
|
||||
final String dataset,
|
||||
final String syncMode,
|
||||
final Boolean reportToDatadog,
|
||||
final String config,
|
||||
final String catalog)
|
||||
throws JsonProcessingException {
|
||||
final ObjectMapper mapper = new ObjectMapper();
|
||||
this.imageName = imageName;
|
||||
this.dataset = dataset;
|
||||
this.syncMode = syncMode;
|
||||
this.reportToDatadog = reportToDatadog;
|
||||
this.config = mapper.readTree(config);
|
||||
this.catalog = Jsons.deserialize(catalog, ConfiguredAirbyteCatalog.class);
|
||||
}
|
||||
@@ -118,6 +128,9 @@ public class PerformanceTest {
|
||||
totalBytes / MEGABYTE);
|
||||
}
|
||||
}
|
||||
if (source.getExitValue() > 0) {
|
||||
throw new RuntimeException("Source failed with exit code: " + source.getExitValue());
|
||||
}
|
||||
log.info("Test ended successfully");
|
||||
final var end = System.currentTimeMillis();
|
||||
final var totalMB = totalBytes / MEGABYTE;
|
||||
@@ -126,13 +139,17 @@ public class PerformanceTest {
|
||||
final var throughput = totalMB / totalTimeSecs;
|
||||
log.info("total secs: {}. total MB read: {}, rps: {}, throughput: {}", totalTimeSecs, totalMB, rps, throughput);
|
||||
source.close();
|
||||
if (!reportToDatadog) {
|
||||
return;
|
||||
}
|
||||
|
||||
final long reportingTimeInEpochSeconds = OffsetDateTime.now().toInstant().getEpochSecond();
|
||||
|
||||
List<MetricResource> metricResources = List.of(
|
||||
new MetricResource().name("github").type("runner"),
|
||||
new MetricResource().name(imageName).type("image"),
|
||||
new MetricResource().name(dataset).type("dataset"));
|
||||
new MetricResource().name(dataset).type("dataset"),
|
||||
new MetricResource().name(syncMode).type("syncMode"));
|
||||
MetricPayload body =
|
||||
new MetricPayload()
|
||||
.series(
|
||||
|
||||
@@ -40,7 +40,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["id"],
|
||||
"destination_sync_mode": "append"
|
||||
@@ -113,7 +113,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["updated_at"],
|
||||
"destination_sync_mode": "append"
|
||||
@@ -152,7 +152,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["created_at"],
|
||||
"destination_sync_mode": "append"
|
||||
|
||||
@@ -40,7 +40,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["id"],
|
||||
"destination_sync_mode": "append"
|
||||
@@ -113,7 +113,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["updated_at"],
|
||||
"destination_sync_mode": "append"
|
||||
@@ -152,7 +152,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["created_at"],
|
||||
"destination_sync_mode": "append"
|
||||
|
||||
@@ -40,7 +40,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["id"],
|
||||
"destination_sync_mode": "append"
|
||||
@@ -113,7 +113,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["updated_at"],
|
||||
"destination_sync_mode": "append"
|
||||
@@ -152,7 +152,7 @@
|
||||
"supported_sync_modes": ["full_refresh", "incremental"],
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%1$s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": ["created_at"],
|
||||
"destination_sync_mode": "append"
|
||||
|
||||
@@ -69,10 +69,10 @@
|
||||
"source_defined_cursor": true,
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": [],
|
||||
"destination_sync_mode": "overwrite"
|
||||
"destination_sync_mode": "append"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -69,10 +69,10 @@
|
||||
"source_defined_cursor": true,
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": [],
|
||||
"destination_sync_mode": "overwrite"
|
||||
"destination_sync_mode": "append"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -69,10 +69,10 @@
|
||||
"source_defined_cursor": true,
|
||||
"source_defined_primary_key": [["id"]]
|
||||
},
|
||||
"sync_mode": "full_refresh",
|
||||
"sync_mode": "%s",
|
||||
"primary_key": [["id"]],
|
||||
"cursor_field": [],
|
||||
"destination_sync_mode": "overwrite"
|
||||
"destination_sync_mode": "append"
|
||||
}
|
||||
]
|
||||
}
|
||||
|
||||
@@ -9,7 +9,13 @@ spec:
|
||||
- name: main
|
||||
image: airbyte/$HARNESS:dev
|
||||
args:
|
||||
["$CONNECTOR_IMAGE_NAME", "$DATASET", "$STREAM_NUMBER", "$SYNC_MODE"]
|
||||
[
|
||||
"$CONNECTOR_IMAGE_NAME",
|
||||
"$DATASET",
|
||||
"$STREAM_NUMBER",
|
||||
"$SYNC_MODE",
|
||||
"$REPORT_TO_DATADOG",
|
||||
]
|
||||
volumeMounts:
|
||||
- name: secrets-volume
|
||||
mountPath: /airbyte/secrets
|
||||
@@ -25,6 +31,8 @@ spec:
|
||||
value: $DD_API_KEY
|
||||
- name: DD_SITE
|
||||
value: "datadoghq.com"
|
||||
- name: USE_STREAM_CAPABLE_STATE
|
||||
value: "true"
|
||||
volumes:
|
||||
- name: secrets-volume
|
||||
hostPath:
|
||||
|
||||
Reference in New Issue
Block a user