mirror of
https://github.com/turbot/steampipe.git
synced 2025-12-19 18:12:43 -05:00
249 lines
9.0 KiB
Go
249 lines
9.0 KiB
Go
package queryexecute
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"os"
|
|
"strings"
|
|
"time"
|
|
|
|
"github.com/spf13/viper"
|
|
"github.com/turbot/pipe-fittings/v2/constants"
|
|
pconstants "github.com/turbot/pipe-fittings/v2/constants"
|
|
"github.com/turbot/pipe-fittings/v2/contexthelpers"
|
|
"github.com/turbot/pipe-fittings/v2/modconfig"
|
|
"github.com/turbot/pipe-fittings/v2/pipes"
|
|
"github.com/turbot/pipe-fittings/v2/querydisplay"
|
|
pqueryresult "github.com/turbot/pipe-fittings/v2/queryresult"
|
|
"github.com/turbot/pipe-fittings/v2/steampipeconfig"
|
|
"github.com/turbot/pipe-fittings/v2/utils"
|
|
"github.com/turbot/steampipe/v2/pkg/cmdconfig"
|
|
"github.com/turbot/steampipe/v2/pkg/connection_sync"
|
|
"github.com/turbot/steampipe/v2/pkg/db/db_common"
|
|
"github.com/turbot/steampipe/v2/pkg/display"
|
|
"github.com/turbot/steampipe/v2/pkg/error_helpers"
|
|
"github.com/turbot/steampipe/v2/pkg/interactive"
|
|
"github.com/turbot/steampipe/v2/pkg/query"
|
|
"github.com/turbot/steampipe/v2/pkg/query/queryresult"
|
|
"github.com/turbot/steampipe/v2/pkg/snapshot"
|
|
)
|
|
|
|
func RunInteractiveSession(ctx context.Context, initData *query.InitData) error {
|
|
utils.LogTime("execute.RunInteractiveSession start")
|
|
defer utils.LogTime("execute.RunInteractiveSession end")
|
|
|
|
// the db executor sends result data over resultsStreamer
|
|
result := interactive.RunInteractivePrompt(ctx, initData)
|
|
|
|
// print the data as it comes
|
|
for r := range result.Streamer.Results {
|
|
// wrap the result from pipe-fittings with our wrapper that has idempotent Close
|
|
wrapped := queryresult.WrapResult(r)
|
|
rowCount, _ := querydisplay.ShowOutput(ctx, r)
|
|
// show timing
|
|
display.DisplayTiming(wrapped, rowCount)
|
|
// signal to the resultStreamer that we are done with this chunk of the stream
|
|
result.Streamer.AllResultsRead()
|
|
}
|
|
return result.PromptErr
|
|
}
|
|
|
|
func RunBatchSession(ctx context.Context, initData *query.InitData) (int, error) {
|
|
// start cancel handler to intercept interrupts and cancel the context
|
|
// NOTE: use the initData Cancel function to ensure any initialisation is cancelled if needed
|
|
contexthelpers.StartCancelHandler(initData.Cancel)
|
|
|
|
// wait for init
|
|
<-initData.Loaded
|
|
|
|
if err := initData.Result.Error; err != nil {
|
|
return 0, err
|
|
}
|
|
|
|
// display any initialisation messages/warnings
|
|
initData.Result.DisplayMessages()
|
|
|
|
// if there is a custom search path, wait until the first connection of each plugin has loaded
|
|
if customSearchPath := initData.Client.GetCustomSearchPath(); customSearchPath != nil {
|
|
if err := connection_sync.WaitForSearchPathSchemas(ctx, initData.Client, customSearchPath); err != nil {
|
|
return 0, err
|
|
}
|
|
}
|
|
|
|
failures := 0
|
|
if len(initData.Queries) > 0 {
|
|
// if we have resolved any queries, run them
|
|
failures = executeQueries(ctx, initData)
|
|
}
|
|
// return the number of query failures and the number of rows that returned errors
|
|
return failures, nil
|
|
}
|
|
|
|
func executeQueries(ctx context.Context, initData *query.InitData) int {
|
|
utils.LogTime("queryexecute.executeQueries start")
|
|
defer utils.LogTime("queryexecute.executeQueries end")
|
|
|
|
// failures return the number of queries that failed and also the number of rows that
|
|
// returned errors
|
|
failures := 0
|
|
t := time.Now()
|
|
|
|
var err error
|
|
|
|
for i, q := range initData.Queries {
|
|
// if executeQuery fails it returns err, else it returns the number of rows that returned errors while execution
|
|
if err, failures = executeQuery(ctx, initData, q); err != nil {
|
|
failures++
|
|
error_helpers.ShowWarning(fmt.Sprintf("query %d of %d failed: %v", i+1, len(initData.Queries), error_helpers.DecodePgError(err)))
|
|
// if timing flag is enabled, show the time taken for the query to fail
|
|
if cmdconfig.Viper().GetString(pconstants.ArgTiming) != pconstants.ArgOff {
|
|
querydisplay.DisplayErrorTiming(t)
|
|
}
|
|
}
|
|
// TODO move into display layer
|
|
// Only show the blank line between queries, not after the last one
|
|
if (i < len(initData.Queries)-1) && showBlankLineBetweenResults() {
|
|
fmt.Println()
|
|
}
|
|
}
|
|
|
|
return failures
|
|
}
|
|
|
|
func executeQuery(ctx context.Context, initData *query.InitData, resolvedQuery *modconfig.ResolvedQuery) (error, int) {
|
|
utils.LogTime("query.execute.executeQuery start")
|
|
defer utils.LogTime("query.execute.executeQuery end")
|
|
|
|
var snap *steampipeconfig.SteampipeSnapshot
|
|
|
|
// the db executor sends result data over resultsStreamer
|
|
resultsStreamer, err := db_common.ExecuteQuery(ctx, initData.Client, resolvedQuery.ExecuteSQL, resolvedQuery.Args...)
|
|
if err != nil {
|
|
return err, 0
|
|
}
|
|
|
|
rowErrors := 0 // get the number of rows that returned an error
|
|
// print the data as it comes
|
|
for r := range resultsStreamer.Results {
|
|
// wrap the result from pipe-fittings with our wrapper that has idempotent Close
|
|
wrapped := queryresult.WrapResult(r)
|
|
|
|
// if the output format is snapshot or export is set or share/snapshot args are set, we need to generate a snapshot
|
|
if needSnapshot() {
|
|
snap, err = snapshot.QueryResultToSnapshot(ctx, r, resolvedQuery, initData.Client.GetRequiredSessionSearchPath(), initData.StartTime)
|
|
if err != nil {
|
|
return err, 0
|
|
}
|
|
|
|
// re-generate the query result from the snapshot. since the row stream in the actual queryresult has been exhausted(while generating the snapshot),
|
|
// we need to re-generate it for other output formats
|
|
newQueryResult, err := snapshot.SnapshotToQueryResult[pqueryresult.TimingContainer](snap, initData.StartTime)
|
|
if err != nil {
|
|
return err, 0
|
|
}
|
|
|
|
// if the output format is snapshot we don't call the querydisplay code in pipe-fittings, instead we
|
|
// generate the snapshot and display it to stdout
|
|
outputFormat := viper.GetString(pconstants.ArgOutput)
|
|
if outputFormat == pconstants.OutputFormatSnapshot || outputFormat == pconstants.OutputFormatSteampipeSnapshotShort {
|
|
|
|
// display the snapshot as JSON
|
|
encoder := json.NewEncoder(os.Stdout)
|
|
encoder.SetIndent("", " ")
|
|
encoder.SetEscapeHTML(false)
|
|
if err := encoder.Encode(snap); err != nil {
|
|
//nolint:forbidigo // acceptable
|
|
fmt.Print("Error displaying result as snapshot", err)
|
|
return err, 0
|
|
}
|
|
}
|
|
|
|
// if we need to export the snapshot, we export it directly from here
|
|
if viper.IsSet(pconstants.ArgExport) {
|
|
exportArgs := viper.GetStringSlice(pconstants.ArgExport)
|
|
exportMsg, err := initData.ExportManager.DoExport(ctx, "query", snap, exportArgs)
|
|
if err != nil {
|
|
return err, 0
|
|
}
|
|
// print the location where the file is exported
|
|
if len(exportMsg) > 0 && viper.GetBool(pconstants.ArgProgress) {
|
|
fmt.Printf("\n") //nolint:forbidigo // intentional use of fmt
|
|
fmt.Println(strings.Join(exportMsg, "\n")) //nolint:forbidigo // intentional use of fmt
|
|
fmt.Printf("\n") //nolint:forbidigo // intentional use of fmt
|
|
}
|
|
}
|
|
|
|
// if we need to publish the snapshot, we publish it directly from here
|
|
if err := publishSnapshotIfNeeded(ctx, snap); err != nil {
|
|
return err, 0
|
|
}
|
|
|
|
// if other output formats are also needed, we call the querydisplay using the re-generated query result
|
|
rowCount, _ := querydisplay.ShowOutput(ctx, newQueryResult)
|
|
// show timing
|
|
display.DisplayTiming(wrapped, rowCount)
|
|
|
|
// signal to the resultStreamer that we are done with this result
|
|
resultsStreamer.AllResultsRead()
|
|
return nil, rowErrors
|
|
}
|
|
|
|
// for other output formats, we call the querydisplay code in pipe-fittings
|
|
rowCount, rowErrs := querydisplay.ShowOutput(ctx, r)
|
|
// show timing
|
|
display.DisplayTiming(wrapped, rowCount)
|
|
|
|
// signal to the resultStreamer that we are done with this result
|
|
resultsStreamer.AllResultsRead()
|
|
rowErrors = rowErrs
|
|
}
|
|
return nil, rowErrors
|
|
}
|
|
|
|
func needSnapshot() bool {
|
|
// Get the output format from the configuration
|
|
outputFormat := viper.GetString(pconstants.ArgOutput)
|
|
shouldShare := viper.GetBool(pconstants.ArgShare)
|
|
shouldUpload := viper.GetBool(pconstants.ArgSnapshot)
|
|
|
|
// Check if the output format is a snapshot format or if ArgExport is set
|
|
if outputFormat == pconstants.OutputFormatSnapshot || outputFormat == pconstants.OutputFormatSteampipeSnapshotShort || viper.IsSet(pconstants.ArgExport) || shouldShare || shouldUpload {
|
|
return true
|
|
}
|
|
|
|
// If none of the conditions are met, return false
|
|
return false
|
|
}
|
|
|
|
func publishSnapshotIfNeeded(ctx context.Context, snapshot *steampipeconfig.SteampipeSnapshot) error {
|
|
shouldShare := viper.GetBool(pconstants.ArgShare)
|
|
shouldUpload := viper.GetBool(pconstants.ArgSnapshot)
|
|
|
|
if !(shouldShare || shouldUpload) {
|
|
return nil
|
|
}
|
|
|
|
message, err := pipes.PublishSnapshot(ctx, snapshot, shouldShare)
|
|
if err != nil {
|
|
// reword "402 Payment Required" error
|
|
return handlePublishSnapshotError(err)
|
|
}
|
|
if viper.GetBool(constants.ArgProgress) {
|
|
fmt.Println(message)
|
|
}
|
|
return nil
|
|
}
|
|
|
|
func handlePublishSnapshotError(err error) error {
|
|
if err.Error() == "402 Payment Required" {
|
|
return fmt.Errorf("maximum number of snapshots reached")
|
|
}
|
|
return err
|
|
}
|
|
|
|
// if we are displaying csv with no header, do not include lines between the query results
|
|
func showBlankLineBetweenResults() bool {
|
|
return !(viper.GetString(pconstants.ArgOutput) == "csv" && !viper.GetBool(pconstants.ArgHeader))
|
|
}
|