From 8609b09a95f2b20e42e4ba4aa8b5d605fb545a4e Mon Sep 17 00:00:00 2001 From: Tim Armstrong Date: Mon, 7 Aug 2017 09:15:29 -0700 Subject: [PATCH] IMPALA-5681: release reservation from blocking operators When an in-memory blocking aggregation or join is in the GetNext() phase where it is outputting accumulated rows then we expect memory consumption to monotonically decrease because no more rows will be accumulated in memory. This change adds support to release unused reservation and makes use of it for in-memory aggregations and sorts. We don't release memory for operators with spilled data, since they may need the reservation to bring it back into memory. We also don't release memory in subplans, since it will probably be used in a later iteration of the subplan. Testing: Updated spilling test that now requires less memory. Ran stress test binary search on tpch_parquet. No changes, except Q18 now requires 325MB instead of 450MB to execute without spilling. Ran query with two sorts in the same pipeline and watched /memz to confirm that the first node in the pipeline was incrementally releasing memory. Added a regression test based on this experiment. Added a backend test to directly test reservation decreasing. Change-Id: I6f4d0ad127d5fcd14b9821a7c127eec11d98692f Reviewed-on: http://gerrit.cloudera.org:8080/7619 Reviewed-by: Tim Armstrong Tested-by: Impala Public Jenkins --- be/src/exec/exec-node.cc | 4 ++ be/src/exec/exec-node.h | 11 +++- be/src/exec/partitioned-aggregation-node.cc | 10 +++ be/src/exec/sort-node.cc | 14 +++++ be/src/exec/sort-node.h | 4 ++ .../runtime/bufferpool/buffer-pool-internal.h | 14 ++++- be/src/runtime/bufferpool/buffer-pool-test.cc | 62 +++++++++++++++++-- be/src/runtime/bufferpool/buffer-pool.cc | 21 +++++++ be/src/runtime/bufferpool/buffer-pool.h | 10 +++ be/src/runtime/sorter.cc | 7 +++ be/src/runtime/sorter.h | 5 +- .../queries/QueryTest/spilling-aggs.test | 2 +- .../tpch/queries/sort-reservation-usage.test | 30 +++++++++ tests/query_test/test_sort.py | 4 ++ 14 files changed, 187 insertions(+), 11 deletions(-) create mode 100644 testdata/workloads/tpch/queries/sort-reservation-usage.test diff --git a/be/src/exec/exec-node.cc b/be/src/exec/exec-node.cc index c3a5f80d4..afd72620e 100644 --- a/be/src/exec/exec-node.cc +++ b/be/src/exec/exec-node.cc @@ -247,6 +247,10 @@ Status ExecNode::ClaimBufferReservation(RuntimeState* state) { return Status::OK(); } +Status ExecNode::ReleaseUnusedReservation() { + return buffer_pool_client_.DecreaseReservationTo(resource_profile_.min_reservation); +} + Status ExecNode::CreateTree( RuntimeState* state, const TPlan& plan, const DescriptorTbl& descs, ExecNode** root) { if (plan.nodes.size() == 0) { diff --git a/be/src/exec/exec-node.h b/be/src/exec/exec-node.h index 55c51ab7e..04470f2af 100644 --- a/be/src/exec/exec-node.h +++ b/be/src/exec/exec-node.h @@ -233,7 +233,16 @@ class ExecNode { /// ExecNode. Only needs to be called by ExecNodes that will use the client. /// The client is automatically cleaned up in Close(). Should not be called if /// the client is already open. - Status ClaimBufferReservation(RuntimeState* state); + /// + /// The ExecNode must return the initial reservation to + /// QueryState::initial_reservations(), which is done automatically in Close() as long + /// as the initial reservation is not released before Close(). + Status ClaimBufferReservation(RuntimeState* state) WARN_UNUSED_RESULT; + + /// Release any unused reservation in excess of the node's initial reservation. Returns + /// an error if releasing the reservation requires flushing pages to disk, and that + /// fails. + Status ReleaseUnusedReservation() WARN_UNUSED_RESULT; /// Extends blocking queue for row batches. Row batches have a property that /// they must be processed in the order they were produced, even in cancellation diff --git a/be/src/exec/partitioned-aggregation-node.cc b/be/src/exec/partitioned-aggregation-node.cc index 8432bccb6..394904139 100644 --- a/be/src/exec/partitioned-aggregation-node.cc +++ b/be/src/exec/partitioned-aggregation-node.cc @@ -1198,6 +1198,16 @@ int64_t PartitionedAggregationNode::LargestSpilledPartition() const { Status PartitionedAggregationNode::NextPartition() { DCHECK(output_partition_ == nullptr); + if (!IsInSubplan() && spilled_partitions_.empty()) { + // All partitions are in memory. Release reservation that was used for previous + // partitions that is no longer needed. If we have spilled partitions, we want to + // hold onto all reservation in case it is needed to process the spilled partitions. + DCHECK(!buffer_pool_client_.has_unpinned_pages()); + Status status = ReleaseUnusedReservation(); + DCHECK(status.ok()) << "Should not fail - all partitions are in memory so there are " + << "no unpinned pages. " << status.GetDetail(); + } + // Keep looping until we get to a partition that fits in memory. Partition* partition = nullptr; while (true) { diff --git a/be/src/exec/sort-node.cc b/be/src/exec/sort-node.cc index 440f80950..80df2146a 100644 --- a/be/src/exec/sort-node.cc +++ b/be/src/exec/sort-node.cc @@ -103,6 +103,19 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { *eos = false; } + if (returned_buffer_) { + // If the Sorter returned a buffer on the last call to GetNext(), we might have an + // opportunity to release memory. Release reservation, unless it might be needed + // for the next subplan iteration or merging spilled runs. + returned_buffer_ = false; + if (!IsInSubplan() && !sorter_->HasSpilledRuns()) { + DCHECK(!buffer_pool_client_.has_unpinned_pages()); + Status status = ReleaseUnusedReservation(); + DCHECK(status.ok()) << "Should not fail - no runs were spilled so no pages are " + << "unpinned. " << status.GetDetail(); + } + } + DCHECK_EQ(row_batch->num_rows(), 0); RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos)); while ((num_rows_skipped_ < offset_)) { @@ -119,6 +132,7 @@ Status SortNode::GetNext(RuntimeState* state, RowBatch* row_batch, bool* eos) { RETURN_IF_ERROR(sorter_->GetNext(row_batch, eos)); } + returned_buffer_ = row_batch->num_buffers() > 0; num_rows_returned_ += row_batch->num_rows(); if (ReachedLimit()) { row_batch->set_num_rows(row_batch->num_rows() - (num_rows_returned_ - limit_)); diff --git a/be/src/exec/sort-node.h b/be/src/exec/sort-node.h index a11d42463..d6eef256a 100644 --- a/be/src/exec/sort-node.h +++ b/be/src/exec/sort-node.h @@ -69,6 +69,10 @@ class SortNode : public ExecNode { std::vector is_asc_order_; std::vector nulls_first_; + /// Whether the previous call to GetNext() returned a buffer attached to the RowBatch. + /// Used to avoid unnecessary calls to ReleaseUnusedReservation(). + bool returned_buffer_ = false; + ///////////////////////////////////////// /// BEGIN: Members that must be Reset() diff --git a/be/src/runtime/bufferpool/buffer-pool-internal.h b/be/src/runtime/bufferpool/buffer-pool-internal.h index 0c0408be2..70949422e 100644 --- a/be/src/runtime/bufferpool/buffer-pool-internal.h +++ b/be/src/runtime/bufferpool/buffer-pool-internal.h @@ -227,7 +227,7 @@ class BufferPool::Client { /// already in memory, ensures the data is in the page's buffer. If the data is on /// disk, starts an async read of the data and sets 'pin_in_flight' on the page to /// true. Neither the client's lock nor page->buffer_lock should be held by the caller. - Status StartMoveToPinned(ClientHandle* client, Page* page); + Status StartMoveToPinned(ClientHandle* client, Page* page) WARN_UNUSED_RESULT; /// Moves a page that has a pin in flight back to the evicted state, undoing /// StartMoveToPinned(). Neither the client's lock nor page->buffer_lock should be held @@ -236,13 +236,16 @@ class BufferPool::Client { /// Finish the work of bring the data of an evicted page to memory if /// page->pin_in_flight was set to true by StartMoveToPinned(). - Status FinishMoveEvictedToPinned(Page* page); + Status FinishMoveEvictedToPinned(Page* page) WARN_UNUSED_RESULT; /// Must be called once before allocating a buffer of 'len' via the AllocateBuffer() /// API to deduct from the client's reservation and update internal accounting. Cleans /// dirty pages if needed to satisfy the buffer pool's internal invariants. No page or /// client locks should be held by the caller. - Status PrepareToAllocateBuffer(int64_t len); + Status PrepareToAllocateBuffer(int64_t len) WARN_UNUSED_RESULT; + + /// Implementation of ClientHandle::DecreaseReservationTo(). + Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT; /// Called after a buffer of 'len' is freed via the FreeBuffer() API to update /// internal accounting and release the buffer to the client's reservation. No page or @@ -272,6 +275,11 @@ class BufferPool::Client { const BufferPoolClientCounters& counters() const { return counters_; } bool spilling_enabled() const { return file_group_ != NULL; } void set_debug_write_delay_ms(int val) { debug_write_delay_ms_ = val; } + bool has_unpinned_pages() const { + // Safe to read without lock since other threads should not be calling BufferPool + // functions that create, destroy or unpin pages. + return pinned_pages_.size() < num_pages_; + } std::string DebugString(); diff --git a/be/src/runtime/bufferpool/buffer-pool-test.cc b/be/src/runtime/bufferpool/buffer-pool-test.cc index 2eff955e9..b2f869513 100644 --- a/be/src/runtime/bufferpool/buffer-pool-test.cc +++ b/be/src/runtime/bufferpool/buffer-pool-test.cc @@ -157,6 +157,14 @@ class BufferPoolTest : public ::testing::Test { return !page->page_->buffer.is_open(); } + int NumEvicted(vector& pages) { + int num_evicted = 0; + for (PageHandle& page : pages) { + if (IsEvicted(&page)) ++num_evicted; + } + return num_evicted; + } + /// Allocate buffers of varying sizes at most 'max_buffer_size' that add up to /// 'total_bytes'. Both numbers must be a multiple of the minimum buffer size. /// If 'randomize_core' is true, will switch thread between cores randomly before @@ -606,6 +614,7 @@ TEST_F(BufferPoolTest, CleanPageStats) { vector pages; CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages); WriteData(pages, 0); + EXPECT_FALSE(client.has_unpinned_pages()); // Pages don't start off clean. EXPECT_EQ(0, pool.GetNumCleanPages()); @@ -613,22 +622,27 @@ TEST_F(BufferPoolTest, CleanPageStats) { // Unpin pages and wait until they're written out and therefore clean. UnpinAll(&pool, &client, &pages); + EXPECT_TRUE(client.has_unpinned_pages()); WaitForAllWrites(&client); EXPECT_EQ(MAX_NUM_BUFFERS, pool.GetNumCleanPages()); EXPECT_EQ(TOTAL_MEM, pool.GetCleanPageBytes()); + EXPECT_TRUE(client.has_unpinned_pages()); // Do an allocation to force eviction of one page. ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); EXPECT_EQ(MAX_NUM_BUFFERS - 1, pool.GetNumCleanPages()); EXPECT_EQ(TOTAL_MEM - TEST_BUFFER_LEN, pool.GetCleanPageBytes()); + EXPECT_TRUE(client.has_unpinned_pages()); // Re-pin all the pages - none will be clean afterwards. ASSERT_OK(PinAll(&pool, &client, &pages)); VerifyData(pages, 0); EXPECT_EQ(0, pool.GetNumCleanPages()); EXPECT_EQ(0, pool.GetCleanPageBytes()); + EXPECT_FALSE(client.has_unpinned_pages()); DestroyAll(&pool, &client, &pages); + EXPECT_FALSE(client.has_unpinned_pages()); pool.DeregisterClient(&client); global_reservations_.Close(); } @@ -1242,11 +1256,7 @@ void BufferPoolTest::TestEvictionPolicy(int64_t page_size) { // No additional memory should have been allocated - it should have been recycled. EXPECT_EQ(total_mem, pool.GetSystemBytesAllocated()); // Check that two pages were evicted. - int num_evicted = 0; - for (PageHandle& page : pages) { - if (IsEvicted(&page)) ++num_evicted; - } - EXPECT_EQ(NUM_EXTRA_BUFFERS, num_evicted); + EXPECT_EQ(NUM_EXTRA_BUFFERS, NumEvicted(pages)); // Free up memory required to pin the original pages again. FreeBuffers(&pool, &client, &extra_buffers); @@ -1928,6 +1938,48 @@ TEST_F(BufferPoolTest, SubReservation) { subreservation.Close(); pool.DeregisterClient(&client); } + +// Check that we can decrease reservation without violating any buffer pool invariants. +TEST_F(BufferPoolTest, DecreaseReservation) { + const int MAX_NUM_BUFFERS = 4; + const int64_t TOTAL_MEM = MAX_NUM_BUFFERS * TEST_BUFFER_LEN; + global_reservations_.InitRootTracker(NewProfile(), TOTAL_MEM); + BufferPool pool(TEST_BUFFER_LEN, TOTAL_MEM); + + ClientHandle client; + ASSERT_OK(pool.RegisterClient("test client", NewFileGroup(), &global_reservations_, + nullptr, TOTAL_MEM, NewProfile(), &client)); + ASSERT_TRUE(client.IncreaseReservation(TOTAL_MEM)); + + vector pages; + CreatePages(&pool, &client, TEST_BUFFER_LEN, TOTAL_MEM, &pages); + WriteData(pages, 0); + + // Unpin pages and decrease reservation while the writes are in flight. + UnpinAll(&pool, &client, &pages); + ASSERT_OK(client.DecreaseReservationTo(2 * TEST_BUFFER_LEN)); + // Two pages must be clean to stay within reservation + EXPECT_GE(pool.GetNumCleanPages(), 2); + EXPECT_EQ(2 * TEST_BUFFER_LEN, client.GetReservation()); + + // Decrease it further after the pages are evicted. + WaitForAllWrites(&client); + ASSERT_OK(client.DecreaseReservationTo(TEST_BUFFER_LEN)); + EXPECT_GE(pool.GetNumCleanPages(), 3); + EXPECT_EQ(TEST_BUFFER_LEN, client.GetReservation()); + + // Check that we can still use the reservation. + ASSERT_OK(AllocateAndFree(&pool, &client, TEST_BUFFER_LEN)); + EXPECT_EQ(1, NumEvicted(pages)); + + // Check that we can decrease it to zero. + ASSERT_OK(client.DecreaseReservationTo(0)); + EXPECT_EQ(0, client.GetReservation()); + + DestroyAll(&pool, &client, &pages); + pool.DeregisterClient(&client); + global_reservations_.Close(); +} } int main(int argc, char** argv) { diff --git a/be/src/runtime/bufferpool/buffer-pool.cc b/be/src/runtime/bufferpool/buffer-pool.cc index df92928be..2e9ba3d4a 100644 --- a/be/src/runtime/bufferpool/buffer-pool.cc +++ b/be/src/runtime/bufferpool/buffer-pool.cc @@ -296,6 +296,10 @@ bool BufferPool::ClientHandle::IncreaseReservationToFit(int64_t bytes) { return impl_->reservation()->IncreaseReservationToFit(bytes); } +Status BufferPool::ClientHandle::DecreaseReservationTo(int64_t target_bytes) { + return impl_->DecreaseReservationTo(target_bytes); +} + int64_t BufferPool::ClientHandle::GetReservation() const { return impl_->reservation()->GetReservation(); } @@ -334,6 +338,10 @@ void BufferPool::ClientHandle::SetDebugDenyIncreaseReservation(double probabilit impl_->reservation()->SetDebugDenyIncreaseReservation(probability); } +bool BufferPool::ClientHandle::has_unpinned_pages() const { + return impl_->has_unpinned_pages(); +} + BufferPool::SubReservation::SubReservation(ClientHandle* client) { tracker_.reset(new ReservationTracker); tracker_->InitChildTracker( @@ -543,6 +551,19 @@ Status BufferPool::Client::PrepareToAllocateBuffer(int64_t len) { return Status::OK(); } +Status BufferPool::Client::DecreaseReservationTo(int64_t target_bytes) { + unique_lock lock(lock_); + int64_t current_reservation = reservation_.GetReservation(); + DCHECK_GE(current_reservation, target_bytes); + int64_t amount_to_free = + min(reservation_.GetUnusedReservation(), current_reservation - target_bytes); + if (amount_to_free == 0) return Status::OK(); + // Clean enough pages to allow us to safely release reservation. + RETURN_IF_ERROR(CleanPages(&lock, amount_to_free)); + reservation_.DecreaseReservation(amount_to_free); + return Status::OK(); +} + Status BufferPool::Client::CleanPages(unique_lock* client_lock, int64_t len) { DCheckHoldsLock(*client_lock); DCHECK_CONSISTENCY(); diff --git a/be/src/runtime/bufferpool/buffer-pool.h b/be/src/runtime/bufferpool/buffer-pool.h index 4798d6c21..7b1155177 100644 --- a/be/src/runtime/bufferpool/buffer-pool.h +++ b/be/src/runtime/bufferpool/buffer-pool.h @@ -324,6 +324,13 @@ class BufferPool::ClientHandle { /// if successful, after which 'bytes' can be used. bool IncreaseReservationToFit(int64_t bytes) WARN_UNUSED_RESULT; + /// Try to decrease this client's reservation down to a minimum of 'target_bytes' by + /// releasing unused reservation to ancestor ReservationTrackers, all the way up to + /// the root of the ReservationTracker tree. This client's reservation must be at least + /// 'target_bytes' before calling this method. May fail if decreasing the reservation + /// requires flushing unpinned pages to disk and a write to disk fails. + Status DecreaseReservationTo(int64_t target_bytes) WARN_UNUSED_RESULT; + /// Move some of this client's reservation to the SubReservation. 'bytes' of unused /// reservation must be available in this tracker. void SaveReservation(SubReservation* dst, int64_t bytes); @@ -351,6 +358,9 @@ class BufferPool::ClientHandle { bool is_registered() const { return impl_ != NULL; } + /// Return true if there are any unpinned pages for this client. + bool has_unpinned_pages() const; + std::string DebugString() const; private: diff --git a/be/src/runtime/sorter.cc b/be/src/runtime/sorter.cc index ee0e4be06..de0594598 100644 --- a/be/src/runtime/sorter.cc +++ b/be/src/runtime/sorter.cc @@ -1760,4 +1760,11 @@ Status Sorter::ExecuteIntermediateMerge(Sorter::Run* merged_run) { RETURN_IF_ERROR(merged_run->FinalizeInput()); return Status::OK(); } + +bool Sorter::HasSpilledRuns() const { + // All runs in 'merging_runs_' are spilled. 'sorted_runs_' can contain at most one + // non-spilled run. + return !merging_runs_.empty() || sorted_runs_.size() > 1 || + (sorted_runs_.size() == 1 && !sorted_runs_.back()->is_pinned()); +} } // namespace impala diff --git a/be/src/runtime/sorter.h b/be/src/runtime/sorter.h index cafab7277..5e7240be8 100644 --- a/be/src/runtime/sorter.h +++ b/be/src/runtime/sorter.h @@ -152,6 +152,9 @@ class Sorter { /// sort with the current sorter. int64_t ComputeMinReservation(); + /// Return true if the sorter has any spilled runs. + bool HasSpilledRuns() const; + private: class Page; class Run; @@ -239,7 +242,7 @@ class Sorter { /// memory. boost::scoped_ptr merger_; - /// Runs that are currently processed by the merge_. + /// Spilled runs that are currently processed by the merge_. /// These runs can be deleted when we are done with the current merge. std::deque merging_runs_; diff --git a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test index 6fe86c3e2..d9f60cc2c 100644 --- a/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test +++ b/testdata/workloads/functional-query/queries/QueryTest/spilling-aggs.test @@ -125,7 +125,7 @@ row_regex: .*RowsPassedThrough: .* \([1-9][0-9]*\) ==== ---- QUERY # Test aggregation spill with group_concat distinct -set buffer_pool_limit=50m; +set buffer_pool_limit=30m; select l_orderkey, count(*), group_concat(distinct l_linestatus, '|') from lineitem group by 1 diff --git a/testdata/workloads/tpch/queries/sort-reservation-usage.test b/testdata/workloads/tpch/queries/sort-reservation-usage.test new file mode 100644 index 000000000..92f180dd7 --- /dev/null +++ b/testdata/workloads/tpch/queries/sort-reservation-usage.test @@ -0,0 +1,30 @@ +==== +---- QUERY +# Test that in-mem sorts incrementally give up memory when emitting output. +# This query and the limit is calibrated to fail if the first sort does not +# give up memory to the second sort. +set num_nodes=1; +set scratch_limit=0; +set buffer_pool_limit=15m; +set default_spillable_buffer_size=64kb; +SELECT * +FROM (SELECT + Rank() OVER(ORDER BY l_orderkey) AS rank, + Rank() OVER(ORDER BY l_partkey) AS rank2 + FROM lineitem + WHERE l_shipdate < '1992-05-09') a +WHERE rank < 10 +ORDER BY rank; +---- RESULTS +1,118035 +2,55836 +2,141809 +2,155407 +5,84064 +5,129763 +7,10725 +7,31340 +7,155173 +---- TYPES +BIGINT,BIGINT +==== diff --git a/tests/query_test/test_sort.py b/tests/query_test/test_sort.py index df95dddf1..0eae03559 100644 --- a/tests/query_test/test_sort.py +++ b/tests/query_test/test_sort.py @@ -159,6 +159,10 @@ class TestQueryFullSort(ImpalaTestSuite): query, exec_option, table_format=table_format).data) assert(result[0] == sorted(result[0])) + def test_sort_reservation_usage(self, vector): + """Tests for sorter reservation usage.""" + self.run_test_case('sort-reservation-usage', vector) + class TestRandomSort(ImpalaTestSuite): @classmethod def get_workload(self):