mirror of
https://github.com/mattermost/mattermost.git
synced 2026-02-03 20:40:00 -05:00
- Add _to_command_response_proto helper to convert dict/wrapper/protobuf to CommandResponse protobuf - Plugin developers can now return plain dicts with response_type, text, etc. - Also supports CommandResponse wrapper class and raw protobufs - Export CommandResponse from mattermost_plugin package This fixes the "ExecuteCommand returned unexpected type: <class 'dict'>" error when Python plugin handlers return dict responses.
2606 lines
91 KiB
Python
2606 lines
91 KiB
Python
# Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
# See LICENSE.txt for license information.
|
|
|
|
"""
|
|
gRPC Hook Servicer for Mattermost Python plugins.
|
|
|
|
This module implements the PluginHooksServicer gRPC service, handling hook
|
|
invocations from the Mattermost server.
|
|
|
|
ERROR HANDLING CONVENTION:
|
|
--------------------------
|
|
1. Plugin bugs (exceptions in handlers): Return gRPC INTERNAL status.
|
|
The Go side should log and fail-open where appropriate.
|
|
|
|
2. Expected business rejections (e.g., post rejected by MessageWillBePosted):
|
|
Encoded via response fields (rejection_reason), NOT gRPC errors.
|
|
|
|
3. OnActivate errors: Encoded via response.error field (AppError).
|
|
This allows the Go side to distinguish activation failure from transport failure.
|
|
|
|
HOOK HANDLER INVOCATION:
|
|
------------------------
|
|
- All hook handlers are invoked via the HookRunner with timeout support.
|
|
- Both sync and async handlers are supported.
|
|
- Handler exceptions are caught and converted to appropriate gRPC responses.
|
|
|
|
TYPE STRATEGY:
|
|
--------------
|
|
For this initial implementation, hook handlers receive raw protobuf messages.
|
|
Future versions may add Python wrapper types for better ergonomics.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import logging
|
|
from typing import TYPE_CHECKING, Optional
|
|
|
|
import grpc
|
|
|
|
from mattermost_plugin.grpc import hooks_pb2_grpc
|
|
from mattermost_plugin.grpc import hooks_lifecycle_pb2
|
|
from mattermost_plugin.grpc import hooks_message_pb2
|
|
from mattermost_plugin.grpc import hooks_user_channel_pb2
|
|
from mattermost_plugin.grpc import hooks_command_pb2
|
|
from mattermost_plugin.grpc import hooks_http_pb2
|
|
from mattermost_plugin.grpc import common_pb2
|
|
from mattermost_plugin.grpc import api_remaining_pb2
|
|
from mattermost_plugin._internal.hook_runner import HookRunner, DEFAULT_HOOK_TIMEOUT
|
|
from mattermost_plugin._internal.wrappers import CommandResponse as CommandResponseWrapper
|
|
|
|
if TYPE_CHECKING:
|
|
from mattermost_plugin.plugin import Plugin
|
|
|
|
logger = logging.getLogger(__name__)
|
|
|
|
# DismissPostError is the special string that tells the server to silently
|
|
# dismiss the post (no error shown to user). Must match the Go constant.
|
|
DISMISS_POST_ERROR = "plugin.message_will_be_posted.dismiss_post"
|
|
|
|
|
|
def _make_app_error(
|
|
message: str,
|
|
error_id: str = "plugin.error",
|
|
detailed_error: str = "",
|
|
status_code: int = 500,
|
|
where: str = "",
|
|
) -> common_pb2.AppError:
|
|
"""Create an AppError protobuf message."""
|
|
return common_pb2.AppError(
|
|
id=error_id,
|
|
message=message,
|
|
detailed_error=detailed_error,
|
|
status_code=status_code,
|
|
where=where,
|
|
)
|
|
|
|
|
|
def _to_command_response_proto(
|
|
result: object,
|
|
logger: logging.Logger,
|
|
) -> Optional[api_remaining_pb2.CommandResponse]:
|
|
"""
|
|
Convert a handler result to a CommandResponse protobuf.
|
|
|
|
Supports:
|
|
- dict with command response fields (response_type, text, etc.)
|
|
- CommandResponseWrapper (the SDK wrapper class)
|
|
- api_remaining_pb2.CommandResponse (protobuf, passed through)
|
|
|
|
Args:
|
|
result: The return value from the ExecuteCommand handler.
|
|
logger: Logger for warnings/errors.
|
|
|
|
Returns:
|
|
A CommandResponse protobuf, or None if conversion failed.
|
|
"""
|
|
if result is None:
|
|
return None
|
|
|
|
# Handle dict responses (most common from plugin developers)
|
|
if isinstance(result, dict):
|
|
try:
|
|
return api_remaining_pb2.CommandResponse(
|
|
response_type=result.get("response_type", ""),
|
|
text=result.get("text", ""),
|
|
username=result.get("username", ""),
|
|
channel_id=result.get("channel_id", ""),
|
|
icon_url=result.get("icon_url", ""),
|
|
type=result.get("type", ""),
|
|
goto_location=result.get("goto_location", ""),
|
|
trigger_id=result.get("trigger_id", ""),
|
|
skip_slack_parsing=result.get("skip_slack_parsing", False),
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to convert dict to CommandResponse: {e}")
|
|
return None
|
|
|
|
# Handle SDK wrapper CommandResponse class
|
|
if isinstance(result, CommandResponseWrapper):
|
|
try:
|
|
return api_remaining_pb2.CommandResponse(
|
|
response_type=result.response_type,
|
|
text=result.text,
|
|
username=result.username,
|
|
channel_id=result.channel_id,
|
|
icon_url=result.icon_url,
|
|
type=result.type,
|
|
goto_location=result.goto_location,
|
|
trigger_id=result.trigger_id,
|
|
skip_slack_parsing=result.skip_slack_parsing,
|
|
)
|
|
except Exception as e:
|
|
logger.warning(f"Failed to convert CommandResponse wrapper: {e}")
|
|
return None
|
|
|
|
# Handle protobuf CommandResponse directly
|
|
if isinstance(result, api_remaining_pb2.CommandResponse):
|
|
return result
|
|
|
|
# Unknown type
|
|
logger.warning(f"ExecuteCommand returned unexpected type: {type(result)}")
|
|
return None
|
|
|
|
|
|
class PluginHooksServicerImpl(hooks_pb2_grpc.PluginHooksServicer):
|
|
"""
|
|
gRPC servicer implementation for plugin hooks.
|
|
|
|
This class receives hook invocations from the Mattermost server and
|
|
dispatches them to the appropriate plugin handler methods.
|
|
|
|
Attributes:
|
|
plugin: The Plugin instance that implements hook handlers.
|
|
runner: HookRunner instance for invoking handlers with timeout.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
plugin: "Plugin",
|
|
timeout: float = DEFAULT_HOOK_TIMEOUT,
|
|
) -> None:
|
|
"""
|
|
Initialize the hook servicer.
|
|
|
|
Args:
|
|
plugin: The Plugin instance to dispatch hooks to.
|
|
timeout: Default timeout for hook invocations in seconds.
|
|
"""
|
|
self.plugin = plugin
|
|
self.runner = HookRunner(timeout=timeout)
|
|
self._logger = logging.getLogger(
|
|
f"{__name__}.{plugin.__class__.__name__}"
|
|
)
|
|
|
|
# =========================================================================
|
|
# IMPLEMENTED RPC
|
|
# =========================================================================
|
|
|
|
async def Implemented(
|
|
self,
|
|
request: hooks_lifecycle_pb2.ImplementedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.ImplementedResponse:
|
|
"""
|
|
Return the list of hooks implemented by this plugin.
|
|
|
|
This method queries the plugin's hook registry to determine which
|
|
hooks have registered handlers.
|
|
|
|
Args:
|
|
request: The ImplementedRequest (contains context info).
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
ImplementedResponse containing list of canonical hook names.
|
|
"""
|
|
hooks = self.plugin.implemented_hooks()
|
|
self._logger.info(f"[DEBUG] Implemented() called, returning hooks: {hooks}")
|
|
print(f"[DEBUG] Implemented() called, returning hooks: {hooks}", flush=True)
|
|
return hooks_lifecycle_pb2.ImplementedResponse(hooks=hooks)
|
|
|
|
# =========================================================================
|
|
# LIFECYCLE HOOKS
|
|
# =========================================================================
|
|
|
|
async def OnActivate(
|
|
self,
|
|
request: hooks_lifecycle_pb2.OnActivateRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.OnActivateResponse:
|
|
"""
|
|
Handle plugin activation.
|
|
|
|
If the handler returns an error or raises an exception, the plugin
|
|
activation will fail. The error is encoded in the response.error field.
|
|
|
|
Args:
|
|
request: The OnActivateRequest.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnActivateResponse, with error field set on failure.
|
|
"""
|
|
hook_name = "OnActivate"
|
|
|
|
self._logger.info(f"[DEBUG] OnActivate gRPC received")
|
|
print(f"[DEBUG] OnActivate gRPC received", flush=True)
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Hook not implemented - activation succeeds by default
|
|
self._logger.info(f"[DEBUG] OnActivate hook not registered, returning success")
|
|
print(f"[DEBUG] OnActivate hook not registered, returning success", flush=True)
|
|
return hooks_lifecycle_pb2.OnActivateResponse()
|
|
|
|
self._logger.info(f"[DEBUG] OnActivate hook found, invoking handler")
|
|
print(f"[DEBUG] OnActivate hook found, invoking handler", flush=True)
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
# Don't pass gRPC context - we encode errors in response.error field
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
hook_name=hook_name,
|
|
)
|
|
|
|
self._logger.info(f"[DEBUG] OnActivate handler returned, result={result}, error={error}")
|
|
print(f"[DEBUG] OnActivate handler returned, result={result}, error={error}", flush=True)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnActivate failed: {error}")
|
|
return hooks_lifecycle_pb2.OnActivateResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.on_activate.error",
|
|
where="OnActivate",
|
|
),
|
|
)
|
|
|
|
# Handler may return an error explicitly
|
|
if isinstance(result, Exception) or isinstance(result, str) and result:
|
|
error_msg = str(result)
|
|
self._logger.error(f"OnActivate returned error: {error_msg}")
|
|
return hooks_lifecycle_pb2.OnActivateResponse(
|
|
error=_make_app_error(
|
|
message=error_msg,
|
|
error_id="plugin.on_activate.error",
|
|
where="OnActivate",
|
|
),
|
|
)
|
|
|
|
return hooks_lifecycle_pb2.OnActivateResponse()
|
|
|
|
async def OnDeactivate(
|
|
self,
|
|
request: hooks_lifecycle_pb2.OnDeactivateRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.OnDeactivateResponse:
|
|
"""
|
|
Handle plugin deactivation.
|
|
|
|
This is best-effort - errors are logged but don't prevent shutdown.
|
|
|
|
Args:
|
|
request: The OnDeactivateRequest.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnDeactivateResponse, with error field set on failure (informational).
|
|
"""
|
|
hook_name = "OnDeactivate"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_lifecycle_pb2.OnDeactivateResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
# Don't pass gRPC context - we encode errors in response.error field
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
hook_name=hook_name,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.warning(f"OnDeactivate error (continuing): {error}")
|
|
return hooks_lifecycle_pb2.OnDeactivateResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.on_deactivate.error",
|
|
where="OnDeactivate",
|
|
),
|
|
)
|
|
|
|
return hooks_lifecycle_pb2.OnDeactivateResponse()
|
|
|
|
async def OnConfigurationChange(
|
|
self,
|
|
request: hooks_lifecycle_pb2.OnConfigurationChangeRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.OnConfigurationChangeResponse:
|
|
"""
|
|
Handle configuration change notification.
|
|
|
|
Errors are logged but do not stop the plugin.
|
|
|
|
Args:
|
|
request: The OnConfigurationChangeRequest.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnConfigurationChangeResponse, with error field set on failure.
|
|
"""
|
|
hook_name = "OnConfigurationChange"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_lifecycle_pb2.OnConfigurationChangeResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
# Don't pass gRPC context - we encode errors in response.error field
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
hook_name=hook_name,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.warning(
|
|
f"OnConfigurationChange error (continuing): {error}"
|
|
)
|
|
return hooks_lifecycle_pb2.OnConfigurationChangeResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.on_configuration_change.error",
|
|
where="OnConfigurationChange",
|
|
),
|
|
)
|
|
|
|
return hooks_lifecycle_pb2.OnConfigurationChangeResponse()
|
|
|
|
# =========================================================================
|
|
# MESSAGE HOOKS
|
|
# =========================================================================
|
|
|
|
async def MessageWillBePosted(
|
|
self,
|
|
request: hooks_message_pb2.MessageWillBePostedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.MessageWillBePostedResponse:
|
|
"""
|
|
Handle MessageWillBePosted hook.
|
|
|
|
This hook is called before a post is saved. The handler can:
|
|
- Allow unchanged: return (None, "")
|
|
- Modify: return (modified_post, "")
|
|
- Reject: return (None, "rejection reason")
|
|
- Dismiss silently: return (None, DISMISS_POST_ERROR)
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and post.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
MessageWillBePostedResponse with modified_post and/or rejection_reason.
|
|
"""
|
|
hook_name = "MessageWillBePosted"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Not implemented - allow post unchanged
|
|
return hooks_message_pb2.MessageWillBePostedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Invoke handler with protobuf objects
|
|
# Handler signature: (context, post) -> (post, string) or (post, rejection)
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.post,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
# Handler exception - treat as rejection with error message
|
|
self._logger.error(f"MessageWillBePosted error: {error}")
|
|
return hooks_message_pb2.MessageWillBePostedResponse(
|
|
rejection_reason=f"Plugin error: {error}",
|
|
)
|
|
|
|
# Parse handler result
|
|
modified_post = None
|
|
rejection_reason = ""
|
|
|
|
if result is None:
|
|
# Allow unchanged
|
|
pass
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
post_result, reason = result
|
|
if post_result is not None:
|
|
modified_post = post_result
|
|
if reason:
|
|
rejection_reason = str(reason)
|
|
elif hasattr(result, "id"):
|
|
# Looks like a Post object - treat as modified post
|
|
modified_post = result
|
|
elif isinstance(result, str):
|
|
# String result - treat as rejection reason
|
|
rejection_reason = result
|
|
|
|
return hooks_message_pb2.MessageWillBePostedResponse(
|
|
modified_post=modified_post,
|
|
rejection_reason=rejection_reason,
|
|
)
|
|
|
|
async def MessageWillBeUpdated(
|
|
self,
|
|
request: hooks_message_pb2.MessageWillBeUpdatedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.MessageWillBeUpdatedResponse:
|
|
"""
|
|
Handle MessageWillBeUpdated hook.
|
|
|
|
Similar to MessageWillBePosted but for post updates.
|
|
Handler receives both new_post and old_post.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, new_post, and old_post.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
MessageWillBeUpdatedResponse with modified_post and/or rejection_reason.
|
|
"""
|
|
hook_name = "MessageWillBeUpdated"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.MessageWillBeUpdatedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (context, new_post, old_post) -> (post, string)
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.new_post,
|
|
request.old_post,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"MessageWillBeUpdated error: {error}")
|
|
return hooks_message_pb2.MessageWillBeUpdatedResponse(
|
|
rejection_reason=f"Plugin error: {error}",
|
|
)
|
|
|
|
# Parse handler result (same logic as MessageWillBePosted)
|
|
modified_post = None
|
|
rejection_reason = ""
|
|
|
|
if result is None:
|
|
pass
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
post_result, reason = result
|
|
if post_result is not None:
|
|
modified_post = post_result
|
|
if reason:
|
|
rejection_reason = str(reason)
|
|
elif hasattr(result, "id"):
|
|
modified_post = result
|
|
elif isinstance(result, str):
|
|
rejection_reason = result
|
|
|
|
return hooks_message_pb2.MessageWillBeUpdatedResponse(
|
|
modified_post=modified_post,
|
|
rejection_reason=rejection_reason,
|
|
)
|
|
|
|
async def MessageHasBeenPosted(
|
|
self,
|
|
request: hooks_message_pb2.MessageHasBeenPostedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.MessageHasBeenPostedResponse:
|
|
"""
|
|
Handle MessageHasBeenPosted notification.
|
|
|
|
This is a fire-and-forget notification. Errors are logged but
|
|
the response is always empty.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and post.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty MessageHasBeenPostedResponse.
|
|
"""
|
|
hook_name = "MessageHasBeenPosted"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.MessageHasBeenPostedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (context, post) -> None
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.post,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"MessageHasBeenPosted error: {error}")
|
|
# Notification hook - still return success
|
|
|
|
return hooks_message_pb2.MessageHasBeenPostedResponse()
|
|
|
|
async def MessageHasBeenUpdated(
|
|
self,
|
|
request: hooks_message_pb2.MessageHasBeenUpdatedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.MessageHasBeenUpdatedResponse:
|
|
"""
|
|
Handle MessageHasBeenUpdated notification.
|
|
|
|
Fire-and-forget notification for post updates.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, new_post, and old_post.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty MessageHasBeenUpdatedResponse.
|
|
"""
|
|
hook_name = "MessageHasBeenUpdated"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.MessageHasBeenUpdatedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (context, new_post, old_post) -> None
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.new_post,
|
|
request.old_post,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"MessageHasBeenUpdated error: {error}")
|
|
|
|
return hooks_message_pb2.MessageHasBeenUpdatedResponse()
|
|
|
|
async def MessageHasBeenDeleted(
|
|
self,
|
|
request: hooks_message_pb2.MessageHasBeenDeletedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.MessageHasBeenDeletedResponse:
|
|
"""
|
|
Handle MessageHasBeenDeleted notification.
|
|
|
|
Fire-and-forget notification for post deletions.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and post.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty MessageHasBeenDeletedResponse.
|
|
"""
|
|
hook_name = "MessageHasBeenDeleted"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.MessageHasBeenDeletedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (context, post) -> None
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.post,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"MessageHasBeenDeleted error: {error}")
|
|
|
|
return hooks_message_pb2.MessageHasBeenDeletedResponse()
|
|
|
|
async def MessagesWillBeConsumed(
|
|
self,
|
|
request: hooks_message_pb2.MessagesWillBeConsumedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.MessagesWillBeConsumedResponse:
|
|
"""
|
|
Handle MessagesWillBeConsumed hook.
|
|
|
|
Called when posts are being delivered to a client. The handler can
|
|
modify the list of posts (e.g., filter, redact).
|
|
|
|
Args:
|
|
request: Contains context and list of posts.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
MessagesWillBeConsumedResponse with (possibly modified) posts.
|
|
"""
|
|
hook_name = "MessagesWillBeConsumed"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Return original posts unchanged
|
|
return hooks_message_pb2.MessagesWillBeConsumedResponse(
|
|
posts=list(request.posts),
|
|
)
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (posts) -> posts
|
|
# Note: No Context parameter for this hook
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
list(request.posts),
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"MessagesWillBeConsumed error: {error}")
|
|
# On error, return original posts
|
|
return hooks_message_pb2.MessagesWillBeConsumedResponse(
|
|
posts=list(request.posts),
|
|
)
|
|
|
|
# Handler should return list of posts
|
|
if result is None:
|
|
posts = list(request.posts)
|
|
elif isinstance(result, (list, tuple)):
|
|
posts = list(result)
|
|
else:
|
|
self._logger.warning(
|
|
f"MessagesWillBeConsumed returned unexpected type: {type(result)}"
|
|
)
|
|
posts = list(request.posts)
|
|
|
|
return hooks_message_pb2.MessagesWillBeConsumedResponse(posts=posts)
|
|
|
|
# =========================================================================
|
|
# HOOK IMPLEMENTATION MATRIX (remaining hooks for 07-03)
|
|
# =========================================================================
|
|
#
|
|
# Hook Name | Python Handler | Return Semantics | Default Behavior
|
|
# -----------------------------------|------------------------|---------------------------|------------------
|
|
# ExecuteCommand | execute_command | (CommandResponse, error) | UNIMPLEMENTED
|
|
# ConfigurationWillBeSaved | configuration_will_be_saved | (config, error) | allow unchanged
|
|
# UserHasBeenCreated | user_has_been_created | void (fire-and-forget) | no-op
|
|
# UserWillLogIn | user_will_log_in | rejection_reason string | allow (empty string)
|
|
# UserHasLoggedIn | user_has_logged_in | void (fire-and-forget) | no-op
|
|
# UserHasBeenDeactivated | user_has_been_deactivated | void (fire-and-forget) | no-op
|
|
# ChannelHasBeenCreated | channel_has_been_created | void (fire-and-forget) | no-op
|
|
# UserHasJoinedChannel | user_has_joined_channel | void (fire-and-forget) | no-op
|
|
# UserHasLeftChannel | user_has_left_channel | void (fire-and-forget) | no-op
|
|
# UserHasJoinedTeam | user_has_joined_team | void (fire-and-forget) | no-op
|
|
# UserHasLeftTeam | user_has_left_team | void (fire-and-forget) | no-op
|
|
# ReactionHasBeenAdded | reaction_has_been_added | void (fire-and-forget) | no-op
|
|
# ReactionHasBeenRemoved | reaction_has_been_removed | void (fire-and-forget) | no-op
|
|
# NotificationWillBePushed | notification_will_be_pushed | (notification, rejection) | allow
|
|
# EmailNotificationWillBeSent | email_notification_will_be_sent | (content, rejection) | allow
|
|
# PreferencesHaveChanged | preferences_have_changed | void (fire-and-forget) | no-op
|
|
# OnInstall | on_install | error | success
|
|
# OnSendDailyTelemetry | on_send_daily_telemetry | void (fire-and-forget) | no-op
|
|
# RunDataRetention | run_data_retention | (deleted_count, error) | (0, nil)
|
|
# OnCloudLimitsUpdated | on_cloud_limits_updated | void (fire-and-forget) | no-op
|
|
# OnWebSocketConnect | on_web_socket_connect | void (fire-and-forget) | no-op
|
|
# OnWebSocketDisconnect | on_web_socket_disconnect | void (fire-and-forget) | no-op
|
|
# WebSocketMessageHasBeenPosted | web_socket_message_has_been_posted | void | no-op
|
|
# OnPluginClusterEvent | on_plugin_cluster_event | void (fire-and-forget) | no-op
|
|
# OnSharedChannelsSyncMsg | on_shared_channels_sync_msg | (SyncResponse, error) | UNIMPLEMENTED
|
|
# OnSharedChannelsPing | on_shared_channels_ping | bool (healthy) | true
|
|
# OnSharedChannelsAttachmentSyncMsg | on_shared_channels_attachment_sync_msg | error | success
|
|
# OnSharedChannelsProfileImageSyncMsg| on_shared_channels_profile_image_sync_msg | error | success
|
|
# GenerateSupportData | generate_support_data | ([]FileData, error) | ([], nil)
|
|
# OnSAMLLogin | on_saml_login | error | success
|
|
#
|
|
# DEFERRED TO PHASE 8 (Streaming):
|
|
# - ServeHTTP: HTTP request/response streaming
|
|
# - ServeMetrics: HTTP request/response streaming
|
|
# - FileWillBeUploaded: Large file streaming (implemented as unary but may have size limits)
|
|
# =========================================================================
|
|
|
|
# =========================================================================
|
|
# COMMAND HOOKS
|
|
# =========================================================================
|
|
|
|
async def ExecuteCommand(
|
|
self,
|
|
request: hooks_command_pb2.ExecuteCommandRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.ExecuteCommandResponse:
|
|
"""
|
|
Handle ExecuteCommand hook.
|
|
|
|
Called when a slash command registered by this plugin is invoked.
|
|
Handler should return a CommandResponse or raise an error.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and command args.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
ExecuteCommandResponse with response or error.
|
|
"""
|
|
hook_name = "ExecuteCommand"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Command hook not implemented - return error
|
|
return hooks_command_pb2.ExecuteCommandResponse(
|
|
error=_make_app_error(
|
|
message="ExecuteCommand hook not implemented",
|
|
error_id="plugin.execute_command.not_implemented",
|
|
status_code=501,
|
|
where="ExecuteCommand",
|
|
),
|
|
)
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (context, args) -> CommandResponse or error
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.args,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"ExecuteCommand error: {error}")
|
|
return hooks_command_pb2.ExecuteCommandResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.execute_command.error",
|
|
where="ExecuteCommand",
|
|
),
|
|
)
|
|
|
|
# Convert handler result to protobuf CommandResponse
|
|
# Supports: dict, CommandResponse wrapper, or protobuf directly
|
|
command_response = _to_command_response_proto(result, self._logger)
|
|
if command_response is not None:
|
|
return hooks_command_pb2.ExecuteCommandResponse(response=command_response)
|
|
else:
|
|
# result was None or conversion failed (warning already logged)
|
|
return hooks_command_pb2.ExecuteCommandResponse()
|
|
|
|
# =========================================================================
|
|
# CONFIGURATION HOOKS
|
|
# =========================================================================
|
|
|
|
async def ConfigurationWillBeSaved(
|
|
self,
|
|
request: hooks_lifecycle_pb2.ConfigurationWillBeSavedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse:
|
|
"""
|
|
Handle ConfigurationWillBeSaved hook.
|
|
|
|
Called before configuration is saved. Handler can return a modified
|
|
config or an error to reject the save.
|
|
|
|
Args:
|
|
request: Contains context and new_config (as ConfigJson).
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
ConfigurationWillBeSavedResponse with optional modified_config or error.
|
|
"""
|
|
hook_name = "ConfigurationWillBeSaved"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Not implemented - allow unchanged
|
|
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (config_json) -> (config_json, error) or just config_json
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.new_config,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"ConfigurationWillBeSaved error: {error}")
|
|
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.configuration_will_be_saved.error",
|
|
where="ConfigurationWillBeSaved",
|
|
),
|
|
)
|
|
|
|
# Parse result
|
|
if result is None:
|
|
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse()
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
config, err = result
|
|
if err:
|
|
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse(
|
|
error=_make_app_error(
|
|
message=str(err),
|
|
error_id="plugin.configuration_will_be_saved.rejected",
|
|
where="ConfigurationWillBeSaved",
|
|
),
|
|
)
|
|
if config is not None:
|
|
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse(
|
|
modified_config=config
|
|
)
|
|
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse()
|
|
elif hasattr(result, "config_json"):
|
|
# Looks like a ConfigJson
|
|
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse(
|
|
modified_config=result
|
|
)
|
|
else:
|
|
return hooks_lifecycle_pb2.ConfigurationWillBeSavedResponse()
|
|
|
|
# =========================================================================
|
|
# USER LIFECYCLE HOOKS
|
|
# =========================================================================
|
|
|
|
async def UserHasBeenCreated(
|
|
self,
|
|
request: hooks_user_channel_pb2.UserHasBeenCreatedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.UserHasBeenCreatedResponse:
|
|
"""
|
|
Handle UserHasBeenCreated notification.
|
|
|
|
Fire-and-forget notification when a user is created.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and user.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty UserHasBeenCreatedResponse.
|
|
"""
|
|
hook_name = "UserHasBeenCreated"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.UserHasBeenCreatedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.user,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"UserHasBeenCreated error: {error}")
|
|
# Fire-and-forget - still return success
|
|
|
|
return hooks_user_channel_pb2.UserHasBeenCreatedResponse()
|
|
|
|
async def UserWillLogIn(
|
|
self,
|
|
request: hooks_user_channel_pb2.UserWillLogInRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.UserWillLogInResponse:
|
|
"""
|
|
Handle UserWillLogIn hook.
|
|
|
|
Called before a user logs in. Return a non-empty string to reject.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and user.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
UserWillLogInResponse with rejection_reason (empty to allow).
|
|
"""
|
|
hook_name = "UserWillLogIn"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Not implemented - allow login
|
|
return hooks_user_channel_pb2.UserWillLogInResponse(rejection_reason="")
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.user,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"UserWillLogIn error: {error}")
|
|
# On error, reject the login
|
|
return hooks_user_channel_pb2.UserWillLogInResponse(
|
|
rejection_reason=f"Plugin error: {error}"
|
|
)
|
|
|
|
# Handler returns string rejection reason (empty = allow)
|
|
rejection_reason = ""
|
|
if result is not None and isinstance(result, str):
|
|
rejection_reason = result
|
|
|
|
return hooks_user_channel_pb2.UserWillLogInResponse(
|
|
rejection_reason=rejection_reason
|
|
)
|
|
|
|
async def UserHasLoggedIn(
|
|
self,
|
|
request: hooks_user_channel_pb2.UserHasLoggedInRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.UserHasLoggedInResponse:
|
|
"""
|
|
Handle UserHasLoggedIn notification.
|
|
|
|
Fire-and-forget notification after user login.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and user.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty UserHasLoggedInResponse.
|
|
"""
|
|
hook_name = "UserHasLoggedIn"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.UserHasLoggedInResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.user,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"UserHasLoggedIn error: {error}")
|
|
|
|
return hooks_user_channel_pb2.UserHasLoggedInResponse()
|
|
|
|
async def UserHasBeenDeactivated(
|
|
self,
|
|
request: hooks_user_channel_pb2.UserHasBeenDeactivatedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.UserHasBeenDeactivatedResponse:
|
|
"""
|
|
Handle UserHasBeenDeactivated notification.
|
|
|
|
Fire-and-forget notification when a user is deactivated.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and user.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty UserHasBeenDeactivatedResponse.
|
|
"""
|
|
hook_name = "UserHasBeenDeactivated"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.UserHasBeenDeactivatedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.user,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"UserHasBeenDeactivated error: {error}")
|
|
|
|
return hooks_user_channel_pb2.UserHasBeenDeactivatedResponse()
|
|
|
|
# =========================================================================
|
|
# CHANNEL AND TEAM HOOKS
|
|
# =========================================================================
|
|
|
|
async def ChannelHasBeenCreated(
|
|
self,
|
|
request: hooks_user_channel_pb2.ChannelHasBeenCreatedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.ChannelHasBeenCreatedResponse:
|
|
"""
|
|
Handle ChannelHasBeenCreated notification.
|
|
|
|
Fire-and-forget notification when a channel is created.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and channel.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty ChannelHasBeenCreatedResponse.
|
|
"""
|
|
hook_name = "ChannelHasBeenCreated"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.ChannelHasBeenCreatedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.channel,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"ChannelHasBeenCreated error: {error}")
|
|
|
|
return hooks_user_channel_pb2.ChannelHasBeenCreatedResponse()
|
|
|
|
async def UserHasJoinedChannel(
|
|
self,
|
|
request: hooks_user_channel_pb2.UserHasJoinedChannelRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.UserHasJoinedChannelResponse:
|
|
"""
|
|
Handle UserHasJoinedChannel notification.
|
|
|
|
Fire-and-forget notification when user joins a channel.
|
|
Actor is optional (nil if self-join).
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, channel_member, and optional actor.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty UserHasJoinedChannelResponse.
|
|
"""
|
|
hook_name = "UserHasJoinedChannel"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.UserHasJoinedChannelResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Actor may be None (optional field)
|
|
actor = request.actor if request.HasField("actor") else None
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.channel_member,
|
|
actor,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"UserHasJoinedChannel error: {error}")
|
|
|
|
return hooks_user_channel_pb2.UserHasJoinedChannelResponse()
|
|
|
|
async def UserHasLeftChannel(
|
|
self,
|
|
request: hooks_user_channel_pb2.UserHasLeftChannelRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.UserHasLeftChannelResponse:
|
|
"""
|
|
Handle UserHasLeftChannel notification.
|
|
|
|
Fire-and-forget notification when user leaves a channel.
|
|
Actor is optional (nil if self-removal).
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, channel_member, and optional actor.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty UserHasLeftChannelResponse.
|
|
"""
|
|
hook_name = "UserHasLeftChannel"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.UserHasLeftChannelResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
actor = request.actor if request.HasField("actor") else None
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.channel_member,
|
|
actor,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"UserHasLeftChannel error: {error}")
|
|
|
|
return hooks_user_channel_pb2.UserHasLeftChannelResponse()
|
|
|
|
async def UserHasJoinedTeam(
|
|
self,
|
|
request: hooks_user_channel_pb2.UserHasJoinedTeamRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.UserHasJoinedTeamResponse:
|
|
"""
|
|
Handle UserHasJoinedTeam notification.
|
|
|
|
Fire-and-forget notification when user joins a team.
|
|
Actor is optional (nil if self-join).
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, team_member, and optional actor.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty UserHasJoinedTeamResponse.
|
|
"""
|
|
hook_name = "UserHasJoinedTeam"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.UserHasJoinedTeamResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
actor = request.actor if request.HasField("actor") else None
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.team_member,
|
|
actor,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"UserHasJoinedTeam error: {error}")
|
|
|
|
return hooks_user_channel_pb2.UserHasJoinedTeamResponse()
|
|
|
|
async def UserHasLeftTeam(
|
|
self,
|
|
request: hooks_user_channel_pb2.UserHasLeftTeamRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.UserHasLeftTeamResponse:
|
|
"""
|
|
Handle UserHasLeftTeam notification.
|
|
|
|
Fire-and-forget notification when user leaves a team.
|
|
Actor is optional (nil if self-removal).
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, team_member, and optional actor.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty UserHasLeftTeamResponse.
|
|
"""
|
|
hook_name = "UserHasLeftTeam"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.UserHasLeftTeamResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
actor = request.actor if request.HasField("actor") else None
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.team_member,
|
|
actor,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"UserHasLeftTeam error: {error}")
|
|
|
|
return hooks_user_channel_pb2.UserHasLeftTeamResponse()
|
|
|
|
# =========================================================================
|
|
# REACTION HOOKS
|
|
# =========================================================================
|
|
|
|
async def ReactionHasBeenAdded(
|
|
self,
|
|
request: hooks_message_pb2.ReactionHasBeenAddedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.ReactionHasBeenAddedResponse:
|
|
"""
|
|
Handle ReactionHasBeenAdded notification.
|
|
|
|
Fire-and-forget notification when a reaction is added.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and reaction.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty ReactionHasBeenAddedResponse.
|
|
"""
|
|
hook_name = "ReactionHasBeenAdded"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.ReactionHasBeenAddedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.reaction,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"ReactionHasBeenAdded error: {error}")
|
|
|
|
return hooks_message_pb2.ReactionHasBeenAddedResponse()
|
|
|
|
async def ReactionHasBeenRemoved(
|
|
self,
|
|
request: hooks_message_pb2.ReactionHasBeenRemovedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.ReactionHasBeenRemovedResponse:
|
|
"""
|
|
Handle ReactionHasBeenRemoved notification.
|
|
|
|
Fire-and-forget notification when a reaction is removed.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and reaction.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty ReactionHasBeenRemovedResponse.
|
|
"""
|
|
hook_name = "ReactionHasBeenRemoved"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.ReactionHasBeenRemovedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.reaction,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"ReactionHasBeenRemoved error: {error}")
|
|
|
|
return hooks_message_pb2.ReactionHasBeenRemovedResponse()
|
|
|
|
# =========================================================================
|
|
# NOTIFICATION HOOKS
|
|
# =========================================================================
|
|
|
|
async def NotificationWillBePushed(
|
|
self,
|
|
request: hooks_message_pb2.NotificationWillBePushedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.NotificationWillBePushedResponse:
|
|
"""
|
|
Handle NotificationWillBePushed hook.
|
|
|
|
Called before a push notification is sent. Handler can:
|
|
- Allow unchanged: return (None, "")
|
|
- Modify: return (modified_notification, "")
|
|
- Reject: return (None, "rejection reason")
|
|
|
|
Args:
|
|
request: Contains context, push_notification, and user_id.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
NotificationWillBePushedResponse with modified_notification and/or rejection_reason.
|
|
"""
|
|
hook_name = "NotificationWillBePushed"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.NotificationWillBePushedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (push_notification, user_id) -> (notification, rejection)
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.push_notification,
|
|
request.user_id,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"NotificationWillBePushed error: {error}")
|
|
return hooks_message_pb2.NotificationWillBePushedResponse(
|
|
rejection_reason=f"Plugin error: {error}",
|
|
)
|
|
|
|
# Parse result
|
|
modified = None
|
|
rejection_reason = ""
|
|
|
|
if result is None:
|
|
pass
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
notif, reason = result
|
|
if notif is not None:
|
|
modified = notif
|
|
if reason:
|
|
rejection_reason = str(reason)
|
|
elif hasattr(result, "platform"):
|
|
# Looks like a PushNotification
|
|
modified = result
|
|
elif isinstance(result, str):
|
|
rejection_reason = result
|
|
|
|
return hooks_message_pb2.NotificationWillBePushedResponse(
|
|
modified_notification=modified,
|
|
rejection_reason=rejection_reason,
|
|
)
|
|
|
|
async def EmailNotificationWillBeSent(
|
|
self,
|
|
request: hooks_message_pb2.EmailNotificationWillBeSentRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.EmailNotificationWillBeSentResponse:
|
|
"""
|
|
Handle EmailNotificationWillBeSent hook.
|
|
|
|
Called before an email notification is sent. Handler can:
|
|
- Allow unchanged: return (None, "")
|
|
- Modify: return (modified_content, "")
|
|
- Reject: return (None, "rejection reason")
|
|
|
|
Args:
|
|
request: Contains context and email_notification (as EmailNotificationJson).
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
EmailNotificationWillBeSentResponse with modified_content and/or rejection_reason.
|
|
"""
|
|
hook_name = "EmailNotificationWillBeSent"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.EmailNotificationWillBeSentResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (email_notification) -> (content, rejection)
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.email_notification,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"EmailNotificationWillBeSent error: {error}")
|
|
return hooks_message_pb2.EmailNotificationWillBeSentResponse(
|
|
rejection_reason=f"Plugin error: {error}",
|
|
)
|
|
|
|
# Parse result
|
|
modified = None
|
|
rejection_reason = ""
|
|
|
|
if result is None:
|
|
pass
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
content, reason = result
|
|
if content is not None:
|
|
modified = content
|
|
if reason:
|
|
rejection_reason = str(reason)
|
|
elif hasattr(result, "subject"):
|
|
# Looks like EmailNotificationContent
|
|
modified = result
|
|
elif isinstance(result, str):
|
|
rejection_reason = result
|
|
|
|
return hooks_message_pb2.EmailNotificationWillBeSentResponse(
|
|
modified_content=modified,
|
|
rejection_reason=rejection_reason,
|
|
)
|
|
|
|
# =========================================================================
|
|
# PREFERENCES HOOKS
|
|
# =========================================================================
|
|
|
|
async def PreferencesHaveChanged(
|
|
self,
|
|
request: hooks_message_pb2.PreferencesHaveChangedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_message_pb2.PreferencesHaveChangedResponse:
|
|
"""
|
|
Handle PreferencesHaveChanged notification.
|
|
|
|
Fire-and-forget notification when user preferences change.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and preferences list.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty PreferencesHaveChangedResponse.
|
|
"""
|
|
hook_name = "PreferencesHaveChanged"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_message_pb2.PreferencesHaveChangedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
list(request.preferences),
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"PreferencesHaveChanged error: {error}")
|
|
|
|
return hooks_message_pb2.PreferencesHaveChangedResponse()
|
|
|
|
# =========================================================================
|
|
# SYSTEM HOOKS
|
|
# =========================================================================
|
|
|
|
async def OnInstall(
|
|
self,
|
|
request: hooks_lifecycle_pb2.OnInstallRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.OnInstallResponse:
|
|
"""
|
|
Handle OnInstall hook.
|
|
|
|
Called after plugin installation. Return error to indicate failure.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and install event.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnInstallResponse with optional error.
|
|
"""
|
|
hook_name = "OnInstall"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_lifecycle_pb2.OnInstallResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.event,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnInstall error: {error}")
|
|
return hooks_lifecycle_pb2.OnInstallResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.on_install.error",
|
|
where="OnInstall",
|
|
),
|
|
)
|
|
|
|
# Handler may return an error explicitly
|
|
if isinstance(result, Exception) or (isinstance(result, str) and result):
|
|
return hooks_lifecycle_pb2.OnInstallResponse(
|
|
error=_make_app_error(
|
|
message=str(result),
|
|
error_id="plugin.on_install.error",
|
|
where="OnInstall",
|
|
),
|
|
)
|
|
|
|
return hooks_lifecycle_pb2.OnInstallResponse()
|
|
|
|
async def OnSendDailyTelemetry(
|
|
self,
|
|
request: hooks_lifecycle_pb2.OnSendDailyTelemetryRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.OnSendDailyTelemetryResponse:
|
|
"""
|
|
Handle OnSendDailyTelemetry notification.
|
|
|
|
Fire-and-forget notification when daily telemetry is sent.
|
|
|
|
Args:
|
|
request: Contains context.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty OnSendDailyTelemetryResponse.
|
|
"""
|
|
hook_name = "OnSendDailyTelemetry"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_lifecycle_pb2.OnSendDailyTelemetryResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnSendDailyTelemetry error: {error}")
|
|
|
|
return hooks_lifecycle_pb2.OnSendDailyTelemetryResponse()
|
|
|
|
async def RunDataRetention(
|
|
self,
|
|
request: hooks_lifecycle_pb2.RunDataRetentionRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.RunDataRetentionResponse:
|
|
"""
|
|
Handle RunDataRetention hook.
|
|
|
|
Called during data retention job. Handler should delete old data
|
|
and return the count of deleted items.
|
|
|
|
Args:
|
|
request: Contains context, now_time, and batch_size.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
RunDataRetentionResponse with deleted_count and optional error.
|
|
"""
|
|
hook_name = "RunDataRetention"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_lifecycle_pb2.RunDataRetentionResponse(deleted_count=0)
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Handler signature: (now_time, batch_size) -> (deleted_count, error)
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.now_time,
|
|
request.batch_size,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"RunDataRetention error: {error}")
|
|
return hooks_lifecycle_pb2.RunDataRetentionResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.run_data_retention.error",
|
|
where="RunDataRetention",
|
|
),
|
|
deleted_count=0,
|
|
)
|
|
|
|
# Parse result
|
|
deleted_count = 0
|
|
if result is None:
|
|
pass
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
count, err = result
|
|
if count is not None:
|
|
deleted_count = int(count)
|
|
if err:
|
|
return hooks_lifecycle_pb2.RunDataRetentionResponse(
|
|
error=_make_app_error(
|
|
message=str(err),
|
|
error_id="plugin.run_data_retention.error",
|
|
where="RunDataRetention",
|
|
),
|
|
deleted_count=deleted_count,
|
|
)
|
|
elif isinstance(result, int):
|
|
deleted_count = result
|
|
|
|
return hooks_lifecycle_pb2.RunDataRetentionResponse(deleted_count=deleted_count)
|
|
|
|
async def OnCloudLimitsUpdated(
|
|
self,
|
|
request: hooks_lifecycle_pb2.OnCloudLimitsUpdatedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_lifecycle_pb2.OnCloudLimitsUpdatedResponse:
|
|
"""
|
|
Handle OnCloudLimitsUpdated notification.
|
|
|
|
Fire-and-forget notification when cloud limits change.
|
|
|
|
Args:
|
|
request: Contains context and limits.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty OnCloudLimitsUpdatedResponse.
|
|
"""
|
|
hook_name = "OnCloudLimitsUpdated"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_lifecycle_pb2.OnCloudLimitsUpdatedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.limits,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnCloudLimitsUpdated error: {error}")
|
|
|
|
return hooks_lifecycle_pb2.OnCloudLimitsUpdatedResponse()
|
|
|
|
# =========================================================================
|
|
# WEBSOCKET HOOKS
|
|
# =========================================================================
|
|
|
|
async def OnWebSocketConnect(
|
|
self,
|
|
request: hooks_command_pb2.OnWebSocketConnectRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.OnWebSocketConnectResponse:
|
|
"""
|
|
Handle OnWebSocketConnect notification.
|
|
|
|
Fire-and-forget notification when a WebSocket connects.
|
|
|
|
Args:
|
|
request: Contains context, web_conn_id, and user_id.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty OnWebSocketConnectResponse.
|
|
"""
|
|
hook_name = "OnWebSocketConnect"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_command_pb2.OnWebSocketConnectResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.web_conn_id,
|
|
request.user_id,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnWebSocketConnect error: {error}")
|
|
|
|
return hooks_command_pb2.OnWebSocketConnectResponse()
|
|
|
|
async def OnWebSocketDisconnect(
|
|
self,
|
|
request: hooks_command_pb2.OnWebSocketDisconnectRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.OnWebSocketDisconnectResponse:
|
|
"""
|
|
Handle OnWebSocketDisconnect notification.
|
|
|
|
Fire-and-forget notification when a WebSocket disconnects.
|
|
|
|
Args:
|
|
request: Contains context, web_conn_id, and user_id.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty OnWebSocketDisconnectResponse.
|
|
"""
|
|
hook_name = "OnWebSocketDisconnect"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_command_pb2.OnWebSocketDisconnectResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.web_conn_id,
|
|
request.user_id,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnWebSocketDisconnect error: {error}")
|
|
|
|
return hooks_command_pb2.OnWebSocketDisconnectResponse()
|
|
|
|
async def WebSocketMessageHasBeenPosted(
|
|
self,
|
|
request: hooks_command_pb2.WebSocketMessageHasBeenPostedRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.WebSocketMessageHasBeenPostedResponse:
|
|
"""
|
|
Handle WebSocketMessageHasBeenPosted notification.
|
|
|
|
Fire-and-forget notification when a WebSocket message is received.
|
|
|
|
Args:
|
|
request: Contains context, web_conn_id, user_id, and request.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty WebSocketMessageHasBeenPostedResponse.
|
|
"""
|
|
hook_name = "WebSocketMessageHasBeenPosted"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_command_pb2.WebSocketMessageHasBeenPostedResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.web_conn_id,
|
|
request.user_id,
|
|
request.request,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"WebSocketMessageHasBeenPosted error: {error}")
|
|
|
|
return hooks_command_pb2.WebSocketMessageHasBeenPostedResponse()
|
|
|
|
# =========================================================================
|
|
# CLUSTER HOOKS
|
|
# =========================================================================
|
|
|
|
async def OnPluginClusterEvent(
|
|
self,
|
|
request: hooks_command_pb2.OnPluginClusterEventRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.OnPluginClusterEventResponse:
|
|
"""
|
|
Handle OnPluginClusterEvent notification.
|
|
|
|
Fire-and-forget notification for intra-cluster plugin events.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, and event.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
Empty OnPluginClusterEventResponse.
|
|
"""
|
|
hook_name = "OnPluginClusterEvent"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_command_pb2.OnPluginClusterEventResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
_, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.event,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnPluginClusterEvent error: {error}")
|
|
|
|
return hooks_command_pb2.OnPluginClusterEventResponse()
|
|
|
|
# =========================================================================
|
|
# SHARED CHANNELS HOOKS
|
|
# =========================================================================
|
|
|
|
async def OnSharedChannelsSyncMsg(
|
|
self,
|
|
request: hooks_command_pb2.OnSharedChannelsSyncMsgRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.OnSharedChannelsSyncMsgResponse:
|
|
"""
|
|
Handle OnSharedChannelsSyncMsg hook.
|
|
|
|
Called when a shared channels sync message is received.
|
|
|
|
Args:
|
|
request: Contains context, sync_msg, and remote_cluster.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnSharedChannelsSyncMsgResponse with response or error.
|
|
"""
|
|
hook_name = "OnSharedChannelsSyncMsg"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Not implemented - return empty response (no sync)
|
|
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.sync_msg,
|
|
request.remote_cluster,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnSharedChannelsSyncMsg error: {error}")
|
|
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.on_shared_channels_sync_msg.error",
|
|
where="OnSharedChannelsSyncMsg",
|
|
),
|
|
)
|
|
|
|
# Parse result
|
|
if result is None:
|
|
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse()
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
response, err = result
|
|
if err:
|
|
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse(
|
|
error=_make_app_error(
|
|
message=str(err),
|
|
error_id="plugin.on_shared_channels_sync_msg.error",
|
|
where="OnSharedChannelsSyncMsg",
|
|
),
|
|
)
|
|
if response is not None:
|
|
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse(
|
|
response=response
|
|
)
|
|
elif hasattr(result, "users_last_update_at"):
|
|
# Looks like a SyncResponse
|
|
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse(response=result)
|
|
|
|
return hooks_command_pb2.OnSharedChannelsSyncMsgResponse()
|
|
|
|
async def OnSharedChannelsPing(
|
|
self,
|
|
request: hooks_command_pb2.OnSharedChannelsPingRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.OnSharedChannelsPingResponse:
|
|
"""
|
|
Handle OnSharedChannelsPing hook.
|
|
|
|
Called to check health of shared channels plugin connection.
|
|
Return True if healthy, False otherwise.
|
|
|
|
Args:
|
|
request: Contains context and remote_cluster.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnSharedChannelsPingResponse with healthy status.
|
|
"""
|
|
hook_name = "OnSharedChannelsPing"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Not implemented - assume healthy
|
|
return hooks_command_pb2.OnSharedChannelsPingResponse(healthy=True)
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.remote_cluster,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnSharedChannelsPing error: {error}")
|
|
return hooks_command_pb2.OnSharedChannelsPingResponse(healthy=False)
|
|
|
|
# Result should be a boolean
|
|
healthy = bool(result) if result is not None else True
|
|
|
|
return hooks_command_pb2.OnSharedChannelsPingResponse(healthy=healthy)
|
|
|
|
async def OnSharedChannelsAttachmentSyncMsg(
|
|
self,
|
|
request: hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse:
|
|
"""
|
|
Handle OnSharedChannelsAttachmentSyncMsg hook.
|
|
|
|
Called when a file attachment sync message is received.
|
|
|
|
Args:
|
|
request: Contains context, file_info, post, and remote_cluster.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnSharedChannelsAttachmentSyncMsgResponse with optional error.
|
|
"""
|
|
hook_name = "OnSharedChannelsAttachmentSyncMsg"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.file_info,
|
|
request.post,
|
|
request.remote_cluster,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnSharedChannelsAttachmentSyncMsg error: {error}")
|
|
return hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.on_shared_channels_attachment_sync_msg.error",
|
|
where="OnSharedChannelsAttachmentSyncMsg",
|
|
),
|
|
)
|
|
|
|
# Handler may return an error
|
|
if isinstance(result, Exception) or (isinstance(result, str) and result):
|
|
return hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse(
|
|
error=_make_app_error(
|
|
message=str(result),
|
|
error_id="plugin.on_shared_channels_attachment_sync_msg.error",
|
|
where="OnSharedChannelsAttachmentSyncMsg",
|
|
),
|
|
)
|
|
|
|
return hooks_command_pb2.OnSharedChannelsAttachmentSyncMsgResponse()
|
|
|
|
async def OnSharedChannelsProfileImageSyncMsg(
|
|
self,
|
|
request: hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse:
|
|
"""
|
|
Handle OnSharedChannelsProfileImageSyncMsg hook.
|
|
|
|
Called when a profile image sync message is received.
|
|
|
|
Args:
|
|
request: Contains context, user, and remote_cluster.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnSharedChannelsProfileImageSyncMsgResponse with optional error.
|
|
"""
|
|
hook_name = "OnSharedChannelsProfileImageSyncMsg"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.user,
|
|
request.remote_cluster,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnSharedChannelsProfileImageSyncMsg error: {error}")
|
|
return hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.on_shared_channels_profile_image_sync_msg.error",
|
|
where="OnSharedChannelsProfileImageSyncMsg",
|
|
),
|
|
)
|
|
|
|
# Handler may return an error
|
|
if isinstance(result, Exception) or (isinstance(result, str) and result):
|
|
return hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse(
|
|
error=_make_app_error(
|
|
message=str(result),
|
|
error_id="plugin.on_shared_channels_profile_image_sync_msg.error",
|
|
where="OnSharedChannelsProfileImageSyncMsg",
|
|
),
|
|
)
|
|
|
|
return hooks_command_pb2.OnSharedChannelsProfileImageSyncMsgResponse()
|
|
|
|
# =========================================================================
|
|
# SUPPORT HOOKS
|
|
# =========================================================================
|
|
|
|
async def GenerateSupportData(
|
|
self,
|
|
request: hooks_command_pb2.GenerateSupportDataRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_command_pb2.GenerateSupportDataResponse:
|
|
"""
|
|
Handle GenerateSupportData hook.
|
|
|
|
Called when generating a support packet. Handler should return
|
|
a list of FileData objects to include in the packet.
|
|
|
|
Args:
|
|
request: Contains context and plugin_context.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
GenerateSupportDataResponse with files list and optional error.
|
|
"""
|
|
hook_name = "GenerateSupportData"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_command_pb2.GenerateSupportDataResponse(files=[])
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"GenerateSupportData error: {error}")
|
|
return hooks_command_pb2.GenerateSupportDataResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.generate_support_data.error",
|
|
where="GenerateSupportData",
|
|
),
|
|
files=[],
|
|
)
|
|
|
|
# Parse result
|
|
files = []
|
|
if result is None:
|
|
pass
|
|
elif isinstance(result, tuple) and len(result) == 2:
|
|
file_list, err = result
|
|
if err:
|
|
return hooks_command_pb2.GenerateSupportDataResponse(
|
|
error=_make_app_error(
|
|
message=str(err),
|
|
error_id="plugin.generate_support_data.error",
|
|
where="GenerateSupportData",
|
|
),
|
|
files=[],
|
|
)
|
|
if file_list is not None:
|
|
files = list(file_list)
|
|
elif isinstance(result, (list, tuple)):
|
|
files = list(result)
|
|
|
|
return hooks_command_pb2.GenerateSupportDataResponse(files=files)
|
|
|
|
# =========================================================================
|
|
# SAML HOOKS
|
|
# =========================================================================
|
|
|
|
async def OnSAMLLogin(
|
|
self,
|
|
request: hooks_user_channel_pb2.OnSAMLLoginRequest,
|
|
context: grpc.aio.ServicerContext,
|
|
) -> hooks_user_channel_pb2.OnSAMLLoginResponse:
|
|
"""
|
|
Handle OnSAMLLogin hook.
|
|
|
|
Called after a successful SAML login. Return error to reject.
|
|
|
|
Args:
|
|
request: Contains context, plugin_context, user, and assertion.
|
|
context: The gRPC servicer context.
|
|
|
|
Returns:
|
|
OnSAMLLoginResponse with optional error.
|
|
"""
|
|
hook_name = "OnSAMLLogin"
|
|
|
|
if not self.plugin.has_hook(hook_name):
|
|
return hooks_user_channel_pb2.OnSAMLLoginResponse()
|
|
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
request.plugin_context,
|
|
request.user,
|
|
request.assertion,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"OnSAMLLogin error: {error}")
|
|
return hooks_user_channel_pb2.OnSAMLLoginResponse(
|
|
error=_make_app_error(
|
|
message=str(error),
|
|
error_id="plugin.on_saml_login.error",
|
|
where="OnSAMLLogin",
|
|
),
|
|
)
|
|
|
|
# Handler may return an error
|
|
if isinstance(result, Exception) or (isinstance(result, str) and result):
|
|
return hooks_user_channel_pb2.OnSAMLLoginResponse(
|
|
error=_make_app_error(
|
|
message=str(result),
|
|
error_id="plugin.on_saml_login.error",
|
|
where="OnSAMLLogin",
|
|
),
|
|
)
|
|
|
|
return hooks_user_channel_pb2.OnSAMLLoginResponse()
|
|
|
|
# =========================================================================
|
|
# HTTP STREAMING HOOKS (Phase 8)
|
|
# =========================================================================
|
|
|
|
async def ServeHTTP(
|
|
self,
|
|
request_iterator, # AsyncIterator[hooks_http_pb2.ServeHTTPRequest]
|
|
context: grpc.aio.ServicerContext,
|
|
):
|
|
"""
|
|
Handle ServeHTTP streaming hook.
|
|
|
|
This is a bidirectional streaming RPC:
|
|
- Go server streams HTTP request body chunks to Python
|
|
- Python plugin streams HTTP response body chunks back to Go
|
|
|
|
Request flow:
|
|
- First message contains ServeHTTPRequestInit with headers/metadata
|
|
- Subsequent messages contain body_chunk data
|
|
- body_complete=True signals end of request body
|
|
|
|
Response flow:
|
|
- First message contains ServeHTTPResponseInit with status/headers
|
|
- Subsequent messages contain body_chunk data
|
|
- body_complete=True signals end of response body
|
|
|
|
The handler receives an HTTPRequest-like object and must return
|
|
an HTTPResponse-like object or yield response chunks.
|
|
|
|
Args:
|
|
request_iterator: Async iterator of ServeHTTPRequest messages.
|
|
context: The gRPC servicer context.
|
|
|
|
Yields:
|
|
ServeHTTPResponse messages with init, body chunks, and completion flag.
|
|
"""
|
|
hook_name = "ServeHTTP"
|
|
|
|
# Read the first message to get request metadata
|
|
try:
|
|
first_msg = await request_iterator.__anext__()
|
|
except StopAsyncIteration:
|
|
self._logger.error("ServeHTTP: empty request stream")
|
|
yield hooks_http_pb2.ServeHTTPResponse(
|
|
init=hooks_http_pb2.ServeHTTPResponseInit(
|
|
status_code=500,
|
|
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
|
|
),
|
|
body_chunk=b"Empty request stream",
|
|
body_complete=True,
|
|
)
|
|
return
|
|
|
|
# Extract request init (metadata)
|
|
init = first_msg.init
|
|
if init is None:
|
|
self._logger.error("ServeHTTP: first message missing init")
|
|
yield hooks_http_pb2.ServeHTTPResponse(
|
|
init=hooks_http_pb2.ServeHTTPResponseInit(
|
|
status_code=500,
|
|
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
|
|
),
|
|
body_chunk=b"First message missing init",
|
|
body_complete=True,
|
|
)
|
|
return
|
|
|
|
# Build an HTTP request wrapper for the handler
|
|
http_request = HTTPRequest(
|
|
method=init.method,
|
|
url=init.url,
|
|
proto=init.proto,
|
|
proto_major=init.proto_major,
|
|
proto_minor=init.proto_minor,
|
|
headers=_convert_headers_to_dict(init.headers),
|
|
host=init.host,
|
|
remote_addr=init.remote_addr,
|
|
request_uri=init.request_uri,
|
|
content_length=init.content_length,
|
|
plugin_context=init.plugin_context,
|
|
)
|
|
|
|
# Collect request body from stream
|
|
# For this initial implementation, we buffer the entire body
|
|
# TODO(08-02): Stream body chunks to handler for true streaming
|
|
body_chunks = []
|
|
if first_msg.body_chunk:
|
|
body_chunks.append(first_msg.body_chunk)
|
|
|
|
if not first_msg.body_complete:
|
|
async for msg in request_iterator:
|
|
if msg.body_chunk:
|
|
body_chunks.append(msg.body_chunk)
|
|
if msg.body_complete:
|
|
break
|
|
# Check for cancellation
|
|
if context.cancelled():
|
|
self._logger.debug("ServeHTTP: request cancelled by client")
|
|
return
|
|
|
|
http_request.body = b"".join(body_chunks)
|
|
|
|
# Check if handler is implemented
|
|
if not self.plugin.has_hook(hook_name):
|
|
# Hook not implemented - return 404 Not Found
|
|
yield hooks_http_pb2.ServeHTTPResponse(
|
|
init=hooks_http_pb2.ServeHTTPResponseInit(
|
|
status_code=404,
|
|
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
|
|
),
|
|
body_chunk=b"Not Found",
|
|
body_complete=True,
|
|
)
|
|
return
|
|
|
|
# Invoke handler
|
|
handler = self.plugin.get_hook_handler(hook_name)
|
|
|
|
# Create response writer
|
|
response_writer = HTTPResponseWriter()
|
|
|
|
try:
|
|
# Handler signature: (context, response_writer, request) -> None
|
|
# Similar to Go's http.Handler pattern
|
|
result, error = await self.runner.invoke(
|
|
handler,
|
|
init.plugin_context,
|
|
response_writer,
|
|
http_request,
|
|
hook_name=hook_name,
|
|
context=context,
|
|
)
|
|
|
|
if error is not None:
|
|
self._logger.error(f"ServeHTTP handler error: {error}")
|
|
yield hooks_http_pb2.ServeHTTPResponse(
|
|
init=hooks_http_pb2.ServeHTTPResponseInit(
|
|
status_code=500,
|
|
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
|
|
),
|
|
body_chunk=f"Internal Server Error: {error}".encode("utf-8"),
|
|
body_complete=True,
|
|
)
|
|
return
|
|
|
|
except Exception as e:
|
|
self._logger.exception(f"ServeHTTP exception: {e}")
|
|
yield hooks_http_pb2.ServeHTTPResponse(
|
|
init=hooks_http_pb2.ServeHTTPResponseInit(
|
|
status_code=500,
|
|
headers=[hooks_http_pb2.HTTPHeader(key="Content-Type", values=["text/plain"])],
|
|
),
|
|
body_chunk=f"Internal Server Error".encode("utf-8"),
|
|
body_complete=True,
|
|
)
|
|
return
|
|
|
|
# Send response from response_writer
|
|
# First message: init with status and headers
|
|
response_headers = _convert_dict_to_headers(response_writer.headers)
|
|
status_code = response_writer.status_code or 200
|
|
|
|
# Get pending writes with flush markers
|
|
pending_writes = response_writer.get_pending_writes()
|
|
|
|
if not pending_writes:
|
|
# No body written - send just init with completion flag
|
|
yield hooks_http_pb2.ServeHTTPResponse(
|
|
init=hooks_http_pb2.ServeHTTPResponseInit(
|
|
status_code=status_code,
|
|
headers=response_headers,
|
|
),
|
|
body_complete=True,
|
|
)
|
|
return
|
|
|
|
# Stream response chunks with flush support
|
|
for i, (chunk, flush) in enumerate(pending_writes):
|
|
is_last = (i == len(pending_writes) - 1)
|
|
|
|
if i == 0:
|
|
# First message includes init
|
|
yield hooks_http_pb2.ServeHTTPResponse(
|
|
init=hooks_http_pb2.ServeHTTPResponseInit(
|
|
status_code=status_code,
|
|
headers=response_headers,
|
|
),
|
|
body_chunk=chunk,
|
|
body_complete=is_last,
|
|
flush=flush,
|
|
)
|
|
else:
|
|
# Subsequent messages are body chunks only
|
|
yield hooks_http_pb2.ServeHTTPResponse(
|
|
body_chunk=chunk,
|
|
body_complete=is_last,
|
|
flush=flush,
|
|
)
|
|
|
|
# Check for cancellation between chunks
|
|
if not is_last and context.cancelled():
|
|
self._logger.debug("ServeHTTP: response stream cancelled by client")
|
|
return
|
|
|
|
# =========================================================================
|
|
# DEFERRED HOOKS (Phase 8 - remaining)
|
|
# =========================================================================
|
|
#
|
|
# The following hooks are deferred to Phase 8-02 or later:
|
|
#
|
|
# - ServeMetrics: HTTP request/response streaming over gRPC (same pattern as ServeHTTP)
|
|
# - FileWillBeUploaded: Large file body streaming (may have size limits)
|
|
#
|
|
# These are NOT included in Implemented() until their respective phase adds support.
|
|
# =========================================================================
|
|
|
|
|
|
# =============================================================================
|
|
# HTTP Request/Response Helper Classes
|
|
# =============================================================================
|
|
|
|
|
|
def _convert_headers_to_dict(headers: list) -> dict:
|
|
"""Convert list of HTTPHeader protos to a dict of header name -> list of values."""
|
|
result = {}
|
|
for h in headers:
|
|
key = h.key
|
|
if key not in result:
|
|
result[key] = []
|
|
result[key].extend(h.values)
|
|
return result
|
|
|
|
|
|
def _convert_dict_to_headers(headers: dict) -> list:
|
|
"""Convert dict of headers to list of HTTPHeader protos."""
|
|
result = []
|
|
for key, values in headers.items():
|
|
if isinstance(values, str):
|
|
values = [values]
|
|
result.append(hooks_http_pb2.HTTPHeader(key=key, values=list(values)))
|
|
return result
|
|
|
|
|
|
class HTTPRequest:
|
|
"""
|
|
HTTP request wrapper for ServeHTTP handlers.
|
|
|
|
Provides a Pythonic interface to the streamed HTTP request data.
|
|
"""
|
|
|
|
def __init__(
|
|
self,
|
|
method: str,
|
|
url: str,
|
|
proto: str,
|
|
proto_major: int,
|
|
proto_minor: int,
|
|
headers: dict,
|
|
host: str,
|
|
remote_addr: str,
|
|
request_uri: str,
|
|
content_length: int,
|
|
plugin_context=None,
|
|
):
|
|
self.method = method
|
|
self.url = url
|
|
self.proto = proto
|
|
self.proto_major = proto_major
|
|
self.proto_minor = proto_minor
|
|
self.headers = headers
|
|
self.host = host
|
|
self.remote_addr = remote_addr
|
|
self.request_uri = request_uri
|
|
self.content_length = content_length
|
|
self.plugin_context = plugin_context
|
|
self.body: bytes = b""
|
|
|
|
def get_header(self, name: str, default: str = "") -> str:
|
|
"""Get a header value by name (case-insensitive)."""
|
|
# HTTP headers are case-insensitive
|
|
for key, values in self.headers.items():
|
|
if key.lower() == name.lower():
|
|
return values[0] if values else default
|
|
return default
|
|
|
|
def get_all_headers(self, name: str) -> list:
|
|
"""Get all values for a header (case-insensitive)."""
|
|
for key, values in self.headers.items():
|
|
if key.lower() == name.lower():
|
|
return list(values)
|
|
return []
|
|
|
|
|
|
class HTTPResponseWriter:
|
|
"""
|
|
HTTP response writer for ServeHTTP handlers.
|
|
|
|
Provides a Pythonic interface similar to Go's http.ResponseWriter.
|
|
Supports both buffered and streaming response modes.
|
|
|
|
STREAMING MODE:
|
|
---------------
|
|
For streaming responses, handlers can use the write() method to send
|
|
chunks incrementally, and flush() to request immediate delivery.
|
|
The servicer will send each chunk as a separate gRPC message.
|
|
|
|
HEADER LOCKING:
|
|
---------------
|
|
Headers are immutable after the first write() or write_header() call.
|
|
This matches Go's http.ResponseWriter behavior.
|
|
|
|
FLUSH SEMANTICS:
|
|
----------------
|
|
Flush is best-effort. The Go side will call http.Flusher.Flush() if
|
|
the underlying ResponseWriter supports it; otherwise silently ignored.
|
|
"""
|
|
|
|
# Recommended maximum chunk size (64KB, matching Go's DefaultChunkSize)
|
|
MAX_CHUNK_SIZE = 64 * 1024
|
|
|
|
def __init__(self):
|
|
self.headers: dict = {}
|
|
self.status_code: Optional[int] = None
|
|
self._body_chunks: list = []
|
|
self._headers_written: bool = False
|
|
self._pending_flush: bool = False
|
|
# Pending chunks for streaming mode: list of (chunk, flush_after) tuples
|
|
self._pending_writes: list = []
|
|
|
|
def set_header(self, name: str, value: str) -> None:
|
|
"""
|
|
Set a response header (replaces existing value).
|
|
Must be called before write() or write_header().
|
|
|
|
Args:
|
|
name: Header name (e.g., "Content-Type")
|
|
value: Header value
|
|
"""
|
|
if self._headers_written:
|
|
logger.warning(f"Cannot set header '{name}' after headers have been written")
|
|
return
|
|
self.headers[name] = [value]
|
|
|
|
def add_header(self, name: str, value: str) -> None:
|
|
"""
|
|
Add a value to a response header (for multi-value headers like Set-Cookie).
|
|
Must be called before write() or write_header().
|
|
|
|
Args:
|
|
name: Header name
|
|
value: Header value to add
|
|
"""
|
|
if self._headers_written:
|
|
logger.warning(f"Cannot add header '{name}' after headers have been written")
|
|
return
|
|
if name not in self.headers:
|
|
self.headers[name] = []
|
|
self.headers[name].append(value)
|
|
|
|
def write_header(self, status_code: int) -> None:
|
|
"""
|
|
Write the HTTP status code. Must be called before write().
|
|
If not called explicitly, write() will implicitly call write_header(200).
|
|
|
|
Args:
|
|
status_code: HTTP status code (100-999)
|
|
"""
|
|
if self._headers_written:
|
|
logger.warning("write_header called after headers already written")
|
|
return
|
|
self.status_code = status_code
|
|
self._headers_written = True
|
|
|
|
def write(self, data: bytes) -> int:
|
|
"""
|
|
Write response body data.
|
|
Implicitly calls write_header(200) if not already called.
|
|
|
|
For large responses, consider chunking the data to avoid memory issues.
|
|
Each write() call will be sent as a separate gRPC message in streaming mode.
|
|
|
|
Args:
|
|
data: Bytes to write (or str, which will be UTF-8 encoded)
|
|
|
|
Returns:
|
|
Number of bytes written
|
|
"""
|
|
if not self._headers_written:
|
|
self.write_header(200)
|
|
if isinstance(data, str):
|
|
data = data.encode("utf-8")
|
|
self._body_chunks.append(data)
|
|
# Record pending write with current flush state
|
|
self._pending_writes.append((data, self._pending_flush))
|
|
self._pending_flush = False
|
|
return len(data)
|
|
|
|
def flush(self) -> None:
|
|
"""
|
|
Request an immediate flush of buffered response data to the client.
|
|
|
|
This is best-effort: the Go side will call http.Flusher.Flush() if
|
|
the underlying ResponseWriter supports it; otherwise silently ignored.
|
|
This matches Go plugin RPC behavior.
|
|
|
|
Note: Flush is applied to the next write, or the last write if called
|
|
after writing data.
|
|
"""
|
|
if self._pending_writes:
|
|
# Apply flush to the last pending write
|
|
data, _ = self._pending_writes[-1]
|
|
self._pending_writes[-1] = (data, True)
|
|
else:
|
|
# Flush will apply to next write
|
|
self._pending_flush = True
|
|
|
|
def get_body(self) -> bytes:
|
|
"""Get the full response body (for buffered mode)."""
|
|
return b"".join(self._body_chunks)
|
|
|
|
def get_pending_writes(self) -> list:
|
|
"""
|
|
Get pending writes for streaming mode.
|
|
|
|
Returns:
|
|
List of (chunk: bytes, flush: bool) tuples
|
|
"""
|
|
return self._pending_writes
|
|
|
|
def clear_pending_writes(self) -> None:
|
|
"""Clear pending writes after they've been sent."""
|
|
self._pending_writes = []
|