mattermost/python-sdk/tests/test_hooks_grpc_integration.py
Nick Misasi ab345d7e57 test(07-02): add gRPC integration tests for hook servicer
- Add test_hooks_grpc_integration.py with in-process gRPC tests:
  - Test Implemented RPC returns correct hook list
  - Test lifecycle hooks (OnActivate, OnDeactivate, OnConfigurationChange)
  - Test MessageWillBePosted allow/reject/modify scenarios
  - Test notification hooks (MessageHasBeenPosted)
  - Test activation failure propagation via response.error
  - Test async handlers work through gRPC
- Fix lifecycle hooks to encode errors in response, not gRPC status
  - Remove context parameter from runner.invoke() for lifecycle hooks
  - Ensures activation failures are returned as AppError, not gRPC error

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-19 11:27:41 -05:00

355 lines
12 KiB
Python

# Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
# See LICENSE.txt for license information.
"""
Integration tests for hook servicer using real gRPC server.
These tests start an in-process gRPC server with the hook servicer registered
and use the generated stub to invoke methods, verifying end-to-end behavior.
"""
import asyncio
import pytest
import grpc
from grpc import aio as grpc_aio
from mattermost_plugin import Plugin, hook, HookName
from mattermost_plugin.servicers.hooks_servicer import (
PluginHooksServicerImpl,
DISMISS_POST_ERROR,
)
from mattermost_plugin.grpc import hooks_pb2_grpc
from mattermost_plugin.grpc import hooks_lifecycle_pb2
from mattermost_plugin.grpc import hooks_message_pb2
from mattermost_plugin.grpc import hooks_common_pb2
from mattermost_plugin.grpc import post_pb2
def make_test_post(message: str = "test message", post_id: str = "post123") -> post_pb2.Post:
"""Create a test Post protobuf message."""
return post_pb2.Post(
id=post_id,
message=message,
user_id="user123",
channel_id="channel123",
)
def make_plugin_context() -> hooks_common_pb2.PluginContext:
"""Create a test PluginContext."""
return hooks_common_pb2.PluginContext(
session_id="session123",
request_id="request123",
)
@pytest.fixture
def sample_plugin():
"""Create a sample plugin with various hooks implemented."""
class SamplePlugin(Plugin):
def __init__(self):
super().__init__()
self.activated = False
self.deactivated = False
self.config_changed = False
self.received_posts = []
@hook(HookName.OnActivate)
def on_activate(self) -> None:
self.activated = True
@hook(HookName.OnDeactivate)
def on_deactivate(self) -> None:
self.deactivated = True
@hook(HookName.OnConfigurationChange)
def on_config_change(self) -> None:
self.config_changed = True
@hook(HookName.MessageWillBePosted)
def filter_message(self, ctx, post):
self.received_posts.append(post.message)
if "spam" in post.message.lower():
return None, "Spam detected"
if "uppercase" in post.message.lower():
modified = post_pb2.Post()
modified.CopyFrom(post)
modified.message = post.message.upper()
return modified, ""
return None, ""
@hook(HookName.MessageHasBeenPosted)
def on_message_posted(self, ctx, post) -> None:
# Just track that we received it
pass
return SamplePlugin()
@pytest.fixture
async def grpc_server_and_stub(sample_plugin):
"""
Create a gRPC server with hook servicer and return the stub.
This fixture starts an async gRPC server on an ephemeral port
and yields the stub for making RPC calls.
"""
# Create server
server = grpc_aio.server()
# Register hook servicer
servicer = PluginHooksServicerImpl(sample_plugin)
hooks_pb2_grpc.add_PluginHooksServicer_to_server(servicer, server)
# Bind to ephemeral port
port = server.add_insecure_port("127.0.0.1:0")
# Start server
await server.start()
# Create channel and stub
channel = grpc_aio.insecure_channel(f"127.0.0.1:{port}")
stub = hooks_pb2_grpc.PluginHooksStub(channel)
yield stub, sample_plugin
# Cleanup
await channel.close()
await server.stop(grace=0)
class TestGrpcImplemented:
"""Test Implemented RPC via gRPC."""
@pytest.mark.asyncio
async def test_returns_implemented_hooks(self, grpc_server_and_stub):
"""Test that Implemented returns the correct hook list."""
stub, plugin = grpc_server_and_stub
request = hooks_lifecycle_pb2.ImplementedRequest()
response = await stub.Implemented(request)
hooks = list(response.hooks)
assert "OnActivate" in hooks
assert "OnDeactivate" in hooks
assert "OnConfigurationChange" in hooks
assert "MessageWillBePosted" in hooks
assert "MessageHasBeenPosted" in hooks
class TestGrpcLifecycleHooks:
"""Test lifecycle hooks via gRPC."""
@pytest.mark.asyncio
async def test_on_activate_success(self, grpc_server_and_stub):
"""Test successful activation via gRPC."""
stub, plugin = grpc_server_and_stub
request = hooks_lifecycle_pb2.OnActivateRequest()
response = await stub.OnActivate(request)
assert not response.HasField("error")
assert plugin.activated is True
@pytest.mark.asyncio
async def test_on_deactivate_success(self, grpc_server_and_stub):
"""Test deactivation via gRPC."""
stub, plugin = grpc_server_and_stub
request = hooks_lifecycle_pb2.OnDeactivateRequest()
response = await stub.OnDeactivate(request)
# Response should succeed even if deactivate fails
assert plugin.deactivated is True
@pytest.mark.asyncio
async def test_on_configuration_change_success(self, grpc_server_and_stub):
"""Test configuration change via gRPC."""
stub, plugin = grpc_server_and_stub
request = hooks_lifecycle_pb2.OnConfigurationChangeRequest()
response = await stub.OnConfigurationChange(request)
assert not response.HasField("error")
assert plugin.config_changed is True
class TestGrpcMessageWillBePosted:
"""Test MessageWillBePosted hook via gRPC."""
@pytest.mark.asyncio
async def test_allow_post(self, grpc_server_and_stub):
"""Test allowing a post through."""
stub, plugin = grpc_server_and_stub
request = hooks_message_pb2.MessageWillBePostedRequest(
plugin_context=make_plugin_context(),
post=make_test_post("normal message"),
)
response = await stub.MessageWillBePosted(request)
assert response.rejection_reason == ""
assert not response.HasField("modified_post")
assert "normal message" in plugin.received_posts
@pytest.mark.asyncio
async def test_reject_spam(self, grpc_server_and_stub):
"""Test rejecting spam posts."""
stub, plugin = grpc_server_and_stub
request = hooks_message_pb2.MessageWillBePostedRequest(
plugin_context=make_plugin_context(),
post=make_test_post("Buy spam now!"),
)
response = await stub.MessageWillBePosted(request)
assert response.rejection_reason == "Spam detected"
@pytest.mark.asyncio
async def test_modify_post(self, grpc_server_and_stub):
"""Test modifying a post to uppercase."""
stub, plugin = grpc_server_and_stub
request = hooks_message_pb2.MessageWillBePostedRequest(
plugin_context=make_plugin_context(),
post=make_test_post("convert to uppercase please"),
)
response = await stub.MessageWillBePosted(request)
assert response.rejection_reason == ""
assert response.HasField("modified_post")
assert response.modified_post.message == "CONVERT TO UPPERCASE PLEASE"
class TestGrpcMessageHasBeenPosted:
"""Test MessageHasBeenPosted notification via gRPC."""
@pytest.mark.asyncio
async def test_notification_succeeds(self, grpc_server_and_stub):
"""Test that notification hook responds successfully."""
stub, plugin = grpc_server_and_stub
request = hooks_message_pb2.MessageHasBeenPostedRequest(
plugin_context=make_plugin_context(),
post=make_test_post(),
)
response = await stub.MessageHasBeenPosted(request)
# Notification hooks always return empty response
assert isinstance(response, hooks_message_pb2.MessageHasBeenPostedResponse)
class TestGrpcActivationFailure:
"""Test activation failure scenarios."""
@pytest.mark.asyncio
async def test_activation_failure_propagates(self):
"""Test that activation failure is returned in response."""
class FailingPlugin(Plugin):
@hook(HookName.OnActivate)
def on_activate(self) -> None:
raise RuntimeError("Database connection failed")
plugin = FailingPlugin()
# Create server
server = grpc_aio.server()
servicer = PluginHooksServicerImpl(plugin)
hooks_pb2_grpc.add_PluginHooksServicer_to_server(servicer, server)
port = server.add_insecure_port("127.0.0.1:0")
await server.start()
try:
channel = grpc_aio.insecure_channel(f"127.0.0.1:{port}")
stub = hooks_pb2_grpc.PluginHooksStub(channel)
request = hooks_lifecycle_pb2.OnActivateRequest()
response = await stub.OnActivate(request)
assert response.HasField("error")
# Error message should contain the exception info
assert "error" in response.error.message.lower() or "Database" in response.error.message
await channel.close()
finally:
await server.stop(grace=0)
class TestGrpcAsyncHandlers:
"""Test async hook handlers via gRPC."""
@pytest.mark.asyncio
async def test_async_activate_handler(self):
"""Test that async handlers work through gRPC."""
class AsyncPlugin(Plugin):
def __init__(self):
super().__init__()
self.activated = False
@hook(HookName.OnActivate)
async def on_activate(self) -> None:
await asyncio.sleep(0.01) # Simulate async work
self.activated = True
plugin = AsyncPlugin()
server = grpc_aio.server()
servicer = PluginHooksServicerImpl(plugin)
hooks_pb2_grpc.add_PluginHooksServicer_to_server(servicer, server)
port = server.add_insecure_port("127.0.0.1:0")
await server.start()
try:
channel = grpc_aio.insecure_channel(f"127.0.0.1:{port}")
stub = hooks_pb2_grpc.PluginHooksStub(channel)
request = hooks_lifecycle_pb2.OnActivateRequest()
response = await stub.OnActivate(request)
assert not response.HasField("error")
assert plugin.activated is True
await channel.close()
finally:
await server.stop(grace=0)
@pytest.mark.asyncio
async def test_async_message_handler(self):
"""Test async message handler through gRPC."""
class AsyncMessagePlugin(Plugin):
@hook(HookName.MessageWillBePosted)
async def filter(self, ctx, post):
await asyncio.sleep(0.01) # Simulate async work
modified = post_pb2.Post()
modified.CopyFrom(post)
modified.message = "async: " + post.message
return modified, ""
plugin = AsyncMessagePlugin()
server = grpc_aio.server()
servicer = PluginHooksServicerImpl(plugin)
hooks_pb2_grpc.add_PluginHooksServicer_to_server(servicer, server)
port = server.add_insecure_port("127.0.0.1:0")
await server.start()
try:
channel = grpc_aio.insecure_channel(f"127.0.0.1:{port}")
stub = hooks_pb2_grpc.PluginHooksStub(channel)
request = hooks_message_pb2.MessageWillBePostedRequest(
plugin_context=make_plugin_context(),
post=make_test_post("hello"),
)
response = await stub.MessageWillBePosted(request)
assert response.modified_post.message == "async: hello"
await channel.close()
finally:
await server.stop(grace=0)