mirror of
https://github.com/opentffoundation/opentf.git
synced 2026-03-16 19:00:27 -04:00
Nomad was manually updated, so revert that to the version in master, remove it from vendor.json and add it to the ignore list. Update all packages that were in an unknown state to their latest master commits.
542 lines
14 KiB
Go
542 lines
14 KiB
Go
package plugin
|
|
|
|
import (
|
|
"bufio"
|
|
"errors"
|
|
"fmt"
|
|
"io"
|
|
"io/ioutil"
|
|
"log"
|
|
"net"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"strconv"
|
|
"strings"
|
|
"sync"
|
|
"sync/atomic"
|
|
"time"
|
|
"unicode"
|
|
)
|
|
|
|
// Killed becomes 1 once CleanupClients has been called. Plugin RPC
// implementations can consult it to change their error behavior, since
// network connection errors are to be expected from that point on.
//
// Always read it through sync/atomic.
var Killed uint32
|
|
|
|
// This is a slice of the "managed" clients which are cleaned up when
|
|
// calling Cleanup
|
|
var managedClients = make([]*Client, 0, 5)
|
|
|
|
// Error types

// ErrProcessNotFound is returned when a client is configured to reattach
// to an existing plugin process and that process cannot be reached.
var ErrProcessNotFound = errors.New("Reattachment process not found")
|
|
|
|
// Client handles the lifecycle of a plugin application. It launches
|
|
// plugins, connects to them, dispenses interface implementations, and handles
|
|
// killing the process.
|
|
//
|
|
// Plugin hosts should use one Client for each plugin executable. To
|
|
// dispense a plugin type, use the `Client.Client` function, and then
|
|
// cal `Dispense`. This awkward API is mostly historical but is used to split
|
|
// the client that deals with subprocess management and the client that
|
|
// does RPC management.
|
|
//
|
|
// See NewClient and ClientConfig for using a Client.
|
|
type Client struct {
|
|
config *ClientConfig
|
|
exited bool
|
|
doneLogging chan struct{}
|
|
l sync.Mutex
|
|
address net.Addr
|
|
process *os.Process
|
|
client *RPCClient
|
|
}
|
|
|
|
// ClientConfig is the configuration used to initialize a new
|
|
// plugin client. After being used to initialize a plugin client,
|
|
// that configuration must not be modified again.
|
|
type ClientConfig struct {
|
|
// HandshakeConfig is the configuration that must match servers.
|
|
HandshakeConfig
|
|
|
|
// Plugins are the plugins that can be consumed.
|
|
Plugins map[string]Plugin
|
|
|
|
// One of the following must be set, but not both.
|
|
//
|
|
// Cmd is the unstarted subprocess for starting the plugin. If this is
|
|
// set, then the Client starts the plugin process on its own and connects
|
|
// to it.
|
|
//
|
|
// Reattach is configuration for reattaching to an existing plugin process
|
|
// that is already running. This isn't common.
|
|
Cmd *exec.Cmd
|
|
Reattach *ReattachConfig
|
|
|
|
// Managed represents if the client should be managed by the
|
|
// plugin package or not. If true, then by calling CleanupClients,
|
|
// it will automatically be cleaned up. Otherwise, the client
|
|
// user is fully responsible for making sure to Kill all plugin
|
|
// clients. By default the client is _not_ managed.
|
|
Managed bool
|
|
|
|
// The minimum and maximum port to use for communicating with
|
|
// the subprocess. If not set, this defaults to 10,000 and 25,000
|
|
// respectively.
|
|
MinPort, MaxPort uint
|
|
|
|
// StartTimeout is the timeout to wait for the plugin to say it
|
|
// has started successfully.
|
|
StartTimeout time.Duration
|
|
|
|
// If non-nil, then the stderr of the client will be written to here
|
|
// (as well as the log). This is the original os.Stderr of the subprocess.
|
|
// This isn't the output of synced stderr.
|
|
Stderr io.Writer
|
|
|
|
// SyncStdout, SyncStderr can be set to override the
|
|
// respective os.Std* values in the plugin. Care should be taken to
|
|
// avoid races here. If these are nil, then this will automatically be
|
|
// hooked up to os.Stdin, Stdout, and Stderr, respectively.
|
|
//
|
|
// If the default values (nil) are used, then this package will not
|
|
// sync any of these streams.
|
|
SyncStdout io.Writer
|
|
SyncStderr io.Writer
|
|
}
|
|
|
|
// ReattachConfig holds what a client needs in order to reattach to a
// plugin process that is already running. The values for a started client
// can be obtained by calling ReattachConfig on that Client.
type ReattachConfig struct {
	// Addr is the address the plugin is reachable on.
	Addr net.Addr

	// Pid is the process ID of the plugin process.
	Pid int
}
|
|
|
|
// This makes sure all the managed subprocesses are killed and properly
|
|
// logged. This should be called before the parent process running the
|
|
// plugins exits.
|
|
//
|
|
// This must only be called _once_.
|
|
func CleanupClients() {
|
|
// Set the killed to true so that we don't get unexpected panics
|
|
atomic.StoreUint32(&Killed, 1)
|
|
|
|
// Kill all the managed clients in parallel and use a WaitGroup
|
|
// to wait for them all to finish up.
|
|
var wg sync.WaitGroup
|
|
for _, client := range managedClients {
|
|
wg.Add(1)
|
|
|
|
go func(client *Client) {
|
|
client.Kill()
|
|
wg.Done()
|
|
}(client)
|
|
}
|
|
|
|
log.Println("[DEBUG] plugin: waiting for all plugin processes to complete...")
|
|
wg.Wait()
|
|
}
|
|
|
|
// Creates a new plugin client which manages the lifecycle of an external
|
|
// plugin and gets the address for the RPC connection.
|
|
//
|
|
// The client must be cleaned up at some point by calling Kill(). If
|
|
// the client is a managed client (created with NewManagedClient) you
|
|
// can just call CleanupClients at the end of your program and they will
|
|
// be properly cleaned.
|
|
func NewClient(config *ClientConfig) (c *Client) {
|
|
if config.MinPort == 0 && config.MaxPort == 0 {
|
|
config.MinPort = 10000
|
|
config.MaxPort = 25000
|
|
}
|
|
|
|
if config.StartTimeout == 0 {
|
|
config.StartTimeout = 1 * time.Minute
|
|
}
|
|
|
|
if config.Stderr == nil {
|
|
config.Stderr = ioutil.Discard
|
|
}
|
|
|
|
if config.SyncStdout == nil {
|
|
config.SyncStdout = ioutil.Discard
|
|
}
|
|
if config.SyncStderr == nil {
|
|
config.SyncStderr = ioutil.Discard
|
|
}
|
|
|
|
c = &Client{config: config}
|
|
if config.Managed {
|
|
managedClients = append(managedClients, c)
|
|
}
|
|
|
|
return
|
|
}
|
|
|
|
// Client returns an RPC client for the plugin.
|
|
//
|
|
// Subsequent calls to this will return the same RPC client.
|
|
func (c *Client) Client() (*RPCClient, error) {
|
|
addr, err := c.Start()
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
if c.client != nil {
|
|
return c.client, nil
|
|
}
|
|
|
|
// Connect to the client
|
|
conn, err := net.Dial(addr.Network(), addr.String())
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
if tcpConn, ok := conn.(*net.TCPConn); ok {
|
|
// Make sure to set keep alive so that the connection doesn't die
|
|
tcpConn.SetKeepAlive(true)
|
|
}
|
|
|
|
// Create the actual RPC client
|
|
c.client, err = NewRPCClient(conn, c.config.Plugins)
|
|
if err != nil {
|
|
conn.Close()
|
|
return nil, err
|
|
}
|
|
|
|
// Begin the stream syncing so that stdin, out, err work properly
|
|
err = c.client.SyncStreams(
|
|
c.config.SyncStdout,
|
|
c.config.SyncStderr)
|
|
if err != nil {
|
|
c.client.Close()
|
|
c.client = nil
|
|
return nil, err
|
|
}
|
|
|
|
return c.client, nil
|
|
}
|
|
|
|
// Tells whether or not the underlying process has exited.
|
|
func (c *Client) Exited() bool {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
return c.exited
|
|
}
|
|
|
|
// End the executing subprocess (if it is running) and perform any cleanup
|
|
// tasks necessary such as capturing any remaining logs and so on.
|
|
//
|
|
// This method blocks until the process successfully exits.
|
|
//
|
|
// This method can safely be called multiple times.
|
|
func (c *Client) Kill() {
|
|
if c.process == nil {
|
|
return
|
|
}
|
|
|
|
// Close the client to cleanly exit the process
|
|
client, err := c.Client()
|
|
if err == nil {
|
|
err = client.Close()
|
|
}
|
|
if err != nil {
|
|
// If something went wrong somewhere gracefully quitting the
|
|
// plugin, we just force kill it.
|
|
c.process.Kill()
|
|
}
|
|
|
|
// Wait for the client to finish logging so we have a complete log
|
|
<-c.doneLogging
|
|
}
|
|
|
|
// Starts the underlying subprocess, communicating with it to negotiate
|
|
// a port for RPC connections, and returning the address to connect via RPC.
|
|
//
|
|
// This method is safe to call multiple times. Subsequent calls have no effect.
|
|
// Once a client has been started once, it cannot be started again, even if
|
|
// it was killed.
|
|
func (c *Client) Start() (addr net.Addr, err error) {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
if c.address != nil {
|
|
return c.address, nil
|
|
}
|
|
|
|
// If one of cmd or reattach isn't set, then it is an error. We wrap
|
|
// this in a {} for scoping reasons, and hopeful that the escape
|
|
// analysis will pop the stock here.
|
|
{
|
|
cmdSet := c.config.Cmd != nil
|
|
attachSet := c.config.Reattach != nil
|
|
if cmdSet == attachSet {
|
|
return nil, fmt.Errorf("Only one of Cmd or Reattach must be set")
|
|
}
|
|
}
|
|
|
|
// Create the logging channel for when we kill
|
|
c.doneLogging = make(chan struct{})
|
|
|
|
if c.config.Reattach != nil {
|
|
// Verify the process still exists. If not, then it is an error
|
|
p, err := os.FindProcess(c.config.Reattach.Pid)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
// Attempt to connect to the addr since on Unix systems FindProcess
|
|
// doesn't actually return an error if it can't find the process.
|
|
conn, err := net.Dial(
|
|
c.config.Reattach.Addr.Network(),
|
|
c.config.Reattach.Addr.String())
|
|
if err != nil {
|
|
p.Kill()
|
|
return nil, ErrProcessNotFound
|
|
}
|
|
conn.Close()
|
|
|
|
// Goroutine to mark exit status
|
|
go func(pid int) {
|
|
// Wait for the process to die
|
|
pidWait(pid)
|
|
|
|
// Log so we can see it
|
|
log.Printf("[DEBUG] plugin: reattached plugin process exited\n")
|
|
|
|
// Mark it
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
c.exited = true
|
|
|
|
// Close the logging channel since that doesn't work on reattach
|
|
close(c.doneLogging)
|
|
}(p.Pid)
|
|
|
|
// Set the address and process
|
|
c.address = c.config.Reattach.Addr
|
|
c.process = p
|
|
|
|
return c.address, nil
|
|
}
|
|
|
|
env := []string{
|
|
fmt.Sprintf("%s=%s", c.config.MagicCookieKey, c.config.MagicCookieValue),
|
|
fmt.Sprintf("PLUGIN_MIN_PORT=%d", c.config.MinPort),
|
|
fmt.Sprintf("PLUGIN_MAX_PORT=%d", c.config.MaxPort),
|
|
}
|
|
|
|
stdout_r, stdout_w := io.Pipe()
|
|
stderr_r, stderr_w := io.Pipe()
|
|
|
|
cmd := c.config.Cmd
|
|
cmd.Env = append(cmd.Env, os.Environ()...)
|
|
cmd.Env = append(cmd.Env, env...)
|
|
cmd.Stdin = os.Stdin
|
|
cmd.Stderr = stderr_w
|
|
cmd.Stdout = stdout_w
|
|
|
|
log.Printf("[DEBUG] plugin: starting plugin: %s %#v", cmd.Path, cmd.Args)
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
return
|
|
}
|
|
|
|
// Set the process
|
|
c.process = cmd.Process
|
|
|
|
// Make sure the command is properly cleaned up if there is an error
|
|
defer func() {
|
|
r := recover()
|
|
|
|
if err != nil || r != nil {
|
|
cmd.Process.Kill()
|
|
}
|
|
|
|
if r != nil {
|
|
panic(r)
|
|
}
|
|
}()
|
|
|
|
// Start goroutine to wait for process to exit
|
|
exitCh := make(chan struct{})
|
|
go func() {
|
|
// Make sure we close the write end of our stderr/stdout so
|
|
// that the readers send EOF properly.
|
|
defer stderr_w.Close()
|
|
defer stdout_w.Close()
|
|
|
|
// Wait for the command to end.
|
|
cmd.Wait()
|
|
|
|
// Log and make sure to flush the logs write away
|
|
log.Printf("[DEBUG] plugin: %s: plugin process exited\n", cmd.Path)
|
|
os.Stderr.Sync()
|
|
|
|
// Mark that we exited
|
|
close(exitCh)
|
|
|
|
// Set that we exited, which takes a lock
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
c.exited = true
|
|
}()
|
|
|
|
// Start goroutine that logs the stderr
|
|
go c.logStderr(stderr_r)
|
|
|
|
// Start a goroutine that is going to be reading the lines
|
|
// out of stdout
|
|
linesCh := make(chan []byte)
|
|
go func() {
|
|
defer close(linesCh)
|
|
|
|
buf := bufio.NewReader(stdout_r)
|
|
for {
|
|
line, err := buf.ReadBytes('\n')
|
|
if line != nil {
|
|
linesCh <- line
|
|
}
|
|
|
|
if err == io.EOF {
|
|
return
|
|
}
|
|
}
|
|
}()
|
|
|
|
// Make sure after we exit we read the lines from stdout forever
|
|
// so they don't block since it is an io.Pipe
|
|
defer func() {
|
|
go func() {
|
|
for _ = range linesCh {
|
|
}
|
|
}()
|
|
}()
|
|
|
|
// Some channels for the next step
|
|
timeout := time.After(c.config.StartTimeout)
|
|
|
|
// Start looking for the address
|
|
log.Printf("[DEBUG] plugin: waiting for RPC address for: %s", cmd.Path)
|
|
select {
|
|
case <-timeout:
|
|
err = errors.New("timeout while waiting for plugin to start")
|
|
case <-exitCh:
|
|
err = errors.New("plugin exited before we could connect")
|
|
case lineBytes := <-linesCh:
|
|
// Trim the line and split by "|" in order to get the parts of
|
|
// the output.
|
|
line := strings.TrimSpace(string(lineBytes))
|
|
parts := strings.SplitN(line, "|", 4)
|
|
if len(parts) < 4 {
|
|
err = fmt.Errorf(
|
|
"Unrecognized remote plugin message: %s\n\n"+
|
|
"This usually means that the plugin is either invalid or simply\n"+
|
|
"needs to be recompiled to support the latest protocol.", line)
|
|
return
|
|
}
|
|
|
|
// Check the core protocol. Wrapped in a {} for scoping.
|
|
{
|
|
var coreProtocol int64
|
|
coreProtocol, err = strconv.ParseInt(parts[0], 10, 0)
|
|
if err != nil {
|
|
err = fmt.Errorf("Error parsing core protocol version: %s", err)
|
|
return
|
|
}
|
|
|
|
if int(coreProtocol) != CoreProtocolVersion {
|
|
err = fmt.Errorf("Incompatible core API version with plugin. "+
|
|
"Plugin version: %s, Ours: %d\n\n"+
|
|
"To fix this, the plugin usually only needs to be recompiled.\n"+
|
|
"Please report this to the plugin author.", parts[0], CoreProtocolVersion)
|
|
return
|
|
}
|
|
}
|
|
|
|
// Parse the protocol version
|
|
var protocol int64
|
|
protocol, err = strconv.ParseInt(parts[1], 10, 0)
|
|
if err != nil {
|
|
err = fmt.Errorf("Error parsing protocol version: %s", err)
|
|
return
|
|
}
|
|
|
|
// Test the API version
|
|
if uint(protocol) != c.config.ProtocolVersion {
|
|
err = fmt.Errorf("Incompatible API version with plugin. "+
|
|
"Plugin version: %s, Ours: %d", parts[1], c.config.ProtocolVersion)
|
|
return
|
|
}
|
|
|
|
switch parts[2] {
|
|
case "tcp":
|
|
addr, err = net.ResolveTCPAddr("tcp", parts[3])
|
|
case "unix":
|
|
addr, err = net.ResolveUnixAddr("unix", parts[3])
|
|
default:
|
|
err = fmt.Errorf("Unknown address type: %s", parts[3])
|
|
}
|
|
}
|
|
|
|
c.address = addr
|
|
return
|
|
}
|
|
|
|
// ReattachConfig returns the information that must be provided to NewClient
|
|
// to reattach to the plugin process that this client started. This is
|
|
// useful for plugins that detach from their parent process.
|
|
//
|
|
// If this returns nil then the process hasn't been started yet. Please
|
|
// call Start or Client before calling this.
|
|
func (c *Client) ReattachConfig() *ReattachConfig {
|
|
c.l.Lock()
|
|
defer c.l.Unlock()
|
|
|
|
if c.address == nil {
|
|
return nil
|
|
}
|
|
|
|
if c.config.Cmd != nil && c.config.Cmd.Process == nil {
|
|
return nil
|
|
}
|
|
|
|
// If we connected via reattach, just return the information as-is
|
|
if c.config.Reattach != nil {
|
|
return c.config.Reattach
|
|
}
|
|
|
|
return &ReattachConfig{
|
|
Addr: c.address,
|
|
Pid: c.config.Cmd.Process.Pid,
|
|
}
|
|
}
|
|
|
|
func (c *Client) logStderr(r io.Reader) {
|
|
bufR := bufio.NewReader(r)
|
|
for {
|
|
line, err := bufR.ReadString('\n')
|
|
if line != "" {
|
|
c.config.Stderr.Write([]byte(line))
|
|
|
|
line = strings.TrimRightFunc(line, unicode.IsSpace)
|
|
log.Printf("[DEBUG] plugin: %s: %s", filepath.Base(c.config.Cmd.Path), line)
|
|
}
|
|
|
|
if err == io.EOF {
|
|
break
|
|
}
|
|
}
|
|
|
|
// Flag that we've completed logging for others
|
|
close(c.doneLogging)
|
|
}
|