Skip to content

Commit

Permalink
eio_linux: allow alloc_fixed_or_wait to be cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
talex5 committed Sep 5, 2024
1 parent 0f6b65d commit 93e1c00
Show file tree
Hide file tree
Showing 7 changed files with 104 additions and 34 deletions.
2 changes: 1 addition & 1 deletion lib_eio/core/fiber.ml
Original file line number Diff line number Diff line change
Expand Up @@ -226,7 +226,7 @@ module List = struct

let release t =
t.free_fibers <- t.free_fibers + 1;
if t.free_fibers = 1 then Single_waiter.wake t.cond (Ok ())
if t.free_fibers = 1 then Single_waiter.wake_if_sleeping t.cond

let use t fn x =
await_free t;
Expand Down
31 changes: 19 additions & 12 deletions lib_eio/core/single_waiter.ml
Original file line number Diff line number Diff line change
@@ -1,25 +1,32 @@
(* Allows a single fiber to wait to be notified by another fiber in the same domain.
If multiple fibers need to wait at once, or the notification comes from another domain,
this can't be used. *)
type 'a state =
| Running
| Sleeping of (('a, exn) result -> unit)

type 'a t = {
mutable wake : ('a, exn) result -> unit;
}
type 'a t = 'a state ref

let create () = { wake = ignore }
let create () = ref Running

let wake t v = t.wake v
let wake t v =
match !t with
| Running -> false
| Sleeping fn ->
t := Running;
fn v;
true

let wake_if_sleeping t =
ignore (wake t (Ok ()) : bool)

let await t op id =
let x =
Suspend.enter op @@ fun ctx enqueue ->
Cancel.Fiber_context.set_cancel_fn ctx (fun ex ->
t.wake <- ignore;
t := Running;
enqueue (Error ex)
);
t.wake <- (fun x ->
t := Sleeping (fun x ->
Cancel.Fiber_context.clear_cancel_fn ctx;
t.wake <- ignore;
t := Running;
enqueue x
)
in
Expand All @@ -29,7 +36,7 @@ let await t op id =
let await_protect t op id =
let x =
Suspend.enter_unchecked op @@ fun _ctx enqueue ->
t.wake <- (fun x -> t.wake <- ignore; enqueue x)
t := Sleeping (fun x -> t := Running; enqueue x)
in
Trace.get id;
x
25 changes: 25 additions & 0 deletions lib_eio/core/single_waiter.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
(** Allows a single fiber to wait to be notified by another fiber in the same domain.
If multiple fibers need to wait at once, or the notification comes from another domain,
this can't be used. *)

type 'a t
(** A handle representing a fiber that might be sleeping.
It is either in the Running or Sleeping state. *)

val create : unit -> 'a t
(** [create ()] is a new waiter, initialling in the Running state. *)

val wake : 'a t -> ('a, exn) result -> bool
(** [wake t v] resumes [t]'s fiber with value [v] and returns [true] if it was sleeping.
If [t] is Running then this just returns [false]. *)

val wake_if_sleeping : unit t -> unit
(** [wake_if_sleeping] is [ignore (wake t (Ok ()))]. *)

val await : 'a t -> string -> Trace.id -> 'a
(** [await t op id] suspends the calling fiber, changing [t]'s state to Sleeping.
If the fiber is cancelled, a cancel exception is raised.
[op] and [id] are used for tracing. *)

val await_protect : 'a t -> string -> Trace.id -> 'a
(** [await_protect] is like {!await}, but the sleep cannot be cancelled. *)
2 changes: 1 addition & 1 deletion lib_eio/core/switch.ml
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ let dec_fibers t =
if t.daemon_fibers > 0 && t.fibers = t.daemon_fibers then
Cancel.cancel t.cancel Exit;
if t.fibers = 0 then
Single_waiter.wake t.waiter (Ok ())
Single_waiter.wake_if_sleeping t.waiter

let with_op t fn =
inc_fibers t;
Expand Down
19 changes: 12 additions & 7 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -226,15 +226,20 @@ let alloc_fixed_or_wait () =
| exception Uring.Region.No_space ->
let id = Eio.Private.Trace.mint_id () in
let trigger = Eio.Private.Single_waiter.create () in
Queue.push trigger s.mem_q;
(* todo: remove protect; but needs to remove from queue on cancel *)
Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id

let free_fixed buf =
let node = Lwt_dllist.add_r trigger s.mem_q in
try
Eio.Private.Single_waiter.await trigger "alloc_fixed_or_wait" id
with ex ->
Lwt_dllist.remove node;
raise ex

let rec free_fixed buf =
let s = Sched.get () in
match Queue.take_opt s.mem_q with
match Lwt_dllist.take_opt_l s.mem_q with
| None -> Uring.Region.free buf
| Some k -> Eio.Private.Single_waiter.wake k (Ok buf)
| Some k ->
if not (Eio.Private.Single_waiter.wake k (Ok buf)) then
free_fixed buf (* [k] was already cancelled, but not yet removed from the queue *)

let splice src ~dst ~len =
Fd.use_exn "splice-src" src @@ fun src ->
Expand Down
6 changes: 3 additions & 3 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type t = {
uring: io_job Uring.t;
mem: Uring.Region.t option;
io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Lwt_dllist.t;

(* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
run_q : runnable Lf_queue.t;
Expand Down Expand Up @@ -247,7 +247,7 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
) else if timeout = None && Uring.active_ops uring = 0 then (
(* Nothing further can happen at this point.
If there are no events in progress but also still no memory available, something has gone wrong! *)
assert (Queue.length mem_q = 0);
assert (Lwt_dllist.length mem_q = 0);
Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *)
`Exit_scheduler
) else (
Expand Down Expand Up @@ -536,7 +536,7 @@ let with_sched ?(fallback=no_fallback) config fn =
Lf_queue.push run_q IO;
let sleep_q = Zzz.create () in
let io_q = Queue.create () in
let mem_q = Queue.create () in
let mem_q = Lwt_dllist.create () in
with_eventfd @@ fun eventfd ->
let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in
fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool }
Expand Down
53 changes: 43 additions & 10 deletions lib_eio_linux/tests/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,52 @@ let test_signal_race () =
(fun () -> Eio.Condition.await_no_mutex cond)
(fun () -> ignore (Unix.setitimer ITIMER_REAL { it_interval = 0.; it_value = 0.001 } : Unix.interval_timer_status))

let test_alloc_fixed_or_wait () =
Eio_linux.run ~n_blocks:1 @@ fun _env ->
let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
(* We have to wait for the block, but get cancelled while waiting. *)
begin
try
Fiber.both
(fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ()))
(fun () -> raise Exit);
with Exit -> ()
end;
(* We have to wait for the block, and get it when the old one is freed. *)
Fiber.both
(fun () ->
let x = Eio_linux.Low_level.alloc_fixed_or_wait () in
Eio_linux.Low_level.free_fixed x
)
(fun () ->
Eio_linux.Low_level.free_fixed block
);
(* We old block is passed to the waiting fiber, but it's cancelled. *)
let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
Fiber.both
(fun () ->
Fiber.first
(fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ()); assert false)
(fun () -> ())
)
(fun () -> Eio_linux.Low_level.free_fixed block);
let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
Eio_linux.Low_level.free_fixed block

let () =
let open Alcotest in
run "eio_linux" [
"io", [
test_case "copy" `Quick test_copy;
test_case "direct_copy" `Quick test_direct_copy;
test_case "poll_add" `Quick test_poll_add;
test_case "poll_add_busy" `Quick test_poll_add_busy;
test_case "iovec" `Quick test_iovec;
test_case "no_sqe" `Quick test_no_sqe;
test_case "read_exact" `Quick test_read_exact;
test_case "expose_backend" `Quick test_expose_backend;
test_case "statx" `Quick test_statx;
test_case "signal_race" `Quick test_signal_race;
test_case "copy" `Quick test_copy;
test_case "direct_copy" `Quick test_direct_copy;
test_case "poll_add" `Quick test_poll_add;
test_case "poll_add_busy" `Quick test_poll_add_busy;
test_case "iovec" `Quick test_iovec;
test_case "no_sqe" `Quick test_no_sqe;
test_case "read_exact" `Quick test_read_exact;
test_case "expose_backend" `Quick test_expose_backend;
test_case "statx" `Quick test_statx;
test_case "signal_race" `Quick test_signal_race;
test_case "alloc-fixed-or-wait" `Quick test_alloc_fixed_or_wait;
];
]

0 comments on commit 93e1c00

Please sign in to comment.