diff --git a/bench/bench_mvar.ml b/bench/bench_mvar.ml index 2c3988b5..6957a9c2 100644 --- a/bench/bench_mvar.ml +++ b/bench/bench_mvar.ml @@ -11,6 +11,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in let init _ = () in + let wrap _ () action = Scheduler.run action in let work i () = if i < n_adders then if blocking_add then @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work ~after () + Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_queue.ml b/bench/bench_queue.ml index ebcc9d07..0ea3ab90 100644 --- a/bench/bench_queue.ml +++ b/bench/bench_queue.ml @@ -29,6 +29,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ () action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_stack.ml b/bench/bench_stack.ml index 4886702b..94a835a9 100644 --- a/bench/bench_stack.ml +++ b/bench/bench_stack.ml @@ -29,6 +29,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ () action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/dune b/bench/dune index cd97da36..279eb529 100644 --- a/bench/dune +++ b/bench/dune @@ -14,6 +14,7 @@ let () = (action (run %{test} -brief)) (libraries + scheduler kcas_data multicore-bench backoff diff --git a/doc/dune b/doc/dune index 4bfc69d7..0694d733 100644 --- a/doc/dune +++ b/doc/dune @@ -1,8 +1,8 @@ -(mdx - (package kcas_data) - (deps - (package kcas) - (package kcas_data)) - (enabled_if - (>= %{ocaml_version} 5.0.0)) - (files gkmz-with-read-only-cmp-ops.md scheduler-interop.md)) +;(mdx +; (package kcas_data) +; (deps +; (package kcas) +; (package kcas_data)) +; (enabled_if +; (>= %{ocaml_version} 5.0.0)) +; (files gkmz-with-read-only-cmp-ops.md scheduler-interop.md)) diff --git a/dune b/dune index 75232a0a..abdfa8cf 100644 --- a/dune +++ b/dune @@ -1,7 +1,7 @@ -(mdx - (package kcas_data) - (deps - (package kcas) - (package kcas_data)) - (libraries domain_shims) - (files README.md)) +;(mdx +; (package kcas_data) +; (deps +; (package kcas) +; (package kcas_data)) +; (libraries domain_shims) +; (files README.md)) diff --git a/dune-project b/dune-project index 96c1e31d..b0d3190f 100644 --- a/dune-project +++ b/dune-project @@ -34,10 +34,8 @@ (>= 4.13.0)) (backoff (>= 0.1.0)) - (domain-local-await - (>= 1.0.1)) - (domain-local-timeout - (>= 1.0.1)) + (picos + (>= 0.5.0)) (multicore-magic (>= 2.3.0)) (domain_shims @@ -84,9 +82,9 @@ (and (>= 0.1.0) :with-test)) - (domain-local-await + (picos (and - (>= 1.0.1) + (>= 0.5.0) :with-test)) (domain_shims (and diff --git a/kcas.opam b/kcas.opam index 33c54daf..07c00bf3 100644 --- a/kcas.opam +++ b/kcas.opam @@ -19,8 +19,7 @@ depends: [ "dune" {>= "3.14"} "ocaml" {>= "4.13.0"} "backoff" {>= "0.1.0"} - "domain-local-await" {>= "1.0.1"} - "domain-local-timeout" {>= "1.0.1"} + "picos" {>= "0.5.0"} "multicore-magic" {>= "2.3.0"} "domain_shims" {>= "0.1.0" & with-test} "alcotest" {>= "1.8.0" & with-test} @@ -46,3 +45,6 @@ build: [ ] dev-repo: "git+https://github.com/ocaml-multicore/kcas.git" doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "picos.dev" "git+https://github.com/ocaml-multicore/picos#53212d0164b32a034ef03f8cb60767fac4ae8029" ] +] diff --git a/kcas.opam.template b/kcas.opam.template index 0fd71d5e..ba960c08 100644 --- a/kcas.opam.template +++ b/kcas.opam.template @@ -1 +1,4 @@ doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "picos.dev" "git+https://github.com/ocaml-multicore/picos#53212d0164b32a034ef03f8cb60767fac4ae8029" ] +] diff --git a/kcas_data.opam b/kcas_data.opam index 057d28dd..a2bd733a 100644 --- a/kcas_data.opam +++ b/kcas_data.opam @@ -20,7 +20,7 @@ depends: [ "kcas" {= version} "multicore-magic" {>= "2.3.0"} "backoff" {>= "0.1.0" & with-test} - "domain-local-await" {>= "1.0.1" & with-test} + "picos" {>= "0.5.0" & with-test} "domain_shims" {>= "0.1.0" & with-test} "multicore-bench" {>= "0.1.4" & with-test} "alcotest" {>= "1.8.0" & with-test} diff --git a/src/kcas/dune b/src/kcas/dune index 70b19af5..5a83de7a 100644 --- a/src/kcas/dune +++ b/src/kcas/dune @@ -1,11 +1,11 @@ (library (name kcas) (public_name kcas) - (libraries domain-local-await domain-local-timeout backoff multicore-magic)) + (libraries picos backoff multicore-magic)) -(mdx - (package kcas) - (deps - (package kcas)) - (libraries kcas backoff domain_shims) - (files kcas.mli)) +;(mdx +; (package kcas) +; (deps +; (package kcas)) +; (libraries kcas backoff domain_shims) +; (files kcas.mli)) diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index 5bded5f4..ea427b86 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -3,6 +3,8 @@ * Copyright (c) 2023, Vesa Karvonen *) +open Picos + (** Work around CSE bug in OCaml 5-5.1. *) let[@inline] atomic_get x = Atomic.get ((* Prevents CSE *) Sys.opaque_identity x) @@ -22,82 +24,6 @@ let[@inline] fenceless_get x = let fenceless_get = atomic_get *) -module Timeout = struct - exception Timeout - - let[@inline never] timeout () = raise Timeout - - type _ t = - | Unset : [> `Unset ] t - | Elapsed : [> `Elapsed ] t - | Call : (unit -> unit) -> [> `Call ] t - | Set : { mutable state : [< `Elapsed | `Call ] t } -> [> `Set ] t - - external as_atomic : [< `Set ] t -> [< `Elapsed | `Call ] t Atomic.t - = "%identity" - - (* Fenceless operations are safe here as the timeout state is not not visible - outside of the library and we don't always need the latest value and, when - we do, there is a fence after. *) - - let[@inline] check (t : [< `Set | `Unset ] t) = - match t with - | Unset -> () - | Set set_r -> - if fenceless_get (as_atomic (Set set_r)) == Elapsed then timeout () - - let set seconds (state : [< `Elapsed | `Call ] t Atomic.t) = - Domain_local_timeout.set_timeoutf seconds @@ fun () -> - match Atomic.exchange state Elapsed with - | Call release_or_cancel -> release_or_cancel () - | Elapsed -> () - - let call_id = Call Fun.id - - let[@inline never] alloc seconds = - let (Set set_r as t : [ `Set ] t) = Set { state = call_id } in - let cancel = set seconds (as_atomic t) in - if not (Atomic.compare_and_set (as_atomic t) call_id (Call cancel)) then - timeout (); - Set set_r - - let[@inline] alloc_opt = function - | None -> Unset - | Some seconds -> alloc seconds - - let[@inline never] await (state : [< `Elapsed | `Call ] t Atomic.t) release = - match fenceless_get state with - | Call cancel as alive -> - if Atomic.compare_and_set state alive (Call release) then Call cancel - else timeout () - | Elapsed -> timeout () - - let[@inline] await (t : [ `Unset | `Set ] t) release = - match t with Unset -> Unset | Set r -> await (as_atomic (Set r)) release - - let[@inline never] unawait (state : [< `Elapsed | `Call ] t Atomic.t) alive = - match fenceless_get state with - | Call _ as await -> - if not (Atomic.compare_and_set state await alive) then timeout () - | Elapsed -> timeout () - - let[@inline] unawait t alive = - match (t, alive) with - | Set set_r, Call call_r -> unawait (as_atomic (Set set_r)) (Call call_r) - | _ -> () - - let[@inline] cancel_alive (alive : [< `Unset | `Call ] t) = - match alive with Unset -> () | Call cancel -> cancel () - - let[@inline] cancel (t : [< `Set | `Unset ] t) = - match t with - | Unset -> () - | Set set_r -> ( - match fenceless_get (as_atomic (Set set_r)) with - | Elapsed -> () - | Call cancel -> cancel ()) -end - module Id = struct let neg_id = Atomic.make (-1) let[@inline] neg_ids n = Atomic.fetch_and_add neg_id (-n) @@ -128,14 +54,12 @@ end = struct x end -type awaiter = unit -> unit - -let[@inline] resume_awaiter awaiter = awaiter () +type awaiter = Trigger.t let[@inline] resume_awaiters = function | [] -> () - | [ awaiter ] -> resume_awaiter awaiter - | awaiters -> List.iter resume_awaiter awaiters + | [ awaiter ] -> Trigger.signal awaiter + | awaiters -> List.iter Trigger.signal awaiters module Mode = struct type t = [ `Lock_free | `Obstruction_free ] @@ -166,7 +90,7 @@ and _ tdt = This field must be first, see [root_as_atomic] and [tree_as_ref]. *) - timeout : [ `Set | `Unset ] Timeout.t; + mutable fiber : Fiber.Maybe.t; mutable mode : Mode.t; mutable validate_counter : int; mutable post_commit : Action.t; @@ -467,67 +391,49 @@ let rec remove_awaiter backoff loc before awaiter = if not (Atomic.compare_and_set (as_atomic loc) state_old state_new) then remove_awaiter (Backoff.once backoff) loc before awaiter -let block timeout loc before = - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await timeout t.release in - if add_awaiter loc before t.release then begin - try t.await () - with cancellation_exn -> - remove_awaiter Backoff.default loc before t.release; - Timeout.cancel_alive alive; - raise cancellation_exn - end; - Timeout.unawait timeout alive +let block loc before = + let t = Trigger.create () in + if add_awaiter loc before t then + match Trigger.await t with + | None -> () + | Some exn_bt -> + remove_awaiter Backoff.default loc before t; + Exn_bt.raise exn_bt -let rec update_no_alloc timeout backoff loc state f = +let rec update_no_alloc backoff loc state f = (* Fenceless is safe as we have had a fence before if needed and there is a fence after. *) let state_old = fenceless_get (as_atomic loc) in let before = eval state_old in match f before with | after -> - if before == after then begin - Timeout.cancel timeout; - before - end + if before == after then before else begin state.after <- after; if Atomic.compare_and_set (as_atomic loc) state_old state then begin resume_awaiters state_old.awaiters; - Timeout.cancel timeout; before end - else update_no_alloc timeout (Backoff.once backoff) loc state f + else update_no_alloc (Backoff.once backoff) loc state f end | exception Retry.Later -> - block timeout loc before; - update_no_alloc timeout backoff loc state f - | exception exn -> - Timeout.cancel timeout; - raise exn + block loc before; + update_no_alloc backoff loc state f -let update_with_state timeout backoff loc f state_old = +let update_with_state backoff loc f state_old = let before = eval state_old in match f before with | after -> - if before == after then begin - Timeout.cancel timeout; - before - end + if before == after then before else let state = new_state after in if Atomic.compare_and_set (as_atomic loc) state_old state then begin resume_awaiters state_old.awaiters; - Timeout.cancel timeout; before end - else update_no_alloc timeout (Backoff.once backoff) loc state f + else update_no_alloc (Backoff.once backoff) loc state f | exception Retry.Later -> let state = new_state before in - block timeout loc before; - update_no_alloc timeout backoff loc state f - | exception exn -> - Timeout.cancel timeout; - raise exn + update_no_alloc backoff loc state f let rec exchange_no_alloc backoff loc state = let state_old = atomic_get (as_atomic loc) in @@ -584,25 +490,15 @@ module Loc = struct let[@inline] get_id loc = (to_loc loc).id let get loc = eval (atomic_get (as_atomic (to_loc loc))) - let rec get_as timeout f loc state = + let rec get_as f loc state = let before = eval state in - match f before with - | value -> - Timeout.cancel timeout; - value - | exception Retry.Later -> - block timeout (to_loc loc) before; - (* Fenceless is safe as there was already a fence before. *) - get_as timeout f loc (fenceless_get (as_atomic (to_loc loc))) - | exception exn -> - Timeout.cancel timeout; - raise exn - - let[@inline] get_as ?timeoutf f loc = - get_as - (Timeout.alloc_opt timeoutf) - f loc - (atomic_get (as_atomic (to_loc loc))) + try f before + with Retry.Later -> + block (to_loc loc) before; + (* Fenceless is safe as there was already a fence before. *) + get_as f loc (fenceless_get (as_atomic (to_loc loc))) + + let[@inline] get_as f loc = get_as f loc (atomic_get (as_atomic (to_loc loc))) let[@inline] get_mode loc = if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free @@ -612,21 +508,18 @@ module Loc = struct let state_old = atomic_get (as_atomic (to_loc loc)) in cas_with_state backoff (to_loc loc) before state state_old - let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in - update_with_state timeout backoff (to_loc loc) f + let fenceless_update ?(backoff = Backoff.default) loc f = + update_with_state backoff (to_loc loc) f (fenceless_get (as_atomic (to_loc loc))) - let[@inline] fenceless_modify ?timeoutf ?backoff loc f = - fenceless_update ?timeoutf ?backoff loc f |> ignore + let[@inline] fenceless_modify ?backoff loc f = + fenceless_update ?backoff loc f |> ignore - let update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in - update_with_state timeout backoff (to_loc loc) f + let update ?(backoff = Backoff.default) loc f = + update_with_state backoff (to_loc loc) f (atomic_get (as_atomic (to_loc loc))) - let[@inline] modify ?timeoutf ?backoff loc f = - update ?timeoutf ?backoff loc f |> ignore + let[@inline] modify ?backoff loc f = update ?backoff loc f |> ignore let exchange ?(backoff = Backoff.default) loc value = exchange_no_alloc backoff (to_loc loc) (new_state value) @@ -711,6 +604,17 @@ module Xt = struct tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); before + let check (Xt xt_r : _ t) = + match Fiber.Maybe.to_fiber_or_current xt_r.fiber with + | fiber -> + Fiber.check fiber; + if xt_r.fiber != Fiber.Maybe.of_fiber fiber then + xt_r.fiber <- Fiber.Maybe.of_fiber fiber + | exception _ -> + (* Getting the current fiber should never fail except when not running + inside a scheduler. *) + () + let update_old : type c a. _ -> a loc -> c -> (c, a) up -> _ -> _ -> _ -> a = fun (Xt xt_r as xt : _ t) loc c up lt gt state' -> let c0 = xt_r.validate_counter in @@ -721,7 +625,7 @@ module Xt = struct The assumption is that potentially infinite loops will repeatedly access the same locations. *) if c0 land c1 = 0 then begin - Timeout.check xt_r.timeout; + check xt; validate_all_rec xt !(tree_as_ref xt) end; let state : a state = Obj.magic state' in @@ -929,17 +833,13 @@ module Xt = struct let initial_validate_period = 4 - let success (Xt xt_r : _ t) result = - Timeout.cancel xt_r.timeout; - Action.run xt_r.post_commit result - let rec commit backoff (Xt xt_r as xt : _ t) tx = match tx ~xt with | result -> begin match !(tree_as_ref xt) with - | T Leaf -> success xt result + | T Leaf -> Action.run xt_r.post_commit result | T (Node { loc; state; lt = T Leaf; gt = T Leaf; _ }) -> - if is_cmp xt state then success xt result + if is_cmp xt state then Action.run xt_r.post_commit result else begin state.which <- W After; let before = state.before in @@ -948,7 +848,7 @@ module Xt = struct fence. *) let state_old = fenceless_get (as_atomic loc) in if cas_with_state Backoff.default loc before state state_old then - success xt result + Action.run xt_r.post_commit result else commit_once_reuse backoff xt tx end | T (Node node_r) -> begin @@ -956,22 +856,22 @@ module Xt = struct match determine xt 0 root with | status -> if a_cmp_followed_by_a_cas < status then begin - if finish xt root (verify xt root) then success xt result + if finish xt root (verify xt root) then + Action.run xt_r.post_commit result else begin - (* We switch to [`Lock_free] as there was - interference. *) + (* We switch to [`Lock_free] as there was interference. *) commit_once_alloc backoff `Lock_free xt tx end end else if a_cmp = status || finish xt root (if 0 <= status then After else Before) - then success xt result + then Action.run xt_r.post_commit result else commit_once_alloc backoff xt_r.mode xt tx | exception Exit -> (* Fenceless is safe as there was a fence before. *) if fenceless_get (root_as_atomic xt) == R After then - success xt result + Action.run xt_r.post_commit result else commit_once_alloc backoff xt_r.mode xt tx end end @@ -981,58 +881,52 @@ module Xt = struct | T Leaf -> invalid_retry () | T (Node node_r) -> begin let root = Node node_r in - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await xt_r.timeout t.release in - match add_awaiters t.release xt root with + let t = Trigger.create () in + match add_awaiters t xt root with | T Leaf -> begin - match t.await () with - | () -> - remove_awaiters t.release xt (T Leaf) root |> ignore; - Timeout.unawait xt_r.timeout alive; + match Trigger.await t with + | None -> + remove_awaiters t xt (T Leaf) root |> ignore; commit_reset_reuse backoff xt tx - | exception cancellation_exn -> - remove_awaiters t.release xt (T Leaf) root |> ignore; - Timeout.cancel_alive alive; - raise cancellation_exn + | Some exn_bt -> + remove_awaiters t xt (T Leaf) root |> ignore; + Exn_bt.raise exn_bt end | T (Node _) as stop -> - remove_awaiters t.release xt stop root |> ignore; - Timeout.unawait xt_r.timeout alive; + remove_awaiters t xt stop root |> ignore; commit_once_reuse backoff xt tx end end - | exception exn -> - Timeout.cancel xt_r.timeout; - raise exn and commit_once_reuse backoff xt tx = + check xt; commit_reuse (Backoff.once backoff) xt tx and commit_reset_reuse backoff xt tx = + check xt; commit_reuse (Backoff.reset backoff) xt tx and commit_reuse backoff (Xt xt_r as xt : _ t) tx = tree_as_ref xt := T Leaf; xt_r.validate_counter <- initial_validate_period; xt_r.post_commit <- Action.noop; - Timeout.check xt_r.timeout; commit backoff xt tx - and commit_once_alloc backoff mode (Xt xt_r : _ t) tx = + and commit_once_alloc backoff mode (Xt xt_r as xt : _ t) tx = + check xt; let backoff = Backoff.once backoff in - Timeout.check xt_r.timeout; let rot = U Leaf in let validate_counter = initial_validate_period in let post_commit = Action.noop in let xt = Xt { xt_r with rot; mode; validate_counter; post_commit } in commit backoff xt tx - let[@inline] commit ?timeoutf ?(backoff = Backoff.default) - ?(mode = `Obstruction_free) { tx } = - let timeout = Timeout.alloc_opt timeoutf + let[@inline] commit ?(backoff = Backoff.default) ?(mode = `Obstruction_free) + { tx } = + let fiber = Fiber.Maybe.nothing and rot = U Leaf and validate_counter = initial_validate_period and post_commit = Action.noop in - let xt = Xt { rot; timeout; mode; validate_counter; post_commit } in + let xt = Xt { rot; fiber; mode; validate_counter; post_commit } in commit backoff xt tx end diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 185a2c23..7f7212ca 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -22,7 +22,7 @@ obstruction-free} read-only compare (CMP) operations that can be performed on overlapping locations in parallel without interference. - - {b Blocking await}: The algorithm supports timeouts and awaiting for + - {b Blocking await}: The algorithm supports cancelation and awaiting for changes to any number of shared memory locations. - {b Composable}: Independently developed transactions can be composed with @@ -105,16 +105,9 @@ can skip over these. The documentation links back to these modules where appropriate. *) -(** Timeout support. *) -module Timeout : sig - exception Timeout - (** Exception that may be raised by operations such as {!Loc.get_as}, - {!Loc.update}, {!Loc.modify}, or {!Xt.commit} when given a [~timeoutf] in - seconds. *) -end - -(** Retry support. *) module Retry : sig + (** Retry support. *) + exception Later (** Exception that may be raised to signal that the operation, such as {!Loc.get_as}, {!Loc.update}, or {!Xt.commit}, should be retried, at some @@ -126,7 +119,7 @@ module Retry : sig shared memory locations have already changed. *) val later : unit -> 'a - (** [later ()] is equivalent to [raise Later]. *) + (** [later ()] is equivalent to [raise_notrace Later]. *) val unless : bool -> unit (** [unless condition] is equivalent to [if not condition then later ()]. *) @@ -137,11 +130,12 @@ module Retry : sig outside of the transaction, and the transaction should be retried. *) val invalid : unit -> 'a - (** [invalid ()] is equivalent to [raise Invalid]. *) + (** [invalid ()] is equivalent to [raise_notrace Invalid]. *) end -(** Operating modes of the [k-CAS-n-CMP] algorithm. *) module Mode : sig + (** Operating modes of the [k-CAS-n-CMP] algorithm. *) + type t = [ `Lock_free (** In [`Lock_free] mode the algorithm makes sure that at least one domain will @@ -164,18 +158,15 @@ end - [backoff] specifies the configuration for the [Backoff] mechanism. In special cases, having more detailed knowledge of the application, one - might adjust the configuration to improve performance. + might adjust the configuration to improve performance. *) - - [timeoutf] specifies a timeout in seconds and, if specified, the - {!Timeout.Timeout} exception may be raised by the operation to signal that - the timeout expired. *) +module Loc : sig + (** Shared memory locations. -(** Shared memory locations. + This module is essentially compatible with the [Stdlib.Atomic] module, + except that a number of functions take some optional arguments that one + usually need not worry about. *) - This module is essentially compatible with the [Stdlib.Atomic] module, - except that a number of functions take some optional arguments that one - usually need not worry about. *) -module Loc : sig (** Type of shared memory locations. *) type !'a t = private | Loc : { state : 'state; id : 'id } -> 'a t @@ -225,7 +216,7 @@ module Loc : sig val get : 'a t -> 'a (** [get r] reads the current value of the shared memory location [r]. *) - val get_as : ?timeoutf:float -> ('a -> 'b) -> 'a t -> 'b + val get_as : ('a -> 'b) -> 'a t -> 'b (** [get_as f loc] is equivalent to [f (get loc)]. The given function [f] may raise the {!Retry.Later} exception to signal that the conditional load should be retried only after the location has been modified outside of the @@ -237,7 +228,7 @@ module Loc : sig location [r] to the [after] value if the current value of [r] is the [before] value. *) - val update : ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [update r f] repeats [let b = get r in compare_and_set r b (f b)] until it succeeds and then returns the [b] value. The given function [f] may raise the {!Retry.Later} exception to signal that the update should only be @@ -245,8 +236,7 @@ module Loc : sig also safe for the given function [f] to raise any other exception to abort the update. *) - val modify : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [modify r f] is equivalent to [update r f |> ignore]. *) val exchange : ?backoff:Backoff.t -> 'a t -> 'a -> 'a @@ -277,13 +267,11 @@ module Loc : sig (** [fenceless_get r] is like [get r] except that [fenceless_get]s may be reordered. *) - val fenceless_update : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val fenceless_update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [fenceless_update r f] is like [update r f] except that in case [f x == x] the update may be reordered. *) - val fenceless_modify : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val fenceless_modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [fenceless_modify r f] is like [modify r f] except that in case [f x == x] the modify may be reordered. *) end @@ -318,77 +306,80 @@ end memory locations are atomically marked as having taken effect and subsequent reads of the locations will be able to see the newly written values. *) -(** Explicit transaction log passing on shared memory locations. - - This module provides a way to implement composable transactions over shared - memory locations. A transaction is a function written by the library user - and can be thought of as a specification of a sequence of {!Xt.get} and - {!Xt.set} accesses to shared memory locations. To actually perform the - accesses one then {!Xt.commit}s the transaction. - - Transactions should generally not perform arbitrary side-effects, because - when a transaction is committed it may be attempted multiple times meaning - that the side-effects are also performed multiple times. {!Xt.post_commit} - can be used to perform an action only once after the transaction has been - committed succesfully. - - {b WARNING}: To make it clear, the operations provided by the {!Loc} module - for accessing individual shared memory locations do not implicitly go - through the transaction mechanism and should generally not be used within - transactions. There are advanced algorithms where one might, within a - transaction, perform operations that do not get recorded into the - transaction log. Using such techniques correctly requires expert knowledge - and is not recommended for casual users. - - As an example, consider an implementation of doubly-linked circular - lists. Instead of using a mutable field, [ref], or [Atomic.t], one would use - a shared memory location, or {!Loc.t}, for the pointers in the node type: - - {[ - type 'a node = { - succ: 'a node Loc.t; - pred: 'a node Loc.t; - datum: 'a; - } - ]} - - To remove a node safely one wants to atomically update the [succ] and [pred] - pointers of the predecessor and successor nodes and to also update the - [succ] and [pred] pointers of a node to point to the node itself, so that - removal becomes an {{:https://en.wikipedia.org/wiki/Idempotence} idempotent} - operation. Using explicit transaction log passing one could implement the - [remove] operation as follows: - - {[ - let remove ~xt node = - (* Read pointers to the predecessor and successor nodes: *) - let pred = Xt.get ~xt node.pred in - let succ = Xt.get ~xt node.succ in - (* Update pointers in this node: *) - Xt.set ~xt node.succ node; - Xt.set ~xt node.pred node; - (* Update pointers to this node: *) - Xt.set ~xt pred.succ succ; - Xt.set ~xt succ.pred pred - ]} +module Xt : sig + (** Explicit transaction log passing on shared memory locations. + + This module provides a way to implement composable transactions over + shared memory locations. A transaction is a function written by the + library user and can be thought of as a specification of a sequence of + {!Xt.get} and {!Xt.set} accesses to shared memory locations. To actually + perform the accesses one then {!Xt.commit}s the transaction. + + Transactions should generally not perform arbitrary side-effects, because + when a transaction is committed it may be attempted multiple times meaning + that the side-effects are also performed multiple times. + {!Xt.post_commit} can be used to perform an action only once after the + transaction has been committed succesfully. + + {b WARNING}: To make it clear, the operations provided by the {!Loc} + module for accessing individual shared memory locations do not implicitly + go through the transaction mechanism and should generally not be used + within transactions. There are advanced algorithms where one might, + within a transaction, perform operations that do not get recorded into the + transaction log. Using such techniques correctly requires expert + knowledge and is not recommended for casual users. + + As an example, consider an implementation of doubly-linked circular + lists. Instead of using a mutable field, [ref], or [Atomic.t], one would + use a shared memory location, or {!Loc.t}, for the pointers in the node + type: + + {[ + type 'a node = { + succ: 'a node Loc.t; + pred: 'a node Loc.t; + datum: 'a; + } + ]} + + To remove a node safely one wants to atomically update the [succ] and + [pred] pointers of the predecessor and successor nodes and to also update + the [succ] and [pred] pointers of a node to point to the node itself, so + that removal becomes an {{:https://en.wikipedia.org/wiki/Idempotence} + idempotent} operation. Using explicit transaction log passing one could + implement the [remove] operation as follows: + + {[ + let remove ~xt node = + (* Read pointers to the predecessor and successor nodes: *) + let pred = Xt.get ~xt node.pred in + let succ = Xt.get ~xt node.succ in + (* Update pointers in this node: *) + Xt.set ~xt node.succ node; + Xt.set ~xt node.pred node; + (* Update pointers to this node: *) + Xt.set ~xt pred.succ succ; + Xt.set ~xt succ.pred pred + ]} + + The labeled argument, [~xt], refers to the transaction log. Transactional + operations like {!Xt.get} and {!Xt.set} are then recorded in that log. To + actually remove a node, we need to commit the transaction - The labeled argument, [~xt], refers to the transaction log. Transactional - operations like {!Xt.get} and {!Xt.set} are then recorded in that log. To - actually remove a node, we need to commit the transaction + {@ocaml skip[ + Xt.commit { tx = remove node } + ]} - {@ocaml skip[ - Xt.commit { tx = remove node } - ]} + which repeatedly calls the transaction function, [tx], to record a + transaction log and attempts to atomically perform it until it succeeds. - which repeatedly calls the transaction function, [tx], to record a - transaction log and attempts to atomically perform it until it succeeds. + Notice that [remove] is not recursive. It doesn't have to account for + failure or perform a backoff. It is also not necessary to know or keep + track of what the previous values of locations were. All of that is taken + care of for us by the transaction log and the {!Xt.commit} function. + Furthermore, [remove] can easily be called as a part of a more complex + transaction. *) - Notice that [remove] is not recursive. It doesn't have to account for - failure or perform a backoff. It is also not necessary to know or keep track - of what the previous values of locations were. All of that is taken care of - for us by the transaction log and the {!Xt.commit} function. Furthermore, - [remove] can easily be called as a part of a more complex transaction. *) -module Xt : sig type 'x t (** Type of an explicit transaction log on shared memory locations. @@ -548,8 +539,7 @@ module Xt : sig val call : xt:'x t -> 'a tx -> 'a (** [call ~xt tx] is equivalent to [tx.Xt.tx ~xt]. *) - val commit : - ?timeoutf:float -> ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a + val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a (** [commit tx] repeatedly calls [tx] to record a log of shared memory accesses and attempts to perform them atomically until it succeeds and then returns whatever [tx] returned. [tx] may raise {!Retry.Later} or diff --git a/src/kcas_data/dllist.ml b/src/kcas_data/dllist.ml index 4b93cd48..7be748a1 100644 --- a/src/kcas_data/dllist.ml +++ b/src/kcas_data/dllist.ml @@ -225,13 +225,8 @@ let move_l node list = Kcas.Xt.commit { tx = Xt.move_l node list } let move_r node list = Kcas.Xt.commit { tx = Xt.move_r node list } let take_opt_l list = Kcas.Xt.commit { tx = Xt.take_opt_l list } let take_opt_r list = Kcas.Xt.commit { tx = Xt.take_opt_r list } - -let take_blocking_l ?timeoutf list = - Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking_l list } - -let take_blocking_r ?timeoutf list = - Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking_r list } - +let take_blocking_l list = Kcas.Xt.commit { tx = Xt.take_blocking_l list } +let take_blocking_r list = Kcas.Xt.commit { tx = Xt.take_blocking_r list } let swap t1 t2 = Kcas.Xt.commit { tx = Xt.swap t1 t2 } let transfer_l t1 t2 = Kcas.Xt.commit { tx = Xt.transfer_l t1 t2 } let transfer_r t1 t2 = Kcas.Xt.commit { tx = Xt.transfer_r t1 t2 } diff --git a/src/kcas_data/dllist.mli b/src/kcas_data/dllist.mli index de21e093..5f0163b2 100644 --- a/src/kcas_data/dllist.mli +++ b/src/kcas_data/dllist.mli @@ -59,7 +59,6 @@ module Xt : with type 'a t := 'a t with type 'a node := 'a node with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on doubly-linked lists. *) (** {1 Non-compositional interface} *) @@ -69,7 +68,6 @@ include with type 'a t := 'a t with type 'a node := 'a node with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn val take_all : 'a t -> 'a t (** [take_all l] removes all nodes of the doubly-linked list [l] and returns a diff --git a/src/kcas_data/dllist_intf.ml b/src/kcas_data/dllist_intf.ml index 738fda2c..406a9f94 100644 --- a/src/kcas_data/dllist_intf.ml +++ b/src/kcas_data/dllist_intf.ml @@ -2,7 +2,6 @@ module type Ops = sig type 'a t type 'a node type ('x, 'fn) fn - type ('x, 'fn) blocking_fn (** {2 Operations on nodes} *) @@ -42,12 +41,12 @@ module type Ops = sig (** [take_opt_r l] removes and returns the value of rightmost node of the doubly-linked list [l], or return [None] if the list is empty. *) - val take_blocking_l : ('x, 'a t -> 'a) blocking_fn + val take_blocking_l : ('x, 'a t -> 'a) fn (** [take_blocking_l l] removes and returns the value of leftmost node of the doubly-linked list [l], or blocks waiting for the list to become non-empty. *) - val take_blocking_r : ('x, 'a t -> 'a) blocking_fn + val take_blocking_r : ('x, 'a t -> 'a) fn (** [take_blocking_r l] removes and returns the value of rightmost node of the doubly-linked list [l], or blocks waiting for the list to become non-empty. *) diff --git a/src/kcas_data/mvar.ml b/src/kcas_data/mvar.ml index ba31e192..690e3dbc 100644 --- a/src/kcas_data/mvar.ml +++ b/src/kcas_data/mvar.ml @@ -25,18 +25,17 @@ end let is_empty mv = Magic_option.is_none (Loc.get mv) -let put ?timeoutf mv value = +let put mv value = (* Fenceless is safe as we always update. *) - Loc.fenceless_modify ?timeoutf mv (Magic_option.put_or_retry value) + Loc.fenceless_modify mv (Magic_option.put_or_retry value) let try_put mv value = Loc.compare_and_set mv Magic_option.none (Magic_option.some value) -let take ?timeoutf mv = +let take mv = (* Fenceless is safe as we always update. *) - Magic_option.get_unsafe - (Loc.fenceless_update ?timeoutf mv Magic_option.take_or_retry) + Magic_option.get_unsafe (Loc.fenceless_update mv Magic_option.take_or_retry) let take_opt mv = Magic_option.to_option (Loc.exchange mv Magic_option.none) -let peek ?timeoutf mv = Loc.get_as ?timeoutf Magic_option.get_or_retry mv +let peek mv = Loc.get_as Magic_option.get_or_retry mv let peek_opt mv = Magic_option.to_option (Loc.get mv) diff --git a/src/kcas_data/mvar.mli b/src/kcas_data/mvar.mli index f0a7d5ca..12a2b8df 100644 --- a/src/kcas_data/mvar.mli +++ b/src/kcas_data/mvar.mli @@ -26,13 +26,8 @@ module Xt : Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction passing on synchronizing variables. *) (** {1 Non-compositional interface} *) -include - Mvar_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn diff --git a/src/kcas_data/mvar_intf.ml b/src/kcas_data/mvar_intf.ml index e9f6c7e3..3df088f2 100644 --- a/src/kcas_data/mvar_intf.ml +++ b/src/kcas_data/mvar_intf.ml @@ -1,13 +1,12 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty mv] determines whether the synchronizing variable [mv] contains a value or not. *) - val put : ('x, 'a t -> 'a -> unit) blocking_fn + val put : ('x, 'a t -> 'a -> unit) fn (** [put mv x] fills the synchronizing variable [mv] with the value [v] or blocks until the variable becomes empty. *) @@ -16,7 +15,7 @@ module type Ops = sig value [v] and returns [true] on success or [false] in case the variable is full. *) - val take : ('x, 'a t -> 'a) blocking_fn + val take : ('x, 'a t -> 'a) fn (** [take mv] removes and returns the current value of the synchronizing variable [mv] or blocks waiting until the variable is filled. *) @@ -24,7 +23,7 @@ module type Ops = sig (** [take_opt mv] removes and returns the current value of the synchronizing variable [mv] or returns [None] in case the variable is empty. *) - val peek : ('x, 'a t -> 'a) blocking_fn + val peek : ('x, 'a t -> 'a) fn (** [peek mv] returns the current value of the synchronizing variable [mv] or blocks waiting until the variable is filled. *) diff --git a/src/kcas_data/promise.ml b/src/kcas_data/promise.ml index 0786ff9c..cf25e189 100644 --- a/src/kcas_data/promise.ml +++ b/src/kcas_data/promise.ml @@ -38,8 +38,7 @@ module Xt = struct let resolve_error ~xt u e = resolve ~xt u (Error e) end -let await ?timeoutf t = - Loc.get_as ?timeoutf Magic_option.get_or_retry (of_promise t) +let await t = Loc.get_as Magic_option.get_or_retry (of_promise t) let resolve u v = if @@ -51,8 +50,8 @@ let resolve u v = let peek t = Magic_option.to_option (Loc.get (of_promise t)) let is_resolved t = Magic_option.is_some (Loc.get (of_promise t)) -let await_exn ?timeoutf t = - match await ?timeoutf t with Ok value -> value | Error exn -> raise exn +let await_exn t = + match await t with Ok value -> value | Error exn -> raise exn let resolve_ok u v = resolve u (Ok v) let resolve_error u e = resolve u (Error e) diff --git a/src/kcas_data/promise.mli b/src/kcas_data/promise.mli index 11c49988..6515f908 100644 --- a/src/kcas_data/promise.mli +++ b/src/kcas_data/promise.mli @@ -42,7 +42,6 @@ module Xt : with type 'a or_exn := 'a or_exn with type 'a u := 'a u with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on promises. *) (** {1 Non-compositional interface} *) @@ -53,4 +52,3 @@ include with type 'a or_exn := 'a or_exn with type 'a u := 'a u with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn diff --git a/src/kcas_data/promise_intf.ml b/src/kcas_data/promise_intf.ml index 93d1034a..ffe512f9 100644 --- a/src/kcas_data/promise_intf.ml +++ b/src/kcas_data/promise_intf.ml @@ -3,14 +3,13 @@ module type Ops = sig type !-'a u type 'a or_exn type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val resolve : ('x, 'a u -> 'a -> unit) fn (** [resolve u v] resolves the promise corresponding to the resolver [u] to the value [v]. Any awaiters of the corresponding promise are then unblocked. *) - val await : ('x, 'a t -> 'a) blocking_fn + val await : ('x, 'a t -> 'a) fn (** [await t] either immediately returns the resolved value of the promise [t] or blocks until the promise [t] is resolved. *) @@ -24,7 +23,7 @@ module type Ops = sig (** {2 Result promises} *) - val await_exn : ('x, 'a or_exn -> 'a) blocking_fn + val await_exn : ('x, 'a or_exn -> 'a) fn (** [await_exn t] is equivalent to [match await t with v -> v | exception e -> raise e]. *) val resolve_ok : ('x, ('a, 'b) result u -> 'a -> unit) fn diff --git a/src/kcas_data/queue.ml b/src/kcas_data/queue.ml index 543bb0c7..d8742eb6 100644 --- a/src/kcas_data/queue.ml +++ b/src/kcas_data/queue.ml @@ -139,10 +139,10 @@ let take_opt q = | None -> Kcas.Xt.commit { tx = Xt.take_opt q } | some -> some -let take_blocking ?timeoutf q = +let take_blocking q = (* Fenceless is safe as we revert to a transaction in case we didn't update. *) match Loc.fenceless_update q.front Elems.tl_safe |> Elems.hd_opt with - | None -> Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking q } + | None -> Kcas.Xt.commit { tx = Xt.take_blocking q } | Some elem -> elem let take_all q = Kcas.Xt.commit { tx = Xt.take_all q } @@ -152,9 +152,7 @@ let peek_opt q = | None -> Kcas.Xt.commit { tx = Xt.peek_opt q } | some -> some -let peek_blocking ?timeoutf q = - Kcas.Xt.commit ?timeoutf { tx = Xt.peek_blocking q } - +let peek_blocking q = Kcas.Xt.commit { tx = Xt.peek_blocking q } let clear q = Kcas.Xt.commit { tx = Xt.clear q } let swap q1 q2 = Kcas.Xt.commit { tx = Xt.swap q1 q2 } let to_seq q = Kcas.Xt.commit { tx = Xt.to_seq q } diff --git a/src/kcas_data/queue.mli b/src/kcas_data/queue.mli index d955680c..f4dd49a9 100644 --- a/src/kcas_data/queue.mli +++ b/src/kcas_data/queue.mli @@ -31,16 +31,11 @@ module Xt : Queue_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on queues. *) (** {1 Non-compositional interface} *) -include - Queue_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Queue_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn val peek : 'a t -> 'a (** [peek q] returns the first element in queue [s], or raises {!Empty} if the diff --git a/src/kcas_data/queue_intf.ml b/src/kcas_data/queue_intf.ml index c787bdde..5293058e 100644 --- a/src/kcas_data/queue_intf.ml +++ b/src/kcas_data/queue_intf.ml @@ -1,7 +1,6 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty s] determines whether the queue [q] is empty. *) @@ -32,11 +31,11 @@ module type Ops = sig (** [peek_opt q] returns the first element in queue [q], without removing it from the queue, or returns [None] if the queue is empty. *) - val peek_blocking : ('x, 'a t -> 'a) blocking_fn + val peek_blocking : ('x, 'a t -> 'a) fn (** [peek_blocking q] returns the first element in queue [q], without removing it from the queue, or blocks waiting for the queue to become non-empty. *) - val take_blocking : ('x, 'a t -> 'a) blocking_fn + val take_blocking : ('x, 'a t -> 'a) fn (** [take_blocking q] removes and returns the first element in queue [q], or blocks waiting for the queue to become non-empty. *) diff --git a/src/kcas_data/stack.ml b/src/kcas_data/stack.ml index c4d0170a..ad684e36 100644 --- a/src/kcas_data/stack.ml +++ b/src/kcas_data/stack.ml @@ -30,12 +30,12 @@ let push x s = let pop_opt s = Loc.update s Elems.tl_safe |> Elems.hd_opt let pop_all s = Loc.exchange s Elems.empty |> Elems.to_seq -let pop_blocking ?timeoutf s = +let pop_blocking s = (* Fenceless is safe as we always update. *) - Loc.fenceless_update ?timeoutf s Elems.tl_or_retry |> Elems.hd_unsafe + Loc.fenceless_update s Elems.tl_or_retry |> Elems.hd_unsafe let top_opt s = Loc.get s |> Elems.hd_opt -let top_blocking ?timeoutf s = Loc.get_as ?timeoutf Elems.hd_or_retry s +let top_blocking s = Loc.get_as Elems.hd_or_retry s let clear s = Loc.set s Elems.empty let swap s1 s2 = Kcas.Xt.commit { tx = Kcas.Xt.swap s1 s2 } let to_seq s = Elems.to_seq @@ Loc.get s diff --git a/src/kcas_data/stack.mli b/src/kcas_data/stack.mli index fa7a67a0..d84fe9a6 100644 --- a/src/kcas_data/stack.mli +++ b/src/kcas_data/stack.mli @@ -33,16 +33,11 @@ module Xt : Stack_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on stacks. *) (** {1 Non-compositional interface} *) -include - Stack_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Stack_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn val pop : 'a t -> 'a (** [pop s] removes and returns the topmost element in stack [s], or raises diff --git a/src/kcas_data/stack_intf.ml b/src/kcas_data/stack_intf.ml index 640a840f..53e980f6 100644 --- a/src/kcas_data/stack_intf.ml +++ b/src/kcas_data/stack_intf.ml @@ -1,7 +1,6 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty s] determines whether the stack [s] is empty. *) @@ -34,7 +33,7 @@ module type Ops = sig sequence for iterating through all the elements that were in the stack top to bottom. *) - val pop_blocking : ('x, 'a t -> 'a) blocking_fn + val pop_blocking : ('x, 'a t -> 'a) fn (** [pop_blocking s] removes and returns the topmost element of the stack [s], or blocks waiting for the queue to become non-empty. *) @@ -42,7 +41,7 @@ module type Ops = sig (** [top_opt s] returns the topmost element in stack [s], or [None] if the stack is empty. *) - val top_blocking : ('x, 'a t -> 'a) blocking_fn + val top_blocking : ('x, 'a t -> 'a) fn (** [top_blocking s] returns the topmost element in stack [s], or blocks waiting for the queue to become non-empty. *) end diff --git a/test/kcas/barrier.ml b/test/kcas/barrier.ml index 334be94f..3393b3a9 100644 --- a/test/kcas/barrier.ml +++ b/test/kcas/barrier.ml @@ -1,3 +1,5 @@ +open Picos_structured + type t = { counter : int Atomic.t; total : int } let make total = { counter = Atomic.make 0; total } @@ -5,5 +7,5 @@ let make total = { counter = Atomic.make 0; total } let await { counter; total } = Atomic.incr counter; while Atomic.get counter < total do - Domain.cpu_relax () + Control.yield () done diff --git a/test/kcas/dune b/test/kcas/dune index e78098c1..1d4a86af 100644 --- a/test/kcas/dune +++ b/test/kcas/dune @@ -1,9 +1,11 @@ (tests (names test ms_queue_test threads loc_modes) (libraries + scheduler + picos.finally + picos.structured alcotest kcas - domain-local-timeout threads.posix unix domain_shims) diff --git a/test/kcas/loc_modes.ml b/test/kcas/loc_modes.ml index aa4af6f7..2547e23e 100644 --- a/test/kcas/loc_modes.ml +++ b/test/kcas/loc_modes.ml @@ -1,3 +1,4 @@ +open Picos_structured open Kcas let loop_count = try int_of_string Sys.argv.(1) with _ -> Util.iter_factor @@ -76,6 +77,6 @@ let accumulator_thread () = exit := true let () = - accumulator_thread :: List.init n_counters counter_thread - |> List.map Domain.spawn |> List.iter Domain.join; + Scheduler.run ~n_domains:(n_counters + 1) @@ fun () -> + Run.all (accumulator_thread :: List.init n_counters counter_thread); Printf.printf "Loc modes OK!\n%!" diff --git a/test/kcas/test.ml b/test/kcas/test.ml index 1b433c5a..cdf4b684 100644 --- a/test/kcas/test.ml +++ b/test/kcas/test.ml @@ -24,6 +24,9 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. ---------------------------------------------------------------------------*) +open Picos_finally +open Picos_structured + let is_single = Domain.recommended_domain_count () = 1 open Kcas @@ -34,16 +37,10 @@ let assert_kcas loc expected_v = let present_v = Loc.get loc in assert (present_v == expected_v) -let run_domains = function - | [] -> () - | main :: others -> - let others = List.map Domain.spawn others in - main (); - List.iter Domain.join others - (* *) let test_non_linearizable_xt () = + Scheduler.run ~n_domains:2 @@ fun () -> [ `Obstruction_free; `Lock_free ] |> List.iter @@ fun mode -> let barrier = Barrier.make 2 @@ -90,7 +87,7 @@ let test_non_linearizable_xt () = test_finished := true in - run_domains [ thread2; thread1 ] + Run.all [ thread2; thread1 ] (* *) @@ -103,6 +100,7 @@ let test_set () = (* *) let test_no_skew_xt () = + Scheduler.run ~n_domains:3 @@ fun () -> [ `Obstruction_free; `Lock_free ] |> List.iter @@ fun mode -> let barrier = Barrier.make 3 in @@ -163,14 +161,15 @@ let test_no_skew_xt () = done in - run_domains [ thread1; thread2; thread3 ] + Run.all [ thread1; thread2; thread3 ] (* *) let test_get_seq_xt () = + Scheduler.run ~n_domains:5 @@ fun () -> [ `Obstruction_free; `Lock_free ] |> List.iter @@ fun mode -> - let barrier = Barrier.make 4 in + let barrier = Barrier.make 5 in let test_finished = Atomic.make false in let a1 = Loc.make ~mode 0 in @@ -220,7 +219,7 @@ let test_get_seq_xt () = done in - run_domains [ mutator; getter; getaser; committer; updater ] + Run.all [ mutator; getter; getaser; committer; updater ] (* *) @@ -274,6 +273,7 @@ let test_presort_and_is_in_log_xt () = let n_incs = 10 * Util.iter_factor and n_domains = 3 and n_locs = 12 in let n_locs_half = n_locs asr 1 in + Scheduler.run ~n_domains @@ fun () -> let barrier = Barrier.make n_domains in let locs = Array.init n_locs (fun _ -> Loc.make 0) in @@ -295,7 +295,7 @@ let test_presort_and_is_in_log_xt () = done in - run_domains (List.init n_domains (Fun.const thread)); + Run.all (List.init n_domains (Fun.const thread)); let sum = locs |> Array.map Loc.get |> Array.fold_left ( + ) 0 in assert (sum = n_incs * n_locs_half * n_domains) @@ -344,6 +344,7 @@ let test_post_commit () = (* *) let test_blocking () = + Scheduler.run ~n_domains:2 @@ fun () -> let state = Loc.make `Spawned in let await state' = (* Intentionally test that [Xt.modify] allows retry. *) @@ -361,101 +362,109 @@ let test_blocking () = let num_attempts = ref 0 in - let other_domain = - Domain.spawn @@ fun () -> - Loc.set state `Get_a_non_zero; - Loc.get_as - (fun a -> - incr num_attempts; - Retry.unless (a != 0)) - a; - - Loc.set state `Update_a_zero; - Loc.modify a (fun a -> - incr num_attempts; - Retry.unless (a = 0); - a + 1); - - Loc.set state `Set_1_b_to_0; - let bs = Array.copy bs in - for _ = 1 to n do - (* We access in random order to exercise tx log waiters handling. *) - in_place_shuffle bs; - let tx ~xt = - incr num_attempts; - match - bs - |> Array.find_map @@ fun b -> - if Xt.get ~xt b = 1 then Some b - else if Loc.has_awaiters b (* There must be no leaked waiters... *) - then begin - (* ...except if main domain just set the loc *) - assert (Loc.get b = 1); - Retry.later () - end - else None - with - | None -> Retry.later () - | Some b -> Xt.set ~xt b 0 - in - Xt.commit { tx } - done - in + begin + Flock.join_after @@ fun () -> + begin + Flock.fork @@ fun () -> + Loc.set state `Get_a_non_zero; + Loc.get_as + (fun a -> + incr num_attempts; + Retry.unless (a != 0)) + a; + + Loc.set state `Update_a_zero; + Loc.modify a (fun a -> + incr num_attempts; + Retry.unless (a = 0); + a + 1); + + Loc.set state `Set_1_b_to_0; + let bs = Array.copy bs in + for _ = 1 to n do + (* We access in random order to exercise tx log waiters handling. *) + in_place_shuffle bs; + let tx ~xt = + incr num_attempts; + match + bs + |> Array.find_map @@ fun b -> + if Xt.get ~xt b = 1 then Some b + else if + Loc.has_awaiters b (* There must be no leaked waiters... *) + then begin + (* ...except if main domain just set the loc *) + assert (Loc.get b = 1); + Retry.later () + end + else None + with + | None -> Retry.later () + | Some b -> Xt.set ~xt b 0 + in + Xt.commit { tx } + done + end; - await `Get_a_non_zero; - Loc.set a 1; - assert (!num_attempts <= 2 + 1 (* Need to account for race to next. *)); + await `Get_a_non_zero; + Loc.set a 1; + assert (!num_attempts <= 3 + 1 (* Need to account for race to next. *)); - await `Update_a_zero; - Loc.set a 0; - assert (!num_attempts <= 4 + 1 (* Need to account for race to next. *)); + await `Update_a_zero; + Loc.set a 0; + assert (!num_attempts <= 5 + 1 (* Need to account for race to next. *)); - await `Set_1_b_to_0; - for _ = 1 to n do - let i = Random.int (Array.length bs) in - Loc.set bs.(i) 1; - Loc.get_as (fun b -> Retry.unless (b = 0)) bs.(i) - done; - - Domain.join other_domain; + await `Set_1_b_to_0; + for _ = 1 to n do + let i = Random.int (Array.length bs) in + Loc.set bs.(i) 1; + Loc.get_as (fun b -> Retry.unless (b = 0)) bs.(i) + done + end; - assert (!num_attempts <= 4 + (n * 2)); + assert (!num_attempts <= 5 + (n * 2)); for i = 0 to Array.length bs - 1 do assert (not (Loc.has_awaiters bs.(i))) done let test_no_unnecessary_wakeups () = + Scheduler.run ~n_domains:2 @@ fun () -> let continue = Loc.make false and tries = Atomic.make 0 in - let other_domain = - Domain.spawn @@ fun () -> - continue - |> Loc.get_as @@ fun s -> - Atomic.incr tries; - Retry.unless s - in + begin + Flock.join_after @@ fun () -> + begin + Flock.fork @@ fun () -> + continue + |> Loc.get_as @@ fun s -> + Atomic.incr tries; + Retry.unless s + end; - while not (Loc.has_awaiters continue) do - Domain.cpu_relax () - done; + while not (Loc.has_awaiters continue) do + Domain.cpu_relax () + done; - assert (Loc.compare_and_set continue false false); - assert (not (Loc.update continue Fun.id)); - Loc.set continue false; + assert (Loc.compare_and_set continue false false); + assert (not (Loc.update continue Fun.id)); + Loc.set continue false; - Unix.sleepf 0.01; + Unix.sleepf 0.01; + + assert (Loc.has_awaiters continue && Atomic.get tries = 1); + Loc.set continue true + end; - assert (Loc.has_awaiters continue && Atomic.get tries = 1); - Loc.set continue true; - Domain.join other_domain; assert ((not (Loc.has_awaiters continue)) && Atomic.get tries = 2) (* *) let test_periodic_validation () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let a = Loc.make 0 and b = Loc.make 0 and looping = ref false in let non_zero_difference_domain = - Domain.spawn @@ fun () -> + Flock.fork_as_promise @@ fun () -> let rec tx ~xt = let d = Xt.get ~xt a - Xt.get ~xt b in if d <> 0 then d @@ -475,7 +484,7 @@ let test_periodic_validation () = Loc.set a 1; - assert (1 = Domain.join non_zero_difference_domain) + assert (1 = Promise.await non_zero_difference_domain) (* *) @@ -572,42 +581,43 @@ let test_call () = (** This is a non-deterministic test that might fail occasionally. *) let test_timeout () = - Domain_local_timeout.set_system (module Thread) (module Unix); - - let check (op : ?timeoutf:float -> bool Loc.t -> unit) () = + Scheduler.run ~n_domains:5 @@ fun () -> + let check (op : bool Loc.t -> unit) () = + Flock.join_after @@ fun () -> let rec loop n = let x = Loc.make false in - let (_ : unit -> unit) = - Domain_local_timeout.set_timeoutf 0.6 @@ fun () -> Loc.set x true + let@ _ = + finally Promise.terminate @@ fun () -> + Flock.fork_as_promise @@ fun () -> + Control.sleep ~seconds:0.6; + Loc.set x true in - match op ~timeoutf:0.02 x with + match Control.terminate_after ~seconds:0.02 (fun () -> op x) with | () -> if 0 < n then loop (n - 1) else assert false - | exception Timeout.Timeout -> op ~timeoutf:2.0 x + | exception Control.Terminate -> + Control.terminate_after ~seconds:2.0 (fun () -> op x) in loop 10 in - run_domains + Run.all [ - check (fun ?timeoutf x -> - Loc.get_as ?timeoutf (fun x -> if not x then Retry.later ()) x); - check (fun ?timeoutf x -> - Loc.update ?timeoutf x (fun x -> x || Retry.later ()) |> ignore); - check (fun ?timeoutf x -> - Loc.modify ?timeoutf x (fun x -> x || Retry.later ())); - check (fun ?timeoutf x -> + check (fun x -> Loc.get_as (fun x -> if not x then Retry.later ()) x); + check (fun x -> Loc.update x (fun x -> x || Retry.later ()) |> ignore); + check (fun x -> Loc.modify x (fun x -> x || Retry.later ())); + check (fun x -> let y = Loc.make false in let tx ~xt = if not (Xt.get ~xt x) then Retry.later (); Xt.swap ~xt x y in - Xt.commit ?timeoutf { tx }); - check (fun ?timeoutf x -> + Xt.commit { tx }); + check (fun x -> let y = Loc.make false in let tx ~xt = if not (Xt.get ~xt x) then Retry.invalid (); Xt.swap ~xt x y in - Xt.commit ?timeoutf { tx }); + Xt.commit { tx }); ] (* *) diff --git a/test/kcas/threads.ml b/test/kcas/threads.ml index 5cbcc60c..5873a7ad 100644 --- a/test/kcas/threads.ml +++ b/test/kcas/threads.ml @@ -1,12 +1,14 @@ open Kcas let await_between_threads () = + Scheduler.run @@ fun () -> let x = Loc.make 0 in let y = Loc.make 0 in let a_thread = () |> Thread.create @@ fun () -> + Scheduler.run @@ fun () -> Loc.get_as (fun x -> Retry.unless (x <> 0)) x; Loc.set y 22 in diff --git a/test/kcas_data/dune b/test/kcas_data/dune index 1047e5a0..61167ee2 100644 --- a/test/kcas_data/dune +++ b/test/kcas_data/dune @@ -14,6 +14,8 @@ stack_test_stm xt_test) (libraries + scheduler + picos.structured alcotest kcas kcas_data diff --git a/test/kcas_data/lru_cache.ml b/test/kcas_data/lru_cache.ml index 110ca832..c5271c07 100644 --- a/test/kcas_data/lru_cache.ml +++ b/test/kcas_data/lru_cache.ml @@ -62,8 +62,5 @@ end let capacity_of c = Kcas.Xt.commit { tx = Xt.capacity_of c } let set_capacity c n = Kcas.Xt.commit { tx = Xt.set_capacity c n } let get_opt c k = Kcas.Xt.commit { tx = Xt.get_opt c k } - -let set_blocking ?timeoutf c k v = - Kcas.Xt.commit ?timeoutf { tx = Xt.set_blocking c k v } - +let set_blocking c k v = Kcas.Xt.commit { tx = Xt.set_blocking c k v } let remove c k = Kcas.Xt.commit { tx = Xt.remove c k } diff --git a/test/kcas_data/lru_cache.mli b/test/kcas_data/lru_cache.mli index b32b1aa3..2162d524 100644 --- a/test/kcas_data/lru_cache.mli +++ b/test/kcas_data/lru_cache.mli @@ -9,10 +9,8 @@ module Xt : Lru_cache_intf.Ops with type ('k, 'v) t := ('k, 'v) t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn include Lru_cache_intf.Ops with type ('k, 'v) t := ('k, 'v) t with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn diff --git a/test/kcas_data/lru_cache_example.ml b/test/kcas_data/lru_cache_example.ml index 0df2ee29..709bf74f 100644 --- a/test/kcas_data/lru_cache_example.ml +++ b/test/kcas_data/lru_cache_example.ml @@ -1,3 +1,4 @@ +open Picos_structured open Kcas module Lru_cache = struct @@ -19,21 +20,23 @@ module Lru_cache = struct | exception Retry.Later -> false end - let get ?timeoutf c k = Kcas.Xt.commit ?timeoutf { tx = Xt.get c k } - let get_if ?timeoutf c k p = Kcas.Xt.commit ?timeoutf { tx = Xt.get_if c k p } + let get c k = Kcas.Xt.commit { tx = Xt.get c k } + let get_if c k p = Kcas.Xt.commit { tx = Xt.get_if c k p } let try_set c k d = Kcas.Xt.commit { tx = Xt.try_set c k d } end let () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let c = Lru_cache.create 10 in - let domain = - Domain.spawn @@ fun () -> + let answer = + Flock.fork_as_promise @@ fun () -> let tx ~xt = Lru_cache.Xt.get ~xt c "a" + Lru_cache.Xt.get ~xt c "b" in Xt.commit { tx } in Lru_cache.set_blocking c "b" 30; Lru_cache.set_blocking c "a" 12; - assert (Domain.join domain = 42); + assert (Promise.await answer = 42); () let () = diff --git a/test/kcas_data/lru_cache_intf.ml b/test/kcas_data/lru_cache_intf.ml index 79a8063e..11ef488d 100644 --- a/test/kcas_data/lru_cache_intf.ml +++ b/test/kcas_data/lru_cache_intf.ml @@ -1,11 +1,10 @@ module type Ops = sig type ('k, 'v) t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val capacity_of : ('x, ('k, 'v) t -> int) fn val set_capacity : ('x, ('k, 'v) t -> int -> unit) fn val get_opt : ('x, ('k, 'v) t -> 'k -> 'v option) fn - val set_blocking : ('x, ('k, 'v) t -> 'k -> 'v -> unit) blocking_fn + val set_blocking : ('x, ('k, 'v) t -> 'k -> 'v -> unit) fn val remove : ('x, ('k, 'v) t -> 'k -> unit) fn end diff --git a/test/kcas_data/mvar_test.ml b/test/kcas_data/mvar_test.ml index f05872a4..07aecbf9 100644 --- a/test/kcas_data/mvar_test.ml +++ b/test/kcas_data/mvar_test.ml @@ -1,7 +1,10 @@ +open Picos_structured open Kcas open Kcas_data let basics () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let mv = Mvar.create (Some 101) in assert (not (Mvar.is_empty mv)); assert (Mvar.take mv = 101); @@ -9,14 +12,13 @@ let basics () = assert (Mvar.take_opt mv = None); Mvar.put mv 42; let running = Mvar.create None in - let d = - Domain.spawn @@ fun () -> + begin + Flock.fork @@ fun () -> Mvar.put running (); Xt.commit { tx = Mvar.Xt.put mv 76 } - in + end; assert (Mvar.take running = ()); assert (Xt.commit { tx = Mvar.Xt.take mv } = 42); - Domain.join d; assert (Mvar.take mv = 76) let () = diff --git a/test/lib/scheduler/dune b/test/lib/scheduler/dune new file mode 100644 index 00000000..29be77cf --- /dev/null +++ b/test/lib/scheduler/dune @@ -0,0 +1,9 @@ +(library + (name scheduler) + (libraries + picos.select + (select + scheduler.ml + from + (picos.randos -> scheduler.ocaml5.ml) + (picos.threaded -> scheduler.ocaml4.ml)))) diff --git a/test/lib/scheduler/scheduler.ocaml4.ml b/test/lib/scheduler/scheduler.ocaml4.ml new file mode 100644 index 00000000..9ddf31b6 --- /dev/null +++ b/test/lib/scheduler/scheduler.ocaml4.ml @@ -0,0 +1 @@ +let run ?n_domains:_ main = Picos_threaded.run main diff --git a/test/lib/scheduler/scheduler.ocaml5.ml b/test/lib/scheduler/scheduler.ocaml5.ml new file mode 100644 index 00000000..ea7bb262 --- /dev/null +++ b/test/lib/scheduler/scheduler.ocaml5.ml @@ -0,0 +1,2 @@ +let () = Picos_select.configure () +let run ?(n_domains = 1) main = Picos_randos.run_on ~n_domains main diff --git a/test/kcas_data/stm_run/dune b/test/lib/stm_run/dune similarity index 100% rename from test/kcas_data/stm_run/dune rename to test/lib/stm_run/dune diff --git a/test/kcas_data/stm_run/empty.ocaml4.ml b/test/lib/stm_run/empty.ocaml4.ml similarity index 100% rename from test/kcas_data/stm_run/empty.ocaml4.ml rename to test/lib/stm_run/empty.ocaml4.ml diff --git a/test/kcas_data/stm_run/empty.ocaml5.ml b/test/lib/stm_run/empty.ocaml5.ml similarity index 100% rename from test/kcas_data/stm_run/empty.ocaml5.ml rename to test/lib/stm_run/empty.ocaml5.ml diff --git a/test/kcas_data/stm_run/intf.ml b/test/lib/stm_run/intf.ml similarity index 100% rename from test/kcas_data/stm_run/intf.ml rename to test/lib/stm_run/intf.ml diff --git a/test/kcas_data/stm_run/stm_run.ocaml4.ml b/test/lib/stm_run/stm_run.ocaml4.ml similarity index 100% rename from test/kcas_data/stm_run/stm_run.ocaml4.ml rename to test/lib/stm_run/stm_run.ocaml4.ml diff --git a/test/kcas_data/stm_run/stm_run.ocaml5.ml b/test/lib/stm_run/stm_run.ocaml5.ml similarity index 100% rename from test/kcas_data/stm_run/stm_run.ocaml5.ml rename to test/lib/stm_run/stm_run.ocaml5.ml diff --git a/test/kcas_data/stm_run/util.ml b/test/lib/stm_run/util.ml similarity index 100% rename from test/kcas_data/stm_run/util.ml rename to test/lib/stm_run/util.ml