
Rodi/refactor output consumer (#62862)

Rodi Reich Zilberman
2025-07-10 12:59:25 -07:00
committed by GitHub
parent f748f52ace
commit 045491d8fe
6 changed files with 119 additions and 160 deletions

View File

@@ -25,43 +25,12 @@ import java.util.concurrent.ConcurrentHashMap
/** Configuration properties prefix for [StdoutOutputConsumer]. */
const val CONNECTOR_OUTPUT_PREFIX = "airbyte.connector.output"
/** A simple [OutputConsumer], such as the standard output consumer or the buffering test output consumer. */
abstract class StandardOutputConsumer(clock: Clock) : OutputConsumer(clock)
/** Default implementation of [OutputConsumer]. */
@Singleton
@Secondary
class StdoutOutputConsumer(
val stdout: PrintStream,
abstract class BaseStdoutOutputConsumer(
clock: Clock,
/**
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
* buffer's size (in bytes) grows past this value.
*
* Flushing the record buffer to stdout is done by calling [println], which is synchronized. The
* choice of [println] is imposed by our use of the ConsoleJSONAppender log4j appender, which
* concurrently calls [println] to print [AirbyteMessage]s of type LOG to standard output.
*
* Because calling [println] incurs both a synchronization overhead and a syscall overhead, the
* connector's performance will noticeably degrade if it's called too often. This happens
* primarily when emitting lots of tiny RECORD messages, which is typical of source connectors.
*
* For this reason, the [bufferByteSizeThresholdForFlush] value should not be too small. The
* default value of 4 kB is good in this respect. For example, if the average serialized record
* size is 100 bytes, this will reduce the volume of [println] calls by a factor of 40.
*
* Conversely, the [bufferByteSizeThresholdForFlush] value should also not be too large.
* Otherwise, the output becomes bursty, which also degrades performance. As of today (and
* hopefully not for long) the platform still pipes the connector's stdout into socat to emit
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
* case, TCP packet sizes (capped by the MTU) are also in the low kilobytes.
*/
@Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}")
val bufferByteSizeThresholdForFlush: Int,
) : StandardOutputConsumer(clock) {
private val buffer = ByteArrayOutputStream() // TODO: replace this with a StringWriter?
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
) : OutputConsumer(clock) {
protected open val buffer = ByteArrayOutputStream()
protected val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
protected val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(jsonGenerator)
override fun accept(airbyteMessage: AirbyteMessage) {
// This method effectively println's its JSON-serialized argument.
@@ -90,7 +59,7 @@ class StdoutOutputConsumer(
}
}
private fun withLockMaybeWriteNewline() {
protected fun withLockMaybeWriteNewline() {
if (buffer.size() > 0) {
buffer.write('\n'.code)
}
@@ -103,56 +72,14 @@ class StdoutOutputConsumer(
}
}
private fun withLockFlush() {
if (buffer.size() > 0) {
stdout.println(buffer.toString(Charsets.UTF_8))
stdout.flush()
buffer.reset()
}
}
abstract fun withLockFlush()
override fun accept(record: AirbyteRecordMessage) {
// The serialization of RECORD messages can become a performance bottleneck for source
// connectors because they can come in much higher volumes than other message types.
// Specifically, with jackson, the bottleneck is in the object mapping logic.
// As it turns out, this object mapping logic is not particularly useful for RECORD messages
// because within a given stream the only variations occur in the "data" and the "meta"
// fields:
// - the "data" field is already an ObjectNode and is cheap to serialize,
// - the "meta" field is often unset.
// For this reason, this method builds and reuses a JSON template for each stream.
// Then, for each record, it serializes just "data" and "meta" to populate the template.
val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace)
synchronized(this) {
// Write a newline character to the buffer if it's not empty.
withLockMaybeWriteNewline()
// Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'.
buffer.write(template.prefix)
// Serialize the record data ObjectNode to JSON, writing it to the buffer.
Jsons.writeTree(jsonGenerator, record.data)
jsonGenerator.flush()
// If the record has an AirbyteRecordMessageMeta instance set,
// write ',"meta":' followed by the serialized meta.
val meta: AirbyteRecordMessageMeta? = record.meta
if (meta != null) {
buffer.write(metaPrefixBytes)
sequenceWriter.write(meta)
sequenceWriter.flush()
}
// Write ',"emitted_at":...}}'.
buffer.write(template.suffix)
// Flush the buffer to stdout only once it has reached a certain size.
// Flushing to stdout incurs some overhead (mutex, syscall, etc.)
// which otherwise becomes very apparent when lots of tiny records are involved.
if (buffer.size() >= bufferByteSizeThresholdForFlush) {
withLockFlush()
}
}
}
protected val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray()
private val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray()
private fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate {
protected open fun getOrCreateRecordTemplate(
stream: String,
namespace: String?
): RecordTemplate {
val streamToTemplateMap: StreamToTemplateMap =
if (namespace == null) {
unNamespacedTemplates
@@ -164,8 +91,8 @@ class StdoutOutputConsumer(
}
}
private val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>()
private val unNamespacedTemplates = StreamToTemplateMap()
protected val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>()
protected val unNamespacedTemplates = StreamToTemplateMap()
companion object {
const val META_PREFIX = ""","meta":"""
@@ -223,6 +150,91 @@ class RecordTemplate(
}
}
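
To make the per-stream template concrete: the idea described in the accept(record) comments above is that the invariant parts of the RECORD envelope are rendered once per stream, so each record only pays for serializing its "data" (and "meta" when present). The class below is a hypothetical standalone sketch of that split, not the CDK's actual RecordTemplate.

// Hypothetical sketch (not CDK code): prefix + <serialized data>
// [+ META_PREFIX + <serialized meta>] + suffix reassembles a full
// AirbyteMessage of type RECORD without object-mapping the envelope.
class EnvelopeTemplate(stream: String, namespace: String?, emittedAt: Long) {
    val prefix: ByteArray =
        buildString {
            append("""{"type":"RECORD","record":{""")
            // A real implementation must JSON-escape these values.
            if (namespace != null) append(""""namespace":"$namespace",""")
            append(""""stream":"$stream","data":""")
        }.toByteArray()
    val suffix: ByteArray = ""","emitted_at":$emittedAt}}""".toByteArray()
}
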
/** A simple [OutputConsumer], such as the standard output consumer or the buffering test output consumer. */
abstract class StandardOutputConsumer(clock: Clock) : BaseStdoutOutputConsumer(clock)
/** Default implementation of [OutputConsumer]. */
@Singleton
@Secondary
class StdoutOutputConsumer(
val stdout: PrintStream,
clock: Clock,
/**
* [bufferByteSizeThresholdForFlush] triggers flushing the record buffer to stdout once the
* buffer's size (in bytes) grows past this value.
*
* Flushing the record buffer to stdout is done by calling [println], which is synchronized. The
* choice of [println] is imposed by our use of the ConsoleJSONAppender log4j appender, which
* concurrently calls [println] to print [AirbyteMessage]s of type LOG to standard output.
*
* Because calling [println] incurs both a synchronization overhead and a syscall overhead, the
* connector's performance will noticeably degrade if it's called too often. This happens
* primarily when emitting lots of tiny RECORD messages, which is typical of source connectors.
*
* For this reason, the [bufferByteSizeThresholdForFlush] value should not be too small. The
* default value of 4 kB is good in this respect. For example, if the average serialized record
* size is 100 bytes, this will reduce the volume of [println] calls by a factor of 40.
*
* Conversely, the [bufferByteSizeThresholdForFlush] value should also not be too large.
* Otherwise, the output becomes bursty, which also degrades performance. As of today (and
* hopefully not for long) the platform still pipes the connector's stdout into socat to emit
* the output as TCP packets. While socat is buffered, its buffer size is only 8 kB. In any
* case, TCP packet sizes (capped by the MTU) are also in the low kilobytes.
*/
@Value("\${$CONNECTOR_OUTPUT_PREFIX.buffer-byte-size-threshold-for-flush:4096}")
val bufferByteSizeThresholdForFlush: Int,
) :
StandardOutputConsumer(
clock,
) {
override fun withLockFlush() {
if (buffer.size() > 0) {
stdout.println(buffer.toString(Charsets.UTF_8))
stdout.flush()
buffer.reset()
}
}
override fun accept(record: AirbyteRecordMessage) {
// The serialization of RECORD messages can become a performance bottleneck for source
// connectors because they can come in much higher volumes than other message types.
// Specifically, with jackson, the bottleneck is in the object mapping logic.
// As it turns out, this object mapping logic is not particularly useful for RECORD messages
// because within a given stream the only variations occur in the "data" and the "meta"
// fields:
// - the "data" field is already an ObjectNode and is cheap to serialize,
// - the "meta" field is often unset.
// For this reason, this method builds and reuses a JSON template for each stream.
// Then, for each record, it serializes just "data" and "meta" to populate the template.
val template: RecordTemplate = getOrCreateRecordTemplate(record.stream, record.namespace)
synchronized(this) {
// Write a newline character to the buffer if it's not empty.
withLockMaybeWriteNewline()
// Write '{"type":"RECORD","record":{"namespace":"...","stream":"...","data":'.
buffer.write(template.prefix)
// Serialize the record data ObjectNode to JSON, writing it to the buffer.
Jsons.writeTree(jsonGenerator, record.data)
jsonGenerator.flush()
// If the record has an AirbyteRecordMessageMeta instance set,
// write ',"meta":' followed by the serialized meta.
val meta: AirbyteRecordMessageMeta? = record.meta
if (meta != null) {
buffer.write(metaPrefixBytes)
sequenceWriter.write(meta)
sequenceWriter.flush()
}
// Write ',"emitted_at":...}}'.
buffer.write(template.suffix)
// Flush the buffer to stdout only once it has reached a certain size.
// Flushing to stdout incurs some overhead (mutex, syscall, etc.)
// which otherwise becomes very apparent when lots of tiny records are involved.
if (buffer.size() >= bufferByteSizeThresholdForFlush) {
withLockFlush()
}
}
}
}
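
To make the buffering policy concrete, here is a minimal, self-contained sketch of the flush logic above, assuming the same 4096-byte default that the airbyte.connector.output.buffer-byte-size-threshold-for-flush property configures. This is a hypothetical standalone class, not CDK code; emitNow models the non-RECORD path, which flushes immediately.

class ThresholdFlushingWriter(
    private val out: java.io.PrintStream,
    private val thresholdBytes: Int = 4096,
) {
    private val buffer = java.io.ByteArrayOutputStream()

    @Synchronized
    fun emit(serialized: ByteArray) {
        // Newline-separate consecutive messages within the buffer.
        if (buffer.size() > 0) buffer.write('\n'.code)
        buffer.write(serialized)
        // Pay the println (mutex + syscall) cost only past the threshold.
        if (buffer.size() >= thresholdBytes) flush()
    }

    @Synchronized
    fun emitNow(serialized: ByteArray) {
        // Non-RECORD messages don't linger: flush immediately, along with
        // whatever was already buffered.
        emit(serialized)
        flush()
    }

    @Synchronized
    fun flush() {
        if (buffer.size() > 0) {
            out.println(buffer.toString(Charsets.UTF_8))
            out.flush()
            buffer.reset()
        }
    }
}

With 100-byte records and the 4096-byte threshold, roughly 40 records share each println call, which is the factor-of-40 reduction the KDoc cites.
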
@Factory
private class PrintStreamFactory {

View File

@@ -41,10 +41,10 @@ class BufferingOutputConsumer(
synchronized(this) { field = value }
}
override fun accept(input: AirbyteMessage) {
override fun accept(airbyteMessage: AirbyteMessage) {
// Deep copy the input, which may be reused and mutated later on.
val m: AirbyteMessage =
Jsons.readValue(Jsons.writeValueAsBytes(input), AirbyteMessage::class.java)
Jsons.readValue(Jsons.writeValueAsBytes(airbyteMessage), AirbyteMessage::class.java)
synchronized(this) {
messages.add(m)
when (m.type) {
@@ -63,7 +63,9 @@ class BufferingOutputConsumer(
}
}
override fun close() {}
override fun close() = Unit
override fun withLockFlush() = Unit
fun records(): List<AirbyteRecordMessage> =
synchronized(this) { listOf(*records.toTypedArray()) }
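
The deep copy in accept() above protects the recorded messages from callers that reuse and mutate the same AirbyteMessage instance after the call returns. A Jackson serialize/deserialize round-trip is a simple way to obtain an independent copy; the helper below is a hypothetical sketch using a plain ObjectMapper rather than the CDK's Jsons singleton.

import com.fasterxml.jackson.databind.ObjectMapper

// Round-trips a value through its JSON bytes to obtain a deep, independent
// copy (hypothetical helper, not CDK code).
fun <T : Any> deepCopy(mapper: ObjectMapper, value: T, type: Class<T>): T =
    mapper.readValue(mapper.writeValueAsBytes(value), type)
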

View File

@@ -4,71 +4,23 @@
package io.airbyte.cdk.output.sockets
import com.fasterxml.jackson.core.JsonGenerator
import com.fasterxml.jackson.databind.SequenceWriter
import io.airbyte.cdk.output.OutputConsumer
import io.airbyte.cdk.output.BaseStdoutOutputConsumer
import io.airbyte.cdk.output.RecordTemplate
import io.airbyte.cdk.output.StreamToTemplateMap
import io.airbyte.cdk.util.Jsons
import io.airbyte.protocol.models.v0.AirbyteMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessage
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMeta
import io.github.oshai.kotlinlogging.KotlinLogging
import java.io.ByteArrayOutputStream
import java.time.Clock
import java.util.concurrent.ConcurrentHashMap
// This class emits Airbyte messages in JSONL format to a socket data channel.
// Emits Airbyte messages in JSONL format to a socket data channel.
// It accepts a data channel acquired by the caller.
class SocketJsonOutputConsumer(
private val dataChannel: SocketDataChannel,
clock: Clock,
val bufferByteSizeThresholdForFlush: Int,
private val additionalProperties: Map<String, String>,
) : OutputConsumer(clock) {
private val log = KotlinLogging.logger {}
private val buffer = ByteArrayOutputStream()
private val jsonGenerator: JsonGenerator = Jsons.createGenerator(buffer)
private val sequenceWriter: SequenceWriter = Jsons.writer().writeValues(buffer)
override fun accept(airbyteMessage: AirbyteMessage) {
// This method effectively println's its JSON-serialized argument.
// Using println is not particularly efficient, however.
// To improve performance, this method accumulates RECORD messages into a buffer
// before writing them to standard output in a batch.
if (airbyteMessage.type == AirbyteMessage.Type.RECORD) {
// RECORD messages undergo a different serialization scheme.
accept(airbyteMessage.record)
} else {
synchronized(this) {
// Write a newline character to the buffer if it's not empty.
withLockMaybeWriteNewline()
// Non-RECORD AirbyteMessage instances are serialized and written to the buffer
// using standard jackson object mapping facilities.
sequenceWriter.write(airbyteMessage)
sequenceWriter.flush()
// Such messages don't linger in the buffer; they are flushed immediately,
// along with whatever might have already been lingering inside.
// This prints a newline after the message.
withLockFlush()
}
}
}
private fun withLockMaybeWriteNewline() {
if (buffer.size() > 0) {
buffer.write('\n'.code)
}
}
override fun close() {
synchronized(this) {
// Flush any remaining buffer contents to stdout before closing.
withLockFlush()
}
}
private fun withLockFlush() {
) : BaseStdoutOutputConsumer(clock) {
override fun withLockFlush() {
if (buffer.size() > 0) {
buffer.writeTo(dataChannel.outputStream)
dataChannel.outputStream?.write(System.lineSeparator().toByteArray())
@@ -113,9 +65,7 @@ class SocketJsonOutputConsumer(
}
}
private val metaPrefixBytes: ByteArray = META_PREFIX.toByteArray()
private fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate {
override fun getOrCreateRecordTemplate(stream: String, namespace: String?): RecordTemplate {
val streamToTemplateMap: StreamToTemplateMap =
if (namespace == null) {
unNamespacedTemplates
@@ -131,11 +81,4 @@ class SocketJsonOutputConsumer(
)
}
}
private val namespacedTemplates = ConcurrentHashMap<String, StreamToTemplateMap>()
private val unNamespacedTemplates = StreamToTemplateMap()
companion object {
const val META_PREFIX = ""","meta":"""
}
}
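
The withLockFlush override above swaps the stdout println for a write to the data channel's output stream, preserving the newline framing that keeps the output valid JSONL. Reduced to a standalone function, and assuming only the java.io.OutputStream contract, it might look like this (hypothetical sketch, not CDK code):

fun flushTo(buffer: java.io.ByteArrayOutputStream, out: java.io.OutputStream) {
    if (buffer.size() > 0) {
        // Copy buffered bytes to the socket stream and terminate the line.
        buffer.writeTo(out)
        out.write(System.lineSeparator().toByteArray())
        out.flush()
        buffer.reset()
    }
}
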

View File

@@ -38,6 +38,7 @@ private class RecordingOutputConsumer(clock: Clock = Clock.fixed(Instant.EPOCH,
_messages += airbyteMessage
}
override fun close() = Unit
override fun withLockFlush() = Unit
}
class StatsEmitterTest {

View File

@@ -9,7 +9,7 @@ data:
connectorSubtype: database
connectorType: source
definitionId: 435bb9a5-7887-4809-aa58-28c27df0d7ad
dockerImageTag: 3.50.0
dockerImageTag: 3.50.1-rc.1
dockerRepository: airbyte/source-mysql
documentationUrl: https://docs.airbyte.com/integrations/sources/mysql
githubIssueLabel: source-mysql
@@ -77,5 +77,5 @@ data:
message: Add default cursor for cdc
upgradeDeadline: "2023-08-17"
rolloutConfiguration:
enableProgressiveRollout: false
enableProgressiveRollout: true
metadataSpecVersion: "1.0"

View File

@@ -230,12 +230,13 @@ Any database or table encoding combination of charset and collation is supported
| Version | Date | Pull Request | Subject |
|:------------|:-----------|:-----------------------------------------------------------|:------------------------------------------------------------------------------------------------------------------------------------------------|
| 3.50.0 | 2025-07-08 | [62863](https://github.com/airbytehq/airbyte/pull/62863) | Promoting release candidate 3.50.0-rc.1 to a main version. |
| 3.50.0-rc.1 | 2025-07-08 | [60993](https://github.com/airbytehq/airbyte/pull/60993) | Prepare to enable speed improvements |
| 3.12.0 | 2025-06-26 | [60993](https://github.com/airbytehq/airbyte/pull/60993) | Boosted Mode |
| 3.11.21 | 2025-05-30 | [61014](https://github.com/airbytehq/airbyte/pull/61014) | Fix merge error. Point to a published CDK |
| 3.11.20 | 2025-05-29 | [60218](https://github.com/airbytehq/airbyte/pull/60218) | Testing concurrent read. |
| 3.11.19 | 2025-05-11 | [60214](https://github.com/airbytehq/airbyte/pull/60214) | Migrate to new Gradle flow. |
| 3.50.1-rc.1 | 2025-07-08 | [62862](https://github.com/airbytehq/airbyte/pull/62862) | Prepare to enable speed improvements |
| 3.50.0 | 2025-07-08 | [62863](https://github.com/airbytehq/airbyte/pull/62863) | Promoting release candidate 3.50.0-rc.1 to a main version. |
| 3.50.0-rc.1 | 2025-07-08 | [60993](https://github.com/airbytehq/airbyte/pull/60993) | Prepare to enable speed improvements |
| 3.12.0 | 2025-06-26 | [60993](https://github.com/airbytehq/airbyte/pull/60993) | Boosted Mode |
| 3.11.21 | 2025-05-30 | [61014](https://github.com/airbytehq/airbyte/pull/61014) | Fix merge error. Point to a published CDK |
| 3.11.20 | 2025-05-29 | [60218](https://github.com/airbytehq/airbyte/pull/60218) | Testing concurrent read. |
| 3.11.19 | 2025-05-11 | [60214](https://github.com/airbytehq/airbyte/pull/60214) | Migrate to new Gradle flow. |
| 3.11.18 | 2025-05-02 | [59732](https://github.com/airbytehq/airbyte/pull/59732) | Fix a bug that caused the sync to go into a loop in some cases. |
| 3.11.17 | 2025-05-02 | [59683](https://github.com/airbytehq/airbyte/pull/59683) | CDK version bump. |
| 3.11.16 | 2025-05-02 | [59223](https://github.com/airbytehq/airbyte/pull/59223) | Improve handling of big int and decimal values, preventing them from being represented in scientific notation |