1
0
mirror of synced 2026-01-07 00:05:48 -05:00
Files
airbyte/airbyte-integrations/connectors/source-clockify/source_clockify/source.py
Ahmed Mousa f38df6619a 🎉 New Connector: Clockify [python cdk] (#17767)
* new connector source-clockify

* feat: enable caching for streams (users, projects)

* chore: pep8 changes at unit_tests

* chore: pep8 changes at integration_tests

* chore: update schema

* Adds python formatting, removes unused import.

* Makes the task duration field nullable to pass integration test.

* fix: add second type to null values to the schema files

* Adds a null fallback value to task duration.

* Updates airbyte-cdk dependency.

* Adds UUID in source definitions.

* auto-bump connector version

* Requested changes.

* add clockify to source def seed

* correct spec.json add titles

* add icon

* run format

* remove source spec

* correct spec

* add eof gitignore

* auto-bump connector version

Co-authored-by: nataly <nataly@airbyte.io>
Co-authored-by: Marcos Marx <marcosmarxm@users.noreply.github.com>
Co-authored-by: Nataly Merezhuk <65251165+natalyjazzviolin@users.noreply.github.com>
Co-authored-by: Octavia Squidington III <octavia-squidington-iii@users.noreply.github.com>
Co-authored-by: marcosmarxm <marcosmarxm@gmail.com>
2022-10-26 09:09:15 -03:00

34 lines
1.4 KiB
Python

#
# Copyright (c) 2022 Airbyte, Inc., all rights reserved.
#
from typing import Any, List, Mapping, Tuple
from airbyte_cdk.models import SyncMode
from airbyte_cdk.sources import AbstractSource
from airbyte_cdk.sources.streams import Stream
from airbyte_cdk.sources.streams.http.requests_native_auth.token import TokenAuthenticator
from .streams import Clients, Projects, Tags, Tasks, TimeEntries, UserGroups, Users
# Source
class SourceClockify(AbstractSource):
    """Airbyte source that exposes Clockify workspace data as streams.

    The connector configuration must provide two keys:

    * ``api_key`` -- sent with every request in the ``X-Api-Key`` header.
    * ``workspace_id`` -- passed to every stream constructor.
    """

    @staticmethod
    def _stream_kwargs(config: Mapping[str, Any]) -> Mapping[str, Any]:
        """Build the keyword arguments shared by every Clockify stream.

        Keeping this in one place guarantees that the connection check and
        the real sync authenticate in exactly the same way.

        Args:
            config: Connector configuration holding ``api_key`` and ``workspace_id``.

        Returns:
            A mapping with the ``authenticator`` and ``workspace_id`` arguments.

        Raises:
            KeyError: If ``api_key`` or ``workspace_id`` is missing from ``config``.
        """
        # auth_method is empty, presumably so that no scheme prefix (such as
        # "Bearer") is placed in front of the key -- confirm against the CDK.
        authenticator = TokenAuthenticator(token=config["api_key"], auth_header="X-Api-Key", auth_method="")
        return {"authenticator": authenticator, "workspace_id": config["workspace_id"]}

    def check_connection(self, logger, config: Mapping[str, Any]) -> Tuple[bool, Any]:
        """Verify the credentials by reading one record from the ``Users`` stream.

        Args:
            logger: Logger supplied by the caller; not used by this check.
            config: Connector configuration holding ``api_key`` and ``workspace_id``.

        Returns:
            ``(True, None)`` when a record could be read, otherwise
            ``(False, message)`` where ``message`` embeds the ``repr`` of the
            exception that was raised.
        """
        try:
            # Building the arguments inside the ``try`` means a missing config
            # key is reported as a failed check instead of propagating.
            workspace_stream = Users(**self._stream_kwargs(config))
            # Only the first record is pulled. If the stream yields nothing,
            # ``next`` raises StopIteration, which is also caught below and
            # reported as a failure.
            next(workspace_stream.read_records(sync_mode=SyncMode.full_refresh))
            return True, None
        except Exception as e:
            return False, f"Please check that your API key and workspace id are entered correctly: {repr(e)}"

    def streams(self, config: Mapping[str, Any]) -> List[Stream]:
        """Instantiate every stream offered by this connector.

        Args:
            config: Connector configuration holding ``api_key`` and ``workspace_id``.

        Returns:
            One instance each of ``Users``, ``Projects``, ``Clients``, ``Tags``,
            ``UserGroups``, ``TimeEntries`` and ``Tasks``, in that order, all
            sharing the same authenticator and workspace id.
        """
        args = self._stream_kwargs(config)
        stream_classes = (Users, Projects, Clients, Tags, UserGroups, TimeEntries, Tasks)
        return [stream_class(**args) for stream_class in stream_classes]