Skip to content

Commit

Permalink
eio_linux: allow alloc_fixed_or_wait to be cancelled
Browse files Browse the repository at this point in the history
  • Loading branch information
talex5 committed Sep 4, 2024
1 parent 8b27ef4 commit 0f48f27
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 17 deletions.
11 changes: 7 additions & 4 deletions lib_eio_linux/low_level.ml
Original file line number Diff line number Diff line change
Expand Up @@ -226,13 +226,16 @@ let alloc_fixed_or_wait () =
| exception Uring.Region.No_space ->
let id = Eio.Private.Trace.mint_id () in
let trigger = Eio.Private.Single_waiter.create () in
Queue.push trigger s.mem_q;
(* todo: remove protect; but needs to remove from queue on cancel *)
Eio.Private.Single_waiter.await_protect trigger "alloc_fixed_or_wait" id
let node = Lwt_dllist.add_r trigger s.mem_q in
try
Eio.Private.Single_waiter.await trigger "alloc_fixed_or_wait" id
with ex ->
Lwt_dllist.remove node;
raise ex

let free_fixed buf =
let s = Sched.get () in
match Queue.take_opt s.mem_q with
match Lwt_dllist.take_opt_l s.mem_q with
| None -> Uring.Region.free buf
| Some k -> Eio.Private.Single_waiter.wake k (Ok buf)

Expand Down
6 changes: 3 additions & 3 deletions lib_eio_linux/sched.ml
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type t = {
uring: io_job Uring.t;
mem: Uring.Region.t option;
io_q: (t -> unit) Queue.t; (* waiting for room on [uring] *)
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Queue.t;
mem_q : Uring.Region.chunk Eio.Private.Single_waiter.t Lwt_dllist.t;

(* The queue of runnable fibers ready to be resumed. Note: other domains can also add work items here. *)
run_q : runnable Lf_queue.t;
Expand Down Expand Up @@ -247,7 +247,7 @@ let rec schedule ({run_q; sleep_q; mem_q; uring; _} as st) : [`Exit_scheduler] =
) else if timeout = None && Uring.active_ops uring = 0 then (
(* Nothing further can happen at this point.
If there are no events in progress but also still no memory available, something has gone wrong! *)
assert (Queue.length mem_q = 0);
assert (Lwt_dllist.length mem_q = 0);
Lf_queue.close st.run_q; (* Just to catch bugs if something tries to enqueue later *)
`Exit_scheduler
) else (
Expand Down Expand Up @@ -536,7 +536,7 @@ let with_sched ?(fallback=no_fallback) config fn =
Lf_queue.push run_q IO;
let sleep_q = Zzz.create () in
let io_q = Queue.create () in
let mem_q = Queue.create () in
let mem_q = Lwt_dllist.create () in
with_eventfd @@ fun eventfd ->
let thread_pool = Eio_unix.Private.Thread_pool.create ~sleep_q in
fn { mem; uring; run_q; io_q; mem_q; eventfd; need_wakeup = Atomic.make false; sleep_q; thread_pool }
Expand Down
42 changes: 32 additions & 10 deletions lib_eio_linux/tests/test.ml
Original file line number Diff line number Diff line change
Expand Up @@ -211,19 +211,41 @@ let test_signal_race () =
(fun () -> Eio.Condition.await_no_mutex cond)
(fun () -> ignore (Unix.setitimer ITIMER_REAL { it_interval = 0.; it_value = 0.001 } : Unix.interval_timer_status))

let test_alloc_fixed_or_wait () =
Eio_linux.run ~n_blocks:1 @@ fun _env ->
let block = Eio_linux.Low_level.alloc_fixed_or_wait () in
(* We have to wait for the block, but get cancelled while waiting. *)
begin
try
Fiber.both
(fun () -> ignore (Eio_linux.Low_level.alloc_fixed_or_wait ()))
(fun () -> raise Exit);
with Exit -> ()
end;
(* We have to wait for the block, and get it when the old one is freed. *)
Fiber.both
(fun () ->
let x = Eio_linux.Low_level.alloc_fixed_or_wait () in
Eio_linux.Low_level.free_fixed x
)
(fun () ->
Eio_linux.Low_level.free_fixed block
)

let () =
let open Alcotest in
run "eio_linux" [
"io", [
test_case "copy" `Quick test_copy;
test_case "direct_copy" `Quick test_direct_copy;
test_case "poll_add" `Quick test_poll_add;
test_case "poll_add_busy" `Quick test_poll_add_busy;
test_case "iovec" `Quick test_iovec;
test_case "no_sqe" `Quick test_no_sqe;
test_case "read_exact" `Quick test_read_exact;
test_case "expose_backend" `Quick test_expose_backend;
test_case "statx" `Quick test_statx;
test_case "signal_race" `Quick test_signal_race;
test_case "copy" `Quick test_copy;
test_case "direct_copy" `Quick test_direct_copy;
test_case "poll_add" `Quick test_poll_add;
test_case "poll_add_busy" `Quick test_poll_add_busy;
test_case "iovec" `Quick test_iovec;
test_case "no_sqe" `Quick test_no_sqe;
test_case "read_exact" `Quick test_read_exact;
test_case "expose_backend" `Quick test_expose_backend;
test_case "statx" `Quick test_statx;
test_case "signal_race" `Quick test_signal_race;
test_case "alloc-fixed-or-wait" `Quick test_alloc_fixed_or_wait;
];
]

0 comments on commit 0f48f27

Please sign in to comment.