Add support for param runtime dependencies. Closes #2910

This commit is contained in:
kaidaguerre
2022-12-22 13:44:40 +00:00
committed by GitHub
parent f0a14a23bf
commit dfcff84f2d
52 changed files with 615 additions and 713 deletions

View File

@@ -1,337 +0,0 @@
# CLI DOCS
## Command Initialisation
### Query
#### Batch
#### Interactive
1) Create InitData
```
type InitData struct {
Loaded chan struct{} // a channel which is closed when the initialisation is complete
Queries []string // a list of queries specifed in the command args
Workspace *workspace.Workspace // the workspace
Client db_common.Client // the database client
Result *db_common.InitResult // the initialisation result
cancel context.CancelFunc // a cancel function used to cancel during initialisation
}
initData := query.NewInitData(ctx, w, args)
```
During initialisation we
### Check
### Dashboard
## Refresh connections
...
Finally, call `LoadForeignSchemaNames` which updates the client `foreignSchemas` property with a list of foreign schema
###Setting Search Path
`LocalDbClient.RefreshConnectionAndSearchPaths` simplified, does this:
```
refreshConnections()
setUserSearchPath()
SetSessionSearchPath()
```
####setUserSearchPath
This function sets the search path for all steampipe users of the db service.
We do this so that the search path is set even when connecting to the DB from a non Steampipe client.
(When using Steampipe to connect to the DB, it is the Session search path which is respected.)
It does this by finding all users assigned to the role `steampipe_users` and setting their search path.
To determine the search path to set, it checks whether the `search-path` config is set.
- If set, it uses the configured value (with "internal" at the end)
- If not, it calls `getDefaultSearchPath` which builds a search path from the connection schemas, bookended with `public` and `internal`.
#### SetRequiredSessionSearchPath
This function populates the `requiredSessionSearchPath` property on the client.
This will be used during session initialisation to actually set the search path
In order to construct the required search path, `ContructSearchPath` is called
#### ContructSearchPath
- If a custom search path has been provided, prefix this with the search path prefix (if any) and suffix with `internal`
- Otherwise use the default search path, prefixed with the search path prefix (if any)
If either a `search-path` or `search-path-prefix` is set in config, this sets the search path
(otherwise fall back to the user search path set in setUserSearchPath`)
### Plugin Manager
### Connection watching
The plugin manager starts a file watch which detects changes in the connection config
Whenever a connection config change is detected:
- The first change event is ignored - we always receive stray event on startup
- The connection config is loaded
- The updated connection config is sent to the plugin manager
- *RefreshConnections* is called - this will ensure the database schema and search paths are updated to match the connection config
NOTE: if a connection is added while a query session is running:
- The new schema will be available for use
- The search path will NOT be updated, as this is set per database session.
For the new search path to apply, a NEW session must be started
## Session data
### Introspection tables
### Prepared statements
## Control Hooks
When executing controls, a struct implementing `ControlHooks` interface is injected into the context.
```
type ControlHooks interface {
OnStart(context.Context, *ControlProgress)
OnControlStart(context.Context, ControlRunStatusProvider, *ControlProgress)
OnControlComplete(context.Context, ControlRunStatusProvider, *ControlProgress)
OnControlError(context.Context, ControlRunStatusProvider, *ControlProgress)
OnComplete(context.Context, *ControlProgress)
}
```
### Check Implementation
When executing `steampipe check`, an instance of `StatusControlHooks` is used for the `ControlHooks`.
This implementation displays the status of the current control run.
### Dashboard Implementation
When executing `steampipe dashboard`, for each `CheckRun` an instance of `DashboardEventControlHooks` is used for the `ControlHooks`.
This implementation raises dashboard events when the check run completes or experiences an error
NOTE: this is injected into the context in CheckRun.Execute, i.e. each dashboard in a check run will have iuts own implementation
### Snapshot Implementation
## Service management
## Option naming standards
## Plugin query result caching
IndexBuckets are stored keyed by table name and connection
IndexBuckets contain an array of IndexItems:
```
// IndexBucket contains index items for all cache results for a given table and connection
type IndexBucket struct {
Items []*IndexItem
}
```
Each index item has the key of a cache result, and the columns, quals and insertion time of that item.
```
// IndexItem stores the columns and cached index for a single cached query result
// note - this index item it tied to a specific table and set of quals
type IndexItem struct {
Columns []string
Key string
Limit int64
Quals map[string]*proto.Quals
InsertionTime time.Time
}
```
### Cache Get
- Build index bucket key from connection name and table
- Get the index bucket from cache
- If the index bucket exists, determine whether it contains an index item which satisfies the quals, columns, limit and ttl or the request.
(NOTE: only key column quals are used when checking cached data)
- If a matching index item is found, use the `Key` property to retrieve the result
#### Identifying cache hits
- Columns
- Limit
- Qual subset
- Qual exact match
## Plugin Instantiation
```
hub.getConnectionPlugin(connection name)
get plugin name from connection config for connection
<other stuff>
hub.createConnectionPlugin(plugin name, connection name)
CreateConnectionPlugins(connections []*modconfig.Connection)
pluginManager.Get(&proto.GetRequest{})
also during refresh connections
populateConnectionPlugins
CreateConnectionPlugins(connectionsToCreate
```
## Config Initialisation and Precedence
Connection config consists of:
- Steampipe connections (including options overrides)
- Steampipe default options (from the config directory)
- Workspace specific options (from the mod location)
#### Load Workspace Profile
#### Set Install dir
If not set default to wd
### Load connection config
Uses mod location to load config files in mod directory
## Why does FDW Need connection config?
1. FDW needs to know if a connection is an aggregator and if so, it needs to resolve the child connection names
It does this to determine whether to push down limit and build the limit map
2. FDW needs the connection options to get cache parameters
Possible solution is for plugin manager to have an endpoint which returns the necessary connection information
## Interface Usage
missing from QueryProviderBase
Name
GetTitle
GetUnqualifiedName
**QueryProvider**
```
HclResource
GetArgs() *QueryArgs
GetParams() []*ParamDef
GetSQL() *string
GetQuery() *Query
SetArgs(*QueryArgs)
SetParams([]*ParamDef)
GetMod() *Mod
GetDescription() string
GetPreparedStatementExecuteSQL(*QueryArgs) (*ResolvedQuery, error)
// implemented by QueryProviderBase
AddRuntimeDependencies([]*RuntimeDependency)
GetRuntimeDependencies() map[string]*RuntimeDependency
RequiresExecution(QueryProvider) bool
VerifyQuery(QueryProvider) error
MergeParentArgs(QueryProvider, QueryProvider) (diags hcl.Diagnostics)
```
- Control
- DashboardCard
- DashboardChart
- DashboardEdge
- DashboardFlow
- DashboardGraph
- DashboardHiearchy
- DashboardImage
- DashboardInput
- DashboardNode
- DashboardTable
- Query
**HclResource**
```
Name() string
GetTitle() string
GetUnqualifiedName() string
CtyValue() (cty.Value, error)
OnDecoded(*hcl.Block, ResourceMapsProvider) hcl.Diagnostics
GetDeclRange() *hcl.Range
BlockType() string
GetDescription() string
GetTags() map[string]string
```
- DirectChildrenModDecorator
- Benchmark x
- Control x
- Dashboard x
- DashboardCard x
- DashboardCategory x
- DashboardChart x
- DashboardEdge x
- DashboardFlow x
- DashboardGraph x
- DashboardHiearchy x
- DashboardImage x
- DashboardInput x
- DashboardNode x
- DashboardTable x
- DashboardText
- Local x
- Mod x
- Query x
- variable x
- DashboardContainer
**ModTreeItem**
```
AddParent(ModTreeItem) error
GetParents() []ModTreeItem
GetChildren() []ModTreeItem
// TODO move to Hcl Resource
GetDocumentation() string
// GetPaths returns an array resource paths
GetPaths() []NodePath
SetPaths()
GetMod() *Mod
```
- Benchmark x
- Control x
- Dashboard x
- DashboardCard
- DashboardCategory
- DashboardChart
- DashboardEdge
- DashboardFlow
- DashboardGraph
- DashboardHiearchy
- DashboardImage
- DashboardInput
- DashboardNode
- DashboardTable
- DashboardText
- Local x
- Mod x
- Query x
- variable x
Meed to parse sepatarely into HclResourceBase and ModTreeItemBase
```
# SDK DOCS
## Connection source file watching
### Limit handling
#### FDW Limit Parsing
#### SDK Limit behaviour

View File

@@ -1,41 +0,0 @@
## Modfile Parsing
Modfile parsing and decoding is executed before the remainder of the mod resources are parsed.
This is necessary as we need to identify mod dependencies.
**This means we DO NOT support hcl references within the modfile defintion**
The exception to this is when passing args to dependent mods. These are parsed separately at the end os the workspace parse process
##Database setup and Initialisation
DB Installation is ensured by calling `EnsureDBInstalled`
### Overview
If the database `IsInstalled`, it calls `prepareDb`
If not installed, the db is installed, migrating public schem data if there is a major version update
### Details
#### IsInstalled
This function determines if the database is installed by:
- looking for the initDb, postgres and fdw binaries
- looking for the fdw control and sql file
If any of these are missing the database is deemed not installed and a full installation occurs
#### prepareDb
This function:
- checks if the installed db version has the correct ImageRef. In other words, has the database package changed, without the Postgres version changing. If so, it installs the new database package (and FDW), retaining the existing data
- checks if the correct FDW version is installed - if not it installs it
- checks if the database is initialised (by testing whether pg_hba.conf exists) and if not, initialise it *TODO* identify when this can occur
#### Database Installation
If `IsInstalled` returns false a full db installation is carried out.
- first verify if a service is running. If so, display an error and return
- download and install the db files
- if this is a Major version update, use pg_dump to backup the public schema
- install the FDW
NOTE: if a backup was taken it is restored by `restoreBackup` which is called from `RefreshConnectionAndSearchPaths` -
MOVE THIS: https://github.com/turbot/steampipe/issues/2037

View File

@@ -1,15 +0,0 @@
# Runtime dependencies
Runtime dependencies are identified when an `arg` or `param` definition references either a `with`, `input` or `param` block
They are populated on the resource as part of the argument decoding (this is handled by `QueryProviderBase`)
When constructing `LeafRun` objects for resources, the `LeafRun` runtime dependencies are populated from the resource
in `resolveRuntimeDependencies`
CHANGES
- only top level nodes can have param or with
- for query providers, base does not inherit with, params or args. Instead store a reference to the base,
- only execute with runs trhat are needed by runtime dep
- in leaf run, if resource has a base and its with are required, resolve runtime depos to populate args/params on base object

View File

@@ -52,7 +52,7 @@ func NewCheckRun(resource modconfig.DashboardLeafNode, parent dashboardtypes.Das
return c, nil
}
// Initialise implements DashboardRunNode
// Initialise implements DashboardTreeRun
func (r *CheckRun) Initialise(ctx context.Context) {
// build control execution tree during init, rather than in Execute, so that it is populated when the ExecutionStarted event is sent
executionTree, err := controlexecute.NewExecutionTree(ctx, r.executionTree.workspace, r.executionTree.client, r.resource.Name(), "")
@@ -65,7 +65,7 @@ func (r *CheckRun) Initialise(ctx context.Context) {
r.Root = executionTree.Root.Children[0]
}
// Execute implements DashboardRunNode
// Execute implements DashboardTreeRun
func (r *CheckRun) Execute(ctx context.Context) {
utils.LogTime("CheckRun.execute start")
defer utils.LogTime("CheckRun.execute end")

View File

@@ -44,7 +44,7 @@ func NewDashboardContainerRun(container *modconfig.DashboardContainer, parent da
if container.Width != nil {
r.Width = *container.Width
}
r.childComplete = make(chan dashboardtypes.DashboardTreeRun, len(children))
r.childCompleteChan = make(chan dashboardtypes.DashboardTreeRun, len(children))
for _, child := range children {
var childRun dashboardtypes.DashboardTreeRun
var err error
@@ -97,7 +97,7 @@ func NewDashboardContainerRun(container *modconfig.DashboardContainer, parent da
// IsSnapshotPanel implements SnapshotPanel
func (*DashboardContainerRun) IsSnapshotPanel() {}
// Initialise implements DashboardRunNode
// Initialise implements DashboardTreeRun
func (r *DashboardContainerRun) Initialise(ctx context.Context) {
// initialise our children
if err := r.initialiseChildren(ctx); err != nil {
@@ -105,7 +105,7 @@ func (r *DashboardContainerRun) Initialise(ctx context.Context) {
}
}
// Execute implements DashboardRunNode
// Execute implements DashboardTreeRun
// execute all children and wait for them to complete
func (r *DashboardContainerRun) Execute(ctx context.Context) {
// execute all children asynchronously
@@ -116,7 +116,7 @@ func (r *DashboardContainerRun) Execute(ctx context.Context) {
// wait for children to complete
var errors []error
for !r.ChildrenComplete() {
completeChild := <-r.childComplete
completeChild := <-r.childCompleteChan
if completeChild.GetRunStatus() == dashboardtypes.DashboardRunError {
errors = append(errors, completeChild.GetError())
}
@@ -140,12 +140,12 @@ func (r *DashboardContainerRun) SetError(_ context.Context, err error) {
// error type does not serialise to JSON so copy into a string
r.ErrorString = err.Error()
r.Status = dashboardtypes.DashboardRunError
r.parent.ChildCompleteChan() <- r
r.notifyParentOfCompletion()
}
// SetComplete implements DashboardTreeRun
func (r *DashboardContainerRun) SetComplete(context.Context) {
r.Status = dashboardtypes.DashboardRunComplete
// tell parent we are done
r.parent.ChildCompleteChan() <- r
r.notifyParentOfCompletion()
}

View File

@@ -242,12 +242,20 @@ func (e *DashboardExecutionTree) Cancel() {
}
func (e *DashboardExecutionTree) BuildSnapshotPanels() map[string]dashboardtypes.SnapshotPanel {
// just build from e.runs
res := map[string]dashboardtypes.SnapshotPanel{}
// if this node is a snapshot node, add to map
if snapshotNode, ok := e.Root.(dashboardtypes.SnapshotPanel); ok {
res[e.Root.GetName()] = snapshotNode
for name, run := range e.runs {
res[name] = run.(dashboardtypes.SnapshotPanel)
// special case handling for check runs
if checkRun, ok := run.(*CheckRun); ok {
checkRunChildren := checkRun.BuildSnapshotPanels(res)
for k, v := range checkRunChildren {
res[k] = v
}
}
}
return e.buildSnapshotPanelsUnder(e.Root.(dashboardtypes.DashboardParent), res)
return res
}
// InputRuntimeDependencies returns the names of all inputs which are runtime dependencies
@@ -265,23 +273,6 @@ func (e *DashboardExecutionTree) InputRuntimeDependencies() []string {
return maps.Keys(deps)
}
func (e *DashboardExecutionTree) buildSnapshotPanelsUnder(parent dashboardtypes.DashboardParent, res map[string]dashboardtypes.SnapshotPanel) map[string]dashboardtypes.SnapshotPanel {
if checkRun, ok := parent.(*CheckRun); ok {
return checkRun.BuildSnapshotPanels(res)
}
for _, c := range parent.GetChildren() {
// if this node is a snapshot node, add to map
if snapshotNode, ok := c.(dashboardtypes.SnapshotPanel); ok {
res[c.GetName()] = snapshotNode
}
if p, ok := c.(dashboardtypes.DashboardParent); ok {
res = e.buildSnapshotPanelsUnder(p, res)
}
}
return res
}
// GetChildren implements DashboardParent
func (e *DashboardExecutionTree) GetChildren() []dashboardtypes.DashboardTreeRun {
return []dashboardtypes.DashboardTreeRun{e.Root}
@@ -293,6 +284,7 @@ func (e *DashboardExecutionTree) ChildrenComplete() bool {
}
// Tactical: Empty implementations of DashboardParent functions
// TODO remove need for this
func (e *DashboardExecutionTree) Initialise(ctx context.Context) {
panic("should never call for DashboardExecutionTree")
@@ -318,6 +310,10 @@ func (e *DashboardExecutionTree) GetInputsDependingOn(s string) []string {
panic("should never call for DashboardExecutionTree")
}
func (e *DashboardExecutionTree) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
func (*DashboardExecutionTree) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
panic("should never call for DashboardExecutionTree")
}
func (*DashboardExecutionTree) GetResource() modconfig.DashboardLeafNode {
panic("should never call for DashboardExecutionTree")
}

View File

@@ -10,14 +10,15 @@ import (
type DashboardParentImpl struct {
DashboardTreeRunImpl
children []dashboardtypes.DashboardTreeRun
childComplete chan dashboardtypes.DashboardTreeRun
children []dashboardtypes.DashboardTreeRun
childCompleteChan chan dashboardtypes.DashboardTreeRun
}
func (r *DashboardParentImpl) initialiseChildren(ctx context.Context) error {
var errors []error
for _, child := range r.children {
child.Initialise(ctx)
if err := child.GetError(); err != nil {
errors = append(errors, err)
}
@@ -44,13 +45,12 @@ func (r *DashboardParentImpl) ChildrenComplete() bool {
}
func (r *DashboardParentImpl) ChildCompleteChan() chan dashboardtypes.DashboardTreeRun {
return r.childComplete
return r.childCompleteChan
}
func (r *DashboardParentImpl) createChildCompleteChan() {
// create buffered child complete chan
if childCount := len(r.children); childCount > 0 {
r.childComplete = make(chan dashboardtypes.DashboardTreeRun, childCount)
r.childCompleteChan = make(chan dashboardtypes.DashboardTreeRun, childCount)
}
}
@@ -70,11 +70,11 @@ func (r *DashboardParentImpl) executeWithsAsync(ctx context.Context) {
}
}
func (r *DashboardParentImpl) waitForChildren() chan error {
log.Printf("[TRACE] %s waitForChildren", r.Name)
func (r *DashboardParentImpl) waitForChildrenAsync() chan error {
log.Printf("[TRACE] %s waitForChildrenAsync", r.Name)
var doneChan = make(chan error)
if len(r.children) == 0 {
log.Printf("[TRACE] %s waitForChildren - no children so we're done", r.Name)
log.Printf("[TRACE] %s waitForChildrenAsync - no children so we're done", r.Name)
// if there are no children, return a closed channel so we do not wait
close(doneChan)
} else {
@@ -83,7 +83,7 @@ func (r *DashboardParentImpl) waitForChildren() chan error {
var errors []error
for !(r.ChildrenComplete()) {
completeChild := <-r.childComplete
completeChild := <-r.childCompleteChan
log.Printf("[TRACE] %s got child complete for %s", r.Name, completeChild.GetName())
if completeChild.GetRunStatus() == dashboardtypes.DashboardRunError {
errors = append(errors, completeChild.GetError())

View File

@@ -42,6 +42,9 @@ func NewDashboardRun(dashboard *modconfig.Dashboard, parent dashboardtypes.Dashb
// (we must create after creating the run as it requires a ref to the run)
// TODO [node_reuse] do this a different way
r.RuntimeDependencyPublisherImpl = NewRuntimeDependencyPublisherImpl(dashboard, parent, r, executionTree)
// add r into execution tree BEFORE creating child runs or initialising runtime depdencies
// - this is so child runs can find this dashboard run
executionTree.runs[r.Name] = r
// set inputs map on RuntimeDependencyPublisherImpl BEFORE creating child runs
r.inputs = dashboard.GetInputs()
@@ -57,16 +60,13 @@ func NewDashboardRun(dashboard *modconfig.Dashboard, parent dashboardtypes.Dashb
return nil, err
}
// add r into execution tree
executionTree.runs[r.Name] = r
// create buffered channel for children to report their completion
r.createChildCompleteChan()
return r, nil
}
// Initialise implements DashboardRunNode
// Initialise implements DashboardTreeRun
func (r *DashboardRun) Initialise(ctx context.Context) {
// initialise our children
if err := r.initialiseChildren(ctx); err != nil {
@@ -74,13 +74,13 @@ func (r *DashboardRun) Initialise(ctx context.Context) {
}
}
// Execute implements DashboardRunNode
// Execute implements DashboardTreeRun
// execute all children and wait for them to complete
func (r *DashboardRun) Execute(ctx context.Context) {
r.executeChildrenAsync(ctx)
// wait for children to complete
err := <-r.waitForChildren()
err := <-r.waitForChildrenAsync()
log.Printf("[TRACE] Execute run %s all children complete, error: %v", r.Name, err)
if err == nil {
@@ -101,14 +101,14 @@ func (r *DashboardRun) SetError(_ context.Context, err error) {
// error type does not serialise to JSON so copy into a string
r.ErrorString = err.Error()
r.Status = dashboardtypes.DashboardRunError
r.parent.ChildCompleteChan() <- r
r.notifyParentOfCompletion()
}
// SetComplete implements DashboardTreeRun
func (r *DashboardRun) SetComplete(context.Context) {
r.Status = dashboardtypes.DashboardRunComplete
// tell parent we are done
r.parent.ChildCompleteChan() <- r
r.notifyParentOfCompletion()
}
// GetInput searches for an input with the given name
@@ -157,12 +157,15 @@ func (r *DashboardRun) createChildRuns(executionTree *DashboardExecutionTree) er
// NOTE: clone the input to avoid mutating the original
// TODO remove the need for this when we refactor input values resolution
// TODO https://github.com/turbot/steampipe/issues/2864
childRun, err = NewLeafRun(i.Clone(), r, executionTree)
// TACTICAL: as this is a runtime dependency, set the run name to the 'scoped name'
// this is to match the name in the panel dependendencies
// TODO [node_reuse] tidy this
inputRunName := fmt.Sprintf("%s.%s", r.DashboardName, i.UnqualifiedName)
childRun, err = NewLeafRun(i.Clone(), r, executionTree, setName(inputRunName))
if err != nil {
return err
}
// TACTICAL: as this is a runtime dependency, set the run name to the 'scoped name'
childRun.(*LeafRun).Name = fmt.Sprintf("%s.%s", r.DashboardName, i.UnqualifiedName)
default:
// ensure this item is a DashboardLeafNode

View File

@@ -30,7 +30,8 @@ type DashboardTreeRunImpl struct {
resource modconfig.DashboardLeafNode
// store the top level run which embeds this struct
// we need this for setStatus which serialises the run for the message payload
run dashboardtypes.DashboardTreeRun
run dashboardtypes.DashboardTreeRun
executeConfig dashboardtypes.TreeRunExecuteConfig
}
func NewDashboardTreeRunImpl(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, run dashboardtypes.DashboardTreeRun, executionTree *DashboardExecutionTree) DashboardTreeRunImpl {
@@ -108,18 +109,26 @@ func (r *DashboardTreeRunImpl) GetNodeType() string {
return r.NodeType
}
// Initialise implements DashboardTreeRun
func (r *DashboardTreeRunImpl) Initialise(context.Context) {
panic("must be implemented by child struct")
}
// Execute implements DashboardTreeRun
func (r *DashboardTreeRunImpl) Execute(ctx context.Context) {
panic("must be implemented by child struct")
}
// AsTreeNode implements DashboardTreeRun
func (r *DashboardTreeRunImpl) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
panic("must be implemented by child struct")
}
// GetResource implements DashboardTreeRun
func (r *DashboardTreeRunImpl) GetResource() modconfig.DashboardLeafNode {
return r.resource
}
// TODO [node_reuse] do this a different way
// maybe move to a different embedded struct - ExecutableRun, to differentiate between Base runs
@@ -134,7 +143,7 @@ func (r *DashboardTreeRunImpl) SetError(ctx context.Context, err error) {
// set status (this sends update event)
r.setStatus(dashboardtypes.DashboardRunError)
// tell parent we are done
r.parent.ChildCompleteChan() <- r
r.notifyParentOfCompletion()
}
// SetComplete implements DashboardTreeRun
@@ -142,15 +151,22 @@ func (r *DashboardTreeRunImpl) SetComplete(context.Context) {
// set status (this sends update event)
r.setStatus(dashboardtypes.DashboardRunComplete)
// tell parent we are done
r.parent.ChildCompleteChan() <- r
r.notifyParentOfCompletion()
}
func (r *DashboardTreeRunImpl) setStatus(status dashboardtypes.DashboardRunStatus) {
r.Status = status
// raise LeafNodeUpdated event
// TODO [node_reuse] tidy this up
// NOTE: pass the full run struct - 'r.run', rather than ourselves - so we serialize all properties
e, _ := dashboardevents.NewLeafNodeUpdate(r.run, r.executionTree.sessionId, r.executionTree.id)
r.executionTree.workspace.PublishDashboardEvent(e)
// do not send events for runtime dependency execution (i.e. when we are executing base resources
// for their runtime dependencies)
if !r.executeConfig.RuntimeDependenciesOnly {
// raise LeafNodeUpdated event
// TODO [node_reuse] tidy this up
// TACTICAL: pass the full run struct - 'r.run', rather than ourselves - so we serialize all properties
e, _ := dashboardevents.NewLeafNodeUpdate(r.run, r.executionTree.sessionId, r.executionTree.id)
r.executionTree.workspace.PublishDashboardEvent(e)
}
}
func (r *DashboardTreeRunImpl) notifyParentOfCompletion() {
r.parent.ChildCompleteChan() <- r
}

View File

@@ -14,7 +14,7 @@ import (
// LeafRun is a struct representing the execution of a leaf dashboard node
type LeafRun struct {
// all RuntimeDependencySubscribers are also publishers as they have args/params
RuntimeDependencySubscriber
RuntimeDependencySubscriberImpl
Resource modconfig.DashboardLeafNode `json:"properties,omitempty"`
Data *dashboardtypes.LeafData `json:"data,omitempty"`
@@ -31,23 +31,22 @@ func (r *LeafRun) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
}
}
func NewLeafRun(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, executionTree *DashboardExecutionTree) (*LeafRun, error) {
func NewLeafRun(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, executionTree *DashboardExecutionTree, opts ...LeafRunOption) (*LeafRun, error) {
r := &LeafRun{
Resource: resource,
}
// create RuntimeDependencySubscriber- this handles 'with' run creation and resolving runtime dependency resolution
// create RuntimeDependencySubscriberImpl- this handles 'with' run creation and resolving runtime dependency resolution
// (NOTE: we have to do this after creating run as we need to pass a ref to the run)
r.RuntimeDependencySubscriber = *NewRuntimeDependencySubscriber(resource, parent, r, executionTree)
r.RuntimeDependencySubscriberImpl = *NewRuntimeDependencySubscriber(resource, parent, r, executionTree)
err := r.initRuntimeDependencies()
if err != nil {
return nil, err
// apply options AFTER calling NewRuntimeDependencySubscriber
for _, opt := range opts {
opt(r)
}
// if our underlying resource has a base which has runtime dependencies,
// create a RuntimeDependencySubscriber for it
if err := r.initBaseRuntimeDependencySubscriber(executionTree); err != nil {
err := r.initRuntimeDependencies(executionTree)
if err != nil {
return nil, err
}
@@ -77,22 +76,6 @@ func NewLeafRun(resource modconfig.DashboardLeafNode, parent dashboardtypes.Dash
return r, nil
}
func (r *LeafRun) initBaseRuntimeDependencySubscriber(executionTree *DashboardExecutionTree) error {
if base := r.resource.(modconfig.HclResource).GetBase(); base != nil {
if _, ok := base.(modconfig.RuntimeDependencyProvider); ok {
r.baseDependencySubscriber = NewRuntimeDependencySubscriber(base.(modconfig.DashboardLeafNode), nil, r, executionTree)
err := r.baseDependencySubscriber.initRuntimeDependencies()
if err != nil {
return err
}
// create buffered channel for base with to report their completion
r.baseDependencySubscriber.createChildCompleteChan()
}
}
return nil
}
func (r *LeafRun) createChildRuns(executionTree *DashboardExecutionTree) error {
children := r.resource.GetChildren()
if len(children) == 0 {
@@ -102,20 +85,20 @@ func (r *LeafRun) createChildRuns(executionTree *DashboardExecutionTree) error {
r.children = make([]dashboardtypes.DashboardTreeRun, len(children))
var errors []error
// if the leaf run has children (nodes/edges) create a run for this too
for i, c := range children {
// TODO [node_reuse] what about with nodes - only relevant when running base withs
childRun, err := NewLeafRun(c.(modconfig.DashboardLeafNode), r, executionTree)
var opts []LeafRunOption
childRun, err := NewLeafRun(c.(modconfig.DashboardLeafNode), r, executionTree, opts...)
if err != nil {
errors = append(errors, err)
continue
}
r.children[i] = childRun
}
return error_helpers.CombineErrors(errors...)
}
// Execute implements DashboardRunNode
// Execute implements DashboardTreeRun
func (r *LeafRun) Execute(ctx context.Context) {
defer func() {
// call our oncomplete is we have one
@@ -138,9 +121,9 @@ func (r *LeafRun) Execute(ctx context.Context) {
r.executeChildrenAsync(ctx)
// start a goroutine to wait for children to complete
doneChan := r.waitForChildren()
doneChan := r.waitForChildrenAsync()
if err := r.evaluateRuntimeDependencies(ctx); err != nil {
if err := r.evaluateRuntimeDependencies(); err != nil {
r.SetError(ctx, err)
return
}
@@ -149,7 +132,8 @@ func (r *LeafRun) Execute(ctx context.Context) {
r.setStatus(dashboardtypes.DashboardRunRunning)
// if we have sql to execute, do it now
if r.executeSQL != "" {
// (if we are only performing a base execution, do not run the query)
if r.executeSQL != "" && !r.executeConfig.BaseExecution {
if err := r.executeQuery(ctx); err != nil {
r.SetError(ctx, err)
return
@@ -174,12 +158,13 @@ func (r *LeafRun) SetError(ctx context.Context, err error) {
r.err = err
// error type does not serialise to JSON so copy into a string
r.ErrorString = err.Error()
// increment error count for snapshot hook
statushooks.SnapshotError(ctx)
// set status (this sends update event)
r.setStatus(dashboardtypes.DashboardRunError)
r.parent.ChildCompleteChan() <- r
r.notifyParentOfCompletion()
}
// SetComplete implements DashboardTreeRun
@@ -191,7 +176,7 @@ func (r *LeafRun) SetComplete(ctx context.Context) {
statushooks.UpdateSnapshotProgress(ctx, 1)
// tell parent we are done
r.parent.ChildCompleteChan() <- r
r.notifyParentOfCompletion()
}
// IsSnapshotPanel implements SnapshotPanel

View File

@@ -0,0 +1,9 @@
package dashboardexecute
type LeafRunOption = func(target *LeafRun)
func setName(name string) LeafRunOption {
return func(target *LeafRun) {
target.Name = name
}
}

View File

@@ -14,9 +14,8 @@ import (
type RuntimeDependencyPublisherImpl struct {
DashboardParentImpl
Args []any `json:"args,omitempty"`
Params []*modconfig.ParamDef `json:"params,omitempty"`
Args []any `json:"args,omitempty"`
Params []*modconfig.ParamDef `json:"params,omitempty"`
subscriptions map[string][]*RuntimeDependencyPublishTarget
withValueMutex sync.Mutex
withRuns map[string]*LeafRun
@@ -270,48 +269,14 @@ func (p *RuntimeDependencyPublisherImpl) withsComplete() bool {
return true
}
func (p *RuntimeDependencyPublisherImpl) findRuntimeDependencyPublisher(runtimeDependency *modconfig.RuntimeDependency) RuntimeDependencyPublisher {
// the runtime dependency publisher is usually the root node of the execution tree
// the exception to this is if the node is a LeafRun(?) for a base node which has a with block,
// in which case it may provide its own runtime depdency
// try ourselves
if p.ProvidesRuntimeDependency(runtimeDependency) {
return p
}
// try root node
// NOTE: we cannot just use b.executionTree.Root as this function is called at init time before Root is assigned
rootPublisher := p.getRoot().(RuntimeDependencyPublisher)
if rootPublisher.ProvidesRuntimeDependency(runtimeDependency) {
return rootPublisher
}
return nil
}
// get the root of the tree by searching up the parents
func (p *RuntimeDependencyPublisherImpl) getRoot() dashboardtypes.DashboardTreeRun {
var root dashboardtypes.DashboardTreeRun = p
for {
parent := root.GetParent()
if parent == p.executionTree {
break
}
root = parent.(dashboardtypes.DashboardTreeRun)
}
return root
}
func (p *RuntimeDependencyPublisherImpl) createWithRuns(withs []*modconfig.DashboardWith, executionTree *DashboardExecutionTree) error {
for _, w := range withs {
withRun, err := NewLeafRun(w, p, executionTree)
// NOTE: set the name of the run to be the scoped name
withRunName := fmt.Sprintf("%s.%s", p.GetName(), w.UnqualifiedName)
withRun, err := NewLeafRun(w, p, executionTree, setName(withRunName))
if err != nil {
return err
}
// NOTE: set the name of the run toe be the scoped name
withRun.Name = fmt.Sprintf("%s.%s", withRun.GetParent().GetName(), w.UnqualifiedName)
// set an onComplete function to populate 'with' data
withRun.onComplete = func() { p.setWithValue(withRun) }
@@ -320,3 +285,17 @@ func (p *RuntimeDependencyPublisherImpl) createWithRuns(withs []*modconfig.Dashb
}
return nil
}
func (p *RuntimeDependencyPublisherImpl) argsResolved(args []any) {
// use params to get param names for each arg and then look of subscriber
for i, param := range p.Params {
if i == len(args) {
return
}
// do we have a subscription for this param
if _, ok := p.subscriptions[param.UnqualifiedName]; ok {
p.PublishRuntimeDependencyValue(param.UnqualifiedName, &dashboardtypes.ResolvedRuntimeDependencyValue{Value: args[i]})
}
}
log.Printf("[TRACE] %s: argsResolved", p.Name)
}

View File

@@ -0,0 +1,6 @@
package dashboardexecute
type RuntimeDependencySubscriber interface {
RuntimeDependencyPublisher
GetBaseDependencySubscriber() RuntimeDependencySubscriber
}

View File

@@ -3,34 +3,35 @@ package dashboardexecute
import (
"context"
"fmt"
"github.com/turbot/go-kit/helpers"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"golang.org/x/exp/maps"
"log"
"sync"
)
type RuntimeDependencySubscriber struct {
type RuntimeDependencySubscriberImpl struct {
// all RuntimeDependencySubscribers are also publishers as they have args/params
RuntimeDependencyPublisherImpl
// map of runtime dependencies, keyed by dependency long name
runtimeDependencies map[string]*dashboardtypes.ResolvedRuntimeDependency
// a list of the (scoped) names of any runtime dependencies that we rely on
RuntimeDependencyNames []string `json:"dependencies,omitempty"`
RawSQL string `json:"sql,omitempty"`
executeSQL string
baseDependencySubscriber *RuntimeDependencySubscriber
RuntimeDependencyNames []string `json:"dependencies,omitempty"`
RawSQL string `json:"sql,omitempty"`
executeSQL string
// if the underlying resource has a base resource, create a RuntimeDependencySubscriberImpl instance to handle
// generation and publication of runtime depdencies from the base resource
baseDependencySubscriber *RuntimeDependencySubscriberImpl
}
func NewRuntimeDependencySubscriber(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, run dashboardtypes.DashboardTreeRun, executionTree *DashboardExecutionTree) *RuntimeDependencySubscriber {
b := &RuntimeDependencySubscriber{
func NewRuntimeDependencySubscriber(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, run dashboardtypes.DashboardTreeRun, executionTree *DashboardExecutionTree) *RuntimeDependencySubscriberImpl {
b := &RuntimeDependencySubscriberImpl{
runtimeDependencies: make(map[string]*dashboardtypes.ResolvedRuntimeDependency),
}
// TODO [node_reuse]
// HACK
// if this is a run for a base resource there will be no 'run'
if run == nil {
run = b
}
// create RuntimeDependencyPublisherImpl
// (we must create after creating the run as iut requires a ref to the run)
@@ -39,12 +40,24 @@ func NewRuntimeDependencySubscriber(resource modconfig.DashboardLeafNode, parent
return b
}
// GetBaseDependencySubscriber implements RuntimeDependencySubscriber
func (s *RuntimeDependencySubscriberImpl) GetBaseDependencySubscriber() RuntimeDependencySubscriber {
return s.baseDependencySubscriber
}
// if the resource is a runtime dependency provider, create with runs and resolve dependencies
func (s *RuntimeDependencySubscriber) initRuntimeDependencies() error {
func (s *RuntimeDependencySubscriberImpl) initRuntimeDependencies(executionTree *DashboardExecutionTree) error {
if _, ok := s.resource.(modconfig.RuntimeDependencyProvider); !ok {
return nil
}
// first call into publisher to start any with runs
// if our underlying resource has a base which has runtime dependencies,
// create a RuntimeDependencySubscriberImpl for it
if err := s.initBaseRuntimeDependencySubscriber(executionTree); err != nil {
return err
}
// call into publisher to start any with runs
if err := s.RuntimeDependencyPublisherImpl.initRuntimeDependencies(); err != nil {
return err
}
@@ -52,9 +65,27 @@ func (s *RuntimeDependencySubscriber) initRuntimeDependencies() error {
return s.resolveRuntimeDependencies()
}
func (s *RuntimeDependencySubscriberImpl) initBaseRuntimeDependencySubscriber(executionTree *DashboardExecutionTree) error {
if base := s.resource.(modconfig.HclResource).GetBase(); base != nil {
if _, ok := base.(modconfig.RuntimeDependencyProvider); ok {
// create base dependency subscriber
// pass ourselves as 'run'
// - this is only used when sending update events, which will not happen for the baseDependencySubscriber
s.baseDependencySubscriber = NewRuntimeDependencySubscriber(base.(modconfig.DashboardLeafNode), nil, s, executionTree)
err := s.baseDependencySubscriber.initRuntimeDependencies(executionTree)
if err != nil {
return err
}
// create buffered channel for base with to report their completion
s.baseDependencySubscriber.createChildCompleteChan()
}
}
return nil
}
// if this node has runtime dependencies, find the publisher of the dependency and create a dashboardtypes.ResolvedRuntimeDependency
// which we use to resolve the values
func (s *RuntimeDependencySubscriber) resolveRuntimeDependencies() error {
func (s *RuntimeDependencySubscriberImpl) resolveRuntimeDependencies() error {
rdp, ok := s.resource.(modconfig.RuntimeDependencyProvider)
if !ok {
return nil
@@ -100,10 +131,49 @@ func (s *RuntimeDependencySubscriber) resolveRuntimeDependencies() error {
publisherName := publisher.GetName()
s.runtimeDependencies[name] = dashboardtypes.NewResolvedRuntimeDependency(dep, valueChannel, publisherName)
}
return nil
}
func (s *RuntimeDependencySubscriber) evaluateRuntimeDependencies(ctx context.Context) error {
func (s *RuntimeDependencySubscriberImpl) findRuntimeDependencyPublisher(runtimeDependency *modconfig.RuntimeDependency) RuntimeDependencyPublisher {
// the runtime dependency publisher is either the root dashboard run,
// or if this resource (or in case of a node/edge, the resource parent) has a base,
// the baseDependencySubscriber for that base
var subscriber RuntimeDependencySubscriber = s
if s.NodeType == modconfig.BlockTypeNode || s.NodeType == modconfig.BlockTypeEdge {
subscriber = s.parent.(RuntimeDependencySubscriber)
}
baseSubscriber := subscriber.GetBaseDependencySubscriber()
// not check the provider property on the runtime dependency
// - if the matches the underlying resource for the baseDependencySubscriber,
// then baseDependencySubscriber _should_ be the dependency publisher
if !helpers.IsNil(baseSubscriber) && runtimeDependency.Provider == baseSubscriber.GetResource() {
if baseSubscriber.ProvidesRuntimeDependency(runtimeDependency) {
return baseSubscriber
}
// unexpected
log.Printf("[WARN] dependency %s has a dependency provider matching the base resource %s but the BaseDependencySubscriber does not provider the runtime dependency",
runtimeDependency.String(), baseSubscriber.GetName())
return nil
}
// see if we can satisfy the dependency (this would occur when initialising the baseDependencySubscriber
if s.ProvidesRuntimeDependency(runtimeDependency) {
return s
}
// otherwise the dashboard run must be the publisher
dashboardRun := s.executionTree.runs[s.DashboardName].(RuntimeDependencyPublisher)
if dashboardRun.ProvidesRuntimeDependency(runtimeDependency) {
return dashboardRun
}
return nil
}
func (s *RuntimeDependencySubscriberImpl) evaluateRuntimeDependencies() error {
log.Printf("[TRACE] %s: evaluateRuntimeDependencies", s.Name)
// now wait for any runtime dependencies then resolve args and params
// (it is possible to have params but no sql)
if s.hasRuntimeDependencies() {
@@ -111,55 +181,96 @@ func (s *RuntimeDependencySubscriber) evaluateRuntimeDependencies(ctx context.Co
if err := s.waitForRuntimeDependencies(); err != nil {
return err
}
log.Printf("[TRACE] %s: runtime dependencies availablem resolving sql and args", s.Name)
// ok now we have runtime dependencies, we can resolve the query
if err := s.resolveSQLAndArgs(); err != nil {
return err
}
s.argsResolved(s.Args)
}
return nil
}
func (s *RuntimeDependencySubscriber) waitForRuntimeDependencies() error {
func (s *RuntimeDependencySubscriberImpl) waitForRuntimeDependencies() error {
log.Printf("[TRACE] %s: waitForRuntimeDependencies", s.Name)
if !s.hasRuntimeDependencies() {
log.Printf("[TRACE] %s: no runtime dependencies", s.Name)
return nil
}
// wait for base dependencies if we have any
if s.baseDependencySubscriber != nil {
log.Printf("[TRACE] %s: calling baseDependencySubscriber.waitForRuntimeDependencies", s.Name)
if err := s.baseDependencySubscriber.waitForRuntimeDependencies(); err != nil {
return err
}
}
log.Printf("[TRACE] %s: checking whether all depdencies are resolved", s.Name)
allRuntimeDepsResolved := true
for _, dep := range s.runtimeDependencies {
if !dep.IsResolved() {
allRuntimeDepsResolved = false
log.Printf("[TRACE] %s: dependency %s is NOT resolved", s.Name, dep.Dependency.String())
}
}
if allRuntimeDepsResolved {
return nil
}
log.Printf("[TRACE] %s: BLOCKED", s.Name)
// set status to blocked
s.setStatus(dashboardtypes.DashboardRunBlocked)
log.Printf("[TRACE] LeafRun '%s' waitForRuntimeDependencies", s.resource.Name())
for _, resolvedDependency := range s.runtimeDependencies {
// TODO [node_reuse] what about dependencies _between_ dependencies - do this async
// block until the dependency is available
err := resolvedDependency.Resolve()
if err != nil {
return err
var wg sync.WaitGroup
var errChan = make(chan error)
var doneChan = make(chan struct{})
for _, r := range s.runtimeDependencies {
if !r.IsResolved() {
// make copy of loop var for goroutine
resolvedDependency := r
log.Printf("[TRACE] %s: wait for %s", s.Name, resolvedDependency.Dependency.String())
wg.Add(1)
go func() {
defer wg.Done()
// block until the dependency is available
err := resolvedDependency.Resolve()
log.Printf("[TRACE] %s: Resolve returned for %s", s.Name, resolvedDependency.Dependency.String())
if err != nil {
log.Printf("[TRACE] %s: Resolve for %s returned error:L %s", s.Name, resolvedDependency.Dependency.String(), err.Error())
errChan <- err
}
}()
}
}
go func() {
log.Printf("[TRACE] %s: goroutine waiting for all runtime deps to be available", s.Name)
wg.Wait()
close(doneChan)
}()
var errors []error
wait_loop:
for {
select {
case err := <-errChan:
errors = append(errors, err)
case <-doneChan:
break wait_loop
}
}
log.Printf("[TRACE] %s: all runtime dependencies ready", s.resource.Name())
return nil
return error_helpers.CombineErrors(errors...)
}
func (s *RuntimeDependencySubscriber) findRuntimeDependenciesForParentProperty(parentProperty string) []*dashboardtypes.ResolvedRuntimeDependency {
func (s *RuntimeDependencySubscriberImpl) findRuntimeDependenciesForParentProperty(parentProperty string) []*dashboardtypes.ResolvedRuntimeDependency {
var res []*dashboardtypes.ResolvedRuntimeDependency
for _, dep := range s.runtimeDependencies {
if dep.Dependency.ParentPropertyName == parentProperty {
@@ -177,7 +288,7 @@ func (s *RuntimeDependencySubscriber) findRuntimeDependenciesForParentProperty(p
return res
}
func (s *RuntimeDependencySubscriber) findRuntimeDependencyForParentProperty(parentProperty string) *dashboardtypes.ResolvedRuntimeDependency {
func (s *RuntimeDependencySubscriberImpl) findRuntimeDependencyForParentProperty(parentProperty string) *dashboardtypes.ResolvedRuntimeDependency {
res := s.findRuntimeDependenciesForParentProperty(parentProperty)
if len(res) > 1 {
panic(fmt.Sprintf("findRuntimeDependencyForParentProperty for %s, parent property %s, returned more that 1 result", s.Name, parentProperty))
@@ -190,8 +301,8 @@ func (s *RuntimeDependencySubscriber) findRuntimeDependencyForParentProperty(par
}
// resolve the sql for this leaf run into the source sql (i.e. NOT the prepared statement name) and resolved args
func (s *RuntimeDependencySubscriber) resolveSQLAndArgs() error {
log.Printf("[TRACE] LeafRun '%s' resolveSQLAndArgs", s.resource.Name())
func (s *RuntimeDependencySubscriberImpl) resolveSQLAndArgs() error {
log.Printf("[TRACE] %s: resolveSQLAndArgs", s.resource.Name())
queryProvider, ok := s.resource.(modconfig.QueryProvider)
if !ok {
// not a query provider - nothing to do
@@ -201,18 +312,18 @@ func (s *RuntimeDependencySubscriber) resolveSQLAndArgs() error {
// convert arg runtime dependencies into arg map
runtimeArgs, err := s.buildRuntimeDependencyArgs()
if err != nil {
log.Printf("[TRACE] LeafRun '%s' buildRuntimeDependencyArgs failed: %s", s.resource.Name(), err.Error())
log.Printf("[TRACE] %s: buildRuntimeDependencyArgs failed: %s", s.resource.Name(), err.Error())
return err
}
// now if any param defaults had runtime dependencies, populate them
s.populateParamDefaults(queryProvider)
log.Printf("[TRACE] LeafRun '%s' built runtime args: %v", s.resource.Name(), runtimeArgs)
log.Printf("[TRACE] %s: built runtime args: %v", s.resource.Name(), runtimeArgs)
// does this leaf run have any SQL to execute?
// TODO [node_reuse] split this into resolve query and resolve args - we may have args but no query
if queryProvider.RequiresExecution(queryProvider) {
log.Printf("[TRACE] ResolveArgsFromQueryProvider for %s", queryProvider.Name())
resolvedQuery, err := s.executionTree.workspace.ResolveQueryFromQueryProvider(queryProvider, runtimeArgs)
if err != nil {
return err
@@ -220,12 +331,25 @@ func (s *RuntimeDependencySubscriber) resolveSQLAndArgs() error {
s.RawSQL = resolvedQuery.RawSQL
s.executeSQL = resolvedQuery.ExecuteSQL
s.Args = resolvedQuery.Args
} else {
// otherwise just resolve the args
// merge the base args with the runtime args
runtimeArgs, err = modconfig.MergeArgs(queryProvider, runtimeArgs)
if err != nil {
return err
}
args, err := modconfig.ResolveArgs(queryProvider, runtimeArgs)
if err != nil {
return err
}
s.Args = args
}
//}
return nil
}
func (s *RuntimeDependencySubscriber) populateParamDefaults(provider modconfig.QueryProvider) {
func (s *RuntimeDependencySubscriberImpl) populateParamDefaults(provider modconfig.QueryProvider) {
paramDefs := provider.GetParams()
for _, paramDef := range paramDefs {
if dep := s.findRuntimeDependencyForParentProperty(paramDef.UnqualifiedName); dep != nil {
@@ -238,10 +362,10 @@ func (s *RuntimeDependencySubscriber) populateParamDefaults(provider modconfig.Q
}
// convert runtime dependencies into arg map
func (s *RuntimeDependencySubscriber) buildRuntimeDependencyArgs() (*modconfig.QueryArgs, error) {
func (s *RuntimeDependencySubscriberImpl) buildRuntimeDependencyArgs() (*modconfig.QueryArgs, error) {
res := modconfig.NewQueryArgs()
log.Printf("[TRACE] LeafRun '%s' buildRuntimeDependencyArgs - %d runtime dependencies", s.resource.Name(), len(s.runtimeDependencies))
log.Printf("[TRACE] %s: buildRuntimeDependencyArgs - %d runtime dependencies", s.resource.Name(), len(s.runtimeDependencies))
// if the runtime dependencies use position args, get the max index and ensure the args array is large enough
maxArgIndex := -1
@@ -278,7 +402,7 @@ func (s *RuntimeDependencySubscriber) buildRuntimeDependencyArgs() (*modconfig.Q
return res, nil
}
func (s *RuntimeDependencySubscriber) hasParam(paramName string) bool {
func (s *RuntimeDependencySubscriberImpl) hasParam(paramName string) bool {
for _, p := range s.Params {
if p.ShortName == paramName {
return true
@@ -288,12 +412,16 @@ func (s *RuntimeDependencySubscriber) hasParam(paramName string) bool {
}
// populate the list of runtime dependencies that this run depends on
func (s *RuntimeDependencySubscriber) setRuntimeDependencies() {
func (s *RuntimeDependencySubscriberImpl) setRuntimeDependencies() {
names := make(map[string]struct{}, len(s.runtimeDependencies))
for _, d := range s.runtimeDependencies {
// tactical - exclude params
//if d.Dependency.PropertyPath.ItemType =modconfig.BlockTypeParam
// add to DependencyWiths using ScopedName, i.e. <parent FullName>.<with UnqualifiedName>.
// we do this as there may be a with from a base resource with a clashing with name
// NOTE: this must be consistent with the naming in RuntimeDependencyPublisherImpl.createWithRuns
s.RuntimeDependencyNames = append(s.RuntimeDependencyNames, d.ScopedName())
names[d.ScopedName()] = struct{}{}
}
// get base runtime dependencies (if any)
@@ -301,25 +429,37 @@ func (s *RuntimeDependencySubscriber) setRuntimeDependencies() {
s.baseDependencySubscriber.setRuntimeDependencies()
s.RuntimeDependencyNames = append(s.RuntimeDependencyNames, s.baseDependencySubscriber.RuntimeDependencyNames...)
}
s.RuntimeDependencyNames = maps.Keys(names)
}
func (s *RuntimeDependencySubscriber) hasRuntimeDependencies() bool {
func (s *RuntimeDependencySubscriberImpl) hasRuntimeDependencies() bool {
return len(s.runtimeDependencies)+len(s.baseRuntimeDependencies()) > 0
}
func (s *RuntimeDependencySubscriber) baseRuntimeDependencies() map[string]*dashboardtypes.ResolvedRuntimeDependency {
func (s *RuntimeDependencySubscriberImpl) baseRuntimeDependencies() map[string]*dashboardtypes.ResolvedRuntimeDependency {
if s.baseDependencySubscriber == nil {
return map[string]*dashboardtypes.ResolvedRuntimeDependency{}
}
return s.baseDependencySubscriber.runtimeDependencies
}
// override DashboardParentImpl.executeChildrenAsync to also execute 'withs' of our baseDependencySubscriber
func (s *RuntimeDependencySubscriber) executeChildrenAsync(ctx context.Context) {
// if we have a baseDependencySubscriber, do not execute it but execute its with runs
// override DashboardParentImpl.executeChildrenAsync to also execute 'withs' of our baseRun
func (s *RuntimeDependencySubscriberImpl) executeChildrenAsync(ctx context.Context) {
// if we have a baseDependencySubscriber, execute it
if s.baseDependencySubscriber != nil {
s.baseDependencySubscriber.executeWithsAsync(ctx)
go s.baseDependencySubscriber.executeWithsAsync(ctx)
}
// if this leaf run has children (including with runs) execute them asynchronously
// set RuntimeDependenciesOnly if needed
s.DashboardParentImpl.executeChildrenAsync(ctx)
}
func (s *RuntimeDependencySubscriberImpl) argsResolved(args []any) {
if s.baseDependencySubscriber != nil {
s.baseDependencySubscriber.argsResolved(args)
}
s.RuntimeDependencyPublisherImpl.argsResolved(args)
}

View File

@@ -220,8 +220,6 @@ func buildExecutionCompletePayload(event *dashboardevents.ExecutionComplete) ([]
SchemaVersion: fmt.Sprintf("%d", ExecutionCompletePayloadSchemaVersion),
ExecutionId: event.ExecutionId,
Snapshot: snap,
StartTime: event.StartTime,
EndTime: event.EndTime,
}
return json.Marshal(payload)
}

View File

@@ -213,16 +213,16 @@ func (s *Server) HandleDashboardEvent(event dashboardevents.DashboardEvent) {
_ = s.webSocket.Broadcast(payload)
}
var dashboardssBeingWatched []string
var dashboardsBeingWatched []string
dashboardClients := s.getDashboardClients()
for _, dashboardClientInfo := range dashboardClients {
dashboardName := typeHelpers.SafeString(dashboardClientInfo.Dashboard)
if dashboardClientInfo.Dashboard != nil {
if helpers.StringSliceContains(dashboardssBeingWatched, dashboardName) {
if helpers.StringSliceContains(dashboardsBeingWatched, dashboardName) {
continue
}
dashboardssBeingWatched = append(dashboardssBeingWatched, dashboardName)
dashboardsBeingWatched = append(dashboardsBeingWatched, dashboardName)
}
}
@@ -230,21 +230,21 @@ func (s *Server) HandleDashboardEvent(event dashboardevents.DashboardEvent) {
var newDashboardNames []string
// Process the changed items and make a note of the dashboard(s) they're in
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedBenchmarks)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedCategories)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedContainers)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedControls)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedCards)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedCharts)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedEdges)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedFlows)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedGraphs)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedHierarchies)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedImages)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedInputs)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedNodes)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedTables)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardssBeingWatched, changedDashboardNames, changedTexts)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedBenchmarks)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCategories)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedContainers)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedControls)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCards)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCharts)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedEdges)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedFlows)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedGraphs)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedHierarchies)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedImages)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedInputs)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedNodes)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedTables)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedTexts)...)
for _, changedDashboard := range changedDashboards {
if helpers.StringSliceContains(changedDashboardNames, changedDashboard.Name) {

View File

@@ -82,8 +82,6 @@ type ExecutionCompletePayload struct {
SchemaVersion string `json:"schema_version"`
Snapshot *dashboardtypes.SteampipeSnapshot `json:"snapshot"`
ExecutionId string `json:"execution_id"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
}
type DisplaySnapshotPayload struct {

View File

@@ -2,6 +2,7 @@ package dashboardtypes
import (
"context"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
)
// DashboardTreeRun is an interface implemented by all dashboard run nodes
@@ -19,4 +20,10 @@ type DashboardTreeRun interface {
GetInputsDependingOn(string) []string
GetNodeType() string
AsTreeNode() *SnapshotTreeNode
GetResource() modconfig.DashboardLeafNode
}
type TreeRunExecuteConfig struct {
RuntimeDependenciesOnly bool
BaseExecution bool
}

View File

@@ -130,8 +130,6 @@ func (v *VersionChecker) getLatestVersionsForPlugins(ctx context.Context, plugin
return map[string]VersionCheckReport{}
}
log.Println("[TRACE] serverResponse:", serverResponse)
for _, pluginResponseData := range serverResponse {
r := reports[pluginResponseData.getMapKey()]
r.CheckResponse = pluginResponseData

View File

@@ -249,7 +249,7 @@ func (c *Control) setBaseProperties(resourceMapProvider ResourceMapsProvider) {
}
// copy base into the HclResourceImpl 'base' property so it is accessible to all nested structs
c.base = c.Base
// call into parent nested struct setBaseProperties
// call into parent nested struct setBaseProperties
c.QueryProviderImpl.setBaseProperties()
if c.SearchPath == nil {
@@ -272,5 +272,5 @@ func (c *Control) setBaseProperties(resourceMapProvider ResourceMapsProvider) {
c.Display = c.Base.Display
}
c.MergeRuntimeDependencies(c.Base)
c.MergeBaseDependencies(c.Base)
}

View File

@@ -181,5 +181,5 @@ func (c *DashboardCard) setBaseProperties(resourceMapProvider ResourceMapsProvid
c.Width = c.Base.Width
}
c.MergeRuntimeDependencies(c.Base)
c.MergeBaseDependencies(c.Base)
}

View File

@@ -204,5 +204,5 @@ func (c *DashboardChart) setBaseProperties(resourceMapProvider ResourceMapsProvi
c.Width = c.Base.Width
}
c.MergeRuntimeDependencies(c.Base)
c.MergeBaseDependencies(c.Base)
}

View File

@@ -132,5 +132,5 @@ func (e *DashboardEdge) setBaseProperties(resourceMapProvider ResourceMapsProvid
if e.Category == nil {
e.Category = e.Base.Category
}
e.MergeRuntimeDependencies(e.Base)
e.MergeBaseDependencies(e.Base)
}

View File

@@ -27,10 +27,19 @@ func (l *DashboardEdgeList) Get(name string) *DashboardEdge {
return nil
}
func (l DashboardEdgeList) Names() []string {
res := make([]string, len(l))
for i, e := range l {
func (l *DashboardEdgeList) Names() []string {
res := make([]string, len(*l))
for i, e := range *l {
res[i] = e.Name()
}
return res
}
func (l *DashboardEdgeList) Contains(other *DashboardEdge) bool {
for _, e := range *l {
if e == other {
return true
}
}
return false
}

View File

@@ -241,5 +241,5 @@ func (f *DashboardFlow) setBaseProperties(resourceMapProvider ResourceMapsProvid
} else {
f.Nodes.Merge(f.Base.Nodes)
}
f.MergeRuntimeDependencies(f.Base)
f.MergeBaseDependencies(f.Base)
}

View File

@@ -246,5 +246,5 @@ func (g *DashboardGraph) setBaseProperties(resourceMapProvider ResourceMapsProvi
} else {
g.Nodes.Merge(g.Base.Nodes)
}
g.MergeRuntimeDependencies(g.Base)
g.MergeBaseDependencies(g.Base)
}

View File

@@ -243,5 +243,5 @@ func (h *DashboardHierarchy) setBaseProperties(resourceMapProvider ResourceMapsP
} else {
h.Nodes.Merge(h.Base.Nodes)
}
h.MergeRuntimeDependencies(h.Base)
h.MergeBaseDependencies(h.Base)
}

View File

@@ -149,5 +149,5 @@ func (i *DashboardImage) setBaseProperties(resourceMapProvider ResourceMapsProvi
i.Display = i.Base.Display
}
i.MergeRuntimeDependencies(i.Base)
i.MergeBaseDependencies(i.Base)
}

View File

@@ -212,5 +212,5 @@ func (i *DashboardInput) setBaseProperties(resourceMapProvider ResourceMapsProvi
i.Width = i.Base.Width
}
i.MergeRuntimeDependencies(i.Base)
i.MergeBaseDependencies(i.Base)
}

View File

@@ -147,5 +147,5 @@ func (n *DashboardNode) setBaseProperties(resourceMapProvider ResourceMapsProvid
if n.Params == nil {
n.Params = n.Base.Params
}
n.MergeRuntimeDependencies(n.Base)
n.MergeBaseDependencies(n.Base)
}

View File

@@ -18,8 +18,8 @@ func (l *DashboardNodeList) Merge(other DashboardNodeList) {
}
}
func (l DashboardNodeList) Get(name string) *DashboardNode {
for _, n := range l {
func (l *DashboardNodeList) Get(name string) *DashboardNode {
for _, n := range *l {
if n.Name() == name {
return n
}
@@ -27,10 +27,19 @@ func (l DashboardNodeList) Get(name string) *DashboardNode {
return nil
}
func (l DashboardNodeList) Names() []string {
res := make([]string, len(l))
for i, n := range l {
func (l *DashboardNodeList) Names() []string {
res := make([]string, len(*l))
for i, n := range *l {
res[i] = n.Name()
}
return res
}
func (l *DashboardNodeList) Contains(other *DashboardNode) bool {
for _, e := range *l {
if e == other {
return true
}
}
return false
}

View File

@@ -93,5 +93,5 @@ func (w *DashboardWith) setBaseProperties(resourceMapProvider ResourceMapsProvid
// call into parent nested struct setBaseProperties
w.QueryProviderImpl.setBaseProperties()
w.MergeRuntimeDependencies(w.base.(*DashboardWith))
w.MergeBaseDependencies(w.base.(*DashboardWith))
}

View File

@@ -74,6 +74,7 @@ type QueryProvider interface {
MergeParentArgs(QueryProvider, QueryProvider) hcl.Diagnostics
GetQueryProviderImpl() *QueryProviderImpl
ParamsInheritedFromBase() bool
ArgsInheritedFromBase() bool
}
type CtyValueProvider interface {
@@ -106,6 +107,7 @@ type ResourceMapsProvider interface {
// NodeAndEdgeProvider must be implemented by any dashboard leaf node which supports edges and nodes
// (DashboardGraph, DashboardFlow, DashboardHierarchy)
// TODO [node_reuse] add NodeAndEdgeProviderImpl
type NodeAndEdgeProvider interface {
QueryProvider
GetEdges() DashboardEdgeList

View File

@@ -29,7 +29,7 @@ func MergeArgs(queryProvider QueryProvider, runtimeArgs *QueryArgs) (*QueryArgs,
// it returns the arg values as a csv string which can be used in a prepared statement invocation
// (the arg values and param defaults will already have been converted to postgres format)
func ResolveArgs(qp QueryProvider, runtimeArgs *QueryArgs) ([]any, error) {
var paramVals []any
var argVals []any
var missingParams []string
var err error
// validate args
@@ -52,12 +52,12 @@ func ResolveArgs(qp QueryProvider, runtimeArgs *QueryArgs) ([]any, error) {
log.Printf("[TRACE] %s defines %d named %s but has no parameters definitions", qp.Name(), namedArgCount, utils.Pluralize("arg", namedArgCount))
} else {
// do params contain named params?
paramVals, missingParams, err = mergedArgs.resolveNamedParameters(qp)
argVals, missingParams, err = mergedArgs.resolveNamedParameters(qp)
}
} else {
// resolve as positional parameters
// (or fall back to defaults if no positional params are present)
paramVals, missingParams, err = mergedArgs.resolvePositionalParameters(qp)
argVals, missingParams, err = mergedArgs.resolvePositionalParameters(qp)
}
if err != nil {
return nil, err
@@ -70,15 +70,15 @@ func ResolveArgs(qp QueryProvider, runtimeArgs *QueryArgs) ([]any, error) {
}
// are there any params?
if len(paramVals) == 0 {
if len(argVals) == 0 {
return nil, nil
}
// convert any array args into a strongly typed array
for i, v := range paramVals {
paramVals[i] = type_conversion.AnySliceToTypedSlice(v)
for i, v := range argVals {
argVals[i] = type_conversion.AnySliceToTypedSlice(v)
}
// success!
return paramVals, nil
return argVals, nil
}

View File

@@ -18,8 +18,10 @@ type QueryProviderImpl struct {
Params []*ParamDef `cty:"params" column:"params,jsonb" json:"-"`
withs []*DashboardWith
runtimeDependencies map[string]*RuntimeDependency
disableCtySerialise bool
// flags to indicate if params and args were inherited from base resource
argsInheritedFromBase bool
paramsInheritedFromBase bool
}
// GetParams implements QueryProvider
@@ -122,7 +124,7 @@ func (b *QueryProviderImpl) MergeParentArgs(queryProvider QueryProvider, parent
return nil
}
// GetQueryProviderBase implements QueryProvider
// GetQueryProviderImpl implements QueryProvider
func (b *QueryProviderImpl) GetQueryProviderImpl() *QueryProviderImpl {
return b
}
@@ -130,22 +132,13 @@ func (b *QueryProviderImpl) GetQueryProviderImpl() *QueryProviderImpl {
// ParamsInheritedFromBase implements QueryProvider
// determine whether our params were inherited from base resource
func (b *QueryProviderImpl) ParamsInheritedFromBase() bool {
// note: this depends on baseQueryProvider being a reference to the same object as the derived class
// base property which was used to populate the params
if b.base == nil {
return false
}
return b.paramsInheritedFromBase
}
baseParams := b.base.(QueryProvider).GetParams()
if len(b.Params) != len(baseParams) {
return false
}
for i, p := range b.Params {
if baseParams[i] != p {
return false
}
}
return true
// ArgsInheritedFromBase implements QueryProvider
// determine whether our args were inherited from base resource
func (b *QueryProviderImpl) ArgsInheritedFromBase() bool {
return b.argsInheritedFromBase
}
// CtyValue implements CtyValueProvider
@@ -166,12 +159,34 @@ func (b *QueryProviderImpl) setBaseProperties() {
}
if b.Args == nil {
b.Args = b.getBaseImpl().Args
b.argsInheritedFromBase = true
}
if b.Params == nil {
b.Params = b.getBaseImpl().Params
b.paramsInheritedFromBase = true
}
}
func (b *QueryProviderImpl) getBaseImpl() *QueryProviderImpl {
return b.base.(QueryProvider).GetQueryProviderImpl()
}
func (b *QueryProviderImpl) MergeBaseDependencies(base QueryProvider) {
//only merge dependency if target property of other was inherited
//i.e. if other target propery
baseRuntimeDependencies := base.GetRuntimeDependencies()
if b.runtimeDependencies == nil {
b.runtimeDependencies = make(map[string]*RuntimeDependency)
}
for _, baseDep := range baseRuntimeDependencies {
if _, ok := b.runtimeDependencies[baseDep.String()]; !ok {
// was this target parent property (args/params) inherited
if (baseDep.ParentPropertyName == "args" && !b.ArgsInheritedFromBase()) ||
!b.ParamsInheritedFromBase() {
continue
}
b.runtimeDependencies[baseDep.String()] = baseDep
}
}
}

View File

@@ -11,6 +11,7 @@ type ResolvedQuery struct {
Args []any
}
// QueryArgs converts the ResolvedQuery into QueryArgs
func (r ResolvedQuery) QueryArgs() *QueryArgs {
res := NewQueryArgs()

View File

@@ -15,6 +15,9 @@ type RuntimeDependency struct {
// this provides support for args which convert a runtime dependency to an array, like:
// arns = [input.arn]
IsArray bool
// resource which provides has the dependency
Provider HclResource
}
func (d *RuntimeDependency) SourceResourceName() string {

View File

@@ -41,22 +41,12 @@ func (b *RuntimeDependencyProviderImpl) AddRuntimeDependencies(dependencies []*R
b.runtimeDependencies = make(map[string]*RuntimeDependency)
}
for _, dependency := range dependencies {
// set the dependency provider (this is used if this resource is inherited via base)
dependency.Provider = b
b.runtimeDependencies[dependency.String()] = dependency
}
}
func (b *RuntimeDependencyProviderImpl) MergeRuntimeDependencies(other QueryProvider) {
dependencies := other.GetRuntimeDependencies()
if b.runtimeDependencies == nil {
b.runtimeDependencies = make(map[string]*RuntimeDependency)
}
for _, dependency := range dependencies {
if _, ok := b.runtimeDependencies[dependency.String()]; !ok {
b.runtimeDependencies[dependency.String()] = dependency
}
}
}
func (b *RuntimeDependencyProviderImpl) GetRuntimeDependencies() map[string]*RuntimeDependency {
return b.runtimeDependencies
}

View File

@@ -285,7 +285,7 @@ func decodeVariable(block *hcl.Block, parseCtx *ModParseContext) (*modconfig.Var
}
func decodeQueryProvider(block *hcl.Block, parseCtx *ModParseContext) (modconfig.HclResource, *decodeResult) {
func decodeQueryProvider(block *hcl.Block, parseCtx *ModParseContext) (modconfig.QueryProvider, *decodeResult) {
res := newDecodeResult()
// TODO [node_reuse] need raise errors for invalid properties
@@ -313,7 +313,7 @@ func decodeQueryProvider(block *hcl.Block, parseCtx *ModParseContext) (modconfig
// decode 'with',args and params blocks
res.Merge(decodeQueryProviderBlocks(block, remain.(*hclsyntax.Body), resource, parseCtx))
return resource, res
return resource.(modconfig.QueryProvider), res
}
func decodeQueryProviderBlocks(block *hcl.Block, content *hclsyntax.Body, resource modconfig.HclResource, parseCtx *ModParseContext) *decodeResult {
@@ -431,6 +431,14 @@ func decodeNodeAndEdgeProviderBlocks(content *hclsyntax.Body, nodeAndEdgeProvide
case modconfig.BlockTypeNode, modconfig.BlockTypeEdge:
child, childRes := decodeQueryProvider(block, parseCtx)
// TACTICAL if child has any runtime dependencies, claim them
// this is to ensure if this resourc eis used as base, we can be correctly identified
// as the publisher of the runtime dependencies
for _, r := range child.GetRuntimeDependencies() {
r.Provider = nodeAndEdgeProvider
}
// populate metadata, set references and call OnDecoded
handleModDecodeResult(child, childRes, block, parseCtx)
res.Merge(childRes)

View File

@@ -1,24 +0,0 @@
dashboard "base_with" {
with "dw1" {
sql = "select 'foo'"
}
table {
base = table.t1
# args = {
# "p1": with.w1.rows[0]
# }
}
}
table "t1"{
with "w1" {
sql = "select 'foo'"
}
sql = "select $1 as c1"
param "p1" {
default = with.w1.rows[0]
}
}

View File

@@ -0,0 +1,29 @@
dashboard "base_with" {
# with "w1" {
# sql = "select 'dashboard foo'"
# }
table {
base = table.t1
}
# table {
# title = "nested level table"
# base = table.t1
# args = {
# "p1": with.w1.rows[0]
# }
# }
}
table "t1"{
title = "top level table"
with "w1" {
sql = "select 'foo'"
}
sql = "select $1 as c1"
param "p1" {
default = with.w1.rows[0]
}
}

View File

@@ -16,38 +16,37 @@ dashboard "many_withs" {
EOQ
}
container {
graph {
title = "Relationships"
width = 12
type = "graph"
graph {
title = "Relationships"
width = 12
type = "graph"
node "n1" {
sql = <<-EOQ
select
$1 as id,
$1 as title
EOQ
args = [ with.n1.rows[0]]
}
node "n2" {
sql = <<-EOQ
select
$1 as id,
$1 as title
EOQ
node "n1" {
sql = <<-EOQ
select
$1 as id,
$1 as title
EOQ
args = [ with.n1.rows[0]]
}
node "n2" {
sql = <<-EOQ
select
$1 as id,
$1 as title
EOQ
args = [ with.n2.rows[0]]
}
edge "n1_n2" {
sql = <<-EOQ
select
$1 as from_id,
$2 as to_id
EOQ
args = [with.n1.rows[0], with.n2.rows[0]]
}
args = [ with.n2.rows[0]]
}
edge "n1_n2" {
sql = <<-EOQ
select
$1 as from_id,
$2 as to_id
EOQ
args = [with.n1.rows[0], with.n2.rows[0]]
}
}
}

View File

@@ -0,0 +1,27 @@
dashboard "many_withs_base" {
title = "Many Withs Base"
graph {
base = graph.g1
}
}
graph "g1"{
with "n1" {
sql = <<-EOQ
select 'n1'
EOQ
}
node "n1" {
sql = <<-EOQ
select
$1 as id,
$1 as title
EOQ
args = [ with.n1.rows[0]]
}
}

View File

@@ -1,10 +1,13 @@
dashboard "name_graph" {
title = "named graph with base and args"
input "bucket_arn" {
title = "Select a bucket:"
query = query.s3_bucket_input
width = 4
}
with "bucket_policy" {
sql = <<-EOQ
select
@@ -14,16 +17,19 @@ dashboard "name_graph" {
where
arn = $1;
EOQ
args = [self.input.bucket_arn.value]
}
graph {
base = graph.iam_policy_structure
args = {
policy_std = with.bucket_policy[0].policy_std
policy_std = with.bucket_policy.rows[0].policy_std
}
}
}
query "s3_bucket_input" {
sql = <<-EOQ
select
@@ -39,88 +45,108 @@ query "s3_bucket_input" {
title;
EOQ
}
//** The Graph....
graph "iam_policy_structure" {
title = "IAM Policy"
param "policy_std" {}
# node {
# base = node.iam_policy_statement
# args = {
# iam_policy_std = param.policy_std
# }
# }
node {
base = node.iam_policy_statement_action_notaction
args = {
iam_policy_std = param.policy_std
}
}
node {
base = node.iam_policy_statement_condition
args = {
iam_policy_std = param.policy_std
}
}
node {
base = node.iam_policy_statement_condition_key
args = {
iam_policy_std = param.policy_std
}
}
node {
base = node.iam_policy_statement_condition_key_value
args = {
iam_policy_std = param.policy_std
}
}
node {
base = node.iam_policy_statement_resource_notresource
args = {
iam_policy_std = param.policy_std
}
}
# edge {
# base = edge.iam_policy_statement
# args = {
# iam_policy_arns = [self.input.policy_arn.value]
# }
# }
edge {
base = edge.iam_policy_statement_action
args = {
iam_policy_std = param.policy_std
}
}
edge {
base = edge.iam_policy_statement_condition
args = {
iam_policy_std = param.policy_std
}
}
edge {
base = edge.iam_policy_statement_condition_key
args = {
iam_policy_std = param.policy_std
}
}
edge {
base = edge.iam_policy_statement_condition_key_value
args = {
iam_policy_std = param.policy_std
}
}
edge {
base = edge.iam_policy_statement_notaction
args = {
iam_policy_std = param.policy_std
}
}
edge {
base = edge.iam_policy_statement_notresource
args = {
iam_policy_std = param.policy_std
}
}
edge {
base = edge.iam_policy_statement_resource
args = {
@@ -128,9 +154,15 @@ graph "iam_policy_structure" {
}
}
}
// nodes
node "iam_policy_statement" {
category = category.iam_policy_statement
sql = <<-EOQ
select
concat('statement:', i) as id,
@@ -141,11 +173,15 @@ node "iam_policy_statement" {
from
jsonb_array_elements(($1 :: jsonb) -> 'Statement') with ordinality as t(stmt,i)
EOQ
param "iam_policy_std" {}
}
node "iam_policy_statement_action_notaction" {
category = category.iam_policy_action
sql = <<-EOQ
select
concat('action:', action) as id,
action as title
@@ -153,10 +189,13 @@ node "iam_policy_statement_action_notaction" {
jsonb_array_elements(($1 :: jsonb) -> 'Statement') with ordinality as t(stmt,i),
jsonb_array_elements_text(coalesce(t.stmt -> 'Action','[]'::jsonb) || coalesce(t.stmt -> 'NotAction','[]'::jsonb)) as action
EOQ
param "iam_policy_std" {}
}
node "iam_policy_statement_condition" {
category = category.iam_policy_condition
sql = <<-EOQ
select
condition.key as title,
@@ -168,10 +207,13 @@ node "iam_policy_statement_condition" {
where
stmt -> 'Condition' <> 'null'
EOQ
param "iam_policy_std" {}
}
node "iam_policy_statement_condition_key" {
category = category.iam_policy_condition_key
sql = <<-EOQ
select
condition_key.key as title,
@@ -184,10 +226,13 @@ node "iam_policy_statement_condition_key" {
where
stmt -> 'Condition' <> 'null'
EOQ
param "iam_policy_std" {}
}
node "iam_policy_statement_condition_key_value" {
category = category.iam_policy_condition_value
sql = <<-EOQ
select
condition_value as title,
@@ -200,10 +245,13 @@ node "iam_policy_statement_condition_key_value" {
where
stmt -> 'Condition' <> 'null'
EOQ
param "iam_policy_std" {}
}
node "iam_policy_statement_resource_notresource" {
category = category.iam_policy_resource
sql = <<-EOQ
select
resource as id,
@@ -213,12 +261,17 @@ node "iam_policy_statement_resource_notresource" {
jsonb_array_elements_text(coalesce(t.stmt -> 'Action','[]'::jsonb) || coalesce(t.stmt -> 'NotAction','[]'::jsonb)) as action,
jsonb_array_elements_text(coalesce(t.stmt -> 'Resource','[]'::jsonb) || coalesce(t.stmt -> 'NotResource','[]'::jsonb)) as resource
EOQ
param "iam_policy_std" {}
}
// edges
edge "iam_policy_statement_action" {
//title = "allows"
sql = <<-EOQ
select
--distinct on (p.arn,action)
concat('action:', action) as to_id,
@@ -229,11 +282,14 @@ edge "iam_policy_statement_action" {
jsonb_array_elements(($1 :: jsonb) -> 'Statement') with ordinality as t(stmt,i),
jsonb_array_elements_text(t.stmt -> 'Action') as action
EOQ
param "iam_policy_std" {}
}
edge "iam_policy_statement_condition" {
title = "condition"
sql = <<-EOQ
select
concat('statement:', i, ':condition:', condition.key) as to_id,
concat('statement:', i) as from_id
@@ -243,8 +299,10 @@ edge "iam_policy_statement_condition" {
where
stmt -> 'Condition' <> 'null'
EOQ
param "iam_policy_std" {}
}
edge "iam_policy_statement_condition_key" {
title = "all of"
sql = <<-EOQ
@@ -258,8 +316,10 @@ edge "iam_policy_statement_condition_key" {
where
stmt -> 'Condition' <> 'null'
EOQ
param "iam_policy_std" {}
}
edge "iam_policy_statement_condition_key_value" {
title = "any of"
sql = <<-EOQ
@@ -274,10 +334,13 @@ edge "iam_policy_statement_condition_key_value" {
where
stmt -> 'Condition' <> 'null'
EOQ
param "iam_policy_std" {}
}
edge "iam_policy_statement_notaction" {
sql = <<-EOQ
select
--distinct on (p.arn,notaction)
concat('action:', notaction) as to_id,
@@ -288,10 +351,13 @@ edge "iam_policy_statement_notaction" {
jsonb_array_elements(($1 :: jsonb) -> 'Statement') with ordinality as t(stmt,i),
jsonb_array_elements_text(t.stmt -> 'NotAction') as notaction
EOQ
param "iam_policy_std" {}
}
edge "iam_policy_statement_notresource" {
title = "not resource"
sql = <<-EOQ
select
concat('action:', coalesce(action, notaction)) as from_id,
@@ -303,10 +369,13 @@ edge "iam_policy_statement_notresource" {
left join jsonb_array_elements_text(stmt -> 'NotAction') as notaction on true
left join jsonb_array_elements_text(stmt -> 'NotResource') as notresource on true
EOQ
param "iam_policy_std" {}
}
edge "iam_policy_statement_resource" {
title = "resource"
sql = <<-EOQ
select
concat('action:', coalesce(action, notaction)) as from_id,
@@ -318,57 +387,75 @@ edge "iam_policy_statement_resource" {
left join jsonb_array_elements_text(stmt -> 'NotAction') as notaction on true
left join jsonb_array_elements_text(stmt -> 'Resource') as resource on true
EOQ
param "iam_policy_std" {}
}
// categories
category "iam_policy" {
title = "IAM Policy"
color = local.iam_color
href = "/aws_insights.dashboard.iam_policy_detail?input.policy_arn={{.properties.'ARN' | @uri}}"
icon = "rule"
}
category "iam_policy_action" {
href = "/aws_insights.dashboard.iam_action_glob_report?input.action_glob={{.title | @uri}}"
icon = "electric-bolt"
color = local.iam_color
title = "Action"
}
category "iam_policy_condition" {
icon = "help"
color = local.iam_color
title = "Condition"
}
category "iam_policy_condition_key" {
icon = "vpn-key"
color = local.iam_color
title = "Condition Key"
}
category "iam_policy_condition_value" {
icon = "text:val"
color = local.iam_color
title = "Condition Value"
}
category "iam_policy_notaction" {
icon = "flash-off"
color = local.iam_color
title = "NotAction"
}
category "iam_policy_notresource" {
icon = "bookmark-remove"
color = local.iam_color
title = "NotResource"
}
category "iam_policy_resource" {
icon = "bookmark"
color = local.iam_color
title = "Resource"
}
category "iam_policy_statement" {
icon = "assignment"
color = local.iam_color
title = "Statement"
}
// color
locals {
analytics_color = "purple"
application_integration_color = "deeppink"

View File

@@ -7,12 +7,14 @@ dashboard "param_ref" {
}
}
table "t1"{
param "dash" {
default = "foo"
with "w1" {
sql = "select 'foo'"
}
sql = "select $1 as c1"
args = [ param.dash]
param "p1" {
default = with.w1.rows[0]
}
}

View File

@@ -0,0 +1,3 @@
mod reports_poc {
title = "Reports POC"
}