diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc index a89384e43..9c5998b74 100644 --- a/be/src/exec/hbase-table-sink.cc +++ b/be/src/exec/hbase-table-sink.cc @@ -89,7 +89,7 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) { ExprContext::FreeLocalAllocations(output_expr_ctxs_); RETURN_IF_ERROR(state->CheckQueryState()); // Since everything is set up just forward everything to the writer. - RETURN_IF_ERROR(hbase_table_writer_->AppendRowBatch(batch)); + RETURN_IF_ERROR(hbase_table_writer_->AppendRows(batch)); (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows += batch->num_rows(); return Status::OK(); diff --git a/be/src/exec/hbase-table-writer.cc b/be/src/exec/hbase-table-writer.cc index 41ea184b1..7574adc1f 100644 --- a/be/src/exec/hbase-table-writer.cc +++ b/be/src/exec/hbase-table-writer.cc @@ -114,7 +114,7 @@ Status HBaseTableWriter::InitJNI() { return Status::OK(); } -Status HBaseTableWriter::AppendRowBatch(RowBatch* batch) { +Status HBaseTableWriter::AppendRows(RowBatch* batch) { JNIEnv* env = getJNIEnv(); if (env == NULL) return Status("Error getting JNIEnv."); @@ -270,9 +270,9 @@ void HBaseTableWriter::Close(RuntimeState* state) { table_.reset(); } - // The jni should already have everything cleaned at this point - // but try again just in case there was an error that caused - // AppendRowBatch to exit out before calling CleanUpJni. + // The jni should already have everything cleaned at this point but try again just in + // case there was an error that caused AppendRows() to exit out before calling + // CleanUpJni. Status status = CleanUpJni(); if (!status.ok()) { stringstream ss; diff --git a/be/src/exec/hbase-table-writer.h b/be/src/exec/hbase-table-writer.h index 94947c928..bc414ab64 100644 --- a/be/src/exec/hbase-table-writer.h +++ b/be/src/exec/hbase-table-writer.h @@ -41,13 +41,13 @@ class RowBatch; /// HBaseTableWriter::InitJni(); /// writer = new HBaseTableWriter(state, table_desc_, output_exprs_); /// writer.Init(state); -/// writer.AppendRowBatch(batch); +/// writer.AppendRows(batch); class HBaseTableWriter { public: HBaseTableWriter(HBaseTableDescriptor* table_desc, const std::vector& output_expr_ctxs, RuntimeProfile* profile); - Status AppendRowBatch(RowBatch* batch); + Status AppendRows(RowBatch* batch); /// Calls to Close release the HBaseTable. void Close(RuntimeState* state); diff --git a/be/src/exec/hdfs-avro-table-writer.cc b/be/src/exec/hdfs-avro-table-writer.cc index 193430430..ec0ee0837 100644 --- a/be/src/exec/hdfs-avro-table-writer.cc +++ b/be/src/exec/hdfs-avro-table-writer.cc @@ -175,8 +175,8 @@ void HdfsAvroTableWriter::Close() { mem_pool_->FreeAll(); } -Status HdfsAvroTableWriter::AppendRowBatch(RowBatch* batch, - const vector& row_group_indices, bool* new_file) { +Status HdfsAvroTableWriter::AppendRows( + RowBatch* batch, const vector& row_group_indices, bool* new_file) { int32_t limit; bool all_rows = row_group_indices.empty(); if (all_rows) { diff --git a/be/src/exec/hdfs-avro-table-writer.h b/be/src/exec/hdfs-avro-table-writer.h index aeff0ed1f..38e820ea6 100644 --- a/be/src/exec/hdfs-avro-table-writer.h +++ b/be/src/exec/hdfs-avro-table-writer.h @@ -75,9 +75,8 @@ class HdfsAvroTableWriter : public HdfsTableWriter { /// Outputs the given rows into an HDFS sequence file. The rows are buffered /// to fill a sequence file block. 
- virtual Status AppendRowBatch(RowBatch* rows, - const std::vector& row_group_indices, - bool* new_file); + virtual Status AppendRows( + RowBatch* rows, const std::vector& row_group_indices, bool* new_file); private: /// Processes a single row, appending to out_ diff --git a/be/src/exec/hdfs-parquet-table-writer.cc b/be/src/exec/hdfs-parquet-table-writer.cc index cc708ab5c..7ae55b730 100644 --- a/be/src/exec/hdfs-parquet-table-writer.cc +++ b/be/src/exec/hdfs-parquet-table-writer.cc @@ -872,8 +872,8 @@ Status HdfsParquetTableWriter::InitNewFile() { return Status::OK(); } -Status HdfsParquetTableWriter::AppendRowBatch(RowBatch* batch, - const vector& row_group_indices, bool* new_file) { +Status HdfsParquetTableWriter::AppendRows( + RowBatch* batch, const vector& row_group_indices, bool* new_file) { SCOPED_TIMER(parent_->encode_timer()); *new_file = false; int limit; diff --git a/be/src/exec/hdfs-parquet-table-writer.h b/be/src/exec/hdfs-parquet-table-writer.h index 581160df6..4e1670793 100644 --- a/be/src/exec/hdfs-parquet-table-writer.h +++ b/be/src/exec/hdfs-parquet-table-writer.h @@ -67,9 +67,8 @@ class HdfsParquetTableWriter : public HdfsTableWriter { virtual Status InitNewFile(); /// Appends parquet representation of rows in the batch to the current file. - virtual Status AppendRowBatch(RowBatch* batch, - const std::vector& row_group_indices, - bool* new_file); + virtual Status AppendRows( + RowBatch* batch, const std::vector& row_group_indices, bool* new_file); /// Write out all the data. virtual Status Finalize(); diff --git a/be/src/exec/hdfs-sequence-table-writer.cc b/be/src/exec/hdfs-sequence-table-writer.cc index 99910e285..3c522ba7e 100644 --- a/be/src/exec/hdfs-sequence-table-writer.cc +++ b/be/src/exec/hdfs-sequence-table-writer.cc @@ -91,9 +91,8 @@ Status HdfsSequenceTableWriter::Init() { return Status::OK(); } -Status HdfsSequenceTableWriter::AppendRowBatch(RowBatch* batch, - const vector& row_group_indices, - bool* new_file) { +Status HdfsSequenceTableWriter::AppendRows( + RowBatch* batch, const vector& row_group_indices, bool* new_file) { int32_t limit; if (row_group_indices.empty()) { limit = batch->num_rows(); diff --git a/be/src/exec/hdfs-sequence-table-writer.h b/be/src/exec/hdfs-sequence-table-writer.h index c94ab1c75..7f6a8889a 100644 --- a/be/src/exec/hdfs-sequence-table-writer.h +++ b/be/src/exec/hdfs-sequence-table-writer.h @@ -57,9 +57,8 @@ class HdfsSequenceTableWriter : public HdfsTableWriter { /// Outputs the given rows into an HDFS sequence file. The rows are buffered /// to fill a sequence file block. - virtual Status AppendRowBatch(RowBatch* rows, - const std::vector& row_group_indices, - bool* new_file); + virtual Status AppendRows( + RowBatch* rows, const std::vector& row_group_indices, bool* new_file); private: /// processes a single row, delegates to Compress or NoCompress ConsumeRow(). diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc index 8c74797bc..3a9725c0a 100644 --- a/be/src/exec/hdfs-table-sink.cc +++ b/be/src/exec/hdfs-table-sink.cc @@ -56,26 +56,30 @@ const static string& ROOT_PARTITION_KEY = g_ImpalaInternalService_constants.ROOT_PARTITION_KEY; HdfsTableSink::HdfsTableSink(const RowDescriptor& row_desc, - const vector& select_list_texprs, - const TDataSink& tsink) - : DataSink(row_desc), - table_desc_(NULL), - default_partition_(NULL), - current_row_(NULL), - table_id_(tsink.table_sink.target_table_id), - skip_header_line_count_( - tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count - ? 
tsink.table_sink.hdfs_table_sink.skip_header_line_count : 0), - select_list_texprs_(select_list_texprs), - partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs), - overwrite_(tsink.table_sink.hdfs_table_sink.overwrite) { + const vector& select_list_texprs, const TDataSink& tsink) + : DataSink(row_desc), + table_desc_(nullptr), + default_partition_(nullptr), + table_id_(tsink.table_sink.target_table_id), + skip_header_line_count_( + tsink.table_sink.hdfs_table_sink.__isset.skip_header_line_count ? + tsink.table_sink.hdfs_table_sink.skip_header_line_count : + 0), + select_list_texprs_(select_list_texprs), + partition_key_texprs_(tsink.table_sink.hdfs_table_sink.partition_key_exprs), + overwrite_(tsink.table_sink.hdfs_table_sink.overwrite), + input_is_clustered_(tsink.table_sink.hdfs_table_sink.input_is_clustered), + current_clustered_partition_(nullptr) { DCHECK(tsink.__isset.table_sink); } OutputPartition::OutputPartition() - : hdfs_connection(NULL), tmp_hdfs_file(NULL), num_rows(0), num_files(0), - partition_descriptor(NULL), block_size(0) { -} + : hdfs_connection(nullptr), + tmp_hdfs_file(nullptr), + num_rows(0), + num_files(0), + partition_descriptor(nullptr), + block_size(0) {} Status HdfsTableSink::PrepareExprs(RuntimeState* state) { // Prepare select list expressions. @@ -125,7 +129,7 @@ Status HdfsTableSink::Prepare(RuntimeState* state, MemTracker* parent_mem_tracke table_desc_ = static_cast( state->desc_tbl().GetTableDescriptor(table_id_)); - if (table_desc_ == NULL) { + if (table_desc_ == nullptr) { stringstream error_msg("Failed to get table descriptor for table id: "); error_msg << table_id_; return Status(error_msg.str()); @@ -192,14 +196,17 @@ Status HdfsTableSink::Open(RuntimeState* state) { // partition keys. So only keep a reference to the partition which matches // partition_key_values for constant values, since only that is written to. void* table_partition_key_value = - partition->partition_key_value_ctxs()[i]->GetValue(NULL); - void* target_partition_key_value = partition_key_expr_ctxs_[i]->GetValue(NULL); - if (table_partition_key_value == NULL && target_partition_key_value == NULL) { + partition->partition_key_value_ctxs()[i]->GetValue(nullptr); + void* target_partition_key_value = + partition_key_expr_ctxs_[i]->GetValue(nullptr); + if (table_partition_key_value == nullptr + && target_partition_key_value == nullptr) { continue; } - if (table_partition_key_value == NULL || target_partition_key_value == NULL + if (table_partition_key_value == nullptr + || target_partition_key_value == nullptr || !RawValue::Eq(table_partition_key_value, target_partition_key_value, - partition_key_expr_ctxs_[i]->root()->type())) { + partition_key_expr_ctxs_[i]->root()->type())) { relevant_partition = false; break; } @@ -207,17 +214,16 @@ Status HdfsTableSink::Open(RuntimeState* state) { } if (relevant_partition) { string key; - // It's ok if current_row_ is NULL (which it should be here), since all of these - // expressions are constant, and can therefore be evaluated without a valid row - // context. - GetHashTblKey(dynamic_partition_key_value_ctxs, &key); + // Pass nullptr as row, since all of these expressions are constant, and can + // therefore be evaluated without a valid row context. 
+ GetHashTblKey(nullptr, dynamic_partition_key_value_ctxs, &key); DCHECK(partition_descriptor_map_.find(key) == partition_descriptor_map_.end()) << "Partitions with duplicate 'static' keys found during INSERT"; partition_descriptor_map_[key] = partition; } } } - if (default_partition_ == NULL) { + if (default_partition_ == nullptr) { return Status("No default partition found for HdfsTextTableSink"); } return Status::OK(); @@ -262,6 +268,87 @@ void HdfsTableSink::BuildHdfsFileNames( output_partition->num_files = 0; } +Status HdfsTableSink::WriteRowsToPartition( + RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair) { + // The rows of this batch may span multiple files. We repeatedly pass the row batch to + // the writer until it sets new_file to false, indicating that all rows have been + // written. The writer tracks where it is in the batch when it returns with new_file + // set. + bool new_file; + while (true) { + OutputPartition* output_partition = partition_pair->first; + RETURN_IF_ERROR( + output_partition->writer->AppendRows(batch, partition_pair->second, &new_file)); + if (!new_file) break; + RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition)); + RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition)); + } + partition_pair->second.clear(); + return Status::OK(); +} + +Status HdfsTableSink::WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch) { + DCHECK_GT(batch->num_rows(), 0); + DCHECK(!dynamic_partition_key_expr_ctxs_.empty()); + DCHECK(input_is_clustered_); + + // Initialize the clustered partition and key. + if (current_clustered_partition_ == nullptr) { + const TupleRow* current_row = batch->GetRow(0); + GetHashTblKey( + current_row, dynamic_partition_key_expr_ctxs_, ¤t_clustered_partition_key_); + RETURN_IF_ERROR(GetOutputPartition(state, current_row, + current_clustered_partition_key_, ¤t_clustered_partition_, false)); + } + + // Compare the last row of the batch to the last current partition key. If they match, + // then all the rows in the batch have the same key and can be written as a whole. + string last_row_key; + GetHashTblKey(batch->GetRow(batch->num_rows() - 1), dynamic_partition_key_expr_ctxs_, + &last_row_key); + if (last_row_key == current_clustered_partition_key_) { + DCHECK(current_clustered_partition_->second.empty()); + RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_)); + return Status::OK(); + } + + // Not all rows in this batch match the previously written partition key, so we process + // them individually. + for (int i = 0; i < batch->num_rows(); ++i) { + const TupleRow* current_row = batch->GetRow(i); + + string key; + GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &key); + + if (current_clustered_partition_key_ != key) { + DCHECK(current_clustered_partition_ != nullptr); + // Done with previous partition - write rows and close. 
+ if (!current_clustered_partition_->second.empty()) { + RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_)); + current_clustered_partition_->second.clear(); + } + RETURN_IF_ERROR(FinalizePartitionFile(state, current_clustered_partition_->first)); + if (current_clustered_partition_->first->writer.get() != nullptr) { + current_clustered_partition_->first->writer->Close(); + } + partition_keys_to_output_partitions_.erase(current_clustered_partition_key_); + current_clustered_partition_key_ = std::move(key); + RETURN_IF_ERROR(GetOutputPartition(state, current_row, + current_clustered_partition_key_, ¤t_clustered_partition_, false)); + } +#ifdef DEBUG + string debug_row_key; + GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &debug_row_key); + DCHECK_EQ(current_clustered_partition_key_, debug_row_key); +#endif + DCHECK(current_clustered_partition_ != nullptr); + current_clustered_partition_->second.push_back(i); + } + // Write final set of rows to the partition but keep its file open. + RETURN_IF_ERROR(WriteRowsToPartition(state, batch, current_clustered_partition_)); + return Status::OK(); +} + Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition) { SCOPED_TIMER(ADD_TIMER(profile(), "TmpFileCreateTimer")); @@ -295,7 +382,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, tmp_hdfs_file_name_cstr, O_WRONLY, 0, 0, block_size); VLOG_FILE << "hdfsOpenFile() file=" << tmp_hdfs_file_name_cstr; - if (output_partition->tmp_hdfs_file == NULL) { + if (output_partition->tmp_hdfs_file == nullptr) { return Status(GetHdfsErrorMsg("Failed to open HDFS file for writing: ", output_partition->current_file_name)); } @@ -310,7 +397,7 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, // files, we get the block size by stat-ing the file. hdfsFileInfo* info = hdfsGetPathInfo(output_partition->hdfs_connection, output_partition->current_file_name.c_str()); - if (info == NULL) { + if (info == nullptr) { return Status(GetHdfsErrorMsg("Failed to get info on temporary HDFS file: ", output_partition->current_file_name)); } @@ -338,16 +425,16 @@ Status HdfsTableSink::CreateNewTmpFile(RuntimeState* state, } Status HdfsTableSink::InitOutputPartition(RuntimeState* state, - const HdfsPartitionDescriptor& partition_descriptor, + const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row, OutputPartition* output_partition, bool empty_partition) { // Build the unique name for this partition from the partition keys, e.g. "j=1/f=foo/" // etc. stringstream partition_name_ss; for (int j = 0; j < partition_key_expr_ctxs_.size(); ++j) { partition_name_ss << table_desc_->col_descs()[j].name() << "="; - void* value = partition_key_expr_ctxs_[j]->GetValue(current_row_); - // NULL partition keys get a special value to be compatible with Hive. - if (value == NULL) { + void* value = partition_key_expr_ctxs_[j]->GetValue(row); + // nullptr partition keys get a special value to be compatible with Hive. + if (value == nullptr) { partition_name_ss << table_desc_->null_partition_key_value(); } else { string value_str; @@ -362,7 +449,7 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, // decoded partition key value. string encoded_str; UrlEncode(value_str, &encoded_str, true); - // If the string is empty, map it to NULL (mimicking Hive's behaviour) + // If the string is empty, map it to nullptr (mimicking Hive's behaviour) partition_name_ss << (encoded_str.empty() ? 
table_desc_->null_partition_key_value() : encoded_str); } @@ -459,24 +546,25 @@ Status HdfsTableSink::InitOutputPartition(RuntimeState* state, return CreateNewTmpFile(state, output_partition); } -void HdfsTableSink::GetHashTblKey(const vector& ctxs, string* key) { +void HdfsTableSink::GetHashTblKey( + const TupleRow* row, const vector& ctxs, string* key) { stringstream hash_table_key; for (int i = 0; i < ctxs.size(); ++i) { RawValue::PrintValueAsBytes( - ctxs[i]->GetValue(current_row_), ctxs[i]->root()->type(), &hash_table_key); + ctxs[i]->GetValue(row), ctxs[i]->root()->type(), &hash_table_key); // Additionally append "/" to avoid accidental key collisions. hash_table_key << "/"; } *key = hash_table_key.str(); } -inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, +inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, const TupleRow* row, const string& key, PartitionPair** partition_pair, bool no_more_rows) { + DCHECK(row != nullptr || key == ROOT_PARTITION_KEY); PartitionMap::iterator existing_partition; existing_partition = partition_keys_to_output_partitions_.find(key); if (existing_partition == partition_keys_to_output_partitions_.end()) { - // Create a new OutputPartition, and add it to - // partition_keys_to_output_partitions. + // Create a new OutputPartition, and add it to partition_keys_to_output_partitions. const HdfsPartitionDescriptor* partition_descriptor = default_partition_; PartitionDescriptorMap::const_iterator it = partition_descriptor_map_.find(key); if (it != partition_descriptor_map_.end()) { @@ -484,13 +572,13 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state, } OutputPartition* partition = state->obj_pool()->Add(new OutputPartition()); - Status status = InitOutputPartition(state, *partition_descriptor, partition, - no_more_rows); + Status status = + InitOutputPartition(state, *partition_descriptor, row, partition, no_more_rows); if (!status.ok()) { // We failed to create the output partition successfully. Clean it up now // as it is not added to partition_keys_to_output_partitions_ so won't be // cleaned up in Close(). - if (partition->writer.get() != NULL) partition->writer->Close(); + if (partition->writer.get() != nullptr) partition->writer->Close(); return status; } @@ -533,45 +621,25 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) { // If there are no dynamic keys just use an empty key. PartitionPair* partition_pair; RETURN_IF_ERROR( - GetOutputPartition(state, ROOT_PARTITION_KEY, &partition_pair, false)); - // Pass the row batch to the writer. If new_file is returned true then the current - // file is finalized and a new file is opened. - // The writer tracks where it is in the batch when it returns with new_file set. 
- OutputPartition* output_partition = partition_pair->first; - bool new_file; - do { - RETURN_IF_ERROR(output_partition->writer->AppendRowBatch( - batch, partition_pair->second, &new_file)); - if (new_file) { - RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition)); - RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition)); - } - } while (new_file); + GetOutputPartition(state, nullptr, ROOT_PARTITION_KEY, &partition_pair, false)); + RETURN_IF_ERROR(WriteRowsToPartition(state, batch, partition_pair)); + } else if (input_is_clustered_) { + WriteClusteredRowBatch(state, batch); } else { for (int i = 0; i < batch->num_rows(); ++i) { - current_row_ = batch->GetRow(i); + const TupleRow* current_row = batch->GetRow(i); string key; - GetHashTblKey(dynamic_partition_key_expr_ctxs_, &key); - PartitionPair* partition_pair = NULL; - RETURN_IF_ERROR(GetOutputPartition(state, key, &partition_pair, false)); + GetHashTblKey(current_row, dynamic_partition_key_expr_ctxs_, &key); + PartitionPair* partition_pair = nullptr; + RETURN_IF_ERROR( + GetOutputPartition(state, current_row, key, &partition_pair, false)); partition_pair->second.push_back(i); } - for (PartitionMap::iterator partition = partition_keys_to_output_partitions_.begin(); - partition != partition_keys_to_output_partitions_.end(); ++partition) { - OutputPartition* output_partition = partition->second.first; - if (partition->second.second.empty()) continue; - - bool new_file; - do { - RETURN_IF_ERROR(output_partition->writer->AppendRowBatch( - batch, partition->second.second, &new_file)); - if (new_file) { - RETURN_IF_ERROR(FinalizePartitionFile(state, output_partition)); - RETURN_IF_ERROR(CreateNewTmpFile(state, output_partition)); - } - } while (new_file); - partition->second.second.clear(); + for (PartitionMap::value_type& partition : partition_keys_to_output_partitions_) { + if (!partition.second.second.empty()) { + RETURN_IF_ERROR(WriteRowsToPartition(state, batch, &partition.second)); + } } } return Status::OK(); @@ -579,11 +647,11 @@ Status HdfsTableSink::Send(RuntimeState* state, RowBatch* batch) { Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state, OutputPartition* partition) { - if (partition->tmp_hdfs_file == NULL && !overwrite_) return Status::OK(); + if (partition->tmp_hdfs_file == nullptr && !overwrite_) return Status::OK(); SCOPED_TIMER(ADD_TIMER(profile(), "FinalizePartitionFileTimer")); - // OutputPartition writer could be NULL if there is no row to output. - if (partition->writer.get() != NULL) { + // OutputPartition writer could be nullptr if there is no row to output. 
+ if (partition->writer.get() != nullptr) { RETURN_IF_ERROR(partition->writer->Finalize()); // Track total number of appended rows per partition in runtime @@ -604,10 +672,10 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state, Status HdfsTableSink::ClosePartitionFile( RuntimeState* state, OutputPartition* partition) { - if (partition->tmp_hdfs_file == NULL) return Status::OK(); + if (partition->tmp_hdfs_file == nullptr) return Status::OK(); int hdfs_ret = hdfsCloseFile(partition->hdfs_connection, partition->tmp_hdfs_file); VLOG_FILE << "hdfsCloseFile() file=" << partition->current_file_name; - partition->tmp_hdfs_file = NULL; + partition->tmp_hdfs_file = nullptr; ImpaladMetrics::NUM_FILES_OPEN_FOR_INSERT->Increment(-1); if (hdfs_ret != 0) { return Status(ErrorMsg(TErrorCode::GENERAL, @@ -625,7 +693,7 @@ Status HdfsTableSink::FlushFinal(RuntimeState* state) { // Make sure we create an output partition even if the input is empty because we need // it to delete the existing data for 'insert overwrite'. PartitionPair* dummy; - RETURN_IF_ERROR(GetOutputPartition(state, ROOT_PARTITION_KEY, &dummy, true)); + RETURN_IF_ERROR(GetOutputPartition(state, nullptr, ROOT_PARTITION_KEY, &dummy, true)); } // Close Hdfs files, and update stats in runtime state. @@ -646,7 +714,7 @@ void HdfsTableSink::Close(RuntimeState* state) { partition_keys_to_output_partitions_.begin(); cur_partition != partition_keys_to_output_partitions_.end(); ++cur_partition) { - if (cur_partition->second.first->writer.get() != NULL) { + if (cur_partition->second.first->writer.get() != nullptr) { cur_partition->second.first->writer->Close(); } Status close_status = ClosePartitionFile(state, cur_partition->second.first); diff --git a/be/src/exec/hdfs-table-sink.h b/be/src/exec/hdfs-table-sink.h index ae3227029..22b6a44ed 100644 --- a/be/src/exec/hdfs-table-sink.h +++ b/be/src/exec/hdfs-table-sink.h @@ -165,10 +165,11 @@ class HdfsTableSink : public DataSink { private: /// Initialises the filenames of a given output partition, and opens the temporary file. - /// If the partition will not have any rows added to it, empty_partition must be true. + /// The partition key is derived from 'row'. If the partition will not have any rows + /// added to it, empty_partition must be true. Status InitOutputPartition(RuntimeState* state, - const HdfsPartitionDescriptor& partition_descriptor, - OutputPartition* output_partition, bool empty_partition); + const HdfsPartitionDescriptor& partition_descriptor, const TupleRow* row, + OutputPartition* output_partition, bool empty_partition); /// Add a temporary file to an output partition. Files are created in a /// temporary directory and then moved to the real partition directory by the @@ -177,24 +178,23 @@ class HdfsTableSink : public DataSink { /// If this function fails, the tmp file is cleaned up. Status CreateNewTmpFile(RuntimeState* state, OutputPartition* output_partition); - /// Key is the concatenation of the evaluated - /// dynamic_partition_key_exprs_ generated by GetHashTblKey(). - /// Maps to an OutputPartition, which are owned by the object pool and - /// a vector of rows to insert into this partition from the current row batch. + /// Key is the concatenation of the evaluated dynamic_partition_key_exprs_ generated by + /// GetHashTblKey(). Maps to an OutputPartition, which are owned by the object pool, and + /// a vector of indices of the rows in the current batch to insert into the partition. 
typedef std::pair> PartitionPair; typedef boost::unordered_map PartitionMap; - - /// Generates string key for hash_tbl_ as a concatenation - /// of all evaluated exprs, evaluated against current_row_. - /// The generated string is much shorter than the full Hdfs file name. - void GetHashTblKey(const std::vector& ctxs, std::string* key); + /// Generates string key for hash_tbl_ as a concatenation of all evaluated exprs, + /// evaluated against 'row'. The generated string is much shorter than the full Hdfs + /// file name. + void GetHashTblKey( + const TupleRow* row, const std::vector& ctxs, std::string* key); /// Given a hashed partition key, get the output partition structure from - /// the partition_keys_to_output_partitions_. - /// no_more_rows indicates that no more rows will be added to the partition. - Status GetOutputPartition(RuntimeState* state, const std::string& key, - PartitionPair** partition_pair, bool no_more_rows); + /// the 'partition_keys_to_output_partitions_'. 'no_more_rows' indicates that no more + /// rows will be added to the partition. + Status GetOutputPartition(RuntimeState* state, const TupleRow* row, + const std::string& key, PartitionPair** partition_pair, bool no_more_rows); /// Initialise and prepare select and partition key expressions Status PrepareExprs(RuntimeState* state); @@ -206,6 +206,15 @@ class HdfsTableSink : public DataSink { void BuildHdfsFileNames(const HdfsPartitionDescriptor& partition_descriptor, OutputPartition* output); + /// Writes all rows referenced by the row index vector in 'partition_pair' to the + /// partition's writer and clears the row index vector afterwards. + Status WriteRowsToPartition( + RuntimeState* state, RowBatch* batch, PartitionPair* partition_pair); + + /// Maps all rows in 'batch' to partitions and appends them to their temporary Hdfs + /// files. The input must be ordered by the partition key expressions. + Status WriteClusteredRowBatch(RuntimeState* state, RowBatch* batch); + /// Updates runtime stats of HDFS with rows written, then closes the file associated /// with the partition by calling ClosePartitionFile() Status FinalizePartitionFile(RuntimeState* state, OutputPartition* partition); @@ -230,9 +239,6 @@ class HdfsTableSink : public DataSink { /// Exprs that materialize output values std::vector output_expr_ctxs_; - /// Current row from the current RowBatch to output - TupleRow* current_row_; - /// Table id resolved in Prepare() to set tuple_desc_; TableId table_id_; @@ -255,6 +261,18 @@ class HdfsTableSink : public DataSink { /// Indicates whether the existing partitions should be overwritten. bool overwrite_; + /// Indicates whether the input is ordered by the partition keys, meaning partitions can + /// be opened, written, and closed one by one. + bool input_is_clustered_; + + /// Stores the current partition during clustered inserts across subsequent row batches. + /// Only set if 'input_is_clustered_' is true. + PartitionPair* current_clustered_partition_; + + /// Stores the current partition key during clustered inserts across subsequent row + /// batches. Only set if 'input_is_clustered_' is true. + std::string current_clustered_partition_key_; + /// The directory in which to write intermediate results. 
Set to /// /_impala_insert_staging/ during Prepare() std::string staging_dir_; diff --git a/be/src/exec/hdfs-table-writer.cc b/be/src/exec/hdfs-table-writer.cc index 71e9278a7..48349e33a 100644 --- a/be/src/exec/hdfs-table-writer.cc +++ b/be/src/exec/hdfs-table-writer.cc @@ -39,6 +39,7 @@ HdfsTableWriter::HdfsTableWriter(HdfsTableSink* parent, Status HdfsTableWriter::Write(const uint8_t* data, int32_t len) { DCHECK_GE(len, 0); + DCHECK(output_->tmp_hdfs_file != nullptr); int ret = hdfsWrite(output_->hdfs_connection, output_->tmp_hdfs_file, data, len); if (ret == -1) { string error_msg = GetHdfsErrorMsg(""); diff --git a/be/src/exec/hdfs-table-writer.h b/be/src/exec/hdfs-table-writer.h index b3d344f95..8066315fc 100644 --- a/be/src/exec/hdfs-table-writer.h +++ b/be/src/exec/hdfs-table-writer.h @@ -52,7 +52,7 @@ class HdfsTableWriter { /// The sequence of calls to this object are: /// 1. Init() /// 2. InitNewFile() - /// 3. AppendRowBatch() - called repeatedly + /// 3. AppendRows() - called repeatedly /// 4. Finalize() /// For files formats that are splittable (and therefore can be written to an /// arbitrarily large file), 1-4 is called once. @@ -65,17 +65,14 @@ class HdfsTableWriter { /// Called when a new file is started. virtual Status InitNewFile() = 0; - /// Appends the current batch of rows to the partition. If there are multiple - /// partitions then row_group_indices will contain the rows that are for this - /// partition, otherwise all rows in the batch are appended. - /// If the current file is full, the writer stops appending and - /// returns with *new_file == true. A new file will be opened and - /// the same row batch will be passed again. The writer must track how - /// much of the batch it had already processed asking for a new file. - /// Otherwise the writer will return with *newfile == false. - virtual Status AppendRowBatch(RowBatch* batch, - const std::vector& row_group_indices, - bool* new_file) = 0; + /// Appends rows of 'batch' to the partition that are selected via 'row_group_indices', + /// and if the latter is empty, appends every row. + /// If the current file is full, the writer stops appending and returns with + /// *new_file == true. A new file will be opened and the same row batch will be passed + /// again. The writer must track how much of the batch it had already processed asking + /// for a new file. Otherwise the writer will return with *newfile == false. + virtual Status AppendRows( + RowBatch* batch, const std::vector& row_group_indices, bool* new_file) = 0; /// Finalize this partition. The writer needs to finish processing /// all data have written out after the return from this call. 
diff --git a/be/src/exec/hdfs-text-table-writer.cc b/be/src/exec/hdfs-text-table-writer.cc index 053c8210a..cba40325a 100644 --- a/be/src/exec/hdfs-text-table-writer.cc +++ b/be/src/exec/hdfs-text-table-writer.cc @@ -99,9 +99,8 @@ string HdfsTextTableWriter::file_extension() const { return compressor_->file_extension(); } -Status HdfsTextTableWriter::AppendRowBatch(RowBatch* batch, - const vector& row_group_indices, - bool* new_file) { +Status HdfsTextTableWriter::AppendRows( + RowBatch* batch, const vector& row_group_indices, bool* new_file) { int32_t limit; if (row_group_indices.empty()) { limit = batch->num_rows(); diff --git a/be/src/exec/hdfs-text-table-writer.h b/be/src/exec/hdfs-text-table-writer.h index 1d65f111b..2944f2375 100644 --- a/be/src/exec/hdfs-text-table-writer.h +++ b/be/src/exec/hdfs-text-table-writer.h @@ -60,8 +60,8 @@ class HdfsTextTableWriter : public HdfsTableWriter { /// Appends delimited string representation of the rows in the batch to output partition. /// The resulting output is buffered until HDFS_FLUSH_WRITE_SIZE before being written /// to HDFS. - Status AppendRowBatch(RowBatch* current_row, - const std::vector& row_group_indices, bool* new_file); + Status AppendRows(RowBatch* current_row, const std::vector& row_group_indices, + bool* new_file); private: /// Escapes occurrences of field_delim_ and escape_char_ with escape_char_ and diff --git a/bin/impala-config.sh b/bin/impala-config.sh index caae5e977..834f21622 100755 --- a/bin/impala-config.sh +++ b/bin/impala-config.sh @@ -197,6 +197,10 @@ elif [ "${TARGET_FILESYSTEM}" = "isilon" ]; then # isilon manages its own replication. export HDFS_REPLICATION=1 elif [ "${TARGET_FILESYSTEM}" = "local" ]; then + if [[ "${WAREHOUSE_LOCATION_PREFIX}" = "" ]]; then + echo "WAREHOUSE_LOCATION_PREFIX cannot be an empty string for local filesystem" + return 1 + fi if [ ! -d "${WAREHOUSE_LOCATION_PREFIX}" ]; then echo "'$WAREHOUSE_LOCATION_PREFIX' is not a directory on the local filesystem." return 1 diff --git a/common/thrift/DataSinks.thrift b/common/thrift/DataSinks.thrift index 2a5730400..0b136b2bc 100644 --- a/common/thrift/DataSinks.thrift +++ b/common/thrift/DataSinks.thrift @@ -62,10 +62,14 @@ struct THdfsTableSink { 1: required list partition_key_exprs 2: required bool overwrite - /// The 'skip.header.line.count' property of the target Hdfs table. We will insert this - /// many empty lines at the beginning of new text files, which will be skipped by the - /// scanners while reading from the files. + // The 'skip.header.line.count' property of the target Hdfs table. We will insert this + // many empty lines at the beginning of new text files, which will be skipped by the + // scanners while reading from the files. 3: optional i32 skip_header_line_count + + // This property indicates to the table sink whether the input is ordered by the + // partition keys, meaning partitions can be opened, written, and closed one by one. + 4: required bool input_is_clustered } // Structure to encapsulate specific options that are passed down to the KuduTableSink @@ -73,7 +77,10 @@ struct TKuduTableSink { // The position in this vector is equal to the position in the output expressions of the // sink and holds the index of the corresponsding column in the Kudu schema, // e.g. 
'exprs[i]' references 'kudu_table.column(referenced_cols[i])' - 1: optional list referenced_columns; + 1: optional list referenced_columns + + // Defines if duplicate or not found keys should be ignored + 2: optional bool ignore_not_found_or_duplicate } // Sink to create the build side of a JoinNode. diff --git a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java index 9d84baaa3..2f7f6709b 100644 --- a/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/DeleteStmt.java @@ -56,7 +56,7 @@ public class DeleteStmt extends ModifyStmt { // analyze() must have been called before. Preconditions.checkState(table_ != null); TableSink tableSink = TableSink.create(table_, TableSink.Op.DELETE, - ImmutableList.of(), referencedColumns_, false); + ImmutableList.of(), referencedColumns_, false, false); Preconditions.checkState(!referencedColumns_.isEmpty()); return tableSink; } diff --git a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java index 87f7cefb5..5528da9ea 100644 --- a/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/InsertStmt.java @@ -793,7 +793,7 @@ public class InsertStmt extends StatementBase { // analyze() must have been called before. Preconditions.checkState(table_ != null); return TableSink.create(table_, isUpsert_ ? TableSink.Op.UPSERT : TableSink.Op.INSERT, - partitionKeyExprs_, mentionedColumns_, overwrite_); + partitionKeyExprs_, mentionedColumns_, overwrite_, hasClusteredHint_); } /** diff --git a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java index 9a1bc9eab..de74bd8c8 100644 --- a/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java +++ b/fe/src/main/java/org/apache/impala/analysis/UpdateStmt.java @@ -65,7 +65,7 @@ public class UpdateStmt extends ModifyStmt { // analyze() must have been called before. Preconditions.checkState(table_ != null); DataSink dataSink = TableSink.create(table_, TableSink.Op.UPDATE, - ImmutableList.of(), referencedColumns_, false); + ImmutableList.of(), referencedColumns_, false, false); Preconditions.checkState(!referencedColumns_.isEmpty()); return dataSink; } diff --git a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java index bb06b5eb6..fc7f9b126 100644 --- a/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/HdfsTableSink.java @@ -46,12 +46,17 @@ public class HdfsTableSink extends TableSink { // Whether to overwrite the existing partition(s). protected final boolean overwrite_; + // Indicates whether the input is ordered by the partition keys, meaning partitions can + // be opened, written, and closed one by one. 
+ protected final boolean inputIsClustered_; + public HdfsTableSink(Table targetTable, List partitionKeyExprs, - boolean overwrite) { + boolean overwrite, boolean inputIsClustered) { super(targetTable, Op.INSERT); Preconditions.checkState(targetTable instanceof HdfsTable); partitionKeyExprs_ = partitionKeyExprs; overwrite_ = overwrite; + inputIsClustered_ = inputIsClustered; } @Override @@ -140,7 +145,7 @@ public class HdfsTableSink extends TableSink { protected TDataSink toThrift() { TDataSink result = new TDataSink(TDataSinkType.TABLE_SINK); THdfsTableSink hdfsTableSink = new THdfsTableSink( - Expr.treesToThrift(partitionKeyExprs_), overwrite_); + Expr.treesToThrift(partitionKeyExprs_), overwrite_, inputIsClustered_); HdfsTable table = (HdfsTable) targetTable_; StringBuilder error = new StringBuilder(); int skipHeaderLineCount = table.parseSkipHeaderLineCount(error); diff --git a/fe/src/main/java/org/apache/impala/planner/TableSink.java b/fe/src/main/java/org/apache/impala/planner/TableSink.java index 742e6c93a..fb3cea259 100644 --- a/fe/src/main/java/org/apache/impala/planner/TableSink.java +++ b/fe/src/main/java/org/apache/impala/planner/TableSink.java @@ -90,13 +90,13 @@ public abstract class TableSink extends DataSink { */ public static TableSink create(Table table, Op sinkAction, List partitionKeyExprs, List referencedColumns, - boolean overwrite) { + boolean overwrite, boolean inputIsClustered) { if (table instanceof HdfsTable) { // Hdfs only supports inserts. Preconditions.checkState(sinkAction == Op.INSERT); // Referenced columns don't make sense for an Hdfs table. Preconditions.checkState(referencedColumns.isEmpty()); - return new HdfsTableSink(table, partitionKeyExprs, overwrite); + return new HdfsTableSink(table, partitionKeyExprs, overwrite, inputIsClustered); } else if (table instanceof HBaseTable) { // HBase only supports inserts. Preconditions.checkState(sinkAction == Op.INSERT); diff --git a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java index cacdd838f..11f6842d9 100644 --- a/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java +++ b/fe/src/test/java/org/apache/impala/analysis/AnalyzeStmtsTest.java @@ -1750,6 +1750,22 @@ public class AnalyzeStmtsTest extends AnalyzerTest { "partition (year, month) /* +shuffle,noshuffle */ " + "select * from functional.alltypes", "Conflicting INSERT hints: shuffle and noshuffle"); + + // Test clustered hint. + AnalyzesOk(String.format( + "insert into functional.alltypessmall partition (year, month) %sclustered%s " + + "select * from functional.alltypes", prefix, suffix)); + AnalyzesOk(String.format( + "insert into table functional.alltypesnopart %sclustered%s " + + "select * from functional.alltypesnopart", prefix, suffix)); + AnalyzesOk(String.format( + "insert into table functional.alltypesnopart %snoclustered%s " + + "select * from functional.alltypesnopart", prefix, suffix)); + // Conflicting clustered hints. + AnalysisError(String.format( + "insert into table functional.alltypessmall partition (year, month) " + + "/* +clustered,noclustered */ select * from functional.alltypes", prefix, + suffix), "Conflicting INSERT hints: clustered and noclustered"); } // Multiple non-conflicting hints and case insensitivity of hints. 
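The inputIsClustered_ flag plumbed through the planner above only tells the backend that rows arrive ordered by the partition key exprs; the payoff is that the sink can keep a single partition open at a time and close it as soon as the key changes. A rough sketch of that idea follows, with invented names and a key-ordered list of (partition key, row id) pairs standing in for a clustered row stream; it is not the HdfsTableSink code path.

#include <iostream>
#include <string>
#include <utility>
#include <vector>

// Walks a key-ordered row stream and finalizes each partition as soon as the
// key changes, so at most one partition is ever open. Illustrative only.
void WriteClustered(const std::vector<std::pair<std::string, int>>& rows) {
  std::string current_key;
  bool partition_open = false;
  for (const auto& row : rows) {
    if (!partition_open || row.first != current_key) {
      if (partition_open) {
        std::cout << "finalize + close partition " << current_key << "\n";
      }
      current_key = row.first;
      partition_open = true;
      std::cout << "open partition " << current_key << "\n";
    }
    std::cout << "  append row " << row.second << "\n";
  }
  if (partition_open) {
    std::cout << "finalize + close partition " << current_key << "\n";
  }
}

int main() {
  // Input must already be ordered by the partition key, as the 'clustered'
  // insert hint guarantees.
  WriteClustered({{"year=2009/month=1", 0}, {"year=2009/month=1", 1},
                  {"year=2009/month=2", 2}, {"year=2009/month=3", 3}});
  return 0;
}

Closing each partition eagerly is what keeps the number of open files (and buffered writer state) bounded by one partition rather than by the total number of partitions touched by the insert, which is also why the tests below expect a single file per partition for clustered inserts.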
diff --git a/testdata/workloads/functional-query/queries/QueryTest/insert.test b/testdata/workloads/functional-query/queries/QueryTest/insert.test index b0c5c7cf3..5601b3513 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/insert.test +++ b/testdata/workloads/functional-query/queries/QueryTest/insert.test @@ -861,14 +861,87 @@ Memory limit exceeded ---- QUERY # IMPALA-2521: clustered insert into table insert into table alltypesinsert -partition (year, month) /*+ clustered */ -select * from functional.alltypessmall; +partition (year, month) /*+ clustered,shuffle */ +select * from alltypes; ---- SETUP DROP PARTITIONS alltypesinsert RESET alltypesinsert ---- RESULTS -year=2009/month=1/: 25 -year=2009/month=2/: 25 -year=2009/month=3/: 25 -year=2009/month=4/: 25 +year=2009/month=1/: 310 +year=2009/month=10/: 310 +year=2009/month=11/: 300 +year=2009/month=12/: 310 +year=2009/month=2/: 280 +year=2009/month=3/: 310 +year=2009/month=4/: 300 +year=2009/month=5/: 310 +year=2009/month=6/: 300 +year=2009/month=7/: 310 +year=2009/month=8/: 310 +year=2009/month=9/: 300 +year=2010/month=1/: 310 +year=2010/month=10/: 310 +year=2010/month=11/: 300 +year=2010/month=12/: 310 +year=2010/month=2/: 280 +year=2010/month=3/: 310 +year=2010/month=4/: 300 +year=2010/month=5/: 310 +year=2010/month=6/: 300 +year=2010/month=7/: 310 +year=2010/month=8/: 310 +year=2010/month=9/: 300 +==== +---- QUERY +# IMPALA-2521: clustered insert into table +insert into table alltypesinsert +partition (year, month) /*+ clustered,shuffle */ +select * from alltypestiny; +---- SETUP +DROP PARTITIONS alltypesinsert +RESET alltypesinsert +---- RESULTS +year=2009/month=1/: 2 +year=2009/month=2/: 2 +year=2009/month=3/: 2 +year=2009/month=4/: 2 +==== +---- QUERY +# IMPALA-2521: clustered insert into table +insert into table alltypesinsert +partition (year, month) /*+ clustered,noshuffle */ +select * from alltypestiny; +---- SETUP +DROP PARTITIONS alltypesinsert +RESET alltypesinsert +---- RESULTS +year=2009/month=1/: 2 +year=2009/month=2/: 2 +year=2009/month=3/: 2 +year=2009/month=4/: 2 +==== +---- QUERY +# IMPALA-2521: clustered insert into table +insert into table alltypesinsert +partition (year, month) /*+ clustered,shuffle */ +select * from alltypestiny where int_col = 0; +---- SETUP +DROP PARTITIONS alltypesinsert +RESET alltypesinsert +---- RESULTS +year=2009/month=1/: 1 +year=2009/month=2/: 1 +year=2009/month=3/: 1 +year=2009/month=4/: 1 +==== +---- QUERY +# IMPALA-2521: clustered, unpartitioned insert into table +insert into table alltypesnopart_insert + /*+ clustered,shuffle */ +select id, bool_col, tinyint_col, smallint_col, int_col, bigint_col, float_col, +double_col, date_string_col, string_col, timestamp_col from alltypessmall; +---- SETUP +RESET alltypesnopart_insert +---- RESULTS +: 100 ==== diff --git a/tests/query_test/test_insert.py b/tests/query_test/test_insert.py index e5e74ed5e..d1950508e 100644 --- a/tests/query_test/test_insert.py +++ b/tests/query_test/test_insert.py @@ -55,7 +55,7 @@ class TestInsertQueries(ImpalaTestSuite): cls.TestMatrix.add_dimension(create_uncompressed_text_dimension(cls.get_workload())) else: cls.TestMatrix.add_dimension(create_exec_option_dimension( - cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0], + cluster_sizes=[0], disable_codegen_options=[False], batch_sizes=[0, 1, 16], sync_ddl=[0, 1])) cls.TestMatrix.add_dimension(TestDimension("compression_codec", *PARQUET_CODECS)); # Insert is currently only supported for text and parquet @@ -68,6 +68,12 @@ class 
TestInsertQueries(ImpalaTestSuite): v.get_value('compression_codec') == 'none')) cls.TestMatrix.add_constraint(lambda v:\ v.get_value('table_format').compression_codec == 'none') + # Only test other batch sizes for uncompressed parquet to keep the execution time + # within reasonable bounds. + cls.TestMatrix.add_constraint(lambda v:\ + v.get_value('exec_option')['batch_size'] == 0 or \ + (v.get_value('table_format').file_format == 'parquet' and \ + v.get_value('compression_codec') == 'none')) def test_insert_large_string(self, vector, unique_database): """Test handling of large strings in inserter and scanner.""" @@ -103,7 +109,7 @@ class TestInsertQueries(ImpalaTestSuite): super(TestInsertQueries, cls).setup_class() @pytest.mark.execute_serially - def test_insert(self, vector): + def test_insert_test(self, vector): if (vector.get_value('table_format').file_format == 'parquet'): vector.get_value('exec_option')['COMPRESSION_CODEC'] = \ vector.get_value('compression_codec') diff --git a/tests/query_test/test_insert_behaviour.py b/tests/query_test/test_insert_behaviour.py index 53c9dbfed..158434350 100644 --- a/tests/query_test/test_insert_behaviour.py +++ b/tests/query_test/test_insert_behaviour.py @@ -23,11 +23,14 @@ import pytest from tests.common.impala_test_suite import ImpalaTestSuite from tests.common.parametrize import UniqueDatabase from tests.common.skip import SkipIfS3, SkipIfIsilon, SkipIfLocal -from tests.util.filesystem_utils import WAREHOUSE, get_fs_path +from tests.util.filesystem_utils import WAREHOUSE, get_fs_path, IS_S3 @SkipIfLocal.hdfs_client class TestInsertBehaviour(ImpalaTestSuite): """Tests for INSERT behaviour that isn't covered by checking query results""" + @classmethod + def get_workload(self): + return 'functional-query' TEST_DB_NAME = "insert_empty_result_db" @@ -473,3 +476,106 @@ class TestInsertBehaviour(ImpalaTestSuite): "other::---".format(groups[0])) self.execute_query_expect_success(self.client, "REFRESH " + table) self.execute_query_expect_failure(self.client, insert_query) + + def test_clustered_partition_single_file(self, unique_database): + """IMPALA-2523: Tests that clustered insert creates one file per partition, even when + inserting over multiple row batches.""" + # On s3 this test takes about 220 seconds and we are unlikely to break it, so only run + # it in exhaustive strategy. + if self.exploration_strategy() != 'exhaustive' and IS_S3: + pytest.skip("only runs in exhaustive") + table = "{0}.insert_clustered".format(unique_database) + table_path = "test-warehouse/{0}.db/insert_clustered".format(unique_database) + table_location = get_fs_path("/" + table_path) + + create_stmt = """create table {0} like functional.alltypes""".format(table) + self.execute_query_expect_success(self.client, create_stmt) + + set_location_stmt = """alter table {0} set location '{1}'""".format( + table, table_location) + self.execute_query_expect_success(self.client, set_location_stmt) + + # Setting a lower batch size will result in multiple row batches being written. + self.execute_query_expect_success(self.client, "set batch_size=10") + + insert_stmt = """insert into {0} partition(year, month) /*+ clustered,shuffle */ + select * from functional.alltypes""".format(table) + self.execute_query_expect_success(self.client, insert_stmt) + + # We expect exactly one partition per year and month, since subsequent row batches of + # a partition will be written into the same file. 
+ expected_partitions = \ + ["year=%s/month=%s" % (y, m) for y in [2009, 2010] for m in range(1,13)] + + for partition in expected_partitions: + partition_path = "{0}/{1}".format(table_path, partition) + files = self.filesystem_client.ls(partition_path) + assert len(files) == 1, "%s: %s" % (partition, files) + + def test_clustered_partition_multiple_files(self, unique_database): + """IMPALA-2523: Tests that clustered insert creates the right number of files per + partition when inserting over multiple row batches.""" + # This test takes about 30 seconds and we are unlikely to break it, so only run it in + # exhaustive strategy. + if self.exploration_strategy() != 'exhaustive': + pytest.skip("only runs in exhaustive") + table = "{0}.insert_clustered".format(unique_database) + table_path = "test-warehouse/{0}.db/insert_clustered".format(unique_database) + table_location = get_fs_path("/" + table_path) + + create_stmt = """create table {0} ( + l_orderkey BIGINT, + l_partkey BIGINT, + l_suppkey BIGINT, + l_linenumber INT, + l_quantity DECIMAL(12,2), + l_extendedprice DECIMAL(12,2), + l_discount DECIMAL(12,2), + l_tax DECIMAL(12,2), + l_linestatus STRING, + l_shipdate STRING, + l_commitdate STRING, + l_receiptdate STRING, + l_shipinstruct STRING, + l_shipmode STRING, + l_comment STRING) + partitioned by (l_returnflag STRING) stored as parquet + """.format(table) + self.execute_query_expect_success(self.client, create_stmt) + + set_location_stmt = """alter table {0} set location '{1}'""".format( + table, table_location) + self.execute_query_expect_success(self.client, set_location_stmt) + + # Setting a lower parquet file size will result in multiple files being written. + self.execute_query_expect_success(self.client, "set parquet_file_size=10485760") + + insert_stmt = """insert into {0} partition(l_returnflag) /*+ clustered,shuffle */ + select l_orderkey, + l_partkey, + l_suppkey, + l_linenumber, + l_quantity, + l_extendedprice, + l_discount, + l_tax, + l_linestatus, + l_shipdate, + l_commitdate, + l_receiptdate, + l_shipinstruct, + l_shipmode, + l_comment, + l_returnflag + from tpch_parquet.lineitem""".format(table) + self.execute_query_expect_success(self.client, insert_stmt) + + expected_partition_files = [("l_returnflag=A", 3, 30), + ("l_returnflag=N", 3, 30), + ("l_returnflag=R", 3, 30)] + + for partition, min_files, max_files in expected_partition_files: + partition_path = "{0}/{1}".format(table_path, partition) + files = self.filesystem_client.ls(partition_path) + assert min_files <= len(files) and len(files) <= max_files, \ + "%s: %s" % (partition, files)
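As background for how rows are grouped into partitions in the backend changes above, here is a hedged sketch of the GetHashTblKey() idea, with invented names and already-evaluated key values passed in as strings (the real function evaluates expr contexts against a row): each value is printed and followed by '/' so that, for example, ("ab", "c") and ("a", "bc") cannot produce the same key.

#include <iostream>
#include <sstream>
#include <string>
#include <vector>

// Builds a short lookup key from already-evaluated partition key values, in the
// spirit of HdfsTableSink::GetHashTblKey(): each value gets a trailing '/' to
// avoid accidental collisions. Illustrative only, not the Impala implementation.
std::string MakePartitionKey(const std::vector<std::string>& values) {
  std::stringstream key;
  for (const std::string& v : values) key << v << "/";
  return key.str();
}

int main() {
  std::cout << MakePartitionKey({"2009", "1"}) << "\n";  // "2009/1/"
  std::cout << MakePartitionKey({"ab", "c"}) << "\n";    // "ab/c/"
  std::cout << MakePartitionKey({"a", "bc"}) << "\n";    // "a/bc/"
  return 0;
}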