mirror of
https://github.com/mattermost/focalboard.git
synced 2024-12-24 13:43:12 +02:00
Websocket client now connects once and subscribe/desubscribe on the fly (#752)
* Websocket client now connects once and subscribe/desubscribe on the fly * Fixing eslint error Co-authored-by: Harshil Sharma <harshilsharma63@gmail.com>
This commit is contained in:
parent
efedf2f481
commit
6a7d3f797b
@ -21,6 +21,7 @@ const (
|
||||
websocketActionSubscribeWorkspace = "SUBSCRIBE_WORKSPACE"
|
||||
websocketActionUnsubscribeWorkspace = "UNSUBSCRIBE_WORKSPACE"
|
||||
websocketActionSubscribeBlocks = "SUBSCRIBE_BLOCKS"
|
||||
websocketActionUnsubscribeBlocks = "UNSUBSCRIBE_BLOCKS"
|
||||
)
|
||||
|
||||
type Hub interface {
|
||||
@ -211,6 +212,26 @@ func (ws *Server) handleWebSocket(w http.ResponseWriter, r *http.Request) {
|
||||
continue
|
||||
}
|
||||
|
||||
if command.Action == websocketActionUnsubscribeBlocks {
|
||||
ws.logger.Debug(`Command: UNSUBSCRIBE_BLOCKS`,
|
||||
mlog.String("workspaceID", command.WorkspaceID),
|
||||
mlog.Stringer("client", wsSession.client.RemoteAddr()),
|
||||
)
|
||||
|
||||
if !ws.isCommandReadTokenValid(command) {
|
||||
ws.logger.Error(`Rejected invalid read token`,
|
||||
mlog.Stringer("client", wsSession.client.RemoteAddr()),
|
||||
mlog.String("action", command.Action),
|
||||
mlog.String("readToken", command.ReadToken),
|
||||
)
|
||||
|
||||
continue
|
||||
}
|
||||
|
||||
ws.unsubscribeListenerFromBlocks(wsSession.client, command.BlockIDs)
|
||||
continue
|
||||
}
|
||||
|
||||
// if the command is not authenticated at this point, it will
|
||||
// not be processed
|
||||
if !wsSession.isAuthenticated() {
|
||||
|
@ -22,6 +22,7 @@ import LoginPage from './pages/loginPage'
|
||||
import RegisterPage from './pages/registerPage'
|
||||
import {IUser} from './user'
|
||||
import {Utils} from './utils'
|
||||
import wsClient from './wsclient'
|
||||
import {importNativeAppSettings} from './nativeApp'
|
||||
import {fetchCurrentUser, getCurrentUser} from './store/currentUser'
|
||||
import {getLanguage, fetchLanguage} from './store/language'
|
||||
@ -43,6 +44,13 @@ const App = React.memo((): JSX.Element => {
|
||||
})
|
||||
}, [])
|
||||
|
||||
useEffect(() => {
|
||||
wsClient.open()
|
||||
return () => {
|
||||
wsClient.close()
|
||||
}
|
||||
}, [])
|
||||
|
||||
return (
|
||||
<IntlProvider
|
||||
locale={language.split(/[_]/)[0]}
|
||||
|
@ -28,7 +28,6 @@ const CardDialog = (props: Props) => {
|
||||
const [cardTree, setCardTree] = useState<CardTree>()
|
||||
const intl = useIntl()
|
||||
useCardListener(
|
||||
[props.cardId],
|
||||
async (blocks) => {
|
||||
Utils.log(`cardListener.onChanged: ${blocks.length}`)
|
||||
const newCardTree = cardTree ? MutableCardTree.incrementalUpdate(cardTree, blocks) : await MutableCardTree.sync(props.cardId)
|
||||
|
@ -54,7 +54,6 @@ const Gallery = (props: Props): JSX.Element => {
|
||||
const visibleTitle = boardTree.activeView.visiblePropertyIds.includes(Constants.titleColumnId)
|
||||
|
||||
useCardListener(
|
||||
cards.map((c) => c.id),
|
||||
async (blocks) => {
|
||||
cards.forEach(async (c) => {
|
||||
const cardTree = cardTrees[c.id]
|
||||
|
@ -53,7 +53,6 @@ const Kanban = (props: Props) => {
|
||||
cardTreeRef.current = cardTrees
|
||||
|
||||
useCardListener(
|
||||
cards.map((c) => c.id),
|
||||
async (blocks) => {
|
||||
for (const block of blocks) {
|
||||
const cardTree = cardTreeRef.current && cardTreeRef.current[block.parentId]
|
||||
@ -68,7 +67,6 @@ const Kanban = (props: Props) => {
|
||||
setCardTrees((oldTree) => ({...oldTree, [c.id]: newCardTree}))
|
||||
})
|
||||
},
|
||||
false,
|
||||
)
|
||||
|
||||
const propertyNameChanged = async (option: IPropertyOption, text: string): Promise<void> => {
|
||||
|
@ -122,7 +122,7 @@ const SidebarBoardItem = React.memo((props: Props) => {
|
||||
intl.formatMessage({id: 'Sidebar.delete-board', defaultMessage: 'Delete board'}),
|
||||
async () => {
|
||||
if (props.nextBoardId) {
|
||||
// This delay is needed because OctoListener has a default 100 ms notification delay before updates
|
||||
// This delay is needed because WSClient has a default 100 ms notification delay before updates
|
||||
setTimeout(() => {
|
||||
showBoard(props.nextBoardId)
|
||||
}, 120)
|
||||
|
@ -46,7 +46,6 @@ const Table = (props: Props) => {
|
||||
cardTreeRef.current = cardTrees
|
||||
|
||||
useCardListener(
|
||||
cards.map((c) => c.id),
|
||||
async (blocks) => {
|
||||
for (const block of blocks) {
|
||||
const cardTree = cardTreeRef.current && cardTreeRef.current[block.parentId]
|
||||
@ -67,7 +66,6 @@ const Table = (props: Props) => {
|
||||
setCardTrees((oldTree) => ({...oldTree, [c.id]: newCardTree}))
|
||||
})
|
||||
},
|
||||
false,
|
||||
)
|
||||
|
||||
const {offset, resizingColumn} = useDragLayer((monitor) => {
|
||||
|
@ -44,7 +44,7 @@ const ViewMenu = React.memo((props: Props) => {
|
||||
newView,
|
||||
'duplicate view',
|
||||
async () => {
|
||||
// This delay is needed because OctoListener has a default 100 ms notification delay before updates
|
||||
// This delay is needed because WSClient has a default 100 ms notification delay before updates
|
||||
setTimeout(() => {
|
||||
showView(newView.id)
|
||||
}, 120)
|
||||
@ -91,7 +91,7 @@ const ViewMenu = React.memo((props: Props) => {
|
||||
view,
|
||||
'add view',
|
||||
async () => {
|
||||
// This delay is needed because OctoListener has a default 100 ms notification delay before updates
|
||||
// This delay is needed because WSClient has a default 100 ms notification delay before updates
|
||||
setTimeout(() => {
|
||||
showView(view.id)
|
||||
}, 120)
|
||||
@ -120,7 +120,7 @@ const ViewMenu = React.memo((props: Props) => {
|
||||
view,
|
||||
'add view',
|
||||
async () => {
|
||||
// This delay is needed because OctoListener has a default 100 ms notification delay before updates
|
||||
// This delay is needed because WSClient has a default 100 ms notification delay before updates
|
||||
setTimeout(() => {
|
||||
Utils.log(`showView: ${view.id}`)
|
||||
showView(view.id)
|
||||
@ -148,7 +148,7 @@ const ViewMenu = React.memo((props: Props) => {
|
||||
view,
|
||||
'add view',
|
||||
async () => {
|
||||
// This delay is needed because OctoListener has a default 100 ms notification delay before updates
|
||||
// This delay is needed because WSClient has a default 100 ms notification delay before updates
|
||||
setTimeout(() => {
|
||||
Utils.log(`showView: ${view.id}`)
|
||||
showView(view.id)
|
||||
|
@ -3,44 +3,16 @@
|
||||
import {useEffect} from 'react'
|
||||
|
||||
import {IBlock} from '../blocks/block'
|
||||
import octoClient from '../octoClient'
|
||||
import {OctoListener} from '../octoListener'
|
||||
import {Utils} from '../utils'
|
||||
|
||||
export default function useCardListener(cardIds: string[], onChange: (blocks: IBlock[]) => void, onReconnect: () => void, initialCall = true): void {
|
||||
let cardListener: OctoListener | null = null
|
||||
|
||||
const deleteListener = () => {
|
||||
cardListener?.close()
|
||||
cardListener = null
|
||||
}
|
||||
|
||||
const createListener = () => {
|
||||
deleteListener()
|
||||
|
||||
cardListener = new OctoListener()
|
||||
cardListener.open(
|
||||
octoClient.workspaceId,
|
||||
cardIds,
|
||||
onChange,
|
||||
onReconnect,
|
||||
)
|
||||
}
|
||||
|
||||
const createCardTreeAndSync = async () => {
|
||||
if (initialCall) {
|
||||
onReconnect()
|
||||
}
|
||||
|
||||
createListener()
|
||||
}
|
||||
import wsClient, {WSClient} from '../wsclient'
|
||||
|
||||
export default function useCardListener(onChange: (blocks: IBlock[]) => void, onReconnect: () => void): void {
|
||||
useEffect(() => {
|
||||
Utils.log(`useCardListener.connect: ${cardIds}`)
|
||||
createCardTreeAndSync()
|
||||
const onChangeHandler = (_: WSClient, blocks: IBlock[]) => onChange(blocks)
|
||||
wsClient.addOnChange(onChangeHandler)
|
||||
wsClient.addOnReconnect(onReconnect)
|
||||
return () => {
|
||||
Utils.log(`useCardListener.disconnect: ${cardIds}`)
|
||||
deleteListener()
|
||||
wsClient.removeOnChange(onChangeHandler)
|
||||
wsClient.removeOnReconnect(onReconnect)
|
||||
}
|
||||
}, [cardIds.join('-')])
|
||||
}, [])
|
||||
}
|
||||
|
@ -1,248 +0,0 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
import {IBlock} from './blocks/block'
|
||||
import {Utils} from './utils'
|
||||
|
||||
// These are outgoing commands to the server
|
||||
type WSCommand = {
|
||||
action: string
|
||||
workspaceId?: string
|
||||
readToken?: string
|
||||
blockIds?: string[]
|
||||
}
|
||||
|
||||
// These are messages from the server
|
||||
type WSMessage = {
|
||||
action?: string
|
||||
block?: IBlock
|
||||
error?: string
|
||||
}
|
||||
|
||||
type OnChangeHandler = (blocks: IBlock[]) => void
|
||||
type OnStateChange = (state: 'open' | 'close') => void
|
||||
|
||||
//
|
||||
// OctoListener calls a handler when a block or any of its children changes
|
||||
//
|
||||
class OctoListener {
|
||||
get isOpen(): boolean {
|
||||
return Boolean(this.ws)
|
||||
}
|
||||
|
||||
readonly serverUrl: string
|
||||
private workspaceId?: string
|
||||
private token: string
|
||||
private readToken: string
|
||||
private ws?: WebSocket
|
||||
private blockIds: string[] = []
|
||||
private isInitialized = false
|
||||
|
||||
private onChange?: OnChangeHandler
|
||||
private updatedBlocks: IBlock[] = []
|
||||
private updateTimeout?: NodeJS.Timeout
|
||||
|
||||
notificationDelay = 100
|
||||
reopenDelay = 3000
|
||||
|
||||
constructor(serverUrl?: string, token?: string, readToken?: string) {
|
||||
this.serverUrl = serverUrl || Utils.buildURL('', true).replace(/\/$/, '')
|
||||
this.token = token || localStorage.getItem('focalboardSessionId') || ''
|
||||
this.readToken = readToken || OctoListener.getReadToken()
|
||||
Utils.log(`OctoListener serverUrl: ${this.serverUrl}`)
|
||||
}
|
||||
|
||||
static getReadToken(): string {
|
||||
const queryString = new URLSearchParams(window.location.search)
|
||||
const readToken = queryString.get('r') || ''
|
||||
return readToken
|
||||
}
|
||||
|
||||
open(workspaceId: string, blockIds: string[], onChange: OnChangeHandler, onReconnect: () => void, onStateChange?: OnStateChange): void {
|
||||
if (this.ws) {
|
||||
this.close()
|
||||
}
|
||||
|
||||
this.onChange = onChange
|
||||
this.workspaceId = workspaceId
|
||||
|
||||
const url = new URL(this.serverUrl)
|
||||
const protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:'
|
||||
const wsServerUrl = `${protocol}//${url.host}${url.pathname.replace(/\/$/, '')}/ws`
|
||||
Utils.log(`OctoListener open: ${wsServerUrl}`)
|
||||
const ws = new WebSocket(wsServerUrl)
|
||||
this.ws = ws
|
||||
|
||||
ws.onopen = () => {
|
||||
Utils.log('OctoListener webSocket opened.')
|
||||
this.authenticate(workspaceId)
|
||||
this.subscribeToWorkspace(workspaceId)
|
||||
this.isInitialized = true
|
||||
onStateChange?.('open')
|
||||
}
|
||||
|
||||
ws.onerror = (e) => {
|
||||
Utils.logError(`OctoListener websocket onerror. data: ${e}`)
|
||||
}
|
||||
|
||||
ws.onclose = (e) => {
|
||||
Utils.log(`OctoListener websocket onclose, code: ${e.code}, reason: ${e.reason}`)
|
||||
if (ws === this.ws) {
|
||||
// Unexpected close, re-open
|
||||
const reopenBlockIds = this.isInitialized ? this.blockIds.slice() : blockIds.slice()
|
||||
Utils.logError(`Unexpected close, re-opening with ${reopenBlockIds.length} blocks...`)
|
||||
onStateChange?.('close')
|
||||
setTimeout(() => {
|
||||
this.open(workspaceId, reopenBlockIds, onChange, onReconnect)
|
||||
onReconnect()
|
||||
}, this.reopenDelay)
|
||||
}
|
||||
}
|
||||
|
||||
ws.onmessage = (e) => {
|
||||
// Utils.log(`OctoListener websocket onmessage. data: ${e.data}`)
|
||||
if (ws !== this.ws) {
|
||||
Utils.log('Ignoring closed ws')
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const message = JSON.parse(e.data) as WSMessage
|
||||
if (message.error) {
|
||||
Utils.logError(`Listener websocket error: ${message.error}`)
|
||||
return
|
||||
}
|
||||
|
||||
switch (message.action) {
|
||||
case 'UPDATE_BLOCK':
|
||||
this.queueUpdateNotification(message.block!)
|
||||
break
|
||||
default:
|
||||
Utils.logError(`Unexpected action: ${message.action}`)
|
||||
}
|
||||
} catch (err) {
|
||||
Utils.log('message is not an object')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (!this.ws) {
|
||||
return
|
||||
}
|
||||
|
||||
Utils.log(`OctoListener close: ${this.ws.url}`)
|
||||
|
||||
// Use this sequence so the onclose method doesn't try to re-open
|
||||
const ws = this.ws
|
||||
this.ws = undefined
|
||||
this.blockIds = []
|
||||
this.onChange = undefined
|
||||
this.isInitialized = false
|
||||
try {
|
||||
ws?.close()
|
||||
} catch {
|
||||
try {
|
||||
(ws as any)?.websocket?.close()
|
||||
} catch {
|
||||
Utils.log('OctoListener unable to close the websocket')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private authenticate(workspaceId: string): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('OctoListener.addBlocks: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
if (!this.token) {
|
||||
return
|
||||
}
|
||||
const command = {
|
||||
action: 'AUTH',
|
||||
token: this.token,
|
||||
workspaceId,
|
||||
}
|
||||
this.ws.send(JSON.stringify(command))
|
||||
}
|
||||
|
||||
private subscribeToBlocks(blockIds: string[]): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('OctoListener.subscribeToBlocks: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
const command: WSCommand = {
|
||||
action: 'SUBSCRIBE_BLOCKS',
|
||||
blockIds,
|
||||
workspaceId: this.workspaceId,
|
||||
readToken: this.readToken,
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(command))
|
||||
this.blockIds.push(...blockIds)
|
||||
}
|
||||
|
||||
private subscribeToWorkspace(workspaceId: string): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('OctoListener.subscribeToWorkspace: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
const command: WSCommand = {
|
||||
action: 'SUBSCRIBE_WORKSPACE',
|
||||
workspaceId,
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(command))
|
||||
}
|
||||
|
||||
private removeBlocks(blockIds: string[]): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('OctoListener.removeBlocks: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
const command: WSCommand = {
|
||||
action: 'REMOVE',
|
||||
blockIds,
|
||||
workspaceId: this.workspaceId,
|
||||
readToken: this.readToken,
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(command))
|
||||
|
||||
// Remove registered blockIds, maintinging multiple copies (simple ref-counting)
|
||||
for (let i = 0; i < this.blockIds.length; i++) {
|
||||
for (let j = 0; j < blockIds.length; j++) {
|
||||
if (this.blockIds[i] === blockIds[j]) {
|
||||
this.blockIds.splice(i, 1)
|
||||
blockIds.splice(j, 1)
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private queueUpdateNotification(block: IBlock) {
|
||||
this.updatedBlocks = this.updatedBlocks.filter((o) => o.id !== block.id) // Remove existing queued update
|
||||
this.updatedBlocks.push(block)
|
||||
if (this.updateTimeout) {
|
||||
clearTimeout(this.updateTimeout)
|
||||
this.updateTimeout = undefined
|
||||
}
|
||||
|
||||
this.updateTimeout = setTimeout(() => {
|
||||
this.flushUpdateNotifications()
|
||||
}, this.notificationDelay)
|
||||
}
|
||||
|
||||
private flushUpdateNotifications() {
|
||||
for (const block of this.updatedBlocks) {
|
||||
Utils.log(`OctoListener flush update block: ${block.id}`)
|
||||
}
|
||||
this.onChange?.(this.updatedBlocks)
|
||||
this.updatedBlocks = []
|
||||
}
|
||||
}
|
||||
|
||||
export {OctoListener}
|
@ -13,8 +13,8 @@ import {sendFlashMessage} from '../components/flashMessages'
|
||||
import Workspace from '../components/workspace'
|
||||
import mutator from '../mutator'
|
||||
import octoClient from '../octoClient'
|
||||
import {OctoListener} from '../octoListener'
|
||||
import {Utils} from '../utils'
|
||||
import wsClient, {WSClient} from '../wsclient'
|
||||
import {BoardTree, MutableBoardTree} from '../viewModel/boardTree'
|
||||
import {MutableWorkspaceTree, WorkspaceTree} from '../viewModel/workspaceTree'
|
||||
import './boardPage.scss'
|
||||
@ -38,8 +38,6 @@ type State = {
|
||||
}
|
||||
|
||||
class BoardPage extends React.Component<Props, State> {
|
||||
private workspaceListener = new OctoListener()
|
||||
|
||||
constructor(props: Props) {
|
||||
super(props)
|
||||
|
||||
@ -146,17 +144,32 @@ class BoardPage extends React.Component<Props, State> {
|
||||
}
|
||||
}
|
||||
|
||||
updateWebsocketState = (_: WSClient, newState: 'init'|'open'|'close'): void => {
|
||||
if (newState === 'open') {
|
||||
const token = localStorage.getItem('focalboardSessionId') || ''
|
||||
wsClient.authenticate(this.props.match.params.workspaceId || '0', token)
|
||||
wsClient.subscribeToWorkspace(this.props.match.params.workspaceId || '0')
|
||||
}
|
||||
this.setState({websocketClosed: newState === 'close'})
|
||||
}
|
||||
|
||||
componentDidMount(): void {
|
||||
if (this.props.match.params.boardId) {
|
||||
this.attachToBoard(this.props.match.params.boardId, this.props.match.params.viewId)
|
||||
} else {
|
||||
this.sync()
|
||||
}
|
||||
wsClient.addOnChange(this.incrementalUpdate)
|
||||
wsClient.addOnReconnect(this.sync)
|
||||
wsClient.addOnStateChange(this.updateWebsocketState)
|
||||
}
|
||||
|
||||
componentWillUnmount(): void {
|
||||
Utils.log(`boardPage.componentWillUnmount: ${this.props.match.params.boardId}`)
|
||||
this.workspaceListener.close()
|
||||
wsClient.unsubscribeToWorkspace(this.props.match.params.workspaceId || '0')
|
||||
wsClient.removeOnChange(this.incrementalUpdate)
|
||||
wsClient.removeOnReconnect(this.sync)
|
||||
wsClient.removeOnStateChange(this.updateWebsocketState)
|
||||
}
|
||||
|
||||
render(): JSX.Element {
|
||||
@ -226,7 +239,7 @@ class BoardPage extends React.Component<Props, State> {
|
||||
}
|
||||
}
|
||||
|
||||
private async sync() {
|
||||
private sync = async () => {
|
||||
Utils.log(`sync start: ${this.props.match.params.boardId}`)
|
||||
|
||||
let workspace: IWorkspace | undefined
|
||||
@ -239,53 +252,8 @@ class BoardPage extends React.Component<Props, State> {
|
||||
}
|
||||
|
||||
const workspaceTree = await MutableWorkspaceTree.sync()
|
||||
const boardIds = [...workspaceTree.boards.map((o) => o.id), ...workspaceTree.boardTemplates.map((o) => o.id)]
|
||||
this.setState({workspace, workspaceTree})
|
||||
|
||||
let boardIdsToListen: string[]
|
||||
if (boardIds.length > 0) {
|
||||
boardIdsToListen = ['', ...boardIds]
|
||||
} else {
|
||||
// Read-only view
|
||||
boardIdsToListen = [this.props.match.params.boardId || '']
|
||||
}
|
||||
|
||||
// Listen to boards plus all blocks at root (Empty string for parentId)
|
||||
this.workspaceListener.open(
|
||||
octoClient.workspaceId,
|
||||
boardIdsToListen,
|
||||
async (blocks) => {
|
||||
Utils.log(`workspaceListener.onChanged: ${blocks.length}`)
|
||||
this.incrementalUpdate(blocks)
|
||||
},
|
||||
() => {
|
||||
Utils.log('workspaceListener.onReconnect')
|
||||
this.sync()
|
||||
},
|
||||
(state) => {
|
||||
switch (state) {
|
||||
case 'close': {
|
||||
// Show error after a delay to ignore brief interruptions
|
||||
if (!this.state.websocketClosed && !this.state.websocketClosedTimeOutId) {
|
||||
const timeoutId = setTimeout(() => {
|
||||
this.setState({websocketClosed: true, websocketClosedTimeOutId: undefined})
|
||||
}, 5000)
|
||||
this.setState({websocketClosedTimeOutId: timeoutId})
|
||||
}
|
||||
break
|
||||
}
|
||||
case 'open': {
|
||||
if (this.state.websocketClosedTimeOutId) {
|
||||
clearTimeout(this.state.websocketClosedTimeOutId)
|
||||
}
|
||||
this.setState({websocketClosed: false, websocketClosedTimeOutId: undefined})
|
||||
Utils.log('Connection established')
|
||||
break
|
||||
}
|
||||
}
|
||||
},
|
||||
)
|
||||
|
||||
if (this.props.match.params.boardId) {
|
||||
const boardTree = await MutableBoardTree.sync(this.props.match.params.boardId || '', this.props.match.params.viewId || '', this.props.usersById)
|
||||
|
||||
@ -314,7 +282,7 @@ class BoardPage extends React.Component<Props, State> {
|
||||
}
|
||||
}
|
||||
|
||||
private async incrementalUpdate(blocks: IBlock[]) {
|
||||
private incrementalUpdate = async (_: WSClient, blocks: IBlock[]) => {
|
||||
const {workspaceTree, boardTree} = this.state
|
||||
|
||||
let newState = {workspaceTree, boardTree}
|
||||
|
285
webapp/src/wsclient.ts
Normal file
285
webapp/src/wsclient.ts
Normal file
@ -0,0 +1,285 @@
|
||||
// Copyright (c) 2015-present Mattermost, Inc. All Rights Reserved.
|
||||
// See LICENSE.txt for license information.
|
||||
import {Utils} from './utils'
|
||||
import {IBlock} from './blocks/block'
|
||||
|
||||
// These are outgoing commands to the server
|
||||
type WSCommand = {
|
||||
action: string
|
||||
workspaceId?: string
|
||||
readToken?: string
|
||||
blockIds?: string[]
|
||||
}
|
||||
|
||||
// These are messages from the server
|
||||
type WSMessage = {
|
||||
action?: string
|
||||
block?: IBlock
|
||||
error?: string
|
||||
}
|
||||
|
||||
type OnChangeHandler = (client: WSClient, blocks: IBlock[]) => void
|
||||
type OnReconnectHandler = (client: WSClient) => void
|
||||
type OnStateChangeHandler = (client: WSClient, state: 'init' | 'open' | 'close') => void
|
||||
type OnErrorHandler = (client: WSClient, e: Event) => void
|
||||
|
||||
class WSClient {
|
||||
ws: WebSocket|null = null
|
||||
serverUrl: string
|
||||
state: 'init'|'open'|'close' = 'init'
|
||||
onStateChange: OnStateChangeHandler[] = []
|
||||
onReconnect: OnReconnectHandler[] = []
|
||||
onChange: OnChangeHandler[] = []
|
||||
onError: OnErrorHandler[] = []
|
||||
private notificationDelay = 100
|
||||
private reopenDelay = 3000
|
||||
private updatedBlocks: IBlock[] = []
|
||||
private updateTimeout?: NodeJS.Timeout
|
||||
|
||||
constructor(serverUrl?: string) {
|
||||
this.serverUrl = (serverUrl || Utils.getBaseURL(true)).replace(/\/$/, '')
|
||||
Utils.log(`WSClient serverUrl: ${this.serverUrl}`)
|
||||
}
|
||||
|
||||
addOnChange(handler: OnChangeHandler): void {
|
||||
this.onChange.push(handler)
|
||||
}
|
||||
|
||||
removeOnChange(handler: OnChangeHandler): void {
|
||||
const index = this.onChange.indexOf(handler)
|
||||
if (index !== -1) {
|
||||
this.onChange.splice(index, 1)
|
||||
}
|
||||
}
|
||||
|
||||
addOnReconnect(handler: OnReconnectHandler): void {
|
||||
this.onReconnect.push(handler)
|
||||
}
|
||||
|
||||
removeOnReconnect(handler: OnReconnectHandler): void {
|
||||
const index = this.onReconnect.indexOf(handler)
|
||||
if (index !== -1) {
|
||||
this.onReconnect.splice(index, 1)
|
||||
}
|
||||
}
|
||||
|
||||
addOnStateChange(handler: OnStateChangeHandler): void {
|
||||
this.onStateChange.push(handler)
|
||||
}
|
||||
|
||||
removeOnStateChange(handler: OnStateChangeHandler): void {
|
||||
const index = this.onStateChange.indexOf(handler)
|
||||
if (index !== -1) {
|
||||
this.onStateChange.splice(index, 1)
|
||||
}
|
||||
}
|
||||
|
||||
addOnError(handler: OnErrorHandler): void {
|
||||
this.onError.push(handler)
|
||||
}
|
||||
|
||||
removeOnError(handler: OnErrorHandler): void {
|
||||
const index = this.onError.indexOf(handler)
|
||||
if (index !== -1) {
|
||||
this.onError.splice(index, 1)
|
||||
}
|
||||
}
|
||||
|
||||
open(): void {
|
||||
const url = new URL(this.serverUrl)
|
||||
const protocol = (url.protocol === 'https:') ? 'wss:' : 'ws:'
|
||||
const wsServerUrl = `${protocol}//${url.host}${url.pathname.replace(/\/$/, '')}/ws`
|
||||
Utils.log(`WSClient open: ${wsServerUrl}`)
|
||||
const ws = new WebSocket(wsServerUrl)
|
||||
this.ws = ws
|
||||
|
||||
ws.onopen = () => {
|
||||
Utils.log('WSClient webSocket opened.')
|
||||
for (const handler of this.onStateChange) {
|
||||
handler(this, 'open')
|
||||
}
|
||||
this.state = 'open'
|
||||
}
|
||||
|
||||
ws.onerror = (e) => {
|
||||
Utils.logError(`WSClient websocket onerror. data: ${e}`)
|
||||
for (const handler of this.onError) {
|
||||
handler(this, e)
|
||||
}
|
||||
}
|
||||
|
||||
ws.onclose = (e) => {
|
||||
Utils.log(`WSClient websocket onclose, code: ${e.code}, reason: ${e.reason}`)
|
||||
if (ws === this.ws) {
|
||||
// Unexpected close, re-open
|
||||
Utils.logError('Unexpected close, re-opening websocket')
|
||||
for (const handler of this.onStateChange) {
|
||||
handler(this, 'close')
|
||||
}
|
||||
this.state = 'close'
|
||||
setTimeout(() => {
|
||||
this.open()
|
||||
for (const handler of this.onReconnect) {
|
||||
handler(this)
|
||||
}
|
||||
}, this.reopenDelay)
|
||||
}
|
||||
}
|
||||
|
||||
ws.onmessage = (e) => {
|
||||
// Utils.log(`WSClient websocket onmessage. data: ${e.data}`)
|
||||
if (ws !== this.ws) {
|
||||
Utils.log('Ignoring closed ws')
|
||||
return
|
||||
}
|
||||
|
||||
try {
|
||||
const message = JSON.parse(e.data) as WSMessage
|
||||
if (message.error) {
|
||||
Utils.logError(`Listener websocket error: ${message.error}`)
|
||||
return
|
||||
}
|
||||
|
||||
switch (message.action) {
|
||||
case 'UPDATE_BLOCK':
|
||||
this.queueUpdateNotification(message.block!)
|
||||
break
|
||||
default:
|
||||
Utils.logError(`Unexpected action: ${message.action}`)
|
||||
}
|
||||
} catch (err) {
|
||||
Utils.log('message is not an object')
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
authenticate(workspaceId: string, token: string): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('WSClient.addBlocks: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
if (!token) {
|
||||
return
|
||||
}
|
||||
const command = {
|
||||
action: 'AUTH',
|
||||
token,
|
||||
workspaceId,
|
||||
}
|
||||
this.ws.send(JSON.stringify(command))
|
||||
}
|
||||
|
||||
subscribeToBlocks(workspaceId: string, blockIds: string[], readToken = ''): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('WSClient.subscribeToBlocks: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
const command: WSCommand = {
|
||||
action: 'SUBSCRIBE_BLOCKS',
|
||||
blockIds,
|
||||
workspaceId,
|
||||
readToken,
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(command))
|
||||
}
|
||||
|
||||
unsubscribeToWorkspace(workspaceId: string): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('WSClient.subscribeToWorkspace: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
const command: WSCommand = {
|
||||
action: 'UNSUBSCRIBE_WORKSPACE',
|
||||
workspaceId,
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(command))
|
||||
}
|
||||
|
||||
subscribeToWorkspace(workspaceId: string): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('WSClient.subscribeToWorkspace: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
const command: WSCommand = {
|
||||
action: 'SUBSCRIBE_WORKSPACE',
|
||||
workspaceId,
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(command))
|
||||
}
|
||||
|
||||
unsubscribeFromBlocks(workspaceId: string, blockIds: string[], readToken = ''): void {
|
||||
if (!this.ws) {
|
||||
Utils.assertFailure('WSClient.removeBlocks: ws is not open')
|
||||
return
|
||||
}
|
||||
|
||||
const command: WSCommand = {
|
||||
action: 'UNSUBSCRIBE_BLOCKS',
|
||||
blockIds,
|
||||
workspaceId,
|
||||
readToken,
|
||||
}
|
||||
|
||||
this.ws.send(JSON.stringify(command))
|
||||
}
|
||||
|
||||
private queueUpdateNotification(block: IBlock) {
|
||||
this.updatedBlocks = this.updatedBlocks.filter((o) => o.id !== block.id) // Remove existing queued update
|
||||
this.updatedBlocks.push(block)
|
||||
if (this.updateTimeout) {
|
||||
clearTimeout(this.updateTimeout)
|
||||
this.updateTimeout = undefined
|
||||
}
|
||||
|
||||
this.updateTimeout = setTimeout(() => {
|
||||
this.flushUpdateNotifications()
|
||||
}, this.notificationDelay)
|
||||
}
|
||||
|
||||
private flushUpdateNotifications() {
|
||||
for (const block of this.updatedBlocks) {
|
||||
Utils.log(`WSClient flush update block: ${block.id}`)
|
||||
}
|
||||
for (const handler of this.onChange) {
|
||||
handler(this, this.updatedBlocks)
|
||||
}
|
||||
this.updatedBlocks = []
|
||||
}
|
||||
|
||||
close(): void {
|
||||
if (!this.ws) {
|
||||
return
|
||||
}
|
||||
|
||||
Utils.log(`WSClient close: ${this.ws.url}`)
|
||||
|
||||
// Use this sequence so the onclose method doesn't try to re-open
|
||||
const ws = this.ws
|
||||
this.ws = null
|
||||
this.onChange = []
|
||||
this.onReconnect = []
|
||||
this.onStateChange = []
|
||||
this.onError = []
|
||||
try {
|
||||
ws?.close()
|
||||
} catch {
|
||||
try {
|
||||
(ws as any)?.websocket?.close()
|
||||
} catch {
|
||||
Utils.log('WSClient unable to close the websocket')
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
const wsClient = new WSClient()
|
||||
|
||||
export {WSClient}
|
||||
export default wsClient
|
Loading…
Reference in New Issue
Block a user