Files
steampipe/pkg/initialisation/init_data.go

216 lines
7.0 KiB
Go

package initialisation
import (
"context"
"fmt"
"log"
"github.com/jackc/pgx/v5"
"github.com/spf13/viper"
"github.com/turbot/go-kit/helpers"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"github.com/turbot/steampipe-plugin-sdk/v5/telemetry"
"github.com/turbot/steampipe/pkg/constants"
"github.com/turbot/steampipe/pkg/db/db_client"
"github.com/turbot/steampipe/pkg/db/db_common"
"github.com/turbot/steampipe/pkg/db/db_local"
"github.com/turbot/steampipe/pkg/error_helpers"
"github.com/turbot/steampipe/pkg/export"
"github.com/turbot/steampipe/pkg/modinstaller"
"github.com/turbot/steampipe/pkg/plugin"
"github.com/turbot/steampipe/pkg/statushooks"
"github.com/turbot/steampipe/pkg/steampipeconfig/modconfig"
"github.com/turbot/steampipe/pkg/workspace"
)
// InitData holds the state assembled while initialising a command: the
// workspace, the database client, the accumulated result and the export manager.
type InitData struct {
// Workspace must be set by the caller before Init is invoked; Init records an
// error and returns early when it is nil.
Workspace *workspace.Workspace
// Client is the database client. Init assigns it and Cleanup closes it.
Client db_common.Client
// Result collects the error, warnings and messages raised during initialisation.
Result *db_common.InitResult
// ShutdownTelemetry is set by Init when telemetry initialises without error,
// and is called by Cleanup when non-nil.
ShutdownTelemetry func()
// ExportManager holds the exporters added through RegisterExporters.
ExportManager *export.Manager
}
// NewErrorInitData builds an InitData whose Result already carries err.
// Only Result is populated; every other field is left at its zero value.
func NewErrorInitData(err error) *InitData {
	failed := &db_common.InitResult{Error: err}
	return &InitData{Result: failed}
}
// NewInitData returns an InitData with an empty Result and a newly created
// export manager. Workspace and Client are not set here.
func NewInitData() *InitData {
	return &InitData{
		Result:        &db_common.InitResult{},
		ExportManager: export.NewManager(),
	}
}
// RegisterExporters registers each exporter with the export manager, in the
// order given. On the first registration failure the error is stored in
// i.Result.Error and the remaining exporters are not registered.
// The receiver is returned so that calls can be chained.
func (i *InitData) RegisterExporters(exporters ...export.Exporter) *InitData {
	for idx := range exporters {
		registerErr := i.ExportManager.Register(exporters[idx])
		if registerErr == nil {
			continue
		}
		// stop at the first failure and report it through the result
		i.Result.Error = registerErr
		break
	}
	return i
}
// Init runs the initialisation sequence and records the outcome on i.Result
// rather than returning it. i.Workspace must be set before calling; if it is
// nil an invalid-state error is recorded and nothing else is done.
//
// The steps, in order: initialise telemetry, optionally install mod
// dependencies, retrieve cloud metadata, check installed plugins against the
// mod requirements (skipped when a connection string is configured), then
// create the database client and validate its cache settings.
//
// A panic raised by any step is recovered and stored as i.Result.Error. If no
// other error was recorded, i.Result.Error is set to ctx.Err(), so a cancelled
// context is reported as an error.
//
// invoker and opts are passed through to GetDbClient.
func (i *InitData) Init(ctx context.Context, invoker constants.Invoker, opts ...db_client.ClientOption) {
	defer func() {
		// convert a panic into a result error
		if r := recover(); r != nil {
			i.Result.Error = helpers.ToError(r)
		}
		// if there is no error, return context cancellation error (if any)
		if i.Result.Error == nil {
			i.Result.Error = ctx.Err()
		}
	}()

	log.Printf("[INFO] Initializing...")

	// code after this depends on i.Workspace being defined. make sure that it is
	if i.Workspace == nil {
		i.Result.Error = sperr.WrapWithRootMessage(error_helpers.InvalidStateError, "InitData.Init called before setting up Workspace")
		return
	}

	statushooks.SetStatus(ctx, "Initializing")

	// initialise telemetry - a failure here is reported as a warning only
	shutdownTelemetry, err := telemetry.Init(constants.AppName)
	if err != nil {
		i.Result.AddWarnings(err.Error())
	} else {
		i.ShutdownTelemetry = shutdownTelemetry
	}

	// install mod dependencies if needed
	if viper.GetBool(constants.ArgModInstall) {
		statushooks.SetStatus(ctx, "Installing workspace dependencies")
		log.Printf("[INFO] Installing workspace dependencies")
		// named installOpts so that it does not shadow the opts parameter,
		// which is still needed below when creating the client
		installOpts := modinstaller.NewInstallOpts(i.Workspace.Mod)
		// use force install so that errors are ignored during installation
		// (we are validating prereqs later)
		installOpts.Force = true
		if _, err := modinstaller.InstallWorkspaceDependencies(ctx, installOpts); err != nil {
			i.Result.Error = err
			return
		}
	}

	// retrieve cloud metadata
	cloudMetadata, err := getCloudMetadata(ctx)
	if err != nil {
		i.Result.Error = err
		return
	}
	// set cloud metadata (may be nil)
	i.Workspace.CloudMetadata = cloudMetadata

	statushooks.SetStatus(ctx, "Checking for required plugins")
	log.Printf("[INFO] Checking for required plugins")
	pluginsInstalled, err := plugin.GetInstalledPlugins(ctx)
	if err != nil {
		i.Result.Error = err
		return
	}

	// no need to validate local steampipe and plugin versions for when connecting to remote steampipe database
	// ArgConnectionString is empty when connecting to local database
	if connectionString := viper.GetString(constants.ArgConnectionString); connectionString == "" {
		// validate steampipe version and required plugin version
		validationWarnings := validateModRequirementsRecursively(i.Workspace.Mod, pluginsInstalled)
		i.Result.AddWarnings(validationWarnings...)
	}

	// if introspection tables are enabled, setup the session data callback
	var ensureSessionData db_client.DbConnectionCallback
	if viper.GetString(constants.ArgIntrospection) != constants.IntrospectionNone {
		ensureSessionData = func(ctx context.Context, conn *pgx.Conn) error {
			return workspace.EnsureSessionData(ctx, i.Workspace.GetResourceMaps(), conn)
		}
	}

	// get a client
	// add a message rendering function to the context - this is used for the fdw update message and
	// allows us to render it as a standard initialisation message
	getClientCtx := statushooks.AddMessageRendererToContext(ctx, func(format string, a ...any) {
		i.Result.AddMessage(fmt.Sprintf(format, a...))
	})

	statushooks.SetStatus(ctx, "Connecting to steampipe database")
	log.Printf("[INFO] Connecting to steampipe database")
	client, errorsAndWarnings := GetDbClient(getClientCtx, invoker, ensureSessionData, opts...)
	// record warnings before checking the error so that they are kept even when
	// the connection fails - the cache settings validation below does the same
	i.Result.AddWarnings(errorsAndWarnings.Warnings...)
	if errorsAndWarnings.Error != nil {
		i.Result.Error = errorsAndWarnings.Error
		return
	}

	log.Printf("[INFO] ValidateClientCacheSettings")
	errorsAndWarnings = db_common.ValidateClientCacheSettings(client)
	if errorsAndWarnings.GetError() != nil {
		i.Result.Error = errorsAndWarnings.GetError()
	}
	i.Result.AddWarnings(errorsAndWarnings.Warnings...)

	// the client is stored even when validation reported an error;
	// Cleanup closes whatever client is stored here
	i.Client = client
}
// validateModRequirementsRecursively validates the requirements of mod and of
// every mod found in its resource maps, descending depth-first. It returns the
// text of each requirement validation error that was found, the given mod's
// own first; the result is nil when nothing failed.
func validateModRequirementsRecursively(mod *modconfig.Mod, pluginVersionMap map[string]*modconfig.PluginVersionString) []string {
	var messages []string

	// requirements declared by this mod
	for _, requirementErr := range mod.ValidateRequirements(pluginVersionMap) {
		messages = append(messages, requirementErr.Error())
	}

	// requirements declared by dependency mods
	for name, dependency := range mod.ResourceMaps.Mods {
		// TODO : The 'mod.DependencyName == dependency.DependencyName' check has to be done because
		// of a bug in the resource loading code which also puts the mod itself into the resource map
		// [https://github.com/turbot/steampipe/issues/3341]
		isSelfReference := name == "local" || mod.DependencyName == dependency.DependencyName
		if !isSelfReference {
			// only descend into real dependencies - following a self reference would recurse forever
			messages = append(messages, validateModRequirementsRecursively(dependency, pluginVersionMap)...)
		}
	}

	return messages
}
// GetDbClient either creates a DB client using the configured connection string (if present) or creates a LocalDbClient.
//
// onConnectionCallback and opts are forwarded to whichever constructor is used.
// Whenever an error is reported, the returned client is a literal nil so that
// callers can safely compare it against nil.
func GetDbClient(ctx context.Context, invoker constants.Invoker, onConnectionCallback db_client.DbConnectionCallback, opts ...db_client.ClientOption) (db_common.Client, error_helpers.ErrorAndWarnings) {
	if connectionString := viper.GetString(constants.ArgConnectionString); connectionString != "" {
		statushooks.SetStatus(ctx, "Connecting to remote Steampipe database")
		client, err := db_client.NewDbClient(ctx, connectionString, onConnectionCallback, opts...)
		if err != nil {
			// return a literal nil: if NewDbClient yields a concrete pointer type,
			// passing its nil pointer through the db_common.Client interface would
			// produce a non-nil interface value
			// NOTE(review): confirm the constructor's return type
			return nil, error_helpers.NewErrorsAndWarning(err)
		}
		return client, error_helpers.NewErrorsAndWarning(err)
	}

	statushooks.SetStatus(ctx, "Starting local Steampipe database")
	log.Printf("[INFO] Starting local Steampipe database")
	client, errorsAndWarnings := db_local.GetLocalClient(ctx, invoker, onConnectionCallback, opts...)
	if errorsAndWarnings.Error != nil {
		// same guard as above for the local client
		return nil, errorsAndWarnings
	}
	return client, errorsAndWarnings
}
// Cleanup releases what initialisation acquired, skipping anything that was
// never set: it closes the database client, then calls the telemetry shutdown
// function, then closes the workspace.
func (i *InitData) Cleanup(ctx context.Context) {
	if client := i.Client; client != nil {
		client.Close(ctx)
	}
	if shutdown := i.ShutdownTelemetry; shutdown != nil {
		shutdown()
	}
	if ws := i.Workspace; ws != nil {
		ws.Close()
	}
}