When a user disconnects from the hub, we would spawn off a goroutine that made a cluster request and then marked the user's status as offline in the DB. This was another case of unbounded concurrency where the number of goroutines spawned was user controlled, so we would see a clear spike in DB connections on master whenever many users disconnected at once. To fix this, we implement concurrency control in two areas:

1. In making the cluster request: we implement a counting semaphore per hub to avoid making unbounded cluster requests (see the sketch below).
2. In processing status updates: we use a buffered channel with a periodic flusher, and add a new store method to upsert multiple statuses in a single query.

The flush threshold (statusUpdateFlushThreshold) is set to 32, which means no more than 32 rows are upserted at a time, keeping the SQL query load reasonable.

https://mattermost.atlassian.net/browse/MM-64298

```release-note
Reduced DB connection spikes on user disconnect by processing status updates in batches.
```
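For context, the per-hub counting semaphore from point 1 lives in the hub code rather than in the file below. Here is a minimal sketch of that pattern, assuming illustrative names (`hub`, `clusterReqSem`, `doClusterRequest`) that are not the actual identifiers:

```go
package main

import (
	"fmt"
	"sync"
)

// hub is a stand-in for the real hub type; only the semaphore is sketched here.
type hub struct {
	// A buffered channel used as a counting semaphore: its capacity bounds
	// how many cluster requests may be in flight at once.
	clusterReqSem chan struct{}
}

func newHub(maxConcurrentClusterReqs int) *hub {
	return &hub{clusterReqSem: make(chan struct{}, maxConcurrentClusterReqs)}
}

// doClusterRequest acquires a semaphore slot (blocking once the limit is
// reached), runs the request in a goroutine, and releases the slot when done.
func (h *hub) doClusterRequest(req func()) {
	h.clusterReqSem <- struct{}{} // acquire
	go func() {
		defer func() { <-h.clusterReqSem }() // release
		req()
	}()
}

func main() {
	h := newHub(4) // at most 4 concurrent cluster requests
	var wg sync.WaitGroup
	for i := 0; i < 16; i++ {
		wg.Add(1)
		h.doClusterRequest(func() {
			defer wg.Done()
			fmt.Println("cluster request")
		})
	}
	wg.Wait()
}
```

The buffered-channel-with-periodic-flusher half of the change is visible in the file itself, in `QueueSetStatusOffline` and `processStatusUpdates`.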

// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
// See LICENSE.txt for license information.

package platform

import (
	"encoding/json"
	"errors"
	"net/http"
	"time"

	"github.com/mattermost/mattermost/server/public/model"
	"github.com/mattermost/mattermost/server/public/shared/mlog"
	"github.com/mattermost/mattermost/server/v8/channels/store"
	"github.com/mattermost/mattermost/server/v8/platform/services/cache"
)
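
// AddStatusCacheSkipClusterSend stores the given status in the local cache
// without notifying the other nodes in the cluster.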
func (ps *PlatformService) AddStatusCacheSkipClusterSend(status *model.Status) {
	if err := ps.statusCache.SetWithDefaultExpiry(status.UserId, status); err != nil {
		ps.logger.Warn("Failed to set cache entry for status", mlog.String("user_id", status.UserId), mlog.Err(err))
	}
}
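
// AddStatusCache stores the given status in the local cache and, when
// clustering is enabled, broadcasts it to the other nodes.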
func (ps *PlatformService) AddStatusCache(status *model.Status) {
	ps.AddStatusCacheSkipClusterSend(status)

	if ps.Cluster() != nil {
		statusJSON, err := json.Marshal(status)
		if err != nil {
			ps.logger.Warn("Failed to encode status to JSON", mlog.Err(err))
		}
		msg := &model.ClusterMessage{
			Event:    model.ClusterEventUpdateStatus,
			SendType: model.ClusterSendBestEffort,
			Data:     statusJSON,
		}
		ps.Cluster().SendClusterMessage(msg)
	}
}
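
// GetAllStatuses returns every status currently held in the local cache,
// keyed by user ID.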
func (ps *PlatformService) GetAllStatuses() map[string]*model.Status {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return map[string]*model.Status{}
	}

	statusMap := map[string]*model.Status{}
	err := ps.statusCache.Scan(func(keys []string) error {
		if len(keys) == 0 {
			return nil
		}

		toPass := allocateCacheTargets[*model.Status](len(keys))
		errs := ps.statusCache.GetMulti(keys, toPass)
		for i, err := range errs {
			if err != nil {
				if err != cache.ErrKeyNotFound {
					return err
				}
				continue
			}
			gotStatus := *(toPass[i].(**model.Status))
			if gotStatus != nil {
				statusMap[keys[i]] = gotStatus
				continue
			}
			ps.logger.Warn("Found nil status in GetAllStatuses. This is not expected")
		}
		return nil
	})
	if err != nil {
		ps.logger.Warn("Error while getting all statuses in GetAllStatuses", mlog.Err(err))
		return nil
	}
	return statusMap
}
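
// GetStatusesByIds returns a map of user ID to status string for the given
// user IDs, reading from the cache first and falling back to the database.
// Users without a status row are reported as offline.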
func (ps *PlatformService) GetStatusesByIds(userIDs []string) (map[string]any, *model.AppError) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return map[string]any{}, nil
	}

	statusMap := map[string]any{}
	metrics := ps.Metrics()
	missingUserIds := []string{}

	toPass := allocateCacheTargets[*model.Status](len(userIDs))
	// First, we do a GetMulti to get all the status objects.
	errs := ps.statusCache.GetMulti(userIDs, toPass)
	for i, err := range errs {
		if err != nil {
			if err != cache.ErrKeyNotFound {
				ps.logger.Warn("Error in GetStatusesByIds: ", mlog.Err(err))
			}
			missingUserIds = append(missingUserIds, userIDs[i])
			if metrics != nil {
				metrics.IncrementMemCacheMissCounter(ps.statusCache.Name())
			}
		} else {
			// If we get a hit, we need to cast it back to the right type.
			gotStatus := *(toPass[i].(**model.Status))
			if gotStatus == nil {
				ps.logger.Warn("Found nil in GetStatusesByIds. This is not expected")
				continue
			}
			statusMap[userIDs[i]] = gotStatus.Status
			if metrics != nil {
				metrics.IncrementMemCacheHitCounter(ps.statusCache.Name())
			}
		}
	}

	if len(missingUserIds) > 0 {
		// For cache misses, we fill them back from the DB.
		statuses, err := ps.Store.Status().GetByIds(missingUserIds)
		if err != nil {
			return nil, model.NewAppError("GetStatusesByIds", "app.status.get.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
		}

		for _, s := range statuses {
			ps.AddStatusCacheSkipClusterSend(s)
			statusMap[s.UserId] = s.Status
		}
	}

	// Users with no row in the Status table and no cache entry are reported as offline.
	for _, userID := range missingUserIds {
		if _, ok := statusMap[userID]; !ok {
			statusMap[userID] = model.StatusOffline
		}
	}

	return statusMap, nil
}

// GetUserStatusesByIds returns the statuses for the given user IDs as a slice.
// Used by API v4.
func (ps *PlatformService) GetUserStatusesByIds(userIDs []string) ([]*model.Status, *model.AppError) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return []*model.Status{}, nil
	}

	var statusMap []*model.Status
	metrics := ps.Metrics()

	missingUserIds := []string{}
	toPass := allocateCacheTargets[*model.Status](len(userIDs))
	// First, we do a GetMulti to get all the status objects.
	errs := ps.statusCache.GetMulti(userIDs, toPass)
	for i, err := range errs {
		if err != nil {
			if err != cache.ErrKeyNotFound {
				ps.logger.Warn("Error in GetUserStatusesByIds: ", mlog.Err(err))
			}
			missingUserIds = append(missingUserIds, userIDs[i])
			if metrics != nil {
				metrics.IncrementMemCacheMissCounter(ps.statusCache.Name())
			}
		} else {
			// If we get a hit, we need to cast it back to the right type.
			gotStatus := *(toPass[i].(**model.Status))
			if gotStatus == nil {
				ps.logger.Warn("Found nil in GetUserStatusesByIds. This is not expected")
				continue
			}
			statusMap = append(statusMap, gotStatus)
			if metrics != nil {
				metrics.IncrementMemCacheHitCounter(ps.statusCache.Name())
			}
		}
	}

	if len(missingUserIds) > 0 {
		// For cache misses, we fill them back from the DB.
		statuses, err := ps.Store.Status().GetByIds(missingUserIds)
		if err != nil {
			return nil, model.NewAppError("GetUserStatusesByIds", "app.status.get.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
		}

		for _, s := range statuses {
			ps.AddStatusCacheSkipClusterSend(s)
		}

		statusMap = append(statusMap, statuses...)
	}

	// For users with no row in the Status table and no cache entry, create an
	// offline status: remove the IDs we did find from missingUserIds, then
	// report the remaining ones (including IDs unknown to the system) as offline.
	for i := 0; i < len(missingUserIds); i++ {
		missingUserId := missingUserIds[i]
		for _, userMap := range statusMap {
			if missingUserId == userMap.UserId {
				missingUserIds = append(missingUserIds[:i], missingUserIds[i+1:]...)
				i--
				break
			}
		}
	}
	for _, userID := range missingUserIds {
		statusMap = append(statusMap, &model.Status{UserId: userID, Status: model.StatusOffline})
	}

	return statusMap, nil
}
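
// BroadcastStatus publishes a status-change websocket event for the given
// status to all connected clients.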
func (ps *PlatformService) BroadcastStatus(status *model.Status) {
	if ps.Busy.IsBusy() {
		// Status broadcasts are considered non-critical and are disabled while
		// the server is busy.
		return
	}
	event := model.NewWebSocketEvent(model.WebsocketEventStatusChange, "", "", status.UserId, nil, "")
	event.Add("status", status.Status)
	event.Add("user_id", status.UserId)
	ps.Publish(event)
}
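
// SaveAndBroadcastStatus caches the status, persists it to the database, and
// broadcasts it to connected clients.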
func (ps *PlatformService) SaveAndBroadcastStatus(status *model.Status) {
	ps.AddStatusCache(status)

	if err := ps.Store.Status().SaveOrUpdate(status); err != nil {
		ps.Log().Warn("Failed to save status", mlog.String("user_id", status.UserId), mlog.Err(err))
	}

	ps.BroadcastStatus(status)
}
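
// GetStatusFromCache returns the cached status for the given user ID, or nil
// on a cache miss.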
func (ps *PlatformService) GetStatusFromCache(userID string) *model.Status {
	var status *model.Status
	if err := ps.statusCache.Get(userID, &status); err == nil {
		return status
	}

	return nil
}
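
// GetStatus returns the status for the given user ID, reading from the cache
// first and falling back to the database.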
func (ps *PlatformService) GetStatus(userID string) (*model.Status, *model.AppError) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return &model.Status{}, nil
	}

	status := ps.GetStatusFromCache(userID)
	if status != nil {
		return status, nil
	}

	status, err := ps.Store.Status().Get(userID)
	if err != nil {
		var nfErr *store.ErrNotFound
		switch {
		case errors.As(err, &nfErr):
			return nil, model.NewAppError("GetStatus", "app.status.get.missing.app_error", nil, "", http.StatusNotFound).Wrap(err)
		default:
			return nil, model.NewAppError("GetStatus", "app.status.get.app_error", nil, "", http.StatusInternalServerError).Wrap(err)
		}
	}

	return status, nil
}

// SetStatusLastActivityAt sets the last activity at for a user on the local app server and updates
// status to away if needed. Used by the WS to set status to away if an 'online' device disconnects
// while an 'away' device is still connected.
func (ps *PlatformService) SetStatusLastActivityAt(userID string, activityAt int64) {
	var status *model.Status
	var err *model.AppError
	if status, err = ps.GetStatus(userID); err != nil {
		return
	}

	status.LastActivityAt = activityAt

	ps.AddStatusCacheSkipClusterSend(status)
	ps.SetStatusAwayIfNeeded(userID, false)
	if ps.sharedChannelService != nil {
		ps.sharedChannelService.NotifyUserStatusChanged(status)
	}
}
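
// UpdateLastActivityAtIfNeeded bumps the session's LastActivityAt in the
// store and session cache, but only if the session activity timeout has
// elapsed since the last update.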
func (ps *PlatformService) UpdateLastActivityAtIfNeeded(session model.Session) {
	now := model.GetMillis()

	ps.UpdateWebConnUserActivity(session, now)

	if now-session.LastActivityAt < model.SessionActivityTimeout {
		return
	}

	if err := ps.Store.Session().UpdateLastActivityAt(session.Id, now); err != nil {
		ps.Log().Warn("Failed to update LastActivityAt", mlog.String("user_id", session.UserId), mlog.String("session_id", session.Id), mlog.Err(err))
	}

	session.LastActivityAt = now
	if err := ps.AddSessionToCache(&session); err != nil {
		ps.Log().Warn("Failed to add session to cache", mlog.String("user_id", session.UserId), mlog.String("session_id", session.Id), mlog.Err(err))
	}
}
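
// SetStatusOnline sets the given user's status to online. A manually set
// status is only overridden when manual is true.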
func (ps *PlatformService) SetStatusOnline(userID string, manual bool) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return
	}

	broadcast := false

	var oldStatus string = model.StatusOffline
	var oldTime int64
	var oldManual bool
	var status *model.Status
	var err *model.AppError

	if status, err = ps.GetStatus(userID); err != nil {
		status = &model.Status{UserId: userID, Status: model.StatusOnline, Manual: false, LastActivityAt: model.GetMillis(), ActiveChannel: ""}
		broadcast = true
	} else {
		if status.Manual && !manual {
			return // manually set status always overrides non-manual one
		}

		if status.Status != model.StatusOnline {
			broadcast = true
		}

		oldStatus = status.Status
		oldTime = status.LastActivityAt
		oldManual = status.Manual

		status.Status = model.StatusOnline
		status.Manual = false // for "online" there's no manual setting
		status.LastActivityAt = model.GetMillis()
	}

	ps.AddStatusCache(status)

	// Only update the database if the status has changed, the status has been manually set,
	// or enough time has passed since the previous action
	if status.Status != oldStatus || status.Manual != oldManual || status.LastActivityAt-oldTime > model.StatusMinUpdateTime {
		if broadcast {
			if err := ps.Store.Status().SaveOrUpdate(status); err != nil {
				mlog.Warn("Failed to save status", mlog.String("user_id", userID), mlog.Err(err))
			}
		} else {
			if err := ps.Store.Status().UpdateLastActivityAt(status.UserId, status.LastActivityAt); err != nil {
				mlog.Error("Failed to update last activity", mlog.String("user_id", userID), mlog.Err(err))
			}
		}
		if ps.sharedChannelService != nil {
			ps.sharedChannelService.NotifyUserStatusChanged(status)
		}
	}

	if broadcast {
		ps.BroadcastStatus(status)
	}
}
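
// SetStatusOffline sets the given user's status to offline. A manually set
// status is only overridden when manual or force is true.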
func (ps *PlatformService) SetStatusOffline(userID string, manual bool, force bool) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return
	}

	status, err := ps.GetStatus(userID)
	if err != nil {
		ps.Log().Warn("Error getting status. Setting it to offline forcefully.", mlog.String("user_id", userID), mlog.Err(err))
	} else if !force && status.Manual && !manual {
		return // manually set status always overrides non-manual one
	}
	ps._setStatusOfflineAndNotify(userID, manual)
}
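
// _setStatusOfflineAndNotify builds a fresh offline status for the user,
// saves and broadcasts it, and notifies the shared channel service.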
func (ps *PlatformService) _setStatusOfflineAndNotify(userID string, manual bool) {
	status := &model.Status{UserId: userID, Status: model.StatusOffline, Manual: manual, LastActivityAt: model.GetMillis(), ActiveChannel: ""}
	ps.SaveAndBroadcastStatus(status)
	if ps.sharedChannelService != nil {
		ps.sharedChannelService.NotifyUserStatusChanged(status)
	}
}

// QueueSetStatusOffline queues a status update to set a user offline
// instead of directly updating it, for better performance during high load.
func (ps *PlatformService) QueueSetStatusOffline(userID string, manual bool) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return
	}

	status, err := ps.GetStatus(userID)
	if err != nil {
		ps.Log().Warn("Error getting status. Setting it to offline forcefully.", mlog.String("user_id", userID), mlog.Err(err))
	} else if status.Manual && !manual {
		// Force will be false here, so no need to add another variable.
		return // manually set status always overrides non-manual one
	}

	status = &model.Status{
		UserId:         userID,
		Status:         model.StatusOffline,
		Manual:         manual,
		LastActivityAt: model.GetMillis(),
		ActiveChannel:  "",
	}

	select {
	case ps.statusUpdateChan <- status:
		// Successfully queued
	default:
		// Channel is full, fall back to direct update
		ps.Log().Warn("Status update channel is full. Falling back to direct update")
		ps._setStatusOfflineAndNotify(userID, manual)
	}
}

const (
	statusUpdateBufferSize = sendQueueSize // We use the webConn sendQueue size as a reference point for the buffer size.
	// With the current sendQueueSize this works out to 32, so at most 32 rows
	// are upserted in a single query.
	statusUpdateFlushThreshold = statusUpdateBufferSize / 8
	statusUpdateBatchInterval  = 500 * time.Millisecond // Max time to wait before processing
)

// processStatusUpdates processes status updates in batches for better performance.
// It runs as a goroutine and continuously monitors the statusUpdateChan.
func (ps *PlatformService) processStatusUpdates() {
	defer close(ps.statusUpdateDoneSignal)

	statusBatch := make(map[string]*model.Status)
	ticker := time.NewTicker(statusUpdateBatchInterval)
	defer ticker.Stop()

	flush := func(broadcast bool) {
		if len(statusBatch) == 0 {
			return
		}

		// Add each status to cache.
		for _, status := range statusBatch {
			ps.AddStatusCache(status)
		}

		// Process statuses in batch
		if err := ps.Store.Status().SaveOrUpdateMany(statusBatch); err != nil {
			ps.logger.Warn("Failed to save multiple statuses", mlog.Err(err))
		}

		// Broadcast each status only if hub is still running
		if broadcast {
			for _, status := range statusBatch {
				ps.BroadcastStatus(status)
				if ps.sharedChannelService != nil {
					ps.sharedChannelService.NotifyUserStatusChanged(status)
				}
			}
		}

		clear(statusBatch)
	}

	for {
		select {
		case status := <-ps.statusUpdateChan:
			// In case of duplicates, the newest entry overrides the older one.
			statusBatch[status.UserId] = status

			if len(statusBatch) >= statusUpdateFlushThreshold {
				ps.logger.Debug("Flushing statuses because the current buffer exceeded the flush threshold.", mlog.Int("current_buffer", len(statusBatch)), mlog.Int("flush_threshold", statusUpdateFlushThreshold))
				flush(true)
			}
		case <-ticker.C:
			flush(true)
		case <-ps.statusUpdateExitSignal:
			// Process any remaining statuses before shutting down.
			// Skip broadcast since hub is already stopped.
			ps.logger.Debug("Exit signal received. Flushing any remaining statuses.")
			flush(false)
			return
		}
	}
}
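
// SetStatusAwayIfNeeded sets the given user's status to away. When manual is
// false, it only does so if the user has been inactive past the away timeout
// and has not set a status manually.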
func (ps *PlatformService) SetStatusAwayIfNeeded(userID string, manual bool) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return
	}

	status, err := ps.GetStatus(userID)

	if err != nil {
		status = &model.Status{UserId: userID, Status: model.StatusOffline, Manual: manual, LastActivityAt: 0, ActiveChannel: ""}
	}

	if !manual && status.Manual {
		return // manually set status always overrides non-manual one
	}

	if !manual {
		if status.Status == model.StatusAway {
			return
		}

		if !ps.isUserAway(status.LastActivityAt) {
			return
		}
	}

	status.Status = model.StatusAway
	status.Manual = manual
	status.ActiveChannel = ""

	ps.SaveAndBroadcastStatus(status)
	if ps.sharedChannelService != nil {
		ps.sharedChannelService.NotifyUserStatusChanged(status)
	}
}

// SetStatusDoNotDisturbTimed takes an end time as a Unix epoch timestamp in UTC
// and sets the status of the given userID to dnd, to be restored once the end
// time passes.
func (ps *PlatformService) SetStatusDoNotDisturbTimed(userID string, endtime int64) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return
	}

	status, err := ps.GetStatus(userID)

	if err != nil {
		status = &model.Status{UserId: userID, Status: model.StatusOffline, Manual: false, LastActivityAt: 0, ActiveChannel: ""}
	}

	status.PrevStatus = status.Status
	status.Status = model.StatusDnd
	status.Manual = true

	status.DNDEndTime = truncateDNDEndTime(endtime)

	ps.SaveAndBroadcastStatus(status)
	if ps.sharedChannelService != nil {
		ps.sharedChannelService.NotifyUserStatusChanged(status)
	}
}

// truncateDNDEndTime takes a user-provided timestamp (in seconds) for when their DND expiry should end and truncates
// it to line up with the DND expiry job so that the user's DND time doesn't expire late by an interval. The job to
// expire statuses currently runs every minute, so this trims the seconds and milliseconds off the given timestamp.
//
// This will result in statuses expiring slightly earlier than specified in the UI, but the status will expire at
// the correct time on the wall clock. For example, if the time is currently 13:04:29 and the user sets the expiry to
// 5 minutes, truncating will make the status expire at 13:09:00 instead of at 13:10:00.
//
// Note that the timestamps used by this are in seconds, not milliseconds. This matches UserStatus.DNDEndTime.
func truncateDNDEndTime(endtime int64) int64 {
	return time.Unix(endtime, 0).Truncate(model.DNDExpiryInterval).Unix()
}
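
// SetStatusDoNotDisturb sets the given user's status to dnd with no end time.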
func (ps *PlatformService) SetStatusDoNotDisturb(userID string) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return
	}

	status, err := ps.GetStatus(userID)

	if err != nil {
		status = &model.Status{UserId: userID, Status: model.StatusOffline, Manual: false, LastActivityAt: 0, ActiveChannel: ""}
	}

	status.Status = model.StatusDnd
	status.Manual = true

	ps.SaveAndBroadcastStatus(status)
	if ps.sharedChannelService != nil {
		ps.sharedChannelService.NotifyUserStatusChanged(status)
	}
}
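
// SetStatusOutOfOffice sets the given user's status to out of office.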
func (ps *PlatformService) SetStatusOutOfOffice(userID string) {
	if !*ps.Config().ServiceSettings.EnableUserStatuses {
		return
	}

	status, err := ps.GetStatus(userID)

	if err != nil {
		status = &model.Status{UserId: userID, Status: model.StatusOutOfOffice, Manual: false, LastActivityAt: 0, ActiveChannel: ""}
	}

	status.Status = model.StatusOutOfOffice
	status.Manual = true

	ps.SaveAndBroadcastStatus(status)
	if ps.sharedChannelService != nil {
		ps.sharedChannelService.NotifyUserStatusChanged(status)
	}
}
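
// isUserAway reports whether the user's last activity is older than the
// configured away timeout.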
func (ps *PlatformService) isUserAway(lastActivityAt int64) bool {
	return model.GetMillis()-lastActivityAt >= *ps.Config().TeamSettings.UserStatusAwayTimeout*1000
}