Skip to content

Commit

Permalink
Store CRDT updates to database along event history bundles
Browse files Browse the repository at this point in the history
  • Loading branch information
raimohanska committed Feb 18, 2024
1 parent 9521703 commit 1ef8a3c
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 189 deletions.
4 changes: 2 additions & 2 deletions YJS_CRDT_WIP.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,9 @@ The Y.js based collaborative editing support is under construction.

## TODO

- Persistence: store diffs to event bundles. Figure out how the compactor should work. Can diffs just be concatenated?
- Persistence: boot the SharedDoc based on stored diffs
- Persistence: consider storing CRDT snapshot
- Persistence: make sure the compactor works
- Persistence: implement local/offline persistence
- Domain: Tag the CRDT based item (properties)
- Domain: Use a separate CRDT field for each item/property. All stored in the single document.
- Domain: Consider if CRDT field values should also be included in the JSON presentation, maybe on save
Expand Down
1 change: 1 addition & 0 deletions backend/migrations/007_add_crdt_update.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
alter table board_event add column crdt_update bytea null;
51 changes: 39 additions & 12 deletions backend/src/board-state.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,23 @@
import { merge } from "lodash"
import { boardReducer } from "../../common/src/board-reducer"
import { Board, BoardCursorPositions, BoardHistoryEntry, Id } from "../../common/src/domain"
import { sleep } from "../../common/src/sleep"
import { createAccessToken, createBoard, fetchBoard, saveRecentEvents } from "./board-store"
import { compactBoardHistory, quickCompactBoardHistory } from "./compact-history"
import { quickCompactBoardHistory } from "./compact-history"
import { Locks } from "./locker"
import { UserSession, broadcastItemLocks, getBoardSessionCount, getSessionCount } from "./websocket-sessions"
import * as Y from "yjs"

// A mutable state object for server side state
export type ServerSideBoardState = {
ready: true
board: Board
recentEvents: BoardHistoryEntry[]
storingEvents: BoardHistoryEntry[]
recentCrdtUpdate: Uint8Array | null
currentlyStoring: {
events: BoardHistoryEntry[]
crdtUpdate: Uint8Array | null
} | null
locks: ReturnType<typeof Locks>
cursorsMoved: boolean
cursorPositions: BoardCursorPositions
Expand Down Expand Up @@ -40,7 +47,8 @@ export async function getBoard(id: Id): Promise<ServerSideBoardState | null> {
board,
accessTokens,
recentEvents: [],
storingEvents: [],
recentCrdtUpdate: null,
currentlyStoring: null,
locks: Locks((changedLocks) => broadcastItemLocks(id, changedLocks)),
cursorsMoved: false,
cursorPositions: {},
Expand All @@ -65,8 +73,8 @@ export async function getBoard(id: Id): Promise<ServerSideBoardState | null> {
}
} catch (e) {
boards.delete(id)
console.error(`Board load failed for board ${id}. Running compact/fix.`)
await compactBoardHistory(id)
// TODO: avoid retry loop
console.error(`Board load failed for board ${id}`)
throw e
}
} else if (!state.ready) {
Expand Down Expand Up @@ -96,6 +104,13 @@ export function updateBoards(boardState: ServerSideBoardState, appEvent: BoardHi
return serial
}

/**
 * Folds an incoming CRDT update into the pending (not yet stored) CRDT update of a board.
 *
 * @param id Id of the board the update belongs to.
 * @param crdtUpdate The CRDT update to add to the board's pending update.
 * @throws Error when `maybeGetBoard` yields no state for the given id.
 */
export function updateBoardCrdt(id: Id, crdtUpdate: Uint8Array) {
    const state = maybeGetBoard(id)
    if (state) {
        state.recentCrdtUpdate = combineCrdtUpdates(state.recentCrdtUpdate, crdtUpdate)
        return
    }
    throw Error(`No state for board ${id}`)
}

export async function addBoard(board: Board, createToken?: boolean): Promise<ServerSideBoardState> {
await createBoard(board)
const accessTokens = createToken ? [await createAccessToken(board)] : []
Expand All @@ -104,7 +119,8 @@ export async function addBoard(board: Board, createToken?: boolean): Promise<Ser
board,
serial: 0,
recentEvents: [],
storingEvents: [],
recentCrdtUpdate: null,
currentlyStoring: null,
locks: Locks((changedLocks) => broadcastItemLocks(board.id, changedLocks)),
cursorsMoved: false,
cursorPositions: {},
Expand Down Expand Up @@ -135,21 +151,26 @@ export async function awaitSavingChanges() {

/**
 * Persists the buffered changes of one board.
 *
 * When there are buffered events, they are moved together with the pending CRDT update into
 * `state.currentlyStoring` and handed to `saveRecentEvents`. If saving fails, both the events and
 * the CRDT update are put back in front of whatever was buffered meanwhile, so that a later run
 * retries them.
 *
 * NOTE(review): the save branch is only entered when `recentEvents` is non-empty, so a CRDT update
 * that arrives without any board events stays in `recentCrdtUpdate` until events show up — confirm
 * this is intended.
 *
 * @param state Mutable server-side state of the board to save.
 * @throws Error when a save is already in flight for this board (`currentlyStoring` is set).
 */
async function saveBoardChanges(state: ServerSideBoardState) {
    if (state.recentEvents.length > 0) {
        if (state.currentlyStoring) {
            throw Error("Invariant failed: currentlyStoring is not null")
        }
        // Take ownership of everything buffered so far; new events and CRDT updates that arrive
        // while the save is awaited accumulate in recentEvents / recentCrdtUpdate again.
        const storing = {
            events: state.recentEvents.splice(0),
            crdtUpdate: state.recentCrdtUpdate,
        }
        state.currentlyStoring = storing
        state.recentCrdtUpdate = null
        console.log(
            `Saving board ${state.board.id} at serial ${state.board.serial} with ${storing.events.length} new events`,
        )
        try {
            await saveRecentEvents(state.board.id, storing.events, storing.crdtUpdate)
        } catch (e) {
            // Push events back to the head of the save list for retrying later
            state.recentEvents = [...storing.events, ...state.recentEvents]
            // CRDT updates are binary Y.js updates: they must be combined with Y.mergeUpdates
            // (via combineCrdtUpdates), which also handles null on either side. A generic object
            // merge would not produce a valid update.
            state.recentCrdtUpdate = combineCrdtUpdates(storing.crdtUpdate, state.recentCrdtUpdate)
            console.error("Board save failed for board", state.board.id, e)
        }
        state.currentlyStoring = null
    }
if (state.recentEvents.length === 0 && getBoardSessionCount(state.board.id) === 0) {
console.log(`Purging board ${state.board.id} from memory`)
Expand All @@ -158,6 +179,12 @@ async function saveBoardChanges(state: ServerSideBoardState) {
}
}

/**
 * Combines two optional CRDT updates into one.
 *
 * @param a First update, or null when there is none.
 * @param b Second update, or null when there is none.
 * @returns The result of `Y.mergeUpdates([a, b])` when both are present; otherwise the one that
 *          is present (the same instance, not a copy), or null when neither is.
 */
export function combineCrdtUpdates(a: Uint8Array | null, b: Uint8Array | null) {
    if (a && b) {
        return Y.mergeUpdates([a, b])
    }
    return a ? a : b
}

export function getActiveBoardCount() {
return boards.size
}
Expand Down
32 changes: 17 additions & 15 deletions backend/src/board-store.ts
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ export async function createBoard(board: Board): Promise<void> {

if (!isBoardEmpty(board)) {
console.log(`Creating non-empty board ${board.id} -> bootstrapping history`)
storeEventHistoryBundle(board.id, [mkBootStrapEvent(board.id, board)], client)
storeEventHistoryBundle(board.id, [mkBootStrapEvent(board.id, board)], null, client)
}
})
}
Expand Down Expand Up @@ -182,8 +182,8 @@ export async function createAccessToken(board: Board): Promise<string> {
return token
}

export async function saveRecentEvents(id: Id, recentEvents: BoardHistoryEntry[]) {
await inTransaction(async (client) => storeEventHistoryBundle(id, recentEvents, client))
/**
 * Stores a batch of board events, together with an optional CRDT update, as one history bundle.
 * The write is performed through `inTransaction`.
 *
 * @param id Id of the board the events belong to.
 * @param recentEvents Events to store.
 * @param crdtUpdate CRDT update to store alongside the events, or null when there is none.
 */
export async function saveRecentEvents(id: Id, recentEvents: BoardHistoryEntry[], crdtUpdate: Uint8Array | null) {
    const storeBundle = async (client: PoolClient) => storeEventHistoryBundle(id, recentEvents, crdtUpdate, client)
    await inTransaction(storeBundle)
}

type StreamingBoardEventCallback = (chunk: BoardHistoryEntry[]) => void
Expand Down Expand Up @@ -322,6 +322,7 @@ export async function saveBoardSnapshot(board: Board, client: PoolClient) {
export async function storeEventHistoryBundle(
boardId: Id,
events: BoardHistoryEntry[],
crdtUpdate: Uint8Array | null,
client: PoolClient,
savedAt = new Date(),
) {
Expand All @@ -332,8 +333,8 @@ export async function storeEventHistoryBundle(
const firstSerial = assertNotNull(events[0].serial)
const lastSerial = assertNotNull(events[events.length - 1].serial)
await client.query(
`INSERT INTO board_event(board_id, first_serial, last_serial, events, saved_at) VALUES ($1, $2, $3, $4, $5)`,
[boardId, firstSerial, lastSerial, { events }, savedAt],
`INSERT INTO board_event(board_id, first_serial, last_serial, events, crdt_update, saved_at) VALUES ($1, $2, $3, $4, $5, $6)`,
[boardId, firstSerial, lastSerial, { events }, crdtUpdate, savedAt],
)
}
}
Expand All @@ -344,15 +345,7 @@ export type BoardHistoryBundle = {
events: {
events: BoardHistoryEntry[]
}
}

export async function getBoardHistoryBundles(client: PoolClient, id: Id): Promise<BoardHistoryBundle[]> {
return (
await client.query(
`SELECT board_id, last_serial, events FROM board_event WHERE board_id=$1 ORDER BY last_serial`,
[id],
)
).rows.map(migrateBundle)
crdt_update: Uint8Array | null
}

export async function getBoardHistoryBundlesWithLastSerialsBetween(
Expand All @@ -363,12 +356,21 @@ export async function getBoardHistoryBundlesWithLastSerialsBetween(
): Promise<BoardHistoryBundle[]> {
return (
await client.query(
`SELECT board_id, last_serial, events FROM board_event WHERE board_id=$1 AND last_serial >= $2 AND last_serial <= $3 ORDER BY last_serial`,
`SELECT board_id, last_serial, events, crdt_update FROM board_event WHERE board_id=$1 AND last_serial >= $2 AND last_serial <= $3 ORDER BY last_serial`,
[id, lsMin, lsMax],
)
).rows.map(migrateBundle)
}

/**
 * Reads the stored CRDT updates of a board from its history bundles.
 *
 * Only rows with a non-null `crdt_update` are selected, ordered by `last_serial`.
 *
 * @param client Database client used to run the query.
 * @param id Id of the board.
 * @returns The `crdt_update` value of each selected row, in query order.
 */
export async function getBoardHistoryCrdtUpdates(client: PoolClient, id: Id): Promise<Uint8Array[]> {
    const result = await client.query(
        `SELECT crdt_update FROM board_event WHERE board_id=$1 AND crdt_update IS NOT NULL ORDER BY last_serial`,
        [id],
    )
    const updates: Uint8Array[] = []
    for (const row of result.rows) {
        updates.push(row.crdt_update)
    }
    return updates
}

/**
 * Returns a copy of a history bundle in which every contained event has been passed through
 * `migrateEvent`. All other fields of the bundle and of its `events` wrapper are carried over.
 */
function migrateBundle(b: BoardHistoryBundle): BoardHistoryBundle {
    const migratedEvents = b.events.events.map(migrateEvent)
    const eventsWrapper = { ...b.events, events: migratedEvents }
    return { ...b, events: eventsWrapper }
}
Expand Down
108 changes: 5 additions & 103 deletions backend/src/compact-history.ts
Original file line number Diff line number Diff line change
@@ -1,20 +1,15 @@
import { format } from "date-fns"
import _ from "lodash"
import { PoolClient } from "pg"
import { boardReducer } from "../../common/src/board-reducer"
import { Board, BoardHistoryEntry, Id, Serial } from "../../common/src/domain"
import { migrateBoard, mkBootStrapEvent } from "../../common/src/migration"
import { BoardHistoryEntry, Id } from "../../common/src/domain"
import {
getBoardHistoryBundleMetas,
getBoardHistoryBundles,
getBoardHistoryBundlesWithLastSerialsBetween,
mkSnapshot,
saveBoardSnapshot,
storeEventHistoryBundle,
verifyContinuity,
verifyContinuityFromMetas,
verifyEventArrayContinuity,
} from "./board-store"
import * as Y from "yjs"
import { inTransaction } from "./db"

export async function quickCompactBoardHistory(id: Id): Promise<number> {
Expand Down Expand Up @@ -45,6 +40,8 @@ export async function quickCompactBoardHistory(id: Id): Promise<number> {
)
const eventArrays = bundlesWithData.map((b) => b.events.events)
const events: BoardHistoryEntry[] = eventArrays.flat()
const crdtUpdates = bundlesWithData.flatMap((d) => (d.crdt_update ? [d.crdt_update] : []))
const combinedCrdtUpdate = crdtUpdates.length ? Y.mergeUpdates(crdtUpdates) : null
const consistent =
verifyContinuity(id, firstBundle.first_serial - 1, ...eventArrays) &&
verifyEventArrayContinuity(id, firstBundle.first_serial - 1, events)
Expand All @@ -62,7 +59,7 @@ export async function quickCompactBoardHistory(id: Id): Promise<number> {
)
}
// 2. store as a single bundle
await storeEventHistoryBundle(id, events, client, lastBundle.saved_at)
await storeEventHistoryBundle(id, events, combinedCrdtUpdate, client, lastBundle.saved_at)
} else {
throw Error("Discontinuity detected in compacted history.")
}
Expand All @@ -86,98 +83,3 @@ export async function quickCompactBoardHistory(id: Id): Promise<number> {
return 0
}
}

// TODO: get rid of the legacy compactor altogether after more experience with the quick one.
// Its role is currently to work as a fallback in case the quick one fails. It's very unlikely, though,
// that it can fix any problems for real. The actual need is more like rebooting the whole history in case
// it's not consistent. And that would be the bootstrapHistory thing.
/**
 * Legacy history compactor for a single board; the whole operation runs inside one inTransaction callback.
 *
 * Loads every history bundle of the board and checks event continuity starting from serial 0.
 * - Consistent history: events are regrouped into consecutive runs that share the same formatted
 *   timestamp key, the existing bundle rows are deleted and one bundle per run is stored instead.
 * - Inconsistent history: the stored board snapshot is read and the history is bootstrapped from it.
 *   Nothing is changed when the board row is missing or when the events following the snapshot
 *   do not start at the snapshot's serial + 1.
 *
 * @param id Id of the board whose history is compacted.
 * @returns Whatever inTransaction returns for the callback (the callback itself resolves to undefined).
 * @throws Error when the regrouped events fail verification or the DELETE affects an unexpected number of rows.
 */
export function compactBoardHistory(id: Id) {
return inTransaction(async (client) => {
const bundles = await getBoardHistoryBundles(client, id)
const eventArrays = bundles.map((b) => b.events.events)
const consistent = verifyContinuity(id, 0, ...eventArrays)
const events: BoardHistoryEntry[] = eventArrays.flat()
if (consistent) {
const sorted: DateBundle[] = []
let current: DateBundle | null = null
type DateBundle = { date: string; events: BoardHistoryEntry[] }
// Walk the events in order and start a new group whenever the formatted timestamp key changes.
for (let event of events) {
// NOTE(review): "hh" is the 12-hour token in date-fns format patterns, so the key does not
// distinguish AM from PM; "HH" (24-hour) may have been intended — confirm.
const date = format(new Date(event.timestamp), "yyyy-MM-dd hh")
if (current == null || date !== current.date) {
if (current !== null) {
sorted.push(current)
}
current = { events: [], date }
}
current.events.push(event)
}
// Flush the last open group.
if (current != null) {
sorted.push(current)
}
// Sanity check: regrouping must keep continuity and must not drop or duplicate events.
const compactedOk = verifyContinuity(id, 0, ...sorted.map((e) => e.events))
if (!compactedOk || sorted.flatMap((e) => e.events).length !== events.length) {
throw Error("Compaction failure")
}
// Same number of groups as stored bundles: treated as already compacted.
if (sorted.length === bundles.length) {
console.log(
`Board ${id}: Verified ${bundles.length} bundles containing ${events.length} events => no need to compact`,
)
return
}
console.log(
`Board ${id}: Verified ${bundles.length} bundles containing ${events.length} events => compacting to ${sorted.length} bundles`,
)
//console.log(sorted.map(s => s.date).join("\n "))

// Delete the old bundle rows by last_serial, then write the regrouped bundles.
// The serial list is interpolated into the SQL text; its values come from the loaded bundles' last_serial.
const serials = bundles.map((b) => b.last_serial!)
const result = await client.query(
`DELETE FROM board_event where board_id=$1 and last_serial in (${serials.join(",")})`,
[id],
)
// Every loaded bundle must have been deleted; otherwise throw (the error propagates out of the inTransaction callback).
if (result.rowCount != serials.length) {
throw Error(`Unexpected rowcount ${result.rowCount} for board ${id}`)
}
for (let newBundle of sorted) {
await storeEventHistoryBundle(id, newBundle.events, client)
}
} else {
// History is not continuous from serial 0: fall back to the snapshot stored in the board table.
console.warn(`Aborting compaction of board ${id} due to inconsistent history`)
const result = await client.query("SELECT content FROM board WHERE id=$1", [id])
if (result.rowCount != 1) {
console.warn("Board not found!?")
return
}
const snapshot = result.rows[0].content as Board
if (snapshot.serial) {
console.log(`Found snapshot at serial ${snapshot.serial}`)

// Only events after the snapshot's serial are candidates for replaying on top of it.
const followingEvents = events.filter((e) => e.serial! > snapshot.serial)

// Bootstrapping happens only when there are no following events or they start right after the snapshot.
if (followingEvents.length > 0 && followingEvents[0].serial !== snapshot.serial + 1) {
console.log(
`Cannot find a consecutive event for snapshot. First event is ${followingEvents[0]?.serial}`,
)
} else {
console.log("Bootstraping history based on snapshot")
await bootstrapHistory(id, snapshot, followingEvents, client)
}
} else {
// Snapshot serial is missing or 0: bootstrap from mkSnapshot(snapshot, 0) with no events.
console.warn("No snapshot with serial available discarding history and bootstrapping to latest state.")
await bootstrapHistory(id, mkSnapshot(snapshot, 0), [], client)
}
}
})
}

// TODO: should re-serialize events as well to make it consistent. Then the clients should somehow detect that their serial is ahead and start from scratch. Otherwise they'll ignore future events
/**
 * Replaces the stored event history of a board with a single bootstrap event.
 *
 * The given events are applied to the snapshot with `boardReducer`, the result is passed through
 * `migrateBoard`, all `board_event` rows of the board are deleted, one bundle containing the
 * bootstrap event is stored and the resulting board is saved as the snapshot.
 *
 * @param boardId Id of the board whose history is replaced.
 * @param snap Board state the events are applied to.
 * @param events Events to apply on top of `snap`, in order.
 * @param client Database client used for all statements.
 */
async function bootstrapHistory(boardId: Id, snap: Board, events: BoardHistoryEntry[], client: PoolClient) {
    let replayed = snap
    for (const entry of events) {
        replayed = boardReducer(replayed, entry)[0]
    }
    const board = migrateBoard(replayed)

    const bootstrappedHistory = [mkBootStrapEvent(boardId, board)] as BoardHistoryEntry[]

    // Order matters: drop the old history first, then write the new bundle and the snapshot.
    await client.query(`DELETE FROM board_event where board_id=$1`, [boardId])
    await storeEventHistoryBundle(boardId, bootstrappedHistory, client)
    await saveBoardSnapshot(board, client)

    console.log(`Bootstrapped history for board ${boardId}`)
}
Loading

0 comments on commit 1ef8a3c

Please sign in to comment.