Concurrent read and close synchronization closes #4805 (#4874)

* Unskip test demonstrating bug #4805: Concurrent read and close may race

Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix #4805: Add synchronization for concurrent StreamRow and Close

The sync.Once in Close() only prevents multiple Close() calls, but
doesn't coordinate with StreamRow() operations. Added a mutex and
closed flag to prevent race conditions when one goroutine streams
rows while another closes the result.

The fix:
- Added mutex (mu) and closed flag to Result struct
- StreamRow checks closed flag before streaming (with RLock)
- Close sets closed flag (with Lock) before closing channel

This prevents "send on closed channel" panics and data races.

Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
Nathan Wallace
2025-11-15 23:51:37 +08:00
committed by GitHub
parent 6016a71053
commit 1a1b380918
2 changed files with 58 additions and 0 deletions

View File

@@ -7,9 +7,12 @@ import (
)
// Result wraps queryresult.Result[TimingResultStream] with idempotent Close()
// and synchronization to prevent race between StreamRow and Close
type Result struct {
*queryresult.Result[TimingResultStream]
closeOnce sync.Once
mu sync.RWMutex
closed bool
}
func NewResult(cols []*queryresult.ColumnDef) *Result {
@@ -21,10 +24,23 @@ func NewResult(cols []*queryresult.ColumnDef) *Result {
// Close closes the row channel in an idempotent manner
func (r *Result) Close() {
r.closeOnce.Do(func() {
r.mu.Lock()
r.closed = true
r.mu.Unlock()
r.Result.Close()
})
}
// StreamRow wraps the underlying StreamRow with synchronization
func (r *Result) StreamRow(row []interface{}) {
r.mu.RLock()
defer r.mu.RUnlock()
if !r.closed {
r.Result.StreamRow(row)
}
}
// WrapResult wraps a pipe-fittings Result with our wrapper that has idempotent Close
func WrapResult(r *queryresult.Result[TimingResultStream]) *Result {
return &Result{

View File

@@ -1,7 +1,9 @@
package queryresult
import (
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/turbot/pipe-fittings/v2/queryresult"
@@ -23,3 +25,43 @@ func TestResultClose_DoubleClose(t *testing.T) {
result.Close()
}, "Result.Close() should be idempotent and not panic on second call")
}
// TestResult_ConcurrentReadAndClose tests concurrent read from RowChan and Close()
// This test demonstrates bug #4805 - race condition when reading while closing
func TestResult_ConcurrentReadAndClose(t *testing.T) {
// Run the test multiple times to increase chance of catching race
for i := 0; i < 100; i++ {
cols := []*queryresult.ColumnDef{
{Name: "id", DataType: "integer"},
}
result := NewResult(cols)
var wg sync.WaitGroup
wg.Add(3)
// Goroutine 1: Stream rows
go func() {
defer wg.Done()
for j := 0; j < 100; j++ {
result.StreamRow([]interface{}{j})
}
}()
// Goroutine 2: Read from RowChan (may race with Close)
go func() {
defer wg.Done()
for range result.RowChan {
// Consume rows - this read may race with channel close
}
}()
// Goroutine 3: Close while reading is happening (triggers the race)
go func() {
defer wg.Done()
time.Sleep(10 * time.Microsecond) // Let some rows stream first
result.Close() // This may race with goroutine 2 reading
}()
wg.Wait()
}
}