From 2b87c6bf7a9491ad335e6fdf0f83a03fd965b390 Mon Sep 17 00:00:00 2001 From: Anders Fugmann Date: Mon, 30 Jan 2023 23:20:40 +0100 Subject: [PATCH] Fix stream implementation to ensure that all pending publishers get to publish when stream is closed --- lib/channel.ml | 29 ----- lib/utils.ml | 281 ++++++++++++++++++++++++++--------------------- stress/stress.ml | 2 +- 3 files changed, 157 insertions(+), 155 deletions(-) diff --git a/lib/channel.ml b/lib/channel.ml index 0a9a8c5..f4e84ec 100644 --- a/lib/channel.ml +++ b/lib/channel.ml @@ -218,35 +218,6 @@ let handle_consumer_queue ~service ~flow_queue = in loop true Consumer_set.empty - - -(* This should monitor all consumer queues, and post a message for - state changes every time the queue changes state. - - - Each time a consumer is created we start a fiber to flip between queue full and queue empty. - We send a message (empty|full, consumer_name) to a fiber. - The fiber should keep channel flow state and onblock if blocked and now existing consumers are blocked.Amqp_client_eio - We should just keep a set of consumers. -*) - - -let monitor_queue_length ~service ~receive_stream = - let set_flow ~active = - let Spec.Channel.Flow_ok.{ active=active' } = Spec.Channel.Flow.client_request service ~active () in - assert (active' = active) - in - - let rec loop () = - Stream.wait_full receive_stream; - set_flow ~active:false; - Stream.wait_empty receive_stream; - set_flow ~active:true; - loop () - in - loop () - - let init: type a. sw:Eio.Switch.t -> Connection.t -> a confirm -> a t = fun ~sw connection confirm_type -> let receive_stream = Stream.create () in diff --git a/lib/utils.ml b/lib/utils.ml index 647994a..c72e6e7 100644 --- a/lib/utils.ml +++ b/lib/utils.ml @@ -29,48 +29,89 @@ module Condition = struct Waiters.wake_one t.waiters () end -(** Extension to Eio.Stream, which allows closing the stream. - A closed stream might still hold messages. Once the last message is taken off - a close stream, the stream will raise a user defined exception. +(* A Simpler solution would be an unbounded stream. + - That is not an option. + + - Closing a stream is a signalling. If a consumer closes a stream, we want to + poster to understand this.... Why not wrap the existing stream. That should be simple. + + We have atomic operations - that should work to create a mutable exlusive region. + + Pending publishers are hard to cancel. Which is needed if the channel is full. + - But this means that if the stream is closed by a consumer it _MUST_ be flushed. + + Close posts a close message to the stream (in a fiber) + Flush just reads until receiving the close message + + - All consumers will see the close message and redeliver + - All publishers will succeed. + + New publishes will be stopped. + + Ok. Thats a simple solution which should not require a complete copy of + the existing implementation (The eio implementation is not lock free for capacity > 0) + + - The condition to signal once the queue is empty or full is not needed. + - Because we cannot use it. Receiving messages with ack it should be done + - setting prefetch. + - If we want to signal full then we could to a try_post. + + - We need to extend the implementation with add_nonblocking and add ~force. + - Send patches upstream for this. - Posting to or closing a closed stream will raise same exception. - {1 note} This is a far from perfect implementation: - - Closing a stream must wait until there is room in the stream. - This should be re-written to use a promise for indicating that the channel has been closed. - This way any pending publishers can be cancelled. Still need to figure out how reliably signal consumers. - Well these could wait also wait on the promise. If the promise is ready, they check if there are more messages. - ( Could start with a nonblocking read before checking ). Would be so easy if we had a switch (and could fork). - We only want one message to be posted. + Alternative: + Create a wrapper around the stream implementation: + Closed can be checked before posting. + That region must be guarded - so we dont race. + Does this mean that we will always take a lock. + Damn! If post blocks then we cannot release the lock! + We can approximate, but that will allow messages to arrive after the close message. + - However that may not be a problem. But the close message must not be received before + - all other messages has been consumed - and thats not guaranteed! + +*) + +(** Extension to Eio.Stream, which allows closing the stream. + + Closing a stream will allow all pending publishers to complete. + All pending receivers (=> queue is initially empty) will receive an exception + Posting to or closing a closed stream will raise same exception. *) module Stream = struct open Eio.Private - type 'a item = ('a, exn) result type 'a t = { mutex : Mutex.t; id : Ctf.id; - capacity : int; - readers : 'a item Waiters.t; - writers : (unit, exn) result Waiters.t; + + capacity : int; (* [capacity > 0] *) items : 'a Queue.t; - condition: Condition.t; - mutable flow : bool; (** If false, receiver will be blocked receiving from the queue *) + + (* Readers suspended because [items] is empty. *) + readers : ('a, exn) result Waiters.t; + + (* Writers suspended because [items] is at capacity. *) + writers : unit Waiters.t; + mutable closed : exn option; } - let with_lock l f = - Mutex.lock l; - try - let res = f () in - Mutex.unlock l; - res - with e -> - Mutex.unlock l; - raise e + let with_mutex t f = + Mutex.lock t.mutex; + match f () with + | x -> Mutex.unlock t.mutex; x + | exception ex -> Mutex.unlock t.mutex; raise ex + + (* Invariants *) + let _validate t = + with_mutex t @@ fun () -> + assert (Queue.length t.items <= t.capacity); + assert (Waiters.is_empty t.readers || Queue.is_empty t.items); + assert (Waiters.is_empty t.writers || Queue.length t.items = t.capacity) let create ?(capacity = Int.max_int) () = - assert (capacity >= 0); + assert (capacity > 0); let id = Ctf.mint_id () in Ctf.note_created id Ctf.Stream; { @@ -80,8 +121,6 @@ module Stream = struct items = Queue.create (); readers = Waiters.create (); writers = Waiters.create (); - condition = Condition.create (); - flow = true; closed = None; } @@ -89,13 +128,11 @@ module Stream = struct @raise Closed if the stream has been closed @param force if true, ignore max_capacity and send will not block *) - let send t ?(force=false) item = + let add ?(force=false) t item = Mutex.lock t.mutex; match Waiters.wake_one t.readers (Ok item) with | `Ok -> Mutex.unlock t.mutex | `Queue_empty -> - - (* Raise if the stream has been closed. Note that all waiters will have been awakend at this point, so we can do it here *) let () = match t.closed with | Some exn -> Mutex.unlock t.mutex; @@ -103,140 +140,134 @@ module Stream = struct | None -> () in (* No-one is waiting for an item. Queue it. *) - if (force || Queue.length t.items < t.capacity) then ( + if Queue.length t.items < t.capacity || force then ( Queue.add item t.items; Mutex.unlock t.mutex ) else ( (* The queue is full. Wait for our turn first. *) - Condition.broadcast t.condition; - Suspend.enter_unchecked @@ fun ctx enqueue -> Waiters.await_internal ~mutex:(Some t.mutex) t.writers t.id ctx (fun r -> - (* This is called directly from [wake_one] and so we have the lock. - We're still running in [wake_one]'s domain here. *) - if Result.is_ok r then ( - (* We get here immediately when called by [take], either: - 1. after removing an item, so there is space, or - 2. if [capacity = 0]; [take] will immediately remove the new item. *) - Queue.add item t.items; - ); - let r = match r with - | Error _ as e -> e - | Ok (Ok r) -> Ok r - | Ok (Error _ as e) -> e - in - enqueue r - ) + (* This is called directly from [wake_one] and so we have the lock. + We're still running in [wake_one]'s domain here. *) + if Result.is_ok r then ( + (* We get here immediately when called by [take], after removing an item, + so there is space *) + Queue.add item t.items; + ); + enqueue r + ) ) + let send = add - (** Pop the first element of the stream. - @raise exception if the stream has been closed, and there are no more message on the stream - *) - let receive t = + let take t = Mutex.lock t.mutex; match Queue.take_opt t.items with | None -> - Condition.broadcast t.condition; - (* There aren't any items, so we probably need to wait for one. - However, there's also the special case of a zero-capacity queue to deal with. - [is_empty writers || capacity = 0] *) - begin match Waiters.wake_one t.writers (Ok ()) with - | `Queue_empty -> begin - (* Don't sleep if the queue has been closed *) - let () = match t.closed with - | Some exn -> - Mutex.unlock t.mutex; - raise exn - | None -> () - in - match Waiters.await ~mutex:(Some t.mutex) t.readers t.id with - | Ok x -> x - | Error exn -> raise exn - end - | `Ok -> - (* [capacity = 0] (this is the only way we can get waiters and no items). - [wake_one] has just added an item to the queue; remove it to restore - the invariant before closing the mutex. *) - let x = Queue.take t.items in - Mutex.unlock t.mutex; - x + let () = match t.closed with + | Some exn -> + Mutex.unlock t.mutex; + raise exn + | None -> () + in + begin + (* There aren't any items, so we need to wait for one. *) + match Waiters.await ~mutex:(Some t.mutex) t.readers t.id with + | Ok item -> item + | Error exn -> raise exn end | Some v -> (* If anyone was waiting for space, let the next one go. [is_empty writers || length items = t.capacity - 1] *) - begin match Waiters.wake_one t.writers (Ok ()) with - | `Ok (* [length items = t.capacity] again *) - | `Queue_empty -> () (* [is_empty writers] *) + begin match Waiters.wake_one t.writers () with + | `Ok (* [length items = t.capacity] again *) + | `Queue_empty -> () (* [is_empty writers] *) end; Mutex.unlock t.mutex; v + let receive = take + + let take_nonblocking t = + Mutex.lock t.mutex; + let () = match t.closed with + | Some exn -> + Mutex.unlock t.mutex; + raise exn + | None -> () + in + + match Queue.take_opt t.items with + | None -> Mutex.unlock t.mutex; None (* There aren't any items. *) + | Some v -> + (* If anyone was waiting for space, let the next one go. + [is_empty writers || length items = t.capacity - 1] *) + begin match Waiters.wake_one t.writers () with + | `Ok (* [length items = t.capacity] again *) + | `Queue_empty -> () (* [is_empty writers] *) + end; + Mutex.unlock t.mutex; + Some v + + let length t = + Mutex.lock t.mutex; + let len = Queue.length t.items in + Mutex.unlock t.mutex; + len + let receive_all t = - let item = receive t in - let items = - with_lock t.mutex @@ fun () -> - let rec loop () = - match Queue.take_opt t.items with - | None -> [] - | Some i -> i :: loop () - in + let rec loop () = + match Queue.take_opt t.items with + | None -> [] + | Some i -> i :: loop () + in + let get_all_items () = + Mutex.lock t.mutex; + Waiters.wake_all t.writers (); let items = loop () in - Waiters.wake_all t.writers (Ok ()); - Condition.broadcast t.condition; (* Signal empty *) + Mutex.unlock t.mutex; items in + let item = receive t in + let items = get_all_items () in item :: items - (** Close the stream. - Reading from a closed stream will raise the close exception once empty, if the stream was closed - with [notify_consumers]. Closing an already closed stream does nothing (and will not update the close reason). - - @param message Post a message onto the queue after its closed, guaranteeing that its the last message on the queue (weak guarantee). Note that this might block until the stream has room - - *) let close ?message t reason = - with_lock t.mutex @@ fun () -> + Mutex.lock t.mutex; t.closed <- Some reason; - (* Add the last message to the queue if needed *) + (* Wake all writers. They will just publish the message to the queue *) + Waiters.wake_all t.writers (); + (* If there were pending writers => no pending readers *) + (* If there were pending readers => no pending writers *) + let () = match message with | Some item -> begin match Waiters.wake_one t.readers (Ok item) with | `Ok -> () - | `Queue_empty -> - Queue.add item t.items; + | `Queue_empty -> Queue.add item t.items; end | None -> () in - Waiters.wake_all t.writers (Error reason); + (* If there are pending readers, that mean that the queue is empty. + Since we will not allow new publishes, we signal close to them + *) Waiters.wake_all t.readers (Error reason); - Condition.broadcast t.condition; + Mutex.unlock t.mutex; () let is_empty t = - with_lock t.mutex @@ fun () -> Queue.is_empty t.items + Mutex.lock t.mutex; + let res = Queue.is_empty t.items in + Mutex.unlock t.mutex; + res let is_closed t = - with_lock t.mutex @@ fun () -> Option.is_some t.closed - - let wait ~condition t = - let rec loop () = - Option.iter raise t.closed; - match condition () with - | true -> () - | false -> - Condition.await t.condition t.mutex; - loop () - in - with_lock t.mutex @@ loop - - let wait_empty t = - wait ~condition:(fun () -> Queue.is_empty t.items) t - - let wait_full t = - wait ~condition:(fun () -> Queue.length t.items >= t.capacity) t - - + Mutex.lock t.mutex; + let res = Option.is_some t.closed in + Mutex.unlock t.mutex; + res + let dump f t = + Fmt.pf f "" (length t) t.capacity end open Eio diff --git a/stress/stress.ml b/stress/stress.ml index 02365ba..d661db8 100644 --- a/stress/stress.ml +++ b/stress/stress.ml @@ -24,7 +24,7 @@ let test_amqp env = () ) -(* Create a product on one domain and a consumer on another domain *) +(* Create a producer on one domain and a consumer on another domain *) let () = Eio_main.run test_amqp