mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
This patch adds support for COS(Cloud Object Storage). Using the hadoop-cos, the implementation is similar to other remote FileSystems. New flags for COS: - num_cos_io_threads: Number of COS I/O threads. Defaults to be 16. Follow-up: - Support for caching COS file handles will be addressed in IMPALA-10772. - test_concurrent_inserts and test_failing_inserts in test_acid_stress.py are skipped due to slow file listing on COS (IMPALA-10773). Tests: - Upload hdfs test data to a COS bucket. Modify all locations in HMS DB to point to the COS bucket. Remove some hdfs caching params. Run CORE tests. Change-Id: Idce135a7591d1b4c74425e365525be3086a39821 Reviewed-on: http://gerrit.cloudera.org:8080/17503 Reviewed-by: Joe McDonnell <joemcdonnell@cloudera.com> Tested-by: Impala Public Jenkins <impala-public-jenkins@cloudera.com>
237 lines
9.7 KiB
Python
237 lines
9.7 KiB
Python
# encoding=utf-8
|
||
#
|
||
# Licensed to the Apache Software Foundation (ASF) under one
|
||
# or more contributor license agreements. See the NOTICE file
|
||
# distributed with this work for additional information
|
||
# regarding copyright ownership. The ASF licenses this file
|
||
# to you under the Apache License, Version 2.0 (the
|
||
# "License"); you may not use this file except in compliance
|
||
# with the License. You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing,
|
||
# software distributed under the License is distributed on an
|
||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
# KIND, either express or implied. See the License for the
|
||
# specific language governing permissions and limitations
|
||
# under the License.
|
||
#
|
||
# Tests Impala properly handles errors when reading and writing data.
|
||
|
||
import pytest
|
||
import random
|
||
import subprocess
|
||
|
||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||
from tests.common.skip import (SkipIf, SkipIfS3, SkipIfABFS, SkipIfADLS, SkipIfGCS,
|
||
SkipIfCOS, SkipIfLocal)
|
||
from tests.common.test_dimensions import create_exec_option_dimension
|
||
|
||
class TestDataErrors(ImpalaTestSuite):
|
||
# batch_size of 1 can expose some interesting corner cases at row batch boundaries.
|
||
BATCH_SIZES = [0, 1]
|
||
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestDataErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_dimension(
|
||
create_exec_option_dimension(batch_sizes=cls.BATCH_SIZES))
|
||
|
||
|
||
@classmethod
|
||
def get_workload(self):
|
||
return 'functional-query'
|
||
|
||
# Regression test for IMP-633. Added as a part of IMPALA-5198.
|
||
@SkipIf.not_hdfs
|
||
class TestHdfsFileOpenFailErrors(ImpalaTestSuite):
|
||
@pytest.mark.execute_serially
|
||
def test_hdfs_file_open_fail(self):
|
||
absolute_location = "/test-warehouse/file_open_fail"
|
||
create_stmt = \
|
||
"create table file_open_fail (x int) location '" + absolute_location + "'"
|
||
insert_stmt = "insert into file_open_fail values(1)"
|
||
select_stmt = "select * from file_open_fail"
|
||
drop_stmt = "drop table if exists file_open_fail purge"
|
||
self.client.execute(drop_stmt)
|
||
self.client.execute(create_stmt)
|
||
self.client.execute(insert_stmt)
|
||
self.filesystem_client.delete_file_dir(absolute_location, recursive=True)
|
||
assert not self.filesystem_client.exists(absolute_location)
|
||
try:
|
||
self.client.execute(select_stmt)
|
||
except ImpalaBeeswaxException as e:
|
||
assert "Failed to open HDFS file" in str(e)
|
||
self.client.execute(drop_stmt)
|
||
|
||
# Test for IMPALA-5331 to verify that the libHDFS API hdfsGetLastExceptionRootCause()
|
||
# works.
|
||
@SkipIf.not_hdfs
|
||
class TestHdfsUnknownErrors(ImpalaTestSuite):
|
||
@pytest.mark.execute_serially
|
||
def test_hdfs_safe_mode_error_255(self, unique_database):
|
||
pytest.xfail("IMPALA-6109: Putting HDFS name node into safe mode trips up HBase")
|
||
create_stmt = "create table {0}.safe_mode_fail (x int)".format(unique_database)
|
||
insert_stmt = "insert into {0}.safe_mode_fail values (1)".format(unique_database)
|
||
self.execute_query_expect_success(self.client, create_stmt)
|
||
self.execute_query_expect_success(self.client, insert_stmt)
|
||
try:
|
||
# Check that we're not in safe mode.
|
||
output, error = subprocess.Popen(
|
||
['hdfs', 'dfsadmin', '-safemode', 'get'],
|
||
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
|
||
assert error is "", "Couldn't get status of Safe mode. Error: %s" % (error)
|
||
assert "Safe mode is OFF" in output
|
||
# Turn safe mode on.
|
||
output, error = subprocess.Popen(
|
||
['hdfs', 'dfsadmin', '-safemode', 'enter'],
|
||
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
|
||
assert error is "", "Couldn't turn Safe mode ON. Error: %s" % (error)
|
||
assert "Safe mode is ON" in output
|
||
|
||
# We shouldn't be able to write to HDFS when it's in safe mode.
|
||
ex = self.execute_query_expect_failure(self.client, insert_stmt)
|
||
|
||
# Confirm that it is an Unknown error with error code 255.
|
||
assert "Unknown error 255" in str(ex)
|
||
# Confirm that we were able to get the root cause.
|
||
assert "Name node is in safe mode" in str(ex)
|
||
finally:
|
||
# Leave safe mode.
|
||
output, error = subprocess.Popen(
|
||
['hdfs', 'dfsadmin', '-safemode', 'leave'],
|
||
stdout=subprocess.PIPE, stderr=subprocess.PIPE).communicate()
|
||
assert error is "", "Couldn't turn Safe mode OFF. Error: %s" % (error)
|
||
assert "Safe mode is OFF" in output
|
||
|
||
@SkipIfS3.qualified_path
|
||
@SkipIfGCS.qualified_path
|
||
@SkipIfCOS.qualified_path
|
||
@SkipIfABFS.qualified_path
|
||
@SkipIfADLS.qualified_path
|
||
class TestHdfsScanNodeErrors(TestDataErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestHdfsScanNodeErrors, cls).add_test_dimensions()
|
||
# Only run on delimited text with no compression.
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format != 'hbase' and
|
||
v.get_value('table_format').file_format != 'parquet')
|
||
|
||
def test_hdfs_scan_node_errors(self, vector):
|
||
# TODO: Run each test with abort_on_error=0 and abort_on_error=1.
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
if (vector.get_value('table_format').file_format != 'text'):
|
||
pytest.xfail("Expected results differ across file formats")
|
||
self.run_test_case('DataErrorsTest/hdfs-scan-node-errors', vector)
|
||
|
||
@SkipIfS3.qualified_path
|
||
@SkipIfGCS.qualified_path
|
||
@SkipIfCOS.qualified_path
|
||
@SkipIfABFS.qualified_path
|
||
@SkipIfADLS.qualified_path
|
||
@SkipIfLocal.qualified_path
|
||
class TestHdfsSeqScanNodeErrors(TestHdfsScanNodeErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestHdfsSeqScanNodeErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format == 'seq')
|
||
|
||
def test_hdfs_seq_scan_node_errors(self, vector):
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/hdfs-sequence-scan-errors', vector)
|
||
|
||
|
||
@SkipIfS3.qualified_path
|
||
@SkipIfGCS.qualified_path
|
||
@SkipIfCOS.qualified_path
|
||
@SkipIfABFS.qualified_path
|
||
@SkipIfADLS.qualified_path
|
||
class TestHdfsRcFileScanNodeErrors(TestHdfsScanNodeErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestHdfsRcFileScanNodeErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format == 'rc')
|
||
|
||
def test_hdfs_rcfile_scan_node_errors(self, vector):
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/hdfs-rcfile-scan-node-errors', vector)
|
||
|
||
|
||
class TestAvroErrors(TestDataErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestAvroErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:
|
||
v.get_value('table_format').file_format == 'avro' and
|
||
v.get_value('table_format').compression_codec == 'snap')
|
||
|
||
def test_avro_errors(self, vector):
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/avro-errors', vector)
|
||
|
||
class TestHBaseDataErrors(TestDataErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestHBaseDataErrors, cls).add_test_dimensions()
|
||
|
||
# Only run on hbase.
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format == 'hbase' and\
|
||
v.get_value('table_format').compression_codec == 'none')
|
||
|
||
def test_hbase_scan_node_errors(self, vector):
|
||
pytest.xfail("hbasealltypeserror doesn't seem to return any errors")
|
||
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/hbase-scan-node-errors', vector)
|
||
|
||
def test_hbase_insert_errors(self, vector):
|
||
pytest.xfail("hbasealltypeserror doesn't seem to return any errors")
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/hbase-insert-errors', vector)
|
||
|
||
|
||
class TestTimestampErrors(TestDataErrors):
|
||
"""
|
||
Create test table with various valid/invalid timestamp values, then run
|
||
scan and aggregation queries to make sure Impala doesn't crash.
|
||
- value doesn't have date
|
||
- value contains non-ascii char
|
||
- value contains unicode char
|
||
- value is outside boost gregorian date range.
|
||
"""
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestTimestampErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format == 'text')
|
||
|
||
def _setup_test_table(self, fq_tbl_name):
|
||
create_stmt = "CREATE TABLE " + fq_tbl_name + " (col string)"
|
||
insert_stmt = "INSERT INTO TABLE " + fq_tbl_name + " values" + \
|
||
"('1999-03-24 07:21:02'), ('2001-ån-02 12:12:15')," + \
|
||
"('1997-1131 02:09:32'), ('1954-12-03 15:10:02')," + \
|
||
"('12:10:02'), ('1001-04-23 21:08:19'), ('15:03:09')"
|
||
alter_stmt = "ALTER TABLE " + fq_tbl_name + " CHANGE col col timestamp"
|
||
self.client.execute(create_stmt)
|
||
self.client.execute(insert_stmt)
|
||
self.client.execute(alter_stmt)
|
||
|
||
def test_timestamp_scan_agg_errors(self, vector, unique_database):
|
||
FQ_TBL_NAME = "%s.%s" % (unique_database, 'scan_agg_timestamp')
|
||
self._setup_test_table(FQ_TBL_NAME)
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
result = self.client.execute("SELECT AVG(col) FROM " + FQ_TBL_NAME)
|
||
assert result.data == ['1977-01-27 11:15:32']
|
||
result = self.client.execute("SELECT * FROM " + FQ_TBL_NAME + " ORDER BY col")
|
||
assert len(result.data) == 7
|
||
assert result.data == ['1954-12-03 15:10:02', '1999-03-24 07:21:02', \
|
||
'NULL', 'NULL', 'NULL', 'NULL', 'NULL']
|
||
result = self.client.execute("SELECT COUNT(DISTINCT col) FROM " + FQ_TBL_NAME)
|
||
assert result.data == ['2']
|