`Context` semantics for cross-actor-task cancellation and overruns #357
Open
goodboy wants to merge 29 commits into master from ctx_cancel_semantics_and_overruns
Conversation
goodboy added the enhancement (New feature or request), messaging (messaging patterns and protocols), streaming, and cancellation (SC teardown semantics and anti-zombie semantics) labels on Apr 14, 2023
goodboy force-pushed the ctx_cancel_semantics_and_overruns branch 2 times, most recently from a8eb962 to a4ecc2a on May 15, 2023 13:19
These will verify new changes to the runtime/messaging core which allow us to adopt an "ignore cancel if requested by us" style of handling `ContextCancelled`, more like how `trio` does with `trio.Nursery.cancel_scope.cancel()`. We now expect a `ContextCancelled.canceller: tuple` which is set to the uid of the actor which requested the cancellation that eventually resulted in the remote error-msg. Also adds some experimental tweaks to the "backpressure" test which, it turns out, is very problematic in coordination with context cancellation, since blocking on the feed mem chan to some task will block the ipc msg loop and thus handling of cancellation. More to come in both the test and core to address this, since right now this test is failing.
To handle remote cancellation this adds `ContextCancelled.canceller: tuple`, the uid of the cancel-requesting actor, which is expected to be set by the runtime when servicing any remote cancel request. This makes it possible for `ContextCancelled` receivers to know whether "their actor runtime" is the source of the cancellation. Also add an explicit `RemoteActorError.src_actor_uid` which better formalizes the notion of "which remote actor" the error originated from. Both of these new attrs are expected to be packed in the `.msgdata` when the errors are loaded locally.
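As a small illustration of what that check enables (a sketch, not code from this PR; only the `.canceller: tuple` attr and `current_actor().uid` come from the text above, the helper itself is hypothetical):

```python
import tractor


def cancel_was_ours(err: tractor.ContextCancelled) -> bool:
    '''
    Sketch: return `True` when the cancellation was requested by *this*
    actor's runtime (and so can be silently ignored), `False` when it
    came from some other actor and should be (re)raised/handled.

    Assumes the `.canceller: tuple` attr described in the commit
    message above.
    '''
    return err.canceller == tractor.current_actor().uid
```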
Turns out stuff was totally broken in these cases because we're either closing the underlying mem chan too early or not handling the "allow_overruns" mode's cancellation correctly.
This adds remote cancellation semantics to our `tractor.Context` machinery to more closely match that of `trio.CancelScope`, but with operational differences to handle the nature of parallel tasks interoperating across multiple memory boundaries:

- if an actor task cancels some context it has opened via `Context.cancel()`, the remote (scope linked) task will be cancelled using the normal `CancelScope` semantics of `trio`, meaning the remote cancel scope surrounding the far side task is cancelled and `trio.Cancelled`s are expected to be raised in that scope as per normal `trio` operation; in the case where no error is raised in that remote scope, a `ContextCancelled` error is raised inside the runtime machinery and relayed back to the opener/caller side of the context.
- if any actor task cancels a full remote actor runtime using `Portal.cancel_actor()` the same semantics as above apply, except every other remote actor task which also has an open context with the actor which was cancelled will also be sent a `ContextCancelled` **but** with the `.canceller` field set to the uid of the original cancel-requesting actor.

This changeset also includes a more "proper" solution to the issue of "allowing overruns" during streaming without attempting to implement any form of IPC streaming backpressure. Implementing task-granularity backpressure cross-process turns out to be more or less impossible without augmenting our streaming protocol (likely at the cost of performance). Further, allowing overruns requires special care since any blocking of the runtime RPC msg loop task can effectively block control msgs such as cancels and stream terminations.

The implementation details per abstraction layer are as follows.

._streaming.Context:
- add a new constructor factory func `mk_context()` which provides a strictly private init-er whilst allowing us to not have to define an `.__init__()` on the type def.
- add public `.cancel_called` and `.cancel_called_remote` properties.
- general rename of what was the internal `._backpressure` var to `._allow_overruns: bool`.
- move the old contents of `Actor._push_result()` into a new `._deliver_msg()` allowing for better encapsulation of per-ctx msg handling.
- always check for received 'error' msgs and process them with the new `_maybe_cancel_and_set_remote_error()` **before** any msg delivery to the local task, thus guaranteeing error and cancellation handling despite any overflow handling.
- add a new `._drain_overflows()` task-method for use with the new `._allow_overruns: bool = True` mode (a generic version of this pattern is sketched after this commit message).
- add back a `._scope_nursery: trio.Nursery` (allocated in `Portal.open_context()`) whose sole purpose is to spawn a single task which runs the above method; anything else is an error.
- augment `._deliver_msg()` to start a task and run the above method when operating in the (new) overruns-allowed mode; the task queues overflow msgs and attempts to send them to the underlying mem chan using a blocking `.send()` call.
- on context exit, any existing "drainer task" will be cancelled and remaining overflow queued msgs are discarded with a warning.
- rename `._error` -> `._remote_error` and set it in a new method `_maybe_cancel_and_set_remote_error()` which is called before processing.
- adjust `.result()` to always call `._maybe_raise_remote_err()` at its start such that whenever a `ContextCancelled` arrives we do logic for whether or not to immediately raise that error or ignore it due to the current actor being the one who requested the cancel, by checking the error's `.canceller` field.
- set the default value of `._result` to be `id(Context())` thus avoiding conflict with any `.result()` actually being `False`.

._runtime.Actor:
- augment `.cancel()`, `._cancel_task()` and `.cancel_rpc_tasks()` to take a `requesting_uid: tuple` indicating the source actor of every cancellation request.
- pass the new `Context._allow_overruns` through `.get_context()`.
- call the new `Context._deliver_msg()` from `._push_result()` (since factoring out that method's contents).

._runtime._invoke:
- `TaskStatus.started()` now hands back a `Context` (unless an error is raised) instead of the cancel scope, to make it easy to set/get state on that context for the purposes of cancellation and remote error relay.
- always raise any remote error via `Context._maybe_raise_remote_err()` before doing any `ContextCancelled` logic.
- assign any `Context._cancel_called_remote` set by the `requesting_uid` cancel methods (mentioned above) to the `ContextCancelled.canceller`.

._runtime.process_messages:
- always pass a `requesting_uid: tuple` to `Actor.cancel()` and `._cancel_task()` so that any corresponding `ContextCancelled.canceller` can be set inside `._invoke()`.
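The overflow-queue plus drainer-task mechanics described in the `._streaming.Context` bullets above boil down to the pattern sketched below. Everything here, class and method names included, is a standalone illustration under the commit message's description, not the PR's actual code:

```python
# Standalone sketch (all names hypothetical) of the "queue overflow msgs
# and drain them in a side task" pattern described above: the fast path
# is a non-blocking `send_nowait()`, and only a single drainer task ever
# performs blocking `.send()` calls, so the loop delivering msgs off the
# wire is never blocked on a slow receiver.
from collections import deque

import trio


class OverflowDrainer:
    def __init__(self, send_chan: trio.MemorySendChannel):
        self._send_chan = send_chan
        self._overflow_q: deque = deque()
        self._in_overrun: bool = False

    def deliver(self, msg, nursery: trio.Nursery) -> None:
        try:
            # fast path: the receiver is keeping up
            self._send_chan.send_nowait(msg)
        except trio.WouldBlock:
            # overrun: queue the msg and (maybe) spawn the drainer
            self._overflow_q.append(msg)
            if not self._in_overrun:
                self._in_overrun = True
                nursery.start_soon(self._drain_overflows)

    async def _drain_overflows(self) -> None:
        # blocking sends happen here, not in the caller's msg loop
        while self._overflow_q:
            await self._send_chan.send(self._overflow_q.popleft())
        self._in_overrun = False
```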
This actually caught further runtime bugs so it's good I tried. Add overrun-ignore enabled / disabled cases and error catching for all of them. More or less this should cover every possible outcome when it comes to setting `allow_overruns: bool`, I hope XD
Because obviously we probably want to support `allow_overruns` on the remote callee side as well XD Only found the bugs fixed in this patch thanks to writing a much more exhaustive test set for overrun cases B)
Turns out you can get a case where you might be opening multiple ctx-streams concurrently and, during the context opening phase, you block for all contexts to open, but then when you eventually start opening streams some slow-to-start context has caused the others to go into an overrun state, so we need to let the caller control whether that's an error ;) This also needs a test!
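A hedged sketch of what one of the overrun-ignore enabled / disabled cases mentioned in these commit messages might look like; the `allow_overruns` kwarg placement (on `Context.open_stream()`) follows the PR description further below, but it and the expected error type are assumptions here, as are all names:

```python
# Hypothetical pytest sketch of the overruns allowed vs. disallowed cases.
import pytest
import trio
import tractor


@tractor.context
async def fast_sender(ctx: tractor.Context) -> None:
    # callee pushes many msgs without ever waiting on the receiver
    await ctx.started()
    async with ctx.open_stream() as stream:
        for i in range(2**10):
            await stream.send(i)


@pytest.mark.parametrize('allow_overruns', [True, False])
def test_sender_overruns_receiver(allow_overruns: bool):

    async def main():
        async with tractor.open_nursery() as an:
            portal = await an.start_actor(
                'sender',
                enable_modules=[__name__],
            )
            async with (
                portal.open_context(fast_sender) as (ctx, _),
                ctx.open_stream(allow_overruns=allow_overruns) as stream,
            ):
                # deliberately read only one msg so the sender runs far
                # ahead of this (slow) receiver.
                async for msg in stream:
                    break

            await portal.cancel_actor()

    if allow_overruns:
        # overflowed msgs should be queued and drained, not errored on
        trio.run(main)
    else:
        # one side should see the relayed `StreamOverrun`; assume it
        # surfaces out of `main()` as a remote error (the exact type
        # here is a guess, not confirmed by the PR text).
        with pytest.raises(tractor.RemoteActorError):
            trio.run(main)
```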
goodboy force-pushed the ctx_cancel_semantics_and_overruns branch from a4ecc2a to ead9e41 on May 15, 2023 14:00
goodboy added a commit to pikers/piker that referenced this pull request on May 17, 2023
Requires goodboy/tractor#357. Avoid overruns when doing concurrent live feed init over multiple brokers.
- `Context._cancel_called_remote` -> `._cancelled_remote` since "called" implies the cancellation was "requested" when it could be due to another error; the actor uid is the value, only set once the far end task scope is terminated due to either error or cancel, which has nothing to do with *what* caused the cancellation.
- `Actor._cancel_called_remote` -> `._cancel_called_by_remote` which emphasizes that this variable is **only set** IFF some remote actor **requested that** this actor's runtime be cancelled via `Actor.cancel()`.
goodboy added a commit that referenced this pull request on Oct 18, 2023
Tests that the appropriate `Context` exit state, the relay of a `ContextCancelled` error, and its `.canceller: tuple[str, str]` value are set when an inter-peer cancellation happens via an "out of band" request method (in this case using `Portal.cancel_actor()`) and that cancellation is propagated "horizontally" to other peers. Verify that any such cancellation scenario which also experiences an "error during `ContextCancelled` handling" DOES NOT result in that further error being suppressed and that the user's exception bubbles out of the `Context.open_context()` block(s) appropriately! Likely more tests to come as well as some factoring of the teardown state checks where possible. Pertains to seriously testing the major work landing in #357
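A hedged sketch of the peer-side handling this test description implies; only the `.canceller` semantics and the "don't suppress errors raised during `ContextCancelled` handling" requirement come from the PR, the function and its arguments are hypothetical:

```python
import tractor


async def open_ctx_with_peer(portal, peer_fn) -> None:
    try:
        async with portal.open_context(peer_fn) as (ctx, first):
            await ctx.result()
    except tractor.ContextCancelled as ctxc:
        # per the new semantics a cancel *we* requested is absorbed
        # inside the block, so reaching this handler means some other
        # actor (eg. via `Portal.cancel_actor()`) cancelled our peer.
        assert ctxc.canceller != tractor.current_actor().uid
        # handle (or re-raise) the unanticipated cancellation; any
        # error raised while doing so should bubble out of the block,
        # not be suppressed, per the test description above.
        raise
```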
Finally!

This is a rigorous rework of our `Context` and `ContextCancelled` semantics to be more like `trio.CancelScope` for cross-actor linked tasks!
High level mechanics:

- when calling `Actor.cancel()` or the internal `Actor._cancel_task()` (which is called by `Context.cancel()`), that requesting actor's uid is set on the target far-end `Context._cancel_called_remote` and on any eventually raised `ContextCancelled.canceller: tuple`.
- this lets receivers of the `ContextCancelled` error-msg check if they were the cancellation "requester", in which case the error is not raised locally and instead the cancellation is "caught" (and silently ignored inside the `Portal.open_context()` block, and possibly the `@tractor.context` callee func) just like normal `trio` cancel scopes: if you call `cs.cancel()` inside some scope or nursery block (a usage sketch follows this list).
- if `ContextCancelled.canceller != current_actor().uid`, then the error is raised, thus making it explicit that the cancellation was not anticipated by the cancel-receiving (actor) task and thus can be subsequently handled as an (unanticipated) remote cancellation.
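A rough usage sketch of the first two points; the absorbed-cancel behaviour is as described above, while `sleeper` and the actor name are made up for illustration:

```python
import trio
import tractor


@tractor.context
async def sleeper(ctx: tractor.Context) -> None:
    await ctx.started()
    await trio.sleep_forever()


async def main() -> None:
    async with tractor.open_nursery() as an:
        portal = await an.start_actor(
            'sleeper_actor',
            enable_modules=[__name__],
        )
        async with portal.open_context(sleeper) as (ctx, first):
            # we request the cancel, so the relayed `ContextCancelled`
            # (whose `.canceller` is our uid) is absorbed silently and
            # this block exits cleanly instead of raising.
            await ctx.cancel()

        await portal.cancel_actor()


if __name__ == '__main__':
    trio.run(main)
```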
Also included is new support and public API for allowing contexts (and their streams) to "allow overruns" (since backpressure isn't really possible without a msg/transport protocol extension): enabling the case where some sender is pushing msgs faster than the other side is receiving them and no error is received on the sender side.

Normally (and by default) the rx side will receive (via `RemoteActorError` msg) and subsequently raise a `StreamOverrun` that's been relayed from the sender. The receiver then can decide how to handle the condition (a handling sketch follows the list below); previously any sent msg delivered that caused an overrun would be discarded.

Instead we now offer an `allow_overruns: bool` to the `Context.open_stream()` API which adds a new mode with the following functionality:

- a task in the `Context._scope_nursery: trio.Nursery` runs `Context._drain_overflows()` (not convinced of naming yet btw 😂): `await self._send_chan.send(msg)` for every queued up overrun-condition msg (stored in the new `Context._overflow_q: deque`) in a blocking fashion but without blocking the RPC msg loop.
- if `._in_overrun: bool` is already set we presume the task is already running, and instead new overflow msgs are queued and a warning log msg emitted.
- on `Context` exit any existing drainer task is cancelled and lingering msgs discarded but reported in a warning log msg.
Details of implementation:

- move all `Context` related code to a new `._context` mod.
- new tests for the cancellation and overruns (ignoring) cases.
- new `ContextCancelled.canceller: tuple[str, str]`.
- new `Context.cancel_called`, `.cancelled_caught` and `.cancel_called_remote` properties.
- new `Context._deliver_msg()` as the factored content from what was `Actor._push_result()`, which now calls the former.
- new `mk_context() -> Context` factory (pattern sketched below).
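The `mk_context()` bullet refers to a factory-instead-of-public-`__init__` pattern; a generic sketch of that pattern (placeholder fields and kwargs, not the real `Context` attrs or signature) is:

```python
from collections import deque
from dataclasses import dataclass, field


@dataclass
class Context:
    chan_uid: tuple[str, str]
    _allow_overruns: bool = False
    _overflow_q: deque = field(default_factory=deque)
    # ... many more runtime-private fields on the real type ...


def mk_context(
    chan_uid: tuple[str, str],
    allow_overruns: bool = False,
) -> Context:
    '''
    Strictly-private constructor: runtime code calls this instead of
    instantiating `Context` directly, keeping init logic in one place
    without hand-writing an `.__init__()` on the type def.
    '''
    return Context(
        chan_uid=chan_uid,
        _allow_overruns=allow_overruns,
    )
```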
Possibly still todo:

- is `no_overruns` or `ignore_overruns` or something else a better var name?
- `StreamOverrun` msg content?
- should `Context.result()` return `None` if the default is never set, more like a function that has no return?
- remove the `._debug.breakpoint()` comments from `._context.py`?
- the `cs.shield = True` that was inside `Context.cancel()`?
- the `Context._send_chan.aclose()` calls (currently commented out)?
- drop the `@dataclass` for `Context` and instead go `msgspec.Struct`?
- relay the error traceback to cancelled context tasks which were not the cause of their own cancellation? i.e. the `ContextCancelled.canceller != current_actor().uid` case, where it might be handy to know why, for eg., some parent actor cancelled some other child that you were attached to, instead of just seeing that it was cancelled by someone else?
- `.canceller` set?
- `piker` err to `Context._maybe_raise_remote_err()`