diff --git a/be/src/exec/CMakeLists.txt b/be/src/exec/CMakeLists.txt
index c4440c1ba..bfd7948db 100644
--- a/be/src/exec/CMakeLists.txt
+++ b/be/src/exec/CMakeLists.txt
@@ -26,6 +26,7 @@ set(LIBRARY_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
set(EXECUTABLE_OUTPUT_PATH "${BUILD_OUTPUT_ROOT_DIRECTORY}/exec")
add_library(Exec
+ acid-metadata-utils.cc
aggregation-node.cc
aggregation-node-base.cc
aggregator.cc
@@ -109,6 +110,7 @@ add_library(Exec
add_dependencies(Exec gen-deps)
add_library(ExecTests STATIC
+ acid-metadata-utils-test.cc
delimited-text-parser-test.cc
hash-table-test.cc
hdfs-avro-scanner-test.cc
@@ -118,6 +120,7 @@ add_library(ExecTests STATIC
)
add_dependencies(ExecTests gen-deps)
+ADD_UNIFIED_BE_LSAN_TEST(acid-metadata-utils-test ValidWriteIdListTest.*)
ADD_UNIFIED_BE_LSAN_TEST(zigzag-test ZigzagTest.*)
ADD_UNIFIED_BE_LSAN_TEST(hash-table-test HashTableTest.*)
ADD_UNIFIED_BE_LSAN_TEST(delimited-text-parser-test DelimitedTextParser.*)
diff --git a/be/src/exec/acid-metadata-utils-test.cc b/be/src/exec/acid-metadata-utils-test.cc
new file mode 100644
index 000000000..7db3e572a
--- /dev/null
+++ b/be/src/exec/acid-metadata-utils-test.cc
@@ -0,0 +1,209 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include <limits>
+
+#include "testutil/gtest-util.h"
+#include "exec/acid-metadata-utils.h"
+
+#include "gen-cpp/CatalogObjects_types.h"
+
+#include "common/names.h"
+
+using namespace impala;
+
+namespace {
+
+TValidWriteIdList MakeTValidWriteIdList(int64_t high_watermark,
+ const std::vector<int64_t>& invalid_write_ids = {},
+ const std::vector<int32_t>& aborted_indexes = {}) {
+ TValidWriteIdList ret;
+ ret.__set_high_watermark(high_watermark);
+ ret.__set_invalid_write_ids(invalid_write_ids);
+ ret.__set_aborted_indexes(aborted_indexes);
+ return ret;
+}
+
+} // anonymous namespace
+
+TEST(ValidWriteIdListTest, IsWriteIdValid) {
+ ValidWriteIdList write_id_list;
+ // Everything is valid by default.
+ EXPECT_TRUE(write_id_list.IsWriteIdValid(1));
+ EXPECT_TRUE(write_id_list.IsWriteIdValid(1000));
+ EXPECT_TRUE(write_id_list.IsWriteIdValid(std::numeric_limits<int64_t>::max()));
+
+ // Write ids <= 10 are valid
+ write_id_list.InitFrom(MakeTValidWriteIdList(10));
+ for (int i = 1; i <= 10; ++i) {
+ EXPECT_TRUE(write_id_list.IsWriteIdValid(i));
+ }
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(11));
+
+ // Test open write ids
+ write_id_list.InitFrom(MakeTValidWriteIdList(10, {1, 3, 5, 7, 9}));
+ for (int i = 1; i <= 10; ++i) {
+ if (i % 2 == 0) {
+ EXPECT_TRUE(write_id_list.IsWriteIdValid(i));
+ } else {
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(i));
+ }
+ }
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(11));
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(12));
+
+ // Test aborted write ids
+ write_id_list.InitFrom(MakeTValidWriteIdList(10, {1, 3, 5, 7, 9}, {0, 1, 2, 3, 4}));
+ for (int i = 1; i <= 10; ++i) {
+ if (i % 2 == 0) {
+ EXPECT_TRUE(write_id_list.IsWriteIdValid(i));
+ } else {
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(i));
+ }
+ }
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(11));
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(12));
+
+ // Test open and aborted write ids
+ write_id_list.InitFrom(MakeTValidWriteIdList(10, {1, 3, 5, 7, 9}, {3, 4}));
+ for (int i = 1; i <= 10; ++i) {
+ if (i % 2 == 0) {
+ EXPECT_TRUE(write_id_list.IsWriteIdValid(i));
+ } else {
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(i));
+ }
+ }
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(11));
+ EXPECT_FALSE(write_id_list.IsWriteIdValid(12));
+}
+
+TEST(ValidWriteIdListTest, IsWriteIdRangeValid) {
+ ValidWriteIdList write_id_list;
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(1, 2));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(2, 200));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(100, 100000));
+
+ write_id_list.InitFrom(
+ MakeTValidWriteIdList(110, {1,2,3,5,7,9,11,31,50,55,90,97,98,99}));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsWriteIdRangeValid(1, 2));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(2, 200));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(100, 100000));
+
+ write_id_list.InitFrom(MakeTValidWriteIdList(1000000,
+ {1,2,3,5,7,9,11,31,50,55,90,97,98,99,110,1000,1100,2000,10000,150000,550000,90000,
+ 100000,110000,111000,222000,333000,444000,555000,666000,777000,888000,999000,999999}
+ ));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsWriteIdRangeValid(1, 2));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(2, 200));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(100, 100000));
+
+ write_id_list.InitFrom(
+ MakeTValidWriteIdList(1000000, {555000,666000,777000,888000,999000,999999}));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(1, 2));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(2, 200));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(100, 100000));
+
+ write_id_list.InitFrom(MakeTValidWriteIdList(1000, {500,600,700,800,900}));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsWriteIdRangeValid(1100, 2000));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(900, 1100));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(901, 1000));
+
+ write_id_list.InitFrom(MakeTValidWriteIdList(1000));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsWriteIdRangeValid(1100, 1200));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsWriteIdRangeValid(900, 1100));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsWriteIdRangeValid(90, 950));
+}
+
+TEST(ValidWriteIdListTest, IsFileRangeValid) {
+ ValidWriteIdList write_id_list;
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_1_2/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_1_2/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_5_5/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_5_5/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_100_1100/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_100_1100/0000"));
+
+ write_id_list.InitFrom(
+ MakeTValidWriteIdList(110, {1,2,3,5,7,9,11,31,50,55,90,97,98,99}));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_1_2/0000"));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_1_2/0000"));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_2_2/0000"));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_1_1/0000"));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_2_200/0000"));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_2_200/0000"));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_100_100000/0000"));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_100_100000/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_100_100/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_100_100/0000"));
+
+ write_id_list.InitFrom(MakeTValidWriteIdList(1000, {500,600,700,800,900}));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_1100_2000/0000"));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_1100_2000/0000"));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_800_800/0000"));
+ EXPECT_EQ(ValidWriteIdList::NONE, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_500_500/0000"));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_900_1100/0000"));
+ EXPECT_EQ(ValidWriteIdList::SOME, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_900_1100/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_901_1000/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_901_1000/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delta_901_901/0000"));
+ EXPECT_EQ(ValidWriteIdList::ALL, write_id_list.IsFileRangeValid(
+ "/foo/bar/delete_delta_901_901/0000"));
+}
+
+TEST(ValidWriteIdListTest, IsCompacted) {
+ EXPECT_TRUE(ValidWriteIdList::IsCompacted("/foo/delta_00005_00010_v123/000"));
+ EXPECT_TRUE(ValidWriteIdList::IsCompacted("/foo/delete_delta_00001_00010_v123/000"));
+ EXPECT_TRUE(ValidWriteIdList::IsCompacted("/foo/base_00001_v0123/000"));
+
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_1_1/0000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_1_2/0000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_5_10/0000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_05_09/0000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delta_0000010_0000030/0000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delete_delta_1_2/0000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delete_delta_5_10/0000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delete_delta_05_09/0000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/delete_delta_000010_000030/000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/base_10/000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/000"));
+ EXPECT_FALSE(ValidWriteIdList::IsCompacted("/foo/p=1/000"));
+}
diff --git a/be/src/exec/acid-metadata-utils.cc b/be/src/exec/acid-metadata-utils.cc
new file mode 100644
index 000000000..fe4691cd0
--- /dev/null
+++ b/be/src/exec/acid-metadata-utils.cc
@@ -0,0 +1,124 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#include "exec/acid-metadata-utils.h"
+
+#include "common/logging.h"
+#include "gen-cpp/CatalogObjects_types.h"
+
+#include "common/names.h"
+
+namespace impala {
+
+/// Unnamed namespace for auxiliary functions.
+namespace {
+
+const string BASE_PREFIX = "base_";
+const string DELTA_PREFIX = "delta_";
+const string DELETE_DELTA_PREFIX = "delete_delta_";
+
+string GetParentDirName(const string& filepath) {
+ int slash_before_file = filepath.rfind('/');
+ if (slash_before_file <= 0) return "";
+ int slash_before_dirname = filepath.rfind('/', slash_before_file - 1);
+ if (slash_before_dirname <= 0) return "";
+ return filepath.substr(
+ slash_before_dirname + 1, slash_before_file - slash_before_dirname - 1);
+}
+
+inline bool StrStartsWith(const string& str, const string& prefix) {
+ return str.rfind(prefix, 0) == 0;
+}
+
+std::pair<int64_t, int64_t> GetWriteIdRangeOfDeltaDir(const string& delta_dir) {
+ int min_write_id_pos = 0;
+ if (StrStartsWith(delta_dir, DELTA_PREFIX)) {
+ min_write_id_pos = DELTA_PREFIX.size();
+ } else if (StrStartsWith(delta_dir, DELETE_DELTA_PREFIX)) {
+ min_write_id_pos = DELETE_DELTA_PREFIX.size();
+ } else {
+ DCHECK(false) << delta_dir + " is not a delta directory";
+ }
+ int max_write_id_pos = delta_dir.find('_', min_write_id_pos) + 1;
+ return {std::atoll(delta_dir.c_str() + min_write_id_pos),
+ std::atoll(delta_dir.c_str() + max_write_id_pos)};
+}
+
+} // unnamed namespace
+
+ValidWriteIdList::ValidWriteIdList(const TValidWriteIdList& valid_write_ids) {
+ InitFrom(valid_write_ids);
+}
+
+void ValidWriteIdList::InitFrom(const TValidWriteIdList& valid_write_ids) {
+ if (valid_write_ids.__isset.high_watermark) {
+ high_water_mark_ = valid_write_ids.high_watermark;
+ } else {
+ high_water_mark_ = std::numeric_limits<int64_t>::max();
+ }
+ invalid_write_ids_.clear();
+ for (int64_t invalid_write_id : valid_write_ids.invalid_write_ids) {
+ invalid_write_ids_.insert(invalid_write_id);
+ }
+}
+
+bool ValidWriteIdList::IsWriteIdValid(int64_t write_id) const {
+ if (write_id > high_water_mark_) {
+ return false;
+ }
+ return invalid_write_ids_.find(write_id) == invalid_write_ids_.end();
+}
+
+ValidWriteIdList::RangeResponse ValidWriteIdList::IsWriteIdRangeValid(
+ int64_t min_write_id, int64_t max_write_id) const {
+ if (max_write_id <= high_water_mark_ && invalid_write_ids_.empty()) return ALL;
+
+ bool found_valid = false;
+ bool found_invalid = false;
+ for (int64_t i = min_write_id; i <= max_write_id; ++i) {
+ if (IsWriteIdValid(i)) {
+ found_valid = true;
+ } else {
+ found_invalid = true;
+ }
+ if (found_valid && found_invalid) return SOME;
+ }
+ if (found_invalid) return NONE;
+ return ALL;
+}
+
+ValidWriteIdList::RangeResponse ValidWriteIdList::IsFileRangeValid(
+ const std::string& file_path) const {
+ string dir_name = GetParentDirName(file_path);
+ if (!(StrStartsWith(dir_name, DELTA_PREFIX) ||
+ StrStartsWith(dir_name, DELETE_DELTA_PREFIX))) {
+ return ALL;
+ }
+ std::pair<int64_t, int64_t> write_id_range = GetWriteIdRangeOfDeltaDir(dir_name);
+ return IsWriteIdRangeValid(write_id_range.first, write_id_range.second);
+}
+
+bool ValidWriteIdList::IsCompacted(const std::string& file_path) {
+ string dir_name = GetParentDirName(file_path);
+ if (!StrStartsWith(dir_name, BASE_PREFIX) &&
+ !StrStartsWith(dir_name, DELTA_PREFIX) &&
+ !StrStartsWith(dir_name, DELETE_DELTA_PREFIX)) return false;
+ return dir_name.find("_v") != string::npos;
+}
+
+} // namespace impala
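
For reference, a minimal stand-alone sketch of the delta-directory naming convention that GetWriteIdRangeOfDeltaDir() above relies on. ParseDeltaRange below is a hypothetical helper written for illustration only (it is not part of this patch) and assumes the Hive layout delta_<minWriteId>_<maxWriteId>[_<statementId>|_v<visibilityTxnId>]:

    #include <cstdint>
    #include <cstdlib>
    #include <string>
    #include <utility>

    // Hypothetical helper mirroring GetWriteIdRangeOfDeltaDir() above: extracts the
    // [min, max] write id range encoded in a (delete) delta directory name such as
    // "delta_0000005_0000010_v123" or "delete_delta_1_2".
    std::pair<int64_t, int64_t> ParseDeltaRange(const std::string& dir) {
      static const std::string kDeltaPrefix = "delta_";
      static const std::string kDeleteDeltaPrefix = "delete_delta_";
      size_t pos;
      if (dir.compare(0, kDeleteDeltaPrefix.size(), kDeleteDeltaPrefix) == 0) {
        pos = kDeleteDeltaPrefix.size();
      } else if (dir.compare(0, kDeltaPrefix.size(), kDeltaPrefix) == 0) {
        pos = kDeltaPrefix.size();
      } else {
        return {0, 0};  // Not a delta directory.
      }
      int64_t min_id = std::atoll(dir.c_str() + pos);
      size_t underscore = dir.find('_', pos);
      int64_t max_id = std::atoll(dir.c_str() + underscore + 1);
      return {min_id, max_id};
    }

For example, ParseDeltaRange("delta_0000005_0000010_v123") returns {5, 10}, the range that IsFileRangeValid() above checks against the valid write id list.
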
diff --git a/be/src/exec/acid-metadata-utils.h b/be/src/exec/acid-metadata-utils.h
new file mode 100644
index 000000000..2bd5b283e
--- /dev/null
+++ b/be/src/exec/acid-metadata-utils.h
@@ -0,0 +1,50 @@
+// Licensed to the Apache Software Foundation (ASF) under one
+// or more contributor license agreements. See the NOTICE file
+// distributed with this work for additional information
+// regarding copyright ownership. The ASF licenses this file
+// to you under the Apache License, Version 2.0 (the
+// "License"); you may not use this file except in compliance
+// with the License. You may obtain a copy of the License at
+//
+// http://www.apache.org/licenses/LICENSE-2.0
+//
+// Unless required by applicable law or agreed to in writing,
+// software distributed under the License is distributed on an
+// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+// KIND, either express or implied. See the License for the
+// specific language governing permissions and limitations
+// under the License.
+
+#pragma once
+
+#include <limits>
+#include <string>
+#include <unordered_set>
+
+namespace impala {
+
+class TValidWriteIdList;
+
+class ValidWriteIdList {
+public:
+ enum RangeResponse {
+ NONE, SOME, ALL
+ };
+
+ ValidWriteIdList() {}
+ ValidWriteIdList(const TValidWriteIdList& valid_write_ids);
+
+ void InitFrom(const TValidWriteIdList& valid_write_ids);
+
+ bool IsWriteIdValid(int64_t write_id) const;
+ RangeResponse IsWriteIdRangeValid(int64_t min_write_id, int64_t max_write_id) const;
+ RangeResponse IsFileRangeValid(const std::string& file_path) const;
+
+ static bool IsCompacted(const std::string& file_path);
+private:
+ void AddInvalidWriteIds(const std::string& invalid_ids_str);
+ int64_t high_water_mark_ = std::numeric_limits<int64_t>::max();
+ std::unordered_set<int64_t> invalid_write_ids_;
+};
+
+}
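
A minimal usage sketch of the new class, assuming a TValidWriteIdList obtained from the table descriptor; the values and the ExampleUsage() wrapper are illustrative only:

    #include "exec/acid-metadata-utils.h"
    #include "gen-cpp/CatalogObjects_types.h"

    using impala::TValidWriteIdList;
    using impala::ValidWriteIdList;

    // Illustrative values only; in the scanner the Thrift struct comes from
    // HdfsTableDescriptor::ValidWriteIdList().
    void ExampleUsage() {
      TValidWriteIdList twrite_ids;
      twrite_ids.__set_high_watermark(10);
      twrite_ids.__set_invalid_write_ids({7, 9});  // Write ids 7 and 9 are open/aborted.

      ValidWriteIdList write_ids(twrite_ids);
      bool valid = write_ids.IsWriteIdValid(5);   // true: committed, below the watermark
      valid = write_ids.IsWriteIdValid(9);        // false: listed as invalid
      ValidWriteIdList::RangeResponse r1 =
          write_ids.IsWriteIdRangeValid(1, 6);    // ALL: no invalid id falls in [1, 6]
      ValidWriteIdList::RangeResponse r2 = write_ids.IsFileRangeValid(
          "/warehouse/t/delta_5_10/bucket_00000");  // SOME: 7 and 9 fall in [5, 10]
      (void)valid; (void)r1; (void)r2;
    }
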
diff --git a/be/src/exec/hdfs-orc-scanner.cc b/be/src/exec/hdfs-orc-scanner.cc
index 81df06613..f4d4a2832 100644
--- a/be/src/exec/hdfs-orc-scanner.cc
+++ b/be/src/exec/hdfs-orc-scanner.cc
@@ -41,8 +41,6 @@ using namespace impala::io;
DEFINE_bool(enable_orc_scanner, true,
"If false, reading from ORC format tables is not supported");
-const string HIVE_ACID_VERSION_KEY = "hive.acid.version";
-
Status HdfsOrcScanner::IssueInitialRanges(HdfsScanNodeBase* scan_node,
const vector<HdfsFileDesc*>& files) {
DCHECK(!files.empty());
@@ -205,6 +203,19 @@ Status HdfsOrcScanner::Open(ScannerContext* context) {
&reader_->getType(), filename(), is_table_full_acid, is_file_full_acid));
RETURN_IF_ERROR(schema_resolver_->ValidateFullAcidFileSchema());
+ // Hive Streaming Ingestion allocates multiple write ids, hence it creates delta
+ // directories like delta_5_10. It then continuously appends new stripes (and footers)
+ // to the ORC files of the delta dir. So it is possible that the file contains rows
+ // that Impala is not allowed to see based on its valid write id list. In such cases
+ // we need to validate the write ids of the row batches.
+ if (is_table_full_acid && !ValidWriteIdList::IsCompacted(filename())) {
+ valid_write_ids_.InitFrom(scan_node_->hdfs_table()->ValidWriteIdList());
+ ValidWriteIdList::RangeResponse rows_valid =
+ valid_write_ids_.IsFileRangeValid(filename());
+ DCHECK_NE(rows_valid, ValidWriteIdList::NONE);
+ row_batches_need_validation_ = rows_valid == ValidWriteIdList::SOME;
+ }
+
// Update 'row_reader_options_' based on the tuple descriptor so the ORC lib can skip
// columns we don't need.
RETURN_IF_ERROR(SelectColumns(*scan_node_->tuple_desc()));
@@ -445,6 +456,7 @@ Status HdfsOrcScanner::SelectColumns(const TupleDescriptor& tuple_desc) {
RETURN_IF_ERROR(ResolveColumns(tuple_desc, &selected_nodes, &pos_slots));
for (auto t : selected_nodes) selected_type_ids_.push_back(t->getColumnId());
+
// Select columns for array positions. Due to ORC-450 we can't materialize array
// offsets without materializing its items, so we should still select the item or any
// sub column of the item. To be simple, we choose the max column id in the subtree
@@ -469,6 +481,17 @@ Status HdfsOrcScanner::SelectColumns(const TupleDescriptor& tuple_desc) {
selected_nodes.push_back(array_node);
}
+ // Select "CurrentTransaction" when we need to validate rows.
+ if (row_batches_need_validation_) {
+ // In case of zero-slot scans (e.g. count(*) over the table) we only select the
+ // 'currentTransaction' column.
+ if (scan_node_->IsZeroSlotTableScan()) selected_type_ids_.clear();
+ if (std::find(selected_type_ids_.begin(), selected_type_ids_.end(),
+ CURRENT_TRANSCACTION_TYPE_ID) == selected_type_ids_.end()) {
+ selected_type_ids_.push_back(CURRENT_TRANSCACTION_TYPE_ID);
+ }
+ }
+
COUNTER_SET(num_cols_counter_, static_cast<int64_t>(selected_type_ids_.size()));
row_reader_options_.includeTypes(selected_type_ids_);
return Status::OK();
@@ -494,7 +517,10 @@ Status HdfsOrcScanner::ProcessSplit() {
}
Status HdfsOrcScanner::GetNextInternal(RowBatch* row_batch) {
- if (scan_node_->IsZeroSlotTableScan()) {
+ // If 'row_batches_need_validation_' is true, we need to look at the row batches and
+ // check their validity. For zero-slot scans 'currentTransaction' is then the only
+ // field selected from the file.
+ if (scan_node_->IsZeroSlotTableScan() && !row_batches_need_validation_) {
uint64_t file_rows = reader_->getNumberOfRows();
// There are no materialized slots, e.g. count(*) over the table. We can serve
// this query from just the file metadata. We don't need to read the column data.
@@ -741,7 +767,7 @@ Status HdfsOrcScanner::AllocateTupleMem(RowBatch* row_batch) {
Status HdfsOrcScanner::AssembleCollection(
const OrcComplexColumnReader& complex_col_reader, int row_idx,
CollectionValueBuilder* coll_value_builder) {
- int total_tuples = complex_col_reader.GetNumTuples(row_idx);
+ int total_tuples = complex_col_reader.GetNumChildValues(row_idx);
if (!complex_col_reader.MaterializeTuple()) {
// 'complex_col_reader' maps to a STRUCT or collection column of STRUCTs/collections
// and there're no need to materialize current level tuples. Delegate the
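
To summarize the scanner-side decision added above, here is a hedged sketch of the logic in HdfsOrcScanner::Open(); NeedsRowBatchValidation() is a hypothetical helper, not part of the patch:

    #include <string>

    #include "exec/acid-metadata-utils.h"

    // Hypothetical helper summarizing the checks in HdfsOrcScanner::Open():
    //  - non-ACID tables and compacted files (base_N_vM / delta_X_Y_vM) never need
    //    row-level validation;
    //  - a file whose write id range is only partially valid (possible with Hive
    //    Streaming Ingestion, which keeps appending stripes to delta_X_Y) must have
    //    the 'currentTransaction' value of its row batches checked.
    bool NeedsRowBatchValidation(const impala::ValidWriteIdList& write_ids,
        const std::string& file_path, bool is_table_full_acid) {
      if (!is_table_full_acid) return false;
      if (impala::ValidWriteIdList::IsCompacted(file_path)) return false;
      return write_ids.IsFileRangeValid(file_path) == impala::ValidWriteIdList::SOME;
    }
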
diff --git a/be/src/exec/hdfs-orc-scanner.h b/be/src/exec/hdfs-orc-scanner.h
index 728faee80..a95a13fe9 100644
--- a/be/src/exec/hdfs-orc-scanner.h
+++ b/be/src/exec/hdfs-orc-scanner.h
@@ -26,6 +26,7 @@
#include "runtime/exec-env.h"
#include "runtime/io/disk-io-mgr.h"
#include "runtime/runtime-state.h"
+#include "exec/acid-metadata-utils.h"
#include "exec/hdfs-columnar-scanner.h"
#include "exec/hdfs-scan-node.h"
#include "exec/orc-metadata-utils.h"
@@ -216,6 +217,16 @@ class HdfsOrcScanner : public HdfsColumnarScanner {
/// Scan range for the metadata (file tail).
const io::ScanRange* metadata_range_ = nullptr;
+ /// Valid write id list of the ACID table. Used to check the validity of write ids.
+ ValidWriteIdList valid_write_ids_;
+
+ /// True if we need to validate the row batches against the valid write id list. This
+ /// only needs to be done for Hive Streaming Ingestion. The 'write id' will be the same
+ /// within a stripe, but we still need to read the row batches for validation because
+ /// there are no statistics written.
+ /// For files not written by Streaming Ingestion we can assume that every row is valid.
+ bool row_batches_need_validation_ = false;
+
/// Timer for materializing rows. This ignores time getting the next buffer.
ScopedTimer<MonotonicStopWatch> assemble_rows_timer_;
diff --git a/be/src/exec/orc-column-readers.cc b/be/src/exec/orc-column-readers.cc
index 72fe3f4a9..331f42442 100644
--- a/be/src/exec/orc-column-readers.cc
+++ b/be/src/exec/orc-column-readers.cc
@@ -34,6 +34,13 @@ string PrintNode(const orc::Type* node) {
return Substitute("$0 column (ORC id=$1)", node->toString(), node->getColumnId());
}
+bool OrcRowValidator::IsRowBatchValid() const {
+ if (write_ids_ == nullptr || write_ids_->numElements == 0) return true;
+
+ int64_t write_id = write_ids_->data[0];
+ return valid_write_ids_.IsWriteIdValid(write_id);
+}
+
OrcColumnReader* OrcColumnReader::Create(const orc::Type* node,
const SlotDescriptor* slot_desc, HdfsOrcScanner* scanner) {
DCHECK(node != nullptr);
@@ -323,6 +330,9 @@ void OrcStructReader::CreateChildForSlot(const orc::Type* curr_node,
OrcStructReader::OrcStructReader(const orc::Type* node,
const TupleDescriptor* table_tuple_desc, HdfsOrcScanner* scanner)
: OrcComplexColumnReader(node, table_tuple_desc, scanner) {
+ bool needs_row_validation = table_tuple_desc == scanner_->scan_node_->tuple_desc() &&
+ node->getColumnId() == 0 &&
+ scanner_->row_batches_need_validation_;
if (materialize_tuple_) {
for (SlotDescriptor* child_slot : tuple_desc_->slots()) {
// Skip partition columns and missed columns
@@ -337,11 +347,21 @@ OrcStructReader::OrcStructReader(const orc::Type* node,
// matches to a descendant of 'node'. Those tuples should be materialized by the
// corresponding descendant reader. So 'node' should have exactly one selected
// subtype: the child in the path to the target descendant.
- DCHECK_EQ(node->getSubtypeCount(), 1);
+ DCHECK_EQ(node->getSubtypeCount(), needs_row_validation ? 2 : 1);
+ int child_index = needs_row_validation ? 1 : 0;
OrcComplexColumnReader* child = OrcComplexColumnReader::CreateTopLevelReader(
- node->getSubtype(0), table_tuple_desc, scanner);
+ node->getSubtype(child_index), table_tuple_desc, scanner);
children_.push_back(child);
- children_fields_.push_back(0);
+ children_fields_.push_back(child_index);
+ }
+ if (needs_row_validation) {
+ row_validator_.reset(new OrcRowValidator(scanner_->valid_write_ids_));
+ for (int i = 0; i < node->getSubtypeCount(); ++i) {
+ if (node->getSubtype(i)->getColumnId() == CURRENT_TRANSCACTION_TYPE_ID) {
+ current_write_id_field_index_ = i;
+ break;
+ }
+ }
}
}
@@ -374,6 +394,12 @@ Status OrcStructReader::ReadValue(int row_idx, Tuple* tuple, MemPool* pool) {
Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
MemPool* pool) {
+ // Validate row batch if needed.
+ if (row_validator_) DCHECK(scanner_->row_batches_need_validation_);
+ if (row_validator_ && !row_validator_->IsRowBatchValid()) {
+ row_idx_ = NumElements();
+ return Status::OK();
+ }
// Saving the initial value of num_tuples because each child->ReadValueBatch() will
// update it.
int scratch_batch_idx = scratch_batch->num_tuples;
@@ -390,6 +416,14 @@ Status OrcStructReader::TopLevelReadValueBatch(ScratchTupleBatch* scratch_batch,
}
}
row_idx_ += scratch_batch->num_tuples - scratch_batch_idx;
+ if (row_validator_ && scanner_->scan_node_->IsZeroSlotTableScan()) {
+ DCHECK_EQ(1, batch_->fields.size()); // We should only select 'currentTransaction'.
+ DCHECK_EQ(scratch_batch_idx, scratch_batch->num_tuples);
+ int num_to_fake_read = std::min(scratch_batch->capacity - scratch_batch->num_tuples,
+ NumElements() - row_idx_);
+ scratch_batch->num_tuples += num_to_fake_read;
+ row_idx_ += num_to_fake_read;
+ }
return Status::OK();
}
@@ -419,6 +453,14 @@ Status OrcStructReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
for (int c = 0; c < size; ++c) {
RETURN_IF_ERROR(children_[c]->UpdateInputBatch(batch_->fields[children_fields_[c]]));
}
+ if (row_validator_) {
+ orc::ColumnVectorBatch* write_id_batch =
+ batch_->fields[current_write_id_field_index_];
+ DCHECK_EQ(static_cast<orc::LongVectorBatch*>(write_id_batch),
+ dynamic_cast<orc::LongVectorBatch*>(write_id_batch));
+ row_validator_->UpdateTransactionBatch(
+ static_cast<orc::LongVectorBatch*>(write_id_batch));
+ }
return Status::OK();
}
@@ -451,7 +493,7 @@ Status OrcCollectionReader::AssembleCollection(int row_idx, Tuple* tuple, MemPoo
int OrcListReader::NumElements() const {
if (DirectReader()) return batch_ != nullptr ? batch_->numElements : 0;
if (children_.empty()) {
- return batch_ != nullptr ? batch_->offsets[batch_->numElements] : 0;
+ return batch_ != nullptr ? batch_->offsets[batch_->numElements] : 0;
}
return children_[0]->NumElements();
}
@@ -575,7 +617,7 @@ Status OrcListReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
return Status::OK();
}
-int OrcListReader::GetNumTuples(int row_idx) const {
+int OrcListReader::GetNumChildValues(int row_idx) const {
if (IsNull(DCHECK_NOTNULL(batch_), row_idx)) return 0;
DCHECK_GT(batch_->offsets.size(), row_idx + 1);
return batch_->offsets[row_idx + 1] - batch_->offsets[row_idx];
@@ -688,7 +730,7 @@ Status OrcMapReader::UpdateInputBatch(orc::ColumnVectorBatch* orc_batch) {
return Status::OK();
}
-int OrcMapReader::GetNumTuples(int row_idx) const {
+int OrcMapReader::GetNumChildValues(int row_idx) const {
if (IsNull(batch_, row_idx)) return 0;
DCHECK_GT(batch_->offsets.size(), row_idx + 1);
return batch_->offsets[row_idx + 1] - batch_->offsets[row_idx];
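
The validator added above checks only the first 'currentTransaction' value of a batch, relying on the write id being constant across the rows it covers (per the scanner header comment, the write id is constant within a stripe). A minimal stand-alone sketch of that check, assuming the ORC C++ library's orc::LongVectorBatch; BatchIsVisible() is illustrative only:

    #include <cstdint>

    #include "exec/acid-metadata-utils.h"
    #include "orc/Vector.hh"  // orc::LongVectorBatch from the ORC C++ library.

    // Hypothetical stand-alone version of OrcRowValidator::IsRowBatchValid(): rows
    // backed by 'write_ids' are visible iff their shared write id is committed
    // according to 'valid_write_ids'. A null or empty batch is treated as valid.
    bool BatchIsVisible(const impala::ValidWriteIdList& valid_write_ids,
        orc::LongVectorBatch* write_ids) {
      if (write_ids == nullptr || write_ids->numElements == 0) return true;
      return valid_write_ids.IsWriteIdValid(write_ids->data[0]);
    }
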
diff --git a/be/src/exec/orc-column-readers.h b/be/src/exec/orc-column-readers.h
index 001b913e4..54f3a94d3 100644
--- a/be/src/exec/orc-column-readers.h
+++ b/be/src/exec/orc-column-readers.h
@@ -28,6 +28,21 @@ namespace impala {
class HdfsOrcScanner;
+class OrcRowValidator {
+ public:
+ OrcRowValidator(const ValidWriteIdList& valid_write_ids) :
+ valid_write_ids_(valid_write_ids) {}
+
+ void UpdateTransactionBatch(orc::LongVectorBatch* batch) {
+ write_ids_ = batch;
+ }
+
+ bool IsRowBatchValid() const;
+ private:
+ const ValidWriteIdList& valid_write_ids_;
+ orc::LongVectorBatch* write_ids_ = nullptr;
+};
+
/// Base class for reading an ORC column. Each column reader will keep track of an
/// orc::ColumnVectorBatch and transfer its values into Impala internals(tuples/slots).
///
@@ -486,7 +501,7 @@ class OrcComplexColumnReader : public OrcBatchedReader {
/// Num of tuples inside the 'row_idx'-th row. LIST/MAP types will have 0 to N tuples.
/// STRUCT type will always have one tuple.
- virtual int GetNumTuples(int row_idx) const = 0;
+ virtual int GetNumChildValues(int row_idx) const = 0;
/// Collection values (array items, map keys/values) are concatenated in the child's
/// batch. Get the start offset of values inside the 'row_idx'-th collection.
@@ -551,7 +566,7 @@ class OrcStructReader : public OrcComplexColumnReader {
/// Whether we've finished reading the current orc batch.
bool EndOfBatch();
- int GetNumTuples(int row_idx) const override { return 1; }
+ int GetNumChildValues(int row_idx) const final { return 1; }
int GetChildBatchOffset(int row_idx) const override { return row_idx; }
@@ -561,6 +576,7 @@ class OrcStructReader : public OrcComplexColumnReader {
OrcColumnReader* child = children()[0];
return child->NumElements();
}
+
private:
orc::StructVectorBatch* batch_ = nullptr;
@@ -570,6 +586,9 @@ class OrcStructReader : public OrcComplexColumnReader {
/// Keep row index if we're top level readers
int row_idx_;
+ int current_write_id_field_index_ = -1;
+ std::unique_ptr<OrcRowValidator> row_validator_;
+
void SetNullSlot(Tuple* tuple) override {
for (OrcColumnReader* child : children_) child->SetNullSlot(tuple);
}
@@ -580,6 +599,8 @@ class OrcStructReader : public OrcComplexColumnReader {
/// '*child' and its index inside the children. Returns false for not found.
inline bool FindChild(const orc::Type& curr_node, const SchemaPath& child_path,
const orc::Type** child, int* field);
+
+ Status ReadAndValidateRows(ScratchTupleBatch* scratch_batch, MemPool* pool);
};
class OrcCollectionReader : public OrcComplexColumnReader {
@@ -628,14 +649,14 @@ class OrcListReader : public OrcCollectionReader {
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT;
- int GetNumTuples(int row_idx) const override;
+ int GetNumChildValues(int row_idx) const final;
int GetChildBatchOffset(int row_idx) const override;
Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool)
const override WARN_UNUSED_RESULT;
- virtual int NumElements() const final;
+ int NumElements() const final;
private:
Status SetPositionSlot(int row_idx, Tuple* tuple);
@@ -662,14 +683,14 @@ class OrcMapReader : public OrcCollectionReader {
Status ReadValue(int row_idx, Tuple* tuple, MemPool* pool) final WARN_UNUSED_RESULT;
- int GetNumTuples(int row_idx) const override;
+ int GetNumChildValues(int row_idx) const final;
int GetChildBatchOffset(int row_idx) const override;
Status ReadChildrenValue(int row_idx, int tuple_idx, Tuple* tuple, MemPool* pool)
const override WARN_UNUSED_RESULT;
- virtual int NumElements() const final;
+ int NumElements() const final;
private:
orc::MapVectorBatch* batch_ = nullptr;
vector<OrcColumnReader*> key_readers_;
diff --git a/be/src/exec/orc-metadata-utils.cc b/be/src/exec/orc-metadata-utils.cc
index 03433dbea..dbb0d53ba 100644
--- a/be/src/exec/orc-metadata-utils.cc
+++ b/be/src/exec/orc-metadata-utils.cc
@@ -84,7 +84,7 @@ Status OrcSchemaResolver::ResolveColumn(const SchemaPath& col_path,
*node = root_;
*pos_field = false;
*missing_field = false;
- DCHECK(ValidateFullAcidFileSchema().ok()); // Should have already been validated.
+ DCHECK_OK(ValidateFullAcidFileSchema()); // Should have already been validated.
bool translate_acid_path = is_table_full_acid_ && is_file_full_acid_;
int num_part_cols = tbl_desc_.num_clustering_cols();
for (int i = 0; i < col_path.size(); ++i) {
diff --git a/be/src/exec/orc-metadata-utils.h b/be/src/exec/orc-metadata-utils.h
index 72ddc5426..50420925b 100644
--- a/be/src/exec/orc-metadata-utils.h
+++ b/be/src/exec/orc-metadata-utils.h
@@ -24,6 +24,12 @@
namespace impala {
+// Key of Hive ACID version in ORC metadata.
+const string HIVE_ACID_VERSION_KEY = "hive.acid.version";
+
+// ORC type id of column "currentTransaction" in full ACID ORC files.
+constexpr int CURRENT_TRANSCACTION_TYPE_ID = 5;
+
/// Util class to resolve SchemaPaths of TupleDescriptors/SlotDescriptors into orc::Type.
class OrcSchemaResolver {
public:
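
For context on CURRENT_TRANSCACTION_TYPE_ID: Hive full ACID ORC files wrap the user columns in a fixed synthetic struct, so the ORC type ids of the ACID metadata columns are stable. A sketch of that layout (the named constants are illustrative; only the value 5 is used by this patch):

    // Hive full ACID ORC schema (ORC type ids in parentheses); 'row' holds the user
    // columns, shown here only as a placeholder:
    //
    //   struct (0) <
    //     operation:           int    (1)
    //     originalTransaction: bigint (2)
    //     bucket:              int    (3)
    //     rowId:               bigint (4)
    //     currentTransaction:  bigint (5)   // == CURRENT_TRANSCACTION_TYPE_ID
    //     row:                 struct (6) < ...user columns... >
    //   >
    constexpr int kAcidOperationTypeId = 1;
    constexpr int kAcidOriginalTransactionTypeId = 2;
    constexpr int kAcidBucketTypeId = 3;
    constexpr int kAcidRowIdTypeId = 4;
    constexpr int kAcidCurrentTransactionTypeId = 5;  // Matches the constant above.
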
diff --git a/be/src/runtime/descriptors.cc b/be/src/runtime/descriptors.cc
index 3b9fe8441..b387f1630 100644
--- a/be/src/runtime/descriptors.cc
+++ b/be/src/runtime/descriptors.cc
@@ -236,6 +236,7 @@ HdfsTableDescriptor::HdfsTableDescriptor(const TTableDescriptor& tdesc, ObjectPo
tdesc.hdfsTable, tdesc.hdfsTable.prototype_partition));
avro_schema_ = tdesc.hdfsTable.__isset.avroSchema ? tdesc.hdfsTable.avroSchema : "";
is_full_acid_ = tdesc.hdfsTable.is_full_acid;
+ valid_write_id_list_ = tdesc.hdfsTable.valid_write_ids;
}
void HdfsTableDescriptor::ReleaseResources() {
diff --git a/be/src/runtime/descriptors.h b/be/src/runtime/descriptors.h
index 67a93fe7e..85e19a739 100644
--- a/be/src/runtime/descriptors.h
+++ b/be/src/runtime/descriptors.h
@@ -331,6 +331,8 @@ class HdfsTableDescriptor : public TableDescriptor {
bool IsTableFullAcid() const { return is_full_acid_; }
+ const TValidWriteIdList& ValidWriteIdList() const { return valid_write_id_list_; }
+
virtual std::string DebugString() const;
protected:
@@ -343,6 +345,7 @@ class HdfsTableDescriptor : public TableDescriptor {
/// Set to the table's Avro schema if this is an Avro table, empty string otherwise
std::string avro_schema_;
bool is_full_acid_;
+ TValidWriteIdList valid_write_id_list_;
};
class HBaseTableDescriptor : public TableDescriptor {
diff --git a/common/thrift/CatalogObjects.thrift b/common/thrift/CatalogObjects.thrift
index 192b0cfb1..f8ddf63b3 100644
--- a/common/thrift/CatalogObjects.thrift
+++ b/common/thrift/CatalogObjects.thrift
@@ -331,6 +331,21 @@ struct THdfsPartition {
// Must be < 0 to avoid collisions
const i64 PROTOTYPE_PARTITION_ID = -1;
+// Thrift representation of a Hive ACID valid write id list.
+struct TValidWriteIdList {
+ // Every write id greater than 'high_watermark' is invalid.
+ 1: optional i64 high_watermark
+
+ // The smallest open write id.
+ 2: optional i64 min_open_write_id
+
+ // Open or aborted write ids.
+ 3: optional list<i64> invalid_write_ids
+
+ // Indexes of the aborted write ids in 'invalid_write_ids'. The write ids whose
+ // indexes are not present here are open.
+ 4: optional list<i32> aborted_indexes
+}
struct THdfsTable {
// ============================================================
@@ -381,6 +396,9 @@ struct THdfsTable {
// True if the table is in Hive Full ACID format.
12: optional bool is_full_acid = false
+
+ // Set iff this is an acid table. The valid write ids list.
+ 13: optional TValidWriteIdList valid_write_ids
}
struct THBaseTable {
@@ -501,12 +519,6 @@ struct TTable {
// Set iff this a kudu table
13: optional TKuduTable kudu_table
- // Set iff this is an acid table. The valid write ids list.
- // The string is assumed to be created by ValidWriteIdList.writeToString
- // For example ValidReaderWriteIdList object's format is:
- // <table_name>:<highwatermark>:<minOpenWriteId>:<open_writeids>:<abort_writeids>
- 14: optional string valid_write_ids
-
// Set if this table needs storage access during metadata load.
// Time used for storage loading in nanoseconds.
15: optional i64 storage_metadata_load_time_ns
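
A short sketch of how the TValidWriteIdList encoding above can be interpreted: 'invalid_write_ids' lists every non-committed write id, and 'aborted_indexes' marks which of those are aborted (the rest are still open). The Decode() helper and its types are hypothetical, for illustration only:

    #include <cstdint>
    #include <unordered_set>
    #include <vector>

    // 'invalid_write_ids' holds every write id that is not committed; the ids whose
    // positions appear in 'aborted_indexes' were aborted, the rest are still open.
    struct DecodedWriteIds {
      std::unordered_set<int64_t> open;
      std::unordered_set<int64_t> aborted;
    };

    DecodedWriteIds Decode(const std::vector<int64_t>& invalid_write_ids,
        const std::vector<int32_t>& aborted_indexes) {
      DecodedWriteIds result;
      std::unordered_set<int32_t> aborted_pos(aborted_indexes.begin(),
          aborted_indexes.end());
      for (int32_t i = 0; i < static_cast<int32_t>(invalid_write_ids.size()); ++i) {
        if (aborted_pos.count(i) > 0) {
          result.aborted.insert(invalid_write_ids[i]);
        } else {
          result.open.insert(invalid_write_ids[i]);
        }
      }
      return result;
    }
    // E.g. invalid_write_ids = {7, 9, 12} with aborted_indexes = {1} means write id 9
    // was aborted while 7 and 12 are still open; all three are invisible to readers.
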
diff --git a/common/thrift/CatalogService.thrift b/common/thrift/CatalogService.thrift
index 3b444060a..662325a06 100644
--- a/common/thrift/CatalogService.thrift
+++ b/common/thrift/CatalogService.thrift
@@ -384,6 +384,9 @@ struct TPartialTableInfo {
// SqlConstraints for the table, small enough that we can
// return them wholesale.
8: optional SqlConstraints.TSqlConstraints sql_constraints
+
+ // Valid write id list of ACID table.
+ 9: optional CatalogObjects.TValidWriteIdList valid_write_ids;
}
// Selector for partial information about a Database.
diff --git a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
index 2091fbc13..4ae340966 100644
--- a/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
+++ b/fe/src/compat-hive-3/java/org/apache/impala/compat/MetastoreShim.java
@@ -35,6 +35,7 @@ import java.util.BitSet;
import java.util.Collections;
import java.util.EnumSet;
import java.util.List;
+import java.util.stream.Collectors;
import org.apache.hadoop.hive.common.StatsSetupConst;
import org.apache.hadoop.hive.common.ValidReaderWriteIdList;
@@ -98,6 +99,7 @@ import org.apache.impala.service.Frontend;
import org.apache.impala.service.MetadataOp;
import org.apache.impala.thrift.TMetadataOpRequest;
import org.apache.impala.thrift.TResultSet;
+import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AcidUtils.TblTransaction;
import org.apache.impala.util.MetaStoreUtil.InsertEventInfo;
@@ -597,6 +599,57 @@ public class MetastoreShim {
return new ValidReaderWriteIdList(validWriteIds);
}
+ /**
+ * Converts a TValidWriteIdList object to ValidWriteIdList.
+ * @param tableName the name of the table.
+ * @param validWriteIds the thrift object.
+ * @return ValidWriteIdList object
+ */
+ public static ValidWriteIdList getValidWriteIdListFromThrift(String tableName,
+ TValidWriteIdList validWriteIds) {
+ Preconditions.checkNotNull(validWriteIds);
+ BitSet abortedBits;
+ if (validWriteIds.getAborted_indexesSize() > 0) {
+ abortedBits = new BitSet(validWriteIds.getInvalid_write_idsSize());
+ for (int aborted_index : validWriteIds.getAborted_indexes()) {
+ abortedBits.set(aborted_index);
+ }
+ } else {
+ abortedBits = new BitSet();
+ }
+ long highWatermark = validWriteIds.isSetHigh_watermark() ?
+ validWriteIds.high_watermark : Long.MAX_VALUE;
+ long minOpenWriteId = validWriteIds.isSetMin_open_write_id() ?
+ validWriteIds.min_open_write_id : Long.MAX_VALUE;
+ return new ValidReaderWriteIdList(tableName,
+ validWriteIds.getInvalid_write_ids().stream().mapToLong(i -> i).toArray(),
+ abortedBits, highWatermark, minOpenWriteId);
+ }
+
+ /**
+ * Converts a ValidWriteIdList object to TValidWriteIdList.
+ */
+ public static TValidWriteIdList convertToTValidWriteIdList(
+ ValidWriteIdList validWriteIdList) {
+ Preconditions.checkNotNull(validWriteIdList);
+ TValidWriteIdList ret = new TValidWriteIdList();
+ long minOpenWriteId = validWriteIdList.getMinOpenWriteId() != null ?
+ validWriteIdList.getMinOpenWriteId() : Long.MAX_VALUE;
+ ret.setHigh_watermark(validWriteIdList.getHighWatermark());
+ ret.setMin_open_write_id(minOpenWriteId);
+ ret.setInvalid_write_ids(Arrays.stream(
+ validWriteIdList.getInvalidWriteIds()).boxed().collect(Collectors.toList()));
+ List<Integer> abortedIndexes = new ArrayList<>();
+ for (int i = 0; i < validWriteIdList.getInvalidWriteIds().length; ++i) {
+ long writeId = validWriteIdList.getInvalidWriteIds()[i];
+ if (validWriteIdList.isWriteIdAborted(writeId)) {
+ abortedIndexes.add(i);
+ }
+ }
+ ret.setAborted_indexes(abortedIndexes);
+ return ret;
+ }
+
/**
* Returns a ValidTxnList object that helps to identify in-progress and aborted
* transactions.
diff --git a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
index 42f9ae4c7..df7ccf9cb 100644
--- a/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/analysis/StmtMetadataLoader.java
@@ -257,7 +257,7 @@ public class StmtMetadataLoader {
if (AcidUtils.isTransactionalTable(iTbl.getMetaStoreTable().getParameters())) {
validIdsBuf.append("\n");
validIdsBuf.append(" ");
- validIdsBuf.append(iTbl.getValidWriteIds());
+ validIdsBuf.append(iTbl.getValidWriteIds().writeToString());
hasAcidTbls = true;
}
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
index 4bf6d68f7..df3063948 100644
--- a/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/DataSourceTable.java
@@ -20,6 +20,7 @@ package org.apache.impala.catalog;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.IMetaStoreClient;
import org.apache.hadoop.hive.metastore.api.FieldSchema;
import org.slf4j.Logger;
@@ -113,7 +114,7 @@ public class DataSourceTable extends Table implements FeDataSourceTable {
}
@Override
- public String getValidWriteIds() {
+ public ValidWriteIdList getValidWriteIds() {
return null;
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/FeTable.java b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
index b22e11d6f..0d4f3a171 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FeTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FeTable.java
@@ -20,6 +20,7 @@ import java.util.Comparator;
import java.util.List;
import java.util.Set;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.Table;
import org.apache.impala.analysis.TableName;
import org.apache.impala.thrift.TCatalogObjectType;
@@ -149,7 +150,7 @@ public interface FeTable {
/**
* @return the valid write id list for this table
*/
- String getValidWriteIds();
+ ValidWriteIdList getValidWriteIds();
/**
* @return the owner user for this table. If the table is not loaded or the owner is
diff --git a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
index 7bcd10f9a..632bb78cf 100644
--- a/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
+++ b/fe/src/main/java/org/apache/impala/catalog/FileMetadataLoader.java
@@ -264,10 +264,10 @@ public class FileMetadataLoader {
public int uncommittedAcidFilesSkipped = 0;
/**
- * Number of files skipped because they pertain to ACID directories superceded
- * by later base data.
+ * Number of files skipped because they pertain to ACID directories superseded
+ * by compaction or newer base.
*/
- public int filesSupercededByNewerBase = 0;
+ public int filesSupersededByAcidState = 0;
// Number of files for which the metadata was loaded.
public int loadedFiles = 0;
@@ -293,7 +293,7 @@ public class FileMetadataLoader {
.add("hidden files", nullIfZero(hiddenFiles))
.add("skipped files", nullIfZero(skippedFiles))
.add("uncommited files", nullIfZero(uncommittedAcidFilesSkipped))
- .add("superceded files", nullIfZero(filesSupercededByNewerBase))
+ .add("superceded files", nullIfZero(filesSupersededByAcidState))
.add("unknown diskIds", nullIfZero(unknownDiskIds))
.omitNullValues()
.toString();
diff --git a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
index 2928d9612..ce971b3c4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/HdfsTable.java
@@ -101,6 +101,7 @@ import com.codahale.metrics.Timer;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Joiner;
import com.google.common.base.Preconditions;
+import com.google.common.base.Stopwatch;
import com.google.common.collect.HashMultiset;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Iterables;
@@ -249,6 +250,10 @@ public class HdfsTable extends Table implements FeFsTable {
private SqlConstraints sqlConstraints_ = new SqlConstraints(new ArrayList<>(),
new ArrayList<>());
+ // Valid write id list for this table.
+ // null in the case that this table is not transactional.
+ protected ValidWriteIdList validWriteIds_ = null;
+
// Represents a set of storage-related statistics aggregated at the table or partition
// level.
public final static class FileMetadataStats {
@@ -624,15 +629,13 @@ public class HdfsTable extends Table implements FeFsTable {
.add(p);
}
- ValidWriteIdList writeIds = validWriteIds_ != null
- ? MetastoreShim.getValidWriteIdListFromString(validWriteIds_) : null;
//TODO: maybe it'd be better to load the valid txn list in the context of a
// transaction to have consistent valid write ids and valid transaction ids.
// Currently tables are loaded when they are first referenced and stay in catalog
// until certain actions occur (refresh, invalidate, insert, etc.). However,
// Impala doesn't notice when HMS's cleaner removes old transactional directories,
// which might lead to FileNotFound exceptions.
- ValidTxnList validTxnList = writeIds != null ? loadValidTxns(client) : null;
+ ValidTxnList validTxnList = validWriteIds_ != null ? loadValidTxns(client) : null;
// Create a FileMetadataLoader for each path.
Map<Path, FileMetadataLoader> loadersByPath = Maps.newHashMap();
@@ -640,7 +643,7 @@ public class HdfsTable extends Table implements FeFsTable {
List<FileDescriptor> oldFds = e.getValue().get(0).getFileDescriptors();
FileMetadataLoader loader = new FileMetadataLoader(e.getKey(),
Utils.shouldRecursivelyListPartitions(this), oldFds, hostIndex_, validTxnList,
- writeIds, e.getValue().get(0).getFileFormat());
+ validWriteIds_, e.getValue().get(0).getFileFormat());
// If there is a cached partition mapped to this path, we recompute the block
// locations even if the underlying files have not changed.
// This is done to keep the cached block metadata up to date.
@@ -1483,6 +1486,10 @@ public class HdfsTable extends Table implements FeFsTable {
avroSchema_ = hdfsTable.isSetAvroSchema() ? hdfsTable.getAvroSchema() : null;
isMarkedCached_ =
HdfsCachingUtil.validateCacheParams(getMetaStoreTable().getParameters());
+ if (hdfsTable.isSetValid_write_ids()) {
+ validWriteIds_ = MetastoreShim.getValidWriteIdListFromThrift(
+ getFullName(), hdfsTable.getValid_write_ids());
+ }
}
@Override
@@ -1644,6 +1651,10 @@ public class HdfsTable extends Table implements FeFsTable {
if (AcidUtils.isFullAcidTable(getMetaStoreTable().getParameters())) {
hdfsTable.setIs_full_acid(true);
}
+ if (validWriteIds_ != null) {
+ hdfsTable.setValid_write_ids(
+ MetastoreShim.convertToTValidWriteIdList(validWriteIds_));
+ }
return hdfsTable;
}
@@ -2044,4 +2055,49 @@ public class HdfsTable extends Table implements FeFsTable {
public boolean isPartitioned() {
return getMetaStoreTable().getPartitionKeysSize() > 0;
}
+
+ /**
+ * Get valid write ids for the acid table.
+ * @param client the client to access HMS
+ * @return the list of valid write IDs for the table
+ */
+ protected ValidWriteIdList fetchValidWriteIds(IMetaStoreClient client)
+ throws TableLoadingException {
+ String tblFullName = getFullName();
+ if (LOG.isTraceEnabled()) LOG.trace("Get valid writeIds for table: " + tblFullName);
+ ValidWriteIdList validWriteIds = null;
+ try {
+ validWriteIds = MetastoreShim.fetchValidWriteIds(client, tblFullName);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Valid writeIds: " + validWriteIds.writeToString());
+ }
+ return validWriteIds;
+ } catch (Exception e) {
+ throw new TableLoadingException(String.format("Error loading ValidWriteIds for " +
+ "table '%s'", getName()), e);
+ }
+ }
+
+ /**
+ * Sets the valid write id list ('validWriteIds_') by fetching it from HMS if the
+ * table is transactional; sets it to null otherwise.
+ * @param client the client to access HMS
+ */
+ protected void loadValidWriteIdList(IMetaStoreClient client)
+ throws TableLoadingException {
+ Stopwatch sw = Stopwatch.createStarted();
+ Preconditions.checkState(msTable_ != null && msTable_.getParameters() != null);
+ if (MetastoreShim.getMajorVersion() > 2 &&
+ AcidUtils.isTransactionalTable(msTable_.getParameters())) {
+ validWriteIds_ = fetchValidWriteIds(client);
+ } else {
+ validWriteIds_ = null;
+ }
+ LOG.debug("Load Valid Write Id List Done. Time taken: " +
+ PrintUtils.printTimeNs(sw.elapsed(TimeUnit.NANOSECONDS)));
+ }
+
+ @Override
+ public ValidWriteIdList getValidWriteIds() {
+ return validWriteIds_;
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/Table.java b/fe/src/main/java/org/apache/impala/catalog/Table.java
index 8f0f5ada8..faaeeedbd 100644
--- a/fe/src/main/java/org/apache/impala/catalog/Table.java
+++ b/fe/src/main/java/org/apache/impala/catalog/Table.java
@@ -40,9 +40,7 @@ import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.common.ImpalaRuntimeException;
import org.apache.impala.common.Metrics;
import org.apache.impala.common.Pair;
-import org.apache.impala.common.PrintUtils;
import org.apache.impala.common.RuntimeEnv;
-import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.MetadataOp;
import org.apache.impala.thrift.TAccessLevel;
import org.apache.impala.thrift.TCatalogObject;
@@ -62,7 +60,6 @@ import org.apache.log4j.Logger;
import com.codahale.metrics.Timer;
import com.google.common.base.Preconditions;
-import com.google.common.base.Stopwatch;
import com.google.common.collect.Lists;
/**
@@ -133,11 +130,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
// impalad.
protected long lastUsedTime_;
- // Valid write id list for this table.
- // null in the case that this table is not transactional.
- // TODO(todd) this should probably be a ValidWriteIdList in memory instead of a String.
- protected String validWriteIds_ = null;
-
// tracks the in-flight metastore events for this table. Used by Events processor to
// avoid unnecessary refresh when the event is received
private final InFlightEvents inFlightEvents = new InFlightEvents();
@@ -378,48 +370,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
}
}
- /**
- * Get valid write ids for the acid table.
- * @param client the client to access HMS
- * @return the list of valid write IDs for the table
- */
- protected String fetchValidWriteIds(IMetaStoreClient client)
- throws TableLoadingException {
- String tblFullName = getFullName();
- if (LOG.isTraceEnabled()) LOG.trace("Get valid writeIds for table: " + tblFullName);
- String writeIds = null;
- try {
- ValidWriteIdList validWriteIds = MetastoreShim.fetchValidWriteIds(client,
- tblFullName);
- writeIds = validWriteIds == null ? null : validWriteIds.writeToString();
- } catch (Exception e) {
- throw new TableLoadingException(String.format("Error loading ValidWriteIds for " +
- "table '%s'", getName()), e);
- }
- if (LOG.isTraceEnabled()) {
- LOG.trace("Valid writeIds: " + writeIds);
- }
- return writeIds;
- }
-
- /**
- * Set ValistWriteIdList with stored writeId
- * @param client the client to access HMS
- */
- protected void loadValidWriteIdList(IMetaStoreClient client)
- throws TableLoadingException {
- Stopwatch sw = Stopwatch.createStarted();
- Preconditions.checkState(msTable_ != null && msTable_.getParameters() != null);
- if (MetastoreShim.getMajorVersion() > 2 &&
- AcidUtils.isTransactionalTable(msTable_.getParameters())) {
- validWriteIds_ = fetchValidWriteIds(client);
- } else {
- validWriteIds_ = null;
- }
- LOG.debug("Load Valid Write Id List Done. Time taken: " +
- PrintUtils.printTimeNs(sw.elapsed(TimeUnit.NANOSECONDS)));
- }
-
/**
* Creates a table of the appropriate type based on the given hive.metastore.api.Table
* object.
@@ -502,8 +452,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
storageMetadataLoadTime_ = thriftTable.getStorage_metadata_load_time_ns();
storedInImpaladCatalogCache_ = true;
- validWriteIds_ = thriftTable.isSetValid_write_ids() ?
- thriftTable.getValid_write_ids() : null;
}
/**
@@ -554,9 +502,6 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
table.setMetastore_table(getMetaStoreTable());
table.setTable_stats(tableStats_);
- if (validWriteIds_ != null) {
- table.setValid_write_ids(validWriteIds_);
- }
return table;
}
@@ -632,7 +577,12 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
}
resp.table_info.setColumn_stats(statsList);
}
-
+ if (getMetaStoreTable() != null &&
+ AcidUtils.isTransactionalTable(getMetaStoreTable().getParameters())) {
+ Preconditions.checkState(getValidWriteIds() != null);
+ resp.table_info.setValid_write_ids(
+ MetastoreShim.convertToTValidWriteIdList(getValidWriteIds()));
+ }
return resp;
}
/**
@@ -845,7 +795,7 @@ public abstract class Table extends CatalogObjectImpl implements FeTable {
}
@Override
- public String getValidWriteIds() {
- return validWriteIds_;
+ public ValidWriteIdList getValidWriteIds() {
+ return null;
}
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
index 2c8ece434..c66ccbadc 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/CatalogdMetaProvider.java
@@ -81,6 +81,7 @@ import org.apache.impala.thrift.TUniqueId;
import org.apache.impala.thrift.TUnit;
import org.apache.impala.thrift.TUpdateCatalogCacheRequest;
import org.apache.impala.thrift.TUpdateCatalogCacheResponse;
+import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.TByteBuffer;
import org.apache.thrift.TDeserializer;
@@ -703,7 +704,8 @@ public class CatalogdMetaProvider implements MetaProvider {
new ArrayList<>() : resp.table_info.sql_constraints.getForeign_keys();
return new TableMetaRefImpl(
dbName, tableName, resp.table_info.hms_table, resp.object_version_number,
- new SqlConstraints(primaryKeys, foreignKeys));
+ new SqlConstraints(primaryKeys, foreignKeys),
+ resp.table_info.valid_write_ids);
}
});
return Pair.create(ref.msTable_, (TableMetaRef)ref);
@@ -1409,13 +1411,20 @@ public class CatalogdMetaProvider implements MetaProvider {
*/
private final long catalogVersion_;
+ /**
+ * Valid write id list of ACID tables.
+ */
+ private final TValidWriteIdList validWriteIds_;
+
public TableMetaRefImpl(String dbName, String tableName,
- Table msTable, long catalogVersion, SqlConstraints sqlConstraints) {
+ Table msTable, long catalogVersion, SqlConstraints sqlConstraints,
+ TValidWriteIdList validWriteIds) {
this.dbName_ = dbName;
this.tableName_ = tableName;
this.msTable_ = msTable;
this.catalogVersion_ = catalogVersion;
this.sqlConstraints_ = sqlConstraints;
+ this.validWriteIds_ = validWriteIds;
}
@Override
@@ -1640,4 +1649,11 @@ public class CatalogdMetaProvider implements MetaProvider {
return (int)size;
}
}
+
+ @Override
+ public TValidWriteIdList getValidWriteIdList(TableMetaRef ref) {
+ Preconditions.checkArgument(ref instanceof TableMetaRefImpl,
+ "table ref %s was not created by CatalogdMetaProvider", ref);
+ return ((TableMetaRefImpl)ref).validWriteIds_;
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
index 849aa5274..98527a7e4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/DirectMetaProvider.java
@@ -24,6 +24,7 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
+import org.apache.commons.lang.NotImplementedException;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Database;
@@ -46,6 +47,7 @@ import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.service.BackendConfig;
import org.apache.impala.thrift.TBackendGflags;
import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.ListMap;
import org.apache.impala.util.MetaStoreUtil;
import org.apache.thrift.TException;
@@ -397,4 +399,10 @@ class DirectMetaProvider implements MetaProvider {
return msTable_.getPartitionKeysSize() != 0;
}
}
+
+ @Override
+ public TValidWriteIdList getValidWriteIdList(TableMetaRef ref) {
+ throw new NotImplementedException(
+ "getValidWriteIdList() is not implemented for DirectMetaProvider");
+ }
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
index 8321a2859..036cce3d4 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalFsTable.java
@@ -59,6 +59,7 @@ import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.thrift.TResultSet;
import org.apache.impala.thrift.TTableDescriptor;
import org.apache.impala.thrift.TTableType;
+import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.AcidUtils;
import org.apache.impala.util.AvroSchemaConverter;
import org.apache.impala.util.AvroSchemaUtils;
@@ -327,6 +328,12 @@ public class LocalFsTable extends LocalTable implements FeFsTable {
TTableDescriptor tableDesc = new TTableDescriptor(tableId, TTableType.HDFS_TABLE,
FeCatalogUtils.getTColumnDescriptors(this),
getNumClusteringCols(), name_, db_.getName());
+ // 'ref_' can be null when this table is the target of a CTAS statement.
+ if (ref_ != null) {
+ TValidWriteIdList validWriteIdList =
+ db_.getCatalog().getMetaProvider().getValidWriteIdList(ref_);
+ if (validWriteIdList != null) hdfsTable.setValid_write_ids(validWriteIdList);
+ }
tableDesc.setHdfsTable(hdfsTable);
return tableDesc;
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
index 6a2891b8f..6c22898ec 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/LocalTable.java
@@ -22,6 +22,7 @@ import java.util.List;
import javax.annotation.Nullable;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.TableType;
import org.apache.hadoop.hive.metastore.api.ColumnStatisticsObj;
import org.apache.hadoop.hive.metastore.api.Table;
@@ -276,7 +277,7 @@ abstract class LocalTable implements FeTable {
}
@Override
- public String getValidWriteIds() {
+ public ValidWriteIdList getValidWriteIds() {
return null;
}
diff --git a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
index 6b79567b6..6a6022652 100644
--- a/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
+++ b/fe/src/main/java/org/apache/impala/catalog/local/MetaProvider.java
@@ -33,6 +33,7 @@ import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
import org.apache.impala.catalog.SqlConstraints;
import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TNetworkAddress;
+import org.apache.impala.thrift.TValidWriteIdList;
import org.apache.impala.util.ListMap;
import org.apache.thrift.TException;
@@ -144,4 +145,6 @@ public interface MetaProvider {
byte[] getPartitionStats();
boolean hasIncrementalStats();
}
+
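+ /**
+ * Returns the valid write id list of the table referenced by 'ref'. Can be null if
+ * the table is not transactional.
+ */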
+ public TValidWriteIdList getValidWriteIdList(TableMetaRef ref);
}
diff --git a/fe/src/main/java/org/apache/impala/util/AcidUtils.java b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
index 075383e6b..e31bac7b3 100644
--- a/fe/src/main/java/org/apache/impala/util/AcidUtils.java
+++ b/fe/src/main/java/org/apache/impala/util/AcidUtils.java
@@ -31,13 +31,18 @@ import org.apache.impala.catalog.ScalarType;
import org.apache.impala.catalog.StructField;
import org.apache.impala.catalog.StructType;
import org.apache.impala.common.FileSystemUtil;
-import org.apache.impala.thrift.TQueryOptions;
+import org.apache.impala.common.Pair;
import org.apache.impala.thrift.TTransactionalType;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import java.util.ArrayList;
+import java.util.Comparator;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.List;
import java.util.Map;
+import java.util.Set;
import java.util.function.Predicate;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
@@ -52,6 +57,7 @@ import javax.annotation.Nullable;
*
*/
public class AcidUtils {
+ private final static Logger LOG = LoggerFactory.getLogger(AcidUtils.class);
// Constant also defined in TransactionalValidationListener
public static final String INSERTONLY_TRANSACTIONAL_PROPERTY = "insert_only";
// Constant also defined in hive_metastoreConstants
@@ -85,7 +91,8 @@ public class AcidUtils {
"delta_" +
"(?\\d+)_" +
"(?\\d+)" +
- "(?:_(?\\d+)|_v(?\\d+))?" +
+ // Statement id, or visibilityTxnId.
+ "(?:_(?<statementId>\\d+)|_v(?<visibilityTxnId>\\d+))?" +
// Optional path suffix.
"(?:/.*)?";
@@ -181,9 +188,10 @@ public class AcidUtils {
} else {
ParsedDelta pd = parseDelta(dirPath);
if (pd != null) {
+ if (!isTxnValid(pd.visibilityTxnId)) return false;
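+ // Keep deltas whose write id range is at least partially valid; write ids that
+ // are invalid within the range are filtered out later during the scan.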
ValidWriteIdList.RangeResponse rr =
writeIdList.isWriteIdRangeValid(pd.minWriteId, pd.maxWriteId);
- return rr.equals(ValidWriteIdList.RangeResponse.ALL);
+ return rr != ValidWriteIdList.RangeResponse.NONE;
}
}
// If it wasn't in a base or a delta directory, we should include it.
@@ -232,15 +240,17 @@ public class AcidUtils {
private static final class ParsedDelta {
final long minWriteId;
final long maxWriteId;
- /**
- * Negative value indicates there was no statement id.
- */
+ /// Value -1 means there is no statement id.
final long statementId;
+ /// Value -1 means there is no visibility txn id.
+ final long visibilityTxnId;
- ParsedDelta(long minWriteId, long maxWriteId, long statementId) {
+ ParsedDelta(long minWriteId, long maxWriteId, long statementId,
+ long visibilityTxnId) {
this.minWriteId = minWriteId;
this.maxWriteId = maxWriteId;
this.statementId = statementId;
+ this.visibilityTxnId = visibilityTxnId;
}
}
@@ -250,9 +260,12 @@ public class AcidUtils {
}
long minWriteId = Long.valueOf(deltaMatcher.group("minWriteId"));
long maxWriteId = Long.valueOf(deltaMatcher.group("maxWriteId"));
- String statementIdStr = deltaMatcher.group("optionalStatementId");
+ String statementIdStr = deltaMatcher.group("statementId");
long statementId = statementIdStr != null ? Long.valueOf(statementIdStr) : -1;
- return new ParsedDelta(minWriteId, maxWriteId, statementId);
+ String visibilityTxnIdStr = deltaMatcher.group("visibilityTxnId");
+ long visibilityTxnId = visibilityTxnIdStr != null ?
+ Long.valueOf(visibilityTxnIdStr) : -1;
+ return new ParsedDelta(minWriteId, maxWriteId, statementId, visibilityTxnId);
}
private static ParsedDelta parseDelta(String dirPath) {
@@ -263,6 +276,15 @@ public class AcidUtils {
return matcherToParsedDelta(DELETE_DELTA_PATTERN.matcher(dirPath));
}
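+ /**
+ * Returns the first directory component of 'relPath', or null if 'relPath' does not
+ * contain a directory (i.e. it is a file directly under the table directory).
+ */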
+ private static String getFirstDirName(String relPath) {
+ int slashIdx = relPath.indexOf("/");
+ if (slashIdx != -1) {
+ return relPath.substring(0, slashIdx);
+ } else {
+ return null;
+ }
+ }
+
/**
* Filters the files based on Acid state.
* @param stats the FileStatuses obtained from recursively listing the directory
@@ -278,13 +300,13 @@ public class AcidUtils {
public static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats,
Path baseDir, ValidTxnList validTxnList, ValidWriteIdList writeIds,
@Nullable LoadStats loadStats) throws MetaException {
- List<FileStatus> validStats = new ArrayList<>(stats);
-
// First filter out any paths that are not considered valid write IDs.
- // At the same time, calculate the max valid base write ID.
+ // At the same time, calculate the max valid base write ID and collect the names of
+ // the delta directories.
Predicate<String> pred = new WriteListBasedPredicate(validTxnList, writeIds);
long maxBaseWriteId = Long.MIN_VALUE;
- for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext(); ) {
+ Set<String> deltaDirNames = new HashSet<>();
+ for (Iterator<FileStatus> it = stats.iterator(); it.hasNext();) {
FileStatus stat = it.next();
String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
if (!pred.test(relPath)) {
@@ -292,13 +314,30 @@ public class AcidUtils {
if (loadStats != null) loadStats.uncommittedAcidFilesSkipped++;
continue;
}
-
maxBaseWriteId = Math.max(getBaseWriteId(relPath), maxBaseWriteId);
+ String dirName = getFirstDirName(relPath);
+ if (dirName != null && (dirName.startsWith("delta_") ||
+ dirName.startsWith("delete_delta_"))) {
+ deltaDirNames.add(dirName);
+ }
}
+ // Get a list of all valid delta directories.
+ List<Pair<String, ParsedDelta>> deltas =
+ getValidDeltaDirsOrdered(deltaDirNames, maxBaseWriteId, writeIds);
+ // Filter out delta directories superseded by major/minor compactions.
+ Set<String> filteredDeltaDirs =
+ getFilteredDeltaDirs(deltas, maxBaseWriteId, writeIds);
+ // Filter out any files that are superseded by the latest valid base or not located
+ // in 'filteredDeltaDirs'.
+ return filterFilesForAcidState(stats, baseDir, maxBaseWriteId, filteredDeltaDirs,
+ loadStats);
+ }
- // Filter out any files that are superceded by the latest valid base,
- // as well as any directories.
- for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext(); ) {
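+ /**
+ * Keeps the entries of 'stats' that belong to the latest valid base directory
+ * (write id 'maxBaseWriteId'), are located in one of the directories in 'deltaDirs',
+ * or are original files outside of base/delta directories. Directories themselves
+ * are dropped from the result.
+ */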
+ private static List<FileStatus> filterFilesForAcidState(List<FileStatus> stats,
+ Path baseDir, long maxBaseWriteId, Set<String> deltaDirs,
+ @Nullable LoadStats loadStats) {
+ List<FileStatus> validStats = new ArrayList<>(stats);
+ for (Iterator<FileStatus> it = validStats.iterator(); it.hasNext();) {
FileStatus stat = it.next();
if (stat.isDirectory()) {
@@ -307,39 +346,26 @@ public class AcidUtils {
}
String relPath = FileSystemUtil.relativizePath(stat.getPath(), baseDir);
+ if (relPath.startsWith("delta_") ||
+ relPath.startsWith("delete_delta_")) {
+ String dirName = getFirstDirName(relPath);
+ if (dirName != null && !deltaDirs.contains(dirName)) {
+ it.remove();
+ if (loadStats != null) loadStats.filesSupersededByAcidState++;
+ }
+ continue;
+ }
long baseWriteId = getBaseWriteId(relPath);
if (baseWriteId != SENTINEL_BASE_WRITE_ID) {
if (baseWriteId < maxBaseWriteId) {
it.remove();
- if (loadStats != null) loadStats.filesSupercededByNewerBase++;
+ if (loadStats != null) loadStats.filesSupersededByAcidState++;
}
continue;
}
- ParsedDelta parsedDelta = parseDelta(relPath);
- if (parsedDelta != null) {
- if (parsedDelta.minWriteId <= maxBaseWriteId) {
- it.remove();
- if (loadStats != null) loadStats.filesSupercededByNewerBase++;
- } else if (parsedDelta.minWriteId != parsedDelta.maxWriteId) {
- // TODO(IMPALA-9512): Validate rows in minor compacted deltas.
- // We could read the non-compacted delta directories, but we'd need to check
- // that all of them still exists. Let's throw an error on minor compacted tables
- // for now since we want to read minor compacted deltas in the near future.
- throw new MetaException("Table is minor compacted which is not supported " +
- "by Impala. Run major compaction to resolve this.");
- }
- continue;
- }
- ParsedDelta deleteDelta = parseDeleteDelta(relPath);
- if (deleteDelta != null) {
- if (deleteDelta.maxWriteId > maxBaseWriteId) {
- throw new MetaException("Table has deleted rows. It's currently not " +
- "supported by Impala. Run major compaction to resolve this.");
- }
- }
- // Not in a base or a delta directory. In that case, it's probably a post-upgrade
- // file.
+ // Not in a base or a delta directory. In that case, it's probably a
+ // post-upgrade file.
// If there is no valid base: we should read the file (assuming that
// hive.mm.allow.originals == true)
// If there is a valid base: the file should be merged to the base by the
@@ -349,4 +375,141 @@ public class AcidUtils {
}
return validStats;
}
+
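+ /**
+ * Parses the directory names in 'deltaDirNames', drops the deltas that are already
+ * covered by the base with write id 'baseWriteId', and returns the remaining ones
+ * ordered the same way Hive orders deltas. Throws an error for delete deltas that
+ * are not covered by the base since Impala cannot read them.
+ */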
+ private static List<Pair<String, ParsedDelta>> getValidDeltaDirsOrdered(
+ Set<String> deltaDirNames, long baseWriteId, ValidWriteIdList writeIds)
+ throws MetaException {
+ List<Pair<String, ParsedDelta>> deltas = new ArrayList<>();
+ for (Iterator<String> it = deltaDirNames.iterator(); it.hasNext();) {
+ String dirname = it.next();
+ ParsedDelta parsedDelta = parseDelta(dirname);
+ if (parsedDelta != null) {
+ if (parsedDelta.minWriteId <= baseWriteId) {
+ Preconditions.checkState(parsedDelta.maxWriteId <= baseWriteId);
+ it.remove();
+ continue;
+ }
+ deltas.add(new Pair<String, ParsedDelta>(dirname, parsedDelta));
+ continue;
+ }
+ ParsedDelta deleteDelta = parseDeleteDelta(dirname);
+ if (deleteDelta != null) {
+ if (deleteDelta.maxWriteId > baseWriteId) {
+ throw new MetaException("Table has deleted rows. It's currently not "
+ + "supported by Impala. Run major compaction to resolve this.");
+ }
+ }
+ }
+
+ deltas.sort(new Comparator<Pair<String, ParsedDelta>>() {
+ // This compare method is based on Hive's (d6ad73c3615)
+ // AcidUtils.ParsedDeltaLight.compareTo().
+ // One addition to it is to take the visibilityTxnId into consideration. Hence if
+ // there's delta_N_M and delta_N_M_v001234 then delta_N_M_v001234 must be ordered
+ // first.
+ @Override
+ public int compare(Pair<String, ParsedDelta> o1, Pair<String, ParsedDelta> o2) {
+ ParsedDelta pd1 = o1.second;
+ ParsedDelta pd2 = o2.second;
+ if (pd1.minWriteId != pd2.minWriteId) {
+ if (pd1.minWriteId < pd2.minWriteId) {
+ return -1;
+ } else {
+ return 1;
+ }
+ } else if (pd1.maxWriteId != pd2.maxWriteId) {
+ if (pd1.maxWriteId < pd2.maxWriteId) {
+ return 1;
+ } else {
+ return -1;
+ }
+ } else if (pd1.statementId != pd2.statementId) {
+ /**
+ * We want deltas after minor compaction (w/o statementId) to sort earlier so
+ * that getAcidState() considers files that were compacted into larger ones obsolete.
+ * Before compaction, include deltas with all statementIds for a given writeId.
+ */
+ if (pd1.statementId < pd2.statementId) {
+ return -1;
+ } else {
+ return 1;
+ }
+ } else if (pd1.visibilityTxnId != pd2.visibilityTxnId) {
+ // This is an alteration of Hive's algorithm: if everything else is equal then
+ // the higher visibilityTxnId wins (a missing visibilityTxnId is stored as -1).
+ // Currently this cannot happen since Hive doesn't minor compact standalone
+ // delta directories of streaming ingestion, i.e. the following cannot happen:
+ // delta_1_5 => delta_1_5_v01234
+ // However, it'd make sense because streaming ingested ORC files don't use
+ // advanced features like dictionary encoding or statistics, so Hive might do
+ // that in the future, and that'd make Impala see duplicate rows. Be cautious
+ // here in case that behavior changes without notice.
+ if (pd1.visibilityTxnId < pd2.visibilityTxnId) {
+ return 1;
+ } else {
+ return -1;
+ }
+ } else {
+ return o1.first.compareTo(o2.first);
+ }
+ }
+ });
+ return deltas;
+ }
+
+ /**
+ * The algorithm is copied from Hive's (d6ad73c3615)
+ * org.apache.hadoop.hive.ql.io.AcidUtils.getAcidState()
+ * One addition to it is to take the visibilityTxnId into consideration. Hence if
+ * there's delta_N_M_v001234 and delta_N_M then it ignores delta_N_M.
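+ * E.g. out of delta_1_1, delta_2_2 and delta_1_2_v01000 only delta_1_2_v01000 is
+ * returned (assuming compaction txn 1000 is committed and write ids 1-2 are valid).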
+ */
+ private static Set<String> getFilteredDeltaDirs(List<Pair<String, ParsedDelta>> deltas,
+ long baseWriteId, ValidWriteIdList writeIds) {
+ long current = baseWriteId;
+ long lastStmtId = -1;
+ ParsedDelta prev = null;
+ Set<String> filteredDeltaDirs = new HashSet<>();
+ for (Pair<String, ParsedDelta> pathDelta : deltas) {
+ ParsedDelta next = pathDelta.second;
+ if (next.maxWriteId > current) {
+ // are any of the new transactions ones that we care about?
+ if (writeIds.isWriteIdRangeValid(current + 1, next.maxWriteId) !=
+ ValidWriteIdList.RangeResponse.NONE) {
+ filteredDeltaDirs.add(pathDelta.first);
+ current = next.maxWriteId;
+ lastStmtId = next.statementId;
+ prev = next;
+ }
+ } else if (next.maxWriteId == current && lastStmtId >= 0) {
+ // Make sure to get all deltas within a single transaction; a multi-statement txn
+ // generates multiple delta files with the same txnId range.
+ // Of course, if maxWriteId has already been minor compacted, all per-statement
+ // deltas are obsolete.
+ filteredDeltaDirs.add(pathDelta.first);
+ prev = next;
+ } else if (prev != null && next.maxWriteId == prev.maxWriteId &&
+ next.minWriteId == prev.minWriteId &&
+ next.statementId == prev.statementId &&
+ // If visibilityTxnId differs, then 'pathDelta' is probably a streaming ingested
+ // delta directory and 'prev' is the compacted version of it.
+ next.visibilityTxnId == prev.visibilityTxnId) {
+ // The 'next' parsedDelta may have everything equal to the 'prev' parsedDelta,
+ // except the path. This may happen when we have split update and we have two
+ // types of delta directories - 'delta_x_y' and 'delete_delta_x_y' for the SAME
+ // txn range.
+
+ // Also note that any delete_deltas in between a given delta_x_y range would be
+ // made obsolete. For example, a delta_30_50 would make delete_delta_40_40
+ // obsolete.
+ // This is valid because minor compaction always compacts the normal deltas and
+ // the delete deltas for the same range. That is, if we had 3 directories,
+ // delta_30_30, delete_delta_40_40 and delta_50_50, then running minor compaction
+ // would produce delta_30_50 and delete_delta_30_50.
+ filteredDeltaDirs.add(pathDelta.first);
+ prev = next;
+ }
+ }
+ return filteredDeltaDirs;
+ }
}
diff --git a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
index 4632de0c0..730a02ec1 100644
--- a/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/analysis/StmtMetadataLoaderTest.java
@@ -97,8 +97,7 @@ public class StmtMetadataLoaderTest {
for (FeTable t: stmtTableCache.tables.values()) {
Assert.assertTrue(t.isLoaded());
Assert.assertTrue(t.getValidWriteIds() != null);
- Assert.assertTrue(MetastoreShim.getValidWriteIdListFromString(t.getValidWriteIds())
- .isWriteIdValid(t.getWriteId()));
+ Assert.assertTrue(t.getValidWriteIds().isWriteIdValid(t.getWriteId()));
}
}
diff --git a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
index 19b36609c..b6fe6501b 100644
--- a/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
+++ b/fe/src/test/java/org/apache/impala/catalog/FileMetadataLoaderTest.java
@@ -27,8 +27,11 @@ import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.FileUtil;
import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.common.ValidReadTxnList;
+import org.apache.hadoop.hive.common.ValidWriteIdList;
import org.apache.hadoop.hive.metastore.api.MetaException;
import org.apache.impala.catalog.HdfsPartition.FileDescriptor;
+import org.apache.impala.compat.MetastoreShim;
import org.apache.impala.thrift.TNetworkAddress;
import org.apache.impala.util.ListMap;
import org.junit.Test;
@@ -108,6 +111,24 @@ public class FileMetadataLoaderTest {
relPaths.get(2));
}
+ @Test
+ public void testAcidMinorCompactionLoading() throws IOException, MetaException {
+ //TODO(IMPALA-9042): Remove "throws MetaException"
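+ // The table was minor compacted by Hive, so only the file in the compacted delta
+ // directory should survive the ACID filtering.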
+ ListMap<TNetworkAddress> hostIndex = new ListMap<>();
+ ValidWriteIdList writeIds = MetastoreShim.getValidWriteIdListFromString(
+ "functional_orc_def.complextypestbl_minor_compacted:10:10::");
+ Path tablePath = new Path("hdfs://localhost:20500/test-warehouse/managed/" +
+ "complextypestbl_minor_compacted_orc_def/");
+ FileMetadataLoader fml = new FileMetadataLoader(tablePath, /* recursive=*/true,
+ /* oldFds = */ Collections.emptyList(), hostIndex, new ValidReadTxnList(""),
+ writeIds, HdfsFileFormat.ORC);
+ fml.load();
+ // Only load the compacted file.
+ assertEquals(1, fml.getStats().loadedFiles);
+ // 2 * 8 = 16 files; the hidden '_orc_acid_version' files are only filtered out later.
+ assertEquals(16, fml.getStats().filesSupersededByAcidState);
+ }
+
@Test
public void testLoadMissingDirectory() throws IOException, MetaException {
//TODO(IMPALA-9042): Remove "throws MetaException"
diff --git a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
index 07620b202..65986871c 100644
--- a/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
+++ b/fe/src/test/java/org/apache/impala/util/AcidUtilsTest.java
@@ -36,9 +36,11 @@ import org.apache.impala.compat.MetastoreShim;
import org.hamcrest.Matchers;
import org.junit.Assume;
import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AcidUtilsTest {
-
+ private static final Logger LOG = LoggerFactory.getLogger(AcidUtilsTest.class);
/** Fake base path to root all FileStatuses under. */
private static final Path BASE_PATH = new Path("file:///foo/bar/");
@@ -189,6 +191,36 @@ public class AcidUtilsTest {
"delta_0000009_0000009_0000/0000/def.txt"});
}
+ @Test
+ public void testStreamingIngest() {
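+ // Streaming ingestion wrote delta_0000001_0000005, then compaction rewrote it as
+ // delta_0000001_0000005_v01000. While compaction txn 1000 is open only the
+ // original delta is readable; once it commits only the compacted one is.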
+ assertFiltering(new String[]{
+ "delta_0000001_0000005/bucket_0000",
+ "delta_0000001_0000005/bucket_0001",
+ "delta_0000001_0000005/bucket_0002",
+ "delta_0000001_0000005_v01000/bucket_0000",
+ "delta_0000001_0000005_v01000/bucket_0001",
+ "delta_0000001_0000005_v01000/bucket_0002"},
+ "1100:1000:1000:", // txn 1000 is open
+ "default.test:10:10::", // write ids are committed
+ new String[]{
+ "delta_0000001_0000005/bucket_0000",
+ "delta_0000001_0000005/bucket_0001",
+ "delta_0000001_0000005/bucket_0002"});
+
+ assertFiltering(new String[]{
+ "delta_0000001_0000005/bucket_0000",
+ "delta_0000001_0000005/bucket_0001",
+ "delta_0000001_0000005/bucket_0002",
+ "delta_0000001_0000005_v01000/bucket_0000",
+ "delta_0000001_0000005_v01000/bucket_0001",
+ "delta_0000001_0000005_v01000/bucket_0002"},
+ "1100:::", // txn 1000 is committed
+ "default.test:10:10::", // write ids are committed
+ new String[]{
+ "delta_0000001_0000005_v01000/bucket_0000",
+ "delta_0000001_0000005_v01000/bucket_0001",
+ "delta_0000001_0000005_v01000/bucket_0002"});
+ }
+
@Test
public void testAbortedCompaction() {
assertFiltering(new String[]{
@@ -272,14 +304,14 @@ public class AcidUtilsTest {
"delta_000006_0000020/",
"delta_000006_0000020/def.txt",
"delta_000005.txt"};
-
- // Only committed up to transaction 10, so skip the 6-20 delta.
+ // Only committed up to write id 10, but the 6-20 delta is partially valid, so select it.
assertFiltering(paths,
"default.test:10:1234:1,2,3",
new String[]{
"delta_000005_0000005/abc.txt",
- "delta_000005_0000005_0000/abc.txt",
- "delta_000005.txt"});
+ "delta_000006_0000020/def.txt",
+ "delta_000005.txt",
+ });
}
@Test
@@ -339,37 +371,125 @@ public class AcidUtilsTest {
}
@Test
- public void testMinorCompactionFail() {
- filteringError(new String[]{
+ public void testMinorCompactionAllTxnsValid() {
+ assertFiltering(new String[]{
"base_0000005/",
"base_0000005/abc.txt",
- "delta_0000006_0000007/",
- "delta_0000006_0000007/00000"},
- // all txns are valid
- "",
+ "delta_0000006_0000006_v01000/00000",
+ "delta_0000007_0000007_v01000/00000",
+ "delta_0000006_0000007_v01000/00000"},
+ "", // all txns are valid
// <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
"default.test:10:1234:1,2,3",
- "Table is minor compacted");
- filteringError(new String[]{
+ new String[]{
+ "base_0000005/abc.txt",
+ "delta_0000006_0000007_v01000/00000"});
+ // Minor compact multi-statement transaction
+ assertFiltering(new String[]{
"base_0000005/",
"base_0000005/abc.txt",
- "delta_0000006_0000007_00123/",
- "delta_0000006_0000007_00123/00000"},
- // all txns are valid
- "",
+ "delta_0000006_0000006_00001/00000", // statement id 1
+ "delta_0000006_0000006_00002/00000", // statement id 2
+ "delta_0000006_0000006_00003/00000", // statement id 3
+ "delta_0000006_0000006_v01000/00000", // compacted
+ "delta_0000006_0000006_v01000/00001"},
+ "", // all txns are valid
+ // <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+ "default.test:10:1234:",
+ new String[]{
+ "base_0000005/abc.txt",
+ "delta_0000006_0000006_v01000/00000",
+ "delta_0000006_0000006_v01000/00001"});
+ // Disjunct minor compacted delta dirs
+ assertFiltering(new String[]{
+ "delta_0000001_0000001/00000",
+ "delta_0000002_0000002/00000",
+ "delta_0000003_0000003/00000",
+ "delta_0000004_0000004/00000",
+ "delta_0000005_0000005/00000",
+ "delta_0000006_0000006/00000",
+ "delta_0000007_0000007/00000",
+ "delta_0000001_0000003_v00100/00000",
+ "delta_0000004_0000005_v00101/00000",
+ "delta_0000001_0000005_v00102/00000",
+ "delta_0000006_0000007_v00123/00000",
+ "delta_0000006_0000007_v00123/00001"},
+ "", // all txns are valid
+ // <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+ "default.test:10:1234:",
+ new String[]{
+ "delta_0000001_0000005_v00102/00000",
+ "delta_0000006_0000007_v00123/00000",
+ "delta_0000006_0000007_v00123/00001"});
+ // Compacted delta range contains aborted write id
+ assertFiltering(new String[]{
+ "delta_0000001_0000001/00000",
+ "delta_0000002_0000002/00000",
+ "delta_0000003_0000003/00000",
+ "delta_0000001_0000003_v01000/00000"},
+ "", // all txns are valid
+ // <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+ "default.test:10:5::2",
+ new String[]{"delta_0000001_0000003_v01000/00000"});
+ }
+
+ @Test
+ public void testInProgressMinorCompactions() {
+ assertFiltering(new String[]{
+ "base_0000005/",
+ "base_0000005/abc.txt",
+ "delta_0000006_0000006/00000",
+ "delta_0000007_0000007/00000",
+ "delta_0000006_0000007_v100/00000"},
+ // Txns valid up to id 90, so 100 is invalid
+ "90:90::",
// <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
"default.test:10:1234:1,2,3",
- "Table is minor compacted");
- filteringError(new String[]{
+ new String[]{
+ "base_0000005/abc.txt",
+ "delta_0000006_0000006/00000",
+ "delta_0000007_0000007/00000"});
+ // Minor compact multi-statement transaction
+ assertFiltering(new String[]{
"base_0000005/",
"base_0000005/abc.txt",
- "delta_0000006_0000007_v00123/",
- "delta_0000006_0000007_v00123/00000"},
- // all txns are valid
- "",
+ "delta_0000006_0000006_00001/00000", // statement id 1
+ "delta_0000006_0000006_00002/00000", // statement id 2
+ "delta_0000006_0000006_00003/00000", // statement id 3
+ "delta_0000006_0000006_v100/00000", // no statement id => compacted
+ "delta_0000006_0000006_v100/00001"},
+ // Txn 100 is invalid
+ "110:100:100:",
// <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
- "default.test:10:1234:1,2,3",
- "Table is minor compacted");
+ "default.test:10:1234:",
+ new String[]{
+ "base_0000005/abc.txt",
+ "delta_0000006_0000006_00001/00000",
+ "delta_0000006_0000006_00002/00000",
+ "delta_0000006_0000006_00003/00000"});
+ // Disjunct minor compacted delta dirs
+ assertFiltering(new String[]{
+ "delta_0000001_0000001/00000",
+ "delta_0000002_0000002/00000",
+ "delta_0000003_0000003/00000",
+ "delta_0000004_0000004/00000",
+ "delta_0000005_0000005/00000",
+ "delta_0000006_0000006/00000",
+ "delta_0000007_0000007/00000",
+ "delta_0000001_0000003_v00100/00000",
+ "delta_0000004_0000005_v00101/00000",
+ "delta_0000001_0000005_v00102/00000",
+ "delta_0000006_0000007_v00123/00000",
+ "delta_0000006_0000007_v00123/00001"},
+ // Txn 102 is invalid (minor compaction 1-5)
+ "130:102:102:",
+ // <table>:<highWatermark>:<minOpenWriteId>:<openWriteIds>:<abortedWriteIds>
+ "default.test:10:1234:",
+ new String[]{
+ "delta_0000001_0000003_v00100/00000",
+ "delta_0000004_0000005_v00101/00000",
+ "delta_0000006_0000007_v00123/00000",
+ "delta_0000006_0000007_v00123/00001"});
}
@Test
@@ -400,7 +520,6 @@ public class AcidUtilsTest {
// ::
"default.test:20:15::",
new String[]{
- // No minor compactions after base directory so it should succeed.
"base_000010/0000_0",
"delta_0000012_0000012_0000/0000_0",
"delta_0000012_0000012_0000/0000_1"});
diff --git a/testdata/bin/generate-schema-statements.py b/testdata/bin/generate-schema-statements.py
index 590445c70..503c74932 100755
--- a/testdata/bin/generate-schema-statements.py
+++ b/testdata/bin/generate-schema-statements.py
@@ -654,7 +654,7 @@ def generate_statements(output_name, test_vectors, sections,
create_file_format = file_format
create_codec = codec
if not (section['LOAD'] or section['DEPENDENT_LOAD']
- or section['DEPENDENT_LOAD_HIVE']):
+ or section['DEPENDENT_LOAD_HIVE'] or section['DEPENDENT_LOAD_ACID']):
create_codec = 'none'
create_file_format = file_format
if file_format not in IMPALA_SUPPORTED_INSERT_FORMATS:
diff --git a/testdata/data/README b/testdata/data/README
index 7dca8b0db..0dee50999 100644
--- a/testdata/data/README
+++ b/testdata/data/README
@@ -494,3 +494,8 @@ the same indexing hash but a newer version depends on the time of writing.
`ca51fa17-681b-4497-85b7-4f68e7a63ee7-0_1-38-282_20200112194529.parquet`
If the impala table was refreshed after this file was written, impala will
only query on the file with latest version.
+
+streaming.orc:
+ORC file generated by Hive Streaming Ingestion. I used a slightly altered version of
+TestStreaming.testNoBuckets() from Hive 3.1 to generate this file. It contains
+values coming from two transactions. The file has two stripes (one per transaction).
diff --git a/testdata/data/streaming.orc b/testdata/data/streaming.orc
new file mode 100644
index 000000000..3594ca952
Binary files /dev/null and b/testdata/data/streaming.orc differ
diff --git a/testdata/datasets/functional/functional_schema_template.sql b/testdata/datasets/functional/functional_schema_template.sql
index 3f927c047..dc53371de 100644
--- a/testdata/datasets/functional/functional_schema_template.sql
+++ b/testdata/datasets/functional/functional_schema_template.sql
@@ -711,6 +711,30 @@ INSERT OVERWRITE TABLE {db_name}{db_suffix}.{table_name} SELECT * FROM functiona
---- DATASET
functional
---- BASE_TABLE_NAME
+complextypestbl_minor_compacted
+---- COLUMNS
+id bigint
+int_array array<int>
+int_array_array array<array<int>>
+int_map map<string, int>
+int_map_array array<map<string, int>>