# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
from builtins import map, range
import getpass
import itertools
import pytest
import re
import time
import threading

from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError
from copy import deepcopy
from tests.metadata.test_ddl_base import TestDdlBase
from tests.common.environ import (HIVE_MAJOR_VERSION)
from tests.common.file_utils import create_table_from_orc
from tests.common.impala_connection import (
    FINISHED, INITIALIZED, IMPALA_CONNECTION_EXCEPTION, PENDING, RUNNING)
from tests.common.impala_test_suite import LOG
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import (
    SkipIfDockerizedCluster, SkipIfFS, SkipIfHive2, SkipIfKudu, SkipIfLocal)
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_dimensions import (create_exec_option_dimension,
    create_client_protocol_dimension, create_exec_option_dimension_from_dict)
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import (
    get_fs_path, WAREHOUSE, WAREHOUSE_PREFIX, IS_HDFS, IS_S3, IS_ADLS, IS_OZONE,
    FILESYSTEM_NAME)
from tests.common.impala_cluster import ImpalaCluster
from tests.util.filesystem_utils import FILESYSTEM_PREFIX
from tests.util.parse_util import parse_duration_string_ms
from tests.util.shell_util import dump_server_stacktraces


def get_trash_path(bucket, path):
  if IS_OZONE:
    return get_fs_path('/{0}/.Trash/{1}/Current/{2}'.format(
        bucket, getpass.getuser(), path))
  return '/user/{0}/.Trash/Current/{1}/{2}'.format(getpass.getuser(), bucket, path)


# Validates DDL statements (create, drop)
class TestDdlStatements(TestDdlBase):
  @SkipIfLocal.hdfs_client
  def test_drop_table_with_purge(self, unique_database):
    """This test checks if the table data is permanently deleted in
    DROP TABLE PURGE queries"""
    self.client.execute("create table {0}.t1(i int)".format(unique_database))
    self.client.execute("create table {0}.t2(i int)".format(unique_database))
    # Create sample test data files under the table directories
    dbpath = "{0}/{1}.db".format(WAREHOUSE, unique_database)
    self.filesystem_client.create_file("{}/t1/t1.txt".format(dbpath), file_data='t1')
    self.filesystem_client.create_file("{}/t2/t2.txt".format(dbpath), file_data='t2')
    # Drop the table (without purge) and make sure it exists in trash
    self.client.execute("drop table {0}.t1".format(unique_database))
    assert not self.filesystem_client.exists("{}/t1/t1.txt".format(dbpath))
    assert not self.filesystem_client.exists("{}/t1/".format(dbpath))
    trash = get_trash_path("test-warehouse", unique_database + ".db")
    assert self.filesystem_client.exists("{}/t1/t1.txt".format(trash))
    assert self.filesystem_client.exists("{}/t1".format(trash))
    # Drop the table (with purge) and make sure it doesn't exist in trash
    self.client.execute("drop table {0}.t2 purge".format(unique_database))
    if not IS_S3 and not IS_ADLS:
      # In S3, deletes are eventual. So even though we dropped the table, the files
      # belonging to this table may still be visible for some unbounded time. This
      # happens only with PURGE. A regular DROP TABLE is just a copy of files which is
      # consistent.
      # The ADLS Python client is not strongly consistent, so these files may still be
      # visible after a DROP. (Remove after IMPALA-5335 is resolved)
      assert not self.filesystem_client.exists("{}/t2/".format(dbpath))
      assert not self.filesystem_client.exists("{}/t2/t2.txt".format(dbpath))
      assert not self.filesystem_client.exists("{}/t2/t2.txt".format(trash))
      assert not self.filesystem_client.exists("{}/t2".format(trash))
    # Create an external table t3 and run the same test as above. Make
    # sure the data is not deleted
    self.filesystem_client.make_dir("{}/data_t3/".format(dbpath), permission=777)
    self.filesystem_client.create_file(
        "{}/data_t3/data.txt".format(dbpath), file_data='100')
    self.client.execute("create external table {0}.t3(i int) stored as "
        "textfile location \'{1}/data_t3\'".format(unique_database, dbpath))
    self.client.execute("drop table {0}.t3 purge".format(unique_database))
    assert self.filesystem_client.exists("{}/data_t3/data.txt".format(dbpath))
    self.filesystem_client.delete_file_dir("{}/data_t3".format(dbpath), recursive=True)

  @SkipIfFS.eventually_consistent
  @SkipIfLocal.hdfs_client
  def test_drop_cleans_hdfs_dirs(self, unique_database):
    self.client.execute('use default')
    # Verify the db directory exists
    assert self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))
    self.client.execute("create table {0}.t1(i int)".format(unique_database))
    # Verify the table directory exists
    assert self.filesystem_client.exists(
        "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))
    # Dropping the table removes the table's directory and preserves the db's directory
    self.client.execute("drop table {0}.t1".format(unique_database))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))
    assert self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))
    # Dropping the db removes the db's directory
    self.client.execute("drop database {0}".format(unique_database))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))

    # Dropping the db using "cascade" removes all tables' and db's directories
    # but keeps the external tables' directory
    self._create_db(unique_database)
    self.client.execute("create table {0}.t1(i int)".format(unique_database))
    self.client.execute("create table {0}.t2(i int)".format(unique_database))
    self.client.execute("create external table {0}.t3(i int) "
        "location '{1}/{0}/t3/'".format(unique_database, WAREHOUSE))
    self.client.execute("drop database {0} cascade".format(unique_database))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/t2/".format(unique_database, WAREHOUSE))
    assert self.filesystem_client.exists(
        "{1}/{0}/t3/".format(unique_database, WAREHOUSE))
    self.filesystem_client.delete_file_dir(
        "{1}/{0}/t3/".format(unique_database, WAREHOUSE), recursive=True)
    assert not self.filesystem_client.exists(
        "{1}/{0}/t3/".format(unique_database, WAREHOUSE))
    # Re-create database to make unique_database teardown succeed.
    self._create_db(unique_database)

  @SkipIfFS.eventually_consistent
  @SkipIfLocal.hdfs_client
  def test_truncate_cleans_hdfs_files(self, unique_database):
    # Verify the db directory exists
    assert self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))
    self.client.execute("create table {0}.t1(i int)".format(unique_database))
    # Verify the table directory exists
    assert self.filesystem_client.exists(
        "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))
    try:
      # If we're testing S3, we want the staging directory to be created.
      self.client.execute("set s3_skip_insert_staging=false")
      # Should have created one file in the table's dir
      self.client.execute("insert into {0}.t1 values (1)".format(unique_database))
      assert len(self.filesystem_client.ls(
          "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))) == 2
      # Truncating the table removes the data files and the staging directory
      self.client.execute("truncate table {0}.t1".format(unique_database))
      assert len(self.filesystem_client.ls(
          "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))) == 0

      self.client.execute(
          "create table {0}.t2(i int) partitioned by (p int)".format(unique_database))
      # Verify the table directory exists
      assert self.filesystem_client.exists(
          "{1}/{0}.db/t2/".format(unique_database, WAREHOUSE))
      # Should have created the partition dir, which should contain exactly one file
      self.client.execute(
          "insert into {0}.t2 partition(p=1) values (1)".format(unique_database))
      assert len(self.filesystem_client.ls(
          "{1}/{0}.db/t2/p=1".format(unique_database, WAREHOUSE))) == 1
      # Truncating the table removes the data files and preserves the partition's
      # directory
      self.client.execute("truncate table {0}.t2".format(unique_database))
      assert self.filesystem_client.exists(
          "{1}/{0}.db/t2/p=1".format(unique_database, WAREHOUSE))
      assert len(self.filesystem_client.ls(
          "{1}/{0}.db/t2/p=1".format(unique_database, WAREHOUSE))) == 0
    finally:
      # Reset to its default value.
      self.client.execute("set s3_skip_insert_staging=true")

  @SkipIfFS.incorrent_reported_ec
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_truncate_table(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/truncate-table', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_database(self, vector, unique_database):
    # The unique_database fixture provides the .test file a unique database name,
    # which allows us to run this test in parallel with others.
    self.run_test_case('QueryTest/create-database', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  def test_comment_on_database(self, unique_database):
    comment = self._get_db_comment(unique_database)
    assert '' == comment

    self.client.execute("comment on database {0} is 'comment'".format(unique_database))
    comment = self._get_db_comment(unique_database)
    assert 'comment' == comment

    self.client.execute("comment on database {0} is ''".format(unique_database))
    comment = self._get_db_comment(unique_database)
    assert '' == comment

    self.client.execute("comment on database {0} is '\\'comment\\''".format(
        unique_database))
    comment = self._get_db_comment(unique_database)
    assert "\\'comment\\'" == comment

    self.client.execute("comment on database {0} is null".format(unique_database))
    comment = self._get_db_comment(unique_database)
    assert '' == comment

  def test_alter_database_set_owner(self, unique_database):
    self.client.execute("alter database {0} set owner user foo_user".format(
        unique_database))
    properties = self._get_db_owner_properties(unique_database)
    assert len(properties) == 1
    assert {'foo_user': 'USER'} == properties

    self.client.execute("alter database {0} set owner role foo_role".format(
        unique_database))
    properties = self._get_db_owner_properties(unique_database)
    assert len(properties) == 1
    assert {'foo_role': 'ROLE'} == properties

  def test_metadata_after_alter_database(self, unique_database):
    self.client.execute("create table {0}.tbl (i int)".format(unique_database))
    self.client.execute("create function {0}.f() returns int "
        "location '{1}/libTestUdfs.so' symbol='NoArgs'"
        .format(unique_database, WAREHOUSE))
    self.client.execute("alter database {0} set owner user foo_user".format(
        unique_database))
    table_names = self.client.execute("show tables in {0}".format(
        unique_database)).get_data()
    assert "tbl" == table_names
    func_names = self.client.execute("show functions in {0}".format(
        unique_database)).get_data()
    assert "INT\tf()\tNATIVE\ttrue" == func_names

  def test_alter_table_set_owner(self, unique_database):
    table_name = "{0}.test_owner_tbl".format(unique_database)
    self.client.execute("create table {0}(i int)".format(table_name))
    self.client.execute("alter table {0} set owner user foo_user".format(table_name))
    owner = self._get_table_or_view_owner(table_name)
    assert ('foo_user', 'USER') == owner

    self.client.execute("alter table {0} set owner role foo_role".format(table_name))
    owner = self._get_table_or_view_owner(table_name)
    assert ('foo_role', 'ROLE') == owner

  def test_alter_view_set_owner(self, unique_database):
    view_name = "{0}.test_owner_tbl".format(unique_database)
    self.client.execute("create view {0} as select 1".format(view_name))
    self.client.execute("alter view {0} set owner user foo_user".format(view_name))
    owner = self._get_table_or_view_owner(view_name)
    assert ('foo_user', 'USER') == owner

    self.client.execute("alter view {0} set owner role foo_role".format(view_name))
    owner = self._get_table_or_view_owner(view_name)
    assert ('foo_role', 'ROLE') == owner

  # There is a query in QueryTest/create-table that references nested types, which is not
  # supported if old joins and aggs are enabled. Since we do not get any meaningful
  # additional coverage by running a DDL test under the old aggs and joins, it can be
  # skipped.
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/create-table', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfFS.incorrent_reported_ec
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table_like_table(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/create-table-like-table', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table_like_file(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/create-table-like-file', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfHive2.orc
  @SkipIfFS.hive
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table_like_file_orc(self, vector, unique_database):
    COMPLEXTYPETBL_PATH = 'test-warehouse/managed/functional_orc_def.db/' \
        'complextypestbl_orc_def/'
    base_dir = list(filter(lambda s: s.startswith('base'),
        self.filesystem_client.ls(COMPLEXTYPETBL_PATH)))[0]
    bucket_file = list(filter(lambda s: s.startswith('bucket'),
        self.filesystem_client.ls(COMPLEXTYPETBL_PATH + base_dir)))[0]
    vector.get_value('exec_option')['abort_on_error'] = False
    create_table_from_orc(self.client, unique_database,
        'timestamp_with_local_timezone')
    self.run_test_case('QueryTest/create-table-like-file-orc', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector),
        test_file_vars={
            '$TRANSACTIONAL_COMPLEXTYPESTBL_FILE': FILESYSTEM_PREFIX + '/'
            + COMPLEXTYPETBL_PATH + base_dir + '/' + bucket_file})

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table_as_select(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/create-table-as-select', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  @SkipIfKudu.no_hybrid_clock()
  def test_create_kudu(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
    self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  def test_comment_on_table(self, unique_database):
    table = '{0}.comment_table'.format(unique_database)
    self.client.execute("create table {0} (i int)".format(table))

    comment = self._get_table_or_view_comment(table)
    assert comment is None

    self.client.execute("comment on table {0} is 'comment'".format(table))
    comment = self._get_table_or_view_comment(table)
    assert "comment" == comment

    self.client.execute("comment on table {0} is ''".format(table))
    comment = self._get_table_or_view_comment(table)
    assert "" == comment

    self.client.execute("comment on table {0} is '\\'comment\\''".format(table))
    comment = self._get_table_or_view_comment(table)
    assert "\\\\'comment\\\\'" == comment

    self.client.execute("comment on table {0} is null".format(table))
    comment = self._get_table_or_view_comment(table)
    assert comment is None

  def test_comment_on_view(self, unique_database):
    view = '{0}.comment_view'.format(unique_database)
    self.client.execute("create view {0} as select 1".format(view))

    comment = self._get_table_or_view_comment(view)
    assert comment is None

    self.client.execute("comment on view {0} is 'comment'".format(view))
    comment = self._get_table_or_view_comment(view)
    assert "comment" == comment

    self.client.execute("comment on view {0} is ''".format(view))
    comment = self._get_table_or_view_comment(view)
    assert "" == comment

    self.client.execute("comment on view {0} is '\\'comment\\''".format(view))
    comment = self._get_table_or_view_comment(view)
    assert "\\\\'comment\\\\'" == comment

    self.client.execute("comment on view {0} is null".format(view))
    comment = self._get_table_or_view_comment(view)
    assert comment is None

  def test_comment_on_column(self, unique_database):
    table = "{0}.comment_table".format(unique_database)
    self.client.execute("create table {0} (i int) partitioned by (j int)".format(table))

    comment = self._get_column_comment(table, 'i')
    assert '' == comment

    # Updating comment on a regular column.
    self.client.execute("comment on column {0}.i is 'comment 1'".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "comment 1" == comment

    # Updating comment on a partition column.
    self.client.execute("comment on column {0}.j is 'comment 2'".format(table))
    comment = self._get_column_comment(table, 'j')
    assert "comment 2" == comment

    self.client.execute("comment on column {0}.i is ''".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is '\\'comment\\''".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "\\'comment\\'" == comment

    self.client.execute("comment on column {0}.i is null".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    view = "{0}.comment_view".format(unique_database)
    self.client.execute("create view {0}(i) as select 1".format(view))

    comment = self._get_column_comment(view, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is 'comment'".format(view))
    comment = self._get_column_comment(view, 'i')
    assert "comment" == comment

    self.client.execute("comment on column {0}.i is ''".format(view))
    comment = self._get_column_comment(view, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is '\\'comment\\''".format(view))
    comment = self._get_column_comment(view, 'i')
    assert "\\'comment\\'" == comment

    self.client.execute("comment on column {0}.i is null".format(view))
    comment = self._get_column_comment(view, 'i')
    assert "" == comment

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_sync_ddl_drop(self, unique_database):
    """Verifies the catalog gets updated properly when dropping objects with sync_ddl
    enabled"""
    self.client.set_configuration({'sync_ddl': 1})
    # Drop the database immediately after creation (within a statestore heartbeat) and
    # verify the catalog gets updated properly.
    self.client.execute("drop database {0}".format(unique_database))
    assert unique_database not in self.all_db_names()
    # Re-create database to make unique_database teardown succeed.
    self._create_db(unique_database)

  # TODO: don't use hdfs_client
  @SkipIfLocal.hdfs_client
  @SkipIfFS.incorrent_reported_ec
  @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2)
  def test_alter_table(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    # Create an unpartitioned table to get a filesystem directory that does not
    # use the (key=value) format. The directory is automatically cleaned up
    # by the unique_database fixture.
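    # The part_data table and its data file created below are used by the
    # QueryTest/alter-table workload that runs at the end of this test.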
    self.client.execute("create table {0}.part_data (i int)".format(unique_database))
    dbpath = "{1}/{0}.db".format(unique_database, WAREHOUSE)
    assert self.filesystem_client.exists("{}/part_data".format(dbpath))
    self.filesystem_client.create_file(
        "{}/part_data/data.txt".format(dbpath), file_data='1984')
    self.run_test_case('QueryTest/alter-table', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfFS.hdfs_caching
  @SkipIfLocal.hdfs_client
  @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2)
  def test_alter_table_hdfs_caching(self, vector, unique_database):
    self.run_test_case('QueryTest/alter-table-hdfs-caching', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_alter_set_column_stats(self, vector, unique_database):
    self.run_test_case('QueryTest/alter-table-set-column-stats', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  # Run serially as alter waits for catalog to catch up before marking "DDL finished" so
  # we don't have a good way to confirm the alter timeline if catalog gets delayed.
  @pytest.mark.execute_serially
  def test_alter_table_rename_independent(self, vector, unique_database):
    """Tests that two concurrently running alter table renames do not block each
    other."""
    def table_name(i):
      return "{}.tbl_{}".format(unique_database, i)

    def alter(i, j):
      return "alter table {} rename to {}".format(table_name(i), table_name(j))

    def get_read_lock_duration_ms(profile):
      read_lock_durations = re.findall(r"Got catalog version read lock: [^ ]*", profile)
      assert len(read_lock_durations) == 1
      return parse_duration_string_ms(read_lock_durations[0].split(" ")[-1])

    self.client.execute("create table {} (i int)".format(table_name(1)))
    self.client.execute("create table {} (i int)".format(table_name(2)))
    # Ensure loading metadata is not a factor in alter execution time.
    self.client.execute("describe {}".format(table_name(1)))
    self.client.execute("describe {}".format(table_name(2)))

    new_vector = deepcopy(vector)
    new_vector.get_value('exec_option')['debug_action'] = \
        "catalogd_table_rename_delay:SLEEP@5000"
    with self.create_impala_client_from_vector(new_vector) as client1, \
        self.create_impala_client_from_vector(new_vector) as client2:
      start = time.time()
      handle1 = client1.execute_async(alter(1, 3))
      handle2 = client2.execute_async(alter(2, 4))
      assert client1.wait_for_finished_timeout(handle1, timeout=15)
      assert client2.wait_for_finished_timeout(handle2, timeout=15)
      assert time.time() - start < 15
      profile1 = client1.get_runtime_profile(handle1)
      assert get_read_lock_duration_ms(profile1) < 5000
      profile2 = client2.get_runtime_profile(handle2)
      assert get_read_lock_duration_ms(profile2) < 5000
      client1.close_query(handle1)
      client2.close_query(handle2)

  @UniqueDatabase.parametrize(num_dbs=2)
  def test_concurrent_alter_table_rename(self, vector, unique_database):
    test_self = self

    class ThreadLocalClient(threading.local):
      def __init__(self):
        self.client = test_self.create_impala_client_from_vector(vector)

    pool = ThreadPool(processes=8)
    tlc = ThreadLocalClient()

    def run_rename(i):
      if i % 2 == 0:
        tlc.client.set_configuration_option("sync_ddl", "1")
      is_partitioned = i % 4 < 2
      tbl_name = "{}.tbl_{}".format(unique_database, i)
      tlc.client.execute("create table {}(i int){}".format(
          tbl_name, "partitioned by(p int)" if is_partitioned else ""))
      if i % 8 < 4:
        # Rename inside the same db
        new_tbl_name = tbl_name + "_new"
      else:
        # Move to another db
        new_tbl_name = "{}2.tbl_{}".format(unique_database, i)
      stmts = [
        "alter table {} rename to {}".format(tbl_name, new_tbl_name),
        "alter table {} rename to {}".format(new_tbl_name, tbl_name),
      ]
      # Move the table back and forth in several rounds
      for _ in range(4):
        for query in stmts:
          # Run the query asynchronously to avoid getting stuck by it
          handle = tlc.client.execute_async(query)
          is_finished = tlc.client.wait_for_finished_timeout(handle, timeout=60)
          assert is_finished, "Query timeout(60s): " + query
          tlc.client.close_query(handle)
      return True

    # Run renames in parallel
    NUM_ITERS = 16
    worker = [None] * (NUM_ITERS + 1)
    for i in range(1, NUM_ITERS + 1):
      worker[i] = pool.apply_async(run_rename, (i,))
    for i in range(1, NUM_ITERS + 1):
      try:
        assert worker[i].get(timeout=100)
      except TimeoutError:
        dump_server_stacktraces()
        assert False, "Timeout in thread run_ddls(%d)" % i

  @SkipIfFS.hbase
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_alter_hbase_set_column_stats(self, vector, unique_database):
    self.run_test_case('QueryTest/alter-hbase-table-set-column-stats', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfLocal.hdfs_client
  def test_drop_partition_with_purge(self, unique_database):
    """Verifies whether alter drop partition purge actually skips trash"""
    self.client.execute(
        "create table {0}.t1(i int) partitioned by (j int)".format(unique_database))
    # Add two partitions (j=1) and (j=2) to table t1
    self.client.execute("alter table {0}.t1 add partition(j=1)".format(unique_database))
    self.client.execute("alter table {0}.t1 add partition(j=2)".format(unique_database))
    dbpath = "{1}/{0}.db".format(unique_database, WAREHOUSE)
    self.filesystem_client.create_file("{}/t1/j=1/j1.txt".format(dbpath), file_data='j1')
    self.filesystem_client.create_file("{}/t1/j=2/j2.txt".format(dbpath), file_data='j2')
    # Drop the partition (j=1) without purge and make sure it exists in trash
    self.client.execute(
        "alter table {0}.t1 drop partition(j=1)".format(unique_database))
    assert not self.filesystem_client.exists("{}/t1/j=1/j1.txt".format(dbpath))
    assert not self.filesystem_client.exists("{}/t1/j=1".format(dbpath))
    trash = get_trash_path("test-warehouse", unique_database + ".db")
    assert self.filesystem_client.exists('{}/t1/j=1/j1.txt'.format(trash))
    assert self.filesystem_client.exists('{}/t1/j=1'.format(trash))
    # Drop the partition (with purge) and make sure it doesn't exist in trash
    self.client.execute("alter table {0}.t1 drop partition(j=2) purge".format(
        unique_database))
    if not IS_S3 and not IS_ADLS:
      # In S3, deletes are eventual. So even though we dropped the partition, the files
      # belonging to this partition may still be visible for some unbounded time. This
      # happens only with PURGE. A regular DROP TABLE is just a copy of files which is
      # consistent.
      # The ADLS Python client is not strongly consistent, so these files may still be
      # visible after a DROP. (Remove after IMPALA-5335 is resolved)
      assert not self.filesystem_client.exists("{}/t1/j=2/j2.txt".format(dbpath))
      assert not self.filesystem_client.exists("{}/t1/j=2".format(dbpath))
      assert not self.filesystem_client.exists('{}/t1/j=2/j2.txt'.format(trash))
      assert not self.filesystem_client.exists('{}/t1/j=2'.format(trash))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_views_ddl(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/views-ddl', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize()
  def test_view_hints(self, unique_database):
    # Test that plan hints are stored in the view's comment field; this should work
    # regardless of how Hive formats the output. Getting this to work with the
    # automated test case runner is rather difficult, so verify directly. There
    # should be two of each join hint: one for the original text, one for the expanded
    # text.
    self.client.execute("""
        create view {0}.hints_test as
        select /* +straight_join */ a.* from functional.alltypestiny a
        inner join /* +broadcast */ functional.alltypes b on a.id = b.id
        inner join /* +shuffle */ functional.alltypessmall c on b.id = c.id
        """.format(unique_database))
    results = self.execute_query("describe formatted %s.hints_test" % unique_database)
    sj, bc, shuf = 0, 0, 0
    for row in results.data:
      sj += '-- +straight_join' in row
      bc += '-- +broadcast' in row
      shuf += '-- +shuffle' in row
    assert sj == 2
    assert bc == 2
    assert shuf == 2

    # Test querying the hinted view.
    results = self.execute_query("select count(*) from %s.hints_test" % unique_database)
    assert results.success
    assert len(results.data) == 1
    assert results.data[0] == '8'

    # Test the plan to make sure hints were applied correctly
    plan = self.execute_query("explain select * from %s.hints_test" % unique_database,
        query_options={'explain_level': 0})
    plan_match = """PLAN-ROOT SINK
08:EXCHANGE [UNPARTITIONED]
04:HASH JOIN [INNER JOIN, PARTITIONED]
|--07:EXCHANGE [HASH(c.id)]
|  02:SCAN {filesystem_name} [functional.alltypessmall c]
06:EXCHANGE [HASH(b.id)]
03:HASH JOIN [INNER JOIN, BROADCAST]
|--05:EXCHANGE [BROADCAST]
|  01:SCAN {filesystem_name} [functional.alltypes b]
00:SCAN {filesystem_name} [functional.alltypestiny a]"""
    assert plan_match.format(filesystem_name=FILESYSTEM_NAME) in '\n'.join(plan.data)

  def _verify_describe_view(self, vector, view_name, expected_substr):
    """
    Verify across all impalads that the view 'view_name' has the given substring in its
    expanded SQL.

    If SYNC_DDL is enabled, the verification should complete immediately. Otherwise,
    this loops waiting for the expected condition to pass.
    """
    if vector.get_value('exec_option')['sync_ddl']:
      num_attempts = 1
    else:
      num_attempts = 60
    for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
      client = impalad.service.create_client_from_vector(vector)
      try:
        for attempt in itertools.count(1):
          assert attempt <= num_attempts, "ran out of attempts"
          try:
            result = self.execute_query_expect_success(
                client, "describe formatted %s" % view_name)
            exp_line = [line for line in result.data if 'View Expanded' in line][0]
          except IMPALA_CONNECTION_EXCEPTION as e:
            # In non-SYNC_DDL tests, it's OK to get a "missing view" type error
            # until the metadata propagates.
            exp_line = "Exception: %s" % e
          if expected_substr in exp_line.lower():
            return
          time.sleep(1)
      finally:
        client.close()

  def test_views_describe(self, vector, unique_database):
    # IMPALA-6896: Tests that altered views can be described by all impalads.
    impala_cluster = ImpalaCluster.get_e2e_test_cluster()
    impalads = impala_cluster.impalads
    view_name = "%s.test_describe_view" % unique_database
    first_client = impalads[0].service.create_client_from_vector(vector)
    try:
      # Create a view and verify it's visible.
      self.execute_query_expect_success(first_client, "create view {0} as "
          "select * from functional.alltypes".format(view_name))
      self._verify_describe_view(vector, view_name, "select * from functional.alltypes")

      # Alter the view and verify the alter is visible.
      self.execute_query_expect_success(first_client, "alter view {0} as "
          "select * from functional.alltypesagg".format(view_name))
      self._verify_describe_view(vector, view_name,
          "select * from functional.alltypesagg")
    finally:
      first_client.close()

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_functions_ddl(self, vector, unique_database):
    self.run_test_case('QueryTest/functions-ddl', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfLocal.hdfs_client
  def test_create_alter_bulk_partition(self, unique_database):
    # Change the scale depending on the exploration strategy: with 50 partitions this
    # test runs for a few minutes; with 10 partitions it takes ~50s for two
    # configurations.
    num_parts = 50 if self.exploration_strategy() == 'exhaustive' else 10
    fq_tbl_name = unique_database + ".part_test_tbl"
    self.client.execute("create table {0}(i int) partitioned by(j int, s string) "
        "location '{1}/{0}'".format(fq_tbl_name, WAREHOUSE))

    # Add some partitions (first batch of two)
    for i in range(num_parts // 5):
      start = time.time()
      self.client.execute(
          "alter table {0} add partition(j={1}, s='{1}')".format(fq_tbl_name, i))
      LOG.info('ADD PARTITION #%d exec time: %s' % (i, time.time() - start))

    # Modify one of the partitions
    self.client.execute("alter table {0} partition(j=1, s='1')"
        " set fileformat parquetfile".format(fq_tbl_name))

    # Alter one partition to a non-existent location twice (IMPALA-741)
    self.filesystem_client.delete_file_dir("tmp/dont_exist1/", recursive=True)
    self.filesystem_client.delete_file_dir("tmp/dont_exist2/", recursive=True)
    self.execute_query_expect_success(self.client,
        "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist1'"
        .format(fq_tbl_name, WAREHOUSE))
    self.execute_query_expect_success(self.client,
        "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist2'"
        .format(fq_tbl_name, WAREHOUSE))

    # Add some more partitions
    for i in range(num_parts // 5, num_parts):
      start = time.time()
      self.client.execute(
          "alter table {0} add partition(j={1},s='{1}')".format(fq_tbl_name, i))
      LOG.info('ADD PARTITION #%d exec time: %s' % (i, time.time() - start))

    # Insert data and verify it shows up.
    self.client.execute(
        "insert into table {0} partition(j=1, s='1') select 1".format(fq_tbl_name))
    assert '1' == self.execute_scalar("select count(*) from {0}".format(fq_tbl_name))

  @SkipIfLocal.hdfs_client
  def test_alter_table_set_fileformat(self, unique_database):
    # Tests that the SET FILEFORMAT clause works with ALTER TABLE ADD PARTITION
    # statements.
    fq_tbl_name = unique_database + ".p_fileformat"
    self.client.execute(
        "create table {0}(i int) partitioned by (p int)".format(fq_tbl_name))

    # Add a partition with Parquet fileformat
    self.execute_query_expect_success(self.client,
        "alter table {0} add partition(p=1) set fileformat parquet"
        .format(fq_tbl_name))

    # Add two partitions with ORC fileformat
    self.execute_query_expect_success(self.client,
        "alter table {0} add partition(p=2) partition(p=3) set fileformat orc"
        .format(fq_tbl_name))

    result = self.execute_query_expect_success(self.client,
        "SHOW PARTITIONS %s" % fq_tbl_name)
    assert 1 == len([line for line in result.data if line.find("PARQUET") != -1])
    assert 2 == len([line for line in result.data if line.find("ORC") != -1])

  def test_alter_table_create_many_partitions(self, unique_database):
    """
    Checks that creating more partitions than the MAX_PARTITION_UPDATES_PER_RPC
    batch size works, in that it creates all the underlying partitions.
    """
    self.client.execute(
        "create table {0}.t(i int) partitioned by (p int)".format(unique_database))
    MAX_PARTITION_UPDATES_PER_RPC = 500
    alter_stmt = "alter table {0}.t add ".format(unique_database) + " ".join(
        "partition(p=%d)" % (i,) for i in range(MAX_PARTITION_UPDATES_PER_RPC + 2))
    self.client.execute(alter_stmt)
    partitions = self.client.execute("show partitions {0}.t".format(unique_database))
    # Show partitions will contain partition HDFS paths, which we expect to contain
    # "p=val" subdirectories for each partition. The regexp finds all the "p=[0-9]*"
    # paths, converts them to integers, and checks that we have all the ones we
    # expect.
    PARTITION_RE = re.compile("p=([0-9]+)")
    assert list(map(int, PARTITION_RE.findall(str(partitions.data)))) == \
        list(range(MAX_PARTITION_UPDATES_PER_RPC + 2))

  def test_create_alter_tbl_properties(self, unique_database):
    fq_tbl_name = unique_database + ".test_alter_tbl"

    # Specify TBLPROPERTIES and SERDEPROPERTIES at CREATE time
    self.client.execute("""create table {0} (i int)
        with serdeproperties ('s1'='s2', 's3'='s4')
        tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name))
    properties = self._get_tbl_properties(fq_tbl_name)

    if HIVE_MAJOR_VERSION > 2:
      assert properties['OBJCAPABILITIES'] == 'EXTREAD,EXTWRITE'
      assert properties['TRANSLATED_TO_EXTERNAL'] == 'TRUE'
      assert properties['external.table.purge'] == 'TRUE'
      assert properties['EXTERNAL'] == 'TRUE'
      del properties['OBJCAPABILITIES']
      del properties['TRANSLATED_TO_EXTERNAL']
      del properties['external.table.purge']
      del properties['EXTERNAL']
    assert len(properties) == 2
    # The transient_lastDdlTime is variable, so don't verify the value.
    assert 'transient_lastDdlTime' in properties
    del properties['transient_lastDdlTime']
    assert {'p1': 'v1'} == properties

    properties = self._get_serde_properties(fq_tbl_name)
    assert {'s1': 's2', 's3': 's4'} == properties

    # Modify the SERDEPROPERTIES using ALTER TABLE SET.
    self.client.execute("alter table {0} set serdeproperties "
        "('s1'='new', 's5'='s6')".format(fq_tbl_name))
    properties = self._get_serde_properties(fq_tbl_name)
    assert {'s1': 'new', 's3': 's4', 's5': 's6'} == properties

    # Modify the TBLPROPERTIES using ALTER TABLE SET.
    self.client.execute("alter table {0} set tblproperties "
        "('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')".format(fq_tbl_name))
    properties = self._get_tbl_properties(fq_tbl_name)

    if HIVE_MAJOR_VERSION > 2:
      assert 'OBJCAPABILITIES' in properties
    assert 'transient_lastDdlTime' in properties
    assert properties['p1'] == 'v1'
    assert properties['prop1'] == 'val1'
    assert properties['p2'] == 'val3'
    assert properties[''] == ''

  @SkipIfHive2.acid
  def test_create_insertonly_tbl(self, unique_database):
    insertonly_tbl = unique_database + ".test_insertonly"
    self.client.execute("""create table {0} (coli int) stored as parquet tblproperties(
        'transactional'='true', 'transactional_properties'='insert_only')"""
        .format(insertonly_tbl))
    properties = self._get_tbl_properties(insertonly_tbl)
    assert properties['OBJCAPABILITIES'] == 'HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE'

  def test_alter_tbl_properties_reload(self, unique_database):
    # IMPALA-8734: Force a table schema reload when setting table properties.
    tbl_name = "test_tbl"
    self.execute_query_expect_success(self.client, "create table {0}.{1} (c1 string)"
        .format(unique_database, tbl_name))
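    # The raw data file written below contains an empty line followed by the literal
    # token 'foo'. Once serialization.null.format is set to 'foo', a reloaded schema
    # should return '' for the first row and NULL for the second.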
    self.filesystem_client.create_file(
        "{2}/{0}.db/{1}/f".format(unique_database, tbl_name, WAREHOUSE),
        file_data="\nfoo\n")
    self.execute_query_expect_success(self.client,
        "alter table {0}.{1} set tblproperties"
        "('serialization.null.format'='foo')".format(unique_database, tbl_name))
    result = self.execute_query_expect_success(self.client,
        "select * from {0}.{1}".format(unique_database, tbl_name))
    assert len(result.data) == 2
    assert result.data[0] == ''
    assert result.data[1] == 'NULL'

  @SkipIfFS.incorrent_reported_ec
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_partition_ddl_predicates(self, vector, unique_database):
    self.run_test_case('QueryTest/partition-ddl-predicates-all-fs', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))
    if IS_HDFS:
      self.run_test_case('QueryTest/partition-ddl-predicates-hdfs-only', vector,
          use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  def test_create_table_file_format(self, unique_database):
    # When default_file_format query option is not specified, the default table file
    # format is TEXT.
    text_table = "{0}.text_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int)".format(text_table))
    result = self.execute_query_expect_success(
        self.client, "show create table {0}".format(text_table))
    assert any("TEXTFILE" in x for x in result.data)

    self.execute_query_expect_failure(
        self.client, "create table {0}.foobar_tbl".format(unique_database),
        {"default_file_format": "foobar"})

    parquet_table = "{0}.parquet_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int)".format(parquet_table),
        {"default_file_format": "parquet"})
    result = self.execute_query_expect_success(
        self.client, "show create table {0}".format(parquet_table))
    assert any("PARQUET" in x for x in result.data)

    # The table created should still be ORC even though the default_file_format query
    # option is set to parquet.
    orc_table = "{0}.orc_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int) stored as orc".format(orc_table),
        {"default_file_format": "parquet"})
    result = self.execute_query_expect_success(
        self.client, "show create table {0}".format(orc_table))
    assert any("ORC" in x for x in result.data)

  @SkipIfHive2.acid
  def test_create_table_transactional_type(self, unique_database):
    # When default_transactional_type query option is not specified, the transaction
    # related table properties are not set.
    non_acid_table = "{0}.non_acid_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int)".format(non_acid_table),
        {"default_transactional_type": "none"})
    props = self._get_properties("Table Parameters", non_acid_table)
    assert "transactional" not in props
    assert "transactional_properties" not in props

    # Create table as "insert_only" transactional.
    insert_only_acid_table = "{0}.insert_only_acid_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int)".format(insert_only_acid_table),
        {"default_transactional_type": "insert_only"})
    props = self._get_properties("Table Parameters", insert_only_acid_table)
    assert props["transactional"] == "true"
    assert props["transactional_properties"] == "insert_only"

    # default_transactional_type query option should not affect external tables
    external_table = "{0}.external_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create external table {0}(i int)".format(external_table),
        {"default_transactional_type": "insert_only"})
    props = self._get_properties("Table Parameters", external_table)
    assert "transactional" not in props
    assert "transactional_properties" not in props

    # default_transactional_type query option should not affect Kudu tables.
    kudu_table = "{0}.kudu_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client,
        "create table {0}(i int primary key) stored as kudu".format(kudu_table),
        {"default_transactional_type": "insert_only"})
    props = self._get_properties("Table Parameters", kudu_table)
    assert "transactional" not in props
    assert "transactional_properties" not in props

    # default_transactional_type query option should have no effect when transactional
    # table properties are set manually.
    manual_acid_table = "{0}.manual_acid_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client,
        "create table {0}(i int) TBLPROPERTIES ('transactional'='false')"
        .format(manual_acid_table),
        {"default_transactional_type": "insert_only"})
    props = self._get_properties("Table Parameters", manual_acid_table)
    assert "transactional" not in props
    assert "transactional_properties" not in props

  def test_kudu_column_comment(self, unique_database):
    table = "{0}.kudu_table0".format(unique_database)
    self.client.execute("create table {0}(x int comment 'x' primary key) "
        "stored as kudu".format(table))
    comment = self._get_column_comment(table, 'x')
    assert "x" == comment

    table = "{0}.kudu_table".format(unique_database)
    self.client.execute("create table {0}(i int primary key) stored as kudu"
        .format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is 'comment1'".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "comment1" == comment

    self.client.execute("comment on column {0}.i is ''".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is 'comment2'".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "comment2" == comment

    self.client.execute("comment on column {0}.i is null".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("alter table {0} alter column i set comment 'comment3'"
        .format(table))
    comment = self._get_column_comment(table, 'i')
    assert "comment3" == comment

    self.client.execute("alter table {0} alter column i set comment ''".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("alter table {0} add columns (j int comment 'comment4')"
        .format(table))
    comment = self._get_column_comment(table, 'j')
    assert "comment4" == comment

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_describe_materialized_view(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/describe-materialized-view', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))


# IMPALA-10811: RPC to submit query getting stuck for AWS NLB forever.
# Tests the HS2, Beeswax and HS2-HTTP clients.
class TestAsyncDDL(TestDdlBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestAsyncDDL, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        sync_ddl=[0], disable_codegen_options=[False]))

  def test_async_ddl(self, vector, unique_database):
    self.run_test_case('QueryTest/async_ddl', vector, use_db=unique_database)

  def test_async_ddl_with_JDBC(self, unique_database):
    self.exec_with_jdbc("drop table if exists {0}.test_table".format(unique_database))
    self.exec_with_jdbc_and_compare_result(
        "create table {0}.test_table(a int)".format(unique_database),
        "'Table has been created.'")
    self.exec_with_jdbc("drop table if exists {0}.alltypes_clone".format(unique_database))
    self.exec_with_jdbc_and_compare_result(
        "create table {0}.alltypes_clone as select * from "
        "functional_parquet.alltypes".format(unique_database),
        "'Inserted 7300 row(s)'")

  @classmethod
  def test_get_operation_status_for_client(self, client, unique_database):
    # Setup
    SLEEP_S = 2
    client.execute("select count(*) from functional_parquet.alltypes")
    client.set_configuration_option("enable_async_ddl_execution", "true")
    client.set_configuration_option("debug_action",
        "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@%s" % (SLEEP_S * 1000))

    # Run the test query which will only compile the DDL in execute_statement()
    # and measure the time spent. Should be less than 3s.
    start = time.time()
    handle = client.execute_async(
        "create table {0}.alltypes_clone as select sleep({1})".format(
            unique_database, SLEEP_S * 1000))
    end = time.time()
    assert (end - start <= 3)

    # The table creation and population part will be done in a separate thread.
    # The query must be in PENDING state after execute_async and enter RUNNING state
    # after creating the table in catalogd (at least SLEEP_S delay). The query can
    # enter FINISHED state after another delay of at least SLEEP_S.
    assert client.get_impala_exec_state(handle) == PENDING
    client.wait_for_impala_state(handle, RUNNING, SLEEP_S + 3)
    client.wait_for_impala_state(handle, FINISHED, SLEEP_S + 3)
    client.close_query(handle)

  def test_get_operation_status_for_async_ddl(self, vector, unique_database):
    """Tests that for an asynchronously executed DDL with delay, GetOperationStatus
    must be issued repeatedly.
    Tested with the hs2-http, hs2 and beeswax clients."""
    client = self.default_impala_client(vector.get_value('protocol'))
    self.test_get_operation_status_for_client(client, unique_database)


class TestAsyncDDLTiming(TestDdlBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestAsyncDDLTiming, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        sync_ddl=[0], disable_codegen_options=[False]))
    cls.ImpalaTestMatrix.add_dimension(
        ImpalaTestDimension('enable_async_ddl_execution', True, False))

  def test_alter_table_recover(self, vector, unique_database):
    enable_async_ddl = vector.get_value('enable_async_ddl_execution')
    client = self.create_impala_client(protocol=vector.get_value('protocol'))
    try:
      # Setup for the alter table case (create a table that points to an existing
      # location).
      alltypes_location = get_fs_path("/test-warehouse/alltypes_parquet")
      source_tbl = "functional_parquet.alltypes"
      dest_tbl = "{0}.alltypes_clone".format(unique_database)
      create_table_stmt = 'create external table {0} like {1} location "{2}"'.format(
          dest_tbl, source_tbl, alltypes_location)
      self.execute_query_expect_success(client, create_table_stmt)

      # Describe the table to fetch its metadata
      self.execute_query_expect_success(client, "describe {0}".format(dest_tbl))

      # Configure whether to use async DDL and add appropriate delays
      new_vector = deepcopy(vector)
      new_vector.get_value('exec_option')['enable_async_ddl_execution'] = \
          enable_async_ddl
      new_vector.get_value('exec_option')['debug_action'] = \
          "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000"
      exec_start = time.time()
      alter_stmt = "alter table {0} recover partitions".format(dest_tbl)
      handle = self.execute_query_async_using_client(client, alter_stmt, new_vector)
      exec_end = time.time()
      exec_time = exec_end - exec_start
      state = client.get_impala_exec_state(handle)
      if enable_async_ddl:
        assert state in [PENDING, RUNNING]
      else:
        assert state in [RUNNING, FINISHED]

      # Wait for the statement to finish with a timeout of 20 seconds
      wait_start = time.time()
      client.wait_for_impala_state(handle, FINISHED, 20)
      wait_end = time.time()
      wait_time = wait_end - wait_start
      self.close_query_using_client(client, handle)
      # In sync mode:
      #   The entire DDL is processed in the exec step with delay, so exec_time should
      #   be more than 10 seconds.
      # In async mode:
      #   The compilation of the DDL is processed in the exec step without delay, and
      #   the processing of the DDL plan happens in the wait step with delay. The wait
      #   time should take more than 10 seconds.
      if enable_async_ddl:
        assert(wait_time >= 10)
      else:
        assert(exec_time >= 10)
    finally:
      client.close()

  def test_ctas(self, vector, unique_database):
    enable_async_ddl = vector.get_value('enable_async_ddl_execution')
    client = self.create_impala_client(protocol=vector.get_value('protocol'))
    try:
      # The CTAS is going to need the metadata of the source table in the
      # select. To avoid flakiness about metadata loading, this selects from
      # that source table first to get the metadata loaded.
      self.execute_query_expect_success(client,
          "select count(*) from functional_parquet.alltypes")

      # Configure whether to use async DDL and add appropriate delays
      new_vector = deepcopy(vector)
      new_vector.get_value('exec_option')['enable_async_ddl_execution'] = \
          enable_async_ddl
      create_delay = "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000"
      insert_delay = "CRS_BEFORE_COORD_STARTS:SLEEP@2000"
      new_vector.get_value('exec_option')['debug_action'] = \
          "{0}|{1}".format(create_delay, insert_delay)
      dest_tbl = "{0}.ctas_test".format(unique_database)
      source_tbl = "functional_parquet.alltypes"
      ctas_stmt = 'create external table {0} as select * from {1}'.format(
          dest_tbl, source_tbl)
      exec_start = time.time()
      handle = self.execute_query_async_using_client(client, ctas_stmt, new_vector)
      exec_end = time.time()
      exec_time = exec_end - exec_start
      # The CRS_BEFORE_COORD_STARTS delay postpones the transition from PENDING
      # to RUNNING, so the sync case should be in PENDING state at the end of
      # the execute call. This means that the sync and async cases are the same.
      assert client.is_pending(handle)

      # Wait for the statement to finish with a timeout of 40 seconds
      # (60 seconds without shortcircuit reads). There are other tests running
      # in parallel and ASAN can be slow, so this timeout has been bumped
      # substantially to avoid flakiness. The actual test case does not depend
      # on the statement finishing in a particular amount of time.
      wait_time = 40 if IS_HDFS else 60
      wait_start = time.time()
      client.wait_for_impala_state(handle, FINISHED, wait_time)
      wait_end = time.time()
      wait_time = wait_end - wait_start
      self.close_query_using_client(client, handle)
      # In sync mode:
      #   The entire CTAS is processed in the exec step with delay, so exec_time should
      #   be more than 10 seconds.
      # In async mode:
      #   The compilation of the CTAS is processed in the exec step without delay, and
      #   the processing of the CTAS plan happens in the wait step with delay. The wait
      #   time should take more than 10 seconds.
      if enable_async_ddl:
        assert(wait_time >= 10)
      else:
        assert(exec_time >= 10)
    finally:
      client.close()


class TestDdlLogs(TestDdlBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestDdlLogs, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension_from_dict({
        'enable_async_ddl_execution': [True, False]}))

  @SkipIfDockerizedCluster.daemon_logs_not_exposed
  def test_error_logs(self, vector, unique_database):
    query_opts = vector.get_value('exec_option')
    tbl_name = 'test_async' if query_opts['enable_async_ddl_execution'] else 'test_sync'
    tbl_name = unique_database + '.' + tbl_name
    result = self.execute_query_expect_failure(
        self.client, 'invalidate metadata ' + tbl_name, query_opts)
    err = "TableNotFoundException: Table not found: " + tbl_name
    assert err in str(result)
    self.assert_impalad_log_contains('INFO', err)


# IMPALA-2002: Tests repeated adding/dropping of .jar and .so in the lib cache.
class TestLibCache(TestDdlBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestLibCache, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  def test_create_drop_function(self, vector, unique_database):
    """This will create, run, and drop the same function repeatedly, exercising
    the lib cache mechanism.
    """
    create_fn_stmt = ("create function {0}.f() returns int "
        "location '{1}/libTestUdfs.so' symbol='NoArgs'"
        .format(unique_database, WAREHOUSE))
    select_stmt = ("select {0}.f() from functional.alltypes limit 10"
        .format(unique_database))
    drop_fn_stmt = "drop function %s {0}.f()".format(unique_database)
    self.create_drop_ddl(vector, [create_fn_stmt], [drop_fn_stmt], select_stmt)

  # Run serially because this test inspects global impalad metrics.
  # TODO: The metrics checks could be relaxed to enable running this test in
  # parallel, but that might need a more general wait_for_metric_value().
  @pytest.mark.execute_serially
  def test_create_drop_data_src(self, vector, unique_database):
    """This will create, run, and drop the same data source repeatedly, exercising
    the lib cache mechanism.
    """
    data_src_name = unique_database + "_datasrc"
    create_ds_stmt = ("CREATE DATA SOURCE {0} "
        "LOCATION '{1}/data-sources/test-data-source.jar' "
        "CLASS 'org.apache.impala.extdatasource.AllTypesDataSource' "
        "API_VERSION 'V1'".format(data_src_name, WAREHOUSE))
    create_tbl_stmt = ("CREATE TABLE {0}.data_src_tbl (x int) "
        "PRODUCED BY DATA SOURCE {1}('dummy_init_string')")\
        .format(unique_database, data_src_name)
    drop_ds_stmt = "drop data source %s {0}".format(data_src_name)
    drop_tbl_stmt = "drop table %s {0}.data_src_tbl".format(unique_database)
    select_stmt = "select * from {0}.data_src_tbl limit 1".format(unique_database)
    class_cache_hits_metric = "external-data-source.class-cache.hits"
    class_cache_misses_metric = "external-data-source.class-cache.misses"

    create_stmts = [create_ds_stmt, create_tbl_stmt]
    drop_stmts = [drop_tbl_stmt, drop_ds_stmt]

    # The ImpaladService is used to capture metrics
    service = self.impalad_test_service

    # Initial metric values
    class_cache_hits = service.get_metric_value(class_cache_hits_metric)
    class_cache_misses = service.get_metric_value(class_cache_misses_metric)
    # Test with 1 node so we can check the metrics on only the coordinator
    vector.get_value('exec_option')['num_nodes'] = 1
    num_iterations = 2
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, num_iterations)

    # Check class cache metrics. Shouldn't have any new cache hits, there should be
    # 2 cache misses for every iteration (jar is loaded by both the FE and BE).
    expected_cache_misses = class_cache_misses + (num_iterations * 2)
    service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits)
    service.wait_for_metric_value(class_cache_misses_metric, expected_cache_misses)

    # Test with a table that caches the class
    create_tbl_stmt = ("CREATE TABLE {0}.data_src_tbl (x int) "
        "PRODUCED BY DATA SOURCE {1}('CACHE_CLASS::dummy_init_string')")\
        .format(unique_database, data_src_name)
    create_stmts = [create_ds_stmt, create_tbl_stmt]
    # Run once before capturing metrics because the class already may be cached from
    # a previous test run.
    # TODO: Provide a way to clear the cache
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1)

    # Capture metric values and run again, should hit the cache. Both the FE and BE
    # class lookups should now be hits, so expect exactly 2 more hits and no new misses.
    class_cache_hits = service.get_metric_value(class_cache_hits_metric)
    class_cache_misses = service.get_metric_value(class_cache_misses_metric)
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1)
    service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits + 2)
    service.wait_for_metric_value(class_cache_misses_metric, class_cache_misses)

  def create_drop_ddl(self, vector, create_stmts, drop_stmts, select_stmt,
      num_iterations=3):
    """Helper method to run CREATE/DROP DDL commands repeatedly and exercise the lib
    cache. create_stmts is the list of CREATE statements to be executed in order.
    drop_stmts is the list of DROP statements to be executed in order; each statement
    should have a '%s' placeholder to insert "IF EXISTS" or "". The select_stmt is just
    a single statement to test after executing the CREATE statements.
    TODO: it's hard to tell that the cache is working (i.e. if it did nothing to drop
    the cache, these tests would still pass). Testing that is a bit harder and requires
    us to update the udf binary in the middle.
    """
    self.client.set_configuration(vector.get_value("exec_option"))
    for drop_stmt in drop_stmts:
      self.client.execute(drop_stmt % ("if exists"))
    for i in range(0, num_iterations):
      for create_stmt in create_stmts:
        self.client.execute(create_stmt)
      self.client.execute(select_stmt)
      for drop_stmt in drop_stmts:
        self.client.execute(drop_stmt % (""))