IMPALA-12967, IMPALA-13059, IMPALA-13144, IMPALA-13195: test_migrated_table_field_id_resolution fails in exhaustive mode

When executed in exhaustive mode, multiple instances of
test_migrated_table_field_id_resolution run in parallel,
reading and writing the same files, which can lead to
various errors; hence the multiple Jira tickets in the title.

This patch builds upon rewrite-iceberg-metadata.py so that the
different test instances load their tables under different
directories (corresponding to their unique_database).

Change-Id: Id41a78940a5da5344735974e1d2c94ed4f24539a
Reviewed-on: http://gerrit.cloudera.org:8080/21882
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Author: Zoltan Borok-Nagy
Date: 2024-10-03 16:58:58 +02:00
Committed by: Impala Public Jenkins
Parent: 139c74bcf3
Commit: ae4ff47fe7
4 changed files with 174 additions and 120 deletions
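In short, the patch moves each test instance's copy of the table data from the shared warehouse directory into the test's unique_database directory. Below is a minimal sketch of that location change with made-up database and table names; the real logic lives in create_iceberg_table_from_directory() further down.

# Illustrative sketch only: database and table names are made up.
import os

unique_database = "test_field_id_resolution_a1b2c3"   # per-test database (illustrative)
table_name = "iceberg_migrated_alter_test"            # test table (illustrative)

# Before the patch, every test instance loaded the table directly under the
# shared warehouse, so parallel exhaustive runs read/wrote the same files:
old_hdfs_dir = os.path.join("/test-warehouse", table_name)

# With the patch, each instance loads its own copy under its unique_database:
new_hdfs_dir = os.path.join("/test-warehouse", unique_database, table_name)

print(old_hdfs_dir)  # /test-warehouse/iceberg_migrated_alter_test
print(new_hdfs_dir)  # /test-warehouse/test_field_id_resolution_a1b2c3/iceberg_migrated_alter_test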


@@ -117,8 +117,9 @@ if [ "${TARGET_FILESYSTEM}" != "hdfs" ]; then
  # Need to rewrite test metadata regardless of ${WAREHOUSE_LOCATION_PREFIX} because
  # paths can have "hdfs://" scheme
  echo "Updating Iceberg locations with warehouse prefix ${WAREHOUSE_LOCATION_PREFIX}"
  ${IMPALA_HOME}/testdata/bin/rewrite-iceberg-metadata.py "${WAREHOUSE_LOCATION_PREFIX}" \
    $(find ${SNAPSHOT_STAGING_DIR}${TEST_WAREHOUSE_DIR}/ -name "metadata")
  PYTHONPATH=${IMPALA_HOME} ${IMPALA_HOME}/testdata/bin/rewrite-iceberg-metadata.py \
    "${WAREHOUSE_LOCATION_PREFIX}" \
    $(find ${SNAPSHOT_STAGING_DIR}${TEST_WAREHOUSE_DIR}/ -name "metadata")
fi
echo "Copying data to ${TARGET_FILESYSTEM}"


@@ -18,14 +18,9 @@
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import map
import glob
import json
import os
import sys
from tests.util.iceberg_metadata_util import rewrite_metadata
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
args = sys.argv[1:]
if len(args) < 2:
@@ -34,103 +29,5 @@ if len(args) < 2:
prefix = args[0]

# Easier to cache it instead of trying to resolve the manifest files paths
file_size_cache = {}


def generate_new_path(prefix, file_path):
  """ Hive generates metadata with absolute paths.
      This method relativizes the path and applies a new prefix."""
  start_directory = "/test-warehouse"
  start = file_path.find(start_directory)
  if start == -1:
    raise RuntimeError("{} is not found in file path:{}".format(
        start_directory, file_path))
  return prefix + file_path[start:]


def add_prefix_to_snapshot(snapshot):
  if 'manifest-list' in snapshot:
    snapshot['manifest-list'] = generate_new_path(prefix, snapshot['manifest-list'])
  if 'manifests' in snapshot:
    snapshot['manifests'] = [generate_new_path(prefix, m) for m in snapshot['manifests']]
  return snapshot


def add_prefix_to_mlog(metadata_log):
  metadata_log['metadata-file'] = generate_new_path(prefix, metadata_log['metadata-file'])
  return metadata_log


def add_prefix_to_snapshot_entry(entry):
  if 'manifest_path' in entry:
    entry['manifest_path'] = generate_new_path(prefix, entry['manifest_path'])
  if 'data_file' in entry:
    entry['data_file']['file_path'] = generate_new_path(prefix,
        entry['data_file']['file_path'])
  return entry


def fix_manifest_length(entry):
  if 'manifest_path' in entry and 'manifest_length' in entry:
    filename = entry['manifest_path'].split('/')[-1]
    if filename in file_size_cache:
      entry['manifest_length'] = file_size_cache[filename]
  return entry


for arg in args[1:]:
  # Update metadata.json
  for mfile in glob.glob(os.path.join(arg, '*.metadata.json')):
    with open(mfile, 'r') as f:
      metadata = json.load(f)

      if 'format-version' not in metadata:
        print("WARN: skipping {}, missing format-version".format(f))
        continue

      version = metadata['format-version']
      if version < 1 or version > 2:
        print("WARN: skipping {}, unknown version {}".format(f, version))
        continue

      # metadata: required
      metadata['location'] = generate_new_path(prefix, metadata['location'])

      # snapshots: optional
      if 'snapshots' in metadata:
        metadata['snapshots'] = list(map(add_prefix_to_snapshot, metadata['snapshots']))

      # metadata-log: optional
      if 'metadata-log' in metadata:
        metadata['metadata-log'] = list(map(add_prefix_to_mlog, metadata['metadata-log']))

    with open(mfile + '.tmp', 'w') as f:
      json.dump(metadata, f, indent=2)
    os.rename(mfile + '.tmp', mfile)

  for afile in glob.glob(os.path.join(arg, '*.avro')):
    with open(afile, 'rb') as f:
      with DataFileReader(f, DatumReader()) as reader:
        schema = reader.datum_reader.writers_schema
        lines = list(map(add_prefix_to_snapshot_entry, reader))
    with open(afile + '.tmp', 'wb') as f:
      with DataFileWriter(f, DatumWriter(), schema) as writer:
        for line in lines:
          writer.append(line)
    os.rename(afile + '.tmp', afile)
    filename = afile.split('/')[-1]
    file_size_cache[filename] = os.path.getsize(afile)

  for snapfile in glob.glob(os.path.join(arg, 'snap*.avro')):
    with open(snapfile, 'rb') as f:
      with DataFileReader(f, DatumReader()) as reader:
        schema = reader.datum_reader.writers_schema
        lines = list(map(fix_manifest_length, reader))
    with open(snapfile + '.tmp', 'wb') as f:
      with DataFileWriter(f, DatumWriter(), schema) as writer:
        for line in lines:
          writer.append(line)
    os.rename(snapfile + '.tmp', snapfile)

for metadata_dir in args[1:]:
  rewrite_metadata(prefix, None, metadata_dir)


@@ -27,6 +27,7 @@ import tempfile
from subprocess import check_call
from tests.util.filesystem_utils import get_fs_path, WAREHOUSE_PREFIX
from tests.util.iceberg_metadata_util import rewrite_metadata
def create_iceberg_table_from_directory(impala_client, unique_database, table_name,
@@ -38,28 +39,29 @@ def create_iceberg_table_from_directory(impala_client, unique_database, table_na
  assert file_format == "orc" or file_format == "parquet"
  local_dir = os.path.join(
      os.environ['IMPALA_HOME'], 'testdata/data/iceberg_test/{0}'.format(table_name))
      os.environ['IMPALA_HOME'], 'testdata', 'data', 'iceberg_test', table_name)
  assert os.path.isdir(local_dir)

  # If using a prefix, rewrite iceberg metadata to use the prefix
  if WAREHOUSE_PREFIX:
    tmp_dir = tempfile.mktemp(table_name)
    check_call(['cp', '-r', local_dir, tmp_dir])
    rewrite = os.path.join(
        os.environ['IMPALA_HOME'], 'testdata/bin/rewrite-iceberg-metadata.py')
    check_call([rewrite, WAREHOUSE_PREFIX, os.path.join(tmp_dir, 'metadata')])
    local_dir = tmp_dir
  # Rewrite iceberg metadata to use the warehouse prefix and use unique_database
  tmp_dir = tempfile.mktemp(table_name)
  # Need to create the temp dir so 'cp -r' will copy local dir with its original name
  # under the temp dir. rewrite_metadata() has the assumption that the parent directory
  # of the 'metadata' directory bears the name of the table.
  check_call(['mkdir', '-p', tmp_dir])
  check_call(['cp', '-r', local_dir, tmp_dir])
  local_dir = os.path.join(tmp_dir, table_name)
  rewrite_metadata(WAREHOUSE_PREFIX, unique_database, os.path.join(local_dir, 'metadata'))

  # Put the directory in the database's directory (not the table directory)
  hdfs_parent_dir = get_fs_path("/test-warehouse")
  hdfs_parent_dir = os.path.join(get_fs_path("/test-warehouse"), unique_database)
  hdfs_dir = os.path.join(hdfs_parent_dir, table_name)

  # Purge existing files if any
  check_call(['hdfs', 'dfs', '-rm', '-f', '-r', hdfs_dir])
  # Note: -d skips a staging copy
  check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_dir])
  check_call(['hdfs', 'dfs', '-mkdir', '-p', hdfs_parent_dir])
  check_call(['hdfs', 'dfs', '-put', '-d', local_dir, hdfs_parent_dir])

  # Create external table
  qualified_table_name = '{0}.{1}'.format(unique_database, table_name)
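
For context, a minimal usage sketch of how a test might call the updated helper; the test function, client handle, and table name below are illustrative and not part of this change.

# Hypothetical caller: 'unique_database' is the per-test database fixture and
# 'self.client' an Impala client handle; the table name is illustrative.
def test_migrated_table_example(self, unique_database):
  # Loads testdata/data/iceberg_test/<table_name> into
  # /test-warehouse/<unique_database>/<table_name> and creates the external table.
  create_iceberg_table_from_directory(
      self.client, unique_database, "iceberg_migrated_alter_test", "parquet")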


@@ -0,0 +1,154 @@
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.
from __future__ import absolute_import, division, print_function
from builtins import map
from functools import partial
import glob
import json
import os
from avro.datafile import DataFileReader, DataFileWriter
from avro.io import DatumReader, DatumWriter
def rewrite_metadata(prefix, unique_database, metadata_dir):
  # Update metadata.json
  for mfile in glob.glob(os.path.join(metadata_dir, '*.metadata.json')):
    with open(mfile, 'r') as f:
      metadata = json.load(f)

      if 'format-version' not in metadata:
        print("WARN: skipping {}, missing format-version".format(f))
        continue

      version = metadata['format-version']
      if version < 1 or version > 2:
        print("WARN: skipping {}, unknown version {}".format(f, version))
        continue

      metadata_parent_dir = os.path.split(metadata_dir.rstrip("/"))[0]
      table_name = os.path.basename(metadata_parent_dir)
      table_params = (prefix, unique_database, table_name)

      # metadata: required
      metadata['location'] = generate_new_path(
          table_params, metadata['location'])

      # snapshots: optional
      if 'snapshots' in metadata:
        metadata['snapshots'] = \
            list(map(partial(add_prefix_to_snapshot, table_params), metadata['snapshots']))

      # metadata-log: optional
      if 'metadata-log' in metadata:
        metadata['metadata-log'] = \
            list(map(partial(add_prefix_to_mlog, table_params), metadata['metadata-log']))

    with open(mfile + '.tmp', 'w') as f:
      json.dump(metadata, f, indent=2)
    os.rename(mfile + '.tmp', mfile)

  # Easier to cache it instead of trying to resolve the manifest files paths
  file_size_cache = {}
  for afile in glob.glob(os.path.join(metadata_dir, '*.avro')):
    with open(afile, 'rb') as f:
      with DataFileReader(f, DatumReader()) as reader:
        schema = reader.datum_reader.writers_schema
        lines = list(map(
            partial(add_prefix_to_snapshot_entry, table_params),
            reader))
    with open(afile + '.tmp', 'wb') as f:
      with DataFileWriter(f, DatumWriter(), schema) as writer:
        for line in lines:
          writer.append(line)
    os.rename(afile + '.tmp', afile)
    filename = afile.split('/')[-1]
    file_size_cache[filename] = os.path.getsize(afile)

  for snapfile in glob.glob(os.path.join(metadata_dir, 'snap*.avro')):
    with open(snapfile, 'rb') as f:
      with DataFileReader(f, DatumReader()) as reader:
        schema = reader.datum_reader.writers_schema
        lines = list(map(partial(fix_manifest_length, file_size_cache), reader))
    with open(snapfile + '.tmp', 'wb') as f:
      with DataFileWriter(f, DatumWriter(), schema) as writer:
        for line in lines:
          writer.append(line)
    os.rename(snapfile + '.tmp', snapfile)


def generate_new_path(table_params, file_path):
  """ Hive generates metadata with absolute paths.
      This method relativizes the path and applies a new prefix."""
  prefix, unique_database, table_name = table_params
  start_directory = "/test-warehouse"
  start = file_path.find(start_directory)
  if start == -1:
    raise RuntimeError("{} is not found in file path:{}".format(
        start_directory, file_path))
  result = file_path[start:]
  if prefix:
    result = prefix + result
  if not unique_database:
    return result

  def replace_last(s, old_expr, new_expr):
    maxsplit = 1
    li = s.rsplit(old_expr, maxsplit)
    assert len(li) == 2
    return new_expr.join(li)

  return replace_last(result, table_name, "{}/{}".format(unique_database, table_name))


def add_prefix_to_snapshot(table_params, snapshot):
  if 'manifest-list' in snapshot:
    snapshot['manifest-list'] = generate_new_path(table_params, snapshot['manifest-list'])
  if 'manifests' in snapshot:
    snapshot['manifests'] = [
        generate_new_path(table_params, m) for m in snapshot['manifests']]
  return snapshot


def add_prefix_to_mlog(table_params, metadata_log):
  metadata_log['metadata-file'] = generate_new_path(
      table_params, metadata_log['metadata-file'])
  return metadata_log


def add_prefix_to_snapshot_entry(table_params, entry):
  if 'manifest_path' in entry:
    entry['manifest_path'] = generate_new_path(table_params, entry['manifest_path'])
  if 'data_file' in entry:
    entry['data_file']['file_path'] = generate_new_path(
        table_params, entry['data_file']['file_path'])
  return entry


def fix_manifest_length(file_size_cache, entry):
  if 'manifest_path' in entry and 'manifest_length' in entry:
    filename = entry['manifest_path'].split('/')[-1]
    if filename in file_size_cache:
      entry['manifest_length'] = file_size_cache[filename]
  return entry
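
To see what the new module actually does to a metadata path, here is a small sketch that calls generate_new_path() from this module; the prefix, database, table, and path values are made up.

# Made-up inputs: (prefix, unique_database, table_name)
table_params = ("s3a://impala-test-bucket",
                "test_field_id_resolution_a1b2c3",
                "iceberg_migrated_alter_test")
old_path = ("hdfs://localhost:20500/test-warehouse/"
            "iceberg_migrated_alter_test/metadata/snap-1.avro")
# The path is relativized at "/test-warehouse", the prefix is prepended, and the
# table directory is nested under the unique_database directory:
print(generate_new_path(table_params, old_path))
# s3a://impala-test-bucket/test-warehouse/test_field_id_resolution_a1b2c3/iceberg_migrated_alter_test/metadata/snap-1.avro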