diff --git a/cmd/daemon.go b/cmd/daemon.go deleted file mode 100644 index 1118d88d7..000000000 --- a/cmd/daemon.go +++ /dev/null @@ -1,48 +0,0 @@ -package cmd - -import ( - "fmt" - "os" - "os/exec" - "os/signal" - "syscall" - - "github.com/spf13/cobra" - "github.com/spf13/viper" - "github.com/turbot/steampipe/constants" -) - -func daemonCmd() *cobra.Command { - cmd := &cobra.Command{ - Use: "daemon", - Run: runDaemonCmd, - Hidden: true, - } - return cmd -} - -func runDaemonCmd(cmd *cobra.Command, args []string) { - // get the location of the currently running steampipe process - executable, err := os.Executable() - if err != nil { - fmt.Printf("[WARN] plugin manager start() - failed to get steampipe executable path: %s", err) - os.Exit(1) - } - // create command which will run steampipe plugin-manager - pluginManagerCmd := exec.Command(executable, "plugin-manager", "--install-dir", viper.GetString(constants.ArgInstallDir)) - pluginManagerCmd.Stdout = os.Stdout - pluginManagerCmd.Start() - - // wait to be killed - sigchan := make(chan os.Signal, 1) - signal.Notify(sigchan, - syscall.SIGINT, - syscall.SIGKILL, - syscall.SIGTERM, - syscall.SIGQUIT) - <-sigchan - - // kill our child - // NOTE we will not do this if kill -9 is run - pluginManagerCmd.Process.Kill() -} diff --git a/cmd/plugin-manager.go b/cmd/plugin_manager.go similarity index 100% rename from cmd/plugin-manager.go rename to cmd/plugin_manager.go diff --git a/cmd/root.go b/cmd/root.go index 927a14c17..b012e5794 100644 --- a/cmd/root.go +++ b/cmd/root.go @@ -148,7 +148,6 @@ func AddCommands() { checkCmd(), serviceCmd(), generateCompletionScriptsCmd(), - daemonCmd(), pluginManagerCmd(), ) } diff --git a/connection_watcher/connection_watcher.go b/connection_watcher/connection_watcher.go index 70fa02827..48df5f92d 100644 --- a/connection_watcher/connection_watcher.go +++ b/connection_watcher/connection_watcher.go @@ -3,9 +3,11 @@ package connection_watcher import ( "fmt" "log" + "time" "github.com/fsnotify/fsnotify" filehelpers "github.com/turbot/go-kit/files" + "github.com/turbot/go-kit/helpers" "github.com/turbot/steampipe/cmdconfig" "github.com/turbot/steampipe/constants" "github.com/turbot/steampipe/db/db_local" @@ -47,11 +49,23 @@ func NewConnectionWatcher(onConnectionChanged func(configMap map[string]*pb.Conn log.Printf("[WARN] Failed to reload connection config: %s", err.Error()) } + go func() { + // start the watcher after a delay (to avoid refereshing connections before/while steampipe is doing it) + time.Sleep(5 * time.Second) + watcher.Start() + }() + log.Printf("[INFO] created ConnectionWatcher") return w, nil } func (w *ConnectionWatcher) handleFileWatcherEvent([]fsnotify.Event) { + defer func() { + if r := recover(); r != nil { + log.Printf("[WARN] ConnectionWatcher caught a panic: %s", helpers.ToError(r).Error()) + } + }() + log.Printf("[TRACE] ConnectionWatcher handleFileWatcherEvent") config, err := steampipeconfig.LoadConnectionConfig() if err != nil { @@ -68,6 +82,7 @@ func (w *ConnectionWatcher) handleFileWatcherEvent([]fsnotify.Event) { steampipeconfig.GlobalConfig = config // update the viper default based on this loaded config cmdconfig.SetViperDefaults(config.ConfigMap()) + refreshResult := client.RefreshConnectionAndSearchPaths() if refreshResult.Error != nil { fmt.Println() diff --git a/db/db_local/start_database.go b/db/db_local/start_database.go index d7d667bc8..827d3e505 100644 --- a/db/db_local/start_database.go +++ b/db/db_local/start_database.go @@ -21,7 +21,7 @@ import ( "github.com/turbot/steampipe/utils" ) -// StartResult is a pseudoEnum for outcomes of Start +// StartResult is a pseudoEnum for outcomes of StartNewInstance type StartResult int // StartListenType is a pseudoEnum of network binding for postgres @@ -158,7 +158,13 @@ func StartDB(port int, listen StartListenType, invoker constants.Invoker) (start } // start the plugin manager - if err := plugin_manager.Start(); err != nil { + // get the location of the currently running steampipe process + executable, err := os.Executable() + if err != nil { + log.Printf("[WARN] plugin manager start() - failed to get steampipe executable path: %s", err) + return ServiceFailedToStart, err + } + if err := plugin_manager.StartNewInstance(executable); err != nil { log.Printf("[WARN] StartDB plugin manager failed to start: %s", err) return ServiceFailedToStart, err } diff --git a/plugin_manager/lifecycle.go b/plugin_manager/lifecycle.go index d2dc7665a..946833e15 100644 --- a/plugin_manager/lifecycle.go +++ b/plugin_manager/lifecycle.go @@ -4,53 +4,45 @@ import ( "fmt" "io/ioutil" "log" - "os" "os/exec" "syscall" "github.com/hashicorp/go-hclog" "github.com/hashicorp/go-plugin" - "github.com/spf13/viper" "github.com/turbot/steampipe-plugin-sdk/logging" "github.com/turbot/steampipe/constants" pb "github.com/turbot/steampipe/plugin_manager/grpc/proto" pluginshared "github.com/turbot/steampipe/plugin_manager/grpc/shared" ) -// Start loads the plugin manager state, stops any previous instance and instantiates a new the plugin manager -func Start() error { +// StartNewInstance loads the plugin manager state, stops any previous instance and instantiates a new plugin manager +func StartNewInstance(steampipeExecutablePath string) error { // try to load the plugin manager state - state, err := loadPluginManagerState(true) + state, err := loadPluginManagerState() if err != nil { - log.Printf("[WARN] plugin manager Start() - load state failed: %s", err) + log.Printf("[WARN] plugin manager StartNewInstance() - load state failed: %s", err) return err } - if state != nil { - log.Printf("[TRACE] plugin manager Start() found previous instance of plugin manager still running - stopping it") + if state != nil && state.Running { + log.Printf("[TRACE] plugin manager StartNewInstance() found previous instance of plugin manager still running - stopping it") // stop the current instance if err := stop(state); err != nil { log.Printf("[WARN] failed to stop previous instance of plugin manager: %s", err) return err } } - return start() + return start(steampipeExecutablePath) } // start plugin manager, without checking it is already running -func start() error { - // create command which will start plugin-manager - // we have to spawn a separate process to do this so the plugin process itself is not an orphan - - // get the location of the currently running steampipe process - executable, err := os.Executable() - if err != nil { - log.Printf("[WARN] plugin manager start() - failed to get steampipe executable path: %s", err) - return err - } - log.Printf("[TRACE] plugin manager start() - got steampipe exe path: %s", executable) - - pluginManagerCmd := exec.Command(executable, "daemon", "--install-dir", viper.GetString(constants.ArgInstallDir)) +// we need to be provided with the exe path as we have no way of knowing where the steampipe exe it +// when ther plugin mananager is first started by steampipe, we derive the exe path from the running process and +// store it in the plugin manager state file - then if the fdw needs to start the plugin manager it knows how to +func start(steampipeExecutablePath string) error { + // note: we assume the install dir has been assigned to constants.SteampipeDir + // - this is done both by the FDW and Steampipe + pluginManagerCmd := exec.Command(steampipeExecutablePath, "plugin-manager", "--install-dir", constants.SteampipeDir) // set attributes on the command to ensure the process is not shutdown when its parent terminates pluginManagerCmd.SysProcAttr = &syscall.SysProcAttr{ Setpgid: true, @@ -62,12 +54,11 @@ func start() error { // launch the plugin manager the plugin process client := plugin.NewClient(&plugin.ClientConfig{ - HandshakeConfig: pluginshared.Handshake, - Plugins: pluginshared.PluginMap, - Cmd: pluginManagerCmd, - AllowedProtocols: []plugin.Protocol{ - plugin.ProtocolNetRPC, plugin.ProtocolGRPC}, - Logger: logger, + HandshakeConfig: pluginshared.Handshake, + Plugins: pluginshared.PluginMap, + Cmd: pluginManagerCmd, + AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, + Logger: logger, }) if _, err := client.Start(); err != nil { log.Printf("[WARN] plugin manager start() failed to start GRPC client for plugin manager: %s", err) @@ -75,10 +66,9 @@ func start() error { } // create a plugin manager state. - // NOTE: the pid returned by the reattach config is the pid of the daemon process - state := NewPluginManagerState(client.ReattachConfig()) + state := NewPluginManagerState(steampipeExecutablePath, client.ReattachConfig()) - log.Printf("[TRACE] start: started plugin manager, daemon pid %d", state.Pid) + log.Printf("[TRACE] start: started plugin manager, pid %d", state.Pid) // now save the state return state.Save() @@ -87,11 +77,11 @@ func start() error { // Stop loads the plugin manager state and if a running instance is found, stop it func Stop() error { // try to load the plugin manager state - state, err := loadPluginManagerState(true) + state, err := loadPluginManagerState() if err != nil { return err } - if state == nil { + if state == nil || state.Running == false { // nothing to do return nil } @@ -130,19 +120,25 @@ func GetPluginManager() (pluginshared.PluginManager, error) { // it then returns a plugin manager client func getPluginManager(startIfNeeded bool) (pluginshared.PluginManager, error) { // try to load the plugin manager state - state, err := loadPluginManagerState(true) + state, err := loadPluginManagerState() if err != nil { log.Printf("[WARN] failed to load plugin manager state: %s", err.Error()) return nil, err } // if we did not load it and there was no error, it means the plugin manager is not running + // we cannot start it as we do not know the correct steampipe exe path - which is stored in the state + // this is not expected - we would expect the plugin manager to have been started with the datatbase if state == nil { + return nil, fmt.Errorf("plugin manager is not running and there is no state file") + } + // if the plugin manager is not running, it must have crashed/terminated + if !state.Running { log.Printf("[TRACE] GetPluginManager called but plugin manager not running") + // is we are not already recursing, start the plugin manager then recurse back into this function if startIfNeeded { - - log.Printf("[TRACE] calling Start()") + log.Printf("[TRACE] calling StartNewInstance()") // start the plugin manager - if err := start(); err != nil { + if err := start(state.Executable); err != nil { return nil, err } // recurse in, setting startIfNeeded to false to avoid further recursion on failure diff --git a/plugin_manager/state.go b/plugin_manager/state.go index 548a03407..5309eb504 100644 --- a/plugin_manager/state.go +++ b/plugin_manager/state.go @@ -20,10 +20,15 @@ type pluginManagerState struct { ProtocolVersion int Addr *pb.SimpleAddr Pid int + // path to the steampipe executable + Executable string + // is the plugin manager running + Running bool } -func NewPluginManagerState(reattach *plugin.ReattachConfig) *pluginManagerState { +func NewPluginManagerState(executable string, reattach *plugin.ReattachConfig) *pluginManagerState { return &pluginManagerState{ + Executable: executable, Protocol: reattach.Protocol, ProtocolVersion: reattach.ProtocolVersion, Addr: pb.NewSimpleAddr(reattach.Addr), @@ -48,21 +53,22 @@ func (s *pluginManagerState) Save() error { return ioutil.WriteFile(constants.PluginManagerStateFilePath(), content, 0644) } -func (s *pluginManagerState) verifyServiceRunning() (bool, error) { +// check whether the plugin manager is running +// it it is NOT, delete the state file +// if it is, set the 'running property of the statefile to true +func (s *pluginManagerState) verifyRunning() error { pidExists, err := utils.PidExists(s.Pid) - if err != nil { - return false, fmt.Errorf("failed to verify plugin manager is running: %s", err.Error()) - } - if !pidExists { + + // if we fail to determine if the plugin manager is running, assume it is NOT + if err == nil && pidExists { + s.Running = true + } else if err = s.delete(); err != nil { // file is outdated - delete - if err := s.delete(); err != nil { - return false, err - } - // plugin manager is NOT running - return false, nil + log.Printf("[WARN] plugin manager is not running but failed to delete state file: %s", err.Error()) + err = fmt.Errorf("plugin manager is not running but failed to delete state file: %s", err.Error()) } - // plugin manager IS running - return true, nil + // return error (which may be nil) + return err } // kill the plugin manager process and delete the state @@ -85,7 +91,7 @@ func (s *pluginManagerState) delete() error { return os.Remove(constants.PluginManagerStateFilePath()) } -func loadPluginManagerState(verify bool) (*pluginManagerState, error) { +func loadPluginManagerState() (*pluginManagerState, error) { if !helpers.FileExists(constants.PluginManagerStateFilePath()) { log.Printf("[TRACE] plugin manager state file not found") return nil, nil @@ -101,15 +107,11 @@ func loadPluginManagerState(verify bool) (*pluginManagerState, error) { return nil, err } - if verify { - if running, err := s.verifyServiceRunning(); err != nil { - log.Printf("[TRACE] plugin manager is running, pid %d", s.Pid) - return nil, err - } else if !running { - log.Printf("[TRACE] plugin manager state file exists but pid %d is not running - deleting file", s.Pid) - return nil, nil - } - + // check is the manager is running - this deletes that state file if it si not running, + // and set the 'Running' property on the state if it is + if err = s.verifyRunning(); err != nil { + return nil, err } + return s, nil } diff --git a/tests/acceptance/run.sh b/tests/acceptance/run.sh index 455abec41..fc6f32701 100755 --- a/tests/acceptance/run.sh +++ b/tests/acceptance/run.sh @@ -3,7 +3,7 @@ if [[ ! ${MY_PATH} ]]; then MY_PATH="`dirname \"$0\"`" # relative - MY_PATH="`( cd \"$MY_PATH\" && pwd )`" # absolutized and normalized + MY_PATH="`( cd \"$MY_PATH\" && pwd )`" # absolutized and normalized fi if [[ ! ${TIME_TO_QUERY} ]]; diff --git a/utils/file_watcher.go b/utils/file_watcher.go index 7b76a5209..f8250a54d 100644 --- a/utils/file_watcher.go +++ b/utils/file_watcher.go @@ -83,8 +83,6 @@ func NewWatcher(opts *WatcherOptions) (*FileWatcher, error) { watcher.addDirectory(d) } - // start the watcher - watcher.start() return watcher, nil } @@ -95,7 +93,7 @@ func (w *FileWatcher) Close() { w.closeChan <- true } -func (w *FileWatcher) start() { +func (w *FileWatcher) Start() { // make an initial call to addWatches to add watches on existing files matching our criteria w.addWatches() diff --git a/workspace/workspace.go b/workspace/workspace.go index 479204f0b..c0eaabb81 100644 --- a/workspace/workspace.go +++ b/workspace/workspace.go @@ -113,6 +113,8 @@ func (w *Workspace) SetupWatcher(client db_common.Client, errorHandler func(erro return err } w.watcher = watcher + // start the watcher + watcher.Start() // set the file watcher error handler, which will get called when there are parsing errors // after a file watcher event