Files
opentf/internal/engine/lifecycle/completion_tracker.go
Martin Atkins afb92995e8 engine/lifecycle: CompletionTracker
This is a similar idea to a sync.WorkGroup except that it tracks the
completion of individual items represented by comparable values, rather
than only a count of expected items to complete.

My intention for this is for the planning engine to use it to track when
all of the expected work for a provider instance or ephemeral resource
instance has been completed, so that it can be closed shortly afterwards
instead of waiting for the entire planning operation to run to completion.

Signed-off-by: Martin Atkins <mart@degeneration.co.uk>
2025-10-27 10:15:41 -07:00

150 lines
4.7 KiB
Go

// Copyright (c) The OpenTofu Authors
// SPDX-License-Identifier: MPL-2.0
// Copyright (c) 2023 HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package lifecycle
import (
"iter"
"maps"
"sync"
"github.com/opentofu/opentofu/internal/collections"
)
// CompletionTracker is a synchronization utility that keeps a record of the
// completion of various items and allows various different goroutines to wait
// for the completion of different subsets of the items.
//
// "Items" can be of any comparable type, but the design intention is that a
// caller will define its own types to represent the different kinds of work
// it needs to track.
type CompletionTracker[T comparable] struct {
mu sync.Mutex
completed collections.Set[T]
waiters collections.Set[*completionWaiter[T]]
}
type completionWaiter[T comparable] struct {
pending collections.Set[T]
ch chan<- struct{}
}
// NewCompletionTracker returns a new [CompletionTracker] that initially
// has no waiters and no completed items.
func NewCompletionTracker[T comparable]() *CompletionTracker[T] {
return &CompletionTracker[T]{
completed: collections.NewSet[T](),
waiters: collections.NewSet[*completionWaiter[T]](),
}
}
// ItemComplete returns true if the given item has already been reported
// as complete using [CompletionTracker.ReportCompletion].
//
// A complete item can never become incomplete again, but if this function
// returns false then a concurrent goroutine could potentially report the
// item as complete before the caller acts on that result.
func (t *CompletionTracker[T]) ItemComplete(item T) bool {
t.mu.Lock()
_, ret := t.completed[item]
t.mu.Unlock()
return ret
}
// NewWaiterFor returns an unbuffered channel that will be closed once all
// of the addresses in the given seqence have had their completion reported
// using [CompletionTracker.ReportCompletion].
//
// No items will be sent to the channel.
//
// For callers that would just immediately block waiting for the given channel
// to be closed (without using it as part of a larger "select" statement),
// consider using the simpler [CompletionTracker.WaitFor] instead.
func (t *CompletionTracker[T]) NewWaiterFor(waitFor iter.Seq[T]) <-chan struct{} {
t.mu.Lock()
defer t.mu.Unlock()
ch := make(chan struct{})
waiter := &completionWaiter[T]{
pending: collections.NewSet[T](),
ch: ch,
}
for item := range waitFor {
if t.completed.Has(item) {
continue // ignore any already-completed items
}
waiter.pending[item] = struct{}{}
}
if len(waiter.pending) == 0 {
// If we didn't find any addresses that were not already completed
// then we'll just close the channel immediately before we return,
// and not track the waiter at all.
close(ch)
return ch
}
// If we have at least one item to wait for then we'll remember this
// new tracker so we can reconsider it each time something has its
// completion reported.
t.waiters[waiter] = struct{}{}
return ch
}
// WaitFor blocks until all of the addresses in the given set have had their
// completion reported using [CompletionTracker.ReportCompletion].
//
// This is a convenience wrapper for [CompletionTracker.NewWaiterFor] that
// just blocks until the returned channel is closed.
func (t *CompletionTracker[T]) WaitFor(waitFor iter.Seq[T]) {
ch := t.NewWaiterFor(waitFor)
for range ch {
// just block until the channel is closed
}
}
// ReportCompletion records the completion of the given item and signals
// any waiters for which it was the last remaining pending item.
func (t *CompletionTracker[T]) ReportCompletion(of T) {
t.mu.Lock()
t.completed[of] = struct{}{}
for waiter := range t.waiters {
delete(waiter.pending, of)
if len(waiter.pending) == 0 {
// nothing left for this waiter to wait for
close(waiter.ch)
delete(t.waiters, waiter)
}
}
t.mu.Unlock()
}
// PendingItems returns a set of all of the items that are pending for at
// least one waiter at the time of the call.
//
// This is mainly to allow detection and cleanup of uncompleted work: once
// a caller thinks that all work ought to have completed it can call this
// function and should hopefully receive an empty set. If not, it can report
// an error about certain items being left unresolved and optionally make
// synthetic calls to [CompletionTracker.ReportCompletion] to cause all of the
// remaining waiters to be unblocked.
//
// The result is a fresh set allocated for each call, so the caller is free
// to modify that set without corrupting the internal state of the
// [CompletionTracker].
func (t *CompletionTracker[T]) PendingItems() collections.Set[T] {
t.mu.Lock()
defer t.mu.Unlock()
if len(t.waiters) == 0 {
return nil
}
ret := collections.NewSet[T]()
for waiter := range t.waiters {
maps.Copy(ret, waiter.pending)
}
return ret
}