Add support to retrieve plugin rate limiter definitions and use to populate steampipe_rate_limiter table. Closes #3805 (#3803)

This commit is contained in:
kaidaguerre
2023-09-06 13:28:22 +01:00
committed by GitHub
parent dc0042aaf8
commit 2a86d08445
32 changed files with 832 additions and 355 deletions

View File

@@ -33,7 +33,6 @@ func pluginManagerCmd() *cobra.Command {
}
func runPluginManagerCmd(cmd *cobra.Command, _ []string) {
ctx := cmd.Context()
logger := createPluginManagerLog()
log.Printf("[INFO] starting plugin manager")
@@ -57,7 +56,7 @@ func runPluginManagerCmd(cmd *cobra.Command, _ []string) {
configMap := connection.NewConnectionConfigMap(steampipeConfig.Connections)
log.Printf("[TRACE] loaded config map: %s", strings.Join(steampipeConfig.ConnectionNames(), ","))
pluginManager, err := pluginmanager_service.NewPluginManager(ctx, configMap, steampipeConfig.Limiters, logger)
pluginManager, err := pluginmanager_service.NewPluginManager(cmd.Context(), configMap, steampipeConfig.Limiters, logger)
if err != nil {
log.Printf("[WARN] failed to create plugin manager: %s", err.Error())
os.Exit(1)
@@ -65,7 +64,7 @@ func runPluginManagerCmd(cmd *cobra.Command, _ []string) {
if shouldRunConnectionWatcher() {
log.Printf("[INFO] starting connection watcher")
connectionWatcher, err := connection.NewConnectionWatcher(pluginManager.OnConnectionConfigChanged)
connectionWatcher, err := connection.NewConnectionWatcher(pluginManager)
if err != nil {
log.Printf("[WARN] failed to create connection watcher: %s", err.Error())
os.Exit(1)

View File

@@ -364,7 +364,7 @@ func validateConfig() error {
diagnostics, ok := os.LookupEnv(plugin.EnvDiagnosticsLevel)
if ok {
if _, isValid := plugin.ValidDiagnosticsLevels[diagnostics]; !isValid {
if _, isValid := plugin.ValidDiagnosticsLevels[strings.ToUpper(diagnostics)]; !isValid {
return fmt.Errorf(`invalid value of '%s' (%s), must be one of: %s`, plugin.EnvDiagnosticsLevel, diagnostics, strings.Join(maps.Keys(plugin.ValidDiagnosticsLevels), ", "))
}
}

2
go.mod
View File

@@ -45,7 +45,7 @@ require (
github.com/stevenle/topsort v0.2.0
github.com/turbot/go-kit v0.8.0-rc.0
github.com/turbot/steampipe-cloud-sdk-go v0.6.0
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-rc.21
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.23
github.com/xlab/treeprint v1.2.0
github.com/zclconf/go-cty v1.13.3
github.com/zclconf/go-cty-yaml v1.0.3

4
go.sum
View File

@@ -1098,8 +1098,8 @@ github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50 h1:
github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50/go.mod h1:vFnjEGDIIA/Lib7giyE4E9c50Lvl8j0S+7FVlAwDAVw=
github.com/turbot/steampipe-cloud-sdk-go v0.6.0 h1:ufAxOpKS1uq7eejuE5sfEu1+d7QAd0RBjl8Bn6+mIs8=
github.com/turbot/steampipe-cloud-sdk-go v0.6.0/go.mod h1:M42TMBdMim4bV1YTMxhKyzfSGSMo4CXUkm3wt9w7t1Y=
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-rc.21 h1:fSZOhf/zBkWHC97qIwscUASyJqEQXZ8qAGb+5JOygKY=
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-rc.21/go.mod h1:Np0X1Oj3JNTcuf9JmvWwHrCqc0UB4iJLmUlOkRwMCWw=
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.23 h1:1ONFhF7MK5hIjc+pqyS0zkj6gz1/8LrWhf2zQuo3suE=
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.23/go.mod h1:Np0X1Oj3JNTcuf9JmvWwHrCqc0UB4iJLmUlOkRwMCWw=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go v0.0.0-20180813092308-00b869d2f4a5/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=

View File

@@ -14,14 +14,15 @@ import (
)
type ConnectionWatcher struct {
fileWatcherErrorHandler func(error)
watcher *filewatcher.FileWatcher
onConnectionConfigChanged func(ConnectionConfigMap, LimiterMap)
fileWatcherErrorHandler func(error)
watcher *filewatcher.FileWatcher
// interface exposing the plugin manager functions we need
pluginManager pluginManager
}
func NewConnectionWatcher(onConnectionChanged func(ConnectionConfigMap, LimiterMap)) (*ConnectionWatcher, error) {
func NewConnectionWatcher(pluginManager pluginManager) (*ConnectionWatcher, error) {
w := &ConnectionWatcher{
onConnectionConfigChanged: onConnectionChanged,
pluginManager: pluginManager,
}
watcherOptions := &filewatcher.WatcherOptions{
@@ -51,7 +52,7 @@ func NewConnectionWatcher(onConnectionChanged func(ConnectionConfigMap, LimiterM
return w, nil
}
func (w *ConnectionWatcher) handleFileWatcherEvent(events []fsnotify.Event) {
func (w *ConnectionWatcher) handleFileWatcherEvent([]fsnotify.Event) {
defer func() {
if r := recover(); r != nil {
log.Printf("[WARN] ConnectionWatcher caught a panic: %s", helpers.ToError(r).Error())
@@ -79,7 +80,7 @@ func (w *ConnectionWatcher) handleFileWatcherEvent(events []fsnotify.Event) {
// convert config to format expected by plugin manager
// (plugin manager cannot reference steampipe config to avoid circular deps)
configMap := NewConnectionConfigMap(config.Connections)
w.onConnectionConfigChanged(configMap, config.Limiters)
w.pluginManager.OnConnectionConfigChanged(configMap, config.Limiters)
// The only configurations from GlobalConfig which have
// impact during Refresh are Database options and the Connections
@@ -99,7 +100,7 @@ func (w *ConnectionWatcher) handleFileWatcherEvent(events []fsnotify.Event) {
// call RefreshConnections asyncronously
// the RefreshConnections implements its own locking to ensure only a singler execution and a single queues execution
// TODO send warnings on warning_stream
go RefreshConnections(ctx)
go RefreshConnections(ctx, w.pluginManager)
log.Printf("[TRACE] File watch event done")
}

View File

@@ -0,0 +1,16 @@
package connection
import (
"github.com/jackc/pgx/v5/pgxpool"
"github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/shared"
)
type pluginManager interface {
shared.PluginManager
OnConnectionConfigChanged(ConnectionConfigMap, LimiterMap)
GetConnectionConfig() ConnectionConfigMap
HandlePluginLimiterChanges(limiterMap PluginLimiterMap) error
Pool() *pgxpool.Pool
ShouldFetchRateLimiterDefs() bool
LoadPluginRateLimiters(pluginConnectionMap map[string]string) (PluginLimiterMap, error)
}

View File

@@ -1,24 +1,34 @@
package connection
import "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
import (
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"golang.org/x/exp/maps"
)
// map of plugin short name to Limiter map for the plugin
type PluginLimiterMap map[string]LimiterMap
func (l PluginLimiterMap) Equals(other PluginLimiterMap) bool {
return maps.EqualFunc(l, other, func(m1, m2 LimiterMap) bool { return m1.Equals(m2) })
}
type LimiterMap map[string]*modconfig.RateLimiter
// GetPluginsWithChangedLimiters returns a list of plugins (short name)
// who have changed limiter configs (added/deleted/update)
func (l LimiterMap) GetPluginsWithChangedLimiters(other LimiterMap) map[string]struct{} {
var pluginsWithChangedLimiters = make(map[string]struct{})
for name, limiter := range l {
otherLimiter, ok := other[name]
if !ok || !limiter.Equals(otherLimiter) {
pluginsWithChangedLimiters[limiter.Plugin] = struct{}{}
}
}
for name, otherLimiter := range other {
if _, ok := l[name]; !ok {
pluginsWithChangedLimiters[otherLimiter.Plugin] = struct{}{}
}
}
return pluginsWithChangedLimiters
func (l LimiterMap) Equals(other LimiterMap) bool {
return maps.EqualFunc(l, other, func(l1, l2 *modconfig.RateLimiter) bool { return l1.Equals(l2) })
}
// ToPluginMap converts limiter map keyed by limiter name to a map of limiter maps keyed by plugin
func (l LimiterMap) ToPluginMap() PluginLimiterMap{
res := make(PluginLimiterMap)
for name, limiter := range l {
limitersForPlugin := res[limiter.Plugin]
if limitersForPlugin == nil {
limitersForPlugin = make(LimiterMap)
}
limitersForPlugin[name] = limiter
res[limiter.Plugin] = limitersForPlugin
}
return res
}

View File

@@ -6,6 +6,7 @@ import (
"sync"
"time"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/utils"
)
@@ -16,7 +17,14 @@ var executeLock sync.Mutex
// only allow one queued execution
var queueLock sync.Mutex
func RefreshConnections(ctx context.Context, forceUpdateConnectionNames ...string) *steampipeconfig.RefreshConnectionResult {
func RefreshConnections(ctx context.Context, pluginManager pluginManager, forceUpdateConnectionNames ...string) (res *steampipeconfig.RefreshConnectionResult) {
// TODO KAI if we, for example, access a nil map, this does not seem to catch it and startup hangs
defer func() {
if r := recover(); r != nil {
res = steampipeconfig.NewErrorRefreshConnectionResult(helpers.ToError(r))
}
}()
//time.Sleep(10 * time.Second)
utils.LogTime("RefreshConnections start")
defer utils.LogTime("RefreshConnections end")
@@ -45,12 +53,12 @@ func RefreshConnections(ctx context.Context, forceUpdateConnectionNames ...strin
log.Printf("[INFO] acquired refreshExecuteLock, released refreshQueueLock")
// now refresh connections
// package up all necessary data into a state object6
state, err := newRefreshConnectionState(ctx, forceUpdateConnectionNames)
// package up all necessary data into a state object
state, err := newRefreshConnectionState(ctx, pluginManager, forceUpdateConnectionNames)
if err != nil {
return steampipeconfig.NewErrorRefreshConnectionResult(err)
}
defer state.close()
// now do the refresh
state.refreshConnections(ctx)

View File

@@ -40,7 +40,6 @@ type refreshConnectionState struct {
tableUpdater *connectionStateTableUpdater
res *steampipeconfig.RefreshConnectionResult
forceUpdateConnectionNames []string
// properties for schema/comment cloning
exemplarSchemaMapMut sync.Mutex
@@ -49,36 +48,26 @@ type refreshConnectionState struct {
exemplarSchemaMap map[string]string
// if a plugin has an entry in this map, all connections schemas can be cloned from teh exemplar schema
exemplarCommentsMap map[string]string
pluginManager pluginManager
}
func newRefreshConnectionState(ctx context.Context, forceUpdateConnectionNames []string) (*refreshConnectionState, error) {
// create a connection pool to connection refresh
poolsize := 20
pool, err := db_local.CreateConnectionPool(ctx, &db_local.CreateDbOptions{Username: constants.DatabaseSuperUser}, poolsize)
if err != nil {
return nil, err
}
func newRefreshConnectionState(ctx context.Context, pluginManager pluginManager, forceUpdateConnectionNames []string) (*refreshConnectionState, error) {
pool := pluginManager.Pool()
// set user search path first
log.Printf("[INFO] setting up search path")
searchPath, err := db_local.SetUserSearchPath(ctx, pool)
if err != nil {
// note: close pool in case of error
pool.Close()
return nil, err
}
return &refreshConnectionState{
res := &refreshConnectionState{
pool: pool,
searchPath: searchPath,
forceUpdateConnectionNames: forceUpdateConnectionNames,
}, nil
}
func (s *refreshConnectionState) close() {
if s.pool != nil {
s.pool.Close()
pluginManager: pluginManager,
}
return res, nil
}
// RefreshConnections loads required connections from config
@@ -95,8 +84,15 @@ func (s *refreshConnectionState) refreshConnections(ctx context.Context) {
}()
log.Printf("[INFO] building connectionUpdates")
// determine any necessary connection updates
s.connectionUpdates, s.res = steampipeconfig.NewConnectionUpdates(ctx, s.pool, s.forceUpdateConnectionNames...)
var opts []steampipeconfig.ConnectionUpdatesOption
if len(s.forceUpdateConnectionNames) > 0 {
opts = append(opts, steampipeconfig.WithForceUpdate(s.forceUpdateConnectionNames))
}
// build a ConnectionUpdates struct
// this determine any necessary connection updates and starts any necessary plugins
s.connectionUpdates, s.res = steampipeconfig.NewConnectionUpdates(ctx, s.pool, s.pluginManager, opts...)
defer s.logRefreshConnectionResults()
// were we successful?
if s.res.Error != nil {
@@ -105,6 +101,20 @@ func (s *refreshConnectionState) refreshConnections(ctx context.Context) {
log.Printf("[INFO] created connectionUpdates")
// reload plugin rate limiter definitions for all plugins which are updated - the plugin will already be loaded
if len(s.connectionUpdates.PluginsWithUpdatedBinary) > 0 {
updatedPluginLimiters, err := s.pluginManager.LoadPluginRateLimiters(s.connectionUpdates.PluginsWithUpdatedBinary)
if err != nil {
s.res.Error = err
return
}
if len(updatedPluginLimiters) > 0 {
s.pluginManager.HandlePluginLimiterChanges(updatedPluginLimiters)
}
}
// delete the connection state file - it will be rewritten when we are complete
log.Printf("[INFO] deleting connections state file")
steampipeconfig.DeleteConnectionStateFile()
@@ -190,6 +200,7 @@ func (s *refreshConnectionState) logRefreshConnectionResults() {
}
func (s *refreshConnectionState) executeConnectionQueries(ctx context.Context) {
// TODO WHY? WHY NOT FROM ourselves
// retrieve updates from the table updater
connectionUpdates := s.tableUpdater.updates
@@ -332,7 +343,7 @@ func (s *refreshConnectionState) executeUpdateQueries(ctx context.Context) {
return
}
// convert map upd update sets (used for dynamic schemas) to an array of the underlying connection states
// convert map update sets (used for dynamic schemas) to an array of the underlying connection states
func updateSetMapToArray(updateSetMap map[string][]*steampipeconfig.ConnectionState) []*steampipeconfig.ConnectionState {
var res []*steampipeconfig.ConnectionState
for _, updates := range updateSetMap {
@@ -482,6 +493,7 @@ func (s *refreshConnectionState) executeUpdateQuery(ctx context.Context, sql, co
// update the state table
//(the transaction will be aborted - create a connection for the update)
if conn, poolErr := s.pool.Acquire(ctx); poolErr == nil {
defer conn.Release()
if statusErr := s.tableUpdater.onConnectionError(ctx, conn.Conn(), connectionName, err); statusErr != nil {
// NOTE: do not return the error - unless we failed to update the connection state table
return error_helpers.CombineErrorsWithPrefix(fmt.Sprintf("failed to update connection %s and failed to update connection_state table", connectionName), err, statusErr)
@@ -617,6 +629,7 @@ func (s *refreshConnectionState) executeCommentQuery(ctx context.Context, sql, c
// update the state table
//(the transaction will be aborted - create a connection for the update)
if conn, poolErr := s.pool.Acquire(ctx); poolErr == nil {
defer conn.Release()
if statusErr := s.tableUpdater.onConnectionError(ctx, conn.Conn(), connectionName, err); statusErr != nil {
// NOTE: do not return the error - unless we failed to update the connection state table
return error_helpers.CombineErrorsWithPrefix(fmt.Sprintf("failed to update connection %s and failed to update connection_state table", connectionName), err, statusErr)
@@ -711,6 +724,7 @@ func (s *refreshConnectionState) executeDeleteQuery(ctx context.Context, connect
// update the state table
//(the transaction will be aborted - create a connection for the update)
if conn, poolErr := s.pool.Acquire(ctx); poolErr == nil {
defer conn.Release()
if statusErr := s.tableUpdater.onConnectionError(ctx, conn.Conn(), connectionName, err); statusErr != nil {
// NOTE: do not return the error - unless we failed to update the connection state table
return error_helpers.CombineErrorsWithPrefix(fmt.Sprintf("failed to update connection %s and failed to update connection_state table", connectionName), err, statusErr)

View File

@@ -11,6 +11,7 @@ import (
"github.com/turbot/steampipe/pkg/connection_state"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/utils"
)
@@ -141,6 +142,18 @@ INNER JOIN
}
func setupInternal(ctx context.Context, conn *pgx.Conn) error {
statushooks.SetStatus(ctx, "Dropping legacy schema")
if err := dropLegacyInternalSchema(ctx, conn); err != nil {
// do not fail
// worst case scenario is that we have a couple of extra schema
// these won't be in the search path anyway
log.Println("[INFO] failed to drop legacy 'internal' schema", err)
}
// setup internal schema
// this includes setting the state of all connections in the connection_state table to pending
statushooks.SetStatus(ctx, "Setting up internal schema")
utils.LogTime("db_local.setupInternal start")
defer utils.LogTime("db_local.setupInternal end")

View File

@@ -103,41 +103,56 @@ func StartServices(ctx context.Context, listenAddresses []string, port int, invo
res.Status = ServiceAlreadyRunning
}
// start plugin manager if needed
res = ensurePluginManager(res)
if res.Status == ServiceStarted {
// execute post startup setup
if err := postServiceStart(ctx, res); err != nil {
// NOTE do not update res.Status - this will be done by defer block
res.Error = err
return res
}
// start plugin manager if needed
pluginManager, pluginManagerState, err := ensurePluginManager()
res.PluginManagerState = pluginManagerState
if err != nil {
res.Error = err
return res
}
// ask the plugin manager to refresh connections
// this is executed asyncronously by the plugin manager
pluginManager.RefreshConnections(&pb.RefreshConnectionsRequest{})
statushooks.SetStatus(ctx, "Service startup complete")
}
return res
}
func ensurePluginManager(res *StartResult) *StartResult {
func ensurePluginManager() (*pluginmanager.PluginManagerClient, *pluginmanager.PluginManagerState, error) {
// start the plugin manager if needed
res.PluginManagerState, res.Error = pluginmanager.LoadPluginManagerState()
if res.Error != nil {
res.Status = ServiceFailedToStart
return res
state, err := pluginmanager.LoadPluginManagerState()
if err != nil {
return nil, nil, err
}
if !res.PluginManagerState.Running {
if !state.Running {
// get the location of the currently running steampipe process
executable, err := os.Executable()
if err != nil {
log.Printf("[WARN] plugin manager start() - failed to get steampipe executable path: %s", err)
return res.SetError(err)
return nil, nil, err
}
if err := pluginmanager.StartNewInstance(executable); err != nil {
if state, err = pluginmanager.StartNewInstance(executable); err != nil {
log.Printf("[WARN] StartServices plugin manager failed to start: %s", err)
return res.SetError(err)
return nil, nil, err
}
// set status to service started as started plugin manager
res.Status = ServiceStarted
}
return res
client, err := pluginmanager.NewPluginManagerClient(state)
if err != nil {
return nil, state, err
}
return client, state, nil
}
func postServiceStart(ctx context.Context, res *StartResult) error {
@@ -147,20 +162,12 @@ func postServiceStart(ctx context.Context, res *StartResult) error {
}
defer conn.Close(ctx)
statushooks.SetStatus(ctx, "Dropping legacy schema")
if err := dropLegacyInternalSchema(ctx, conn); err != nil {
// do not fail
// worst case scenario is that we have a couple of extra schema
// these won't be in the search path anyway
log.Println("[INFO] failed to drop legacy 'internal' schema", err)
}
// setup internal schema
// this includes setting the state of all connections in the connection_state table to pending
statushooks.SetStatus(ctx, "Setting up internal schema")
if err := setupInternal(ctx, conn); err != nil {
return err
}
statushooks.SetStatus(ctx, "Initialize steampipe_connection_state table")
// ensure connection state table contains entries for all connections in connection config
// (this is to allow for the race condition between polling connection state and calling refresh connections,
// which does not update the connection_state with added connections until it has built the ConnectionUpdates
@@ -168,6 +175,7 @@ func postServiceStart(ctx context.Context, res *StartResult) error {
return err
}
statushooks.SetStatus(ctx, "Create steampipe_server_settings table")
// create the server settings table
// this table contains configuration that this instance of the service
// is booting with
@@ -189,17 +197,7 @@ func postServiceStart(ctx context.Context, res *StartResult) error {
return sperr.WrapWithMessage(err, "failed to migrate db public schema")
}
// call initial refresh connections
// get plugin manager client
pluginManager, err := pluginmanager.GetPluginManager()
if err != nil {
return err
}
// ask the plugin manager to refresh connections
// this is executed asyncronously by the plugin manager
pluginManager.RefreshConnections(&pb.RefreshConnectionsRequest{})
statushooks.SetStatus(ctx, "Service startup complete")
statushooks.SetStatus(ctx, "Call initial refresh connections")
return nil
}

View File

@@ -165,12 +165,16 @@ func StopServices(ctx context.Context, force bool, invoker constants.Invoker) (s
utils.LogTime("db_local.StopDB end")
}()
log.Println("[INFO] shutting down plugin manager")
// stop the plugin manager
// this means it may be stopped even if we fail to stop the service - that is ok - we will restart it if needed
pluginManagerStopError := pluginmanager.Stop()
log.Println("[INFO] shut down plugin manager")
// stop the DB Service
log.Println("[INFO] stopping DB Service")
stopResult, dbStopError := stopDBService(ctx, force)
log.Println("[INFO] stopped DB Service")
return stopResult, error_helpers.CombineErrors(dbStopError, pluginManagerStopError)
}

View File

@@ -103,7 +103,7 @@ func (c *InteractiveClient) readInitDataStream(ctx context.Context) {
if c.initData.Result.Error != nil {
return
}
statushooks.SetStatus(ctx, "Completing initialization…")
statushooks.SetStatus(ctx, "Load plugin schemas…")
// fetch the schema
// TODO make this async https://github.com/turbot/steampipe/issues/3400
// NOTE: we would like to do this asyncronously, but we are currently limited to a single Db conneciton in our
@@ -125,6 +125,7 @@ func (c *InteractiveClient) readInitDataStream(ctx context.Context) {
}
}
statushooks.SetStatus(ctx, "Start notifications listener…")
log.Printf("[TRACE] Start notifications listener")
// create a cancellation context used to cancel the listen thread when we exit
@@ -132,6 +133,8 @@ func (c *InteractiveClient) readInitDataStream(ctx context.Context) {
//nolint:golint,errcheck // worst case is autocomplete isn't update - not a failure
go c.listenToPgNotifications(listenCtx)
c.cancelNotificationListener = cancel
statushooks.SetStatus(ctx, "Completing initialization…")
}
func (c *InteractiveClient) workspaceWatcherErrorHandler(ctx context.Context, err error) {

View File

@@ -17,12 +17,12 @@ import (
)
// StartNewInstance loads the plugin manager state, stops any previous instance and instantiates a new plugin manager
func StartNewInstance(steampipeExecutablePath string) error {
func StartNewInstance(steampipeExecutablePath string) (*PluginManagerState, error) {
// try to load the plugin manager state
state, err := LoadPluginManagerState()
if err != nil {
log.Printf("[WARN] plugin manager StartNewInstance() - load state failed: %s", err)
return err
return nil, err
}
if state.Running {
@@ -30,7 +30,7 @@ func StartNewInstance(steampipeExecutablePath string) error {
// stop the current instance
if err := stop(state); err != nil {
log.Printf("[WARN] failed to stop previous instance of plugin manager: %s", err)
return err
return nil, err
}
}
return start(steampipeExecutablePath)
@@ -40,7 +40,7 @@ func StartNewInstance(steampipeExecutablePath string) error {
// we need to be provided with the exe path as we have no way of knowing where the steampipe exe it
// when the plugin mananager is first started by steampipe, we derive the exe path from the running process and
// store it in the plugin manager state file - then if the fdw needs to start the plugin manager it knows how to
func start(steampipeExecutablePath string) error {
func start(steampipeExecutablePath string) (*PluginManagerState, error) {
// note: we assume the install dir has been assigned to file_paths.SteampipeDir
// - this is done both by the FDW and Steampipe
pluginManagerCmd := exec.Command(steampipeExecutablePath,
@@ -52,7 +52,7 @@ func start(steampipeExecutablePath string) error {
}
// discard logging from the plugin manager client (plugin manager logs will still flow through to the log file
// as this is set up in the pluginb manager)
// as this is set up in the plugin manager)
logger := logging.NewLogger(&hclog.LoggerOptions{Name: "plugin", Output: io.Discard})
// launch the plugin manager the plugin process
@@ -66,7 +66,7 @@ func start(steampipeExecutablePath string) error {
if _, err := client.Start(); err != nil {
log.Printf("[WARN] plugin manager start() failed to start GRPC client for plugin manager: %s", err)
return err
return nil, err
}
// create a plugin manager state.
@@ -75,7 +75,10 @@ func start(steampipeExecutablePath string) error {
log.Printf("[TRACE] start: started plugin manager, pid %d", state.Pid)
// now save the state
return state.Save()
if err := state.Save(); err != nil {
return nil, err
}
return state, nil
}
// Stop loads the plugin manager state and if a running instance is found, stop it
@@ -136,22 +139,23 @@ func getPluginManager(startIfNeeded bool) (pluginshared.PluginManager, error) {
if state.Executable == "" {
return nil, fmt.Errorf("plugin manager is not running and there is no state file")
}
// if the plugin manager is not running, it must have crashed/terminated
if !state.Running {
log.Printf("[TRACE] GetPluginManager called but plugin manager not running")
// is we are not already recursing, start the plugin manager then recurse back into this function
if startIfNeeded {
log.Printf("[TRACE] calling StartNewInstance()")
// start the plugin manager
if err := start(state.Executable); err != nil {
return nil, err
}
// recurse in, setting startIfNeeded to false to avoid further recursion on failure
return getPluginManager(false)
}
// not retrying - just fail
return nil, fmt.Errorf("plugin manager is not running")
if state.Running {
log.Printf("[TRACE] plugin manager is running - returning client")
return NewPluginManagerClient(state)
}
log.Printf("[TRACE] plugin manager is running - returning client")
return NewPluginManagerClient(state)
// if the plugin manager is not running, it must have crashed/terminated
log.Printf("[TRACE] GetPluginManager called but plugin manager not running")
// is we are not already recursing, start the plugin manager then recurse back into this function
if startIfNeeded {
log.Printf("[TRACE] calling StartNewInstance()")
// start the plugin manager
if _, err := start(state.Executable); err != nil {
return nil, err
}
// recurse in, setting startIfNeeded to false to avoid further recursion on failure
return getPluginManager(false)
}
// not retrying - just fail
return nil, fmt.Errorf("plugin manager is not running")
}

View File

@@ -379,7 +379,7 @@ type SupportedOperations struct {
MultipleConnections bool `protobuf:"varint,2,opt,name=multiple_connections,json=multipleConnections,proto3" json:"multiple_connections,omitempty"`
MessageStream bool `protobuf:"varint,3,opt,name=message_stream,json=messageStream,proto3" json:"message_stream,omitempty"`
SetCacheOptions bool `protobuf:"varint,4,opt,name=set_cache_options,json=setCacheOptions,proto3" json:"set_cache_options,omitempty"`
SetRateLimiters bool `protobuf:"varint,5,opt,name=set_rate_limiters,json=setRateLimiters,proto3" json:"set_rate_limiters,omitempty"`
RateLimiters bool `protobuf:"varint,5,opt,name=rate_limiters,json=rateLimiters,proto3" json:"rate_limiters,omitempty"`
}
func (x *SupportedOperations) Reset() {
@@ -442,9 +442,9 @@ func (x *SupportedOperations) GetSetCacheOptions() bool {
return false
}
func (x *SupportedOperations) GetSetRateLimiters() bool {
func (x *SupportedOperations) GetRateLimiters() bool {
if x != nil {
return x.SetRateLimiters
return x.RateLimiters
}
return false
}
@@ -554,7 +554,7 @@ var file_plugin_manager_proto_rawDesc = []byte{
0x73, 0x12, 0x20, 0x0a, 0x0b, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73,
0x18, 0x06, 0x20, 0x03, 0x28, 0x09, 0x52, 0x0b, 0x63, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69,
0x6f, 0x6e, 0x73, 0x12, 0x16, 0x0a, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x18, 0x07, 0x20,
0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x22, 0xe8, 0x01, 0x0a, 0x13,
0x01, 0x28, 0x09, 0x52, 0x06, 0x70, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x22, 0xe1, 0x01, 0x0a, 0x13,
0x53, 0x75, 0x70, 0x70, 0x6f, 0x72, 0x74, 0x65, 0x64, 0x4f, 0x70, 0x65, 0x72, 0x61, 0x74, 0x69,
0x6f, 0x6e, 0x73, 0x12, 0x1f, 0x0a, 0x0b, 0x71, 0x75, 0x65, 0x72, 0x79, 0x5f, 0x63, 0x61, 0x63,
0x68, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0a, 0x71, 0x75, 0x65, 0x72, 0x79, 0x43,
@@ -566,29 +566,28 @@ var file_plugin_manager_proto_rawDesc = []byte{
0x0d, 0x6d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x53, 0x74, 0x72, 0x65, 0x61, 0x6d, 0x12, 0x2a,
0x0a, 0x11, 0x73, 0x65, 0x74, 0x5f, 0x63, 0x61, 0x63, 0x68, 0x65, 0x5f, 0x6f, 0x70, 0x74, 0x69,
0x6f, 0x6e, 0x73, 0x18, 0x04, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x73, 0x65, 0x74, 0x43, 0x61,
0x63, 0x68, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x2a, 0x0a, 0x11, 0x73, 0x65,
0x74, 0x5f, 0x72, 0x61, 0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x73, 0x18,
0x05, 0x20, 0x01, 0x28, 0x08, 0x52, 0x0f, 0x73, 0x65, 0x74, 0x52, 0x61, 0x74, 0x65, 0x4c, 0x69,
0x6d, 0x69, 0x74, 0x65, 0x72, 0x73, 0x22, 0x3d, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x41, 0x64, 0x64,
0x72, 0x12, 0x18, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x01, 0x20, 0x01,
0x28, 0x09, 0x52, 0x07, 0x4e, 0x65, 0x74, 0x77, 0x6f, 0x72, 0x6b, 0x12, 0x18, 0x0a, 0x07, 0x41,
0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x41, 0x64,
0x64, 0x72, 0x65, 0x73, 0x73, 0x32, 0xdb, 0x01, 0x0a, 0x0d, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e,
0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72, 0x12, 0x2e, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x11,
0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x12, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73,
0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x5b, 0x0a, 0x12, 0x52, 0x65, 0x66, 0x72, 0x65,
0x73, 0x68, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x20, 0x2e,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x43, 0x6f, 0x6e,
0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a,
0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x43,
0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e,
0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a, 0x08, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e,
0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77,
0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73,
0x65, 0x22, 0x00, 0x42, 0x09, 0x5a, 0x07, 0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06,
0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
0x63, 0x68, 0x65, 0x4f, 0x70, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x23, 0x0a, 0x0d, 0x72, 0x61,
0x74, 0x65, 0x5f, 0x6c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x73, 0x18, 0x05, 0x20, 0x01, 0x28,
0x08, 0x52, 0x0c, 0x72, 0x61, 0x74, 0x65, 0x4c, 0x69, 0x6d, 0x69, 0x74, 0x65, 0x72, 0x73, 0x22,
0x3d, 0x0a, 0x07, 0x4e, 0x65, 0x74, 0x41, 0x64, 0x64, 0x72, 0x12, 0x18, 0x0a, 0x07, 0x4e, 0x65,
0x74, 0x77, 0x6f, 0x72, 0x6b, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x4e, 0x65, 0x74,
0x77, 0x6f, 0x72, 0x6b, 0x12, 0x18, 0x0a, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x18,
0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x07, 0x41, 0x64, 0x64, 0x72, 0x65, 0x73, 0x73, 0x32, 0xdb,
0x01, 0x0a, 0x0d, 0x50, 0x6c, 0x75, 0x67, 0x69, 0x6e, 0x4d, 0x61, 0x6e, 0x61, 0x67, 0x65, 0x72,
0x12, 0x2e, 0x0a, 0x03, 0x47, 0x65, 0x74, 0x12, 0x11, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e,
0x47, 0x65, 0x74, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x12, 0x2e, 0x70, 0x72, 0x6f,
0x74, 0x6f, 0x2e, 0x47, 0x65, 0x74, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00,
0x12, 0x5b, 0x0a, 0x12, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x43, 0x6f, 0x6e, 0x6e, 0x65,
0x63, 0x74, 0x69, 0x6f, 0x6e, 0x73, 0x12, 0x20, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x52,
0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69, 0x6f, 0x6e,
0x73, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x1a, 0x21, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f,
0x2e, 0x52, 0x65, 0x66, 0x72, 0x65, 0x73, 0x68, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x69,
0x6f, 0x6e, 0x73, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x12, 0x3d, 0x0a,
0x08, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x12, 0x16, 0x2e, 0x70, 0x72, 0x6f, 0x74,
0x6f, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f, 0x77, 0x6e, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73,
0x74, 0x1a, 0x17, 0x2e, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x2e, 0x53, 0x68, 0x75, 0x74, 0x64, 0x6f,
0x77, 0x6e, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x22, 0x00, 0x42, 0x09, 0x5a, 0x07,
0x2e, 0x3b, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33,
}
var (

View File

@@ -45,7 +45,7 @@ message SupportedOperations {
bool multiple_connections = 2;
bool message_stream = 3;
bool set_cache_options = 4;
bool set_rate_limiters = 5;
bool rate_limiters = 5;
}
message NetAddr {

View File

@@ -10,6 +10,6 @@ func SupportedOperationsFromSdk(s *sdkproto.GetSupportedOperationsResponse) *Sup
MultipleConnections: s.MultipleConnections,
MessageStream: s.MessageStream,
SetCacheOptions: s.SetCacheOptions,
SetRateLimiters: s.SetRateLimiters,
RateLimiters: s.RateLimiters,
}
}

View File

@@ -14,6 +14,7 @@ import (
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/sethvargo/go-retry"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
@@ -29,19 +30,9 @@ import (
pb "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/proto"
pluginshared "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/shared"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
)
type runningPlugin struct {
pluginName string
client *plugin.Client
reattach *pb.ReattachConfig
initialized chan struct{}
failed chan struct{}
error error
}
// PluginManager is the implementation of grpc.PluginManager
type PluginManager struct {
pb.UnimplementedPluginManagerServer
@@ -68,40 +59,54 @@ type PluginManager struct {
logger hclog.Logger
messageServer *PluginMessageServer
// map of rater limiters, keyed by name
// todo instead have map of limiters per plugin???
limiters connection.LimiterMap
// map of user configured rate limiter maps, keyed by plugin short name
// NOTE: this is populated from config
userLimiters connection.PluginLimiterMap
// map of plugin configured rate limiter maps, keyed by plugin short name
// NOTE: if this is nil, that means the steampipe_rate_limiter tables has not been populalated yet -
// the first time we refresh connections we must load all plugins and fetch their rate limiter defs
pluginLimiters connection.PluginLimiterMap
// map of plugin short name to long name
pluginNameMap map[string]string
pluginShortToLongNameMap map[string]string
pluginLongToShortNameMap map[string]string
pool *pgxpool.Pool
}
func NewPluginManager(ctx context.Context, connectionConfig map[string]*sdkproto.ConnectionConfig, limiters map[string]*modconfig.RateLimiter, logger hclog.Logger) (*PluginManager, error) {
func NewPluginManager(ctx context.Context, connectionConfig map[string]*sdkproto.ConnectionConfig, userLimiters connection.LimiterMap, logger hclog.Logger) (*PluginManager, error) {
log.Printf("[INFO] NewPluginManager")
pluginManager := &PluginManager{
logger: logger,
runningPluginMap: make(map[string]*runningPlugin),
connectionConfigMap: connectionConfig,
limiters: limiters,
pluginNameMap: make(map[string]string),
userLimiters: userLimiters.ToPluginMap(),
pluginShortToLongNameMap: make(map[string]string),
pluginLongToShortNameMap: make(map[string]string),
}
messageServer, err := NewPluginMessageServer(pluginManager)
if err != nil {
return nil, err
}
pluginManager.messageServer = messageServer
pluginManager.messageServer = &PluginMessageServer{pluginManager: pluginManager}
// create and populate the rate limiter table
if err := pluginManager.refreshRateLimiterTable(ctx); err != nil {
// TODO better handle plugin manager startup failures
log.Println("[WARN] could not refresh rate limiter table", err)
return nil, err
}
//time.Sleep(10 * time.Second)
// populate plugin connection config map
pluginManager.populatePluginConnectionConfigs()
// determine cache size for each plugin
pluginManager.setPluginCacheSizeMap()
// create a connection pool to connection refresh
// in testing, a size of 20 seemed optimal
poolsize := 20
pool, err := db_local.CreateConnectionPool(ctx, &db_local.CreateDbOptions{Username: constants.DatabaseSuperUser}, poolsize)
if err != nil {
return nil, err
}
pluginManager.pool = pool
if err := pluginManager.initialiseRateLimiterDefs(ctx); err != nil {
return nil, err
}
return pluginManager, nil
}
@@ -128,9 +133,8 @@ func (m *PluginManager) Get(req *pb.GetRequest) (*pb.GetResponse, error) {
ReattachMap: make(map[string]*pb.ReattachConfig),
FailureMap: make(map[string]string),
}
// TODO validate we have config for this plugin
// build map of plugins to start, and also a lookup of required connecitons
// build a map of plugins to connection config for requested connections, and a lookup of the requested connections
plugins, requestedConnectionsLookup, err := m.buildRequiredPluginMap(req)
if err != nil {
return resp, err
@@ -160,8 +164,8 @@ func (m *PluginManager) Get(req *pb.GetRequest) (*pb.GetResponse, error) {
return resp, nil
}
// build a map of plugins to connection config for requested connections, and a lookup of the requested connections
func (m *PluginManager) buildRequiredPluginMap(req *pb.GetRequest) (map[string][]*sdkproto.ConnectionConfig, map[string]struct{}, error) {
// build a map of plugins required
var plugins = make(map[string][]*sdkproto.ConnectionConfig)
// also make a map of target connections - used when assigning resuts to the response
var requestedConnectionsLookup = make(map[string]struct{}, len(req.Connections))
@@ -184,7 +188,10 @@ func (m *PluginManager) buildRequiredPluginMap(req *pb.GetRequest) (map[string][
return plugins, requestedConnectionsLookup, nil
}
// Refresh connections asyncronously
func (m *PluginManager) Pool() *pgxpool.Pool {
return m.pool
}
func (m *PluginManager) RefreshConnections(*pb.RefreshConnectionsRequest) (*pb.RefreshConnectionsResponse, error) {
resp := &pb.RefreshConnectionsResponse{}
go m.doRefresh()
@@ -192,7 +199,7 @@ func (m *PluginManager) RefreshConnections(*pb.RefreshConnectionsRequest) (*pb.R
}
func (m *PluginManager) doRefresh() {
refreshResult := connection.RefreshConnections(context.Background())
refreshResult := connection.RefreshConnections(context.Background(), m)
if refreshResult.Error != nil {
// TODO send errors and warnings back to CLI from plugin manager - https://github.com/turbot/steampipe/issues/3603
log.Printf("[WARN] RefreshConnections failed with error: %s", refreshResult.Error.Error())
@@ -211,12 +218,17 @@ func (m *PluginManager) OnConnectionConfigChanged(configMap connection.Connectio
if err != nil {
log.Printf("[WARN] handleConnectionConfigChanges failed: %s", err.Error())
}
err = m.handleLimiterChanges(limiters)
err = m.handleUserLimiterChanges(limiters)
if err != nil {
log.Printf("[WARN] handleLimiterChanges failed: %s", err.Error())
log.Printf("[WARN] handleUserLimiterChanges failed: %s", err.Error())
}
}
func (m *PluginManager) GetConnectionConfig() connection.ConnectionConfigMap {
return m.connectionConfigMap
}
func (m *PluginManager) Shutdown(*pb.ShutdownRequest) (resp *pb.ShutdownResponse, err error) {
log.Printf("[INFO] PluginManager Shutdown")
defer log.Printf("[INFO] PluginManager Shutdown complete")
@@ -241,6 +253,13 @@ func (m *PluginManager) Shutdown(*pb.ShutdownRequest) (resp *pb.ShutdownResponse
m.killPlugin(p)
}
log.Printf("[INFO] PluginManager closing pool")
// close our pool
// log.Println("[INFO] PluginManager pool stats:", m.pool.Stat().TotalConns(), m.pool.Stat().IdleConns())
// m.pool.Close()
// log.Printf("[INFO] PluginManager pool closed")
return &pb.ShutdownResponse{}, nil
}
@@ -254,53 +273,6 @@ func (m *PluginManager) killPlugin(p *runningPlugin) {
p.client.Kill()
}
func (m *PluginManager) handleLimiterChanges(newLimiters connection.LimiterMap) error {
pluginsWithChangedLimiters := m.limiters.GetPluginsWithChangedLimiters(newLimiters)
if len(pluginsWithChangedLimiters) == 0 {
return nil
}
// update stored limiters to the new map
m.limiters = newLimiters
// update the rate_limiters table
if err := m.refreshRateLimiterTable(context.Background()); err != nil {
log.Println("[WARN] could not refresh rate limiter table", err)
}
// now update the plugins
for p := range pluginsWithChangedLimiters {
// get running plugin for this plugin
// if plugin is not running we have nothing to do
longName, ok := m.pluginNameMap[p]
if !ok {
log.Printf("[INFO] handleLimiterChanges: plugin %s is not currently running - ignoring", p)
continue
}
runningPlugin, ok := m.runningPluginMap[longName]
if !ok {
log.Printf("[INFO] handleLimiterChanges: plugin %s is not currently running - ignoring", p)
continue
}
if !runningPlugin.reattach.SupportedOperations.SetRateLimiters {
log.Printf("[INFO] handleLimiterChanges: plugin %s does not support setting rate limit - ignoring", p)
continue
}
pluginClient, err := sdkgrpc.NewPluginClient(runningPlugin.client, longName)
if err != nil {
return sperr.WrapWithMessage(err, "failed to create a plugin client when updating the rate limiter for plugin '%s'", longName)
}
if err := m.setRateLimiters(p, pluginClient); err != nil {
return sperr.WrapWithMessage(err, "failed to update rate limiters for plugin '%s'", longName)
}
}
return nil
}
func (m *PluginManager) ensurePlugin(pluginName string, connectionConfigs []*sdkproto.ConnectionConfig, req *pb.GetRequest) (reattach *pb.ReattachConfig, err error) {
/* call startPluginIfNeeded within a retry block
we will retry if:
@@ -517,9 +489,6 @@ func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.Connectio
pluginName := exemplarConnectionConfig.Plugin
pluginShortName := exemplarConnectionConfig.PluginShortName
// also store name mapping
m.pluginNameMap[pluginShortName] = pluginName
log.Printf("[INFO] initializePlugin %s pid %d (%p)", pluginName, client.ReattachConfig().Pid, req)
// build a client
@@ -564,7 +533,7 @@ func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.Connectio
}
// if this plugin supports setting cache options, do so
if supportedOperations.SetRateLimiters {
if supportedOperations.RateLimiters {
err = m.setRateLimiters(pluginShortName, pluginClient)
if err != nil {
log.Printf("[WARN] failed to set rate limiters for %s: %s", pluginName, err.Error())
@@ -601,6 +570,9 @@ func (m *PluginManager) populatePluginConnectionConfigs() {
m.pluginConnectionConfigMap = make(map[string][]*sdkproto.ConnectionConfig)
for _, config := range m.connectionConfigMap {
m.pluginConnectionConfigMap[config.Plugin] = append(m.pluginConnectionConfigMap[config.Plugin], config)
// populate plugin name map
m.pluginShortToLongNameMap[config.PluginShortName] = config.Plugin
m.pluginLongToShortNameMap[config.Plugin] = config.PluginShortName
}
}
@@ -735,11 +707,8 @@ func (m *PluginManager) setRateLimiters(pluginName string, pluginClient *sdkgrpc
log.Printf("[INFO] setRateLimiters for plugin '%s'", pluginName)
var defs []*sdkproto.RateLimiterDefinition
for _, l := range m.limiters {
// only add limiters for this plugin
if l.Plugin == pluginName {
defs = append(defs, l.AsProto())
}
for _, l := range m.userLimiters[pluginName] {
defs = append(defs, l.AsProto())
}
req := &sdkproto.SetRateLimitersRequest{Definitions: defs}
@@ -753,7 +722,7 @@ func (m *PluginManager) setRateLimiters(pluginName string, pluginClient *sdkgrpc
func (m *PluginManager) updateConnectionSchema(ctx context.Context, connectionName string) {
log.Printf("[TRACE] updateConnectionSchema connection %s", connectionName)
refreshResult := connection.RefreshConnections(ctx, connectionName)
refreshResult := connection.RefreshConnections(ctx, m, connectionName)
if refreshResult.Error != nil {
log.Printf("[TRACE] error refreshing connections: %s", refreshResult.Error)
return
@@ -793,12 +762,21 @@ func (m *PluginManager) handleStartFailure(err error) error {
}
// if this was a panic during startup, reraise an error with the panic string
if strings.Contains(pluginMessage, sdkplugin.StartupPanicMessage) {
if strings.Contains(pluginMessage, sdkplugin.PluginStartupFailureMessage) {
return fmt.Errorf(pluginMessage)
}
return err
}
// getPluginExemplarConnections returns a map of keyed by plugin short name with the value an exemplar connection
func (m *PluginManager) getPluginExemplarConnections() map[string]string {
res := make(map[string]string)
for _, c := range m.connectionConfigMap {
res[c.PluginShortName] = c.Connection
}
return res
}
func nonAggregatorConnectionCount(connections []*sdkproto.ConnectionConfig) int {
res := 0
for _, c := range connections {

View File

@@ -0,0 +1,345 @@
package pluginmanager_service
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/pkg/connection"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
pb "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/proto"
"github.com/turbot/steampipe/pkg/rate_limiters"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"golang.org/x/exp/maps"
"log"
)
func (m *PluginManager) ShouldFetchRateLimiterDefs() bool {
return m.pluginLimiters == nil
}
// HandlePluginLimiterChanges responds to changes in the plugin rate limiter defintions
// update the stored limiters, refrresh the rate limiter table and call `setRateLimiters`
// for all plugins with changed limiters
func (m *PluginManager) HandlePluginLimiterChanges(newLimiters connection.PluginLimiterMap) error {
if m.pluginLimiters == nil {
// this must be the first time we have poplkated them
m.pluginLimiters = make(connection.PluginLimiterMap)
}
for plugin, limitersForPlugin := range newLimiters {
m.pluginLimiters[plugin] = limitersForPlugin
}
// update the rate_limiters table
if err := m.refreshRateLimiterTable(context.Background()); err != nil {
log.Println("[WARN] could not refresh rate limiter table", err)
}
return nil
}
func (m *PluginManager) refreshRateLimiterTable(ctx context.Context) error {
// if we have not yet populated the rate limiter table, do nothing
if m.pluginLimiters == nil {
return nil
}
// update the status of the plugin rate limiters (determine which are overriden and set state accordingly)
m.updateRateLimiterStatus()
queries := []db_common.QueryWithArgs{
rate_limiters.DropRateLimiterTable(),
rate_limiters.CreateRateLimiterTable(),
rate_limiters.GrantsOnRateLimiterTable(),
}
for _, limitersForPlugin := range m.pluginLimiters {
for _, l := range limitersForPlugin {
queries = append(queries, rate_limiters.GetPopulateRateLimiterSql(l))
}
}
for _, limitersForPlugin := range m.userLimiters {
for _, l := range limitersForPlugin {
queries = append(queries, rate_limiters.GetPopulateRateLimiterSql(l))
}
}
conn, err := m.pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
_, err = db_local.ExecuteSqlWithArgsInTransaction(ctx, conn.Conn(), queries...)
return err
}
// respond to changes in the HCL rate limiter config
// update the stored limiters, refrresh the rate limiter table and call `setRateLimiters`
// for all plugins with changed limiters
func (m *PluginManager) handleUserLimiterChanges(newLimiters connection.LimiterMap) error {
newLimiterPluginMap := newLimiters.ToPluginMap()
pluginsWithChangedLimiters := m.getPluginsWithChangedLimiters(newLimiterPluginMap)
if len(pluginsWithChangedLimiters) == 0 {
return nil
}
// update stored limiters to the new map
m.userLimiters = newLimiterPluginMap
// update the rate_limiters table
if err := m.refreshRateLimiterTable(context.Background()); err != nil {
log.Println("[WARN] could not refresh rate limiter table", err)
}
// now update the plugins - call setRateLimiters for any plugin witrh updated user limiters
for p := range pluginsWithChangedLimiters {
if err := m.setRateLimitersForPlugin(p); err != nil {
return err
}
}
return nil
}
func (m *PluginManager) setRateLimitersForPlugin(pluginShortName string) error {
// get running plugin for this plugin
// if plugin is not running we have nothing to do
longName, ok := m.pluginShortToLongNameMap[pluginShortName]
if !ok {
log.Printf("[INFO] handleUserLimiterChanges: plugin %s is not currently running - ignoring", pluginShortName)
return nil
}
runningPlugin, ok := m.runningPluginMap[longName]
if !ok {
log.Printf("[INFO] handleUserLimiterChanges: plugin %s is not currently running - ignoring", pluginShortName)
return nil
}
if !runningPlugin.reattach.SupportedOperations.RateLimiters {
log.Printf("[INFO] handleUserLimiterChanges: plugin %s does not support setting rate limit - ignoring", pluginShortName)
return nil
}
pluginClient, err := sdkgrpc.NewPluginClient(runningPlugin.client, longName)
if err != nil {
return sperr.WrapWithMessage(err, "failed to create a plugin client when updating the rate limiter for plugin '%s'", longName)
}
if err := m.setRateLimiters(pluginShortName, pluginClient); err != nil {
return sperr.WrapWithMessage(err, "failed to update rate limiters for plugin '%s'", longName)
}
return nil
}
func (m *PluginManager) getPluginsWithChangedLimiters(newLimiters connection.PluginLimiterMap) map[string]struct{} {
var pluginsWithChangedLimiters = make(map[string]struct{})
for plugin, limitersForPlugin := range m.userLimiters {
newLimitersForPlugin := newLimiters[plugin]
if !limitersForPlugin.Equals(newLimitersForPlugin) {
pluginsWithChangedLimiters[plugin] = struct{}{}
}
}
// look for plugins did not have limiters before
for plugin := range newLimiters {
_, pluginHasLimiters := m.userLimiters[plugin]
if !pluginHasLimiters {
pluginsWithChangedLimiters[plugin] = struct{}{}
}
}
return pluginsWithChangedLimiters
}
func (m *PluginManager) updateRateLimiterStatus() {
// iterate through limiters for each plug
for plugin, pluginDefinedLimiters := range m.pluginLimiters {
// get user limiters for this plugin
userDefinedLimiters := m.getUserDefinedLimitersForPlugin(plugin)
// is there a user override? - if so set status to overriden
for name, pluginLimiter := range pluginDefinedLimiters {
_, isOverriden := userDefinedLimiters[name]
if isOverriden {
pluginLimiter.Status = modconfig.LimiterStatusOverriden
} else {
pluginLimiter.Status = modconfig.LimiterStatusActive
}
}
}
}
func (m *PluginManager) getUserDefinedLimitersForPlugin(plugin string) connection.LimiterMap {
log.Printf("[WARN] plugin %s", plugin)
userDefinedLimiters := m.userLimiters[plugin]
if userDefinedLimiters == nil {
userDefinedLimiters = make(connection.LimiterMap)
}
return userDefinedLimiters
}
func (m *PluginManager) rateLimiterTableExists(ctx context.Context) (bool, error) {
query := fmt.Sprintf(`SELECT EXISTS (
SELECT FROM
pg_tables
WHERE
schemaname = '%s' AND
tablename = '%s'
);`, constants.InternalSchema, constants.RateLimiterDefinitionTable)
row := m.pool.QueryRow(ctx, query)
var exists bool
err := row.Scan(&exists)
if err != nil {
return false, err
}
return exists, nil
}
func (m *PluginManager) initialiseRateLimiterDefs(ctx context.Context) (e error) {
defer func() {
// this function uses reflection to extract and convert values
// we need to be able to recover from panics while using reflection
if r := recover(); r != nil {
e = sperr.ToError(r, sperr.WithMessage("error loading rate limiter definitions"))
}
}()
rateLimiterTableExists, err := m.rateLimiterTableExists(ctx)
if err != nil {
return err
}
if !rateLimiterTableExists {
return m.bootstrapRateLimiterTable(ctx)
}
rateLimiters, err := m.loadRateLimitersFromTable(ctx)
if err != nil {
return err
}
// split the table result into plugin and user limiters
pluginLimiters, previousUserLimiters := m.getUserAndPluginLimitersFromTableResult(rateLimiters)
// store the plugin limiters
m.pluginLimiters = pluginLimiters
if previousUserLimiters.Equals(m.userLimiters) {
return nil
}
// if the user limiter in the table are different from the current user listeners, the config must have changed
// since we last ran - call refreshRateLimiterTable to (re)write the steampipe_rate_limiter table
return m.refreshRateLimiterTable(ctx)
}
func (m *PluginManager) bootstrapRateLimiterTable(ctx context.Context) error {
pluginLimiters, err := m.LoadPluginRateLimiters(m.getPluginExemplarConnections())
if err != nil {
return err
}
m.pluginLimiters = pluginLimiters
// now populate the table
return m.refreshRateLimiterTable(ctx)
}
func (m *PluginManager) loadRateLimitersFromTable(ctx context.Context) ([]*modconfig.RateLimiter, error) {
rows, err := m.pool.Query(ctx, fmt.Sprintf("SELECT * FROM %s.%s", constants.InternalSchema, constants.RateLimiterDefinitionTable))
if err != nil {
return nil, err
}
defer rows.Close()
rateLimiters, err := pgx.CollectRows(rows, pgx.RowToAddrOfStructByName[modconfig.RateLimiter])
if err != nil {
return nil, err
}
return rateLimiters, nil
}
func (m *PluginManager) getUserAndPluginLimitersFromTableResult(rateLimiters []*modconfig.RateLimiter) (connection.PluginLimiterMap, connection.PluginLimiterMap) {
pluginLimiters := make(connection.PluginLimiterMap)
userLimiters := make(connection.PluginLimiterMap)
for _, r := range rateLimiters {
if r.Source == modconfig.LimiterSourcePlugin {
pluginLimitersForPlugin := pluginLimiters[r.Plugin]
if pluginLimitersForPlugin == nil {
pluginLimitersForPlugin = make(connection.LimiterMap)
}
pluginLimitersForPlugin[r.Name] = r
pluginLimiters[r.Plugin] = pluginLimitersForPlugin
} else {
userLimitersForPlugin := userLimiters[r.Plugin]
if userLimitersForPlugin == nil {
userLimitersForPlugin = make(connection.LimiterMap)
}
userLimitersForPlugin[r.Name] = r
userLimiters[r.Plugin] = userLimitersForPlugin
}
}
return pluginLimiters, userLimiters
}
func (m *PluginManager) LoadPluginRateLimiters(pluginConnectionMap map[string]string) (connection.PluginLimiterMap, error) {
// build Get request
req := &pb.GetRequest{
Connections: maps.Values(pluginConnectionMap),
}
resp, err := m.Get(req)
if err != nil {
return nil, err
}
// ok so now we have all necessary plugin reattach configs - fetch the rate limiter defs
var errors []error
var res = make(connection.PluginLimiterMap)
for _, reattach := range resp.ReattachMap {
if !reattach.SupportedOperations.RateLimiters {
continue
}
// attach to the plugin process
pluginClient, err := sdkgrpc.NewPluginClientFromReattach(reattach.Convert(), reattach.Plugin)
if err != nil {
log.Printf("[WARN] failed to attach to plugin '%s' - pid %d: %s",
reattach.Plugin, reattach.Pid, err)
return nil, err
}
rateLimiterResp, err := pluginClient.GetRateLimiters(&proto.GetRateLimitersRequest{})
if err != nil {
return nil, err
}
if rateLimiterResp == nil || rateLimiterResp.Definitions == nil {
continue
}
// populate the plugin name
pluginShortName := m.pluginLongToShortNameMap[reattach.Plugin]
limitersForPlugin := make(connection.LimiterMap)
for _, l := range rateLimiterResp.Definitions {
r, err := modconfig.RateLimiterFromProto(l)
if err != nil {
errors = append(errors, sperr.WrapWithMessage(err, "failed to create rate limiter %s from plugin definition", err))
continue
}
r.Plugin = pluginShortName
// set plugin as source
r.Source = modconfig.LimiterSourcePlugin
// derfaulty status to active
r.Status = modconfig.LimiterStatusActive
// add to map
limitersForPlugin[l.Name] = r
}
// store back
res[pluginShortName] = limitersForPlugin
}
return res, nil
}

View File

@@ -1,32 +0,0 @@
package pluginmanager_service
import (
"context"
"fmt"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
"github.com/turbot/steampipe/pkg/rate_limiters"
)
func (m *PluginManager) refreshRateLimiterTable(ctx context.Context) error {
queries := []db_common.QueryWithArgs{
// TACTICAL - look at startup order
{Query: fmt.Sprintf(`CREATE SCHEMA IF NOT EXISTS %s;`, constants.InternalSchema)},
rate_limiters.DropRateLimiterTable(),
rate_limiters.CreateRateLimiterTable(),
rate_limiters.GrantsOnRateLimiterTable(),
}
for _, limiter := range m.limiters {
queries = append(queries, rate_limiters.GetPopulateRateLimiterSql(limiter))
}
conn, err := db_local.CreateLocalDbConnection(ctx, &db_local.CreateDbOptions{
Username: constants.DatabaseSuperUser,
})
if err != nil {
return err
}
_, err = db_local.ExecuteSqlWithArgsInTransaction(ctx, conn, queries...)
return err
}

View File

@@ -0,0 +1,15 @@
package pluginmanager_service
import (
"github.com/hashicorp/go-plugin"
pb "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/proto"
)
type runningPlugin struct {
pluginName string
client *plugin.Client
reattach *pb.ReattachConfig
initialized chan struct{}
failed chan struct{}
error error
}

View File

@@ -10,8 +10,8 @@ import (
sdkplugin "github.com/turbot/steampipe-plugin-sdk/v5/plugin"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/pluginmanager"
"github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/proto"
pluginshared "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/shared"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/steampipeconfig/options"
"github.com/turbot/steampipe/pkg/utils"
@@ -35,6 +35,7 @@ type ConnectionPlugin struct {
PluginName string
PluginClient *sdkgrpc.PluginClient
SupportedOperations *proto.SupportedOperations
PluginShortName string
}
func (p ConnectionPlugin) addConnection(name string, config string, connectionOptions *options.Connection, connectionType string) {
@@ -68,8 +69,9 @@ func (p ConnectionPlugin) GetSchema(connectionName string) (*sdkproto.Schema, er
return schema, nil
}
func NewConnectionPlugin(pluginName string, pluginClient *sdkgrpc.PluginClient, supportedOperations *proto.SupportedOperations) *ConnectionPlugin {
func NewConnectionPlugin(pluginShortName, pluginName string, pluginClient *sdkgrpc.PluginClient, supportedOperations *proto.SupportedOperations) *ConnectionPlugin {
return &ConnectionPlugin{
PluginShortName: pluginShortName,
PluginName: pluginName,
PluginClient: pluginClient,
SupportedOperations: supportedOperations,
@@ -77,7 +79,8 @@ func NewConnectionPlugin(pluginName string, pluginClient *sdkgrpc.PluginClient,
}
// CreateConnectionPlugins instantiates plugins for specified connections, and fetches schemas
func CreateConnectionPlugins(connectionNamesToCreate []string) (requestedConnectionPluginMap map[string]*ConnectionPlugin, res *RefreshConnectionResult) {
func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectionNamesToCreate []string) (requestedConnectionPluginMap map[string]*ConnectionPlugin, res *RefreshConnectionResult) {
res = &RefreshConnectionResult{}
requestedConnectionPluginMap = make(map[string]*ConnectionPlugin)
if len(connectionNamesToCreate) == 0 {
@@ -97,13 +100,6 @@ func CreateConnectionPlugins(connectionNamesToCreate []string) (requestedConnect
connectionNames[i] = connection.Name
}
// get plugin manager
pluginManager, err := pluginmanager.GetPluginManager()
if err != nil {
res.Error = err
return nil, res
}
// ask the plugin manager for the reattach config for all required plugins
getResponse, err := pluginManager.Get(&proto.GetRequest{Connections: connectionNames})
if err != nil {
@@ -123,9 +119,6 @@ func CreateConnectionPlugins(connectionNamesToCreate []string) (requestedConnect
}
// now create or retrieve a connection plugin for each connection
// NOTE: multiple connections may use the same plugin
// store a map of multi connection plugins, keyed by connection name
multiConnectionPlugins := make(map[string]*ConnectionPlugin)
for _, connection := range connectionsToCreate {
@@ -271,7 +264,7 @@ func createConnectionPlugin(connection *modconfig.Connection, reattach *proto.Re
log.Printf("[TRACE] plugin client created for %s", pluginName)
// now create ConnectionPlugin object return
connectionPlugin := NewConnectionPlugin(pluginName, pluginClient, reattach.SupportedOperations)
connectionPlugin := NewConnectionPlugin(connection.PluginShortName, pluginName, pluginClient, reattach.SupportedOperations)
// if multiple connections are NOT supported, add the config for our one and only connection
if reattach.SupportedOperations == nil || !reattach.SupportedOperations.MultipleConnections {

View File

@@ -54,7 +54,6 @@ func NewConnectionState(remoteSchema string, connection *modconfig.Connection, c
}
func (d *ConnectionState) Equals(other *ConnectionState) bool {
if d.Plugin != other.Plugin {
return false
}
@@ -76,8 +75,7 @@ func (d *ConnectionState) Equals(other *ConnectionState) bool {
return false
}
// allow for sub ms rounding errors when converting from PG
if d.PluginModTime.Sub(other.PluginModTime).Abs() > 1*time.Millisecond {
if d.pluginModTimeChanged(other) {
return false
}
// do not look at connection mod time as the mod time for the desired state is not relevant
@@ -85,6 +83,14 @@ func (d *ConnectionState) Equals(other *ConnectionState) bool {
return true
}
// allow for sub ms rounding errors when converting from PG
func (d *ConnectionState) pluginModTimeChanged(other *ConnectionState) bool {
if d.PluginModTime.Sub(other.PluginModTime).Abs() > 1*time.Millisecond {
return true
}
return false
}
func (d *ConnectionState) CanCloneSchema() bool {
return d.SchemaMode != plugin.SchemaModeDynamic &&
d.GetType() != modconfig.ConnectionTypeAggregator

View File

@@ -9,16 +9,16 @@ import (
"time"
"github.com/jackc/pgx/v5/pgxpool"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"golang.org/x/exp/maps"
"github.com/turbot/go-kit/helpers"
typehelpers "github.com/turbot/go-kit/types"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v5/plugin"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
pluginshared "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/shared"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/utils"
"golang.org/x/exp/maps"
)
type ConnectionUpdates struct {
@@ -31,17 +31,24 @@ type ConnectionUpdates struct {
// the connections which will exist after the update
FinalConnectionState ConnectionStateMap
// connection plugins required to perform the updates, keyed by connection name
ConnectionPlugins map[string]*ConnectionPlugin
ConnectionPlugins map[string]*ConnectionPlugin
CurrentConnectionState ConnectionStateMap
InvalidConnections map[string]*ValidationFailure
// map of plugin to connection for which we must refetch the rate limiter definitions
PluginsWithUpdatedBinary map[string]string
forceUpdateConnectionNames []string
pluginManager pluginshared.PluginManager
}
// NewConnectionUpdates returns updates to be made to the database to sync with connection config
func NewConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpdateConnectionNames ...string) (*ConnectionUpdates, *RefreshConnectionResult) {
updates, res := populateConnectionUpdates(ctx, pool, forceUpdateConnectionNames...)
func NewConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, pluginManager pluginshared.PluginManager, opts ...ConnectionUpdatesOption) (*ConnectionUpdates, *RefreshConnectionResult) {
updates, res := populateConnectionUpdates(ctx, pool, pluginManager, opts...)
if res.Error != nil {
return updates, res
return nil, res
}
// validate the updates
// this will validate all plugins and connection names and remove any updates which use invalid connections
updates.validate()
@@ -49,14 +56,19 @@ func NewConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpdateCo
return updates, res
}
func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpdateConnectionNames ...string) (*ConnectionUpdates, *RefreshConnectionResult) {
func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, pluginManager pluginshared.PluginManager, opts ...ConnectionUpdatesOption) (*ConnectionUpdates, *RefreshConnectionResult) {
var config = &connectionUpdatesConfig{}
for _, opt := range opts {
opt(config)
}
utils.LogTime("NewConnectionUpdates start")
defer utils.LogTime("NewConnectionUpdates end")
log.Printf("[TRACE] NewConnectionUpdates")
conn, err := pool.Acquire(ctx)
if err != nil {
log.Printf("[WARN] failed to acquire conneciton from pool: %s", err.Error())
log.Printf("[WARN] failed to acquire connection from pool: %s", err.Error())
return nil, NewErrorRefreshConnectionResult(err)
}
defer conn.Release()
@@ -89,13 +101,16 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
}
updates := &ConnectionUpdates{
Delete: make(map[string]struct{}),
Disabled: disabled,
Update: ConnectionStateMap{},
MissingComments: ConnectionStateMap{},
MissingPlugins: missingPlugins,
FinalConnectionState: requiredConnectionStateMap,
InvalidConnections: make(map[string]*ValidationFailure),
Delete: make(map[string]struct{}),
Disabled: disabled,
Update: ConnectionStateMap{},
MissingComments: ConnectionStateMap{},
MissingPlugins: missingPlugins,
FinalConnectionState: requiredConnectionStateMap,
InvalidConnections: make(map[string]*ValidationFailure),
PluginsWithUpdatedBinary: make(map[string]string),
forceUpdateConnectionNames: config.ForceUpdateConnectionNames,
pluginManager: pluginManager,
}
log.Printf("[INFO] loaded connection state")
@@ -105,7 +120,7 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
// for any connections with dynamic schema, we need to reload their schema
// instantiate connection plugins for all connections with dynamic schema - this will retrieve their current schema
dynamicSchemaHashMap, connectionsPluginsWithDynamicSchema, err := getSchemaHashesForDynamicSchemas(requiredConnectionStateMap, currentConnectionStateMap)
dynamicSchemaHashMap, connectionsPluginsWithDynamicSchema, err := updates.getSchemaHashesForDynamicSchemas(requiredConnectionStateMap, currentConnectionStateMap)
if err != nil {
log.Printf("[WARN] getSchemaHashesForDynamicSchemas failed: %s", err.Error())
return nil, NewErrorRefreshConnectionResult(err)
@@ -119,15 +134,25 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
log.Printf("[INFO] identify connections to update")
modTime := time.Now()
// connections to create/update
for name, requiredConnectionState := range requiredConnectionStateMap {
// if the connection requires update, add to list
if connectionRequiresUpdate(forceUpdateConnectionNames, name, currentConnectionStateMap, requiredConnectionState) {
updates.Update[name] = requiredConnectionState
res := connectionRequiresUpdate(config.ForceUpdateConnectionNames, name, currentConnectionStateMap, requiredConnectionState)
if res.requiresUpdate {
log.Printf("[INFO] connection %s is out of date or missing. updates: %v", name, maps.Keys(updates.Update))
updates.Update[name] = requiredConnectionState
// set the connection mod time of required connection data to now
requiredConnectionState.ConnectionModTime = modTime
// if the plugin mod time has changed, add this to the map of connections
// we need to refetch the rate limiters for
if res.pluginBinaryChanged {
// store map item of plugin name to connection name (so we only have one entry per plugin)
pluginShortName := GlobalConfig.Connections[requiredConnectionState.ConnectionName].PluginShortName
updates.PluginsWithUpdatedBinary[pluginShortName] = requiredConnectionState.ConnectionName
}
}
}
@@ -170,6 +195,7 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
connectionData, ok := currentConnectionStateMap[name]
// if the connection exists in the state, does the schemas hash match?
if ok && connectionData.SchemaHash != requiredHash {
log.Printf("[INFO] %s dynamic schema hash does not match - update", connectionData.ConnectionName)
updates.Update[name] = connectionData
}
}
@@ -190,42 +216,61 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, forceUpd
// for all updates/deletes, if there any aggregators of the same plugin type, update those as well
updates.populateAggregators()
res.Updates = updates
return updates, res
}
func connectionRequiresUpdate(forceUpdateConnectionNames []string, name string, currentConnectionStateMap ConnectionStateMap, requiredConnectionState *ConnectionState) bool {
// check whether this connection exists in the state
currentConnectionState, schemaExistsInState := currentConnectionStateMap[name]
type connectionRequiresUpdateResult struct {
requiresUpdate bool
pluginBinaryChanged bool
}
func connectionRequiresUpdate(forceUpdateConnectionNames []string, name string, currentConnectionStateMap ConnectionStateMap, requiredConnectionState *ConnectionState) connectionRequiresUpdateResult {
var res = connectionRequiresUpdateResult{}
// if the required plugin is not installed, return false
if typehelpers.SafeString(requiredConnectionState.ConnectionError) == constants.ConnectionErrorPluginNotInstalled {
return false
return res
}
// check whether this connection exists in the state
currentConnectionState, schemaExistsInState := currentConnectionStateMap[name]
// if the connection has been disabled, return false
if requiredConnectionState.Disabled() {
return false
return res
}
// is this is a new connection
if !schemaExistsInState {
return true
res.requiresUpdate = true
return res
}
// determine whethe the plugin mod time has changed
if currentConnectionState.pluginModTimeChanged(requiredConnectionState) {
res.requiresUpdate = true
res.pluginBinaryChanged = true
return res
}
// if the connection has been enabled (i.e. if it was previously DISABLED) , return true
if currentConnectionState.Disabled() {
return true
res.requiresUpdate = true
return res
}
// are we are forcing an update of this connection,
if helpers.StringSliceContains(forceUpdateConnectionNames, name) {
return true
res.requiresUpdate = true
return res
}
// has this connection previously not fully loaded
if currentConnectionState.State == constants.ConnectionStatePendingIncomplete {
return true
res.requiresUpdate = true
return res
}
// update if the connection state is different
return !currentConnectionState.Equals(requiredConnectionState)
res.requiresUpdate = !currentConnectionState.Equals(requiredConnectionState)
return res
}
// update requiredConnections - set the schema hash and schema mode for all elements of FinalConnectionState
@@ -264,9 +309,11 @@ func (u *ConnectionUpdates) populateConnectionPlugins(alreadyCreatedConnectionPl
// - add connections which will be updated or have the comments updated
// - exclude connections already created
// - for any aggregator connections, instantiate the first child connection instead
// - if FetchRateLimitersForAllPlugins, start ALL plugins, using an abitrary exemplar connection if necessary
connectionsToCreate := u.getConnectionsToCreate(alreadyCreatedConnectionPlugins)
// now create them
connectionPlugins, res := CreateConnectionPlugins(connectionsToCreate)
connectionPluginsByConnection, res := CreateConnectionPlugins(u.pluginManager, connectionsToCreate)
// if any plugins failed to load, set those connections to error
for c, reason := range res.FailedConnections {
u.setError(c, reason)
@@ -277,10 +324,11 @@ func (u *ConnectionUpdates) populateConnectionPlugins(alreadyCreatedConnectionPl
}
// add back in the already created plugins
for name, connectionPlugin := range alreadyCreatedConnectionPlugins {
connectionPlugins[name] = connectionPlugin
connectionPluginsByConnection[name] = connectionPlugin
}
// and set our ConnectionPlugins property
u.ConnectionPlugins = connectionPlugins
u.ConnectionPlugins = connectionPluginsByConnection
return res
}
@@ -297,13 +345,16 @@ func (u *ConnectionUpdates) getConnectionsToCreate(alreadyCreatedConnectionPlugi
connectionMap[child.Name] = child
}
}
// NOTE - we may have already created some connection plugins (if they have dynamic schema)
// - remove these from list of plugins to create
for name := range alreadyCreatedConnectionPlugins {
delete(connectionMap, name)
}
return maps.Keys(connectionMap)
connectionsToStart := maps.Keys(connectionMap)
return connectionsToStart
}
func (u *ConnectionUpdates) HasUpdates() bool {
@@ -331,6 +382,7 @@ func (u *ConnectionUpdates) String() string {
}
func (u *ConnectionUpdates) setError(connectionName string, error string) {
log.Printf("[INFO] ConnectionUpdates.setError connection %s: %s", connectionName, error)
failedConnection, ok := u.FinalConnectionState[connectionName]
if !ok {
return
@@ -386,7 +438,8 @@ func (u *ConnectionUpdates) populateAggregators() {
log.Printf("[INFO] found %d %s with aggregators", len(pluginAggregatorMap), utils.Pluralize("plugin", len(pluginAggregatorMap)))
// for all updates/deletes, if there any aggregators of the same plugin type, update those as well
// build a map of all plugins with connecitons being updated/deleted
// build a map of all plugins with connecti
//ons being updated/deleted
modifiedPluginLookup := make(map[string]struct{})
for _, c := range u.Update {
modifiedPluginLookup[c.Plugin] = struct{}{}
@@ -408,7 +461,7 @@ func (u *ConnectionUpdates) populateAggregators() {
}
func getSchemaHashesForDynamicSchemas(requiredConnectionData ConnectionStateMap, connectionState ConnectionStateMap) (map[string]string, map[string]*ConnectionPlugin, error) {
func (u *ConnectionUpdates) getSchemaHashesForDynamicSchemas(requiredConnectionData ConnectionStateMap, connectionState ConnectionStateMap) (map[string]string, map[string]*ConnectionPlugin, error) {
log.Printf("[TRACE] getSchemaHashesForDynamicSchemas")
// for every required connection, check the connection state to determine whether the schema mode is 'dynamic'
// if we have never loaded the connection, there will be no state, so we cannot retrieve this information
@@ -426,7 +479,7 @@ func getSchemaHashesForDynamicSchemas(requiredConnectionData ConnectionStateMap,
}
}
}
connectionsPluginsWithDynamicSchema, res := CreateConnectionPlugins(maps.Keys(connectionsWithDynamicSchema))
connectionsPluginsWithDynamicSchema, res := CreateConnectionPlugins(u.pluginManager, maps.Keys(connectionsWithDynamicSchema))
if res.Error != nil {
return nil, nil, res.Error
}

View File

@@ -0,0 +1,13 @@
package steampipeconfig
type connectionUpdatesConfig struct {
ForceUpdateConnectionNames []string
}
type ConnectionUpdatesOption func(opt *connectionUpdatesConfig)
func WithForceUpdate(connections []string) ConnectionUpdatesOption {
return func(opt *connectionUpdatesConfig) {
opt.ForceUpdateConnectionNames = connections
}
}

View File

@@ -223,7 +223,7 @@ func loadConfig(configFolder string, steampipeConfig *SteampipeConfig, opts *loa
if alreadyThere {
return error_helpers.NewErrorsAndWarning(sperr.New("duplicate limiter name: '%s' in '%s'", limiter.Name, block.TypeRange.Filename))
}
// TODO key by plugin
steampipeConfig.Limiters[limiter.Name] = limiter
case modconfig.BlockTypeConnection:
connection, moreDiags := parse.DecodeConnection(block)

View File

@@ -142,6 +142,7 @@ func loadModDependency(modDependency *versionmap.ResolvedVersionConstraint, pars
// update loaded dependency mods
parseCtx.AddLoadedDependencyMod(dependencyMod)
// TODO IS THIS NEEDED????
if parseCtx.ParentParseCtx != nil {
// add mod resources to parent parse context
parseCtx.ParentParseCtx.AddModResources(dependencyMod)

View File

@@ -1,6 +1,8 @@
package modconfig
import (
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/hclsyntax"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"sort"
"strings"
@@ -14,27 +16,43 @@ const (
)
type RateLimiter struct {
Name string `hcl:"name,label"`
Plugin string `hcl:"plugin"`
BucketSize *int64 `hcl:"bucket_size,optional"`
FillRate *float32 `hcl:"fill_rate,optional"`
MaxConcurrency *int64 `hcl:"max_concurrency,optional"`
Scope []string `hcl:"scope"`
Where *string `hcl:"where,optional"`
Status string
Source string
FileName *string
StartLineNumber *int
EndLineNumber *int
Name string `hcl:"name,optional" db:"name"`
Plugin string `hcl:"plugin" db:"plugin"`
BucketSize *int64 `hcl:"bucket_size,optional" db:"bucket_size"`
FillRate *float32 `hcl:"fill_rate,optional" db:"fill_rate"`
MaxConcurrency *int64 `hcl:"max_concurrency,optional" db:"max_concurrency"`
Scope []string `hcl:"scope,optional" db:"scope"`
Where *string `hcl:"where,optional" db:"where"`
FileName *string `db:"file_name"`
StartLineNumber *int `db:"start_line_number"`
EndLineNumber *int `db:"end_line_number"`
Status string `db:"status"`
Source string `db:"source"`
}
func (l RateLimiter) scopeString() string {
scope := l.Scope
sort.Strings(scope)
return strings.Join(scope, "'")
// RateLimiterFromProto converts the proto format RateLimiterDefinition into a Defintion
func RateLimiterFromProto(p *proto.RateLimiterDefinition) (*RateLimiter, error) {
var res = &RateLimiter{
Name: p.Name,
Scope: p.Scope,
}
if p.FillRate != 0 {
res.FillRate = &p.FillRate
res.BucketSize = &p.BucketSize
}
if p.MaxConcurrency != 0 {
res.MaxConcurrency = &p.MaxConcurrency
}
if p.Where != "" {
res.Where = &p.Where
}
if res.Scope == nil {
res.Scope = []string{}
}
return res, nil
}
func (l RateLimiter) AsProto() *proto.RateLimiterDefinition {
func (l *RateLimiter) AsProto() *proto.RateLimiterDefinition {
res := &proto.RateLimiterDefinition{
Name: l.Name,
Scope: l.Scope,
@@ -51,10 +69,31 @@ func (l RateLimiter) AsProto() *proto.RateLimiterDefinition {
if l.Where != nil {
res.Where = *l.Where
}
return res
}
func (l RateLimiter) Equals(other *RateLimiter) bool {
func (l *RateLimiter) OnDecoded(block *hcl.Block) {
l.FileName = &block.DefRange.Filename
l.StartLineNumber = &block.Body.(*hclsyntax.Body).SrcRange.Start.Line
l.EndLineNumber = &block.Body.(*hclsyntax.Body).SrcRange.End.Line
if l.Scope == nil {
l.Scope = []string{}
}
if l.Name == "" {
l.Name = block.Labels[0]
}
l.Status = LimiterStatusActive
l.Source = LimiterSourceConfig
}
func (l *RateLimiter) scopeString() string {
scope := l.Scope
sort.Strings(scope)
return strings.Join(scope, "'")
}
func (l *RateLimiter) Equals(other *RateLimiter) bool {
return l.Name == other.Name &&
l.BucketSize == other.BucketSize &&
l.FillRate == other.FillRate &&

View File

@@ -3,7 +3,6 @@ package parse
import (
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/gohcl"
"github.com/hashicorp/hcl/v2/hclsyntax"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
)
@@ -12,11 +11,9 @@ func DecodeLimiter(block *hcl.Block) (*modconfig.RateLimiter, hcl.Diagnostics) {
Name: block.Labels[0],
}
diags := gohcl.DecodeBody(block.Body, nil, limiter)
limiter.FileName = &block.DefRange.Filename
limiter.StartLineNumber = &block.Body.(*hclsyntax.Body).SrcRange.Start.Line
limiter.EndLineNumber = &block.Body.(*hclsyntax.Body).SrcRange.End.Line
limiter.Status = modconfig.LimiterStatusActive
limiter.Source = modconfig.LimiterSourceConfig
if !diags.HasErrors() {
limiter.OnDecoded(block)
}
return limiter, diags
}

View File

@@ -140,6 +140,7 @@ func ParseMod(fileData map[string][]byte, pseudoResources []modconfig.MappableRe
// add the mod to the run context
// - this it to ensure all pseudo resources get added and build the eval context with the variables we just added
// - it also adds the top level resources of the any dependency mods
if diags = parseCtx.AddModResources(mod); diags.HasErrors() {
return nil, error_helpers.NewErrorsAndWarning(plugin.DiagsToError("Failed to add mod to run context", diags))
}

View File

@@ -12,7 +12,6 @@ import (
type RefreshConnectionResult struct {
error_helpers.ErrorAndWarnings
UpdatedConnections bool
Updates *ConnectionUpdates
FailedConnections map[string]string
}

View File

@@ -32,7 +32,7 @@ load "$LIB_BATS_SUPPORT/load.bash"
}
@test "custom database name" {
# Set the STEAMPIPE_INITDB_DATABASE_NAME env variable
# Set the STEAMPIPE_INITDB_DATABASE_NAME env variable
export STEAMPIPE_INITDB_DATABASE_NAME="custom_db_name"
target_install_directory=$(mktemp -d)