mattermost/server/public/plugin/supervisor.go
Nick Misasi 98eb666891 feat(12-01): wire API server lifecycle into supervisor shutdown
- Add apiServerCleanup call to supervisor Shutdown method
- Cleanup happens AFTER Python process terminates for graceful disconnect
- Fix test assertion to check for relative executable path

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 09:49:51 -05:00

305 lines
7.9 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package plugin
import (
"crypto/sha256"
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"runtime"
"strings"
"sync"
"time"
plugin "github.com/hashicorp/go-plugin"
"github.com/pkg/errors"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// supervisor holds the go-plugin client for a single plugin together with the
// hooks implementation obtained from it, and tears them down in Shutdown.
type supervisor struct {
	// lock is read-locked by Shutdown, Hooks, Ping and Implements.
	lock sync.RWMutex
	// pluginID is the manifest ID of the supervised plugin.
	pluginID string
	// appDriver is the caller-supplied driver wrapped in a driverForPlugin; it
	// stays nil when newSupervisor is given no driver.
	appDriver AppDriver
	// client is the go-plugin client created by newSupervisor.
	client *plugin.Client
	// hooks is the timer-wrapped hooks implementation returned by Hooks.
	hooks Hooks
	// implemented is indexed by hook ID; an entry is true when the plugin
	// reported that hook as implemented at startup.
	implemented [TotalHooksID]bool
	// hooksClient is set only when the dispensed hooks are a *hooksRPCClient;
	// Shutdown waits on its doneWg before shutting down driver connections.
	hooksClient *hooksRPCClient
	// isReattached is set by WithReattachConfig and makes Shutdown close the
	// RPC client explicitly before calling Kill.
	isReattached bool
	// apiServerCleanup is the cleanup function for the Python plugin API
	// server. Shutdown calls it last when it is non-nil.
	apiServerCleanup func()
}
// driverForPlugin embeds an AppDriver and remembers the ID of the plugin it was
// created for, so that its Conn method can forward to ConnWithPluginID with
// that ID.
type driverForPlugin struct {
	AppDriver
	// pluginID is supplied as the plugin ID argument on forwarded calls.
	pluginID string
}
// Conn forwards to the embedded driver's ConnWithPluginID, passing isMaster
// through unchanged and supplying the plugin ID bound to this wrapper.
func (d *driverForPlugin) Conn(isMaster bool) (string, error) {
	owner := d.pluginID
	return d.AppDriver.ConnWithPluginID(isMaster, owner)
}
// WithExecutableFromManifest returns a supervisor option that points the
// go-plugin client at the backend executable the manifest declares for the
// current GOOS/GOARCH.
//
// The option fails when the manifest names no executable for this platform,
// when the cleaned relative path starts with "..", or when the executable's
// checksum cannot be computed.
func WithExecutableFromManifest(pluginInfo *model.BundleInfo) func(*supervisor, *plugin.ClientConfig) error {
	return func(_ *supervisor, clientConfig *plugin.ClientConfig) error {
		relPath := pluginInfo.Manifest.GetExecutableForRuntime(runtime.GOOS, runtime.GOARCH)
		if relPath == "" {
			return fmt.Errorf("backend executable not found for environment: %s/%s", runtime.GOOS, runtime.GOARCH)
		}

		// Normalise against "." first so that a path climbing out of the plugin
		// directory shows up as a leading "..".
		relPath = filepath.Clean(filepath.Join(".", relPath))
		if strings.HasPrefix(relPath, "..") {
			return fmt.Errorf("invalid backend executable: %s", relPath)
		}

		fullPath := filepath.Join(pluginInfo.Path, relPath)

		// This doesn't add more security than before
		// but removes the SecureConfig is nil warning.
		// https://mattermost.atlassian.net/browse/MM-49167
		checksum, err := getPluginExecutableChecksum(fullPath)
		if err != nil {
			return errors.Wrapf(err, "unable to generate plugin checksum")
		}

		clientConfig.Cmd = exec.Command(fullPath)
		clientConfig.SecureConfig = &plugin.SecureConfig{
			Checksum: checksum,
			Hash:     sha256.New(),
		}
		return nil
	}
}
// WithReattachConfig returns a supervisor option that configures the go-plugin
// client from pluginReattachConfig (converted to go-plugin's reattach form)
// and flags the supervisor as reattached.
func WithReattachConfig(pluginReattachConfig *model.PluginReattachConfig) func(*supervisor, *plugin.ClientConfig) error {
	return func(sup *supervisor, clientConfig *plugin.ClientConfig) error {
		reattach := pluginReattachConfig.ToHashicorpPluginReattachmentConfig()
		clientConfig.Reattach = reattach

		// Shutdown consults this flag to decide whether to close the RPC
		// client itself.
		sup.isReattached = true
		return nil
	}
}
// newSupervisor creates the go-plugin client for the plugin described by
// pluginInfo, connects to it, and returns a supervisor bound to its hooks.
//
// Plugins for which isPythonPlugin reports true are restricted to the gRPC
// protocol and get a 10 second start timeout; their hooks are served by a gRPC
// hooks client built on go-plugin's connection. All other plugins keep the
// 3 second start timeout and have their hooks dispensed under the name
// "hooks". Every opt is applied, in order, to the supervisor and the client
// configuration before the client is created.
//
// apiImpl and metrics are wrapped into timer layers; driver, when non-nil, is
// wrapped so that connections are attributed to this plugin.
//
// On any error the partially initialised supervisor is shut down and a nil
// supervisor is returned together with the error.
func newSupervisor(pluginInfo *model.BundleInfo, apiImpl API, driver AppDriver, parentLogger *mlog.Logger, metrics metricsInterface, opts ...func(*supervisor, *plugin.ClientConfig) error) (retSupervisor *supervisor, retErr error) {
	sup := supervisor{
		pluginID: pluginInfo.Manifest.Id,
	}
	if driver != nil {
		sup.appDriver = &driverForPlugin{AppDriver: driver, pluginID: pluginInfo.Manifest.Id}
	}

	// retErr is a named result so this deferred cleanup sees the error from
	// every return path below.
	defer func() {
		if retErr != nil {
			sup.Shutdown()
		}
	}()

	// markImplemented flags every reported hook name that has a known ID;
	// unknown names are ignored.
	markImplemented := func(hookNames []string) {
		for _, name := range hookNames {
			if hookID, known := hookNameToId[name]; known {
				sup.implemented[hookID] = true
			}
		}
	}

	wrappedLogger := pluginInfo.WrapLogger(parentLogger)

	hclogAdaptedLogger := &hclogAdapter{
		wrappedLogger: wrappedLogger,
		extrasKey:     "wrapped_extras",
	}

	pluginMap := map[string]plugin.Plugin{
		"hooks": &hooksPlugin{
			log:        wrappedLogger,
			driverImpl: sup.appDriver,
			apiImpl:    &apiTimerLayer{pluginInfo.Manifest.Id, apiImpl, metrics},
		},
	}

	clientConfig := &plugin.ClientConfig{
		HandshakeConfig: handshake,
		Plugins:         pluginMap,
		SyncStdout:      wrappedLogger.With(mlog.String("source", "plugin_stdout")).StdLogWriter(),
		SyncStderr:      wrappedLogger.With(mlog.String("source", "plugin_stderr")).StdLogWriter(),
		Logger:          hclogAdaptedLogger,
		StartTimeout:    time.Second * 3,
	}

	// Check if this is a Python plugin and configure accordingly.
	isPython := isPythonPlugin(pluginInfo.Manifest)
	if isPython {
		// Python plugins use gRPC protocol instead of net/rpc.
		// go-plugin defaults to net/rpc only when AllowedProtocols is nil,
		// so we must explicitly allow gRPC for Python plugins.
		clientConfig.AllowedProtocols = []plugin.Protocol{plugin.ProtocolGRPC}

		// Increase start timeout for Python plugins to account for interpreter
		// startup time and module imports.
		clientConfig.StartTimeout = time.Second * 10
	}

	for _, opt := range opts {
		if err := opt(&sup, clientConfig); err != nil {
			return nil, errors.Wrap(err, "failed to apply option")
		}
	}

	sup.client = plugin.NewClient(clientConfig)

	rpcClient, err := sup.client.Client()
	if err != nil {
		return nil, err
	}

	// Python plugins use gRPC for hook communication.
	if isPython {
		// Extract the gRPC connection from go-plugin's client.
		grpcClient, ok := rpcClient.(*plugin.GRPCClient)
		if !ok {
			return nil, errors.New("expected gRPC client for Python plugin")
		}

		hooksClient, err := newHooksGRPCClient(grpcClient.Conn, wrappedLogger)
		if err != nil {
			return nil, errors.Wrap(err, "failed to create gRPC hooks client")
		}
		sup.hooks = &hooksTimerLayer{pluginInfo.Manifest.Id, hooksClient, metrics}

		// Populate implemented array from gRPC client.
		impl, err := hooksClient.Implemented()
		if err != nil {
			return nil, errors.Wrap(err, "failed to query implemented hooks")
		}
		markImplemented(impl)

		wrappedLogger.Info("Python plugin started with gRPC hooks")
		return &sup, nil
	}

	raw, err := rpcClient.Dispense("hooks")
	if err != nil {
		return nil, err
	}

	// Use a checked assertion: a panic here would bypass the deferred
	// Shutdown above (it only runs when retErr is set) and leave the started
	// plugin process behind.
	dispensedHooks, ok := raw.(Hooks)
	if !ok {
		return nil, errors.Errorf("dispensed hooks have unexpected type %T", raw)
	}

	// Shutdown only waits for the RPC servers when the concrete client is
	// known.
	if c, ok := raw.(*hooksRPCClient); ok {
		sup.hooksClient = c
	}

	sup.hooks = &hooksTimerLayer{pluginInfo.Manifest.Id, dispensedHooks, metrics}

	impl, err := sup.hooks.Implemented()
	if err != nil {
		return nil, err
	}
	markImplemented(impl)

	return &sup, nil
}
// Shutdown stops the supervised plugin and releases what the supervisor
// holds, in this order:
//
//  1. If a client exists it is killed; for reattached plugins the RPC client
//     is closed explicitly first.
//  2. If an RPC hooks client was recorded, wait for its doneWg and then shut
//     down the plugin's driver connections (when a driver is present).
//  3. Run apiServerCleanup, if set.
//
// Failures while closing a reattached plugin's RPC client are logged as
// warnings, including the underlying error, and do not stop the shutdown.
// The supervisor's read lock is held throughout.
func (sup *supervisor) Shutdown() {
	sup.lock.RLock()
	defer sup.lock.RUnlock()

	if sup.client != nil {
		// For reattached plugins, Kill() is mostly a no-op, so manually clean up the
		// underlying rpcClient. This might be something to upstream unless we're doing
		// something else wrong.
		if sup.isReattached {
			rpcClient, err := sup.client.Client()
			if err != nil {
				mlog.Warn("Failed to obtain rpcClient on Shutdown",
					mlog.String("plugin_id", sup.pluginID),
					mlog.Err(err),
				)
			} else if err = rpcClient.Close(); err != nil {
				mlog.Warn("Failed to close rpcClient on Shutdown",
					mlog.String("plugin_id", sup.pluginID),
					mlog.Err(err),
				)
			}
		}

		sup.client.Kill()
	}

	// Wait for API RPC server and DB RPC server to exit.
	// And then shutdown conns.
	if sup.hooksClient != nil {
		sup.hooksClient.doneWg.Wait()
		if sup.appDriver != nil {
			sup.appDriver.ShutdownConns(sup.pluginID)
		}
	}

	// Clean up the Python plugin API server if it was started.
	// This must happen AFTER the Python process terminates to allow graceful disconnect.
	if sup.apiServerCleanup != nil {
		sup.apiServerCleanup()
	}
}
// Hooks returns the supervisor's current hooks implementation, read under the
// supervisor's read lock. The result is nil if no hooks have been set.
func (sup *supervisor) Hooks() Hooks {
	sup.lock.RLock()
	current := sup.hooks
	sup.lock.RUnlock()
	return current
}
// PerformHealthCheck checks the plugin through an RPC ping.
//
// The first ping is always sent. While pings keep failing, it is retried until
// HealthCheckPingFailLimit pings in total have been attempted. A nil result
// means one of the pings succeeded; otherwise a fixed "not responding" error
// is returned.
func (sup *supervisor) PerformHealthCheck() error {
	// No lock is taken here: Ping acquires the read lock itself.
	attempts := 1
	pingErr := sup.Ping()
	for pingErr != nil && attempts < HealthCheckPingFailLimit {
		pingErr = sup.Ping()
		attempts++
	}

	if pingErr != nil {
		return fmt.Errorf("plugin RPC connection is not responding")
	}
	return nil
}
// Ping obtains the plugin's RPC client and pings it, holding the supervisor's
// read lock for the duration. It returns the error from obtaining the client,
// or otherwise the result of the ping itself.
func (sup *supervisor) Ping() error {
	sup.lock.RLock()
	defer sup.lock.RUnlock()

	rpcClient, err := sup.client.Client()
	if err == nil {
		err = rpcClient.Ping()
	}
	return err
}
// Implements reports whether the plugin declared the hook with the given ID as
// implemented when the supervisor was created.
//
// An ID outside the range of known hooks (negative, or not less than the size
// of the implemented table) is reported as not implemented rather than
// panicking on the array index.
func (sup *supervisor) Implements(hookId int) bool {
	// The table is a fixed-size array, so its length can be checked without
	// taking the lock.
	if hookId < 0 || hookId >= len(sup.implemented) {
		return false
	}

	sup.lock.RLock()
	defer sup.lock.RUnlock()
	return sup.implemented[hookId]
}
// getPluginExecutableChecksum returns the SHA-256 digest of the contents of
// the file at executablePath. It returns a nil digest and the underlying error
// if the file cannot be opened or read.
func getPluginExecutableChecksum(executablePath string) ([]byte, error) {
	f, err := os.Open(executablePath)
	if err != nil {
		return nil, err
	}
	defer f.Close()

	// Stream the file through the hash instead of loading it into memory.
	digest := sha256.New()
	if _, err := io.Copy(digest, f); err != nil {
		return nil, err
	}
	return digest.Sum(nil), nil
}