S3 Data Lake

This page guides you through setting up the S3 Data Lake destination connector.

This connector is Airbyte's official support for the Iceberg protocol on S3. It writes the Iceberg table format to S3 or an S3-compatible storage backend using a supported Iceberg catalog.

Prerequisites

The S3 Data Lake connector requires two things.

  1. An S3 storage bucket or S3-compatible storage backend.

  2. A supported Iceberg catalog. Currently, the connector supports these catalogs:

    • REST
    • AWS Glue
    • Nessie
    • Polaris

Setup guide

Follow these steps to set up your S3 storage and Iceberg catalog permissions.

S3 setup and permissions

S3 setup consists of creating a bucket policy and authenticating.

Create a bucket policy

To create the policy, follow these steps.

  1. Open the IAM console.

  2. In the IAM dashboard, select Policies > Create Policy.

  3. Select the JSON tab and paste the following JSON into the Policy editor. Substitute your own bucket name on the highlighted lines.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListAllMyBuckets",
            "s3:GetObject*",
            "s3:PutObject",
            "s3:PutObjectAcl",
            "s3:DeleteObject",
            "s3:ListBucket*"
          ],
          "Resource": [
            // highlight-next-line
            "arn:aws:s3:::YOUR_BUCKET_NAME/*",
            // highlight-next-line
            "arn:aws:s3:::YOUR_BUCKET_NAME"
          ]
        }
      ]
    }
    

    :::note
    Object-level permissions alone aren't sufficient to authenticate. Include bucket-level permissions as provided in the preceding example.
    :::

  4. Click Next, give your policy a descriptive name, then click Create policy.
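
If you prefer to script this step, you can create the same policy programmatically. Here's a minimal sketch using boto3 (the policy name is illustrative; substitute your own bucket name):

    import json
    import boto3

    BUCKET = "YOUR_BUCKET_NAME"  # placeholder: your S3 bucket

    policy_document = {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Action": [
                    "s3:ListAllMyBuckets", "s3:GetObject*", "s3:PutObject",
                    "s3:PutObjectAcl", "s3:DeleteObject", "s3:ListBucket*",
                ],
                "Resource": [
                    f"arn:aws:s3:::{BUCKET}/*",
                    f"arn:aws:s3:::{BUCKET}",
                ],
            }
        ],
    }

    iam = boto3.client("iam")
    # Create the customer-managed policy and note its ARN for the next steps.
    response = iam.create_policy(
        PolicyName="airbyte-s3-data-lake-policy",  # illustrative name
        PolicyDocument=json.dumps(policy_document),
    )
    print(response["Policy"]["Arn"])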

Authenticate

In most cases, you authenticate with an IAM user. If you're using Airbyte Cloud with the Glue catalog, you can authenticate with an IAM role.

Authenticate with an IAM user (Self-Managed or Cloud, with any catalog)

Use an existing or new Access Key ID and Secret Access Key.

  1. In the IAM dashboard, click Users.

  2. If you're using an existing IAM user, select that user, then click Add permissions > Add permission. If you're creating a new user, click Add users.

  3. Click Attach policies directly, then check the box for your policy. Click Next > Add permissions.

  4. Click the Security credentials tab > Create access key. The AWS console prompts you to select a use case and add optional tags to your access key.

  5. Click Create access key. Take note of your keys.

  6. In Airbyte, enter those keys into the Airbyte connector's AWS Access Key ID and AWS Secret Access Key fields.
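
Optionally, you can sanity-check the key before relying on it in Airbyte. A minimal boto3 sketch (the bucket name and keys are placeholders), assuming the policy above is attached to the user:

    import boto3

    s3 = boto3.client(
        "s3",
        aws_access_key_id="AKIA...",       # placeholder: Access Key ID from step 5
        aws_secret_access_key="...",       # placeholder: Secret Access Key from step 5
    )
    bucket = "YOUR_BUCKET_NAME"            # placeholder

    # ListBucket: should succeed without an AccessDenied error.
    s3.list_objects_v2(Bucket=bucket, MaxKeys=1)

    # PutObject / DeleteObject: write and remove a small probe object.
    s3.put_object(Bucket=bucket, Key="_airbyte_permission_probe", Body=b"ok")
    s3.delete_object(Bucket=bucket, Key="_airbyte_permission_probe")
    print("S3 permissions look good")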

Authenticate with an IAM role (Cloud with Glue catalog only)

:::note
To use S3 authentication with an IAM role, an Airbyte team member must enable it. If you'd like to use this feature, contact the Sales team.
:::

  1. In the IAM dashboard, click Roles, then Create role.

  2. Choose the AWS account trusted entity type.

  3. Set up a trust relationship for the role. This allows the Airbyte instance's AWS account to assume this role. You also need to specify an external ID, which is a secret key that the trusting service (Airbyte) and the trusted role (the role you're creating) both know. This ID prevents the "confused deputy" problem. The External ID should be your Airbyte workspace ID, which you can find in the URL of your workspace page. Edit the trust relationship policy to include the external ID:

    {
        "Version": "2012-10-17",
        "Statement": [
            {
                "Effect": "Allow",
                "Principal": {
                    "AWS": "arn:aws:iam::094410056844:user/delegated_access_user"
                },
                "Action": "sts:AssumeRole",
                "Condition": {
                    "StringEquals": {
                        "sts:ExternalId": "{your-airbyte-workspace-id}"
                    }
                }
            }
        ]
    }
    
  4. Complete the role creation and save the Role ARN for later.

  5. Select Attach policies directly, then find and check the box for your new policy. Click Next, then Add permissions.

  6. In Airbyte, select Glue as the catalog and enter the Role ARN into the Role ARN field.
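
For reference, role-based access works through AWS STS: the caller assumes the role using its ARN and the external ID from the trust policy. A minimal boto3 sketch of that call (the ARN and workspace ID are placeholders; in practice Airbyte's backend performs this call for you):

    import boto3

    sts = boto3.client("sts")

    # AssumeRole succeeds only for the trusted principal supplying the correct external ID.
    creds = sts.assume_role(
        RoleArn="arn:aws:iam::123456789012:role/airbyte-s3-data-lake",  # placeholder
        RoleSessionName="airbyte-connection-check",
        ExternalId="your-airbyte-workspace-id",  # the external ID from the trust policy
    )["Credentials"]

    # The temporary credentials are then used against S3 and Glue.
    s3 = boto3.client(
        "s3",
        aws_access_key_id=creds["AccessKeyId"],
        aws_secret_access_key=creds["SecretAccessKey"],
        aws_session_token=creds["SessionToken"],
    )
    s3.list_objects_v2(Bucket="YOUR_BUCKET_NAME", MaxKeys=1)  # placeholder bucket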

Iceberg catalog setup and permissions

The rest of the setup process differs depending on the catalog you're using.

REST

Enter the URI of your REST catalog. You may also need to enter the default namespace.
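
To confirm the URI and namespace are reachable before running a sync, you can point any Iceberg client at the catalog. A minimal sketch using PyIceberg (the URI, warehouse path, and any auth properties are placeholders for your own setup):

    from pyiceberg.catalog import load_catalog

    # Connect to the same REST catalog the connector will use.
    catalog = load_catalog(
        "rest_catalog",
        **{
            "type": "rest",
            "uri": "https://your-rest-catalog.example.com",           # placeholder
            "warehouse": "s3://YOUR_BUCKET_NAME/path/within/bucket",  # placeholder
        },
    )

    # Should list namespaces, including the default namespace you plan to configure.
    print(catalog.list_namespaces())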

AWS Glue

  1. Update your S3 policy, created previously, to grant these Glue permissions.

    {
      "Version": "2012-10-17",
      "Statement": [
        {
          "Effect": "Allow",
          "Action": [
            "s3:ListAllMyBuckets",
            "s3:GetObject*",
            "s3:PutObject",
            "s3:PutObjectAcl",
            "s3:DeleteObject",
            "s3:ListBucket*",
            // highlight-start
            "glue:TagResource",
            "glue:UnTagResource",
            "glue:BatchCreatePartition",
            "glue:BatchDeletePartition",
            "glue:BatchDeleteTable",
            "glue:BatchGetPartition",
            "glue:CreateDatabase",
            "glue:CreateTable",
            "glue:CreatePartition",
            "glue:DeletePartition",
            "glue:DeleteTable",
            "glue:GetDatabase",
            "glue:GetPartition",
            "glue:GetPartitions",
            "glue:GetTable",
            "glue:GetTables",
            "glue:UpdateDatabase",
            "glue:UpdatePartition",
            "glue:UpdateTable"
            // highlight-end
          ],
          "Resource": [
            "arn:aws:s3:::YOUR_BUCKET_NAME/*",
            "arn:aws:s3:::YOUR_BUCKET_NAME"
          ]
        }
      ]
    }
    
  2. Set the Warehouse location option to s3://<bucket name>/path/within/bucket.

  3. If you're using Airbyte Cloud and authenticating with an IAM role, set the Role ARN option to the value you noted earlier while setting up authentication on S3.

  4. If you have an existing Glue table, and you want to replace that table with an Airbyte-managed Iceberg table, drop the Glue table. If you don't, you'll encounter the error Input Glue table is not an iceberg table: <your table name>.

    Dropping Glue tables from the console may not immediately delete them. Either wait for AWS to finish their background processing, or use the AWS API to drop all table versions.

  5. If you are using AWS Lake Formation, you must grant some permissions via Lake Formation:

    1. You must grant Data location access on the S3 path.
    2. If you intend to have the connector create the database(s) on your behalf, you must also grant Create database on the catalog.
    3. (Advanced option) If you want to create the database(s) manually, and have the connector write into only those specific database(s), then you must grant Create table, Describe on the database(s).
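
After a successful sync, you can read the tables back through Glue with any Iceberg client. A minimal PyIceberg sketch (the warehouse path, database, and stream names are placeholders), assuming the AWS credentials in your environment carry the S3 and Glue permissions above:

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog(
        "glue_catalog",
        **{
            "type": "glue",
            "warehouse": "s3://YOUR_BUCKET_NAME/path/within/bucket",  # placeholder
        },
    )

    # Databases map to Glue databases; each Airbyte stream becomes a table.
    print(catalog.list_tables("your_database"))               # placeholder database
    table = catalog.load_table("your_database.your_stream")   # placeholder table
    print(table.schema())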

Nessie

To authenticate with Nessie, do two things.

  1. Set the URI of your Nessie catalog and an access token to authenticate to that catalog.

  2. Set the Warehouse location option to s3://<bucket name>/path/within/bucket.

Polaris

To authenticate with Apache Polaris, follow these steps.

  1. Set up your Polaris catalog and create a principal with the necessary permissions. Refer to the Apache Polaris documentation for detailed setup instructions.

  2. When creating a principal in Polaris, you'll receive OAuth credentials (Client ID and Client Secret). Keep these credentials secure.

  3. Grant the required privileges to your principal's catalog role. You can either:

    Option A: Grant the broad CATALOG_MANAGE_CONTENT privilege (recommended for simplicity):

    • This single privilege allows the connector to manage tables and namespaces in the catalog

    Option B: Grant specific granular privileges:

    • TABLE_LIST - List tables in a namespace
    • TABLE_CREATE - Create new tables
    • TABLE_DROP - Delete tables
    • TABLE_READ_PROPERTIES - Read table metadata
    • TABLE_WRITE_PROPERTIES - Update table metadata
    • TABLE_WRITE_DATA - Write data to tables
    • NAMESPACE_LIST - List namespaces
    • NAMESPACE_CREATE - Create new namespaces
    • NAMESPACE_READ_PROPERTIES - Read namespace metadata
  4. In the Airbyte connector configuration, provide the following information:

    • Polaris Server URI: The base URL of your Polaris server. For example: http://localhost:8181/api/catalog
    • Catalog Name: The name of the catalog you created in Polaris (e.g., quickstart_catalog)
    • Client ID: The OAuth Client ID provided when creating the principal
    • Client Secret: The OAuth Client Secret provided when creating the principal
    • Default namespace: The namespace to be used for table identifiers when the destination namespace is set to "Destination-defined" or "Source-defined"
  5. Set the Warehouse location option to s3://<bucket name>/path/within/bucket.

  6. Ensure that your Polaris catalog has been configured with the appropriate storage credentials to access your S3 bucket.
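
Because Polaris exposes the Iceberg REST protocol, you can verify the principal's credentials with an Iceberg client before running a sync. A minimal PyIceberg sketch (the URI, catalog name, credentials, and scope are placeholders based on a typical Polaris quickstart setup):

    from pyiceberg.catalog import load_catalog

    catalog = load_catalog(
        "polaris",
        **{
            "type": "rest",
            "uri": "http://localhost:8181/api/catalog",         # placeholder Polaris Server URI
            "warehouse": "quickstart_catalog",                   # placeholder Catalog Name
            "credential": "your-client-id:your-client-secret",   # placeholder OAuth credentials
            "scope": "PRINCIPAL_ROLE:ALL",
        },
    )

    # Requires NAMESPACE_LIST (or CATALOG_MANAGE_CONTENT) on the principal's catalog role.
    print(catalog.list_namespaces())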

Output schema

How Airbyte generates the Iceberg schema

In each stream, Airbyte maps top-level fields to Iceberg fields. Airbyte maps nested fields (objects, arrays, and unions) to string columns and writes them as serialized JSON.

This is the full mapping between Airbyte types and Iceberg types.

| Airbyte type | Iceberg type |
|---|---|
| Boolean | Boolean |
| Date | Date |
| Integer | Long |
| Number | Double |
| String | String |
| Time with timezone* | Time |
| Time without timezone | Time |
| Timestamp with timezone* | Timestamp with timezone |
| Timestamp without timezone | Timestamp without timezone |
| Object | String (JSON-serialized value) |
| Array | String (JSON-serialized value) |
| Union | String (JSON-serialized value) |

*Airbyte converts the time with timezone and timestamp with timezone types to Coordinated Universal Time (UTC) before writing to the Iceberg file.
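
For example, a hypothetical source record with a nested object and an array lands in Iceberg like this (field names are illustrative):

    import json

    # Hypothetical record emitted by a source stream.
    source_record = {
        "id": 42,                                    # Integer      -> Iceberg Long
        "price": 19.99,                              # Number       -> Iceberg Double
        "created_at": "2024-03-01T10:00:00Z",        # Timestamp tz -> Timestamp with timezone (UTC)
        "address": {"city": "Oslo", "zip": "0150"},  # Object       -> String (JSON-serialized)
        "tags": ["new", "vip"],                      # Array        -> String (JSON-serialized)
    }

    # Nested values are written as serialized JSON strings, roughly:
    address_column = json.dumps(source_record["address"])  # '{"city": "Oslo", "zip": "0150"}'
    tags_column = json.dumps(source_record["tags"])        # '["new", "vip"]'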

Managing schema evolution

This connector never rewrites existing Iceberg data files. This means Airbyte can only handle the following source schema changes:

  • Adding or removing a column
  • Widening a column
  • Changing the primary key

You have the following options to manage schema evolution.

  • To handle unsupported schema changes automatically, use Full Refresh - Overwrite as your sync mode.

  • To handle unsupported schema changes as they occur, wait for a sync to fail, then take action to restore it. Either:

    • Manually edit your table schema in Iceberg directly.
    • Refresh your connection in Airbyte.
    • Clear your connection in Airbyte.
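
If you choose to edit the table schema directly, any Iceberg client can apply the change so the next sync succeeds. A minimal PyIceberg sketch (the catalog properties, table name, and new column are placeholders for whatever change your source made):

    from pyiceberg.catalog import load_catalog
    from pyiceberg.types import LongType

    catalog = load_catalog("glue_catalog", **{"type": "glue"})  # placeholder catalog config
    table = catalog.load_table("your_database.your_stream")     # placeholder table

    # Apply the schema change the source introduced, e.g. a new integer column.
    with table.update_schema() as update:
        update.add_column("new_field", LongType())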

Naming

Like most Airbyte destination connectors, the S3 Data Lake connector may modify identifiers (stream name/namespace, column names) for compatibility with the destination.

In particular, when using AWS Glue, the connector transforms special characters in stream names and namespaces so that the resulting database and table names are valid Glue identifiers.

Deduplication

This connector uses a merge-on-read strategy to support deduplication.

Assumptions about primary keys

The S3 Data Lake connector assumes that one of two things is true:

  • The source never emits the same primary key twice in a single sync attempt.
  • If the source emits the same primary key multiple times in a single attempt, it always emits those records in cursor order from oldest to newest.

If these conditions aren't met, you may see inaccurate data in Iceberg in the form of older records taking precedence over newer records. If this happens, use append or overwrite as your sync modes.

An unknown number of API sources have streams that don't meet these conditions. Airbyte knows Stripe and Monday don't, but there are probably others.

Branching and data availability

Iceberg supports Git-like semantics over your data. This connector leverages those semantics to provide resilient syncs.

  • In each sync, each microbatch creates a new snapshot.

  • During truncate syncs, the connector writes the refreshed data to the airbyte_staging branch and replaces the main branch with airbyte_staging at the end of the sync. Since most query engines target the main branch, your existing data remains queryable throughout a truncate sync, and the main branch is atomically swapped to the new version when the sync completes.

Branch replacement

At the end of each stream's sync, the connector replaces the current main branch with the airbyte_staging branch it was working on. It intentionally avoids fast-forwarding to better handle potential compaction issues. Important warning: any changes made to the main branch outside of Airbyte's operations after a sync begins will be lost during this process.

Compaction

:::caution
Do not run compaction during a truncate refresh sync; doing so risks data loss. During a truncate refresh sync, the system deletes all files that don't belong to the latest generation. This includes:

  • Files without generation IDs (compacted files)
  • Files from previous generations

If compaction runs at the same time as the sync, the sync deletes the newly compacted files, which lack generation IDs but contain current-generation data, causing data loss. The system identifies generations by parsing file names for generation IDs.
:::

Considerations and limitations

This section documents known considerations and limitations about how this Iceberg destination interacts with other products.

Snowflake

Airbyte uses Iceberg row-level deletes to mark older versions of records as outdated. However, if you query your Iceberg tables from Snowflake, Snowflake doesn't recognize native Iceberg row-level deletes for tables with external catalogs like Glue (see Snowflake's docs). As a result, your query results return all versions of a record.

For example, the following table contains three versions of the 'Alice' record.

| id | name | updated_at | _airbyte_extracted_at |
|---|---|---|---|
| 1 | Alice | 2024-03-01 10:00 | 2024-03-01 10:10 |
| 1 | Alice | 2024-03-02 12:00 | 2024-03-02 12:10 |
| 1 | Alice | 2024-03-03 14:00 | 2024-03-03 14:10 |

To mitigate this, generate a flag to detect outdated records. Airbyte generates an _airbyte_extracted_at metadata field that assists with this.

row_number() over (partition by {primary_key} order by {cursor} desc, _airbyte_extracted_at desc) != 1 OR _ab_cdc_deleted_at IS NOT NULL as is_outdated

Now, you can identify the latest version of the 'Alice' record by querying whether is_outdated is false.

| id | name | updated_at | _airbyte_extracted_at | row_number | is_outdated |
|---|---|---|---|---|---|
| 1 | Alice | 2024-03-01 10:00 | 2024-03-01 10:10 | 3 | True |
| 1 | Alice | 2024-03-02 12:00 | 2024-03-02 12:10 | 2 | True |
| 1 | Alice | 2024-03-03 14:00 | 2024-03-03 14:10 | 1 | False |

Changelog

Expand to review
Version Date Pull Request Subject
0.3.41 2025-11-06 69232 Upgrade to Bulk CDK 0.1.69. Changes to handle changes in commit patterns
0.3.40 2025-11-01 69133 Upgrade to Bulk CDK 0.1.61.
0.3.39 2025-10-16 68108 Implement Polaris support
0.3.38 2025-10-07 67005 Fix: Treat empty string role_arn as null to prevent misleading config errors
0.3.37 2025-10-07 67150 Fix check operation to use unique table names, preventing conflicts with stale metadata and concurrent operations
0.3.36 2025-09-25 66711 CHECK operation uses configured default dataset instead of airbyte_test_namespace
0.3.35 2025-07-23 63746 Remove unnecessary properties from table
0.3.34 2025-07-11 62952 Update CDK version
0.3.33 2025-07-09 62888 Update CDK version to handle compaction issue when deleting files in a truncate refresh scenario
0.3.32 2025-07-08 62852 Fix metadata (revert accidental archiving)
0.3.31 2025-07-07 62835 Pin to latest CDK version 0.522
0.3.30 2025-06-26 62105 ReplaceBranch to staging from main instead of fast forwarding
0.3.29 2025-06-13 61588 Publish version to account for possible duplicate publishing in pipeline. Noop change. WARNING: THIS HAS A BUG. DO NOT USE.
0.3.28 2025-05-07 59710 CDK backpressure bugfix
0.3.27 2025-04-21 58146 Upgrade to latest CDK
0.3.26 2025-04-17 58104 Chore: Now passing a string around for the region
0.3.25 2025-04-16 58085 Internal refactoring
0.3.24 2025-03-27 56435 Bug fix: Correctly handle non-positive numbers.
0.3.23 2025-03-25 56395 Bug fix: Correctly coerce values inside nested arrays.
0.3.22 2025-03-24 56355 Upgrade to airbyte/java-connector-base:2.0.1 to be M4 compatible.
0.3.21 2025-03-22 #56347 Bugfix: stream start does not always await iceberg setup
0.3.20 2025-03-24 #55849 Internal refactoring
0.3.19 2025-03-19 #55798 CDK: Typing improvements
0.3.18 2025-03-18 #55811 CDK: Pass DestinationStream around vs Descriptor
0.3.17 2025-03-13 #55737 CDK: Pass DestinationRecordRaw around instead of DestinationRecordAirbyteValue
0.3.16 2025-03-13 #55755 Exclude number fields from identifier fields
0.3.15 2025-02-28 #54724 Certify connector
0.3.14 2025-02-14 #53241 New CDK interface; perf improvements, skip initial record staging
0.3.13 2025-02-14 #53697 Internal refactor
0.3.12 2025-02-12 #53170 Improve documentation, tweak error handling of invalid schema evolution
0.3.11 2025-02-12 #53216 Support arbitrary schema change in overwrite / truncate refresh / clear sync
0.3.10 2025-02-11 #53622 Enable the Nessie integration tests
0.3.9 2025-02-10 #53165 Very basic usability improvements and documentation
0.3.8 2025-02-10 #52666 Change the chunk size to 1.5Gb
0.3.7 2025-02-07 #53141 Adding integration tests around the Rest catalog
0.3.6 2025-02-06 #53172 Internal refactor
0.3.5 2025-02-06 #53164 Improve error message on null primary key in dedup mode
0.3.4 2025-02-05 #53173 Tweak spec wording
0.3.3 2025-02-05 #53176 Fix time_with_timezone handling (values are now adjusted to UTC)
0.3.2 2025-02-04 #52690 Handle special characters in stream name/namespace when using AWS Glue
0.3.1 2025-02-03 #52633 Fix dedup
0.3.0 2025-01-31 #52639 Make the database/namespace a required field
0.2.23 2025-01-27 #51600 Internal refactor
0.2.22 2025-01-22 #52081 Implement support for REST catalog
0.2.21 2025-01-27 #52564 Fix crash on stream with 0 records
0.2.20 2025-01-23 #52068 Add support for default namespace (/database name)
0.2.19 2025-01-16 #51595 Clarifications in connector config options
0.2.18 2025-01-15 #51042 Write structs as JSON strings instead of Iceberg structs.
0.2.17 2025-01-14 #51542 New identifier fields should be marked as required.
0.2.16 2025-01-14 #51538 Update identifier fields if incoming fields are different than existing ones
0.2.15 2025-01-14 #51530 Set AWS region for S3 bucket for nessie catalog
0.2.14 2025-01-14 #50413 Update existing table schema based on the incoming schema
0.2.13 2025-01-14 #50412 Implement logic to determine super types between iceberg types
0.2.12 2025-01-10 #50876 Add support for AWS instance profile auth
0.2.11 2025-01-10 #50971 Internal refactor in AWS auth flow
0.2.10 2025-01-09 #50400 Add S3DataLakeTypesComparator
0.2.9 2025-01-09 #51022 Rename all classes and files from Iceberg V2
0.2.8 2025-01-09 #51012 Rename/Cleanup package from Iceberg V2
0.2.7 2025-01-09 #50957 Add support for GLUE RBAC (Assume role)
0.2.6 2025-01-08 #50991 Initial public release.