From 4ae9df827e6d3e7e66f0edfe06de2cd4042ec3ef Mon Sep 17 00:00:00 2001 From: Puskar Basu <45908484+pskrbasu@users.noreply.github.com> Date: Fri, 25 Aug 2023 14:52:49 +0530 Subject: [PATCH] Force stop should also kill any stray plugin manager processes. Closes #3636 --- pkg/db/db_local/install.go | 3 +- pkg/db/db_local/process.go | 262 ++++++++++++++++++++++++++++++ pkg/db/db_local/start_services.go | 49 ------ pkg/db/db_local/stop_services.go | 149 +---------------- 4 files changed, 271 insertions(+), 192 deletions(-) create mode 100644 pkg/db/db_local/process.go diff --git a/pkg/db/db_local/install.go b/pkg/db/db_local/install.go index db89940e4..d03f0d945 100644 --- a/pkg/db/db_local/install.go +++ b/pkg/db/db_local/install.go @@ -219,7 +219,8 @@ func prepareDb(ctx context.Context) error { if needsInit() { statushooks.SetStatus(ctx, "Cleanup any Steampipe processes…") - killInstanceIfAny(ctx) + killPostgresInstanceIfAny(ctx) + killPluginManagerInstanceIfAny(ctx) if err := runInstall(ctx, nil); err != nil { return err } diff --git a/pkg/db/db_local/process.go b/pkg/db/db_local/process.go new file mode 100644 index 000000000..612a707c7 --- /dev/null +++ b/pkg/db/db_local/process.go @@ -0,0 +1,262 @@ +package db_local + +import ( + "context" + "fmt" + "log" + "strings" + "sync" + "syscall" + "time" + + psutils "github.com/shirou/gopsutil/process" + "github.com/turbot/go-kit/helpers" + "github.com/turbot/steampipe/pkg/constants" + "github.com/turbot/steampipe/pkg/statushooks" + "github.com/turbot/steampipe/pkg/utils" +) + +/* +Postgres has three levels of shutdown: + + - SIGTERM - Smart Shutdown : Wait for children to end normally - exit self + - SIGINT - Fast Shutdown : SIGTERM children, causing them to abort current + transations and exit - wait for children to exit - + exit self + - SIGQUIT - Immediate Shutdown : SIGQUIT children - wait at most 5 seconds, + send SIGKILL to children - exit self immediately + +Postgres recommended shutdown is to send a SIGTERM - which initiates +a Smart-Shutdown sequence. + +IMPORTANT: +As per documentation, it is best not to use SIGKILL +to shut down postgres. Doing so will prevent the server +from releasing shared memory and semaphores. + +Reference: +https://www.postgresql.org/docs/12/server-shutdown.html + +By the time we actually try to run this sequence, we will have +checked that the service can indeed shutdown gracefully, +the sequence is there only as a backup. +*/ +func doThreeStepPostgresExit(ctx context.Context, process *psutils.Process) error { + utils.LogTime("db_local.doThreeStepPostgresExit start") + defer utils.LogTime("db_local.doThreeStepPostgresExit end") + + var err error + var exitSuccessful bool + + // send a SIGTERM + err = process.SendSignal(syscall.SIGTERM) + if err != nil { + return err + } + exitSuccessful = waitForProcessExit(ctx, process) + if !exitSuccessful { + // process didn't quit + + // set status, as this is taking time + statushooks.SetStatus(ctx, "Shutting down…") + + // try a SIGINT + err = process.SendSignal(syscall.SIGINT) + if err != nil { + return err + } + exitSuccessful = waitForProcessExit(ctx, process) + } + if !exitSuccessful { + // process didn't quit + // desperation prevails + err = process.SendSignal(syscall.SIGQUIT) + if err != nil { + return err + } + exitSuccessful = waitForProcessExit(ctx, process) + } + + if !exitSuccessful { + log.Println("[ERROR] Failed to stop service") + log.Printf("[ERROR] Service Details:\n%s\n", getPrintableProcessDetails(process, 0)) + return fmt.Errorf("service shutdown timed out") + } + + return nil +} + +func waitForProcessExit(ctx context.Context, process *psutils.Process) bool { + utils.LogTime("db_local.waitForProcessExit start") + defer utils.LogTime("db_local.waitForProcessExit end") + + checkTimer := time.NewTicker(50 * time.Millisecond) + + for { + select { + case <-checkTimer.C: + pEx, _ := utils.PidExists(int(process.Pid)) + if pEx { + continue + } + return true + case <-ctx.Done(): + checkTimer.Stop() + return false + } + } +} + +func getPrintableProcessDetails(process *psutils.Process, indent int) string { + utils.LogTime("db_local.getPrintableProcessDetails start") + defer utils.LogTime("db_local.getPrintableProcessDetails end") + + indentString := strings.Repeat(" ", indent) + appendTo := []string{} + + if name, err := process.Name(); err == nil { + appendTo = append(appendTo, fmt.Sprintf("%s> Name: %s", indentString, name)) + } + if cmdLine, err := process.Cmdline(); err == nil { + appendTo = append(appendTo, fmt.Sprintf("%s> CmdLine: %s", indentString, cmdLine)) + } + if status, err := process.Status(); err == nil { + appendTo = append(appendTo, fmt.Sprintf("%s> Status: %s", indentString, status)) + } + if cwd, err := process.Cwd(); err == nil { + appendTo = append(appendTo, fmt.Sprintf("%s> CWD: %s", indentString, cwd)) + } + if executable, err := process.Exe(); err == nil { + appendTo = append(appendTo, fmt.Sprintf("%s> Executable: %s", indentString, executable)) + } + if username, err := process.Username(); err == nil { + appendTo = append(appendTo, fmt.Sprintf("%s> Username: %s", indentString, username)) + } + if indent == 0 { + // I do not care about the parent of my parent + if parent, err := process.Parent(); err == nil && parent != nil { + appendTo = append(appendTo, "", fmt.Sprintf("%s> Parent Details", indentString)) + parentLog := getPrintableProcessDetails(parent, indent+1) + appendTo = append(appendTo, parentLog, "") + } + + // I do not care about all the children of my parent + if children, err := process.Children(); err == nil && len(children) > 0 { + appendTo = append(appendTo, fmt.Sprintf("%s> Children Details", indentString)) + for _, child := range children { + childLog := getPrintableProcessDetails(child, indent+1) + appendTo = append(appendTo, childLog, "") + } + } + } + + return strings.Join(appendTo, "\n") +} + +// kill all postgres processes that were started as part of steampipe (if any) +func killPostgresInstanceIfAny(ctx context.Context) bool { + processes, err := FindAllSteampipePostgresInstances(ctx) + if err != nil { + return false + } + wg := sync.WaitGroup{} + for _, process := range processes { + wg.Add(1) + go func(p *psutils.Process) { + doThreeStepPostgresExit(ctx, p) + wg.Done() + }(process) + } + wg.Wait() + return len(processes) > 0 +} + +// kill all plugin manager processes that were started as part of steampipe (if any) +func killPluginManagerInstanceIfAny(ctx context.Context) bool { + processGroups, err := FindAllSteampipePluginManagerInstances(ctx) + if err != nil { + return false + } + wg := sync.WaitGroup{} + for _, processGroup := range processGroups { + // add the number of processes in this to the waitGroup + wg.Add(len(processGroup)) + go func(processGrp []*psutils.Process) { + for _, p := range processGrp { + defer wg.Done() + if err := p.KillWithContext(ctx); err != nil { + log.Println("[TRACE] error killing process", err) + } + timeout, cancel := context.WithTimeout(ctx, 2*time.Second) + defer cancel() + if !waitForProcessExit(timeout, p) { + log.Println("[TRACE] timed out waiting for process exit") + } + } + }(processGroup) + } + wg.Wait() + return len(processGroups) > 0 +} + +func FindAllSteampipePostgresInstances(ctx context.Context) ([]*psutils.Process, error) { + var instances []*psutils.Process + allProcesses, err := psutils.ProcessesWithContext(ctx) + if err != nil { + return nil, err + } + for _, p := range allProcesses { + cmdLine, err := p.CmdlineSliceWithContext(ctx) + if err != nil { + return nil, err + } + if isSteampipePostgresProcess(ctx, cmdLine) { + instances = append(instances, p) + } + } + return instances, nil +} + +func FindAllSteampipePluginManagerInstances(ctx context.Context) ([][]*psutils.Process, error) { + var instances [][]*psutils.Process + allProcesses, err := psutils.ProcessesWithContext(ctx) + if err != nil { + return nil, err + } + for _, p := range allProcesses { + cmdLine, err := p.CmdlineSliceWithContext(ctx) + if err != nil { + return nil, err + } + if isSteampipePluginManagerProcess(ctx, cmdLine) { + theseInstances := []*psutils.Process{} + for _, q := range allProcesses { + if ppid, err := q.Ppid(); err == nil && ppid == p.Pid { + // add all child plugin processes too + theseInstances = append(theseInstances, q) + } + } + theseInstances = append(theseInstances, p) + instances = append(instances, theseInstances) + } + } + return instances, nil +} + +func isSteampipePostgresProcess(ctx context.Context, cmdline []string) bool { + if len(cmdline) < 1 { + return false + } + if strings.Contains(cmdline[0], "postgres") { + // this is a postgres process - but is it a steampipe service? + return helpers.StringSliceContains(cmdline, fmt.Sprintf("application_name=%s", constants.AppName)) + } + return false +} + +func isSteampipePluginManagerProcess(ctx context.Context, cmdline []string) bool { + if len(cmdline) < 1 { + return false + } + return strings.HasSuffix(cmdline[0], "steampipe") && strings.EqualFold(cmdline[1], "plugin-manager") +} diff --git a/pkg/db/db_local/start_services.go b/pkg/db/db_local/start_services.go index a17a55d4d..cbdd2b2a2 100644 --- a/pkg/db/db_local/start_services.go +++ b/pkg/db/db_local/start_services.go @@ -8,11 +8,9 @@ import ( "os" "os/exec" "strings" - "sync" "syscall" "github.com/jackc/pgx/v5" - psutils "github.com/shirou/gopsutil/process" "github.com/spf13/viper" "github.com/turbot/go-kit/helpers" "github.com/turbot/steampipe-plugin-sdk/v5/sperr" @@ -601,50 +599,3 @@ func ensureTempTablePermissions(ctx context.Context, databaseName string, rootCl } return nil } - -// kill all postgres processes that were started as part of steampipe (if any) -func killInstanceIfAny(ctx context.Context) bool { - processes, err := FindAllSteampipePostgresInstances(ctx) - if err != nil { - return false - } - wg := sync.WaitGroup{} - for _, process := range processes { - wg.Add(1) - go func(p *psutils.Process) { - doThreeStepPostgresExit(ctx, p) - wg.Done() - }(process) - } - wg.Wait() - return len(processes) > 0 -} - -func FindAllSteampipePostgresInstances(ctx context.Context) ([]*psutils.Process, error) { - var instances []*psutils.Process - allProcesses, err := psutils.ProcessesWithContext(ctx) - if err != nil { - return nil, err - } - for _, p := range allProcesses { - cmdLine, err := p.CmdlineSliceWithContext(ctx) - if err != nil { - return nil, err - } - if isSteampipePostgresProcess(ctx, cmdLine) { - instances = append(instances, p) - } - } - return instances, nil -} - -func isSteampipePostgresProcess(ctx context.Context, cmdline []string) bool { - if len(cmdline) < 1 { - return false - } - if strings.Contains(cmdline[0], "postgres") { - // this is a postgres process - but is it a steampipe service? - return helpers.StringSliceContains(cmdline, fmt.Sprintf("application_name=%s", constants.AppName)) - } - return false -} diff --git a/pkg/db/db_local/stop_services.go b/pkg/db/db_local/stop_services.go index 0118cf4b3..f8392daa2 100644 --- a/pkg/db/db_local/stop_services.go +++ b/pkg/db/db_local/stop_services.go @@ -6,8 +6,6 @@ import ( "log" "os" "strings" - "syscall" - "time" psutils "github.com/shirou/gopsutil/process" "github.com/turbot/steampipe/pkg/constants" @@ -180,8 +178,13 @@ func stopDBService(ctx context.Context, force bool) (StopStatus, error) { // check if we have a process from another install-dir statushooks.SetStatus(ctx, "Checking for running instances…") // do not use a context that can be cancelled - anyStopped := killInstanceIfAny(context.Background()) - if anyStopped { + anyStopped := killPostgresInstanceIfAny(context.Background()) + // plugin manager is already stopped at this point, but we have seen instances where stray plugin manager + // processes were left behind even after force stop. So we kill any leftover plugin manager processes(if any). + // Adding this step adds 1 process call(in the best case scenario) but confirms that no plugin manager processes + // are leftover. + anyPluginManagerStopped := killPluginManagerInstanceIfAny(context.Background()) + if anyStopped || anyPluginManagerStopped { return ServiceStopped, nil } return ServiceNotRunning, nil @@ -213,141 +216,3 @@ func stopDBService(ctx context.Context, force bool) (StopStatus, error) { return ServiceStopped, nil } - -/* -Postgres has three levels of shutdown: - - - SIGTERM - Smart Shutdown : Wait for children to end normally - exit self - - SIGINT - Fast Shutdown : SIGTERM children, causing them to abort current - transations and exit - wait for children to exit - - exit self - - SIGQUIT - Immediate Shutdown : SIGQUIT children - wait at most 5 seconds, - send SIGKILL to children - exit self immediately - -Postgres recommended shutdown is to send a SIGTERM - which initiates -a Smart-Shutdown sequence. - -IMPORTANT: -As per documentation, it is best not to use SIGKILL -to shut down postgres. Doing so will prevent the server -from releasing shared memory and semaphores. - -Reference: -https://www.postgresql.org/docs/12/server-shutdown.html - -By the time we actually try to run this sequence, we will have -checked that the service can indeed shutdown gracefully, -the sequence is there only as a backup. -*/ -func doThreeStepPostgresExit(ctx context.Context, process *psutils.Process) error { - utils.LogTime("db_local.doThreeStepPostgresExit start") - defer utils.LogTime("db_local.doThreeStepPostgresExit end") - - var err error - var exitSuccessful bool - - // send a SIGTERM - err = process.SendSignal(syscall.SIGTERM) - if err != nil { - return err - } - exitSuccessful = waitForProcessExit(process, 2*time.Second) - if !exitSuccessful { - // process didn't quit - - // set status, as this is taking time - statushooks.SetStatus(ctx, "Shutting down…") - - // try a SIGINT - err = process.SendSignal(syscall.SIGINT) - if err != nil { - return err - } - exitSuccessful = waitForProcessExit(process, 2*time.Second) - } - if !exitSuccessful { - // process didn't quit - // desperation prevails - err = process.SendSignal(syscall.SIGQUIT) - if err != nil { - return err - } - exitSuccessful = waitForProcessExit(process, 5*time.Second) - } - - if !exitSuccessful { - log.Println("[ERROR] Failed to stop service") - log.Printf("[ERROR] Service Details:\n%s\n", getPrintableProcessDetails(process, 0)) - return fmt.Errorf("service shutdown timed out") - } - - return nil -} - -func waitForProcessExit(process *psutils.Process, waitFor time.Duration) bool { - utils.LogTime("db_local.waitForProcessExit start") - defer utils.LogTime("db_local.waitForProcessExit end") - - checkTimer := time.NewTicker(50 * time.Millisecond) - timeoutAt := time.After(waitFor) - - for { - select { - case <-checkTimer.C: - pEx, _ := utils.PidExists(int(process.Pid)) - if pEx { - continue - } - return true - case <-timeoutAt: - checkTimer.Stop() - return false - } - } -} - -func getPrintableProcessDetails(process *psutils.Process, indent int) string { - utils.LogTime("db_local.getPrintableProcessDetails start") - defer utils.LogTime("db_local.getPrintableProcessDetails end") - - indentString := strings.Repeat(" ", indent) - appendTo := []string{} - - if name, err := process.Name(); err == nil { - appendTo = append(appendTo, fmt.Sprintf("%s> Name: %s", indentString, name)) - } - if cmdLine, err := process.Cmdline(); err == nil { - appendTo = append(appendTo, fmt.Sprintf("%s> CmdLine: %s", indentString, cmdLine)) - } - if status, err := process.Status(); err == nil { - appendTo = append(appendTo, fmt.Sprintf("%s> Status: %s", indentString, status)) - } - if cwd, err := process.Cwd(); err == nil { - appendTo = append(appendTo, fmt.Sprintf("%s> CWD: %s", indentString, cwd)) - } - if executable, err := process.Exe(); err == nil { - appendTo = append(appendTo, fmt.Sprintf("%s> Executable: %s", indentString, executable)) - } - if username, err := process.Username(); err == nil { - appendTo = append(appendTo, fmt.Sprintf("%s> Username: %s", indentString, username)) - } - if indent == 0 { - // I do not care about the parent of my parent - if parent, err := process.Parent(); err == nil && parent != nil { - appendTo = append(appendTo, "", fmt.Sprintf("%s> Parent Details", indentString)) - parentLog := getPrintableProcessDetails(parent, indent+1) - appendTo = append(appendTo, parentLog, "") - } - - // I do not care about all the children of my parent - if children, err := process.Children(); err == nil && len(children) > 0 { - appendTo = append(appendTo, fmt.Sprintf("%s> Children Details", indentString)) - for _, child := range children { - childLog := getPrintableProcessDetails(child, indent+1) - appendTo = append(appendTo, childLog, "") - } - } - } - - return strings.Join(appendTo, "\n") -}