mirror of
https://github.com/opentffoundation/opentf.git
synced 2025-12-20 02:09:26 -05:00
Continuing our work to gradually plumb context.Context to everywhere that we want to generate OpenTelemetry traces, this completes the call path for most (but not all) of the gRPC requests to provider plugins, so that we can add OpenTelemetry trace instrumentation in a future commit. Unfortunately there are still a few providers.Interface callers left in functions that don't have context.Context plumbed to them yet, and so those are temporarily stubbed as context.TODO() here so we can more easily find and complete them later. The two gRPC implementations of providers.Interface were previously making provider requests using a single context.Context established at the time the provider process was started, but that isn't an appropriate context to use for per-request concerns like tracing, so that context is now unused and could potentially be removed in a future commit, but this change already got pretty large and so I intend to deal with that separately later. This now exposes the gRPC provider calls to potential context cancellation that they would previously observe only indirectly through the Stop method. Since Stop is primarily used for graceful shutdown of ApplyResourceChange, the changes here explicitly disconnect the cancellation signal for ApplyResourceChange in particular, while letting the others get canceled in the normal way since they are expected to be free of significant side-effects. In future work we could consider removing Stop from the internal API entirely and keeping it only as an implementation detail of the gRPC implementation of this interface, with ApplyResourceChange directly reacting to context cancellation and sending the gRPC Stop call itself, but again that's too much change for this already-large commit. 
The internal/legacy package currently contains some legacy code preserved for the benefit of the backends, and unfortunately contains more than is strictly necessary to support those callers, and so there was some dead code there that also needed updating. provider_mock.go is removed entirely because it's just an older copy of the similar file in package tofu. The few calls to providers in schemas.go are updated to use context.Background() rather than context.TODO() because we have no intention of plumbing context.Context into that legacy code, and will hopefully just delete it wholesale one day. Signed-off-by: Martin Atkins <mart@degeneration.co.uk>
934 lines
29 KiB
Go
934 lines
29 KiB
Go
// Copyright (c) The OpenTofu Authors
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
// Copyright (c) 2023 HashiCorp, Inc.
|
|
// SPDX-License-Identifier: MPL-2.0
|
|
|
|
package plugin
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/zclconf/go-cty/cty"
|
|
|
|
plugin "github.com/hashicorp/go-plugin"
|
|
ctyjson "github.com/zclconf/go-cty/cty/json"
|
|
"github.com/zclconf/go-cty/cty/msgpack"
|
|
"google.golang.org/grpc"
|
|
|
|
"github.com/opentofu/opentofu/internal/addrs"
|
|
"github.com/opentofu/opentofu/internal/logging"
|
|
"github.com/opentofu/opentofu/internal/plugin/convert"
|
|
"github.com/opentofu/opentofu/internal/providers"
|
|
proto "github.com/opentofu/opentofu/internal/tfplugin5"
|
|
)
|
|
|
|
// logger is the package-level logger, obtained from logging.HCLogger. The
// GRPCProvider methods in this file use it for their trace and debug output.
var logger = logging.HCLogger()
|
|
|
|
// GRPCProviderPlugin implements plugin.GRPCPlugin for the go-plugin package.
|
|
type GRPCProviderPlugin struct {
|
|
plugin.Plugin
|
|
GRPCProvider func() proto.ProviderServer
|
|
}
|
|
|
|
func (p *GRPCProviderPlugin) GRPCClient(ctx context.Context, broker *plugin.GRPCBroker, c *grpc.ClientConn) (interface{}, error) {
|
|
return &GRPCProvider{
|
|
client: proto.NewProviderClient(c),
|
|
ctx: ctx,
|
|
}, nil
|
|
}
|
|
|
|
func (p *GRPCProviderPlugin) GRPCServer(broker *plugin.GRPCBroker, s *grpc.Server) error {
|
|
proto.RegisterProviderServer(s, p.GRPCProvider())
|
|
return nil
|
|
}
|
|
|
|
// GRPCProvider handles the client, or core side of the plugin rpc connection.
|
|
// The GRPCProvider methods are mostly a translation layer between the
|
|
// tofu providers types and the grpc proto types, directly converting
|
|
// between the two.
|
|
type GRPCProvider struct {
|
|
// PluginClient provides a reference to the plugin.Client which controls the plugin process.
|
|
// This allows the GRPCProvider a way to shutdown the plugin process.
|
|
PluginClient *plugin.Client
|
|
|
|
// TestServer contains a grpc.Server to close when the GRPCProvider is being
|
|
// used in an end to end test of a provider.
|
|
TestServer *grpc.Server
|
|
|
|
// Addr uniquely identifies the type of provider.
|
|
// Normally executed providers will have this set during initialization,
|
|
// but it may not always be available for alternative execute modes.
|
|
Addr addrs.Provider
|
|
|
|
// Proto client use to make the grpc service calls.
|
|
client proto.ProviderClient
|
|
|
|
// this context is created by the plugin package, and is canceled when the
|
|
// plugin process ends.
|
|
//
|
|
// THIS IS NOT THE RIGHT CONTEXT TO USE FOR GRPC CALLS! This represents
|
|
// the overall context in which the plugin was launched and represents the
|
|
// full runtime of the plugin process, whereas the [context.Context] passed
|
|
// to individual [providers.Interface] methods is a more tightly-scoped
|
|
// context focused on each individual request, and so is more appropriate
|
|
// to use as the parent context for gRPC API calls.
|
|
ctx context.Context
|
|
|
|
mu sync.Mutex
|
|
// schema stores the schema for this provider. This is used to properly
|
|
// serialize the requests for schemas.
|
|
schema providers.GetProviderSchemaResponse
|
|
}
|
|
|
|
// Compile-time check that *GRPCProvider satisfies providers.Interface.
var _ providers.Interface = (*GRPCProvider)(nil)
|
|
|
|
func (p *GRPCProvider) GetProviderSchema(ctx context.Context) (resp providers.GetProviderSchemaResponse) {
|
|
logger.Trace("GRPCProvider: GetProviderSchema")
|
|
p.mu.Lock()
|
|
defer p.mu.Unlock()
|
|
|
|
// First, we check the global cache.
|
|
// The cache could contain this schema if an instance of this provider has previously been started.
|
|
if !p.Addr.IsZero() {
|
|
// Even if the schema is cached, GetProviderSchemaOptional could be false. This would indicate that once instantiated,
|
|
// this provider requires the get schema call to be made at least once, as it handles part of the provider's setup.
|
|
// At this point, we don't know if this is the first call to a provider instance or not, so we don't use the result in that case.
|
|
if schemaCached, ok := providers.SchemaCache.Get(p.Addr); ok && schemaCached.ServerCapabilities.GetProviderSchemaOptional {
|
|
logger.Trace("GRPCProvider: GetProviderSchema: serving from global schema cache", "address", p.Addr)
|
|
return schemaCached
|
|
}
|
|
}
|
|
|
|
// If the local cache is non-zero, we know this instance has called
|
|
// GetProviderSchema at least once, so has satisfied the possible requirement of `GetProviderSchemaOptional=false`.
|
|
// This means that we can return early now using the locally cached schema, without making this call again.
|
|
if p.schema.Provider.Block != nil {
|
|
return p.schema
|
|
}
|
|
|
|
resp.ResourceTypes = make(map[string]providers.Schema)
|
|
resp.DataSources = make(map[string]providers.Schema)
|
|
resp.Functions = make(map[string]providers.FunctionSpec)
|
|
|
|
// Some providers may generate quite large schemas, and the internal default
|
|
// grpc response size limit is 4MB. 64MB should cover most any use case, and
|
|
// if we get providers nearing that we may want to consider a finer-grained
|
|
// API to fetch individual resource schemas.
|
|
// Note: this option is marked as EXPERIMENTAL in the grpc API. We keep
|
|
// this for compatibility, but recent providers all set the max message
|
|
// size much higher on the server side, which is the supported method for
|
|
// determining payload size.
|
|
const maxRecvSize = 64 << 20
|
|
protoResp, err := p.client.GetSchema(ctx, new(proto.GetProviderSchema_Request), grpc.MaxRecvMsgSizeCallOption{MaxRecvMsgSize: maxRecvSize})
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
|
|
if resp.Diagnostics.HasErrors() {
|
|
return resp
|
|
}
|
|
|
|
if protoResp.Provider == nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(errors.New("missing provider schema"))
|
|
return resp
|
|
}
|
|
|
|
resp.Provider = convert.ProtoToProviderSchema(protoResp.Provider)
|
|
if protoResp.ProviderMeta == nil {
|
|
logger.Debug("No provider meta schema returned")
|
|
} else {
|
|
resp.ProviderMeta = convert.ProtoToProviderSchema(protoResp.ProviderMeta)
|
|
}
|
|
|
|
for name, res := range protoResp.ResourceSchemas {
|
|
resp.ResourceTypes[name] = convert.ProtoToProviderSchema(res)
|
|
}
|
|
|
|
for name, data := range protoResp.DataSourceSchemas {
|
|
resp.DataSources[name] = convert.ProtoToProviderSchema(data)
|
|
}
|
|
|
|
for name, fn := range protoResp.Functions {
|
|
resp.Functions[name] = convert.ProtoToFunctionSpec(fn)
|
|
}
|
|
|
|
if protoResp.ServerCapabilities != nil {
|
|
resp.ServerCapabilities.PlanDestroy = protoResp.ServerCapabilities.PlanDestroy
|
|
resp.ServerCapabilities.GetProviderSchemaOptional = protoResp.ServerCapabilities.GetProviderSchemaOptional
|
|
}
|
|
|
|
// Set the global provider cache so that future calls to this provider can use the cached value.
|
|
// Crucially, this doesn't look at GetProviderSchemaOptional, because the layers above could use this cache
|
|
// *without* creating an instance of this provider. And if there is no instance,
|
|
// then we don't need to set up anything (cause there is nothing to set up), so we need no call
|
|
// to the providers GetSchema rpc.
|
|
if !p.Addr.IsZero() {
|
|
providers.SchemaCache.Set(p.Addr, resp)
|
|
}
|
|
|
|
// Always store this here in the client for providers that are not able to use GetProviderSchemaOptional.
|
|
// Crucially, this indicates that we've made at least one call to GetProviderSchema to this instance of the provider,
|
|
// which means in the future we'll be able to return using this cache
|
|
// (because the possible setup contained in the GetProviderSchema call has happened).
|
|
// If GetProviderSchemaOptional is true then this cache won't actually ever be used, because the calls to this method
|
|
// will be satisfied by the global provider cache.
|
|
p.schema = resp
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) ValidateProviderConfig(ctx context.Context, r providers.ValidateProviderConfigRequest) (resp providers.ValidateProviderConfigResponse) {
|
|
logger.Trace("GRPCProvider: ValidateProviderConfig")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
ty := schema.Provider.Block.ImpliedType()
|
|
|
|
mp, err := msgpack.Marshal(r.Config, ty)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.PrepareProviderConfig_Request{
|
|
Config: &proto.DynamicValue{Msgpack: mp},
|
|
}
|
|
|
|
protoResp, err := p.client.PrepareProviderConfig(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
|
|
config, err := decodeDynamicValue(protoResp.PreparedConfig, ty)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
resp.PreparedConfig = config
|
|
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) ValidateResourceConfig(ctx context.Context, r providers.ValidateResourceConfigRequest) (resp providers.ValidateResourceConfigResponse) {
|
|
logger.Trace("GRPCProvider: ValidateResourceConfig")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
resourceSchema, ok := schema.ResourceTypes[r.TypeName]
|
|
if !ok {
|
|
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown resource type %q", r.TypeName))
|
|
return resp
|
|
}
|
|
|
|
mp, err := msgpack.Marshal(r.Config, resourceSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.ValidateResourceTypeConfig_Request{
|
|
TypeName: r.TypeName,
|
|
Config: &proto.DynamicValue{Msgpack: mp},
|
|
}
|
|
|
|
protoResp, err := p.client.ValidateResourceTypeConfig(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) ValidateDataResourceConfig(ctx context.Context, r providers.ValidateDataResourceConfigRequest) (resp providers.ValidateDataResourceConfigResponse) {
|
|
logger.Trace("GRPCProvider: ValidateDataResourceConfig")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
dataSchema, ok := schema.DataSources[r.TypeName]
|
|
if !ok {
|
|
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown data source %q", r.TypeName))
|
|
return resp
|
|
}
|
|
|
|
mp, err := msgpack.Marshal(r.Config, dataSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.ValidateDataSourceConfig_Request{
|
|
TypeName: r.TypeName,
|
|
Config: &proto.DynamicValue{Msgpack: mp},
|
|
}
|
|
|
|
protoResp, err := p.client.ValidateDataSourceConfig(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) UpgradeResourceState(ctx context.Context, r providers.UpgradeResourceStateRequest) (resp providers.UpgradeResourceStateResponse) {
|
|
logger.Trace("GRPCProvider: UpgradeResourceState")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
resSchema, ok := schema.ResourceTypes[r.TypeName]
|
|
if !ok {
|
|
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown resource type %q", r.TypeName))
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.UpgradeResourceState_Request{
|
|
TypeName: r.TypeName,
|
|
Version: r.Version,
|
|
RawState: &proto.RawState{
|
|
Json: r.RawStateJSON,
|
|
Flatmap: r.RawStateFlatmap,
|
|
},
|
|
}
|
|
|
|
protoResp, err := p.client.UpgradeResourceState(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
|
|
ty := resSchema.Block.ImpliedType()
|
|
resp.UpgradedState = cty.NullVal(ty)
|
|
if protoResp.UpgradedState == nil {
|
|
return resp
|
|
}
|
|
|
|
state, err := decodeDynamicValue(protoResp.UpgradedState, ty)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
resp.UpgradedState = state
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) ConfigureProvider(ctx context.Context, r providers.ConfigureProviderRequest) (resp providers.ConfigureProviderResponse) {
|
|
logger.Trace("GRPCProvider: ConfigureProvider")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
var mp []byte
|
|
|
|
// we don't have anything to marshal if there's no config
|
|
mp, err := msgpack.Marshal(r.Config, schema.Provider.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.Configure_Request{
|
|
TerraformVersion: r.TerraformVersion,
|
|
Config: &proto.DynamicValue{
|
|
Msgpack: mp,
|
|
},
|
|
}
|
|
|
|
protoResp, err := p.client.Configure(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) Stop(ctx context.Context) error {
|
|
logger.Trace("GRPCProvider: Stop")
|
|
// NOTE: The contract for providers.Interface guarantees that the ctx
|
|
// passed to this function is never canceled, so we can safely use it
|
|
// to make our "stop" request here. The context passed to other methods
|
|
// _can_ be cancelled in some cases, so other methods should use
|
|
// ctx = context.WithoutCancel(ctx) before making requests that need
|
|
// to be able to terminate gracefully in response to "Stop".
|
|
|
|
resp, err := p.client.Stop(ctx, new(proto.Stop_Request))
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
if resp.Error != "" {
|
|
return errors.New(resp.Error)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (p *GRPCProvider) ReadResource(ctx context.Context, r providers.ReadResourceRequest) (resp providers.ReadResourceResponse) {
|
|
logger.Trace("GRPCProvider: ReadResource")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
resSchema, ok := schema.ResourceTypes[r.TypeName]
|
|
if !ok {
|
|
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown resource type %s", r.TypeName))
|
|
return resp
|
|
}
|
|
|
|
metaSchema := schema.ProviderMeta
|
|
|
|
mp, err := msgpack.Marshal(r.PriorState, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.ReadResource_Request{
|
|
TypeName: r.TypeName,
|
|
CurrentState: &proto.DynamicValue{Msgpack: mp},
|
|
Private: r.Private,
|
|
}
|
|
|
|
if metaSchema.Block != nil {
|
|
metaMP, err := msgpack.Marshal(r.ProviderMeta, metaSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
protoReq.ProviderMeta = &proto.DynamicValue{Msgpack: metaMP}
|
|
}
|
|
|
|
protoResp, err := p.client.ReadResource(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
|
|
state, err := decodeDynamicValue(protoResp.NewState, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
resp.NewState = state
|
|
resp.Private = protoResp.Private
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) PlanResourceChange(ctx context.Context, r providers.PlanResourceChangeRequest) (resp providers.PlanResourceChangeResponse) {
|
|
logger.Trace("GRPCProvider: PlanResourceChange")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
resSchema, ok := schema.ResourceTypes[r.TypeName]
|
|
if !ok {
|
|
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown resource type %q", r.TypeName))
|
|
return resp
|
|
}
|
|
|
|
metaSchema := schema.ProviderMeta
|
|
capabilities := schema.ServerCapabilities
|
|
|
|
// If the provider doesn't support planning a destroy operation, we can
|
|
// return immediately.
|
|
if r.ProposedNewState.IsNull() && !capabilities.PlanDestroy {
|
|
resp.PlannedState = r.ProposedNewState
|
|
resp.PlannedPrivate = r.PriorPrivate
|
|
return resp
|
|
}
|
|
|
|
priorMP, err := msgpack.Marshal(r.PriorState, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
configMP, err := msgpack.Marshal(r.Config, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
propMP, err := msgpack.Marshal(r.ProposedNewState, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.PlanResourceChange_Request{
|
|
TypeName: r.TypeName,
|
|
PriorState: &proto.DynamicValue{Msgpack: priorMP},
|
|
Config: &proto.DynamicValue{Msgpack: configMP},
|
|
ProposedNewState: &proto.DynamicValue{Msgpack: propMP},
|
|
PriorPrivate: r.PriorPrivate,
|
|
}
|
|
|
|
if metaSchema.Block != nil {
|
|
metaMP, err := msgpack.Marshal(r.ProviderMeta, metaSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
protoReq.ProviderMeta = &proto.DynamicValue{Msgpack: metaMP}
|
|
}
|
|
|
|
protoResp, err := p.client.PlanResourceChange(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
|
|
state, err := decodeDynamicValue(protoResp.PlannedState, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
resp.PlannedState = state
|
|
|
|
for _, p := range protoResp.RequiresReplace {
|
|
resp.RequiresReplace = append(resp.RequiresReplace, convert.AttributePathToPath(p))
|
|
}
|
|
|
|
resp.PlannedPrivate = protoResp.PlannedPrivate
|
|
|
|
resp.LegacyTypeSystem = protoResp.LegacyTypeSystem
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) ApplyResourceChange(ctx context.Context, r providers.ApplyResourceChangeRequest) (resp providers.ApplyResourceChangeResponse) {
|
|
logger.Trace("GRPCProvider: ApplyResourceChange")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
// Aside from fetching the schema above, the work of this function must
|
|
// not be directly canceled by the incoming context's cancellation
|
|
// signal, because we ask a provider plugin to gracefully cancel by
|
|
// calling the Stop method and then its apply operation must be allowed
|
|
// to run to completion to terminate gracefully if possible.
|
|
ctx = context.WithoutCancel(ctx)
|
|
|
|
resSchema, ok := schema.ResourceTypes[r.TypeName]
|
|
if !ok {
|
|
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown resource type %q", r.TypeName))
|
|
return resp
|
|
}
|
|
|
|
metaSchema := schema.ProviderMeta
|
|
|
|
priorMP, err := msgpack.Marshal(r.PriorState, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
plannedMP, err := msgpack.Marshal(r.PlannedState, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
configMP, err := msgpack.Marshal(r.Config, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.ApplyResourceChange_Request{
|
|
TypeName: r.TypeName,
|
|
PriorState: &proto.DynamicValue{Msgpack: priorMP},
|
|
PlannedState: &proto.DynamicValue{Msgpack: plannedMP},
|
|
Config: &proto.DynamicValue{Msgpack: configMP},
|
|
PlannedPrivate: r.PlannedPrivate,
|
|
}
|
|
|
|
if metaSchema.Block != nil {
|
|
metaMP, err := msgpack.Marshal(r.ProviderMeta, metaSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
protoReq.ProviderMeta = &proto.DynamicValue{Msgpack: metaMP}
|
|
}
|
|
|
|
protoResp, err := p.client.ApplyResourceChange(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
|
|
resp.Private = protoResp.Private
|
|
|
|
state, err := decodeDynamicValue(protoResp.NewState, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
resp.NewState = state
|
|
|
|
resp.LegacyTypeSystem = protoResp.LegacyTypeSystem
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) ImportResourceState(ctx context.Context, r providers.ImportResourceStateRequest) (resp providers.ImportResourceStateResponse) {
|
|
logger.Trace("GRPCProvider: ImportResourceState")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.ImportResourceState_Request{
|
|
TypeName: r.TypeName,
|
|
Id: r.ID,
|
|
}
|
|
|
|
protoResp, err := p.client.ImportResourceState(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
|
|
for _, imported := range protoResp.ImportedResources {
|
|
resource := providers.ImportedResource{
|
|
TypeName: imported.TypeName,
|
|
Private: imported.Private,
|
|
}
|
|
|
|
resSchema, ok := schema.ResourceTypes[r.TypeName]
|
|
if !ok {
|
|
resp.Diagnostics = resp.Diagnostics.Append(fmt.Errorf("unknown resource type %q", r.TypeName))
|
|
continue
|
|
}
|
|
|
|
state, err := decodeDynamicValue(imported.State, resSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
resource.State = state
|
|
resp.ImportedResources = append(resp.ImportedResources, resource)
|
|
}
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) MoveResourceState(ctx context.Context, r providers.MoveResourceStateRequest) providers.MoveResourceStateResponse {
|
|
var resp providers.MoveResourceStateResponse
|
|
logger.Trace("GRPCProvider: MoveResourceState")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
resourceSchema, ok := schema.ResourceTypes[r.TargetTypeName]
|
|
if !ok {
|
|
schema.Diagnostics = schema.Diagnostics.Append(fmt.Errorf("unknown data source %q", r.TargetTypeName))
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.MoveResourceState_Request{
|
|
SourceProviderAddress: r.SourceProviderAddress,
|
|
SourceTypeName: r.SourceTypeName,
|
|
SourceSchemaVersion: int64(r.SourceSchemaVersion),
|
|
SourceState: &proto.RawState{
|
|
Json: r.SourceStateJSON,
|
|
Flatmap: r.SourceStateFlatmap,
|
|
},
|
|
SourcePrivate: r.SourcePrivate,
|
|
TargetTypeName: r.TargetTypeName,
|
|
}
|
|
|
|
protoResp, err := p.client.MoveResourceState(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
|
|
state, err := decodeDynamicValue(protoResp.TargetState, resourceSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
resp.TargetState = state
|
|
resp.TargetPrivate = protoResp.TargetPrivate
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) ReadDataSource(ctx context.Context, r providers.ReadDataSourceRequest) (resp providers.ReadDataSourceResponse) {
|
|
logger.Trace("GRPCProvider: ReadDataSource")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
resp.Diagnostics = schema.Diagnostics
|
|
return resp
|
|
}
|
|
|
|
dataSchema, ok := schema.DataSources[r.TypeName]
|
|
if !ok {
|
|
schema.Diagnostics = schema.Diagnostics.Append(fmt.Errorf("unknown data source %q", r.TypeName))
|
|
}
|
|
|
|
metaSchema := schema.ProviderMeta
|
|
|
|
config, err := msgpack.Marshal(r.Config, dataSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
|
|
protoReq := &proto.ReadDataSource_Request{
|
|
TypeName: r.TypeName,
|
|
Config: &proto.DynamicValue{
|
|
Msgpack: config,
|
|
},
|
|
}
|
|
|
|
if metaSchema.Block != nil {
|
|
metaMP, err := msgpack.Marshal(r.ProviderMeta, metaSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
protoReq.ProviderMeta = &proto.DynamicValue{Msgpack: metaMP}
|
|
}
|
|
|
|
protoResp, err := p.client.ReadDataSource(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
|
|
state, err := decodeDynamicValue(protoResp.State, dataSchema.Block.ImpliedType())
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(err)
|
|
return resp
|
|
}
|
|
resp.State = state
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) GetFunctions(ctx context.Context) (resp providers.GetFunctionsResponse) {
|
|
logger.Trace("GRPCProvider: GetFunctions")
|
|
|
|
protoReq := &proto.GetFunctions_Request{}
|
|
|
|
protoResp, err := p.client.GetFunctions(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Diagnostics = resp.Diagnostics.Append(grpcErr(err))
|
|
return resp
|
|
}
|
|
resp.Diagnostics = resp.Diagnostics.Append(convert.ProtoToDiagnostics(protoResp.Diagnostics))
|
|
resp.Functions = make(map[string]providers.FunctionSpec)
|
|
|
|
for name, fn := range protoResp.Functions {
|
|
resp.Functions[name] = convert.ProtoToFunctionSpec(fn)
|
|
}
|
|
|
|
return resp
|
|
}
|
|
|
|
func (p *GRPCProvider) CallFunction(ctx context.Context, r providers.CallFunctionRequest) (resp providers.CallFunctionResponse) {
|
|
logger.Trace("GRPCProvider: CallFunction")
|
|
|
|
schema := p.GetProviderSchema(ctx)
|
|
if schema.Diagnostics.HasErrors() {
|
|
// This should be unreachable
|
|
resp.Error = schema.Diagnostics.Err()
|
|
return resp
|
|
}
|
|
|
|
spec, ok := schema.Functions[r.Name]
|
|
if !ok {
|
|
funcs := p.GetFunctions(ctx)
|
|
if funcs.Diagnostics.HasErrors() {
|
|
// This should be unreachable
|
|
resp.Error = funcs.Diagnostics.Err()
|
|
return resp
|
|
}
|
|
spec, ok = funcs.Functions[r.Name]
|
|
if !ok {
|
|
// This should be unreachable
|
|
resp.Error = fmt.Errorf("invalid CallFunctionRequest: function %s not defined in provider schema", r.Name)
|
|
return resp
|
|
}
|
|
}
|
|
|
|
protoReq := &proto.CallFunction_Request{
|
|
Name: r.Name,
|
|
Arguments: make([]*proto.DynamicValue, len(r.Arguments)),
|
|
}
|
|
|
|
// Translate the arguments
|
|
// As this is functionality is always sitting behind cty/function.Function, we skip some validation
|
|
// checks of from the function and param spec. We still include basic validation to prevent panics,
|
|
// just in case there are bugs in cty. See context_functions_test.go for explicit testing of argument
|
|
// handling and short-circuiting.
|
|
if len(r.Arguments) < len(spec.Parameters) {
|
|
// This should be unreachable
|
|
resp.Error = fmt.Errorf("invalid CallFunctionRequest: function %s expected %d parameters and got %d instead", r.Name, len(spec.Parameters), len(r.Arguments))
|
|
return resp
|
|
}
|
|
|
|
for i, arg := range r.Arguments {
|
|
var paramSpec providers.FunctionParameterSpec
|
|
if i < len(spec.Parameters) {
|
|
paramSpec = spec.Parameters[i]
|
|
} else {
|
|
// We are past the end of spec.Parameters, this is either variadic or an error
|
|
if spec.VariadicParameter != nil {
|
|
paramSpec = *spec.VariadicParameter
|
|
} else {
|
|
// This should be unreachable
|
|
resp.Error = fmt.Errorf("invalid CallFunctionRequest: too many arguments passed to non-variadic function %s", r.Name)
|
|
return resp
|
|
}
|
|
}
|
|
|
|
if !paramSpec.AllowUnknownValues && !arg.IsWhollyKnown() {
|
|
// Unlike the standard in cty, AllowUnknownValues == false does not just apply to
|
|
// the root of the value (IsKnown) and instead also applies to values inside collections
|
|
// and structures (IsWhollyKnown).
|
|
// This is documented in the tfplugin proto file comments.
|
|
//
|
|
// The standard cty logic can be found in:
|
|
// https://github.com/zclconf/go-cty/blob/ea922e7a95ba2be57897697117f318670e066d22/cty/function/function.go#L288-L290
|
|
resp.Result = cty.UnknownVal(spec.Return)
|
|
return resp
|
|
}
|
|
|
|
if arg.IsNull() {
|
|
if paramSpec.AllowNullValue {
|
|
continue
|
|
} else {
|
|
resp.Error = &providers.CallFunctionArgumentError{
|
|
Text: fmt.Sprintf("parameter %s is null, which is not allowed for function %s", paramSpec.Name, r.Name),
|
|
FunctionArgument: i,
|
|
}
|
|
}
|
|
|
|
}
|
|
|
|
encodedArg, err := msgpack.Marshal(arg, paramSpec.Type)
|
|
if err != nil {
|
|
resp.Error = err
|
|
return
|
|
}
|
|
|
|
protoReq.Arguments[i] = &proto.DynamicValue{
|
|
Msgpack: encodedArg,
|
|
}
|
|
}
|
|
|
|
protoResp, err := p.client.CallFunction(ctx, protoReq)
|
|
if err != nil {
|
|
resp.Error = err
|
|
return
|
|
}
|
|
|
|
if protoResp.Error != nil {
|
|
err := &providers.CallFunctionArgumentError{
|
|
Text: protoResp.Error.Text,
|
|
}
|
|
if protoResp.Error.FunctionArgument != nil {
|
|
err.FunctionArgument = int(*protoResp.Error.FunctionArgument)
|
|
}
|
|
resp.Error = err
|
|
return
|
|
}
|
|
|
|
resp.Result, resp.Error = decodeDynamicValue(protoResp.Result, spec.Return)
|
|
return
|
|
}
|
|
|
|
// closing the grpc connection is final, and tofu will call it at the end of every phase.
|
|
func (p *GRPCProvider) Close(ctx context.Context) error {
|
|
logger.Trace("GRPCProvider: Close")
|
|
|
|
// Make sure to stop the server if we're not running within go-plugin.
|
|
if p.TestServer != nil {
|
|
p.TestServer.Stop()
|
|
}
|
|
|
|
// Check this since it's not automatically inserted during plugin creation.
|
|
// It's currently only inserted by the command package, because that is
|
|
// where the factory is built and is the only point with access to the
|
|
// plugin.Client.
|
|
if p.PluginClient == nil {
|
|
logger.Debug("provider has no plugin.Client")
|
|
return nil
|
|
}
|
|
|
|
p.PluginClient.Kill()
|
|
return nil
|
|
}
|
|
|
|
// Decode a DynamicValue from either the JSON or MsgPack encoding.
|
|
func decodeDynamicValue(v *proto.DynamicValue, ty cty.Type) (cty.Value, error) {
|
|
// always return a valid value
|
|
var err error
|
|
res := cty.NullVal(ty)
|
|
if v == nil {
|
|
return res, nil
|
|
}
|
|
|
|
switch {
|
|
case len(v.Msgpack) > 0:
|
|
res, err = msgpack.Unmarshal(v.Msgpack, ty)
|
|
case len(v.Json) > 0:
|
|
res, err = ctyjson.Unmarshal(v.Json, ty)
|
|
}
|
|
return res, err
|
|
}
|