file cdk: fix typing, pull out non-scalar handling (#35687)
## What

* Fix the typing and handling of the different types passed to `_to_output_value` - we don't always get a `Scalar`. We already handle the different cases correctly, but the typing doesn't reflect this.
* Splitting the scalar conversion out into its own method is a helpful precursor to https://github.com/airbytehq/airbyte/pull/35688, as the `DictionaryArray` object doesn't have an `as_py()` method (see the sketch below).
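For context, here is a minimal pyarrow sketch (illustrative only, not taken from the CDK code) of the difference the new typing reflects: a plain column entry arrives as a `Scalar` and converts with `as_py()`, while a dictionary-encoded entry arrives as a `DictionaryArray` that has no `as_py()` and instead exposes its `indices` and `dictionary` arrays.

```python
import pyarrow as pa

# A plain column entry is a pyarrow Scalar; it converts to Python with as_py().
table = pa.table({"color": ["red", "blue", "red"]})
scalar = table.column("color")[0]
print(scalar.as_py())  # 'red'

# A dictionary-encoded column is a DictionaryArray; it has no as_py() and is
# represented by two underlying arrays: indices and dictionary (the values).
encoded = pa.array(["red", "blue", "red"]).dictionary_encode()
print(encoded.indices.tolist())     # [0, 1, 0]
print(encoded.dictionary.tolist())  # ['red', 'blue']
```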
## 🚨 User Impact 🚨

None

## Pre-merge Actions

<details><summary><strong>Updating the Python CDK</strong></summary>

### Airbyter

Before merging:

- Pull Request description explains what problem it is solving
- Code change is unit tested
- Build and mypy check pass
- Smoke test the change on at least one affected connector
  - On GitHub: Run [this workflow](https://github.com/airbytehq/airbyte/actions/workflows/connectors_tests.yml), passing `--use-local-cdk --name=source-<connector>` as options
  - Locally: `airbyte-ci connectors --use-local-cdk --name=source-<connector> test`
- PR is reviewed and approved

After merging:

- [Publish the CDK](https://github.com/airbytehq/airbyte/actions/workflows/publish-cdk-command-manually.yml)
  - The CDK does not follow proper semantic versioning. Choose minor if the change has significant user impact or is a breaking change. Choose patch otherwise.
  - Write a thoughtful changelog message so we know what was updated.
- Merge the platform PR that was auto-created for updating the Connector Builder's CDK version
  - This step is optional if the change does not affect the connector builder or declarative connectors.

</details>
@@ -5,7 +5,7 @@
 import json
 import logging
 import os
-from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple
+from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union
 from urllib.parse import unquote
 
 import pyarrow as pa
@@ -16,7 +16,7 @@ from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFile
 from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser
 from airbyte_cdk.sources.file_based.remote_file import RemoteFile
 from airbyte_cdk.sources.file_based.schema_helpers import SchemaType
-from pyarrow import Scalar
+from pyarrow import DictionaryArray, Scalar
 
 
 class ParquetParser(FileTypeParser):
@@ -95,7 +95,17 @@ class ParquetParser(FileTypeParser):
         return FileReadMode.READ_BINARY
 
     @staticmethod
-    def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any:
+    def _to_output_value(parquet_value: Union[Scalar, DictionaryArray], parquet_format: ParquetFormat) -> Any:
         """
         Convert an entry in a pyarrow table to a value that can be output by the source.
         """
+        if isinstance(parquet_value, DictionaryArray):
+            return ParquetParser._dictionary_array_to_python_value(parquet_value)
+        else:
+            return ParquetParser._scalar_to_python_value(parquet_value, parquet_format)
+
+    @staticmethod
+    def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any:
+        """
+        Convert a pyarrow scalar to a value that can be output by the source.
+        """
@@ -119,13 +129,6 @@ class ParquetParser(FileTypeParser):
         else:
             return str(parquet_value.as_py())
 
-        # Dictionaries are stored as two columns: indices and values
-        # The indices column is an array of integers that maps to the values column
-        if pa.types.is_dictionary(parquet_value.type):
-            return {
-                "indices": parquet_value.indices.tolist(),
-                "values": parquet_value.dictionary.tolist(),
-            }
         if pa.types.is_map(parquet_value.type):
             return {k: v for k, v in parquet_value.as_py()}
 
@@ -149,6 +152,20 @@ class ParquetParser(FileTypeParser):
         else:
             return parquet_value.as_py()
 
+    @staticmethod
+    def _dictionary_array_to_python_value(parquet_value: DictionaryArray) -> Dict[str, Any]:
+        """
+        Convert a pyarrow dictionary array to a value that can be output by the source.
+
+        Dictionaries are stored as two columns: indices and values
+        The indices column is an array of integers that maps to the values column
+        """
+
+        return {
+            "indices": parquet_value.indices.tolist(),
+            "values": parquet_value.dictionary.tolist(),
+        }
+
     @staticmethod
     def parquet_type_to_schema_type(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> Mapping[str, str]:
         """
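For reference, a quick sketch of what the new `_dictionary_array_to_python_value` helper would emit for a small dictionary-encoded array. The import path is assumed from the package layout above and is not shown in the diff.

```python
import pyarrow as pa

# Module path assumed; adjust to wherever ParquetParser actually lives in the CDK.
from airbyte_cdk.sources.file_based.file_types.parquet_parser import ParquetParser

encoded = pa.array(["red", "blue", "red"]).dictionary_encode()
print(ParquetParser._dictionary_array_to_python_value(encoded))
# {'indices': [0, 1, 0], 'values': ['red', 'blue']}
```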