remove pkg/dashboard/dashboardserver

This commit is contained in:
Puskar Basu
2024-05-30 16:17:17 +05:30
parent 824b368532
commit 608a8717e8
5 changed files with 0 additions and 1053 deletions

View File

@@ -1,82 +0,0 @@
package dashboardserver
import (
"context"
"fmt"
"log"
"net/http"
"path"
"time"
"github.com/gin-contrib/static"
"github.com/gin-gonic/gin"
"github.com/spf13/viper"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/filepaths"
"gopkg.in/olahol/melody.v1"
)
func startAPIAsync(ctx context.Context, webSocket *melody.Melody) chan struct{} {
doneChan := make(chan struct{})
go func() {
gin.SetMode(gin.ReleaseMode)
router := gin.New()
// only add the Recovery middleware
router.Use(gin.Recovery())
assetsDirectory := filepaths.EnsureDashboardAssetsDir()
router.Use(static.Serve("/", static.LocalFile(assetsDirectory, true)))
router.GET("/ws", func(c *gin.Context) {
webSocket.HandleRequest(c.Writer, c.Request)
})
router.NoRoute(func(c *gin.Context) {
// https://stackoverflow.com/questions/49547/how-do-we-control-web-page-caching-across-all-browsers
c.Header("Cache-Control", "no-cache, no-store, must-revalidate") // HTTP 1.1.
c.Header("Pragma", "no-cache") // HTTP 1.0.
c.Header("Expires", "0") // Proxies.
c.File(path.Join(assetsDirectory, "index.html"))
})
dashboardServerPort := viper.GetInt(constants.ArgDashboardPort)
dashboardServerListen := "localhost"
if viper.GetString(constants.ArgDashboardListen) == string(ListenTypeNetwork) {
dashboardServerListen = ""
}
srv := &http.Server{
Addr: fmt.Sprintf("%s:%d", dashboardServerListen, dashboardServerPort),
Handler: router,
}
go func() {
// service connections
if err := srv.ListenAndServe(); err != nil {
log.Printf("listen: %s\n", err)
}
}()
outputReady(ctx, fmt.Sprintf("Dashboard server started on %d and listening on %s", dashboardServerPort, viper.GetString(constants.ArgDashboardListen)))
OutputMessage(ctx, fmt.Sprintf("Visit http://localhost:%d", dashboardServerPort))
OutputMessage(ctx, "Press Ctrl+C to exit")
<-ctx.Done()
log.Println("Shutdown Server…")
shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
if err := srv.Shutdown(shutdownCtx); err != nil {
error_helpers.ShowErrorWithMessage(ctx, err, "Server shutdown failed")
}
log.Println("[TRACE] Server exiting")
// indicate the API server is done
doneChan <- struct{}{}
}()
return doneChan
}

View File

@@ -1,75 +0,0 @@
package dashboardserver
import (
"context"
"fmt"
"io"
"os"
"path/filepath"
"time"
"github.com/fatih/color"
"github.com/mattn/go-isatty"
"github.com/spf13/viper"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/filepaths"
)
var logSink io.Writer
const (
errorPrefix = "[ Error ]"
messagePrefix = "[ Message ]"
readyPrefix = "[ Ready ]"
waitPrefix = "[ Wait ]"
)
func initLogSink() {
if viper.GetBool(constants.ArgServiceMode) {
logName := fmt.Sprintf("dashboard-%s.log", time.Now().Format("2006-01-02"))
logPath := filepath.Join(filepaths.EnsureLogDir(), logName)
f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666)
if err != nil {
fmt.Printf("failed to open dashboard manager log file: %s\n", err.Error())
os.Exit(3)
}
logSink = f
} else {
logSink = os.Stdout
}
}
func output(_ context.Context, prefix string, msg interface{}) {
if logSink == nil {
logSink = os.Stdout
}
fmt.Fprintf(logSink, "%s %v\n", prefix, msg)
}
func OutputMessage(ctx context.Context, msg string) {
output(ctx, applyColor(messagePrefix, color.HiGreenString), msg)
}
func OutputWarning(ctx context.Context, msg string) {
output(ctx, applyColor(messagePrefix, color.RedString), msg)
}
func OutputError(ctx context.Context, err error) {
output(ctx, applyColor(errorPrefix, color.RedString), err)
}
func outputReady(ctx context.Context, msg string) {
output(ctx, applyColor(readyPrefix, color.GreenString), msg)
}
func OutputWait(ctx context.Context, msg string) {
output(ctx, applyColor(waitPrefix, color.CyanString), msg)
}
func applyColor(str string, color func(format string, a ...interface{}) string) string {
if !isatty.IsTerminal(os.Stdout.Fd()) || viper.GetBool(constants.ArgServiceMode) {
return str
} else {
return color((str))
}
}

View File

@@ -1,244 +0,0 @@
package dashboardserver
import (
"encoding/json"
"fmt"
"github.com/spf13/viper"
typeHelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/dashboard/dashboardevents"
"github.com/turbot/steampipe/pkg/dashboard/dashboardexecute"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/version"
)
func buildDashboardMetadataPayload(workspaceResources *modconfig.ResourceMaps, cloudMetadata *steampipeconfig.CloudMetadata) ([]byte, error) {
installedMods := make(map[string]ModDashboardMetadata)
for _, mod := range workspaceResources.Mods {
// Ignore current mod
if mod.FullName == workspaceResources.Mod.FullName {
continue
}
installedMods[mod.FullName] = ModDashboardMetadata{
Title: typeHelpers.SafeString(mod.Title),
FullName: mod.FullName,
ShortName: mod.ShortName,
}
}
payload := DashboardMetadataPayload{
Action: "dashboard_metadata",
Metadata: DashboardMetadata{
CLI: DashboardCLIMetadata{
Version: version.VersionString,
},
InstalledMods: installedMods,
Telemetry: viper.GetString(constants.ArgTelemetry),
},
}
if mod := workspaceResources.Mod; mod != nil {
payload.Metadata.Mod = &ModDashboardMetadata{
Title: typeHelpers.SafeString(mod.Title),
FullName: mod.FullName,
ShortName: mod.ShortName,
}
}
// if telemetry is enabled, send cloud metadata
if payload.Metadata.Telemetry != constants.TelemetryNone {
payload.Metadata.Cloud = cloudMetadata
}
return json.Marshal(payload)
}
func addBenchmarkChildren(benchmark *modconfig.Benchmark, recordTrunk bool, trunk []string, trunks map[string][][]string) []ModAvailableBenchmark {
var children []ModAvailableBenchmark
for _, child := range benchmark.GetChildren() {
switch t := child.(type) {
case *modconfig.Benchmark:
childTrunk := make([]string, len(trunk)+1)
copy(childTrunk, trunk)
childTrunk[len(childTrunk)-1] = t.FullName
if recordTrunk {
trunks[t.FullName] = append(trunks[t.FullName], childTrunk)
}
availableBenchmark := ModAvailableBenchmark{
Title: t.GetTitle(),
FullName: t.FullName,
ShortName: t.ShortName,
Tags: t.Tags,
Children: addBenchmarkChildren(t, recordTrunk, childTrunk, trunks),
}
children = append(children, availableBenchmark)
}
}
return children
}
func buildAvailableDashboardsPayload(workspaceResources *modconfig.ResourceMaps) ([]byte, error) {
payload := AvailableDashboardsPayload{
Action: "available_dashboards",
Dashboards: make(map[string]ModAvailableDashboard),
Benchmarks: make(map[string]ModAvailableBenchmark),
Snapshots: workspaceResources.Snapshots,
}
// if workspace resources has a mod, populate dashboards and benchmarks
if workspaceResources.Mod != nil {
// build a map of the dashboards provided by each mod
// iterate over the dashboards for the top level mod - this will include the dashboards from dependency mods
for _, dashboard := range workspaceResources.Mod.ResourceMaps.Dashboards {
mod := dashboard.Mod
// add this dashboard
payload.Dashboards[dashboard.FullName] = ModAvailableDashboard{
Title: typeHelpers.SafeString(dashboard.Title),
FullName: dashboard.FullName,
ShortName: dashboard.ShortName,
Tags: dashboard.Tags,
ModFullName: mod.FullName,
}
}
benchmarkTrunks := make(map[string][][]string)
for _, benchmark := range workspaceResources.Mod.ResourceMaps.Benchmarks {
if benchmark.IsAnonymous() {
continue
}
// Find any benchmarks who have a parent that is a mod - we consider these top-level
isTopLevel := false
for _, parent := range benchmark.GetParents() {
switch parent.(type) {
case *modconfig.Mod:
isTopLevel = true
}
}
mod := benchmark.Mod
trunk := []string{benchmark.FullName}
if isTopLevel {
benchmarkTrunks[benchmark.FullName] = [][]string{trunk}
}
availableBenchmark := ModAvailableBenchmark{
Title: benchmark.GetTitle(),
FullName: benchmark.FullName,
ShortName: benchmark.ShortName,
Tags: benchmark.Tags,
IsTopLevel: isTopLevel,
Children: addBenchmarkChildren(benchmark, isTopLevel, trunk, benchmarkTrunks),
ModFullName: mod.FullName,
}
payload.Benchmarks[benchmark.FullName] = availableBenchmark
}
for benchmarkName, trunks := range benchmarkTrunks {
if foundBenchmark, ok := payload.Benchmarks[benchmarkName]; ok {
foundBenchmark.Trunks = trunks
payload.Benchmarks[benchmarkName] = foundBenchmark
}
}
}
return json.Marshal(payload)
}
func buildWorkspaceErrorPayload(e *dashboardevents.WorkspaceError) ([]byte, error) {
payload := ErrorPayload{
Action: "workspace_error",
Error: e.Error.Error(),
}
return json.Marshal(payload)
}
func buildControlCompletePayload(event *dashboardevents.ControlComplete) ([]byte, error) {
payload := ControlEventPayload{
Action: "control_complete",
Control: event.Control,
Name: event.Name,
Progress: event.Progress,
ExecutionId: event.ExecutionId,
Timestamp: event.Timestamp,
}
return json.Marshal(payload)
}
func buildControlErrorPayload(event *dashboardevents.ControlError) ([]byte, error) {
payload := ControlEventPayload{
Action: "control_error",
Control: event.Control,
Name: event.Name,
Progress: event.Progress,
ExecutionId: event.ExecutionId,
Timestamp: event.Timestamp,
}
return json.Marshal(payload)
}
func buildLeafNodeUpdatedPayload(event *dashboardevents.LeafNodeUpdated) ([]byte, error) {
payload := LeafNodeUpdatedPayload{
SchemaVersion: fmt.Sprintf("%d", LeafNodeUpdatedSchemaVersion),
Action: "leaf_node_updated",
DashboardNode: event.LeafNode,
ExecutionId: event.ExecutionId,
Timestamp: event.Timestamp,
}
return json.Marshal(payload)
}
func buildExecutionStartedPayload(event *dashboardevents.ExecutionStarted) ([]byte, error) {
payload := ExecutionStartedPayload{
SchemaVersion: fmt.Sprintf("%d", ExecutionStartedSchemaVersion),
Action: "execution_started",
ExecutionId: event.ExecutionId,
Panels: event.Panels,
Layout: event.Root.AsTreeNode(),
Inputs: event.Inputs,
Variables: event.Variables,
StartTime: event.StartTime,
}
return json.Marshal(payload)
}
func buildExecutionErrorPayload(event *dashboardevents.ExecutionError) ([]byte, error) {
payload := ExecutionErrorPayload{
Action: "execution_error",
Error: event.Error.Error(),
Timestamp: event.Timestamp,
}
return json.Marshal(payload)
}
func buildExecutionCompletePayload(event *dashboardevents.ExecutionComplete) ([]byte, error) {
snap := dashboardexecute.ExecutionCompleteToSnapshot(event)
payload := &ExecutionCompletePayload{
Action: "execution_complete",
SchemaVersion: fmt.Sprintf("%d", ExecutionCompletePayloadSchemaVersion),
ExecutionId: event.ExecutionId,
Snapshot: snap,
}
return json.Marshal(payload)
}
func buildDisplaySnapshotPayload(snap map[string]any) ([]byte, error) {
payload := &DisplaySnapshotPayload{
Action: "execution_complete",
SchemaVersion: fmt.Sprintf("%d", ExecutionCompletePayloadSchemaVersion),
Snapshot: snap,
}
return json.Marshal(payload)
}
func buildInputValuesClearedPayload(event *dashboardevents.InputValuesCleared) ([]byte, error) {
payload := InputValuesClearedPayload{
Action: "input_values_cleared",
ClearedInputs: event.ClearedInputs,
ExecutionId: event.ExecutionId,
}
return json.Marshal(payload)
}

View File

@@ -1,479 +0,0 @@
package dashboardserver
import (
"context"
"encoding/json"
"fmt"
"github.com/turbot/go-kit/helpers"
typeHelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe/pkg/dashboard/dashboardevents"
"github.com/turbot/steampipe/pkg/dashboard/dashboardexecute"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/workspace"
"gopkg.in/olahol/melody.v1"
"log"
"os"
"reflect"
"strings"
"sync"
)
type Server struct {
dbClient db_common.Client
mutex *sync.Mutex
dashboardClients map[string]*DashboardClientInfo
webSocket *melody.Melody
workspace *workspace.Workspace
}
func NewServer(ctx context.Context, dbClient db_common.Client, w *workspace.Workspace) (*Server, error) {
initLogSink()
OutputWait(ctx, "Starting Dashboard Server")
webSocket := melody.New()
var dashboardClients = make(map[string]*DashboardClientInfo)
var mutex = &sync.Mutex{}
server := &Server{
dbClient: dbClient,
mutex: mutex,
dashboardClients: dashboardClients,
webSocket: webSocket,
workspace: w,
}
w.RegisterDashboardEventHandler(ctx, server.HandleDashboardEvent)
err := w.SetupWatcher(ctx, dbClient, func(c context.Context, e error) {})
OutputMessage(ctx, "Workspace loaded")
return server, err
}
// Start starts the API server
// it returns a channel which is signalled when the API server terminates
func (s *Server) Start(ctx context.Context) chan struct{} {
s.initAsync(ctx)
return startAPIAsync(ctx, s.webSocket)
}
// Shutdown stops the API server
func (s *Server) Shutdown(ctx context.Context) {
log.Println("[TRACE] Server shutdown")
if s.webSocket != nil {
log.Println("[TRACE] closing websocket")
if err := s.webSocket.Close(); err != nil {
error_helpers.ShowErrorWithMessage(ctx, err, "Websocket shutdown failed")
}
log.Println("[TRACE] closed websocket")
}
log.Println("[TRACE] Server shutdown complete")
}
func (s *Server) HandleDashboardEvent(ctx context.Context, event dashboardevents.DashboardEvent) {
var payloadError error
var payload []byte
defer func() {
if payloadError != nil {
// we don't expect the build functions to ever error during marshalling
// this is because the data getting marshalled are not expected to have go specific
// properties/data in them
panic(fmt.Errorf("error building payload for '%s': %v", reflect.TypeOf(event).String(), payloadError))
}
}()
switch e := event.(type) {
case *dashboardevents.WorkspaceError:
log.Printf("[TRACE] WorkspaceError event: %s", e.Error)
payload, payloadError = buildWorkspaceErrorPayload(e)
if payloadError != nil {
return
}
_ = s.webSocket.Broadcast(payload)
OutputError(ctx, e.Error)
case *dashboardevents.ExecutionStarted:
log.Printf("[TRACE] ExecutionStarted event session %s, dashboard %s", e.Session, e.Root.GetName())
payload, payloadError = buildExecutionStartedPayload(e)
if payloadError != nil {
return
}
s.writePayloadToSession(e.Session, payload)
OutputWait(ctx, fmt.Sprintf("Dashboard execution started: %s", e.Root.GetName()))
case *dashboardevents.ExecutionError:
log.Println("[TRACE] execution error event")
payload, payloadError = buildExecutionErrorPayload(e)
if payloadError != nil {
return
}
s.writePayloadToSession(e.Session, payload)
OutputError(ctx, e.Error)
case *dashboardevents.ExecutionComplete:
log.Println("[TRACE] execution complete event")
payload, payloadError = buildExecutionCompletePayload(e)
if payloadError != nil {
return
}
dashboardName := e.Root.GetName()
s.writePayloadToSession(e.Session, payload)
outputReady(ctx, fmt.Sprintf("Execution complete: %s", dashboardName))
case *dashboardevents.ControlComplete:
log.Printf("[TRACE] ControlComplete event session %s, control %s", e.Session, e.Control.GetControlId())
payload, payloadError = buildControlCompletePayload(e)
if payloadError != nil {
return
}
s.writePayloadToSession(e.Session, payload)
case *dashboardevents.ControlError:
log.Printf("[TRACE] ControlError event session %s, control %s", e.Session, e.Control.GetControlId())
payload, payloadError = buildControlErrorPayload(e)
if payloadError != nil {
return
}
s.writePayloadToSession(e.Session, payload)
case *dashboardevents.LeafNodeUpdated:
payload, payloadError = buildLeafNodeUpdatedPayload(e)
if payloadError != nil {
return
}
s.writePayloadToSession(e.Session, payload)
case *dashboardevents.DashboardChanged:
log.Println("[TRACE] DashboardChanged event")
deletedDashboards := e.DeletedDashboards
newDashboards := e.NewDashboards
changedBenchmarks := e.ChangedBenchmarks
changedCategories := e.ChangedCategories
changedContainers := e.ChangedContainers
changedControls := e.ChangedControls
changedCards := e.ChangedCards
changedCharts := e.ChangedCharts
changedDashboards := e.ChangedDashboards
changedEdges := e.ChangedEdges
changedFlows := e.ChangedFlows
changedGraphs := e.ChangedGraphs
changedHierarchies := e.ChangedHierarchies
changedImages := e.ChangedImages
changedInputs := e.ChangedInputs
changedNodes := e.ChangedNodes
changedTables := e.ChangedTables
changedTexts := e.ChangedTexts
// If nothing has changed, ignore
if len(deletedDashboards) == 0 &&
len(newDashboards) == 0 &&
len(changedBenchmarks) == 0 &&
len(changedCategories) == 0 &&
len(changedContainers) == 0 &&
len(changedControls) == 0 &&
len(changedCards) == 0 &&
len(changedCharts) == 0 &&
len(changedDashboards) == 0 &&
len(changedEdges) == 0 &&
len(changedFlows) == 0 &&
len(changedGraphs) == 0 &&
len(changedHierarchies) == 0 &&
len(changedImages) == 0 &&
len(changedInputs) == 0 &&
len(changedNodes) == 0 &&
len(changedTables) == 0 &&
len(changedTexts) == 0 {
return
}
for k, v := range s.dashboardClients {
log.Printf("[TRACE] Dashboard client: %v %v\n", k, typeHelpers.SafeString(v.Dashboard))
}
// If) any deleted/new/changed dashboards, emit an available dashboards message to clients
if len(deletedDashboards) != 0 || len(newDashboards) != 0 || len(changedDashboards) != 0 || len(changedBenchmarks) != 0 {
OutputMessage(ctx, "Available Dashboards updated")
// Emit dashboard metadata event in case there is a new mod - else the UI won't know about this mod
payload, payloadError = buildDashboardMetadataPayload(s.workspace.GetResourceMaps(), s.workspace.CloudMetadata)
if payloadError != nil {
return
}
_ = s.webSocket.Broadcast(payload)
// Emit available dashboards event
payload, payloadError = buildAvailableDashboardsPayload(s.workspace.GetResourceMaps())
if payloadError != nil {
return
}
_ = s.webSocket.Broadcast(payload)
}
var dashboardsBeingWatched []string
dashboardClients := s.getDashboardClients()
for _, dashboardClientInfo := range dashboardClients {
dashboardName := typeHelpers.SafeString(dashboardClientInfo.Dashboard)
if dashboardClientInfo.Dashboard != nil {
if helpers.StringSliceContains(dashboardsBeingWatched, dashboardName) {
continue
}
dashboardsBeingWatched = append(dashboardsBeingWatched, dashboardName)
}
}
var changedDashboardNames []string
var newDashboardNames []string
// Process the changed items and make a note of the dashboard(s) they're in
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedBenchmarks)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCategories)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedContainers)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedControls)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCards)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCharts)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedEdges)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedFlows)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedGraphs)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedHierarchies)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedImages)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedInputs)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedNodes)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedTables)...)
changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedTexts)...)
for _, changedDashboard := range changedDashboards {
if helpers.StringSliceContains(changedDashboardNames, changedDashboard.Name) {
continue
}
changedDashboardNames = append(changedDashboardNames, changedDashboard.Name)
}
for _, changedDashboardName := range changedDashboardNames {
sessionMap := s.getDashboardClients()
for sessionId, dashboardClientInfo := range sessionMap {
if typeHelpers.SafeString(dashboardClientInfo.Dashboard) == changedDashboardName {
_ = dashboardexecute.Executor.ExecuteDashboard(ctx, sessionId, changedDashboardName, dashboardClientInfo.DashboardInputs, s.workspace, s.dbClient)
}
}
}
// Special case - if we previously had a workspace error, any previously existing dashboards
// will come in here as new, so we need to check if any of those new dashboards are being watched.
// If so, execute them
for _, newDashboard := range newDashboards {
if helpers.StringSliceContains(newDashboardNames, newDashboard.Name()) {
continue
}
newDashboardNames = append(newDashboardNames, newDashboard.Name())
}
sessionMap := s.getDashboardClients()
for _, newDashboardName := range newDashboardNames {
for sessionId, dashboardClientInfo := range sessionMap {
if typeHelpers.SafeString(dashboardClientInfo.Dashboard) == newDashboardName {
_ = dashboardexecute.Executor.ExecuteDashboard(ctx, sessionId, newDashboardName, dashboardClientInfo.DashboardInputs, s.workspace, s.dbClient)
}
}
}
case *dashboardevents.InputValuesCleared:
log.Println("[TRACE] input values cleared event", *e)
payload, payloadError = buildInputValuesClearedPayload(e)
if payloadError != nil {
return
}
dashboardClients := s.getDashboardClients()
if sessionInfo, ok := dashboardClients[e.Session]; ok {
for _, clearedInput := range e.ClearedInputs {
delete(sessionInfo.DashboardInputs, clearedInput)
}
}
s.writePayloadToSession(e.Session, payload)
}
}
func (s *Server) initAsync(ctx context.Context) {
go func() {
// Return list of dashboards on connect
s.webSocket.HandleConnect(func(session *melody.Session) {
log.Println("[TRACE] client connected")
s.addSession(session)
})
s.webSocket.HandleDisconnect(func(session *melody.Session) {
log.Println("[TRACE] client disconnected")
s.clearSession(ctx, session)
})
s.webSocket.HandleMessage(s.handleMessageFunc(ctx))
OutputMessage(ctx, "Initialization complete")
}()
}
func (s *Server) handleMessageFunc(ctx context.Context) func(session *melody.Session, msg []byte) {
return func(session *melody.Session, msg []byte) {
sessionId := s.getSessionId(session)
var request ClientRequest
// if we could not decode message - ignore
err := json.Unmarshal(msg, &request)
if err != nil {
log.Printf("[WARN] failed to marshal message: %s", err.Error())
return
}
if request.Action != "keep_alive" {
log.Println("[TRACE] message", string(msg))
}
switch request.Action {
case "get_dashboard_metadata":
payload, err := buildDashboardMetadataPayload(s.workspace.GetResourceMaps(), s.workspace.CloudMetadata)
if err != nil {
panic(fmt.Errorf("error building payload for get_metadata: %v", err))
}
_ = session.Write(payload)
case "get_available_dashboards":
payload, err := buildAvailableDashboardsPayload(s.workspace.GetResourceMaps())
if err != nil {
panic(fmt.Errorf("error building payload for get_available_dashboards: %v", err))
}
_ = session.Write(payload)
case "select_dashboard":
s.setDashboardForSession(sessionId, request.Payload.Dashboard.FullName, request.Payload.InputValues)
_ = dashboardexecute.Executor.ExecuteDashboard(ctx, sessionId, request.Payload.Dashboard.FullName, request.Payload.InputValues, s.workspace, s.dbClient)
case "select_snapshot":
snapshotName := request.Payload.Dashboard.FullName
s.setDashboardForSession(sessionId, snapshotName, request.Payload.InputValues)
snap, err := dashboardexecute.Executor.LoadSnapshot(ctx, sessionId, snapshotName, s.workspace)
// TACTICAL- handle with error message
error_helpers.FailOnError(err)
// error handling???
payload, err := buildDisplaySnapshotPayload(snap)
// TACTICAL- handle with error message
error_helpers.FailOnError(err)
s.writePayloadToSession(sessionId, payload)
outputReady(ctx, fmt.Sprintf("Show snapshot complete: %s", snapshotName))
case "input_changed":
s.setDashboardInputsForSession(sessionId, request.Payload.InputValues)
_ = dashboardexecute.Executor.OnInputChanged(ctx, sessionId, request.Payload.InputValues, request.Payload.ChangedInput)
case "clear_dashboard":
s.setDashboardInputsForSession(sessionId, nil)
dashboardexecute.Executor.CancelExecutionForSession(ctx, sessionId)
}
}
}
func (s *Server) clearSession(ctx context.Context, session *melody.Session) {
if strings.ToUpper(os.Getenv("DEBUG")) == "TRUE" {
return
}
sessionId := s.getSessionId(session)
dashboardexecute.Executor.CancelExecutionForSession(ctx, sessionId)
s.deleteDashboardClient(sessionId)
}
func (s *Server) addSession(session *melody.Session) {
sessionId := s.getSessionId(session)
clientSession := &DashboardClientInfo{
Session: session,
}
s.addDashboardClient(sessionId, clientSession)
}
func (s *Server) setDashboardInputsForSession(sessionId string, inputs map[string]interface{}) {
dashboardClients := s.getDashboardClients()
if sessionInfo, ok := dashboardClients[sessionId]; ok {
sessionInfo.DashboardInputs = inputs
}
}
func (s *Server) getSessionId(session *melody.Session) string {
return fmt.Sprintf("%p", session)
}
// functions providing locked access to member properties
func (s *Server) setDashboardForSession(sessionId string, dashboardName string, inputs map[string]interface{}) *DashboardClientInfo {
s.mutex.Lock()
defer s.mutex.Unlock()
dashboardClientInfo := s.dashboardClients[sessionId]
dashboardClientInfo.Dashboard = &dashboardName
dashboardClientInfo.DashboardInputs = inputs
return dashboardClientInfo
}
func (s *Server) writePayloadToSession(sessionId string, payload []byte) {
s.mutex.Lock()
defer s.mutex.Unlock()
if sessionInfo, ok := s.dashboardClients[sessionId]; ok {
_ = sessionInfo.Session.Write(payload)
}
}
func (s *Server) getDashboardClients() map[string]*DashboardClientInfo {
s.mutex.Lock()
defer s.mutex.Unlock()
return s.dashboardClients
}
func (s *Server) addDashboardClient(sessionId string, clientSession *DashboardClientInfo) {
s.mutex.Lock()
s.dashboardClients[sessionId] = clientSession
s.mutex.Unlock()
}
func (s *Server) deleteDashboardClient(sessionId string) {
s.mutex.Lock()
delete(s.dashboardClients, sessionId)
s.mutex.Unlock()
}
func getDashboardsInterestedInResourceChanges(dashboardsBeingWatched []string, existingChangedDashboardNames []string, changedItems []*modconfig.DashboardTreeItemDiffs) []string {
var changedDashboardNames []string
for _, changedItem := range changedItems {
paths := changedItem.Item.GetPaths()
for _, nodePath := range paths {
for _, nodeName := range nodePath {
resourceParts, _ := modconfig.ParseResourceName(nodeName)
// We only care about changes from these resource types
if !helpers.StringSliceContains([]string{modconfig.BlockTypeDashboard, modconfig.BlockTypeBenchmark}, resourceParts.ItemType) {
continue
}
if helpers.StringSliceContains(existingChangedDashboardNames, nodeName) || helpers.StringSliceContains(changedDashboardNames, nodeName) || !helpers.StringSliceContains(dashboardsBeingWatched, nodeName) {
continue
}
changedDashboardNames = append(changedDashboardNames, nodeName)
}
}
}
return changedDashboardNames
}

View File

@@ -1,173 +0,0 @@
package dashboardserver
import (
"fmt"
"github.com/turbot/steampipe/pkg/control/controlstatus"
"github.com/turbot/steampipe/pkg/dashboard/dashboardtypes"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"gopkg.in/olahol/melody.v1"
"time"
)
type ListenType string
const (
ListenTypeLocal ListenType = "local"
ListenTypeNetwork ListenType = "network"
)
// IsValid is a validator for ListenType known values
func (lt ListenType) IsValid() error {
switch lt {
case ListenTypeNetwork, ListenTypeLocal:
return nil
}
return fmt.Errorf("invalid listen type. Must be one of '%v' or '%v'", ListenTypeNetwork, ListenTypeLocal)
}
type ListenPort int
// IsValid is a validator for ListenType known values
func (lp ListenPort) IsValid() error {
if lp < 1 || lp > 65535 {
return fmt.Errorf("invalid port - must be within range (1:65535)")
}
return nil
}
type ErrorPayload struct {
Action string `json:"action"`
Error string `json:"error"`
}
var ExecutionStartedSchemaVersion int64 = 20221222
type ExecutionStartedPayload struct {
SchemaVersion string `json:"schema_version"`
Action string `json:"action"`
ExecutionId string `json:"execution_id"`
Panels map[string]any `json:"panels"`
Layout *dashboardtypes.SnapshotTreeNode `json:"layout"`
Inputs map[string]interface{} `json:"inputs,omitempty"`
Variables map[string]string `json:"variables,omitempty"`
StartTime time.Time `json:"start_time"`
}
var LeafNodeUpdatedSchemaVersion int64 = 20221222
type LeafNodeUpdatedPayload struct {
SchemaVersion string `json:"schema_version"`
Action string `json:"action"`
DashboardNode map[string]any `json:"dashboard_node"`
ExecutionId string `json:"execution_id"`
Timestamp time.Time `json:"timestamp"`
}
type ControlEventPayload struct {
Action string `json:"action"`
Control controlstatus.ControlRunStatusProvider `json:"control"`
Name string `json:"name"`
Progress *controlstatus.ControlProgress `json:"progress"`
ExecutionId string `json:"execution_id"`
Timestamp time.Time `json:"timestamp"`
}
type ExecutionErrorPayload struct {
Action string `json:"action"`
Error string `json:"error"`
Timestamp time.Time `json:"timestamp"`
}
var ExecutionCompletePayloadSchemaVersion int64 = 20221222
type ExecutionCompletePayload struct {
Action string `json:"action"`
SchemaVersion string `json:"schema_version"`
Snapshot *dashboardtypes.SteampipeSnapshot `json:"snapshot"`
ExecutionId string `json:"execution_id"`
}
type DisplaySnapshotPayload struct {
Action string `json:"action"`
SchemaVersion string `json:"schema_version"`
// snapshot is a map here as we cannot deserialise SteampipeSnapshot into a struct
// (without custom deserialisation code) as the Panels property is an interface
Snapshot map[string]any `json:"snapshot"`
ExecutionId string `json:"execution_id"`
}
type InputValuesClearedPayload struct {
Action string `json:"action"`
ClearedInputs []string `json:"cleared_inputs"`
ExecutionId string `json:"execution_id"`
}
type DashboardClientInfo struct {
Session *melody.Session
Dashboard *string
DashboardInputs map[string]interface{}
}
type ClientRequestDashboardPayload struct {
FullName string `json:"full_name"`
}
type ClientRequestPayload struct {
Dashboard ClientRequestDashboardPayload `json:"dashboard"`
InputValues map[string]interface{} `json:"input_values"`
ChangedInput string `json:"changed_input"`
}
type ClientRequest struct {
Action string `json:"action"`
Payload ClientRequestPayload `json:"payload"`
}
type ModAvailableDashboard struct {
Title string `json:"title,omitempty"`
FullName string `json:"full_name"`
ShortName string `json:"short_name"`
Tags map[string]string `json:"tags"`
ModFullName string `json:"mod_full_name"`
}
type ModAvailableBenchmark struct {
Title string `json:"title,omitempty"`
FullName string `json:"full_name"`
ShortName string `json:"short_name"`
Tags map[string]string `json:"tags"`
IsTopLevel bool `json:"is_top_level"`
Children []ModAvailableBenchmark `json:"children,omitempty"`
Trunks [][]string `json:"trunks"`
ModFullName string `json:"mod_full_name"`
}
type AvailableDashboardsPayload struct {
Action string `json:"action"`
Dashboards map[string]ModAvailableDashboard `json:"dashboards"`
Benchmarks map[string]ModAvailableBenchmark `json:"benchmarks"`
Snapshots map[string]string `json:"snapshots"`
}
type ModDashboardMetadata struct {
Title string `json:"title,omitempty"`
FullName string `json:"full_name"`
ShortName string `json:"short_name"`
}
type DashboardCLIMetadata struct {
Version string `json:"version,omitempty"`
}
type DashboardMetadata struct {
Mod *ModDashboardMetadata `json:"mod,omitempty"`
InstalledMods map[string]ModDashboardMetadata `json:"installed_mods,omitempty"`
CLI DashboardCLIMetadata `json:"cli"`
Cloud *steampipeconfig.CloudMetadata `json:"cloud,omitempty"`
Telemetry string `json:"telemetry"`
}
type DashboardMetadataPayload struct {
Action string `json:"action"`
Metadata DashboardMetadata `json:"metadata"`
}