IMPALA-2521 introduced clustering for insert statements. This change makes the
HdfsTableSink aware of clustered inputs, so that partitions are opened,
written, and closed one by one.

This change also adds/modifies tests in several ways:
- Clustered insert tests now select all rows from alltypes instead of
  alltypessmall. Together with varying settings for batch_size, this results
  in a larger number of row batches being written.
- Clustered insert tests select from alltypes instead of functional.alltypes
  to make sure we also select from various input formats.
- Clustered insert tests have been added that select from alltypestiny to
  create inserts with 1 and 2 rows per partition, respectively.
- Exhaustive insert tests now use different values for batch_size: 1, 16, and
  0 (meaning the default, 1024). This is limited to uncompressed parquet files
  to maintain a reasonable runtime. On my machine, executing the insert tests
  took 1778 seconds, compared to 1002 seconds with just the default row batch
  size.
- Additional tests in test_insert_behaviour.py make sure that inserting over
  several row batches only creates one file per partition.
- The test_insert method has been renamed to make it unique within the file
  and to allow effective filtering with -k.
- Tests have been added to the Analyzer test suite.

Change-Id: Ibeda0bdabbfe44c8ac95bf7c982a75649e1b82d0
Reviewed-on: http://gerrit.cloudera.org:8080/4863
Reviewed-by: Lars Volker <lv@cloudera.com>
Reviewed-by: Tim Armstrong <tarmstrong@cloudera.com>
Tested-by: Internal Jenkins
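For reference, the pattern the new tests exercise looks like this (a sketch;
the database and table names are illustrative):

    SET batch_size=10; -- small row batches force many batches per partition
    INSERT INTO my_db.my_table PARTITION (year, month) /*+ clustered,shuffle */
    SELECT * FROM functional.alltypes;

With clustered input the sink receives all rows of a partition contiguously,
so it can open one partition at a time, append every row batch for it into a
single file, and close it before moving on to the next partition.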
# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements. See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership. The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License. You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied. See the License for the
# specific language governing permissions and limitations
# under the License.

import getpass
import grp
import pwd
import pytest

from tests.common.impala_test_suite import ImpalaTestSuite
from tests.common.parametrize import UniqueDatabase
from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal
from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_S3

@SkipIfLocal.hdfs_client
class TestInsertBehaviour(ImpalaTestSuite):
  """Tests for INSERT behaviour that isn't covered by checking query results"""

  @classmethod
  def get_workload(cls):
    return 'functional-query'

  TEST_DB_NAME = "insert_empty_result_db"

  def setup_method(self, method):
    # Clean up and create a fresh test database.
    if method.__name__ == "test_insert_select_with_empty_resultset":
      self.cleanup_db(self.TEST_DB_NAME)
      self.execute_query("create database if not exists {0} location "
                         "'{1}/{0}.db'".format(self.TEST_DB_NAME, WAREHOUSE))

  def teardown_method(self, method):
    if method.__name__ == "test_insert_select_with_empty_resultset":
      self.cleanup_db(self.TEST_DB_NAME)

  @pytest.mark.execute_serially
  def test_insert_removes_staging_files(self):
    TBL_NAME = "insert_overwrite_nopart"
    insert_staging_dir = ("test-warehouse/functional.db/%s/"
                          "_impala_insert_staging" % TBL_NAME)
    self.filesystem_client.delete_file_dir(insert_staging_dir, recursive=True)
    self.client.execute("INSERT OVERWRITE functional.%s "
                        "SELECT int_col FROM functional.tinyinttable" % TBL_NAME)
    ls = self.filesystem_client.ls(insert_staging_dir)
    assert len(ls) == 0

  @pytest.mark.execute_serially
  def test_insert_preserves_hidden_files(self):
    """Test that INSERT OVERWRITE preserves hidden files in the root table directory"""
    TBL_NAME = "insert_overwrite_nopart"
    table_dir = "test-warehouse/functional.db/%s/" % TBL_NAME
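    # Impala treats files and directories whose names start with '.' or '_' as
    # hidden; INSERT OVERWRITE replaces the visible data files but must leave
    # these in place.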
    hidden_file_locations = [".hidden", "_hidden"]
    dir_locations = ["dir", ".hidden_dir"]

    for dir_ in dir_locations:
      self.filesystem_client.make_dir(table_dir + dir_)

    # The 'make_dir' call above doesn't create a directory on S3, so also create
    # a file inside each directory to make sure it exists there as well.
    for dir_ in dir_locations:
      self.filesystem_client.create_file(
          table_dir + dir_ + '/' + hidden_file_locations[0], '', overwrite=True)

    for file_ in hidden_file_locations:
      self.filesystem_client.create_file(table_dir + file_, '', overwrite=True)

    self.client.execute("INSERT OVERWRITE functional.%s"
                        " SELECT int_col FROM functional.tinyinttable" % TBL_NAME)

    for file_ in hidden_file_locations:
      assert self.filesystem_client.exists(table_dir + file_), "Hidden file {0} was " \
          "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + file_)

    for dir_ in dir_locations:
      assert self.filesystem_client.exists(table_dir + dir_), "Directory {0} was " \
          "unexpectedly deleted by INSERT OVERWRITE".format(table_dir + dir_)

  def test_insert_ascii_nulls(self, unique_database):
    TBL_NAME = '`{0}`.`null_insert`'.format(unique_database)
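    # The CTAS below stores a single ASCII NUL character ('\0'). Asserting
    # LENGTH(s) == 1 confirms the NUL byte round-trips as data instead of
    # terminating or truncating the string.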
    self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS %s" % TBL_NAME)
    self.execute_query_expect_success(self.client, "create table %s as select '\0' s"
                                      % TBL_NAME)
    result = self.execute_query_expect_success(self.client,
                                               "SELECT LENGTH(s) FROM %s" % TBL_NAME)
    assert int(result.get_data()) == 1

  @UniqueDatabase.parametrize(name_prefix='test_insert_alter_partition_location_db')
  def test_insert_alter_partition_location(self, unique_database):
    """Test that inserts after changing the location of a partition work correctly,
    including the creation of a non-existent partition dir"""
    part_dir = "tmp/{0}".format(unique_database)
    qualified_part_dir = get_fs_path('/' + part_dir)
    table_name = "`{0}`.`insert_alter_partition_location`".format(unique_database)

    self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS %s" % table_name)
    self.filesystem_client.delete_file_dir(part_dir, recursive=True)

    self.execute_query_expect_success(
        self.client,
        "CREATE TABLE %s (c int) PARTITIONED BY (p int)" % table_name)
    self.execute_query_expect_success(
        self.client,
        "ALTER TABLE %s ADD PARTITION(p=1)" % table_name)
    self.execute_query_expect_success(
        self.client,
        "ALTER TABLE %s PARTITION(p=1) SET LOCATION '%s'" % (table_name,
                                                             qualified_part_dir))
    self.execute_query_expect_success(
        self.client,
        "INSERT OVERWRITE %s PARTITION(p=1) VALUES(1)" % table_name)

    result = self.execute_query_expect_success(
        self.client,
        "SELECT COUNT(*) FROM %s" % table_name)
    assert int(result.get_data()) == 1

    # The INSERT should have created the partition dir, which should contain
    # exactly one file (not in a subdirectory).
    assert len(self.filesystem_client.ls(part_dir)) == 1

  @SkipIfS3.hdfs_acls
  @SkipIfIsilon.hdfs_acls
  @pytest.mark.xfail(run=False, reason="Fails intermittently on test clusters")
  @pytest.mark.execute_serially
  def test_insert_inherit_acls(self):
    """Check that ACLs are inherited when we create new partitions"""

    ROOT_ACL = "default:group:dummy_group:rwx"
    TEST_ACL = "default:group:impala_test_users:r-x"
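    # Note that 'default:' ACL entries don't grant access to the directory they
    # are set on; they are the template HDFS copies onto newly created children,
    # which is exactly the inheritance this test exercises.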

    def check_has_acls(part, acl=TEST_ACL):
      path = "test-warehouse/functional.db/insert_inherit_acls/" + part
      result = self.hdfs_client.getacl(path)
      assert acl in result['AclStatus']['entries']

    # Add a spurious ACL to the functional.db directory
    self.hdfs_client.setacl("test-warehouse/functional.db", ROOT_ACL)

    self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS"
                                      " functional.insert_inherit_acls")
    self.execute_query_expect_success(self.client, "CREATE TABLE "
                                      "functional.insert_inherit_acls (col int)"
                                      " PARTITIONED BY (p1 int, p2 int, p3 int)")

    # Check that table creation inherited the ACL
    check_has_acls("", ROOT_ACL)

    self.execute_query_expect_success(self.client, "ALTER TABLE "
                                      "functional.insert_inherit_acls ADD PARTITION"
                                      "(p1=1, p2=1, p3=1)")

    check_has_acls("p1=1", ROOT_ACL)
    check_has_acls("p1=1/p2=1", ROOT_ACL)
    check_has_acls("p1=1/p2=1/p3=1", ROOT_ACL)

    self.hdfs_client.setacl(
        "test-warehouse/functional.db/insert_inherit_acls/p1=1/", TEST_ACL)

    self.execute_query_expect_success(self.client, "INSERT INTO "
                                      "functional.insert_inherit_acls "
                                      "PARTITION(p1=1, p2=2, p3=2) VALUES(1)")

    check_has_acls("p1=1/p2=2/")
    check_has_acls("p1=1/p2=2/p3=2")

    # Check that SETACL didn't cascade down to children (which is more to do with
    # HDFS than Impala, but worth asserting here)
    check_has_acls("p1=1/p2=1", ROOT_ACL)
    check_has_acls("p1=1/p2=1/p3=1", ROOT_ACL)

    # Change ACLs on p1=1/p2=2 and create a new leaf at p3=30
    self.hdfs_client.setacl(
        "test-warehouse/functional.db/insert_inherit_acls/p1=1/p2=2/",
        "default:group:new_leaf_group:-w-")

    self.execute_query_expect_success(self.client, "INSERT INTO "
                                      "functional.insert_inherit_acls "
                                      "PARTITION(p1=1, p2=2, p3=30) VALUES(1)")
    check_has_acls("p1=1/p2=2/p3=30", "default:group:new_leaf_group:-w-")

  @SkipIfS3.hdfs_acls
  @SkipIfIsilon.hdfs_acls
  def test_insert_file_permissions(self, unique_database):
    """Test that INSERT correctly respects file permissions (minimum ACLs)"""
    table = "`{0}`.`insert_acl_permissions`".format(unique_database)
    table_path = "test-warehouse/{0}.db/insert_acl_permissions".format(unique_database)

    insert_query = "INSERT INTO {0} VALUES(1)".format(table)
    refresh_query = "REFRESH {0}".format(table)

    self.execute_query_expect_success(self.client,
                                      "DROP TABLE IF EXISTS {0}".format(table))
    self.execute_query_expect_success(
        self.client, "CREATE TABLE {0} (col int)".format(table))

    # Check that a simple insert works
    self.execute_query_expect_success(self.client, insert_query)

    # Remove the permission to write and confirm that INSERTs won't work
    self.hdfs_client.setacl(table_path, "user::r-x,group::r-x,other::r-x")
    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_failure(self.client, insert_query)

    # Now add group access; this should still fail because the user entry
    # matches and takes priority
    self.hdfs_client.setacl(table_path, "user::r-x,group::rwx,other::r-x")
    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_failure(self.client, insert_query)

    # Now make the target directory non-writable with posix permissions, but
    # writable with ACLs (ACLs should take priority). Note: chmod affects ACLs
    # (!) so it has to be done first.
    self.hdfs_client.chmod(table_path, "000")
    self.hdfs_client.setacl(table_path, "user::rwx,group::r-x,other::r-x")

    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_success(self.client, insert_query)

    # Finally, change the owner
    self.hdfs_client.chown(table_path, "another_user", "another_group")
    self.execute_query_expect_success(self.client, refresh_query)
    # Should be unwritable because the 'other' ACL doesn't allow writes
    self.execute_query_expect_failure(self.client, insert_query)

    # Give write permissions back to 'other'
    self.hdfs_client.setacl(table_path, "user::rwx,group::r-x,other::rwx")
    self.execute_query_expect_success(self.client, refresh_query)
    # Should be writable because the 'other' ACL allows writes
    self.execute_query_expect_success(self.client, insert_query)

  @SkipIfS3.hdfs_acls
  @SkipIfIsilon.hdfs_acls
  def test_insert_acl_permissions(self, unique_database):
    """Test that INSERT correctly respects ACLs"""
    table = "`{0}`.`insert_acl_permissions`".format(unique_database)
    table_path = "test-warehouse/{0}.db/insert_acl_permissions".format(unique_database)

    insert_query = "INSERT INTO %s VALUES(1)" % table
    refresh_query = "REFRESH {0}".format(table)

    self.execute_query_expect_success(self.client,
                                      "DROP TABLE IF EXISTS {0}".format(table))
    self.execute_query_expect_success(self.client,
                                      "CREATE TABLE {0} (col int)".format(table))

    # Check that a simple insert works
    self.execute_query_expect_success(self.client, insert_query)

    user = getpass.getuser()
    group = grp.getgrgid(pwd.getpwnam(user).pw_gid).gr_name
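    # HDFS checks ACL entries in order: the owner entry, then the named user
    # entry, then group entries, then 'other'. A mask:: entry caps the effective
    # permissions of named users and all groups, which the mask checks below
    # rely on.
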
    # First, change the owner to someone other than the user running the Impala
    # service
    self.hdfs_client.chown(table_path, "another_user", group)

    # Remove the permission to write and confirm that INSERTs won't work
    self.hdfs_client.setacl(table_path,
                            "user::r-x,user:" + user + ":r-x,group::r-x,other::r-x")
    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_failure(self.client, insert_query)

    # Add the permission to write. Even though we're not the owner of the file,
    # INSERTs should now work via the named user entry
    self.hdfs_client.setacl(table_path,
                            "user::r-x,user:" + user + ":rwx,group::r-x,other::r-x")
    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_success(self.client, insert_query)

    # Now add group access; this should still fail because the named user entry
    # matches and takes priority
    self.hdfs_client.setacl(table_path,
                            "user::r-x,user:" + user + ":r-x,group::rwx,other::r-x")
    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_failure(self.client, insert_query)

    # Check that the mask correctly applies to the unnamed group ACL
    self.hdfs_client.setacl(table_path, "user::r-x,group::rwx,other::rwx,mask::r--")
    self.execute_query_expect_success(self.client, refresh_query)
    # Should be unwritable because the mask applies to the unnamed group and
    # disables writing
    self.execute_query_expect_failure(self.client, insert_query)

    # Check that the mask correctly applies to the named user ACL
    self.hdfs_client.setacl(table_path, "user::r-x,user:" + user +
                            ":rwx,group::r-x,other::rwx,mask::r--")
    self.execute_query_expect_success(self.client, refresh_query)
    # Should be unwritable because the mask applies to the named user and
    # disables writing
    self.execute_query_expect_failure(self.client, insert_query)

    # Now make the target directory non-writable with posix permissions, but
    # writable with ACLs (ACLs should take priority). Note: chmod affects ACLs
    # (!) so it has to be done first.
    self.hdfs_client.chmod(table_path, "000")
    self.hdfs_client.setacl(table_path, "user::rwx,user:foo:rwx,group::rwx,other::r-x")

    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_success(self.client, insert_query)

    # Finally, change the owner/group
    self.hdfs_client.chown(table_path, "test_user", "invalid")

    self.execute_query_expect_success(self.client, refresh_query)
    # Should be unwritable because the 'other' ACL doesn't allow writes
    self.execute_query_expect_failure(self.client, insert_query)

    # Give write permissions back to 'other'
    self.hdfs_client.setacl(table_path, "user::r-x,user:foo:rwx,group::r-x,other::rwx")
    self.execute_query_expect_success(self.client, refresh_query)
    # Should be writable because the 'other' ACL allows writes
    self.execute_query_expect_success(self.client, insert_query)

  @SkipIfS3.hdfs_acls
  @SkipIfIsilon.hdfs_acls
  def test_load_permissions(self, unique_database):
    # We rely on test_insert_acl_permissions() to exhaustively check that ACL
    # semantics are correct. Here we just validate that LOADs can't be done when
    # we cannot read from or write to the source directory, or write to the
    # destination directory.
    table = "`{0}`.`load_acl_permissions`".format(unique_database)
    table_path = "test-warehouse/{0}.db/load_acl_permissions".format(unique_database)

    file_path = "tmp/{0}".format(unique_database)
    file_name = "%s/impala_data_file" % file_path

    load_file_query = "LOAD DATA INPATH '/%s' INTO TABLE %s" % (file_name, table)
    load_dir_query = "LOAD DATA INPATH '/%s' INTO TABLE %s" % (file_path, table)
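
    # LOAD DATA moves files instead of copying them, which is why it needs write
    # access to the source directory (to remove the file) as well as to the
    # table directory. Both a single-file and a whole-directory LOAD are
    # exercised below.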

    refresh_query = "REFRESH {0}".format(table)

    self.hdfs_client.make_dir(file_path)

    self.hdfs_client.setacl(file_path, "user::rwx,group::rwx,other::---")
    self.execute_query_expect_success(self.client,
                                      "DROP TABLE IF EXISTS {0}".format(table))
    self.execute_query_expect_success(
        self.client, "CREATE TABLE {0} (col int)".format(table))
    self.hdfs_client.delete_file_dir(file_name)
    self.hdfs_client.create_file(file_name, "1")

    self.execute_query_expect_success(self.client, load_file_query)

    # Now remove write permissions from the source directory
    self.hdfs_client.create_file(file_name, "1")
    self.hdfs_client.setacl(file_path, "user::---,group::---,other::---")
    self.hdfs_client.setacl(table_path, "user::rwx,group::r-x,other::r-x")
    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_failure(self.client, load_file_query)
    self.execute_query_expect_failure(self.client, load_dir_query)

    # Remove write permissions from the target
    self.hdfs_client.setacl(file_path, "user::rwx,group::rwx,other::rwx")
    self.hdfs_client.setacl(table_path, "user::r-x,group::r-x,other::r-x")
    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_failure(self.client, load_file_query)
    self.execute_query_expect_failure(self.client, load_dir_query)

    # Finally, remove read permissions from the file itself
    self.hdfs_client.setacl(file_name, "user::-wx,group::rwx,other::rwx")
    self.hdfs_client.setacl(table_path, "user::rwx,group::rwx,other::rwx")
    self.execute_query_expect_success(self.client, refresh_query)
    self.execute_query_expect_failure(self.client, load_file_query)
    # We expect this to succeed; it's not an error if files inside the loaded
    # directory cannot be read
    self.execute_query_expect_success(self.client, load_dir_query)

  @pytest.mark.execute_serially
  def test_insert_select_with_empty_resultset(self):
    """Test that an insert/select query with an empty resultset doesn't create
    zero-size data files. The statically specified partition directory is still
    created, but must stay empty."""
    def check_path_exists(path, should_exist):
      assert self.filesystem_client.exists(path) == should_exist, "file/dir '{0}' " \
          "should {1}exist but does {2}exist.".format(
              path, '' if should_exist else 'not ', 'not ' if should_exist else '')

    db_path = "test-warehouse/%s.db/" % self.TEST_DB_NAME
    table_path = db_path + "test_insert_empty_result"
    partition_path = "{0}/year=2009/month=1".format(table_path)
    check_path_exists(table_path, False)

    table_name = self.TEST_DB_NAME + ".test_insert_empty_result"
    self.execute_query_expect_success(
        self.client,
        "CREATE TABLE %s (id INT, col INT) PARTITIONED BY (year INT, month "
        "INT)" % table_name)
    check_path_exists(table_path, True)
    check_path_exists(partition_path, False)
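
    # A partition named statically in the INSERT clause is created even when the
    # SELECT produces no rows; only the writing of data files is skipped.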
    # Run an insert/select stmt that returns an empty resultset.
    insert_query = ("INSERT INTO TABLE {0} PARTITION(year=2009, month=1) "
                    "select 1, 1 from {0} LIMIT 0".format(table_name))
    self.execute_query_expect_success(self.client, insert_query)
    # The partition directory is created
    check_path_exists(partition_path, True)

    # Insert one record
    insert_query_one_row = ("INSERT INTO TABLE %s PARTITION(year=2009, month=1) "
                            "values(2, 2)" % table_name)
    self.execute_query_expect_success(self.client, insert_query_one_row)
    # The partition directory should now contain exactly one data file
    check_path_exists(partition_path, True)
    ls = self.filesystem_client.ls(partition_path)
    assert len(ls) == 1

    # Run an insert/select statement that returns an empty resultset again
    self.execute_query_expect_success(self.client, insert_query)
    # No new data file should be created
    new_ls = self.filesystem_client.ls(partition_path)
    assert len(new_ls) == 1
    assert new_ls == ls

    # Run an insert overwrite/select that returns an empty resultset
    insert_query = ("INSERT OVERWRITE {0} PARTITION(year=2009, month=1)"
                    " select 1, 1 from {0} LIMIT 0".format(table_name))
    self.execute_query_expect_success(self.client, insert_query)
    # The existing data file should be deleted
    new_ls2 = self.filesystem_client.ls(partition_path)
    assert len(new_ls2) == 0
    assert new_ls != new_ls2

    # Test for IMPALA-2008: insert overwrite into an empty table with an empty
    # dataset
    empty_target_tbl = "test_overwrite_with_empty_target"
    create_table = "create table %s.%s (id INT, col INT)" % (self.TEST_DB_NAME,
                                                             empty_target_tbl)
    self.execute_query_expect_success(self.client, create_table)
    insert_query = ("INSERT OVERWRITE {0}.{1} select 1, 1 from {0}.{1} LIMIT 0"
                    .format(self.TEST_DB_NAME, empty_target_tbl))
    self.execute_query_expect_success(self.client, insert_query)

    # Delete the target table directory; the query should now fail with a
    # "No such file or directory" error
    target_table_path = "%s%s" % (db_path, empty_target_tbl)
    self.filesystem_client.delete_file_dir(target_table_path, recursive=True)
    self.execute_query_expect_failure(self.client, insert_query)

  @SkipIfS3.hdfs_acls
  @SkipIfIsilon.hdfs_acls
  def test_multiple_group_acls(self, unique_database):
    """Test that INSERT correctly respects multiple group ACLs"""
    table = "`{0}`.`insert_group_acl_permissions`".format(unique_database)
    table_path = "test-warehouse/{0}.db/insert_group_acl_permissions".format(
        unique_database)
    insert_query = "INSERT INTO %s VALUES(1)" % table

    self.execute_query_expect_success(self.client, "DROP TABLE IF EXISTS " + table)
    self.execute_query_expect_success(self.client, "CREATE TABLE %s (col int)" % table)

    user = getpass.getuser()
    test_user = "test_user"
    # Get the user's owning group and the list of supplementary groups (gr_mem
    # usually doesn't include the owning group).
    owning_group = grp.getgrgid(pwd.getpwnam(user).pw_gid).gr_name
    groups = [g.gr_name for g in grp.getgrall() if user in g.gr_mem]
    if len(groups) < 1:
      pytest.xfail(reason="Cannot run test, user doesn't belong to any "
                   "supplementary groups.")

    # First, change the owner to someone other than the user running the Impala
    # service
    self.hdfs_client.chown(table_path, "another_user", owning_group)

    # Set two group ACLs, one of which grants the requested permission while the
    # other doesn't.
    self.hdfs_client.setacl(
        table_path,
        "user::r-x,user:{0}:r-x,group::---,group:{1}:rwx,"
        "other::r-x".format(test_user, groups[0]))
    self.execute_query_expect_success(self.client, "REFRESH " + table)
    self.execute_query_expect_success(self.client, insert_query)

    # Two group ACLs, but with a mask that denies the permission.
    self.hdfs_client.setacl(
        table_path,
        "user::r-x,group::r--,group:{0}:rwx,mask::r-x,"
        "other::---".format(groups[0]))
    self.execute_query_expect_success(self.client, "REFRESH " + table)
    self.execute_query_expect_failure(self.client, insert_query)

  def test_clustered_partition_single_file(self, unique_database):
    """IMPALA-2523: Tests that a clustered insert creates one file per partition,
    even when inserting over multiple row batches."""
    # On S3 this test takes about 220 seconds and we are unlikely to break it, so
    # we only run it in the exhaustive exploration strategy.
    if self.exploration_strategy() != 'exhaustive' and IS_S3:
      pytest.skip("only runs in exhaustive")
    table = "{0}.insert_clustered".format(unique_database)
    table_path = "test-warehouse/{0}.db/insert_clustered".format(unique_database)
    table_location = get_fs_path("/" + table_path)

    create_stmt = """create table {0} like functional.alltypes""".format(table)
    self.execute_query_expect_success(self.client, create_stmt)

    set_location_stmt = """alter table {0} set location '{1}'""".format(
        table, table_location)
    self.execute_query_expect_success(self.client, set_location_stmt)

    # Setting a lower batch size will result in multiple row batches being written.
    self.execute_query_expect_success(self.client, "set batch_size=10")

    insert_stmt = """insert into {0} partition(year, month) /*+ clustered,shuffle */
                     select * from functional.alltypes""".format(table)
    self.execute_query_expect_success(self.client, insert_stmt)

    # We expect exactly one file per partition, since subsequent row batches of a
    # partition will be written into the same file.
    expected_partitions = \
        ["year=%s/month=%s" % (y, m) for y in [2009, 2010] for m in range(1, 13)]

    for partition in expected_partitions:
      partition_path = "{0}/{1}".format(table_path, partition)
      files = self.filesystem_client.ls(partition_path)
      assert len(files) == 1, "%s: %s" % (partition, files)

  def test_clustered_partition_multiple_files(self, unique_database):
    """IMPALA-2523: Tests that a clustered insert creates the right number of
    files per partition when inserting over multiple row batches."""
    # This test takes about 30 seconds and we are unlikely to break it, so we
    # only run it in the exhaustive exploration strategy.
    if self.exploration_strategy() != 'exhaustive':
      pytest.skip("only runs in exhaustive")
    table = "{0}.insert_clustered".format(unique_database)
    table_path = "test-warehouse/{0}.db/insert_clustered".format(unique_database)
    table_location = get_fs_path("/" + table_path)

    create_stmt = """create table {0} (
        l_orderkey BIGINT,
        l_partkey BIGINT,
        l_suppkey BIGINT,
        l_linenumber INT,
        l_quantity DECIMAL(12,2),
        l_extendedprice DECIMAL(12,2),
        l_discount DECIMAL(12,2),
        l_tax DECIMAL(12,2),
        l_linestatus STRING,
        l_shipdate STRING,
        l_commitdate STRING,
        l_receiptdate STRING,
        l_shipinstruct STRING,
        l_shipmode STRING,
        l_comment STRING)
        partitioned by (l_returnflag STRING) stored as parquet
        """.format(table)
    self.execute_query_expect_success(self.client, create_stmt)

    set_location_stmt = """alter table {0} set location '{1}'""".format(
        table, table_location)
    self.execute_query_expect_success(self.client, set_location_stmt)

    # Setting a lower parquet file size will result in multiple files being written.
    self.execute_query_expect_success(self.client, "set parquet_file_size=10485760")

    insert_stmt = """insert into {0} partition(l_returnflag) /*+ clustered,shuffle */
                     select l_orderkey,
                            l_partkey,
                            l_suppkey,
                            l_linenumber,
                            l_quantity,
                            l_extendedprice,
                            l_discount,
                            l_tax,
                            l_linestatus,
                            l_shipdate,
                            l_commitdate,
                            l_receiptdate,
                            l_shipinstruct,
                            l_shipmode,
                            l_comment,
                            l_returnflag
                     from tpch_parquet.lineitem""".format(table)
    self.execute_query_expect_success(self.client, insert_stmt)

    expected_partition_files = [("l_returnflag=A", 3, 30),
                                ("l_returnflag=N", 3, 30),
                                ("l_returnflag=R", 3, 30)]

    for partition, min_files, max_files in expected_partition_files:
      partition_path = "{0}/{1}".format(table_path, partition)
      files = self.filesystem_client.ls(partition_path)
      assert min_files <= len(files) <= max_files, "%s: %s" % (partition, files)