mirror of
https://github.com/turbot/steampipe.git
synced 2026-05-13 16:00:12 -04:00
working 1
This commit is contained in:
@@ -10,11 +10,11 @@ import (
|
||||
|
||||
"github.com/spf13/viper"
|
||||
pconstants "github.com/turbot/pipe-fittings/constants"
|
||||
"github.com/turbot/pipe-fittings/export"
|
||||
"github.com/turbot/pipe-fittings/modconfig"
|
||||
"github.com/turbot/steampipe/pkg/constants"
|
||||
"github.com/turbot/steampipe/pkg/db/db_client"
|
||||
"github.com/turbot/steampipe/pkg/error_helpers"
|
||||
"github.com/turbot/steampipe/pkg/export"
|
||||
"github.com/turbot/steampipe/pkg/initialisation"
|
||||
"github.com/turbot/steampipe/pkg/statushooks"
|
||||
)
|
||||
@@ -23,6 +23,7 @@ type InitData struct {
|
||||
initialisation.InitData
|
||||
|
||||
cancelInitialisation context.CancelFunc
|
||||
StartTime time.Time
|
||||
Loaded chan struct{}
|
||||
// map of query name to resolved query (key is the query text for command line queries)
|
||||
Queries []*modconfig.ResolvedQuery
|
||||
@@ -33,8 +34,9 @@ type InitData struct {
|
||||
// InitData.Done closes after asynchronous initialization completes
|
||||
func NewInitData(ctx context.Context, args []string) *InitData {
|
||||
i := &InitData{
|
||||
InitData: *initialisation.NewInitData(),
|
||||
Loaded: make(chan struct{}),
|
||||
StartTime: time.Now(),
|
||||
InitData: *initialisation.NewInitData(),
|
||||
Loaded: make(chan struct{}),
|
||||
}
|
||||
|
||||
statushooks.SetStatus(ctx, "Loading workspace")
|
||||
@@ -46,8 +48,8 @@ func NewInitData(ctx context.Context, args []string) *InitData {
|
||||
|
||||
func queryExporters() []export.Exporter {
|
||||
// TODO #snapshot
|
||||
return nil
|
||||
//return []export.Exporter{&export.SnapshotExporter{}}
|
||||
// return nil
|
||||
return []export.Exporter{&export.SnapshotExporter{}}
|
||||
}
|
||||
|
||||
func (i *InitData) Cancel() {
|
||||
@@ -84,16 +86,16 @@ func (i *InitData) init(ctx context.Context, args []string) {
|
||||
i.cancelInitialisation = nil
|
||||
}()
|
||||
|
||||
// validate export args
|
||||
if len(viper.GetStringSlice(pconstants.ArgExport)) > 0 {
|
||||
i.RegisterExporters(queryExporters()...)
|
||||
// // validate export args
|
||||
// if len(viper.GetStringSlice(pconstants.ArgExport)) > 0 {
|
||||
// i.RegisterExporters(queryExporters()...)
|
||||
|
||||
// validate required export formats
|
||||
if err := i.ExportManager.ValidateExportFormat(viper.GetStringSlice(pconstants.ArgExport)); err != nil {
|
||||
i.Result.Error = err
|
||||
return
|
||||
}
|
||||
}
|
||||
// // validate required export formats
|
||||
// if err := i.ExportManager.ValidateExportFormat(viper.GetStringSlice(pconstants.ArgExport)); err != nil {
|
||||
// i.Result.Error = err
|
||||
// return
|
||||
// }
|
||||
// }
|
||||
|
||||
// set max DB connections to 1
|
||||
viper.Set(pconstants.ArgMaxParallel, 1)
|
||||
|
||||
@@ -29,7 +29,7 @@ func RunInteractiveSession(ctx context.Context, initData *query.InitData) error
|
||||
|
||||
// print the data as it comes
|
||||
for r := range result.Streamer.Results {
|
||||
rowCount, _ := querydisplay.ShowOutput(ctx, r, nil, nil)
|
||||
rowCount, _ := querydisplay.ShowOutput(ctx, time.Now(), r, nil, nil)
|
||||
// show timing
|
||||
display.DisplayTiming(r, rowCount)
|
||||
// signal to the resultStreamer that we are done with this chunk of the stream
|
||||
@@ -82,7 +82,7 @@ func executeQueries(ctx context.Context, initData *query.InitData) int {
|
||||
|
||||
for i, q := range initData.Queries {
|
||||
// if executeQuery fails it returns err, else it returns the number of rows that returned errors while execution
|
||||
if err, failures = executeQuery(ctx, initData.Client, q); err != nil {
|
||||
if err, failures = executeQuery(ctx, initData, q); err != nil {
|
||||
failures++
|
||||
error_helpers.ShowWarning(fmt.Sprintf("query %d of %d failed: %v", i+1, len(initData.Queries), error_helpers.DecodePgError(err)))
|
||||
// if timing flag is enabled, show the time taken for the query to fail
|
||||
@@ -100,12 +100,12 @@ func executeQueries(ctx context.Context, initData *query.InitData) int {
|
||||
return failures
|
||||
}
|
||||
|
||||
func executeQuery(ctx context.Context, client db_common.Client, resolvedQuery *modconfig.ResolvedQuery) (error, int) {
|
||||
func executeQuery(ctx context.Context, initData *query.InitData, resolvedQuery *modconfig.ResolvedQuery) (error, int) {
|
||||
utils.LogTime("query.execute.executeQuery start")
|
||||
defer utils.LogTime("query.execute.executeQuery end")
|
||||
|
||||
// the db executor sends result data over resultsStreamer
|
||||
resultsStreamer, err := db_common.ExecuteQuery(ctx, client, resolvedQuery.ExecuteSQL, resolvedQuery.Args...)
|
||||
resultsStreamer, err := db_common.ExecuteQuery(ctx, initData.Client, resolvedQuery.ExecuteSQL, resolvedQuery.Args...)
|
||||
if err != nil {
|
||||
return err, 0
|
||||
}
|
||||
@@ -113,7 +113,7 @@ func executeQuery(ctx context.Context, client db_common.Client, resolvedQuery *m
|
||||
rowErrors := 0 // get the number of rows that returned an error
|
||||
// print the data as it comes
|
||||
for r := range resultsStreamer.Results {
|
||||
rowCount, _ := querydisplay.ShowOutput(ctx, r, resolvedQuery, client.GetRequiredSessionSearchPath())
|
||||
rowCount, _ := querydisplay.ShowOutput(ctx, initData.StartTime, r, resolvedQuery, initData.Client.GetRequiredSessionSearchPath())
|
||||
// show timing
|
||||
display.DisplayTiming(r, rowCount)
|
||||
|
||||
|
||||
Reference in New Issue
Block a user