mirror of
https://github.com/apache/impala.git
synced 2025-12-22 03:18:15 -05:00
Previously Impala disallowed LOAD DATA and INSERT on S3. This patch functionally enables LOAD DATA and INSERT on S3 without making major changes for the sake of improving performance over S3. This patch also enables both INSERT and LOAD DATA between file systems. S3 does not support the rename operation, so the staged files in S3 are copied instead of renamed, which contributes to the slow performance on S3. The FinalizeSuccessfulInsert() function now does not make any underlying assumptions about the filesystem it is on and works across all supported filesystems. This is done by adding a full URI field to the base directory for a partition in the TInsertPartitionStatus. Also, the HdfsOp class now does not assume a single filesystem and gets connections to the filesystems based on the URI of the file it is operating on. Added a python S3 client called 'boto3' to access S3 from the python tests. A new class called S3Client is introduced which creates wrappers around the boto3 functions and has the same function signatures as PyWebHdfsClient by deriving from a base abstract class BaseFileSystem so that they can be used interchangeably through a 'generic_client'. test_load.py is refactored to use this generic client. The ImpalaTestSuite setup creates a client according to the TARGET_FILESYSTEM environment variable and assigns it to the 'generic_client'. P.S: Currently, the test_load.py runs 4x slower on S3 than on HDFS. Performance needs to be improved in future patches. INSERT performance is slower than on HDFS too. This is mainly because of an extra copy that happens between staging and the final location of a file. However, larger INSERTs come closer to HDFS performance than smaller inserts. ACLs are not taken care of for S3 in this patch. It is something that still needs to be discussed before implementing. 
Change-Id: I94e15ad67752dce21c9b7c1dced6e114905a942d Reviewed-on: http://gerrit.cloudera.org:8080/2574 Reviewed-by: Sailesh Mukil <sailesh@cloudera.com> Tested-by: Internal Jenkins
152 lines
5.8 KiB
Python
152 lines
5.8 KiB
Python
# encoding=utf-8
|
||
# Copyright (c) 2012 Cloudera, Inc. All rights reserved.
|
||
#
|
||
# Licensed under the Apache License, Version 2.0 (the "License");
|
||
# you may not use this file except in compliance with the License.
|
||
# You may obtain a copy of the License at
|
||
#
|
||
# http://www.apache.org/licenses/LICENSE-2.0
|
||
#
|
||
# Unless required by applicable law or agreed to in writing, software
|
||
# distributed under the License is distributed on an "AS IS" BASIS,
|
||
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||
# See the License for the specific language governing permissions and
|
||
# limitations under the License.
|
||
#
|
||
# Tests Impala properly handles errors when reading and writing data.
|
||
|
||
from tests.common.impala_test_suite import ImpalaTestSuite
|
||
from tests.common.skip import SkipIfS3, SkipIfLocal
|
||
import random
|
||
import pytest
|
||
|
||
class TestDataErrors(ImpalaTestSuite):
  """Common base class for the data-error test suites in this file."""

  @classmethod
  def add_test_dimensions(cls):
    """Use the parent suite's test dimensions without modification."""
    super(TestDataErrors, cls).add_test_dimensions()

  @classmethod
  def get_workload(cls):
    """Return the name of the workload these tests run against."""
    # The first argument of a classmethod is the class itself, so it is
    # named 'cls' rather than 'self'.
    return 'functional-query'
|
||
|
||
|
||
@SkipIfS3.qualified_path
class TestHdfsScanNodeErrors(TestDataErrors):
  """Runs the 'hdfs-scan-node-errors' test case."""

  @classmethod
  def add_test_dimensions(cls):
    """Drop the hbase and parquet file formats from the test matrix."""
    super(TestHdfsScanNodeErrors, cls).add_test_dimensions()

    def is_allowed_format(v):
      # Every file format except hbase and parquet stays in the matrix.
      return v.get_value('table_format').file_format not in ('hbase', 'parquet')

    cls.TestMatrix.add_constraint(is_allowed_format)

  def test_hdfs_scan_node_errors(self, vector):
    """Run the scan node error test case with abort_on_error set to 0.

    Any table format other than text is reported as an expected failure.
    """
    # TODO: Run each test with abort_on_error=0 and abort_on_error=1.
    exec_options = vector.get_value('exec_option')
    exec_options['abort_on_error'] = 0
    table_format = vector.get_value('table_format')
    if table_format.file_format != 'text':
      pytest.xfail("Expected results differ across file formats")
    self.run_test_case('DataErrorsTest/hdfs-scan-node-errors', vector)
|
||
|
||
|
||
@SkipIfS3.qualified_path
@SkipIfLocal.qualified_path
class TestHdfsSeqScanNodeErrors(TestHdfsScanNodeErrors):
  """Runs the 'hdfs-sequence-scan-errors' test case."""

  @classmethod
  def add_test_dimensions(cls):
    """Add a constraint that keeps only the 'seq' file format."""
    super(TestHdfsSeqScanNodeErrors, cls).add_test_dimensions()

    def is_seq_format(v):
      return v.get_value('table_format').file_format == 'seq'

    cls.TestMatrix.add_constraint(is_seq_format)

  def test_hdfs_seq_scan_node_errors(self, vector):
    """Run the sequence scan error test case with abort_on_error set to 0."""
    exec_options = vector.get_value('exec_option')
    exec_options['abort_on_error'] = 0
    self.run_test_case('DataErrorsTest/hdfs-sequence-scan-errors', vector)
|
||
|
||
|
||
@SkipIfS3.qualified_path
class TestHdfsRcFileScanNodeErrors(TestHdfsScanNodeErrors):
  """Runs the 'hdfs-rcfile-scan-node-errors' test case."""

  @classmethod
  def add_test_dimensions(cls):
    """Add a constraint that keeps only the 'rc' file format."""
    super(TestHdfsRcFileScanNodeErrors, cls).add_test_dimensions()

    def is_rc_format(v):
      return v.get_value('table_format').file_format == 'rc'

    cls.TestMatrix.add_constraint(is_rc_format)

  def test_hdfs_rcfile_scan_node_errors(self, vector):
    """Run the RC file scan error test case with abort_on_error set to 0."""
    exec_options = vector.get_value('exec_option')
    exec_options['abort_on_error'] = 0
    self.run_test_case('DataErrorsTest/hdfs-rcfile-scan-node-errors', vector)
|
||
|
||
|
||
class TestHBaseDataErrors(TestDataErrors):
  """HBase scan and insert error test cases; both are currently xfailed."""

  @classmethod
  def add_test_dimensions(cls):
    """Add a constraint that keeps only hbase with compression codec 'none'."""
    super(TestHBaseDataErrors, cls).add_test_dimensions()

    def is_uncompressed_hbase(v):
      # Only run on hbase.
      if v.get_value('table_format').file_format != 'hbase':
        return False
      return v.get_value('table_format').compression_codec == 'none'

    cls.TestMatrix.add_constraint(is_uncompressed_hbase)

  def test_hbase_scan_node_errors(self, vector):
    """Marked as an expected failure before the test case is run."""
    pytest.xfail("hbasealltypeserror doesn't seem to return any errors")

    # Not reached while the xfail call above is in place.
    exec_options = vector.get_value('exec_option')
    exec_options['abort_on_error'] = 0
    self.run_test_case('DataErrorsTest/hbase-scan-node-errors', vector)

  def test_hbase_insert_errors(self, vector):
    """Marked as an expected failure before the test case is run."""
    pytest.xfail("hbasealltypeserror doesn't seem to return any errors")

    # Not reached while the xfail call above is in place.
    exec_options = vector.get_value('exec_option')
    exec_options['abort_on_error'] = 0
    self.run_test_case('DataErrorsTest/hbase-insert-errors', vector)
|
||
|
||
class TestTimestampErrors(TestDataErrors):
  """
  Builds a table holding a mix of valid and invalid timestamp values, then
  runs scan and aggregation queries over it to make sure Impala doesn't crash.
  The invalid values cover these cases:
    - value doesn't have date
    - value contains non-ascii char
    - value contains unicode char
    - value is outside boost gregorian date range.
  """
  # The random suffix is drawn once, when the class body is evaluated.
  TEST_DATABASE = "test_timestamp%d" % random.randint(0, 10**5)

  @classmethod
  def setup_class(cls):
    """Run the parent setup, then clean up and (re)create the test database."""
    super(TestTimestampErrors, cls).setup_class()
    db_name = cls.TEST_DATABASE
    cls.cleanup_db(db_name)
    cls.client.execute("CREATE DATABASE IF NOT EXISTS %s" % db_name)

  @classmethod
  def teardown_class(cls):
    """Clean up the test database, then run the parent teardown."""
    cls.cleanup_db(cls.TEST_DATABASE)
    super(TestTimestampErrors, cls).teardown_class()

  @classmethod
  def add_test_dimensions(cls):
    """Add a constraint that keeps only the 'text' file format."""
    super(TestTimestampErrors, cls).add_test_dimensions()

    def is_text_format(v):
      return v.get_value('table_format').file_format == 'text'

    cls.TestMatrix.add_constraint(is_text_format)

  def _setup_test_table(self, table_name):
    """Create 'table_name', insert seven string values, then change the
    column type to timestamp."""
    statements = (
        # The column starts out as a string so any value can be inserted.
        "CREATE TABLE " + table_name + " (col string)",
        ("INSERT INTO TABLE " + table_name + " values"
         "('1999-03-24 07:21:02'), ('2001-ån-02 12:12:15'),"
         "('1997-1131 02:09:32'), ('1954-12-03 15:10:02'),"
         "('12:10:02'), ('1001-04-23 21:08:19'), ('15:03:09')"),
        # Switching the type afterwards makes the stored strings get read
        # as timestamps by the queries that follow.
        "ALTER TABLE " + table_name + " CHANGE col col timestamp",
    )
    for stmt in statements:
      self.client.execute(stmt)

  @pytest.mark.execute_serially
  def test_timestamp_scan_agg_errors(self, vector):
    """Check AVG, an ordered scan and COUNT(DISTINCT) over the test table."""
    suffix = random.randint(0, 10**5)
    table_name = "%s.scan_agg_timestamp_%d" % (self.TEST_DATABASE, suffix)
    self._setup_test_table(table_name)
    # NOTE(review): the queries below are issued through self.client.execute
    # directly - confirm that this exec_option change actually reaches them.
    vector.get_value('exec_option')['abort_on_error'] = 0

    avg_result = self.client.execute("SELECT AVG(col) FROM " + table_name)
    assert avg_result.data == ['1977-01-27 11:15:32']

    scan_query = "SELECT * FROM " + table_name + " ORDER BY col"
    scan_result = self.client.execute(scan_query)
    assert len(scan_result.data) == 7
    expected_rows = ['1954-12-03 15:10:02', '1999-03-24 07:21:02',
                     '12:10:02', '15:03:09', 'NULL', 'NULL', 'NULL']
    assert scan_result.data == expected_rows

    distinct_query = "SELECT COUNT(DISTINCT col) FROM " + table_name
    distinct_result = self.client.execute(distinct_query)
    assert distinct_result.data == ['4']
|