
@tus/server: add support for lockers #514

Merged: 15 commits into tus:main on Dec 12, 2023

Conversation

@fenos (Collaborator) commented Nov 10, 2023

Adds distributed locking capabilities via the Locker interface.

new Server({
   ...options,
   locker: new MemoryLocker(),
})

Useful for implementing custom lockers with Redis or Postgres.

closes #408
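
For reference, a rough sketch of the shape a custom locker takes. The type names below mirror the interface as it is discussed later in this thread (RequestCancel appears in a comment further down) and may differ in detail from the merged code:

// Assumed shapes, mirroring the interface discussed in this thread.
type RequestCancel = () => void

interface Lock {
  lock(cancel: RequestCancel): Promise<void>
  unlock(): Promise<void>
}

interface Locker {
  newLock(id: string): Lock
}

// A custom locker is then a factory for per-upload locks: the server calls
// newLock(id), lock() before touching the upload, and unlock() afterwards.
class MyLocker implements Locker {
  newLock(id: string): Lock {
    return new MyLock(id)
  }
}

class MyLock implements Lock {
  constructor(private readonly id: string) {}

  async lock(cancel: RequestCancel): Promise<void> {
    // Acquire the lock in your backend (Redis, Postgres, ...) and keep
    // `cancel` around: call it when another request wants this lock released.
  }

  async unlock(): Promise<void> {
    // Release the lock in your backend.
  }
}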

@fenos force-pushed the s3-store/distributed-locks branch from 6aa643a to 9f4630e (November 10, 2023 13:48)
@mitjap (Collaborator) left a comment:


Thank you for this PR. I gave it a quick look, but I see an issue with MemoryLocker: when unlocking a lock, all pending locks are resolved.

it('unlocking a lock should allow only one pending lock to resolve', async () => {
  const locker = new MemoryLocker()
  const lockId = 'upload-id-1'

  await locker.lock(lockId)
  const lock2 = locker.lock(lockId)
  const lock3 = locker.lock(lockId)
  const lock4 = locker.lock(lockId)
  const lock5 = locker.lock(lockId)

  // A single unlock should wake exactly one waiter, leaving lock3-lock5
  // pending. With the current implementation all of them resolve, so
  // execution reaches the throw below and the test fails.
  await locker.unlock(lockId)
  await lock2
  await lock3
  await lock4
  await lock5

  throw new Error('should not be reached')
})

(Two review comments on packages/server/src/models/Locker.ts, marked outdated and resolved.)
@fenos force-pushed the s3-store/distributed-locks branch from 9f4630e to ec3b41b (November 13, 2023 09:50)
@fenos (Collaborator Author) commented Nov 13, 2023

Thanks a lot for the great review!
I've replaced the memory lock with the queue implementation you proposed. Neat!

Let me know if it looks good now!
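
For readers following along, a minimal sketch of the queue idea (not the PR's exact code): each held lock keeps a FIFO of waiters, and unlock wakes exactly one.

class MemoryLocker {
  private queues = new Map<string, Array<() => void>>()

  async lock(id: string): Promise<void> {
    const queue = this.queues.get(id)
    if (!queue) {
      // Nobody holds the lock: take it and create an empty waiter queue.
      this.queues.set(id, [])
      return
    }
    // Lock is held: park until unlock() calls our resolver.
    await new Promise<void>((resolve) => queue.push(resolve))
  }

  async unlock(id: string): Promise<void> {
    const queue = this.queues.get(id)
    if (!queue) throw new Error(`lock for ${id} is not held`)
    const next = queue.shift()
    if (next) {
      next() // hand the lock to exactly one waiter
    } else {
      this.queues.delete(id) // no waiters: the lock is free again
    }
  }
}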

One more note on this: I think we might want to optionally allow providing a function that creates the locker instance.
In my case I run tus in a multi-tenant way and need to connect to the right backend to acquire the lock.

For example:

new Server({
   ...options,
   locker: (req: Request) => new MyCustomLocker(req.db),
})

Then in the BaseHandler I have a getter method:

getLocker(req: http.IncomingMessage) {
   if (typeof this.locker === 'function') {
      return this.locker(req)
   } 
   return this.locker
}

and call it this way:

const locker = this.getLocker(req)
try {
  await locker?.lock(id)
  // ... handle the request while holding the lock ...
} finally {
  await locker?.unlock(id)
}

Any thoughts on this?

@fenos force-pushed the s3-store/distributed-locks branch 3 times, most recently from 28841c9 to aa9b9f1 (November 20, 2023 10:35)
@Murderlon requested a review from Acconut (November 21, 2023 09:16)
@Acconut (Member) left a comment:


This is a great PR, thanks for working on it! Locks are a good mechanism for taming the concurrent nature of requests for resumable uploads, but they also come with their own challenges. I have described this in more detail in tusd's documentation at https://github.com/tus/tusd/blob/main/docs/locks.md#avoiding-locked-uploads. For tusd, we have also implemented a mechanism to unlock uploads if a new request attempts to resume the upload. Something similar should be considered here as well.

I think we might want to optionally allow providing a function that creates the locker instance.
In my case I run tus in a multi-tenant way and need to connect to the right backend to acquire the lock.

I see where this could be useful, even though I don't have a use case and thus cannot judge if this approach comes with caveats. However, if we were to introduce multi-tenant support for lockers, this should be done in a uniform way that also works for the storages, for example.

@fenos (Collaborator Author) commented Nov 23, 2023

@Acconut right! I've seen the article and it makes sense! I can implement it in the memory locker!
However, it seems quite difficult to implement with any sort of stateful backend that handles the locking mechanism. Do you have a way to do it with, say, Redis?

I see where this could be useful, even though I don't have a use case and thus cannot judge if this approach comes with caveats. However, if we were to introduce multi-tenant support for lockers, this should be done in a uniform way that also works for the storages, for example.

Your point makes sense! One way to run tus multi-tenant would be to create a Server and Store instance per tenant, something like:

const tusServers = new Map()

function getOrCreateServer(tenantId) {
  if (tusServers.has(tenantId)) {
    return tusServers.get(tenantId)
  }
  const server = new Server(....)
  tusServers.set(tenantId, server)
  return server
}

app.post('/tus', (req, res) => {
  const server = getOrCreateServer(req.tenantId)
  server.handle(req, res)
})

This would run fully multi-tenant, where each tenant has its own bucket and its own Locker.

However, we currently run it in a hybrid fashion, where the multi-tenancy is on the database level only; we use each tenant's database to apply locks when necessary. I think it makes sense to optionally support this as well, by simply creating a locker instance per request, since the locking mechanics are handled by the database itself.

@Acconut (Member) commented Nov 23, 2023

Do you have a way to do it with, say, Redis?

Redis has a great pub/sub mechanism for sending messages. The lock holder could listen for notifications about requests to release the lock. Other request handlers that are interested in acquiring the lock can instruct the holder to clean up and release the lock by sending a message to a channel identified by the upload ID.
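
As an illustration of that flow, a sketch using ioredis (an assumption; any Redis client with pub/sub would do). A waiting handler publishes a release request on a per-upload channel, and the current lock holder subscribes to it:

import Redis from 'ioredis'

const pub = new Redis()
const sub = new Redis() // a subscribed connection cannot issue other commands

// Holder side: listen for release requests for this upload.
async function watchForReleaseRequests(uploadId: string, onRelease: () => void) {
  const channel = `tus:release:${uploadId}`
  await sub.subscribe(channel)
  sub.on('message', (incoming) => {
    if (incoming === channel) onRelease()
  })
}

// Waiter side: ask whoever holds the lock to wrap up and release it.
async function requestRelease(uploadId: string) {
  await pub.publish(`tus:release:${uploadId}`, 'release')
}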

I think it makes sense to optionally support this as well, by simply creating a locker instance per request, since the locking mechanics are handled by the database itself.

I don't disagree with this approach, but it's better to think about multi-tenancy for all features of tus-node-server versus building an approach that is just designed for lockers. That ensures that the API is consistent and usable for the long-term future :)

@Murderlon (Member) commented:

Redis has a great pub/sub mechanism to send messages. The lock holder could listen for notifications about requests to release the lock

Using plain pub/sub for locks might be dangerous as there is no guaranteed arrival. This is why RedLock exists for Redis, which is what I used in my experiments with distributed locks in tusd.

I don't disagree with this approach, but it's better to think about multi-tenancy for all features of tus-node-server versus building an approach that is just designed for lockers. That ensures that the API is consistent and usable for the long-term future :)

I think I agree that taking a step back and seeing how this relates to other tus features would be best. But that's a lot more work and likely a breaking change. Since this only adds the ability to optionally turn the locker into a function, I would be okay with merging it as is so as not to block the PR and the use case.

@Murderlon (Member) commented:

For tusd, we have also implemented a mechanism to unlock uploads if a new request attempts to resume the upload. Something similar should be considered here as well

I do think that we should explore this before merging though.

@Murderlon changed the title from "feat: Distributed locks" to "@tus/server: add support for lockers" on Nov 23, 2023
@Acconut (Member) commented Nov 23, 2023

Using plain pub/sub for locks might be dangerous as there is no guaranteed arrival.

I am not advocating the use of pub/sub for locking itself, but as a mechanism to communicate that another request handler is interested in acquiring the lock. RedLock might be the best way to implement the lock itself, with pub/sub then used to signal that the lock holder should release it.

Since this only adds the ability to optionally turn the locker into a function, I would be okay with merging it as is so as not to block the PR and the use case.

Alright, I have no strong opinion on this. Feel free to go that route :)

@fenos (Collaborator Author) commented Nov 23, 2023

I currently rely on Postgres advisory locks to acquire the locks, which works great! However, the complicated part is signalling other servers to release the lock in a distributed fashion!

Redis Pub/Sub might work in this case 👍
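
The advisory-lock half is straightforward with node-postgres; what follows is a sketch (hashtext maps the string upload ID onto the integer key advisory locks require). The cross-server release signal is the part that still needs something like the pub/sub above:

import { Client } from 'pg'

// Session-level advisory locks are tied to the connection, so the same
// client that locked must also unlock.
async function withAdvisoryLock(client: Client, uploadId: string, fn: () => Promise<void>) {
  await client.query('SELECT pg_advisory_lock(hashtext($1))', [uploadId])
  try {
    await fn()
  } finally {
    await client.query('SELECT pg_advisory_unlock(hashtext($1))', [uploadId])
  }
}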

@fenos force-pushed the s3-store/distributed-locks branch from aa9b9f1 to a4edaf0 (November 24, 2023 12:52)
@fenos (Collaborator Author) commented Nov 24, 2023

Hello @Acconut @Murderlon @mitjap, I've reworked the implementation to support request cancellation as per the spec.

In order for this to work reliably I needed to introduce an AbortController (something like Go's Context) and propagate its signal to all AWS functions as well as all the streams, to cancel their execution.

Once an operation is canceled, the abort controller propagates the ERRORS.ABORTED error, and the server implementation responds with Connection: close and terminates the connection early.

Additionally, I've added a timeout for acquiring the lock.

Let me know if this sounds good to you 👍
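
To illustrate the propagation (a sketch, not the PR's code): AWS SDK v3 accepts an abortSignal per send call, so the controller's signal can be threaded into each S3 operation.

import { S3Client, UploadPartCommand, UploadPartCommandInput } from '@aws-sdk/client-s3'

const s3 = new S3Client({})

// Passing the signal lets a lock-release request cancel the in-flight call.
async function uploadPart(signal: AbortSignal, input: UploadPartCommandInput) {
  return s3.send(new UploadPartCommand(input), { abortSignal: signal })
}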

@Acconut (Member) commented Nov 27, 2023

In order for this to work reliably I needed to introduce an AbortController (something like Go's Context) and propagate its signal to all AWS functions as well as all the streams, to cancel their execution.

FYI, the implementation in tusd works slightly differently. Instead of sending an abort signal that is relayed to the S3 SDK, we effectively stop the incoming request body so that no more data is read. To the stores, it looks like the request body just ended naturally.

The benefit is that stores do not have to implement an abort mechanism (which, AFAIK, they do here). In addition, passing the abort signal to the S3 SDK will likely cause the current operation to fail, meaning that transmitted data might get lost and have to be re-uploaded. Stopping the request body appears like a normal EOF to the store, so it will just save all data normally.

I am not sure whether a similar approach is feasible in tus-node-server, but I just wanted to share the details of how tusd implements it.

@fenos (Collaborator Author) commented Nov 27, 2023

Hi @Acconut, thanks for sharing this!

However, since you pass the ctx into all the Store functions, I assumed that if the request is canceled, the context is subsequently canceled and the store operations are canceled too.

Are you sure this is not the case in tusd?

During an upload, if I abort reading the request body (by calling req.destroy()) it would indeed propagate the error to the stores; however, this is only true while an upload is ongoing.

For other operations, such as HEAD / DELETE / POST, we don't really read the request body, so canceling the request would not make any real difference. Thoughts on this?

@Acconut (Member) commented Nov 27, 2023

However, since you pass the ctx into all the Store functions, I assumed that if the request is canceled, the context is subsequently canceled and the store operations are canceled too.

Are you sure this is not the case in tusd?

If a lock is requested to be released, the context will first make sure to close the request body. After a delay (usually a few seconds), the context will be cancelled so that all operations halt. The idea is that closing the request body is enough for the storage to exit without error, but if an operation hangs, the context cancellation takes care of it.

For other operations, such as HEAD / DELETE / POST, we don't really read the request body, so canceling the request would not make any real difference. Thoughts on this?

These operations should usually finish quickly, so we can just wait until the lock is released.

@fenos (Collaborator Author) commented Nov 27, 2023

@Acconut does tusd send back a response when cancelling the request, or does it simply drop the connection?

@Acconut (Member) commented Nov 27, 2023

Yes, tusd attempts to send a response, but will also close the connection after that. Some HTTP/1.1 clients are not able to receive this response though, because they wait until the full request has been sent before reading a response.

@fenos (Collaborator Author) commented Nov 27, 2023

@Acconut Awesome, thanks!
I've got it working as you explained!

To be honest it is a bit hacky 😄 because Node doesn't provide a nice way to cancel a stream gracefully. For this reason I've created an InteruptableStream which listens for the abort signal. Once the signal is aborted, I instruct the consumer that there is no more data (by calling this.end()).

During the response, I check whether the context was canceled and destroy the request (which closes the connection).

I've included a test that simulates this behavior!
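
A minimal sketch of that idea, assuming a PassThrough-based stream (the actual class in the PR may differ):

import { PassThrough } from 'node:stream'

// On abort we end the stream instead of destroying it, so the consumer
// (the store) sees a normal EOF rather than an error.
class InteruptableStream extends PassThrough {
  constructor(signal: AbortSignal) {
    super()
    signal.addEventListener('abort', () => this.end(), { once: true })
  }
}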

@fenos force-pushed the s3-store/distributed-locks branch 3 times, most recently from 159d405 to 472ef1e (December 6, 2023 17:31)
@fenos (Collaborator Author) commented Dec 7, 2023

Hey @Murderlon @Acconut, I've updated the PR with the interface update and added a new option for creating a custom context.

#514 (comment)

Do you think it can now be merged?

@fenos (Collaborator Author) commented Dec 7, 2023

The alternative would be to revert the context change and keep only the Locker interface changes.
Then rely on the async locker function and use a global EventEmitter to listen for external release events.

Overall this keeps the PR smaller, and we can make the context addition in another PR.

To illustrate:

Implementation

new Server({
   locker: (req) => createCustomLocker(req.db, pubSub),
})

Locker

class MyCustomLocker implements Locker {
  constructor(public readonly db: Database, public readonly events: EventEmitter) {}

  newLock(id: string) {
    return new MyCustomLock(id, this.db, this.events)
  }
}

let events = new EventEmitter()
let pubSubListening: Promise<void> | undefined

export async function createCustomLocker(db: Database, pubSub: PubSub) {
  if (!pubSubListening) {
    pubSubListening = pubSub.on('lock_release', (id) => {
      events.emit(`release:${id}`)
    })
  }

  await pubSubListening

  return new MyCustomLocker(db, events)
}

Lock

class MyCustomLock implements Lock {
  protected requestRelease?: () => void

  constructor(
    private readonly id: string,
    private readonly db: Database,
    private readonly events: EventEmitter
  ) {
    this.releaseHandler = this.releaseHandler.bind(this)
  }

  releaseHandler() {
    this.requestRelease?.()
  }

  async lock(cancel: RequestCancel) {
    await this.db.acquireLock(this.id)
    this.requestRelease = cancel
    this.events.on(`release:${this.id}`, this.releaseHandler)
  }

  async unlock() {
    await this.db.unlock(this.id)
    this.events.off(`release:${this.id}`, this.releaseHandler)
  }
}

@fenos force-pushed the s3-store/distributed-locks branch from 965484d to 640363f (December 7, 2023 11:39)
@fenos force-pushed the s3-store/distributed-locks branch from 640363f to 7577516 (December 7, 2023 11:41)
@Acconut (Member) commented Dec 8, 2023

Thank you very much for making the changes ❤️ I will leave it to @Murderlon to decide which interface approach is better suited in this case.

use a global EventEmitter to listen for external release events.

I am not sure I understand this idea properly. With this approach, would these release events still be propagated through whatever external locking server the application is using? For example, if I have a cluster of servers running tus-node-server and I want to use Redis for locks, the release request events should also be routed through Redis, so that every tus-node-server instance has a chance of receiving the event. This would not work if we use an EventEmitter, whose scope is limited to one tus-node-server instance.

@fenos (Collaborator Author) commented Dec 8, 2023

@Acconut Awesome, thanks!

Just to clarify: in the example above, I'm using a local EventEmitter just to forward the event received from Redis pub/sub down to the interested lock.

const eventEmitter = new EventEmitter()
RedisPubSub.on('lock_release', (uploadId) => {
  eventEmitter.emit(`release:${uploadId}`)
})

You would want a single Redis subscription in each tus-node-server node of the cluster, but many locks with different IDs may be created.

By forwarding the event through the EventEmitter, only the Lock that acquired the lock will be notified:

class Lock {

  // ...

  async lock(release) {
    await this.acquireLock()
    this.events.on(`release:${this.id}`, () => release())
  }
}

It is just a nice pattern to have each lock listening only for the release events it is interested in.

If we have a cluster of six nodes, each node receives the Redis event and emits the local EventEmitter event; however, only the node that acquired the lock acts on the local event. All the other nodes have no handler registered, so the EventEmitter event is simply discarded.

Just to clarify, I have removed the createContext approach which I added in a previous commit; I think we can introduce it later if needed.

The main reason I introduced it is that I didn't want to create a new instance of MyCustomLocker for every request, since I wanted to establish the subscription to Redis in there only once.

But it can be worked around with a static method, or by initialising the subscription first and then passing it down to the Locker.
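
A sketch of the static-method workaround, reusing the names from the examples above (Database, PubSub, MyCustomLock, and the assumption that pubSub.on returns a promise):

import { EventEmitter } from 'node:events'

class MyCustomLocker implements Locker {
  private static subscribed?: Promise<void>

  private constructor(
    private readonly db: Database,
    private readonly events: EventEmitter
  ) {}

  // Establish the pub/sub subscription once per process, then hand out
  // per-request locker instances that share the same EventEmitter.
  static async create(db: Database, pubSub: PubSub, events: EventEmitter) {
    MyCustomLocker.subscribed ??= pubSub.on('lock_release', (id) => {
      events.emit(`release:${id}`)
    })
    await MyCustomLocker.subscribed
    return new MyCustomLocker(db, events)
  }

  newLock(id: string) {
    return new MyCustomLock(id, this.db, this.events)
  }
}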

@Murderlon (Member) commented:

I'm happy you reverted the createContext option, as it keeps the PR leaner and I wasn't convinced by the implementation yet. Now it seems to be exactly where we want it to be API-wise. Going through it now for final checks.

@Murderlon (Member) left a comment:


👍 on the code comments, I think it will really help long-term maintenance.

If I understand correctly, we don't set a default locker, and thus acquireLock may also return undefined. Wouldn't it be better to set a default locker in the server constructor? Then we have safety by default, we don't have conditional types everywhere (and thus no lock?.unlock()), and AFAIK we can even remove the p-queue dependency from FileConfigstore, as locks should prevent this now:

/**
 * FileConfigstore writes the `Upload` JSON metadata to disk next to the uploaded file itself.
 * It uses a queue which only processes one operation at a time to prevent unsafe concurrent access.
 */
export class FileConfigstore implements Configstore {
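
Concretely, the default-locker suggestion amounts to something like this in the Server constructor (a sketch of the idea, not the merged code):

// Fall back to an in-memory locker when none is provided, so locking is
// always on by default and handlers never need lock?.unlock() conditionals.
this.options = {
  ...options,
  locker: options.locker ?? new MemoryLocker(),
}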

(A review comment on packages/server/src/handlers/BaseHandler.ts, marked outdated and resolved.)
@fenos (Collaborator Author) commented Dec 8, 2023

@Murderlon Sure, I can add the MemoryLocker as the default locker.

@fenos force-pushed the s3-store/distributed-locks branch from 68ff1a1 to 24071da (December 8, 2023 14:24)
@fenos (Collaborator Author) commented Dec 8, 2023

@Murderlon Done. I've added the MemoryLocker as the default locker on the BaseHandler, since that seemed the most logical place for it, and removed p-queue from the file-store.

@Murderlon (Member) left a comment:


One last comment and then we can merge this 🎉

(A review comment on packages/server/src/handlers/BaseHandler.ts, marked outdated and resolved.)
@fenos (Collaborator Author) commented Dec 12, 2023

@Murderlon I've now added the default option on the Server as per the latest commit 👍

@fenos force-pushed the s3-store/distributed-locks branch from 69c727d to b59f5c2 (December 12, 2023 09:21)
@Murderlon merged commit 7873d93 into tus:main on Dec 12, 2023
2 checks passed
@Murderlon (Member) commented:

Thanks for sticking with it and doing quick iterations on the many rounds of feedback 👌

@fenos deleted the s3-store/distributed-locks branch (December 13, 2023 14:42)
@netdown (Contributor) commented Apr 30, 2024

@Murderlon Is it possible that the locker mechanism is broken? I'm working on a GCS locker (will submit a PR) and noticed that the handler immediately calls unlock after locking. I switched to MemoryLocker, added some logging to the lock and unlock functions, and started a long-running upload (the client is Uppy). It seems that when the upload started, lock was called and immediately unlocked, and the same happened when the upload finished. Or am I missing something?

@Murderlon (Member) commented:

We have tests for it and it runs in production at Supabase, so if something is not working we'd need a reproducible example or a failing test.
