Files
steampipe/pkg/pluginmanager_service/message_server.go
kaidaguerre 40804a3201 Execute RefreshConnections asyncronously and optimise for high connection count. Add connection_state table.
- Execute RefreshConnections asyncronously
- Add connection_state table to indicate the loading state of connections
- Optimise RefreshConnections by cloning connection schemas
- Add locking to ensure only a single instance of RefreshConnections runs
- Start executing queries without waiting for connections to load, add smart error handling to wait for required connection
- Optimise autocomplete for high connection count
- Autocomplete and inspect data available before all conections are refreshed
- Update file watcher to respond to CHMOD, so thaat it pickes up deletion of file contents

 Closes #3394
 Closes #3267
2023-05-10 09:05:08 +01:00

97 lines
3.2 KiB
Go

package pluginmanager_service
import (
"github.com/turbot/steampipe-plugin-sdk/v5/error_helpers"
sdkgrpc "github.com/turbot/steampipe-plugin-sdk/v5/grpc"
sdkproto "github.com/turbot/steampipe-plugin-sdk/v5/grpc/proto"
"log"
)
type PluginMessageServer struct {
pluginManager *PluginManager
}
func NewPluginMessageServer(pluginManager *PluginManager) (*PluginMessageServer, error) {
res := &PluginMessageServer{
pluginManager: pluginManager,
}
return res, nil
}
func (m *PluginMessageServer) AddConnection(pluginClient *sdkgrpc.PluginClient, pluginName string, connectionNames ...string) error {
log.Printf("[TRACE] PluginMessageServer AddConnection for connections %v", connectionNames)
for _, connection := range connectionNames {
cacheStream, err := m.openMessageStream(pluginClient, pluginName, connection)
if err != nil {
return err
}
// if no cache stream was returned, this plugin cannot support cache streams
if cacheStream == nil {
return nil
}
go m.runMessageListener(cacheStream, connection)
}
return nil
}
func (m *PluginMessageServer) openMessageStream(pluginClient *sdkgrpc.PluginClient, pluginName, connection string) (sdkproto.WrapperPlugin_EstablishMessageStreamClient, error) {
log.Printf("[TRACE] openMessageStream for connection '%s'", connection)
// does this plugin support streaming cache
supportedOperations, err := pluginClient.GetSupportedOperations()
if err != nil {
return nil, err
}
if !supportedOperations.MessageStream {
log.Printf("[WARN] plugin '%s' does not support message stream", pluginName)
return nil, nil
}
log.Printf("[TRACE] calling EstablishMessageStream")
stream, err := pluginClient.EstablishMessageStream()
return stream, err
}
func (m *PluginMessageServer) runMessageListener(stream sdkproto.WrapperPlugin_EstablishMessageStreamClient, connection string) {
defer stream.CloseSend()
log.Printf("[TRACE] runMessageListener connection '%s'", connection)
for {
message, err := stream.Recv()
if err != nil {
m.logReceiveError(err, connection)
return
}
m.handleMessage(stream, message, connection)
}
}
func (m *PluginMessageServer) logReceiveError(err error, connection string) {
log.Printf("[TRACE] receive error for connection '%s': %v", connection, err)
switch {
case sdkgrpc.IsEOFError(err):
log.Printf("[TRACE] cache listener received EOF for connection '%s', returning", connection)
case sdkgrpc.IsNotImplementedError(err):
// should not be possible
log.Printf("[TRACE] connection '%s' does not support centralised cache", connection)
case error_helpers.IsContextCancelledError(err):
// ignore
default:
log.Printf("[WARN] error in PluginMessageServer runMessageListener for connection '%s': %v", connection, err)
}
}
func (m *PluginMessageServer) handleMessage(stream sdkproto.WrapperPlugin_EstablishMessageStreamClient, message *sdkproto.PluginMessage, connection string) {
ctx := stream.Context()
switch message.MessageType {
case sdkproto.PluginMessageType_SCHEMA_UPDATED:
log.Printf("[TRACE] PluginMessageServer.handleMessage: PluginMessageType_SCHEMA_UPDATED for connection: %s", message.Connection)
m.pluginManager.updateConnectionSchema(ctx, message.Connection)
}
}