package cmd import ( "context" "errors" "fmt" "log" "os" "os/signal" "strings" "time" psutils "github.com/shirou/gopsutil/process" "github.com/spf13/cobra" "github.com/spf13/viper" "github.com/turbot/go-kit/helpers" pconstants "github.com/turbot/pipe-fittings/v2/constants" "github.com/turbot/pipe-fittings/v2/querydisplay" putils "github.com/turbot/pipe-fittings/v2/utils" "github.com/turbot/steampipe-plugin-sdk/v5/sperr" "github.com/turbot/steampipe/v2/pkg/cmdconfig" "github.com/turbot/steampipe/v2/pkg/constants" "github.com/turbot/steampipe/v2/pkg/db/db_local" "github.com/turbot/steampipe/v2/pkg/error_helpers" "github.com/turbot/steampipe/v2/pkg/filepaths" "github.com/turbot/steampipe/v2/pkg/pluginmanager" pb "github.com/turbot/steampipe/v2/pkg/pluginmanager_service/grpc/proto" "github.com/turbot/steampipe/v2/pkg/statushooks" ) func serviceCmd() *cobra.Command { var cmd = &cobra.Command{ Use: "service [command]", Args: cobra.NoArgs, Short: "Steampipe service management", Long: `Steampipe service management. Run Steampipe as a local service, exposing it as a database endpoint for connection from any Postgres compatible database client.`, } cmd.AddCommand(serviceStartCmd()) cmd.AddCommand(serviceStatusCmd()) cmd.AddCommand(serviceStopCmd()) cmd.AddCommand(serviceRestartCmd()) cmd.Flags().BoolP(pconstants.ArgHelp, "h", false, "Help for service") return cmd } // handler for service start func serviceStartCmd() *cobra.Command { var cmd = &cobra.Command{ Use: "start", Args: cobra.NoArgs, Run: runServiceStartCmd, Short: "Start Steampipe in service mode", Long: `Start the Steampipe service. Run Steampipe as a local service, exposing it as a database endpoint for connection from any Postgres compatible database client.`, } cmdconfig. OnCmd(cmd). AddBoolFlag(pconstants.ArgHelp, false, "Help for service start", cmdconfig.FlagOptions.WithShortHand("h")). AddIntFlag(pconstants.ArgDatabasePort, constants.DatabaseDefaultPort, "Database service port"). AddStringFlag(pconstants.ArgDatabaseListenAddresses, string(db_local.ListenTypeNetwork), "Accept connections from: `local` (an alias for `localhost` only), `network` (an alias for `*`), or a comma separated list of hosts and/or IP addresses"). AddStringFlag(pconstants.ArgServicePassword, "", "Set the database password for this session"). // default is false and hides the database user password from service start prompt AddBoolFlag(pconstants.ArgServiceShowPassword, false, "View database password for connecting from another machine"). // foreground enables the service to run in the foreground - till exit AddBoolFlag(pconstants.ArgForeground, false, "Run the service in the foreground"). // hidden flags for internal use AddStringFlag(pconstants.ArgInvoker, string(constants.InvokerService), "Invoked by \"service\" or \"query\"", cmdconfig.FlagOptions.Hidden()) return cmd } // serviceStatusCmd :: handler for service status func serviceStatusCmd() *cobra.Command { var cmd = &cobra.Command{ Use: "status", Args: cobra.NoArgs, Run: runServiceStatusCmd, Short: "Status of the Steampipe service", Long: `Status of the Steampipe service. Report current status of the Steampipe database service.`, } cmdconfig.OnCmd(cmd). AddBoolFlag(pconstants.ArgHelp, false, "Help for service status", cmdconfig.FlagOptions.WithShortHand("h")). // default is false and hides the database user password from service start prompt AddBoolFlag(pconstants.ArgServiceShowPassword, false, "View database password for connecting from another machine"). AddBoolFlag(pconstants.ArgAll, false, "Bypasses the INSTALL_DIR and reports status of all running steampipe services") return cmd } // handler for service stop func serviceStopCmd() *cobra.Command { cmd := &cobra.Command{ Use: "stop", Args: cobra.NoArgs, Run: runServiceStopCmd, Short: "Stop Steampipe service", Long: `Stop the Steampipe service.`, } cmdconfig. OnCmd(cmd). AddBoolFlag(pconstants.ArgHelp, false, "Help for service stop", cmdconfig.FlagOptions.WithShortHand("h")). AddBoolFlag(pconstants.ArgForce, false, "Forces all services to shutdown, releasing all open connections and ports") return cmd } // restarts the database service func serviceRestartCmd() *cobra.Command { var cmd = &cobra.Command{ Use: "restart", Args: cobra.NoArgs, Run: runServiceRestartCmd, Short: "Restart Steampipe service", Long: `Restart the Steampipe service.`, } cmdconfig. OnCmd(cmd). AddBoolFlag(pconstants.ArgHelp, false, "Help for service restart", cmdconfig.FlagOptions.WithShortHand("h")). AddBoolFlag(pconstants.ArgForce, false, "Forces the service to restart, releasing all open connections and ports") return cmd } func runServiceStartCmd(cmd *cobra.Command, _ []string) { ctx := cmd.Context() putils.LogTime("runServiceStartCmd start") defer func() { putils.LogTime("runServiceStartCmd end") if r := recover(); r != nil { error_helpers.ShowError(ctx, helpers.ToError(r)) if exitCode == constants.ExitCodeSuccessful { // there was an error and the exitcode // was not set to a non-zero value. // set it exitCode = constants.ExitCodeUnknownErrorPanic } } }() ctx, cancel := signal.NotifyContext(ctx, os.Interrupt, os.Kill) defer cancel() listenAddresses := db_local.StartListenType(viper.GetString(pconstants.ArgDatabaseListenAddresses)).ToListenAddresses() port := viper.GetInt(pconstants.ArgDatabasePort) if port < 1 || port > 65535 { exitCode = constants.ExitCodeInsufficientOrWrongInputs panic("Invalid port - must be within range (1:65535)") } invoker := constants.Invoker(cmdconfig.Viper().GetString(pconstants.ArgInvoker)) if invoker.IsValid() != nil { exitCode = constants.ExitCodeInsufficientOrWrongInputs error_helpers.FailOnError(invoker.IsValid()) } startResult, dbServiceStarted := startService(ctx, listenAddresses, port, invoker) alreadyRunning := !dbServiceStarted printStatus(ctx, startResult.DbState, startResult.PluginManagerState, alreadyRunning) if viper.GetBool(pconstants.ArgForeground) { runServiceInForeground(ctx) } } func startService(ctx context.Context, listenAddresses []string, port int, invoker constants.Invoker) (_ *db_local.StartResult, dbServiceStarted bool) { statushooks.Show(ctx) defer statushooks.Done(ctx) log.Printf("[TRACE] startService - listenAddresses=%q", listenAddresses) err := db_local.EnsureDBInstalled(ctx) if err != nil { exitCode = constants.ExitCodeServiceStartupFailure error_helpers.FailOnError(err) } // start db, refreshing connections startResult := startServiceAndRefreshConnections(ctx, listenAddresses, port, invoker) if startResult.Status == db_local.ServiceFailedToStart { error_helpers.ShowError(ctx, sperr.New("steampipe service failed to start")) exitCode = constants.ExitCodeServiceStartupFailure return } // if the service is already running, then service start should make the service persistent if startResult.Status == db_local.ServiceAlreadyRunning { // check that we have the same port and listen parameters if port != startResult.DbState.Port { exitCode = constants.ExitCodeInsufficientOrWrongInputs error_helpers.FailOnError(sperr.New("service is already running on port %d - cannot change port while it's running", startResult.DbState.Port)) } if !startResult.DbState.MatchWithGivenListenAddresses(listenAddresses) { exitCode = constants.ExitCodeInsufficientOrWrongInputs // this messaging assumes that the resolved addresses from the given addresses have not changed while the service is running // although this is an edge case, ideally, we should check for the resolved addresses and give the relevant message error_helpers.FailOnError(sperr.New("service is already running and listening on %s - cannot change listen address while it's running", strings.Join(startResult.DbState.ResolvedListenAddresses, ", "))) } // convert to being invoked by service startResult.DbState.Invoker = constants.InvokerService err = startResult.DbState.Save() if err != nil { exitCode = constants.ExitCodeFileSystemAccessFailure error_helpers.FailOnErrorWithMessage(err, "service was already running, but could not make it persistent") } } dbServiceStarted = startResult.Status == db_local.ServiceStarted return startResult, dbServiceStarted } func startServiceAndRefreshConnections(ctx context.Context, listenAddresses []string, port int, invoker constants.Invoker) *db_local.StartResult { startResult := db_local.StartServices(ctx, listenAddresses, port, invoker) if startResult.Error != nil { exitCode = constants.ExitCodeServiceStartupFailure error_helpers.FailOnError(startResult.Error) } if startResult.Status == db_local.ServiceStarted { // ask the plugin manager to refresh connections // this is executed asyncronously by the plugin manager // we ignore this error, since RefreshConnections is async and all errors will flow through // the notification system // we do not expect any I/O errors on this since the PluginManager is running in the same box _, _ = startResult.PluginManager.RefreshConnections(&pb.RefreshConnectionsRequest{}) } return startResult } func runServiceInForeground(ctx context.Context) { fmt.Println("Hit Ctrl+C to stop the service") sigIntChannel := make(chan os.Signal, 1) signal.Notify(sigIntChannel, os.Interrupt) checkTimer := time.NewTicker(100 * time.Millisecond) defer checkTimer.Stop() var lastCtrlC time.Time for { select { case <-checkTimer.C: // get the current status newInfo, err := db_local.GetState() if err != nil { continue } if newInfo == nil { fmt.Println("Steampipe service stopped.") return } case <-sigIntChannel: fmt.Print("\r") // if we have received this signal, then the user probably wants to shut down // everything. Shutdowns MUST NOT happen in cancellable contexts connectedClients, err := db_local.GetClientCount(context.Background()) if err != nil { // report the error in the off chance that there's one error_helpers.ShowError(ctx, err) return } // we know there will be at least 1 client (connectionWatcher) if connectedClients.TotalClients > 1 { if lastCtrlC.IsZero() || time.Since(lastCtrlC) > 30*time.Second { lastCtrlC = time.Now() fmt.Println(buildForegroundClientsConnectedMsg()) continue } } fmt.Println("Stopping Steampipe service.") if _, err := db_local.StopServices(ctx, false, constants.InvokerService); err != nil { error_helpers.ShowError(ctx, err) } else { fmt.Println("Steampipe service stopped.") } return } } } func runServiceRestartCmd(cmd *cobra.Command, _ []string) { ctx := cmd.Context() putils.LogTime("runServiceRestartCmd start") defer func() { putils.LogTime("runServiceRestartCmd end") if r := recover(); r != nil { error_helpers.ShowError(ctx, helpers.ToError(r)) if exitCode == constants.ExitCodeSuccessful { // there was an error and the exitcode // was not set to a non-zero value. // set it exitCode = constants.ExitCodeUnknownErrorPanic } } }() dbStartResult := restartService(ctx) if dbStartResult != nil { printStatus(ctx, dbStartResult.DbState, dbStartResult.PluginManagerState, false) } } func restartService(ctx context.Context) (_ *db_local.StartResult) { statushooks.Show(ctx) defer statushooks.Done(ctx) // get current db statue currentDbState, err := db_local.GetState() error_helpers.FailOnError(err) if currentDbState == nil { fmt.Println("Steampipe service is not running.") return } // stop db stopStatus, err := db_local.StopServices(ctx, viper.GetBool(pconstants.ArgForce), constants.InvokerService) if err != nil { exitCode = constants.ExitCodeServiceStopFailure error_helpers.FailOnErrorWithMessage(err, "could not stop current instance") } if stopStatus != db_local.ServiceStopped { fmt.Println(` Service stop failed. Try using: steampipe service restart --force to force a restart. `) return } // the DB must be installed and therefore is a noop, // and EnsureDBInstalled also checks and installs the latest FDW err = db_local.EnsureDBInstalled(ctx) if err != nil { exitCode = constants.ExitCodeServiceStartupFailure error_helpers.FailOnError(err) } // set the password in 'viper' so that it can be used by 'service start' viper.Set(pconstants.ArgServicePassword, currentDbState.Password) // start db dbStartResult := startServiceAndRefreshConnections(ctx, currentDbState.ResolvedListenAddresses, currentDbState.Port, currentDbState.Invoker) if dbStartResult.Status == db_local.ServiceFailedToStart { exitCode = constants.ExitCodeServiceStartupFailure fmt.Println("Steampipe service was stopped, but failed to restart.") return } return dbStartResult } func runServiceStatusCmd(cmd *cobra.Command, _ []string) { ctx := cmd.Context() putils.LogTime("runServiceStatusCmd status") defer func() { putils.LogTime("runServiceStatusCmd end") if r := recover(); r != nil { error_helpers.ShowError(ctx, helpers.ToError(r)) } }() if !db_local.IsDBInstalled() || !db_local.IsFDWInstalled() { fmt.Println("Steampipe service is not installed.") return } if viper.GetBool(pconstants.ArgAll) { showAllStatus(ctx) } else { dbState, dbStateErr := db_local.GetState() pmState, pmStateErr := pluginmanager.LoadState() if dbStateErr != nil || pmStateErr != nil { error_helpers.ShowError(ctx, composeStateError(dbStateErr, pmStateErr)) return } printStatus(ctx, dbState, pmState, false) } } func composeStateError(dbStateErr error, pmStateErr error) error { msg := "could not get Steampipe service status:" if dbStateErr != nil { msg = fmt.Sprintf(`%s failed to get db state: %s`, msg, dbStateErr.Error()) } if pmStateErr != nil { msg = fmt.Sprintf(`%s failed to get plugin manager state: %s`, msg, pmStateErr.Error()) } return errors.New(msg) } func runServiceStopCmd(cmd *cobra.Command, _ []string) { ctx := cmd.Context() putils.LogTime("runServiceStopCmd stop") var status db_local.StopStatus var dbStopError error var dbState *db_local.RunningDBInstanceInfo defer func() { putils.LogTime("runServiceStopCmd end") if r := recover(); r != nil { error_helpers.ShowError(ctx, helpers.ToError(r)) if exitCode == constants.ExitCodeSuccessful { // there was an error and the exitcode // was not set to a non-zero value. // set it exitCode = constants.ExitCodeUnknownErrorPanic } } }() force := cmdconfig.Viper().GetBool(pconstants.ArgForce) if force { status, dbStopError = db_local.StopServices(ctx, force, constants.InvokerService) dbStopError = error_helpers.CombineErrors(dbStopError) if dbStopError != nil { exitCode = constants.ExitCodeServiceStopFailure error_helpers.FailOnError(dbStopError) } } else { dbState, dbStopError = db_local.GetState() if dbStopError != nil { exitCode = constants.ExitCodeServiceStopFailure error_helpers.FailOnErrorWithMessage(dbStopError, "could not stop Steampipe service") } if dbState == nil { fmt.Println("Steampipe service is not running.") return } if dbState.Invoker != constants.InvokerService { printRunningImplicit(dbState.Invoker) return } // check if there are any connected clients to the service connectedClients, err := db_local.GetClientCount(ctx) if err != nil { exitCode = constants.ExitCodeServiceStopFailure error_helpers.FailOnErrorWithMessage(err, "service stop failed") } // if there are any clients connected (apart from plugin manager clients), do not exit if connectedClients.TotalClients-connectedClients.PluginManagerClients > 0 { printClientsConnected() return } status, err = db_local.StopServices(ctx, false, constants.InvokerService) if err != nil { exitCode = constants.ExitCodeServiceStopFailure error_helpers.FailOnErrorWithMessage(err, "service stop failed") } } switch status { case db_local.ServiceStopped: fmt.Println("Steampipe database service stopped.") case db_local.ServiceNotRunning: fmt.Println("Steampipe service is not running.") case db_local.ServiceStopFailed: fmt.Println("Could not stop Steampipe service.") case db_local.ServiceStopTimedOut: fmt.Println(` Service stop operation timed-out. This is probably because other clients are connected to the database service. Disconnect all clients, or use steampipe service stop --force to force a shutdown. `) } } func showAllStatus(ctx context.Context) { var processes []*psutils.Process var err error statushooks.SetStatus(ctx, "Getting details") processes, err = db_local.FindAllSteampipePostgresInstances(ctx) statushooks.Done(ctx) error_helpers.FailOnError(err) if len(processes) == 0 { fmt.Println("There are no steampipe services running.") return } headers := []string{"PID", "Install Directory", "Port", "Listen"} rows := [][]string{} for _, process := range processes { pid, installDir, port, listen := getServiceProcessDetails(process) rows = append(rows, []string{pid, installDir, port, string(listen)}) } querydisplay.ShowWrappedTable(headers, rows, &querydisplay.ShowWrappedTableOptions{AutoMerge: false}) } func getServiceProcessDetails(process *psutils.Process) (string, string, string, db_local.StartListenType) { cmdLine, _ := process.CmdlineSlice() installDir := strings.TrimSuffix(cmdLine[0], filepaths.ServiceExecutableRelativeLocation()) var port string var listenType db_local.StartListenType for idx, param := range cmdLine { if param == "-p" { port = cmdLine[idx+1] } if strings.HasPrefix(param, "listen_addresses") { if strings.Contains(param, "localhost") { listenType = db_local.ListenTypeLocal } else { listenType = db_local.ListenTypeNetwork } } } return fmt.Sprintf("%d", process.Pid), installDir, port, listenType } func printStatus(ctx context.Context, dbState *db_local.RunningDBInstanceInfo, pmState *pluginmanager.State, alreadyRunning bool) { if dbState == nil && !pmState.Running { fmt.Println("Service is not running") return } var statusMessage string prefix := `Steampipe service is running: ` if alreadyRunning { prefix = `Steampipe service is already running: ` } suffix := ` Managing the Steampipe service: # Get status of the service steampipe service status # View database password for connecting from another machine steampipe service status --show-password # Restart the service steampipe service restart # Stop the service steampipe service stop ` var connectionStr string var password string if viper.GetBool(pconstants.ArgServiceShowPassword) { connectionStr = fmt.Sprintf( "postgres://%v:%v@%v:%v/%v", dbState.User, dbState.Password, putils.GetFirstListenAddress(dbState.ResolvedListenAddresses), dbState.Port, dbState.Database, ) password = dbState.Password } else { connectionStr = fmt.Sprintf( "postgres://%v@%v:%v/%v", dbState.User, putils.GetFirstListenAddress(dbState.ResolvedListenAddresses), dbState.Port, dbState.Database, ) password = "********* [use --show-password to reveal]" } postgresFmt := ` Database: Host(s): %v Port: %v Database: %v User: %v Password: %v Connection string: %v ` postgresMsg := fmt.Sprintf( postgresFmt, strings.Join(dbState.ResolvedListenAddresses, ", "), dbState.Port, dbState.Database, dbState.User, password, connectionStr, ) if dbState.Invoker == constants.InvokerService { statusMessage = fmt.Sprintf( "%s%s%s", prefix, postgresMsg, suffix, ) } else { msg := ` Steampipe service was started for an active %s session. The service will exit when all active sessions exit. To keep the service running after the %s session completes, use %s. ` statusMessage = fmt.Sprintf( msg, fmt.Sprintf("steampipe %s", dbState.Invoker), dbState.Invoker, pconstants.Bold("steampipe service start"), ) } fmt.Println(statusMessage) if dbState != nil && pmState == nil { // the service is running, but the plugin_manager is not running and there's no state file // meaning that it cannot be restarted by the FDW // it's an ERROR error_helpers.ShowError(ctx, sperr.New(` Service is running, but the Plugin Manager cannot be recovered. Please use %s to recover the service `, pconstants.Bold("steampipe service restart"), )) } } func printRunningImplicit(invoker constants.Invoker) { fmt.Printf(` Steampipe service is running exclusively for an active %s session. To force stop the service, use %s `, fmt.Sprintf("steampipe %s", invoker), pconstants.Bold("steampipe service stop --force"), ) } func printClientsConnected() { fmt.Printf( ` Cannot stop service since there are clients connected to the service. To force stop the service, use %s `, pconstants.Bold("steampipe service stop --force"), ) } func buildForegroundClientsConnectedMsg() string { return ` Not shutting down service as there as clients connected. To force shutdown, press Ctrl+C again. ` }