From ae4ff47fe7547863df06e5e224b87065a4eaf110 Mon Sep 17 00:00:00 2001
From: Zoltan Borok-Nagy
Date: Thu, 3 Oct 2024 16:58:58 +0200
Subject: [PATCH] IMPALA-12967, IMPALA-13059, IMPALA-13144, IMPALA-13195:
 test_migrated_table_field_id_resolution fails in exhaustive mode

When executed in exhaustive mode, multiple instances of
test_migrated_table_field_id_resolution are running in parallel, reading
and writing the same files, which can lead to various errors, hence the
multiple Jira tickets in the title.

Building upon rewrite-iceberg-metadata.py, this patch makes the different
test instances load the tables under different directories (corresponding
to the unique_database).

Change-Id: Id41a78940a5da5344735974e1d2c94ed4f24539a
Reviewed-on: http://gerrit.cloudera.org:8080/21882
Reviewed-by: Impala Public Jenkins
Tested-by: Impala Public Jenkins
---
 testdata/bin/load-test-warehouse-snapshot.sh |   5 +-
 testdata/bin/rewrite-iceberg-metadata.py     | 109 +------------
 tests/common/file_utils.py                   |  26 ++--
 tests/util/iceberg_metadata_util.py          | 154 +++++++++++++++++++
 4 files changed, 174 insertions(+), 120 deletions(-)
 create mode 100644 tests/util/iceberg_metadata_util.py

diff --git a/testdata/bin/load-test-warehouse-snapshot.sh b/testdata/bin/load-test-warehouse-snapshot.sh
index ff73d11eb..480eaa4b7 100755
--- a/testdata/bin/load-test-warehouse-snapshot.sh
+++ b/testdata/bin/load-test-warehouse-snapshot.sh
@@ -117,8 +117,9 @@ if [ "${TARGET_FILESYSTEM}" != "hdfs" ]; then
   # Need to rewrite test metadata regardless of ${WAREHOUSE_LOCATION_PREFIX} because
   # paths can have "hdfs://" scheme
   echo "Updating Iceberg locations with warehouse prefix ${WAREHOUSE_LOCATION_PREFIX}"
-  ${IMPALA_HOME}/testdata/bin/rewrite-iceberg-metadata.py "${WAREHOUSE_LOCATION_PREFIX}" \
-    $(find ${SNAPSHOT_STAGING_DIR}${TEST_WAREHOUSE_DIR}/ -name "metadata")
+  PYTHONPATH=${IMPALA_HOME} ${IMPALA_HOME}/testdata/bin/rewrite-iceberg-metadata.py \
+    "${WAREHOUSE_LOCATION_PREFIX}" \
+    $(find ${SNAPSHOT_STAGING_DIR}${TEST_WAREHOUSE_DIR}/ -name "metadata")
 fi
 
 echo "Copying data to ${TARGET_FILESYSTEM}"
diff --git a/testdata/bin/rewrite-iceberg-metadata.py b/testdata/bin/rewrite-iceberg-metadata.py
index d0c4d40dc..a435b5993 100755
--- a/testdata/bin/rewrite-iceberg-metadata.py
+++ b/testdata/bin/rewrite-iceberg-metadata.py
@@ -18,14 +18,9 @@
 # under the License.
 
 from __future__ import absolute_import, division, print_function
-from builtins import map
-import glob
-import json
-import os
 import sys
+from tests.util.iceberg_metadata_util import rewrite_metadata
 
-from avro.datafile import DataFileReader, DataFileWriter
-from avro.io import DatumReader, DatumWriter
 
 args = sys.argv[1:]
 if len(args) < 2:
@@ -34,103 +29,5 @@ if len(args) < 2:
 
 prefix = args[0]
 
-# Easier to cache it instead of trying to resolve the manifest files paths
-file_size_cache = {}
-
-
-def generate_new_path(prefix, file_path):
-  """ Hive generates metadata with absolute paths.
-  This method relativizes the path and applies a new prefix."""
-  start_directory = "/test-warehouse"
-  start = file_path.find(start_directory)
-  if start == -1:
-    raise RuntimeError("{} is not found in file path:{}".format(
-      start_directory, file_path))
-  return prefix + file_path[start:]
-
-
-def add_prefix_to_snapshot(snapshot):
-  if 'manifest-list' in snapshot:
-    snapshot['manifest-list'] = generate_new_path(prefix, snapshot['manifest-list'])
-  if 'manifests' in snapshot:
-    snapshot['manifests'] = [generate_new_path(prefix, m) for m in snapshot['manifests']]
-  return snapshot
-
-
-def add_prefix_to_mlog(metadata_log):
-  metadata_log['metadata-file'] = generate_new_path(prefix, metadata_log['metadata-file'])
-  return metadata_log
-
-
-def add_prefix_to_snapshot_entry(entry):
-  if 'manifest_path' in entry:
-    entry['manifest_path'] = generate_new_path(prefix, entry['manifest_path'])
-  if 'data_file' in entry:
-    entry['data_file']['file_path'] = generate_new_path(prefix,
-      entry['data_file']['file_path'])
-  return entry
-
-
-def fix_manifest_length(entry):
-  if 'manifest_path' in entry and 'manifest_length' in entry:
-    filename = entry['manifest_path'].split('/')[-1]
-    if filename in file_size_cache:
-      entry['manifest_length'] = file_size_cache[filename]
-  return entry
-
-
-for arg in args[1:]:
-  # Update metadata.json
-  for mfile in glob.glob(os.path.join(arg, '*.metadata.json')):
-    with open(mfile, 'r') as f:
-      metadata = json.load(f)
-
-    if 'format-version' not in metadata:
-      print("WARN: skipping {}, missing format-version".format(f))
-      continue
-
-    version = metadata['format-version']
-    if version < 1 or version > 2:
-      print("WARN: skipping {}, unknown version {}".format(f, version))
-      continue
-
-    # metadata: required
-    metadata['location'] = generate_new_path(prefix, metadata['location'])
-
-    # snapshots: optional
-    if 'snapshots' in metadata:
-      metadata['snapshots'] = list(map(add_prefix_to_snapshot, metadata['snapshots']))
-
-    # metadata-log: optional
-    if 'metadata-log' in metadata:
-      metadata['metadata-log'] = list(map(add_prefix_to_mlog, metadata['metadata-log']))
-
-    with open(mfile + '.tmp', 'w') as f:
-      json.dump(metadata, f, indent=2)
-    os.rename(mfile + '.tmp', mfile)
-
-  for afile in glob.glob(os.path.join(arg, '*.avro')):
-    with open(afile, 'rb') as f:
-      with DataFileReader(f, DatumReader()) as reader:
-        schema = reader.datum_reader.writers_schema
-        lines = list(map(add_prefix_to_snapshot_entry, reader))
-
-    with open(afile + '.tmp', 'wb') as f:
-      with DataFileWriter(f, DatumWriter(), schema) as writer:
-        for line in lines:
-          writer.append(line)
-    os.rename(afile + '.tmp', afile)
-    filename = afile.split('/')[-1]
-    file_size_cache[filename] = os.path.getsize(afile)
-
-  for snapfile in glob.glob(os.path.join(arg, 'snap*.avro')):
-    with open(snapfile, 'rb') as f:
-      with DataFileReader(f, DatumReader()) as reader:
-        schema = reader.datum_reader.writers_schema
-        lines = list(map(fix_manifest_length, reader))
-
-    with open(snapfile + '.tmp', 'wb') as f:
-      with DataFileWriter(f, DatumWriter(), schema) as writer:
-        for line in lines:
-          writer.append(line)
-    os.rename(snapfile + '.tmp', snapfile)
+for metadata_dir in args[1:]:
+  rewrite_metadata(prefix, None, metadata_dir)
diff --git a/tests/common/file_utils.py b/tests/common/file_utils.py
index f3c4e478f..66431dd16 100644
--- a/tests/common/file_utils.py
+++ b/tests/common/file_utils.py
@@ -27,6 +27,7 @@ import tempfile
 
 from subprocess import check_call
 from tests.util.filesystem_utils import get_fs_path, WAREHOUSE_PREFIX
+from tests.util.iceberg_metadata_util import rewrite_metadata
 
 
 def create_iceberg_table_from_directory(impala_client, unique_database, table_name,
@@ -38,28 +39,29 @@ def create_iceberg_table_from_directory(impala_client, unique_database, table_na
   assert file_format == "orc" or file_format == "parquet"
 
   local_dir = os.path.join(
-    os.environ['IMPALA_HOME'], 'testdata/data/iceberg_test/{0}'.format(table_name))
+    os.environ['IMPALA_HOME'], 'testdata', 'data', 'iceberg_test', table_name)
   assert os.path.isdir(local_dir)
 
-  # If using a prefix, rewrite iceberg metadata to use the prefix
-  if WAREHOUSE_PREFIX:
-    tmp_dir = tempfile.mktemp(table_name)
-    check_call(['cp', '-r', local_dir, tmp_dir])
-    rewrite = os.path.join(
-      os.environ['IMPALA_HOME'], 'testdata/bin/rewrite-iceberg-metadata.py')
-    check_call([rewrite, WAREHOUSE_PREFIX, os.path.join(tmp_dir, 'metadata')])
-    local_dir = tmp_dir
+  # Rewrite iceberg metadata to use the warehouse prefix and use unique_database
+  tmp_dir = tempfile.mktemp(table_name)
+  # Need to create the temp dir so 'cp -r' will copy local dir with its original name
+  # under the temp dir. rewrite_metadata() has the assumption that the parent directory
+  # of the 'metadata' directory bears the name of the table.
+  check_call(['mkdir', '-p', tmp_dir])
+  check_call(['cp', '-r', local_dir, tmp_dir])
+  local_dir = os.path.join(tmp_dir, table_name)
+  rewrite_metadata(WAREHOUSE_PREFIX, unique_database, os.path.join(local_dir, 'metadata'))
 
   # Put the directory in the database's directory (not the table directory)
-  hdfs_parent_dir = get_fs_path("/test-warehouse")
-
+  hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), unique_database)
   hdfs_dir = os.path.join(hdfs_parent_dir, table_name)
 
   # Purge existing files if any
   check_call(['hdfs', 'dfs', '-rm', '-f', '-r', hdfs_dir])
 
   # Note: -d skips a staging copy
-  check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_dir])
+  check_call(['hdfs', 'dfs', '-mkdir', '-p', hdfs_parent_dir])
+  check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_parent_dir])
 
   # Create external table
   qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
diff --git a/tests/util/iceberg_metadata_util.py b/tests/util/iceberg_metadata_util.py
new file mode 100644
index 000000000..f9935b227
--- /dev/null
+++ b/tests/util/iceberg_metadata_util.py
@@ -0,0 +1,154 @@
+# Licensed to the Apache Software Foundation (ASF) under one
+# or more contributor license agreements. See the NOTICE file
+# distributed with this work for additional information
+# regarding copyright ownership. The ASF licenses this file
+# to you under the Apache License, Version 2.0 (the
+# "License"); you may not use this file except in compliance
+# with the License. You may obtain a copy of the License at
+#
+#   http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing,
+# software distributed under the License is distributed on an
+# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+# KIND, either express or implied. See the License for the
+# specific language governing permissions and limitations
+# under the License.
+
+from __future__ import absolute_import, division, print_function
+from builtins import map
+from functools import partial
+import glob
+import json
+import os
+
+from avro.datafile import DataFileReader, DataFileWriter
+from avro.io import DatumReader, DatumWriter
+
+
+def rewrite_metadata(prefix, unique_database, metadata_dir):
+  # Update metadata.json
+  for mfile in glob.glob(os.path.join(metadata_dir, '*.metadata.json')):
+    with open(mfile, 'r') as f:
+      metadata = json.load(f)
+
+    if 'format-version' not in metadata:
+      print("WARN: skipping {}, missing format-version".format(f))
+      continue
+
+    version = metadata['format-version']
+    if version < 1 or version > 2:
+      print("WARN: skipping {}, unknown version {}".format(f, version))
+      continue
+
+    metadata_parent_dir = os.path.split(metadata_dir.rstrip("/"))[0]
+    table_name = os.path.basename(metadata_parent_dir)
+
+    table_params = (prefix, unique_database, table_name)
+
+    # metadata: required
+    metadata['location'] = generate_new_path(
+      table_params, metadata['location'])
+
+    # snapshots: optional
+    if 'snapshots' in metadata:
+      metadata['snapshots'] = \
+        list(map(partial(add_prefix_to_snapshot, table_params), metadata['snapshots']))
+
+    # metadata-log: optional
+    if 'metadata-log' in metadata:
+      metadata['metadata-log'] = \
+        list(map(partial(add_prefix_to_mlog, table_params), metadata['metadata-log']))
+
+    with open(mfile + '.tmp', 'w') as f:
+      json.dump(metadata, f, indent=2)
+    os.rename(mfile + '.tmp', mfile)
+
+  # Easier to cache it instead of trying to resolve the manifest files paths
+  file_size_cache = {}
+
+  for afile in glob.glob(os.path.join(metadata_dir, '*.avro')):
+    with open(afile, 'rb') as f:
+      with DataFileReader(f, DatumReader()) as reader:
+        schema = reader.datum_reader.writers_schema
+        lines = list(map(
+          partial(add_prefix_to_snapshot_entry, table_params),
+          reader))
+
+    with open(afile + '.tmp', 'wb') as f:
+      with DataFileWriter(f, DatumWriter(), schema) as writer:
+        for line in lines:
+          writer.append(line)
+    os.rename(afile + '.tmp', afile)
+    filename = afile.split('/')[-1]
+    file_size_cache[filename] = os.path.getsize(afile)
+
+  for snapfile in glob.glob(os.path.join(metadata_dir, 'snap*.avro')):
+    with open(snapfile, 'rb') as f:
+      with DataFileReader(f, DatumReader()) as reader:
+        schema = reader.datum_reader.writers_schema
+        lines = list(map(partial(fix_manifest_length, file_size_cache), reader))
+
+    with open(snapfile + '.tmp', 'wb') as f:
+      with DataFileWriter(f, DatumWriter(), schema) as writer:
+        for line in lines:
+          writer.append(line)
+    os.rename(snapfile + '.tmp', snapfile)
+
+
+def generate_new_path(table_params, file_path):
+  """ Hive generates metadata with absolute paths.
+  This method relativizes the path and applies a new prefix."""
+  prefix, unique_database, table_name = table_params
+  start_directory = "/test-warehouse"
+  start = file_path.find(start_directory)
+  if start == -1:
+    raise RuntimeError("{} is not found in file path:{}".format(
+      start_directory, file_path))
+
+  result = file_path[start:]
+  if prefix:
+    result = prefix + result
+
+  if not unique_database:
+    return result
+
+  def replace_last(s, old_expr, new_expr):
+    maxsplit = 1
+    li = s.rsplit(old_expr, maxsplit)
+    assert len(li) == 2
+    return new_expr.join(li)
+
+  return replace_last(result, table_name, "{}/{}".format(unique_database, table_name))
+
+
+def add_prefix_to_snapshot(table_params, snapshot):
+  if 'manifest-list' in snapshot:
+    snapshot['manifest-list'] = generate_new_path(table_params, snapshot['manifest-list'])
+  if 'manifests' in snapshot:
+    snapshot['manifests'] = [
+      generate_new_path(table_params, m) for m in snapshot['manifests']]
+  return snapshot
+
+
+def add_prefix_to_mlog(table_params, metadata_log):
+  metadata_log['metadata-file'] = generate_new_path(
+    table_params, metadata_log['metadata-file'])
+  return metadata_log
+
+
+def add_prefix_to_snapshot_entry(table_params, entry):
+  if 'manifest_path' in entry:
+    entry['manifest_path'] = generate_new_path(table_params, entry['manifest_path'])
+  if 'data_file' in entry:
+    entry['data_file']['file_path'] = generate_new_path(
+      table_params, entry['data_file']['file_path'])
+  return entry
+
+
+def fix_manifest_length(file_size_cache, entry):
+  if 'manifest_path' in entry and 'manifest_length' in entry:
+    filename = entry['manifest_path'].split('/')[-1]
+    if filename in file_size_cache:
+      entry['manifest_length'] = file_size_cache[filename]
+  return entry
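For illustration, the short standalone sketch below mirrors the per-table path rewrite that generate_new_path() in tests/util/iceberg_metadata_util.py performs: relativize the absolute location at "/test-warehouse", re-apply the optional warehouse prefix, and move the table directory under the test's unique_database. The prefix, database, and table names used here are hypothetical example values, not values taken from this patch.

# Sketch only: mirrors the behaviour of generate_new_path(); all example values are hypothetical.
def sketch_new_path(prefix, unique_database, table_name, file_path):
  start = file_path.find("/test-warehouse")
  assert start != -1, "path does not contain /test-warehouse: " + file_path
  # Relativize at /test-warehouse and re-apply the filesystem prefix, if any.
  result = file_path[start:]
  if prefix:
    result = prefix + result
  if not unique_database:
    return result
  # Replace the last occurrence of the table name with "<unique_database>/<table_name>".
  head, _, tail = result.rpartition(table_name)
  return head + "{}/{}".format(unique_database, table_name) + tail


print(sketch_new_path(
  "s3a://impala-test-bucket",     # hypothetical WAREHOUSE_PREFIX
  "test_db_1234",                 # hypothetical unique_database
  "iceberg_migrated_alter_test",  # hypothetical table directory name
  "hdfs://nn:8020/test-warehouse/iceberg_migrated_alter_test/metadata/v3.metadata.json"))
# prints: s3a://impala-test-bucket/test-warehouse/test_db_1234/iceberg_migrated_alter_test/metadata/v3.metadata.json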