Mirror of https://github.com/opentffoundation/opentf.git, synced 2025-12-19 17:59:05 -05:00
execgraph: Most of the "compiler" machinery
This covers most of the logic required to turn a source graph into a compiled graph ready for execution. There's currently only support for one of the opcodes though, so subsequent commits will sketch those out more and then add some tests and fix any problems that inevitably exist here but aren't yet visible because there are no tests.

Signed-off-by: Martin Atkins <mart@degeneration.co.uk>
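For orientation, here is a minimal sketch (not part of this change) of how a caller might drive the new API, based only on the signatures introduced in this diff; the oracle, evalCtx, priorState, and source graph g are assumed to be prepared elsewhere in the engine:

    compiled, diags := g.Compile(oracle, evalCtx, priorState)
    if diags.HasErrors() {
        // Compilation errors indicate a bug in graph construction, and the
        // compiled graph is nil in that case, so don't try to execute it.
        return diags
    }
    // Execute runs every compiled step in its own goroutine and blocks
    // until all of them have returned, accumulating their diagnostics.
    diags = diags.Append(compiled.Execute(ctx))
    return diags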
@@ -18,7 +18,7 @@ import (
)

type CompiledGraph struct {
// ops is the main essence of a compiled graph: a series of functions
// steps is the main essence of a compiled graph: a series of functions
// that we'll run all at once, one goroutine each, and then wait until
// they've all returned something.
//
@@ -27,7 +27,7 @@ type CompiledGraph struct {
// compiler to arrange for the necessary data flow while it's building
// these compiled operations. Execution is complete once all of these
// functions have returned.
ops []anyCompiledOperation
steps []nodeExecuteRaw

// resourceInstanceValues provides a function for each resource instance
// that was registered as a "sink" during graph building which blocks
@@ -65,10 +65,10 @@ func (c *CompiledGraph) Execute(ctx context.Context) tfdiags.Diagnostics {
var diagsMu sync.Mutex

var wg sync.WaitGroup
wg.Add(len(c.ops))
for _, op := range c.ops {
wg.Add(len(c.steps))
for _, op := range c.steps {
wg.Go(func() {
opDiags := op(grapheval.ContextWithNewWorker(ctx))
_, _, opDiags := op(grapheval.ContextWithNewWorker(ctx))
diagsMu.Lock()
diags = diags.Append(opDiags)
diagsMu.Unlock()
@@ -7,22 +7,53 @@ package execgraph

import (
"context"
"fmt"
"iter"
"strings"
"sync"

"github.com/apparentlymart/go-workgraph/workgraph"
"github.com/zclconf/go-cty/cty"

"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/lang/eval"
"github.com/opentofu/opentofu/internal/lang/grapheval"
"github.com/opentofu/opentofu/internal/states"
"github.com/opentofu/opentofu/internal/tfdiags"
)

func (g *Graph) Compile() *CompiledGraph {
// Compile produces a compiled version of the graph which will, once executed,
// use the given arguments to interact with other parts of the broader system.
//
// TODO: This currently takes a prior state snapshot using our current models,
// but the state model we have probably isn't the best for this new execution
// approach.
func (g *Graph) Compile(oracle *eval.ApplyOracle, evalCtx *eval.EvalContext, priorState *states.SyncState) (*CompiledGraph, tfdiags.Diagnostics) {
ret := &CompiledGraph{
ops: make([]anyCompiledOperation, len(g.ops)),
resourceInstanceValues: addrs.MakeMap[addrs.AbsResourceInstance, func(ctx context.Context) cty.Value](),
cleanupWorker: workgraph.NewWorker(),
}
c := &compiler{
sourceGraph: g,
compiledGraph: ret,
sourceGraph: g,
compiledGraph: ret,
oracle: oracle,
opResolvers: make([]workgraph.Resolver[nodeResultRaw], len(g.ops)),
opResults: make([]workgraph.Promise[nodeResultRaw], len(g.ops)),
desiredStateFuncs: make([]nodeExecuteRaw, len(g.desiredStateRefs)),
providerInstConfigFuncs: make([]nodeExecuteRaw, len(g.providerInstConfigRefs)),
}
// We'll prepopulate all of the operation promises, and then the compiler
// will arrange for them to each get wired where they need to be.
for i := range c.opResults {
// The "cleanupWorker" is initially the responsible worker, but
// the compiler arranges for responsibility to transfer to per-operation
// workers created dynamically as the graph is executed, so in the
// happy path cleanupWorker should end up responsible for nothing
// at the end. (If that isn't true then all of the remaining requests
// will force-fail when the compiled graph gets garbage collected.)
resolver, promise := workgraph.NewRequest[nodeResultRaw](ret.cleanupWorker)
c.opResolvers[i] = resolver
c.opResults[i] = promise
}
return c.Compile()
}
@@ -35,9 +66,432 @@ func (g *Graph) Compile() *CompiledGraph {
type compiler struct {
sourceGraph *Graph
compiledGraph *CompiledGraph
oracle *eval.ApplyOracle
evalCtx *eval.EvalContext
priorState *states.SyncState

// opResolvers and opResults track our requests for our operation results,
// each of which should be resolved by one of the "steps" in the compiled
// graph so that the data can then propagate between nodes.
//
// The indices of this slice correspond to the indices of sourceGraph.ops.
// The promises in here are initially owned by compiledGraph.cleanupWorker,
// but responsibility for them is transferred to the worker for each
// operation's "step" in the compiled graph once they begin executing.
opResolvers []workgraph.Resolver[nodeResultRaw]
opResults []workgraph.Promise[nodeResultRaw]

// Some of our node types cause fallible side-effects and so we memoize
// what we returned to ensure that the action only runs once and then
// its results are distributed to all referrers.
//
// The indices of each of these correlate with the matching slices in
// sourceGraph.
desiredStateFuncs []nodeExecuteRaw
providerInstConfigFuncs []nodeExecuteRaw

// diags accumulates any problems we detect during the compilation process,
// which are ultimately returned by [compiler.Compile] so that the caller
// knows not to even try executing the result graph.
diags tfdiags.Diagnostics
}

func (c *compiler) Compile() *CompiledGraph {
// TODO: Implement
return c.compiledGraph
func (c *compiler) Compile() (*CompiledGraph, tfdiags.Diagnostics) {
// Although the _execution_ of the compiled graph runs all of the steps
// concurrently, the compiler itself is intentionally written as
// sequential code in the hope of that making it easier to understand
// and maintain, since it's inevitably quite self-referential as it
// turns the source graph into a series of executable functions.

// The operations are the main part of the graph we actually care about
// because they represent externally-visible side-effects. We'll use
// those as our main vehicle for compilation, producing compiled versions
// of other nodes as we go along only as needed to satisfy the operations.
opResolvers := c.opResolvers
for opIdx, opDesc := range c.sourceGraph.ops {
operands := newCompilerOperands(c.compileOperands(opDesc.operands))
var compileFunc func(operands *compilerOperands) nodeExecuteRaw
switch opDesc.opCode {
case opOpenProvider:
compileFunc = c.compileOpOpenProvider
default:
c.diags = c.diags.Append(tfdiags.Sourceless(
tfdiags.Error,
"Unsupported opcode in execution graph",
fmt.Sprintf("Execution graph includes opcode %s, but the compiler doesn't know how to handle it. This is a bug in OpenTofu.", opDesc.opCode),
))
continue
}
// The main execution function deals with the opCode-specific behavior,
// but we need to wrap it in some general code that arranges for
// the operation results to propagate through the graph using the
// promises set up in [Graph.Compile].
mainExec := compileFunc(operands)
graphExec := func(parentCtx context.Context) (any, bool, tfdiags.Diagnostics) {
// Each operation's execution must have its own workgraph worker
// that's responsible for resolving the associated promise, since
// that allows us to detect if operations try to depend on their
// own results, or if the implementation panics and thus causes
// this worker to get garbage-collected.
resolver := opResolvers[opIdx]
worker := workgraph.NewWorker(resolver)
ctx := grapheval.ContextWithWorker(parentCtx, worker)
ret, ok, diags := mainExec(ctx)
// Resolving the promise might allow dependent operations to begin.
resolver.ReportSuccess(worker, nodeResultRaw{
Value: ret,
CanContinue: ok,
Diagnostics: diags,
})
return ret, ok, diags
}
c.compiledGraph.steps = append(c.compiledGraph.steps, graphExec)
}
if c.diags.HasErrors() {
// Don't expose the likely-invalid compiled graph, then.
return nil, c.diags
}

// Before we return we also need to fill in the resource instance values
// so that it's possible to get the information needed to satisfy the
// evaluation system.
for _, elem := range c.sourceGraph.resourceInstanceResults.Elems {
instAddr := elem.Key
ref := elem.Value
execFunc := c.compileResultRef(ref)
c.compiledGraph.resourceInstanceValues.Put(instAddr, func(ctx context.Context) cty.Value {
rawResult, ok, _ := execFunc(ctx)
if !ok {
return cty.DynamicVal
}
finalStateObj := rawResult.(*states.ResourceInstanceObject)
return finalStateObj.Value
})
}

return c.compiledGraph, c.diags
}

func (c *compiler) compileOperands(refs []AnyResultRef) iter.Seq2[AnyResultRef, nodeExecuteRaw] {
return func(yield func(AnyResultRef, nodeExecuteRaw) bool) {
for _, ref := range refs {
exec := c.compileResultRef(ref)
if !yield(ref, exec) {
return
}
}
}
}

// compileResultRef transforms a result reference into a function that blocks
// until the associated result is ready and then returns that result as a
// value of type [any], which the caller could then cast into the concrete
// type that the result was expected to produce.
func (c *compiler) compileResultRef(ref AnyResultRef) nodeExecuteRaw {
// The closures we return should only capture primitive values and
// pointers to as small a part of the compiler's state as possible, so
// that the overall compiler object can be garbage-collected once
// compilation is complete.
oracle := c.oracle

// For any of the cases that return functions that cause side-effects that
// can potentially fail we must use a "once" wrapper to ensure that the
// execution is coalesced for all callers, and make sure it's included
// in the "steps" of the compiled graph so that any diagnostics will be
// recorded.

const errSummary = "Invalid execution graph"
switch ref := ref.(type) {
case valueResultRef:
vals := c.sourceGraph.constantVals
index := ref.index
return func(_ context.Context) (any, bool, tfdiags.Diagnostics) {
return vals[index], true, nil
}
case providerAddrResultRef:
providerAddrs := c.sourceGraph.providerAddrs
index := ref.index
return func(_ context.Context) (any, bool, tfdiags.Diagnostics) {
return providerAddrs[index], true, nil
}
case desiredResourceInstanceResultRef:
resourceInstAddrs := c.sourceGraph.desiredStateRefs
index := ref.index
if existing := c.desiredStateFuncs[index]; existing != nil {
return existing
}
c.desiredStateFuncs[index] = nodeExecuteRawOnce(func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
desired := oracle.DesiredResourceInstance(ctx, resourceInstAddrs[index])
if desired == nil {
// If we get here then it suggests a bug in the planning engine,
// because it should not include a node referring to a resource
// instance that is not part of the desired state.
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
errSummary,
fmt.Sprintf("The execution graph expects desired state for %s, but the evaluation system does not consider this resource instance to be \"desired\". This is a bug in OpenTofu.", resourceInstAddrs[index]),
))
return nil, false, diags
}
return desired, true, diags
})
c.compiledGraph.steps = append(c.compiledGraph.steps, c.desiredStateFuncs[index])
return c.desiredStateFuncs[index]
case resourceInstancePriorStateResultRef:
priorState := c.priorState
priorStateRefs := c.sourceGraph.priorStateRefs
index := ref.index
return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
priorStateRef := priorStateRefs[index]
var gen states.Generation
if priorStateRef.DeposedKey == states.NotDeposed {
gen = states.CurrentGen
} else {
gen = priorStateRef.DeposedKey
}
obj := priorState.ResourceInstanceObject(priorStateRef.ResourceInstance, gen)
if obj == nil {
// If we get here then it suggests a bug in the planning engine,
// because it should not include a node referring to a resource
// instance object that is not part of the prior state. (An
// object being created should have its prior state set to a
// constant nil, without referring to prior state.)
name := priorStateRef.ResourceInstance.String()
if priorStateRef.DeposedKey != states.NotDeposed {
name += fmt.Sprintf(" deposed object %s", priorStateRef.DeposedKey)
}
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
errSummary,
fmt.Sprintf("The execution graph expects prior state for %s, but no such object exists in the state. This is a bug in OpenTofu.", name),
))
return nil, false, diags
}
return obj, true, diags
}
case providerInstanceConfigResultRef:
providerInstConfigRefs := c.sourceGraph.providerInstConfigRefs
index := ref.index
if existing := c.providerInstConfigFuncs[index]; existing != nil {
return existing
}
c.providerInstConfigFuncs[index] = nodeExecuteRawOnce(func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
configVal := oracle.ProviderInstanceConfig(ctx, providerInstConfigRefs[index])
if configVal == cty.NilVal {
// If we get here then it suggests a bug in the planning engine,
// because it should not include a node referring to a provider
// instance that is not present in the configuration.
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
errSummary,
fmt.Sprintf("The execution graph expects configuration for %s, but the evaluation system does not know about that provider instance. This is a bug in OpenTofu.", providerInstConfigRefs[index]),
))
return nil, false, diags
}
return configVal, true, diags
})
c.compiledGraph.steps = append(c.compiledGraph.steps, c.providerInstConfigFuncs[index])
return c.providerInstConfigFuncs[index]
case anyOperationResultRef:
// Operations have different result types depending on their opcodes,
// but at this point we just represent everything as "any" and expect
// that the downstream operations that rely on these results will
// type-assert them dynamically as needed.
opResults := c.opResults
opResolvers := c.opResolvers
index := ref.operationResultIndex()
return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
promise := opResults[index]
resultRaw, err := promise.Await(grapheval.WorkerFromContext(ctx))
if err != nil {
// An error here always means that the workgraph library has
// detected a problem that might have caused a deadlock, which
// during the apply phase is always a bug in OpenTofu because
// we should've detected any user-caused problems during the
// planning phase.
diags = diags.Append(diagsForWorkgraphError(ctx, err, opResolvers))
return nil, false, diags
}
return resultRaw.Value, resultRaw.CanContinue, resultRaw.Diagnostics
}
case waiterResultRef:
// In this case we'll precompile the results we're waiting for because
// then we can catch certain graph consistency problems sooner.
waitForRefs := c.sourceGraph.waiters[ref.index]
waiters := make([]nodeExecuteRaw, len(waitForRefs))
for i, waitForRef := range waitForRefs {
waiters[i] = c.compileResultRef(waitForRef)
}
return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
callerCanContinue := true
for _, waiter := range waiters {
_, ok, moreDiags := waiter(ctx)
diags = diags.Append(moreDiags)
if !ok {
// We'll remember that the caller is supposed to stop
// but we'll continue through our set of waiters in case
// we find any other diagnostics to propagate.
callerCanContinue = false
}
}
return struct{}{}, callerCanContinue, diags
}
default:
c.diags = append(c.diags, tfdiags.Sourceless(
tfdiags.Error,
errSummary,
fmt.Sprintf("The execution graph includes %#v, but the compiler doesn't know how to handle it. This is a bug in OpenTofu.", ref),
))
return nil
}
}

// nodeExecuteRaw is the lowest-level representation of producing a result,
// without any static type information yet.
//
// If the returned diagnostics includes errors then the caller must not try
// to type-assert the first result, and should instead just return the
// diagnostics along with its own nil result.
type nodeExecuteRaw = func(ctx context.Context) (any, bool, tfdiags.Diagnostics)

// nodeExecuteRawOnce returns a [nodeExecuteRaw] that will call the given
// [nodeExecuteRaw] only once on first call and then return its result to all
// future callers.
//
// Each call to this function is independent even if two calls wrap the same
// [nodeExecuteRaw]. Callers should probably stash their result somewhere to
// reuse it for other callers that ought to share the result.
func nodeExecuteRawOnce(inner nodeExecuteRaw) nodeExecuteRaw {
// This mutex is only for avoiding races to _start_ the request. It must not
// be used to await the result because we want to use the workgraph
// machinery to detect failures to resolve if e.g. the wrapped function
// panics.
var mu sync.Mutex
var reqID workgraph.RequestID
var promise workgraph.Promise[nodeResultRaw]
return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
worker := grapheval.WorkerFromContext(ctx)

mu.Lock() // We hold this only while ensuring there's an active request
if reqID == workgraph.NoRequest {
// This is the first request, so we'll actually run the function
// but first we'll set up the workgraph request so that subsequent
// callers can wait for it.
var resolver workgraph.Resolver[nodeResultRaw]
resolver, promise = workgraph.NewRequest[nodeResultRaw](worker)
mu.Unlock() // Allow concurrent callers to begin awaiting the promise

ret, ok, diags := inner(ctx)
resolver.ReportSuccess(worker, nodeResultRaw{
Value: ret,
CanContinue: ok,
Diagnostics: diags,
})
return ret, ok, diags
}
mu.Unlock()

result, err := promise.Await(worker)
diags := result.Diagnostics
if err != nil {
diags = diags.Append(diagsForWorkgraphError(ctx, err, nil))
result.CanContinue = false
}
return result.Value, result.CanContinue, diags
}
}

// nodeExecute is the type of a function that blocks until the result of a node
// is available and then returns that result.
//
// The boolean result is true if the caller is allowed to take any action based
// on the result. If it is false then the callers should ignore the T result
// and immediately return, on the assumption that something upstream has failed
// and will have already returned some diagnostics.
type nodeExecute[T any] func(ctx context.Context) (T, bool, tfdiags.Diagnostics)

type nodeResultRaw struct {
Value any
CanContinue bool
Diagnostics tfdiags.Diagnostics
}

func diagsForWorkgraphError(ctx context.Context, err error, operationResolvers []workgraph.Resolver[nodeResultRaw]) tfdiags.Diagnostics {
// findRequestName makes a best effort to describe the given workgraph request
// in terms of operations in the execution graph, though because all of
// these are "should never happen" cases this focuses mainly on providing
// information to help OpenTofu developers with debugging, rather than
// end-user-friendly information. (Any user-caused problems ought to have
// been detected during the planning phase, so any problem we encounter
// during apply is always an OpenTofu bug.)
//
// As usual we tolerate this being a pretty inefficient linear search
// over all of the requests we know about because we should only end up
// here when something has gone very wrong, and this approach avoids
// tracking a bunch of extra debug state in the happy path.
findRequestName := func(reqId workgraph.RequestID) string {
for opIdx, resolver := range operationResolvers {
if resolver.RequestID() == reqId {
return fmt.Sprintf("execution graph operation r[%d]", opIdx)
}
}
// If we fall out here then we presumably have a request ID from some
// other part of the system, such as from package configgraph. We
// might be able to get a useful description from a request tracker
// attached to the given context, if so.
// Note that we shouldn't really get here if the execution graph was
// constructed correctly because the "waiter" nodes used by anything
// that refers to the evaluator's oracle should block us from trying
// to retrieve something that isn't ready yet, but we'll attempt this
// anyway because if we get here then there's a bug somewhere by
// definition.
if reqTracker := grapheval.RequestTrackerFromContext(ctx); reqTracker != nil {
for candidate, info := range reqTracker.ActiveRequests() {
if candidate == reqId {
return info.Name
}
}
}
// If all of that failed then we'll just return a useless placeholder
// and hope that something else in the error message or debug log
// gives some clue as to what's going on.
return "<unknown>"
}

var diags tfdiags.Diagnostics
const summary = "Apply-time execution error"
switch err := err.(type) {
case workgraph.ErrSelfDependency:
var buf strings.Builder
buf.WriteString("While performing actions during the apply phase, OpenTofu detected a self-dependency cycle between the following:\n")
for _, reqId := range err.RequestIDs {
fmt.Fprintf(&buf, " - %s\n", findRequestName(reqId))
}
buf.WriteString("\nThis is a bug in OpenTofu.")
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
summary,
buf.String(),
))
case workgraph.ErrUnresolved:
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
summary,
fmt.Sprintf("While performing actions during the apply phase, a request for %q was left unresolved. This is a bug in OpenTofu.", findRequestName(err.RequestID)),
))
default:
// We're not expecting any other error types here so we'll just
// return something generic.
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
summary,
fmt.Sprintf("While performing actions during the apply phase, OpenTofu encountered an unexpected error: %s.\n\nThis is a bug in OpenTofu.", err),
))
}
return diags
}
internal/engine/internal/execgraph/compiler_operands.go (new file, 147 lines)
@@ -0,0 +1,147 @@
// Copyright (c) The OpenTofu Authors
// SPDX-License-Identifier: MPL-2.0
// Copyright (c) 2023 HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package execgraph

import (
"context"
"fmt"
"iter"
"strings"

"github.com/opentofu/opentofu/internal/tfdiags"
)

// compilerOperands is a helper for concisely unpacking the operands of an
// operation while asserting the result types they are expected to produce.
//
// Users of this should call [nextOperand] for each expected operand in turn,
// and then call [compilerOperands.Finish] to collect error diagnostics for
// any problems that were detected and to ensure that the internal state is
// cleaned up correctly. If the Finish method returns error diagnostics then
// none of the results from [nextOperand] should be used.
//
// // assuming that "operands" is a pointer to a compilerOperands object
// getProviderAddr := nextOperand[addrs.Provider](operands)
// getProviderConfig := nextOperand[cty.Value](operands)
// waitForDependencies := operands.OperandWaiter()
// diags := operands.Finish()
// if diags.HasErrors() {
// // compilation fails
// }
type compilerOperands struct {
nextOperand func() (AnyResultRef, nodeExecuteRaw, bool)
stop func()
idx int
problems []string
}

// newCompilerOperands prepares a new [compilerOperands] object that produces
// results based on the given sequence of operands, which was presumably
// returned by [compiler.compileOperands].
//
// Refer to the documentation of [compilerOperands] for an example of how to
// use the result.
func newCompilerOperands(operands iter.Seq2[AnyResultRef, nodeExecuteRaw]) *compilerOperands {
next, stop := iter.Pull2(operands)
return &compilerOperands{
nextOperand: next,
stop: stop,
idx: 0,
problems: nil,
}
}

func nextOperand[T any](operands *compilerOperands) nodeExecute[T] {
idx := operands.idx
operands.idx++
resultRef, execRaw, ok := operands.nextOperand()
if !ok {
operands.problems = append(operands.problems, fmt.Sprintf("missing expected operand %d", idx))
return nil
}
// We'll catch type mismatches during compile time as long as the compiler
// produces correct nodeExecuteRaw implementations that actually honor
// the expected type.
if _, typeOk := resultRef.(ResultRef[T]); !typeOk {
var zero T
operands.problems = append(operands.problems, fmt.Sprintf("operand %d not of expected type %T (got %T)", idx, any(zero), resultRef))
return nil
}

return func(ctx context.Context) (T, bool, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
// We intentionally don't propagate diagnostics here because they
// describe problems that the node associated with this operand will
// report directly when visited by [CompiledGraph.Execute]. We only
// want to return diagnostics that are unique to this particular
// reference to the node, such as the type mismatch error below.
resultRaw, ok, _ := execRaw(ctx)
if !ok {
var zero T
return zero, false, nil
}
result, ok := resultRaw.(T)
if !ok {
// We'll get here if the execRaw function was compiled incorrectly
// so that its actual result does not agree with the type of the
// ResultRef it was expected to satisfy.
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
"Invalid execution graph compilation",
fmt.Sprintf("Operand %d was supposed to be %T, but its implementation produced %T. This is a bug in OpenTofu.", idx, result, resultRaw),
))
var zero T
return zero, false, diags
}
return result, true, diags
}
}

// OperandWaiter is a variant of [nextOperand] for operands that don't produce
// a useful value and exist only to block beginning some other work until
// they have completed.
//
// If the returned function produces false then the caller must immediately
// return without doing any other work, because some upstream has failed and
// so we need to unwind and report the collected errors.
func (ops *compilerOperands) OperandWaiter() func(ctx context.Context) bool {
idx := ops.idx
ops.idx++
_, execRaw, ok := ops.nextOperand()
if !ok {
ops.problems = append(ops.problems, fmt.Sprintf("missing expected operand %d", idx))
return nil
}
return func(ctx context.Context) bool {
_, canContinue, _ := execRaw(ctx)
return canContinue
}
}

func (ops *compilerOperands) Finish() tfdiags.Diagnostics {
// Regardless of how this terminates we no longer need the operand iterator.
defer ops.stop()

var diags tfdiags.Diagnostics
problems := ops.problems
if _, _, anotherOperand := ops.nextOperand(); anotherOperand {
problems = append(problems, fmt.Sprintf("expected only %d operands, but found additional operands", ops.idx))
}
if len(problems) != 0 {
var buf strings.Builder
buf.WriteString("Found incorrect operands when compiling operation:\n")
for _, problem := range problems {
fmt.Fprintf(&buf, " - %s\n", problem)
}
buf.WriteString("\nThis is a bug in OpenTofu.")
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
"Invalid operands for execution graph operation",
buf.String(),
))
}
return diags
}
internal/engine/internal/execgraph/compiler_ops.go (new file, 52 lines)
@@ -0,0 +1,52 @@
// Copyright (c) The OpenTofu Authors
// SPDX-License-Identifier: MPL-2.0
// Copyright (c) 2023 HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package execgraph

import (
"context"

"github.com/zclconf/go-cty/cty"

"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/tfdiags"
)

func (c *compiler) compileOpOpenProvider(operands *compilerOperands) nodeExecuteRaw {
getProviderAddr := nextOperand[addrs.Provider](operands)
getConfigVal := nextOperand[cty.Value](operands)
waitForDeps := operands.OperandWaiter()
diags := operands.Finish()
c.diags = c.diags.Append(diags)
if diags.HasErrors() {
return nil
}

providers := c.evalCtx.Providers

return func(ctx context.Context) (any, bool, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
if !waitForDeps(ctx) {
return nil, false, diags
}
providerAddr, ok, moreDiags := getProviderAddr(ctx)
diags = diags.Append(moreDiags)
if !ok {
return nil, false, diags
}
configVal, ok, moreDiags := getConfigVal(ctx)
diags = diags.Append(moreDiags)
if !ok {
return nil, false, diags
}

ret, moreDiags := providers.NewConfiguredProvider(ctx, providerAddr, configVal)
diags = diags.Append(moreDiags)
if moreDiags.HasErrors() {
return nil, false, diags
}
return ret, true, diags
}
}
internal/engine/internal/execgraph/compiler_test.go (new file, 6 lines)
@@ -0,0 +1,6 @@
// Copyright (c) The OpenTofu Authors
// SPDX-License-Identifier: MPL-2.0
// Copyright (c) 2023 HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0

package execgraph
@@ -88,3 +88,31 @@ func (c *ConfigInstance) DriveApplying(ctx context.Context, glue ApplyGlue, run
// the oracle only once it has already been made available by earlier work.
type ApplyOracle struct {
}

// DesiredResourceInstance returns the [DesiredResourceInstance] object
// associated with the given resource instance address, or nil if the given
// address does not match a desired resource instance.
//
// This API assumes that the apply phase is working from an execution graph
// built during the planning phase and is therefore relying on the plan phase
// to correctly describe a subset of the desired resource instances so that
// this should never return nil. If this _does_ return nil then that suggests
// a bug in the planning engine, which caused it to create an incorrect
// execution graph.
func (o *ApplyOracle) DesiredResourceInstance(ctx context.Context, addr addrs.AbsResourceInstance) *DesiredResourceInstance {
// TODO: Implement
panic("unimplemented")
}

// ProviderInstanceConfig returns the configuration value for the given
// provider instance, or [cty.NilVal] if there is no such provider instance.
//
// This API assumes that the apply phase is working from an execution graph
// built during the planning phase and is therefore relying on the plan phase
// to refer only to provider instances that are present in the configuration.
// If this _does_ return cty.NilVal then that suggests a bug in the planning
// engine, causing it to create an incorrect execution graph.
func (o *ApplyOracle) ProviderInstanceConfig(ctx context.Context, addr addrs.AbsProviderInstanceCorrect) cty.Value {
// TODO: Implement
panic("unimplemented")
}
@@ -53,12 +53,9 @@ func ContextWithRequestTracker(parent context.Context, tracker RequestTracker) c
return context.WithValue(parent, trackerContextKey, tracker)
}

// requestTrackerFromContext returns the request tracker associated with the
// RequestTrackerFromContext returns the request tracker associated with the
// given context, or nil if there is no request tracker.
//
// This is unexported because request trackers are provided by external code
// but only used by code within this package.
func requestTrackerFromContext(ctx context.Context) RequestTracker {
func RequestTrackerFromContext(ctx context.Context) RequestTracker {
tracker, ok := ctx.Value(trackerContextKey).(RequestTracker)
if !ok {
return nil

@@ -30,7 +30,7 @@ import (
// information and will report that missing information as being a bug in
// OpenTofu, because we should always be tracking requests correctly.
func DiagnosticsForWorkgraphError(ctx context.Context, err error) tfdiags.Diagnostics {
tracker := requestTrackerFromContext(ctx)
tracker := RequestTrackerFromContext(ctx)

if tracker == nil {
// In this case we must return lower-quality error messages because