From 08dd7de8f8f46863243456906839f30d5e2c9d21 Mon Sep 17 00:00:00 2001 From: Ella Rohm-Ensing Date: Tue, 5 Mar 2024 08:56:52 -0800 Subject: [PATCH] file cdk: fix typing, pull out non-scalar handling (#35687) MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit ## What * Fix typing and handling of different types in `_to_output_value` - we don't always get a `Scalar`. We already handle the different cases correctly, but the typing doesn't reflect this. * Splitting out the methods to do the scalar separately is a helpful precursor to https://github.com/airbytehq/airbyte/pull/35688, as the `DictionaryArray` object doesn't have an `as_py()` method. ## 🚨 User Impact 🚨 None ## Pre-merge Actions *Expand the relevant checklist and delete the others.*
New Connector ### Community member or Airbyter - **Community member?** Grant edit access to maintainers ([instructions](https://docs.github.com/en/github/collaborating-with-pull-requests/working-with-forks/allowing-changes-to-a-pull-request-branch-created-from-a-fork#enabling-repository-maintainer-permissions-on-existing-pull-requests)) - Unit & integration tests added and passing. Community members, please provide proof of success locally e.g: screenshot or copy-paste unit, integration, and acceptance test output. To run acceptance tests for a Python connector, follow instructions in the README. For java connectors run `./gradlew :airbyte-integrations:connectors:<connector-name>:integrationTest`. - Connector version is set to `0.0.1` - `Dockerfile` has version `0.0.1` - Documentation updated - Connector's `README.md` - Connector's `bootstrap.md`. See [description and examples](https://docs.google.com/document/d/1ypdgmwmEHWv-TrO4_YOQ7pAJGVrMp5BOkEVh831N260/edit?usp=sharing) - `docs/integrations/<connector-platform>/<connector-name>.md` including changelog with an entry for the initial version. See changelog [example](https://docs.airbyte.io/integrations/sources/stripe#changelog) - `docs/integrations/README.md` ### Airbyter If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items. - Create a non-forked branch based on this PR and test the below items on it - Build is successful - If new credentials are required for use in CI, add them to GSM. [Instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci).
Updating a connector ### Community member or Airbyter - Grant edit access to maintainers ([instructions](https://docs.github.com/en/github/collaborating-with-pull-requests/working-with-forks/allowing-changes-to-a-pull-request-branch-created-from-a-fork#enabling-repository-maintainer-permissions-on-existing-pull-requests)) - Unit & integration tests added ### Airbyter If this is a community PR, the Airbyte engineer reviewing this PR is responsible for the below items. - Create a non-forked branch based on this PR and test the below items on it - Build is successful - If new credentials are required for use in CI, add them to GSM. [Instructions](https://docs.airbyte.io/connector-development#using-credentials-in-ci).
Connector Generator - Issue acceptance criteria met - PR name follows [PR naming conventions](https://docs.airbyte.com/contributing-to-airbyte/resources/pull-requests-handbook) - If adding a new generator, add it to the [list of scaffold modules being tested](https://github.com/airbytehq/airbyte/blob/master/airbyte-integrations/connector-templates/generator/build.gradle#L41) - The generator test modules (all connectors with `-scaffold` in their name) have been updated with the latest scaffold by running `./gradlew :airbyte-integrations:connector-templates:generator:generateScaffolds` then checking in your changes - Documentation which references the generator is updated as needed
Updating the Python CDK ### Airbyter Before merging: - Pull Request description explains what problem it is solving - Code change is unit tested - Build and mypy check pass - Smoke test the change on at least one affected connector - On Github: Run [this workflow](https://github.com/airbytehq/airbyte/actions/workflows/connectors_tests.yml), passing `--use-local-cdk --name=source-<connector-name>` as options - Locally: `airbyte-ci connectors --use-local-cdk --name=source-<connector-name> test` - PR is reviewed and approved After merging: - [Publish the CDK](https://github.com/airbytehq/airbyte/actions/workflows/publish-cdk-command-manually.yml) - The CDK does not follow proper semantic versioning. Choose minor if the change has significant user impact or is a breaking change. Choose patch otherwise. - Write a thoughtful changelog message so we know what was updated. - Merge the platform PR that was auto-created for updating the Connector Builder's CDK version - This step is optional if the change does not affect the connector builder or declarative connectors.
--- .../file_based/file_types/parquet_parser.py | 37 ++++++++++++++----- 1 file changed, 27 insertions(+), 10 deletions(-) diff --git a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py index 00b78c48980..b57e413c024 100644 --- a/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py +++ b/airbyte-cdk/python/airbyte_cdk/sources/file_based/file_types/parquet_parser.py @@ -5,7 +5,7 @@ import json import logging import os -from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple +from typing import Any, Dict, Iterable, List, Mapping, Optional, Tuple, Union from urllib.parse import unquote import pyarrow as pa @@ -16,7 +16,7 @@ from airbyte_cdk.sources.file_based.file_based_stream_reader import AbstractFile from airbyte_cdk.sources.file_based.file_types.file_type_parser import FileTypeParser from airbyte_cdk.sources.file_based.remote_file import RemoteFile from airbyte_cdk.sources.file_based.schema_helpers import SchemaType -from pyarrow import Scalar +from pyarrow import DictionaryArray, Scalar class ParquetParser(FileTypeParser): @@ -95,7 +95,17 @@ class ParquetParser(FileTypeParser): return FileReadMode.READ_BINARY @staticmethod - def _to_output_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any: + def _to_output_value(parquet_value: Union[Scalar, DictionaryArray], parquet_format: ParquetFormat) -> Any: + """ + Convert an entry in a pyarrow table to a value that can be output by the source. + """ + if isinstance(parquet_value, DictionaryArray): + return ParquetParser._dictionary_array_to_python_value(parquet_value) + else: + return ParquetParser._scalar_to_python_value(parquet_value, parquet_format) + + @staticmethod + def _scalar_to_python_value(parquet_value: Scalar, parquet_format: ParquetFormat) -> Any: """ Convert a pyarrow scalar to a value that can be output by the source. 
""" @@ -119,13 +129,6 @@ class ParquetParser(FileTypeParser): else: return str(parquet_value.as_py()) - # Dictionaries are stored as two columns: indices and values - # The indices column is an array of integers that maps to the values column - if pa.types.is_dictionary(parquet_value.type): - return { - "indices": parquet_value.indices.tolist(), - "values": parquet_value.dictionary.tolist(), - } if pa.types.is_map(parquet_value.type): return {k: v for k, v in parquet_value.as_py()} @@ -149,6 +152,20 @@ class ParquetParser(FileTypeParser): else: return parquet_value.as_py() + @staticmethod + def _dictionary_array_to_python_value(parquet_value: DictionaryArray) -> Dict[str, Any]: + """ + Convert a pyarrow dictionary array to a value that can be output by the source. + + Dictionaries are stored as two columns: indices and values + The indices column is an array of integers that maps to the values column + """ + + return { + "indices": parquet_value.indices.tolist(), + "values": parquet_value.dictionary.tolist(), + } + @staticmethod def parquet_type_to_schema_type(parquet_type: pa.DataType, parquet_format: ParquetFormat) -> Mapping[str, str]: """