From 93e1c000e39ea0cfbc14e53a47c4a715736a8579 Mon Sep 17 00:00:00 2001 From: Thomas Leonard Date: Wed, 4 Sep 2024 14:18:40 +0100 Subject: [PATCH] eio_linux: allow alloc_fixed_or_wait to be cancelled --- lib_eio/core/fiber.ml | 2 +- lib_eio/core/single_waiter.ml | 31 ++++++++++++-------- lib_eio/core/single_waiter.mli | 25 ++++++++++++++++ lib_eio/core/switch.ml | 2 +- lib_eio_linux/low_level.ml | 19 +++++++----- lib_eio_linux/sched.ml | 6 ++-- lib_eio_linux/tests/test.ml | 53 +++++++++++++++++++++++++++------- 7 files changed, 104 insertions(+), 34 deletions(-) create mode 100644 lib_eio/core/single_waiter.mli diff --git a/lib_eio/core/fiber.ml b/lib_eio/core/fiber.ml index a06f8f357..3113e8ccc 100644 --- a/lib_eio/core/fiber.ml +++ b/lib_eio/core/fiber.ml @@ -226,7 +226,7 @@ module List = struct let release t = t.free_fibers <- t.free_fibers + 1; - if t.free_fibers = 1 then Single_waiter.wake t.cond (Ok ()) + if t.free_fibers = 1 then Single_waiter.wake_if_sleeping t.cond let use t fn x = await_free t; diff --git a/lib_eio/core/single_waiter.ml b/lib_eio/core/single_waiter.ml index 6cc823eb6..dfd72d887 100644 --- a/lib_eio/core/single_waiter.ml +++ b/lib_eio/core/single_waiter.ml @@ -1,25 +1,32 @@ -(* Allows a single fiber to wait to be notified by another fiber in the same domain. - If multiple fibers need to wait at once, or the notification comes from another domain, - this can't be used. 
*) +type 'a state = + | Running + | Sleeping of (('a, exn) result -> unit) -type 'a t = { - mutable wake : ('a, exn) result -> unit; -} +type 'a t = 'a state ref -let create () = { wake = ignore } +let create () = ref Running -let wake t v = t.wake v +let wake t v = + match !t with + | Running -> false + | Sleeping fn -> + t := Running; + fn v; + true + +let wake_if_sleeping t = + ignore (wake t (Ok ()) : bool) let await t op id = let x = Suspend.enter op @@ fun ctx enqueue -> Cancel.Fiber_context.set_cancel_fn ctx (fun ex -> - t.wake <- ignore; + t := Running; enqueue (Error ex) ); - t.wake <- (fun x -> + t := Sleeping (fun x -> Cancel.Fiber_context.clear_cancel_fn ctx; - t.wake <- ignore; + t := Running; enqueue x ) in @@ -29,7 +36,7 @@ let await t op id = let await_protect t op id = let x = Suspend.enter_unchecked op @@ fun _ctx enqueue -> - t.wake <- (fun x -> t.wake <- ignore; enqueue x) + t := Sleeping (fun x -> t := Running; enqueue x) in Trace.get id; x diff --git a/lib_eio/core/single_waiter.mli b/lib_eio/core/single_waiter.mli new file mode 100644 index 000000000..6e0047eb8 --- /dev/null +++ b/lib_eio/core/single_waiter.mli @@ -0,0 +1,25 @@ +(** Allows a single fiber to wait to be notified by another fiber in the same domain. + If multiple fibers need to wait at once, or the notification comes from another domain, + this can't be used. *) + +type 'a t +(** A handle representing a fiber that might be sleeping. + It is either in the Running or Sleeping state. *) + +val create : unit -> 'a t +(** [create ()] is a new waiter, initially in the Running state. *) + +val wake : 'a t -> ('a, exn) result -> bool +(** [wake t v] resumes [t]'s fiber with value [v] and returns [true] if it was sleeping. + If [t] is Running then this just returns [false]. *) + +val wake_if_sleeping : unit t -> unit +(** [wake_if_sleeping] is [ignore (wake t (Ok ()))]. 
*) + +val await : 'a t -> string -> Trace.id -> 'a +(** [await t op id] suspends the calling fiber, changing [t]'s state to Sleeping. + If the fiber is cancelled, a cancel exception is raised. + [op] and [id] are used for tracing. *) + +val await_protect : 'a t -> string -> Trace.id -> 'a +(** [await_protect] is like {!await}, but the sleep cannot be cancelled. *) diff --git a/lib_eio/core/switch.ml b/lib_eio/core/switch.ml index 4cf13b129..f9bde2a15 100644 --- a/lib_eio/core/switch.ml +++ b/lib_eio/core/switch.ml @@ -72,7 +72,7 @@ let dec_fibers t = if t.daemon_fibers > 0 && t.fibers = t.daemon_fibers then Cancel.cancel t.cancel Exit; if t.fibers = 0 then - Single_waiter.wake t.waiter (Ok ()) + Single_waiter.wake_if_sleeping t.waiter let with_op t fn = inc_fibers t; diff --git a/lib_eio_linux/low_level.ml b/lib_eio_linux/low_level.ml index 6967fb80a..42e72bfa3 100644 --- a/lib_eio_linux/low_level.ml +++ b/lib_eio_linux/low_level.ml @@ -226,15 +226,20 @@ let alloc_fixed_or_wait () = | exception Uring.Region.No_space -> let id = Eio.Private.Trace.mint_id () in let trigger = Eio.Private.Single_waiter.create () in - Queue.push trigger s.mem_q; - (* todo: remove protect; but needs to remove from queue on cancel *) - Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id - -let free_fixed buf = + let node = Lwt_dllist.add_r trigger s.mem_q in + try + Eio.Private.Single_waiter.await trigger "alloc_fixed_or_wait" id + with ex -> + Lwt_dllist.remove node; + raise ex + +let rec free_fixed buf = let s = Sched.get () in - match Queue.take_opt s.mem_q with + match Lwt_dllist.take_opt_l s.mem_q with | None -> Uring.Region.free buf - | Some k -> Eio.Private.Single_waiter.wake k (Ok buf) + | Some k -> + if not (Eio.Private.Single_waiter.wake k (Ok buf)) then + free_fixed buf (* [k] was already cancelled, but not yet removed from the queue *) let splice src ~dst ~len = Fd.use_exn "splice-src" src @@ fun src -> diff --git a/lib_eio_linux/sched.ml 
b/lib_eio_linux/sched.ml index 7d9e4b27b..80fc63f4a 100644 --- a/lib_eio_linux/sched.ml +++ b/lib_eio_linux/sched.ml @@ -50,7 +50,7 @@ type t = { uring: io_job Uring.t; mem: Uring.Region.t option; io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *) - mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t; + mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Lwt_dllist.t; (* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *) run_q : runnable Lf_queue.t; @@ -247,7 +247,7 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] = ) else if timeout = None && Uring.active_ops uring = 0 then ( (* Nothing further can happen at this point. If there are no events in progress but also still no memory available, something has gone wrong! *) - assert (Queue.length mem_q = 0); + assert (Lwt_dllist.length mem_q = 0); Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *) `Exit_scheduler ) else ( @@ -536,7 +536,7 @@ let with_sched ?(fallback=no_fallback) config fn = Lf_queue.push run_q IO; let sleep_q = Zzz.create () in let io_q = Queue.create () in - let mem_q = Queue.create () in + let mem_q = Lwt_dllist.create () in with_eventfd @@ fun eventfd -> let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool } diff --git a/lib_eio_linux/tests/test.ml b/lib_eio_linux/tests/test.ml index 7c7e31aad..7c0fcfa1e 100644 --- a/lib_eio_linux/tests/test.ml +++ b/lib_eio_linux/tests/test.ml @@ -211,19 +211,52 @@ let test_signal_race () = (fun () -> Eio.Condition.await_no_mutex cond) (fun () -> ignore (Unix.setitimer ITIMER_REAL { it_interval = 0.; it_value = 0.001 } : Unix.interval_timer_status)) +let test_alloc_fixed_or_wait () = + Eio_linux.run ~n_blocks:1 @@ fun _env -> + let block = Eio_linux.Low_level.alloc_fixed_or_wait () in + (* We have to wait for the 
block, but get cancelled while waiting. *) + begin + try + Fiber.both + (fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ())) + (fun () -> raise Exit); + with Exit -> () + end; + (* We have to wait for the block, and get it when the old one is freed. *) + Fiber.both + (fun () -> + let x = Eio_linux.Low_level.alloc_fixed_or_wait () in + Eio_linux.Low_level.free_fixed x + ) + (fun () -> + Eio_linux.Low_level.free_fixed block + ); + (* The old block is passed to the waiting fiber, but it's cancelled. *) + let block = Eio_linux.Low_level.alloc_fixed_or_wait () in + Fiber.both + (fun () -> + Fiber.first + (fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ()); assert false) + (fun () -> ()) + ) + (fun () -> Eio_linux.Low_level.free_fixed block); + let block = Eio_linux.Low_level.alloc_fixed_or_wait () in + Eio_linux.Low_level.free_fixed block + let () = let open Alcotest in run "eio_linux" [ "io", [ - test_case "copy" `Quick test_copy; - test_case "direct_copy" `Quick test_direct_copy; - test_case "poll_add" `Quick test_poll_add; - test_case "poll_add_busy" `Quick test_poll_add_busy; - test_case "iovec" `Quick test_iovec; - test_case "no_sqe" `Quick test_no_sqe; - test_case "read_exact" `Quick test_read_exact; - test_case "expose_backend" `Quick test_expose_backend; - test_case "statx" `Quick test_statx; - test_case "signal_race" `Quick test_signal_race; + test_case "copy" `Quick test_copy; + test_case "direct_copy" `Quick test_direct_copy; + test_case "poll_add" `Quick test_poll_add; + test_case "poll_add_busy" `Quick test_poll_add_busy; + test_case "iovec" `Quick test_iovec; + test_case "no_sqe" `Quick test_no_sqe; + test_case "read_exact" `Quick test_read_exact; + test_case "expose_backend" `Quick test_expose_backend; + test_case "statx" `Quick test_statx; + test_case "signal_race" `Quick test_signal_race; + test_case "alloc-fixed-or-wait" `Quick test_alloc_fixed_or_wait; ]; ]