mirror of
https://github.com/mattermost/mattermost.git
synced 2026-02-03 20:40:00 -05:00
Go side: - Log hooks returned by Implemented() - Log each hook name -> ID mapping - Log OnActivate implementation status - Log OnActivate call flow Python side: - Log Implemented() return value - Log OnActivate gRPC receipt and handler invocation This is temporary debug logging to diagnose why OnActivate isn't being called for Python plugins. Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
368 lines
11 KiB
Python
368 lines
11 KiB
Python
# Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
# See LICENSE.txt for license information.
|
|
|
|
"""
|
|
Plugin server bootstrap for Mattermost Python plugins.
|
|
|
|
This module provides the gRPC server infrastructure for Python plugins:
|
|
- Async gRPC server with configurable options
|
|
- Health service registration (grpc.health.v1.Health)
|
|
- go-plugin compatible handshake line output
|
|
- Graceful shutdown handling
|
|
|
|
Usage:
|
|
from mattermost_plugin import Plugin, hook, HookName
|
|
from mattermost_plugin.server import serve_plugin
|
|
|
|
class MyPlugin(Plugin):
|
|
@hook(HookName.OnActivate)
|
|
def on_activate(self) -> None:
|
|
self.logger.info("Plugin activated!")
|
|
|
|
if __name__ == "__main__":
|
|
import asyncio
|
|
asyncio.run(serve_plugin(MyPlugin))
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import asyncio
|
|
import logging
|
|
import signal
|
|
import sys
|
|
from typing import Optional, Sequence, Tuple, Type, TYPE_CHECKING
|
|
|
|
import grpc
|
|
from grpc import aio as grpc_aio
|
|
from grpc_health.v1 import health, health_pb2, health_pb2_grpc
|
|
|
|
from mattermost_plugin.runtime_config import RuntimeConfig, load_runtime_config
|
|
from mattermost_plugin._internal.hook_runner import DEFAULT_HOOK_TIMEOUT
|
|
from mattermost_plugin.servicers.hooks_servicer import PluginHooksServicerImpl
|
|
from mattermost_plugin.grpc import hooks_pb2_grpc
|
|
|
|
if TYPE_CHECKING:
|
|
from mattermost_plugin.plugin import Plugin
|
|
from mattermost_plugin.client import PluginAPIClient
|
|
|
|
# Module-level logger, named after this module.
logger = logging.getLogger(__name__)

# Default gRPC server options (consistent with Phase 6 channel options).
# Both the send and the receive direction are capped at 64 MiB per message.
DEFAULT_SERVER_OPTIONS: Sequence[Tuple[str, int]] = [
    ("grpc.max_send_message_length", 64 * 1024 * 1024),
    ("grpc.max_receive_message_length", 64 * 1024 * 1024),
]

# go-plugin handshake protocol versions. These are the first two fields of
# the handshake line produced by format_handshake_line().
GO_PLUGIN_CORE_PROTOCOL_VERSION = "1"
GO_PLUGIN_APP_PROTOCOL_VERSION = "1"
|
|
|
|
|
|
def format_handshake_line(port: int) -> str:
    """
    Build the go-plugin handshake line for a server listening on ``port``.

    The line consists of five pipe-separated fields:
        CORE-PROTOCOL-VERSION | APP-PROTOCOL-VERSION | NETWORK-TYPE | NETWORK-ADDR | PROTOCOL

    The network type is always ``tcp``, the address is always the loopback
    interface (``127.0.0.1``) plus the given port, and the protocol is
    always ``grpc``.

    Args:
        port: The port number the gRPC server is listening on.

    Returns:
        The handshake line string (without a trailing newline).
    """
    fields = (
        GO_PLUGIN_CORE_PROTOCOL_VERSION,
        GO_PLUGIN_APP_PROTOCOL_VERSION,
        "tcp",
        f"127.0.0.1:{port}",
        "grpc",
    )
    return "|".join(fields)
|
|
|
|
|
|
class PluginServer:
    """
    gRPC server wrapper for Mattermost Python plugins.

    This class manages the gRPC server lifecycle, including:
    - Server creation with health service and hook servicer
    - Handshake line output
    - Graceful shutdown

    The server binds to an ephemeral port on localhost (127.0.0.1:0)
    and outputs the actual port in the handshake line.
    """

    def __init__(
        self,
        plugin_instance: "Plugin",
        config: Optional[RuntimeConfig] = None,
        options: Optional[Sequence[Tuple[str, int]]] = None,
    ) -> None:
        """
        Initialize the plugin server.

        Args:
            plugin_instance: The Plugin instance to serve.
            config: Runtime configuration. If None, loaded from environment.
            options: gRPC server options. If None (or empty), uses
                DEFAULT_SERVER_OPTIONS.
        """
        self.plugin = plugin_instance
        self.config = config or load_runtime_config()
        self.options = options or DEFAULT_SERVER_OPTIONS

        # Everything below is populated by start() and cleared again when
        # start() fails; stop() clears the server and port.
        self._server: Optional[grpc_aio.Server] = None
        self._port: Optional[int] = None
        self._shutdown_event: Optional[asyncio.Event] = None
        self._health_servicer: Optional[health.HealthServicer] = None
        self._hooks_servicer: Optional[PluginHooksServicerImpl] = None

    @property
    def port(self) -> Optional[int]:
        """The port the server is listening on, or None if not started."""
        return self._port

    async def start(self) -> int:
        """
        Start the gRPC server.

        This method:
        1. Creates the async gRPC server
        2. Registers the health service and the hook servicer
        3. Binds to an ephemeral port
        4. Starts the server
        5. Outputs the handshake line to stdout

        If anything fails before the server is running, all internal state
        is rolled back so that start() can be called again.

        Returns:
            The port number the server is listening on.

        Raises:
            RuntimeError: If the server is already started, or if no port
                could be bound.
        """
        if self._server is not None:
            raise RuntimeError("Server is already started")

        # Assign immediately so that a concurrent start() call hits the
        # guard above; the except-branch below undoes this on failure.
        self._server = grpc_aio.server(options=list(self.options))
        try:
            # Register health service
            self._health_servicer = health.HealthServicer()
            health_pb2_grpc.add_HealthServicer_to_server(
                self._health_servicer, self._server
            )

            # "plugin" is the service name required by go-plugin; the empty
            # name reports the overall server health.
            for service_name in ("plugin", ""):
                self._health_servicer.set(
                    service_name,
                    health_pb2.HealthCheckResponse.SERVING,
                )

            # Register hook servicer
            self._hooks_servicer = PluginHooksServicerImpl(
                plugin=self.plugin,
                timeout=DEFAULT_HOOK_TIMEOUT,
            )
            hooks_pb2_grpc.add_PluginHooksServicer_to_server(
                self._hooks_servicer, self._server
            )
            logger.debug("Hook servicer registered")

            # Bind to ephemeral port on localhost. A result of 0 means no
            # port was bound; advertising it in the handshake would be
            # useless to the host process.
            self._port = self._server.add_insecure_port("127.0.0.1:0")
            if not self._port:
                raise RuntimeError(
                    "Failed to bind plugin server to an ephemeral port on 127.0.0.1"
                )

            # Start the server
            await self._server.start()
        except BaseException:
            # Roll back so the instance is not stuck in a half-started
            # state (which would make every retry fail with
            # "Server is already started").
            self._server = None
            self._port = None
            self._health_servicer = None
            self._hooks_servicer = None
            raise

        logger.info(f"Plugin server started on port {self._port}")

        # Output handshake line to stdout (MUST be first stdout line)
        # This is critical for go-plugin compatibility
        handshake = format_handshake_line(self._port)
        print(handshake, flush=True)

        logger.debug(f"Handshake output: {handshake}")

        return self._port

    async def stop(self, grace_period: float = 5.0) -> None:
        """
        Stop the gRPC server gracefully.

        Does nothing if the server is not running.

        Args:
            grace_period: Time in seconds to wait for ongoing RPCs to complete.
        """
        if self._server is None:
            return

        logger.info("Stopping plugin server...")

        # Set health to NOT_SERVING before shutdown
        if self._health_servicer is not None:
            for service_name in ("plugin", ""):
                self._health_servicer.set(
                    service_name,
                    health_pb2.HealthCheckResponse.NOT_SERVING,
                )

        # Stop the server with grace period
        await self._server.stop(grace_period)

        self._server = None
        self._port = None

        logger.info("Plugin server stopped")

    async def wait_for_termination(self) -> None:
        """
        Wait for the server to terminate.

        Returns immediately if the server has not been started; otherwise
        awaits the underlying gRPC server's wait_for_termination().
        """
        if self._server is None:
            return

        await self._server.wait_for_termination()
|
|
|
|
|
|
async def _cancel_and_drain(tasks: Sequence["asyncio.Future"]) -> None:
    """Cancel the given tasks, await them, and log any real failure."""
    if not tasks:
        return

    for task in tasks:
        # cancel() is a no-op for tasks that already finished.
        task.cancel()

    # return_exceptions=True collects cancellations and errors as results
    # instead of raising, so every task is awaited exactly once.
    results = await asyncio.gather(*tasks, return_exceptions=True)
    for result in results:
        if isinstance(result, BaseException) and not isinstance(
            result, asyncio.CancelledError
        ):
            logger.error("Plugin server wait task failed: %r", result)


async def serve_plugin(
    plugin_class: Type["Plugin"],
    config: Optional[RuntimeConfig] = None,
    options: Optional[Sequence[Tuple[str, int]]] = None,
) -> None:
    """
    Start a plugin server and wait for termination.

    This is the main entry point for running a Python plugin. It:
    1. Loads runtime configuration from environment
    2. Creates the plugin instance with API client
    3. Starts the gRPC server with health service
    4. Outputs the go-plugin handshake line
    5. Waits for termination (signal or explicit stop)

    On the way out the server is stopped, the internal wait tasks are
    cancelled and awaited, and the API client (if any) is closed, even
    when one of the earlier cleanup steps raises.

    Args:
        plugin_class: The Plugin subclass to instantiate and serve.
        config: Optional runtime configuration. If None, loaded from env.
        options: Optional gRPC server options.

    Raises:
        Exception: Any error raised while starting or running the server is
            logged and re-raised.

    Example:
        from mattermost_plugin import Plugin, hook, HookName
        from mattermost_plugin.server import serve_plugin

        class MyPlugin(Plugin):
            @hook(HookName.OnActivate)
            def on_activate(self) -> None:
                self.logger.info("Activated!")

        if __name__ == "__main__":
            import asyncio
            asyncio.run(serve_plugin(MyPlugin))
    """
    # Load configuration
    if config is None:
        config = load_runtime_config()

    # Configure logging (to stderr, not stdout)
    config.configure_logging()

    logger.info(f"Starting plugin: {config.plugin_id or plugin_class.__name__}")
    logger.debug(f"API target: {config.api_target}")

    # Create API client (optional - may not be connected yet).
    # Failure here is deliberately non-fatal: the plugin is still served,
    # just without an API client.
    api_client: Optional["PluginAPIClient"] = None
    try:
        from mattermost_plugin.client import PluginAPIClient

        if config.api_target:
            api_client = PluginAPIClient(target=config.api_target)
            # Note: We don't connect yet - hooks will connect when needed
    except Exception as e:
        logger.warning(f"Could not create API client: {e}")

    # Create plugin instance
    plugin_instance = plugin_class(
        api=api_client,
        config=config,
    )

    logger.info(
        f"Plugin implements hooks: {plugin_class.implemented_hooks()}"
    )

    # Create the server (started further below)
    server = PluginServer(
        plugin_instance=plugin_instance,
        config=config,
        options=options,
    )

    # Set up signal handlers for graceful shutdown
    shutdown_event = asyncio.Event()

    def signal_handler(sig: signal.Signals) -> None:
        logger.info(f"Received signal {sig.name}, initiating shutdown...")
        shutdown_event.set()

    # Register signal handlers
    loop = asyncio.get_running_loop()
    for sig in (signal.SIGTERM, signal.SIGINT):
        try:
            loop.add_signal_handler(sig, signal_handler, sig)
        except NotImplementedError:
            # Windows doesn't support add_signal_handler for all signals
            pass

    # Tasks racing in asyncio.wait(); tracked so the loser can be cleaned
    # up instead of being left pending on the loop.
    waiters: list = []
    try:
        # Start the server
        await server.start()

        # Wait for shutdown signal or server termination
        waiters = [
            asyncio.create_task(shutdown_event.wait()),
            asyncio.create_task(server.wait_for_termination()),
        ]
        await asyncio.wait(waiters, return_when=asyncio.FIRST_COMPLETED)

    except Exception as e:
        logger.exception(f"Error running plugin server: {e}")
        raise

    finally:
        # Clean shutdown. Stopping the server first lets the termination
        # waiter finish on its own; whatever is still pending afterwards is
        # cancelled. The nested finally blocks ensure a failure in one
        # cleanup step does not skip the following ones.
        try:
            await server.stop()
        finally:
            try:
                await _cancel_and_drain(waiters)
            finally:
                # Close API client if connected
                if api_client is not None:
                    api_client.close()

        logger.info("Plugin shutdown complete")
|
|
|
|
|
|
def run_plugin(plugin_class: Type["Plugin"]) -> None:
    """
    Run a plugin synchronously until it terminates.

    Blocking wrapper around serve_plugin(): it hands the serve_plugin()
    coroutine to asyncio.run(), which drives it on an event loop until it
    finishes.

    Args:
        plugin_class: The Plugin subclass to run.

    Example:
        from mattermost_plugin import Plugin, hook, HookName
        from mattermost_plugin.server import run_plugin

        class MyPlugin(Plugin):
            @hook(HookName.OnActivate)
            def on_activate(self) -> None:
                pass

        if __name__ == "__main__":
            run_plugin(MyPlugin)
    """
    main_coroutine = serve_plugin(plugin_class)
    asyncio.run(main_coroutine)
|