From 6055418f8906b8d6b050aaaa7dcc69aac16f2340 Mon Sep 17 00:00:00 2001 From: Leandro Ostera Date: Mon, 18 Dec 2023 20:16:05 +0100 Subject: [PATCH 1/2] feat: sleep a scheduler if there's no work to do --- riot/runtime/core/proc_queue.ml | 1 + riot/runtime/core/proc_queue.mli | 1 + riot/runtime/net/io.ml | 2 ++ riot/runtime/net/io.mli | 1 + riot/runtime/scheduler/scheduler.ml | 26 +++++++++++++++++++++++++- riot/runtime/scheduler/scheduler.mli | 2 ++ riot/runtime/time/timer_wheel.ml | 1 + riot/runtime/time/timer_wheel.mli | 1 + riot/runtime/util/dashmap.ml | 1 + riot/runtime/util/dashmap.mli | 1 + 10 files changed, 36 insertions(+), 1 deletion(-) diff --git a/riot/runtime/core/proc_queue.ml b/riot/runtime/core/proc_queue.ml index fdc650bf..9983f9d7 100644 --- a/riot/runtime/core/proc_queue.ml +++ b/riot/runtime/core/proc_queue.ml @@ -4,6 +4,7 @@ type t = { alive : Proc_set.t; queue : Process.t Lf_queue.t } let create () = { queue = Lf_queue.create (); alive = Proc_set.create () } let size t = Proc_set.size t.alive +let is_empty t = size t = 0 let queue t proc = if Proc_set.contains t.alive proc then () diff --git a/riot/runtime/core/proc_queue.mli b/riot/runtime/core/proc_queue.mli index 47f3b2b9..89a1c19d 100644 --- a/riot/runtime/core/proc_queue.mli +++ b/riot/runtime/core/proc_queue.mli @@ -2,5 +2,6 @@ type t val create : unit -> t val size : t -> int +val is_empty : t -> bool val queue : t -> Process.t -> unit val next : t -> Process.t option diff --git a/riot/runtime/net/io.ml b/riot/runtime/net/io.ml index 1851b85b..0852e32d 100644 --- a/riot/runtime/net/io.ml +++ b/riot/runtime/net/io.ml @@ -95,6 +95,8 @@ let gc t = is_open); Dashmap.remove_by t.procs (fun (_fd, (proc, _)) -> Process.is_waiting_io proc) +let can_poll t = not (Dashmap.is_empty t.procs) + let poll t fn = gc t; let ready_count = Poll.ppoll_or_poll t.poll t.poll_idx t.poll_timeout in diff --git a/riot/runtime/net/io.mli b/riot/runtime/net/io.mli index bcf0456b..cddc4430 100644 --- a/riot/runtime/net/io.mli +++ b/riot/runtime/net/io.mli @@ -16,6 +16,7 @@ val pp : Format.formatter -> t -> unit val register : t -> Process.t -> [ `r | `rw | `w ] -> Fd.t -> unit val unregister_process : t -> Process.t -> unit val poll : t -> (Process.t * Fd.Mode.t -> unit) -> unit +val can_poll : t -> bool val close : t -> Fd.t -> unit val getaddrinfo : diff --git a/riot/runtime/scheduler/scheduler.ml b/riot/runtime/scheduler/scheduler.ml index 704a5ce6..10c98813 100644 --- a/riot/runtime/scheduler/scheduler.ml +++ b/riot/runtime/scheduler/scheduler.ml @@ -12,6 +12,8 @@ type t = { sleep_set : Proc_set.t; timers : Timer_wheel.t; io_tbl : Io.t; + idle_mutex : Mutex.t; + idle_condition : Condition.t; } type pool = { @@ -32,6 +34,8 @@ module Scheduler = struct sleep_set = Proc_set.create (); io_tbl = Io.create (); timers = Timer_wheel.create (); + idle_mutex = Mutex.create (); + idle_condition = Condition.create (); } let get_current_scheduler, set_current_scheduler = @@ -50,8 +54,10 @@ module Scheduler = struct Timer_wheel.make_timer sch.timers time mode fn let add_to_run_queue sch (proc : Process.t) = + Mutex.protect sch.idle_mutex @@ fun () -> Proc_set.remove sch.sleep_set proc; Proc_queue.queue sch.run_queue proc; + Condition.signal sch.idle_condition; Log.trace (fun f -> f "Adding process to run_queue queue[%d]: %a" (Proc_queue.size sch.run_queue) @@ -239,6 +245,17 @@ module Scheduler = struct while true do if pool.stop then raise_notrace Exit; + Mutex.lock sch.idle_mutex; + while + (not pool.stop) + && Proc_queue.is_empty sch.run_queue + && (not (Timer_wheel.can_tick sch.timers)) + && not (Io.can_poll sch.io_tbl) + do + Condition.wait sch.idle_condition sch.idle_mutex + done; + Mutex.unlock sch.idle_mutex; + for _ = 0 to Int.min (Proc_queue.size sch.run_queue) 10 do match Proc_queue.next sch.run_queue with | Some proc -> @@ -260,8 +277,15 @@ module Pool = struct let get_pool, set_pool = Thread_local.make ~name:"POOL" let shutdown pool = + let rec wake_up_scheduler sch = + if Mutex.try_lock sch.idle_mutex then ( + Condition.signal sch.idle_condition; + Mutex.unlock sch.idle_mutex) + else wake_up_scheduler sch + in Log.trace (fun f -> f "shutdown called"); - pool.stop <- true + pool.stop <- true; + List.iter wake_up_scheduler pool.schedulers let register_process pool _scheduler proc = Proc_table.register_process pool.processes proc diff --git a/riot/runtime/scheduler/scheduler.mli b/riot/runtime/scheduler/scheduler.mli index ad69c1b6..7bd340d3 100644 --- a/riot/runtime/scheduler/scheduler.mli +++ b/riot/runtime/scheduler/scheduler.mli @@ -9,6 +9,8 @@ type t = { sleep_set : Proc_set.t; timers : Time.Timer_wheel.t; io_tbl : Net.Io.t; + idle_mutex : Mutex.t; + idle_condition : Condition.t; } type pool = { diff --git a/riot/runtime/time/timer_wheel.ml b/riot/runtime/time/timer_wheel.ml index 1ac7e0df..b6895b6f 100644 --- a/riot/runtime/time/timer_wheel.ml +++ b/riot/runtime/time/timer_wheel.ml @@ -31,6 +31,7 @@ type t = { } let create () = { timers = Dashmap.create (); last_t = Ptime_clock.now () } +let can_tick t = not (Dashmap.is_empty t.timers) let make_timer t time mode fn = let timer = Timer.make time mode fn in diff --git a/riot/runtime/time/timer_wheel.mli b/riot/runtime/time/timer_wheel.mli index 144b1145..5b874d81 100644 --- a/riot/runtime/time/timer_wheel.mli +++ b/riot/runtime/time/timer_wheel.mli @@ -17,3 +17,4 @@ val make_timer : val ends_at : Ptime.t -> Ptime.span -> Ptime.t val tick : t -> unit +val can_tick : t -> bool diff --git a/riot/runtime/util/dashmap.ml b/riot/runtime/util/dashmap.ml index bb35799b..bcccfee8 100644 --- a/riot/runtime/util/dashmap.ml +++ b/riot/runtime/util/dashmap.ml @@ -6,6 +6,7 @@ let find t k = List.assoc_opt k (entries t) let find_by t fn = List.find_opt fn (entries t) let find_all_by t fn = List.find_all fn (entries t) let has_key t k = find t k |> Option.is_some +let is_empty t = entries t = [] let rec insert t k v = let tbl1 = entries t in diff --git a/riot/runtime/util/dashmap.mli b/riot/runtime/util/dashmap.mli index a6560295..9d6d5b6c 100644 --- a/riot/runtime/util/dashmap.mli +++ b/riot/runtime/util/dashmap.mli @@ -1,6 +1,7 @@ type ('k, 'v) t val create : 'a -> ('b, 'c) t +val is_empty : ('k, 'v) t -> bool val entries : ('a, 'b) t -> ('a * 'b) list val find_by : ('a, 'b) t -> ('a * 'b -> bool) -> ('a * 'b) option val find_all_by : ('a, 'b) t -> ('a * 'b -> bool) -> ('a * 'b) list From 831789bd885c74c0e4e98b8f57ba5f5014ce3308 Mon Sep 17 00:00:00 2001 From: Leandro Ostera Date: Mon, 18 Dec 2023 20:36:49 +0100 Subject: [PATCH 2/2] bench: move spawn_many to bench folder and bump run batch size --- bench/dune | 3 +++ {test => bench}/spawn_many.erl | 0 {test => bench}/spawn_many.ml | 0 riot/runtime/scheduler/scheduler.ml | 2 +- 4 files changed, 4 insertions(+), 1 deletion(-) create mode 100644 bench/dune rename {test => bench}/spawn_many.erl (100%) rename {test => bench}/spawn_many.ml (100%) diff --git a/bench/dune b/bench/dune new file mode 100644 index 00000000..253bd1aa --- /dev/null +++ b/bench/dune @@ -0,0 +1,3 @@ +(executables + (names spawn_many) + (libraries riot)) diff --git a/test/spawn_many.erl b/bench/spawn_many.erl similarity index 100% rename from test/spawn_many.erl rename to bench/spawn_many.erl diff --git a/test/spawn_many.ml b/bench/spawn_many.ml similarity index 100% rename from test/spawn_many.ml rename to bench/spawn_many.ml diff --git a/riot/runtime/scheduler/scheduler.ml b/riot/runtime/scheduler/scheduler.ml index 10c98813..999933b0 100644 --- a/riot/runtime/scheduler/scheduler.ml +++ b/riot/runtime/scheduler/scheduler.ml @@ -256,7 +256,7 @@ module Scheduler = struct done; Mutex.unlock sch.idle_mutex; - for _ = 0 to Int.min (Proc_queue.size sch.run_queue) 10 do + for _ = 0 to Int.min (Proc_queue.size sch.run_queue) 5_000 do match Proc_queue.next sch.run_queue with | Some proc -> set_current_process_pid proc.pid;