mattermost/server/platform/services/remotecluster/sendmsg.go
Doug Lauder e694e86d63
Some checks are pending
API / build (push) Waiting to run
Server CI / Compute Go Version (push) Waiting to run
Server CI / Check mocks (push) Blocked by required conditions
Server CI / Check go mod tidy (push) Blocked by required conditions
Server CI / check-style (push) Blocked by required conditions
Server CI / Check serialization methods for hot structs (push) Blocked by required conditions
Server CI / Vet API (push) Blocked by required conditions
Server CI / Check migration files (push) Blocked by required conditions
Server CI / Generate email templates (push) Blocked by required conditions
Server CI / Check store layers (push) Blocked by required conditions
Server CI / Check mmctl docs (push) Blocked by required conditions
Server CI / Postgres with binary parameters (push) Blocked by required conditions
Server CI / Postgres (shard 0) (push) Blocked by required conditions
Server CI / Postgres (shard 1) (push) Blocked by required conditions
Server CI / Postgres (shard 2) (push) Blocked by required conditions
Server CI / Postgres (shard 3) (push) Blocked by required conditions
Server CI / Merge Postgres Test Results (push) Blocked by required conditions
Server CI / Postgres (FIPS) (push) Blocked by required conditions
Server CI / Generate Test Coverage (push) Blocked by required conditions
Server CI / Run mmctl tests (push) Blocked by required conditions
Server CI / Run mmctl tests (FIPS) (push) Blocked by required conditions
Server CI / Build mattermost server app (push) Blocked by required conditions
Tools CI / check-style (mattermost-govet) (push) Waiting to run
Tools CI / Test (mattermost-govet) (push) Waiting to run
Web App CI / check-lint (push) Waiting to run
Web App CI / check-i18n (push) Blocked by required conditions
Web App CI / check-external-links (push) Blocked by required conditions
Web App CI / check-types (push) Blocked by required conditions
Web App CI / test (platform) (push) Blocked by required conditions
Web App CI / test (mattermost-redux) (push) Blocked by required conditions
Web App CI / test (channels shard 1/4) (push) Blocked by required conditions
Web App CI / test (channels shard 2/4) (push) Blocked by required conditions
Web App CI / test (channels shard 3/4) (push) Blocked by required conditions
Web App CI / test (channels shard 4/4) (push) Blocked by required conditions
Web App CI / upload-coverage (push) Blocked by required conditions
Web App CI / build (push) Blocked by required conditions
MM-68204: Use multi-level logging for shared channel and remote cluster service errors (#35949)
* Use multi-level logging for shared channel and remote cluster service errors

  Service-specific log levels (LvlRemoteClusterServiceError, LvlSharedChannelServiceError)
  were hidden from the main log by default, requiring explicit configuration to see them.
  Switch all call sites to use LogM with multi-level combos so each log line is attributed
  to both the standard level (error/warn) and the service-specific level. This surfaces
  errors in the main log while preserving the ability to isolate them into dedicated files.

  Each instance was reviewed and either kept as error (DB failures, security issues, config
  errors, exhausted retries on critical data) or downgraded to warn (transient network
  failures, remote-side reported errors with retry logic, non-critical data like profile
  images, reactions, acknowledgements, and status).
2026-04-06 12:17:47 -04:00

175 lines
5.3 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package remotecluster
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"time"
"github.com/wiggin77/merror"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// SendMsgResultFunc is the signature of the optional callback accepted by BroadcastMsg and SendMsg.
// sendMsg invokes it with the message that was sent, the remote cluster it was addressed to, a pointer to
// the Response, and the error (if any) recorded while sending.
type SendMsgResultFunc func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response, err error)
// sendMsgTask bundles what sendMsg needs to deliver a single message to a single remote cluster.
// SendMsg builds one and hands it to enqueueTask.
type sendMsgTask struct {
// rc is the remote cluster the message is addressed to.
rc *model.RemoteCluster
// msg is the message to deliver.
msg model.RemoteClusterMsg
// f is the optional result callback; sendMsg skips the call when it is nil.
f SendMsgResultFunc
}
// BroadcastMsg queues a message for asynchronous delivery to every remote cluster interested in the
// message's topic.
//
// The interested remotes are fetched from the store using the topic as the query filter; if that lookup
// fails its error is returned and nothing is enqueued. The message is then enqueued once per remote via
// SendMsg. An enqueue failure for one remote does not stop the others: all failures are collected and
// returned together, and nil is returned when every enqueue succeeded.
//
// `ctx` governs what happens when the outbound queue is full. With a timeout or deadline context a
// BufferFullError is returned if the message cannot be enqueued in time; with a background context the
// call blocks indefinitely.
//
// `f` is an optional callback that receives the delivery result for each remote cluster. The result
// concerns message delivery only. A callback should return quickly.
func (rcs *Service) BroadcastMsg(ctx context.Context, msg model.RemoteClusterMsg, f SendMsgResultFunc) error {
	// Look up the remotes interested in this topic.
	remotes, err := rcs.server.GetStore().RemoteCluster().GetAll(0, 999999, model.RemoteClusterQueryFilter{
		Topic: msg.Topic,
	})
	if err != nil {
		return err
	}

	enqueueErrs := merror.New()
	for _, remote := range remotes {
		sendErr := rcs.SendMsg(ctx, msg, remote, f)
		if sendErr == nil {
			continue
		}
		enqueueErrs.Append(sendErr)
	}
	return enqueueErrs.ErrorOrNil()
}
// SendMsg queues a message for asynchronous delivery to one remote cluster.
//
// The returned error reports only whether the message could be enqueued; it says nothing about delivery.
//
// `ctx` governs what happens when the outbound queue is full. With a timeout or deadline context a
// BufferFullError is returned if the message cannot be enqueued in time; with a background context the
// call blocks indefinitely.
//
// `f` is an optional callback that is handed the outcome once the send has been attempted: `resp` holds
// the response decoded from the remote, and `err` is set when the remote's site URL is invalid, the send
// fails, or the response cannot be decoded. A callback should return quickly.
func (rcs *Service) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error {
	return rcs.enqueueTask(ctx, rc.RemoteId, sendMsgTask{rc: rc, msg: msg, f: f})
}
// sendMsg is called when a sendMsgTask is popped from the send channel.
//
// It wraps the task's message in a RemoteClusterFrame, posts it to the SendMsgURL endpoint under the
// remote's site URL, and decodes the JSON reply into a Response. On every return path the task's callback
// (if one was provided) is invoked once with the message, the remote, the Response, and the error that
// ended the attempt (nil on success); the error text is also copied into Response.Err.
func (rcs *Service) sendMsg(task sendMsgTask) {
	var errResp error
	var response Response

	// Report the outcome to the callback no matter which return path is taken.
	defer func() {
		if errResp != nil {
			response.Err = errResp.Error()
		}

		// Nothing to report to when no callback was provided.
		if task.f == nil {
			return
		}

		// Ensure a panic from the callback does not exit the pool goroutine. The recover is registered
		// immediately before the callback is invoked so that it guards the callback call only.
		defer func() {
			if r := recover(); r != nil {
				rcs.server.Log().LogM(mlog.MlvlRemoteClusterServiceError, "Panic in remote cluster send message callback",
					mlog.String("msgId", task.msg.Id),
					mlog.String("panic", fmt.Sprintf("%v", r)),
				)
			}
		}()
		task.f(task.msg, task.rc, &response, errResp)
	}()

	frame := &model.RemoteClusterFrame{
		RemoteId: task.rc.RemoteId,
		Msg:      task.msg,
	}

	u, err := url.Parse(task.rc.SiteURL)
	if err != nil {
		rcs.server.Log().LogM(mlog.MlvlRemoteClusterServiceError, "Invalid siteURL while sending message to remote",
			mlog.String("remote", task.rc.DisplayName),
			mlog.String("msgId", task.msg.Id),
			mlog.Err(err),
		)
		errResp = err
		return
	}
	// Append the send-message endpoint to whatever path the site URL already carries.
	u.Path = path.Join(u.Path, SendMsgURL)

	respJSON, err := rcs.sendFrameToRemote(SendTimeout, task.rc, frame, u.String())
	if err != nil {
		rcs.server.Log().LogM(mlog.MlvlRemoteClusterServiceError, "Remote Cluster send message failed",
			mlog.String("remote", task.rc.DisplayName),
			mlog.String("msgId", task.msg.Id),
			mlog.Err(err),
		)
		errResp = err
		return
	}

	rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "Remote Cluster message sent successfully",
		mlog.String("remote", task.rc.DisplayName),
		mlog.String("msgId", task.msg.Id),
	)

	// The message was delivered; a reply that cannot be decoded is still reported to the callback as an error.
	if err = json.Unmarshal(respJSON, &response); err != nil {
		rcs.server.Log().Error("Invalid response sending message to remote cluster",
			mlog.String("remote", task.rc.DisplayName),
			mlog.Err(err),
		)
		errResp = err
	}
}
// sendFrameToRemote JSON-encodes frame and POSTs it to url, authenticating with the remote cluster's id
// and token headers taken from rc. The request is bound to a context that expires after timeout.
//
// It returns the raw response body. When the remote answers with a status other than 200 OK, the body is
// returned together with a non-nil error describing the status. Marshalling, request construction,
// transport and body-read failures return a nil body and the underlying error.
//
// If a metrics collector is available, each call increments either the sent counter (200 OK) or the
// errors counter (transport error or any other status); transport errors are flagged as timeouts when
// os.IsTimeout reports so.
func (rcs *Service) sendFrameToRemote(timeout time.Duration, rc *model.RemoteCluster, frame *model.RemoteClusterFrame, url string) ([]byte, error) {
	body, err := json.Marshal(frame)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	// Build the request with its context attached up front rather than copying it afterwards.
	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(body))
	if err != nil {
		return nil, err
	}

	req.Header.Set("Content-Type", "application/json")
	req.Header.Set(model.HeaderRemoteclusterId, rc.RemoteId)
	req.Header.Set(model.HeaderRemoteclusterToken, rc.RemoteToken)

	resp, err := rcs.httpClient.Do(req)

	// Record the outcome before handling the error so that transport failures are counted too.
	// resp is only dereferenced when err is nil thanks to the short-circuiting ||.
	if metrics := rcs.server.GetMetrics(); metrics != nil {
		if err != nil || resp.StatusCode != http.StatusOK {
			metrics.IncrementRemoteClusterMsgErrorsCounter(frame.RemoteId, os.IsTimeout(err))
		} else {
			metrics.IncrementRemoteClusterMsgSentCounter(frame.RemoteId)
		}
	}
	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	// A non-200 reply is an error, but the body is still handed back for callers that want to inspect it.
	if resp.StatusCode != http.StatusOK {
		return respBody, fmt.Errorf("unexpected response: %d - %s", resp.StatusCode, resp.Status)
	}
	return respBody, nil
}