When plugin startup experiences panic, plugin writes known string to stdout so plugin manager can intercept the "Unrecognized remote plugin message" and instead return the panic message. DO not retry plugin startup if initialisation fails. #3732.

This commit is contained in:
kaidaguerre
2023-08-09 13:20:03 +01:00
committed by GitHub
parent 72323ef37a
commit d8a92f37bf
2 changed files with 29 additions and 12 deletions

4
go.sum
View File

@@ -1072,8 +1072,8 @@ github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50 h1:
github.com/turbot/go-prompt v0.2.6-steampipe.0.0.20221028122246-eb118ec58d50/go.mod h1:vFnjEGDIIA/Lib7giyE4E9c50Lvl8j0S+7FVlAwDAVw=
github.com/turbot/steampipe-cloud-sdk-go v0.6.0 h1:ufAxOpKS1uq7eejuE5sfEu1+d7QAd0RBjl8Bn6+mIs8=
github.com/turbot/steampipe-cloud-sdk-go v0.6.0/go.mod h1:M42TMBdMim4bV1YTMxhKyzfSGSMo4CXUkm3wt9w7t1Y=
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.16 h1:61HPiCofKxRCr+PL/Cf3lROa/rQdF4oNvQyoTHn1Q5o=
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.16/go.mod h1:Np0X1Oj3JNTcuf9JmvWwHrCqc0UB4iJLmUlOkRwMCWw=
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.13 h1:ertavdOZLsu45h8H6R0A07W/SGmQYA5m7PrR5Fo4t+8=
github.com/turbot/steampipe-plugin-sdk/v5 v5.6.0-dev.13/go.mod h1:aCOVDbfgl/y/vGUaEKkzi9jBrNerEN46l88DkQUu3Vw=
github.com/twitchyliquid64/golang-asm v0.15.1 h1:SU5vSMR7hnwNxj24w34ZyCi/FmDZTkS4MhqMhdFk5YI=
github.com/twitchyliquid64/golang-asm v0.15.1/go.mod h1:a1lVb/DtPvCB8fslRZhAngC2+aY1QWCk3Cedj/Gdt08=
github.com/ugorji/go v0.0.0-20180813092308-00b869d2f4a5/go.mod h1:hnLbHMwcvSihnDhEfx2/BzKp2xb0Y+ErdfYcrs9tkJQ=

View File

@@ -4,6 +4,7 @@ import (
"context"
"crypto/md5"
"fmt"
"github.com/turbot/steampipe-plugin-sdk/v5/sperr"
"log"
"os"
"os/exec"
@@ -127,7 +128,7 @@ func (m *PluginManager) Get(req *pb.GetRequest) (*pb.GetResponse, error) {
reattach, err := m.ensurePlugin(pluginName, connectionConfigs, req)
if err != nil {
log.Printf("[WARN] PluginManager Get failed for %s: %s (%p)", pluginName, err.Error(), resp)
resp.FailureMap[pluginName] = err.Error()
resp.FailureMap[pluginName] = sperr.WrapWithMessage(err, "failed to start '%s'", pluginName).Error()
} else {
log.Printf("[TRACE] PluginManager Get succeeded for %s, pid %d (%p)", pluginName, reattach.Pid, resp)
@@ -452,10 +453,10 @@ func (m *PluginManager) startPlugin(pluginName string, connectionConfigs []*sdkp
// close failed chan to signal to anyone waiting for the plugin to startup that it failed
close(startingPlugin.failed)
log.Printf("[WARN] startPluginProcess failed: %s (%p)", err.Error(), req)
log.Printf("[INFO] startPluginProcess failed: %s (%p)", err.Error(), req)
// kill the client
if startingPlugin.client != nil {
log.Printf("[WARN] failed pid: %d (%p)", startingPlugin.client.ReattachConfig().Pid, req)
log.Printf("[INFO] failed pid: %d (%p)", startingPlugin.client.ReattachConfig().Pid, req)
startingPlugin.client.Kill()
}
@@ -557,6 +558,7 @@ func (m *PluginManager) startPluginProcess(pluginName string, connectionConfigs
})
if _, err := client.Start(); err != nil {
err := m.handleStartFailure(err)
return nil, err
}
@@ -589,7 +591,7 @@ func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.Connectio
if supportedOperations == nil {
supportedOperations = &sdkproto.GetSupportedOperationsResponse{}
}
// if this plugin does not support multiple connections, we no longer support is
// if this plugin does not support multiple connections, we no longer support it
if !supportedOperations.MultipleConnections {
// TODO SEND NOTIFICATION TO CLI
return nil, fmt.Errorf("plugins which do not supprt multiple connections (using SDK version < v4) are no longer supported. Upgrade plugin '%s", pluginName)
@@ -605,18 +607,16 @@ func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.Connectio
// this returns a list of all connections provided by this plugin
err = m.setAllConnectionConfigs(connectionConfigs, pluginClient, supportedOperations)
if err != nil {
// return retryable error
log.Printf("[WARN] failed to set connection config for %s: %s", pluginName, err.Error())
return nil, retry.RetryableError(err)
return nil, err
}
// if this plugin supports setting cache options, do so
if supportedOperations.SetCacheOptions {
err = m.setCacheOptions(pluginClient)
if err != nil {
// return retryable error
log.Printf("[WARN] failed to set cache options for %s: %s", pluginName, err.Error())
return nil, retry.RetryableError(err)
return nil, err
}
}
@@ -625,8 +625,7 @@ func (m *PluginManager) initializePlugin(connectionConfigs []*sdkproto.Connectio
// if this plugin has a dynamic schema, add connections to message server
err = m.notifyNewDynamicSchemas(pluginClient, exemplarConnectionConfig, connectionNames)
if err != nil {
// send err down running plugin error channel
return nil, retry.RetryableError(err)
return nil, err
}
log.Printf("[INFO] initializePlugin complete pid %d", client.ReattachConfig().Pid)
@@ -813,6 +812,24 @@ func (m *PluginManager) nonAggregatorConnectionCount() int {
return res
}
func (m *PluginManager) handleStartFailure(err error) error {
// extract the plugin message
_, pluginMessage, found := strings.Cut(err.Error(), sdkplugin.UnrecognizedRemotePluginMessage)
if !found {
return err
}
pluginMessage, _, found = strings.Cut(pluginMessage, sdkplugin.UnrecognizedRemotePluginMessageSuffix)
if !found {
return err
}
// if this was a panic during startup, reraise an error with the panic string
if strings.Contains(pluginMessage, sdkplugin.StartupPanicMessage) {
return fmt.Errorf(pluginMessage)
}
return err
}
func nonAggregatorConnectionCount(connections []*sdkproto.ConnectionConfig) int {
res := 0
for _, c := range connections {