Skip to content

Commit

Permalink
default MemoryLocker
Browse files Browse the repository at this point in the history
  • Loading branch information
fenos committed Dec 8, 2023
1 parent cf9164d commit 560852b
Show file tree
Hide file tree
Showing 9 changed files with 32 additions and 15 deletions.
15 changes: 9 additions & 6 deletions packages/server/src/handlers/BaseHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,16 @@ import type {DataStore, CancellationContext} from '../models'
import type http from 'node:http'
import stream from 'node:stream'
import {ERRORS} from '../constants'
import {MemoryLocker} from '../lockers'

const reExtractFileID = /([^/]+)\/?$/
const reForwardedHost = /host="?([^";]+)/
const reForwardedProto = /proto=(https?)/

type WithRequired<T, K extends keyof T> = T & {[P in K]-?: T[P]}

export class BaseHandler extends EventEmitter {
options: ServerOptions
options: WithRequired<ServerOptions, 'locker'>
store: DataStore

constructor(store: DataStore, options: ServerOptions) {
Expand All @@ -20,8 +23,12 @@ export class BaseHandler extends EventEmitter {
throw new Error('Store must be defined')
}

if (!options.locker) {
options.locker = new MemoryLocker()
}

this.store = store
this.options = options
this.options = options as WithRequired<ServerOptions, 'locker'>
}

write(res: http.ServerResponse, status: number, headers = {}, body = '') {
Expand Down Expand Up @@ -119,10 +126,6 @@ export class BaseHandler extends EventEmitter {
) {
const locker = await this.getLocker(req)

if (!locker) {
return
}

const lock = locker.newLock(id)

await lock.lock(() => {
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/handlers/DeleteHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ export class DeleteHandler extends BaseHandler {
try {
await this.store.remove(id)
} finally {
await lock?.unlock()
await lock.unlock()
}
const writtenRes = this.write(res, 204, {})
this.emit(EVENTS.POST_TERMINATE, req, writtenRes, id)
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/handlers/HeadHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ export class HeadHandler extends BaseHandler {
try {
file = await this.store.getUpload(id)
} finally {
await lock?.unlock()
await lock.unlock()
}

// If a Client does attempt to resume an upload which has since
Expand Down
2 changes: 1 addition & 1 deletion packages/server/src/handlers/OptionsHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ export class OptionsHandler extends BaseHandler {
res.setHeader('Tus-Extension', this.store.extensions.join(','))
}

return this.write(res, 204, {})
return this.write(res, 204)
}
}
2 changes: 1 addition & 1 deletion packages/server/src/handlers/PatchHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ export class PatchHandler extends BaseHandler {

newOffset = await this.writeToStore(req, id, offset, context)
} finally {
await lock?.unlock()
await lock.unlock()
}

upload.offset = newOffset
Expand Down
6 changes: 3 additions & 3 deletions packages/server/src/handlers/PostHandler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,8 @@ const log = debug('tus-node-server:handlers:post')

export class PostHandler extends BaseHandler {
// Overriding the `BaseHandler` type. We always set `namingFunction` in the constructor.
options!: Required<Pick<ServerOptions, 'namingFunction'>> &
Omit<ServerOptions, 'namingFunction'>
options!: Required<Pick<ServerOptions, 'namingFunction' | 'locker'>> &
Omit<ServerOptions, 'namingFunction' | 'locker'>

constructor(store: DataStore, options: ServerOptions) {
if (options.namingFunction && typeof options.namingFunction !== 'function') {
Expand Down Expand Up @@ -120,7 +120,7 @@ export class PostHandler extends BaseHandler {
context.abort()
throw e
} finally {
await lock?.unlock()
await lock.unlock()
}

if (isFinal && this.options.onUploadFinish) {
Expand Down
6 changes: 6 additions & 0 deletions packages/server/src/server.ts
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ import {
import type stream from 'node:stream'
import type {ServerOptions, RouteHandler} from './types'
import type {DataStore, Upload, CancellationContext} from './models'
import {MemoryLocker} from './lockers'

type Handlers = {
GET: InstanceType<typeof GetHandler>
Expand Down Expand Up @@ -58,6 +59,7 @@ interface TusEvents {

type on = EventEmitter['on']
type emit = EventEmitter['emit']

export declare interface Server {
on<Event extends keyof TusEvents>(event: Event, listener: TusEvents[Event]): this
on(eventName: Parameters<on>[0], listener: Parameters<on>[1]): this
Expand Down Expand Up @@ -92,6 +94,10 @@ export class Server extends EventEmitter {
throw new Error("'datastore' is not defined; must have a datastore")
}

if (!options.locker) {
options.locker = new MemoryLocker()
}

const {datastore, ...rest} = options
this.options = rest
this.datastore = datastore
Expand Down
5 changes: 4 additions & 1 deletion packages/server/src/types.ts
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,10 @@ export type ServerOptions = {
// Default uses `crypto.randomBytes(16).toString('hex')`.
namingFunction?: (req: http.IncomingMessage) => string
// locker implementation to support distributed locks
locker?: Locker | ((req: http.IncomingMessage) => Locker | Promise<Locker>)
locker?:
| Locker
| Promise<Locker>
| ((req: http.IncomingMessage) => Locker | Promise<Locker>)
// `onUploadCreate` will be invoked before a new upload is created.
// If the function returns the (modified) response, the upload will be created.
// If an error is thrown, the HTTP request will be aborted and the provided `body` and `status_code` (or their fallbacks)
Expand Down
7 changes: 6 additions & 1 deletion packages/server/test/DeleteHandler.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import {DataStore} from '../src/models/DataStore'
import {DeleteHandler} from '../src/handlers/DeleteHandler'
import {ERRORS, EVENTS} from '../src/constants'
import {CancellationContext} from '../src/models'
import {MemoryLocker} from '../src'

describe('DeleteHandler', () => {
const path = '/test/output'
Expand All @@ -21,7 +22,11 @@ describe('DeleteHandler', () => {

beforeEach(() => {
fake_store.remove.resetHistory()
handler = new DeleteHandler(fake_store, {relativeLocation: true, path})
handler = new DeleteHandler(fake_store, {
relativeLocation: true,
path,
locker: new MemoryLocker(),
})
req = {url: `${path}/1234`, method: 'DELETE'} as http.IncomingMessage
res = httpMocks.createResponse()
const abortController = new AbortController()
Expand Down

0 comments on commit 560852b

Please sign in to comment.