
Source Apify Dataset: Migrate Python CDK to Low-code CDK (#29859)

Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
This commit is contained in:
btkcodedev
2023-08-30 21:42:00 +05:30
committed by GitHub
parent 19a65bf39d
commit 05b7d01801
28 changed files with 490 additions and 340 deletions

View File

@@ -1,6 +1,5 @@
*
!Dockerfile
!Dockerfile.test
!main.py
!source_apify_dataset
!setup.py

View File

@@ -1,16 +1,38 @@
FROM python:3.9-slim
# Bash is installed for more convenient debugging.
RUN apt-get update && apt-get install -y bash && rm -rf /var/lib/apt/lists/*
FROM python:3.9.11-alpine3.15 as base
# build and load all requirements
FROM base as builder
WORKDIR /airbyte/integration_code
COPY source_apify_dataset ./source_apify_dataset
COPY main.py ./
# upgrade pip to the latest version
RUN apk --no-cache upgrade \
&& pip install --upgrade pip \
&& apk --no-cache add tzdata build-base
COPY setup.py ./
RUN pip install .
# install necessary packages to a temporary folder
RUN pip install --prefix=/install .
# build a clean environment
FROM base
WORKDIR /airbyte/integration_code
# copy all loaded and built libraries to a pure basic image
COPY --from=builder /install /usr/local
# add default timezone settings
COPY --from=builder /usr/share/zoneinfo/Etc/UTC /etc/localtime
RUN echo "Etc/UTC" > /etc/timezone
# bash is installed for more convenient debugging.
RUN apk --no-cache add bash
# copy payload code only
COPY main.py ./
COPY source_apify_dataset ./source_apify_dataset
ENV AIRBYTE_ENTRYPOINT "python /airbyte/integration_code/main.py"
ENTRYPOINT ["python", "/airbyte/integration_code/main.py"]
LABEL io.airbyte.version=0.2.0
LABEL io.airbyte.version=1.0.0
LABEL io.airbyte.name=airbyte/source-apify-dataset

View File

@@ -1,63 +1,27 @@
# Apify Dataset Source
This is the repository for the Apify Dataset source connector, written in Python.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.io/integrations/sources/apify-dataset).
# About connector
This connector allows you to download data from an Apify [dataset](https://docs.apify.com/storage/dataset) to Airbyte. All you need
is an Apify dataset ID.
This is the repository for the Apify Dataset configuration-based source connector.
For information about how to use this connector within Airbyte, see [the documentation](https://docs.airbyte.com/integrations/sources/apify-dataset).
## Local development
### Prerequisites
**To iterate on this connector, make sure to complete this prerequisites section.**
#### Minimum Python version required `= 3.7.0`
#### Build & Activate Virtual Environment and install dependencies
From this connector directory, create a virtual environment:
```
python -m venv .venv
```
This will generate a virtualenv for this module in `.venv/`. Make sure this venv is active in your
development environment of choice. To activate it from the terminal, run:
```
source .venv/bin/activate
pip install -r requirements.txt
```
If you are in an IDE, follow your IDE's instructions to activate the virtualenv.
Note that while we are installing dependencies from `requirements.txt`, you should only edit `setup.py` for your dependencies. `requirements.txt` is
used for editable installs (`pip install -e`) to pull in Python dependencies from the monorepo and will call `setup.py`.
If this is mumbo jumbo to you, don't worry about it; just put your deps in `setup.py` but install using `pip install -r requirements.txt`, and everything
should work as you expect.
#### Building via Gradle
From the Airbyte repository root, run:
You can also build the connector in Gradle. This is typically used in CI and not needed for your development workflow.
To build using Gradle, from the Airbyte repository root, run:
```
./gradlew :airbyte-integrations:connectors:source-apify-dataset:build
```
#### Create credentials
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.io/integrations/sources/apify-dataset)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_apify_dataset/spec.json` file.
Note that the `secrets` directory is gitignored by default, so there is no danger of accidentally checking in sensitive information.
**If you are a community contributor**, follow the instructions in the [documentation](https://docs.airbyte.com/integrations/sources/apify-dataset)
to generate the necessary credentials. Then create a file `secrets/config.json` conforming to the `source_apify_dataset/spec.yaml` file.
Note that any directory named `secrets` is gitignored across the entire Airbyte repo, so there is no danger of accidentally checking in sensitive information.
See `integration_tests/sample_config.json` for a sample config file.
You can get your Apify credentials from the Settings > Integration [section](https://my.apify.com/account#/integrations) of the Apify app.
**If you are an Airbyte core member**, copy the credentials in Lastpass under the secret name `source apify-dataset test creds`
and place them into `secrets/config.json`.
### Locally running the connector
```
python main.py spec
python main.py check --config secrets/config.json
python main.py discover --config secrets/config.json
python main.py read --config secrets/config.json --catalog integration_tests/configured_catalog.json
```
### Locally running the connector docker image
#### Build
@@ -82,32 +46,15 @@ docker run --rm -v $(pwd)/secrets:/secrets airbyte/source-apify-dataset:dev disc
docker run --rm -v $(pwd)/secrets:/secrets -v $(pwd)/integration_tests:/integration_tests airbyte/source-apify-dataset:dev read --config /secrets/config.json --catalog /integration_tests/configured_catalog.json
```
## Testing
Make sure to familiarize yourself with [pytest test discovery](https://docs.pytest.org/en/latest/goodpractices.html#test-discovery) to know how your test files and methods should be named.
First install test dependencies into your virtual environment:
```
pip install .[tests]
```
### Unit Tests
To run unit tests locally, from the connector directory run:
```
python -m pytest unit_tests
```
### Integration Tests
There are two types of integration tests: Acceptance Tests (Airbyte's test suite for all source connectors) and custom integration tests (which are specific to this connector).
#### Custom Integration tests
Place custom tests inside `integration_tests/` folder, then, from the connector root, run
```
python -m pytest integration_tests
```
#### Acceptance Tests
Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.io/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
Customize `acceptance-test-config.yml` file to configure tests. See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference) for more information.
If your connector requires creating or destroying resources for use during acceptance tests, create fixtures for them and place them inside `integration_tests/acceptance.py`.
To run your integration tests with acceptance tests, from the connector root, run
To run your integration tests with Docker, run:
```
python -m pytest integration_tests -p integration_tests.acceptance
./acceptance-test-docker.sh
```
To run your integration tests with docker
### Using gradle to run tests
All commands should be run from airbyte project root.

View File

@@ -1,7 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
def test_example_method():
assert True

View File

@@ -1,19 +1,44 @@
# See [Connector Acceptance Tests](https://docs.airbyte.com/connector-development/testing-connectors/connector-acceptance-tests-reference)
# for more information about how to configure these tests
connector_image: airbyte/source-apify-dataset:dev
tests:
acceptance_tests:
spec:
- spec_path: "source_apify_dataset/spec.json"
tests:
- spec_path: "source_apify_dataset/spec.yaml"
backward_compatibility_tests_config:
disable_for_version: 0.2.0
connection:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
tests:
- config_path: "secrets/config.json"
status: "succeed"
- config_path: "integration_tests/invalid_config.json"
status: "failed"
discovery:
- config_path: "secrets/config.json"
tests:
- config_path: "secrets/config.json"
backward_compatibility_tests_config:
disable_for_version: 0.2.0
basic_read:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
incremental:
bypass_reason: Connector doesn't use incremental sync
# tests:
# - config_path: "secrets/config.json"
# configured_catalog_path: "integration_tests/configured_catalog.json"
# future_state:
# future_state_path: "integration_tests/abnormal_state.json"
full_refresh:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
tests:
- config_path: "secrets/config.json"
configured_catalog_path: "integration_tests/configured_catalog.json"
ignored_fields:
datasets:
- name: "accessedAt"
bypass_reason: "Change everytime"
- name: "stats/readCount"
bypass_reason: "Change everytime"
dataset:
- name: "accessedAt"
bypass_reason: "Change everytime"
- name: "stats/readCount"
bypass_reason: "Change everytime"

View File

@@ -1,2 +1,3 @@
#!/usr/bin/env sh
source "$(git rev-parse --show-toplevel)/airbyte-integrations/bases/connector-acceptance-test/acceptance-test-docker.sh"

View File

@@ -0,0 +1,3 @@
#
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#

View File

@@ -0,0 +1,16 @@
[
{
"type": "STREAM",
"stream": {
"stream_state": { "modifiedAt": "3021-09-08T07:04:28.000Z" },
"stream_descriptor": { "name": "dataset" }
}
},
{
"type": "STREAM",
"stream": {
"stream_state": { "modifiedAt": "3021-09-08T07:04:28.000Z" },
"stream_descriptor": { "name": "datasets" }
}
}
]

View File

@@ -11,4 +11,6 @@ pytest_plugins = ("connector_acceptance_test.plugin",)
@pytest.fixture(scope="session", autouse=True)
def connector_setup():
"""This fixture is a placeholder for external resources that acceptance test might require."""
# TODO: set up test dependencies if needed. Otherwise remove the TODO comments
yield
# TODO: clean up test dependencies

View File

@@ -1,20 +0,0 @@
{
"streams": [
{
"name": "DatasetItems",
"supported_sync_modes": ["full_refresh"],
"destination_sync_mode": "overwrite",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"data": {
"type": "object",
"additionalProperties": true
}
},
"additionalProperties": true
}
}
]
}

View File

@@ -1,24 +1,31 @@
{
"streams": [
{
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite",
"stream": {
"name": "DatasetItems",
"supported_sync_modes": ["full_refresh"],
"destination_sync_mode": "overwrite",
"json_schema": {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"data": {
"type": "object",
"additionalProperties": true
}
},
"additionalProperties": true
}
}
"name": "datasets",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "dataset",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
},
{
"stream": {
"name": "item_collection",
"json_schema": {},
"supported_sync_modes": ["full_refresh"]
},
"sync_mode": "full_refresh",
"destination_sync_mode": "overwrite"
}
]
}

View File

@@ -0,0 +1,2 @@
{"stream": "datasets", "data": {"id":"Mxnvcv4Rspg9P9aP0","name":"my-dataset-name","userId":"YnGtyk7naKpwpousW","username":"encouraging_cliff","createdAt":"2023-08-25T19:19:33.588Z","modifiedAt":"2023-08-25T19:19:33.588Z","accessedAt":"2023-08-25T19:19:43.646Z","itemCount":0,"cleanItemCount":0,"actId":null,"actRunId":null,"schema":null,"stats":{"inflatedBytes":0,"readCount":0,"writeCount":0}}, "emitted_at": 1692990238010}
{"stream": "dataset", "data": {"id":"Mxnvcv4Rspg9P9aP0","name":"my-dataset-name","userId":"YnGtyk7naKpwpousW","createdAt":"2023-08-25T19:19:33.588Z","modifiedAt":"2023-08-25T19:19:33.588Z","accessedAt":"2023-08-25T19:19:43.646Z","itemCount":0,"cleanItemCount":0,"actId":null,"actRunId":null,"schema":null,"stats":{"readCount":0,"writeCount":0,"storageBytes":0},"fields":[]}, "emitted_at": 1692990238010}

View File

@@ -1,4 +1,4 @@
{
"datasetId": "non_existent_dataset_id",
"clean": false
"token": "abc",
"start_date": "2099-08-25T00:00:59.244Z"
}

View File

@@ -0,0 +1,4 @@
{
"token": "apify_api_XXXXXXXXXXXXXXXXXXXX",
"start_date": "2023-08-25T00:00:59.244Z"
}

View File

@@ -0,0 +1,9 @@
[
{
"type": "STREAM",
"stream": {
"stream_state": { "modifiedAt": "3021-09-08T07:04:28.000Z" },
"stream_descriptor": { "name": "example" }
}
}
]

View File

@@ -1,24 +1,30 @@
data:
allowedHosts:
hosts:
- api.apify.com
registries:
oss:
enabled: true
cloud:
enabled: true
connectorSubtype: api
connectorType: source
definitionId: 47f17145-fe20-4ef5-a548-e29b048adf84
dockerImageTag: 0.2.0
dockerImageTag: 1.0.0
dockerRepository: airbyte/source-apify-dataset
githubIssueLabel: source-apify-dataset
icon: apify.svg
icon: apify-dataset.svg
license: MIT
name: Apify Dataset
registries:
cloud:
enabled: true
oss:
enabled: true
releaseDate: 2023-08-25
releaseStage: alpha
releases:
breakingChanges:
1.0.0:
upgradeDeadline: 2023-08-30
message: "Update spec to use token and ingest all 3 streams correctly"
supportLevel: community
documentationUrl: https://docs.airbyte.com/integrations/sources/apify-dataset
tags:
- language:python
ab_internal:
sl: 100
ql: 100
supportLevel: community
- language:lowcode
metadataSpecVersion: "1.0"

View File

@@ -1,2 +1,2 @@
# This file is autogenerated -- only edit if you know what you are doing. Use setup.py for declaring dependencies.
-e ../../bases/connector-acceptance-test
-e .

View File

@@ -5,13 +5,9 @@
from setuptools import find_packages, setup
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1", "apify-client~=0.0.1"]
MAIN_REQUIREMENTS = ["airbyte-cdk~=0.1"]
TEST_REQUIREMENTS = [
"requests-mock~=1.9.3",
"pytest-mock~=3.6.1",
"pytest~=6.1",
]
TEST_REQUIREMENTS = ["requests-mock~=1.9.3", "pytest~=6.2", "pytest-mock~=3.6.1"]
setup(
name="source_apify_dataset",
@@ -20,7 +16,7 @@ setup(
author_email="contact@airbyte.io",
packages=find_packages(),
install_requires=MAIN_REQUIREMENTS,
package_data={"": ["*.json"]},
package_data={"": ["*.json", "*.yaml", "schemas/*.json", "schemas/shared/*.json"]},
extras_require={
"tests": TEST_REQUIREMENTS,
},

View File

@@ -1,24 +1,6 @@
# MIT License
#
# Copyright (c) 2020 Airbyte
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
from .source import SourceApifyDataset

View File

@@ -0,0 +1,109 @@
version: "0.29.0"
definitions:
selector:
type: RecordSelector
extractor:
type: DpathExtractor
field_path: ["data"]
requester:
type: HttpRequester
url_base: "https://api.apify.com/v2/"
http_method: "GET"
authenticator:
type: NoAuth
request_parameters:
token: "{{ config['token'] }}"
retriever:
type: SimpleRetriever
record_selector:
$ref: "#/definitions/selector"
paginator:
type: "NoPagination"
requester:
$ref: "#/definitions/requester"
base_paginator:
type: "DefaultPaginator"
page_size_option:
type: "RequestOption"
inject_into: "request_parameter"
field_name: "limit"
pagination_strategy:
type: "OffsetIncrement"
page_size: 50
page_token_option:
type: "RequestOption"
field_name: "offset"
inject_into: "request_parameter"
base_stream:
type: DeclarativeStream
retriever:
$ref: "#/definitions/retriever"
datasets_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "datasets"
primary_key: "id"
path: "datasets"
retriever:
$ref: "#/definitions/retriever"
paginator:
$ref: "#/definitions/base_paginator"
record_selector:
$ref: "#/definitions/selector"
extractor:
type: DpathExtractor
field_path: ["data", "items"]
datasets_partition_router:
type: SubstreamPartitionRouter
parent_stream_configs:
- stream: "#/definitions/datasets_stream"
parent_key: "id"
partition_field: "parent_id"
dataset_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "dataset"
primary_key: "id"
path: "datasets/{{ stream_partition.parent_id }}"
retriever:
$ref: "#/definitions/retriever"
paginator:
$ref: "#/definitions/base_paginator"
partition_router:
$ref: "#/definitions/datasets_partition_router"
item_collection_stream:
$ref: "#/definitions/base_stream"
$parameters:
name: "item_collection"
path: "datasets/{{ stream_partition.parent_id }}/items"
retriever:
$ref: "#/definitions/retriever"
paginator:
$ref: "#/definitions/base_paginator"
record_selector:
$ref: "#/definitions/selector"
extractor:
type: DpathExtractor
field_path: []
partition_router:
$ref: "#/definitions/datasets_partition_router"
streams:
- "#/definitions/datasets_stream"
- "#/definitions/dataset_stream"
- "#/definitions/item_collection_stream"
check:
type: CheckStream
stream_names:
- "datasets"
- "dataset"
- "item_collection"

View File

@@ -0,0 +1,65 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Individual datasets schema",
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
},
"userId": {
"type": ["null", "string"]
},
"createdAt": {
"type": ["null", "string"]
},
"stats": {
"type": ["null", "object"],
"properties": {
"readCount": {
"type": ["null", "number"]
},
"storageBytes": {
"type": ["null", "number"]
},
"writeCount": {
"type": ["null", "number"]
}
}
},
"schema": {
"type": ["null", "string"]
},
"modifiedAt": {
"type": ["null", "string"]
},
"accessedAt": {
"type": ["null", "string"]
},
"itemCount": {
"type": ["null", "number"]
},
"cleanItemCount": {
"type": ["null", "number"]
},
"actId": {
"type": ["null", "string"]
},
"actRunId": {
"type": ["null", "string"]
},
"fields": {
"anyOf": [
{
"type": "null"
},
{
"type": "array"
}
]
}
}
}

View File

@@ -0,0 +1,68 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Collection of datasets schema",
"type": ["null", "object"],
"additionalProperties": true,
"properties": {
"id": {
"type": ["null", "string"]
},
"name": {
"type": ["null", "string"]
},
"userId": {
"type": ["null", "string"]
},
"createdAt": {
"type": ["null", "string"]
},
"modifiedAt": {
"type": ["null", "string"]
},
"accessedAt": {
"type": ["null", "string"]
},
"itemCount": {
"type": ["null", "number"]
},
"username": {
"type": ["null", "string"]
},
"stats": {
"type": ["null", "object"],
"properties": {
"readCount": {
"type": ["null", "number"]
},
"storageBytes": {
"type": ["null", "number"]
},
"writeCount": {
"type": ["null", "number"]
}
}
},
"schema": {
"type": ["null", "string"]
},
"cleanItemCount": {
"type": ["null", "number"]
},
"actId": {
"type": ["null", "string"]
},
"actRunId": {
"type": ["null", "string"]
},
"fields": {
"anyOf": [
{
"type": "null"
},
{
"type": "array"
}
]
}
}
}

View File

@@ -0,0 +1,21 @@
{
"$schema": "http://json-schema.org/draft-07/schema#",
"type": ["null", "object"],
"title": "Item collection schema",
"additionalProperties": true,
"properties": {
"url": {
"type": ["null", "string"]
},
"#debug": {
"type": ["null", "object"],
"additionalProperties": true
},
"pageTitle": {
"type": ["null", "string"]
},
"#error": {
"type": ["null", "boolean"]
}
}
}

View File

@@ -2,143 +2,17 @@
# Copyright (c) 2023 Airbyte, Inc., all rights reserved.
#
import concurrent.futures
import json
from datetime import datetime
from functools import partial
from typing import Dict, Generator
from airbyte_cdk.sources.declarative.yaml_declarative_source import YamlDeclarativeSource
from airbyte_cdk.logger import AirbyteLogger
from airbyte_cdk.models import (
AirbyteCatalog,
AirbyteConnectionStatus,
AirbyteMessage,
AirbyteRecordMessage,
AirbyteStream,
ConfiguredAirbyteCatalog,
Status,
Type,
)
from airbyte_cdk.models.airbyte_protocol import SyncMode
from airbyte_cdk.sources import Source
from apify_client import ApifyClient
"""
This file provides the necessary constructs to interpret a provided declarative YAML configuration file into
a source connector.
DATASET_ITEMS_STREAM_NAME = "DatasetItems"
# Batch size for downloading dataset items from Apify dataset
BATCH_SIZE = 50000
WARNING: Do not modify this file.
"""
class SourceApifyDataset(Source):
def _apify_get_dataset_items(self, dataset_client, clean, offset):
"""
Wrapper around Apify dataset client that returns a single page with dataset items.
This function needs to be defined explicitly so it can be called in parallel in the main read function.
:param dataset_client: Apify dataset client
:param clean: whether to fetch only clean items (clean are non-empty ones excluding hidden columns)
:param offset: page offset
:return: dictionary where .items field contains the fetched dataset items
"""
return dataset_client.list_items(offset=offset, limit=BATCH_SIZE, clean=clean)
def check(self, logger: AirbyteLogger, config: json) -> AirbyteConnectionStatus:
"""
Tests if the input configuration can be used to successfully connect to the Apify integration.
This is tested by trying to access the Apify user object with the provided userId and Apify token.
:param logger: Logging object to display debug/info/error to the logs
(logs will not be accessible via airbyte UI if they are not passed to this logger)
:param config: Json object containing the configuration of this source, content of this json is as specified in
the properties of the spec.json file
:return: AirbyteConnectionStatus indicating a Success or Failure
"""
try:
dataset_id = config["datasetId"]
dataset = ApifyClient().dataset(dataset_id).get()
if dataset is None:
raise ValueError(f"Dataset {dataset_id} does not exist")
except Exception as e:
return AirbyteConnectionStatus(status=Status.FAILED, message=f"An exception occurred: {str(e)}")
else:
return AirbyteConnectionStatus(status=Status.SUCCEEDED)
def discover(self, logger: AirbyteLogger, config: json) -> AirbyteCatalog:
"""
Returns an AirbyteCatalog representing the available streams and fields in this integration.
For example, given valid credentials to a Postgres database,
returns an Airbyte catalog where each postgres table is a stream, and each table column is a field.
:param logger: Logging object to display debug/info/error to the logs
(logs will not be accessible via airbyte UI if they are not passed to this logger)
:param config: Json object containing the configuration of this source, content of this json is as specified in
the properties of the spec.json file
:return: AirbyteCatalog is an object describing a list of all available streams in this source.
A stream is an AirbyteStream object that includes:
- its stream name (or table name in the case of Postgres)
- json_schema providing the specifications of expected schema for this stream (a list of columns described
by their names and types)
"""
stream_name = DATASET_ITEMS_STREAM_NAME
json_schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"data": {
"type": "object",
"additionalProperties": True,
}
},
}
return AirbyteCatalog(
streams=[AirbyteStream(name=stream_name, supported_sync_modes=[SyncMode.full_refresh], json_schema=json_schema)]
)
def read(
self, logger: AirbyteLogger, config: json, catalog: ConfiguredAirbyteCatalog, state: Dict[str, any]
) -> Generator[AirbyteMessage, None, None]:
"""
Returns a generator of the AirbyteMessages generated by reading the source with the given configuration,
catalog, and state.
:param logger: Logging object to display debug/info/error to the logs
(logs will not be accessible via airbyte UI if they are not passed to this logger)
:param config: Json object containing the configuration of this source, content of this json is as specified in
the properties of the spec.json file
:param catalog: The input catalog is a ConfiguredAirbyteCatalog which is almost the same as AirbyteCatalog
returned by discover(), but
in addition, it's been configured in the UI! For each particular stream and field, there may have been provided
with extra modifications such as: filtering streams and/or columns out, renaming some entities, etc
:param state: When a Airbyte reads data from a source, it might need to keep a checkpoint cursor to resume
replication in the future from that saved checkpoint.
This is the object that is provided with state from previous runs and avoid replicating the entire set of
data everytime.
:return: A generator that produces a stream of AirbyteRecordMessage contained in AirbyteMessage object.
"""
logger.info("Reading data from Apify dataset")
dataset_id = config["datasetId"]
clean = config.get("clean", False)
client = ApifyClient()
dataset_client = client.dataset(dataset_id)
# Get total number of items in dataset. This will be used in pagination
dataset = dataset_client.get()
num_items = dataset["itemCount"]
with concurrent.futures.ThreadPoolExecutor() as executor:
for result in executor.map(partial(self._apify_get_dataset_items, dataset_client, clean), range(0, num_items, BATCH_SIZE)):
for data in result.items:
yield AirbyteMessage(
type=Type.RECORD,
record=AirbyteRecordMessage(
stream=DATASET_ITEMS_STREAM_NAME, data={"data": data}, emitted_at=int(datetime.now().timestamp()) * 1000
),
)
# Declarative Source
class SourceApifyDataset(YamlDeclarativeSource):
def __init__(self):
super().__init__(**{"path_to_yaml": "manifest.yaml"})
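The connector's `main.py` is not shown in this change; generated Python and low-code connectors typically wire the source into the CDK entrypoint as in the sketch below (an assumption about this repo's unchanged file, not a copy of it).
```python
# Sketch of the standard CDK entrypoint wiring used by generated connectors.
import sys

from airbyte_cdk.entrypoint import launch
from source_apify_dataset import SourceApifyDataset

if __name__ == "__main__":
    source = SourceApifyDataset()
    launch(source, sys.argv[1:])
```
With that in place, the `python main.py spec|check|discover|read ...` commands from the README dispatch into the declarative `manifest.yaml`.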

View File

@@ -1,22 +0,0 @@
{
"documentationUrl": "https://docs.airbyte.com/integrations/sources/apify-dataset",
"connectionSpecification": {
"$schema": "http://json-schema.org/draft-07/schema#",
"title": "Apify Dataset Spec",
"type": "object",
"required": ["datasetId"],
"additionalProperties": true,
"properties": {
"datasetId": {
"type": "string",
"title": "Dataset ID",
"description": "ID of the dataset you would like to load to Airbyte."
},
"clean": {
"type": "boolean",
"title": "Clean",
"description": "If set to true, only clean items will be downloaded from the dataset. See description of what clean means in <a href=\"https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items\">Apify API docs</a>. If not sure, set clean to false."
}
}
}
}

View File

@@ -0,0 +1,30 @@
documentationUrl: https://docs.airbyte.com/integrations/sources/apify-dataset
connectionSpecification:
$schema: http://json-schema.org/draft-07/schema#
title: Apify Dataset Spec
type: object
required:
- token
additionalProperties: true
properties:
token:
title: Personal API tokens
description: >-
Your application's Client Secret. You can find this value on the <a
href="https://console.apify.com/account/integrations">console integrations tab</a>
after you login.
type: string
examples:
- "Personal API tokens"
airbyte_secret: true
datasetId:
type: string
title: Dataset ID
description: ID of the dataset you would like to load to Airbyte.
clean:
type: boolean
title: Clean
description:
If set to true, only clean items will be downloaded from the dataset.
See description of what clean means in <a href="https://docs.apify.com/api/v2#/reference/datasets/item-collection/get-items">Apify
API docs</a>. If not sure, set clean to false.

View File

@@ -0,0 +1,6 @@
# Apify Dataset Migration Guide
## Upgrading to 1.0.0
A major update that fixes data ingestion so data is retrieved correctly from Apify.
Please update your connector configuration.
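As a rough sketch based on the old `spec.json` and the new `spec.yaml`/`sample_config.json` in this change, the configuration shape changes roughly like this (illustrative values only):
```python
# Illustrative config shapes only, derived from the specs in this change.
old_config = {  # pre-1.0.0 spec.json: dataset ID was required
    "datasetId": "Mxnvcv4Rspg9P9aP0",
    "clean": False,
}

new_config = {  # 1.0.0 spec.yaml: the personal API token is required
    "token": "apify_api_XXXXXXXXXXXXXXXXXXXX",
    "datasetId": "Mxnvcv4Rspg9P9aP0",  # optional in the new spec; datasets are listed via the API
}
```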

View File

@@ -24,10 +24,10 @@ Since the dataset items do not have strongly typed schema, they are synced as ob
### Features
| Feature | Supported? |
| :--- | :--- |
| Full Refresh Sync | Yes |
| Incremental Sync | No |
| Feature | Supported? |
| :------------------------ | :--------------- |
| Full Refresh Sync | Yes |
| Incremental Sync | Yes |
### Performance considerations
@@ -37,16 +37,18 @@ The Apify dataset connector uses [Apify Python Client](https://docs.apify.com/ap
### Requirements
* Apify [dataset](https://docs.apify.com/storage/dataset) ID
* Apify API [token](https://console.apify.com/account/integrations)
* Parameter clean: true or false
### Changelog
| Version | Date | Pull Request | Subject |
| :--- | :--- | :--- | :--- |
| 0.2.0 | 2022-06-20 | [28290](https://github.com/airbytehq/airbyte/pull/28290) | Make connector work with platform changes not syncing empty stream schemas. |
| 0.1.11 | 2022-04-27 | [12397](https://github.com/airbytehq/airbyte/pull/12397) | No changes. Used connector to test publish workflow changes. |
| 0.1.9 | 2022-04-05 | [PR\#11712](https://github.com/airbytehq/airbyte/pull/11712) | No changes from 0.1.4. Used connector to test publish workflow changes. |
| 0.1.4 | 2021-12-23 | [PR\#8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
| 0.1.2 | 2021-11-08 | [PR\#7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |
| 0.1.0 | 2021-07-29 | [PR\#5069](https://github.com/airbytehq/airbyte/pull/5069) | Initial version of the connector |
| Version | Date | Pull Request | Subject |
| :-------- | :---------- | :------------------------------------------------------------ | :-------------------------------------------------------------------------- |
| 1.0.0 | 2023-08-25 | [29859](https://github.com/airbytehq/airbyte/pull/29859) | Migrate to lowcode |
| 0.2.0 | 2022-06-20 | [28290](https://github.com/airbytehq/airbyte/pull/28290) | Make connector work with platform changes not syncing empty stream schemas. |
| 0.1.11 | 2022-04-27 | [12397](https://github.com/airbytehq/airbyte/pull/12397) | No changes. Used connector to test publish workflow changes. |
| 0.1.9 | 2022-04-05 | [PR\#11712](https://github.com/airbytehq/airbyte/pull/11712) | No changes from 0.1.4. Used connector to test publish workflow changes. |
| 0.1.4 | 2021-12-23 | [PR\#8434](https://github.com/airbytehq/airbyte/pull/8434) | Update fields in source-connectors specifications |
| 0.1.2 | 2021-11-08 | [PR\#7499](https://github.com/airbytehq/airbyte/pull/7499) | Remove base-python dependencies |
| 0.1.0 | 2021-07-29 | [PR\#5069](https://github.com/airbytehq/airbyte/pull/5069) | Initial version of the connector |