1
0
mirror of synced 2025-12-25 02:09:19 -05:00

ClickHouse V2: Release (#62887)

This commit is contained in:
Ryan Br...
2025-07-10 12:23:57 -07:00
committed by GitHub
parent 9d31a46495
commit bf99d28486
80 changed files with 340 additions and 2518 deletions

View File

@@ -1,5 +0,0 @@
# ClickHouse Strict Encrypt Test Configuration
In order to test the ClickHouse destination, you need an up-and-running ClickHouse database that has SSL enabled.
This connector inherits from the ClickHouse destination, but supports SSL connections only.

View File

@@ -1,38 +0,0 @@
plugins {
id 'application'
id 'airbyte-java-connector'
id("io.airbyte.gradle.docker")
id("airbyte-connector-docker-convention")
}
airbyteJavaConnector {
cdkVersionRequired = '0.22.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
}
//remove once upgrading the CDK version to 0.4.x or later
java {
compileJava {
options.compilerArgs.remove("-Werror")
}
}
airbyteJavaConnector.addCdkDependencies()
application {
mainClass = 'io.airbyte.integrations.destination.clickhouse.ClickhouseDestinationStrictEncrypt'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
}
dependencies {
implementation project(':airbyte-integrations:connectors:destination-clickhouse')
implementation 'com.clickhouse:clickhouse-jdbc:0.3.2-patch10:all'
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
testImplementation 'org.testcontainers:clickhouse:1.19.0'
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
integrationTestJavaImplementation 'org.testcontainers:clickhouse:1.19.0'
}

View File

@@ -1 +0,0 @@
<svg xmlns="http://www.w3.org/2000/svg" width="250" height="250" fill="none"><g clip-path="url(#a)"><path fill="red" d="M29.01 189.5h21.33V211H29.01v-21.5Z"/><path fill="#FC0" d="M29.01 39h21.332v150.5H29.01V39Zm42.663 0h21.331v172h-21.33V39Zm42.663 0h21.331v172h-21.331V39Zm42.662 0h21.331v172h-21.331V39Zm42.662 69.875h21.332v32.25H199.66v-32.25Z"/></g><defs><clipPath id="a"><path fill="#fff" d="M29 39h192v172H29z"/></clipPath></defs></svg>

Before

Width:  |  Height:  |  Size: 444 B

View File

@@ -1,36 +0,0 @@
data:
connectorBuildOptions:
baseImage: docker.io/airbyte/java-connector-base:2.0.2@sha256:f8e47304842a2c4d75ac223cf4b3c4117aa1c5c9207149369d296616815fe5b0
registryOverrides:
cloud:
enabled: false # strict encrypt connectors are deployed to Cloud by their non strict encrypt sibling.
oss:
enabled: false # strict encrypt connectors are not used on OSS.
connectorSubtype: database
connectorType: destination
definitionId: ce0d828e-1dc4-496c-b122-2da42e637e48
dockerImageTag: 1.1.0
dockerRepository: airbyte/destination-clickhouse-strict-encrypt
githubIssueLabel: destination-clickhouse
icon: clickhouse.svg
license: MIT
name: Clickhouse
releases:
breakingChanges:
1.0.0:
upgradeDeadline: "2024-03-15"
message: >
This version removes the option to use "normalization" with clickhouse. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/clickhouse
supportsDbt: false
tags:
- language:java
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
metadataSpecVersion: "1.0"

View File

@@ -1,3 +0,0 @@
include = [
"${POE_GIT_DIR}/poe-tasks/gradle-connector-tasks.toml",
]

View File

@@ -1,44 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.spec_modification.SpecModifyingDestination;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClickhouseDestinationStrictEncrypt extends SpecModifyingDestination implements Destination {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseDestinationStrictEncrypt.class);
public ClickhouseDestinationStrictEncrypt() {
super(ClickhouseDestination.sshWrappedDestination());
}
@Override
public ConnectorSpecification modifySpec(final ConnectorSpecification originalSpec) {
final ConnectorSpecification spec = Jsons.clone(originalSpec);
((ObjectNode) spec.getConnectionSpecification().get("properties")).remove(JdbcUtils.SSL_KEY);
return spec;
}
public static void main(final String[] args) throws Exception {
final Destination destination = new ClickhouseDestinationStrictEncrypt();
LOGGER.info("starting destination: {}", ClickhouseDestinationStrictEncrypt.class);
new IntegrationRunner(destination).run(args);
LOGGER.info("completed destination: {}", ClickhouseDestinationStrictEncrypt.class);
}
@Override
public boolean isV2Destination() {
return true;
}
}

View File

@@ -1,213 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import static java.time.temporal.ChronoUnit.SECONDS;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider;
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.sql.SQLException;
import java.time.Duration;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.testcontainers.containers.BindMode;
import org.testcontainers.containers.GenericContainer;
import org.testcontainers.containers.wait.strategy.Wait;
import org.testcontainers.images.builder.ImageFromDockerfile;
public class ClickhouseDestinationStrictEncryptAcceptanceTest extends DestinationAcceptanceTest {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseDestinationStrictEncryptAcceptanceTest.class);
public static final Integer HTTP_PORT = 8123;
public static final Integer NATIVE_PORT = 9000;
public static final Integer HTTPS_PORT = 8443;
public static final Integer NATIVE_SECURE_PORT = 9440;
private static final String DB_NAME = "default";
private static final String USER_NAME = "default";
private final StandardNameTransformer namingResolver = new StandardNameTransformer();
private GenericContainer db;
private static JdbcDatabase getDatabase(final JsonNode config) {
final String jdbcStr = String.format(DatabaseDriver.CLICKHOUSE.getUrlFormatString() + "?sslmode=NONE",
ClickhouseDestination.HTTPS_PROTOCOL,
config.get(JdbcUtils.HOST_KEY).asText(),
config.get(JdbcUtils.PORT_KEY).asInt(),
config.get(JdbcUtils.DATABASE_KEY).asText());
return new DefaultJdbcDatabase(DataSourceFactory.create(
config.get(JdbcUtils.USERNAME_KEY).asText(),
config.has(JdbcUtils.PASSWORD_KEY) ? config.get(JdbcUtils.PASSWORD_KEY).asText() : null,
ClickhouseDestination.DRIVER_CLASS,
jdbcStr), new ClickhouseTestSourceOperations());
}
@Override
protected String getImageName() {
return "airbyte/destination-clickhouse-strict-encrypt:dev";
}
@Override
protected boolean implementsNamespaces() {
return true;
}
@Override
protected TestDataComparator getTestDataComparator() {
return new ClickhouseTestDataComparator();
}
@Override
protected boolean supportBasicDataTypeTest() {
return true;
}
@Override
protected boolean supportArrayDataTypeTest() {
return true;
}
@Override
protected boolean supportObjectDataTypeTest() {
return true;
}
@Override
protected String getDestinationDefinitionKey() {
return "airbyte/destination-clickhouse";
}
@Override
protected String getDefaultSchema(final JsonNode config) {
if (config.get(JdbcUtils.DATABASE_KEY) == null) {
return null;
}
return config.get(JdbcUtils.DATABASE_KEY).asText();
}
@Override
protected JsonNode getConfig() {
return Jsons.jsonNode(ImmutableMap.builder()
.put(JdbcUtils.HOST_KEY, HostPortResolver.resolveIpAddress(db))
.put(JdbcUtils.PORT_KEY, HTTPS_PORT)
.put(JdbcUtils.DATABASE_KEY, DB_NAME)
.put(JdbcUtils.USERNAME_KEY, USER_NAME)
.put(JdbcUtils.PASSWORD_KEY, "")
.put(JdbcUtils.SCHEMA_KEY, DB_NAME)
.put(JdbcUtils.SSL_KEY, true)
.build());
}
@Override
protected JsonNode getFailCheckConfig() {
final JsonNode clone = Jsons.clone(getConfig());
((ObjectNode) clone).put("password", "wrong password").put(JdbcUtils.SSL_KEY, false);
return clone;
}
@Override
protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv testEnv,
final String streamName,
final String namespace)
throws Exception {
return retrieveRecordsFromTable(namingResolver.getIdentifier(streamName), namespace);
}
@Override
protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
final String streamName,
final String namespace,
final JsonNode streamSchema)
throws Exception {
return retrieveRecordsFromTable(StreamId.concatenateRawTableName(namespace, streamName), "airbyte_internal")
.stream()
.map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
.collect(Collectors.toList());
}
private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
final JdbcDatabase jdbcDB = getDatabase(getConfig());
final var nameTransformer = new StandardNameTransformer();
final String query = String.format("SELECT * FROM `%s`.`%s` ORDER BY %s ASC", schemaName, nameTransformer.convertStreamName(tableName),
JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT);
return jdbcDB.queryJsons(query);
}
@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}
@Override
protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) {
db = new GenericContainer<>(new ImageFromDockerfile("clickhouse-test")
.withFileFromClasspath("Dockerfile", "docker/Dockerfile")
.withFileFromClasspath("clickhouse_certs.sh", "docker/clickhouse_certs.sh"))
.withEnv("TZ", "UTC")
.withExposedPorts(HTTP_PORT, NATIVE_PORT, HTTPS_PORT, NATIVE_SECURE_PORT)
.withClasspathResourceMapping("ssl_ports.xml", "/etc/clickhouse-server/config.d/ssl_ports.xml", BindMode.READ_ONLY)
.waitingFor(Wait.forHttp("/ping").forPort(HTTP_PORT)
.forStatusCode(200).withStartupTimeout(Duration.of(60, SECONDS)));
db.start();
LOGGER.info(String.format("Clickhouse server container port mapping: %d -> %d, %d -> %d, %d -> %d, %d -> %d",
HTTP_PORT, db.getMappedPort(HTTP_PORT),
HTTPS_PORT, db.getMappedPort(HTTPS_PORT),
NATIVE_PORT, db.getMappedPort(NATIVE_PORT),
NATIVE_SECURE_PORT, db.getMappedPort(NATIVE_SECURE_PORT)));
}
@Override
protected void tearDown(final TestDestinationEnv testEnv) {
db.stop();
db.close();
}
@Override
@ParameterizedTest
@ArgumentsSource(DataTypeTestArgumentProvider.class)
public void testDataTypeTestWithNormalization(final String messagesFilename,
final String catalogFilename,
final DataTypeTestArgumentProvider.TestCompatibility testCompatibility)
throws Exception {
// arrays are not fully supported yet in jdbc driver
// https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseArray.java
if (messagesFilename.contains("array")) {
return;
}
super.testDataTypeTestWithNormalization(messagesFilename, catalogFilename, testCompatibility);
}
}

View File

@@ -1,154 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
public class ClickhouseTestDataComparator extends AdvancedTestDataComparator {
private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseTestDataComparator.class);
private final StandardNameTransformer namingResolver = new StandardNameTransformer();
private static final String CLICKHOUSE_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSX";
// https://clickhouse.com/docs/en/sql-reference/data-types/date32/
private final LocalDate minSupportedDate = LocalDate.parse("1970-01-01");
private final LocalDate maxSupportedDate = LocalDate.parse("2149-06-06");
private final ZonedDateTime minSupportedDateTime = ZonedDateTime.parse(
"1925-01-01T00:00:00.000Z");
private final ZonedDateTime maxSupportedDateTime = ZonedDateTime.parse(
"2283-11-10T20:23:45.000Z");
@Override
protected List<String> resolveIdentifier(final String identifier) {
final List<String> result = new ArrayList<>();
final String resolved = namingResolver.getIdentifier(identifier);
result.add(identifier);
result.add(resolved);
if (!resolved.startsWith("\"")) {
result.add(resolved.toLowerCase());
result.add(resolved.toUpperCase());
}
return result;
}
@Override
protected boolean compareNumericValues(final String firstNumericValue,
final String secondNumericValue) {
// clickhouse stores double 1.14 as 1.1400000000000001
// https://clickhouse.com/docs/en/sql-reference/data-types/float/
final double epsilon = 0.000000000000001d;
final double firstValue = Double.parseDouble(firstNumericValue);
final double secondValue = Double.parseDouble(secondNumericValue);
return firstValue == secondValue || Math.abs(firstValue - secondValue) < epsilon;
}
@Override
protected boolean compareBooleanValues(final String firstValue, final String secondValue) {
return parseBool(firstValue) == parseBool(secondValue);
}
@Override
protected boolean compareDateValues(final String airbyteMessageValue,
final String destinationValue) {
final LocalDate expectedDate = LocalDate.parse(airbyteMessageValue);
final LocalDate actualDate = LocalDate.parse(destinationValue);
if (expectedDate.isBefore(minSupportedDate) || expectedDate.isAfter(maxSupportedDate)) {
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
// so actually you end up with unpredicted values, more
// https://clickhouse.com/docs/en/sql-reference/data-types/date32
LOGGER.warn(
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
return true;
}
return actualDate.equals(expectedDate);
}
@Override
protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue,
final String destinationValue) {
try {
final ZonedDateTime airbyteDate = ZonedDateTime.parse(airbyteMessageValue,
getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC);
final ZonedDateTime destinationDate = parseDestinationDateWithTz(destinationValue);
if (airbyteDate.isBefore(minSupportedDateTime) || airbyteDate.isAfter(maxSupportedDateTime)) {
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
// so actually you end up with unpredicted values, more
// https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
LOGGER.warn(
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
return true;
}
return airbyteDate.equals(destinationDate);
} catch (final DateTimeParseException e) {
LOGGER.warn(
"Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}",
airbyteMessageValue, destinationValue, e);
return compareTextValues(airbyteMessageValue, destinationValue);
}
}
@Override
protected ZonedDateTime parseDestinationDateWithTz(final String destinationValue) {
return ZonedDateTime.parse(destinationValue,
DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT)).withZoneSameInstant(
ZoneOffset.UTC);
}
@Override
protected boolean compareDateTimeValues(final String airbyteMessageValue,
final String destinationValue) {
final LocalDateTime expectedDateTime = LocalDateTime.parse(airbyteMessageValue);
LocalDateTime actualDateTime;
try {
actualDateTime = LocalDateTime.parse(destinationValue,
DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT));
} catch (final DateTimeParseException e) {
LOGGER.warn("Error using Clickhouse Timezone format, trying standard format", e);
actualDateTime = LocalDateTime.parse(destinationValue);
}
if (expectedDateTime.isBefore(minSupportedDateTime.toLocalDateTime())
|| expectedDateTime.isAfter(maxSupportedDateTime.toLocalDateTime())) {
// inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
// so actually you end up with unpredicted values, more
// https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
LOGGER.warn(
"Test value is out of range and would be corrupted by Snowflake, so we skip this verification");
return true;
}
return expectedDateTime.equals(actualDateTime);
}
private static boolean parseBool(final String valueAsString) {
// boolen as a String may be returned as true\false and as 0\1
// https://clickhouse.com/docs/en/sql-reference/data-types/boolean
try {
return Integer.parseInt(valueAsString) > 0;
} catch (final NumberFormatException ex) {
return Boolean.parseBoolean(valueAsString);
}
}
}

View File

@@ -1,34 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.DataTypeUtils;
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
public class ClickhouseTestSourceOperations extends JdbcSourceOperations {
@Override
protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
node.put(columnName, DateTimeFormatter.ISO_DATE.format(resultSet.getTimestamp(index).toLocalDateTime()));
}
@Override
protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
final LocalDate date = timestamp.toLocalDate();
DateTimeFormatter dateTimeFormatter = DateTimeFormatter.ofPattern(
DataTypeUtils.DATE_FORMAT_WITH_MILLISECONDS_PATTERN);
node.put(columnName, resolveEra(date, timestamp.format(dateTimeFormatter)));
}
}

View File

@@ -1,8 +0,0 @@
FROM clickhouse/clickhouse-server:22.5
EXPOSE 8123
EXPOSE 8443
EXPOSE 9000
EXPOSE 9440
COPY clickhouse_certs.sh /docker-entrypoint-initdb.d/

View File

@@ -1,12 +0,0 @@
echo "Preparing certs"
openssl req -subj "/CN=my.host.name" -new \
-newkey rsa:2048 -days 365 -nodes -x509 \
-keyout /etc/clickhouse-server/server.key \
-out /etc/clickhouse-server/server.crt
openssl dhparam -out /etc/clickhouse-server/dhparam.pem 1024
chown $(id -u clickhouse):$(id -g clickhouse) /etc/clickhouse-server/server.{key,crt}
echo "Finished preparing certs"

View File

@@ -1,26 +0,0 @@
<clickhouse>
<http_port>8123</http_port>
<https_port>8443</https_port>
<tcp_port>9000</tcp_port>
<tcp_port_secure>9440</tcp_port_secure>
<mysql_port>9004</mysql_port>
<postgresql_port>9005</postgresql_port>
<interserver_http_port>9009</interserver_http_port>
<grpc_port>9100</grpc_port>
<openSSL>
<server>
<certificateFile>/etc/clickhouse-server/server.crt</certificateFile>
<privateKeyFile>/etc/clickhouse-server/server.key</privateKeyFile>
<verificationMode>relaxed</verificationMode>
<invalidCertificateHandler>
<name>ConsoleCertificateHandler</name>
</invalidCertificateHandler>
<loadDefaultCAFile>false</loadDefaultCAFile>
<cacheSessions>true</cacheSessions>
<disableProtocols>sslv2,sslv3</disableProtocols>
<preferServerCiphers>true</preferServerCiphers>
</server>
</openSSL>
</clickhouse>

View File

@@ -1,23 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import org.junit.jupiter.api.Test;
class ClickhouseDestinationStrictEncryptTest {
@Test
void testGetSpec() throws Exception {
System.out.println(new ClickhouseDestinationStrictEncrypt().spec().getConnectionSpecification());
assertEquals(Jsons.deserialize(MoreResources.readResource("expected_spec.json"), ConnectorSpecification.class),
new ClickhouseDestinationStrictEncrypt().spec());
}
}

View File

@@ -1,177 +0,0 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/clickhouse",
"supportsIncremental": true,
"supportsNormalization": true,
"supportsDBT": false,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ClickHouse Destination Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"additionalProperties": true,
"properties": {
"host": {
"title": "Host",
"description": "Hostname of the database.",
"type": "string",
"order": 0
},
"port": {
"title": "Port",
"description": "HTTP port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 8123,
"examples": ["8123"],
"order": 1
},
"database": {
"title": "DB Name",
"description": "Name of the database.",
"type": "string",
"order": 2
},
"username": {
"title": "User",
"description": "Username to use to access the database.",
"type": "string",
"order": 3
},
"password": {
"title": "Password",
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true,
"order": 4
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 5
},
"raw_data_schema": {
"type": "string",
"description": "The schema to write raw tables into (default: airbyte_internal)",
"title": "Raw Table Schema Name",
"order": 7
},
"tunnel_method": {
"type": "object",
"title": "SSH Tunnel Method",
"description": "Whether to initiate an SSH tunnel before connecting to the database, and if so, which kind of authentication to use.",
"oneOf": [
{
"title": "No Tunnel",
"required": ["tunnel_method"],
"properties": {
"tunnel_method": {
"description": "No ssh tunnel needed to connect to database",
"type": "string",
"const": "NO_TUNNEL",
"order": 0
}
}
},
{
"title": "SSH Key Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"ssh_key"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and ssh key",
"type": "string",
"const": "SSH_KEY_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host.",
"type": "string",
"order": 3
},
"ssh_key": {
"title": "SSH Private Key",
"description": "OS-level user account ssh key credentials in RSA PEM format ( created with ssh-keygen -t rsa -m PEM -f myuser_rsa )",
"type": "string",
"airbyte_secret": true,
"multiline": true,
"order": 4
}
}
},
{
"title": "Password Authentication",
"required": [
"tunnel_method",
"tunnel_host",
"tunnel_port",
"tunnel_user",
"tunnel_user_password"
],
"properties": {
"tunnel_method": {
"description": "Connect through a jump server tunnel host using username and password authentication",
"type": "string",
"const": "SSH_PASSWORD_AUTH",
"order": 0
},
"tunnel_host": {
"title": "SSH Tunnel Jump Server Host",
"description": "Hostname of the jump server host that allows inbound ssh tunnel.",
"type": "string",
"order": 1
},
"tunnel_port": {
"title": "SSH Connection Port",
"description": "Port on the proxy/jump server that accepts inbound ssh connections.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 22,
"examples": ["22"],
"order": 2
},
"tunnel_user": {
"title": "SSH Login Username",
"description": "OS-level username for logging into the jump server host",
"type": "string",
"order": 3
},
"tunnel_user_password": {
"title": "Password",
"description": "OS-level password for logging into the jump server host",
"type": "string",
"airbyte_secret": true,
"order": 4
}
}
}
]
}
}
}
}

View File

@@ -1,87 +0,0 @@
# Destination Clickhouse
This is the repository for the Clickhouse destination connector in Java.
For information about how to use this connector within Airbyte, see [the User Documentation](https://docs.airbyte.io/integrations/destinations/clickhouse).
## Local development
#### Building via Gradle
From the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:destination-clickhouse-v2:build
```
#### Create credentials
**If you are a community contributor**, generate the necessary credentials and place them in `secrets/config.json` conforming to the spec file in `src/main/resources/spec.json`.
Note that the `secrets` directory is git-ignored by default, so there is no danger of accidentally checking in sensitive information.
**If you are an Airbyte core member**, follow the [instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci) to set up the credentials.
### Locally running the connector docker image
#### Build
Build the connector image via Gradle:
```
./gradlew :airbyte-integrations:connectors:destination-clickhouse-v2:buildConnectorImage
```
Once built, the docker image name and tag on your host will be `airbyte/destination-clickhouse-v2:dev`.
#### Run
Then run any of the connector commands as follows:
```
docker run --rm airbyte/destination-clickhouse-v2:dev spec
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-clickhouse-v2:dev check --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets airbyte/destination-clickhouse-v2:dev discover --config /secrets/config.json
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/destination-clickhouse-v2:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
We use `JUnit` for Java tests.
### Unit and Integration Tests
Place unit tests under `src/test/io/airbyte/integrations/destinations/clickhouse`.
#### Acceptance Tests
Airbyte has a standard test suite that all destination connectors must pass. Implement the `TODO`s in
`src/test-integration/java/io/airbyte/integrations/destinations/clickhouse/ClickhouseDestinationAcceptanceTest.java`.
### Using gradle to run tests
All commands should be run from airbyte project root.
To run unit tests:
```
./gradlew :airbyte-integrations:connectors:destination-clickhouse-v2:unitTest
```
To run acceptance and custom integration tests:
```
./gradlew :airbyte-integrations:connectors:destination-clickhouse-v2:integrationTest
```
## Dependency Management
### Publishing a new version of the connector
You've checked out the repo, implemented a million dollar feature, and you're ready to share your changes with the world. Now what?
1. Make sure your changes are passing our test suite: `airbyte-ci connectors --name=destination-clickhouse test`
2. Bump the connector version in `metadata.yaml`: increment the `dockerImageTag` value. Please follow [semantic versioning for connectors](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#semantic-versioning-for-connectors).
3. Make sure the `metadata.yaml` content is up to date.
4. Make sure the connector documentation and its changelog are up to date (`docs/integrations/destinations/clickhouse.md`).
5. Create a Pull Request: use [our PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook/#pull-request-title-convention).
6. Pat yourself on the back for being an awesome contributor.
7. Someone from Airbyte will take a look at your PR and iterate with you to merge it into master.

View File

@@ -1,3 +0,0 @@
# ClickHouse
## TODO

View File

@@ -1,50 +0,0 @@
plugins {
id 'application'
id 'airbyte-bulk-connector'
id "io.airbyte.gradle.docker"
id 'airbyte-connector-docker-convention'
}
airbyteBulkConnector {
core = 'load'
toolkits = ['load-gcs', 'load-db', 'load-s3']
cdk = 'local'
}
java {
compileJava {
options.compilerArgs += "-Xlint:-this-escape"
}
}
application {
mainClass = 'io.airbyte.integrations.destination.clickhouse_v2.ClickhouseV2DestinationKt'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0',
'-XX:NativeMemoryTracking=detail', '-XX:+UnlockDiagnosticVMOptions',
'-XX:GCLockerRetryAllocationCount=100',
// '-Djava.rmi.server.hostname=localhost',
// '-Dcom.sun.management.jmxremote=true',
// '-Dcom.sun.management.jmxremote.port=6000',
// '-Dcom.sun.management.jmxremote.rmi.port=6000',
// '-Dcom.sun.management.jmxremote.local.only=false',
// '-Dcom.sun.management.jmxremote.authenticate=false',
// '-Dcom.sun.management.jmxremote.ssl=false'
]
}
dependencies {
implementation 'com.clickhouse:client-v2:0.9.0'
testImplementation("io.mockk:mockk:1.14.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
// This is being used in the test to check that the generated SQL can be parsed correctly.
testImplementation("com.github.vertical-blank:sql-formatter:2.0.4")
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
integrationTestImplementation 'org.testcontainers:clickhouse:1.21.1'
integrationTestImplementation 'com.clickhouse:client-v2:0.9.0'
}
test {
systemProperties(["mockk.junit.extension.requireParallelTesting":"true"])
}

View File

@@ -1 +0,0 @@
<svg xmlns="http://www.w3.org/2000/svg" width="250" height="250" fill="none"><g clip-path="url(#a)"><path fill="red" d="M29.01 189.5h21.33V211H29.01v-21.5Z"/><path fill="#FC0" d="M29.01 39h21.332v150.5H29.01V39Zm42.663 0h21.331v172h-21.33V39Zm42.663 0h21.331v172h-21.331V39Zm42.662 0h21.331v172h-21.331V39Zm42.662 69.875h21.332v32.25H199.66v-32.25Z"/></g><defs><clipPath id="a"><path fill="#fff" d="M29 39h192v172H29z"/></clipPath></defs></svg>

Before

Width:  |  Height:  |  Size: 444 B

View File

@@ -1,31 +0,0 @@
data:
connectorSubtype: database
connectorType: destination
definitionId: c4510ac6-094f-4ac3-8811-a5582346b9b9
dockerImageTag: 0.1.11
dockerRepository: airbyte/destination-clickhouse-v2
githubIssueLabel: destination-clickhouse-v2
icon: clickhouse.svg
license: MIT
name: Clickhouse V2 Beta (deprecated)
connectorBuildOptions:
baseImage: docker.io/airbyte/java-connector-base:2.0.1@sha256:ec89bd1a89e825514dd2fc8730ba299a3ae1544580a078df0e35c5202c2085b3
registryOverrides:
cloud:
enabled: false
oss:
enabled: false
releaseStage: alpha
documentationUrl: https://docs.airbyte.com/integrations/destinations/clickhouse-v2
tags:
- language:java
ab_internal:
sl: 100
ql: 200
requireVersionIncrementsInPullRequests: false
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
supportsRefreshes: true
metadataSpecVersion: "1.0"

View File

@@ -1,3 +0,0 @@
include = [
"${POE_GIT_DIR}/poe-tasks/gradle-connector-tasks.toml",
]

View File

@@ -4,14 +4,10 @@
ClickHouse is a fast open-source column-oriented database management system that allows generating analytical data reports in real-time using SQL queries.
## Endpoints
This destination connector uses ClickHouse official JDBC driver, which uses HTTP as protocol. [https://github.com/ClickHouse/clickhouse-jdbc](https://github.com/ClickHouse/clickhouse-jdbc)
## Quick Notes
- This connector doesn't support nested streams and schema change yet.
- This connector doesn't support nested streams and schema changes.
## API Reference
The ClickHouse reference documents: [https://clickhouse.com/docs/en/](https://clickhouse.com/docs/en/)
The ClickHouse reference documents: [https://clickhouse.com/docs/en/](https://clickhouse.com/docs/en/)

View File

@@ -1,37 +1,50 @@
plugins {
id 'application'
id 'airbyte-java-connector'
id("io.airbyte.gradle.docker")
id("airbyte-connector-docker-convention")
id 'airbyte-bulk-connector'
id "io.airbyte.gradle.docker"
id 'airbyte-connector-docker-convention'
}
airbyteJavaConnector {
cdkVersionRequired = '0.22.1'
features = ['db-destinations', 's3-destinations', 'typing-deduping']
useLocalCdk = false
airbyteBulkConnector {
core = 'load'
toolkits = ['load-gcs', 'load-db', 'load-s3']
cdk = 'local'
}
//remove once upgrading the CDK version to 0.4.x or later
java {
compileJava {
options.compilerArgs.remove("-Werror")
options.compilerArgs += "-Xlint:-this-escape"
}
}
airbyteJavaConnector.addCdkDependencies()
application {
mainClass = 'io.airbyte.integrations.destination.clickhouse.ClickhouseDestination'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0']
mainClass = 'io.airbyte.integrations.destination.clickhouse.ClickhouseDestinationKt'
applicationDefaultJvmArgs = ['-XX:+ExitOnOutOfMemoryError', '-XX:MaxRAMPercentage=75.0',
'-XX:NativeMemoryTracking=detail', '-XX:+UnlockDiagnosticVMOptions',
'-XX:GCLockerRetryAllocationCount=100',
// '-Djava.rmi.server.hostname=localhost',
// '-Dcom.sun.management.jmxremote=true',
// '-Dcom.sun.management.jmxremote.port=6000',
// '-Dcom.sun.management.jmxremote.rmi.port=6000',
// '-Dcom.sun.management.jmxremote.local.only=false',
// '-Dcom.sun.management.jmxremote.authenticate=false',
// '-Dcom.sun.management.jmxremote.ssl=false'
]
}
dependencies {
implementation 'com.clickhouse:client-v2:0.9.0'
implementation 'com.clickhouse:clickhouse-jdbc:0.3.2-patch10:all'
testImplementation("io.mockk:mockk:1.14.2")
testImplementation("org.jetbrains.kotlinx:kotlinx-coroutines-test:1.10.2")
// This is being used in the test to check that the generated SQL can be parsed correctly.
testImplementation("com.github.vertical-blank:sql-formatter:2.0.4")
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
testImplementation 'org.testcontainers:clickhouse:1.19.0'
// https://mvnrepository.com/artifact/org.testcontainers/clickhouse
integrationTestJavaImplementation 'org.testcontainers:clickhouse:1.19.0'
integrationTestImplementation 'org.testcontainers:clickhouse:1.21.1'
integrationTestImplementation 'com.clickhouse:client-v2:0.9.0'
}
test {
systemProperties(["mockk.junit.extension.requireParallelTesting":"true"])
}

View File

@@ -1,41 +1,36 @@
data:
connectorBuildOptions:
baseImage: docker.io/airbyte/java-connector-base:2.0.2@sha256:f8e47304842a2c4d75ac223cf4b3c4117aa1c5c9207149369d296616815fe5b0
connectorSubtype: database
connectorType: destination
definitionId: ce0d828e-1dc4-496c-b122-2da42e637e48
dockerImageTag: 1.1.0
dockerImageTag: 2.0.0
dockerRepository: airbyte/destination-clickhouse
githubIssueLabel: destination-clickhouse
icon: clickhouse.svg
license: MIT
name: Clickhouse
connectorBuildOptions:
baseImage: docker.io/airbyte/java-connector-base:2.0.1@sha256:ec89bd1a89e825514dd2fc8730ba299a3ae1544580a078df0e35c5202c2085b3
registryOverrides:
cloud:
dockerRepository: airbyte/destination-clickhouse-strict-encrypt
enabled: true
oss:
enabled: true
releaseStage: generally_available
releases:
breakingChanges:
1.0.0:
upgradeDeadline: "2024-03-15"
message: >
This version removes the option to use "normalization" with clickhouse. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models.
Selecting `Upgrade` will upgrade **all** connections using this destination at their next sync.
releaseStage: alpha
2.0.0:
message: "This connector has been re-written from scratch. Data will now be typed and stored in final (non-raw) tables. The connector should function without changes, but downstream pipelines may be affected."
upgradeDeadline: "2025-08-19"
documentationUrl: https://docs.airbyte.com/integrations/destinations/clickhouse
supportsDbt: false
tags:
- language:java
ab_internal:
sl: 100
ql: 200
requireVersionIncrementsInPullRequests: false
supportLevel: community
connectorTestSuitesOptions:
- suite: unitTests
- suite: integrationTests
supportsRefreshes: true
metadataSpecVersion: "1.0"

View File

@@ -1,135 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.IntegrationRunner;
import io.airbyte.cdk.integrations.base.ssh.SshWrappedDestination;
import io.airbyte.cdk.integrations.destination.NamingConventionTransformer;
import io.airbyte.cdk.integrations.destination.jdbc.AbstractJdbcDestination;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.JdbcSqlGenerator;
import io.airbyte.cdk.integrations.destination.jdbc.typing_deduping.RawOnlySqlGenerator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus;
import io.airbyte.protocol.models.v0.AirbyteConnectionStatus.Status;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import javax.sql.DataSource;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Legacy JDBC-based ClickHouse destination (raw tables only).
 *
 * <p>Builds a JDBC URL from the user configuration (HTTP or HTTPS scheme depending on the
 * "ssl" flag), verifies connectivity in {@link #check(JsonNode)}, and delegates SQL
 * generation to a raw-only generator — no typed/deduped final tables are produced here.
 */
public class ClickhouseDestination extends AbstractJdbcDestination implements Destination {

  private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseDestination.class);

  public static final String DRIVER_CLASS = DatabaseDriver.CLICKHOUSE.getDriverClassName();

  // The ClickHouse JDBC driver uses HTTP as its transport; the "ssl" flag only
  // switches the URL scheme between these two values.
  public static final String HTTPS_PROTOCOL = "https";
  public static final String HTTP_PROTOCOL = "http";

  // Query parameters appended to the JDBC URL. socket_timeout is in milliseconds
  // (50 minutes) to survive long-running inserts. sslmode=NONE skips certificate
  // verification on the SSL path — NOTE(review): presumably intended to allow
  // self-signed certs; confirm before tightening.
  static final List<String> SSL_PARAMETERS = ImmutableList.of(
      "socket_timeout=3000000",
      "sslmode=NONE");
  static final List<String> DEFAULT_PARAMETERS = ImmutableList.of(
      "socket_timeout=3000000");

  /** Wraps the destination with SSH-tunnel support keyed on the host/port config fields. */
  public static Destination sshWrappedDestination() {
    return new SshWrappedDestination(new ClickhouseDestination(), JdbcUtils.HOST_LIST_KEY, JdbcUtils.PORT_LIST_KEY);
  }

  public ClickhouseDestination() {
    super(DRIVER_CLASS, new ClickhouseSQLNameTransformer(), new ClickhouseSqlOperations());
  }

  /**
   * Translates the user-facing config into the JDBC config map: formats the URL with the
   * scheme chosen by the "ssl" flag, appends the fixed URL parameters, then copies the
   * username plus — when present — password and extra JDBC URL params.
   */
  @Override
  public JsonNode toJdbcConfig(final JsonNode config) {
    final boolean isSsl = JdbcUtils.useSsl(config);
    final StringBuilder jdbcUrl = new StringBuilder(
        String.format(DatabaseDriver.CLICKHOUSE.getUrlFormatString(),
            isSsl ? HTTPS_PROTOCOL : HTTP_PROTOCOL,
            config.get(JdbcUtils.HOST_KEY).asText(),
            config.get(JdbcUtils.PORT_KEY).asInt(),
            config.get(JdbcUtils.DATABASE_KEY).asText()));
    if (isSsl) {
      jdbcUrl.append("?").append(String.join("&", SSL_PARAMETERS));
    } else {
      jdbcUrl.append("?").append(String.join("&", DEFAULT_PARAMETERS));
    }
    final ImmutableMap.Builder<Object, Object> configBuilder = ImmutableMap.builder()
        .put(JdbcUtils.USERNAME_KEY, config.get(JdbcUtils.USERNAME_KEY).asText())
        .put(JdbcUtils.JDBC_URL_KEY, jdbcUrl);
    if (config.has(JdbcUtils.PASSWORD_KEY)) {
      configBuilder.put(JdbcUtils.PASSWORD_KEY, config.get(JdbcUtils.PASSWORD_KEY).asText());
    }
    if (config.has(JdbcUtils.JDBC_URL_PARAMS_KEY)) {
      configBuilder.put(JdbcUtils.JDBC_URL_PARAMS_KEY, config.get(JdbcUtils.JDBC_URL_PARAMS_KEY).asText());
    }
    return Jsons.jsonNode(configBuilder.build());
  }

  /**
   * Connection check: opens a data source, attempts basic table operations against the
   * configured database, and reports SUCCEEDED/FAILED with the failure message. The data
   * source is always closed; a failure to close is only logged, never surfaced.
   */
  @Override
  public AirbyteConnectionStatus check(final JsonNode config) {
    final DataSource dataSource = getDataSource(config);
    try {
      final JdbcDatabase database = getDatabase(dataSource);
      final NamingConventionTransformer namingResolver = getNamingResolver();
      final String outputSchema = namingResolver.getIdentifier(config.get(JdbcUtils.DATABASE_KEY).asText());
      attemptTableOperations(outputSchema, database, namingResolver, getSqlOperations(), false);
      return new AirbyteConnectionStatus().withStatus(Status.SUCCEEDED);
    } catch (final Exception e) {
      LOGGER.error("Exception while checking connection: ", e);
      return new AirbyteConnectionStatus()
          .withStatus(Status.FAILED)
          .withMessage("Could not connect with provided configuration. \n" + e.getMessage());
    } finally {
      try {
        DataSourceFactory.close(dataSource);
      } catch (final Exception e) {
        LOGGER.warn("Unable to close data source.", e);
      }
    }
  }

  @Override
  protected Map<String, String> getDefaultConnectionProperties(final JsonNode config) {
    // All connection tuning lives in the URL parameters above; nothing extra here.
    return Collections.emptyMap();
  }

  public static void main(final String[] args) throws Exception {
    final Destination destination = ClickhouseDestination.sshWrappedDestination();
    LOGGER.info("starting destination: {}", ClickhouseDestination.class);
    new IntegrationRunner(destination).run(args);
    LOGGER.info("completed destination: {}", ClickhouseDestination.class);
  }

  @Override
  protected JdbcSqlGenerator getSqlGenerator() {
    // Raw-only: this connector never builds typed/deduped final tables.
    return new RawOnlySqlGenerator(new ClickhouseSQLNameTransformer());
  }

  @Override
  public boolean isV2Destination() {
    return true;
  }

  @Override
  protected String getConfigSchemaKey() {
    // ClickHouse has no schema level; the "database" config field plays that role.
    return "database";
  }

}

View File

@@ -1,16 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import java.util.Locale;
/**
 * Name transformer for ClickHouse identifiers.
 *
 * <p>On top of the standard Airbyte naming rules, generated identifiers are normalized
 * to lower case.
 */
public class ClickhouseSQLNameTransformer extends StandardNameTransformer {

  /**
   * Lower-cases the given identifier.
   *
   * <p>Uses {@link Locale#ROOT} so the result is locale-independent: the default-locale
   * overload would, e.g. under a Turkish locale, map 'I' to dotless 'ı' and produce
   * identifiers that differ between environments.
   */
  @Override
  public String applyDefaultCase(final String input) {
    return input.toLowerCase(Locale.ROOT);
  }

}

View File

@@ -1,118 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import com.clickhouse.client.ClickHouseFormat;
import com.clickhouse.jdbc.ClickHouseConnection;
import com.clickhouse.jdbc.ClickHouseStatement;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.jdbc.JdbcSqlOperations;
import io.airbyte.cdk.integrations.destination_async.partial_messages.PartialAirbyteMessage;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.sql.SQLException;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * ClickHouse-specific SQL operations for the legacy JDBC destination.
 *
 * <p>Creates databases on demand (ClickHouse has no separate schema level), defines the
 * raw-table DDL, and loads batches by writing records to a temp CSV file and streaming it
 * through the ClickHouse JDBC write API.
 */
public class ClickhouseSqlOperations extends JdbcSqlOperations {

  private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseSqlOperations.class);

  /** Creates the target database (ClickHouse's analogue of a schema) if it does not exist. */
  @Override
  public void createSchemaIfNotExists(final JdbcDatabase database, final String schemaName) throws Exception {
    database.execute(String.format("CREATE DATABASE IF NOT EXISTS %s;\n", schemaName));
  }

  @Override
  public boolean isSchemaRequired() {
    // Databases are created on demand above, so no pre-existing schema is needed.
    return false;
  }

  /**
   * DDL for an Airbyte raw table: raw id, JSON payload, extraction timestamp
   * (defaulting to now()) and nullable loaded-at timestamp, keyed by raw id.
   */
  @Override
  public String createTableQuery(final JdbcDatabase database, final String schemaName, final String tableName) {
    return String.format(
        """
        CREATE TABLE IF NOT EXISTS `%s`.`%s` (
        %s String,
        %s String,
        %s DateTime64(3, 'GMT') DEFAULT now(),
        %s DateTime64(3, 'GMT') NULL,
        PRIMARY KEY(%s)
        )
        ENGINE = MergeTree;
        """,
        schemaName, tableName,
        JavaBaseConstants.COLUMN_NAME_AB_RAW_ID,
        JavaBaseConstants.COLUMN_NAME_DATA,
        JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT,
        JavaBaseConstants.COLUMN_NAME_AB_LOADED_AT,
        JavaBaseConstants.COLUMN_NAME_AB_RAW_ID);
  }

  /** Runs each statement individually — ClickHouse does not support multi-statement queries. */
  @Override
  public void executeTransaction(final JdbcDatabase database, final List<String> queries) throws Exception {
    // Note: ClickHouse does not support multi query
    for (final String query : queries) {
      database.execute(query);
    }
  }

  /**
   * Writes the batch to a temp CSV file and streams it into the target table via the
   * ClickHouse JDBC write API. The temp file is always deleted afterwards.
   *
   * @throws SQLException if the underlying execute fails
   */
  @Override
  public void insertRecordsInternal(final JdbcDatabase database,
                                    final List<PartialAirbyteMessage> records,
                                    final String schemaName,
                                    final String tmpTableName)
      throws SQLException {
    LOGGER.info("actual size of batch: {}", records.size());
    if (records.isEmpty()) {
      return;
    }
    database.execute(connection -> {
      File tmpFile = null;
      Exception primaryException = null;
      try {
        tmpFile = Files.createTempFile(tmpTableName + "-", ".tmp").toFile();
        writeBatchToFile(tmpFile, records);
        final ClickHouseConnection conn = connection.unwrap(ClickHouseConnection.class);
        final ClickHouseStatement sth = conn.createStatement();
        sth.write() // Write API entrypoint
            .table(String.format("%s.%s", schemaName, tmpTableName)) // where to write data
            .format(ClickHouseFormat.CSV) // set a format
            .data(tmpFile.getAbsolutePath()) // specify input
            .send();
      } catch (final Exception e) {
        primaryException = e;
        throw new RuntimeException(e);
      } finally {
        if (tmpFile != null) {
          try {
            Files.delete(tmpFile.toPath());
          } catch (final IOException e) {
            if (primaryException != null) {
              // Fix: previously a cleanup failure was rethrown from this finally block,
              // masking the primary write error (which got demoted to a suppressed
              // exception of the cleanup error). Keep the primary failure propagating
              // and attach the cleanup failure as suppressed instead.
              primaryException.addSuppressed(e);
            } else {
              throw new RuntimeException(e);
            }
          }
        }
      }
    });
  }

  /** V2 raw-table insert is identical to the V1 path. */
  @Override
  protected void insertRecordsInternalV2(final JdbcDatabase database,
                                         final List<PartialAirbyteMessage> records,
                                         final String schemaName,
                                         final String tableName)
      throws Exception {
    insertRecordsInternal(database, records, schemaName, tableName);
  }

}

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2
package io.airbyte.integrations.destination.clickhouse
import io.airbyte.cdk.AirbyteDestinationRunner

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.check
package io.airbyte.integrations.destination.clickhouse.check
import com.clickhouse.data.ClickHouseFormat
import com.google.common.annotations.VisibleForTesting
import io.airbyte.cdk.load.check.DestinationChecker
import io.airbyte.integrations.destination.clickhouse_v2.check.ClickhouseChecker.Constants.TEST_DATA
import io.airbyte.integrations.destination.clickhouse_v2.config.ClickhouseBeanFactory
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.check.ClickhouseChecker.Constants.TEST_DATA
import io.airbyte.integrations.destination.clickhouse.config.ClickhouseBeanFactory
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import jakarta.inject.Singleton
import java.time.Clock
import java.util.concurrent.TimeUnit

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.client
package io.airbyte.integrations.destination.clickhouse.client
import com.clickhouse.client.api.Client as ClickHouseClientRaw
import com.clickhouse.client.api.command.CommandResponse
@@ -20,11 +20,11 @@ import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.cdk.load.orchestration.db.TempTableNameGenerator
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableNativeOperations
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableSqlOperations
import io.airbyte.integrations.destination.clickhouse_v2.client.ClickhouseSqlGenerator.Companion.DATETIME_WITH_PRECISION
import io.airbyte.integrations.destination.clickhouse_v2.config.ClickhouseFinalTableNameGenerator
import io.airbyte.integrations.destination.clickhouse_v2.model.AlterationSummary
import io.airbyte.integrations.destination.clickhouse_v2.model.hasApplicableAlterations
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DATETIME_WITH_PRECISION
import io.airbyte.integrations.destination.clickhouse.config.ClickhouseFinalTableNameGenerator
import io.airbyte.integrations.destination.clickhouse.model.AlterationSummary
import io.airbyte.integrations.destination.clickhouse.model.hasApplicableAlterations
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import io.github.oshai.kotlinlogging.KotlinLogging
import jakarta.inject.Singleton
import kotlinx.coroutines.future.await

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.client
package io.airbyte.integrations.destination.clickhouse.client
import com.clickhouse.data.ClickHouseDataType
import io.airbyte.cdk.load.command.Dedupe
@@ -31,10 +31,10 @@ import io.airbyte.cdk.load.message.Meta.Companion.COLUMN_NAME_AB_RAW_ID
import io.airbyte.cdk.load.orchestration.db.CDC_DELETED_AT_COLUMN
import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.integrations.destination.clickhouse_v2.client.ClickhouseSqlGenerator.Companion.DATETIME_WITH_PRECISION
import io.airbyte.integrations.destination.clickhouse_v2.client.ClickhouseSqlGenerator.Companion.DECIMAL_WITH_PRECISION_AND_SCALE
import io.airbyte.integrations.destination.clickhouse_v2.model.AlterationSummary
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DATETIME_WITH_PRECISION
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseSqlGenerator.Companion.DECIMAL_WITH_PRECISION_AND_SCALE
import io.airbyte.integrations.destination.clickhouse.model.AlterationSummary
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import jakarta.inject.Singleton
@Singleton

View File

@@ -2,16 +2,16 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.config
package io.airbyte.integrations.destination.clickhouse.config
import com.clickhouse.client.api.Client
import com.clickhouse.client.api.internal.ServerSettings
import io.airbyte.cdk.command.ConfigurationSpecificationSupplier
import io.airbyte.cdk.load.orchestration.db.DefaultTempTableNameGenerator
import io.airbyte.cdk.load.orchestration.db.TempTableNameGenerator
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfigurationFactory
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseSpecification
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfigurationFactory
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseSpecification
import io.micronaut.context.annotation.Factory
import jakarta.inject.Singleton

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.config
package io.airbyte.integrations.destination.clickhouse.config
import io.airbyte.cdk.load.client.AirbyteClient
import io.airbyte.cdk.load.orchestration.db.BaseDirectLoadInitialStatusGatherer

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.config
package io.airbyte.integrations.destination.clickhouse.config
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.data.Transformations.Companion.toAlphanumericAndUnderscore
import io.airbyte.cdk.load.orchestration.db.ColumnNameGenerator
import io.airbyte.cdk.load.orchestration.db.FinalTableNameGenerator
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import jakarta.inject.Singleton
import java.util.Locale
import java.util.UUID

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.model
package io.airbyte.integrations.destination.clickhouse.model
data class AlterationSummary(
val added: Map<String, String>,

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.spec
package io.airbyte.integrations.destination.clickhouse.spec
import io.airbyte.cdk.load.command.DestinationConfiguration
import io.airbyte.cdk.load.command.DestinationConfigurationFactory

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.spec
package io.airbyte.integrations.destination.clickhouse.spec
import com.fasterxml.jackson.annotation.JsonProperty
import com.fasterxml.jackson.annotation.JsonPropertyDescription
@@ -31,7 +31,7 @@ sealed class ClickhouseSpecification : ConfigurationSpecification() {
class ClickhouseSpecificationOss : ClickhouseSpecification() {
@get:JsonSchemaTitle("Hostname")
@get:JsonPropertyDescription("Hostname of the database.")
@get:JsonProperty("hostname")
@get:JsonProperty("host")
@get:JsonSchemaInject(json = """{"order": 0}""")
override val hostname: String = ""
@@ -79,7 +79,7 @@ class ClickhouseSpecificationOss : ClickhouseSpecification() {
open class ClickhouseSpecificationCloud : ClickhouseSpecification() {
@get:JsonSchemaTitle("Hostname")
@get:JsonPropertyDescription("Hostname of the database.")
@get:JsonProperty("hostname")
@get:JsonProperty("host")
@get:JsonSchemaInject(json = """{"order": 0}""")
override val hostname: String = ""
@@ -128,7 +128,7 @@ enum class ClickhouseConnectionProtocol(@get:JsonValue val value: String) {
}
@Singleton
class ClickhouseV2SpecificationExtension : DestinationSpecificationExtension {
class ClickhouseSpecificationExtension : DestinationSpecificationExtension {
override val supportedSyncModes =
listOf(
DestinationSyncMode.OVERWRITE,

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write
package io.airbyte.integrations.destination.clickhouse.write
import io.airbyte.cdk.SystemErrorException
import io.airbyte.cdk.load.command.DestinationStream
@@ -16,7 +16,7 @@ import io.airbyte.cdk.load.orchestration.db.legacy_typing_deduping.TableCatalog
import io.airbyte.cdk.load.write.DestinationWriter
import io.airbyte.cdk.load.write.StreamLoader
import io.airbyte.cdk.load.write.StreamStateStore
import io.airbyte.integrations.destination.clickhouse_v2.client.ClickhouseAirbyteClient
import io.airbyte.integrations.destination.clickhouse.client.ClickhouseAirbyteClient
import jakarta.inject.Singleton
@Singleton

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
import com.clickhouse.client.api.Client
import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter

View File

@@ -2,13 +2,13 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
import com.google.common.annotations.VisibleForTesting
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.write.DirectLoader
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.RecordMunger
import io.airbyte.integrations.destination.clickhouse.write.transform.RecordMunger
@SuppressFBWarnings(
value = ["NP_NONNULL_PARAM_VIOLATION"],

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
import com.clickhouse.client.api.Client
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableExecutionConfig
import io.airbyte.cdk.load.write.DirectLoaderFactory
import io.airbyte.cdk.load.write.StreamStateStore
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.RecordMunger
import io.airbyte.integrations.destination.clickhouse.write.transform.RecordMunger
import jakarta.inject.Singleton
/*

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
/*
* Encapsulates basic sized windowing logic. As we implement other windowing,

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.transform
package io.airbyte.integrations.destination.clickhouse.write.transform
import io.airbyte.cdk.load.data.ArrayValue
import io.airbyte.cdk.load.data.BooleanValue
@@ -18,14 +18,14 @@ import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.util.serializeToString
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DATE32_MAX
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DATE32_MIN
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DATETIME64_MAX
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DATETIME64_MIN
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DECIMAL128_MAX
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DECIMAL128_MIN
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.INT64_MAX
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.INT64_MIN
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DATE32_MAX
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DATE32_MIN
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DATETIME64_MAX
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DATETIME64_MIN
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DECIMAL128_MAX
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DECIMAL128_MIN
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.INT64_MAX
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.INT64_MIN
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import jakarta.inject.Singleton
import java.math.BigDecimal

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.transform
package io.airbyte.integrations.destination.clickhouse.write.transform
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.UnionType

View File

@@ -1,70 +0,0 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/destinations/clickhouse",
"supportsIncremental": true,
"supportsNormalization": true,
"supportsDBT": false,
"supported_destination_sync_modes": ["overwrite", "append"],
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "ClickHouse Destination Spec",
"type": "object",
"required": ["host", "port", "database", "username"],
"additionalProperties": true,
"properties": {
"host": {
"title": "Host",
"description": "Hostname of the database.",
"type": "string",
"order": 0
},
"port": {
"title": "Port",
"description": "HTTP port of the database.",
"type": "integer",
"minimum": 0,
"maximum": 65536,
"default": 8123,
"examples": ["8123"],
"order": 1
},
"database": {
"title": "DB Name",
"description": "Name of the database.",
"type": "string",
"order": 2
},
"username": {
"title": "User",
"description": "Username to use to access the database.",
"type": "string",
"order": 3
},
"password": {
"title": "Password",
"description": "Password associated with the username.",
"type": "string",
"airbyte_secret": true,
"order": 4
},
"jdbc_url_params": {
"description": "Additional properties to pass to the JDBC URL string when connecting to the database formatted as 'key=value' pairs separated by the symbol '&'. (example: key1=value1&key2=value2&key3=value3).",
"title": "JDBC URL Params",
"type": "string",
"order": 5
},
"ssl": {
"title": "SSL Connection",
"description": "Encrypt data using SSL.",
"type": "boolean",
"default": false,
"order": 6
},
"raw_data_schema": {
"type": "string",
"description": "The schema to write raw tables into (default: airbyte_internal)",
"title": "Raw Table Schema Name",
"order": 7
}
}
}
}

View File

@@ -1,173 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import static java.time.temporal.ChronoUnit.SECONDS;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider;
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.cdk.integrations.util.HostPortResolver;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.sql.SQLException;
import java.time.Duration;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.wait.strategy.Wait;
/**
 * Acceptance tests for the ClickHouse destination connector, executed against an
 * ephemeral ClickHouse server started via Testcontainers. The test lifecycle
 * (setup/tearDown and the various support* switches) is driven by the CDK's
 * {@link DestinationAcceptanceTest} base class.
 */
public class ClickhouseDestinationAcceptanceTest extends DestinationAcceptanceTest {

  // ClickHouse's built-in logical database; reused as both database and schema in the config.
  private static final String DB_NAME = "default";

  private final StandardNameTransformer namingResolver = new StandardNameTransformer();

  // Container lifecycle is managed by setup()/tearDown(); one container per test run.
  private ClickHouseContainer db;

  @Override
  protected String getImageName() {
    return "airbyte/destination-clickhouse:dev";
  }

  @Override
  protected boolean implementsNamespaces() {
    return true;
  }

  @Override
  protected TestDataComparator getTestDataComparator() {
    return new ClickhouseTestDataComparator();
  }

  @Override
  protected boolean supportBasicDataTypeTest() {
    return true;
  }

  @Override
  protected boolean supportArrayDataTypeTest() {
    return true;
  }

  @Override
  protected boolean supportObjectDataTypeTest() {
    return true;
  }

  /** Returns the configured database name, which doubles as the default schema. */
  @Override
  protected String getDefaultSchema(final JsonNode config) {
    if (config.get(JdbcUtils.DATABASE_KEY) == null) {
      return null;
    }
    return config.get(JdbcUtils.DATABASE_KEY).asText();
  }

  /** Builds a connector config pointing at the running test container (SSL disabled). */
  @Override
  protected JsonNode getConfig() {
    return Jsons.jsonNode(ImmutableMap.builder()
        .put(JdbcUtils.HOST_KEY, HostPortResolver.resolveHost(db))
        .put(JdbcUtils.PORT_KEY, HostPortResolver.resolvePort(db))
        .put(JdbcUtils.DATABASE_KEY, DB_NAME)
        .put(JdbcUtils.USERNAME_KEY, db.getUsername())
        .put(JdbcUtils.PASSWORD_KEY, db.getPassword())
        .put(JdbcUtils.SCHEMA_KEY, DB_NAME)
        .put(JdbcUtils.SSL_KEY, false)
        .build());
  }

  /** Same config as {@link #getConfig()} but with an intentionally wrong password. */
  @Override
  protected JsonNode getFailCheckConfig() {
    final JsonNode clone = Jsons.clone(getConfig());
    ((ObjectNode) clone).put("password", "wrong password");
    return clone;
  }

  @Override
  protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv testEnv,
                                                     final String streamName,
                                                     final String namespace)
      throws Exception {
    return retrieveRecordsFromTable(namingResolver.getIdentifier(streamName), namespace);
  }

  /** Reads rows from the raw table in airbyte_internal and unwraps the JSON data column. */
  @Override
  protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
                                           final String streamName,
                                           final String namespace,
                                           final JsonNode streamSchema)
      throws Exception {
    return retrieveRecordsFromTable(StreamId.concatenateRawTableName(namespace, streamName), "airbyte_internal")
        .stream()
        .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
        .collect(Collectors.toList());
  }

  // Queries all rows of a table ordered by extraction time so results are deterministic.
  private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws SQLException {
    final JdbcDatabase jdbcDB = getDatabase(getConfig());
    final var nameTransformer = new StandardNameTransformer();
    final String query = String.format("SELECT * FROM `%s`.`%s` ORDER BY %s ASC", schemaName, nameTransformer.convertStreamName(tableName),
        JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT);
    return jdbcDB.queryJsons(query);
  }

  // Opens a plain HTTP (non-SSL) JDBC connection to the container described by the config.
  private static JdbcDatabase getDatabase(final JsonNode config) {
    return new DefaultJdbcDatabase(
        DataSourceFactory.create(
            config.get(JdbcUtils.USERNAME_KEY).asText(),
            config.has(JdbcUtils.PASSWORD_KEY) ? config.get(JdbcUtils.PASSWORD_KEY).asText() : null,
            ClickhouseDestination.DRIVER_CLASS,
            String.format(DatabaseDriver.CLICKHOUSE.getUrlFormatString(),
                ClickhouseDestination.HTTP_PROTOCOL,
                config.get(JdbcUtils.HOST_KEY).asText(),
                config.get(JdbcUtils.PORT_KEY).asInt(),
                config.get(JdbcUtils.DATABASE_KEY).asText())),
        new ClickhouseTestSourceOperations());
  }

  /** Starts the container and blocks until its HTTP /ping endpoint returns 200 (60s timeout). */
  @Override
  protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) {
    db = new ClickHouseContainer("clickhouse/clickhouse-server:22.5")
        .waitingFor(Wait.forHttp("/ping").forPort(8123)
            .forStatusCode(200).withStartupTimeout(Duration.of(60, SECONDS)));
    db.start();
  }

  @Override
  protected void tearDown(final TestDestinationEnv testEnv) {
    db.stop();
    db.close();
  }

  /** Runs the CDK data-type matrix, skipping array fixtures (see comment below). */
  @ParameterizedTest
  @ArgumentsSource(DataTypeTestArgumentProvider.class)
  public void testDataTypeTestWithNormalization(final String messagesFilename,
                                                final String catalogFilename,
                                                final DataTypeTestArgumentProvider.TestCompatibility testCompatibility)
      throws Exception {
    // arrays are not fully supported yet in jdbc driver
    // https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseArray.java
    if (messagesFilename.contains("array")) {
      return;
    }
    super.testDataTypeTestWithNormalization(messagesFilename, catalogFilename, testCompatibility);
  }

}

View File

@@ -1,154 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.standardtest.destination.comparator.AdvancedTestDataComparator;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.ZoneOffset;
import java.time.ZonedDateTime;
import java.time.format.DateTimeFormatter;
import java.time.format.DateTimeParseException;
import java.util.ArrayList;
import java.util.List;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
/**
 * Compares expected Airbyte test records with values read back from ClickHouse,
 * tolerating ClickHouse-specific representations: identifier case/name changes,
 * float rounding, 0/1 booleans, and the bounded Date32/DateTime64 value ranges.
 */
public class ClickhouseTestDataComparator extends AdvancedTestDataComparator {

  private static final Logger LOGGER = LoggerFactory.getLogger(ClickhouseTestDataComparator.class);

  private final StandardNameTransformer namingResolver = new StandardNameTransformer();

  private static final String CLICKHOUSE_DATETIME_WITH_TZ_FORMAT = "yyyy-MM-dd'T'HH:mm:ss.SSSX";

  // Fix: the original message blamed "Snowflake" — a copy-paste from another connector's
  // comparator. Extracted to a constant since the same warning is logged in three places.
  private static final String OUT_OF_RANGE_WARNING =
      "Test value is out of range and would be corrupted by ClickHouse, so we skip this verification";

  // https://clickhouse.com/docs/en/sql-reference/data-types/date32/
  private final LocalDate minSupportedDate = LocalDate.parse("1970-01-01");
  private final LocalDate maxSupportedDate = LocalDate.parse("2149-06-06");
  // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
  private final ZonedDateTime minSupportedDateTime = ZonedDateTime.parse(
      "1925-01-01T00:00:00.000Z");
  private final ZonedDateTime maxSupportedDateTime = ZonedDateTime.parse(
      "2283-11-10T20:23:45.000Z");

  /**
   * Expands an identifier into the set of spellings the destination may have used:
   * the raw name, its transformed form, and (when unquoted) both case folds.
   */
  @Override
  protected List<String> resolveIdentifier(final String identifier) {
    final List<String> result = new ArrayList<>();
    final String resolved = namingResolver.getIdentifier(identifier);
    result.add(identifier);
    result.add(resolved);
    if (!resolved.startsWith("\"")) {
      result.add(resolved.toLowerCase());
      result.add(resolved.toUpperCase());
    }
    return result;
  }

  /** Epsilon comparison because ClickHouse floats round-trip imprecisely (e.g. 1.14 -> 1.1400000000000001). */
  @Override
  protected boolean compareNumericValues(final String firstNumericValue,
                                         final String secondNumericValue) {
    // clickhouse stores double 1.14 as 1.1400000000000001
    // https://clickhouse.com/docs/en/sql-reference/data-types/float/
    final double epsilon = 0.000000000000001d;
    final double firstValue = Double.parseDouble(firstNumericValue);
    final double secondValue = Double.parseDouble(secondNumericValue);
    return firstValue == secondValue || Math.abs(firstValue - secondValue) < epsilon;
  }

  @Override
  protected boolean compareBooleanValues(final String firstValue, final String secondValue) {
    return parseBool(firstValue) == parseBool(secondValue);
  }

  /** Dates outside ClickHouse's Date32 range come back corrupted, so such cases are skipped. */
  @Override
  protected boolean compareDateValues(final String airbyteMessageValue,
                                      final String destinationValue) {
    final LocalDate expectedDate = LocalDate.parse(airbyteMessageValue);
    final LocalDate actualDate = LocalDate.parse(destinationValue);
    if (expectedDate.isBefore(minSupportedDate) || expectedDate.isAfter(maxSupportedDate)) {
      // inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
      // so actually you end up with unpredicted values, more
      // https://clickhouse.com/docs/en/sql-reference/data-types/date32
      LOGGER.warn(OUT_OF_RANGE_WARNING);
      return true;
    }
    return actualDate.equals(expectedDate);
  }

  /** Compares tz-aware datetimes normalized to UTC; falls back to text comparison if parsing fails. */
  @Override
  protected boolean compareDateTimeWithTzValues(final String airbyteMessageValue,
                                                final String destinationValue) {
    try {
      final ZonedDateTime airbyteDate = ZonedDateTime.parse(airbyteMessageValue,
          getAirbyteDateTimeWithTzFormatter()).withZoneSameInstant(ZoneOffset.UTC);
      final ZonedDateTime destinationDate = parseDestinationDateWithTz(destinationValue);
      if (airbyteDate.isBefore(minSupportedDateTime) || airbyteDate.isAfter(maxSupportedDateTime)) {
        // inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
        // so actually you end up with unpredicted values, more
        // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
        LOGGER.warn(OUT_OF_RANGE_WARNING);
        return true;
      }
      return airbyteDate.equals(destinationDate);
    } catch (final DateTimeParseException e) {
      LOGGER.warn(
          "Fail to convert values to ZonedDateTime. Try to compare as text. Airbyte value({}), Destination value ({}). Exception: {}",
          airbyteMessageValue, destinationValue, e);
      return compareTextValues(airbyteMessageValue, destinationValue);
    }
  }

  @Override
  protected ZonedDateTime parseDestinationDateWithTz(final String destinationValue) {
    return ZonedDateTime.parse(destinationValue,
        DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT)).withZoneSameInstant(
            ZoneOffset.UTC);
  }

  /** Compares local datetimes, trying the ClickHouse format first, then ISO-8601. */
  @Override
  protected boolean compareDateTimeValues(final String airbyteMessageValue,
                                          final String destinationValue) {
    final LocalDateTime expectedDateTime = LocalDateTime.parse(airbyteMessageValue);
    LocalDateTime actualDateTime;
    try {
      actualDateTime = LocalDateTime.parse(destinationValue,
          DateTimeFormatter.ofPattern(CLICKHOUSE_DATETIME_WITH_TZ_FORMAT));
    } catch (final DateTimeParseException e) {
      LOGGER.warn("Error using Clickhouse Timezone format, trying standard format", e);
      actualDateTime = LocalDateTime.parse(destinationValue);
    }
    if (expectedDateTime.isBefore(minSupportedDateTime.toLocalDateTime())
        || expectedDateTime.isAfter(maxSupportedDateTime.toLocalDateTime())) {
      // inserting any dates that are out of supported range causes registers overflow in clickhouseDB,
      // so actually you end up with unpredicted values, more
      // https://clickhouse.com/docs/en/sql-reference/data-types/datetime64
      LOGGER.warn(OUT_OF_RANGE_WARNING);
      return true;
    }
    return expectedDateTime.equals(actualDateTime);
  }

  // A boolean read back as a String may be "true"/"false" or "0"/"1".
  // https://clickhouse.com/docs/en/sql-reference/data-types/boolean
  private static boolean parseBool(final String valueAsString) {
    try {
      return Integer.parseInt(valueAsString) > 0;
    } catch (final NumberFormatException ex) {
      return Boolean.parseBoolean(valueAsString);
    }
  }

}

View File

@@ -1,34 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.DataTypeUtils;
import io.airbyte.cdk.db.jdbc.JdbcSourceOperations;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.time.LocalDate;
import java.time.LocalDateTime;
import java.time.format.DateTimeFormatter;
/**
 * JDBC read-back operations used by the ClickHouse acceptance tests: overrides
 * date and timestamp extraction so values are rendered in the formats the test
 * comparators expect.
 */
public class ClickhouseTestSourceOperations extends JdbcSourceOperations {

  /** Renders a DATE column as an ISO-8601 date string. */
  @Override
  protected void putDate(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
    final LocalDateTime localDateTime = resultSet.getTimestamp(index).toLocalDateTime();
    node.put(columnName, DateTimeFormatter.ISO_DATE.format(localDateTime));
  }

  /** Renders a TIMESTAMP column with millisecond precision, era-adjusted via resolveEra(). */
  @Override
  protected void putTimestamp(final ObjectNode node, final String columnName, final ResultSet resultSet, final int index) throws SQLException {
    final LocalDateTime timestamp = getObject(resultSet, index, LocalDateTime.class);
    final DateTimeFormatter millisFormatter =
        DateTimeFormatter.ofPattern(DataTypeUtils.DATE_FORMAT_WITH_MILLISECONDS_PATTERN);
    node.put(columnName, resolveEra(timestamp.toLocalDate(), timestamp.format(millisFormatter)));
  }

}

View File

@@ -1,187 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.JavaBaseConstants;
import io.airbyte.cdk.integrations.base.ssh.SshBastionContainer;
import io.airbyte.cdk.integrations.base.ssh.SshTunnel;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.cdk.integrations.standardtest.destination.DestinationAcceptanceTest;
import io.airbyte.cdk.integrations.standardtest.destination.argproviders.DataTypeTestArgumentProvider;
import io.airbyte.cdk.integrations.standardtest.destination.comparator.TestDataComparator;
import io.airbyte.commons.json.Jsons;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.stream.Collectors;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.ArgumentsSource;
import org.testcontainers.containers.ClickHouseContainer;
import org.testcontainers.containers.Network;
/**
* Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
* or with a password.
*/
/**
 * Abstract class that allows us to avoid duplicating testing logic for testing SSH with a key file
 * or with a password. Subclasses supply the tunnel authentication method; the rest of the
 * suite (bastion + ClickHouse containers, config building, record retrieval) is shared.
 */
public abstract class SshClickhouseDestinationAcceptanceTest extends DestinationAcceptanceTest {

  /** Which SSH tunnel authentication method (key file vs. password) the subclass exercises. */
  public abstract SshTunnel.TunnelMethod getTunnelMethod();

  // ClickHouse's built-in logical database; reused as both database and schema.
  private static final String DB_NAME = "default";

  // Shared Docker network so the bastion and database containers can reach each other.
  private static final Network network = Network.newNetwork();

  private final StandardNameTransformer namingResolver = new StandardNameTransformer();

  // Started in setup(); torn down together with the bastion in tearDown().
  private ClickHouseContainer db;

  private final SshBastionContainer bastion = new SshBastionContainer();

  @Override
  protected String getImageName() {
    return "airbyte/destination-clickhouse:dev";
  }

  @Override
  protected boolean implementsNamespaces() {
    return true;
  }

  @Override
  protected TestDataComparator getTestDataComparator() {
    return new ClickhouseTestDataComparator();
  }

  @Override
  protected boolean supportBasicDataTypeTest() {
    return true;
  }

  @Override
  protected boolean supportArrayDataTypeTest() {
    return true;
  }

  @Override
  protected boolean supportObjectDataTypeTest() {
    return true;
  }

  /** Returns the configured database name, which doubles as the default schema. */
  @Override
  protected String getDefaultSchema(final JsonNode config) {
    if (config.get(JdbcUtils.DATABASE_KEY) == null) {
      return null;
    }
    return config.get(JdbcUtils.DATABASE_KEY).asText();
  }

  /** Wraps the basic DB config in tunnel settings for the subclass's auth method. */
  // ("getBasicDbConfigBuider" is the CDK's actual method name, typo included.)
  @Override
  protected JsonNode getConfig() throws Exception {
    return bastion.getTunnelConfig(getTunnelMethod(), bastion.getBasicDbConfigBuider(db, DB_NAME)
        .put("schema", DB_NAME), true);
  }

  /** Same config as {@link #getConfig()} but with an intentionally wrong password. */
  @Override
  protected JsonNode getFailCheckConfig() throws Exception {
    final JsonNode clone = Jsons.clone(getConfig());
    ((ObjectNode) clone).put("password", "wrong password");
    return clone;
  }

  @Override
  protected List<JsonNode> retrieveNormalizedRecords(final TestDestinationEnv testEnv,
                                                     final String streamName,
                                                     final String namespace)
      throws Exception {
    return retrieveRecordsFromTable(namingResolver.getIdentifier(streamName), namespace);
  }

  /** Reads rows from the raw table in airbyte_internal and unwraps the JSON data column. */
  @Override
  protected List<JsonNode> retrieveRecords(final TestDestinationEnv testEnv,
                                           final String streamName,
                                           final String namespace,
                                           final JsonNode streamSchema)
      throws Exception {
    return retrieveRecordsFromTable(StreamId.concatenateRawTableName(namespace, streamName), "airbyte_internal")
        .stream()
        .map(r -> Jsons.deserialize(r.get(JavaBaseConstants.COLUMN_NAME_DATA).asText()))
        .collect(Collectors.toList());
  }

  // Queries through the SSH tunnel; sshWrap rewrites host/port in the config for the tunnel.
  private List<JsonNode> retrieveRecordsFromTable(final String tableName, final String schemaName) throws Exception {
    return SshTunnel.sshWrap(
        getConfig(),
        JdbcUtils.HOST_LIST_KEY,
        JdbcUtils.PORT_LIST_KEY,
        mangledConfig -> {
          final JdbcDatabase database = getDatabase(mangledConfig);
          final String query = String.format("SELECT * FROM `%s`.`%s` ORDER BY %s ASC", schemaName, namingResolver.convertStreamName(tableName),
              JavaBaseConstants.COLUMN_NAME_AB_EXTRACTED_AT);
          return database.queryJsons(query);
        });
  }

  /**
   * Expands an identifier into the set of spellings the destination may have used:
   * the raw name, its transformed form, and (when unquoted) both case folds.
   */
  @Override
  protected List<String> resolveIdentifier(final String identifier) {
    final List<String> result = new ArrayList<>();
    final String resolved = namingResolver.getIdentifier(identifier);
    result.add(identifier);
    result.add(resolved);
    if (!resolved.startsWith("\"")) {
      result.add(resolved.toLowerCase());
      result.add(resolved.toUpperCase());
    }
    return result;
  }

  // Opens a plain HTTP (non-SSL) JDBC connection using the tunnel-mangled config.
  private static JdbcDatabase getDatabase(final JsonNode config) {
    return new DefaultJdbcDatabase(
        DataSourceFactory.create(
            config.get(JdbcUtils.USERNAME_KEY).asText(),
            config.has(JdbcUtils.PASSWORD_KEY) ? config.get(JdbcUtils.PASSWORD_KEY).asText() : null,
            ClickhouseDestination.DRIVER_CLASS,
            String.format(DatabaseDriver.CLICKHOUSE.getUrlFormatString(),
                ClickhouseDestination.HTTP_PROTOCOL,
                config.get(JdbcUtils.HOST_KEY).asText(),
                config.get(JdbcUtils.PORT_KEY).asInt(),
                config.get(JdbcUtils.DATABASE_KEY).asText())),
        new ClickhouseTestSourceOperations());
  }

  /** Starts the bastion first, then the ClickHouse container on the shared network. */
  @Override
  protected void setup(final TestDestinationEnv testEnv, final HashSet<String> TEST_SCHEMAS) {
    bastion.initAndStartBastion(network);
    db = (ClickHouseContainer) new ClickHouseContainer("clickhouse/clickhouse-server:22.5").withNetwork(network);
    db.start();
  }

  @Override
  protected void tearDown(final TestDestinationEnv testEnv) {
    bastion.stopAndCloseContainers(db);
  }

  /** Runs the CDK data-type matrix, skipping array fixtures (see comment below). */
  @ParameterizedTest
  @ArgumentsSource(DataTypeTestArgumentProvider.class)
  public void testDataTypeTestWithNormalization(final String messagesFilename,
                                                final String catalogFilename,
                                                final DataTypeTestArgumentProvider.TestCompatibility testCompatibility)
      throws Exception {
    // arrays are not fully supported yet in jdbc driver
    // https://github.com/ClickHouse/clickhouse-jdbc/blob/master/clickhouse-jdbc/src/main/java/ru/yandex/clickhouse/ClickHouseArray.java
    if (messagesFilename.contains("array")) {
      return;
    }
    super.testDataTypeTestWithNormalization(messagesFilename, catalogFilename, testCompatibility);
  }

}

View File

@@ -1,16 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import io.airbyte.cdk.integrations.base.ssh.SshTunnel;
/**
 * Runs the shared SSH acceptance test suite using key-file tunnel authentication.
 */
public class SshKeyClickhouseDestinationAcceptanceTest extends SshClickhouseDestinationAcceptanceTest {

  @Override
  public SshTunnel.TunnelMethod getTunnelMethod() {
    return SshTunnel.TunnelMethod.SSH_KEY_AUTH;
  }

}

View File

@@ -1,16 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import io.airbyte.cdk.integrations.base.ssh.SshTunnel;
/**
 * Runs the shared SSH acceptance test suite using password tunnel authentication.
 */
public class SshPasswordClickhouseDestinationAcceptanceTest extends SshClickhouseDestinationAcceptanceTest {

  @Override
  public SshTunnel.TunnelMethod getTunnelMethod() {
    return SshTunnel.TunnelMethod.SSH_PASSWORD_AUTH;
  }

}

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2
package io.airbyte.integrations.destination.clickhouse
import io.airbyte.cdk.load.test.util.ConfigurationUpdater
import io.airbyte.cdk.load.test.util.DefaultNamespaceResult
import io.airbyte.integrations.destination.clickhouse_v2.ClickhouseContainerHelper.getIpAddress
import io.airbyte.integrations.destination.clickhouse_v2.ClickhouseContainerHelper.getPassword
import io.airbyte.integrations.destination.clickhouse_v2.ClickhouseContainerHelper.getPort
import io.airbyte.integrations.destination.clickhouse_v2.ClickhouseContainerHelper.getUsername
import io.airbyte.integrations.destination.clickhouse.ClickhouseContainerHelper.getIpAddress
import io.airbyte.integrations.destination.clickhouse.ClickhouseContainerHelper.getPassword
import io.airbyte.integrations.destination.clickhouse.ClickhouseContainerHelper.getPort
import io.airbyte.integrations.destination.clickhouse.ClickhouseContainerHelper.getUsername
class ClickhouseConfigUpdater : ConfigurationUpdater {
override fun update(config: String): String {

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2
package io.airbyte.integrations.destination.clickhouse
import org.testcontainers.clickhouse.ClickHouseContainer

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2
package io.airbyte.integrations.destination.clickhouse
import io.github.oshai.kotlinlogging.KotlinLogging
import java.nio.file.Path

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.check
package io.airbyte.integrations.destination.clickhouse.check
import io.airbyte.cdk.load.check.CheckIntegrationTest
import io.airbyte.cdk.load.check.CheckTestConfig
import io.airbyte.integrations.destination.clickhouse_v2.ClickhouseConfigUpdater
import io.airbyte.integrations.destination.clickhouse_v2.ClickhouseContainerHelper
import io.airbyte.integrations.destination.clickhouse_v2.Utils.getConfigPath
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseSpecification
import io.airbyte.integrations.destination.clickhouse.ClickhouseConfigUpdater
import io.airbyte.integrations.destination.clickhouse.ClickhouseContainerHelper
import io.airbyte.integrations.destination.clickhouse.Utils.getConfigPath
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseSpecification
import java.nio.file.Files
import org.junit.jupiter.api.BeforeAll
import org.junit.jupiter.api.Disabled

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.fixtures
package io.airbyte.integrations.destination.clickhouse.fixtures
import io.airbyte.cdk.load.data.AirbyteType
import io.airbyte.cdk.load.data.AirbyteValue
@@ -16,7 +16,7 @@ import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.cdk.load.test.util.ExpectedRecordMapper
import io.airbyte.cdk.load.test.util.OutputRecord
import io.airbyte.integrations.destination.clickhouse_v2.config.toClickHouseCompatibleName
import io.airbyte.integrations.destination.clickhouse.config.toClickHouseCompatibleName
import java.math.RoundingMode
import java.time.LocalTime
import java.time.ZoneOffset

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.spec
package io.airbyte.integrations.destination.clickhouse.spec
import io.airbyte.cdk.load.spec.SpecTest

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
import com.clickhouse.client.api.Client
import com.clickhouse.client.api.ClientFaultCause
@@ -25,15 +25,15 @@ import io.airbyte.cdk.load.write.SchematizedNestedValueBehavior
import io.airbyte.cdk.load.write.StronglyTyped
import io.airbyte.cdk.load.write.UnionBehavior
import io.airbyte.cdk.load.write.UnknownTypesBehavior
import io.airbyte.integrations.destination.clickhouse_v2.ClickhouseConfigUpdater
import io.airbyte.integrations.destination.clickhouse_v2.ClickhouseContainerHelper
import io.airbyte.integrations.destination.clickhouse_v2.Utils
import io.airbyte.integrations.destination.clickhouse_v2.config.toClickHouseCompatibleName
import io.airbyte.integrations.destination.clickhouse_v2.fixtures.ClickhouseExpectedRecordMapper
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfigurationFactory
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseSpecificationOss
import io.airbyte.integrations.destination.clickhouse_v2.write.load.ClientProvider.getClient
import io.airbyte.integrations.destination.clickhouse.ClickhouseConfigUpdater
import io.airbyte.integrations.destination.clickhouse.ClickhouseContainerHelper
import io.airbyte.integrations.destination.clickhouse.Utils
import io.airbyte.integrations.destination.clickhouse.config.toClickHouseCompatibleName
import io.airbyte.integrations.destination.clickhouse.fixtures.ClickhouseExpectedRecordMapper
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfigurationFactory
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseSpecificationOss
import io.airbyte.integrations.destination.clickhouse.write.load.ClientProvider.getClient
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import java.nio.file.Files
import java.time.ZonedDateTime

View File

@@ -1,12 +1,12 @@
{
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/clickhouse-v2",
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/clickhouse",
"connectionSpecification" : {
"$schema" : "http://json-schema.org/draft-07/schema#",
"title" : "Clickhouse Specification Cloud",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"hostname" : {
"host" : {
"type" : "string",
"description" : "Hostname of the database.",
"title" : "Hostname",
@@ -57,7 +57,7 @@
"default" : false
}
},
"required" : [ "hostname", "port", "protocol", "database", "username", "password", "enable_json" ],
"required" : [ "host", "port", "protocol", "database", "username", "password", "enable_json" ],
"groups" : [ {
"id" : "connection",
"title" : "Connection"

View File

@@ -1,12 +1,12 @@
{
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/clickhouse-v2",
"documentationUrl" : "https://docs.airbyte.com/integrations/destinations/clickhouse",
"connectionSpecification" : {
"$schema" : "http://json-schema.org/draft-07/schema#",
"title" : "Clickhouse Specification Oss",
"type" : "object",
"additionalProperties" : true,
"properties" : {
"hostname" : {
"host" : {
"type" : "string",
"description" : "Hostname of the database.",
"title" : "Hostname",
@@ -56,7 +56,7 @@
"default" : false
}
},
"required" : [ "hostname", "port", "protocol", "database", "username", "password", "enable_json" ],
"required" : [ "host", "port", "protocol", "database", "username", "password", "enable_json" ],
"groups" : [ {
"id" : "connection",
"title" : "Connection"

View File

@@ -1,5 +1,5 @@
{
"hostname": "localhost",
"host": "localhost",
"port": "8123",
"protocol": "http",
"username": "replace_me_username",

View File

@@ -1,114 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertNotNull;
import static org.junit.jupiter.api.Assertions.assertTrue;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.node.ObjectNode;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.commons.io.IOs;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.resources.MoreResources;
import io.airbyte.protocol.models.v0.ConnectorSpecification;
import io.airbyte.validation.json.JsonSchemaValidator;
import java.io.File;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
/**
 * Tests that the clickhouse spec passes JsonSchema validation. While this may seem like overkill,
 * we are doing it because there are some gotchas in correctly configuring the oneOf.
 */
public class ClickhouseDestinationSpecTest {

  // Baseline valid configuration; individual tests remove keys to probe "required" handling.
  private static final String CONFIGURATION = "{ "
      + "\"password\" : \"pwd\", "
      + "\"username\" : \"clickhouse\", "
      + "\"database\" : \"clickhouse_db\", "
      + "\"port\" : 8123, "
      + "\"host\" : \"localhost\", "
      + "\"jdbc_url_params\" : \"property1=pValue1&property2=pValue2\", "
      + "\"ssl\" : true "
      + "}";

  // connectionSpecification node parsed from spec.json; shared by all tests.
  private static JsonNode schema;
  private static JsonSchemaValidator validator;

  /** Writes spec.json to a temp file and extracts its connectionSpecification node. */
  @BeforeAll
  static void init() throws IOException {
    final String spec = MoreResources.readResource("spec.json");
    final File schemaFile = IOs.writeFile(Files.createTempDirectory(Path.of("/tmp"), "cl-spec-test"), "schema.json", spec).toFile();
    schema = JsonSchemaValidator.getSchema(schemaFile).get("connectionSpecification");
    validator = new JsonSchemaValidator();
  }

  /** "database" is required, so removing it must fail validation. */
  @Test
  void testDatabaseMissing() {
    final JsonNode config = Jsons.deserialize(CONFIGURATION);
    ((ObjectNode) config).remove(JdbcUtils.DATABASE_KEY);
    assertFalse(validator.test(schema, config));
  }

  // NOTE(review): CONFIGURATION contains no "schemas" key, so remove() is a no-op and this
  // only re-validates the baseline config — presumably a copy-paste from another connector.
  @Test
  void testSchemaMissing() {
    final JsonNode config = Jsons.deserialize(CONFIGURATION);
    ((ObjectNode) config).remove("schemas");
    assertTrue(validator.test(schema, config));
  }

  // NOTE(review): "replication_method" is a source-connector concept not present in this
  // destination's spec; this and the next test look like copied vestiges — confirm intent.
  @Test
  void testWithoutReplicationMethod() {
    final JsonNode config = Jsons.deserialize(CONFIGURATION);
    ((ObjectNode) config).remove("replication_method");
    assertTrue(validator.test(schema, config));
  }

  @Test
  void testWithReplicationMethodWithReplicationSlot() {
    final JsonNode config = Jsons.deserialize(CONFIGURATION);
    assertTrue(validator.test(schema, config));
  }

  /** The baseline config (which includes jdbc_url_params) must validate. */
  @Test
  void testWithJdbcAdditionalProperty() {
    final JsonNode config = Jsons.deserialize(CONFIGURATION);
    assertTrue(validator.test(schema, config));
  }

  /** The published spec must expose the jdbc_url_params property. */
  @Test
  void testJdbcAdditionalProperty() throws Exception {
    final ConnectorSpecification spec = new ClickhouseDestination().spec();
    assertNotNull(spec.getConnectionSpecification().get("properties").get(JdbcUtils.JDBC_URL_PARAMS_KEY));
  }

  /** "host" is required. */
  @Test
  void testHostMissing() {
    final JsonNode config = Jsons.deserialize(CONFIGURATION);
    ((ObjectNode) config).remove("host");
    assertFalse(validator.test(schema, config));
  }

  /** "port" is required. */
  @Test
  void testPortMissing() {
    final JsonNode config = Jsons.deserialize(CONFIGURATION);
    ((ObjectNode) config).remove("port");
    assertFalse(validator.test(schema, config));
  }

  /** "username" is required. */
  @Test
  void testUsernameMissing() {
    final JsonNode config = Jsons.deserialize(CONFIGURATION);
    ((ObjectNode) config).remove("username");
    assertFalse(validator.test(schema, config));
  }

}

View File

@@ -1,163 +0,0 @@
/*
* Copyright (c) 2023 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse;
import static org.junit.jupiter.api.Assertions.assertEquals;
import com.fasterxml.jackson.databind.JsonNode;
import com.google.common.collect.ImmutableMap;
import io.airbyte.cdk.db.factory.DataSourceFactory;
import io.airbyte.cdk.db.factory.DatabaseDriver;
import io.airbyte.cdk.db.jdbc.DefaultJdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcDatabase;
import io.airbyte.cdk.db.jdbc.JdbcUtils;
import io.airbyte.cdk.integrations.base.Destination;
import io.airbyte.cdk.integrations.base.DestinationConfig;
import io.airbyte.cdk.integrations.base.SerializedAirbyteMessageConsumer;
import io.airbyte.cdk.integrations.destination.StandardNameTransformer;
import io.airbyte.commons.json.Jsons;
import io.airbyte.commons.map.MoreMaps;
import io.airbyte.integrations.base.destination.typing_deduping.StreamId;
import io.airbyte.protocol.models.Field;
import io.airbyte.protocol.models.JsonSchemaType;
import io.airbyte.protocol.models.v0.AirbyteMessage;
import io.airbyte.protocol.models.v0.AirbyteMessage.Type;
import io.airbyte.protocol.models.v0.AirbyteRecordMessage;
import io.airbyte.protocol.models.v0.AirbyteStateMessage;
import io.airbyte.protocol.models.v0.CatalogHelpers;
import io.airbyte.protocol.models.v0.ConfiguredAirbyteCatalog;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
import java.util.Comparator;
import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
import java.util.stream.IntStream;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.ClickHouseContainer;
/**
 * Integration-style sanity test for the ClickHouse destination, backed by a throwaway
 * ClickHouse server started via Testcontainers. Verifies the end-to-end write path:
 * records pushed through the serialized message consumer land intact in the v2 raw table.
 */
public class ClickhouseDestinationTest {

  private static final String DB_NAME = "default";
  private static final String STREAM_NAME = "id_and_name";

  // NOTE(review): the previously-declared namingResolver, CONFIG_WITH_SSL and
  // CONFIG_NO_SSL fields were never referenced by any test in this class and
  // have been removed as dead code.

  // Shared container for the whole class; each test gets a fresh catalog/config in setup().
  private static ClickHouseContainer db;
  private static ConfiguredAirbyteCatalog catalog;
  private static JsonNode config;

  @BeforeAll
  static void init() {
    db = new ClickHouseContainer("clickhouse/clickhouse-server:22.5");
    db.start();
  }

  @BeforeEach
  void setup() {
    // Single stream with a numeric id and a string name, written into the default database.
    catalog = new ConfiguredAirbyteCatalog().withStreams(List.of(
        CatalogHelpers.createConfiguredAirbyteStream(
            STREAM_NAME,
            DB_NAME,
            Field.of("id", JsonSchemaType.NUMBER),
            Field.of("name", JsonSchemaType.STRING))));

    // Connection settings pointing at the live container; SSL disabled for the local test.
    config = Jsons.jsonNode(ImmutableMap.builder()
        .put(JdbcUtils.HOST_KEY, db.getHost())
        .put(JdbcUtils.PORT_KEY, db.getFirstMappedPort())
        .put(JdbcUtils.DATABASE_KEY, DB_NAME)
        .put(JdbcUtils.USERNAME_KEY, db.getUsername())
        .put(JdbcUtils.PASSWORD_KEY, db.getPassword())
        .put(JdbcUtils.SCHEMA_KEY, DB_NAME)
        .put(JdbcUtils.SSL_KEY, false)
        .build());
  }

  @AfterAll
  static void cleanUp() {
    db.stop();
    db.close();
  }

  /**
   * Pushes 10 records plus a trailing STATE message through the destination's serialized
   * consumer, then reads the raw table back over JDBC and asserts every record payload
   * arrived unchanged (actual rows are re-ordered by "id" to match emission order).
   */
  @Test
  void sanityTest() throws Exception {
    final Destination dest = new ClickhouseDestination();
    DestinationConfig.initialize(config, dest.isV2Destination());
    final SerializedAirbyteMessageConsumer consumer = dest.getSerializedMessageConsumer(config, catalog,
        Destination::defaultOutputRecordCollector);
    final List<AirbyteMessage> expectedRecords = generateRecords(10);

    consumer.start();
    expectedRecords.forEach(m -> {
      try {
        // The serialized consumer takes the raw JSON string together with its byte length.
        final var strMessage = Jsons.jsonNode(m).toString();
        consumer.accept(strMessage, strMessage.getBytes(StandardCharsets.UTF_8).length);
      } catch (final Exception e) {
        throw new RuntimeException(e);
      }
    });
    // A STATE message prompts the consumer to flush buffered records before close().
    final var abMessage = Jsons.jsonNode(new AirbyteMessage()
        .withType(Type.STATE)
        .withState(new AirbyteStateMessage()
            .withData(Jsons.jsonNode(ImmutableMap.of(DB_NAME + "." + STREAM_NAME, 10)))))
        .toString();
    consumer.accept(abMessage, abMessage.getBytes(StandardCharsets.UTF_8).length);
    consumer.close();

    // Read back directly over JDBC from the Destinations-V2 raw table in airbyte_internal.
    final JdbcDatabase database = new DefaultJdbcDatabase(
        DataSourceFactory.create(
            config.get(JdbcUtils.USERNAME_KEY).asText(),
            config.get(JdbcUtils.PASSWORD_KEY).asText(),
            ClickhouseDestination.DRIVER_CLASS,
            String.format(DatabaseDriver.CLICKHOUSE.getUrlFormatString(),
                ClickhouseDestination.HTTP_PROTOCOL,
                config.get(JdbcUtils.HOST_KEY).asText(),
                config.get(JdbcUtils.PORT_KEY).asInt(),
                config.get(JdbcUtils.DATABASE_KEY).asText())));
    final List<JsonNode> actualRecords = database.bufferedResultSetQuery(
        connection -> connection.createStatement().executeQuery(
            String.format("SELECT * FROM %s.%s;", "airbyte_internal",
                StreamId.concatenateRawTableName(DB_NAME, STREAM_NAME))),
        JdbcUtils.getDefaultSourceOperations()::rowToJson);

    // Records were generated with ascending ids, so sorting actual payloads by id
    // restores the original emission order for a direct list comparison.
    assertEquals(
        expectedRecords.stream().map(AirbyteMessage::getRecord)
            .map(AirbyteRecordMessage::getData).collect(Collectors.toList()),
        actualRecords.stream()
            .map(o -> o.get("_airbyte_data").asText())
            .map(Jsons::deserialize)
            .sorted(Comparator.comparingInt(x -> x.get("id").asInt()))
            .collect(Collectors.toList()));
  }

  /** Builds {@code n} RECORD messages with ids 0..n-1 and matching names for the test stream. */
  private List<AirbyteMessage> generateRecords(final int n) {
    return IntStream.range(0, n)
        .boxed()
        .map(i -> new AirbyteMessage()
            .withType(Type.RECORD)
            .withRecord(new AirbyteRecordMessage()
                .withStream(STREAM_NAME)
                .withNamespace(DB_NAME)
                .withEmittedAt(Instant.now().toEpochMilli())
                .withData(Jsons.jsonNode(ImmutableMap.of("id", i, "name", "test name " + i)))))
        .collect(Collectors.toList());
  }

}

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.check
package io.airbyte.integrations.destination.clickhouse.check
import com.clickhouse.client.api.Client
import com.clickhouse.client.api.insert.InsertResponse
import com.clickhouse.data.ClickHouseFormat
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.client
package io.airbyte.integrations.destination.clickhouse.client
import com.clickhouse.client.api.Client as ClickHouseClientRaw
import com.clickhouse.client.api.command.CommandResponse
@@ -14,9 +14,9 @@ import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.cdk.load.orchestration.db.TempTableNameGenerator
import io.airbyte.integrations.destination.clickhouse_v2.config.ClickhouseFinalTableNameGenerator
import io.airbyte.integrations.destination.clickhouse_v2.model.AlterationSummary
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.config.ClickhouseFinalTableNameGenerator
import io.airbyte.integrations.destination.clickhouse.model.AlterationSummary
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import io.mockk.coEvery
import io.mockk.coVerify
import io.mockk.coVerifyOrder

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.client
package io.airbyte.integrations.destination.clickhouse.client
import com.github.vertical_blank.sqlformatter.SqlFormatter
import com.github.vertical_blank.sqlformatter.languages.Dialect
import io.airbyte.cdk.load.orchestration.db.ColumnNameMapping
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.integrations.destination.clickhouse_v2.model.AlterationSummary
import io.airbyte.integrations.destination.clickhouse_v2.spec.ClickhouseConfiguration
import io.airbyte.integrations.destination.clickhouse.model.AlterationSummary
import io.airbyte.integrations.destination.clickhouse.spec.ClickhouseConfiguration
import io.mockk.mockk
import kotlin.test.assertTrue
import org.junit.jupiter.api.Assertions

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.config
package io.airbyte.integrations.destination.clickhouse.config
import java.util.UUID
import org.junit.jupiter.api.Assertions

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
import com.clickhouse.client.api.Client
import com.clickhouse.client.api.data_formats.RowBinaryFormatWriter

View File

@@ -2,14 +2,14 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
import com.clickhouse.client.api.Client
import io.airbyte.cdk.load.command.DestinationStream
import io.airbyte.cdk.load.orchestration.db.TableName
import io.airbyte.cdk.load.orchestration.db.direct_load_table.DirectLoadTableExecutionConfig
import io.airbyte.cdk.load.write.StreamStateStore
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.RecordMunger
import io.airbyte.integrations.destination.clickhouse.write.transform.RecordMunger
import io.mockk.every
import io.mockk.impl.annotations.MockK
import io.mockk.junit5.MockKExtension

View File

@@ -2,12 +2,12 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
import io.airbyte.cdk.load.data.StringValue
import io.airbyte.cdk.load.message.DestinationRecordRaw
import io.airbyte.cdk.load.write.DirectLoader
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.RecordMunger
import io.airbyte.integrations.destination.clickhouse.write.transform.RecordMunger
import io.mockk.coVerify
import io.mockk.every
import io.mockk.impl.annotations.MockK

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.load
package io.airbyte.integrations.destination.clickhouse.write.load
import kotlin.test.assertFalse
import kotlin.test.assertTrue

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.transform
package io.airbyte.integrations.destination.clickhouse.write.transform
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.ArrayValue
@@ -19,17 +19,17 @@ import io.airbyte.cdk.load.data.TimeWithTimezoneValue
import io.airbyte.cdk.load.data.TimeWithoutTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithTimezoneValue
import io.airbyte.cdk.load.data.TimestampWithoutTimezoneValue
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DATE32_MAX
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DATE32_MIN
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DECIMAL128_MAX
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.DECIMAL128_MIN
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.INT64_MAX
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercer.Constants.INT64_MIN
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteDateValue
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteIntegerValue
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteNumberValue
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteTimestampWithTimezoneValue
import io.airbyte.integrations.destination.clickhouse_v2.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteTimestampWithoutTimezoneValue
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DATE32_MAX
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DATE32_MIN
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DECIMAL128_MAX
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.DECIMAL128_MIN
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.INT64_MAX
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercer.Constants.INT64_MIN
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteDateValue
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteIntegerValue
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteNumberValue
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteTimestampWithTimezoneValue
import io.airbyte.integrations.destination.clickhouse.write.transform.ClickhouseCoercerTest.Fixtures.toAirbyteTimestampWithoutTimezoneValue
import io.airbyte.protocol.models.v0.AirbyteRecordMessageMetaChange
import io.mockk.impl.annotations.InjectMockKs
import io.mockk.junit5.MockKExtension

View File

@@ -2,7 +2,7 @@
* Copyright (c) 2025 Airbyte, Inc., all rights reserved.
*/
package io.airbyte.integrations.destination.clickhouse_v2.write.transform
package io.airbyte.integrations.destination.clickhouse.write.transform
import io.airbyte.cdk.load.data.AirbyteValue
import io.airbyte.cdk.load.data.BooleanType

View File

@@ -0,0 +1,121 @@
# ClickHouse
:::warning
The Clickhouse connector is outdated and does not use Destination v2 (typing and deduplication).
As a result, it may not work well with large datasets or could have performance issues.
Subscribe to the [discussion](https://github.com/airbytehq/airbyte/discussions/35339) to receive updates and learn more.
:::
## Features
| Feature | Supported?\(Yes/No\) | Notes |
| :----------------------------- | :------------------- | :---- |
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Append + Deduped | No | |
| Namespaces | Yes | |
#### Output Schema
Each stream will be output into its own table in ClickHouse. Each table will contain 3 columns:
- `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in ClickHouse is `String`.
- `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in ClickHouse is `DateTime64`.
- `_airbyte_data`: a JSON blob containing the event data. The column type in ClickHouse is `String`.
## Getting Started \(Airbyte Cloud\)
Airbyte Cloud only supports connecting to your ClickHouse instance with SSL or TLS encryption, which is supported by [ClickHouse JDBC driver](https://github.com/ClickHouse/clickhouse-jdbc).
## Getting Started \(Airbyte Open Source\)
#### Requirements
To use the ClickHouse destination, you'll need:
- A ClickHouse server version 21.8.10.19 or above
#### Configure Network Access
Make sure your ClickHouse database can be accessed by Airbyte. If your database is within a VPC, you may need to allow access from the IP you're using to expose Airbyte.
#### **Permissions**
You need a ClickHouse user with the following permissions:
- can create tables and write rows.
- can create databases e.g:
You can create such a user by running:
```
GRANT CREATE ON * TO airbyte_user;
GRANT CREATE ON default * TO airbyte_user;
GRANT DROP ON * TO airbyte_user;
GRANT TRUNCATE ON * TO airbyte_user;
GRANT INSERT ON * TO airbyte_user;
GRANT SELECT ON * TO airbyte_user;
GRANT CREATE DATABASE ON airbyte_internal.* TO airbyte_user;
GRANT CREATE TABLE ON airbyte_internal.* TO airbyte_user;
GRANT DROP ON airbyte_internal.* TO airbyte_user;
GRANT TRUNCATE ON airbyte_internal.* TO airbyte_user;
GRANT INSERT ON airbyte_internal.* TO airbyte_user;
GRANT SELECT ON airbyte_internal.* TO airbyte_user;
```
You can also use a pre-existing user but we highly recommend creating a dedicated user for Airbyte.
#### Target Database
You will need to choose an existing database or create a new database that will be used to store synced data from Airbyte.
### Setup the ClickHouse Destination in Airbyte
You should now have all the requirements needed to configure ClickHouse as a destination in the UI. You'll need the following information to configure the ClickHouse destination:
- **Host**
- **Port**
- **Username**
- **Password**
- **Database**
- **Jdbc_url_params**
## Naming Conventions
From [ClickHouse SQL Identifiers syntax](https://clickhouse.com/docs/en/sql-reference/syntax/):
- SQL identifiers and key words must begin with a letter \(a-z, but also letters with diacritical marks and non-Latin letters\) or an underscore \(\_\).
- Subsequent characters in an identifier or key word can be letters, underscores, digits \(0-9\).
- Identifiers can be quoted or non-quoted. The latter is preferred.
- If you want to use identifiers the same as keywords or you want to use other symbols in identifiers, quote it using double quotes or backticks, for example, "id", `id`.
- If you want to write portable applications you are advised to always quote a particular name or never quote it.
Therefore, Airbyte ClickHouse destination will create tables and schemas using the Unquoted identifiers when possible or fallback to Quoted Identifiers if the names are containing special characters.
## Changelog
<details>
<summary>Expand to review</summary>
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.1.0 | 2024-06-16 | [\#61650](https://github.com/airbytehq/airbyte/pull/61650) | Migrate to new build flow + restrict connector to run on rootless permissions. Requires Airbyte >= 1.3.0. |
| 1.0.0 | 2024-02-07 | [\#34637](https://github.com/airbytehq/airbyte/pull/34637) | Update the raw table schema |
| 0.2.5 | 2023-06-21 | [\#27555](https://github.com/airbytehq/airbyte/pull/27555) | Reduce image size |
| 0.2.4 | 2023-06-05 | [\#27036](https://github.com/airbytehq/airbyte/pull/27036) | Internal code change for future development (install normalization packages inside connector) |
| 0.2.3 | 2023-04-04 | [\#24604](https://github.com/airbytehq/airbyte/pull/24604) | Support for destination checkpointing |
| 0.2.2 | 2023-02-21 | [\#21509](https://github.com/airbytehq/airbyte/pull/21509) | Compatibility update with security patch for strict encrypt version |
| 0.2.1 | 2022-12-06 | [\#19573](https://github.com/airbytehq/airbyte/pull/19573) | Update dbt version to 1.3.1 |
| 0.2.0 | 2022-09-27 | [\#16970](https://github.com/airbytehq/airbyte/pull/16970) | Remove TCP port from spec parameters |
| 0.1.12 | 2022-09-08 | [\#16444](https://github.com/airbytehq/airbyte/pull/16444) | Added custom jdbc params field |
| 0.1.10 | 2022-07-05 | [\#13639](https://github.com/airbytehq/airbyte/pull/13639) | Change JDBC ClickHouse version into 0.3.2-patch9 |
| 0.1.8 | 2022-07-05 | [\#13516](https://github.com/airbytehq/airbyte/pull/13516) | Added JDBC default parameter socket timeout |
| 0.1.7 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors |
| 0.1.6 | 2022-05-17 | [\#12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.1.5 | 2022-04-06 | [\#11729](https://github.com/airbytehq/airbyte/pull/11729) | Bump mina-sshd from 2.7.0 to 2.8.0 |
| 0.1.4 | 2022-02-25 | [\#10421](https://github.com/airbytehq/airbyte/pull/10421) | Refactor JDBC parameters handling |
| 0.1.3 | 2022-02-14 | [\#10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.1 | 2021-12-21 | [\#8982](https://github.com/airbytehq/airbyte/pull/8982) | Set isSchemaRequired to false |
| 0.1.0 | 2021-11-04 | [\#7620](https://github.com/airbytehq/airbyte/pull/7620) | Add ClickHouse destination |
</details>

View File

@@ -1,67 +1,29 @@
# Clickhouse Migration Guide
## Upgrading to 1.0.0
## Upgrading to 2.0.0
This version removes the option to use "normalization" with clickhouse. It also changes
the schema and database of Airbyte's "raw" tables to be compatible with the new
[Destinations V2](https://docs.airbyte.com/release_notes/upgrading_to_destinations_v2/#what-is-destinations-v2)
format. These changes will likely require updates to downstream dbt / SQL models. After this update,
Airbyte will only produce the raw v2 tables, which store all content in JSON. These changes remove
the ability to do deduplicated syncs with Clickhouse. Clickhouse provides [an overview of integrating with dbt](https://clickhouse.com/docs/en/integrations/dbt).
If you are interested in the Clickhouse destination gaining the full features
of Destinations V2 (including final tables), visit [this discussion](https://github.com/airbytehq/airbyte/discussions/35339)
to register your interest.
This version differs from 1.0.0 radically. Whereas 1.0.0 wrote all your data
as JSON to raw tables in airbyte_internal database, 2.0.0 will properly separate
your schema into typed columns and write to the specified database in the
configuration and the un-prefixed table name. You will no longer see
`airbyte_internal.{database}_raw__stream_{table}` and will instead see
`{database}.{table}`.
This upgrade will ignore any existing raw tables and will not migrate any data to the new schema.
For each stream, you could perform the following query to migrate the data from the old raw table
to the new raw table:
While this is treated as a "breaking change", connections should continue to function
with no changes, albeit writing data to a completely different location and in a
different form. So any downstream pipelines will need updating to ingest the new
data location / format.
```sql
-- assumes your database was 'default'
-- replace `{{stream_name}}` with replace your stream name
## Migrating existing data to the new format
CREATE TABLE airbyte_internal.default_raw__stream_{{stream_name}}
(
`_airbyte_raw_id` String,
`_airbyte_extracted_at` DateTime64(3, 'GMT') DEFAULT now(),
`_airbyte_loaded_at` DateTime64(3, 'GMT') NULL,
`_airbyte_data` String,
PRIMARY KEY(`_airbyte_raw_id`)
)
ENGINE = MergeTree;
Unfortunately Airbyte has no way to migrate the existing raw tables to the new
typed format. The only "out of the box" way to get your data into the new format
is to re-sync it from scratch.
INSERT INTO `airbyte_internal`.`default_raw__stream_{{stream_name}}`
SELECT
`_airbyte_ab_id` AS "_airbyte_raw_id",
`_airbyte_emitted_at` AS "_airbyte_extracted_at",
NULL AS "_airbyte_loaded_at",
_airbyte_data AS "_airbyte_data"
FROM default._airbyte_raw_{{stream_name}};
```
## Removing the old tables
Airbyte will not delete any of your v1 data.
### Database/Schema and the Internal Schema
We have split the raw and final tables into their own schemas,
which in clickhouse is analogous to a `database`. For the Clickhouse destination, this means that
we will only write into the raw table which will live in the `airbyte_internal` database.
The tables written into this schema will be prefixed with either the default database provided in
the `DB Name` field when configuring clickhouse (but can also be overridden in the connection). You can
change the "raw" database from the default `airbyte_internal` by supplying a value for
`Raw Table Schema Name`.
For Example:
- DB Name: `default`
- Stream Name: `my_stream`
Writes to `airbyte_internal.default_raw__stream_my_stream`
where as:
- DB Name: `default`
- Stream Name: `my_stream`
- Raw Table Schema Name: `raw_data`
Writes to: `raw_data.default_raw__stream_my_stream`
Because the new destination has no knowledge of the old destination's table
naming semantics, we will not remove existing data. If you would like to, you
will need to delete all the tables saved in the old format, which for most
people should be under `airbyte_internal.{database}_raw__`, but may vary based
on your specific configuration.

View File

@@ -1,93 +0,0 @@
# ClickHouse v2
A fresh implementation of ClickHouse leveraging our new CDK.
## Improvements over v1
* All sync modes supported
* Data will be typed and written to columns matching the defined schema (Direct Load)
* Performance improvements
* Actively maintained and developed by Airbyte
## Features
All sync modes are supported.
| Feature | Supported?\(Yes/No\) | Notes |
| :----------------------------- |:---------------------|:-------------------------------|
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Append + Deduped | Yes | Leverages `ReplacingMergeTree` |
| Namespaces | Yes | |
### Output Schema
Each stream will be output into its own table in ClickHouse in either the configured default database (`default`) or a database corresponding to the specified namespace on the stream.
Airbyte types will be converted to ClickHouse types as follows:
- Decimal types are Decimal128(9) — 9 digits of scale
- Timestamps are DateTime64(3) — millisecond precision
- Object types are JSON **if JSON is enabled in the actor config**; otherwise they are converted to String
- Integers are Int64
- Booleans are Bool
- Strings are String
- Unions will be converted to String
- Arrays will be converted to String
### Requirements
To use the ClickHouse destination, you'll need:
- A cloud ClickHouse instance
- A ClickHouse server version 21.8.10.19 or above
### Configure Network Access
Make sure your ClickHouse database can be accessed by Airbyte. If your database is within a VPC, you may need to allow access from the IP you're using to expose Airbyte.
### **Permissions**
You need a ClickHouse user with the following permissions:
- can create tables and write rows.
- can create databases e.g:
You can create such a user by running:
```
GRANT CREATE ON * TO airbyte_user;
GRANT CREATE ON {your configured default database} * TO airbyte_user;
GRANT DROP ON * TO airbyte_user;
GRANT TRUNCATE ON * TO airbyte_user;
GRANT INSERT ON * TO airbyte_user;
GRANT SELECT ON * TO airbyte_user;
GRANT CREATE DATABASE ON airbyte_internal.* TO airbyte_user;
GRANT CREATE TABLE ON airbyte_internal.* TO airbyte_user;
GRANT DROP ON airbyte_internal.* TO airbyte_user;
GRANT TRUNCATE ON airbyte_internal.* TO airbyte_user;
GRANT INSERT ON airbyte_internal.* TO airbyte_user;
GRANT SELECT ON airbyte_internal.* TO airbyte_user;
```
You can also use a pre-existing user but we highly recommend creating a dedicated user for Airbyte.
## Changelog
<details>
<summary>Expand to review</summary>
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:--------------------------------------------------------------|:-------------------------------------------------------------------------------|
| 0.1.11 | 2025-07-09 | [\#62883](https://github.com/airbytehq/airbyte/pull/62883) | Only set JSON properties on client if enabled to support older CH deployments. |
| 0.1.10 | 2025-07-08 | [\#62861](https://github.com/airbytehq/airbyte/pull/62861) | Set user agent header for internal CH telemetry. |
| 0.1.9 | 2025-07-03 | [\#62509](https://github.com/airbytehq/airbyte/pull/62509) | Simplify union stringification behavior. |
| 0.1.8 | 2025-06-30 | [\#62100](https://github.com/airbytehq/airbyte/pull/62100) | Add JSON support. |
| 0.1.7 | 2025-06-24 | [\#62047](https://github.com/airbytehq/airbyte/pull/62047) | Remove the use of the internal namespace. |
| 0.1.6 | 2025-06-24 | [\#62047](https://github.com/airbytehq/airbyte/pull/62047) | Hide protocol option when running on cloud. |
| 0.1.5 | 2025-06-24 | [\#62043](https://github.com/airbytehq/airbyte/pull/62043) | Expose database protocol config option. |
| 0.1.4 | 2025-06-24 | [\#62040](https://github.com/airbytehq/airbyte/pull/62040) | Checker inserts into configured DB. |
| 0.1.3 | 2025-06-24 | [\#62038](https://github.com/airbytehq/airbyte/pull/62038) | Allow the client to connect to the resolved DB. |
| 0.1.2 | 2025-06-23 | [\#62028](https://github.com/airbytehq/airbyte/pull/62028) | Enable the registry in OSS and cloud. |
| 0.1.1 | 2025-06-23 | [\#62022](https://github.com/airbytehq/airbyte/pull/62022) | Publish first beta version and pin the CDK version. |
| 0.1.0 | 2025-06-23 | [\#62024](https://github.com/airbytehq/airbyte/pull/62024) | Release first beta version. |
</details>

View File

@@ -1,45 +1,51 @@
# ClickHouse
:::warning
The Clickhouse connector is outdated and does not use Destination v2 (typing and deduplication).
As a result, it may not work well with large datasets or could have performance issues.
Subscribe to the [discussion](https://github.com/airbytehq/airbyte/discussions/35339) to receive updates and learn more.
:::
A fresh implementation of ClickHouse leveraging our new CDK.
## Improvements over v1
* All sync modes supported
* Data will be typed and written to columns matching the defined schema (Direct Load)
* Performance improvements
* Actively maintained and developed by Airbyte
## Features
| Feature | Supported?\(Yes/No\) | Notes |
| :----------------------------- | :------------------- | :---- |
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Append + Deduped | No | |
| Namespaces | Yes | |
All sync modes are supported.
#### Output Schema
| Feature | Supported?\(Yes/No\) | Notes |
| :----------------------------- |:---------------------|:-------------------------------|
| Full Refresh Sync | Yes | |
| Incremental - Append Sync | Yes | |
| Incremental - Append + Deduped | Yes | Leverages `ReplacingMergeTree` |
| Namespaces | Yes | |
Each stream will be output into its own table in ClickHouse. Each table will contain 3 columns:
### Output Schema
- `_airbyte_ab_id`: a uuid assigned by Airbyte to each event that is processed. The column type in ClickHouse is `String`.
- `_airbyte_emitted_at`: a timestamp representing when the event was pulled from the data source. The column type in ClickHouse is `DateTime64`.
- `_airbyte_data`: a JSON blob containing the event data. The column type in ClickHouse is `String`.
Each stream will be output into its own table in ClickHouse in either the configured default database (`default`) or a database corresponding to the specified namespace on the stream.
## Getting Started \(Airbyte Cloud\)
Airbyte types will be converted to ClickHouse types as follows:
Airbyte Cloud only supports connecting to your ClickHouse instance with SSL or TLS encryption, which is supported by [ClickHouse JDBC driver](https://github.com/ClickHouse/clickhouse-jdbc).
- Decimal types are Decimal128(9) — 9 digits of scale
- Timestamps are DateTime64(3) — millisecond precision
- Object types are JSON **if JSON is enabled in the actor config**; otherwise they are converted to String
- Integers are Int64
- Booleans are Bool
- Strings are String
- Unions will be converted to String
- Arrays will be converted to String
## Getting Started \(Airbyte Open Source\)
#### Requirements
### Requirements
To use the ClickHouse destination, you'll need:
- A cloud ClickHouse instance
- A ClickHouse server version 21.8.10.19 or above
#### Configure Network Access
### Configure Network Access
Make sure your ClickHouse database can be accessed by Airbyte. If your database is within a VPC, you may need to allow access from the IP you're using to expose Airbyte.
#### **Permissions**
### **Permissions**
You need a ClickHouse user with the following permissions:
@@ -50,7 +56,7 @@ You can create such a user by running:
```
GRANT CREATE ON * TO airbyte_user;
GRANT CREATE ON default * TO airbyte_user;
GRANT CREATE ON {your configured default database} * TO airbyte_user;
GRANT DROP ON * TO airbyte_user;
GRANT TRUNCATE ON * TO airbyte_user;
GRANT INSERT ON * TO airbyte_user;
GRANT SELECT ON airbyte_internal.* TO airbyte_user;
You can also use a pre-existing user but we highly recommend creating a dedicated user for Airbyte.
#### Target Database
You will need to choose an existing database or create a new database that will be used to store synced data from Airbyte.
### Setup the ClickHouse Destination in Airbyte
You should now have all the requirements needed to configure ClickHouse as a destination in the UI. You'll need the following information to configure the ClickHouse destination:
- **Host**
- **Port**
- **Username**
- **Password**
- **Database**
- **Jdbc_url_params**
## Naming Conventions
From [ClickHouse SQL Identifiers syntax](https://clickhouse.com/docs/en/sql-reference/syntax/):
- SQL identifiers and key words must begin with a letter \(a-z, but also letters with diacritical marks and non-Latin letters\) or an underscore \(\_\).
- Subsequent characters in an identifier or key word can be letters, underscores, digits \(0-9\).
- Identifiers can be quoted or non-quoted. The latter is preferred.
- If you want to use identifiers the same as keywords or you want to use other symbols in identifiers, quote it using double quotes or backticks, for example, "id", `id`.
- If you want to write portable applications you are advised to always quote a particular name or never quote it.
Therefore, the Airbyte ClickHouse destination will create tables and schemas using unquoted identifiers when possible, falling back to quoted identifiers if the names contain special characters.
## Changelog
<details>
<summary>Expand to review</summary>
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:-----------------------------------------------------------|:----------------------------------------------------------------------------------------------------------|
| 1.1.0 | 2024-06-16 | [\#61650](https://github.com/airbytehq/airbyte/pull/61650) | Migrate to new build flow + restrict connector to run on rootless permissions. Requires Airbyte >= 1.3.0. |
| 1.0.0 | 2024-02-07 | [\#34637](https://github.com/airbytehq/airbyte/pull/34637) | Update the raw table schema |
| 0.2.5 | 2023-06-21 | [\#27555](https://github.com/airbytehq/airbyte/pull/27555) | Reduce image size |
| 0.2.4 | 2023-06-05 | [\#27036](https://github.com/airbytehq/airbyte/pull/27036) | Internal code change for future development (install normalization packages inside connector) |
| 0.2.3 | 2023-04-04 | [\#24604](https://github.com/airbytehq/airbyte/pull/24604) | Support for destination checkpointing |
| 0.2.2 | 2023-02-21 | [\#21509](https://github.com/airbytehq/airbyte/pull/21509) | Compatibility update with security patch for strict encrypt version |
| 0.2.1 | 2022-12-06 | [\#19573](https://github.com/airbytehq/airbyte/pull/19573) | Update dbt version to 1.3.1 |
| 0.2.0 | 2022-09-27 | [\#16970](https://github.com/airbytehq/airbyte/pull/16970) | Remove TCP port from spec parameters |
| 0.1.12 | 2022-09-08 | [\#16444](https://github.com/airbytehq/airbyte/pull/16444) | Added custom jdbc params field |
| 0.1.10 | 2022-07-05 | [\#13639](https://github.com/airbytehq/airbyte/pull/13639) | Change JDBC ClickHouse version into 0.3.2-patch9 |
| 0.1.8 | 2022-07-05 | [\#13516](https://github.com/airbytehq/airbyte/pull/13516) | Added JDBC default parameter socket timeout |
| 0.1.7 | 2022-06-16 | [\#13852](https://github.com/airbytehq/airbyte/pull/13852) | Updated stacktrace format for any trace message errors |
| 0.1.6 | 2022-05-17 | [\#12820](https://github.com/airbytehq/airbyte/pull/12820) | Improved 'check' operation performance |
| 0.1.5 | 2022-04-06 | [\#11729](https://github.com/airbytehq/airbyte/pull/11729) | Bump mina-sshd from 2.7.0 to 2.8.0 |
| 0.1.4 | 2022-02-25 | [\#10421](https://github.com/airbytehq/airbyte/pull/10421) | Refactor JDBC parameters handling |
| 0.1.3 | 2022-02-14 | [\#10256](https://github.com/airbytehq/airbyte/pull/10256) | Add `-XX:+ExitOnOutOfMemoryError` JVM option |
| 0.1.1 | 2021-12-21 | [\#8982](https://github.com/airbytehq/airbyte/pull/8982) | Set isSchemaRequired to false |
| 0.1.0 | 2021-11-04 | [\#7620](https://github.com/airbytehq/airbyte/pull/7620) | Add ClickHouse destination |
| Version | Date | Pull Request | Subject |
|:--------|:-----------|:------------------------------------------------------------|:-------------------------------------------------------------------------------|
| 2.0.0 | 2025-07-10 | [\#62887](https://github.com/airbytehq/airbyte/pull/62887) | Cut 2.0.0 release. Replace existing connector. |
| 0.1.11 | 2025-07-09 | [\#62883](https://github.com/airbytehq/airbyte/pull/62883) | Only set JSON properties on client if enabled to support older CH deployments. |
| 0.1.10 | 2025-07-08 | [\#62861](https://github.com/airbytehq/airbyte/pull/62861) | Set user agent header for internal CH telemetry. |
| 0.1.9 | 2025-07-03 | [\#62509](https://github.com/airbytehq/airbyte/pull/62509) | Simplify union stringification behavior. |
| 0.1.8 | 2025-06-30 | [\#62100](https://github.com/airbytehq/airbyte/pull/62100) | Add JSON support. |
| 0.1.7 | 2025-06-24 | [\#62047](https://github.com/airbytehq/airbyte/pull/62047) | Remove the use of the internal namespace. |
| 0.1.6 | 2025-06-24 | [\#62047](https://github.com/airbytehq/airbyte/pull/62047) | Hide protocol option when running on cloud. |
| 0.1.5 | 2025-06-24 | [\#62043](https://github.com/airbytehq/airbyte/pull/62043) | Expose database protocol config option. |
| 0.1.4 | 2025-06-24 | [\#62040](https://github.com/airbytehq/airbyte/pull/62040) | Checker inserts into configured DB. |
| 0.1.3 | 2025-06-24 | [\#62038](https://github.com/airbytehq/airbyte/pull/62038) | Allow the client to connect to the resolved DB. |
| 0.1.2 | 2025-06-23 | [\#62028](https://github.com/airbytehq/airbyte/pull/62028) | Enable the registry in OSS and cloud. |
| 0.1.1 | 2025-06-23 | [\#62022](https://github.com/airbytehq/airbyte/pull/62022) | Publish first beta version and pin the CDK version. |
| 0.1.0 | 2025-06-23 | [\#62024](https://github.com/airbytehq/airbyte/pull/62024) | Release first beta version. |
</details>