mattermost/server/public/plugin/python_integration_test.go
Nick Misasi 453ab3dc74 feat(12-01): start gRPC PluginAPI server for Python plugins
- Add startAPIServer function to start gRPC server on random port
- Add APIServerRegistrar type to break import cycle with apiserver package
- Modify WithCommandFromManifest to accept apiImpl and registrar
- Update configurePythonCommand to start API server and set MATTERMOST_PLUGIN_API_TARGET env var
- Add apiServerCleanup field to supervisor struct for cleanup on shutdown
- Add SetAPIServerRegistrar method to Environment for dependency injection
- Wire up API server registrar in app/plugin.go

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 09:46:34 -05:00

1115 lines
31 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package plugin
import (
"encoding/json"
"net/http"
"net/http/httptest"
"os"
"path/filepath"
"runtime"
"strings"
"testing"
"time"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/plugin/utils"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// TestPythonPluginIntegration tests the complete Python plugin lifecycle:
// 1. Plugin startup with implemented hooks
// 2. OnActivate hook invocation
// 3. MessageHasBeenPosted hook invocation
// 4. ServeHTTP request handling
// 5. OnDeactivate on shutdown
//
// This test uses a fake Python interpreter (compiled Go binary) that implements
// the full PluginHooks gRPC service to validate the integration.
func TestPythonPluginIntegration(t *testing.T) {
// Create temp plugin directory
pluginDir, err := os.MkdirTemp("", "python-integration-test")
require.NoError(t, err)
defer os.RemoveAll(pluginDir)
// Determine the venv path based on OS
var venvPythonPath string
if runtime.GOOS == "windows" {
venvPythonPath = filepath.Join(pluginDir, "venv", "Scripts", "python.exe")
} else {
venvPythonPath = filepath.Join(pluginDir, "venv", "bin", "python")
}
// Create venv directory structure
require.NoError(t, os.MkdirAll(filepath.Dir(venvPythonPath), 0755))
// Compile a comprehensive fake Python interpreter that implements all hooks
// needed for integration testing
utils.CompileGo(t, `
package main
import (
"context"
"fmt"
"io"
"net"
"os"
"os/signal"
"syscall"
"sync/atomic"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
pb "github.com/mattermost/mattermost/server/public/pluginapi/grpc/generated/go/pluginapiv1"
)
// fakePluginHooks implements the PluginHooksServer interface for integration testing
type fakePluginHooks struct {
pb.UnimplementedPluginHooksServer
activated atomic.Bool
deactivated atomic.Bool
messagePostedCount atomic.Int32
httpRequestCount atomic.Int32
}
func (f *fakePluginHooks) Implemented(ctx context.Context, req *pb.ImplementedRequest) (*pb.ImplementedResponse, error) {
return &pb.ImplementedResponse{
Hooks: []string{
"OnActivate",
"OnDeactivate",
"MessageHasBeenPosted",
"ServeHTTP",
},
}, nil
}
func (f *fakePluginHooks) OnActivate(ctx context.Context, req *pb.OnActivateRequest) (*pb.OnActivateResponse, error) {
f.activated.Store(true)
return &pb.OnActivateResponse{}, nil
}
func (f *fakePluginHooks) OnDeactivate(ctx context.Context, req *pb.OnDeactivateRequest) (*pb.OnDeactivateResponse, error) {
f.deactivated.Store(true)
return &pb.OnDeactivateResponse{}, nil
}
func (f *fakePluginHooks) MessageHasBeenPosted(ctx context.Context, req *pb.MessageHasBeenPostedRequest) (*pb.MessageHasBeenPostedResponse, error) {
f.messagePostedCount.Add(1)
return &pb.MessageHasBeenPostedResponse{}, nil
}
func (f *fakePluginHooks) ServeHTTP(stream pb.PluginHooks_ServeHTTPServer) error {
f.httpRequestCount.Add(1)
// Receive the request
var init *pb.ServeHTTPRequestInit
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
if req.Init != nil {
init = req.Init
}
if req.BodyComplete {
break
}
}
// Determine response based on URL
var statusCode int32 = 200
var body string
var contentType string = "application/json"
if init != nil {
switch {
case init.Url == "/hello":
body = ` + "`" + `{"message": "Hello from Python plugin!"}` + "`" + `
case init.Url == "/echo":
body = fmt.Sprintf(` + "`" + `{"method": "%s", "path": "%s"}` + "`" + `, init.Method, init.Url)
default:
statusCode = 404
body = ` + "`" + `{"error": "not found"}` + "`" + `
}
}
// Send response
err := stream.Send(&pb.ServeHTTPResponse{
Init: &pb.ServeHTTPResponseInit{
StatusCode: statusCode,
Headers: []*pb.HTTPHeader{
{Key: "Content-Type", Values: []string{contentType}},
},
},
BodyChunk: []byte(body),
BodyComplete: true,
})
return err
}
func main() {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
fmt.Fprintf(os.Stderr, "failed to listen: %v\n", err)
os.Exit(1)
}
grpcServer := grpc.NewServer()
healthServer := health.NewServer()
healthServer.SetServingStatus("plugin", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
pluginHooks := &fakePluginHooks{}
pb.RegisterPluginHooksServer(grpcServer, pluginHooks)
go func() {
if err := grpcServer.Serve(listener); err != nil {
fmt.Fprintf(os.Stderr, "failed to serve: %v\n", err)
}
}()
addr := listener.Addr().String()
fmt.Printf("1|1|tcp|%s|grpc\n", addr)
os.Stdout.Sync()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
<-sigCh
grpcServer.GracefulStop()
}
`, venvPythonPath)
// Create a dummy plugin.py file
scriptPath := filepath.Join(pluginDir, "plugin.py")
require.NoError(t, os.WriteFile(scriptPath, []byte("# Integration test Python plugin\n"), 0644))
// Create plugin.json manifest
manifest := &model.Manifest{
Id: "python-integration-test",
Version: "1.0.0",
Server: &model.ManifestServer{
Executable: "plugin.py",
},
}
manifestJSON, err := json.Marshal(manifest)
require.NoError(t, err)
require.NoError(t, os.WriteFile(filepath.Join(pluginDir, "plugin.json"), manifestJSON, 0644))
// Create bundle info
bundle := model.BundleInfoForPath(pluginDir)
require.NotNil(t, bundle.Manifest)
// Create logger
logger := mlog.CreateConsoleTestLogger(t)
// Create supervisor
sup, err := newSupervisor(bundle, nil, nil, logger, nil, WithCommandFromManifest(bundle, nil, nil))
require.NoError(t, err)
require.NotNil(t, sup)
defer sup.Shutdown()
t.Run("plugin starts and reports implemented hooks", func(t *testing.T) {
// Verify hooks are wired
require.NotNil(t, sup.Hooks(), "Hooks should be wired for Python plugin")
// Verify implemented hooks
impl, err := sup.Hooks().Implemented()
require.NoError(t, err)
assert.Contains(t, impl, "OnActivate")
assert.Contains(t, impl, "OnDeactivate")
assert.Contains(t, impl, "MessageHasBeenPosted")
assert.Contains(t, impl, "ServeHTTP")
// Verify Implements() for tracked hooks
assert.True(t, sup.Implements(OnDeactivateID), "Should implement OnDeactivate")
assert.True(t, sup.Implements(MessageHasBeenPostedID), "Should implement MessageHasBeenPosted")
assert.True(t, sup.Implements(ServeHTTPID), "Should implement ServeHTTP")
})
t.Run("OnActivate is called successfully", func(t *testing.T) {
err := sup.Hooks().OnActivate()
require.NoError(t, err, "OnActivate should succeed")
})
t.Run("MessageHasBeenPosted hook receives events", func(t *testing.T) {
post := &model.Post{
Id: "test-post-id",
ChannelId: "test-channel-id",
UserId: "test-user-id",
Message: "Test message from integration test",
}
// Call the hook - should not panic or error
sup.Hooks().MessageHasBeenPosted(&Context{
SessionId: "test-session",
RequestId: "test-request",
IPAddress: "127.0.0.1",
}, post)
})
t.Run("ServeHTTP handles requests", func(t *testing.T) {
// Create a test HTTP request
req := httptest.NewRequest("GET", "/hello", nil)
w := httptest.NewRecorder()
ctx := &Context{
SessionId: "test-session",
RequestId: "test-request",
IPAddress: "127.0.0.1",
}
// Call ServeHTTP
sup.Hooks().ServeHTTP(ctx, w, req)
// Verify response
resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode, "Should return 200 OK")
// Check Content-Type
assert.Equal(t, "application/json", resp.Header.Get("Content-Type"))
})
t.Run("OnDeactivate is called on shutdown", func(t *testing.T) {
err := sup.Hooks().OnDeactivate()
require.NoError(t, err, "OnDeactivate should succeed")
})
}
// TestPythonPluginServeHTTP tests the ServeHTTP streaming functionality in detail
func TestPythonPluginServeHTTP(t *testing.T) {
// Create temp plugin directory
pluginDir, err := os.MkdirTemp("", "python-servehttp-test")
require.NoError(t, err)
defer os.RemoveAll(pluginDir)
// Determine the venv path based on OS
var venvPythonPath string
if runtime.GOOS == "windows" {
venvPythonPath = filepath.Join(pluginDir, "venv", "Scripts", "python.exe")
} else {
venvPythonPath = filepath.Join(pluginDir, "venv", "bin", "python")
}
// Create venv directory structure
require.NoError(t, os.MkdirAll(filepath.Dir(venvPythonPath), 0755))
// Compile a fake Python interpreter with comprehensive ServeHTTP handling
utils.CompileGo(t, `
package main
import (
"bytes"
"context"
"fmt"
"io"
"net"
"os"
"os/signal"
"strings"
"syscall"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
pb "github.com/mattermost/mattermost/server/public/pluginapi/grpc/generated/go/pluginapiv1"
)
type fakePluginHooks struct {
pb.UnimplementedPluginHooksServer
}
func (f *fakePluginHooks) Implemented(ctx context.Context, req *pb.ImplementedRequest) (*pb.ImplementedResponse, error) {
return &pb.ImplementedResponse{
Hooks: []string{"OnActivate", "ServeHTTP"},
}, nil
}
func (f *fakePluginHooks) OnActivate(ctx context.Context, req *pb.OnActivateRequest) (*pb.OnActivateResponse, error) {
return &pb.OnActivateResponse{}, nil
}
func (f *fakePluginHooks) ServeHTTP(stream pb.PluginHooks_ServeHTTPServer) error {
// Receive the complete request
var init *pb.ServeHTTPRequestInit
var bodyBuf bytes.Buffer
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
if req.Init != nil {
init = req.Init
}
if len(req.BodyChunk) > 0 {
bodyBuf.Write(req.BodyChunk)
}
if req.BodyComplete {
break
}
}
if init == nil {
return fmt.Errorf("no init received")
}
var statusCode int32 = 200
var body string
var headers []*pb.HTTPHeader
switch {
case init.Url == "/api/status":
// Simple GET request test
body = ` + "`" + `{"status": "ok"}` + "`" + `
headers = []*pb.HTTPHeader{
{Key: "Content-Type", Values: []string{"application/json"}},
}
case init.Url == "/api/echo" && init.Method == "POST":
// POST request with body - echo back
body = bodyBuf.String()
headers = []*pb.HTTPHeader{
{Key: "Content-Type", Values: []string{"text/plain"}},
{Key: "X-Echo", Values: []string{"true"}},
}
case init.Url == "/api/large":
// Large response test (100KB)
body = strings.Repeat("X", 100*1024)
headers = []*pb.HTTPHeader{
{Key: "Content-Type", Values: []string{"text/plain"}},
}
case init.Url == "/api/headers":
// Return request headers
var parts []string
for _, h := range init.Headers {
parts = append(parts, fmt.Sprintf("%s=%s", h.Key, strings.Join(h.Values, ",")))
}
body = strings.Join(parts, "\n")
headers = []*pb.HTTPHeader{
{Key: "Content-Type", Values: []string{"text/plain"}},
{Key: "X-Custom-Header", Values: []string{"custom-value"}},
}
case init.Url == "/api/error":
statusCode = 500
body = ` + "`" + `{"error": "internal error"}` + "`" + `
headers = []*pb.HTTPHeader{
{Key: "Content-Type", Values: []string{"application/json"}},
}
default:
statusCode = 404
body = ` + "`" + `{"error": "not found"}` + "`" + `
headers = []*pb.HTTPHeader{
{Key: "Content-Type", Values: []string{"application/json"}},
}
}
// For large responses, send in chunks
bodyBytes := []byte(body)
chunkSize := 64 * 1024 // 64KB chunks
if len(bodyBytes) <= chunkSize {
// Small response - send in one message
return stream.Send(&pb.ServeHTTPResponse{
Init: &pb.ServeHTTPResponseInit{
StatusCode: statusCode,
Headers: headers,
},
BodyChunk: bodyBytes,
BodyComplete: true,
})
}
// Large response - send init first
err := stream.Send(&pb.ServeHTTPResponse{
Init: &pb.ServeHTTPResponseInit{
StatusCode: statusCode,
Headers: headers,
},
BodyChunk: bodyBytes[:chunkSize],
BodyComplete: false,
})
if err != nil {
return err
}
// Send remaining chunks
offset := chunkSize
for offset < len(bodyBytes) {
end := offset + chunkSize
if end > len(bodyBytes) {
end = len(bodyBytes)
}
isLast := end == len(bodyBytes)
err := stream.Send(&pb.ServeHTTPResponse{
BodyChunk: bodyBytes[offset:end],
BodyComplete: isLast,
})
if err != nil {
return err
}
offset = end
}
return nil
}
func main() {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
fmt.Fprintf(os.Stderr, "failed to listen: %v\n", err)
os.Exit(1)
}
grpcServer := grpc.NewServer()
healthServer := health.NewServer()
healthServer.SetServingStatus("plugin", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
pb.RegisterPluginHooksServer(grpcServer, &fakePluginHooks{})
go func() {
if err := grpcServer.Serve(listener); err != nil {
fmt.Fprintf(os.Stderr, "failed to serve: %v\n", err)
}
}()
addr := listener.Addr().String()
fmt.Printf("1|1|tcp|%s|grpc\n", addr)
os.Stdout.Sync()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
<-sigCh
grpcServer.GracefulStop()
}
`, venvPythonPath)
// Create a dummy plugin.py file
scriptPath := filepath.Join(pluginDir, "plugin.py")
require.NoError(t, os.WriteFile(scriptPath, []byte("# ServeHTTP test plugin\n"), 0644))
// Create plugin.json manifest
manifest := &model.Manifest{
Id: "python-servehttp-test",
Version: "1.0.0",
Server: &model.ManifestServer{
Executable: "plugin.py",
},
}
manifestJSON, err := json.Marshal(manifest)
require.NoError(t, err)
require.NoError(t, os.WriteFile(filepath.Join(pluginDir, "plugin.json"), manifestJSON, 0644))
// Create bundle info
bundle := model.BundleInfoForPath(pluginDir)
require.NotNil(t, bundle.Manifest)
// Create logger
logger := mlog.CreateConsoleTestLogger(t)
// Create supervisor
sup, err := newSupervisor(bundle, nil, nil, logger, nil, WithCommandFromManifest(bundle, nil, nil))
require.NoError(t, err)
require.NotNil(t, sup)
defer sup.Shutdown()
ctx := &Context{
SessionId: "test-session",
RequestId: "test-request",
IPAddress: "127.0.0.1",
}
t.Run("Simple GET request - verify response status and body", func(t *testing.T) {
req := httptest.NewRequest("GET", "/api/status", nil)
w := httptest.NewRecorder()
sup.Hooks().ServeHTTP(ctx, w, req)
resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "application/json", resp.Header.Get("Content-Type"))
assert.Contains(t, w.Body.String(), `"status": "ok"`)
})
t.Run("POST request with body - verify request body received by plugin", func(t *testing.T) {
body := "Hello, this is the request body!"
req := httptest.NewRequest("POST", "/api/echo", strings.NewReader(body))
req.Header.Set("Content-Type", "text/plain")
w := httptest.NewRecorder()
sup.Hooks().ServeHTTP(ctx, w, req)
resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "text/plain", resp.Header.Get("Content-Type"))
assert.Equal(t, "true", resp.Header.Get("X-Echo"))
assert.Equal(t, body, w.Body.String())
})
t.Run("Large response body - verify streaming works (>64KB response)", func(t *testing.T) {
req := httptest.NewRequest("GET", "/api/large", nil)
w := httptest.NewRecorder()
sup.Hooks().ServeHTTP(ctx, w, req)
resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "text/plain", resp.Header.Get("Content-Type"))
// Verify full 100KB response received
assert.Equal(t, 100*1024, w.Body.Len(), "Should receive full 100KB response")
// Verify content is correct (all X's)
assert.True(t, strings.HasPrefix(w.Body.String(), "XXXX"), "Body should start with X's")
})
t.Run("Request headers - verify headers passed to plugin", func(t *testing.T) {
req := httptest.NewRequest("GET", "/api/headers", nil)
req.Header.Set("X-Test-Header", "test-value")
req.Header.Set("Authorization", "Bearer token123")
w := httptest.NewRecorder()
sup.Hooks().ServeHTTP(ctx, w, req)
resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode)
// Check that our headers were received (they'll be in the response body)
body := w.Body.String()
assert.Contains(t, body, "X-Test-Header")
})
t.Run("Response headers - verify plugin can set response headers", func(t *testing.T) {
req := httptest.NewRequest("GET", "/api/headers", nil)
w := httptest.NewRecorder()
sup.Hooks().ServeHTTP(ctx, w, req)
resp := w.Result()
assert.Equal(t, http.StatusOK, resp.StatusCode)
assert.Equal(t, "custom-value", resp.Header.Get("X-Custom-Header"))
})
t.Run("Error response - verify status codes are correctly propagated", func(t *testing.T) {
req := httptest.NewRequest("GET", "/api/error", nil)
w := httptest.NewRecorder()
sup.Hooks().ServeHTTP(ctx, w, req)
resp := w.Result()
assert.Equal(t, http.StatusInternalServerError, resp.StatusCode)
assert.Contains(t, w.Body.String(), "internal error")
})
t.Run("Not found - verify 404 handling", func(t *testing.T) {
req := httptest.NewRequest("GET", "/api/nonexistent", nil)
w := httptest.NewRecorder()
sup.Hooks().ServeHTTP(ctx, w, req)
resp := w.Result()
assert.Equal(t, http.StatusNotFound, resp.StatusCode)
})
}
// TestPythonPluginEnvironmentIntegration tests the full Environment integration
// with a Python plugin including activation, health checks, and deactivation.
func TestPythonPluginEnvironmentIntegration(t *testing.T) {
// Create temp directories
pluginDir, err := os.MkdirTemp("", "python-env-integration-test-plugins")
require.NoError(t, err)
defer os.RemoveAll(pluginDir)
webappDir, err := os.MkdirTemp("", "python-env-integration-test-webapp")
require.NoError(t, err)
defer os.RemoveAll(webappDir)
// Create plugin subdirectory
pluginPath := filepath.Join(pluginDir, "python-env-test")
require.NoError(t, os.MkdirAll(pluginPath, 0755))
// Determine the venv path based on OS
var venvPythonPath string
if runtime.GOOS == "windows" {
venvPythonPath = filepath.Join(pluginPath, "venv", "Scripts", "python.exe")
} else {
venvPythonPath = filepath.Join(pluginPath, "venv", "bin", "python")
}
// Create venv directory structure
require.NoError(t, os.MkdirAll(filepath.Dir(venvPythonPath), 0755))
// Compile fake Python interpreter
utils.CompileGo(t, `
package main
import (
"context"
"fmt"
"io"
"net"
"os"
"os/signal"
"syscall"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
pb "github.com/mattermost/mattermost/server/public/pluginapi/grpc/generated/go/pluginapiv1"
)
type fakePluginHooks struct {
pb.UnimplementedPluginHooksServer
}
func (f *fakePluginHooks) Implemented(ctx context.Context, req *pb.ImplementedRequest) (*pb.ImplementedResponse, error) {
return &pb.ImplementedResponse{
Hooks: []string{"OnActivate", "OnDeactivate", "OnConfigurationChange", "ServeHTTP"},
}, nil
}
func (f *fakePluginHooks) OnActivate(ctx context.Context, req *pb.OnActivateRequest) (*pb.OnActivateResponse, error) {
return &pb.OnActivateResponse{}, nil
}
func (f *fakePluginHooks) OnDeactivate(ctx context.Context, req *pb.OnDeactivateRequest) (*pb.OnDeactivateResponse, error) {
return &pb.OnDeactivateResponse{}, nil
}
func (f *fakePluginHooks) OnConfigurationChange(ctx context.Context, req *pb.OnConfigurationChangeRequest) (*pb.OnConfigurationChangeResponse, error) {
return &pb.OnConfigurationChangeResponse{}, nil
}
func (f *fakePluginHooks) ServeHTTP(stream pb.PluginHooks_ServeHTTPServer) error {
// Receive request
for {
req, err := stream.Recv()
if err == io.EOF {
break
}
if err != nil {
return err
}
if req.BodyComplete {
break
}
}
// Send simple response
return stream.Send(&pb.ServeHTTPResponse{
Init: &pb.ServeHTTPResponseInit{
StatusCode: 200,
Headers: []*pb.HTTPHeader{
{Key: "Content-Type", Values: []string{"application/json"}},
},
},
BodyChunk: []byte(` + "`" + `{"plugin": "python-env-test"}` + "`" + `),
BodyComplete: true,
})
}
func main() {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
fmt.Fprintf(os.Stderr, "failed to listen: %v\n", err)
os.Exit(1)
}
grpcServer := grpc.NewServer()
healthServer := health.NewServer()
healthServer.SetServingStatus("plugin", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
pb.RegisterPluginHooksServer(grpcServer, &fakePluginHooks{})
go func() {
if err := grpcServer.Serve(listener); err != nil {
fmt.Fprintf(os.Stderr, "failed to serve: %v\n", err)
}
}()
addr := listener.Addr().String()
fmt.Printf("1|1|tcp|%s|grpc\n", addr)
os.Stdout.Sync()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
<-sigCh
grpcServer.GracefulStop()
}
`, venvPythonPath)
// Create dummy plugin.py
require.NoError(t, os.WriteFile(filepath.Join(pluginPath, "plugin.py"), []byte("# Env test\n"), 0644))
// Create plugin.json
manifest := &model.Manifest{
Id: "python-env-test",
Version: "1.0.0",
Server: &model.ManifestServer{
Executable: "plugin.py",
},
}
manifestJSON, err := json.Marshal(manifest)
require.NoError(t, err)
require.NoError(t, os.WriteFile(filepath.Join(pluginPath, "plugin.json"), manifestJSON, 0644))
// Create Environment
logger := mlog.CreateConsoleTestLogger(t)
env, err := NewEnvironment(
func(m *model.Manifest) API { return nil },
nil,
pluginDir,
webappDir,
logger,
nil,
)
require.NoError(t, err)
defer env.Shutdown()
t.Run("Activate plugin", func(t *testing.T) {
retManifest, activated, err := env.Activate("python-env-test")
require.NoError(t, err)
assert.True(t, activated, "Plugin should be activated")
assert.NotNil(t, retManifest)
assert.True(t, env.IsActive("python-env-test"))
})
t.Run("Health check succeeds", func(t *testing.T) {
err := env.PerformHealthCheck("python-env-test")
require.NoError(t, err, "Health check should succeed")
})
t.Run("RunMultiPluginHook dispatches hooks", func(t *testing.T) {
// This tests that hook dispatch works through the Environment
// OnConfigurationChange is a void hook that should be callable
env.RunMultiPluginHook(func(hooks Hooks, manifest *model.Manifest) bool {
err := hooks.OnConfigurationChange()
return err == nil
}, OnConfigurationChangeID)
})
t.Run("Deactivate plugin", func(t *testing.T) {
result := env.Deactivate("python-env-test")
assert.True(t, result, "Deactivate should return true")
assert.False(t, env.IsActive("python-env-test"))
})
}
// TestPythonPluginCrashRecovery tests that a crashed Python plugin can be restarted
func TestPythonPluginCrashRecovery(t *testing.T) {
// Create temp directories
pluginDir, err := os.MkdirTemp("", "python-crash-recovery-plugins")
require.NoError(t, err)
defer os.RemoveAll(pluginDir)
webappDir, err := os.MkdirTemp("", "python-crash-recovery-webapp")
require.NoError(t, err)
defer os.RemoveAll(webappDir)
// Create plugin subdirectory
pluginPath := filepath.Join(pluginDir, "python-crash-test")
require.NoError(t, os.MkdirAll(pluginPath, 0755))
// Determine the venv path based on OS
var venvPythonPath string
if runtime.GOOS == "windows" {
venvPythonPath = filepath.Join(pluginPath, "venv", "Scripts", "python.exe")
} else {
venvPythonPath = filepath.Join(pluginPath, "venv", "bin", "python")
}
// Create venv directory structure
require.NoError(t, os.MkdirAll(filepath.Dir(venvPythonPath), 0755))
// Compile fake Python interpreter
utils.CompileGo(t, `
package main
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
pb "github.com/mattermost/mattermost/server/public/pluginapi/grpc/generated/go/pluginapiv1"
)
type fakePluginHooks struct {
pb.UnimplementedPluginHooksServer
}
func (f *fakePluginHooks) Implemented(ctx context.Context, req *pb.ImplementedRequest) (*pb.ImplementedResponse, error) {
return &pb.ImplementedResponse{Hooks: []string{"OnActivate"}}, nil
}
func (f *fakePluginHooks) OnActivate(ctx context.Context, req *pb.OnActivateRequest) (*pb.OnActivateResponse, error) {
return &pb.OnActivateResponse{}, nil
}
func main() {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
fmt.Fprintf(os.Stderr, "failed to listen: %v\n", err)
os.Exit(1)
}
grpcServer := grpc.NewServer()
healthServer := health.NewServer()
healthServer.SetServingStatus("plugin", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
pb.RegisterPluginHooksServer(grpcServer, &fakePluginHooks{})
go func() {
if err := grpcServer.Serve(listener); err != nil {
fmt.Fprintf(os.Stderr, "failed to serve: %v\n", err)
}
}()
addr := listener.Addr().String()
fmt.Printf("1|1|tcp|%s|grpc\n", addr)
os.Stdout.Sync()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
<-sigCh
grpcServer.GracefulStop()
}
`, venvPythonPath)
// Create dummy plugin.py
require.NoError(t, os.WriteFile(filepath.Join(pluginPath, "plugin.py"), []byte("# Crash test\n"), 0644))
// Create plugin.json
manifest := &model.Manifest{
Id: "python-crash-test",
Version: "1.0.0",
Server: &model.ManifestServer{
Executable: "plugin.py",
},
}
manifestJSON, err := json.Marshal(manifest)
require.NoError(t, err)
require.NoError(t, os.WriteFile(filepath.Join(pluginPath, "plugin.json"), manifestJSON, 0644))
// Create Environment
logger := mlog.CreateConsoleTestLogger(t)
env, err := NewEnvironment(
func(m *model.Manifest) API { return nil },
nil,
pluginDir,
webappDir,
logger,
nil,
)
require.NoError(t, err)
defer env.Shutdown()
// Activate the plugin
_, activated, err := env.Activate("python-crash-test")
require.NoError(t, err)
require.True(t, activated)
// Verify initial health
err = env.PerformHealthCheck("python-crash-test")
require.NoError(t, err)
// Kill the plugin process
rp, ok := env.registeredPlugins.Load("python-crash-test")
require.True(t, ok)
registeredPlug := rp.(registeredPlugin)
require.NotNil(t, registeredPlug.supervisor.client)
registeredPlug.supervisor.client.Kill()
// Wait for health check to fail
healthCheckFailed := false
for i := 0; i < 10; i++ {
err = env.PerformHealthCheck("python-crash-test")
if err != nil {
healthCheckFailed = true
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, healthCheckFailed, "Health check should fail after crash")
// Restart the plugin
err = env.RestartPlugin("python-crash-test")
require.NoError(t, err)
// Verify recovery
healthCheckSucceeded := false
for i := 0; i < 20; i++ {
err = env.PerformHealthCheck("python-crash-test")
if err == nil {
healthCheckSucceeded = true
break
}
time.Sleep(100 * time.Millisecond)
}
require.True(t, healthCheckSucceeded, "Health check should succeed after restart")
assert.True(t, env.IsActive("python-crash-test"))
}
// TestPythonPluginImplementsChecking tests that the supervisor correctly
// tracks which hooks are implemented
func TestPythonPluginImplementsChecking(t *testing.T) {
// Create temp plugin directory
pluginDir, err := os.MkdirTemp("", "python-implements-test")
require.NoError(t, err)
defer os.RemoveAll(pluginDir)
// Determine the venv path based on OS
var venvPythonPath string
if runtime.GOOS == "windows" {
venvPythonPath = filepath.Join(pluginDir, "venv", "Scripts", "python.exe")
} else {
venvPythonPath = filepath.Join(pluginDir, "venv", "bin", "python")
}
// Create venv directory structure
require.NoError(t, os.MkdirAll(filepath.Dir(venvPythonPath), 0755))
// Compile fake Python interpreter that only implements specific hooks
utils.CompileGo(t, `
package main
import (
"context"
"fmt"
"net"
"os"
"os/signal"
"syscall"
"google.golang.org/grpc"
"google.golang.org/grpc/health"
"google.golang.org/grpc/health/grpc_health_v1"
pb "github.com/mattermost/mattermost/server/public/pluginapi/grpc/generated/go/pluginapiv1"
)
type fakePluginHooks struct {
pb.UnimplementedPluginHooksServer
}
func (f *fakePluginHooks) Implemented(ctx context.Context, req *pb.ImplementedRequest) (*pb.ImplementedResponse, error) {
// Only implement specific hooks for testing
return &pb.ImplementedResponse{
Hooks: []string{
"OnActivate",
"MessageHasBeenPosted",
"UserHasJoinedChannel",
// Deliberately NOT implementing: OnDeactivate, ServeHTTP, ExecuteCommand, etc.
},
}, nil
}
func (f *fakePluginHooks) OnActivate(ctx context.Context, req *pb.OnActivateRequest) (*pb.OnActivateResponse, error) {
return &pb.OnActivateResponse{}, nil
}
func (f *fakePluginHooks) MessageHasBeenPosted(ctx context.Context, req *pb.MessageHasBeenPostedRequest) (*pb.MessageHasBeenPostedResponse, error) {
return &pb.MessageHasBeenPostedResponse{}, nil
}
func (f *fakePluginHooks) UserHasJoinedChannel(ctx context.Context, req *pb.UserHasJoinedChannelRequest) (*pb.UserHasJoinedChannelResponse, error) {
return &pb.UserHasJoinedChannelResponse{}, nil
}
func main() {
listener, err := net.Listen("tcp", "127.0.0.1:0")
if err != nil {
fmt.Fprintf(os.Stderr, "failed to listen: %v\n", err)
os.Exit(1)
}
grpcServer := grpc.NewServer()
healthServer := health.NewServer()
healthServer.SetServingStatus("plugin", grpc_health_v1.HealthCheckResponse_SERVING)
grpc_health_v1.RegisterHealthServer(grpcServer, healthServer)
pb.RegisterPluginHooksServer(grpcServer, &fakePluginHooks{})
go func() {
if err := grpcServer.Serve(listener); err != nil {
fmt.Fprintf(os.Stderr, "failed to serve: %v\n", err)
}
}()
addr := listener.Addr().String()
fmt.Printf("1|1|tcp|%s|grpc\n", addr)
os.Stdout.Sync()
sigCh := make(chan os.Signal, 1)
signal.Notify(sigCh, syscall.SIGTERM, syscall.SIGINT)
<-sigCh
grpcServer.GracefulStop()
}
`, venvPythonPath)
// Create a dummy plugin.py file
scriptPath := filepath.Join(pluginDir, "plugin.py")
require.NoError(t, os.WriteFile(scriptPath, []byte("# Implements test\n"), 0644))
// Create plugin.json manifest
manifest := &model.Manifest{
Id: "python-implements-test",
Version: "1.0.0",
Server: &model.ManifestServer{
Executable: "plugin.py",
},
}
manifestJSON, err := json.Marshal(manifest)
require.NoError(t, err)
require.NoError(t, os.WriteFile(filepath.Join(pluginDir, "plugin.json"), manifestJSON, 0644))
// Create bundle info
bundle := model.BundleInfoForPath(pluginDir)
require.NotNil(t, bundle.Manifest)
// Create logger
logger := mlog.CreateConsoleTestLogger(t)
// Create supervisor
sup, err := newSupervisor(bundle, nil, nil, logger, nil, WithCommandFromManifest(bundle, nil, nil))
require.NoError(t, err)
require.NotNil(t, sup)
defer sup.Shutdown()
// Test implemented hooks return true
assert.True(t, sup.Implements(MessageHasBeenPostedID), "MessageHasBeenPosted should be implemented")
assert.True(t, sup.Implements(UserHasJoinedChannelID), "UserHasJoinedChannel should be implemented")
// Test non-implemented hooks return false
assert.False(t, sup.Implements(OnDeactivateID), "OnDeactivate should NOT be implemented")
assert.False(t, sup.Implements(ServeHTTPID), "ServeHTTP should NOT be implemented")
assert.False(t, sup.Implements(ExecuteCommandID), "ExecuteCommand should NOT be implemented")
assert.False(t, sup.Implements(ChannelHasBeenCreatedID), "ChannelHasBeenCreated should NOT be implemented")
}