diff --git a/be/src/exec/analytic-eval-node.cc b/be/src/exec/analytic-eval-node.cc index f666edc8f..4c0d2423a 100644 --- a/be/src/exec/analytic-eval-node.cc +++ b/be/src/exec/analytic-eval-node.cc @@ -168,7 +168,10 @@ Status AnalyticEvalNode::Open(RuntimeState* state) { RETURN_IF_ERROR(child(0)->Open(state)); RETURN_IF_ERROR(state->block_mgr()->RegisterClient(2, mem_tracker(), state, &client_)); input_stream_.reset(new BufferedTupleStream(state, child(0)->row_desc(), - state->block_mgr(), client_, true /* delete_on_read */, true /* read_write */)); + state->block_mgr(), client_, + false /* initial_small_buffers */, + true /* delete_on_read */, + true /* read_write */)); RETURN_IF_ERROR(input_stream_->Init(runtime_profile())); DCHECK_EQ(evaluators_.size(), fn_ctxs_.size()); diff --git a/be/src/exec/cross-join-node.cc b/be/src/exec/cross-join-node.cc index cca036134..f59aa0d3a 100644 --- a/be/src/exec/cross-join-node.cc +++ b/be/src/exec/cross-join-node.cc @@ -82,6 +82,7 @@ Status CrossJoinNode::GetNext(RuntimeState* state, RowBatch* output_batch, bool* *eos = true; return Status::OK; } + *eos = false; ScopedTimer timer(probe_timer_); while (!eos_) { diff --git a/be/src/exec/hash-table.cc b/be/src/exec/hash-table.cc index 3893a7070..d5fc20c7c 100644 --- a/be/src/exec/hash-table.cc +++ b/be/src/exec/hash-table.cc @@ -45,6 +45,12 @@ static uint32_t SEED_PRIMES[] = { 338294347, }; +// The first NUM_SMALL_BLOCKS of nodes_ are made of blocks less than the io size to +// reduce the memory footprint of small queries. +static const int64_t INITIAL_DATA_PAGE_SIZES[] = + { 64 * 1024, 512 * 1024 }; +static const int NUM_SMALL_DATA_PAGES = sizeof(INITIAL_DATA_PAGE_SIZES) / sizeof(int64_t); + HashTableCtx::HashTableCtx(const vector& build_expr_ctxs, const vector& probe_expr_ctxs, bool stores_nulls, bool finds_nulls, int32_t initial_seed, int max_levels, int num_build_tuples) @@ -159,17 +165,19 @@ bool HashTableCtx::Equals(TupleRow* build_row) { const float HashTable::MAX_BUCKET_OCCUPANCY_FRACTION = 0.75f; HashTable::HashTable(RuntimeState* state, BufferedBlockMgr::Client* client, - int num_build_tuples, BufferedTupleStream* stream, int64_t max_num_buckets, - int64_t num_buckets) + int num_build_tuples, BufferedTupleStream* stream, bool use_initial_small_pages, + int64_t max_num_buckets, int64_t num_buckets) : state_(state), block_mgr_client_(client), tuple_stream_(stream), data_page_pool_(NULL), num_build_tuples_(num_build_tuples), stores_tuples_(num_build_tuples == 1), + use_initial_small_pages_(use_initial_small_pages), max_num_buckets_(max_num_buckets), num_filled_buckets_(0), num_nodes_(0), + total_data_page_size_(0), next_node_(NULL), node_remaining_current_page_(0), buckets_(NULL), @@ -187,9 +195,11 @@ HashTable::HashTable(MemPool* pool, int num_buckets) data_page_pool_(pool), num_build_tuples_(1), stores_tuples_(true), + use_initial_small_pages_(false), max_num_buckets_(-1), num_filled_buckets_(0), num_nodes_(0), + total_data_page_size_(0), next_node_(NULL), node_remaining_current_page_(0), buckets_(NULL), @@ -218,8 +228,7 @@ void HashTable::Close() { data_pages_[i]->Delete(); } if (ImpaladMetrics::HASH_TABLE_TOTAL_BYTES != NULL) { - ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment( - -data_pages_.size() * state_->io_mgr()->max_read_buffer_size()); + ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(-total_data_page_size_); } data_pages_.clear(); if (buckets_ != NULL) free(buckets_); @@ -228,10 +237,6 @@ void HashTable::Close() { } } -int64_t HashTable::byte_size() const { - return 
data_pages_.size() * state_->io_mgr()->max_read_buffer_size(); -} - void HashTable::UpdateProbeFilters(HashTableCtx* ht_ctx, vector >& bitmaps) { DCHECK_NOTNULL(ht_ctx); @@ -317,25 +322,30 @@ bool HashTable::ResizeBuckets(int64_t num_buckets) { } bool HashTable::GrowNodeArray() { - int64_t buffer_size = 0; + int64_t page_size = 0; if (block_mgr_client_ != NULL) { + page_size = state_->block_mgr()->max_block_size();; + if (use_initial_small_pages_ && data_pages_.size() < NUM_SMALL_DATA_PAGES) { + page_size = min(page_size, INITIAL_DATA_PAGE_SIZES[data_pages_.size()]); + } BufferedBlockMgr::Block* block = NULL; - Status status = state_->block_mgr()->GetNewBlock(block_mgr_client_, NULL, &block); + Status status = state_->block_mgr()->GetNewBlock( + block_mgr_client_, NULL, &block, page_size); if (!status.ok()) DCHECK(block == NULL); if (block == NULL) return false; data_pages_.push_back(block); - buffer_size = state_->io_mgr()->max_read_buffer_size(); - next_node_ = block->Allocate(buffer_size); - ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(buffer_size); + next_node_ = block->Allocate(page_size); + ImpaladMetrics::HASH_TABLE_TOTAL_BYTES->Increment(page_size); } else { // Only used for testing. DCHECK(data_page_pool_ != NULL); - buffer_size = TEST_PAGE_SIZE; - next_node_ = reinterpret_cast(data_page_pool_->Allocate(buffer_size)); + page_size = TEST_PAGE_SIZE; + next_node_ = reinterpret_cast(data_page_pool_->Allocate(page_size)); if (data_page_pool_->mem_tracker()->LimitExceeded()) return false; DCHECK(next_node_ != NULL); } - node_remaining_current_page_ = buffer_size / sizeof(Node); + node_remaining_current_page_ = page_size / sizeof(Node); + total_data_page_size_ += page_size; return true; } diff --git a/be/src/exec/hash-table.h b/be/src/exec/hash-table.h index d6142e526..a4b742e03 100644 --- a/be/src/exec/hash-table.h +++ b/be/src/exec/hash-table.h @@ -262,14 +262,17 @@ class HashTable { // - tuple_stream: the tuple stream which contains the tuple rows index by the // hash table. Can be NULL if the rows contain only a single tuple, in which // the 'tuple_stream' is unused. + // - use_initial_small_pages: if true the first fixed N data_pages_ will be smaller + // than the io buffer size. // - max_num_buckets: the maximum number of buckets that can be stored. If we // try to grow the number of buckets to a larger number, the inserts will fail. // -1, if it unlimited. // - initial_num_buckets: number of buckets that the hash table // should be initialized with. HashTable(RuntimeState* state, BufferedBlockMgr::Client* client, - int num_build_tuples, BufferedTupleStream* tuple_stream = NULL, - int64_t max_num_buckets = -1, int64_t initial_num_buckets = 1024); + int num_build_tuples, BufferedTupleStream* tuple_stream, + bool use_initial_small_pages, int64_t max_num_buckets, + int64_t initial_num_buckets = 1024); // Ctor used only for testing. Memory is allocated from the pool instead of the // block mgr. @@ -333,7 +336,7 @@ class HashTable { } // Returns the number of bytes allocated to the hash table - int64_t byte_size() const; + int64_t byte_size() const { return total_data_page_size_; } // Can be called after all insert calls to update the bitmap filters for the probe // side values. The bitmap filters are similar to Bloom filters in that they have no @@ -524,6 +527,9 @@ class HashTable { // TODO: ..or with template-ization const bool stores_tuples_; + // If true use small pages for the first few allocated data pages. 
+ const bool use_initial_small_pages_; + const int64_t max_num_buckets_; // Number of non-empty buckets. Used to determine when to grow and rehash @@ -535,6 +541,9 @@ class HashTable { // Data pages for all nodes. These are always pinned. std::vector data_pages_; + // Byte size of all buffers in data_pages_. + int64_t total_data_page_size_; + // Next node to insert. Node* next_node_; diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index 2f65854eb..cb9d12297 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -216,7 +216,9 @@ Status PartitionedAggregationNode::Open(RuntimeState* state) { if (needs_serialize_ && block_mgr_client_ != NULL) { serialize_stream_.reset(new BufferedTupleStream(state, *intermediate_row_desc_, - state->block_mgr(), block_mgr_client_, true /* delete on read */)); + state->block_mgr(), block_mgr_client_, + false, /* use initial small buffers */ + true /* delete on read */)); RETURN_IF_ERROR(serialize_stream_->Init(runtime_profile(), false)); DCHECK(serialize_stream_->has_write_block()); } @@ -339,12 +341,13 @@ Status PartitionedAggregationNode::GetNext(RuntimeState* state, } COUNTER_SET(rows_returned_counter_, num_rows_returned_); *eos = ReachedLimit(); + if (output_iterator_.AtEnd()) row_batch->MarkNeedToReturn(); return Status::OK; } void PartitionedAggregationNode::CleanupHashTbl(const vector& ctxs, HashTable::Iterator it) { - if (!needs_finalize_) return; + if (!needs_finalize_ && !needs_serialize_) return; while (!it.AtEnd()) { FinalizeTuple(ctxs, it.GetTuple(), mem_pool_.get()); // Avoid consuming excessive memory. @@ -410,12 +413,16 @@ Status PartitionedAggregationNode::Partition::InitStreams() { aggregated_row_stream.reset(new BufferedTupleStream(parent->state_, *parent->intermediate_row_desc_, parent->state_->block_mgr(), - parent->block_mgr_client_, true /* delete on read */)); + parent->block_mgr_client_, + level == 0, /* use small buffers */ + false /* delete on read */)); RETURN_IF_ERROR(aggregated_row_stream->Init(parent->runtime_profile())); unaggregated_row_stream.reset(new BufferedTupleStream(parent->state_, parent->child(0)->row_desc(), parent->state_->block_mgr(), - parent->block_mgr_client_, true /* delete on read */)); + parent->block_mgr_client_, + level == 0, /* use small buffers */ + true /* delete on read */)); // This stream is only used to spill, no need to ever have this pinned. RETURN_IF_ERROR(unaggregated_row_stream->Init(parent->runtime_profile(), false)); DCHECK(unaggregated_row_stream->has_write_block()); @@ -430,8 +437,9 @@ bool PartitionedAggregationNode::Partition::InitHashTable() { // TODO: we could switch to 64 bit hashes and then we don't need a max size. // It might be reasonable to limit individual hash table size for other reasons // though. + // Only use small buffers on level 0 (no repartitioning). hash_tbl.reset(new HashTable(parent->state_, parent->block_mgr_client_, 1, NULL, - 1 << (32 - NUM_PARTITIONING_BITS))); + level == 0, 1 << (32 - NUM_PARTITIONING_BITS))); return hash_tbl->Init(); } @@ -452,7 +460,7 @@ Status PartitionedAggregationNode::Partition::Spill(Tuple* intermediate_tuple) { DCHECK(!parent->serialize_stream_->is_pinned()); DCHECK(parent->serialize_stream_->has_write_block()); - const std::vector& evaluators = parent->aggregate_evaluators_;; + const vector& evaluators = parent->aggregate_evaluators_;; // Serialize and copy the spilled partition's stream into the new stream. 
bool failed_to_add = false; @@ -482,10 +490,11 @@ Status PartitionedAggregationNode::Partition::Spill(Tuple* intermediate_tuple) { // to remember where we are). if (failed_to_add) { parent->CleanupHashTbl(agg_fn_ctxs, it); + hash_tbl->Close(); + hash_tbl.reset(); aggregated_row_stream->Close(); RETURN_IF_ERROR(new_stream->status()); - DCHECK(false) << "How do we get here"; - return Status::MEM_LIMIT_EXCEEDED; + return parent->state_->block_mgr()->MemLimitTooLowError(parent->block_mgr_client_); } aggregated_row_stream->Close(); @@ -496,7 +505,9 @@ Status PartitionedAggregationNode::Partition::Spill(Tuple* intermediate_tuple) { // freed at least one buffer from this partition's (old) aggregated_row_stream. parent->serialize_stream_.reset(new BufferedTupleStream(parent->state_, *parent->intermediate_row_desc_, parent->state_->block_mgr(), - parent->block_mgr_client_, true /* delete on read */)); + parent->block_mgr_client_, + false, /* use small buffers */ + true /* delete on read */)); RETURN_IF_ERROR(parent->serialize_stream_->Init(parent->runtime_profile(), false)); DCHECK(parent->serialize_stream_->has_write_block()); } @@ -512,7 +523,8 @@ Status PartitionedAggregationNode::Partition::Spill(Tuple* intermediate_tuple) { hash_tbl->Close(); hash_tbl.reset(); - DCHECK(aggregated_row_stream->has_write_block()); + DCHECK(aggregated_row_stream->has_write_block()) + << aggregated_row_stream->DebugString(); RETURN_IF_ERROR(aggregated_row_stream->UnpinStream(false)); COUNTER_ADD(parent->num_spilled_partitions_, 1); @@ -526,7 +538,7 @@ void PartitionedAggregationNode::Partition::Close(bool finalize_rows) { if (is_closed) return; is_closed = true; if (aggregated_row_stream.get() != NULL) { - if (finalize_rows && parent->needs_finalize_ && hash_tbl.get() != NULL) { + if (finalize_rows && hash_tbl.get() != NULL) { // We need to walk all the rows and Finalize them here so the UDA gets a chance // to cleanup. If the hash table is gone (meaning this was spilled), the rows // should have been finalized/serialized in Spill(). @@ -815,13 +827,8 @@ Status PartitionedAggregationNode::SpillPartition(Partition* curr_partition, } } if (partition_idx == -1) { - DCHECK(false) << "This should never happen due to the reservation. This is " - "defense on release builds"; - // Could not find a partition to spill. This means the mem limit was just too - // low. - Status status = Status::MEM_LIMIT_EXCEEDED; - status.AddErrorMsg("Mem limit is too low to perform partitioned aggregation"); - return status; + // Could not find a partition to spill. This means the mem limit was just too low. + return state_->block_mgr()->MemLimitTooLowError(block_mgr_client_); } Partition* spilled_partition = hash_partitions_[partition_idx]; @@ -871,7 +878,7 @@ Status PartitionedAggregationNode::MoveHashPartitions(int64_t num_input_rows) { } } - VLOG_QUERY << ss.str(); + VLOG(2) << ss.str(); hash_partitions_.clear(); return Status::OK; } @@ -1086,7 +1093,8 @@ llvm::Function* PartitionedAggregationNode::CodegenUpdateSlot( // Create intermediate argument 'dst' from 'dst_value' const ColumnType& dst_type = evaluator->intermediate_type(); - CodegenAnyVal dst = CodegenAnyVal::GetNonNullVal(codegen, &builder, dst_type, "dst"); + CodegenAnyVal dst = CodegenAnyVal::GetNonNullVal( + codegen, &builder, dst_type, "dst"); dst.SetFromRawValue(dst_value); // Create pointer to dst to pass to ir_fn. We must use the unlowered type. 
Value* dst_lowered_ptr = codegen->CreateEntryBlockAlloca( diff --git a/be/src/exec/partitioned-aggregation-node.h b/be/src/exec/partitioned-aggregation-node.h index 1a29b83f3..81be2e393 100644 --- a/be/src/exec/partitioned-aggregation-node.h +++ b/be/src/exec/partitioned-aggregation-node.h @@ -71,6 +71,12 @@ class SlotDescriptor; // 4) Unaggregated tuple stream. Stream to spill unaggregated rows. // Rows in this stream always have child(0)'s layout. // +// Buffering: Each stream and hash table needs to maintain at least one buffer for +// some duration of the processing. To minimize the memory requirements of small queries +// (memory usage is less than one buffer per partition), the initial streams and +// hash tables will use smaller (less than io-sized) buffers. Once we spill, the streams +// and hash table will use io-sized buffers only. +// // TODO: Buffer rows before probing into the hash table? // TODO: after spilling, we can still maintain a very small hash table just to remove // some number of rows (from likely going to disk). diff --git a/be/src/exec/partitioned-hash-join-node.cc b/be/src/exec/partitioned-hash-join-node.cc index 3208e9aa5..173afbe3a 100644 --- a/be/src/exec/partitioned-hash-join-node.cc +++ b/be/src/exec/partitioned-hash-join-node.cc @@ -94,6 +94,7 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) { } RETURN_IF_ERROR(BlockingJoinNode::Prepare(state)); + runtime_state_ = state; // build and probe exprs are evaluated in the context of the rows produced by our // right and left children, respectively @@ -110,17 +111,8 @@ Status PartitionedHashJoinNode::Prepare(RuntimeState* state) { RETURN_IF_ERROR(Expr::Prepare(other_join_conjunct_ctxs_, state, row_descriptor_)); AddExprCtxsToFree(other_join_conjunct_ctxs_); - // We need two output buffer per partition (one for build and one for probe) and - // and one additional buffer either for the input (while repartitioning). - // TODO: with more careful reasoning we can turn this to 1 per partition I think. - int num_reserved_buffers = PARTITION_FANOUT * 2 + 1; - if (join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN) { - // We need to maintain three additional buffers in this case to store the build/probe - // on the null aware partition. 
- num_reserved_buffers += 3; - } RETURN_IF_ERROR(state->block_mgr()->RegisterClient( - num_reserved_buffers, mem_tracker(), state, &block_mgr_client_)); + MinRequiredBuffers(), mem_tracker(), state, &block_mgr_client_)); bool should_store_nulls = join_op_ == TJoinOp::RIGHT_OUTER_JOIN || join_op_ == TJoinOp::RIGHT_ANTI_JOIN || join_op_ == TJoinOp::FULL_OUTER_JOIN; @@ -209,10 +201,12 @@ PartitionedHashJoinNode::Partition::Partition(RuntimeState* state, level_(level), build_rows_(state->obj_pool()->Add(new BufferedTupleStream( state, parent_->child(1)->row_desc(), state->block_mgr(), - parent_->block_mgr_client_))), + parent_->block_mgr_client_, + level == 0 /* use small buffers */))), probe_rows_(state->obj_pool()->Add(new BufferedTupleStream( state, parent_->child(0)->row_desc(), state->block_mgr(), parent_->block_mgr_client_, + level == 0, /* use small buffers */ true /* delete on read */))) { } @@ -309,7 +303,7 @@ Status PartitionedHashJoinNode::Partition::BuildHashTableInternal( HashTable::EstimatedNumBuckets(build_rows()->num_rows()); hash_tbl_.reset(new HashTable(state, parent_->block_mgr_client_, parent_->child(1)->row_desc().tuple_descriptors().size(), build_rows(), - 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets)); + level_ == 0, 1 << (32 - NUM_PARTITIONING_BITS), estimated_num_buckets)); if (!hash_tbl_->Init()) goto not_built; while (!eos) { @@ -417,13 +411,8 @@ Status PartitionedHashJoinNode::SpillPartition() { } if (partition_idx == -1) { - // Could not find a partition to spill. This means the mem limit was just too - // low. e.g. mem_limit too small that we can't put a buffer in front of each - // partition. - Status status = Status::MEM_LIMIT_EXCEEDED; - status.AddErrorMsg("Mem limit is too low to perform partitioned join. We do not " - "have enough memory to maintain a buffer per partition."); - return status; + // Could not find a partition to spill. This means the mem limit was just too low. + return runtime_state_->block_mgr()->MemLimitTooLowError(block_mgr_client_); } VLOG(2) << "Spilling partition: " << partition_idx << endl << NodeDebugString(); RETURN_IF_ERROR(hash_partitions_[partition_idx]->Spill(false)); @@ -530,7 +519,7 @@ Status PartitionedHashJoinNode::ProcessBuildInput(RuntimeState* state, int level << " #rows:" << partition->build_rows()->num_rows() << endl; COUNTER_SET(largest_partition_percent_, static_cast(percent)); } - VLOG_QUERY << ss.str(); + VLOG(2) << ss.str(); COUNTER_ADD(num_build_rows_partitioned_, total_build_rows); non_empty_build_ |= (total_build_rows > 0); diff --git a/be/src/exec/partitioned-hash-join-node.h b/be/src/exec/partitioned-hash-join-node.h index b6adba445..64a87c123 100644 --- a/be/src/exec/partitioned-hash-join-node.h +++ b/be/src/exec/partitioned-hash-join-node.h @@ -257,6 +257,17 @@ class PartitionedHashJoinNode : public BlockingJoinNode { std::string NodeDebugString() const; + // We need two output buffers per partition (one for build and one for probe) and + // and one additional buffer for the input (while repartitioning). + // For NAAJ, we need 3 additional buffers to maintain the null_aware_partition_. + int MinRequiredBuffers() const { + int num_reserved_buffers = PARTITION_FANOUT * 2 + 1; + num_reserved_buffers += join_op_ == TJoinOp::NULL_AWARE_LEFT_ANTI_JOIN ? 
3 : 0; + return num_reserved_buffers; + } + + RuntimeState* runtime_state_; + // our equi-join predicates " = " are separated into // build_expr_ctxs_ (over child(1)) and probe_expr_ctxs_ (over child(0)) std::vector probe_expr_ctxs_; diff --git a/be/src/exec/select-node.cc b/be/src/exec/select-node.cc index 0cb347fd0..dfb251505 100644 --- a/be/src/exec/select-node.cc +++ b/be/src/exec/select-node.cc @@ -60,11 +60,12 @@ Status SelectNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) RETURN_IF_CANCELLED(state); RETURN_IF_ERROR(QueryMaintenance(state)); if (child_row_idx_ == child_row_batch_->num_rows()) { + child_row_idx_ = 0; // fetch next batch child_row_batch_->TransferResourceOwnership(row_batch); child_row_batch_->Reset(); + if (row_batch->AtCapacity()) return Status::OK; RETURN_IF_ERROR(child(0)->GetNext(state, child_row_batch_.get(), &child_eos_)); - child_row_idx_ = 0; } if (CopyRows(row_batch)) { diff --git a/be/src/runtime/buffered-block-mgr-test.cc b/be/src/runtime/buffered-block-mgr-test.cc index bf3720d1a..ba7efb031 100644 --- a/be/src/runtime/buffered-block-mgr-test.cc +++ b/be/src/runtime/buffered-block-mgr-test.cc @@ -310,7 +310,8 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) { int max_num_blocks = 3; shared_ptr block_mgr = CreateMgr(max_num_blocks); BufferedBlockMgr::Client* client; - Status status = block_mgr->RegisterClient(0, NULL, runtime_state_.get(), &client); + MemTracker tracker; + Status status = block_mgr->RegisterClient(0, &tracker, runtime_state_.get(), &client); EXPECT_TRUE(status.ok()); EXPECT_TRUE(block_mgr_parent_tracker_->consumption() == 0); @@ -321,8 +322,9 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) { status = block_mgr->GetNewBlock(client, NULL, &new_block, 128); EXPECT_TRUE(new_block != NULL); EXPECT_TRUE(status.ok()); - EXPECT_EQ(block_mgr->bytes_allocated(), 128); - EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 128); + EXPECT_EQ(block_mgr->bytes_allocated(), 0); + EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 0); + EXPECT_EQ(tracker.consumption(), 128); EXPECT_TRUE(new_block->is_pinned()); EXPECT_EQ(new_block->BytesRemaining(), 128); EXPECT_TRUE(new_block->buffer() != NULL); @@ -332,8 +334,9 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) { status = block_mgr->GetNewBlock(client, NULL, &new_block); EXPECT_TRUE(new_block != NULL); EXPECT_TRUE(status.ok()); - EXPECT_EQ(block_mgr->bytes_allocated(), 128 + block_mgr->max_block_size()); - EXPECT_EQ(block_mgr_parent_tracker_->consumption(), 128 + block_mgr->max_block_size()); + EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size()); + EXPECT_EQ(block_mgr_parent_tracker_->consumption(), block_mgr->max_block_size()); + EXPECT_EQ(tracker.consumption(), 128 + block_mgr->max_block_size()); EXPECT_TRUE(new_block->is_pinned()); EXPECT_EQ(new_block->BytesRemaining(), block_mgr->max_block_size()); EXPECT_TRUE(new_block->buffer() != NULL); @@ -343,9 +346,9 @@ TEST_F(BufferedBlockMgrTest, GetNewBlockSmallBlocks) { status = block_mgr->GetNewBlock(client, NULL, &new_block, 512); EXPECT_TRUE(new_block != NULL); EXPECT_TRUE(status.ok()); - EXPECT_EQ(block_mgr->bytes_allocated(), 128 + 512 + block_mgr->max_block_size()); - EXPECT_EQ(block_mgr_parent_tracker_->consumption(), - 128 + 512 + block_mgr->max_block_size()); + EXPECT_EQ(block_mgr->bytes_allocated(), block_mgr->max_block_size()); + EXPECT_EQ(block_mgr_parent_tracker_->consumption(), block_mgr->max_block_size()); + EXPECT_EQ(tracker.consumption(), 128 + 512 + block_mgr->max_block_size()); 
EXPECT_TRUE(new_block->is_pinned()); EXPECT_EQ(new_block->BytesRemaining(), 512); EXPECT_TRUE(new_block->buffer() != NULL); diff --git a/be/src/runtime/buffered-block-mgr.cc b/be/src/runtime/buffered-block-mgr.cc index 555b3b0ae..06c747944 100644 --- a/be/src/runtime/buffered-block-mgr.cc +++ b/be/src/runtime/buffered-block-mgr.cc @@ -22,20 +22,23 @@ #include "util/filesystem-util.h" #include "util/impalad-metrics.h" #include "util/uid-util.h" -#include #include #include #include #include -using namespace std; +#include + using namespace boost; using namespace strings; // for Substitute DEFINE_bool(disk_spill_encryption, false, "Set this to encrypt and perform an integrity " "check on all data spilled to disk during a query"); +using namespace std; +using namespace strings; + namespace impala { BufferedBlockMgr::BlockMgrsMap BufferedBlockMgr::query_to_block_mgrs_; @@ -83,15 +86,19 @@ struct BufferedBlockMgr::Client { void PinBuffer(BufferDescriptor* buffer) { DCHECK_NOTNULL(buffer); - ++num_pinned_buffers_; - if (tracker_ != NULL) tracker_->ConsumeLocal(buffer->len, query_tracker_); + if (buffer->len == mgr_->max_block_size()) { + ++num_pinned_buffers_; + if (tracker_ != NULL) tracker_->ConsumeLocal(buffer->len, query_tracker_); + } } void UnpinBuffer(BufferDescriptor* buffer) { DCHECK_NOTNULL(buffer); - DCHECK_GT(num_pinned_buffers_, 0); - --num_pinned_buffers_; - if (tracker_ != NULL) tracker_->ReleaseLocal(buffer->len, query_tracker_); + if (buffer->len == mgr_->max_block_size()) { + DCHECK_GT(num_pinned_buffers_, 0); + --num_pinned_buffers_; + if (tracker_ != NULL) tracker_->ReleaseLocal(buffer->len, query_tracker_); + } } string DebugString() const { @@ -110,7 +117,8 @@ BufferedBlockMgr::Block::Block(BufferedBlockMgr* block_mgr) block_mgr_(block_mgr), client_(NULL), write_range_(NULL), - valid_data_len_(0) { + valid_data_len_(0), + num_rows_(0) { } Status BufferedBlockMgr::Block::Pin(bool* pinned, Block* release_block, bool unpin) { @@ -138,6 +146,7 @@ void BufferedBlockMgr::Block::Init() { is_deleted_ = false; valid_data_len_ = 0; client_ = NULL; + num_rows_ = 0; } bool BufferedBlockMgr::Block::Validate() const { @@ -169,7 +178,9 @@ string BufferedBlockMgr::Block::DebugString() const { ss << "Block: " << this << endl << " Buffer Desc: " << buffer_desc_ << endl << " Data Len: " << valid_data_len_ << endl - << " Deleted: " << is_deleted_ << endl + << " Num Rows: " << num_rows_ << endl; + if (is_pinned_) ss << " Buffer Len: " << buffer_len() << endl; + ss << " Deleted: " << is_deleted_ << endl << " Pinned: " << is_pinned_ << endl << " Write Issued: " << in_write_ << endl << " Client Local: " << client_local_; @@ -291,6 +302,12 @@ void BufferedBlockMgr::ClearTmpReservation(Client* client) { bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) { int buffers_needed = BitUtil::Ceil(size, max_block_size()); + if (size < max_block_size() && mem_tracker_->TryConsume(size)) { + // For small allocations (less than a block size), just let the allocation through. + client->tracker_->ConsumeLocal(size, client->query_tracker_); + return true; + } + unique_lock lock(lock_); if (max(0L, remaining_unreserved_buffers()) + client->num_tmp_reserved_buffers_ < @@ -316,7 +333,7 @@ bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) { // Loop until we have freed enough memory. 
while (buffers_needed > 0) { BufferDescriptor* buffer_desc = NULL; - FindBuffer(lock, &buffer_desc); + FindBuffer(lock, false, &buffer_desc); if (buffer_desc == NULL) { if (additional_tmp_reservations > 0) { client->num_tmp_reserved_buffers_ -= additional_tmp_reservations; @@ -328,17 +345,17 @@ bool BufferedBlockMgr::ConsumeMemory(Client* client, int64_t size) { all_io_buffers_.erase(buffer_desc->all_buffers_it); if (buffer_desc->block != NULL) buffer_desc->block->buffer_desc_ = NULL; delete[] buffer_desc->buffer; - mem_tracker_->Release(buffer_desc->len); + --additional_tmp_reservations; DCHECK_GT(client->num_tmp_reserved_buffers_, 0); --client->num_tmp_reserved_buffers_; --unfullfilled_reserved_buffers_; } - mem_tracker_->Consume(size); - client->tracker_->ConsumeLocal(size, client->query_tracker_); WriteUnpinnedBlocks(); + if (!mem_tracker_->TryConsume(size)) return false; + client->tracker_->ConsumeLocal(size, client->query_tracker_); DCHECK(Validate()) << endl << DebugInternal(); return true; } @@ -354,34 +371,49 @@ void BufferedBlockMgr::Cancel() { io_mgr_->CancelContext(io_request_context_); } +Status BufferedBlockMgr::MemLimitTooLowError(Client* client) { + // TODO: what to print here. We can't know the value of the entire query here. + Status status = Status::MEM_LIMIT_EXCEEDED; + status.AddErrorMsg(Substitute("The memory limit is set too low initialize the" + " spilling operator. The minimum required memory to spill this operator is $0.", + PrettyPrinter::Print(client->num_reserved_buffers_ * max_block_size(), + TCounterType::BYTES))); + VLOG_QUERY << "Query: " << query_id_ << " ran out of memory: " << endl + << DebugInternal() << endl << client->DebugString() << endl + << GetStackTrace(); + return status; +} + Status BufferedBlockMgr::GetNewBlock(Client* client, Block* unpin_block, Block** block, int64_t len) { + DCHECK_LE(len, max_block_size_) << "Cannot request blocks bigger than max_len"; + *block = NULL; if (is_cancelled_.Read() == 1) return Status::CANCELLED; Block* new_block = NULL; { lock_guard lock(lock_); new_block = GetUnusedBlock(client); - } - DCHECK_NOTNULL(new_block); - DCHECK(new_block->client_ == client); - if (len >= 0) { - DCHECK(unpin_block == NULL); - DCHECK_LT(len, max_block_size_) << "Cannot request blocks bigger than max_len"; - if (mem_tracker_->TryConsume(len)) { - uint8_t* buffer = new uint8_t[len]; - new_block->buffer_desc_ = obj_pool_.Add(new BufferDescriptor(buffer, len)); - new_block->buffer_desc_->block = new_block; - new_block->is_pinned_ = true; - client->PinBuffer(new_block->buffer_desc_); - ++total_pinned_buffers_; - *block = new_block; - return Status::OK; - } else { - new_block->is_deleted_ = true; - ReturnUnusedBlock(new_block); - return Status::OK; + DCHECK_NOTNULL(new_block); + DCHECK(new_block->client_ == client); + + if (len >= 0 && len < max_block_size_) { + DCHECK(unpin_block == NULL); + if (client->tracker_->TryConsume(len)) { + uint8_t* buffer = new uint8_t[len]; + new_block->buffer_desc_ = obj_pool_.Add(new BufferDescriptor(buffer, len)); + new_block->buffer_desc_->block = new_block; + new_block->is_pinned_ = true; + client->PinBuffer(new_block->buffer_desc_); + ++total_pinned_buffers_; + *block = new_block; + return Status::OK; + } else { + new_block->is_deleted_ = true; + ReturnUnusedBlock(new_block); + return Status::OK; + } } } @@ -726,12 +758,12 @@ Status BufferedBlockMgr::DeleteBlock(Block* block) { block->is_deleted_ = true; if (block->is_pinned_) { + if (block->is_max_size()) --total_pinned_buffers_; block->is_pinned_ = 
false; block->client_->UnpinBuffer(block->buffer_desc_); if (block->client_->num_pinned_buffers_ < block->client_->num_reserved_buffers_) { ++unfullfilled_reserved_buffers_; } - --total_pinned_buffers_; } else if (unpinned_blocks_.Contains(block)) { // Remove block from unpinned list. unpinned_blocks_.Remove(block); @@ -746,7 +778,7 @@ Status BufferedBlockMgr::DeleteBlock(Block* block) { if (block->buffer_desc_->len != max_block_size_) { // Just delete the block for now. delete[] block->buffer_desc_->buffer; - mem_tracker_->Release(block->buffer_desc_->len); + block->client_->tracker_->Release(block->buffer_desc_->len); } else if (!free_io_buffers_.Contains(block->buffer_desc_)) { free_io_buffers_.Enqueue(block->buffer_desc_); buffer_available_cv_.notify_one(); @@ -820,7 +852,7 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) { *in_mem = true; } else { BufferDescriptor* buffer_desc = NULL; - RETURN_IF_ERROR(FindBuffer(l, &buffer_desc)); + RETURN_IF_ERROR(FindBuffer(l, true, &buffer_desc)); if (buffer_desc == NULL) { // There are no free buffers or blocks we can evict. We need to fail this request. @@ -849,11 +881,11 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) { buffer_desc->block = block; block->buffer_desc_ = buffer_desc; } + DCHECK_NOTNULL(block->buffer_desc_); + block->is_pinned_ = true; client->PinBuffer(block->buffer_desc_); ++total_pinned_buffers_; - DCHECK_NOTNULL(block->buffer_desc_); - block->is_pinned_ = true; DCHECK(block->Validate()) << endl << block->DebugString(); // The number of free buffers has decreased. Write unpinned blocks if the number // of free buffers below the threshold is reached. @@ -867,12 +899,12 @@ Status BufferedBlockMgr::FindBufferForBlock(Block* block, bool* in_mem) { // threshold, until we run out of memory. // 2. Pick a buffer from the free list. // 3. Wait and evict an unpinned buffer. -Status BufferedBlockMgr::FindBuffer(unique_lock& lock, +Status BufferedBlockMgr::FindBuffer(unique_lock& lock, bool can_allocate, BufferDescriptor** buffer_desc) { *buffer_desc = NULL; // First try to allocate a new buffer. - if (free_io_buffers_.size() < block_write_threshold_ && + if (can_allocate && free_io_buffers_.size() < block_write_threshold_ && mem_tracker_->TryConsume(max_block_size_)) { uint8_t* new_buffer = new uint8_t[max_block_size_]; *buffer_desc = obj_pool_.Add(new BufferDescriptor(new_buffer, max_block_size_)); diff --git a/be/src/runtime/buffered-block-mgr.h b/be/src/runtime/buffered-block-mgr.h index 2513feeae..3a295cd3f 100644 --- a/be/src/runtime/buffered-block-mgr.h +++ b/be/src/runtime/buffered-block-mgr.h @@ -40,7 +40,8 @@ class RuntimeState; // Clients that do partitioning will start with these smaller to reduce the minimum // buffering requirements and grow to the max sized buffer as input grows. For simplicity, // these buffers are not recycled (there's also not really a need since they are allocated -// all at once on query startup and then not allocated again). +// all at once on query startup and then not allocated again). These buffers are +// not counted against the reservation. // // BufferedBlockMgr reserves one buffer per disk ('block_write_threshold_') for itself. // When the number of free buffers falls below 'block_write_threshold', unpinned blocks @@ -148,6 +149,9 @@ class BufferedBlockMgr { // Non-blocking. Status Delete(); + void AddRow() { ++num_rows_; } + int num_rows() const { return num_rows_; } + // Allocates the specified number of bytes from this block. 
template T* Allocate(int size) { DCHECK_GE(BytesRemaining(), size); @@ -178,6 +182,20 @@ class BufferedBlockMgr { // Return the number of bytes allocated in this block. int64_t valid_data_len() const { return valid_data_len_; } + // Returns the length of the underlying buffer. Only callable if the block is + // pinned. + int64_t buffer_len() const { + DCHECK(is_pinned()); + return buffer_desc_->len; + } + + // Returns true if this block is the max block size. Only callable if the block + // is pinned. + bool is_max_size() const { + DCHECK(is_pinned()); + return buffer_desc_->len == block_mgr_->max_block_size(); + } + bool is_pinned() const { return is_pinned_; } // Debug helper method to print the state of a block. @@ -215,6 +233,9 @@ class BufferedBlockMgr { // Length of valid (i.e. allocated) data within the block. int64_t valid_data_len_; + // Number of rows in this block. + int num_rows_; + // If encryption_ is on, in the write path we allocate a new buffer to hold // encrypted data while it's being written to disk. The read path, having no // references to the data, can be decrypted in place. @@ -338,6 +359,10 @@ class BufferedBlockMgr { // stopped, the number of buffers this client could get. int64_t available_buffers(Client* client) const; + // Returns a MEM_LIMIT_EXCEEDED error which includes the minimum memory required + // by this client. + Status MemLimitTooLowError(Client* client); + // TODO: remove these two. Not clear what the sorter really needs. int available_allocated_buffers() const { return all_io_buffers_.size(); } int num_free_buffers() const { return free_io_buffers_.size(); } @@ -403,11 +428,12 @@ class BufferedBlockMgr { // Return a new buffer that can be used. *buffer is set to NULL if there was no // memory. // Otherwise, this function gets a new buffer by: - // 1. Allocating a new buffer if possible. + // 1. Allocating a new buffer if possible (if can_allocate is true). // 2. Using a buffer from the free list (which is populated by moving blocks from // the unpinned list by writing them out). // lock must be taken before calling this. This function can block. - Status FindBuffer(boost::unique_lock& lock, BufferDescriptor** buffer); + Status FindBuffer(boost::unique_lock& lock, bool can_allocate, + BufferDescriptor** buffer); // Writes unpinned blocks via DiskIoMgr until one of the following is true: // 1) The number of outstanding writes >= (block_write_threshold_ - num free buffers) @@ -580,7 +606,6 @@ class BufferedBlockMgr { // and hence no real reason to keep this separate from encryption. When true, blocks // will have an integrity check (SHA-256) performed after being read from disk. const bool check_integrity_; - }; // class BufferedBlockMgr } // namespace impala. 
diff --git a/be/src/runtime/buffered-tuple-stream-ir.cc b/be/src/runtime/buffered-tuple-stream-ir.cc index 792fd7336..345b62518 100644 --- a/be/src/runtime/buffered-tuple-stream-ir.cc +++ b/be/src/runtime/buffered-tuple-stream-ir.cc @@ -31,12 +31,14 @@ bool BufferedTupleStream::DeepCopy(TupleRow* row, uint8_t** dst) { template bool BufferedTupleStream::DeepCopyInternal(TupleRow* row, uint8_t** dst) { if (UNLIKELY(write_block_ == NULL)) return false; - DCHECK(write_block_->is_pinned()); + DCHECK_GE(null_indicators_write_block_, 0); + DCHECK(write_block_->is_pinned()) << DebugString() << std::endl + << write_block_->DebugString(); const uint64_t tuples_per_row = desc_.tuple_descriptors().size(); if (UNLIKELY((write_block_->BytesRemaining() < fixed_tuple_row_size_) || - (HasNullableTuple && - (write_tuple_idx_ + tuples_per_row > null_indicators_per_block_ * 8)))) { + (HasNullableTuple && + (write_tuple_idx_ + tuples_per_row > null_indicators_write_block_ * 8)))) { return false; } // Allocate the maximum possible buffer for the fixed portion of the tuple. @@ -49,7 +51,7 @@ bool BufferedTupleStream::DeepCopyInternal(TupleRow* row, uint8_t** dst) { // Copy the not NULL fixed len tuples. For the NULL tuples just update the NULL tuple // indicator. if (HasNullableTuple) { - DCHECK_GT(null_indicators_per_block_, 0); + DCHECK_GT(null_indicators_write_block_, 0); uint8_t* null_word = NULL; uint32_t null_pos = 0; // Calculate how much space it should return. @@ -70,16 +72,16 @@ bool BufferedTupleStream::DeepCopyInternal(TupleRow* row, uint8_t** dst) { to_return += tuple_size; } } - DCHECK_LE(write_tuple_idx_ - 1, null_indicators_per_block_ * 8); + DCHECK_LE(write_tuple_idx_ - 1, null_indicators_write_block_ * 8); write_block_->ReturnAllocation(to_return); bytes_allocated -= to_return; } else { // If we know that there are no nullable tuples no need to set the nullability flags. - DCHECK_EQ(null_indicators_per_block_, 0); + DCHECK_EQ(null_indicators_write_block_, 0); for (int i = 0; i < tuples_per_row; ++i) { const int tuple_size = desc_.tuple_descriptors()[i]->byte_size(); Tuple* t = row->GetTuple(i); - // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materiliazed slots) + // TODO: Once IMPALA-1306 (Avoid passing empty tuples of non-materialized slots) // is delivered, the check below should become DCHECK_NOTNULL(t). 
DCHECK(t != NULL || tuple_size == 0); memcpy(tuple_buf, t, tuple_size); @@ -108,6 +110,7 @@ bool BufferedTupleStream::DeepCopyInternal(TupleRow* row, uint8_t** dst) { } } } + write_block_->AddRow(); ++num_rows_; return true; } diff --git a/be/src/runtime/buffered-tuple-stream-test.cc b/be/src/runtime/buffered-tuple-stream-test.cc index 47a12db5c..803ca6b43 100644 --- a/be/src/runtime/buffered-tuple-stream-test.cc +++ b/be/src/runtime/buffered-tuple-stream-test.cc @@ -94,7 +94,7 @@ class SimpleTupleStreamTest : public testing::Test { Status status = BufferedBlockMgr::Create(runtime_state_.get(), &tracker_, runtime_state_->runtime_profile(), limit, block_size, &block_mgr_); EXPECT_TRUE(status.ok()); - status = block_mgr_->RegisterClient(0, NULL, runtime_state_.get(), &client_); + status = block_mgr_->RegisterClient(0, &tracker_, runtime_state_.get(), &client_); EXPECT_TRUE(status.ok()); } @@ -235,7 +235,7 @@ class SimpleTupleStreamTest : public testing::Test { for (int i = 0; i < results.size(); i += int_tuples) { for (int j = 0; j < int_tuples; ++j) { if (!gen_null || (j % 2) == 0) { - ASSERT_TRUE(results[i+j] == i / int_tuples) + ASSERT_EQ(results[i+j], i / int_tuples) << " results[" << (i + j) << "]: " << results[i + j] << " != " << (i / int_tuples) << " gen_null=" << gen_null; } else { @@ -337,39 +337,43 @@ class SimpleTupleStreamTest : public testing::Test { } void TestIntValuesInterleaved(int num_batches, int num_batches_before_read) { - BufferedTupleStream stream(runtime_state_.get(), *int_desc_, block_mgr_.get(), - client_, - true, // delete_on_read - true); // read_write - Status status = stream.Init(); - ASSERT_TRUE(status.ok()); - status = stream.UnpinStream(); - ASSERT_TRUE(status.ok()); + for (int small_buffers = 0; small_buffers < 2; ++small_buffers) { + BufferedTupleStream stream(runtime_state_.get(), *int_desc_, block_mgr_.get(), + client_, + small_buffers == 0, // initial small buffers + true, // delete_on_read + true); // read_write + Status status = stream.Init(); + ASSERT_TRUE(status.ok()); + status = stream.UnpinStream(); + ASSERT_TRUE(status.ok()); - vector results; + vector results; - for (int i = 0; i < num_batches; ++i) { - RowBatch* batch = CreateIntBatch(i * BATCH_SIZE, BATCH_SIZE, false); - for (int j = 0; j < batch->num_rows(); ++j) { - bool b = stream.AddRow(batch->GetRow(j)); - ASSERT_TRUE(b); + for (int i = 0; i < num_batches; ++i) { + RowBatch* batch = CreateIntBatch(i * BATCH_SIZE, BATCH_SIZE, false); + for (int j = 0; j < batch->num_rows(); ++j) { + bool b = stream.AddRow(batch->GetRow(j)); + ASSERT_TRUE(b); + } + // Reset the batch to make sure the stream handles the memory correctly. + batch->Reset(); + if (i % num_batches_before_read == 0) { + ReadValues(&stream, int_desc_, &results, + (rand() % num_batches_before_read) + 1); + } } - // Reset the batch to make sure the stream handles the memory correctly. 
- batch->Reset(); - if (i % num_batches_before_read == 0) { - ReadValues(&stream, int_desc_, &results, (rand() % num_batches_before_read) + 1); + ReadValues(&stream, int_desc_, &results); + + // Verify result + const int int_tuples = int_desc_->tuple_descriptors().size(); + EXPECT_EQ(results.size(), BATCH_SIZE * num_batches * int_tuples); + for (int i = 0; i < results.size(); ++i) { + ASSERT_EQ(results[i], i / int_tuples); } - } - ReadValues(&stream, int_desc_, &results); - // Verify result - const int int_tuples = int_desc_->tuple_descriptors().size(); - EXPECT_EQ(results.size(), BATCH_SIZE * num_batches * int_tuples); - for (int i = 0; i < results.size(); ++i) { - ASSERT_EQ(results[i], i / int_tuples); + stream.Close(); } - - stream.Close(); } scoped_ptr exec_env_; @@ -553,6 +557,36 @@ TEST_F(SimpleTupleStreamTest, UnpinPin) { stream.Close(); } +TEST_F(SimpleTupleStreamTest, SmallBuffers) { + int buffer_size = 8 * 1024 * 1024; + CreateMgr(2 * buffer_size, buffer_size); + + BufferedTupleStream stream(runtime_state_.get(), *int_desc_, block_mgr_.get(), client_); + Status status = stream.Init(NULL, false); + ASSERT_TRUE(status.ok()); + + // Initial buffer should be small. + EXPECT_LT(stream.bytes_in_mem(false), buffer_size); + + RowBatch* batch = CreateIntBatch(0, 1024, false); + for (int i = 0; i < batch->num_rows(); ++i) { + bool ret = stream.AddRow(batch->GetRow(i)); + EXPECT_TRUE(ret); + } + EXPECT_LT(stream.bytes_in_mem(false), buffer_size); + EXPECT_LT(stream.byte_size(), buffer_size); + + // 40 MB of ints + batch = CreateIntBatch(0, 10 * 1024 * 1024, false); + for (int i = 0; i < batch->num_rows(); ++i) { + bool ret = stream.AddRow(batch->GetRow(i)); + ASSERT_TRUE(ret); + } + EXPECT_EQ(stream.bytes_in_mem(false), buffer_size); + + stream.Close(); +} + // Basic API test. No data should be going to disk. TEST_F(SimpleNullStreamTest, Basic) { CreateMgr(-1, 8 * 1024 * 1024); diff --git a/be/src/runtime/buffered-tuple-stream.cc b/be/src/runtime/buffered-tuple-stream.cc index 55434d997..e03dcf8be 100644 --- a/be/src/runtime/buffered-tuple-stream.cc +++ b/be/src/runtime/buffered-tuple-stream.cc @@ -28,6 +28,12 @@ using namespace impala; using namespace std; using namespace strings; +// The first NUM_SMALL_BLOCKS of the tuple stream are made of blocks less than the +// io size. These blocks never spill. 
+static const int64_t INITIAL_BLOCK_SIZES[] = + { 64 * 1024, 512 * 1024 }; +static const int NUM_SMALL_BLOCKS = sizeof(INITIAL_BLOCK_SIZES) / sizeof(int64_t); + string BufferedTupleStream::RowIdx::DebugString() const { stringstream ss; ss << "RowIdx block=" << block() << " offset=" << offset() << " idx=" << idx(); @@ -36,14 +42,17 @@ string BufferedTupleStream::RowIdx::DebugString() const { BufferedTupleStream::BufferedTupleStream(RuntimeState* state, const RowDescriptor& row_desc, BufferedBlockMgr* block_mgr, - BufferedBlockMgr::Client* client, bool delete_on_read, bool read_write) - : delete_on_read_(delete_on_read), + BufferedBlockMgr::Client* client, bool use_initial_small_buffers, + bool delete_on_read, bool read_write) + : use_small_buffers_(use_initial_small_buffers), + delete_on_read_(delete_on_read), read_write_(read_write), state_(state), desc_(row_desc), nullable_tuple_(row_desc.IsAnyTupleNullable()), block_mgr_(block_mgr), block_mgr_client_(client), + total_byte_size_(0), read_ptr_(NULL), read_tuple_idx_(0), read_bytes_(0), @@ -51,37 +60,23 @@ BufferedTupleStream::BufferedTupleStream(RuntimeState* state, read_block_idx_(-1), write_block_(NULL), num_pinned_(0), + num_small_blocks_(0), closed_(false), num_rows_(0), pinned_(true), pin_timer_(NULL), unpin_timer_(NULL), get_new_block_timer_(NULL) { + null_indicators_read_block_ = null_indicators_write_block_ = -1; read_block_ = blocks_.end(); fixed_tuple_row_size_ = 0; - min_tuple_row_size_ = 0; for (int i = 0; i < desc_.tuple_descriptors().size(); ++i) { const TupleDescriptor* tuple_desc = desc_.tuple_descriptors()[i]; const int tuple_byte_size = tuple_desc->byte_size(); fixed_tuple_row_size_ += tuple_byte_size; - min_tuple_row_size_ += desc_.TupleIsNullable(i) ? 0 : tuple_byte_size; if (tuple_desc->string_slots().empty()) continue; string_slots_.push_back(make_pair(i, tuple_desc->string_slots())); } - - if (nullable_tuple_) { - // We assume that all rows will use their max size, so we may be underutilizing the - // space, i.e. we may have some unused space in case of rows with NULL tuples. - const uint32_t tuples_per_row = desc_.tuple_descriptors().size(); - const uint32_t min_row_size_in_bits = 8 * fixed_tuple_row_size_ + tuples_per_row; - const uint32_t block_size_in_bits = 8 * block_mgr_->max_block_size(); - const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits; - null_indicators_per_block_ = - BitUtil::RoundUpNumi64(max_num_rows * tuples_per_row) * 8; - } else { - // If there are no nullable tuples then no need to waste space for null indicators. - null_indicators_per_block_ = 0; - } } // Returns the number of pinned blocks in the list. Only called in DCHECKs to validate @@ -90,7 +85,7 @@ int NumPinned(const list& blocks) { int num_pinned = 0; for (list::const_iterator it = blocks.begin(); it != blocks.end(); ++it) { - if ((*it)->is_pinned()) ++num_pinned; + if ((*it)->is_pinned() && (*it)->is_max_size()) ++num_pinned; } return num_pinned; } @@ -100,6 +95,7 @@ string BufferedTupleStream::DebugString() const { ss << "BufferedTupleStream num_rows=" << num_rows_ << " rows_returned=" << rows_returned_ << " pinned=" << (pinned_ ? "true" : "false") << " delete_on_read=" << (delete_on_read_ ? "true" : "false") + << " closed=" << (closed_ ? 
"true" : "false") << " num_pinned=" << num_pinned_ << " write_block=" << write_block_ << " read_block_="; if (read_block_ == blocks_.end()) { @@ -126,7 +122,7 @@ Status BufferedTupleStream::Init(RuntimeProfile* profile, bool pinned) { bool got_block = false; RETURN_IF_ERROR(NewBlockForWrite(fixed_tuple_row_size_, &got_block)); - if (!got_block) return Status("Not enough memory to initialize BufferedTupleStream."); + if (!got_block) return block_mgr_->MemLimitTooLowError(block_mgr_client_); DCHECK(write_block_ != NULL); if (read_write_) RETURN_IF_ERROR(PrepareForRead()); if (!pinned) RETURN_IF_ERROR(UnpinStream()); @@ -145,16 +141,25 @@ void BufferedTupleStream::Close() { } int64_t BufferedTupleStream::bytes_in_mem(bool ignore_current) const { - int num_in_mem_blocks = num_pinned_; - if (write_block_ != NULL && ignore_current) { - DCHECK(write_block_->is_pinned()); - --num_in_mem_blocks; + int64_t result = 0; + for (list::const_iterator it = blocks_.begin(); + it != blocks_.end(); ++it) { + if (!(*it)->is_pinned()) continue; + if (!(*it)->is_max_size()) continue; + if (*it == write_block_ && ignore_current) continue; + result += (*it)->buffer_len(); } - return num_in_mem_blocks * block_mgr_->max_block_size(); + return result; } -int64_t BufferedTupleStream::bytes_unpinned() const { - return (blocks_.size() - num_pinned_) * block_mgr_->max_block_size(); +Status BufferedTupleStream::UnpinBlock(BufferedBlockMgr::Block* block) { + SCOPED_TIMER(unpin_timer_); + DCHECK(block->is_pinned()); + if (!block->is_max_size()) return Status::OK; + RETURN_IF_ERROR(block->Unpin()); + --num_pinned_; + DCHECK_EQ(num_pinned_, NumPinned(blocks_)); + return Status::OK; } Status BufferedTupleStream::NewBlockForWrite(int min_size, bool* got_block) { @@ -168,13 +173,34 @@ Status BufferedTupleStream::NewBlockForWrite(int min_size, bool* got_block) { BufferedBlockMgr::Block* unpin_block = write_block_; if (write_block_ != NULL) { DCHECK(write_block_->is_pinned()); - if (pinned_ || write_block_ == *read_block_) unpin_block = NULL; + if (pinned_ || write_block_ == *read_block_ || !write_block_->is_max_size()) { + // In these cases, don't unpin the current write block. + unpin_block = NULL; + } + } + + int64_t block_len = block_mgr_->max_block_size(); + if (use_small_buffers_ && blocks_.size() < NUM_SMALL_BLOCKS) { + block_len = min(block_len, INITIAL_BLOCK_SIZES[blocks_.size()]); + if (block_len < min_size) block_len = block_mgr_->max_block_size(); } BufferedBlockMgr::Block* new_block = NULL; { SCOPED_TIMER(get_new_block_timer_); - RETURN_IF_ERROR(block_mgr_->GetNewBlock(block_mgr_client_, unpin_block, &new_block)); + RETURN_IF_ERROR(block_mgr_->GetNewBlock( + block_mgr_client_, unpin_block, &new_block, block_len)); + if (new_block == NULL && block_len < block_mgr_->max_block_size()) { + // We were unable to get a small buffer. Try again with an io sized buffer. The + // io-sized buffers are protected by the reservations, so we should be able to + // get one. We don't want the small buffers to take away from the reservation. + // With a single io-sized buffer, we can make indefinite progress but we cannot + // with a smaller buffer. 
+ DCHECK(unpin_block == NULL); + RETURN_IF_ERROR(block_mgr_->GetNewBlock( + block_mgr_client_, unpin_block, &new_block)); + use_small_buffers_ = false; + } } *got_block = (new_block != NULL); @@ -183,23 +209,30 @@ Status BufferedTupleStream::NewBlockForWrite(int min_size, bool* got_block) { return Status::OK; } - if (!pinned_ && write_block_ != NULL) { - if (!pinned_ && write_block_ != *read_block_) { - DCHECK(!write_block_->is_pinned()); - --num_pinned_; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); - } + if (unpin_block != NULL) { + DCHECK(unpin_block == write_block_); + DCHECK(!write_block_->is_pinned()); + --num_pinned_; + DCHECK_EQ(num_pinned_, NumPinned(blocks_)); } - // Allocate the block header with the null indicators - new_block->Allocate(null_indicators_per_block_); + + // Compute and allocate the block header with the null indicators + null_indicators_write_block_ = ComputeNumNullIndicatorBytes(block_len); + new_block->Allocate(null_indicators_write_block_); write_tuple_idx_ = 0; blocks_.push_back(new_block); block_start_idx_.push_back(new_block->buffer()); write_block_ = new_block; DCHECK(write_block_->is_pinned()); - ++num_pinned_; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); + DCHECK_EQ(write_block_->num_rows(), 0); + if (write_block_->is_max_size()) { + ++num_pinned_; + DCHECK_EQ(num_pinned_, NumPinned(blocks_)); + } else { + ++num_small_blocks_; + } + total_byte_size_ += block_len; return Status::OK; } @@ -214,12 +247,22 @@ Status BufferedTupleStream::NextBlockForRead() { BufferedBlockMgr::Block* block_to_free = (!pinned_ || delete_on_read_) ? *read_block_ : NULL; if (delete_on_read_) { + // TODO: this is weird. We are deleting even if it is pinned. The analytic + // eval node needs this. + DCHECK(read_block_ == blocks_.begin()); + DCHECK(*read_block_ != write_block_); blocks_.pop_front(); read_block_ = blocks_.begin(); read_block_idx_ = 0; + if (block_to_free != NULL && !block_to_free->is_max_size()) { + RETURN_IF_ERROR(block_to_free->Delete()); + block_to_free = NULL; + DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << DebugString(); + } } else { ++read_block_; ++read_block_idx_; + if (block_to_free != NULL && !block_to_free->is_max_size()) block_to_free = NULL; } read_ptr_ = NULL; @@ -233,10 +276,10 @@ Status BufferedTupleStream::NextBlockForRead() { SCOPED_TIMER(unpin_timer_); if (delete_on_read_) { RETURN_IF_ERROR(block_to_free->Delete()); + --num_pinned_; } else { - RETURN_IF_ERROR(block_to_free->Unpin()); + RETURN_IF_ERROR(UnpinBlock(block_to_free)); } - --num_pinned_; } } else { // Call into the block mgr to atomically unpin/delete the old block and pin the @@ -251,7 +294,9 @@ Status BufferedTupleStream::NextBlockForRead() { } if (read_block_ != blocks_.end() && (*read_block_)->is_pinned()) { - read_ptr_ = (*read_block_)->buffer() + null_indicators_per_block_; + null_indicators_read_block_ = + ComputeNumNullIndicatorBytes((*read_block_)->buffer_len()); + read_ptr_ = (*read_block_)->buffer() + null_indicators_read_block_; } DCHECK_EQ(num_pinned_, NumPinned(blocks_)) << DebugString(); return Status::OK; @@ -264,35 +309,34 @@ Status BufferedTupleStream::PrepareForRead(bool* got_buffer) { if (!read_write_ && write_block_ != NULL) { DCHECK(write_block_->is_pinned()); if (!pinned_ && write_block_ != blocks_.front()) { - SCOPED_TIMER(unpin_timer_); - write_block_->Unpin(); - --num_pinned_; + RETURN_IF_ERROR(UnpinBlock(write_block_)); } write_block_ = NULL; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); } - read_block_ = blocks_.begin(); - if (!(*read_block_)->is_pinned()) { - 
SCOPED_TIMER(pin_timer_); - bool current_pinned; - RETURN_IF_ERROR((*read_block_)->Pin(¤t_pinned)); - if (!current_pinned) { - if (got_buffer == NULL) { - DCHECK(current_pinned) << "Should have reserved enough blocks." << endl - << block_mgr_->DebugString(block_mgr_client_);; - return Status::MEM_LIMIT_EXCEEDED; - } else { + // Walk the blocks and pin the first non-io sized block. + for (list::iterator it = blocks_.begin(); + it != blocks_.end(); ++it) { + if (!(*it)->is_pinned()) { + SCOPED_TIMER(pin_timer_); + bool current_pinned; + RETURN_IF_ERROR((*it)->Pin(¤t_pinned)); + if (!current_pinned) { + DCHECK(got_buffer != NULL) << "Should have reserved enough blocks"; *got_buffer = false; return Status::OK; } + ++num_pinned_; + DCHECK_EQ(num_pinned_, NumPinned(blocks_)); } - ++num_pinned_; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); + if ((*it)->is_max_size()) break; } + read_block_ = blocks_.begin(); DCHECK(read_block_ != blocks_.end()); - read_ptr_ = (*read_block_)->buffer() + null_indicators_per_block_; + null_indicators_read_block_ = + ComputeNumNullIndicatorBytes((*read_block_)->buffer_len()); + read_ptr_ = (*read_block_)->buffer() + null_indicators_read_block_; read_tuple_idx_ = 0; read_bytes_ = 0; rows_returned_ = 0; @@ -348,9 +392,7 @@ Status BufferedTupleStream::UnpinStream(bool all) { if (!all && (block == write_block_ || (read_write_ && block == *read_block_))) { continue; } - RETURN_IF_ERROR(block->Unpin()); - --num_pinned_; - DCHECK_EQ(num_pinned_, NumPinned(blocks_)); + RETURN_IF_ERROR(UnpinBlock(block)); } if (all) { read_block_ = blocks_.end(); @@ -360,6 +402,22 @@ Status BufferedTupleStream::UnpinStream(bool all) { return Status::OK; } +int BufferedTupleStream::ComputeNumNullIndicatorBytes(int block_size) const { + if (nullable_tuple_) { + // We assume that all rows will use their max size, so we may be underutilizing the + // space, i.e. we may have some unused space in case of rows with NULL tuples. + const uint32_t tuples_per_row = desc_.tuple_descriptors().size(); + const uint32_t min_row_size_in_bits = 8 * fixed_tuple_row_size_ + tuples_per_row; + const uint32_t block_size_in_bits = 8 * block_size; + const uint32_t max_num_rows = block_size_in_bits / min_row_size_in_bits; + return + BitUtil::RoundUpNumi64(max_num_rows * tuples_per_row) * 8; + } else { + // If there are no nullable tuples then no need to waste space for null indicators. + return 0; + } +} + Status BufferedTupleStream::GetRows(scoped_ptr* batch, bool* got_rows) { RETURN_IF_ERROR(PinStream(false, got_rows)); if (!*got_rows) return Status::OK; @@ -389,29 +447,36 @@ Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos, DCHECK_EQ(batch->num_rows(), 0); *eos = (rows_returned_ == num_rows_); if (*eos) return Status::OK; + DCHECK_GE(null_indicators_read_block_, 0); + + const uint64_t tuples_per_row = desc_.tuple_descriptors().size(); + DCHECK_LE(read_tuple_idx_ / tuples_per_row, (*read_block_)->num_rows()); + DCHECK_EQ(read_tuple_idx_ % tuples_per_row, 0); + int rows_returned_curr_block = read_tuple_idx_ / tuples_per_row; + + int64_t data_len = (*read_block_)->valid_data_len() - null_indicators_read_block_; + if (UNLIKELY(rows_returned_curr_block == (*read_block_)->num_rows())) { + // Get the next block in the stream. We need to do this at the beginning of + // the GetNext() call to ensure the buffer management semantics. NextBlockForRead() + // will recycle the memory for the rows returned from the *previous* call to + // GetNext(). 
+    RETURN_IF_ERROR(NextBlockForRead());
+    DCHECK(read_block_ != blocks_.end()) << DebugString();
+    DCHECK_GE(null_indicators_read_block_, 0);
+    data_len = (*read_block_)->valid_data_len() - null_indicators_read_block_;
+    rows_returned_curr_block = 0;
+  }
+
+  DCHECK(read_block_ != blocks_.end());
+  DCHECK((*read_block_)->is_pinned()) << DebugString();
+  DCHECK(read_ptr_ != NULL);
   int64_t rows_left = num_rows_ - rows_returned_;
   int rows_to_fill = std::min(static_cast<int64_t>(batch->capacity()), rows_left);
   DCHECK_GE(rows_to_fill, 1);
   batch->AddRows(rows_to_fill);
   uint8_t* tuple_row_mem = reinterpret_cast<uint8_t*>(batch->GetRow(0));
-  int64_t data_len = (*read_block_)->valid_data_len() - null_indicators_per_block_;
-  const uint64_t tuples_per_row = desc_.tuple_descriptors().size();
-  if (UNLIKELY(((data_len - read_bytes_) < min_tuple_row_size_) ||
-      (HasNullableTuple &&
-      ((read_tuple_idx_ + tuples_per_row) > null_indicators_per_block_ * 8)))) {
-    // Get the next block in the stream. We need to do this at the beginning of
-    // the GetNext() call to ensure the buffer management semantics. NextBlockForRead()
-    // will recycle the memory for the rows returned from the *previous* call to
-    // GetNext().
-    RETURN_IF_ERROR(NextBlockForRead());
-    data_len = (*read_block_)->valid_data_len() - null_indicators_per_block_;
-    DCHECK_GE(data_len, min_tuple_row_size_);
-  }
-  DCHECK(read_block_ != blocks_.end());
-  DCHECK((*read_block_)->is_pinned());
-  DCHECK(read_ptr_ != NULL);
   // Produce tuple rows from the current block and the corresponding position on the
   // null tuple indicator.
@@ -435,18 +500,14 @@ Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
   uint64_t last_read_row = read_tuple_idx_ / tuples_per_row;
   while (i < rows_to_fill) {
     // Check if current block is done.
-    if (UNLIKELY(((data_len - read_bytes_) < min_tuple_row_size_) ||
-        (HasNullableTuple &&
-        ((read_tuple_idx_ + tuples_per_row) > null_indicators_per_block_ * 8)))) {
-      break;
-    }
+    if (UNLIKELY(rows_returned_curr_block + i == (*read_block_)->num_rows())) break;
     // Copy the row into the output batch.
     TupleRow* row = reinterpret_cast<TupleRow*>(tuple_row_mem);
     last_read_ptr = reinterpret_cast<uint64_t>(read_ptr_);
     indices->push_back(RowIdx());
     DCHECK_EQ(indices->size(), i + 1);
-    (*indices)[i].set(read_block_idx_, read_bytes_ + null_indicators_per_block_,
+    (*indices)[i].set(read_block_idx_, read_bytes_ + null_indicators_read_block_,
         last_read_row);
     if (HasNullableTuple) {
       for (int j = 0; j < tuples_per_row; ++j) {
@@ -465,7 +526,6 @@ Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
       const uint64_t row_read_bytes = reinterpret_cast<uint64_t>(read_ptr_) - last_read_ptr;
       DCHECK_GE(fixed_tuple_row_size_, row_read_bytes);
-      DCHECK_LE(min_tuple_row_size_, row_read_bytes);
       read_bytes_ += row_read_bytes;
       last_read_ptr = reinterpret_cast<uint64_t>(read_ptr_);
     } else {
@@ -476,6 +536,7 @@ Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
         read_ptr_ += desc_.tuple_descriptors()[j]->byte_size();
       }
       read_bytes_ += fixed_tuple_row_size_;
+      read_tuple_idx_ += tuples_per_row;
     }
     tuple_row_mem += sizeof(Tuple*) * tuples_per_row;
@@ -489,7 +550,7 @@ Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
       if (tuple->IsNull(slot_desc->null_indicator_offset())) continue;
       StringValue* sv = tuple->GetStringSlot(slot_desc->tuple_offset());
-      DCHECK_LE(sv->len, data_len - read_bytes_) << DebugString();
+      DCHECK_LE(sv->len, data_len - read_bytes_);
       sv->ptr = reinterpret_cast<char*>(read_ptr_);
       read_ptr_ += sv->len;
       read_bytes_ += sv->len;
@@ -502,10 +563,8 @@ Status BufferedTupleStream::GetNextInternal(RowBatch* batch, bool* eos,
   batch->CommitRows(i);
   rows_returned_ += i;
   *eos = (rows_returned_ == num_rows_);
-  if (!pinned_ &&
-      (((data_len - read_bytes_) < min_tuple_row_size_) ||
-      (HasNullableTuple &&
-      ((read_tuple_idx_ + tuples_per_row) > null_indicators_per_block_ * 8)))) {
+  if ((!pinned_ || delete_on_read_) &&
+      rows_returned_curr_block + i == (*read_block_)->num_rows()) {
     // No more data in this block. Mark this batch as needing to return so
     // the caller can pass the rows up the operator tree.
     batch->MarkNeedToReturn();
diff --git a/be/src/runtime/buffered-tuple-stream.h b/be/src/runtime/buffered-tuple-stream.h
index 5ce0fbb1d..449b67b76 100644
--- a/be/src/runtime/buffered-tuple-stream.h
+++ b/be/src/runtime/buffered-tuple-stream.h
@@ -33,6 +33,16 @@ class TupleRow;
 //
 // The underlying memory management is done by the BufferedBlockMgr.
 //
+// The tuple stream consists of a number of small (less than io-sized) blocks followed
+// by an arbitrary number of io-sized blocks. The smaller blocks do not spill and are
+// there to lower the minimum buffering requirements. For example, an operator that
+// needs to maintain 64 streams (1 buffer per partition) would need, by default,
+// 64 * 8MB = 512MB of buffering. A query with 5 of these operators would require
+// 2.56 GB just to run, regardless of how much of that is actually used. This is
+// problematic for small queries. Instead we will start with a fixed number of small
+// buffers and only start using io-sized buffers when those fill up. The small buffers
+// never spill.
+//
 // The BufferedTupleStream is *not* thread safe from the caller's point of view. It is
 // expected that all the APIs are called from a single thread. Internally, the
 // object is thread safe wrt to the underlying block mgr.
@@ -78,6 +88,8 @@ class TupleRow;
 // TODO: see if this can be merged with Sorter::Run. The key difference is that this
 // does not need to return rows in the order they were added, which allows it to be
 // simpler.
+// TODO: we could compact the small buffers when we need to spill but they use very
+// little memory so this might not be very useful.
 // TODO: improvements:
 //   - Think about how to layout for the var len data more, possibly filling in them
 //     from the end of the same block. Don't interleave fixed and var len data.
@@ -144,11 +156,13 @@ class BufferedTupleStream {
   //     that are added and the rows being returned.
   //   block_mgr: Underlying block mgr that owns the data blocks.
   //   delete_on_read: Blocks are deleted after they are read.
+  //   use_initial_small_buffers: If true, the initial N buffers allocated for the
+  //     tuple stream are smaller than the io size.
   //   read_write: Stream allows interchanging read and write operations. Requires at
   //     least two blocks may be pinned.
-  // The tuple stream is initially in pinned mode.
   BufferedTupleStream(RuntimeState* state, const RowDescriptor& row_desc,
       BufferedBlockMgr* block_mgr, BufferedBlockMgr::Client* client,
+      bool use_initial_small_buffers = true,
       bool delete_on_read = false, bool read_write = false);
 
   // Initializes the tuple stream object. Must be called once before any of the
@@ -218,22 +232,24 @@ class BufferedTupleStream {
   int64_t rows_returned() const { return rows_returned_; }
 
   // Returns the byte size necessary to store the entire stream in memory.
-  int64_t byte_size() const { return blocks_.size() * block_mgr_->max_block_size(); }
+  int64_t byte_size() const { return total_byte_size_; }
 
   // Returns the byte size of the stream that is currently pinned in memory.
   // If ignore_current is true, the write_block_ memory is not included.
   int64_t bytes_in_mem(bool ignore_current) const;
 
-  // Returns the number of bytes that are in unpinned blocks.
-  int64_t bytes_unpinned() const;
-
   bool is_pinned() const { return pinned_; }
   int blocks_pinned() const { return num_pinned_; }
-  int blocks_unpinned() const { return blocks_.size() - num_pinned_; }
+  int blocks_unpinned() const { return blocks_.size() - num_pinned_ - num_small_blocks_; }
   bool has_read_block() const { return read_block_ != blocks_.end(); }
   bool has_write_block() const { return write_block_ != NULL; }
 
+  std::string DebugString() const;
+
  private:
+  // If true, this stream is still using small buffers.
+  bool use_small_buffers_;
+
   // If true, blocks are deleted after they are read.
   const bool delete_on_read_;
 
@@ -254,15 +270,13 @@ class BufferedTupleStream {
   // Sum of the fixed length portion of all the tuples in desc_.
   int fixed_tuple_row_size_;
 
-  // Sum of the non-nullable fixed-length portion of all the tuples in desc. This
-  // constitutes the minimum possible space occupied by a single row.
-  int min_tuple_row_size_;
-
-  // Max size (in bytes) of null indicators bitstring in each block. If 0, it means that
-  // there is no need to store null indicators for this RowDesc. We calculate this value
-  // based on the block size and the fixed_tuple_row_size_. When not 0, this value is
-  // also an upper bound for the number of (rows * tuples_per_row) in a block.
-  uint32_t null_indicators_per_block_;
+  // Max size (in bytes) of null indicators bitstring in the current read and write
+  // blocks. If 0, it means that there is no need to store null indicators for this
+  // RowDesc. We calculate this value based on the block's size and the
+  // fixed_tuple_row_size_. When not 0, this value is also an upper bound for the number
+  // of (rows * tuples_per_row) in this block.
+  uint32_t null_indicators_read_block_;
+  uint32_t null_indicators_write_block_;
 
   // Vector of all the strings slots grouped by tuple_idx.
   std::vector<std::pair<int, std::vector<SlotDescriptor*> > > string_slots_;
@@ -274,6 +288,9 @@ class BufferedTupleStream {
   // List of blocks in the stream.
   std::list<BufferedBlockMgr::Block*> blocks_;
 
+  // Total size of blocks_, including small blocks.
+  int64_t total_byte_size_;
+
   // Iterator pointing to the current block for read. If read_write_, this is always a
   // valid block, otherwise equal to list.end() until PrepareForRead() is called.
   std::list<BufferedBlockMgr::Block*>::iterator read_block_;
@@ -306,8 +323,12 @@ class BufferedTupleStream {
   // Number of pinned blocks in blocks_, stored to avoid iterating over the list
   // to compute bytes_in_mem and bytes_unpinned.
+  // This does not include small blocks.
   int num_pinned_;
 
+  // The total number of small blocks in blocks_.
+  int num_small_blocks_;
+
   bool closed_;
 
   // Used for debugging.
   Status status_;
@@ -336,19 +357,23 @@ class BufferedTupleStream {
   bool DeepCopy(TupleRow* row, uint8_t** dst);
 
   // Gets a new block from the block_mgr_, updating write_block_ and write_tuple_idx_, and
-  // setting *got_block. If there are no blocks available, write_block_ is set to NULL
-  // and *got_block is set to false.
+  // setting *got_block. If there are no blocks available, *got_block is set to false
+  // and write_block_ is unchanged.
   // min_size is the minimum number of bytes required for this block.
   Status NewBlockForWrite(int min_size, bool* got_block);
 
   // Reads the next block from the block_mgr_. This blocks if necessary.
-  // Updates read_block_, read_ptr_, read_tuple_idx_ and read_bytes_left_.
+  // Updates read_block_, read_ptr_, read_tuple_idx_ and read_bytes_.
   Status NextBlockForRead();
 
   // Returns the byte size of this row when encoded in a block.
   int ComputeRowSize(TupleRow* row) const;
 
-  std::string DebugString() const;
+  // Unpins 'block' if it is an io-sized block and updates tracking stats.
+  Status UnpinBlock(BufferedBlockMgr::Block* block);
+
+  // Computes the number of bytes needed for null indicators for a block of 'block_size' bytes.
+  int ComputeNumNullIndicatorBytes(int block_size) const;
 };
 
 }
diff --git a/be/src/runtime/buffered-tuple-stream.inline.h b/be/src/runtime/buffered-tuple-stream.inline.h
index ee8e86e3e..5ae4c814f 100644
--- a/be/src/runtime/buffered-tuple-stream.inline.h
+++ b/be/src/runtime/buffered-tuple-stream.inline.h
@@ -23,6 +23,7 @@ namespace impala {
 
 inline bool BufferedTupleStream::AddRow(TupleRow* row, uint8_t** dst) {
+  DCHECK(!closed_);
   if (LIKELY(DeepCopy(row, dst))) return true;
   bool got_block = false;
   status_ = NewBlockForWrite(ComputeRowSize(row), &got_block);
@@ -31,7 +32,7 @@ inline bool BufferedTupleStream::AddRow(TupleRow* row, uint8_t** dst) {
 }
 
 inline uint8_t* BufferedTupleStream::AllocateRow(int size) {
-  DCHECK_GE(size, min_tuple_row_size_);
+  DCHECK(!closed_);
   if (UNLIKELY(write_block_ == NULL || write_block_->BytesRemaining() < size)) {
     bool got_block = false;
     status_ = NewBlockForWrite(size, &got_block);
@@ -41,10 +42,12 @@ inline uint8_t* BufferedTupleStream::AllocateRow(int size) {
   DCHECK(write_block_->is_pinned());
   DCHECK_GE(write_block_->BytesRemaining(), size);
   ++num_rows_;
+  write_block_->AddRow();
   return write_block_->Allocate(size);
 }
 
 inline void BufferedTupleStream::GetTupleRow(const RowIdx& idx, TupleRow* row) const {
+  DCHECK(!closed_);
   DCHECK(is_pinned());
   DCHECK(!delete_on_read_);
   DCHECK_EQ(blocks_.size(), block_start_idx_.size());
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling.test b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
index 2f737913e..322eb635a 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling.test
@@ -138,4 +138,23 @@ select max(t1.total_count), max(t1.l_shipinstruct), max(t1.l_comment) from
 6001215,'TAKE BACK RETURN','zzle? slyly final platelets sleep quickly. '
 ---- TYPES
 BIGINT, STRING, STRING
-====
\ No newline at end of file
+---- QUERY
+# Run this query with very low memory. Since the tables are small, the PA/PHJ should be
+# using buffers much smaller than the io buffer.
+set max_block_mgr_memory=10m;
+select a.int_col, count(*)
+from functional.alltypessmall a, functional.alltypessmall b, functional.alltypessmall c
+where a.id = b.id and b.id = c.id group by a.int_col;
+---- RESULTS
+0,12
+1,12
+2,12
+3,12
+4,12
+5,8
+6,8
+7,8
+8,8
+9,8
+---- TYPES
+INT, BIGINT
diff --git a/tests/custom_cluster/test_spillling.py b/tests/custom_cluster/test_spillling.py
index 6b544cef8..6f2ff7a28 100755
--- a/tests/custom_cluster/test_spillling.py
+++ b/tests/custom_cluster/test_spillling.py
@@ -29,7 +29,8 @@ class TestSpillStress(CustomClusterTestSuite):
   @classmethod
   def setup_class(cls):
     #start impala with args
-    cls._start_impala_cluster(['--impalad_args=--"read_size=1000000"'])
+    cls._start_impala_cluster(['--impalad_args=--"read_size=1000000"',
+        'catalogd_args="--load_catalog_in_background=false"'])
     super(CustomClusterTestSuite, cls).setup_class()
 
   @classmethod
@@ -83,7 +84,8 @@ class TestSpilling(CustomClusterTestSuite):
   # Reduce the IO read size. This reduces the memory required to trigger spilling.
   @pytest.mark.execute_serially
   @CustomClusterTestSuite.with_args(
-      impalad_args="--read_size=1000000")
+      impalad_args="--read_size=1000000",
+      catalogd_args="--load_catalog_in_background=false")
   def test_spilling(self, vector):
     new_vector = deepcopy(vector)
     # remove this. the test cases set this explicitly.
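
For illustration only, below is a minimal standalone C++ sketch of the sizing arithmetic introduced by this change: the null-indicator reservation that ComputeNumNullIndicatorBytes() performs per block, evaluated across the escalating page sizes used by the small-buffer scheme. The 8MB io size, the 64KB/512KB initial pages, and the example row shape (2 tuples per row, 32 bytes of fixed-length data) are assumptions for the example, not values taken from the patch's callers; RoundUpNumi64 is re-implemented locally.

// Standalone sketch (not Impala source): null-indicator bytes per block size.
#include <cstdint>
#include <iostream>

// Number of int64 words needed to hold 'bits' bits (same role as BitUtil::RoundUpNumi64).
static int64_t RoundUpNumi64(int64_t bits) { return (bits + 63) / 64; }

// Bytes reserved at the front of a block for the null-indicator bitstring. Sized for
// the worst case: every row occupies only its fixed-length footprint.
static int64_t NumNullIndicatorBytes(int64_t block_size, int64_t fixed_tuple_row_size,
    int64_t tuples_per_row) {
  const int64_t min_row_size_in_bits = 8 * fixed_tuple_row_size + tuples_per_row;
  const int64_t max_num_rows = (8 * block_size) / min_row_size_in_bits;
  return RoundUpNumi64(max_num_rows * tuples_per_row) * 8;
}

int main() {
  // Hypothetical stream: 2 tuples per row, 32 bytes of fixed-length data per row.
  const int64_t tuples_per_row = 2;
  const int64_t fixed_row_size = 32;

  // Two small pages first, then io-sized pages (8MB assumed here).
  const int64_t page_sizes[] = { 64 * 1024, 512 * 1024, 8 * 1024 * 1024 };
  for (int i = 0; i < 3; ++i) {
    std::cout << "page " << page_sizes[i] << " bytes -> null indicator bytes: "
              << NumNullIndicatorBytes(page_sizes[i], fixed_row_size, tuples_per_row)
              << std::endl;
  }
  return 0;
}

With these example inputs the 64KB page reserves 512 bytes of indicators while the 8MB page reserves roughly 64KB, which is why the per-block recomputation replaces the old per-stream null_indicators_per_block_ constant.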