mattermost/server/platform/services/remotecluster/recv.go
JG Heithcock 1386024013
Some checks are pending
API / build (push) Waiting to run
Server CI / Compute Go Version (push) Waiting to run
Server CI / Check mocks (push) Blocked by required conditions
Server CI / Check go mod tidy (push) Blocked by required conditions
Server CI / check-style (push) Blocked by required conditions
Server CI / Check serialization methods for hot structs (push) Blocked by required conditions
Server CI / Vet API (push) Blocked by required conditions
Server CI / Check migration files (push) Blocked by required conditions
Server CI / Generate email templates (push) Blocked by required conditions
Server CI / Check store layers (push) Blocked by required conditions
Server CI / Check mmctl docs (push) Blocked by required conditions
Server CI / Postgres with binary parameters (push) Blocked by required conditions
Server CI / Postgres (push) Blocked by required conditions
Server CI / Postgres (FIPS) (push) Blocked by required conditions
Server CI / Generate Test Coverage (push) Blocked by required conditions
Server CI / Run mmctl tests (push) Blocked by required conditions
Server CI / Run mmctl tests (FIPS) (push) Blocked by required conditions
Server CI / Build mattermost server app (push) Blocked by required conditions
Web App CI / check-lint (push) Waiting to run
Web App CI / check-i18n (push) Waiting to run
Web App CI / check-types (push) Waiting to run
Web App CI / test (push) Waiting to run
Web App CI / build (push) Waiting to run
Fix MM-65152 (#34199)
2025-10-29 07:52:34 -07:00

90 lines
2.9 KiB
Go

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package remotecluster
import (
"fmt"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
)
// ReceiveIncomingMsg is called by the Rest API layer, or websocket layer (future), when a Remote Cluster
// message is received. Here we route the message to any topic listeners.
// `rc` and `msg` cannot be nil.
func (rcs *Service) ReceiveIncomingMsg(rc *model.RemoteCluster, msg model.RemoteClusterMsg) Response {
rcs.mux.RLock()
defer rcs.mux.RUnlock()
if metrics := rcs.server.GetMetrics(); metrics != nil {
metrics.IncrementRemoteClusterMsgReceivedCounter(rc.RemoteId)
}
rcSanitized := *rc
rcSanitized.Token = ""
rcSanitized.RemoteToken = ""
var response Response
response.Status = ResponseStatusOK
listeners := rcs.getTopicListeners(msg.Topic)
for _, l := range listeners {
if err := callback(l, msg, &rcSanitized, &response); err != nil {
rcs.server.Log().Log(mlog.LvlRemoteClusterServiceError, "Error from remote cluster message listener",
mlog.String("msgId", msg.Id), mlog.String("topic", msg.Topic), mlog.String("remote", rc.DisplayName), mlog.Err(err))
response.Status = ResponseStatusFail
response.Err = err.Error()
}
}
return response
}
func callback(listener TopicListener, msg model.RemoteClusterMsg, rc *model.RemoteCluster, resp *Response) (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("%v", r)
}
}()
err = listener(msg, rc, resp)
return
}
// ReceiveInviteConfirmation is called by the Rest API layer when a Remote Cluster accepts an invitation from this
// local cluster.
func (rcs *Service) ReceiveInviteConfirmation(confirm model.RemoteClusterInvite) (*model.RemoteCluster, error) {
store := rcs.server.GetStore().RemoteCluster()
rc, err := store.Get(confirm.RemoteId, false)
if err != nil {
return nil, fmt.Errorf("cannot accept invite confirmation for remote %s: %w", confirm.RemoteId, err)
}
if rc.IsConfirmed() {
return nil, fmt.Errorf("cannot accept invite confirmation for remote %s: %w", confirm.RemoteId, RemoteClusterAlreadyConfirmedError)
}
rc.SiteURL = confirm.SiteURL
rc.RemoteToken = confirm.Token
// If the accepting cluster sent a RefreshedToken (its RemoteToken), set it as our Token
if confirm.Version >= 2 && confirm.RefreshedToken != "" {
rc.Token = confirm.RefreshedToken
} else {
// For older versions or if no RefreshedToken was provided, generate a new token
// to invalidate the original invite token and prevent reuse
rc.Token = model.NewId()
}
rcUpdated, err := store.Update(rc)
if err != nil {
return nil, fmt.Errorf("cannot apply invite confirmation for remote %s: %w", confirm.RemoteId, err)
}
// issue the first ping right away. The goroutine will exit when ping completes or PingTimeout exceeded.
go rcs.PingNow(rcUpdated)
return rcUpdated, nil
}