-
Notifications
You must be signed in to change notification settings - Fork 11
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
Additional communication and synchronization primitives
- Loading branch information
Showing
14 changed files
with
537 additions
and
0 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,75 @@ | ||
open Kcas | ||
|
||
(* TODO: The semantics are not quite right here *) | ||
|
||
type 'a t = { | ||
givers : 'a option Loc.t Queue.t; | ||
takers : [ `Accepting | `Offer of 'a | `Finished ] Loc.t Queue.t; | ||
} | ||
|
||
let create () = | ||
let givers = Queue.create () and takers = Queue.create () in | ||
{ givers; takers } | ||
|
||
module Xt = struct | ||
let rec try_give ~xt ch value = | ||
match Queue.Xt.take_opt ~xt ch.takers with | ||
| None -> false | ||
| Some slot -> ( | ||
match Xt.compare_and_swap ~xt slot `Accepting (`Offer value) with | ||
| `Accepting -> true | ||
| `Finished -> try_give ~xt ch value | ||
| `Offer _ -> Retry.later ()) | ||
|
||
let rec take_opt ~xt ch = | ||
match Queue.Xt.take_opt ~xt ch.givers with | ||
| None -> None | ||
| Some slot -> ( | ||
match Xt.exchange ~xt slot None with | ||
| None -> take_opt ~xt ch | ||
| Some _ as offer -> offer) | ||
end | ||
|
||
let give ch value = | ||
let tx ~xt = | ||
if Xt.try_give ~xt ch value then None | ||
else | ||
let offer = Some value in | ||
let slot = Loc.make offer in | ||
Queue.Xt.add ~xt slot ch.givers; | ||
Some (slot, offer) | ||
in | ||
match Kcas.Xt.commit { tx } with | ||
| None -> () | ||
| Some (slot, offer) -> ( | ||
try Loc.get_as (fun offer -> Retry.unless (offer == None)) slot | ||
with exn -> | ||
Loc.compare_and_set slot offer None |> ignore; | ||
raise exn) | ||
|
||
let rec take ch = | ||
let tx ~xt = | ||
match Xt.take_opt ~xt ch with | ||
| None -> | ||
let slot = Loc.make `Accepting in | ||
Queue.Xt.add ~xt slot ch.takers; | ||
`Block slot | ||
| Some value -> `Offer value | ||
in | ||
match Kcas.Xt.commit { tx } with | ||
| `Offer value -> value | ||
| `Block slot -> ( | ||
let tx ~xt = | ||
match Kcas.Xt.exchange ~xt slot `Finished with | ||
| `Offer value -> Some value | ||
| `Finished -> None | ||
| `Accepting -> Retry.later () | ||
in | ||
match Kcas.Xt.commit { tx } with | ||
| None -> take ch | ||
| Some value -> value | ||
| exception exn -> | ||
Loc.compare_and_set slot `Accepting `Finished |> ignore; | ||
raise exn) | ||
|
||
let take_opt ch = Kcas.Xt.commit { tx = Xt.take_opt ch } |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,24 @@ | ||
open Kcas | ||
|
||
type 'a t | ||
(** Type of a synchronous channel. *) | ||
|
||
val create : unit -> 'a t | ||
(** [create ()] returns a new synchronous channel. *) | ||
|
||
module Xt : sig | ||
val try_give : xt:'x Xt.t -> 'a t -> 'a -> bool | ||
(** *) | ||
|
||
val take_opt : xt:'x Xt.t -> 'a t -> 'a option | ||
(** *) | ||
end | ||
|
||
val give : 'a t -> 'a -> unit | ||
(** *) | ||
|
||
val take : 'a t -> 'a | ||
(** *) | ||
|
||
val take_opt : 'a t -> 'a option | ||
(** *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,22 @@ | ||
type t = Waiters.t | ||
|
||
let create = Waiters.create | ||
let signal = Waiters.signal | ||
let broadcast = Waiters.broadcast | ||
|
||
let await cond mutex = | ||
let self = Waiters.enqueue cond in | ||
Mutex.unlock mutex; | ||
Fun.protect | ||
(fun () -> Waiters.await self ~on_cancel:signal ~the:cond) | ||
~finally:(fun () -> | ||
(* TODO: This should be protected also from cancellation *) | ||
Mutex.lock mutex) | ||
|
||
let await_no_mutex cond = | ||
Waiters.enqueue cond |> Waiters.await ~on_cancel:signal ~the:cond | ||
|
||
module Xt = struct | ||
let signal = Waiters.Xt.signal ~on_none:(fun ~xt:_ -> ()) | ||
let broadcast = Waiters.Xt.broadcast | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
open Kcas | ||
|
||
(** Condition variable. *) | ||
|
||
(** {1 Common interface} *) | ||
|
||
type t | ||
(** *) | ||
|
||
val create : unit -> t | ||
(** *) | ||
|
||
(** {1 Compositional interface} *) | ||
|
||
module Xt : | ||
Condition_intf.Ops | ||
with type t := t | ||
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn | ||
|
||
(** {1 Non-compositional interface} *) | ||
|
||
include Condition_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn | ||
|
||
val await : t -> Mutex.t -> unit | ||
(** *) | ||
|
||
val await_no_mutex : t -> unit | ||
(** *) | ||
|
||
val broadcast : t -> unit | ||
(** *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,10 @@ | ||
module type Ops = sig | ||
type t | ||
type ('x, 'fn) fn | ||
|
||
val signal : ('x, t -> unit) fn | ||
(** *) | ||
|
||
val broadcast : ('x, t -> unit) fn | ||
(** *) | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,96 @@ | ||
open Kcas | ||
|
||
type t = { | ||
state : [ `Unlocked | `Locked | `Poisoned of exn ] Loc.t; | ||
waiters : Waiters.t; | ||
} | ||
|
||
exception Poisoned of exn | ||
|
||
let poisoned exn = raise @@ Poisoned exn [@@inline never] | ||
|
||
let create () = | ||
let state = Loc.make `Unlocked and waiters = Waiters.create () in | ||
{ state; waiters } | ||
|
||
let unlock mutex = | ||
let tx ~xt = | ||
match Xt.get ~xt mutex.state with | ||
| `Locked -> | ||
Waiters.Xt.signal ~xt mutex.waiters | ||
~on_none:(Xt.set mutex.state `Unlocked); | ||
None | ||
| `Unlocked -> | ||
let exn = Sys_error "Mutex.unlock: already unlocked!" in | ||
Xt.set ~xt mutex.state @@ `Poisoned exn; | ||
Some exn | ||
| `Poisoned exn -> Some (Poisoned exn) | ||
in | ||
Xt.commit { tx } |> Option.iter raise | ||
|
||
let lock mutex = | ||
let tx ~xt = | ||
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with | ||
| `Unlocked -> None | ||
| `Locked -> Some (Waiters.Xt.enqueue ~xt mutex.waiters) | ||
| `Poisoned exn -> poisoned exn | ||
in | ||
Xt.commit { tx } |> Option.iter (Waiters.await ~on_cancel:unlock ~the:mutex) | ||
|
||
let try_lock mutex = | ||
match | ||
Loc.update mutex.state @@ function | ||
| `Unlocked -> `Locked | ||
| (`Locked | `Poisoned _) as other -> other | ||
with | ||
| `Unlocked -> true | ||
| `Locked -> false | ||
| `Poisoned exn -> poisoned exn | ||
|
||
let poison mutex exn = | ||
match | ||
Loc.update mutex.state @@ function | ||
| `Locked | `Unlocked -> `Poisoned exn | ||
| `Poisoned _ as poisoned -> poisoned | ||
with | ||
| `Locked | `Unlocked -> Waiters.broadcast mutex.waiters | ||
| `Poisoned _ -> () | ||
|
||
let use_rw mutex thunk = | ||
lock mutex; | ||
match thunk () with | ||
| value -> | ||
unlock mutex; | ||
value | ||
| exception exn -> | ||
poison mutex exn; | ||
raise exn | ||
|
||
let use_ro mutex thunk = | ||
lock mutex; | ||
Fun.protect ~finally:(fun () -> unlock mutex) thunk | ||
|
||
module Xt = struct | ||
let lock ~xt mutex = | ||
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with | ||
| `Unlocked -> () | ||
| `Locked -> Retry.later () | ||
| `Poisoned exn -> poisoned exn | ||
|
||
let try_lock ~xt mutex = | ||
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with | ||
| `Unlocked -> true | ||
| `Locked -> false | ||
| `Poisoned exn -> poisoned exn | ||
|
||
let unlock ~xt mutex = | ||
match Xt.get ~xt mutex.state with | ||
| `Locked -> | ||
Waiters.Xt.signal ~xt mutex.waiters | ||
~on_none:(Xt.set mutex.state `Unlocked) | ||
| `Unlocked -> | ||
let exn = Sys_error "Mutex.unlock: already unlocked!" in | ||
Xt.set ~xt mutex.state @@ `Poisoned exn; | ||
raise exn | ||
| `Poisoned exn -> poisoned exn | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,46 @@ | ||
open Kcas | ||
|
||
(** Mutual exclusion. *) | ||
|
||
(* TODO: Consider ordering of mutexes *) | ||
|
||
(** {1 Common interface} *) | ||
|
||
exception Poisoned of exn | ||
(** *) | ||
|
||
type t | ||
(** *) | ||
|
||
val create : unit -> t | ||
(** *) | ||
|
||
(** {1 Compositional interface} *) | ||
|
||
module Xt : sig | ||
val lock : xt:'x Xt.t -> t -> unit | ||
(** *) | ||
|
||
val try_lock : xt:'x Xt.t -> t -> bool | ||
(** *) | ||
|
||
val unlock : xt:'x Xt.t -> t -> unit | ||
(** *) | ||
end | ||
|
||
(** {1 Non-compositional interface} *) | ||
|
||
val lock : t -> unit | ||
(** *) | ||
|
||
val try_lock : t -> bool | ||
(** *) | ||
|
||
val unlock : t -> unit | ||
(** *) | ||
|
||
val use_rw : t -> (unit -> 'a) -> 'a | ||
(** *) | ||
|
||
val use_ro : t -> (unit -> 'a) -> 'a | ||
(** *) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,31 @@ | ||
open Kcas | ||
|
||
let non_neg_dec n = if 0 < n then n - 1 else n | ||
|
||
type t = { count : int Loc.t; waiters : Waiters.t } | ||
|
||
let make n = | ||
if n < 0 then invalid_arg "n < 0"; | ||
let count = Loc.make n and waiters = Waiters.create () in | ||
{ count; waiters } | ||
|
||
let get_value sem = Loc.get sem.count | ||
|
||
module Xt = struct | ||
let get_value ~xt sem = Xt.get ~xt sem.count | ||
|
||
let release ~xt sem = | ||
Waiters.Xt.signal ~xt sem.waiters ~on_none:(Xt.incr sem.count) | ||
|
||
let acquire ~xt sem = Retry.unless (0 < Xt.fetch_and_add ~xt sem.count (-1)) | ||
end | ||
|
||
let release sem = Kcas.Xt.commit { tx = Xt.release sem } | ||
|
||
let acquire sem = | ||
let tx ~xt = | ||
if 0 < Kcas.Xt.update ~xt sem.count non_neg_dec then None | ||
else Some (Waiters.Xt.enqueue ~xt sem.waiters) | ||
in | ||
Kcas.Xt.commit { tx } | ||
|> Option.iter (Waiters.await ~on_cancel:release ~the:sem) |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,23 @@ | ||
open Kcas | ||
|
||
(** *) | ||
|
||
(** {1 Common interface} *) | ||
|
||
type t | ||
(** *) | ||
|
||
val make : int -> t | ||
(** *) | ||
|
||
(** {1 Compositional interface} *) | ||
|
||
module Xt : | ||
Semaphore_intf.Ops | ||
with type t := t | ||
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn | ||
(** Explicit transaction log passing on semaphores. *) | ||
|
||
(** {1 Non-compositional interface} *) | ||
|
||
include Semaphore_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,13 @@ | ||
module type Ops = sig | ||
type t | ||
type ('x, 'fn) fn | ||
|
||
val release : ('x, t -> unit) fn | ||
(** *) | ||
|
||
val acquire : ('x, t -> unit) fn | ||
(** *) | ||
|
||
val get_value : ('x, t -> int) fn | ||
(** *) | ||
end |
Oops, something went wrong.