Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add isolated views to EventLoop, Promise, and Future #2969

Open
wants to merge 11 commits into
base: main
Choose a base branch
from
2 changes: 1 addition & 1 deletion Sources/NIOCore/AsyncChannel/AsyncChannelHandler.swift
Original file line number Diff line number Diff line change
Expand Up @@ -188,7 +188,7 @@ extension NIOAsyncChannelHandler: ChannelInboundHandler {
// We are making sure to be on our event loop so we can safely use self in whenComplete
channelReadTransformation(unwrapped)
.hop(to: context.eventLoop)
.assumeIsolated()
.assumeIsolatedUnsafeUnchecked()
.whenComplete { result in
switch result {
case .success:
Expand Down
24 changes: 13 additions & 11 deletions Sources/NIOCore/ChannelHandlers.swift
Original file line number Diff line number Diff line change
Expand Up @@ -114,7 +114,7 @@ public final class AcceptBackoffHandler: ChannelDuplexHandler, RemovableChannelH
}

private func scheduleRead(at: NIODeadline, context: ChannelHandlerContext) {
self.scheduledRead = context.eventLoop.assumeIsolated().scheduleTask(deadline: at) {
self.scheduledRead = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(deadline: at) {
self.doRead(context)
}
}
Expand Down Expand Up @@ -252,7 +252,9 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl
}

let writePromise = promise ?? context.eventLoop.makePromise()
writePromise.futureResult.assumeIsolated().whenComplete { (_: Result<Void, Error>) in
writePromise.futureResult.hop(
to: context.eventLoop
).assumeIsolatedUnsafeUnchecked().whenComplete { (_: Result<Void, Error>) in
self.lastWriteCompleteTime = .now()
}
context.write(data, promise: writePromise)
Expand All @@ -272,7 +274,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl
}

if self.reading {
self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask(
self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
in: timeout,
self.makeReadTimeoutTask(context, timeout)
)
Expand All @@ -282,15 +284,15 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl
let diff = .now() - self.lastReadTime
if diff >= timeout {
// Reader is idle - set a new timeout and trigger an event through the pipeline
self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask(
self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
in: timeout,
self.makeReadTimeoutTask(context, timeout)
)

context.fireUserInboundEventTriggered(IdleStateEvent.read)
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask(
self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
deadline: self.lastReadTime + timeout,
self.makeReadTimeoutTask(context, timeout)
)
Expand All @@ -309,15 +311,15 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl

if diff >= timeout {
// Writer is idle - set a new timeout and notify the callback.
self.scheduledWriterTask = context.eventLoop.assumeIsolated().scheduleTask(
self.scheduledWriterTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
in: timeout,
self.makeWriteTimeoutTask(context, timeout)
)

context.fireUserInboundEventTriggered(IdleStateEvent.write)
} else {
// Write occurred before the timeout - set a new timeout with shorter delay.
self.scheduledWriterTask = context.eventLoop.assumeIsolated().scheduleTask(
self.scheduledWriterTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
deadline: self.lastWriteCompleteTime + timeout,
self.makeWriteTimeoutTask(context, timeout)
)
Expand All @@ -332,7 +334,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl
}

if self.reading {
self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask(
self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
in: timeout,
self.makeAllTimeoutTask(context, timeout)
)
Expand All @@ -345,15 +347,15 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl
let diff = .now() - latestLast
if diff >= timeout {
// Reader is idle - set a new timeout and trigger an event through the pipeline
self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask(
self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
in: timeout,
self.makeAllTimeoutTask(context, timeout)
)

context.fireUserInboundEventTriggered(IdleStateEvent.all)
} else {
// Read occurred before the timeout - set a new timeout with shorter delay.
self.scheduledReaderTask = context.eventLoop.assumeIsolated().scheduleTask(
self.scheduledReaderTask = context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(
deadline: latestLast + timeout,
self.makeAllTimeoutTask(context, timeout)
)
Expand All @@ -367,7 +369,7 @@ public final class IdleStateHandler: ChannelDuplexHandler, RemovableChannelHandl
_ body: @escaping (ChannelHandlerContext, TimeAmount) -> (() -> Void)
) -> Scheduled<Void>? {
if let timeout = amount {
return context.eventLoop.assumeIsolated().scheduleTask(in: timeout, body(context, timeout))
return context.eventLoop.assumeIsolatedUnsafeUnchecked().scheduleTask(in: timeout, body(context, timeout))
}
return nil
}
Expand Down
12 changes: 6 additions & 6 deletions Sources/NIOCore/ChannelPipeline.swift
Original file line number Diff line number Diff line change
Expand Up @@ -472,10 +472,10 @@ public final class ChannelPipeline: ChannelInvoker {
let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self)

if self.eventLoop.inEventLoop {
promise.assumeIsolated().completeWith(self.contextSync(handler: handler))
promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(handler: handler))
} else {
self.eventLoop.execute {
promise.assumeIsolated().completeWith(self.contextSync(handler: handler))
promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(handler: handler))
}
}

Expand All @@ -501,10 +501,10 @@ public final class ChannelPipeline: ChannelInvoker {
let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self)

if self.eventLoop.inEventLoop {
promise.assumeIsolated().completeWith(self.contextSync(name: name))
promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(name: name))
} else {
self.eventLoop.execute {
promise.assumeIsolated().completeWith(self.contextSync(name: name))
promise.assumeIsolatedUnsafeUnchecked().completeWith(self.contextSync(name: name))
}
}

Expand Down Expand Up @@ -534,10 +534,10 @@ public final class ChannelPipeline: ChannelInvoker {
let promise = self.eventLoop.makePromise(of: ChannelHandlerContext.self)

if self.eventLoop.inEventLoop {
promise.assumeIsolated().completeWith(self._contextSync(handlerType: handlerType))
promise.assumeIsolatedUnsafeUnchecked().completeWith(self._contextSync(handlerType: handlerType))
} else {
self.eventLoop.execute {
promise.assumeIsolated().completeWith(self._contextSync(handlerType: handlerType))
promise.assumeIsolatedUnsafeUnchecked().completeWith(self._contextSync(handlerType: handlerType))
}
}

Expand Down
2 changes: 1 addition & 1 deletion Sources/NIOCore/Codec.swift
Original file line number Diff line number Diff line change
Expand Up @@ -749,7 +749,7 @@ extension ByteToMessageHandler: RemovableChannelHandler {
public func removeHandler(context: ChannelHandlerContext, removalToken: ChannelHandlerContext.RemovalToken) {
precondition(self.removalState == .notBeingRemoved)
self.removalState = .removalStarted
context.eventLoop.assumeIsolated().execute {
context.eventLoop.assumeIsolatedUnsafeUnchecked().execute {
self.processLeftovers(context: context)
assert(!self.state.isLeftoversNeedProcessing, "illegal state: \(self.state)")
switch self.removalState {
Expand Down
74 changes: 60 additions & 14 deletions Sources/NIOCore/Docs.docc/loops-futures-concurrency.md
Original file line number Diff line number Diff line change
Expand Up @@ -138,10 +138,11 @@ guaranteed to fire on the same isolation domain as the ``ChannelHandlerContext``
of data race is present. However, Swift Concurrency cannot guarantee this at compile time,
as the specific isolation domain is determined only at runtime.

In these contexts, today users can make their callbacks safe using ``NIOLoopBound`` and
``NIOLoopBoundBox``. These values can only be constructed on the event loop, and only allow
access to their values on the same event loop. These constraints are enforced at runtime,
so at compile time these are unconditionally `Sendable`.
In these contexts, users that cannot require NIO 2.77.0 can make their callbacks
safe using ``NIOLoopBound`` and ``NIOLoopBoundBox``. These values can only be
constructed on the event loop, and only allow access to their values on the same
event loop. These constraints are enforced at runtime, so at compile time these are
unconditionally `Sendable`.

> Warning: ``NIOLoopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks
with runtime ones. This makes it possible to introduce crashes in your code. Please
Expand All @@ -150,18 +151,43 @@ so at compile time these are unconditionally `Sendable`.
``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain
before using these types.

> Note: In a future NIO release we intend to improve the ergonomics of this common problem
by offering a related type that can only be created from an ``EventLoopFuture`` on a
given ``EventLoop``. This minimises the number of runtime checks, and will make it
easier and more pleasant to write this kind of code.
From NIO 2.77.0, new types were introduced to make this common problem easier. These types are
``EventLoopFuture/Isolated`` and ``EventLoopPromise/Isolated``. These isolated view types
can only be constructed from an existing Future or Promise, and they can only be constructed
on the ``EventLoop`` to which those futures or promises are bound. Because they are not
`Sendable`, we can be confident that these values never escape the isolation domain in which
they are created, which must be the same isolation domain where the callbacks execute.

As a result, these types can drop the `@Sendable` requirements on all the future closures, and
many of the `Sendable` requirements on the return types from future closures. They can also
drop the `Sendable` requirements from the promise completion functions.

These isolated views can be obtained by calling ``EventLoopFuture/assumeIsolated()`` or
``EventLoopPromise/assumeIsolated()``.

> Warning: ``EventLoopFuture/assumeIsolated()`` and ``EventLoopPromise/assumeIsolated()``
supplement compile-time isolation checks with runtime ones. This makes it possible
to introduce crashes in your code. Please ensure that you are 100% confident that the
isolation domains align. If you are not sure that the ``EventLoopFuture`` or
``EventLoopPromise`` you wish to attach a callback to is bound to your
``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain
before using these types.

> Warning: ``EventLoopFuture/assumeIsolated()`` and ``EventLoopPromise/assumeIsolated()``
**must not** be called from a Swift concurrency context, either an async method or
from within an actor. This is because it uses runtime checking of the event loop
to confirm that the value is not being sent to a different concurrency domain.

When using an ``EventLoop`` as a custom actor executor, this API can be used to retrieve
a value that region based isolation will then allow to be sent to another domain.

## Interacting with Event Loops on the Event Loop

As with Futures, there are occasionally times where it is necessary to schedule
``EventLoop`` operations on the ``EventLoop`` where your code is currently executing.

Much like with ``EventLoopFuture``, you can use ``NIOLoopBound`` and ``NIOLoopBoundBox``
to make these callbacks safe.
Much like with ``EventLoopFuture``, if you need to support NIO versions before 2.77.0
you can use ``NIOLoopBound`` and ``NIOLoopBoundBox`` to make these callbacks safe.

> Warning: ``NIOLoopBound`` and ``NIOLoopBoundBox`` replace compile-time isolation checks
with runtime ones. This makes it possible to introduce crashes in your code. Please
Expand All @@ -170,7 +196,27 @@ to make these callbacks safe.
``EventLoop``, use ``EventLoopFuture/hop(to:)`` to move it to your isolation domain
before using these types.

> Note: In a future NIO release we intend to improve the ergonomics of this common problem
by offering a related type that can only be created from an ``EventLoopFuture`` on a
given ``EventLoop``. This minimises the number of runtime checks, and will make it
easier and more pleasant to write this kind of code.
From NIO 2.77.0, a new type was introduced to make this common problem easier. This type is
``NIOIsolatedEventLoop``. This isolated view type can only be constructed from an existing
``EventLoop``, and it can only be constructed on the ``EventLoop`` that is being wrapped.
Because this type is not `Sendable`, we can be confident that this value never escapes the
isolation domain in which it was created, which must be the same isolation domain where the
callbacks execute.

As a result, this type can drop the `@Sendable` requirements on all the operation closures, and
many of the `Sendable` requirements on the return types from these closures.

This isolated view can be obtained by calling ``EventLoop/assumeIsolated()``.

> Warning: ``EventLoop/assumeIsolated()`` supplements compile-time isolation checks with
runtime ones. This makes it possible to introduce crashes in your code. Please ensure
that you are 100% confident that the isolation domains align. If you are not sure that
the your code is running on the relevant ``EventLoop``, prefer the non-isolated type.

> Warning: ``EventLoop/assumeIsolated()`` **must not** be called from a Swift concurrency
context, either an async method or from within an actor. This is because it uses runtime
checking of the event loop to confirm that the value is not being sent to a different
concurrency domain.

When using an ``EventLoop`` as a custom actor executor, this API can be used to retrieve
a value that region based isolation will then allow to be sent to another domain.
Loading
Loading