From 07e2232025e8fe83f433579b462263dfd337c4fd Mon Sep 17 00:00:00 2001 From: Edward Gao Date: Thu, 17 Feb 2022 15:17:03 -0800 Subject: [PATCH] Track const config values in analytics (#10120) --- .../java/io/airbyte/commons/json/Jsons.java | 61 +++++++- .../persistence/job_tracker/JobTracker.java | 134 ++++++++++++++---- .../src/main/resources/example_config.json | 21 +++ .../main/resources/example_config_schema.json | 67 +++++++++ .../job_tracker/JobTrackerTest.java | 73 ++++++++-- 5 files changed, 316 insertions(+), 40 deletions(-) create mode 100644 airbyte-scheduler/persistence/src/main/resources/example_config_schema.json diff --git a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java index 6afcda2f748..96235a646ab 100644 --- a/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java +++ b/airbyte-commons/src/main/java/io/airbyte/commons/json/Jsons.java @@ -4,6 +4,9 @@ package io.airbyte.commons.json; +import static java.util.Collections.singletonMap; +import static java.util.stream.Collectors.toMap; + import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.core.type.TypeReference; import com.fasterxml.jackson.core.util.DefaultPrettyPrinter; @@ -19,9 +22,12 @@ import io.airbyte.commons.stream.MoreStreams; import java.io.IOException; import java.util.Arrays; import java.util.Collections; +import java.util.HashMap; import java.util.HashSet; +import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Map.Entry; import java.util.Optional; import java.util.Set; import java.util.function.BiConsumer; @@ -198,8 +204,59 @@ public class Jsons { } /** - * By the Jackson DefaultPrettyPrinter prints objects with an extra space as follows: {"name" : - * "airbyte"}. We prefer {"name": "airbyte"}. + * Flattens an ObjectNode, or dumps it into a {null: value} map if it's not an object. 
+ */ + public static Map flatten(final JsonNode node) { + if (node.isObject()) { + final Map output = new HashMap<>(); + for (final Iterator> it = node.fields(); it.hasNext(); ) { + final Entry entry = it.next(); + final String field = entry.getKey(); + final JsonNode value = entry.getValue(); + mergeMaps(output, field, flatten(value)); + } + return output; + } else { + final Object value; + if (node.isBoolean()) { + value = node.asBoolean(); + } else if (node.isLong()) { + value = node.asLong(); + } else if (node.isInt()) { + value = node.asInt(); + } else if (node.isDouble()) { + value = node.asDouble(); + } else if (node.isValueNode() && !node.isNull()) { + value = node.asText(); + } else { + // Fallback handling for e.g. arrays + value = node.toString(); + } + return singletonMap(null, value); + } + } + + /** + * Prepend all keys in subMap with prefix, then merge that map into originalMap. + *

+ * If subMap contains a null key, then instead it is replaced with prefix. I.e. {null: value} is treated as {prefix: value} when merging into + * originalMap. + */ + public static void mergeMaps(final Map originalMap, final String prefix, final Map subMap) { + originalMap.putAll(subMap.entrySet().stream().collect(toMap( + e -> { + final String key = e.getKey(); + if (key != null) { + return prefix + "." + key; + } else { + return prefix; + } + }, + Entry::getValue))); + } + + /** + * By the Jackson DefaultPrettyPrinter prints objects with an extra space as follows: {"name" : "airbyte"}. We prefer {"name": "airbyte"}. */ private static class JsonPrettyPrinter extends DefaultPrettyPrinter { diff --git a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java index c2624e22a1d..56cbbd80682 100644 --- a/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java +++ b/airbyte-scheduler/persistence/src/main/java/io/airbyte/scheduler/persistence/job_tracker/JobTracker.java @@ -4,13 +4,17 @@ package io.airbyte.scheduler.persistence.job_tracker; +import static java.util.Collections.emptyMap; +import static java.util.Collections.singletonMap; + import com.fasterxml.jackson.databind.JsonNode; -import com.fasterxml.jackson.databind.node.ObjectNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Preconditions; import com.google.common.collect.ImmutableMap; import com.google.common.collect.ImmutableMap.Builder; import io.airbyte.analytics.TrackingClient; +import io.airbyte.commons.json.Jsons; import io.airbyte.commons.lang.Exceptions; import io.airbyte.commons.map.MoreMaps; import io.airbyte.config.JobConfig; @@ -28,12 +32,13 @@ import io.airbyte.protocol.models.ConfiguredAirbyteStream; 
import io.airbyte.scheduler.models.Job; import io.airbyte.scheduler.persistence.JobPersistence; import io.airbyte.scheduler.persistence.WorkspaceHelper; +import io.airbyte.validation.json.JsonSchemaValidator; import io.airbyte.validation.json.JsonValidationException; import java.io.IOException; -import java.util.Collections; import java.util.HashMap; import java.util.Iterator; import java.util.Map; +import java.util.Map.Entry; import java.util.UUID; public class JobTracker { @@ -50,6 +55,8 @@ public class JobTracker { public static final String OPERATION = "operation."; public static final String SET = "set"; + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private final ConfigRepository configRepository; private final JobPersistence jobPersistence; private final WorkspaceHelper workspaceHelper; @@ -118,8 +125,10 @@ public class JobTracker { Preconditions.checkArgument(allowedJob, "Job type " + configType + " is not allowed!"); final long jobId = job.getId(); final UUID connectionId = UUID.fromString(job.getScope()); - final UUID sourceDefinitionId = configRepository.getSourceDefinitionFromConnection(connectionId).getSourceDefinitionId(); - final UUID destinationDefinitionId = configRepository.getDestinationDefinitionFromConnection(connectionId).getDestinationDefinitionId(); + final StandardSourceDefinition sourceDefinition = configRepository.getSourceDefinitionFromConnection(connectionId); + final UUID sourceDefinitionId = sourceDefinition.getSourceDefinitionId(); + final StandardDestinationDefinition destinationDefinition = configRepository.getDestinationDefinitionFromConnection(connectionId); + final UUID destinationDefinitionId = destinationDefinition.getDestinationDefinitionId(); final Map jobMetadata = generateJobMetadata(String.valueOf(jobId), configType, job.getAttemptsCount()); final Map jobAttemptMetadata = generateJobAttemptMetadata(job.getId(), jobState); @@ -127,7 +136,10 @@ public class JobTracker { final Map 
destinationDefMetadata = generateDestinationDefinitionMetadata(destinationDefinitionId); final Map syncMetadata = generateSyncMetadata(connectionId); final Map stateMetadata = generateStateMetadata(jobState); - final Map syncConfigMetadata = generateSyncConfigMetadata(job.getConfig()); + final Map syncConfigMetadata = generateSyncConfigMetadata( + job.getConfig(), + sourceDefinition.getSpec().getConnectionSpecification(), + destinationDefinition.getSpec().getConnectionSpecification()); final UUID workspaceId = workspaceHelper.getWorkspaceForJobIdIgnoreExceptions(jobId); track(workspaceId, @@ -142,18 +154,20 @@ public class JobTracker { }); } - private Map generateSyncConfigMetadata(final JobConfig config) { + private Map generateSyncConfigMetadata(final JobConfig config, + final JsonNode sourceConfigSchema, + final JsonNode destinationConfigSchema) { if (config.getConfigType() == ConfigType.SYNC) { final JsonNode sourceConfiguration = config.getSync().getSourceConfiguration(); final JsonNode destinationConfiguration = config.getSync().getDestinationConfiguration(); - final Map sourceMetadata = configToMetadata(CONFIG + ".source", sourceConfiguration); - final Map destinationMetadata = configToMetadata(CONFIG + ".destination", destinationConfiguration); + final Map sourceMetadata = configToMetadata(CONFIG + ".source", sourceConfiguration, sourceConfigSchema); + final Map destinationMetadata = configToMetadata(CONFIG + ".destination", destinationConfiguration, destinationConfigSchema); final Map catalogMetadata = getCatalogMetadata(config.getSync().getConfiguredAirbyteCatalog()); return MoreMaps.merge(sourceMetadata, destinationMetadata, catalogMetadata); } else { - return Collections.emptyMap(); + return emptyMap(); } } @@ -168,30 +182,94 @@ public class JobTracker { return output; } - protected static Map configToMetadata(final String jsonPath, final JsonNode config) { + /** + * Flattens a config into a map. Uses the schema to determine which fields are const (i.e. 
+ * non-sensitive). Non-const, non-boolean values are replaced with {@link #SET} to avoid leaking + * potentially-sensitive information. + * <p>

+ * anyOf/allOf schemas are treated as non-const values. These aren't (currently) used in config + * schemas anyway. + * + * @param jsonPath A prefix to add to all the keys in the returned map, with a period (`.`) + * separator + * @param schema The JSON schema that {@code config} conforms to + */ + protected static Map configToMetadata(final String jsonPath, final JsonNode config, final JsonNode schema) { + final Map metadata = configToMetadata(config, schema); + // Prepend all the keys with the root jsonPath + // But leave the values unchanged final Map output = new HashMap<>(); + Jsons.mergeMaps(output, jsonPath, metadata); + return output; + } - if (config.isObject()) { - final ObjectNode node = (ObjectNode) config; - for (final Iterator> it = node.fields(); it.hasNext();) { - final var entry = it.next(); - final var field = entry.getKey(); - final var fieldJsonPath = jsonPath + "." + field; - final var child = entry.getValue(); - - if (child.isBoolean()) { - output.put(fieldJsonPath, child.asBoolean()); - } else if (!child.isNull()) { - if (child.isObject()) { - output.putAll(configToMetadata(fieldJsonPath, child)); - } else if (!child.isTextual() || (child.isTextual() && !child.asText().isEmpty())) { - output.put(fieldJsonPath, SET); - } + /** + * Does the actually interesting bits of configToMetadata. If config is an object, returns a + * flattened map. If config is _not_ an object (i.e. it's a primitive string/number/etc, or it's an + * array) then returns a map of {null: toMetadataValue(config)}. + */ + private static Map configToMetadata(final JsonNode config, final JsonNode schema) { + if (schema.hasNonNull("const")) { + // If this schema is a const, then just dump it into a map: + // * If it's an object, flatten it + // * Otherwise, do some basic conversions to value-ish data. + // It would be a weird thing to declare const: null, but in that case we don't want to report null + // anyway, so explicitly use hasNonNull. 
+ return Jsons.flatten(config); + } else if (schema.has("oneOf")) { + // If this schema is a oneOf, then find the first sub-schema which the config matches + // and use that sub-schema to convert the config to a map + final JsonSchemaValidator validator = new JsonSchemaValidator(); + for (final Iterator it = schema.get("oneOf").elements(); it.hasNext();) { + final JsonNode subSchema = it.next(); + if (validator.test(subSchema, config)) { + return configToMetadata(config, subSchema); } } - } + // If we didn't match any of the subschemas, then something is wrong. Bail out silently. + return emptyMap(); + } else if (config.isObject()) { + // If the schema is not a oneOf, but the config is an object (i.e. the schema has "type": "object") + // then we need to recursively convert each field of the object to a map. + final Map output = new HashMap<>(); + final JsonNode maybeProperties = schema.get("properties"); - return output; + // If additionalProperties is not set, or it's a boolean, then there's no schema for additional properties. Use the accept-all schema. + // Otherwise, it's an actual schema. 
+ final JsonNode maybeAdditionalProperties = schema.get("additionalProperties"); + final JsonNode additionalPropertiesSchema; + if (maybeAdditionalProperties == null || maybeAdditionalProperties.isBoolean()) { + additionalPropertiesSchema = OBJECT_MAPPER.createObjectNode(); + } else { + additionalPropertiesSchema = maybeAdditionalProperties; + } + + for (final Iterator> it = config.fields(); it.hasNext(); ) { + final Entry entry = it.next(); + final String field = entry.getKey(); + final JsonNode value = entry.getValue(); + + final JsonNode propertySchema; + if (maybeProperties != null && maybeProperties.hasNonNull(field)) { + // If this property is explicitly declared, then use its schema + propertySchema = maybeProperties.get(field); + } else { + // otherwise, use the additionalProperties schema + propertySchema = additionalPropertiesSchema; + } + + Jsons.mergeMaps(output, field, configToMetadata(value, propertySchema)); + } + return output; + } else if (config.isBoolean()) { + return singletonMap(null, config.asBoolean()); + } else if ((!config.isTextual() && !config.isNull()) || (config.isTextual() && !config.asText().isEmpty())) { + // This is either non-textual (e.g. 
integer, array, etc) or non-empty text + return singletonMap(null, SET); + } else { + // Otherwise, this is an empty string, so just ignore it + return emptyMap(); + } } private Map generateSyncMetadata(final UUID connectionId) throws ConfigNotFoundException, IOException, JsonValidationException { diff --git a/airbyte-scheduler/persistence/src/main/resources/example_config.json b/airbyte-scheduler/persistence/src/main/resources/example_config.json index 11a101d190c..c3f2f31fc16 100644 --- a/airbyte-scheduler/persistence/src/main/resources/example_config.json +++ b/airbyte-scheduler/persistence/src/main/resources/example_config.json @@ -5,6 +5,27 @@ "empty_string": "", "null_value": null, "one_of": { + "type_key": "foo", "some_key": 100 + }, + "const_object": { + "sub_key": "bar", + "sub_array": [1, 2, 3], + "sub_object": { + "sub_sub_key": "baz" + } + }, + "const_null": null, + "additionalPropertiesUnset": { + "foo": "bar" + }, + "additionalPropertiesBoolean": { + "foo": "bar" + }, + "additionalPropertiesSchema": { + "foo": 42 + }, + "additionalPropertiesConst": { + "foo": 42 } } diff --git a/airbyte-scheduler/persistence/src/main/resources/example_config_schema.json b/airbyte-scheduler/persistence/src/main/resources/example_config_schema.json new file mode 100644 index 00000000000..27ceeb10672 --- /dev/null +++ b/airbyte-scheduler/persistence/src/main/resources/example_config_schema.json @@ -0,0 +1,67 @@ +{ + "type": "object", + "properties": { + "username": { + "type": "string" + }, + "password": { + "type": "string" + }, + "has_ssl": { + "type": "boolean" + }, + "empty_string": { + "type": "string" + }, + "null_value": { + "type": "null" + }, + "one_of": { + "type": "object", + "oneOf": [ + { + "type": "object", + "properties": { + "type_key": { + "const": "foo" + }, + "some_key": { + "type": "integer" + } + } + } + ] + }, + "const_object": { + "const": { + "sub_key": "bar", + "sub_array": [1, 2, 3], + "sub_object": { + "sub_sub_key": "baz" + } + } + }, + 
"const_null": { + "const": null + }, + "additionalPropertiesUnset": { + "type": "object" + }, + "additionalPropertiesBoolean": { + "type": "object", + "additionalProperties": true + }, + "additionalPropertiesSchema": { + "type": "object", + "additionalProperties": { + "type": "integer" + } + }, + "additionalPropertiesConst": { + "type": "object", + "additionalProperties": { + "const": 42 + } + } + } +} diff --git a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java index 3e5bf9a0b0c..c57b5855075 100644 --- a/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java +++ b/airbyte-scheduler/persistence/src/test/java/io/airbyte/scheduler/persistence/job_tracker/JobTrackerTest.java @@ -11,7 +11,9 @@ import static org.mockito.Mockito.verify; import static org.mockito.Mockito.verifyNoInteractions; import static org.mockito.Mockito.when; +import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.collect.ImmutableMap; import io.airbyte.analytics.TrackingClient; import io.airbyte.commons.json.Jsons; @@ -36,6 +38,7 @@ import io.airbyte.config.persistence.ConfigNotFoundException; import io.airbyte.config.persistence.ConfigRepository; import io.airbyte.protocol.models.ConfiguredAirbyteCatalog; import io.airbyte.protocol.models.ConfiguredAirbyteStream; +import io.airbyte.protocol.models.ConnectorSpecification; import io.airbyte.protocol.models.DestinationSyncMode; import io.airbyte.protocol.models.SyncMode; import io.airbyte.scheduler.models.Attempt; @@ -57,6 +60,8 @@ import org.junit.jupiter.api.Test; class JobTrackerTest { + private static final ObjectMapper OBJECT_MAPPER = new ObjectMapper(); + private static final UUID WORKSPACE_ID = 
UUID.randomUUID(); private static final String WORKSPACE_NAME = "WORKSPACE_TEST"; private static final UUID JOB_ID = UUID.randomUUID(); @@ -100,6 +105,38 @@ class JobTrackerTest { .put("operation_count", 0) .build(); + private static final ConnectorSpecification SOURCE_SPEC; + private static final ConnectorSpecification DESTINATION_SPEC; + + static { + try { + SOURCE_SPEC = new ConnectorSpecification().withConnectionSpecification(OBJECT_MAPPER.readTree( + """ + { + "type": "object", + "properties": { + "key": { + "type": "string" + } + } + } + """)); + DESTINATION_SPEC = new ConnectorSpecification().withConnectionSpecification(OBJECT_MAPPER.readTree( + """ + { + "type": "object", + "properties": { + "key": { + "type": "boolean" + } + } + } + """)); + } catch (final JsonProcessingException e) { + throw new RuntimeException(e); + } + } + private ConfigRepository configRepository; private JobPersistence jobPersistence; @@ -259,13 +296,25 @@ class JobTrackerTest { final String configJson = MoreResources.readResource("example_config.json"); final JsonNode config = Jsons.deserialize(configJson); - final Map expected = ImmutableMap.of( - JobTracker.CONFIG + ".username", JobTracker.SET, - JobTracker.CONFIG + ".has_ssl", false, - JobTracker.CONFIG + ".password", JobTracker.SET, - JobTracker.CONFIG + ".one_of.some_key", JobTracker.SET); + final String schemaJson = MoreResources.readResource("example_config_schema.json"); + final JsonNode schema = Jsons.deserialize(schemaJson); - final Map actual = JobTracker.configToMetadata(JobTracker.CONFIG, config); + final Map expected = new ImmutableMap.Builder() + .put(JobTracker.CONFIG + ".username", JobTracker.SET) + .put(JobTracker.CONFIG + ".has_ssl", false) + .put(JobTracker.CONFIG + ".password", JobTracker.SET) + .put(JobTracker.CONFIG + ".one_of.type_key", "foo") + .put(JobTracker.CONFIG + ".one_of.some_key", JobTracker.SET) + .put(JobTracker.CONFIG + ".const_object.sub_key", "bar") + .put(JobTracker.CONFIG + 
".const_object.sub_array", "[1,2,3]") + .put(JobTracker.CONFIG + ".const_object.sub_object.sub_sub_key", "baz") + .put(JobTracker.CONFIG + ".additionalPropertiesUnset.foo", JobTracker.SET) + .put(JobTracker.CONFIG + ".additionalPropertiesBoolean.foo", JobTracker.SET) + .put(JobTracker.CONFIG + ".additionalPropertiesSchema.foo", JobTracker.SET) + .put(JobTracker.CONFIG + ".additionalPropertiesConst.foo", 42) + .build(); + + final Map actual = JobTracker.configToMetadata(JobTracker.CONFIG, config, schema); assertEquals(expected, actual); } @@ -305,26 +354,30 @@ class JobTrackerTest { .withSourceDefinitionId(UUID1) .withName(SOURCE_DEF_NAME) .withDockerRepository(CONNECTOR_REPOSITORY) - .withDockerImageTag(CONNECTOR_VERSION)); + .withDockerImageTag(CONNECTOR_VERSION) + .withSpec(SOURCE_SPEC)); when(configRepository.getDestinationDefinitionFromConnection(CONNECTION_ID)) .thenReturn(new StandardDestinationDefinition() .withDestinationDefinitionId(UUID2) .withName(DESTINATION_DEF_NAME) .withDockerRepository(CONNECTOR_REPOSITORY) - .withDockerImageTag(CONNECTOR_VERSION)); + .withDockerImageTag(CONNECTOR_VERSION) + .withSpec(DESTINATION_SPEC)); when(configRepository.getStandardSourceDefinition(UUID1)) .thenReturn(new StandardSourceDefinition() .withSourceDefinitionId(UUID1) .withName(SOURCE_DEF_NAME) .withDockerRepository(CONNECTOR_REPOSITORY) - .withDockerImageTag(CONNECTOR_VERSION)); + .withDockerImageTag(CONNECTOR_VERSION) + .withSpec(SOURCE_SPEC)); when(configRepository.getStandardDestinationDefinition(UUID2)) .thenReturn(new StandardDestinationDefinition() .withDestinationDefinitionId(UUID2) .withName(DESTINATION_DEF_NAME) .withDockerRepository(CONNECTOR_REPOSITORY) - .withDockerImageTag(CONNECTOR_VERSION)); + .withDockerImageTag(CONNECTOR_VERSION) + .withSpec(DESTINATION_SPEC)); final ConfiguredAirbyteCatalog catalog = new ConfiguredAirbyteCatalog().withStreams(List.of( new ConfiguredAirbyteStream()