From 7f083e36e10f599f50e37bc3dd4ce563a9d77564 Mon Sep 17 00:00:00 2001 From: Valter Balegas Date: Fri, 23 Aug 2024 09:04:51 +0100 Subject: [PATCH] Further cleanup --- .../yjs-provider/app/api/awareness/route.ts | 20 ---- .../yjs-provider/app/api/compaction/route.ts | 75 ++++++++++++ .../yjs-provider/app/api/operation/route.ts | 110 +++++++----------- examples/yjs-provider/app/db.ts | 1 - examples/yjs-provider/app/layout.tsx | 4 +- examples/yjs-provider/app/page.tsx | 3 +- examples/yjs-provider/app/y-broadcast.js | 2 +- examples/yjs-provider/app/y-electric.js | 8 +- .../db/migrations/01-create_yjs_tables.sql | 4 +- 9 files changed, 127 insertions(+), 100 deletions(-) delete mode 100644 examples/yjs-provider/app/api/awareness/route.ts create mode 100644 examples/yjs-provider/app/api/compaction/route.ts diff --git a/examples/yjs-provider/app/api/awareness/route.ts b/examples/yjs-provider/app/api/awareness/route.ts deleted file mode 100644 index b6108b8d4c..0000000000 --- a/examples/yjs-provider/app/api/awareness/route.ts +++ /dev/null @@ -1,20 +0,0 @@ -import { pool } from "../../db" -import { NextResponse } from "next/server" - -export async function POST(request: Request) { - const db = await pool.connect() - try { - const body = await request.json() - - await db.query( - `INSERT INTO ydoc_awareness (client, room, op) - VALUES ($1, $2, $3) - ON CONFLICT (client, room) - DO UPDATE SET op = $3`, - [body.client, body.room, body.op] - ) - return NextResponse.json({}) - } finally { - db.release() - } -} diff --git a/examples/yjs-provider/app/api/compaction/route.ts b/examples/yjs-provider/app/api/compaction/route.ts new file mode 100644 index 0000000000..82ed3c3e8c --- /dev/null +++ b/examples/yjs-provider/app/api/compaction/route.ts @@ -0,0 +1,75 @@ +import { pool } from "../../db" +import { NextRequest, NextResponse } from "next/server" + +import * as Y from "yjs" +import * as syncProtocol from "y-protocols/sync" + +import * as encoding from "lib0/encoding" +import * as decoding from "lib0/decoding" + +import { toBase64, fromBase64 } from "lib0/buffer" + +export async function GET(request: NextRequest) { + try { + const { room } = await getRequestParams(request) + + doCompation(room) + + return NextResponse.json({}) + } catch (e) { + const resp = e instanceof Error ? e.message : e + return NextResponse.json(resp, { status: 400 }) + } +} + +async function doCompation(room: string) { + const db = await pool.connect() + try { + await db.query(`BEGIN`) + const res = await db.query( + `DELETE FROM ydoc_operations + WHERE room = $1 + RETURNING *`, + [room] + ) + + const ydoc = new Y.Doc() + res.rows.map(({ op }) => { + const buf = fromBase64(op) + const decoder = decoding.createDecoder(buf) + syncProtocol.readSyncMessage( + decoder, + encoding.createEncoder(), + ydoc, + `server` + ) + }) + + const encoder = encoding.createEncoder() + syncProtocol.writeUpdate(encoder, Y.encodeStateAsUpdate(ydoc)) + const encoded = toBase64(encoding.toUint8Array(encoder)) + + await db.query( + `INSERT INTO ydoc_operations (room, op) + VALUES ($1, $2)`, + [room, encoded] + ) + await db.query(`COMMIT`) + } catch (e) { + await db.query(`ROLLBACK`) + throw e + } finally { + db.release() + } +} + +async function getRequestParams( + request: NextRequest +): Promise<{ room: string }> { + const room = await request.nextUrl.searchParams.get(`room`) + if (!room) { + throw new Error(`'room' is required`) + } + + return { room } +} diff --git a/examples/yjs-provider/app/api/operation/route.ts b/examples/yjs-provider/app/api/operation/route.ts index bd58d49409..8dabcf54ba 100644 --- a/examples/yjs-provider/app/api/operation/route.ts +++ b/examples/yjs-provider/app/api/operation/route.ts @@ -1,93 +1,63 @@ import { pool } from "../../db" import { NextResponse } from "next/server" -// TODO: still loading yjs twice -import * as Y from "yjs" -import * as syncProtocol from "y-protocols/sync" - -import * as encoding from "lib0/encoding" -import * as decoding from "lib0/decoding" - -import { toBase64, fromBase64 } from "lib0/buffer" -import { PoolClient } from "pg" - -const maxRowCount = 50 - export async function POST(request: Request) { - const db = await pool.connect() - try { - const body = await request.json() + const { room, op, clientId } = await getRequestParams(request) - const errorResponse = validateRequest(body) - if (errorResponse) { - return errorResponse + if (!clientId) { + saveOperation(room, op) + } else { + saveAwarenessOperation(room, op, clientId) } - await db.query(`BEGIN`) - await db.query( - `INSERT INTO ydoc_operations (room, op) - VALUES ($1, $2)`, - [body.room, body.op] - ) - await maybeCompact(db, body.room) - await db.query(`COMMIT`) - return NextResponse.json({}) } catch (e) { - await db.query(`ROLLBACK`) - throw e - } finally { - db.release() + const resp = e instanceof Error ? e.message : e + return NextResponse.json(resp, { status: 400 }) } } -// naive implementation of compaction -async function maybeCompact(db: PoolClient, room: string) { - const res = await db.query( - `SELECT id, op FROM ydoc_operations - WHERE room = $1 - ORDER BY id DESC`, - [room] - ) - - if (res.rows.length < maxRowCount) { - return +async function saveOperation(room: string, op: string) { + const db = await pool.connect() + try { + await db.query(`INSERT INTO ydoc_operations (room, op) VALUES ($1, $2)`, [ + room, + op, + ]) + } finally { + db.release() } +} - console.log(`compaction`) - - const ydoc = new Y.Doc() - - res.rows.map(({ op }) => { - const buf = fromBase64(op) - const decoder = decoding.createDecoder(buf) - syncProtocol.readSyncMessage( - decoder, - encoding.createEncoder(), - ydoc, - `server` +async function saveAwarenessOperation( + room: string, + op: string, + clientId: string +) { + const db = await pool.connect() + try { + await db.query( + `INSERT INTO ydoc_awareness (room, clientId, op) VALUES ($1, $2, $3) + ON CONFLICT (clientId, room) + DO UPDATE SET op = $3`, + [room, clientId, op] ) - }) - - const encoder = encoding.createEncoder() - syncProtocol.writeUpdate(encoder, Y.encodeStateAsUpdate(ydoc)) - const encoded = toBase64(encoding.toUint8Array(encoder)) - - await db.query(`TRUNCATE ydoc_operations`) - await db.query( - `INSERT INTO ydoc_operations (room, op) - VALUES ($1, $2)`, - [room, encoded] - ) + } finally { + db.release() + } } -function validateRequest({ room, op }: { room: string; op: string }) { +async function getRequestParams( + request: Request +): Promise<{ room: string; op: string; clientId?: string }> { + const { room, op, clientId } = await request.json() if (!room) { - return NextResponse.json({ error: `'room' is required` }, { status: 400 }) + throw new Error(`'room' is required`) } - if (!op) { - return NextResponse.json({ error: `'op' is required` }, { status: 400 }) + throw new Error(`'op' is required`) } + + return { room, op, clientId } } diff --git a/examples/yjs-provider/app/db.ts b/examples/yjs-provider/app/db.ts index 6847116c08..c778845f4a 100644 --- a/examples/yjs-provider/app/db.ts +++ b/examples/yjs-provider/app/db.ts @@ -1,6 +1,5 @@ import { Pool } from "pg" -console.log(`init pool`) const pool = new Pool({ host: `localhost`, port: 54321, diff --git a/examples/yjs-provider/app/layout.tsx b/examples/yjs-provider/app/layout.tsx index 38b9f24675..ddc3e573cc 100644 --- a/examples/yjs-provider/app/layout.tsx +++ b/examples/yjs-provider/app/layout.tsx @@ -1,6 +1,6 @@ export const metadata = { - title: `Next.js Forms Example`, - description: `Example application with forms and Postgres.`, + title: `Yjs <> Electric`, + description: `Yjs synching with Electric`, } export default function RootLayout({ diff --git a/examples/yjs-provider/app/page.tsx b/examples/yjs-provider/app/page.tsx index a9163cd206..8b750758f4 100644 --- a/examples/yjs-provider/app/page.tsx +++ b/examples/yjs-provider/app/page.tsx @@ -42,6 +42,7 @@ if (typeof window !== `undefined`) { network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts) new BroadcastProvider(room, ydoc, { + connect: true, awareness, }) } @@ -99,7 +100,7 @@ export default function Home() {

This is a demo of Yjs shared - editor backed by Postgres using{` `} + editor synching with {` `} Electric.

diff --git a/examples/yjs-provider/app/y-broadcast.js b/examples/yjs-provider/app/y-broadcast.js index b4b7a3c0b9..382bf165a3 100644 --- a/examples/yjs-provider/app/y-broadcast.js +++ b/examples/yjs-provider/app/y-broadcast.js @@ -63,7 +63,7 @@ export class BroadcastProvider extends Observable { constructor( roomname, doc, - { connect = true, awareness = new awarenessProtocol.Awareness(doc) } = {} + { connect = false, awareness = new awarenessProtocol.Awareness(doc) } = {} ) { super() diff --git a/examples/yjs-provider/app/y-electric.js b/examples/yjs-provider/app/y-electric.js index 9026751ab1..be9f0ea54f 100644 --- a/examples/yjs-provider/app/y-electric.js +++ b/examples/yjs-provider/app/y-electric.js @@ -200,11 +200,11 @@ const sendAwareness = (provider, changedClients) => { if (provider.connected) { const room = provider.roomname - const clientID = `${provider.doc.clientID}` + const clientId = `${provider.doc.clientID}` - fetch(`/api/awareness`, { + fetch(`/api/operation`, { method: `POST`, - body: JSON.stringify({ client: clientID, room, op }), + body: JSON.stringify({ clientId, room, op }), }) } } @@ -267,6 +267,8 @@ export class ElectricProvider extends Observable { }) this._updateHandler = (update, origin) => { + // prevent pushing operations that come from the + // broadcast provider, when it is being used if (origin !== this && !origin.bcChannel) { sendOperation(this, update) } diff --git a/examples/yjs-provider/db/migrations/01-create_yjs_tables.sql b/examples/yjs-provider/db/migrations/01-create_yjs_tables.sql index cc142500f2..419c0a5dd3 100644 --- a/examples/yjs-provider/db/migrations/01-create_yjs_tables.sql +++ b/examples/yjs-provider/db/migrations/01-create_yjs_tables.sql @@ -5,11 +5,11 @@ CREATE TABLE ydoc_operations( ); CREATE TABLE ydoc_awareness( - client TEXT, + clientId TEXT, room TEXT, op TEXT NOT NULL, updated TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP, - PRIMARY KEY (client, room) + PRIMARY KEY (clientId, room) ); CREATE OR REPLACE FUNCTION delete_old_rows()