Key the rate limiter and plugin config maps by plugin image ref, not short name. Closes #3820

This commit is contained in:
kaidaguerre
2023-09-11 15:56:35 +01:00
committed by GitHub
parent fa5240064f
commit 95fed2ed2a
18 changed files with 174 additions and 149 deletions

View File

@@ -12,8 +12,8 @@ func NewConnectionConfigMap(connectionMap map[string]*modconfig.Connection) Conn
for k, v := range connectionMap {
configMap[k] = &sdkproto.ConnectionConfig{
Connection: v.Name,
Plugin: v.PluginLongName,
PluginShortName: v.PluginShortName,
Plugin: v.Plugin,
PluginShortName: v.PluginAlias,
Config: v.Config,
ChildConnections: v.GetResolveConnectionNames(),
}

View File

@@ -5,25 +5,7 @@ import (
"golang.org/x/exp/maps"
)
// map of plugin short name to Limiter map for the plugin
type PluginLimiterMap map[string]LimiterMap
func (l PluginLimiterMap) Equals(other PluginLimiterMap) bool {
return maps.EqualFunc(l, other, func(m1, m2 LimiterMap) bool { return m1.Equals(m2) })
}
type PluginMap map[string]*modconfig.Plugin
func (p PluginMap) ToPluginLimiterMap() PluginLimiterMap {
var limiterPluginMap = make(PluginLimiterMap)
for name, p := range p {
if len(p.Limiters) > 0 {
limiterPluginMap[name] = NewLimiterMap(p.Limiters)
}
}
return limiterPluginMap
}
// LimiterMap is a map of limiter name to limiter definition
type LimiterMap map[string]*modconfig.RateLimiter
func NewLimiterMap(limiters []*modconfig.RateLimiter) LimiterMap {
@@ -37,7 +19,7 @@ func (l LimiterMap) Equals(other LimiterMap) bool {
return maps.EqualFunc(l, other, func(l1, l2 *modconfig.RateLimiter) bool { return l1.Equals(l2) })
}
// ToPluginLimiterMap converts limiter map keyed by limiter name to a map of limiter maps keyed by plugin
// ToPluginLimiterMap converts limiter map keyed by limiter name to a map of limiter maps keyed by plugin image ref
func (l LimiterMap) ToPluginLimiterMap() PluginLimiterMap {
res := make(PluginLimiterMap)
for name, limiter := range l {

View File

@@ -0,0 +1,25 @@
package connection
import (
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"golang.org/x/exp/maps"
)
// PluginLimiterMap map of plugin image ref to Limiter map for the plugin
type PluginLimiterMap map[string]LimiterMap
func (l PluginLimiterMap) Equals(other PluginLimiterMap) bool {
return maps.EqualFunc(l, other, func(m1, m2 LimiterMap) bool { return m1.Equals(m2) })
}
type PluginMap map[string]*modconfig.Plugin
func (p PluginMap) ToPluginLimiterMap() PluginLimiterMap {
var limiterPluginMap = make(PluginLimiterMap)
for imageRef, p := range p {
if len(p.Limiters) > 0 {
limiterPluginMap[imageRef] = NewLimiterMap(p.Limiters)
}
}
return limiterPluginMap
}

View File

@@ -184,7 +184,7 @@ VALUES($1,$2,$3,$4,$5,$6,$7,$8,$9,now(),now())
c.Type,
c.ImportSchema,
nil,
c.PluginLongName,
c.Plugin,
schemaMode,
schemaHash,
commentsSet,

View File

@@ -28,7 +28,7 @@ const (
// constants for installing db and fdw images
const (
DatabaseVersion = "14.2.0"
FdwVersion = "1.8.0-rc.6"
FdwVersion = "1.8.0-rc.7"
// PostgresImageRef is the OCI Image ref for the database binaries
PostgresImageRef = "us-docker.pkg.dev/steampipe/steampipe/db:14.2.0"

View File

@@ -16,12 +16,12 @@ const (
DefaultImageType = "plugins"
)
// SteampipeImageRef :: a ref to an OCI image
// SteampipeImageRef a struct encapsulating a ref to an OCI image
type SteampipeImageRef struct {
requestedRef string
}
// NewSteampipeImageRef :: creates and returns a New SteampipeImageRef
// NewSteampipeImageRef creates and returns a New SteampipeImageRef
func NewSteampipeImageRef(ref string) *SteampipeImageRef {
ref = sanitizeRefStream(ref)
return &SteampipeImageRef{
@@ -29,7 +29,7 @@ func NewSteampipeImageRef(ref string) *SteampipeImageRef {
}
}
// ActualImageRef :: returns the actual, physical full image ref
// ActualImageRef returns the actual, physical full image ref
// (us-docker.pkg.dev/steampipe/plugins/turbot/aws:1.0.0)
func (r *SteampipeImageRef) ActualImageRef() string {
ref := r.requestedRef
@@ -47,7 +47,7 @@ func (r *SteampipeImageRef) ActualImageRef() string {
return fullRef
}
// DisplayImageRef :: returns the "friendly" user-facing full image ref
// DisplayImageRef returns the "friendly" user-facing full image ref
// (hub.steampipe.io/plugins/turbot/aws@1.0.0)
func (r *SteampipeImageRef) DisplayImageRef() string {
fullRef := r.ActualImageRef()
@@ -84,8 +84,7 @@ func sanitizeRefStream(ref string) string {
return ref
}
// GetOrgNameAndStream :: splits the full image reference into
// (org, name, stream)
// GetOrgNameAndStream splits the full image reference into (org, name, stream)
func (r *SteampipeImageRef) GetOrgNameAndStream() (string, string, string) {
// plugin.Name looks like `hub.steampipe.io/plugins/turbot/aws@latest`
split := strings.Split(r.DisplayImageRef(), "/")
@@ -96,6 +95,18 @@ func (r *SteampipeImageRef) GetOrgNameAndStream() (string, string, string) {
return strings.Join(split[0:len(split)-2], "/"), pluginNameAndStream[0], pluginNameAndStream[1]
}
// GetFriendlyName returns a friendly name:
// hub.steampipe.io/plugins/turbot/aws@1.0.0 => aws@1.0.0
// hub.steampipe.io/plugins/turbot/aws@latest => aws
func (r *SteampipeImageRef) GetFriendlyName() string {
_, pluginName, pluginStream := r.GetOrgNameAndStream()
if pluginStream == DefaultImageTag {
return pluginName
} else {
return fmt.Sprintf("%s@%s", pluginName, pluginStream)
}
}
// possible formats include
// us-docker.pkg.dev/steampipe/plugin/turbot/aws:1.0.0
// us-docker.pkg.dev/steampipe/plugin/turbot/aws@sha256:766389c9dd892132c7e7b9124f446b9599a80863d466cd1d333a167dedf2c2b1

View File

@@ -61,19 +61,16 @@ type PluginManager struct {
logger hclog.Logger
messageServer *PluginMessageServer
// map of user configured rate limiter maps, keyed by plugin short name
// map of user configured rate limiter maps, keyed by plugin image ref
// NOTE: this is populated from config
userLimiters connection.PluginLimiterMap
// map of plugin configured rate limiter maps, keyed by plugin short name
// map of plugin configured rate limiter maps (keyed by plugin image ref)
// NOTE: if this is nil, that means the steampipe_rate_limiter tables has not been populalated yet -
// the first time we refresh connections we must load all plugins and fetch their rate limiter defs
pluginLimiters connection.PluginLimiterMap
// map of plugin configs
// map of plugin configs (keyed by plugin image ref)
plugins connection.PluginMap
// map of plugin short name to long name
pluginShortToLongNameMap map[string]string
pluginLongToShortNameMap map[string]string
pool *pgxpool.Pool
}
@@ -81,18 +78,15 @@ type PluginManager struct {
func NewPluginManager(ctx context.Context, connectionConfig map[string]*sdkproto.ConnectionConfig, pluginConfigs connection.PluginMap, logger hclog.Logger) (*PluginManager, error) {
log.Printf("[INFO] NewPluginManager")
pluginManager := &PluginManager{
logger: logger,
runningPluginMap: make(map[string]*runningPlugin),
connectionConfigMap: connectionConfig,
userLimiters: pluginConfigs.ToPluginLimiterMap(),
plugins: pluginConfigs,
pluginShortToLongNameMap: make(map[string]string),
pluginLongToShortNameMap: make(map[string]string),
logger: logger,
runningPluginMap: make(map[string]*runningPlugin),
connectionConfigMap: connectionConfig,
userLimiters: pluginConfigs.ToPluginLimiterMap(),
plugins: pluginConfigs,
}
pluginManager.messageServer = &PluginMessageServer{pluginManager: pluginManager}
//time.Sleep(10 * time.Second)
// populate plugin connection config map
pluginManager.populatePluginConnectionConfigs()
// determine cache size for each plugin
@@ -463,7 +457,7 @@ func (m *PluginManager) startPluginProcess(pluginName string, connectionConfigs
cmd := exec.Command(pluginPath)
// see if a plugin config was specified - if so, get the max memory to allow the plugin
pluginConfig := m.plugins[exemplarConnectionConfig.PluginShortName]
pluginConfig := m.plugins[exemplarConnectionConfig.Plugin]
// must be there
if pluginConfig == nil {
return nil, sperr.New("no plugin config is stored for plugin %s", exemplarConnectionConfig.PluginShortName)
@@ -597,9 +591,6 @@ func (m *PluginManager) populatePluginConnectionConfigs() {
m.pluginConnectionConfigMap = make(map[string][]*sdkproto.ConnectionConfig)
for _, config := range m.connectionConfigMap {
m.pluginConnectionConfigMap[config.Plugin] = append(m.pluginConnectionConfigMap[config.Plugin], config)
// populate plugin name map
m.pluginShortToLongNameMap[config.PluginShortName] = config.Plugin
m.pluginLongToShortNameMap[config.Plugin] = config.PluginShortName
}
}

View File

@@ -11,6 +11,7 @@ import (
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
"github.com/turbot/steampipe/pkg/ociinstaller"
pb "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/proto"
"github.com/turbot/steampipe/pkg/rate_limiters"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
@@ -27,7 +28,7 @@ func (m *PluginManager) ShouldFetchRateLimiterDefs() bool {
// for all plugins with changed limiters
func (m *PluginManager) HandlePluginLimiterChanges(newLimiters connection.PluginLimiterMap) error {
if m.pluginLimiters == nil {
// this must be the first time we have poplkated them
// this must be the first time we have populated them
m.pluginLimiters = make(connection.PluginLimiterMap)
}
for plugin, limitersForPlugin := range newLimiters {
@@ -109,13 +110,9 @@ func (m *PluginManager) handleUserLimiterChanges(plugins connection.PluginMap) e
func (m *PluginManager) setRateLimitersForPlugin(pluginShortName string) error {
// get running plugin for this plugin
// if plugin is not running we have nothing to do
longName, ok := m.pluginShortToLongNameMap[pluginShortName]
if !ok {
log.Printf("[INFO] handleUserLimiterChanges: plugin %s is not currently running - ignoring", pluginShortName)
return nil
}
runningPlugin, ok := m.runningPluginMap[longName]
imageRef := ociinstaller.NewSteampipeImageRef(pluginShortName).DisplayImageRef()
runningPlugin, ok := m.runningPluginMap[imageRef]
if !ok {
log.Printf("[INFO] handleUserLimiterChanges: plugin %s is not currently running - ignoring", pluginShortName)
return nil
@@ -125,13 +122,13 @@ func (m *PluginManager) setRateLimitersForPlugin(pluginShortName string) error {
return nil
}
pluginClient, err := sdkgrpc.NewPluginClient(runningPlugin.client, longName)
pluginClient, err := sdkgrpc.NewPluginClient(runningPlugin.client, imageRef)
if err != nil {
return sperr.WrapWithMessage(err, "failed to create a plugin client when updating the rate limiter for plugin '%s'", longName)
return sperr.WrapWithMessage(err, "failed to create a plugin client when updating the rate limiter for plugin '%s'", imageRef)
}
if err := m.setRateLimiters(pluginShortName, pluginClient); err != nil {
return sperr.WrapWithMessage(err, "failed to update rate limiters for plugin '%s'", longName)
return sperr.WrapWithMessage(err, "failed to update rate limiters for plugin '%s'", imageRef)
}
return nil
}
@@ -318,26 +315,24 @@ func (m *PluginManager) LoadPluginRateLimiters(pluginConnectionMap map[string]st
if rateLimiterResp == nil || rateLimiterResp.Definitions == nil {
continue
}
// populate the plugin name
pluginShortName := m.pluginLongToShortNameMap[reattach.Plugin]
limitersForPlugin := make(connection.LimiterMap)
for _, l := range rateLimiterResp.Definitions {
r, err := modconfig.RateLimiterFromProto(l)
r, err := modconfig.RateLimiterFromProto(l, reattach.Plugin)
if err != nil {
errors = append(errors, sperr.WrapWithMessage(err, "failed to create rate limiter %s from plugin definition", err))
continue
}
r.Plugin = pluginShortName
// set plugin as source
r.Source = modconfig.LimiterSourcePlugin
// derfaulty status to active
// default status to active
r.Status = modconfig.LimiterStatusActive
// add to map
limitersForPlugin[l.Name] = r
}
// store back
res[pluginShortName] = limitersForPlugin
res[reattach.Plugin] = limitersForPlugin
}
return res, nil

View File

@@ -123,7 +123,7 @@ func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectio
res.AddWarning(fmt.Sprintf("failed to start plugin '%s': %s", failedPlugin, failure))
// figure out which connections are provided by any failed plugins
for _, c := range connectionsToCreate {
if c.PluginLongName == failedPlugin {
if c.Plugin == failedPlugin {
res.AddFailedConnection(c.Name, constants.ConnectionErrorPluginFailedToStart)
}
}
@@ -134,8 +134,8 @@ func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectio
for _, connection := range connectionsToCreate {
// is this connection provided by a plugin we have already instantiated?
if existingConnectionPlugin, ok := multiConnectionPlugins[connection.PluginLongName]; ok {
log.Printf("[TRACE] CreateConnectionPlugins - connection %s is provided by existing connectionPlugin %s - reusing", connection.Name, connection.PluginLongName)
if existingConnectionPlugin, ok := multiConnectionPlugins[connection.Plugin]; ok {
log.Printf("[TRACE] CreateConnectionPlugins - connection %s is provided by existing connectionPlugin %s - reusing", connection.Name, connection.Plugin)
// store the existing connection plugin in the result map
requestedConnectionPluginMap[connection.Name] = existingConnectionPlugin
continue
@@ -143,7 +143,7 @@ func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectio
// do we have a reattach config for this connection's plugin
if _, ok := getResponse.ReattachMap[connection.Name]; !ok {
log.Printf("[TRACE] CreateConnectionPlugins skipping connection '%s', plugin '%s' as plugin manager failed to start it", connection.Name, connection.PluginLongName)
log.Printf("[TRACE] CreateConnectionPlugins skipping connection '%s', plugin '%s' as plugin manager failed to start it", connection.Name, connection.Plugin)
continue
}
@@ -157,13 +157,13 @@ func CreateConnectionPlugins(pluginManager pluginshared.PluginManager, connectio
connectionPlugin, err := createConnectionPlugin(connection, reattach)
if err != nil {
res.AddWarning(fmt.Sprintf("failed to start plugin '%s': %s", connection.PluginShortName, err))
res.AddWarning(fmt.Sprintf("failed to start plugin '%s': %s", connection.PluginAlias, err))
continue
}
requestedConnectionPluginMap[connection.Name] = connectionPlugin
if connectionPlugin.SupportedOperations.MultipleConnections {
// if it supports multiple connections, store in multiConnectionPlugins too
multiConnectionPlugins[connection.PluginLongName] = connectionPlugin
multiConnectionPlugins[connection.Plugin] = connectionPlugin
}
}
log.Printf("[TRACE] all connection plugins created, populating schemas")
@@ -254,7 +254,7 @@ func fullConnectionPluginMap(sparseConnectionPluginMap map[string]*ConnectionPlu
// createConnectionPlugin sttaches to the plugin process
func createConnectionPlugin(connection *modconfig.Connection, reattach *proto.ReattachConfig) (*ConnectionPlugin, error) {
log.Printf("[TRACE] createConnectionPlugin for connection %s", connection.Name)
pluginName := connection.PluginLongName
pluginName := connection.Plugin
connectionName := connection.Name
log.Printf("[TRACE] plugin manager returned reattach config for connection '%s' - pid %d",
@@ -275,7 +275,7 @@ func createConnectionPlugin(connection *modconfig.Connection, reattach *proto.Re
log.Printf("[TRACE] plugin client created for %s", pluginName)
// now create ConnectionPlugin object return
connectionPlugin := NewConnectionPlugin(connection.PluginShortName, pluginName, pluginClient, reattach.SupportedOperations)
connectionPlugin := NewConnectionPlugin(connection.PluginAlias, pluginName, pluginClient, reattach.SupportedOperations)
// if multiple connections are NOT supported, add the config for our one and only connection
if reattach.SupportedOperations == nil || !reattach.SupportedOperations.MultipleConnections {

View File

@@ -34,12 +34,12 @@ func GetRequiredConnectionStateMap(connectionMap map[string]*modconfig.Connectio
utils.LogTime("steampipeconfig.getRequiredConnections config - iteration start")
// populate file mod time for each referenced plugin
for name, connection := range connectionMap {
remoteSchema := connection.PluginLongName
pluginPath, _ := filepaths.GetPluginPath(connection.PluginLongName, connection.PluginShortName)
remoteSchema := connection.Plugin
pluginPath, _ := filepaths.GetPluginPath(connection.Plugin, connection.PluginAlias)
// ignore error if plugin is not available
// if plugin is not installed, the path will be returned as empty
if pluginPath == "" {
missingPluginMap[connection.PluginLongName] = append(missingPluginMap[connection.PluginLongName], *connection)
missingPluginMap[connection.Plugin] = append(missingPluginMap[connection.Plugin], *connection)
continue
}

View File

@@ -152,7 +152,7 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, pluginMa
// we need to refetch the rate limiters for
if res.pluginBinaryChanged {
// store map item of plugin name to connection name (so we only have one entry per plugin)
pluginShortName := GlobalConfig.Connections[requiredConnectionState.ConnectionName].PluginShortName
pluginShortName := GlobalConfig.Connections[requiredConnectionState.ConnectionName].PluginAlias
updates.PluginsWithUpdatedBinary[pluginShortName] = requiredConnectionState.ConnectionName
}
}

View File

@@ -256,11 +256,14 @@ func loadConfig(configFolder string, steampipeConfig *SteampipeConfig, opts *loa
if moreDiags.HasErrors() {
continue
}
_, alreadyThere := steampipeConfig.Plugins[plugin.Source]
// get the _display_ image ref to key the map
imageRef := plugin.GetImageRef()
_, alreadyThere := steampipeConfig.Plugins[imageRef]
if alreadyThere {
return error_helpers.NewErrorsAndWarning(sperr.New("duplicate plugin: '%s' in '%s'", plugin.Source, block.TypeRange.Filename))
}
steampipeConfig.Plugins[plugin.Source] = plugin
steampipeConfig.Plugins[imageRef] = plugin
case modconfig.BlockTypeConnection:
connection, moreDiags := parse.DecodeConnection(block)

View File

@@ -47,12 +47,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{
expected: &SteampipeConfig{
Connections: map[string]*modconfig.Connection{
"aws_dmi_001": {
Name: "aws_dmi_001",
PluginShortName: "aws",
PluginLongName: "hub.steampipe.io/plugins/turbot/aws@latest",
Type: "",
ImportSchema: "enabled",
Config: "access_key = \"aws_dmi_001_access_key\"\nregions = \"- us-east-1\\n-us-west-\"\nsecret_key = \"aws_dmi_001_secret_key\"\n",
Name: "aws_dmi_001",
PluginAlias: "aws",
Plugin: "hub.steampipe.io/plugins/turbot/aws@latest",
Type: "",
ImportSchema: "enabled",
Config: "access_key = \"aws_dmi_001_access_key\"\nregions = \"- us-east-1\\n-us-west-\"\nsecret_key = \"aws_dmi_001_secret_key\"\n",
DeclRange: modconfig.Range{
Filename: "$$test_pwd$$/testdata/connection_config/multiple_connections/config/connection1.spc",
Start: modconfig.Pos{
@@ -68,12 +68,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{
},
},
"aws_dmi_002": {
Name: "aws_dmi_002",
PluginShortName: "aws",
PluginLongName: "hub.steampipe.io/plugins/turbot/aws@latest",
Type: "",
ImportSchema: "enabled",
Config: "access_key = \"aws_dmi_002_access_key\"\nregions = \"- us-east-1\\n-us-west-\"\nsecret_key = \"aws_dmi_002_secret_key\"\n",
Name: "aws_dmi_002",
PluginAlias: "aws",
Plugin: "hub.steampipe.io/plugins/turbot/aws@latest",
Type: "",
ImportSchema: "enabled",
Config: "access_key = \"aws_dmi_002_access_key\"\nregions = \"- us-east-1\\n-us-west-\"\nsecret_key = \"aws_dmi_002_secret_key\"\n",
DeclRange: modconfig.Range{
Filename: "$$test_pwd$$/testdata/connection_config/multiple_connections/config/connection2.spc",
Start: modconfig.Pos{
@@ -100,12 +100,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{
expected: &SteampipeConfig{
Connections: map[string]*modconfig.Connection{
"a": {
Name: "a",
PluginShortName: "test_data/connection-test-1",
PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Type: "",
ImportSchema: "enabled",
Config: "",
Name: "a",
PluginAlias: "test_data/connection-test-1",
Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Type: "",
ImportSchema: "enabled",
Config: "",
DeclRange: modconfig.Range{
Filename: "$$test_pwd$$/testdata/connection_config/single_connection/config/connection1.spc",
Start: modconfig.Pos{
@@ -132,12 +132,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{
expected: &SteampipeConfig{
Connections: map[string]*modconfig.Connection{
"a": {
Name: "a",
PluginShortName: "test_data/connection-test-1",
PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Type: "",
ImportSchema: "enabled",
Config: "",
Name: "a",
PluginAlias: "test_data/connection-test-1",
Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Type: "",
ImportSchema: "enabled",
Config: "",
DeclRange: modconfig.Range{
Filename: "$$test_pwd$$/testdata/connection_config/single_connection_with_default_options/config/connection1.spc",
Start: modconfig.Pos{
@@ -178,12 +178,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{
expected: &SteampipeConfig{
Connections: map[string]*modconfig.Connection{
"a": {
Name: "a",
PluginShortName: "test_data/connection-test-1",
PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Type: "",
ImportSchema: "enabled",
Config: "",
Name: "a",
PluginAlias: "test_data/connection-test-1",
Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Type: "",
ImportSchema: "enabled",
Config: "",
DeclRange: modconfig.Range{
Filename: "$$test_pwd$$/testdata/connection_config/single_connection_with_default_options/config/connection1.spc",
Start: modconfig.Pos{
@@ -219,12 +219,12 @@ var testCasesLoadConfig = map[string]loadConfigTest{
expected: &SteampipeConfig{
Connections: map[string]*modconfig.Connection{
"a": {
Name: "a",
PluginShortName: "test_data/connection-test-1",
PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Type: "",
ImportSchema: "enabled",
Config: "",
Name: "a",
PluginAlias: "test_data/connection-test-1",
Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Type: "",
ImportSchema: "enabled",
Config: "",
DeclRange: modconfig.Range{
Filename: "$$test_pwd$$/testdata/connection_config/single_connection_with_default_options/config/connection1.spc",
Start: modconfig.Pos{
@@ -259,11 +259,11 @@ var testCasesLoadConfig = map[string]loadConfigTest{
expected: &SteampipeConfig{
Connections: map[string]*modconfig.Connection{
"a": {
Name: "a",
ImportSchema: "enabled",
PluginShortName: "test_data/connection-test-1",
PluginLongName: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Config: "",
Name: "a",
ImportSchema: "enabled",
PluginAlias: "test_data/connection-test-1",
Plugin: "hub.steampipe.io/plugins/test_data/connection-test-1@latest",
Config: "",
Options: &options.Connection{
Cache: &trueVal,
CacheTTL: &ttlVal,

View File

@@ -32,9 +32,9 @@ type Connection struct {
// connection name
Name string `json:"name,omitempty"`
// name of plugin as mentioned in config
PluginShortName string `json:"plugin_short_name,omitempty"`
PluginAlias string `json:"plugin_short_name,omitempty"`
// fully qualified name of the plugin. derived from the short name
PluginLongName string `json:"plugin,omitempty"`
Plugin string `json:"plugin,omitempty"`
// connection type - supported values: "aggregator"
Type string `json:"type,omitempty"`
@@ -126,7 +126,7 @@ func (c *Connection) Equals(other *Connection) bool {
connectionOptionsEqual = c.Options.Equals(other.Options)
}
return c.Name == other.Name &&
c.PluginLongName == other.PluginLongName &&
c.Plugin == other.Plugin &&
c.Type == other.Type &&
strings.Join(c.ConnectionNames, ",") == strings.Join(other.ConnectionNames, ",") &&
connectionOptionsEqual &&
@@ -153,7 +153,7 @@ func (c *Connection) SetOptions(opts options.Options, block *hcl.Block) hcl.Diag
}
func (c *Connection) String() string {
return fmt.Sprintf("\n----\nName: %s\nPlugin: %s\nConfig:\n%s\nOptions:\n%s\n", c.Name, c.PluginLongName, c.Config, c.Options.String())
return fmt.Sprintf("\n----\nName: %s\nPlugin: %s\nConfig:\n%s\nOptions:\n%s\n", c.Name, c.Plugin, c.Config, c.Options.String())
}
// Validate verifies the Type property is valid,
@@ -194,13 +194,13 @@ func (c *Connection) ValidateAggregatorConnection() (warnings, errors []string)
// now ensure all child connections are loaded and use the same plugin as the parent connection
for _, childConnection := range c.Connections {
if childConnection.PluginLongName != c.PluginLongName {
if childConnection.Plugin != c.Plugin {
validationErrors = append(validationErrors,
fmt.Sprintf("aggregator connection '%s' uses plugin %s but child connection '%s' uses plugin '%s'",
c.Name,
c.PluginLongName,
c.Plugin,
childConnection.Name,
childConnection.PluginLongName,
childConnection.Plugin,
))
}
@@ -246,7 +246,7 @@ func (c *Connection) PopulateChildren(connectionMap map[string]*Connection) {
}
if match, _ := path.Match(childName, name); match {
// verify that this connection is of a compatible type
if connection.PluginLongName == c.PluginLongName {
if connection.Plugin == c.Plugin {
c.Connections[name] = connection
log.Printf("[TRACE] connection '%s' matches pattern '%s'", name, childName)
}

View File

@@ -3,6 +3,7 @@ package modconfig
import (
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/hclsyntax"
"github.com/turbot/steampipe/pkg/ociinstaller"
)
type Plugin struct {
@@ -12,14 +13,27 @@ type Plugin struct {
FileName *string
StartLineNumber *int
EndLineNumber *int
imageRef *ociinstaller.SteampipeImageRef
}
func NewPlugin(connection *Connection) *Plugin {
return &Plugin{
Source: connection.PluginAlias,
imageRef: ociinstaller.NewSteampipeImageRef(connection.PluginAlias),
}
}
func (l *Plugin) OnDecoded(block *hcl.Block) {
l.FileName = &block.DefRange.Filename
l.StartLineNumber = &block.Body.(*hclsyntax.Body).SrcRange.Start.Line
l.EndLineNumber = &block.Body.(*hclsyntax.Body).SrcRange.End.Line
l.imageRef = ociinstaller.NewSteampipeImageRef(l.Source)
}
func (l *Plugin) GetMaxMemoryBytes() int64 {
return int64(1024 * 1024 * l.MaxMemoryMb)
}
func (l *Plugin) GetImageRef() string {
return l.imageRef.DisplayImageRef()
}

View File

@@ -4,6 +4,7 @@ import (
"github.com/hashicorp/hcl/v2"
"github.com/hashicorp/hcl/v2/hclsyntax"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe/pkg/ociinstaller"
"sort"
"strings"
)
@@ -16,22 +17,23 @@ const (
)
type RateLimiter struct {
Name string `hcl:"name,label" db:"name"`
BucketSize *int64 `hcl:"bucket_size,optional" db:"bucket_size"`
FillRate *float32 `hcl:"fill_rate,optional" db:"fill_rate"`
MaxConcurrency *int64 `hcl:"max_concurrency,optional" db:"max_concurrency"`
Scope []string `hcl:"scope,optional" db:"scope"`
Where *string `hcl:"where,optional" db:"where"`
Plugin string `db:"plugin"`
FileName *string `db:"file_name"`
StartLineNumber *int `db:"start_line_number"`
EndLineNumber *int `db:"end_line_number"`
Status string `db:"status"`
Source string `db:"source"`
Name string `hcl:"name,label" db:"name"`
BucketSize *int64 `hcl:"bucket_size,optional" db:"bucket_size"`
FillRate *float32 `hcl:"fill_rate,optional" db:"fill_rate"`
MaxConcurrency *int64 `hcl:"max_concurrency,optional" db:"max_concurrency"`
Scope []string `hcl:"scope,optional" db:"scope"`
Where *string `hcl:"where,optional" db:"where"`
Plugin string `db:"plugin"`
FileName *string `db:"file_name"`
StartLineNumber *int `db:"start_line_number"`
EndLineNumber *int `db:"end_line_number"`
Status string `db:"status"`
Source string `db:"source"`
ImageRef *ociinstaller.SteampipeImageRef `db:"-"`
}
// RateLimiterFromProto converts the proto format RateLimiterDefinition into a Defintion
func RateLimiterFromProto(p *proto.RateLimiterDefinition) (*RateLimiter, error) {
func RateLimiterFromProto(p *proto.RateLimiterDefinition, pluginImageRef string) (*RateLimiter, error) {
var res = &RateLimiter{
Name: p.Name,
Scope: p.Scope,
@@ -49,6 +51,8 @@ func RateLimiterFromProto(p *proto.RateLimiterDefinition) (*RateLimiter, error)
if res.Scope == nil {
res.Scope = []string{}
}
res.ImageRef = ociinstaller.NewSteampipeImageRef(pluginImageRef)
res.Plugin = res.ImageRef.GetFriendlyName()
return res, nil
}

View File

@@ -30,11 +30,11 @@ func DecodeConnection(block *hcl.Block) (*modconfig.Connection, hcl.Diagnostics)
}
if strings.HasPrefix(pluginName, "local/") {
connection.PluginLongName = pluginName
connection.Plugin = pluginName
} else {
connection.PluginLongName = ociinstaller.NewSteampipeImageRef(pluginName).DisplayImageRef()
connection.Plugin = ociinstaller.NewSteampipeImageRef(pluginName).DisplayImageRef()
}
connection.PluginShortName = pluginName
connection.PluginAlias = pluginName
if connectionContent.Attributes["type"] != nil {
var connectionType string

View File

@@ -18,7 +18,7 @@ import (
// SteampipeConfig is a struct to hold Connection map and Steampipe options
type SteampipeConfig struct {
// map of plugin configs
// map of plugin configs, keyed by plugin image ref
Plugins map[string]*modconfig.Plugin
// map of connection name to partially parsed connection config
Connections map[string]*modconfig.Connection
@@ -290,7 +290,7 @@ func (c *SteampipeConfig) ConnectionsForPlugin(pluginLongName string, pluginVers
var res []*modconfig.Connection
for _, con := range c.Connections {
// extract stream from plugin
ref := ociinstaller.NewSteampipeImageRef(con.PluginLongName)
ref := ociinstaller.NewSteampipeImageRef(con.Plugin)
org, plugin, stream := ref.GetOrgNameAndStream()
longName := fmt.Sprintf("%s/%s", org, plugin)
if longName == pluginLongName {
@@ -328,12 +328,12 @@ func (c *SteampipeConfig) ConnectionList() []*modconfig.Connection {
return res
}
// ensure we have a plugin config struct for all plugins mentioned in conneciton config,
// ensure we have a plugin config struct for all plugins mentioned in connection config,
// even if there is not an explicit HCL config for it
func (c *SteampipeConfig) initializePlugins() {
for _, connection := range c.Connections {
if c.Plugins[connection.PluginShortName] == nil {
c.Plugins[connection.PluginShortName] = &modconfig.Plugin{Source: connection.PluginShortName}
if c.Plugins[connection.Plugin] == nil {
c.Plugins[connection.Plugin] = modconfig.NewPlugin(connection)
}
}
log.Printf("[INFO] connections: %s plugins: %s", strings.Join(maps.Keys(c.Connections), ","), strings.Join(maps.Keys(c.Plugins), ","))