diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc
index c3a5f80d4..afd72620e 100644
--- a/be/src/exec/exec-node.cc
+++ b/be/src/exec/exec-node.cc
@@ -247,6 +247,10 @@ Status ExecNode::ClaimBufferReservation(RuntimeState* state) {
   return Status::OK();
 }
 
+Status ExecNode::ReleaseUnusedReservation() {
+  return buffer_pool_client_.DecreaseReservationTo(resource_profile_.min_reservation);
+}
+
 Status ExecNode::CreateTree(
     RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) {
   if (plan.nodes.size() == 0) {
diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h
index 55c51ab7e..04470f2af 100644
--- a/be/src/exec/exec-node.h
+++ b/be/src/exec/exec-node.h
@@ -233,7 +233,16 @@ class ExecNode {
   /// ExecNode. Only needs to be called by ExecNodes that will use the client.
   /// The client is automatically cleaned up in Close(). Should not be called if
   /// the client is already open.
-  Status ClaimBufferReservation(RuntimeState* state);
+  ///
+  /// The ExecNode must return the initial reservation to
+  /// QueryState::initial_reservations(), which is done automatically in Close() as long
+  /// as the initial reservation is not released before Close().
+  Status ClaimBufferReservation(RuntimeState* state) WARN_UNUSED_RESULT;
+
+  /// Release any unused reservation in excess of the node's initial reservation. Returns
+  /// an error if releasing the reservation requires flushing pages to disk, and that
+  /// fails.
+  Status ReleaseUnusedReservation() WARN_UNUSED_RESULT;
 
   /// Extends blocking queue for row batches. Row batches have a property that
   /// they must be processed in the order they were produced, even in cancellation
diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc
index 8432bccb6..394904139 100644
--- a/be/src/exec/partitioned-aggregation-node.cc
+++ b/be/src/exec/partitioned-aggregation-node.cc
@@ -1198,6 +1198,16 @@ int64_t PartitionedAggregationNode::LargestSpilledPartition() const {
 Status PartitionedAggregationNode::NextPartition() {
   DCHECK(output_partition_ == nullptr);
 
+  if (!IsInSubplan() && spilled_partitions_.empty()) {
+    // All partitions are in memory. Release reservation that was used for previous
+    // partitions that is no longer needed. If we have spilled partitions, we want to
+    // hold onto all reservation in case it is needed to process the spilled partitions.
+    DCHECK(!buffer_pool_client_.has_unpinned_pages());
+    Status status = ReleaseUnusedReservation();
+    DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are "
+        << "no unpinned pages. " << status.GetDetail();
+  }
+
   // Keep looping until we get to a partition that fits in memory.
   Partition* partition = nullptr;
   while (true) {
diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc
index 440f80950..80df2146a 100644
--- a/be/src/exec/sort-node.cc
+++ b/be/src/exec/sort-node.cc
@@ -103,6 +103,19 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     *eos = false;
   }
 
+  if (returned_buffer_) {
+    // If the Sorter returned a buffer on the last call to GetNext(), we might have an
+    // opportunity to release memory. Release reservation, unless it might be needed
+    // for the next subplan iteration or merging spilled runs.
+    returned_buffer_ = false;
+    if (!IsInSubplan() && !sorter_->HasSpilledRuns()) {
+      DCHECK(!buffer_pool_client_.has_unpinned_pages());
+      Status status = ReleaseUnusedReservation();
+      DCHECK(status.ok()) << "Should not fail - no runs were spilled so no pages are "
+          << "unpinned. " << status.GetDetail();
+    }
+  }
+
   DCHECK_EQ(row_batch->num_rows(), 0);
   RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
   while ((num_rows_skipped_ < offset_)) {
@@ -119,6 +132,7 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) {
     RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos));
   }
 
+  returned_buffer_ = row_batch->num_buffers() > 0;
   num_rows_returned_ += row_batch->num_rows();
   if (ReachedLimit()) {
     row_batch->set_num_rows(row_batch->num_rows() - (num_rows_returned_ - limit_));
diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h
index a11d42463..d6eef256a 100644
--- a/be/src/exec/sort-node.h
+++ b/be/src/exec/sort-node.h
@@ -69,6 +69,10 @@ class SortNode : public ExecNode {
   std::vector<bool> is_asc_order_;
   std::vector<bool> nulls_first_;
 
+  /// Whether the previous call to GetNext() returned a buffer attached to the RowBatch.
+  /// Used to avoid unnecessary calls to ReleaseUnusedReservation().
+  bool returned_buffer_ = false;
+
   /////////////////////////////////////////
   /// BEGIN: Members that must be Reset()
 
diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h
index 0c0408be2..70949422e 100644
--- a/be/src/runtime/bufferpool/buffer-pool-internal.h
+++ b/be/src/runtime/bufferpool/buffer-pool-internal.h
@@ -227,7 +227,7 @@ class BufferPool::Client {
   /// already in memory, ensures the data is in the page's buffer. If the data is on
   /// disk, starts an async read of the data and sets 'pin_in_flight' on the page to
   /// true. Neither the client's lock nor page->buffer_lock should be held by the caller.
-  Status StartMoveToPinned(ClientHandle* client, Page* page);
+  Status StartMoveToPinned(ClientHandle* client, Page* page) WARN_UNUSED_RESULT;
 
   /// Moves a page that has a pin in flight back to the evicted state, undoing
   /// StartMoveToPinned(). Neither the client's lock nor page->buffer_lock should be held
@@ -236,13 +236,16 @@ class BufferPool::Client {
 
   /// Finish the work of bring the data of an evicted page to memory if
   /// page->pin_in_flight was set to true by StartMoveToPinned().
-  Status FinishMoveEvictedToPinned(Page* page);
+  Status FinishMoveEvictedToPinned(Page* page) WARN_UNUSED_RESULT;
 
   /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer()
   /// API to deduct from the client's reservation and update internal accounting. Cleans
   /// dirty pages if needed to satisfy the buffer pool's internal invariants. No page or
   /// client locks should be held by the caller.
-  Status PrepareToAllocateBuffer(int64_t len);
+  Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT;
+
+  /// Implementation of ClientHandle::DecreaseReservationTo().
+  Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;
 
   /// Called after a buffer of 'len' is freed via the FreeBuffer() API to update
   /// internal accounting and release the buffer to the client's reservation. No page or
@@ -272,6 +275,11 @@ class BufferPool::Client {
   const BufferPoolClientCounters& counters() const { return counters_; }
   bool spilling_enabled() const { return file_group_ != NULL; }
   void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; }
+  bool has_unpinned_pages() const {
+    // Safe to read without lock since other threads should not be calling BufferPool
+    // functions that create, destroy or unpin pages.
+    return pinned_pages_.size() < num_pages_;
+  }
 
   std::string DebugString();
 
diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc
index 2eff955e9..b2f869513 100644
--- a/be/src/runtime/bufferpool/buffer-pool-test.cc
+++ b/be/src/runtime/bufferpool/buffer-pool-test.cc
@@ -157,6 +157,14 @@ class BufferPoolTest : public ::testing::Test {
     return !page->page_->buffer.is_open();
   }
 
+  int NumEvicted(vector<PageHandle>& pages) {
+    int num_evicted = 0;
+    for (PageHandle& page : pages) {
+      if (IsEvicted(&page)) ++num_evicted;
+    }
+    return num_evicted;
+  }
+
   /// Allocate buffers of varying sizes at most 'max_buffer_size' that add up to
   /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size.
   /// If 'randomize_core' is true, will switch thread between cores randomly before
@@ -606,6 +614,7 @@ TEST_F(BufferPoolTest, CleanPageStats) {
   vector<PageHandle> pages;
   CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
   WriteData(pages, 0);
+  EXPECT_FALSE(client.has_unpinned_pages());
 
   // Pages don't start off clean.
   EXPECT_EQ(0, pool.GetNumCleanPages());
@@ -613,22 +622,27 @@ TEST_F(BufferPoolTest, CleanPageStats) {
 
   // Unpin pages and wait until they're written out and therefore clean.
   UnpinAll(&pool, &client, &pages);
+  EXPECT_TRUE(client.has_unpinned_pages());
   WaitForAllWrites(&client);
   EXPECT_EQ(MAX_NUM_BUFFERS, pool.GetNumCleanPages());
   EXPECT_EQ(TOTAL_MEM, pool.GetCleanPageBytes());
+  EXPECT_TRUE(client.has_unpinned_pages());
 
   // Do an allocation to force eviction of one page.
   ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
   EXPECT_EQ(MAX_NUM_BUFFERS - 1, pool.GetNumCleanPages());
   EXPECT_EQ(TOTAL_MEM - TEST_BUFFER_LEN, pool.GetCleanPageBytes());
+  EXPECT_TRUE(client.has_unpinned_pages());
 
   // Re-pin all the pages - none will be clean afterwards.
   ASSERT_OK(PinAll(&pool, &client, &pages));
   VerifyData(pages, 0);
   EXPECT_EQ(0, pool.GetNumCleanPages());
   EXPECT_EQ(0, pool.GetCleanPageBytes());
+  EXPECT_FALSE(client.has_unpinned_pages());
 
   DestroyAll(&pool, &client, &pages);
+  EXPECT_FALSE(client.has_unpinned_pages());
   pool.DeregisterClient(&client);
   global_reservations_.Close();
 }
@@ -1242,11 +1256,7 @@ void BufferPoolTest::TestEvictionPolicy(int64_t page_size) {
   // No additional memory should have been allocated - it should have been recycled.
   EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated());
   // Check that two pages were evicted.
-  int num_evicted = 0;
-  for (PageHandle& page : pages) {
-    if (IsEvicted(&page)) ++num_evicted;
-  }
-  EXPECT_EQ(NUM_EXTRA_BUFFERS, num_evicted);
+  EXPECT_EQ(NUM_EXTRA_BUFFERS, NumEvicted(pages));
 
   // Free up memory required to pin the original pages again.
   FreeBuffers(&pool, &client, &extra_buffers);
@@ -1928,6 +1938,48 @@ TEST_F(BufferPoolTest, SubReservation) {
   subreservation.Close();
   pool.DeregisterClient(&client);
 }
+
+// Check that we can decrease reservation without violating any buffer pool invariants.
+TEST_F(BufferPoolTest, DecreaseReservation) {
+  const int MAX_NUM_BUFFERS = 4;
+  const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN;
+  global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM);
+  BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM);
+
+  ClientHandle client;
+  ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_,
+      nullptr, TOTAL_MEM, NewProfile(), &client));
+  ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM));
+
+  vector<PageHandle> pages;
+  CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages);
+  WriteData(pages, 0);
+
+  // Unpin pages and decrease reservation while the writes are in flight.
+  UnpinAll(&pool, &client, &pages);
+  ASSERT_OK(client.DecreaseReservationTo(2 * TEST_BUFFER_LEN));
+  // Two pages must be clean to stay within reservation.
+  EXPECT_GE(pool.GetNumCleanPages(), 2);
+  EXPECT_EQ(2 * TEST_BUFFER_LEN, client.GetReservation());
+
+  // Decrease it further after the pages are evicted.
+  WaitForAllWrites(&client);
+  ASSERT_OK(client.DecreaseReservationTo(TEST_BUFFER_LEN));
+  EXPECT_GE(pool.GetNumCleanPages(), 3);
+  EXPECT_EQ(TEST_BUFFER_LEN, client.GetReservation());
+
+  // Check that we can still use the reservation.
+  ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN));
+  EXPECT_EQ(1, NumEvicted(pages));
+
+  // Check that we can decrease it to zero.
+  ASSERT_OK(client.DecreaseReservationTo(0));
+  EXPECT_EQ(0, client.GetReservation());
+
+  DestroyAll(&pool, &client, &pages);
+  pool.DeregisterClient(&client);
+  global_reservations_.Close();
+}
 }
 
 int main(int argc, char** argv) {
diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc
index df92928be..2e9ba3d4a 100644
--- a/be/src/runtime/bufferpool/buffer-pool.cc
+++ b/be/src/runtime/bufferpool/buffer-pool.cc
@@ -296,6 +296,10 @@ bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) {
   return impl_->reservation()->IncreaseReservationToFit(bytes);
 }
 
+Status BufferPool::ClientHandle::DecreaseReservationTo(int64_t target_bytes) {
+  return impl_->DecreaseReservationTo(target_bytes);
+}
+
 int64_t BufferPool::ClientHandle::GetReservation() const {
   return impl_->reservation()->GetReservation();
 }
@@ -334,6 +338,10 @@ void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double probabilit
   impl_->reservation()->SetDebugDenyIncreaseReservation(probability);
 }
 
+bool BufferPool::ClientHandle::has_unpinned_pages() const {
+  return impl_->has_unpinned_pages();
+}
+
 BufferPool::SubReservation::SubReservation(ClientHandle* client) {
   tracker_.reset(new ReservationTracker);
   tracker_->InitChildTracker(
@@ -543,6 +551,19 @@ Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) {
   return Status::OK();
 }
 
+Status BufferPool::Client::DecreaseReservationTo(int64_t target_bytes) {
+  unique_lock<mutex> lock(lock_);
+  int64_t current_reservation = reservation_.GetReservation();
+  DCHECK_GE(current_reservation, target_bytes);
+  int64_t amount_to_free =
+      min(reservation_.GetUnusedReservation(), current_reservation - target_bytes);
+  if (amount_to_free == 0) return Status::OK();
+  // Clean enough pages to allow us to safely release reservation.
+  RETURN_IF_ERROR(CleanPages(&lock, amount_to_free));
+  reservation_.DecreaseReservation(amount_to_free);
+  return Status::OK();
+}
+
 Status BufferPool::Client::CleanPages(unique_lock<mutex>* client_lock, int64_t len) {
   DCheckHoldsLock(*client_lock);
   DCHECK_CONSISTENCY();
diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h
index 4798d6c21..7b1155177 100644
--- a/be/src/runtime/bufferpool/buffer-pool.h
+++ b/be/src/runtime/bufferpool/buffer-pool.h
@@ -324,6 +324,13 @@ class BufferPool::ClientHandle {
   /// if successful, after which 'bytes' can be used.
   bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT;
 
+  /// Try to decrease this client's reservation down to a minimum of 'target_bytes' by
+  /// releasing unused reservation to ancestor ReservationTrackers, all the way up to
+  /// the root of the ReservationTracker tree. This client's reservation must be at least
+  /// 'target_bytes' before calling this method. May fail if decreasing the reservation
+  /// requires flushing unpinned pages to disk and a write to disk fails.
+  Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT;
+
   /// Move some of this client's reservation to the SubReservation. 'bytes' of unused
   /// reservation must be available in this tracker.
   void SaveReservation(SubReservation* dst, int64_t bytes);
@@ -351,6 +358,9 @@ class BufferPool::ClientHandle {
 
   bool is_registered() const { return impl_ != NULL; }
 
+  /// Return true if there are any unpinned pages for this client.
+  bool has_unpinned_pages() const;
+
   std::string DebugString() const;
 
  private:
diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc
index ee0e4be06..de0594598 100644
--- a/be/src/runtime/sorter.cc
+++ b/be/src/runtime/sorter.cc
@@ -1760,4 +1760,11 @@ Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) {
   RETURN_IF_ERROR(merged_run->FinalizeInput());
   return Status::OK();
 }
+
+bool Sorter::HasSpilledRuns() const {
+  // All runs in 'merging_runs_' are spilled. 'sorted_runs_' can contain at most one
+  // non-spilled run.
+  return !merging_runs_.empty() || sorted_runs_.size() > 1 ||
+      (sorted_runs_.size() == 1 && !sorted_runs_.back()->is_pinned());
+}
 } // namespace impala
diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h
index cafab7277..5e7240be8 100644
--- a/be/src/runtime/sorter.h
+++ b/be/src/runtime/sorter.h
@@ -152,6 +152,9 @@ class Sorter {
   /// sort with the current sorter.
   int64_t ComputeMinReservation();
 
+  /// Return true if the sorter has any spilled runs.
+  bool HasSpilledRuns() const;
+
  private:
   class Page;
   class Run;
@@ -239,7 +242,7 @@ class Sorter {
   /// memory.
   boost::scoped_ptr<SortedRunMerger> merger_;
 
-  /// Runs that are currently processed by the merge_.
+  /// Spilled runs that are currently processed by the merge_.
   /// These runs can be deleted when we are done with the current merge.
   std::deque<Run*> merging_runs_;
 
diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
index 6fe86c3e2..d9f60cc2c 100644
--- a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
+++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test
@@ -125,7 +125,7 @@ row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\)
 ====
 ---- QUERY
 # Test aggregation spill with group_concat distinct
-set buffer_pool_limit=50m;
+set buffer_pool_limit=30m;
 select l_orderkey, count(*), group_concat(distinct l_linestatus, '|')
 from lineitem
 group by 1
diff --git a/testdata/workloads/tpch/queries/sort-reservation-usage.test b/testdata/workloads/tpch/queries/sort-reservation-usage.test
new file mode 100644
index 000000000..92f180dd7
--- /dev/null
+++ b/testdata/workloads/tpch/queries/sort-reservation-usage.test
@@ -0,0 +1,30 @@
+====
+---- QUERY
+# Test that in-mem sorts incrementally give up memory when emitting output.
+# This query and the limit are calibrated to fail if the first sort does not
+# give up memory to the second sort.
+set num_nodes=1;
+set scratch_limit=0;
+set buffer_pool_limit=15m;
+set default_spillable_buffer_size=64kb;
+SELECT *
+FROM (SELECT
+    Rank() OVER(ORDER BY l_orderkey) AS rank,
+    Rank() OVER(ORDER BY l_partkey) AS rank2
+  FROM lineitem
+  WHERE l_shipdate < '1992-05-09') a
+WHERE rank < 10
+ORDER BY rank;
+---- RESULTS
+1,118035
+2,55836
+2,141809
+2,155407
+5,84064
+5,129763
+7,10725
+7,31340
+7,155173
+---- TYPES
+BIGINT,BIGINT
+====
diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py
index df95dddf1..0eae03559 100644
--- a/tests/query_test/test_sort.py
+++ b/tests/query_test/test_sort.py
@@ -159,6 +159,10 @@ class TestQueryFullSort(ImpalaTestSuite):
         query, exec_option, table_format=table_format).data)
       assert(result[0] == sorted(result[0]))
 
+  def test_sort_reservation_usage(self, vector):
+    """Tests for sorter reservation usage."""
+    self.run_test_case('sort-reservation-usage', vector)
+
 class TestRandomSort(ImpalaTestSuite):
   @classmethod
   def get_workload(self):
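
Note (not part of the patch): the sketch below is a standalone C++ toy, with made-up names such as FakeClient, that models the reservation arithmetic the patch adds in BufferPool::Client::DecreaseReservationTo(). The idea is that only reservation not currently backing pinned pages or buffers can be handed back, and never below 'target_bytes'; in the real code a CleanPages() step may first flush dirty pages before the release, which is why the call can fail.

```cpp
// Standalone sketch of the DecreaseReservationTo() accounting. FakeClient and its
// fields are hypothetical; they are not Impala classes.
#include <algorithm>
#include <cassert>
#include <cstdint>
#include <iostream>

struct FakeClient {
  int64_t reservation = 0;       // Total reservation granted to the client.
  int64_t used_reservation = 0;  // Reservation backing pinned pages and buffers.

  int64_t UnusedReservation() const { return reservation - used_reservation; }

  // Mirrors the arithmetic in the patch: release min(unused, current - target),
  // so the reservation never drops below 'target_bytes' or below what is in use.
  void DecreaseReservationTo(int64_t target_bytes) {
    assert(reservation >= target_bytes);
    int64_t amount_to_free =
        std::min(UnusedReservation(), reservation - target_bytes);
    reservation -= amount_to_free;
  }
};

int main() {
  FakeClient client;
  client.reservation = 8 * 64 * 1024;       // e.g. eight 64KB buffers reserved.
  client.used_reservation = 2 * 64 * 1024;  // two buffers still pinned.

  // Ask to drop to a 2-buffer minimum: the six unused buffers are released.
  client.DecreaseReservationTo(2 * 64 * 1024);
  std::cout << "reservation now " << client.reservation << " bytes\n";  // 131072
  return 0;
}
```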