Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

@tus/server: add support for lockers #514

Merged
merged 15 commits into from
Dec 12, 2023
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions .yarnrc.yml
Original file line number Diff line number Diff line change
Expand Up @@ -7,3 +7,5 @@ plugins:
spec: "@yarnpkg/plugin-typescript"
- path: .yarn/plugins/@yarnpkg/plugin-interactive-tools.cjs
spec: "@yarnpkg/plugin-interactive-tools"

yarnPath: .yarn/releases/yarn-3.2.3.cjs
25 changes: 10 additions & 15 deletions packages/file-store/configstores/FileConfigstore.ts
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
import fs from 'node:fs/promises'
import path from 'node:path'
import {Upload} from '@tus/server'
import PQueue from 'p-queue'

import {Configstore} from './Types'

Expand All @@ -11,40 +10,36 @@ import {Configstore} from './Types'
*/
export class FileConfigstore implements Configstore {
directory: string
queue: PQueue

constructor(path: string) {
this.directory = path
this.queue = new PQueue({concurrency: 1})
}

async get(key: string): Promise<Upload | undefined> {
try {
const buffer = await this.queue.add(() => fs.readFile(this.resolve(key), 'utf8'))
const buffer = await fs.readFile(this.resolve(key), 'utf8')
return JSON.parse(buffer as string)
} catch {
return undefined
}
}

async set(key: string, value: Upload): Promise<void> {
await this.queue.add(() => fs.writeFile(this.resolve(key), JSON.stringify(value)))
await fs.writeFile(this.resolve(key), JSON.stringify(value))
}

async delete(key: string): Promise<void> {
await this.queue.add(() => fs.rm(this.resolve(key)))
await fs.rm(this.resolve(key))
}

async list(): Promise<Array<string>> {
return this.queue.add(async () => {
const files = await fs.readdir(this.directory)
const sorted = files.sort((a, b) => a.localeCompare(b))
const name = (file: string) => path.basename(file, '.json')
// To only return tus file IDs we check if the file has a corresponding JSON info file
return sorted.filter(
(file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1])
)
})
const files = await fs.readdir(this.directory)
const sorted = files.sort((a, b) => a.localeCompare(b))
const name = (file: string) => path.basename(file, '.json')
// To only return tus file IDs we check if the file has a corresponding JSON info file
return sorted.filter(
(file, idx) => idx < sorted.length - 1 && name(file) === name(sorted[idx + 1])
)
}

private resolve(key: string): string {
Expand Down
3 changes: 1 addition & 2 deletions packages/file-store/package.json
Original file line number Diff line number Diff line change
Expand Up @@ -21,8 +21,7 @@
"test": "mocha test.ts --exit --extension ts --require ts-node/register"
},
"dependencies": {
"debug": "^4.3.4",
"p-queue": "^6.6.2"
"debug": "^4.3.4"
},
"devDependencies": {
"@tus/server": "workspace:^",
Expand Down
8 changes: 8 additions & 0 deletions packages/server/src/constants.ts
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,14 @@ export const ERRORS = {
status_code: 403,
body: 'Upload-Offset header required\n',
},
ABORTED: {
status_code: 400,
body: 'Request aborted due to lock acquired',
},
ERR_LOCK_TIMEOUT: {
status_code: 500,
body: 'failed to acquire lock before timeout',
},
INVALID_CONTENT_TYPE: {
status_code: 403,
body: 'Content-Type header required\n',
Expand Down
73 changes: 70 additions & 3 deletions packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,20 @@
import EventEmitter from 'node:events'

import type {ServerOptions} from '../types'
import type {DataStore} from '../models'
import type {DataStore, CancellationContext} from '../models'
import type http from 'node:http'
import stream from 'node:stream'
import {ERRORS} from '../constants'
import {MemoryLocker} from '../lockers'

const reExtractFileID = /([^/]+)\/?$/
const reForwardedHost = /host="?([^";]+)/
const reForwardedProto = /proto=(https?)/

type WithRequired<T, K extends keyof T> = T & {[P in K]-?: T[P]}

export class BaseHandler extends EventEmitter {
options: ServerOptions
options: WithRequired<ServerOptions, 'locker'>
store: DataStore

constructor(store: DataStore, options: ServerOptions) {
Expand All @@ -18,15 +23,20 @@ export class BaseHandler extends EventEmitter {
throw new Error('Store must be defined')
}

if (!options.locker) {
options.locker = new MemoryLocker()
}
fenos marked this conversation as resolved.
Show resolved Hide resolved

this.store = store
this.options = options
this.options = options as WithRequired<ServerOptions, 'locker'>
}

write(res: http.ServerResponse, status: number, headers = {}, body = '') {
if (status !== 204) {
// @ts-expect-error not explicitly typed but possible
headers['Content-Length'] = Buffer.byteLength(body, 'utf8')
}

res.writeHead(status, headers)
res.write(body)
return res.end()
Expand Down Expand Up @@ -101,4 +111,61 @@ export class BaseHandler extends EventEmitter {

return {host: host as string, proto}
}

protected async getLocker(req: http.IncomingMessage) {
if (typeof this.options.locker === 'function') {
return this.options.locker(req)
fenos marked this conversation as resolved.
Show resolved Hide resolved
}
return this.options.locker
}

protected async acquireLock(
req: http.IncomingMessage,
id: string,
context: CancellationContext
) {
const locker = await this.getLocker(req)

const lock = locker.newLock(id)

await lock.lock(() => {
context.cancel()
})

return lock
}

protected writeToStore(
req: http.IncomingMessage,
id: string,
offset: number,
context: CancellationContext
) {
return new Promise<number>(async (resolve, reject) => {
if (context.signal.aborted) {
reject(ERRORS.ABORTED)
return
}

const proxy = new stream.PassThrough()
stream.addAbortSignal(context.signal, proxy)

proxy.on('error', (err) => {
req.unpipe(proxy)
if (err.name === 'AbortError') {
reject(ERRORS.ABORTED)
} else {
reject(err)
}
})

req.on('error', (err) => {
if (!proxy.closed) {
proxy.destroy(err)
}
})

this.store.write(req.pipe(proxy), id, offset).then(resolve).catch(reject)
Acconut marked this conversation as resolved.
Show resolved Hide resolved
})
}
}
14 changes: 12 additions & 2 deletions packages/server/src/handlers/DeleteHandler.ts
Original file line number Diff line number Diff line change
@@ -1,10 +1,15 @@
import {BaseHandler} from './BaseHandler'
import {ERRORS, EVENTS} from '../constants'
import {CancellationContext} from '../models'

import type http from 'node:http'

export class DeleteHandler extends BaseHandler {
async send(req: http.IncomingMessage, res: http.ServerResponse) {
async send(
req: http.IncomingMessage,
res: http.ServerResponse,
context: CancellationContext
) {
const id = this.getFileIdFromRequest(req)
if (!id) {
throw ERRORS.FILE_NOT_FOUND
Expand All @@ -14,7 +19,12 @@ export class DeleteHandler extends BaseHandler {
await this.options.onIncomingRequest(req, res, id)
}

await this.store.remove(id)
const lock = await this.acquireLock(req, id, context)
try {
await this.store.remove(id)
} finally {
await lock.unlock()
}
const writtenRes = this.write(res, 204, {})
this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id)
return writtenRes
Expand Down
1 change: 1 addition & 0 deletions packages/server/src/handlers/GetHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ export class GetHandler extends BaseHandler {
}

const stats = await this.store.getUpload(id)

if (!stats || stats.offset !== stats.size) {
throw ERRORS.FILE_NOT_FOUND
}
Expand Down
17 changes: 14 additions & 3 deletions packages/server/src/handlers/HeadHandler.ts
Original file line number Diff line number Diff line change
@@ -1,12 +1,16 @@
import {BaseHandler} from './BaseHandler'

import {ERRORS} from '../constants'
import {Metadata} from '../models'
import {Metadata, Upload, CancellationContext} from '../models'

import type http from 'node:http'

export class HeadHandler extends BaseHandler {
async send(req: http.IncomingMessage, res: http.ServerResponse) {
async send(
req: http.IncomingMessage,
res: http.ServerResponse,
context: CancellationContext
) {
const id = this.getFileIdFromRequest(req)
if (!id) {
throw ERRORS.FILE_NOT_FOUND
Expand All @@ -16,7 +20,14 @@ export class HeadHandler extends BaseHandler {
await this.options.onIncomingRequest(req, res, id)
}

const file = await this.store.getUpload(id)
const lock = await this.acquireLock(req, id, context)

let file: Upload
try {
file = await this.store.getUpload(id)
} finally {
await lock.unlock()
}

// If a Client does attempt to resume an upload which has since
// been removed by the Server, the Server SHOULD respond with the
Expand Down
Loading