Fixed deadlocks when adding connections

Graza
2025-12-16 11:29:18 +00:00
parent 1d9139db44
commit ef0f334239
4 changed files with 100 additions and 15 deletions
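For context on the class of bug being fixed: Go's sync.Mutex and sync.RWMutex are not reentrant, so a method that acquires the lock and then calls another method that acquires the same lock blocks against itself. A minimal standalone sketch of that failure mode (hypothetical names, not code from this repo):

```go
package main

import "sync"

type manager struct {
	mu    sync.Mutex
	state map[string]string
}

// refresh locks for itself, so it is only safe to call when the lock is NOT held
func (m *manager) refresh() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.state["refreshed"] = "true"
}

// onConfigChanged locks, then calls refresh, which tries to lock again.
// sync.Mutex is not reentrant, so the second Lock blocks forever; the Go
// runtime aborts with "all goroutines are asleep - deadlock!".
func (m *manager) onConfigChanged() {
	m.mu.Lock()
	defer m.mu.Unlock()
	m.refresh() // self-deadlock
}

func main() {
	m := &manager{state: map[string]string{}}
	m.onConfigChanged() // never returns
}
```

The changes below follow the standard remedy: lock once at the public entry point, and have everything underneath call lock-free *Internal variants.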


@@ -60,11 +60,15 @@ func doRunPluginManager(cmd *cobra.Command) error {
log.Printf("[INFO] starting connection watcher")
connectionWatcher, err := connection.NewConnectionWatcher(pluginManager)
if err != nil {
log.Printf("[ERROR] failed to create connection watcher: %v", err)
return err
}
log.Printf("[INFO] connection watcher created successfully")
// close the connection watcher
defer connectionWatcher.Close()
+ } else {
+ log.Printf("[WARN] connection watcher is DISABLED")
+ }
log.Printf("[INFO] about to serve")


@@ -26,12 +26,19 @@ func NewConnectionWatcher(pluginManager pluginManager) (*ConnectionWatcher, erro
pluginManager: pluginManager,
}
+ configDir := filepaths.EnsureConfigDir()
+ log.Printf("[INFO] ConnectionWatcher will watch directory: %s for %s files", configDir, constants.ConfigExtension)
watcherOptions := &filewatcher.WatcherOptions{
- Directories: []string{filepaths.EnsureConfigDir()},
+ Directories: []string{configDir},
Include: filehelpers.InclusionsFromExtensions([]string{constants.ConfigExtension}),
ListFlag: filehelpers.FilesRecursive,
EventMask: fsnotify.Create | fsnotify.Remove | fsnotify.Rename | fsnotify.Write | fsnotify.Chmod,
OnChange: func(events []fsnotify.Event) {
log.Printf("[INFO] ConnectionWatcher detected %d file events", len(events))
for _, event := range events {
log.Printf("[INFO] ConnectionWatcher event: %s - %s", event.Op, event.Name)
}
w.handleFileWatcherEvent(events)
},
}
@@ -80,13 +87,17 @@ func (w *ConnectionWatcher) handleFileWatcherEvent([]fsnotify.Event) {
// as these are both used by RefreshConnectionAndSearchPathsWithLocalClient
// set the global steampipe config
log.Printf("[DEBUG] ConnectionWatcher: setting GlobalConfig")
steampipeconfig.GlobalConfig = config
// call the on-changed callback - we must call this BEFORE calling refresh connections
// convert config to format expected by plugin manager
// (plugin manager cannot reference steampipe config to avoid circular deps)
log.Printf("[DEBUG] ConnectionWatcher: creating connection config map")
configMap := NewConnectionConfigMap(config.Connections)
log.Printf("[DEBUG] ConnectionWatcher: calling OnConnectionConfigChanged with %d connections", len(configMap))
w.pluginManager.OnConnectionConfigChanged(ctx, configMap, config.PluginsInstances)
log.Printf("[DEBUG] ConnectionWatcher: OnConnectionConfigChanged complete")
// The only configurations from GlobalConfig which have
// impact during Refresh are Database options and the Connections
@@ -99,7 +110,9 @@ func (w *ConnectionWatcher) handleFileWatcherEvent([]fsnotify.Event) {
// Workspace Profile does not have any setting which can alter
// behavior in service mode (namely search path). Therefore, it is safe
// to use the GlobalConfig here and ignore Workspace Profile in general
log.Printf("[DEBUG] ConnectionWatcher: calling SetDefaultsFromConfig")
cmdconfig.SetDefaultsFromConfig(steampipeconfig.GlobalConfig.ConfigMap())
log.Printf("[DEBUG] ConnectionWatcher: SetDefaultsFromConfig complete")
log.Printf("[INFO] calling RefreshConnections asyncronously")


@@ -51,7 +51,29 @@ type PluginManager struct {
// map of max cache size, keyed by plugin instance
pluginCacheSizeMap map[string]int64
- // map lock
+ // mut protects concurrent access to plugin manager state (runningPluginMap, connectionConfigMap, etc.)
+ //
+ // LOCKING PATTERN TO PREVENT DEADLOCKS:
+ // - Functions that acquire mut.Lock() and call other methods MUST only call *Internal versions
+ // - Public methods that need locking: acquire lock → call internal version → release lock
+ // - Internal methods: assume caller holds lock, never acquire lock themselves
+ //
+ // Example:
+ // func (m *PluginManager) SomeMethod() {
+ // m.mut.Lock()
+ // defer m.mut.Unlock()
+ // return m.someMethodInternal()
+ // }
+ // func (m *PluginManager) someMethodInternal() {
+ // // NOTE: caller must hold m.mut lock
+ // // ... implementation without locking ...
+ // }
+ //
+ // Functions with internal/external versions:
+ // - refreshRateLimiterTable / refreshRateLimiterTableInternal
+ // - updateRateLimiterStatus / updateRateLimiterStatusInternal
+ // - setRateLimiters / setRateLimitersInternal
+ // - getPluginsWithChangedLimiters / getPluginsWithChangedLimitersInternal
mut sync.RWMutex
// shutdown synchronization
@@ -231,23 +253,32 @@ func (m *PluginManager) doRefresh() {
// OnConnectionConfigChanged is the callback function invoked by the connection watcher when the config changed
func (m *PluginManager) OnConnectionConfigChanged(ctx context.Context, configMap connection.ConnectionConfigMap, plugins map[string]*plugin.Plugin) {
log.Printf("[DEBUG] OnConnectionConfigChanged: acquiring lock")
m.mut.Lock()
defer m.mut.Unlock()
log.Printf("[DEBUG] OnConnectionConfigChanged: lock acquired")
log.Printf("[TRACE] OnConnectionConfigChanged: connections: %s plugin instances: %s", strings.Join(utils.SortedMapKeys(configMap), ","), strings.Join(utils.SortedMapKeys(plugins), ","))
log.Printf("[DEBUG] OnConnectionConfigChanged: calling handleConnectionConfigChanges")
if err := m.handleConnectionConfigChanges(ctx, configMap); err != nil {
log.Printf("[WARN] handleConnectionConfigChanges failed: %s", err.Error())
}
log.Printf("[DEBUG] OnConnectionConfigChanged: handleConnectionConfigChanges complete")
// update our plugin configs
log.Printf("[DEBUG] OnConnectionConfigChanged: calling handlePluginInstanceChanges")
if err := m.handlePluginInstanceChanges(ctx, plugins); err != nil {
log.Printf("[WARN] handlePluginInstanceChanges failed: %s", err.Error())
}
log.Printf("[DEBUG] OnConnectionConfigChanged: handlePluginInstanceChanges complete")
log.Printf("[DEBUG] OnConnectionConfigChanged: calling handleUserLimiterChanges")
if err := m.handleUserLimiterChanges(ctx, plugins); err != nil {
log.Printf("[WARN] handleUserLimiterChanges failed: %s", err.Error())
}
log.Printf("[DEBUG] OnConnectionConfigChanged: handleUserLimiterChanges complete")
log.Printf("[DEBUG] OnConnectionConfigChanged: about to release lock and return")
}
func (m *PluginManager) GetConnectionConfig() connection.ConnectionConfigMap {
@@ -776,14 +807,19 @@ func (m *PluginManager) setCacheOptions(pluginClient *sdkgrpc.PluginClient) erro
}
func (m *PluginManager) setRateLimiters(pluginInstance string, pluginClient *sdkgrpc.PluginClient) error {
+ m.mut.RLock()
+ defer m.mut.RUnlock()
+ return m.setRateLimitersInternal(pluginInstance, pluginClient)
+ }
+ func (m *PluginManager) setRateLimitersInternal(pluginInstance string, pluginClient *sdkgrpc.PluginClient) error {
+ // NOTE: caller must hold m.mut lock (at least RLock)
log.Printf("[INFO] setRateLimiters for plugin '%s'", pluginInstance)
var defs []*sdkproto.RateLimiterDefinition
- m.mut.RLock()
for _, l := range m.userLimiters[pluginInstance] {
defs = append(defs, RateLimiterAsProto(l))
}
- m.mut.RUnlock()
req := &sdkproto.SetRateLimitersRequest{Definitions: defs}


@@ -29,6 +29,9 @@ func (m *PluginManager) ShouldFetchRateLimiterDefs() bool {
// update the stored limiters, refresh the rate limiter table and call `setRateLimiters`
// for all plugins with changed limiters
func (m *PluginManager) HandlePluginLimiterChanges(newLimiters connection.PluginLimiterMap) error {
+ m.mut.Lock()
+ defer m.mut.Unlock()
if m.pluginLimiters == nil {
// this must be the first time we have populated them
m.pluginLimiters = make(connection.PluginLimiterMap)
@@ -38,13 +41,22 @@ func (m *PluginManager) HandlePluginLimiterChanges(newLimiters connection.Plugin
}
// update the steampipe_plugin_limiters table
- if err := m.refreshRateLimiterTable(context.Background()); err != nil {
+ // NOTE: we hold m.mut lock, so call internal version
+ if err := m.refreshRateLimiterTableInternal(context.Background()); err != nil {
log.Println("[WARN] could not refresh rate limiter table", err)
}
return nil
}
func (m *PluginManager) refreshRateLimiterTable(ctx context.Context) error {
+ m.mut.Lock()
+ defer m.mut.Unlock()
+ return m.refreshRateLimiterTableInternal(ctx)
+ }
+ func (m *PluginManager) refreshRateLimiterTableInternal(ctx context.Context) error {
+ // NOTE: caller must hold m.mut lock
// if we have not yet populated the rate limiter table, do nothing
if m.pluginLimiters == nil {
return nil
@@ -56,7 +68,7 @@ func (m *PluginManager) refreshRateLimiterTable(ctx context.Context) error {
}
// update the status of the plugin rate limiters (determine which are overridden and set state accordingly)
- m.updateRateLimiterStatus()
+ m.updateRateLimiterStatusInternal()
queries := []db_common.QueryWithArgs{
introspection.GetRateLimiterTableDropSql(),
@@ -70,13 +82,12 @@ func (m *PluginManager) refreshRateLimiterTable(ctx context.Context) error {
}
}
- m.mut.RLock()
+ // NOTE: no lock needed here, caller already holds m.mut
for _, limitersForPlugin := range m.userLimiters {
for _, l := range limitersForPlugin {
queries = append(queries, introspection.GetRateLimiterTablePopulateSql(l))
}
}
- m.mut.RUnlock()
conn, err := m.pool.Acquire(ctx)
if err != nil {
@@ -92,30 +103,42 @@ func (m *PluginManager) refreshRateLimiterTable(ctx context.Context) error {
// update the stored limiters, refresh the rate limiter table and call `setRateLimiters`
// for all plugins with changed limiters
func (m *PluginManager) handleUserLimiterChanges(_ context.Context, plugins connection.PluginMap) error {
log.Printf("[DEBUG] handleUserLimiterChanges: start")
limiterPluginMap := plugins.ToPluginLimiterMap()
- pluginsWithChangedLimiters := m.getPluginsWithChangedLimiters(limiterPluginMap)
+ log.Printf("[DEBUG] handleUserLimiterChanges: got limiter plugin map")
+ // NOTE: caller (OnConnectionConfigChanged) already holds m.mut lock, so use internal version
+ pluginsWithChangedLimiters := m.getPluginsWithChangedLimitersInternal(limiterPluginMap)
+ log.Printf("[DEBUG] handleUserLimiterChanges: found %d plugins with changed limiters", len(pluginsWithChangedLimiters))
if len(pluginsWithChangedLimiters) == 0 {
log.Printf("[DEBUG] handleUserLimiterChanges: no changes, returning")
return nil
}
// update stored limiters to the new map
- m.mut.Lock()
+ // NOTE: caller (OnConnectionConfigChanged) already holds m.mut lock, so we don't lock here
+ log.Printf("[DEBUG] handleUserLimiterChanges: updating user limiters")
m.userLimiters = limiterPluginMap
- m.mut.Unlock()
// update the steampipe_plugin_limiters table
- if err := m.refreshRateLimiterTable(context.Background()); err != nil {
+ // NOTE: caller already holds m.mut lock, so call internal version
+ log.Printf("[DEBUG] handleUserLimiterChanges: calling refreshRateLimiterTableInternal")
+ if err := m.refreshRateLimiterTableInternal(context.Background()); err != nil {
log.Println("[WARN] could not refresh rate limiter table", err)
}
log.Printf("[DEBUG] handleUserLimiterChanges: refreshRateLimiterTableInternal complete")
// now update the plugins - call setRateLimiters for any plugin with updated user limiters
log.Printf("[DEBUG] handleUserLimiterChanges: setting rate limiters for plugins")
for p := range pluginsWithChangedLimiters {
log.Printf("[DEBUG] handleUserLimiterChanges: calling setRateLimitersForPlugin for %s", p)
if err := m.setRateLimitersForPlugin(p); err != nil {
return err
}
log.Printf("[DEBUG] handleUserLimiterChanges: setRateLimitersForPlugin complete for %s", p)
}
log.Printf("[DEBUG] handleUserLimiterChanges: complete")
return nil
}
@@ -138,17 +161,22 @@ func (m *PluginManager) setRateLimitersForPlugin(pluginShortName string) error {
return sperr.WrapWithMessage(err, "failed to create a plugin client when updating the rate limiter for plugin '%s'", imageRef)
}
- if err := m.setRateLimiters(pluginShortName, pluginClient); err != nil {
+ // NOTE: caller (handleUserLimiterChanges via OnConnectionConfigChanged) already holds m.mut lock
+ if err := m.setRateLimitersInternal(pluginShortName, pluginClient); err != nil {
return sperr.WrapWithMessage(err, "failed to update rate limiters for plugin '%s'", imageRef)
}
return nil
}
func (m *PluginManager) getPluginsWithChangedLimiters(newLimiters connection.PluginLimiterMap) map[string]struct{} {
- var pluginsWithChangedLimiters = make(map[string]struct{})
+ m.mut.RLock()
+ defer m.mut.RUnlock()
+ return m.getPluginsWithChangedLimitersInternal(newLimiters)
+ }
+ func (m *PluginManager) getPluginsWithChangedLimitersInternal(newLimiters connection.PluginLimiterMap) map[string]struct{} {
+ // NOTE: caller must hold m.mut lock (at least RLock)
+ var pluginsWithChangedLimiters = make(map[string]struct{})
for plugin, limitersForPlugin := range m.userLimiters {
newLimitersForPlugin := newLimiters[plugin]
@@ -169,7 +197,11 @@ func (m *PluginManager) getPluginsWithChangedLimiters(newLimiters connection.Plu
func (m *PluginManager) updateRateLimiterStatus() {
m.mut.Lock()
defer m.mut.Unlock()
+ m.updateRateLimiterStatusInternal()
+ }
+ func (m *PluginManager) updateRateLimiterStatusInternal() {
+ // NOTE: caller must hold m.mut lock
// iterate through limiters for each plugin
for p, pluginDefinedLimiters := range m.pluginLimiters {
// get user limiters for this plugin (already holding lock, so call internal version)
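A related hazard worth noting, since several of the removed lines took RLock on paths that now run while the write lock is held: with sync.RWMutex, a goroutine that holds Lock and then calls RLock also deadlocks, because readers queue behind the held writer. A minimal sketch:

```go
package main

import "sync"

func main() {
	var mu sync.RWMutex
	mu.Lock()
	// Readers queue behind a held writer, even in the same goroutine:
	// this RLock never acquires, and the runtime aborts with
	// "all goroutines are asleep - deadlock!".
	mu.RLock()
	mu.RUnlock()
	mu.Unlock()
}
```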