mirror of
https://github.com/mattermost/mattermost.git
synced 2026-02-03 20:40:00 -05:00
* Adds Remote Cluster related API endpoints
New endpoints for the following routes are added:
- Get Remote Clusters at `GET /api/v4/remotecluster`
- Create Remote Cluster at `POST /api/v4/remotecluster`
- Accept Remote Cluster invite at `POST
/api/v4/remotecluster/accept_invite`
- Generate Remote Cluster invite at `POST
/api/v4/remotecluster/{remote_id}/generate_invite`
- Get Remote Cluster at `GET /api/v4/remotecluster/{remote_id}`
- Patch Remote Cluster at `PATCH /api/v4/remotecluster/{remote_id}`
- Delete Remote Cluster at `DELETE /api/v4/remotecluster/{remote_id}`
These endpoints are planned to be used from the system console, and
gated through the `manage_secure_connections` permission.
* Update server/channels/api4/remote_cluster_test.go
Co-authored-by: Doug Lauder <wiggin77@warpmail.net>
* Fix AppError names
---------
Co-authored-by: Doug Lauder <wiggin77@warpmail.net>
Co-authored-by: Mattermost Build <build@mattermost.com>
175 lines
5.3 KiB
Go
175 lines
5.3 KiB
Go
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
// See LICENSE.txt for license information.
|
|
|
|
package remotecluster
|
|
|
|
import (
|
|
"bytes"
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"net/http"
|
|
"net/url"
|
|
"os"
|
|
"path"
|
|
"time"
|
|
|
|
"github.com/wiggin77/merror"
|
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
|
)
|
|
|
|
type SendMsgResultFunc func(msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response, err error)
|
|
|
|
type sendMsgTask struct {
|
|
rc *model.RemoteCluster
|
|
msg model.RemoteClusterMsg
|
|
f SendMsgResultFunc
|
|
}
|
|
|
|
// BroadcastMsg asynchronously sends a message to all remote clusters interested in the message's topic.
|
|
//
|
|
// `ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a
|
|
// BufferFullError if the message cannot be enqueued before the timeout. A background context will block indefinitely.
|
|
//
|
|
// An optional callback can be provided that receives the success or fail result of sending to each remote cluster.
|
|
// Success or fail is regarding message delivery only. If a callback is provided it should return quickly.
|
|
func (rcs *Service) BroadcastMsg(ctx context.Context, msg model.RemoteClusterMsg, f SendMsgResultFunc) error {
|
|
// get list of interested remotes.
|
|
filter := model.RemoteClusterQueryFilter{
|
|
Topic: msg.Topic,
|
|
}
|
|
list, err := rcs.server.GetStore().RemoteCluster().GetAll(0, 999999, filter)
|
|
if err != nil {
|
|
return err
|
|
}
|
|
|
|
errs := merror.New()
|
|
|
|
for _, rc := range list {
|
|
if err := rcs.SendMsg(ctx, msg, rc, f); err != nil {
|
|
errs.Append(err)
|
|
}
|
|
}
|
|
return errs.ErrorOrNil()
|
|
}
|
|
|
|
// SendMsg asynchronously sends a message to a remote cluster.
|
|
//
|
|
// `ctx` determines behaviour when the outbound queue is full. A timeout or deadline context will return a
|
|
// BufferFullError if the message cannot be enqueued before the timeout. A background context will block indefinitely.
|
|
//
|
|
// Nil or error return indicates success or failure of message enqueue only.
|
|
//
|
|
// An optional callback can be provided that receives the response from the remote cluster. The `err` provided to the
|
|
// callback is regarding response decoding only. The `resp` contains the decoded bytes returned from the remote.
|
|
// If a callback is provided it should return quickly.
|
|
func (rcs *Service) SendMsg(ctx context.Context, msg model.RemoteClusterMsg, rc *model.RemoteCluster, f SendMsgResultFunc) error {
|
|
task := sendMsgTask{
|
|
rc: rc,
|
|
msg: msg,
|
|
f: f,
|
|
}
|
|
return rcs.enqueueTask(ctx, rc.RemoteId, task)
|
|
}
|
|
|
|
// sendMsg is called when a sendMsgTask is popped from the send channel.
|
|
func (rcs *Service) sendMsg(task sendMsgTask) {
|
|
var errResp error
|
|
var response Response
|
|
|
|
// Ensure a panic from the callback does not exit the pool goroutine.
|
|
defer func() {
|
|
if errResp != nil {
|
|
response.Err = errResp.Error()
|
|
}
|
|
|
|
// If callback provided then call it with the results.
|
|
if task.f != nil {
|
|
task.f(task.msg, task.rc, &response, errResp)
|
|
}
|
|
}()
|
|
|
|
frame := &model.RemoteClusterFrame{
|
|
RemoteId: task.rc.RemoteId,
|
|
Msg: task.msg,
|
|
}
|
|
|
|
u, err := url.Parse(task.rc.SiteURL)
|
|
if err != nil {
|
|
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceError, "Invalid siteURL while sending message to remote",
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
mlog.String("msgId", task.msg.Id),
|
|
mlog.Err(err),
|
|
)
|
|
errResp = err
|
|
return
|
|
}
|
|
u.Path = path.Join(u.Path, SendMsgURL)
|
|
|
|
respJSON, err := rcs.sendFrameToRemote(SendTimeout, task.rc, frame, u.String())
|
|
|
|
if err != nil {
|
|
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceError, "Remote Cluster send message failed",
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
mlog.String("msgId", task.msg.Id),
|
|
mlog.Err(err),
|
|
)
|
|
errResp = err
|
|
} else {
|
|
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceDebug, "Remote Cluster message sent successfully",
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
mlog.String("msgId", task.msg.Id),
|
|
)
|
|
|
|
if err = json.Unmarshal(respJSON, &response); err != nil {
|
|
rcs.server.Log().Error("Invalid response sending message to remote cluster",
|
|
mlog.String("remote", task.rc.DisplayName),
|
|
mlog.Err(err),
|
|
)
|
|
errResp = err
|
|
}
|
|
}
|
|
}
|
|
|
|
func (rcs *Service) sendFrameToRemote(timeout time.Duration, rc *model.RemoteCluster, frame *model.RemoteClusterFrame, url string) ([]byte, error) {
|
|
body, err := json.Marshal(frame)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
ctx, cancel := context.WithTimeout(context.Background(), timeout)
|
|
defer cancel()
|
|
|
|
req, err := http.NewRequest("POST", url, bytes.NewReader(body))
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
req.Header.Set("Content-Type", "application/json")
|
|
req.Header.Set(model.HeaderRemoteclusterId, rc.RemoteId)
|
|
req.Header.Set(model.HeaderRemoteclusterToken, rc.RemoteToken)
|
|
|
|
resp, err := rcs.httpClient.Do(req.WithContext(ctx))
|
|
if metrics := rcs.server.GetMetrics(); metrics != nil {
|
|
if err != nil || resp.StatusCode != http.StatusOK {
|
|
metrics.IncrementRemoteClusterMsgErrorsCounter(frame.RemoteId, os.IsTimeout(err))
|
|
} else {
|
|
metrics.IncrementRemoteClusterMsgSentCounter(frame.RemoteId)
|
|
}
|
|
}
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
defer resp.Body.Close()
|
|
body, err = io.ReadAll(resp.Body)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
return body, fmt.Errorf("unexpected response: %d - %s", resp.StatusCode, resp.Status)
|
|
}
|
|
return body, nil
|
|
}
|