Files
steampipe/pkg/dashboard/dashboardexecute/runtime_dependency_subscriber_impl.go

326 lines
12 KiB
Go

package dashboardexecute
import (
"context"
"fmt"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"log"
)
// RuntimeDependencySubscriber is a run which consumes runtime dependencies: it subscribes to the
// publishers of the values it needs, waits for them to be resolved and then uses them to resolve
// the args, param defaults and SQL of its resource.
type RuntimeDependencySubscriber struct {
// all RuntimeDependencySubscribers are also publishers as they have args/params
// (embedded, so the publisher's fields and methods are promoted onto the subscriber)
RuntimeDependencyPublisherImpl
// map of resolved runtime dependencies, keyed by the name under which the resource declares the dependency
// (populated by resolveRuntimeDependencies)
runtimeDependencies map[string]*dashboardtypes.ResolvedRuntimeDependency
// a list of the (scoped) names of any runtime dependencies that we rely on
// (populated by setRuntimeDependencies, serialised as "dependencies")
RuntimeDependencyNames []string `json:"dependencies,omitempty"`
// the raw SQL of the resolved query (set by resolveSQLAndArgs, serialised as "sql")
RawSQL string `json:"sql,omitempty"`
// the SQL to execute for the resolved query (set by resolveSQLAndArgs, not serialised)
executeSQL string
// optional subscriber for a base resource - when set, its runtime dependencies are waited for and
// searched in addition to our own, and its 'with' runs are executed alongside our children
baseDependencySubscriber *RuntimeDependencySubscriber
}
// NewRuntimeDependencySubscriber builds a RuntimeDependencySubscriber for the given resource.
// The returned subscriber has an empty runtime dependency map and an embedded
// RuntimeDependencyPublisherImpl created from the same resource, parent and execution tree.
// If run is nil, the new subscriber itself is passed to the publisher as the run.
func NewRuntimeDependencySubscriber(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, run dashboardtypes.DashboardTreeRun, executionTree *DashboardExecutionTree) *RuntimeDependencySubscriber {
	subscriber := new(RuntimeDependencySubscriber)
	subscriber.runtimeDependencies = map[string]*dashboardtypes.ResolvedRuntimeDependency{}

	// TODO [node_reuse]
	// HACK
	// a run for a base resource is created without a 'run' - the subscriber stands in for it
	if run == nil {
		run = subscriber
	}

	// the publisher impl requires a reference to the run, so it can only be created
	// once we know which run to hand it
	subscriber.RuntimeDependencyPublisherImpl = NewRuntimeDependencyPublisherImpl(resource, parent, run, executionTree)

	return subscriber
}
// initRuntimeDependencies initialises runtime dependency handling for this run.
// It is a no-op unless the resource is a modconfig.RuntimeDependencyProvider; otherwise it first
// delegates to the embedded publisher (which starts any with runs) and then resolves this
// run's own runtime dependencies.
func (s *RuntimeDependencySubscriber) initRuntimeDependencies() error {
	_, isProvider := s.resource.(modconfig.RuntimeDependencyProvider)
	if !isProvider {
		return nil
	}

	// the publisher goes first so that any with runs are started
	err := s.RuntimeDependencyPublisherImpl.initRuntimeDependencies()
	if err != nil {
		return err
	}

	// now resolve any runtime dependencies
	return s.resolveRuntimeDependencies()
}
// resolveRuntimeDependencies subscribes to every runtime dependency declared by this run's resource.
//
// For each dependency it finds a publisher which can provide the value, subscribes to it and stores a
// dashboardtypes.ResolvedRuntimeDependency (which is later used to resolve the value) in
// s.runtimeDependencies, keyed by the dependency name.
//
// Resources which are not a modconfig.RuntimeDependencyProvider are ignored (nil is returned).
// An error is returned if no publisher can be found for a dependency.
func (s *RuntimeDependencySubscriber) resolveRuntimeDependencies() error {
	rdp, ok := s.resource.(modconfig.RuntimeDependencyProvider)
	if !ok {
		return nil
	}

	for n, d := range rdp.GetRuntimeDependencies() {
		// read name and dep into local loop vars to ensure the correct values are used when the
		// transform func is invoked (the closure outlives this loop iteration)
		name := n
		dep := d

		// find a runtime dependency publisher who can provide this runtime dependency
		publisher := s.findRuntimeDependencyPublisher(dep)
		if publisher == nil {
			// should never happen as validation should have caught this
			return fmt.Errorf("cannot resolve runtime dependency %s", dep.String())
		}

		// determine the function to use to retrieve the runtime dependency value
		var opts []RuntimeDependencyPublishOption
		if dep.PropertyPath.ItemType == modconfig.BlockTypeWith {
			// set a transform function to extract the requested with data
			opts = append(opts, WithTransform(func(resolvedVal *dashboardtypes.ResolvedRuntimeDependencyValue) *dashboardtypes.ResolvedRuntimeDependencyValue {
				// always propagate an upstream error unchanged
				transformedResolvedVal := &dashboardtypes.ResolvedRuntimeDependencyValue{Error: resolvedVal.Error}
				if resolvedVal.Error != nil {
					return transformedResolvedVal
				}

				// the runtime dependency value for a 'with' is expected to be *dashboardtypes.LeafData -
				// use a checked assertion so an unexpected value is reported as an error rather than a panic
				leafData, isLeafData := resolvedVal.Value.(*dashboardtypes.LeafData)
				if !isLeafData {
					transformedResolvedVal.Error = fmt.Errorf("failed to resolve with value '%s' for %s: unexpected value type %T", dep.PropertyPath.Original, name, resolvedVal.Value)
					return transformedResolvedVal
				}

				withValue, err := s.getWithValue(name, leafData, dep.PropertyPath)
				if err != nil {
					// wrap with %w so callers can still inspect the underlying error
					transformedResolvedVal.Error = fmt.Errorf("failed to resolve with value '%s' for %s: %w", dep.PropertyPath.Original, name, err)
					return transformedResolvedVal
				}

				transformedResolvedVal.Value = withValue
				return transformedResolvedVal
			}))
		}

		// subscribe, passing any transform option (for a 'with' this invokes getWithValue
		// to resolve the required with value)
		valueChannel := publisher.SubscribeToRuntimeDependency(dep.SourceResourceName(), opts...)
		publisherName := publisher.GetName()
		s.runtimeDependencies[name] = dashboardtypes.NewResolvedRuntimeDependency(dep, valueChannel, publisherName)
	}

	return nil
}
// evaluateRuntimeDependencies waits for this run's runtime dependencies (if it has any) and then
// resolves the SQL and args which depend on them.
// When there are no runtime dependencies nothing is done and nil is returned.
// (it is possible to have params but no sql)
func (s *RuntimeDependencySubscriber) evaluateRuntimeDependencies(ctx context.Context) error {
	if !s.hasRuntimeDependencies() {
		return nil
	}

	// block until every outstanding runtime dependency is available
	if err := s.waitForRuntimeDependencies(); err != nil {
		return err
	}

	// the runtime dependency values are now known, so the query can be resolved
	return s.resolveSQLAndArgs()
}
// waitForRuntimeDependencies blocks until all runtime dependencies of this run are resolved.
// Dependencies of the base subscriber (if there is one) are waited for first.
// If any of our own dependencies are not yet resolved, the run status is set to
// dashboardtypes.DashboardRunBlocked and every dependency is resolved in turn;
// the first error encountered is returned.
func (s *RuntimeDependencySubscriber) waitForRuntimeDependencies() error {
	if !s.hasRuntimeDependencies() {
		return nil
	}

	// base dependencies (if any) come first
	if base := s.baseDependencySubscriber; base != nil {
		if err := base.waitForRuntimeDependencies(); err != nil {
			return err
		}
	}

	// count how many of our own dependencies are still outstanding
	pending := 0
	for _, d := range s.runtimeDependencies {
		if !d.IsResolved() {
			pending++
		}
	}
	if pending == 0 {
		return nil
	}

	// at least one dependency is outstanding - mark the run as blocked
	s.setStatus(dashboardtypes.DashboardRunBlocked)

	log.Printf("[TRACE] LeafRun '%s' waitForRuntimeDependencies", s.resource.Name())
	for _, d := range s.runtimeDependencies {
		// TODO [node_reuse] what about dependencies _between_ dependencies - do this async

		// this call blocks until the dependency value is available
		if err := d.Resolve(); err != nil {
			return err
		}
	}
	log.Printf("[TRACE] %s: all runtime dependencies ready", s.resource.Name())

	return nil
}
// findRuntimeDependenciesForParentProperty returns every resolved runtime dependency whose
// ParentPropertyName equals parentProperty. Our own dependencies are searched first, followed by
// those of the base subscriber (if there is one). The result is nil when nothing matches.
func (s *RuntimeDependencySubscriber) findRuntimeDependenciesForParentProperty(parentProperty string) []*dashboardtypes.ResolvedRuntimeDependency {
	var matches []*dashboardtypes.ResolvedRuntimeDependency

	// collect appends all entries of the given map which target the requested parent property
	collect := func(candidates map[string]*dashboardtypes.ResolvedRuntimeDependency) {
		for _, candidate := range candidates {
			if candidate.Dependency.ParentPropertyName != parentProperty {
				continue
			}
			matches = append(matches, candidate)
		}
	}

	collect(s.runtimeDependencies)
	// the base subscriber may also hold dependencies for this property
	if base := s.baseDependencySubscriber; base != nil {
		collect(base.runtimeDependencies)
	}

	return matches
}
// findRuntimeDependencyForParentProperty returns the single resolved runtime dependency targeting
// parentProperty, or nil if there is none.
// It panics if more than one dependency targets the property.
func (s *RuntimeDependencySubscriber) findRuntimeDependencyForParentProperty(parentProperty string) *dashboardtypes.ResolvedRuntimeDependency {
	matches := s.findRuntimeDependenciesForParentProperty(parentProperty)

	switch len(matches) {
	case 0:
		// nothing targets this property
		return nil
	case 1:
		return matches[0]
	default:
		panic(fmt.Sprintf("findRuntimeDependencyForParentProperty for %s, parent property %s, returned more that 1 result", s.Name, parentProperty))
	}
}
// resolveSQLAndArgs resolves the sql for this leaf run into the source sql
// (i.e. NOT the prepared statement name) and the resolved args.
// It builds the args supplied by runtime dependencies, populates any param defaults which have
// runtime dependencies and, if the resource requires execution, stores the resolved query in
// s.RawSQL, s.executeSQL and s.Args.
// Resources which are not a modconfig.QueryProvider are ignored.
func (s *RuntimeDependencySubscriber) resolveSQLAndArgs() error {
	log.Printf("[TRACE] LeafRun '%s' resolveSQLAndArgs", s.resource.Name())

	provider, isQueryProvider := s.resource.(modconfig.QueryProvider)
	if !isQueryProvider {
		// nothing to resolve for a resource which cannot provide a query
		return nil
	}

	// turn the arg runtime dependencies into an arg map
	args, argsErr := s.buildRuntimeDependencyArgs()
	if argsErr != nil {
		log.Printf("[TRACE] LeafRun '%s' buildRuntimeDependencyArgs failed: %s", s.resource.Name(), argsErr.Error())
		return argsErr
	}

	// fill in any param defaults which come from runtime dependencies
	s.populateParamDefaults(provider)

	log.Printf("[TRACE] LeafRun '%s' built runtime args: %v", s.resource.Name(), args)

	// TODO [node_reuse] split this into resolve query and resolve args - we may have args but no query
	if !provider.RequiresExecution(provider) {
		// this leaf run has no SQL to execute
		return nil
	}

	resolved, resolveErr := s.executionTree.workspace.ResolveQueryFromQueryProvider(provider, args)
	if resolveErr != nil {
		return resolveErr
	}

	s.RawSQL = resolved.RawSQL
	s.executeSQL = resolved.ExecuteSQL
	s.Args = resolved.Args
	return nil
}
// populateParamDefaults sets the default of each param of the provider for which there is a
// runtime dependency keyed on the param's unqualified name and targeting its "default" property.
func (s *RuntimeDependencySubscriber) populateParamDefaults(provider modconfig.QueryProvider) {
	for _, param := range provider.GetParams() {
		resolved := s.findRuntimeDependencyForParentProperty(param.UnqualifiedName)
		if resolved == nil {
			// no runtime dependency targets this param
			continue
		}

		// only a dependency which targets the default property provides the default
		targetProperty := typehelpers.SafeString(resolved.Dependency.TargetPropertyName)
		if targetProperty != "default" {
			continue
		}

		param.SetDefault(resolved.Value)
	}
}
// buildRuntimeDependencyArgs converts the runtime dependencies which target the args property
// into a modconfig.QueryArgs.
// A dependency with a target property name is set as a named arg, otherwise it must have a target
// property index and is set as a positional arg. An error is returned if a dependency has neither,
// or if setting an arg value fails.
func (s *RuntimeDependencySubscriber) buildRuntimeDependencyArgs() (*modconfig.QueryArgs, error) {
	args := modconfig.NewQueryArgs()

	log.Printf("[TRACE] LeafRun '%s' buildRuntimeDependencyArgs - %d runtime dependencies", s.resource.Name(), len(s.runtimeDependencies))

	// gather every runtime dependency which provides an arg
	argDeps := s.findRuntimeDependenciesForParentProperty(modconfig.AttributeArgs)

	// positional args need the arg list to be large enough for the highest index in use
	highestIndex := -1
	for _, d := range argDeps {
		idx := d.Dependency.TargetPropertyIndex
		if idx == nil || *idx <= highestIndex {
			continue
		}
		highestIndex = *idx
	}
	if highestIndex >= 0 {
		args.ArgList = make([]*string, highestIndex+1)
	}

	// assign each value as either a named or a positional arg
	for _, d := range argDeps {
		switch {
		case d.Dependency.TargetPropertyName != nil:
			if err := args.SetNamedArgVal(d.Value, *d.Dependency.TargetPropertyName); err != nil {
				return nil, err
			}
		case d.Dependency.TargetPropertyIndex != nil:
			if err := args.SetPositionalArgVal(d.Value, *d.Dependency.TargetPropertyIndex); err != nil {
				return nil, err
			}
		default:
			return nil, fmt.Errorf("invalid runtime dependency - both ArgName and ArgIndex are nil ")
		}
	}

	return args, nil
}
// hasParam reports whether any of this run's params has the given short name.
func (s *RuntimeDependencySubscriber) hasParam(paramName string) bool {
	for i := range s.Params {
		if s.Params[i].ShortName == paramName {
			return true
		}
	}
	// no param matched
	return false
}
// setRuntimeDependencies populates RuntimeDependencyNames with the scoped names of the runtime
// dependencies this run depends on, followed by those of the base subscriber (if there is one).
// NOTE: names are appended to the existing list without clearing it, so every call adds the
// names again.
func (s *RuntimeDependencySubscriber) setRuntimeDependencies() {
	for _, dep := range s.runtimeDependencies {
		// use the ScopedName, i.e. <parent FullName>.<with UnqualifiedName>,
		// as a with from a base resource may have a clashing with name
		// NOTE: this must be consistent with the naming in RuntimeDependencyPublisherImpl.createWithRuns
		scopedName := dep.ScopedName()
		s.RuntimeDependencyNames = append(s.RuntimeDependencyNames, scopedName)
	}

	base := s.baseDependencySubscriber
	if base == nil {
		return
	}

	// have the base subscriber populate its own list, then include its names in ours
	base.setRuntimeDependencies()
	s.RuntimeDependencyNames = append(s.RuntimeDependencyNames, base.RuntimeDependencyNames...)
}
// hasRuntimeDependencies reports whether this run, or its base subscriber, has at least one
// runtime dependency.
func (s *RuntimeDependencySubscriber) hasRuntimeDependencies() bool {
	if len(s.runtimeDependencies) > 0 {
		return true
	}
	return len(s.baseRuntimeDependencies()) > 0
}
// baseRuntimeDependencies returns the runtime dependency map of the base subscriber,
// or a new empty map if there is no base subscriber.
func (s *RuntimeDependencySubscriber) baseRuntimeDependencies() map[string]*dashboardtypes.ResolvedRuntimeDependency {
	if base := s.baseDependencySubscriber; base != nil {
		return base.runtimeDependencies
	}
	return make(map[string]*dashboardtypes.ResolvedRuntimeDependency)
}
// executeChildrenAsync overrides DashboardParentImpl.executeChildrenAsync so that the 'withs' of
// our baseDependencySubscriber are executed as well as our own children.
func (s *RuntimeDependencySubscriber) executeChildrenAsync(ctx context.Context) {
	// the base subscriber itself is not executed - only its with runs are
	if base := s.baseDependencySubscriber; base != nil {
		base.executeWithsAsync(ctx)
	}

	// execute any children of this leaf run (including with runs) asynchronously
	s.DashboardParentImpl.executeChildrenAsync(ctx)
}