mirror of
https://github.com/turbot/steampipe.git
synced 2025-12-25 03:00:48 -05:00
Fixes issue where task running goroutine would try to access viper config as it is being written to. Closes #2845
This commit is contained in:
18
cmd/root.go
18
cmd/root.go
@@ -34,7 +34,7 @@ import (
|
||||
|
||||
var exitCode int
|
||||
var waitForTasksChannel chan struct{}
|
||||
var tasksCancel context.CancelFunc
|
||||
var tasksCancelFn context.CancelFunc
|
||||
|
||||
// rootCmd represents the base command when called without any subcommands
|
||||
var rootCmd = &cobra.Command{
|
||||
@@ -47,7 +47,7 @@ var rootCmd = &cobra.Command{
|
||||
// wait for the async tasks to finish
|
||||
select {
|
||||
case <-time.After(100 * time.Millisecond):
|
||||
tasksCancel()
|
||||
tasksCancelFn()
|
||||
return
|
||||
case <-waitForTasksChannel:
|
||||
return
|
||||
@@ -66,12 +66,22 @@ var rootCmd = &cobra.Command{
|
||||
|
||||
createLogger()
|
||||
|
||||
// set up the global viper config with default values from
|
||||
// config files and ENV variables
|
||||
initGlobalConfig()
|
||||
|
||||
var taskUpdateCtx context.Context
|
||||
taskUpdateCtx, tasksCancel = context.WithCancel(cmd.Context())
|
||||
taskUpdateCtx, tasksCancelFn = context.WithCancel(cmd.Context())
|
||||
|
||||
waitForTasksChannel = task.RunTasks(taskUpdateCtx, cmd, args)
|
||||
waitForTasksChannel = task.RunTasks(
|
||||
taskUpdateCtx,
|
||||
cmd,
|
||||
args,
|
||||
// pass the config value in rather than runRasks querying viper directly - to avoid concurrent map access issues
|
||||
// (we can use the update-check viper config here, since initGlobalConfig has already set it up
|
||||
// with values from the config files and ENV settings - update-check cannot be set from the command line)
|
||||
task.WithUpdateCheck(viper.GetBool(constants.ArgUpdateCheck)),
|
||||
)
|
||||
|
||||
// set the max memory
|
||||
debug.SetMemoryLimit(plugin.GetMaxMemoryBytes())
|
||||
|
||||
19
pkg/task/config.go
Normal file
19
pkg/task/config.go
Normal file
@@ -0,0 +1,19 @@
|
||||
package task
|
||||
|
||||
type taskRunConfig struct {
|
||||
runUpdateCheck bool
|
||||
}
|
||||
|
||||
func newRunConfig() *taskRunConfig {
|
||||
return &taskRunConfig{
|
||||
runUpdateCheck: true,
|
||||
}
|
||||
}
|
||||
|
||||
type TaskRunOption func(o *taskRunConfig)
|
||||
|
||||
func WithUpdateCheck(run bool) TaskRunOption {
|
||||
return func(o *taskRunConfig) {
|
||||
o.runUpdateCheck = run
|
||||
}
|
||||
}
|
||||
@@ -8,8 +8,6 @@ import (
|
||||
"time"
|
||||
|
||||
"github.com/spf13/cobra"
|
||||
"github.com/spf13/viper"
|
||||
"github.com/turbot/steampipe/pkg/constants"
|
||||
"github.com/turbot/steampipe/pkg/db/db_local"
|
||||
"github.com/turbot/steampipe/pkg/error_helpers"
|
||||
"github.com/turbot/steampipe/pkg/statefile"
|
||||
@@ -20,16 +18,22 @@ const minimumDurationBetweenChecks = 24 * time.Hour
|
||||
|
||||
type Runner struct {
|
||||
currentState statefile.State
|
||||
options *taskRunConfig
|
||||
}
|
||||
|
||||
// RunTasks runs all tasks asynchronously
|
||||
// returns a channel which is closed once all tasks are finished or the provided context is cancelled
|
||||
func RunTasks(ctx context.Context, cmd *cobra.Command, args []string) chan struct{} {
|
||||
func RunTasks(ctx context.Context, cmd *cobra.Command, args []string, options ...TaskRunOption) chan struct{} {
|
||||
utils.LogTime("task.RunTasks start")
|
||||
defer utils.LogTime("task.RunTasks end")
|
||||
|
||||
config := newRunConfig()
|
||||
for _, o := range options {
|
||||
o(config)
|
||||
}
|
||||
|
||||
doneChannel := make(chan struct{}, 1)
|
||||
runner := newRunner()
|
||||
runner := newRunner(config)
|
||||
|
||||
// if there are any notifications from the previous run - display them
|
||||
if err := runner.displayNotifications(cmd, args); err != nil {
|
||||
@@ -47,11 +51,12 @@ func RunTasks(ctx context.Context, cmd *cobra.Command, args []string) chan struc
|
||||
return doneChannel
|
||||
}
|
||||
|
||||
func newRunner() *Runner {
|
||||
func newRunner(config *taskRunConfig) *Runner {
|
||||
utils.LogTime("task.NewRunner start")
|
||||
defer utils.LogTime("task.NewRunner end")
|
||||
|
||||
r := new(Runner)
|
||||
r.options = config
|
||||
|
||||
state, err := statefile.LoadState()
|
||||
if err != nil {
|
||||
@@ -72,7 +77,7 @@ func (r *Runner) run(ctx context.Context) {
|
||||
|
||||
waitGroup := sync.WaitGroup{}
|
||||
|
||||
if viper.GetBool(constants.ArgUpdateCheck) {
|
||||
if r.options.runUpdateCheck {
|
||||
// check whether an updated version is available
|
||||
r.runJobAsync(ctx, func(c context.Context) {
|
||||
versionNotificationLines = checkSteampipeVersion(c, r.currentState.InstallationID)
|
||||
|
||||
Reference in New Issue
Block a user