Adds 'steampipe_server_settings' table populated with server settings data during service startup. Closes #3462

This commit is contained in:
Binaek Sarkar
2023-06-21 17:53:14 +05:30
committed by GitHub
parent fc541bda84
commit a1caf53eeb
15 changed files with 231 additions and 128 deletions

View File

@@ -313,6 +313,8 @@ jobs:
test_block:
- "migration"
- "service_and_plugin"
- "service"
- "settings"
- "search_path"
- "chaos_and_query"
- "dynamic_schema"

View File

@@ -95,6 +95,7 @@ jobs:
- "service_and_plugin"
- "blank_aggregators"
- "service"
- "settings"
- "search_path"
- "chaos_and_query"
- "dynamic_schema"

View File

@@ -50,6 +50,9 @@ const (
// also used to send commands to the FDW
InternalSchema = "steampipe_internal"
// ServerSettingsTable is the table used to store steampipe service configuration
ServerSettingsTable = "steampipe_server_settings"
// ConnectionStateTable is the table used to store steampipe connection state
ConnectionStateTable = "steampipe_connection_state"
ConnectionStatePending = "pending"

View File

@@ -1,108 +0,0 @@
package controldisplay
import (
"fmt"
"reflect"
"runtime/debug"
"sort"
"strings"
"github.com/turbot/steampipe/pkg/control/controlexecute"
)
type CsvColumnPair struct {
fieldName string
columnName string
}
type ResultColumns struct {
AllColumns []string
GroupColumns []CsvColumnPair
ResultColumns []CsvColumnPair
DimensionColumns []string
TagColumns []string
}
func newResultColumns(e *controlexecute.ExecutionTree) *ResultColumns {
groupColumns := getCsvColumns(*e.Root)
rowColumns := getCsvColumns(controlexecute.ResultRow{})
dimensionColumns := e.DimensionColorGenerator.GetDimensionProperties()
tagColumns := e.GetAllTags()
sort.Strings(dimensionColumns)
sort.Strings(tagColumns)
sort.SliceStable(rowColumns[:], func(i, j int) bool {
iControlField := strings.HasPrefix(rowColumns[i].fieldName, "Control")
jControlField := strings.HasPrefix(rowColumns[j].fieldName, "Control")
// if both are `Control` fields - let them be as is
// if one of them is a `Control` field - bring it to the front
return iControlField != jControlField
// TODO :: try to make this a bit generic, so that it's not only the
// `Control` subfields which are considered
})
allColumns := []string{}
for _, gC := range groupColumns {
allColumns = append(allColumns, gC.columnName)
}
for _, rC := range rowColumns {
allColumns = append(allColumns, rC.columnName)
}
allColumns = append(allColumns, dimensionColumns...)
allColumns = append(allColumns, tagColumns...)
return &ResultColumns{
GroupColumns: groupColumns,
ResultColumns: rowColumns,
DimensionColumns: dimensionColumns,
TagColumns: tagColumns,
AllColumns: allColumns,
}
}
func getCsvColumns(item interface{}) []CsvColumnPair {
columns := []CsvColumnPair{}
t := reflect.TypeOf(item)
val := reflect.ValueOf(item)
for i := 0; i < val.NumField(); i++ {
fieldName := val.Type().Field(i).Name
field, _ := t.FieldByName(fieldName)
tag, ok := field.Tag.Lookup("csv")
if ok {
// split by comma
csvAttrs := strings.Split(tag, ",")
for _, csvAttr := range csvAttrs {
// trim spaces from the sides
csvAttr = strings.TrimSpace(csvAttr)
// csvColumnName[:propertyNameOfValue]
split := strings.SplitN(csvAttr, ":", 2)
if len(split) > 1 {
// is this a sub-property
columns = append(columns, CsvColumnPair{
fieldName: fmt.Sprintf("%s.%s", fieldName, strings.TrimSpace(split[1])),
columnName: strings.TrimSpace(split[0]),
})
} else {
columns = append(columns, CsvColumnPair{
fieldName: fieldName,
columnName: csvAttr,
})
}
}
}
}
if len(columns) == 0 {
debug.PrintStack()
panic(fmt.Errorf("getCsvColumns: given interface does not contain any CSV tags"))
}
return columns
}

View File

@@ -13,6 +13,7 @@ import (
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/serversettings"
"github.com/turbot/steampipe/pkg/steampipeconfig"
"github.com/turbot/steampipe/pkg/utils"
"golang.org/x/exp/maps"
@@ -24,6 +25,9 @@ type DbClient struct {
connectionString string
pool *pgxpool.Pool
// the settings of the server that this client is connected to
serverSettings *db_common.ServerSettings
// this flag is set if the service that this client
// is connected to is running in the same physical system
isLocalService bool
@@ -50,7 +54,7 @@ type DbClient struct {
onConnectionCallback DbConnectionCallback
}
func NewDbClient(ctx context.Context, connectionString string, onConnectionCallback DbConnectionCallback) (*DbClient, error) {
func NewDbClient(ctx context.Context, connectionString string, onConnectionCallback DbConnectionCallback) (_ *DbClient, err error) {
utils.LogTime("db_client.NewDbClient start")
defer utils.LogTime("db_client.NewDbClient end")
@@ -76,25 +80,57 @@ func NewDbClient(ctx context.Context, connectionString string, onConnectionCallb
connectionString: connectionString,
}
defer func() {
if err != nil {
// try closing the client
client.Close(ctx)
}
}()
if err := client.establishConnectionPool(ctx); err != nil {
return nil, err
}
// load up the server settings
if err := client.loadServerSettings(ctx); err != nil {
return nil, err
}
// set user search path
err := client.LoadUserSearchPath(ctx)
if err != nil {
if err := client.LoadUserSearchPath(ctx); err != nil {
return nil, err
}
// populate customSearchPath
if err := client.SetRequiredSessionSearchPath(ctx); err != nil {
client.Close(ctx)
return nil, err
}
return client, nil
}
func (c *DbClient) loadServerSettings(ctx context.Context) error {
conn, _, err := c.GetDatabaseConnectionWithRetries(ctx)
if err != nil {
return err
}
defer conn.Release()
serverSettings, err := serversettings.Load(ctx, conn.Conn())
if err != nil {
if _, _, notFound := IsRelationNotFoundError(err); notFound {
// when connecting to pre-0.21.0 services, the server_settings table will not be available.
// this is expected and not an error
// code which uses server_settings should handle this
log.Printf("[INFO] could not find %s.%s table.", constants.InternalSchema, constants.ServerSettingsTable)
return nil
}
return err
}
c.serverSettings = serverSettings
log.Println("[TRACE] loaded server settings:", serverSettings)
return nil
}
func (c *DbClient) setShouldShowTiming(ctx context.Context, session *db_common.DatabaseSession) {
currentShowTimingFlag := viper.GetBool(constants.ArgTiming)
@@ -111,6 +147,14 @@ func (c *DbClient) shouldShowTiming() bool {
return c.showTimingFlag && !c.disableTiming
}
// ServerSettings returns the settings of the steampipe service that this DbClient is connected to
//
// Keep in mind that when connecting to pre-0.21.x servers, the server_settings data is not available. This is expected.
// Code which read server_settings should take this into account.
func (c *DbClient) ServerSettings() *db_common.ServerSettings {
return c.serverSettings
}
// Close implements Client
// closes the connection to the database and shuts down the backend
func (c *DbClient) Close(context.Context) error {

View File

@@ -9,10 +9,8 @@ import (
"github.com/jackc/pgx/v5"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe/pkg/cmdconfig"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/steampipeconfig"
)
// SetRequiredSessionSearchPath implements Client
@@ -88,19 +86,6 @@ func (c *DbClient) GetCustomSearchPath() []string {
return c.customSearchPath
}
// reload Steampipe config, update viper and re-set required search path
func (c *DbClient) updateRequiredSearchPath(ctx context.Context) error {
config, errorsAndWarnings := steampipeconfig.LoadSteampipeConfig(viper.GetString(constants.ArgModLocation), "dashboard")
if errorsAndWarnings.GetError() != nil {
return errorsAndWarnings.GetError()
}
// todo review this usage of GlobalConfig
// https://github.com/turbot/steampipe/issues/3387
steampipeconfig.GlobalConfig = config
cmdconfig.SetDefaultsFromConfig(steampipeconfig.GlobalConfig.ConfigMap())
return c.SetRequiredSessionSearchPath(ctx)
}
// ensure the search path for the database session is as required
func (c *DbClient) ensureSessionSearchPath(ctx context.Context, session *db_common.DatabaseSession) error {
log.Printf("[TRACE] ensureSessionSearchPath")

View File

@@ -2,6 +2,7 @@ package db_common
import (
"context"
"github.com/jackc/pgx/v5/pgxpool"
"github.com/turbot/steampipe/pkg/query/queryresult"
)
@@ -27,4 +28,6 @@ type Client interface {
RefreshSessions(context.Context) *AcquireSessionResult
GetSchemaFromDB(context.Context) (*SchemaMetadata, error)
ServerSettings() *ServerSettings
}

View File

@@ -0,0 +1,14 @@
package db_common
import (
"time"
)
type ServerSettings struct {
StartTime time.Time `db:"start_time"`
SteampipeVersion string `db:"steampipe_version"`
FdwVersion string `db:"fdw_version"`
CacheMaxTtl int `db:"cache_max_ttl"`
CacheMaxSizeMb int `db:"cache_max_size_mb"`
CacheEnabled bool `db:"cache_enabled"`
}

View File

@@ -2,6 +2,7 @@ package db_local
import (
"context"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgconn"
"github.com/turbot/steampipe/pkg/constants"

View File

@@ -0,0 +1,41 @@
package db_local
import (
"context"
"log"
"time"
"github.com/jackc/pgx/v5"
"github.com/spf13/viper"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/serversettings"
"github.com/turbot/steampipe/pkg/version"
)
// setupServerSettingsTable creates a new read-only table with information in the current
// settings the service has been started with.
//
// The table also includes the CLI and FDW versions for reference
func setupServerSettingsTable(ctx context.Context, conn *pgx.Conn) error {
settings := db_common.ServerSettings{
StartTime: time.Now(),
SteampipeVersion: version.VersionString,
FdwVersion: constants.FdwVersion,
CacheMaxTtl: viper.GetInt(constants.ArgCacheMaxTtl),
CacheMaxSizeMb: viper.GetInt(constants.ArgMaxCacheSizeMb),
CacheEnabled: viper.GetBool(constants.ArgServiceCacheEnabled),
}
queries := []db_common.QueryWithArgs{
serversettings.DropServerSettingsTable(ctx),
serversettings.CreateServerSettingsTable(ctx),
serversettings.GrantsOnServerSettingsTable(ctx),
serversettings.GetPopulateServerSettingsSql(ctx, settings),
}
log.Println("[TRACE] saved server settings:", settings)
_, err := ExecuteSqlWithArgsInTransaction(ctx, conn, queries...)
return err
}

View File

@@ -165,13 +165,20 @@ func postServiceStart(ctx context.Context, res *StartResult) error {
if err := setupInternal(ctx, conn); err != nil {
return err
}
// ensure connection stat etable contains entries for all connections in connection config
// ensure connection state table contains entries for all connections in connection config
// (this is to allow for the race condition between polling connection state and calling refresh connections,
// which does not update the connection_state with added connections until it has built the ConnectionUpdates
if err := initializeConnectionStateTable(ctx, conn); err != nil {
return err
}
// create the server settings table
// this table contains configuration that this instance of the service
// is booting with
if err := setupServerSettingsTable(ctx, conn); err != nil {
return err
}
// create the clone_foreign_schema function
if _, err := executeSqlAsRoot(ctx, cloneForeignSchemaSQL); err != nil {
return sperr.WrapWithMessage(err, "failed to create clone_foreign_schema function")

View File

@@ -0,0 +1,29 @@
package serversettings
import (
"context"
"fmt"
"github.com/jackc/pgx/v5"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/sperr"
)
func Load(ctx context.Context, conn *pgx.Conn) (_ *db_common.ServerSettings, e error) {
defer func() {
// this function uses reflection to extract and convert values
// we need to be able to recover from panics while using reflection
if r := recover(); r != nil {
e = sperr.ToError(r, sperr.WithMessage("error loading server settings"))
}
}()
rows, err := conn.Query(ctx, fmt.Sprintf("SELECT * FROM %s.%s", constants.InternalSchema, constants.ServerSettingsTable))
if err != nil {
return nil, err
}
defer rows.Close()
return pgx.CollectOneRow(rows, pgx.RowToAddrOfStructByName[db_common.ServerSettings])
}

View File

@@ -0,0 +1,64 @@
package serversettings
import (
"context"
"fmt"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_common"
)
func GetPopulateServerSettingsSql(ctx context.Context, settings db_common.ServerSettings) db_common.QueryWithArgs {
return db_common.QueryWithArgs{
Query: fmt.Sprintf(`INSERT INTO %s.%s (
start_time,
steampipe_version,
fdw_version,
cache_max_ttl,
cache_max_size_mb,
cache_enabled)
VALUES($1,$2,$3,$4,$5,$6)`, constants.InternalSchema, constants.ServerSettingsTable),
Args: []any{
settings.StartTime,
settings.SteampipeVersion,
settings.FdwVersion,
settings.CacheMaxTtl,
settings.CacheMaxSizeMb,
settings.CacheEnabled,
},
}
}
func CreateServerSettingsTable(ctx context.Context) db_common.QueryWithArgs {
return db_common.QueryWithArgs{
Query: fmt.Sprintf(`CREATE TABLE IF NOT EXISTS %s.%s (
start_time TIMESTAMPTZ NOT NULL,
steampipe_version TEXT NOT NULL,
fdw_version TEXT NOT NULL,
cache_max_ttl INTEGER NOT NULL,
cache_max_size_mb INTEGER NOT NULL,
cache_enabled BOOLEAN NOT NULL
);`, constants.InternalSchema, constants.ServerSettingsTable),
}
}
func GrantsOnServerSettingsTable(ctx context.Context) db_common.QueryWithArgs {
return db_common.QueryWithArgs{
Query: fmt.Sprintf(
`GRANT SELECT ON TABLE %s.%s to %s;`,
constants.InternalSchema,
constants.ServerSettingsTable,
constants.DatabaseUsersRole,
),
}
}
func DropServerSettingsTable(ctx context.Context) db_common.QueryWithArgs {
return db_common.QueryWithArgs{
Query: fmt.Sprintf(
`DROP TABLE IF EXISTS %s.%s;`,
constants.InternalSchema,
constants.ServerSettingsTable,
),
}
}

View File

@@ -1,6 +1,16 @@
load "$LIB_BATS_ASSERT/load.bash"
load "$LIB_BATS_SUPPORT/load.bash"
@test "verify installed fdw version" {
run steampipe query "select * from steampipe_internal.steampipe_server_settings" --output=json
# extract the first mod_name from the list
fdw_version=$(echo $output | jq '.[0].fdw_version')
desired_fdw_version=$(cat $STEAMPIPE_INSTALL_DIR/db/versions.json | jq '.fdw_extension.version')
assert_equal "$fdw_version" "$desired_fdw_version"
}
@test "service stability" {
echo "# Setting up"
steampipe query "select 1"

View File

@@ -0,0 +1,7 @@
load "$LIB_BATS_ASSERT/load.bash"
load "$LIB_BATS_SUPPORT/load.bash"
@test "verify steampipe_server_settings table" {
run steampipe query "select * from steampipe_server_settings"
assert_success
}