Skip to content

Commit

Permalink
Change to use Picos instead of DLA and DLT
Browse files Browse the repository at this point in the history
This adds support for cancelation through Picos and removes explicit support for
timeouts, which simplifies the library.

Support for DLA and DLT is removed.

This basically also means that one can no longer use Kcas without a scheduler.
  • Loading branch information
polytypic committed Nov 10, 2024
1 parent 6c53ef1 commit 2f2193f
Show file tree
Hide file tree
Showing 63 changed files with 786 additions and 1,022 deletions.
6 changes: 6 additions & 0 deletions CHANGES.md
Original file line number Diff line number Diff line change
@@ -1,3 +1,9 @@
## Next version

- Changed to use [Picos](https://github.com/ocaml-multicore/picos/) instead of
[DLA](https://github.com/ocaml-multicore/domain-local-await/) and
[DLT](https://github.com/ocaml-multicore/domain-local-timeout/) (@polytypic)

## 0.7.0

- Numerous minor internal improvements (@polytypic)
Expand Down
84 changes: 40 additions & 44 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,8 +47,8 @@ Features and properties:
read-only compare (CMP) operations that can be performed on overlapping
locations in parallel without interference.

- **_Blocking await_**: The algorithm supports timeouts and awaiting for changes
to any number of shared memory locations.
- **_Blocking await_**: The algorithm supports cancelation and awaiting for
changes to any number of shared memory locations.

- **_Composable_**: Independently developed transactions can be composed with
ease sequentially, conjunctively, conditionally, and disjunctively.
Expand Down Expand Up @@ -76,7 +76,7 @@ is distributed under the [ISC license](LICENSE.md).
- [A transactional lock-free queue](#a-transactional-lock-free-queue)
- [Composing transactions](#composing-transactions)
- [Blocking transactions](#blocking-transactions)
- [Timeouts](#timeouts)
- [Cancelation and Timeouts](#cancelation-and-timeouts)
- [A transactional lock-free leftist heap](#a-transactional-lock-free-leftist-heap)
- [Programming with transactional data structures](#programming-with-transactional-data-structures)
- [The dining philosophers problem](#the-dining-philosophers-problem)
Expand All @@ -101,14 +101,7 @@ is distributed under the [ISC license](LICENSE.md).

To use the library

<!--
```ocaml
# #thread
```
-->

```ocaml
# #require "kcas"
# open Kcas
```

Expand Down Expand Up @@ -143,6 +136,7 @@ Block waiting for changes to locations:

```ocaml
# let a_domain = Domain.spawn @@ fun () ->
Scheduler.run @@ fun () ->
let x = Loc.get_as (fun x -> Retry.unless (x <> 0); x) x in
Printf.sprintf "The answer is %d!" x
val a_domain : string Domain.t = <abstr>
Expand Down Expand Up @@ -551,13 +545,28 @@ and then spawn a domain that tries to atomically both pop and dequeue:

```ocaml
# let a_domain = Domain.spawn @@ fun () ->
Scheduler.run @@ fun () ->
let tx ~xt = (pop ~xt a_stack, dequeue ~xt a_queue) in
let (popped, dequeued) = Xt.commit { tx } in
Printf.sprintf "I popped %d and dequeued %d!"
popped dequeued
val a_domain : string Domain.t = <abstr>
```

**Kcas** uses the [Picos](https://github.com/ocaml-multicore/picos/) interface
to implement blocking. Above `Scheduler.run` starts an effects based
[Picos compatible](https://ocaml-multicore.github.io/picos/doc/picos/index.html#interoperability)
scheduler, which allows **Kcas** to block in a scheduler friendly manner.

> **_Note_**: Typically your entire program would run inside a scheduler and you
> should
> [fork fibers](https://ocaml-multicore.github.io/picos/doc/picos_mux/index.html#examples)
> rather than spawn domains and start schedulers. The
> [MDX](https://github.com/realworldocaml/mdx) tool used for checking this
> document does not allow one to start a scheduler once and run individual code
> snippets within the scheduler, which is why individual examples spawn domains
> and start schedulers.
The domain is now blocked waiting for changes to the stack and the queue. As
long as we don't populate both at the same time

Expand Down Expand Up @@ -585,7 +594,7 @@ The retry mechanism essentially allows a transaction to wait for an arbitrary
condition and can function as a fairly expressive communication and
synchronization mechanism.

#### Timeouts
#### Cancelation and Timeouts

> If you block, will they come?
Expand All @@ -605,43 +614,30 @@ val pop_or_raise_if :
xt:'a Xt.t -> bool Loc.t -> 'b list Loc.t -> xt:'c Xt.t -> 'b = <fun>
```

This works, but creating, checking, and canceling timeouts properly can be a lot
of work. Therefore **Kcas** also directly supports an optional `timeoutf`
argument for potentially blocking operations. For example, to perform a blocking
pop with a timeout, one can simply explicitly pass the desired timeout in
seconds:

```ocaml
# let an_empty_stack = stack () in
Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack }
Exception: Failure "Domain_local_timeout.set_timeoutf not implemented".
```

Oops! What happened above is that the
[_domain local timeout_](https://github.com/ocaml-multicore/domain-local-timeout)
mechanism used by **Kcas** was not implemented on the current domain. The idea
is that, in the future, concurrent schedulers provide the mechanism out of the
box, but there is also a default implementation using the Stdlib `Thread` and
`Unix` modules that works on most platforms. However, to avoid direct
dependencies to `Thread` and `Unix`, we need to explicitly tell the library that
it can use those modules:
This works, but creating, checking, and canceling timeouts properly in this
manner can be a lot of work. Therefore **Kcas** also directly supports
[cancelation](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/index.html#understanding-cancelation)
through the [Picos](https://github.com/ocaml-multicore/picos/) interface. This
both allows **Kcas** transactions to be cleanly terminated in case the program
has encountered an error and also allows one to simply use a timeout mechanism
provided by the scheduler. For example, the sample
[structured concurrency library](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/index.html)

```ocaml
# Domain_local_timeout.set_system (module Thread) (module Unix)
- : unit = ()
# open Picos_std_structured
```

This initialization, if needed, should be done by application code rather than
by libraries.

If we now retry the previous example we will get a
[`Timeout`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Timeout/index.html#exception-Timeout)
exception as expected:
for Picos provides the
[`Control.terminate_after`](https://ocaml-multicore.github.io/picos/doc/picos_std/Picos_std_structured/Control/index.html#val-terminate_after)
operation, which allows one to easily run an operation with a timeout on the
current fiber:

```ocaml
# let an_empty_stack = stack () in
Xt.commit ~timeoutf:0.1 { tx = pop an_empty_stack }
Exception: Kcas.Timeout.Timeout.
Scheduler.run @@ fun () ->
Control.terminate_after ~seconds:0.1 @@ fun () ->
Xt.commit { tx = pop an_empty_stack }
Exception: Picos_std_structured__Control.Terminate.
```

Besides
Expand All @@ -651,7 +647,7 @@ potentially blocking single location operations such as
[`update`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-update),
and
[`modify`](https://ocaml-multicore.github.io/kcas/doc/kcas/Kcas/Loc/index.html#val-modify)
support the optional `timeoutf` argument.
support cancelation.

#### A transactional lock-free leftist heap

Expand Down Expand Up @@ -839,10 +835,9 @@ structures.
One source of ready-made data structures is
[**Kcas_data**](https://ocaml-multicore.github.io/kcas/doc/kcas_data/Kcas_data/index.html).
Let's explore how we can leverage those data structures. Of course, first we
need to `#require` the package and we'll also open it for convenience:
open `Kcas_data` for convenience:

```ocaml
# #require "kcas_data"
# open Kcas_data
```

Expand Down Expand Up @@ -915,6 +910,7 @@ the philosophers:
in
Array.iter Domain.join @@ Array.init philosophers @@ fun i ->
Domain.spawn @@ fun () ->
Scheduler.run @@ fun () ->
let fork_lhs = forks.(i)
and fork_rhs = forks.((i + 1) mod philosophers)
and eaten = eaten.(i) in
Expand Down
4 changes: 2 additions & 2 deletions bench/bench_accumulator.ml
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () =
let n_ops_todo = Atomic.make n_ops |> Multicore_magic.copy_as_padded in

let init _ = () in

let wrap _ _ action = Scheduler.run action in
let work _ () =
let rec work () =
let n = Util.alloc n_ops_todo in
Expand All @@ -34,7 +34,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 180 * Util.iter_factor) () =
(if n_domains = 1 then "" else "s")
in

Times.record ~budgetf ~n_domains ~init ~work ~after ()
Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after ()
|> Times.to_thruput_metrics ~n:n_ops ~config ~singular:"operation"

let run_suite ~budgetf =
Expand Down
6 changes: 4 additions & 2 deletions bench/bench_dllist.ml
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,10 @@ let run_single ~budgetf ?(n_msgs = 15 * Util.iter_factor) () =
assert (Dllist.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let wrap _ _ action = Scheduler.run action in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
Expand All @@ -31,6 +32,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let wrap _ _ action = Scheduler.run action in
let work i () =
if i < n_adders then
let rec work () =
Expand Down Expand Up @@ -70,7 +72,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(n_takers = 2) ?(factor = 1)
(format "taker" false n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
Expand Down
4 changes: 2 additions & 2 deletions bench/bench_hashtbl.ml
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor)
Atomic.set n_ops_todo n_ops;
Random.State.make_self_init ()
in

let wrap _ _ action = Scheduler.run action in
let work _ state =
let rec work () =
let n = Util.alloc n_ops_todo in
Expand Down Expand Up @@ -56,7 +56,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 40 * Util.iter_factor)
percent_read
in

Times.record ~budgetf ~n_domains ~init ~work ()
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_ops ~singular:"operation" ~config

let run_suite ~budgetf =
Expand Down
3 changes: 2 additions & 1 deletion bench/bench_mvar.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
let n_msgs_to_add = Atomic.make n_msgs |> Multicore_magic.copy_as_padded in

let init _ = () in
let wrap _ _ action = Scheduler.run action in
let work i () =
if i < n_adders then
if blocking_add then
Expand Down Expand Up @@ -79,7 +80,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
(format "taker" blocking_take n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ~after ()
Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
Expand Down
4 changes: 2 additions & 2 deletions bench/bench_parallel_cmp.ml
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () =
let n_ops_todo = Atomic.make n_ops |> Multicore_magic.copy_as_padded in

let init i = Array.unsafe_get xs i in

let wrap _ _ action = Scheduler.run action in
let work _ x =
let tx1 ~xt =
let a = Xt.get ~xt a in
Expand Down Expand Up @@ -41,7 +41,7 @@ let run_one ~budgetf ~n_domains ?(n_ops = 50 * Util.iter_factor) () =
Printf.sprintf "%d worker%s" n_domains (if n_domains = 1 then "" else "s")
in

Times.record ~budgetf ~n_domains ~init ~work ~after ()
Times.record ~budgetf ~n_domains ~init ~wrap ~work ~after ()
|> Times.to_thruput_metrics ~n:n_ops ~singular:"transaction" ~config

let run_suite ~budgetf =
Expand Down
6 changes: 4 additions & 2 deletions bench/bench_queue.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
assert (Queue.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let wrap _ _ action = Scheduler.run action in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
Expand All @@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let wrap _ _ action = Scheduler.run action in
let work i () =
if i < n_adders then
let rec work () =
Expand Down Expand Up @@ -79,7 +81,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
(format "taker" blocking_take n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
Expand Down
6 changes: 4 additions & 2 deletions bench/bench_stack.ml
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,10 @@ let run_one_domain ~budgetf ?(n_msgs = 50 * Util.iter_factor) () =
assert (Stack.is_empty t);
Util.generate_push_and_pop_sequence n_msgs
in
let wrap _ _ action = Scheduler.run action in
let work _ bits = Util.Bits.iter op bits in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config:"one domain"

let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
Expand All @@ -29,6 +30,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
Atomic.set n_msgs_to_take n_msgs;
Atomic.set n_msgs_to_add n_msgs
in
let wrap _ _ action = Scheduler.run action in
let work i () =
if i < n_adders then
let rec work () =
Expand Down Expand Up @@ -79,7 +81,7 @@ let run_one ~budgetf ?(n_adders = 2) ?(blocking_add = false) ?(n_takers = 2)
(format "taker" blocking_take n_takers)
in

Times.record ~budgetf ~n_domains ~init ~work ()
Times.record ~budgetf ~n_domains ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_msgs ~singular:"message" ~config

let run_suite ~budgetf =
Expand Down
3 changes: 2 additions & 1 deletion bench/bench_xt.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2)
in

let init _ = () in
let wrap _ _ action = Scheduler.run action in
let work _ () =
let rec loop i =
if i > 0 then begin
Expand All @@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2)

let config = Printf.sprintf "%d loc tx" n_locs in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config

let run_suite ~budgetf =
Expand Down
3 changes: 2 additions & 1 deletion bench/bench_xt_ro.ml
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@ let run_one ~budgetf ?(n_locs = 2)
in

let init _ = () in
let wrap _ _ action = Scheduler.run action in
let work _ () =
let rec loop i =
if i > 0 then begin
Expand All @@ -27,7 +28,7 @@ let run_one ~budgetf ?(n_locs = 2)

let config = Printf.sprintf "%d loc tx" n_locs in

Times.record ~budgetf ~n_domains:1 ~init ~work ()
Times.record ~budgetf ~n_domains:1 ~init ~wrap ~work ()
|> Times.to_thruput_metrics ~n:n_iter ~singular:"transaction" ~config

let run_suite ~budgetf =
Expand Down
1 change: 1 addition & 0 deletions bench/dune
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ let () =
(action
(run %{test} -brief))
(libraries
scheduler
kcas_data
multicore-bench
backoff
Expand Down
2 changes: 1 addition & 1 deletion doc/dune
Original file line number Diff line number Diff line change
Expand Up @@ -5,4 +5,4 @@
(package kcas_data))
(enabled_if
(>= %{ocaml_version} 5.0.0))
(files gkmz-with-read-only-cmp-ops.md scheduler-interop.md))
(files gkmz-with-read-only-cmp-ops.md))
Loading

0 comments on commit 2f2193f

Please sign in to comment.