1
0
mirror of https://github.com/mattermost/focalboard.git synced 2025-11-06 09:19:31 +02:00

Recovers inactive websockets connections on reconnect in plugin mode (#1324)

* Stores a cache of inactive connections so their subscriptions are recovered on reconnections

* Adds test for getUserIDsForWorkspace and simplify messages through helpers

* Make the plugin websocket client more resilient

* Remove missed event callback and limit ws state polling to one at a time

* Add read lock for the plugin adapter and guarantee atomic ops on inactiveAt

* Add mutex to the plugin adapter client and tests to cover for races

* Split plugin adapter mutex in two and use them to lock only on data access

* Group plugin adapter fields by the mutex that's guarding them
This commit is contained in:
Miguel de la Cruz
2021-09-29 18:19:34 +02:00
committed by GitHub
parent 7970d4e6c5
commit ce98ec55fc
9 changed files with 3217 additions and 90 deletions

57
server/ws/helpers_test.go Normal file
View File

@@ -0,0 +1,57 @@
package ws
import (
"testing"
authMocks "github.com/mattermost/focalboard/server/auth/mocks"
wsMocks "github.com/mattermost/focalboard/server/ws/mocks"
mmModel "github.com/mattermost/mattermost-server/v6/model"
"github.com/golang/mock/gomock"
)
type TestHelper struct {
api *wsMocks.MockAPI
auth *authMocks.MockAuthInterface
ctrl *gomock.Controller
pa *PluginAdapter
}
func SetupTestHelper(t *testing.T) *TestHelper {
ctrl := gomock.NewController(t)
mockAPI := wsMocks.NewMockAPI(ctrl)
mockAuth := authMocks.NewMockAuthInterface(ctrl)
mockAPI.EXPECT().LogDebug(gomock.Any(), gomock.Any()).AnyTimes()
mockAPI.EXPECT().LogInfo(gomock.Any(), gomock.Any()).AnyTimes()
mockAPI.EXPECT().LogError(gomock.Any(), gomock.Any()).AnyTimes()
mockAPI.EXPECT().LogWarn(gomock.Any(), gomock.Any()).AnyTimes()
return &TestHelper{
api: mockAPI,
auth: mockAuth,
ctrl: ctrl,
pa: NewPluginAdapter(mockAPI, mockAuth),
}
}
func (th *TestHelper) ReceiveWebSocketMessage(webConnID, userID, action string, data map[string]interface{}) {
req := &mmModel.WebSocketRequest{Action: websocketMessagePrefix + action, Data: data}
th.pa.WebSocketMessageHasBeenPosted(webConnID, userID, req)
}
func (th *TestHelper) SubscribeWebConnToWorkspace(webConnID, userID, workspaceID string) {
th.auth.EXPECT().
DoesUserHaveWorkspaceAccess(userID, workspaceID).
Return(true)
msgData := map[string]interface{}{"workspaceId": workspaceID}
th.ReceiveWebSocketMessage(webConnID, userID, websocketActionSubscribeWorkspace, msgData)
}
func (th *TestHelper) UnsubscribeWebConnFromWorkspace(webConnID, userID, workspaceID string) {
msgData := map[string]interface{}{"workspaceId": workspaceID}
th.ReceiveWebSocketMessage(webConnID, userID, websocketActionUnsubscribeWorkspace, msgData)
}

File diff suppressed because it is too large Load Diff

View File

@@ -1,9 +1,11 @@
//go:generate mockgen --build_flags=--mod=mod -destination=mocks/mockpluginapi.go -package mocks github.com/mattermost/mattermost-server/v6/plugin API
package ws
import (
"fmt"
"strings"
"sync"
"sync/atomic"
"time"
"github.com/mattermost/focalboard/server/auth"
@@ -17,45 +19,10 @@ const websocketMessagePrefix = "custom_focalboard_"
var errMissingWorkspaceInCommand = fmt.Errorf("command doesn't contain workspaceId")
type PluginAdapterClient struct {
webConnID string
userID string
workspaces []string
blocks []string
}
func (pac *PluginAdapterClient) isSubscribedToWorkspace(workspaceID string) bool {
for _, id := range pac.workspaces {
if id == workspaceID {
return true
}
}
return false
}
func (pac *PluginAdapterClient) isSubscribedToBlock(blockID string) bool {
for _, id := range pac.blocks {
if id == blockID {
return true
}
}
return false
}
type PluginAdapterInterface interface {
addListener(pac *PluginAdapterClient)
removeListener(pac *PluginAdapterClient)
removeListenerFromWorkspace(pac *PluginAdapterClient, workspaceID string)
removeListenerFromBlock(pac *PluginAdapterClient, blockID string)
subscribeListenerToWorkspace(pac *PluginAdapterClient, workspaceID string)
unsubscribeListenerFromWorkspace(pac *PluginAdapterClient, workspaceID string)
unsubscribeListenerFromBlocks(pac *PluginAdapterClient, blockIDs []string)
OnWebSocketConnect(webConnID, userID string)
OnWebSocketDisconnect(webConnID, userID string)
WebSocketMessageHasBeenPosted(webConnID, userID string, req *mmModel.WebSocketRequest)
getUserIDsForWorkspace(workspaceID string) []string
BroadcastConfigChange(clientConfig model.ClientConfig)
BroadcastBlockChange(workspaceID string, block model.Block)
BroadcastBlockDelete(workspaceID, blockID, parentID string)
@@ -63,35 +30,73 @@ type PluginAdapterInterface interface {
}
type PluginAdapter struct {
api plugin.API
auth *auth.Auth
api plugin.API
auth auth.AuthInterface
staleThreshold time.Duration
listeners map[string]*PluginAdapterClient
listenersMU sync.RWMutex
listeners map[string]*PluginAdapterClient
listenersByUserID map[string][]*PluginAdapterClient
subscriptionsMU sync.RWMutex
listenersByWorkspace map[string][]*PluginAdapterClient
listenersByBlock map[string][]*PluginAdapterClient
mu sync.RWMutex
}
func NewPluginAdapter(api plugin.API, auth *auth.Auth) *PluginAdapter {
func NewPluginAdapter(api plugin.API, auth auth.AuthInterface) *PluginAdapter {
return &PluginAdapter{
api: api,
auth: auth,
staleThreshold: 5 * time.Minute,
listeners: make(map[string]*PluginAdapterClient),
listenersByUserID: make(map[string][]*PluginAdapterClient),
listenersByWorkspace: make(map[string][]*PluginAdapterClient),
listenersByBlock: make(map[string][]*PluginAdapterClient),
mu: sync.RWMutex{},
listenersMU: sync.RWMutex{},
subscriptionsMU: sync.RWMutex{},
}
}
func (pa *PluginAdapter) GetListenerByWebConnID(webConnID string) (pac *PluginAdapterClient, ok bool) {
pa.listenersMU.RLock()
defer pa.listenersMU.RUnlock()
pac, ok = pa.listeners[webConnID]
return
}
func (pa *PluginAdapter) GetListenersByUserID(userID string) []*PluginAdapterClient {
pa.listenersMU.RLock()
defer pa.listenersMU.RUnlock()
return pa.listenersByUserID[userID]
}
func (pa *PluginAdapter) GetListenersByWorkspace(workspaceID string) []*PluginAdapterClient {
pa.subscriptionsMU.RLock()
defer pa.subscriptionsMU.RUnlock()
return pa.listenersByWorkspace[workspaceID]
}
func (pa *PluginAdapter) GetListenersByBlock(blockID string) []*PluginAdapterClient {
pa.subscriptionsMU.RLock()
defer pa.subscriptionsMU.RUnlock()
return pa.listenersByBlock[blockID]
}
func (pa *PluginAdapter) addListener(pac *PluginAdapterClient) {
pa.mu.Lock()
defer pa.mu.Unlock()
pa.listenersMU.Lock()
defer pa.listenersMU.Unlock()
pa.listeners[pac.webConnID] = pac
pa.listenersByUserID[pac.userID] = append(pa.listenersByUserID[pac.userID], pac)
}
func (pa *PluginAdapter) removeListener(pac *PluginAdapterClient) {
pa.mu.Lock()
defer pa.mu.Unlock()
pa.listenersMU.Lock()
defer pa.listenersMU.Unlock()
// workspace subscriptions
for _, workspace := range pac.workspaces {
@@ -103,43 +108,52 @@ func (pa *PluginAdapter) removeListener(pac *PluginAdapterClient) {
pa.removeListenerFromBlock(pac, block)
}
// user ID list
newUserListeners := []*PluginAdapterClient{}
for _, listener := range pa.listenersByUserID[pac.userID] {
if listener.webConnID != pac.webConnID {
newUserListeners = append(newUserListeners, listener)
}
}
pa.listenersByUserID[pac.userID] = newUserListeners
delete(pa.listeners, pac.webConnID)
}
func (pa *PluginAdapter) removeExpiredForUserID(userID string) {
for _, pac := range pa.GetListenersByUserID(userID) {
if !pac.isActive() && pac.hasExpired(pa.staleThreshold) {
pa.removeListener(pac)
}
}
}
func (pa *PluginAdapter) removeListenerFromWorkspace(pac *PluginAdapterClient, workspaceID string) {
newWorkspaceListeners := []*PluginAdapterClient{}
for _, listener := range pa.listenersByWorkspace[workspaceID] {
for _, listener := range pa.GetListenersByWorkspace(workspaceID) {
if listener.webConnID != pac.webConnID {
newWorkspaceListeners = append(newWorkspaceListeners, listener)
}
}
pa.subscriptionsMU.Lock()
pa.listenersByWorkspace[workspaceID] = newWorkspaceListeners
pa.subscriptionsMU.Unlock()
newClientWorkspaces := []string{}
for _, id := range pac.workspaces {
if id != workspaceID {
newClientWorkspaces = append(newClientWorkspaces, id)
}
}
pac.workspaces = newClientWorkspaces
pac.unsubscribeFromWorkspace(workspaceID)
}
func (pa *PluginAdapter) removeListenerFromBlock(pac *PluginAdapterClient, blockID string) {
newBlockListeners := []*PluginAdapterClient{}
for _, listener := range pa.listenersByBlock[blockID] {
for _, listener := range pa.GetListenersByBlock(blockID) {
if listener.webConnID != pac.webConnID {
newBlockListeners = append(newBlockListeners, listener)
}
}
pa.subscriptionsMU.Lock()
pa.listenersByBlock[blockID] = newBlockListeners
pa.subscriptionsMU.Unlock()
newClientBlocks := []string{}
for _, id := range pac.blocks {
if id != blockID {
newClientBlocks = append(newClientBlocks, id)
}
}
pac.blocks = newClientBlocks
pac.unsubscribeFromBlock(blockID)
}
func (pa *PluginAdapter) subscribeListenerToWorkspace(pac *PluginAdapterClient, workspaceID string) {
@@ -147,11 +161,11 @@ func (pa *PluginAdapter) subscribeListenerToWorkspace(pac *PluginAdapterClient,
return
}
pa.mu.Lock()
defer pa.mu.Unlock()
pa.subscriptionsMU.Lock()
pa.listenersByWorkspace[workspaceID] = append(pa.listenersByWorkspace[workspaceID], pac)
pac.workspaces = append(pac.workspaces, workspaceID)
pa.subscriptionsMU.Unlock()
pac.subscribeToWorkspace(workspaceID)
}
func (pa *PluginAdapter) unsubscribeListenerFromWorkspace(pac *PluginAdapterClient, workspaceID string) {
@@ -159,16 +173,26 @@ func (pa *PluginAdapter) unsubscribeListenerFromWorkspace(pac *PluginAdapterClie
return
}
pa.mu.Lock()
defer pa.mu.Unlock()
pa.removeListenerFromWorkspace(pac, workspaceID)
}
func (pa *PluginAdapter) unsubscribeListenerFromBlocks(pac *PluginAdapterClient, blockIDs []string) {
pa.mu.Lock()
defer pa.mu.Unlock()
func (pa *PluginAdapter) getUserIDsForWorkspace(workspaceID string) []string {
userMap := map[string]bool{}
for _, pac := range pa.GetListenersByWorkspace(workspaceID) {
if pac.isActive() {
userMap[pac.userID] = true
}
}
userIDs := []string{}
for userID := range userMap {
userIDs = append(userIDs, userID)
}
return userIDs
}
//nolint:unused
func (pa *PluginAdapter) unsubscribeListenerFromBlocks(pac *PluginAdapterClient, blockIDs []string) {
for _, blockID := range blockIDs {
if pac.isSubscribedToBlock(blockID) {
pa.removeListenerFromBlock(pac, blockID)
@@ -177,18 +201,29 @@ func (pa *PluginAdapter) unsubscribeListenerFromBlocks(pac *PluginAdapterClient,
}
func (pa *PluginAdapter) OnWebSocketConnect(webConnID, userID string) {
pac := &PluginAdapterClient{
if existingPAC, ok := pa.GetListenerByWebConnID(webConnID); ok {
pa.api.LogDebug("inactive connection found for webconn, reusing",
"webConnID", webConnID,
"userID", userID,
)
atomic.StoreInt64(&existingPAC.inactiveAt, 0)
return
}
newPAC := &PluginAdapterClient{
inactiveAt: 0,
webConnID: webConnID,
userID: userID,
workspaces: []string{},
blocks: []string{},
}
pa.addListener(pac)
pa.addListener(newPAC)
pa.removeExpiredForUserID(userID)
}
func (pa *PluginAdapter) OnWebSocketDisconnect(webConnID, userID string) {
pac, ok := pa.listeners[webConnID]
pac, ok := pa.GetListenerByWebConnID(webConnID)
if !ok {
pa.api.LogError("received a disconnect for an unregistered webconn",
"webConnID", webConnID,
@@ -197,7 +232,7 @@ func (pa *PluginAdapter) OnWebSocketDisconnect(webConnID, userID string) {
return
}
pa.removeListener(pac)
atomic.StoreInt64(&pac.inactiveAt, mmModel.GetMillis())
}
func commandFromRequest(req *mmModel.WebSocketRequest) (*WebsocketCommand, error) {
@@ -221,7 +256,7 @@ func commandFromRequest(req *mmModel.WebSocketRequest) (*WebsocketCommand, error
}
func (pa *PluginAdapter) WebSocketMessageHasBeenPosted(webConnID, userID string, req *mmModel.WebSocketRequest) {
pac, ok := pa.listeners[webConnID]
pac, ok := pa.GetListenerByWebConnID(webConnID)
if !ok {
pa.api.LogError("received a message for an unregistered webconn",
"webConnID", webConnID,
@@ -282,19 +317,6 @@ func (pa *PluginAdapter) WebSocketMessageHasBeenPosted(webConnID, userID string,
}
}
func (pa *PluginAdapter) getUserIDsForWorkspace(workspaceID string) []string {
userMap := map[string]bool{}
for _, pac := range pa.listenersByWorkspace[workspaceID] {
userMap[pac.userID] = true
}
userIDs := []string{}
for userID := range userMap {
userIDs = append(userIDs, userID)
}
return userIDs
}
func (pa *PluginAdapter) sendMessageToAllSkipCluster(payload map[string]interface{}) {
// Empty &mmModel.WebsocketBroadcast will send to all users
pa.api.PublishWebSocketEvent(websocketActionUpdateConfig, payload, &mmModel.WebsocketBroadcast{})

View File

@@ -0,0 +1,86 @@
package ws
import (
"sync"
"sync/atomic"
"time"
mmModel "github.com/mattermost/mattermost-server/v6/model"
)
type PluginAdapterClient struct {
inactiveAt int64
webConnID string
userID string
workspaces []string
blocks []string
mu sync.RWMutex
}
func (pac *PluginAdapterClient) isActive() bool {
return atomic.LoadInt64(&pac.inactiveAt) == 0
}
func (pac *PluginAdapterClient) hasExpired(threshold time.Duration) bool {
return !mmModel.GetTimeForMillis(atomic.LoadInt64(&pac.inactiveAt)).Add(threshold).After(time.Now())
}
func (pac *PluginAdapterClient) subscribeToWorkspace(workspaceID string) {
pac.mu.Lock()
defer pac.mu.Unlock()
pac.workspaces = append(pac.workspaces, workspaceID)
}
func (pac *PluginAdapterClient) unsubscribeFromWorkspace(workspaceID string) {
pac.mu.Lock()
defer pac.mu.Unlock()
newClientWorkspaces := []string{}
for _, id := range pac.workspaces {
if id != workspaceID {
newClientWorkspaces = append(newClientWorkspaces, id)
}
}
pac.workspaces = newClientWorkspaces
}
func (pac *PluginAdapterClient) unsubscribeFromBlock(blockID string) {
pac.mu.Lock()
defer pac.mu.Unlock()
newClientBlocks := []string{}
for _, id := range pac.blocks {
if id != blockID {
newClientBlocks = append(newClientBlocks, id)
}
}
pac.blocks = newClientBlocks
}
func (pac *PluginAdapterClient) isSubscribedToWorkspace(workspaceID string) bool {
pac.mu.RLock()
defer pac.mu.RUnlock()
for _, id := range pac.workspaces {
if id == workspaceID {
return true
}
}
return false
}
//nolint:unused
func (pac *PluginAdapterClient) isSubscribedToBlock(blockID string) bool {
pac.mu.RLock()
defer pac.mu.RUnlock()
for _, id := range pac.blocks {
if id == blockID {
return true
}
}
return false
}

View File

@@ -0,0 +1,346 @@
package ws
import (
"sync"
"testing"
mmModel "github.com/mattermost/mattermost-server/v6/model"
"github.com/stretchr/testify/require"
)
func TestPluginAdapterWorkspaceSubscription(t *testing.T) {
th := SetupTestHelper(t)
webConnID := mmModel.NewId()
userID := mmModel.NewId()
workspaceID := mmModel.NewId()
var pac *PluginAdapterClient
t.Run("Should correctly add a connection", func(t *testing.T) {
require.Empty(t, th.pa.listeners)
require.Empty(t, th.pa.listenersByWorkspace)
th.pa.OnWebSocketConnect(webConnID, userID)
require.Len(t, th.pa.listeners, 1)
var ok bool
pac, ok = th.pa.listeners[webConnID]
require.True(t, ok)
require.NotNil(t, pac)
require.Equal(t, userID, pac.userID)
require.Empty(t, th.pa.listenersByWorkspace)
})
t.Run("Should correctly subscribe to a workspace", func(t *testing.T) {
require.False(t, pac.isSubscribedToWorkspace(workspaceID))
th.SubscribeWebConnToWorkspace(pac.webConnID, pac.userID, workspaceID)
require.Len(t, th.pa.listenersByWorkspace[workspaceID], 1)
require.Contains(t, th.pa.listenersByWorkspace[workspaceID], pac)
require.Len(t, pac.workspaces, 1)
require.Contains(t, pac.workspaces, workspaceID)
require.True(t, pac.isSubscribedToWorkspace(workspaceID))
})
t.Run("Subscribing again to a subscribed workspace would have no effect", func(t *testing.T) {
require.True(t, pac.isSubscribedToWorkspace(workspaceID))
th.SubscribeWebConnToWorkspace(pac.webConnID, pac.userID, workspaceID)
require.Len(t, th.pa.listenersByWorkspace[workspaceID], 1)
require.Contains(t, th.pa.listenersByWorkspace[workspaceID], pac)
require.Len(t, pac.workspaces, 1)
require.Contains(t, pac.workspaces, workspaceID)
require.True(t, pac.isSubscribedToWorkspace(workspaceID))
})
t.Run("Should correctly unsubscribe to a workspace", func(t *testing.T) {
require.True(t, pac.isSubscribedToWorkspace(workspaceID))
th.UnsubscribeWebConnFromWorkspace(pac.webConnID, pac.userID, workspaceID)
require.Empty(t, th.pa.listenersByWorkspace[workspaceID])
require.Empty(t, pac.workspaces)
require.False(t, pac.isSubscribedToWorkspace(workspaceID))
})
t.Run("Unsubscribing again to an unsubscribed workspace would have no effect", func(t *testing.T) {
require.False(t, pac.isSubscribedToWorkspace(workspaceID))
th.UnsubscribeWebConnFromWorkspace(pac.webConnID, pac.userID, workspaceID)
require.Empty(t, th.pa.listenersByWorkspace[workspaceID])
require.Empty(t, pac.workspaces)
require.False(t, pac.isSubscribedToWorkspace(workspaceID))
})
t.Run("Should correctly be marked as inactive if disconnected", func(t *testing.T) {
require.Len(t, th.pa.listeners, 1)
require.True(t, th.pa.listeners[webConnID].isActive())
th.pa.OnWebSocketDisconnect(webConnID, userID)
require.Len(t, th.pa.listeners, 1)
require.False(t, th.pa.listeners[webConnID].isActive())
})
t.Run("Should be marked back as active if reconnect", func(t *testing.T) {
require.Len(t, th.pa.listeners, 1)
require.False(t, th.pa.listeners[webConnID].isActive())
th.pa.OnWebSocketConnect(webConnID, userID)
require.Len(t, th.pa.listeners, 1)
require.True(t, th.pa.listeners[webConnID].isActive())
})
}
func TestPluginAdapterClientReconnect(t *testing.T) {
th := SetupTestHelper(t)
webConnID := mmModel.NewId()
userID := mmModel.NewId()
workspaceID := mmModel.NewId()
var pac *PluginAdapterClient
t.Run("A user should be able to reconnect within the accepted threshold and keep their subscriptions", func(t *testing.T) {
// create the connection
require.Len(t, th.pa.listeners, 0)
require.Len(t, th.pa.listenersByUserID[userID], 0)
th.pa.OnWebSocketConnect(webConnID, userID)
require.Len(t, th.pa.listeners, 1)
require.Len(t, th.pa.listenersByUserID[userID], 1)
var ok bool
pac, ok = th.pa.listeners[webConnID]
require.True(t, ok)
require.NotNil(t, pac)
th.SubscribeWebConnToWorkspace(pac.webConnID, pac.userID, workspaceID)
require.True(t, pac.isSubscribedToWorkspace(workspaceID))
// disconnect
th.pa.OnWebSocketDisconnect(webConnID, userID)
require.False(t, pac.isActive())
require.Len(t, th.pa.listeners, 1)
require.Len(t, th.pa.listenersByUserID[userID], 1)
// reconnect right away. The connection should still be subscribed
th.pa.OnWebSocketConnect(webConnID, userID)
require.Len(t, th.pa.listeners, 1)
require.Len(t, th.pa.listenersByUserID[userID], 1)
require.True(t, pac.isActive())
require.True(t, pac.isSubscribedToWorkspace(workspaceID))
})
t.Run("Should remove old inactive connection when user connects with a different ID", func(t *testing.T) {
// we set the stale threshold to zero so inactive connections always get deleted
oldStaleThreshold := th.pa.staleThreshold
th.pa.staleThreshold = 0
defer func() { th.pa.staleThreshold = oldStaleThreshold }()
th.pa.OnWebSocketDisconnect(webConnID, userID)
require.Len(t, th.pa.listeners, 1)
require.Len(t, th.pa.listenersByUserID[userID], 1)
require.Equal(t, webConnID, th.pa.listenersByUserID[userID][0].webConnID)
newWebConnID := mmModel.NewId()
th.pa.OnWebSocketConnect(newWebConnID, userID)
require.Len(t, th.pa.listeners, 1)
require.Len(t, th.pa.listenersByUserID[userID], 1)
require.Contains(t, th.pa.listeners, newWebConnID)
require.NotContains(t, th.pa.listeners, webConnID)
require.Equal(t, newWebConnID, th.pa.listenersByUserID[userID][0].webConnID)
// if the same ID connects again, it should have no subscriptions
th.pa.OnWebSocketConnect(webConnID, userID)
require.Len(t, th.pa.listeners, 2)
require.Len(t, th.pa.listenersByUserID[userID], 2)
reconnectedPAC, ok := th.pa.listeners[webConnID]
require.True(t, ok)
require.False(t, reconnectedPAC.isSubscribedToWorkspace(workspaceID))
})
t.Run("Should not remove active connections when user connects with a different ID", func(t *testing.T) {
// we set the stale threshold to zero so inactive connections always get deleted
oldStaleThreshold := th.pa.staleThreshold
th.pa.staleThreshold = 0
defer func() { th.pa.staleThreshold = oldStaleThreshold }()
// currently we have two listeners for userID, both active
require.Len(t, th.pa.listeners, 2)
// a new user connects
th.pa.OnWebSocketConnect(mmModel.NewId(), userID)
// and we should have three connections, all of them active
require.Len(t, th.pa.listeners, 3)
for _, listener := range th.pa.listeners {
require.True(t, listener.isActive())
}
})
}
func TestGetUserIDsForWorkspace(t *testing.T) {
th := SetupTestHelper(t)
// we have two workspaces
workspaceID1 := mmModel.NewId()
workspaceID2 := mmModel.NewId()
// user 1 has two connections
userID1 := mmModel.NewId()
webConnID1 := mmModel.NewId()
webConnID2 := mmModel.NewId()
// user 2 has one connection
userID2 := mmModel.NewId()
webConnID3 := mmModel.NewId()
wg := new(sync.WaitGroup)
wg.Add(3)
go func(wg *sync.WaitGroup) {
th.pa.OnWebSocketConnect(webConnID1, userID1)
th.SubscribeWebConnToWorkspace(webConnID1, userID1, workspaceID1)
wg.Done()
}(wg)
go func(wg *sync.WaitGroup) {
th.pa.OnWebSocketConnect(webConnID2, userID1)
th.SubscribeWebConnToWorkspace(webConnID2, userID1, workspaceID2)
wg.Done()
}(wg)
go func(wg *sync.WaitGroup) {
th.pa.OnWebSocketConnect(webConnID3, userID2)
th.SubscribeWebConnToWorkspace(webConnID3, userID2, workspaceID2)
wg.Done()
}(wg)
wg.Wait()
t.Run("should find that only user1 is connected to workspace 1", func(t *testing.T) {
userIDs := th.pa.getUserIDsForWorkspace(workspaceID1)
require.ElementsMatch(t, []string{userID1}, userIDs)
})
t.Run("should find that both users are connected to workspace 2", func(t *testing.T) {
userIDs := th.pa.getUserIDsForWorkspace(workspaceID2)
require.ElementsMatch(t, []string{userID1, userID2}, userIDs)
})
t.Run("should ignore user1 if webConn 2 inactive when getting workspace 2 user ids", func(t *testing.T) {
th.pa.OnWebSocketDisconnect(webConnID2, userID1)
userIDs := th.pa.getUserIDsForWorkspace(workspaceID2)
require.ElementsMatch(t, []string{userID2}, userIDs)
})
t.Run("should still find user 1 in workspace 1 after the webConn 2 disconnection", func(t *testing.T) {
userIDs := th.pa.getUserIDsForWorkspace(workspaceID1)
require.ElementsMatch(t, []string{userID1}, userIDs)
})
t.Run("should find again both users if the webConn 2 comes back", func(t *testing.T) {
th.pa.OnWebSocketConnect(webConnID2, userID1)
userIDs := th.pa.getUserIDsForWorkspace(workspaceID2)
require.ElementsMatch(t, []string{userID1, userID2}, userIDs)
})
}
func TestParallelSubscriptionsOnMultipleConnections(t *testing.T) {
th := SetupTestHelper(t)
workspaceID1 := mmModel.NewId()
workspaceID2 := mmModel.NewId()
workspaceID3 := mmModel.NewId()
workspaceID4 := mmModel.NewId()
userID := mmModel.NewId()
webConnID1 := mmModel.NewId()
webConnID2 := mmModel.NewId()
th.pa.OnWebSocketConnect(webConnID1, userID)
pac1, ok := th.pa.GetListenerByWebConnID(webConnID1)
require.True(t, ok)
th.pa.OnWebSocketConnect(webConnID2, userID)
pac2, ok := th.pa.GetListenerByWebConnID(webConnID2)
require.True(t, ok)
wg := new(sync.WaitGroup)
wg.Add(4)
go func(wg *sync.WaitGroup) {
th.SubscribeWebConnToWorkspace(webConnID1, userID, workspaceID1)
require.True(t, pac1.isSubscribedToWorkspace(workspaceID1))
th.SubscribeWebConnToWorkspace(webConnID2, userID, workspaceID1)
require.True(t, pac2.isSubscribedToWorkspace(workspaceID1))
th.UnsubscribeWebConnFromWorkspace(webConnID1, userID, workspaceID1)
require.False(t, pac1.isSubscribedToWorkspace(workspaceID1))
th.UnsubscribeWebConnFromWorkspace(webConnID2, userID, workspaceID1)
require.False(t, pac2.isSubscribedToWorkspace(workspaceID1))
wg.Done()
}(wg)
go func(wg *sync.WaitGroup) {
th.SubscribeWebConnToWorkspace(webConnID1, userID, workspaceID2)
require.True(t, pac1.isSubscribedToWorkspace(workspaceID2))
th.SubscribeWebConnToWorkspace(webConnID2, userID, workspaceID2)
require.True(t, pac2.isSubscribedToWorkspace(workspaceID2))
th.UnsubscribeWebConnFromWorkspace(webConnID1, userID, workspaceID2)
require.False(t, pac1.isSubscribedToWorkspace(workspaceID2))
th.UnsubscribeWebConnFromWorkspace(webConnID2, userID, workspaceID2)
require.False(t, pac2.isSubscribedToWorkspace(workspaceID2))
wg.Done()
}(wg)
go func(wg *sync.WaitGroup) {
th.SubscribeWebConnToWorkspace(webConnID1, userID, workspaceID3)
require.True(t, pac1.isSubscribedToWorkspace(workspaceID3))
th.SubscribeWebConnToWorkspace(webConnID2, userID, workspaceID3)
require.True(t, pac2.isSubscribedToWorkspace(workspaceID3))
th.UnsubscribeWebConnFromWorkspace(webConnID1, userID, workspaceID3)
require.False(t, pac1.isSubscribedToWorkspace(workspaceID3))
th.UnsubscribeWebConnFromWorkspace(webConnID2, userID, workspaceID3)
require.False(t, pac2.isSubscribedToWorkspace(workspaceID3))
wg.Done()
}(wg)
go func(wg *sync.WaitGroup) {
th.SubscribeWebConnToWorkspace(webConnID1, userID, workspaceID4)
require.True(t, pac1.isSubscribedToWorkspace(workspaceID4))
th.SubscribeWebConnToWorkspace(webConnID2, userID, workspaceID4)
require.True(t, pac2.isSubscribedToWorkspace(workspaceID4))
th.UnsubscribeWebConnFromWorkspace(webConnID1, userID, workspaceID4)
require.False(t, pac1.isSubscribedToWorkspace(workspaceID4))
th.UnsubscribeWebConnFromWorkspace(webConnID2, userID, workspaceID4)
require.False(t, pac2.isSubscribedToWorkspace(workspaceID4))
wg.Done()
}(wg)
wg.Wait()
}