Skip to content

Commit

Permalink
Close related Y.js sockets when main websocket session is terminated
Browse files Browse the repository at this point in the history
  • Loading branch information
raimohanska committed Feb 25, 2024
1 parent 408b865 commit b1881a8
Show file tree
Hide file tree
Showing 3 changed files with 48 additions and 12 deletions.
12 changes: 7 additions & 5 deletions YJS_CRDT_WIP.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,21 +11,23 @@ The Y.js based collaborative editing support is under construction.

## TODO

Must-haves
Must-haves, experimental flagged rollout

- None?

Must-haves, enable by default

- APIs
- Manage session on the server side: terminate YJS sockets when websocket session is terminated
- Mobile check
- Performance testing
- Storage requirement measurements
- APIs
- Playwright tests (create text, reload, change, reload, use two clients, clear indexeddb and reload...)
- Include API basic tests in Playwright tests

Nice-to-haves

- Undo buffer integration. Editor has its own local undo but we should also add the full edit as a global undo item
- Persistence: consider storing CRDT snapshot
- Persistence: make sure the compactor works
- UI: Show proper username by the cursor when hovering. Now shows some large number
- UI: Add a formatting toolbar. Needs some styling - if you now enable toolbar in Quill, it looks broken
- Storage requirement measurements
- Sharing: split the TypeScript y-websocket server into a separate shared module for others to enjoy
41 changes: 39 additions & 2 deletions backend/src/board-yjs-server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,24 @@ import { withDBClient } from "./db"
import { getSessionIdFromCookies } from "./http-session"
import { getSessionById } from "./websocket-sessions"
import YWebSocketServer from "./y-websocket-server/YWebSocketServer"
import * as WebSocket from "ws"

const socketsBySessionId: Record<string, WebSocket[]> = {}

export function closeYjsSocketsBySessionId(sessionId: string) {
const sockets = socketsBySessionId[sessionId]
if (sockets) {
for (const socket of sockets) {
socket.close()
}
delete socketsBySessionId[sessionId]
console.log(
`CLOSED ${sockets.length} y.js sockets by session id ${sessionId} - remaining sockets exist for ${
Object.keys(socketsBySessionId).length
} other sessions`,
)
}
}

export function BoardYJSServer(ws: expressWs.Instance, path: string) {
const yWebSocketServer = new YWebSocketServer({
Expand Down Expand Up @@ -34,12 +52,31 @@ export function BoardYJSServer(ws: expressWs.Instance, path: string) {
const boardId = req.params.boardId
const sessionId = getSessionIdFromCookies(req)
const session = sessionId ? getSessionById(sessionId) : undefined
if (!session) {
if (!sessionId || !session) {
//console.warn("No session for YJS connection for board", boardId)
socket.close()
return
}
console.log("Got YJS connection for board", boardId)
if (!socketsBySessionId[sessionId]) {
socketsBySessionId[sessionId] = []
}
socketsBySessionId[sessionId].push(socket)
console.log(
`OPENED y.js connection for session ${sessionId}. Now sockets exist for ${
Object.keys(socketsBySessionId).length
} sessions`,
)
socket.addEventListener("close", () => {
if (socketsBySessionId[sessionId]) {
socketsBySessionId[sessionId] = socketsBySessionId[sessionId].filter((s) => s !== socket)
if (socketsBySessionId[sessionId].length === 0) {
delete socketsBySessionId[sessionId]
}
console.log(
`CLOSED y.js connection. Now sockets exist for ${Object.keys(socketsBySessionId).length} sessions`,
)
}
})
const docName = boardId
try {
await yWebSocketServer.setupWSConnection(socket, docName)
Expand Down
7 changes: 2 additions & 5 deletions backend/src/websocket-sessions.ts
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import {
} from "../../common/src/domain"
import { ServerSideBoardState, maybeGetBoard } from "./board-state"
import { getBoardHistory } from "./board-store"
import { closeYjsSocketsBySessionId } from "./board-yjs-server"
import { randomProfession } from "./professions"
import { getUserIdForEmail } from "./user-store"
import { WsWrapper, toBuffer } from "./ws-wrapper"
Expand Down Expand Up @@ -115,6 +116,7 @@ export function endSession(socket: WsWrapper) {
}
}
delete sessions[socket.id]
closeYjsSocketsBySessionId(sessionId)
}
export function getBoardSessionCount(id: Id) {
return everyoneOnTheBoard(id).length
Expand All @@ -131,11 +133,6 @@ export function terminateSessions() {
Object.values(sessions).forEach((session) => session.close())
}

function describeRange(events: BoardHistoryEntry[]) {
if (events.length === 0) return "[]"
return `${events[0].serial}..${events[events.length - 1].serial}`
}

export async function addSessionToBoard(
boardState: ServerSideBoardState,
origin: WsWrapper,
Expand Down

0 comments on commit b1881a8

Please sign in to comment.