steampipe compiles

This commit is contained in:
kai
2024-09-02 15:58:37 +01:00
committed by Puskar Basu
parent fca92eb5c2
commit 80ad514e1d
16 changed files with 82 additions and 187 deletions

View File

@@ -5,6 +5,7 @@ import (
"encoding/json" "encoding/json"
"errors" "errors"
"fmt" "fmt"
"github.com/turbot/steampipe/pkg/ociinstaller"
"strings" "strings"
"sync" "sync"
"time" "time"
@@ -487,7 +488,7 @@ func runPluginUpdateCmd(cmd *cobra.Command, args []string) {
defer cancel() defer cancel()
statushooks.SetStatus(ctx, "Checking for available updates") statushooks.SetStatus(ctx, "Checking for available updates")
reports := plugin.GetUpdateReport(timeoutCtx, state.InstallationID, runUpdatesFor, constants.SteampipeHubOCIBase) reports := plugin.GetUpdateReport(timeoutCtx, state.InstallationID, runUpdatesFor)
statushooks.Done(ctx) statushooks.Done(ctx)
if len(reports) == 0 { if len(reports) == 0 {
// this happens if for some reason the update server could not be contacted, // this happens if for some reason the update server could not be contacted,
@@ -592,7 +593,7 @@ func installPlugin(ctx context.Context, resolvedPlugin plugin.ResolvedPluginVers
} }
}() }()
image, err := plugin.Install(ctx, resolvedPlugin, progress, constants.BaseImageRef, putils.WithSkipConfig(viper.GetBool(constants.ArgSkipConfig))) image, err := plugin.Install(ctx, resolvedPlugin, progress, constants.BaseImageRef, ociinstaller.SteampipeMediaTypeProvider{}, putils.WithSkipConfig(viper.GetBool(constants.ArgSkipConfig)))
if err != nil { if err != nil {
msg := "" msg := ""
// used to build data for the plugin install report to be used for display purposes // used to build data for the plugin install report to be used for display purposes
@@ -679,7 +680,7 @@ func runPluginListCmd(cmd *cobra.Command, _ []string) {
} }
func showPluginListOutput(pluginList []plugin.PluginListItem, failedPluginMap, missingPluginMap map[string][]*modconfig.Connection, res perror_helpers.ErrorAndWarnings, outputFormat string) error { func showPluginListOutput(pluginList []plugin.PluginListItem, failedPluginMap, missingPluginMap map[string][]plugin.PluginConnection, res perror_helpers.ErrorAndWarnings, outputFormat string) error {
switch outputFormat { switch outputFormat {
case "table": case "table":
return showPluginListAsTable(pluginList, failedPluginMap, missingPluginMap, res) return showPluginListAsTable(pluginList, failedPluginMap, missingPluginMap, res)
@@ -690,7 +691,7 @@ func showPluginListOutput(pluginList []plugin.PluginListItem, failedPluginMap, m
} }
} }
func showPluginListAsTable(pluginList []plugin.PluginListItem, failedPluginMap, missingPluginMap map[string][]*modconfig.Connection, res perror_helpers.ErrorAndWarnings) error { func showPluginListAsTable(pluginList []plugin.PluginListItem, failedPluginMap, missingPluginMap map[string][]plugin.PluginConnection, res perror_helpers.ErrorAndWarnings) error {
headers := []string{"Installed", "Version", "Connections"} headers := []string{"Installed", "Version", "Connections"}
var rows [][]string var rows [][]string
// List installed plugins in a table // List installed plugins in a table
@@ -713,7 +714,7 @@ func showPluginListAsTable(pluginList []plugin.PluginListItem, failedPluginMap,
// failed plugins // failed plugins
for p, item := range failedPluginMap { for p, item := range failedPluginMap {
for _, conn := range item { for _, conn := range item {
conns = append(conns, conn.Name) conns = append(conns, conn.GetName())
} }
missingRows = append(missingRows, []string{p, strings.Join(conns, ","), constants.ConnectionErrorPluginFailedToStart}) missingRows = append(missingRows, []string{p, strings.Join(conns, ","), constants.ConnectionErrorPluginFailedToStart})
conns = []string{} conns = []string{}
@@ -722,7 +723,7 @@ func showPluginListAsTable(pluginList []plugin.PluginListItem, failedPluginMap,
// missing plugins // missing plugins
for p, item := range missingPluginMap { for p, item := range missingPluginMap {
for _, conn := range item { for _, conn := range item {
conns = append(conns, conn.Name) conns = append(conns, conn.GetName())
} }
missingRows = append(missingRows, []string{p, strings.Join(conns, ","), constants.InstallMessagePluginNotInstalled}) missingRows = append(missingRows, []string{p, strings.Join(conns, ","), constants.InstallMessagePluginNotInstalled})
conns = []string{} conns = []string{}
@@ -740,7 +741,7 @@ func showPluginListAsTable(pluginList []plugin.PluginListItem, failedPluginMap,
return nil return nil
} }
func showPluginListAsJSON(pluginList []plugin.PluginListItem, failedPluginMap, missingPluginMap map[string][]*modconfig.Connection, res perror_helpers.ErrorAndWarnings) error { func showPluginListAsJSON(pluginList []plugin.PluginListItem, failedPluginMap, missingPluginMap map[string][]plugin.PluginConnection, res perror_helpers.ErrorAndWarnings) error {
output := pluginJsonOutput{} output := pluginJsonOutput{}
for _, item := range pluginList { for _, item := range pluginList {
@@ -755,7 +756,7 @@ func showPluginListAsJSON(pluginList []plugin.PluginListItem, failedPluginMap, m
for p, item := range failedPluginMap { for p, item := range failedPluginMap {
connections := make([]string, len(item)) connections := make([]string, len(item))
for i, conn := range item { for i, conn := range item {
connections[i] = conn.Name connections[i] = conn.GetName()
} }
failed := failedPlugin{ failed := failedPlugin{
Name: p, Name: p,
@@ -768,7 +769,7 @@ func showPluginListAsJSON(pluginList []plugin.PluginListItem, failedPluginMap, m
for p, item := range missingPluginMap { for p, item := range missingPluginMap {
connections := make([]string, len(item)) connections := make([]string, len(item))
for i, conn := range item { for i, conn := range item {
connections[i] = conn.Name connections[i] = conn.GetName()
} }
missing := failedPlugin{ missing := failedPlugin{
Name: p, Name: p,
@@ -823,7 +824,7 @@ func runPluginUninstallCmd(cmd *cobra.Command, args []string) {
return return
} }
reports := steampipeconfig.PluginRemoveReports{} reports := plugin.PluginRemoveReports{}
statushooks.SetStatus(ctx, fmt.Sprintf("Uninstalling %s", utils.Pluralize("plugin", len(args)))) statushooks.SetStatus(ctx, fmt.Sprintf("Uninstalling %s", utils.Pluralize("plugin", len(args))))
for _, p := range args { for _, p := range args {
statushooks.SetStatus(ctx, fmt.Sprintf("Uninstalling %s", p)) statushooks.SetStatus(ctx, fmt.Sprintf("Uninstalling %s", p))
@@ -841,7 +842,7 @@ func runPluginUninstallCmd(cmd *cobra.Command, args []string) {
reports.Print() reports.Print()
} }
func getPluginList(ctx context.Context) (pluginList []plugin.PluginListItem, failedPluginMap, missingPluginMap map[string][]*modconfig.Connection, res perror_helpers.ErrorAndWarnings) { func getPluginList(ctx context.Context) (pluginList []plugin.PluginListItem, failedPluginMap, missingPluginMap map[string][]plugin.PluginConnection, res perror_helpers.ErrorAndWarnings) {
statushooks.Show(ctx) statushooks.Show(ctx)
defer statushooks.Done(ctx) defer statushooks.Done(ctx)
@@ -872,7 +873,7 @@ func getPluginList(ctx context.Context) (pluginList []plugin.PluginListItem, fai
return pluginList, failedPluginMap, missingPluginMap, res return pluginList, failedPluginMap, missingPluginMap, res
} }
func getPluginConnectionMap(ctx context.Context) (pluginConnectionMap, failedPluginMap, missingPluginMap map[string][]*modconfig.Connection, res perror_helpers.ErrorAndWarnings) { func getPluginConnectionMap(ctx context.Context) (pluginConnectionMap, failedPluginMap, missingPluginMap map[string][]plugin.PluginConnection, res perror_helpers.ErrorAndWarnings) {
utils.LogTime("cmd.getPluginConnectionMap start") utils.LogTime("cmd.getPluginConnectionMap start")
defer utils.LogTime("cmd.getPluginConnectionMap end") defer utils.LogTime("cmd.getPluginConnectionMap end")
@@ -887,9 +888,9 @@ func getPluginConnectionMap(ctx context.Context) (pluginConnectionMap, failedPlu
} }
// create the map of failed/missing plugins and available/loaded plugins // create the map of failed/missing plugins and available/loaded plugins
failedPluginMap = map[string][]*modconfig.Connection{} failedPluginMap = map[string][]plugin.PluginConnection{}
missingPluginMap = map[string][]*modconfig.Connection{} missingPluginMap = map[string][]plugin.PluginConnection{}
pluginConnectionMap = make(map[string][]*modconfig.Connection) pluginConnectionMap = make(map[string][]plugin.PluginConnection)
for _, state := range connectionStateMap { for _, state := range connectionStateMap {
connection, ok := steampipeconfig.GlobalConfig.Connections[state.ConnectionName] connection, ok := steampipeconfig.GlobalConfig.Connections[state.ConnectionName]

View File

@@ -2,6 +2,7 @@ package connection
import ( import (
"context" "context"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/turbot/pipe-fittings/error_helpers" "github.com/turbot/pipe-fittings/error_helpers"
"github.com/turbot/pipe-fittings/plugin" "github.com/turbot/pipe-fittings/plugin"
@@ -11,7 +12,7 @@ import (
type pluginManager interface { type pluginManager interface {
shared.PluginManager shared.PluginManager
OnConnectionConfigChanged(context.Context, ConnectionConfigMap, map[string]*modconfig.Plugin) OnConnectionConfigChanged(context.Context, ConnectionConfigMap, map[string]*plugin.Plugin)
GetConnectionConfig() ConnectionConfigMap GetConnectionConfig() ConnectionConfigMap
HandlePluginLimiterChanges(PluginLimiterMap) error HandlePluginLimiterChanges(PluginLimiterMap) error
Pool() *pgxpool.Pool Pool() *pgxpool.Pool

View File

@@ -6,9 +6,9 @@ import (
) )
// LimiterMap is a map of limiter name to limiter definition // LimiterMap is a map of limiter name to limiter definition
type LimiterMap map[string]*modconfig.RateLimiter type LimiterMap map[string]*plugin.RateLimiter
func NewLimiterMap(limiters []*modconfig.RateLimiter) LimiterMap { func NewLimiterMap(limiters []*plugin.RateLimiter) LimiterMap {
res := make(LimiterMap) res := make(LimiterMap)
for _, l := range limiters { for _, l := range limiters {
res[l.Name] = l res[l.Name] = l
@@ -16,7 +16,7 @@ func NewLimiterMap(limiters []*modconfig.RateLimiter) LimiterMap {
return res return res
} }
func (l LimiterMap) Equals(other LimiterMap) bool { func (l LimiterMap) Equals(other LimiterMap) bool {
return maps.EqualFunc(l, other, func(l1, l2 *modconfig.RateLimiter) bool { return l1.Equals(l2) }) return maps.EqualFunc(l, other, func(l1, l2 *plugin.RateLimiter) bool { return l1.Equals(l2) })
} }
// ToPluginLimiterMap converts limiter map keyed by limiter name to a map of limiter maps keyed by plugin image ref // ToPluginLimiterMap converts limiter map keyed by limiter name to a map of limiter maps keyed by plugin image ref

View File

@@ -12,7 +12,7 @@ func (l PluginLimiterMap) Equals(other PluginLimiterMap) bool {
return maps.EqualFunc(l, other, func(m1, m2 LimiterMap) bool { return m1.Equals(m2) }) return maps.EqualFunc(l, other, func(m1, m2 LimiterMap) bool { return m1.Equals(m2) })
} }
type PluginMap map[string]*modconfig.Plugin type PluginMap map[string]*plugin.Plugin
func (p PluginMap) ToPluginLimiterMap() PluginLimiterMap { func (p PluginMap) ToPluginLimiterMap() PluginLimiterMap {
var limiterPluginMap = make(PluginLimiterMap) var limiterPluginMap = make(PluginLimiterMap)
@@ -24,11 +24,11 @@ func (p PluginMap) ToPluginLimiterMap() PluginLimiterMap {
return limiterPluginMap return limiterPluginMap
} }
//func (p PluginMap) Diff(otherMap PluginMap) (added, deleted, changed map[string][]*modconfig.Plugin) { //func (p PluginMap) Diff(otherMap PluginMap) (added, deleted, changed map[string][]*plugin.Plugin) {
// // results are maps of connections keyed by plugin instance // // results are maps of connections keyed by plugin instance
// added = make(map[string][]*modconfig.Plugin) // added = make(map[string][]*plugin.Plugin)
// deleted = make(map[string][]*modconfig.Plugin) // deleted = make(map[string][]*plugin.Plugin)
// changed = make(map[string][]*modconfig.Plugin) // changed = make(map[string][]*plugin.Plugin)
// //
// for name, plugin := range p { // for name, plugin := range p {
// if otherConnection, ok := otherMap[name]; !ok { // if otherConnection, ok := otherMap[name]; !ok {

View File

@@ -166,7 +166,7 @@ func (i *InitData) Init(ctx context.Context, invoker constants.Invoker, opts ...
i.Client = client i.Client = client
} }
func validateModRequirementsRecursively(mod *modconfig.Mod, pluginVersionMap map[string]*modconfig.PluginVersionString) []string { func validateModRequirementsRecursively(mod *modconfig.Mod, pluginVersionMap map[string]*plugin.PluginVersionString) []string {
var validationErrors []string var validationErrors []string
// validate this mod // validate this mod

View File

@@ -2,8 +2,8 @@ package introspection
import ( import (
"fmt" "fmt"
"github.com/turbot/pipe-fittings/plugin"
"github.com/turbot/pipe-fittings/plugin"
"github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common" "github.com/turbot/steampipe/pkg/db/db_common"
) )
@@ -23,7 +23,7 @@ func GetPluginTableCreateSql() db_common.QueryWithArgs {
} }
} }
func GetPluginTablePopulateSql(plugin *modconfig.Plugin) db_common.QueryWithArgs { func GetPluginTablePopulateSql(plugin *plugin.Plugin) db_common.QueryWithArgs {
return db_common.QueryWithArgs{ return db_common.QueryWithArgs{
Query: fmt.Sprintf(`INSERT INTO %s.%s ( Query: fmt.Sprintf(`INSERT INTO %s.%s (
plugin, plugin,

View File

@@ -2,8 +2,8 @@ package introspection
import ( import (
"fmt" "fmt"
"github.com/turbot/pipe-fittings/plugin"
"github.com/turbot/pipe-fittings/plugin"
"github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common" "github.com/turbot/steampipe/pkg/db/db_common"
) )
@@ -38,7 +38,7 @@ func GetRateLimiterTableDropSql() db_common.QueryWithArgs {
} }
} }
func GetRateLimiterTablePopulateSql(settings *modconfig.RateLimiter) db_common.QueryWithArgs { func GetRateLimiterTablePopulateSql(settings *plugin.RateLimiter) db_common.QueryWithArgs {
return db_common.QueryWithArgs{ return db_common.QueryWithArgs{
Query: fmt.Sprintf(`INSERT INTO %s.%s ( Query: fmt.Sprintf(`INSERT INTO %s.%s (
"name", "name",

View File

@@ -3,8 +3,6 @@ package modinstaller
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/turbot/pipe-fittings/plugin"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"log" "log"
"os" "os"
"path" "path"
@@ -14,11 +12,13 @@ import (
git "github.com/go-git/go-git/v5" git "github.com/go-git/go-git/v5"
"github.com/otiai10/copy" "github.com/otiai10/copy"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/turbot/pipe-fittings/plugin"
"github.com/turbot/pipe-fittings/utils" "github.com/turbot/pipe-fittings/utils"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr" "github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe/pkg/constants" "github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/error_helpers" "github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/filepaths" "github.com/turbot/steampipe/pkg/filepaths"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig" "github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/steampipeconfig/parse" "github.com/turbot/steampipe/pkg/steampipeconfig/parse"
"github.com/turbot/steampipe/pkg/steampipeconfig/versionmap" "github.com/turbot/steampipe/pkg/steampipeconfig/versionmap"
@@ -35,7 +35,7 @@ type ModInstaller struct {
oldRequire *modconfig.Require oldRequire *modconfig.Require
// installed plugins // installed plugins
installedPlugins map[string]*modconfig.PluginVersionString installedPlugins map[string]*plugin.PluginVersionString
mods versionmap.VersionConstraintMap mods versionmap.VersionConstraintMap

View File

@@ -3,8 +3,6 @@ package pluginmanager_service
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/turbot/pipe-fittings/filepaths"
"github.com/turbot/pipe-fittings/plugin"
"log" "log"
"os" "os"
"os/exec" "os/exec"
@@ -14,11 +12,13 @@ import (
"time" "time"
"github.com/hashicorp/go-hclog" "github.com/hashicorp/go-hclog"
"github.com/hashicorp/go-plugin" goplugin "github.com/hashicorp/go-plugin"
"github.com/jackc/pgx/v5/pgxpool" "github.com/jackc/pgx/v5/pgxpool"
"github.com/sethvargo/go-retry" "github.com/sethvargo/go-retry"
"github.com/spf13/viper" "github.com/spf13/viper"
"github.com/turbot/go-kit/helpers" "github.com/turbot/go-kit/helpers"
"github.com/turbot/pipe-fittings/filepaths"
"github.com/turbot/pipe-fittings/plugin"
"github.com/turbot/pipe-fittings/utils" "github.com/turbot/pipe-fittings/utils"
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc" sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto" sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
@@ -117,14 +117,14 @@ func NewPluginManager(ctx context.Context, connectionConfig map[string]*sdkproto
func (m *PluginManager) Serve() { func (m *PluginManager) Serve() {
// create a plugin map, using ourselves as the implementation // create a plugin map, using ourselves as the implementation
pluginMap := map[string]plugin.Plugin{ pluginMap := map[string]goplugin.Plugin{
pluginshared.PluginName: &pluginshared.PluginManagerPlugin{Impl: m}, pluginshared.PluginName: &pluginshared.PluginManagerPlugin{Impl: m},
} }
plugin.Serve(&plugin.ServeConfig{ goplugin.Serve(&goplugin.ServeConfig{
HandshakeConfig: pluginshared.Handshake, HandshakeConfig: pluginshared.Handshake,
Plugins: pluginMap, Plugins: pluginMap,
// enable gRPC serving for this plugin... // enable gRPC serving for this plugin...
GRPCServer: plugin.DefaultGRPCServer, GRPCServer: goplugin.DefaultGRPCServer,
}) })
} }
@@ -229,7 +229,7 @@ func (m *PluginManager) doRefresh() {
} }
// OnConnectionConfigChanged is the callback function invoked by the connection watcher when the config changed // OnConnectionConfigChanged is the callback function invoked by the connection watcher when the config changed
func (m *PluginManager) OnConnectionConfigChanged(ctx context.Context, configMap connection.ConnectionConfigMap, plugins map[string]*modconfig.Plugin) { func (m *PluginManager) OnConnectionConfigChanged(ctx context.Context, configMap connection.ConnectionConfigMap, plugins map[string]*plugin.Plugin) {
m.mut.Lock() m.mut.Lock()
defer m.mut.Unlock() defer m.mut.Unlock()
@@ -474,7 +474,7 @@ func (m *PluginManager) addRunningPlugin(pluginInstance string) (*runningPlugin,
return startingPlugin, nil return startingPlugin, nil
} }
func (m *PluginManager) startPluginProcess(pluginInstance string, connectionConfigs []*sdkproto.ConnectionConfig) (*plugin.Client, error) { func (m *PluginManager) startPluginProcess(pluginInstance string, connectionConfigs []*sdkproto.ConnectionConfig) (*goplugin.Client, error) {
// retrieve the plugin config // retrieve the plugin config
pluginConfig := m.plugins[pluginInstance] pluginConfig := m.plugins[pluginInstance]
// must be there (if no explicit config was specified, we create a default) // must be there (if no explicit config was specified, we create a default)
@@ -494,17 +494,17 @@ func (m *PluginManager) startPluginProcess(pluginInstance string, connectionConf
log.Printf("[INFO] ************ plugin path %s ********************\n", pluginPath) log.Printf("[INFO] ************ plugin path %s ********************\n", pluginPath)
// create the plugin map // create the plugin map
pluginMap := map[string]plugin.Plugin{ pluginMap := map[string]goplugin.Plugin{
imageRef: &sdkshared.WrapperPlugin{}, imageRef: &sdkshared.WrapperPlugin{},
} }
cmd := exec.Command(pluginPath) cmd := exec.Command(pluginPath)
m.setPluginMaxMemory(pluginConfig, cmd) m.setPluginMaxMemory(pluginConfig, cmd)
client := plugin.NewClient(&plugin.ClientConfig{ client := goplugin.NewClient(&goplugin.ClientConfig{
HandshakeConfig: sdkshared.Handshake, HandshakeConfig: sdkshared.Handshake,
Plugins: pluginMap, Plugins: pluginMap,
Cmd: cmd, Cmd: cmd,
AllowedProtocols: []plugin.Protocol{plugin.ProtocolGRPC}, AllowedProtocols: []goplugin.Protocol{goplugin.ProtocolGRPC},
// pass our logger to the plugin client to ensure plugin logs end up in logfile // pass our logger to the plugin client to ensure plugin logs end up in logfile
Logger: m.logger, Logger: m.logger,
@@ -520,7 +520,7 @@ func (m *PluginManager) startPluginProcess(pluginInstance string, connectionConf
} }
func (m *PluginManager) setPluginMaxMemory(pluginConfig *modconfig.Plugin, cmd *exec.Cmd) { func (m *PluginManager) setPluginMaxMemory(pluginConfig *plugin.Plugin, cmd *exec.Cmd) {
maxMemoryBytes := pluginConfig.GetMaxMemoryBytes() maxMemoryBytes := pluginConfig.GetMaxMemoryBytes()
if maxMemoryBytes == 0 { if maxMemoryBytes == 0 {
if viper.IsSet(constants.ArgMemoryMaxMbPlugin) { if viper.IsSet(constants.ArgMemoryMaxMbPlugin) {
@@ -536,7 +536,7 @@ func (m *PluginManager) setPluginMaxMemory(pluginConfig *modconfig.Plugin, cmd *
} }
// set the connection configs and build a ReattachConfig // set the connection configs and build a ReattachConfig
func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.ConnectionConfig, client *plugin.Client, req *pb.GetRequest) (_ *pb.ReattachConfig, err error) { func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.ConnectionConfig, client *goplugin.Client, req *pb.GetRequest) (_ *pb.ReattachConfig, err error) {
// extract connection names // extract connection names
connectionNames := make([]string, len(connectionConfigs)) connectionNames := make([]string, len(connectionConfigs))
for i, c := range connectionConfigs { for i, c := range connectionConfigs {

View File

@@ -9,7 +9,7 @@ import (
) )
func (m *PluginManager) handlePluginInstanceChanges(ctx context.Context, newPlugins connection.PluginMap) error { func (m *PluginManager) handlePluginInstanceChanges(ctx context.Context, newPlugins connection.PluginMap) error {
if maps.EqualFunc(m.plugins, newPlugins, func(l *modconfig.Plugin, r *modconfig.Plugin) bool { if maps.EqualFunc(m.plugins, newPlugins, func(l *plugin.Plugin, r *plugin.Plugin) bool {
return l.Equals(r) return l.Equals(r)
}) { }) {
return nil return nil

View File

@@ -3,11 +3,12 @@ package pluginmanager_service
import ( import (
"context" "context"
"fmt" "fmt"
"github.com/turbot/pipe-fittings/plugin"
"log" "log"
"github.com/jackc/pgx/v5" "github.com/jackc/pgx/v5"
"github.com/turbot/pipe-fittings/ociinstaller" "github.com/turbot/pipe-fittings/ociinstaller"
"github.com/turbot/pipe-fittings/plugin"
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc" sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto" "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr" "github.com/turbot/steampipe-plugin-sdk/v5/sperr"
@@ -155,17 +156,17 @@ func (m *PluginManager) getPluginsWithChangedLimiters(newLimiters connection.Plu
func (m *PluginManager) updateRateLimiterStatus() { func (m *PluginManager) updateRateLimiterStatus() {
// iterate through limiters for each plug // iterate through limiters for each plug
for plugin, pluginDefinedLimiters := range m.pluginLimiters { for p, pluginDefinedLimiters := range m.pluginLimiters {
// get user limiters for this plugin // get user limiters for this plugin
userDefinedLimiters := m.getUserDefinedLimitersForPlugin(plugin) userDefinedLimiters := m.getUserDefinedLimitersForPlugin(p)
// is there a user override? - if so set status to overriden // is there a user override? - if so set status to overriden
for name, pluginLimiter := range pluginDefinedLimiters { for name, pluginLimiter := range pluginDefinedLimiters {
_, isOverriden := userDefinedLimiters[name] _, isOverriden := userDefinedLimiters[name]
if isOverriden { if isOverriden {
pluginLimiter.Status = modconfig.LimiterStatusOverridden pluginLimiter.Status = plugin.LimiterStatusOverridden
} else { } else {
pluginLimiter.Status = modconfig.LimiterStatusActive pluginLimiter.Status = plugin.LimiterStatusActive
} }
} }
} }
@@ -225,19 +226,19 @@ func (m *PluginManager) bootstrapRateLimiterTable(ctx context.Context) error {
return m.refreshRateLimiterTable(ctx) return m.refreshRateLimiterTable(ctx)
} }
func (m *PluginManager) loadRateLimitersFromTable(ctx context.Context) ([]*modconfig.RateLimiter, error) { func (m *PluginManager) loadRateLimitersFromTable(ctx context.Context) ([]*plugin.RateLimiter, error) {
rows, err := m.pool.Query(ctx, fmt.Sprintf("SELECT * FROM %s.%s", constants.InternalSchema, constants.RateLimiterDefinitionTable)) rows, err := m.pool.Query(ctx, fmt.Sprintf("SELECT * FROM %s.%s", constants.InternalSchema, constants.RateLimiterDefinitionTable))
if err != nil { if err != nil {
return nil, err return nil, err
} }
defer rows.Close() defer rows.Close()
rateLimiters, err := pgx.CollectRows(rows, pgx.RowToStructByNameLax[modconfig.RateLimiter]) rateLimiters, err := pgx.CollectRows(rows, pgx.RowToStructByNameLax[plugin.RateLimiter])
if err != nil { if err != nil {
return nil, err return nil, err
} }
// convert to pointer array // convert to pointer array
pRateLimiters := make([]*modconfig.RateLimiter, len(rateLimiters)) pRateLimiters := make([]*plugin.RateLimiter, len(rateLimiters))
for i, r := range rateLimiters { for i, r := range rateLimiters {
// copy into loop var // copy into loop var
rateLimiter := r rateLimiter := r
@@ -246,11 +247,11 @@ func (m *PluginManager) loadRateLimitersFromTable(ctx context.Context) ([]*modco
return pRateLimiters, nil return pRateLimiters, nil
} }
func (m *PluginManager) getUserAndPluginLimitersFromTableResult(rateLimiters []*modconfig.RateLimiter) (connection.PluginLimiterMap, connection.PluginLimiterMap) { func (m *PluginManager) getUserAndPluginLimitersFromTableResult(rateLimiters []*plugin.RateLimiter) (connection.PluginLimiterMap, connection.PluginLimiterMap) {
pluginLimiters := make(connection.PluginLimiterMap) pluginLimiters := make(connection.PluginLimiterMap)
userLimiters := make(connection.PluginLimiterMap) userLimiters := make(connection.PluginLimiterMap)
for _, r := range rateLimiters { for _, r := range rateLimiters {
if r.Source == modconfig.LimiterSourcePlugin { if r.Source == plugin.LimiterSourcePlugin {
pluginLimitersForPlugin := pluginLimiters[r.Plugin] pluginLimitersForPlugin := pluginLimiters[r.Plugin]
if pluginLimitersForPlugin == nil { if pluginLimitersForPlugin == nil {
pluginLimitersForPlugin = make(connection.LimiterMap) pluginLimitersForPlugin = make(connection.LimiterMap)
@@ -305,16 +306,16 @@ func (m *PluginManager) LoadPluginRateLimiters(pluginConnectionMap map[string]st
limitersForPlugin := make(connection.LimiterMap) limitersForPlugin := make(connection.LimiterMap)
for _, l := range rateLimiterResp.Definitions { for _, l := range rateLimiterResp.Definitions {
r, err := modconfig.RateLimiterFromProto(l, reattach.Plugin, pluginInstance) r, err := plugin.RateLimiterFromProto(l, reattach.Plugin, pluginInstance)
if err != nil { if err != nil {
errors = append(errors, sperr.WrapWithMessage(err, "failed to create rate limiter %s from plugin definition", err)) errors = append(errors, sperr.WrapWithMessage(err, "failed to create rate limiter %s from plugin definition", err))
continue continue
} }
// set plugin as source // set plugin as source
r.Source = modconfig.LimiterSourcePlugin r.Source = plugin.LimiterSourcePlugin
// default status to active // default status to active
r.Status = modconfig.LimiterStatusActive r.Status = plugin.LimiterStatusActive
// add to map // add to map
limitersForPlugin[l.Name] = r limitersForPlugin[l.Name] = r
} }

View File

@@ -2,6 +2,7 @@ package modconfig
import ( import (
"fmt" "fmt"
"github.com/turbot/pipe-fittings/plugin"
"log" "log"
"path" "path"
"reflect" "reflect"
@@ -63,64 +64,28 @@ type Connection struct {
// options // options
Options *options.Connection `json:"options,omitempty"` Options *options.Connection `json:"options,omitempty"`
DeclRange Range `json:"decl_range"` DeclRange plugin.Range `json:"decl_range"`
} }
// Range represents a span of characters between two positions in a source file. func (c *Connection) GetDeclRange() plugin.Range {
// This is a direct re-implementation of hcl.Range, allowing us to control JSON serialization return c.DeclRange
type Range struct {
// Filename is the name of the file into which this range's positions point.
Filename string `json:"filename,omitempty"`
// Start and End represent the bounds of this range. Start is inclusive and End is exclusive.
Start Pos `json:"start,omitempty"`
End Pos `json:"end,omitempty"`
} }
func (r Range) GetLegacy() hcl.Range { func (c *Connection) GetName() string {
return hcl.Range{ return c.Name
Filename: r.Filename, }
Start: r.Start.GetLegacy(),
End: r.End.GetLegacy(), func (c *Connection) GetDisplayName() string {
} if c.ImportDisabled() {
} return fmt.Sprintf("%s (disabled)", c.Name)
func NewRange(sourceRange hcl.Range) Range {
return Range{
Filename: sourceRange.Filename,
Start: NewPos(sourceRange.Start),
End: NewPos(sourceRange.End),
}
}
// Pos represents a single position in a source file
// This is a direct re-implementation of hcl.Pos, allowing us to control JSON serialization
type Pos struct {
Line int `json:"line"`
Column int `json:"column"`
Byte int `json:"byte"`
}
func (r Pos) GetLegacy() hcl.Pos {
return hcl.Pos{
Line: r.Line,
Column: r.Column,
Byte: r.Byte,
}
}
func NewPos(sourcePos hcl.Pos) Pos {
return Pos{
Line: sourcePos.Line,
Column: sourcePos.Column,
Byte: sourcePos.Byte,
} }
return c.Name
} }
func NewConnection(block *hcl.Block) *Connection { func NewConnection(block *hcl.Block) *Connection {
return &Connection{ return &Connection{
Name: block.Labels[0], Name: block.Labels[0],
DeclRange: NewRange(hclhelpers.BlockRange(block)), DeclRange: plugin.NewRange(hclhelpers.BlockRange(block)),
ImportSchema: ImportSchemaEnabled, ImportSchema: ImportSchemaEnabled,
// default to plugin // default to plugin
Type: ConnectionTypePlugin, Type: ConnectionTypePlugin,

View File

@@ -2,7 +2,7 @@ package modconfig
import ( import (
"fmt" "fmt"
modconfig2 "github.com/turbot/pipe-fittings/plugin" "github.com/turbot/pipe-fittings/plugin"
"os" "os"
"path/filepath" "path/filepath"
@@ -312,7 +312,7 @@ func (m *Mod) SetFilePath(modFilePath string) {
} }
// ValidateRequirements validates that the current steampipe CLI and the installed plugins is compatible with the mod // ValidateRequirements validates that the current steampipe CLI and the installed plugins is compatible with the mod
func (m *Mod) ValidateRequirements(pluginVersionMap map[string]*modconfig2.PluginVersionString) []error { func (m *Mod) ValidateRequirements(pluginVersionMap map[string]*plugin.PluginVersionString) []error {
validationErrors := []error{} validationErrors := []error{}
if err := m.validateSteampipeVersion(); err != nil { if err := m.validateSteampipeVersion(); err != nil {
validationErrors = append(validationErrors, err) validationErrors = append(validationErrors, err)
@@ -329,7 +329,7 @@ func (m *Mod) validateSteampipeVersion() error {
return m.Require.validateSteampipeVersion(m.Name()) return m.Require.validateSteampipeVersion(m.Name())
} }
func (m *Mod) validatePluginVersions(availablePlugins map[string]*modconfig2.PluginVersionString) []error { func (m *Mod) validatePluginVersions(availablePlugins map[string]*plugin.PluginVersionString) []error {
if m.Require == nil { if m.Require == nil {
return nil return nil
} }

View File

@@ -136,7 +136,7 @@ func (r *Require) validateSteampipeVersion(modName string) error {
} }
// validatePluginVersions validates that for every plugin requirement there's at least one plugin installed // validatePluginVersions validates that for every plugin requirement there's at least one plugin installed
func (r *Require) validatePluginVersions(modName string, plugins map[string]*plugin.Plugin) []error { func (r *Require) validatePluginVersions(modName string, plugins map[string]*plugin.PluginVersionString) []error {
if len(r.Plugins) == 0 { if len(r.Plugins) == 0 {
return nil return nil
} }
@@ -151,7 +151,7 @@ func (r *Require) validatePluginVersions(modName string, plugins map[string]*plu
// searchInstalledPluginForRequirement returns plugin validation errors if no plugin is found which satisfies // searchInstalledPluginForRequirement returns plugin validation errors if no plugin is found which satisfies
// the mod requirement. If plugin is found nil error is returned. // the mod requirement. If plugin is found nil error is returned.
func (r *Require) searchInstalledPluginForRequirement(modName string, requirement *plugin.PluginVersion, plugins map[string]*plugin.Plugin) error { func (r *Require) searchInstalledPluginForRequirement(modName string, requirement *plugin.PluginVersion, plugins map[string]*plugin.PluginVersionString) error {
for installedName, installed := range plugins { for installedName, installed := range plugins {
org, name, _ := ociinstaller.NewImageRef(installedName).GetOrgNameAndStream() org, name, _ := ociinstaller.NewImageRef(installedName).GetOrgNameAndStream()
if org != requirement.Org || name != requirement.Name { if org != requirement.Org || name != requirement.Name {

View File

@@ -6,8 +6,8 @@ import (
"github.com/turbot/pipe-fittings/plugin" "github.com/turbot/pipe-fittings/plugin"
) )
func DecodeLimiter(block *hcl.Block) (*modconfig.RateLimiter, hcl.Diagnostics) { func DecodeLimiter(block *hcl.Block) (*plugin.RateLimiter, hcl.Diagnostics) {
var limiter = &modconfig.RateLimiter{ var limiter = &plugin.RateLimiter{
// populate name from label // populate name from label
Name: block.Labels[0], Name: block.Labels[0],
} }

View File

@@ -1,73 +0,0 @@
package steampipeconfig
import (
"fmt"
"sort"
"strings"
"github.com/turbot/pipe-fittings/ociinstaller"
"github.com/turbot/pipe-fittings/utils"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
)
type PluginRemoveReport struct {
Image *ociinstaller.ImageRef
ShortName string
Connections []*modconfig.Connection
}
type PluginRemoveReports []PluginRemoveReport
func (r PluginRemoveReports) Print() {
length := len(r)
var staleConnections []*modconfig.Connection
if length > 0 {
fmt.Printf("\nUninstalled %s:\n", utils.Pluralize("plugin", length))
for _, report := range r {
org, name, _ := report.Image.GetOrgNameAndStream()
fmt.Printf("* %s/%s\n", org, name)
staleConnections = append(staleConnections, report.Connections...)
// sort the connections by line number while we are at it!
sort.SliceStable(report.Connections, func(i, j int) bool {
left := report.Connections[i]
right := report.Connections[j]
return left.DeclRange.Start.Line < right.DeclRange.Start.Line
})
}
fmt.Println()
staleLength := len(staleConnections)
uniqueFiles := map[string]bool{}
// get the unique files
if staleLength > 0 {
for _, report := range r {
for _, conn := range report.Connections {
uniqueFiles[conn.DeclRange.Filename] = true
}
}
str := append([]string{}, fmt.Sprintf(
"Please remove %s %s to continue using steampipe:",
utils.Pluralize("this", len(uniqueFiles)),
utils.Pluralize("connection", len(uniqueFiles)),
))
str = append(str, "")
for file := range uniqueFiles {
str = append(str, fmt.Sprintf(" * %s", constants.Bold(file)))
for _, report := range r {
for _, conn := range report.Connections {
if conn.DeclRange.Filename == file {
str = append(str, fmt.Sprintf(" '%s' (line %2d)", conn.Name, conn.DeclRange.Start.Line))
}
}
}
str = append(str, "")
}
fmt.Println(strings.Join(str, "\n"))
}
}
}