Skip to content

Commit

Permalink
Tidy LoadOptions
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Jan 24, 2022
1 parent 73e9947 commit 7f1f76b
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 33 deletions.
2 changes: 1 addition & 1 deletion samples/Store/Domain/Cart.fs
Original file line number Diff line number Diff line change
Expand Up @@ -148,7 +148,7 @@ type Service internal (resolve : CartId -> Equinox.Decider<Events.Event, Fold.St
return interpretMany Fold.fold (Seq.map interpret commands) state }
#endif
let decider = resolve cartId
decider.Transact(interpret, (if optimistic then Equinox.AllowStale else Equinox.Load))
decider.Transact(interpret, (if optimistic then Equinox.AllowStale else Equinox.RequireLoad))

member __.ExecuteManyAsync(cartId, optimistic, commands : Command seq, ?prepare) : Async<unit> =
__.Run(cartId, optimistic, commands, ?prepare=prepare) |> Async.Ignore
Expand Down
8 changes: 2 additions & 6 deletions src/Equinox.Core/StoreCategory.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,8 @@ namespace Equinox.Core
type private Stream<'event, 'state, 'context>(category : ICategory<'event, 'state, string, 'context>, streamId: string, empty : StreamToken * 'state, ?context : 'context, ?init : unit -> Async<unit>) =

interface IStream<'event, 'state> with
member _.Load(log, opt) =
match opt with
| Equinox.LoadOption.AssumeEmpty -> async { return empty }
| Equinox.LoadOption.FromMemento (streamToken, state) -> async { return (streamToken, state) }
| Equinox.LoadOption.AllowStale -> category.Load(log, streamId, true)
| Equinox.LoadOption.Load -> category.Load(log, streamId, false)
member _.LoadEmpty() = empty
member _.Load(log, allowStale) = category.Load(log, streamId, allowStale)
member _.TrySync(log, token: StreamToken, originState: 'state, events: 'event list) =
let sync = category.TrySync(log, streamId, token, originState, events, context)
match init with
Expand Down
29 changes: 23 additions & 6 deletions src/Equinox/Decider.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,20 @@
open Equinox.Core
open System.Runtime.InteropServices

/// Store-agnostic Loading Options
[<NoComparison; NoEquality>]
type LoadOption<'state> =
/// No special requests; Obtain latest state from store based on consistency level configured
| RequireLoad
/// If the Cache holds any state, use that without checking the backing store for updates, implying:
/// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome)
/// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query`
| AllowStale
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (StreamToken * 'state)

/// Exception yielded by Decider.Transact after `count` attempts have yielded conflicts at the point of syncing with the Store
type MaxResyncsExhaustedException(count) =
inherit exn(sprintf "Concurrency violation; aborting after %i attempts." count)
Expand All @@ -12,19 +26,22 @@ type Decider<'event, 'state>
( log, stream : IStream<'event, 'state>, maxAttempts : int,
[<Optional; DefaultParameterValue(null)>] ?createAttemptsExhaustedException : int -> exn,
[<Optional; DefaultParameterValue(null)>] ?resyncPolicy,
?defaultOption) =
?allowStale) =

let resolveOptions : LoadOption<'state> option -> LoadOption<'state> = function
| None -> defaultArg defaultOption LoadOption.Load
| Some o -> o
let load : LoadOption<'state> option -> _ = function
| None when allowStale = Some true -> fun log -> stream.Load(log, true)
| None | Some RequireLoad -> fun log -> stream.Load(log, false)
| Some AllowStale -> fun log -> stream.Load(log, true)
| Some AssumeEmpty -> fun _log -> async { return stream.LoadEmpty() }
| Some (FromMemento (streamToken, state)) -> fun _log -> async { return (streamToken, state) }

let transact maybeOverride decide mapResult =
let resyncPolicy = defaultArg resyncPolicy (fun _log _attemptNumber resyncF -> async { return! resyncF })
let createDefaultAttemptsExhaustedException attempts : exn = MaxResyncsExhaustedException attempts :> exn
let createAttemptsExhaustedException = defaultArg createAttemptsExhaustedException createDefaultAttemptsExhaustedException
Flow.transact (resolveOptions maybeOverride) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult
Flow.transact (load maybeOverride) (maxAttempts, resyncPolicy, createAttemptsExhaustedException) (stream, log) decide mapResult

let query option args = Flow.query (resolveOptions option) args
let query option args = Flow.query (load option) args

/// 0. Invoke the supplied <c>interpret</c> function with the present state
/// 1a. (if events yielded) Attempt to sync the yielded events events to the stream
Expand Down
26 changes: 7 additions & 19 deletions src/Equinox/Flow.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,20 +5,6 @@ type [<NoComparison>] StreamToken = { value : obj; version: int64 }

namespace Equinox

/// Store-agnostic Loading Options
[<NoComparison; NoEquality>]
type LoadOption<'state> =
/// No special requests; Obtain latest state from store based on consistency level configured
| Load
/// If the Cache holds any state, use that without checking the backing store for updates, implying:
/// - maximizing how much we lean on Optimistic Concurrency Control when doing a `Transact` (you're still guaranteed a consistent outcome)
/// - enabling stale reads [in the face of multiple writers (either in this process or in other processes)] when doing a `Query`
| AllowStale
/// Inhibit load from database based on the fact that the stream is likely not to have been initialized yet
| AssumeEmpty
/// <summary>Instead of loading from database, seed the loading process with the supplied memento, obtained via <c>ISyncContext.CreateMemento()</c></summary>
| FromMemento of memento : (Core.StreamToken * 'state)

/// Exposed by TransactEx / QueryEx, providing access to extended state information for cases where that's required
type ISyncContext<'state> =

Expand Down Expand Up @@ -52,8 +38,10 @@ type SyncResult<'state> =
/// Store-agnostic interface representing interactions a Flow can have with the state of a given event stream. Not intended for direct use by consumer code.
type IStream<'event, 'state> =

abstract LoadEmpty : unit -> StreamToken * 'state

/// Obtain the state from the target stream
abstract Load : log: ILogger * opt : Equinox.LoadOption<'state> -> Async<StreamToken * 'state>
abstract Load : log: ILogger * allowStale : bool -> Async<StreamToken * 'state>

/// Given the supplied `token` [and related `originState`], attempt to move to state `state'` by appending the supplied `events` to the underlying stream
/// SyncResult.Written: implies the state is now the value represented by the Result's value
Expand Down Expand Up @@ -131,12 +119,12 @@ module internal Flow =
// Commence, processing based on the incoming state
loop 1

let transact opt (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async {
let! streamState = stream.Load(log, opt)
let transact load (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) (stream : IStream<_, _>, log) decide mapResult : Async<'result> = async {
let! streamState = load log
let context = SyncContext(streamState, stream.TrySync)
return! run log (maxAttempts, resyncRetryPolicy, createMaxAttemptsExhaustedException) context decide mapResult }

let query opt (stream : IStream<'event, 'state>, log : ILogger) (project: SyncContext<'event, 'state> -> 'result) : Async<'result> = async {
let! streamState = stream.Load(log, opt)
let query load (stream : IStream<'event, 'state>, log : ILogger) (project: SyncContext<'event, 'state> -> 'result) : Async<'result> = async {
let! streamState = load log
let context = SyncContext(streamState, stream.TrySync)
return project context }
2 changes: 1 addition & 1 deletion tools/Equinox.Tool/Program.fs
Original file line number Diff line number Diff line change
Expand Up @@ -413,7 +413,7 @@ module Dump =
with e -> log.ForContext("str", System.Text.Encoding.UTF8.GetString data).Warning(e, "Parse failure"); reraise()
let readStream (streamName : FsCodec.StreamName) = async {
let stream = cat.Resolve(idCodec,fold,initial,isOriginAndSnapshot) streamName
let! _token,events = stream.Load(storeLog, Equinox.LoadOption.Load)
let! _token,events = stream.Load(storeLog, allowStale = false)
let source = if not doE && not (List.isEmpty unfolds) then Seq.ofList unfolds else Seq.append events unfolds
let mutable prevTs = None
for x in source |> Seq.filter (fun e -> (e.IsUnfold && doU) || (not e.IsUnfold && doE)) do
Expand Down

0 comments on commit 7f1f76b

Please sign in to comment.