# Patch summary (explanatory preamble; not part of any hunk):
#
# * tests/common/impala_test_suite.py
#     - adds `import subprocess`
#     - adds ImpalaTestSuite.run_stmt_in_hive(), which runs a statement through
#       `beeline`, returns stdout on exit code 0 and raises RuntimeError(stderr)
#       otherwise.
# * tests/metadata/test_hms_integration.py
#     - removes the identical run_stmt_in_hive() from TestHmsIntegration (it is
#       now provided by the ImpalaTestSuite base class).
# * tests/metadata/test_partition_metadata.py
#     - removes the @pytest.mark.execute_serially marker, the TEST_DB / TEST_TBL
#       class constants, setup_method / teardown_method, and the local
#       run_stmt_in_hive() that shelled out to `hive -e`.
#     - both tests now take a `unique_database` argument (presumably a pytest
#       fixture -- confirm in conftest) and use fully qualified table names
#       instead of `use <db>`.
#
# NOTE(review): the line breaks, indentation and blank context lines below were
# reconstructed from the hunk headers' line counts and Python syntax; confirm the
# context-line whitespace against the target files before applying this patch.
diff --git a/tests/common/impala_test_suite.py b/tests/common/impala_test_suite.py
index d6e70c860..35c77df27 100644
--- a/tests/common/impala_test_suite.py
+++ b/tests/common/impala_test_suite.py
@@ -22,6 +22,7 @@ import pytest
 import grp
 import re
 import string
+import subprocess
 import time
 from getpass import getuser
 from functools import wraps
@@ -488,6 +489,25 @@ class ImpalaTestSuite(BaseTestSuite):
     self.hive_client.drop_table(db_name, table_name, True)
     self.hive_client.create_table(table)
 
+  def run_stmt_in_hive(self, stmt):
+    """
+    Run a statement in Hive, returning stdout if successful and throwing
+    RuntimeError(stderr) if not.
+    """
+    call = subprocess.Popen(
+        ['beeline',
+         '--outputformat=csv2',
+         '-u', 'jdbc:hive2://' + pytest.config.option.hive_server2,
+         '-n', getuser(),
+         '-e', stmt],
+        stdout=subprocess.PIPE,
+        stderr=subprocess.PIPE)
+    (stdout, stderr) = call.communicate()
+    call.wait()
+    if call.returncode != 0:
+      raise RuntimeError(stderr)
+    return stdout
+
   @classmethod
   def create_table_info_dimension(cls, exploration_strategy):
     # If the user has specified a specific set of table formats to run against, then
diff --git a/tests/metadata/test_hms_integration.py b/tests/metadata/test_hms_integration.py
index 595f84e56..51b01067e 100644
--- a/tests/metadata/test_hms_integration.py
+++ b/tests/metadata/test_hms_integration.py
@@ -54,25 +54,6 @@ class TestHmsIntegration(ImpalaTestSuite):
     cls.TestMatrix.add_dimension(
         create_uncompressed_text_dimension(cls.get_workload()))
 
-  def run_stmt_in_hive(self, stmt):
-    """
-    Run a statement in Hive, returning stdout if successful and throwing
-    RuntimeError(stderr) if not.
-    """
-    call = subprocess.Popen(
-        ['beeline',
-         '--outputformat=csv2',
-         '-u', 'jdbc:hive2://' + pytest.config.option.hive_server2,
-         '-n', getuser(),
-         '-e', stmt],
-        stdout=subprocess.PIPE,
-        stderr=subprocess.PIPE)
-    (stdout, stderr) = call.communicate()
-    call.wait()
-    if call.returncode != 0:
-      raise RuntimeError(stderr)
-    return stdout
-
   class ImpalaDbWrapper(object):
     """
     A wrapper class for using `with` guards with databases created through
diff --git a/tests/metadata/test_partition_metadata.py b/tests/metadata/test_partition_metadata.py
index 228bec4f9..d48b09f29 100644
--- a/tests/metadata/test_partition_metadata.py
+++ b/tests/metadata/test_partition_metadata.py
@@ -25,15 +25,10 @@ from tests.common.impala_test_suite import *
 from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal
 from tests.util.filesystem_utils import WAREHOUSE
 
-
 # Tests specific to partition metadata.
 # TODO: Split up the DDL tests and move some of the partition-specific tests
 # here.
-@pytest.mark.execute_serially
 class TestPartitionMetadata(ImpalaTestSuite):
-  TEST_DB = 'partition_md'
-  TEST_TBL = 'bulk_part'
-
   @classmethod
   def get_workload(self):
     return 'functional-query'
@@ -48,90 +43,76 @@ class TestPartitionMetadata(ImpalaTestSuite):
         v.get_value('table_format').file_format == 'text' and\
         v.get_value('table_format').compression_codec == 'none')
 
-  def setup_method(self, method):
-    self.cleanup_db(self.TEST_DB)
-    ddl = "create database {0} location '{1}/{0}.db'".format(self.TEST_DB, WAREHOUSE)
-    self.client.execute(ddl)
-
-  def teardown_method(self, method):
-    self.cleanup_db(self.TEST_DB)
-
   @SkipIfLocal.hdfs_client
-  def test_multiple_partitions_same_location(self, vector):
+  def test_multiple_partitions_same_location(self, vector, unique_database):
     """Regression test for IMPALA-597. Verifies Impala is able to properly read
     tables that have multiple partitions pointing to the same location.
     """
-    self.client.execute("use %s" % self.TEST_DB)
-    impala_location = '%s/%s.db/%s' % (WAREHOUSE, self.TEST_DB, self.TEST_TBL)
-    filesystem_client_location = impala_location.split("/")[-1]
+    TBL_NAME = "same_loc_test"
+    FQ_TBL_NAME = unique_database + "." + TBL_NAME
+    TBL_LOCATION = '%s/%s.db/%s' % (WAREHOUSE, unique_database, TBL_NAME)
     # Cleanup any existing data in the table directory.
-    self.filesystem_client.delete_file_dir(filesystem_client_location, recursive=True)
+    self.filesystem_client.delete_file_dir(TBL_NAME, recursive=True)
     # Create the table
-    self.client.execute("create table {0}(i int) partitioned by(j int)"
-        "location '{1}/{2}.db/{0}'".format(self.TEST_TBL, WAREHOUSE, self.TEST_DB))
+    self.client.execute("create table %s (i int) partitioned by(j int) location '%s'"
+        % (FQ_TBL_NAME, TBL_LOCATION))
 
     # Point multiple partitions to the same location and use partition locations that
     # do not contain a key=value path.
-    self.filesystem_client.make_dir(filesystem_client_location + '/p')
+    self.filesystem_client.make_dir(TBL_NAME + '/p')
 
     # Point both partitions to the same location.
-    self.client.execute("alter table %s add partition (j=1) location '%s/p'" %
-        (self.TEST_TBL, impala_location))
-    self.client.execute("alter table %s add partition (j=2) location '%s/p'" %
-        (self.TEST_TBL, impala_location))
+    self.client.execute("alter table %s add partition (j=1) location '%s/p'"
+        % (FQ_TBL_NAME, TBL_LOCATION))
+    self.client.execute("alter table %s add partition (j=2) location '%s/p'"
+        % (FQ_TBL_NAME, TBL_LOCATION))
 
     # Insert some data. This will only update partition j=1 (IMPALA-1480).
-    self.client.execute("insert into table %s partition(j=1) select 1" % self.TEST_TBL)
+    self.client.execute("insert into table %s partition(j=1) select 1" % FQ_TBL_NAME)
     # Refresh to update file metadata of both partitions.
-    self.client.execute("refresh %s" % self.TEST_TBL)
+    self.client.execute("refresh %s" % FQ_TBL_NAME)
 
     # The data will be read twice because each partition points to the same location.
-    data = self.execute_scalar("select sum(i), sum(j) from %s" % self.TEST_TBL)
+    data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
     assert data.split('\t') == ['2', '3']
 
-    self.client.execute("insert into %s partition(j) select 1, 1" % self.TEST_TBL)
-    self.client.execute("insert into %s partition(j) select 1, 2" % self.TEST_TBL)
-    self.client.execute("refresh %s" % self.TEST_TBL)
-    data = self.execute_scalar("select sum(i), sum(j) from %s" % self.TEST_TBL)
+    self.client.execute("insert into %s partition(j) select 1, 1" % FQ_TBL_NAME)
+    self.client.execute("insert into %s partition(j) select 1, 2" % FQ_TBL_NAME)
+    self.client.execute("refresh %s" % FQ_TBL_NAME)
+    data = self.execute_scalar("select sum(i), sum(j) from %s" % FQ_TBL_NAME)
     assert data.split('\t') == ['6', '9']
 
   @SkipIfS3.hive
   @SkipIfIsilon.hive
   @SkipIfLocal.hive
-  def test_partition_metadata_compatibility(self, vector):
+  def test_partition_metadata_compatibility(self, vector, unique_database):
     """Regression test for IMPALA-2048. For partitioned tables, test that when Impala
     updates the partition metadata (e.g. by doing a compute stats), the tables are
     accessible in Hive."""
-    TEST_TBL_HIVE = "part_parquet_tbl_hive"
-    TEST_TBL_IMP = "part_parquet_tbl_impala"
+    FQ_TBL_HIVE = unique_database + ".part_parquet_tbl_hive"
+    FQ_TBL_IMP = unique_database + ".part_parquet_tbl_impala"
     # First case, the table is created in HIVE.
-    self.run_stmt_in_hive("create table %s.%s(a int) partitioned by (x int) "\
-        "stored as parquet" % (self.TEST_DB, TEST_TBL_HIVE))
+    self.run_stmt_in_hive("create table %s (a int) partitioned by (x int) "\
+        "stored as parquet" % FQ_TBL_HIVE)
     self.run_stmt_in_hive("set hive.exec.dynamic.partition.mode=nostrict;"\
-        "insert into %s.%s partition (x) values(1,1)" % (self.TEST_DB, TEST_TBL_HIVE))
-    self.run_stmt_in_hive("select * from %s.%s" % (self.TEST_DB, TEST_TBL_HIVE))
+        "insert into %s partition (x) values(1,1)" % FQ_TBL_HIVE)
+    self.run_stmt_in_hive("select * from %s" % FQ_TBL_HIVE)
     # Load the table in Impala and modify its partition metadata by computing table
     # statistics.
-    self.client.execute("invalidate metadata %s.%s" % (self.TEST_DB, TEST_TBL_HIVE))
-    self.client.execute("compute stats %s.%s" % (self.TEST_DB, TEST_TBL_HIVE))
-    self.client.execute("select * from %s.%s" % (self.TEST_DB, TEST_TBL_HIVE))
+    self.client.execute("invalidate metadata %s" % FQ_TBL_HIVE)
+    self.client.execute("compute stats %s" % FQ_TBL_HIVE)
+    self.client.execute("select * from %s" % FQ_TBL_HIVE)
     # Make sure the table is accessible in Hive
-    self.run_stmt_in_hive("select * from %s.%s" % (self.TEST_DB, TEST_TBL_HIVE))
+    self.run_stmt_in_hive("select * from %s" % FQ_TBL_HIVE)
 
     # Second case, the table is created in Impala
-    self.client.execute("create table %s.%s(a int) partitioned by (x int) "\
-        "stored as parquet" % (self.TEST_DB, TEST_TBL_IMP))
-    self.client.execute("insert into %s.%s partition(x) values(1,1)" % (self.TEST_DB,
-        TEST_TBL_IMP))
+    self.client.execute("create table %s (a int) partitioned by (x int) "\
+        "stored as parquet" % FQ_TBL_IMP)
+    self.client.execute("insert into %s partition(x) values(1,1)" % FQ_TBL_IMP)
     # Make sure the table is accessible in HIVE
-    self.run_stmt_in_hive("select * from %s.%s" % (self.TEST_DB, TEST_TBL_IMP))
+    self.run_stmt_in_hive("select * from %s" % FQ_TBL_IMP)
     # Compute table statistics
-    self.client.execute("compute stats %s.%s" % (self.TEST_DB, TEST_TBL_IMP))
-    self.client.execute("select * from %s.%s" % (self.TEST_DB, TEST_TBL_IMP))
+    self.client.execute("compute stats %s" % FQ_TBL_IMP)
+    self.client.execute("select * from %s" % FQ_TBL_IMP)
     # Make sure the table remains accessible in HIVE
-    self.run_stmt_in_hive("select * from %s.%s" % (self.TEST_DB, TEST_TBL_IMP))
-
-  def run_stmt_in_hive(self, stmt):
-    hive_ret = call(['hive', '-e', stmt])
-    assert hive_ret == 0, 'Error executing statement %s in Hive' % stmt
-
+    self.run_stmt_in_hive("select * from %s" % FQ_TBL_IMP)