# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

from __future__ import absolute_import, division, print_function
from builtins import map, range
import getpass
import itertools
import pytest
import re
import time
import threading
from multiprocessing.pool import ThreadPool
from multiprocessing import TimeoutError

from copy import deepcopy
from tests.metadata.test_ddl_base import TestDdlBase
from tests.common.environ import (HIVE_MAJOR_VERSION)
from tests.common.file_utils import create_table_from_orc
from tests.common.impala_connection import (
    FINISHED, INITIALIZED, IMPALA_CONNECTION_EXCEPTION, PENDING, RUNNING)
from tests.common.impala_test_suite import LOG
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import (
    SkipIfDockerizedCluster,
    SkipIfFS,
    SkipIfHive2,
    SkipIfKudu,
    SkipIfLocal)
from tests.common.test_dimensions import create_single_exec_option_dimension
from tests.common.test_dimensions import (create_exec_option_dimension,
    create_client_protocol_dimension, create_exec_option_dimension_from_dict)
from tests.common.test_vector import ImpalaTestDimension
from tests.util.filesystem_utils import (
    get_fs_path,
    WAREHOUSE,
    WAREHOUSE_PREFIX,
    IS_HDFS,
    IS_S3,
    IS_ADLS,
    IS_OZONE,
    FILESYSTEM_NAME)
from tests.common.impala_cluster import ImpalaCluster
from tests.util.filesystem_utils import FILESYSTEM_PREFIX
from tests.util.parse_util import parse_duration_string_ms
from tests.util.shell_util import dump_server_stacktraces
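

# Trash-path layouts differ by filesystem. Illustrative example, assuming user
# "jenkins" and database "d1": on HDFS a dropped table lands under
# /user/jenkins/.Trash/Current/test-warehouse/d1.db/..., while on Ozone the
# trash directory lives inside the bucket itself, e.g.
# /test-warehouse/.Trash/jenkins/Current/d1.db/... (newer Ozone versions no
# longer include WAREHOUSE_PREFIX in this path).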
def get_trash_path(bucket, path):
  if IS_OZONE:
    return get_fs_path('/{0}/.Trash/{1}/Current/{2}'.format(
        bucket, getpass.getuser(), path))
  return '/user/{0}/.Trash/Current/{1}/{2}'.format(getpass.getuser(), bucket, path)


# Validates DDL statements (create, drop)
class TestDdlStatements(TestDdlBase):
  @SkipIfLocal.hdfs_client
  def test_drop_table_with_purge(self, unique_database):
    """This test checks if the table data is permanently deleted in
    DROP TABLE <tbl> PURGE queries"""
    self.client.execute("create table {0}.t1(i int)".format(unique_database))
    self.client.execute("create table {0}.t2(i int)".format(unique_database))
    # Create sample test data files under the table directories
    dbpath = "{0}/{1}.db".format(WAREHOUSE, unique_database)
    self.filesystem_client.create_file("{}/t1/t1.txt".format(dbpath), file_data='t1')
    self.filesystem_client.create_file("{}/t2/t2.txt".format(dbpath), file_data='t2')
    # Drop the table (without purge) and make sure it exists in trash
    self.client.execute("drop table {0}.t1".format(unique_database))
    assert not self.filesystem_client.exists("{}/t1/t1.txt".format(dbpath))
    assert not self.filesystem_client.exists("{}/t1/".format(dbpath))
    trash = get_trash_path("test-warehouse", unique_database + ".db")
    assert self.filesystem_client.exists("{}/t1/t1.txt".format(trash))
    assert self.filesystem_client.exists("{}/t1".format(trash))
    # Drop the table (with purge) and make sure it doesn't exist in trash
    self.client.execute("drop table {0}.t2 purge".format(unique_database))
    if not IS_S3 and not IS_ADLS:
      # In S3, deletes are eventual. So even though we dropped the table, the files
      # belonging to this table may still be visible for some unbounded time. This
      # happens only with PURGE. A regular DROP TABLE is just a copy of files which is
      # consistent.
      # The ADLS Python client is not strongly consistent, so these files may still be
      # visible after a DROP. (Remove after IMPALA-5335 is resolved)
      assert not self.filesystem_client.exists("{}/t2/".format(dbpath))
      assert not self.filesystem_client.exists("{}/t2/t2.txt".format(dbpath))
      assert not self.filesystem_client.exists("{}/t2/t2.txt".format(trash))
      assert not self.filesystem_client.exists("{}/t2".format(trash))
    # Create an external table t3 and run the same test as above. Make
    # sure the data is not deleted
    self.filesystem_client.make_dir("{}/data_t3/".format(dbpath), permission=777)
    self.filesystem_client.create_file(
        "{}/data_t3/data.txt".format(dbpath), file_data='100')
    self.client.execute("create external table {0}.t3(i int) stored as "
        "textfile location \'{1}/data_t3\'".format(unique_database, dbpath))
    self.client.execute("drop table {0}.t3 purge".format(unique_database))
    assert self.filesystem_client.exists("{}/data_t3/data.txt".format(dbpath))
    self.filesystem_client.delete_file_dir("{}/data_t3".format(dbpath), recursive=True)

  @SkipIfFS.eventually_consistent
  @SkipIfLocal.hdfs_client
  def test_drop_cleans_hdfs_dirs(self, unique_database):
    self.client.execute('use default')
    # Verify the db directory exists
    assert self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))

    self.client.execute("create table {0}.t1(i int)".format(unique_database))
    # Verify the table directory exists
    assert self.filesystem_client.exists(
        "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))

    # Dropping the table removes the table's directory and preserves the db's directory
    self.client.execute("drop table {0}.t1".format(unique_database))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))
    assert self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))

    # Dropping the db removes the db's directory
    self.client.execute("drop database {0}".format(unique_database))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))

    # Dropping the db using "cascade" removes all tables' and db's directories
    # but keeps the external tables' directory
    self._create_db(unique_database)
    self.client.execute("create table {0}.t1(i int)".format(unique_database))
    self.client.execute("create table {0}.t2(i int)".format(unique_database))
    self.client.execute("create external table {0}.t3(i int) "
        "location '{1}/{0}/t3/'".format(unique_database, WAREHOUSE))
    self.client.execute("drop database {0} cascade".format(unique_database))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))
    assert not self.filesystem_client.exists(
        "{1}/{0}.db/t2/".format(unique_database, WAREHOUSE))
    assert self.filesystem_client.exists(
        "{1}/{0}/t3/".format(unique_database, WAREHOUSE))
    self.filesystem_client.delete_file_dir(
        "{1}/{0}/t3/".format(unique_database, WAREHOUSE), recursive=True)
    assert not self.filesystem_client.exists(
        "{1}/{0}/t3/".format(unique_database, WAREHOUSE))
    # Re-create database to make unique_database teardown succeed.
    self._create_db(unique_database)

  @SkipIfFS.eventually_consistent
  @SkipIfLocal.hdfs_client
  def test_truncate_cleans_hdfs_files(self, unique_database):
    # Verify the db directory exists
    assert self.filesystem_client.exists(
        "{1}/{0}.db/".format(unique_database, WAREHOUSE))

    self.client.execute("create table {0}.t1(i int)".format(unique_database))
    # Verify the table directory exists
    assert self.filesystem_client.exists(
        "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))

    try:
      # If we're testing S3, we want the staging directory to be created.
      self.client.execute("set s3_skip_insert_staging=false")
      # Should have created one data file in the table's dir, plus the insert
      # staging directory, hence the two entries.
      self.client.execute("insert into {0}.t1 values (1)".format(unique_database))
      assert len(self.filesystem_client.ls(
          "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))) == 2

      # Truncating the table removes the data files and the staging directory
      self.client.execute("truncate table {0}.t1".format(unique_database))
      assert len(self.filesystem_client.ls(
          "{1}/{0}.db/t1/".format(unique_database, WAREHOUSE))) == 0

      self.client.execute(
          "create table {0}.t2(i int) partitioned by (p int)".format(unique_database))
      # Verify the table directory exists
      assert self.filesystem_client.exists(
          "{1}/{0}.db/t2/".format(unique_database, WAREHOUSE))

      # Should have created the partition dir, which should contain exactly one file
      self.client.execute(
          "insert into {0}.t2 partition(p=1) values (1)".format(unique_database))
      assert len(self.filesystem_client.ls(
          "{1}/{0}.db/t2/p=1".format(unique_database, WAREHOUSE))) == 1

      # Truncating the table removes the data files and preserves the partition's
      # directory
      self.client.execute("truncate table {0}.t2".format(unique_database))
      assert self.filesystem_client.exists(
          "{1}/{0}.db/t2/p=1".format(unique_database, WAREHOUSE))
      assert len(self.filesystem_client.ls(
          "{1}/{0}.db/t2/p=1".format(unique_database, WAREHOUSE))) == 0
    finally:
      # Reset to its default value.
      self.client.execute("set s3_skip_insert_staging=true")

  @SkipIfFS.incorrent_reported_ec
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_truncate_table(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/truncate-table', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_database(self, vector, unique_database):
    # The unique_database fixture provides the .test file a unique database name,
    # which allows us to run this test in parallel with others.
    self.run_test_case('QueryTest/create-database', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  def test_comment_on_database(self, unique_database):
    comment = self._get_db_comment(unique_database)
    assert '' == comment

    self.client.execute("comment on database {0} is 'comment'".format(unique_database))
    comment = self._get_db_comment(unique_database)
    assert 'comment' == comment

    self.client.execute("comment on database {0} is ''".format(unique_database))
    comment = self._get_db_comment(unique_database)
    assert '' == comment

    self.client.execute("comment on database {0} is '\\'comment\\''".format(
        unique_database))
    comment = self._get_db_comment(unique_database)
    assert "\\'comment\\'" == comment

    self.client.execute("comment on database {0} is null".format(unique_database))
    comment = self._get_db_comment(unique_database)
    assert '' == comment

  def test_alter_database_set_owner(self, unique_database):
    self.client.execute("alter database {0} set owner user foo_user".format(
        unique_database))
    properties = self._get_db_owner_properties(unique_database)
    assert len(properties) == 1
    assert {'foo_user': 'USER'} == properties

    self.client.execute("alter database {0} set owner role foo_role".format(
        unique_database))
    properties = self._get_db_owner_properties(unique_database)
    assert len(properties) == 1
    assert {'foo_role': 'ROLE'} == properties

  def test_metadata_after_alter_database(self, unique_database):
    self.client.execute("create table {0}.tbl (i int)".format(unique_database))
    self.client.execute("create function {0}.f() returns int "
        "location '{1}/libTestUdfs.so' symbol='NoArgs'"
        .format(unique_database, WAREHOUSE))
    self.client.execute("alter database {0} set owner user foo_user".format(
        unique_database))
    table_names = self.client.execute("show tables in {0}".format(
        unique_database)).get_data()
    assert "tbl" == table_names
    func_names = self.client.execute("show functions in {0}".format(
        unique_database)).get_data()
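    # SHOW FUNCTIONS output is tab-separated; the expected row below is: return
    # type (INT), signature (f()), binary type (NATIVE) and whether the
    # function is persistent (true).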
assert "INT\tf()\tNATIVE\ttrue" == func_names
|
|
|
|
def test_alter_table_set_owner(self, unique_database):
|
|
table_name = "{0}.test_owner_tbl".format(unique_database)
|
|
self.client.execute("create table {0}(i int)".format(table_name))
|
|
self.client.execute("alter table {0} set owner user foo_user".format(table_name))
|
|
owner = self._get_table_or_view_owner(table_name)
|
|
assert ('foo_user', 'USER') == owner
|
|
|
|
self.client.execute("alter table {0} set owner role foo_role".format(table_name))
|
|
owner = self._get_table_or_view_owner(table_name)
|
|
assert ('foo_role', 'ROLE') == owner
|
|
|
|
def test_alter_view_set_owner(self, unique_database):
|
|
view_name = "{0}.test_owner_tbl".format(unique_database)
|
|
self.client.execute("create view {0} as select 1".format(view_name))
|
|
self.client.execute("alter view {0} set owner user foo_user".format(view_name))
|
|
owner = self._get_table_or_view_owner(view_name)
|
|
assert ('foo_user', 'USER') == owner
|
|
|
|
self.client.execute("alter view {0} set owner role foo_role".format(view_name))
|
|
owner = self._get_table_or_view_owner(view_name)
|
|
assert ('foo_role', 'ROLE') == owner
|
|
|
|

  # There is a query in QueryTest/create-table that references nested types, which is
  # not supported if old joins and aggs are enabled. Since we do not get any meaningful
  # additional coverage by running a DDL test under the old aggs and joins, it can be
  # skipped.
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/create-table', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfFS.incorrent_reported_ec
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table_like_table(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/create-table-like-table', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table_like_file(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/create-table-like-file', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfHive2.orc
  @SkipIfFS.hive
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table_like_file_orc(self, vector, unique_database):
    COMPLEXTYPETBL_PATH = 'test-warehouse/managed/functional_orc_def.db/' \
        'complextypestbl_orc_def/'
    base_dir = list(filter(lambda s: s.startswith('base'),
        self.filesystem_client.ls(COMPLEXTYPETBL_PATH)))[0]
    bucket_file = list(filter(lambda s: s.startswith('bucket'),
        self.filesystem_client.ls(COMPLEXTYPETBL_PATH + base_dir)))[0]
    vector.get_value('exec_option')['abort_on_error'] = False
    create_table_from_orc(self.client, unique_database,
        'timestamp_with_local_timezone')
    self.run_test_case('QueryTest/create-table-like-file-orc', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector),
        test_file_vars={
            '$TRANSACTIONAL_COMPLEXTYPESTBL_FILE':
            FILESYSTEM_PREFIX + '/' + COMPLEXTYPETBL_PATH + base_dir + '/' + bucket_file})

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_create_table_as_select(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/create-table-as-select', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  @SkipIfKudu.no_hybrid_clock()
  def test_create_kudu(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    vector.get_value('exec_option')['kudu_read_mode'] = "READ_AT_SNAPSHOT"
    self.run_test_case('QueryTest/kudu_create', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  def test_comment_on_table(self, unique_database):
    table = '{0}.comment_table'.format(unique_database)
    self.client.execute("create table {0} (i int)".format(table))

    comment = self._get_table_or_view_comment(table)
    assert comment is None

    self.client.execute("comment on table {0} is 'comment'".format(table))
    comment = self._get_table_or_view_comment(table)
    assert "comment" == comment

    self.client.execute("comment on table {0} is ''".format(table))
    comment = self._get_table_or_view_comment(table)
    assert "" == comment

    self.client.execute("comment on table {0} is '\\'comment\\''".format(table))
    comment = self._get_table_or_view_comment(table)
    assert "\\\\'comment\\\\'" == comment

    self.client.execute("comment on table {0} is null".format(table))
    comment = self._get_table_or_view_comment(table)
    assert comment is None

  def test_comment_on_view(self, unique_database):
    view = '{0}.comment_view'.format(unique_database)
    self.client.execute("create view {0} as select 1".format(view))

    comment = self._get_table_or_view_comment(view)
    assert comment is None

    self.client.execute("comment on view {0} is 'comment'".format(view))
    comment = self._get_table_or_view_comment(view)
    assert "comment" == comment

    self.client.execute("comment on view {0} is ''".format(view))
    comment = self._get_table_or_view_comment(view)
    assert "" == comment

    self.client.execute("comment on view {0} is '\\'comment\\''".format(view))
    comment = self._get_table_or_view_comment(view)
    assert "\\\\'comment\\\\'" == comment

    self.client.execute("comment on view {0} is null".format(view))
    comment = self._get_table_or_view_comment(view)
    assert comment is None

  def test_comment_on_column(self, unique_database):
    table = "{0}.comment_table".format(unique_database)
    self.client.execute("create table {0} (i int) partitioned by (j int)".format(table))

    comment = self._get_column_comment(table, 'i')
    assert '' == comment

    # Updating comment on a regular column.
    self.client.execute("comment on column {0}.i is 'comment 1'".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "comment 1" == comment

    # Updating comment on a partition column.
    self.client.execute("comment on column {0}.j is 'comment 2'".format(table))
    comment = self._get_column_comment(table, 'j')
    assert "comment 2" == comment

    self.client.execute("comment on column {0}.i is ''".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is '\\'comment\\''".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "\\'comment\\'" == comment

    self.client.execute("comment on column {0}.i is null".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    view = "{0}.comment_view".format(unique_database)
    self.client.execute("create view {0}(i) as select 1".format(view))

    comment = self._get_column_comment(view, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is 'comment'".format(view))
    comment = self._get_column_comment(view, 'i')
    assert "comment" == comment

    self.client.execute("comment on column {0}.i is ''".format(view))
    comment = self._get_column_comment(view, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is '\\'comment\\''".format(view))
    comment = self._get_column_comment(view, 'i')
    assert "\\'comment\\'" == comment

    self.client.execute("comment on column {0}.i is null".format(view))
    comment = self._get_column_comment(view, 'i')
    assert "" == comment

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_sync_ddl_drop(self, unique_database):
    """Verifies the catalog gets updated properly when dropping objects with sync_ddl
    enabled"""
    self.client.set_configuration({'sync_ddl': 1})
    # Drop the database immediately after creation (within a statestore heartbeat) and
    # verify the catalog gets updated properly.
    self.client.execute("drop database {0}".format(unique_database))
    assert unique_database not in self.all_db_names()
    # Re-create database to make unique_database teardown succeed.
    self._create_db(unique_database)

  # TODO: don't use hdfs_client
  @SkipIfLocal.hdfs_client
  @SkipIfFS.incorrent_reported_ec
  @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2)
  def test_alter_table(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False

    # Create an unpartitioned table to get a filesystem directory that does not
    # use the (key=value) format. The directory is automatically cleaned up
    # by the unique_database fixture.
    self.client.execute("create table {0}.part_data (i int)".format(unique_database))
    dbpath = "{1}/{0}.db".format(unique_database, WAREHOUSE)
    assert self.filesystem_client.exists("{}/part_data".format(dbpath))
    self.filesystem_client.create_file(
        "{}/part_data/data.txt".format(dbpath), file_data='1984')
    self.run_test_case('QueryTest/alter-table', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfFS.hdfs_caching
  @SkipIfLocal.hdfs_client
  @UniqueDatabase.parametrize(sync_ddl=True, num_dbs=2)
  def test_alter_table_hdfs_caching(self, vector, unique_database):
    self.run_test_case('QueryTest/alter-table-hdfs-caching', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_alter_set_column_stats(self, vector, unique_database):
    self.run_test_case('QueryTest/alter-table-set-column-stats', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  # Run serially as alter waits for the catalog to catch up before marking "DDL
  # finished", so we don't have a good way to confirm the alter timeline if the
  # catalog gets delayed.
  @pytest.mark.execute_serially
  def test_alter_table_rename_independent(self, vector, unique_database):
    """Tests that two concurrent alter table renames do not block each other."""

    def table_name(i):
      return "{}.tbl_{}".format(unique_database, i)

    def alter(i, j):
      return "alter table {} rename to {}".format(table_name(i), table_name(j))
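
    # The runtime profile is expected to contain a single line such as
    # "Got catalog version read lock: 2s417ms" (duration format illustrative);
    # the duration is parsed and converted to milliseconds.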
    def get_read_lock_duration_ms(profile):
      read_lock_durations = re.findall(r"Got catalog version read lock: [^ ]*", profile)
      assert len(read_lock_durations) == 1
      return parse_duration_string_ms(read_lock_durations[0].split(" ")[-1])

    self.client.execute("create table {} (i int)".format(table_name(1)))
    self.client.execute("create table {} (i int)".format(table_name(2)))
    # Ensure loading metadata is not a factor in alter execution time.
    self.client.execute("describe {}".format(table_name(1)))
    self.client.execute("describe {}".format(table_name(2)))

    new_vector = deepcopy(vector)
    new_vector.get_value('exec_option')['debug_action'] = \
        "catalogd_table_rename_delay:SLEEP@5000"
    with self.create_impala_client_from_vector(new_vector) as client1, \
        self.create_impala_client_from_vector(new_vector) as client2:
      start = time.time()
      handle1 = client1.execute_async(alter(1, 3))
      handle2 = client2.execute_async(alter(2, 4))
      assert client1.wait_for_finished_timeout(handle1, timeout=15)
      assert client2.wait_for_finished_timeout(handle2, timeout=15)
      assert time.time() - start < 15

      profile1 = client1.get_runtime_profile(handle1)
      assert get_read_lock_duration_ms(profile1) < 5000
      profile2 = client2.get_runtime_profile(handle2)
      assert get_read_lock_duration_ms(profile2) < 5000

      client1.close_query(handle1)
      client2.close_query(handle2)

  @UniqueDatabase.parametrize(num_dbs=2)
  def test_concurrent_alter_table_rename(self, vector, unique_database):
    test_self = self

    class ThreadLocalClient(threading.local):
      def __init__(self):
        self.client = test_self.create_impala_client_from_vector(vector)

    pool = ThreadPool(processes=8)
    tlc = ThreadLocalClient()

    def run_rename(i):
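      # Vary behavior by index bits: even i runs with SYNC_DDL enabled, i % 4
      # picks partitioned vs. unpartitioned tables, and i % 8 picks renames
      # within the same db vs. moves to the second database (<db>2).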
      if i % 2 == 0:
        tlc.client.set_configuration_option("sync_ddl", "1")
      is_partitioned = i % 4 < 2
      tbl_name = "{}.tbl_{}".format(unique_database, i)
      tlc.client.execute("create table {}(i int){}".format(
          tbl_name, "partitioned by(p int)" if is_partitioned else ""))
      if i % 8 < 4:
        # Rename inside the same db
        new_tbl_name = tbl_name + "_new"
      else:
        # Move to another db
        new_tbl_name = "{}2.tbl_{}".format(unique_database, i)
      stmts = [
          "alter table {} rename to {}".format(tbl_name, new_tbl_name),
          "alter table {} rename to {}".format(new_tbl_name, tbl_name),
      ]
      # Move the table back and forth in several rounds
      for _ in range(4):
        for query in stmts:
          # Run the query asynchronously so a hung statement cannot block this thread
          handle = tlc.client.execute_async(query)
          is_finished = tlc.client.wait_for_finished_timeout(handle, timeout=60)
          assert is_finished, "Query timeout(60s): " + query
          tlc.client.close_query(handle)
      return True

    # Run renames in parallel
    NUM_ITERS = 16
    worker = [None] * (NUM_ITERS + 1)
    for i in range(1, NUM_ITERS + 1):
      worker[i] = pool.apply_async(run_rename, (i,))
    for i in range(1, NUM_ITERS + 1):
      try:
        assert worker[i].get(timeout=100)
      except TimeoutError:
        dump_server_stacktraces()
        assert False, "Timeout in thread run_rename(%d)" % i

  @SkipIfFS.hbase
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_alter_hbase_set_column_stats(self, vector, unique_database):
    self.run_test_case('QueryTest/alter-hbase-table-set-column-stats', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfLocal.hdfs_client
  def test_drop_partition_with_purge(self, unique_database):
    """Verifies that alter <tbl> drop partition purge actually skips trash"""
    self.client.execute(
        "create table {0}.t1(i int) partitioned by (j int)".format(unique_database))
    # Add two partitions (j=1) and (j=2) to table t1
    self.client.execute("alter table {0}.t1 add partition(j=1)".format(unique_database))
    self.client.execute("alter table {0}.t1 add partition(j=2)".format(unique_database))
    dbpath = "{1}/{0}.db".format(unique_database, WAREHOUSE)
    self.filesystem_client.create_file("{}/t1/j=1/j1.txt".format(dbpath), file_data='j1')
    self.filesystem_client.create_file("{}/t1/j=2/j2.txt".format(dbpath), file_data='j2')
    # Drop the partition (j=1) without purge and make sure it exists in trash
    self.client.execute("alter table {0}.t1 drop partition(j=1)".format(unique_database))
    assert not self.filesystem_client.exists("{}/t1/j=1/j1.txt".format(dbpath))
    assert not self.filesystem_client.exists("{}/t1/j=1".format(dbpath))
    trash = get_trash_path("test-warehouse", unique_database + ".db")
    assert self.filesystem_client.exists('{}/t1/j=1/j1.txt'.format(trash))
    assert self.filesystem_client.exists('{}/t1/j=1'.format(trash))
    # Drop the partition (with purge) and make sure it doesn't exist in trash
    self.client.execute("alter table {0}.t1 drop partition(j=2) purge".format(
        unique_database))
    if not IS_S3 and not IS_ADLS:
      # In S3, deletes are eventual. So even though we dropped the partition, the files
      # belonging to this partition may still be visible for some unbounded time. This
      # happens only with PURGE. A regular DROP TABLE is just a copy of files which is
      # consistent.
      # The ADLS Python client is not strongly consistent, so these files may still be
      # visible after a DROP. (Remove after IMPALA-5335 is resolved)
      assert not self.filesystem_client.exists("{}/t1/j=2/j2.txt".format(dbpath))
      assert not self.filesystem_client.exists("{}/t1/j=2".format(dbpath))
      assert not self.filesystem_client.exists('{}/t1/j=2/j2.txt'.format(trash))
      assert not self.filesystem_client.exists('{}/t1/j=2'.format(trash))

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_views_ddl(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/views-ddl', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @UniqueDatabase.parametrize()
  def test_view_hints(self, unique_database):
    # Test that plan hints are stored in the view's comment field; this should work
    # regardless of how Hive formats the output. Getting this to work with the
    # automated test case runner is rather difficult, so verify directly. There
    # should be two of each join hint: one for the original text, one for the
    # expanded text.
    self.client.execute("""
        create view {0}.hints_test as
        select /* +straight_join */ a.* from functional.alltypestiny a
        inner join /* +broadcast */ functional.alltypes b on a.id = b.id
        inner join /* +shuffle */ functional.alltypessmall c on b.id = c.id
        """.format(unique_database))
    results = self.execute_query("describe formatted %s.hints_test" % unique_database)
    sj, bc, shuf = 0, 0, 0
    for row in results.data:
      sj += '-- +straight_join' in row
      bc += '-- +broadcast' in row
      shuf += '-- +shuffle' in row
    assert sj == 2
    assert bc == 2
    assert shuf == 2

    # Test querying the hinted view.
    results = self.execute_query("select count(*) from %s.hints_test" % unique_database)
    assert results.success
    assert len(results.data) == 1
    assert results.data[0] == '8'

    # Test the plan to make sure hints were applied correctly
    plan = self.execute_query("explain select * from %s.hints_test" % unique_database,
        query_options={'explain_level': 0})
    plan_match = """PLAN-ROOT SINK
08:EXCHANGE [UNPARTITIONED]
04:HASH JOIN [INNER JOIN, PARTITIONED]
|--07:EXCHANGE [HASH(c.id)]
|  02:SCAN {filesystem_name} [functional.alltypessmall c]
06:EXCHANGE [HASH(b.id)]
03:HASH JOIN [INNER JOIN, BROADCAST]
|--05:EXCHANGE [BROADCAST]
|  01:SCAN {filesystem_name} [functional.alltypes b]
00:SCAN {filesystem_name} [functional.alltypestiny a]"""
    assert plan_match.format(filesystem_name=FILESYSTEM_NAME) in '\n'.join(plan.data)

  def _verify_describe_view(self, vector, view_name, expected_substr):
    """
    Verify across all impalads that the view 'view_name' has the given substring in its
    expanded SQL.

    If SYNC_DDL is enabled, the verification should complete immediately. Otherwise,
    loops waiting for the expected condition to pass.
    """
    if vector.get_value('exec_option')['sync_ddl']:
      num_attempts = 1
    else:
      num_attempts = 60
    for impalad in ImpalaCluster.get_e2e_test_cluster().impalads:
      client = impalad.service.create_client_from_vector(vector)
      try:
        for attempt in itertools.count(1):
          assert attempt <= num_attempts, "ran out of attempts"
          try:
            result = self.execute_query_expect_success(
                client, "describe formatted %s" % view_name)
            exp_line = [line for line in result.data if 'View Expanded' in line][0]
          except IMPALA_CONNECTION_EXCEPTION as e:
            # In non-SYNC_DDL tests, it's OK to get a "missing view" type error
            # until the metadata propagates.
            exp_line = "Exception: %s" % e
          if expected_substr in exp_line.lower():
            return
          time.sleep(1)
      finally:
        client.close()

  def test_views_describe(self, vector, unique_database):
    # IMPALA-6896: Tests that altered views can be described by all impalads.
    impala_cluster = ImpalaCluster.get_e2e_test_cluster()
    impalads = impala_cluster.impalads
    view_name = "%s.test_describe_view" % unique_database
    first_client = impalads[0].service.create_client_from_vector(vector)
    try:
      # Create a view and verify it's visible.
      self.execute_query_expect_success(first_client,
          "create view {0} as "
          "select * from functional.alltypes"
          .format(view_name))
      self._verify_describe_view(vector, view_name, "select * from functional.alltypes")

      # Alter the view and verify the alter is visible.
      self.execute_query_expect_success(first_client,
          "alter view {0} as "
          "select * from functional.alltypesagg"
          .format(view_name))
      self._verify_describe_view(vector, view_name,
          "select * from functional.alltypesagg")
    finally:
      first_client.close()

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_functions_ddl(self, vector, unique_database):
    self.run_test_case('QueryTest/functions-ddl', vector, use_db=unique_database,
        multiple_impalad=self._use_multiple_impalad(vector))

  @SkipIfLocal.hdfs_client
  def test_create_alter_bulk_partition(self, unique_database):
    # Change the scale depending on the exploration strategy: with 50 partitions this
    # test runs a few minutes; with 10 partitions it takes ~50s for two configurations.
    num_parts = 50 if self.exploration_strategy() == 'exhaustive' else 10
    fq_tbl_name = unique_database + ".part_test_tbl"
    self.client.execute("create table {0}(i int) partitioned by(j int, s string) "
        "location '{1}/{0}'".format(fq_tbl_name, WAREHOUSE))

    # Add some partitions (first batch of two)
    for i in range(num_parts // 5):
      start = time.time()
      self.client.execute(
          "alter table {0} add partition(j={1}, s='{1}')".format(fq_tbl_name, i))
      LOG.info('ADD PARTITION #%d exec time: %s' % (i, time.time() - start))

    # Modify one of the partitions
    self.client.execute("alter table {0} partition(j=1, s='1')"
        " set fileformat parquetfile".format(fq_tbl_name))

    # Alter one partition to a non-existent location twice (IMPALA-741)
    self.filesystem_client.delete_file_dir("tmp/dont_exist1/", recursive=True)
    self.filesystem_client.delete_file_dir("tmp/dont_exist2/", recursive=True)

    self.execute_query_expect_success(self.client,
        "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist1'"
        .format(fq_tbl_name, WAREHOUSE))
    self.execute_query_expect_success(self.client,
        "alter table {0} partition(j=1,s='1') set location '{1}/tmp/dont_exist2'"
        .format(fq_tbl_name, WAREHOUSE))

    # Add some more partitions
    for i in range(num_parts // 5, num_parts):
      start = time.time()
      self.client.execute(
          "alter table {0} add partition(j={1},s='{1}')".format(fq_tbl_name, i))
      LOG.info('ADD PARTITION #%d exec time: %s' % (i, time.time() - start))

    # Insert data and verify it shows up.
    self.client.execute(
        "insert into table {0} partition(j=1, s='1') select 1".format(fq_tbl_name))
    assert '1' == self.execute_scalar("select count(*) from {0}".format(fq_tbl_name))

  @SkipIfLocal.hdfs_client
  def test_alter_table_set_fileformat(self, unique_database):
    # Tests that the SET FILEFORMAT clause is applied by ALTER TABLE ADD PARTITION
    fq_tbl_name = unique_database + ".p_fileformat"
    self.client.execute(
        "create table {0}(i int) partitioned by (p int)".format(fq_tbl_name))

    # Add a partition with Parquet fileformat
    self.execute_query_expect_success(self.client,
        "alter table {0} add partition(p=1) set fileformat parquet"
        .format(fq_tbl_name))

    # Add two partitions with ORC fileformat
    self.execute_query_expect_success(self.client,
        "alter table {0} add partition(p=2) partition(p=3) set fileformat orc"
        .format(fq_tbl_name))

    result = self.execute_query_expect_success(self.client,
        "SHOW PARTITIONS %s" % fq_tbl_name)

    assert 1 == len([line for line in result.data if line.find("PARQUET") != -1])
    assert 2 == len([line for line in result.data if line.find("ORC") != -1])

  def test_alter_table_create_many_partitions(self, unique_database):
    """
    Checks that creating more partitions than the MAX_PARTITION_UPDATES_PER_RPC
    batch size works, in that it creates all the underlying partitions.
    """
    self.client.execute(
        "create table {0}.t(i int) partitioned by (p int)".format(unique_database))
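    # Two more than the batch size, so the catalog update has to span multiple
    # RPC batches (the value 500 is assumed to mirror the backend constant).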
    MAX_PARTITION_UPDATES_PER_RPC = 500
    alter_stmt = "alter table {0}.t add ".format(unique_database) + " ".join(
        "partition(p=%d)" % (i,) for i in range(MAX_PARTITION_UPDATES_PER_RPC + 2))
    self.client.execute(alter_stmt)
    partitions = self.client.execute("show partitions {0}.t".format(unique_database))
    # Show partitions will contain partition HDFS paths, which we expect to contain
    # "p=val" subdirectories for each partition. The regexp finds all the "p=[0-9]*"
    # paths, converts them to integers, and checks that we have all the ones we
    # expect.
    PARTITION_RE = re.compile("p=([0-9]+)")
    assert list(map(int, PARTITION_RE.findall(str(partitions.data)))) == \
        list(range(MAX_PARTITION_UPDATES_PER_RPC + 2))

  def test_create_alter_tbl_properties(self, unique_database):
    fq_tbl_name = unique_database + ".test_alter_tbl"

    # Specify TBLPROPERTIES and SERDEPROPERTIES at CREATE time
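    # Note: when a key repeats in a property list (e.g. 'p1'='v0', 'p1'='v1'),
    # the last value wins; the asserts below rely on this.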
self.client.execute("""create table {0} (i int)
|
|
with serdeproperties ('s1'='s2', 's3'='s4')
|
|
tblproperties ('p1'='v0', 'p1'='v1')""".format(fq_tbl_name))
|
|
properties = self._get_tbl_properties(fq_tbl_name)
|
|
|
|
if HIVE_MAJOR_VERSION > 2:
|
|
assert properties['OBJCAPABILITIES'] == 'EXTREAD,EXTWRITE'
|
|
assert properties['TRANSLATED_TO_EXTERNAL'] == 'TRUE'
|
|
assert properties['external.table.purge'] == 'TRUE'
|
|
assert properties['EXTERNAL'] == 'TRUE'
|
|
del properties['OBJCAPABILITIES']
|
|
del properties['TRANSLATED_TO_EXTERNAL']
|
|
del properties['external.table.purge']
|
|
del properties['EXTERNAL']
|
|
assert len(properties) == 2
|
|
# The transient_lastDdlTime is variable, so don't verify the value.
|
|
assert 'transient_lastDdlTime' in properties
|
|
del properties['transient_lastDdlTime']
|
|
assert {'p1': 'v1'} == properties
|
|
|
|
properties = self._get_serde_properties(fq_tbl_name)
|
|
assert {'s1': 's2', 's3': 's4'} == properties
|
|
|
|
# Modify the SERDEPROPERTIES using ALTER TABLE SET.
|
|
self.client.execute("alter table {0} set serdeproperties "
|
|
"('s1'='new', 's5'='s6')".format(fq_tbl_name))
|
|
properties = self._get_serde_properties(fq_tbl_name)
|
|
assert {'s1': 'new', 's3': 's4', 's5': 's6'} == properties
|
|
|
|
# Modify the TBLPROPERTIES using ALTER TABLE SET.
|
|
self.client.execute("alter table {0} set tblproperties "
|
|
"('prop1'='val1', 'p2'='val2', 'p2'='val3', ''='')".format(fq_tbl_name))
|
|
properties = self._get_tbl_properties(fq_tbl_name)
|
|
|
|
if HIVE_MAJOR_VERSION > 2:
|
|
assert 'OBJCAPABILITIES' in properties
|
|
assert 'transient_lastDdlTime' in properties
|
|
assert properties['p1'] == 'v1'
|
|
assert properties['prop1'] == 'val1'
|
|
assert properties['p2'] == 'val3'
|
|
assert properties[''] == ''
|
|
|
|

  @SkipIfHive2.acid
  def test_create_insertonly_tbl(self, unique_database):
    insertonly_tbl = unique_database + ".test_insertonly"
    self.client.execute("""create table {0} (coli int) stored as parquet tblproperties(
        'transactional'='true', 'transactional_properties'='insert_only')"""
        .format(insertonly_tbl))
    properties = self._get_tbl_properties(insertonly_tbl)
    assert properties['OBJCAPABILITIES'] == 'HIVEMANAGEDINSERTREAD,HIVEMANAGEDINSERTWRITE'

  def test_alter_tbl_properties_reload(self, unique_database):
    # IMPALA-8734: Force a table schema reload when setting table properties.
    tbl_name = "test_tbl"
    self.execute_query_expect_success(self.client, "create table {0}.{1} (c1 string)"
        .format(unique_database, tbl_name))
    self.filesystem_client.create_file("{2}/{0}.db/{1}/f".
        format(unique_database, tbl_name, WAREHOUSE),
        file_data="\nfoo\n")
    self.execute_query_expect_success(self.client,
        "alter table {0}.{1} set tblproperties"
        "('serialization.null.format'='foo')"
        .format(unique_database, tbl_name))
    result = self.execute_query_expect_success(self.client,
        "select * from {0}.{1}"
        .format(unique_database, tbl_name))
    assert len(result.data) == 2
    assert result.data[0] == ''
    assert result.data[1] == 'NULL'

  @SkipIfFS.incorrent_reported_ec
  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_partition_ddl_predicates(self, vector, unique_database):
    self.run_test_case('QueryTest/partition-ddl-predicates-all-fs', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))
    if IS_HDFS:
      self.run_test_case('QueryTest/partition-ddl-predicates-hdfs-only', vector,
          use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))

  def test_create_table_file_format(self, unique_database):
    # When the default_file_format query option is not specified, the default table
    # file format is TEXT.
    text_table = "{0}.text_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int)".format(text_table))
    result = self.execute_query_expect_success(
        self.client, "show create table {0}".format(text_table))
    assert any("TEXTFILE" in x for x in result.data)

    self.execute_query_expect_failure(
        self.client, "create table {0}.foobar_tbl".format(unique_database),
        {"default_file_format": "foobar"})

    parquet_table = "{0}.parquet_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int)".format(parquet_table),
        {"default_file_format": "parquet"})
    result = self.execute_query_expect_success(
        self.client, "show create table {0}".format(parquet_table))
    assert any("PARQUET" in x for x in result.data)

    # The table created should still be ORC even though the default_file_format query
    # option is set to parquet.
    orc_table = "{0}.orc_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client,
        "create table {0}(i int) stored as orc".format(orc_table),
        {"default_file_format": "parquet"})
    result = self.execute_query_expect_success(
        self.client, "show create table {0}".format(orc_table))
    assert any("ORC" in x for x in result.data)

  @SkipIfHive2.acid
  def test_create_table_transactional_type(self, unique_database):
    # When the default_transactional_type query option is not specified, the
    # transaction-related table properties are not set.
    non_acid_table = "{0}.non_acid_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int)".format(non_acid_table),
        {"default_transactional_type": "none"})
    props = self._get_properties("Table Parameters", non_acid_table)
    assert "transactional" not in props
    assert "transactional_properties" not in props

    # Create table as "insert_only" transactional.
    insert_only_acid_table = "{0}.insert_only_acid_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int)".format(insert_only_acid_table),
        {"default_transactional_type": "insert_only"})
    props = self._get_properties("Table Parameters", insert_only_acid_table)
    assert props["transactional"] == "true"
    assert props["transactional_properties"] == "insert_only"

    # The default_transactional_type query option should not affect external tables.
    external_table = "{0}.external_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create external table {0}(i int)".format(external_table),
        {"default_transactional_type": "insert_only"})
    props = self._get_properties("Table Parameters", external_table)
    assert "transactional" not in props
    assert "transactional_properties" not in props

    # The default_transactional_type query option should not affect Kudu tables.
    kudu_table = "{0}.kudu_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client,
        "create table {0}(i int primary key) stored as kudu".format(kudu_table),
        {"default_transactional_type": "insert_only"})
    props = self._get_properties("Table Parameters", kudu_table)
    assert "transactional" not in props
    assert "transactional_properties" not in props

    # The default_transactional_type query option should have no effect when
    # transactional table properties are set manually.
    manual_acid_table = "{0}.manual_acid_tbl".format(unique_database)
    self.execute_query_expect_success(
        self.client, "create table {0}(i int) TBLPROPERTIES ('transactional'='false')"
        .format(manual_acid_table),
        {"default_transactional_type": "insert_only"})
    props = self._get_properties("Table Parameters", manual_acid_table)
    assert "transactional" not in props
    assert "transactional_properties" not in props

  def test_kudu_column_comment(self, unique_database):
    table = "{0}.kudu_table0".format(unique_database)
    self.client.execute("create table {0}(x int comment 'x' primary key) \
        stored as kudu".format(table))
    comment = self._get_column_comment(table, 'x')
    assert "x" == comment

    table = "{0}.kudu_table".format(unique_database)
    self.client.execute("create table {0}(i int primary key) stored as kudu"
        .format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is 'comment1'".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "comment1" == comment

    self.client.execute("comment on column {0}.i is ''".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("comment on column {0}.i is 'comment2'".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "comment2" == comment

    self.client.execute("comment on column {0}.i is null".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("alter table {0} alter column i set comment 'comment3'"
        .format(table))
    comment = self._get_column_comment(table, 'i')
    assert "comment3" == comment

    self.client.execute("alter table {0} alter column i set comment ''".format(table))
    comment = self._get_column_comment(table, 'i')
    assert "" == comment

    self.client.execute("alter table {0} add columns (j int comment 'comment4')"
        .format(table))
    comment = self._get_column_comment(table, 'j')
    assert "comment4" == comment

  @UniqueDatabase.parametrize(sync_ddl=True)
  def test_describe_materialized_view(self, vector, unique_database):
    vector.get_value('exec_option')['abort_on_error'] = False
    self.run_test_case('QueryTest/describe-materialized-view', vector,
        use_db=unique_database, multiple_impalad=self._use_multiple_impalad(vector))


# IMPALA-10811: RPC to submit query getting stuck for AWS NLB forever.
# Tests three client protocols: HS2, Beeswax and HS2-HTTP.
class TestAsyncDDL(TestDdlBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestAsyncDDL, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        sync_ddl=[0], disable_codegen_options=[False]))

  def test_async_ddl(self, vector, unique_database):
    self.run_test_case('QueryTest/async_ddl', vector, use_db=unique_database)

  def test_async_ddl_with_JDBC(self, unique_database):
    self.exec_with_jdbc("drop table if exists {0}.test_table".format(unique_database))
    self.exec_with_jdbc_and_compare_result(
        "create table {0}.test_table(a int)".format(unique_database),
        "'Table has been created.'")

    self.exec_with_jdbc("drop table if exists {0}.alltypes_clone".format(unique_database))
    self.exec_with_jdbc_and_compare_result(
        "create table {0}.alltypes_clone as select * from\
        functional_parquet.alltypes".format(unique_database),
        "'Inserted 7300 row(s)'")

  @classmethod
  def test_get_operation_status_for_client(self, client, unique_database):
    # Setup
    SLEEP_S = 2
    client.execute("select count(*) from functional_parquet.alltypes")
    client.set_configuration_option("enable_async_ddl_execution", "true")
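    # debug_action syntax as used in this file: "<LABEL>:SLEEP@<millis>" injects
    # a sleep at the named execution point; multiple actions are joined with "|".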
client.set_configuration_option("debug_action",
|
|
"CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@%s" % (SLEEP_S * 1000))
|
|
# Run the test query which will only compile the DDL in execute_statement()
|
|
# and measure the time spent. Should be less than 3s.
|
|
start = time.time()
|
|
handle = client.execute_async(
|
|
"create table {0}.alltypes_clone as select sleep({1})".format(
|
|
unique_database, SLEEP_S * 1000))
|
|
end = time.time()
|
|
assert (end - start <= 3)
|
|
|
|
# The table creation and population part will be done in a separate thread.
|
|
# The query must be in PENDING state after execute_async and enter RUNNING state
|
|
# after creating the table in catalogd (at least SLEEP_S delay). The query can
|
|
# enter FINISHED state after another delay of at least SLEEP_S.
|
|
assert client.get_impala_exec_state(handle) == PENDING
|
|
client.wait_for_impala_state(handle, RUNNING, SLEEP_S + 3)
|
|
client.wait_for_impala_state(handle, FINISHED, SLEEP_S + 3)
|
|
client.close_query(handle)
|
|
|
|
def test_get_operation_status_for_async_ddl(self, vector, unique_database):
|
|
"""Tests that for an asynchronously executed DDL with delay, GetOperationStatus
|
|
must be issued repeatedly. Test client hs2-http, hs2 and beeswax"""
|
|
client = self.default_impala_client(vector.get_value('protocol'))
|
|
self.test_get_operation_status_for_client(client, unique_database)
|
|
|
|
|
|


class TestAsyncDDLTiming(TestDdlBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestAsyncDDLTiming, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_client_protocol_dimension())
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension(
        sync_ddl=[0], disable_codegen_options=[False]))
    cls.ImpalaTestMatrix.add_dimension(
        ImpalaTestDimension('enable_async_ddl_execution', True, False))

  def test_alter_table_recover(self, vector, unique_database):
    enable_async_ddl = vector.get_value('enable_async_ddl_execution')
    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      # Setup for the alter table case (create a table that points to an existing
      # location)
      alltypes_location = get_fs_path("/test-warehouse/alltypes_parquet")
      source_tbl = "functional_parquet.alltypes"
      dest_tbl = "{0}.alltypes_clone".format(unique_database)
      create_table_stmt = 'create external table {0} like {1} location "{2}"'.format(
          dest_tbl, source_tbl, alltypes_location)
      self.execute_query_expect_success(client, create_table_stmt)

      # Describe the table to fetch its metadata
      self.execute_query_expect_success(client, "describe {0}".format(dest_tbl))

      # Configure whether to use async DDL and add appropriate delays
      new_vector = deepcopy(vector)
      new_vector.get_value('exec_option')['enable_async_ddl_execution'] = enable_async_ddl
      new_vector.get_value('exec_option')['debug_action'] = \
          "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000"
      exec_start = time.time()
      alter_stmt = "alter table {0} recover partitions".format(dest_tbl)
      handle = self.execute_query_async_using_client(client, alter_stmt, new_vector)
      exec_end = time.time()
      exec_time = exec_end - exec_start
      state = client.get_impala_exec_state(handle)
      if enable_async_ddl:
        assert state in [PENDING, RUNNING]
      else:
        assert state in [RUNNING, FINISHED]

      # Wait for the statement to finish with a timeout of 20 seconds
      wait_start = time.time()
      client.wait_for_impala_state(handle, FINISHED, 20)
      wait_end = time.time()
      wait_time = wait_end - wait_start
      self.close_query_using_client(client, handle)
      # In sync mode:
      #   The entire DDL is processed in the exec step with the delay, so exec_time
      #   should be more than 10 seconds.
      #
      # In async mode:
      #   The compilation of the DDL happens in the exec step without delay, and the
      #   processing of the DDL plan happens in the wait step with the delay, so the
      #   wait time should take more than 10 seconds.
      if enable_async_ddl:
        assert(wait_time >= 10)
      else:
        assert(exec_time >= 10)
    finally:
      client.close()

  def test_ctas(self, vector, unique_database):
    enable_async_ddl = vector.get_value('enable_async_ddl_execution')
    client = self.create_impala_client(protocol=vector.get_value('protocol'))

    try:
      # The CTAS is going to need the metadata of the source table in the
      # select. To avoid flakiness about metadata loading, this selects from
      # that source table first to get the metadata loaded.
      self.execute_query_expect_success(client,
          "select count(*) from functional_parquet.alltypes")

      # Configure whether to use async DDL and add appropriate delays
      new_vector = deepcopy(vector)
      new_vector.get_value('exec_option')['enable_async_ddl_execution'] = enable_async_ddl
      create_delay = "CRS_DELAY_BEFORE_CATALOG_OP_EXEC:SLEEP@10000"
      insert_delay = "CRS_BEFORE_COORD_STARTS:SLEEP@2000"
      new_vector.get_value('exec_option')['debug_action'] = \
          "{0}|{1}".format(create_delay, insert_delay)
      dest_tbl = "{0}.ctas_test".format(unique_database)
      source_tbl = "functional_parquet.alltypes"
      ctas_stmt = 'create external table {0} as select * from {1}'.format(
          dest_tbl, source_tbl)
      exec_start = time.time()
      handle = self.execute_query_async_using_client(client, ctas_stmt, new_vector)
      exec_end = time.time()
      exec_time = exec_end - exec_start
      # The CRS_BEFORE_COORD_STARTS delay postpones the transition from PENDING
      # to RUNNING, so the sync case should be in PENDING state at the end of
      # the execute call. This means that the sync and async cases are the same.
      assert client.is_pending(handle)

      # Wait for the statement to finish with a timeout of 40 seconds
      # (60 seconds without shortcircuit reads). There are other tests running
      # in parallel and ASAN can be slow, so this timeout has been bumped
      # substantially to avoid flakiness. The actual test case does not depend
      # on the statement finishing in a particular amount of time.
      wait_time = 40 if IS_HDFS else 60
      wait_start = time.time()
      client.wait_for_impala_state(handle, FINISHED, wait_time)
      wait_end = time.time()
      wait_time = wait_end - wait_start
      self.close_query_using_client(client, handle)
      # In sync mode:
      #   The entire CTAS is processed in the exec step with the delay, so exec_time
      #   should be more than 10 seconds.
      #
      # In async mode:
      #   The compilation of the CTAS happens in the exec step without delay, and the
      #   processing of the CTAS plan happens in the wait step with the delay, so the
      #   wait time should take more than 10 seconds.
      if enable_async_ddl:
        assert(wait_time >= 10)
      else:
        assert(exec_time >= 10)
    finally:
      client.close()


class TestDdlLogs(TestDdlBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestDdlLogs, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_exec_option_dimension_from_dict({
        'enable_async_ddl_execution': [True, False]}))

  @SkipIfDockerizedCluster.daemon_logs_not_exposed
  def test_error_logs(self, vector, unique_database):
    query_opts = vector.get_value('exec_option')
    tbl_name = 'test_async' if query_opts['enable_async_ddl_execution'] else 'test_sync'
    tbl_name = unique_database + '.' + tbl_name
    result = self.execute_query_expect_failure(
        self.client, 'invalidate metadata ' + tbl_name, query_opts)
    err = "TableNotFoundException: Table not found: " + tbl_name
    assert err in str(result)
    self.assert_impalad_log_contains('INFO', err)


# IMPALA-2002: Tests repeated adding/dropping of .jar and .so in the lib cache.
class TestLibCache(TestDdlBase):
  @classmethod
  def add_test_dimensions(cls):
    super(TestLibCache, cls).add_test_dimensions()
    cls.ImpalaTestMatrix.add_dimension(create_single_exec_option_dimension())

  def test_create_drop_function(self, vector, unique_database):
    """This will create, run, and drop the same function repeatedly, exercising the
    lib cache mechanism.
    """
    create_fn_stmt = ("create function {0}.f() returns int "
        "location '{1}/libTestUdfs.so' symbol='NoArgs'"
        .format(unique_database, WAREHOUSE))
    select_stmt = ("select {0}.f() from functional.alltypes limit 10"
        .format(unique_database))
    drop_fn_stmt = "drop function %s {0}.f()".format(unique_database)
    self.create_drop_ddl(vector, [create_fn_stmt], [drop_fn_stmt], select_stmt)

  # Run serially because this test inspects global impalad metrics.
  # TODO: The metrics checks could be relaxed to enable running this test in
  # parallel, but that might need a more general wait_for_metric_value().
  @pytest.mark.execute_serially
  def test_create_drop_data_src(self, vector, unique_database):
    """This will create, run, and drop the same data source repeatedly, exercising
    the lib cache mechanism.
    """
    data_src_name = unique_database + "_datasrc"
    create_ds_stmt = ("CREATE DATA SOURCE {0} "
        "LOCATION '{1}/data-sources/test-data-source.jar' "
        "CLASS 'org.apache.impala.extdatasource.AllTypesDataSource' "
        "API_VERSION 'V1'".format(data_src_name, WAREHOUSE))
    create_tbl_stmt = ("CREATE TABLE {0}.data_src_tbl (x int) "
        "PRODUCED BY DATA SOURCE {1}('dummy_init_string')")\
        .format(unique_database, data_src_name)
    drop_ds_stmt = "drop data source %s {0}".format(data_src_name)
    drop_tbl_stmt = "drop table %s {0}.data_src_tbl".format(unique_database)
    select_stmt = "select * from {0}.data_src_tbl limit 1".format(unique_database)
    class_cache_hits_metric = "external-data-source.class-cache.hits"
    class_cache_misses_metric = "external-data-source.class-cache.misses"

    create_stmts = [create_ds_stmt, create_tbl_stmt]
    drop_stmts = [drop_tbl_stmt, drop_ds_stmt]

    # The ImpaladService is used to capture metrics
    service = self.impalad_test_service

    # Initial metric values
    class_cache_hits = service.get_metric_value(class_cache_hits_metric)
    class_cache_misses = service.get_metric_value(class_cache_misses_metric)
    # Test with 1 node so we can check the metrics on only the coordinator
    vector.get_value('exec_option')['num_nodes'] = 1
    num_iterations = 2
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, num_iterations)

    # Check class cache metrics. Shouldn't have any new cache hits; there should be
    # 2 cache misses for every iteration (the jar is loaded by both the FE and BE).
    expected_cache_misses = class_cache_misses + (num_iterations * 2)
    service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits)
    service.wait_for_metric_value(class_cache_misses_metric,
        expected_cache_misses)

    # Test with a table that caches the class
    create_tbl_stmt = ("CREATE TABLE {0}.data_src_tbl (x int) "
        "PRODUCED BY DATA SOURCE {1}('CACHE_CLASS::dummy_init_string')")\
        .format(unique_database, data_src_name)
    create_stmts = [create_ds_stmt, create_tbl_stmt]
    # Run once before capturing metrics because the class may already be cached from
    # a previous test run.
    # TODO: Provide a way to clear the cache
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1)

    # Capture metric values and run again; this should hit the cache.
    class_cache_hits = service.get_metric_value(class_cache_hits_metric)
    class_cache_misses = service.get_metric_value(class_cache_misses_metric)
    self.create_drop_ddl(vector, create_stmts, drop_stmts, select_stmt, 1)
    service.wait_for_metric_value(class_cache_hits_metric, class_cache_hits + 2)
    service.wait_for_metric_value(class_cache_misses_metric, class_cache_misses)

  def create_drop_ddl(self, vector, create_stmts, drop_stmts, select_stmt,
      num_iterations=3):
    """Helper method to run CREATE/DROP DDL commands repeatedly and exercise the lib
    cache. create_stmts is the list of CREATE statements to be executed in order;
    drop_stmts is the list of DROP statements to be executed in order. Each drop
    statement should have a '%s' placeholder to insert "IF EXISTS" or "". The
    select_stmt is a single statement to run after executing the CREATE statements.
    TODO: it's hard to tell that the cache is working (i.e. if it did nothing to drop
    the cache, these tests would still pass). Testing that is a bit harder and requires
    us to update the udf binary in the middle.
    """
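    # For example (illustrative): "drop function %s db.f()" % "if exists"
    # yields "drop function if exists db.f()"; later rounds substitute "".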
self.client.set_configuration(vector.get_value("exec_option"))
|
|
for drop_stmt in drop_stmts: self.client.execute(drop_stmt % ("if exists"))
|
|
for i in range(0, num_iterations):
|
|
for create_stmt in create_stmts: self.client.execute(create_stmt)
|
|
self.client.execute(select_stmt)
|
|
for drop_stmt in drop_stmts: self.client.execute(drop_stmt % (""))
|