Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: sleep a scheduler if there's no work to do #16

Merged
merged 2 commits into from
Dec 18, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions bench/dune
Original file line number Diff line number Diff line change
@@ -0,0 +1,3 @@
(executables
(names spawn_many)
(libraries riot))
File renamed without changes.
File renamed without changes.
1 change: 1 addition & 0 deletions riot/runtime/core/proc_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ type t = { alive : Proc_set.t; queue : Process.t Lf_queue.t }

let create () = { queue = Lf_queue.create (); alive = Proc_set.create () }
let size t = Proc_set.size t.alive
let is_empty t = size t = 0

let queue t proc =
if Proc_set.contains t.alive proc then ()
Expand Down
1 change: 1 addition & 0 deletions riot/runtime/core/proc_queue.mli
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@ type t

val create : unit -> t
val size : t -> int
val is_empty : t -> bool
val queue : t -> Process.t -> unit
val next : t -> Process.t option
2 changes: 2 additions & 0 deletions riot/runtime/net/io.ml
Original file line number Diff line number Diff line change
Expand Up @@ -95,6 +95,8 @@ let gc t =
is_open);
Dashmap.remove_by t.procs (fun (_fd, (proc, _)) -> Process.is_waiting_io proc)

let can_poll t = not (Dashmap.is_empty t.procs)

let poll t fn =
gc t;
let ready_count = Poll.ppoll_or_poll t.poll t.poll_idx t.poll_timeout in
Expand Down
1 change: 1 addition & 0 deletions riot/runtime/net/io.mli
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ val pp : Format.formatter -> t -> unit
val register : t -> Process.t -> [ `r | `rw | `w ] -> Fd.t -> unit
val unregister_process : t -> Process.t -> unit
val poll : t -> (Process.t * Fd.Mode.t -> unit) -> unit
val can_poll : t -> bool
val close : t -> Fd.t -> unit

val getaddrinfo :
Expand Down
28 changes: 26 additions & 2 deletions riot/runtime/scheduler/scheduler.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,8 @@ type t = {
sleep_set : Proc_set.t;
timers : Timer_wheel.t;
io_tbl : Io.t;
idle_mutex : Mutex.t;
idle_condition : Condition.t;
}

type pool = {
Expand All @@ -32,6 +34,8 @@ module Scheduler = struct
sleep_set = Proc_set.create ();
io_tbl = Io.create ();
timers = Timer_wheel.create ();
idle_mutex = Mutex.create ();
idle_condition = Condition.create ();
}

let get_current_scheduler, set_current_scheduler =
Expand All @@ -50,8 +54,10 @@ module Scheduler = struct
Timer_wheel.make_timer sch.timers time mode fn

let add_to_run_queue sch (proc : Process.t) =
Mutex.protect sch.idle_mutex @@ fun () ->
Proc_set.remove sch.sleep_set proc;
Proc_queue.queue sch.run_queue proc;
Condition.signal sch.idle_condition;
Log.trace (fun f ->
f "Adding process to run_queue queue[%d]: %a"
(Proc_queue.size sch.run_queue)
Expand Down Expand Up @@ -239,7 +245,18 @@ module Scheduler = struct
while true do
if pool.stop then raise_notrace Exit;

for _ = 0 to Int.min (Proc_queue.size sch.run_queue) 10 do
Mutex.lock sch.idle_mutex;
while
(not pool.stop)
&& Proc_queue.is_empty sch.run_queue
&& (not (Timer_wheel.can_tick sch.timers))
&& not (Io.can_poll sch.io_tbl)
do
Condition.wait sch.idle_condition sch.idle_mutex
done;
Mutex.unlock sch.idle_mutex;

for _ = 0 to Int.min (Proc_queue.size sch.run_queue) 5_000 do
match Proc_queue.next sch.run_queue with
| Some proc ->
set_current_process_pid proc.pid;
Expand All @@ -260,8 +277,15 @@ module Pool = struct
let get_pool, set_pool = Thread_local.make ~name:"POOL"

let shutdown pool =
let rec wake_up_scheduler sch =
if Mutex.try_lock sch.idle_mutex then (
Condition.signal sch.idle_condition;
Mutex.unlock sch.idle_mutex)
else wake_up_scheduler sch
in
Log.trace (fun f -> f "shutdown called");
pool.stop <- true
pool.stop <- true;
List.iter wake_up_scheduler pool.schedulers

let register_process pool _scheduler proc =
Proc_table.register_process pool.processes proc
Expand Down
2 changes: 2 additions & 0 deletions riot/runtime/scheduler/scheduler.mli
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ type t = {
sleep_set : Proc_set.t;
timers : Time.Timer_wheel.t;
io_tbl : Net.Io.t;
idle_mutex : Mutex.t;
idle_condition : Condition.t;
}

type pool = {
Expand Down
1 change: 1 addition & 0 deletions riot/runtime/time/timer_wheel.ml
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ type t = {
}

let create () = { timers = Dashmap.create (); last_t = Ptime_clock.now () }
let can_tick t = not (Dashmap.is_empty t.timers)

let make_timer t time mode fn =
let timer = Timer.make time mode fn in
Expand Down
1 change: 1 addition & 0 deletions riot/runtime/time/timer_wheel.mli
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ val make_timer :

val ends_at : Ptime.t -> Ptime.span -> Ptime.t
val tick : t -> unit
val can_tick : t -> bool
1 change: 1 addition & 0 deletions riot/runtime/util/dashmap.ml
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ let find t k = List.assoc_opt k (entries t)
let find_by t fn = List.find_opt fn (entries t)
let find_all_by t fn = List.find_all fn (entries t)
let has_key t k = find t k |> Option.is_some
let is_empty t = entries t = []

let rec insert t k v =
let tbl1 = entries t in
Expand Down
1 change: 1 addition & 0 deletions riot/runtime/util/dashmap.mli
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
type ('k, 'v) t

val create : 'a -> ('b, 'c) t
val is_empty : ('k, 'v) t -> bool
val entries : ('a, 'b) t -> ('a * 'b) list
val find_by : ('a, 'b) t -> ('a * 'b -> bool) -> ('a * 'b) option
val find_all_by : ('a, 'b) t -> ('a * 'b -> bool) -> ('a * 'b) list
Expand Down
Loading