diff --git a/be/src/exec/hdfs-parquet-scanner.cc b/be/src/exec/hdfs-parquet-scanner.cc
index db50f264f..bb04dce02 100644
--- a/be/src/exec/hdfs-parquet-scanner.cc
+++ b/be/src/exec/hdfs-parquet-scanner.cc
@@ -1472,10 +1472,25 @@ inline bool HdfsParquetScanner::ReadRow(const vector<ColumnReader*>& column_read
 Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
   *eosr = false;
-  uint8_t* buffer;
-  int64_t len;
+  int64_t len = stream_->scan_range()->len();
 
-  RETURN_IF_ERROR(stream_->GetBuffer(false, &buffer, &len));
+  // We're processing the scan range issued in IssueInitialRanges(). The scan range should
+  // be the last FOOTER_SIZE bytes of the file. !success means the file is shorter than we
+  // expect. Note we can't detect if the file is larger than we expect without attempting
+  // to read past the end of the scan range, but in this case we'll fail below trying to
+  // parse the footer.
+  DCHECK_LE(len, FOOTER_SIZE);
+  uint8_t* buffer;
+  bool success = stream_->ReadBytes(len, &buffer, &parse_status_);
+  if (!success) {
+    VLOG_QUERY << "Metadata for file '" << stream_->filename() << "' appears stale: "
+        << "metadata states file size to be "
+        << PrettyPrinter::Print(stream_->file_desc()->file_length, TUnit::BYTES)
+        << ", but could only read "
+        << PrettyPrinter::Print(stream_->total_bytes_returned(), TUnit::BYTES);
+    return Status(TErrorCode::STALE_METADATA_FILE_TOO_SHORT, stream_->filename(),
+        scan_node_->hdfs_table()->fully_qualified_name());
+  }
   DCHECK(stream_->eosr());
 
   // Number of bytes in buffer after the fixed size footer is accounted for.
@@ -1483,7 +1498,7 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
 
   // Make sure footer has enough bytes to contain the required information.
   if (remaining_bytes_buffered < 0) {
-    return Status(Substitute("File $0 is invalid. Missing metadata.",
+    return Status(Substitute("File '$0' is invalid. Missing metadata.",
         stream_->filename()));
   }
 
@@ -1491,9 +1506,9 @@ Status HdfsParquetScanner::ProcessFooter(bool* eosr) {
   uint8_t* magic_number_ptr = buffer + len - sizeof(PARQUET_VERSION_NUMBER);
   if (memcmp(magic_number_ptr, PARQUET_VERSION_NUMBER,
       sizeof(PARQUET_VERSION_NUMBER)) != 0) {
-    return Status(Substitute("File $0 is invalid. Invalid file footer: $1",
-        stream_->filename(),
-        string((char*)magic_number_ptr, sizeof(PARQUET_VERSION_NUMBER))));
+    return Status(TErrorCode::PARQUET_BAD_VERSION_NUMBER, stream_->filename(),
+        string(reinterpret_cast<char*>(magic_number_ptr), sizeof(PARQUET_VERSION_NUMBER)),
+        scan_node_->hdfs_table()->fully_qualified_name());
   }
 
   // The size of the metadata is encoded as a 4 byte little endian value before
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index d32dce8ae..4ed700a00 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -124,6 +124,10 @@ TableDescriptor::TableDescriptor(const TTableDescriptor& tdesc)
   }
 }
 
+string TableDescriptor::fully_qualified_name() const {
+  return Substitute("$0.$1", database_, name_);
+}
+
 string TableDescriptor::DebugString() const {
   vector<string> cols;
   BOOST_FOREACH(const ColumnDescriptor& col_desc, col_descs_) {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 6bce750d7..e36330c5a 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -202,6 +202,9 @@ class TableDescriptor {
   int id() const { return id_; }
   const std::vector<ColumnDescriptor>& col_descs() const { return col_descs_; }
 
+  /// Returns "<database>.<name>"
+  std::string fully_qualified_name() const;
+
 protected:
   std::string name_;
   std::string database_;
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index 085d3959e..c3504e476 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -184,7 +184,13 @@ error_codes = (
   "Temporary file $0 is blacklisted from a previous error and cannot be expanded."),
 
   ("RPC_CLIENT_CONNECT_FAILURE", 59,
-   "RPC client failed to connect: $0")
+   "RPC client failed to connect: $0"),
+
+  ("STALE_METADATA_FILE_TOO_SHORT", 60, "Metadata for file '$0' appears stale. "
+   "Try running \\\"refresh $1\\\" to reload the file metadata."),
+
+  ("PARQUET_BAD_VERSION_NUMBER", 61, "File '$0' has an invalid version number: $1\\n"
+   "This could be due to stale metadata. Try running \\\"refresh $2\\\"."),
 )
 
 import sys
diff --git a/testdata/workloads/functional-query/queries/QueryTest/parquet.test b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
index 96e778c52..551174d75 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/parquet.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/parquet.test
@@ -50,5 +50,5 @@ bigint,bigint,string,string,boolean,boolean,bigint,bigint,bigint,bigint
 # Parquet file with invalid magic number
 SELECT * from bad_magic_number
 ---- CATCH
-File $NAMENODE/test-warehouse/bad_magic_number_parquet/bad_magic_number.parquet is invalid. Invalid file footer: XXXX
+File '$NAMENODE/test-warehouse/bad_magic_number_parquet/bad_magic_number.parquet' has an invalid version number: XXXX
====
diff --git a/tests/metadata/test_stale_metadata.py b/tests/metadata/test_stale_metadata.py
new file mode 100644
index 000000000..d16c9698b
--- /dev/null
+++ b/tests/metadata/test_stale_metadata.py
@@ -0,0 +1,136 @@
+# Copyright (c) 2015 Cloudera, Inc. All rights reserved.
+
+import random
+from subprocess import check_call
+from tests.beeswax.impala_beeswax import ImpalaBeeswaxException
+from tests.common.impala_test_suite import ImpalaTestSuite
+from tests.common.impala_test_suite import create_single_exec_option_dimension
+
+class TestRewrittenFile(ImpalaTestSuite):
+  """Tests that we gracefully handle when a file in HDFS is rewritten outside of Impala
+  without issuing "invalidate metadata"."""
+
+  # Create a unique database name so we can run multiple instances of this test class in
+  # parallel.
+  DATABASE = "test_written_file_" + str(random.randint(0, 10**10))
+
+  TABLE_NAME = "alltypes_rewritten_file"
+  TABLE_LOCATION = "/test-warehouse/%s" % DATABASE
+  FILE_NAME = "alltypes.parq"
+  # file size = 17.8 KB
+  SHORT_FILE = "/test-warehouse/alltypesagg_parquet/year=2010/month=1/" \
+      "day=__HIVE_DEFAULT_PARTITION__/*.parq"
+  SHORT_FILE_NUM_ROWS = 1000
+  # file size = 43.3 KB
+  LONG_FILE = "/test-warehouse/alltypesagg_parquet/year=2010/month=1/day=9/*.parq"
+  LONG_FILE_NUM_ROWS = 1000
+
+  @classmethod
+  def get_workload(self):
+    return 'functional-query'
+
+  @classmethod
+  def add_test_dimensions(cls):
+    super(TestRewrittenFile, cls).add_test_dimensions()
+    cls.TestMatrix.add_dimension(create_single_exec_option_dimension())
+    # TODO: add more file formats
+    cls.TestMatrix.add_constraint(
+      lambda v: v.get_value('table_format').file_format == 'parquet')
+
+  @classmethod
+  def setup_class(cls):
+    super(TestRewrittenFile, cls).setup_class()
+    cls.cleanup_db(cls.DATABASE)
+    cls.client.execute("create database if not exists " + cls.DATABASE)
+
+  @classmethod
+  def teardown_class(cls):
+    cls.cleanup_db(cls.DATABASE)
+    super(TestRewrittenFile, cls).teardown_class()
+
+  def teardown_method(self, method):
+    self.__drop_test_table()
+
+  def __overwrite_file_and_query(self, vector, old_file, new_file, expected_error,
+      expected_new_count):
+    """Rewrites 'old_file' with 'new_file' without invalidating metadata and verifies
+    that querying the table results in the expected error. 'expected_error' only needs
+    to be a substring of the full error message."""
+    self.__create_test_table()
+
+    # First copy in 'old_file' and refresh the cached file metadata.
+    self.__copy_file_to_test_table(old_file)
+    self.client.execute("refresh %s" % self.__full_table_name())
+
+    # Then overwrite 'old_file' with 'new_file', and don't invalidate metadata.
+    self.__copy_file_to_test_table(new_file)
+
+    # Query the table and check for the expected error.
+    try:
+      result = self.client.execute("select * from %s" % self.__full_table_name())
+      assert False, "Query was expected to fail"
+    except ImpalaBeeswaxException as e:
+      assert expected_error in str(e)
+
+    # Refresh the table and make sure we get results.
+    self.client.execute("refresh %s" % self.__full_table_name())
+    result = self.client.execute("select count(*) from %s" % self.__full_table_name())
+    assert result.data == [str(expected_new_count)]
+
+  def test_new_file_shorter(self, vector):
+    """Rewrites an existing file with a new shorter file."""
+    # Full error is something like:
+    #   Metadata for file '...' appears stale. Try running "refresh
+    #   test_written_file_xxx.alltypes_rewritten_file" to reload the file metadata.
+    self.__overwrite_file_and_query(vector, self.LONG_FILE, self.SHORT_FILE,
+      'appears stale.', self.SHORT_FILE_NUM_ROWS)
+
+  def test_new_file_longer(self, vector):
+    """Rewrites an existing file with a new longer file."""
+    # Full error is something like:
+    #   File '...' has an invalid version number: ff4C
+    #   This could be due to stale metadata. Try running "refresh
+    #   test_written_file_xxx.alltypes_rewritten_file".
+    self.__overwrite_file_and_query(vector, self.SHORT_FILE, self.LONG_FILE,
+      'invalid version number', self.LONG_FILE_NUM_ROWS)
+
+  def test_delete_file(self, vector):
+    """Deletes an existing file without refreshing metadata."""
+    self.__create_test_table()
+
+    # Copy in a file and refresh the cached file metadata.
+    self.__copy_file_to_test_table(self.LONG_FILE)
+    self.client.execute("refresh %s" % self.__full_table_name())
+
+    # Delete the file without refreshing metadata.
+    check_call(["hadoop", "fs", "-rm", self.TABLE_LOCATION + '/*'], shell=False)
+
+    # Query the table and check for the expected error.
+    try:
+      result = self.client.execute("select * from %s" % self.__full_table_name())
+      assert False, "Query was expected to fail"
+    except ImpalaBeeswaxException as e:
+      assert 'appears stale.' in str(e)
+
+    # Refresh the table and make sure we get results.
+    self.client.execute("refresh %s" % self.__full_table_name())
+    result = self.client.execute("select count(*) from %s" % self.__full_table_name())
+    assert result.data == ['0']
+
+  def __create_test_table(self):
+    self.__drop_test_table()
+    self.client.execute("""
+      CREATE TABLE %s LIKE functional.alltypesnopart STORED AS PARQUET
+      LOCATION '%s'
+    """ % (self.__full_table_name(), self.TABLE_LOCATION))
+
+  def __drop_test_table(self):
+    self.client.execute("DROP TABLE IF EXISTS %s" % self.__full_table_name())
+
+  def __copy_file_to_test_table(self, src_path):
+    """Copies the provided path to the test table, overwriting any previous file."""
+    dst_path = "%s/%s" % (self.TABLE_LOCATION, self.FILE_NAME)
+    check_call(["hadoop", "fs", "-cp", "-f", src_path, dst_path], shell=False)
+
+  def __full_table_name(self):
+    return "%s.%s" % (self.DATABASE, self.TABLE_NAME)