From 99ed6dc67ae889eb2a45b10c97cb23f52bc83e5d Mon Sep 17 00:00:00 2001
From: Matthew Jacobs
Date: Wed, 19 Oct 2016 15:30:58 -0700
Subject: [PATCH] IMPALA-4134,IMPALA-3704: Kudu INSERT improvements

1.) IMPALA-4134: Use Kudu AUTO FLUSH
Improves performance of writes to Kudu by up to 4.2x in bulk data
loading tests (load 200 million rows from lineitem).

2.) IMPALA-3704: Improve errors on PK conflicts
The Kudu client reports an error for every PK conflict, and all
errors were being returned in the error status. As a result,
inserts/updates/deletes could return error statuses with thousands
of errors reported. This changes the error handling to log all
reported errors as warnings and return only the first error in the
query error status.

3.) Improve the DataSink reporting of the insert stats.
The per-partition stats returned by the data sink weren't useful for
Kudu sinks. Firstly, the number of appended rows was not being
displayed in the profile. Secondly, the 'stats' field isn't populated
for Kudu tables and thus was confusing in the profile, so it is no
longer printed if it is not set in the thrift struct.

Testing: Ran local tests, including new tests to verify the query
profile insert stats. The AUTO FLUSH functionality was tested manually
on a cluster, and that testing informed the default mutation buffer
size of 100MB, which was found to provide good results.

Change-Id: I5542b9a061b01c543a139e8722560b1365f06595
Reviewed-on: http://gerrit.cloudera.org:8080/4728
Reviewed-by: Matthew Jacobs
Tested-by: Internal Jenkins
---
 be/src/exec/data-sink.cc                   |   5 +
 be/src/exec/hbase-table-sink.cc            |   4 +-
 be/src/exec/hdfs-table-sink.cc             |   4 +-
 be/src/exec/kudu-table-sink.cc             | 141 ++++++++++++------
 be/src/exec/kudu-table-sink.h              |  53 ++++---
 be/src/runtime/coordinator.cc              |  14 +-
 be/src/service/impala-beeswax-server.cc    |   4 +-
 be/src/service/query-exec-state.cc         |   2 +-
 common/thrift/ImpalaInternalService.thrift |   4 +-
 common/thrift/ImpalaService.thrift         |   8 +-
 common/thrift/generate_error_codes.py      |   6 +
 shell/impala_client.py                     |   2 +-
 .../queries/QueryTest/kudu_crud.test       |  94 +++++++++++-
 tests/beeswax/impala_beeswax.py            |   4 +-
 14 files changed, 248 insertions(+), 97 deletions(-)

diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index c95b854e4..6a3454355 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -142,6 +142,11 @@ string DataSink::OutputInsertStats(const PartitionStatusMap& stats,
     } else {
       ss << partition_key << endl;
     }
+    if (val.second.__isset.num_modified_rows) {
+      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+    }
+
+    if (!val.second.__isset.stats) continue;
     const TInsertStats& stats = val.second.stats;
     ss << indent << "BytesWritten: "
        << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);

diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index e6052cc5f..3d84fed79 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -72,7 +72,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
   // Add a 'root partition' status in which to collect insert statistics
   TInsertPartitionStatus root_status;
-  root_status.__set_num_appended_rows(0L);
+  root_status.__set_num_modified_rows(0L);
   root_status.__set_stats(TInsertStats());
   root_status.__set_id(-1L);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
@@ -90,7 +90,7 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
  RETURN_IF_ERROR(hbase_table_writer_->AppendRowBatch(batch));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
       batch->num_rows();
   return Status::OK();
 }

diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 77316d4a9..63bd64878 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -499,7 +499,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
   DCHECK(state->per_partition_status()->find(partition->partition_name) ==
       state->per_partition_status()->end());
   TInsertPartitionStatus partition_status;
-  partition_status.__set_num_appended_rows(0L);
+  partition_status.__set_num_modified_rows(0L);
   partition_status.__set_id(partition_descriptor->id());
   partition_status.__set_stats(TInsertStats());
   partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
@@ -594,7 +594,7 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
   // Should have been created in GetOutputPartition() when the partition was
   // initialised.
   DCHECK(it != state->per_partition_status()->end());
-  it->second.num_appended_rows += partition->num_rows;
+  it->second.num_modified_rows += partition->num_rows;
   DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats);
 }

diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 661489f79..70a74a97a 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -33,6 +33,8 @@
 DEFINE_int32(kudu_session_timeout_seconds, 60, "Timeout set on the Kudu session. "
     "How long to wait before considering a write failed.");
+DEFINE_int32(kudu_mutation_buffer_size, 100 * 1024 * 1024, "The size (bytes) of the "
+    "Kudu client buffer for mutations.");
 
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
@@ -48,6 +50,9 @@ namespace impala {
 
 const static string& ROOT_PARTITION_KEY =
     g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
 
+// Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
+const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
+
 KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
     const vector<TExpr>& select_list_texprs,
     const TDataSink& tsink)
@@ -56,8 +61,6 @@ KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
       select_list_texprs_(select_list_texprs),
       sink_action_(tsink.table_sink.action),
       kudu_table_sink_(tsink.table_sink.kudu_table_sink),
-      kudu_flush_counter_(NULL),
-      kudu_flush_timer_(NULL),
       kudu_error_counter_(NULL),
       rows_written_(NULL),
       rows_written_rate_(NULL) {
@@ -91,16 +94,14 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 
   // Add a 'root partition' status in which to collect write statistics
   TInsertPartitionStatus root_status;
-  root_status.__set_num_appended_rows(0L);
-  root_status.__set_stats(TInsertStats());
+  root_status.__set_num_modified_rows(0L);
   root_status.__set_id(-1L);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
 
   // Add counters
-  kudu_flush_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushOperations", TUnit::UNIT);
   kudu_error_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushErrors", TUnit::UNIT);
-  kudu_flush_timer_ = ADD_TIMER(profile(), "KuduFlushTimer");
   rows_written_ = ADD_COUNTER(profile(), "RowsWritten", TUnit::UNIT);
+  kudu_apply_timer_ = ADD_TIMER(profile(), "KuduApplyTimer");
   rows_written_rate_ = profile()->AddDerivedCounter(
       "RowsWrittenRate", TUnit::UNIT_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_written_,
@@ -124,8 +125,40 @@ Status KuduTableSink::Open(RuntimeState* state) {
 
   session_ = client_->NewSession();
   session_->SetTimeoutMillis(FLAGS_kudu_session_timeout_seconds * 1000);
+
+  // KuduSession Set* methods here and below return a status for API compatibility.
+  // As long as the Kudu client is statically linked, these shouldn't fail, and thus
+  // these calls could also DCHECK that the status is OK for debug builds (while still
+  // returning errors for release).
   KUDU_RETURN_IF_ERROR(session_->SetFlushMode(
-      kudu::client::KuduSession::MANUAL_FLUSH), "Unable to set flush mode");
+      kudu::client::KuduSession::AUTO_FLUSH_BACKGROUND), "Unable to set flush mode");
+
+  const int32_t buf_size = FLAGS_kudu_mutation_buffer_size;
+  if (buf_size < 1024 * 1024) {
+    return Status(strings::Substitute(
+        "Invalid kudu_mutation_buffer_size: '$0'. Must be at least 1MB.", buf_size));
+  }
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferSpace(buf_size),
+      "Couldn't set mutation buffer size");
+
+  // Configure client memory used for buffering.
+  // Internally, the Kudu client keeps one or more buffers for writing operations. When
+  // a single buffer is flushed, it is locked (that space cannot be reused) until all
+  // operations within it complete, so it is important to have several buffers. In our
+  // testing, we found that a total of 100MB of buffer space provides good results; this
+  // is the default. Then, because of some existing 8MB limits in Kudu, we want to have
+  // that total space broken up into 7MB buffers (INDIVIDUAL_BUFFER_SIZE). The mutation
+  // buffer flush watermark is set so that a flush is triggered once
+  // INDIVIDUAL_BUFFER_SIZE bytes have been buffered.
+  int num_buffers = FLAGS_kudu_mutation_buffer_size / INDIVIDUAL_BUFFER_SIZE;
+  if (num_buffers == 0) num_buffers = 1;
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferFlushWatermark(1.0 / num_buffers),
+      "Couldn't set mutation buffer watermark");
+
+  // No limit on the buffer count since the settings above imply a max number of buffers.
+  // Note that the Kudu client API has a few too many knobs for configuring the size and
+  // number of these buffers; there are a few ways to accomplish similar behaviors.
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferMaxNum(0),
+      "Couldn't set mutation buffer count");
   return Status::OK();
 }

@@ -135,7 +168,8 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
   } else if (sink_action_ == TSinkAction::UPDATE) {
     return table_->NewUpdate();
   } else {
-    DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported. " << sink_action_;
+    DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported: "
+        << sink_action_;
     return table_->NewDelete();
   }
 }

@@ -145,11 +179,15 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   ExprContext::FreeLocalAllocations(output_expr_ctxs_);
   RETURN_IF_ERROR(state->CheckQueryState());
 
+  // Collect all write operations and apply them together so the time spent in Apply()
+  // can be easily measured.
+  vector<unique_ptr<kudu::client::KuduWriteOperation>> write_ops;
+
   int rows_added = 0;
   // Since everything is set up just forward everything to the writer.
   for (int i = 0; i < batch->num_rows(); ++i) {
     TupleRow* current_row = batch->GetRow(i);
-    gscoped_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
+    unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
 
     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
       int col = kudu_table_sink_.referenced_columns.empty() ?
@@ -173,7 +211,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
       case TYPE_STRING: {
         StringValue* sv = reinterpret_cast<StringValue*>(value);
         kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
-        KUDU_RETURN_IF_ERROR(write->mutable_row()->SetStringNoCopy(col, slice),
+        KUDU_RETURN_IF_ERROR(write->mutable_row()->SetString(col, slice),
             "Could not add Kudu WriteOp.");
         break;
       }
@@ -216,46 +254,38 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
         return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
       }
     }
-
-    KUDU_RETURN_IF_ERROR(session_->Apply(write.release()),
-        "Error while applying Kudu session.");
-    ++rows_added;
+    write_ops.push_back(move(write));
   }
+
+  {
+    SCOPED_TIMER(kudu_apply_timer_);
+    for (auto&& write: write_ops) {
+      KUDU_RETURN_IF_ERROR(session_->Apply(write.release()), "Error applying Kudu Op.");
+      ++rows_added;
+    }
+  }
+
   COUNTER_ADD(rows_written_, rows_added);
-  int64_t error_count = 0;
-  RETURN_IF_ERROR(Flush(&error_count));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
-      rows_added - error_count;
+  RETURN_IF_ERROR(CheckForErrors(state));
 
   return Status::OK();
 }

-Status KuduTableSink::Flush(int64_t* error_count) {
-  // TODO right now we always flush an entire row batch, if these are small we'll
-  // be inefficient. Consider decoupling impala's batch size from kudu's
-  kudu::Status s;
-  {
-    SCOPED_TIMER(kudu_flush_timer_);
-    COUNTER_ADD(kudu_flush_counter_, 1);
-    s = session_->Flush();
-  }
-  if (LIKELY(s.ok())) return Status::OK();
+Status KuduTableSink::CheckForErrors(RuntimeState* state) {
+  if (session_->CountPendingErrors() == 0) return Status::OK();
 
-  stringstream error_msg_buffer;
   vector<KuduError*> errors;
+  Status status = Status::OK();
 
-  // Check if there are pending errors in the Kudu session. If errors overflowed the error
-  // buffer we can't be sure all errors can be ignored and fail immediately, setting 'failed'
-  // to true.
-  bool failed = false;
-  session_->GetPendingErrors(&errors, &failed);
-  if (UNLIKELY(failed)) {
-    error_msg_buffer << "Error overflow in Kudu session, "
-        << "previous write operation might be inconsistent.\n";
+  // Get the pending errors from the Kudu session. If errors overflowed the error buffer
+  // we can't be sure all errors can be ignored, so an error status will be reported.
+  bool error_overflow = false;
+  session_->GetPendingErrors(&errors, &error_overflow);
+  if (UNLIKELY(error_overflow)) {
+    status = Status("Error overflow in Kudu session.");
   }
 
   // The memory for the errors is manually managed. Iterate over all errors and delete
   // them accordingly.
-  bool first_error = true;
   for (int i = 0; i < errors.size(); ++i) {
     kudu::Status e = errors[i]->status();
     // If the sink has the option "ignore_not_found_or_duplicate" set, duplicate key or
@@ -265,24 +295,39 @@ Status KuduTableSink::CheckForErrors(RuntimeState* state) {
         ((sink_action_ == TSinkAction::DELETE && !e.IsNotFound()) ||
             (sink_action_ == TSinkAction::UPDATE && !e.IsNotFound()) ||
             (sink_action_ == TSinkAction::INSERT && !e.IsAlreadyPresent()))) {
-      if (first_error) {
-        error_msg_buffer << "Error while flushing Kudu session: \n";
-        first_error = false;
+      if (status.ok()) {
+        status = Status(strings::Substitute(
+            "Kudu error(s) reported, first error: $0", e.ToString()));
       }
-      error_msg_buffer << e.ToString() << "\n";
-      failed = true;
+    }
+    if (e.IsNotFound()) {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_NOT_FOUND,
+          table_desc_->table_name()));
+    } else if (e.IsAlreadyPresent()) {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_ALREADY_PRESENT,
+          table_desc_->table_name()));
+    } else {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_SESSION_ERROR,
+          table_desc_->table_name(), e.ToString()));
     }
     delete errors[i];
   }
 
   COUNTER_ADD(kudu_error_counter_, errors.size());
-  if (error_count != NULL) *error_count = errors.size();
-  if (failed) return Status(error_msg_buffer.str());
-  return Status::OK();
+  return status;
 }

 Status KuduTableSink::FlushFinal(RuntimeState* state) {
-  // No buffered state to flush.
-  return Status::OK();
+  kudu::Status flush_status = session_->Flush();
+
+  // Flush() may return an error status, but any errors will also be reported by
+  // CheckForErrors(), so it's safe to ignore it and always call CheckForErrors().
+  if (!flush_status.ok()) {
+    VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
+  }
+  Status status = CheckForErrors(state);
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_num_modified_rows(
+      rows_written_->value() - kudu_error_counter_->value());
+  return status;
 }

 void KuduTableSink::Close(RuntimeState* state) {

diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 2eeb721eb..fe278c5d8 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -30,17 +30,27 @@
 namespace impala {
 
-/// Sink that takes RowBatches and writes them into Kudu.
-/// Currently the data is sent to Kudu on Send(), i.e. the data is batched on the
-/// KuduSession until all the rows in a RowBatch are applied and then the session
-/// is flushed.
+/// Sink that takes RowBatches and writes them into a Kudu table.
 ///
-/// Kudu doesn't have transactions (yet!) so some rows may fail to write while
-/// others are successful. This sink will return an error if any of the rows fails
-/// to be written.
+/// The data is added to Kudu in Send(). The Kudu client is configured to automatically
+/// flush records when enough data has been written (AUTO_FLUSH_BACKGROUND). This
+/// requires specifying a mutation buffer size and a buffer flush watermark percentage
+/// in the Kudu client. The mutation buffer needs to be large enough to buffer rows
+/// sent to all destination nodes, because the buffer accounting is not per tablet
+/// server (KUDU-1693). Tests showed that 100MB was a good default, and this is
+/// configurable via the gflag --kudu_mutation_buffer_size. The buffer flush watermark
+/// percentage is set to a value that results in Kudu flushing after 7MB is in a
+/// buffer for a particular destination (of the 100MB of total mutation buffer space),
+/// because Kudu currently has some 8MB buffer limits.
+///
+/// Kudu doesn't have transactions yet, so some rows may fail to write while others
+/// are successful. The Kudu client reports an error for each failed row. Some errors
+/// are expected when the IGNORE option is specified: key conflicts on INSERT and
+/// missing keys on UPDATE/DELETE. These do not cause the sink to return an error.
+/// Any other reported error, or a key conflict when IGNORE is not specified, causes
+/// the sink to return an error status; the first such error is returned in the sink's
+/// Status. All reported errors (ignored or not) will be logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
   KuduTableSink(const RowDescriptor& row_desc,
@@ -59,7 +69,7 @@ class KuduTableSink : public DataSink {
   /// The KuduSession is flushed on each row batch.
   virtual Status Send(RuntimeState* state, RowBatch* batch);
 
-  /// Does nothing. We currently flush on each Send() call.
+  /// Forces any remaining buffered operations to be flushed to Kudu.
   virtual Status FlushFinal(RuntimeState* state);
 
   /// Closes the KuduSession and the expressions.
@@ -72,12 +82,11 @@ class KuduTableSink : public DataSink {
   /// Create a new write operation according to the sink type.
   kudu::client::KuduWriteOperation* NewWriteOp();
 
-  /// Flushes the Kudu session, making sure all previous operations were committed, and handles
-  /// errors returned from Kudu. Passes the number of errors during the flush operations as an
-  /// out parameter.
-  /// Returns a non-OK status if there was an unrecoverable error. This might return an OK
-  /// status even if 'error_count' is > 0, as some errors might be ignored.
-  Status Flush(int64_t* error_count);
+  /// Checks for any errors buffered in the Kudu session, and increments
+  /// appropriate counters for ignored errors.
+  ///
+  /// Returns a bad Status if there are non-ignorable errors.
+  Status CheckForErrors(RuntimeState* state);
 
   /// Used to get the KuduTableDescriptor from the RuntimeState
   TableId table_id_;
@@ -102,15 +111,15 @@ class KuduTableSink : public DataSink {
   /// Captures parameters passed down from the frontend
   TKuduTableSink kudu_table_sink_;
 
-  /// Counts the number of calls to KuduSession::flush().
-  RuntimeProfile::Counter* kudu_flush_counter_;
-
-  /// Aggregates the times spent in KuduSession:flush().
-  RuntimeProfile::Counter* kudu_flush_timer_;
-
   /// Total number of errors returned from Kudu.
   RuntimeProfile::Counter* kudu_error_counter_;
 
+  /// Time spent applying Kudu operations. In normal circumstances, time in Apply()
+  /// should be negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND
+  /// enabled. Significant time spent in Apply() may indicate that Kudu cannot buffer
+  /// and send rows as fast as the sink can write them.
+  RuntimeProfile::Counter* kudu_apply_timer_;
+
   /// Total number of rows written including errors.
   RuntimeProfile::Counter* rows_written_;
   RuntimeProfile::Counter* rows_written_rate_;

diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4214d4dec..0f41deb1d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1668,11 +1668,15 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
     for (const PartitionStatusMap::value_type& partition:
         params.insert_exec_status.per_partition_status) {
       TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
-      status->num_appended_rows += partition.second.num_appended_rows;
-      status->id = partition.second.id;
-      status->partition_base_dir = partition.second.partition_base_dir;
-      if (!status->__isset.stats) status->__set_stats(TInsertStats());
-      DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+      status->__set_num_modified_rows(
+          status->num_modified_rows + partition.second.num_modified_rows);
+      status->__set_id(partition.second.id);
+      status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+      if (partition.second.__isset.stats) {
+        if (!status->__isset.stats) status->__set_stats(TInsertStats());
+        DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+      }
     }
     files_to_move_.insert(
         params.insert_exec_status.files_to_move.begin(),

diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index b50499e4b..63fa9cd78 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -523,8 +523,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
     for (const PartitionStatusMap::value_type& v:
         exec_state->coord()->per_partition_status()) {
       const pair<string, TInsertPartitionStatus> partition_status = v;
-      insert_result->rows_appended[partition_status.first] =
-          partition_status.second.num_appended_rows;
+      insert_result->rows_modified[partition_status.first] =
+          partition_status.second.num_modified_rows;
     }
   }
 }

diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 69473c55d..3dc9d6c78 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -956,7 +956,7 @@ void ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() {
   if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(coord_.get());
     for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_appended_rows;
+      total_num_rows_inserted += p.second.num_modified_rows;
     }
   }
   const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);

diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 736de3461..e9c3119f6 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -422,8 +422,8 @@ struct TInsertPartitionStatus {
   // query). See THdfsTable.partitions.
  1: optional i64 id
 
-  // The number of rows appended to this partition
-  2: optional i64 num_appended_rows
+  // The number of rows modified in this partition
+  2: optional i64 num_modified_rows
 
   // Detailed statistics gathered by table writers for this partition
   3: optional TInsertStats stats

diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 794c140e0..573709b9d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -247,10 +247,10 @@ enum TImpalaQueryOptions {
 
 // The summary of an insert.
 struct TInsertResult {
-  // Number of appended rows per modified partition. Only applies to HDFS tables.
-  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
-  // root in an unpartitioned table being the empty string.
-  1: required map<string, i64> rows_appended
+  // Number of modified rows per partition. Only applies to HDFS and Kudu tables.
+  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with
+  // the root in an unpartitioned table being the empty string.
+  1: required map<string, i64> rows_modified
 }
 
 // Response from a call to PingImpalaService

diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index ae338d52a..f03b07380 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -292,6 +292,12 @@ error_codes = (
 
   ("NO_REGISTERED_BACKENDS", 94, "Cannot schedule query: no registered backends "
    "available."),
+
+  ("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."),
+
+  ("KUDU_KEY_NOT_FOUND", 96, "Key not found in Kudu table '$0'."),
+
+  ("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1")
 )
 
 import sys

diff --git a/shell/impala_client.py b/shell/impala_client.py
index 0d1c8353d..f57a0154d 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -354,7 +354,7 @@ class ImpalaClient(object):
     if status != RpcStatus.OK:
       raise RPCException()
 
-    num_rows = sum([int(k) for k in insert_result.rows_appended.values()])
+    num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
     return num_rows
 
   def close_query(self, last_query_handle, query_handle_closed=False):

diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index a06d20369..e4f3205f0 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -29,10 +29,15 @@ insert into tdata values
 (3, "todd", cast(1.0 as float), 993393939, cast('c' as VARCHAR(20)), true)
 ---- RESULTS
 : 3
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 3.*
 ====
 ---- QUERY
 update tdata set vali=43 where id = 1
 ---- RESULTS
+# TODO: Verify row count after fixing IMPALA-3713 (Here and UPDATE/DELETE below)
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -48,6 +53,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 # Try updating a varchar col. with a value that is bigger than it's size (truncated).
 update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as VARCHAR(20)) where id = 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -58,10 +65,11 @@ select * from tdata
 ---- TYPES
 INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
-====
 ---- QUERY
 update tdata set valb=false where id = 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -75,6 +83,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set vali=43 where id > 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from tdata
@@ -88,6 +98,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name='unknown' where name = 'martin'
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -104,6 +116,8 @@ insert into tdata values
 (120, "she", cast(0.0 as float), 99, cast('f' as VARCHAR(20)), true)
 ---- RESULTS
 : 2
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from tdata
@@ -119,6 +133,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name=null where id = 40
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -133,6 +149,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
 ---- QUERY
 update tdata set name='he' where id = 40
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ---- RESULTS
 ====
 ---- QUERY
@@ -152,6 +170,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 insert into tdata values (320, '', 2.0, 932, cast('' as VARCHAR(20)), false)
 ---- RESULTS
 : 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select id, name, valv, valb from tdata where id = 320;
@@ -169,6 +189,10 @@ create table ignore_column_case (Id int, NAME string, vAlf float, vali bigint,
 ====
 ---- QUERY
 insert into ignore_column_case values (1, 'Martin', 1.0, 10);
+---- RESULTS
+: 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select ID, nAmE, VALF, VALI from ignore_column_case where NaMe = 'Martin';
@@ -182,36 +206,44 @@ insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- CATCH
-Error while flushing Kudu session:
+Kudu error(s) reported, first error: Already present
 ====
 ---- QUERY
 insert ignore into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 0
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 0.*
 ====
 ---- QUERY
--- Updating the same record twice
 update a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
--- Does not exercise any error path in the sink because updating the same record twice
--- is valid. Makes sure IGNORE works.
+-- Does not exercise any error path in the sink because updating the same record multiple
+-- times is valid. Makes sure IGNORE works.
 update ignore a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
 -- Using a cross join to generate the same delete twice. After the first delete succeeded,
 -- trying to execute the second delete will fail because the record does not exist.
 delete a from tdata a, tdata b where a.id = 666
 ---- CATCH
-Error while flushing Kudu session:
+Kudu error(s) reported, first error: Not found: key not found
 ====
 ---- QUERY
 -- Re-insert the data
 insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 1
 ====
 ---- QUERY
 delete ignore a from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct
@@ -242,6 +276,8 @@ insert into impala_3454 values
 ---- QUERY
 delete from impala_3454 where key_1 < (select max(key_2) from impala_3454)
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from impala_3454
@@ -250,3 +286,49 @@ select * from impala_3454
 ---- TYPES
 TINYINT,BIGINT
 ====
+---- QUERY
+CREATE TABLE kudu_test_tbl PRIMARY KEY(id)
+DISTRIBUTE BY RANGE(id) SPLIT ROWS ((100000000))
+STORED AS KUDU AS
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- RESULTS
+'Inserted 100 row(s)'
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 100.*
+====
+---- QUERY
+INSERT IGNORE INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- RESULTS
+: 0
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 0.*
+====
+---- QUERY
+INSERT INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- CATCH
+Kudu error(s) reported, first error: Already present: key already present
+====
+---- QUERY
+INSERT IGNORE INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes;
+---- RESULTS
+: 7200
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7200.*
+====
+---- QUERY
+# Test a larger UPDATE
+UPDATE kudu_test_tbl SET int_col = -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7300.*
+====
+---- QUERY
+# Test a larger DELETE
+DELETE FROM kudu_test_tbl WHERE id > -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7300.*
+====

diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index e0f5d5512..7ed411e55 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -416,8 +416,8 @@ class ImpalaBeeswaxClient(object):
     """Executes an insert query"""
     result = self.__do_rpc(lambda: self.imp_service.CloseInsert(handle))
     # The insert was successful
-    num_rows = sum(map(int, result.rows_appended.values()))
-    data = ["%s: %s" % row for row in result.rows_appended.iteritems()]
+    num_rows = sum(map(int, result.rows_modified.values()))
+    data = ["%s: %s" % row for row in result.rows_modified.iteritems()]
     exec_result = ImpalaBeeswaxResult(success=True, data=data)
     exec_result.summary = "Inserted %d rows" % (num_rows,)
     return exec_result
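
Editor's note: the following stand-alone sketch (not part of the commit) mirrors the
AUTO_FLUSH_BACKGROUND session setup that KuduTableSink::Open() performs above, for
readers who want to try the buffer arithmetic outside Impala. The master address and
table name are hypothetical placeholders; the 100MB total / 7MB per-flush-unit sizing
echoes the defaults this patch introduces.

// sketch_session_setup.cc: editorial illustration only, assuming a reachable
// Kudu master at "kudu-master:7051" (placeholder, not from the patch).
#include <kudu/client/client.h>

#include <cstdlib>
#include <iostream>

using kudu::client::KuduClient;
using kudu::client::KuduClientBuilder;
using kudu::client::KuduSession;
using kudu::client::sp::shared_ptr;

// Small helper so the sketch does not depend on Kudu's status-check macros.
static void CheckOk(const kudu::Status& s, const char* context) {
  if (!s.ok()) {
    std::cerr << context << ": " << s.ToString() << std::endl;
    std::exit(1);
  }
}

int main() {
  shared_ptr<KuduClient> client;
  CheckOk(KuduClientBuilder()
              .add_master_server_addr("kudu-master:7051")  // assumed address
              .Build(&client),
          "connect to Kudu master");

  shared_ptr<KuduSession> session = client->NewSession();
  session->SetTimeoutMillis(60 * 1000);  // matches kudu_session_timeout_seconds=60

  // Buffer writes client-side and flush them asynchronously in the background.
  CheckOk(session->SetFlushMode(KuduSession::AUTO_FLUSH_BACKGROUND), "set flush mode");

  // 100MB of total buffer space broken into 7MB flush units, as in the patch:
  // 100MB / 7MB = 14 units, so the flush watermark is 1/14 of the total space.
  const size_t total_buffer_size = 100 * 1024 * 1024;
  const size_t individual_buffer_size = 7 * 1024 * 1024;
  int num_buffers = static_cast<int>(total_buffer_size / individual_buffer_size);
  if (num_buffers == 0) num_buffers = 1;

  CheckOk(session->SetMutationBufferSpace(total_buffer_size), "set buffer space");
  CheckOk(session->SetMutationBufferFlushWatermark(1.0 / num_buffers), "set watermark");
  // 0 = no explicit limit on the buffer count; the settings above already bound it.
  CheckOk(session->SetMutationBufferMaxNum(0), "set buffer count");

  std::cout << "session configured with " << num_buffers << " flush units" << std::endl;
  return 0;
}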
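A companion sketch of the error-drain pattern that KuduTableSink::CheckForErrors()
implements: drain the buffered per-row errors, treat key conflicts as ignorable when
IGNORE semantics are requested, surface only the first non-ignorable error, and free the
manually managed KuduError objects. The single 'ignore_key_conflicts' flag is a
simplified stand-in for the sink's per-action ignore_not_found_or_duplicate logic.

// sketch_error_drain.cc: editorial illustration only; 'session' is assumed to be
// configured as in the previous sketch.
#include <kudu/client/client.h>

#include <vector>

using kudu::client::KuduError;
using kudu::client::KuduSession;
using kudu::client::sp::shared_ptr;

kudu::Status DrainPendingErrors(const shared_ptr<KuduSession>& session,
                                bool ignore_key_conflicts) {
  kudu::Status first_error = kudu::Status::OK();
  if (session->CountPendingErrors() == 0) return first_error;

  std::vector<KuduError*> errors;
  bool overflowed = false;
  session->GetPendingErrors(&errors, &overflowed);

  // If the error buffer overflowed, some errors were dropped, so we cannot prove
  // that every dropped error was ignorable; report that as the first error.
  if (overflowed) first_error = kudu::Status::IOError("error overflow in Kudu session");

  for (KuduError* error : errors) {
    const kudu::Status& s = error->status();
    // 'Already present' (INSERT) and 'not found' (UPDATE/DELETE) are the key
    // conflicts the sink ignores when the IGNORE option is set.
    const bool ignorable =
        ignore_key_conflicts && (s.IsAlreadyPresent() || s.IsNotFound());
    if (!ignorable && first_error.ok()) first_error = s;
    delete error;  // the caller owns the KuduError objects
  }
  return first_error;
}

As in the patch, only the first non-ignorable error is surfaced while the rest would be
logged, which keeps the query error status small even when thousands of rows conflict.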