Files
steampipe/pluginmanager/plugin_manager.go

634 lines
21 KiB
Go

package pluginmanager
import (
"fmt"
"github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin"
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v4/grpc"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v4/grpc/proto"
"github.com/turbot/steampipe/pkg/constants"
"log"
"os"
"os/exec"
"runtime/debug"
"strconv"
"strings"
"sync"
"time"
"github.com/turbot/go-kit/helpers"
sdkshared "github.com/turbot/steampipe-plugin-sdk/v4/grpc/shared"
"github.com/turbot/steampipe/pkg/utils"
"github.com/turbot/steampipe/pluginmanager/grpc/proto"
pluginshared "github.com/turbot/steampipe/pluginmanager/grpc/shared"
)
// runningPlugin tracks one plugin process started (or being started) by the PluginManager.
// While the plugin is starting, getPlugin stores a placeholder with only `initialized` set;
// storeClientToMap later fills in client and reattach and then closes `initialized`.
type runningPlugin struct {
	// go-plugin client for the plugin process (used to create a plugin client and to Kill it)
	client *plugin.Client
	// reattach config handed back from Get; carries the plugin pid and the connections it serves
	reattach *proto.ReattachConfig
	// closed by storeClientToMap once client and reattach have been populated
	initialized chan struct{}
}
// PluginManager is the real implementation of grpc.PluginManager.
// It starts plugin processes on demand (see getPlugin) and tracks them both by plugin name and by
// connection name, and forwards connection config changes to running plugins.
type PluginManager struct {
	proto.UnimplementedPluginManagerServer
	// map of running plugins keyed by plugin name
	// (an entry is added by storeClientToMap once the plugin has started)
	pluginMap map[string]*runningPlugin
	// map of running plugins keyed by connection name
	// (may hold placeholder entries for plugins which are still starting)
	connectionPluginMap map[string]*runningPlugin
	// map of connection configs, keyed by plugin name
	pluginConnectionConfigMap map[string][]*sdkproto.ConnectionConfig
	// map of connection configs, keyed by connection name
	connectionConfigMap map[string]*sdkproto.ConnectionConfig
	// map of max cache size in Mb, keyed by plugin name (0 means unlimited)
	pluginCacheSizeMap map[string]int64
	// mut guards the maps above.
	// NOTE(review): not every access in this file takes it - confirm the intended locking discipline.
	mut    sync.Mutex
	logger hclog.Logger
}
// NewPluginManager builds a PluginManager for the given connection configs (keyed by connection
// name). It derives the per-plugin connection config map and then each plugin's max cache size.
// The returned error is currently always nil.
func NewPluginManager(connectionConfig map[string]*sdkproto.ConnectionConfig, logger hclog.Logger) (*PluginManager, error) {
	log.Printf("[TRACE] NewPluginManager")

	m := &PluginManager{
		logger:              logger,
		connectionConfigMap: connectionConfig,
		pluginMap:           make(map[string]*runningPlugin),
		connectionPluginMap: make(map[string]*runningPlugin),
	}

	// pluginConnectionConfigMap is derived from connectionConfigMap ...
	m.populatePluginConnectionConfigs()
	// ... and the cache size split depends on pluginConnectionConfigMap, so this must come second
	m.setPluginCacheSizeMap()

	return m, nil
}
// plugin interface functions

// Serve exposes this PluginManager as a go-plugin gRPC plugin, registering itself as the
// implementation under pluginshared.PluginName, and hands control to plugin.Serve.
func (m *PluginManager) Serve() {
	impl := &pluginshared.PluginManagerPlugin{Impl: m}
	serveConfig := &plugin.ServeConfig{
		HandshakeConfig: pluginshared.Handshake,
		Plugins:         map[string]plugin.Plugin{pluginshared.PluginName: impl},
		// serve over gRPC
		GRPCServer: plugin.DefaultGRPCServer,
	}
	plugin.Serve(serveConfig)
}
// Get returns reattach configs for the requested connections, starting plugins where needed.
// Successful lookups are returned in ReattachMap keyed by connection name. Plugin load failures are
// recorded in FailureMap keyed by plugin name, and later connections for an already-failed plugin
// are skipped. If any requested connection has no loaded config, an error (and no response) is returned.
func (m *PluginManager) Get(req *proto.GetRequest) (*proto.GetResponse, error) {
	resp := &proto.GetResponse{
		ReattachMap: make(map[string]*proto.ReattachConfig),
		FailureMap:  make(map[string]string),
	}
	log.Printf("[TRACE] PluginManager Get, connections: '%s'\n", req.Connections)

	for _, connectionName := range req.Connections {
		config, err := m.getConnectionConfig(connectionName)
		if err != nil {
			return nil, err
		}
		pluginName := config.Plugin

		if _, failed := resp.FailureMap[pluginName]; failed {
			// this plugin already failed to load for an earlier connection - don't retry it
			continue
		}

		reattach, loadErr := m.getPlugin(connectionName)
		if loadErr != nil {
			resp.FailureMap[pluginName] = loadErr.Error()
			continue
		}
		resp.ReattachMap[connectionName] = reattach
	}

	log.Printf("[TRACE] PluginManager Get returning %+v", resp)
	return resp, nil
}
// SetConnectionConfigMap replaces the connection config map (keyed by connection name) while holding
// m.mut, and propagates added, deleted and changed connections to running plugins.
// Any error from that propagation is logged rather than returned.
func (m *PluginManager) SetConnectionConfigMap(configMap map[string]*sdkproto.ConnectionConfig) {
	m.mut.Lock()
	defer m.mut.Unlock()

	connectionNames := make([]string, 0, len(configMap))
	for connectionName := range configMap {
		connectionNames = append(connectionNames, connectionName)
	}
	log.Printf("[TRACE] SetConnectionConfigMap: %s", strings.Join(connectionNames, ","))

	if err := m.handleConnectionConfigChanges(configMap); err != nil {
		log.Printf("[WARN] handleConnectionConfigChanges returned error: %s", err.Error())
	}
}
// handleConnectionConfigChanges diffs configMap (keyed by connection name) against the current
// connection config and applies the result:
//   - running plugins' reattach configs and connectionPluginMap are updated for deleted/added connections
//   - configMap becomes the new connectionConfigMap, and the derived maps are rebuilt
//   - an UpdateConnectionConfigs request is sent to each affected running plugin
//
// It is called by SetConnectionConfigMap with m.mut held. Errors from sending the update requests
// are returned combined.
func (m *PluginManager) handleConnectionConfigChanges(configMap map[string]*sdkproto.ConnectionConfig) error {
	// determine whether there are any new, deleted or changed connections
	addedConnections, deletedConnections, changedConnections := m.getConnectionChanges(configMap)

	// one UpdateConnectionConfigsRequest per plugin, populated by the handlers below
	requestMap := make(map[string]*sdkproto.UpdateConnectionConfigsRequest)
	// for deleted connections, remove from running plugins and connectionPluginMap
	m.handleDeletedConnections(deletedConnections, requestMap)
	// for new connections, add to running plugins and connectionPluginMap
	m.handleAddedConnections(addedConnections, requestMap)
	// for updated connections just add to request map
	m.handleUpdatedConnections(changedConnections, requestMap)

	m.connectionConfigMap = configMap
	// rebuild pluginConnectionConfigMap from the new connection config
	m.populatePluginConnectionConfigs()
	// Recompute the per-plugin cache sizes from the new connection counts. Without this, a plugin
	// which first appears in the new config has no entry in pluginCacheSizeMap and would be started
	// with MaxCacheSizeMb 0 (unlimited), and existing plugins would keep stale shares.
	m.setPluginCacheSizeMap()

	// now send UpdateConnectionConfigs for all updated plugins
	return m.sendUpdateConnectionConfigs(requestMap)
}
// sendUpdateConnectionConfigs delivers each request in requestMap (keyed by plugin name) to the
// matching plugin in pluginMap. Plugins which are not running are skipped. Every failure, whether
// creating the plugin client or making the call, is collected and returned via utils.CombineErrors.
func (m *PluginManager) sendUpdateConnectionConfigs(requestMap map[string]*sdkproto.UpdateConnectionConfigsRequest) error {
	var failures []error
	for pluginName, updateReq := range requestMap {
		running, isRunning := m.pluginMap[pluginName]
		if !isRunning {
			// not expected
			continue
		}

		client, err := sdkgrpc.NewPluginClient(running.client, pluginName)
		if err == nil {
			err = client.UpdateConnectionConfigs(updateReq)
		}
		if err != nil {
			failures = append(failures, err)
		}
	}
	return utils.CombineErrors(failures...)
}
// handleAddedConnections attaches newly added connections (grouped by plugin name) to their plugin,
// but only when that plugin is already running. For each such connection it:
//   - adds the connection to the plugin's reattach config,
//   - maps the connection to the plugin in connectionPluginMap,
//   - appends the config to the Added list of the plugin's request in requestMap (created if absent).
//
// Connections of plugins which are not running are left alone; refreshConnections will start those
// plugins. NOTE: this mutates requestMap.
func (m *PluginManager) handleAddedConnections(addedConnections map[string][]*sdkproto.ConnectionConfig, requestMap map[string]*sdkproto.UpdateConnectionConfigsRequest) {
	for pluginName, connections := range addedConnections {
		running, isRunning := m.pluginMap[pluginName]
		if !isRunning {
			log.Printf("[TRACE] handleAddedConnections - plugin '%s' has been added to connection config and is not running - doing nothing here as it will be started by refreshConnections", pluginName)
			continue
		}

		req, exists := requestMap[pluginName]
		if !exists {
			req = &sdkproto.UpdateConnectionConfigsRequest{}
			requestMap[pluginName] = req
		}

		for _, conn := range connections {
			running.reattach.AddConnection(conn.Connection)
			m.connectionPluginMap[conn.Connection] = running
			req.Added = append(req.Added, conn)
		}
	}
}
// handleDeletedConnections detaches removed connections (grouped by plugin name) from their plugin,
// but only when that plugin is running. For each such connection it:
//   - removes the connection from the plugin's reattach config,
//   - drops its connectionPluginMap entry,
//   - appends the config to the Deleted list of the plugin's request in requestMap (created if absent).
//
// NOTE: this mutates requestMap.
func (m *PluginManager) handleDeletedConnections(deletedConnections map[string][]*sdkproto.ConnectionConfig, requestMap map[string]*sdkproto.UpdateConnectionConfigsRequest) {
	for pluginName, connections := range deletedConnections {
		running, isRunning := m.pluginMap[pluginName]
		if !isRunning {
			continue
		}

		req, exists := requestMap[pluginName]
		if !exists {
			req = &sdkproto.UpdateConnectionConfigsRequest{}
			requestMap[pluginName] = req
		}

		for _, conn := range connections {
			name := conn.Connection
			running.reattach.RemoveConnection(name)
			delete(m.connectionPluginMap, name)
			req.Deleted = append(req.Deleted, conn)
		}
	}
}
// handleUpdatedConnections appends each changed connection config (grouped by plugin name) to the
// Changed list of that plugin's UpdateConnectionConfigsRequest in requestMap, creating the request
// if needed. Requests for plugins which are not running are skipped later by
// sendUpdateConnectionConfigs. NOTE: this mutates requestMap.
func (m *PluginManager) handleUpdatedConnections(updatedConnections map[string][]*sdkproto.ConnectionConfig, requestMap map[string]*sdkproto.UpdateConnectionConfigsRequest) {
	for p, connections := range updatedConnections {
		// get or create req for this plugin
		req, ok := requestMap[p]
		if !ok {
			req = &sdkproto.UpdateConnectionConfigsRequest{}
		}
		// Append to Changed. This previously did `req.Changed = append(req.Added, connection)`, which
		// kept only the last changed connection and copied any added connections into Changed.
		req.Changed = append(req.Changed, connections...)
		// write back to map
		requestMap[p] = req
	}
}
// Shutdown kills the plugin client of every entry in connectionPluginMap while holding m.mut.
// Entries without a client (e.g. placeholders) are logged and skipped. A panic during shutdown is
// recovered in the deferred func and returned as err (with a nil response).
func (m *PluginManager) Shutdown(req *proto.ShutdownRequest) (resp *proto.ShutdownResponse, err error) {
	log.Printf("[TRACE] PluginManager Shutdown")
	// prints the current goroutine's stack to stderr (debugging aid)
	debug.PrintStack()

	m.mut.Lock()
	defer func() {
		m.mut.Unlock()
		if r := recover(); r != nil {
			err = helpers.ToError(r)
		}
	}()

	for connectionName, running := range m.connectionPluginMap {
		client := running.client
		if client == nil {
			// shouldn't happen but has been observed in error situations
			log.Printf("[WARN] plugin %s has no client - cannot kill", connectionName)
			continue
		}
		log.Printf("[TRACE] killing plugin %s (%v)", connectionName, running.reattach.Pid)
		client.Kill()
	}

	return &proto.ShutdownResponse{}, nil
}
// getConnectionConfig returns the loaded config for connectionName, or an error if there is none.
//
// The lookup takes m.mut because SetConnectionConfigMap replaces connectionConfigMap under that
// lock while Get/startPlugin may run concurrently. Callers must therefore NOT already hold m.mut,
// because sync.Mutex is not reentrant. None of the current callers (Get, startPlugin,
// setSingleConnectionConfig) do.
func (m *PluginManager) getConnectionConfig(connectionName string) (*sdkproto.ConnectionConfig, error) {
	m.mut.Lock()
	connectionConfig, ok := m.connectionConfigMap[connectionName]
	m.mut.Unlock()

	if !ok {
		return nil, fmt.Errorf("plugin manager: no connection config loaded for connection '%s'", connectionName)
	}
	return connectionConfig, nil
}
// getPlugin returns the reattach config for the plugin providing connectionName, starting the
// plugin if it is not already running (or being started).
//
// If connectionPluginMap already has an entry for the connection, it waits (waitForPluginLoad) for
// that entry to be initialized. It then returns the entry's reattach config if the plugin pid still
// exists. Otherwise, including when the pid is gone, it inserts a placeholder runningPlugin so that
// concurrent callers wait on it. It then starts the plugin with startPlugin and completes the
// placeholder with storeClientToMap. Any panic is recovered and returned as err.
//
// NOTE(review): known issues, left unchanged here:
//   - m.mut is released while waiting and checking the pid, then re-acquired before replacing the
//     entry. Two callers which both find a dead pid may therefore each insert a placeholder and each
//     start the plugin.
//   - the stale client is not killed, although the logged reason says "killing client".
//   - if startPlugin fails, the placeholder is deleted but its initialized channel is never closed.
//     Callers already waiting on it then time out in waitForPluginLoad instead of seeing the error.
func (m *PluginManager) getPlugin(connectionName string) (_ *proto.ReattachConfig, err error) {
	// convert any panic into a returned error
	defer func() {
		if r := recover(); r != nil {
			err = helpers.ToError(r)
		}
	}()
	log.Printf("[TRACE] PluginManager getPlugin connection '%s'\n", connectionName)
	// reason for starting the plugin (if we need to) - logged just before startup
	var reason string
	// is this plugin already running
	// lock access to plugin map
	m.mut.Lock()
	p, ok := m.connectionPluginMap[connectionName]
	if ok {
		// unlock access to map
		m.mut.Unlock()
		// so we have the plugin in our map - is it started?
		// (p may be a placeholder which another goroutine is still initializing)
		err = m.waitForPluginLoad(connectionName, p)
		if err != nil {
			return nil, err
		}
		log.Printf("[TRACE] connection %s is loaded, check for running PID", connectionName)
		// ok so the plugin should now be running
		// now check if the plugins process IS running
		reattach := p.reattach
		// check the pid exists (an error from PidExists is ignored - only the bool is used)
		exists, _ := utils.PidExists(int(reattach.Pid))
		if exists {
			// so the plugin is good
			log.Printf("[TRACE] PluginManager found '%s' in map, pid %d", connectionName, reattach.Pid)
			// return the reattach config
			return reattach, nil
		}
		// either the pid does not exist or the plugin has exited
		// replace the stale entry with a fresh placeholder so other callers wait for the restart
		// (the delete is redundant given the assignment which follows)
		m.mut.Lock()
		delete(m.connectionPluginMap, connectionName)
		m.connectionPluginMap[connectionName] = &runningPlugin{
			initialized: make(chan struct{}, 1),
		}
		m.mut.Unlock()
		// update reason
		reason = fmt.Sprintf("PluginManager found pid %d for connection '%s' in plugin map but plugin process does not exist - killing client and removing from map", reattach.Pid, connectionName)
	} else {
		// so the plugin is NOT loaded or loading - this is the first time anyone has requested this connection
		// put in a placeholder so no other thread tries to start this plugin
		// (the lock taken above is still held here)
		m.connectionPluginMap[connectionName] = &runningPlugin{
			initialized: make(chan struct{}, 1),
		}
		// unlock access to map
		m.mut.Unlock()
		reason = fmt.Sprintf("PluginManager %p '%s' NOT found in map - starting", m, connectionName)
	}
	// NOTE: It is an error to try to start a plugin which is already running
	// this may happen if the file watcher has been triggered by a connection being added for an existing plugin
	// if this happened, the plugin manager should ALREADY have called UpdateConnectionConfig to send the config
	// for the new connection to the plugin
	// TODO ADD CHECK THAT PLUGIN IS NOT ALREADY RUNNING
	// fall through to plugin startup
	// log the startup reason
	log.Printf("[TRACE] %s", reason)
	// so we need to start the plugin
	client, reattach, err := m.startPlugin(connectionName)
	if err != nil {
		// remove our placeholder so a later request can retry
		m.mut.Lock()
		delete(m.connectionPluginMap, connectionName)
		m.mut.Unlock()
		log.Println("[TRACE] startPlugin failed with", err)
		return nil, err
	}
	// store the client to our map (this also closes the placeholder's initialized channel)
	m.storeClientToMap(connectionName, client, reattach)
	log.Printf("[TRACE] PluginManager getPlugin complete, returning reattach config with PID: %d", reattach.Pid)
	// and return
	return reattach, nil
}
// storeClientToMap completes the placeholder runningPlugin that getPlugin inserted for connection.
// It sets the placeholder's client and reattach and registers it in pluginMap under reattach.Plugin.
// It also maps every connection listed in reattach.Connections to it in connectionPluginMap, then
// closes initialized to release goroutines blocked in waitForPluginLoad. Runs under m.mut.
func (m *PluginManager) storeClientToMap(connection string, client *plugin.Client, reattach *proto.ReattachConfig) {
	m.mut.Lock()
	defer m.mut.Unlock()

	placeholder := m.connectionPluginMap[connection]
	placeholder.client = client
	placeholder.reattach = reattach

	m.pluginMap[reattach.Plugin] = placeholder

	// a plugin supporting multiple connections lists all of them in reattach.Connections;
	// every one of them shares this same runningPlugin
	for _, c := range reattach.Connections {
		m.connectionPluginMap[c] = placeholder
	}

	// signal waiters that client and reattach are now populated
	close(placeholder.initialized)
}
// populatePluginConnectionConfigs rebuilds pluginConnectionConfigMap (plugin name -> connection
// configs) by grouping every config in connectionConfigMap by its Plugin field. The order within
// each slice follows map iteration, so it is unspecified.
func (m *PluginManager) populatePluginConnectionConfigs() {
	grouped := make(map[string][]*sdkproto.ConnectionConfig)
	m.pluginConnectionConfigMap = grouped
	for _, cfg := range m.connectionConfigMap {
		grouped[cfg.Plugin] = append(grouped[cfg.Plugin], cfg)
	}
}
// populateConnectionPluginMap rebuilds connectionPluginMap from scratch. For every plugin which has
// connection configs and is running (present in pluginMap), each connection in its reattach config
// is mapped to that runningPlugin. Entries not reachable this way, such as placeholders for
// plugins still starting, are dropped.
func (m *PluginManager) populateConnectionPluginMap() {
	m.connectionPluginMap = make(map[string]*runningPlugin)
	for pluginName := range m.pluginConnectionConfigMap {
		running, isRunning := m.pluginMap[pluginName]
		if !isRunning {
			continue
		}
		for _, connectionName := range running.reattach.Connections {
			m.connectionPluginMap[connectionName] = running
		}
	}
}
// setPluginCacheSizeMap rebuilds pluginCacheSizeMap, the max cache size in Mb for each plugin in
// pluginConnectionConfigMap. The total budget is read from the constants.EnvMaxCacheSize env var
// and split between plugins in proportion to their number of non-aggregator connections.
// A size of 0 means unlimited. Every plugin gets 0 when the env var is unset, non-numeric or <= 0.
func (m *PluginManager) setPluginCacheSizeMap() {
	m.pluginCacheSizeMap = make(map[string]int64, len(m.pluginConnectionConfigMap))

	// read the env var setting cache size; a missing or invalid value parses as 0 (no limit)
	maxCacheSizeMb, _ := strconv.Atoi(os.Getenv(constants.EnvMaxCacheSize))

	// total connection count across ALL plugins (excluding aggregators)
	numConnections := m.nonAggregatorConnectionCount()
	log.Printf("[WARN] PluginManager setPluginCacheSizeMap: %d %s.", numConnections, utils.Pluralize("connection", numConnections))
	log.Printf("[WARN] Total cache size %dMb", maxCacheSizeMb)

	for pluginName, connections := range m.pluginConnectionConfigMap {
		var size int64
		// if no max size is set, leave every plugin at zero (unlimited)
		if maxCacheSizeMb > 0 {
			numPluginConnections := nonAggregatorConnectionCount(connections)
			size = pluginCacheSizeMb(numPluginConnections, numConnections, maxCacheSizeMb)
			log.Printf("[WARN] Plugin '%s', %d %s, max cache size %dMb", pluginName, numPluginConnections, utils.Pluralize("connection", numPluginConnections), size)
		}
		m.pluginCacheSizeMap[pluginName] = size
	}
}

// pluginCacheSizeMb returns a plugin's share of maxCacheSizeMb, proportional to its share of
// totalConnections. The result is never less than 1, since 0 would mean unlimited.
// If totalConnections is 0 (e.g. every connection is an aggregator), the minimum of 1 is returned.
// Dividing would give 0/0 = NaN, and converting NaN to int64 is implementation-dependent (a large
// negative number on amd64).
func pluginCacheSizeMb(pluginConnections, totalConnections, maxCacheSizeMb int) int64 {
	if totalConnections <= 0 {
		return 1
	}
	size := int64(float64(pluginConnections) / float64(totalConnections) * float64(maxCacheSizeMb))
	if size == 0 {
		return 1
	}
	return size
}
// startPlugin launches the plugin binary which provides connectionName and sends it its config.
// Plugins reporting MultipleConnections receive the configs of all their connections via
// setAllConnectionConfigs. Other (legacy) plugins receive only connectionName's config.
// On success it returns the go-plugin client and a reattach config listing the connections the
// process now serves. Once the process has started, any later failure kills it again (see the
// deferred func, which inspects the named err result).
func (m *PluginManager) startPlugin(connectionName string) (_ *plugin.Client, _ *proto.ReattachConfig, err error) {
	log.Printf("[TRACE] ************ start plugin %s ********************\n", connectionName)

	connectionConfig, err := m.getConnectionConfig(connectionName)
	if err != nil {
		return nil, nil, err
	}
	pluginName := connectionConfig.Plugin

	pluginPath, err := GetPluginPath(pluginName, connectionConfig.PluginShortName)
	if err != nil {
		return nil, nil, err
	}

	client := plugin.NewClient(&plugin.ClientConfig{
		HandshakeConfig:  sdkshared.Handshake,
		Plugins:          map[string]plugin.Plugin{pluginName: &sdkshared.WrapperPlugin{}},
		Cmd:              exec.Command(pluginPath),
		AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC},
		// route plugin logs through our logger so they end up in the log file
		Logger: m.logger,
	})
	if _, startErr := client.Start(); startErr != nil {
		return nil, nil, startErr
	}
	// from here on, every error return must kill the process we just started
	defer func() {
		if err != nil {
			client.Kill()
		}
	}()

	pluginClient, err := sdkgrpc.NewPluginClient(client, pluginName)
	if err != nil {
		return nil, nil, err
	}

	// errors are deliberately ignored - fall back to an empty (no capabilities) response
	supportedOperations, _ := pluginClient.GetSupportedOperations()
	if supportedOperations == nil {
		supportedOperations = &sdkproto.GetSupportedOperationsResponse{}
	}
	log.Printf("[TRACE] supportedOperations: %v", supportedOperations)

	connections := []string{connectionName}
	if supportedOperations.MultipleConnections {
		// returns the names of every connection provided by this plugin
		connections, err = m.setAllConnectionConfigs(pluginClient, pluginName)
	} else {
		err = m.setSingleConnectionConfig(pluginClient, connectionName)
	}
	if err != nil {
		log.Printf("[WARN] failed to set connection config: %s", err.Error())
		return nil, nil, err
	}

	reattach := proto.NewReattachConfig(pluginName, client.ReattachConfig(), proto.SupportedOperationsFromSdk(supportedOperations), connections)
	return client, reattach, nil
}
// waitForPluginLoad blocks until p is initialized (storeClientToMap closes p.initialized) or until
// the startup timeout expires. On timeout it returns an error naming the connection.
func (m *PluginManager) waitForPluginLoad(connection string, p *runningPlugin) error {
	const pluginStartTimeoutSecs = 5
	select {
	case <-p.initialized:
		if p.reattach == nil {
			// initialized is only closed after reattach is set - guard anyway rather than
			// panicking on a nil dereference below
			return fmt.Errorf("plugin for connection %s was initialized without a reattach config", connection)
		}
		// (this log line was previously duplicated)
		log.Printf("[TRACE] initialized: %d", p.reattach.Pid)
		return nil
	case <-time.After(pluginStartTimeoutSecs * time.Second):
		return fmt.Errorf("timed out waiting for %s to startup after %d seconds", connection, pluginStartTimeoutSecs)
	}
}
// getConnectionsForPlugin returns the names of all connections configured for pluginName in
// pluginConnectionConfigMap. If there are none, the result is an empty, non-nil slice.
func (m *PluginManager) getConnectionsForPlugin(pluginName string) []string {
	configs := m.pluginConnectionConfigMap[pluginName]
	names := make([]string, 0, len(configs))
	for _, cfg := range configs {
		names = append(names, cfg.Connection)
	}
	return names
}
// setAllConnectionConfigs sends every connection config loaded for pluginName, together with the
// plugin's max cache size, to a plugin which supports multiple connections. It returns the names of
// those connections along with the error (if any) from SetAllConnectionConfigs. It also returns an
// error if no configs are loaded for the plugin.
//
// NOTE(review): an earlier comment stated aggregator connections are NOT sent, but all configs
// for the plugin are passed here unfiltered - confirm whether aggregators are handled downstream.
func (m *PluginManager) setAllConnectionConfigs(pluginClient *sdkgrpc.PluginClient, pluginName string) ([]string, error) {
	configs, ok := m.pluginConnectionConfigMap[pluginName]
	if !ok {
		// should never happen
		return nil, fmt.Errorf("no config loaded for plugin '%s'", pluginName)
	}

	req := &sdkproto.SetAllConnectionConfigsRequest{
		Configs:        configs,
		MaxCacheSizeMb: m.pluginCacheSizeMap[pluginName],
	}

	connectionNames := make([]string, 0, len(configs))
	for _, cfg := range configs {
		connectionNames = append(connectionNames, cfg.Connection)
	}

	err := pluginClient.SetAllConnectionConfigs(req)
	return connectionNames, err
}
// setSingleConnectionConfig sends the config of connectionName to a legacy plugin which supports
// only a single connection. It fails if no config is loaded for the connection or if the call fails.
func (m *PluginManager) setSingleConnectionConfig(pluginClient *sdkgrpc.PluginClient, connectionName string) error {
	cfg, err := m.getConnectionConfig(connectionName)
	if err != nil {
		return err
	}
	return pluginClient.SetConnectionConfig(&sdkproto.SetConnectionConfigRequest{
		ConnectionName:   connectionName,
		ConnectionConfig: cfg.Config,
	})
}
// getConnectionChanges compares newConfigMap (keyed by connection name) with the current
// connectionConfigMap. It returns the added, deleted and changed connections, each grouped by
// plugin name.
//
// Added and changed entries hold the NEW config, which is what must be sent to the plugin.
// Previously changed entries held the stale current config. Deleted entries hold the current config.
// A connection whose plugin has changed cannot be "updated" in place: the old plugin does not
// provide the new plugin's connection. It is therefore reported as deleted from the old plugin
// and added to the new one.
func (m *PluginManager) getConnectionChanges(newConfigMap map[string]*sdkproto.ConnectionConfig) (addedConnections, deletedConnections, changedConnections map[string][]*sdkproto.ConnectionConfig) {
	// results are maps of connections keyed by plugin
	addedConnections = make(map[string][]*sdkproto.ConnectionConfig)
	deletedConnections = make(map[string][]*sdkproto.ConnectionConfig)
	changedConnections = make(map[string][]*sdkproto.ConnectionConfig)

	// connections which no longer exist
	for name, currentConnection := range m.connectionConfigMap {
		if _, ok := newConfigMap[name]; !ok {
			deletedConnections[currentConnection.Plugin] = append(deletedConnections[currentConnection.Plugin], currentConnection)
		}
	}

	// connections which are new or have changed
	for name, newConnection := range newConfigMap {
		currentConnection, ok := m.connectionConfigMap[name]
		switch {
		case !ok:
			addedConnections[newConnection.Plugin] = append(addedConnections[newConnection.Plugin], newConnection)
		case currentConnection.Plugin != newConnection.Plugin:
			// moved to a different plugin: remove from the old plugin, add to the new one
			deletedConnections[currentConnection.Plugin] = append(deletedConnections[currentConnection.Plugin], currentConnection)
			addedConnections[newConnection.Plugin] = append(addedConnections[newConnection.Plugin], newConnection)
		case !currentConnection.Equals(newConnection):
			changedConnections[newConnection.Plugin] = append(changedConnections[newConnection.Plugin], newConnection)
		}
	}
	return addedConnections, deletedConnections, changedConnections
}
// nonAggregatorConnectionCount returns the number of non-aggregator connections summed over every
// plugin in pluginConnectionConfigMap.
func (m *PluginManager) nonAggregatorConnectionCount() int {
	var total int
	for _, configs := range m.pluginConnectionConfigMap {
		total += nonAggregatorConnectionCount(configs)
	}
	return total
}
// nonAggregatorConnectionCount returns how many of connections are not aggregators. Here an
// aggregator is a connection with at least one entry in ChildConnections.
func nonAggregatorConnectionCount(connections []*sdkproto.ConnectionConfig) int {
	count := 0
	for _, c := range connections {
		if len(c.ChildConnections) > 0 {
			// aggregator - not counted
			continue
		}
		count++
	}
	return count
}