diff --git a/pkg/dashboard/dashboardserver/api.go b/pkg/dashboard/dashboardserver/api.go deleted file mode 100644 index 985e3a30d..000000000 --- a/pkg/dashboard/dashboardserver/api.go +++ /dev/null @@ -1,82 +0,0 @@ -package dashboardserver - -import ( - "context" - "fmt" - "log" - "net/http" - "path" - "time" - - "github.com/gin-contrib/static" - "github.com/gin-gonic/gin" - "github.com/spf13/viper" - "github.com/turbot/steampipe/pkg/constants" - "github.com/turbot/steampipe/pkg/error_helpers" - "github.com/turbot/steampipe/pkg/filepaths" - "gopkg.in/olahol/melody.v1" -) - -func startAPIAsync(ctx context.Context, webSocket *melody.Melody) chan struct{} { - doneChan := make(chan struct{}) - - go func() { - gin.SetMode(gin.ReleaseMode) - router := gin.New() - // only add the Recovery middleware - router.Use(gin.Recovery()) - - assetsDirectory := filepaths.EnsureDashboardAssetsDir() - - router.Use(static.Serve("/", static.LocalFile(assetsDirectory, true))) - - router.GET("/ws", func(c *gin.Context) { - webSocket.HandleRequest(c.Writer, c.Request) - }) - - router.NoRoute(func(c *gin.Context) { - // https://stackoverflow.com/questions/49547/how-do-we-control-web-page-caching-across-all-browsers - c.Header("Cache-Control", "no-cache, no-store, must-revalidate") // HTTP 1.1. - c.Header("Pragma", "no-cache") // HTTP 1.0. - c.Header("Expires", "0") // Proxies. - c.File(path.Join(assetsDirectory, "index.html")) - }) - - dashboardServerPort := viper.GetInt(constants.ArgDashboardPort) - dashboardServerListen := "localhost" - if viper.GetString(constants.ArgDashboardListen) == string(ListenTypeNetwork) { - dashboardServerListen = "" - } - - srv := &http.Server{ - Addr: fmt.Sprintf("%s:%d", dashboardServerListen, dashboardServerPort), - Handler: router, - } - - go func() { - // service connections - if err := srv.ListenAndServe(); err != nil { - log.Printf("listen: %s\n", err) - } - }() - - outputReady(ctx, fmt.Sprintf("Dashboard server started on %d and listening on %s", dashboardServerPort, viper.GetString(constants.ArgDashboardListen))) - OutputMessage(ctx, fmt.Sprintf("Visit http://localhost:%d", dashboardServerPort)) - OutputMessage(ctx, "Press Ctrl+C to exit") - <-ctx.Done() - log.Println("Shutdown Server…") - - shutdownCtx, cancel := context.WithTimeout(context.Background(), 5*time.Second) - defer cancel() - - if err := srv.Shutdown(shutdownCtx); err != nil { - error_helpers.ShowErrorWithMessage(ctx, err, "Server shutdown failed") - } - log.Println("[TRACE] Server exiting") - - // indicate the API server is done - doneChan <- struct{}{} - }() - - return doneChan -} diff --git a/pkg/dashboard/dashboardserver/output.go b/pkg/dashboard/dashboardserver/output.go deleted file mode 100644 index 84791cd4b..000000000 --- a/pkg/dashboard/dashboardserver/output.go +++ /dev/null @@ -1,75 +0,0 @@ -package dashboardserver - -import ( - "context" - "fmt" - "io" - "os" - "path/filepath" - "time" - - "github.com/fatih/color" - "github.com/mattn/go-isatty" - "github.com/spf13/viper" - "github.com/turbot/steampipe/pkg/constants" - "github.com/turbot/steampipe/pkg/filepaths" -) - -var logSink io.Writer - -const ( - errorPrefix = "[ Error ]" - messagePrefix = "[ Message ]" - readyPrefix = "[ Ready ]" - waitPrefix = "[ Wait ]" -) - -func initLogSink() { - if viper.GetBool(constants.ArgServiceMode) { - logName := fmt.Sprintf("dashboard-%s.log", time.Now().Format("2006-01-02")) - logPath := filepath.Join(filepaths.EnsureLogDir(), logName) - f, err := os.OpenFile(logPath, os.O_RDWR|os.O_CREATE|os.O_APPEND, 0666) - if err != nil { - fmt.Printf("failed to open dashboard manager log file: %s\n", err.Error()) - os.Exit(3) - } - logSink = f - } else { - logSink = os.Stdout - } -} - -func output(_ context.Context, prefix string, msg interface{}) { - if logSink == nil { - logSink = os.Stdout - } - fmt.Fprintf(logSink, "%s %v\n", prefix, msg) -} - -func OutputMessage(ctx context.Context, msg string) { - output(ctx, applyColor(messagePrefix, color.HiGreenString), msg) -} - -func OutputWarning(ctx context.Context, msg string) { - output(ctx, applyColor(messagePrefix, color.RedString), msg) -} - -func OutputError(ctx context.Context, err error) { - output(ctx, applyColor(errorPrefix, color.RedString), err) -} - -func outputReady(ctx context.Context, msg string) { - output(ctx, applyColor(readyPrefix, color.GreenString), msg) -} - -func OutputWait(ctx context.Context, msg string) { - output(ctx, applyColor(waitPrefix, color.CyanString), msg) -} - -func applyColor(str string, color func(format string, a ...interface{}) string) string { - if !isatty.IsTerminal(os.Stdout.Fd()) || viper.GetBool(constants.ArgServiceMode) { - return str - } else { - return color((str)) - } -} diff --git a/pkg/dashboard/dashboardserver/payload.go b/pkg/dashboard/dashboardserver/payload.go deleted file mode 100644 index 6c95cb9f4..000000000 --- a/pkg/dashboard/dashboardserver/payload.go +++ /dev/null @@ -1,244 +0,0 @@ -package dashboardserver - -import ( - "encoding/json" - "fmt" - "github.com/spf13/viper" - typeHelpers "github.com/turbot/go-kit/types" - "github.com/turbot/steampipe/pkg/constants" - "github.com/turbot/steampipe/pkg/dashboard/dashboardevents" - "github.com/turbot/steampipe/pkg/dashboard/dashboardexecute" - "github.com/turbot/steampipe/pkg/steampipeconfig" - "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" - "github.com/turbot/steampipe/pkg/version" -) - -func buildDashboardMetadataPayload(workspaceResources *modconfig.ResourceMaps, cloudMetadata *steampipeconfig.CloudMetadata) ([]byte, error) { - installedMods := make(map[string]ModDashboardMetadata) - for _, mod := range workspaceResources.Mods { - // Ignore current mod - if mod.FullName == workspaceResources.Mod.FullName { - continue - } - installedMods[mod.FullName] = ModDashboardMetadata{ - Title: typeHelpers.SafeString(mod.Title), - FullName: mod.FullName, - ShortName: mod.ShortName, - } - } - - payload := DashboardMetadataPayload{ - Action: "dashboard_metadata", - Metadata: DashboardMetadata{ - CLI: DashboardCLIMetadata{ - Version: version.VersionString, - }, - InstalledMods: installedMods, - Telemetry: viper.GetString(constants.ArgTelemetry), - }, - } - - if mod := workspaceResources.Mod; mod != nil { - payload.Metadata.Mod = &ModDashboardMetadata{ - Title: typeHelpers.SafeString(mod.Title), - FullName: mod.FullName, - ShortName: mod.ShortName, - } - } - // if telemetry is enabled, send cloud metadata - if payload.Metadata.Telemetry != constants.TelemetryNone { - payload.Metadata.Cloud = cloudMetadata - } - - return json.Marshal(payload) -} - -func addBenchmarkChildren(benchmark *modconfig.Benchmark, recordTrunk bool, trunk []string, trunks map[string][][]string) []ModAvailableBenchmark { - var children []ModAvailableBenchmark - for _, child := range benchmark.GetChildren() { - switch t := child.(type) { - case *modconfig.Benchmark: - childTrunk := make([]string, len(trunk)+1) - copy(childTrunk, trunk) - childTrunk[len(childTrunk)-1] = t.FullName - if recordTrunk { - trunks[t.FullName] = append(trunks[t.FullName], childTrunk) - } - availableBenchmark := ModAvailableBenchmark{ - Title: t.GetTitle(), - FullName: t.FullName, - ShortName: t.ShortName, - Tags: t.Tags, - Children: addBenchmarkChildren(t, recordTrunk, childTrunk, trunks), - } - children = append(children, availableBenchmark) - } - } - return children -} - -func buildAvailableDashboardsPayload(workspaceResources *modconfig.ResourceMaps) ([]byte, error) { - - payload := AvailableDashboardsPayload{ - Action: "available_dashboards", - Dashboards: make(map[string]ModAvailableDashboard), - Benchmarks: make(map[string]ModAvailableBenchmark), - Snapshots: workspaceResources.Snapshots, - } - - // if workspace resources has a mod, populate dashboards and benchmarks - if workspaceResources.Mod != nil { - // build a map of the dashboards provided by each mod - - // iterate over the dashboards for the top level mod - this will include the dashboards from dependency mods - for _, dashboard := range workspaceResources.Mod.ResourceMaps.Dashboards { - mod := dashboard.Mod - // add this dashboard - payload.Dashboards[dashboard.FullName] = ModAvailableDashboard{ - Title: typeHelpers.SafeString(dashboard.Title), - FullName: dashboard.FullName, - ShortName: dashboard.ShortName, - Tags: dashboard.Tags, - ModFullName: mod.FullName, - } - } - - benchmarkTrunks := make(map[string][][]string) - for _, benchmark := range workspaceResources.Mod.ResourceMaps.Benchmarks { - if benchmark.IsAnonymous() { - continue - } - - // Find any benchmarks who have a parent that is a mod - we consider these top-level - isTopLevel := false - for _, parent := range benchmark.GetParents() { - switch parent.(type) { - case *modconfig.Mod: - isTopLevel = true - } - } - - mod := benchmark.Mod - trunk := []string{benchmark.FullName} - - if isTopLevel { - benchmarkTrunks[benchmark.FullName] = [][]string{trunk} - } - - availableBenchmark := ModAvailableBenchmark{ - Title: benchmark.GetTitle(), - FullName: benchmark.FullName, - ShortName: benchmark.ShortName, - Tags: benchmark.Tags, - IsTopLevel: isTopLevel, - Children: addBenchmarkChildren(benchmark, isTopLevel, trunk, benchmarkTrunks), - ModFullName: mod.FullName, - } - - payload.Benchmarks[benchmark.FullName] = availableBenchmark - } - for benchmarkName, trunks := range benchmarkTrunks { - if foundBenchmark, ok := payload.Benchmarks[benchmarkName]; ok { - foundBenchmark.Trunks = trunks - payload.Benchmarks[benchmarkName] = foundBenchmark - } - } - } - - return json.Marshal(payload) -} - -func buildWorkspaceErrorPayload(e *dashboardevents.WorkspaceError) ([]byte, error) { - payload := ErrorPayload{ - Action: "workspace_error", - Error: e.Error.Error(), - } - return json.Marshal(payload) -} - -func buildControlCompletePayload(event *dashboardevents.ControlComplete) ([]byte, error) { - payload := ControlEventPayload{ - Action: "control_complete", - Control: event.Control, - Name: event.Name, - Progress: event.Progress, - ExecutionId: event.ExecutionId, - Timestamp: event.Timestamp, - } - return json.Marshal(payload) -} - -func buildControlErrorPayload(event *dashboardevents.ControlError) ([]byte, error) { - payload := ControlEventPayload{ - Action: "control_error", - Control: event.Control, - Name: event.Name, - Progress: event.Progress, - ExecutionId: event.ExecutionId, - Timestamp: event.Timestamp, - } - return json.Marshal(payload) -} - -func buildLeafNodeUpdatedPayload(event *dashboardevents.LeafNodeUpdated) ([]byte, error) { - payload := LeafNodeUpdatedPayload{ - SchemaVersion: fmt.Sprintf("%d", LeafNodeUpdatedSchemaVersion), - Action: "leaf_node_updated", - DashboardNode: event.LeafNode, - ExecutionId: event.ExecutionId, - Timestamp: event.Timestamp, - } - return json.Marshal(payload) -} - -func buildExecutionStartedPayload(event *dashboardevents.ExecutionStarted) ([]byte, error) { - payload := ExecutionStartedPayload{ - SchemaVersion: fmt.Sprintf("%d", ExecutionStartedSchemaVersion), - Action: "execution_started", - ExecutionId: event.ExecutionId, - Panels: event.Panels, - Layout: event.Root.AsTreeNode(), - Inputs: event.Inputs, - Variables: event.Variables, - StartTime: event.StartTime, - } - return json.Marshal(payload) -} - -func buildExecutionErrorPayload(event *dashboardevents.ExecutionError) ([]byte, error) { - payload := ExecutionErrorPayload{ - Action: "execution_error", - Error: event.Error.Error(), - Timestamp: event.Timestamp, - } - return json.Marshal(payload) -} - -func buildExecutionCompletePayload(event *dashboardevents.ExecutionComplete) ([]byte, error) { - snap := dashboardexecute.ExecutionCompleteToSnapshot(event) - payload := &ExecutionCompletePayload{ - Action: "execution_complete", - SchemaVersion: fmt.Sprintf("%d", ExecutionCompletePayloadSchemaVersion), - ExecutionId: event.ExecutionId, - Snapshot: snap, - } - return json.Marshal(payload) -} - -func buildDisplaySnapshotPayload(snap map[string]any) ([]byte, error) { - payload := &DisplaySnapshotPayload{ - Action: "execution_complete", - SchemaVersion: fmt.Sprintf("%d", ExecutionCompletePayloadSchemaVersion), - Snapshot: snap, - } - return json.Marshal(payload) -} - -func buildInputValuesClearedPayload(event *dashboardevents.InputValuesCleared) ([]byte, error) { - payload := InputValuesClearedPayload{ - Action: "input_values_cleared", - ClearedInputs: event.ClearedInputs, - ExecutionId: event.ExecutionId, - } - return json.Marshal(payload) -} diff --git a/pkg/dashboard/dashboardserver/server.go b/pkg/dashboard/dashboardserver/server.go deleted file mode 100644 index 5f7b6394f..000000000 --- a/pkg/dashboard/dashboardserver/server.go +++ /dev/null @@ -1,479 +0,0 @@ -package dashboardserver - -import ( - "context" - "encoding/json" - "fmt" - "github.com/turbot/go-kit/helpers" - typeHelpers "github.com/turbot/go-kit/types" - "github.com/turbot/steampipe/pkg/dashboard/dashboardevents" - "github.com/turbot/steampipe/pkg/dashboard/dashboardexecute" - "github.com/turbot/steampipe/pkg/db/db_common" - "github.com/turbot/steampipe/pkg/error_helpers" - "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" - "github.com/turbot/steampipe/pkg/workspace" - "gopkg.in/olahol/melody.v1" - "log" - "os" - "reflect" - "strings" - "sync" -) - -type Server struct { - dbClient db_common.Client - mutex *sync.Mutex - dashboardClients map[string]*DashboardClientInfo - webSocket *melody.Melody - workspace *workspace.Workspace -} - -func NewServer(ctx context.Context, dbClient db_common.Client, w *workspace.Workspace) (*Server, error) { - initLogSink() - - OutputWait(ctx, "Starting Dashboard Server") - - webSocket := melody.New() - - var dashboardClients = make(map[string]*DashboardClientInfo) - - var mutex = &sync.Mutex{} - - server := &Server{ - dbClient: dbClient, - mutex: mutex, - dashboardClients: dashboardClients, - webSocket: webSocket, - workspace: w, - } - - w.RegisterDashboardEventHandler(ctx, server.HandleDashboardEvent) - err := w.SetupWatcher(ctx, dbClient, func(c context.Context, e error) {}) - OutputMessage(ctx, "Workspace loaded") - - return server, err -} - -// Start starts the API server -// it returns a channel which is signalled when the API server terminates -func (s *Server) Start(ctx context.Context) chan struct{} { - s.initAsync(ctx) - return startAPIAsync(ctx, s.webSocket) -} - -// Shutdown stops the API server -func (s *Server) Shutdown(ctx context.Context) { - log.Println("[TRACE] Server shutdown") - - if s.webSocket != nil { - log.Println("[TRACE] closing websocket") - if err := s.webSocket.Close(); err != nil { - error_helpers.ShowErrorWithMessage(ctx, err, "Websocket shutdown failed") - } - log.Println("[TRACE] closed websocket") - } - - log.Println("[TRACE] Server shutdown complete") - -} - -func (s *Server) HandleDashboardEvent(ctx context.Context, event dashboardevents.DashboardEvent) { - var payloadError error - var payload []byte - defer func() { - if payloadError != nil { - // we don't expect the build functions to ever error during marshalling - // this is because the data getting marshalled are not expected to have go specific - // properties/data in them - panic(fmt.Errorf("error building payload for '%s': %v", reflect.TypeOf(event).String(), payloadError)) - } - }() - - switch e := event.(type) { - - case *dashboardevents.WorkspaceError: - log.Printf("[TRACE] WorkspaceError event: %s", e.Error) - payload, payloadError = buildWorkspaceErrorPayload(e) - if payloadError != nil { - return - } - _ = s.webSocket.Broadcast(payload) - OutputError(ctx, e.Error) - - case *dashboardevents.ExecutionStarted: - log.Printf("[TRACE] ExecutionStarted event session %s, dashboard %s", e.Session, e.Root.GetName()) - payload, payloadError = buildExecutionStartedPayload(e) - if payloadError != nil { - return - } - s.writePayloadToSession(e.Session, payload) - OutputWait(ctx, fmt.Sprintf("Dashboard execution started: %s", e.Root.GetName())) - - case *dashboardevents.ExecutionError: - log.Println("[TRACE] execution error event") - payload, payloadError = buildExecutionErrorPayload(e) - if payloadError != nil { - return - } - - s.writePayloadToSession(e.Session, payload) - OutputError(ctx, e.Error) - - case *dashboardevents.ExecutionComplete: - log.Println("[TRACE] execution complete event") - payload, payloadError = buildExecutionCompletePayload(e) - if payloadError != nil { - return - } - dashboardName := e.Root.GetName() - s.writePayloadToSession(e.Session, payload) - outputReady(ctx, fmt.Sprintf("Execution complete: %s", dashboardName)) - - case *dashboardevents.ControlComplete: - log.Printf("[TRACE] ControlComplete event session %s, control %s", e.Session, e.Control.GetControlId()) - payload, payloadError = buildControlCompletePayload(e) - if payloadError != nil { - return - } - s.writePayloadToSession(e.Session, payload) - - case *dashboardevents.ControlError: - log.Printf("[TRACE] ControlError event session %s, control %s", e.Session, e.Control.GetControlId()) - payload, payloadError = buildControlErrorPayload(e) - if payloadError != nil { - return - } - s.writePayloadToSession(e.Session, payload) - - case *dashboardevents.LeafNodeUpdated: - payload, payloadError = buildLeafNodeUpdatedPayload(e) - if payloadError != nil { - return - } - s.writePayloadToSession(e.Session, payload) - - case *dashboardevents.DashboardChanged: - log.Println("[TRACE] DashboardChanged event") - deletedDashboards := e.DeletedDashboards - newDashboards := e.NewDashboards - - changedBenchmarks := e.ChangedBenchmarks - changedCategories := e.ChangedCategories - changedContainers := e.ChangedContainers - changedControls := e.ChangedControls - changedCards := e.ChangedCards - changedCharts := e.ChangedCharts - changedDashboards := e.ChangedDashboards - changedEdges := e.ChangedEdges - changedFlows := e.ChangedFlows - changedGraphs := e.ChangedGraphs - changedHierarchies := e.ChangedHierarchies - changedImages := e.ChangedImages - changedInputs := e.ChangedInputs - changedNodes := e.ChangedNodes - changedTables := e.ChangedTables - changedTexts := e.ChangedTexts - - // If nothing has changed, ignore - if len(deletedDashboards) == 0 && - len(newDashboards) == 0 && - len(changedBenchmarks) == 0 && - len(changedCategories) == 0 && - len(changedContainers) == 0 && - len(changedControls) == 0 && - len(changedCards) == 0 && - len(changedCharts) == 0 && - len(changedDashboards) == 0 && - len(changedEdges) == 0 && - len(changedFlows) == 0 && - len(changedGraphs) == 0 && - len(changedHierarchies) == 0 && - len(changedImages) == 0 && - len(changedInputs) == 0 && - len(changedNodes) == 0 && - len(changedTables) == 0 && - len(changedTexts) == 0 { - return - } - - for k, v := range s.dashboardClients { - log.Printf("[TRACE] Dashboard client: %v %v\n", k, typeHelpers.SafeString(v.Dashboard)) - } - - // If) any deleted/new/changed dashboards, emit an available dashboards message to clients - if len(deletedDashboards) != 0 || len(newDashboards) != 0 || len(changedDashboards) != 0 || len(changedBenchmarks) != 0 { - OutputMessage(ctx, "Available Dashboards updated") - - // Emit dashboard metadata event in case there is a new mod - else the UI won't know about this mod - payload, payloadError = buildDashboardMetadataPayload(s.workspace.GetResourceMaps(), s.workspace.CloudMetadata) - if payloadError != nil { - return - } - _ = s.webSocket.Broadcast(payload) - - // Emit available dashboards event - payload, payloadError = buildAvailableDashboardsPayload(s.workspace.GetResourceMaps()) - if payloadError != nil { - return - } - _ = s.webSocket.Broadcast(payload) - } - - var dashboardsBeingWatched []string - - dashboardClients := s.getDashboardClients() - for _, dashboardClientInfo := range dashboardClients { - dashboardName := typeHelpers.SafeString(dashboardClientInfo.Dashboard) - if dashboardClientInfo.Dashboard != nil { - if helpers.StringSliceContains(dashboardsBeingWatched, dashboardName) { - continue - } - dashboardsBeingWatched = append(dashboardsBeingWatched, dashboardName) - } - } - - var changedDashboardNames []string - var newDashboardNames []string - - // Process the changed items and make a note of the dashboard(s) they're in - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedBenchmarks)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCategories)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedContainers)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedControls)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCards)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedCharts)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedEdges)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedFlows)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedGraphs)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedHierarchies)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedImages)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedInputs)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedNodes)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedTables)...) - changedDashboardNames = append(changedDashboardNames, getDashboardsInterestedInResourceChanges(dashboardsBeingWatched, changedDashboardNames, changedTexts)...) - - for _, changedDashboard := range changedDashboards { - if helpers.StringSliceContains(changedDashboardNames, changedDashboard.Name) { - continue - } - changedDashboardNames = append(changedDashboardNames, changedDashboard.Name) - } - - for _, changedDashboardName := range changedDashboardNames { - sessionMap := s.getDashboardClients() - for sessionId, dashboardClientInfo := range sessionMap { - if typeHelpers.SafeString(dashboardClientInfo.Dashboard) == changedDashboardName { - _ = dashboardexecute.Executor.ExecuteDashboard(ctx, sessionId, changedDashboardName, dashboardClientInfo.DashboardInputs, s.workspace, s.dbClient) - } - } - } - - // Special case - if we previously had a workspace error, any previously existing dashboards - // will come in here as new, so we need to check if any of those new dashboards are being watched. - // If so, execute them - for _, newDashboard := range newDashboards { - if helpers.StringSliceContains(newDashboardNames, newDashboard.Name()) { - continue - } - newDashboardNames = append(newDashboardNames, newDashboard.Name()) - } - - sessionMap := s.getDashboardClients() - for _, newDashboardName := range newDashboardNames { - for sessionId, dashboardClientInfo := range sessionMap { - if typeHelpers.SafeString(dashboardClientInfo.Dashboard) == newDashboardName { - _ = dashboardexecute.Executor.ExecuteDashboard(ctx, sessionId, newDashboardName, dashboardClientInfo.DashboardInputs, s.workspace, s.dbClient) - } - } - } - - case *dashboardevents.InputValuesCleared: - log.Println("[TRACE] input values cleared event", *e) - - payload, payloadError = buildInputValuesClearedPayload(e) - if payloadError != nil { - return - } - - dashboardClients := s.getDashboardClients() - if sessionInfo, ok := dashboardClients[e.Session]; ok { - for _, clearedInput := range e.ClearedInputs { - delete(sessionInfo.DashboardInputs, clearedInput) - } - } - s.writePayloadToSession(e.Session, payload) - } -} - -func (s *Server) initAsync(ctx context.Context) { - go func() { - // Return list of dashboards on connect - s.webSocket.HandleConnect(func(session *melody.Session) { - log.Println("[TRACE] client connected") - s.addSession(session) - }) - - s.webSocket.HandleDisconnect(func(session *melody.Session) { - log.Println("[TRACE] client disconnected") - s.clearSession(ctx, session) - }) - - s.webSocket.HandleMessage(s.handleMessageFunc(ctx)) - OutputMessage(ctx, "Initialization complete") - }() -} - -func (s *Server) handleMessageFunc(ctx context.Context) func(session *melody.Session, msg []byte) { - return func(session *melody.Session, msg []byte) { - - sessionId := s.getSessionId(session) - - var request ClientRequest - // if we could not decode message - ignore - err := json.Unmarshal(msg, &request) - if err != nil { - log.Printf("[WARN] failed to marshal message: %s", err.Error()) - return - } - - if request.Action != "keep_alive" { - log.Println("[TRACE] message", string(msg)) - } - - switch request.Action { - case "get_dashboard_metadata": - payload, err := buildDashboardMetadataPayload(s.workspace.GetResourceMaps(), s.workspace.CloudMetadata) - if err != nil { - panic(fmt.Errorf("error building payload for get_metadata: %v", err)) - } - _ = session.Write(payload) - case "get_available_dashboards": - payload, err := buildAvailableDashboardsPayload(s.workspace.GetResourceMaps()) - if err != nil { - panic(fmt.Errorf("error building payload for get_available_dashboards: %v", err)) - } - _ = session.Write(payload) - case "select_dashboard": - s.setDashboardForSession(sessionId, request.Payload.Dashboard.FullName, request.Payload.InputValues) - _ = dashboardexecute.Executor.ExecuteDashboard(ctx, sessionId, request.Payload.Dashboard.FullName, request.Payload.InputValues, s.workspace, s.dbClient) - case "select_snapshot": - snapshotName := request.Payload.Dashboard.FullName - s.setDashboardForSession(sessionId, snapshotName, request.Payload.InputValues) - snap, err := dashboardexecute.Executor.LoadSnapshot(ctx, sessionId, snapshotName, s.workspace) - // TACTICAL- handle with error message - error_helpers.FailOnError(err) - // error handling??? - payload, err := buildDisplaySnapshotPayload(snap) - // TACTICAL- handle with error message - error_helpers.FailOnError(err) - - s.writePayloadToSession(sessionId, payload) - outputReady(ctx, fmt.Sprintf("Show snapshot complete: %s", snapshotName)) - case "input_changed": - s.setDashboardInputsForSession(sessionId, request.Payload.InputValues) - _ = dashboardexecute.Executor.OnInputChanged(ctx, sessionId, request.Payload.InputValues, request.Payload.ChangedInput) - case "clear_dashboard": - s.setDashboardInputsForSession(sessionId, nil) - dashboardexecute.Executor.CancelExecutionForSession(ctx, sessionId) - } - } -} - -func (s *Server) clearSession(ctx context.Context, session *melody.Session) { - if strings.ToUpper(os.Getenv("DEBUG")) == "TRUE" { - return - } - - sessionId := s.getSessionId(session) - - dashboardexecute.Executor.CancelExecutionForSession(ctx, sessionId) - - s.deleteDashboardClient(sessionId) -} - -func (s *Server) addSession(session *melody.Session) { - sessionId := s.getSessionId(session) - - clientSession := &DashboardClientInfo{ - Session: session, - } - - s.addDashboardClient(sessionId, clientSession) -} - -func (s *Server) setDashboardInputsForSession(sessionId string, inputs map[string]interface{}) { - dashboardClients := s.getDashboardClients() - if sessionInfo, ok := dashboardClients[sessionId]; ok { - sessionInfo.DashboardInputs = inputs - } -} - -func (s *Server) getSessionId(session *melody.Session) string { - return fmt.Sprintf("%p", session) -} - -// functions providing locked access to member properties - -func (s *Server) setDashboardForSession(sessionId string, dashboardName string, inputs map[string]interface{}) *DashboardClientInfo { - s.mutex.Lock() - defer s.mutex.Unlock() - - dashboardClientInfo := s.dashboardClients[sessionId] - dashboardClientInfo.Dashboard = &dashboardName - dashboardClientInfo.DashboardInputs = inputs - - return dashboardClientInfo -} - -func (s *Server) writePayloadToSession(sessionId string, payload []byte) { - s.mutex.Lock() - defer s.mutex.Unlock() - - if sessionInfo, ok := s.dashboardClients[sessionId]; ok { - _ = sessionInfo.Session.Write(payload) - } -} - -func (s *Server) getDashboardClients() map[string]*DashboardClientInfo { - s.mutex.Lock() - defer s.mutex.Unlock() - - return s.dashboardClients -} - -func (s *Server) addDashboardClient(sessionId string, clientSession *DashboardClientInfo) { - s.mutex.Lock() - s.dashboardClients[sessionId] = clientSession - s.mutex.Unlock() -} - -func (s *Server) deleteDashboardClient(sessionId string) { - s.mutex.Lock() - delete(s.dashboardClients, sessionId) - s.mutex.Unlock() -} - -func getDashboardsInterestedInResourceChanges(dashboardsBeingWatched []string, existingChangedDashboardNames []string, changedItems []*modconfig.DashboardTreeItemDiffs) []string { - var changedDashboardNames []string - - for _, changedItem := range changedItems { - paths := changedItem.Item.GetPaths() - for _, nodePath := range paths { - for _, nodeName := range nodePath { - resourceParts, _ := modconfig.ParseResourceName(nodeName) - // We only care about changes from these resource types - if !helpers.StringSliceContains([]string{modconfig.BlockTypeDashboard, modconfig.BlockTypeBenchmark}, resourceParts.ItemType) { - continue - } - - if helpers.StringSliceContains(existingChangedDashboardNames, nodeName) || helpers.StringSliceContains(changedDashboardNames, nodeName) || !helpers.StringSliceContains(dashboardsBeingWatched, nodeName) { - continue - } - - changedDashboardNames = append(changedDashboardNames, nodeName) - } - } - } - - return changedDashboardNames -} diff --git a/pkg/dashboard/dashboardserver/types.go b/pkg/dashboard/dashboardserver/types.go deleted file mode 100644 index 5303eaa80..000000000 --- a/pkg/dashboard/dashboardserver/types.go +++ /dev/null @@ -1,173 +0,0 @@ -package dashboardserver - -import ( - "fmt" - "github.com/turbot/steampipe/pkg/control/controlstatus" - "github.com/turbot/steampipe/pkg/dashboard/dashboardtypes" - "github.com/turbot/steampipe/pkg/steampipeconfig" - "gopkg.in/olahol/melody.v1" - "time" -) - -type ListenType string - -const ( - ListenTypeLocal ListenType = "local" - ListenTypeNetwork ListenType = "network" -) - -// IsValid is a validator for ListenType known values -func (lt ListenType) IsValid() error { - switch lt { - case ListenTypeNetwork, ListenTypeLocal: - return nil - } - return fmt.Errorf("invalid listen type. Must be one of '%v' or '%v'", ListenTypeNetwork, ListenTypeLocal) -} - -type ListenPort int - -// IsValid is a validator for ListenType known values -func (lp ListenPort) IsValid() error { - if lp < 1 || lp > 65535 { - return fmt.Errorf("invalid port - must be within range (1:65535)") - } - return nil -} - -type ErrorPayload struct { - Action string `json:"action"` - Error string `json:"error"` -} - -var ExecutionStartedSchemaVersion int64 = 20221222 - -type ExecutionStartedPayload struct { - SchemaVersion string `json:"schema_version"` - Action string `json:"action"` - ExecutionId string `json:"execution_id"` - Panels map[string]any `json:"panels"` - Layout *dashboardtypes.SnapshotTreeNode `json:"layout"` - Inputs map[string]interface{} `json:"inputs,omitempty"` - Variables map[string]string `json:"variables,omitempty"` - StartTime time.Time `json:"start_time"` -} - -var LeafNodeUpdatedSchemaVersion int64 = 20221222 - -type LeafNodeUpdatedPayload struct { - SchemaVersion string `json:"schema_version"` - Action string `json:"action"` - DashboardNode map[string]any `json:"dashboard_node"` - ExecutionId string `json:"execution_id"` - Timestamp time.Time `json:"timestamp"` -} - -type ControlEventPayload struct { - Action string `json:"action"` - Control controlstatus.ControlRunStatusProvider `json:"control"` - Name string `json:"name"` - Progress *controlstatus.ControlProgress `json:"progress"` - ExecutionId string `json:"execution_id"` - Timestamp time.Time `json:"timestamp"` -} - -type ExecutionErrorPayload struct { - Action string `json:"action"` - Error string `json:"error"` - Timestamp time.Time `json:"timestamp"` -} - -var ExecutionCompletePayloadSchemaVersion int64 = 20221222 - -type ExecutionCompletePayload struct { - Action string `json:"action"` - SchemaVersion string `json:"schema_version"` - Snapshot *dashboardtypes.SteampipeSnapshot `json:"snapshot"` - ExecutionId string `json:"execution_id"` -} - -type DisplaySnapshotPayload struct { - Action string `json:"action"` - SchemaVersion string `json:"schema_version"` - // snapshot is a map here as we cannot deserialise SteampipeSnapshot into a struct - // (without custom deserialisation code) as the Panels property is an interface - Snapshot map[string]any `json:"snapshot"` - ExecutionId string `json:"execution_id"` -} - -type InputValuesClearedPayload struct { - Action string `json:"action"` - ClearedInputs []string `json:"cleared_inputs"` - ExecutionId string `json:"execution_id"` -} - -type DashboardClientInfo struct { - Session *melody.Session - Dashboard *string - DashboardInputs map[string]interface{} -} - -type ClientRequestDashboardPayload struct { - FullName string `json:"full_name"` -} - -type ClientRequestPayload struct { - Dashboard ClientRequestDashboardPayload `json:"dashboard"` - InputValues map[string]interface{} `json:"input_values"` - ChangedInput string `json:"changed_input"` -} - -type ClientRequest struct { - Action string `json:"action"` - Payload ClientRequestPayload `json:"payload"` -} - -type ModAvailableDashboard struct { - Title string `json:"title,omitempty"` - FullName string `json:"full_name"` - ShortName string `json:"short_name"` - Tags map[string]string `json:"tags"` - ModFullName string `json:"mod_full_name"` -} - -type ModAvailableBenchmark struct { - Title string `json:"title,omitempty"` - FullName string `json:"full_name"` - ShortName string `json:"short_name"` - Tags map[string]string `json:"tags"` - IsTopLevel bool `json:"is_top_level"` - Children []ModAvailableBenchmark `json:"children,omitempty"` - Trunks [][]string `json:"trunks"` - ModFullName string `json:"mod_full_name"` -} - -type AvailableDashboardsPayload struct { - Action string `json:"action"` - Dashboards map[string]ModAvailableDashboard `json:"dashboards"` - Benchmarks map[string]ModAvailableBenchmark `json:"benchmarks"` - Snapshots map[string]string `json:"snapshots"` -} - -type ModDashboardMetadata struct { - Title string `json:"title,omitempty"` - FullName string `json:"full_name"` - ShortName string `json:"short_name"` -} - -type DashboardCLIMetadata struct { - Version string `json:"version,omitempty"` -} - -type DashboardMetadata struct { - Mod *ModDashboardMetadata `json:"mod,omitempty"` - InstalledMods map[string]ModDashboardMetadata `json:"installed_mods,omitempty"` - CLI DashboardCLIMetadata `json:"cli"` - Cloud *steampipeconfig.CloudMetadata `json:"cloud,omitempty"` - Telemetry string `json:"telemetry"` -} - -type DashboardMetadataPayload struct { - Action string `json:"action"` - Metadata DashboardMetadata `json:"metadata"` -}