1
0
mirror of synced 2025-12-25 02:09:19 -05:00

Harshith/connection updates (#11153)

* Feat: first cut to allow naming for connections

* fix

* fix: migration

* fix: migration

* fix: formatting

* fix: formatting

* fix: tests

* fix: -> is bit outside of what we do generally

* fix: tests are failing

* fix: tests are failing

* fix: tests are failing

* fix: tests are failing

* fix: tests are failing
This commit is contained in:
Harshith Mullapudi
2022-03-16 03:05:18 +05:30
committed by GitHub
parent 266a491c8e
commit fa8cd83e30
11 changed files with 183 additions and 11 deletions

View File

@@ -2710,6 +2710,9 @@ components:
description: Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If "${SOURCE_NAMESPACE}" then behaves like namespaceDefinition = 'source'.
default: null
example: "${SOURCE_NAMESPACE}"
name:
type: string
description: Name that will be set to this connection
prefix:
type: string
description: Prefix that will be prepended to the name of each stream when it is written to the destination.
@@ -2732,6 +2735,9 @@ components:
- syncCatalog
- status
properties:
name:
type: string
description: Name that will be set to the connection
connectionId:
$ref: "#/components/schemas/ConnectionId"
namespaceDefinition:

View File

@@ -80,7 +80,7 @@ public class BootloaderAppTest {
mockedConfigs.getConfigDatabaseUrl())
.getAndInitialize();
val configsMigrator = new ConfigsDatabaseMigrator(configDatabase, this.getClass().getName());
assertEquals("0.35.46.001", configsMigrator.getLatestMigration().getVersion().getVersion());
assertEquals("0.35.54.001", configsMigrator.getLatestMigration().getVersion().getVersion());
val jobsPersistence = new DefaultJobPersistence(jobDatabase);
assertEquals(version, jobsPersistence.getVersion().get());

View File

@@ -0,0 +1,128 @@
/*
* Copyright (c) 2021 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.db.instance.configs.migrations;
import static org.jooq.impl.DSL.asterisk;
import static org.jooq.impl.DSL.table;
import java.util.List;
import java.util.UUID;
import java.util.stream.Collectors;
import org.flywaydb.core.api.migration.BaseJavaMigration;
import org.flywaydb.core.api.migration.Context;
import org.jooq.DSLContext;
import org.jooq.Field;
import org.jooq.Record;
import org.jooq.Result;
import org.jooq.impl.DSL;
import org.jooq.impl.SQLDataType;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class V0_35_54_001__ChangeDefaultConnectionName extends BaseJavaMigration {
private static final Logger LOGGER = LoggerFactory.getLogger(V0_35_54_001__ChangeDefaultConnectionName.class);
public static void defaultConnectionName(final DSLContext ctx) {
LOGGER.info("Updating connection name column");
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<String> name = DSL.field("name", SQLDataType.VARCHAR(256).nullable(false));
List<Connection> connections = getConnections(ctx);
for (final Connection connection : connections) {
final Actor sourceActor = getActor(connection.getSourceId(), ctx);
final Actor destinationActor = getActor(connection.getDestinationId(), ctx);
final String connectionName = sourceActor.getName() + " <> " + destinationActor.getName();
ctx.update(DSL.table("connection"))
.set(name, connectionName)
.where(id.eq(connection.getConnectionId()))
.execute();
}
}
static <T> List<Connection> getConnections(final DSLContext ctx) {
LOGGER.info("Get connections having name default");
final Field<String> name = DSL.field("name", SQLDataType.VARCHAR(36).nullable(false));
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Field<UUID> sourceId = DSL.field("source_id", SQLDataType.UUID.nullable(false));
final Field<UUID> destinationId = DSL.field("destination_id", SQLDataType.UUID.nullable(false));
final Field<String> connectionName = DSL.field("name", SQLDataType.VARCHAR(256).nullable(false));
final Result<Record> results = ctx.select(asterisk()).from(table("connection")).where(connectionName.eq("default")).fetch();
return results.stream().map(record -> new Connection(
record.get(name),
record.get(id),
record.get(sourceId),
record.get(destinationId)))
.collect(Collectors.toList());
}
static <T> Actor getActor(final UUID actorDefinitionId, final DSLContext ctx) {
final Field<String> name = DSL.field("name", SQLDataType.VARCHAR(36).nullable(false));
final Field<UUID> id = DSL.field("id", SQLDataType.UUID.nullable(false));
final Result<Record> results = ctx.select(asterisk()).from(table("actor")).where(id.eq(actorDefinitionId)).fetch();
return results.stream()
.map(record -> new Actor(record.get(name))).toList().get(0);
}
@Override
public void migrate(final Context context) throws Exception {
LOGGER.info("Running migration: {}", this.getClass().getSimpleName());
final DSLContext ctx = DSL.using(context.getConnection());
defaultConnectionName(ctx);
}
public static class Actor {
private final String name;
public <T> Actor(String name) {
this.name = name;
}
public String getName() {
return this.name;
}
}
public static class Connection {
private final String name;
private final UUID connectionId;
private final UUID sourceId;
private final UUID destinationId;
public <T> Connection(String name, UUID id, UUID sourceId, UUID destinationId) {
this.name = name;
this.connectionId = id;
this.sourceId = sourceId;
this.destinationId = destinationId;
}
public String getName() {
return this.name;
}
public UUID getSourceId() {
return this.sourceId;
}
public UUID getDestinationId() {
return this.destinationId;
}
public UUID getConnectionId() {
return this.connectionId;
}
}
}

View File

@@ -87,6 +87,10 @@ public class ApiPojoConverters {
.withCatalog(CatalogConverter.toProtocol(update.getSyncCatalog()))
.withStatus(toPersistenceStatus(update.getStatus()));
if (update.getName() != null) {
newConnection.withName(update.getName());
}
// update Resource Requirements
if (update.getResourceRequirements() != null) {
newConnection.withResourceRequirements(resourceRequirementsToInternal(update.getResourceRequirements()));

View File

@@ -100,9 +100,14 @@ public class ConnectionsHandler {
public ConnectionRead createConnection(final ConnectionCreate connectionCreate)
throws JsonValidationException, IOException, ConfigNotFoundException {
// Validate source and destination
configRepository.getSourceConnection(connectionCreate.getSourceId());
configRepository.getDestinationConnection(connectionCreate.getDestinationId());
final SourceConnection sourceConnection = configRepository.getSourceConnection(connectionCreate.getSourceId());
final DestinationConnection destinationConnection = configRepository.getDestinationConnection(connectionCreate.getDestinationId());
// Set this as default name if connectionCreate doesn't have it
final String defaultName = sourceConnection.getName() + " <> " + destinationConnection.getName();
ConnectionHelper.validateWorkspace(workspaceHelper,
connectionCreate.getSourceId(),
connectionCreate.getDestinationId(),
@@ -113,7 +118,7 @@ public class ConnectionsHandler {
// persist sync
final StandardSync standardSync = new StandardSync()
.withConnectionId(connectionId)
.withName(connectionCreate.getName() != null ? connectionCreate.getName() : "default")
.withName(connectionCreate.getName() != null ? connectionCreate.getName() : defaultName)
.withNamespaceDefinition(Enums.convertTo(connectionCreate.getNamespaceDefinition(), NamespaceDefinitionType.class))
.withNamespaceFormat(connectionCreate.getNamespaceFormat())
.withPrefix(connectionCreate.getPrefix())

View File

@@ -247,6 +247,7 @@ public class WebBackendConnectionsHandler {
public WebBackendConnectionRead webBackendCreateConnection(final WebBackendConnectionCreate webBackendConnectionCreate)
throws ConfigNotFoundException, IOException, JsonValidationException {
final List<UUID> operationIds = createOperations(webBackendConnectionCreate);
final ConnectionCreate connectionCreate = toConnectionCreate(webBackendConnectionCreate, operationIds);
return buildWebBackendConnectionRead(connectionsHandler.createConnection(connectionCreate));
}
@@ -373,6 +374,7 @@ public class WebBackendConnectionsHandler {
connectionUpdate.namespaceDefinition(webBackendConnectionUpdate.getNamespaceDefinition());
connectionUpdate.namespaceFormat(webBackendConnectionUpdate.getNamespaceFormat());
connectionUpdate.prefix(webBackendConnectionUpdate.getPrefix());
connectionUpdate.name(webBackendConnectionUpdate.getName());
connectionUpdate.operationIds(operationIds);
connectionUpdate.syncCatalog(webBackendConnectionUpdate.getSyncCatalog());
connectionUpdate.schedule(webBackendConnectionUpdate.getSchedule());

View File

@@ -110,10 +110,12 @@ class ConnectionsHandlerTest {
operationId = UUID.randomUUID();
source = new SourceConnection()
.withSourceId(sourceId)
.withWorkspaceId(workspaceId);
.withWorkspaceId(workspaceId)
.withName("presto");
destination = new DestinationConnection()
.withDestinationId(destinationId)
.withWorkspaceId(workspaceId)
.withName("hudi")
.withConfiguration(Jsons.jsonNode(Collections.singletonMap("apiKey", "123-abc")));
standardSync = new StandardSync()
.withConnectionId(connectionId)
@@ -190,6 +192,10 @@ class ConnectionsHandlerTest {
when(configRepository.getStandardSync(standardSync.getConnectionId())).thenReturn(standardSync);
when(configRepository.getSourceDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(sourceDefinition);
when(configRepository.getDestinationDefinitionFromConnection(standardSync.getConnectionId())).thenReturn(destinationDefinition);
when(configRepository.getSourceConnection(source.getSourceId()))
.thenReturn(source);
when(configRepository.getDestinationConnection(destination.getDestinationId()))
.thenReturn(destination);
final AirbyteCatalog catalog = ConnectionHelpers.generateBasicApiCatalog();
@@ -220,8 +226,13 @@ class ConnectionsHandlerTest {
}
@Test
void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace() {
void testValidateConnectionCreateSourceAndDestinationInDifferenceWorkspace()
throws JsonValidationException, ConfigNotFoundException, IOException {
when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(destinationId)).thenReturn(UUID.randomUUID());
when(configRepository.getSourceConnection(source.getSourceId()))
.thenReturn(source);
when(configRepository.getDestinationConnection(destination.getDestinationId()))
.thenReturn(destination);
final ConnectionCreate connectionCreate = new ConnectionCreate()
.sourceId(standardSync.getSourceId())
@@ -231,8 +242,12 @@ class ConnectionsHandlerTest {
}
@Test
void testValidateConnectionCreateOperationInDifferentWorkspace() {
void testValidateConnectionCreateOperationInDifferentWorkspace() throws JsonValidationException, ConfigNotFoundException, IOException {
when(workspaceHelper.getWorkspaceForOperationIdIgnoreExceptions(operationId)).thenReturn(UUID.randomUUID());
when(configRepository.getSourceConnection(source.getSourceId()))
.thenReturn(source);
when(configRepository.getDestinationConnection(destination.getDestinationId()))
.thenReturn(destination);
final ConnectionCreate connectionCreate = new ConnectionCreate()
.sourceId(standardSync.getSourceId())
@@ -314,6 +329,7 @@ class ConnectionsHandlerTest {
.status(ConnectionStatus.INACTIVE)
.schedule(null)
.syncCatalog(catalog)
.name(standardSync.getName())
.resourceRequirements(new ResourceRequirements()
.cpuLimit(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuLimit())
.cpuRequest(ConnectionHelpers.TESTING_RESOURCE_REQUIREMENTS.getCpuRequest())
@@ -587,6 +603,10 @@ class ConnectionsHandlerTest {
void failOnUnmatchedWorkspacesInCreate() throws JsonValidationException, ConfigNotFoundException, IOException {
when(workspaceHelper.getWorkspaceForSourceIdIgnoreExceptions(standardSync.getSourceId())).thenReturn(UUID.randomUUID());
when(workspaceHelper.getWorkspaceForDestinationIdIgnoreExceptions(standardSync.getDestinationId())).thenReturn(UUID.randomUUID());
when(configRepository.getSourceConnection(source.getSourceId()))
.thenReturn(source);
when(configRepository.getDestinationConnection(destination.getDestinationId()))
.thenReturn(destination);
when(uuidGenerator.get()).thenReturn(standardSync.getConnectionId());
final StandardSourceDefinition sourceDefinition = new StandardSourceDefinition()

View File

@@ -380,6 +380,7 @@ class WebBackendConnectionsHandlerTest {
.operationIds(List.of(newOperationId))
.status(ConnectionStatus.INACTIVE)
.schedule(schedule)
.name(standardSync.getName())
.syncCatalog(catalog)
.withRefreshedCatalog(false);
@@ -393,6 +394,7 @@ class WebBackendConnectionsHandlerTest {
.operationIds(operationIds)
.status(ConnectionStatus.INACTIVE)
.schedule(schedule)
.name(standardSync.getName())
.syncCatalog(catalog);
final ConnectionUpdate actual = WebBackendConnectionsHandler.toConnectionUpdate(input, operationIds);
@@ -423,7 +425,7 @@ class WebBackendConnectionsHandlerTest {
public void testForConnectionUpdateCompleteness() {
final Set<String> handledMethods =
Set.of("schedule", "connectionId", "syncCatalog", "namespaceDefinition", "namespaceFormat", "prefix", "status", "operationIds",
"resourceRequirements");
"resourceRequirements", "name");
final Set<String> methods = Arrays.stream(ConnectionUpdate.class.getMethods())
.filter(method -> method.getReturnType() == ConnectionUpdate.class)

View File

@@ -273,14 +273,12 @@ public class MigrationAcceptanceTest {
assertEquals("", connection.getPrefix());
assertEquals("28ffee2b-372a-4f72-9b95-8ed56a8b99c5", connection.getSourceId().toString());
assertEquals("4e00862d-5484-4f50-9860-f3bbb4317397", connection.getDestinationId().toString());
assertEquals("default", connection.getName());
assertEquals(ConnectionStatus.ACTIVE, connection.getStatus());
assertNull(connection.getSchedule());
} else if (connection.getConnectionId().toString().equals("49dae3f0-158b-4737-b6e4-0eed77d4b74e")) {
assertEquals("", connection.getPrefix());
assertEquals("28ffee2b-372a-4f72-9b95-8ed56a8b99c5", connection.getSourceId().toString());
assertEquals("5434615d-a3b7-4351-bc6b-a9a695555a30", connection.getDestinationId().toString());
assertEquals("default", connection.getName());
assertEquals(ConnectionStatus.ACTIVE, connection.getStatus());
assertNull(connection.getSchedule());
} else {

View File

@@ -70,6 +70,11 @@ public class ConnectionHelper {
.withCatalog(update.getCatalog())
.withStatus(update.getStatus());
// update name
if (update.getName() != null) {
newConnection.withName(update.getName());
}
// update Resource Requirements
if (update.getResourceRequirements() != null) {
newConnection.withResourceRequirements(Jsons.clone(update.getResourceRequirements()));

View File

@@ -8694,6 +8694,7 @@ font-style: italic;
<div class="param">connectionId </div><div class="param-desc"><span class="param-type"><a href="#UUID">UUID</a></span> format: uuid</div>
<div class="param">namespaceDefinition (optional)</div><div class="param-desc"><span class="param-type"><a href="#NamespaceDefinitionType">NamespaceDefinitionType</a></span> </div>
<div class="param">namespaceFormat (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If &quot;${SOURCE_NAMESPACE}&quot; then behaves like namespaceDefinition = 'source'. </div>
<div class="param">name (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> Name that will be set to this connection </div>
<div class="param">prefix (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> Prefix that will be prepended to the name of each stream when it is written to the destination. </div>
<div class="param">operationIds (optional)</div><div class="param-desc"><span class="param-type"><a href="#UUID">array[UUID]</a></span> format: uuid</div>
<div class="param">syncCatalog </div><div class="param-desc"><span class="param-type"><a href="#AirbyteCatalog">AirbyteCatalog</a></span> </div>
@@ -9523,7 +9524,8 @@ if oauth parameters were contained inside the top level, rootObject=[] If they w
<h3><a name="WebBackendConnectionUpdate"><code>WebBackendConnectionUpdate</code> - </a> <a class="up" href="#__Models">Up</a></h3>
<div class='model-description'></div>
<div class="field-items">
<div class="param">connectionId </div><div class="param-desc"><span class="param-type"><a href="#UUID">UUID</a></span> format: uuid</div>
<div class="param">name (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> Name that will be set to the connection </div>
<div class="param">connectionId </div><div class="param-desc"><span class="param-type"><a href="#UUID">UUID</a></span> format: uuid</div>
<div class="param">namespaceDefinition (optional)</div><div class="param-desc"><span class="param-type"><a href="#NamespaceDefinitionType">NamespaceDefinitionType</a></span> </div>
<div class="param">namespaceFormat (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> Used when namespaceDefinition is 'customformat'. If blank then behaves like namespaceDefinition = 'destination'. If &quot;${SOURCE_NAMESPACE}&quot; then behaves like namespaceDefinition = 'source'. </div>
<div class="param">prefix (optional)</div><div class="param-desc"><span class="param-type"><a href="#string">String</a></span> Prefix that will be prepended to the name of each stream when it is written to the destination. </div>