mirror of
https://github.com/turbot/steampipe.git
synced 2025-12-19 18:12:43 -05:00
* Add test for #4803: Race condition in initialisationComplete flag Add TestInitialisationComplete_RaceCondition to demonstrate the data race that occurs when the initialisationComplete boolean flag is accessed concurrently by multiple goroutines without synchronization. The test simulates: - Init goroutine writing to the flag - Query executor reading via isInitialised() - Notification handler reading the flag directly This test will fail when run with the -race flag, exposing the bug. Co-Authored-By: Claude <noreply@anthropic.com> * Fix #4803: Use atomic.Bool for initialisationComplete flag Replace the plain boolean initialisationComplete field with atomic.Bool to prevent data races when accessed concurrently by multiple goroutines. Changes: - Change field type from bool to atomic.Bool - Use .Store(true) for writes - Use .Load() for reads in isInitialised() and handleConnectionUpdateNotification() - Update test to use atomic operations The test now passes with -race flag, confirming the race condition is fixed. --------- Co-authored-by: Claude <noreply@anthropic.com>
This commit is contained in:
@@ -10,6 +10,7 @@ import (
|
||||
"os/signal"
|
||||
"strings"
|
||||
"sync"
|
||||
"sync/atomic"
|
||||
"time"
|
||||
|
||||
"github.com/alecthomas/chroma/formatters"
|
||||
@@ -58,7 +59,7 @@ type InteractiveClient struct {
|
||||
// channel used internally to pass the initialisation result
|
||||
initResultChan chan *db_common.InitResult
|
||||
// flag set when initialisation is complete (with or without errors)
|
||||
initialisationComplete bool
|
||||
initialisationComplete atomic.Bool
|
||||
afterClose AfterPromptCloseAction
|
||||
// lock while execution is occurring to avoid errors/warnings being shown
|
||||
executionLock sync.Mutex
|
||||
@@ -731,7 +732,7 @@ func (c *InteractiveClient) handleConnectionUpdateNotification(ctx context.Conte
|
||||
// ignore schema update notifications until initialisation is complete
|
||||
// (we may receive schema update messages from the initial refresh connections, but we do not need to reload
|
||||
// the schema as we will have already loaded the correct schema)
|
||||
if !c.initialisationComplete {
|
||||
if !c.initialisationComplete.Load() {
|
||||
log.Printf("[INFO] received schema update notification but ignoring it as we are initializing")
|
||||
return
|
||||
}
|
||||
|
||||
@@ -16,7 +16,7 @@ import (
|
||||
func (c *InteractiveClient) handleInitResult(ctx context.Context, initResult *db_common.InitResult) {
|
||||
// whatever happens, set initialisationComplete
|
||||
defer func() {
|
||||
c.initialisationComplete = true
|
||||
c.initialisationComplete.Store(true)
|
||||
}()
|
||||
|
||||
if initResult.Error != nil {
|
||||
@@ -127,7 +127,7 @@ func (c *InteractiveClient) readInitDataStream(ctx context.Context) {
|
||||
// return whether the client is initialises
|
||||
// there are 3 conditions>
|
||||
func (c *InteractiveClient) isInitialised() bool {
|
||||
return c.initialisationComplete
|
||||
return c.initialisationComplete.Load()
|
||||
}
|
||||
|
||||
func (c *InteractiveClient) waitForInitData(ctx context.Context) error {
|
||||
|
||||
@@ -2,6 +2,7 @@ package interactive
|
||||
|
||||
import (
|
||||
"strings"
|
||||
"sync"
|
||||
"testing"
|
||||
|
||||
"github.com/c-bata/go-prompt"
|
||||
@@ -289,9 +290,8 @@ func TestIsInitialised(t *testing.T) {
|
||||
|
||||
for _, tt := range tests {
|
||||
t.Run(tt.name, func(t *testing.T) {
|
||||
c := &InteractiveClient{
|
||||
initialisationComplete: tt.initialisationComplete,
|
||||
}
|
||||
c := &InteractiveClient{}
|
||||
c.initialisationComplete.Store(tt.initialisationComplete)
|
||||
|
||||
result := c.isInitialised()
|
||||
|
||||
@@ -532,3 +532,51 @@ func TestCancelActiveQueryIfAny(t *testing.T) {
|
||||
}
|
||||
})
|
||||
}
|
||||
|
||||
// TestInitialisationComplete_RaceCondition tests that concurrent access to
|
||||
// the initialisationComplete flag does not cause data races.
|
||||
//
|
||||
// This test simulates the real-world scenario where:
|
||||
// - One goroutine (init goroutine) writes to initialisationComplete
|
||||
// - Other goroutines (query executor, notification handler) read from it
|
||||
//
|
||||
// Bug: #4803
|
||||
func TestInitialisationComplete_RaceCondition(t *testing.T) {
|
||||
c := &InteractiveClient{}
|
||||
c.initialisationComplete.Store(false)
|
||||
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// Simulate initialization goroutine writing to the flag
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 100; i++ {
|
||||
c.initialisationComplete.Store(true)
|
||||
c.initialisationComplete.Store(false)
|
||||
}
|
||||
}()
|
||||
|
||||
// Simulate query executor reading the flag
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 100; i++ {
|
||||
_ = c.isInitialised()
|
||||
}
|
||||
}()
|
||||
|
||||
// Simulate notification handler reading the flag
|
||||
wg.Add(1)
|
||||
go func() {
|
||||
defer wg.Done()
|
||||
for i := 0; i < 100; i++ {
|
||||
// Check the flag directly (as handleConnectionUpdateNotification does)
|
||||
if !c.initialisationComplete.Load() {
|
||||
continue
|
||||
}
|
||||
}
|
||||
}()
|
||||
|
||||
wg.Wait()
|
||||
}
|
||||
|
||||
Reference in New Issue
Block a user