mattermost/server/platform/services/remotecluster/sendmsg.go
Miguel de la Cruz 809ad4f76d
Adds Remote Cluster related API endpoints (#27432)
* Adds Remote Cluster related API endpoints

New endpoints for the following routes are added:

- Get Remote Clusters at `GET /api/v4/remotecluster`
- Create Remote Cluster at `POST /api/v4/remotecluster`
- Accept Remote Cluster invite at `POST
/api/v4/remotecluster/accept_invite`
- Generate Remote Cluster invite at `POST
/api/v4/remotecluster/{remote_id}/generate_invite`
- Get Remote Cluster at `GET /api/v4/remotecluster/{remote_id}`
- Patch Remote Cluster at `PATCH /api/v4/remotecluster/{remote_id}`
- Delete Remote Cluster at `DELETE /api/v4/remotecluster/{remote_id}`

These endpoints are planned to be used from the system console, and
gated through the `manage_secure_connections` permission.

* Update server/channels/api4/remote_cluster_test.go

Co-authored-by: Doug Lauder <wiggin77@warpmail.net>

* Fix AppError names

---------

Co-authored-by: Doug Lauder <wiggin77@warpmail.net>
Co-authored-by: Mattermost Build <build@mattermost.com>
2024-07-04 10:35:26 +02:00

175 lines
5.3 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package remotecluster
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"net/url"
"os"
"path"
"time"
"github.com/wiggin77/merror"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// SendMsgResultFunc is the signature of the optional callback accepted by BroadcastMsg and SendMsg.
// When sendMsg finishes processing a task it invokes the callback with the message that was sent,
// the remote cluster it was sent to, a pointer to the Response decoded from the remote's reply, and
// the error (if any) encountered while sending or decoding. When err is non-nil its text is also
// copied into resp.Err before the callback is invoked.
type SendMsgResultFunc func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response, err error)
// sendMsgTask bundles what is needed to deliver one message to one remote cluster.
// It is created by SendMsg, handed to enqueueTask, and later processed by sendMsg.
type sendMsgTask struct {
// rc is the remote cluster the message is addressed to.
rc *model.RemoteCluster
// msg is the message to deliver.
msg model.RemoteClusterMsg
// f is the optional result callback; sendMsg skips it when nil.
f SendMsgResultFunc
}
// BroadcastMsg queues a message for asynchronous delivery to every remote cluster that is
// interested in the message's topic.
//
// The behaviour when the outbound queue is full is governed by `ctx`: with a timeout or deadline
// context a BufferFullError is returned if the message cannot be enqueued in time, whereas a
// background context blocks indefinitely.
//
// `f` is an optional callback that receives the success or fail result of sending to each remote
// cluster. Success or fail refers to message delivery only. A provided callback should return quickly.
//
// The returned error is nil when every enqueue succeeded; otherwise it aggregates the individual
// enqueue failures. A failure to fetch the list of remotes is returned as-is.
func (rcs *Service) BroadcastMsg(ctx context.Context, msg model.RemoteClusterMsg, f SendMsgResultFunc) error {
	// Look up the remotes subscribed to this topic.
	remotes, err := rcs.server.GetStore().RemoteCluster().GetAll(0, 999999, model.RemoteClusterQueryFilter{
		Topic: msg.Topic,
	})
	if err != nil {
		return err
	}

	// Keep going after a failed enqueue so one bad remote does not starve the others.
	enqueueErrs := merror.New()
	for _, remote := range remotes {
		enqueueErr := rcs.SendMsg(ctx, msg, remote, f)
		if enqueueErr == nil {
			continue
		}
		enqueueErrs.Append(enqueueErr)
	}
	return enqueueErrs.ErrorOrNil()
}
// SendMsg queues a message for asynchronous delivery to a single remote cluster.
//
// The behaviour when the outbound queue is full is governed by `ctx`: with a timeout or deadline
// context a BufferFullError is returned if the message cannot be enqueued in time, whereas a
// background context blocks indefinitely.
//
// The return value reports only whether the message was enqueued, not whether it was delivered.
//
// `f` is an optional callback that receives the response from the remote cluster. The `err` passed
// to the callback is regarding response decoding only, and `resp` contains the decoded bytes
// returned from the remote. A provided callback should return quickly.
func (rcs *Service) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error {
	// The task is keyed by the remote's id when handed to the queue.
	return rcs.enqueueTask(ctx, rc.RemoteId, sendMsgTask{
		rc:  rc,
		msg: msg,
		f:   f,
	})
}
// sendMsg is called when a sendMsgTask is popped from the send channel.
//
// It wraps the task's message in a RemoteClusterFrame, posts it to the SendMsgURL path under the
// remote's SiteURL, and decodes the JSON reply into a Response. Any failure (invalid SiteURL, send
// error, undecodable reply) is logged and recorded; nothing is returned.
//
// If the task carries a callback it is invoked once on every exit path with the message, the
// remote cluster, the (possibly zero-valued) Response and the recorded error. A panic raised by the
// callback is recovered and logged so that it cannot take down the calling goroutine.
func (rcs *Service) sendMsg(task sendMsgTask) {
	var errResp error
	var response Response

	// Report the outcome to the callback regardless of which path returns.
	defer func() {
		if errResp != nil {
			response.Err = errResp.Error()
		}

		// Nothing to report to when no callback was provided.
		if task.f == nil {
			return
		}

		// Ensure a panic from the callback does not exit the pool goroutine. This recover is
		// scoped to the callback invocation only; a panic originating in sendMsg's own body is
		// not intercepted here.
		defer func() {
			if r := recover(); r != nil {
				rcs.server.Log().Log(mlog.LvlRemoteClusterServiceError, "Remote Cluster send message callback panic",
					mlog.String("remote", task.rc.DisplayName),
					mlog.String("msgId", task.msg.Id),
					mlog.String("panic", fmt.Sprint(r)),
				)
			}
		}()

		task.f(task.msg, task.rc, &response, errResp)
	}()

	frame := &model.RemoteClusterFrame{
		RemoteId: task.rc.RemoteId,
		Msg:      task.msg,
	}

	u, err := url.Parse(task.rc.SiteURL)
	if err != nil {
		rcs.server.Log().Log(mlog.LvlRemoteClusterServiceError, "Invalid siteURL while sending message to remote",
			mlog.String("remote", task.rc.DisplayName),
			mlog.String("msgId", task.msg.Id),
			mlog.Err(err),
		)
		errResp = err
		return
	}
	// path.Join keeps any path prefix already present in the site URL.
	u.Path = path.Join(u.Path, SendMsgURL)

	respJSON, err := rcs.sendFrameToRemote(SendTimeout, task.rc, frame, u.String())
	if err != nil {
		rcs.server.Log().Log(mlog.LvlRemoteClusterServiceError, "Remote Cluster send message failed",
			mlog.String("remote", task.rc.DisplayName),
			mlog.String("msgId", task.msg.Id),
			mlog.Err(err),
		)
		errResp = err
		return
	}

	rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "Remote Cluster message sent successfully",
		mlog.String("remote", task.rc.DisplayName),
		mlog.String("msgId", task.msg.Id),
	)

	// The send succeeded; a decode failure is still reported to the callback as an error.
	if err = json.Unmarshal(respJSON, &response); err != nil {
		rcs.server.Log().Error("Invalid response sending message to remote cluster",
			mlog.String("remote", task.rc.DisplayName),
			mlog.Err(err),
		)
		errResp = err
	}
}
// sendFrameToRemote JSON-encodes frame and POSTs it to url, identifying this cluster to the remote
// through the remote cluster id and token headers taken from rc. The request is bounded by timeout.
//
// When a metrics provider is available, the attempt is counted as an error (flagged as a timeout
// when os.IsTimeout reports so) if the request failed or the status was not 200 OK, and as sent
// otherwise.
//
// It returns the raw response body. For a non-200 status the body is returned together with an
// error describing the status; for marshal, request, transport or read failures the body is nil.
func (rcs *Service) sendFrameToRemote(timeout time.Duration, rc *model.RemoteCluster, frame *model.RemoteClusterFrame, url string) ([]byte, error) {
	payload, err := json.Marshal(frame)
	if err != nil {
		return nil, err
	}

	ctx, cancel := context.WithTimeout(context.Background(), timeout)
	defer cancel()

	req, err := http.NewRequestWithContext(ctx, http.MethodPost, url, bytes.NewReader(payload))
	if err != nil {
		return nil, err
	}

	headers := req.Header
	headers.Set("Content-Type", "application/json")
	headers.Set(model.HeaderRemoteclusterId, rc.RemoteId)
	headers.Set(model.HeaderRemoteclusterToken, rc.RemoteToken)

	resp, err := rcs.httpClient.Do(req)

	// Record the outcome before bailing out so transport failures are counted too.
	if metrics := rcs.server.GetMetrics(); metrics != nil {
		if delivered := err == nil && resp.StatusCode == http.StatusOK; delivered {
			metrics.IncrementRemoteClusterMsgSentCounter(frame.RemoteId)
		} else {
			metrics.IncrementRemoteClusterMsgErrorsCounter(frame.RemoteId, os.IsTimeout(err))
		}
	}

	if err != nil {
		return nil, err
	}
	defer resp.Body.Close()

	respBody, err := io.ReadAll(resp.Body)
	if err != nil {
		return nil, err
	}

	if resp.StatusCode == http.StatusOK {
		return respBody, nil
	}
	// Hand the body back alongside the error for a non-OK status.
	return respBody, fmt.Errorf("unexpected response: %d - %s", resp.StatusCode, resp.Status)
}