mattermost/python-sdk/build/lib/mattermost_plugin/servicers/hooks_servicer.py
Nick Misasi a26942bde1 fix(python-sdk): accept dict responses from ExecuteCommand hook
- Add _to_command_response_proto helper to convert dict/wrapper/protobuf
  to CommandResponse protobuf
- Plugin developers can now return plain dicts with response_type, text, etc.
- Also supports CommandResponse wrapper class and raw protobufs
- Export CommandResponse from mattermost_plugin package

This fixes the "ExecuteCommand returned unexpected type: <class 'dict'>"
error when Python plugin handlers return dict responses.
2026-01-20 10:08:47 -05:00

2606 lines
91 KiB
Python

# Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
# See LICENSE.txt for license information.
"""
gRPC Hook Servicer for Mattermost Python plugins.
This module implements the PluginHooksServicer gRPC service, handling hook
invocations from the Mattermost server.
ERROR HANDLING CONVENTION:
--------------------------
1. Plugin bugs (exceptions in handlers): Return gRPC INTERNAL status.
The Go side should log and fail-open where appropriate.
2. Expected business rejections (e.g., post rejected by MessageWillBePosted):
Encoded via response fields (rejection_reason), NOT gRPC errors.
3. OnActivate errors: Encoded via response.error field (AppError).
This allows the Go side to distinguish activation failure from transport failure.
HOOK HANDLER INVOCATION:
------------------------
- All hook handlers are invoked via the HookRunner with timeout support.
- Both sync and async handlers are supported.
- Handler exceptions are caught and converted to appropriate gRPC responses.
TYPE STRATEGY:
--------------
For this initial implementation, hook handlers receive raw protobuf messages.
Future versions may add Python wrapper types for better ergonomics.
"""
from __future__ import annotations
import logging
from typing import TYPE_CHECKING, Optional
import grpc
from mattermost_plugin.grpc import hooks_pb2_grpc
from mattermost_plugin.grpc import hooks_lifecycle_pb2
from mattermost_plugin.grpc import hooks_message_pb2
from mattermost_plugin.grpc import hooks_user_channel_pb2
from mattermost_plugin.grpc import hooks_command_pb2
from mattermost_plugin.grpc import hooks_http_pb2
from mattermost_plugin.grpc import common_pb2
from mattermost_plugin.grpc import api_remaining_pb2
from mattermost_plugin._internal.hook_runner import HookRunner, DEFAULT_HOOK_TIMEOUT
from mattermost_plugin._internal.wrappers import CommandResponse as CommandResponseWrapper
if TYPE_CHECKING:
from mattermost_plugin.plugin import Plugin
logger = logging.getLogger(__name__)
# DismissPostError is the special string that tells the server to silently
# dismiss the post (no error shown to user). Must match the Go constant.
DISMISS_POST_ERROR = "plugin.message_will_be_posted.dismiss_post"
def _make_app_error(
message: str,
error_id: str = "plugin.error",
detailed_error: str = "",
status_code: int = 500,
where: str = "",
) -> common_pb2.AppError:
"""Create an AppError protobuf message."""
return common_pb2.AppError(
id=error_id,
message=message,
detailed_error=detailed_error,
status_code=status_code,
where=where,
)
def _to_command_response_proto(
result: object,
logger: logging.Logger,
) -> Optional[api_remaining_pb2.CommandResponse]:
"""
Convert a handler result to a CommandResponse protobuf.
Supports:
- dict with command response fields (response_type, text, etc.)
- CommandResponseWrapper (the SDK wrapper class)
- api_remaining_pb2.CommandResponse (protobuf, passed through)
Args:
result: The return value from the ExecuteCommand handler.
logger: Logger for warnings/errors.
Returns:
A CommandResponse protobuf, or None if conversion failed.
"""
if result is None:
return None
# Handle dict responses (most common from plugin developers)
if isinstance(result, dict):
try:
return api_remaining_pb2.CommandResponse(
response_type=result.get("response_type", ""),
text=result.get("text", ""),
username=result.get("username", ""),
channel_id=result.get("channel_id", ""),
icon_url=result.get("icon_url", ""),
type=result.get("type", ""),
goto_location=result.get("goto_location", ""),
trigger_id=result.get("trigger_id", ""),
skip_slack_parsing=result.get("skip_slack_parsing", False),
)
except Exception as e:
logger.warning(f"Failed to convert dict to CommandResponse: {e}")
return None
# Handle SDK wrapper CommandResponse class
if isinstance(result, CommandResponseWrapper):
try:
return api_remaining_pb2.CommandResponse(
response_type=result.response_type,
text=result.text,
username=result.username,
channel_id=result.channel_id,
icon_url=result.icon_url,
type=result.type,
goto_location=result.goto_location,
trigger_id=result.trigger_id,
skip_slack_parsing=result.skip_slack_parsing,
)
except Exception as e:
logger.warning(f"Failed to convert CommandResponse wrapper: {e}")
return None
# Handle protobuf CommandResponse directly
if isinstance(result, api_remaining_pb2.CommandResponse):
return result
# Unknown type
logger.warning(f"ExecuteCommand returned unexpected type: {type(result)}")
return None
class PluginHooksServicerImpl(hooks_pb2_grpc.PluginHooksServicer):
"""
gRPC servicer implementation for plugin hooks.
This class receives hook invocations from the Mattermost server and
dispatches them to the appropriate plugin handler methods.
Attributes:
plugin: The Plugin instance that implements hook handlers.
runner: HookRunner instance for invoking handlers with timeout.
"""
def __init__(
self,
plugin: "Plugin",
timeout: float = DEFAULT_HOOK_TIMEOUT,
) -> None:
"""
Initialize the hook servicer.
Args:
plugin: The Plugin instance to dispatch hooks to.
timeout: Default timeout for hook invocations in seconds.
"""
self.plugin = plugin
self.runner = HookRunner(timeout=timeout)
self._logger = logging.getLogger(
f"{__name__}.{plugin.__class__.__name__}"
)
# =========================================================================
# IMPLEMENTED RPC
# =========================================================================
async def Implemented(
self,
request: hooks_lifecycle_pb2.ImplementedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.ImplementedResponse:
"""
Return the list of hooks implemented by this plugin.
This method queries the plugin's hook registry to determine which
hooks have registered handlers.
Args:
request: The ImplementedRequest (contains context info).
context: The gRPC servicer context.
Returns:
ImplementedResponse containing list of canonical hook names.
"""
hooks = self.plugin.implemented_hooks()
self._logger.info(f"[DEBUG] Implemented() called, returning hooks: {hooks}")
print(f"[DEBUG] Implemented() called, returning hooks: {hooks}", flush=True)
return hooks_lifecycle_pb2.ImplementedResponse(hooks=hooks)
# =========================================================================
# LIFECYCLE HOOKS
# =========================================================================
async def OnActivate(
self,
request: hooks_lifecycle_pb2.OnActivateRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.OnActivateResponse:
"""
Handle plugin activation.
If the handler returns an error or raises an exception, the plugin
activation will fail. The error is encoded in the response.error field.
Args:
request: The OnActivateRequest.
context: The gRPC servicer context.
Returns:
OnActivateResponse, with error field set on failure.
"""
hook_name = "OnActivate"
self._logger.info(f"[DEBUG] OnActivate gRPC received")
print(f"[DEBUG] OnActivate gRPC received", flush=True)
if not self.plugin.has_hook(hook_name):
# Hook not implemented - activation succeeds by default
self._logger.info(f"[DEBUG] OnActivate hook not registered, returning success")
print(f"[DEBUG] OnActivate hook not registered, returning success", flush=True)
return hooks_lifecycle_pb2.OnActivateResponse()
self._logger.info(f"[DEBUG] OnActivate hook found, invoking handler")
print(f"[DEBUG] OnActivate hook found, invoking handler", flush=True)
handler = self.plugin.get_hook_handler(hook_name)
# Don't pass gRPC context - we encode errors in response.error field
result, error = await self.runner.invoke(
handler,
hook_name=hook_name,
)
self._logger.info(f"[DEBUG] OnActivate handler returned, result={result}, error={error}")
print(f"[DEBUG] OnActivate handler returned, result={result}, error={error}", flush=True)
if error is not None:
self._logger.error(f"OnActivate failed: {error}")
return hooks_lifecycle_pb2.OnActivateResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.on_activate.error",
where="OnActivate",
),
)
# Handler may return an error explicitly
if isinstance(result, Exception) or isinstance(result, str) and result:
error_msg = str(result)
self._logger.error(f"OnActivate returned error: {error_msg}")
return hooks_lifecycle_pb2.OnActivateResponse(
error=_make_app_error(
message=error_msg,
error_id="plugin.on_activate.error",
where="OnActivate",
),
)
return hooks_lifecycle_pb2.OnActivateResponse()
async def OnDeactivate(
self,
request: hooks_lifecycle_pb2.OnDeactivateRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.OnDeactivateResponse:
"""
Handle plugin deactivation.
This is best-effort - errors are logged but don't prevent shutdown.
Args:
request: The OnDeactivateRequest.
context: The gRPC servicer context.
Returns:
OnDeactivateResponse, with error field set on failure (informational).
"""
hook_name = "OnDeactivate"
if not self.plugin.has_hook(hook_name):
return hooks_lifecycle_pb2.OnDeactivateResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Don't pass gRPC context - we encode errors in response.error field
result, error = await self.runner.invoke(
handler,
hook_name=hook_name,
)
if error is not None:
self._logger.warning(f"OnDeactivate error (continuing): {error}")
return hooks_lifecycle_pb2.OnDeactivateResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.on_deactivate.error",
where="OnDeactivate",
),
)
return hooks_lifecycle_pb2.OnDeactivateResponse()
async def OnConfigurationChange(
self,
request: hooks_lifecycle_pb2.OnConfigurationChangeRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.OnConfigurationChangeResponse:
"""
Handle configuration change notification.
Errors are logged but do not stop the plugin.
Args:
request: The OnConfigurationChangeRequest.
context: The gRPC servicer context.
Returns:
OnConfigurationChangeResponse, with error field set on failure.
"""
hook_name = "OnConfigurationChange"
if not self.plugin.has_hook(hook_name):
return hooks_lifecycle_pb2.OnConfigurationChangeResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Don't pass gRPC context - we encode errors in response.error field
result, error = await self.runner.invoke(
handler,
hook_name=hook_name,
)
if error is not None:
self._logger.warning(
f"OnConfigurationChange error (continuing): {error}"
)
return hooks_lifecycle_pb2.OnConfigurationChangeResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.on_configuration_change.error",
where="OnConfigurationChange",
),
)
return hooks_lifecycle_pb2.OnConfigurationChangeResponse()
# =========================================================================
# MESSAGE HOOKS
# =========================================================================
async def MessageWillBePosted(
self,
request: hooks_message_pb2.MessageWillBePostedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.MessageWillBePostedResponse:
"""
Handle MessageWillBePosted hook.
This hook is called before a post is saved. The handler can:
- Allow unchanged: return (None, "")
- Modify: return (modified_post, "")
- Reject: return (None, "rejection reason")
- Dismiss silently: return (None, DISMISS_POST_ERROR)
Args:
request: Contains context, plugin_context, and post.
context: The gRPC servicer context.
Returns:
MessageWillBePostedResponse with modified_post and/or rejection_reason.
"""
hook_name = "MessageWillBePosted"
if not self.plugin.has_hook(hook_name):
# Not implemented - allow post unchanged
return hooks_message_pb2.MessageWillBePostedResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Invoke handler with protobuf objects
# Handler signature: (context, post) -> (post, string) or (post, rejection)
result, error = await self.runner.invoke(
handler,
request.plugin_context,
request.post,
hook_name=hook_name,
context=context,
)
if error is not None:
# Handler exception - treat as rejection with error message
self._logger.error(f"MessageWillBePosted error: {error}")
return hooks_message_pb2.MessageWillBePostedResponse(
rejection_reason=f"Plugin error: {error}",
)
# Parse handler result
modified_post = None
rejection_reason = ""
if result is None:
# Allow unchanged
pass
elif isinstance(result, tuple) and len(result) == 2:
post_result, reason = result
if post_result is not None:
modified_post = post_result
if reason:
rejection_reason = str(reason)
elif hasattr(result, "id"):
# Looks like a Post object - treat as modified post
modified_post = result
elif isinstance(result, str):
# String result - treat as rejection reason
rejection_reason = result
return hooks_message_pb2.MessageWillBePostedResponse(
modified_post=modified_post,
rejection_reason=rejection_reason,
)
async def MessageWillBeUpdated(
self,
request: hooks_message_pb2.MessageWillBeUpdatedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.MessageWillBeUpdatedResponse:
"""
Handle MessageWillBeUpdated hook.
Similar to MessageWillBePosted but for post updates.
Handler receives both new_post and old_post.
Args:
request: Contains context, plugin_context, new_post, and old_post.
context: The gRPC servicer context.
Returns:
MessageWillBeUpdatedResponse with modified_post and/or rejection_reason.
"""
hook_name = "MessageWillBeUpdated"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.MessageWillBeUpdatedResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (context, new_post, old_post) -> (post, string)
result, error = await self.runner.invoke(
handler,
request.plugin_context,
request.new_post,
request.old_post,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"MessageWillBeUpdated error: {error}")
return hooks_message_pb2.MessageWillBeUpdatedResponse(
rejection_reason=f"Plugin error: {error}",
)
# Parse handler result (same logic as MessageWillBePosted)
modified_post = None
rejection_reason = ""
if result is None:
pass
elif isinstance(result, tuple) and len(result) == 2:
post_result, reason = result
if post_result is not None:
modified_post = post_result
if reason:
rejection_reason = str(reason)
elif hasattr(result, "id"):
modified_post = result
elif isinstance(result, str):
rejection_reason = result
return hooks_message_pb2.MessageWillBeUpdatedResponse(
modified_post=modified_post,
rejection_reason=rejection_reason,
)
async def MessageHasBeenPosted(
self,
request: hooks_message_pb2.MessageHasBeenPostedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.MessageHasBeenPostedResponse:
"""
Handle MessageHasBeenPosted notification.
This is a fire-and-forget notification. Errors are logged but
the response is always empty.
Args:
request: Contains context, plugin_context, and post.
context: The gRPC servicer context.
Returns:
Empty MessageHasBeenPostedResponse.
"""
hook_name = "MessageHasBeenPosted"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.MessageHasBeenPostedResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (context, post) -> None
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.post,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"MessageHasBeenPosted error: {error}")
# Notification hook - still return success
return hooks_message_pb2.MessageHasBeenPostedResponse()
async def MessageHasBeenUpdated(
self,
request: hooks_message_pb2.MessageHasBeenUpdatedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.MessageHasBeenUpdatedResponse:
"""
Handle MessageHasBeenUpdated notification.
Fire-and-forget notification for post updates.
Args:
request: Contains context, plugin_context, new_post, and old_post.
context: The gRPC servicer context.
Returns:
Empty MessageHasBeenUpdatedResponse.
"""
hook_name = "MessageHasBeenUpdated"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.MessageHasBeenUpdatedResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (context, new_post, old_post) -> None
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.new_post,
request.old_post,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"MessageHasBeenUpdated error: {error}")
return hooks_message_pb2.MessageHasBeenUpdatedResponse()
async def MessageHasBeenDeleted(
self,
request: hooks_message_pb2.MessageHasBeenDeletedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.MessageHasBeenDeletedResponse:
"""
Handle MessageHasBeenDeleted notification.
Fire-and-forget notification for post deletions.
Args:
request: Contains context, plugin_context, and post.
context: The gRPC servicer context.
Returns:
Empty MessageHasBeenDeletedResponse.
"""
hook_name = "MessageHasBeenDeleted"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.MessageHasBeenDeletedResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (context, post) -> None
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.post,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"MessageHasBeenDeleted error: {error}")
return hooks_message_pb2.MessageHasBeenDeletedResponse()
async def MessagesWillBeConsumed(
self,
request: hooks_message_pb2.MessagesWillBeConsumedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.MessagesWillBeConsumedResponse:
"""
Handle MessagesWillBeConsumed hook.
Called when posts are being delivered to a client. The handler can
modify the list of posts (e.g., filter, redact).
Args:
request: Contains context and list of posts.
context: The gRPC servicer context.
Returns:
MessagesWillBeConsumedResponse with (possibly modified) posts.
"""
hook_name = "MessagesWillBeConsumed"
if not self.plugin.has_hook(hook_name):
# Return original posts unchanged
return hooks_message_pb2.MessagesWillBeConsumedResponse(
posts=list(request.posts),
)
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (posts) -> posts
# Note: No Context parameter for this hook
result, error = await self.runner.invoke(
handler,
list(request.posts),
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"MessagesWillBeConsumed error: {error}")
# On error, return original posts
return hooks_message_pb2.MessagesWillBeConsumedResponse(
posts=list(request.posts),
)
# Handler should return list of posts
if result is None:
posts = list(request.posts)
elif isinstance(result, (list, tuple)):
posts = list(result)
else:
self._logger.warning(
f"MessagesWillBeConsumed returned unexpected type: {type(result)}"
)
posts = list(request.posts)
return hooks_message_pb2.MessagesWillBeConsumedResponse(posts=posts)
# =========================================================================
# HOOK IMPLEMENTATION MATRIX (remaining hooks for 07-03)
# =========================================================================
#
# Hook Name | Python Handler | Return Semantics | Default Behavior
# -----------------------------------|------------------------|---------------------------|------------------
# ExecuteCommand | execute_command | (CommandResponse, error) | UNIMPLEMENTED
# ConfigurationWillBeSaved | configuration_will_be_saved | (config, error) | allow unchanged
# UserHasBeenCreated | user_has_been_created | void (fire-and-forget) | no-op
# UserWillLogIn | user_will_log_in | rejection_reason string | allow (empty string)
# UserHasLoggedIn | user_has_logged_in | void (fire-and-forget) | no-op
# UserHasBeenDeactivated | user_has_been_deactivated | void (fire-and-forget) | no-op
# ChannelHasBeenCreated | channel_has_been_created | void (fire-and-forget) | no-op
# UserHasJoinedChannel | user_has_joined_channel | void (fire-and-forget) | no-op
# UserHasLeftChannel | user_has_left_channel | void (fire-and-forget) | no-op
# UserHasJoinedTeam | user_has_joined_team | void (fire-and-forget) | no-op
# UserHasLeftTeam | user_has_left_team | void (fire-and-forget) | no-op
# ReactionHasBeenAdded | reaction_has_been_added | void (fire-and-forget) | no-op
# ReactionHasBeenRemoved | reaction_has_been_removed | void (fire-and-forget) | no-op
# NotificationWillBePushed | notification_will_be_pushed | (notification, rejection) | allow
# EmailNotificationWillBeSent | email_notification_will_be_sent | (content, rejection) | allow
# PreferencesHaveChanged | preferences_have_changed | void (fire-and-forget) | no-op
# OnInstall | on_install | error | success
# OnSendDailyTelemetry | on_send_daily_telemetry | void (fire-and-forget) | no-op
# RunDataRetention | run_data_retention | (deleted_count, error) | (0, nil)
# OnCloudLimitsUpdated | on_cloud_limits_updated | void (fire-and-forget) | no-op
# OnWebSocketConnect | on_web_socket_connect | void (fire-and-forget) | no-op
# OnWebSocketDisconnect | on_web_socket_disconnect | void (fire-and-forget) | no-op
# WebSocketMessageHasBeenPosted | web_socket_message_has_been_posted | void | no-op
# OnPluginClusterEvent | on_plugin_cluster_event | void (fire-and-forget) | no-op
# OnSharedChannelsSyncMsg | on_shared_channels_sync_msg | (SyncResponse, error) | UNIMPLEMENTED
# OnSharedChannelsPing | on_shared_channels_ping | bool (healthy) | true
# OnSharedChannelsAttachmentSyncMsg | on_shared_channels_attachment_sync_msg | error | success
# OnSharedChannelsProfileImageSyncMsg| on_shared_channels_profile_image_sync_msg | error | success
# GenerateSupportData | generate_support_data | ([]FileData, error) | ([], nil)
# OnSAMLLogin | on_saml_login | error | success
#
# DEFERRED TO PHASE 8 (Streaming):
# - ServeHTTP: HTTP request/response streaming
# - ServeMetrics: HTTP request/response streaming
# - FileWillBeUploaded: Large file streaming (implemented as unary but may have size limits)
# =========================================================================
# =========================================================================
# COMMAND HOOKS
# =========================================================================
async def ExecuteCommand(
self,
request: hooks_command_pb2.ExecuteCommandRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.ExecuteCommandResponse:
"""
Handle ExecuteCommand hook.
Called when a slash command registered by this plugin is invoked.
Handler should return a CommandResponse or raise an error.
Args:
request: Contains context, plugin_context, and command args.
context: The gRPC servicer context.
Returns:
ExecuteCommandResponse with response or error.
"""
hook_name = "ExecuteCommand"
if not self.plugin.has_hook(hook_name):
# Command hook not implemented - return error
return hooks_command_pb2.ExecuteCommandResponse(
error=_make_app_error(
message="ExecuteCommand hook not implemented",
error_id="plugin.execute_command.not_implemented",
status_code=501,
where="ExecuteCommand",
),
)
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (context, args) -> CommandResponse or error
result, error = await self.runner.invoke(
handler,
request.plugin_context,
request.args,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"ExecuteCommand error: {error}")
return hooks_command_pb2.ExecuteCommandResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.execute_command.error",
where="ExecuteCommand",
),
)
# Convert handler result to protobuf CommandResponse
# Supports: dict, CommandResponse wrapper, or protobuf directly
command_response = _to_command_response_proto(result, self._logger)
if command_response is not None:
return hooks_command_pb2.ExecuteCommandResponse(response=command_response)
else:
# result was None or conversion failed (warning already logged)
return hooks_command_pb2.ExecuteCommandResponse()
# =========================================================================
# CONFIGURATION HOOKS
# =========================================================================
async def ConfigurationWillBeSaved(
self,
request: hooks_lifecycle_pb2.ConfigurationWillBeSavedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse:
"""
Handle ConfigurationWillBeSaved hook.
Called before configuration is saved. Handler can return a modified
config or an error to reject the save.
Args:
request: Contains context and new_config (as ConfigJson).
context: The gRPC servicer context.
Returns:
ConfigurationWillBeSavedResponse with optional modified_config or error.
"""
hook_name = "ConfigurationWillBeSaved"
if not self.plugin.has_hook(hook_name):
# Not implemented - allow unchanged
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (config_json) -> (config_json, error) or just config_json
result, error = await self.runner.invoke(
handler,
request.new_config,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"ConfigurationWillBeSaved error: {error}")
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.configuration_will_be_saved.error",
where="ConfigurationWillBeSaved",
),
)
# Parse result
if result is None:
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse()
elif isinstance(result, tuple) and len(result) == 2:
config, err = result
if err:
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse(
error=_make_app_error(
message=str(err),
error_id="plugin.configuration_will_be_saved.rejected",
where="ConfigurationWillBeSaved",
),
)
if config is not None:
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse(
modified_config=config
)
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse()
elif hasattr(result, "config_json"):
# Looks like a ConfigJson
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse(
modified_config=result
)
else:
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse()
# =========================================================================
# USER LIFECYCLE HOOKS
# =========================================================================
async def UserHasBeenCreated(
self,
request: hooks_user_channel_pb2.UserHasBeenCreatedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.UserHasBeenCreatedResponse:
"""
Handle UserHasBeenCreated notification.
Fire-and-forget notification when a user is created.
Args:
request: Contains context, plugin_context, and user.
context: The gRPC servicer context.
Returns:
Empty UserHasBeenCreatedResponse.
"""
hook_name = "UserHasBeenCreated"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.UserHasBeenCreatedResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.user,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"UserHasBeenCreated error: {error}")
# Fire-and-forget - still return success
return hooks_user_channel_pb2.UserHasBeenCreatedResponse()
async def UserWillLogIn(
self,
request: hooks_user_channel_pb2.UserWillLogInRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.UserWillLogInResponse:
"""
Handle UserWillLogIn hook.
Called before a user logs in. Return a non-empty string to reject.
Args:
request: Contains context, plugin_context, and user.
context: The gRPC servicer context.
Returns:
UserWillLogInResponse with rejection_reason (empty to allow).
"""
hook_name = "UserWillLogIn"
if not self.plugin.has_hook(hook_name):
# Not implemented - allow login
return hooks_user_channel_pb2.UserWillLogInResponse(rejection_reason="")
handler = self.plugin.get_hook_handler(hook_name)
result, error = await self.runner.invoke(
handler,
request.plugin_context,
request.user,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"UserWillLogIn error: {error}")
# On error, reject the login
return hooks_user_channel_pb2.UserWillLogInResponse(
rejection_reason=f"Plugin error: {error}"
)
# Handler returns string rejection reason (empty = allow)
rejection_reason = ""
if result is not None and isinstance(result, str):
rejection_reason = result
return hooks_user_channel_pb2.UserWillLogInResponse(
rejection_reason=rejection_reason
)
async def UserHasLoggedIn(
self,
request: hooks_user_channel_pb2.UserHasLoggedInRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.UserHasLoggedInResponse:
"""
Handle UserHasLoggedIn notification.
Fire-and-forget notification after user login.
Args:
request: Contains context, plugin_context, and user.
context: The gRPC servicer context.
Returns:
Empty UserHasLoggedInResponse.
"""
hook_name = "UserHasLoggedIn"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.UserHasLoggedInResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.user,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"UserHasLoggedIn error: {error}")
return hooks_user_channel_pb2.UserHasLoggedInResponse()
async def UserHasBeenDeactivated(
self,
request: hooks_user_channel_pb2.UserHasBeenDeactivatedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.UserHasBeenDeactivatedResponse:
"""
Handle UserHasBeenDeactivated notification.
Fire-and-forget notification when a user is deactivated.
Args:
request: Contains context, plugin_context, and user.
context: The gRPC servicer context.
Returns:
Empty UserHasBeenDeactivatedResponse.
"""
hook_name = "UserHasBeenDeactivated"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.UserHasBeenDeactivatedResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.user,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"UserHasBeenDeactivated error: {error}")
return hooks_user_channel_pb2.UserHasBeenDeactivatedResponse()
# =========================================================================
# CHANNEL AND TEAM HOOKS
# =========================================================================
async def ChannelHasBeenCreated(
self,
request: hooks_user_channel_pb2.ChannelHasBeenCreatedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.ChannelHasBeenCreatedResponse:
"""
Handle ChannelHasBeenCreated notification.
Fire-and-forget notification when a channel is created.
Args:
request: Contains context, plugin_context, and channel.
context: The gRPC servicer context.
Returns:
Empty ChannelHasBeenCreatedResponse.
"""
hook_name = "ChannelHasBeenCreated"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.ChannelHasBeenCreatedResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.channel,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"ChannelHasBeenCreated error: {error}")
return hooks_user_channel_pb2.ChannelHasBeenCreatedResponse()
async def UserHasJoinedChannel(
self,
request: hooks_user_channel_pb2.UserHasJoinedChannelRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.UserHasJoinedChannelResponse:
"""
Handle UserHasJoinedChannel notification.
Fire-and-forget notification when user joins a channel.
Actor is optional (nil if self-join).
Args:
request: Contains context, plugin_context, channel_member, and optional actor.
context: The gRPC servicer context.
Returns:
Empty UserHasJoinedChannelResponse.
"""
hook_name = "UserHasJoinedChannel"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.UserHasJoinedChannelResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Actor may be None (optional field)
actor = request.actor if request.HasField("actor") else None
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.channel_member,
actor,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"UserHasJoinedChannel error: {error}")
return hooks_user_channel_pb2.UserHasJoinedChannelResponse()
async def UserHasLeftChannel(
self,
request: hooks_user_channel_pb2.UserHasLeftChannelRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.UserHasLeftChannelResponse:
"""
Handle UserHasLeftChannel notification.
Fire-and-forget notification when user leaves a channel.
Actor is optional (nil if self-removal).
Args:
request: Contains context, plugin_context, channel_member, and optional actor.
context: The gRPC servicer context.
Returns:
Empty UserHasLeftChannelResponse.
"""
hook_name = "UserHasLeftChannel"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.UserHasLeftChannelResponse()
handler = self.plugin.get_hook_handler(hook_name)
actor = request.actor if request.HasField("actor") else None
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.channel_member,
actor,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"UserHasLeftChannel error: {error}")
return hooks_user_channel_pb2.UserHasLeftChannelResponse()
async def UserHasJoinedTeam(
self,
request: hooks_user_channel_pb2.UserHasJoinedTeamRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.UserHasJoinedTeamResponse:
"""
Handle UserHasJoinedTeam notification.
Fire-and-forget notification when user joins a team.
Actor is optional (nil if self-join).
Args:
request: Contains context, plugin_context, team_member, and optional actor.
context: The gRPC servicer context.
Returns:
Empty UserHasJoinedTeamResponse.
"""
hook_name = "UserHasJoinedTeam"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.UserHasJoinedTeamResponse()
handler = self.plugin.get_hook_handler(hook_name)
actor = request.actor if request.HasField("actor") else None
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.team_member,
actor,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"UserHasJoinedTeam error: {error}")
return hooks_user_channel_pb2.UserHasJoinedTeamResponse()
async def UserHasLeftTeam(
self,
request: hooks_user_channel_pb2.UserHasLeftTeamRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.UserHasLeftTeamResponse:
"""
Handle UserHasLeftTeam notification.
Fire-and-forget notification when user leaves a team.
Actor is optional (nil if self-removal).
Args:
request: Contains context, plugin_context, team_member, and optional actor.
context: The gRPC servicer context.
Returns:
Empty UserHasLeftTeamResponse.
"""
hook_name = "UserHasLeftTeam"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.UserHasLeftTeamResponse()
handler = self.plugin.get_hook_handler(hook_name)
actor = request.actor if request.HasField("actor") else None
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.team_member,
actor,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"UserHasLeftTeam error: {error}")
return hooks_user_channel_pb2.UserHasLeftTeamResponse()
# =========================================================================
# REACTION HOOKS
# =========================================================================
async def ReactionHasBeenAdded(
self,
request: hooks_message_pb2.ReactionHasBeenAddedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.ReactionHasBeenAddedResponse:
"""
Handle ReactionHasBeenAdded notification.
Fire-and-forget notification when a reaction is added.
Args:
request: Contains context, plugin_context, and reaction.
context: The gRPC servicer context.
Returns:
Empty ReactionHasBeenAddedResponse.
"""
hook_name = "ReactionHasBeenAdded"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.ReactionHasBeenAddedResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.reaction,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"ReactionHasBeenAdded error: {error}")
return hooks_message_pb2.ReactionHasBeenAddedResponse()
async def ReactionHasBeenRemoved(
self,
request: hooks_message_pb2.ReactionHasBeenRemovedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.ReactionHasBeenRemovedResponse:
"""
Handle ReactionHasBeenRemoved notification.
Fire-and-forget notification when a reaction is removed.
Args:
request: Contains context, plugin_context, and reaction.
context: The gRPC servicer context.
Returns:
Empty ReactionHasBeenRemovedResponse.
"""
hook_name = "ReactionHasBeenRemoved"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.ReactionHasBeenRemovedResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.reaction,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"ReactionHasBeenRemoved error: {error}")
return hooks_message_pb2.ReactionHasBeenRemovedResponse()
# =========================================================================
# NOTIFICATION HOOKS
# =========================================================================
async def NotificationWillBePushed(
self,
request: hooks_message_pb2.NotificationWillBePushedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.NotificationWillBePushedResponse:
"""
Handle NotificationWillBePushed hook.
Called before a push notification is sent. Handler can:
- Allow unchanged: return (None, "")
- Modify: return (modified_notification, "")
- Reject: return (None, "rejection reason")
Args:
request: Contains context, push_notification, and user_id.
context: The gRPC servicer context.
Returns:
NotificationWillBePushedResponse with modified_notification and/or rejection_reason.
"""
hook_name = "NotificationWillBePushed"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.NotificationWillBePushedResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (push_notification, user_id) -> (notification, rejection)
result, error = await self.runner.invoke(
handler,
request.push_notification,
request.user_id,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"NotificationWillBePushed error: {error}")
return hooks_message_pb2.NotificationWillBePushedResponse(
rejection_reason=f"Plugin error: {error}",
)
# Parse result
modified = None
rejection_reason = ""
if result is None:
pass
elif isinstance(result, tuple) and len(result) == 2:
notif, reason = result
if notif is not None:
modified = notif
if reason:
rejection_reason = str(reason)
elif hasattr(result, "platform"):
# Looks like a PushNotification
modified = result
elif isinstance(result, str):
rejection_reason = result
return hooks_message_pb2.NotificationWillBePushedResponse(
modified_notification=modified,
rejection_reason=rejection_reason,
)
async def EmailNotificationWillBeSent(
self,
request: hooks_message_pb2.EmailNotificationWillBeSentRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.EmailNotificationWillBeSentResponse:
"""
Handle EmailNotificationWillBeSent hook.
Called before an email notification is sent. Handler can:
- Allow unchanged: return (None, "")
- Modify: return (modified_content, "")
- Reject: return (None, "rejection reason")
Args:
request: Contains context and email_notification (as EmailNotificationJson).
context: The gRPC servicer context.
Returns:
EmailNotificationWillBeSentResponse with modified_content and/or rejection_reason.
"""
hook_name = "EmailNotificationWillBeSent"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.EmailNotificationWillBeSentResponse()
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (email_notification) -> (content, rejection)
result, error = await self.runner.invoke(
handler,
request.email_notification,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"EmailNotificationWillBeSent error: {error}")
return hooks_message_pb2.EmailNotificationWillBeSentResponse(
rejection_reason=f"Plugin error: {error}",
)
# Parse result
modified = None
rejection_reason = ""
if result is None:
pass
elif isinstance(result, tuple) and len(result) == 2:
content, reason = result
if content is not None:
modified = content
if reason:
rejection_reason = str(reason)
elif hasattr(result, "subject"):
# Looks like EmailNotificationContent
modified = result
elif isinstance(result, str):
rejection_reason = result
return hooks_message_pb2.EmailNotificationWillBeSentResponse(
modified_content=modified,
rejection_reason=rejection_reason,
)
# =========================================================================
# PREFERENCES HOOKS
# =========================================================================
async def PreferencesHaveChanged(
self,
request: hooks_message_pb2.PreferencesHaveChangedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_message_pb2.PreferencesHaveChangedResponse:
"""
Handle PreferencesHaveChanged notification.
Fire-and-forget notification when user preferences change.
Args:
request: Contains context, plugin_context, and preferences list.
context: The gRPC servicer context.
Returns:
Empty PreferencesHaveChangedResponse.
"""
hook_name = "PreferencesHaveChanged"
if not self.plugin.has_hook(hook_name):
return hooks_message_pb2.PreferencesHaveChangedResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.plugin_context,
list(request.preferences),
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"PreferencesHaveChanged error: {error}")
return hooks_message_pb2.PreferencesHaveChangedResponse()
# =========================================================================
# SYSTEM HOOKS
# =========================================================================
async def OnInstall(
self,
request: hooks_lifecycle_pb2.OnInstallRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.OnInstallResponse:
"""
Handle OnInstall hook.
Called after plugin installation. Return error to indicate failure.
Args:
request: Contains context, plugin_context, and install event.
context: The gRPC servicer context.
Returns:
OnInstallResponse with optional error.
"""
hook_name = "OnInstall"
if not self.plugin.has_hook(hook_name):
return hooks_lifecycle_pb2.OnInstallResponse()
handler = self.plugin.get_hook_handler(hook_name)
result, error = await self.runner.invoke(
handler,
request.plugin_context,
request.event,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnInstall error: {error}")
return hooks_lifecycle_pb2.OnInstallResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.on_install.error",
where="OnInstall",
),
)
# Handler may return an error explicitly
if isinstance(result, Exception) or (isinstance(result, str) and result):
return hooks_lifecycle_pb2.OnInstallResponse(
error=_make_app_error(
message=str(result),
error_id="plugin.on_install.error",
where="OnInstall",
),
)
return hooks_lifecycle_pb2.OnInstallResponse()
async def OnSendDailyTelemetry(
self,
request: hooks_lifecycle_pb2.OnSendDailyTelemetryRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.OnSendDailyTelemetryResponse:
"""
Handle OnSendDailyTelemetry notification.
Fire-and-forget notification when daily telemetry is sent.
Args:
request: Contains context.
context: The gRPC servicer context.
Returns:
Empty OnSendDailyTelemetryResponse.
"""
hook_name = "OnSendDailyTelemetry"
if not self.plugin.has_hook(hook_name):
return hooks_lifecycle_pb2.OnSendDailyTelemetryResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnSendDailyTelemetry error: {error}")
return hooks_lifecycle_pb2.OnSendDailyTelemetryResponse()
async def RunDataRetention(
self,
request: hooks_lifecycle_pb2.RunDataRetentionRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.RunDataRetentionResponse:
"""
Handle RunDataRetention hook.
Called during data retention job. Handler should delete old data
and return the count of deleted items.
Args:
request: Contains context, now_time, and batch_size.
context: The gRPC servicer context.
Returns:
RunDataRetentionResponse with deleted_count and optional error.
"""
hook_name = "RunDataRetention"
if not self.plugin.has_hook(hook_name):
return hooks_lifecycle_pb2.RunDataRetentionResponse(deleted_count=0)
handler = self.plugin.get_hook_handler(hook_name)
# Handler signature: (now_time, batch_size) -> (deleted_count, error)
result, error = await self.runner.invoke(
handler,
request.now_time,
request.batch_size,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"RunDataRetention error: {error}")
return hooks_lifecycle_pb2.RunDataRetentionResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.run_data_retention.error",
where="RunDataRetention",
),
deleted_count=0,
)
# Parse result
deleted_count = 0
if result is None:
pass
elif isinstance(result, tuple) and len(result) == 2:
count, err = result
if count is not None:
deleted_count = int(count)
if err:
return hooks_lifecycle_pb2.RunDataRetentionResponse(
error=_make_app_error(
message=str(err),
error_id="plugin.run_data_retention.error",
where="RunDataRetention",
),
deleted_count=deleted_count,
)
elif isinstance(result, int):
deleted_count = result
return hooks_lifecycle_pb2.RunDataRetentionResponse(deleted_count=deleted_count)
async def OnCloudLimitsUpdated(
self,
request: hooks_lifecycle_pb2.OnCloudLimitsUpdatedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_lifecycle_pb2.OnCloudLimitsUpdatedResponse:
"""
Handle OnCloudLimitsUpdated notification.
Fire-and-forget notification when cloud limits change.
Args:
request: Contains context and limits.
context: The gRPC servicer context.
Returns:
Empty OnCloudLimitsUpdatedResponse.
"""
hook_name = "OnCloudLimitsUpdated"
if not self.plugin.has_hook(hook_name):
return hooks_lifecycle_pb2.OnCloudLimitsUpdatedResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.limits,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnCloudLimitsUpdated error: {error}")
return hooks_lifecycle_pb2.OnCloudLimitsUpdatedResponse()
# =========================================================================
# WEBSOCKET HOOKS
# =========================================================================
async def OnWebSocketConnect(
self,
request: hooks_command_pb2.OnWebSocketConnectRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.OnWebSocketConnectResponse:
"""
Handle OnWebSocketConnect notification.
Fire-and-forget notification when a WebSocket connects.
Args:
request: Contains context, web_conn_id, and user_id.
context: The gRPC servicer context.
Returns:
Empty OnWebSocketConnectResponse.
"""
hook_name = "OnWebSocketConnect"
if not self.plugin.has_hook(hook_name):
return hooks_command_pb2.OnWebSocketConnectResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.web_conn_id,
request.user_id,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnWebSocketConnect error: {error}")
return hooks_command_pb2.OnWebSocketConnectResponse()
async def OnWebSocketDisconnect(
self,
request: hooks_command_pb2.OnWebSocketDisconnectRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.OnWebSocketDisconnectResponse:
"""
Handle OnWebSocketDisconnect notification.
Fire-and-forget notification when a WebSocket disconnects.
Args:
request: Contains context, web_conn_id, and user_id.
context: The gRPC servicer context.
Returns:
Empty OnWebSocketDisconnectResponse.
"""
hook_name = "OnWebSocketDisconnect"
if not self.plugin.has_hook(hook_name):
return hooks_command_pb2.OnWebSocketDisconnectResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.web_conn_id,
request.user_id,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnWebSocketDisconnect error: {error}")
return hooks_command_pb2.OnWebSocketDisconnectResponse()
async def WebSocketMessageHasBeenPosted(
self,
request: hooks_command_pb2.WebSocketMessageHasBeenPostedRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.WebSocketMessageHasBeenPostedResponse:
"""
Handle WebSocketMessageHasBeenPosted notification.
Fire-and-forget notification when a WebSocket message is received.
Args:
request: Contains context, web_conn_id, user_id, and request.
context: The gRPC servicer context.
Returns:
Empty WebSocketMessageHasBeenPostedResponse.
"""
hook_name = "WebSocketMessageHasBeenPosted"
if not self.plugin.has_hook(hook_name):
return hooks_command_pb2.WebSocketMessageHasBeenPostedResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.web_conn_id,
request.user_id,
request.request,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"WebSocketMessageHasBeenPosted error: {error}")
return hooks_command_pb2.WebSocketMessageHasBeenPostedResponse()
# =========================================================================
# CLUSTER HOOKS
# =========================================================================
async def OnPluginClusterEvent(
self,
request: hooks_command_pb2.OnPluginClusterEventRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.OnPluginClusterEventResponse:
"""
Handle OnPluginClusterEvent notification.
Fire-and-forget notification for intra-cluster plugin events.
Args:
request: Contains context, plugin_context, and event.
context: The gRPC servicer context.
Returns:
Empty OnPluginClusterEventResponse.
"""
hook_name = "OnPluginClusterEvent"
if not self.plugin.has_hook(hook_name):
return hooks_command_pb2.OnPluginClusterEventResponse()
handler = self.plugin.get_hook_handler(hook_name)
_, error = await self.runner.invoke(
handler,
request.plugin_context,
request.event,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnPluginClusterEvent error: {error}")
return hooks_command_pb2.OnPluginClusterEventResponse()
# =========================================================================
# SHARED CHANNELS HOOKS
# =========================================================================
async def OnSharedChannelsSyncMsg(
self,
request: hooks_command_pb2.OnSharedChannelsSyncMsgRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.OnSharedChannelsSyncMsgResponse:
"""
Handle OnSharedChannelsSyncMsg hook.
Called when a shared channels sync message is received.
Args:
request: Contains context, sync_msg, and remote_cluster.
context: The gRPC servicer context.
Returns:
OnSharedChannelsSyncMsgResponse with response or error.
"""
hook_name = "OnSharedChannelsSyncMsg"
if not self.plugin.has_hook(hook_name):
# Not implemented - return empty response (no sync)
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse()
handler = self.plugin.get_hook_handler(hook_name)
result, error = await self.runner.invoke(
handler,
request.sync_msg,
request.remote_cluster,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnSharedChannelsSyncMsg error: {error}")
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.on_shared_channels_sync_msg.error",
where="OnSharedChannelsSyncMsg",
),
)
# Parse result
if result is None:
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse()
elif isinstance(result, tuple) and len(result) == 2:
response, err = result
if err:
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse(
error=_make_app_error(
message=str(err),
error_id="plugin.on_shared_channels_sync_msg.error",
where="OnSharedChannelsSyncMsg",
),
)
if response is not None:
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse(
response=response
)
elif hasattr(result, "users_last_update_at"):
# Looks like a SyncResponse
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse(response=result)
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse()
async def OnSharedChannelsPing(
self,
request: hooks_command_pb2.OnSharedChannelsPingRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.OnSharedChannelsPingResponse:
"""
Handle OnSharedChannelsPing hook.
Called to check health of shared channels plugin connection.
Return True if healthy, False otherwise.
Args:
request: Contains context and remote_cluster.
context: The gRPC servicer context.
Returns:
OnSharedChannelsPingResponse with healthy status.
"""
hook_name = "OnSharedChannelsPing"
if not self.plugin.has_hook(hook_name):
# Not implemented - assume healthy
return hooks_command_pb2.OnSharedChannelsPingResponse(healthy=True)
handler = self.plugin.get_hook_handler(hook_name)
result, error = await self.runner.invoke(
handler,
request.remote_cluster,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnSharedChannelsPing error: {error}")
return hooks_command_pb2.OnSharedChannelsPingResponse(healthy=False)
# Result should be a boolean
healthy = bool(result) if result is not None else True
return hooks_command_pb2.OnSharedChannelsPingResponse(healthy=healthy)
async def OnSharedChannelsAttachmentSyncMsg(
self,
request: hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse:
"""
Handle OnSharedChannelsAttachmentSyncMsg hook.
Called when a file attachment sync message is received.
Args:
request: Contains context, file_info, post, and remote_cluster.
context: The gRPC servicer context.
Returns:
OnSharedChannelsAttachmentSyncMsgResponse with optional error.
"""
hook_name = "OnSharedChannelsAttachmentSyncMsg"
if not self.plugin.has_hook(hook_name):
return hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse()
handler = self.plugin.get_hook_handler(hook_name)
result, error = await self.runner.invoke(
handler,
request.file_info,
request.post,
request.remote_cluster,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnSharedChannelsAttachmentSyncMsg error: {error}")
return hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.on_shared_channels_attachment_sync_msg.error",
where="OnSharedChannelsAttachmentSyncMsg",
),
)
# Handler may return an error
if isinstance(result, Exception) or (isinstance(result, str) and result):
return hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse(
error=_make_app_error(
message=str(result),
error_id="plugin.on_shared_channels_attachment_sync_msg.error",
where="OnSharedChannelsAttachmentSyncMsg",
),
)
return hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse()
async def OnSharedChannelsProfileImageSyncMsg(
self,
request: hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse:
"""
Handle OnSharedChannelsProfileImageSyncMsg hook.
Called when a profile image sync message is received.
Args:
request: Contains context, user, and remote_cluster.
context: The gRPC servicer context.
Returns:
OnSharedChannelsProfileImageSyncMsgResponse with optional error.
"""
hook_name = "OnSharedChannelsProfileImageSyncMsg"
if not self.plugin.has_hook(hook_name):
return hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse()
handler = self.plugin.get_hook_handler(hook_name)
result, error = await self.runner.invoke(
handler,
request.user,
request.remote_cluster,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnSharedChannelsProfileImageSyncMsg error: {error}")
return hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.on_shared_channels_profile_image_sync_msg.error",
where="OnSharedChannelsProfileImageSyncMsg",
),
)
# Handler may return an error
if isinstance(result, Exception) or (isinstance(result, str) and result):
return hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse(
error=_make_app_error(
message=str(result),
error_id="plugin.on_shared_channels_profile_image_sync_msg.error",
where="OnSharedChannelsProfileImageSyncMsg",
),
)
return hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse()
# =========================================================================
# SUPPORT HOOKS
# =========================================================================
async def GenerateSupportData(
self,
request: hooks_command_pb2.GenerateSupportDataRequest,
context: grpc.aio.ServicerContext,
) -> hooks_command_pb2.GenerateSupportDataResponse:
"""
Handle GenerateSupportData hook.
Called when generating a support packet. Handler should return
a list of FileData objects to include in the packet.
Args:
request: Contains context and plugin_context.
context: The gRPC servicer context.
Returns:
GenerateSupportDataResponse with files list and optional error.
"""
hook_name = "GenerateSupportData"
if not self.plugin.has_hook(hook_name):
return hooks_command_pb2.GenerateSupportDataResponse(files=[])
handler = self.plugin.get_hook_handler(hook_name)
result, error = await self.runner.invoke(
handler,
request.plugin_context,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"GenerateSupportData error: {error}")
return hooks_command_pb2.GenerateSupportDataResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.generate_support_data.error",
where="GenerateSupportData",
),
files=[],
)
# Parse result
files = []
if result is None:
pass
elif isinstance(result, tuple) and len(result) == 2:
file_list, err = result
if err:
return hooks_command_pb2.GenerateSupportDataResponse(
error=_make_app_error(
message=str(err),
error_id="plugin.generate_support_data.error",
where="GenerateSupportData",
),
files=[],
)
if file_list is not None:
files = list(file_list)
elif isinstance(result, (list, tuple)):
files = list(result)
return hooks_command_pb2.GenerateSupportDataResponse(files=files)
# =========================================================================
# SAML HOOKS
# =========================================================================
async def OnSAMLLogin(
self,
request: hooks_user_channel_pb2.OnSAMLLoginRequest,
context: grpc.aio.ServicerContext,
) -> hooks_user_channel_pb2.OnSAMLLoginResponse:
"""
Handle OnSAMLLogin hook.
Called after a successful SAML login. Return error to reject.
Args:
request: Contains context, plugin_context, user, and assertion.
context: The gRPC servicer context.
Returns:
OnSAMLLoginResponse with optional error.
"""
hook_name = "OnSAMLLogin"
if not self.plugin.has_hook(hook_name):
return hooks_user_channel_pb2.OnSAMLLoginResponse()
handler = self.plugin.get_hook_handler(hook_name)
result, error = await self.runner.invoke(
handler,
request.plugin_context,
request.user,
request.assertion,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"OnSAMLLogin error: {error}")
return hooks_user_channel_pb2.OnSAMLLoginResponse(
error=_make_app_error(
message=str(error),
error_id="plugin.on_saml_login.error",
where="OnSAMLLogin",
),
)
# Handler may return an error
if isinstance(result, Exception) or (isinstance(result, str) and result):
return hooks_user_channel_pb2.OnSAMLLoginResponse(
error=_make_app_error(
message=str(result),
error_id="plugin.on_saml_login.error",
where="OnSAMLLogin",
),
)
return hooks_user_channel_pb2.OnSAMLLoginResponse()
# =========================================================================
# HTTP STREAMING HOOKS (Phase 8)
# =========================================================================
async def ServeHTTP(
self,
request_iterator, # AsyncIterator[hooks_http_pb2.ServeHTTPRequest]
context: grpc.aio.ServicerContext,
):
"""
Handle ServeHTTP streaming hook.
This is a bidirectional streaming RPC:
- Go server streams HTTP request body chunks to Python
- Python plugin streams HTTP response body chunks back to Go
Request flow:
- First message contains ServeHTTPRequestInit with headers/metadata
- Subsequent messages contain body_chunk data
- body_complete=True signals end of request body
Response flow:
- First message contains ServeHTTPResponseInit with status/headers
- Subsequent messages contain body_chunk data
- body_complete=True signals end of response body
The handler receives an HTTPRequest-like object and must return
an HTTPResponse-like object or yield response chunks.
Args:
request_iterator: Async iterator of ServeHTTPRequest messages.
context: The gRPC servicer context.
Yields:
ServeHTTPResponse messages with init, body chunks, and completion flag.
"""
hook_name = "ServeHTTP"
# Read the first message to get request metadata
try:
first_msg = await request_iterator.__anext__()
except StopAsyncIteration:
self._logger.error("ServeHTTP: empty request stream")
yield hooks_http_pb2.ServeHTTPResponse(
init=hooks_http_pb2.ServeHTTPResponseInit(
status_code=500,
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
),
body_chunk=b"Empty request stream",
body_complete=True,
)
return
# Extract request init (metadata)
init = first_msg.init
if init is None:
self._logger.error("ServeHTTP: first message missing init")
yield hooks_http_pb2.ServeHTTPResponse(
init=hooks_http_pb2.ServeHTTPResponseInit(
status_code=500,
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
),
body_chunk=b"First message missing init",
body_complete=True,
)
return
# Build an HTTP request wrapper for the handler
http_request = HTTPRequest(
method=init.method,
url=init.url,
proto=init.proto,
proto_major=init.proto_major,
proto_minor=init.proto_minor,
headers=_convert_headers_to_dict(init.headers),
host=init.host,
remote_addr=init.remote_addr,
request_uri=init.request_uri,
content_length=init.content_length,
plugin_context=init.plugin_context,
)
# Collect request body from stream
# For this initial implementation, we buffer the entire body
# TODO(08-02): Stream body chunks to handler for true streaming
body_chunks = []
if first_msg.body_chunk:
body_chunks.append(first_msg.body_chunk)
if not first_msg.body_complete:
async for msg in request_iterator:
if msg.body_chunk:
body_chunks.append(msg.body_chunk)
if msg.body_complete:
break
# Check for cancellation
if context.cancelled():
self._logger.debug("ServeHTTP: request cancelled by client")
return
http_request.body = b"".join(body_chunks)
# Check if handler is implemented
if not self.plugin.has_hook(hook_name):
# Hook not implemented - return 404 Not Found
yield hooks_http_pb2.ServeHTTPResponse(
init=hooks_http_pb2.ServeHTTPResponseInit(
status_code=404,
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
),
body_chunk=b"Not Found",
body_complete=True,
)
return
# Invoke handler
handler = self.plugin.get_hook_handler(hook_name)
# Create response writer
response_writer = HTTPResponseWriter()
try:
# Handler signature: (context, response_writer, request) -> None
# Similar to Go's http.Handler pattern
result, error = await self.runner.invoke(
handler,
init.plugin_context,
response_writer,
http_request,
hook_name=hook_name,
context=context,
)
if error is not None:
self._logger.error(f"ServeHTTP handler error: {error}")
yield hooks_http_pb2.ServeHTTPResponse(
init=hooks_http_pb2.ServeHTTPResponseInit(
status_code=500,
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
),
body_chunk=f"Internal Server Error: {error}".encode("utf-8"),
body_complete=True,
)
return
except Exception as e:
self._logger.exception(f"ServeHTTP exception: {e}")
yield hooks_http_pb2.ServeHTTPResponse(
init=hooks_http_pb2.ServeHTTPResponseInit(
status_code=500,
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
),
body_chunk=f"Internal Server Error".encode("utf-8"),
body_complete=True,
)
return
# Send response from response_writer
# First message: init with status and headers
response_headers = _convert_dict_to_headers(response_writer.headers)
status_code = response_writer.status_code or 200
# Get pending writes with flush markers
pending_writes = response_writer.get_pending_writes()
if not pending_writes:
# No body written - send just init with completion flag
yield hooks_http_pb2.ServeHTTPResponse(
init=hooks_http_pb2.ServeHTTPResponseInit(
status_code=status_code,
headers=response_headers,
),
body_complete=True,
)
return
# Stream response chunks with flush support
for i, (chunk, flush) in enumerate(pending_writes):
is_last = (i == len(pending_writes) - 1)
if i == 0:
# First message includes init
yield hooks_http_pb2.ServeHTTPResponse(
init=hooks_http_pb2.ServeHTTPResponseInit(
status_code=status_code,
headers=response_headers,
),
body_chunk=chunk,
body_complete=is_last,
flush=flush,
)
else:
# Subsequent messages are body chunks only
yield hooks_http_pb2.ServeHTTPResponse(
body_chunk=chunk,
body_complete=is_last,
flush=flush,
)
# Check for cancellation between chunks
if not is_last and context.cancelled():
self._logger.debug("ServeHTTP: response stream cancelled by client")
return
# =========================================================================
# DEFERRED HOOKS (Phase 8 - remaining)
# =========================================================================
#
# The following hooks are deferred to Phase 8-02 or later:
#
# - ServeMetrics: HTTP request/response streaming over gRPC (same pattern as ServeHTTP)
# - FileWillBeUploaded: Large file body streaming (may have size limits)
#
# These are NOT included in Implemented() until their respective phase adds support.
# =========================================================================
# =============================================================================
# HTTP Request/Response Helper Classes
# =============================================================================
def _convert_headers_to_dict(headers: list) -> dict:
"""Convert list of HTTPHeader protos to a dict of header name -> list of values."""
result = {}
for h in headers:
key = h.key
if key not in result:
result[key] = []
result[key].extend(h.values)
return result
def _convert_dict_to_headers(headers: dict) -> list:
"""Convert dict of headers to list of HTTPHeader protos."""
result = []
for key, values in headers.items():
if isinstance(values, str):
values = [values]
result.append(hooks_http_pb2.HTTPHeader(key=key, values=list(values)))
return result
class HTTPRequest:
"""
HTTP request wrapper for ServeHTTP handlers.
Provides a Pythonic interface to the streamed HTTP request data.
"""
def __init__(
self,
method: str,
url: str,
proto: str,
proto_major: int,
proto_minor: int,
headers: dict,
host: str,
remote_addr: str,
request_uri: str,
content_length: int,
plugin_context=None,
):
self.method = method
self.url = url
self.proto = proto
self.proto_major = proto_major
self.proto_minor = proto_minor
self.headers = headers
self.host = host
self.remote_addr = remote_addr
self.request_uri = request_uri
self.content_length = content_length
self.plugin_context = plugin_context
self.body: bytes = b""
def get_header(self, name: str, default: str = "") -> str:
"""Get a header value by name (case-insensitive)."""
# HTTP headers are case-insensitive
for key, values in self.headers.items():
if key.lower() == name.lower():
return values[0] if values else default
return default
def get_all_headers(self, name: str) -> list:
"""Get all values for a header (case-insensitive)."""
for key, values in self.headers.items():
if key.lower() == name.lower():
return list(values)
return []
class HTTPResponseWriter:
"""
HTTP response writer for ServeHTTP handlers.
Provides a Pythonic interface similar to Go's http.ResponseWriter.
Supports both buffered and streaming response modes.
STREAMING MODE:
---------------
For streaming responses, handlers can use the write() method to send
chunks incrementally, and flush() to request immediate delivery.
The servicer will send each chunk as a separate gRPC message.
HEADER LOCKING:
---------------
Headers are immutable after the first write() or write_header() call.
This matches Go's http.ResponseWriter behavior.
FLUSH SEMANTICS:
----------------
Flush is best-effort. The Go side will call http.Flusher.Flush() if
the underlying ResponseWriter supports it; otherwise silently ignored.
"""
# Recommended maximum chunk size (64KB, matching Go's DefaultChunkSize)
MAX_CHUNK_SIZE = 64 * 1024
def __init__(self):
self.headers: dict = {}
self.status_code: Optional[int] = None
self._body_chunks: list = []
self._headers_written: bool = False
self._pending_flush: bool = False
# Pending chunks for streaming mode: list of (chunk, flush_after) tuples
self._pending_writes: list = []
def set_header(self, name: str, value: str) -> None:
"""
Set a response header (replaces existing value).
Must be called before write() or write_header().
Args:
name: Header name (e.g., "Content-Type")
value: Header value
"""
if self._headers_written:
logger.warning(f"Cannot set header '{name}' after headers have been written")
return
self.headers[name] = [value]
def add_header(self, name: str, value: str) -> None:
"""
Add a value to a response header (for multi-value headers like Set-Cookie).
Must be called before write() or write_header().
Args:
name: Header name
value: Header value to add
"""
if self._headers_written:
logger.warning(f"Cannot add header '{name}' after headers have been written")
return
if name not in self.headers:
self.headers[name] = []
self.headers[name].append(value)
def write_header(self, status_code: int) -> None:
"""
Write the HTTP status code. Must be called before write().
If not called explicitly, write() will implicitly call write_header(200).
Args:
status_code: HTTP status code (100-999)
"""
if self._headers_written:
logger.warning("write_header called after headers already written")
return
self.status_code = status_code
self._headers_written = True
def write(self, data: bytes) -> int:
"""
Write response body data.
Implicitly calls write_header(200) if not already called.
For large responses, consider chunking the data to avoid memory issues.
Each write() call will be sent as a separate gRPC message in streaming mode.
Args:
data: Bytes to write (or str, which will be UTF-8 encoded)
Returns:
Number of bytes written
"""
if not self._headers_written:
self.write_header(200)
if isinstance(data, str):
data = data.encode("utf-8")
self._body_chunks.append(data)
# Record pending write with current flush state
self._pending_writes.append((data, self._pending_flush))
self._pending_flush = False
return len(data)
def flush(self) -> None:
"""
Request an immediate flush of buffered response data to the client.
This is best-effort: the Go side will call http.Flusher.Flush() if
the underlying ResponseWriter supports it; otherwise silently ignored.
This matches Go plugin RPC behavior.
Note: Flush is applied to the next write, or the last write if called
after writing data.
"""
if self._pending_writes:
# Apply flush to the last pending write
data, _ = self._pending_writes[-1]
self._pending_writes[-1] = (data, True)
else:
# Flush will apply to next write
self._pending_flush = True
def get_body(self) -> bytes:
"""Get the full response body (for buffered mode)."""
return b"".join(self._body_chunks)
def get_pending_writes(self) -> list:
"""
Get pending writes for streaming mode.
Returns:
List of (chunk: bytes, flush: bool) tuples
"""
return self._pending_writes
def clear_pending_writes(self) -> None:
"""Clear pending writes after they've been sent."""
self._pending_writes = []