mirror of
https://github.com/turbot/steampipe.git
synced 2025-12-19 18:12:43 -05:00
192 lines
5.5 KiB
Go
192 lines
5.5 KiB
Go
package pluginmanager_service
|
|
|
|
import (
|
|
"context"
|
|
"fmt"
|
|
"log"
|
|
"slices"
|
|
"strings"
|
|
|
|
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
|
|
"github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
|
|
sdkplugin "github.com/turbot/steampipe-plugin-sdk/v5/plugin"
|
|
"github.com/turbot/steampipe/v2/pkg/constants"
|
|
"github.com/turbot/steampipe/v2/pkg/db/db_common"
|
|
"github.com/turbot/steampipe/v2/pkg/db/db_local"
|
|
"github.com/turbot/steampipe/v2/pkg/introspection"
|
|
pb "github.com/turbot/steampipe/v2/pkg/pluginmanager_service/grpc/proto"
|
|
"golang.org/x/exp/maps"
|
|
)
|
|
|
|
func (m *PluginManager) initialisePluginColumns(ctx context.Context) error {
|
|
if m.shouldBootstrapPluginColumnTable(ctx) {
|
|
return m.bootstrapPluginColumnTable(ctx)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func (m *PluginManager) shouldBootstrapPluginColumnTable(ctx context.Context) bool {
|
|
pluginColumnTableExists, err := m.tableExists(ctx, constants.InternalSchema, constants.PluginColumnTable)
|
|
if err != nil || !pluginColumnTableExists {
|
|
return true
|
|
}
|
|
// check columns match
|
|
query := fmt.Sprintf(`SELECT column_name
|
|
FROM information_schema.columns
|
|
WHERE table_schema = '%s'
|
|
AND table_name = '%s'`, constants.InternalSchema, constants.PluginColumnTable)
|
|
|
|
rows, err := m.pool.Query(ctx, query)
|
|
if err != nil {
|
|
return true
|
|
}
|
|
defer rows.Close()
|
|
var columns []string
|
|
// Iterate through the rows
|
|
for rows.Next() {
|
|
var s string
|
|
err := rows.Scan(&s)
|
|
if err != nil {
|
|
return true
|
|
}
|
|
columns = append(columns, s)
|
|
}
|
|
|
|
// Check for errors from iterating over rows
|
|
if err = rows.Err(); err != nil {
|
|
return true
|
|
}
|
|
|
|
expectedColumns := []string{"plugin_", "table_name", "name", "type", " description", "list_config", "get_config", "hydrate_name", "default_value"}
|
|
return !slices.Equal(columns, expectedColumns)
|
|
}
|
|
|
|
func (m *PluginManager) bootstrapPluginColumnTable(ctx context.Context) error {
|
|
schemas, err := m.loadPluginSchemas(m.getPluginExemplarConnections())
|
|
if err != nil {
|
|
log.Printf("[WARN] loadPluginSchemas error: %s", err.Error())
|
|
return err
|
|
}
|
|
|
|
if err := m.createPluginColumnsTable(ctx); err != nil {
|
|
log.Printf("[WARN] createPluginColumnsTable error: %s", err.Error())
|
|
return err
|
|
}
|
|
// now populate the table
|
|
log.Printf("[INFO] bootstrapPluginColumnTable loaded schema for plugins: %s", strings.Join(maps.Keys(schemas), ","))
|
|
|
|
return m.populatePluginColumnsTable(ctx, schemas)
|
|
}
|
|
|
|
func (m *PluginManager) createPluginColumnsTable(ctx context.Context) error {
|
|
queries := []db_common.QueryWithArgs{
|
|
introspection.GetPluginColumnTableDropSql(),
|
|
introspection.GetPluginColumnTableCreateSql(),
|
|
introspection.GetPluginColumnTableGrantSql(),
|
|
}
|
|
|
|
conn, err := m.pool.Acquire(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Release()
|
|
|
|
_, err = db_local.ExecuteSqlWithArgsInTransaction(ctx, conn.Conn(), queries...)
|
|
return err
|
|
}
|
|
|
|
func (m *PluginManager) populatePluginColumnsTable(ctx context.Context, schemas map[string]*proto.Schema) error {
|
|
if len(schemas) == 0 {
|
|
log.Printf("[INFO] populatePluginColumnsTable : no updates to plugin_columns table")
|
|
return nil
|
|
}
|
|
log.Printf("[INFO] populating plugin_columns table for plugins %s", strings.Join(maps.Keys(schemas), ","))
|
|
var queries []db_common.QueryWithArgs
|
|
for plugin, schema := range schemas {
|
|
// drop entries for this plugin
|
|
queries = append(queries, introspection.GetPluginColumnTableDeletePluginSql(plugin))
|
|
|
|
// NOTE: we do not support dynamic plugins
|
|
if schema.Mode == sdkplugin.SchemaModeDynamic {
|
|
continue
|
|
}
|
|
pluginQueries, err := introspection.GetPluginColumnTablePopulateSqlForPlugin(plugin, schema.Schema)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
queries = append(queries, pluginQueries...)
|
|
}
|
|
|
|
conn, err := m.pool.Acquire(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Release()
|
|
|
|
_, err = db_local.ExecuteSqlWithArgsInTransaction(ctx, conn.Conn(), queries...)
|
|
return err
|
|
}
|
|
|
|
func (m *PluginManager) removePluginsFromPluginColumnsTable(ctx context.Context, plugins []string) error {
|
|
if len(plugins) == 0 {
|
|
return nil
|
|
}
|
|
|
|
var queries []db_common.QueryWithArgs
|
|
|
|
for _, plugin := range plugins {
|
|
// drop entries for this plugin
|
|
queries = append(queries, introspection.GetPluginColumnTableDeletePluginSql(plugin))
|
|
}
|
|
|
|
conn, err := m.pool.Acquire(ctx)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
defer conn.Release()
|
|
|
|
_, err = db_local.ExecuteSqlWithArgsInTransaction(ctx, conn.Conn(), queries...)
|
|
return err
|
|
}
|
|
|
|
// load the schemas for the given plugin connections
|
|
func (m *PluginManager) loadPluginSchemas(pluginConnectionMap map[string]string) (map[string]*proto.Schema, error) {
|
|
// build Get request
|
|
req := &pb.GetRequest{
|
|
Connections: maps.Values(pluginConnectionMap),
|
|
}
|
|
plugins, err := m.Get(req)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
var res = make(map[string]*proto.Schema)
|
|
|
|
// ok so now we have all necessary plugin reattach configs - fetch the schemas
|
|
|
|
for _, reattach := range plugins.ReattachMap {
|
|
// attach to the plugin process
|
|
pluginClient, err := sdkgrpc.NewPluginClientFromReattach(reattach.Convert(), reattach.Plugin)
|
|
if err != nil {
|
|
log.Printf("[WARN] failed to attach to plugin '%s' - pid %d: %s",
|
|
reattach.Plugin, reattach.Pid, err)
|
|
return nil, err
|
|
}
|
|
|
|
schemaResp, err := pluginClient.GetSchema(reattach.Connections[0])
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
res[reattach.Plugin] = schemaResp
|
|
}
|
|
|
|
return res, nil
|
|
}
|
|
|
|
func (m *PluginManager) UpdatePluginColumnsTable(ctx context.Context, update map[string]*proto.Schema, delete []string) error {
|
|
if err := m.removePluginsFromPluginColumnsTable(ctx, delete); err != nil {
|
|
return err
|
|
}
|
|
return m.populatePluginColumnsTable(ctx, update)
|
|
}
|