mirror of
https://github.com/opentffoundation/opentf.git
synced 2025-12-23 03:34:30 -05:00
This is a similar idea to a sync.WorkGroup except that it tracks the completion of individual items represented by comparable values, rather than only a count of expected items to complete. My intention for this is for the planning engine to use it to track when all of the expected work for a provider instance or ephemeral resource instance has been completed, so that it can be closed shortly afterwards instead of waiting for the entire planning operation to run to completion. Signed-off-by: Martin Atkins <mart@degeneration.co.uk>
150 lines
4.7 KiB
Go
150 lines
4.7 KiB
Go
// Copyright (c) The OpenTofu Authors
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
// Copyright (c) 2023 HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package lifecycle
|
|
|
|
import (
|
|
"iter"
|
|
"maps"
|
|
"sync"
|
|
|
|
"github.com/opentofu/opentofu/internal/collections"
|
|
)
|
|
|
|
// CompletionTracker is a synchronization utility that keeps a record of the
|
|
// completion of various items and allows various different goroutines to wait
|
|
// for the completion of different subsets of the items.
|
|
//
|
|
// "Items" can be of any comparable type, but the design intention is that a
|
|
// caller will define its own types to represent the different kinds of work
|
|
// it needs to track.
|
|
type CompletionTracker[T comparable] struct {
|
|
mu sync.Mutex
|
|
completed collections.Set[T]
|
|
waiters collections.Set[*completionWaiter[T]]
|
|
}
|
|
|
|
type completionWaiter[T comparable] struct {
|
|
pending collections.Set[T]
|
|
ch chan<- struct{}
|
|
}
|
|
|
|
// NewCompletionTracker returns a new [CompletionTracker] that initially
|
|
// has no waiters and no completed items.
|
|
func NewCompletionTracker[T comparable]() *CompletionTracker[T] {
|
|
return &CompletionTracker[T]{
|
|
completed: collections.NewSet[T](),
|
|
waiters: collections.NewSet[*completionWaiter[T]](),
|
|
}
|
|
}
|
|
|
|
// ItemComplete returns true if the given item has already been reported
|
|
// as complete using [CompletionTracker.ReportCompletion].
|
|
//
|
|
// A complete item can never become incomplete again, but if this function
|
|
// returns false then a concurrent goroutine could potentially report the
|
|
// item as complete before the caller acts on that result.
|
|
func (t *CompletionTracker[T]) ItemComplete(item T) bool {
|
|
t.mu.Lock()
|
|
_, ret := t.completed[item]
|
|
t.mu.Unlock()
|
|
return ret
|
|
}
|
|
|
|
// NewWaiterFor returns an unbuffered channel that will be closed once all
|
|
// of the addresses in the given seqence have had their completion reported
|
|
// using [CompletionTracker.ReportCompletion].
|
|
//
|
|
// No items will be sent to the channel.
|
|
//
|
|
// For callers that would just immediately block waiting for the given channel
|
|
// to be closed (without using it as part of a larger "select" statement),
|
|
// consider using the simpler [CompletionTracker.WaitFor] instead.
|
|
func (t *CompletionTracker[T]) NewWaiterFor(waitFor iter.Seq[T]) <-chan struct{} {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
ch := make(chan struct{})
|
|
waiter := &completionWaiter[T]{
|
|
pending: collections.NewSet[T](),
|
|
ch: ch,
|
|
}
|
|
for item := range waitFor {
|
|
if t.completed.Has(item) {
|
|
continue // ignore any already-completed items
|
|
}
|
|
waiter.pending[item] = struct{}{}
|
|
}
|
|
|
|
if len(waiter.pending) == 0 {
|
|
// If we didn't find any addresses that were not already completed
|
|
// then we'll just close the channel immediately before we return,
|
|
// and not track the waiter at all.
|
|
close(ch)
|
|
return ch
|
|
}
|
|
|
|
// If we have at least one item to wait for then we'll remember this
|
|
// new tracker so we can reconsider it each time something has its
|
|
// completion reported.
|
|
t.waiters[waiter] = struct{}{}
|
|
return ch
|
|
}
|
|
|
|
// WaitFor blocks until all of the addresses in the given set have had their
|
|
// completion reported using [CompletionTracker.ReportCompletion].
|
|
//
|
|
// This is a convenience wrapper for [CompletionTracker.NewWaiterFor] that
|
|
// just blocks until the returned channel is closed.
|
|
func (t *CompletionTracker[T]) WaitFor(waitFor iter.Seq[T]) {
|
|
ch := t.NewWaiterFor(waitFor)
|
|
for range ch {
|
|
// just block until the channel is closed
|
|
}
|
|
}
|
|
|
|
// ReportCompletion records the completion of the given item and signals
|
|
// any waiters for which it was the last remaining pending item.
|
|
func (t *CompletionTracker[T]) ReportCompletion(of T) {
|
|
t.mu.Lock()
|
|
t.completed[of] = struct{}{}
|
|
for waiter := range t.waiters {
|
|
delete(waiter.pending, of)
|
|
if len(waiter.pending) == 0 {
|
|
// nothing left for this waiter to wait for
|
|
close(waiter.ch)
|
|
delete(t.waiters, waiter)
|
|
}
|
|
}
|
|
t.mu.Unlock()
|
|
}
|
|
|
|
// PendingItems returns a set of all of the items that are pending for at
|
|
// least one waiter at the time of the call.
|
|
//
|
|
// This is mainly to allow detection and cleanup of uncompleted work: once
|
|
// a caller thinks that all work ought to have completed it can call this
|
|
// function and should hopefully receive an empty set. If not, it can report
|
|
// an error about certain items being left unresolved and optionally make
|
|
// synthetic calls to [CompletionTracker.ReportCompletion] to cause all of the
|
|
// remaining waiters to be unblocked.
|
|
//
|
|
// The result is a fresh set allocated for each call, so the caller is free
|
|
// to modify that set without corrupting the internal state of the
|
|
// [CompletionTracker].
|
|
func (t *CompletionTracker[T]) PendingItems() collections.Set[T] {
|
|
t.mu.Lock()
|
|
defer t.mu.Unlock()
|
|
|
|
if len(t.waiters) == 0 {
|
|
return nil
|
|
}
|
|
ret := collections.NewSet[T]()
|
|
for waiter := range t.waiters {
|
|
maps.Copy(ret, waiter.pending)
|
|
}
|
|
return ret
|
|
}
|