From 2f2193fa9a79c29c43ebb2f7aecdb6fdc8258089 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Fri, 30 Aug 2024 15:02:37 +0300 Subject: [PATCH] Change to use Picos instead of DLA and DLT This adds support for cancelation through Picos and removes explicit support for timeouts, which simplifies the library. Support for DLA and DLT is removed. This basically also means that one can no longer use Kcas without a scheduler. --- CHANGES.md | 6 + README.md | 84 +- bench/bench_accumulator.ml | 4 +- bench/bench_dllist.ml | 6 +- bench/bench_hashtbl.ml | 4 +- bench/bench_mvar.ml | 3 +- bench/bench_parallel_cmp.ml | 4 +- bench/bench_queue.ml | 6 +- bench/bench_stack.ml | 6 +- bench/bench_xt.ml | 3 +- bench/bench_xt_ro.ml | 3 +- bench/dune | 1 + doc/dune | 2 +- doc/scheduler-interop.md | 178 ----- dune | 8 +- dune-project | 34 +- kcas.opam | 6 +- kcas_data.opam | 5 +- src/kcas/dune | 4 +- src/kcas/kcas.ml | 260 ++---- src/kcas/kcas.mli | 192 +++-- src/kcas_data/dllist.ml | 9 +- src/kcas_data/dllist.mli | 2 - src/kcas_data/dllist_intf.ml | 5 +- src/kcas_data/mvar.ml | 11 +- src/kcas_data/mvar.mli | 7 +- src/kcas_data/mvar_intf.ml | 7 +- src/kcas_data/promise.ml | 7 +- src/kcas_data/promise.mli | 2 - src/kcas_data/promise_intf.ml | 5 +- src/kcas_data/queue.ml | 8 +- src/kcas_data/queue.mli | 7 +- src/kcas_data/queue_intf.ml | 5 +- src/kcas_data/stack.ml | 6 +- src/kcas_data/stack.mli | 7 +- src/kcas_data/stack_intf.ml | 5 +- test/kcas/dune | 4 +- test/kcas/loc_modes.ml | 5 +- test/kcas/ms_queue_test.ml | 38 +- test/kcas/test.ml | 750 +++++++++--------- test/kcas/threads.ml | 2 + test/kcas_data/accumulator_test_stm.ml | 4 +- test/kcas_data/dllist_test_stm.ml | 3 +- test/kcas_data/dune | 2 + test/kcas_data/hashtbl_test_stm.ml | 3 +- .../linearizable_chaining_example.ml | 4 +- test/kcas_data/lru_cache.ml | 5 +- test/kcas_data/lru_cache.mli | 2 - test/kcas_data/lru_cache_example.ml | 13 +- test/kcas_data/lru_cache_intf.ml | 3 +- test/kcas_data/mvar_test.ml | 10 +- test/kcas_data/queue_test_stm.ml | 3 +- test/kcas_data/stack_test_stm.ml | 3 +- test/lib/scheduler/dune | 9 + test/lib/scheduler/scheduler.ocaml4.ml | 5 + test/lib/scheduler/scheduler.ocaml5.ml | 7 + test/{kcas_data => lib}/stm_run/dune | 0 .../stm_run/empty.ocaml4.ml | 0 .../stm_run/empty.ocaml5.ml | 0 test/{kcas_data => lib}/stm_run/intf.ml | 3 + .../stm_run/stm_run.ocaml4.ml | 8 +- .../stm_run/stm_run.ocaml5.ml | 10 +- test/{kcas_data => lib}/stm_run/util.ml | 0 63 files changed, 786 insertions(+), 1022 deletions(-) delete mode 100644 doc/scheduler-interop.md create mode 100644 test/lib/scheduler/dune create mode 100644 test/lib/scheduler/scheduler.ocaml4.ml create mode 100644 test/lib/scheduler/scheduler.ocaml5.ml rename test/{kcas_data => lib}/stm_run/dune (100%) rename test/{kcas_data => lib}/stm_run/empty.ocaml4.ml (100%) rename test/{kcas_data => lib}/stm_run/empty.ocaml5.ml (100%) rename test/{kcas_data => lib}/stm_run/intf.ml (96%) rename test/{kcas_data => lib}/stm_run/stm_run.ocaml4.ml (64%) rename test/{kcas_data => lib}/stm_run/stm_run.ocaml5.ml (73%) rename test/{kcas_data => lib}/stm_run/util.ml (100%) diff --git a/CHANGES.md b/CHANGES.md index ba5ddf60..5689cbb4 100644 --- a/CHANGES.md +++ b/CHANGES.md @@ -1,3 +1,9 @@ +## Next version + +- Changed to use [Picos](https://github.com/ocaml-multicore/picos/) instead of + [DLA](https://github.com/ocaml-multicore/domain-local-await/) and + [DLT](https://github.com/ocaml-multicore/domain-local-timeout/) (@polytypic) + ## 0.7.0 - Numerous minor internal improvements (@polytypic) diff --git a/README.md b/README.md index 1efa71be..6659814c 100644 --- a/README.md +++ b/README.md @@ -47,8 +47,8 @@ Features and properties: read-only compare (CMP) operations that can be performed on overlapping locations in parallel without interference. -- **_Blocking await_**: The algorithm supports timeouts and awaiting for changes - to any number of shared memory locations. +- **_Blocking await_**: The algorithm supports cancelation and awaiting for + changes to any number of shared memory locations. - **_Composable_**: Independently developed transactions can be composed with ease sequentially, conjunctively, conditionally, and disjunctively. @@ -76,7 +76,7 @@ is distributed under the [ISC license](LICENSE.md). - [A transactional lock-free queue](#a-transactional-lock-free-queue) - [Composing transactions](#composing-transactions) - [Blocking transactions](#blocking-transactions) - - [Timeouts](#timeouts) + - [Cancelation and Timeouts](#cancelation-and-timeouts) - [A transactional lock-free leftist heap](#a-transactional-lock-free-leftist-heap) - [Programming with transactional data structures](#programming-with-transactional-data-structures) - [The dining philosophers problem](#the-dining-philosophers-problem) @@ -101,14 +101,7 @@ is distributed under the [ISC license](LICENSE.md). To use the library - - -```ocaml -# #require "kcas" # open Kcas ``` @@ -143,6 +136,7 @@ Block waiting for changes to locations: ```ocaml # let a_domain = Domain.spawn @@ fun () -> + Scheduler.run @@ fun () -> let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in Printf.sprintf "The answer is %d!" x val a_domain : string Domain.t = @@ -551,6 +545,7 @@ and then spawn a domain that tries to atomically both pop and dequeue: ```ocaml # let a_domain = Domain.spawn @@ fun () -> + Scheduler.run @@ fun () -> let tx ~xt = (pop ~xt a_stack, dequeue ~xt a_queue) in let (popped, dequeued) = Xt.commit { tx } in Printf.sprintf "I popped %d and dequeued %d!" @@ -558,6 +553,20 @@ and then spawn a domain that tries to atomically both pop and dequeue: val a_domain : string Domain.t = ``` +**Kcas** uses the [Picos](https://github.com/ocaml-multicore/picos/) interface +to implement blocking. Above `Scheduler.run` starts an effects based +[Picos compatible](https://ocaml-multicore.github.io/picos/doc/picos/index.html#interoperability) +scheduler, which allows **Kcas** to block in a scheduler friendly manner. + +> **_Note_**: Typically your entire program would run inside a scheduler and you +> should +> [fork fibers](https://ocaml-multicore.github.io/picos/doc/picos_mux/index.html#examples) +> rather than spawn domains and start schedulers. The +> [MDX](https://github.com/realworldocaml/mdx) tool used for checking this +> document does not allow one to start a scheduler once and run individual code +> snippets within the scheduler, which is why individual examples spawn domains +> and start schedulers. + The domain is now blocked waiting for changes to the stack and the queue. As long as we don't populate both at the same time @@ -585,7 +594,7 @@ The retry mechanism essentially allows a transaction to wait for an arbitrary condition and can function as a fairly expressive communication and synchronization mechanism. -#### Timeouts +#### Cancelation and Timeouts > If you block, will they come? @@ -605,43 +614,30 @@ val pop_or_raise_if : xt:'a Xt.t -> bool Loc.t -> 'b list Loc.t -> xt:'c Xt.t -> 'b = ``` -This works, but creating, checking, and canceling timeouts properly can be a lot -of work. Therefore **Kcas** also directly supports an optional `timeoutf` -argument for potentially blocking operations. For example, to perform a blocking -pop with a timeout, one can simply explicitly pass the desired timeout in -seconds: - -```ocaml -# let an_empty_stack = stack () in - Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack } -Exception: Failure "Domain_local_timeout.set_timeoutf not implemented". -``` - -Oops! What happened above is that the -[_domain local timeout_](https://github.com/ocaml-multicore/domain-local-timeout) -mechanism used by **Kcas** was not implemented on the current domain. The idea -is that, in the future, concurrent schedulers provide the mechanism out of the -box, but there is also a default implementation using the Stdlib `Thread` and -`Unix` modules that works on most platforms. However, to avoid direct -dependencies to `Thread` and `Unix`, we need to explicitly tell the library that -it can use those modules: +This works, but creating, checking, and canceling timeouts properly in this +manner can be a lot of work. Therefore **Kcas** also directly supports +[cancelation](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/index.html#understanding-cancelation) +through the [Picos](https://github.com/ocaml-multicore/picos/) interface. This +both allows **Kcas** transactions to be cleanly terminated in case the program +has encountered an error and also allows one to simply use a timeout mechanism +provided by the scheduler. For example, the sample +[structured concurrency library](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/index.html) ```ocaml -# Domain_local_timeout.set_system (module Thread) (module Unix) -- : unit = () +# open Picos_std_structured ``` -This initialization, if needed, should be done by application code rather than -by libraries. - -If we now retry the previous example we will get a -[`Timeout`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Timeout/index.html#exception-Timeout) -exception as expected: +for Picos provides the +[`Control.terminate_after`](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/Control/index.html#val-terminate_after) +operation, which allows one to easily run an operation with a timeout on the +current fiber: ```ocaml # let an_empty_stack = stack () in - Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack } -Exception: Kcas.Timeout.Timeout. + Scheduler.run @@ fun () -> + Control.terminate_after ~seconds:0.1 @@ fun () -> + Xt.commit { tx = pop an_empty_stack } +Exception: Picos_std_structured__Control.Terminate. ``` Besides @@ -651,7 +647,7 @@ potentially blocking single location operations such as [`update`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-update), and [`modify`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-modify) -support the optional `timeoutf` argument. +support cancelation. #### A transactional lock-free leftist heap @@ -839,10 +835,9 @@ structures. One source of ready-made data structures is [**Kcas_data**](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/index.html). Let's explore how we can leverage those data structures. Of course, first we -need to `#require` the package and we'll also open it for convenience: +open `Kcas_data` for convenience: ```ocaml -# #require "kcas_data" # open Kcas_data ``` @@ -915,6 +910,7 @@ the philosophers: in Array.iter Domain.join @@ Array.init philosophers @@ fun i -> Domain.spawn @@ fun () -> + Scheduler.run @@ fun () -> let fork_lhs = forks.(i) and fork_rhs = forks.((i + 1) mod philosophers) and eaten = eaten.(i) in diff --git a/bench/bench_accumulator.ml b/bench/bench_accumulator.ml index 17fc4609..c0be3d1a 100644 --- a/bench/bench_accumulator.ml +++ b/bench/bench_accumulator.ml @@ -9,7 +9,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () = let n_ops_todo = Atomic.make n_ops |> Multicore_magic.copy_as_padded in let init _ = () in - + let wrap _ _ action = Scheduler.run action in let work _ () = let rec work () = let n = Util.alloc n_ops_todo in @@ -34,7 +34,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () = (if n_domains = 1 then "" else "s") in - Times.record ~budgetf ~n_domains ~init ~work ~after () + Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after () |> Times.to_thruput_metrics ~n:n_ops ~config ~singular:"operation" let run_suite ~budgetf = diff --git a/bench/bench_dllist.ml b/bench/bench_dllist.ml index c58ab302..61895092 100644 --- a/bench/bench_dllist.ml +++ b/bench/bench_dllist.ml @@ -12,9 +12,10 @@ let run_single ~budgetf ?(n_msgs = 15 * Util.iter_factor) () = assert (Dllist.is_empty t); Util.generate_push_and_pop_sequence n_msgs in + let wrap _ _ action = Scheduler.run action in let work _ bits = Util.Bits.iter op bits in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1) @@ -31,6 +32,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ _ action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -70,7 +72,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1) (format "taker" false n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_hashtbl.ml b/bench/bench_hashtbl.ml index 1286ee67..c9ce1c25 100644 --- a/bench/bench_hashtbl.ml +++ b/bench/bench_hashtbl.ml @@ -24,7 +24,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor) Atomic.set n_ops_todo n_ops; Random.State.make_self_init () in - + let wrap _ _ action = Scheduler.run action in let work _ state = let rec work () = let n = Util.alloc n_ops_todo in @@ -56,7 +56,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor) percent_read in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config let run_suite ~budgetf = diff --git a/bench/bench_mvar.ml b/bench/bench_mvar.ml index 2c3988b5..b1871ac5 100644 --- a/bench/bench_mvar.ml +++ b/bench/bench_mvar.ml @@ -11,6 +11,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in let init _ = () in + let wrap _ _ action = Scheduler.run action in let work i () = if i < n_adders then if blocking_add then @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work ~after () + Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_parallel_cmp.ml b/bench/bench_parallel_cmp.ml index e1703213..686c9170 100644 --- a/bench/bench_parallel_cmp.ml +++ b/bench/bench_parallel_cmp.ml @@ -11,7 +11,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () = let n_ops_todo = Atomic.make n_ops |> Multicore_magic.copy_as_padded in let init i = Array.unsafe_get xs i in - + let wrap _ _ action = Scheduler.run action in let work _ x = let tx1 ~xt = let a = Xt.get ~xt a in @@ -41,7 +41,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () = Printf.sprintf "%d worker%s" n_domains (if n_domains = 1 then "" else "s") in - Times.record ~budgetf ~n_domains ~init ~work ~after () + Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after () |> Times.to_thruput_metrics ~n:n_ops ~singular:"transaction" ~config let run_suite ~budgetf = diff --git a/bench/bench_queue.ml b/bench/bench_queue.ml index ebcc9d07..f7afbfea 100644 --- a/bench/bench_queue.ml +++ b/bench/bench_queue.ml @@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = assert (Queue.is_empty t); Util.generate_push_and_pop_sequence n_msgs in + let wrap _ _ action = Scheduler.run action in let work _ bits = Util.Bits.iter op bits in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) @@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ _ action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -79,7 +81,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_stack.ml b/bench/bench_stack.ml index 4886702b..a93b7262 100644 --- a/bench/bench_stack.ml +++ b/bench/bench_stack.ml @@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () = assert (Stack.is_empty t); Util.generate_push_and_pop_sequence n_msgs in + let wrap _ _ action = Scheduler.run action in let work _ bits = Util.Bits.iter op bits in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain" let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) @@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) Atomic.set n_msgs_to_take n_msgs; Atomic.set n_msgs_to_add n_msgs in + let wrap _ _ action = Scheduler.run action in let work i () = if i < n_adders then let rec work () = @@ -79,7 +81,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2) (format "taker" blocking_take n_takers) in - Times.record ~budgetf ~n_domains ~init ~work () + Times.record ~budgetf ~n_domains ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config let run_suite ~budgetf = diff --git a/bench/bench_xt.ml b/bench/bench_xt.ml index 1bdf1ace..39dc451b 100644 --- a/bench/bench_xt.ml +++ b/bench/bench_xt.ml @@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2) in let init _ = () in + let wrap _ _ action = Scheduler.run action in let work _ () = let rec loop i = if i > 0 then begin @@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2) let config = Printf.sprintf "%d loc tx" n_locs in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config let run_suite ~budgetf = diff --git a/bench/bench_xt_ro.ml b/bench/bench_xt_ro.ml index 9629ab3a..5eca629c 100644 --- a/bench/bench_xt_ro.ml +++ b/bench/bench_xt_ro.ml @@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2) in let init _ = () in + let wrap _ _ action = Scheduler.run action in let work _ () = let rec loop i = if i > 0 then begin @@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2) let config = Printf.sprintf "%d loc tx" n_locs in - Times.record ~budgetf ~n_domains:1 ~init ~work () + Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work () |> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config let run_suite ~budgetf = diff --git a/bench/dune b/bench/dune index cd97da36..279eb529 100644 --- a/bench/dune +++ b/bench/dune @@ -14,6 +14,7 @@ let () = (action (run %{test} -brief)) (libraries + scheduler kcas_data multicore-bench backoff diff --git a/doc/dune b/doc/dune index 4bfc69d7..92002ff3 100644 --- a/doc/dune +++ b/doc/dune @@ -5,4 +5,4 @@ (package kcas_data)) (enabled_if (>= %{ocaml_version} 5.0.0)) - (files gkmz-with-read-only-cmp-ops.md scheduler-interop.md)) + (files gkmz-with-read-only-cmp-ops.md)) diff --git a/doc/scheduler-interop.md b/doc/scheduler-interop.md deleted file mode 100644 index 39bb662b..00000000 --- a/doc/scheduler-interop.md +++ /dev/null @@ -1,178 +0,0 @@ -# Scheduler interop - -The blocking mechanism in **Kcas** is based on a -[_domain local await_](https://github.com/ocaml-multicore/domain-local-await) -mechanism that schedulers can choose to implement to allow libraries like -**Kcas** to work with them. - -Implementing schedulers is not really what casual users of **Kcas** are supposed -to do. Below is an example of a _toy_ scheduler whose purpose is only to give a -sketch of how a scheduler can provide the domain local await mechanism. - -Let's also demonstrate the use of the -[`Queue`](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Queue/index.html), -[`Stack`](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Stack/index.html), -and -[`Promise`](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/Promise/index.html) -implementations that are conveniently provided by -[**Kcas_data**](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/index.html). - - - -Here is the full toy scheduler module: - -```ocaml -module Scheduler : sig - type t - val spawn : unit -> t - val join : t -> unit - val fiber : t -> (unit -> 'a) -> 'a Promise.t -end = struct - open Effect.Deep - type _ Effect.t += - | Suspend : (('a, unit) continuation -> unit) -> 'a Effect.t - type t = { - queue: (unit -> unit) Queue.t; - domain: unit Domain.t - } - let spawn () = - let queue = Queue.create () in - let rec scheduler work = - let effc (type a) : a Effect.t -> _ = function - | Suspend ef -> Some ef - | _ -> None in - try_with work () { effc }; - match Queue.take_opt queue with - | Some work -> scheduler work - | None -> () in - let prepare_for_await _ = - let state = Atomic.make `Init in - let release () = - if Atomic.get state != `Released then - match Atomic.exchange state `Released with - | `Awaiting k -> - Queue.add (continue k) queue - | _ -> () in - let await () = - if Atomic.get state != `Released then - Effect.perform @@ Suspend (fun k -> - if not (Atomic.compare_and_set state `Init - (`Awaiting k)) then - continue k ()) - in - Domain_local_await.{ release; await } in - let domain = Domain.spawn @@ fun () -> - try - while true do - let work = Queue.take_blocking queue in - Domain_local_await.using - ~prepare_for_await - ~while_running:(fun () -> scheduler work) - done - with Exit -> () in - { queue; domain } - let join t = - Queue.add (fun () -> raise Exit) t.queue; - Domain.join t.domain - let fiber t thunk = - let (promise, resolver) = Promise.create () in - Queue.add - (fun () -> Promise.resolve resolver (thunk ())) - t.queue; - promise -end -``` - -The idea is that one can spawn a scheduler to run on a new domain. Then one can -run fibers on the scheduler. Because the scheduler provides the domain local -await mechanism libraries like **Kcas** can use it to block in a scheduler -independent and friendly manner. - -Let's then demonstrate the integration. To start we spawn a scheduler: - -```ocaml -# let scheduler = Scheduler.spawn () -val scheduler : Scheduler.t = -``` - -The scheduler is now eagerly awaiting for fibers to run. Let's give it a couple -of them, but, let's first create a queue and a stack to communicate with the -fibers: - -```ocaml -# let in_queue : int Queue.t = Queue.create () -val in_queue : int Kcas_data.Queue.t = -# let out_stack : int Stack.t = Stack.create () -val out_stack : int Kcas_data.Stack.t = -``` - -The first fiber we create just copies elements from the `in_queue` to the -`out_stack`: - -```ocaml -# ignore @@ Scheduler.fiber scheduler @@ fun () -> - while true do - let elem = Queue.take_blocking in_queue in - Printf.printf "Giving %d...\n%!" elem; - Stack.push elem out_stack - done -- : unit = () -``` - -The second fiber awaits to take two elements from the `out_stack`, updates a -state in between, and then returns their sum: - -```ocaml -# let state = Loc.make 0 -val state : int Loc.t = Kcas.Loc.Loc {Kcas.Loc.state = ; id = } -# let sync_to target = - state - |> Loc.get_as @@ fun current -> - Retry.unless (target <= current) -val sync_to : int -> unit = -# let a_promise = Scheduler.fiber scheduler @@ fun () -> - let x = Stack.pop_blocking out_stack in - Printf.printf "First you gave me %d.\n%!" x; - Loc.set state 1; - let y = Stack.pop_blocking out_stack in - Printf.printf "Then you gave me %d.\n%!" y; - Loc.set state 2; - x + y -val a_promise : int Promise.t = -``` - -To interact with the fibers, we add some elements to the `in_queue`: - -```ocaml -# Queue.add 14 in_queue; sync_to 1 -Giving 14... -First you gave me 14. -- : unit = () -# Queue.add 28 in_queue; sync_to 2 -Giving 28... -Then you gave me 28. -- : unit = () -# Promise.await a_promise -- : int = 42 -``` - -As can be seen above, the scheduler multiplexes the domain among the fibers. -Notice that thanks to the domain local await mechanism we could just perform -blocking operations without thinking about the schedulers. Communication between -the main domain, the scheduler domain, and the fibers _just works_ ™. - -Time to close the shop. - -```ocaml -# Scheduler.join scheduler -- : unit = () -``` - -_That's all Folks!_ diff --git a/dune b/dune index 75232a0a..cf531969 100644 --- a/dune +++ b/dune @@ -3,5 +3,11 @@ (deps (package kcas) (package kcas_data)) - (libraries domain_shims) + (libraries + domain_shims + kcas + kcas_data + multicore-magic + picos_std.structured + scheduler) (files README.md)) diff --git a/dune-project b/dune-project index b96846ee..0f2b99c9 100644 --- a/dune-project +++ b/dune-project @@ -34,12 +34,22 @@ (>= 4.13.0)) (backoff (>= 0.1.0)) - (domain-local-await - (>= 1.0.1)) - (domain-local-timeout - (>= 1.0.1)) (multicore-magic (>= 2.3.0)) + (picos + (>= 0.5.0)) + (picos_std + (and + (>= 0.5.0) + :with-test)) + (picos_io + (and + (>= 0.5.0) + :with-test)) + (picos_mux + (and + (>= 0.5.0) + :with-test)) (domain_shims (and (>= 0.1.0) @@ -84,9 +94,21 @@ (and (>= 0.1.0) :with-test)) - (domain-local-await + (picos + (and + (>= 0.5.0) + :with-test)) + (picos_std + (and + (>= 0.5.0) + :with-test)) + (picos_io + (and + (>= 0.5.0) + :with-test)) + (picos_mux (and - (>= 1.0.1) + (>= 0.5.0) :with-test)) (domain_shims (and diff --git a/kcas.opam b/kcas.opam index 33c54daf..49d3718f 100644 --- a/kcas.opam +++ b/kcas.opam @@ -19,9 +19,11 @@ depends: [ "dune" {>= "3.14"} "ocaml" {>= "4.13.0"} "backoff" {>= "0.1.0"} - "domain-local-await" {>= "1.0.1"} - "domain-local-timeout" {>= "1.0.1"} "multicore-magic" {>= "2.3.0"} + "picos" {>= "0.5.0"} + "picos_std" {>= "0.5.0" & with-test} + "picos_io" {>= "0.5.0" & with-test} + "picos_mux" {>= "0.5.0" & with-test} "domain_shims" {>= "0.1.0" & with-test} "alcotest" {>= "1.8.0" & with-test} "qcheck-core" {>= "0.21.2" & with-test} diff --git a/kcas_data.opam b/kcas_data.opam index f129b2f7..8812ff84 100644 --- a/kcas_data.opam +++ b/kcas_data.opam @@ -20,7 +20,10 @@ depends: [ "kcas" {= version} "multicore-magic" {>= "2.3.0"} "backoff" {>= "0.1.0" & with-test} - "domain-local-await" {>= "1.0.1" & with-test} + "picos" {>= "0.5.0" & with-test} + "picos_std" {>= "0.5.0" & with-test} + "picos_io" {>= "0.5.0" & with-test} + "picos_mux" {>= "0.5.0" & with-test} "domain_shims" {>= "0.1.0" & with-test} "multicore-bench" {>= "0.1.5" & with-test} "alcotest" {>= "1.8.0" & with-test} diff --git a/src/kcas/dune b/src/kcas/dune index 70b19af5..8e0d7765 100644 --- a/src/kcas/dune +++ b/src/kcas/dune @@ -1,11 +1,11 @@ (library (name kcas) (public_name kcas) - (libraries domain-local-await domain-local-timeout backoff multicore-magic)) + (libraries picos backoff multicore-magic)) (mdx (package kcas) (deps (package kcas)) - (libraries kcas backoff domain_shims) + (libraries kcas backoff scheduler threads.posix domain_shims) (files kcas.mli)) diff --git a/src/kcas/kcas.ml b/src/kcas/kcas.ml index 5bded5f4..f8fb2051 100644 --- a/src/kcas/kcas.ml +++ b/src/kcas/kcas.ml @@ -3,6 +3,8 @@ * Copyright (c) 2023, Vesa Karvonen *) +open Picos + (** Work around CSE bug in OCaml 5-5.1. *) let[@inline] atomic_get x = Atomic.get ((* Prevents CSE *) Sys.opaque_identity x) @@ -22,82 +24,6 @@ let[@inline] fenceless_get x = let fenceless_get = atomic_get *) -module Timeout = struct - exception Timeout - - let[@inline never] timeout () = raise Timeout - - type _ t = - | Unset : [> `Unset ] t - | Elapsed : [> `Elapsed ] t - | Call : (unit -> unit) -> [> `Call ] t - | Set : { mutable state : [< `Elapsed | `Call ] t } -> [> `Set ] t - - external as_atomic : [< `Set ] t -> [< `Elapsed | `Call ] t Atomic.t - = "%identity" - - (* Fenceless operations are safe here as the timeout state is not not visible - outside of the library and we don't always need the latest value and, when - we do, there is a fence after. *) - - let[@inline] check (t : [< `Set | `Unset ] t) = - match t with - | Unset -> () - | Set set_r -> - if fenceless_get (as_atomic (Set set_r)) == Elapsed then timeout () - - let set seconds (state : [< `Elapsed | `Call ] t Atomic.t) = - Domain_local_timeout.set_timeoutf seconds @@ fun () -> - match Atomic.exchange state Elapsed with - | Call release_or_cancel -> release_or_cancel () - | Elapsed -> () - - let call_id = Call Fun.id - - let[@inline never] alloc seconds = - let (Set set_r as t : [ `Set ] t) = Set { state = call_id } in - let cancel = set seconds (as_atomic t) in - if not (Atomic.compare_and_set (as_atomic t) call_id (Call cancel)) then - timeout (); - Set set_r - - let[@inline] alloc_opt = function - | None -> Unset - | Some seconds -> alloc seconds - - let[@inline never] await (state : [< `Elapsed | `Call ] t Atomic.t) release = - match fenceless_get state with - | Call cancel as alive -> - if Atomic.compare_and_set state alive (Call release) then Call cancel - else timeout () - | Elapsed -> timeout () - - let[@inline] await (t : [ `Unset | `Set ] t) release = - match t with Unset -> Unset | Set r -> await (as_atomic (Set r)) release - - let[@inline never] unawait (state : [< `Elapsed | `Call ] t Atomic.t) alive = - match fenceless_get state with - | Call _ as await -> - if not (Atomic.compare_and_set state await alive) then timeout () - | Elapsed -> timeout () - - let[@inline] unawait t alive = - match (t, alive) with - | Set set_r, Call call_r -> unawait (as_atomic (Set set_r)) (Call call_r) - | _ -> () - - let[@inline] cancel_alive (alive : [< `Unset | `Call ] t) = - match alive with Unset -> () | Call cancel -> cancel () - - let[@inline] cancel (t : [< `Set | `Unset ] t) = - match t with - | Unset -> () - | Set set_r -> ( - match fenceless_get (as_atomic (Set set_r)) with - | Elapsed -> () - | Call cancel -> cancel ()) -end - module Id = struct let neg_id = Atomic.make (-1) let[@inline] neg_ids n = Atomic.fetch_and_add neg_id (-n) @@ -128,14 +54,12 @@ end = struct x end -type awaiter = unit -> unit - -let[@inline] resume_awaiter awaiter = awaiter () +type awaiter = Trigger.t let[@inline] resume_awaiters = function | [] -> () - | [ awaiter ] -> resume_awaiter awaiter - | awaiters -> List.iter resume_awaiter awaiters + | [ awaiter ] -> Trigger.signal awaiter + | awaiters -> List.iter Trigger.signal awaiters module Mode = struct type t = [ `Lock_free | `Obstruction_free ] @@ -166,7 +90,7 @@ and _ tdt = This field must be first, see [root_as_atomic] and [tree_as_ref]. *) - timeout : [ `Set | `Unset ] Timeout.t; + mutable fiber : Fiber.Maybe.t; mutable mode : Mode.t; mutable validate_counter : int; mutable post_commit : Action.t; @@ -467,67 +391,49 @@ let rec remove_awaiter backoff loc before awaiter = if not (Atomic.compare_and_set (as_atomic loc) state_old state_new) then remove_awaiter (Backoff.once backoff) loc before awaiter -let block timeout loc before = - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await timeout t.release in - if add_awaiter loc before t.release then begin - try t.await () - with cancellation_exn -> - remove_awaiter Backoff.default loc before t.release; - Timeout.cancel_alive alive; - raise cancellation_exn - end; - Timeout.unawait timeout alive +let block loc before = + let t = Trigger.create () in + if add_awaiter loc before t then + match Trigger.await t with + | None -> () + | Some (exn, bt) -> + remove_awaiter Backoff.default loc before t; + Printexc.raise_with_backtrace exn bt -let rec update_no_alloc timeout backoff loc state f = +let rec update_no_alloc backoff loc state f = (* Fenceless is safe as we have had a fence before if needed and there is a fence after. *) let state_old = fenceless_get (as_atomic loc) in let before = eval state_old in match f before with | after -> - if before == after then begin - Timeout.cancel timeout; - before - end + if before == after then before else begin state.after <- after; if Atomic.compare_and_set (as_atomic loc) state_old state then begin resume_awaiters state_old.awaiters; - Timeout.cancel timeout; before end - else update_no_alloc timeout (Backoff.once backoff) loc state f + else update_no_alloc (Backoff.once backoff) loc state f end | exception Retry.Later -> - block timeout loc before; - update_no_alloc timeout backoff loc state f - | exception exn -> - Timeout.cancel timeout; - raise exn + block loc before; + update_no_alloc backoff loc state f -let update_with_state timeout backoff loc f state_old = +let update_with_state backoff loc f state_old = let before = eval state_old in match f before with | after -> - if before == after then begin - Timeout.cancel timeout; - before - end + if before == after then before else let state = new_state after in if Atomic.compare_and_set (as_atomic loc) state_old state then begin resume_awaiters state_old.awaiters; - Timeout.cancel timeout; before end - else update_no_alloc timeout (Backoff.once backoff) loc state f + else update_no_alloc (Backoff.once backoff) loc state f | exception Retry.Later -> let state = new_state before in - block timeout loc before; - update_no_alloc timeout backoff loc state f - | exception exn -> - Timeout.cancel timeout; - raise exn + update_no_alloc backoff loc state f let rec exchange_no_alloc backoff loc state = let state_old = atomic_get (as_atomic loc) in @@ -553,8 +459,8 @@ let[@inline] rec cas_with_state backoff loc before state state_old = having installed or removed a waiter. Fenceless is safe as there was a fence before. *) - cas_with_state (Backoff.once backoff) loc before state - (fenceless_get (as_atomic loc))) + let backoff = Backoff.once backoff in + cas_with_state backoff loc before state (atomic_get (as_atomic loc))) let inc x = x + 1 let dec x = x - 1 @@ -584,25 +490,15 @@ module Loc = struct let[@inline] get_id loc = (to_loc loc).id let get loc = eval (atomic_get (as_atomic (to_loc loc))) - let rec get_as timeout f loc state = + let rec get_as f loc state = let before = eval state in - match f before with - | value -> - Timeout.cancel timeout; - value - | exception Retry.Later -> - block timeout (to_loc loc) before; - (* Fenceless is safe as there was already a fence before. *) - get_as timeout f loc (fenceless_get (as_atomic (to_loc loc))) - | exception exn -> - Timeout.cancel timeout; - raise exn - - let[@inline] get_as ?timeoutf f loc = - get_as - (Timeout.alloc_opt timeoutf) - f loc - (atomic_get (as_atomic (to_loc loc))) + try f before + with Retry.Later -> + block (to_loc loc) before; + (* Fenceless is safe as there was already a fence before. *) + get_as f loc (fenceless_get (as_atomic (to_loc loc))) + + let[@inline] get_as f loc = get_as f loc (atomic_get (as_atomic (to_loc loc))) let[@inline] get_mode loc = if (to_loc loc).id < 0 then `Lock_free else `Obstruction_free @@ -612,21 +508,18 @@ module Loc = struct let state_old = atomic_get (as_atomic (to_loc loc)) in cas_with_state backoff (to_loc loc) before state state_old - let fenceless_update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in - update_with_state timeout backoff (to_loc loc) f + let fenceless_update ?(backoff = Backoff.default) loc f = + update_with_state backoff (to_loc loc) f (fenceless_get (as_atomic (to_loc loc))) - let[@inline] fenceless_modify ?timeoutf ?backoff loc f = - fenceless_update ?timeoutf ?backoff loc f |> ignore + let[@inline] fenceless_modify ?backoff loc f = + fenceless_update ?backoff loc f |> ignore - let update ?timeoutf ?(backoff = Backoff.default) loc f = - let timeout = Timeout.alloc_opt timeoutf in - update_with_state timeout backoff (to_loc loc) f + let update ?(backoff = Backoff.default) loc f = + update_with_state backoff (to_loc loc) f (atomic_get (as_atomic (to_loc loc))) - let[@inline] modify ?timeoutf ?backoff loc f = - update ?timeoutf ?backoff loc f |> ignore + let[@inline] modify ?backoff loc f = update ?backoff loc f |> ignore let exchange ?(backoff = Backoff.default) loc value = exchange_no_alloc backoff (to_loc loc) (new_state value) @@ -711,6 +604,17 @@ module Xt = struct tree_as_ref xt := T (Node { loc; state; lt; gt; awaiters = [] }); before + let check (Xt xt_r : _ t) = + match Fiber.Maybe.to_fiber_or_current xt_r.fiber with + | fiber -> + Fiber.check fiber; + if xt_r.fiber != Fiber.Maybe.of_fiber fiber then + xt_r.fiber <- Fiber.Maybe.of_fiber fiber + | exception _ -> + (* Getting the current fiber should never fail except when not running + inside a scheduler. *) + () + let update_old : type c a. _ -> a loc -> c -> (c, a) up -> _ -> _ -> _ -> a = fun (Xt xt_r as xt : _ t) loc c up lt gt state' -> let c0 = xt_r.validate_counter in @@ -721,7 +625,7 @@ module Xt = struct The assumption is that potentially infinite loops will repeatedly access the same locations. *) if c0 land c1 = 0 then begin - Timeout.check xt_r.timeout; + check xt; validate_all_rec xt !(tree_as_ref xt) end; let state : a state = Obj.magic state' in @@ -929,17 +833,13 @@ module Xt = struct let initial_validate_period = 4 - let success (Xt xt_r : _ t) result = - Timeout.cancel xt_r.timeout; - Action.run xt_r.post_commit result - let rec commit backoff (Xt xt_r as xt : _ t) tx = match tx ~xt with | result -> begin match !(tree_as_ref xt) with - | T Leaf -> success xt result + | T Leaf -> Action.run xt_r.post_commit result | T (Node { loc; state; lt = T Leaf; gt = T Leaf; _ }) -> - if is_cmp xt state then success xt result + if is_cmp xt state then Action.run xt_r.post_commit result else begin state.which <- W After; let before = state.before in @@ -948,7 +848,7 @@ module Xt = struct fence. *) let state_old = fenceless_get (as_atomic loc) in if cas_with_state Backoff.default loc before state state_old then - success xt result + Action.run xt_r.post_commit result else commit_once_reuse backoff xt tx end | T (Node node_r) -> begin @@ -956,22 +856,22 @@ module Xt = struct match determine xt 0 root with | status -> if a_cmp_followed_by_a_cas < status then begin - if finish xt root (verify xt root) then success xt result + if finish xt root (verify xt root) then + Action.run xt_r.post_commit result else begin - (* We switch to [`Lock_free] as there was - interference. *) + (* We switch to [`Lock_free] as there was interference. *) commit_once_alloc backoff `Lock_free xt tx end end else if a_cmp = status || finish xt root (if 0 <= status then After else Before) - then success xt result + then Action.run xt_r.post_commit result else commit_once_alloc backoff xt_r.mode xt tx | exception Exit -> (* Fenceless is safe as there was a fence before. *) if fenceless_get (root_as_atomic xt) == R After then - success xt result + Action.run xt_r.post_commit result else commit_once_alloc backoff xt_r.mode xt tx end end @@ -981,58 +881,52 @@ module Xt = struct | T Leaf -> invalid_retry () | T (Node node_r) -> begin let root = Node node_r in - let t = Domain_local_await.prepare_for_await () in - let alive = Timeout.await xt_r.timeout t.release in - match add_awaiters t.release xt root with + let t = Trigger.create () in + match add_awaiters t xt root with | T Leaf -> begin - match t.await () with - | () -> - remove_awaiters t.release xt (T Leaf) root |> ignore; - Timeout.unawait xt_r.timeout alive; + match Trigger.await t with + | None -> + remove_awaiters t xt (T Leaf) root |> ignore; commit_reset_reuse backoff xt tx - | exception cancellation_exn -> - remove_awaiters t.release xt (T Leaf) root |> ignore; - Timeout.cancel_alive alive; - raise cancellation_exn + | Some (exn, bt) -> + remove_awaiters t xt (T Leaf) root |> ignore; + Printexc.raise_with_backtrace exn bt end | T (Node _) as stop -> - remove_awaiters t.release xt stop root |> ignore; - Timeout.unawait xt_r.timeout alive; + remove_awaiters t xt stop root |> ignore; commit_once_reuse backoff xt tx end end - | exception exn -> - Timeout.cancel xt_r.timeout; - raise exn and commit_once_reuse backoff xt tx = + check xt; commit_reuse (Backoff.once backoff) xt tx and commit_reset_reuse backoff xt tx = + check xt; commit_reuse (Backoff.reset backoff) xt tx and commit_reuse backoff (Xt xt_r as xt : _ t) tx = tree_as_ref xt := T Leaf; xt_r.validate_counter <- initial_validate_period; xt_r.post_commit <- Action.noop; - Timeout.check xt_r.timeout; commit backoff xt tx - and commit_once_alloc backoff mode (Xt xt_r : _ t) tx = + and commit_once_alloc backoff mode (Xt xt_r as xt : _ t) tx = + check xt; let backoff = Backoff.once backoff in - Timeout.check xt_r.timeout; let rot = U Leaf in let validate_counter = initial_validate_period in let post_commit = Action.noop in let xt = Xt { xt_r with rot; mode; validate_counter; post_commit } in commit backoff xt tx - let[@inline] commit ?timeoutf ?(backoff = Backoff.default) - ?(mode = `Obstruction_free) { tx } = - let timeout = Timeout.alloc_opt timeoutf + let[@inline] commit ?(backoff = Backoff.default) ?(mode = `Obstruction_free) + { tx } = + let fiber = Fiber.Maybe.nothing and rot = U Leaf and validate_counter = initial_validate_period and post_commit = Action.noop in - let xt = Xt { rot; timeout; mode; validate_counter; post_commit } in + let xt = Xt { rot; fiber; mode; validate_counter; post_commit } in commit backoff xt tx end diff --git a/src/kcas/kcas.mli b/src/kcas/kcas.mli index 185a2c23..21fb617b 100644 --- a/src/kcas/kcas.mli +++ b/src/kcas/kcas.mli @@ -22,7 +22,7 @@ obstruction-free} read-only compare (CMP) operations that can be performed on overlapping locations in parallel without interference. - - {b Blocking await}: The algorithm supports timeouts and awaiting for + - {b Blocking await}: The algorithm supports cancelation and awaiting for changes to any number of shared memory locations. - {b Composable}: Independently developed transactions can be composed with @@ -72,6 +72,7 @@ {[ # let a_domain = Domain.spawn @@ fun () -> + Scheduler.run @@ fun () -> let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in Printf.sprintf "The answer is %d!" x val a_domain : string Domain.t = @@ -80,7 +81,8 @@ Perform transactions over locations: {[ - # let tx ~xt = + # Scheduler.run @@ fun () -> + let tx ~xt = let a = Xt.get ~xt a and b = Xt.get ~xt b in Xt.set ~xt x (b - a) @@ -105,16 +107,9 @@ can skip over these. The documentation links back to these modules where appropriate. *) -(** Timeout support. *) -module Timeout : sig - exception Timeout - (** Exception that may be raised by operations such as {!Loc.get_as}, - {!Loc.update}, {!Loc.modify}, or {!Xt.commit} when given a [~timeoutf] in - seconds. *) -end - -(** Retry support. *) module Retry : sig + (** Retry support. *) + exception Later (** Exception that may be raised to signal that the operation, such as {!Loc.get_as}, {!Loc.update}, or {!Xt.commit}, should be retried, at some @@ -126,7 +121,7 @@ module Retry : sig shared memory locations have already changed. *) val later : unit -> 'a - (** [later ()] is equivalent to [raise Later]. *) + (** [later ()] is equivalent to [raise_notrace Later]. *) val unless : bool -> unit (** [unless condition] is equivalent to [if not condition then later ()]. *) @@ -137,11 +132,12 @@ module Retry : sig outside of the transaction, and the transaction should be retried. *) val invalid : unit -> 'a - (** [invalid ()] is equivalent to [raise Invalid]. *) + (** [invalid ()] is equivalent to [raise_notrace Invalid]. *) end -(** Operating modes of the [k-CAS-n-CMP] algorithm. *) module Mode : sig + (** Operating modes of the [k-CAS-n-CMP] algorithm. *) + type t = [ `Lock_free (** In [`Lock_free] mode the algorithm makes sure that at least one domain will @@ -164,18 +160,15 @@ end - [backoff] specifies the configuration for the [Backoff] mechanism. In special cases, having more detailed knowledge of the application, one - might adjust the configuration to improve performance. + might adjust the configuration to improve performance. *) - - [timeoutf] specifies a timeout in seconds and, if specified, the - {!Timeout.Timeout} exception may be raised by the operation to signal that - the timeout expired. *) +module Loc : sig + (** Shared memory locations. -(** Shared memory locations. + This module is essentially compatible with the [Stdlib.Atomic] module, + except that a number of functions take some optional arguments that one + usually need not worry about. *) - This module is essentially compatible with the [Stdlib.Atomic] module, - except that a number of functions take some optional arguments that one - usually need not worry about. *) -module Loc : sig (** Type of shared memory locations. *) type !'a t = private | Loc : { state : 'state; id : 'id } -> 'a t @@ -225,7 +218,7 @@ module Loc : sig val get : 'a t -> 'a (** [get r] reads the current value of the shared memory location [r]. *) - val get_as : ?timeoutf:float -> ('a -> 'b) -> 'a t -> 'b + val get_as : ('a -> 'b) -> 'a t -> 'b (** [get_as f loc] is equivalent to [f (get loc)]. The given function [f] may raise the {!Retry.Later} exception to signal that the conditional load should be retried only after the location has been modified outside of the @@ -237,7 +230,7 @@ module Loc : sig location [r] to the [after] value if the current value of [r] is the [before] value. *) - val update : ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [update r f] repeats [let b = get r in compare_and_set r b (f b)] until it succeeds and then returns the [b] value. The given function [f] may raise the {!Retry.Later} exception to signal that the update should only be @@ -245,8 +238,7 @@ module Loc : sig also safe for the given function [f] to raise any other exception to abort the update. *) - val modify : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [modify r f] is equivalent to [update r f |> ignore]. *) val exchange : ?backoff:Backoff.t -> 'a t -> 'a -> 'a @@ -277,13 +269,11 @@ module Loc : sig (** [fenceless_get r] is like [get r] except that [fenceless_get]s may be reordered. *) - val fenceless_update : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a + val fenceless_update : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> 'a (** [fenceless_update r f] is like [update r f] except that in case [f x == x] the update may be reordered. *) - val fenceless_modify : - ?timeoutf:float -> ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit + val fenceless_modify : ?backoff:Backoff.t -> 'a t -> ('a -> 'a) -> unit (** [fenceless_modify r f] is like [modify r f] except that in case [f x == x] the modify may be reordered. *) end @@ -318,77 +308,80 @@ end memory locations are atomically marked as having taken effect and subsequent reads of the locations will be able to see the newly written values. *) -(** Explicit transaction log passing on shared memory locations. - - This module provides a way to implement composable transactions over shared - memory locations. A transaction is a function written by the library user - and can be thought of as a specification of a sequence of {!Xt.get} and - {!Xt.set} accesses to shared memory locations. To actually perform the - accesses one then {!Xt.commit}s the transaction. - - Transactions should generally not perform arbitrary side-effects, because - when a transaction is committed it may be attempted multiple times meaning - that the side-effects are also performed multiple times. {!Xt.post_commit} - can be used to perform an action only once after the transaction has been - committed succesfully. - - {b WARNING}: To make it clear, the operations provided by the {!Loc} module - for accessing individual shared memory locations do not implicitly go - through the transaction mechanism and should generally not be used within - transactions. There are advanced algorithms where one might, within a - transaction, perform operations that do not get recorded into the - transaction log. Using such techniques correctly requires expert knowledge - and is not recommended for casual users. - - As an example, consider an implementation of doubly-linked circular - lists. Instead of using a mutable field, [ref], or [Atomic.t], one would use - a shared memory location, or {!Loc.t}, for the pointers in the node type: - - {[ - type 'a node = { - succ: 'a node Loc.t; - pred: 'a node Loc.t; - datum: 'a; - } - ]} - - To remove a node safely one wants to atomically update the [succ] and [pred] - pointers of the predecessor and successor nodes and to also update the - [succ] and [pred] pointers of a node to point to the node itself, so that - removal becomes an {{:https://en.wikipedia.org/wiki/Idempotence} idempotent} - operation. Using explicit transaction log passing one could implement the - [remove] operation as follows: - - {[ - let remove ~xt node = - (* Read pointers to the predecessor and successor nodes: *) - let pred = Xt.get ~xt node.pred in - let succ = Xt.get ~xt node.succ in - (* Update pointers in this node: *) - Xt.set ~xt node.succ node; - Xt.set ~xt node.pred node; - (* Update pointers to this node: *) - Xt.set ~xt pred.succ succ; - Xt.set ~xt succ.pred pred - ]} +module Xt : sig + (** Explicit transaction log passing on shared memory locations. + + This module provides a way to implement composable transactions over + shared memory locations. A transaction is a function written by the + library user and can be thought of as a specification of a sequence of + {!Xt.get} and {!Xt.set} accesses to shared memory locations. To actually + perform the accesses one then {!Xt.commit}s the transaction. + + Transactions should generally not perform arbitrary side-effects, because + when a transaction is committed it may be attempted multiple times meaning + that the side-effects are also performed multiple times. + {!Xt.post_commit} can be used to perform an action only once after the + transaction has been committed succesfully. + + {b WARNING}: To make it clear, the operations provided by the {!Loc} + module for accessing individual shared memory locations do not implicitly + go through the transaction mechanism and should generally not be used + within transactions. There are advanced algorithms where one might, + within a transaction, perform operations that do not get recorded into the + transaction log. Using such techniques correctly requires expert + knowledge and is not recommended for casual users. + + As an example, consider an implementation of doubly-linked circular + lists. Instead of using a mutable field, [ref], or [Atomic.t], one would + use a shared memory location, or {!Loc.t}, for the pointers in the node + type: + + {[ + type 'a node = { + succ: 'a node Loc.t; + pred: 'a node Loc.t; + datum: 'a; + } + ]} + + To remove a node safely one wants to atomically update the [succ] and + [pred] pointers of the predecessor and successor nodes and to also update + the [succ] and [pred] pointers of a node to point to the node itself, so + that removal becomes an {{:https://en.wikipedia.org/wiki/Idempotence} + idempotent} operation. Using explicit transaction log passing one could + implement the [remove] operation as follows: + + {[ + let remove ~xt node = + (* Read pointers to the predecessor and successor nodes: *) + let pred = Xt.get ~xt node.pred in + let succ = Xt.get ~xt node.succ in + (* Update pointers in this node: *) + Xt.set ~xt node.succ node; + Xt.set ~xt node.pred node; + (* Update pointers to this node: *) + Xt.set ~xt pred.succ succ; + Xt.set ~xt succ.pred pred + ]} + + The labeled argument, [~xt], refers to the transaction log. Transactional + operations like {!Xt.get} and {!Xt.set} are then recorded in that log. To + actually remove a node, we need to commit the transaction - The labeled argument, [~xt], refers to the transaction log. Transactional - operations like {!Xt.get} and {!Xt.set} are then recorded in that log. To - actually remove a node, we need to commit the transaction + {@ocaml skip[ + Xt.commit { tx = remove node } + ]} - {@ocaml skip[ - Xt.commit { tx = remove node } - ]} + which repeatedly calls the transaction function, [tx], to record a + transaction log and attempts to atomically perform it until it succeeds. - which repeatedly calls the transaction function, [tx], to record a - transaction log and attempts to atomically perform it until it succeeds. + Notice that [remove] is not recursive. It doesn't have to account for + failure or perform a backoff. It is also not necessary to know or keep + track of what the previous values of locations were. All of that is taken + care of for us by the transaction log and the {!Xt.commit} function. + Furthermore, [remove] can easily be called as a part of a more complex + transaction. *) - Notice that [remove] is not recursive. It doesn't have to account for - failure or perform a backoff. It is also not necessary to know or keep track - of what the previous values of locations were. All of that is taken care of - for us by the transaction log and the {!Xt.commit} function. Furthermore, - [remove] can easily be called as a part of a more complex transaction. *) -module Xt : sig type 'x t (** Type of an explicit transaction log on shared memory locations. @@ -548,8 +541,7 @@ module Xt : sig val call : xt:'x t -> 'a tx -> 'a (** [call ~xt tx] is equivalent to [tx.Xt.tx ~xt]. *) - val commit : - ?timeoutf:float -> ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a + val commit : ?backoff:Backoff.t -> ?mode:Mode.t -> 'a tx -> 'a (** [commit tx] repeatedly calls [tx] to record a log of shared memory accesses and attempts to perform them atomically until it succeeds and then returns whatever [tx] returned. [tx] may raise {!Retry.Later} or diff --git a/src/kcas_data/dllist.ml b/src/kcas_data/dllist.ml index 4b93cd48..7be748a1 100644 --- a/src/kcas_data/dllist.ml +++ b/src/kcas_data/dllist.ml @@ -225,13 +225,8 @@ let move_l node list = Kcas.Xt.commit { tx = Xt.move_l node list } let move_r node list = Kcas.Xt.commit { tx = Xt.move_r node list } let take_opt_l list = Kcas.Xt.commit { tx = Xt.take_opt_l list } let take_opt_r list = Kcas.Xt.commit { tx = Xt.take_opt_r list } - -let take_blocking_l ?timeoutf list = - Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking_l list } - -let take_blocking_r ?timeoutf list = - Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking_r list } - +let take_blocking_l list = Kcas.Xt.commit { tx = Xt.take_blocking_l list } +let take_blocking_r list = Kcas.Xt.commit { tx = Xt.take_blocking_r list } let swap t1 t2 = Kcas.Xt.commit { tx = Xt.swap t1 t2 } let transfer_l t1 t2 = Kcas.Xt.commit { tx = Xt.transfer_l t1 t2 } let transfer_r t1 t2 = Kcas.Xt.commit { tx = Xt.transfer_r t1 t2 } diff --git a/src/kcas_data/dllist.mli b/src/kcas_data/dllist.mli index de21e093..5f0163b2 100644 --- a/src/kcas_data/dllist.mli +++ b/src/kcas_data/dllist.mli @@ -59,7 +59,6 @@ module Xt : with type 'a t := 'a t with type 'a node := 'a node with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on doubly-linked lists. *) (** {1 Non-compositional interface} *) @@ -69,7 +68,6 @@ include with type 'a t := 'a t with type 'a node := 'a node with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn val take_all : 'a t -> 'a t (** [take_all l] removes all nodes of the doubly-linked list [l] and returns a diff --git a/src/kcas_data/dllist_intf.ml b/src/kcas_data/dllist_intf.ml index 738fda2c..406a9f94 100644 --- a/src/kcas_data/dllist_intf.ml +++ b/src/kcas_data/dllist_intf.ml @@ -2,7 +2,6 @@ module type Ops = sig type 'a t type 'a node type ('x, 'fn) fn - type ('x, 'fn) blocking_fn (** {2 Operations on nodes} *) @@ -42,12 +41,12 @@ module type Ops = sig (** [take_opt_r l] removes and returns the value of rightmost node of the doubly-linked list [l], or return [None] if the list is empty. *) - val take_blocking_l : ('x, 'a t -> 'a) blocking_fn + val take_blocking_l : ('x, 'a t -> 'a) fn (** [take_blocking_l l] removes and returns the value of leftmost node of the doubly-linked list [l], or blocks waiting for the list to become non-empty. *) - val take_blocking_r : ('x, 'a t -> 'a) blocking_fn + val take_blocking_r : ('x, 'a t -> 'a) fn (** [take_blocking_r l] removes and returns the value of rightmost node of the doubly-linked list [l], or blocks waiting for the list to become non-empty. *) diff --git a/src/kcas_data/mvar.ml b/src/kcas_data/mvar.ml index ba31e192..690e3dbc 100644 --- a/src/kcas_data/mvar.ml +++ b/src/kcas_data/mvar.ml @@ -25,18 +25,17 @@ end let is_empty mv = Magic_option.is_none (Loc.get mv) -let put ?timeoutf mv value = +let put mv value = (* Fenceless is safe as we always update. *) - Loc.fenceless_modify ?timeoutf mv (Magic_option.put_or_retry value) + Loc.fenceless_modify mv (Magic_option.put_or_retry value) let try_put mv value = Loc.compare_and_set mv Magic_option.none (Magic_option.some value) -let take ?timeoutf mv = +let take mv = (* Fenceless is safe as we always update. *) - Magic_option.get_unsafe - (Loc.fenceless_update ?timeoutf mv Magic_option.take_or_retry) + Magic_option.get_unsafe (Loc.fenceless_update mv Magic_option.take_or_retry) let take_opt mv = Magic_option.to_option (Loc.exchange mv Magic_option.none) -let peek ?timeoutf mv = Loc.get_as ?timeoutf Magic_option.get_or_retry mv +let peek mv = Loc.get_as Magic_option.get_or_retry mv let peek_opt mv = Magic_option.to_option (Loc.get mv) diff --git a/src/kcas_data/mvar.mli b/src/kcas_data/mvar.mli index f0a7d5ca..12a2b8df 100644 --- a/src/kcas_data/mvar.mli +++ b/src/kcas_data/mvar.mli @@ -26,13 +26,8 @@ module Xt : Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction passing on synchronizing variables. *) (** {1 Non-compositional interface} *) -include - Mvar_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn diff --git a/src/kcas_data/mvar_intf.ml b/src/kcas_data/mvar_intf.ml index e9f6c7e3..3df088f2 100644 --- a/src/kcas_data/mvar_intf.ml +++ b/src/kcas_data/mvar_intf.ml @@ -1,13 +1,12 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty mv] determines whether the synchronizing variable [mv] contains a value or not. *) - val put : ('x, 'a t -> 'a -> unit) blocking_fn + val put : ('x, 'a t -> 'a -> unit) fn (** [put mv x] fills the synchronizing variable [mv] with the value [v] or blocks until the variable becomes empty. *) @@ -16,7 +15,7 @@ module type Ops = sig value [v] and returns [true] on success or [false] in case the variable is full. *) - val take : ('x, 'a t -> 'a) blocking_fn + val take : ('x, 'a t -> 'a) fn (** [take mv] removes and returns the current value of the synchronizing variable [mv] or blocks waiting until the variable is filled. *) @@ -24,7 +23,7 @@ module type Ops = sig (** [take_opt mv] removes and returns the current value of the synchronizing variable [mv] or returns [None] in case the variable is empty. *) - val peek : ('x, 'a t -> 'a) blocking_fn + val peek : ('x, 'a t -> 'a) fn (** [peek mv] returns the current value of the synchronizing variable [mv] or blocks waiting until the variable is filled. *) diff --git a/src/kcas_data/promise.ml b/src/kcas_data/promise.ml index 0786ff9c..cf25e189 100644 --- a/src/kcas_data/promise.ml +++ b/src/kcas_data/promise.ml @@ -38,8 +38,7 @@ module Xt = struct let resolve_error ~xt u e = resolve ~xt u (Error e) end -let await ?timeoutf t = - Loc.get_as ?timeoutf Magic_option.get_or_retry (of_promise t) +let await t = Loc.get_as Magic_option.get_or_retry (of_promise t) let resolve u v = if @@ -51,8 +50,8 @@ let resolve u v = let peek t = Magic_option.to_option (Loc.get (of_promise t)) let is_resolved t = Magic_option.is_some (Loc.get (of_promise t)) -let await_exn ?timeoutf t = - match await ?timeoutf t with Ok value -> value | Error exn -> raise exn +let await_exn t = + match await t with Ok value -> value | Error exn -> raise exn let resolve_ok u v = resolve u (Ok v) let resolve_error u e = resolve u (Error e) diff --git a/src/kcas_data/promise.mli b/src/kcas_data/promise.mli index 11c49988..6515f908 100644 --- a/src/kcas_data/promise.mli +++ b/src/kcas_data/promise.mli @@ -42,7 +42,6 @@ module Xt : with type 'a or_exn := 'a or_exn with type 'a u := 'a u with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on promises. *) (** {1 Non-compositional interface} *) @@ -53,4 +52,3 @@ include with type 'a or_exn := 'a or_exn with type 'a u := 'a u with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn diff --git a/src/kcas_data/promise_intf.ml b/src/kcas_data/promise_intf.ml index 93d1034a..ffe512f9 100644 --- a/src/kcas_data/promise_intf.ml +++ b/src/kcas_data/promise_intf.ml @@ -3,14 +3,13 @@ module type Ops = sig type !-'a u type 'a or_exn type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val resolve : ('x, 'a u -> 'a -> unit) fn (** [resolve u v] resolves the promise corresponding to the resolver [u] to the value [v]. Any awaiters of the corresponding promise are then unblocked. *) - val await : ('x, 'a t -> 'a) blocking_fn + val await : ('x, 'a t -> 'a) fn (** [await t] either immediately returns the resolved value of the promise [t] or blocks until the promise [t] is resolved. *) @@ -24,7 +23,7 @@ module type Ops = sig (** {2 Result promises} *) - val await_exn : ('x, 'a or_exn -> 'a) blocking_fn + val await_exn : ('x, 'a or_exn -> 'a) fn (** [await_exn t] is equivalent to [match await t with v -> v | exception e -> raise e]. *) val resolve_ok : ('x, ('a, 'b) result u -> 'a -> unit) fn diff --git a/src/kcas_data/queue.ml b/src/kcas_data/queue.ml index 543bb0c7..d8742eb6 100644 --- a/src/kcas_data/queue.ml +++ b/src/kcas_data/queue.ml @@ -139,10 +139,10 @@ let take_opt q = | None -> Kcas.Xt.commit { tx = Xt.take_opt q } | some -> some -let take_blocking ?timeoutf q = +let take_blocking q = (* Fenceless is safe as we revert to a transaction in case we didn't update. *) match Loc.fenceless_update q.front Elems.tl_safe |> Elems.hd_opt with - | None -> Kcas.Xt.commit ?timeoutf { tx = Xt.take_blocking q } + | None -> Kcas.Xt.commit { tx = Xt.take_blocking q } | Some elem -> elem let take_all q = Kcas.Xt.commit { tx = Xt.take_all q } @@ -152,9 +152,7 @@ let peek_opt q = | None -> Kcas.Xt.commit { tx = Xt.peek_opt q } | some -> some -let peek_blocking ?timeoutf q = - Kcas.Xt.commit ?timeoutf { tx = Xt.peek_blocking q } - +let peek_blocking q = Kcas.Xt.commit { tx = Xt.peek_blocking q } let clear q = Kcas.Xt.commit { tx = Xt.clear q } let swap q1 q2 = Kcas.Xt.commit { tx = Xt.swap q1 q2 } let to_seq q = Kcas.Xt.commit { tx = Xt.to_seq q } diff --git a/src/kcas_data/queue.mli b/src/kcas_data/queue.mli index d955680c..f4dd49a9 100644 --- a/src/kcas_data/queue.mli +++ b/src/kcas_data/queue.mli @@ -31,16 +31,11 @@ module Xt : Queue_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on queues. *) (** {1 Non-compositional interface} *) -include - Queue_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Queue_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn val peek : 'a t -> 'a (** [peek q] returns the first element in queue [s], or raises {!Empty} if the diff --git a/src/kcas_data/queue_intf.ml b/src/kcas_data/queue_intf.ml index c787bdde..5293058e 100644 --- a/src/kcas_data/queue_intf.ml +++ b/src/kcas_data/queue_intf.ml @@ -1,7 +1,6 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty s] determines whether the queue [q] is empty. *) @@ -32,11 +31,11 @@ module type Ops = sig (** [peek_opt q] returns the first element in queue [q], without removing it from the queue, or returns [None] if the queue is empty. *) - val peek_blocking : ('x, 'a t -> 'a) blocking_fn + val peek_blocking : ('x, 'a t -> 'a) fn (** [peek_blocking q] returns the first element in queue [q], without removing it from the queue, or blocks waiting for the queue to become non-empty. *) - val take_blocking : ('x, 'a t -> 'a) blocking_fn + val take_blocking : ('x, 'a t -> 'a) fn (** [take_blocking q] removes and returns the first element in queue [q], or blocks waiting for the queue to become non-empty. *) diff --git a/src/kcas_data/stack.ml b/src/kcas_data/stack.ml index c4d0170a..ad684e36 100644 --- a/src/kcas_data/stack.ml +++ b/src/kcas_data/stack.ml @@ -30,12 +30,12 @@ let push x s = let pop_opt s = Loc.update s Elems.tl_safe |> Elems.hd_opt let pop_all s = Loc.exchange s Elems.empty |> Elems.to_seq -let pop_blocking ?timeoutf s = +let pop_blocking s = (* Fenceless is safe as we always update. *) - Loc.fenceless_update ?timeoutf s Elems.tl_or_retry |> Elems.hd_unsafe + Loc.fenceless_update s Elems.tl_or_retry |> Elems.hd_unsafe let top_opt s = Loc.get s |> Elems.hd_opt -let top_blocking ?timeoutf s = Loc.get_as ?timeoutf Elems.hd_or_retry s +let top_blocking s = Loc.get_as Elems.hd_or_retry s let clear s = Loc.set s Elems.empty let swap s1 s2 = Kcas.Xt.commit { tx = Kcas.Xt.swap s1 s2 } let to_seq s = Elems.to_seq @@ Loc.get s diff --git a/src/kcas_data/stack.mli b/src/kcas_data/stack.mli index fa7a67a0..d84fe9a6 100644 --- a/src/kcas_data/stack.mli +++ b/src/kcas_data/stack.mli @@ -33,16 +33,11 @@ module Xt : Stack_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn (** Explicit transaction log passing on stacks. *) (** {1 Non-compositional interface} *) -include - Stack_intf.Ops - with type 'a t := 'a t - with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn +include Stack_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn val pop : 'a t -> 'a (** [pop s] removes and returns the topmost element in stack [s], or raises diff --git a/src/kcas_data/stack_intf.ml b/src/kcas_data/stack_intf.ml index 640a840f..53e980f6 100644 --- a/src/kcas_data/stack_intf.ml +++ b/src/kcas_data/stack_intf.ml @@ -1,7 +1,6 @@ module type Ops = sig type 'a t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val is_empty : ('x, 'a t -> bool) fn (** [is_empty s] determines whether the stack [s] is empty. *) @@ -34,7 +33,7 @@ module type Ops = sig sequence for iterating through all the elements that were in the stack top to bottom. *) - val pop_blocking : ('x, 'a t -> 'a) blocking_fn + val pop_blocking : ('x, 'a t -> 'a) fn (** [pop_blocking s] removes and returns the topmost element of the stack [s], or blocks waiting for the queue to become non-empty. *) @@ -42,7 +41,7 @@ module type Ops = sig (** [top_opt s] returns the topmost element in stack [s], or [None] if the stack is empty. *) - val top_blocking : ('x, 'a t -> 'a) blocking_fn + val top_blocking : ('x, 'a t -> 'a) fn (** [top_blocking s] returns the topmost element in stack [s], or blocks waiting for the queue to become non-empty. *) end diff --git a/test/kcas/dune b/test/kcas/dune index e78098c1..7582857d 100644 --- a/test/kcas/dune +++ b/test/kcas/dune @@ -1,9 +1,11 @@ (tests (names test ms_queue_test threads loc_modes) (libraries + scheduler + picos_std.finally + picos_std.structured alcotest kcas - domain-local-timeout threads.posix unix domain_shims) diff --git a/test/kcas/loc_modes.ml b/test/kcas/loc_modes.ml index aa4af6f7..8077aa0b 100644 --- a/test/kcas/loc_modes.ml +++ b/test/kcas/loc_modes.ml @@ -1,3 +1,4 @@ +open Picos_std_structured open Kcas let loop_count = try int_of_string Sys.argv.(1) with _ -> Util.iter_factor @@ -76,6 +77,6 @@ let accumulator_thread () = exit := true let () = - accumulator_thread :: List.init n_counters counter_thread - |> List.map Domain.spawn |> List.iter Domain.join; + Scheduler.run ~n_domains:(n_counters + 1) @@ fun () -> + Run.all (accumulator_thread :: List.init n_counters counter_thread); Printf.printf "Loc modes OK!\n%!" diff --git a/test/kcas/ms_queue_test.ml b/test/kcas/ms_queue_test.ml index d48ad6b1..950a075a 100644 --- a/test/kcas/ms_queue_test.ml +++ b/test/kcas/ms_queue_test.ml @@ -1,3 +1,4 @@ +open Picos_std_structured open Kcas module Q = struct @@ -43,6 +44,8 @@ let failure exit msg = failwith msg let write_skew_test n = + Scheduler.run ~n_domains:3 @@ fun () -> + Flock.join_after @@ fun () -> let q1 = Q.queue () and q2 = Q.queue () in let push_to_q2 ~xt = @@ -61,20 +64,20 @@ let write_skew_test n = let exit = Atomic.make false in - let domains = - [ - Domain.spawn (fun () -> - sync (); - while not (Atomic.get exit) do - Xt.commit { tx = push_to_q1 } - done); - Domain.spawn (fun () -> - sync (); - while not (Atomic.get exit) do - Xt.commit { tx = push_to_q2 } - done); - ] - in + begin + Flock.fork @@ fun () -> + sync (); + while not (Atomic.get exit) do + Xt.commit { tx = push_to_q1 } + done + end; + begin + Flock.fork (fun () -> + sync (); + while not (Atomic.get exit) do + Xt.commit { tx = push_to_q2 } + done) + end; sync (); for _ = 1 to n do @@ -82,11 +85,10 @@ let write_skew_test n = | Some _, Some _ -> failure exit "write skew!" | _ -> () done; - Atomic.set exit true; - - List.iter Domain.join domains + Atomic.set exit true let tail_leak_test n = + Scheduler.run ~n_domains:2 @@ fun () -> let q = Q.queue () in let m = 2 in @@ -120,7 +122,7 @@ let tail_leak_test n = raise e in - List.init m domain |> List.map Domain.spawn |> List.iter Domain.join + Run.all (List.init m domain) let () = let n = try int_of_string Sys.argv.(1) with _ -> 1 * Util.iter_factor in diff --git a/test/kcas/test.ml b/test/kcas/test.ml index 1b433c5a..7e063958 100644 --- a/test/kcas/test.ml +++ b/test/kcas/test.ml @@ -24,73 +24,71 @@ OR IN CONNECTION WITH THE USE OR PERFORMANCE OF THIS SOFTWARE. ---------------------------------------------------------------------------*) +open Picos_std_finally +open Picos_std_structured + let is_single = Domain.recommended_domain_count () = 1 open Kcas -let nb_iter = 100 * Util.iter_factor +let nb_iter = 50 * Util.iter_factor +let n_repeats = 30 let assert_kcas loc expected_v = let present_v = Loc.get loc in assert (present_v == expected_v) -let run_domains = function - | [] -> () - | main :: others -> - let others = List.map Domain.spawn others in - main (); - List.iter Domain.join others - (* *) let test_non_linearizable_xt () = - [ `Obstruction_free; `Lock_free ] - |> List.iter @@ fun mode -> - let barrier = Barrier.make 2 - and n_iter = 100 * Util.iter_factor - and test_finished = ref false in - - let a = Loc.make ~mode 0 and b = Loc.make ~mode 0 in - - let cass1a ~xt = - (Xt.get ~xt b == 0 && Xt.compare_and_set ~xt a 0 1) || Retry.invalid () - and cass1b ~xt = - (Xt.get ~xt b == 0 && Xt.compare_and_set ~xt a 1 0) || Retry.invalid () - and cass2a ~xt = - (Xt.compare_and_set ~xt b 0 1 && Xt.get ~xt a == 0) || Retry.invalid () - and cass2b ~xt = - (Xt.compare_and_set ~xt b 1 0 && Xt.get ~xt a == 0) || Retry.invalid () - in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + [ `Obstruction_free; `Lock_free ] + |> List.iter @@ fun mode -> + let barrier = Barrier.make 2 and test_finished = ref false in + + let a = Loc.make ~mode 0 and b = Loc.make ~mode 0 in + + let cass1a ~xt = + (Xt.get ~xt b == 0 && Xt.compare_and_set ~xt a 0 1) || Retry.invalid () + and cass1b ~xt = + (Xt.get ~xt b == 0 && Xt.compare_and_set ~xt a 1 0) || Retry.invalid () + and cass2a ~xt = + (Xt.compare_and_set ~xt b 0 1 && Xt.get ~xt a == 0) || Retry.invalid () + and cass2b ~xt = + (Xt.compare_and_set ~xt b 1 0 && Xt.get ~xt a == 0) || Retry.invalid () + in - let atomically tx = - if Random.bool () then Xt.commit ~mode:`Obstruction_free tx - else Xt.commit tx - in + let atomically tx = + if Random.bool () then Xt.commit ~mode:`Obstruction_free tx + else Xt.commit tx + in - let thread1 () = - Barrier.await barrier; - while not !test_finished do - if atomically { tx = cass1a } then - while not (atomically { tx = cass1b }) do - if is_single then Domain.cpu_relax (); - assert (Loc.get a == 1 && Loc.get b == 0) - done - else if is_single then Domain.cpu_relax () - done - and thread2 () = - Barrier.await barrier; - for _ = 1 to n_iter do - if atomically { tx = cass2a } then - while not (atomically { tx = cass2b }) do - if is_single then Domain.cpu_relax (); - assert (Loc.get a == 0 && Loc.get b == 1) - done - else if is_single then Domain.cpu_relax () - done; - test_finished := true - in + let thread1 () = + Barrier.await barrier; + while not !test_finished do + if atomically { tx = cass1a } then + while not (atomically { tx = cass1b }) do + if is_single then Domain.cpu_relax (); + assert (Loc.get a == 1 && Loc.get b == 0) + done + else if is_single then Domain.cpu_relax () + done + and thread2 () = + Barrier.await barrier; + for _ = 1 to nb_iter do + if atomically { tx = cass2a } then + while not (atomically { tx = cass2b }) do + if is_single then Domain.cpu_relax (); + assert (Loc.get a == 0 && Loc.get b == 1) + done + else if is_single then Domain.cpu_relax () + done; + test_finished := true + in - run_domains [ thread2; thread1 ] + Run.all [ thread2; thread1 ] + done (* *) @@ -103,124 +101,130 @@ let test_set () = (* *) let test_no_skew_xt () = - [ `Obstruction_free; `Lock_free ] - |> List.iter @@ fun mode -> - let barrier = Barrier.make 3 in - let test_finished = Atomic.make false in - - let a1 = Loc.make ~mode 0 in - let a2 = Loc.make ~mode 0 in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:3 @@ fun () -> + [ `Obstruction_free; `Lock_free ] + |> List.iter @@ fun mode -> + let barrier = Barrier.make 3 in + let test_finished = Atomic.make false in + + let a1 = Loc.make ~mode 0 in + let a2 = Loc.make ~mode 0 in + + let thread1 () = + let c1 ~xt = + Xt.compare_and_set ~xt a1 0 1 && Xt.compare_and_set ~xt a2 0 1 + in + let c2 ~xt = + Xt.compare_and_set ~xt a1 1 0 && Xt.compare_and_set ~xt a2 1 0 + in - let thread1 () = - let c1 ~xt = - Xt.compare_and_set ~xt a1 0 1 && Xt.compare_and_set ~xt a2 0 1 + Barrier.await barrier; + + for _ = 1 to nb_iter do + assert_kcas a1 0; + assert_kcas a2 0; + + let out1 = Xt.commit { tx = c1 } in + assert out1; + + assert_kcas a1 1; + assert_kcas a2 1; + + let out2 = Xt.commit { tx = c2 } in + assert out2 + done; + Atomic.set test_finished true + and thread2 () = + let c1 ~xt = Xt.get ~xt a1 == 0 && Xt.get ~xt a2 == 1 in + let c2 ~xt = Xt.get ~xt a2 == 1 && Xt.get ~xt a2 == 0 in + + Barrier.await barrier; + + while not (Atomic.get test_finished) do + let out1 = Xt.commit { tx = c1 } in + let out2 = Xt.commit { tx = c2 } in + assert (not out1); + assert (not out2); + if is_single then Domain.cpu_relax () + done + and thread3 () = + let c1 ~xt = Xt.get ~xt a1 == 1 && Xt.get ~xt a2 == 0 in + let c2 ~xt = Xt.get ~xt a2 == 0 && Xt.get ~xt a2 == 1 in + + Barrier.await barrier; + + while not (Atomic.get test_finished) do + let out1 = Xt.commit { tx = c1 } in + let out2 = Xt.commit { tx = c2 } in + assert (not out1); + assert (not out2); + if is_single then Domain.cpu_relax () + done in - let c2 ~xt = - Xt.compare_and_set ~xt a1 1 0 && Xt.compare_and_set ~xt a2 1 0 - in - - Barrier.await barrier; - - for _ = 1 to nb_iter do - assert_kcas a1 0; - assert_kcas a2 0; - - let out1 = Xt.commit { tx = c1 } in - assert out1; - - assert_kcas a1 1; - assert_kcas a2 1; - - let out2 = Xt.commit { tx = c2 } in - assert out2 - done; - Atomic.set test_finished true - and thread2 () = - let c1 ~xt = Xt.get ~xt a1 == 0 && Xt.get ~xt a2 == 1 in - let c2 ~xt = Xt.get ~xt a2 == 1 && Xt.get ~xt a2 == 0 in - - Barrier.await barrier; - - while not (Atomic.get test_finished) do - let out1 = Xt.commit { tx = c1 } in - let out2 = Xt.commit { tx = c2 } in - assert (not out1); - assert (not out2); - if is_single then Domain.cpu_relax () - done - and thread3 () = - let c1 ~xt = Xt.get ~xt a1 == 1 && Xt.get ~xt a2 == 0 in - let c2 ~xt = Xt.get ~xt a2 == 0 && Xt.get ~xt a2 == 1 in - - Barrier.await barrier; - - while not (Atomic.get test_finished) do - let out1 = Xt.commit { tx = c1 } in - let out2 = Xt.commit { tx = c2 } in - assert (not out1); - assert (not out2); - if is_single then Domain.cpu_relax () - done - in - run_domains [ thread1; thread2; thread3 ] + Run.all [ thread1; thread2; thread3 ] + done (* *) let test_get_seq_xt () = - [ `Obstruction_free; `Lock_free ] - |> List.iter @@ fun mode -> - let barrier = Barrier.make 4 in - let test_finished = Atomic.make false in - - let a1 = Loc.make ~mode 0 in - let a2 = Loc.make ~mode 0 in - - let mutator () = - Barrier.await barrier; - for _ = 0 to nb_iter do - let tx ~xt = - Xt.incr ~xt a1; - Xt.incr ~xt a2 - in - Xt.commit { tx } - done; - Atomic.set test_finished true - and getter () = - Barrier.await barrier; - while not (Atomic.get test_finished) do - let a = Loc.get a1 in - let b = Loc.get a2 in - assert (a <= b); - if is_single then Domain.cpu_relax () - done - and getaser () = - Barrier.await barrier; - while not (Atomic.get test_finished) do - let a = Loc.get_as Fun.id a1 in - let b = Loc.get_as Fun.id a2 in - assert (a <= b); - if is_single then Domain.cpu_relax () - done - and committer () = - Barrier.await barrier; - while not (Atomic.get test_finished) do - let a = Xt.commit { tx = Xt.get a1 } in - let b = Xt.commit { tx = Xt.get a2 } in - assert (a <= b); - if is_single then Domain.cpu_relax () - done - and updater () = - Barrier.await barrier; - while not (Atomic.get test_finished) do - let a = Loc.update a1 Fun.id in - let b = Loc.update a2 Fun.id in - assert (a <= b); - if is_single then Domain.cpu_relax () - done - in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:5 @@ fun () -> + [ `Obstruction_free; `Lock_free ] + |> List.iter @@ fun mode -> + let barrier = Barrier.make 5 in + let test_finished = Atomic.make false in + + let a1 = Loc.make ~mode 0 in + let a2 = Loc.make ~mode 0 in + + let mutator () = + Barrier.await barrier; + for _ = 0 to nb_iter do + let tx ~xt = + Xt.incr ~xt a1; + Xt.incr ~xt a2 + in + Xt.commit { tx } + done; + Atomic.set test_finished true + and getter () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Loc.get a1 in + let b = Loc.get a2 in + assert (a <= b); + if is_single then Domain.cpu_relax () + done + and getaser () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Loc.get_as Fun.id a1 in + let b = Loc.get_as Fun.id a2 in + assert (a <= b); + if is_single then Domain.cpu_relax () + done + and committer () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Xt.commit { tx = Xt.get a1 } in + let b = Xt.commit { tx = Xt.get a2 } in + assert (a <= b); + if is_single then Domain.cpu_relax () + done + and updater () = + Barrier.await barrier; + while not (Atomic.get test_finished) do + let a = Loc.update a1 Fun.id in + let b = Loc.update a2 Fun.id in + assert (a <= b); + if is_single then Domain.cpu_relax () + done + in - run_domains [ mutator; getter; getaser; committer; updater ] + Run.all [ mutator; getter; getaser; committer; updater ] + done (* *) @@ -271,34 +275,37 @@ let in_place_shuffle array = (* *) let test_presort_and_is_in_log_xt () = - let n_incs = 10 * Util.iter_factor and n_domains = 3 and n_locs = 12 in - let n_locs_half = n_locs asr 1 in - - let barrier = Barrier.make n_domains in - - let locs = Array.init n_locs (fun _ -> Loc.make 0) in - - let thread () = - let locs = Array.copy locs in - Random.self_init (); - Barrier.await barrier; - for _ = 1 to n_incs do - in_place_shuffle locs; - let tx ~xt = - for i = 0 to n_locs_half - 1 do - Xt.incr ~xt locs.(i) - done; - assert (Xt.is_in_log ~xt locs.(Random.int n_locs_half)); - assert (not (Xt.is_in_log ~xt locs.(n_locs_half))) - in - Xt.commit { tx } - done - in + for _ = 1 to n_repeats do + let n_incs = 10 * Util.iter_factor and n_domains = 3 and n_locs = 12 in + let n_locs_half = n_locs asr 1 in + + Scheduler.run ~n_domains @@ fun () -> + let barrier = Barrier.make n_domains in + + let locs = Array.init n_locs (fun _ -> Loc.make 0) in + + let thread () = + let locs = Array.copy locs in + Random.self_init (); + Barrier.await barrier; + for _ = 1 to n_incs do + in_place_shuffle locs; + let tx ~xt = + for i = 0 to n_locs_half - 1 do + Xt.incr ~xt locs.(i) + done; + assert (Xt.is_in_log ~xt locs.(Random.int n_locs_half)); + assert (not (Xt.is_in_log ~xt locs.(n_locs_half))) + in + Xt.commit { tx } + done + in - run_domains (List.init n_domains (Fun.const thread)); + Run.all (List.init n_domains (Fun.const thread)); - let sum = locs |> Array.map Loc.get |> Array.fold_left ( + ) 0 in - assert (sum = n_incs * n_locs_half * n_domains) + let sum = locs |> Array.map Loc.get |> Array.fold_left ( + ) 0 in + assert (sum = n_incs * n_locs_half * n_domains) + done (* *) @@ -344,176 +351,194 @@ let test_post_commit () = (* *) let test_blocking () = - let state = Loc.make `Spawned in - let await state' = - (* Intentionally test that [Xt.modify] allows retry. *) - let tx ~xt = - Xt.modify ~xt state @@ fun state -> - Retry.unless (state == state'); - state - in - Xt.commit { tx } - in - - let a = Loc.make 0 and bs = Array.init 10 @@ fun _ -> Loc.make 0 in - - let n = 10 * Util.iter_factor in - - let num_attempts = ref 0 in - - let other_domain = - Domain.spawn @@ fun () -> - Loc.set state `Get_a_non_zero; - Loc.get_as - (fun a -> - incr num_attempts; - Retry.unless (a != 0)) - a; - - Loc.set state `Update_a_zero; - Loc.modify a (fun a -> - incr num_attempts; - Retry.unless (a = 0); - a + 1); - - Loc.set state `Set_1_b_to_0; - let bs = Array.copy bs in - for _ = 1 to n do - (* We access in random order to exercise tx log waiters handling. *) - in_place_shuffle bs; + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + let state = Loc.make `Spawned in + let await state' = + (* Intentionally test that [Xt.modify] allows retry. *) let tx ~xt = - incr num_attempts; - match - bs - |> Array.find_map @@ fun b -> - if Xt.get ~xt b = 1 then Some b - else if Loc.has_awaiters b (* There must be no leaked waiters... *) - then begin - (* ...except if main domain just set the loc *) - assert (Loc.get b = 1); - Retry.later () - end - else None - with - | None -> Retry.later () - | Some b -> Xt.set ~xt b 0 + Xt.modify ~xt state @@ fun state -> + Retry.unless (state == state'); + state in Xt.commit { tx } - done - in + in - await `Get_a_non_zero; - Loc.set a 1; - assert (!num_attempts <= 2 + 1 (* Need to account for race to next. *)); + let a = Loc.make 0 and bs = Array.init 10 @@ fun _ -> Loc.make 0 in - await `Update_a_zero; - Loc.set a 0; - assert (!num_attempts <= 4 + 1 (* Need to account for race to next. *)); + let n = 10 * Util.iter_factor in - await `Set_1_b_to_0; - for _ = 1 to n do - let i = Random.int (Array.length bs) in - Loc.set bs.(i) 1; - Loc.get_as (fun b -> Retry.unless (b = 0)) bs.(i) - done; + let num_attempts = ref 0 in - Domain.join other_domain; + begin + Flock.join_after @@ fun () -> + begin + Flock.fork @@ fun () -> + Loc.set state `Get_a_non_zero; + Loc.get_as + (fun a -> + incr num_attempts; + Retry.unless (a != 0)) + a; + + Loc.set state `Update_a_zero; + Loc.modify a (fun a -> + incr num_attempts; + Retry.unless (a = 0); + a + 1); + + Loc.set state `Set_1_b_to_0; + let bs = Array.copy bs in + for _ = 1 to n do + (* We access in random order to exercise tx log waiters handling. *) + in_place_shuffle bs; + let tx ~xt = + incr num_attempts; + match + bs + |> Array.find_map @@ fun b -> + if Xt.get ~xt b = 1 then Some b + else if + Loc.has_awaiters b (* There must be no leaked waiters... *) + then begin + (* ...except if main domain just set the loc *) + assert (Loc.get b = 1); + Retry.later () + end + else None + with + | None -> Retry.later () + | Some b -> Xt.set ~xt b 0 + in + Xt.commit { tx } + done + end; + + await `Get_a_non_zero; + Loc.set a 1; + assert (!num_attempts <= 3 + 1 (* Need to account for race to next. *)); + + await `Update_a_zero; + Loc.set a 0; + assert (!num_attempts <= 5 + 1 (* Need to account for race to next. *)); + + await `Set_1_b_to_0; + for _ = 1 to n do + let i = Random.int (Array.length bs) in + Loc.set bs.(i) 1; + Loc.get_as (fun b -> Retry.unless (b = 0)) bs.(i) + done + end; - assert (!num_attempts <= 4 + (n * 2)); - for i = 0 to Array.length bs - 1 do - assert (not (Loc.has_awaiters bs.(i))) + assert (!num_attempts <= 5 + (n * 2)); + for i = 0 to Array.length bs - 1 do + assert (not (Loc.has_awaiters bs.(i))) + done done let test_no_unnecessary_wakeups () = - let continue = Loc.make false and tries = Atomic.make 0 in - - let other_domain = - Domain.spawn @@ fun () -> - continue - |> Loc.get_as @@ fun s -> - Atomic.incr tries; - Retry.unless s - in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + let continue = Loc.make false and tries = Atomic.make 0 in - while not (Loc.has_awaiters continue) do - Domain.cpu_relax () - done; + begin + Flock.join_after @@ fun () -> + begin + Flock.fork @@ fun () -> + continue + |> Loc.get_as @@ fun s -> + Atomic.incr tries; + Retry.unless s + end; + + while not (Loc.has_awaiters continue) do + Domain.cpu_relax () + done; - assert (Loc.compare_and_set continue false false); - assert (not (Loc.update continue Fun.id)); - Loc.set continue false; + assert (Loc.compare_and_set continue false false); + assert (not (Loc.update continue Fun.id)); + Loc.set continue false; - Unix.sleepf 0.01; + Unix.sleepf 0.01; - assert (Loc.has_awaiters continue && Atomic.get tries = 1); - Loc.set continue true; - Domain.join other_domain; - assert ((not (Loc.has_awaiters continue)) && Atomic.get tries = 2) + assert (Loc.has_awaiters continue && Atomic.get tries = 1); + Loc.set continue true + end; + + assert ((not (Loc.has_awaiters continue)) && Atomic.get tries = 2) + done (* *) let test_periodic_validation () = - let a = Loc.make 0 and b = Loc.make 0 and looping = ref false in - let non_zero_difference_domain = - Domain.spawn @@ fun () -> - let rec tx ~xt = - let d = Xt.get ~xt a - Xt.get ~xt b in - if d <> 0 then d - else begin - (* We explicitly want this tx to go into infinite loop! Without - validation this would never finish. *) - looping := true; - tx ~xt - end + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> + let a = Loc.make 0 and b = Loc.make 0 and looping = ref false in + let non_zero_difference_domain = + Flock.fork_as_promise @@ fun () -> + let rec tx ~xt = + let d = Xt.get ~xt a - Xt.get ~xt b in + if d <> 0 then d + else begin + (* We explicitly want this tx to go into infinite loop! Without + validation this would never finish. *) + looping := true; + tx ~xt + end + in + Xt.commit { tx } in - Xt.commit { tx } - in - while not !looping do - Domain.cpu_relax () - done; + while not !looping do + Domain.cpu_relax () + done; - Loc.set a 1; + Loc.set a 1; - assert (1 = Domain.join non_zero_difference_domain) + assert (1 = Promise.await non_zero_difference_domain) + done (* *) let test_explicit_validation () = - let a = Loc.make 0 and b = Loc.make 0 in + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> + let a = Loc.make 0 and b = Loc.make 0 in - let exit = ref false and mutator_running = ref false in - let mutator_domain = - Domain.spawn @@ fun () -> - mutator_running := true; - while not !exit do - let tx ~xt = - Xt.decr ~xt a; - Xt.incr ~xt b - in - Xt.commit { tx }; - Domain.cpu_relax () - done - in + let exit = ref false and mutator_running = ref false in - let n = 100 in + begin + Flock.fork @@ fun () -> + mutator_running := true; + while not !exit do + let tx ~xt = + Xt.decr ~xt a; + Xt.incr ~xt b + in + Xt.commit { tx }; + Domain.cpu_relax () + done + end; - while not !mutator_running do - Domain.cpu_relax () - done; + let n = 100 in - for _ = 1 to n do - let tx ~xt = - let a' = Xt.get ~xt a and b' = Xt.get ~xt b in - Xt.validate ~xt a; - assert (a' + b' = 0) - in - Xt.commit { tx } - done; + while not !mutator_running do + Domain.cpu_relax () + done; - exit := true; + for _ = 1 to n do + let tx ~xt = + let a' = Xt.get ~xt a and b' = Xt.get ~xt b in + Xt.validate ~xt a; + assert (a' + b' = 0) + in + Xt.commit { tx } + done; - Domain.join mutator_domain + exit := true + done (* *) @@ -572,43 +597,46 @@ let test_call () = (** This is a non-deterministic test that might fail occasionally. *) let test_timeout () = - Domain_local_timeout.set_system (module Thread) (module Unix); - - let check (op : ?timeoutf:float -> bool Loc.t -> unit) () = - let rec loop n = - let x = Loc.make false in - let (_ : unit -> unit) = - Domain_local_timeout.set_timeoutf 0.6 @@ fun () -> Loc.set x true + for _ = 1 to n_repeats do + Scheduler.run ~n_domains:5 @@ fun () -> + let check (op : bool Loc.t -> unit) () = + Flock.join_after @@ fun () -> + let rec loop n = + let x = Loc.make false in + let@ _ = + finally Promise.terminate @@ fun () -> + Flock.fork_as_promise @@ fun () -> + Control.sleep ~seconds:0.6; + Loc.set x true + in + match Control.terminate_after ~seconds:0.02 (fun () -> op x) with + | () -> if 0 < n then loop (n - 1) else assert false + | exception Control.Terminate -> + Control.terminate_after ~seconds:2.0 (fun () -> op x) in - match op ~timeoutf:0.02 x with - | () -> if 0 < n then loop (n - 1) else assert false - | exception Timeout.Timeout -> op ~timeoutf:2.0 x + loop 10 in - loop 10 - in - run_domains - [ - check (fun ?timeoutf x -> - Loc.get_as ?timeoutf (fun x -> if not x then Retry.later ()) x); - check (fun ?timeoutf x -> - Loc.update ?timeoutf x (fun x -> x || Retry.later ()) |> ignore); - check (fun ?timeoutf x -> - Loc.modify ?timeoutf x (fun x -> x || Retry.later ())); - check (fun ?timeoutf x -> - let y = Loc.make false in - let tx ~xt = - if not (Xt.get ~xt x) then Retry.later (); - Xt.swap ~xt x y - in - Xt.commit ?timeoutf { tx }); - check (fun ?timeoutf x -> - let y = Loc.make false in - let tx ~xt = - if not (Xt.get ~xt x) then Retry.invalid (); - Xt.swap ~xt x y - in - Xt.commit ?timeoutf { tx }); - ] + Run.all + [ + check (fun x -> Loc.get_as (fun x -> if not x then Retry.later ()) x); + check (fun x -> Loc.update x (fun x -> x || Retry.later ()) |> ignore); + check (fun x -> Loc.modify x (fun x -> x || Retry.later ())); + check (fun x -> + let y = Loc.make false in + let tx ~xt = + if not (Xt.get ~xt x) then Retry.later (); + Xt.swap ~xt x y + in + Xt.commit { tx }); + check (fun x -> + let y = Loc.make false in + let tx ~xt = + if not (Xt.get ~xt x) then Retry.invalid (); + Xt.swap ~xt x y + in + Xt.commit { tx }); + ] + done (* *) @@ -637,6 +665,8 @@ let test_xt () = Xt.commit { tx }; assert (Loc.get rx = Loc.get ry) +(* *) + let () = Alcotest.run "Kcas" [ diff --git a/test/kcas/threads.ml b/test/kcas/threads.ml index 5cbcc60c..5873a7ad 100644 --- a/test/kcas/threads.ml +++ b/test/kcas/threads.ml @@ -1,12 +1,14 @@ open Kcas let await_between_threads () = + Scheduler.run @@ fun () -> let x = Loc.make 0 in let y = Loc.make 0 in let a_thread = () |> Thread.create @@ fun () -> + Scheduler.run @@ fun () -> Loc.get_as (fun x -> Retry.unless (x <> 0)) x; Loc.set y 22 in diff --git a/test/kcas_data/accumulator_test_stm.ml b/test/kcas_data/accumulator_test_stm.ml index 0565a9b0..1ac97a71 100644 --- a/test/kcas_data/accumulator_test_stm.ml +++ b/test/kcas_data/accumulator_test_stm.ml @@ -48,6 +48,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Accumulator" (module Spec) - |> exit +let () = Stm_run.run ~name:"Accumulator" (module Spec) |> exit diff --git a/test/kcas_data/dllist_test_stm.ml b/test/kcas_data/dllist_test_stm.ml index c0dd7a84..669ce820 100644 --- a/test/kcas_data/dllist_test_stm.ml +++ b/test/kcas_data/dllist_test_stm.ml @@ -83,5 +83,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Dllist" (module Spec) |> exit +let () = Stm_run.run ~name:"Dllist" (module Spec) |> exit diff --git a/test/kcas_data/dune b/test/kcas_data/dune index 1047e5a0..8dc21040 100644 --- a/test/kcas_data/dune +++ b/test/kcas_data/dune @@ -14,6 +14,8 @@ stack_test_stm xt_test) (libraries + scheduler + picos_std.structured alcotest kcas kcas_data diff --git a/test/kcas_data/hashtbl_test_stm.ml b/test/kcas_data/hashtbl_test_stm.ml index f2cf2910..ff5a15a7 100644 --- a/test/kcas_data/hashtbl_test_stm.ml +++ b/test/kcas_data/hashtbl_test_stm.ml @@ -62,5 +62,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Hashtbl" (module Spec) |> exit +let () = Stm_run.run ~name:"Hashtbl" (module Spec) |> exit diff --git a/test/kcas_data/linearizable_chaining_example.ml b/test/kcas_data/linearizable_chaining_example.ml index 58614b86..b3063af6 100644 --- a/test/kcas_data/linearizable_chaining_example.ml +++ b/test/kcas_data/linearizable_chaining_example.ml @@ -216,6 +216,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Hashtbl_with_order" (module Spec) - |> exit +let () = Stm_run.run ~name:"Hashtbl_with_order" (module Spec) |> exit diff --git a/test/kcas_data/lru_cache.ml b/test/kcas_data/lru_cache.ml index 110ca832..c5271c07 100644 --- a/test/kcas_data/lru_cache.ml +++ b/test/kcas_data/lru_cache.ml @@ -62,8 +62,5 @@ end let capacity_of c = Kcas.Xt.commit { tx = Xt.capacity_of c } let set_capacity c n = Kcas.Xt.commit { tx = Xt.set_capacity c n } let get_opt c k = Kcas.Xt.commit { tx = Xt.get_opt c k } - -let set_blocking ?timeoutf c k v = - Kcas.Xt.commit ?timeoutf { tx = Xt.set_blocking c k v } - +let set_blocking c k v = Kcas.Xt.commit { tx = Xt.set_blocking c k v } let remove c k = Kcas.Xt.commit { tx = Xt.remove c k } diff --git a/test/kcas_data/lru_cache.mli b/test/kcas_data/lru_cache.mli index b32b1aa3..2162d524 100644 --- a/test/kcas_data/lru_cache.mli +++ b/test/kcas_data/lru_cache.mli @@ -9,10 +9,8 @@ module Xt : Lru_cache_intf.Ops with type ('k, 'v) t := ('k, 'v) t with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn - with type ('x, 'fn) blocking_fn := xt:'x Xt.t -> 'fn include Lru_cache_intf.Ops with type ('k, 'v) t := ('k, 'v) t with type ('x, 'fn) fn := 'fn - with type ('x, 'fn) blocking_fn := ?timeoutf:float -> 'fn diff --git a/test/kcas_data/lru_cache_example.ml b/test/kcas_data/lru_cache_example.ml index 0df2ee29..d0308daf 100644 --- a/test/kcas_data/lru_cache_example.ml +++ b/test/kcas_data/lru_cache_example.ml @@ -1,3 +1,4 @@ +open Picos_std_structured open Kcas module Lru_cache = struct @@ -19,21 +20,23 @@ module Lru_cache = struct | exception Retry.Later -> false end - let get ?timeoutf c k = Kcas.Xt.commit ?timeoutf { tx = Xt.get c k } - let get_if ?timeoutf c k p = Kcas.Xt.commit ?timeoutf { tx = Xt.get_if c k p } + let get c k = Kcas.Xt.commit { tx = Xt.get c k } + let get_if c k p = Kcas.Xt.commit { tx = Xt.get_if c k p } let try_set c k d = Kcas.Xt.commit { tx = Xt.try_set c k d } end let () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let c = Lru_cache.create 10 in - let domain = - Domain.spawn @@ fun () -> + let answer = + Flock.fork_as_promise @@ fun () -> let tx ~xt = Lru_cache.Xt.get ~xt c "a" + Lru_cache.Xt.get ~xt c "b" in Xt.commit { tx } in Lru_cache.set_blocking c "b" 30; Lru_cache.set_blocking c "a" 12; - assert (Domain.join domain = 42); + assert (Promise.await answer = 42); () let () = diff --git a/test/kcas_data/lru_cache_intf.ml b/test/kcas_data/lru_cache_intf.ml index 79a8063e..11ef488d 100644 --- a/test/kcas_data/lru_cache_intf.ml +++ b/test/kcas_data/lru_cache_intf.ml @@ -1,11 +1,10 @@ module type Ops = sig type ('k, 'v) t type ('x, 'fn) fn - type ('x, 'fn) blocking_fn val capacity_of : ('x, ('k, 'v) t -> int) fn val set_capacity : ('x, ('k, 'v) t -> int -> unit) fn val get_opt : ('x, ('k, 'v) t -> 'k -> 'v option) fn - val set_blocking : ('x, ('k, 'v) t -> 'k -> 'v -> unit) blocking_fn + val set_blocking : ('x, ('k, 'v) t -> 'k -> 'v -> unit) fn val remove : ('x, ('k, 'v) t -> 'k -> unit) fn end diff --git a/test/kcas_data/mvar_test.ml b/test/kcas_data/mvar_test.ml index f05872a4..d98693e8 100644 --- a/test/kcas_data/mvar_test.ml +++ b/test/kcas_data/mvar_test.ml @@ -1,7 +1,10 @@ +open Picos_std_structured open Kcas open Kcas_data let basics () = + Scheduler.run ~n_domains:2 @@ fun () -> + Flock.join_after @@ fun () -> let mv = Mvar.create (Some 101) in assert (not (Mvar.is_empty mv)); assert (Mvar.take mv = 101); @@ -9,14 +12,13 @@ let basics () = assert (Mvar.take_opt mv = None); Mvar.put mv 42; let running = Mvar.create None in - let d = - Domain.spawn @@ fun () -> + begin + Flock.fork @@ fun () -> Mvar.put running (); Xt.commit { tx = Mvar.Xt.put mv 76 } - in + end; assert (Mvar.take running = ()); assert (Xt.commit { tx = Mvar.Xt.take mv } = 42); - Domain.join d; assert (Mvar.take mv = 76) let () = diff --git a/test/kcas_data/queue_test_stm.ml b/test/kcas_data/queue_test_stm.ml index d3b0174e..63a166c6 100644 --- a/test/kcas_data/queue_test_stm.ml +++ b/test/kcas_data/queue_test_stm.ml @@ -64,5 +64,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Queue" (module Spec) |> exit +let () = Stm_run.run ~name:"Queue" (module Spec) |> exit diff --git a/test/kcas_data/stack_test_stm.ml b/test/kcas_data/stack_test_stm.ml index 6dd8641a..0e3d62eb 100644 --- a/test/kcas_data/stack_test_stm.ml +++ b/test/kcas_data/stack_test_stm.ml @@ -54,5 +54,4 @@ module Spec = struct | _, _ -> false end -let () = - Stm_run.run ~count:1000 ~verbose:true ~name:"Stack" (module Spec) |> exit +let () = Stm_run.run ~name:"Stack" (module Spec) |> exit diff --git a/test/lib/scheduler/dune b/test/lib/scheduler/dune new file mode 100644 index 00000000..9b8a38fe --- /dev/null +++ b/test/lib/scheduler/dune @@ -0,0 +1,9 @@ +(library + (name scheduler) + (libraries + picos_io.select + (select + scheduler.ml + from + (picos_mux.random picos_mux.multififo -> scheduler.ocaml5.ml) + (picos_mux.thread -> scheduler.ocaml4.ml)))) diff --git a/test/lib/scheduler/scheduler.ocaml4.ml b/test/lib/scheduler/scheduler.ocaml4.ml new file mode 100644 index 00000000..1daa66a4 --- /dev/null +++ b/test/lib/scheduler/scheduler.ocaml4.ml @@ -0,0 +1,5 @@ +let () = + Random.self_init (); + Picos_io_select.configure () + +let run ?n_domains:_ main = Picos_mux_thread.run main diff --git a/test/lib/scheduler/scheduler.ocaml5.ml b/test/lib/scheduler/scheduler.ocaml5.ml new file mode 100644 index 00000000..eb069520 --- /dev/null +++ b/test/lib/scheduler/scheduler.ocaml5.ml @@ -0,0 +1,7 @@ +let () = + Random.self_init (); + Picos_io_select.configure () + +let run ?(n_domains = 1) main = + if Random.bool () then Picos_mux_multififo.run_on ~quota:100 ~n_domains main + else Picos_mux_random.run_on ~n_domains main diff --git a/test/kcas_data/stm_run/dune b/test/lib/stm_run/dune similarity index 100% rename from test/kcas_data/stm_run/dune rename to test/lib/stm_run/dune diff --git a/test/kcas_data/stm_run/empty.ocaml4.ml b/test/lib/stm_run/empty.ocaml4.ml similarity index 100% rename from test/kcas_data/stm_run/empty.ocaml4.ml rename to test/lib/stm_run/empty.ocaml4.ml diff --git a/test/kcas_data/stm_run/empty.ocaml5.ml b/test/lib/stm_run/empty.ocaml5.ml similarity index 100% rename from test/kcas_data/stm_run/empty.ocaml5.ml rename to test/lib/stm_run/empty.ocaml5.ml diff --git a/test/kcas_data/stm_run/intf.ml b/test/lib/stm_run/intf.ml similarity index 96% rename from test/kcas_data/stm_run/intf.ml rename to test/lib/stm_run/intf.ml index fd300e7c..74894700 100644 --- a/test/kcas_data/stm_run/intf.ml +++ b/test/lib/stm_run/intf.ml @@ -43,3 +43,6 @@ module type STM_domain = sig val agree_test_par_asym : count:int -> name:string -> QCheck.Test.t val neg_agree_test_par_asym : count:int -> name:string -> QCheck.Test.t end + +let default_count = 1_000 +let default_budgetf = 60.0 diff --git a/test/kcas_data/stm_run/stm_run.ocaml4.ml b/test/lib/stm_run/stm_run.ocaml4.ml similarity index 64% rename from test/kcas_data/stm_run/stm_run.ocaml4.ml rename to test/lib/stm_run/stm_run.ocaml4.ml index 7b8d851b..655fd3ba 100644 --- a/test/kcas_data/stm_run/stm_run.ocaml4.ml +++ b/test/lib/stm_run/stm_run.ocaml4.ml @@ -1,11 +1,7 @@ include Intf -let count = - let factor b = if b then 10 else 1 in - factor (64 <= Sys.word_size) * factor (Sys.backend_type = Native) * 10 - -let run ?(verbose = true) ?(count = count) ?(budgetf = 60.0) ~name ?make_domain - (module Spec : STM.Spec) = +let run ?(verbose = true) ?(count = default_count) ?(budgetf = default_budgetf) + ~name ?make_domain (module Spec : STM.Spec) = let module Seq = STM_sequential.Make (Spec) in let module Con = STM_thread.Make (Spec) [@alert "-experimental"] in Util.run_with_budget ~budgetf ~count @@ fun count -> diff --git a/test/kcas_data/stm_run/stm_run.ocaml5.ml b/test/lib/stm_run/stm_run.ocaml5.ml similarity index 73% rename from test/kcas_data/stm_run/stm_run.ocaml5.ml rename to test/lib/stm_run/stm_run.ocaml5.ml index cece0a71..67af5c41 100644 --- a/test/kcas_data/stm_run/stm_run.ocaml5.ml +++ b/test/lib/stm_run/stm_run.ocaml5.ml @@ -1,13 +1,7 @@ include Intf -let count = - let factor b = if b then 10 else 1 in - factor (64 <= Sys.word_size) - * factor (Sys.backend_type = Native) - * factor (1 < Domain.recommended_domain_count ()) - -let run (type cmd state sut) ?(verbose = true) ?(count = count) - ?(budgetf = 60.0) ~name ?make_domain +let run (type cmd state sut) ?(verbose = true) ?(count = default_count) + ?(budgetf = default_budgetf) ~name ?make_domain (module Spec : STM.Spec with type cmd = cmd and type state = state diff --git a/test/kcas_data/stm_run/util.ml b/test/lib/stm_run/util.ml similarity index 100% rename from test/kcas_data/stm_run/util.ml rename to test/lib/stm_run/util.ml