Skip to content

Commit

Permalink
Further cleanup
Browse files Browse the repository at this point in the history
  • Loading branch information
balegas committed Aug 23, 2024
1 parent 2e68891 commit 7f083e3
Show file tree
Hide file tree
Showing 9 changed files with 127 additions and 100 deletions.
20 changes: 0 additions & 20 deletions examples/yjs-provider/app/api/awareness/route.ts

This file was deleted.

75 changes: 75 additions & 0 deletions examples/yjs-provider/app/api/compaction/route.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
import { pool } from "../../db"
import { NextRequest, NextResponse } from "next/server"

import * as Y from "yjs"
import * as syncProtocol from "y-protocols/sync"

import * as encoding from "lib0/encoding"
import * as decoding from "lib0/decoding"

import { toBase64, fromBase64 } from "lib0/buffer"

export async function GET(request: NextRequest) {
try {
const { room } = await getRequestParams(request)

doCompation(room)

return NextResponse.json({})
} catch (e) {
const resp = e instanceof Error ? e.message : e
return NextResponse.json(resp, { status: 400 })
}
}

async function doCompation(room: string) {
const db = await pool.connect()
try {
await db.query(`BEGIN`)
const res = await db.query(
`DELETE FROM ydoc_operations
WHERE room = $1
RETURNING *`,
[room]
)

const ydoc = new Y.Doc()
res.rows.map(({ op }) => {
const buf = fromBase64(op)
const decoder = decoding.createDecoder(buf)
syncProtocol.readSyncMessage(
decoder,
encoding.createEncoder(),
ydoc,
`server`
)
})

const encoder = encoding.createEncoder()
syncProtocol.writeUpdate(encoder, Y.encodeStateAsUpdate(ydoc))
const encoded = toBase64(encoding.toUint8Array(encoder))

await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[room, encoded]
)
await db.query(`COMMIT`)
} catch (e) {
await db.query(`ROLLBACK`)
throw e
} finally {
db.release()
}
}

async function getRequestParams(
request: NextRequest
): Promise<{ room: string }> {
const room = await request.nextUrl.searchParams.get(`room`)
if (!room) {
throw new Error(`'room' is required`)
}

return { room }
}
110 changes: 40 additions & 70 deletions examples/yjs-provider/app/api/operation/route.ts
Original file line number Diff line number Diff line change
@@ -1,93 +1,63 @@
import { pool } from "../../db"
import { NextResponse } from "next/server"

// TODO: still loading yjs twice
import * as Y from "yjs"
import * as syncProtocol from "y-protocols/sync"

import * as encoding from "lib0/encoding"
import * as decoding from "lib0/decoding"

import { toBase64, fromBase64 } from "lib0/buffer"
import { PoolClient } from "pg"

const maxRowCount = 50

export async function POST(request: Request) {
const db = await pool.connect()

try {
const body = await request.json()
const { room, op, clientId } = await getRequestParams(request)

const errorResponse = validateRequest(body)
if (errorResponse) {
return errorResponse
if (!clientId) {
saveOperation(room, op)
} else {
saveAwarenessOperation(room, op, clientId)
}

await db.query(`BEGIN`)
await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[body.room, body.op]
)
await maybeCompact(db, body.room)
await db.query(`COMMIT`)

return NextResponse.json({})
} catch (e) {
await db.query(`ROLLBACK`)
throw e
} finally {
db.release()
const resp = e instanceof Error ? e.message : e
return NextResponse.json(resp, { status: 400 })
}
}

// naive implementation of compaction
async function maybeCompact(db: PoolClient, room: string) {
const res = await db.query(
`SELECT id, op FROM ydoc_operations
WHERE room = $1
ORDER BY id DESC`,
[room]
)

if (res.rows.length < maxRowCount) {
return
async function saveOperation(room: string, op: string) {
const db = await pool.connect()
try {
await db.query(`INSERT INTO ydoc_operations (room, op) VALUES ($1, $2)`, [
room,
op,
])
} finally {
db.release()
}
}

console.log(`compaction`)

const ydoc = new Y.Doc()

res.rows.map(({ op }) => {
const buf = fromBase64(op)
const decoder = decoding.createDecoder(buf)
syncProtocol.readSyncMessage(
decoder,
encoding.createEncoder(),
ydoc,
`server`
async function saveAwarenessOperation(
room: string,
op: string,
clientId: string
) {
const db = await pool.connect()
try {
await db.query(
`INSERT INTO ydoc_awareness (room, clientId, op) VALUES ($1, $2, $3)
ON CONFLICT (clientId, room)
DO UPDATE SET op = $3`,
[room, clientId, op]
)
})

const encoder = encoding.createEncoder()
syncProtocol.writeUpdate(encoder, Y.encodeStateAsUpdate(ydoc))
const encoded = toBase64(encoding.toUint8Array(encoder))

await db.query(`TRUNCATE ydoc_operations`)
await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[room, encoded]
)
} finally {
db.release()
}
}

function validateRequest({ room, op }: { room: string; op: string }) {
async function getRequestParams(
request: Request
): Promise<{ room: string; op: string; clientId?: string }> {
const { room, op, clientId } = await request.json()
if (!room) {
return NextResponse.json({ error: `'room' is required` }, { status: 400 })
throw new Error(`'room' is required`)
}

if (!op) {
return NextResponse.json({ error: `'op' is required` }, { status: 400 })
throw new Error(`'op' is required`)
}

return { room, op, clientId }
}
1 change: 0 additions & 1 deletion examples/yjs-provider/app/db.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
import { Pool } from "pg"

console.log(`init pool`)
const pool = new Pool({
host: `localhost`,
port: 54321,
Expand Down
4 changes: 2 additions & 2 deletions examples/yjs-provider/app/layout.tsx
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
export const metadata = {
title: `Next.js Forms Example`,
description: `Example application with forms and Postgres.`,
title: `Yjs <> Electric`,
description: `Yjs synching with Electric`,
}

export default function RootLayout({
Expand Down
3 changes: 2 additions & 1 deletion examples/yjs-provider/app/page.tsx
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ if (typeof window !== `undefined`) {
network = new ElectricProvider(`http://localhost:3000/`, room, ydoc, opts)

new BroadcastProvider(room, ydoc, {
connect: true,
awareness,
})
}
Expand Down Expand Up @@ -99,7 +100,7 @@ export default function Home() {
</form>
<p>
This is a demo of <a href="https://github.com/yjs/yjs">Yjs</a> shared
editor backed by Postgres using{` `}
editor synching with {` `}
<a href="https://github.com/electric-sql/electric">Electric</a>.
</p>
<div ref={editor}></div>
Expand Down
2 changes: 1 addition & 1 deletion examples/yjs-provider/app/y-broadcast.js
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,7 @@ export class BroadcastProvider extends Observable {
constructor(
roomname,
doc,
{ connect = true, awareness = new awarenessProtocol.Awareness(doc) } = {}
{ connect = false, awareness = new awarenessProtocol.Awareness(doc) } = {}
) {
super()

Expand Down
8 changes: 5 additions & 3 deletions examples/yjs-provider/app/y-electric.js
Original file line number Diff line number Diff line change
Expand Up @@ -200,11 +200,11 @@ const sendAwareness = (provider, changedClients) => {

if (provider.connected) {
const room = provider.roomname
const clientID = `${provider.doc.clientID}`
const clientId = `${provider.doc.clientID}`

fetch(`/api/awareness`, {
fetch(`/api/operation`, {
method: `POST`,
body: JSON.stringify({ client: clientID, room, op }),
body: JSON.stringify({ clientId, room, op }),
})
}
}
Expand Down Expand Up @@ -267,6 +267,8 @@ export class ElectricProvider extends Observable {
})

this._updateHandler = (update, origin) => {
// prevent pushing operations that come from the
// broadcast provider, when it is being used
if (origin !== this && !origin.bcChannel) {
sendOperation(this, update)
}
Expand Down
4 changes: 2 additions & 2 deletions examples/yjs-provider/db/migrations/01-create_yjs_tables.sql
Original file line number Diff line number Diff line change
Expand Up @@ -5,11 +5,11 @@ CREATE TABLE ydoc_operations(
);

CREATE TABLE ydoc_awareness(
client TEXT,
clientId TEXT,
room TEXT,
op TEXT NOT NULL,
updated TIMESTAMPTZ DEFAULT CURRENT_TIMESTAMP,
PRIMARY KEY (client, room)
PRIMARY KEY (clientId, room)
);

CREATE OR REPLACE FUNCTION delete_old_rows()
Expand Down

0 comments on commit 7f083e3

Please sign in to comment.