Somewhat correctly handle provider manager lifecycle

Signed-off-by: Christian Mesh <christianmesh1@gmail.com>
Christian Mesh
2025-12-11 10:24:48 -05:00
parent 5c66f389ae
commit 271dac36a4
19 changed files with 112 additions and 323 deletions

View File

@@ -89,7 +89,7 @@ func (b *Local) opApply(
op.Hooks = append(op.Hooks, stateHook)
// Get our context
- lr, _, opState, contextDiags := b.localRun(ctx, op)
+ lr, _, opState, contextDiags := b.localRun(ctx, op, stopCtx)
diags = diags.Append(contextDiags)
if contextDiags.HasErrors() {
op.ReportResult(runningOp, diags)

View File

@@ -39,11 +39,11 @@ func (b *Local) LocalRun(ctx context.Context, op *backend.Operation) (*backend.L
op.StateLocker = op.StateLocker.WithContext(context.Background())
- lr, _, stateMgr, diags := b.localRun(ctx, op)
+ lr, _, stateMgr, diags := b.localRun(ctx, op, ctx)
return lr, stateMgr, diags
}
- func (b *Local) localRun(ctx context.Context, op *backend.Operation) (*backend.LocalRun, *configload.Snapshot, statemgr.Full, tfdiags.Diagnostics) {
+ func (b *Local) localRun(ctx context.Context, op *backend.Operation, stopCtx context.Context) (*backend.LocalRun, *configload.Snapshot, statemgr.Full, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
// Get the latest state.
@@ -100,7 +100,7 @@ func (b *Local) localRun(ctx context.Context, op *backend.Operation) (*backend.L
stateMeta = &m
}
log.Printf("[TRACE] backend/local: populating backend.LocalRun from plan file")
- ret, configSnap, ctxDiags = b.localRunForPlanFile(ctx, op, lp, ret, &coreOpts, stateMeta)
+ ret, configSnap, ctxDiags = b.localRunForPlanFile(ctx, op, lp, ret, &coreOpts, stateMeta, stopCtx)
if ctxDiags.HasErrors() {
diags = diags.Append(ctxDiags)
return nil, nil, nil, diags
@@ -111,7 +111,7 @@ func (b *Local) localRun(ctx context.Context, op *backend.Operation) (*backend.L
op.ConfigLoader.ImportSourcesFromSnapshot(configSnap)
} else {
log.Printf("[TRACE] backend/local: populating backend.LocalRun for current working directory")
- ret, configSnap, ctxDiags = b.localRunDirect(ctx, op, ret, &coreOpts, s)
+ ret, configSnap, ctxDiags = b.localRunDirect(ctx, op, ret, &coreOpts, s, stopCtx)
}
diags = diags.Append(ctxDiags)
if diags.HasErrors() {
@@ -144,7 +144,7 @@ func (b *Local) localRun(ctx context.Context, op *backend.Operation) (*backend.L
return ret, configSnap, s, diags
}
- func (b *Local) localRunDirect(ctx context.Context, op *backend.Operation, run *backend.LocalRun, coreOpts *tofu.ContextOpts, s statemgr.Full) (*backend.LocalRun, *configload.Snapshot, tfdiags.Diagnostics) {
+ func (b *Local) localRunDirect(ctx context.Context, op *backend.Operation, run *backend.LocalRun, coreOpts *tofu.ContextOpts, s statemgr.Full, stopCtx context.Context) (*backend.LocalRun, *configload.Snapshot, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
// Load the configuration using the caller-provided configuration loader.
@@ -226,7 +226,7 @@ func (b *Local) localRunDirect(ctx context.Context, op *backend.Operation, run *
}
run.InputState = state
- tfCtx, moreDiags := tofu.NewContext(coreOpts)
+ tfCtx, moreDiags := tofu.NewContext(stopCtx, coreOpts)
diags = diags.Append(moreDiags)
if moreDiags.HasErrors() {
return nil, nil, diags
@@ -235,7 +235,7 @@ func (b *Local) localRunDirect(ctx context.Context, op *backend.Operation, run *
return run, configSnap, diags
}
- func (b *Local) localRunForPlanFile(ctx context.Context, op *backend.Operation, pf *planfile.Reader, run *backend.LocalRun, coreOpts *tofu.ContextOpts, currentStateMeta *statemgr.SnapshotMeta) (*backend.LocalRun, *configload.Snapshot, tfdiags.Diagnostics) {
+ func (b *Local) localRunForPlanFile(ctx context.Context, op *backend.Operation, pf *planfile.Reader, run *backend.LocalRun, coreOpts *tofu.ContextOpts, currentStateMeta *statemgr.SnapshotMeta, stopCtx context.Context) (*backend.LocalRun, *configload.Snapshot, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
const errSummary = "Invalid plan file"
@@ -370,7 +370,7 @@ func (b *Local) localRunForPlanFile(ctx context.Context, op *backend.Operation,
// refreshing we did while building the plan.
run.InputState = priorStateFile.State
- tfCtx, moreDiags := tofu.NewContext(coreOpts)
+ tfCtx, moreDiags := tofu.NewContext(stopCtx, coreOpts)
diags = diags.Append(moreDiags)
if moreDiags.HasErrors() {
return nil, nil, diags
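The signature changes above thread a dedicated stop context from each operation entry point down into tofu.NewContext, alongside the ordinary request context. A minimal standalone sketch of this two-context pattern (the names here are illustrative, not taken from this commit):

package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"time"
)

// doWork treats the two contexts differently: stopCtx asks for a graceful
// stop, while ctx stays live so in-flight operations can finish or be
// cancelled outright.
func doWork(ctx, stopCtx context.Context) {
	select {
	case <-stopCtx.Done():
		fmt.Println("graceful stop requested; finishing current operation")
	case <-ctx.Done():
		fmt.Println("hard cancellation")
	case <-time.After(3 * time.Second):
		fmt.Println("no stop requested")
	}
}

func main() {
	ctx := context.Background()
	// stopCtx is cancelled on the first interrupt; ctx is left untouched.
	stopCtx, stop := signal.NotifyContext(ctx, os.Interrupt)
	defer stop()
	doWork(ctx, stopCtx)
}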

View File

@@ -95,7 +95,7 @@ func (b *Local) opPlan(
}
// Get our context
- lr, configSnap, opState, ctxDiags := b.localRun(ctx, op)
+ lr, configSnap, opState, ctxDiags := b.localRun(ctx, op, stopCtx)
diags = diags.Append(ctxDiags)
if ctxDiags.HasErrors() {
op.ReportResult(runningOp, diags)

View File

@@ -66,7 +66,7 @@ func (b *Local) opRefresh(
op.PlanRefresh = true
// Get our context
- lr, _, opState, contextDiags := b.localRun(ctx, op)
+ lr, _, opState, contextDiags := b.localRun(ctx, op, stopCtx)
diags = diags.Append(contextDiags)
if contextDiags.HasErrors() {
op.ReportResult(runningOp, diags)

View File

@@ -146,7 +146,7 @@ func (b *Local) opPlanWithExperimentalRuntime(stopCtx context.Context, cancelCtx
prevRoundState = states.NewState() // this is the first round, starting with an empty state
}
- plugins := plugins.NewRuntimePlugins(b.ContextOpts.Providers, b.ContextOpts.Provisioners)
+ plugins := plugins.NewRuntimePlugins(b.ContextOpts.Plugins.Manager(ctx))
evalCtx := &eval.EvalContext{
RootModuleDir: op.ConfigDir,
OriginalWorkingDir: b.ContextOpts.Meta.OriginalWorkingDir,

View File

@@ -17,6 +17,7 @@ import (
"github.com/opentofu/opentofu/internal/backend"
"github.com/opentofu/opentofu/internal/configs/configschema"
"github.com/opentofu/opentofu/internal/encryption"
"github.com/opentofu/opentofu/internal/plugins"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/states"
"github.com/opentofu/opentofu/internal/states/statemgr"
@@ -105,9 +106,9 @@ func TestLocalProvider(t *testing.T, b *Local, name string, schema providers.Pro
}
// Set up our provider
- b.ContextOpts.Providers = map[addrs.Provider]providers.Factory{
+ b.ContextOpts.Plugins = plugins.NewPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider(name): providers.FactoryFixed(p),
- }
+ }, nil)
return p

View File

@@ -150,7 +150,7 @@ func (b *Remote) LocalRun(ctx context.Context, op *backend.Operation) (*backend.
}
}
- tfCtx, ctxDiags := tofu.NewContext(&opts)
+ tfCtx, ctxDiags := tofu.NewContext(ctx, &opts)
diags = diags.Append(ctxDiags)
ret.Core = tfCtx

View File

@@ -151,7 +151,7 @@ func (b *Cloud) LocalRun(ctx context.Context, op *backend.Operation) (*backend.L
}
}
- tfCtx, ctxDiags := tofu.NewContext(&opts)
+ tfCtx, ctxDiags := tofu.NewContext(ctx, &opts)
diags = diags.Append(ctxDiags)
ret.Core = tfCtx

View File

@@ -1107,8 +1107,11 @@ func TestApply_shutdown(t *testing.T) {
},
}
+ var closer sync.Once
p.StopFn = func() error {
- close(cancelled)
+ closer.Do(func() {
+ close(cancelled)
+ })
return nil
}
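The sync.Once guard matters because close panics on an already-closed channel, and with the manager now owning provider lifecycle the fixture's StopFn can plausibly fire more than once per run. A standalone sketch of the idiom:

package main

import "sync"

func main() {
	cancelled := make(chan struct{})
	var closer sync.Once
	stopFn := func() error {
		// A bare close(cancelled) would panic on a second call;
		// sync.Once turns repeated stop signals into no-ops.
		closer.Do(func() { close(cancelled) })
		return nil
	}
	_ = stopFn() // first call closes the channel
	_ = stopFn() // second call is safe
	<-cancelled  // receives immediately because the channel is closed
}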

View File

@@ -38,6 +38,7 @@ import (
"github.com/opentofu/opentofu/internal/getmodules"
"github.com/opentofu/opentofu/internal/getproviders"
legacy "github.com/opentofu/opentofu/internal/legacy/tofu"
"github.com/opentofu/opentofu/internal/plugins"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/provisioners"
"github.com/opentofu/opentofu/internal/states"
@@ -590,13 +591,11 @@ func (m *Meta) contextOpts(ctx context.Context) (*tofu.ContextOpts, error) {
// and just work with what we've been given, thus allowing the tests
// to provide mock providers and provisioners.
if m.testingOverrides != nil {
- opts.Providers = m.testingOverrides.Providers
- opts.Provisioners = m.testingOverrides.Provisioners
+ opts.Plugins = plugins.NewPlugins(m.testingOverrides.Providers, m.testingOverrides.Provisioners)
} else {
var providerFactories map[addrs.Provider]providers.Factory
providerFactories, err = m.providerFactories()
- opts.Providers = providerFactories
- opts.Provisioners = m.provisionerFactories()
+ opts.Plugins = plugins.NewPlugins(providerFactories, m.provisionerFactories())
}
opts.Meta = &tofu.ContextMeta{
@@ -956,7 +955,7 @@ func (c *Meta) MaybeGetSchemas(ctx context.Context, state *states.State, config
diags = diags.Append(err)
return nil, diags
}
- tfCtx, ctxDiags := tofu.NewContext(opts)
+ tfCtx, ctxDiags := tofu.NewContext(ctx, opts)
diags = diags.Append(ctxDiags)
if ctxDiags.HasErrors() {
return nil, diags
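Both branches of contextOpts now build one plugins.Plugins value instead of assigning provider and provisioner factory maps separately. Roughly, mirroring the constructor introduced by this commit (the factory values below are placeholders):

var p providers.Interface // e.g. a test double
opts := &tofu.ContextOpts{
	// One value now carries both factory maps.
	Plugins: plugins.NewPlugins(
		map[addrs.Provider]providers.Factory{
			addrs.NewDefaultProvider("null"): providers.FactoryFixed(p),
		},
		map[string]provisioners.Factory{}, // no provisioners in this sketch
	),
}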

View File

@@ -1487,8 +1487,11 @@ func TestPlan_shutdown(t *testing.T) {
},
}
+ var closer sync.Once
p.StopFn = func() error {
- close(cancelled)
+ closer.Do(func() {
+ close(cancelled)
+ })
return nil
}

View File

@@ -701,7 +701,7 @@ func (runner *TestFileRunner) validate(ctx context.Context, config *configs.Conf
var diags tfdiags.Diagnostics
- tfCtx, ctxDiags := tofu.NewContext(runner.Suite.Opts)
+ tfCtx, ctxDiags := tofu.NewContext(ctx, runner.Suite.Opts)
diags = diags.Append(ctxDiags)
if ctxDiags.HasErrors() {
return diags
@@ -759,7 +759,7 @@ func (runner *TestFileRunner) destroy(ctx context.Context, config *configs.Confi
SetVariables: variables,
}
- tfCtx, ctxDiags := tofu.NewContext(runner.Suite.Opts)
+ tfCtx, ctxDiags := tofu.NewContext(ctx, runner.Suite.Opts)
diags = diags.Append(ctxDiags)
if ctxDiags.HasErrors() {
return state, diags
@@ -836,7 +836,7 @@ func (runner *TestFileRunner) plan(ctx context.Context, config *configs.Config,
ExternalReferences: references,
}
- tfCtx, ctxDiags := tofu.NewContext(runner.Suite.Opts)
+ tfCtx, ctxDiags := tofu.NewContext(ctx, runner.Suite.Opts)
diags = diags.Append(ctxDiags)
if ctxDiags.HasErrors() {
return nil, nil, diags
@@ -891,7 +891,7 @@ func (runner *TestFileRunner) apply(ctx context.Context, plan *plans.Plan, state
created = append(created, change)
}
- tfCtx, ctxDiags := tofu.NewContext(runner.Suite.Opts)
+ tfCtx, ctxDiags := tofu.NewContext(ctx, runner.Suite.Opts)
diags = diags.Append(ctxDiags)
if ctxDiags.HasErrors() {
return nil, state, diags

View File

@@ -113,7 +113,7 @@ func (c *ValidateCommand) validate(ctx context.Context, dir, testDir string, noT
return diags
}
- tfCtx, ctxDiags := tofu.NewContext(opts)
+ tfCtx, ctxDiags := tofu.NewContext(ctx, opts)
diags = diags.Append(ctxDiags)
if ctxDiags.HasErrors() {
return diags

View File

@@ -92,7 +92,7 @@ func (pi *providerInstances) ProviderClient(ctx context.Context, addr addrs.AbsP
// then this should return "nil, nil" in the error case so that the
// caller will treat it the same as a "configuration not valid enough"
// problem.
- ret, diags := planGlue.planCtx.providers.NewConfiguredProvider(ctx, addr.Config.Config.Provider, configVal)
+ ret, diags := planGlue.planCtx.providers.NewConfiguredProvider(ctx, addr, configVal)
// This background goroutine deals with closing the provider once it's
// no longer needed, and with asking it to gracefully stop if our
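The glue now identifies the configured provider by its full instance address rather than its provider type alone, so the manager can track one configured instance per address. Judging by the manager methods that appear later in this commit, NewConfiguredProvider decomposes into a configure step and a lookup step, roughly:

// Sketch of the delegation behind NewConfiguredProvider; see the
// plugins diff below for the actual method signatures.
diags := manager.ConfigureProvider(ctx, instanceAddr, configVal)
if diags.HasErrors() {
	return nil, diags
}
return manager.ConfiguredProvider(instanceAddr), diags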

View File

@@ -7,15 +7,12 @@ package plugins
import (
"context"
"errors"
"fmt"
"sync"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/configs/configschema"
"github.com/opentofu/opentofu/internal/lang/eval"
"github.com/opentofu/opentofu/internal/plugins"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/provisioners"
"github.com/opentofu/opentofu/internal/tfdiags"
"github.com/zclconf/go-cty/cty"
)
@@ -70,7 +67,7 @@ type Providers interface {
// [Providers.ValidateProviderConfig]. If the returned diagnostics contains
// errors then the [providers.Configured] result is invalid and must not be
// used.
- NewConfiguredProvider(ctx context.Context, provider addrs.Provider, configVal cty.Value) (providers.Configured, tfdiags.Diagnostics)
+ NewConfiguredProvider(ctx context.Context, provider addrs.AbsProviderInstanceCorrect, configVal cty.Value) (providers.Configured, tfdiags.Diagnostics)
Close(ctx context.Context) error
}
@@ -80,259 +77,59 @@ type Provisioners interface {
}
type newRuntimePlugins struct {
- providers map[addrs.Provider]providers.Factory
- provisioners map[string]provisioners.Factory
- // unconfiguredInsts is all of the provider instances we've created for
- // unconfigured uses such as schema fetching and validation, which we
- // currently just leave running for the remainder of the life of this
- // object though perhaps we'll do something more clever eventually.
- //
- // Must hold a lock on mu throughout any access to this map.
- unconfiguredInsts map[addrs.Provider]providers.Unconfigured
- mu sync.Mutex
+ providers plugins.ProviderManager
+ provisioners plugins.ProvisionerManager
}
var _ Providers = (*newRuntimePlugins)(nil)
var _ Provisioners = (*newRuntimePlugins)(nil)
- func NewRuntimePlugins(providers map[addrs.Provider]providers.Factory, provisioners map[string]provisioners.Factory) Plugins {
+ func NewRuntimePlugins(manager plugins.PluginManager) Plugins {
return &newRuntimePlugins{
- providers: providers,
- provisioners: provisioners,
+ providers: manager,
+ provisioners: manager,
}
}
// NewConfiguredProvider implements evalglue.Providers.
- func (n *newRuntimePlugins) NewConfiguredProvider(ctx context.Context, provider addrs.Provider, configVal cty.Value) (providers.Configured, tfdiags.Diagnostics) {
- inst, diags := n.newProviderInst(ctx, provider)
- if diags.HasErrors() {
- return nil, diags
- }
- resp := inst.ConfigureProvider(ctx, providers.ConfigureProviderRequest{
- Config: configVal,
- // We aren't actually Terraform, so we'll just pretend to be a
- // Terraform version that has roughly the same functionality that
- // OpenTofu currently has, since providers are permitted to use this to
- // adapt their behavior for older versions of Terraform.
- TerraformVersion: "1.13.0",
- })
- diags = diags.Append(resp.Diagnostics)
- if resp.Diagnostics.HasErrors() {
- return nil, diags
- }
- return inst, diags
+ func (n *newRuntimePlugins) NewConfiguredProvider(ctx context.Context, provider addrs.AbsProviderInstanceCorrect, configVal cty.Value) (providers.Configured, tfdiags.Diagnostics) {
+ diags := n.providers.ConfigureProvider(ctx, provider, configVal)
+ configured := n.providers.ConfiguredProvider(provider)
+ return configured, diags
}
// ProviderConfigSchema implements evalglue.Providers.
func (n *newRuntimePlugins) ProviderConfigSchema(ctx context.Context, provider addrs.Provider) (*providers.Schema, tfdiags.Diagnostics) {
- var diags tfdiags.Diagnostics
- inst, moreDiags := n.unconfiguredProviderInst(ctx, provider)
- diags = diags.Append(moreDiags)
- if moreDiags.HasErrors() {
- return nil, diags
- }
- resp := inst.GetProviderSchema(ctx)
- diags = diags.Append(resp.Diagnostics)
- if resp.Diagnostics.HasErrors() {
- return nil, diags
- }
- return &resp.Provider, diags
+ return n.providers.ProviderConfigSchema(ctx, provider)
}
// ResourceTypeSchema implements evalglue.Providers.
func (n *newRuntimePlugins) ResourceTypeSchema(ctx context.Context, provider addrs.Provider, mode addrs.ResourceMode, typeName string) (*providers.Schema, tfdiags.Diagnostics) {
- var diags tfdiags.Diagnostics
- inst, moreDiags := n.unconfiguredProviderInst(ctx, provider)
- diags = diags.Append(moreDiags)
- if moreDiags.HasErrors() {
- return nil, diags
- }
- resp := inst.GetProviderSchema(ctx)
- diags = diags.Append(resp.Diagnostics)
- if resp.Diagnostics.HasErrors() {
- return nil, diags
- }
- // NOTE: Callers expect us to return nil if we successfully fetch the
- // provider schema and then find there is no matching resource type, because
- // the caller is typically in a better position to return a useful error
- // message than we are.
- var types map[string]providers.Schema
- switch mode {
- case addrs.ManagedResourceMode:
- types = resp.ResourceTypes
- case addrs.DataResourceMode:
- types = resp.DataSources
- case addrs.EphemeralResourceMode:
- types = resp.EphemeralResources
- default:
- // We don't support any other modes, so we'll just treat these as
- // a request for a resource type that doesn't exist at all.
- return nil, nil
- }
- ret, ok := types[typeName]
- if !ok {
- return nil, diags
- }
- return &ret, diags
+ return n.providers.ResourceTypeSchema(ctx, provider, mode, typeName)
}
// ValidateProviderConfig implements evalglue.Providers.
func (n *newRuntimePlugins) ValidateProviderConfig(ctx context.Context, provider addrs.Provider, configVal cty.Value) tfdiags.Diagnostics {
- var diags tfdiags.Diagnostics
- inst, moreDiags := n.unconfiguredProviderInst(ctx, provider)
- diags = diags.Append(moreDiags)
- if moreDiags.HasErrors() {
- return diags
- }
- resp := inst.ValidateProviderConfig(ctx, providers.ValidateProviderConfigRequest{
- Config: configVal,
- })
- diags = diags.Append(resp.Diagnostics)
- return diags
+ return n.providers.ValidateProviderConfig(ctx, provider, configVal)
}
// ValidateResourceConfig implements evalglue.Providers.
func (n *newRuntimePlugins) ValidateResourceConfig(ctx context.Context, provider addrs.Provider, mode addrs.ResourceMode, typeName string, configVal cty.Value) tfdiags.Diagnostics {
- var diags tfdiags.Diagnostics
- inst, moreDiags := n.unconfiguredProviderInst(ctx, provider)
- diags = diags.Append(moreDiags)
- if moreDiags.HasErrors() {
- return diags
- }
- switch mode {
- case addrs.ManagedResourceMode:
- resp := inst.ValidateResourceConfig(ctx, providers.ValidateResourceConfigRequest{
- TypeName: typeName,
- Config: configVal,
- })
- diags = diags.Append(resp.Diagnostics)
- case addrs.DataResourceMode:
- resp := inst.ValidateDataResourceConfig(ctx, providers.ValidateDataResourceConfigRequest{
- TypeName: typeName,
- Config: configVal,
- })
- diags = diags.Append(resp.Diagnostics)
- case addrs.EphemeralResourceMode:
- resp := inst.ValidateEphemeralConfig(ctx, providers.ValidateEphemeralConfigRequest{
- TypeName: typeName,
- Config: configVal,
- })
- diags = diags.Append(resp.Diagnostics)
- default:
- // If we get here then it's a bug because the cases above should
- // cover all valid values of [addrs.ResourceMode].
- diags = diags.Append(tfdiags.Sourceless(
- tfdiags.Error,
- "Unsupported resource mode",
- fmt.Sprintf("Attempted to validate resource of unsupported mode %s; this is a bug in OpenTofu.", mode),
- ))
- }
- return diags
- }
- func (m *newRuntimePlugins) unconfiguredProviderInst(ctx context.Context, provider addrs.Provider) (providers.Unconfigured, tfdiags.Diagnostics) {
- m.mu.Lock()
- defer m.mu.Unlock()
- if running, ok := m.unconfiguredInsts[provider]; ok {
- return running, nil
- }
- inst, diags := m.newProviderInst(ctx, provider)
- if diags.HasErrors() {
- return nil, diags
- }
- if m.unconfiguredInsts == nil {
- m.unconfiguredInsts = make(map[addrs.Provider]providers.Unconfigured)
- }
- m.unconfiguredInsts[provider] = inst
- return inst, diags
- }
- // newProviderInst creates a new instance of the given provider.
- //
- // The result is not retained anywhere inside the receiver. Each call to this
- // function returns a new object. A successful result is always an unconfigured
- // provider, but we return [providers.Interface] in case the caller would like
- // to subsequently configure the result before returning it as
- // [providers.Configured].
- //
- // If you intend to use the resulting instance only for "unconfigured"
- // operations like fetching schema, use
- // [newRuntimePlugins.unconfiguredProviderInst] instead to potentially reuse
- // an already-active instance of the same provider.
- func (m *newRuntimePlugins) newProviderInst(_ context.Context, provider addrs.Provider) (providers.Interface, tfdiags.Diagnostics) {
- var diags tfdiags.Diagnostics
- factory, ok := m.providers[provider]
- if !ok {
- // FIXME: If this error remains reachable in the final version of this
- // code (i.e. if some caller isn't already guaranteeing that all
- // providers from the configuration and state are included here) then
- // we should make this error message more actionable.
- diags = diags.Append(tfdiags.Sourceless(
- tfdiags.Error,
- "Provider unavailable",
- fmt.Sprintf("This configuration requires provider %q, but it isn't installed.", provider),
- ))
- return nil, diags
- }
- inst, err := factory()
- if err != nil {
- diags = diags.Append(tfdiags.Sourceless(
- tfdiags.Error,
- "Provider failed to start",
- fmt.Sprintf("Failed to launch provider %q: %s.", provider, tfdiags.FormatError(err)),
- ))
- return nil, diags
- }
- return inst, diags
+ return n.providers.ValidateResourceConfig(ctx, provider, mode, typeName, configVal)
}
// ProvisionerConfigSchema implements evalglue.Provisioners.
func (n *newRuntimePlugins) ProvisionerConfigSchema(ctx context.Context, typeName string) (*configschema.Block, tfdiags.Diagnostics) {
- // TODO: Implement this in terms of [newRuntimePlugins.provisioners].
- // But provisioners aren't in scope for our "walking skeleton" phase of
- // development, so we'll skip this for now.
- var diags tfdiags.Diagnostics
- diags = diags.Append(tfdiags.Sourceless(
- tfdiags.Error,
- "Cannot use providers in new runtime codepath",
- fmt.Sprintf("Can't use provisioner %q: new runtime codepath doesn't know how to instantiate provisioners yet", typeName),
- ))
- return nil, diags
+ schema, err := n.provisioners.ProvisionerSchema(typeName)
+ if err != nil {
+ return nil, tfdiags.Diagnostics{}.Append(err)
+ }
+ return schema, nil
}
// Close terminates any plugins that are managed by this object and are still
// running.
func (n *newRuntimePlugins) Close(ctx context.Context) error {
- n.mu.Lock()
- defer n.mu.Unlock()
- var errs error
- for addr, p := range n.unconfiguredInsts {
- err := p.Close(ctx)
- if err != nil {
- errs = errors.Join(errs, fmt.Errorf("closing provider %q: %w", addr, err))
- }
- }
- n.unconfiguredInsts = nil // discard all of the memoized instances
- return errs
+ // TODO use proper close?
+ return n.providers.Stop(ctx)
}
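The net effect of this hunk: factory lookup, the memoized unconfigured-instance map, its mutex, and the close loop all move behind plugins.ProviderManager, leaving this type as a thin forwarding layer. In miniature, the shape that remains (abbreviated, not the full type):

// Every method forwards to the shared manager, so instance caching and
// locking now have a single owner.
type runtimePlugins struct {
	providers plugins.ProviderManager
}

func (n *runtimePlugins) ProviderConfigSchema(ctx context.Context, provider addrs.Provider) (*providers.Schema, tfdiags.Diagnostics) {
	return n.providers.ProviderConfigSchema(ctx, provider)
}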

View File

@@ -18,15 +18,55 @@ type PluginManager interface {
ProviderManager
}
- func NewPluginManager(ctx context.Context,
+ type Plugins interface {
+ HasProvider(addr addrs.Provider) bool
+ HasProvisioner(typ string) bool
+ Schemas(ctx context.Context) PluginSchemas
+ Manager(ctx context.Context) PluginManager
+ }
+ type plugins struct {
+ providerFactories map[addrs.Provider]providers.Factory
+ provisionerFactories map[string]provisioners.Factory
+ }
+ func NewPlugins(
providerFactories map[addrs.Provider]providers.Factory,
provisionerFactories map[string]provisioners.Factory,
- ) PluginManager {
+ ) Plugins {
+ return &plugins{
+ providerFactories: providerFactories,
+ provisionerFactories: provisionerFactories,
+ }
+ }
+ func (p *plugins) HasProvider(addr addrs.Provider) bool {
+ _, ok := p.providerFactories[addr]
+ return ok
+ }
+ func (p *plugins) HasProvisioner(typ string) bool {
+ _, ok := p.provisionerFactories[typ]
+ return ok
+ }
+ func (p *plugins) Schemas(ctx context.Context) PluginSchemas {
+ return struct {
+ ProvisionerSchemas
+ ProviderSchemas
+ }{
+ ProvisionerSchemas: NewProvisionerManager(p.provisionerFactories),
+ ProviderSchemas: NewProviderManager(ctx, p.providerFactories),
+ }
+ }
+ func (p *plugins) Manager(ctx context.Context) PluginManager {
return struct {
ProvisionerManager
ProviderManager
}{
- ProvisionerManager: NewProvisionerManager(provisionerFactories),
- ProviderManager: NewProviderManager(ctx, providerFactories),
+ ProvisionerManager: NewProvisionerManager(p.provisionerFactories),
+ ProviderManager: NewProviderManager(ctx, p.providerFactories),
}
}
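The new Plugins facade separates cheap capability checks from lifecycle-bearing views: HasProvider and HasProvisioner consult only the factory maps, while Manager(ctx) builds the running-plugin view whose background work is scoped to ctx. A hypothetical call site:

p := plugins.NewPlugins(providerFactories, provisionerFactories)
if !p.HasProvider(addr) {
	// Cheap check: no plugin process is started.
	return fmt.Errorf("provider %s is not available", addr)
}
mgr := p.Manager(ctx)     // lifecycle view; shut down via mgr.Stop(ctx)
schemas := p.Schemas(ctx) // schema-only view for callers that never configure
_, _ = mgr, schemas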

View File

@@ -47,6 +47,7 @@ func NewProviderManager(ctx context.Context, factories map[addrs.Provider]provid
go func() {
// TODO configurable
expiration := time.Duration(15 * time.Second)
+ loop:
for {
manager.unconfiguredLock.Lock()
for addr, entry := range manager.unconfigured {
@@ -67,7 +68,7 @@ func NewProviderManager(ctx context.Context, factories map[addrs.Provider]provid
case <-time.After(expiration):
continue
case <-ctx.Done():
- break
+ break loop
}
}

View File

@@ -14,13 +14,10 @@ import (
"github.com/zclconf/go-cty/cty"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/configs"
"github.com/opentofu/opentofu/internal/encryption"
"github.com/opentofu/opentofu/internal/logging"
"github.com/opentofu/opentofu/internal/plugins"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/provisioners"
"github.com/opentofu/opentofu/internal/states"
"github.com/opentofu/opentofu/internal/tfdiags"
)
@@ -41,12 +38,11 @@ const (
// ContextOpts are the user-configurable options to create a context with
// NewContext.
type ContextOpts struct {
- Meta *ContextMeta
- Hooks []Hook
- Parallelism int
- Providers map[addrs.Provider]providers.Factory
- Provisioners map[string]provisioners.Factory
- Encryption encryption.Encryption
+ Meta *ContextMeta
+ Hooks []Hook
+ Parallelism int
+ Plugins plugins.Plugins
+ Encryption encryption.Encryption
UIInput UIInput
}
@@ -106,7 +102,7 @@ type Context struct {
//
// If the returned diagnostics contains errors then the resulting context is
// invalid and must not be used.
- func NewContext(opts *ContextOpts) (*Context, tfdiags.Diagnostics) {
+ func NewContext(ctx context.Context, opts *ContextOpts) (*Context, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
log.Printf("[TRACE] tofu.NewContext: starting")
@@ -136,9 +132,6 @@ func NewContext(opts *ContextOpts) (*Context, tfdiags.Diagnostics) {
par = 10
}
- // TODO move up
- plugins := plugins.NewPluginManager(context.TODO(), opts.Providers, opts.Provisioners)
log.Printf("[TRACE] tofu.NewContext: complete")
return &Context{
@@ -146,7 +139,7 @@ func NewContext(opts *ContextOpts) (*Context, tfdiags.Diagnostics) {
meta: opts.Meta,
uiInput: opts.UIInput,
- plugins: plugins,
+ plugins: opts.Plugins.Manager(ctx),
parallelSem: NewSemaphore(par),
providerInputConfig: make(map[string]map[string]cty.Value),
@@ -292,45 +285,7 @@ func (c *Context) watchStop(walker *ContextGraphWalker) (chan struct{}, <-chan s
// If we're here, we're stopped, trigger the call.
log.Printf("[TRACE] Context: requesting providers and provisioners to gracefully stop")
- {
- // Copy the providers so that a misbehaved blocking Stop doesn't
- // completely hang OpenTofu.
- walker.providerLock.Lock()
- toStop := make([]providers.Interface, 0, len(walker.providerCache))
- for _, providerMap := range walker.providerCache {
- for _, provider := range providerMap {
- toStop = append(toStop, provider)
- }
- }
- defer walker.providerLock.Unlock()
- for _, p := range toStop {
- // We ignore the error for now since there isn't any reasonable
- // action to take if there is an error here, since the stop is still
- // advisory: OpenTofu will exit once the graph node completes.
- // The providers.Interface API contract requires that the
- // context passed to Stop is never canceled and has no deadline.
- _ = p.Stop(context.WithoutCancel(context.TODO()))
- }
- }
- {
- // Call stop on all the provisioners
- walker.provisionerLock.Lock()
- ps := make([]provisioners.Interface, 0, len(walker.provisionerCache))
- for _, p := range walker.provisionerCache {
- ps = append(ps, p)
- }
- defer walker.provisionerLock.Unlock()
- for _, p := range ps {
- // We ignore the error for now since there isn't any reasonable
- // action to take if there is an error here, since the stop is still
- // advisory: OpenTofu will exit once the graph node completes.
- _ = p.Stop()
- }
- }
+ // TODO this should now be done when the manager is stopped
}()
return stop, wait
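watchStop no longer walks provider and provisioner caches to deliver advisory Stop calls; per the TODO above, that duty is meant to land on the plugin manager, whose lifetime is now scoped by the context passed to NewContext. Callers change along these lines (a sketch, with error handling elided):

// Before this commit, tofu.NewContext(opts) built its own manager from
// context.TODO(); now the caller's ctx scopes the manager's lifetime.
tfCtx, diags := tofu.NewContext(ctx, &tofu.ContextOpts{
	Plugins: pluginSet, // a plugins.Plugins value, e.g. from Meta.contextOpts
})
if diags.HasErrors() {
	return diags.Err()
}
_ = tfCtx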

View File

@@ -18,8 +18,6 @@ import (
"github.com/opentofu/opentofu/internal/encryption"
"github.com/opentofu/opentofu/internal/instances"
"github.com/opentofu/opentofu/internal/plans"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/provisioners"
"github.com/opentofu/opentofu/internal/refactoring"
"github.com/opentofu/opentofu/internal/states"
"github.com/opentofu/opentofu/internal/tfdiags"
@@ -58,12 +56,6 @@ type ContextGraphWalker struct {
variableValuesLock sync.Mutex
variableValues map[string]map[string]cty.Value
- providerLock sync.Mutex
- providerCache map[string]map[addrs.InstanceKey]providers.Interface
- provisionerLock sync.Mutex
- provisionerCache map[string]provisioners.Interface
}
var _ GraphWalker = (*ContextGraphWalker)(nil)
@@ -111,7 +103,7 @@ func (w *ContextGraphWalker) EvalContext() EvalContext {
MoveResultsValue: w.MoveResults,
ImportResolverValue: w.ImportResolver,
ProviderInputConfig: w.Context.providerInputConfig,
- ProviderLock: &w.providerLock,
+ ProviderLock: new(sync.Mutex),
ChangesValue: w.Changes,
ChecksValue: w.Checks,
StateValue: w.State,
@@ -129,8 +121,6 @@ func (w *ContextGraphWalker) EvalContext() EvalContext {
func (w *ContextGraphWalker) init() {
w.contexts = make(map[string]*BuiltinEvalContext)
- w.providerCache = make(map[string]map[addrs.InstanceKey]providers.Interface)
- w.provisionerCache = make(map[string]provisioners.Interface)
w.variableValues = make(map[string]map[string]cty.Value)
// Populate root module variable values. Other modules will be populated