diff --git a/be/src/exec/data-sink.cc b/be/src/exec/data-sink.cc
index c95b854e4..6a3454355 100644
--- a/be/src/exec/data-sink.cc
+++ b/be/src/exec/data-sink.cc
@@ -142,6 +142,11 @@ string DataSink::OutputInsertStats(const PartitionStatusMap& stats,
     } else {
       ss << partition_key << endl;
     }
+    if (val.second.__isset.num_modified_rows) {
+      ss << "NumModifiedRows: " << val.second.num_modified_rows << endl;
+    }
+
+    if (!val.second.__isset.stats) continue;
     const TInsertStats& stats = val.second.stats;
     ss << indent << "BytesWritten: "
        << PrettyPrinter::Print(stats.bytes_written, TUnit::BYTES);
diff --git a/be/src/exec/hbase-table-sink.cc b/be/src/exec/hbase-table-sink.cc
index e6052cc5f..3d84fed79 100644
--- a/be/src/exec/hbase-table-sink.cc
+++ b/be/src/exec/hbase-table-sink.cc
@@ -72,7 +72,7 @@ Status HBaseTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 
   // Add a 'root partition' status in which to collect insert statistics
   TInsertPartitionStatus root_status;
-  root_status.__set_num_appended_rows(0L);
+  root_status.__set_num_modified_rows(0L);
   root_status.__set_stats(TInsertStats());
   root_status.__set_id(-1L);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
@@ -90,7 +90,7 @@ Status HBaseTableSink::Send(RuntimeState* state, RowBatch* batch) {
   RETURN_IF_ERROR(state->CheckQueryState());
   // Since everything is set up just forward everything to the writer.
   RETURN_IF_ERROR(hbase_table_writer_->AppendRowBatch(batch));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_modified_rows +=
       batch->num_rows();
   return Status::OK();
 }
diff --git a/be/src/exec/hdfs-table-sink.cc b/be/src/exec/hdfs-table-sink.cc
index 77316d4a9..63bd64878 100644
--- a/be/src/exec/hdfs-table-sink.cc
+++ b/be/src/exec/hdfs-table-sink.cc
@@ -499,7 +499,7 @@ inline Status HdfsTableSink::GetOutputPartition(RuntimeState* state,
     DCHECK(state->per_partition_status()->find(partition->partition_name) ==
         state->per_partition_status()->end());
     TInsertPartitionStatus partition_status;
-    partition_status.__set_num_appended_rows(0L);
+    partition_status.__set_num_modified_rows(0L);
     partition_status.__set_id(partition_descriptor->id());
     partition_status.__set_stats(TInsertStats());
     partition_status.__set_partition_base_dir(table_desc_->hdfs_base_dir());
@@ -594,7 +594,7 @@ Status HdfsTableSink::FinalizePartitionFile(RuntimeState* state,
   // Should have been created in GetOutputPartition() when the partition was
   // initialised.
   DCHECK(it != state->per_partition_status()->end());
-  it->second.num_appended_rows += partition->num_rows;
+  it->second.num_modified_rows += partition->num_rows;
   DataSink::MergeInsertStats(partition->writer->stats(), &it->second.stats);
 }
 
diff --git a/be/src/exec/kudu-table-sink.cc b/be/src/exec/kudu-table-sink.cc
index 661489f79..70a74a97a 100644
--- a/be/src/exec/kudu-table-sink.cc
+++ b/be/src/exec/kudu-table-sink.cc
@@ -33,6 +33,8 @@
 DEFINE_int32(kudu_session_timeout_seconds, 60, "Timeout set on the Kudu session. "
     "How long to wait before considering a write failed.");
+DEFINE_int32(kudu_mutation_buffer_size, 100 * 1024 * 1024, "The size (bytes) of the "
+    "Kudu client buffer for mutations.");
 
 using kudu::client::KuduColumnSchema;
 using kudu::client::KuduSchema;
@@ -48,6 +50,9 @@ namespace impala {
 const static string& ROOT_PARTITION_KEY =
     g_ImpalaInternalService_constants.ROOT_PARTITION_KEY;
 
+// Send 7MB buffers to Kudu, matching a hard-coded size in Kudu (KUDU-1693).
+const static int INDIVIDUAL_BUFFER_SIZE = 7 * 1024 * 1024;
+
 KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
     const vector<TExpr>& select_list_texprs, const TDataSink& tsink)
@@ -56,8 +61,6 @@ KuduTableSink::KuduTableSink(const RowDescriptor& row_desc,
     select_list_texprs_(select_list_texprs),
     sink_action_(tsink.table_sink.action),
     kudu_table_sink_(tsink.table_sink.kudu_table_sink),
-    kudu_flush_counter_(NULL),
-    kudu_flush_timer_(NULL),
     kudu_error_counter_(NULL),
     rows_written_(NULL),
     rows_written_rate_(NULL) {
@@ -91,16 +94,14 @@ Status KuduTableSink::Prepare(RuntimeState* state, MemTracker* mem_tracker) {
 
   // Add a 'root partition' status in which to collect write statistics
   TInsertPartitionStatus root_status;
-  root_status.__set_num_appended_rows(0L);
-  root_status.__set_stats(TInsertStats());
+  root_status.__set_num_modified_rows(0L);
   root_status.__set_id(-1L);
   state->per_partition_status()->insert(make_pair(ROOT_PARTITION_KEY, root_status));
 
   // Add counters
-  kudu_flush_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushOperations", TUnit::UNIT);
   kudu_error_counter_ = ADD_COUNTER(profile(), "TotalKuduFlushErrors", TUnit::UNIT);
-  kudu_flush_timer_ = ADD_TIMER(profile(), "KuduFlushTimer");
   rows_written_ = ADD_COUNTER(profile(), "RowsWritten", TUnit::UNIT);
+  kudu_apply_timer_ = ADD_TIMER(profile(), "KuduApplyTimer");
   rows_written_rate_ = profile()->AddDerivedCounter(
       "RowsWrittenRate", TUnit::UNIT_PER_SECOND,
       bind<int64_t>(&RuntimeProfile::UnitsPerSecond, rows_written_,
@@ -124,8 +125,40 @@ Status KuduTableSink::Open(RuntimeState* state) {
 
   session_ = client_->NewSession();
   session_->SetTimeoutMillis(FLAGS_kudu_session_timeout_seconds * 1000);
+
+  // KuduSession Set* methods here and below return a status for API compatibility.
+  // As long as the Kudu client is statically linked, these shouldn't fail and thus these
+  // calls could also DCHECK status is OK for debug builds (while still returning errors
+  // for release).
   KUDU_RETURN_IF_ERROR(session_->SetFlushMode(
-      kudu::client::KuduSession::MANUAL_FLUSH), "Unable to set flush mode");
+      kudu::client::KuduSession::AUTO_FLUSH_BACKGROUND), "Unable to set flush mode");
+
+  const int32_t buf_size = FLAGS_kudu_mutation_buffer_size;
+  if (buf_size < 1024 * 1024) {
+    return Status(strings::Substitute(
+        "Invalid kudu_mutation_buffer_size: '$0'. Must be greater than 1MB.", buf_size));
+  }
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferSpace(buf_size),
+      "Couldn't set mutation buffer size");
+
+  // Configure client memory used for buffering.
+  // Internally, the Kudu client keeps one or more buffers for writing operations. When a
+  // single buffer is flushed, it is locked (that space cannot be reused) until all
+  // operations within it complete, so it is important to have a number of buffers. In
+  // our testing we found that a total of 100MB of buffer space provided good results;
+  // this is the default. Then, because of some existing 8MB limits in Kudu, we want to
+  // have that total space broken up into 7MB buffers (INDIVIDUAL_BUFFER_SIZE). The
+  // mutation buffer flush watermark is set so that a flush is triggered once
+  // INDIVIDUAL_BUFFER_SIZE bytes have accumulated in a buffer.
+  int num_buffers = FLAGS_kudu_mutation_buffer_size / INDIVIDUAL_BUFFER_SIZE;
+  if (num_buffers == 0) num_buffers = 1;
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferFlushWatermark(1.0 / num_buffers),
+      "Couldn't set mutation buffer watermark");
+
+  // No limit on the buffer count since the settings above imply a max number of buffers.
+  // Note that the Kudu client API has a few too many knobs for configuring the size and
+  // number of these buffers; there are a few ways to accomplish similar behaviors.
+  KUDU_RETURN_IF_ERROR(session_->SetMutationBufferMaxNum(0),
+      "Couldn't set mutation buffer count");
   return Status::OK();
 }
 
@@ -135,7 +168,8 @@ kudu::client::KuduWriteOperation* KuduTableSink::NewWriteOp() {
   } else if (sink_action_ == TSinkAction::UPDATE) {
     return table_->NewUpdate();
   } else {
-    DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported. " << sink_action_;
+    DCHECK(sink_action_ == TSinkAction::DELETE) << "Sink type not supported: "
+        << sink_action_;
     return table_->NewDelete();
   }
 }
@@ -145,11 +179,15 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
   ExprContext::FreeLocalAllocations(output_expr_ctxs_);
   RETURN_IF_ERROR(state->CheckQueryState());
 
+  // Collect all write operations and apply them together so the time spent in Apply()
+  // can be measured.
+  vector<unique_ptr<kudu::client::KuduWriteOperation>> write_ops;
+  int rows_added = 0;
   // Since everything is set up just forward everything to the writer.
   for (int i = 0; i < batch->num_rows(); ++i) {
     TupleRow* current_row = batch->GetRow(i);
-    gscoped_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
+    unique_ptr<kudu::client::KuduWriteOperation> write(NewWriteOp());
 
     for (int j = 0; j < output_expr_ctxs_.size(); ++j) {
       int col = kudu_table_sink_.referenced_columns.empty() ?
@@ -173,7 +211,7 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
         case TYPE_STRING: {
           StringValue* sv = reinterpret_cast<StringValue*>(value);
           kudu::Slice slice(reinterpret_cast<uint8_t*>(sv->ptr), sv->len);
-          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetStringNoCopy(col, slice),
+          KUDU_RETURN_IF_ERROR(write->mutable_row()->SetString(col, slice),
               "Could not add Kudu WriteOp.");
           break;
         }
@@ -216,46 +254,38 @@ Status KuduTableSink::Send(RuntimeState* state, RowBatch* batch) {
           return Status(TErrorCode::IMPALA_KUDU_TYPE_MISSING, TypeToString(type));
       }
     }
-
-    KUDU_RETURN_IF_ERROR(session_->Apply(write.release()),
-        "Error while applying Kudu session.");
-    ++rows_added;
+    write_ops.push_back(move(write));
   }
+
+  {
+    SCOPED_TIMER(kudu_apply_timer_);
+    for (auto&& write: write_ops) {
+      KUDU_RETURN_IF_ERROR(session_->Apply(write.release()), "Error applying Kudu Op.");
+      ++rows_added;
+    }
+  }
+
   COUNTER_ADD(rows_written_, rows_added);
-  int64_t error_count = 0;
-  RETURN_IF_ERROR(Flush(&error_count));
-  (*state->per_partition_status())[ROOT_PARTITION_KEY].num_appended_rows +=
-      rows_added - error_count;
+  RETURN_IF_ERROR(CheckForErrors(state));
   return Status::OK();
 }
 
-Status KuduTableSink::Flush(int64_t* error_count) {
-  // TODO right now we always flush an entire row batch, if these are small we'll
-  // be inefficient. Consider decoupling impala's batch size from kudu's
-  kudu::Status s;
-  {
-    SCOPED_TIMER(kudu_flush_timer_);
-    COUNTER_ADD(kudu_flush_counter_, 1);
-    s = session_->Flush();
-  }
-  if (LIKELY(s.ok())) return Status::OK();
+Status KuduTableSink::CheckForErrors(RuntimeState* state) {
+  if (session_->CountPendingErrors() == 0) return Status::OK();
 
-  stringstream error_msg_buffer;
   vector<kudu::client::KuduError*> errors;
+  Status status = Status::OK();
 
-  // Check if there are pending errors in the Kudu session. If errors overflowed the error
-  // buffer we can't be sure all errors can be ignored and fail immediately, setting 'failed'
-  // to true.
-  bool failed = false;
-  session_->GetPendingErrors(&errors, &failed);
-  if (UNLIKELY(failed)) {
-    error_msg_buffer << "Error overflow in Kudu session, "
-                     << "previous write operation might be inconsistent.\n";
+  // Get the pending errors from the Kudu session. If errors overflowed the error buffer
+  // we can't be sure all errors can be ignored, so an error status will be reported.
+  bool error_overflow = false;
+  session_->GetPendingErrors(&errors, &error_overflow);
+  if (UNLIKELY(error_overflow)) {
+    status = Status("Error overflow in Kudu session.");
   }
 
   // The memory for the errors is manually managed. Iterate over all errors and delete
   // them accordingly.
-  bool first_error = true;
   for (int i = 0; i < errors.size(); ++i) {
     kudu::Status e = errors[i]->status();
     // If the sink has the option "ignore_not_found_or_duplicate" set, duplicate key or
@@ -265,24 +295,39 @@ Status KuduTableSink::Flush(int64_t* error_count) {
         ((sink_action_ == TSinkAction::DELETE && !e.IsNotFound()) ||
          (sink_action_ == TSinkAction::UPDATE && !e.IsNotFound()) ||
          (sink_action_ == TSinkAction::INSERT && !e.IsAlreadyPresent()))) {
-      if (first_error) {
-        error_msg_buffer << "Error while flushing Kudu session: \n";
-        first_error = false;
+      if (status.ok()) {
+        status = Status(strings::Substitute(
+            "Kudu error(s) reported, first error: $0", e.ToString()));
       }
-      error_msg_buffer << e.ToString() << "\n";
-      failed = true;
+    }
+    if (e.IsNotFound()) {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_NOT_FOUND,
+          table_desc_->table_name()));
+    } else if (e.IsAlreadyPresent()) {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_KEY_ALREADY_PRESENT,
+          table_desc_->table_name()));
+    } else {
+      state->LogError(ErrorMsg::Init(TErrorCode::KUDU_SESSION_ERROR,
+          table_desc_->table_name(), e.ToString()));
     }
     delete errors[i];
   }
 
   COUNTER_ADD(kudu_error_counter_, errors.size());
-  if (error_count != NULL) *error_count = errors.size();
-  if (failed) return Status(error_msg_buffer.str());
-  return Status::OK();
+  return status;
 }
 
 Status KuduTableSink::FlushFinal(RuntimeState* state) {
-  // No buffered state to flush.
-  return Status::OK();
+  kudu::Status flush_status = session_->Flush();
+
+  // Flush() may return an error status but any errors will also be reported by
+  // CheckForErrors(), so it's safe to ignore and always call CheckForErrors.
+  if (!flush_status.ok()) {
+    VLOG_RPC << "Ignoring Flush() error status: " << flush_status.ToString();
+  }
+  Status status = CheckForErrors(state);
+  (*state->per_partition_status())[ROOT_PARTITION_KEY].__set_num_modified_rows(
+      rows_written_->value() - kudu_error_counter_->value());
+  return status;
 }
 
 void KuduTableSink::Close(RuntimeState* state) {
diff --git a/be/src/exec/kudu-table-sink.h b/be/src/exec/kudu-table-sink.h
index 2eeb721eb..fe278c5d8 100644
--- a/be/src/exec/kudu-table-sink.h
+++ b/be/src/exec/kudu-table-sink.h
@@ -30,17 +30,27 @@ namespace impala {
 
-/// Sink that takes RowBatches and writes them into Kudu.
-/// Currently the data is sent to Kudu on Send(), i.e. the data is batched on the
-/// KuduSession until all the rows in a RowBatch are applied and then the session
-/// is flushed.
+/// Sink that takes RowBatches and writes them into a Kudu table.
 ///
-/// Kudu doesn't have transactions (yet!) so some rows may fail to write while
-/// others are successful. This sink will return an error if any of the rows fails
-/// to be written.
+/// The data is added to Kudu in Send(). The Kudu client is configured to automatically
+/// flush records when enough data has been written (AUTO_FLUSH_BACKGROUND). This
+/// requires specifying a mutation buffer size and a buffer flush watermark percentage in
+/// the Kudu client. The mutation buffer needs to be large enough to buffer rows sent to
+/// all destination nodes because the buffer accounting is not specified per-tablet
+/// server (KUDU-1693). Tests showed that 100MB was a good default, and this is
+/// configurable via the gflag --kudu_mutation_buffer_size. The buffer flush watermark
+/// percentage is set so that Kudu flushes after 7MB has accumulated in the buffer for a
+/// particular destination (out of the 100MB of total mutation buffer space), because
+/// Kudu currently has some 8MB buffer limits.
 ///
-/// TODO Once Kudu actually implements AUTOFLUSH_BACKGROUND flush mode we should
-/// change the flushing behavior as it will likely make writes more efficient.
+/// Kudu doesn't have transactions yet, so some rows may fail to write while others are
+/// successful. The Kudu client reports errors, some of which may be considered expected:
+/// when the IGNORE option is specified, rows that fail to be written/updated/deleted
+/// due to a key conflict do not cause the sink to return an error. The same errors
+/// without IGNORE, or any other kind of error reported by Kudu, cause the sink to
+/// return an error status. The first non-ignored error is returned in the sink's
+/// Status. All reported errors (ignored or not) are logged via the RuntimeState.
 class KuduTableSink : public DataSink {
  public:
   KuduTableSink(const RowDescriptor& row_desc,
@@ -59,7 +69,7 @@ class KuduTableSink : public DataSink {
   /// The KuduSession is flushed on each row batch.
   virtual Status Send(RuntimeState* state, RowBatch* batch);
 
-  /// Does nothing. We currently flush on each Send() call.
+  /// Forces any remaining buffered operations to be flushed to Kudu.
   virtual Status FlushFinal(RuntimeState* state);
 
   /// Closes the KuduSession and the expressions.
@@ -72,12 +82,11 @@ class KuduTableSink : public DataSink {
   /// Create a new write operation according to the sink type.
   kudu::client::KuduWriteOperation* NewWriteOp();
 
-  /// Flushes the Kudu session, making sure all previous operations were committed, and handles
-  /// errors returned from Kudu. Passes the number of errors during the flush operations as an
-  /// out parameter.
-  /// Returns a non-OK status if there was an unrecoverable error. This might return an OK
-  /// status even if 'error_count' is > 0, as some errors might be ignored.
-  Status Flush(int64_t* error_count);
+  /// Checks for any errors buffered in the Kudu session, and increments
+  /// appropriate counters for ignored errors.
+  ///
+  /// Returns a bad Status if there are non-ignorable errors.
+  Status CheckForErrors(RuntimeState* state);
 
   /// Used to get the KuduTableDescriptor from the RuntimeState
   TableId table_id_;
@@ -102,15 +111,15 @@ class KuduTableSink : public DataSink {
   /// Captures parameters passed down from the frontend
   TKuduTableSink kudu_table_sink_;
 
-  /// Counts the number of calls to KuduSession::flush().
-  RuntimeProfile::Counter* kudu_flush_counter_;
-
-  /// Aggregates the times spent in KuduSession:flush().
-  RuntimeProfile::Counter* kudu_flush_timer_;
-
   /// Total number of errors returned from Kudu.
   RuntimeProfile::Counter* kudu_error_counter_;
 
+  /// Time spent applying Kudu operations. In normal circumstances, Apply() should be
+  /// negligible because it is asynchronous with AUTO_FLUSH_BACKGROUND enabled.
+  /// Significant time spent in Apply() may indicate that Kudu cannot buffer and send
+  /// rows as fast as the sink can write them.
+  RuntimeProfile::Counter* kudu_apply_timer_;
+
   /// Total number of rows written including errors.
   RuntimeProfile::Counter* rows_written_;
   RuntimeProfile::Counter* rows_written_rate_;
diff --git a/be/src/runtime/coordinator.cc b/be/src/runtime/coordinator.cc
index 4214d4dec..0f41deb1d 100644
--- a/be/src/runtime/coordinator.cc
+++ b/be/src/runtime/coordinator.cc
@@ -1668,11 +1668,15 @@ Status Coordinator::UpdateFragmentExecStatus(const TReportExecStatusParams& para
     for (const PartitionStatusMap::value_type& partition:
          params.insert_exec_status.per_partition_status) {
       TInsertPartitionStatus* status = &(per_partition_status_[partition.first]);
-      status->num_appended_rows += partition.second.num_appended_rows;
-      status->id = partition.second.id;
-      status->partition_base_dir = partition.second.partition_base_dir;
-      if (!status->__isset.stats) status->__set_stats(TInsertStats());
-      DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+      status->__set_num_modified_rows(
+          status->num_modified_rows + partition.second.num_modified_rows);
+      status->__set_id(partition.second.id);
+      status->__set_partition_base_dir(partition.second.partition_base_dir);
+
+      if (partition.second.__isset.stats) {
+        if (!status->__isset.stats) status->__set_stats(TInsertStats());
+        DataSink::MergeInsertStats(partition.second.stats, &status->stats);
+      }
     }
     files_to_move_.insert(
         params.insert_exec_status.files_to_move.begin(),
diff --git a/be/src/service/impala-beeswax-server.cc b/be/src/service/impala-beeswax-server.cc
index b50499e4b..63fa9cd78 100644
--- a/be/src/service/impala-beeswax-server.cc
+++ b/be/src/service/impala-beeswax-server.cc
@@ -523,8 +523,8 @@ Status ImpalaServer::CloseInsertInternal(const TUniqueId& query_id,
     for (const PartitionStatusMap::value_type& v:
          exec_state->coord()->per_partition_status()) {
       const pair<string, TInsertPartitionStatus> partition_status = v;
-      insert_result->rows_appended[partition_status.first] =
-          partition_status.second.num_appended_rows;
+      insert_result->rows_modified[partition_status.first] =
+          partition_status.second.num_modified_rows;
     }
   }
 }
diff --git a/be/src/service/query-exec-state.cc b/be/src/service/query-exec-state.cc
index 69473c55d..3dc9d6c78 100644
--- a/be/src/service/query-exec-state.cc
+++ b/be/src/service/query-exec-state.cc
@@ -956,7 +956,7 @@ void ImpalaServer::QueryExecState::SetCreateTableAsSelectResultSet() {
   if (catalog_op_executor_->ddl_exec_response()->new_table_created) {
     DCHECK(coord_.get());
     for (const PartitionStatusMap::value_type& p: coord_->per_partition_status()) {
-      total_num_rows_inserted += p.second.num_appended_rows;
+      total_num_rows_inserted += p.second.num_modified_rows;
     }
   }
   const string& summary_msg = Substitute("Inserted $0 row(s)", total_num_rows_inserted);
diff --git a/common/thrift/ImpalaInternalService.thrift b/common/thrift/ImpalaInternalService.thrift
index 736de3461..e9c3119f6 100644
--- a/common/thrift/ImpalaInternalService.thrift
+++ b/common/thrift/ImpalaInternalService.thrift
@@ -422,8 +422,8 @@ struct TInsertPartitionStatus {
   // query). See THdfsTable.partitions.
   1: optional i64 id
 
-  // The number of rows appended to this partition
-  2: optional i64 num_appended_rows
+  // The number of rows modified in this partition
+  2: optional i64 num_modified_rows
 
   // Detailed statistics gathered by table writers for this partition
   3: optional TInsertStats stats
diff --git a/common/thrift/ImpalaService.thrift b/common/thrift/ImpalaService.thrift
index 794c140e0..573709b9d 100644
--- a/common/thrift/ImpalaService.thrift
+++ b/common/thrift/ImpalaService.thrift
@@ -247,10 +247,10 @@ enum TImpalaQueryOptions {
 // The summary of an insert.
 struct TInsertResult {
-  // Number of appended rows per modified partition. Only applies to HDFS tables.
-  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with the
-  // root in an unpartitioned table being the empty string.
-  1: required map<string, i64> rows_appended
+  // Number of modified rows per partition. Only applies to HDFS and Kudu tables.
+  // The keys represent partitions to create, coded as k1=v1/k2=v2/k3=v3..., with
+  // the root in an unpartitioned table being the empty string.
+  1: required map<string, i64> rows_modified
 }
 
 // Response from a call to PingImpalaService
diff --git a/common/thrift/generate_error_codes.py b/common/thrift/generate_error_codes.py
index ae338d52a..f03b07380 100755
--- a/common/thrift/generate_error_codes.py
+++ b/common/thrift/generate_error_codes.py
@@ -292,6 +292,12 @@ error_codes = (
   ("NO_REGISTERED_BACKENDS", 94, "Cannot schedule query: no registered backends "
    "available."),
+
+  ("KUDU_KEY_ALREADY_PRESENT", 95, "Key already present in Kudu table '$0'."),
+
+  ("KUDU_KEY_NOT_FOUND", 96, "Key not found in Kudu table '$0'."),
+
+  ("KUDU_SESSION_ERROR", 97, "Error in Kudu table '$0': $1")
 )
 
 import sys
diff --git a/shell/impala_client.py b/shell/impala_client.py
index 0d1c8353d..f57a0154d 100755
--- a/shell/impala_client.py
+++ b/shell/impala_client.py
@@ -354,7 +354,7 @@ class ImpalaClient(object):
     if status != RpcStatus.OK:
       raise RPCException()
 
-    num_rows = sum([int(k) for k in insert_result.rows_appended.values()])
+    num_rows = sum([int(k) for k in insert_result.rows_modified.values()])
     return num_rows
 
   def close_query(self, last_query_handle, query_handle_closed=False):
diff --git a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
index a06d20369..e4f3205f0 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/kudu_crud.test
@@ -29,10 +29,15 @@ insert into tdata values
 (3, "todd", cast(1.0 as float), 993393939, cast('c' as VARCHAR(20)), true)
 ---- RESULTS
 : 3
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 3.*
 ====
 ---- QUERY
 update tdata set vali=43 where id = 1
 ---- RESULTS
+# TODO: Verify row count after fixing IMPALA-3713 (Here and UPDATE/DELETE below)
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -48,6 +53,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 # Try updating a varchar col. with a value that is bigger than it's size (truncated).
 update tdata set valv=cast('aaaaaaaaaaaaaaaaaaaaaaaaaaaaaaaa' as VARCHAR(20)) where id = 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -58,10 +65,11 @@ select * from tdata
 ---- TYPES
 INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
-====
 ---- QUERY
 update tdata set valb=false where id = 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -75,6 +83,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set vali=43 where id > 1
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from tdata
@@ -88,6 +98,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name='unknown' where name = 'martin'
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -104,6 +116,8 @@ insert into tdata values
 (120, "she", cast(0.0 as float), 99, cast('f' as VARCHAR(20)), true)
 ---- RESULTS
 : 2
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from tdata
@@ -119,6 +133,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ---- QUERY
 update tdata set name=null where id = 40
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select * from tdata
@@ -133,6 +149,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 ====
 ---- QUERY
 update tdata set name='he' where id = 40
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ---- RESULTS
 ====
 ---- QUERY
@@ -152,6 +170,8 @@ INT,STRING,FLOAT,BIGINT,STRING,BOOLEAN
 insert into tdata values (320, '', 2.0, 932, cast('' as VARCHAR(20)), false)
 ---- RESULTS
 : 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select id, name, valv, valb from tdata where id = 320;
@@ -169,6 +189,10 @@ create table ignore_column_case (Id int, NAME string, vAlf float, vali bigint,
 ====
 ---- QUERY
 insert into ignore_column_case values (1, 'Martin', 1.0, 10);
+---- RESULTS
+: 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 select ID, nAmE, VALF, VALI from ignore_column_case where NaMe = 'Martin';
@@ -182,36 +206,44 @@ insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 1
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 insert into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- CATCH
-Error while flushing Kudu session:
+Kudu error(s) reported, first error: Already present
 ====
 ---- QUERY
 insert ignore into tdata values
 (666, "The Devil", cast(1.2 as float), 43, cast('z' as VARCHAR(20)), true)
 ---- RESULTS
 : 0
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 0.*
 ====
 ---- QUERY
--- Updating the same record twice
+-- Updating the same record many times: cross join produces 7 identical updates
 update a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
--- Does not exercise any error path in the sink because updating the same record twice
--- is valid. Makes sure IGNORE works.
+-- Does not exercise any error path in the sink because updating the same record multiple
+-- times is valid. Makes sure IGNORE works.
 update ignore a set a.name='Satan' from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7.*
 ====
 ---- QUERY
 -- Using a cross join to generate the same delete twice. After the first delete succeeded,
 -- trying to execute the second delete will fail because the record does not exist.
 delete a from tdata a, tdata b where a.id = 666
 ---- CATCH
-Error while flushing Kudu session:
+Kudu error(s) reported, first error: Not found: key not found
 ====
 ---- QUERY
 -- Re-insert the data
@@ -223,6 +255,8 @@ insert into tdata values
 ---- QUERY
 delete ignore a from tdata a, tdata b where a.id = 666
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 1.*
 ====
 ---- QUERY
 # IMPALA-3454: A delete that requires a rewrite may not get the Kudu column order correct
@@ -242,6 +276,8 @@ insert into impala_3454 values
 ---- QUERY
 delete from impala_3454 where key_1 < (select max(key_2) from impala_3454)
 ---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 2.*
 ====
 ---- QUERY
 select * from impala_3454
@@ -250,3 +286,49 @@ select * from impala_3454
 ---- TYPES
 TINYINT,BIGINT
 ====
+---- QUERY
+CREATE TABLE kudu_test_tbl PRIMARY KEY(id)
+DISTRIBUTE BY RANGE(id) SPLIT ROWS ((100000000))
+STORED AS KUDU AS
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- RESULTS
+'Inserted 100 row(s)'
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 100.*
+====
+---- QUERY
+INSERT IGNORE INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- RESULTS
+: 0
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 0.*
+====
+---- QUERY
+INSERT INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes WHERE id < 100;
+---- CATCH
+Kudu error(s) reported, first error: Already present: key already present
+====
+---- QUERY
+INSERT IGNORE INTO kudu_test_tbl
+SELECT * FROM functional_kudu.alltypes;
+---- RESULTS
+: 7200
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7200.*
+====
+---- QUERY
+# Test a larger UPDATE
+UPDATE kudu_test_tbl SET int_col = -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7300.*
+====
+---- QUERY
+# Test a larger DELETE
+DELETE FROM kudu_test_tbl WHERE id > -1;
+---- RESULTS
+---- RUNTIME_PROFILE
+row_regex: .*NumModifiedRows: 7300.*
+====
diff --git a/tests/beeswax/impala_beeswax.py b/tests/beeswax/impala_beeswax.py
index e0f5d5512..7ed411e55 100644
--- a/tests/beeswax/impala_beeswax.py
+++ b/tests/beeswax/impala_beeswax.py
@@ -416,8 +416,8 @@ class ImpalaBeeswaxClient(object):
     """Executes an insert query"""
     result = self.__do_rpc(lambda: self.imp_service.CloseInsert(handle))
     # The insert was successful
-    num_rows = sum(map(int, result.rows_appended.values()))
-    data = ["%s: %s" % row for row in result.rows_appended.iteritems()]
+    num_rows = sum(map(int, result.rows_modified.values()))
+    data = ["%s: %s" % row for row in result.rows_modified.iteritems()]
     exec_result = ImpalaBeeswaxResult(success=True, data=data)
     exec_result.summary = "Inserted %d rows" % (num_rows,)
     return exec_result