1
0
mirror of synced 2026-02-03 01:02:02 -05:00
Files
airbyte/airbyte-workers
Pedro S. Lopez 2a3817748a feat(Platform): update actor configuration when receiving control messages from connectors during sync (#19811)
* track latest config message

* pass new config as part of outputs

* persist new config

* persist config as the messages come through, dont set output

* clean up old implementation

* accept control messages for destinations

* get api client from micronaut

* mask instance-wide oauth params when updating configs

* defaultreplicationworker tests

* formatting

* tests for source/destination handlers

* rm todo

* refactor test a bit to fix pmd

* fix pmd

* fix test

* add PersistConfigHelperTest

* update message tracker comment

* fix pmd

* format

* move ApiClientBeanFactory to commons-worker, use in container-orchestrator

* pull out config updating to separate methods

* add jitter

* rename PersistConfigHelper -> UpdateConnectorConfigHelper, docs

* fix exception type

* fmt

* move message type check into runnable

* formatting

* pass api client env vars to container orchestrator

* pass micronaut envs to container orchestrator

* print stacktrace for debugging

* different api host for container orchestrator

* fix default env var

* format

* fix errors after merge

* set source and destination actor id as part of the sync input

* fix: get destination definition

* fix null ptr

* remove "actor" from naming

* fix missing change from rename

* revert ContainerOrchestratorConfigBeanFactory changes

* inject sourceapi/destinationapi directly rather than airbyteapiclient

* UpdateConnectorConfigHelper -> ConnectorConfigUpdater

* rm log

* fix test

* dont fail on config update error

* pass id, not full config to runnables/accept control message

* add new config required for api client

* add test file

* fix test compatibility

* mount data plane credentials secret to container orchestrator (#20724)

* mount data plane credentials secret to container orchestrator

* rm copy-pasta

* properly handle empty strings

* set env vars like before

* use the right config vars
2023-01-06 13:17:25 -04:00
..
2022-03-13 14:45:36 -07:00

airbyte-workers

This module contains the logic for how Jobs are executed. Jobs are executed using a tool called Temporal.

Temporal Development

Versioning

Temporal is maintaining an internal history of the activity it runs. This history is based on a specific order. If we restart a temporal workflow with a new implementation that has a different order, the workflow will be stuck and will need manual action to be properly restarted. Temporal provides an API to be able to manage those changes smoothly. However, temporal is very permissive with version rules. Airbyte will follow the following rules:

  • There will be one global version per workflow, meaning that we will use a single tag per workflow.
  • All the following code modifications will need to bump the version number, it won't be limited to a release of a new airbyte version
    • Addition of an activity
    • Deletion of an activity
    • Change of the input of an activity
    • Addition of a temporal sleep timer

The way to use this version should be the following:

If no prior version usage is present:

final int version = Workflow.getVersion(VERSION_LABEL, MINIMAL_VERSION, CURRENT_VERSION);

if (version >= CURRENT_VERSION) {
        // New implemenation
}

if some prior version usage is present (we bump the version from 4 to 5 in this example):

final int version = Workflow.getVersion(VERSION_LABEL, MINIMAL_VERSION, CURRENT_VERSION);

if (version <= 4 && version >= MINIMAL_VERSION) {
        // old implemenation
} else if (version >= CURRENT_VERSION) {
        // New implemenation
}

Removing a version

Removing a version is a potential breaking change and should be done version carefully. We should maintain a MINIMAL_VERSION to keep track of the current minimal version. Both MINIMAL_VERSION and CURRENT_VERSION needs to be present on the workflow file even if they are unused (if they have been used once).