remove controlexecute and controlstatus and remove control and benchmark refs from dashboardexecute

This commit is contained in:
Puskar Basu
2024-05-30 17:21:42 +05:30
parent 2ea42e690d
commit 4966e01a67
25 changed files with 0 additions and 2016 deletions

View File

@@ -1,409 +0,0 @@
package controlexecute
import (
"context"
"fmt"
"log"
"sync"
"time"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/control/controlstatus"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/query/queryresult"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
)
// ControlRun is a struct representing the execution of a control run. It will contain one or more result items (i.e. for one or more resources).
type ControlRun struct {
// properties from control
ControlId string `json:"-"`
FullName string `json:"name"`
Title string `json:"title,omitempty"`
Description string `json:"description,omitempty"`
Documentation string `json:"documentation,omitempty"`
Tags map[string]string `json:"tags,omitempty"`
Display string `json:"display,omitempty"`
Type string `json:"display_type,omitempty"`
// this will be serialised under 'properties'
Severity string `json:"-"`
// "control"
NodeType string `json:"panel_type"`
// the control being run
Control *modconfig.Control `json:"properties,omitempty"`
// control summary
Summary *controlstatus.StatusSummary `json:"summary"`
RunStatus dashboardtypes.RunStatus `json:"status"`
// result rows
Rows ResultRows `json:"-"`
// the results in snapshot format
Data *dashboardtypes.LeafData `json:"data"`
// a list of distinct dimension keys from the results of this control
DimensionKeys []string `json:"-"`
// execution duration
Duration time.Duration `json:"-"`
// parent result group
Group *ResultGroup `json:"-"`
// execution tree
Tree *ExecutionTree `json:"-"`
// save run error as string for JSON export
RunErrorString string `json:"error,omitempty"`
runError error
// the query result stream
queryResult *queryresult.Result
rowMap map[string]ResultRows
stateLock sync.Mutex
doneChan chan bool
attempts int
}
func NewControlRun(control *modconfig.Control, group *ResultGroup, executionTree *ExecutionTree) *ControlRun {
controlId := control.Name()
// only show qualified control names for controls from dependent mods
if control.Mod.Name() == executionTree.Workspace.Mod.Name() {
controlId = control.UnqualifiedName
}
res := &ControlRun{
Control: control,
ControlId: controlId,
FullName: control.Name(),
Description: control.GetDescription(),
Documentation: control.GetDocumentation(),
Tags: control.GetTags(),
Display: control.GetDisplay(),
Type: control.GetType(),
Severity: typehelpers.SafeString(control.Severity),
Title: typehelpers.SafeString(control.Title),
rowMap: make(map[string]ResultRows),
Summary: &controlstatus.StatusSummary{},
Tree: executionTree,
RunStatus: dashboardtypes.RunInitialized,
Group: group,
NodeType: modconfig.BlockTypeControl,
doneChan: make(chan bool, 1),
}
return res
}
// GetControlId implements ControlRunStatusProvider
func (r *ControlRun) GetControlId() string {
r.stateLock.Lock()
defer r.stateLock.Unlock()
return r.ControlId
}
// GetRunStatus implements ControlRunStatusProvider
func (r *ControlRun) GetRunStatus() dashboardtypes.RunStatus {
r.stateLock.Lock()
defer r.stateLock.Unlock()
return r.RunStatus
}
// GetStatusSummary implements ControlRunStatusProvider
func (r *ControlRun) GetStatusSummary() *controlstatus.StatusSummary {
r.stateLock.Lock()
defer r.stateLock.Unlock()
return r.Summary
}
func (r *ControlRun) Finished() bool {
return r.GetRunStatus().IsFinished()
}
// MatchTag returns the value corresponding to the input key. Returns 'false' if not found
func (r *ControlRun) MatchTag(key string, value string) bool {
val, found := r.Control.GetTags()[key]
return found && (val == value)
}
func (r *ControlRun) GetError() error {
return r.runError
}
// IsSnapshotPanel implements SnapshotPanel
func (*ControlRun) IsSnapshotPanel() {}
// IsExecutionTreeNode implements ExecutionTreeNode
func (*ControlRun) IsExecutionTreeNode() {}
// GetChildren implements ExecutionTreeNode
func (*ControlRun) GetChildren() []ExecutionTreeNode { return nil }
// GetName implements ExecutionTreeNode
func (r *ControlRun) GetName() string { return r.Control.Name() }
// AsTreeNode implements ExecutionTreeNode
func (r *ControlRun) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
res := &dashboardtypes.SnapshotTreeNode{
Name: r.Control.Name(),
NodeType: r.NodeType,
}
return res
}
func (r *ControlRun) setError(ctx context.Context, err error) {
if err == nil {
return
}
if r.runError == context.DeadlineExceeded {
r.runError = fmt.Errorf("control execution timed out")
} else {
r.runError = error_helpers.TransformErrorToSteampipe(err)
}
r.RunErrorString = r.runError.Error()
// update error count
r.Summary.Error++
if error_helpers.IsContextCancelledError(err) {
r.setRunStatus(ctx, dashboardtypes.RunCanceled)
} else {
r.setRunStatus(ctx, dashboardtypes.RunError)
}
}
func (r *ControlRun) skip(ctx context.Context) {
r.setRunStatus(ctx, dashboardtypes.RunComplete)
}
func (r *ControlRun) execute(ctx context.Context, client db_common.Client) {
utils.LogTime("ControlRun.execute start")
defer utils.LogTime("ControlRun.execute end")
log.Printf("[TRACE] begin ControlRun.Start: %s\n", r.Control.Name())
defer log.Printf("[TRACE] end ControlRun.Start: %s\n", r.Control.Name())
control := r.Control
startTime := time.Now()
// function to cleanup and update status after control run completion
defer func() {
// update the result group status with our status - this will be passed all the way up the execution tree
r.Group.updateSummary(r.Summary)
if len(r.Severity) != 0 {
r.Group.updateSeverityCounts(r.Severity, r.Summary)
}
r.Duration = time.Since(startTime)
if r.Group != nil {
r.Group.onChildDone()
}
log.Printf("[TRACE] finishing with concurrency, %s, , %d\n", r.Control.Name(), r.Tree.Progress.Executing)
}()
// get a db connection
sessionResult := r.acquireSession(ctx, client)
if sessionResult.Error != nil {
if !error_helpers.IsCancelledError(sessionResult.Error) {
log.Printf("[TRACE] controlRun %s execute failed to acquire session: %s", r.ControlId, sessionResult.Error)
sessionResult.Error = fmt.Errorf("error acquiring database connection, %s", sessionResult.Error.Error())
r.setError(ctx, sessionResult.Error)
}
return
}
dbSession := sessionResult.Session
defer func() {
// do this in a closure, otherwise the argument will not get evaluated during calltime
dbSession.Close(error_helpers.IsContextCanceled(ctx))
}()
// set our status
r.RunStatus = dashboardtypes.RunRunning
// update the current running control in the Progress renderer
r.Tree.Progress.OnControlStart(ctx, r)
defer func() {
// update Progress
if r.GetRunStatus() == dashboardtypes.RunError {
r.Tree.Progress.OnControlError(ctx, r)
} else {
r.Tree.Progress.OnControlComplete(ctx, r)
}
}()
// resolve the control query
resolvedQuery, err := r.resolveControlQuery(control)
if err != nil {
r.setError(ctx, err)
return
}
controlExecutionCtx := r.getControlQueryContext(ctx)
// execute the control query
// NOTE no need to pass an OnComplete callback - we are already closing our session after waiting for results
log.Printf("[TRACE] execute start for, %s\n", control.Name())
queryResult, err := client.ExecuteInSession(controlExecutionCtx, dbSession, nil, resolvedQuery.ExecuteSQL, resolvedQuery.Args...)
log.Printf("[TRACE] execute finish for, %s\n", control.Name())
if err != nil {
r.attempts++
// is this an rpc EOF error - meaning that the plugin somehow crashed
if grpc.IsGRPCConnectivityError(err) {
if r.attempts < constants.MaxControlRunAttempts {
log.Printf("[TRACE] control %s query failed with plugin connectivity error %s - retrying…", r.Control.Name(), err)
// recurse into this function to retry using the original context - which Execute will use to create it's own timeout context
r.execute(ctx, client)
return
} else {
log.Printf("[TRACE] control %s query failed again with plugin connectivity error %s - NOT retrying…", r.Control.Name(), err)
}
}
r.setError(ctx, err)
return
}
r.queryResult = queryResult
// now wait for control completion
log.Printf("[TRACE] wait result for, %s\n", control.Name())
r.waitForResults(ctx)
log.Printf("[TRACE] finish result for, %s\n", control.Name())
}
// try to acquire a database session - retry up to 4 times if there is an error
func (r *ControlRun) acquireSession(ctx context.Context, client db_common.Client) *db_common.AcquireSessionResult {
var sessionResult *db_common.AcquireSessionResult
for attempt := 0; attempt < 4; attempt++ {
sessionResult = client.AcquireSession(ctx)
if sessionResult.Error == nil || error_helpers.IsCancelledError(sessionResult.Error) {
break
}
log.Printf("[TRACE] controlRun %s acquireSession failed with error: %s - retrying", r.ControlId, sessionResult.Error)
}
return sessionResult
}
// create a context with status updates disabled (we do not want to show 'loading' results)
func (r *ControlRun) getControlQueryContext(ctx context.Context) context.Context {
// disable the status spinner to hide 'loading' results)
newCtx := statushooks.DisableStatusHooks(ctx)
return newCtx
}
func (r *ControlRun) resolveControlQuery(control *modconfig.Control) (*modconfig.ResolvedQuery, error) {
resolvedQuery, err := r.Tree.Workspace.ResolveQueryFromQueryProvider(control, nil)
if err != nil {
return nil, fmt.Errorf(`cannot run %s - failed to resolve query "%s": %s`, control.Name(), typehelpers.SafeString(control.SQL), err.Error())
}
return resolvedQuery, nil
}
func (r *ControlRun) waitForResults(ctx context.Context) {
defer func() {
dimensionsSchema := r.getDimensionSchema()
// convert the data to snapshot format
r.Data = r.Rows.ToLeafData(dimensionsSchema)
}()
for {
select {
case <-ctx.Done():
r.setError(ctx, ctx.Err())
return
case row := <-*r.queryResult.RowChan:
// nil row means control run is complete
if row == nil {
// nil row means we are done
r.setRunStatus(ctx, dashboardtypes.RunComplete)
r.createdOrderedResultRows()
return
}
// if the row is in error then we terminate the run
if row.Error != nil {
// set error status (parent summary will be set from parent defer)
r.setError(ctx, row.Error)
return
}
// so all is ok - create another result row
result, err := NewResultRow(r, row, r.queryResult.Cols)
if err != nil {
r.setError(ctx, err)
return
}
r.addResultRow(result)
case <-r.doneChan:
return
}
}
}
func (r *ControlRun) getDimensionSchema() map[string]*queryresult.ColumnDef {
var dimensionsSchema = make(map[string]*queryresult.ColumnDef)
for _, row := range r.Rows {
for _, dim := range row.Dimensions {
if _, ok := dimensionsSchema[dim.Key]; !ok {
// add to map
dimensionsSchema[dim.Key] = &queryresult.ColumnDef{
Name: dim.Key,
DataType: dim.SqlType,
}
// also add to DimensionKeys
r.DimensionKeys = append(r.DimensionKeys, dim.Key)
}
}
}
// add keys to group
r.Group.addDimensionKeys(r.DimensionKeys...)
return dimensionsSchema
}
// add the result row to our results and update the summary with the row status
func (r *ControlRun) addResultRow(row *ResultRow) {
// update results
r.rowMap[row.Status] = append(r.rowMap[row.Status], row)
// update summary
switch row.Status {
case constants.ControlOk:
r.Summary.Ok++
case constants.ControlAlarm:
r.Summary.Alarm++
case constants.ControlSkip:
r.Summary.Skip++
case constants.ControlInfo:
r.Summary.Info++
case constants.ControlError:
r.Summary.Error++
}
}
// populate ordered list of rows
func (r *ControlRun) createdOrderedResultRows() {
statusOrder := []string{constants.ControlError, constants.ControlAlarm, constants.ControlInfo, constants.ControlOk, constants.ControlSkip}
for _, status := range statusOrder {
r.Rows = append(r.Rows, r.rowMap[status]...)
}
}
func (r *ControlRun) setRunStatus(ctx context.Context, status dashboardtypes.RunStatus) {
r.stateLock.Lock()
r.RunStatus = status
r.stateLock.Unlock()
if r.Finished() {
// close the doneChan - we don't need it anymore
close(r.doneChan)
}
}

View File

@@ -1,9 +0,0 @@
package controlexecute
// Dimension is a struct representing an attribute returned by a control run.
// An attribute is stored as a dimension if it's not a standard attribute (reason, resource, status).
type Dimension struct {
Key string `json:"key"`
Value string `json:"value"`
SqlType string `json:"-"`
}

View File

@@ -1,153 +0,0 @@
package controlexecute
import (
"fmt"
)
type DimensionColorGenerator struct {
Map map[string]map[string]uint8
startingRow uint8
startingColumn uint8
// state
allocatedColorCodes []uint8
forbiddenColumns map[uint8]bool
currentRow uint8
currentColumn uint8
}
const minColumn = 16
const maxColumn = 51
const minRow = 0
const maxRow = 5
// NewDimensionColorGenerator creates a new NewDimensionColorGenerator
func NewDimensionColorGenerator(startingRow, startingColumn uint8) (*DimensionColorGenerator, error) {
forbiddenColumns := map[uint8]bool{
16: true, 17: true, 18: true, 19: true, 20: true, // red
22: true, 23: true, 27: true, 28: true, 29: true, //orange
34: true, 35: true, 36: true, 40: true, 41: true, 42: true, //green/orange
46: true, 47: true, 48: true, 49: true, // green
}
if startingColumn < minColumn || startingColumn > maxColumn {
return nil, fmt.Errorf("starting column must be between 16 and 51")
}
if startingRow < minRow || startingRow > maxRow {
return nil, fmt.Errorf("starting row must be between 0 and 5")
}
g := &DimensionColorGenerator{
Map: make(map[string]map[string]uint8),
startingRow: startingRow,
startingColumn: startingColumn,
forbiddenColumns: forbiddenColumns,
}
g.reset()
return g, nil
}
func (g *DimensionColorGenerator) GetDimensionProperties() []string {
var res []string
for d := range g.Map {
res = append(res, d)
}
return res
}
func (g *DimensionColorGenerator) reset() {
// create the state map
g.currentRow = g.startingRow
g.currentColumn = g.startingColumn
// clear allocated colors
g.allocatedColorCodes = nil
}
func (g *DimensionColorGenerator) populate(e *ExecutionTree) {
for _, run := range e.ControlRuns {
for _, r := range run.Rows {
for _, d := range r.Dimensions {
if !g.hasDimensionValue(d) {
g.addDimensionValue(d)
}
}
}
}
}
func (g *DimensionColorGenerator) hasDimensionValue(dimension Dimension) bool {
dimensionMap, ok := g.Map[dimension.Key]
var gotValue bool
if ok {
// so we have a dimension map for this dimension
// - do we have a dimension color for this property value?
_, gotValue = dimensionMap[dimension.Value]
}
return gotValue
}
func (g *DimensionColorGenerator) addDimensionValue(d Dimension) {
// do we have a dimension map for this dimension property?
if g.Map[d.Key] == nil {
g.Map[d.Key] = make(map[string]uint8)
}
// store the color keyed by property VALUE
color := g.getNextColor()
g.Map[d.Key][d.Value] = color
}
func (g *DimensionColorGenerator) getNextColor() uint8 {
g.incrementCurrentColumn(2)
g.incrementCurrentRow(2)
// does this color clash, or is it forbidden
color := g.getCurrentColor()
origColor := color
for g.colorClashes(color) {
g.incrementCurrentColumn(1)
g.incrementCurrentRow(1)
color = g.getCurrentColor()
if color == origColor {
// we have tried them all reset and start from the first color
g.reset()
return g.getNextColor()
}
}
// store this color code
g.allocatedColorCodes = append(g.allocatedColorCodes, color)
return color
}
func (g *DimensionColorGenerator) getCurrentColor() uint8 {
return g.currentColumn + g.currentRow*36
}
func (g *DimensionColorGenerator) incrementCurrentRow(increment uint8) {
g.currentRow += increment
if g.currentRow > maxRow {
g.currentRow -= maxRow
}
}
func (g *DimensionColorGenerator) incrementCurrentColumn(increment uint8) {
g.currentColumn += increment
if g.currentColumn > maxColumn {
// reset to 16
g.currentColumn -= maxColumn - minColumn + 1
}
for ; g.forbiddenColumns[g.currentColumn]; g.currentColumn++ {
}
}
// check map our map of color indexes - if we are within 5 of any other element, skip this color
func (g *DimensionColorGenerator) colorClashes(color uint8) bool {
for _, a := range g.allocatedColorCodes {
if a == color {
return true
}
}
return false
}

View File

@@ -1,32 +0,0 @@
package controlexecute
import (
"fmt"
"testing"
"time"
"github.com/logrusorgru/aurora"
)
func TestGetNextColor(t *testing.T) {
var startingCol uint8
var startingRow uint8
for startingCol = 16; startingCol <= 51; startingCol++ {
for startingRow = 0; startingRow <= 5; startingRow++ {
fmt.Printf("\nROW %d COL %d\n", startingRow, startingCol)
g, err := NewDimensionColorGenerator(startingRow, startingCol)
if err != nil {
t.Fatal(err)
}
for i := 0; i < 10; i++ {
color := g.getNextColor()
fmt.Printf("%s\n", aurora.Index(color, fmt.Sprintf("XXXXXXXXXXXXXXXXXX, color: %d", color)))
}
time.Sleep(20 * time.Millisecond)
}
}
fmt.Println()
}

View File

@@ -1,43 +0,0 @@
package controlexecute
import (
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
)
// DirectChildrenModDecorator is a struct used to wrap a Mod but modify the results of GetChildren to only return
// immediate mod children (as opposed to all resources in dependency mods as well)
// This is needed when running 'check all' for a mod which has dependency mopds'
type DirectChildrenModDecorator struct {
*modconfig.Mod
}
// GetChildren is overridden
func (r DirectChildrenModDecorator) GetChildren() []modconfig.ModTreeItem {
var res []modconfig.ModTreeItem
for _, child := range r.Mod.GetChildren() {
if child.GetMod().ShortName == r.Mod.ShortName {
res = append(res, child)
}
}
return res
}
// GetDocumentation implements DashboardLeafNode
func (r DirectChildrenModDecorator) GetDocumentation() string {
return r.Mod.GetDocumentation()
}
// GetDisplay implements DashboardLeafNode
func (r DirectChildrenModDecorator) GetDisplay() string {
return ""
}
// GetType implements DashboardLeafNode
func (r DirectChildrenModDecorator) GetType() string {
return ""
}
// GetWidth implements DashboardLeafNode
func (r DirectChildrenModDecorator) GetWidth() int {
return 0
}

View File

@@ -1,295 +0,0 @@
package controlexecute
import (
"context"
"fmt"
"log"
"sort"
"time"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/pkg/connection_sync"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/control/controlstatus"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/query/queryresult"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
"github.com/turbot/steampipe/pkg/workspace"
"golang.org/x/sync/semaphore"
)
// ExecutionTree is a structure representing the control execution hierarchy
type ExecutionTree struct {
Root *ResultGroup `json:"root"`
// flat list of all control runs
ControlRuns []*ControlRun `json:"-"`
StartTime time.Time `json:"start_time"`
EndTime time.Time `json:"end_time"`
Progress *controlstatus.ControlProgress `json:"progress"`
// map of dimension property name to property value to color map
DimensionColorGenerator *DimensionColorGenerator `json:"-"`
// the current session search path
SearchPath []string `json:"-"`
Workspace *workspace.Workspace `json:"-"`
client db_common.Client
// an optional map of control names used to filter the controls which are run
controlNameFilterMap map[string]bool
}
func NewExecutionTree(ctx context.Context, workspace *workspace.Workspace, client db_common.Client, controlFilterWhereClause string, args ...string) (*ExecutionTree, error) {
if len(args) < 1 {
return nil, sperr.New("need at least one argument to create a check execution tree")
}
searchPath := client.GetRequiredSessionSearchPath()
// now populate the ExecutionTree
executionTree := &ExecutionTree{
Workspace: workspace,
client: client,
SearchPath: utils.UnquoteStringArray(searchPath),
}
// if a "--where" or "--tag" parameter was passed, build a map of control names used to filter the controls to run
// create a context with status hooks disabled
noStatusCtx := statushooks.DisableStatusHooks(ctx)
err := executionTree.populateControlFilterMap(noStatusCtx, controlFilterWhereClause)
if err != nil {
return nil, err
}
var resolvedItem modconfig.ModTreeItem
// if only one argument is provided, add this as execution root
if len(args) == 1 {
resolvedItem, err = executionTree.getExecutionRootFromArg(args[0])
if err != nil {
return nil, err
}
} else {
// for multiple items, use a root benchmark as the parent of the items
// this root benchmark will be converted to a ResultGroup that can be worked with
// this is necessary because snapshots only support a single tree item as the child of the root
items := []modconfig.ModTreeItem{}
for _, arg := range args {
item, err := executionTree.getExecutionRootFromArg(arg)
if err != nil {
return nil, err
}
items = append(items, item)
}
// create a root benchmark with `items` as it's children
resolvedItem = modconfig.NewRootBenchmarkWithChildren(workspace.Mod, items).(modconfig.ModTreeItem)
}
// build tree of result groups, starting with a synthetic 'root' node
executionTree.Root = NewRootResultGroup(ctx, executionTree, resolvedItem)
// after tree has built, ControlCount will be set - create progress rendered
executionTree.Progress = controlstatus.NewControlProgress(len(executionTree.ControlRuns))
return executionTree, nil
}
// IsExportSourceData implements ExportSourceData
func (*ExecutionTree) IsExportSourceData() {}
// AddControl checks whether control should be included in the tree
// if so, creates a ControlRun, which is added to the parent group
func (e *ExecutionTree) AddControl(ctx context.Context, control *modconfig.Control, group *ResultGroup) {
// note we use short name to determine whether to include a control
if e.ShouldIncludeControl(control.ShortName) {
// create new ControlRun with treeItem as the parent
controlRun := NewControlRun(control, group, e)
// add it into the group
group.addControl(controlRun)
// also add it into the execution tree control run list
e.ControlRuns = append(e.ControlRuns, controlRun)
}
}
func (e *ExecutionTree) Execute(ctx context.Context) error {
log.Println("[TRACE]", "begin ExecutionTree.Execute")
defer log.Println("[TRACE]", "end ExecutionTree.Execute")
e.StartTime = time.Now()
e.Progress.Start(ctx)
defer func() {
e.EndTime = time.Now()
e.Progress.Finish(ctx)
}()
// TODO should we always wait even with non custom search path?
// if there is a custom search path, wait until the first connection of each plugin has loaded
if customSearchPath := e.client.GetCustomSearchPath(); customSearchPath != nil {
if err := connection_sync.WaitForSearchPathSchemas(ctx, e.client, customSearchPath); err != nil {
return err
}
}
// the number of goroutines parallel to start
var maxParallelGoRoutines int64 = constants.DefaultMaxConnections
if viper.IsSet(constants.ArgMaxParallel) {
maxParallelGoRoutines = viper.GetInt64(constants.ArgMaxParallel)
}
// to limit the number of parallel controls go routines started
parallelismLock := semaphore.NewWeighted(maxParallelGoRoutines)
// just execute the root - it will traverse the tree
e.Root.execute(ctx, e.client, parallelismLock)
if err := e.waitForActiveRunsToComplete(ctx, parallelismLock, maxParallelGoRoutines); err != nil {
log.Printf("[WARN] timed out waiting for active runs to complete")
}
// now build map of dimension property name to property value to color map
e.DimensionColorGenerator, _ = NewDimensionColorGenerator(4, 27)
e.DimensionColorGenerator.populate(e)
return nil
}
func (e *ExecutionTree) waitForActiveRunsToComplete(ctx context.Context, parallelismLock *semaphore.Weighted, maxParallelGoRoutines int64) error {
waitCtx := ctx
// if the context was already cancelled, we must creat ea new one to use when waiting to acquire the lock
if ctx.Err() != nil {
// use a Background context - since the original context has been cancelled
// this lets us wait for the active control queries to cancel
c, cancel := context.WithTimeout(context.Background(), constants.ControlQueryCancellationTimeoutSecs*time.Second)
waitCtx = c
defer cancel()
}
// wait till we can acquire all semaphores - meaning that all active runs have finished
return parallelismLock.Acquire(waitCtx, maxParallelGoRoutines)
}
func (e *ExecutionTree) populateControlFilterMap(ctx context.Context, controlFilterWhereClause string) error {
// if we derived or were passed a where clause, run the filter
if len(controlFilterWhereClause) > 0 {
log.Println("[TRACE]", "filtering controls with", controlFilterWhereClause)
var err error
e.controlNameFilterMap, err = e.getControlMapFromWhereClause(ctx, controlFilterWhereClause)
if err != nil {
return err
}
}
return nil
}
func (e *ExecutionTree) ShouldIncludeControl(controlName string) bool {
if e.controlNameFilterMap == nil {
return true
}
_, ok := e.controlNameFilterMap[controlName]
return ok
}
// getExecutionRootFromArg resolves the arg into the execution root
// - if the arg is a control name, the root will be the Control with that name
// - if the arg is a benchmark name, the root will be the Benchmark with that name
// - if the arg is a mod name, the root will be the Mod with that name
// - if the arg is 'all' the root will be a node with all Mods as children
func (e *ExecutionTree) getExecutionRootFromArg(arg string) (modconfig.ModTreeItem, error) {
// special case handling for the string "all"
if arg == "all" {
// if the arg is "all", we want to execute all _direct_ children of the Mod
// but NOT children which come from dependency mods
// to achieve this, use a DirectChildrenModDecorator
return &DirectChildrenModDecorator{Mod: e.Workspace.Mod}, nil
}
// if the arg is the name of one of the workspace dependendencies, wrap it in DirectChildrenModDecorator
// so we only execute _its_ direct children
for _, mod := range e.Workspace.Mods {
if mod.ShortName == arg {
return &DirectChildrenModDecorator{Mod: mod}, nil
}
}
// what resource type is arg?
parsedName, err := modconfig.ParseResourceName(arg)
if err != nil {
// just log error
return nil, fmt.Errorf("failed to parse check argument '%s': %v", arg, err)
}
resource, found := e.Workspace.GetResource(parsedName)
root, ok := resource.(modconfig.ModTreeItem)
if !found || !ok {
return nil, fmt.Errorf("no resources found matching argument '%s'", arg)
}
// root item must be either a benchmark or a control
if !helpers.StringSliceContains([]string{modconfig.BlockTypeControl, modconfig.BlockTypeBenchmark}, root.BlockType()) {
return nil, fmt.Errorf("cannot execute '%s' using check, only controls and benchmarks may be run", resource.Name())
}
return root, nil
}
// Get a map of control names from the introspection table steampipe_control
// This is used to implement the 'where' control filtering
func (e *ExecutionTree) getControlMapFromWhereClause(ctx context.Context, whereClause string) (map[string]bool, error) {
// query may either be a 'where' clause, or a named query
resolvedQuery, _, err := e.Workspace.ResolveQueryAndArgsFromSQLString(whereClause)
if err != nil {
return nil, err
}
// did we in fact resolve a named query, or just return the 'name' as the query
isNamedQuery := resolvedQuery.ExecuteSQL != whereClause
// if the query is NOT a named query, we need to construct a full query by adding a select
if !isNamedQuery {
resolvedQuery.ExecuteSQL = fmt.Sprintf("select resource_name from %s where %s", constants.IntrospectionTableControl, whereClause)
}
res, err := e.client.ExecuteSync(ctx, resolvedQuery.ExecuteSQL, resolvedQuery.Args...)
if err != nil {
return nil, err
}
//
// find the "resource_name" column index
resourceNameColumnIndex := -1
for i, c := range res.Cols {
if c.Name == "resource_name" {
resourceNameColumnIndex = i
}
}
if resourceNameColumnIndex == -1 {
return nil, fmt.Errorf("the named query passed in the 'where' argument must return the 'resource_name' column")
}
var controlNames = make(map[string]bool)
for _, row := range res.Rows {
rowResult := row.(*queryresult.RowResult)
controlName := rowResult.Data[resourceNameColumnIndex].(string)
controlNames[controlName] = true
}
return controlNames, nil
}
func (e *ExecutionTree) GetAllTags() []string {
// map keep track which tags have been added as columns
tagColumnMap := make(map[string]bool)
var tagColumns []string
for _, r := range e.ControlRuns {
if r.Control.Tags != nil {
for tag := range r.Control.Tags {
if !tagColumnMap[tag] {
tagColumns = append(tagColumns, tag)
tagColumnMap[tag] = true
}
}
}
}
sort.Strings(tagColumns)
return tagColumns
}

View File

@@ -1,11 +0,0 @@
package controlexecute
import "github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
// ExecutionTreeNode is implemented by all control execution tree nodes
type ExecutionTreeNode interface {
IsExecutionTreeNode()
GetChildren() []ExecutionTreeNode
GetName() string
AsTreeNode() *dashboardtypes.SnapshotTreeNode
}

View File

@@ -1,340 +0,0 @@
package controlexecute
import (
"context"
"log"
"sort"
"sync"
"sync/atomic"
"time"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/control/controlstatus"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"golang.org/x/sync/semaphore"
)
const RootResultGroupName = "root_result_group"
// ResultGroup is a struct representing a grouping of control results
// It may correspond to a Benchmark, or some other arbitrary grouping
type ResultGroup struct {
GroupId string `json:"name" csv:"group_id"`
Title string `json:"title,omitempty" csv:"title"`
Description string `json:"description,omitempty" csv:"description"`
Tags map[string]string `json:"tags,omitempty"`
Documentation string `json:"documentation,omitempty"`
Display string `json:"display,omitempty"`
Type string `json:"type,omitempty"`
// the overall summary of the group
Summary *GroupSummary `json:"summary"`
// child result groups
Groups []*ResultGroup `json:"-"`
// child control runs
ControlRuns []*ControlRun `json:"-"`
// list of children stored as controlexecute.ExecutionTreeNode
Children []ExecutionTreeNode `json:"-"`
Severity map[string]controlstatus.StatusSummary `json:"-"`
// "benchmark"
NodeType string `json:"panel_type"`
// the control tree item associated with this group(i.e. a mod/benchmark)
GroupItem modconfig.ModTreeItem `json:"-"`
Parent *ResultGroup `json:"-"`
Duration time.Duration `json:"-"`
// a list of distinct dimension keys from descendant controls
DimensionKeys []string `json:"-"`
childrenComplete uint32
executionStartTime time.Time
// lock to prevent multiple control_runs updating this
updateLock *sync.Mutex
}
type GroupSummary struct {
Status controlstatus.StatusSummary `json:"status"`
Severity map[string]controlstatus.StatusSummary `json:"-"`
}
func NewGroupSummary() *GroupSummary {
return &GroupSummary{Severity: make(map[string]controlstatus.StatusSummary)}
}
// NewRootResultGroup creates a ResultGroup to act as the root node of a control execution tree
func NewRootResultGroup(ctx context.Context, executionTree *ExecutionTree, rootItem modconfig.ModTreeItem) *ResultGroup {
root := &ResultGroup{
GroupId: RootResultGroupName,
Groups: []*ResultGroup{},
Tags: make(map[string]string),
Summary: NewGroupSummary(),
Severity: make(map[string]controlstatus.StatusSummary),
updateLock: new(sync.Mutex),
NodeType: modconfig.BlockTypeBenchmark,
Title: rootItem.GetTitle(),
}
// if root item is a benchmark, create new result group with root as parent
if control, ok := rootItem.(*modconfig.Control); ok {
// if root item is a control, add control run
executionTree.AddControl(ctx, control, root)
} else {
// create a result group for this item
itemGroup := NewResultGroup(ctx, executionTree, rootItem, root)
root.addResultGroup(itemGroup)
}
return root
}
// NewResultGroup creates a result group from a ModTreeItem
func NewResultGroup(ctx context.Context, executionTree *ExecutionTree, treeItem modconfig.ModTreeItem, parent *ResultGroup) *ResultGroup {
group := &ResultGroup{
GroupId: treeItem.Name(),
Title: treeItem.GetTitle(),
Description: treeItem.GetDescription(),
Tags: treeItem.GetTags(),
GroupItem: treeItem,
Parent: parent,
Groups: []*ResultGroup{},
Summary: NewGroupSummary(),
Severity: make(map[string]controlstatus.StatusSummary),
updateLock: new(sync.Mutex),
NodeType: modconfig.BlockTypeBenchmark,
}
// populate additional properties (this avoids adding GetDocumentation, GetDisplay and GetType to all ModTreeItems)
switch t := treeItem.(type) {
case *modconfig.Benchmark:
group.Documentation = t.GetDocumentation()
group.Display = t.GetDisplay()
group.Type = t.GetType()
case *modconfig.Control:
group.Documentation = t.GetDocumentation()
group.Display = t.GetDisplay()
group.Type = t.GetType()
}
// add child groups for children which are benchmarks
for _, c := range treeItem.GetChildren() {
if benchmark, ok := c.(*modconfig.Benchmark); ok {
// create a result group for this item
benchmarkGroup := NewResultGroup(ctx, executionTree, benchmark, group)
// if the group has any control runs, add to tree
if benchmarkGroup.ControlRunCount() > 0 {
// create a new result group with 'group' as the parent
group.addResultGroup(benchmarkGroup)
}
}
if control, ok := c.(*modconfig.Control); ok {
executionTree.AddControl(ctx, control, group)
}
}
return group
}
func (r *ResultGroup) AllTagKeys() []string {
tags := []string{}
for k := range r.Tags {
tags = append(tags, k)
}
for _, child := range r.Groups {
tags = append(tags, child.AllTagKeys()...)
}
for _, run := range r.ControlRuns {
for k := range run.Control.Tags {
tags = append(tags, k)
}
}
tags = helpers.StringSliceDistinct(tags)
sort.Strings(tags)
return tags
}
// GetGroupByName finds an immediate child ResultGroup with a specific name
func (r *ResultGroup) GetGroupByName(name string) *ResultGroup {
for _, group := range r.Groups {
if group.GroupId == name {
return group
}
}
return nil
}
// GetChildGroupByName finds a nested child ResultGroup with a specific name
func (r *ResultGroup) GetChildGroupByName(name string) *ResultGroup {
for _, group := range r.Groups {
if group.GroupId == name {
return group
}
if child := group.GetChildGroupByName(name); child != nil {
return child
}
}
return nil
}
// GetControlRunByName finds a child ControlRun with a specific control name
func (r *ResultGroup) GetControlRunByName(name string) *ControlRun {
for _, run := range r.ControlRuns {
if run.Control.Name() == name {
return run
}
}
return nil
}
func (r *ResultGroup) ControlRunCount() int {
count := len(r.ControlRuns)
for _, g := range r.Groups {
count += g.ControlRunCount()
}
return count
}
// IsSnapshotPanel implements SnapshotPanel
func (*ResultGroup) IsSnapshotPanel() {}
// IsExecutionTreeNode implements ExecutionTreeNode
func (*ResultGroup) IsExecutionTreeNode() {}
// GetChildren implements ExecutionTreeNode
func (r *ResultGroup) GetChildren() []ExecutionTreeNode { return r.Children }
// GetName implements ExecutionTreeNode
func (r *ResultGroup) GetName() string { return r.GroupId }
// AsTreeNode implements ExecutionTreeNode
func (r *ResultGroup) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
res := &dashboardtypes.SnapshotTreeNode{
Name: r.GroupId,
Children: make([]*dashboardtypes.SnapshotTreeNode, len(r.Children)),
NodeType: r.NodeType,
}
for i, c := range r.Children {
res.Children[i] = c.AsTreeNode()
}
return res
}
// add result group into our list, and also add a tree node into our child list
func (r *ResultGroup) addResultGroup(group *ResultGroup) {
r.Groups = append(r.Groups, group)
r.Children = append(r.Children, group)
}
// add control into our list, and also add a tree node into our child list
func (r *ResultGroup) addControl(controlRun *ControlRun) {
r.ControlRuns = append(r.ControlRuns, controlRun)
r.Children = append(r.Children, controlRun)
}
func (r *ResultGroup) addDimensionKeys(keys ...string) {
r.updateLock.Lock()
defer r.updateLock.Unlock()
r.DimensionKeys = append(r.DimensionKeys, keys...)
if r.Parent != nil {
r.Parent.addDimensionKeys(keys...)
}
r.DimensionKeys = helpers.StringSliceDistinct(r.DimensionKeys)
sort.Strings(r.DimensionKeys)
}
// onChildDone is a callback that gets called from the children of this result group when they are done
func (r *ResultGroup) onChildDone() {
newCount := atomic.AddUint32(&r.childrenComplete, 1)
totalCount := uint32(len(r.ControlRuns) + len(r.Groups))
if newCount < totalCount {
// all children haven't finished execution yet
return
}
// all children are done
r.Duration = time.Since(r.executionStartTime)
if r.Parent != nil {
r.Parent.onChildDone()
}
}
func (r *ResultGroup) updateSummary(summary *controlstatus.StatusSummary) {
r.updateLock.Lock()
defer r.updateLock.Unlock()
r.Summary.Status.Skip += summary.Skip
r.Summary.Status.Alarm += summary.Alarm
r.Summary.Status.Info += summary.Info
r.Summary.Status.Ok += summary.Ok
r.Summary.Status.Error += summary.Error
if r.Parent != nil {
r.Parent.updateSummary(summary)
}
}
func (r *ResultGroup) updateSeverityCounts(severity string, summary *controlstatus.StatusSummary) {
r.updateLock.Lock()
defer r.updateLock.Unlock()
val, exists := r.Severity[severity]
if !exists {
val = controlstatus.StatusSummary{}
}
val.Alarm += summary.Alarm
val.Error += summary.Error
val.Info += summary.Info
val.Ok += summary.Ok
val.Skip += summary.Skip
r.Summary.Severity[severity] = val
if r.Parent != nil {
r.Parent.updateSeverityCounts(severity, summary)
}
}
func (r *ResultGroup) execute(ctx context.Context, client db_common.Client, parallelismLock *semaphore.Weighted) {
log.Printf("[TRACE] begin ResultGroup.Execute: %s\n", r.GroupId)
defer log.Printf("[TRACE] end ResultGroup.Execute: %s\n", r.GroupId)
r.executionStartTime = time.Now()
for _, controlRun := range r.ControlRuns {
if error_helpers.IsContextCanceled(ctx) {
controlRun.setError(ctx, ctx.Err())
continue
}
if viper.GetBool(constants.ArgDryRun) {
controlRun.skip(ctx)
continue
}
err := parallelismLock.Acquire(ctx, 1)
if err != nil {
controlRun.setError(ctx, err)
continue
}
go executeRun(ctx, controlRun, parallelismLock, client)
}
for _, child := range r.Groups {
child.execute(ctx, client, parallelismLock)
}
}
func executeRun(ctx context.Context, run *ControlRun, parallelismLock *semaphore.Weighted, client db_common.Client) {
defer func() {
if r := recover(); r != nil {
// if the Execute panic'ed, set it as an error
run.setError(ctx, helpers.ToError(r))
}
// Release in defer, so that we don't retain the lock even if there's a panic inside
parallelismLock.Release(1)
}()
run.execute(ctx, client)
}

View File

@@ -1,144 +0,0 @@
package controlexecute
import (
"fmt"
"github.com/turbot/go-kit/helpers"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/query/queryresult"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
)
type ResultRows []*ResultRow
// ToLeafData converts the result rows to snapshot data format
func (r ResultRows) ToLeafData(dimensionSchema map[string]*queryresult.ColumnDef) *dashboardtypes.LeafData {
var res = &dashboardtypes.LeafData{
Columns: []*queryresult.ColumnDef{
{Name: "reason", DataType: "TEXT"},
{Name: "resource", DataType: "TEXT"},
{Name: "status", DataType: "TEXT"},
},
Rows: make([]map[string]interface{}, len(r)),
}
for _, d := range dimensionSchema {
res.Columns = append(res.Columns, d)
}
for i, row := range r {
res.Rows[i] = map[string]interface{}{
"reason": row.Reason,
"resource": row.Resource,
"status": row.Status,
}
// flatten dimensions
for _, d := range row.Dimensions {
res.Rows[i][d.Key] = d.Value
}
}
return res
}
// ResultRow is the result of a control execution for a single resource
type ResultRow struct {
// reason for the status
Reason string `json:"reason" csv:"reason"`
// resource name
Resource string `json:"resource" csv:"resource"`
// status of the row (ok, info, alarm, error, skip)
Status string `json:"status" csv:"status"`
// dimensions for this row
Dimensions []Dimension `json:"dimensions"`
// parent control run
Run *ControlRun `json:"-"`
// source control
Control *modconfig.Control `json:"-" csv:"control_id:UnqualifiedName,control_title:Title,control_description:Description"`
}
// GetDimensionValue returns the value for a dimension key. Returns an empty string with 'false' if not found
func (r *ResultRow) GetDimensionValue(key string) string {
for _, dim := range r.Dimensions {
if dim.Key == key {
return dim.Value
}
}
return ""
}
// AddDimension checks whether a column value is a scalar type, and if so adds it to the Dimensions map
func (r *ResultRow) AddDimension(c *queryresult.ColumnDef, val interface{}) {
r.Dimensions = append(r.Dimensions, Dimension{
Key: c.Name,
Value: typehelpers.ToString(val),
SqlType: c.DataType,
})
}
func NewResultRow(run *ControlRun, row *queryresult.RowResult, cols []*queryresult.ColumnDef) (*ResultRow, error) {
// validate the required columns exist in the result
if err := validateColumns(cols); err != nil {
return nil, err
}
res := &ResultRow{
Run: run,
Control: run.Control,
}
// was there a SQL error _executing the control
// Note: this is different from the control state being 'error'
if row.Error != nil {
return nil, row.Error
}
for i, c := range cols {
switch c.Name {
case "reason":
res.Reason = typehelpers.ToString(row.Data[i])
case "resource":
res.Resource = typehelpers.ToString(row.Data[i])
case "status":
status := typehelpers.ToString(row.Data[i])
if !IsValidControlStatus(status) {
return nil, fmt.Errorf("invalid control status '%s'", status)
}
res.Status = status
default:
// if this is a scalar type, add to dimensions
val := row.Data[i]
// isScalar may mutate the ColumnDef struct by lazily populating the internal isScalar property
if c.IsScalar(val) {
res.AddDimension(c, val)
}
}
}
return res, nil
}
func IsValidControlStatus(status string) bool {
return helpers.StringSliceContains([]string{constants.ControlOk, constants.ControlAlarm, constants.ControlInfo, constants.ControlError, constants.ControlSkip}, status)
}
func validateColumns(cols []*queryresult.ColumnDef) error {
requiredColumns := []string{"reason", "resource", "status"}
var missingColumns []string
for _, col := range requiredColumns {
if !columnTypesContainsColumn(col, cols) {
missingColumns = append(missingColumns, col)
}
}
if len(missingColumns) > 0 {
return fmt.Errorf("control result is missing required %s: %v", utils.Pluralize("column", len(missingColumns)), missingColumns)
}
return nil
}
func columnTypesContainsColumn(col string, colTypes []*queryresult.ColumnDef) bool {
for _, ct := range colTypes {
if ct.Name == col {
return true
}
}
return false
}

View File

@@ -1,52 +0,0 @@
package controlstatus
import (
"context"
"github.com/turbot/steampipe/pkg/contexthelpers"
)
var (
contextKeyControlHook = contexthelpers.ContextKey("control_hook")
)
func AddControlHooksToContext(ctx context.Context, statusHooks ControlHooks) context.Context {
// if the context already contains ControlHooks, do nothing
// this may happen when executing a dashboard snapshot -
if _, ok := ctx.Value(contextKeyControlHook).(ControlHooks); ok {
return ctx
}
return context.WithValue(ctx, contextKeyControlHook, statusHooks)
}
func ControlHooksFromContext(ctx context.Context) ControlHooks {
if ctx == nil {
return NullHooks
}
if val, ok := ctx.Value(contextKeyControlHook).(ControlHooks); ok {
return val
}
// no status hook in context - return null status hook
return NullHooks
}
func OnStart(ctx context.Context, p *ControlProgress) {
ControlHooksFromContext(ctx).OnStart(ctx, p)
}
func OnControlStart(ctx context.Context, controlRun ControlRunStatusProvider, p *ControlProgress) {
ControlHooksFromContext(ctx).OnControlStart(ctx, controlRun, p)
}
func OnControlComplete(ctx context.Context, controlRun ControlRunStatusProvider, p *ControlProgress) {
ControlHooksFromContext(ctx).OnControlComplete(ctx, controlRun, p)
}
func OnControlError(ctx context.Context, controlRun ControlRunStatusProvider, p *ControlProgress) {
ControlHooksFromContext(ctx).OnControlError(ctx, controlRun, p)
}
func OnComplete(ctx context.Context, p *ControlProgress) {
ControlHooksFromContext(ctx).OnComplete(ctx, p)
}

View File

@@ -1,13 +0,0 @@
package controlstatus
import (
"context"
)
type ControlHooks interface {
OnStart(context.Context, *ControlProgress)
OnControlStart(context.Context, ControlRunStatusProvider, *ControlProgress)
OnControlComplete(context.Context, ControlRunStatusProvider, *ControlProgress)
OnControlError(context.Context, ControlRunStatusProvider, *ControlProgress)
OnComplete(context.Context, *ControlProgress)
}

View File

@@ -1,19 +0,0 @@
package controlstatus
import (
"context"
)
var NullHooks = &NullControlHook{}
type NullControlHook struct{}
func (*NullControlHook) OnStart(context.Context, *ControlProgress) {
}
func (*NullControlHook) OnControlStart(context.Context, ControlRunStatusProvider, *ControlProgress) {
}
func (*NullControlHook) OnControlComplete(context.Context, ControlRunStatusProvider, *ControlProgress) {
}
func (*NullControlHook) OnControlError(context.Context, ControlRunStatusProvider, *ControlProgress) {
}
func (*NullControlHook) OnComplete(context.Context, *ControlProgress) {}

View File

@@ -1,36 +0,0 @@
package controlstatus
import (
"context"
"github.com/spf13/viper"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/statushooks"
)
// SnapshotControlHooks is a struct which implements ControlHooks, and displays the control progress as a status message
type SnapshotControlHooks struct {
Enabled bool
}
func NewSnapshotControlHooks() *SnapshotControlHooks {
return &SnapshotControlHooks{
Enabled: viper.GetBool(constants.ArgProgress),
}
}
func (c *SnapshotControlHooks) OnStart(context.Context, *ControlProgress) {
}
func (c *SnapshotControlHooks) OnControlStart(context.Context, ControlRunStatusProvider, *ControlProgress) {
}
func (c *SnapshotControlHooks) OnControlComplete(ctx context.Context, _ ControlRunStatusProvider, progress *ControlProgress) {
statushooks.UpdateSnapshotProgress(ctx, progress.StatusSummaries.TotalCount())
}
func (c *SnapshotControlHooks) OnControlError(ctx context.Context, _ ControlRunStatusProvider, _ *ControlProgress) {
statushooks.SnapshotError(ctx)
}
func (c *SnapshotControlHooks) OnComplete(_ context.Context, _ *ControlProgress) {
}

View File

@@ -1,77 +0,0 @@
package controlstatus
import (
"context"
"fmt"
"github.com/spf13/viper"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/utils"
)
// StatusControlHooks is a struct which implements ControlHooks, and displays the control progress as a status message
type StatusControlHooks struct {
Enabled bool
}
func NewStatusControlHooks() *StatusControlHooks {
return &StatusControlHooks{
Enabled: viper.GetBool(constants.ArgProgress),
}
}
func (c *StatusControlHooks) OnStart(ctx context.Context, _ *ControlProgress) {
if !c.Enabled {
return
}
statushooks.SetStatus(ctx, "Starting controls…")
statushooks.Show(ctx)
}
func (c *StatusControlHooks) OnControlStart(ctx context.Context, _ ControlRunStatusProvider, p *ControlProgress) {
if !c.Enabled {
return
}
c.setStatusFromProgress(ctx, p)
}
func (c *StatusControlHooks) OnControlComplete(ctx context.Context, _ ControlRunStatusProvider, p *ControlProgress) {
if !c.Enabled {
return
}
c.setStatusFromProgress(ctx, p)
}
func (c *StatusControlHooks) OnControlError(ctx context.Context, _ ControlRunStatusProvider, p *ControlProgress) {
if !c.Enabled {
return
}
c.setStatusFromProgress(ctx, p)
}
func (c *StatusControlHooks) OnComplete(ctx context.Context, _ *ControlProgress) {
if !c.Enabled {
return
}
statushooks.Done(ctx)
}
func (c *StatusControlHooks) setStatusFromProgress(ctx context.Context, p *ControlProgress) {
message := fmt.Sprintf("Running %d %s. (%d complete, %d running, %d pending, %d %s)",
p.Total,
utils.Pluralize("control", p.Total),
p.Complete,
p.Executing,
p.Pending,
p.Error,
utils.Pluralize("error", p.Error),
)
statushooks.SetStatus(ctx, message)
}

View File

@@ -1,10 +0,0 @@
package controlstatus
import "github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
// ControlRunStatusProvider is an interface used to allow us to pass a control as the payload of ControlComplete and ControlError events -
type ControlRunStatusProvider interface {
GetControlId() string
GetRunStatus() dashboardtypes.RunStatus
GetStatusSummary() *StatusSummary
}

View File

@@ -1,69 +0,0 @@
package controlstatus
import (
"context"
"sync"
)
type ControlProgress struct {
updateLock *sync.Mutex
Total int `json:"total"`
Pending int `json:"pending"`
Complete int `json:"complete"`
Error int `json:"error"`
Executing int `json:"executing"`
StatusSummaries *StatusSummary `json:"summary"`
}
func NewControlProgress(total int) *ControlProgress {
return &ControlProgress{
updateLock: &sync.Mutex{},
Total: total,
Pending: total,
StatusSummaries: &StatusSummary{},
}
}
func (p *ControlProgress) Start(ctx context.Context) {
p.updateLock.Lock()
defer p.updateLock.Unlock()
OnStart(ctx, p)
}
func (p *ControlProgress) OnControlStart(ctx context.Context, controlRun ControlRunStatusProvider) {
p.updateLock.Lock()
defer p.updateLock.Unlock()
// increment the parallel execution count
p.Executing++
// decrement pending count
p.Pending--
OnControlStart(ctx, controlRun, p)
}
func (p *ControlProgress) OnControlComplete(ctx context.Context, controlRun ControlRunStatusProvider) {
p.updateLock.Lock()
defer p.updateLock.Unlock()
p.Complete++
// decrement the parallel execution count
p.Executing--
p.StatusSummaries.Merge(controlRun.GetStatusSummary())
OnControlComplete(ctx, controlRun, p)
}
func (p *ControlProgress) OnControlError(ctx context.Context, controlRun ControlRunStatusProvider) {
p.updateLock.Lock()
defer p.updateLock.Unlock()
p.Error++
// decrement the parallel execution count
p.Executing--
p.StatusSummaries.Merge(controlRun.GetStatusSummary())
OnControlError(ctx, controlRun, p)
}
func (p *ControlProgress) Finish(ctx context.Context) {
OnComplete(ctx, p)
}

View File

@@ -1,30 +0,0 @@
package controlstatus
// StatusSummary is a struct containing the counts of each possible control status
type StatusSummary struct {
Alarm int `json:"alarm"`
Ok int `json:"ok"`
Info int `json:"info"`
Skip int `json:"skip"`
Error int `json:"error"`
}
func (s *StatusSummary) PassedCount() int {
return s.Ok + s.Info
}
func (s *StatusSummary) FailedCount() int {
return s.Alarm + s.Error
}
func (s *StatusSummary) TotalCount() int {
return s.Alarm + s.Ok + s.Info + s.Skip + s.Error
}
func (s *StatusSummary) Merge(summary *StatusSummary) {
s.Alarm += summary.Alarm
s.Ok += summary.Ok
s.Info += summary.Info
s.Skip += summary.Skip
s.Error += summary.Error
}

View File

@@ -1,18 +0,0 @@
package dashboardevents
import (
"github.com/turbot/steampipe/pkg/control/controlstatus"
"time"
)
type ControlComplete struct {
Progress *controlstatus.ControlProgress
Control controlstatus.ControlRunStatusProvider
Name string
Session string
ExecutionId string
Timestamp time.Time
}
// IsDashboardEvent implements DashboardEvent interface
func (*ControlComplete) IsDashboardEvent() {}

View File

@@ -1,18 +0,0 @@
package dashboardevents
import (
"github.com/turbot/steampipe/pkg/control/controlstatus"
"time"
)
type ControlError struct {
Control controlstatus.ControlRunStatusProvider
Progress *controlstatus.ControlProgress
Name string
Session string
ExecutionId string
Timestamp time.Time
}
// IsDashboardEvent implements DashboardEvent interface
func (*ControlError) IsDashboardEvent() {}

View File

@@ -1,126 +0,0 @@
package dashboardexecute
import (
"context"
"github.com/turbot/steampipe/pkg/control/controlexecute"
"github.com/turbot/steampipe/pkg/control/controlstatus"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
)
// CheckRun is a struct representing the execution of a control or benchmark
type CheckRun struct {
DashboardParentImpl
Summary *controlexecute.GroupSummary `json:"summary"`
SessionId string `json:"-"`
// if the dashboard node is a control, serialise to json as 'properties'
Control *modconfig.Control `json:"properties,omitempty"`
Root controlexecute.ExecutionTreeNode `json:"-"`
controlExecutionTree *controlexecute.ExecutionTree
}
func (r *CheckRun) AsTreeNode() *dashboardtypes.SnapshotTreeNode {
return r.Root.AsTreeNode()
}
func NewCheckRun(resource modconfig.DashboardLeafNode, parent dashboardtypes.DashboardParent, executionTree *DashboardExecutionTree) (*CheckRun, error) {
c := &CheckRun{SessionId: executionTree.sessionId}
// create NewDashboardTreeRunImpl
// (we must create after creating the run as it requires a ref to the run)
c.DashboardParentImpl = newDashboardParentImpl(resource, parent, c, executionTree)
c.NodeType = resource.BlockType()
// set status to initialized
c.Status = dashboardtypes.RunInitialized
// add r into execution tree
executionTree.runs[c.Name] = c
return c, nil
}
// Initialise implements DashboardTreeRun
func (r *CheckRun) Initialise(ctx context.Context) {
// build control execution tree during init, rather than in Execute, so that it is populated when the ExecutionStarted event is sent
controlFilterWhereClause := ""
executionTree, err := controlexecute.NewExecutionTree(ctx, r.executionTree.workspace, r.executionTree.client, controlFilterWhereClause, r.resource.Name())
if err != nil {
// set the error status on the counter - this will raise counter error event
r.SetError(ctx, err)
return
}
r.controlExecutionTree = executionTree
r.Root = executionTree.Root.Children[0]
}
// Execute implements DashboardTreeRun
func (r *CheckRun) Execute(ctx context.Context) {
utils.LogTime("CheckRun.execute start")
defer utils.LogTime("CheckRun.execute end")
// set status to running (this sends update event)
r.setRunning(ctx)
// create a context with a DashboardEventControlHooks to report control execution progress
ctx = controlstatus.AddControlHooksToContext(ctx, NewDashboardEventControlHooks(r))
if err := r.controlExecutionTree.Execute(ctx); err != nil {
r.SetError(ctx, err)
return
}
// set the summary on the CheckRun
r.Summary = r.controlExecutionTree.Root.Summary
// set complete status on counter - this will raise counter complete event
r.SetComplete(ctx)
}
// ChildrenComplete implements DashboardTreeRun (override base)
func (r *CheckRun) ChildrenComplete() bool {
return r.RunComplete()
}
// IsSnapshotPanel implements SnapshotPanel
func (*CheckRun) IsSnapshotPanel() {}
// SetError implements DashboardTreeRun (override to set snapshothook status
func (r *CheckRun) SetError(ctx context.Context, err error) {
// increment error count for snapshot hook
statushooks.SnapshotError(ctx)
r.DashboardTreeRunImpl.SetError(ctx, err)
}
// SetComplete implements DashboardTreeRun (override to set snapshothook status
func (r *CheckRun) SetComplete(ctx context.Context) {
// call snapshot hooks with progress
statushooks.UpdateSnapshotProgress(ctx, 1)
r.DashboardTreeRunImpl.SetComplete(ctx)
}
// BuildSnapshotPanels is a custom implementation of BuildSnapshotPanels - be nice to just use the DashboardExecutionTree but work is needed on common interface types/generics
func (r *CheckRun) BuildSnapshotPanels(leafNodeMap map[string]dashboardtypes.SnapshotPanel) map[string]dashboardtypes.SnapshotPanel {
// if this check run is for a control, just add the controlRUn
if controlRun, ok := r.Root.(*controlexecute.ControlRun); ok {
leafNodeMap[controlRun.Control.Name()] = controlRun
return leafNodeMap
}
leafNodeMap[r.GetName()] = r
return r.buildSnapshotPanelsUnder(r.Root, leafNodeMap)
}
func (r *CheckRun) buildSnapshotPanelsUnder(parent controlexecute.ExecutionTreeNode, res map[string]dashboardtypes.SnapshotPanel) map[string]dashboardtypes.SnapshotPanel {
for _, c := range parent.GetChildren() {
// if this node is a snapshot node, add to map
if snapshotNode, ok := c.(dashboardtypes.SnapshotPanel); ok {
res[c.GetName()] = snapshotNode
}
res = r.buildSnapshotPanelsUnder(c, res)
}
return res
}

View File

@@ -59,11 +59,6 @@ func NewDashboardContainerRun(container *modconfig.DashboardContainer, parent da
if err != nil {
return nil, err
}
case *modconfig.Benchmark, *modconfig.Control:
childRun, err = NewCheckRun(i.(modconfig.DashboardLeafNode), r, executionTree)
if err != nil {
return nil, err
}
default:
// ensure this item is a DashboardLeafNode

View File

@@ -1,56 +0,0 @@
package dashboardexecute
import (
"context"
"github.com/turbot/steampipe/pkg/control/controlstatus"
"time"
"github.com/turbot/steampipe/pkg/dashboard/dashboardevents"
)
// DashboardEventControlHooks is a struct which implements ControlHooks,
// and raises ControlComplete and ControlError dashboard events
type DashboardEventControlHooks struct {
CheckRun *CheckRun
}
func NewDashboardEventControlHooks(r *CheckRun) *DashboardEventControlHooks {
return &DashboardEventControlHooks{
CheckRun: r,
}
}
func (c *DashboardEventControlHooks) OnStart(ctx context.Context, _ *controlstatus.ControlProgress) {
// nothing to do
}
func (c *DashboardEventControlHooks) OnControlStart(context.Context, controlstatus.ControlRunStatusProvider, *controlstatus.ControlProgress) {
}
func (c *DashboardEventControlHooks) OnControlComplete(ctx context.Context, controlRun controlstatus.ControlRunStatusProvider, progress *controlstatus.ControlProgress) {
event := &dashboardevents.ControlComplete{
Control: controlRun,
Progress: progress,
Name: c.CheckRun.Name,
ExecutionId: c.CheckRun.executionTree.id,
Session: c.CheckRun.SessionId,
Timestamp: time.Now(),
}
c.CheckRun.executionTree.workspace.PublishDashboardEvent(ctx, event)
}
func (c *DashboardEventControlHooks) OnControlError(ctx context.Context, controlRun controlstatus.ControlRunStatusProvider, progress *controlstatus.ControlProgress) {
var event = &dashboardevents.ControlError{
Control: controlRun,
Progress: progress,
Name: c.CheckRun.Name,
ExecutionId: c.CheckRun.executionTree.id,
Session: c.CheckRun.SessionId,
Timestamp: time.Now(),
}
c.CheckRun.executionTree.workspace.PublishDashboardEvent(ctx, event)
}
func (c *DashboardEventControlHooks) OnComplete(ctx context.Context, _ *controlstatus.ControlProgress) {
// nothing to do - LeafNodeDone will be sent anyway
}

View File

@@ -77,18 +77,6 @@ func (e *DashboardExecutionTree) createRootItem(rootName string) (dashboardtypes
rootName = fullName
}
switch parsedName.ItemType {
case modconfig.BlockTypeDashboard:
dashboard, ok := e.workspace.GetResourceMaps().Dashboards[rootName]
if !ok {
return nil, fmt.Errorf("dashboard '%s' does not exist in workspace", rootName)
}
return NewDashboardRun(dashboard, e, e)
case modconfig.BlockTypeBenchmark:
benchmark, ok := e.workspace.GetResourceMaps().Benchmarks[rootName]
if !ok {
return nil, fmt.Errorf("benchmark '%s' does not exist in workspace", rootName)
}
return NewCheckRun(benchmark, e, e)
case modconfig.BlockTypeQuery:
// wrap in a table
query, ok := e.workspace.GetResourceMaps().Queries[rootName]
@@ -103,18 +91,6 @@ func (e *DashboardExecutionTree) createRootItem(rootName string) (dashboardtypes
return nil, err
}
return NewDashboardRun(dashboard, e, e)
case modconfig.BlockTypeControl:
// wrap in a table
control, ok := e.workspace.GetResourceMaps().Controls[rootName]
if !ok {
return nil, fmt.Errorf("query '%s' does not exist in workspace", rootName)
}
// wrap this in a chart and a dashboard
dashboard, err := modconfig.NewQueryDashboard(control)
if err != nil {
return nil, err
}
return NewDashboardRun(dashboard, e, e)
default:
return nil, fmt.Errorf("reporting type %s cannot be executed as dashboard", parsedName.ItemType)
}
@@ -274,13 +250,6 @@ func (e *DashboardExecutionTree) BuildSnapshotPanels() map[string]dashboardtypes
for name, run := range e.runs {
res[name] = run.(dashboardtypes.SnapshotPanel)
// special case handling for check runs
if checkRun, ok := run.(*CheckRun); ok {
checkRunChildren := checkRun.BuildSnapshotPanels(res)
for k, v := range checkRunChildren {
res[k] = v
}
}
}
return res
}

View File

@@ -134,11 +134,6 @@ func (r *DashboardRun) createChildRuns(executionTree *DashboardExecutionTree) er
if err != nil {
return err
}
case *modconfig.Benchmark, *modconfig.Control:
childRun, err = NewCheckRun(i.(modconfig.DashboardLeafNode), r, executionTree)
if err != nil {
return err
}
case *modconfig.DashboardInput:
// NOTE: clone the input to avoid mutating the original
// TODO remove the need for this when we refactor input values resolution

View File

@@ -45,21 +45,6 @@ func GetReferencedVariables(root dashboardtypes.DashboardTreeRun, w *workspace.W
return true, nil
},
)
case *CheckRun:
switch n := r.resource.(type) {
case *modconfig.Benchmark:
//nolint:errcheck // we don't care about errors here, since the callback does not return an error
n.WalkResources(
func(resource modconfig.ModTreeItem) (bool, error) {
if resourceWithMetadata, ok := resource.(modconfig.ResourceWithMetadata); ok {
addReferencedVars(resourceWithMetadata.GetReferences())
}
return true, nil
},
)
case *modconfig.Control:
addReferencedVars(n.GetReferences())
}
}
return referencedVariables