Clean up schema cache logic and fix provisioner closing
Signed-off-by: Christian Mesh <christianmesh1@gmail.com>
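The core of the change replaces the sync.OnceValue-based schema cache with map entries that carry their own mutex and a populated flag: the coarse map lock is held only long enough to create the entry, and the per-entry lock serializes the actual schema fetch. A minimal standalone sketch of that caching pattern, with hypothetical cache/entry/fetch names rather than the OpenTofu API:

package main

import (
	"fmt"
	"sync"
)

// entry caches the result of one expensive lookup; its zero value is ready to use.
type entry struct {
	sync.Mutex
	populated bool
	value     string
	err       error
}

// cache maps keys to lazily populated entries.
type cache struct {
	mu      sync.Mutex // guards the map only; never held while fetching
	entries map[string]*entry
}

func (c *cache) get(key string, fetch func(string) (string, error)) (string, error) {
	// Coarse lock: only ensure that an entry exists for this key.
	c.mu.Lock()
	e, ok := c.entries[key]
	if !ok {
		e = &entry{}
		c.entries[key] = e
	}
	c.mu.Unlock()

	// Per-entry lock: callers for the same key serialize here, while callers
	// for different keys fetch concurrently.
	e.Lock()
	defer e.Unlock()

	if !e.populated {
		v, err := fetch(key)
		if err != nil {
			// Possibly transient: report it, but don't memoize the failure.
			return "", err
		}
		e.value = v
		e.populated = true
	}
	return e.value, e.err
}

func main() {
	c := &cache{entries: map[string]*entry{}}
	v, err := c.get("example", func(k string) (string, error) {
		return "schema-for-" + k, nil // stand-in for an expensive plugin call
	})
	fmt.Println(v, err)
}

Only the short map lookup is serialized across all keys; concurrent callers for the same key block on that key's entry until the first fetch completes.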
@@ -25,27 +25,36 @@ type Library interface {
 func NewLibrary(providerFactories ProviderFactories, provisionerFactories ProvisionerFactories) Library {
     return &library{
         providerFactories: providerFactories,
-        providerSchemas:   map[addrs.Provider]providerSchemaEntry{},
+        providerSchemas:   map[addrs.Provider]*providerSchemaEntry{},

         provisionerFactories: provisionerFactories,
-        provisionerSchemas:   map[string]provisionerSchemaEntry{},
+        provisionerSchemas:   map[string]*provisionerSchemaEntry{},
     }
 }

-type providerSchemaResult struct {
+type providerSchemaEntry struct {
+    sync.Mutex
+    populated bool
+
     schema providers.ProviderSchema
     diags  tfdiags.Diagnostics
 }
-type providerSchemaEntry func() providerSchemaResult
-type provisionerSchemaEntry func() (*configschema.Block, error)
+
+type provisionerSchemaEntry struct {
+    sync.Mutex
+    populated bool
+
+    schema *configschema.Block
+    err    error
+}

 type library struct {
     providerSchemasLock sync.Mutex
-    providerSchemas     map[addrs.Provider]providerSchemaEntry
+    providerSchemas     map[addrs.Provider]*providerSchemaEntry
     providerFactories   ProviderFactories

     provisionerSchemasLock sync.Mutex
-    provisionerSchemas     map[string]provisionerSchemaEntry
+    provisionerSchemas     map[string]*provisionerSchemaEntry
     provisionerFactories   ProvisionerFactories
 }
@@ -67,37 +67,18 @@ func (p *providerManager) HasProvider(addr addrs.Provider) bool {
 }

 func (p *providerManager) GetProviderSchema(ctx context.Context, addr addrs.Provider) (providers.ProviderSchema, tfdiags.Diagnostics) {
+    if p.closed {
+        // It's technically possible, but highly unlikely that a manager could be closed while fetching the schema
+        // In that scenario, we will start and then stop the corresponding provider internally to this function and not
+        // interfere with the set of known instances.
+        return providers.ProviderSchema{}, tfdiags.Diagnostics{}.Append(fmt.Errorf("bug: unable to start provider %s, manager is closed", addr))
+    }
+
     // Coarse lock only for ensuring that a valid entry exists
     p.providerSchemasLock.Lock()
     entry, ok := p.providerSchemas[addr]
     if !ok {
-        entry = sync.OnceValue(func() providerSchemaResult {
-            log.Printf("[TRACE] plugins.providerManager Initializing provider %q to read its schema", addr)
-
-            var diags tfdiags.Diagnostics
-
-            if p.closed {
-                return providerSchemaResult{diags: tfdiags.Diagnostics{}.Append(fmt.Errorf("bug: unable to start provider %s, manager is closed", addr))}
-            }
-
-            provider, err := p.providerFactories.NewInstance(addr)
-            if err != nil {
-                return providerSchemaResult{diags: diags.Append(fmt.Errorf("failed to instantiate provider %q to obtain schema: %w", addr, err))}
-            }
-
-            schema := provider.GetProviderSchema(ctx)
-            diags = diags.Append(schema.Diagnostics)
-            if diags.HasErrors() {
-                return providerSchemaResult{schema, diags}
-            }
-
-            err = schema.Validate(addr)
-            if err != nil {
-                diags = diags.Append(err)
-            }
-
-            return providerSchemaResult{schema, diags}
-        })
+        entry = &providerSchemaEntry{}
         p.providerSchemas[addr] = entry
     }
     // This lock is only for access to the map. We don't need to hold the lock when calling
@@ -106,8 +87,33 @@ func (p *providerManager) GetProviderSchema(ctx context.Context, addr addrs.Prov
     // and we want to release as soon as possible for multiple concurrent callers of different providers
     p.providerSchemasLock.Unlock()

-    result := entry()
-    return result.schema, result.diags
+    entry.Lock()
+    defer entry.Unlock()
+
+    if !entry.populated {
+        log.Printf("[TRACE] plugins.providerManager Initializing provider %q to read its schema", addr)
+
+        provider, err := p.providerFactories.NewInstance(addr)
+        if err != nil {
+            // Might be a transient error. Don't memoize this result
+            return providers.ProviderSchema{}, tfdiags.Diagnostics{}.Append(fmt.Errorf("failed to instantiate provider %q to obtain schema: %w", addr, err))
+        }
+        // TODO consider using the p.NewProvider(ctx, addr) call once we have a clear
+        // .Close() call for all usages of the provider manager
+        defer provider.Close(context.WithoutCancel(ctx))
+
+        entry.schema = provider.GetProviderSchema(ctx)
+        entry.diags = entry.diags.Append(entry.schema.Diagnostics)
+        entry.populated = true
+
+        if !entry.diags.HasErrors() {
+            // Validate only if GetProviderSchema succeeded
+            err := entry.schema.Validate(addr)
+            entry.diags = entry.diags.Append(err)
+        }
+    }
+
+    return entry.schema, entry.diags
 }

 func (p *providerManager) NewProvider(ctx context.Context, addr addrs.Provider) (providers.Interface, tfdiags.Diagnostics) {
@@ -47,6 +47,9 @@ type provisionerManager struct {

     instancesLock sync.Mutex
     instances     map[string]provisioners.Interface
+
+    // TODO handle closed
+    closed bool
 }

 func (l *library) NewProvisionerManager() ProvisionerManager {
@@ -64,6 +67,10 @@ func (p *provisionerManager) provisioner(typ string) (provisioners.Interface, er
     p.instancesLock.Lock()
     defer p.instancesLock.Unlock()

+    if p.closed {
+        return nil, fmt.Errorf("bug: unable to start provisioner %s, manager is closed", typ)
+    }
+
     instance, ok := p.instances[typ]
     if !ok {
         var err error
@@ -88,20 +95,7 @@ func (p *provisionerManager) ProvisionerSchema(typ string) (*configschema.Block,
     p.provisionerSchemasLock.Lock()
     entry, ok := p.provisionerSchemas[typ]
     if !ok {
-        entry = sync.OnceValues(func() (*configschema.Block, error) {
-            log.Printf("[TRACE] Initializing provisioner %q to read its schema", typ)
-            provisioner, err := p.provisioner(typ)
-            if err != nil {
-                return nil, fmt.Errorf("failed to instantiate provisioner %q to obtain schema: %w", typ, err)
-            }
-            defer provisioner.Close()
-
-            resp := provisioner.GetSchema()
-            if resp.Diagnostics.HasErrors() {
-                return nil, fmt.Errorf("failed to retrieve schema from provisioner %q: %w", typ, resp.Diagnostics.Err())
-            }
-            return resp.Provisioner, nil
-        })
+        entry = &provisionerSchemaEntry{}
         p.provisionerSchemas[typ] = entry
     }
     // This lock is only for access to the map. We don't need to hold the lock when calling
@@ -110,7 +104,27 @@ func (p *provisionerManager) ProvisionerSchema(typ string) (*configschema.Block,
     // and we want to release as soon as possible for multiple concurrent callers of different provisioners
     p.provisionerSchemasLock.Unlock()

-    return entry()
+    if !entry.populated {
+        log.Printf("[TRACE] Initializing provisioner %q to read its schema", typ)
+        provisioner, err := p.provisionerFactories.NewInstance(typ)
+        if err != nil {
+            // Might be a transient error. Don't memoize this result
+            return nil, fmt.Errorf("failed to instantiate provisioner %q to obtain schema: %w", typ, err)
+        }
+        // TODO consider using the p.provisioner(typ) call once we have a clear
+        // .Close() call for all usages of the provisioner manager
+        defer provisioner.Close()
+
+        resp := provisioner.GetSchema()
+
+        entry.populated = true
+        entry.schema = resp.Provisioner
+        if resp.Diagnostics.HasErrors() {
+            entry.err = fmt.Errorf("failed to retrieve schema from provisioner %q: %w", typ, resp.Diagnostics.Err())
+        }
+    }
+
+    return entry.schema, entry.err
 }

 func (p *provisionerManager) ValidateProvisionerConfig(ctx context.Context, typ string, config cty.Value) tfdiags.Diagnostics {
@@ -139,6 +153,8 @@ func (p *provisionerManager) Close() error {
     p.instancesLock.Lock()
     defer p.instancesLock.Unlock()

+    p.closed = true
+
     var diags tfdiags.Diagnostics
     for name, prov := range p.instances {
         err := prov.Close()
@@ -146,7 +162,6 @@ func (p *provisionerManager) Close() error {
             diags = diags.Append(fmt.Errorf("provisioner.Close %s: %w", name, err))
         }
     }
-    clear(p.instances)
     return diags.Err()
 }
@@ -161,6 +176,5 @@ func (p *provisionerManager) Stop() error {
             diags = diags.Append(fmt.Errorf("provisioner.Stop %s: %w", name, err))
         }
     }
-    clear(p.instances)
     return diags.Err()
 }
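For context on the "Might be a transient error. Don't memoize this result" comments above: sync.OnceValue and sync.OnceValues run their function exactly once and hand every later caller the same result, errors included, so a transient instantiation failure would have been cached for the lifetime of the manager. A small illustration of the difference, as an assumed example rather than OpenTofu code:

package main

import (
	"errors"
	"fmt"
	"sync"
)

func main() {
	calls := 0
	flaky := func() (string, error) {
		calls++
		if calls == 1 {
			return "", errors.New("transient failure")
		}
		return "ok", nil
	}

	// sync.OnceValues caches whatever the first call returns, error included:
	// every later caller sees the same transient failure.
	once := sync.OnceValues(flaky)
	for i := 0; i < 3; i++ {
		v, err := once()
		fmt.Println("OnceValues:", v, err)
	}

	// A populated-flag cache can return the failure without recording it,
	// so a later call gets a fresh attempt.
	calls = 0
	var (
		populated bool
		cached    string
	)
	lookup := func() (string, error) {
		if !populated {
			v, err := flaky()
			if err != nil {
				return "", err // don't memoize the transient error
			}
			cached, populated = v, true
		}
		return cached, nil
	}
	for i := 0; i < 3; i++ {
		v, err := lookup()
		fmt.Println("populated-flag:", v, err)
	}
}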