# Licensed to the Apache Software Foundation (ASF) under one # or more contributor license agreements. See the NOTICE file # distributed with this work for additional information # regarding copyright ownership. The ASF licenses this file # to you under the Apache License, Version 2.0 (the # "License"); you may not use this file except in compliance # with the License. You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. from __future__ import absolute_import, division, print_function from builtins import range import pytest import re import threading import time from multiprocessing.pool import ThreadPool from multiprocessing import TimeoutError from tests.common.custom_cluster_test_suite import CustomClusterTestSuite from tests.common.impala_connection import ( ERROR, FINISHED, IMPALA_CONNECTION_EXCEPTION) from tests.util.shell_util import dump_server_stacktraces class TestConcurrentDdls(CustomClusterTestSuite): """Test concurrent DDLs with invalidate metadata TODO: optimize the time dropping the unique_database at the end which dominants test time. It currently takes >1m. Most of the time spent in HMS.""" def _make_per_impalad_args(local_catalog_enabled): assert isinstance(local_catalog_enabled, list) args = ['--use_local_catalog=%s' % str(e).lower() for e in local_catalog_enabled] return "--per_impalad_args=" + ";".join(args) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=false", catalogd_args="--catalog_topic_mode=full") def test_ddls_with_invalidate_metadata(self, unique_database): self._run_ddls_with_invalidation(unique_database, sync_ddl=False) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=false", catalogd_args="--catalog_topic_mode=full --max_wait_time_for_sync_ddl_s=10") def test_ddls_with_invalidate_metadata_sync_ddl(self, unique_database): self._run_ddls_with_invalidation(unique_database, sync_ddl=True) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( start_args=_make_per_impalad_args([True, False]), catalogd_args="--catalog_topic_mode=mixed") def test_mixed_catalog_ddls_with_invalidate_metadata(self, unique_database): self._run_ddls_with_invalidation(unique_database, sync_ddl=False) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( start_args=_make_per_impalad_args([True, False]), catalogd_args="--catalog_topic_mode=mixed --max_wait_time_for_sync_ddl_s=10") def test_mixed_catalog_ddls_with_invalidate_metadata_sync_ddl(self, unique_database): self._run_ddls_with_invalidation(unique_database, sync_ddl=True) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", catalogd_args="--catalog_topic_mode=minimal") def test_local_catalog_ddls_with_invalidate_metadata(self, unique_database): self._run_ddls_with_invalidation(unique_database, sync_ddl=False) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", catalogd_args="--catalog_topic_mode=minimal --max_wait_time_for_sync_ddl_s=10") def test_local_catalog_ddls_with_invalidate_metadata_sync_ddl(self, unique_database): self._run_ddls_with_invalidation(unique_database, sync_ddl=True) @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", catalogd_args="--catalog_topic_mode=minimal " "--reset_metadata_lock_duration_ms=50 " "--debug_actions=reset_metadata_loop_unlocked:SLEEP@50") def test_local_catalog_ddls_with_invalidate_metadata_unlock_gap(self, unique_database): """Test with 50ms write unlock gap.""" self._run_ddls_with_invalidation(unique_database, sync_ddl=False) def _run_ddls_with_invalidation(self, db, sync_ddl=False): """Test INVALIDATE METADATA with concurrent DDLs to see if any queries hang""" test_self = self class ThreadLocalClient(threading.local): def __init__(self): self.client = test_self.create_impala_client() if sync_ddl: self.client.set_configuration_option('sync_ddl', 'true') pool = ThreadPool(processes=8) tls = ThreadLocalClient() def run_ddls(i): # Add a sleep so global INVALIDATE has more chance to run concurrently with other # DDLs. time.sleep(i % 5) tbl_name = db + ".test_" + str(i) # func_name = "f_" + str(i) for query in [ # alter database operations # TODO (IMPALA-9532): Uncomment the alter database operations # "comment on database %s is 'test-concurrent-ddls'" % db, # "alter database %s set owner user `test-user`" % db, # "create function %s.%s() returns int location '%s/libTestUdfs.so' \ # symbol='NoArgs'" % (db, func_name, WAREHOUSE), # "drop function if exists %s.%s()" % (db, func_name), # Create a partitioned and unpartitioned table "create table %s (i int)" % tbl_name, "create table %s_part (i int) partitioned by (j int)" % tbl_name, # Below queries could fail if running with invalidate metadata concurrently "alter table %s_part add partition (j=1)" % tbl_name, "alter table %s_part add partition (j=2)" % tbl_name, "alter table {0} rename to {0}_2".format(tbl_name), "alter table {0}_part rename to {0}_part2".format(tbl_name), "alter table {0}_2 rename to {0}".format(tbl_name), "alter table {0}_part2 rename to {0}_part".format(tbl_name), "invalidate metadata %s_part" % tbl_name, "refresh %s" % tbl_name, "refresh %s_part" % tbl_name, "insert overwrite table %s select int_col from " "functional.alltypestiny" % tbl_name, "insert overwrite table %s_part partition(j=1) " "values (1), (2), (3), (4), (5)" % tbl_name, "insert overwrite table %s_part partition(j=2) " "values (1), (2), (3), (4), (5)" % tbl_name ]: # Running concurrent with INVALIDATE METADATA can raise an exception. These are # safe to retry, so do that until we get a success. while True: try: handle = tls.client.execute_async(query) is_finished = tls.client.wait_for_finished_timeout(handle, timeout=120) assert is_finished, "Query timeout(120s): " + query tls.client.close_query(handle) # Success, next case. break except IMPALA_CONNECTION_EXCEPTION as e: err = str(e) if self.is_transient_error(err): # Retry the query. continue assert self.is_acceptable_error(err, sync_ddl), err self.execute_query_expect_success(tls.client, "invalidate metadata") return True # Run DDLs in single thread first. Some bugs causing DDL hangs can be hidden when run # with concurrent DDLs. res = pool.apply_async(run_ddls, (0,)) try: res.get(timeout=100) except TimeoutError: dump_server_stacktraces() assert False, "Single thread execution timeout!" # Run DDLs with invalidate metadata in parallel NUM_ITERS = 16 worker = [None] * (NUM_ITERS + 1) for i in range(1, NUM_ITERS + 1): worker[i] = pool.apply_async(run_ddls, (i,)) # INSERT with sync_ddl=true could hit IMPALA-9135 and hanging infinitely if there are # no more catalog updates, e.g. all other threads have finished. This leads to # timeout in this test. As a workaround, run a thread to keep creating new tables # to trigger new catalog updates. stop = False if sync_ddl: def create_tbls(): i = 0 while not stop: tls.client.execute("create table {}.tmp_tbl{} (i int)".format(db, i)) time.sleep(10) i += 1 pool.apply_async(create_tbls) for i in range(1, NUM_ITERS + 1): try: worker[i].get(timeout=100) except TimeoutError: stop = True dump_server_stacktraces() assert False, "Timeout in thread run_ddls(%d)" % i stop = True @classmethod def is_transient_error(cls, err): # DDL/DMLs may fail if running with invalidate metadata concurrently, since in-flight # table loadings can't finish if the target table is changed (e.g. reset to unloaded # state). See more in CatalogOpExecutor.getExistingTable(). if "CatalogException: Table" in err and \ "was modified while operation was in progress, aborting execution" in err: return True return False @classmethod def is_acceptable_error(cls, err, sync_ddl): # TODO: Consider remove this case after IMPALA-9135 is fixed. if sync_ddl: if "Couldn't retrieve the catalog topic version for the SYNC_DDL operation" in err\ and ("The operation has been successfully executed but its effects may have not " "been broadcast to all the coordinators.") in err: return True return False @pytest.mark.execute_serially @CustomClusterTestSuite.with_args( impalad_args="--use_local_catalog=true", catalogd_args="--catalog_topic_mode=minimal") def test_concurrent_invalidate_metadata(self): """Test concurrent requests for INVALIDATE METADATA not hang""" test_self = self class ThreadLocalClient(threading.local): def __init__(self): self.client = test_self.create_impala_client() tls = ThreadLocalClient() def run_invalidate_metadata(): # TODO(IMPALA-9123): Detect hangs here instead of using pytest.mark.timeout self.execute_query_expect_success(tls.client, "invalidate metadata") NUM_ITERS = 20 pool = ThreadPool(processes=2) for i in range(NUM_ITERS): # Run two INVALIDATE METADATA commands in parallel r1 = pool.apply_async(run_invalidate_metadata) r2 = pool.apply_async(run_invalidate_metadata) try: r1.get(timeout=60) r2.get(timeout=60) except TimeoutError: dump_server_stacktraces() assert False, "INVALIDATE METADATA timeout in 60s!" pool.terminate() @CustomClusterTestSuite.with_args( catalogd_args="--enable_incremental_metadata_updates=true") def test_concurrent_invalidate_metadata_with_refresh(self, unique_database): # Create a wide table with some partitions tbl = unique_database + ".wide_tbl" create_stmt = "create table {} (".format(tbl) for i in range(600): create_stmt += "col{} int, ".format(i) create_stmt += "col600 int) partitioned by (p int) stored as textfile" self.execute_query(create_stmt) for i in range(10): self.execute_query("alter table {} add partition (p={})".format(tbl, i)) refresh_stmt = "refresh " + tbl refresh_handle = self.client.execute_async(refresh_stmt) for i in range(10): self.execute_query("invalidate metadata " + tbl) # Always keep a concurrent REFRESH statement running refresh_state = self.client.get_impala_exec_state(refresh_handle) if refresh_state == FINISHED or ERROR: refresh_handle = self.client.execute_async(refresh_stmt)