1
0
mirror of synced 2025-12-23 21:03:15 -05:00

quote compressed db schema history (#48456)

This commit is contained in:
Rodi Reich Zilberman
2024-11-11 15:35:41 -08:00
committed by GitHub
parent 3dfb6d13cd
commit d184317acc
3 changed files with 15 additions and 7 deletions

View File

@@ -59,6 +59,7 @@ import kotlin.random.Random
import kotlin.random.nextInt
import org.apache.kafka.connect.json.JsonConverterConfig
import org.apache.kafka.connect.source.SourceRecord
import org.apache.mina.util.Base64
@Singleton
class MySqlDebeziumOperations(
@@ -329,8 +330,14 @@ class MySqlDebeziumOperations(
} else {
stateNode.put(IS_COMPRESSED, true)
val baos = ByteArrayOutputStream()
val builder = StringBuilder()
GZIPOutputStream(baos).writer(Charsets.UTF_8).use { it.write(uncompressedString) }
stateNode.put(MYSQL_DB_HISTORY, baos.toByteArray())
builder.append("\"")
builder.append(Base64.encodeBase64(baos.toByteArray()).toString(Charsets.UTF_8))
builder.append("\"")
stateNode.put(MYSQL_DB_HISTORY, builder.toString())
}
}
return Jsons.objectNode().apply { set<JsonNode>(STATE, stateNode) }
@@ -443,11 +450,12 @@ class MySqlDebeziumOperations(
val isCompressed: Boolean = stateNode[IS_COMPRESSED]?.asBoolean() ?: false
val uncompressedString: String =
if (isCompressed) {
val textValue: String = schemaNode.textValue()
val compressedBytes: ByteArray =
Jsons.readValue(schemaNode.textValue(), ByteArray::class.java)
GZIPInputStream(ByteArrayInputStream(compressedBytes))
.reader(Charsets.UTF_8)
.readText()
textValue.substring(1, textValue.length - 1).toByteArray(Charsets.UTF_8)
val decoded = Base64.decodeBase64(compressedBytes)
GZIPInputStream(ByteArrayInputStream(decoded)).reader(Charsets.UTF_8).readText()
} else {
schemaNode.textValue()
}