
Destination Performance Harness: Adds support for incremental syncs with GLOBAL state message emission (#26443)

* Adds support for incremental syncs with GLOBAL state message emission

* Adds the new args to the Source harness as well so it doesn't break; bringing full feature parity to the Source perf-harness is future work. Also applies ./gradlew :spotlessJavaApply
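
With this change, a performance run can request incremental mode from the triggering comment, e.g. `dataset=1m stream-numbers=2 sync-mode=incremental` (the supported values are documented in the workflow comment body below).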
Ryan Fu authored 2023-05-23 15:13:31 -07:00, committed by GitHub
parent ba040c9462, commit 8849669420
8 changed files with 144 additions and 41 deletions

File: connector-performance-command.yml

@@ -27,6 +27,10 @@ on:
description: "Number of streams to use for destination performance measurement."
required: false
default: "1"
sync-mode:
description: "Sync mode to use for destination performance measurement."
required: false
default: "full_refresh"
jobs:
uuid:
name: "Custom UUID of workflow run"
@@ -73,7 +77,8 @@ jobs:
comment-id: ${{ github.event.inputs.comment-id }}
body: |
#### Note: The following `dataset=` values are supported: `1m`<sub>(default)</sub>, `10m`, `20m`, `bottleneck_stream1`.
For destination performance only: you can also use `stream-numbers=N` to simulate N number of parallel streams.
For destinations only: you can also use `stream-numbers=N` to simulate N parallel streams. Additionally,
`sync-mode=incremental` is supported for destinations. For example: `dataset=1m stream-numbers=2 sync-mode=incremental`
> :runner: ${{github.event.inputs.connector}} https://github.com/${{github.repository}}/actions/runs/${{github.run_id}}.
- name: Search for valid connector name format
id: regex
@@ -146,6 +151,7 @@ jobs:
CONN: ${{ github.event.inputs.connector }}
DS: ${{ github.event.inputs.dataset }}
STREAM_NUMBER: ${{ github.event.inputs.stream-number }}
SYNC_MODE: ${{ github.event.inputs.sync-mode }}
PREFIX: '{"type":"LOG","log":{"level":"INFO","message":"INFO i.a.i.p.PerformanceTest(runTest):165'
SUFFIX: '"}}'
HARNESS_TYPE: ${{ steps.which-harness.outputs.harness_type }}
@@ -158,6 +164,7 @@ jobs:
export CONNECTOR_IMAGE_NAME=${CONN/connectors/airbyte}:dev
export DATASET=$DS
export STREAM_NUMBER=$STREAM_NUMBER
export SYNC_MODE=$SYNC_MODE
export HARNESS=$HARNESS_TYPE
envsubst < ./tools/bin/run-harness-process.yaml | kubectl create -f -
echo "harness is ${{ steps.which-harness.outputs.harness_type }}"

File: Main.java (destination performance harness)

@@ -11,6 +11,8 @@ import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.SyncMode;
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStream;
@@ -29,12 +31,16 @@ public class Main {
private static final String CREDENTIALS_PATH = "secrets/%s_%s_credentials.json";
public static void main(final String[] args) {
// If updating args for Github Actions, also update the run-performance-test.yml file
// If updating args for Github Actions, also update the run-harness-process.yaml and
// connector-performance-command.yml
log.info("args: {}", Arrays.toString(args));
String image = null;
String dataset = "1m";
int numOfParallelStreams = 1;
String syncMode = "full_refresh";
// TODO (ryankfu): Add a better way to parse arguments. Take a look at {@link Clis.java} for
// references
switch (args.length) {
case 1 -> image = args[0];
case 2 -> {
@@ -46,6 +52,12 @@ public class Main {
dataset = args[1];
numOfParallelStreams = Integer.parseInt(args[2]);
}
case 4 -> {
image = args[0];
dataset = args[1];
numOfParallelStreams = Integer.parseInt(args[2]);
syncMode = args[3];
}
default -> {
log.info("unexpected arguments");
System.exit(1);
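
The TODO above gestures at more general argument handling. A minimal sketch of positional parsing with defaults (not part of this commit; HarnessArgs is a hypothetical holder type):

record HarnessArgs(String image, String dataset, int parallelStreams, String syncMode) {

  // Each trailing argument is optional and overrides a default, so adding a new
  // argument does not require another switch case.
  static HarnessArgs parse(final String[] args) {
    if (args.length < 1 || args.length > 4) {
      throw new IllegalArgumentException("usage: <image> [dataset] [streamNumber] [syncMode]");
    }
    return new HarnessArgs(
        args[0],
        args.length > 1 ? args[1] : "1m",
        args.length > 2 ? Integer.parseInt(args[2]) : 1,
        args.length > 3 ? args[3] : "full_refresh");
  }
}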
@@ -65,6 +77,7 @@ public class Main {
final JsonNode catalog;
try {
catalog = getCatalog(dataset, connector);
updateSyncMode(catalog, syncMode);
duplicateStreams(catalog, numOfParallelStreams);
} catch (final IOException ex) {
throw new IllegalStateException("Failed to read catalog", ex);
@@ -97,6 +110,27 @@ public class Main {
System.exit(0);
}
/**
* Modifies the catalog in place to set the sync mode to INCREMENTAL and the destination sync mode
* to APPEND, matching CDC behavior (see {@link CdcMssqlSourceAcceptanceTest} for reference). If the
* requested syncMode isn't INCREMENTAL, this is a no-op, since the default catalog is FULL_REFRESH.
*
* @param catalog ConfiguredCatalog to be modified
* @param syncMode syncMode to update to
*/
@VisibleForTesting
static void updateSyncMode(final JsonNode catalog, final String syncMode) {
if (syncMode.equals(SyncMode.INCREMENTAL.toString())) {
try {
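// The generated catalog contains a single stream at this point; updateSyncMode runs
// before duplicateStreams, so the updated sync mode carries over to the duplicated streams.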
final ObjectNode streamObject = (ObjectNode) catalog.path("streams").get(0);
streamObject.put("sync_mode", SyncMode.INCREMENTAL.toString());
streamObject.put("destination_sync_mode", DestinationSyncMode.APPEND.toString());
} catch (final Exception e) {
log.error("Failed to update sync mode", e);
}
}
}
/**
* Duplicate the streams in the catalog to emulate parallel streams
*

File: PerformanceHarness.java

@@ -19,6 +19,7 @@ import io.airbyte.config.WorkerDestinationConfig;
import io.airbyte.protocol.models.AirbyteMessage;
import io.airbyte.protocol.models.AirbyteMessage.Type;
import io.airbyte.protocol.models.AirbyteRecordMessage;
import io.airbyte.protocol.models.AirbyteStateMessage;
import io.airbyte.protocol.models.ConfiguredAirbyteCatalog;
import io.airbyte.workers.WorkerConfigs;
import io.airbyte.workers.internal.DefaultAirbyteDestination;
@@ -64,6 +65,7 @@ public class PerformanceHarness {
public static final int PORT2 = 9878;
public static final int PORT3 = 9879;
public static final int PORT4 = 9880;
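// Number of records between emitted state messages when the sync mode is incremental.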
public static final int STATE_FREQUENCY = 10000;
public static final Set<Integer> PORTS = Set.of(PORT1, PORT2, PORT3, PORT4);
@@ -124,6 +126,8 @@ public class PerformanceHarness {
}
});
final String syncMode = this.catalog.getStreams().get(0).getSyncMode().name();
final BufferedReader reader = loadFileFromUrl();
final Pattern pattern = Pattern.compile(",");
@@ -165,6 +169,16 @@ public class PerformanceHarness {
log.info("current throughput({}): {} total MB {}", counter, (totalBytes / MEGABYTE) / ((System.currentTimeMillis() - start) / 1000.0),
totalBytes / MEGABYTE);
}
// If sync mode is incremental, send a state message every STATE_FREQUENCY (10,000) records
if (syncMode.equals("INCREMENTAL") && counter % STATE_FREQUENCY == 0) {
final AirbyteMessage stateMessage = new AirbyteMessage()
.withType(Type.STATE)
.withState(new AirbyteStateMessage()
.withType(AirbyteStateMessage.AirbyteStateType.GLOBAL)
.withData(Jsons.deserialize("{\"checkpoint\": \"" + counter + "\"}")));
destination.accept(stateMessage);
}
}
}
destination.notifyEndOfInput();
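
For reference, each state message serializes to roughly the following line on the destination's stdin (a sketch; the exact envelope comes from the protocol models):

{"type":"STATE","state":{"type":"GLOBAL","data":{"checkpoint":"10000"}}}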

File: MainTest.java

@@ -9,49 +9,80 @@ import static org.junit.jupiter.api.Assertions.*;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.airbyte.protocol.models.DestinationSyncMode;
import io.airbyte.protocol.models.SyncMode;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
class MainTest {
private static String simpleCatalog;
@BeforeAll
public static void setup() {
simpleCatalog = """
{
"streams": [
{
"stream": {
"name": "users",
"namespace": "PERF_TEST_HARNESS",
"json_schema": {
"type": "object",
"properties": {
"id": {
"type": "number",
"airbyte_type": "integer"
},
"academic_degree": {
"type": "string"
}
}
},
"default_cursor_field": [],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"primary_key": [["id"]],
"cursor_field": ["updated_at"],
"destination_sync_mode": "overwrite"
}
]
}
""";
}
@Test
void testDuplicateStreams() throws JsonProcessingException {
final String simpleCatalog = """
{
"streams": [
{
"stream": {
"name": "users",
"namespace": "PERF_TEST_HARNESS",
"json_schema": {
"type": "object",
"properties": {
"id": {
"type": "number",
"airbyte_type": "integer"
},
"academic_degree": {
"type": "string"
}
}
},
"default_cursor_field": [],
"supported_sync_modes": ["full_refresh", "incremental"],
"source_defined_primary_key": [["id"]]
},
"sync_mode": "full_refresh",
"primary_key": [["id"]],
"cursor_field": ["updated_at"],
"destination_sync_mode": "overwrite"
}
]
}
""";
final ObjectMapper objectMapper = new ObjectMapper();
final JsonNode root = objectMapper.readTree(simpleCatalog);
final JsonNode duplicateRoot = root.deepCopy();
final int duplicateFactor = 10;
Main.duplicateStreams(root, duplicateFactor);
assertEquals(duplicateFactor, root.get("streams").size());
assertEquals("users9", root.path("streams").get(9).path("stream").path("name").asText());
Main.duplicateStreams(duplicateRoot, duplicateFactor);
assertEquals(duplicateFactor, duplicateRoot.get("streams").size());
assertEquals("users9", duplicateRoot.path("streams").get(9).path("stream").path("name").asText());
}
@Test
void testUpdateSyncModeIncrementalAppend() throws JsonProcessingException {
final ObjectMapper objectMapper = new ObjectMapper();
final JsonNode root = objectMapper.readTree(simpleCatalog);
final JsonNode duplicateRoot = root.deepCopy();
Main.updateSyncMode(duplicateRoot, "incremental");
assertEquals(SyncMode.INCREMENTAL.toString(), duplicateRoot.path("streams").get(0).path("sync_mode").asText());
assertEquals(DestinationSyncMode.APPEND.toString(), duplicateRoot.path("streams").get(0).path("destination_sync_mode").asText());
}
@Test
void testUpdateSyncModeFullRefreshNoop() throws JsonProcessingException {
// expects a no-op when updating sync mode to full_refresh
final ObjectMapper objectMapper = new ObjectMapper();
final JsonNode root = objectMapper.readTree(simpleCatalog);
final JsonNode duplicateRoot = root.deepCopy();
Main.updateSyncMode(duplicateRoot, "full_refresh");
assertEquals(SyncMode.FULL_REFRESH.toString(), duplicateRoot.path("streams").get(0).path("sync_mode").asText());
assertEquals(DestinationSyncMode.OVERWRITE.toString(), duplicateRoot.path("streams").get(0).path("destination_sync_mode").asText());
}
}

File: Main.java (source performance harness)

@@ -25,13 +25,28 @@ public class Main {
log.info("args: {}", Arrays.toString(args));
String image = null;
String dataset = "1m";
// TODO: (ryankfu) add function parity with destination_performance
int numOfParallelStreams = 1;
String syncMode = "full_refresh";
// TODO: (ryankfu) Integrate something akin to {@link Clis} for parsing arguments.
switch (args.length) {
case 1 -> image = args[0];
case 2 -> {
image = args[0];
dataset = args[1];
}
case 3 -> {
image = args[0];
dataset = args[1];
numOfParallelStreams = Integer.parseInt(args[2]);
}
case 4 -> {
image = args[0];
dataset = args[1];
numOfParallelStreams = Integer.parseInt(args[2]);
syncMode = args[3];
}
default -> {
log.info("unexpected arguments");
System.exit(1);

File: MySqlSource.java

@@ -393,7 +393,7 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
@Override
public JdbcDatabase createDatabase(final JsonNode sourceConfig) throws SQLException {
// return super.createDatabase(sourceConfig, this::getConnectionProperties);
final JsonNode jdbcConfig = toDatabaseConfig(sourceConfig);
// Create the data source
final DataSource dataSource = DataSourceFactory.create(
@@ -419,7 +419,8 @@ public class MySqlSource extends AbstractJdbcSource<MysqlType> implements Source
public Map<String, String> getConnectionProperties(final JsonNode config) {
final Map<String, String> customProperties =
config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)
? parseJdbcParameters(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText(), DEFAULT_JDBC_PARAMETERS_DELIMITER) : new HashMap<>();
? parseJdbcParameters(config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText(), DEFAULT_JDBC_PARAMETERS_DELIMITER)
: new HashMap<>();
final Map<String, String> defaultProperties = JdbcDataSourceUtils.getDefaultConnectionProperties(config);
assertCustomParametersDontOverwriteDefaultParameters(customProperties, defaultProperties);
return MoreMaps.merge(customProperties, defaultProperties);

File: MySqlSourceTests.java

@@ -195,10 +195,10 @@ public class MySqlSourceTests {
}
@Test
void testParseJdbcParameters() {
Map<String, String> parameters = MySqlSource.parseJdbcParameters("theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar", "&");
Map<String, String> parameters =
MySqlSource.parseJdbcParameters("theAnswerToLiveAndEverything=42&sessionVariables=max_execution_time=10000&foo=bar", "&");
assertEquals("max_execution_time=10000", parameters.get("sessionVariables"));
assertEquals("42", parameters.get("theAnswerToLiveAndEverything"));
assertEquals("bar", parameters.get("foo"));
@@ -229,4 +229,5 @@ public class MySqlSourceTests {
assertEquals(AirbyteConnectionStatus.Status.SUCCEEDED, check.getStatus());
}
}
}

File: run-harness-process.yaml

@@ -8,7 +8,7 @@ spec:
containers:
- name: main
image: airbyte/$HARNESS:dev
args: ["$CONNECTOR_IMAGE_NAME", "$DATASET", "$STREAM_NUMBER"]
args: ["$CONNECTOR_IMAGE_NAME", "$DATASET", "$STREAM_NUMBER", "$SYNC_MODE"]
volumeMounts:
- name: secrets-volume
mountPath: /airbyte/secrets