Skip to content

Commit

Permalink
@tus/server: add support for lockers (#514)
Browse files Browse the repository at this point in the history
Co-authored-by: Merlijn Vos <[email protected]>
  • Loading branch information
fenos and Murderlon authored Dec 12, 2023
1 parent f1a4ac3 commit 7873d93
Show file tree
Hide file tree
Showing 32 changed files with 987 additions and 256 deletions.
2 changes: 2 additions & 0 deletions .yarnrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ plugins:
spec: "@yarnpkg/plugin-typescript"
- path: .yarn/plugins/@yarnpkg/plugin-interactive-tools.cjs
spec: "@yarnpkg/plugin-interactive-tools"

yarnPath: .yarn/releases/yarn-3.2.3.cjs
25 changes: 10 additions & 15 deletions packages/file-store/configstores/FileConfigstore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import fs from 'node:fs/promises'
import path from 'node:path'
import {Upload} from '@tus/server'
import PQueue from 'p-queue'

import {Configstore} from './Types'

Expand All @@ -11,40 +10,36 @@ import {Configstore} from './Types'
*/
export class FileConfigstore implements Configstore {
directory: string
queue: PQueue

constructor(path: string) {
this.directory = path
this.queue = new PQueue({concurrency: 1})
}

async get(key: string): Promise<Upload | undefined> {
try {
const buffer = await this.queue.add(() => fs.readFile(this.resolve(key), 'utf8'))
const buffer = await fs.readFile(this.resolve(key), 'utf8')
return JSON.parse(buffer as string)
} catch {
return undefined
}
}

async set(key: string, value: Upload): Promise<void> {
await this.queue.add(() => fs.writeFile(this.resolve(key), JSON.stringify(value)))
await fs.writeFile(this.resolve(key), JSON.stringify(value))
}

async delete(key: string): Promise<void> {
await this.queue.add(() => fs.rm(this.resolve(key)))
await fs.rm(this.resolve(key))
}

async list(): Promise<Array<string>> {
return this.queue.add(async () => {
const files = await fs.readdir(this.directory)
const sorted = files.sort((a, b) => a.localeCompare(b))
const name = (file: string) => path.basename(file, '.json')
// To only return tus file IDs we check if the file has a corresponding JSON info file
return sorted.filter(
(file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1])
)
})
const files = await fs.readdir(this.directory)
const sorted = files.sort((a, b) => a.localeCompare(b))
const name = (file: string) => path.basename(file, '.json')
// To only return tus file IDs we check if the file has a corresponding JSON info file
return sorted.filter(
(file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1])
)
}

private resolve(key: string): string {
Expand Down
3 changes: 1 addition & 2 deletions packages/file-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
"test": "mocha test.ts --exit --extension ts --require ts-node/register"
},
"dependencies": {
"debug": "^4.3.4",
"p-queue": "^6.6.2"
"debug": "^4.3.4"
},
"devDependencies": {
"@tus/server": "workspace:^",
Expand Down
8 changes: 8 additions & 0 deletions packages/server/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ export const ERRORS = {
status_code: 403,
body: 'Upload-Offset header required\n',
},
ABORTED: {
status_code: 400,
body: 'Request aborted due to lock acquired',
},
ERR_LOCK_TIMEOUT: {
status_code: 500,
body: 'failed to acquire lock before timeout',
},
INVALID_CONTENT_TYPE: {
status_code: 403,
body: 'Content-Type header required\n',
Expand Down
68 changes: 64 additions & 4 deletions packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
@@ -1,18 +1,20 @@
import EventEmitter from 'node:events'

import type {ServerOptions} from '../types'
import type {DataStore} from '../models'
import type {ServerOptions, WithRequired} from '../types'
import type {DataStore, CancellationContext} from '../models'
import type http from 'node:http'
import stream from 'node:stream'
import {ERRORS} from '../constants'

const reExtractFileID = /([^/]+)\/?$/
const reForwardedHost = /host="?([^";]+)/
const reForwardedProto = /proto=(https?)/

export class BaseHandler extends EventEmitter {
options: ServerOptions
options: WithRequired<ServerOptions, 'locker'>
store: DataStore

constructor(store: DataStore, options: ServerOptions) {
constructor(store: DataStore, options: WithRequired<ServerOptions, 'locker'>) {
super()
if (!store) {
throw new Error('Store must be defined')
Expand All @@ -27,6 +29,7 @@ export class BaseHandler extends EventEmitter {
// @ts-expect-error not explicitly typed but possible
headers['Content-Length'] = Buffer.byteLength(body, 'utf8')
}

res.writeHead(status, headers)
res.write(body)
return res.end()
Expand Down Expand Up @@ -101,4 +104,61 @@ export class BaseHandler extends EventEmitter {

return {host: host as string, proto}
}

protected async getLocker(req: http.IncomingMessage) {
if (typeof this.options.locker === 'function') {
return this.options.locker(req)
}
return this.options.locker
}

protected async acquireLock(
req: http.IncomingMessage,
id: string,
context: CancellationContext
) {
const locker = await this.getLocker(req)

const lock = locker.newLock(id)

await lock.lock(() => {
context.cancel()
})

return lock
}

protected writeToStore(
req: http.IncomingMessage,
id: string,
offset: number,
context: CancellationContext
) {
return new Promise<number>(async (resolve, reject) => {
if (context.signal.aborted) {
reject(ERRORS.ABORTED)
return
}

const proxy = new stream.PassThrough()
stream.addAbortSignal(context.signal, proxy)

proxy.on('error', (err) => {
req.unpipe(proxy)
if (err.name === 'AbortError') {
reject(ERRORS.ABORTED)
} else {
reject(err)
}
})

req.on('error', (err) => {
if (!proxy.closed) {
proxy.destroy(err)
}
})

this.store.write(req.pipe(proxy), id, offset).then(resolve).catch(reject)
})
}
}
14 changes: 12 additions & 2 deletions packages/server/src/handlers/DeleteHandler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import {BaseHandler} from './BaseHandler'
import {ERRORS, EVENTS} from '../constants'
import {CancellationContext} from '../models'

import type http from 'node:http'

export class DeleteHandler extends BaseHandler {
async send(req: http.IncomingMessage, res: http.ServerResponse) {
async send(
req: http.IncomingMessage,
res: http.ServerResponse,
context: CancellationContext
) {
const id = this.getFileIdFromRequest(req)
if (!id) {
throw ERRORS.FILE_NOT_FOUND
Expand All @@ -14,7 +19,12 @@ export class DeleteHandler extends BaseHandler {
await this.options.onIncomingRequest(req, res, id)
}

await this.store.remove(id)
const lock = await this.acquireLock(req, id, context)
try {
await this.store.remove(id)
} finally {
await lock.unlock()
}
const writtenRes = this.write(res, 204, {})
this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id)
return writtenRes
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/handlers/GetHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class GetHandler extends BaseHandler {
}

const stats = await this.store.getUpload(id)

if (!stats || stats.offset !== stats.size) {
throw ERRORS.FILE_NOT_FOUND
}
Expand Down
17 changes: 14 additions & 3 deletions packages/server/src/handlers/HeadHandler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import {BaseHandler} from './BaseHandler'

import {ERRORS} from '../constants'
import {Metadata} from '../models'
import {Metadata, Upload, CancellationContext} from '../models'

import type http from 'node:http'

export class HeadHandler extends BaseHandler {
async send(req: http.IncomingMessage, res: http.ServerResponse) {
async send(
req: http.IncomingMessage,
res: http.ServerResponse,
context: CancellationContext
) {
const id = this.getFileIdFromRequest(req)
if (!id) {
throw ERRORS.FILE_NOT_FOUND
Expand All @@ -16,7 +20,14 @@ export class HeadHandler extends BaseHandler {
await this.options.onIncomingRequest(req, res, id)
}

const file = await this.store.getUpload(id)
const lock = await this.acquireLock(req, id, context)

let file: Upload
try {
file = await this.store.getUpload(id)
} finally {
await lock.unlock()
}

// If a Client does attempt to resume an upload which has since
// been removed by the Server, the Server SHOULD respond with the
Expand Down
Loading

0 comments on commit 7873d93

Please sign in to comment.