From 160ef8d5231dbc3cf9feee7d5ac46448147ecac5 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Fri, 30 Aug 2024 15:02:37 +0300 Subject: [PATCH] Change to use Picos instead of DLT and DLA This implements support for cancelation and removes explicit support for timeouts, which simplifes the main library implementations. This basically also means that one can no longer use Kcas without a scheduler. --- bench/bench_mvar.ml | 3 +- bench/bench_queue.ml | 3 +- bench/bench_stack.ml | 3 +- bench/dune | 1 + doc/dune | 16 +- dune | 14 +- dune-project | 10 +- dune-workspace | 13 + kcas.opam | 6 +- kcas.opam.template | 3 + kcas_data.opam | 2 +- src/kcas/dune | 14 +- src/kcas/kcas.ml | 256 +++++------------- src/kcas/kcas.mli | 188 ++++++------- src/kcas_data/dllist.ml | 9 +- src/kcas_data/dllist.mli | 2 - src/kcas_data/dllist_intf.ml | 5 +- src/kcas_data/mvar.ml | 11 +- src/kcas_data/mvar.mli | 7 +- src/kcas_data/mvar_intf.ml | 7 +- src/kcas_data/promise.ml | 7 +- src/kcas_data/promise.mli | 2 - src/kcas_data/promise_intf.ml | 5 +- src/kcas_data/queue.ml | 8 +- src/kcas_data/queue.mli | 7 +- src/kcas_data/queue_intf.ml | 5 +- src/kcas_data/stack.ml | 6 +- src/kcas_data/stack.mli | 7 +- src/kcas_data/stack_intf.ml | 5 +- test/kcas/dune | 4 +- test/kcas/loc_modes.ml | 5 +- test/kcas/test.ml | 218 ++++++++------- test/kcas/threads.ml | 2 + test/kcas_data/accumulator_test_stm.ml | 4 +- test/kcas_data/dllist_test_stm.ml | 3 +- test/kcas_data/dune | 2 + test/kcas_data/hashtbl_test_stm.ml | 3 +- .../linearizable_chaining_example.ml | 4 +- test/kcas_data/lru_cache.ml | 5 +- test/kcas_data/lru_cache.mli | 2 - test/kcas_data/lru_cache_example.ml | 13 +- test/kcas_data/lru_cache_intf.ml | 3 +- test/kcas_data/mvar_test.ml | 10 +- test/kcas_data/queue_test_stm.ml | 3 +- test/kcas_data/stack_test_stm.ml | 3 +- test/lib/scheduler/dune | 9 + test/lib/scheduler/scheduler.ocaml4.ml | 1 + test/lib/scheduler/scheduler.ocaml5.ml | 2 + test/{kcas_data => lib}/stm_run/dune | 0 .../stm_run/empty.ocaml4.ml | 0 .../stm_run/empty.ocaml5.ml | 0 test/{kcas_data => lib}/stm_run/intf.ml | 3 + .../stm_run/stm_run.ocaml4.ml | 8 +- .../stm_run/stm_run.ocaml5.ml | 10 +- test/{kcas_data => lib}/stm_run/util.ml | 0 55 files changed, 413 insertions(+), 529 deletions(-) create mode 100644 dune-workspace create mode 100644 test/lib/scheduler/dune create mode 100644 test/lib/scheduler/scheduler.ocaml4.ml create mode 100644 test/lib/scheduler/scheduler.ocaml5.ml rename test/{kcas_data => lib}/stm_run/dune (100%) rename test/{kcas_data => lib}/stm_run/empty.ocaml4.ml (100%) rename test/{kcas_data => lib}/stm_run/empty.ocaml5.ml (100%) rename test/{kcas_data => lib}/stm_run/intf.ml (96%) rename test/{kcas_data => lib}/stm_run/stm_run.ocaml4.ml (64%) rename test/{kcas_data => lib}/stm_run/stm_run.ocaml5.ml (73%) rename test/{kcas_data => lib}/stm_run/util.ml (100%) diff --git a/bench/bench_mvar.ml b/bench/bench_mvar.ml index 2c3988b5..6957a9c2 100644 --- a/bench/bench_mvar.ml +++ b/bench/bench_mvar.ml @@ -11,6 +11,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in let init _ = () in + let wrap _ () action = Scheduler.run action in let work i () = if i < n_adders then if blocking_add then @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work ~after () + Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_queue.ml b/bench/bench_queue.ml index ebcc9d07..0ea3ab90 100644 --- a/bench/bench_queue.ml +++ b/bench/bench_queue.ml @@ -29,6 +29,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ () action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_stack.ml b/bench/bench_stack.ml index 4886702b..94a835a9 100644 --- a/bench/bench_stack.ml +++ b/bench/bench_stack.ml @@ -29,6 +29,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ () action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/dune b/bench/dune index cd97da36..279eb529 100644 --- a/bench/dune +++ b/bench/dune @@ -14,6 +14,7 @@ let () = (action (run %{test} -brief)) (libraries + scheduler kcas_data multicore-bench backoff diff --git a/doc/dune b/doc/dune index 4bfc69d7..0694d733 100644 --- a/doc/dune +++ b/doc/dune @@ -1,8 +1,8 @@ -(mdx - (package kcas_data) - (deps - (package kcas) - (package kcas_data)) - (enabled_if - (>= %{ocaml_version} 5.0.0)) - (files gkmz-with-read-only-cmp-ops.md scheduler-interop.md)) +;(mdx +; (package kcas_data) +; (deps +; (package kcas) +; (package kcas_data)) +; (enabled_if +; (>= %{ocaml_version} 5.0.0)) +; (files gkmz-with-read-only-cmp-ops.md scheduler-interop.md)) diff --git a/dune b/dune index 75232a0a..abdfa8cf 100644 --- a/dune +++ b/dune @@ -1,7 +1,7 @@ -(mdx - (package kcas_data) - (deps - (package kcas) - (package kcas_data)) - (libraries domain_shims) - (files README.md)) +;(mdx +; (package kcas_data) +; (deps +; (package kcas) +; (package kcas_data)) +; (libraries domain_shims) +; (files README.md)) diff --git a/dune-project b/dune-project index 96c1e31d..b0d3190f 100644 --- a/dune-project +++ b/dune-project @@ -34,10 +34,8 @@ (>= 4.13.0)) (backoff (>= 0.1.0)) - (domain-local-await - (>= 1.0.1)) - (domain-local-timeout - (>= 1.0.1)) + (picos + (>= 0.5.0)) (multicore-magic (>= 2.3.0)) (domain_shims @@ -84,9 +82,9 @@ (and (>= 0.1.0) :with-test)) - (domain-local-await + (picos (and - (>= 1.0.1) + (>= 0.5.0) :with-test)) (domain_shims (and diff --git a/dune-workspace b/dune-workspace new file mode 100644 index 00000000..b12e21ad --- /dev/null +++ b/dune-workspace @@ -0,0 +1,13 @@ +(lang dune 3.9) + +(env + (dev + (ocamlopt_flags + (:standard -S)) + (flags + (:standard -warn-error -A))) + (release + (ocamlopt_flags + (:standard -S)))) + +(display verbose) diff --git a/kcas.opam b/kcas.opam index 33c54daf..a4e2f20d 100644 --- a/kcas.opam +++ b/kcas.opam @@ -19,8 +19,7 @@ depends: [ "dune" {>= "3.14"} "ocaml" {>= "4.13.0"} "backoff" {>= "0.1.0"} - "domain-local-await" {>= "1.0.1"} - "domain-local-timeout" {>= "1.0.1"} + "picos" {>= "0.5.0"} "multicore-magic" {>= "2.3.0"} "domain_shims" {>= "0.1.0" & with-test} "alcotest" {>= "1.8.0" & with-test} @@ -46,3 +45,6 @@ build: [ ] dev-repo: "git+https://github.com/ocaml-multicore/kcas.git" doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "picos.dev" "git+https://github.com/ocaml-multicore/picos#c035ca8ba4cf9b8d549367763c68e4a77ebfd532" ] +] diff --git a/kcas.opam.template b/kcas.opam.template index 0fd71d5e..37f26461 100644 --- a/kcas.opam.template +++ b/kcas.opam.template @@ -1 +1,4 @@ doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "picos.dev" "git+https://github.com/ocaml-multicore/picos#c035ca8ba4cf9b8d549367763c68e4a77ebfd532" ] +] diff --git a/kcas_data.opam b/kcas_data.opam index 057d28dd..a2bd733a 100644 --- a/kcas_data.opam +++ b/kcas_data.opam @@ -20,7 +20,7 @@ depends: [ "kcas" {= version} "multicore-magic" {>= "2.3.0"} "backoff" {>= "0.1.0" & with-test} - "domain-local-await" {>= "1.0.1" & with-test} + "picos" {>= "0.5.0" & with-test} "domain_shims" {>= "0.1.0" & with-test} "multicore-bench" {>= "0.1.4" & with-test} "alcotest" {>= "1.8.0" & with-test} diff --git a/src/kcas/dune b/src/kcas/dune index 70b19af5..5a83de7a 100644 --- a/src/kcas/dune +++ b/src/kcas/dune @@ -1,11 +1,11 @@ (library (name kcas) (public_name kcas) - (libraries domain-local-await domain-local-timeout backoff multicore-magic)) + (libraries picos backoff multicore-magic)) -(mdx - (package kcas) - (deps - (package kcas)) - (libraries kcas backoff domain_shims) - (files kcas.mli)) +;(mdx +; (package kcas) +; (deps +; (package kcas)) +; (libraries kcas backoff domain_shims) +; (files kcas.mli)) diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index 5bded5f4..ea427b86 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -3,6 +3,8 @@ * Copyright (c) 2023, Vesa Karvonen *) +open Picos + (** Work around CSE bug in OCaml 5-5.1. *) let[@inline] atomic_get x = Atomic.get ((* Prevents CSE *) Sys.opaque_identity x) @@ -22,82 +24,6 @@ let[@inline] fenceless_get x = let fenceless_get = atomic_get *) -module Timeout = struct - exception Timeout - - let[@inline never] timeout () = raise Timeout - - type _ t = - | Unset : [> `Unset ] t - | Elapsed : [> `Elapsed ] t - | Call : (unit -> unit) -> [> `Call ] t - | Set : { mutable state : [< `Elapsed | `Call ] t } -> [> `Set ] t - - external as_atomic : [< `Set ] t -> [< `Elapsed | `Call ] t Atomic.t - = "%identity" - - (* Fenceless operations are safe here as the timeout state is not not visible - outside of the library and we don't always need the latest value and, when - we do, there is a fence after. *) - - let[@inline] check (t : [< `Set | `Unset ] t) = - match t with - | Unset -> () - | Set set_r -> - if fenceless_get (as_atomic (Set set_r)) == Elapsed then timeout () - - let set seconds (state : [< `Elapsed | `Call ] t Atomic.t) = - Domain_local_timeout.set_timeoutf seconds @@ fun () -> - match Atomic.exchange state Elapsed with - | Call release_or_cancel -> release_or_cancel () - | Elapsed -> () - - let call_id = Call Fun.id - - let[@inline never] alloc seconds = - let (Set set_r as t : [ `Set ] t) = Set { state = call_id } in - let cancel = set seconds (as_atomic t) in - if not (Atomic.compare_and_set (as_atomic t) call_id (Call cancel)) then - timeout (); - Set set_r - - let[@inline] alloc_opt = function - | None -> Unset - | Some seconds -> alloc seconds - - let[@inline never] await (state : [< `Elapsed | `Call ] t Atomic.t) release = - match fenceless_get state with - | Call cancel as alive -> - if Atomic.compare_and_set state alive (Call release) then Call cancel - else timeout () - | Elapsed -> timeout () - - let[@inline] await (t : [ `Unset | `Set ] t) release = - match t with Unset -> Unset | Set r -> await (as_atomic (Set r)) release - - let[@inline never] unawait (state : [< `Elapsed | `Call ] t Atomic.t) alive = - match fenceless_get state with - | Call _ as await -> - if not (Atomic.compare_and_set state await alive) then timeout () - | Elapsed -> timeout () - - let[@inline] unawait t alive = - match (t, alive) with - | Set set_r, Call call_r -> unawait (as_atomic (Set set_r)) (Call call_r) - | _ -> () - - let[@inline] cancel_alive (alive : [< `Unset | `Call ] t) = - match alive with Unset -> () | Call cancel -> cancel () - - let[@inline] cancel (t : [< `Set | `Unset ] t) = - match t with - | Unset -> () - | Set set_r -> ( - match fenceless_get (as_atomic (Set set_r)) with - | Elapsed -> () - | Call cancel -> cancel ()) -end - module Id = struct let neg_id = Atomic.make (-1) let[@inline] neg_ids n = Atomic.fetch_and_add neg_id (-n) @@ -128,14 +54,12 @@ end = struct x end -type awaiter = unit -> unit - -let[@inline] resume_awaiter awaiter = awaiter () +type awaiter = Trigger.t let[@inline] resume_awaiters = function | [] -> () - | [ awaiter ] -> resume_awaiter awaiter - | awaiters -> List.iter resume_awaiter awaiters + | [ awaiter ] -> Trigger.signal awaiter + | awaiters -> List.iter Trigger.signal awaiters module Mode = struct type t = [ `Lock_free | `Obstruction_free ] @@ -166,7 +90,7 @@ and _ tdt = This field must be first, see [root_as_atomic] and [tree_as_ref]. *) - timeout : [ `Set | `Unset ] Timeout.t; + mutable fiber : Fiber.Maybe.t; mutable mode : Mode.t; mutable validate_counter : int; mutable post_commit : Action.t; @@ -467,67 +391,49 @@ let rec remove_awaiter backoff loc before awaiter = if not (Atomic.compare_and_set (as_atomic loc) state_old state_new) then remove_awaiter (Backoff.once backoff) loc before awaiter -let block timeout loc before = - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await timeout t.release in - if add_awaiter loc before t.release then begin - try t.await () - with cancellation_exn -> - remove_awaiter Backoff.default loc before t.release; - Timeout.cancel_alive alive; - raise cancellation_exn - end; - Timeout.unawait timeout alive +let block loc before = + let t = Trigger.create () in + if add_awaiter loc before t then + match Trigger.await t with + | None -> () + | Some exn_bt -> + remove_awaiter Backoff.default loc before t; + Exn_bt.raise exn_bt -let rec update_no_alloc timeout backoff loc state f = +let rec update_no_alloc backoff loc state f = (* Fenceless is safe as we have had a fence before if needed and there is a fence after. *) let state_old = fenceless_get (as_atomic loc) in let before = eval state_old in match f before with | after -> - if before == after then begin - Timeout.cancel timeout; - before - end + if before == after then before else begin state.after <- after; if Atomic.compare_and_set (as_atomic loc) state_old state then begin resume_awaiters state_old.awaiters; - Timeout.cancel timeout; before end - else update_no_alloc timeout (Backoff.once backoff) loc state f + else update_no_alloc (Backoff.once backoff) loc state f end | exception Retry.Later -> - block timeout loc before; - update_no_alloc timeout backoff loc state f - | exception exn -> - Timeout.cancel timeout; - raise exn + block loc before; + update_no_alloc backoff loc state f -let update_with_state timeout backoff loc f state_old = +let update_with_state backoff loc f state_old = let before = eval state_old in match f before with | after -> - if before == after then begin - Timeout.cancel timeout; - before - end + if before == after then before else let state = new_state after in if Atomic.compare_and_set (as_atomic loc) state_old state then begin resume_awaiters state_old.awaiters; - Timeout.cancel timeout; before end - else update_no_alloc timeout (Backoff.once backoff) loc state f + else update_no_alloc (Backoff.once backoff) loc state f | exception Retry.Later -> let state = new_state before in - block timeout loc before; - update_no_alloc timeout backoff loc state f - | exception exn -> - Timeout.cancel timeout; - raise exn + update_no_alloc backoff loc state f let rec exchange_no_alloc backoff loc state = let state_old = atomic_get (as_atomic loc) in @@ -584,25 +490,15 @@ module Loc = struct let[@inline] get_id loc = (to_loc loc).id let get loc = eval (atomic_get (as_atomic (to_loc loc))) - let rec get_as timeout f loc state = + let rec get_as f loc state = let before = eval state in - match f before with - | value -> - Timeout.cancel timeout; - value - | exception Retry.Later -> - block timeout (to_loc loc) before; - (* Fenceless is safe as there was already a fence before. *) - get_as timeout f loc (fenceless_get (as_atomic (to_loc loc))) - | exception exn -> - Timeout.cancel timeout; - raise exn - - let[@inline] get_as ?timeoutf f loc = - get_as - (Timeout.alloc_opt timeoutf) - f loc - (atomic_get (as_atomic (to_loc loc))) + try f before + with Retry.Later -> + block (to_loc loc) before; + (* Fenceless is safe as there was already a fence before. *) + get_as f loc (fenceless_get (as_atomic (to_loc loc))) + + let[@inline] get_as f loc = get_as f loc (atomic_get (as_atomic (to_loc loc))) let[@inline] get_mode loc = if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free @@ -612,21 +508,18 @@ module Loc = struct let state_old = atomic_get (as_atomic (to_loc loc)) in cas_with_state backoff (to_loc loc) before state state_old - let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in - update_with_state timeout backoff (to_loc loc) f + let fenceless_update ?(backoff = Backoff.default) loc f = + update_with_state backoff (to_loc loc) f (fenceless_get (as_atomic (to_loc loc))) - let[@inline] fenceless_modify ?timeoutf ?backoff loc f = - fenceless_update ?timeoutf ?backoff loc f |> ignore + let[@inline] fenceless_modify ?backoff loc f = + fenceless_update ?backoff loc f |> ignore - let update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in - update_with_state timeout backoff (to_loc loc) f + let update ?(backoff = Backoff.default) loc f = + update_with_state backoff (to_loc loc) f (atomic_get (as_atomic (to_loc loc))) - let[@inline] modify ?timeoutf ?backoff loc f = - update ?timeoutf ?backoff loc f |> ignore + let[@inline] modify ?backoff loc f = update ?backoff loc f |> ignore let exchange ?(backoff = Backoff.default) loc value = exchange_no_alloc backoff (to_loc loc) (new_state value) @@ -711,6 +604,17 @@ module Xt = struct tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); before + let check (Xt xt_r : _ t) = + match Fiber.Maybe.to_fiber_or_current xt_r.fiber with + | fiber -> + Fiber.check fiber; + if xt_r.fiber != Fiber.Maybe.of_fiber fiber then + xt_r.fiber <- Fiber.Maybe.of_fiber fiber + | exception _ -> + (* Getting the current fiber should never fail except when not running + inside a scheduler. *) + () + let update_old : type c a. _ -> a loc -> c -> (c, a) up -> _ -> _ -> _ -> a = fun (Xt xt_r as xt : _ t) loc c up lt gt state' -> let c0 = xt_r.validate_counter in @@ -721,7 +625,7 @@ module Xt = struct The assumption is that potentially infinite loops will repeatedly access the same locations. *) if c0 land c1 = 0 then begin - Timeout.check xt_r.timeout; + check xt; validate_all_rec xt !(tree_as_ref xt) end; let state : a state = Obj.magic state' in @@ -929,17 +833,13 @@ module Xt = struct let initial_validate_period = 4 - let success (Xt xt_r : _ t) result = - Timeout.cancel xt_r.timeout; - Action.run xt_r.post_commit result - let rec commit backoff (Xt xt_r as xt : _ t) tx = match tx ~xt with | result -> begin match !(tree_as_ref xt) with - | T Leaf -> success xt result + | T Leaf -> Action.run xt_r.post_commit result | T (Node { loc; state; lt = T Leaf; gt = T Leaf; _ }) -> - if is_cmp xt state then success xt result + if is_cmp xt state then Action.run xt_r.post_commit result else begin state.which <- W After; let before = state.before in @@ -948,7 +848,7 @@ module Xt = struct fence. *) let state_old = fenceless_get (as_atomic loc) in if cas_with_state Backoff.default loc before state state_old then - success xt result + Action.run xt_r.post_commit result else commit_once_reuse backoff xt tx end | T (Node node_r) -> begin @@ -956,22 +856,22 @@ module Xt = struct match determine xt 0 root with | status -> if a_cmp_followed_by_a_cas < status then begin - if finish xt root (verify xt root) then success xt result + if finish xt root (verify xt root) then + Action.run xt_r.post_commit result else begin - (* We switch to [`Lock_free] as there was - interference. *) + (* We switch to [`Lock_free] as there was interference. *) commit_once_alloc backoff `Lock_free xt tx end end else if a_cmp = status || finish xt root (if 0 <= status then After else Before) - then success xt result + then Action.run xt_r.post_commit result else commit_once_alloc backoff xt_r.mode xt tx | exception Exit -> (* Fenceless is safe as there was a fence before. *) if fenceless_get (root_as_atomic xt) == R After then - success xt result + Action.run xt_r.post_commit result else commit_once_alloc backoff xt_r.mode xt tx end end @@ -981,58 +881,52 @@ module Xt = struct | T Leaf -> invalid_retry () | T (Node node_r) -> begin let root = Node node_r in - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await xt_r.timeout t.release in - match add_awaiters t.release xt root with + let t = Trigger.create () in + match add_awaiters t xt root with | T Leaf -> begin - match t.await () with - | () -> - remove_awaiters t.release xt (T Leaf) root |> ignore; - Timeout.unawait xt_r.timeout alive; + match Trigger.await t with + | None -> + remove_awaiters t xt (T Leaf) root |> ignore; commit_reset_reuse backoff xt tx - | exception cancellation_exn -> - remove_awaiters t.release xt (T Leaf) root |> ignore; - Timeout.cancel_alive alive; - raise cancellation_exn + | Some exn_bt -> + remove_awaiters t xt (T Leaf) root |> ignore; + Exn_bt.raise exn_bt end | T (Node _) as stop -> - remove_awaiters t.release xt stop root |> ignore; - Timeout.unawait xt_r.timeout alive; + remove_awaiters t xt stop root |> ignore; commit_once_reuse backoff xt tx end end - | exception exn -> - Timeout.cancel xt_r.timeout; - raise exn and commit_once_reuse backoff xt tx = + check xt; commit_reuse (Backoff.once backoff) xt tx and commit_reset_reuse backoff xt tx = + check xt; commit_reuse (Backoff.reset backoff) xt tx and commit_reuse backoff (Xt xt_r as xt : _ t) tx = tree_as_ref xt := T Leaf; xt_r.validate_counter <- initial_validate_period; xt_r.post_commit <- Action.noop; - Timeout.check xt_r.timeout; commit backoff xt tx - and commit_once_alloc backoff mode (Xt xt_r : _ t) tx = + and commit_once_alloc backoff mode (Xt xt_r as xt : _ t) tx = + check xt; let backoff = Backoff.once backoff in - Timeout.check xt_r.timeout; let rot = U Leaf in let validate_counter = initial_validate_period in let post_commit = Action.noop in let xt = Xt { xt_r with rot; mode; validate_counter; post_commit } in commit backoff xt tx - let[@inline] commit ?timeoutf ?(backoff = Backoff.default) - ?(mode = `Obstruction_free) { tx } = - let timeout = Timeout.alloc_opt timeoutf + let[@inline] commit ?(backoff = Backoff.default) ?(mode = `Obstruction_free) + { tx } = + let fiber = Fiber.Maybe.nothing and rot = U Leaf and validate_counter = initial_validate_period and post_commit = Action.noop in - let xt = Xt { rot; timeout; mode; validate_counter; post_commit } in + let xt = Xt { rot; fiber; mode; validate_counter; post_commit } in commit backoff xt tx end diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 185a2c23..7f7212ca 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -22,7 +22,7 @@ obstruction-free} read-only compare (CMP) operations that can be performed on overlapping locations in parallel without interference. - - {b Blocking await}: The algorithm supports timeouts and awaiting for + - {b Blocking await}: The algorithm supports cancelation and awaiting for changes to any number of shared memory locations. - {b Composable}: Independently developed transactions can be composed with @@ -105,16 +105,9 @@ can skip over these. The documentation links back to these modules where appropriate. *) -(** Timeout support. *) -module Timeout : sig - exception Timeout - (** Exception that may be raised by operations such as {!Loc.get_as}, - {!Loc.update}, {!Loc.modify}, or {!Xt.commit} when given a [~timeoutf] in - seconds. *) -end - -(** Retry support. *) module Retry : sig + (** Retry support. *) + exception Later (** Exception that may be raised to signal that the operation, such as {!Loc.get_as}, {!Loc.update}, or {!Xt.commit}, should be retried, at some @@ -126,7 +119,7 @@ module Retry : sig shared memory locations have already changed. *) val later : unit -> 'a - (** [later ()] is equivalent to [raise Later]. *) + (** [later ()] is equivalent to [raise_notrace Later]. *) val unless : bool -> unit (** [unless condition] is equivalent to [if not condition then later ()]. *) @@ -137,11 +130,12 @@ module Retry : sig outside of the transaction, and the transaction should be retried. *) val invalid : unit -> 'a - (** [invalid ()] is equivalent to [raise Invalid]. *) + (** [invalid ()] is equivalent to [raise_notrace Invalid]. *) end -(** Operating modes of the [k-CAS-n-CMP] algorithm. *) module Mode : sig + (** Operating modes of the [k-CAS-n-CMP] algorithm. *) + type t = [ `Lock_free (** In [`Lock_free] mode the algorithm makes sure that at least one domain will @@ -164,18 +158,15 @@ end - [backoff] specifies the configuration for the [Backoff] mechanism. In special cases, having more detailed knowledge of the application, one - might adjust the configuration to improve performance. + might adjust the configuration to improve performance. *) - - [timeoutf] specifies a timeout in seconds and, if specified, the - {!Timeout.Timeout} exception may be raised by the operation to signal that - the timeout expired. *) +module Loc : sig + (** Shared memory locations. -(** Shared memory locations. + This module is essentially compatible with the [Stdlib.Atomic] module, + except that a number of functions take some optional arguments that one + usually need not worry about. *) - This module is essentially compatible with the [Stdlib.Atomic] module, - except that a number of functions take some optional arguments that one - usually need not worry about. *) -module Loc : sig (** Type of shared memory locations. *) type !'a t = private | Loc : { state : 'state; id : 'id } -> 'a t @@ -225,7 +216,7 @@ module Loc : sig val get : 'a t -> 'a (** [get r] reads the current value of the shared memory location [r]. *) - val get_as : ?timeoutf:float -> ('a -> 'b) -> 'a t -> 'b + val get_as : ('a -> 'b) -> 'a t -> 'b (** [get_as f loc] is equivalent to [f (get loc)]. The given function [f] may raise the {!Retry.Later} exception to signal that the conditional load should be retried only after the location has been modified outside of the @@ -237,7 +228,7 @@ module Loc : sig location [r] to the [after] value if the current value of [r] is the [before] value. *) - val update : ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [update r f] repeats [let b = get r in compare_and_set r b (f b)] until it succeeds and then returns the [b] value. The given function [f] may raise the {!Retry.Later} exception to signal that the update should only be @@ -245,8 +236,7 @@ module Loc : sig also safe for the given function [f] to raise any other exception to abort the update. *) - val modify : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [modify r f] is equivalent to [update r f |> ignore]. *) val exchange : ?backoff:Backoff.t -> 'a t -> 'a -> 'a @@ -277,13 +267,11 @@ module Loc : sig (** [fenceless_get r] is like [get r] except that [fenceless_get]s may be reordered. *) - val fenceless_update : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val fenceless_update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [fenceless_update r f] is like [update r f] except that in case [f x == x] the update may be reordered. *) - val fenceless_modify : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val fenceless_modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [fenceless_modify r f] is like [modify r f] except that in case [f x == x] the modify may be reordered. *) end @@ -318,77 +306,80 @@ end memory locations are atomically marked as having taken effect and subsequent reads of the locations will be able to see the newly written values. *) -(** Explicit transaction log passing on shared memory locations. - - This module provides a way to implement composable transactions over shared - memory locations. A transaction is a function written by the library user - and can be thought of as a specification of a sequence of {!Xt.get} and - {!Xt.set} accesses to shared memory locations. To actually perform the - accesses one then {!Xt.commit}s the transaction. - - Transactions should generally not perform arbitrary side-effects, because - when a transaction is committed it may be attempted multiple times meaning - that the side-effects are also performed multiple times. {!Xt.post_commit} - can be used to perform an action only once after the transaction has been - committed succesfully. - - {b WARNING}: To make it clear, the operations provided by the {!Loc} module - for accessing individual shared memory locations do not implicitly go - through the transaction mechanism and should generally not be used within - transactions. There are advanced algorithms where one might, within a - transaction, perform operations that do not get recorded into the - transaction log. Using such techniques correctly requires expert knowledge - and is not recommended for casual users. - - As an example, consider an implementation of doubly-linked circular - lists. Instead of using a mutable field, [ref], or [Atomic.t], one would use - a shared memory location, or {!Loc.t}, for the pointers in the node type: - - {[ - type 'a node = { - succ: 'a node Loc.t; - pred: 'a node Loc.t; - datum: 'a; - } - ]} - - To remove a node safely one wants to atomically update the [succ] and [pred] - pointers of the predecessor and successor nodes and to also update the - [succ] and [pred] pointers of a node to point to the node itself, so that - removal becomes an {{:https://en.wikipedia.org/wiki/Idempotence} idempotent} - operation. Using explicit transaction log passing one could implement the - [remove] operation as follows: - - {[ - let remove ~xt node = - (* Read pointers to the predecessor and successor nodes: *) - let pred = Xt.get ~xt node.pred in - let succ = Xt.get ~xt node.succ in - (* Update pointers in this node: *) - Xt.set ~xt node.succ node; - Xt.set ~xt node.pred node; - (* Update pointers to this node: *) - Xt.set ~xt pred.succ succ; - Xt.set ~xt succ.pred pred - ]} +module Xt : sig + (** Explicit transaction log passing on shared memory locations. + + This module provides a way to implement composable transactions over + shared memory locations. A transaction is a function written by the + library user and can be thought of as a specification of a sequence of + {!Xt.get} and {!Xt.set} accesses to shared memory locations. To actually + perform the accesses one then {!Xt.commit}s the transaction. + + Transactions should generally not perform arbitrary side-effects, because + when a transaction is committed it may be attempted multiple times meaning + that the side-effects are also performed multiple times. + {!Xt.post_commit} can be used to perform an action only once after the + transaction has been committed succesfully. + + {b WARNING}: To make it clear, the operations provided by the {!Loc} + module for accessing individual shared memory locations do not implicitly + go through the transaction mechanism and should generally not be used + within transactions. There are advanced algorithms where one might, + within a transaction, perform operations that do not get recorded into the + transaction log. Using such techniques correctly requires expert + knowledge and is not recommended for casual users. + + As an example, consider an implementation of doubly-linked circular + lists. Instead of using a mutable field, [ref], or [Atomic.t], one would + use a shared memory location, or {!Loc.t}, for the pointers in the node + type: + + {[ + type 'a node = { + succ: 'a node Loc.t; + pred: 'a node Loc.t; + datum: 'a; + } + ]} + + To remove a node safely one wants to atomically update the [succ] and + [pred] pointers of the predecessor and successor nodes and to also update + the [succ] and [pred] pointers of a node to point to the node itself, so + that removal becomes an {{:https://en.wikipedia.org/wiki/Idempotence} + idempotent} operation. Using explicit transaction log passing one could + implement the [remove] operation as follows: + + {[ + let remove ~xt node = + (* Read pointers to the predecessor and successor nodes: *) + let pred = Xt.get ~xt node.pred in + let succ = Xt.get ~xt node.succ in + (* Update pointers in this node: *) + Xt.set ~xt node.succ node; + Xt.set ~xt node.pred node; + (* Update pointers to this node: *) + Xt.set ~xt pred.succ succ; + Xt.set ~xt succ.pred pred + ]} + + The labeled argument, [~xt], refers to the transaction log. Transactional + operations like {!Xt.get} and {!Xt.set} are then recorded in that log. To + actually remove a node, we need to commit the transaction - The labeled argument, [~xt], refers to the transaction log. Transactional - operations like {!Xt.get} and {!Xt.set} are then recorded in that log. To - actually remove a node, we need to commit the transaction + {@ocaml skip[ + Xt.commit { tx = remove node } + ]} - {@ocaml skip[ - Xt.commit { tx = remove node } - ]} + which repeatedly calls the transaction function, [tx], to record a + transaction log and attempts to atomically perform it until it succeeds. - which repeatedly calls the transaction function, [tx], to record a - transaction log and attempts to atomically perform it until it succeeds. + Notice that [remove] is not recursive. It doesn't have to account for + failure or perform a backoff. It is also not necessary to know or keep + track of what the previous values of locations were. All of that is taken + care of for us by the transaction log and the {!Xt.commit} function. + Furthermore, [remove] can easily be called as a part of a more complex + transaction. *) - Notice that [remove] is not recursive. It doesn't have to account for - failure or perform a backoff. It is also not necessary to know or keep track - of what the previous values of locations were. All of that is taken care of - for us by the transaction log and the {!Xt.commit} function. Furthermore, - [remove] can easily be called as a part of a more complex transaction. *) -module Xt : sig type 'x t (** Type of an explicit transaction log on shared memory locations. @@ -548,8 +539,7 @@ module Xt : sig val call : xt:'x t -> 'a tx -> 'a (** [call ~xt tx] is equivalent to [tx.Xt.tx ~xt]. *) - val commit : - ?timeoutf:float -> ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a + val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a (** [commit tx] repeatedly calls [tx] to record a log of shared memory accesses and attempts to perform them atomically until it succeeds and then returns whatever [tx] returned. [tx] may raise {!Retry.Later} or diff --git a/src/kcas_data/dllist.ml b/src/kcas_data/dllist.ml index 4b93cd48..7be748a1 100644 --- a/src/kcas_data/dllist.ml +++ b/src/kcas_data/dllist.ml @@ -225,13 +225,8 @@ let move_l node list = Kcas.Xt.commit { tx = Xt.move_l node list } let move_r node list = Kcas.Xt.commit { tx = Xt.move_r node list } let take_opt_l list = Kcas.Xt.commit { tx = Xt.take_opt_l list } let take_opt_r list = Kcas.Xt.commit { tx = Xt.take_opt_r list } - -let take_blocking_l ?timeoutf list = - Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking_l list } - -let take_blocking_r ?timeoutf list = - Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking_r list } - +let take_blocking_l list = Kcas.Xt.commit { tx = Xt.take_blocking_l list } +let take_blocking_r list = Kcas.Xt.commit { tx = Xt.take_blocking_r list } let swap t1 t2 = Kcas.Xt.commit { tx = Xt.swap t1 t2 } let transfer_l t1 t2 = Kcas.Xt.commit { tx = Xt.transfer_l t1 t2 } let transfer_r t1 t2 = Kcas.Xt.commit { tx = Xt.transfer_r t1 t2 } diff --git a/src/kcas_data/dllist.mli b/src/kcas_data/dllist.mli index de21e093..5f0163b2 100644 --- a/src/kcas_data/dllist.mli +++ b/src/kcas_data/dllist.mli @@ -59,7 +59,6 @@ module Xt : with type 'a t := 'a t with type 'a node := 'a node with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on doubly-linked lists. *) (** {1 Non-compositional interface} *) @@ -69,7 +68,6 @@ include with type 'a t := 'a t with type 'a node := 'a node with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn val take_all : 'a t -> 'a t (** [take_all l] removes all nodes of the doubly-linked list [l] and returns a diff --git a/src/kcas_data/dllist_intf.ml b/src/kcas_data/dllist_intf.ml index 738fda2c..406a9f94 100644 --- a/src/kcas_data/dllist_intf.ml +++ b/src/kcas_data/dllist_intf.ml @@ -2,7 +2,6 @@ module type Ops = sig type 'a t type 'a node type ('x, 'fn) fn - type ('x, 'fn) blocking_fn (** {2 Operations on nodes} *) @@ -42,12 +41,12 @@ module type Ops = sig (** [take_opt_r l] removes and returns the value of rightmost node of the doubly-linked list [l], or return [None] if the list is empty. *) - val take_blocking_l : ('x, 'a t -> 'a) blocking_fn + val take_blocking_l : ('x, 'a t -> 'a) fn (** [take_blocking_l l] removes and returns the value of leftmost node of the doubly-linked list [l], or blocks waiting for the list to become non-empty. *) - val take_blocking_r : ('x, 'a t -> 'a) blocking_fn + val take_blocking_r : ('x, 'a t -> 'a) fn (** [take_blocking_r l] removes and returns the value of rightmost node of the doubly-linked list [l], or blocks waiting for the list to become non-empty. *) diff --git a/src/kcas_data/mvar.ml b/src/kcas_data/mvar.ml index ba31e192..690e3dbc 100644 --- a/src/kcas_data/mvar.ml +++ b/src/kcas_data/mvar.ml @@ -25,18 +25,17 @@ end let is_empty mv = Magic_option.is_none (Loc.get mv) -let put ?timeoutf mv value = +let put mv value = (* Fenceless is safe as we always update. *) - Loc.fenceless_modify ?timeoutf mv (Magic_option.put_or_retry value) + Loc.fenceless_modify mv (Magic_option.put_or_retry value) let try_put mv value = Loc.compare_and_set mv Magic_option.none (Magic_option.some value) -let take ?timeoutf mv = +let take mv = (* Fenceless is safe as we always update. *) - Magic_option.get_unsafe - (Loc.fenceless_update ?timeoutf mv Magic_option.take_or_retry) + Magic_option.get_unsafe (Loc.fenceless_update mv Magic_option.take_or_retry) let take_opt mv = Magic_option.to_option (Loc.exchange mv Magic_option.none) -let peek ?timeoutf mv = Loc.get_as ?timeoutf Magic_option.get_or_retry mv +let peek mv = Loc.get_as Magic_option.get_or_retry mv let peek_opt mv = Magic_option.to_option (Loc.get mv) diff --git a/src/kcas_data/mvar.mli b/src/kcas_data/mvar.mli index f0a7d5ca..12a2b8df 100644 --- a/src/kcas_data/mvar.mli +++ b/src/kcas_data/mvar.mli @@ -26,13 +26,8 @@ module Xt : Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction passing on synchronizing variables. *) (** {1 Non-compositional interface} *) -include - Mvar_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn diff --git a/src/kcas_data/mvar_intf.ml b/src/kcas_data/mvar_intf.ml index e9f6c7e3..3df088f2 100644 --- a/src/kcas_data/mvar_intf.ml +++ b/src/kcas_data/mvar_intf.ml @@ -1,13 +1,12 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty mv] determines whether the synchronizing variable [mv] contains a value or not. *) - val put : ('x, 'a t -> 'a -> unit) blocking_fn + val put : ('x, 'a t -> 'a -> unit) fn (** [put mv x] fills the synchronizing variable [mv] with the value [v] or blocks until the variable becomes empty. *) @@ -16,7 +15,7 @@ module type Ops = sig value [v] and returns [true] on success or [false] in case the variable is full. *) - val take : ('x, 'a t -> 'a) blocking_fn + val take : ('x, 'a t -> 'a) fn (** [take mv] removes and returns the current value of the synchronizing variable [mv] or blocks waiting until the variable is filled. *) @@ -24,7 +23,7 @@ module type Ops = sig (** [take_opt mv] removes and returns the current value of the synchronizing variable [mv] or returns [None] in case the variable is empty. *) - val peek : ('x, 'a t -> 'a) blocking_fn + val peek : ('x, 'a t -> 'a) fn (** [peek mv] returns the current value of the synchronizing variable [mv] or blocks waiting until the variable is filled. *) diff --git a/src/kcas_data/promise.ml b/src/kcas_data/promise.ml index 0786ff9c..cf25e189 100644 --- a/src/kcas_data/promise.ml +++ b/src/kcas_data/promise.ml @@ -38,8 +38,7 @@ module Xt = struct let resolve_error ~xt u e = resolve ~xt u (Error e) end -let await ?timeoutf t = - Loc.get_as ?timeoutf Magic_option.get_or_retry (of_promise t) +let await t = Loc.get_as Magic_option.get_or_retry (of_promise t) let resolve u v = if @@ -51,8 +50,8 @@ let resolve u v = let peek t = Magic_option.to_option (Loc.get (of_promise t)) let is_resolved t = Magic_option.is_some (Loc.get (of_promise t)) -let await_exn ?timeoutf t = - match await ?timeoutf t with Ok value -> value | Error exn -> raise exn +let await_exn t = + match await t with Ok value -> value | Error exn -> raise exn let resolve_ok u v = resolve u (Ok v) let resolve_error u e = resolve u (Error e) diff --git a/src/kcas_data/promise.mli b/src/kcas_data/promise.mli index 11c49988..6515f908 100644 --- a/src/kcas_data/promise.mli +++ b/src/kcas_data/promise.mli @@ -42,7 +42,6 @@ module Xt : with type 'a or_exn := 'a or_exn with type 'a u := 'a u with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on promises. *) (** {1 Non-compositional interface} *) @@ -53,4 +52,3 @@ include with type 'a or_exn := 'a or_exn with type 'a u := 'a u with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn diff --git a/src/kcas_data/promise_intf.ml b/src/kcas_data/promise_intf.ml index 93d1034a..ffe512f9 100644 --- a/src/kcas_data/promise_intf.ml +++ b/src/kcas_data/promise_intf.ml @@ -3,14 +3,13 @@ module type Ops = sig type !-'a u type 'a or_exn type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val resolve : ('x, 'a u -> 'a -> unit) fn (** [resolve u v] resolves the promise corresponding to the resolver [u] to the value [v]. Any awaiters of the corresponding promise are then unblocked. *) - val await : ('x, 'a t -> 'a) blocking_fn + val await : ('x, 'a t -> 'a) fn (** [await t] either immediately returns the resolved value of the promise [t] or blocks until the promise [t] is resolved. *) @@ -24,7 +23,7 @@ module type Ops = sig (** {2 Result promises} *) - val await_exn : ('x, 'a or_exn -> 'a) blocking_fn + val await_exn : ('x, 'a or_exn -> 'a) fn (** [await_exn t] is equivalent to [match await t with v -> v | exception e -> raise e]. *) val resolve_ok : ('x, ('a, 'b) result u -> 'a -> unit) fn diff --git a/src/kcas_data/queue.ml b/src/kcas_data/queue.ml index 543bb0c7..d8742eb6 100644 --- a/src/kcas_data/queue.ml +++ b/src/kcas_data/queue.ml @@ -139,10 +139,10 @@ let take_opt q = | None -> Kcas.Xt.commit { tx = Xt.take_opt q } | some -> some -let take_blocking ?timeoutf q = +let take_blocking q = (* Fenceless is safe as we revert to a transaction in case we didn't update. *) match Loc.fenceless_update q.front Elems.tl_safe |> Elems.hd_opt with - | None -> Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking q } + | None -> Kcas.Xt.commit { tx = Xt.take_blocking q } | Some elem -> elem let take_all q = Kcas.Xt.commit { tx = Xt.take_all q } @@ -152,9 +152,7 @@ let peek_opt q = | None -> Kcas.Xt.commit { tx = Xt.peek_opt q } | some -> some -let peek_blocking ?timeoutf q = - Kcas.Xt.commit ?timeoutf { tx = Xt.peek_blocking q } - +let peek_blocking q = Kcas.Xt.commit { tx = Xt.peek_blocking q } let clear q = Kcas.Xt.commit { tx = Xt.clear q } let swap q1 q2 = Kcas.Xt.commit { tx = Xt.swap q1 q2 } let to_seq q = Kcas.Xt.commit { tx = Xt.to_seq q } diff --git a/src/kcas_data/queue.mli b/src/kcas_data/queue.mli index d955680c..f4dd49a9 100644 --- a/src/kcas_data/queue.mli +++ b/src/kcas_data/queue.mli @@ -31,16 +31,11 @@ module Xt : Queue_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on queues. *) (** {1 Non-compositional interface} *) -include - Queue_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Queue_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn val peek : 'a t -> 'a (** [peek q] returns the first element in queue [s], or raises {!Empty} if the diff --git a/src/kcas_data/queue_intf.ml b/src/kcas_data/queue_intf.ml index c787bdde..5293058e 100644 --- a/src/kcas_data/queue_intf.ml +++ b/src/kcas_data/queue_intf.ml @@ -1,7 +1,6 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty s] determines whether the queue [q] is empty. *) @@ -32,11 +31,11 @@ module type Ops = sig (** [peek_opt q] returns the first element in queue [q], without removing it from the queue, or returns [None] if the queue is empty. *) - val peek_blocking : ('x, 'a t -> 'a) blocking_fn + val peek_blocking : ('x, 'a t -> 'a) fn (** [peek_blocking q] returns the first element in queue [q], without removing it from the queue, or blocks waiting for the queue to become non-empty. *) - val take_blocking : ('x, 'a t -> 'a) blocking_fn + val take_blocking : ('x, 'a t -> 'a) fn (** [take_blocking q] removes and returns the first element in queue [q], or blocks waiting for the queue to become non-empty. *) diff --git a/src/kcas_data/stack.ml b/src/kcas_data/stack.ml index c4d0170a..ad684e36 100644 --- a/src/kcas_data/stack.ml +++ b/src/kcas_data/stack.ml @@ -30,12 +30,12 @@ let push x s = let pop_opt s = Loc.update s Elems.tl_safe |> Elems.hd_opt let pop_all s = Loc.exchange s Elems.empty |> Elems.to_seq -let pop_blocking ?timeoutf s = +let pop_blocking s = (* Fenceless is safe as we always update. *) - Loc.fenceless_update ?timeoutf s Elems.tl_or_retry |> Elems.hd_unsafe + Loc.fenceless_update s Elems.tl_or_retry |> Elems.hd_unsafe let top_opt s = Loc.get s |> Elems.hd_opt -let top_blocking ?timeoutf s = Loc.get_as ?timeoutf Elems.hd_or_retry s +let top_blocking s = Loc.get_as Elems.hd_or_retry s let clear s = Loc.set s Elems.empty let swap s1 s2 = Kcas.Xt.commit { tx = Kcas.Xt.swap s1 s2 } let to_seq s = Elems.to_seq @@ Loc.get s diff --git a/src/kcas_data/stack.mli b/src/kcas_data/stack.mli index fa7a67a0..d84fe9a6 100644 --- a/src/kcas_data/stack.mli +++ b/src/kcas_data/stack.mli @@ -33,16 +33,11 @@ module Xt : Stack_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on stacks. *) (** {1 Non-compositional interface} *) -include - Stack_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Stack_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn val pop : 'a t -> 'a (** [pop s] removes and returns the topmost element in stack [s], or raises diff --git a/src/kcas_data/stack_intf.ml b/src/kcas_data/stack_intf.ml index 640a840f..53e980f6 100644 --- a/src/kcas_data/stack_intf.ml +++ b/src/kcas_data/stack_intf.ml @@ -1,7 +1,6 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty s] determines whether the stack [s] is empty. *) @@ -34,7 +33,7 @@ module type Ops = sig sequence for iterating through all the elements that were in the stack top to bottom. *) - val pop_blocking : ('x, 'a t -> 'a) blocking_fn + val pop_blocking : ('x, 'a t -> 'a) fn (** [pop_blocking s] removes and returns the topmost element of the stack [s], or blocks waiting for the queue to become non-empty. *) @@ -42,7 +41,7 @@ module type Ops = sig (** [top_opt s] returns the topmost element in stack [s], or [None] if the stack is empty. *) - val top_blocking : ('x, 'a t -> 'a) blocking_fn + val top_blocking : ('x, 'a t -> 'a) fn (** [top_blocking s] returns the topmost element in stack [s], or blocks waiting for the queue to become non-empty. *) end diff --git a/test/kcas/dune b/test/kcas/dune index e78098c1..1d4a86af 100644 --- a/test/kcas/dune +++ b/test/kcas/dune @@ -1,9 +1,11 @@ (tests (names test ms_queue_test threads loc_modes) (libraries + scheduler + picos.finally + picos.structured alcotest kcas - domain-local-timeout threads.posix unix domain_shims) diff --git a/test/kcas/loc_modes.ml b/test/kcas/loc_modes.ml index aa4af6f7..2547e23e 100644 --- a/test/kcas/loc_modes.ml +++ b/test/kcas/loc_modes.ml @@ -1,3 +1,4 @@ +open Picos_structured open Kcas let loop_count = try int_of_string Sys.argv.(1) with _ -> Util.iter_factor @@ -76,6 +77,6 @@ let accumulator_thread () = exit := true let () = - accumulator_thread :: List.init n_counters counter_thread - |> List.map Domain.spawn |> List.iter Domain.join; + Scheduler.run ~n_domains:(n_counters + 1) @@ fun () -> + Run.all (accumulator_thread :: List.init n_counters counter_thread); Printf.printf "Loc modes OK!\n%!" diff --git a/test/kcas/test.ml b/test/kcas/test.ml index 1b433c5a..cdf4b684 100644 --- a/test/kcas/test.ml +++ b/test/kcas/test.ml @@ -24,6 +24,9 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. ---------------------------------------------------------------------------*) +open Picos_finally +open Picos_structured + let is_single = Domain.recommended_domain_count () = 1 open Kcas @@ -34,16 +37,10 @@ let assert_kcas loc expected_v = let present_v = Loc.get loc in assert (present_v == expected_v) -let run_domains = function - | [] -> () - | main :: others -> - let others = List.map Domain.spawn others in - main (); - List.iter Domain.join others - (* *) let test_non_linearizable_xt () = + Scheduler.run ~n_domains:2 @@ fun () -> [ `Obstruction_free; `Lock_free ] |> List.iter @@ fun mode -> let barrier = Barrier.make 2 @@ -90,7 +87,7 @@ let test_non_linearizable_xt () = test_finished := true in - run_domains [ thread2; thread1 ] + Run.all [ thread2; thread1 ] (* *) @@ -103,6 +100,7 @@ let test_set () = (* *) let test_no_skew_xt () = + Scheduler.run ~n_domains:3 @@ fun () -> [ `Obstruction_free; `Lock_free ] |> List.iter @@ fun mode -> let barrier = Barrier.make 3 in @@ -163,14 +161,15 @@ let test_no_skew_xt () = done in - run_domains [ thread1; thread2; thread3 ] + Run.all [ thread1; thread2; thread3 ] (* *) let test_get_seq_xt () = + Scheduler.run ~n_domains:5 @@ fun () -> [ `Obstruction_free; `Lock_free ] |> List.iter @@ fun mode -> - let barrier = Barrier.make 4 in + let barrier = Barrier.make 5 in let test_finished = Atomic.make false in let a1 = Loc.make ~mode 0 in @@ -220,7 +219,7 @@ let test_get_seq_xt () = done in - run_domains [ mutator; getter; getaser; committer; updater ] + Run.all [ mutator; getter; getaser; committer; updater ] (* *) @@ -274,6 +273,7 @@ let test_presort_and_is_in_log_xt () = let n_incs = 10 * Util.iter_factor and n_domains = 3 and n_locs = 12 in let n_locs_half = n_locs asr 1 in + Scheduler.run ~n_domains @@ fun () -> let barrier = Barrier.make n_domains in let locs = Array.init n_locs (fun _ -> Loc.make 0) in @@ -295,7 +295,7 @@ let test_presort_and_is_in_log_xt () = done in - run_domains (List.init n_domains (Fun.const thread)); + Run.all (List.init n_domains (Fun.const thread)); let sum = locs |> Array.map Loc.get |> Array.fold_left ( + ) 0 in assert (sum = n_incs * n_locs_half * n_domains) @@ -344,6 +344,7 @@ let test_post_commit () = (* *) let test_blocking () = + Scheduler.run ~n_domains:2 @@ fun () -> let state = Loc.make `Spawned in let await state' = (* Intentionally test that [Xt.modify] allows retry. *) @@ -361,101 +362,109 @@ let test_blocking () = let num_attempts = ref 0 in - let other_domain = - Domain.spawn @@ fun () -> - Loc.set state `Get_a_non_zero; - Loc.get_as - (fun a -> - incr num_attempts; - Retry.unless (a != 0)) - a; - - Loc.set state `Update_a_zero; - Loc.modify a (fun a -> - incr num_attempts; - Retry.unless (a = 0); - a + 1); - - Loc.set state `Set_1_b_to_0; - let bs = Array.copy bs in - for _ = 1 to n do - (* We access in random order to exercise tx log waiters handling. *) - in_place_shuffle bs; - let tx ~xt = - incr num_attempts; - match - bs - |> Array.find_map @@ fun b -> - if Xt.get ~xt b = 1 then Some b - else if Loc.has_awaiters b (* There must be no leaked waiters... *) - then begin - (* ...except if main domain just set the loc *) - assert (Loc.get b = 1); - Retry.later () - end - else None - with - | None -> Retry.later () - | Some b -> Xt.set ~xt b 0 - in - Xt.commit { tx } - done - in + begin + Flock.join_after @@ fun () -> + begin + Flock.fork @@ fun () -> + Loc.set state `Get_a_non_zero; + Loc.get_as + (fun a -> + incr num_attempts; + Retry.unless (a != 0)) + a; + + Loc.set state `Update_a_zero; + Loc.modify a (fun a -> + incr num_attempts; + Retry.unless (a = 0); + a + 1); + + Loc.set state `Set_1_b_to_0; + let bs = Array.copy bs in + for _ = 1 to n do + (* We access in random order to exercise tx log waiters handling. *) + in_place_shuffle bs; + let tx ~xt = + incr num_attempts; + match + bs + |> Array.find_map @@ fun b -> + if Xt.get ~xt b = 1 then Some b + else if + Loc.has_awaiters b (* There must be no leaked waiters... *) + then begin + (* ...except if main domain just set the loc *) + assert (Loc.get b = 1); + Retry.later () + end + else None + with + | None -> Retry.later () + | Some b -> Xt.set ~xt b 0 + in + Xt.commit { tx } + done + end; - await `Get_a_non_zero; - Loc.set a 1; - assert (!num_attempts <= 2 + 1 (* Need to account for race to next. *)); + await `Get_a_non_zero; + Loc.set a 1; + assert (!num_attempts <= 3 + 1 (* Need to account for race to next. *)); - await `Update_a_zero; - Loc.set a 0; - assert (!num_attempts <= 4 + 1 (* Need to account for race to next. *)); + await `Update_a_zero; + Loc.set a 0; + assert (!num_attempts <= 5 + 1 (* Need to account for race to next. *)); - await `Set_1_b_to_0; - for _ = 1 to n do - let i = Random.int (Array.length bs) in - Loc.set bs.(i) 1; - Loc.get_as (fun b -> Retry.unless (b = 0)) bs.(i) - done; - - Domain.join other_domain; + await `Set_1_b_to_0; + for _ = 1 to n do + let i = Random.int (Array.length bs) in + Loc.set bs.(i) 1; + Loc.get_as (fun b -> Retry.unless (b = 0)) bs.(i) + done + end; - assert (!num_attempts <= 4 + (n * 2)); + assert (!num_attempts <= 5 + (n * 2)); for i = 0 to Array.length bs - 1 do assert (not (Loc.has_awaiters bs.(i))) done let test_no_unnecessary_wakeups () = + Scheduler.run ~n_domains:2 @@ fun () -> let continue = Loc.make false and tries = Atomic.make 0 in - let other_domain = - Domain.spawn @@ fun () -> - continue - |> Loc.get_as @@ fun s -> - Atomic.incr tries; - Retry.unless s - in + begin + Flock.join_after @@ fun () -> + begin + Flock.fork @@ fun () -> + continue + |> Loc.get_as @@ fun s -> + Atomic.incr tries; + Retry.unless s + end; - while not (Loc.has_awaiters continue) do - Domain.cpu_relax () - done; + while not (Loc.has_awaiters continue) do + Domain.cpu_relax () + done; - assert (Loc.compare_and_set continue false false); - assert (not (Loc.update continue Fun.id)); - Loc.set continue false; + assert (Loc.compare_and_set continue false false); + assert (not (Loc.update continue Fun.id)); + Loc.set continue false; - Unix.sleepf 0.01; + Unix.sleepf 0.01; + + assert (Loc.has_awaiters continue && Atomic.get tries = 1); + Loc.set continue true + end; - assert (Loc.has_awaiters continue && Atomic.get tries = 1); - Loc.set continue true; - Domain.join other_domain; assert ((not (Loc.has_awaiters continue)) && Atomic.get tries = 2) (* *) let test_periodic_validation () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let a = Loc.make 0 and b = Loc.make 0 and looping = ref false in let non_zero_difference_domain = - Domain.spawn @@ fun () -> + Flock.fork_as_promise @@ fun () -> let rec tx ~xt = let d = Xt.get ~xt a - Xt.get ~xt b in if d <> 0 then d @@ -475,7 +484,7 @@ let test_periodic_validation () = Loc.set a 1; - assert (1 = Domain.join non_zero_difference_domain) + assert (1 = Promise.await non_zero_difference_domain) (* *) @@ -572,42 +581,43 @@ let test_call () = (** This is a non-deterministic test that might fail occasionally. *) let test_timeout () = - Domain_local_timeout.set_system (module Thread) (module Unix); - - let check (op : ?timeoutf:float -> bool Loc.t -> unit) () = + Scheduler.run ~n_domains:5 @@ fun () -> + let check (op : bool Loc.t -> unit) () = + Flock.join_after @@ fun () -> let rec loop n = let x = Loc.make false in - let (_ : unit -> unit) = - Domain_local_timeout.set_timeoutf 0.6 @@ fun () -> Loc.set x true + let@ _ = + finally Promise.terminate @@ fun () -> + Flock.fork_as_promise @@ fun () -> + Control.sleep ~seconds:0.6; + Loc.set x true in - match op ~timeoutf:0.02 x with + match Control.terminate_after ~seconds:0.02 (fun () -> op x) with | () -> if 0 < n then loop (n - 1) else assert false - | exception Timeout.Timeout -> op ~timeoutf:2.0 x + | exception Control.Terminate -> + Control.terminate_after ~seconds:2.0 (fun () -> op x) in loop 10 in - run_domains + Run.all [ - check (fun ?timeoutf x -> - Loc.get_as ?timeoutf (fun x -> if not x then Retry.later ()) x); - check (fun ?timeoutf x -> - Loc.update ?timeoutf x (fun x -> x || Retry.later ()) |> ignore); - check (fun ?timeoutf x -> - Loc.modify ?timeoutf x (fun x -> x || Retry.later ())); - check (fun ?timeoutf x -> + check (fun x -> Loc.get_as (fun x -> if not x then Retry.later ()) x); + check (fun x -> Loc.update x (fun x -> x || Retry.later ()) |> ignore); + check (fun x -> Loc.modify x (fun x -> x || Retry.later ())); + check (fun x -> let y = Loc.make false in let tx ~xt = if not (Xt.get ~xt x) then Retry.later (); Xt.swap ~xt x y in - Xt.commit ?timeoutf { tx }); - check (fun ?timeoutf x -> + Xt.commit { tx }); + check (fun x -> let y = Loc.make false in let tx ~xt = if not (Xt.get ~xt x) then Retry.invalid (); Xt.swap ~xt x y in - Xt.commit ?timeoutf { tx }); + Xt.commit { tx }); ] (* *) diff --git a/test/kcas/threads.ml b/test/kcas/threads.ml index 5cbcc60c..5873a7ad 100644 --- a/test/kcas/threads.ml +++ b/test/kcas/threads.ml @@ -1,12 +1,14 @@ open Kcas let await_between_threads () = + Scheduler.run @@ fun () -> let x = Loc.make 0 in let y = Loc.make 0 in let a_thread = () |> Thread.create @@ fun () -> + Scheduler.run @@ fun () -> Loc.get_as (fun x -> Retry.unless (x <> 0)) x; Loc.set y 22 in diff --git a/test/kcas_data/accumulator_test_stm.ml b/test/kcas_data/accumulator_test_stm.ml index 0565a9b0..1ac97a71 100644 --- a/test/kcas_data/accumulator_test_stm.ml +++ b/test/kcas_data/accumulator_test_stm.ml @@ -48,6 +48,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Accumulator" (module Spec) - |> exit +let () = Stm_run.run ~name:"Accumulator" (module Spec) |> exit diff --git a/test/kcas_data/dllist_test_stm.ml b/test/kcas_data/dllist_test_stm.ml index c0dd7a84..669ce820 100644 --- a/test/kcas_data/dllist_test_stm.ml +++ b/test/kcas_data/dllist_test_stm.ml @@ -83,5 +83,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Dllist" (module Spec) |> exit +let () = Stm_run.run ~name:"Dllist" (module Spec) |> exit diff --git a/test/kcas_data/dune b/test/kcas_data/dune index 1047e5a0..61167ee2 100644 --- a/test/kcas_data/dune +++ b/test/kcas_data/dune @@ -14,6 +14,8 @@ stack_test_stm xt_test) (libraries + scheduler + picos.structured alcotest kcas kcas_data diff --git a/test/kcas_data/hashtbl_test_stm.ml b/test/kcas_data/hashtbl_test_stm.ml index f2cf2910..ff5a15a7 100644 --- a/test/kcas_data/hashtbl_test_stm.ml +++ b/test/kcas_data/hashtbl_test_stm.ml @@ -62,5 +62,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Hashtbl" (module Spec) |> exit +let () = Stm_run.run ~name:"Hashtbl" (module Spec) |> exit diff --git a/test/kcas_data/linearizable_chaining_example.ml b/test/kcas_data/linearizable_chaining_example.ml index 58614b86..b3063af6 100644 --- a/test/kcas_data/linearizable_chaining_example.ml +++ b/test/kcas_data/linearizable_chaining_example.ml @@ -216,6 +216,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Hashtbl_with_order" (module Spec) - |> exit +let () = Stm_run.run ~name:"Hashtbl_with_order" (module Spec) |> exit diff --git a/test/kcas_data/lru_cache.ml b/test/kcas_data/lru_cache.ml index 110ca832..c5271c07 100644 --- a/test/kcas_data/lru_cache.ml +++ b/test/kcas_data/lru_cache.ml @@ -62,8 +62,5 @@ end let capacity_of c = Kcas.Xt.commit { tx = Xt.capacity_of c } let set_capacity c n = Kcas.Xt.commit { tx = Xt.set_capacity c n } let get_opt c k = Kcas.Xt.commit { tx = Xt.get_opt c k } - -let set_blocking ?timeoutf c k v = - Kcas.Xt.commit ?timeoutf { tx = Xt.set_blocking c k v } - +let set_blocking c k v = Kcas.Xt.commit { tx = Xt.set_blocking c k v } let remove c k = Kcas.Xt.commit { tx = Xt.remove c k } diff --git a/test/kcas_data/lru_cache.mli b/test/kcas_data/lru_cache.mli index b32b1aa3..2162d524 100644 --- a/test/kcas_data/lru_cache.mli +++ b/test/kcas_data/lru_cache.mli @@ -9,10 +9,8 @@ module Xt : Lru_cache_intf.Ops with type ('k, 'v) t := ('k, 'v) t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn include Lru_cache_intf.Ops with type ('k, 'v) t := ('k, 'v) t with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn diff --git a/test/kcas_data/lru_cache_example.ml b/test/kcas_data/lru_cache_example.ml index 0df2ee29..709bf74f 100644 --- a/test/kcas_data/lru_cache_example.ml +++ b/test/kcas_data/lru_cache_example.ml @@ -1,3 +1,4 @@ +open Picos_structured open Kcas module Lru_cache = struct @@ -19,21 +20,23 @@ module Lru_cache = struct | exception Retry.Later -> false end - let get ?timeoutf c k = Kcas.Xt.commit ?timeoutf { tx = Xt.get c k } - let get_if ?timeoutf c k p = Kcas.Xt.commit ?timeoutf { tx = Xt.get_if c k p } + let get c k = Kcas.Xt.commit { tx = Xt.get c k } + let get_if c k p = Kcas.Xt.commit { tx = Xt.get_if c k p } let try_set c k d = Kcas.Xt.commit { tx = Xt.try_set c k d } end let () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let c = Lru_cache.create 10 in - let domain = - Domain.spawn @@ fun () -> + let answer = + Flock.fork_as_promise @@ fun () -> let tx ~xt = Lru_cache.Xt.get ~xt c "a" + Lru_cache.Xt.get ~xt c "b" in Xt.commit { tx } in Lru_cache.set_blocking c "b" 30; Lru_cache.set_blocking c "a" 12; - assert (Domain.join domain = 42); + assert (Promise.await answer = 42); () let () = diff --git a/test/kcas_data/lru_cache_intf.ml b/test/kcas_data/lru_cache_intf.ml index 79a8063e..11ef488d 100644 --- a/test/kcas_data/lru_cache_intf.ml +++ b/test/kcas_data/lru_cache_intf.ml @@ -1,11 +1,10 @@ module type Ops = sig type ('k, 'v) t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val capacity_of : ('x, ('k, 'v) t -> int) fn val set_capacity : ('x, ('k, 'v) t -> int -> unit) fn val get_opt : ('x, ('k, 'v) t -> 'k -> 'v option) fn - val set_blocking : ('x, ('k, 'v) t -> 'k -> 'v -> unit) blocking_fn + val set_blocking : ('x, ('k, 'v) t -> 'k -> 'v -> unit) fn val remove : ('x, ('k, 'v) t -> 'k -> unit) fn end diff --git a/test/kcas_data/mvar_test.ml b/test/kcas_data/mvar_test.ml index f05872a4..07aecbf9 100644 --- a/test/kcas_data/mvar_test.ml +++ b/test/kcas_data/mvar_test.ml @@ -1,7 +1,10 @@ +open Picos_structured open Kcas open Kcas_data let basics () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let mv = Mvar.create (Some 101) in assert (not (Mvar.is_empty mv)); assert (Mvar.take mv = 101); @@ -9,14 +12,13 @@ let basics () = assert (Mvar.take_opt mv = None); Mvar.put mv 42; let running = Mvar.create None in - let d = - Domain.spawn @@ fun () -> + begin + Flock.fork @@ fun () -> Mvar.put running (); Xt.commit { tx = Mvar.Xt.put mv 76 } - in + end; assert (Mvar.take running = ()); assert (Xt.commit { tx = Mvar.Xt.take mv } = 42); - Domain.join d; assert (Mvar.take mv = 76) let () = diff --git a/test/kcas_data/queue_test_stm.ml b/test/kcas_data/queue_test_stm.ml index d3b0174e..63a166c6 100644 --- a/test/kcas_data/queue_test_stm.ml +++ b/test/kcas_data/queue_test_stm.ml @@ -64,5 +64,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Queue" (module Spec) |> exit +let () = Stm_run.run ~name:"Queue" (module Spec) |> exit diff --git a/test/kcas_data/stack_test_stm.ml b/test/kcas_data/stack_test_stm.ml index 6dd8641a..0e3d62eb 100644 --- a/test/kcas_data/stack_test_stm.ml +++ b/test/kcas_data/stack_test_stm.ml @@ -54,5 +54,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Stack" (module Spec) |> exit +let () = Stm_run.run ~name:"Stack" (module Spec) |> exit diff --git a/test/lib/scheduler/dune b/test/lib/scheduler/dune new file mode 100644 index 00000000..29be77cf --- /dev/null +++ b/test/lib/scheduler/dune @@ -0,0 +1,9 @@ +(library + (name scheduler) + (libraries + picos.select + (select + scheduler.ml + from + (picos.randos -> scheduler.ocaml5.ml) + (picos.threaded -> scheduler.ocaml4.ml)))) diff --git a/test/lib/scheduler/scheduler.ocaml4.ml b/test/lib/scheduler/scheduler.ocaml4.ml new file mode 100644 index 00000000..9ddf31b6 --- /dev/null +++ b/test/lib/scheduler/scheduler.ocaml4.ml @@ -0,0 +1 @@ +let run ?n_domains:_ main = Picos_threaded.run main diff --git a/test/lib/scheduler/scheduler.ocaml5.ml b/test/lib/scheduler/scheduler.ocaml5.ml new file mode 100644 index 00000000..ea7bb262 --- /dev/null +++ b/test/lib/scheduler/scheduler.ocaml5.ml @@ -0,0 +1,2 @@ +let () = Picos_select.configure () +let run ?(n_domains = 1) main = Picos_randos.run_on ~n_domains main diff --git a/test/kcas_data/stm_run/dune b/test/lib/stm_run/dune similarity index 100% rename from test/kcas_data/stm_run/dune rename to test/lib/stm_run/dune diff --git a/test/kcas_data/stm_run/empty.ocaml4.ml b/test/lib/stm_run/empty.ocaml4.ml similarity index 100% rename from test/kcas_data/stm_run/empty.ocaml4.ml rename to test/lib/stm_run/empty.ocaml4.ml diff --git a/test/kcas_data/stm_run/empty.ocaml5.ml b/test/lib/stm_run/empty.ocaml5.ml similarity index 100% rename from test/kcas_data/stm_run/empty.ocaml5.ml rename to test/lib/stm_run/empty.ocaml5.ml diff --git a/test/kcas_data/stm_run/intf.ml b/test/lib/stm_run/intf.ml similarity index 96% rename from test/kcas_data/stm_run/intf.ml rename to test/lib/stm_run/intf.ml index fd300e7c..dfbde501 100644 --- a/test/kcas_data/stm_run/intf.ml +++ b/test/lib/stm_run/intf.ml @@ -43,3 +43,6 @@ module type STM_domain = sig val agree_test_par_asym : count:int -> name:string -> QCheck.Test.t val neg_agree_test_par_asym : count:int -> name:string -> QCheck.Test.t end + +let default_count = 5_000 +let default_budgetf = 120.0 diff --git a/test/kcas_data/stm_run/stm_run.ocaml4.ml b/test/lib/stm_run/stm_run.ocaml4.ml similarity index 64% rename from test/kcas_data/stm_run/stm_run.ocaml4.ml rename to test/lib/stm_run/stm_run.ocaml4.ml index 7b8d851b..655fd3ba 100644 --- a/test/kcas_data/stm_run/stm_run.ocaml4.ml +++ b/test/lib/stm_run/stm_run.ocaml4.ml @@ -1,11 +1,7 @@ include Intf -let count = - let factor b = if b then 10 else 1 in - factor (64 <= Sys.word_size) * factor (Sys.backend_type = Native) * 10 - -let run ?(verbose = true) ?(count = count) ?(budgetf = 60.0) ~name ?make_domain - (module Spec : STM.Spec) = +let run ?(verbose = true) ?(count = default_count) ?(budgetf = default_budgetf) + ~name ?make_domain (module Spec : STM.Spec) = let module Seq = STM_sequential.Make (Spec) in let module Con = STM_thread.Make (Spec) [@alert "-experimental"] in Util.run_with_budget ~budgetf ~count @@ fun count -> diff --git a/test/kcas_data/stm_run/stm_run.ocaml5.ml b/test/lib/stm_run/stm_run.ocaml5.ml similarity index 73% rename from test/kcas_data/stm_run/stm_run.ocaml5.ml rename to test/lib/stm_run/stm_run.ocaml5.ml index cece0a71..67af5c41 100644 --- a/test/kcas_data/stm_run/stm_run.ocaml5.ml +++ b/test/lib/stm_run/stm_run.ocaml5.ml @@ -1,13 +1,7 @@ include Intf -let count = - let factor b = if b then 10 else 1 in - factor (64 <= Sys.word_size) - * factor (Sys.backend_type = Native) - * factor (1 < Domain.recommended_domain_count ()) - -let run (type cmd state sut) ?(verbose = true) ?(count = count) - ?(budgetf = 60.0) ~name ?make_domain +let run (type cmd state sut) ?(verbose = true) ?(count = default_count) + ?(budgetf = default_budgetf) ~name ?make_domain (module Spec : STM.Spec with type cmd = cmd and type state = state diff --git a/test/kcas_data/stm_run/util.ml b/test/lib/stm_run/util.ml similarity index 100% rename from test/kcas_data/stm_run/util.ml rename to test/lib/stm_run/util.ml