Remove global schema cache and clean up tofu schema/contextPlugins

Signed-off-by: Christian Mesh <christianmesh1@gmail.com>
This commit is contained in:
Christian Mesh
2025-12-11 15:04:12 -05:00
parent 6a8e73110e
commit b6a6a44ba1
33 changed files with 416 additions and 537 deletions

View File

@@ -110,12 +110,8 @@ func (b *Local) opApply(
// operation.
runningOp.State = lr.InputState
schemas, moreDiags := lr.Core.Schemas(ctx, lr.Config, lr.InputState)
diags = diags.Append(moreDiags)
if moreDiags.HasErrors() {
op.ReportResult(runningOp, diags)
return
}
schemas := lr.Core.Schemas()
// stateHook uses schemas for when it periodically persists state to the
// persistent storage backend.
stateHook.Schemas = schemas

View File

@@ -203,12 +203,7 @@ func (b *Local) opPlan(
// Render the plan, if we produced one.
// (This might potentially be a partial plan with Errored set to true)
schemas, moreDiags := lr.Core.Schemas(ctx, lr.Config, lr.InputState)
diags = diags.Append(moreDiags)
if moreDiags.HasErrors() {
op.ReportResult(runningOp, diags)
return
}
schemas := lr.Core.Schemas()
// Write out any generated config, before we render the plan.
wroteConfig, moreDiags := maybeWriteGeneratedConfig(plan, op.GenerateConfigOut)

View File

@@ -95,12 +95,7 @@ func (b *Local) opRefresh(
}
// get schemas before writing state
schemas, moreDiags := lr.Core.Schemas(ctx, lr.Config, lr.InputState)
diags = diags.Append(moreDiags)
if moreDiags.HasErrors() {
op.ReportResult(runningOp, diags)
return
}
schemas := lr.Core.Schemas()
// Perform the refresh in a goroutine so we can be interrupted
var newState *states.State

View File

@@ -575,7 +575,7 @@ func (m *Meta) RunOperation(ctx context.Context, b backend.Enhanced, opReq *back
// contextOpts returns the options to use to initialize a OpenTofu
// context with the settings from this Meta.
func (m *Meta) contextOpts(ctx context.Context) (*tofu.ContextOpts, error) {
func (m *Meta) contextOpts(ctx context.Context, config *configs.Config, state *states.State) (*tofu.ContextOpts, error) {
workspace, err := m.Workspace(ctx)
if err != nil {
return nil, err
@@ -599,6 +599,49 @@ func (m *Meta) contextOpts(ctx context.Context) (*tofu.ContextOpts, error) {
opts.Provisioners = m.provisionerFactories()
}
// Only include the providers referenced by configuration or state
if config != nil || state != nil {
referenced := map[addrs.Provider]providers.Factory{}
if config != nil {
for _, fqn := range config.ProviderTypes() {
referenced[fqn] = opts.Providers[fqn]
}
}
if state != nil {
needed := providers.AddressedTypesAbs(state.ProviderAddrs())
for _, fqn := range needed {
referenced[fqn] = opts.Providers[fqn]
}
}
opts.Providers = referenced
}
// Only include provisioners referenced by configuration
if config != nil {
referenced := map[string]provisioners.Factory{}
// Determine the full list of provisioners recursively
var addProvisionersToSchema func(config *configs.Config)
addProvisionersToSchema = func(config *configs.Config) {
if config == nil {
return
}
for _, rc := range config.Module.ManagedResources {
for _, pc := range rc.Managed.Provisioners {
referenced[pc.Type] = opts.Provisioners[pc.Type]
}
}
// Must also visit our child modules, recursively.
for _, cc := range config.Children {
addProvisionersToSchema(cc)
}
}
addProvisionersToSchema(config)
opts.Provisioners = referenced
}
opts.Meta = &tofu.ContextMeta{
Env: workspace,
OriginalWorkingDir: m.WorkingDir.OriginalWorkingDir(),
@@ -951,7 +994,7 @@ func (c *Meta) MaybeGetSchemas(ctx context.Context, state *states.State, config
}
if config != nil || state != nil {
opts, err := c.contextOpts(ctx)
opts, err := c.contextOpts(ctx, config, state)
if err != nil {
diags = diags.Append(err)
return nil, diags
@@ -962,7 +1005,7 @@ func (c *Meta) MaybeGetSchemas(ctx context.Context, state *states.State, config
return nil, diags
}
var schemaDiags tfdiags.Diagnostics
schemas, schemaDiags := tfCtx.Schemas(ctx, config, state)
schemas := tfCtx.Schemas()
diags = diags.Append(schemaDiags)
if schemaDiags.HasErrors() {
return nil, diags

View File

@@ -404,7 +404,9 @@ func (m *Meta) BackendForLocalPlan(ctx context.Context, settings plans.Backend,
// backendCLIOpts returns a backend.CLIOpts object that should be passed to
// a backend that supports local CLI operations.
func (m *Meta) backendCLIOpts(ctx context.Context) (*backend.CLIOpts, error) {
contextOpts, err := m.contextOpts(ctx)
// TODO this does not allow for filtering schemas in its current form
// This does not break anything per se, other than being slower and more verbose in some scenarios
contextOpts, err := m.contextOpts(ctx, nil, nil)
if contextOpts == nil && err != nil {
return nil, err
}

View File

@@ -361,10 +361,28 @@ func (m *Meta) internalProviders() map[string]providers.Factory {
}
}
func providerSchemaCache() func(func() providers.ProviderSchema) providers.ProviderSchema {
var mu sync.Mutex
var schema providers.ProviderSchema
return func(getSchema func() providers.ProviderSchema) providers.ProviderSchema {
mu.Lock()
defer mu.Unlock()
if schema.Provider.Block != nil {
return schema
}
schema = getSchema()
return schema
}
}
// providerFactory produces a provider factory that runs up the executable
// file in the given cache package and uses go-plugin to implement
// providers.Interface against it.
func providerFactory(meta *providercache.CachedProvider) providers.Factory {
schemaCache := providerSchemaCache()
return func() (providers.Interface, error) {
execFile, err := meta.ExecutableFile()
if err != nil {
@@ -395,7 +413,7 @@ func providerFactory(meta *providercache.CachedProvider) providers.Factory {
}
protoVer := client.NegotiatedVersion()
p, err := initializeProviderInstance(raw, protoVer, client, meta.Provider)
p, err := initializeProviderInstance(raw, protoVer, client, schemaCache)
if errors.Is(err, errUnsupportedProtocolVersion) {
panic(err)
}
@@ -406,18 +424,18 @@ func providerFactory(meta *providercache.CachedProvider) providers.Factory {
// initializeProviderInstance uses the plugin dispensed by the RPC client, and initializes a plugin instance
// per the protocol version
func initializeProviderInstance(plugin interface{}, protoVer int, pluginClient *plugin.Client, pluginAddr addrs.Provider) (providers.Interface, error) {
func initializeProviderInstance(plugin interface{}, protoVer int, pluginClient *plugin.Client, schemaCache func(func() providers.ProviderSchema) providers.ProviderSchema) (providers.Interface, error) {
// store the client so that the plugin can kill the child process
switch protoVer {
case 5:
p := plugin.(*tfplugin.GRPCProvider)
p.PluginClient = pluginClient
p.Addr = pluginAddr
p.SchemaCache = schemaCache
return p, nil
case 6:
p := plugin.(*tfplugin6.GRPCProvider)
p.PluginClient = pluginClient
p.Addr = pluginAddr
p.SchemaCache = schemaCache
return p, nil
default:
return nil, errUnsupportedProtocolVersion
@@ -441,6 +459,8 @@ func devOverrideProviderFactory(provider addrs.Provider, localDir getproviders.P
// reattach information to connect to go-plugin processes that are already
// running, and implements providers.Interface against it.
func unmanagedProviderFactory(provider addrs.Provider, reattach *plugin.ReattachConfig) providers.Factory {
schemaCache := providerSchemaCache()
return func() (providers.Interface, error) {
config := &plugin.ClientConfig{
HandshakeConfig: tfplugin.Handshake,
@@ -490,7 +510,7 @@ func unmanagedProviderFactory(provider addrs.Provider, reattach *plugin.Reattach
protoVer = 5
}
return initializeProviderInstance(raw, protoVer, client, provider)
return initializeProviderInstance(raw, protoVer, client, schemaCache)
}
}

View File

@@ -120,12 +120,7 @@ func (c *ProvidersSchemaCommand) Run(args []string) int {
return 1
}
schemas, moreDiags := lr.Core.Schemas(ctx, lr.Config, lr.InputState)
diags = diags.Append(moreDiags)
if moreDiags.HasErrors() {
c.showDiagnostics(diags)
return 1
}
schemas := lr.Core.Schemas()
jsonSchemas, err := jsonprovider.Marshal(schemas)
if err != nil {

View File

@@ -122,11 +122,7 @@ func (c *StateShowCommand) Run(args []string) int {
}
// Get the schemas from the context
schemas, diags := lr.Core.Schemas(ctx, lr.Config, lr.InputState)
if diags.HasErrors() {
c.View.Diagnostics(diags)
return 1
}
schemas := lr.Core.Schemas()
// Get the state
env, err := c.Workspace(ctx)
@@ -149,6 +145,7 @@ func (c *StateShowCommand) Run(args []string) int {
c.Streams.Eprintln(errStateNotFound)
return 1
}
var diags tfdiags.Diagnostics
migratedState, migrateDiags := tofumigrate.MigrateStateProviderAddresses(lr.Config, state)
diags = diags.Append(migrateDiags)
if migrateDiags.HasErrors() {

View File

@@ -10,7 +10,6 @@ import (
"context"
"fmt"
"log"
"path"
"slices"
"sort"
"strings"
@@ -242,7 +241,7 @@ func (c *TestCommand) Run(rawArgs []string) int {
return 1
}
opts, err := c.contextOpts(ctx)
opts, err := c.contextOpts(ctx, config, nil)
if err != nil {
diags = diags.Append(err)
view.Diagnostics(nil, nil, diags)
@@ -591,28 +590,15 @@ func (runner *TestFileRunner) ExecuteTestRun(ctx context.Context, run *moduletes
}
if runner.Suite.Verbose {
schemas, diags := planCtx.Schemas(ctx, config, plan.PlannedState)
schemas := planCtx.Schemas()
// If we're going to fail to render the plan, let's not fail the overall
// test. It can still have succeeded. So we'll add the diagnostics, but
// still report the test status as a success.
if diags.HasErrors() {
// This is very unlikely.
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Warning,
"Failed to print verbose output",
fmt.Sprintf("OpenTofu failed to print the verbose output for %s, other diagnostics will contain more details as to why.", path.Join(file.Name, run.Name))))
} else {
run.Verbose = &moduletest.Verbose{
Plan: plan,
State: plan.PlannedState,
Config: config,
Providers: schemas.Providers,
Provisioners: schemas.Provisioners,
}
run.Verbose = &moduletest.Verbose{
Plan: plan,
State: plan.PlannedState,
Config: config,
Providers: schemas.Providers,
Provisioners: schemas.Provisioners,
}
run.Diagnostics = run.Diagnostics.Append(diags)
}
planCtx.TestContext(config, plan.PlannedState, plan, variables).EvaluateAgainstPlan(run)
@@ -668,28 +654,14 @@ func (runner *TestFileRunner) ExecuteTestRun(ctx context.Context, run *moduletes
}
if runner.Suite.Verbose {
schemas, diags := planCtx.Schemas(ctx, config, plan.PlannedState)
// If we're going to fail to render the plan, let's not fail the overall
// test. It can still have succeeded. So we'll add the diagnostics, but
// still report the test status as a success.
if diags.HasErrors() {
// This is very unlikely.
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Warning,
"Failed to print verbose output",
fmt.Sprintf("OpenTofu failed to print the verbose output for %s, other diagnostics will contain more details as to why.", path.Join(file.Name, run.Name))))
} else {
run.Verbose = &moduletest.Verbose{
Plan: plan,
State: updated,
Config: config,
Providers: schemas.Providers,
Provisioners: schemas.Provisioners,
}
schemas := planCtx.Schemas()
run.Verbose = &moduletest.Verbose{
Plan: plan,
State: updated,
Config: config,
Providers: schemas.Providers,
Provisioners: schemas.Provisioners,
}
run.Diagnostics = run.Diagnostics.Append(diags)
}
applyCtx.TestContext(config, updated, plan, variables).EvaluateAgainstState(run)

View File

@@ -107,7 +107,7 @@ func (c *ValidateCommand) validate(ctx context.Context, dir, testDir string, noT
validate := func(cfg *configs.Config) tfdiags.Diagnostics {
var diags tfdiags.Diagnostics
opts, err := c.contextOpts(ctx)
opts, err := c.contextOpts(ctx, cfg, nil)
if err != nil {
diags = diags.Append(err)
return diags

View File

@@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"sync"
plugin "github.com/hashicorp/go-plugin"
"github.com/opentofu/opentofu/internal/plugin/validation"
@@ -18,7 +17,6 @@ import (
"github.com/zclconf/go-cty/cty/msgpack"
"google.golang.org/grpc"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/logging"
"github.com/opentofu/opentofu/internal/plugin/convert"
"github.com/opentofu/opentofu/internal/providers"
@@ -27,6 +25,16 @@ import (
var logger = logging.HCLogger()
// Some providers may generate quite large schemas, and the internal default
// grpc response size limit is 4MB. 64MB should cover most any use case, and
// if we get providers nearing that we may want to consider a finer-grained
// API to fetch individual resource schemas.
// Note: this option is marked as EXPERIMENTAL in the grpc API. We keep
// this for compatibility, but recent providers all set the max message
// size much higher on the server side, which is the supported method for
// determining payload size.
const maxRecvSize = 64 << 20
// GRPCProviderPlugin implements plugin.GRPCPlugin for the go-plugin package.
type GRPCProviderPlugin struct {
plugin.Plugin
@@ -75,11 +83,6 @@ type GRPCProvider struct {
// used in an end to end test of a provider.
TestServer *grpc.Server
// Addr uniquely identifies the type of provider.
// Normally executed providers will have this set during initialization,
// but it may not always be available for alternative execute modes.
Addr addrs.Provider
// Proto client used to make the grpc service calls.
client proto.ProviderClient
@@ -94,53 +97,44 @@ type GRPCProvider struct {
// to use as the parent context for gRPC API calls.
ctx context.Context
mu sync.Mutex
// schema stores the schema for this provider. This is used to properly
// serialize the requests for schemas.
schema providers.GetProviderSchemaResponse
// SchemaCache stores the schema for this provider. This is used to properly
// serialize the requests for schemas. This is shared between instances
// of the provider.
SchemaCache func(func() providers.GetProviderSchemaResponse) providers.GetProviderSchemaResponse
hasFetchedSchema bool
}
var _ providers.Interface = new(GRPCProvider)
func (p *GRPCProvider) GetProviderSchema(ctx context.Context) (resp providers.GetProviderSchemaResponse) {
logger.Trace("GRPCProvider: GetProviderSchema")
p.mu.Lock()
defer p.mu.Unlock()
// First, we check the global cache.
// The cache could contain this schema if an instance of this provider has previously been started.
if !p.Addr.IsZero() {
// Even if the schema is cached, GetProviderSchemaOptional could be false. This would indicate that once instantiated,
// this provider requires the get schema call to be made at least once, as it handles part of the provider's setup.
// At this point, we don't know if this is the first call to a provider instance or not, so we don't use the result in that case.
if schemaCached, ok := providers.SchemaCache.Get(p.Addr); ok && schemaCached.ServerCapabilities.GetProviderSchemaOptional {
logger.Trace("GRPCProvider: GetProviderSchema: serving from global schema cache", "address", p.Addr)
return schemaCached
}
// For testing only
if p.SchemaCache == nil {
return p.getProviderSchema(ctx)
}
// If the local cache is non-zero, we know this instance has called
// GetProviderSchema at least once, so has satisfied the possible requirement of `GetProviderSchemaOptional=false`.
// This means that we can return early now using the locally cached schema, without making this call again.
if p.schema.Provider.Block != nil {
return p.schema
schema := p.SchemaCache(func() providers.GetProviderSchemaResponse {
return p.getProviderSchema(ctx)
})
if !p.hasFetchedSchema && !schema.ServerCapabilities.GetProviderSchemaOptional {
// Force call
p.client.GetSchema(ctx, new(proto.GetProviderSchema_Request), grpc.MaxRecvMsgSizeCallOption{MaxRecvMsgSize: maxRecvSize})
p.hasFetchedSchema = true
}
return schema
}
func (p *GRPCProvider) getProviderSchema(ctx context.Context) (resp providers.GetProviderSchemaResponse) {
resp.ResourceTypes = make(map[string]providers.Schema)
resp.DataSources = make(map[string]providers.Schema)
resp.EphemeralResources = make(map[string]providers.Schema)
resp.Functions = make(map[string]providers.FunctionSpec)
// Some providers may generate quite large schemas, and the internal default
// grpc response size limit is 4MB. 64MB should cover most any use case, and
// if we get providers nearing that we may want to consider a finer-grained
// API to fetch individual resource schemas.
// Note: this option is marked as EXPERIMENTAL in the grpc API. We keep
// this for compatibility, but recent providers all set the max message
// size much higher on the server side, which is the supported method for
// determining payload size.
const maxRecvSize = 64 << 20
protoResp, err := p.client.GetSchema(ctx, new(proto.GetProviderSchema_Request), grpc.MaxRecvMsgSizeCallOption{MaxRecvMsgSize: maxRecvSize})
p.hasFetchedSchema = true
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
return resp
@@ -188,23 +182,6 @@ func (p *GRPCProvider) GetProviderSchema(ctx context.Context) (resp providers.Ge
resp.ServerCapabilities.GetProviderSchemaOptional = protoResp.ServerCapabilities.GetProviderSchemaOptional
}
// Set the global provider cache so that future calls to this provider can use the cached value.
// Crucially, this doesn't look at GetProviderSchemaOptional, because the layers above could use this cache
// *without* creating an instance of this provider. And if there is no instance,
// then we don't need to set up anything (cause there is nothing to set up), so we need no call
// to the providers GetSchema rpc.
if !p.Addr.IsZero() {
providers.SchemaCache.Set(p.Addr, resp)
}
// Always store this here in the client for providers that are not able to use GetProviderSchemaOptional.
// Crucially, this indicates that we've made at least one call to GetProviderSchema to this instance of the provider,
// which means in the future we'll be able to return using this cache
// (because the possible setup contained in the GetProviderSchema call has happened).
// If GetProviderSchemaOptional is true then this cache won't actually ever be used, because the calls to this method
// will be satisfied by the global provider cache.
p.schema = resp
return resp
}

View File

@@ -11,6 +11,7 @@ import (
"fmt"
"slices"
"strings"
"sync"
"testing"
"time"
@@ -22,7 +23,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/legacy/hcl2shim"
mockproto "github.com/opentofu/opentofu/internal/plugin/mock_proto"
"github.com/opentofu/opentofu/internal/providers"
@@ -32,6 +32,23 @@ import (
var _ providers.Interface = (*GRPCProvider)(nil)
// TODO this should probably live somewhere common
func providerSchemaCache() func(func() providers.ProviderSchema) providers.ProviderSchema {
var mu sync.Mutex
var schema providers.ProviderSchema
return func(getSchema func() providers.ProviderSchema) providers.ProviderSchema {
mu.Lock()
defer mu.Unlock()
if schema.Provider.Block != nil {
return schema
}
schema = getSchema()
return schema
}
}
func mutateSchemaResponse(response *proto.GetProviderSchema_Response, mut ...func(schemaResponse *proto.GetProviderSchema_Response)) *proto.GetProviderSchema_Response {
for _, f := range mut {
f(response)
@@ -205,13 +222,8 @@ func TestGRPCProvider_GetSchema_GRPCError(t *testing.T) {
func TestGRPCProvider_GetSchema_GlobalCacheEnabled(t *testing.T) {
ctrl := gomock.NewController(t)
client := mockproto.NewMockProviderClient(ctrl)
// The SchemaCache is global and is saved between test runs
providers.SchemaCache = providers.NewMockSchemaCache()
providerAddr := addrs.Provider{
Namespace: "namespace",
Type: "type",
}
cache := providerSchemaCache()
mockedProviderResponse := &proto.Schema{Version: 2, Block: &proto.Schema_Block{}}
@@ -227,8 +239,8 @@ func TestGRPCProvider_GetSchema_GlobalCacheEnabled(t *testing.T) {
// Run GetProviderTwice, expect GetSchema to be called once
// Re-initialize the provider before each run to avoid usage of the local cache
p := &GRPCProvider{
client: client,
Addr: providerAddr,
client: client,
SchemaCache: cache,
}
resp := p.GetProviderSchema(t.Context())
@@ -238,8 +250,8 @@ func TestGRPCProvider_GetSchema_GlobalCacheEnabled(t *testing.T) {
}
p = &GRPCProvider{
client: client,
Addr: providerAddr,
client: client,
SchemaCache: cache,
}
resp = p.GetProviderSchema(t.Context())
@@ -252,13 +264,6 @@ func TestGRPCProvider_GetSchema_GlobalCacheEnabled(t *testing.T) {
func TestGRPCProvider_GetSchema_GlobalCacheDisabled(t *testing.T) {
ctrl := gomock.NewController(t)
client := mockproto.NewMockProviderClient(ctrl)
// The SchemaCache is global and is saved between test runs
providers.SchemaCache = providers.NewMockSchemaCache()
providerAddr := addrs.Provider{
Namespace: "namespace",
Type: "type",
}
mockedProviderResponse := &proto.Schema{Version: 2, Block: &proto.Schema_Block{}}
@@ -274,8 +279,8 @@ func TestGRPCProvider_GetSchema_GlobalCacheDisabled(t *testing.T) {
// Run GetProviderTwice, expect GetSchema to be called once
// Re-initialize the provider before each run to avoid usage of the local cache
p := &GRPCProvider{
client: client,
Addr: providerAddr,
client: client,
SchemaCache: providerSchemaCache(),
}
resp := p.GetProviderSchema(t.Context())
@@ -285,8 +290,8 @@ func TestGRPCProvider_GetSchema_GlobalCacheDisabled(t *testing.T) {
}
p = &GRPCProvider{
client: client,
Addr: providerAddr,
client: client,
SchemaCache: providerSchemaCache(),
}
resp = p.GetProviderSchema(t.Context())

View File

@@ -9,7 +9,6 @@ import (
"context"
"errors"
"fmt"
"sync"
plugin "github.com/hashicorp/go-plugin"
"github.com/opentofu/opentofu/internal/plugin6/validation"
@@ -18,7 +17,6 @@ import (
"github.com/zclconf/go-cty/cty/msgpack"
"google.golang.org/grpc"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/logging"
"github.com/opentofu/opentofu/internal/plugin6/convert"
"github.com/opentofu/opentofu/internal/providers"
@@ -27,6 +25,16 @@ import (
var logger = logging.HCLogger()
// Some providers may generate quite large schemas, and the internal default
// grpc response size limit is 4MB. 64MB should cover most any use case, and
// if we get providers nearing that we may want to consider a finer-grained
// API to fetch individual resource schemas.
// Note: this option is marked as EXPERIMENTAL in the grpc API. We keep
// this for compatibility, but recent providers all set the max message
// size much higher on the server side, which is the supported method for
// determining payload size.
const maxRecvSize = 64 << 20
// GRPCProviderPlugin implements plugin.GRPCPlugin for the go-plugin package.
type GRPCProviderPlugin struct {
plugin.Plugin
@@ -75,11 +83,6 @@ type GRPCProvider struct {
// used in an end to end test of a provider.
TestServer *grpc.Server
// Addr uniquely identifies the type of provider.
// Normally executed providers will have this set during initialization,
// but it may not always be available for alternative execute modes.
Addr addrs.Provider
// Proto client use to make the grpc service calls.
client proto6.ProviderClient
@@ -94,53 +97,45 @@ type GRPCProvider struct {
// to use as the parent context for gRPC API calls.
ctx context.Context
mu sync.Mutex
// schema stores the schema for this provider. This is used to properly
// serialize the requests for schemas.
schema providers.GetProviderSchemaResponse
// SchemaCache stores the schema for this provider. This is used to properly
// serialize the requests for schemas. This is shared between instances
// of the provider.
SchemaCache func(func() providers.GetProviderSchemaResponse) providers.GetProviderSchemaResponse
hasFetchedSchema bool
}
var _ providers.Interface = new(GRPCProvider)
func (p *GRPCProvider) GetProviderSchema(ctx context.Context) (resp providers.GetProviderSchemaResponse) {
logger.Trace("GRPCProvider.v6: GetProviderSchema")
p.mu.Lock()
defer p.mu.Unlock()
// First, we check the global cache.
// The cache could contain this schema if an instance of this provider has previously been started.
if !p.Addr.IsZero() {
// Even if the schema is cached, GetProviderSchemaOptional could be false. This would indicate that once instantiated,
// this provider requires the get schema call to be made at least once, as it handles part of the provider's setup.
// At this point, we don't know if this is the first call to a provider instance or not, so we don't use the result in that case.
if schemaCached, ok := providers.SchemaCache.Get(p.Addr); ok && schemaCached.ServerCapabilities.GetProviderSchemaOptional {
logger.Trace("GRPCProvider: GetProviderSchema: serving from global schema cache", "address", p.Addr)
return schemaCached
}
// For testing only
if p.SchemaCache == nil {
return p.getProviderSchema(ctx)
}
// If the local cache is non-zero, we know this instance has called
// GetProviderSchema at least once, so has satisfied the possible requirement of `GetProviderSchemaOptional=false`.
// This means that we can return early now using the locally cached schema, without making this call again.
if p.schema.Provider.Block != nil {
return p.schema
schema := p.SchemaCache(func() providers.GetProviderSchemaResponse {
return p.getProviderSchema(ctx)
})
if !p.hasFetchedSchema && !schema.ServerCapabilities.GetProviderSchemaOptional {
// Force call
p.client.GetProviderSchema(ctx, new(proto6.GetProviderSchema_Request), grpc.MaxRecvMsgSizeCallOption{MaxRecvMsgSize: maxRecvSize})
p.hasFetchedSchema = true
}
return schema
}
func (p *GRPCProvider) getProviderSchema(ctx context.Context) (resp providers.GetProviderSchemaResponse) {
resp.ResourceTypes = make(map[string]providers.Schema)
resp.DataSources = make(map[string]providers.Schema)
resp.EphemeralResources = make(map[string]providers.Schema)
resp.Functions = make(map[string]providers.FunctionSpec)
// Some providers may generate quite large schemas, and the internal default
// grpc response size limit is 4MB. 64MB should cover most any use case, and
// if we get providers nearing that we may want to consider a finer-grained
// API to fetch individual resource schemas.
// Note: this option is marked as EXPERIMENTAL in the grpc API. We keep
// this for compatibility, but recent providers all set the max message
// size much higher on the server side, which is the supported method for
// determining payload size.
const maxRecvSize = 64 << 20
protoResp, err := p.client.GetProviderSchema(ctx, new(proto6.GetProviderSchema_Request), grpc.MaxRecvMsgSizeCallOption{MaxRecvMsgSize: maxRecvSize})
p.hasFetchedSchema = true
if err != nil {
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
return resp
@@ -188,23 +183,6 @@ func (p *GRPCProvider) GetProviderSchema(ctx context.Context) (resp providers.Ge
resp.ServerCapabilities.GetProviderSchemaOptional = protoResp.ServerCapabilities.GetProviderSchemaOptional
}
// Set the global provider cache so that future calls to this provider can use the cached value.
// Crucially, this doesn't look at GetProviderSchemaOptional, because the layers above could use this cache
// *without* creating an instance of this provider. And if there is no instance,
// then we don't need to set up anything (cause there is nothing to set up), so we need no call
// to the providers GetSchema rpc.
if !p.Addr.IsZero() {
providers.SchemaCache.Set(p.Addr, resp)
}
// Always store this here in the client for providers that are not able to use GetProviderSchemaOptional.
// Crucially, this indicates that we've made at least one call to GetProviderSchema to this instance of the provider,
// which means in the future we'll be able to return using this cache
// (because the possible setup contained in the GetProviderSchema call has happened).
// If GetProviderSchemaOptional is true then this cache won't actually ever be used, because the calls to this method
// will be satisfied by the global provider cache.
p.schema = resp
return resp
}

View File

@@ -11,6 +11,7 @@ import (
"fmt"
"slices"
"strings"
"sync"
"testing"
"time"
@@ -23,7 +24,6 @@ import (
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/timestamppb"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/legacy/hcl2shim"
mockproto "github.com/opentofu/opentofu/internal/plugin6/mock_proto"
"github.com/opentofu/opentofu/internal/providers"
@@ -31,6 +31,23 @@ import (
proto "github.com/opentofu/opentofu/internal/tfplugin6"
)
// TODO this should probably live somewhere common
func providerSchemaCache() func(func() providers.ProviderSchema) providers.ProviderSchema {
var mu sync.Mutex
var schema providers.ProviderSchema
return func(getSchema func() providers.ProviderSchema) providers.ProviderSchema {
mu.Lock()
defer mu.Unlock()
if schema.Provider.Block != nil {
return schema
}
schema = getSchema()
return schema
}
}
var _ providers.Interface = (*GRPCProvider)(nil)
var (
@@ -243,13 +260,8 @@ func TestGRPCProvider_GetSchema_ResponseErrorDiagnostic(t *testing.T) {
func TestGRPCProvider_GetSchema_GlobalCacheEnabled(t *testing.T) {
ctrl := gomock.NewController(t)
client := mockproto.NewMockProviderClient(ctrl)
// The SchemaCache is global and is saved between test runs
providers.SchemaCache = providers.NewMockSchemaCache()
providerAddr := addrs.Provider{
Namespace: "namespace",
Type: "type",
}
cache := providerSchemaCache()
mockedProviderResponse := &proto.Schema{Version: 2, Block: &proto.Schema_Block{}}
@@ -265,8 +277,8 @@ func TestGRPCProvider_GetSchema_GlobalCacheEnabled(t *testing.T) {
// Run GetProviderTwice, expect GetSchema to be called once
// Re-initialize the provider before each run to avoid usage of the local cache
p := &GRPCProvider{
client: client,
Addr: providerAddr,
client: client,
SchemaCache: cache,
}
resp := p.GetProviderSchema(t.Context())
@@ -276,8 +288,8 @@ func TestGRPCProvider_GetSchema_GlobalCacheEnabled(t *testing.T) {
}
p = &GRPCProvider{
client: client,
Addr: providerAddr,
client: client,
SchemaCache: cache,
}
resp = p.GetProviderSchema(t.Context())
@@ -290,13 +302,6 @@ func TestGRPCProvider_GetSchema_GlobalCacheEnabled(t *testing.T) {
func TestGRPCProvider_GetSchema_GlobalCacheDisabled(t *testing.T) {
ctrl := gomock.NewController(t)
client := mockproto.NewMockProviderClient(ctrl)
// The SchemaCache is global and is saved between test runs
providers.SchemaCache = providers.NewMockSchemaCache()
providerAddr := addrs.Provider{
Namespace: "namespace",
Type: "type",
}
mockedProviderResponse := &proto.Schema{Version: 2, Block: &proto.Schema_Block{}}
@@ -312,8 +317,8 @@ func TestGRPCProvider_GetSchema_GlobalCacheDisabled(t *testing.T) {
// Run GetProviderTwice, expect GetSchema to be called once
// Re-initialize the provider before each run to avoid usage of the local cache
p := &GRPCProvider{
client: client,
Addr: providerAddr,
client: client,
SchemaCache: providerSchemaCache(),
}
resp := p.GetProviderSchema(t.Context())
@@ -323,8 +328,8 @@ func TestGRPCProvider_GetSchema_GlobalCacheDisabled(t *testing.T) {
}
p = &GRPCProvider{
client: client,
Addr: providerAddr,
client: client,
SchemaCache: providerSchemaCache(),
}
resp = p.GetProviderSchema(t.Context())

View File

@@ -1,9 +0,0 @@
package providers
import "github.com/opentofu/opentofu/internal/addrs"
func NewMockSchemaCache() *schemaCache {
return &schemaCache{
m: make(map[addrs.Provider]ProviderSchema),
}
}

View File

@@ -1,49 +0,0 @@
// Copyright (c) The OpenTofu Authors
// SPDX-License-Identifier: MPL-2.0
// Copyright (c) 2023 HashiCorp, Inc.
// SPDX-License-Identifier: MPL-2.0
package providers
import (
"sync"
"github.com/opentofu/opentofu/internal/addrs"
)
// SchemaCache is a global cache of Schemas.
// This will be accessed by both core and the provider clients to ensure that
// large schemas are stored in a single location.
var SchemaCache = &schemaCache{
m: make(map[addrs.Provider]ProviderSchema),
}
// schemaCache is a mutex-guarded map of provider schema responses keyed by
// provider address. The entire GetProviderSchema response is cached so that
// any new fields (such as ServerCapabilities) are captured, and so that an
// error response is observed identically by all concurrent callers.
type schemaCache struct {
	mu sync.Mutex
	m  map[addrs.Provider]ProviderSchema
}

// Set records the schema response for the given provider address,
// replacing any existing entry.
func (sc *schemaCache) Set(p addrs.Provider, s ProviderSchema) {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	sc.m[p] = s
}

// Get returns the cached schema response for the given provider address
// and reports whether an entry was present.
func (sc *schemaCache) Get(p addrs.Provider) (ProviderSchema, bool) {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	entry, found := sc.m[p]
	return entry, found
}

// Remove drops any cached entry for the given provider address.
func (sc *schemaCache) Remove(p addrs.Provider) {
	sc.mu.Lock()
	defer sc.mu.Unlock()
	delete(sc.m, p)
}

View File

@@ -6,6 +6,8 @@
package providers
import (
"fmt"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/configs/configschema"
)
@@ -39,3 +41,48 @@ func (ss ProviderSchema) SchemaForResourceType(mode addrs.ResourceMode, typeName
func (ss ProviderSchema) SchemaForResourceAddr(addr addrs.Resource) (schema *configschema.Block, version uint64) {
return ss.SchemaForResourceType(addr.Mode, addr.Type)
}
// Validate checks the schema response for internal consistency and returns
// an error describing the first problem found: diagnostics-level errors in
// the response itself, a negative schema version anywhere, or a resource,
// data source, or ephemeral resource block that fails internal schema
// validation. Any such problem indicates a bug in the provider identified
// by addr.
//
// Receiver renamed from resp to ss for consistency with the other
// ProviderSchema methods in this file.
func (ss ProviderSchema) Validate(addr addrs.Provider) error {
	if ss.Diagnostics.HasErrors() {
		return fmt.Errorf("failed to retrieve schema from provider %q: %w", addr, ss.Diagnostics.Err())
	}
	if ss.Provider.Version < 0 {
		// We're not using the version numbers here yet, but we'll check
		// for validity anyway in case we start using them in future.
		// (Message fixed: previously "blocks,which ... provider " with a
		// missing space and a stray trailing space.)
		return fmt.Errorf("provider %s has invalid negative schema version for its configuration blocks, which is a bug in the provider", addr)
	}
	for t, r := range ss.ResourceTypes {
		if err := r.Block.InternalValidate(); err != nil {
			return fmt.Errorf("provider %s has invalid schema for managed resource type %q, which is a bug in the provider: %w", addr, t, err)
		}
		if r.Version < 0 {
			return fmt.Errorf("provider %s has invalid negative schema version for managed resource type %q, which is a bug in the provider", addr, t)
		}
	}
	for t, d := range ss.DataSources {
		if err := d.Block.InternalValidate(); err != nil {
			return fmt.Errorf("provider %s has invalid schema for data resource type %q, which is a bug in the provider: %w", addr, t, err)
		}
		if d.Version < 0 {
			// We're not using the version numbers here yet, but we'll check
			// for validity anyway in case we start using them in future.
			return fmt.Errorf("provider %s has invalid negative schema version for data resource type %q, which is a bug in the provider", addr, t)
		}
	}
	for t, d := range ss.EphemeralResources {
		if err := d.Block.InternalValidate(); err != nil {
			return fmt.Errorf("provider %s has invalid schema for ephemeral resource type %q, which is a bug in the provider: %w", addr, t, err)
		}
		if d.Version < 0 {
			// We're not using the version numbers here yet, but we'll check
			// for validity anyway in case we start using them in future.
			return fmt.Errorf("provider %s has invalid negative schema version for ephemeral resource type %q, which is a bug in the provider", addr, t)
		}
	}
	return nil
}

View File

@@ -20,7 +20,6 @@ import (
"github.com/opentofu/opentofu/internal/logging"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/provisioners"
"github.com/opentofu/opentofu/internal/states"
"github.com/opentofu/opentofu/internal/tfdiags"
)
@@ -135,7 +134,11 @@ func NewContext(opts *ContextOpts) (*Context, tfdiags.Diagnostics) {
par = 10
}
plugins := newContextPlugins(opts.Providers, opts.Provisioners)
plugins, pluginDiags := newContextPlugins(opts.Providers, opts.Provisioners)
diags = diags.Append(pluginDiags)
if diags.HasErrors() {
return nil, diags
}
log.Printf("[TRACE] tofu.NewContext: complete")
@@ -154,19 +157,8 @@ func NewContext(opts *ContextOpts) (*Context, tfdiags.Diagnostics) {
}, diags
}
func (c *Context) Schemas(ctx context.Context, config *configs.Config, state *states.State) (*Schemas, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
ret, err := loadSchemas(ctx, config, state, c.plugins)
if err != nil {
diags = diags.Append(tfdiags.Sourceless(
tfdiags.Error,
"Failed to load plugin schemas",
fmt.Sprintf("Error while loading schemas for plugin components: %s.", err),
))
return nil, diags
}
return ret, diags
// Schemas returns the provider and provisioner schemas held by this
// Context's plugin library. The schemas field is populated when the
// context's plugins are constructed, so no further loading happens here.
func (c *Context) Schemas() *Schemas {
	return c.plugins.schemas
}
type ContextGraphOpts struct {

View File

@@ -471,12 +471,6 @@ variable "obfmod" {
// Defaulted stub provider with non-custom function
func TestContext2Functions_providerFunctionsStub(t *testing.T) {
p := testProvider("aws")
addr := addrs.ImpliedProviderForUnqualifiedType("aws")
// Explicitly non-parallel
t.Setenv("foo", "bar")
defer providers.SchemaCache.Remove(addr)
p.GetProviderSchemaResponse = &providers.GetProviderSchemaResponse{
Functions: map[string]providers.FunctionSpec{
"arn_parse": providers.FunctionSpec{
@@ -492,9 +486,6 @@ func TestContext2Functions_providerFunctionsStub(t *testing.T) {
Result: cty.True,
}
// SchemaCache is initialzed earlier on in the command package
providers.SchemaCache.Set(addr, *p.GetProviderSchemaResponse)
m := testModuleInline(t, map[string]string{
"main.tf": `
module "mod" {
@@ -571,12 +562,6 @@ variable "obfmod" {
// Defaulted stub provider with custom function (no allowed)
func TestContext2Functions_providerFunctionsStubCustom(t *testing.T) {
p := testProvider("aws")
addr := addrs.ImpliedProviderForUnqualifiedType("aws")
// Explicitly non-parallel
t.Setenv("foo", "bar")
defer providers.SchemaCache.Remove(addr)
p.GetProviderSchemaResponse = &providers.GetProviderSchemaResponse{
Functions: map[string]providers.FunctionSpec{
"arn_parse": providers.FunctionSpec{
@@ -592,9 +577,6 @@ func TestContext2Functions_providerFunctionsStubCustom(t *testing.T) {
Result: cty.True,
}
// SchemaCache is initialzed earlier on in the command package
providers.SchemaCache.Set(addr, *p.GetProviderSchemaResponse)
m := testModuleInline(t, map[string]string{
"main.tf": `
module "mod" {
@@ -655,12 +637,6 @@ variable "obfmod" {
// Defaulted stub provider
func TestContext2Functions_providerFunctionsForEachCount(t *testing.T) {
p := testProvider("aws")
addr := addrs.ImpliedProviderForUnqualifiedType("aws")
// Explicitly non-parallel
t.Setenv("foo", "bar")
defer providers.SchemaCache.Remove(addr)
p.GetProviderSchemaResponse = &providers.GetProviderSchemaResponse{
Functions: map[string]providers.FunctionSpec{
"arn_parse": providers.FunctionSpec{
@@ -676,9 +652,6 @@ func TestContext2Functions_providerFunctionsForEachCount(t *testing.T) {
Result: cty.True,
}
// SchemaCache is initialzed earlier on in the command package
providers.SchemaCache.Set(addr, *p.GetProviderSchemaResponse)
m := testModuleInline(t, map[string]string{
"main.tf": `
provider "aws" {

View File

@@ -56,11 +56,7 @@ func (c *Context) Input(ctx context.Context, config *configs.Config, mode InputM
)
defer span.End()
schemas, moreDiags := c.Schemas(ctx, config, nil)
diags = diags.Append(moreDiags)
if moreDiags.HasErrors() {
return diags
}
schemas := c.Schemas()
if c.uiInput == nil {
log.Printf("[TRACE] Context.Input: uiInput is nil, so skipping")

View File

@@ -280,9 +280,7 @@ The -target and -exclude options are not for routine use, and are provided only
}
if plan != nil {
relevantAttrs, rDiags := c.relevantResourceAttrsForPlan(ctx, config, plan)
diags = diags.Append(rDiags)
plan.RelevantAttributes = relevantAttrs
plan.RelevantAttributes = c.relevantResourceAttrsForPlan(ctx, config, plan)
}
if diags.HasErrors() {
@@ -487,10 +485,7 @@ func (c *Context) destroyPlan(ctx context.Context, config *configs.Config, prevR
destroyPlan.PrevRunState = prevRunState
}
relevantAttrs, rDiags := c.relevantResourceAttrsForPlan(ctx, config, destroyPlan)
diags = diags.Append(rDiags)
destroyPlan.RelevantAttributes = relevantAttrs
destroyPlan.RelevantAttributes = c.relevantResourceAttrsForPlan(ctx, config, destroyPlan)
return destroyPlan, diags
}
@@ -931,11 +926,7 @@ func (c *Context) driftedResources(ctx context.Context, config *configs.Config,
return nil, diags
}
schemas, schemaDiags := c.Schemas(ctx, config, newState)
diags = diags.Append(schemaDiags)
if diags.HasErrors() {
return nil, diags
}
schemas := c.Schemas()
var drs []*plans.ResourceInstanceChangeSrc
@@ -1108,21 +1099,14 @@ func blockedMovesWarningDiag(results refactoring.MoveResults) tfdiags.Diagnostic
// referenceAnalyzer returns a globalref.Analyzer object to help with
// global analysis of references within the configuration that's attached
// to the receiving context.
func (c *Context) referenceAnalyzer(ctx context.Context, config *configs.Config, state *states.State) (*globalref.Analyzer, tfdiags.Diagnostics) {
schemas, diags := c.Schemas(ctx, config, state)
if diags.HasErrors() {
return nil, diags
}
return globalref.NewAnalyzer(config, schemas.Providers), diags
func (c *Context) referenceAnalyzer(ctx context.Context, config *configs.Config) *globalref.Analyzer {
return globalref.NewAnalyzer(config, c.Schemas().Providers)
}
// relevantResourceAttrsForPlan implements the heuristic we use to populate the
// RelevantResources field of returned plans.
func (c *Context) relevantResourceAttrsForPlan(ctx context.Context, config *configs.Config, plan *plans.Plan) ([]globalref.ResourceAttr, tfdiags.Diagnostics) {
azr, diags := c.referenceAnalyzer(ctx, config, plan.PriorState)
if diags.HasErrors() {
return nil, diags
}
func (c *Context) relevantResourceAttrsForPlan(ctx context.Context, config *configs.Config, plan *plans.Plan) []globalref.ResourceAttr {
azr := c.referenceAnalyzer(ctx, config)
var refs []globalref.Reference
for _, change := range plan.Changes.Resources {
@@ -1151,7 +1135,7 @@ func (c *Context) relevantResourceAttrsForPlan(ctx context.Context, config *conf
}
}
return contributors, diags
return contributors
}
// warnOnUsedDeprecatedVars is checking for variables whose values are given by the user and if any of that is

View File

@@ -6760,11 +6760,6 @@ func TestContext2Plan_importIdInvalidNull(t *testing.T) {
func TestContext2Plan_importIdInvalidUnknown(t *testing.T) {
p := testProvider("test")
m := testModule(t, "import-id-invalid-unknown")
ctx := testContext2(t, &ContextOpts{
Providers: map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("test"): testProviderFuncFixed(p),
},
})
p.GetProviderSchemaResponse = getProviderSchemaResponseFromProviderSchema(&ProviderSchema{
ResourceTypes: map[string]*configschema.Block{
"test_resource": {
@@ -6794,6 +6789,11 @@ func TestContext2Plan_importIdInvalidUnknown(t *testing.T) {
},
},
}
ctx := testContext2(t, &ContextOpts{
Providers: map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("test"): testProviderFuncFixed(p),
},
})
_, diags := ctx.Plan(context.Background(), m, states.NewState(), DefaultPlanOpts)
if !diags.HasErrors() {

View File

@@ -8,12 +8,12 @@ package tofu
import (
"context"
"fmt"
"log"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/configs/configschema"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/provisioners"
"github.com/opentofu/opentofu/internal/tfdiags"
)
// contextPlugins represents a library of available plugins (providers and
@@ -23,13 +23,24 @@ import (
type contextPlugins struct {
providerFactories map[addrs.Provider]providers.Factory
provisionerFactories map[string]provisioners.Factory
schemas *Schemas
}
func newContextPlugins(providerFactories map[addrs.Provider]providers.Factory, provisionerFactories map[string]provisioners.Factory) *contextPlugins {
func newContextPlugins(providerFactories map[addrs.Provider]providers.Factory, provisionerFactories map[string]provisioners.Factory) (*contextPlugins, tfdiags.Diagnostics) {
schemas, err := loadSchemas(context.TODO(), providerFactories, provisionerFactories)
if err != nil {
return nil, tfdiags.Diagnostics{}.Append(tfdiags.Sourceless(
tfdiags.Error,
"Failed to load plugin schemas",
fmt.Sprintf("Error while loading schemas for plugin components: %s.", err),
))
}
return &contextPlugins{
providerFactories: providerFactories,
provisionerFactories: provisionerFactories,
}
schemas: schemas,
}, nil
}
func (cp *contextPlugins) HasProvider(addr addrs.Provider) bool {
@@ -67,81 +78,20 @@ func (cp *contextPlugins) NewProvisionerInstance(typ string) (provisioners.Inter
// ProviderSchema memoizes results by unique provider address, so it's fine
// to repeatedly call this method with the same address if various different
// parts of OpenTofu all need the same schema information.
func (cp *contextPlugins) ProviderSchema(ctx context.Context, addr addrs.Provider) (providers.ProviderSchema, error) {
// Check the global schema cache first.
// This cache is only written by the provider client, and transparently
// used by GetProviderSchema, but we check it here because at this point we
// may be able to avoid spinning up the provider instance at all.
//
// It's worth noting that ServerCapabilities.GetProviderSchemaOptional is ignored here.
// That is because we're checking *prior* to the provider's instantiation.
// GetProviderSchemaOptional only says that *if we instantiate a provider*,
// then we need to run the get schema call at least once.
// BUG This SHORT CIRCUITS the logic below and is not the only code which inserts provider schemas into the cache!!
schemas, ok := providers.SchemaCache.Get(addr)
if ok {
log.Printf("[TRACE] tofu.contextPlugins: Serving provider %q schema from global schema cache", addr)
return schemas, nil
func (cp *contextPlugins) ProviderSchema(addr addrs.Provider) (providers.ProviderSchema, error) {
schema, ok := cp.schemas.Providers[addr]
if !ok {
return schema, fmt.Errorf("unavailable provider %q", addr.String())
}
log.Printf("[TRACE] tofu.contextPlugins: Initializing provider %q to read its schema", addr)
provider, err := cp.NewProviderInstance(addr)
if err != nil {
return schemas, fmt.Errorf("failed to instantiate provider %q to obtain schema: %w", addr, err)
}
defer provider.Close(ctx)
resp := provider.GetProviderSchema(ctx)
if resp.Diagnostics.HasErrors() {
return resp, fmt.Errorf("failed to retrieve schema from provider %q: %w", addr, resp.Diagnostics.Err())
}
if resp.Provider.Version < 0 {
// We're not using the version numbers here yet, but we'll check
// for validity anyway in case we start using them in future.
return resp, fmt.Errorf("provider %s has invalid negative schema version for its configuration blocks,which is a bug in the provider ", addr)
}
for t, r := range resp.ResourceTypes {
if err := r.Block.InternalValidate(); err != nil {
return resp, fmt.Errorf("provider %s has invalid schema for managed resource type %q, which is a bug in the provider: %w", addr, t, err)
}
if r.Version < 0 {
return resp, fmt.Errorf("provider %s has invalid negative schema version for managed resource type %q, which is a bug in the provider", addr, t)
}
}
for t, d := range resp.DataSources {
if err := d.Block.InternalValidate(); err != nil {
return resp, fmt.Errorf("provider %s has invalid schema for data resource type %q, which is a bug in the provider: %w", addr, t, err)
}
if d.Version < 0 {
// We're not using the version numbers here yet, but we'll check
// for validity anyway in case we start using them in future.
return resp, fmt.Errorf("provider %s has invalid negative schema version for data resource type %q, which is a bug in the provider", addr, t)
}
}
for t, d := range resp.EphemeralResources {
if err := d.Block.InternalValidate(); err != nil {
return resp, fmt.Errorf("provider %s has invalid schema for ephemeral resource type %q, which is a bug in the provider: %w", addr, t, err)
}
if d.Version < 0 {
// We're not using the version numbers here yet, but we'll check
// for validity anyway in case we start using them in future.
return resp, fmt.Errorf("provider %s has invalid negative schema version for ephemeral resource type %q, which is a bug in the provider", addr, t)
}
}
return resp, nil
return schema, nil
}
// ProviderConfigSchema is a helper wrapper around ProviderSchema which first
// reads the full schema of the given provider and then extracts just the
// provider's configuration schema, which defines what's expected in a
// "provider" block in the configuration when configuring this provider.
func (cp *contextPlugins) ProviderConfigSchema(ctx context.Context, providerAddr addrs.Provider) (*configschema.Block, error) {
providerSchema, err := cp.ProviderSchema(ctx, providerAddr)
func (cp *contextPlugins) ProviderConfigSchema(providerAddr addrs.Provider) (*configschema.Block, error) {
providerSchema, err := cp.ProviderSchema(providerAddr)
if err != nil {
return nil, err
}
@@ -160,8 +110,8 @@ func (cp *contextPlugins) ProviderConfigSchema(ctx context.Context, providerAddr
// Managed resource types have versioned schemas, so the second return value
// is the current schema version number for the requested resource. The version
// is irrelevant for other resource modes.
func (cp *contextPlugins) ResourceTypeSchema(ctx context.Context, providerAddr addrs.Provider, resourceMode addrs.ResourceMode, resourceType string) (*configschema.Block, uint64, error) {
providerSchema, err := cp.ProviderSchema(ctx, providerAddr)
func (cp *contextPlugins) ResourceTypeSchema(providerAddr addrs.Provider, resourceMode addrs.ResourceMode, resourceType string) (*configschema.Block, uint64, error) {
providerSchema, err := cp.ProviderSchema(providerAddr)
if err != nil {
return nil, 0, err
}
@@ -177,17 +127,9 @@ func (cp *contextPlugins) ResourceTypeSchema(ctx context.Context, providerAddr a
// to repeatedly call this method with the same name if various different
// parts of OpenTofu all need the same schema information.
func (cp *contextPlugins) ProvisionerSchema(typ string) (*configschema.Block, error) {
log.Printf("[TRACE] tofu.contextPlugins: Initializing provisioner %q to read its schema", typ)
provisioner, err := cp.NewProvisionerInstance(typ)
if err != nil {
return nil, fmt.Errorf("failed to instantiate provisioner %q to obtain schema: %w", typ, err)
schema, ok := cp.schemas.Provisioners[typ]
if !ok {
return schema, fmt.Errorf("unavailable provisioner %q", typ)
}
defer provisioner.Close()
resp := provisioner.GetSchema()
if resp.Diagnostics.HasErrors() {
return nil, fmt.Errorf("failed to retrieve schema from provisioner %q: %w", typ, resp.Diagnostics.Err())
}
return resp.Provisioner, nil
return schema, nil
}

View File

@@ -31,17 +31,16 @@ func simpleMockPluginLibrary() *contextPlugins {
// factory into real code under test.
provider := simpleMockProvider()
provisioner := simpleMockProvisioner()
ret := &contextPlugins{
providerFactories: map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("test"): func() (providers.Interface, error) {
return provider, nil
},
},
provisionerFactories: map[string]provisioners.Factory{
"test": func() (provisioners.Interface, error) {
return provisioner, nil
},
},
ret, diags := newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("test"): func() (providers.Interface, error) {
return provider, nil
}}, map[string]provisioners.Factory{
"test": func() (provisioners.Interface, error) {
return provisioner, nil
}},
)
if diags.HasErrors() {
panic(diags.Err())
}
return ret
}

View File

@@ -185,7 +185,7 @@ func (c *BuiltinEvalContext) Provider(_ context.Context, addr addrs.AbsProviderC
}
func (c *BuiltinEvalContext) ProviderSchema(ctx context.Context, addr addrs.AbsProviderConfig) (providers.ProviderSchema, error) {
return c.Plugins.ProviderSchema(ctx, addr.Provider)
return c.Plugins.ProviderSchema(addr.Provider)
}
func (c *BuiltinEvalContext) CloseProvider(ctx context.Context, addr addrs.AbsProviderConfig) error {

View File

@@ -12,6 +12,7 @@ import (
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/tfdiags"
"github.com/zclconf/go-cty/cty"
)
@@ -64,9 +65,13 @@ func TestBuildingEvalContextInitProvider(t *testing.T) {
ctx = ctx.WithPath(addrs.RootModuleInstance).(*BuiltinEvalContext)
ctx.ProviderLock = &lock
ctx.ProviderCache = make(map[string]map[addrs.InstanceKey]providers.Interface)
ctx.Plugins = newContextPlugins(map[addrs.Provider]providers.Factory{
var diags tfdiags.Diagnostics
ctx.Plugins, diags = newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("test"): providers.FactoryFixed(testP),
}, nil)
if diags.HasErrors() {
t.Fatal(diags.Err())
}
providerAddrDefault := addrs.AbsProviderConfig{
Module: addrs.RootModule,

View File

@@ -1015,7 +1015,7 @@ func (d *evaluationStateData) GetResource(ctx context.Context, addr addrs.Resour
func (d *evaluationStateData) getResourceSchema(ctx context.Context, addr addrs.Resource, providerAddr addrs.Provider) *configschema.Block {
// TODO: Plumb a useful context.Context through to here.
schema, _, err := d.Evaluator.Plugins.ResourceTypeSchema(ctx, providerAddr, addr.Mode, addr.Type)
schema, _, err := d.Evaluator.Plugins.ResourceTypeSchema(providerAddr, addr.Mode, addr.Type)
if err != nil {
// We have plenty of other codepaths that will detect and report
// schema lookup errors before we'd reach this point, so we'll just

View File

@@ -234,7 +234,7 @@ func (d *evaluationStateData) staticValidateResourceReference(ctx context.Contex
// TODO: Plugin a suitable context.Context through to here.
providerFqn := modCfg.Module.ProviderForLocalConfig(cfg.ProviderConfigAddr())
schema, _, err := d.Evaluator.Plugins.ResourceTypeSchema(ctx, providerFqn, addr.Mode, addr.Type)
schema, _, err := d.Evaluator.Plugins.ResourceTypeSchema(providerFqn, addr.Mode, addr.Type)
if err != nil {
// Prior validation should've taken care of a schema lookup error,
// so we should never get here but we'll handle it here anyway for

View File

@@ -780,9 +780,12 @@ func TestApplyGraphBuilder_withChecks(t *testing.T) {
},
}
plugins := newContextPlugins(map[addrs.Provider]providers.Factory{
plugins, diags := newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("aws"): providers.FactoryFixed(awsProvider),
}, nil)
if diags.HasErrors() {
t.Fatal(diags.Err())
}
b := &ApplyGraphBuilder{
Config: testModule(t, "apply-with-checks"),

View File

@@ -12,6 +12,7 @@ import (
"github.com/google/go-cmp/cmp"
"github.com/opentofu/opentofu/internal/configs"
"github.com/opentofu/opentofu/internal/states"
"github.com/opentofu/opentofu/internal/tfdiags"
"github.com/zclconf/go-cty/cty"
"github.com/opentofu/opentofu/internal/addrs"
@@ -35,10 +36,13 @@ func TestPlanGraphBuilder(t *testing.T) {
},
}
openstackProvider := mockProviderWithResourceTypeSchema("openstack_floating_ip", simpleTestSchema())
plugins := newContextPlugins(map[addrs.Provider]providers.Factory{
plugins, diags := newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("aws"): providers.FactoryFixed(awsProvider),
addrs.NewDefaultProvider("openstack"): providers.FactoryFixed(openstackProvider),
}, nil)
if diags.HasErrors() {
t.Fatal(diags.Err())
}
b := &PlanGraphBuilder{
Config: testModule(t, "graph-builder-plan-basic"),
@@ -79,9 +83,12 @@ func TestPlanGraphBuilder_dynamicBlock(t *testing.T) {
},
},
})
plugins := newContextPlugins(map[addrs.Provider]providers.Factory{
plugins, diags := newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("test"): providers.FactoryFixed(provider),
}, nil)
if diags.HasErrors() {
t.Fatal(diags.Err())
}
b := &PlanGraphBuilder{
Config: testModule(t, "graph-builder-plan-dynblock"),
@@ -135,9 +142,12 @@ func TestPlanGraphBuilder_attrAsBlocks(t *testing.T) {
},
},
})
plugins := newContextPlugins(map[addrs.Provider]providers.Factory{
plugins, diags := newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("test"): providers.FactoryFixed(provider),
}, nil)
if diags.HasErrors() {
t.Fatal(diags.Err())
}
b := &PlanGraphBuilder{
Config: testModule(t, "graph-builder-plan-attr-as-blocks"),
@@ -221,9 +231,12 @@ func TestPlanGraphBuilder_excludeModule(t *testing.T) {
func TestPlanGraphBuilder_forEach(t *testing.T) {
awsProvider := mockProviderWithResourceTypeSchema("aws_instance", simpleTestSchema())
plugins := newContextPlugins(map[addrs.Provider]providers.Factory{
plugins, diags := newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("aws"): providers.FactoryFixed(awsProvider),
}, nil)
if diags.HasErrors() {
t.Fatal(diags.Err())
}
b := &PlanGraphBuilder{
Config: testModule(t, "plan-for-each"),
@@ -256,9 +269,6 @@ func TestPlanGraphBuilder_ephemeralResourceDestroy(t *testing.T) {
b := &PlanGraphBuilder{
Config: &configs.Config{Module: &configs.Module{}},
Operation: walkPlanDestroy,
Plugins: newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("aws"): providers.FactoryFixed(awsProvider),
}, nil),
State: &states.State{
Modules: map[string]*states.Module{
"": {
@@ -280,6 +290,14 @@ func TestPlanGraphBuilder_ephemeralResourceDestroy(t *testing.T) {
},
}
var diags tfdiags.Diagnostics
b.Plugins, diags = newContextPlugins(map[addrs.Provider]providers.Factory{
addrs.NewDefaultProvider("aws"): providers.FactoryFixed(awsProvider),
}, nil)
if diags.HasErrors() {
t.Fatal(diags.Err())
}
g, err := b.Build(t.Context(), addrs.RootModuleInstance)
if err != nil {
t.Fatalf("err: %s", err)
@@ -304,7 +322,7 @@ func TestPlanGraphBuilder_ephemeralResourceDestroy(t *testing.T) {
evalCtx := &MockEvalContext{
ProviderProvider: testProvider("aws"),
}
diags := found.Execute(t.Context(), evalCtx, walkPlanDestroy)
diags = found.Execute(t.Context(), evalCtx, walkPlanDestroy)
got := diags.Err().Error()
want := `An ephemeral resource planned for destroy: A destroy operation has been planned for the ephemeral resource "ephemeral.aws_secretmanager_secret.test". This is an OpenTofu error. Please report this.`
if got != want {

View File

@@ -12,10 +12,9 @@ import (
"sync"
"github.com/opentofu/opentofu/internal/addrs"
"github.com/opentofu/opentofu/internal/configs"
"github.com/opentofu/opentofu/internal/configs/configschema"
"github.com/opentofu/opentofu/internal/providers"
"github.com/opentofu/opentofu/internal/states"
"github.com/opentofu/opentofu/internal/provisioners"
"github.com/opentofu/opentofu/internal/tfdiags"
)
@@ -71,13 +70,13 @@ func (ss *Schemas) ProvisionerConfig(name string) *configschema.Block {
// either misbehavior on the part of one of the providers or of the provider
// protocol itself. When returned with errors, the returned schemas object is
// still valid but may be incomplete.
func loadSchemas(ctx context.Context, config *configs.Config, state *states.State, plugins *contextPlugins) (*Schemas, error) {
func loadSchemas(ctx context.Context, providerFactories map[addrs.Provider]providers.Factory, provisionerFactories map[string]provisioners.Factory) (*Schemas, error) {
var diags tfdiags.Diagnostics
provisioners, provisionerDiags := loadProvisionerSchemas(ctx, config, plugins)
provisioners, provisionerDiags := loadProvisionerSchemas(ctx, provisionerFactories)
diags = diags.Append(provisionerDiags)
providers, providerDiags := loadProviderSchemas(ctx, config, state, plugins)
providers, providerDiags := loadProviderSchemas(ctx, providerFactories)
diags = diags.Append(providerDiags)
return &Schemas{
@@ -86,80 +85,75 @@ func loadSchemas(ctx context.Context, config *configs.Config, state *states.Stat
}, diags.Err()
}
func loadProviderSchemas(ctx context.Context, config *configs.Config, state *states.State, plugins *contextPlugins) (map[addrs.Provider]providers.ProviderSchema, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
func loadProviderSchemas(ctx context.Context, providerFactories map[addrs.Provider]providers.Factory) (map[addrs.Provider]providers.ProviderSchema, tfdiags.Diagnostics) {
var lock sync.Mutex
schemas := map[addrs.Provider]providers.ProviderSchema{}
if config != nil {
for _, fqn := range config.ProviderTypes() {
schemas[fqn] = providers.ProviderSchema{}
}
}
if state != nil {
needed := providers.AddressedTypesAbs(state.ProviderAddrs())
for _, fqn := range needed {
schemas[fqn] = providers.ProviderSchema{}
}
}
var diags tfdiags.Diagnostics
var wg sync.WaitGroup
var lock sync.Mutex
lock.Lock() // Prevent anything from started until we have finished schema map reads
for fqn := range schemas {
for fqn, factory := range providerFactories {
wg.Go(func() {
log.Printf("[TRACE] LoadSchemas: retrieving schema for provider type %q", fqn.String())
schema, err := plugins.ProviderSchema(ctx, fqn)
log.Printf("[TRACE] loadProviderSchemas: retrieving schema for provider type %q", fqn.String())
// Heavy lifting
schema, err := func() (providers.ProviderSchema, error) {
log.Printf("[TRACE] loadProviderSchemas: Initializing provider %q to read its schema", fqn)
provider, err := factory()
if err != nil {
return providers.ProviderSchema{}, fmt.Errorf("failed to instantiate provider %q to obtain schema: %w", fqn, err)
}
defer provider.Close(ctx)
resp := providers.ProviderSchema(provider.GetProviderSchema(ctx))
if resp.Diagnostics.HasErrors() {
return resp, fmt.Errorf("failed to retrieve schema from provider %q: %w", fqn, resp.Diagnostics.Err())
}
if err := resp.Validate(fqn); err != nil {
return resp, err
}
return resp, nil
}()
// Ensure that we don't race on diags or schemas now that the hard work is done
lock.Lock()
defer lock.Unlock()
if err != nil {
diags = diags.Append(err)
return
}
schemas[fqn] = schema
diags = diags.Append(err)
})
}
// Allow execution to start now that reading of schemas map has completed
lock.Unlock()
// Wait for all of the scheduled routines to complete
wg.Wait()
return schemas, diags
}
func loadProvisionerSchemas(ctx context.Context, config *configs.Config, plugins *contextPlugins) (map[string]*configschema.Block, tfdiags.Diagnostics) {
func loadProvisionerSchemas(ctx context.Context, provisioners map[string]provisioners.Factory) (map[string]*configschema.Block, tfdiags.Diagnostics) {
var diags tfdiags.Diagnostics
schemas := map[string]*configschema.Block{}
// Determine the full list of provisioners recursively
var addProvisionersToSchema func(config *configs.Config)
addProvisionersToSchema = func(config *configs.Config) {
if config == nil {
return
}
for _, rc := range config.Module.ManagedResources {
for _, pc := range rc.Managed.Provisioners {
schemas[pc.Type] = &configschema.Block{}
}
}
// Must also visit our child modules, recursively.
for _, cc := range config.Children {
addProvisionersToSchema(cc)
}
}
addProvisionersToSchema(config)
// Populate the schema entries
for name := range schemas {
log.Printf("[TRACE] LoadSchemas: retrieving schema for provisioner %q", name)
schema, err := plugins.ProvisionerSchema(name)
for name, factory := range provisioners {
log.Printf("[TRACE] loadProvisionerSchemas: retrieving schema for provisioner %q", name)
schema, err := func() (*configschema.Block, error) {
log.Printf("[TRACE] loadProvisionerSchemas: Initializing provisioner %q to read its schema", name)
provisioner, err := factory()
if err != nil {
return nil, fmt.Errorf("failed to instantiate provisioner %q to obtain schema: %w", name, err)
}
defer provisioner.Close()
resp := provisioner.GetSchema()
if resp.Diagnostics.HasErrors() {
return nil, fmt.Errorf("failed to retrieve schema from provisioner %q: %w", name, resp.Diagnostics.Err())
}
return resp.Provisioner, nil
}()
if err != nil {
// We'll put a stub in the map so we won't re-attempt this on
// future calls, which would then repeat the same error message

View File

@@ -312,5 +312,9 @@ func schemaOnlyProvidersForTesting(schemas map[addrs.Provider]providers.Provider
}
}
return newContextPlugins(factories, nil)
cp, diags := newContextPlugins(factories, nil)
if diags.HasErrors() {
t.Fatal(diags.Err())
}
return cp
}

View File

@@ -69,7 +69,7 @@ func (t *AttachSchemaTransformer) Transform(ctx context.Context, g *Graph) error
providerFqn := tv.Provider()
// TODO: Plumb a useful context.Context through to here.
schema, version, err := t.Plugins.ResourceTypeSchema(ctx, providerFqn, mode, typeName)
schema, version, err := t.Plugins.ResourceTypeSchema(providerFqn, mode, typeName)
if err != nil {
return fmt.Errorf("failed to read schema for %s in %s: %w", addr, providerFqn, err)
}
@@ -84,7 +84,7 @@ func (t *AttachSchemaTransformer) Transform(ctx context.Context, g *Graph) error
if tv, ok := v.(GraphNodeAttachProviderConfigSchema); ok {
providerAddr := tv.ProviderAddr()
// TODO: Plumb a useful context.Context through to here.
schema, err := t.Plugins.ProviderConfigSchema(ctx, providerAddr.Provider)
schema, err := t.Plugins.ProviderConfigSchema(providerAddr.Provider)
if err != nil {
return fmt.Errorf("failed to read provider configuration schema for %s: %w", providerAddr.Provider, err)
}