2019-11-29 06:59:40 -05:00
|
|
|
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
|
|
|
|
// See LICENSE.txt for license information.
|
2017-03-28 04:58:19 -04:00
|
|
|
|
|
|
|
|
package api4
|
|
|
|
|
|
|
|
|
|
import (
|
|
|
|
|
"net/http"
|
2025-05-30 08:15:20 -04:00
|
|
|
"strconv"
|
2017-03-28 04:58:19 -04:00
|
|
|
|
2021-03-24 05:59:23 -04:00
|
|
|
"github.com/gorilla/websocket"
|
2021-01-07 12:12:43 -05:00
|
|
|
|
2023-06-11 01:24:35 -04:00
|
|
|
"github.com/mattermost/mattermost/server/public/model"
|
|
|
|
|
"github.com/mattermost/mattermost/server/public/shared/mlog"
|
|
|
|
|
"github.com/mattermost/mattermost/server/v8/channels/app/platform"
|
2024-04-16 12:49:49 -04:00
|
|
|
"github.com/mattermost/mattermost/server/v8/channels/web"
|
2021-03-31 00:35:32 -04:00
|
|
|
)
|
|
|
|
|
|
|
|
|
|
// Names of the query-string parameters read from the WebSocket upgrade
// request by connectWebSocket.
const (
	connectionIDParam      = "connection_id"
	sequenceNumberParam    = "sequence_number"
	postedAckParam         = "posted_ack"
	disconnectErrCodeParam = "disconnect_err_code"
)

// Custom application close codes that validateDisconnectErrCode accepts in
// addition to the standard close-code range.
const (
	clientPingTimeoutErrCode      = 4000
	clientSequenceMismatchErrCode = 4001
)
|
|
|
|
|
|
2025-05-30 08:15:20 -04:00
|
|
|
// validateDisconnectErrCode ensures the specified disconnect error code
|
|
|
|
|
// is a valid websocket close code
|
|
|
|
|
func validateDisconnectErrCode(errCode string) bool {
|
|
|
|
|
if errCode == "" {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// Ensure the disconnect code is a standard close code
|
|
|
|
|
code, err := strconv.Atoi(errCode)
|
|
|
|
|
if err != nil {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
// We only support the standard close codes between
|
|
|
|
|
// 1000 and 1016, and a few custom application codes
|
|
|
|
|
if (code < 1000 || code > 1016) &&
|
|
|
|
|
code != clientPingTimeoutErrCode &&
|
|
|
|
|
code != clientSequenceMismatchErrCode {
|
|
|
|
|
return false
|
|
|
|
|
}
|
|
|
|
|
|
|
|
|
|
return true
|
|
|
|
|
}
|
|
|
|
|
|
2017-09-22 13:54:27 -04:00
|
|
|
func (api *API) InitWebSocket() {
|
2019-10-29 06:44:32 -04:00
|
|
|
// Optionally supports a trailing slash
|
2024-07-15 10:52:03 -04:00
|
|
|
api.BaseRoutes.APIRoot.Handle("/{websocket:websocket(?:\\/)?}", api.APIHandlerTrustRequester(connectWebSocket)).Methods(http.MethodGet)
|
2017-03-28 04:58:19 -04:00
|
|
|
}
|
|
|
|
|
|
|
|
|
|
func connectWebSocket(c *Context, w http.ResponseWriter, r *http.Request) {
|
2021-03-24 05:59:23 -04:00
|
|
|
upgrader := websocket.Upgrader{
|
2021-07-12 14:05:36 -04:00
|
|
|
ReadBufferSize: model.SocketMaxMessageSizeKb,
|
|
|
|
|
WriteBufferSize: model.SocketMaxMessageSizeKb,
|
2021-03-24 05:59:23 -04:00
|
|
|
CheckOrigin: c.App.OriginChecker(),
|
2017-03-28 04:58:19 -04:00
|
|
|
}
|
|
|
|
|
|
2021-03-24 05:59:23 -04:00
|
|
|
ws, err := upgrader.Upgrade(w, r, nil)
|
2017-03-28 04:58:19 -04:00
|
|
|
if err != nil {
|
2024-04-05 22:55:50 -04:00
|
|
|
params := map[string]any{
|
|
|
|
|
"BlockedOrigin": r.Header.Get("Origin"),
|
|
|
|
|
}
|
2024-04-22 06:03:28 -04:00
|
|
|
c.Err = model.NewAppError("connect", "api.web_socket.connect.upgrade.app_error", params, "", http.StatusBadRequest).Wrap(err)
|
2017-03-28 04:58:19 -04:00
|
|
|
return
|
|
|
|
|
}
|
|
|
|
|
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
// We initialize webconn with all the necessary data.
|
|
|
|
|
// If the queues are empty, they are initialized in the constructor.
|
2022-10-06 04:04:21 -04:00
|
|
|
cfg := &platform.WebConnConfig{
|
2024-06-13 03:01:49 -04:00
|
|
|
WebSocket: ws,
|
|
|
|
|
Session: *c.AppContext.Session(),
|
|
|
|
|
TFunc: c.AppContext.T,
|
|
|
|
|
Locale: "",
|
|
|
|
|
Active: true,
|
|
|
|
|
PostedAck: r.URL.Query().Get(postedAckParam) == "true",
|
|
|
|
|
RemoteAddress: c.AppContext.IPAddress(),
|
|
|
|
|
XForwardedFor: c.AppContext.XForwardedFor(),
|
MM-32950: Reliable WebSockets: Basic single server (#17406)
* MM-32950: Reliable WebSockets: Basic single server
This PR adds reliable websocket support for a single server.
Below is a brief overview of the three states of a connection:
Normal:
- All messages are routed via web hub.
- Each web conn has a send queue to which it gets pushed.
- A message gets pulled from the queue, and before it
gets written to the wire, it is added to the dead queue.
Disconnect:
- Hub Unregister gets called, where the connection is just
marked as inactive. And new messages keep getting pushed
to the send queue.
If it gets full, the channel is closed and the conn gets removed
from conn index.
Reconnect:
- We query the hub for the connection ID, and get back the
queues.
- We construct a WebConn reusing the old queues, or a fresh one
depending on whether the connection ID was found or not.
- Now there is a tricky bit here which needs to be carefully processed.
On register, we would always send the hello message in the send queue.
But we cannot do that now because the send queue might already have messages.
Therefore, we don't send the hello message from web hub, if we reuse a connection.
Instead, we move that logic to the web conn write pump. We check if
the sequence number is in dead queue, and if it is, then we drain
the dead queue, and start consuming from the active queue.
No hello message is sent here.
But if the message does not exist in the dead queue, and the sequence number
is actually something that should have existed, then we set
a new connction id and clear the dead queue, and send a hello message.
The client, on receiving a new connection id will automatically
set its sequence number to 0, and make the sync API calls to manage
any lost data.
https://mattermost.atlassian.net/browse/MM-32590
```release-note
NONE
```
* gofmt
* Add EnableReliableWebSockets to the client config
* Refactoring isInDeadQueue
* Passing index to drainDeadQueue
* refactoring webconn
* fix pointer
* review comments
* simplify hasMsgLoss
* safety comment
* fix test
* Trigger CI
* Trigger CI
Co-authored-by: Devin Binnie <devin.binnie@mattermost.com>
Co-authored-by: Mattermod <mattermod@users.noreply.github.com>
2021-04-26 10:21:25 -04:00
|
|
|
}
|
2025-05-30 08:15:20 -04:00
|
|
|
|
|
|
|
|
disconnectErrCode := r.URL.Query().Get(disconnectErrCodeParam)
|
|
|
|
|
if codeValid := validateDisconnectErrCode(disconnectErrCode); codeValid {
|
|
|
|
|
cfg.DisconnectErrCode = disconnectErrCode
|
|
|
|
|
}
|
|
|
|
|
|
2024-04-16 12:49:49 -04:00
|
|
|
// The WebSocket upgrade request coming from mobile is missing the
|
|
|
|
|
// user agent so we need to fallback on the session's metadata.
|
|
|
|
|
if c.AppContext.Session().IsMobileApp() {
|
|
|
|
|
cfg.OriginClient = "mobile"
|
|
|
|
|
} else {
|
|
|
|
|
cfg.OriginClient = string(web.GetOriginClient(r))
|
|
|
|
|
}
|
2021-03-24 05:59:23 -04:00
|
|
|
|
2021-12-03 09:59:57 -05:00
|
|
|
cfg.ConnectionID = r.URL.Query().Get(connectionIDParam)
|
|
|
|
|
if cfg.ConnectionID == "" || c.AppContext.Session().UserId == "" {
|
|
|
|
|
// If not present, we assume client is not capable yet, or it's a fresh connection.
|
|
|
|
|
// We just create a new ID.
|
|
|
|
|
cfg.ConnectionID = model.NewId()
|
|
|
|
|
// In case of fresh connection id, sequence number is already zero.
|
|
|
|
|
} else {
|
2022-10-06 04:04:21 -04:00
|
|
|
cfg, err = c.App.Srv().Platform().PopulateWebConnConfig(c.AppContext.Session(), cfg, r.URL.Query().Get(sequenceNumberParam))
|
2021-12-03 09:59:57 -05:00
|
|
|
if err != nil {
|
2024-11-07 23:27:54 -05:00
|
|
|
c.Logger.Error("Error while populating webconn config", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))
|
2021-12-03 09:59:57 -05:00
|
|
|
ws.Close()
|
|
|
|
|
return
|
2021-03-31 00:35:32 -04:00
|
|
|
}
|
|
|
|
|
}
|
|
|
|
|
|
2022-12-21 14:10:26 -05:00
|
|
|
wc := c.App.Srv().Platform().NewWebConn(cfg, c.App, c.App.Srv().Channels())
|
2021-05-11 06:00:44 -04:00
|
|
|
if c.AppContext.Session().UserId != "" {
|
2024-11-07 23:27:54 -05:00
|
|
|
err = c.App.Srv().Platform().HubRegister(wc)
|
|
|
|
|
if err != nil {
|
|
|
|
|
c.Logger.Error("Error while registering to hub", mlog.String("id", r.URL.Query().Get(connectionIDParam)), mlog.Err(err))
|
|
|
|
|
ws.Close()
|
|
|
|
|
return
|
|
|
|
|
}
|
2017-03-28 04:58:19 -04:00
|
|
|
}
|
|
|
|
|
|
2021-03-24 05:59:23 -04:00
|
|
|
wc.Pump()
|
2017-03-28 04:58:19 -04:00
|
|
|
}
|