Files
steampipe/pkg/pluginmanager_service/message_server_test.go
Nathan Wallace 55e2d407a0 Fix #4783: Nil pointer dereference in updateConnectionSchema (#4901)
* Add test demonstrating bug #4783 - updateConnectionSchema with nil pool

This test verifies that updateConnectionSchema handles a nil pool gracefully.
While RefreshConnections (via newRefreshConnectionState) already checks for
nil pool since #4778, this test demonstrates that updateConnectionSchema
should perform an early nil check for better error handling.

The test currently passes because newRefreshConnectionState catches the nil
pool, but we should add an explicit check at the start of updateConnectionSchema
for clarity and to avoid unnecessary work.

Related to issue #4783

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix #4783: Add nil pool check in updateConnectionSchema

Add an early nil check for the pool at the beginning of updateConnectionSchema
to prevent unnecessary work and provide clearer error handling.

While newRefreshConnectionState (called by RefreshConnections) already checks
for nil pool since #4778, adding the check at the start of updateConnectionSchema
provides several benefits:

1. Avoids unnecessary work - we don't call RefreshConnections if pool is nil
2. Clearer error logging - warning message specifically indicates the issue
   is in updateConnectionSchema
3. Defense in depth - validates preconditions before executing the method

The method is called from the message server when a plugin sends a schema
update notification, so the nil check ensures we handle edge cases gracefully.

Closes #4783

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-11-17 04:00:18 -05:00

366 lines
8.9 KiB
Go

package pluginmanager_service
import (
"context"
"runtime"
"sync"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
)
// Test helpers for message server tests
func newTestMessageServer(t *testing.T) *PluginMessageServer {
t.Helper()
pm := newTestPluginManager(t)
return &PluginMessageServer{
pluginManager: pm,
}
}
// Test 1: NewPluginMessageServer
func TestNewPluginMessageServer(t *testing.T) {
pm := newTestPluginManager(t)
ms, err := NewPluginMessageServer(pm)
require.NoError(t, err)
assert.NotNil(t, ms)
assert.Equal(t, pm, ms.pluginManager)
}
// Test 2: PluginMessageServer Initialization
func TestPluginManager_MessageServerInitialization(t *testing.T) {
pm := newTestPluginManager(t)
assert.NotNil(t, pm.messageServer, "messageServer should be initialized")
assert.Equal(t, pm, pm.messageServer.pluginManager, "messageServer should reference parent PluginManager")
}
// Test 3: Concurrent Access
func TestPluginMessageServer_ConcurrentAccess(t *testing.T) {
ms := newTestMessageServer(t)
var wg sync.WaitGroup
numGoroutines := 50
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func() {
defer wg.Done()
_ = ms.pluginManager
}()
}
wg.Wait()
}
// Test 4: LogReceiveError with Valid Errors
func TestPluginMessageServer_LogReceiveError(t *testing.T) {
ms := newTestMessageServer(t)
// Should not panic for various error types
ms.logReceiveError(context.Canceled, "test-connection")
ms.logReceiveError(context.DeadlineExceeded, "test-connection")
}
// TestPluginMessageServer_LogReceiveError_NilError tests that logReceiveError
// handles nil error gracefully without panicking
func TestPluginMessageServer_LogReceiveError_NilError(t *testing.T) {
// Create a message server
pm := &PluginManager{}
server := &PluginMessageServer{
pluginManager: pm,
}
// This should not panic - calling logReceiveError with nil error
server.logReceiveError(nil, "test-connection")
}
// Test 5: Multiple Message Servers
func TestPluginManager_MultipleMessageServers(t *testing.T) {
pm := newTestPluginManager(t)
ms1, err1 := NewPluginMessageServer(pm)
ms2, err2 := NewPluginMessageServer(pm)
require.NoError(t, err1)
require.NoError(t, err2)
assert.NotNil(t, ms1)
assert.NotNil(t, ms2)
// Both should reference the same plugin manager
assert.Equal(t, pm, ms1.pluginManager)
assert.Equal(t, pm, ms2.pluginManager)
}
// Test 6: Message Server with Nil Plugin Manager
func TestPluginMessageServer_NilPluginManager(t *testing.T) {
ms := &PluginMessageServer{
pluginManager: nil,
}
assert.Nil(t, ms.pluginManager)
}
// Test 7: Goroutine Cleanup
func TestPluginMessageServer_GoroutineCleanup(t *testing.T) {
before := runtime.NumGoroutine()
ms := newTestMessageServer(t)
_ = ms
time.Sleep(100 * time.Millisecond)
after := runtime.NumGoroutine()
// Creating a message server shouldn't leak goroutines
if after > before+5 {
t.Errorf("Potential goroutine leak: before=%d, after=%d", before, after)
}
}
// Test 8: Message Type Structure
func TestPluginMessage_SchemaUpdatedType(t *testing.T) {
message := &sdkproto.PluginMessage{
MessageType: sdkproto.PluginMessageType_SCHEMA_UPDATED,
Connection: "test-connection",
}
assert.Equal(t, sdkproto.PluginMessageType_SCHEMA_UPDATED, message.MessageType)
assert.Equal(t, "test-connection", message.Connection)
}
// Test 9: LogReceiveError with Different Error Types
func TestPluginMessageServer_LogReceiveError_ErrorTypes(t *testing.T) {
ms := newTestMessageServer(t)
// Test various error types don't cause panics
errors := []error{
context.Canceled,
context.DeadlineExceeded,
assert.AnError,
}
for _, err := range errors {
ms.logReceiveError(err, "test-connection")
}
}
// Test 10: Message Server Initialization Consistency
func TestPluginManager_MessageServer_Consistency(t *testing.T) {
pm := newTestPluginManager(t)
// Verify messageServer is initialized and consistent
assert.NotNil(t, pm.messageServer)
assert.Equal(t, pm, pm.messageServer.pluginManager)
// Accessing it multiple times should return the same instance
ms1 := pm.messageServer
ms2 := pm.messageServer
assert.Equal(t, ms1, ms2)
}
// Test 11: Message Server Survives Plugin Manager Operations
func TestPluginMessageServer_SurvivesPluginManagerOperations(t *testing.T) {
pm := newTestPluginManager(t)
ms := pm.messageServer
// Perform various plugin manager operations
pm.populatePluginConnectionConfigs()
pm.setPluginCacheSizeMap()
pm.nonAggregatorConnectionCount()
// Message server should still be accessible
assert.Equal(t, pm, ms.pluginManager)
assert.NotNil(t, pm.messageServer)
}
// Test 12: Concurrent NewPluginMessageServer Calls
func TestNewPluginMessageServer_Concurrent(t *testing.T) {
pm := newTestPluginManager(t)
var wg sync.WaitGroup
numGoroutines := 50
servers := make([]*PluginMessageServer, numGoroutines)
errors := make([]error, numGoroutines)
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
servers[idx], errors[idx] = NewPluginMessageServer(pm)
}(i)
}
wg.Wait()
// All should succeed
for i := 0; i < numGoroutines; i++ {
assert.NoError(t, errors[i])
assert.NotNil(t, servers[i])
assert.Equal(t, pm, servers[i].pluginManager)
}
}
// Test 13: Message Server Pointer Stability
func TestPluginMessageServer_PointerStability(t *testing.T) {
pm := newTestPluginManager(t)
ms1 := pm.messageServer
ms2 := pm.messageServer
// Should be the same pointer
assert.True(t, ms1 == ms2, "messageServer pointer should be stable")
}
// Test 14: LogReceiveError Concurrent Calls
func TestPluginMessageServer_LogReceiveError_Concurrent(t *testing.T) {
ms := newTestMessageServer(t)
var wg sync.WaitGroup
numGoroutines := 100
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
err := assert.AnError
if idx%2 == 0 {
err = context.Canceled
}
ms.logReceiveError(err, "test-connection")
}(i)
}
wg.Wait()
}
// Test 15: Message Server Field Access
func TestPluginMessageServer_FieldAccess(t *testing.T) {
ms := newTestMessageServer(t)
// Verify fields are accessible and not nil
assert.NotNil(t, ms.pluginManager)
assert.NotNil(t, ms.pluginManager.logger)
assert.NotNil(t, ms.pluginManager.runningPluginMap)
}
// Test 16: Message Server Doesn't Block Plugin Manager
func TestPluginMessageServer_DoesNotBlockPluginManager(t *testing.T) {
pm := newTestPluginManager(t)
// Message server should not prevent these operations
config := newTestConnectionConfig("plugin1", "instance1", "conn1")
pm.connectionConfigMap["conn1"] = config
pm.populatePluginConnectionConfigs()
// Verify operations worked
assert.Len(t, pm.pluginConnectionConfigMap, 1)
// Message server should still be valid
assert.NotNil(t, pm.messageServer)
assert.Equal(t, pm, pm.messageServer.pluginManager)
}
// Test 17: Stress Test for Concurrent Access
func TestPluginMessageServer_StressConcurrentAccess(t *testing.T) {
if testing.Short() {
t.Skip("Skipping stress test in short mode")
}
pm := newTestPluginManager(t)
ms := pm.messageServer
var wg sync.WaitGroup
duration := 1 * time.Second
stopCh := make(chan struct{})
// Multiple readers accessing pluginManager
for i := 0; i < 20; i++ {
wg.Add(1)
go func() {
defer wg.Done()
for {
select {
case <-stopCh:
return
default:
_ = ms.pluginManager
if ms.pluginManager != nil {
_ = ms.pluginManager.connectionConfigMap
}
}
}
}()
}
time.Sleep(duration)
close(stopCh)
wg.Wait()
}
// Test 18: UpdateConnectionSchema with Nil Pool
// Tests that updateConnectionSchema handles nil pool gracefully without panicking
// Issue #4783: The method calls RefreshConnections which accesses m.pool before the nil check
func TestPluginManager_UpdateConnectionSchema_NilPool(t *testing.T) {
// Create a PluginManager with a nil pool
pm := &PluginManager{
runningPluginMap: make(map[string]*runningPlugin),
pool: nil, // explicitly nil pool
}
ctx := context.Background()
// This should not panic - calling updateConnectionSchema with nil pool
// Previously this would panic because RefreshConnections accesses pool before nil check
pm.updateConnectionSchema(ctx, "test-connection")
// If we get here without panicking, the test passes
}
// Test 19: UpdateConnectionSchema with Nil Pool Concurrent
// Tests that concurrent calls to updateConnectionSchema with nil pool don't cause race conditions or panics
func TestPluginManager_UpdateConnectionSchema_NilPool_Concurrent(t *testing.T) {
pm := &PluginManager{
runningPluginMap: make(map[string]*runningPlugin),
pool: nil,
}
ctx := context.Background()
var wg sync.WaitGroup
numGoroutines := 10
for i := 0; i < numGoroutines; i++ {
wg.Add(1)
go func(idx int) {
defer wg.Done()
// Should not panic
pm.updateConnectionSchema(ctx, "test-connection")
}(i)
}
wg.Wait()
}