Files
steampipe/pkg/pluginmanager_service/message_server.go
Nathan Wallace 6016a71053 Nil pointer dereference in logReceiveError closes #4798 (#4842)
* Unskip test demonstrating bug #4798: Nil pointer dereference in logReceiveError

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

* Fix #4798: Add nil check to logReceiveError

🤖 Generated with [Claude Code](https://claude.com/claude-code)

Co-Authored-By: Claude <noreply@anthropic.com>

---------

Co-authored-by: Claude <noreply@anthropic.com>
2025-11-15 10:50:17 -05:00

100 lines
3.2 KiB
Go

package pluginmanager_service
import (
"github.com/turbot/steampipe-plugin-sdk/v5/error_helpers"
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"log"
)
type PluginMessageServer struct {
pluginManager *PluginManager
}
func NewPluginMessageServer(pluginManager *PluginManager) (*PluginMessageServer, error) {
res := &PluginMessageServer{
pluginManager: pluginManager,
}
return res, nil
}
func (m *PluginMessageServer) AddConnection(pluginClient *sdkgrpc.PluginClient, pluginName string, connectionNames ...string) error {
log.Printf("[TRACE] PluginMessageServer AddConnection for connections %v", connectionNames)
for _, connection := range connectionNames {
cacheStream, err := m.openMessageStream(pluginClient, pluginName, connection)
if err != nil {
return err
}
// if no cache stream was returned, this plugin cannot support cache streams
if cacheStream == nil {
return nil
}
go m.runMessageListener(cacheStream, connection)
}
return nil
}
func (m *PluginMessageServer) openMessageStream(pluginClient *sdkgrpc.PluginClient, pluginName, connection string) (sdkproto.WrapperPlugin_EstablishMessageStreamClient, error) {
log.Printf("[TRACE] openMessageStream for connection '%s'", connection)
// does this plugin support streaming cache
supportedOperations, err := pluginClient.GetSupportedOperations()
if err != nil {
return nil, err
}
if !supportedOperations.MessageStream {
log.Printf("[WARN] plugin '%s' does not support message stream", pluginName)
return nil, nil
}
log.Printf("[TRACE] calling EstablishMessageStream")
stream, err := pluginClient.EstablishMessageStream()
return stream, err
}
func (m *PluginMessageServer) runMessageListener(stream sdkproto.WrapperPlugin_EstablishMessageStreamClient, connection string) {
defer stream.CloseSend()
log.Printf("[TRACE] runMessageListener connection '%s'", connection)
for {
message, err := stream.Recv()
if err != nil {
m.logReceiveError(err, connection)
return
}
m.handleMessage(stream, message, connection)
}
}
func (m *PluginMessageServer) logReceiveError(err error, connection string) {
if err == nil {
return
}
log.Printf("[TRACE] receive error for connection '%s': %v", connection, err)
switch {
case sdkgrpc.IsEOFError(err):
log.Printf("[TRACE] cache listener received EOF for connection '%s', returning", connection)
case sdkgrpc.IsNotImplementedError(err):
// should not be possible
log.Printf("[TRACE] connection '%s' does not support centralised cache", connection)
case error_helpers.IsContextCancelledError(err):
// ignore
default:
log.Printf("[WARN] error in PluginMessageServer runMessageListener for connection '%s': %v", connection, err)
}
}
func (m *PluginMessageServer) handleMessage(stream sdkproto.WrapperPlugin_EstablishMessageStreamClient, message *sdkproto.PluginMessage, connection string) {
ctx := stream.Context()
switch message.MessageType {
case sdkproto.PluginMessageType_SCHEMA_UPDATED:
log.Printf("[INFO] PluginMessageServer.handleMessage: PluginMessageType_SCHEMA_UPDATED for connection: %s", message.Connection)
m.pluginManager.updateConnectionSchema(ctx, message.Connection)
}
}