From 2171ce66ba98144e78d7dfd2e25c2808dfd9cf47 Mon Sep 17 00:00:00 2001 From: kaidaguerre Date: Fri, 3 Mar 2023 18:45:40 +0000 Subject: [PATCH] Fix Postgres error "cached plan must not change result type" when dynamic plugin schema changes. Update interactive .inspect and autocomplete based on changes to connection config or dynamic schema updates. Closes #3184. Closes #3185 --- cmd/plugin.go | 2 +- cmd/plugin_manager.go | 2 +- pkg/connectionwatcher/connection_watcher.go | 32 ++--- pkg/constants/notifications.go | 5 + pkg/db/db_client/db_client.go | 4 - pkg/db/db_client/db_client_session.go | 1 + pkg/db/db_common/client.go | 3 - pkg/db/db_local/local_db_client.go | 15 +-- pkg/initialisation/init_data.go | 12 +- pkg/interactive/interactive_client.go | 113 +++++++++++++++++- .../interactive_client_autocomplete.go | 5 +- pkg/interactive/interactive_client_init.go | 8 +- pkg/query/init_data.go | 4 - pkg/query/metaquery/handlers.go | 4 +- .../connection_update_notification.go | 17 +++ .../refresh_connections_result.go | 1 + pkg/version/version.go | 2 +- pluginmanager_service/plugin_manager.go | 47 +++++++- 18 files changed, 217 insertions(+), 60 deletions(-) create mode 100644 pkg/constants/notifications.go create mode 100644 pkg/steampipeconfig/connection_update_notification.go diff --git a/cmd/plugin.go b/cmd/plugin.go index f1cb6871b..c03e762a3 100644 --- a/cmd/plugin.go +++ b/cmd/plugin.go @@ -699,7 +699,7 @@ func getPluginConnectionMap(ctx context.Context) (map[string][]modconfig.Connect } pluginConnectionMap := make(map[string][]modconfig.Connection) - for _, v := range *client.ConnectionMap() { + for _, v := range res.ConnectionMap { _, found := pluginConnectionMap[v.Plugin] if !found { pluginConnectionMap[v.Plugin] = []modconfig.Connection{} diff --git a/cmd/plugin_manager.go b/cmd/plugin_manager.go index 36af5273f..518460644 100644 --- a/cmd/plugin_manager.go +++ b/cmd/plugin_manager.go @@ -56,7 +56,7 @@ func runPluginManagerCmd(cmd *cobra.Command, args []string) { if shouldRunConnectionWatcher() { log.Printf("[INFO] starting connection watcher") - connectionWatcher, err := connectionwatcher.NewConnectionWatcher(pluginManager.SetConnectionConfigMap) + connectionWatcher, err := connectionwatcher.NewConnectionWatcher(pluginManager.OnConnectionConfigChanged, pluginManager.OnSchemaChanged) if err != nil { log.Printf("[WARN] failed to create connection watcher: %s", err.Error()) error_helpers.ShowError(ctx, err) diff --git a/pkg/connectionwatcher/connection_watcher.go b/pkg/connectionwatcher/connection_watcher.go index b252f1219..a339e316d 100644 --- a/pkg/connectionwatcher/connection_watcher.go +++ b/pkg/connectionwatcher/connection_watcher.go @@ -18,12 +18,14 @@ import ( type ConnectionWatcher struct { fileWatcherErrorHandler func(error) watcher *filewatcher.FileWatcher - onConnectionConfigChanged func(configMap ConnectionConfigMap) + onConnectionConfigChanged func(ConnectionConfigMap) + onSchemaChanged func(*steampipeconfig.RefreshConnectionResult) } -func NewConnectionWatcher(onConnectionChanged func(configMap ConnectionConfigMap)) (*ConnectionWatcher, error) { +func NewConnectionWatcher(onConnectionChanged func(ConnectionConfigMap), onSchemaChanged func(*steampipeconfig.RefreshConnectionResult)) (*ConnectionWatcher, error) { w := &ConnectionWatcher{ onConnectionConfigChanged: onConnectionChanged, + onSchemaChanged: onSchemaChanged, } watcherOptions := &filewatcher.WatcherOptions{ @@ -77,24 +79,18 @@ func (w *ConnectionWatcher) handleFileWatcherEvent(_ []fsnotify.Event) { } defer client.Close(ctx) - log.Printf("[TRACE] loaded updated config") - - log.Printf("[TRACE] calling onConnectionConfigChanged") - // convert config to format expected by plugin manager - // (plugin manager cannot reference steampipe config to avoid circular deps) - configMap := NewConnectionConfigMap(config.Connections) - // call on changed callback - // (this calls pluginmanager.SetConnectionConfigMap) - w.onConnectionConfigChanged(configMap) - - log.Printf("[TRACE] calling RefreshConnectionAndSearchPaths") - // We need to update the viper config and GlobalConfig // as these are both used by RefreshConnectionAndSearchPaths // set the global steampipe config steampipeconfig.GlobalConfig = config + // call on changed callback - we must call this BEFORE calling refresh connections + // convert config to format expected by plugin manager + // (plugin manager cannot reference steampipe config to avoid circular deps) + configMap := NewConnectionConfigMap(config.Connections) + w.onConnectionConfigChanged(configMap) + // The only configurations from GlobalConfig which have // impact during Refresh are Database options and the Connections // themselves. @@ -108,16 +104,22 @@ func (w *ConnectionWatcher) handleFileWatcherEvent(_ []fsnotify.Event) { // to use the GlobalConfig here and ignore Workspace Profile in general cmdconfig.SetDefaultsFromConfig(steampipeconfig.GlobalConfig.ConfigMap()) + log.Printf("[TRACE] calling RefreshConnectionAndSearchPaths") // now refresh connections and search paths refreshResult := client.RefreshConnectionAndSearchPaths(ctx) if refreshResult.Error != nil { log.Printf("[WARN] error refreshing connections: %s", refreshResult.Error) return } + // if the connections were added or removed, call the schema changed callback + if refreshResult.UpdatedConnections { + w.onSchemaChanged(refreshResult) + } // display any refresh warnings - // TODO send warnings on warning_stream (to FDW???) + // TODO send warnings on warning_stream refreshResult.ShowWarnings() + log.Printf("[TRACE] File watch event done") } func (w *ConnectionWatcher) Close() { diff --git a/pkg/constants/notifications.go b/pkg/constants/notifications.go new file mode 100644 index 000000000..ca36e88b9 --- /dev/null +++ b/pkg/constants/notifications.go @@ -0,0 +1,5 @@ +package constants + +const ( + NotificationConnectionUpdate = "connection_update" +) diff --git a/pkg/db/db_client/db_client.go b/pkg/db/db_client/db_client.go index b51d5fdc0..2b879cac0 100644 --- a/pkg/db/db_client/db_client.go +++ b/pkg/db/db_client/db_client.go @@ -124,10 +124,6 @@ func (c *DbClient) Close(context.Context) error { return nil } -func (c *DbClient) ConnectionMap() *steampipeconfig.ConnectionDataMap { - return &steampipeconfig.ConnectionDataMap{} -} - // ForeignSchemaNames implements Client func (c *DbClient) ForeignSchemaNames() []string { return c.foreignSchemaNames diff --git a/pkg/db/db_client/db_client_session.go b/pkg/db/db_client/db_client_session.go index d2d55efef..2de474566 100644 --- a/pkg/db/db_client/db_client_session.go +++ b/pkg/db/db_client/db_client_session.go @@ -80,6 +80,7 @@ func (c *DbClient) AcquireSession(ctx context.Context) (sessionResult *db_common return sessionResult } + sessionResult.Error = ctx.Err() return sessionResult } diff --git a/pkg/db/db_common/client.go b/pkg/db/db_common/client.go index 6f9860df5..5e435b494 100644 --- a/pkg/db/db_common/client.go +++ b/pkg/db/db_common/client.go @@ -3,7 +3,6 @@ package db_common import ( "context" "database/sql" - "github.com/turbot/steampipe/pkg/query/queryresult" "github.com/turbot/steampipe/pkg/schema" "github.com/turbot/steampipe/pkg/steampipeconfig" @@ -14,9 +13,7 @@ type Client interface { ForeignSchemaNames() []string AllSchemaNames() []string - LoadSchemaNames(ctx context.Context) error - ConnectionMap() *steampipeconfig.ConnectionDataMap GetCurrentSearchPath(context.Context) ([]string, error) GetCurrentSearchPathForDbConnection(context.Context, *sql.Conn) ([]string, error) diff --git a/pkg/db/db_local/local_db_client.go b/pkg/db/db_local/local_db_client.go index 25c6e2870..bb2c902e3 100644 --- a/pkg/db/db_local/local_db_client.go +++ b/pkg/db/db_local/local_db_client.go @@ -20,9 +20,8 @@ import ( // LocalDbClient wraps over DbClient type LocalDbClient struct { - client *db_client.DbClient - invoker constants.Invoker - connectionMap *steampipeconfig.ConnectionDataMap + client *db_client.DbClient + invoker constants.Invoker } // GetLocalClient starts service if needed and creates a new LocalDbClient @@ -98,10 +97,6 @@ func (c *LocalDbClient) LoadSchemaNames(ctx context.Context) error { return c.client.LoadSchemaNames(ctx) } -func (c *LocalDbClient) ConnectionMap() *steampipeconfig.ConnectionDataMap { - return c.connectionMap -} - func (c *LocalDbClient) RefreshSessions(ctx context.Context) *db_common.AcquireSessionResult { return c.client.RefreshSessions(ctx) } @@ -182,7 +177,7 @@ func (c *LocalDbClient) GetSchemaFromDB(ctx context.Context) (*schema.Metadata, query := c.buildSchemasQuery(schemas) acquireSessionResult := c.AcquireSession(ctx) - if acquireSessionResult.Error != nil { + if acquireSessionResult.Error != nil && acquireSessionResult.Session != nil { acquireSessionResult.Session.Close(false) return nil, err } @@ -263,8 +258,6 @@ WHERE %s return query } -// local only functions - func (c *LocalDbClient) RefreshConnectionAndSearchPaths(ctx context.Context, forceUpdateConnectionNames ...string) *steampipeconfig.RefreshConnectionResult { statushooks.SetStatus(ctx, "Refreshing connections") res := c.refreshConnections(ctx, forceUpdateConnectionNames...) @@ -292,7 +285,7 @@ func (c *LocalDbClient) RefreshConnectionAndSearchPaths(ctx context.Context, for res.Error = err return res } - c.connectionMap = &connectionMap + res.ConnectionMap = connectionMap // set user search path first - client may fall back to using it statushooks.SetStatus(ctx, "Setting up search path") diff --git a/pkg/initialisation/init_data.go b/pkg/initialisation/init_data.go index 7293e0ccd..8dfea1698 100644 --- a/pkg/initialisation/init_data.go +++ b/pkg/initialisation/init_data.go @@ -12,10 +12,10 @@ import ( "github.com/turbot/steampipe/pkg/db/db_client" "github.com/turbot/steampipe/pkg/db/db_common" "github.com/turbot/steampipe/pkg/db/db_local" - "github.com/turbot/steampipe/pkg/error_helpers" "github.com/turbot/steampipe/pkg/export" "github.com/turbot/steampipe/pkg/modinstaller" "github.com/turbot/steampipe/pkg/statushooks" + "github.com/turbot/steampipe/pkg/steampipeconfig" "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" "github.com/turbot/steampipe/pkg/workspace" ) @@ -30,6 +30,7 @@ type InitData struct { ShutdownTelemetry func() ExportManager *export.Manager + ConnectionMap steampipeconfig.ConnectionDataMap } func NewErrorInitData(err error) *InitData { @@ -140,15 +141,8 @@ func (i *InitData) Init(ctx context.Context, invoker constants.Invoker) { i.Result.Error = refreshResult.Error return } + i.ConnectionMap = refreshResult.ConnectionMap - // force creation of session data - se we see any prepared statement errors at once - sessionResult := i.Client.AcquireSession(ctx) - i.Result.AddWarnings(sessionResult.Warnings...) - if sessionResult.Error != nil { - i.Result.Error = fmt.Errorf("error acquiring database connection, %s", sessionResult.Error.Error()) - } else { - sessionResult.Session.Close(error_helpers.IsContextCanceled(ctx)) - } // add refresh connection warnings i.Result.AddWarnings(refreshResult.Warnings...) } diff --git a/pkg/interactive/interactive_client.go b/pkg/interactive/interactive_client.go index 73eaac383..74f52f008 100644 --- a/pkg/interactive/interactive_client.go +++ b/pkg/interactive/interactive_client.go @@ -3,6 +3,7 @@ package interactive import ( "bytes" "context" + "encoding/json" "fmt" "log" "os" @@ -15,6 +16,8 @@ import ( "github.com/alecthomas/chroma/lexers" "github.com/alecthomas/chroma/styles" "github.com/c-bata/go-prompt" + "github.com/jackc/pgx/v5/pgconn" + "github.com/jackc/pgx/v5/pgxpool" "github.com/spf13/viper" "github.com/turbot/go-kit/helpers" "github.com/turbot/steampipe/pkg/cmdconfig" @@ -27,6 +30,7 @@ import ( "github.com/turbot/steampipe/pkg/query/queryhistory" "github.com/turbot/steampipe/pkg/schema" "github.com/turbot/steampipe/pkg/statushooks" + "github.com/turbot/steampipe/pkg/steampipeconfig" "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" "github.com/turbot/steampipe/pkg/utils" "github.com/turbot/steampipe/pkg/version" @@ -51,6 +55,11 @@ type InteractiveClient struct { // NOTE: should ONLY be called by cancelActiveQueryIfAny cancelActiveQuery context.CancelFunc cancelPrompt context.CancelFunc + // this cancellation is used to stop the pg notification listener which + // we use to get connection config updates from the plugin manager + // this is tied to a context which remaing valid throughout the life of the + // interactive session + cancelNotificationListener context.CancelFunc // channel used internally to pass the initialisation result initResultChan chan *db_common.InitResult // flag set when initialisation is complete (with or without errors) @@ -94,6 +103,7 @@ func newInteractiveClient(ctx context.Context, initData *query.InitData, result // asynchronously wait for init to complete // we start this immediately rather than lazy loading as we want to handle errors asap go c.readInitDataStream(ctx) + return c, nil } @@ -150,6 +160,10 @@ func (c *InteractiveClient) InteractivePrompt(parentContext context.Context) { c.hidePrompt = true c.interactivePrompt.ClearLine() + // stop the notification listener + if c.cancelNotificationListener != nil { + c.cancelNotificationListener() + } return } // create new context with a cancellation func @@ -246,8 +260,7 @@ func (c *InteractiveClient) runInteractivePrompt(ctx context.Context) (ret utils Key: prompt.ControlD, Fn: func(b *prompt.Buffer) { if b.Text() == "" { - // just set after close action - go prompt will handle the prompt shutdown - c.afterClose = AfterPromptCloseExit + c.ClosePrompt(AfterPromptCloseExit) } }, }), @@ -368,6 +381,7 @@ func (c *InteractiveClient) executor(ctx context.Context, line string) { c.cancelActiveQueryIfAny() } else { + // otherwise execute query t := time.Now() result, err := c.client().Execute(queryCtx, resolvedQuery.ExecuteSQL, resolvedQuery.Args...) @@ -503,7 +517,7 @@ func (c *InteractiveClient) executeMetaquery(ctx context.Context, query string) Query: query, Executor: client, Schema: c.schemaMetadata, - Connections: client.ConnectionMap(), + Connections: c.initData.ConnectionMap, Prompt: c.interactivePrompt, ClosePrompt: func() { c.afterClose = AfterPromptCloseExit }, }) @@ -612,3 +626,96 @@ func (c *InteractiveClient) startCancelHandler() chan bool { }() return quitChannel } + +func (c *InteractiveClient) listenToPgNotifications(ctx context.Context) error { + log.Printf("[TRACE] InteractiveClient listenToPgNotifications") + for ctx.Err() == nil { + conn, err := c.getNotificationConnection(ctx) + if err != nil { + return err + } + + log.Printf("[TRACE] Wait for notification") + notification, err := conn.Conn().WaitForNotification(ctx) + if err != nil && !error_helpers.IsContextCancelledError(err) { + log.Printf("[INFO] Error waiting for notification: %s", err) + } + conn.Release() + + if notification != nil { + c.handleConnectionUpdateNotification(ctx, notification) + } + } + log.Printf("[TRACE] InteractiveClient listenToPgNotifications DONE") + + return nil +} + +func (c *InteractiveClient) getNotificationConnection(ctx context.Context) (*pgxpool.Conn, error) { + sessionResult := c.client().AcquireSession(ctx) + + if sessionResult.Error != nil { + return nil, fmt.Errorf("error acquiring database connection to listenToPgNotifications to notifications, %s", sessionResult.Error.Error()) + } + + conn := sessionResult.Session.Connection + + listenSql := fmt.Sprintf("listen %s", constants.NotificationConnectionUpdate) + _, err := conn.Exec(context.Background(), listenSql) + if err != nil { + log.Printf("[INFO] Error listening to schema channel: %s", err) + conn.Release() + return nil, err + } + return conn, nil +} + +func (c *InteractiveClient) handleConnectionUpdateNotification(ctx context.Context, notification *pgconn.Notification) { + if notification == nil { + return + } + log.Printf("[TRACE] handleConnectionUpdateNotification: %s", notification.Payload) + n := &steampipeconfig.ConnectionUpdateNotification{} + err := json.Unmarshal([]byte(notification.Payload), n) + if err != nil { + log.Printf("[INFO] Error unmarshalling notification: %s", err) + return + } + + // reload the connection data map + // first load foreign schema names + if err := c.client().LoadSchemaNames(ctx); err != nil { + log.Printf("[INFO] Error loading foreign schema names: %v", err) + } + // now reload state + connectionMap, _, err := steampipeconfig.GetConnectionState(c.client().ForeignSchemaNames()) + if err != nil { + log.Printf("[INFO] Error loading connection state: %v", err) + return + } + // and save it + c.initData.ConnectionMap = connectionMap + + // reload config before reloading schema + config, err := steampipeconfig.LoadSteampipeConfig(viper.GetString(constants.ArgModLocation), "query") + if err != nil { + log.Printf("[WARN] Error reloading config: %s", err) + return + } + steampipeconfig.GlobalConfig = config + + // reload schema + if err := c.loadSchema(); err != nil { + log.Printf("[INFO] Error unmarshalling notification: %s", err) + return + } + // reinitialise autocomplete suggestions + c.initialiseSuggestions() + + // refresh the db session inside an execution lock + c.executionLock.Lock() + defer c.executionLock.Unlock() + + c.client().RefreshSessions(ctx) + log.Printf("[TRACE] completed refresh session") +} diff --git a/pkg/interactive/interactive_client_autocomplete.go b/pkg/interactive/interactive_client_autocomplete.go index 08a751de0..376af040a 100644 --- a/pkg/interactive/interactive_client_autocomplete.go +++ b/pkg/interactive/interactive_client_autocomplete.go @@ -76,12 +76,11 @@ func (c *InteractiveClient) initialiseTableSuggestions() { } var s []prompt.Suggest - connectionMap := c.initData.Client.ConnectionMap() // schema names var schemasToAdd []string // unqualified table names - initialise to the introspection table names - unqualifiedTablesToAdd := []string{} + var unqualifiedTablesToAdd []string // fully qualified table names var qualifiedTablesToAdd []string @@ -95,7 +94,7 @@ func (c *InteractiveClient) initialiseTableSuggestions() { // all other schema are ignored. // therefore, the only schema which will not have a connection is `public` var pluginOfThisSchema string - schemaConnection, hasConnectionForSchema := (*connectionMap)[schemaName] + schemaConnection, hasConnectionForSchema := c.initData.ConnectionMap[schemaName] if hasConnectionForSchema { pluginOfThisSchema = stripVersionFromPluginName(schemaConnection.Plugin) } diff --git a/pkg/interactive/interactive_client_init.go b/pkg/interactive/interactive_client_init.go index dbe329f42..f5fdc461a 100644 --- a/pkg/interactive/interactive_client_init.go +++ b/pkg/interactive/interactive_client_init.go @@ -3,7 +3,6 @@ package interactive import ( "context" "fmt" - "github.com/turbot/steampipe/pkg/statushooks" "log" "time" @@ -12,6 +11,7 @@ import ( "github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/db/db_common" "github.com/turbot/steampipe/pkg/error_helpers" + "github.com/turbot/steampipe/pkg/statushooks" "github.com/turbot/steampipe/pkg/workspace" ) @@ -59,6 +59,7 @@ func (c *InteractiveClient) handleInitResult(ctx context.Context, initResult *db c.interactivePrompt.Render() } + // initialise autocomplete suggestions c.initialiseSuggestions() // tell the workspace to reset the prompt after displaying async filewatcher messages c.initData.Workspace.SetOnFileWatcherEventMessages(func() { @@ -99,6 +100,11 @@ func (c *InteractiveClient) readInitDataStream(ctx context.Context) { c.initData.Result.Error = err } } + + // create a cancellation context used to cancel the listen thread when we exit + listenCtx, cancel := context.WithCancel(ctx) + go c.listenToPgNotifications(listenCtx) + c.cancelNotificationListener = cancel } func (c *InteractiveClient) workspaceWatcherErrorHandler(ctx context.Context, err error) { diff --git a/pkg/query/init_data.go b/pkg/query/init_data.go index 8230648f8..fe5623074 100644 --- a/pkg/query/init_data.go +++ b/pkg/query/init_data.go @@ -3,7 +3,6 @@ package query import ( "context" "fmt" - "github.com/spf13/viper" "github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/export" @@ -109,9 +108,6 @@ func (i *InitData) init(parentCtx context.Context, args []string) { i.Result.AddWarnings(errAndWarnings.Warnings...) i.Workspace = w - // set max DB connections to 1 - viper.Set(constants.ArgMaxParallel, 1) - statushooks.SetStatus(ctx, "Resolving arguments") // convert the query or sql file arg into an array of executable queries - check names queries in the current workspace diff --git a/pkg/query/metaquery/handlers.go b/pkg/query/metaquery/handlers.go index fc7f1fd38..4f4f9cb42 100644 --- a/pkg/query/metaquery/handlers.go +++ b/pkg/query/metaquery/handlers.go @@ -35,7 +35,7 @@ type HandlerInput struct { Query string Executor QueryExecutor Schema *schema.Metadata - Connections *steampipeconfig.ConnectionDataMap + Connections steampipeconfig.ConnectionDataMap Prompt *prompt.Prompt ClosePrompt func() } @@ -335,7 +335,7 @@ func listConnections(ctx context.Context, input *HandlerInput) error { if schema == input.Schema.TemporarySchemaName { continue } - plugin, found := (*input.Connections)[schema] + plugin, found := input.Connections[schema] if found { rows = append(rows, []string{schema, plugin.Plugin}) } else { diff --git a/pkg/steampipeconfig/connection_update_notification.go b/pkg/steampipeconfig/connection_update_notification.go new file mode 100644 index 000000000..62731efa9 --- /dev/null +++ b/pkg/steampipeconfig/connection_update_notification.go @@ -0,0 +1,17 @@ +package steampipeconfig + +import ( + "golang.org/x/exp/maps" +) + +type ConnectionUpdateNotification struct { + Update []string + Delete []string +} + +func NewConnectionUpdateNotification(updates *ConnectionUpdates) *ConnectionUpdateNotification { + return &ConnectionUpdateNotification{ + Update: maps.Keys(updates.Update), + Delete: maps.Keys(updates.Delete), + } +} diff --git a/pkg/steampipeconfig/refresh_connections_result.go b/pkg/steampipeconfig/refresh_connections_result.go index 7ebf5b741..6f38d031c 100644 --- a/pkg/steampipeconfig/refresh_connections_result.go +++ b/pkg/steampipeconfig/refresh_connections_result.go @@ -14,6 +14,7 @@ type RefreshConnectionResult struct { UpdatedConnections bool Updates *ConnectionUpdates FailedConnections map[string]string + ConnectionMap ConnectionDataMap } func (r *RefreshConnectionResult) Merge(other *RefreshConnectionResult) { diff --git a/pkg/version/version.go b/pkg/version/version.go index 3aff9f873..3d3a85c09 100644 --- a/pkg/version/version.go +++ b/pkg/version/version.go @@ -23,7 +23,7 @@ var steampipeVersion = "0.19.0" // A pre-release marker for the version. If this is "" (empty string) // then it means that it is a final release. Otherwise, this is a pre-release // such as "dev" (in development), "beta", "rc1", etc. -var prerelease = "rc.7" +var prerelease = "rc.8" // SteampipeVersion is an instance of semver.Version. This has the secondary // benefit of verifying during tests and init time that our version is a diff --git a/pluginmanager_service/plugin_manager.go b/pluginmanager_service/plugin_manager.go index f3bdb0069..6bbd34471 100644 --- a/pluginmanager_service/plugin_manager.go +++ b/pluginmanager_service/plugin_manager.go @@ -3,8 +3,10 @@ package pluginmanager_service import ( "context" "crypto/md5" + "encoding/json" "fmt" "github.com/turbot/steampipe/pkg/connectionwatcher" + "github.com/turbot/steampipe/pkg/steampipeconfig" "log" "os" "os/exec" @@ -133,17 +135,37 @@ func (m *PluginManager) Get(req *proto.GetRequest) (*proto.GetResponse, error) { return resp, nil } -func (m *PluginManager) SetConnectionConfigMap(configMap connectionwatcher.ConnectionConfigMap) { +// OnConnectionConfigChanged is the callback function invoked by the connection watcher when the config changed +func (m *PluginManager) OnConnectionConfigChanged(configMap connectionwatcher.ConnectionConfigMap) { m.mut.Lock() defer m.mut.Unlock() names := utils.SortedMapKeys(configMap) - log.Printf("[TRACE] SetConnectionConfigMap: %s", strings.Join(names, ",")) + log.Printf("[TRACE] OnConnectionConfigChanged: %s", strings.Join(names, ",")) err := m.handleConnectionConfigChanges(configMap) if err != nil { log.Printf("[WARN] handleConnectionConfigChanges returned error: %s", err.Error()) } + +} + +// OnSchemaChanged is the callback function invoked by the connection watcher when connections are added or removed +func (m *PluginManager) OnSchemaChanged(refreshResult *steampipeconfig.RefreshConnectionResult) { + // this is a file system event handler and not bound to any context + ctx := context.Background() + + client, err := db_local.NewLocalClient(ctx, constants.InvokerConnectionWatcher, nil) + if err != nil { + log.Printf("[TRACE] error creating client to handle updated connection config: %s", err.Error()) + } + defer client.Close(ctx) + notification := steampipeconfig.NewConnectionUpdateNotification(refreshResult.Updates) + if err != nil { + log.Printf("[WARN] Error sending notification: %s", err) + } else { + m.notifySchemaChange(notification, client) + } } func (m *PluginManager) Shutdown(req *proto.ShutdownRequest) (resp *proto.ShutdownResponse, err error) { @@ -696,6 +718,8 @@ func (m *PluginManager) setSingleConnectionConfig(pluginClient *sdkgrpc.PluginCl return pluginClient.SetConnectionConfig(req) } +// update the schema for the specified connection +// called from the message server func (m *PluginManager) updateConnectionSchema(ctx context.Context, connection string) { log.Printf("[TRACE] updateConnectionSchema connection %s", connection) // now refresh connections and search paths @@ -710,6 +734,25 @@ func (m *PluginManager) updateConnectionSchema(ctx context.Context, connection s log.Printf("[TRACE] error refreshing connections: %s", refreshResult.Error) return } + + // also send a postgres notification + m.notifySchemaChange(&steampipeconfig.ConnectionUpdateNotification{Update: []string{connection}}, client) +} + +// send a postgres notification that the schema has chganged +func (m *PluginManager) notifySchemaChange(notification *steampipeconfig.ConnectionUpdateNotification, client *db_local.LocalDbClient) { + notificationBytes, err := json.Marshal(notification) + if err != nil { + log.Printf("[WARN] Error marshalling schema change notification notification: %s", err) + return + } + log.Printf("[WARN] Send update notification") + + sql := fmt.Sprintf("select pg_notify('%s', $1)", constants.NotificationConnectionUpdate) + _, err = client.ExecuteSync(context.Background(), sql, notificationBytes) + if err != nil { + log.Printf("[WARN] Error sending notification: %s", err) + } } func (m *PluginManager) nonAggregatorConnectionCount() int {