mirror of
https://github.com/turbot/steampipe.git
synced 2025-12-19 18:12:43 -05:00
201 lines
5.8 KiB
Go
201 lines
5.8 KiB
Go
package task
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"os"
|
|
"sync"
|
|
"time"
|
|
|
|
"github.com/spf13/cobra"
|
|
"github.com/turbot/go-kit/files"
|
|
"github.com/turbot/pipe-fittings/v2/plugin"
|
|
"github.com/turbot/pipe-fittings/v2/utils"
|
|
"github.com/turbot/steampipe/v2/pkg/db/db_local"
|
|
"github.com/turbot/steampipe/v2/pkg/error_helpers"
|
|
"github.com/turbot/steampipe/v2/pkg/filepaths"
|
|
"github.com/turbot/steampipe/v2/pkg/installationstate"
|
|
"github.com/turbot/steampipe/v2/pkg/steampipeconfig"
|
|
)
|
|
|
|
const minimumDurationBetweenChecks = 24 * time.Hour
|
|
|
|
type Runner struct {
|
|
currentState installationstate.InstallationState
|
|
options *taskRunConfig
|
|
}
|
|
|
|
// RunTasks runs all tasks asynchronously
|
|
// returns a channel which is closed once all tasks are finished or the provided context is cancelled
|
|
func RunTasks(ctx context.Context, cmd *cobra.Command, args []string, options ...TaskRunOption) chan struct{} {
|
|
utils.LogTime("task.RunTasks start")
|
|
defer utils.LogTime("task.RunTasks end")
|
|
|
|
config := newRunConfig()
|
|
for _, o := range options {
|
|
o(config)
|
|
}
|
|
|
|
doneChannel := make(chan struct{}, 1)
|
|
runner := newRunner(config)
|
|
|
|
// if there are any notifications from the previous run - display them
|
|
if err := runner.displayNotifications(cmd, args); err != nil {
|
|
log.Println("[TRACE] faced error displaying notifications:", err)
|
|
}
|
|
|
|
// asynchronously run the task runner
|
|
go func(c context.Context) {
|
|
defer close(doneChannel)
|
|
// check if a legacy notifications file exists
|
|
exists := files.FileExists(filepaths.LegacyNotificationsFilePath())
|
|
if exists {
|
|
log.Println("[TRACE] found legacy notification file. removing")
|
|
// if the legacy file exists, remove it
|
|
os.Remove(filepaths.LegacyNotificationsFilePath())
|
|
}
|
|
|
|
// if the legacy file existed, then we should enforce a run, since we need
|
|
// to update the available version cache
|
|
if runner.shouldRun() || exists {
|
|
for _, hook := range config.preHooks {
|
|
hook(c)
|
|
}
|
|
runner.run(c)
|
|
}
|
|
}(ctx)
|
|
|
|
return doneChannel
|
|
}
|
|
|
|
func newRunner(config *taskRunConfig) *Runner {
|
|
utils.LogTime("task.NewRunner start")
|
|
defer utils.LogTime("task.NewRunner end")
|
|
|
|
r := new(Runner)
|
|
r.options = config
|
|
|
|
state, err := installationstate.Load()
|
|
if err != nil {
|
|
// this error should never happen
|
|
// log this and carry on
|
|
log.Println("[TRACE] error loading state,", err)
|
|
}
|
|
r.currentState = state
|
|
return r
|
|
}
|
|
|
|
func (r *Runner) run(ctx context.Context) {
|
|
utils.LogTime("task.Runner.Run start")
|
|
defer utils.LogTime("task.Runner.Run end")
|
|
|
|
var availableCliVersion *CLIVersionCheckResponse
|
|
var availablePluginVersions map[string]plugin.PluginVersionCheckReport
|
|
|
|
waitGroup := sync.WaitGroup{}
|
|
|
|
if r.options.runUpdateCheck {
|
|
// check whether an updated version is available
|
|
r.runJobAsync(ctx, func(c context.Context) {
|
|
availableCliVersion, _ = fetchAvailableCLIVersion(ctx, r.currentState.InstallationID)
|
|
}, &waitGroup)
|
|
|
|
// check whether an updated version is available
|
|
r.runJobAsync(ctx, func(ctx context.Context) {
|
|
availablePluginVersions = plugin.GetAllUpdateReport(ctx, r.currentState.InstallationID, steampipeconfig.GlobalConfig.PluginVersions)
|
|
}, &waitGroup)
|
|
}
|
|
|
|
// remove log files older than 7 days
|
|
r.runJobAsync(ctx, func(_ context.Context) { db_local.TrimLogs() }, &waitGroup)
|
|
|
|
// wait for all jobs to complete
|
|
waitGroup.Wait()
|
|
|
|
// check if the context was cancelled before starting any FileIO
|
|
if error_helpers.IsContextCanceled(ctx) {
|
|
// if the context was cancelled, we don't want to do anything
|
|
return
|
|
}
|
|
|
|
// save the notifications, if any
|
|
if err := r.saveAvailableVersions(availableCliVersion, availablePluginVersions); err != nil {
|
|
error_helpers.ShowWarning(fmt.Sprintf("Regular task runner failed to save pending notifications: %s", err))
|
|
}
|
|
|
|
// save the state - this updates the last checked time
|
|
if err := r.currentState.Save(); err != nil {
|
|
error_helpers.ShowWarning(fmt.Sprintf("Regular task runner failed to save state file: %s", err))
|
|
}
|
|
}
|
|
|
|
func (r *Runner) runJobAsync(ctx context.Context, job func(context.Context), wg *sync.WaitGroup) {
|
|
wg.Add(1)
|
|
go func() {
|
|
// do this as defer, so that it always fires - even if there's a panic
|
|
defer wg.Done()
|
|
job(ctx)
|
|
}()
|
|
}
|
|
|
|
// determines whether the task runner should run at all
|
|
// tasks are to be run at most once every 24 hours
|
|
func (r *Runner) shouldRun() bool {
|
|
utils.LogTime("task.Runner.shouldRun start")
|
|
defer utils.LogTime("task.Runner.shouldRun end")
|
|
|
|
now := time.Now()
|
|
if r.currentState.LastCheck == "" {
|
|
return true
|
|
}
|
|
lastCheckedAt, err := time.Parse(time.RFC3339, r.currentState.LastCheck)
|
|
if err != nil {
|
|
return true
|
|
}
|
|
durationElapsedSinceLastCheck := now.Sub(lastCheckedAt)
|
|
|
|
return durationElapsedSinceLastCheck > minimumDurationBetweenChecks
|
|
}
|
|
|
|
func showNotificationsForCommand(cmd *cobra.Command, cmdArgs []string) bool {
|
|
return !(isPluginUpdateCmd(cmd) ||
|
|
IsPluginManagerCmd(cmd) ||
|
|
isServiceStopCmd(cmd) ||
|
|
IsBatchQueryCmd(cmd, cmdArgs) ||
|
|
isCompletionCmd(cmd) ||
|
|
isPluginListCmd(cmd))
|
|
}
|
|
|
|
func isServiceStopCmd(cmd *cobra.Command) bool {
|
|
return cmd.Parent() != nil && cmd.Parent().Name() == "service" && cmd.Name() == "stop"
|
|
}
|
|
func isCompletionCmd(cmd *cobra.Command) bool {
|
|
return cmd.Name() == "completion"
|
|
}
|
|
func IsPluginManagerCmd(cmd *cobra.Command) bool {
|
|
return cmd.Name() == "plugin-manager"
|
|
}
|
|
func isPluginUpdateCmd(cmd *cobra.Command) bool {
|
|
return cmd.Name() == "update" && cmd.Parent() != nil && cmd.Parent().Name() == "plugin"
|
|
}
|
|
func IsBatchQueryCmd(cmd *cobra.Command, cmdArgs []string) bool {
|
|
return cmd.Name() == "query" && len(cmdArgs) > 0
|
|
}
|
|
func isPluginListCmd(cmd *cobra.Command) bool {
|
|
return cmd.Name() == "list" && cmd.Parent() != nil && cmd.Parent().Name() == "plugin"
|
|
}
|
|
|
|
func IsCheckCmd(cmd *cobra.Command) bool {
|
|
return cmd.Name() == "check"
|
|
}
|
|
|
|
func IsDashboardCmd(cmd *cobra.Command) bool {
|
|
return cmd.Name() == "dashboard"
|
|
}
|
|
|
|
func IsModCmd(cmd *cobra.Command) bool {
|
|
parent := cmd.Parent()
|
|
return parent.Name() == "mod"
|
|
}
|