Skip to content

Commit

Permalink
Additional communication and synchronization primitives
Browse files Browse the repository at this point in the history
  • Loading branch information
polytypic committed May 14, 2023
1 parent c5a74e4 commit f613096
Show file tree
Hide file tree
Showing 17 changed files with 616 additions and 0 deletions.
75 changes: 75 additions & 0 deletions src/kcas_data/ch.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
open Kcas

(* TODO: The semantics are not quite right here *)

type 'a t = {
givers : 'a option Loc.t Queue.t;
takers : [ `Accepting | `Offer of 'a | `Finished ] Loc.t Queue.t;
}

let create () =
let givers = Queue.create () and takers = Queue.create () in
{ givers; takers }

module Xt = struct
let rec try_give ~xt ch value =
match Queue.Xt.take_opt ~xt ch.takers with
| None -> false
| Some slot -> (
match Xt.compare_and_swap ~xt slot `Accepting (`Offer value) with
| `Accepting -> true
| `Finished -> try_give ~xt ch value
| `Offer _ -> Retry.later ())

let rec take_opt ~xt ch =
match Queue.Xt.take_opt ~xt ch.givers with
| None -> None
| Some slot -> (
match Xt.exchange ~xt slot None with
| None -> take_opt ~xt ch
| Some _ as offer -> offer)
end

let give ch value =
let tx ~xt =
if Xt.try_give ~xt ch value then None
else
let offer = Some value in
let slot = Loc.make offer in
Queue.Xt.add ~xt slot ch.givers;
Some (slot, offer)
in
match Kcas.Xt.commit { tx } with
| None -> ()
| Some (slot, offer) -> (
try Loc.get_as (fun offer -> Retry.unless (offer == None)) slot
with exn ->
Loc.compare_and_set slot offer None |> ignore;
raise exn)

let rec take ch =
let tx ~xt =
match Xt.take_opt ~xt ch with
| None ->
let slot = Loc.make `Accepting in
Queue.Xt.add ~xt slot ch.takers;
`Block slot
| Some value -> `Offer value
in
match Kcas.Xt.commit { tx } with
| `Offer value -> value
| `Block slot -> (
let tx ~xt =
match Kcas.Xt.exchange ~xt slot `Finished with
| `Offer value -> Some value
| `Finished -> None
| `Accepting -> Retry.later ()
in
match Kcas.Xt.commit { tx } with
| None -> take ch
| Some value -> value
| exception exn ->
Loc.compare_and_set slot `Accepting `Finished |> ignore;
raise exn)

let take_opt ch = Kcas.Xt.commit { tx = Xt.take_opt ch }
24 changes: 24 additions & 0 deletions src/kcas_data/ch.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
open Kcas

type 'a t
(** Type of a synchronous channel. *)

val create : unit -> 'a t
(** [create ()] returns a new synchronous channel. *)

module Xt : sig
val try_give : xt:'x Xt.t -> 'a t -> 'a -> bool
(** *)

val take_opt : xt:'x Xt.t -> 'a t -> 'a option
(** *)
end

val give : 'a t -> 'a -> unit
(** *)

val take : 'a t -> 'a
(** *)

val take_opt : 'a t -> 'a option
(** *)
22 changes: 22 additions & 0 deletions src/kcas_data/condition.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
type t = Waiters.t

let create = Waiters.create
let signal = Waiters.signal
let broadcast = Waiters.broadcast

let await cond mutex =
let self = Waiters.enqueue cond in
Mutex.unlock mutex;
Fun.protect
(fun () -> Waiters.await self ~on_cancel:signal ~the:cond)
~finally:(fun () ->
(* TODO: This should be protected also from cancellation *)
Mutex.lock mutex)

let await_no_mutex cond =
Waiters.enqueue cond |> Waiters.await ~on_cancel:signal ~the:cond

module Xt = struct
let signal = Waiters.Xt.signal ~on_none:(fun ~xt:_ -> ())
let broadcast = Waiters.Xt.broadcast
end
31 changes: 31 additions & 0 deletions src/kcas_data/condition.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
open Kcas

(** Condition variable. *)

(** {1 Common interface} *)

type t
(** *)

val create : unit -> t
(** *)

(** {1 Compositional interface} *)

module Xt :
Condition_intf.Ops
with type t := t
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn

(** {1 Non-compositional interface} *)

include Condition_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn

val await : t -> Mutex.t -> unit
(** *)

val await_no_mutex : t -> unit
(** *)

val broadcast : t -> unit
(** *)
10 changes: 10 additions & 0 deletions src/kcas_data/condition_intf.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module type Ops = sig
type t
type ('x, 'fn) fn

val signal : ('x, t -> unit) fn
(** *)

val broadcast : ('x, t -> unit) fn
(** *)
end
6 changes: 6 additions & 0 deletions src/kcas_data/kcas_data.ml
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,13 @@ module Stack = Stack

(** {1 Communication and synchronization primitives} *)

module Ch = Ch
module Condition = Condition
module Mutex = Mutex
module Mvar = Mvar
module Promise = Promise
module Semaphore = Semaphore
module Stream = Stream

(** {1 Linked data structures} *)

Expand Down
96 changes: 96 additions & 0 deletions src/kcas_data/mutex.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
open Kcas

type t = {
state : [ `Unlocked | `Locked | `Poisoned of exn ] Loc.t;
waiters : Waiters.t;
}

exception Poisoned of exn

let poisoned exn = raise @@ Poisoned exn [@@inline never]

let create () =
let state = Loc.make `Unlocked and waiters = Waiters.create () in
{ state; waiters }

let unlock mutex =
let tx ~xt =
match Xt.get ~xt mutex.state with
| `Locked ->
Waiters.Xt.signal ~xt mutex.waiters
~on_none:(Xt.set mutex.state `Unlocked);
None
| `Unlocked ->
let exn = Sys_error "Mutex.unlock: already unlocked!" in
Xt.set ~xt mutex.state @@ `Poisoned exn;
Some exn
| `Poisoned exn -> Some (Poisoned exn)
in
Xt.commit { tx } |> Option.iter raise

let lock mutex =
let tx ~xt =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> None
| `Locked -> Some (Waiters.Xt.enqueue ~xt mutex.waiters)
| `Poisoned exn -> poisoned exn
in
Xt.commit { tx } |> Option.iter (Waiters.await ~on_cancel:unlock ~the:mutex)

let try_lock mutex =
match
Loc.update mutex.state @@ function
| `Unlocked -> `Locked
| (`Locked | `Poisoned _) as other -> other
with
| `Unlocked -> true
| `Locked -> false
| `Poisoned exn -> poisoned exn

let poison mutex exn =
match
Loc.update mutex.state @@ function
| `Locked | `Unlocked -> `Poisoned exn
| `Poisoned _ as poisoned -> poisoned
with
| `Locked | `Unlocked -> Waiters.broadcast mutex.waiters
| `Poisoned _ -> ()

let use_rw mutex thunk =
lock mutex;
match thunk () with
| value ->
unlock mutex;
value
| exception exn ->
poison mutex exn;
raise exn

let use_ro mutex thunk =
lock mutex;
Fun.protect ~finally:(fun () -> unlock mutex) thunk

module Xt = struct
let lock ~xt mutex =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> ()
| `Locked -> Retry.later ()
| `Poisoned exn -> poisoned exn

let try_lock ~xt mutex =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> true
| `Locked -> false
| `Poisoned exn -> poisoned exn

let unlock ~xt mutex =
match Xt.get ~xt mutex.state with
| `Locked ->
Waiters.Xt.signal ~xt mutex.waiters
~on_none:(Xt.set mutex.state `Unlocked)
| `Unlocked ->
let exn = Sys_error "Mutex.unlock: already unlocked!" in
Xt.set ~xt mutex.state @@ `Poisoned exn;
raise exn
| `Poisoned exn -> poisoned exn
end
46 changes: 46 additions & 0 deletions src/kcas_data/mutex.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
open Kcas

(** Mutual exclusion. *)

(* TODO: Consider ordering of mutexes *)

(** {1 Common interface} *)

exception Poisoned of exn
(** *)

type t
(** *)

val create : unit -> t
(** *)

(** {1 Compositional interface} *)

module Xt : sig
val lock : xt:'x Xt.t -> t -> unit
(** *)

val try_lock : xt:'x Xt.t -> t -> bool
(** *)

val unlock : xt:'x Xt.t -> t -> unit
(** *)
end

(** {1 Non-compositional interface} *)

val lock : t -> unit
(** *)

val try_lock : t -> bool
(** *)

val unlock : t -> unit
(** *)

val use_rw : t -> (unit -> 'a) -> 'a
(** *)

val use_ro : t -> (unit -> 'a) -> 'a
(** *)
34 changes: 34 additions & 0 deletions src/kcas_data/mvar.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
open Kcas

type 'a t = 'a Magic_option.t Loc.t

let create () = Loc.make Magic_option.none

module Xt = struct
let is_empty ~xt mv = Magic_option.is_none (Xt.get ~xt mv)

let put_some ~xt mv some =
Retry.unless (Magic_option.is_none (Xt.exchange ~xt mv some))

let put ~xt mv value = put_some ~xt mv (Magic_option.some value)

let take_opt ~xt mv =
Magic_option.to_option (Xt.exchange ~xt mv Magic_option.none)

let take ~xt mv =
Magic_option.get_or_retry (Xt.exchange ~xt mv Magic_option.none)
end

let is_empty mv = Magic_option.is_none (Loc.get mv)

let put mv value =
Loc.modify mv @@ fun current ->
if Magic_option.is_none current then Magic_option.some value
else Retry.later ()

let take mv =
Magic_option.get_unsafe @@ Loc.update mv
@@ fun current ->
if Magic_option.is_none current then Retry.later () else Magic_option.none

let take_opt mv = Magic_option.to_option (Loc.exchange mv Magic_option.none)
24 changes: 24 additions & 0 deletions src/kcas_data/mvar.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
open Kcas

(** Synchronizing variable. *)

(** {1 Common interface} *)

type !'a t
(** The type of a synchronizing variable that may contain a value of type
['a]. *)

val create : unit -> 'a t
(** [create ()] returns a new empty synchronizing variable. *)

(** {1 Compositional interface} *)

module Xt :
Mvar_intf.Ops
with type 'a t := 'a t
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn
(** Explicit transaction passing on synchronizing variables. *)

(** {1 Non-compositional interface} *)

include Mvar_intf.Ops with type 'a t := 'a t with type ('x, 'fn) fn := 'fn
Loading

0 comments on commit f613096

Please sign in to comment.