Skip to content

Commit

Permalink
Inline Flow.run
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Feb 11, 2022
1 parent bf6e8be commit 11afa8f
Show file tree
Hide file tree
Showing 3 changed files with 28 additions and 48 deletions.
37 changes: 0 additions & 37 deletions src/Equinox/Flow.fs → src/Equinox/Core.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,40 +26,3 @@ and [<NoEquality; NoComparison; RequireQualifiedAccess>] SyncResult<'state> =

/// Store-specific opaque token to be used for synchronization purposes
and [<NoComparison>] StreamToken = { value : obj; version: int64 }

/// Internal implementation of the Optimistic Concurrency Control loop within which a decider function runs. See Decider.fs for App-facing APIs.
module internal Flow =

/// Process a command, ensuring a consistent final state is established on the stream.
/// 1. make a decision predicated on the known state
/// 2a. if no changes required, exit with known state
/// 2b. if conflicting changes, retry by recommencing at step 1 with the updated state
/// 2c. if saved without conflict, exit with updated state
let transact (originState : StreamToken * 'state)
(decide : StreamToken * 'state -> Async<'result * 'event list>)
(log : Serilog.ILogger)
(trySync : Serilog.ILogger * StreamToken * 'state * 'event list -> Async<SyncResult<'state>>)
(maxSyncAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException)
(mapResult : 'result -> StreamToken * 'state -> 'view) : Async<'view> =

// Runs one decision loop, potentially recursing with resynced state if there's a conflict on the write
let rec loop (token, state) attempt : Async<'view> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
match! decide (token, state) with
| result, [] ->
log.Debug "No events generated"
return mapResult result (token, state)
| result, events ->
match! trySync (log, token, state, events) with
| SyncResult.Conflict resync ->
if attempt <> maxSyncAttempts then
let! streamState' = resyncRetryPolicy log attempt resync
log.Debug "Resyncing and retrying"
return! loop streamState' (attempt + 1)
else
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
| SyncResult.Written (token', streamState') ->
return mapResult result (token', streamState') }
// Commence, processing based on the incoming state
loop originState 1
37 changes: 27 additions & 10 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
@@ -1,6 +1,5 @@
namespace Equinox

open Equinox.Core
open System.Runtime.InteropServices

/// Exception yielded by Decider.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store
Expand All @@ -9,12 +8,12 @@ type MaxResyncsExhaustedException(count) =

/// Central Application-facing API. Wraps the handling of decision or query flows in a manner that is store agnostic
type Decider<'event, 'state>
( log, stream : IStream<'event, 'state>, maxAttempts : int,
( log, stream : Core.IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException : int -> exn,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy) =

do if maxAttempts < 1 then raise <| System.ArgumentOutOfRangeException("maxAttempts", maxAttempts, "should be >= 1")
let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async<StreamToken * 'state>) = function
let fetch : LoadOption<'state> option -> (Serilog.ILogger -> Async<Core.StreamToken * 'state>) = function
| None | Some RequireLoad -> fun log -> stream.Load(log, allowStale = false)
| Some AllowStale -> fun log -> stream.Load(log, allowStale = true)
| Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() }
Expand All @@ -23,12 +22,30 @@ type Decider<'event, 'state>
let! tokenAndState = fetch maybeOption log
return project tokenAndState }
let transact maybeOption decide mapResult = async {
let! tokenAndState = fetch maybeOption log
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let! originTokenAndState = fetch maybeOption log
let resyncRetryPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
return! Flow.transact tokenAndState decide log stream.TrySync (maxAttempts, resyncPolicy, createAttemptsExhaustedException) mapResult }
let (|Context|) (token, state) =
let createMaxAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
let rec loop (token, state) attempt : Async<'view> = async {
let log = if attempt = 1 then log else log.ForContext("syncAttempt", attempt)
match! decide (token, state) with
| result, [] ->
log.Debug "No events generated"
return mapResult result (token, state)
| result, events ->
match! stream.TrySync (log, token, state, events) with
| Core.SyncResult.Conflict resync ->
if attempt <> maxAttempts then
let! streamState' = resyncRetryPolicy log attempt resync
log.Debug "Resyncing and retrying"
return! loop streamState' (attempt + 1)
else
log.Debug "Max Sync Attempts exceeded"
return raise (createMaxAttemptsExhaustedException attempt)
| Core.SyncResult.Written (token', streamState') ->
return mapResult result (token', streamState') }
return! loop originTokenAndState 1 }
let (|Context|) (token : Core.StreamToken, state) =
{ new ISyncContext<'state> with
member _.State = state
member _.Version = token.version
Expand Down Expand Up @@ -81,7 +98,7 @@ and [<NoComparison; NoEquality>] LoadOption<'state> =
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet, and we will be generating events
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (StreamToken * 'state)
| FromMemento of memento : (Core.StreamToken * 'state)

/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required
and ISyncContext<'state> =
Expand All @@ -96,4 +113,4 @@ and ISyncContext<'state> =
abstract member State : 'state

/// Represents a Checkpoint position on a Stream's timeline; Can be used to manage continuations via LoadOption.FromMemento
abstract member CreateMemento : unit -> StreamToken * 'state
abstract member CreateMemento : unit -> Core.StreamToken * 'state
2 changes: 1 addition & 1 deletion src/Equinox/Equinox.fsproj
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
</PropertyGroup>

<ItemGroup>
<Compile Include="Flow.fs" />
<Compile Include="Core.fs" />
<Compile Include="Decider.fs" />
</ItemGroup>

Expand Down

0 comments on commit 11afa8f

Please sign in to comment.