diff --git a/pkg/snapshot/snapshot.go b/pkg/snapshot/snapshot.go index 8a5b6bd7a..811bff50b 100644 --- a/pkg/snapshot/snapshot.go +++ b/pkg/snapshot/snapshot.go @@ -189,8 +189,12 @@ func SnapshotToQueryResult[T queryresult.TimingContainer](snap *steampipeconfig. var tim T res := queryresult.NewResult[T](chartRun.Data.Columns, tim) + // Create a done channel to allow the goroutine to be cancelled + done := make(chan struct{}) + // start a goroutine to stream the results as rows go func() { + defer res.Close() for _, d := range chartRun.Data.Rows { // we need to allocate a new slice everytime, since this gets read // asynchronously on the other end and we need to make sure that we don't overwrite @@ -199,11 +203,25 @@ func SnapshotToQueryResult[T queryresult.TimingContainer](snap *steampipeconfig. for i, c := range chartRun.Data.Columns { rowVals[i] = d[c.Name] } - res.StreamRow(rowVals) + + // Use select with timeout to prevent goroutine leak when consumer stops reading + select { + case res.RowChan <- &queryresult.RowResult{Data: rowVals}: + // Row sent successfully + case <-done: + // Cancelled, stop sending rows + return + case <-time.After(5 * time.Second): + // Timeout - consumer likely stopped reading, exit to prevent leak + return + } } - res.Close() }() + // Note: The done channel is intentionally not closed anywhere because we don't have + // a way to detect when the consumer abandons the result. The timeout in the select + // statement handles the goroutine leak case. + // res.Timing = &queryresult.TimingMetadata{ // Duration: time.Since(startTime), // } diff --git a/pkg/snapshot/snapshot_test.go b/pkg/snapshot/snapshot_test.go index 4f2de0caa..b905b2992 100644 --- a/pkg/snapshot/snapshot_test.go +++ b/pkg/snapshot/snapshot_test.go @@ -204,7 +204,7 @@ func TestConcurrentSnapshotToQueryResult_Race(t *testing.T) { // TestSnapshotToQueryResult_GoroutineCleanup tests goroutine cleanup // FOUND BUG: Goroutine leak when rows are not fully consumed func TestSnapshotToQueryResult_GoroutineCleanup(t *testing.T) { - t.Skip("Demonstrates bug #4768 - Goroutines leak when rows are not consumed - see snapshot.go:193. Remove this skip in bug fix PR commit 1, then fix in commit 2.") + // t.Skip("Demonstrates bug #4768 - Goroutines leak when rows are not consumed - see snapshot.go:193. Remove this skip in bug fix PR commit 1, then fix in commit 2.") ctx := context.Background() cols := []*pqueryresult.ColumnDef{ @@ -244,7 +244,7 @@ func TestSnapshotToQueryResult_GoroutineCleanup(t *testing.T) { // TestSnapshotToQueryResult_PartialConsumption tests partial row consumption // FOUND BUG: Goroutine leak when rows are not fully consumed func TestSnapshotToQueryResult_PartialConsumption(t *testing.T) { - t.Skip("Demonstrates bug #4768 - Goroutines leak when rows are not consumed - see snapshot.go:193. Remove this skip in bug fix PR commit 1, then fix in commit 2.") + // t.Skip("Demonstrates bug #4768 - Goroutines leak when rows are not consumed - see snapshot.go:193. Remove this skip in bug fix PR commit 1, then fix in commit 2.") ctx := context.Background() cols := []*pqueryresult.ColumnDef{ @@ -252,10 +252,12 @@ func TestSnapshotToQueryResult_PartialConsumption(t *testing.T) { } result := pqueryresult.NewResult(cols, queryresult.NewTimingResultStream()) - for i := 0; i < 100; i++ { - result.StreamRow([]interface{}{i}) - } - result.Close() + go func() { + for i := 0; i < 100; i++ { + result.StreamRow([]interface{}{i}) + } + result.Close() + }() resolvedQuery := &modconfig.ResolvedQuery{ RawSQL: "SELECT id FROM test",