From 68fef6a5bfca4bedfc60cf117feacb815efe4c99 Mon Sep 17 00:00:00 2001
From: Skye Wanderman-Milne
Date: Wed, 30 Sep 2015 21:55:30 -0400
Subject: [PATCH] IMPALA-2213: make Parquet scanner fail query if the file
 size metadata is stale

This patch changes the Parquet scanner to check whether it can read the
full footer scan range; a short read indicates that the file has been
overwritten by a shorter file without refreshing the table metadata.
Before, the scanner would hit a DCHECK in this case.

This patch adds a test for this case, as well as for the case where the
new file is longer than the metadata states (which fails with an existing
error).

Change-Id: Ie2031ac2dc90e4f2573bd3ca8a3709db60424f07
Reviewed-on: http://gerrit.cloudera.org:8080/1084
Tested-by: Internal Jenkins
Reviewed-by: Tim Armstrong
---
 be/src/exec/hdfs-parquet-scanner.cc   |  29 +++-
 be/src/runtime/descriptors.cc         |   4 +
 be/src/runtime/descriptors.h          |   3 +
 common/thrift/generate_error_codes.py |   8 +-
 .../queries/QueryTest/parquet.test    |   2 +-
 tests/metadata/test_stale_metadata.py | 136 ++++++++++++++++++
 6 files changed, 173 insertions(+), 9 deletions(-)
 create mode 100644 tests/metadata/test_stale_metadata.py

diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index db50f264f..bb04dce02 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1472,10 +1472,25 @@ inline bool HdfsParquetScanner::ReadRow(const vector<ColumnReader*>& column_read
 
 Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
   *eosr = false;
-  uint8_t* buffer;
-  int64_t len;
+  int64_t len = stream_->scan_range()->len();
 
-  RETURN_IF_ERROR(stream_->GetBuffer(false, &buffer, &len));
+  // We're processing the scan range issued in IssueInitialRanges(). The scan range
+  // should be the last FOOTER_SIZE bytes of the file. !success means the file is
+  // shorter than we expect. Note we can't detect if the file is larger than we expect
+  // without attempting to read past the end of the scan range, but in this case we'll
+  // fail below trying to parse the footer.
+  DCHECK_LE(len, FOOTER_SIZE);
+  uint8_t* buffer;
+  bool success = stream_->ReadBytes(len, &buffer, &parse_status_);
+  if (!success) {
+    VLOG_QUERY << "Metadata for file '" << stream_->filename() << "' appears stale: "
+               << "metadata states file size to be "
+               << PrettyPrinter::Print(stream_->file_desc()->file_length, TUnit::BYTES)
+               << ", but could only read "
+               << PrettyPrinter::Print(stream_->total_bytes_returned(), TUnit::BYTES);
+    return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, stream_->filename(),
+        scan_node_->hdfs_table()->fully_qualified_name());
+  }
   DCHECK(stream_->eosr());
 
   // Number of bytes in buffer after the fixed size footer is accounted for.
@@ -1483,7 +1498,7 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
 
   // Make sure footer has enough bytes to contain the required information.
   if (remaining_bytes_buffered < 0) {
-    return Status(Substitute("File $0 is invalid. Missing metadata.",
+    return Status(Substitute("File '$0' is invalid. Missing metadata.",
         stream_->filename()));
   }
 
@@ -1491,9 +1506,9 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
   uint8_t* magic_number_ptr = buffer + len - sizeof(PARQUET_VERSION_NUMBER);
   if (memcmp(magic_number_ptr, PARQUET_VERSION_NUMBER,
       sizeof(PARQUET_VERSION_NUMBER)) != 0) {
-    return Status(Substitute("File $0 is invalid. Invalid file footer: $1",
-        stream_->filename(),
-        string((char*)magic_number_ptr, sizeof(PARQUET_VERSION_NUMBER))));
+    return Status(TErrorCode::PARQUET_BAD_VERSION_NUMBER, stream_->filename(),
+        string(reinterpret_cast<char*>(magic_number_ptr), sizeof(PARQUET_VERSION_NUMBER)),
+        scan_node_->hdfs_table()->fully_qualified_name());
   }
 
   // The size of the metadata is encoded as a 4 byte little endian value before
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index d32dce8ae..4ed700a00 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -124,6 +124,10 @@ TableDescriptor::TableDescriptor(const TTableDescriptor& tdesc)
   }
 }
 
+string TableDescriptor::fully_qualified_name() const {
+  return Substitute("$0.$1", database_, name_);
+}
+
 string TableDescriptor::DebugString() const {
   vector<string> cols;
   BOOST_FOREACH(const ColumnDescriptor& col_desc, col_descs_) {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 6bce750d7..e36330c5a 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -202,6 +202,9 @@ class TableDescriptor {
   int id() const { return id_; }
   const std::vector<ColumnDescriptor>& col_descs() const { return col_descs_; }
 
+  /// Returns "<database>.<table>"
+  std::string fully_qualified_name() const;
+
 protected:
   std::string name_;
   std::string database_;
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 085d3959e..c3504e476 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -184,7 +184,13 @@ error_codes = (
    "Temporary file $0 is blacklisted from a previous error and cannot be expanded."),
 
   ("RPC_CLIENT_CONNECT_FAILURE", 59,
-   "RPC client failed to connect: $0")
+   "RPC client failed to connect: $0"),
+
+  ("STALE_METADATA_FILE_TOO_SHORT", 60, "Metadata for file '$0' appears stale. "
+   "Try running \\\"refresh $1\\\" to reload the file metadata."),
+
+  ("PARQUET_BAD_VERSION_NUMBER", 61, "File '$0' has an invalid version number: $1\\n"
+   "This could be due to stale metadata. Try running \\\"refresh $2\\\"."),
 )
 
 import sys
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet.test b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
index 96e778c52..551174d75 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
@@ -50,5 +50,5 @@ bigint,bigint,string,string,boolean,boolean,bigint,bigint,bigint,bigint
 # Parquet file with invalid magic number
 SELECT * from bad_magic_number
 ---- CATCH
-File $NAMENODE/test-warehouse/bad_magic_number_parquet/bad_magic_number.parquet is invalid. Invalid file footer: XXXX
+File '$NAMENODE/test-warehouse/bad_magic_number_parquet/bad_magic_number.parquet' has an invalid version number: XXXX
 ====
diff --git a/tests/metadata/test_stale_metadata.py b/tests/metadata/test_stale_metadata.py
new file mode 100644
index 000000000..d16c9698b
--- /dev/null
+++ b/tests/metadata/test_stale_metadata.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2015 Cloudera, Inc. All rights reserved.
+
+import random
+from subprocess import check_call
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.test_dimensions import create_single_exec_option_dimension
+
+class TestRewrittenFile(ImpalaTestSuite):
+  """Tests that we gracefully handle the case where a file in HDFS is rewritten outside
+  of Impala without issuing "invalidate metadata" or "refresh"."""
+
+  # Create a unique database name so we can run multiple instances of this test class
+  # in parallel.
+  DATABASE = "test_written_file_" + str(random.randint(0, 10**10))
+
+  TABLE_NAME = "alltypes_rewritten_file"
+  TABLE_LOCATION = "/test-warehouse/%s" % DATABASE
+  FILE_NAME = "alltypes.parq"
+  # file size = 17.8 KB
+  SHORT_FILE = "/test-warehouse/alltypesagg_parquet/year=2010/month=1/" \
+      "day=__HIVE_DEFAULT_PARTITION__/*.parq"
+  SHORT_FILE_NUM_ROWS = 1000
+  # file size = 43.3 KB
+  LONG_FILE = "/test-warehouse/alltypesagg_parquet/year=2010/month=1/day=9/*.parq"
+  LONG_FILE_NUM_ROWS = 1000
+
+  @classmethod
+  def get_workload(cls):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRewrittenFile, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(create_single_exec_option_dimension())
+    # TODO: add more file formats
+    cls.TestMatrix.add_constraint(
+        lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  @classmethod
+  def setup_class(cls):
+    super(TestRewrittenFile, cls).setup_class()
+    cls.cleanup_db(cls.DATABASE)
+    cls.client.execute("create database if not exists " + cls.DATABASE)
+
+  @classmethod
+  def teardown_class(cls):
+    cls.cleanup_db(cls.DATABASE)
+    super(TestRewrittenFile, cls).teardown_class()
+
+  def teardown_method(self, method):
+    self.__drop_test_table()
+
+  def __overwrite_file_and_query(self, vector, old_file, new_file, expected_error,
+      expected_new_count):
+    """Rewrites 'old_file' with 'new_file' without invalidating metadata and verifies
+    that querying the table results in the expected error. 'expected_error' only needs
+    to be a substring of the full error message."""
+    self.__create_test_table()
+
+    # First copy in 'old_file' and refresh the cached file metadata.
+    self.__copy_file_to_test_table(old_file)
+    self.client.execute("refresh %s" % self.__full_table_name())
+
+    # Then overwrite 'old_file' with 'new_file', and don't invalidate metadata.
+    self.__copy_file_to_test_table(new_file)
+
+    # Query the table and check for the expected error.
+    try:
+      self.client.execute("select * from %s" % self.__full_table_name())
+      assert False, "Query was expected to fail"
+    except ImpalaBeeswaxException as e:
+      assert expected_error in str(e)
+
+    # Refresh the table and make sure we get results.
+    self.client.execute("refresh %s" % self.__full_table_name())
+    result = self.client.execute("select count(*) from %s" % self.__full_table_name())
+    assert result.data == [str(expected_new_count)]
+
+  def test_new_file_shorter(self, vector):
+    """Rewrites an existing file with a new shorter file."""
+    # Full error is something like:
+    #   Metadata for file '...' appears stale. Try running "refresh
+    #   test_written_file_xxx.alltypes_rewritten_file" to reload the file metadata.
+    self.__overwrite_file_and_query(vector, self.LONG_FILE, self.SHORT_FILE,
+        'appears stale.', self.SHORT_FILE_NUM_ROWS)
+
+  def test_new_file_longer(self, vector):
+    """Rewrites an existing file with a new longer file."""
+    # Full error is something like:
+    #   File '...' has an invalid version number: ff4C
+    #   This could be due to stale metadata. Try running "refresh
+    #   test_written_file_xxx.alltypes_rewritten_file".
+    self.__overwrite_file_and_query(vector, self.SHORT_FILE, self.LONG_FILE,
+        'invalid version number', self.LONG_FILE_NUM_ROWS)
+
+  def test_delete_file(self, vector):
+    """Deletes an existing file without refreshing metadata."""
+    self.__create_test_table()
+
+    # Copy in a file and refresh the cached file metadata.
+    self.__copy_file_to_test_table(self.LONG_FILE)
+    self.client.execute("refresh %s" % self.__full_table_name())
+
+    # Delete the file without refreshing metadata.
+    check_call(["hadoop", "fs", "-rm", self.TABLE_LOCATION + '/*'], shell=False)
+
+    # Query the table and check for the expected error.
+    try:
+      self.client.execute("select * from %s" % self.__full_table_name())
+      assert False, "Query was expected to fail"
+    except ImpalaBeeswaxException as e:
+      assert 'appears stale.' in str(e)
+
+    # Refresh the table and make sure we get no results.
+    self.client.execute("refresh %s" % self.__full_table_name())
+    result = self.client.execute("select count(*) from %s" % self.__full_table_name())
+    assert result.data == ['0']
+
+  def __create_test_table(self):
+    self.__drop_test_table()
+    self.client.execute("""
+        CREATE TABLE %s LIKE functional.alltypesnopart STORED AS PARQUET
+        LOCATION '%s'
+        """ % (self.__full_table_name(), self.TABLE_LOCATION))
+
+  def __drop_test_table(self):
+    self.client.execute("DROP TABLE IF EXISTS %s" % self.__full_table_name())
+
+  def __copy_file_to_test_table(self, src_path):
+    """Copies the provided path to the test table, overwriting any previous file."""
+    dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
+    check_call(["hadoop", "fs", "-cp", "-f", src_path, dst_path], shell=False)
+
+  def __full_table_name(self):
+    return "%s.%s" % (self.DATABASE, self.TABLE_NAME)
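
The core of the fix is a length check: the footer scan range is sized from the file
length recorded in the catalog metadata, so a short read proves the file shrank after
that metadata was loaded. Below is a minimal standalone sketch of that check in plain
Python, not Impala code; the FOOTER_SIZE value, the demo path, and the error wording
are illustrative assumptions modeled on the patch, not APIs from the Impala codebase.

import os

# Assumed footer scan-range size, standing in for the scanner's FOOTER_SIZE constant.
FOOTER_SIZE = 100 * 1024

def footer_read_error(path, cached_length):
  """Mimics the patch's staleness check: returns an error message in the spirit of
  STALE_METADATA_FILE_TOO_SHORT if 'path' is now shorter than 'cached_length' (the
  file size recorded in stale metadata), or None if the footer range reads fully."""
  range_len = min(FOOTER_SIZE, cached_length)
  offset = cached_length - range_len  # footer range start implied by cached metadata
  with open(path, "rb") as f:
    f.seek(offset)
    data = f.read(range_len)
  if len(data) < range_len:  # short read: the file shrank out from under us
    return ("Metadata for file '%s' appears stale: expected to read %d footer bytes "
            "but could only read %d. Try running \"refresh <table>\"."
            % (path, range_len, len(data)))
  return None

if __name__ == "__main__":
  # Overwrite a 200 KB file with a 10 KB one, then probe using the stale length.
  demo = "/tmp/footer_check_demo.bin"
  with open(demo, "wb") as f:
    f.write(os.urandom(200 * 1024))
  cached_length = os.path.getsize(demo)
  with open(demo, "wb") as f:
    f.write(os.urandom(10 * 1024))
  print(footer_read_error(demo, cached_length))  # prints the stale-metadata message

The opposite case, where the file grew, cannot be caught this way because reading stops
at the end of the scan range; as the patch's comment notes, it surfaces later when the
bytes at the predicted footer position fail the Parquet magic-number check.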