Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Unblock queues on shutdown or errors #10

Open
wants to merge 14 commits into
base: master
Choose a base branch
from

Conversation

deavmi
Copy link
Owner

@deavmi deavmi commented Nov 26, 2023

Purpose ✍️

We want to unblock all calls to dequeue() in two scenarios:

  1. If the underlying Stream (river stream) has an error (remote host disconnects)
  2. If the stop() method is called in the Manager

Todo ✅

  • On stop()
    • Implementation
    • Unittest
  • On Stream read failure (would be BClient actually)
    • Implementation
    • Unittest

Problems

  • User-initiated stop() can trigger WATCHER_FAILED as the BClient unblocks quicker to then call stop_FailedWatcher() triggering to save said exit reason, and then dequeue() wakes up before we race to set the correct exit reason for it to see instead of the old one
  • Shutdown default queue as well
  • Resources
    • After shutdown release all queues

- Added a TODO for `stop()`
@deavmi deavmi self-assigned this Nov 26, 2023
@deavmi deavmi added bug Something isn't working enhancement New feature or request labels Nov 26, 2023
- Added TODO where we need to handling the unblocking of all `dequeue()` calls
- Added enum member `UNSET`
- Added an `exitReason` and an `alive` (set to `true` on construction)
- Calling `shutdownQueue(ErrorType)` will set the exit reason, will also set the aliveness to `false` and wake up ALL `dequeue()`'s blocking
- `dequeue()` first check in wakeup routine duty cycle is to check if we are alive
- Added new member `MANAGER_SHUTDOWN`
- Implemented `shutdownAllQueues()`
- Calling `stop()` now shuts down all queues
@deavmi
Copy link
Owner Author

deavmi commented Nov 26, 2023

Purpose ✍️

We want to unblock all calls to dequeue() in two scenarios:

  1. If the underlying Stream (river stream) has an error (remote host disconnects)
  2. If the stop() method is called in the Manager

Todo ✅

  • On stop()

    • Implementation
    • Unittest
  • On Stream read failure (would be BClient actually)

    • Implementation
    • Unittest

Working on this now, I should be able to finish this before the next weekend.

@deavmi
Copy link
Owner Author

deavmi commented Nov 26, 2023

I think I have the unblcoking of dequeue() when the manager is stop()'d working

- Hoisted common imports into a `version(unittest)`

Watcher (unittests)

- Added a unittest for testing `stop()` to unblock `dequeue()`s
- Don't call `stop()` on the `Manager` when exiting the loop
- Added new member `WATCHER_FAILED`
- Updated documentation for `stop()`
- Added `stop(ErrorType)` which will let you set the error type used for unblocking the `dequeue()` calls
- Added `stop_FailedWatcher()` which sets the `ErrorType` to `WATCHER_FAILED`
- `shutdownAllQueues()` now takes in a `ErrorType`
- When we exit the read loop (upon an error) call `stop_FailedWatcher()` on the associated `Manager`

Watcher (unittests)

- Added a unit test for the above test case
@deavmi
Copy link
Owner Author

deavmi commented Nov 26, 2023

Also got this working for a failing remote end point now

- Added a note about something we ought to fix
- Added a TODO
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
bug Something isn't working enhancement New feature or request
Projects
None yet
Development

Successfully merging this pull request may close these issues.

1 participant