diff --git a/pkg/connection/config_map.go b/pkg/connection/config_map.go index 9f9890426..67863c4d1 100644 --- a/pkg/connection/config_map.go +++ b/pkg/connection/config_map.go @@ -12,8 +12,8 @@ func NewConnectionConfigMap(connectionMap map[string]*modconfig.Connection) Conn for k, v := range connectionMap { configMap[k] = &sdkproto.ConnectionConfig{ Connection: v.Name, - Plugin: v.PluginLongName, - PluginShortName: v.PluginShortName, + Plugin: v.Plugin, + PluginShortName: v.PluginAlias, Config: v.Config, ChildConnections: v.GetResolveConnectionNames(), } diff --git a/pkg/connection/limiter_map.go b/pkg/connection/limiter_map.go index 18f9a6f28..953e3d100 100644 --- a/pkg/connection/limiter_map.go +++ b/pkg/connection/limiter_map.go @@ -5,25 +5,7 @@ import ( "golang.org/x/exp/maps" ) -// map of plugin short name to Limiter map for the plugin -type PluginLimiterMap map[string]LimiterMap - -func (l PluginLimiterMap) Equals(other PluginLimiterMap) bool { - return maps.EqualFunc(l, other, func(m1, m2 LimiterMap) bool { return m1.Equals(m2) }) -} - -type PluginMap map[string]*modconfig.Plugin - -func (p PluginMap) ToPluginLimiterMap() PluginLimiterMap { - var limiterPluginMap = make(PluginLimiterMap) - for name, p := range p { - if len(p.Limiters) > 0 { - limiterPluginMap[name] = NewLimiterMap(p.Limiters) - } - } - return limiterPluginMap -} - +// LimiterMap is a map of limiter name to limiter definition type LimiterMap map[string]*modconfig.RateLimiter func NewLimiterMap(limiters []*modconfig.RateLimiter) LimiterMap { @@ -37,7 +19,7 @@ func (l LimiterMap) Equals(other LimiterMap) bool { return maps.EqualFunc(l, other, func(l1, l2 *modconfig.RateLimiter) bool { return l1.Equals(l2) }) } -// ToPluginLimiterMap converts limiter map keyed by limiter name to a map of limiter maps keyed by plugin +// ToPluginLimiterMap converts limiter map keyed by limiter name to a map of limiter maps keyed by plugin image ref func (l LimiterMap) ToPluginLimiterMap() PluginLimiterMap { res := make(PluginLimiterMap) for name, limiter := range l { diff --git a/pkg/connection/plugin_limiter_map.go b/pkg/connection/plugin_limiter_map.go new file mode 100644 index 000000000..6848a2778 --- /dev/null +++ b/pkg/connection/plugin_limiter_map.go @@ -0,0 +1,25 @@ +package connection + +import ( + "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" + "golang.org/x/exp/maps" +) + +// PluginLimiterMap map of plugin image ref to Limiter map for the plugin +type PluginLimiterMap map[string]LimiterMap + +func (l PluginLimiterMap) Equals(other PluginLimiterMap) bool { + return maps.EqualFunc(l, other, func(m1, m2 LimiterMap) bool { return m1.Equals(m2) }) +} + +type PluginMap map[string]*modconfig.Plugin + +func (p PluginMap) ToPluginLimiterMap() PluginLimiterMap { + var limiterPluginMap = make(PluginLimiterMap) + for imageRef, p := range p { + if len(p.Limiters) > 0 { + limiterPluginMap[imageRef] = NewLimiterMap(p.Limiters) + } + } + return limiterPluginMap +} diff --git a/pkg/connection_state/connection_state_table_sql.go b/pkg/connection_state/connection_state_table_sql.go index 267d5cb6f..cdaf895a9 100644 --- a/pkg/connection_state/connection_state_table_sql.go +++ b/pkg/connection_state/connection_state_table_sql.go @@ -184,7 +184,7 @@ VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,now(),now()) c.Type, c.ImportSchema, nil, - c.PluginLongName, + c.Plugin, schemaMode, schemaHash, commentsSet, diff --git a/pkg/constants/db.go b/pkg/constants/db.go index abaf63d2e..1d12599ec 100644 --- a/pkg/constants/db.go +++ b/pkg/constants/db.go @@ -28,7 +28,7 @@ const ( // constants for installing db and fdw images const ( DatabaseVersion = "14.2.0" - FdwVersion = "1.8.0-rc.6" + FdwVersion = "1.8.0-rc.7" // PostgresImageRef is the OCI Image ref for the database binaries PostgresImageRef = "us-docker.pkg.dev/steampipe/steampipe/db:14.2.0" diff --git a/pkg/ociinstaller/imageref.go b/pkg/ociinstaller/imageref.go index bbe087ac9..8c5899673 100644 --- a/pkg/ociinstaller/imageref.go +++ b/pkg/ociinstaller/imageref.go @@ -16,12 +16,12 @@ const ( DefaultImageType = "plugins" ) -// SteampipeImageRef :: a ref to an OCI image +// SteampipeImageRef a struct encapsulating a ref to an OCI image type SteampipeImageRef struct { requestedRef string } -// NewSteampipeImageRef :: creates and returns a New SteampipeImageRef +// NewSteampipeImageRef creates and returns a New SteampipeImageRef func NewSteampipeImageRef(ref string) *SteampipeImageRef { ref = sanitizeRefStream(ref) return &SteampipeImageRef{ @@ -29,7 +29,7 @@ func NewSteampipeImageRef(ref string) *SteampipeImageRef { } } -// ActualImageRef :: returns the actual, physical full image ref +// ActualImageRef returns the actual, physical full image ref // (us-docker.pkg.dev/steampipe/plugins/turbot/aws:1.0.0) func (r *SteampipeImageRef) ActualImageRef() string { ref := r.requestedRef @@ -47,7 +47,7 @@ func (r *SteampipeImageRef) ActualImageRef() string { return fullRef } -// DisplayImageRef :: returns the "friendly" user-facing full image ref +// DisplayImageRef returns the "friendly" user-facing full image ref // (hub.steampipe.io/plugins/turbot/aws@1.0.0) func (r *SteampipeImageRef) DisplayImageRef() string { fullRef := r.ActualImageRef() @@ -84,8 +84,7 @@ func sanitizeRefStream(ref string) string { return ref } -// GetOrgNameAndStream :: splits the full image reference into -// (org, name, stream) +// GetOrgNameAndStream splits the full image reference into (org, name, stream) func (r *SteampipeImageRef) GetOrgNameAndStream() (string, string, string) { // plugin.Name looks like `hub.steampipe.io/plugins/turbot/aws@latest` split := strings.Split(r.DisplayImageRef(), "/") @@ -96,6 +95,18 @@ func (r *SteampipeImageRef) GetOrgNameAndStream() (string, string, string) { return strings.Join(split[0:len(split)-2], "/"), pluginNameAndStream[0], pluginNameAndStream[1] } +// GetFriendlyName returns a friendly name: +// hub.steampipe.io/plugins/turbot/aws@1.0.0 => aws@1.0.0 +// hub.steampipe.io/plugins/turbot/aws@latest => aws +func (r *SteampipeImageRef) GetFriendlyName() string { + _, pluginName, pluginStream := r.GetOrgNameAndStream() + if pluginStream == DefaultImageTag { + return pluginName + } else { + return fmt.Sprintf("%s@%s", pluginName, pluginStream) + } +} + // possible formats include // us-docker.pkg.dev/steampipe/plugin/turbot/aws:1.0.0 // us-docker.pkg.dev/steampipe/plugin/turbot/aws@sha256:766389c9dd892132c7e7b9124f446b9599a80863d466cd1d333a167dedf2c2b1 diff --git a/pkg/pluginmanager_service/plugin_manager.go b/pkg/pluginmanager_service/plugin_manager.go index bb5d907f3..0d53fd3f3 100644 --- a/pkg/pluginmanager_service/plugin_manager.go +++ b/pkg/pluginmanager_service/plugin_manager.go @@ -61,19 +61,16 @@ type PluginManager struct { logger hclog.Logger messageServer *PluginMessageServer - // map of user configured rate limiter maps, keyed by plugin short name + // map of user configured rate limiter maps, keyed by plugin image ref // NOTE: this is populated from config userLimiters connection.PluginLimiterMap - // map of plugin configured rate limiter maps, keyed by plugin short name + // map of plugin configured rate limiter maps (keyed by plugin image ref) // NOTE: if this is nil, that means the steampipe_rate_limiter tables has not been populalated yet - // the first time we refresh connections we must load all plugins and fetch their rate limiter defs pluginLimiters connection.PluginLimiterMap - // map of plugin configs + // map of plugin configs (keyed by plugin image ref) plugins connection.PluginMap - // map of plugin short name to long name - pluginShortToLongNameMap map[string]string - pluginLongToShortNameMap map[string]string pool *pgxpool.Pool } @@ -81,18 +78,15 @@ type PluginManager struct { func NewPluginManager(ctx context.Context, connectionConfig map[string]*sdkproto.ConnectionConfig, pluginConfigs connection.PluginMap, logger hclog.Logger) (*PluginManager, error) { log.Printf("[INFO] NewPluginManager") pluginManager := &PluginManager{ - logger: logger, - runningPluginMap: make(map[string]*runningPlugin), - connectionConfigMap: connectionConfig, - userLimiters: pluginConfigs.ToPluginLimiterMap(), - plugins: pluginConfigs, - pluginShortToLongNameMap: make(map[string]string), - pluginLongToShortNameMap: make(map[string]string), + logger: logger, + runningPluginMap: make(map[string]*runningPlugin), + connectionConfigMap: connectionConfig, + userLimiters: pluginConfigs.ToPluginLimiterMap(), + plugins: pluginConfigs, } pluginManager.messageServer = &PluginMessageServer{pluginManager: pluginManager} - //time.Sleep(10 * time.Second) // populate plugin connection config map pluginManager.populatePluginConnectionConfigs() // determine cache size for each plugin @@ -463,7 +457,7 @@ func (m *PluginManager) startPluginProcess(pluginName string, connectionConfigs cmd := exec.Command(pluginPath) // see if a plugin config was specified - if so, get the max memory to allow the plugin - pluginConfig := m.plugins[exemplarConnectionConfig.PluginShortName] + pluginConfig := m.plugins[exemplarConnectionConfig.Plugin] // must be there if pluginConfig == nil { return nil, sperr.New("no plugin config is stored for plugin %s", exemplarConnectionConfig.PluginShortName) @@ -597,9 +591,6 @@ func (m *PluginManager) populatePluginConnectionConfigs() { m.pluginConnectionConfigMap = make(map[string][]*sdkproto.ConnectionConfig) for _, config := range m.connectionConfigMap { m.pluginConnectionConfigMap[config.Plugin] = append(m.pluginConnectionConfigMap[config.Plugin], config) - // populate plugin name map - m.pluginShortToLongNameMap[config.PluginShortName] = config.Plugin - m.pluginLongToShortNameMap[config.Plugin] = config.PluginShortName } } diff --git a/pkg/pluginmanager_service/plugin_manager_rate_limiters.go b/pkg/pluginmanager_service/plugin_manager_rate_limiters.go index 9bbec35b4..0ef9533e3 100644 --- a/pkg/pluginmanager_service/plugin_manager_rate_limiters.go +++ b/pkg/pluginmanager_service/plugin_manager_rate_limiters.go @@ -11,6 +11,7 @@ import ( "github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/db/db_common" "github.com/turbot/steampipe/pkg/db/db_local" + "github.com/turbot/steampipe/pkg/ociinstaller" pb "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/proto" "github.com/turbot/steampipe/pkg/rate_limiters" "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" @@ -27,7 +28,7 @@ func (m *PluginManager) ShouldFetchRateLimiterDefs() bool { // for all plugins with changed limiters func (m *PluginManager) HandlePluginLimiterChanges(newLimiters connection.PluginLimiterMap) error { if m.pluginLimiters == nil { - // this must be the first time we have poplkated them + // this must be the first time we have populated them m.pluginLimiters = make(connection.PluginLimiterMap) } for plugin, limitersForPlugin := range newLimiters { @@ -109,13 +110,9 @@ func (m *PluginManager) handleUserLimiterChanges(plugins connection.PluginMap) e func (m *PluginManager) setRateLimitersForPlugin(pluginShortName string) error { // get running plugin for this plugin - // if plugin is not running we have nothing to do - longName, ok := m.pluginShortToLongNameMap[pluginShortName] - if !ok { - log.Printf("[INFO] handleUserLimiterChanges: plugin %s is not currently running - ignoring", pluginShortName) - return nil - } - runningPlugin, ok := m.runningPluginMap[longName] + imageRef := ociinstaller.NewSteampipeImageRef(pluginShortName).DisplayImageRef() + + runningPlugin, ok := m.runningPluginMap[imageRef] if !ok { log.Printf("[INFO] handleUserLimiterChanges: plugin %s is not currently running - ignoring", pluginShortName) return nil @@ -125,13 +122,13 @@ func (m *PluginManager) setRateLimitersForPlugin(pluginShortName string) error { return nil } - pluginClient, err := sdkgrpc.NewPluginClient(runningPlugin.client, longName) + pluginClient, err := sdkgrpc.NewPluginClient(runningPlugin.client, imageRef) if err != nil { - return sperr.WrapWithMessage(err, "failed to create a plugin client when updating the rate limiter for plugin '%s'", longName) + return sperr.WrapWithMessage(err, "failed to create a plugin client when updating the rate limiter for plugin '%s'", imageRef) } if err := m.setRateLimiters(pluginShortName, pluginClient); err != nil { - return sperr.WrapWithMessage(err, "failed to update rate limiters for plugin '%s'", longName) + return sperr.WrapWithMessage(err, "failed to update rate limiters for plugin '%s'", imageRef) } return nil } @@ -318,26 +315,24 @@ func (m *PluginManager) LoadPluginRateLimiters(pluginConnectionMap map[string]st if rateLimiterResp == nil || rateLimiterResp.Definitions == nil { continue } - // populate the plugin name - pluginShortName := m.pluginLongToShortNameMap[reattach.Plugin] limitersForPlugin := make(connection.LimiterMap) for _, l := range rateLimiterResp.Definitions { - r, err := modconfig.RateLimiterFromProto(l) + r, err := modconfig.RateLimiterFromProto(l, reattach.Plugin) if err != nil { errors = append(errors, sperr.WrapWithMessage(err, "failed to create rate limiter %s from plugin definition", err)) continue } - r.Plugin = pluginShortName + // set plugin as source r.Source = modconfig.LimiterSourcePlugin - // derfaulty status to active + // default status to active r.Status = modconfig.LimiterStatusActive // add to map limitersForPlugin[l.Name] = r } // store back - res[pluginShortName] = limitersForPlugin + res[reattach.Plugin] = limitersForPlugin } return res, nil diff --git a/pkg/steampipeconfig/connection_plugin.go b/pkg/steampipeconfig/connection_plugin.go index 523ae0714..d6d85d652 100644 --- a/pkg/steampipeconfig/connection_plugin.go +++ b/pkg/steampipeconfig/connection_plugin.go @@ -123,7 +123,7 @@ func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectio res.AddWarning(fmt.Sprintf("failed to start plugin '%s': %s", failedPlugin, failure)) // figure out which connections are provided by any failed plugins for _, c := range connectionsToCreate { - if c.PluginLongName == failedPlugin { + if c.Plugin == failedPlugin { res.AddFailedConnection(c.Name, constants.ConnectionErrorPluginFailedToStart) } } @@ -134,8 +134,8 @@ func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectio for _, connection := range connectionsToCreate { // is this connection provided by a plugin we have already instantiated? - if existingConnectionPlugin, ok := multiConnectionPlugins[connection.PluginLongName]; ok { - log.Printf("[TRACE] CreateConnectionPlugins - connection %s is provided by existing connectionPlugin %s - reusing", connection.Name, connection.PluginLongName) + if existingConnectionPlugin, ok := multiConnectionPlugins[connection.Plugin]; ok { + log.Printf("[TRACE] CreateConnectionPlugins - connection %s is provided by existing connectionPlugin %s - reusing", connection.Name, connection.Plugin) // store the existing connection plugin in the result map requestedConnectionPluginMap[connection.Name] = existingConnectionPlugin continue @@ -143,7 +143,7 @@ func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectio // do we have a reattach config for this connection's plugin if _, ok := getResponse.ReattachMap[connection.Name]; !ok { - log.Printf("[TRACE] CreateConnectionPlugins skipping connection '%s', plugin '%s' as plugin manager failed to start it", connection.Name, connection.PluginLongName) + log.Printf("[TRACE] CreateConnectionPlugins skipping connection '%s', plugin '%s' as plugin manager failed to start it", connection.Name, connection.Plugin) continue } @@ -157,13 +157,13 @@ func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectio connectionPlugin, err := createConnectionPlugin(connection, reattach) if err != nil { - res.AddWarning(fmt.Sprintf("failed to start plugin '%s': %s", connection.PluginShortName, err)) + res.AddWarning(fmt.Sprintf("failed to start plugin '%s': %s", connection.PluginAlias, err)) continue } requestedConnectionPluginMap[connection.Name] = connectionPlugin if connectionPlugin.SupportedOperations.MultipleConnections { // if it supports multiple connections, store in multiConnectionPlugins too - multiConnectionPlugins[connection.PluginLongName] = connectionPlugin + multiConnectionPlugins[connection.Plugin] = connectionPlugin } } log.Printf("[TRACE] all connection plugins created, populating schemas") @@ -254,7 +254,7 @@ func fullConnectionPluginMap(sparseConnectionPluginMap map[string]*ConnectionPlu // createConnectionPlugin sttaches to the plugin process func createConnectionPlugin(connection *modconfig.Connection, reattach *proto.ReattachConfig) (*ConnectionPlugin, error) { log.Printf("[TRACE] createConnectionPlugin for connection %s", connection.Name) - pluginName := connection.PluginLongName + pluginName := connection.Plugin connectionName := connection.Name log.Printf("[TRACE] plugin manager returned reattach config for connection '%s' - pid %d", @@ -275,7 +275,7 @@ func createConnectionPlugin(connection *modconfig.Connection, reattach *proto.Re log.Printf("[TRACE] plugin client created for %s", pluginName) // now create ConnectionPlugin object return - connectionPlugin := NewConnectionPlugin(connection.PluginShortName, pluginName, pluginClient, reattach.SupportedOperations) + connectionPlugin := NewConnectionPlugin(connection.PluginAlias, pluginName, pluginClient, reattach.SupportedOperations) // if multiple connections are NOT supported, add the config for our one and only connection if reattach.SupportedOperations == nil || !reattach.SupportedOperations.MultipleConnections { diff --git a/pkg/steampipeconfig/connection_state_map.go b/pkg/steampipeconfig/connection_state_map.go index 82fda077d..016aae36e 100644 --- a/pkg/steampipeconfig/connection_state_map.go +++ b/pkg/steampipeconfig/connection_state_map.go @@ -34,12 +34,12 @@ func GetRequiredConnectionStateMap(connectionMap map[string]*modconfig.Connectio utils.LogTime("steampipeconfig.getRequiredConnections config - iteration start") // populate file mod time for each referenced plugin for name, connection := range connectionMap { - remoteSchema := connection.PluginLongName - pluginPath, _ := filepaths.GetPluginPath(connection.PluginLongName, connection.PluginShortName) + remoteSchema := connection.Plugin + pluginPath, _ := filepaths.GetPluginPath(connection.Plugin, connection.PluginAlias) // ignore error if plugin is not available // if plugin is not installed, the path will be returned as empty if pluginPath == "" { - missingPluginMap[connection.PluginLongName] = append(missingPluginMap[connection.PluginLongName], *connection) + missingPluginMap[connection.Plugin] = append(missingPluginMap[connection.Plugin], *connection) continue } diff --git a/pkg/steampipeconfig/connection_updates.go b/pkg/steampipeconfig/connection_updates.go index 7a0627d58..4e9131b73 100644 --- a/pkg/steampipeconfig/connection_updates.go +++ b/pkg/steampipeconfig/connection_updates.go @@ -152,7 +152,7 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, pluginMa // we need to refetch the rate limiters for if res.pluginBinaryChanged { // store map item of plugin name to connection name (so we only have one entry per plugin) - pluginShortName := GlobalConfig.Connections[requiredConnectionState.ConnectionName].PluginShortName + pluginShortName := GlobalConfig.Connections[requiredConnectionState.ConnectionName].PluginAlias updates.PluginsWithUpdatedBinary[pluginShortName] = requiredConnectionState.ConnectionName } } diff --git a/pkg/steampipeconfig/load_config.go b/pkg/steampipeconfig/load_config.go index d140413c1..34304f579 100644 --- a/pkg/steampipeconfig/load_config.go +++ b/pkg/steampipeconfig/load_config.go @@ -256,11 +256,14 @@ func loadConfig(configFolder string, steampipeConfig *SteampipeConfig, opts *loa if moreDiags.HasErrors() { continue } - _, alreadyThere := steampipeConfig.Plugins[plugin.Source] + // get the _display_ image ref to key the map + imageRef := plugin.GetImageRef() + + _, alreadyThere := steampipeConfig.Plugins[imageRef] if alreadyThere { return error_helpers.NewErrorsAndWarning(sperr.New("duplicate plugin: '%s' in '%s'", plugin.Source, block.TypeRange.Filename)) } - steampipeConfig.Plugins[plugin.Source] = plugin + steampipeConfig.Plugins[imageRef] = plugin case modconfig.BlockTypeConnection: connection, moreDiags := parse.DecodeConnection(block) diff --git a/pkg/steampipeconfig/load_config_test.go b/pkg/steampipeconfig/load_config_test.go index af0f6f3e6..711f48e95 100644 --- a/pkg/steampipeconfig/load_config_test.go +++ b/pkg/steampipeconfig/load_config_test.go @@ -47,12 +47,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{ expected: &SteampipeConfig{ Connections: map[string]*modconfig.Connection{ "aws_dmi_001": { - Name: "aws_dmi_001", - PluginShortName: "aws", - PluginLongName: "hub.steampipe.io/plugins/turbot/aws@latest", - Type: "", - ImportSchema: "enabled", - Config: "access_key = \"aws_dmi_001_access_key\"\nregions = \"- us-east-1\\n-us-west-\"\nsecret_key = \"aws_dmi_001_secret_key\"\n", + Name: "aws_dmi_001", + PluginAlias: "aws", + Plugin: "hub.steampipe.io/plugins/turbot/aws@latest", + Type: "", + ImportSchema: "enabled", + Config: "access_key = \"aws_dmi_001_access_key\"\nregions = \"- us-east-1\\n-us-west-\"\nsecret_key = \"aws_dmi_001_secret_key\"\n", DeclRange: modconfig.Range{ Filename: "$$test_pwd$$/testdata/connection_config/multiple_connections/config/connection1.spc", Start: modconfig.Pos{ @@ -68,12 +68,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{ }, }, "aws_dmi_002": { - Name: "aws_dmi_002", - PluginShortName: "aws", - PluginLongName: "hub.steampipe.io/plugins/turbot/aws@latest", - Type: "", - ImportSchema: "enabled", - Config: "access_key = \"aws_dmi_002_access_key\"\nregions = \"- us-east-1\\n-us-west-\"\nsecret_key = \"aws_dmi_002_secret_key\"\n", + Name: "aws_dmi_002", + PluginAlias: "aws", + Plugin: "hub.steampipe.io/plugins/turbot/aws@latest", + Type: "", + ImportSchema: "enabled", + Config: "access_key = \"aws_dmi_002_access_key\"\nregions = \"- us-east-1\\n-us-west-\"\nsecret_key = \"aws_dmi_002_secret_key\"\n", DeclRange: modconfig.Range{ Filename: "$$test_pwd$$/testdata/connection_config/multiple_connections/config/connection2.spc", Start: modconfig.Pos{ @@ -100,12 +100,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{ expected: &SteampipeConfig{ Connections: map[string]*modconfig.Connection{ "a": { - Name: "a", - PluginShortName: "test_data/connection-test-1", - PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", - Type: "", - ImportSchema: "enabled", - Config: "", + Name: "a", + PluginAlias: "test_data/connection-test-1", + Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", + Type: "", + ImportSchema: "enabled", + Config: "", DeclRange: modconfig.Range{ Filename: "$$test_pwd$$/testdata/connection_config/single_connection/config/connection1.spc", Start: modconfig.Pos{ @@ -132,12 +132,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{ expected: &SteampipeConfig{ Connections: map[string]*modconfig.Connection{ "a": { - Name: "a", - PluginShortName: "test_data/connection-test-1", - PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", - Type: "", - ImportSchema: "enabled", - Config: "", + Name: "a", + PluginAlias: "test_data/connection-test-1", + Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", + Type: "", + ImportSchema: "enabled", + Config: "", DeclRange: modconfig.Range{ Filename: "$$test_pwd$$/testdata/connection_config/single_connection_with_default_options/config/connection1.spc", Start: modconfig.Pos{ @@ -178,12 +178,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{ expected: &SteampipeConfig{ Connections: map[string]*modconfig.Connection{ "a": { - Name: "a", - PluginShortName: "test_data/connection-test-1", - PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", - Type: "", - ImportSchema: "enabled", - Config: "", + Name: "a", + PluginAlias: "test_data/connection-test-1", + Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", + Type: "", + ImportSchema: "enabled", + Config: "", DeclRange: modconfig.Range{ Filename: "$$test_pwd$$/testdata/connection_config/single_connection_with_default_options/config/connection1.spc", Start: modconfig.Pos{ @@ -219,12 +219,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{ expected: &SteampipeConfig{ Connections: map[string]*modconfig.Connection{ "a": { - Name: "a", - PluginShortName: "test_data/connection-test-1", - PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", - Type: "", - ImportSchema: "enabled", - Config: "", + Name: "a", + PluginAlias: "test_data/connection-test-1", + Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", + Type: "", + ImportSchema: "enabled", + Config: "", DeclRange: modconfig.Range{ Filename: "$$test_pwd$$/testdata/connection_config/single_connection_with_default_options/config/connection1.spc", Start: modconfig.Pos{ @@ -259,11 +259,11 @@ var testCasesLoadConfig = map[string]loadConfigTest{ expected: &SteampipeConfig{ Connections: map[string]*modconfig.Connection{ "a": { - Name: "a", - ImportSchema: "enabled", - PluginShortName: "test_data/connection-test-1", - PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", - Config: "", + Name: "a", + ImportSchema: "enabled", + PluginAlias: "test_data/connection-test-1", + Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest", + Config: "", Options: &options.Connection{ Cache: &trueVal, CacheTTL: &ttlVal, diff --git a/pkg/steampipeconfig/modconfig/connection.go b/pkg/steampipeconfig/modconfig/connection.go index 16f52e003..e7daa0578 100644 --- a/pkg/steampipeconfig/modconfig/connection.go +++ b/pkg/steampipeconfig/modconfig/connection.go @@ -32,9 +32,9 @@ type Connection struct { // connection name Name string `json:"name,omitempty"` // name of plugin as mentioned in config - PluginShortName string `json:"plugin_short_name,omitempty"` + PluginAlias string `json:"plugin_short_name,omitempty"` // fully qualified name of the plugin. derived from the short name - PluginLongName string `json:"plugin,omitempty"` + Plugin string `json:"plugin,omitempty"` // connection type - supported values: "aggregator" Type string `json:"type,omitempty"` @@ -126,7 +126,7 @@ func (c *Connection) Equals(other *Connection) bool { connectionOptionsEqual = c.Options.Equals(other.Options) } return c.Name == other.Name && - c.PluginLongName == other.PluginLongName && + c.Plugin == other.Plugin && c.Type == other.Type && strings.Join(c.ConnectionNames, ",") == strings.Join(other.ConnectionNames, ",") && connectionOptionsEqual && @@ -153,7 +153,7 @@ func (c *Connection) SetOptions(opts options.Options, block *hcl.Block) hcl.Diag } func (c *Connection) String() string { - return fmt.Sprintf("\n----\nName: %s\nPlugin: %s\nConfig:\n%s\nOptions:\n%s\n", c.Name, c.PluginLongName, c.Config, c.Options.String()) + return fmt.Sprintf("\n----\nName: %s\nPlugin: %s\nConfig:\n%s\nOptions:\n%s\n", c.Name, c.Plugin, c.Config, c.Options.String()) } // Validate verifies the Type property is valid, @@ -194,13 +194,13 @@ func (c *Connection) ValidateAggregatorConnection() (warnings, errors []string) // now ensure all child connections are loaded and use the same plugin as the parent connection for _, childConnection := range c.Connections { - if childConnection.PluginLongName != c.PluginLongName { + if childConnection.Plugin != c.Plugin { validationErrors = append(validationErrors, fmt.Sprintf("aggregator connection '%s' uses plugin %s but child connection '%s' uses plugin '%s'", c.Name, - c.PluginLongName, + c.Plugin, childConnection.Name, - childConnection.PluginLongName, + childConnection.Plugin, )) } @@ -246,7 +246,7 @@ func (c *Connection) PopulateChildren(connectionMap map[string]*Connection) { } if match, _ := path.Match(childName, name); match { // verify that this connection is of a compatible type - if connection.PluginLongName == c.PluginLongName { + if connection.Plugin == c.Plugin { c.Connections[name] = connection log.Printf("[TRACE] connection '%s' matches pattern '%s'", name, childName) } diff --git a/pkg/steampipeconfig/modconfig/plugin.go b/pkg/steampipeconfig/modconfig/plugin.go index 83093a2eb..532dbf66b 100644 --- a/pkg/steampipeconfig/modconfig/plugin.go +++ b/pkg/steampipeconfig/modconfig/plugin.go @@ -3,6 +3,7 @@ package modconfig import ( "github.com/hashicorp/hcl/v2" "github.com/hashicorp/hcl/v2/hclsyntax" + "github.com/turbot/steampipe/pkg/ociinstaller" ) type Plugin struct { @@ -12,14 +13,27 @@ type Plugin struct { FileName *string StartLineNumber *int EndLineNumber *int + imageRef *ociinstaller.SteampipeImageRef +} + +func NewPlugin(connection *Connection) *Plugin { + return &Plugin{ + Source: connection.PluginAlias, + imageRef: ociinstaller.NewSteampipeImageRef(connection.PluginAlias), + } } func (l *Plugin) OnDecoded(block *hcl.Block) { l.FileName = &block.DefRange.Filename l.StartLineNumber = &block.Body.(*hclsyntax.Body).SrcRange.Start.Line l.EndLineNumber = &block.Body.(*hclsyntax.Body).SrcRange.End.Line + l.imageRef = ociinstaller.NewSteampipeImageRef(l.Source) } func (l *Plugin) GetMaxMemoryBytes() int64 { return int64(1024 * 1024 * l.MaxMemoryMb) } + +func (l *Plugin) GetImageRef() string { + return l.imageRef.DisplayImageRef() +} diff --git a/pkg/steampipeconfig/modconfig/rate_limiter.go b/pkg/steampipeconfig/modconfig/rate_limiter.go index 187a2d1ec..889e67ad5 100644 --- a/pkg/steampipeconfig/modconfig/rate_limiter.go +++ b/pkg/steampipeconfig/modconfig/rate_limiter.go @@ -4,6 +4,7 @@ import ( "github.com/hashicorp/hcl/v2" "github.com/hashicorp/hcl/v2/hclsyntax" "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto" + "github.com/turbot/steampipe/pkg/ociinstaller" "sort" "strings" ) @@ -16,22 +17,23 @@ const ( ) type RateLimiter struct { - Name string `hcl:"name,label" db:"name"` - BucketSize *int64 `hcl:"bucket_size,optional" db:"bucket_size"` - FillRate *float32 `hcl:"fill_rate,optional" db:"fill_rate"` - MaxConcurrency *int64 `hcl:"max_concurrency,optional" db:"max_concurrency"` - Scope []string `hcl:"scope,optional" db:"scope"` - Where *string `hcl:"where,optional" db:"where"` - Plugin string `db:"plugin"` - FileName *string `db:"file_name"` - StartLineNumber *int `db:"start_line_number"` - EndLineNumber *int `db:"end_line_number"` - Status string `db:"status"` - Source string `db:"source"` + Name string `hcl:"name,label" db:"name"` + BucketSize *int64 `hcl:"bucket_size,optional" db:"bucket_size"` + FillRate *float32 `hcl:"fill_rate,optional" db:"fill_rate"` + MaxConcurrency *int64 `hcl:"max_concurrency,optional" db:"max_concurrency"` + Scope []string `hcl:"scope,optional" db:"scope"` + Where *string `hcl:"where,optional" db:"where"` + Plugin string `db:"plugin"` + FileName *string `db:"file_name"` + StartLineNumber *int `db:"start_line_number"` + EndLineNumber *int `db:"end_line_number"` + Status string `db:"status"` + Source string `db:"source"` + ImageRef *ociinstaller.SteampipeImageRef `db:"-"` } // RateLimiterFromProto converts the proto format RateLimiterDefinition into a Defintion -func RateLimiterFromProto(p *proto.RateLimiterDefinition) (*RateLimiter, error) { +func RateLimiterFromProto(p *proto.RateLimiterDefinition, pluginImageRef string) (*RateLimiter, error) { var res = &RateLimiter{ Name: p.Name, Scope: p.Scope, @@ -49,6 +51,8 @@ func RateLimiterFromProto(p *proto.RateLimiterDefinition) (*RateLimiter, error) if res.Scope == nil { res.Scope = []string{} } + res.ImageRef = ociinstaller.NewSteampipeImageRef(pluginImageRef) + res.Plugin = res.ImageRef.GetFriendlyName() return res, nil } diff --git a/pkg/steampipeconfig/parse/connection.go b/pkg/steampipeconfig/parse/connection.go index c13b602b8..e139ed98d 100644 --- a/pkg/steampipeconfig/parse/connection.go +++ b/pkg/steampipeconfig/parse/connection.go @@ -30,11 +30,11 @@ func DecodeConnection(block *hcl.Block) (*modconfig.Connection, hcl.Diagnostics) } if strings.HasPrefix(pluginName, "local/") { - connection.PluginLongName = pluginName + connection.Plugin = pluginName } else { - connection.PluginLongName = ociinstaller.NewSteampipeImageRef(pluginName).DisplayImageRef() + connection.Plugin = ociinstaller.NewSteampipeImageRef(pluginName).DisplayImageRef() } - connection.PluginShortName = pluginName + connection.PluginAlias = pluginName if connectionContent.Attributes["type"] != nil { var connectionType string diff --git a/pkg/steampipeconfig/steampipeconfig.go b/pkg/steampipeconfig/steampipeconfig.go index 0eaff5636..ffab68862 100644 --- a/pkg/steampipeconfig/steampipeconfig.go +++ b/pkg/steampipeconfig/steampipeconfig.go @@ -18,7 +18,7 @@ import ( // SteampipeConfig is a struct to hold Connection map and Steampipe options type SteampipeConfig struct { - // map of plugin configs + // map of plugin configs, keyed by plugin image ref Plugins map[string]*modconfig.Plugin // map of connection name to partially parsed connection config Connections map[string]*modconfig.Connection @@ -290,7 +290,7 @@ func (c *SteampipeConfig) ConnectionsForPlugin(pluginLongName string, pluginVers var res []*modconfig.Connection for _, con := range c.Connections { // extract stream from plugin - ref := ociinstaller.NewSteampipeImageRef(con.PluginLongName) + ref := ociinstaller.NewSteampipeImageRef(con.Plugin) org, plugin, stream := ref.GetOrgNameAndStream() longName := fmt.Sprintf("%s/%s", org, plugin) if longName == pluginLongName { @@ -328,12 +328,12 @@ func (c *SteampipeConfig) ConnectionList() []*modconfig.Connection { return res } -// ensure we have a plugin config struct for all plugins mentioned in conneciton config, +// ensure we have a plugin config struct for all plugins mentioned in connection config, // even if there is not an explicit HCL config for it func (c *SteampipeConfig) initializePlugins() { for _, connection := range c.Connections { - if c.Plugins[connection.PluginShortName] == nil { - c.Plugins[connection.PluginShortName] = &modconfig.Plugin{Source: connection.PluginShortName} + if c.Plugins[connection.Plugin] == nil { + c.Plugins[connection.Plugin] = modconfig.NewPlugin(connection) } } log.Printf("[INFO] connections: %s plugins: %s", strings.Join(maps.Keys(c.Connections), ","), strings.Join(maps.Keys(c.Plugins), ","))