mirror of
https://github.com/apache/impala.git
synced 2025-12-19 18:12:08 -05:00
The Status::ToThrift() function takes the ErrorMsg, and pushes both the msg() and details() into the TStatus::error_msgs list. However, when we unpack the TStatus object into a Status object, we just copy all the TStatus::error_msgs to Status::ErrorMsg::details_ and leave Status::ErrorMsg::message_ blank. This led to the error message not being printed in certain cases which is now fixed. The PlanFragmentExecutor had some code to add query statuses to the error_log (IMP-633), which is no longer necessary after a future patch (IMPALA-762) explicitly returned the query status to the client via get_log(), making the adding of the query statuses to the error_log redundant. That code in the PFE has been removed and a test has been added to make sure that the case it previously tried to fix doesn't regress. Change-Id: I5d9d63610eb0d2acae3a9303ce46e1410727ce87 Reviewed-on: http://gerrit.cloudera.org:8080/6627 Reviewed-by: Sailesh Mukil <sailesh@cloudera.com> Tested-by: Impala Public Jenkins
183 lines
7.3 KiB
Python
183 lines
7.3 KiB
Python
# encoding=utf-8
|
||
#
|
||
# Licensed to the Apache Software Foundation (ASF) under one
|
||
# or more contributor license agreements. See the NOTICE file
|
||
# distributed with this work for additional information
|
||
# regarding copyright ownership. The ASF licenses this file
|
||
# to you under the Apache License, Version 2.0 (the
|
||
# "License"); you may not use this file except in compliance
|
||
# with the License. You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing,
|
||
# software distributed under the License is distributed on an
|
||
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
|
||
# KIND, either express or implied. See the License for the
|
||
# specific language governing permissions and limitations
|
||
# under the License.
|
||
#
|
||
# Tests Impala properly handles errors when reading and writing data.
|
||
|
||
import pytest
|
||
import random
|
||
|
||
from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
|
||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||
from tests.common.skip import SkipIfS3, SkipIfLocal
|
||
from tests.common.test_dimensions import create_exec_option_dimension
|
||
|
||
class TestDataErrors(ImpalaTestSuite):
|
||
# batch_size of 1 can expose some interesting corner cases at row batch boundaries.
|
||
BATCH_SIZES = [0, 1]
|
||
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestDataErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_dimension(
|
||
create_exec_option_dimension(batch_sizes=cls.BATCH_SIZES))
|
||
|
||
|
||
@classmethod
|
||
def get_workload(self):
|
||
return 'functional-query'
|
||
|
||
# Regression test for IMP-633. Added as a part of IMPALA-5198
|
||
class TestHdfsFileOpenFailErrors(ImpalaTestSuite):
|
||
@pytest.mark.execute_serially
|
||
def test_hdfs_file_open_fail(self):
|
||
absolute_location = "/test-warehouse/file_open_fail"
|
||
create_stmt = \
|
||
"create table file_open_fail (x int) location '" + absolute_location + "'"
|
||
insert_stmt = "insert into file_open_fail values(1)"
|
||
select_stmt = "select * from file_open_fail"
|
||
drop_stmt = "drop table if exists file_open_fail purge"
|
||
self.client.execute(drop_stmt)
|
||
self.client.execute(create_stmt)
|
||
self.client.execute(insert_stmt)
|
||
self.filesystem_client.delete_file_dir(absolute_location, recursive=True)
|
||
assert not self.filesystem_client.exists(absolute_location)
|
||
try:
|
||
self.client.execute(select_stmt)
|
||
except ImpalaBeeswaxException as e:
|
||
assert "Failed to open HDFS file" in str(e)
|
||
self.client.execute(drop_stmt)
|
||
|
||
|
||
@SkipIfS3.qualified_path
|
||
class TestHdfsScanNodeErrors(TestDataErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestHdfsScanNodeErrors, cls).add_test_dimensions()
|
||
# Only run on delimited text with no compression.
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format != 'hbase' and
|
||
v.get_value('table_format').file_format != 'parquet')
|
||
|
||
def test_hdfs_scan_node_errors(self, vector):
|
||
# TODO: Run each test with abort_on_error=0 and abort_on_error=1.
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
if (vector.get_value('table_format').file_format != 'text'):
|
||
pytest.xfail("Expected results differ across file formats")
|
||
self.run_test_case('DataErrorsTest/hdfs-scan-node-errors', vector)
|
||
|
||
@SkipIfS3.qualified_path
|
||
@SkipIfLocal.qualified_path
|
||
class TestHdfsSeqScanNodeErrors(TestHdfsScanNodeErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestHdfsSeqScanNodeErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format == 'seq')
|
||
|
||
def test_hdfs_seq_scan_node_errors(self, vector):
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/hdfs-sequence-scan-errors', vector)
|
||
|
||
|
||
@SkipIfS3.qualified_path
|
||
class TestHdfsRcFileScanNodeErrors(TestHdfsScanNodeErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestHdfsRcFileScanNodeErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format == 'rc')
|
||
|
||
def test_hdfs_rcfile_scan_node_errors(self, vector):
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/hdfs-rcfile-scan-node-errors', vector)
|
||
|
||
|
||
class TestAvroErrors(TestDataErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestAvroErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:
|
||
v.get_value('table_format').file_format == 'avro' and
|
||
v.get_value('table_format').compression_codec == 'snap')
|
||
|
||
def test_avro_errors(self, vector):
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/avro-errors', vector)
|
||
|
||
class TestHBaseDataErrors(TestDataErrors):
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestHBaseDataErrors, cls).add_test_dimensions()
|
||
|
||
# Only run on hbase.
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format == 'hbase' and\
|
||
v.get_value('table_format').compression_codec == 'none')
|
||
|
||
def test_hbase_scan_node_errors(self, vector):
|
||
pytest.xfail("hbasealltypeserror doesn't seem to return any errors")
|
||
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/hbase-scan-node-errors', vector)
|
||
|
||
def test_hbase_insert_errors(self, vector):
|
||
pytest.xfail("hbasealltypeserror doesn't seem to return any errors")
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
self.run_test_case('DataErrorsTest/hbase-insert-errors', vector)
|
||
|
||
|
||
class TestTimestampErrors(TestDataErrors):
|
||
"""
|
||
Create test table with various valid/invalid timestamp values, then run
|
||
scan and aggregation queries to make sure Impala doesn't crash.
|
||
- value doesn't have date
|
||
- value contains non-ascii char
|
||
- value contains unicode char
|
||
- value is outside boost gregorian date range.
|
||
"""
|
||
@classmethod
|
||
def add_test_dimensions(cls):
|
||
super(TestTimestampErrors, cls).add_test_dimensions()
|
||
cls.ImpalaTestMatrix.add_constraint(lambda v:\
|
||
v.get_value('table_format').file_format == 'text')
|
||
|
||
def _setup_test_table(self, fq_tbl_name):
|
||
create_stmt = "CREATE TABLE " + fq_tbl_name + " (col string)"
|
||
insert_stmt = "INSERT INTO TABLE " + fq_tbl_name + " values" + \
|
||
"('1999-03-24 07:21:02'), ('2001-ån-02 12:12:15')," + \
|
||
"('1997-1131 02:09:32'), ('1954-12-03 15:10:02')," + \
|
||
"('12:10:02'), ('1001-04-23 21:08:19'), ('15:03:09')"
|
||
alter_stmt = "ALTER TABLE " + fq_tbl_name + " CHANGE col col timestamp"
|
||
self.client.execute(create_stmt)
|
||
self.client.execute(insert_stmt)
|
||
self.client.execute(alter_stmt)
|
||
|
||
def test_timestamp_scan_agg_errors(self, vector, unique_database):
|
||
FQ_TBL_NAME = "%s.%s" % (unique_database, 'scan_agg_timestamp')
|
||
self._setup_test_table(FQ_TBL_NAME)
|
||
vector.get_value('exec_option')['abort_on_error'] = 0
|
||
result = self.client.execute("SELECT AVG(col) FROM " + FQ_TBL_NAME)
|
||
assert result.data == ['1977-01-27 11:15:32']
|
||
result = self.client.execute("SELECT * FROM " + FQ_TBL_NAME + " ORDER BY col")
|
||
assert len(result.data) == 7
|
||
assert result.data == ['1954-12-03 15:10:02', '1999-03-24 07:21:02', \
|
||
'12:10:02', '15:03:09', 'NULL', 'NULL', 'NULL']
|
||
result = self.client.execute("SELECT COUNT(DISTINCT col) FROM " + FQ_TBL_NAME)
|
||
assert result.data == ['4']
|