Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Additional communication and synchronization primitives #49

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
75 changes: 75 additions & 0 deletions src/kcas_data/ch.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
open Kcas

(* TODO: The semantics are not quite right here *)

type 'a t = {
givers : 'a option Loc.t Queue.t;
takers : [ `Accepting | `Offer of 'a | `Finished ] Loc.t Queue.t;
}

let create () =
let givers = Queue.create () and takers = Queue.create () in
{ givers; takers }

module Xt = struct
let rec try_give ~xt ch value =
match Queue.Xt.take_opt ~xt ch.takers with
| None -> false
| Some slot -> (
match Xt.compare_and_swap ~xt slot `Accepting (`Offer value) with
| `Accepting -> true
| `Finished -> try_give ~xt ch value
| `Offer _ -> Retry.later ())

let rec take_opt ~xt ch =
match Queue.Xt.take_opt ~xt ch.givers with
| None -> None
| Some slot -> (
match Xt.exchange ~xt slot None with
| None -> take_opt ~xt ch
| Some _ as offer -> offer)
end

let give ch value =
let tx ~xt =
if Xt.try_give ~xt ch value then None
else
let offer = Some value in
let slot = Loc.make offer in
Queue.Xt.add ~xt slot ch.givers;
Some (slot, offer)
in
match Kcas.Xt.commit { tx } with
| None -> ()
| Some (slot, offer) -> (
try Loc.get_as (fun offer -> Retry.unless (offer == None)) slot
with exn ->
Loc.compare_and_set slot offer None |> ignore;
raise exn)

let rec take ch =
let tx ~xt =
match Xt.take_opt ~xt ch with
| None ->
let slot = Loc.make `Accepting in
Queue.Xt.add ~xt slot ch.takers;
`Block slot
| Some value -> `Offer value
in
match Kcas.Xt.commit { tx } with
| `Offer value -> value
| `Block slot -> (
let tx ~xt =
match Kcas.Xt.exchange ~xt slot `Finished with
| `Offer value -> Some value
| `Finished -> None
| `Accepting -> Retry.later ()
in
match Kcas.Xt.commit { tx } with
| None -> take ch
| Some value -> value
| exception exn ->
Loc.compare_and_set slot `Accepting `Finished |> ignore;
raise exn)

let take_opt ch = Kcas.Xt.commit { tx = Xt.take_opt ch }
24 changes: 24 additions & 0 deletions src/kcas_data/ch.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
open Kcas

type 'a t
(** Type of a synchronous channel. *)

val create : unit -> 'a t
(** [create ()] returns a new synchronous channel. *)

module Xt : sig
val try_give : xt:'x Xt.t -> 'a t -> 'a -> bool
(** *)

val take_opt : xt:'x Xt.t -> 'a t -> 'a option
(** *)
end

val give : 'a t -> 'a -> unit
(** *)

val take : 'a t -> 'a
(** *)

val take_opt : 'a t -> 'a option
(** *)
22 changes: 22 additions & 0 deletions src/kcas_data/condition.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
type t = Waiters.t

let create = Waiters.create
let signal = Waiters.signal
let broadcast = Waiters.broadcast

let await cond mutex =
let self = Waiters.enqueue cond in
Mutex.unlock mutex;
Fun.protect
(fun () -> Waiters.await self ~on_cancel:signal ~the:cond)
~finally:(fun () ->
(* TODO: This should be protected also from cancellation *)
Mutex.lock mutex)

let await_no_mutex cond =
Waiters.enqueue cond |> Waiters.await ~on_cancel:signal ~the:cond

module Xt = struct
let signal = Waiters.Xt.signal ~on_none:(fun ~xt:_ -> ())
let broadcast = Waiters.Xt.broadcast
end
31 changes: 31 additions & 0 deletions src/kcas_data/condition.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
open Kcas

(** Condition variable. *)

(** {1 Common interface} *)

type t
(** *)

val create : unit -> t
(** *)

(** {1 Compositional interface} *)

module Xt :
Condition_intf.Ops
with type t := t
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn

(** {1 Non-compositional interface} *)

include Condition_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn

val await : t -> Mutex.t -> unit
(** *)

val await_no_mutex : t -> unit
(** *)

val broadcast : t -> unit
(** *)
10 changes: 10 additions & 0 deletions src/kcas_data/condition_intf.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,10 @@
module type Ops = sig
type t
type ('x, 'fn) fn

val signal : ('x, t -> unit) fn
(** *)

val broadcast : ('x, t -> unit) fn
(** *)
end
5 changes: 5 additions & 0 deletions src/kcas_data/kcas_data.ml
Original file line number Diff line number Diff line change
Expand Up @@ -3,5 +3,10 @@ module Queue = Queue
module Stack = Stack
module Mvar = Mvar
module Promise = Promise
module Ch = Ch
module Condition = Condition
module Mutex = Mutex
module Semaphore = Semaphore
module Stream = Stream
module Dllist = Dllist
module Accumulator = Accumulator
96 changes: 96 additions & 0 deletions src/kcas_data/mutex.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,96 @@
open Kcas

type t = {
state : [ `Unlocked | `Locked | `Poisoned of exn ] Loc.t;
waiters : Waiters.t;
}

exception Poisoned of exn

let poisoned exn = raise @@ Poisoned exn [@@inline never]

let create () =
let state = Loc.make `Unlocked and waiters = Waiters.create () in
{ state; waiters }

let unlock mutex =
let tx ~xt =
match Xt.get ~xt mutex.state with
| `Locked ->
Waiters.Xt.signal ~xt mutex.waiters
~on_none:(Xt.set mutex.state `Unlocked);
None
| `Unlocked ->
let exn = Sys_error "Mutex.unlock: already unlocked!" in
Xt.set ~xt mutex.state @@ `Poisoned exn;
Some exn
| `Poisoned exn -> Some (Poisoned exn)
in
Xt.commit { tx } |> Option.iter raise

let lock mutex =
let tx ~xt =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> None
| `Locked -> Some (Waiters.Xt.enqueue ~xt mutex.waiters)
| `Poisoned exn -> poisoned exn
in
Xt.commit { tx } |> Option.iter (Waiters.await ~on_cancel:unlock ~the:mutex)

let try_lock mutex =
match
Loc.update mutex.state @@ function
| `Unlocked -> `Locked
| (`Locked | `Poisoned _) as other -> other
with
| `Unlocked -> true
| `Locked -> false
| `Poisoned exn -> poisoned exn

let poison mutex exn =
match
Loc.update mutex.state @@ function
| `Locked | `Unlocked -> `Poisoned exn
| `Poisoned _ as poisoned -> poisoned
with
| `Locked | `Unlocked -> Waiters.broadcast mutex.waiters
| `Poisoned _ -> ()

let use_rw mutex thunk =
lock mutex;
match thunk () with
| value ->
unlock mutex;
value
| exception exn ->
poison mutex exn;
raise exn

let use_ro mutex thunk =
lock mutex;
Fun.protect ~finally:(fun () -> unlock mutex) thunk

module Xt = struct
let lock ~xt mutex =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> ()
| `Locked -> Retry.later ()
| `Poisoned exn -> poisoned exn

let try_lock ~xt mutex =
match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with
| `Unlocked -> true
| `Locked -> false
| `Poisoned exn -> poisoned exn

let unlock ~xt mutex =
match Xt.get ~xt mutex.state with
| `Locked ->
Waiters.Xt.signal ~xt mutex.waiters
~on_none:(Xt.set mutex.state `Unlocked)
| `Unlocked ->
let exn = Sys_error "Mutex.unlock: already unlocked!" in
Xt.set ~xt mutex.state @@ `Poisoned exn;
raise exn
| `Poisoned exn -> poisoned exn
end
46 changes: 46 additions & 0 deletions src/kcas_data/mutex.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,46 @@
open Kcas

(** Mutual exclusion. *)

(* TODO: Consider ordering of mutexes *)

(** {1 Common interface} *)

exception Poisoned of exn
(** *)

type t
(** *)

val create : unit -> t
(** *)

(** {1 Compositional interface} *)

module Xt : sig
val lock : xt:'x Xt.t -> t -> unit
(** *)

val try_lock : xt:'x Xt.t -> t -> bool
(** *)

val unlock : xt:'x Xt.t -> t -> unit
(** *)
end

(** {1 Non-compositional interface} *)

val lock : t -> unit
(** *)

val try_lock : t -> bool
(** *)

val unlock : t -> unit
(** *)

val use_rw : t -> (unit -> 'a) -> 'a
(** *)

val use_ro : t -> (unit -> 'a) -> 'a
(** *)
31 changes: 31 additions & 0 deletions src/kcas_data/semaphore.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
open Kcas

let non_neg_dec n = if 0 < n then n - 1 else n

type t = { count : int Loc.t; waiters : Waiters.t }

let make n =
if n < 0 then invalid_arg "n < 0";
let count = Loc.make n and waiters = Waiters.create () in
{ count; waiters }

let get_value sem = Loc.get sem.count

module Xt = struct
let get_value ~xt sem = Xt.get ~xt sem.count

let release ~xt sem =
Waiters.Xt.signal ~xt sem.waiters ~on_none:(Xt.incr sem.count)

let acquire ~xt sem = Retry.unless (0 < Xt.fetch_and_add ~xt sem.count (-1))
end

let release sem = Kcas.Xt.commit { tx = Xt.release sem }

let acquire sem =
let tx ~xt =
if 0 < Kcas.Xt.update ~xt sem.count non_neg_dec then None
else Some (Waiters.Xt.enqueue ~xt sem.waiters)
in
Kcas.Xt.commit { tx }
|> Option.iter (Waiters.await ~on_cancel:release ~the:sem)
23 changes: 23 additions & 0 deletions src/kcas_data/semaphore.mli
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
open Kcas

(** *)

(** {1 Common interface} *)

type t
(** *)

val make : int -> t
(** *)

(** {1 Compositional interface} *)

module Xt :
Semaphore_intf.Ops
with type t := t
with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn
(** Explicit transaction log passing on semaphores. *)

(** {1 Non-compositional interface} *)

include Semaphore_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn
13 changes: 13 additions & 0 deletions src/kcas_data/semaphore_intf.ml
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
module type Ops = sig
type t
type ('x, 'fn) fn

val release : ('x, t -> unit) fn
(** *)

val acquire : ('x, t -> unit) fn
(** *)

val get_value : ('x, t -> int) fn
(** *)
end
Loading
Loading