-
Notifications
You must be signed in to change notification settings - Fork 72
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Feature request: Stream select #577
Comments
Can you implement that in terms of |
That's clever! I hadn't thought of this at all, but it obviously makes the implementation trivial. I will try to prepare a PR to see if it is met with interest. Unfortunately the use case of |
Something like this needs to be atomic, which The easy solution is to use kcas queues, which can be composed easily. However, it would also be nice to have direct support in Eio.Stream too. The general idea is that you create an Atomic to hold the result and then CAS that to the value. There are two implementations of Stream, depending on whether it is zero-capacity or not. For the zero-capacity case (sync.ml) there is already support for rejecting a value (causing the producer to retry with the next client). For the non-zero-capacity case, you have a lock and can just decide not to pop the value, I think. |
Thank you for the quick reply! I've been looking around a bit, mostly out of interest, and some questions came up. Keep in mind that it's likely I'm just missing your point here.
I've interpreted this to mean that, in order to keep within the framework that there is, we'd still have one fiber for each stream, but an additional atomic to make sure exactly one item is returned and no item is lost. For now, I'm only focused on the (* WARNING: this is NOT real code, it WILL lead to deadlocks *)
let select streams =
let result = Atomic.make None in
(* compare_and_set works for options, I guess? At least as long as we're comparing None to None. *)
let place_result r = Atomic.compare_and_set result None r in
let wait_for stream () = begin
let item = take stream in
if place_result item
then () (* This channel was the first to receive an item. Return. *)
(* This channel was not first. Place item back into stream. *)
(* This will block this fiber indefinitely f the stream
has been filled up in the meantime and there are no other readers! *)
else add stream item
end in
let spawn_fibers sw = Fiber.any (List.map (fun stream -> fun () -> Fiber.fork ~sw (wait_for stream)) streams) in
Switch.run spawn_fibers;
Atomic.get result
It appears that this is tricky to implement. One way to extend the code above is to implement a At that point, the item is already in our hands, and we have to do something with it. Assuming another stream has already yielded an item and called I've skipped the |
No, there shouldn't be a need for any extra fibers. The taking fiber suspends itself and then registers with each stream being watched. When woken, it tries to store the result but if it fails, it rejects the value instead.
Yes, I think this needs to be changed. Probably |
BTW, one reason I haven't tried this already is that we might want to replace the use of Waiters with Cells here first anyway. |
Thank you very much for your work on eio! I've thoroughly enjoyed working with it, despite being a newcomer to OCaml. Eio feels more advanced than the async frameworks in other languages I've used.
So far, I have been missing one feature: Waiting on multiple streams, which in languages like Go or Rust is called select. I've taken a look at the source code, but as a newbie I didn't want to charge ahead with some untenable solution (and besides, any possible implementation appeared non-trivial to me).
I don't have a specific API in mind, but something simple like
to select between different channels producing items of the same type and permitting per-stream handling, or
for selecting between exactly two streams may already prove to be useful. I'd love to hear what you think.
The text was updated successfully, but these errors were encountered: