diff --git a/.github/workflows/connector-performance-command.yml b/.github/workflows/connector-performance-command.yml
index c8b3b1d5581..391e6736a71 100644
--- a/.github/workflows/connector-performance-command.yml
+++ b/.github/workflows/connector-performance-command.yml
@@ -44,7 +44,15 @@ on:
       sync-mode:
         description: "Sync mode to use for destination performance measurement."
         required: false
+        type: choice
+        options:
+          - full_refresh
+          - incremental
         default: "full_refresh"
+      reportToDatadog:
+        description: "Whether to report the performance test results to Datadog."
+        required: false
+        default: "false"
 jobs:
   uuid:
     name: "Custom UUID of workflow run"
@@ -174,6 +182,7 @@ jobs:
           DS: ${{ github.event.inputs.dataset }}
           STREAM_NUMBER: ${{ github.event.inputs.stream-number }}
           SYNC_MODE: ${{ github.event.inputs.sync-mode }}
+          REPORT_TO_DATADOG: ${{ inputs.reportToDatadog }}
           PREFIX: '{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.p.PerformanceTest(runTest):165'
           SUFFIX: '"}}'
           HARNESS_TYPE: ${{ steps.which-harness.outputs.harness_type }}
diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/source_performance/Main.java b/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/source_performance/Main.java
index 012841a9a89..56a0122e059 100644
--- a/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/source_performance/Main.java
+++ b/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/source_performance/Main.java
@@ -8,8 +8,8 @@ import com.fasterxml.jackson.databind.JsonNode;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import io.airbyte.commons.io.IOs;
 import io.airbyte.commons.json.Jsons;
+import io.airbyte.commons.resources.MoreResources;
 import java.io.IOException;
-import java.io.InputStream;
 import java.nio.file.Files;
 import java.nio.file.Path;
 import java.util.Arrays;
@@ -28,6 +28,7 @@ public class Main {
     // TODO: (ryankfu) add function parity with destination_performance
     int numOfParallelStreams = 1;
     String syncMode = "full_refresh";
+    boolean reportToDatadog = false;
 
     // TODO: (ryankfu) Integrated something akin to {@link Clis} for parsing arguments.
     switch (args.length) {
@@ -47,6 +48,13 @@ public class Main {
         numOfParallelStreams = Integer.parseInt(args[2]);
         syncMode = args[3];
       }
+      case 5 -> {
+        image = args[0];
+        dataset = args[1];
+        numOfParallelStreams = Integer.parseInt(args[2]);
+        syncMode = args[3];
+        reportToDatadog = Boolean.parseBoolean(args[4]);
+      }
       default -> {
         log.info("unexpected arguments");
         System.exit(1);
@@ -65,7 +73,7 @@ public class Main {
 
     final JsonNode catalog;
     try {
-      catalog = getCatalog(dataset, connector);
+      catalog = getCatalog(dataset, connector, syncMode);
     } catch (final IOException ex) {
       throw new IllegalStateException("Failed to read catalog", ex);
     }
@@ -79,6 +87,8 @@ public class Main {
     final PerformanceTest test = new PerformanceTest(
         image,
         dataset,
+        syncMode,
+        reportToDatadog,
         config.toString(),
         catalog.toString());
     test.runTest();
@@ -90,11 +100,15 @@ public class Main {
     System.exit(0);
   }
 
-  static JsonNode getCatalog(final String dataset, final String connector) throws IOException {
+  /**
+   * Loads the catalog template for the given connector and dataset and fills in the sync mode.
+   * The resource is used as a format string, so a literal '%' inside a catalog must be written as '%%'.
+   */
+  static JsonNode getCatalog(final String dataset, final String connector, final String syncMode) throws IOException {
     final ObjectMapper objectMapper = new ObjectMapper();
     final String catalogFilename = "catalogs/%s/%s_catalog.json".formatted(connector, dataset);
-    final InputStream is = Thread.currentThread().getContextClassLoader().getResourceAsStream(catalogFilename);
-    return objectMapper.readTree(is);
+    final String template = MoreResources.readResource(catalogFilename);
+    return objectMapper.readTree(template.formatted(syncMode));
   }
 
 }
diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/source_performance/PerformanceTest.java b/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/source_performance/PerformanceTest.java
index 29cfe1b549f..4166aa28f80 100644
--- a/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/source_performance/PerformanceTest.java
+++ b/airbyte-integrations/connectors-performance/source-harness/src/main/java/io/airbyte/integrations/source_performance/PerformanceTest.java
@@ -55,13 +55,23 @@ public class PerformanceTest {
   public static final double MEGABYTE = Math.pow(1024, 2);
   private final String imageName;
   private final String dataset;
+  private final String syncMode;
+  private final boolean reportToDatadog;
   private final JsonNode config;
   private final ConfiguredAirbyteCatalog catalog;
 
-  PerformanceTest(final String imageName, final String dataset, final String config, final String catalog) throws JsonProcessingException {
+  PerformanceTest(final String imageName,
+                  final String dataset,
+                  final String syncMode,
+                  final boolean reportToDatadog,
+                  final String config,
+                  final String catalog)
+      throws JsonProcessingException {
     final ObjectMapper mapper = new ObjectMapper();
     this.imageName = imageName;
     this.dataset = dataset;
+    this.syncMode = syncMode;
+    this.reportToDatadog = reportToDatadog;
     this.config = mapper.readTree(config);
     this.catalog = Jsons.deserialize(catalog, ConfiguredAirbyteCatalog.class);
   }
@@ -118,6 +128,10 @@ public class PerformanceTest {
             totalBytes / MEGABYTE);
       }
     }
+    // Fail before logging success or reporting metrics when the source exited with a non-zero code.
+    if (source.getExitValue() != 0) {
+      throw new RuntimeException("Source failed with exit code: " + source.getExitValue());
+    }
     log.info("Test ended successfully");
     final var end = System.currentTimeMillis();
     final var totalMB = totalBytes / MEGABYTE;
@@ -126,13 +140,17 @@ public class PerformanceTest {
     final var throughput = totalMB / totalTimeSecs;
     log.info("total secs: {}. total MB read: {}, rps: {}, throughput: {}", totalTimeSecs, totalMB, rps, throughput);
     source.close();
+    if (!reportToDatadog) {
+      return;
+    }
 
     final long reportingTimeInEpochSeconds = OffsetDateTime.now().toInstant().getEpochSecond();
 
     List<MetricResource> metricResources = List.of(
         new MetricResource().name("github").type("runner"),
         new MetricResource().name(imageName).type("image"),
-        new MetricResource().name(dataset).type("dataset"));
+        new MetricResource().name(dataset).type("dataset"),
+        new MetricResource().name(syncMode).type("syncMode"));
 
     MetricPayload body = new MetricPayload()
         .series(
diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/10m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/10m_catalog.json
index 75d4fc01be8..48e4f23dfd5 100644
--- a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/10m_catalog.json
+++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/10m_catalog.json
@@ -40,7 +40,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["id"],
       "destination_sync_mode": "append"
@@ -113,7 +113,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["updated_at"],
       "destination_sync_mode": "append"
@@ -152,7 +152,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["created_at"],
       "destination_sync_mode": "append"
diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/1m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/1m_catalog.json
index ad274523846..f9ebc75dbb4 100644
--- a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/1m_catalog.json
+++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/1m_catalog.json
@@ -40,7 +40,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["id"],
       "destination_sync_mode": "append"
@@ -113,7 +113,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["updated_at"],
       "destination_sync_mode": "append"
@@ -152,7 +152,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["created_at"],
       "destination_sync_mode": "append"
diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/20m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/20m_catalog.json
index 55c38e69c72..4f9baa60df9 100644
--- a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/20m_catalog.json
+++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-mysql/20m_catalog.json
@@ -40,7 +40,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["id"],
       "destination_sync_mode": "append"
@@ -113,7 +113,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["updated_at"],
       "destination_sync_mode": "append"
@@ -152,7 +152,7 @@
         "supported_sync_modes": ["full_refresh", "incremental"],
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": ["created_at"],
       "destination_sync_mode": "append"
diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/10m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/10m_catalog.json
index d99fccf9d06..a130885e5ef 100644
--- a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/10m_catalog.json
+++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/10m_catalog.json
@@ -69,10 +69,10 @@
         "source_defined_cursor": true,
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": [],
-      "destination_sync_mode": "overwrite"
+      "destination_sync_mode": "append"
     }
   ]
 }
diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/1m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/1m_catalog.json
index d99fccf9d06..a130885e5ef 100644
--- a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/1m_catalog.json
+++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/1m_catalog.json
@@ -69,10 +69,10 @@
         "source_defined_cursor": true,
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": [],
-      "destination_sync_mode": "overwrite"
+      "destination_sync_mode": "append"
     }
   ]
 }
diff --git a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/20m_catalog.json b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/20m_catalog.json
index d99fccf9d06..a130885e5ef 100644
--- a/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/20m_catalog.json
+++ b/airbyte-integrations/connectors-performance/source-harness/src/main/resources/catalogs/source-postgres/20m_catalog.json
@@ -69,10 +69,10 @@
         "source_defined_cursor": true,
         "source_defined_primary_key": [["id"]]
       },
-      "sync_mode": "full_refresh",
+      "sync_mode": "%1$s",
       "primary_key": [["id"]],
       "cursor_field": [],
-      "destination_sync_mode": "overwrite"
+      "destination_sync_mode": "append"
     }
   ]
 }
diff --git a/tools/bin/run-harness-process.yaml b/tools/bin/run-harness-process.yaml
index 166e002622c..81411d31471 100644
--- a/tools/bin/run-harness-process.yaml
+++ b/tools/bin/run-harness-process.yaml
@@ -9,7 +9,13 @@ spec:
     - name: main
       image: airbyte/$HARNESS:dev
       args:
-        ["$CONNECTOR_IMAGE_NAME", "$DATASET", "$STREAM_NUMBER", "$SYNC_MODE"]
+        [
+          "$CONNECTOR_IMAGE_NAME",
+          "$DATASET",
+          "$STREAM_NUMBER",
+          "$SYNC_MODE",
+          "$REPORT_TO_DATADOG",
+        ]
       volumeMounts:
         - name: secrets-volume
           mountPath: /airbyte/secrets
@@ -25,6 +31,8 @@ spec:
           value: $DD_API_KEY
         - name: DD_SITE
           value: "datadoghq.com"
+        - name: USE_STREAM_CAPABLE_STATE
+          value: "true"
   volumes:
     - name: secrets-volume
       hostPath: