Remove daemon process. Closes #1146

This commit is contained in:
kaidaguerre
2021-11-18 17:57:06 +00:00
committed by GitHub
parent c760000652
commit 8d7d2b6b04
10 changed files with 85 additions and 115 deletions

View File

@@ -1,48 +0,0 @@
package cmd
import (
"fmt"
"os"
"os/exec"
"os/signal"
"syscall"
"github.com/spf13/cobra"
"github.com/spf13/viper"
"github.com/turbot/steampipe/constants"
)
func daemonCmd() *cobra.Command {
cmd := &cobra.Command{
Use: "daemon",
Run: runDaemonCmd,
Hidden: true,
}
return cmd
}
func runDaemonCmd(cmd *cobra.Command, args []string) {
// get the location of the currently running steampipe process
executable, err := os.Executable()
if err != nil {
fmt.Printf("[WARN] plugin manager start() - failed to get steampipe executable path: %s", err)
os.Exit(1)
}
// create command which will run steampipe plugin-manager
pluginManagerCmd := exec.Command(executable, "plugin-manager", "--install-dir", viper.GetString(constants.ArgInstallDir))
pluginManagerCmd.Stdout = os.Stdout
pluginManagerCmd.Start()
// wait to be killed
sigchan := make(chan os.Signal, 1)
signal.Notify(sigchan,
syscall.SIGINT,
syscall.SIGKILL,
syscall.SIGTERM,
syscall.SIGQUIT)
<-sigchan
// kill our child
// NOTE we will not do this if kill -9 is run
pluginManagerCmd.Process.Kill()
}

View File

@@ -148,7 +148,6 @@ func AddCommands() {
checkCmd(),
serviceCmd(),
generateCompletionScriptsCmd(),
daemonCmd(),
pluginManagerCmd(),
)
}

View File

@@ -3,9 +3,11 @@ package connection_watcher
import (
"fmt"
"log"
"time"
"github.com/fsnotify/fsnotify"
filehelpers "github.com/turbot/go-kit/files"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe/cmdconfig"
"github.com/turbot/steampipe/constants"
"github.com/turbot/steampipe/db/db_local"
@@ -47,11 +49,23 @@ func NewConnectionWatcher(onConnectionChanged func(configMap map[string]*pb.Conn
log.Printf("[WARN] Failed to reload connection config: %s", err.Error())
}
go func() {
// start the watcher after a delay (to avoid refereshing connections before/while steampipe is doing it)
time.Sleep(5 * time.Second)
watcher.Start()
}()
log.Printf("[INFO] created ConnectionWatcher")
return w, nil
}
func (w *ConnectionWatcher) handleFileWatcherEvent([]fsnotify.Event) {
defer func() {
if r := recover(); r != nil {
log.Printf("[WARN] ConnectionWatcher caught a panic: %s", helpers.ToError(r).Error())
}
}()
log.Printf("[TRACE] ConnectionWatcher handleFileWatcherEvent")
config, err := steampipeconfig.LoadConnectionConfig()
if err != nil {
@@ -68,6 +82,7 @@ func (w *ConnectionWatcher) handleFileWatcherEvent([]fsnotify.Event) {
steampipeconfig.GlobalConfig = config
// update the viper default based on this loaded config
cmdconfig.SetViperDefaults(config.ConfigMap())
refreshResult := client.RefreshConnectionAndSearchPaths()
if refreshResult.Error != nil {
fmt.Println()

View File

@@ -21,7 +21,7 @@ import (
"github.com/turbot/steampipe/utils"
)
// StartResult is a pseudoEnum for outcomes of Start
// StartResult is a pseudoEnum for outcomes of StartNewInstance
type StartResult int
// StartListenType is a pseudoEnum of network binding for postgres
@@ -158,7 +158,13 @@ func StartDB(port int, listen StartListenType, invoker constants.Invoker) (start
}
// start the plugin manager
if err := plugin_manager.Start(); err != nil {
// get the location of the currently running steampipe process
executable, err := os.Executable()
if err != nil {
log.Printf("[WARN] plugin manager start() - failed to get steampipe executable path: %s", err)
return ServiceFailedToStart, err
}
if err := plugin_manager.StartNewInstance(executable); err != nil {
log.Printf("[WARN] StartDB plugin manager failed to start: %s", err)
return ServiceFailedToStart, err
}

View File

@@ -4,53 +4,45 @@ import (
"fmt"
"io/ioutil"
"log"
"os"
"os/exec"
"syscall"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/spf13/viper"
"github.com/turbot/steampipe-plugin-sdk/logging"
"github.com/turbot/steampipe/constants"
pb "github.com/turbot/steampipe/plugin_manager/grpc/proto"
pluginshared "github.com/turbot/steampipe/plugin_manager/grpc/shared"
)
// Start loads the plugin manager state, stops any previous instance and instantiates a new the plugin manager
func Start() error {
// StartNewInstance loads the plugin manager state, stops any previous instance and instantiates a new plugin manager
func StartNewInstance(steampipeExecutablePath string) error {
// try to load the plugin manager state
state, err := loadPluginManagerState(true)
state, err := loadPluginManagerState()
if err != nil {
log.Printf("[WARN] plugin manager Start() - load state failed: %s", err)
log.Printf("[WARN] plugin manager StartNewInstance() - load state failed: %s", err)
return err
}
if state != nil {
log.Printf("[TRACE] plugin manager Start() found previous instance of plugin manager still running - stopping it")
if state != nil && state.Running {
log.Printf("[TRACE] plugin manager StartNewInstance() found previous instance of plugin manager still running - stopping it")
// stop the current instance
if err := stop(state); err != nil {
log.Printf("[WARN] failed to stop previous instance of plugin manager: %s", err)
return err
}
}
return start()
return start(steampipeExecutablePath)
}
// start plugin manager, without checking it is already running
func start() error {
// create command which will start plugin-manager
// we have to spawn a separate process to do this so the plugin process itself is not an orphan
// get the location of the currently running steampipe process
executable, err := os.Executable()
if err != nil {
log.Printf("[WARN] plugin manager start() - failed to get steampipe executable path: %s", err)
return err
}
log.Printf("[TRACE] plugin manager start() - got steampipe exe path: %s", executable)
pluginManagerCmd := exec.Command(executable, "daemon", "--install-dir", viper.GetString(constants.ArgInstallDir))
// we need to be provided with the exe path as we have no way of knowing where the steampipe exe it
// when ther plugin mananager is first started by steampipe, we derive the exe path from the running process and
// store it in the plugin manager state file - then if the fdw needs to start the plugin manager it knows how to
func start(steampipeExecutablePath string) error {
// note: we assume the install dir has been assigned to constants.SteampipeDir
// - this is done both by the FDW and Steampipe
pluginManagerCmd := exec.Command(steampipeExecutablePath, "plugin-manager", "--install-dir", constants.SteampipeDir)
// set attributes on the command to ensure the process is not shutdown when its parent terminates
pluginManagerCmd.SysProcAttr = &syscall.SysProcAttr{
Setpgid: true,
@@ -62,12 +54,11 @@ func start() error {
// launch the plugin manager the plugin process
client := plugin.NewClient(&plugin.ClientConfig{
HandshakeConfig: pluginshared.Handshake,
Plugins: pluginshared.PluginMap,
Cmd: pluginManagerCmd,
AllowedProtocols: []plugin.Protocol{
plugin.ProtocolNetRPC, plugin.ProtocolGRPC},
Logger: logger,
HandshakeConfig: pluginshared.Handshake,
Plugins: pluginshared.PluginMap,
Cmd: pluginManagerCmd,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
Logger: logger,
})
if _, err := client.Start(); err != nil {
log.Printf("[WARN] plugin manager start() failed to start GRPC client for plugin manager: %s", err)
@@ -75,10 +66,9 @@ func start() error {
}
// create a plugin manager state.
// NOTE: the pid returned by the reattach config is the pid of the daemon process
state := NewPluginManagerState(client.ReattachConfig())
state := NewPluginManagerState(steampipeExecutablePath, client.ReattachConfig())
log.Printf("[TRACE] start: started plugin manager, daemon pid %d", state.Pid)
log.Printf("[TRACE] start: started plugin manager, pid %d", state.Pid)
// now save the state
return state.Save()
@@ -87,11 +77,11 @@ func start() error {
// Stop loads the plugin manager state and if a running instance is found, stop it
func Stop() error {
// try to load the plugin manager state
state, err := loadPluginManagerState(true)
state, err := loadPluginManagerState()
if err != nil {
return err
}
if state == nil {
if state == nil || state.Running == false {
// nothing to do
return nil
}
@@ -130,19 +120,25 @@ func GetPluginManager() (pluginshared.PluginManager, error) {
// it then returns a plugin manager client
func getPluginManager(startIfNeeded bool) (pluginshared.PluginManager, error) {
// try to load the plugin manager state
state, err := loadPluginManagerState(true)
state, err := loadPluginManagerState()
if err != nil {
log.Printf("[WARN] failed to load plugin manager state: %s", err.Error())
return nil, err
}
// if we did not load it and there was no error, it means the plugin manager is not running
// we cannot start it as we do not know the correct steampipe exe path - which is stored in the state
// this is not expected - we would expect the plugin manager to have been started with the datatbase
if state == nil {
return nil, fmt.Errorf("plugin manager is not running and there is no state file")
}
// if the plugin manager is not running, it must have crashed/terminated
if !state.Running {
log.Printf("[TRACE] GetPluginManager called but plugin manager not running")
// is we are not already recursing, start the plugin manager then recurse back into this function
if startIfNeeded {
log.Printf("[TRACE] calling Start()")
log.Printf("[TRACE] calling StartNewInstance()")
// start the plugin manager
if err := start(); err != nil {
if err := start(state.Executable); err != nil {
return nil, err
}
// recurse in, setting startIfNeeded to false to avoid further recursion on failure

View File

@@ -20,10 +20,15 @@ type pluginManagerState struct {
ProtocolVersion int
Addr *pb.SimpleAddr
Pid int
// path to the steampipe executable
Executable string
// is the plugin manager running
Running bool
}
func NewPluginManagerState(reattach *plugin.ReattachConfig) *pluginManagerState {
func NewPluginManagerState(executable string, reattach *plugin.ReattachConfig) *pluginManagerState {
return &pluginManagerState{
Executable: executable,
Protocol: reattach.Protocol,
ProtocolVersion: reattach.ProtocolVersion,
Addr: pb.NewSimpleAddr(reattach.Addr),
@@ -48,21 +53,22 @@ func (s *pluginManagerState) Save() error {
return ioutil.WriteFile(constants.PluginManagerStateFilePath(), content, 0644)
}
func (s *pluginManagerState) verifyServiceRunning() (bool, error) {
// check whether the plugin manager is running
// it it is NOT, delete the state file
// if it is, set the 'running property of the statefile to true
func (s *pluginManagerState) verifyRunning() error {
pidExists, err := utils.PidExists(s.Pid)
if err != nil {
return false, fmt.Errorf("failed to verify plugin manager is running: %s", err.Error())
}
if !pidExists {
// if we fail to determine if the plugin manager is running, assume it is NOT
if err == nil && pidExists {
s.Running = true
} else if err = s.delete(); err != nil {
// file is outdated - delete
if err := s.delete(); err != nil {
return false, err
}
// plugin manager is NOT running
return false, nil
log.Printf("[WARN] plugin manager is not running but failed to delete state file: %s", err.Error())
err = fmt.Errorf("plugin manager is not running but failed to delete state file: %s", err.Error())
}
// plugin manager IS running
return true, nil
// return error (which may be nil)
return err
}
// kill the plugin manager process and delete the state
@@ -85,7 +91,7 @@ func (s *pluginManagerState) delete() error {
return os.Remove(constants.PluginManagerStateFilePath())
}
func loadPluginManagerState(verify bool) (*pluginManagerState, error) {
func loadPluginManagerState() (*pluginManagerState, error) {
if !helpers.FileExists(constants.PluginManagerStateFilePath()) {
log.Printf("[TRACE] plugin manager state file not found")
return nil, nil
@@ -101,15 +107,11 @@ func loadPluginManagerState(verify bool) (*pluginManagerState, error) {
return nil, err
}
if verify {
if running, err := s.verifyServiceRunning(); err != nil {
log.Printf("[TRACE] plugin manager is running, pid %d", s.Pid)
return nil, err
} else if !running {
log.Printf("[TRACE] plugin manager state file exists but pid %d is not running - deleting file", s.Pid)
return nil, nil
}
// check is the manager is running - this deletes that state file if it si not running,
// and set the 'Running' property on the state if it is
if err = s.verifyRunning(); err != nil {
return nil, err
}
return s, nil
}

View File

@@ -3,7 +3,7 @@
if [[ ! ${MY_PATH} ]];
then
MY_PATH="`dirname \"$0\"`" # relative
MY_PATH="`( cd \"$MY_PATH\" && pwd )`" # absolutized and normalized
MY_PATH="`( cd \"$MY_PATH\" && pwd )`" # absolutized and normalized
fi
if [[ ! ${TIME_TO_QUERY} ]];

View File

@@ -83,8 +83,6 @@ func NewWatcher(opts *WatcherOptions) (*FileWatcher, error) {
watcher.addDirectory(d)
}
// start the watcher
watcher.start()
return watcher, nil
}
@@ -95,7 +93,7 @@ func (w *FileWatcher) Close() {
w.closeChan <- true
}
func (w *FileWatcher) start() {
func (w *FileWatcher) Start() {
// make an initial call to addWatches to add watches on existing files matching our criteria
w.addWatches()

View File

@@ -113,6 +113,8 @@ func (w *Workspace) SetupWatcher(client db_common.Client, errorHandler func(erro
return err
}
w.watcher = watcher
// start the watcher
watcher.Start()
// set the file watcher error handler, which will get called when there are parsing errors
// after a file watcher event