mirror of https://github.com/apache/impala.git (synced 2025-12-19 18:12:08 -05:00)
This patch modifies the string overload of IcebergFunctions::TruncatePartitionTransform so that it always treats strings as UTF-8 encoded, because the Iceberg specification states that strings are UTF-8 encoded. Also, for an Iceberg table, UrlEncode is now called not in the Hive-compatible way but in the standard way, similar to Java's URLEncoder.encode() (which the Iceberg API also uses), to conform with existing practice in Hive, Spark and Trino. This included changing the set of characters that are not escaped to follow the URL Standard's application/x-www-form-urlencoded format [1]. The helper was also renamed from ShouldNotEscape to IsUrlSafe for better readability.

Testing:
* add and extend e2e tests to check partitions with Unicode characters
* add be tests to coding-util-test.cc

[1]: https://url.spec.whatwg.org/#application-x-www-form-urlencoded-percent-encode-set

Change-Id: Iabb39727f6dd49b76c918bcd6b3ec62532555755
Reviewed-on: http://gerrit.cloudera.org:8080/23190
Reviewed-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
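To illustrate the encoding change described above: under the application/x-www-form-urlencoded percent-encode set [1], ASCII alphanumerics and '*', '-', '.', '_' are left as-is and every other byte of the UTF-8 encoding is percent-encoded, whereas the Hive-compatible scheme escapes a different character set (for example it writes '*' as %2A, as the non-Iceberg test below expects). The following Python sketch is only an illustration; the helper names are hypothetical, it is not Impala's actual UrlEncode/IsUrlSafe implementation, and space handling ('+' vs '%20') is not covered.

def _is_url_safe(byte_value):
  # Hypothetical sketch: keep ASCII alphanumerics and '*', '-', '.', '_' verbatim.
  char = chr(byte_value)
  return (byte_value < 128 and char.isalnum()) or char in "*-._"

def _form_urlencode(value):
  # Hypothetical sketch: percent-encode every other byte of the UTF-8 encoding.
  out = []
  for b in value.encode("utf-8"):
    out.append(chr(b) if _is_url_safe(b) else "%{:02X}".format(b))
  return "".join(out)

# Example: _form_urlencode(u"a*b/c\u00e9") == "a*b%2Fc%C3%A9"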
632 lines
31 KiB
Python
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

# Targeted Impala insert tests

from __future__ import absolute_import, division, print_function
import os
import pytest
import re

from testdata.common import widetable
from tests.common.impala_cluster import ImpalaCluster
from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import (SkipIfFS, SkipIfLocal, SkipIfHive2,
    SkipIfNotHdfsMinicluster)
from tests.common.test_dimensions import (
    add_exec_option_dimension,
    create_exec_option_dimension,
    create_uncompressed_text_dimension,
    create_single_exec_option_dimension,
    is_supported_insert_format)
from tests.common.test_result_verifier import (
    QueryTestResult,
    parse_result_rows)
from tests.common.test_vector import HS2
from tests.verifiers.metric_verifier import MetricVerifier


PARQUET_CODECS = ['none', 'snappy', 'gzip', 'zstd', 'lz4']


class TestInsertBase(ImpalaTestSuite):
  """All tests based on this class should run with 'unique_database' fixture."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertBase, cls).add_test_dimensions()
    if cls.exploration_strategy() == 'core':
      cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
          cluster_sizes=[0], disable_codegen_options=[True, False], batch_sizes=[0],
          sync_ddl=[0]))
      cls.ImpalaTestMatrix.add_dimension(
          create_uncompressed_text_dimension(cls.get_workload()))
    else:
      cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
          cluster_sizes=[0], disable_codegen_options=[True, False],
          batch_sizes=[0, 1, 16], sync_ddl=[0, 1]))
      add_exec_option_dimension(cls, "compression_codec", PARQUET_CODECS)
      # Insert is currently only supported for text and parquet
      # For parquet, we want to iterate through all the compression codecs
      # TODO: each column in parquet can have a different codec. We could
      # test all the codecs in one table/file with some additional flags.
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').file_format == 'parquet'
          or (v.get_value('table_format').file_format == 'text'
              and v.get_value('compression_codec') == 'none'))
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('table_format').compression_codec == 'none')
      # Only test other batch sizes for uncompressed parquet to keep the execution time
      # within reasonable bounds.
      cls.ImpalaTestMatrix.add_constraint(lambda v:
          v.get_value('exec_option')['batch_size'] == 0
          or (v.get_value('table_format').file_format == 'parquet'
              and v.get_value('compression_codec') == 'none'))

  @classmethod
  def is_sync_ddl(cls, vector):
    return vector.get_value('exec_option')['sync_ddl'] == 1


class TestInsertQueries(TestInsertBase):
  @pytest.mark.execute_serially
  def test_insert_large_string(self, vector, unique_database):
    """Test handling of large strings in inserter and scanner."""
    if "-Xcheck:jni" in os.environ.get("LIBHDFS_OPTS", ""):
      pytest.skip("Test unreasonably slow with JNI checking.")
    table_name = unique_database + ".insert_largestring"

    self.client.set_configuration_option("mem_limit", "4gb")
    self.client.set_configuration_option("max_row_size", "257mb")
    file_format = vector.get_value('table_format').file_format
    if file_format == "parquet":
      stored_as = file_format
    else:
      assert file_format == "text"
      stored_as = "textfile"
    self.client.execute("""
        create table {0}
        stored as {1} as
        select repeat('AZ', 128 * 1024 * 1024) as s""".format(table_name, stored_as))
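    # The CTAS above writes a single 256 MiB string (2 * 128 * 1024 * 1024 bytes),
    # which is why max_row_size was raised to 257mb above.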
    # Make sure it produces correct result when materializing no tuples.
    result = self.client.execute("select count(*) from {0}".format(table_name))
    assert result.data == ["1"]

    # Make sure it got the length right.
    result = self.client.execute("select length(s) from {0}".format(table_name))
    assert result.data == [str(2 * 128 * 1024 * 1024)]

    # Spot-check the data.
    result = self.client.execute(
        "select substr(s, 200 * 1024 * 1024, 5) from {0}".format(table_name))
    assert result.data == ["ZAZAZ"]

    # IMPALA-7648: test that we gracefully fail when there is not enough memory
    # to fit the scanned string in memory.
    # IMPALA-9856: Disable result spooling for this query since it is intended to test
    # for OOM.
    self.client.set_configuration_option("spool_query_results", "0")
    self.client.set_configuration_option("mem_limit", "50M")
    try:
      self.client.execute("select s from {0}".format(table_name))
      assert False, "Expected query to fail"
    except Exception as e:
      assert "Memory limit exceeded" in str(e)

  @UniqueDatabase.parametrize(sync_ddl=True)
  # ABFS partition names cannot end in periods
  @SkipIfFS.file_or_folder_name_ends_with_period
  def test_insert(self, vector, unique_database):
    self.run_test_case('QueryTest/insert', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector),
        test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
            .get_db_name_from_format(vector.get_value('table_format'))})

  @UniqueDatabase.parametrize(sync_ddl=True)
  @pytest.mark.execute_serially
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_insert_mem_limit(self, vector, unique_database):
    self.run_test_case('QueryTest/insert-mem-limit', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector),
        test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
            .get_db_name_from_format(vector.get_value('table_format'))})
    # IMPALA-7023: These queries can linger and use up memory, causing subsequent
    # tests to hit memory limits. Wait for some time to allow the query to
    # be reclaimed.
    verifiers = [MetricVerifier(i.service)
                 for i in ImpalaCluster.get_e2e_test_cluster().impalads]
    for v in verifiers:
      v.wait_for_metric("impala-server.num-fragments-in-flight", 0, timeout=180)

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_insert_overwrite(self, vector, unique_database):
    self.run_test_case('QueryTest/insert_overwrite', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector),
        test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
            .get_db_name_from_format(vector.get_value('table_format'))})

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_insert_bad_expr(self, vector, unique_database):
    # The test currently relies on codegen being disabled to trigger an error in
    # the output expression of the table sink.
    if vector.get_value('exec_option')['disable_codegen']:
      self.run_test_case('QueryTest/insert_bad_expr', vector, unique_database,
          multiple_impalad=self.is_sync_ddl(vector),
          test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
              .get_db_name_from_format(vector.get_value('table_format'))})


class TestInsertQueriesWithDefaultFormat(TestInsertBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertQueriesWithDefaultFormat, cls).add_test_dimensions()
    # Declare 'default_file_format' option and match it against table_format.
    # This is needed because CREATE queries inside .test files are often neither CTAS
    # nor CREATE TABLE LIKE, and do not have a STORED AS clause.
    add_exec_option_dimension(cls, "default_file_format", ['text', 'parquet'])
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == v.get_value('default_file_format'))

  @SkipIfHive2.acid
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_acid_insert(self, vector, unique_database):
    self.run_test_case('QueryTest/acid-insert', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector))

  @SkipIfHive2.acid
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_acid_nonacid_insert(self, vector, unique_database):
    self.run_test_case('QueryTest/acid-nonacid-insert', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector))

  @SkipIfHive2.acid
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_acid_insert_fail(self, vector, unique_database):
    self.run_test_case('QueryTest/acid-insert-fail', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_insert_random_partition(self, vector, unique_database):
    """Regression test for IMPALA-402: partitioning by rand() leads to strange behaviour
    or crashes."""
    self.run_test_case('QueryTest/insert-random-partition', vector, unique_database,
        multiple_impalad=self.is_sync_ddl(vector))


class TestInsertWideTable(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertWideTable, cls).add_test_dimensions()

    # Only vary codegen
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        cluster_sizes=[0], disable_codegen_options=[True, False], batch_sizes=[0]))

    # Inserts only supported on text and parquet
    # TODO: Enable 'text'/codec once the compressed text writers are in.
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'parquet'
        or v.get_value('table_format').file_format == 'text')
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').compression_codec == 'none')

    # Don't run on core. This test is very slow (IMPALA-864) and we are unlikely to
    # regress here.
    if cls.exploration_strategy() != 'exhaustive':
      pytest.skip("Test only run in exhaustive exploration.")

  @SkipIfLocal.parquet_file_size
  def test_insert_wide_table(self, vector, unique_database):
    table_format = vector.get_value('table_format')

    # Text can't handle as many columns as Parquet (codegen takes forever)
    num_cols = 1000 if table_format.file_format == 'text' else 2000

    table_name = unique_database + ".insert_widetable"
    if vector.get_value('exec_option')['disable_codegen']:
      table_name += "_codegen_disabled"

    col_descs = widetable.get_columns(num_cols)
    create_stmt = "CREATE TABLE " + table_name + "(" + ','.join(col_descs) + ")"
    if vector.get_value('table_format').file_format == 'parquet':
      create_stmt += " stored as parquet"
    self.client.execute(create_stmt)

    # Get a single row of data
    col_vals = widetable.get_data(num_cols, 1, quote_strings=True)[0]
    insert_stmt = "INSERT INTO " + table_name + " VALUES(" + col_vals + ")"
    self.client.execute(insert_stmt)

    result = self.client.execute("select count(*) from " + table_name)
    assert result.data == ["1"]

    result = self.client.execute("select * from " + table_name)
    types = result.column_types
    labels = result.column_labels
    expected = QueryTestResult([col_vals], types, labels, order_matters=False)
    actual = QueryTestResult(
        parse_result_rows(result), types, labels, order_matters=False)
    assert expected == actual


class TestInsertPartKey(ImpalaTestSuite):
  """Regression test for IMPALA-875"""
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertPartKey, cls).add_test_dimensions()
    # Only run for a single table type
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0],
        sync_ddl=[1]))

    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'text'))
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').compression_codec == 'none')

  @pytest.mark.execute_serially
  def test_insert_part_key(self, vector):
    """Test that partition column exprs are cast to the correct type. See IMPALA-875."""
    self.run_test_case('QueryTest/insert_part_key', vector,
        multiple_impalad=vector.get_value('exec_option')['sync_ddl'] == 1)

  def test_escaped_partition_values(self, unique_database):
    """Test for special characters in partition values."""
    tbl = unique_database + ".tbl"
    self.execute_query(
        "create table {}(i int) partitioned by (p string) stored as parquet".format(tbl))
    # "\\'" is used to represent a single quote since the insert statement uses a single
    # quote on the partition value. "\\\\" is used for backslash since it gets escaped
    # again in parsing the insert statement.
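    # For example, the two source characters \\' below reach the SQL parser as \',
    # which it interprets as a single literal quote inside the quoted partition value.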
    special_characters = "SpecialCharacters\x01\x02\x03\x04\x05\x06\x07\b\t\n\v\f\r" \
        "\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B" \
        "\x1C\x1D\x1E\x1F\"\x7F\\'%*/:=?\\\\{[]#^"
    part_value = "SpecialCharacters\x01\x02\x03\x04\x05\x06\x07\b\t\n\v\f\r" \
        "\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B" \
        "\x1C\x1D\x1E\x1F\"\x7F'%*/:=?\\{[]#^"
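    # Expected partition directory name under the Hive-compatible escaping used for
    # non-Iceberg tables; note that '*' is escaped as %2A here, unlike in the Iceberg
    # test below.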
    part_dir = "p=SpecialCharacters%01%02%03%04%05%06%07%08%09%0A%0B%0C%0D%0E%0F%" \
        "10%11%12%13%14%15%16%17%18%19%1A%1B%1C%1D%1E%1F%22%7F%27%25%2A" \
        "%2F%3A%3D%3F%5C%7B%5B%5D%23%5E"
    res = self.execute_query(
        "insert into {} partition(p='{}') values (0)".format(tbl, special_characters))
    assert part_dir in res.runtime_profile
    assert 'NumModifiedRows: 1\n' in res.runtime_profile
    res = self.client.execute("select p from {}".format(tbl))
    assert res.data[0] == part_value
    res = self.execute_query("show partitions " + tbl)
    # There is a "\t" in the partition value making splitting of the result line
    # difficult, hence we only verify that the value is present in string
    # representation of the whole row.
    assert part_value in res.data[0]
    assert part_dir in res.data[0]

  def test_escaped_partition_values_iceberg(self, unique_database):
    """Test for special characters in partition values for iceberg tables"""
    tbl = unique_database + ".tbl"
    self.execute_query("create table {} (id int, p string) partitioned by"
        " spec (identity(p)) stored by iceberg".format(tbl))
    # "\\'" is used to represent a single quote since the insert statement uses a single
    # quote on the partition value. "\\\\" is used for backslash since it gets escaped
    # again in parsing the insert statement.
    special_characters = "SpecialCharacters\x01\x02\x03\x04\x05\x06\x07\b\t\n\v\f\r" \
        "\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B" \
        "\x1C\x1D\x1E\x1F\"\x7F\\'%*/:=?\\\\{[]#^"
    part_value = "SpecialCharacters\x01\x02\x03\x04\x05\x06\x07\b\t\n\v\f\r" \
        "\x0E\x0F\x10\x11\x12\x13\x14\x15\x16\x17\x18\x19\x1A\x1B" \
        "\x1C\x1D\x1E\x1F\"\x7F'%*/:=?\\{[]#^"
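    # Iceberg tables use the standard application/x-www-form-urlencoded style encoding
    # (see the commit message above), so '*' stays unescaped in the directory name.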
    part_dir = "p=SpecialCharacters%01%02%03%04%05%06%07%08%09%0A%0B%0C%0D%0E%0F%" \
        "10%11%12%13%14%15%16%17%18%19%1A%1B%1C%1D%1E%1F%22%7F%27%25*" \
        "%2F%3A%3D%3F%5C%7B%5B%5D%23%5E"
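    # SHOW PARTITIONS on Iceberg tables reports the value with non-printable and
    # special characters escaped (e.g. \\u0001), hence the separate expected string.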
    show_part_value = "SpecialCharacters\\u0001\\u0002\\u0003\\u0004\\u0005\\u0006" \
        "\\u0007\\b\\t\\n\\u000B\\f\\r\\u000E\\u000F\\u0010" \
        "\\u0011\\u0012\\u0013\\u0014\\u0015\\u0016\\u0017" \
        "\\u0018\\u0019\\u001A\\u001B\\u001C\\u001D\\u001E" \
        "\\u001F\\\"\\u007F\'%*\\/:=?\\\\{[]#^"
    res = self.execute_query(
        "insert into {} values (0, '{}')".format(tbl, special_characters))
    assert part_dir in res.runtime_profile
    assert 'NumModifiedRows: 1\n' in res.runtime_profile
    res = self.client.execute("select p from {}".format(tbl))
    assert res.data[0] == part_value
    res = self.execute_query("show partitions " + tbl)
    # There is a "\t" in the partition value making splitting of the result line
    # difficult, hence we only verify that the value is present in string
    # representation of the whole row.
    assert show_part_value in res.data[0]


class TestInsertNullQueries(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertNullQueries, cls).add_test_dimensions()
    # Fix the exec_option vector to have a single value. This is needed should we decide
    # to run the insert tests in parallel (otherwise there will be two tests inserting
    # into the same table at the same time for the same file format).
    # TODO: When we do decide to run these tests in parallel we could create unique temp
    # tables for each test case to resolve the concurrency problems.
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0]))

    # These tests only make sense for inserting into a text table with special
    # logic to handle all the possible ways NULL needs to be written as ascii
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'text'
         and v.get_value('table_format').compression_codec == 'none'))

  @classmethod
  def setup_class(cls):
    super(TestInsertNullQueries, cls).setup_class()

  def test_insert_null(self, vector, unique_database):
    self.run_test_case('QueryTest/insert_null', vector, unique_database,
        test_file_vars={'$ORIGINAL_DB': ImpalaTestSuite
            .get_db_name_from_format(vector.get_value('table_format'))})


class TestInsertFileExtension(ImpalaTestSuite):
  """Tests that files written to a table have the correct file extension. Asserts that
  Parquet files end with .parq and text files end with .txt."""

  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertFileExtension, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        is_supported_insert_format(v.get_value('table_format')))
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  @classmethod
  def setup_class(cls):
    super(TestInsertFileExtension, cls).setup_class()

  def test_file_extension(self, vector, unique_database):
    table_format = vector.get_value('table_format').file_format
    if table_format == 'parquet':
      file_extension = '.parq'
      stored_as_format = 'parquet'
    else:
      file_extension = '.txt'
      stored_as_format = 'textfile'
    table_name = "{0}_table".format(table_format)
    ctas_query = "create table {0}.{1} stored as {2} as select 1".format(
        unique_database, table_name, stored_as_format)
    self.execute_query_expect_success(self.client, ctas_query)
    for path in self.filesystem_client.ls("test-warehouse/{0}.db/{1}".format(
        unique_database, table_name)):
      if not path.startswith('_'): assert path.endswith(file_extension)


class TestInsertHdfsWriterLimit(ImpalaTestSuite):
  """Test to make sure writer fragment instances are distributed evenly when using the
  max_fs_writers query option."""
  @classmethod
  def default_test_protocol(cls):
    return HS2

  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertHdfsWriterLimit, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'parquet'))

  @UniqueDatabase.parametrize(sync_ddl=True)
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_insert_writer_limit(self, unique_database):
    # Root internal (non-leaf) fragment.
    query = "create table {0}.test1 as select int_col from " \
        "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=0,
        expected_num_instances_per_host=[1, 2, 2])
    # Root coordinator fragment.
    query = "create table {0}.test2 as select int_col from " \
        "functional_parquet.alltypes limit 100000".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=0,
        expected_num_instances_per_host=[1, 1, 2])
    # Root scan fragment. Instance count within limit.
    query = "create table {0}.test3 as select int_col from " \
        "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=4, mt_dop=0,
        expected_num_instances_per_host=[1, 1, 1])

  @UniqueDatabase.parametrize(sync_ddl=True)
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_mt_dop_writer_limit(self, unique_database):
    # Root internal (non-leaf) fragment.
    query = "create table {0}.test1 as select int_col from " \
        "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=11, mt_dop=10,
        expected_num_instances_per_host=[11, 12, 12])
    # Root coordinator fragment.
    query = "create table {0}.test2 as select int_col from " \
        "functional_parquet.alltypes limit 100000".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2, mt_dop=10,
        expected_num_instances_per_host=[8, 8, 9])
    # Root scan fragment. Instance count within limit.
    query = "create table {0}.test3 as select int_col from " \
        "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=30, mt_dop=10,
        expected_num_instances_per_host=[8, 8, 8])

  @UniqueDatabase.parametrize(sync_ddl=True)
  @SkipIfNotHdfsMinicluster.tuned_for_minicluster
  def test_processing_cost_writer_limit(self, unique_database):
    """Test both scenarios of partitioned and unpartitioned inserts.
    All of the unpartitioned test cases will result in one writer instance because the
    output volume is less than 256MB. Partitioned inserts will result in 1 writer per
    node unless max_fs_writers is set lower than the number of nodes."""
    # Root internal (non-leaf) fragment.
    query = "create table {0}.test1 as select int_col from " \
        "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2,
        expected_num_instances_per_host=[1, 1, 2],
        processing_cost_min_threads=1)
    # Root coordinator fragment.
    query = "create table {0}.test2 as select int_col from " \
        "functional_parquet.alltypes limit 100000".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2,
        expected_num_instances_per_host=[1, 1, 2],
        processing_cost_min_threads=1)
    # Root internal (non-leaf) fragment. Instance count within limit.
    query = "create table {0}.test3 as select int_col from " \
        "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=30,
        expected_num_instances_per_host=[1, 1, 2],
        processing_cost_min_threads=1)
    # Root internal (non-leaf) fragment. No max_fs_writers.
    # Scan node and writer sink should always be in separate fragment with cost-based
    # scaling.
    query = "create table {0}.test4 as select int_col from " \
        "functional_parquet.alltypes".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=0,
        expected_num_instances_per_host=[1, 1, 2],
        processing_cost_min_threads=1)
    # Partitioned insert with 6 distinct partition values.
    # Should create at least 1 writer per node.
    query = "create table {0}.test5 partitioned by (ss_store_sk) as " \
        "select ss_item_sk, ss_ticket_number, ss_store_sk " \
        "from tpcds_parquet.store_sales".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=0,
        expected_num_instances_per_host=[2, 2, 2],
        processing_cost_min_threads=1)
    # Partitioned insert can still be limited by max_fs_writers option.
    query = "create table {0}.test6 partitioned by (ss_store_sk) as " \
        "select ss_item_sk, ss_ticket_number, ss_store_sk " \
        "from tpcds_parquet.store_sales".format(unique_database)
    self.__run_insert_and_verify_instances(query, max_fs_writers=2,
        expected_num_instances_per_host=[1, 2, 2],
        processing_cost_min_threads=1)

  def __run_insert_and_verify_instances(self, query, max_fs_writers=0, mt_dop=0,
                                        processing_cost_min_threads=0,
                                        expected_num_instances_per_host=[]):
    self.client.set_configuration_option("max_fs_writers", max_fs_writers)
    self.client.set_configuration_option("mt_dop", mt_dop)
    if processing_cost_min_threads > 0:
      self.client.set_configuration_option("compute_processing_cost", "true")
      self.client.set_configuration_option("processing_cost_min_threads",
          processing_cost_min_threads)
    # Test depends on both planner and scheduler to see the same state of the cluster
    # having 3 executors, so to reduce flakiness we make sure all 3 executors are up
    # and running.
    self.impalad_test_service.wait_for_metric_value("cluster-membership.backends.total",
        3)
    result = self.client.execute(query, fetch_exec_summary=True)
    assert 'HDFS WRITER' in result.exec_summary[0]['operator'], result.runtime_profile
    if (max_fs_writers > 0):
      num_writers = int(result.exec_summary[0]['num_instances'])
      assert (num_writers <= max_fs_writers), result.runtime_profile
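    # The runtime profile reports per-host fragment instance counts on a single line
    # with one parenthesized count per host; the regex below extracts the three counts
    # for the 3-node minicluster.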
    regex = r'Per Host Number of Fragment Instances' \
        r':.*?\((.*?)\).*?\((.*?)\).*?\((.*?)\).*?\n'
    matches = re.findall(regex, result.runtime_profile)
    assert len(matches) == 1 and len(matches[0]) == 3, result.runtime_profile
    num_instances_per_host = [int(i) for i in matches[0]]
    num_instances_per_host.sort()
    expected_num_instances_per_host.sort()
    assert num_instances_per_host == expected_num_instances_per_host, \
        result.runtime_profile
    self.client.clear_configuration()


class TestInsertNonPartitionedTable(ImpalaTestSuite):
  @classmethod
  def add_test_dimensions(cls):
    super(TestInsertNonPartitionedTable, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        v.get_value('table_format').file_format == 'text'
        and v.get_value('table_format').compression_codec == 'none')
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  @classmethod
  def setup_class(cls):
    super(TestInsertNonPartitionedTable, cls).setup_class()

  def test_insert_load_file_fail(self, vector, unique_database):
    """Tests that metadata won't be corrupted after file metadata loading fails
    in non-partitioned tables."""
    table_name = '{0}.{1}'.format(unique_database, 'test_unpartition_tbl')
    self.client.execute('create table {0}(f0 int)'
        .format(table_name))
    self.client.execute('insert overwrite table {0} select 0'
        .format(table_name))
    result = self.client.execute("select f0 from {0}".format(table_name))
    assert result.data == ["0"]

    exec_options = vector.get_value('exec_option')
    exec_options['debug_action'] = 'catalogd_load_file_metadata_throw_exception'
    try:
      self.execute_query("insert overwrite table {0} select 1"
          .format(table_name), exec_options)
      assert False, "Expected query to fail."
    except Exception as e:
      assert "Failed to load metadata for table:" in str(e)

    exec_options['debug_action'] = ''
    self.execute_query("insert overwrite table {0} select 2"
        .format(table_name), exec_options)
    result = self.client.execute("select f0 from {0}".format(table_name))
    assert result.data == ["2"]

  @pytest.mark.execute_serially
  def test_parallel_checksum(self, vector, unique_database):
    """Test that checksums are calculated in parallel when inserting into a table
    with multiple files."""
    # Ensure the source table is loaded into catalogd.
    self.execute_query("describe functional.alltypesaggmultifilesnopart")

    exec_options = vector.get_value('exec_option')
    exec_options['debug_action'] = 'catalogd_load_file_checksums_delay:SLEEP@3000'
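    # The SLEEP debug action slows down per-file checksum loading in catalogd so the
    # loading-thread metrics checked below can be observed while the CTAS is running.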
    handle = self.execute_query_async("create table {0}.test as select * from "
        "functional.alltypesaggmultifilesnopart".format(unique_database), exec_options)

    # The source table has 4 files, so the work can be distributed across 3 executors.
    # This results in writing 3 new files in 3 threads.
    catalogd = ImpalaCluster.get_e2e_test_cluster().catalogd.service
    catalogd.wait_for_metric_value(
        "catalog-server.metadata.file.num-loading-threads", 3)
    catalogd.wait_for_metric_value(
        "catalog-server.metadata.table.num-loading-file-metadata", 1)
    is_finished = self.client.wait_for_finished_timeout(handle, 30)
    assert is_finished, "Query '{}' failed".format(handle.sql_stmt())

    # Stop the query and clean up.
    self.close_query(handle)


class TestUnsafeImplicitCasts(ImpalaTestSuite):
  """Test to check 'allow_unsafe_casts' query-option behaviour on insert statements."""
  @classmethod
  def add_test_dimensions(cls):
    super(TestUnsafeImplicitCasts, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_constraint(lambda v:
        (v.get_value('table_format').file_format == 'parquet'))

  def test_unsafe_insert(self, vector, unique_database):
    create_stmt = """create table {0}.unsafe_insert(tinyint_col tinyint,
        smallint_col smallint, int_col int, bigint_col bigint, float_col float,
        double_col double, decimal_col decimal, timestamp_col timestamp, date_col date,
        string_col string, varchar_col varchar(100), char_col char(100),
        bool_col boolean, binary_col binary)""".format(unique_database)
    create_partitioned_stmt = """create table {0}.unsafe_insert_partitioned(int_col int,
        tinyint_col tinyint) partitioned by(string_col string)""".format(unique_database)
    self.client.execute(create_stmt)
    self.client.execute(create_partitioned_stmt)
    vector.get_value('exec_option')['allow_unsafe_casts'] = "true"
    self.run_test_case('QueryTest/insert-unsafe', vector, unique_database)