From f8015ff68dddcc7e9813b3fec7953d87627ca5ab Mon Sep 17 00:00:00 2001
From: Zoltan Borok-Nagy
Date: Thu, 2 Apr 2020 18:04:07 +0200
Subject: [PATCH] IMPALA-9512: Full ACID Milestone 2: Validate rows against the
 valid write id list

Minor compactions can compact several delta directories into a single
delta directory. The directory filtering algorithm had to be modified to
handle minor-compacted directories and prefer them over plain delta
directories. This happens in the Frontend, mostly in AcidUtils.java.

Hive Streaming Ingestion writes similar delta directories, but they might
contain rows Impala cannot see based on its valid write id list. E.g. we
can have the following delta directory:

  full_acid/delta_0000001_0000010/0000   # minWriteId:  1
                                          # maxWriteId: 10

This delta dir contains rows with write ids between 1 and 10, but maybe
we are only allowed to see write ids less than 5. Therefore we need to
check the ACID write id column (named 'currentTransaction') to determine
which rows are valid.

Delta directories written by Hive Streaming don't have a visibility txn
id, so we can recognize them based on the directory name. If there is a
visibilityTxnId and it is committed, then every row is valid:

  full_acid/delta_0000001_0000010_v01234  # has visibilityTxnId
                                          # every row is valid

If there is no visibilityTxnId, then the directory was created via Hive
Streaming, so we need to validate its rows. Fortunately Hive Streaming
writes rows with different write ids into different ORC stripes, so we
don't need to validate the write id per row. If we had statistics we
could validate per stripe, but since Hive Streaming doesn't write
statistics we validate the write id per ORC row batch. (An alternative
would be a 2-pass read: first read a single value from each stripe's
'currentTransaction' field, then read the stripe only if its write id is
valid.)
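For illustration only (not part of this patch), the decision described
above can be sketched with the ValidWriteIdList helper added below in
be/src/exec/acid-metadata-utils.h; the wrapper name
NeedsRowBatchValidation is made up for the example:

  // Minimal sketch: decide whether an ORC file of a full ACID table needs
  // per-row-batch write id validation.
  #include <string>
  #include "exec/acid-metadata-utils.h"

  bool NeedsRowBatchValidation(const impala::ValidWriteIdList& valid_write_ids,
      const std::string& file_path) {
    // Compacted directories (e.g. delta_0000001_0000010_v01234) carry a
    // visibility txn id, so every row in them is visible.
    if (impala::ValidWriteIdList::IsCompacted(file_path)) return false;
    // Otherwise compare the directory's write id range against the valid list:
    // ALL  -> every row is visible, no validation needed;
    // NONE -> the frontend should already have filtered the file out;
    // SOME -> the scanner must check 'currentTransaction' per row batch.
    return valid_write_ids.IsFileRangeValid(file_path) ==
        impala::ValidWriteIdList::SOME;
  }

HdfsOrcScanner::Open() applies the same logic and, when validation is
needed, also selects the 'currentTransaction' column from the file.
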
Testing * the frontend logic is tested in AcidUtilsTest * the backend row validation is tested in test_acid_row_validation Change-Id: I5ed74585a2d73ebbcee763b0545be4412926299d Reviewed-on: http://gerrit.cloudera.org:8080/15818 Reviewed-by: Impala Public Jenkins Tested-by: Impala Public Jenkins --- be/src/exec/CMakeLists.txt | 3 + be/src/exec/acid-metadata-utils-test.cc | 209 +++++++++++++++ be/src/exec/acid-metadata-utils.cc | 124 +++++++++ be/src/exec/acid-metadata-utils.h | 50 ++++ be/src/exec/hdfs-orc-scanner.cc | 34 ++- be/src/exec/hdfs-orc-scanner.h | 11 + be/src/exec/orc-column-readers.cc | 54 +++- be/src/exec/orc-column-readers.h | 33 ++- be/src/exec/orc-metadata-utils.cc | 2 +- be/src/exec/orc-metadata-utils.h | 6 + be/src/runtime/descriptors.cc | 1 + be/src/runtime/descriptors.h | 3 + common/thrift/CatalogObjects.thrift | 24 +- common/thrift/CatalogService.thrift | 3 + .../apache/impala/compat/MetastoreShim.java | 53 ++++ .../impala/analysis/StmtMetadataLoader.java | 2 +- .../impala/catalog/DataSourceTable.java | 3 +- .../org/apache/impala/catalog/FeTable.java | 3 +- .../impala/catalog/FileMetadataLoader.java | 8 +- .../org/apache/impala/catalog/HdfsTable.java | 64 ++++- .../java/org/apache/impala/catalog/Table.java | 66 +---- .../catalog/local/CatalogdMetaProvider.java | 20 +- .../catalog/local/DirectMetaProvider.java | 8 + .../impala/catalog/local/LocalFsTable.java | 7 + .../impala/catalog/local/LocalTable.java | 3 +- .../impala/catalog/local/MetaProvider.java | 3 + .../org/apache/impala/util/AcidUtils.java | 247 +++++++++++++++--- .../analysis/StmtMetadataLoaderTest.java | 3 +- .../catalog/FileMetadataLoaderTest.java | 21 ++ .../org/apache/impala/util/AcidUtilsTest.java | 171 ++++++++++-- testdata/bin/generate-schema-statements.py | 2 +- testdata/data/README | 5 + testdata/data/streaming.orc | Bin 0 -> 1137 bytes .../functional/functional_schema_template.sql | 24 ++ .../functional/schema_constraints.csv | 1 + .../queries/QueryTest/acid-negative.test | 24 +- .../QueryTest/acid-row-validation-0.test | 15 ++ .../QueryTest/acid-row-validation-1.test | 17 ++ .../QueryTest/acid-row-validation-2.test | 19 ++ .../queries/QueryTest/acid.test | 25 ++ .../queries/QueryTest/full-acid-rowid.test | 20 ++ tests/query_test/test_acid_row_validation.py | 72 +++++ tests/util/acid_txn.py | 151 +++++++++++ 43 files changed, 1425 insertions(+), 189 deletions(-) create mode 100644 be/src/exec/acid-metadata-utils-test.cc create mode 100644 be/src/exec/acid-metadata-utils.cc create mode 100644 be/src/exec/acid-metadata-utils.h create mode 100644 testdata/data/streaming.orc create mode 100644 testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-0.test create mode 100644 testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-1.test create mode 100644 testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-2.test create mode 100644 tests/query_test/test_acid_row_validation.py create mode 100644 tests/util/acid_txn.py diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt index c4440c1ba..bfd7948db 100644 --- a/be/src/exec/CMakeLists.txt +++ b/be/src/exec/CMakeLists.txt @@ -26,6 +26,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec") set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec") add_library(Exec + acid-metadata-utils.cc aggregation-node.cc aggregation-node-base.cc aggregator.cc @@ -109,6 +110,7 @@ add_library(Exec add_dependencies(Exec gen-deps) add_library(ExecTests STATIC + 
acid-metadata-utils-test.cc delimited-text-parser-test.cc hash-table-test.cc hdfs-avro-scanner-test.cc @@ -118,6 +120,7 @@ add_library(ExecTests STATIC ) add_dependencies(ExecTests gen-deps) +ADD_UNIFIED_BE_LSAN_TEST(acid-metadata-utils-test ValidWriteIdListTest.*) ADD_UNIFIED_BE_LSAN_TEST(zigzag-test ZigzagTest.*) ADD_UNIFIED_BE_LSAN_TEST(hash-table-test HashTableTest.*) ADD_UNIFIED_BE_LSAN_TEST(delimited-text-parser-test DelimitedTextParser.*) diff --git a/be/src/exec/acid-metadata-utils-test.cc b/be/src/exec/acid-metadata-utils-test.cc new file mode 100644 index 000000000..7db3e572a --- /dev/null +++ b/be/src/exec/acid-metadata-utils-test.cc @@ -0,0 +1,209 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include + +#include "testutil/gtest-util.h" +#include "exec/acid-metadata-utils.h" + +#include "gen-cpp/CatalogObjects_types.h" + +#include "common/names.h" + +using namespace impala; + +namespace { + +TValidWriteIdList MakeTValidWriteIdList(int64_t high_watermak, + const std::vector& invalid_write_ids = {}, + const std::vector& aborted_indexes = {}) { + TValidWriteIdList ret; + ret.__set_high_watermark(high_watermak); + ret.__set_invalid_write_ids(invalid_write_ids); + ret.__set_aborted_indexes(aborted_indexes); + return ret; +} + +} // anonymous namespace + +TEST(ValidWriteIdListTest, IsWriteIdValid) { + ValidWriteIdList write_id_list; + // Everything is valid by default. 
+ EXPECT_TRUE(write_id_list.IsWriteIdValid(1)); + EXPECT_TRUE(write_id_list.IsWriteIdValid(1000)); + EXPECT_TRUE(write_id_list.IsWriteIdValid(std::numeric_limits::max())); + + // Write ids <= 10 are valid + write_id_list.InitFrom(MakeTValidWriteIdList(10)); + for (int i = 1; i <= 10; ++i) { + EXPECT_TRUE(write_id_list.IsWriteIdValid(i)); + } + EXPECT_FALSE(write_id_list.IsWriteIdValid(11)); + + // Test open write ids + write_id_list.InitFrom(MakeTValidWriteIdList(10, {1, 3, 5, 7, 9})); + for (int i = 1; i <= 10; ++i) { + if (i % 2 == 0) { + EXPECT_TRUE(write_id_list.IsWriteIdValid(i)); + } else { + EXPECT_FALSE(write_id_list.IsWriteIdValid(i)); + } + } + EXPECT_FALSE(write_id_list.IsWriteIdValid(11)); + EXPECT_FALSE(write_id_list.IsWriteIdValid(12)); + + // Test aborted write ids + write_id_list.InitFrom(MakeTValidWriteIdList(10, {1, 3, 5, 7, 9}, {0, 1, 2, 3, 4})); + for (int i = 1; i <= 10; ++i) { + if (i % 2 == 0) { + EXPECT_TRUE(write_id_list.IsWriteIdValid(i)); + } else { + EXPECT_FALSE(write_id_list.IsWriteIdValid(i)); + } + } + EXPECT_FALSE(write_id_list.IsWriteIdValid(11)); + EXPECT_FALSE(write_id_list.IsWriteIdValid(12)); + + // Test open and aborted write ids + write_id_list.InitFrom(MakeTValidWriteIdList(10, {1, 3, 5, 7, 9}, {3, 4})); + for (int i = 1; i <= 10; ++i) { + if (i % 2 == 0) { + EXPECT_TRUE(write_id_list.IsWriteIdValid(i)); + } else { + EXPECT_FALSE(write_id_list.IsWriteIdValid(i)); + } + } + EXPECT_FALSE(write_id_list.IsWriteIdValid(11)); + EXPECT_FALSE(write_id_list.IsWriteIdValid(12)); +} + +TEST(ValidWriteIdListTest, IsWriteIdRangeValid) { + ValidWriteIdList write_id_list; + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(1, 2)); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(2, 200)); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(100, 100000)); + + write_id_list.InitFrom( + MakeTValidWriteIdList(110, {1,2,3,5,7,9,11,31,50,55,90,97,98,99})); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsWriteIdRangeValid(1, 2)); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(2, 200)); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(100, 100000)); + + write_id_list.InitFrom(MakeTValidWriteIdList(1000000, + {1,2,3,5,7,9,11,31,50,55,90,97,98,99,110,1000,1100,2000,10000,150000,550000,90000, + 100000,110000,111000,222000,333000,444000,555000,666000,777000,888000,999000,999999} + )); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsWriteIdRangeValid(1, 2)); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(2, 200)); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(100, 100000)); + + write_id_list.InitFrom( + MakeTValidWriteIdList(1000000, {555000,666000,777000,888000,999000,999999})); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(1, 2)); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(2, 200)); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(100, 100000)); + + write_id_list.InitFrom(MakeTValidWriteIdList(1000, {500,600,700,800,900})); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsWriteIdRangeValid(1100, 2000)); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(900, 1100)); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(901, 1000)); + + write_id_list.InitFrom(MakeTValidWriteIdList(1000)); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsWriteIdRangeValid(1100, 1200)); + EXPECT_EQ(ValidWriteIdList::SOME, 
write_id_list.IsWriteIdRangeValid(900, 1100)); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(90, 950)); +} + +TEST(ValidWriteIdListTest, IsFileRangeValid) { + ValidWriteIdList write_id_list; + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delta_1_2/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_1_2/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delta_5_5/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_5_5/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delta_100_1100/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_100_1100/0000")); + + write_id_list.InitFrom( + MakeTValidWriteIdList(110, {1,2,3,5,7,9,11,31,50,55,90,97,98,99})); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid( + "/foo/bar/delta_1_2/0000")); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_1_2/0000")); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid( + "/foo/bar/delta_2_2/0000")); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_1_1/0000")); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid( + "/foo/bar/delta_2_200/0000")); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_2_200/0000")); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid( + "/foo/bar/delta_100_100000/0000")); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_100_100000/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delta_100_100/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_100_100/0000")); + + write_id_list.InitFrom(MakeTValidWriteIdList(1000, {500,600,700,800,900})); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid( + "/foo/bar/delta_1100_2000/0000")); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_1100_2000/0000")); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid( + "/foo/bar/delta_800_800/0000")); + EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_500_500/0000")); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid( + "/foo/bar/delta_900_1100/0000")); + EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_900_1100/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delta_901_1000/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_901_1000/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delta_901_901/0000")); + EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid( + "/foo/bar/delete_delta_901_901/0000")); +} + +TEST(ValidWriteIdListTest, IsCompacted) { + EXPECT_TRUE(ValidWriteIdList::IsCompacted("/foo/delta_00005_00010_v123/000")); + EXPECT_TRUE(ValidWriteIdList::IsCompacted("/foo/delete_delta_00001_00010_v123/000")); + EXPECT_TRUE(ValidWriteIdList::IsCompacted("/foo/base_00001_v0123/000")); + + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_1_1/0000")); + 
EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_1_2/0000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_5_10/0000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_05_09/0000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_0000010_0000030/0000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delete_delta_1_2/0000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delete_delta_5_10/0000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delete_delta_05_09/0000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delete_delta_000010_000030/000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/base_10/000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/000")); + EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/p=1/000")); +} diff --git a/be/src/exec/acid-metadata-utils.cc b/be/src/exec/acid-metadata-utils.cc new file mode 100644 index 000000000..fe4691cd0 --- /dev/null +++ b/be/src/exec/acid-metadata-utils.cc @@ -0,0 +1,124 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#include "exec/acid-metadata-utils.h" + +#include "common/logging.h" +#include "gen-cpp/CatalogObjects_types.h" + +#include "common/names.h" + +namespace impala { + +/// Unnamed namespace for auxiliary functions. 
+namespace { + +const string BASE_PREFIX = "base_"; +const string DELTA_PREFIX = "delta_"; +const string DELETE_DELTA_PREFIX = "delete_delta_"; + +string GetParentDirName(const string& filepath) { + int slash_before_file = filepath.rfind('/'); + if (slash_before_file <= 0) return ""; + int slash_before_dirname = filepath.rfind('/', slash_before_file - 1); + if (slash_before_dirname <= 0) return ""; + return filepath.substr( + slash_before_dirname + 1, slash_before_file - slash_before_dirname - 1); +} + +inline bool StrStartsWith(const string& str, const string& prefix) { + return str.rfind(prefix, 0) == 0; +} + +std::pair GetWriteIdRangeOfDeltaDir(const string& delta_dir) { + int min_write_id_pos = 0; + if (StrStartsWith(delta_dir, DELTA_PREFIX)) { + min_write_id_pos = DELTA_PREFIX.size(); + } + else if (StrStartsWith(delta_dir, DELETE_DELTA_PREFIX)) { + min_write_id_pos = DELETE_DELTA_PREFIX.size(); + } else { + DCHECK(false) << delta_dir + " is not a delta directory"; + } + int max_write_id_pos = delta_dir.find('_', min_write_id_pos) + 1; + return {std::atoll(delta_dir.c_str() + min_write_id_pos), + std::atoll(delta_dir.c_str() + max_write_id_pos)}; +} + +} // unnamed namespace + +ValidWriteIdList::ValidWriteIdList(const TValidWriteIdList& valid_write_ids) { + InitFrom(valid_write_ids); +} + +void ValidWriteIdList::InitFrom(const TValidWriteIdList& valid_write_ids) { + if (valid_write_ids.__isset.high_watermark) { + high_water_mark_ = valid_write_ids.high_watermark; + } else { + high_water_mark_ = std::numeric_limits::max(); + } + invalid_write_ids_.clear(); + for (int64_t invalid_write_id : valid_write_ids.invalid_write_ids) { + invalid_write_ids_.insert(invalid_write_id); + } +} + +bool ValidWriteIdList::IsWriteIdValid(int64_t write_id) const { + if (write_id > high_water_mark_) { + return false; + } + return invalid_write_ids_.find(write_id) == invalid_write_ids_.end(); +} + +ValidWriteIdList::RangeResponse ValidWriteIdList::IsWriteIdRangeValid( + int64_t min_write_id, int64_t max_write_id) const { + if (max_write_id <= high_water_mark_ && invalid_write_ids_.empty()) return ALL; + + bool found_valid = false; + bool found_invalid = false; + for (int64_t i = min_write_id; i <= max_write_id; ++i) { + if (IsWriteIdValid(i)) { + found_valid = true; + } else { + found_invalid = true; + } + if (found_valid && found_invalid) return SOME; + } + if (found_invalid) return NONE; + return ALL; +} + +ValidWriteIdList::RangeResponse ValidWriteIdList::IsFileRangeValid( + const std::string& file_path) const { + string dir_name = GetParentDirName(file_path); + if (!(StrStartsWith(dir_name, DELTA_PREFIX) || + StrStartsWith(dir_name, DELETE_DELTA_PREFIX))) { + return ALL; + } + std::pair write_id_range = GetWriteIdRangeOfDeltaDir(dir_name); + return IsWriteIdRangeValid(write_id_range.first, write_id_range.second); +} + +bool ValidWriteIdList::IsCompacted(const std::string& file_path) { + string dir_name = GetParentDirName(file_path); + if (!StrStartsWith(dir_name, BASE_PREFIX) && + !StrStartsWith(dir_name, DELTA_PREFIX) && + !StrStartsWith(dir_name, DELETE_DELTA_PREFIX)) return false; + return dir_name.find("_v") != string::npos; +} + +} // namespace impala diff --git a/be/src/exec/acid-metadata-utils.h b/be/src/exec/acid-metadata-utils.h new file mode 100644 index 000000000..2bd5b283e --- /dev/null +++ b/be/src/exec/acid-metadata-utils.h @@ -0,0 +1,50 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. 
See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +#pragma once + +#include +#include +#include + +namespace impala { + +class TValidWriteIdList; + +class ValidWriteIdList { +public: + enum RangeResponse { + NONE, SOME, ALL + }; + + ValidWriteIdList() {} + ValidWriteIdList(const TValidWriteIdList& valid_write_ids); + + void InitFrom(const TValidWriteIdList& valid_write_ids); + + bool IsWriteIdValid(int64_t write_id) const; + RangeResponse IsWriteIdRangeValid(int64_t min_write_id, int64_t max_write_id) const; + RangeResponse IsFileRangeValid(const std::string& file_path) const; + + static bool IsCompacted(const std::string& file_path); +private: + void AddInvalidWriteIds(const std::string& invalid_ids_str); + int64_t high_water_mark_ = std::numeric_limits::max(); + std::unordered_set invalid_write_ids_; +}; + +} diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc index 81df06613..f4d4a2832 100644 --- a/be/src/exec/hdfs-orc-scanner.cc +++ b/be/src/exec/hdfs-orc-scanner.cc @@ -41,8 +41,6 @@ using namespace impala::io; DEFINE_bool(enable_orc_scanner, true, "If false, reading from ORC format tables is not supported"); -const string HIVE_ACID_VERSION_KEY = "hive.acid.version"; - Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node, const vector& files) { DCHECK(!files.empty()); @@ -205,6 +203,19 @@ Status HdfsOrcScanner::Open(ScannerContext* context) { &reader_->getType(), filename(), is_table_full_acid, is_file_full_acid)); RETURN_IF_ERROR(schema_resolver_->ValidateFullAcidFileSchema()); + // Hive Streaming Ingestion allocates multiple write ids, hence create delta directories + // like delta_5_10. Then it continuously appends new stripes (and footers) to the + // ORC files of the delte dir. So it's possible that the file has rows that Impala is + // not allowed to see based on its valid write id list. In such cases we need to + // validate the write ids of the row batches. + if (is_table_full_acid && !ValidWriteIdList::IsCompacted(filename())) { + valid_write_ids_.InitFrom(scan_node_->hdfs_table()->ValidWriteIdList()); + ValidWriteIdList::RangeResponse rows_valid = + valid_write_ids_.IsFileRangeValid(filename()); + DCHECK_NE(rows_valid, ValidWriteIdList::NONE); + row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME; + } + // Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip // columns we don't need. RETURN_IF_ERROR(SelectColumns(*scan_node_->tuple_desc())); @@ -445,6 +456,7 @@ Status HdfsOrcScanner::SelectColumns(const TupleDescriptor& tuple_desc) { RETURN_IF_ERROR(ResolveColumns(tuple_desc, &selected_nodes, &pos_slots)); for (auto t : selected_nodes) selected_type_ids_.push_back(t->getColumnId()); + // Select columns for array positions. 
Due to ORC-450 we can't materialize array // offsets without materializing its items, so we should still select the item or any // sub column of the item. To be simple, we choose the max column id in the subtree @@ -469,6 +481,17 @@ Status HdfsOrcScanner::SelectColumns(const TupleDescriptor& tuple_desc) { selected_nodes.push_back(array_node); } + // Select "CurrentTransaction" when we need to validate rows. + if (row_batches_need_validation_) { + // In case of zero-slot scans (e.g. count(*) over the table) we only select the + // 'currentTransaction' column. + if (scan_node_->IsZeroSlotTableScan()) selected_type_ids_.clear(); + if (std::find(selected_type_ids_.begin(), selected_type_ids_.end(), + CURRENT_TRANSCACTION_TYPE_ID) == selected_type_ids_.end()) { + selected_type_ids_.push_back(CURRENT_TRANSCACTION_TYPE_ID); + } + } + COUNTER_SET(num_cols_counter_, static_cast(selected_type_ids_.size())); row_reader_options_.includeTypes(selected_type_ids_); return Status::OK(); @@ -494,7 +517,10 @@ Status HdfsOrcScanner::ProcessSplit() { } Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) { - if (scan_node_->IsZeroSlotTableScan()) { + // In case 'row_batches_need_validation_' is true, we need to look at the row + // batches and check their validity. In that case 'currentTransaction' is the only + // selected field from the file (in case of zero slot scans). + if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) { uint64_t file_rows = reader_->getNumberOfRows(); // There are no materialized slots, e.g. count(*) over the table. We can serve // this query from just the file metadata. We don't need to read the column data. @@ -741,7 +767,7 @@ Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) { Status HdfsOrcScanner::AssembleCollection( const OrcComplexColumnReader& complex_col_reader, int row_idx, CollectionValueBuilder* coll_value_builder) { - int total_tuples = complex_col_reader.GetNumTuples(row_idx); + int total_tuples = complex_col_reader.GetNumChildValues(row_idx); if (!complex_col_reader.MaterializeTuple()) { // 'complex_col_reader' maps to a STRUCT or collection column of STRUCTs/collections // and there're no need to materialize current level tuples. Delegate the diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h index 728faee80..a95a13fe9 100644 --- a/be/src/exec/hdfs-orc-scanner.h +++ b/be/src/exec/hdfs-orc-scanner.h @@ -26,6 +26,7 @@ #include "runtime/exec-env.h" #include "runtime/io/disk-io-mgr.h" #include "runtime/runtime-state.h" +#include "exec/acid-metadata-utils.h" #include "exec/hdfs-columnar-scanner.h" #include "exec/hdfs-scan-node.h" #include "exec/orc-metadata-utils.h" @@ -216,6 +217,16 @@ class HdfsOrcScanner : public HdfsColumnarScanner { /// Scan range for the metadata (file tail). const io::ScanRange* metadata_range_ = nullptr; + /// With the help of it we can check the validity of ACID write ids. + ValidWriteIdList valid_write_ids_; + + /// True if we need to validate the row batches against the valid write id list. This + /// only needs to be done for Hive Streaming Ingestion. The 'write id' will be the same + /// within a stripe, but we still need to read the row batches for validation because + /// there are no statistics written. + /// For files not written by Streaming Ingestion we can assume that every row is valid. + bool row_batches_need_validation_ = false; + /// Timer for materializing rows. This ignores time getting the next buffer. 
ScopedTimer assemble_rows_timer_; diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc index 72fe3f4a9..331f42442 100644 --- a/be/src/exec/orc-column-readers.cc +++ b/be/src/exec/orc-column-readers.cc @@ -34,6 +34,13 @@ string PrintNode(const orc::Type* node) { return Substitute("$0 column (ORC id=$1)", node->toString(), node->getColumnId()); } +bool OrcRowValidator::IsRowBatchValid() const { + if (write_ids_ == nullptr || write_ids_->numElements == 0) return true; + + int64_t write_id = write_ids_->data[0]; + return valid_write_ids_.IsWriteIdValid(write_id); +} + OrcColumnReader* OrcColumnReader::Create(const orc::Type* node, const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner) { DCHECK(node != nullptr); @@ -323,6 +330,9 @@ void OrcStructReader::CreateChildForSlot(const orc::Type* curr_node, OrcStructReader::OrcStructReader(const orc::Type* node, const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner) : OrcComplexColumnReader(node, table_tuple_desc, scanner) { + bool needs_row_validation = table_tuple_desc == scanner_->scan_node_->tuple_desc() && + node->getColumnId() == 0 && + scanner_->row_batches_need_validation_; if (materialize_tuple_) { for (SlotDescriptor* child_slot : tuple_desc_->slots()) { // Skip partition columns and missed columns @@ -337,11 +347,21 @@ OrcStructReader::OrcStructReader(const orc::Type* node, // matches to a descendant of 'node'. Those tuples should be materialized by the // corresponding descendant reader. So 'node' should have exactly one selected // subtype: the child in the path to the target descendant. - DCHECK_EQ(node->getSubtypeCount(), 1); + DCHECK_EQ(node->getSubtypeCount(), needs_row_validation ? 2 : 1); + int child_index = needs_row_validation ? 1 : 0; OrcComplexColumnReader* child = OrcComplexColumnReader::CreateTopLevelReader( - node->getSubtype(0), table_tuple_desc, scanner); + node->getSubtype(child_index), table_tuple_desc, scanner); children_.push_back(child); - children_fields_.push_back(0); + children_fields_.push_back(child_index); + } + if (needs_row_validation) { + row_validator_.reset(new OrcRowValidator(scanner_->valid_write_ids_)); + for (int i = 0; i < node->getSubtypeCount(); ++i) { + if (node->getSubtype(i)->getColumnId() == CURRENT_TRANSCACTION_TYPE_ID) { + current_write_id_field_index_ = i; + break; + } + } } } @@ -374,6 +394,12 @@ Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) { Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, MemPool* pool) { + // Validate row batch if needed. + if (row_validator_) DCHECK(scanner_->row_batches_need_validation_); + if (row_validator_ && !row_validator_->IsRowBatchValid()) { + row_idx_ = NumElements(); + return Status::OK(); + } // Saving the initial value of num_tuples because each child->ReadValueBatch() will // update it. int scratch_batch_idx = scratch_batch->num_tuples; @@ -390,6 +416,14 @@ Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch, } } row_idx_ += scratch_batch->num_tuples - scratch_batch_idx; + if (row_validator_ && scanner_->scan_node_->IsZeroSlotTableScan()) { + DCHECK_EQ(1, batch_->fields.size()); // We should only select 'currentTransaction'. 
+ DCHECK_EQ(scratch_batch_idx, scratch_batch->num_tuples); + int num_to_fake_read = std::min(scratch_batch->capacity - scratch_batch->num_tuples, + NumElements() - row_idx_); + scratch_batch->num_tuples += num_to_fake_read; + row_idx_ += num_to_fake_read; + } return Status::OK(); } @@ -419,6 +453,14 @@ Status OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) { for (int c = 0; c < size; ++c) { RETURN_IF_ERROR(children_[c]->UpdateInputBatch(batch_->fields[children_fields_[c]])); } + if (row_validator_) { + orc::ColumnVectorBatch* write_id_batch = + batch_->fields[current_write_id_field_index_]; + DCHECK_EQ(static_cast(write_id_batch), + dynamic_cast(write_id_batch)); + row_validator_->UpdateTransactionBatch( + static_cast(write_id_batch)); + } return Status::OK(); } @@ -451,7 +493,7 @@ Status OrcCollectionReader::AssembleCollection(int row_idx, Tuple* tuple, MemPoo int OrcListReader::NumElements() const { if (DirectReader()) return batch_ != nullptr ? batch_->numElements : 0; if (children_.empty()) { - return batch_ != nullptr ? batch_->offsets[batch_->numElements] : 0; + return batch_ != nullptr ? batch_->offsets[batch_->numElements] : 0; } return children_[0]->NumElements(); } @@ -575,7 +617,7 @@ Status OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) { return Status::OK(); } -int OrcListReader::GetNumTuples(int row_idx) const { +int OrcListReader::GetNumChildValues(int row_idx) const { if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) return 0; DCHECK_GT(batch_->offsets.size(), row_idx + 1); return batch_->offsets[row_idx + 1] - batch_->offsets[row_idx]; @@ -688,7 +730,7 @@ Status OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) { return Status::OK(); } -int OrcMapReader::GetNumTuples(int row_idx) const { +int OrcMapReader::GetNumChildValues(int row_idx) const { if (IsNull(batch_, row_idx)) return 0; DCHECK_GT(batch_->offsets.size(), row_idx + 1); return batch_->offsets[row_idx + 1] - batch_->offsets[row_idx]; diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h index 001b913e4..54f3a94d3 100644 --- a/be/src/exec/orc-column-readers.h +++ b/be/src/exec/orc-column-readers.h @@ -28,6 +28,21 @@ namespace impala { class HdfsOrcScanner; +class OrcRowValidator { + public: + OrcRowValidator(const ValidWriteIdList& valid_write_ids) : + valid_write_ids_(valid_write_ids) {} + + void UpdateTransactionBatch(orc::LongVectorBatch* batch) { + write_ids_ = batch; + } + + bool IsRowBatchValid() const; + private: + const ValidWriteIdList& valid_write_ids_; + orc::LongVectorBatch* write_ids_; +}; + /// Base class for reading an ORC column. Each column reader will keep track of an /// orc::ColumnVectorBatch and transfer its values into Impala internals(tuples/slots). /// @@ -486,7 +501,7 @@ class OrcComplexColumnReader : public OrcBatchedReader { /// Num of tuples inside the 'row_idx'-th row. LIST/MAP types will have 0 to N tuples. /// STRUCT type will always have one tuple. - virtual int GetNumTuples(int row_idx) const = 0; + virtual int GetNumChildValues(int row_idx) const = 0; /// Collection values (array items, map keys/values) are concatenated in the child's /// batch. Get the start offset of values inside the 'row_idx'-th collection. @@ -551,7 +566,7 @@ class OrcStructReader : public OrcComplexColumnReader { /// Whether we've finished reading the current orc batch. 
bool EndOfBatch(); - int GetNumTuples(int row_idx) const override { return 1; } + int GetNumChildValues(int row_idx) const final { return 1; } int GetChildBatchOffset(int row_idx) const override { return row_idx; } @@ -561,6 +576,7 @@ class OrcStructReader : public OrcComplexColumnReader { OrcColumnReader* child = children()[0]; return child->NumElements(); } + private: orc::StructVectorBatch* batch_ = nullptr; @@ -570,6 +586,9 @@ class OrcStructReader : public OrcComplexColumnReader { /// Keep row index if we're top level readers int row_idx_; + int current_write_id_field_index_ = -1; + std::unique_ptr row_validator_; + void SetNullSlot(Tuple* tuple) override { for (OrcColumnReader* child : children_) child->SetNullSlot(tuple); } @@ -580,6 +599,8 @@ class OrcStructReader : public OrcComplexColumnReader { /// '*child' and its index inside the children. Returns false for not found. inline bool FindChild(const orc::Type& curr_node, const SchemaPath& child_path, const orc::Type** child, int* field); + + Status ReadAndValidateRows(ScratchTupleBatch* scratch_batch, MemPool* pool); }; class OrcCollectionReader : public OrcComplexColumnReader { @@ -628,14 +649,14 @@ class OrcListReader : public OrcCollectionReader { Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT; - int GetNumTuples(int row_idx) const override; + int GetNumChildValues(int row_idx) const final; int GetChildBatchOffset(int row_idx) const override; Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool) const override WARN_UNUSED_RESULT; - virtual int NumElements() const final; + int NumElements() const final; private: Status SetPositionSlot(int row_idx, Tuple* tuple); @@ -662,14 +683,14 @@ class OrcMapReader : public OrcCollectionReader { Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT; - int GetNumTuples(int row_idx) const override; + int GetNumChildValues(int row_idx) const final; int GetChildBatchOffset(int row_idx) const override; Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool) const override WARN_UNUSED_RESULT; - virtual int NumElements() const final; + int NumElements() const final; private: orc::MapVectorBatch* batch_ = nullptr; vector key_readers_; diff --git a/be/src/exec/orc-metadata-utils.cc b/be/src/exec/orc-metadata-utils.cc index 03433dbea..dbb0d53ba 100644 --- a/be/src/exec/orc-metadata-utils.cc +++ b/be/src/exec/orc-metadata-utils.cc @@ -84,7 +84,7 @@ Status OrcSchemaResolver::ResolveColumn(const SchemaPath& col_path, *node = root_; *pos_field = false; *missing_field = false; - DCHECK(ValidateFullAcidFileSchema().ok()); // Should have already been validated. + DCHECK_OK(ValidateFullAcidFileSchema()); // Should have already been validated. bool translate_acid_path = is_table_full_acid_ && is_file_full_acid_; int num_part_cols = tbl_desc_.num_clustering_cols(); for (int i = 0; i < col_path.size(); ++i) { diff --git a/be/src/exec/orc-metadata-utils.h b/be/src/exec/orc-metadata-utils.h index 72ddc5426..50420925b 100644 --- a/be/src/exec/orc-metadata-utils.h +++ b/be/src/exec/orc-metadata-utils.h @@ -24,6 +24,12 @@ namespace impala { +// Key of Hive ACID version in ORC metadata. +const string HIVE_ACID_VERSION_KEY = "hive.acid.version"; + +// ORC type id of column "currentTransaction" in full ACID ORC files. +constexpr int CURRENT_TRANSCACTION_TYPE_ID = 5; + /// Util class to resolve SchemaPaths of TupleDescriptors/SlotDescriptors into orc::Type. 
class OrcSchemaResolver { public: diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc index 3b9fe8441..b387f1630 100644 --- a/be/src/runtime/descriptors.cc +++ b/be/src/runtime/descriptors.cc @@ -236,6 +236,7 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPo tdesc.hdfsTable, tdesc.hdfsTable.prototype_partition)); avro_schema_ = tdesc.hdfsTable.__isset.avroSchema ? tdesc.hdfsTable.avroSchema : ""; is_full_acid_ = tdesc.hdfsTable.is_full_acid; + valid_write_id_list_ = tdesc.hdfsTable.valid_write_ids; } void HdfsTableDescriptor::ReleaseResources() { diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h index 67a93fe7e..85e19a739 100644 --- a/be/src/runtime/descriptors.h +++ b/be/src/runtime/descriptors.h @@ -331,6 +331,8 @@ class HdfsTableDescriptor : public TableDescriptor { bool IsTableFullAcid() const { return is_full_acid_; } + const TValidWriteIdList& ValidWriteIdList() const { return valid_write_id_list_; } + virtual std::string DebugString() const; protected: @@ -343,6 +345,7 @@ class HdfsTableDescriptor : public TableDescriptor { /// Set to the table's Avro schema if this is an Avro table, empty string otherwise std::string avro_schema_; bool is_full_acid_; + TValidWriteIdList valid_write_id_list_; }; class HBaseTableDescriptor : public TableDescriptor { diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift index 192b0cfb1..f8ddf63b3 100644 --- a/common/thrift/CatalogObjects.thrift +++ b/common/thrift/CatalogObjects.thrift @@ -331,6 +331,21 @@ struct THdfsPartition { // Must be < 0 to avoid collisions const i64 PROTOTYPE_PARTITION_ID = -1; +// Thrift representation of a Hive ACID valid write id list. +struct TValidWriteIdList { + // Every write id greater than 'high_watermark' are invalid. + 1: optional i64 high_watermark + + // The smallest open write id. + 2: optional i64 min_open_write_id + + // Open or aborted write ids. + 3: optional list invalid_write_ids + + // Indexes of the aborted write ids in 'invalid_write_ids'. The write ids whose index + // are not present here are open. + 4: optional list aborted_indexes +} struct THdfsTable { // ============================================================ @@ -381,6 +396,9 @@ struct THdfsTable { // True if the table is in Hive Full ACID format. 12: optional bool is_full_acid = false + + // Set iff this is an acid table. The valid write ids list. + 13: optional TValidWriteIdList valid_write_ids } struct THBaseTable { @@ -501,12 +519,6 @@ struct TTable { // Set iff this a kudu table 13: optional TKuduTable kudu_table - // Set iff this is an acid table. The valid write ids list. - // The string is assumed to be created by ValidWriteIdList.writeToString - // For example ValidReaderWriteIdList object's format is: - // :::: - 14: optional string valid_write_ids - // Set if this table needs storage access during metadata load. // Time used for storage loading in nanoseconds. 15: optional i64 storage_metadata_load_time_ns diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift index 3b444060a..662325a06 100644 --- a/common/thrift/CatalogService.thrift +++ b/common/thrift/CatalogService.thrift @@ -384,6 +384,9 @@ struct TPartialTableInfo { // SqlConstraints for the table, small enough that we can // return them wholesale. 8: optional SqlConstraints.TSqlConstraints sql_constraints + + // Valid write id list of ACID table. 
+ 9: optional CatalogObjects.TValidWriteIdList valid_write_ids; } // Selector for partial information about a Database. diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java index 2091fbc13..4ae340966 100644 --- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java +++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java @@ -35,6 +35,7 @@ import java.util.BitSet; import java.util.Collections; import java.util.EnumSet; import java.util.List; +import java.util.stream.Collectors; import org.apache.hadoop.hive.common.StatsSetupConst; import org.apache.hadoop.hive.common.ValidReaderWriteIdList; @@ -98,6 +99,7 @@ import org.apache.impala.service.Frontend; import org.apache.impala.service.MetadataOp; import org.apache.impala.thrift.TMetadataOpRequest; import org.apache.impala.thrift.TResultSet; +import org.apache.impala.thrift.TValidWriteIdList; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.AcidUtils.TblTransaction; import org.apache.impala.util.MetaStoreUtil.InsertEventInfo; @@ -597,6 +599,57 @@ public class MetastoreShim { return new ValidReaderWriteIdList(validWriteIds); } + /** + * Converts a TValidWriteIdList object to ValidWriteIdList. + * @param tableName the name of the table. + * @param validWriteIds the thrift object. + * @return ValidWriteIdList object + */ + public static ValidWriteIdList getValidWriteIdListFromThrift(String tableName, + TValidWriteIdList validWriteIds) { + Preconditions.checkNotNull(validWriteIds); + BitSet abortedBits; + if (validWriteIds.getAborted_indexesSize() > 0) { + abortedBits = new BitSet(validWriteIds.getInvalid_write_idsSize()); + for (int aborted_index : validWriteIds.getAborted_indexes()) { + abortedBits.set(aborted_index); + } + } else { + abortedBits = new BitSet(); + } + long highWatermark = validWriteIds.isSetHigh_watermark() ? + validWriteIds.high_watermark : Long.MAX_VALUE; + long minOpenWriteId = validWriteIds.isSetMin_open_write_id() ? + validWriteIds.min_open_write_id : Long.MAX_VALUE; + return new ValidReaderWriteIdList(tableName, + validWriteIds.getInvalid_write_ids().stream().mapToLong(i -> i).toArray(), + abortedBits, highWatermark, minOpenWriteId); + } + + /** + * Converts a ValidWriteIdList object to TValidWriteIdList. + */ + public static TValidWriteIdList convertToTValidWriteIdList( + ValidWriteIdList validWriteIdList) { + Preconditions.checkNotNull(validWriteIdList); + TValidWriteIdList ret = new TValidWriteIdList(); + long minOpenWriteId = validWriteIdList.getMinOpenWriteId() != null ? + validWriteIdList.getMinOpenWriteId() : Long.MAX_VALUE; + ret.setHigh_watermark(validWriteIdList.getHighWatermark()); + ret.setMin_open_write_id(minOpenWriteId); + ret.setInvalid_write_ids(Arrays.stream( + validWriteIdList.getInvalidWriteIds()).boxed().collect(Collectors.toList())); + List abortedIndexes = new ArrayList<>(); + for (int i = 0; i < validWriteIdList.getInvalidWriteIds().length; ++i) { + long writeId = validWriteIdList.getInvalidWriteIds()[i]; + if (validWriteIdList.isWriteIdAborted(writeId)) { + abortedIndexes.add(i); + } + } + ret.setAborted_indexes(abortedIndexes); + return ret; + } + /** * Returns a ValidTxnList object that helps to identify in-progress and aborted * transactions. 
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java index 42f9ae4c7..df7ccf9cb 100644 --- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java @@ -257,7 +257,7 @@ public class StmtMetadataLoader { if (AcidUtils.isTransactionalTable(iTbl.getMetaStoreTable().getParameters())) { validIdsBuf.append("\n"); validIdsBuf.append(" "); - validIdsBuf.append(iTbl.getValidWriteIds()); + validIdsBuf.append(iTbl.getValidWriteIds().writeToString()); hasAcidTbls = true; } } diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java index 4bf6d68f7..df3063948 100644 --- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java @@ -20,6 +20,7 @@ package org.apache.impala.catalog; import java.util.List; import java.util.Set; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.IMetaStoreClient; import org.apache.hadoop.hive.metastore.api.FieldSchema; import org.slf4j.Logger; @@ -113,7 +114,7 @@ public class DataSourceTable extends Table implements FeDataSourceTable { } @Override - public String getValidWriteIds() { + public ValidWriteIdList getValidWriteIds() { return null; } diff --git a/fe/src/main/java/org/apache/impala/catalog/FeTable.java b/fe/src/main/java/org/apache/impala/catalog/FeTable.java index b22e11d6f..0d4f3a171 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FeTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/FeTable.java @@ -20,6 +20,7 @@ import java.util.Comparator; import java.util.List; import java.util.Set; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.Table; import org.apache.impala.analysis.TableName; import org.apache.impala.thrift.TCatalogObjectType; @@ -149,7 +150,7 @@ public interface FeTable { /** * @return the valid write id list for this table */ - String getValidWriteIds(); + ValidWriteIdList getValidWriteIds(); /** * @return the owner user for this table. If the table is not loaded or the owner is diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java index 7bcd10f9a..632bb78cf 100644 --- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java +++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java @@ -264,10 +264,10 @@ public class FileMetadataLoader { public int uncommittedAcidFilesSkipped = 0; /** - * Number of files skipped because they pertain to ACID directories superceded - * by later base data. + * Number of files skipped because they pertain to ACID directories superseded + * by compaction or newer base. */ - public int filesSupercededByNewerBase = 0; + public int filesSupersededByAcidState = 0; // Number of files for which the metadata was loaded. 
public int loadedFiles = 0; @@ -293,7 +293,7 @@ public class FileMetadataLoader { .add("hidden files", nullIfZero(hiddenFiles)) .add("skipped files", nullIfZero(skippedFiles)) .add("uncommited files", nullIfZero(uncommittedAcidFilesSkipped)) - .add("superceded files", nullIfZero(filesSupercededByNewerBase)) + .add("superceded files", nullIfZero(filesSupersededByAcidState)) .add("unknown diskIds", nullIfZero(unknownDiskIds)) .omitNullValues() .toString(); diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java index 2928d9612..ce971b3c4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java @@ -101,6 +101,7 @@ import com.codahale.metrics.Timer; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Joiner; import com.google.common.base.Preconditions; +import com.google.common.base.Stopwatch; import com.google.common.collect.HashMultiset; import com.google.common.collect.ImmutableList; import com.google.common.collect.Iterables; @@ -249,6 +250,10 @@ public class HdfsTable extends Table implements FeFsTable { private SqlConstraints sqlConstraints_ = new SqlConstraints(new ArrayList<>(), new ArrayList<>()); + // Valid write id list for this table. + // null in the case that this table is not transactional. + protected ValidWriteIdList validWriteIds_ = null; + // Represents a set of storage-related statistics aggregated at the table or partition // level. public final static class FileMetadataStats { @@ -624,15 +629,13 @@ public class HdfsTable extends Table implements FeFsTable { .add(p); } - ValidWriteIdList writeIds = validWriteIds_ != null - ? MetastoreShim.getValidWriteIdListFromString(validWriteIds_) : null; //TODO: maybe it'd be better to load the valid txn list in the context of a // transaction to have consistent valid write ids and valid transaction ids. // Currently tables are loaded when they are first referenced and stay in catalog // until certain actions occur (refresh, invalidate, insert, etc.). However, // Impala doesn't notice when HMS's cleaner removes old transactional directories, // which might lead to FileNotFound exceptions. - ValidTxnList validTxnList = writeIds != null ? loadValidTxns(client) : null; + ValidTxnList validTxnList = validWriteIds_ != null ? loadValidTxns(client) : null; // Create a FileMetadataLoader for each path. Map loadersByPath = Maps.newHashMap(); @@ -640,7 +643,7 @@ public class HdfsTable extends Table implements FeFsTable { List oldFds = e.getValue().get(0).getFileDescriptors(); FileMetadataLoader loader = new FileMetadataLoader(e.getKey(), Utils.shouldRecursivelyListPartitions(this), oldFds, hostIndex_, validTxnList, - writeIds, e.getValue().get(0).getFileFormat()); + validWriteIds_, e.getValue().get(0).getFileFormat()); // If there is a cached partition mapped to this path, we recompute the block // locations even if the underlying files have not changed. // This is done to keep the cached block metadata up to date. @@ -1483,6 +1486,10 @@ public class HdfsTable extends Table implements FeFsTable { avroSchema_ = hdfsTable.isSetAvroSchema() ? 
hdfsTable.getAvroSchema() : null; isMarkedCached_ = HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters()); + if (hdfsTable.isSetValid_write_ids()) { + validWriteIds_ = MetastoreShim.getValidWriteIdListFromThrift( + getFullName(), hdfsTable.getValid_write_ids()); + } } @Override @@ -1644,6 +1651,10 @@ public class HdfsTable extends Table implements FeFsTable { if (AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) { hdfsTable.setIs_full_acid(true); } + if (validWriteIds_ != null) { + hdfsTable.setValid_write_ids( + MetastoreShim.convertToTValidWriteIdList(validWriteIds_)); + } return hdfsTable; } @@ -2044,4 +2055,49 @@ public class HdfsTable extends Table implements FeFsTable { public boolean isPartitioned() { return getMetaStoreTable().getPartitionKeysSize() > 0; } + + /** + * Get valid write ids for the acid table. + * @param client the client to access HMS + * @return the list of valid write IDs for the table + */ + protected ValidWriteIdList fetchValidWriteIds(IMetaStoreClient client) + throws TableLoadingException { + String tblFullName = getFullName(); + if (LOG.isTraceEnabled()) LOG.trace("Get valid writeIds for table: " + tblFullName); + ValidWriteIdList validWriteIds = null; + try { + validWriteIds = MetastoreShim.fetchValidWriteIds(client, tblFullName); + if (LOG.isTraceEnabled()) { + LOG.trace("Valid writeIds: " + validWriteIds.writeToString()); + } + return validWriteIds; + } catch (Exception e) { + throw new TableLoadingException(String.format("Error loading ValidWriteIds for " + + "table '%s'", getName()), e); + } + } + + /** + * Set ValistWriteIdList with stored writeId + * @param client the client to access HMS + */ + protected void loadValidWriteIdList(IMetaStoreClient client) + throws TableLoadingException { + Stopwatch sw = Stopwatch.createStarted(); + Preconditions.checkState(msTable_ != null && msTable_.getParameters() != null); + if (MetastoreShim.getMajorVersion() > 2 && + AcidUtils.isTransactionalTable(msTable_.getParameters())) { + validWriteIds_ = fetchValidWriteIds(client); + } else { + validWriteIds_ = null; + } + LOG.debug("Load Valid Write Id List Done. Time taken: " + + PrintUtils.printTimeNs(sw.elapsed(TimeUnit.NANOSECONDS))); + } + + @Override + public ValidWriteIdList getValidWriteIds() { + return validWriteIds_; + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java index 8f0f5ada8..faaeeedbd 100644 --- a/fe/src/main/java/org/apache/impala/catalog/Table.java +++ b/fe/src/main/java/org/apache/impala/catalog/Table.java @@ -40,9 +40,7 @@ import org.apache.impala.compat.MetastoreShim; import org.apache.impala.common.ImpalaRuntimeException; import org.apache.impala.common.Metrics; import org.apache.impala.common.Pair; -import org.apache.impala.common.PrintUtils; import org.apache.impala.common.RuntimeEnv; -import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.MetadataOp; import org.apache.impala.thrift.TAccessLevel; import org.apache.impala.thrift.TCatalogObject; @@ -62,7 +60,6 @@ import org.apache.log4j.Logger; import com.codahale.metrics.Timer; import com.google.common.base.Preconditions; -import com.google.common.base.Stopwatch; import com.google.common.collect.Lists; /** @@ -133,11 +130,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { // impalad. protected long lastUsedTime_; - // Valid write id list for this table. - // null in the case that this table is not transactional. 
- // TODO(todd) this should probably be a ValidWriteIdList in memory instead of a String. - protected String validWriteIds_ = null; - // tracks the in-flight metastore events for this table. Used by Events processor to // avoid unnecessary refresh when the event is received private final InFlightEvents inFlightEvents = new InFlightEvents(); @@ -378,48 +370,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { } } - /** - * Get valid write ids for the acid table. - * @param client the client to access HMS - * @return the list of valid write IDs for the table - */ - protected String fetchValidWriteIds(IMetaStoreClient client) - throws TableLoadingException { - String tblFullName = getFullName(); - if (LOG.isTraceEnabled()) LOG.trace("Get valid writeIds for table: " + tblFullName); - String writeIds = null; - try { - ValidWriteIdList validWriteIds = MetastoreShim.fetchValidWriteIds(client, - tblFullName); - writeIds = validWriteIds == null ? null : validWriteIds.writeToString(); - } catch (Exception e) { - throw new TableLoadingException(String.format("Error loading ValidWriteIds for " + - "table '%s'", getName()), e); - } - if (LOG.isTraceEnabled()) { - LOG.trace("Valid writeIds: " + writeIds); - } - return writeIds; - } - - /** - * Set ValistWriteIdList with stored writeId - * @param client the client to access HMS - */ - protected void loadValidWriteIdList(IMetaStoreClient client) - throws TableLoadingException { - Stopwatch sw = Stopwatch.createStarted(); - Preconditions.checkState(msTable_ != null && msTable_.getParameters() != null); - if (MetastoreShim.getMajorVersion() > 2 && - AcidUtils.isTransactionalTable(msTable_.getParameters())) { - validWriteIds_ = fetchValidWriteIds(client); - } else { - validWriteIds_ = null; - } - LOG.debug("Load Valid Write Id List Done. Time taken: " + - PrintUtils.printTimeNs(sw.elapsed(TimeUnit.NANOSECONDS))); - } - /** * Creates a table of the appropriate type based on the given hive.metastore.api.Table * object. @@ -502,8 +452,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { storageMetadataLoadTime_ = thriftTable.getStorage_metadata_load_time_ns(); storedInImpaladCatalogCache_ = true; - validWriteIds_ = thriftTable.isSetValid_write_ids() ? 
- thriftTable.getValid_write_ids() : null; } /** @@ -554,9 +502,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { table.setMetastore_table(getMetaStoreTable()); table.setTable_stats(tableStats_); - if (validWriteIds_ != null) { - table.setValid_write_ids(validWriteIds_); - } return table; } @@ -632,7 +577,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { } resp.table_info.setColumn_stats(statsList); } - + if (getMetaStoreTable() != null && + AcidUtils.isTransactionalTable(getMetaStoreTable().getParameters())) { + Preconditions.checkState(getValidWriteIds() != null); + resp.table_info.setValid_write_ids( + MetastoreShim.convertToTValidWriteIdList(getValidWriteIds())); + } return resp; } /** @@ -845,7 +795,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { } @Override - public String getValidWriteIds() { - return validWriteIds_; + public ValidWriteIdList getValidWriteIds() { + return null; } } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java index 2c8ece434..c66ccbadc 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java @@ -81,6 +81,7 @@ import org.apache.impala.thrift.TUniqueId; import org.apache.impala.thrift.TUnit; import org.apache.impala.thrift.TUpdateCatalogCacheRequest; import org.apache.impala.thrift.TUpdateCatalogCacheResponse; +import org.apache.impala.thrift.TValidWriteIdList; import org.apache.impala.util.ListMap; import org.apache.impala.util.TByteBuffer; import org.apache.thrift.TDeserializer; @@ -703,7 +704,8 @@ public class CatalogdMetaProvider implements MetaProvider { new ArrayList<>() : resp.table_info.sql_constraints.getForeign_keys(); return new TableMetaRefImpl( dbName, tableName, resp.table_info.hms_table, resp.object_version_number, - new SqlConstraints(primaryKeys, foreignKeys)); + new SqlConstraints(primaryKeys, foreignKeys), + resp.table_info.valid_write_ids); } }); return Pair.create(ref.msTable_, (TableMetaRef)ref); @@ -1409,13 +1411,20 @@ public class CatalogdMetaProvider implements MetaProvider { */ private final long catalogVersion_; + /** + * Valid write id list of ACID tables. 
+ */ + private final TValidWriteIdList validWriteIds_; + public TableMetaRefImpl(String dbName, String tableName, - Table msTable, long catalogVersion, SqlConstraints sqlConstraints) { + Table msTable, long catalogVersion, SqlConstraints sqlConstraints, + TValidWriteIdList validWriteIds) { this.dbName_ = dbName; this.tableName_ = tableName; this.msTable_ = msTable; this.catalogVersion_ = catalogVersion; this.sqlConstraints_ = sqlConstraints; + this.validWriteIds_ = validWriteIds; } @Override @@ -1640,4 +1649,11 @@ public class CatalogdMetaProvider implements MetaProvider { return (int)size; } } + + @Override + public TValidWriteIdList getValidWriteIdList(TableMetaRef ref) { + Preconditions.checkArgument(ref instanceof TableMetaRefImpl, + "table ref %s was not created by CatalogdMetaProvider", ref); + return ((TableMetaRefImpl)ref).validWriteIds_; + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java index 849aa5274..98527a7e4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import org.apache.commons.lang.NotImplementedException; import org.apache.hadoop.fs.Path; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Database; @@ -46,6 +47,7 @@ import org.apache.impala.compat.MetastoreShim; import org.apache.impala.service.BackendConfig; import org.apache.impala.thrift.TBackendGflags; import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.thrift.TValidWriteIdList; import org.apache.impala.util.ListMap; import org.apache.impala.util.MetaStoreUtil; import org.apache.thrift.TException; @@ -397,4 +399,10 @@ class DirectMetaProvider implements MetaProvider { return msTable_.getPartitionKeysSize() != 0; } } + + @Override + public TValidWriteIdList getValidWriteIdList(TableMetaRef ref) { + throw new NotImplementedException( + "getValidWriteIdList() is not implemented for DirectMetaProvider"); + } } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java index 8321a2859..036cce3d4 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java @@ -59,6 +59,7 @@ import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.thrift.TResultSet; import org.apache.impala.thrift.TTableDescriptor; import org.apache.impala.thrift.TTableType; +import org.apache.impala.thrift.TValidWriteIdList; import org.apache.impala.util.AcidUtils; import org.apache.impala.util.AvroSchemaConverter; import org.apache.impala.util.AvroSchemaUtils; @@ -327,6 +328,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable { TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE, FeCatalogUtils.getTColumnDescriptors(this), getNumClusteringCols(), name_, db_.getName()); + // 'ref_' can be null when this table is the target of a CTAS statement. 
+ if (ref_ != null) { + TValidWriteIdList validWriteIdList = + db_.getCatalog().getMetaProvider().getValidWriteIdList(ref_); + if (validWriteIdList != null) hdfsTable.setValid_write_ids(validWriteIdList); + } tableDesc.setHdfsTable(hdfsTable); return tableDesc; } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java index 6a2891b8f..6c22898ec 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java @@ -22,6 +22,7 @@ import java.util.List; import javax.annotation.Nullable; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.TableType; import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj; import org.apache.hadoop.hive.metastore.api.Table; @@ -276,7 +277,7 @@ abstract class LocalTable implements FeTable { } @Override - public String getValidWriteIds() { + public ValidWriteIdList getValidWriteIds() { return null; } diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java index 6b79567b6..6a6022652 100644 --- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java +++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java @@ -33,6 +33,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor; import org.apache.impala.catalog.SqlConstraints; import org.apache.impala.common.Pair; import org.apache.impala.thrift.TNetworkAddress; +import org.apache.impala.thrift.TValidWriteIdList; import org.apache.impala.util.ListMap; import org.apache.thrift.TException; @@ -144,4 +145,6 @@ public interface MetaProvider { byte[] getPartitionStats(); boolean hasIncrementalStats(); } + + public TValidWriteIdList getValidWriteIdList(TableMetaRef ref); } diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java index 075383e6b..e31bac7b3 100644 --- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java +++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java @@ -31,13 +31,18 @@ import org.apache.impala.catalog.ScalarType; import org.apache.impala.catalog.StructField; import org.apache.impala.catalog.StructType; import org.apache.impala.common.FileSystemUtil; -import org.apache.impala.thrift.TQueryOptions; +import org.apache.impala.common.Pair; import org.apache.impala.thrift.TTransactionalType; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.ArrayList; +import java.util.Comparator; +import java.util.HashSet; import java.util.Iterator; import java.util.List; import java.util.Map; +import java.util.Set; import java.util.function.Predicate; import java.util.regex.Matcher; import java.util.regex.Pattern; @@ -52,6 +57,7 @@ import javax.annotation.Nullable; *

*/ public class AcidUtils { + private final static Logger LOG = LoggerFactory.getLogger(AcidUtils.class); // Constant also defined in TransactionalValidationListener public static final String INSERTONLY_TRANSACTIONAL_PROPERTY = "insert_only"; // Constant also defined in hive_metastoreConstants @@ -85,7 +91,8 @@ public class AcidUtils { "delta_" + "(?\\d+)_" + "(?\\d+)" + - "(?:_(?\\d+)|_v(?\\d+))?" + + // Statement id, or visiblityTxnId + "(?:_(?\\d+)|_v(?\\d+))?" + // Optional path suffix. "(?:/.*)?"; @@ -181,9 +188,10 @@ public class AcidUtils { } else { ParsedDelta pd = parseDelta(dirPath); if (pd != null) { + if (!isTxnValid(pd.visibilityTxnId)) return false; ValidWriteIdList.RangeResponse rr = writeIdList.isWriteIdRangeValid(pd.minWriteId, pd.maxWriteId); - return rr.equals(ValidWriteIdList.RangeResponse.ALL); + return rr != ValidWriteIdList.RangeResponse.NONE; } } // If it wasn't in a base or a delta directory, we should include it. @@ -232,15 +240,17 @@ public class AcidUtils { private static final class ParsedDelta { final long minWriteId; final long maxWriteId; - /** - * Negative value indicates there was no statement id. - */ + /// Value -1 means there is no statement id. final long statementId; + /// Value -1 means there is no visibility txn id. + final long visibilityTxnId; - ParsedDelta(long minWriteId, long maxWriteId, long statementId) { + ParsedDelta(long minWriteId, long maxWriteId, long statementId, + long visibilityTxnId) { this.minWriteId = minWriteId; this.maxWriteId = maxWriteId; this.statementId = statementId; + this.visibilityTxnId = visibilityTxnId; } } @@ -250,9 +260,12 @@ public class AcidUtils { } long minWriteId = Long.valueOf(deltaMatcher.group("minWriteId")); long maxWriteId = Long.valueOf(deltaMatcher.group("maxWriteId")); - String statementIdStr = deltaMatcher.group("optionalStatementId"); + String statementIdStr = deltaMatcher.group("statementId"); long statementId = statementIdStr != null ? Long.valueOf(statementIdStr) : -1; - return new ParsedDelta(minWriteId, maxWriteId, statementId); + String visibilityTxnIdStr = deltaMatcher.group("visibilityTxnId"); + long visibilityTxnId = visibilityTxnIdStr != null ? + Long.valueOf(visibilityTxnIdStr) : -1; + return new ParsedDelta(minWriteId, maxWriteId, statementId, visibilityTxnId); } private static ParsedDelta parseDelta(String dirPath) { @@ -263,6 +276,15 @@ public class AcidUtils { return matcherToParsedDelta(DELETE_DELTA_PATTERN.matcher(dirPath)); } + private static String getFirstDirName(String relPath) { + int slashIdx = relPath.indexOf("/"); + if (slashIdx != -1) { + return relPath.substring(0, slashIdx); + } else { + return null; + } + } + /** * Filters the files based on Acid state. * @param stats the FileStatuses obtained from recursively listing the directory @@ -278,13 +300,13 @@ public class AcidUtils { public static List filterFilesForAcidState(List stats, Path baseDir, ValidTxnList validTxnList, ValidWriteIdList writeIds, @Nullable LoadStats loadStats) throws MetaException { - List validStats = new ArrayList<>(stats); - // First filter out any paths that are not considered valid write IDs. - // At the same time, calculate the max valid base write ID. + // At the same time, calculate the max valid base write ID and collect the names of + // the delta directories. 
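+ // For example (hypothetical layout, every txn and write id committed): given base_0000005, + // delta_0000006_0000006 and the minor-compacted delta_0000006_0000007_v01000, only the files + // under base_0000005 and delta_0000006_0000007_v01000 are returned; the plain delta is superseded.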
Predicate pred = new WriteListBasedPredicate(validTxnList, writeIds); long maxBaseWriteId = Long.MIN_VALUE; - for (Iterator it = validStats.iterator(); it.hasNext(); ) { + Set deltaDirNames = new HashSet<>(); + for (Iterator it = stats.iterator(); it.hasNext();) { FileStatus stat = it.next(); String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir); if (!pred.test(relPath)) { @@ -292,13 +314,30 @@ public class AcidUtils { if (loadStats != null) loadStats.uncommittedAcidFilesSkipped++; continue; } - maxBaseWriteId = Math.max(getBaseWriteId(relPath), maxBaseWriteId); + String dirName = getFirstDirName(relPath); + if (dirName != null && (dirName.startsWith("delta_") || + dirName.startsWith("delete_delta_"))) { + deltaDirNames.add(dirName); + } } + // Get a list of all valid delta directories. + List> deltas = + getValidDeltaDirsOrdered(deltaDirNames, maxBaseWriteId, writeIds); + // Filter out delta directories superceded by major/minor compactions. + Set filteredDeltaDirs = + getFilteredDeltaDirs(deltas, maxBaseWriteId, writeIds); + // Filter out any files that are superceded by the latest valid base or not located + // in 'filteredDeltaDirs'. + return filterFilesForAcidState(stats, baseDir, maxBaseWriteId, filteredDeltaDirs, + loadStats); + } - // Filter out any files that are superceded by the latest valid base, - // as well as any directories. - for (Iterator it = validStats.iterator(); it.hasNext(); ) { + private static List filterFilesForAcidState(List stats, + Path baseDir, long maxBaseWriteId, Set deltaDirs, + @Nullable LoadStats loadStats) { + List validStats = new ArrayList<>(stats); + for (Iterator it = validStats.iterator(); it.hasNext();) { FileStatus stat = it.next(); if (stat.isDirectory()) { @@ -307,39 +346,26 @@ public class AcidUtils { } String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir); + if (relPath.startsWith("delta_") || + relPath.startsWith("delete_delta_")) { + String dirName = getFirstDirName(relPath); + if (dirName != null && !deltaDirs.contains(dirName)) { + it.remove(); + if (loadStats != null) loadStats.filesSupersededByAcidState++; + } + continue; + } long baseWriteId = getBaseWriteId(relPath); if (baseWriteId != SENTINEL_BASE_WRITE_ID) { if (baseWriteId < maxBaseWriteId) { it.remove(); - if (loadStats != null) loadStats.filesSupercededByNewerBase++; + if (loadStats != null) loadStats.filesSupersededByAcidState++; } continue; } - ParsedDelta parsedDelta = parseDelta(relPath); - if (parsedDelta != null) { - if (parsedDelta.minWriteId <= maxBaseWriteId) { - it.remove(); - if (loadStats != null) loadStats.filesSupercededByNewerBase++; - } else if (parsedDelta.minWriteId != parsedDelta.maxWriteId) { - // TODO(IMPALA-9512): Validate rows in minor compacted deltas. - // We could read the non-compacted delta directories, but we'd need to check - // that all of them still exists. Let's throw an error on minor compacted tables - // for now since we want to read minor compacted deltas in the near future. - throw new MetaException("Table is minor compacted which is not supported " + - "by Impala. Run major compaction to resolve this."); - } - continue; - } - ParsedDelta deleteDelta = parseDeleteDelta(relPath); - if (deleteDelta != null) { - if (deleteDelta.maxWriteId > maxBaseWriteId) { - throw new MetaException("Table has deleted rows. It's currently not " + - "supported by Impala. Run major compaction to resolve this."); - } - } - // Not in a base or a delta directory. In that case, it's probably a post-upgrade - // file. 
+ // Not in a base or a delta directory. In that case, it's probably a + // post-upgrade file. // If there is no valid base: we should read the file (assuming that // hive.mm.allow.originals == true) // If there is a valid base: the file should be merged to the base by the @@ -349,4 +375,141 @@ public abstract class Table extends CatalogObjectImpl implements FeTable { } return validStats; } + + private static List<Pair<String, ParsedDelta>> getValidDeltaDirsOrdered( + Set<String> deltaDirNames, long baseWriteId, ValidWriteIdList writeIds) + throws MetaException { + List<Pair<String, ParsedDelta>> deltas = new ArrayList<>(); + for (Iterator<String> it = deltaDirNames.iterator(); it.hasNext();) { + String dirname = it.next(); + ParsedDelta parsedDelta = parseDelta(dirname); + if (parsedDelta != null) { + if (parsedDelta.minWriteId <= baseWriteId) { + Preconditions.checkState(parsedDelta.maxWriteId <= baseWriteId); + it.remove(); + continue; + } + deltas.add(new Pair<>(dirname, parsedDelta)); + continue; + } + ParsedDelta deleteDelta = parseDeleteDelta(dirname); + if (deleteDelta != null) { + if (deleteDelta.maxWriteId > baseWriteId) { + throw new MetaException("Table has deleted rows. It's currently not " + + "supported by Impala. Run major compaction to resolve this."); + } + } + } + + deltas.sort(new Comparator<Pair<String, ParsedDelta>>() { + // This compare method is based on Hive (d6ad73c3615) + // AcidUtils.ParsedDeltaLight.compareTo() + // One addition to it is to take the visibilityTxnId into consideration. Hence if + // there's delta_N_M and delta_N_M_v001234 then delta_N_M_v001234 must be ordered + // before. + @Override + public int compare(Pair<String, ParsedDelta> o1, Pair<String, ParsedDelta> o2) { + ParsedDelta pd1 = o1.second; + ParsedDelta pd2 = o2.second; + if (pd1.minWriteId != pd2.minWriteId) { + if (pd1.minWriteId < pd2.minWriteId) { + return -1; + } else { + return 1; + } + } else if (pd1.maxWriteId != pd2.maxWriteId) { + if (pd1.maxWriteId < pd2.maxWriteId) { + return 1; + } else { + return -1; + } + } else if (pd1.statementId != pd2.statementId) { + /** + * We want deltas after minor compaction (w/o statementId) to sort earlier so + * that getAcidState() considers compacted files (into larger ones) obsolete. + * Before compaction, include deltas with all statementIds for a given writeId. + */ + if (pd1.statementId < pd2.statementId) { + return -1; + } else { + return 1; + } + } else if (pd1.visibilityTxnId != pd2.visibilityTxnId) { + // This is an alteration from Hive's algorithm. If everything is the same then + // the higher visibilityTxnId wins (since a missing visibilityTxnId is -1). + // Currently this cannot happen since Hive doesn't minor compact standalone + // delta directories of streaming ingestion, i.e. the following cannot happen: + // delta_1_5 => delta_1_5_v01234 + // However, it'd make sense because streaming ingested ORC files don't use + // advanced features like dictionary encoding or statistics. Hence Hive might + // do that in the future and that'd make Impala see duplicate rows. + // So I'd be cautious here in case they forget to tell us. + if (pd1.visibilityTxnId < pd2.visibilityTxnId) { + return 1; + } else { + return -1; + } + } else { + return o1.first.compareTo(o2.first); + } + } + }); + return deltas; + } + + /** + * The algorithm is copied from Hive's (d6ad73c3615) + * org.apache.hadoop.hive.ql.io.AcidUtils.getAcidState() + * One addition to it is to take the visibilityTxnId into consideration. Hence if + * there's delta_N_M_v001234 and delta_N_M then it ignores delta_N_M.
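+ * For example, given delta_0000001_0000001 ... delta_0000007_0000007 together with the + * minor-compacted delta_0000001_0000003_v00100, delta_0000004_0000005_v00101, + * delta_0000001_0000005_v00102 and delta_0000006_0000007_v00123, only + * delta_0000001_0000005_v00102 and delta_0000006_0000007_v00123 are selected (assuming + * all of their write ids and visibility txns are valid).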
+ */ + private static Set getFilteredDeltaDirs(List> deltas, + long baseWriteId, ValidWriteIdList writeIds) { + long current = baseWriteId; + long lastStmtId = -1; + ParsedDelta prev = null; + Set filteredDeltaDirs = new HashSet<>(); + for (Pair pathDelta : deltas) { + ParsedDelta next = pathDelta.second; + if (next.maxWriteId > current) { + // are any of the new transactions ones that we care about? + if (writeIds.isWriteIdRangeValid(current + 1, next.maxWriteId) != + ValidWriteIdList.RangeResponse.NONE) { + filteredDeltaDirs.add(pathDelta.first); + current = next.maxWriteId; + lastStmtId = next.statementId; + prev = next; + } + } else if (next.maxWriteId == current && lastStmtId >= 0) { + // make sure to get all deltas within a single transaction; multi-statement txn + // generate multiple delta files with the same txnId range + // of course, if maxWriteId has already been minor compacted, all per statement + // deltas are obsolete + filteredDeltaDirs.add(pathDelta.first); + prev = next; + } else if (prev != null && next.maxWriteId == prev.maxWriteId && + next.minWriteId == prev.minWriteId && + next.statementId == prev.statementId && + // If visibilityTxnId differs, then 'pathDelta' is probably a streaming ingested + // delta directory and 'prev' is the compacted version of it. + next.visibilityTxnId == prev.visibilityTxnId) { + // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta, + // except + // the path. This may happen when we have split update and we have two types of + // delta + // directories- 'delta_x_y' and 'delete_delta_x_y' for the SAME txn range. + + // Also note that any delete_deltas in between a given delta_x_y range would be + // made + // obsolete. For example, a delta_30_50 would make delete_delta_40_40 obsolete. + // This is valid because minor compaction always compacts the normal deltas and + // the delete deltas for the same range. That is, if we had 3 directories, + // delta_30_30, delete_delta_40_40 and delta_50_50, then running minor compaction + // would produce delta_30_50 and delete_delta_30_50. 
+ filteredDeltaDirs.add(pathDelta.first); + prev = next; + } + } + return filteredDeltaDirs; + } } diff --git a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java index 4632de0c0..730a02ec1 100644 --- a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java @@ -97,8 +97,7 @@ public class StmtMetadataLoaderTest { for (FeTable t: stmtTableCache.tables.values()) { Assert.assertTrue(t.isLoaded()); Assert.assertTrue(t.getValidWriteIds() != null); - Assert.assertTrue(MetastoreShim.getValidWriteIdListFromString(t.getValidWriteIds()) - .isWriteIdValid(t.getWriteId())); + Assert.assertTrue(t.getValidWriteIds().isWriteIdValid(t.getWriteId())); } } diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java index 19b36609c..b6fe6501b 100644 --- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java +++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java @@ -27,8 +27,11 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.fs.Path; +import org.apache.hadoop.hive.common.ValidReadTxnList; +import org.apache.hadoop.hive.common.ValidWriteIdList; import org.apache.hadoop.hive.metastore.api.MetaException; import org.apache.impala.catalog.HdfsPartition.FileDescriptor; +import org.apache.impala.compat.MetastoreShim; import org.apache.impala.thrift.TNetworkAddress; import org.apache.impala.util.ListMap; import org.junit.Test; @@ -108,6 +111,24 @@ public class FileMetadataLoaderTest { relPaths.get(2)); } + @Test + public void testAcidMinorCompactionLoading() throws IOException, MetaException { + //TODO(IMPALA-9042): Remove "throws MetaException" + ListMap hostIndex = new ListMap<>(); + ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString( + "functional_orc_def.complextypestbl_minor_compacted:10:10::"); + Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/managed/" + + "complextypestbl_minor_compacted_orc_def/"); + FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true, + /* oldFds = */ Collections.emptyList(), hostIndex, new ValidReadTxnList(""), + writeIds, HdfsFileFormat.ORC); + fml.load(); + // Only load the compacted file. + assertEquals(1, fml.getStats().loadedFiles); + // 2 * 8 files since the hidden '_orc_acid_version' is filtered out later. + assertEquals(16, fml.getStats().filesSupersededByAcidState); + } + @Test public void testLoadMissingDirectory() throws IOException, MetaException { //TODO(IMPALA-9042): Remove "throws MetaException" diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java index 07620b202..65986871c 100644 --- a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java +++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java @@ -36,9 +36,11 @@ import org.apache.impala.compat.MetastoreShim; import org.hamcrest.Matchers; import org.junit.Assume; import org.junit.Test; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; public class AcidUtilsTest { - + private static final Logger LOG = LoggerFactory.getLogger(AcidUtilsTest.class); /** Fake base path to root all FileStatuses under. 
*/ private static final Path BASE_PATH = new Path("file:///foo/bar/"); @@ -189,6 +191,36 @@ public class AcidUtilsTest { "delta_0000009_0000009_0000/0000/def.txt"}); } + public void testStreamingIngest() { + assertFiltering(new String[]{ + "delta_0000001_0000005/bucket_0000", + "delta_0000001_0000005/bucket_0001", + "delta_0000001_0000005/bucket_0002", + "delta_0000001_0000005_v01000/bucket_0000", + "delta_0000001_0000005_v01000/bucket_0001", + "delta_0000001_0000005_v01000/bucket_0002"}, + "1100:1000:1000:", // txn 1000 is open + "default.test:10:10::", // write ids are committed + new String[]{ + "delta_0000001_0000005/bucket_0000", + "delta_0000001_0000005/bucket_0001", + "delta_0000001_0000005/bucket_0002"}); + + assertFiltering(new String[]{ + "delta_0000001_0000005/bucket_0000", + "delta_0000001_0000005/bucket_0001", + "delta_0000001_0000005/bucket_0002", + "delta_0000001_0000005_v01000/bucket_0000", + "delta_0000001_0000005_v01000/bucket_0001", + "delta_0000001_0000005_v01000/bucket_0002"}, + "1100:::", // txn 1000 is committed + "default.test:10:10::", // write ids are committed + new String[]{ + "delta_0000001_0000005_v01000/bucket_0000", + "delta_0000001_0000005_v01000/bucket_0001", + "delta_0000001_0000005_v01000/bucket_0002"}); + } + @Test public void testAbortedCompaction() { assertFiltering(new String[]{ @@ -272,14 +304,14 @@ public class AcidUtilsTest { "delta_000006_0000020/", "delta_000006_0000020/def.txt", "delta_000005.txt"}; - - // Only committed up to transaction 10, so skip the 6-20 delta. + // Only committed up to write id 10, so we can select 6-20 delta. assertFiltering(paths, "default.test:10:1234:1,2,3", new String[]{ "delta_000005_0000005/abc.txt", - "delta_000005_0000005_0000/abc.txt", - "delta_000005.txt"}); + "delta_000006_0000020/def.txt", + "delta_000005.txt", + }); } @Test @@ -339,37 +371,125 @@ public class AcidUtilsTest { } @Test - public void testMinorCompactionFail() { - filteringError(new String[]{ + public void testMinorCompactionAllTxnsValid() { + assertFiltering(new String[]{ "base_0000005/", "base_0000005/abc.txt", - "delta_0000006_0000007/", - "delta_0000006_0000007/00000"}, - // all txns are valid - "", + "delta_0000006_0000006_v01000/00000", + "delta_0000007_0000007_v01000/00000", + "delta_0000006_0000007_v01000/00000"}, + "", // all txns are valid // :::: "default.test:10:1234:1,2,3", - "Table is minor compacted"); - filteringError(new String[]{ + new String[]{ + "base_0000005/abc.txt", + "delta_0000006_0000007_v01000/00000"}); + // Minor compact multi-statement transaction + assertFiltering(new String[]{ "base_0000005/", "base_0000005/abc.txt", - "delta_0000006_0000007_00123/", - "delta_0000006_0000007_00123/00000"}, - // all txns are valid - "", + "delta_0000006_0000006_00001/00000", // statement id 1 + "delta_0000006_0000006_00002/00000", // statement id 2 + "delta_0000006_0000006_00003/00000", // statement id 3 + "delta_0000006_0000006_v01000/00000", // compacted + "delta_0000006_0000006_v01000/00001"}, + "", // all txns are valid + // :::: + "default.test:10:1234:", + new String[]{ + "base_0000005/abc.txt", + "delta_0000006_0000006_v01000/00000", + "delta_0000006_0000006_v01000/00001"}); + // Disjunct minor compacted delta dirs + assertFiltering(new String[]{ + "delta_0000001_0000001/00000", + "delta_0000002_0000002/00000", + "delta_0000003_0000003/00000", + "delta_0000004_0000004/00000", + "delta_0000005_0000005/00000", + "delta_0000006_0000006/00000", + "delta_0000007_0000007/00000", + "delta_0000001_0000003_v00100/00000", + 
"delta_0000004_0000005_v00101/00000", + "delta_0000001_0000005_v00102/00000", + "delta_0000006_0000007_v00123/00000", + "delta_0000006_0000007_v00123/00001"}, + "", // all txns are valid + // :::: + "default.test:10:1234:", + new String[]{ + "delta_0000001_0000005_v00102/00000", + "delta_0000006_0000007_v00123/00000", + "delta_0000006_0000007_v00123/00001"}); + // Compacted delta range contains aborted write id + assertFiltering(new String[]{ + "delta_0000001_0000001/00000", + "delta_0000002_0000002/00000", + "delta_0000003_0000003/00000", + "delta_0000001_0000003_v01000/00000"}, + "", // all txns are valid + // :::: + "default.test:10:5::2", + new String[]{"delta_0000001_0000003_v01000/00000"}); + } + + @Test + public void testInProgressMinorCompactions() { + assertFiltering(new String[]{ + "base_0000005/", + "base_0000005/abc.txt", + "delta_0000006_0000006/00000", + "delta_0000007_0000007/00000", + "delta_0000006_0000007_v100/00000"}, + // Txns valid up to id 90, so 100 is invalid + "90:90::", // :::: "default.test:10:1234:1,2,3", - "Table is minor compacted"); - filteringError(new String[]{ + new String[]{ + "base_0000005/abc.txt", + "delta_0000006_0000006/00000", + "delta_0000007_0000007/00000"}); + // Minor compact multi-statement transaction + assertFiltering(new String[]{ "base_0000005/", "base_0000005/abc.txt", - "delta_0000006_0000007_v00123/", - "delta_0000006_0000007_v00123/00000"}, - // all txns are valid - "", + "delta_0000006_0000006_00001/00000", // statement id 1 + "delta_0000006_0000006_00002/00000", // statement id 2 + "delta_0000006_0000006_00003/00000", // statement id 3 + "delta_0000006_0000006_v100/00000", // no statement id => compacted + "delta_0000006_0000006_v100/00001"}, + // Txn 100 is invalid + "110:100:100:", // :::: - "default.test:10:1234:1,2,3", - "Table is minor compacted"); + "default.test:10:1234:", + new String[]{ + "base_0000005/abc.txt", + "delta_0000006_0000006_00001/00000", + "delta_0000006_0000006_00002/00000", + "delta_0000006_0000006_00003/00000"}); + // Disjunct minor compacted delta dirs + assertFiltering(new String[]{ + "delta_0000001_0000001/00000", + "delta_0000002_0000002/00000", + "delta_0000003_0000003/00000", + "delta_0000004_0000004/00000", + "delta_0000005_0000005/00000", + "delta_0000006_0000006/00000", + "delta_0000007_0000007/00000", + "delta_0000001_0000003_v00100/00000", + "delta_0000004_0000005_v00101/00000", + "delta_0000001_0000005_v00102/00000", + "delta_0000006_0000007_v00123/00000", + "delta_0000006_0000007_v00123/00001"}, + // Txn 102 is invalid (minor compaction 1-5) + "130:102:102:", + // :::: + "default.test:10:1234:", + new String[]{ + "delta_0000001_0000003_v00100/00000", + "delta_0000004_0000005_v00101/00000", + "delta_0000006_0000007_v00123/00000", + "delta_0000006_0000007_v00123/00001"}); } @Test @@ -400,7 +520,6 @@ public class AcidUtilsTest { // :: "default.test:20:15::", new String[]{ - // No minor compactions after base directory so it should succeed. 
"base_000010/0000_0", "delta_0000012_0000012_0000/0000_0", "delta_0000012_0000012_0000/0000_1"}); diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py index 590445c70..503c74932 100755 --- a/testdata/bin/generate-schema-statements.py +++ b/testdata/bin/generate-schema-statements.py @@ -654,7 +654,7 @@ def generate_statements(output_name, test_vectors, sections, create_file_format = file_format create_codec = codec if not (section['LOAD'] or section['DEPENDENT_LOAD'] - or section['DEPENDENT_LOAD_HIVE']): + or section['DEPENDENT_LOAD_HIVE'] or section['DEPENDENT_LOAD_ACID']): create_codec = 'none' create_file_format = file_format if file_format not in IMPALA_SUPPORTED_INSERT_FORMATS: diff --git a/testdata/data/README b/testdata/data/README index 7dca8b0db..0dee50999 100644 --- a/testdata/data/README +++ b/testdata/data/README @@ -494,3 +494,8 @@ the same indexing hash but a newer version depends on the time of writing. `ca51fa17-681b-4497-85b7-4f68e7a63ee7-0_1-38-282_20200112194529.parquet` If the impala table was refreshed after this file was written, impala will only query on the file with latest version. + +streaming.orc: +ORC file generated by Hive Streaming Ingestion. I used a slightly altered version of +TestStreaming.testNoBuckets() from Hive 3.1 to generate this file. It contains +values coming from two transactions. The file has two stripes (one per transaction). diff --git a/testdata/data/streaming.orc b/testdata/data/streaming.orc new file mode 100644 index 0000000000000000000000000000000000000000..3594ca952959839118595a745a5ea2640bf32741 GIT binary patch literal 1137 zcmeYda%N{>aA0Hrl1_{ZgcumKlS>j35*QL8A_@?3P6mcV!$e~^C&?(u1j0#SV0fJL zI6)*~O2T8Ni#`FfBAeKKHaq-W6EG{bN!G)dF_U9H^Qq?(5`>c;B`_qZ=}74DF?awq zbv;aan8196`#}ED*0cO2=KfR zQT?9&nS0Vchcn7P83JyP7W@tgv$`v3_FRV1~ZYrm>T=L$@>z*ghonAR}?xx8XC8xaoD=c$eHIk)a-o+-yM=Bq4B;P*# zc<)R-GlSRm-w#j7T~LmC6wmR5QGijRp@B(>iGfE$-GHs>3p2BSkh26Zcp=f^#^{1I zT7cr9XfaJRM?{NRk_83P;zvxhY|O~AVz>c~83T!P;kzZc&+#l_;lDI_O3{ zS7vv#$8I*un6YG1T5x?!{ek%GWeLv{+uOhYtk|z1S0{8(L$2w^uav5;BX)m(37=%{ zt>6$;bJX~DD?D}S<+Xx~uf0;weR6v7)Mb;u_Dkx8eGRe{yz%YrI>9}0k*j-d=N+{3 z5;!WCvU-xrr}7I8TpFw~73musttX6kI{)kR(n3TMO+xH{SN*a5X(%Dq+7xKR^6~4*FCv|_zkaGuVwd_T zpybmO@Mi1!Q(nuXmEG1}^}lsuy1TZo`fGXPm^H7K7Ab9bdplOSV(+Ff$?bQWYNsf4 z*`5ef^?dSv0i$RD*NzY85}HfSxSg+BTNL?PEq+yh%xt&4pC22|?mMq{uKV<+Q>RwH zn(vidc7eatZ&x4tnHur9R-rX29 SA!o~ro4h+%i+}Lq3^4#KmhBq= literal 0 HcmV?d00001 diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql index 3f927c047..dc53371de 100644 --- a/testdata/datasets/functional/functional_schema_template.sql +++ b/testdata/datasets/functional/functional_schema_template.sql @@ -711,6 +711,30 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM functiona ---- DATASET functional ---- BASE_TABLE_NAME +complextypestbl_minor_compacted +---- COLUMNS +id bigint +int_array array +int_array_array array> +int_map map +int_map_array array> +nested_struct struct, c: struct>>>, g: map>>>> +---- DEPENDENT_LOAD_ACID +INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.complextypestbl where id = 1; +INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.complextypestbl where id = 2; +INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.complextypestbl where id = 3; +INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM 
{db_name}{db_suffix}.complextypestbl where id = 4; +INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.complextypestbl where id = 5; +INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.complextypestbl where id = 6; +INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.complextypestbl where id = 7; +INSERT INTO TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM {db_name}{db_suffix}.complextypestbl where id = 8; +ALTER TABLE {db_name}{db_suffix}.{table_name} compact 'minor'; +---- TABLE_PROPERTIES +transactional=true +==== +---- DATASET +functional +---- BASE_TABLE_NAME complextypestbl_non_transactional ---- COLUMNS id bigint diff --git a/testdata/datasets/functional/schema_constraints.csv b/testdata/datasets/functional/schema_constraints.csv index dba9d7a07..a308c1304 100644 --- a/testdata/datasets/functional/schema_constraints.csv +++ b/testdata/datasets/functional/schema_constraints.csv @@ -78,6 +78,7 @@ table_name:complextypes_multifileformat, constraint:restrict_to, table_format:te # TODO: Avro table_name:complextypestbl, constraint:restrict_to, table_format:parquet/none/none table_name:complextypestbl, constraint:restrict_to, table_format:orc/def/block +table_name:complextypestbl_minor_compacted, constraint:restrict_to, table_format:orc/def/block table_name:complextypestbl_medium, constraint:restrict_to, table_format:parquet/none/none table_name:complextypestbl_medium, constraint:restrict_to, table_format:orc/def/block table_name:complextypestbl_non_transactional, constraint:restrict_to, table_format:orc/def/block diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test index 8511da814..3ad8bc805 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test +++ b/testdata/workloads/functional-query/queries/QueryTest/acid-negative.test @@ -64,32 +64,10 @@ select * from acid; INT ==== ---- HIVE_QUERY -alter table $DATABASE.acid compact 'minor' and wait; -==== ----- QUERY -# REFRESH fails because of minor compacted delete delta directories. -refresh acid; ----- CATCH -TableLoadingException -==== ----- QUERY -# Until the old files are still there SELECT works well. -select * from acid; ----- RESULTS -1 -3 -5 -5 -5 -==== ----- HIVE_QUERY alter table $DATABASE.acid compact 'major' and wait; ==== ---- QUERY -# We can't issue REFRESH here because REFRESH only works if the metadata for the table -# is successfully loaded. The previous REFRESH broke table metadata. Once Impala-9042 is -# resolved everything should work smoothly. 
-invalidate metadata acid; +refresh acid; show files in acid; ---- RESULTS row_regex:'$NAMENODE/$MANAGED_WAREHOUSE_DIR/$DATABASE.db/acid/base_0000005_v\d+/bucket_\d+','\d+K?B','' diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-0.test b/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-0.test new file mode 100644 index 000000000..4ee01e66b --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-0.test @@ -0,0 +1,15 @@ +==== +---- QUERY +refresh streaming; +select * from streaming; +---- RESULTS +---- TYPES +STRING, STRING +==== +---- QUERY +select count(*) from streaming; +---- RESULTS +0 +---- TYPES +BIGINT +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-1.test b/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-1.test new file mode 100644 index 000000000..0813984df --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-1.test @@ -0,0 +1,17 @@ +==== +---- QUERY +refresh streaming; +select * from streaming; +---- RESULTS +'a1','b2' +'a3','b4' +---- TYPES +STRING, STRING +==== +---- QUERY +select count(*) from streaming; +---- RESULTS +2 +---- TYPES +BIGINT +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-2.test b/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-2.test new file mode 100644 index 000000000..8dcf964dd --- /dev/null +++ b/testdata/workloads/functional-query/queries/QueryTest/acid-row-validation-2.test @@ -0,0 +1,19 @@ +==== +---- QUERY +refresh streaming; +select * from streaming; +---- RESULTS +'a1','b2' +'a3','b4' +'a5','b6' +'a7','b8' +---- TYPES +STRING, STRING +==== +---- QUERY +select count(*) from streaming; +---- RESULTS +4 +---- TYPES +BIGINT +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/acid.test b/testdata/workloads/functional-query/queries/QueryTest/acid.test index ecd48e2e4..9d912c4b0 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/acid.test +++ b/testdata/workloads/functional-query/queries/QueryTest/acid.test @@ -111,3 +111,28 @@ show tables; ---- RESULTS 'upgraded_table' ==== +---- QUERY +# Test reading minor-compacted table. 
+show files in functional_orc_def.complextypestbl_minor_compacted; +---- LABELS +Path,Size,Partition +---- RESULTS +row_regex:'$NAMENODE/test-warehouse/managed/complextypestbl_minor_compacted_orc_def/delta_0000001_0000008_v\d+/bucket_00000','.+KB','' +---- TYPES +STRING,STRING,STRING +==== +---- QUERY +select row__id.originaltransaction, id +from functional_orc_def.complextypestbl_minor_compacted; +---- RESULTS +1,1 +2,2 +3,3 +4,4 +5,5 +6,6 +7,7 +8,8 +---- TYPES +BIGINT,BIGINT +==== diff --git a/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test b/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test index 2ddc06cfa..e5e389985 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test +++ b/testdata/workloads/functional-query/queries/QueryTest/full-acid-rowid.test @@ -135,3 +135,23 @@ ROW__ID.ROWID, ROW__ID.CURRENTTRANSACTION, ROW__ID.OPERATION, ROW__ID.BUCKET, RO ---- TYPES BIGINT, BIGINT, INT, INT, BIGINT, INT ==== +---- QUERY +select row__id.rowid, row__id.currenttransaction, row__id.operation, + row__id.bucket, row__id.originaltransaction, item +from functional_orc_def.complextypestbl_minor_compacted c, c.int_array a; +---- LABELS +ROW__ID.ROWID, ROW__ID.CURRENTTRANSACTION, ROW__ID.OPERATION, ROW__ID.BUCKET, ROW__ID.ORIGINALTRANSACTION, ITEM +---- RESULTS +0,1,0,536870912,1,1 +0,1,0,536870912,1,2 +0,1,0,536870912,1,3 +0,2,0,536870912,2,NULL +0,2,0,536870912,2,1 +0,2,0,536870912,2,2 +0,2,0,536870912,2,NULL +0,2,0,536870912,2,3 +0,2,0,536870912,2,NULL +0,8,0,536870912,8,-1 +---- TYPES +BIGINT, BIGINT, INT, INT, BIGINT, INT +==== diff --git a/tests/query_test/test_acid_row_validation.py b/tests/query_test/test_acid_row_validation.py new file mode 100644 index 000000000..c6438c09b --- /dev/null +++ b/tests/query_test/test_acid_row_validation.py @@ -0,0 +1,72 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Functional tests for ACID integration with Hive. + +import os + +from tests.common.impala_test_suite import ImpalaTestSuite +from tests.common.skip import SkipIfLocal +from tests.util.acid_txn import AcidTxn + + +# Tests that Impala validates rows against a validWriteIdList correctly. 
+class TestAcidRowValidation(ImpalaTestSuite): + @classmethod + def get_workload(self): + return 'functional-query' + + @classmethod + def setup_class(cls): + super(TestAcidRowValidation, cls).setup_class() + cls.acid = AcidTxn(cls.hive_client) + + @classmethod + def add_test_dimensions(cls): + super(TestAcidRowValidation, cls).add_test_dimensions() + cls.ImpalaTestMatrix.add_constraint(lambda v: + v.get_value('table_format').file_format in ['orc']) + + def _create_test_table(self, vector, unique_database, tbl_name): + fq_tbl_name = "{0}.{1}".format(unique_database, tbl_name) + create_stmt = """CREATE TABLE {0} (a string, b string) STORED AS ORC + TBLPROPERTIES('transactional'='true')""".format(fq_tbl_name) + self.client.execute(create_stmt) + table_uri = self._get_table_location(fq_tbl_name, vector) + table_path = table_uri[table_uri.index("test-warehouse"):] + delta_dir = table_path + "/delta_1_2" + self.hdfs_client.make_dir(delta_dir) + streaming_orc_file = os.environ['IMPALA_HOME'] + "/testdata/data/streaming.orc" + self.hdfs_client.copy_from_local(streaming_orc_file, "/" + delta_dir) + + def _commit_one(self, unique_database, table_name): + txn_id = self.acid.open_txns() + self.acid.allocate_table_write_ids(txn_id, unique_database, table_name) + self.acid.commit_txn(txn_id) + + @SkipIfLocal.hdfs_client + def test_row_validation(self, vector, unique_database): + """Tests reading from a file written by Hive Streaming Ingestion. At first, no rows + are valid. Then we commit the first transaction and read the table. Then we commit the + last transaction and read the table.""" + tbl_name = "streaming" + self._create_test_table(vector, unique_database, tbl_name) + self.run_test_case('QueryTest/acid-row-validation-0', vector, use_db=unique_database) + self._commit_one(unique_database, tbl_name) + self.run_test_case('QueryTest/acid-row-validation-1', vector, use_db=unique_database) + self._commit_one(unique_database, tbl_name) + self.run_test_case('QueryTest/acid-row-validation-2', vector, use_db=unique_database) diff --git a/tests/util/acid_txn.py b/tests/util/acid_txn.py new file mode 100644 index 000000000..c50f4572f --- /dev/null +++ b/tests/util/acid_txn.py @@ -0,0 +1,151 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License.
+ +from tests.util.thrift_util import create_transport +from hive_metastore import ThriftHiveMetastore +from hive_metastore.ttypes import (AbortTxnRequest, AllocateTableWriteIdsRequest, + CheckLockRequest, CommitTxnRequest, GetValidWriteIdsRequest, HeartbeatRequest, + LockComponent, LockLevel, LockType, LockRequest, OpenTxnRequest, ShowLocksRequest, + TruncateTableRequest, UnlockRequest) +from thrift.protocol import TBinaryProtocol + +# HMS config +metastore_host = "localhost" +metastore_port = "9083" +service = "Hive Metastore Server" +trans_type = 'buffered' + +# User config +user = 'AcidTxn - Impala test' +hostname = 'localhost' + + +# Utility class for interacting with Hive ACID transactions. +# It's basically a facade, i.e. it provides a simplified interface for HMS. +# +# You can also use it interactively from impala-python, e.g.: +# $> impala-python +# >>> from tests.util.acid_txn import AcidTxn +# >>> at = AcidTxn() +# >>> at.get_open_txns() +class AcidTxn(object): + def __init__(self, hms_client=None): + if hms_client: + self.hms_client = hms_client + else: + hive_transport = create_transport( + host=metastore_host, + port=metastore_port, + service=service, + transport_type=trans_type) + protocol = TBinaryProtocol.TBinaryProtocol(hive_transport) + self.hms_client = ThriftHiveMetastore.Client(protocol) + hive_transport.open() + + def get_hms_client(self): + return self.hms_client + + def get_open_txns(self): + return self.hms_client.get_open_txns() + + def get_open_txns_info(self): + return self.hms_client.get_open_txns_info() + + def open_txns(self): + open_txn_req = OpenTxnRequest() + open_txn_req.num_txns = 1 + open_txn_req.user = user + open_txn_req.hostname = hostname + open_txn_resp = self.hms_client.open_txns(open_txn_req) + return open_txn_resp.txn_ids[0] + + def allocate_table_write_ids(self, txn_id, db_name, table_name): + allocate_req = AllocateTableWriteIdsRequest() + allocate_req.dbName = db_name + allocate_req.tableName = table_name + allocate_req.txnIds = [txn_id] + resp = self.hms_client.allocate_table_write_ids(allocate_req) + return resp.txnToWriteIds[0].writeId + + def get_valid_write_ids(self, db_name, table_name): + get_writeids_req = GetValidWriteIdsRequest() + get_writeids_req.fullTableNames = ['{}.{}'.format(db_name, table_name)] + return self.hms_client.get_valid_write_ids(get_writeids_req) + + def show_locks(self, db_name, table_name, part_name=None, is_extended=False): + show_locks_req = ShowLocksRequest() + show_locks_req.dbname = db_name + show_locks_req.tablename = table_name + show_locks_req.partname = part_name + show_locks_req.isExtended = is_extended + return self.hms_client.show_locks(show_locks_req) + + def lock(self, txn_id, db_name, table_name, type=LockType.SHARED_WRITE, + level=LockLevel.TABLE): + lock_comp = LockComponent() + lock_comp.type = type + lock_comp.level = level + lock_comp.dbname = db_name + lock_comp.tablename = table_name + lock_req = LockRequest() + lock_req.component = [lock_comp] + lock_req.txnid = txn_id + lock_req.user = user + lock_req.hostname = hostname + return self.hms_client.lock(lock_req) + + def check_lock(self, lock_id): + check_lock_req = CheckLockRequest() + check_lock_req.lockid = lock_id + return self.hms_client.check_lock(check_lock_req) + + def unlock(self, lock_id): + unlock_req = UnlockRequest() + unlock_req.lockid = lock_id + return self.hms_client.unlock(unlock_req) + + def heartbeat(self, txn_id=None, lock_id=None): + heartbeat_req = HeartbeatRequest() + heartbeat_req.txnid = txn_id + 
heartbeat_req.lockid = lock_id + self.hms_client.heartbeat(heartbeat_req) + + def commit_txn(self, txn_id): + commit_req = CommitTxnRequest() + commit_req.txnid = txn_id + return self.hms_client.commit_txn(commit_req) + + def abort_txn(self, txn_id): + abort_req = AbortTxnRequest() + abort_req.txnid = txn_id + return self.hms_client.abort_txn(abort_req) + + def truncate_table_req(self, db_name, table_name): + truncate_req = TruncateTableRequest() + truncate_req.dbName = db_name + truncate_req.tableName = table_name + return self.hms_client.truncate_table_req(truncate_req) + + def commit_all_open_txns(self): + open_txns_resp = self.get_open_txns() + min_open = open_txns_resp.min_open_txn + for txn in open_txns_resp.open_txns: + if txn >= min_open: + try: + self.commit_txn(txn) + except Exception as e: + print str(e)
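
A usage sketch for the AcidTxn facade above (illustrative only, not part of the patch): it assumes the Hive Metastore defaults hard-coded in acid_txn.py (localhost:9083) and a hypothetical transactional table 'default.streaming'; the commit sequence mirrors _commit_one() in test_acid_row_validation.py.

    from tests.util.acid_txn import AcidTxn

    at = AcidTxn()
    # Open a transaction, allocate a write id for the table and commit the transaction,
    # so rows written with that write id become visible to Impala after a REFRESH.
    txn_id = at.open_txns()
    write_id = at.allocate_table_write_ids(txn_id, 'default', 'streaming')
    at.commit_txn(txn_id)
    # Fetch the table's current valid write id list from HMS.
    print at.get_valid_write_ids('default', 'streaming')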