From 11afa8ff853c1a18fbd9aa80ede41acd39d5eb5a Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Fri, 11 Feb 2022 12:56:44 +0000 Subject: [PATCH] Inline Flow.run --- src/Equinox/{Flow.fs => Core.fs} | 37 -------------------------------- src/Equinox/Decider.fs | 37 +++++++++++++++++++++++--------- src/Equinox/Equinox.fsproj | 2 +- 3 files changed, 28 insertions(+), 48 deletions(-) rename src/Equinox/{Flow.fs => Core.fs} (50%) diff --git a/src/Equinox/Flow.fs b/src/Equinox/Core.fs similarity index 50% rename from src/Equinox/Flow.fs rename to src/Equinox/Core.fs index 9e065f161..cf00d40ac 100755 --- a/src/Equinox/Flow.fs +++ b/src/Equinox/Core.fs @@ -26,40 +26,3 @@ and [] SyncResult<'state> = /// Store-specific opaque token to be used for synchronization purposes and [] StreamToken = { value : obj; version: int64 } - -/// Internal implementation of the Optimistic Concurrency Control loop within which a decider function runs. See Decider.fs for App-facing APIs. -module internal Flow = - - /// Process a command, ensuring a consistent final state is established on the stream. - /// 1. make a decision predicated on the known state - /// 2a. if no changes required, exit with known state - /// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state - /// 2c. if saved without conflict, exit with updated state - let transact (originState : StreamToken * 'state) - (decide : StreamToken * 'state -> Async<'result * 'event list>) - (log : Serilog.ILogger) - (trySync : Serilog.ILogger * StreamToken * 'state * 'event list -> Async>) - (maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) - (mapResult : 'result -> StreamToken * 'state -> 'view) : Async<'view> = - - // Runs one decision loop, potentially recursing with resynced state if there's a conflict on the write - let rec loop (token, state) attempt : Async<'view> = async { - let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) - match! decide (token, state) with - | result, [] -> - log.Debug "No events generated" - return mapResult result (token, state) - | result, events -> - match! trySync (log, token, state, events) with - | SyncResult.Conflict resync -> - if attempt <> maxSyncAttempts then - let! streamState' = resyncRetryPolicy log attempt resync - log.Debug "Resyncing and retrying" - return! loop streamState' (attempt + 1) - else - log.Debug "Max Sync Attempts exceeded" - return raise (createMaxAttemptsExhaustedException attempt) - | SyncResult.Written (token', streamState') -> - return mapResult result (token', streamState') } - // Commence, processing based on the incoming state - loop originState 1 diff --git a/src/Equinox/Decider.fs b/src/Equinox/Decider.fs index b0708a32b..468915079 100755 --- a/src/Equinox/Decider.fs +++ b/src/Equinox/Decider.fs @@ -1,6 +1,5 @@ namespace Equinox -open Equinox.Core open System.Runtime.InteropServices /// Exception yielded by Decider.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store @@ -9,12 +8,12 @@ type MaxResyncsExhaustedException(count) = /// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic type Decider<'event, 'state> - ( log, stream : IStream<'event, 'state>, maxAttempts : int, + ( log, stream : Core.IStream<'event, 'state>, maxAttempts : int, [] ?createAttemptsExhaustedException : int -> exn, [] ?resyncPolicy) = do if maxAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxAttempts", maxAttempts, "should be >= 1") - let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async) = function + let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async) = function | None | Some RequireLoad -> fun log -> stream.Load(log, allowStale = false) | Some AllowStale -> fun log -> stream.Load(log, allowStale = true) | Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() } @@ -23,12 +22,30 @@ type Decider<'event, 'state> let! tokenAndState = fetch maybeOption log return project tokenAndState } let transact maybeOption decide mapResult = async { - let! tokenAndState = fetch maybeOption log - let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF }) + let! originTokenAndState = fetch maybeOption log + let resyncRetryPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF }) let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn - let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException - return! Flow.transact tokenAndState decide log stream.TrySync (maxAttempts, resyncPolicy, createAttemptsExhaustedException) mapResult } - let (|Context|) (token, state) = + let createMaxAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException + let rec loop (token, state) attempt : Async<'view> = async { + let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt) + match! decide (token, state) with + | result, [] -> + log.Debug "No events generated" + return mapResult result (token, state) + | result, events -> + match! stream.TrySync (log, token, state, events) with + | Core.SyncResult.Conflict resync -> + if attempt <> maxAttempts then + let! streamState' = resyncRetryPolicy log attempt resync + log.Debug "Resyncing and retrying" + return! loop streamState' (attempt + 1) + else + log.Debug "Max Sync Attempts exceeded" + return raise (createMaxAttemptsExhaustedException attempt) + | Core.SyncResult.Written (token', streamState') -> + return mapResult result (token', streamState') } + return! loop originTokenAndState 1 } + let (|Context|) (token : Core.StreamToken, state) = { new ISyncContext<'state> with member _.State = state member _.Version = token.version @@ -81,7 +98,7 @@ and [] LoadOption<'state> = /// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet, and we will be generating events | AssumeEmpty /// Instead of loading from database, seed the loading process with the supplied memento, obtained via ISyncContext.CreateMemento() - | FromMemento of memento : (StreamToken * 'state) + | FromMemento of memento : (Core.StreamToken * 'state) /// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required and ISyncContext<'state> = @@ -96,4 +113,4 @@ and ISyncContext<'state> = abstract member State : 'state /// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via LoadOption.FromMemento - abstract member CreateMemento : unit -> StreamToken * 'state + abstract member CreateMemento : unit -> Core.StreamToken * 'state diff --git a/src/Equinox/Equinox.fsproj b/src/Equinox/Equinox.fsproj index f78de642a..817d542c2 100644 --- a/src/Equinox/Equinox.fsproj +++ b/src/Equinox/Equinox.fsproj @@ -9,7 +9,7 @@ - +