Skip to content

Commit

Permalink
Added connection pool
Browse files Browse the repository at this point in the history
More cleanup
  • Loading branch information
balegas committed Aug 15, 2024
1 parent 6747ef2 commit 1faec98
Show file tree
Hide file tree
Showing 6 changed files with 86 additions and 213 deletions.
20 changes: 12 additions & 8 deletions examples/yjs-provider/app/api/awareness/route.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,20 @@
import { db } from "../../db"
import { pool } from "../../db"
import { NextResponse } from "next/server"

export async function POST(request: Request) {
const body = await request.json()
const db = await pool.connect()
try {
const body = await request.json()

await db.query(
`INSERT INTO ydoc_awareness (client, room, op)
await db.query(
`INSERT INTO ydoc_awareness (client, room, op)
VALUES ($1, $2, $3)
ON CONFLICT (client, room)
DO UPDATE SET op = $3`,
[body.client, body.room, body.op]
)

return NextResponse.json({})
[body.client, body.room, body.op]
)
return NextResponse.json({})
} finally {
db.release()
}
}
58 changes: 0 additions & 58 deletions examples/yjs-provider/app/api/compaction/route.ts

This file was deleted.

82 changes: 46 additions & 36 deletions examples/yjs-provider/app/api/operation/route.ts
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
import { db } from "../../db"
import { pool } from "../../db"
import { NextResponse } from "next/server"

// TODO: still loading yjs twice
Expand All @@ -9,57 +9,57 @@ import * as encoding from "lib0/encoding"
import * as decoding from "lib0/decoding"

import { toBase64, fromBase64 } from "lib0/buffer"
import { PoolClient } from "pg"

const maxRowCount = 50

export async function POST(request: Request) {
const body = await request.json()
const db = await pool.connect()

const errorResponse = validateRequest(body)
if (errorResponse) {
return errorResponse
}
try {
const body = await request.json()

await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[body.room, body.op]
)
await maybeCompact(body.room)
const errorResponse = validateRequest(body)
if (errorResponse) {
return errorResponse
}

return NextResponse.json({})
}

function validateRequest({ room, op }: { room: string; op: string }) {
if (!room) {
return NextResponse.json({ error: `'room' is required` }, { status: 400 })
}
await db.query(`BEGIN`)
await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
[body.room, body.op]
)
await maybeCompact(db, body.room)
await db.query(`COMMIT`)

if (!op) {
return NextResponse.json({ error: `'op' is required` }, { status: 400 })
return NextResponse.json({})
} catch (e) {
await db.query(`ROLLBACK`)
throw e
} finally {
db.release()
}
}

// naive implementation of compaction
async function maybeCompact(room: string) {
const ydoc = new Y.Doc()

const res0 = await db.query(
`SELECT COUNT(*) as count FROM ydoc_operations`,
[]
)
if (res0.rows[0].count < maxRowCount) {
return
}

console.log(`compaction`)
const res1 = await db.query(
async function maybeCompact(db: PoolClient, room: string) {
const res = await db.query(
`SELECT id, op FROM ydoc_operations
WHERE room = $1
ORDER BY id DESC`,
[room]
)
res1.rows.map(({ op }) => {

if (res.rows.length < maxRowCount) {
return
}

console.log(`compaction`)

const ydoc = new Y.Doc()

res.rows.map(({ op }) => {
const buf = fromBase64(op)
const decoder = decoding.createDecoder(buf)
syncProtocol.readSyncMessage(
Expand All @@ -77,7 +77,17 @@ async function maybeCompact(room: string) {
await db.query(`TRUNCATE ydoc_operations`)
await db.query(
`INSERT INTO ydoc_operations (room, op)
VALUES ($1, $2)`,
VALUES ($1, $2)`,
[room, encoded]
)
}

function validateRequest({ room, op }: { room: string; op: string }) {
if (!room) {
return NextResponse.json({ error: `'room' is required` }, { status: 400 })
}

if (!op) {
return NextResponse.json({ error: `'op' is required` }, { status: 400 })
}
}
11 changes: 5 additions & 6 deletions examples/yjs-provider/app/db.ts
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
import pgPkg from "pg"
const { Client } = pgPkg
import { Pool } from "pg"

const db = new Client({
console.log(`init pool`)
const pool = new Pool({
host: `localhost`,
port: 54321,
password: `password`,
user: `postgres`,
database: `electric`,
max: 1,
})

db.connect()

export { db }
export { pool }
Loading

0 comments on commit 1faec98

Please sign in to comment.