From 3c43e3dd2fdbbccf77b8c491e0b707fdd24ddb93 Mon Sep 17 00:00:00 2001 From: Vesa Karvonen Date: Wed, 12 Apr 2023 00:03:08 +0300 Subject: [PATCH] Additional communication and synchronization primitives --- src/kcas_data/ch.ml | 75 ++++++++++++++++++++++++++ src/kcas_data/ch.mli | 24 +++++++++ src/kcas_data/condition.ml | 22 ++++++++ src/kcas_data/condition.mli | 31 +++++++++++ src/kcas_data/condition_intf.ml | 10 ++++ src/kcas_data/kcas_data.ml | 5 ++ src/kcas_data/mutex.ml | 96 +++++++++++++++++++++++++++++++++ src/kcas_data/mutex.mli | 46 ++++++++++++++++ src/kcas_data/semaphore.ml | 31 +++++++++++ src/kcas_data/semaphore.mli | 23 ++++++++ src/kcas_data/semaphore_intf.ml | 13 +++++ src/kcas_data/stream.ml | 82 ++++++++++++++++++++++++++++ src/kcas_data/stream.mli | 25 +++++++++ src/kcas_data/waiters.ml | 54 +++++++++++++++++++ 14 files changed, 537 insertions(+) create mode 100644 src/kcas_data/ch.ml create mode 100644 src/kcas_data/ch.mli create mode 100644 src/kcas_data/condition.ml create mode 100644 src/kcas_data/condition.mli create mode 100644 src/kcas_data/condition_intf.ml create mode 100644 src/kcas_data/mutex.ml create mode 100644 src/kcas_data/mutex.mli create mode 100644 src/kcas_data/semaphore.ml create mode 100644 src/kcas_data/semaphore.mli create mode 100644 src/kcas_data/semaphore_intf.ml create mode 100644 src/kcas_data/stream.ml create mode 100644 src/kcas_data/stream.mli create mode 100644 src/kcas_data/waiters.ml diff --git a/src/kcas_data/ch.ml b/src/kcas_data/ch.ml new file mode 100644 index 00000000..4fb17cc1 --- /dev/null +++ b/src/kcas_data/ch.ml @@ -0,0 +1,75 @@ +open Kcas + +(* TODO: The semantics are not quite right here *) + +type 'a t = { + givers : 'a option Loc.t Queue.t; + takers : [ `Accepting | `Offer of 'a | `Finished ] Loc.t Queue.t; +} + +let create () = + let givers = Queue.create () and takers = Queue.create () in + { givers; takers } + +module Xt = struct + let rec try_give ~xt ch value = + match Queue.Xt.take_opt ~xt ch.takers with + | None -> false + | Some slot -> ( + match Xt.compare_and_swap ~xt slot `Accepting (`Offer value) with + | `Accepting -> true + | `Finished -> try_give ~xt ch value + | `Offer _ -> Retry.later ()) + + let rec take_opt ~xt ch = + match Queue.Xt.take_opt ~xt ch.givers with + | None -> None + | Some slot -> ( + match Xt.exchange ~xt slot None with + | None -> take_opt ~xt ch + | Some _ as offer -> offer) +end + +let give ch value = + let tx ~xt = + if Xt.try_give ~xt ch value then None + else + let offer = Some value in + let slot = Loc.make offer in + Queue.Xt.add ~xt slot ch.givers; + Some (slot, offer) + in + match Kcas.Xt.commit { tx } with + | None -> () + | Some (slot, offer) -> ( + try Loc.get_as (fun offer -> Retry.unless (offer == None)) slot + with exn -> + Loc.compare_and_set slot offer None |> ignore; + raise exn) + +let rec take ch = + let tx ~xt = + match Xt.take_opt ~xt ch with + | None -> + let slot = Loc.make `Accepting in + Queue.Xt.add ~xt slot ch.takers; + `Block slot + | Some value -> `Offer value + in + match Kcas.Xt.commit { tx } with + | `Offer value -> value + | `Block slot -> ( + let tx ~xt = + match Kcas.Xt.exchange ~xt slot `Finished with + | `Offer value -> Some value + | `Finished -> None + | `Accepting -> Retry.later () + in + match Kcas.Xt.commit { tx } with + | None -> take ch + | Some value -> value + | exception exn -> + Loc.compare_and_set slot `Accepting `Finished |> ignore; + raise exn) + +let take_opt ch = Kcas.Xt.commit { tx = Xt.take_opt ch } diff --git a/src/kcas_data/ch.mli b/src/kcas_data/ch.mli new file mode 100644 index 00000000..6abd1d21 --- /dev/null +++ b/src/kcas_data/ch.mli @@ -0,0 +1,24 @@ +open Kcas + +type 'a t +(** Type of a synchronous channel. *) + +val create : unit -> 'a t +(** [create ()] returns a new synchronous channel. *) + +module Xt : sig + val try_give : xt:'x Xt.t -> 'a t -> 'a -> bool + (** *) + + val take_opt : xt:'x Xt.t -> 'a t -> 'a option + (** *) +end + +val give : 'a t -> 'a -> unit +(** *) + +val take : 'a t -> 'a +(** *) + +val take_opt : 'a t -> 'a option +(** *) diff --git a/src/kcas_data/condition.ml b/src/kcas_data/condition.ml new file mode 100644 index 00000000..a99f74c6 --- /dev/null +++ b/src/kcas_data/condition.ml @@ -0,0 +1,22 @@ +type t = Waiters.t + +let create = Waiters.create +let signal = Waiters.signal +let broadcast = Waiters.broadcast + +let await cond mutex = + let self = Waiters.enqueue cond in + Mutex.unlock mutex; + Fun.protect + (fun () -> Waiters.await self ~on_cancel:signal ~the:cond) + ~finally:(fun () -> + (* TODO: This should be protected also from cancellation *) + Mutex.lock mutex) + +let await_no_mutex cond = + Waiters.enqueue cond |> Waiters.await ~on_cancel:signal ~the:cond + +module Xt = struct + let signal = Waiters.Xt.signal ~on_none:(fun ~xt:_ -> ()) + let broadcast = Waiters.Xt.broadcast +end diff --git a/src/kcas_data/condition.mli b/src/kcas_data/condition.mli new file mode 100644 index 00000000..aed421a1 --- /dev/null +++ b/src/kcas_data/condition.mli @@ -0,0 +1,31 @@ +open Kcas + +(** Condition variable. *) + +(** {1 Common interface} *) + +type t +(** *) + +val create : unit -> t +(** *) + +(** {1 Compositional interface} *) + +module Xt : + Condition_intf.Ops + with type t := t + with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn + +(** {1 Non-compositional interface} *) + +include Condition_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn + +val await : t -> Mutex.t -> unit +(** *) + +val await_no_mutex : t -> unit +(** *) + +val broadcast : t -> unit +(** *) diff --git a/src/kcas_data/condition_intf.ml b/src/kcas_data/condition_intf.ml new file mode 100644 index 00000000..80985390 --- /dev/null +++ b/src/kcas_data/condition_intf.ml @@ -0,0 +1,10 @@ +module type Ops = sig + type t + type ('x, 'fn) fn + + val signal : ('x, t -> unit) fn + (** *) + + val broadcast : ('x, t -> unit) fn + (** *) +end diff --git a/src/kcas_data/kcas_data.ml b/src/kcas_data/kcas_data.ml index 4176883e..1a8f0d9f 100644 --- a/src/kcas_data/kcas_data.ml +++ b/src/kcas_data/kcas_data.ml @@ -3,5 +3,10 @@ module Queue = Queue module Stack = Stack module Mvar = Mvar module Promise = Promise +module Ch = Ch +module Condition = Condition +module Mutex = Mutex +module Semaphore = Semaphore +module Stream = Stream module Dllist = Dllist module Accumulator = Accumulator diff --git a/src/kcas_data/mutex.ml b/src/kcas_data/mutex.ml new file mode 100644 index 00000000..956fa822 --- /dev/null +++ b/src/kcas_data/mutex.ml @@ -0,0 +1,96 @@ +open Kcas + +type t = { + state : [ `Unlocked | `Locked | `Poisoned of exn ] Loc.t; + waiters : Waiters.t; +} + +exception Poisoned of exn + +let poisoned exn = raise @@ Poisoned exn [@@inline never] + +let create () = + let state = Loc.make `Unlocked and waiters = Waiters.create () in + { state; waiters } + +let unlock mutex = + let tx ~xt = + match Xt.get ~xt mutex.state with + | `Locked -> + Waiters.Xt.signal ~xt mutex.waiters + ~on_none:(Xt.set mutex.state `Unlocked); + None + | `Unlocked -> + let exn = Sys_error "Mutex.unlock: already unlocked!" in + Xt.set ~xt mutex.state @@ `Poisoned exn; + Some exn + | `Poisoned exn -> Some (Poisoned exn) + in + Xt.commit { tx } |> Option.iter raise + +let lock mutex = + let tx ~xt = + match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with + | `Unlocked -> None + | `Locked -> Some (Waiters.Xt.enqueue ~xt mutex.waiters) + | `Poisoned exn -> poisoned exn + in + Xt.commit { tx } |> Option.iter (Waiters.await ~on_cancel:unlock ~the:mutex) + +let try_lock mutex = + match + Loc.update mutex.state @@ function + | `Unlocked -> `Locked + | (`Locked | `Poisoned _) as other -> other + with + | `Unlocked -> true + | `Locked -> false + | `Poisoned exn -> poisoned exn + +let poison mutex exn = + match + Loc.update mutex.state @@ function + | `Locked | `Unlocked -> `Poisoned exn + | `Poisoned _ as poisoned -> poisoned + with + | `Locked | `Unlocked -> Waiters.broadcast mutex.waiters + | `Poisoned _ -> () + +let use_rw mutex thunk = + lock mutex; + match thunk () with + | value -> + unlock mutex; + value + | exception exn -> + poison mutex exn; + raise exn + +let use_ro mutex thunk = + lock mutex; + Fun.protect ~finally:(fun () -> unlock mutex) thunk + +module Xt = struct + let lock ~xt mutex = + match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with + | `Unlocked -> () + | `Locked -> Retry.later () + | `Poisoned exn -> poisoned exn + + let try_lock ~xt mutex = + match Xt.compare_and_swap ~xt mutex.state `Unlocked `Locked with + | `Unlocked -> true + | `Locked -> false + | `Poisoned exn -> poisoned exn + + let unlock ~xt mutex = + match Xt.get ~xt mutex.state with + | `Locked -> + Waiters.Xt.signal ~xt mutex.waiters + ~on_none:(Xt.set mutex.state `Unlocked) + | `Unlocked -> + let exn = Sys_error "Mutex.unlock: already unlocked!" in + Xt.set ~xt mutex.state @@ `Poisoned exn; + raise exn + | `Poisoned exn -> poisoned exn +end diff --git a/src/kcas_data/mutex.mli b/src/kcas_data/mutex.mli new file mode 100644 index 00000000..a426bc8c --- /dev/null +++ b/src/kcas_data/mutex.mli @@ -0,0 +1,46 @@ +open Kcas + +(** Mutual exclusion. *) + +(* TODO: Consider ordering of mutexes *) + +(** {1 Common interface} *) + +exception Poisoned of exn +(** *) + +type t +(** *) + +val create : unit -> t +(** *) + +(** {1 Compositional interface} *) + +module Xt : sig + val lock : xt:'x Xt.t -> t -> unit + (** *) + + val try_lock : xt:'x Xt.t -> t -> bool + (** *) + + val unlock : xt:'x Xt.t -> t -> unit + (** *) +end + +(** {1 Non-compositional interface} *) + +val lock : t -> unit +(** *) + +val try_lock : t -> bool +(** *) + +val unlock : t -> unit +(** *) + +val use_rw : t -> (unit -> 'a) -> 'a +(** *) + +val use_ro : t -> (unit -> 'a) -> 'a +(** *) diff --git a/src/kcas_data/semaphore.ml b/src/kcas_data/semaphore.ml new file mode 100644 index 00000000..c62a97a1 --- /dev/null +++ b/src/kcas_data/semaphore.ml @@ -0,0 +1,31 @@ +open Kcas + +let non_neg_dec n = if 0 < n then n - 1 else n + +type t = { count : int Loc.t; waiters : Waiters.t } + +let make n = + if n < 0 then invalid_arg "n < 0"; + let count = Loc.make n and waiters = Waiters.create () in + { count; waiters } + +let get_value sem = Loc.get sem.count + +module Xt = struct + let get_value ~xt sem = Xt.get ~xt sem.count + + let release ~xt sem = + Waiters.Xt.signal ~xt sem.waiters ~on_none:(Xt.incr sem.count) + + let acquire ~xt sem = Retry.unless (0 < Xt.fetch_and_add ~xt sem.count (-1)) +end + +let release sem = Kcas.Xt.commit { tx = Xt.release sem } + +let acquire sem = + let tx ~xt = + if 0 < Kcas.Xt.update ~xt sem.count non_neg_dec then None + else Some (Waiters.Xt.enqueue ~xt sem.waiters) + in + Kcas.Xt.commit { tx } + |> Option.iter (Waiters.await ~on_cancel:release ~the:sem) diff --git a/src/kcas_data/semaphore.mli b/src/kcas_data/semaphore.mli new file mode 100644 index 00000000..04632ba4 --- /dev/null +++ b/src/kcas_data/semaphore.mli @@ -0,0 +1,23 @@ +open Kcas + +(** *) + +(** {1 Common interface} *) + +type t +(** *) + +val make : int -> t +(** *) + +(** {1 Compositional interface} *) + +module Xt : + Semaphore_intf.Ops + with type t := t + with type ('x, 'fn) fn := xt:'x Xt.t -> 'fn +(** Explicit transaction log passing on semaphores. *) + +(** {1 Non-compositional interface} *) + +include Semaphore_intf.Ops with type t := t with type ('x, 'fn) fn := 'fn diff --git a/src/kcas_data/semaphore_intf.ml b/src/kcas_data/semaphore_intf.ml new file mode 100644 index 00000000..8d97c1f7 --- /dev/null +++ b/src/kcas_data/semaphore_intf.ml @@ -0,0 +1,13 @@ +module type Ops = sig + type t + type ('x, 'fn) fn + + val release : ('x, t -> unit) fn + (** *) + + val acquire : ('x, t -> unit) fn + (** *) + + val get_value : ('x, t -> int) fn + (** *) +end diff --git a/src/kcas_data/stream.ml b/src/kcas_data/stream.ml new file mode 100644 index 00000000..6e786610 --- /dev/null +++ b/src/kcas_data/stream.ml @@ -0,0 +1,82 @@ +open Kcas + +module Bounded = struct + type 'a t = { capacity : int; queue : 'a Queue.t } + + let create capacity = { capacity; queue = Queue.create () } + let capacity_of bounded = bounded.capacity + let is_empty bounded = Queue.is_empty bounded.queue + let length bounded = Queue.length bounded.queue + + let add bounded value = + let tx ~xt = + Retry.unless (Queue.Xt.length ~xt bounded.queue < bounded.capacity); + Queue.Xt.add ~xt value bounded.queue + in + Xt.commit { tx } + + let take bounded = Queue.take_blocking bounded.queue + let take_nonblocking bounded = Queue.take_opt bounded.queue +end + +module Unbounded = struct + type 'a t = 'a Queue.t + + let create () = Queue.create () + let capacity_of _ = Int.max_int + let is_empty = Queue.is_empty + let length = Queue.length + let add stream value = Queue.add value stream + let take = Queue.take_blocking + let take_nonblocking = Queue.take_opt +end + +type 'a t = + | Ch of 'a Ch.t + | Mvar of 'a Mvar.t + | Bounded of 'a Bounded.t + | Unbounded of 'a Unbounded.t + +let create capacity = + if capacity <= 1 then + if capacity < 0 then invalid_arg "capacity < 0" + else if capacity = 0 then Ch (Ch.create ()) + else Mvar (Mvar.create None) + else if capacity = Int.max_int then Unbounded (Unbounded.create ()) + else Bounded (Bounded.create capacity) + +let capacity_of = function + | Ch _ -> 0 + | Mvar _ -> 1 + | Bounded bounded -> Bounded.capacity_of bounded + | Unbounded unbounded -> Unbounded.capacity_of unbounded + +let is_empty = function + | Ch _ -> true + | Mvar mvar -> Mvar.is_empty mvar + | Bounded bounded -> Bounded.is_empty bounded + | Unbounded unbounded -> Unbounded.is_empty unbounded + +let length = function + | Ch _ -> 0 + | Mvar mvar -> Bool.to_int (not (Mvar.is_empty mvar)) + | Bounded bounded -> Bounded.length bounded + | Unbounded unbounded -> Unbounded.length unbounded + +let add = function + | Ch ch -> Ch.give ch + | Mvar mvar -> Mvar.put mvar + | Bounded bounded -> Bounded.add bounded + | Unbounded unbounded -> Unbounded.add unbounded + +let take = function + | Ch ch -> Ch.take ch + | Mvar mvar -> Mvar.take mvar + | Bounded bounded -> Bounded.take bounded + | Unbounded unbounded -> Unbounded.take unbounded + +let take_nonblocking = function + | Ch ch -> Ch.take_opt ch + | Mvar mvar -> Mvar.take_opt mvar + | Bounded bounded -> Bounded.take_nonblocking bounded + | Unbounded unbounded -> Unbounded.take_nonblocking unbounded diff --git a/src/kcas_data/stream.mli b/src/kcas_data/stream.mli new file mode 100644 index 00000000..b0c1eebb --- /dev/null +++ b/src/kcas_data/stream.mli @@ -0,0 +1,25 @@ +(** *) + +type !'a t +(** *) + +val create : int -> 'a t +(** *) + +val capacity_of : 'a t -> int +(** *) + +val is_empty : 'a t -> bool +(** *) + +val length : 'a t -> int +(** *) + +val add : 'a t -> 'a -> unit +(** *) + +val take : 'a t -> 'a +(** *) + +val take_nonblocking : 'a t -> 'a option +(** *) diff --git a/src/kcas_data/waiters.ml b/src/kcas_data/waiters.ml new file mode 100644 index 00000000..2178de03 --- /dev/null +++ b/src/kcas_data/waiters.ml @@ -0,0 +1,54 @@ +open Kcas + +type t = [ `Awaiting | `Signaled ] Loc.t Queue.t + +let create () = Queue.create () + +let await ~on_cancel ~the self = + try Loc.get_as (fun s -> Retry.unless (s == `Signaled)) self + with exn -> + (* We have been canceled, so try to mark as signaled or release another. + The node is left in the queue and repeated cancellations could cause a + space leak. Another alternative would be to use a doubly-linked list. *) + if not (Loc.compare_and_set self `Awaiting `Signaled) then on_cancel the; + raise exn + +let rec signal waiters = + Queue.take_opt waiters + |> Option.iter @@ fun awaiter -> + if not (Loc.compare_and_set awaiter `Awaiting `Signaled) then + signal waiters + +let broadcast waiters = + Queue.take_all waiters + |> Seq.iter @@ fun awaiter -> + Loc.compare_and_set awaiter `Awaiting `Signaled |> ignore + +let enqueue waiters = + let self = Loc.make `Awaiting in + Queue.add self waiters; + self + +module Xt = struct + let enqueue ~xt waiters = + let self = Loc.make `Awaiting in + Queue.Xt.add ~xt self waiters; + self + + let rec signal ~xt ~on_none waiters = + match Queue.Xt.take_opt ~xt waiters with + | None -> on_none ~xt + | Some awaiter -> + if Xt.get ~xt awaiter != `Signaled then + Xt.post_commit ~xt @@ fun () -> Loc.set awaiter `Signaled + else + (* Apparently the awaiter was canceled, so signal another. *) + signal ~xt ~on_none waiters + + let broadcast ~xt t = + let awaiters = Queue.Xt.take_all ~xt t in + Xt.post_commit ~xt @@ fun () -> + awaiters + |> Seq.iter @@ fun awaiter -> + Loc.compare_and_set awaiter `Awaiting `Signaled |> ignore +end