diff --git a/cmd/plugin.go b/cmd/plugin.go index f1cb6871b..c03e762a3 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -699,7 +699,7 @@ func getPluginConnectionMap(ctx context.Context) (map[string][]modconfig.Connect } pluginConnectionMap := make(map[string][]modconfig.Connection) - for _, v := range *client.ConnectionMap() { + for _, v := range res.ConnectionMap { _, found := pluginConnectionMap[v.Plugin] if !found { pluginConnectionMap[v.Plugin] = []modconfig.Connection{} diff --git a/cmd/plugin_manager.go b/cmd/plugin_manager.go index 36af5273f..518460644 100644 --- a/cmd/plugin_manager.go +++ b/cmd/plugin_manager.go @@ -56,7 +56,7 @@ func runPluginManagerCmd(cmd *cobra.Command, args []string) { if shouldRunConnectionWatcher() { log.Printf("[INFO] starting connection watcher") - connectionWatcher, err := connectionwatcher.NewConnectionWatcher(pluginManager.SetConnectionConfigMap) + connectionWatcher, err := connectionwatcher.NewConnectionWatcher(pluginManager.OnConnectionConfigChanged, pluginManager.OnSchemaChanged) if err != nil { log.Printf("[WARN] failed to create connection watcher: %s", err.Error()) error_helpers.ShowError(ctx, err) diff --git a/pkg/connectionwatcher/connection_watcher.go b/pkg/connectionwatcher/connection_watcher.go index b252f1219..a339e316d 100644 --- a/pkg/connectionwatcher/connection_watcher.go +++ b/pkg/connectionwatcher/connection_watcher.go @@ -18,12 +18,14 @@ import ( type ConnectionWatcher struct { fileWatcherErrorHandler func(error) watcher *filewatcher.FileWatcher - onConnectionConfigChanged func(configMap ConnectionConfigMap) + onConnectionConfigChanged func(ConnectionConfigMap) + onSchemaChanged func(*steampipeconfig.RefreshConnectionResult) } -func NewConnectionWatcher(onConnectionChanged func(configMap ConnectionConfigMap)) (*ConnectionWatcher, error) { +func NewConnectionWatcher(onConnectionChanged func(ConnectionConfigMap), onSchemaChanged func(*steampipeconfig.RefreshConnectionResult)) (*ConnectionWatcher, error) { w := &ConnectionWatcher{ onConnectionConfigChanged: onConnectionChanged, + onSchemaChanged: onSchemaChanged, } watcherOptions := &filewatcher.WatcherOptions{ @@ -77,24 +79,18 @@ func (w *ConnectionWatcher) handleFileWatcherEvent(_ []fsnotify.Event) { } defer client.Close(ctx) - log.Printf("[TRACE] loaded updated config") - - log.Printf("[TRACE] calling onConnectionConfigChanged") - // convert config to format expected by plugin manager - // (plugin manager cannot reference steampipe config to avoid circular deps) - configMap := NewConnectionConfigMap(config.Connections) - // call on changed callback - // (this calls pluginmanager.SetConnectionConfigMap) - w.onConnectionConfigChanged(configMap) - - log.Printf("[TRACE] calling RefreshConnectionAndSearchPaths") - // We need to update the viper config and GlobalConfig // as these are both used by RefreshConnectionAndSearchPaths // set the global steampipe config steampipeconfig.GlobalConfig = config + // call on changed callback - we must call this BEFORE calling refresh connections + // convert config to format expected by plugin manager + // (plugin manager cannot reference steampipe config to avoid circular deps) + configMap := NewConnectionConfigMap(config.Connections) + w.onConnectionConfigChanged(configMap) + // The only configurations from GlobalConfig which have // impact during Refresh are Database options and the Connections // themselves. @@ -108,16 +104,22 @@ func (w *ConnectionWatcher) handleFileWatcherEvent(_ []fsnotify.Event) { // to use the GlobalConfig here and ignore Workspace Profile in general cmdconfig.SetDefaultsFromConfig(steampipeconfig.GlobalConfig.ConfigMap()) + log.Printf("[TRACE] calling RefreshConnectionAndSearchPaths") // now refresh connections and search paths refreshResult := client.RefreshConnectionAndSearchPaths(ctx) if refreshResult.Error != nil { log.Printf("[WARN] error refreshing connections: %s", refreshResult.Error) return } + // if the connections were added or removed, call the schema changed callback + if refreshResult.UpdatedConnections { + w.onSchemaChanged(refreshResult) + } // display any refresh warnings - // TODO send warnings on warning_stream (to FDW???) + // TODO send warnings on warning_stream refreshResult.ShowWarnings() + log.Printf("[TRACE] File watch event done") } func (w *ConnectionWatcher) Close() { diff --git a/pkg/constants/notifications.go b/pkg/constants/notifications.go new file mode 100644 index 000000000..ca36e88b9 --- /dev/null +++ b/pkg/constants/notifications.go @@ -0,0 +1,5 @@ +package constants + +const ( + NotificationConnectionUpdate = "connection_update" +) diff --git a/pkg/db/db_client/db_client.go b/pkg/db/db_client/db_client.go index b51d5fdc0..2b879cac0 100644 --- a/pkg/db/db_client/db_client.go +++ b/pkg/db/db_client/db_client.go @@ -124,10 +124,6 @@ func (c *DbClient) Close(context.Context) error { return nil } -func (c *DbClient) ConnectionMap() *steampipeconfig.ConnectionDataMap { - return &steampipeconfig.ConnectionDataMap{} -} - // ForeignSchemaNames implements Client func (c *DbClient) ForeignSchemaNames() []string { return c.foreignSchemaNames diff --git a/pkg/db/db_client/db_client_session.go b/pkg/db/db_client/db_client_session.go index d2d55efef..2de474566 100644 --- a/pkg/db/db_client/db_client_session.go +++ b/pkg/db/db_client/db_client_session.go @@ -80,6 +80,7 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common return sessionResult } + sessionResult.Error = ctx.Err() return sessionResult } diff --git a/pkg/db/db_common/client.go b/pkg/db/db_common/client.go index 6f9860df5..5e435b494 100644 --- a/pkg/db/db_common/client.go +++ b/pkg/db/db_common/client.go @@ -3,7 +3,6 @@ package db_common import ( "context" "database/sql" - "github.com/turbot/steampipe/pkg/query/queryresult" "github.com/turbot/steampipe/pkg/schema" "github.com/turbot/steampipe/pkg/steampipeconfig" @@ -14,9 +13,7 @@ type Client interface { ForeignSchemaNames() []string AllSchemaNames() []string - LoadSchemaNames(ctx context.Context) error - ConnectionMap() *steampipeconfig.ConnectionDataMap GetCurrentSearchPath(context.Context) ([]string, error) GetCurrentSearchPathForDbConnection(context.Context, *sql.Conn) ([]string, error) diff --git a/pkg/db/db_local/local_db_client.go b/pkg/db/db_local/local_db_client.go index 25c6e2870..bb2c902e3 100644 --- a/pkg/db/db_local/local_db_client.go +++ b/pkg/db/db_local/local_db_client.go @@ -20,9 +20,8 @@ import ( // LocalDbClient wraps over DbClient type LocalDbClient struct { - client *db_client.DbClient - invoker constants.Invoker - connectionMap *steampipeconfig.ConnectionDataMap + client *db_client.DbClient + invoker constants.Invoker } // GetLocalClient starts service if needed and creates a new LocalDbClient @@ -98,10 +97,6 @@ func (c *LocalDbClient) LoadSchemaNames(ctx context.Context) error { return c.client.LoadSchemaNames(ctx) } -func (c *LocalDbClient) ConnectionMap() *steampipeconfig.ConnectionDataMap { - return c.connectionMap -} - func (c *LocalDbClient) RefreshSessions(ctx context.Context) *db_common.AcquireSessionResult { return c.client.RefreshSessions(ctx) } @@ -182,7 +177,7 @@ func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, query := c.buildSchemasQuery(schemas) acquireSessionResult := c.AcquireSession(ctx) - if acquireSessionResult.Error != nil { + if acquireSessionResult.Error != nil && acquireSessionResult.Session != nil { acquireSessionResult.Session.Close(false) return nil, err } @@ -263,8 +258,6 @@ WHERE %s return query } -// local only functions - func (c *LocalDbClient) RefreshConnectionAndSearchPaths(ctx context.Context, forceUpdateConnectionNames ...string) *steampipeconfig.RefreshConnectionResult { statushooks.SetStatus(ctx, "Refreshing connections") res := c.refreshConnections(ctx, forceUpdateConnectionNames...) @@ -292,7 +285,7 @@ func (c *LocalDbClient) RefreshConnectionAndSearchPaths(ctx context.Context, for res.Error = err return res } - c.connectionMap = &connectionMap + res.ConnectionMap = connectionMap // set user search path first - client may fall back to using it statushooks.SetStatus(ctx, "Setting up search path") diff --git a/pkg/initialisation/init_data.go b/pkg/initialisation/init_data.go index 7293e0ccd..8dfea1698 100644 --- a/pkg/initialisation/init_data.go +++ b/pkg/initialisation/init_data.go @@ -12,10 +12,10 @@ import ( "github.com/turbot/steampipe/pkg/db/db_client" "github.com/turbot/steampipe/pkg/db/db_common" "github.com/turbot/steampipe/pkg/db/db_local" - "github.com/turbot/steampipe/pkg/error_helpers" "github.com/turbot/steampipe/pkg/export" "github.com/turbot/steampipe/pkg/modinstaller" "github.com/turbot/steampipe/pkg/statushooks" + "github.com/turbot/steampipe/pkg/steampipeconfig" "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" "github.com/turbot/steampipe/pkg/workspace" ) @@ -30,6 +30,7 @@ type InitData struct { ShutdownTelemetry func() ExportManager *export.Manager + ConnectionMap steampipeconfig.ConnectionDataMap } func NewErrorInitData(err error) *InitData { @@ -140,15 +141,8 @@ func (i *InitData) Init(ctx context.Context, invoker constants.Invoker) { i.Result.Error = refreshResult.Error return } + i.ConnectionMap = refreshResult.ConnectionMap - // force creation of session data - se we see any prepared statement errors at once - sessionResult := i.Client.AcquireSession(ctx) - i.Result.AddWarnings(sessionResult.Warnings...) - if sessionResult.Error != nil { - i.Result.Error = fmt.Errorf("error acquiring database connection, %s", sessionResult.Error.Error()) - } else { - sessionResult.Session.Close(error_helpers.IsContextCanceled(ctx)) - } // add refresh connection warnings i.Result.AddWarnings(refreshResult.Warnings...) } diff --git a/pkg/interactive/interactive_client.go b/pkg/interactive/interactive_client.go index 73eaac383..74f52f008 100644 --- a/pkg/interactive/interactive_client.go +++ b/pkg/interactive/interactive_client.go @@ -3,6 +3,7 @@ package interactive import ( "bytes" "context" + "encoding/json" "fmt" "log" "os" @@ -15,6 +16,8 @@ import ( "github.com/alecthomas/chroma/lexers" "github.com/alecthomas/chroma/styles" "github.com/c-bata/go-prompt" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" "github.com/spf13/viper" "github.com/turbot/go-kit/helpers" "github.com/turbot/steampipe/pkg/cmdconfig" @@ -27,6 +30,7 @@ import ( "github.com/turbot/steampipe/pkg/query/queryhistory" "github.com/turbot/steampipe/pkg/schema" "github.com/turbot/steampipe/pkg/statushooks" + "github.com/turbot/steampipe/pkg/steampipeconfig" "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" "github.com/turbot/steampipe/pkg/utils" "github.com/turbot/steampipe/pkg/version" @@ -51,6 +55,11 @@ type InteractiveClient struct { // NOTE: should ONLY be called by cancelActiveQueryIfAny cancelActiveQuery context.CancelFunc cancelPrompt context.CancelFunc + // this cancellation is used to stop the pg notification listener which + // we use to get connection config updates from the plugin manager + // this is tied to a context which remaing valid throughout the life of the + // interactive session + cancelNotificationListener context.CancelFunc // channel used internally to pass the initialisation result initResultChan chan *db_common.InitResult // flag set when initialisation is complete (with or without errors) @@ -94,6 +103,7 @@ func newInteractiveClient(ctx context.Context, initData *query.InitData, result // asynchronously wait for init to complete // we start this immediately rather than lazy loading as we want to handle errors asap go c.readInitDataStream(ctx) + return c, nil } @@ -150,6 +160,10 @@ func (c *InteractiveClient) InteractivePrompt(parentContext context.Context) { c.hidePrompt = true c.interactivePrompt.ClearLine() + // stop the notification listener + if c.cancelNotificationListener != nil { + c.cancelNotificationListener() + } return } // create new context with a cancellation func @@ -246,8 +260,7 @@ func (c *InteractiveClient) runInteractivePrompt(ctx context.Context) (ret utils Key: prompt.ControlD, Fn: func(b *prompt.Buffer) { if b.Text() == "" { - // just set after close action - go prompt will handle the prompt shutdown - c.afterClose = AfterPromptCloseExit + c.ClosePrompt(AfterPromptCloseExit) } }, }), @@ -368,6 +381,7 @@ func (c *InteractiveClient) executor(ctx context.Context, line string) { c.cancelActiveQueryIfAny() } else { + // otherwise execute query t := time.Now() result, err := c.client().Execute(queryCtx, resolvedQuery.ExecuteSQL, resolvedQuery.Args...) @@ -503,7 +517,7 @@ func (c *InteractiveClient) executeMetaquery(ctx context.Context, query string) Query: query, Executor: client, Schema: c.schemaMetadata, - Connections: client.ConnectionMap(), + Connections: c.initData.ConnectionMap, Prompt: c.interactivePrompt, ClosePrompt: func() { c.afterClose = AfterPromptCloseExit }, }) @@ -612,3 +626,96 @@ func (c *InteractiveClient) startCancelHandler() chan bool { }() return quitChannel } + +func (c *InteractiveClient) listenToPgNotifications(ctx context.Context) error { + log.Printf("[TRACE] InteractiveClient listenToPgNotifications") + for ctx.Err() == nil { + conn, err := c.getNotificationConnection(ctx) + if err != nil { + return err + } + + log.Printf("[TRACE] Wait for notification") + notification, err := conn.Conn().WaitForNotification(ctx) + if err != nil && !error_helpers.IsContextCancelledError(err) { + log.Printf("[INFO] Error waiting for notification: %s", err) + } + conn.Release() + + if notification != nil { + c.handleConnectionUpdateNotification(ctx, notification) + } + } + log.Printf("[TRACE] InteractiveClient listenToPgNotifications DONE") + + return nil +} + +func (c *InteractiveClient) getNotificationConnection(ctx context.Context) (*pgxpool.Conn, error) { + sessionResult := c.client().AcquireSession(ctx) + + if sessionResult.Error != nil { + return nil, fmt.Errorf("error acquiring database connection to listenToPgNotifications to notifications, %s", sessionResult.Error.Error()) + } + + conn := sessionResult.Session.Connection + + listenSql := fmt.Sprintf("listen %s", constants.NotificationConnectionUpdate) + _, err := conn.Exec(context.Background(), listenSql) + if err != nil { + log.Printf("[INFO] Error listening to schema channel: %s", err) + conn.Release() + return nil, err + } + return conn, nil +} + +func (c *InteractiveClient) handleConnectionUpdateNotification(ctx context.Context, notification *pgconn.Notification) { + if notification == nil { + return + } + log.Printf("[TRACE] handleConnectionUpdateNotification: %s", notification.Payload) + n := &steampipeconfig.ConnectionUpdateNotification{} + err := json.Unmarshal([]byte(notification.Payload), n) + if err != nil { + log.Printf("[INFO] Error unmarshalling notification: %s", err) + return + } + + // reload the connection data map + // first load foreign schema names + if err := c.client().LoadSchemaNames(ctx); err != nil { + log.Printf("[INFO] Error loading foreign schema names: %v", err) + } + // now reload state + connectionMap, _, err := steampipeconfig.GetConnectionState(c.client().ForeignSchemaNames()) + if err != nil { + log.Printf("[INFO] Error loading connection state: %v", err) + return + } + // and save it + c.initData.ConnectionMap = connectionMap + + // reload config before reloading schema + config, err := steampipeconfig.LoadSteampipeConfig(viper.GetString(constants.ArgModLocation), "query") + if err != nil { + log.Printf("[WARN] Error reloading config: %s", err) + return + } + steampipeconfig.GlobalConfig = config + + // reload schema + if err := c.loadSchema(); err != nil { + log.Printf("[INFO] Error unmarshalling notification: %s", err) + return + } + // reinitialise autocomplete suggestions + c.initialiseSuggestions() + + // refresh the db session inside an execution lock + c.executionLock.Lock() + defer c.executionLock.Unlock() + + c.client().RefreshSessions(ctx) + log.Printf("[TRACE] completed refresh session") +} diff --git a/pkg/interactive/interactive_client_autocomplete.go b/pkg/interactive/interactive_client_autocomplete.go index 08a751de0..376af040a 100644 --- a/pkg/interactive/interactive_client_autocomplete.go +++ b/pkg/interactive/interactive_client_autocomplete.go @@ -76,12 +76,11 @@ func (c *InteractiveClient) initialiseTableSuggestions() { } var s []prompt.Suggest - connectionMap := c.initData.Client.ConnectionMap() // schema names var schemasToAdd []string // unqualified table names - initialise to the introspection table names - unqualifiedTablesToAdd := []string{} + var unqualifiedTablesToAdd []string // fully qualified table names var qualifiedTablesToAdd []string @@ -95,7 +94,7 @@ func (c *InteractiveClient) initialiseTableSuggestions() { // all other schema are ignored. // therefore, the only schema which will not have a connection is `public` var pluginOfThisSchema string - schemaConnection, hasConnectionForSchema := (*connectionMap)[schemaName] + schemaConnection, hasConnectionForSchema := c.initData.ConnectionMap[schemaName] if hasConnectionForSchema { pluginOfThisSchema = stripVersionFromPluginName(schemaConnection.Plugin) } diff --git a/pkg/interactive/interactive_client_init.go b/pkg/interactive/interactive_client_init.go index dbe329f42..f5fdc461a 100644 --- a/pkg/interactive/interactive_client_init.go +++ b/pkg/interactive/interactive_client_init.go @@ -3,7 +3,6 @@ package interactive import ( "context" "fmt" - "github.com/turbot/steampipe/pkg/statushooks" "log" "time" @@ -12,6 +11,7 @@ import ( "github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/db/db_common" "github.com/turbot/steampipe/pkg/error_helpers" + "github.com/turbot/steampipe/pkg/statushooks" "github.com/turbot/steampipe/pkg/workspace" ) @@ -59,6 +59,7 @@ func (c *InteractiveClient) handleInitResult(ctx context.Context, initResult *db c.interactivePrompt.Render() } + // initialise autocomplete suggestions c.initialiseSuggestions() // tell the workspace to reset the prompt after displaying async filewatcher messages c.initData.Workspace.SetOnFileWatcherEventMessages(func() { @@ -99,6 +100,11 @@ func (c *InteractiveClient) readInitDataStream(ctx context.Context) { c.initData.Result.Error = err } } + + // create a cancellation context used to cancel the listen thread when we exit + listenCtx, cancel := context.WithCancel(ctx) + go c.listenToPgNotifications(listenCtx) + c.cancelNotificationListener = cancel } func (c *InteractiveClient) workspaceWatcherErrorHandler(ctx context.Context, err error) { diff --git a/pkg/query/init_data.go b/pkg/query/init_data.go index 8230648f8..fe5623074 100644 --- a/pkg/query/init_data.go +++ b/pkg/query/init_data.go @@ -3,7 +3,6 @@ package query import ( "context" "fmt" - "github.com/spf13/viper" "github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/export" @@ -109,9 +108,6 @@ func (i *InitData) init(parentCtx context.Context, args []string) { i.Result.AddWarnings(errAndWarnings.Warnings...) i.Workspace = w - // set max DB connections to 1 - viper.Set(constants.ArgMaxParallel, 1) - statushooks.SetStatus(ctx, "Resolving arguments") // convert the query or sql file arg into an array of executable queries - check names queries in the current workspace diff --git a/pkg/query/metaquery/handlers.go b/pkg/query/metaquery/handlers.go index fc7f1fd38..4f4f9cb42 100644 --- a/pkg/query/metaquery/handlers.go +++ b/pkg/query/metaquery/handlers.go @@ -35,7 +35,7 @@ type HandlerInput struct { Query string Executor QueryExecutor Schema *schema.Metadata - Connections *steampipeconfig.ConnectionDataMap + Connections steampipeconfig.ConnectionDataMap Prompt *prompt.Prompt ClosePrompt func() } @@ -335,7 +335,7 @@ func listConnections(ctx context.Context, input *HandlerInput) error { if schema == input.Schema.TemporarySchemaName { continue } - plugin, found := (*input.Connections)[schema] + plugin, found := input.Connections[schema] if found { rows = append(rows, []string{schema, plugin.Plugin}) } else { diff --git a/pkg/steampipeconfig/connection_update_notification.go b/pkg/steampipeconfig/connection_update_notification.go new file mode 100644 index 000000000..62731efa9 --- /dev/null +++ b/pkg/steampipeconfig/connection_update_notification.go @@ -0,0 +1,17 @@ +package steampipeconfig + +import ( + "golang.org/x/exp/maps" +) + +type ConnectionUpdateNotification struct { + Update []string + Delete []string +} + +func NewConnectionUpdateNotification(updates *ConnectionUpdates) *ConnectionUpdateNotification { + return &ConnectionUpdateNotification{ + Update: maps.Keys(updates.Update), + Delete: maps.Keys(updates.Delete), + } +} diff --git a/pkg/steampipeconfig/refresh_connections_result.go b/pkg/steampipeconfig/refresh_connections_result.go index 7ebf5b741..6f38d031c 100644 --- a/pkg/steampipeconfig/refresh_connections_result.go +++ b/pkg/steampipeconfig/refresh_connections_result.go @@ -14,6 +14,7 @@ type RefreshConnectionResult struct { UpdatedConnections bool Updates *ConnectionUpdates FailedConnections map[string]string + ConnectionMap ConnectionDataMap } func (r *RefreshConnectionResult) Merge(other *RefreshConnectionResult) { diff --git a/pkg/version/version.go b/pkg/version/version.go index 3aff9f873..3d3a85c09 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -23,7 +23,7 @@ var steampipeVersion = "0.19.0" // A pre-release marker for the version. If this is "" (empty string) // then it means that it is a final release. Otherwise, this is a pre-release // such as "dev" (in development), "beta", "rc1", etc. -var prerelease = "rc.7" +var prerelease = "rc.8" // SteampipeVersion is an instance of semver.Version. This has the secondary // benefit of verifying during tests and init time that our version is a diff --git a/pluginmanager_service/plugin_manager.go b/pluginmanager_service/plugin_manager.go index f3bdb0069..6bbd34471 100644 --- a/pluginmanager_service/plugin_manager.go +++ b/pluginmanager_service/plugin_manager.go @@ -3,8 +3,10 @@ package pluginmanager_service import ( "context" "crypto/md5" + "encoding/json" "fmt" "github.com/turbot/steampipe/pkg/connectionwatcher" + "github.com/turbot/steampipe/pkg/steampipeconfig" "log" "os" "os/exec" @@ -133,17 +135,37 @@ func (m *PluginManager) Get(req *proto.GetRequest) (*proto.GetResponse, error) { return resp, nil } -func (m *PluginManager) SetConnectionConfigMap(configMap connectionwatcher.ConnectionConfigMap) { +// OnConnectionConfigChanged is the callback function invoked by the connection watcher when the config changed +func (m *PluginManager) OnConnectionConfigChanged(configMap connectionwatcher.ConnectionConfigMap) { m.mut.Lock() defer m.mut.Unlock() names := utils.SortedMapKeys(configMap) - log.Printf("[TRACE] SetConnectionConfigMap: %s", strings.Join(names, ",")) + log.Printf("[TRACE] OnConnectionConfigChanged: %s", strings.Join(names, ",")) err := m.handleConnectionConfigChanges(configMap) if err != nil { log.Printf("[WARN] handleConnectionConfigChanges returned error: %s", err.Error()) } + +} + +// OnSchemaChanged is the callback function invoked by the connection watcher when connections are added or removed +func (m *PluginManager) OnSchemaChanged(refreshResult *steampipeconfig.RefreshConnectionResult) { + // this is a file system event handler and not bound to any context + ctx := context.Background() + + client, err := db_local.NewLocalClient(ctx, constants.InvokerConnectionWatcher, nil) + if err != nil { + log.Printf("[TRACE] error creating client to handle updated connection config: %s", err.Error()) + } + defer client.Close(ctx) + notification := steampipeconfig.NewConnectionUpdateNotification(refreshResult.Updates) + if err != nil { + log.Printf("[WARN] Error sending notification: %s", err) + } else { + m.notifySchemaChange(notification, client) + } } func (m *PluginManager) Shutdown(req *proto.ShutdownRequest) (resp *proto.ShutdownResponse, err error) { @@ -696,6 +718,8 @@ func (m *PluginManager) setSingleConnectionConfig(pluginClient *sdkgrpc.PluginCl return pluginClient.SetConnectionConfig(req) } +// update the schema for the specified connection +// called from the message server func (m *PluginManager) updateConnectionSchema(ctx context.Context, connection string) { log.Printf("[TRACE] updateConnectionSchema connection %s", connection) // now refresh connections and search paths @@ -710,6 +734,25 @@ func (m *PluginManager) updateConnectionSchema(ctx context.Context, connection s log.Printf("[TRACE] error refreshing connections: %s", refreshResult.Error) return } + + // also send a postgres notification + m.notifySchemaChange(&steampipeconfig.ConnectionUpdateNotification{Update: []string{connection}}, client) +} + +// send a postgres notification that the schema has chganged +func (m *PluginManager) notifySchemaChange(notification *steampipeconfig.ConnectionUpdateNotification, client *db_local.LocalDbClient) { + notificationBytes, err := json.Marshal(notification) + if err != nil { + log.Printf("[WARN] Error marshalling schema change notification notification: %s", err) + return + } + log.Printf("[WARN] Send update notification") + + sql := fmt.Sprintf("select pg_notify('%s', $1)", constants.NotificationConnectionUpdate) + _, err = client.ExecuteSync(context.Background(), sql, notificationBytes) + if err != nil { + log.Printf("[WARN] Error sending notification: %s", err) + } } func (m *PluginManager) nonAggregatorConnectionCount() int {