Files
steampipe/pkg/pluginmanager_service/plugin_manager.go
Nathan Wallace 754c7e6832 Nil pointer dereference in waitForPluginLoad closes #4752 (#4896)
* Add test for #4752: Nil pointer dereference in waitForPluginLoad

This test demonstrates the bug where waitForPluginLoad() panics with a nil
pointer dereference when a plugin fails during startup before the reattach
config is set.

The test creates a runningPlugin with reattach=nil and closes the failed
channel, simulating a plugin that fails in startPluginProcess before
initializePlugin is called.

* Fix #4752: Add nil check for reattach in waitForPluginLoad

Adds a nil pointer check before accessing p.reattach.Pid when a plugin
fails during startup. If the plugin fails before the reattach config is
set (e.g., in startPluginProcess), reattach will be nil and the code
would previously panic when trying to log the PID.

The fix checks if reattach is nil and logs an appropriate message in
each case, preventing the panic while still providing useful debugging
information.
2025-11-16 15:53:59 -05:00

871 lines
31 KiB
Go

package pluginmanager_service
import (
"context"
"fmt"
"log"
"os"
"os/exec"
"strconv"
"strings"
"sync"
"time"
"github.com/hashicorp/go-hclog"
goplugin "github.com/hashicorp/go-plugin"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/sethvargo/go-retry"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
pconstants "github.com/turbot/pipe-fittings/v2/constants"
"github.com/turbot/pipe-fittings/v2/filepaths"
"github.com/turbot/pipe-fittings/v2/plugin"
"github.com/turbot/pipe-fittings/v2/utils"
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
sdkshared "github.com/turbot/steampipe-plugin-sdk/v5/grpc/shared"
sdkplugin "github.com/turbot/steampipe-plugin-sdk/v5/plugin"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/v2/pkg/connection"
"github.com/turbot/steampipe/v2/pkg/constants"
"github.com/turbot/steampipe/v2/pkg/db/db_local"
"github.com/turbot/steampipe/v2/pkg/error_helpers"
"github.com/turbot/steampipe/v2/pkg/pluginmanager_service/grpc"
pb "github.com/turbot/steampipe/v2/pkg/pluginmanager_service/grpc/proto"
pluginshared "github.com/turbot/steampipe/v2/pkg/pluginmanager_service/grpc/shared"
"github.com/turbot/steampipe/v2/pkg/steampipeconfig"
)
// PluginManager is the implementation of grpc.PluginManager.
// It owns the lifecycle of plugin processes (start, initialize, kill) and the
// per-plugin connection/cache/rate-limiter configuration state.
type PluginManager struct {
	pb.UnimplementedPluginManagerServer
	// map of running plugins keyed by plugin instance
	runningPluginMap map[string]*runningPlugin
	// map of connection configs, keyed by plugin instance
	// this is populated at startup and updated when a connection config change is detected
	pluginConnectionConfigMap map[string][]*sdkproto.ConnectionConfig
	// map of connection configs, keyed by connection name
	// this is populated at startup and updated when a connection config change is detected
	connectionConfigMap connection.ConnectionConfigMap
	// map of max cache size (Mb), keyed by plugin instance
	pluginCacheSizeMap map[string]int64
	// mut guards the maps above
	mut sync.RWMutex
	// shutdown synchronization
	// do not start any plugins while shutting down
	shutdownMut  sync.RWMutex
	shuttingDown bool
	// do not shutdown until all plugins have loaded
	startPluginWg sync.WaitGroup
	logger        hclog.Logger
	// messageServer handles messages sent by plugins (e.g. schema updates)
	messageServer *PluginMessageServer
	// map of user configured rate limiter maps, keyed by plugin instance
	// NOTE: this is populated from config
	userLimiters connection.PluginLimiterMap
	// map of plugin configured rate limiter maps (keyed by plugin instance)
	// NOTE: if this is nil, that means the steampipe_rate_limiter tables has not been populated yet -
	// the first time we refresh connections we must load all plugins and fetch their rate limiter defs
	pluginLimiters connection.PluginLimiterMap
	// map of plugin configs (keyed by plugin instance)
	plugins connection.PluginMap
	// database connection pool, used for connection refresh and notifications
	pool *pgxpool.Pool
}
// NewPluginManager constructs a PluginManager: it wires up the message server,
// derives the per-plugin connection-config and cache-size maps, opens a
// database connection pool and loads rate limiter and column definitions.
func NewPluginManager(ctx context.Context, connectionConfig map[string]*sdkproto.ConnectionConfig, pluginConfigs connection.PluginMap, logger hclog.Logger) (*PluginManager, error) {
	log.Printf("[INFO] NewPluginManager")

	m := &PluginManager{
		logger:              logger,
		runningPluginMap:    make(map[string]*runningPlugin),
		connectionConfigMap: connectionConfig,
		userLimiters:        pluginConfigs.ToPluginLimiterMap(),
		plugins:             pluginConfigs,
	}
	m.messageServer = &PluginMessageServer{pluginManager: m}

	// group connection configs by plugin instance
	m.populatePluginConnectionConfigs()
	// determine how much cache each plugin instance may use
	m.setPluginCacheSizeMap()

	// create a connection pool used for connection refresh
	// (in testing, a size of 20 seemed optimal)
	const poolSize = 20
	pool, err := db_local.CreateConnectionPool(ctx, &db_local.CreateDbOptions{Username: constants.DatabaseSuperUser}, poolSize)
	if err != nil {
		return nil, err
	}
	m.pool = pool

	if err := m.initialiseRateLimiterDefs(ctx); err != nil {
		return nil, err
	}
	if err := m.initialisePluginColumns(ctx); err != nil {
		return nil, err
	}
	return m, nil
}
// plugin interface functions

// Serve runs the plugin manager as a go-plugin gRPC server, using this
// PluginManager instance as the implementation.
func (m *PluginManager) Serve() {
	goplugin.Serve(&goplugin.ServeConfig{
		HandshakeConfig: pluginshared.Handshake,
		// use ourselves as the implementation
		Plugins: map[string]goplugin.Plugin{
			pluginshared.PluginName: &pluginshared.PluginManagerPlugin{Impl: m},
		},
		// enable gRPC serving for this plugin...
		GRPCServer: goplugin.DefaultGRPCServer,
	})
}
// Get ensures the plugins required by the requested connections are running
// and returns a reattach config (or failure) for each requested connection.
// Any panic is converted into the returned error.
func (m *PluginManager) Get(req *pb.GetRequest) (_ *pb.GetResponse, err error) {
	defer func() {
		if r := recover(); r != nil {
			err = sperr.ToError(r, sperr.WithMessage("unexpected error encountered"))
		}
	}()

	log.Printf("[TRACE] PluginManager Get %p", req)
	defer log.Printf("[TRACE] PluginManager Get DONE %p", req)

	resp := newGetResponse()

	// map each required plugin instance to its connection configs, and build a
	// lookup of the connections actually requested
	plugins, requestedConnectionsLookup, err := m.buildRequiredPluginMap(req)
	if err != nil {
		return resp.GetResponse, err
	}

	log.Printf("[TRACE] PluginManager Get, connections: '%s'\n", req.Connections)

	// ensure each required plugin concurrently
	var wg sync.WaitGroup
	for instance, configs := range plugins {
		m.ensurePluginAsync(req, resp, instance, configs, requestedConnectionsLookup, &wg)
	}
	wg.Wait()

	log.Printf("[TRACE] PluginManager Get DONE")
	return resp.GetResponse, nil
}
// ensurePluginAsync starts a goroutine (tracked by pluginWg) which ensures the
// given plugin instance is running, then records on the response either a
// reattach config for each requested connection served by the plugin, or a
// failure for the plugin instance.
func (m *PluginManager) ensurePluginAsync(req *pb.GetRequest, resp *getResponse, pluginInstance string, connectionConfigs []*sdkproto.ConnectionConfig, requestedConnectionsLookup map[string]struct{}, pluginWg *sync.WaitGroup) {
	pluginWg.Add(1)
	go func() {
		defer pluginWg.Done()

		// ensure plugin is running
		reattach, err := m.ensurePlugin(pluginInstance, connectionConfigs, req)
		if err != nil {
			log.Printf("[WARN] PluginManager Get failed for %s: %s (%p)", pluginInstance, err.Error(), resp)
			resp.AddFailure(pluginInstance, err.Error())
			return
		}

		log.Printf("[TRACE] PluginManager Get succeeded for %s, pid %d (%p)", pluginInstance, reattach.Pid, resp)
		// connectionConfigs holds ALL connections for the plugin - only copy the
		// reattach config into the response for connections that were requested
		for _, config := range connectionConfigs {
			if _, wasRequested := requestedConnectionsLookup[config.Connection]; wasRequested {
				resp.AddReattach(config.Connection, reattach)
			}
		}
	}()
}
// buildRequiredPluginMap returns the connection configs for every plugin
// instance needed by the request, keyed by plugin instance, together with a
// set of the requested connection names.
func (m *PluginManager) buildRequiredPluginMap(req *pb.GetRequest) (map[string][]*sdkproto.ConnectionConfig, map[string]struct{}, error) {
	plugins := make(map[string][]*sdkproto.ConnectionConfig)
	// set of target connections - used when assigning results to the response
	requestedConnectionsLookup := make(map[string]struct{}, len(req.Connections))

	for _, connectionName := range req.Connections {
		// record this connection as requested
		requestedConnectionsLookup[connectionName] = struct{}{}

		config, err := m.getConnectionConfig(connectionName)
		if err != nil {
			return nil, nil, err
		}

		// the first time we see a plugin instance, store ALL its connection
		// configs (not just the requested ones)
		instance := config.PluginInstance
		if _, seen := plugins[instance]; !seen {
			plugins[instance] = m.pluginConnectionConfigMap[instance]
		}
	}
	return plugins, requestedConnectionsLookup, nil
}
// Pool returns the database connection pool owned by the plugin manager.
func (m *PluginManager) Pool() *pgxpool.Pool {
	return m.pool
}
// RefreshConnections implements the RefreshConnections grpc endpoint: it kicks
// off a connection refresh in the background and returns immediately - the
// refresh outcome is not reported to the caller.
func (m *PluginManager) RefreshConnections(*pb.RefreshConnectionsRequest) (*pb.RefreshConnectionsResponse, error) {
	log.Printf("[INFO] PluginManager RefreshConnections")
	resp := &pb.RefreshConnectionsResponse{}
	// (fixed log-message typo: "asyncronously")
	log.Printf("[INFO] calling RefreshConnections asynchronously")
	go m.doRefresh()
	return resp, nil
}
// doRefresh performs a connection refresh, logging (but not propagating) any
// failure.
func (m *PluginManager) doRefresh() {
	result := connection.RefreshConnections(context.Background(), m)
	if result.Error != nil {
		// NOTE: the RefreshConnectionState will already have sent a notification to the CLI
		log.Printf("[WARN] RefreshConnections failed with error: %s", result.Error.Error())
	}
}
// OnConnectionConfigChanged is the callback function invoked by the connection watcher when the config changed.
// It updates connection, plugin-instance and user rate-limiter state under the
// map lock; failures of the individual handlers are logged but not propagated.
func (m *PluginManager) OnConnectionConfigChanged(ctx context.Context, configMap connection.ConnectionConfigMap, plugins map[string]*plugin.Plugin) {
	m.mut.Lock()
	defer m.mut.Unlock()

	log.Printf("[TRACE] OnConnectionConfigChanged: connections: %s plugin instances: %s", strings.Join(utils.SortedMapKeys(configMap), ","), strings.Join(utils.SortedMapKeys(plugins), ","))

	// update our connection configs
	if err := m.handleConnectionConfigChanges(ctx, configMap); err != nil {
		log.Printf("[WARN] handleConnectionConfigChanges failed: %s", err.Error())
	}
	// update our plugin configs
	if err := m.handlePluginInstanceChanges(ctx, plugins); err != nil {
		log.Printf("[WARN] handlePluginInstanceChanges failed: %s", err.Error())
	}
	// update user-configured rate limiters
	if err := m.handleUserLimiterChanges(ctx, plugins); err != nil {
		log.Printf("[WARN] handleUserLimiterChanges failed: %s", err.Error())
	}
}
// GetConnectionConfig returns the map of connection configs, keyed by
// connection name.
func (m *PluginManager) GetConnectionConfig() connection.ConnectionConfigMap {
	return m.connectionConfigMap
}
// Shutdown implements the Shutdown grpc endpoint: it blocks new plugin
// startups, waits for in-flight startups to complete, closes the db pool and
// kills all running plugins. A panic while killing plugins is converted into
// the returned error.
func (m *PluginManager) Shutdown(*pb.ShutdownRequest) (resp *pb.ShutdownResponse, err error) {
	log.Printf("[INFO] PluginManager Shutdown")
	defer log.Printf("[INFO] PluginManager Shutdown complete")

	// lock shutdownMut before waiting for startPluginWg
	// this enables us to exit from ensurePlugin early if needed
	m.shutdownMut.Lock()
	m.shuttingDown = true
	m.shutdownMut.Unlock()
	m.startPluginWg.Wait()

	// close our pool
	if m.pool != nil {
		log.Printf("[INFO] PluginManager closing pool")
		m.pool.Close()
	}

	m.mut.RLock()
	defer func() {
		m.mut.RUnlock()
		// convert any panic during plugin kill into the returned error
		if r := recover(); r != nil {
			err = helpers.ToError(r)
		}
	}()
	// kill all plugins in runningPluginMap
	for _, p := range m.runningPluginMap {
		log.Printf("[INFO] Kill plugin %s (%p)", p.pluginInstance, p.client)
		m.killPlugin(p)
	}
	return &pb.ShutdownResponse{}, nil
}
// killPlugin kills the plugin process for the given runningPlugin (if a
// client exists).
func (m *PluginManager) killPlugin(p *runningPlugin) {
	log.Println("[DEBUG] PluginManager killPlugin start")
	defer log.Println("[DEBUG] PluginManager killPlugin complete")

	if p.client == nil {
		log.Printf("[WARN] plugin %s has no client - cannot kill client", p.pluginInstance)
		// shouldn't happen but has been observed in error situations
		return
	}
	// reattach may be nil if the plugin never completed initialization
	// (the same failure mode as #4752 in waitForPluginLoad) - guard before
	// dereferencing it for logging
	if p.reattach != nil {
		log.Printf("[INFO] PluginManager killing plugin %s (%v)", p.pluginInstance, p.reattach.Pid)
	} else {
		log.Printf("[INFO] PluginManager killing plugin %s (reattach config not set - pid unknown)", p.pluginInstance)
	}
	p.client.Kill()
}
// ensurePlugin returns the reattach config for the given plugin instance,
// starting the plugin if necessary.
//
// startPluginIfNeeded is called within a retry block; we will retry if:
//   - we enter the plugin startup flow, but discover another process has beaten
//     us to it and is starting the plugin already
//   - plugin initialization fails
//   - there was a runningPlugin entry in our map but the pid did not exist
//     (i.e. we thought the plugin was running, but it was not)
func (m *PluginManager) ensurePlugin(pluginInstance string, connectionConfigs []*sdkproto.ConnectionConfig, req *pb.GetRequest) (reattach *pb.ReattachConfig, err error) {
	backoff := retry.WithMaxRetries(5, retry.NewConstant(10*time.Millisecond))

	// ensure we do not shutdown until this has finished
	m.startPluginWg.Add(1)
	defer func() {
		m.startPluginWg.Done()
		// convert any panic into the returned error
		if r := recover(); r != nil {
			err = helpers.ToError(r)
		}
	}()

	// do not start a plugin while shutting down
	if m.isShuttingDown() {
		return nil, fmt.Errorf("plugin manager is shutting down")
	}

	log.Printf("[TRACE] PluginManager ensurePlugin %s (%p)", pluginInstance, req)

	err = retry.Do(context.Background(), backoff, func(ctx context.Context) error {
		reattach, err = m.startPluginIfNeeded(pluginInstance, connectionConfigs, req)
		return err
	})
	return
}
// startPluginIfNeeded returns the reattach config for the given plugin
// instance. If the plugin is already running (or starting) it waits for it to
// load and reuses it; otherwise it falls through to startPlugin.
func (m *PluginManager) startPluginIfNeeded(pluginInstance string, connectionConfigs []*sdkproto.ConnectionConfig, req *pb.GetRequest) (*pb.ReattachConfig, error) {
	// is this plugin already running or starting?
	// lock access to plugin map
	m.mut.RLock()
	startingPlugin, ok := m.runningPluginMap[pluginInstance]
	m.mut.RUnlock()

	if ok {
		log.Printf("[TRACE] startPluginIfNeeded got running plugin (%p)", req)

		// wait for plugin to process connection config, and verify it is running
		err := m.waitForPluginLoad(startingPlugin, req)
		if err == nil {
			// so plugin has loaded - we are done
			// NOTE: ensure the connections assigned to this plugin are correct
			// (may be out of sync if a connection is being added)
			m.mut.Lock()
			startingPlugin.reattach.UpdateConnections(connectionConfigs)
			m.mut.Unlock()
			log.Printf("[TRACE] waitForPluginLoad succeeded %s (%p)", pluginInstance, req)
			return startingPlugin.reattach, nil
		}
		log.Printf("[TRACE] waitForPluginLoad failed %s (%p)", err.Error(), req)
		// just return the error (it may be retryable - ensurePlugin retries)
		return nil, err
	}

	// so the plugin is NOT loaded or loading
	// fall through to plugin startup
	log.Printf("[INFO] plugin %s NOT started or starting - start now (%p)", pluginInstance, req)
	return m.startPlugin(pluginInstance, connectionConfigs, req)
}
// startPlugin adds a placeholder entry to runningPluginMap, starts the plugin
// process, initializes it (sending connection config etc.) and signals any
// waiters via the runningPlugin's initialized/failed channels.
// On error the placeholder is removed from the map, the failed channel is
// closed and any started process is killed.
func (m *PluginManager) startPlugin(pluginInstance string, connectionConfigs []*sdkproto.ConnectionConfig, req *pb.GetRequest) (_ *pb.ReattachConfig, err error) {
	log.Printf("[DEBUG] startPlugin %s (%p) start", pluginInstance, req)
	defer log.Printf("[DEBUG] startPlugin %s (%p) end", pluginInstance, req)

	// add a new running plugin to runningPluginMap
	// (if someone beat us to it and added a starting plugin before we get the write lock,
	// this will return a retryable error)
	startingPlugin, err := m.addRunningPlugin(pluginInstance)
	if err != nil {
		log.Printf("[INFO] addRunningPlugin returned error %s (%p)", err.Error(), req)
		return nil, err
	}
	log.Printf("[INFO] added running plugin (%p)", req)

	// ensure we clean up the starting plugin in case of error
	defer func() {
		if err != nil {
			m.mut.Lock()
			// delete from map
			delete(m.runningPluginMap, pluginInstance)
			// set error on running plugin
			startingPlugin.error = err
			// close failed chan to signal to anyone waiting for the plugin to startup that it failed
			close(startingPlugin.failed)
			log.Printf("[INFO] startPluginProcess failed: %s (%p)", err.Error(), req)
			// kill the client (if the process got as far as starting)
			if startingPlugin.client != nil {
				log.Printf("[INFO] failed pid: %d (%p)", startingPlugin.client.ReattachConfig().Pid, req)
				startingPlugin.client.Kill()
			}
			m.mut.Unlock()
		}
	}()

	// OK so now proceed with plugin startup
	log.Printf("[INFO] start plugin (%p)", req)

	// now start the process
	client, err := m.startPluginProcess(pluginInstance, connectionConfigs)
	if err != nil {
		// do not retry - no reason to think this will fix itself
		return nil, err
	}
	startingPlugin.client = client

	// set the connection configs and build a ReattachConfig
	reattach, err := m.initializePlugin(connectionConfigs, client, req)
	if err != nil {
		log.Printf("[WARN] initializePlugin failed: %s (%p)", err.Error(), req)
		return nil, err
	}
	startingPlugin.reattach = reattach

	// close initialized chan to advertise that this plugin is ready
	// NOTE: reattach is assigned before this close, so waiters may safely
	// dereference it once initialized is signalled
	close(startingPlugin.initialized)

	log.Printf("[INFO] PluginManager ensurePlugin complete, returning reattach config with PID: %d (%p)", reattach.Pid, req)
	// and return
	return reattach, nil
}
// addRunningPlugin writes a placeholder runningPlugin into runningPluginMap so
// no other thread attempts to start the same plugin instance.
// If another thread has already added an entry, a retryable error is returned
// so the caller waits for that plugin instead.
func (m *PluginManager) addRunningPlugin(pluginInstance string) (*runningPlugin, error) {
	// acquire write lock
	m.mut.Lock()
	defer m.mut.Unlock()

	log.Printf("[TRACE] add running plugin for %s (if someone didn't beat us to it)", pluginInstance)

	// there is a race condition to starting a plugin - check whether someone
	// else has beaten us to it
	if _, alreadyStarting := m.runningPluginMap[pluginInstance]; alreadyStarting {
		log.Printf("[TRACE] re checked map and found a starting plugin - return retryable error so we wait for this plugin")
		// retrying will wait for the loading plugin
		return nil, retry.RetryableError(fmt.Errorf("another client has already started the plugin"))
	}

	// look up the config for this instance
	config := m.plugins[pluginInstance]
	if config == nil {
		// not expected
		return nil, sperr.New("plugin manager has no config for plugin instance %s", pluginInstance)
	}

	// build the placeholder and write it back to the map
	p := &runningPlugin{
		pluginInstance: pluginInstance,
		imageRef:       config.Plugin,
		initialized:    make(chan struct{}),
		failed:         make(chan struct{}),
	}
	m.runningPluginMap[pluginInstance] = p
	log.Printf("[INFO] written running plugin to map")
	return p, nil
}
// startPluginProcess launches the plugin binary for the given plugin instance
// and returns the go-plugin client managing the new process.
func (m *PluginManager) startPluginProcess(pluginInstance string, connectionConfigs []*sdkproto.ConnectionConfig) (*goplugin.Client, error) {
	// retrieve the plugin config - it must be there
	// (if no explicit config was specified, we create a default)
	pluginConfig := m.plugins[pluginInstance]
	if pluginConfig == nil {
		panic(fmt.Sprintf("no plugin config is stored for plugin instance %s", pluginInstance))
	}
	imageRef := pluginConfig.Plugin
	log.Printf("[INFO] ************ start plugin: %s, label: %s ********************\n", imageRef, pluginConfig.Instance)

	// NOTE: pass pluginConfig.Alias as the pluginAlias
	// - this is just used for the error message if we fail to load
	pluginPath, err := filepaths.GetPluginPath(imageRef, pluginConfig.Alias)
	if err != nil {
		return nil, err
	}
	log.Printf("[INFO] ************ plugin path %s ********************\n", pluginPath)

	cmd := exec.Command(pluginPath)
	// apply any configured memory limit to the plugin process environment
	m.setPluginMaxMemory(pluginConfig, cmd)

	startTimeout := time.Duration(viper.GetInt64(pconstants.ArgPluginStartTimeout)) * time.Second
	log.Printf("[TRACE] %s pluginStartTimeoutDuration: %s", pluginPath, startTimeout)

	client := goplugin.NewClient(&goplugin.ClientConfig{
		HandshakeConfig: sdkshared.Handshake,
		Plugins: map[string]goplugin.Plugin{
			imageRef: &sdkshared.WrapperPlugin{},
		},
		Cmd:              cmd,
		AllowedProtocols: []goplugin.Protocol{goplugin.ProtocolGRPC},
		StartTimeout:     startTimeout,
		// pass our logger to the plugin client to ensure plugin logs end up in logfile
		Logger: m.logger,
	})
	if _, err := client.Start(); err != nil {
		// attempt to retrieve error message encoded in the plugin stdout
		return nil, grpc.HandleStartFailure(err)
	}
	return client, nil
}
// setPluginMaxMemory sets GOMEMLIMIT in the plugin command environment, using
// the per-plugin max memory if configured, otherwise falling back to the
// global plugin memory limit (if set). A value of zero leaves no limit set.
func (m *PluginManager) setPluginMaxMemory(pluginConfig *plugin.Plugin, cmd *exec.Cmd) {
	limitBytes := pluginConfig.GetMaxMemoryBytes()
	if limitBytes == 0 && viper.IsSet(pconstants.ArgMemoryMaxMbPlugin) {
		// fall back to the global per-plugin memory limit
		limitBytes = viper.GetInt64(pconstants.ArgMemoryMaxMbPlugin) * 1024 * 1024
	}
	if limitBytes == 0 {
		return
	}
	log.Printf("[INFO] Setting max memory for plugin '%s' to %d Mb", pluginConfig.Instance, limitBytes/(1024*1024))
	// set GOMEMLIMIT for the plugin command env
	// TODO should I check for GOMEMLIMIT or does this just override
	cmd.Env = append(os.Environ(), fmt.Sprintf("GOMEMLIMIT=%d", limitBytes))
}
// initializePlugin sends the connection configs (and cache/rate-limiter
// settings where supported) to a freshly started plugin process and builds
// the ReattachConfig which clients use to connect to it.
func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.ConnectionConfig, client *goplugin.Client, req *pb.GetRequest) (_ *pb.ReattachConfig, err error) {
	// extract connection names
	connectionNames := make([]string, len(connectionConfigs))
	for i, c := range connectionConfigs {
		connectionNames[i] = c.Connection
	}
	// all configs passed in belong to the same plugin instance, so the first
	// element can serve as the exemplar
	exemplarConnectionConfig := connectionConfigs[0]
	pluginName := exemplarConnectionConfig.Plugin
	pluginInstance := exemplarConnectionConfig.PluginInstance

	log.Printf("[INFO] initializePlugin %s pid %d (%p)", pluginName, client.ReattachConfig().Pid, req)

	// build a client
	pluginClient, err := sdkgrpc.NewPluginClient(client, pluginName)
	if err != nil {
		return nil, err
	}

	// fetch the supported operations
	supportedOperations, _ := pluginClient.GetSupportedOperations()
	// ignore errors - just create an empty support structure if needed
	if supportedOperations == nil {
		supportedOperations = &sdkproto.GetSupportedOperationsResponse{}
	}
	// if this plugin does not support multiple connections, we no longer support it
	if !supportedOperations.MultipleConnections {
		return nil, fmt.Errorf("%s", error_helpers.PluginSdkCompatibilityError)
	}

	// provide opportunity to avoid setting connection configs if we are shutting down
	if m.isShuttingDown() {
		log.Printf("[INFO] aborting plugin %s initialization - plugin manager is shutting down", pluginName)
		return nil, fmt.Errorf("plugin manager is shutting down")
	}

	// send the connection config for all connections for this plugin
	// this returns a list of all connections provided by this plugin
	err = m.setAllConnectionConfigs(connectionConfigs, pluginClient, supportedOperations)
	if err != nil {
		log.Printf("[WARN] failed to set connection config for %s: %s", pluginName, err.Error())
		return nil, err
	}

	// if this plugin supports setting cache options, do so
	if supportedOperations.SetCacheOptions {
		err = m.setCacheOptions(pluginClient)
		if err != nil {
			log.Printf("[WARN] failed to set cache options for %s: %s", pluginName, err.Error())
			return nil, err
		}
	}

	// if this plugin supports rate limiters, send the user-configured limiter defs
	if supportedOperations.RateLimiters {
		err = m.setRateLimiters(pluginInstance, pluginClient)
		if err != nil {
			log.Printf("[WARN] failed to set rate limiters for %s: %s", pluginName, err.Error())
			return nil, err
		}
	}

	reattach := pb.NewReattachConfig(pluginName, client.ReattachConfig(), pb.SupportedOperationsFromSdk(supportedOperations), connectionNames)

	// if this plugin has a dynamic schema, add connections to message server
	err = m.notifyNewDynamicSchemas(pluginClient, exemplarConnectionConfig, connectionNames)
	if err != nil {
		return nil, err
	}

	log.Printf("[INFO] initializePlugin complete pid %d", client.ReattachConfig().Pid)
	return reattach, nil
}
// isShuttingDown returns whether Shutdown has been initiated.
func (m *PluginManager) isShuttingDown() bool {
	m.shutdownMut.RLock()
	defer m.shutdownMut.RUnlock()
	return m.shuttingDown
}
// populatePluginConnectionConfigs builds pluginConnectionConfigMap, grouping
// the connection configs by plugin instance.
func (m *PluginManager) populatePluginConnectionConfigs() {
	configsByInstance := make(map[string][]*sdkproto.ConnectionConfig)
	for _, config := range m.connectionConfigMap {
		instance := config.PluginInstance
		configsByInstance[instance] = append(configsByInstance[instance], config)
	}
	m.pluginConnectionConfigMap = configsByInstance
}
// setPluginCacheSizeMap populates pluginCacheSizeMap, apportioning the total
// configured cache size between plugin instances in proportion to their
// non-aggregator connection counts. A size of zero means unlimited.
func (m *PluginManager) setPluginCacheSizeMap() {
	m.pluginCacheSizeMap = make(map[string]int64, len(m.pluginConnectionConfigMap))

	// read the env var setting total cache size
	maxCacheSizeMb, _ := strconv.Atoi(os.Getenv(constants.EnvCacheMaxSize))

	// get total connection count across all plugin instances (excluding aggregators)
	numConnections := m.nonAggregatorConnectionCount()

	log.Printf("[TRACE] PluginManager setPluginCacheSizeMap: %d %s.", numConnections, utils.Pluralize("connection", numConnections))
	log.Printf("[TRACE] Total cache size %dMb", maxCacheSizeMb)

	for pluginInstance, connections := range m.pluginConnectionConfigMap {
		// if no max size is set, leave the size as zero (unlimited)
		var size int64
		if maxCacheSizeMb > 0 {
			// apportion by this instance's share of non-aggregator connections
			numPluginConnections := nonAggregatorConnectionCount(connections)
			size = int64(float64(numPluginConnections) / float64(numConnections) * float64(maxCacheSizeMb))
			// make this at least 1 Mb (as zero means unlimited)
			if size == 0 {
				size = 1
			}
			log.Printf("[INFO] Plugin '%s', %d %s, max cache size %dMb", pluginInstance, numPluginConnections, utils.Pluralize("connection", numPluginConnections), size)
		}
		m.pluginCacheSizeMap[pluginInstance] = size
	}
}
// notifyNewDynamicSchemas fetches the schema of an exemplar connection and, if
// the plugin has a dynamic schema, registers the plugin's connections with the
// message server so schema-change messages are handled.
func (m *PluginManager) notifyNewDynamicSchemas(pluginClient *sdkgrpc.PluginClient, exemplarConnectionConfig *sdkproto.ConnectionConfig, connectionNames []string) error {
	// fetch the schema for the first connection so we know if it is dynamic
	schema, err := pluginClient.GetSchema(exemplarConnectionConfig.Connection)
	if err != nil {
		// NOTE: log the connection name - previously this formatted the whole
		// config struct with %s ("failed to set fetch schema for %s")
		log.Printf("[WARN] failed to fetch schema for connection %s: %s", exemplarConnectionConfig.Connection, err.Error())
		return err
	}
	if schema.Mode == sdkplugin.SchemaModeDynamic {
		// best effort - AddConnection errors are deliberately ignored
		_ = m.messageServer.AddConnection(pluginClient, exemplarConnectionConfig.Plugin, connectionNames...)
	}
	return nil
}
// waitForPluginLoad blocks until the given runningPlugin either initializes,
// fails, or the plugin start timeout elapses, then verifies the plugin
// process is still alive.
// It returns nil if the plugin is loaded and running, a retryable error if
// the process has exited (so the caller restarts it), or the startup error.
func (m *PluginManager) waitForPluginLoad(p *runningPlugin, req *pb.GetRequest) error {
	pluginConfig := m.plugins[p.pluginInstance]
	if pluginConfig == nil {
		// not expected
		return sperr.New("plugin manager has no config for plugin instance %s", p.pluginInstance)
	}
	// resolve the start timeout: per-plugin config takes precedence, falling
	// back to the global plugin-start-timeout setting
	pluginStartTimeoutSecs := pluginConfig.GetStartTimeout()
	if pluginStartTimeoutSecs == 0 {
		if viper.IsSet(pconstants.ArgPluginStartTimeout) {
			pluginStartTimeoutSecs = viper.GetInt64(pconstants.ArgPluginStartTimeout)
		}
	}
	log.Printf("[TRACE] waitForPluginLoad: waiting %d seconds (%p)", pluginStartTimeoutSecs, req)

	// wait for the plugin to be initialized
	select {
	case <-time.After(time.Duration(pluginStartTimeoutSecs) * time.Second):
		log.Printf("[WARN] timed out waiting for %s to startup after %d seconds (%p)", p.pluginInstance, pluginStartTimeoutSecs, req)
		// do not retry
		return fmt.Errorf("timed out waiting for %s to startup after %d seconds (%p)", p.pluginInstance, pluginStartTimeoutSecs, req)
	case <-p.initialized:
		// initialized is only closed after reattach is assigned (see startPlugin),
		// so the dereference below is safe
		log.Printf("[TRACE] plugin initialized: pid %d (%p)", p.reattach.Pid, req)
	case <-p.failed:
		// reattach may be nil if plugin failed before it was set
		// (e.g. in startPluginProcess, before initializePlugin runs - see #4752)
		if p.reattach != nil {
			log.Printf("[TRACE] plugin pid %d failed %s (%p)", p.reattach.Pid, p.error.Error(), req)
		} else {
			log.Printf("[TRACE] plugin %s failed before reattach was set: %s (%p)", p.pluginInstance, p.error.Error(), req)
		}
		// get error from running plugin
		return p.error
	}

	// now double-check the plugins process IS running
	if !p.client.Exited() {
		// so the plugin is good
		log.Printf("[INFO] waitForPluginLoad: %s is now loaded and ready (%p)", p.pluginInstance, req)
		return nil
	}

	// so even though our data structure indicates the plugin is running, the client says the underlying pid has exited
	// - it must have terminated for some reason
	log.Printf("[INFO] waitForPluginLoad: pid %d exists in runningPluginMap but pid has exited (%p)", p.reattach.Pid, req)

	// remove this plugin from the map
	// NOTE: multiple threads may be trying to remove the failed plugin from the map
	// - and then someone will add a new running plugin when the startup is retried
	// So we must check the identity of the map entry before deleting
	m.mut.Lock()
	if r, ok := m.runningPluginMap[p.pluginInstance]; ok {
		// is the running plugin we read from the map the same as our running plugin?
		// if not, it must already have been removed by another thread - do nothing
		if r == p {
			log.Printf("[INFO] delete plugin %s from runningPluginMap (%p)", p.pluginInstance, req)
			delete(m.runningPluginMap, p.pluginInstance)
		}
	}
	m.mut.Unlock()

	// so the pid does not exist
	err := fmt.Errorf("PluginManager found pid %d for plugin '%s' in plugin map but plugin process does not exist (%p)", p.reattach.Pid, p.pluginInstance, req)
	// we need to start the plugin again - make the error retryable
	return retry.RetryableError(err)
}
// setAllConnectionConfigs sends the connection config for every connection of
// this plugin to the plugin in a single call.
// NOTE: we DO NOT set connection config for aggregator connections
func (m *PluginManager) setAllConnectionConfigs(connectionConfigs []*sdkproto.ConnectionConfig, pluginClient *sdkgrpc.PluginClient, supportedOperations *sdkproto.GetSupportedOperationsResponse) error {
	// TODO does this fail all connections if one fails
	pluginInstance := connectionConfigs[0].PluginInstance

	// NOTE: default MaxCacheSizeMb to -1 so that the query cache is not created
	// until we call SetCacheOptions (if supported)
	maxCacheSizeMb := int64(-1)
	// if the plugin does NOT support setting the cache options separately, pass
	// the max size now (if it does, SetCacheOptions will be called after we return)
	if !supportedOperations.SetCacheOptions {
		maxCacheSizeMb = m.pluginCacheSizeMap[pluginInstance]
	}

	req := &sdkproto.SetAllConnectionConfigsRequest{
		Configs:        connectionConfigs,
		MaxCacheSizeMb: maxCacheSizeMb,
	}
	_, err := pluginClient.SetAllConnectionConfigs(req)
	return err
}
// setCacheOptions pushes the service-level cache settings (enabled flag, TTL
// and max size) to the plugin.
func (m *PluginManager) setCacheOptions(pluginClient *sdkgrpc.PluginClient) error {
	opts := &sdkproto.SetCacheOptionsRequest{
		Enabled:   viper.GetBool(pconstants.ArgServiceCacheEnabled),
		Ttl:       viper.GetInt64(pconstants.ArgCacheMaxTtl),
		MaxSizeMb: viper.GetInt64(pconstants.ArgMaxCacheSizeMb),
	}
	_, err := pluginClient.SetCacheOptions(opts)
	return err
}
// setRateLimiters pushes the user-configured rate limiter definitions for the
// given plugin instance to the plugin.
func (m *PluginManager) setRateLimiters(pluginInstance string, pluginClient *sdkgrpc.PluginClient) error {
	log.Printf("[INFO] setRateLimiters for plugin '%s'", pluginInstance)

	// collect the user limiter defs under a read lock
	var defs []*sdkproto.RateLimiterDefinition
	m.mut.RLock()
	for _, limiter := range m.userLimiters[pluginInstance] {
		defs = append(defs, RateLimiterAsProto(limiter))
	}
	m.mut.RUnlock()

	_, err := pluginClient.SetRateLimiters(&sdkproto.SetRateLimitersRequest{Definitions: defs})
	return err
}
// updateConnectionSchema refreshes the given connection and sends a postgres
// schema-update notification.
// called from the message server after receiving a PluginMessageType_SCHEMA_UPDATED message from plugin
func (m *PluginManager) updateConnectionSchema(ctx context.Context, connectionName string) {
	log.Printf("[INFO] updateConnectionSchema connection %s", connectionName)

	if refreshResult := connection.RefreshConnections(ctx, m, connectionName); refreshResult.Error != nil {
		log.Printf("[TRACE] error refreshing connections: %s", refreshResult.Error)
		return
	}

	// also send a postgres notification
	notification := steampipeconfig.NewSchemaUpdateNotification()
	if m.pool == nil {
		log.Printf("[WARN] cannot send schema update notification: pool is nil")
		return
	}
	conn, err := m.pool.Acquire(ctx)
	if err != nil {
		log.Printf("[WARN] failed to send schema update notification: %s", err)
		return
	}
	defer conn.Release()

	if err := db_local.SendPostgresNotification(ctx, conn.Conn(), notification); err != nil {
		log.Printf("[WARN] failed to send schema update notification: %s", err)
	}
}
// nonAggregatorConnectionCount returns the total number of non-aggregator
// connections across all plugin instances.
func (m *PluginManager) nonAggregatorConnectionCount() int {
	total := 0
	for _, connections := range m.pluginConnectionConfigMap {
		total += nonAggregatorConnectionCount(connections)
	}
	return total
}
// getPluginExemplarConnections returns a map keyed by plugin full name whose
// value is an exemplar connection for that plugin.
func (m *PluginManager) getPluginExemplarConnections() map[string]string {
	exemplars := make(map[string]string)
	for _, config := range m.connectionConfigMap {
		exemplars[config.Plugin] = config.Connection
	}
	return exemplars
}
// tableExists reports whether the given table exists in the given schema.
// The query uses positional parameters rather than string interpolation,
// avoiding SQL injection via schema/table names.
func (m *PluginManager) tableExists(ctx context.Context, schema, table string) (bool, error) {
	const query = `SELECT EXISTS (
		SELECT FROM
			pg_tables
		WHERE
			schemaname = $1 AND
			tablename = $2
	);`
	var exists bool
	if err := m.pool.QueryRow(ctx, query, schema, table).Scan(&exists); err != nil {
		return false, err
	}
	return exists, nil
}
// nonAggregatorConnectionCount returns how many of the given connections are
// not aggregators (a connection with child connections is an aggregator).
func nonAggregatorConnectionCount(connections []*sdkproto.ConnectionConfig) int {
	count := 0
	for _, config := range connections {
		if len(config.ChildConnections) == 0 {
			count++
		}
	}
	return count
}