mattermost/server/public/plugin/hooks_grpc_client.go
Nick Misasi 01643af641 debug: add extensive logging to trace hook registration flow
Go side:
- Log hooks returned by Implemented()
- Log each hook name -> ID mapping
- Log OnActivate implementation status
- Log OnActivate call flow

Python side:
- Log Implemented() return value
- Log OnActivate gRPC receipt and handler invocation

This is temporary debug logging to diagnose why OnActivate
isn't being called for Python plugins.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
2026-01-20 09:12:22 -05:00

1901 lines
54 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package plugin
import (
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"sync"
"time"
saml2 "github.com/mattermost/gosaml2"
"google.golang.org/grpc"
"google.golang.org/protobuf/types/known/structpb"
"github.com/mattermost/mattermost/server/public/model"
pb "github.com/mattermost/mattermost/server/public/pluginapi/grpc/generated/go/pluginapiv1"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// Tunables shared by the gRPC hook client in this file.
const (
	// defaultGRPCTimeout is the deadline applied to each gRPC hook call that
	// is issued with context.WithTimeout in this file.
	defaultGRPCTimeout = 30 * time.Second

	// serveHTTPChunkSize is the size in bytes of the buffer used to read the
	// request body when streaming ServeHTTP (64 KiB).
	serveHTTPChunkSize = 64 << 10
)
// init registers hook names that are not in client_rpc_generated.go.
// These hooks are handled specially (not auto-generated), but Python plugins
// still need them in the hookNameToId map for the Implemented() mechanism:
// newHooksGRPCClient looks every reported hook name up in that map and logs a
// warning for names it cannot find.
func init() {
hookNameToId["OnActivate"] = OnActivateID
hookNameToId["ServeHTTP"] = ServeHTTPID
hookNameToId["MessageWillBePosted"] = MessageWillBePostedID
hookNameToId["MessageWillBeUpdated"] = MessageWillBeUpdatedID
hookNameToId["ServeMetrics"] = ServeMetricsID
}
// hooksGRPCClient implements the Hooks interface by delegating to a gRPC PluginHooksClient.
// This enables Python plugins to receive hook invocations through the same infrastructure
// as Go plugins.
type hooksGRPCClient struct {
// client is the generated gRPC stub that every hook call is forwarded to.
client pb.PluginHooksClient
// implemented is indexed by hook ID; an entry is true when the plugin
// reported that hook by name from Implemented(). Hook methods check their
// entry first and skip the gRPC call when it is false.
implemented [TotalHooksID]bool
// log receives debug and error output from the hook methods.
log *mlog.Logger
}
// Compile-time check to ensure hooksGRPCClient implements Hooks interface.
var _ Hooks = (*hooksGRPCClient)(nil)
// newHooksGRPCClient builds a hooksGRPCClient on top of conn and records which
// hooks the plugin implements.
//
// It calls Implemented() once and marks every returned hook name that is
// present in hookNameToId; names that are not in the map are logged as a
// warning and otherwise ignored. An error from Implemented() is wrapped and
// returned, in which case no client is returned.
func newHooksGRPCClient(conn grpc.ClientConnInterface, log *mlog.Logger) (*hooksGRPCClient, error) {
	c := &hooksGRPCClient{
		client: pb.NewPluginHooksClient(conn),
		log:    log,
	}

	names, err := c.Implemented()
	if err != nil {
		return nil, fmt.Errorf("failed to query implemented hooks: %w", err)
	}

	// DEBUG: record exactly what the plugin reported.
	log.Debug("Python plugin Implemented() returned hooks", mlog.Any("hooks", names))

	for _, name := range names {
		id, known := hookNameToId[name]
		if !known {
			log.Warn("Unknown hook name from Python plugin", mlog.String("name", name))
			continue
		}
		c.implemented[id] = true
		log.Debug("Registered hook", mlog.String("name", name), mlog.Int("id", id))
	}

	// DEBUG: record whether OnActivate ended up registered.
	log.Debug("OnActivate implementation status",
		mlog.Bool("implemented", c.implemented[OnActivateID]),
		mlog.Int("OnActivateID", OnActivateID))

	return c, nil
}
// =============================================================================
// Lifecycle Hooks
// =============================================================================
// Implemented asks the plugin over gRPC for the names of the hooks it
// implements.
//
// A transport-level failure is wrapped and returned. An error carried inside
// the response is converted with appErrorFromProto and returned instead of the
// hook list.
func (h *hooksGRPCClient) Implemented() ([]string, error) {
	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	resp, err := h.client.Implemented(callCtx, &pb.ImplementedRequest{})
	if err != nil {
		return nil, fmt.Errorf("gRPC Implemented call failed: %w", err)
	}
	if protoErr := resp.GetError(); protoErr != nil {
		return nil, appErrorFromProto(protoErr)
	}
	return resp.GetHooks(), nil
}
// OnActivate forwards plugin activation to the plugin over gRPC.
//
// It returns nil without calling the plugin when OnActivate was not reported
// as implemented. A transport failure is logged and returned wrapped; an error
// carried in the response is logged and returned via appErrorFromProto. Each
// step is traced at debug level.
func (h *hooksGRPCClient) OnActivate() error {
	implemented := h.implemented[OnActivateID]
	h.log.Debug("OnActivate called", mlog.Bool("implemented", implemented))
	if !implemented {
		h.log.Debug("OnActivate not implemented, skipping")
		return nil
	}

	h.log.Debug("Calling gRPC OnActivate")
	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	resp, err := h.client.OnActivate(callCtx, &pb.OnActivateRequest{})
	if err != nil {
		h.log.Error("gRPC OnActivate call failed", mlog.Err(err))
		return fmt.Errorf("gRPC OnActivate call failed: %w", err)
	}
	if protoErr := resp.GetError(); protoErr != nil {
		h.log.Error("OnActivate returned error", mlog.String("error", protoErr.GetMessage()))
		return appErrorFromProto(protoErr)
	}

	h.log.Debug("OnActivate completed successfully")
	return nil
}
// OnDeactivate forwards plugin deactivation to the plugin over gRPC.
//
// It returns nil without calling the plugin when the hook was not reported as
// implemented. A transport failure is logged and returned wrapped; an error
// carried in the response is returned via appErrorFromProto.
func (h *hooksGRPCClient) OnDeactivate() error {
	if implemented := h.implemented[OnDeactivateID]; !implemented {
		return nil
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	resp, err := h.client.OnDeactivate(callCtx, &pb.OnDeactivateRequest{})
	if err != nil {
		h.log.Error("gRPC OnDeactivate call failed", mlog.Err(err))
		return fmt.Errorf("gRPC OnDeactivate call failed: %w", err)
	}
	if protoErr := resp.GetError(); protoErr != nil {
		return appErrorFromProto(protoErr)
	}
	return nil
}
// OnConfigurationChange tells the plugin over gRPC that configuration changes
// may have been made.
//
// It returns nil without calling the plugin when the hook was not reported as
// implemented. A transport failure is logged and returned wrapped; an error
// carried in the response is returned via appErrorFromProto.
func (h *hooksGRPCClient) OnConfigurationChange() error {
	if implemented := h.implemented[OnConfigurationChangeID]; !implemented {
		return nil
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	resp, err := h.client.OnConfigurationChange(callCtx, &pb.OnConfigurationChangeRequest{})
	if err != nil {
		h.log.Error("gRPC OnConfigurationChange call failed", mlog.Err(err))
		return fmt.Errorf("gRPC OnConfigurationChange call failed: %w", err)
	}
	if protoErr := resp.GetError(); protoErr != nil {
		return appErrorFromProto(protoErr)
	}
	return nil
}
// =============================================================================
// ServeHTTP with Bidirectional Streaming
// =============================================================================
// ServeHTTP proxies an HTTP request to the plugin over a bidirectional gRPC
// stream.
//
// When the plugin did not report ServeHTTP as implemented the request is
// answered with 404. Otherwise the request (metadata and body) is sent by a
// separate goroutine while this goroutine receives the response and writes it
// to w. If the stream cannot be opened the client receives 503.
//
// The stream is opened on a context derived from the request context that is
// cancelled when this method returns, and earlier if receiving the response
// fails. Cancelling the stream's own context (rather than only a context the
// sender polls between chunks) aborts a sender that is blocked inside
// stream.Send, so waiting for the sender cannot hang on a stalled plugin, and
// it releases the stream even when the response was not read to io.EOF.
func (h *hooksGRPCClient) ServeHTTP(c *Context, w http.ResponseWriter, r *http.Request) {
	if !h.implemented[ServeHTTPID] {
		http.NotFound(w, r)
		return
	}

	// Derived from the request context so client-side cancellation still
	// propagates to the plugin.
	streamCtx, cancelStream := context.WithCancel(r.Context())
	defer cancelStream()

	stream, err := h.client.ServeHTTP(streamCtx)
	if err != nil {
		h.log.Error("gRPC ServeHTTP stream failed to open", mlog.Err(err))
		http.Error(w, "Service Unavailable", http.StatusServiceUnavailable)
		return
	}

	// Buffered so the sender can always deliver its result and exit, even on
	// the error path where the result is discarded.
	sendErrCh := make(chan error, 1)

	// The WaitGroup guarantees the sender has stopped touching r.Body and the
	// stream before this method returns.
	var wg sync.WaitGroup
	wg.Add(1)
	go func() {
		defer wg.Done()
		sendErrCh <- h.sendHTTPRequest(streamCtx, stream, c, r)
	}()

	if err := h.receiveHTTPResponse(stream, w); err != nil {
		// Abort the stream so a sender blocked in Send is released, then
		// wait for it. receiveHTTPResponse logs its own receive failures.
		cancelStream()
		wg.Wait()
		return
	}

	// The response is complete; let the sender finish on its own.
	wg.Wait()

	// The sender has exited, so its result is already in the channel.
	if sendErr := <-sendErrCh; sendErr != nil && sendErr != context.Canceled {
		h.log.Error("gRPC ServeHTTP send error", mlog.Err(sendErr))
	}
}
// sendHTTPRequest streams the HTTP request r to the plugin.
//
// The first message always carries the init metadata built by
// buildServeHTTPRequestInit; a request without a body is sent as that single
// message with BodyComplete set. Otherwise the body is read in
// serveHTTPChunkSize pieces and each piece is sent as its own message, the
// last of which has BodyComplete set. The send side of the stream is closed
// and the body (if any) is closed when the function returns.
//
// It returns the context error if either ctx or the stream's context is done
// before a read, the first Send error, or the first non-EOF read error.
func (h *hooksGRPCClient) sendHTTPRequest(ctx context.Context, stream pb.PluginHooks_ServeHTTPClient, c *Context, r *http.Request) error {
	defer stream.CloseSend()

	reqInit := h.buildServeHTTPRequestInit(c, r)

	// No body: a single message carries the metadata and the end-of-body marker.
	if r.Body == nil {
		return stream.Send(&pb.ServeHTTPRequest{
			Init:         reqInit,
			BodyComplete: true,
		})
	}
	defer r.Body.Close()

	// NOTE(review): chunk is reused across iterations, which relies on Send
	// being finished with the message before it returns — confirm against the
	// gRPC SendMsg documentation.
	chunk := make([]byte, serveHTTPChunkSize)
	initPending := true

	for {
		// Stop promptly once either context has been cancelled.
		select {
		case <-stream.Context().Done():
			return stream.Context().Err()
		case <-ctx.Done():
			return ctx.Err()
		default:
		}

		n, readErr := r.Body.Read(chunk)
		atEOF := readErr == io.EOF

		// The init message is sent unconditionally on the first pass; later
		// passes only send when there is data or the end-of-body marker.
		if initPending || n > 0 || atEOF {
			msg := &pb.ServeHTTPRequest{BodyComplete: atEOF}
			if initPending {
				msg.Init = reqInit
			}
			if n > 0 {
				msg.BodyChunk = chunk[:n]
			}
			if err := stream.Send(msg); err != nil {
				return err
			}
			initPending = false
		}

		if atEOF {
			return nil
		}
		if readErr != nil {
			return readErr
		}
	}
}
// buildServeHTTPRequestInit copies the request line, protocol version, host,
// remote address, content length and headers of r into a ServeHTTPRequestInit
// message. The plugin context is attached only when c is non-nil.
func (h *hooksGRPCClient) buildServeHTTPRequestInit(c *Context, r *http.Request) *pb.ServeHTTPRequestInit {
	var pluginCtx *pb.PluginContext
	if c != nil {
		pluginCtx = &pb.PluginContext{
			SessionId:      c.SessionId,
			RequestId:      c.RequestId,
			IpAddress:      c.IPAddress,
			AcceptLanguage: c.AcceptLanguage,
			UserAgent:      c.UserAgent,
		}
	}

	return &pb.ServeHTTPRequestInit{
		Method:        r.Method,
		Url:           r.URL.String(),
		Proto:         r.Proto,
		ProtoMajor:    int32(r.ProtoMajor),
		ProtoMinor:    int32(r.ProtoMinor),
		Host:          r.Host,
		RemoteAddr:    r.RemoteAddr,
		RequestUri:    r.RequestURI,
		ContentLength: r.ContentLength,
		Headers:       convertHTTPHeadersToProto(r.Header),
		PluginContext: pluginCtx,
	}
}
// convertHTTPHeadersToProto returns one HTTPHeader message per key in h,
// carrying that key's value slice as-is. The result follows map iteration
// order, so its ordering is unspecified.
func convertHTTPHeadersToProto(h http.Header) []*pb.HTTPHeader {
	out := make([]*pb.HTTPHeader, 0, len(h))
	for name := range h {
		out = append(out, &pb.HTTPHeader{Key: name, Values: h[name]})
	}
	return out
}
// receiveHTTPResponse reads the plugin's response from stream and writes it
// to w.
//
// The first message supplies the status and headers (see
// writeHTTPResponseHeaders); every message, including the first, may carry a
// body chunk, a flush request and the BodyComplete marker. Reading stops at
// BodyComplete or when the stream ends with io.EOF.
//
// If the first message cannot be received the failure is logged and a 502 is
// written. Later receive failures are logged and returned. Errors from
// writing to w or from writeHTTPResponseHeaders are returned as-is.
func (h *hooksGRPCClient) receiveHTTPResponse(stream pb.PluginHooks_ServeHTTPClient, w http.ResponseWriter) error {
	resp, err := stream.Recv()
	if err != nil {
		h.log.Error("gRPC ServeHTTP failed to receive response", mlog.Err(err))
		http.Error(w, "Bad Gateway", http.StatusBadGateway)
		return err
	}

	// Status and headers come from the first message only.
	if err := h.writeHTTPResponseHeaders(w, resp.GetInit()); err != nil {
		return err
	}

	for {
		if chunk := resp.GetBodyChunk(); len(chunk) > 0 {
			if _, err := w.Write(chunk); err != nil {
				return err
			}
		}

		// Flushing is only possible when the writer supports it.
		if resp.GetFlush() {
			if flusher, ok := w.(http.Flusher); ok {
				flusher.Flush()
			}
		}

		if resp.GetBodyComplete() {
			return nil
		}

		resp, err = stream.Recv()
		if err == io.EOF {
			return nil
		}
		if err != nil {
			h.log.Error("gRPC ServeHTTP failed to receive response chunk", mlog.Err(err))
			return err
		}
	}
}
// writeHTTPResponseHeaders writes the status line and headers described by
// init to w.
//
// A nil init, or a status code of 0, results in 200. A status code outside
// 100..999 is logged, answered with a 500 and reported as an error; the check
// exists to prevent a panic from an invalid code. Otherwise every header
// value is added to w before the status code is written.
func (h *hooksGRPCClient) writeHTTPResponseHeaders(w http.ResponseWriter, init *pb.ServeHTTPResponseInit) error {
	if init == nil {
		w.WriteHeader(http.StatusOK)
		return nil
	}

	code := http.StatusOK
	if requested := int(init.GetStatusCode()); requested != 0 {
		code = requested
	}

	if valid := code >= 100 && code <= 999; !valid {
		h.log.Error(fmt.Sprintf("Plugin tried to write invalid HTTP status code: %d", code))
		http.Error(w, "Internal Server Error", http.StatusInternalServerError)
		return fmt.Errorf("invalid status code: %d", code)
	}

	for _, hdr := range init.GetHeaders() {
		key := hdr.GetKey()
		for _, val := range hdr.GetValues() {
			w.Header().Add(key, val)
		}
	}

	w.WriteHeader(code)
	return nil
}
// =============================================================================
// Message Hooks
// =============================================================================
// MessageWillBePosted gives the plugin a chance to reject or modify a post
// before it is committed.
//
// The original post is returned unchanged when the hook is not implemented,
// when the gRPC call fails (the failure is logged), or when the plugin returns
// neither a rejection reason nor a modified post. A non-empty rejection reason
// is returned together with a nil post.
func (h *hooksGRPCClient) MessageWillBePosted(c *Context, post *model.Post) (*model.Post, string) {
	if implemented := h.implemented[MessageWillBePostedID]; !implemented {
		return post, ""
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.MessageWillBePostedRequest{
		PluginContext: pluginContextToProto(c),
		Post:          postToProto(post),
	}
	resp, err := h.client.MessageWillBePosted(callCtx, req)
	if err != nil {
		h.log.Error("gRPC MessageWillBePosted call failed", mlog.Err(err))
		return post, ""
	}

	if reason := resp.GetRejectionReason(); reason != "" {
		return nil, reason
	}
	if modified := resp.GetModifiedPost(); modified != nil {
		return postFromProto(modified), ""
	}
	return post, ""
}
// MessageWillBeUpdated gives the plugin a chance to reject or modify a post
// update before it is committed.
//
// newPost is returned unchanged when the hook is not implemented, when the
// gRPC call fails (the failure is logged), or when the plugin returns neither
// a rejection reason nor a modified post. A non-empty rejection reason is
// returned together with a nil post.
func (h *hooksGRPCClient) MessageWillBeUpdated(c *Context, newPost, oldPost *model.Post) (*model.Post, string) {
	if implemented := h.implemented[MessageWillBeUpdatedID]; !implemented {
		return newPost, ""
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.MessageWillBeUpdatedRequest{
		PluginContext: pluginContextToProto(c),
		NewPost:       postToProto(newPost),
		OldPost:       postToProto(oldPost),
	}
	resp, err := h.client.MessageWillBeUpdated(callCtx, req)
	if err != nil {
		h.log.Error("gRPC MessageWillBeUpdated call failed", mlog.Err(err))
		return newPost, ""
	}

	if reason := resp.GetRejectionReason(); reason != "" {
		return nil, reason
	}
	if modified := resp.GetModifiedPost(); modified != nil {
		return postFromProto(modified), ""
	}
	return newPost, ""
}
// MessageHasBeenPosted notifies the plugin that a post has been committed.
// It does nothing when the hook is not implemented; a failed gRPC call is
// logged and otherwise ignored.
func (h *hooksGRPCClient) MessageHasBeenPosted(c *Context, post *model.Post) {
	if implemented := h.implemented[MessageHasBeenPostedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.MessageHasBeenPostedRequest{
		PluginContext: pluginContextToProto(c),
		Post:          postToProto(post),
	}
	if _, err := h.client.MessageHasBeenPosted(callCtx, req); err != nil {
		h.log.Error("gRPC MessageHasBeenPosted call failed", mlog.Err(err))
	}
}
// MessageHasBeenUpdated notifies the plugin that a post update has been
// committed, passing both the new and the old post. It does nothing when the
// hook is not implemented; a failed gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) MessageHasBeenUpdated(c *Context, newPost, oldPost *model.Post) {
	if implemented := h.implemented[MessageHasBeenUpdatedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.MessageHasBeenUpdatedRequest{
		PluginContext: pluginContextToProto(c),
		NewPost:       postToProto(newPost),
		OldPost:       postToProto(oldPost),
	}
	if _, err := h.client.MessageHasBeenUpdated(callCtx, req); err != nil {
		h.log.Error("gRPC MessageHasBeenUpdated call failed", mlog.Err(err))
	}
}
// MessagesWillBeConsumed lets the plugin replace the posts that are about to
// be delivered to a client.
//
// The input slice is returned unchanged when the hook is not implemented, when
// the gRPC call fails (the failure is logged), or when the response carries a
// nil post list. Otherwise the posts from the response are converted and
// returned.
func (h *hooksGRPCClient) MessagesWillBeConsumed(posts []*model.Post) []*model.Post {
	if implemented := h.implemented[MessagesWillBeConsumedID]; !implemented {
		return posts
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	outgoing := make([]*pb.Post, 0, len(posts))
	for _, p := range posts {
		outgoing = append(outgoing, postToProto(p))
	}

	resp, err := h.client.MessagesWillBeConsumed(callCtx, &pb.MessagesWillBeConsumedRequest{Posts: outgoing})
	if err != nil {
		h.log.Error("gRPC MessagesWillBeConsumed call failed", mlog.Err(err))
		return posts
	}

	returned := resp.GetPosts()
	if returned == nil {
		return posts
	}

	converted := make([]*model.Post, 0, len(returned))
	for _, p := range returned {
		converted = append(converted, postFromProto(p))
	}
	return converted
}
// MessageHasBeenDeleted notifies the plugin that a post has been deleted.
// It does nothing when the hook is not implemented; a failed gRPC call is
// logged and otherwise ignored.
func (h *hooksGRPCClient) MessageHasBeenDeleted(c *Context, post *model.Post) {
	if implemented := h.implemented[MessageHasBeenDeletedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.MessageHasBeenDeletedRequest{
		PluginContext: pluginContextToProto(c),
		Post:          postToProto(post),
	}
	if _, err := h.client.MessageHasBeenDeleted(callCtx, req); err != nil {
		h.log.Error("gRPC MessageHasBeenDeleted call failed", mlog.Err(err))
	}
}
// =============================================================================
// User Hooks
// =============================================================================
// UserHasBeenCreated notifies the plugin that a user was created. It does
// nothing when the hook is not implemented; a failed gRPC call is logged and
// otherwise ignored.
func (h *hooksGRPCClient) UserHasBeenCreated(c *Context, user *model.User) {
	if implemented := h.implemented[UserHasBeenCreatedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.UserHasBeenCreatedRequest{
		PluginContext: pluginContextToProto(c),
		User:          userToProto(user),
	}
	if _, err := h.client.UserHasBeenCreated(callCtx, req); err != nil {
		h.log.Error("gRPC UserHasBeenCreated call failed", mlog.Err(err))
	}
}
// UserWillLogIn asks the plugin whether a login should be rejected and
// returns the plugin's rejection reason.
//
// An empty string is returned when the hook is not implemented or when the
// gRPC call fails (the failure is logged).
func (h *hooksGRPCClient) UserWillLogIn(c *Context, user *model.User) string {
	if implemented := h.implemented[UserWillLogInID]; !implemented {
		return ""
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.UserWillLogInRequest{
		PluginContext: pluginContextToProto(c),
		User:          userToProto(user),
	}
	resp, err := h.client.UserWillLogIn(callCtx, req)
	if err != nil {
		h.log.Error("gRPC UserWillLogIn call failed", mlog.Err(err))
		return ""
	}
	return resp.GetRejectionReason()
}
// UserHasLoggedIn notifies the plugin that a user has logged in. It does
// nothing when the hook is not implemented; a failed gRPC call is logged and
// otherwise ignored.
func (h *hooksGRPCClient) UserHasLoggedIn(c *Context, user *model.User) {
	if implemented := h.implemented[UserHasLoggedInID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.UserHasLoggedInRequest{
		PluginContext: pluginContextToProto(c),
		User:          userToProto(user),
	}
	if _, err := h.client.UserHasLoggedIn(callCtx, req); err != nil {
		h.log.Error("gRPC UserHasLoggedIn call failed", mlog.Err(err))
	}
}
// UserHasBeenDeactivated notifies the plugin that a user was deactivated. It
// does nothing when the hook is not implemented; a failed gRPC call is logged
// and otherwise ignored.
func (h *hooksGRPCClient) UserHasBeenDeactivated(c *Context, user *model.User) {
	if implemented := h.implemented[UserHasBeenDeactivatedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.UserHasBeenDeactivatedRequest{
		PluginContext: pluginContextToProto(c),
		User:          userToProto(user),
	}
	if _, err := h.client.UserHasBeenDeactivated(callCtx, req); err != nil {
		h.log.Error("gRPC UserHasBeenDeactivated call failed", mlog.Err(err))
	}
}
// =============================================================================
// Channel/Team Hooks
// =============================================================================
// ChannelHasBeenCreated notifies the plugin that a channel was created. It
// does nothing when the hook is not implemented; a failed gRPC call is logged
// and otherwise ignored.
func (h *hooksGRPCClient) ChannelHasBeenCreated(c *Context, channel *model.Channel) {
	if implemented := h.implemented[ChannelHasBeenCreatedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.ChannelHasBeenCreatedRequest{
		PluginContext: pluginContextToProto(c),
		Channel:       channelToProto(channel),
	}
	if _, err := h.client.ChannelHasBeenCreated(callCtx, req); err != nil {
		h.log.Error("gRPC ChannelHasBeenCreated call failed", mlog.Err(err))
	}
}
// UserHasJoinedChannel notifies the plugin that a user joined a channel,
// passing the channel membership and the acting user. It does nothing when
// the hook is not implemented; a failed gRPC call is logged and otherwise
// ignored.
func (h *hooksGRPCClient) UserHasJoinedChannel(c *Context, channelMember *model.ChannelMember, actor *model.User) {
	if implemented := h.implemented[UserHasJoinedChannelID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.UserHasJoinedChannelRequest{
		PluginContext: pluginContextToProto(c),
		ChannelMember: channelMemberToProto(channelMember),
		Actor:         userToProto(actor),
	}
	if _, err := h.client.UserHasJoinedChannel(callCtx, req); err != nil {
		h.log.Error("gRPC UserHasJoinedChannel call failed", mlog.Err(err))
	}
}
// UserHasLeftChannel notifies the plugin that a user left a channel, passing
// the channel membership and the acting user. It does nothing when the hook
// is not implemented; a failed gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) UserHasLeftChannel(c *Context, channelMember *model.ChannelMember, actor *model.User) {
	if implemented := h.implemented[UserHasLeftChannelID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.UserHasLeftChannelRequest{
		PluginContext: pluginContextToProto(c),
		ChannelMember: channelMemberToProto(channelMember),
		Actor:         userToProto(actor),
	}
	if _, err := h.client.UserHasLeftChannel(callCtx, req); err != nil {
		h.log.Error("gRPC UserHasLeftChannel call failed", mlog.Err(err))
	}
}
// UserHasJoinedTeam notifies the plugin that a user joined a team, passing
// the team membership and the acting user. It does nothing when the hook is
// not implemented; a failed gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) UserHasJoinedTeam(c *Context, teamMember *model.TeamMember, actor *model.User) {
	if implemented := h.implemented[UserHasJoinedTeamID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.UserHasJoinedTeamRequest{
		PluginContext: pluginContextToProto(c),
		TeamMember:    teamMemberToProto(teamMember),
		Actor:         userToProto(actor),
	}
	if _, err := h.client.UserHasJoinedTeam(callCtx, req); err != nil {
		h.log.Error("gRPC UserHasJoinedTeam call failed", mlog.Err(err))
	}
}
// UserHasLeftTeam notifies the plugin that a user left a team, passing the
// team membership and the acting user. It does nothing when the hook is not
// implemented; a failed gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) UserHasLeftTeam(c *Context, teamMember *model.TeamMember, actor *model.User) {
	if implemented := h.implemented[UserHasLeftTeamID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.UserHasLeftTeamRequest{
		PluginContext: pluginContextToProto(c),
		TeamMember:    teamMemberToProto(teamMember),
		Actor:         userToProto(actor),
	}
	if _, err := h.client.UserHasLeftTeam(callCtx, req); err != nil {
		h.log.Error("gRPC UserHasLeftTeam call failed", mlog.Err(err))
	}
}
// =============================================================================
// Command/WebSocket Hooks
// =============================================================================
// ExecuteCommand forwards a slash command invocation to the plugin and
// returns the plugin's response.
//
// It returns (nil, nil) when the hook is not implemented. A transport failure
// is logged and reported as an AppError with status 500 whose details are the
// transport error text; an error carried in the response is returned via
// appErrorFromProto.
func (h *hooksGRPCClient) ExecuteCommand(c *Context, args *model.CommandArgs) (*model.CommandResponse, *model.AppError) {
	if implemented := h.implemented[ExecuteCommandID]; !implemented {
		return nil, nil
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.ExecuteCommandRequest{
		PluginContext: pluginContextToProto(c),
		Args:          commandArgsToProto(args),
	}
	resp, err := h.client.ExecuteCommand(callCtx, req)
	if err != nil {
		h.log.Error("gRPC ExecuteCommand call failed", mlog.Err(err))
		return nil, model.NewAppError("ExecuteCommand", "plugin.grpc.execute_command.error", nil, err.Error(), http.StatusInternalServerError)
	}
	if protoErr := resp.GetError(); protoErr != nil {
		return nil, appErrorFromProto(protoErr)
	}
	return commandResponseFromProto(resp.GetResponse()), nil
}
// OnWebSocketConnect notifies the plugin that a WebSocket connection was
// opened, identified by webConnID and userID. It does nothing when the hook
// is not implemented; a failed gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) OnWebSocketConnect(webConnID, userID string) {
	if implemented := h.implemented[OnWebSocketConnectID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.OnWebSocketConnectRequest{
		WebConnId: webConnID,
		UserId:    userID,
	}
	if _, err := h.client.OnWebSocketConnect(callCtx, req); err != nil {
		h.log.Error("gRPC OnWebSocketConnect call failed", mlog.Err(err))
	}
}
// OnWebSocketDisconnect notifies the plugin that a WebSocket connection was
// closed, identified by webConnID and userID. It does nothing when the hook
// is not implemented; a failed gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) OnWebSocketDisconnect(webConnID, userID string) {
	if implemented := h.implemented[OnWebSocketDisconnectID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.OnWebSocketDisconnectRequest{
		WebConnId: webConnID,
		UserId:    userID,
	}
	if _, err := h.client.OnWebSocketDisconnect(callCtx, req); err != nil {
		h.log.Error("gRPC OnWebSocketDisconnect call failed", mlog.Err(err))
	}
}
// WebSocketMessageHasBeenPosted forwards a received WebSocket request to the
// plugin together with the connection and user IDs. It does nothing when the
// hook is not implemented; a failed gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) WebSocketMessageHasBeenPosted(webConnID, userID string, req *model.WebSocketRequest) {
	if implemented := h.implemented[WebSocketMessageHasBeenPostedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	msg := &pb.WebSocketMessageHasBeenPostedRequest{
		WebConnId: webConnID,
		UserId:    userID,
		Request:   webSocketRequestToProto(req),
	}
	if _, err := h.client.WebSocketMessageHasBeenPosted(callCtx, msg); err != nil {
		h.log.Error("gRPC WebSocketMessageHasBeenPosted call failed", mlog.Err(err))
	}
}
// =============================================================================
// Remaining Hooks
// =============================================================================
// FileWillBeUploaded gives the plugin a chance to reject or modify a file
// before the upload is committed.
//
// The whole file is read into memory and sent in a single request (streaming
// is not implemented yet). info is returned unchanged when the hook is not
// implemented, when reading the file fails, when the gRPC call fails, or when
// the plugin returns no modified file info; read and call failures are
// logged. A non-empty rejection reason is returned together with a nil
// FileInfo. Modified content returned by the plugin is written to output; a
// failure of that write is logged only.
//
// The call timeout is started after the file has been read, so that time
// spent reading from file does not count against the plugin's gRPC deadline.
func (h *hooksGRPCClient) FileWillBeUploaded(c *Context, info *model.FileInfo, file io.Reader, output io.Writer) (*model.FileInfo, string) {
	if !h.implemented[FileWillBeUploadedID] {
		return info, ""
	}

	// Read file content (for now, read entire file - streaming to be added later).
	fileContent, err := io.ReadAll(file)
	if err != nil {
		h.log.Error("Failed to read file for FileWillBeUploaded", mlog.Err(err))
		return info, ""
	}

	// Start the deadline only now: it should bound the plugin call, not the
	// local read above.
	ctx, cancel := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer cancel()

	resp, err := h.client.FileWillBeUploaded(ctx, &pb.FileWillBeUploadedRequest{
		PluginContext: pluginContextToProto(c),
		FileInfo:      fileInfoToProto(info),
		FileContent:   fileContent,
	})
	if err != nil {
		h.log.Error("gRPC FileWillBeUploaded call failed", mlog.Err(err))
		return info, ""
	}

	if reason := resp.GetRejectionReason(); reason != "" {
		return nil, reason
	}

	// Write modified content to output.
	if modified := resp.GetModifiedContent(); len(modified) > 0 {
		if _, err := output.Write(modified); err != nil {
			h.log.Error("Failed to write modified file content", mlog.Err(err))
		}
	}

	if modifiedInfo := resp.GetModifiedFileInfo(); modifiedInfo != nil {
		return fileInfoFromProto(modifiedInfo), ""
	}
	return info, ""
}
// ReactionHasBeenAdded notifies the plugin that a reaction has been
// committed. It does nothing when the hook is not implemented; a failed gRPC
// call is logged and otherwise ignored.
func (h *hooksGRPCClient) ReactionHasBeenAdded(c *Context, reaction *model.Reaction) {
	if implemented := h.implemented[ReactionHasBeenAddedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.ReactionHasBeenAddedRequest{
		PluginContext: pluginContextToProto(c),
		Reaction:      reactionToProto(reaction),
	}
	if _, err := h.client.ReactionHasBeenAdded(callCtx, req); err != nil {
		h.log.Error("gRPC ReactionHasBeenAdded call failed", mlog.Err(err))
	}
}
// ReactionHasBeenRemoved notifies the plugin that a reaction has been
// removed. It does nothing when the hook is not implemented; a failed gRPC
// call is logged and otherwise ignored.
func (h *hooksGRPCClient) ReactionHasBeenRemoved(c *Context, reaction *model.Reaction) {
	if implemented := h.implemented[ReactionHasBeenRemovedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.ReactionHasBeenRemovedRequest{
		PluginContext: pluginContextToProto(c),
		Reaction:      reactionToProto(reaction),
	}
	if _, err := h.client.ReactionHasBeenRemoved(callCtx, req); err != nil {
		h.log.Error("gRPC ReactionHasBeenRemoved call failed", mlog.Err(err))
	}
}
// OnPluginClusterEvent forwards an intra-cluster plugin event (its Id and
// Data) to the plugin. It does nothing when the hook is not implemented; a
// failed gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) OnPluginClusterEvent(c *Context, ev model.PluginClusterEvent) {
	if implemented := h.implemented[OnPluginClusterEventID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.OnPluginClusterEventRequest{
		PluginContext: pluginContextToProto(c),
		Event:         &pb.PluginClusterEvent{Id: ev.Id, Data: ev.Data},
	}
	if _, err := h.client.OnPluginClusterEvent(callCtx, req); err != nil {
		h.log.Error("gRPC OnPluginClusterEvent call failed", mlog.Err(err))
	}
}
// OnInstall notifies the plugin that it has been installed, passing the user
// ID from the install event.
//
// It returns nil without calling the plugin when the hook is not implemented.
// A transport failure is logged and returned wrapped; an error carried in the
// response is returned via appErrorFromProto.
func (h *hooksGRPCClient) OnInstall(c *Context, event model.OnInstallEvent) error {
	if implemented := h.implemented[OnInstallID]; !implemented {
		return nil
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.OnInstallRequest{
		PluginContext: pluginContextToProto(c),
		Event:         &pb.OnInstallEvent{UserId: event.UserId},
	}
	resp, err := h.client.OnInstall(callCtx, req)
	if err != nil {
		h.log.Error("gRPC OnInstall call failed", mlog.Err(err))
		return fmt.Errorf("gRPC OnInstall call failed: %w", err)
	}
	if protoErr := resp.GetError(); protoErr != nil {
		return appErrorFromProto(protoErr)
	}
	return nil
}
// OnSendDailyTelemetry notifies the plugin that the server is sending its
// daily telemetry. It does nothing when the hook is not implemented; a failed
// gRPC call is logged and otherwise ignored.
func (h *hooksGRPCClient) OnSendDailyTelemetry() {
	if implemented := h.implemented[OnSendDailyTelemetryID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	if _, err := h.client.OnSendDailyTelemetry(callCtx, &pb.OnSendDailyTelemetryRequest{}); err != nil {
		h.log.Error("gRPC OnSendDailyTelemetry call failed", mlog.Err(err))
	}
}
// RunDataRetention asks the plugin to run its data retention for the given
// time and batch size and returns the deleted count reported by the plugin.
//
// It returns (0, nil) when the hook is not implemented. A transport failure
// is logged and returned wrapped; an error carried in the response is
// returned via appErrorFromProto. In both error cases the count is 0.
func (h *hooksGRPCClient) RunDataRetention(nowTime, batchSize int64) (int64, error) {
	if implemented := h.implemented[RunDataRetentionID]; !implemented {
		return 0, nil
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.RunDataRetentionRequest{
		NowTime:   nowTime,
		BatchSize: batchSize,
	}
	resp, err := h.client.RunDataRetention(callCtx, req)
	if err != nil {
		h.log.Error("gRPC RunDataRetention call failed", mlog.Err(err))
		return 0, fmt.Errorf("gRPC RunDataRetention call failed: %w", err)
	}
	if protoErr := resp.GetError(); protoErr != nil {
		return 0, appErrorFromProto(protoErr)
	}
	return resp.GetDeletedCount(), nil
}
// OnCloudLimitsUpdated notifies the plugin that cloud product limits have
// changed. It does nothing when the hook is not implemented; a failed gRPC
// call is logged and otherwise ignored.
func (h *hooksGRPCClient) OnCloudLimitsUpdated(limits *model.ProductLimits) {
	if implemented := h.implemented[OnCloudLimitsUpdatedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.OnCloudLimitsUpdatedRequest{Limits: productLimitsToProto(limits)}
	if _, err := h.client.OnCloudLimitsUpdated(callCtx, req); err != nil {
		h.log.Error("gRPC OnCloudLimitsUpdated call failed", mlog.Err(err))
	}
}
// ConfigurationWillBeSaved lets the plugin inspect and optionally replace a
// configuration before it is saved. The configuration crosses the gRPC
// boundary as JSON.
//
// newCfg is returned unchanged (with a nil error) when the hook is not
// implemented or when the plugin returns no modified configuration. newCfg is
// returned together with an error when marshalling fails, when the gRPC call
// fails (also logged), or when the modified configuration cannot be
// unmarshalled. An error carried in the response is returned via
// appErrorFromProto with a nil configuration.
func (h *hooksGRPCClient) ConfigurationWillBeSaved(newCfg *model.Config) (*model.Config, error) {
	if implemented := h.implemented[ConfigurationWillBeSavedID]; !implemented {
		return newCfg, nil
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	payload, err := json.Marshal(newCfg)
	if err != nil {
		return newCfg, fmt.Errorf("failed to marshal config: %w", err)
	}

	req := &pb.ConfigurationWillBeSavedRequest{
		NewConfig: &pb.ConfigJson{ConfigJson: payload},
	}
	resp, err := h.client.ConfigurationWillBeSaved(callCtx, req)
	if err != nil {
		h.log.Error("gRPC ConfigurationWillBeSaved call failed", mlog.Err(err))
		return newCfg, fmt.Errorf("gRPC ConfigurationWillBeSaved call failed: %w", err)
	}
	if protoErr := resp.GetError(); protoErr != nil {
		return nil, appErrorFromProto(protoErr)
	}

	// Without a non-empty replacement the caller's configuration stands.
	modified := resp.GetModifiedConfig()
	if modified == nil || len(modified.GetConfigJson()) == 0 {
		return newCfg, nil
	}

	replacement := new(model.Config)
	if err := json.Unmarshal(modified.GetConfigJson(), replacement); err != nil {
		return newCfg, fmt.Errorf("failed to unmarshal modified config: %w", err)
	}
	return replacement, nil
}
// NotificationWillBePushed gives the plugin a chance to reject or modify a
// push notification before it is sent.
//
// pushNotification is returned unchanged when the hook is not implemented,
// when the gRPC call fails (the failure is logged), or when the plugin
// returns neither a rejection reason nor a modified notification. A non-empty
// rejection reason is returned together with a nil notification.
func (h *hooksGRPCClient) NotificationWillBePushed(pushNotification *model.PushNotification, userID string) (*model.PushNotification, string) {
	if implemented := h.implemented[NotificationWillBePushedID]; !implemented {
		return pushNotification, ""
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	req := &pb.NotificationWillBePushedRequest{
		PushNotification: pushNotificationToProto(pushNotification),
		UserId:           userID,
	}
	resp, err := h.client.NotificationWillBePushed(callCtx, req)
	if err != nil {
		h.log.Error("gRPC NotificationWillBePushed call failed", mlog.Err(err))
		return pushNotification, ""
	}

	if reason := resp.GetRejectionReason(); reason != "" {
		return nil, reason
	}
	if modified := resp.GetModifiedNotification(); modified != nil {
		return pushNotificationFromProto(modified), ""
	}
	return pushNotification, ""
}
// PreferencesHaveChanged notifies the plugin that a user's preferences have
// changed, passing every preference converted to its proto form. It does
// nothing when the hook is not implemented; a failed gRPC call is logged and
// otherwise ignored.
func (h *hooksGRPCClient) PreferencesHaveChanged(c *Context, preferences []model.Preference) {
	if implemented := h.implemented[PreferencesHaveChangedID]; !implemented {
		return
	}

	callCtx, release := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer release()

	converted := make([]*pb.Preference, 0, len(preferences))
	for _, pref := range preferences {
		converted = append(converted, preferenceToProto(pref))
	}

	req := &pb.PreferencesHaveChangedRequest{
		PluginContext: pluginContextToProto(c),
		Preferences:   converted,
	}
	if _, err := h.client.PreferencesHaveChanged(callCtx, req); err != nil {
		h.log.Error("gRPC PreferencesHaveChanged call failed", mlog.Err(err))
	}
}
// OnSharedChannelsSyncMsg delivers a shared channels sync message, together
// with the remote cluster it relates to, to the plugin.
//
// It returns a zero SyncResponse and nil error when the hook is not
// implemented. A transport failure is logged and returned wrapped; an error
// reported by the plugin is returned as the converted AppError. In both error
// cases the SyncResponse is the zero value.
func (h *hooksGRPCClient) OnSharedChannelsSyncMsg(msg *model.SyncMsg, rc *model.RemoteCluster) (model.SyncResponse, error) {
	var empty model.SyncResponse
	if !h.implemented[OnSharedChannelsSyncMsgID] {
		return empty, nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer cancel()

	req := &pb.OnSharedChannelsSyncMsgRequest{
		SyncMsg:       syncMsgToProto(msg),
		RemoteCluster: remoteClusterToProto(rc),
	}
	resp, err := h.client.OnSharedChannelsSyncMsg(ctx, req)
	if err != nil {
		h.log.Error("gRPC OnSharedChannelsSyncMsg call failed", mlog.Err(err))
		return empty, fmt.Errorf("gRPC OnSharedChannelsSyncMsg call failed: %w", err)
	}
	if pbErr := resp.GetError(); pbErr != nil {
		return empty, appErrorFromProto(pbErr)
	}
	return syncResponseFromProto(resp.GetResponse()), nil
}
// OnSharedChannelsPing asks the plugin whether it is healthy with respect to
// the given remote cluster.
//
// It reports true when the hook is not implemented, false when the gRPC call
// fails (the failure is logged), and otherwise the Healthy value from the
// plugin's response.
func (h *hooksGRPCClient) OnSharedChannelsPing(rc *model.RemoteCluster) bool {
	if !h.implemented[OnSharedChannelsPingID] {
		return true
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer cancel()

	req := &pb.OnSharedChannelsPingRequest{RemoteCluster: remoteClusterToProto(rc)}
	resp, err := h.client.OnSharedChannelsPing(ctx, req)
	if err == nil {
		return resp.GetHealthy()
	}

	h.log.Error("gRPC OnSharedChannelsPing call failed", mlog.Err(err))
	return false
}
// OnSharedChannelsAttachmentSyncMsg delivers a file attachment sync message
// (file info, owning post and remote cluster) to the plugin.
//
// It returns nil when the hook is not implemented or the plugin reports no
// error. A transport failure is logged and returned wrapped; an error reported
// by the plugin is returned as the converted AppError.
func (h *hooksGRPCClient) OnSharedChannelsAttachmentSyncMsg(fi *model.FileInfo, post *model.Post, rc *model.RemoteCluster) error {
	if !h.implemented[OnSharedChannelsAttachmentSyncMsgID] {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer cancel()

	req := &pb.OnSharedChannelsAttachmentSyncMsgRequest{
		FileInfo:      fileInfoToProto(fi),
		Post:          postToProto(post),
		RemoteCluster: remoteClusterToProto(rc),
	}
	resp, err := h.client.OnSharedChannelsAttachmentSyncMsg(ctx, req)
	if err != nil {
		h.log.Error("gRPC OnSharedChannelsAttachmentSyncMsg call failed", mlog.Err(err))
		return fmt.Errorf("gRPC OnSharedChannelsAttachmentSyncMsg call failed: %w", err)
	}
	if pbErr := resp.GetError(); pbErr != nil {
		return appErrorFromProto(pbErr)
	}
	return nil
}
// OnSharedChannelsProfileImageSyncMsg delivers a profile image sync message
// for the given user and remote cluster to the plugin.
//
// It returns nil when the hook is not implemented or the plugin reports no
// error. A transport failure is logged and returned wrapped; an error reported
// by the plugin is returned as the converted AppError.
func (h *hooksGRPCClient) OnSharedChannelsProfileImageSyncMsg(user *model.User, rc *model.RemoteCluster) error {
	if !h.implemented[OnSharedChannelsProfileImageSyncMsgID] {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer cancel()

	req := &pb.OnSharedChannelsProfileImageSyncMsgRequest{
		User:          userToProto(user),
		RemoteCluster: remoteClusterToProto(rc),
	}
	resp, err := h.client.OnSharedChannelsProfileImageSyncMsg(ctx, req)
	if err != nil {
		h.log.Error("gRPC OnSharedChannelsProfileImageSyncMsg call failed", mlog.Err(err))
		return fmt.Errorf("gRPC OnSharedChannelsProfileImageSyncMsg call failed: %w", err)
	}
	if pbErr := resp.GetError(); pbErr != nil {
		return appErrorFromProto(pbErr)
	}
	return nil
}
// GenerateSupportData asks the plugin for the files it wants to contribute to
// a Support Packet.
//
// It returns (nil, nil) when the hook is not implemented. A transport failure
// is logged and returned wrapped; an error reported by the plugin is returned
// as the converted AppError. On success every file in the response is mapped
// to a model.FileData carrying its filename and data.
func (h *hooksGRPCClient) GenerateSupportData(c *Context) ([]*model.FileData, error) {
	if !h.implemented[GenerateSupportDataID] {
		return nil, nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer cancel()

	req := &pb.GenerateSupportDataRequest{PluginContext: pluginContextToProto(c)}
	resp, err := h.client.GenerateSupportData(ctx, req)
	if err != nil {
		h.log.Error("gRPC GenerateSupportData call failed", mlog.Err(err))
		return nil, fmt.Errorf("gRPC GenerateSupportData call failed: %w", err)
	}
	if pbErr := resp.GetError(); pbErr != nil {
		return nil, appErrorFromProto(pbErr)
	}

	pbFiles := resp.GetFiles()
	result := make([]*model.FileData, 0, len(pbFiles))
	for _, pbFile := range pbFiles {
		result = append(result, &model.FileData{
			Filename: pbFile.GetFilename(),
			Body:     pbFile.GetData(),
		})
	}
	return result, nil
}
// OnSAMLLogin forwards a SAML login (plugin context, user and assertion) to
// the plugin.
//
// The assertion is sent as JSON. If it cannot be marshaled the failure is
// logged and the call still proceeds with an empty assertion payload. The
// method returns nil when the hook is not implemented or the plugin reports
// no error; a transport failure is logged and returned wrapped, and an error
// reported by the plugin is returned as the converted AppError.
func (h *hooksGRPCClient) OnSAMLLogin(c *Context, user *model.User, assertion *saml2.AssertionInfo) error {
	if !h.implemented[OnSAMLLoginID] {
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer cancel()

	// The assertion is a complex type, so it travels as JSON bytes.
	var assertionJSON []byte
	if assertion != nil {
		encoded, marshalErr := json.Marshal(assertion)
		if marshalErr != nil {
			h.log.Error("Failed to marshal SAML assertion", mlog.Err(marshalErr))
		}
		assertionJSON = encoded
	}

	req := &pb.OnSAMLLoginRequest{
		PluginContext: pluginContextToProto(c),
		User:          userToProto(user),
		Assertion:     &pb.SamlAssertionInfoJson{AssertionJson: assertionJSON},
	}
	resp, err := h.client.OnSAMLLogin(ctx, req)
	if err != nil {
		h.log.Error("gRPC OnSAMLLogin call failed", mlog.Err(err))
		return fmt.Errorf("gRPC OnSAMLLogin call failed: %w", err)
	}
	if pbErr := resp.GetError(); pbErr != nil {
		return appErrorFromProto(pbErr)
	}
	return nil
}
// EmailNotificationWillBeSent forwards a pending email notification to the
// plugin before it is sent.
//
// It returns (nil, reason) when the plugin reports a non-empty rejection
// reason, and the plugin's modified content with an empty reason when one is
// present. In every other case — hook not implemented, marshal failure,
// gRPC failure, or no modification — it returns (nil, ""). Marshal and gRPC
// failures are logged.
func (h *hooksGRPCClient) EmailNotificationWillBeSent(emailNotification *model.EmailNotification) (*model.EmailNotificationContent, string) {
	if !h.implemented[EmailNotificationWillBeSentID] {
		return nil, ""
	}

	ctx, cancel := context.WithTimeout(context.Background(), defaultGRPCTimeout)
	defer cancel()

	// The notification is a complex type, so it travels as JSON bytes.
	payload, err := json.Marshal(emailNotification)
	if err != nil {
		h.log.Error("Failed to marshal email notification", mlog.Err(err))
		return nil, ""
	}

	req := &pb.EmailNotificationWillBeSentRequest{
		EmailNotification: &pb.EmailNotificationJson{NotificationJson: payload},
	}
	resp, err := h.client.EmailNotificationWillBeSent(ctx, req)
	if err != nil {
		h.log.Error("gRPC EmailNotificationWillBeSent call failed", mlog.Err(err))
		return nil, ""
	}

	// A rejection takes precedence over any modified content.
	switch {
	case resp.GetRejectionReason() != "":
		return nil, resp.GetRejectionReason()
	case resp.GetModifiedContent() != nil:
		return emailNotificationContentFromProto(resp.GetModifiedContent()), ""
	default:
		return nil, ""
	}
}
// ServeMetrics is the hook through which plugins expose their own metrics
// endpoint.
//
// Streaming support for this hook over gRPC is deferred, so no request is
// forwarded to the plugin: the handler responds 404 Not Found when the plugin
// does not implement the hook and 501 Not Implemented when it does.
func (h *hooksGRPCClient) ServeMetrics(c *Context, w http.ResponseWriter, r *http.Request) {
	if h.implemented[ServeMetricsID] {
		// The plugin declares the hook, but the client cannot serve it yet.
		http.Error(w, "ServeMetrics not yet implemented for gRPC plugins", http.StatusNotImplemented)
		return
	}
	http.NotFound(w, r)
}
// =============================================================================
// Conversion Helpers
// =============================================================================
// appErrorFromProto builds a model.AppError from its proto representation.
// A nil input yields nil. Params are passed through as a map only when the
// proto carries a Params struct; otherwise a nil map is used.
func appErrorFromProto(pbErr *pb.AppError) *model.AppError {
	if pbErr == nil {
		return nil
	}

	var params map[string]any
	if p := pbErr.GetParams(); p != nil {
		params = p.AsMap()
	}

	return model.NewAppError(
		pbErr.GetWhere(),
		pbErr.GetId(),
		params,
		pbErr.GetDetailedError(),
		int(pbErr.GetStatusCode()),
	)
}
// pluginContextToProto copies the session ID, request ID, IP address,
// Accept-Language and User-Agent of a plugin Context into a proto
// PluginContext. A nil Context yields nil.
func pluginContextToProto(c *Context) *pb.PluginContext {
	if c == nil {
		return nil
	}

	out := new(pb.PluginContext)
	out.SessionId = c.SessionId
	out.RequestId = c.RequestId
	out.IpAddress = c.IPAddress
	out.AcceptLanguage = c.AcceptLanguage
	out.UserAgent = c.UserAgent
	return out
}
// postToProto copies the fields listed below from a model.Post into a proto
// Post. Only these fields are transferred. A nil post yields nil.
func postToProto(post *model.Post) *pb.Post {
	if post == nil {
		return nil
	}

	return &pb.Post{
		// Identity and threading.
		Id:            post.Id,
		UserId:        post.UserId,
		ChannelId:     post.ChannelId,
		RootId:        post.RootId,
		OriginalId:    post.OriginalId,
		PendingPostId: post.PendingPostId,
		RemoteId:      post.RemoteId,

		// Timestamps.
		CreateAt:    post.CreateAt,
		UpdateAt:    post.UpdateAt,
		EditAt:      post.EditAt,
		DeleteAt:    post.DeleteAt,
		LastReplyAt: post.LastReplyAt,

		// Content.
		Message:       post.Message,
		MessageSource: post.MessageSource,
		Type:          post.Type,
		Hashtags:      post.Hashtags,
		FileIds:       post.FileIds,

		// Flags and counters.
		IsPinned:     post.IsPinned,
		HasReactions: post.HasReactions,
		IsFollowing:  post.IsFollowing,
		ReplyCount:   post.ReplyCount,
	}
}
// postFromProto copies the fields listed below from a proto Post into a new
// model.Post. Fields not listed keep their zero value. A nil input yields nil.
func postFromProto(pbPost *pb.Post) *model.Post {
	if pbPost == nil {
		return nil
	}

	post := new(model.Post)

	// Identity and threading.
	post.Id = pbPost.Id
	post.UserId = pbPost.UserId
	post.ChannelId = pbPost.ChannelId
	post.RootId = pbPost.RootId
	post.OriginalId = pbPost.OriginalId
	post.PendingPostId = pbPost.PendingPostId
	post.RemoteId = pbPost.RemoteId

	// Timestamps.
	post.CreateAt = pbPost.CreateAt
	post.UpdateAt = pbPost.UpdateAt
	post.EditAt = pbPost.EditAt
	post.DeleteAt = pbPost.DeleteAt
	post.LastReplyAt = pbPost.LastReplyAt

	// Content.
	post.Message = pbPost.Message
	post.MessageSource = pbPost.MessageSource
	post.Type = pbPost.Type
	post.Hashtags = pbPost.Hashtags
	post.FileIds = pbPost.FileIds

	// Flags and counters.
	post.IsPinned = pbPost.IsPinned
	post.HasReactions = pbPost.HasReactions
	post.IsFollowing = pbPost.IsFollowing
	post.ReplyCount = pbPost.ReplyCount

	return post
}
// userToProto copies the fields listed below from a model.User into a proto
// User. A nil user yields nil.
//
// NOTE(review): Password and MfaSecret are copied verbatim along with the
// rest of the user; confirm that callers sanitize the user when that matters.
func userToProto(u *model.User) *pb.User {
	if u == nil {
		return nil
	}

	out := new(pb.User)

	// Identity and lifecycle timestamps.
	out.Id = u.Id
	out.CreateAt = u.CreateAt
	out.UpdateAt = u.UpdateAt
	out.DeleteAt = u.DeleteAt
	out.RemoteId = u.RemoteId

	// Credentials and authentication state.
	out.Username = u.Username
	out.Password = u.Password
	out.AuthService = u.AuthService
	out.AuthData = u.AuthData
	out.MfaActive = u.MfaActive
	out.MfaSecret = u.MfaSecret
	out.LastPasswordUpdate = u.LastPasswordUpdate
	out.FailedAttempts = int32(u.FailedAttempts)
	out.LastLogin = u.LastLogin

	// Profile.
	out.Email = u.Email
	out.EmailVerified = u.EmailVerified
	out.Nickname = u.Nickname
	out.FirstName = u.FirstName
	out.LastName = u.LastName
	out.Position = u.Position
	out.Roles = u.Roles
	out.Locale = u.Locale
	out.Timezone = u.Timezone
	out.LastPictureUpdate = u.LastPictureUpdate
	out.LastActivityAt = u.LastActivityAt

	// Preferences and properties.
	out.AllowMarketing = u.AllowMarketing
	out.Props = u.Props
	out.NotifyProps = u.NotifyProps
	out.DisableWelcomeEmail = u.DisableWelcomeEmail

	// Bot metadata.
	out.IsBot = u.IsBot
	out.BotDescription = u.BotDescription
	out.BotLastIconUpdate = u.BotLastIconUpdate

	// Terms of service.
	out.TermsOfServiceId = u.TermsOfServiceId
	out.TermsOfServiceCreateAt = u.TermsOfServiceCreateAt

	return out
}
// channelToProto copies the fields listed below from a model.Channel into a
// proto Channel. A nil channel yields nil.
func channelToProto(c *model.Channel) *pb.Channel {
	if c == nil {
		return nil
	}

	out := new(pb.Channel)

	// Identity and ownership.
	out.Id = c.Id
	out.TeamId = c.TeamId
	out.CreatorId = c.CreatorId
	out.SchemeId = c.SchemeId
	out.PolicyId = c.PolicyID

	// Descriptive text.
	out.DisplayName = c.DisplayName
	out.Name = c.Name
	out.Header = c.Header
	out.Purpose = c.Purpose

	// Timestamps.
	out.CreateAt = c.CreateAt
	out.UpdateAt = c.UpdateAt
	out.DeleteAt = c.DeleteAt
	out.ExtraUpdateAt = c.ExtraUpdateAt
	out.LastPostAt = c.LastPostAt
	out.LastRootPostAt = c.LastRootPostAt

	// Counters and flags.
	out.TotalMsgCount = c.TotalMsgCount
	out.TotalMsgCountRoot = c.TotalMsgCountRoot
	out.GroupConstrained = c.GroupConstrained
	out.Shared = c.Shared

	return out
}
// channelMemberToProto copies the fields listed below from a
// model.ChannelMember into a proto ChannelMember. A nil member yields nil.
func channelMemberToProto(cm *model.ChannelMember) *pb.ChannelMember {
	if cm == nil {
		return nil
	}

	out := new(pb.ChannelMember)

	// Membership identity and roles.
	out.ChannelId = cm.ChannelId
	out.UserId = cm.UserId
	out.Roles = cm.Roles
	out.SchemeGuest = cm.SchemeGuest
	out.SchemeUser = cm.SchemeUser
	out.SchemeAdmin = cm.SchemeAdmin

	// Read state and counters.
	out.LastViewedAt = cm.LastViewedAt
	out.LastUpdateAt = cm.LastUpdateAt
	out.MsgCount = cm.MsgCount
	out.MsgCountRoot = cm.MsgCountRoot
	out.MentionCount = cm.MentionCount
	out.MentionCountRoot = cm.MentionCountRoot
	out.UrgentMentionCount = cm.UrgentMentionCount

	// Notification settings.
	out.NotifyProps = cm.NotifyProps

	return out
}
// teamMemberToProto copies the fields listed below from a model.TeamMember
// into a proto TeamMember. A nil member yields nil.
func teamMemberToProto(tm *model.TeamMember) *pb.TeamMember {
	if tm == nil {
		return nil
	}

	out := new(pb.TeamMember)
	out.TeamId = tm.TeamId
	out.UserId = tm.UserId
	out.Roles = tm.Roles
	out.SchemeGuest = tm.SchemeGuest
	out.SchemeUser = tm.SchemeUser
	out.SchemeAdmin = tm.SchemeAdmin
	out.CreateAt = tm.CreateAt
	out.DeleteAt = tm.DeleteAt
	return out
}
// commandArgsToProto copies the fields listed below from a model.CommandArgs
// into a proto CommandArgs. Nil args yield nil.
func commandArgsToProto(args *model.CommandArgs) *pb.CommandArgs {
	if args == nil {
		return nil
	}

	out := new(pb.CommandArgs)
	out.Command = args.Command
	out.TriggerId = args.TriggerId
	out.SiteUrl = args.SiteURL
	out.UserId = args.UserId
	out.TeamId = args.TeamId
	out.ChannelId = args.ChannelId
	out.RootId = args.RootId
	out.ParentId = args.ParentId
	return out
}
// commandResponseFromProto copies the fields listed below from a proto
// CommandResponse into a new model.CommandResponse. Props is populated only
// when the proto carries a Props struct. A nil input yields nil.
func commandResponseFromProto(resp *pb.CommandResponse) *model.CommandResponse {
	if resp == nil {
		return nil
	}

	out := new(model.CommandResponse)
	out.ResponseType = resp.ResponseType
	out.Text = resp.Text
	out.Username = resp.Username
	out.ChannelId = resp.ChannelId
	out.IconURL = resp.IconUrl
	out.GotoLocation = resp.GotoLocation
	out.TriggerId = resp.TriggerId
	out.SkipSlackParsing = resp.SkipSlackParsing

	if props := resp.Props; props != nil {
		out.Props = props.AsMap()
	}
	return out
}
// webSocketRequestToProto copies Seq and Action from a model.WebSocketRequest
// into a proto WebSocketRequest and, when possible, attaches Data as a
// structpb.Struct. A nil request yields nil.
//
// Data is best-effort: if it is nil or cannot be converted, the returned
// message simply has no Data and no error is reported.
func webSocketRequestToProto(req *model.WebSocketRequest) *pb.WebSocketRequest {
	if req == nil {
		return nil
	}

	out := &pb.WebSocketRequest{
		Seq:    req.Seq,
		Action: req.Action,
	}
	if req.Data == nil {
		return out
	}

	data, err := structpb.NewStruct(req.Data)
	if err != nil {
		// Conversion failure is deliberately not surfaced; Data stays unset.
		return out
	}
	out.Data = data
	return out
}
// fileInfoToProto copies the fields listed below from a model.FileInfo into a
// proto FileInfo. MiniPreview is copied only when the model has one set;
// RemoteId is passed through as-is (nil stays nil). A nil input yields nil.
func fileInfoToProto(fi *model.FileInfo) *pb.FileInfo {
	if fi == nil {
		return nil
	}

	out := new(pb.FileInfo)

	// Identity and ownership.
	out.Id = fi.Id
	out.CreatorId = fi.CreatorId
	out.PostId = fi.PostId
	out.ChannelId = fi.ChannelId
	out.RemoteId = fi.RemoteId

	// Timestamps.
	out.CreateAt = fi.CreateAt
	out.UpdateAt = fi.UpdateAt
	out.DeleteAt = fi.DeleteAt

	// File description.
	out.Name = fi.Name
	out.Extension = fi.Extension
	out.Size = fi.Size
	out.MimeType = fi.MimeType
	out.Width = int32(fi.Width)
	out.Height = int32(fi.Height)
	out.HasPreviewImage = fi.HasPreviewImage
	out.Archived = fi.Archived

	// MiniPreview is optional on the model side.
	if preview := fi.MiniPreview; preview != nil {
		out.MiniPreview = *preview
	}

	return out
}
// fileInfoFromProto copies the fields listed below from a proto FileInfo into
// a new model.FileInfo. MiniPreview is set only when the proto value is
// non-empty, and RemoteId only when the proto has one set. A nil input yields
// nil.
func fileInfoFromProto(fi *pb.FileInfo) *model.FileInfo {
	if fi == nil {
		return nil
	}

	out := new(model.FileInfo)

	// Identity and ownership.
	out.Id = fi.Id
	out.CreatorId = fi.CreatorId
	out.PostId = fi.PostId
	out.ChannelId = fi.ChannelId

	// Timestamps.
	out.CreateAt = fi.CreateAt
	out.UpdateAt = fi.UpdateAt
	out.DeleteAt = fi.DeleteAt

	// File description.
	out.Name = fi.Name
	out.Extension = fi.Extension
	out.Size = fi.Size
	out.MimeType = fi.MimeType
	out.Width = int(fi.Width)
	out.Height = int(fi.Height)
	out.HasPreviewImage = fi.HasPreviewImage
	out.Archived = fi.Archived

	// An empty preview is treated as absent.
	if len(fi.MiniPreview) > 0 {
		preview := fi.MiniPreview
		out.MiniPreview = &preview
	}

	// Store a pointer to a local copy rather than to the proto's own value.
	if fi.RemoteId != nil {
		remoteID := fi.GetRemoteId()
		out.RemoteId = &remoteID
	}

	return out
}
// reactionToProto copies the fields listed below from a model.Reaction into a
// proto Reaction. ChannelId is set only when the model's value is non-empty.
// A nil reaction yields nil.
func reactionToProto(r *model.Reaction) *pb.Reaction {
	if r == nil {
		return nil
	}

	out := new(pb.Reaction)
	out.UserId = r.UserId
	out.PostId = r.PostId
	out.EmojiName = r.EmojiName
	out.RemoteId = r.RemoteId
	out.CreateAt = r.CreateAt
	out.UpdateAt = r.UpdateAt
	out.DeleteAt = r.DeleteAt

	// An empty channel ID is left unset. Note the pointer refers to the
	// field inside r rather than to a copy.
	if r.ChannelId != "" {
		out.ChannelId = &r.ChannelId
	}
	return out
}
// productLimitsToProto converts a model.ProductLimits into a proto
// ProductLimits. A nil input yields nil.
//
// Only the presence of each limits group is carried over: for every non-nil
// Files, Messages or Teams group an empty proto message is attached. The
// values inside those groups (TotalStorage, History, Active) are not
// converted yet; doing so would require wrapperspb handling.
func productLimitsToProto(limits *model.ProductLimits) *pb.ProductLimits {
	if limits == nil {
		return nil
	}

	out := new(pb.ProductLimits)
	if limits.Files != nil {
		out.Files = new(pb.FilesLimits)
	}
	if limits.Messages != nil {
		out.Messages = new(pb.MessagesLimits)
	}
	if limits.Teams != nil {
		out.Teams = new(pb.TeamsLimits)
	}
	return out
}
// pushNotificationToProto copies the fields listed below from a
// model.PushNotification into a proto PushNotification. Badge is sent as its
// decimal string form. A nil input yields nil.
func pushNotificationToProto(pn *model.PushNotification) *pb.PushNotification {
	if pn == nil {
		return nil
	}

	out := new(pb.PushNotification)

	// Delivery target.
	out.AckId = pn.AckId
	out.Platform = pn.Platform
	out.ServerId = pn.ServerId
	out.DeviceId = pn.DeviceId
	out.Version = pn.Version

	// What the notification refers to.
	out.PostId = pn.PostId
	out.TeamId = pn.TeamId
	out.ChannelId = pn.ChannelId
	out.RootId = pn.RootId
	out.ChannelName = pn.ChannelName

	// Presentation.
	out.Type = pn.Type
	out.Category = pn.Category
	out.Sound = pn.Sound
	out.Message = pn.Message
	out.Badge = fmt.Sprintf("%d", pn.Badge)

	// Sender.
	out.SenderId = pn.SenderId
	out.SenderName = pn.SenderName
	out.OverrideUsername = pn.OverrideUsername
	out.OverrideIconUrl = pn.OverrideIconURL
	out.FromWebhook = pn.FromWebhook

	return out
}
// pushNotificationFromProto copies the fields listed below from a proto
// PushNotification into a new model.PushNotification. Badge is decoded from
// its string form; an empty or undecodable value results in 0. A nil input
// yields nil.
func pushNotificationFromProto(pn *pb.PushNotification) *model.PushNotification {
	if pn == nil {
		return nil
	}

	var badge int
	if raw := pn.Badge; raw != "" {
		// The decode error is intentionally discarded: a malformed badge
		// falls back to the initial value of 0 instead of failing the hook.
		_ = json.Unmarshal([]byte(raw), &badge)
	}

	out := new(model.PushNotification)

	// Delivery target.
	out.AckId = pn.AckId
	out.Platform = pn.Platform
	out.ServerId = pn.ServerId
	out.DeviceId = pn.DeviceId
	out.Version = pn.Version

	// What the notification refers to.
	out.PostId = pn.PostId
	out.TeamId = pn.TeamId
	out.ChannelId = pn.ChannelId
	out.RootId = pn.RootId
	out.ChannelName = pn.ChannelName

	// Presentation.
	out.Type = pn.Type
	out.Category = pn.Category
	out.Sound = pn.Sound
	out.Message = pn.Message
	out.Badge = badge

	// Sender.
	out.SenderId = pn.SenderId
	out.SenderName = pn.SenderName
	out.OverrideUsername = pn.OverrideUsername
	out.OverrideIconURL = pn.OverrideIconUrl
	out.FromWebhook = pn.FromWebhook

	return out
}
// preferenceToProto copies the user ID, category, name and value of a
// model.Preference into a new proto Preference.
func preferenceToProto(p model.Preference) *pb.Preference {
	out := new(pb.Preference)
	out.UserId = p.UserId
	out.Category = p.Category
	out.Name = p.Name
	out.Value = p.Value
	return out
}
// syncMsgToProto wraps a model.SyncMsg in a proto SyncMsgJson by encoding it
// as JSON. It returns nil when msg is nil and also when encoding fails; the
// encoding error itself is not reported to the caller.
func syncMsgToProto(msg *model.SyncMsg) *pb.SyncMsgJson {
	if msg == nil {
		return nil
	}

	// SyncMsg is a complex type, so it travels as JSON bytes.
	payload, err := json.Marshal(msg)
	if err != nil {
		return nil
	}
	return &pb.SyncMsgJson{SyncMsgJson: payload}
}
// remoteClusterToProto copies the fields listed below from a
// model.RemoteCluster into a proto RemoteCluster, including Token and
// RemoteToken. A nil input yields nil.
func remoteClusterToProto(rc *model.RemoteCluster) *pb.RemoteCluster {
	if rc == nil {
		return nil
	}

	out := new(pb.RemoteCluster)

	// Identity.
	out.RemoteId = rc.RemoteId
	out.RemoteTeamId = rc.RemoteTeamId
	out.CreatorId = rc.CreatorId
	out.Name = rc.Name
	out.DisplayName = rc.DisplayName
	out.SiteUrl = rc.SiteURL

	// Timestamps.
	out.CreateAt = rc.CreateAt
	out.LastPingAt = rc.LastPingAt

	// Tokens and topics.
	out.Token = rc.Token
	out.RemoteToken = rc.RemoteToken
	out.Topics = rc.Topics

	return out
}
// syncResponseFromProto copies the last-update timestamps, error lists and
// synced-user list of a proto SyncResponse into a model.SyncResponse. A nil
// input yields the zero model.SyncResponse.
func syncResponseFromProto(resp *pb.SyncResponse) model.SyncResponse {
	var out model.SyncResponse
	if resp == nil {
		return out
	}

	// Users.
	out.UsersLastUpdateAt = resp.GetUsersLastUpdateAt()
	out.UserErrors = resp.GetUserErrors()
	out.UsersSyncd = resp.GetUsersSyncd()

	// Posts.
	out.PostsLastUpdateAt = resp.GetPostsLastUpdateAt()
	out.PostErrors = resp.GetPostErrors()

	// Reactions.
	out.ReactionsLastUpdateAt = resp.GetReactionsLastUpdateAt()
	out.ReactionErrors = resp.GetReactionErrors()

	// Acknowledgements.
	out.AcknowledgementsLastUpdateAt = resp.GetAcknowledgementsLastUpdateAt()
	out.AcknowledgementErrors = resp.GetAcknowledgementErrors()

	// Statuses.
	out.StatusErrors = resp.GetStatusErrors()

	return out
}
// emailNotificationContentFromProto copies the subject, titles, message
// bodies, button and footer fields of a proto EmailNotificationContent into a
// new model.EmailNotificationContent. A nil input yields nil.
func emailNotificationContentFromProto(content *pb.EmailNotificationContent) *model.EmailNotificationContent {
	if content == nil {
		return nil
	}

	out := new(model.EmailNotificationContent)
	out.Subject = content.Subject
	out.Title = content.Title
	out.SubTitle = content.SubTitle
	out.MessageHTML = content.MessageHtml
	out.MessageText = content.MessageText
	out.ButtonText = content.ButtonText
	out.ButtonURL = content.ButtonUrl
	out.FooterText = content.FooterText
	return out
}
// Blank-identifier reference to the net/url package. It keeps the import in
// use so that the file compiles regardless of whether any other code in it
// refers to the package.
var _ = url.URL{}