mirror of
https://github.com/turbot/steampipe.git
synced 2025-12-19 18:12:43 -05:00
Updates for steampipe_plugin_column table. Closes #4022
* Fix inconsistent value for plugin in steampipe_plugin_column table - always use long name. * Rename `plugin_name` column to `plugin` * Ensure empty col values
This commit is contained in:
@@ -13,7 +13,7 @@ import (
|
|||||||
func GetPluginColumnTableCreateSql() db_common.QueryWithArgs {
|
func GetPluginColumnTableCreateSql() db_common.QueryWithArgs {
|
||||||
return db_common.QueryWithArgs{
|
return db_common.QueryWithArgs{
|
||||||
Query: fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (
|
Query: fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (
|
||||||
plugin_name TEXT NOT NULL,
|
plugin TEXT NOT NULL,
|
||||||
table_name TEXT NOT NULL,
|
table_name TEXT NOT NULL,
|
||||||
name TEXT NOT NULL,
|
name TEXT NOT NULL,
|
||||||
type TEXT NOT NULL,
|
type TEXT NOT NULL,
|
||||||
@@ -61,10 +61,10 @@ func GetPluginColumnTablePopulateSql(
|
|||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
var listConfig, getConfig keyColumn
|
var listConfig, getConfig *keyColumn
|
||||||
|
|
||||||
if getKeyColumn != nil {
|
if getKeyColumn != nil {
|
||||||
getConfig = newKeyColumn(getKeyColumn.Operators, getConfig.Required, getKeyColumn.CacheMatch)
|
getConfig = newKeyColumn(getKeyColumn.Operators, getKeyColumn.Require, getKeyColumn.CacheMatch)
|
||||||
}
|
}
|
||||||
if listKeyColumn != nil {
|
if listKeyColumn != nil {
|
||||||
listConfig = newKeyColumn(listKeyColumn.Operators, listKeyColumn.Require, listKeyColumn.CacheMatch)
|
listConfig = newKeyColumn(listKeyColumn.Operators, listKeyColumn.Require, listKeyColumn.CacheMatch)
|
||||||
@@ -74,9 +74,14 @@ func GetPluginColumnTablePopulateSql(
|
|||||||
if s, ok := defaultValue.(string); ok {
|
if s, ok := defaultValue.(string); ok {
|
||||||
defaultValue = fmt.Sprintf(`"%s"`, s)
|
defaultValue = fmt.Sprintf(`"%s"`, s)
|
||||||
}
|
}
|
||||||
|
var hydrate any = nil
|
||||||
|
if columnSchema.Hydrate != "" {
|
||||||
|
hydrate = columnSchema.Hydrate
|
||||||
|
}
|
||||||
|
|
||||||
q := db_common.QueryWithArgs{
|
q := db_common.QueryWithArgs{
|
||||||
Query: fmt.Sprintf(`INSERT INTO %s.%s (
|
Query: fmt.Sprintf(`INSERT INTO %s.%s (
|
||||||
plugin_name,
|
plugin,
|
||||||
table_name ,
|
table_name ,
|
||||||
name,
|
name,
|
||||||
type,
|
type,
|
||||||
@@ -95,7 +100,7 @@ func GetPluginColumnTablePopulateSql(
|
|||||||
description,
|
description,
|
||||||
listConfig,
|
listConfig,
|
||||||
getConfig,
|
getConfig,
|
||||||
columnSchema.Hydrate,
|
hydrate,
|
||||||
defaultValue,
|
defaultValue,
|
||||||
},
|
},
|
||||||
}
|
}
|
||||||
@@ -117,7 +122,7 @@ func GetPluginColumnTableDeletePluginSql(plugin string) db_common.QueryWithArgs
|
|||||||
return db_common.QueryWithArgs{
|
return db_common.QueryWithArgs{
|
||||||
Query: fmt.Sprintf(
|
Query: fmt.Sprintf(
|
||||||
`DELETE FROM %s.%s
|
`DELETE FROM %s.%s
|
||||||
WHERE plugin_name = $1;`,
|
WHERE plugin = $1;`,
|
||||||
constants.InternalSchema,
|
constants.InternalSchema,
|
||||||
constants.PluginColumnTable,
|
constants.PluginColumnTable,
|
||||||
),
|
),
|
||||||
@@ -138,14 +143,14 @@ func GetPluginColumnTableGrantSql() db_common.QueryWithArgs {
|
|||||||
|
|
||||||
type keyColumn struct {
|
type keyColumn struct {
|
||||||
Operators []string `json:"operators,omitempty"`
|
Operators []string `json:"operators,omitempty"`
|
||||||
Required string `json:"required,omitempty"`
|
Require string `json:"require,omitempty"`
|
||||||
CacheMatch string `json:"cache_match,omitempty"`
|
CacheMatch string `json:"cache_match,omitempty"`
|
||||||
}
|
}
|
||||||
|
|
||||||
func newKeyColumn(operators []string, required string, cacheMatch string) keyColumn {
|
func newKeyColumn(operators []string, require string, cacheMatch string) *keyColumn {
|
||||||
return keyColumn{
|
return &keyColumn{
|
||||||
Operators: cleanOperators(operators),
|
Operators: cleanOperators(operators),
|
||||||
Required: required,
|
Require: require,
|
||||||
CacheMatch: cacheMatch,
|
CacheMatch: cacheMatch,
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
@@ -159,13 +164,13 @@ func cleanOperators(operators []string) []string {
|
|||||||
case "<>":
|
case "<>":
|
||||||
operator = "!="
|
operator = "!="
|
||||||
case ">":
|
case ">":
|
||||||
operator = "GT"
|
operator = "gt"
|
||||||
case "<":
|
case "<":
|
||||||
operator = "LT"
|
operator = "lt"
|
||||||
case ">=":
|
case ">=":
|
||||||
operator = "GE"
|
operator = "ge"
|
||||||
case "<=":
|
case "<=":
|
||||||
operator = "LE"
|
operator = "le"
|
||||||
}
|
}
|
||||||
res[i] = operator
|
res[i] = operator
|
||||||
}
|
}
|
||||||
|
|||||||
@@ -56,7 +56,6 @@ func (m *PluginManager) sendUpdateConnectionConfigs(requestMap map[string]*sdkpr
|
|||||||
errors = append(errors, err)
|
errors = append(errors, err)
|
||||||
continue
|
continue
|
||||||
}
|
}
|
||||||
|
|
||||||
err = pluginClient.UpdateConnectionConfigs(req)
|
err = pluginClient.UpdateConnectionConfigs(req)
|
||||||
if err != nil {
|
if err != nil {
|
||||||
errors = append(errors, err)
|
errors = append(errors, err)
|
||||||
|
|||||||
@@ -2,6 +2,7 @@ package pluginmanager_service
|
|||||||
|
|
||||||
import (
|
import (
|
||||||
"context"
|
"context"
|
||||||
|
"fmt"
|
||||||
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
|
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
|
||||||
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
|
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
|
||||||
sdkplugin "github.com/turbot/steampipe-plugin-sdk/v5/plugin"
|
sdkplugin "github.com/turbot/steampipe-plugin-sdk/v5/plugin"
|
||||||
@@ -12,30 +13,67 @@ import (
|
|||||||
pb "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/proto"
|
pb "github.com/turbot/steampipe/pkg/pluginmanager_service/grpc/proto"
|
||||||
"golang.org/x/exp/maps"
|
"golang.org/x/exp/maps"
|
||||||
"log"
|
"log"
|
||||||
|
"slices"
|
||||||
|
"strings"
|
||||||
)
|
)
|
||||||
|
|
||||||
func (m *PluginManager) initialisePluginColumns(ctx context.Context) error {
|
func (m *PluginManager) initialisePluginColumns(ctx context.Context) error {
|
||||||
pluginColumnTableExists, err := m.tableExists(ctx, constants.InternalSchema, constants.PluginColumnTable)
|
if m.shouldBootstrapPluginColumnTable(ctx) {
|
||||||
if err != nil {
|
|
||||||
return err
|
|
||||||
}
|
|
||||||
|
|
||||||
if !pluginColumnTableExists {
|
|
||||||
return m.bootstrapPluginColumnTable(ctx)
|
return m.bootstrapPluginColumnTable(ctx)
|
||||||
}
|
}
|
||||||
return nil
|
return nil
|
||||||
}
|
}
|
||||||
|
|
||||||
|
func (m *PluginManager) shouldBootstrapPluginColumnTable(ctx context.Context) bool {
|
||||||
|
pluginColumnTableExists, err := m.tableExists(ctx, constants.InternalSchema, constants.PluginColumnTable)
|
||||||
|
if err != nil || !pluginColumnTableExists {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
// check columns match
|
||||||
|
query := fmt.Sprintf(`SELECT column_name
|
||||||
|
FROM information_schema.columns
|
||||||
|
WHERE table_schema = '%s'
|
||||||
|
AND table_name = '%s'`, constants.InternalSchema, constants.PluginColumnTable)
|
||||||
|
|
||||||
|
rows, err := m.pool.Query(ctx, query)
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
defer rows.Close()
|
||||||
|
var columns []string
|
||||||
|
// Iterate through the rows
|
||||||
|
for rows.Next() {
|
||||||
|
var s string
|
||||||
|
err := rows.Scan(&s)
|
||||||
|
if err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
columns = append(columns, s)
|
||||||
|
}
|
||||||
|
|
||||||
|
// Check for errors from iterating over rows
|
||||||
|
if err = rows.Err(); err != nil {
|
||||||
|
return true
|
||||||
|
}
|
||||||
|
|
||||||
|
expectedColumns := []string{"plugin_", "table_name", "name", "type", " description", "list_config", "get_config", "hydrate_name", "default_value"}
|
||||||
|
return !slices.Equal(columns, expectedColumns)
|
||||||
|
}
|
||||||
|
|
||||||
func (m *PluginManager) bootstrapPluginColumnTable(ctx context.Context) error {
|
func (m *PluginManager) bootstrapPluginColumnTable(ctx context.Context) error {
|
||||||
schemas, err := m.loadPluginSchemas(m.getPluginExemplarConnections())
|
schemas, err := m.loadPluginSchemas(m.getPluginExemplarConnections())
|
||||||
if err != nil {
|
if err != nil {
|
||||||
|
log.Printf("[WARN] loadPluginSchemas error: %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
|
|
||||||
if err := m.createPluginColumnsTable(ctx); err != nil {
|
if err := m.createPluginColumnsTable(ctx); err != nil {
|
||||||
|
log.Printf("[WARN] createPluginColumnsTable error: %s", err.Error())
|
||||||
return err
|
return err
|
||||||
}
|
}
|
||||||
// now populate the table
|
// now populate the table
|
||||||
|
log.Printf("[INFO] bootstrapPluginColumnTable loaded schema for plugins: %s", strings.Join(maps.Keys(schemas), ","))
|
||||||
|
|
||||||
return m.populatePluginColumnsTable(ctx, schemas)
|
return m.populatePluginColumnsTable(ctx, schemas)
|
||||||
}
|
}
|
||||||
|
|
||||||
@@ -57,6 +95,11 @@ func (m *PluginManager) createPluginColumnsTable(ctx context.Context) error {
|
|||||||
}
|
}
|
||||||
|
|
||||||
func (m *PluginManager) populatePluginColumnsTable(ctx context.Context, schemas map[string]*proto.Schema) error {
|
func (m *PluginManager) populatePluginColumnsTable(ctx context.Context, schemas map[string]*proto.Schema) error {
|
||||||
|
if len(schemas) == 0 {
|
||||||
|
log.Printf("[INFO] populatePluginColumnsTable : no updates to plugin_columns table")
|
||||||
|
return nil
|
||||||
|
}
|
||||||
|
log.Printf("[INFO] populating plugin_columns table for plugins %s", strings.Join(maps.Keys(schemas), ","))
|
||||||
var queries []db_common.QueryWithArgs
|
var queries []db_common.QueryWithArgs
|
||||||
for plugin, schema := range schemas {
|
for plugin, schema := range schemas {
|
||||||
// drop entries for this plugin
|
// drop entries for this plugin
|
||||||
|
|||||||
@@ -154,8 +154,8 @@ func populateConnectionUpdates(ctx context.Context, pool *pgxpool.Pool, pluginMa
|
|||||||
// we need to refetch the rate limiters for this plugin
|
// we need to refetch the rate limiters for this plugin
|
||||||
if res.pluginBinaryChanged {
|
if res.pluginBinaryChanged {
|
||||||
// store map item of plugin name to connection name (so we only have one entry per plugin)
|
// store map item of plugin name to connection name (so we only have one entry per plugin)
|
||||||
pluginShortName := GlobalConfig.Connections[requiredConnectionState.ConnectionName].PluginAlias
|
pluginLogName := GlobalConfig.Connections[requiredConnectionState.ConnectionName].Plugin
|
||||||
updates.PluginsWithUpdatedBinary[pluginShortName] = requiredConnectionState.ConnectionName
|
updates.PluginsWithUpdatedBinary[pluginLogName] = requiredConnectionState.ConnectionName
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|||||||
Reference in New Issue
Block a user