Skip to content

Commit

Permalink
transactions rework: ensure proper context and nesting
Browse files Browse the repository at this point in the history
  • Loading branch information
sdumetz committed Dec 5, 2024
1 parent 14b2991 commit 3cc155e
Show file tree
Hide file tree
Showing 12 changed files with 865 additions and 572 deletions.
1,139 changes: 643 additions & 496 deletions source/server/package-lock.json

Large diffs are not rendered by default.

4 changes: 2 additions & 2 deletions source/server/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -46,8 +46,8 @@
"handlebars": "^4.7.8",
"morgan": "^1.10.0",
"nodemailer": "^6.9.7",
"sqlite": "^4.1.2",
"sqlite3": "^5.1.2",
"sqlite": "^5.1.1",
"sqlite3": "^5.1.7",
"ts-node": "^10.9.2",
"xml-js": "^1.6.11"
},
Expand Down
2 changes: 1 addition & 1 deletion source/server/routes/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ export default async function createServer(config = defaultConfig) :Promise<expr
try{
await vfs.fillHashes()
}catch(e){
console.error("Failed to fill-in missing hashsums in database");
console.error("Failed to fill-in missing hashsums in database. Application may be unstable.");
}

setInterval(()=>{
Expand Down
5 changes: 2 additions & 3 deletions source/server/routes/scenes/get.ts
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,7 @@ export default async function getScenes(req :Request, res :Response){
"text": ()=> res.status(200).send(scenes.map(m=>m.name).join("\n")+"\n"),

"application/zip": async ()=>{
async function *getFiles(tr: Vfs):AsyncGenerator<ZipEntry,any, unknown>{
async function *getFiles(vfs: Vfs):AsyncGenerator<ZipEntry,any, unknown>{
for(let scene of scenes){
let root = `scenes/${scene.name}`;

Expand Down Expand Up @@ -125,8 +125,7 @@ export default async function getScenes(req :Request, res :Response){
let again = res.write(data);
if(!again) await once(res, "drain");
}

})
});
res.end();
}
});
Expand Down
8 changes: 3 additions & 5 deletions source/server/routes/scenes/post.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -70,8 +70,8 @@ describe("POST /scenes", function(){
.send(zip.body)
.expect(200);

expect(await vfs.getScene("foo")).to.be.ok;
expect(await vfs.getScene("bar")).to.be.ok;
await expect(vfs.getScene("foo"), `scene "foo" should now exist`).to.be.fulfilled.to.be.ok;
await expect(vfs.getScene("bar"), `scene "bar" should now exist`).to.be.fulfilled.to.be.ok;
let {id}= await vfs.getScene("foo");
expect(await vfs.getFileProps({scene: "foo", name:"articles/hello.html"})).to.have.property("hash", "IHQcUEH8CVmcu6Jc_zSB5HCc0K9HPvP0XGSk3S6f0rQ");
expect(await vfs.getDoc(id)).to.have.property("data", `{"id":1}`);
Expand Down Expand Up @@ -102,16 +102,14 @@ describe("POST /scenes", function(){

expect(res.body).to.be.an("object");
expect(res.body.fail).to.deep.equal([]);
console.log(res.body.ok);
expect(res.body.ok).to.deep.equal([
'foo',
'foo/articles/',
'foo/articles/hello.html',
'foo/models/',
'foo/scene.svx.json'
]);

await expect(vfs.getScene("foo")).to.be.fulfilled;
await expect(vfs.getScene("foo"), `expect scene "foo" to be restored`).to.be.fulfilled;
let {id}= await vfs.getScene("foo");
expect(await vfs.getFileProps({scene: "foo", name:"articles/hello.html"})).to.have.property("hash", "IHQcUEH8CVmcu6Jc_zSB5HCc0K9HPvP0XGSk3S6f0rQ");
expect(await vfs.getDoc(id)).to.have.property("data", `{"id":1}`);
Expand Down
30 changes: 17 additions & 13 deletions source/server/routes/scenes/post.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,22 +24,25 @@ export default async function postScenes(req :Request, res :Response){
let file_name = Uid.toString(Uid.make());
let tmpfile = path.join(vfs.uploadsDir, file_name);
let results :ImportResults = {fail:[], ok:[]};
let handle = await fs.open(tmpfile, "wx+");
try{
let handle = await fs.open(tmpfile, "wx+");
try{
for await (let data of req){
await handle.write(data);
}
}finally{
await handle.close();
for await (let data of req){
await handle.write(data);
}
}catch(e){
await fs.rm(tmpfile, {force: true}).catch(e=>{});
throw e;
}
finally{
await handle.close();
}

/** @fixme make atomic */
await vfs.isolate(async (vfs)=>{
for (let record of await unzip(tmpfile)){
let m = /^(?<contained>scenes\/)?(?<scene>[^/]+)(?:\/(?<name>.+))?\/?$/.exec(record.filename);
const scene :string|undefined = m?.groups?.scene;
const name :string|undefined = m?.groups?.name;

console.log("Parse %s %s from zip", scene, name);
if(!scene){
results.fail.push(`${record.filename}: not matching pattern`);
continue;
Expand All @@ -48,6 +51,7 @@ export default async function postScenes(req :Request, res :Response){
if(!name){
//Create the scene
try{
console.log("create scene");
await vfs.createScene(scene, requester.uid);
}catch(e){
if((e as HTTPError).code != 409) throw e;
Expand Down Expand Up @@ -76,13 +80,13 @@ export default async function postScenes(req :Request, res :Response){
}else{
//Add the file
let rs = createReadStream(tmpfile, {start: record.start, end: record.end});
let f = await vfs.writeFile(rs, {user_id: requester.uid, scene, name, mime: getMimeType(name)});
await vfs.writeFile(rs, {user_id: requester.uid, scene, name, mime: getMimeType(name)});
}

results.ok.push(`${scene}/${name}`);
}
}finally{
await fs.rm(tmpfile, {force: true});
}
console.log("Scenes : ", await vfs.getScene("foo"));
}).finally(() => fs.rm(tmpfile, {force: true}));

res.status(200).send(results);
};
24 changes: 17 additions & 7 deletions source/server/vfs/Base.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import { InternalError, NotFoundError } from "../utils/errors.js";
import { FileProps } from "./types.js";


export type Isolate<that, T> = (this: that, vfs :that)=> Promise<T>;

export default abstract class BaseVfs{

constructor(protected rootDir :string, protected db :Database){}
Expand All @@ -18,21 +20,29 @@ export default abstract class BaseVfs{

/**
* Runs a sequence of methods in isolation
* Every calls to Vfs.db inside of the callback will be wrapped in a transaction
* It _can_ be nested but be sure you understand how savepoints will be unwrapped and how SQLITE_BUSY works
* Every calls to Vfs.db inside of the callback will be seriualized and wrapped in a transaction
*
* @see Database.beginTransaction
*/
public isolate = async <T>(fn :(this: typeof this, vfs :typeof this)=> Promise<T>)=>{
return await this.db.beginTransaction(async (transaction)=>{
let that = new Proxy<typeof this>(this, {
public async isolate<T>(fn :Isolate<typeof this, T>) :Promise<T>{
const parent = this;
return await this.db.beginTransaction(async function isolatedTransaction(transaction){
let closed = false;
let that = new Proxy<typeof parent>(parent, {
get(target, prop, receiver){
if(prop === "db"){
return transaction;
}else if (prop === "isOpen"){
return !closed;
}
return Reflect.get(target, prop, receiver);
}
});
return await fn.call(that, that);
try{
return await fn.call(that, that);
}finally{
closed = true;
}
}) as T;
}

Expand All @@ -47,7 +57,7 @@ export default abstract class BaseVfs{
}

abstract close() :Promise<any>;
public abstract get isOpen():boolean;
public abstract isOpen :boolean;
/**
* Converts a date as stored by sqlite into a js Date object
* Necessary because sqlite applies the ISO standard and omits the "Z" for UTC generated timestamps,
Expand Down
5 changes: 4 additions & 1 deletion source/server/vfs/Clean.ts
Original file line number Diff line number Diff line change
Expand Up @@ -39,7 +39,10 @@ export default abstract class CleanVfs extends BaseVfs{
}

public async optimize(){
await this.db.exec(`PRAGMA optimize;`);
await this.db.exec(`
PRAGMA optimize;
PRAGMA wal_checkpoint(PASSIVE);
`);
}

/**
Expand Down
142 changes: 109 additions & 33 deletions source/server/vfs/helpers/db.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -67,38 +67,114 @@ describe("Database", function(){
});
});

it("opens and makes a transaction", async function(){
await expect(db.beginTransaction(async (tr)=>{
await tr.exec(`INSERT INTO test (name) VALUES ("bar")`);
return await tr.all(`SELECT * FROM test`);
})).to.eventually.have.property("length", 2);
await expect(db.all(`SELECT * FROM test`)).to.eventually.have.property("length", 2);
});

it("rollbacks when an error occurs", async function(){
await expect(db.beginTransaction(async (tr)=>{
await tr.exec(`INSERT INTO test (name) VALUES ("bar")`);
await tr.exec(`INSERT INTO test (name) VALUES ("foo")`); //UNIQUE VIOLATION
})).to.be.rejectedWith("SQLITE_CONSTRAINT: UNIQUE");
await expect(db.all(`SELECT * FROM test`)).to.eventually.have.property("length", 1);
});
describe("beginTransaction()", function(){
it("provides isolation to parent db", async function(){
let p;
let length = (await db.all(`SELECT * FROM test`)).length
await expect(db.beginTransaction(async (tr)=>{
await tr.run(`INSERT INTO test (name) VALUES ($val)`,{$val: "bar"});
p = await db.all(`SELECT * FROM test`)
})).to.be.fulfilled;
expect(p).to.have.property("length", length);
});

it("provides isolation from parent db", async function(){
let p;
await expect(db.beginTransaction(async (tr)=>{
await tr.get("SELECT * FROM test") // makes the transaction explicit
p = db.run(`INSERT INTO test (name) VALUES ("bar")`)
return await tr.all(`SELECT name FROM test`);
})).to.eventually.deep.equal([{name: "foo"}]);
await expect(p).to.be.fulfilled;
});

it("nested transactions reuse the same database instance", async function(){
let count = 0;
await expect(db.beginTransaction(async (tr)=>{
let _close = tr.close;
tr.close = async ()=>{
_close.call(tr);
count++;
}
expect(tr).to.not.equal(db);
await tr.beginTransaction(async tr2=>{
expect(tr2).to.equal(tr);
})
})).to.be.fulfilled;
expect(count).to.equal(1);

});

it("opens and makes a transaction", async function(){
await expect(db.beginTransaction(async (tr)=>{
await tr.exec(`INSERT INTO test (name) VALUES ("bar")`);
return await tr.all(`SELECT * FROM test`);
})).to.eventually.have.property("length", 2);
await expect(db.all(`SELECT * FROM test`)).to.eventually.have.property("length", 2);
});

it("rollbacks when an error occurs", async function(){
await expect(db.beginTransaction(async (tr)=>{
await tr.exec(`INSERT INTO test (name) VALUES ("bar")`);
await tr.exec(`INSERT INTO test (name) VALUES ("foo")`); //UNIQUE VIOLATION
})).to.be.rejectedWith("SQLITE_CONSTRAINT: UNIQUE");
await expect(db.all(`SELECT * FROM test`)).to.eventually.have.property("length", 1);
});

it("won't deadlock", async function(){
await expect(db.beginTransaction(async (tr)=>{
await tr.all("SELECT * FROM test");
await tr.run(`INSERT INTO test (name) VALUES ("alice")`);
await tr.beginTransaction(async tr2=>{
await tr2.all("SELECT * FROM test");
await tr2.run(`INSERT INTO test (name) VALUES ("bob")`);
})
await tr.all("SELECT * FROM test");
await tr.run(`INSERT INTO test (name) VALUES ("charlie")`)
})).to.be.fulfilled;
});

it("rollback unwraps properly from within", async function(){
let exp = [
{id: 1, name: "foo"},
{id: 2, name: "alice"},
{id: 3, name: "bob"}
]
await expect(db.beginTransaction(async (tr)=>{
await tr.run(`INSERT INTO test (name) VALUES ("alice")`);
await expect(tr.beginTransaction(async tr2=>{
await tr2.all("SELECT * FROM test");
throw new Error("Dummy");
})).to.be.rejectedWith("Dummy");
await tr.run(`INSERT INTO test (name) VALUES ("bob")`);
return await tr.all("SELECT * FROM test");
})).to.eventually.deep.equal(exp);

expect(await db.all("SELECT * FROM test"), `changes should not be rolled back`).to.deep.equal(exp);

})

it("picks up changes made within a transaction", async function(){
await expect(db.beginTransaction(async (tr)=>{
await tr.run(`PRAGMA user_version = 42`);
})).to.be.fulfilled;
await expect(db.get("PRAGMA user_version"), `user_version change should be picked up from outside the transaction`).to.eventually.have.property("user_version", 42);
});

it("clears the transaction stack", async function(){
await expect(db.beginTransaction(async (tr)=>{
await tr.beginTransaction(async ()=>{});
//Ending a transaction will cause the outer COMMIT to fail
//unless the previous transaction is still on the stack
await tr.run(`END`);
})).to.be.rejectedWith("SQLITE_ERROR: no such savepoint");

it("provides isolation to parent db", async function(){
let p;
let length = (await db.all(`SELECT * FROM test`)).length
await expect(db.beginTransaction(async (tr)=>{
await tr.exec(`INSERT INTO test (name) VALUES ($val)`,{$val:"bar"});
p = await db.all(`SELECT * FROM test`)
})).to.be.fulfilled;
expect(p).to.have.property("length", length);
});
it("provides isolation from parent db", async function(){
let p;
await expect(db.beginTransaction(async (tr)=>{
await tr.get("SELECT * FROM test") // makes the transaction explicit
p = db.exec(`INSERT INTO test (name) VALUES ("bar")`)
return await tr.all(`SELECT name FROM test`);
})).to.eventually.deep.equal([{name: "foo"}]);
await expect(p).to.be.fulfilled;
});
await expect(db.beginTransaction(async (tr)=>{
await expect(tr.beginTransaction(()=>Promise.reject(new Error("dummy")))).to.be.rejectedWith("dummy");
//Ending a transaction will cause the outer COMMIT to fail
//unless the previous transaction is still on the stack
await tr.run(`END`);
})).to.be.rejectedWith("SQLITE_ERROR: no such savepoint");
});
})
});
17 changes: 12 additions & 5 deletions source/server/vfs/helpers/db.ts
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ import {open as openDatabase, ISqlite, Database as IDatabase } from "sqlite";
import sqlite from "sqlite3";
import config from "../../utils/config.js";
import { debuglog } from "util";
import uid from "../../utils/uid.js";

export interface DbOptions {
filename:string;
Expand All @@ -24,8 +25,13 @@ async function openAndConfigure({filename} :DbOptions){
let db = await openDatabase({
filename,
driver: sqlite.Database,
mode: sqlite.OPEN_URI|sqlite.OPEN_CREATE|sqlite.OPEN_READWRITE,
mode: 0
| sqlite.OPEN_URI
| sqlite.OPEN_CREATE
| sqlite.OPEN_READWRITE
});
await db.run("PRAGMA journal_mode = WAL");
await db.run("PRAGMA synchronous = normal");
await db.run("PRAGMA temp_store = memory");
await db.run(`PRAGMA mmap_size = ${100 * 1000 /*kB*/ * 1000 /*MB*/}`);
await db.run(`PRAGMA page_size = 32768`);
Expand Down Expand Up @@ -58,19 +64,20 @@ export default async function open({filename, forceMigration=true} :DbOptions) :


async function performTransaction<T>(this:Database|Transaction, work :TransactionWork<T>, commit :boolean=true):Promise<T>{
let transaction_id = uid();
// See : https://www.sqlite.org/lang_savepoint.html
if(commit) await this.run(`SAVEPOINT VFS_TRANSACTION`);
if(commit) await this.run(`SAVEPOINT VFS_TRANSACTION_${transaction_id}`);
try{
let res = await work(this);
if(commit) await this.run("RELEASE SAVEPOINT VFS_TRANSACTION");
if(commit) await this.run(`RELEASE SAVEPOINT VFS_TRANSACTION_${transaction_id}`);
return res;
}catch(e){
if(commit) await this.run("ROLLBACK TRANSACTION TO VFS_TRANSACTION").catch(e=>{});
if(commit) await this.run(`ROLLBACK TRANSACTION TO VFS_TRANSACTION_${transaction_id}`);
throw e;
}
}

(db as Database).beginTransaction = async function<T>(work :TransactionWork<T>, commit :boolean = true):Promise<T>{
(db as Database).beginTransaction = async function beginTransaction<T>(work :TransactionWork<T>, commit :boolean = true):Promise<T>{
let conn = await openAndConfigure({filename: db.config.filename}) as Transaction;
conn.beginTransaction = performTransaction.bind(conn) as any;
try{
Expand Down
Loading

0 comments on commit 3cc155e

Please sign in to comment.