mattermost/server/channels/api4/websocket.go

126 lines
3.9 KiB
Go
Raw Permalink Normal View History

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.
package api4
import (
"net/http"
"strconv"
"github.com/gorilla/websocket"
"github.com/mattermost/mattermost/server/public/model"
"github.com/mattermost/mattermost/server/public/shared/mlog"
"github.com/mattermost/mattermost/server/v8/channels/app/platform"
"github.com/mattermost/mattermost/server/v8/channels/web"
)
// Names of the query-string parameters read from the WebSocket upgrade
// request in this file.
const (
	connectionIDParam      = "connection_id"
	sequenceNumberParam    = "sequence_number"
	postedAckParam         = "posted_ack"
	disconnectErrCodeParam = "disconnect_err_code"
)

// Custom application close codes that validateDisconnectErrCode accepts in
// addition to the standard range.
// NOTE(review): going by the names, these are reported by clients after a
// ping timeout and a sequence-number mismatch respectively; confirm against
// the client implementations.
const (
	clientPingTimeoutErrCode      = 4000
	clientSequenceMismatchErrCode = 4001
)
// validateDisconnectErrCode reports whether errCode is the decimal string
// form of a websocket close code that this endpoint accepts.
//
// Accepted values are the standard close codes in the inclusive range
// 1000-1016 plus the custom application codes clientPingTimeoutErrCode and
// clientSequenceMismatchErrCode. An empty or non-numeric string is rejected.
func validateDisconnectErrCode(errCode string) bool {
	// strconv.Atoi fails on the empty string as well, so a single error
	// check covers both the "missing" and the "not a number" cases.
	closeCode, convErr := strconv.Atoi(errCode)
	if convErr != nil {
		return false
	}

	switch closeCode {
	case clientPingTimeoutErrCode, clientSequenceMismatchErrCode:
		// Custom application codes.
		return true
	default:
		// Standard close codes.
		return closeCode >= 1000 && closeCode <= 1016
	}
}
// InitWebSocket registers connectWebSocket as the GET handler for the
// websocket route under the API root.
func (api *API) InitWebSocket() {
	// The pattern matches "websocket" with an optional trailing slash.
	const route = "/{websocket:websocket(?:\\/)?}"

	handler := api.APIHandlerTrustRequester(connectWebSocket)
	api.BaseRoutes.APIRoot.Handle(route, handler).Methods(http.MethodGet)
}
func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
upgrader := websocket.Upgrader{
2021-07-12 14:05:36 -04:00
ReadBufferSize: model.SocketMaxMessageSizeKb,
WriteBufferSize: model.SocketMaxMessageSizeKb,
CheckOrigin: c.App.OriginChecker(),
}
ws, err := upgrader.Upgrade(w, r, nil)
if err != nil {
params := map[string]any{
"BlockedOrigin": r.Header.Get("Origin"),
}
c.Err = model.NewAppError("connect", "api.web_socket.connect.upgrade.app_error", params, "", http.StatusBadRequest).Wrap(err)
return
}
MM-32950: Reliable WebSockets: Basic single server (#17406) * MM-32950: Reliable WebSockets: Basic single server This PR adds reliable websocket support for a single server. Below is a brief overview of the three states of a connection: Normal: - All messages are routed via web hub. - Each web conn has a send queue to which it gets pushed. - A message gets pulled from the queue, and before it gets written to the wire, it is added to the dead queue. Disconnect: - Hub Unregister gets called, where the connection is just marked as inactive. And new messages keep getting pushed to the send queue. If it gets full, the channel is closed and the conn gets removed from conn index. Reconnect: - We query the hub for the connection ID, and get back the queues. - We construct a WebConn reusing the old queues, or a fresh one depending on whether the connection ID was found or not. - Now there is a tricky bit here which needs to be carefully processed. On register, we would always send the hello message in the send queue. But we cannot do that now because the send queue might already have messages. Therefore, we don't send the hello message from web hub, if we reuse a connection. Instead, we move that logic to the web conn write pump. We check if the sequence number is in dead queue, and if it is, then we drain the dead queue, and start consuming from the active queue. No hello message is sent here. But if the message does not exist in the dead queue, and the sequence number is actually something that should have existed, then we set a new connction id and clear the dead queue, and send a hello message. The client, on receiving a new connection id will automatically set its sequence number to 0, and make the sync API calls to manage any lost data. 
https://mattermost.atlassian.net/browse/MM-32590 ```release-note NONE ``` * gofmt * Add EnableReliableWebSockets to the client config * Refactoring isInDeadQueue * Passing index to drainDeadQueue * refactoring webconn * fix pointer * review comments * simplify hasMsgLoss * safety comment * fix test * Trigger CI * Trigger CI Co-authored-by: Devin Binnie <devin.binnie@mattermost.com> Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
// We initialize webconn with all the necessary data.
// If the queues are empty, they are initialized in the constructor.
cfg := &platform.WebConnConfig{
WebSocket: ws,
Session: *c.AppContext.Session(),
TFunc: c.AppContext.T,
Locale: "",
Active: true,
PostedAck: r.URL.Query().Get(postedAckParam) == "true",
RemoteAddress: c.AppContext.IPAddress(),
XForwardedFor: c.AppContext.XForwardedFor(),
MM-32950: Reliable WebSockets: Basic single server (#17406) * MM-32950: Reliable WebSockets: Basic single server This PR adds reliable websocket support for a single server. Below is a brief overview of the three states of a connection: Normal: - All messages are routed via web hub. - Each web conn has a send queue to which it gets pushed. - A message gets pulled from the queue, and before it gets written to the wire, it is added to the dead queue. Disconnect: - Hub Unregister gets called, where the connection is just marked as inactive. And new messages keep getting pushed to the send queue. If it gets full, the channel is closed and the conn gets removed from conn index. Reconnect: - We query the hub for the connection ID, and get back the queues. - We construct a WebConn reusing the old queues, or a fresh one depending on whether the connection ID was found or not. - Now there is a tricky bit here which needs to be carefully processed. On register, we would always send the hello message in the send queue. But we cannot do that now because the send queue might already have messages. Therefore, we don't send the hello message from web hub, if we reuse a connection. Instead, we move that logic to the web conn write pump. We check if the sequence number is in dead queue, and if it is, then we drain the dead queue, and start consuming from the active queue. No hello message is sent here. But if the message does not exist in the dead queue, and the sequence number is actually something that should have existed, then we set a new connction id and clear the dead queue, and send a hello message. The client, on receiving a new connection id will automatically set its sequence number to 0, and make the sync API calls to manage any lost data. 
https://mattermost.atlassian.net/browse/MM-32590 ```release-note NONE ``` * gofmt * Add EnableReliableWebSockets to the client config * Refactoring isInDeadQueue * Passing index to drainDeadQueue * refactoring webconn * fix pointer * review comments * simplify hasMsgLoss * safety comment * fix test * Trigger CI * Trigger CI Co-authored-by: Devin Binnie <devin.binnie@mattermost.com> Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
}
disconnectErrCode := r.URL.Query().Get(disconnectErrCodeParam)
if codeValid := validateDisconnectErrCode(disconnectErrCode); codeValid {
cfg.DisconnectErrCode = disconnectErrCode
}
// The WebSocket upgrade request coming from mobile is missing the
// user agent so we need to fallback on the session's metadata.
if c.AppContext.Session().IsMobileApp() {
cfg.OriginClient = "mobile"
} else {
cfg.OriginClient = string(web.GetOriginClient(r))
}
cfg.ConnectionID = r.URL.Query().Get(connectionIDParam)
if cfg.ConnectionID == "" || c.AppContext.Session().UserId == "" {
// If not present, we assume client is not capable yet, or it's a fresh connection.
// We just create a new ID.
cfg.ConnectionID = model.NewId()
// In case of fresh connection id, sequence number is already zero.
} else {
cfg, err = c.App.Srv().Platform().PopulateWebConnConfig(c.AppContext.Session(), cfg, r.URL.Query().Get(sequenceNumberParam))
if err != nil {
c.Logger.Error("Error while populating webconn config", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))
ws.Close()
return
}
}
wc := c.App.Srv().Platform().NewWebConn(cfg, c.App, c.App.Srv().Channels())
if c.AppContext.Session().UserId != "" {
err = c.App.Srv().Platform().HubRegister(wc)
if err != nil {
c.Logger.Error("Error while registering to hub", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))
ws.Close()
return
}
}
wc.Pump()
}