diff --git a/bench/bench_accumulator.ml b/bench/bench_accumulator.ml index 17fc4609..c0be3d1a 100644 --- a/bench/bench_accumulator.ml +++ b/bench/bench_accumulator.ml @@ -9,7 +9,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () = let n_ops_todo = Atomic.make n_ops |> Multicore_magic.copy_as_padded in let init _ = () in - + let wrap _ _ action = Scheduler.run action in let work _ () = let rec work () = let n = Util.alloc n_ops_todo in @@ -34,7 +34,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () = (if n_domains = 1 then "" else "s") in - Times.record ~budgetf ~n_domains ~init ~work ~after () + Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after () |> Times.to_thruput_metrics ~n:n_ops ~config ~singular:"operation" let run_suite ~budgetf = diff --git a/bench/bench_dllist.ml b/bench/bench_dllist.ml index c58ab302..61895092 100644 --- a/bench/bench_dllist.ml +++ b/bench/bench_dllist.ml @@ -12,9 +12,10 @@ let run_single ~budgetf ?(n_msgs = 15 * Util.iter_factor) () = assert (Dllist.is_empty t); Util.generate_push_and_pop_sequence n_msgs in + let wrap _ _ action = Scheduler.run action in let work _ bits = Util.Bits.iter op bits in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1) @@ -31,6 +32,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ _ action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -70,7 +72,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1) (format "taker" false n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_hashtbl.ml b/bench/bench_hashtbl.ml index 1286ee67..c9ce1c25 100644 --- a/bench/bench_hashtbl.ml +++ b/bench/bench_hashtbl.ml @@ -24,7 +24,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor) Atomic.set n_ops_todo n_ops; Random.State.make_self_init () in - + let wrap _ _ action = Scheduler.run action in let work _ state = let rec work () = let n = Util.alloc n_ops_todo in @@ -56,7 +56,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor) percent_read in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config let run_suite ~budgetf = diff --git a/bench/bench_mvar.ml b/bench/bench_mvar.ml index 2c3988b5..b1871ac5 100644 --- a/bench/bench_mvar.ml +++ b/bench/bench_mvar.ml @@ -11,6 +11,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in let init _ = () in + let wrap _ _ action = Scheduler.run action in let work i () = if i < n_adders then if blocking_add then @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work ~after () + Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_parallel_cmp.ml b/bench/bench_parallel_cmp.ml index e1703213..686c9170 100644 --- a/bench/bench_parallel_cmp.ml +++ b/bench/bench_parallel_cmp.ml @@ -11,7 +11,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () = let n_ops_todo = Atomic.make n_ops |> Multicore_magic.copy_as_padded in let init i = Array.unsafe_get xs i in - + let wrap _ _ action = Scheduler.run action in let work _ x = let tx1 ~xt = let a = Xt.get ~xt a in @@ -41,7 +41,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () = Printf.sprintf "%d worker%s" n_domains (if n_domains = 1 then "" else "s") in - Times.record ~budgetf ~n_domains ~init ~work ~after () + Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after () |> Times.to_thruput_metrics ~n:n_ops ~singular:"transaction" ~config let run_suite ~budgetf = diff --git a/bench/bench_queue.ml b/bench/bench_queue.ml index ebcc9d07..f7afbfea 100644 --- a/bench/bench_queue.ml +++ b/bench/bench_queue.ml @@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = assert (Queue.is_empty t); Util.generate_push_and_pop_sequence n_msgs in + let wrap _ _ action = Scheduler.run action in let work _ bits = Util.Bits.iter op bits in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) @@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ _ action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -79,7 +81,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_stack.ml b/bench/bench_stack.ml index 4886702b..a93b7262 100644 --- a/bench/bench_stack.ml +++ b/bench/bench_stack.ml @@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = assert (Stack.is_empty t); Util.generate_push_and_pop_sequence n_msgs in + let wrap _ _ action = Scheduler.run action in let work _ bits = Util.Bits.iter op bits in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) @@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ _ action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -79,7 +81,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_xt.ml b/bench/bench_xt.ml index 1bdf1ace..39dc451b 100644 --- a/bench/bench_xt.ml +++ b/bench/bench_xt.ml @@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2) in let init _ = () in + let wrap _ _ action = Scheduler.run action in let work _ () = let rec loop i = if i > 0 then begin @@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2) let config = Printf.sprintf "%d loc tx" n_locs in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config let run_suite ~budgetf = diff --git a/bench/bench_xt_ro.ml b/bench/bench_xt_ro.ml index 9629ab3a..5eca629c 100644 --- a/bench/bench_xt_ro.ml +++ b/bench/bench_xt_ro.ml @@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2) in let init _ = () in + let wrap _ _ action = Scheduler.run action in let work _ () = let rec loop i = if i > 0 then begin @@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2) let config = Printf.sprintf "%d loc tx" n_locs in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config let run_suite ~budgetf = diff --git a/bench/dune b/bench/dune index cd97da36..279eb529 100644 --- a/bench/dune +++ b/bench/dune @@ -14,6 +14,7 @@ let () = (action (run %{test} -brief)) (libraries + scheduler kcas_data multicore-bench backoff diff --git a/doc/dune b/doc/dune index 4bfc69d7..92002ff3 100644 --- a/doc/dune +++ b/doc/dune @@ -5,4 +5,4 @@ (package kcas_data)) (enabled_if (>= %{ocaml_version} 5.0.0)) - (files gkmz-with-read-only-cmp-ops.md scheduler-interop.md)) + (files gkmz-with-read-only-cmp-ops.md)) diff --git a/doc/scheduler-interop.md b/doc/scheduler-interop.md deleted file mode 100644 index 39bb662b..00000000 --- a/doc/scheduler-interop.md +++ /dev/null @@ -1,178 +0,0 @@ -# Scheduler interop - -The blocking mechanism in **Kcas** is based on a -[_domain local await_](https://github.com/ocaml-multicore/domain-local-await) -mechanism that schedulers can choose to implement to allow libraries like -**Kcas** to work with them. - -Implementing schedulers is not really what casual users of **Kcas** are supposed -to do. Below is an example of a _toy_ scheduler whose purpose is only to give a -sketch of how a scheduler can provide the domain local await mechanism. - -Let's also demonstrate the use of the -[`Queue`](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Queue/index.html), -[`Stack`](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Stack/index.html), -and -[`Promise`](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Promise/index.html) -implementations that are conveniently provided by -[**Kcas_data**](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/index.html). - - - -Here is the full toy scheduler module: - -```ocaml -module Scheduler : sig - type t - val spawn : unit -> t - val join : t -> unit - val fiber : t -> (unit -> 'a) -> 'a Promise.t -end = struct - open Effect.Deep - type _ Effect.t += - | Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t - type t = { - queue: (unit -> unit) Queue.t; - domain: unit Domain.t - } - let spawn () = - let queue = Queue.create () in - let rec scheduler work = - let effc (type a) : a Effect.t -> _ = function - | Suspend ef -> Some ef - | _ -> None in - try_with work () { effc }; - match Queue.take_opt queue with - | Some work -> scheduler work - | None -> () in - let prepare_for_await _ = - let state = Atomic.make `Init in - let release () = - if Atomic.get state != `Released then - match Atomic.exchange state `Released with - | `Awaiting k -> - Queue.add (continue k) queue - | _ -> () in - let await () = - if Atomic.get state != `Released then - Effect.perform @@ Suspend (fun k -> - if not (Atomic.compare_and_set state `Init - (`Awaiting k)) then - continue k ()) - in - Domain_local_await.{ release; await } in - let domain = Domain.spawn @@ fun () -> - try - while true do - let work = Queue.take_blocking queue in - Domain_local_await.using - ~prepare_for_await - ~while_running:(fun () -> scheduler work) - done - with Exit -> () in - { queue; domain } - let join t = - Queue.add (fun () -> raise Exit) t.queue; - Domain.join t.domain - let fiber t thunk = - let (promise, resolver) = Promise.create () in - Queue.add - (fun () -> Promise.resolve resolver (thunk ())) - t.queue; - promise -end -``` - -The idea is that one can spawn a scheduler to run on a new domain. Then one can -run fibers on the scheduler. Because the scheduler provides the domain local -await mechanism libraries like **Kcas** can use it to block in a scheduler -independent and friendly manner. - -Let's then demonstrate the integration. To start we spawn a scheduler: - -```ocaml -# let scheduler = Scheduler.spawn () -val scheduler : Scheduler.t = -``` - -The scheduler is now eagerly awaiting for fibers to run. Let's give it a couple -of them, but, let's first create a queue and a stack to communicate with the -fibers: - -```ocaml -# let in_queue : int Queue.t = Queue.create () -val in_queue : int Kcas_data.Queue.t = -# let out_stack : int Stack.t = Stack.create () -val out_stack : int Kcas_data.Stack.t = -``` - -The first fiber we create just copies elements from the `in_queue` to the -`out_stack`: - -```ocaml -# ignore @@ Scheduler.fiber scheduler @@ fun () -> - while true do - let elem = Queue.take_blocking in_queue in - Printf.printf "Giving %d...\n%!" elem; - Stack.push elem out_stack - done -- : unit = () -``` - -The second fiber awaits to take two elements from the `out_stack`, updates a -state in between, and then returns their sum: - -```ocaml -# let state = Loc.make 0 -val state : int Loc.t = Kcas.Loc.Loc {Kcas.Loc.state = ; id = } -# let sync_to target = - state - |> Loc.get_as @@ fun current -> - Retry.unless (target <= current) -val sync_to : int -> unit = -# let a_promise = Scheduler.fiber scheduler @@ fun () -> - let x = Stack.pop_blocking out_stack in - Printf.printf "First you gave me %d.\n%!" x; - Loc.set state 1; - let y = Stack.pop_blocking out_stack in - Printf.printf "Then you gave me %d.\n%!" y; - Loc.set state 2; - x + y -val a_promise : int Promise.t = -``` - -To interact with the fibers, we add some elements to the `in_queue`: - -```ocaml -# Queue.add 14 in_queue; sync_to 1 -Giving 14... -First you gave me 14. -- : unit = () -# Queue.add 28 in_queue; sync_to 2 -Giving 28... -Then you gave me 28. -- : unit = () -# Promise.await a_promise -- : int = 42 -``` - -As can be seen above, the scheduler multiplexes the domain among the fibers. -Notice that thanks to the domain local await mechanism we could just perform -blocking operations without thinking about the schedulers. Communication between -the main domain, the scheduler domain, and the fibers _just works_ ™. - -Time to close the shop. - -```ocaml -# Scheduler.join scheduler -- : unit = () -``` - -_That's all Folks!_ diff --git a/dune b/dune index 75232a0a..abdfa8cf 100644 --- a/dune +++ b/dune @@ -1,7 +1,7 @@ -(mdx - (package kcas_data) - (deps - (package kcas) - (package kcas_data)) - (libraries domain_shims) - (files README.md)) +;(mdx +; (package kcas_data) +; (deps +; (package kcas) +; (package kcas_data)) +; (libraries domain_shims) +; (files README.md)) diff --git a/dune-project b/dune-project index 96c1e31d..00309cab 100644 --- a/dune-project +++ b/dune-project @@ -34,12 +34,22 @@ (>= 4.13.0)) (backoff (>= 0.1.0)) - (domain-local-await - (>= 1.0.1)) - (domain-local-timeout - (>= 1.0.1)) (multicore-magic (>= 2.3.0)) + (picos + (>= 0.5.0)) + (picos_std + (and + (>= 0.5.0) + :with-test)) + (picos_io + (and + (>= 0.5.0) + :with-test)) + (picos_mux + (and + (>= 0.5.0) + :with-test)) (domain_shims (and (>= 0.1.0) @@ -84,9 +94,21 @@ (and (>= 0.1.0) :with-test)) - (domain-local-await + (picos + (and + (>= 0.5.0) + :with-test)) + (picos_std + (and + (>= 0.5.0) + :with-test)) + (picos_io + (and + (>= 0.5.0) + :with-test)) + (picos_mux (and - (>= 1.0.1) + (>= 0.5.0) :with-test)) (domain_shims (and diff --git a/dune-workspace b/dune-workspace new file mode 100644 index 00000000..b12e21ad --- /dev/null +++ b/dune-workspace @@ -0,0 +1,13 @@ +(lang dune 3.9) + +(env + (dev + (ocamlopt_flags + (:standard -S)) + (flags + (:standard -warn-error -A))) + (release + (ocamlopt_flags + (:standard -S)))) + +(display verbose) diff --git a/kcas.opam b/kcas.opam index 33c54daf..19f2bae6 100644 --- a/kcas.opam +++ b/kcas.opam @@ -19,9 +19,11 @@ depends: [ "dune" {>= "3.14"} "ocaml" {>= "4.13.0"} "backoff" {>= "0.1.0"} - "domain-local-await" {>= "1.0.1"} - "domain-local-timeout" {>= "1.0.1"} "multicore-magic" {>= "2.3.0"} + "picos" {>= "0.5.0"} + "picos_std" {>= "0.5.0" & with-test} + "picos_io" {>= "0.5.0" & with-test} + "picos_mux" {>= "0.5.0" & with-test} "domain_shims" {>= "0.1.0" & with-test} "alcotest" {>= "1.8.0" & with-test} "qcheck-core" {>= "0.21.2" & with-test} @@ -46,3 +48,10 @@ build: [ ] dev-repo: "git+https://github.com/ocaml-multicore/kcas.git" doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "picos.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] + [ "picos_aux.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] + [ "picos_io.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] + [ "picos_mux.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] + [ "picos_std.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] +] diff --git a/kcas.opam.template b/kcas.opam.template index 0fd71d5e..e702fa3a 100644 --- a/kcas.opam.template +++ b/kcas.opam.template @@ -1 +1,8 @@ doc: "https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/" +pin-depends: [ + [ "picos.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] + [ "picos_aux.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] + [ "picos_io.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] + [ "picos_mux.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] + [ "picos_std.dev" "git+https://github.com/ocaml-multicore/picos#2e66ef2a2ae18806c47dc75ffe0769a656641907" ] +] diff --git a/kcas_data.opam b/kcas_data.opam index 057d28dd..5e4529e4 100644 --- a/kcas_data.opam +++ b/kcas_data.opam @@ -20,7 +20,10 @@ depends: [ "kcas" {= version} "multicore-magic" {>= "2.3.0"} "backoff" {>= "0.1.0" & with-test} - "domain-local-await" {>= "1.0.1" & with-test} + "picos" {>= "0.5.0" & with-test} + "picos_std" {>= "0.5.0" & with-test} + "picos_io" {>= "0.5.0" & with-test} + "picos_mux" {>= "0.5.0" & with-test} "domain_shims" {>= "0.1.0" & with-test} "multicore-bench" {>= "0.1.4" & with-test} "alcotest" {>= "1.8.0" & with-test} diff --git a/src/kcas/dune b/src/kcas/dune index 70b19af5..5a83de7a 100644 --- a/src/kcas/dune +++ b/src/kcas/dune @@ -1,11 +1,11 @@ (library (name kcas) (public_name kcas) - (libraries domain-local-await domain-local-timeout backoff multicore-magic)) + (libraries picos backoff multicore-magic)) -(mdx - (package kcas) - (deps - (package kcas)) - (libraries kcas backoff domain_shims) - (files kcas.mli)) +;(mdx +; (package kcas) +; (deps +; (package kcas)) +; (libraries kcas backoff domain_shims) +; (files kcas.mli)) diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index 5bded5f4..f8fb2051 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -3,6 +3,8 @@ * Copyright (c) 2023, Vesa Karvonen *) +open Picos + (** Work around CSE bug in OCaml 5-5.1. *) let[@inline] atomic_get x = Atomic.get ((* Prevents CSE *) Sys.opaque_identity x) @@ -22,82 +24,6 @@ let[@inline] fenceless_get x = let fenceless_get = atomic_get *) -module Timeout = struct - exception Timeout - - let[@inline never] timeout () = raise Timeout - - type _ t = - | Unset : [> `Unset ] t - | Elapsed : [> `Elapsed ] t - | Call : (unit -> unit) -> [> `Call ] t - | Set : { mutable state : [< `Elapsed | `Call ] t } -> [> `Set ] t - - external as_atomic : [< `Set ] t -> [< `Elapsed | `Call ] t Atomic.t - = "%identity" - - (* Fenceless operations are safe here as the timeout state is not not visible - outside of the library and we don't always need the latest value and, when - we do, there is a fence after. *) - - let[@inline] check (t : [< `Set | `Unset ] t) = - match t with - | Unset -> () - | Set set_r -> - if fenceless_get (as_atomic (Set set_r)) == Elapsed then timeout () - - let set seconds (state : [< `Elapsed | `Call ] t Atomic.t) = - Domain_local_timeout.set_timeoutf seconds @@ fun () -> - match Atomic.exchange state Elapsed with - | Call release_or_cancel -> release_or_cancel () - | Elapsed -> () - - let call_id = Call Fun.id - - let[@inline never] alloc seconds = - let (Set set_r as t : [ `Set ] t) = Set { state = call_id } in - let cancel = set seconds (as_atomic t) in - if not (Atomic.compare_and_set (as_atomic t) call_id (Call cancel)) then - timeout (); - Set set_r - - let[@inline] alloc_opt = function - | None -> Unset - | Some seconds -> alloc seconds - - let[@inline never] await (state : [< `Elapsed | `Call ] t Atomic.t) release = - match fenceless_get state with - | Call cancel as alive -> - if Atomic.compare_and_set state alive (Call release) then Call cancel - else timeout () - | Elapsed -> timeout () - - let[@inline] await (t : [ `Unset | `Set ] t) release = - match t with Unset -> Unset | Set r -> await (as_atomic (Set r)) release - - let[@inline never] unawait (state : [< `Elapsed | `Call ] t Atomic.t) alive = - match fenceless_get state with - | Call _ as await -> - if not (Atomic.compare_and_set state await alive) then timeout () - | Elapsed -> timeout () - - let[@inline] unawait t alive = - match (t, alive) with - | Set set_r, Call call_r -> unawait (as_atomic (Set set_r)) (Call call_r) - | _ -> () - - let[@inline] cancel_alive (alive : [< `Unset | `Call ] t) = - match alive with Unset -> () | Call cancel -> cancel () - - let[@inline] cancel (t : [< `Set | `Unset ] t) = - match t with - | Unset -> () - | Set set_r -> ( - match fenceless_get (as_atomic (Set set_r)) with - | Elapsed -> () - | Call cancel -> cancel ()) -end - module Id = struct let neg_id = Atomic.make (-1) let[@inline] neg_ids n = Atomic.fetch_and_add neg_id (-n) @@ -128,14 +54,12 @@ end = struct x end -type awaiter = unit -> unit - -let[@inline] resume_awaiter awaiter = awaiter () +type awaiter = Trigger.t let[@inline] resume_awaiters = function | [] -> () - | [ awaiter ] -> resume_awaiter awaiter - | awaiters -> List.iter resume_awaiter awaiters + | [ awaiter ] -> Trigger.signal awaiter + | awaiters -> List.iter Trigger.signal awaiters module Mode = struct type t = [ `Lock_free | `Obstruction_free ] @@ -166,7 +90,7 @@ and _ tdt = This field must be first, see [root_as_atomic] and [tree_as_ref]. *) - timeout : [ `Set | `Unset ] Timeout.t; + mutable fiber : Fiber.Maybe.t; mutable mode : Mode.t; mutable validate_counter : int; mutable post_commit : Action.t; @@ -467,67 +391,49 @@ let rec remove_awaiter backoff loc before awaiter = if not (Atomic.compare_and_set (as_atomic loc) state_old state_new) then remove_awaiter (Backoff.once backoff) loc before awaiter -let block timeout loc before = - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await timeout t.release in - if add_awaiter loc before t.release then begin - try t.await () - with cancellation_exn -> - remove_awaiter Backoff.default loc before t.release; - Timeout.cancel_alive alive; - raise cancellation_exn - end; - Timeout.unawait timeout alive +let block loc before = + let t = Trigger.create () in + if add_awaiter loc before t then + match Trigger.await t with + | None -> () + | Some (exn, bt) -> + remove_awaiter Backoff.default loc before t; + Printexc.raise_with_backtrace exn bt -let rec update_no_alloc timeout backoff loc state f = +let rec update_no_alloc backoff loc state f = (* Fenceless is safe as we have had a fence before if needed and there is a fence after. *) let state_old = fenceless_get (as_atomic loc) in let before = eval state_old in match f before with | after -> - if before == after then begin - Timeout.cancel timeout; - before - end + if before == after then before else begin state.after <- after; if Atomic.compare_and_set (as_atomic loc) state_old state then begin resume_awaiters state_old.awaiters; - Timeout.cancel timeout; before end - else update_no_alloc timeout (Backoff.once backoff) loc state f + else update_no_alloc (Backoff.once backoff) loc state f end | exception Retry.Later -> - block timeout loc before; - update_no_alloc timeout backoff loc state f - | exception exn -> - Timeout.cancel timeout; - raise exn + block loc before; + update_no_alloc backoff loc state f -let update_with_state timeout backoff loc f state_old = +let update_with_state backoff loc f state_old = let before = eval state_old in match f before with | after -> - if before == after then begin - Timeout.cancel timeout; - before - end + if before == after then before else let state = new_state after in if Atomic.compare_and_set (as_atomic loc) state_old state then begin resume_awaiters state_old.awaiters; - Timeout.cancel timeout; before end - else update_no_alloc timeout (Backoff.once backoff) loc state f + else update_no_alloc (Backoff.once backoff) loc state f | exception Retry.Later -> let state = new_state before in - block timeout loc before; - update_no_alloc timeout backoff loc state f - | exception exn -> - Timeout.cancel timeout; - raise exn + update_no_alloc backoff loc state f let rec exchange_no_alloc backoff loc state = let state_old = atomic_get (as_atomic loc) in @@ -553,8 +459,8 @@ let[@inline] rec cas_with_state backoff loc before state state_old = having installed or removed a waiter. Fenceless is safe as there was a fence before. *) - cas_with_state (Backoff.once backoff) loc before state - (fenceless_get (as_atomic loc))) + let backoff = Backoff.once backoff in + cas_with_state backoff loc before state (atomic_get (as_atomic loc))) let inc x = x + 1 let dec x = x - 1 @@ -584,25 +490,15 @@ module Loc = struct let[@inline] get_id loc = (to_loc loc).id let get loc = eval (atomic_get (as_atomic (to_loc loc))) - let rec get_as timeout f loc state = + let rec get_as f loc state = let before = eval state in - match f before with - | value -> - Timeout.cancel timeout; - value - | exception Retry.Later -> - block timeout (to_loc loc) before; - (* Fenceless is safe as there was already a fence before. *) - get_as timeout f loc (fenceless_get (as_atomic (to_loc loc))) - | exception exn -> - Timeout.cancel timeout; - raise exn - - let[@inline] get_as ?timeoutf f loc = - get_as - (Timeout.alloc_opt timeoutf) - f loc - (atomic_get (as_atomic (to_loc loc))) + try f before + with Retry.Later -> + block (to_loc loc) before; + (* Fenceless is safe as there was already a fence before. *) + get_as f loc (fenceless_get (as_atomic (to_loc loc))) + + let[@inline] get_as f loc = get_as f loc (atomic_get (as_atomic (to_loc loc))) let[@inline] get_mode loc = if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free @@ -612,21 +508,18 @@ module Loc = struct let state_old = atomic_get (as_atomic (to_loc loc)) in cas_with_state backoff (to_loc loc) before state state_old - let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in - update_with_state timeout backoff (to_loc loc) f + let fenceless_update ?(backoff = Backoff.default) loc f = + update_with_state backoff (to_loc loc) f (fenceless_get (as_atomic (to_loc loc))) - let[@inline] fenceless_modify ?timeoutf ?backoff loc f = - fenceless_update ?timeoutf ?backoff loc f |> ignore + let[@inline] fenceless_modify ?backoff loc f = + fenceless_update ?backoff loc f |> ignore - let update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in - update_with_state timeout backoff (to_loc loc) f + let update ?(backoff = Backoff.default) loc f = + update_with_state backoff (to_loc loc) f (atomic_get (as_atomic (to_loc loc))) - let[@inline] modify ?timeoutf ?backoff loc f = - update ?timeoutf ?backoff loc f |> ignore + let[@inline] modify ?backoff loc f = update ?backoff loc f |> ignore let exchange ?(backoff = Backoff.default) loc value = exchange_no_alloc backoff (to_loc loc) (new_state value) @@ -711,6 +604,17 @@ module Xt = struct tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); before + let check (Xt xt_r : _ t) = + match Fiber.Maybe.to_fiber_or_current xt_r.fiber with + | fiber -> + Fiber.check fiber; + if xt_r.fiber != Fiber.Maybe.of_fiber fiber then + xt_r.fiber <- Fiber.Maybe.of_fiber fiber + | exception _ -> + (* Getting the current fiber should never fail except when not running + inside a scheduler. *) + () + let update_old : type c a. _ -> a loc -> c -> (c, a) up -> _ -> _ -> _ -> a = fun (Xt xt_r as xt : _ t) loc c up lt gt state' -> let c0 = xt_r.validate_counter in @@ -721,7 +625,7 @@ module Xt = struct The assumption is that potentially infinite loops will repeatedly access the same locations. *) if c0 land c1 = 0 then begin - Timeout.check xt_r.timeout; + check xt; validate_all_rec xt !(tree_as_ref xt) end; let state : a state = Obj.magic state' in @@ -929,17 +833,13 @@ module Xt = struct let initial_validate_period = 4 - let success (Xt xt_r : _ t) result = - Timeout.cancel xt_r.timeout; - Action.run xt_r.post_commit result - let rec commit backoff (Xt xt_r as xt : _ t) tx = match tx ~xt with | result -> begin match !(tree_as_ref xt) with - | T Leaf -> success xt result + | T Leaf -> Action.run xt_r.post_commit result | T (Node { loc; state; lt = T Leaf; gt = T Leaf; _ }) -> - if is_cmp xt state then success xt result + if is_cmp xt state then Action.run xt_r.post_commit result else begin state.which <- W After; let before = state.before in @@ -948,7 +848,7 @@ module Xt = struct fence. *) let state_old = fenceless_get (as_atomic loc) in if cas_with_state Backoff.default loc before state state_old then - success xt result + Action.run xt_r.post_commit result else commit_once_reuse backoff xt tx end | T (Node node_r) -> begin @@ -956,22 +856,22 @@ module Xt = struct match determine xt 0 root with | status -> if a_cmp_followed_by_a_cas < status then begin - if finish xt root (verify xt root) then success xt result + if finish xt root (verify xt root) then + Action.run xt_r.post_commit result else begin - (* We switch to [`Lock_free] as there was - interference. *) + (* We switch to [`Lock_free] as there was interference. *) commit_once_alloc backoff `Lock_free xt tx end end else if a_cmp = status || finish xt root (if 0 <= status then After else Before) - then success xt result + then Action.run xt_r.post_commit result else commit_once_alloc backoff xt_r.mode xt tx | exception Exit -> (* Fenceless is safe as there was a fence before. *) if fenceless_get (root_as_atomic xt) == R After then - success xt result + Action.run xt_r.post_commit result else commit_once_alloc backoff xt_r.mode xt tx end end @@ -981,58 +881,52 @@ module Xt = struct | T Leaf -> invalid_retry () | T (Node node_r) -> begin let root = Node node_r in - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await xt_r.timeout t.release in - match add_awaiters t.release xt root with + let t = Trigger.create () in + match add_awaiters t xt root with | T Leaf -> begin - match t.await () with - | () -> - remove_awaiters t.release xt (T Leaf) root |> ignore; - Timeout.unawait xt_r.timeout alive; + match Trigger.await t with + | None -> + remove_awaiters t xt (T Leaf) root |> ignore; commit_reset_reuse backoff xt tx - | exception cancellation_exn -> - remove_awaiters t.release xt (T Leaf) root |> ignore; - Timeout.cancel_alive alive; - raise cancellation_exn + | Some (exn, bt) -> + remove_awaiters t xt (T Leaf) root |> ignore; + Printexc.raise_with_backtrace exn bt end | T (Node _) as stop -> - remove_awaiters t.release xt stop root |> ignore; - Timeout.unawait xt_r.timeout alive; + remove_awaiters t xt stop root |> ignore; commit_once_reuse backoff xt tx end end - | exception exn -> - Timeout.cancel xt_r.timeout; - raise exn and commit_once_reuse backoff xt tx = + check xt; commit_reuse (Backoff.once backoff) xt tx and commit_reset_reuse backoff xt tx = + check xt; commit_reuse (Backoff.reset backoff) xt tx and commit_reuse backoff (Xt xt_r as xt : _ t) tx = tree_as_ref xt := T Leaf; xt_r.validate_counter <- initial_validate_period; xt_r.post_commit <- Action.noop; - Timeout.check xt_r.timeout; commit backoff xt tx - and commit_once_alloc backoff mode (Xt xt_r : _ t) tx = + and commit_once_alloc backoff mode (Xt xt_r as xt : _ t) tx = + check xt; let backoff = Backoff.once backoff in - Timeout.check xt_r.timeout; let rot = U Leaf in let validate_counter = initial_validate_period in let post_commit = Action.noop in let xt = Xt { xt_r with rot; mode; validate_counter; post_commit } in commit backoff xt tx - let[@inline] commit ?timeoutf ?(backoff = Backoff.default) - ?(mode = `Obstruction_free) { tx } = - let timeout = Timeout.alloc_opt timeoutf + let[@inline] commit ?(backoff = Backoff.default) ?(mode = `Obstruction_free) + { tx } = + let fiber = Fiber.Maybe.nothing and rot = U Leaf and validate_counter = initial_validate_period and post_commit = Action.noop in - let xt = Xt { rot; timeout; mode; validate_counter; post_commit } in + let xt = Xt { rot; fiber; mode; validate_counter; post_commit } in commit backoff xt tx end diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 185a2c23..7f7212ca 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -22,7 +22,7 @@ obstruction-free} read-only compare (CMP) operations that can be performed on overlapping locations in parallel without interference. - - {b Blocking await}: The algorithm supports timeouts and awaiting for + - {b Blocking await}: The algorithm supports cancelation and awaiting for changes to any number of shared memory locations. - {b Composable}: Independently developed transactions can be composed with @@ -105,16 +105,9 @@ can skip over these. The documentation links back to these modules where appropriate. *) -(** Timeout support. *) -module Timeout : sig - exception Timeout - (** Exception that may be raised by operations such as {!Loc.get_as}, - {!Loc.update}, {!Loc.modify}, or {!Xt.commit} when given a [~timeoutf] in - seconds. *) -end - -(** Retry support. *) module Retry : sig + (** Retry support. *) + exception Later (** Exception that may be raised to signal that the operation, such as {!Loc.get_as}, {!Loc.update}, or {!Xt.commit}, should be retried, at some @@ -126,7 +119,7 @@ module Retry : sig shared memory locations have already changed. *) val later : unit -> 'a - (** [later ()] is equivalent to [raise Later]. *) + (** [later ()] is equivalent to [raise_notrace Later]. *) val unless : bool -> unit (** [unless condition] is equivalent to [if not condition then later ()]. *) @@ -137,11 +130,12 @@ module Retry : sig outside of the transaction, and the transaction should be retried. *) val invalid : unit -> 'a - (** [invalid ()] is equivalent to [raise Invalid]. *) + (** [invalid ()] is equivalent to [raise_notrace Invalid]. *) end -(** Operating modes of the [k-CAS-n-CMP] algorithm. *) module Mode : sig + (** Operating modes of the [k-CAS-n-CMP] algorithm. *) + type t = [ `Lock_free (** In [`Lock_free] mode the algorithm makes sure that at least one domain will @@ -164,18 +158,15 @@ end - [backoff] specifies the configuration for the [Backoff] mechanism. In special cases, having more detailed knowledge of the application, one - might adjust the configuration to improve performance. + might adjust the configuration to improve performance. *) - - [timeoutf] specifies a timeout in seconds and, if specified, the - {!Timeout.Timeout} exception may be raised by the operation to signal that - the timeout expired. *) +module Loc : sig + (** Shared memory locations. -(** Shared memory locations. + This module is essentially compatible with the [Stdlib.Atomic] module, + except that a number of functions take some optional arguments that one + usually need not worry about. *) - This module is essentially compatible with the [Stdlib.Atomic] module, - except that a number of functions take some optional arguments that one - usually need not worry about. *) -module Loc : sig (** Type of shared memory locations. *) type !'a t = private | Loc : { state : 'state; id : 'id } -> 'a t @@ -225,7 +216,7 @@ module Loc : sig val get : 'a t -> 'a (** [get r] reads the current value of the shared memory location [r]. *) - val get_as : ?timeoutf:float -> ('a -> 'b) -> 'a t -> 'b + val get_as : ('a -> 'b) -> 'a t -> 'b (** [get_as f loc] is equivalent to [f (get loc)]. The given function [f] may raise the {!Retry.Later} exception to signal that the conditional load should be retried only after the location has been modified outside of the @@ -237,7 +228,7 @@ module Loc : sig location [r] to the [after] value if the current value of [r] is the [before] value. *) - val update : ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [update r f] repeats [let b = get r in compare_and_set r b (f b)] until it succeeds and then returns the [b] value. The given function [f] may raise the {!Retry.Later} exception to signal that the update should only be @@ -245,8 +236,7 @@ module Loc : sig also safe for the given function [f] to raise any other exception to abort the update. *) - val modify : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [modify r f] is equivalent to [update r f |> ignore]. *) val exchange : ?backoff:Backoff.t -> 'a t -> 'a -> 'a @@ -277,13 +267,11 @@ module Loc : sig (** [fenceless_get r] is like [get r] except that [fenceless_get]s may be reordered. *) - val fenceless_update : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val fenceless_update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [fenceless_update r f] is like [update r f] except that in case [f x == x] the update may be reordered. *) - val fenceless_modify : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val fenceless_modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [fenceless_modify r f] is like [modify r f] except that in case [f x == x] the modify may be reordered. *) end @@ -318,77 +306,80 @@ end memory locations are atomically marked as having taken effect and subsequent reads of the locations will be able to see the newly written values. *) -(** Explicit transaction log passing on shared memory locations. - - This module provides a way to implement composable transactions over shared - memory locations. A transaction is a function written by the library user - and can be thought of as a specification of a sequence of {!Xt.get} and - {!Xt.set} accesses to shared memory locations. To actually perform the - accesses one then {!Xt.commit}s the transaction. - - Transactions should generally not perform arbitrary side-effects, because - when a transaction is committed it may be attempted multiple times meaning - that the side-effects are also performed multiple times. {!Xt.post_commit} - can be used to perform an action only once after the transaction has been - committed succesfully. - - {b WARNING}: To make it clear, the operations provided by the {!Loc} module - for accessing individual shared memory locations do not implicitly go - through the transaction mechanism and should generally not be used within - transactions. There are advanced algorithms where one might, within a - transaction, perform operations that do not get recorded into the - transaction log. Using such techniques correctly requires expert knowledge - and is not recommended for casual users. - - As an example, consider an implementation of doubly-linked circular - lists. Instead of using a mutable field, [ref], or [Atomic.t], one would use - a shared memory location, or {!Loc.t}, for the pointers in the node type: - - {[ - type 'a node = { - succ: 'a node Loc.t; - pred: 'a node Loc.t; - datum: 'a; - } - ]} - - To remove a node safely one wants to atomically update the [succ] and [pred] - pointers of the predecessor and successor nodes and to also update the - [succ] and [pred] pointers of a node to point to the node itself, so that - removal becomes an {{:https://en.wikipedia.org/wiki/Idempotence} idempotent} - operation. Using explicit transaction log passing one could implement the - [remove] operation as follows: - - {[ - let remove ~xt node = - (* Read pointers to the predecessor and successor nodes: *) - let pred = Xt.get ~xt node.pred in - let succ = Xt.get ~xt node.succ in - (* Update pointers in this node: *) - Xt.set ~xt node.succ node; - Xt.set ~xt node.pred node; - (* Update pointers to this node: *) - Xt.set ~xt pred.succ succ; - Xt.set ~xt succ.pred pred - ]} +module Xt : sig + (** Explicit transaction log passing on shared memory locations. + + This module provides a way to implement composable transactions over + shared memory locations. A transaction is a function written by the + library user and can be thought of as a specification of a sequence of + {!Xt.get} and {!Xt.set} accesses to shared memory locations. To actually + perform the accesses one then {!Xt.commit}s the transaction. + + Transactions should generally not perform arbitrary side-effects, because + when a transaction is committed it may be attempted multiple times meaning + that the side-effects are also performed multiple times. + {!Xt.post_commit} can be used to perform an action only once after the + transaction has been committed succesfully. + + {b WARNING}: To make it clear, the operations provided by the {!Loc} + module for accessing individual shared memory locations do not implicitly + go through the transaction mechanism and should generally not be used + within transactions. There are advanced algorithms where one might, + within a transaction, perform operations that do not get recorded into the + transaction log. Using such techniques correctly requires expert + knowledge and is not recommended for casual users. + + As an example, consider an implementation of doubly-linked circular + lists. Instead of using a mutable field, [ref], or [Atomic.t], one would + use a shared memory location, or {!Loc.t}, for the pointers in the node + type: + + {[ + type 'a node = { + succ: 'a node Loc.t; + pred: 'a node Loc.t; + datum: 'a; + } + ]} + + To remove a node safely one wants to atomically update the [succ] and + [pred] pointers of the predecessor and successor nodes and to also update + the [succ] and [pred] pointers of a node to point to the node itself, so + that removal becomes an {{:https://en.wikipedia.org/wiki/Idempotence} + idempotent} operation. Using explicit transaction log passing one could + implement the [remove] operation as follows: + + {[ + let remove ~xt node = + (* Read pointers to the predecessor and successor nodes: *) + let pred = Xt.get ~xt node.pred in + let succ = Xt.get ~xt node.succ in + (* Update pointers in this node: *) + Xt.set ~xt node.succ node; + Xt.set ~xt node.pred node; + (* Update pointers to this node: *) + Xt.set ~xt pred.succ succ; + Xt.set ~xt succ.pred pred + ]} + + The labeled argument, [~xt], refers to the transaction log. Transactional + operations like {!Xt.get} and {!Xt.set} are then recorded in that log. To + actually remove a node, we need to commit the transaction - The labeled argument, [~xt], refers to the transaction log. Transactional - operations like {!Xt.get} and {!Xt.set} are then recorded in that log. To - actually remove a node, we need to commit the transaction + {@ocaml skip[ + Xt.commit { tx = remove node } + ]} - {@ocaml skip[ - Xt.commit { tx = remove node } - ]} + which repeatedly calls the transaction function, [tx], to record a + transaction log and attempts to atomically perform it until it succeeds. - which repeatedly calls the transaction function, [tx], to record a - transaction log and attempts to atomically perform it until it succeeds. + Notice that [remove] is not recursive. It doesn't have to account for + failure or perform a backoff. It is also not necessary to know or keep + track of what the previous values of locations were. All of that is taken + care of for us by the transaction log and the {!Xt.commit} function. + Furthermore, [remove] can easily be called as a part of a more complex + transaction. *) - Notice that [remove] is not recursive. It doesn't have to account for - failure or perform a backoff. It is also not necessary to know or keep track - of what the previous values of locations were. All of that is taken care of - for us by the transaction log and the {!Xt.commit} function. Furthermore, - [remove] can easily be called as a part of a more complex transaction. *) -module Xt : sig type 'x t (** Type of an explicit transaction log on shared memory locations. @@ -548,8 +539,7 @@ module Xt : sig val call : xt:'x t -> 'a tx -> 'a (** [call ~xt tx] is equivalent to [tx.Xt.tx ~xt]. *) - val commit : - ?timeoutf:float -> ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a + val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a (** [commit tx] repeatedly calls [tx] to record a log of shared memory accesses and attempts to perform them atomically until it succeeds and then returns whatever [tx] returned. [tx] may raise {!Retry.Later} or diff --git a/src/kcas_data/dllist.ml b/src/kcas_data/dllist.ml index 4b93cd48..7be748a1 100644 --- a/src/kcas_data/dllist.ml +++ b/src/kcas_data/dllist.ml @@ -225,13 +225,8 @@ let move_l node list = Kcas.Xt.commit { tx = Xt.move_l node list } let move_r node list = Kcas.Xt.commit { tx = Xt.move_r node list } let take_opt_l list = Kcas.Xt.commit { tx = Xt.take_opt_l list } let take_opt_r list = Kcas.Xt.commit { tx = Xt.take_opt_r list } - -let take_blocking_l ?timeoutf list = - Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking_l list } - -let take_blocking_r ?timeoutf list = - Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking_r list } - +let take_blocking_l list = Kcas.Xt.commit { tx = Xt.take_blocking_l list } +let take_blocking_r list = Kcas.Xt.commit { tx = Xt.take_blocking_r list } let swap t1 t2 = Kcas.Xt.commit { tx = Xt.swap t1 t2 } let transfer_l t1 t2 = Kcas.Xt.commit { tx = Xt.transfer_l t1 t2 } let transfer_r t1 t2 = Kcas.Xt.commit { tx = Xt.transfer_r t1 t2 } diff --git a/src/kcas_data/dllist.mli b/src/kcas_data/dllist.mli index de21e093..5f0163b2 100644 --- a/src/kcas_data/dllist.mli +++ b/src/kcas_data/dllist.mli @@ -59,7 +59,6 @@ module Xt : with type 'a t := 'a t with type 'a node := 'a node with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on doubly-linked lists. *) (** {1 Non-compositional interface} *) @@ -69,7 +68,6 @@ include with type 'a t := 'a t with type 'a node := 'a node with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn val take_all : 'a t -> 'a t (** [take_all l] removes all nodes of the doubly-linked list [l] and returns a diff --git a/src/kcas_data/dllist_intf.ml b/src/kcas_data/dllist_intf.ml index 738fda2c..406a9f94 100644 --- a/src/kcas_data/dllist_intf.ml +++ b/src/kcas_data/dllist_intf.ml @@ -2,7 +2,6 @@ module type Ops = sig type 'a t type 'a node type ('x, 'fn) fn - type ('x, 'fn) blocking_fn (** {2 Operations on nodes} *) @@ -42,12 +41,12 @@ module type Ops = sig (** [take_opt_r l] removes and returns the value of rightmost node of the doubly-linked list [l], or return [None] if the list is empty. *) - val take_blocking_l : ('x, 'a t -> 'a) blocking_fn + val take_blocking_l : ('x, 'a t -> 'a) fn (** [take_blocking_l l] removes and returns the value of leftmost node of the doubly-linked list [l], or blocks waiting for the list to become non-empty. *) - val take_blocking_r : ('x, 'a t -> 'a) blocking_fn + val take_blocking_r : ('x, 'a t -> 'a) fn (** [take_blocking_r l] removes and returns the value of rightmost node of the doubly-linked list [l], or blocks waiting for the list to become non-empty. *) diff --git a/src/kcas_data/mvar.ml b/src/kcas_data/mvar.ml index ba31e192..690e3dbc 100644 --- a/src/kcas_data/mvar.ml +++ b/src/kcas_data/mvar.ml @@ -25,18 +25,17 @@ end let is_empty mv = Magic_option.is_none (Loc.get mv) -let put ?timeoutf mv value = +let put mv value = (* Fenceless is safe as we always update. *) - Loc.fenceless_modify ?timeoutf mv (Magic_option.put_or_retry value) + Loc.fenceless_modify mv (Magic_option.put_or_retry value) let try_put mv value = Loc.compare_and_set mv Magic_option.none (Magic_option.some value) -let take ?timeoutf mv = +let take mv = (* Fenceless is safe as we always update. *) - Magic_option.get_unsafe - (Loc.fenceless_update ?timeoutf mv Magic_option.take_or_retry) + Magic_option.get_unsafe (Loc.fenceless_update mv Magic_option.take_or_retry) let take_opt mv = Magic_option.to_option (Loc.exchange mv Magic_option.none) -let peek ?timeoutf mv = Loc.get_as ?timeoutf Magic_option.get_or_retry mv +let peek mv = Loc.get_as Magic_option.get_or_retry mv let peek_opt mv = Magic_option.to_option (Loc.get mv) diff --git a/src/kcas_data/mvar.mli b/src/kcas_data/mvar.mli index f0a7d5ca..12a2b8df 100644 --- a/src/kcas_data/mvar.mli +++ b/src/kcas_data/mvar.mli @@ -26,13 +26,8 @@ module Xt : Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction passing on synchronizing variables. *) (** {1 Non-compositional interface} *) -include - Mvar_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn diff --git a/src/kcas_data/mvar_intf.ml b/src/kcas_data/mvar_intf.ml index e9f6c7e3..3df088f2 100644 --- a/src/kcas_data/mvar_intf.ml +++ b/src/kcas_data/mvar_intf.ml @@ -1,13 +1,12 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty mv] determines whether the synchronizing variable [mv] contains a value or not. *) - val put : ('x, 'a t -> 'a -> unit) blocking_fn + val put : ('x, 'a t -> 'a -> unit) fn (** [put mv x] fills the synchronizing variable [mv] with the value [v] or blocks until the variable becomes empty. *) @@ -16,7 +15,7 @@ module type Ops = sig value [v] and returns [true] on success or [false] in case the variable is full. *) - val take : ('x, 'a t -> 'a) blocking_fn + val take : ('x, 'a t -> 'a) fn (** [take mv] removes and returns the current value of the synchronizing variable [mv] or blocks waiting until the variable is filled. *) @@ -24,7 +23,7 @@ module type Ops = sig (** [take_opt mv] removes and returns the current value of the synchronizing variable [mv] or returns [None] in case the variable is empty. *) - val peek : ('x, 'a t -> 'a) blocking_fn + val peek : ('x, 'a t -> 'a) fn (** [peek mv] returns the current value of the synchronizing variable [mv] or blocks waiting until the variable is filled. *) diff --git a/src/kcas_data/promise.ml b/src/kcas_data/promise.ml index 0786ff9c..cf25e189 100644 --- a/src/kcas_data/promise.ml +++ b/src/kcas_data/promise.ml @@ -38,8 +38,7 @@ module Xt = struct let resolve_error ~xt u e = resolve ~xt u (Error e) end -let await ?timeoutf t = - Loc.get_as ?timeoutf Magic_option.get_or_retry (of_promise t) +let await t = Loc.get_as Magic_option.get_or_retry (of_promise t) let resolve u v = if @@ -51,8 +50,8 @@ let resolve u v = let peek t = Magic_option.to_option (Loc.get (of_promise t)) let is_resolved t = Magic_option.is_some (Loc.get (of_promise t)) -let await_exn ?timeoutf t = - match await ?timeoutf t with Ok value -> value | Error exn -> raise exn +let await_exn t = + match await t with Ok value -> value | Error exn -> raise exn let resolve_ok u v = resolve u (Ok v) let resolve_error u e = resolve u (Error e) diff --git a/src/kcas_data/promise.mli b/src/kcas_data/promise.mli index 11c49988..6515f908 100644 --- a/src/kcas_data/promise.mli +++ b/src/kcas_data/promise.mli @@ -42,7 +42,6 @@ module Xt : with type 'a or_exn := 'a or_exn with type 'a u := 'a u with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on promises. *) (** {1 Non-compositional interface} *) @@ -53,4 +52,3 @@ include with type 'a or_exn := 'a or_exn with type 'a u := 'a u with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn diff --git a/src/kcas_data/promise_intf.ml b/src/kcas_data/promise_intf.ml index 93d1034a..ffe512f9 100644 --- a/src/kcas_data/promise_intf.ml +++ b/src/kcas_data/promise_intf.ml @@ -3,14 +3,13 @@ module type Ops = sig type !-'a u type 'a or_exn type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val resolve : ('x, 'a u -> 'a -> unit) fn (** [resolve u v] resolves the promise corresponding to the resolver [u] to the value [v]. Any awaiters of the corresponding promise are then unblocked. *) - val await : ('x, 'a t -> 'a) blocking_fn + val await : ('x, 'a t -> 'a) fn (** [await t] either immediately returns the resolved value of the promise [t] or blocks until the promise [t] is resolved. *) @@ -24,7 +23,7 @@ module type Ops = sig (** {2 Result promises} *) - val await_exn : ('x, 'a or_exn -> 'a) blocking_fn + val await_exn : ('x, 'a or_exn -> 'a) fn (** [await_exn t] is equivalent to [match await t with v -> v | exception e -> raise e]. *) val resolve_ok : ('x, ('a, 'b) result u -> 'a -> unit) fn diff --git a/src/kcas_data/queue.ml b/src/kcas_data/queue.ml index 543bb0c7..d8742eb6 100644 --- a/src/kcas_data/queue.ml +++ b/src/kcas_data/queue.ml @@ -139,10 +139,10 @@ let take_opt q = | None -> Kcas.Xt.commit { tx = Xt.take_opt q } | some -> some -let take_blocking ?timeoutf q = +let take_blocking q = (* Fenceless is safe as we revert to a transaction in case we didn't update. *) match Loc.fenceless_update q.front Elems.tl_safe |> Elems.hd_opt with - | None -> Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking q } + | None -> Kcas.Xt.commit { tx = Xt.take_blocking q } | Some elem -> elem let take_all q = Kcas.Xt.commit { tx = Xt.take_all q } @@ -152,9 +152,7 @@ let peek_opt q = | None -> Kcas.Xt.commit { tx = Xt.peek_opt q } | some -> some -let peek_blocking ?timeoutf q = - Kcas.Xt.commit ?timeoutf { tx = Xt.peek_blocking q } - +let peek_blocking q = Kcas.Xt.commit { tx = Xt.peek_blocking q } let clear q = Kcas.Xt.commit { tx = Xt.clear q } let swap q1 q2 = Kcas.Xt.commit { tx = Xt.swap q1 q2 } let to_seq q = Kcas.Xt.commit { tx = Xt.to_seq q } diff --git a/src/kcas_data/queue.mli b/src/kcas_data/queue.mli index d955680c..f4dd49a9 100644 --- a/src/kcas_data/queue.mli +++ b/src/kcas_data/queue.mli @@ -31,16 +31,11 @@ module Xt : Queue_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on queues. *) (** {1 Non-compositional interface} *) -include - Queue_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Queue_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn val peek : 'a t -> 'a (** [peek q] returns the first element in queue [s], or raises {!Empty} if the diff --git a/src/kcas_data/queue_intf.ml b/src/kcas_data/queue_intf.ml index c787bdde..5293058e 100644 --- a/src/kcas_data/queue_intf.ml +++ b/src/kcas_data/queue_intf.ml @@ -1,7 +1,6 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty s] determines whether the queue [q] is empty. *) @@ -32,11 +31,11 @@ module type Ops = sig (** [peek_opt q] returns the first element in queue [q], without removing it from the queue, or returns [None] if the queue is empty. *) - val peek_blocking : ('x, 'a t -> 'a) blocking_fn + val peek_blocking : ('x, 'a t -> 'a) fn (** [peek_blocking q] returns the first element in queue [q], without removing it from the queue, or blocks waiting for the queue to become non-empty. *) - val take_blocking : ('x, 'a t -> 'a) blocking_fn + val take_blocking : ('x, 'a t -> 'a) fn (** [take_blocking q] removes and returns the first element in queue [q], or blocks waiting for the queue to become non-empty. *) diff --git a/src/kcas_data/stack.ml b/src/kcas_data/stack.ml index c4d0170a..ad684e36 100644 --- a/src/kcas_data/stack.ml +++ b/src/kcas_data/stack.ml @@ -30,12 +30,12 @@ let push x s = let pop_opt s = Loc.update s Elems.tl_safe |> Elems.hd_opt let pop_all s = Loc.exchange s Elems.empty |> Elems.to_seq -let pop_blocking ?timeoutf s = +let pop_blocking s = (* Fenceless is safe as we always update. *) - Loc.fenceless_update ?timeoutf s Elems.tl_or_retry |> Elems.hd_unsafe + Loc.fenceless_update s Elems.tl_or_retry |> Elems.hd_unsafe let top_opt s = Loc.get s |> Elems.hd_opt -let top_blocking ?timeoutf s = Loc.get_as ?timeoutf Elems.hd_or_retry s +let top_blocking s = Loc.get_as Elems.hd_or_retry s let clear s = Loc.set s Elems.empty let swap s1 s2 = Kcas.Xt.commit { tx = Kcas.Xt.swap s1 s2 } let to_seq s = Elems.to_seq @@ Loc.get s diff --git a/src/kcas_data/stack.mli b/src/kcas_data/stack.mli index fa7a67a0..d84fe9a6 100644 --- a/src/kcas_data/stack.mli +++ b/src/kcas_data/stack.mli @@ -33,16 +33,11 @@ module Xt : Stack_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on stacks. *) (** {1 Non-compositional interface} *) -include - Stack_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Stack_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn val pop : 'a t -> 'a (** [pop s] removes and returns the topmost element in stack [s], or raises diff --git a/src/kcas_data/stack_intf.ml b/src/kcas_data/stack_intf.ml index 640a840f..53e980f6 100644 --- a/src/kcas_data/stack_intf.ml +++ b/src/kcas_data/stack_intf.ml @@ -1,7 +1,6 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty s] determines whether the stack [s] is empty. *) @@ -34,7 +33,7 @@ module type Ops = sig sequence for iterating through all the elements that were in the stack top to bottom. *) - val pop_blocking : ('x, 'a t -> 'a) blocking_fn + val pop_blocking : ('x, 'a t -> 'a) fn (** [pop_blocking s] removes and returns the topmost element of the stack [s], or blocks waiting for the queue to become non-empty. *) @@ -42,7 +41,7 @@ module type Ops = sig (** [top_opt s] returns the topmost element in stack [s], or [None] if the stack is empty. *) - val top_blocking : ('x, 'a t -> 'a) blocking_fn + val top_blocking : ('x, 'a t -> 'a) fn (** [top_blocking s] returns the topmost element in stack [s], or blocks waiting for the queue to become non-empty. *) end diff --git a/test/kcas/dune b/test/kcas/dune index e78098c1..7582857d 100644 --- a/test/kcas/dune +++ b/test/kcas/dune @@ -1,9 +1,11 @@ (tests (names test ms_queue_test threads loc_modes) (libraries + scheduler + picos_std.finally + picos_std.structured alcotest kcas - domain-local-timeout threads.posix unix domain_shims) diff --git a/test/kcas/loc_modes.ml b/test/kcas/loc_modes.ml index aa4af6f7..8077aa0b 100644 --- a/test/kcas/loc_modes.ml +++ b/test/kcas/loc_modes.ml @@ -1,3 +1,4 @@ +open Picos_std_structured open Kcas let loop_count = try int_of_string Sys.argv.(1) with _ -> Util.iter_factor @@ -76,6 +77,6 @@ let accumulator_thread () = exit := true let () = - accumulator_thread :: List.init n_counters counter_thread - |> List.map Domain.spawn |> List.iter Domain.join; + Scheduler.run ~n_domains:(n_counters + 1) @@ fun () -> + Run.all (accumulator_thread :: List.init n_counters counter_thread); Printf.printf "Loc modes OK!\n%!" diff --git a/test/kcas/ms_queue_test.ml b/test/kcas/ms_queue_test.ml index d48ad6b1..950a075a 100644 --- a/test/kcas/ms_queue_test.ml +++ b/test/kcas/ms_queue_test.ml @@ -1,3 +1,4 @@ +open Picos_std_structured open Kcas module Q = struct @@ -43,6 +44,8 @@ let failure exit msg = failwith msg let write_skew_test n = + Scheduler.run ~n_domains:3 @@ fun () -> + Flock.join_after @@ fun () -> let q1 = Q.queue () and q2 = Q.queue () in let push_to_q2 ~xt = @@ -61,20 +64,20 @@ let write_skew_test n = let exit = Atomic.make false in - let domains = - [ - Domain.spawn (fun () -> - sync (); - while not (Atomic.get exit) do - Xt.commit { tx = push_to_q1 } - done); - Domain.spawn (fun () -> - sync (); - while not (Atomic.get exit) do - Xt.commit { tx = push_to_q2 } - done); - ] - in + begin + Flock.fork @@ fun () -> + sync (); + while not (Atomic.get exit) do + Xt.commit { tx = push_to_q1 } + done + end; + begin + Flock.fork (fun () -> + sync (); + while not (Atomic.get exit) do + Xt.commit { tx = push_to_q2 } + done) + end; sync (); for _ = 1 to n do @@ -82,11 +85,10 @@ let write_skew_test n = | Some _, Some _ -> failure exit "write skew!" | _ -> () done; - Atomic.set exit true; - - List.iter Domain.join domains + Atomic.set exit true let tail_leak_test n = + Scheduler.run ~n_domains:2 @@ fun () -> let q = Q.queue () in let m = 2 in @@ -120,7 +122,7 @@ let tail_leak_test n = raise e in - List.init m domain |> List.map Domain.spawn |> List.iter Domain.join + Run.all (List.init m domain) let () = let n = try int_of_string Sys.argv.(1) with _ -> 1 * Util.iter_factor in diff --git a/test/kcas/test.ml b/test/kcas/test.ml index 1b433c5a..fd8509cb 100644 --- a/test/kcas/test.ml +++ b/test/kcas/test.ml @@ -24,73 +24,71 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. ---------------------------------------------------------------------------*) +open Picos_std_finally +open Picos_std_structured + let is_single = Domain.recommended_domain_count () = 1 open Kcas -let nb_iter = 100 * Util.iter_factor +let nb_iter = 50 * Util.iter_factor +let n_repeats = 30 let assert_kcas loc expected_v = let present_v = Loc.get loc in assert (present_v == expected_v) -let run_domains = function - | [] -> () - | main :: others -> - let others = List.map Domain.spawn others in - main (); - List.iter Domain.join others - (* *) let test_non_linearizable_xt () = - [ `Obstruction_free; `Lock_free ] - |> List.iter @@ fun mode -> - let barrier = Barrier.make 2 - and n_iter = 100 * Util.iter_factor - and test_finished = ref false in - - let a = Loc.make ~mode 0 and b = Loc.make ~mode 0 in - - let cass1a ~xt = - (Xt.get ~xt b == 0 && Xt.compare_and_set ~xt a 0 1) || Retry.invalid () - and cass1b ~xt = - (Xt.get ~xt b == 0 && Xt.compare_and_set ~xt a 1 0) || Retry.invalid () - and cass2a ~xt = - (Xt.compare_and_set ~xt b 0 1 && Xt.get ~xt a == 0) || Retry.invalid () - and cass2b ~xt = - (Xt.compare_and_set ~xt b 1 0 && Xt.get ~xt a == 0) || Retry.invalid () - in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + [ `Obstruction_free; `Lock_free ] + |> List.iter @@ fun mode -> + let barrier = Barrier.make 2 and test_finished = ref false in + + let a = Loc.make ~mode 0 and b = Loc.make ~mode 0 in + + let cass1a ~xt = + (Xt.get ~xt b == 0 && Xt.compare_and_set ~xt a 0 1) || Retry.invalid () + and cass1b ~xt = + (Xt.get ~xt b == 0 && Xt.compare_and_set ~xt a 1 0) || Retry.invalid () + and cass2a ~xt = + (Xt.compare_and_set ~xt b 0 1 && Xt.get ~xt a == 0) || Retry.invalid () + and cass2b ~xt = + (Xt.compare_and_set ~xt b 1 0 && Xt.get ~xt a == 0) || Retry.invalid () + in - let atomically tx = - if Random.bool () then Xt.commit ~mode:`Obstruction_free tx - else Xt.commit tx - in + let atomically tx = + if Random.bool () then Xt.commit ~mode:`Obstruction_free tx + else Xt.commit tx + in - let thread1 () = - Barrier.await barrier; - while not !test_finished do - if atomically { tx = cass1a } then - while not (atomically { tx = cass1b }) do - if is_single then Domain.cpu_relax (); - assert (Loc.get a == 1 && Loc.get b == 0) - done - else if is_single then Domain.cpu_relax () - done - and thread2 () = - Barrier.await barrier; - for _ = 1 to n_iter do - if atomically { tx = cass2a } then - while not (atomically { tx = cass2b }) do - if is_single then Domain.cpu_relax (); - assert (Loc.get a == 0 && Loc.get b == 1) - done - else if is_single then Domain.cpu_relax () - done; - test_finished := true - in + let thread1 () = + Barrier.await barrier; + while not !test_finished do + if atomically { tx = cass1a } then + while not (atomically { tx = cass1b }) do + if is_single then Domain.cpu_relax (); + assert (Loc.get a == 1 && Loc.get b == 0) + done + else if is_single then Domain.cpu_relax () + done + and thread2 () = + Barrier.await barrier; + for _ = 1 to nb_iter do + if atomically { tx = cass2a } then + while not (atomically { tx = cass2b }) do + if is_single then Domain.cpu_relax (); + assert (Loc.get a == 0 && Loc.get b == 1) + done + else if is_single then Domain.cpu_relax () + done; + test_finished := true + in - run_domains [ thread2; thread1 ] + Run.all [ thread2; thread1 ] + done (* *) @@ -103,124 +101,130 @@ let test_set () = (* *) let test_no_skew_xt () = - [ `Obstruction_free; `Lock_free ] - |> List.iter @@ fun mode -> - let barrier = Barrier.make 3 in - let test_finished = Atomic.make false in - - let a1 = Loc.make ~mode 0 in - let a2 = Loc.make ~mode 0 in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:3 @@ fun () -> + [ `Obstruction_free; `Lock_free ] + |> List.iter @@ fun mode -> + let barrier = Barrier.make 3 in + let test_finished = Atomic.make false in + + let a1 = Loc.make ~mode 0 in + let a2 = Loc.make ~mode 0 in + + let thread1 () = + let c1 ~xt = + Xt.compare_and_set ~xt a1 0 1 && Xt.compare_and_set ~xt a2 0 1 + in + let c2 ~xt = + Xt.compare_and_set ~xt a1 1 0 && Xt.compare_and_set ~xt a2 1 0 + in - let thread1 () = - let c1 ~xt = - Xt.compare_and_set ~xt a1 0 1 && Xt.compare_and_set ~xt a2 0 1 + Barrier.await barrier; + + for _ = 1 to nb_iter do + assert_kcas a1 0; + assert_kcas a2 0; + + let out1 = Xt.commit { tx = c1 } in + assert out1; + + assert_kcas a1 1; + assert_kcas a2 1; + + let out2 = Xt.commit { tx = c2 } in + assert out2 + done; + Atomic.set test_finished true + and thread2 () = + let c1 ~xt = Xt.get ~xt a1 == 0 && Xt.get ~xt a2 == 1 in + let c2 ~xt = Xt.get ~xt a2 == 1 && Xt.get ~xt a2 == 0 in + + Barrier.await barrier; + + while not (Atomic.get test_finished) do + let out1 = Xt.commit { tx = c1 } in + let out2 = Xt.commit { tx = c2 } in + assert (not out1); + assert (not out2); + if is_single then Domain.cpu_relax () + done + and thread3 () = + let c1 ~xt = Xt.get ~xt a1 == 1 && Xt.get ~xt a2 == 0 in + let c2 ~xt = Xt.get ~xt a2 == 0 && Xt.get ~xt a2 == 1 in + + Barrier.await barrier; + + while not (Atomic.get test_finished) do + let out1 = Xt.commit { tx = c1 } in + let out2 = Xt.commit { tx = c2 } in + assert (not out1); + assert (not out2); + if is_single then Domain.cpu_relax () + done in - let c2 ~xt = - Xt.compare_and_set ~xt a1 1 0 && Xt.compare_and_set ~xt a2 1 0 - in - - Barrier.await barrier; - - for _ = 1 to nb_iter do - assert_kcas a1 0; - assert_kcas a2 0; - - let out1 = Xt.commit { tx = c1 } in - assert out1; - - assert_kcas a1 1; - assert_kcas a2 1; - - let out2 = Xt.commit { tx = c2 } in - assert out2 - done; - Atomic.set test_finished true - and thread2 () = - let c1 ~xt = Xt.get ~xt a1 == 0 && Xt.get ~xt a2 == 1 in - let c2 ~xt = Xt.get ~xt a2 == 1 && Xt.get ~xt a2 == 0 in - - Barrier.await barrier; - - while not (Atomic.get test_finished) do - let out1 = Xt.commit { tx = c1 } in - let out2 = Xt.commit { tx = c2 } in - assert (not out1); - assert (not out2); - if is_single then Domain.cpu_relax () - done - and thread3 () = - let c1 ~xt = Xt.get ~xt a1 == 1 && Xt.get ~xt a2 == 0 in - let c2 ~xt = Xt.get ~xt a2 == 0 && Xt.get ~xt a2 == 1 in - - Barrier.await barrier; - - while not (Atomic.get test_finished) do - let out1 = Xt.commit { tx = c1 } in - let out2 = Xt.commit { tx = c2 } in - assert (not out1); - assert (not out2); - if is_single then Domain.cpu_relax () - done - in - run_domains [ thread1; thread2; thread3 ] + Run.all [ thread1; thread2; thread3 ] + done (* *) let test_get_seq_xt () = - [ `Obstruction_free; `Lock_free ] - |> List.iter @@ fun mode -> - let barrier = Barrier.make 4 in - let test_finished = Atomic.make false in - - let a1 = Loc.make ~mode 0 in - let a2 = Loc.make ~mode 0 in - - let mutator () = - Barrier.await barrier; - for _ = 0 to nb_iter do - let tx ~xt = - Xt.incr ~xt a1; - Xt.incr ~xt a2 - in - Xt.commit { tx } - done; - Atomic.set test_finished true - and getter () = - Barrier.await barrier; - while not (Atomic.get test_finished) do - let a = Loc.get a1 in - let b = Loc.get a2 in - assert (a <= b); - if is_single then Domain.cpu_relax () - done - and getaser () = - Barrier.await barrier; - while not (Atomic.get test_finished) do - let a = Loc.get_as Fun.id a1 in - let b = Loc.get_as Fun.id a2 in - assert (a <= b); - if is_single then Domain.cpu_relax () - done - and committer () = - Barrier.await barrier; - while not (Atomic.get test_finished) do - let a = Xt.commit { tx = Xt.get a1 } in - let b = Xt.commit { tx = Xt.get a2 } in - assert (a <= b); - if is_single then Domain.cpu_relax () - done - and updater () = - Barrier.await barrier; - while not (Atomic.get test_finished) do - let a = Loc.update a1 Fun.id in - let b = Loc.update a2 Fun.id in - assert (a <= b); - if is_single then Domain.cpu_relax () - done - in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:5 @@ fun () -> + [ `Obstruction_free; `Lock_free ] + |> List.iter @@ fun mode -> + let barrier = Barrier.make 5 in + let test_finished = Atomic.make false in + + let a1 = Loc.make ~mode 0 in + let a2 = Loc.make ~mode 0 in + + let mutator () = + Barrier.await barrier; + for _ = 0 to nb_iter do + let tx ~xt = + Xt.incr ~xt a1; + Xt.incr ~xt a2 + in + Xt.commit { tx } + done; + Atomic.set test_finished true + and getter () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Loc.get a1 in + let b = Loc.get a2 in + assert (a <= b); + if is_single then Domain.cpu_relax () + done + and getaser () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Loc.get_as Fun.id a1 in + let b = Loc.get_as Fun.id a2 in + assert (a <= b); + if is_single then Domain.cpu_relax () + done + and committer () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Xt.commit { tx = Xt.get a1 } in + let b = Xt.commit { tx = Xt.get a2 } in + assert (a <= b); + if is_single then Domain.cpu_relax () + done + and updater () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Loc.update a1 Fun.id in + let b = Loc.update a2 Fun.id in + assert (a <= b); + if is_single then Domain.cpu_relax () + done + in - run_domains [ mutator; getter; getaser; committer; updater ] + Run.all [ mutator; getter; getaser; committer; updater ] + done (* *) @@ -271,34 +275,37 @@ let in_place_shuffle array = (* *) let test_presort_and_is_in_log_xt () = - let n_incs = 10 * Util.iter_factor and n_domains = 3 and n_locs = 12 in - let n_locs_half = n_locs asr 1 in - - let barrier = Barrier.make n_domains in - - let locs = Array.init n_locs (fun _ -> Loc.make 0) in - - let thread () = - let locs = Array.copy locs in - Random.self_init (); - Barrier.await barrier; - for _ = 1 to n_incs do - in_place_shuffle locs; - let tx ~xt = - for i = 0 to n_locs_half - 1 do - Xt.incr ~xt locs.(i) - done; - assert (Xt.is_in_log ~xt locs.(Random.int n_locs_half)); - assert (not (Xt.is_in_log ~xt locs.(n_locs_half))) - in - Xt.commit { tx } - done - in + for _ = 1 to n_repeats do + let n_incs = 10 * Util.iter_factor and n_domains = 3 and n_locs = 12 in + let n_locs_half = n_locs asr 1 in + + Scheduler.run ~n_domains @@ fun () -> + let barrier = Barrier.make n_domains in + + let locs = Array.init n_locs (fun _ -> Loc.make 0) in + + let thread () = + let locs = Array.copy locs in + Random.self_init (); + Barrier.await barrier; + for _ = 1 to n_incs do + in_place_shuffle locs; + let tx ~xt = + for i = 0 to n_locs_half - 1 do + Xt.incr ~xt locs.(i) + done; + assert (Xt.is_in_log ~xt locs.(Random.int n_locs_half)); + assert (not (Xt.is_in_log ~xt locs.(n_locs_half))) + in + Xt.commit { tx } + done + in - run_domains (List.init n_domains (Fun.const thread)); + Run.all (List.init n_domains (Fun.const thread)); - let sum = locs |> Array.map Loc.get |> Array.fold_left ( + ) 0 in - assert (sum = n_incs * n_locs_half * n_domains) + let sum = locs |> Array.map Loc.get |> Array.fold_left ( + ) 0 in + assert (sum = n_incs * n_locs_half * n_domains) + done (* *) @@ -344,176 +351,194 @@ let test_post_commit () = (* *) let test_blocking () = - let state = Loc.make `Spawned in - let await state' = - (* Intentionally test that [Xt.modify] allows retry. *) - let tx ~xt = - Xt.modify ~xt state @@ fun state -> - Retry.unless (state == state'); - state - in - Xt.commit { tx } - in - - let a = Loc.make 0 and bs = Array.init 10 @@ fun _ -> Loc.make 0 in - - let n = 10 * Util.iter_factor in - - let num_attempts = ref 0 in - - let other_domain = - Domain.spawn @@ fun () -> - Loc.set state `Get_a_non_zero; - Loc.get_as - (fun a -> - incr num_attempts; - Retry.unless (a != 0)) - a; - - Loc.set state `Update_a_zero; - Loc.modify a (fun a -> - incr num_attempts; - Retry.unless (a = 0); - a + 1); - - Loc.set state `Set_1_b_to_0; - let bs = Array.copy bs in - for _ = 1 to n do - (* We access in random order to exercise tx log waiters handling. *) - in_place_shuffle bs; + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + let state = Loc.make `Spawned in + let await state' = + (* Intentionally test that [Xt.modify] allows retry. *) let tx ~xt = - incr num_attempts; - match - bs - |> Array.find_map @@ fun b -> - if Xt.get ~xt b = 1 then Some b - else if Loc.has_awaiters b (* There must be no leaked waiters... *) - then begin - (* ...except if main domain just set the loc *) - assert (Loc.get b = 1); - Retry.later () - end - else None - with - | None -> Retry.later () - | Some b -> Xt.set ~xt b 0 + Xt.modify ~xt state @@ fun state -> + Retry.unless (state == state'); + state in Xt.commit { tx } - done - in + in - await `Get_a_non_zero; - Loc.set a 1; - assert (!num_attempts <= 2 + 1 (* Need to account for race to next. *)); + let a = Loc.make 0 and bs = Array.init 10 @@ fun _ -> Loc.make 0 in - await `Update_a_zero; - Loc.set a 0; - assert (!num_attempts <= 4 + 1 (* Need to account for race to next. *)); + let n = 10 * Util.iter_factor in - await `Set_1_b_to_0; - for _ = 1 to n do - let i = Random.int (Array.length bs) in - Loc.set bs.(i) 1; - Loc.get_as (fun b -> Retry.unless (b = 0)) bs.(i) - done; + let num_attempts = ref 0 in - Domain.join other_domain; + begin + Flock.join_after @@ fun () -> + begin + Flock.fork @@ fun () -> + Loc.set state `Get_a_non_zero; + Loc.get_as + (fun a -> + incr num_attempts; + Retry.unless (a != 0)) + a; + + Loc.set state `Update_a_zero; + Loc.modify a (fun a -> + incr num_attempts; + Retry.unless (a = 0); + a + 1); + + Loc.set state `Set_1_b_to_0; + let bs = Array.copy bs in + for _ = 1 to n do + (* We access in random order to exercise tx log waiters handling. *) + in_place_shuffle bs; + let tx ~xt = + incr num_attempts; + match + bs + |> Array.find_map @@ fun b -> + if Xt.get ~xt b = 1 then Some b + else if + Loc.has_awaiters b (* There must be no leaked waiters... *) + then begin + (* ...except if main domain just set the loc *) + assert (Loc.get b = 1); + Retry.later () + end + else None + with + | None -> Retry.later () + | Some b -> Xt.set ~xt b 0 + in + Xt.commit { tx } + done + end; + + await `Get_a_non_zero; + Loc.set a 1; + assert (!num_attempts <= 3 + 1 (* Need to account for race to next. *)); + + await `Update_a_zero; + Loc.set a 0; + assert (!num_attempts <= 5 + 1 (* Need to account for race to next. *)); + + await `Set_1_b_to_0; + for _ = 1 to n do + let i = Random.int (Array.length bs) in + Loc.set bs.(i) 1; + Loc.get_as (fun b -> Retry.unless (b = 0)) bs.(i) + done + end; - assert (!num_attempts <= 4 + (n * 2)); - for i = 0 to Array.length bs - 1 do - assert (not (Loc.has_awaiters bs.(i))) + assert (!num_attempts <= 5 + (n * 2)); + for i = 0 to Array.length bs - 1 do + assert (not (Loc.has_awaiters bs.(i))) + done done let test_no_unnecessary_wakeups () = - let continue = Loc.make false and tries = Atomic.make 0 in - - let other_domain = - Domain.spawn @@ fun () -> - continue - |> Loc.get_as @@ fun s -> - Atomic.incr tries; - Retry.unless s - in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + let continue = Loc.make false and tries = Atomic.make 0 in - while not (Loc.has_awaiters continue) do - Domain.cpu_relax () - done; + begin + Flock.join_after @@ fun () -> + begin + Flock.fork @@ fun () -> + continue + |> Loc.get_as @@ fun s -> + Atomic.incr tries; + Retry.unless s + end; + + while not (Loc.has_awaiters continue) do + Domain.cpu_relax () + done; - assert (Loc.compare_and_set continue false false); - assert (not (Loc.update continue Fun.id)); - Loc.set continue false; + assert (Loc.compare_and_set continue false false); + assert (not (Loc.update continue Fun.id)); + Loc.set continue false; - Unix.sleepf 0.01; + Unix.sleepf 0.01; - assert (Loc.has_awaiters continue && Atomic.get tries = 1); - Loc.set continue true; - Domain.join other_domain; - assert ((not (Loc.has_awaiters continue)) && Atomic.get tries = 2) + assert (Loc.has_awaiters continue && Atomic.get tries = 1); + Loc.set continue true + end; + + assert ((not (Loc.has_awaiters continue)) && Atomic.get tries = 2) + done (* *) let test_periodic_validation () = - let a = Loc.make 0 and b = Loc.make 0 and looping = ref false in - let non_zero_difference_domain = - Domain.spawn @@ fun () -> - let rec tx ~xt = - let d = Xt.get ~xt a - Xt.get ~xt b in - if d <> 0 then d - else begin - (* We explicitly want this tx to go into infinite loop! Without - validation this would never finish. *) - looping := true; - tx ~xt - end + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> + let a = Loc.make 0 and b = Loc.make 0 and looping = ref false in + let non_zero_difference_domain = + Flock.fork_as_promise @@ fun () -> + let rec tx ~xt = + let d = Xt.get ~xt a - Xt.get ~xt b in + if d <> 0 then d + else begin + (* We explicitly want this tx to go into infinite loop! Without + validation this would never finish. *) + looping := true; + tx ~xt + end + in + Xt.commit { tx } in - Xt.commit { tx } - in - while not !looping do - Domain.cpu_relax () - done; + while not !looping do + Domain.cpu_relax () + done; - Loc.set a 1; + Loc.set a 1; - assert (1 = Domain.join non_zero_difference_domain) + assert (1 = Promise.await non_zero_difference_domain) + done (* *) let test_explicit_validation () = - let a = Loc.make 0 and b = Loc.make 0 in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> + let a = Loc.make 0 and b = Loc.make 0 in - let exit = ref false and mutator_running = ref false in - let mutator_domain = - Domain.spawn @@ fun () -> - mutator_running := true; - while not !exit do - let tx ~xt = - Xt.decr ~xt a; - Xt.incr ~xt b - in - Xt.commit { tx }; - Domain.cpu_relax () - done - in + let exit = ref false and mutator_running = ref false in - let n = 100 in + begin + Flock.fork @@ fun () -> + mutator_running := true; + while not !exit do + let tx ~xt = + Xt.decr ~xt a; + Xt.incr ~xt b + in + Xt.commit { tx }; + Domain.cpu_relax () + done + end; - while not !mutator_running do - Domain.cpu_relax () - done; + let n = 100 in - for _ = 1 to n do - let tx ~xt = - let a' = Xt.get ~xt a and b' = Xt.get ~xt b in - Xt.validate ~xt a; - assert (a' + b' = 0) - in - Xt.commit { tx } - done; + while not !mutator_running do + Domain.cpu_relax () + done; - exit := true; + for _ = 1 to n do + let tx ~xt = + let a' = Xt.get ~xt a and b' = Xt.get ~xt b in + Xt.validate ~xt a; + assert (a' + b' = 0) + in + Xt.commit { tx } + done; - Domain.join mutator_domain + exit := true + done (* *) @@ -572,43 +597,46 @@ let test_call () = (** This is a non-deterministic test that might fail occasionally. *) let test_timeout () = - Domain_local_timeout.set_system (module Thread) (module Unix); - - let check (op : ?timeoutf:float -> bool Loc.t -> unit) () = - let rec loop n = - let x = Loc.make false in - let (_ : unit -> unit) = - Domain_local_timeout.set_timeoutf 0.6 @@ fun () -> Loc.set x true + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:5 @@ fun () -> + let check (op : bool Loc.t -> unit) () = + Flock.join_after @@ fun () -> + let rec loop n = + let x = Loc.make false in + let@ _ = + finally Promise.terminate @@ fun () -> + Flock.fork_as_promise @@ fun () -> + Control.sleep ~seconds:0.6; + Loc.set x true + in + match Control.terminate_after ~seconds:0.02 (fun () -> op x) with + | () -> if 0 < n then loop (n - 1) else assert false + | exception Control.Terminate -> + Control.terminate_after ~seconds:2.0 (fun () -> op x) in - match op ~timeoutf:0.02 x with - | () -> if 0 < n then loop (n - 1) else assert false - | exception Timeout.Timeout -> op ~timeoutf:2.0 x + loop 10 in - loop 10 - in - run_domains - [ - check (fun ?timeoutf x -> - Loc.get_as ?timeoutf (fun x -> if not x then Retry.later ()) x); - check (fun ?timeoutf x -> - Loc.update ?timeoutf x (fun x -> x || Retry.later ()) |> ignore); - check (fun ?timeoutf x -> - Loc.modify ?timeoutf x (fun x -> x || Retry.later ())); - check (fun ?timeoutf x -> - let y = Loc.make false in - let tx ~xt = - if not (Xt.get ~xt x) then Retry.later (); - Xt.swap ~xt x y - in - Xt.commit ?timeoutf { tx }); - check (fun ?timeoutf x -> - let y = Loc.make false in - let tx ~xt = - if not (Xt.get ~xt x) then Retry.invalid (); - Xt.swap ~xt x y - in - Xt.commit ?timeoutf { tx }); - ] + Run.all + [ + check (fun x -> Loc.get_as (fun x -> if not x then Retry.later ()) x); + check (fun x -> Loc.update x (fun x -> x || Retry.later ()) |> ignore); + check (fun x -> Loc.modify x (fun x -> x || Retry.later ())); + check (fun x -> + let y = Loc.make false in + let tx ~xt = + if not (Xt.get ~xt x) then Retry.later (); + Xt.swap ~xt x y + in + Xt.commit { tx }); + check (fun x -> + let y = Loc.make false in + let tx ~xt = + if not (Xt.get ~xt x) then Retry.invalid (); + Xt.swap ~xt x y + in + Xt.commit { tx }); + ] + done (* *) diff --git a/test/kcas/threads.ml b/test/kcas/threads.ml index 5cbcc60c..5873a7ad 100644 --- a/test/kcas/threads.ml +++ b/test/kcas/threads.ml @@ -1,12 +1,14 @@ open Kcas let await_between_threads () = + Scheduler.run @@ fun () -> let x = Loc.make 0 in let y = Loc.make 0 in let a_thread = () |> Thread.create @@ fun () -> + Scheduler.run @@ fun () -> Loc.get_as (fun x -> Retry.unless (x <> 0)) x; Loc.set y 22 in diff --git a/test/kcas_data/accumulator_test_stm.ml b/test/kcas_data/accumulator_test_stm.ml index 0565a9b0..1ac97a71 100644 --- a/test/kcas_data/accumulator_test_stm.ml +++ b/test/kcas_data/accumulator_test_stm.ml @@ -48,6 +48,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Accumulator" (module Spec) - |> exit +let () = Stm_run.run ~name:"Accumulator" (module Spec) |> exit diff --git a/test/kcas_data/dllist_test_stm.ml b/test/kcas_data/dllist_test_stm.ml index c0dd7a84..669ce820 100644 --- a/test/kcas_data/dllist_test_stm.ml +++ b/test/kcas_data/dllist_test_stm.ml @@ -83,5 +83,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Dllist" (module Spec) |> exit +let () = Stm_run.run ~name:"Dllist" (module Spec) |> exit diff --git a/test/kcas_data/dune b/test/kcas_data/dune index 1047e5a0..8dc21040 100644 --- a/test/kcas_data/dune +++ b/test/kcas_data/dune @@ -14,6 +14,8 @@ stack_test_stm xt_test) (libraries + scheduler + picos_std.structured alcotest kcas kcas_data diff --git a/test/kcas_data/hashtbl_test_stm.ml b/test/kcas_data/hashtbl_test_stm.ml index f2cf2910..ff5a15a7 100644 --- a/test/kcas_data/hashtbl_test_stm.ml +++ b/test/kcas_data/hashtbl_test_stm.ml @@ -62,5 +62,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Hashtbl" (module Spec) |> exit +let () = Stm_run.run ~name:"Hashtbl" (module Spec) |> exit diff --git a/test/kcas_data/linearizable_chaining_example.ml b/test/kcas_data/linearizable_chaining_example.ml index 58614b86..b3063af6 100644 --- a/test/kcas_data/linearizable_chaining_example.ml +++ b/test/kcas_data/linearizable_chaining_example.ml @@ -216,6 +216,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Hashtbl_with_order" (module Spec) - |> exit +let () = Stm_run.run ~name:"Hashtbl_with_order" (module Spec) |> exit diff --git a/test/kcas_data/lru_cache.ml b/test/kcas_data/lru_cache.ml index 110ca832..c5271c07 100644 --- a/test/kcas_data/lru_cache.ml +++ b/test/kcas_data/lru_cache.ml @@ -62,8 +62,5 @@ end let capacity_of c = Kcas.Xt.commit { tx = Xt.capacity_of c } let set_capacity c n = Kcas.Xt.commit { tx = Xt.set_capacity c n } let get_opt c k = Kcas.Xt.commit { tx = Xt.get_opt c k } - -let set_blocking ?timeoutf c k v = - Kcas.Xt.commit ?timeoutf { tx = Xt.set_blocking c k v } - +let set_blocking c k v = Kcas.Xt.commit { tx = Xt.set_blocking c k v } let remove c k = Kcas.Xt.commit { tx = Xt.remove c k } diff --git a/test/kcas_data/lru_cache.mli b/test/kcas_data/lru_cache.mli index b32b1aa3..2162d524 100644 --- a/test/kcas_data/lru_cache.mli +++ b/test/kcas_data/lru_cache.mli @@ -9,10 +9,8 @@ module Xt : Lru_cache_intf.Ops with type ('k, 'v) t := ('k, 'v) t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn include Lru_cache_intf.Ops with type ('k, 'v) t := ('k, 'v) t with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn diff --git a/test/kcas_data/lru_cache_example.ml b/test/kcas_data/lru_cache_example.ml index 0df2ee29..d0308daf 100644 --- a/test/kcas_data/lru_cache_example.ml +++ b/test/kcas_data/lru_cache_example.ml @@ -1,3 +1,4 @@ +open Picos_std_structured open Kcas module Lru_cache = struct @@ -19,21 +20,23 @@ module Lru_cache = struct | exception Retry.Later -> false end - let get ?timeoutf c k = Kcas.Xt.commit ?timeoutf { tx = Xt.get c k } - let get_if ?timeoutf c k p = Kcas.Xt.commit ?timeoutf { tx = Xt.get_if c k p } + let get c k = Kcas.Xt.commit { tx = Xt.get c k } + let get_if c k p = Kcas.Xt.commit { tx = Xt.get_if c k p } let try_set c k d = Kcas.Xt.commit { tx = Xt.try_set c k d } end let () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let c = Lru_cache.create 10 in - let domain = - Domain.spawn @@ fun () -> + let answer = + Flock.fork_as_promise @@ fun () -> let tx ~xt = Lru_cache.Xt.get ~xt c "a" + Lru_cache.Xt.get ~xt c "b" in Xt.commit { tx } in Lru_cache.set_blocking c "b" 30; Lru_cache.set_blocking c "a" 12; - assert (Domain.join domain = 42); + assert (Promise.await answer = 42); () let () = diff --git a/test/kcas_data/lru_cache_intf.ml b/test/kcas_data/lru_cache_intf.ml index 79a8063e..11ef488d 100644 --- a/test/kcas_data/lru_cache_intf.ml +++ b/test/kcas_data/lru_cache_intf.ml @@ -1,11 +1,10 @@ module type Ops = sig type ('k, 'v) t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val capacity_of : ('x, ('k, 'v) t -> int) fn val set_capacity : ('x, ('k, 'v) t -> int -> unit) fn val get_opt : ('x, ('k, 'v) t -> 'k -> 'v option) fn - val set_blocking : ('x, ('k, 'v) t -> 'k -> 'v -> unit) blocking_fn + val set_blocking : ('x, ('k, 'v) t -> 'k -> 'v -> unit) fn val remove : ('x, ('k, 'v) t -> 'k -> unit) fn end diff --git a/test/kcas_data/mvar_test.ml b/test/kcas_data/mvar_test.ml index f05872a4..d98693e8 100644 --- a/test/kcas_data/mvar_test.ml +++ b/test/kcas_data/mvar_test.ml @@ -1,7 +1,10 @@ +open Picos_std_structured open Kcas open Kcas_data let basics () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let mv = Mvar.create (Some 101) in assert (not (Mvar.is_empty mv)); assert (Mvar.take mv = 101); @@ -9,14 +12,13 @@ let basics () = assert (Mvar.take_opt mv = None); Mvar.put mv 42; let running = Mvar.create None in - let d = - Domain.spawn @@ fun () -> + begin + Flock.fork @@ fun () -> Mvar.put running (); Xt.commit { tx = Mvar.Xt.put mv 76 } - in + end; assert (Mvar.take running = ()); assert (Xt.commit { tx = Mvar.Xt.take mv } = 42); - Domain.join d; assert (Mvar.take mv = 76) let () = diff --git a/test/kcas_data/queue_test_stm.ml b/test/kcas_data/queue_test_stm.ml index d3b0174e..63a166c6 100644 --- a/test/kcas_data/queue_test_stm.ml +++ b/test/kcas_data/queue_test_stm.ml @@ -64,5 +64,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Queue" (module Spec) |> exit +let () = Stm_run.run ~name:"Queue" (module Spec) |> exit diff --git a/test/kcas_data/stack_test_stm.ml b/test/kcas_data/stack_test_stm.ml index 6dd8641a..0e3d62eb 100644 --- a/test/kcas_data/stack_test_stm.ml +++ b/test/kcas_data/stack_test_stm.ml @@ -54,5 +54,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Stack" (module Spec) |> exit +let () = Stm_run.run ~name:"Stack" (module Spec) |> exit diff --git a/test/lib/scheduler/dune b/test/lib/scheduler/dune new file mode 100644 index 00000000..9b8a38fe --- /dev/null +++ b/test/lib/scheduler/dune @@ -0,0 +1,9 @@ +(library + (name scheduler) + (libraries + picos_io.select + (select + scheduler.ml + from + (picos_mux.random picos_mux.multififo -> scheduler.ocaml5.ml) + (picos_mux.thread -> scheduler.ocaml4.ml)))) diff --git a/test/lib/scheduler/scheduler.ocaml4.ml b/test/lib/scheduler/scheduler.ocaml4.ml new file mode 100644 index 00000000..0a45a5e8 --- /dev/null +++ b/test/lib/scheduler/scheduler.ocaml4.ml @@ -0,0 +1 @@ +let run ?n_domains:_ main = Picos_mux_thread.run main diff --git a/test/lib/scheduler/scheduler.ocaml5.ml b/test/lib/scheduler/scheduler.ocaml5.ml new file mode 100644 index 00000000..b15f15bb --- /dev/null +++ b/test/lib/scheduler/scheduler.ocaml5.ml @@ -0,0 +1,5 @@ +let () = Random.self_init () + +let run ?(n_domains = 1) main = + if Random.bool () then Picos_mux_multififo.run_on ~quota:100 ~n_domains main + else Picos_mux_random.run_on ~n_domains main diff --git a/test/kcas_data/stm_run/dune b/test/lib/stm_run/dune similarity index 100% rename from test/kcas_data/stm_run/dune rename to test/lib/stm_run/dune diff --git a/test/kcas_data/stm_run/empty.ocaml4.ml b/test/lib/stm_run/empty.ocaml4.ml similarity index 100% rename from test/kcas_data/stm_run/empty.ocaml4.ml rename to test/lib/stm_run/empty.ocaml4.ml diff --git a/test/kcas_data/stm_run/empty.ocaml5.ml b/test/lib/stm_run/empty.ocaml5.ml similarity index 100% rename from test/kcas_data/stm_run/empty.ocaml5.ml rename to test/lib/stm_run/empty.ocaml5.ml diff --git a/test/kcas_data/stm_run/intf.ml b/test/lib/stm_run/intf.ml similarity index 96% rename from test/kcas_data/stm_run/intf.ml rename to test/lib/stm_run/intf.ml index fd300e7c..74894700 100644 --- a/test/kcas_data/stm_run/intf.ml +++ b/test/lib/stm_run/intf.ml @@ -43,3 +43,6 @@ module type STM_domain = sig val agree_test_par_asym : count:int -> name:string -> QCheck.Test.t val neg_agree_test_par_asym : count:int -> name:string -> QCheck.Test.t end + +let default_count = 1_000 +let default_budgetf = 60.0 diff --git a/test/kcas_data/stm_run/stm_run.ocaml4.ml b/test/lib/stm_run/stm_run.ocaml4.ml similarity index 64% rename from test/kcas_data/stm_run/stm_run.ocaml4.ml rename to test/lib/stm_run/stm_run.ocaml4.ml index 7b8d851b..655fd3ba 100644 --- a/test/kcas_data/stm_run/stm_run.ocaml4.ml +++ b/test/lib/stm_run/stm_run.ocaml4.ml @@ -1,11 +1,7 @@ include Intf -let count = - let factor b = if b then 10 else 1 in - factor (64 <= Sys.word_size) * factor (Sys.backend_type = Native) * 10 - -let run ?(verbose = true) ?(count = count) ?(budgetf = 60.0) ~name ?make_domain - (module Spec : STM.Spec) = +let run ?(verbose = true) ?(count = default_count) ?(budgetf = default_budgetf) + ~name ?make_domain (module Spec : STM.Spec) = let module Seq = STM_sequential.Make (Spec) in let module Con = STM_thread.Make (Spec) [@alert "-experimental"] in Util.run_with_budget ~budgetf ~count @@ fun count -> diff --git a/test/kcas_data/stm_run/stm_run.ocaml5.ml b/test/lib/stm_run/stm_run.ocaml5.ml similarity index 73% rename from test/kcas_data/stm_run/stm_run.ocaml5.ml rename to test/lib/stm_run/stm_run.ocaml5.ml index cece0a71..67af5c41 100644 --- a/test/kcas_data/stm_run/stm_run.ocaml5.ml +++ b/test/lib/stm_run/stm_run.ocaml5.ml @@ -1,13 +1,7 @@ include Intf -let count = - let factor b = if b then 10 else 1 in - factor (64 <= Sys.word_size) - * factor (Sys.backend_type = Native) - * factor (1 < Domain.recommended_domain_count ()) - -let run (type cmd state sut) ?(verbose = true) ?(count = count) - ?(budgetf = 60.0) ~name ?make_domain +let run (type cmd state sut) ?(verbose = true) ?(count = default_count) + ?(budgetf = default_budgetf) ~name ?make_domain (module Spec : STM.Spec with type cmd = cmd and type state = state diff --git a/test/kcas_data/stm_run/util.ml b/test/lib/stm_run/util.ml similarity index 100% rename from test/kcas_data/stm_run/util.ml rename to test/lib/stm_run/util.ml