Skip to content

Commit

Permalink
Aggregate layout/naming consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 7, 2019
1 parent 6d5a46e commit 31bd923
Show file tree
Hide file tree
Showing 7 changed files with 150 additions and 146 deletions.
30 changes: 15 additions & 15 deletions equinox-fc/Domain.Tests/AllocatorTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,45 +15,45 @@ type Result =

let execute cmd state =
match cmd with
| Commence (a,c) ->
| Commence (a, c) ->
match decideCommence a c state with
| CommenceResult.Accepted, es -> Accepted,es
| CommenceResult.Conflict a, es -> Conflict a,es
| Complete (a,r) -> let es = decideComplete a r state in Accepted, es
| CommenceResult.Accepted, es -> Accepted, es
| CommenceResult.Conflict a, es -> Conflict a, es
| Complete (a, r) -> let es = decideComplete a r state in Accepted, es

let [<Property>] properties c1 c2 =
let res,events = execute c1 Folds.initial
let state1 = Folds.fold Folds.initial events
let res, events = execute c1 Fold.initial
let state1 = Fold.fold Fold.initial events
match c1, res, events, state1 with
| Commence (a,c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state ->
| Commence (a, c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state ->
test <@ a = ea && c = ec && state = Some e @>
| Complete _, Accepted, [], None ->
() // Non-applicable Complete requests are simply ignored
| _, res, l, _ ->
test <@ List.isEmpty l && res = Accepted @>

let res,events = execute c2 state1
let state2 = Folds.fold state1 events
let res, events = execute c2 state1
let state2 = Fold.fold state1 events
match state1, c2, res, events, state2 with
// As per above, normal commence
| None, Commence (a,c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state ->
| None, Commence (a, c), Accepted, [Events.Commenced ({ allocationId = ea; cutoff = ec } as e)], state ->
test <@ a = ea && c = ec && state = Some e @>
// Idempotent accept if same allocationId
| Some active as s1, Commence (a,_), Accepted, [], s2 ->
| Some active as s1, Commence (a, _), Accepted, [], s2 ->
test <@ s1 = s2 && active.allocationId = a @>
// Conflict reports owner allocator
| Some active as s1, Commence (a2,_), Conflict a1, [], s2 ->
| Some active as s1, Commence (a2, _), Conflict a1, [], s2 ->
test <@ s1 = s2 && a2 <> a1 && a1 = active.allocationId @>
// Correct complete for same allocator is accepted
| Some active, Complete (a,r), Accepted, [Events.Completed { allocationId = ea; reason = er }], None ->
| Some active, Complete (a, r), Accepted, [Events.Completed { allocationId = ea; reason = er }], None ->
test <@ er = r && ea = a && active.allocationId = a @>
// Completes not for the same allocator are ignored
| Some active as s1, Complete (a,_), Accepted, [], s2 ->
| Some active as s1, Complete (a, _), Accepted, [], s2 ->
test <@ active.allocationId <> a && s2 = s1 @>
| _, _, res, l, _ ->
test <@ List.isEmpty l && res = Accepted @>

let [<Property>] ``codec can roundtrip`` event =
let ee = Events.codec.Encode(None,event)
let ee = Events.codec.Encode(None, event)
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
test <@ Some event = Events.codec.TryDecode ie @>
20 changes: 10 additions & 10 deletions equinox-fc/Domain.Tests/TicketListTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,31 +5,31 @@ open Swensen.Unquote
open TicketList

let [<Property>] properties c1 c2 =
let events = interpret c1 Folds.initial
let state1 = Folds.fold Folds.initial events
let events = interpret c1 Fold.initial
let state1 = Fold.fold Fold.initial events
match c1, events, state1 with
// Empty request -> no Event
| (_,[]), [], state ->
| (_, []), [], state ->
test <@ Set.isEmpty state @>
| (a,t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state ->
| (a, t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state ->
test <@ a = ea @>
test <@ state = set t @>
test <@ state = set et @>
| _, l, _ ->
test <@ List.isEmpty l @>

let events = interpret c2 state1
let state2 = Folds.fold state1 events
test <@ Folds.fold state2 [Folds.snapshot state2] = state2 @>
let state2 = Fold.fold state1 events
test <@ Fold.fold state2 [Fold.snapshot state2] = state2 @>
match state1, c2, events, state2 with
// Empty request -> no Event, same state
| s1, (_,[]), [], state ->
| s1, (_, []), [], state ->
test <@ state = s1 @>
// Redundant request -> No Event, same state
| s1, (_,t), [], _ ->
| s1, (_, t), [], _ ->
test <@ Set.isSuperset s1 (set t) @>
// Two consecutive commands should both manifest in the state
| s1, (a,t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state ->
| s1, (a, t), [Events.Allocated { allocatorId = ea; ticketIds = et }], state ->
test <@ a = ea @>
let et = Set et
test <@ Set.isSuperset (set t) et @>
Expand All @@ -40,6 +40,6 @@ let [<Property>] properties c1 c2 =
test <@ List.isEmpty l @>

let [<Property>] ``codec can roundtrip`` event =
let ee = Events.codec.Encode(None,event)
let ee = Events.codec.Encode(None, event)
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
test <@ Some event = Events.codec.TryDecode ie @>
32 changes: 16 additions & 16 deletions equinox-fc/Domain.Tests/TicketTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@ module TicketTests
open FsCheck.Xunit
open Swensen.Unquote
open Ticket
open Ticket.Folds
open Ticket.Fold

/// We want to generate Allocate requests with and without the same listId in some cases
let (|MaybeSameCommands|) = function
Expand All @@ -14,9 +14,9 @@ let (|MaybeSameCommands|) = function
/// Explicitly generate sequences with the same allocator running twice or three times
let (|MaybeSameIds|) = function
| Choice1Of4 a -> a, a, a
| Choice2Of4 (a,b) -> a, a, b
| Choice3Of4 (a,b) -> a, b, b
| Choice4Of4 (a,b,c) -> a, b, c
| Choice2Of4 (a, b) -> a, a, b
| Choice3Of4 (a, b) -> a, b, b
| Choice4Of4 (a, b, c) -> a, b, c

let (|Invariants|) = function
// Revokes always succeed iff Unallocated
Expand All @@ -36,47 +36,47 @@ let (|ReservedCases|_|) allocator = function
test <@ a = allocator @>
Some ()
// Revokes not by the owner are reported as successful, but we force the real owner to do the real relinquish
| (Reserved by | Allocated(by,_)), Revoke, true, [], _ ->
| (Reserved by | Allocated(by, _)), Revoke, true, [], _ ->
test <@ by <> allocator @>
Some ()
// Revokes succeed iff by the owner
| (Reserved by | Allocated(by,_)), Revoke, true, [Events.Revoked], Unallocated ->
| (Reserved by | Allocated(by, _)), Revoke, true, [Events.Revoked], Unallocated ->
test <@ by = allocator @>
Some ()
// Reservations can transition to Allocations as long as it's the same Allocator requesting
| Reserved a, Allocate l, true, [Events.Allocated { allocatorId = ea; listId = el }], Allocated (sa,sl) ->
| Reserved a, Allocate l, true, [Events.Allocated { allocatorId = ea; listId = el }], Allocated (sa, sl) ->
test <@ a = allocator && a = ea && a = sa && l = el && l = sl @>
Some()
| _ -> None

let [<Property>] properties (MaybeSameIds (a1,a2,a3)) (MaybeSameCommands (c1,c2,c3)) =
let res, events = decide a1 c1 Folds.initial
let state1 = Folds.fold Folds.initial events
let [<Property>] properties (MaybeSameIds (a1, a2, a3)) (MaybeSameCommands (c1, c2, c3)) =
let res, events = decide a1 c1 Fold.initial
let state1 = Fold.fold Fold.initial events

match Folds.initial, c1, res, events, state1 with
match Fold.initial, c1, res, events, state1 with
| _, Reserve, true, [Events.Reserved { allocatorId = a }], Reserved sa ->
test <@ a = a1 && sa = a1 @>
| Invariants -> ()

let res, events = decide a2 c2 state1
let state2 = Folds.fold state1 events
let state2 = Fold.fold state1 events
match state1, c2, res, events, state2 with
| ReservedCases a2 -> ()
| Invariants -> ()

let res, events = decide a3 c3 state2
let state3 = Folds.fold state2 events
let state3 = Fold.fold state2 events
match state2, c3, res, events, state3 with
// Idempotent allocate ignore
| Allocated (a,l), Allocate l3, true, [], _ ->
| Allocated (a, l), Allocate l3, true, [], _ ->
test <@ a = a3 && l = l3 @>
// Allocated -> Revoked
| Allocated (a,_), Revoke, true, [Events.Revoked], Unallocated ->
| Allocated (a, _), Revoke, true, [Events.Revoked], Unallocated ->
test <@ a = a3 @>
| ReservedCases a3 -> ()
| Invariants -> ()

let [<Property>] ``codec can roundtrip`` event =
let ee = Events.codec.Encode(None,event)
let ee = Events.codec.Encode(None, event)
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
test <@ Some event = Events.codec.TryDecode ie @>
87 changes: 44 additions & 43 deletions equinox-fc/Domain/Allocation.fs
Original file line number Diff line number Diff line change
Expand Up @@ -34,7 +34,7 @@ module Events =
let [<Literal>] categoryId = "Allocation"
let (|AggregateId|) id = Equinox.AggregateId(categoryId, AllocationId.toString id)

module Folds =
module Fold =

type State = NotStarted | Running of States | Canceling of States | Completed
and States =
Expand All @@ -59,7 +59,7 @@ module Folds =
let withRevoked (ToSet xs) x = { withKnown xs x with reserved = Set.difference x.reserved xs }
let withReleasing (ToSet xs) x ={ withKnown xs x with releasing = x.releasing |> Set.union xs } // TODO
let withAssigned listId x = // TODO
let decided,remaining = x.assigning |> List.partition (fun x -> x.listId = listId)
let decided, remaining = x.assigning |> List.partition (fun x -> x.listId = listId)
let xs = seq { for x in decided do yield! x.ticketIds }
{ withRevoked xs x with assigning = remaining }
let initial = NotStarted
Expand Down Expand Up @@ -121,17 +121,17 @@ type ProcessState =
| Cancelling of toAssign : Events.Allocated list * toRelease : TicketId list
| Completed
static member FromFoldState = function
| Folds.NotStarted ->
| Fold.NotStarted ->
NotStarted
| Folds.Running e ->
| Fold.Running e ->
match Set.toList e.reserved, e.assigning, Set.toList e.releasing, Set.toList e.unknown with
| res, [], [], [] ->
Idle (reserved = res)
| res, ass, rel, tor ->
Running (reserved = res, toAssign = ass, toRelease = rel, toReserve = tor)
| Folds.Canceling e ->
| Fold.Canceling e ->
Cancelling (toAssign = e.assigning, toRelease = [yield! e.reserved; yield! e.unknown; yield! e.releasing])
| Folds.Completed ->
| Fold.Completed ->
Completed

/// Updates recording attained progress
Expand All @@ -146,26 +146,26 @@ let (|SetEmpty|_|) s = if Set.isEmpty s then Some () else None

/// Map processed work to associated events that are to be recorded in the stream
let decideUpdate update state =
let owned (s : Folds.States) = Set.union s.releasing (set <| seq { yield! s.unknown; yield! s.reserved })
let owned (s : Fold.States) = Set.union s.releasing (set <| seq { yield! s.unknown; yield! s.reserved })
match state, update with
| (Folds.Completed | Folds.NotStarted), (Failed _|Reserved _|Assigned _|Revoked _) as x ->
| (Fold.Completed | Fold.NotStarted), (Failed _|Reserved _|Assigned _|Revoked _) as x ->
failwithf "Folds.Completed or NotStarted cannot handle (Failed|Revoked|Assigned) %A" x
| (Folds.Running s|Folds.Canceling s), Reserved (ToSet xs) ->
| (Fold.Running s|Fold.Canceling s), Reserved (ToSet xs) ->
match set s.unknown |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Reserved { ticketIds = Set.toArray changed }]
| (Folds.Running s|Folds.Canceling s), Failed (ToSet xs) ->
| (Fold.Running s|Fold.Canceling s), Failed (ToSet xs) ->
match owned s |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Failed { ticketIds = Set.toArray changed }]
| (Folds.Running s|Folds.Canceling s), Revoked (ToSet xs) ->
| (Fold.Running s|Fold.Canceling s), Revoked (ToSet xs) ->
match owned s |> Set.intersect xs with SetEmpty -> [] | changed -> [Events.Revoked { ticketIds = Set.toArray changed }]
| (Folds.Running s|Folds.Canceling s), Assigned listId ->
| (Fold.Running s|Fold.Canceling s), Assigned listId ->
if s.assigning |> List.exists (fun x -> x.listId = listId) then [Events.Assigned { listId = listId }] else []

/// Holds events accumulated from a series of decisions while also evolving the presented `state` to reflect the pended events
type private Accumulator() =
let acc = ResizeArray()
member __.Ingest state : 'res * Events.Event list -> 'res * Folds.State = function
| res, [] -> res,state
| res, [e] -> acc.Add e; res,Folds.evolve state e
| res, xs -> acc.AddRange xs; res,Folds.fold state (Seq.ofList xs)
member __.Ingest state : 'res * Events.Event list -> 'res * Fold.State = function
| res, [] -> res, state
| res, [e] -> acc.Add e; res, Fold.evolve state e
| res, xs -> acc.AddRange xs; res, Fold.fold state (Seq.ofList xs)
member __.Accumulated = List.ofSeq acc

/// Impetus provided to the Aggregate Service from the Process Manager
Expand All @@ -175,78 +175,79 @@ type Command =
| Cancel

/// Apply updates, decide whether Command is applicable, emit state reflecting work to be completed to conclude the in-progress workflow (if any)
let sync (updates : Update seq, command : Command) (state : Folds.State) : (bool*ProcessState) * Events.Event list =
let sync (updates : Update seq, command : Command) (state : Fold.State) : (bool*ProcessState) * Events.Event list =
let acc = Accumulator()

(* Apply any updates *)
let mutable state = state
for x in updates do
let (),state' = acc.Ingest state ((),decideUpdate x state)
let (), state' = acc.Ingest state ((), decideUpdate x state)
state <- state'

(* Decide whether the Command is now acceptable *)
let accepted,state =
let accepted, state =
acc.Ingest state <|
match state, command with
(* Ignore on the basis of being idempotent in the face of retries *)
// TOCONSIDER how to represent that a request is being denied e.g. due to timeout vs due to being complete
| (Folds.Idle|Folds.Releasing _), Apply _ ->
| (Fold.Idle|Fold.Releasing _), Apply _ ->
false, []
(* Defer; Need to allow current request to progress before it can be considered *)
| (Folds.Acquiring _|Folds.Releasing _), Commence _ ->
| (Fold.Acquiring _|Fold.Releasing _), Commence _ ->
true, [] // TODO validate idempotent ?
(* Ok on the basis of idempotency *)
| (Folds.Idle|Folds.Releasing _), Cancel ->
| (Fold.Idle|Fold.Releasing _), Cancel ->
true, []
(* Ok; Currently idle, normal Commence request*)
| Folds.Idle, Commence tickets ->
true,[Events.Commenced { ticketIds = Array.ofList tickets }]
| Fold.Idle, Commence tickets ->
true, [Events.Commenced { ticketIds = Array.ofList tickets }]
(* Ok; normal apply to distribute held tickets *)
| Folds.Acquiring s, Apply (assign,release) ->
| Fold.Acquiring s, Apply (assign, release) ->
let avail = System.Collections.Generic.HashSet s.reserved
let toAssign = [for a in assign -> { a with ticketIds = a.ticketIds |> Array.where avail.Remove }]
let toRelease = (Set.empty,release) ||> List.fold (fun s x -> if avail.Remove x then Set.add x s else s)
let toRelease = (Set.empty, release) ||> List.fold (fun s x -> if avail.Remove x then Set.add x s else s)
true, [
for x in toAssign do if (not << Array.isEmpty) x.ticketIds then yield Events.Allocated x
match toRelease with SetEmpty -> () | toRelease -> yield Events.Released { ticketIds = Set.toArray toRelease }]
(* Ok, normal Cancel *)
| Folds.Acquiring _, Cancel ->
| Fold.Acquiring _, Cancel ->
true, [Events.Cancelled]

(* Yield outstanding processing requirements (if any), together with events accumulated based on the `updates` *)
(accepted, ProcessState.FromFoldState state), acc.Accumulated

type Service internal (resolve, ?maxAttempts) =
type Service internal (log, resolve, ?maxAttempts) =

let log = Serilog.Log.ForContext<Service>()
let (|Stream|) (Events.AggregateId id) = Equinox.Stream<Events.Event,Folds.State>(log, resolve id, maxAttempts = defaultArg maxAttempts 3)
let resolve (Events.AggregateId id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts = defaultArg maxAttempts 3)

member __.Sync(allocationId,updates,command) : Async<bool*ProcessState> =
let (Stream stream) = allocationId
stream.Transact(sync (updates,command))
member __.Sync(allocationId, updates, command) : Async<bool*ProcessState> =
let stream = resolve allocationId
stream.Transact(sync (updates, command))

let create resolve = Service(Serilog.Log.ForContext<Service>(), resolve, maxAttempts = 3)

module EventStore =

open Equinox.EventStore
let resolve (context,cache) =
let resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
// while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent
let opt = Equinox.ResolveOption.AllowStale
// We should be reaching Completed state frequently so no actual Snapshots should get written
fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy).Resolve(id,opt)
let create (context,cache) =
Service(resolve (context,cache))
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy).Resolve(id, opt)
let create (context, cache) =
create (resolve (context, cache))

module Cosmos =

open Equinox.Cosmos
let resolve (context,cache) =
let resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
// while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent
let opt = Equinox.ResolveOption.AllowStale
// TODO impl snapshots
let makeEmptyUnfolds events _state = events,[]
let accessStrategy = AccessStrategy.Custom (Folds.isOrigin,makeEmptyUnfolds)
fun id -> Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, accessStrategy).Resolve(id,opt)
let create (context,cache) =
Service(resolve (context,cache))
let makeEmptyUnfolds events _state = events, []
let accessStrategy = AccessStrategy.Custom (Fold.isOrigin, makeEmptyUnfolds)
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
let create (context, cache) =
create (resolve (context, cache))
Loading

0 comments on commit 31bd923

Please sign in to comment.