Skip to content

Commit

Permalink
Formatting consistency
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Dec 7, 2019
1 parent 31bd923 commit 02fe311
Show file tree
Hide file tree
Showing 11 changed files with 37 additions and 37 deletions.
2 changes: 1 addition & 1 deletion equinox-fc/Domain.Tests/AllocationTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,6 @@ open FsCheck.Xunit
open Swensen.Unquote

let [<Property>] ``codec can roundtrip`` event =
let ee = Events.codec.Encode(None,event)
let ee = Events.codec.Encode(None, event)
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
test <@ Some event = Events.codec.TryDecode ie @>
2 changes: 1 addition & 1 deletion equinox-fc/Domain.Tests/Infrastructure.fs
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ module Cosmos =
let context = Context(connection, d, c)
let cache = Equinox.Cache (appName, 10)
context, cache
| s,d,c ->
| s, d, c ->
failwithf "Connection, Database and Container EQUINOX_COSMOS_* Environment variables are required (%b,%b,%b)"
(Option.isSome s) (Option.isSome d) (Option.isSome c)

Expand Down
12 changes: 6 additions & 6 deletions equinox-fc/Domain.Tests/LocationEpochTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -6,23 +6,23 @@ open Swensen.Unquote

let interpret delta _balance =
match delta with
| 0 -> (),[]
| delta -> (),[Events.Delta { value = delta }]
| 0 -> (), []
| delta -> (), [Events.Delta { value = delta }]

let validateAndInterpret expectedBalance delta balance =
test <@ expectedBalance = balance @>
interpret delta balance

let verifyDeltaEvent delta events =
let dEvents = events |> List.filter (function Events.Delta _ -> true | _ -> false)
test <@ interpret delta (Unchecked.defaultof<_>) = ((),dEvents) @>
test <@ interpret delta (Unchecked.defaultof<_>) = ((), dEvents) @>

let [<Property>] properties carriedForward delta1 closeImmediately delta2 close =

(* Starting with an empty stream, we'll need to supply the balance carried forward, optionally we apply a delta and potentially close *)

let initialShouldClose _state = closeImmediately
let res,events = sync (Some carriedForward) (validateAndInterpret carriedForward delta1) initialShouldClose Fold.initial
let res, events = sync (Some carriedForward) (validateAndInterpret carriedForward delta1) initialShouldClose Fold.initial
let cfEvents events = events |> List.filter (function Events.CarriedForward _ -> true | _ -> false)
let closeEvents events = events |> List.filter (function Events.Closed -> true | _ -> false)
let state1 = Fold.fold Fold.initial events
Expand All @@ -38,7 +38,7 @@ let [<Property>] properties carriedForward delta1 closeImmediately delta2 close
(* After initializing, validate we don't need to supply a carriedForward, and don't produce a CarriedForward event *)

let shouldClose _state = close
let { isOpen = isOpen; result = worked; balance = bal },events = sync None (validateAndInterpret expectedBalance delta2) shouldClose state1
let { isOpen = isOpen; result = worked; balance = bal }, events = sync None (validateAndInterpret expectedBalance delta2) shouldClose state1
let expectedBalance = if expectImmediateClose then expectedBalance else expectedBalance + delta2
test <@ [] = cfEvents events
&& (expectImmediateClose || not close || 1 = Seq.length (closeEvents events)) @>
Expand All @@ -49,6 +49,6 @@ let [<Property>] properties carriedForward delta1 closeImmediately delta2 close
verifyDeltaEvent delta2 events

let [<Property>] ``codec can roundtrip`` event =
let ee = Events.codec.Encode(None,event)
let ee = Events.codec.Encode(None, event)
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
test <@ Some event = Events.codec.TryDecode ie @>
2 changes: 1 addition & 1 deletion equinox-fc/Domain.Tests/LocationSeriesTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,6 @@ let [<Property>] properties c1 c2 =
test <@ List.isEmpty l @>

let [<Property>] ``codec can roundtrip`` event =
let ee = Events.codec.Encode(None,event)
let ee = Events.codec.Encode(None, event)
let ie = FsCodec.Core.TimelineEvent.Create(0L, ee.EventType, ee.Data)
test <@ Some event = Events.codec.TryDecode ie @>
14 changes: 7 additions & 7 deletions equinox-fc/Domain.Tests/LocationTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,7 @@ module Location =

let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Fold.fold, Epoch.Fold.initial).Resolve

let createService (zeroBalance, shouldClose) store =
let create (zeroBalance, shouldClose) store =
let maxAttempts = Int32.MaxValue
let series = Series.create (Series.resolve store) maxAttempts
let epochs = Epoch.create (Epoch.resolve store) maxAttempts
Expand All @@ -39,25 +39,25 @@ let run (service : LocationService) (IdsAtLeastOne locations, deltas : _[]) = As
let value = max -bal delta
if value = 0 then 0, []
else value, [Location.Epoch.Events.Delta { value = value }]
let! appliedDeltas = seq { for loc,x in updates -> async { let! _,eff = service.Execute(loc, adjust x) in return loc,eff } } |> Async.Parallel
let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l,xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq
let! appliedDeltas = seq { for loc, x in updates -> async { let! _, eff = service.Execute(loc, adjust x) in return loc, eff } } |> Async.Parallel
let expectedBalances = Seq.append (seq { for l in locations -> l, 0}) appliedDeltas |> Seq.groupBy fst |> Seq.map (fun (l, xs) -> l, xs |> Seq.sumBy snd) |> Set.ofSeq

(* Verify loading yields identical state *)

let! balances = seq { for loc in locations -> async { let! bal,() = service.Execute(loc,(fun _ -> (),[])) in return loc,bal } } |> Async.Parallel
let! balances = seq { for loc in locations -> async { let! bal, () = service.Execute(loc, (fun _ -> (), [])) in return loc, bal } } |> Async.Parallel
test <@ expectedBalances = Set.ofSeq balances @> }

let [<Property>] ``MemoryStore properties`` maxEvents args =
let store = Equinox.MemoryStore.VolatileStore()
let zeroBalance = 0
let maxEvents = max 1 maxEvents
let shouldClose (state : Epoch.Fold.OpenState) = state.count > maxEvents
let service = Location.MemoryStore.createService (zeroBalance, shouldClose) store
let service = Location.MemoryStore.create (zeroBalance, shouldClose) store
run service args

type Cosmos(testOutput) =

let context,cache = Cosmos.connect ()
let context, cache = Cosmos.connect ()

let log = testOutput |> TestOutputAdapter |> createLogger
do Serilog.Log.Logger <- log
Expand All @@ -66,5 +66,5 @@ type Cosmos(testOutput) =
let zeroBalance = 0
let maxEvents = max 1 maxEvents
let shouldClose (state : Epoch.Fold.OpenState) = state.count > maxEvents
let service = Location.Cosmos.createService (zeroBalance, shouldClose) (context,cache,Int32.MaxValue)
let service = Location.Cosmos.create (zeroBalance, shouldClose) (context, cache, Int32.MaxValue)
run service args
6 changes: 3 additions & 3 deletions equinox-fc/Domain/Allocation.fs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ module Events =
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
let [<Literal>] categoryId = "Allocation"
let (|AggregateId|) id = Equinox.AggregateId(categoryId, AllocationId.toString id)
let (|For|) id = Equinox.AggregateId(categoryId, AllocationId.toString id)

module Fold =

Expand Down Expand Up @@ -216,9 +216,9 @@ let sync (updates : Update seq, command : Command) (state : Fold.State) : (bool*
(* Yield outstanding processing requirements (if any), together with events accumulated based on the `updates` *)
(accepted, ProcessState.FromFoldState state), acc.Accumulated

type Service internal (log, resolve, ?maxAttempts) =
type Service internal (log, resolve, maxAttempts) =

let resolve (Events.AggregateId id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts = defaultArg maxAttempts 3)
let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)

member __.Sync(allocationId, updates, command) : Async<bool*ProcessState> =
let stream = resolve allocationId
Expand Down
8 changes: 4 additions & 4 deletions equinox-fc/Domain/ListAllocation.fs
Original file line number Diff line number Diff line change
Expand Up @@ -8,15 +8,15 @@ type Service(maxListLen, allocators : Allocator.Service, allocations : Allocatio
let cutoff = let now = DateTimeOffset.UtcNow in now.Add transactionTimeout
let! state = allocators.Commence(allocatorId, allocationId, cutoff)
// TODO cancel timed out conflicting work
let! (_,state) = allocations.Sync(allocationId, Seq.empty, Allocation.Commence tickets)
let! _, state = allocations.Sync(allocationId, Seq.empty, Allocation.Commence tickets)
return state }

member __.Read(allocationId) : Async<_> = async {
let! (_,state) = allocations.Sync(allocationId, Seq.empty, Allocation.Command.Apply ([],[]))
let! _, state = allocations.Sync(allocationId, Seq.empty, Allocation.Command.Apply ([], []))
// TODO incorporate allocator state
return state }

member __.Cancel(allocatorId,allocationId) : Async<_> = async {
let! (_,state) = allocations.Sync(allocationId, Seq.empty, Allocation.Command.Cancel)
member __.Cancel(allocatorId, allocationId) : Async<_> = async {
let! _, state = allocations.Sync(allocationId, Seq.empty, Allocation.Command.Cancel)
// TODO propagate to allocator with reason
return state }
8 changes: 4 additions & 4 deletions equinox-fc/Domain/Location.fs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ type LocationService internal (zeroBalance, shouldClose, series : Series.Service

let rec execute locationId originEpochId =
let rec aux epochId balanceToCarryForward wip = async {
let decide state = match wip with Complete r -> r,[] | Pending decide -> decide state
let decide state = match wip with Complete r -> r, [] | Pending decide -> decide state
match! epochs.Sync(locationId, epochId, balanceToCarryForward, decide, shouldClose) with
| { balance = bal; result = Some res; isOpen = true } ->
if originEpochId <> epochId then
Expand Down Expand Up @@ -40,7 +40,7 @@ module Helpers =

module Cosmos =

let createService (zeroBalance, shouldClose) (context, cache, maxAttempts) =
let series = Series.Cosmos.createService (context, cache, maxAttempts)
let epochs = Epoch.Cosmos.createService (context, cache, maxAttempts)
let create (zeroBalance, shouldClose) (context, cache, maxAttempts) =
let series = Series.Cosmos.create (context, cache, maxAttempts)
let epochs = Epoch.Cosmos.create (context, cache, maxAttempts)
create (zeroBalance, shouldClose) (series, epochs)
8 changes: 4 additions & 4 deletions equinox-fc/Domain/LocationEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ let sync (balanceCarriedForward : Fold.Balance option) (decide : (Fold.Balance -
acc.Ingest state <|
match state with
| Fold.Initial -> failwith "We've just guaranteed not Initial"
| Fold.Open { value = bal } -> let r,es = decide bal in Some r,es
| Fold.Open { value = bal } -> let r, es = decide bal in Some r, es
| Fold.Closed _ -> None, []
// Finally (iff we're `Open`, have run a `decide` and `shouldClose`), we generate a Closed event
let (balance, isOpen), _ =
Expand All @@ -82,8 +82,8 @@ let create resolve maxAttempts = Service(Serilog.Log.ForContext<Service>(), reso
module Cosmos =

open Equinox.Cosmos
let resolve (context,cache) =
let resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.Unoptimized).Resolve
let createService (context,cache,maxAttempts) =
create (resolve (context,cache)) maxAttempts
let create (context, cache, maxAttempts) =
create (resolve (context, cache)) maxAttempts
8 changes: 4 additions & 4 deletions equinox-fc/Domain/LocationSeries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -43,9 +43,9 @@ let create resolve maxAttempts = Service(Serilog.Log.ForContext<Service>(), reso
module Cosmos =

open Equinox.Cosmos
let resolve (context,cache) =
let resolve (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let opt = Equinox.ResolveOption.AllowStale
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id,opt)
let createService (context, cache, maxAttempts) =
create (resolve (context,cache)) maxAttempts
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id, opt)
let create (context, cache, maxAttempts) =
create (resolve (context, cache)) maxAttempts
4 changes: 2 additions & 2 deletions equinox-fc/Domain/TicketList.fs
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ module Events =
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
let [<Literal>] categoryId = "TicketList"
let (|AggregateId|) id = Equinox.AggregateId(categoryId, TicketListId.toString id)
let (|For|) id = Equinox.AggregateId(categoryId, TicketListId.toString id)

module Fold =

Expand All @@ -31,7 +31,7 @@ let interpret (allocatorId : AllocatorId, allocated : TicketId list) (state : Fo

type Service internal (log, resolve, maxAttempts) =

let resolve (Events.AggregateId id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)
let resolve (Events.For id) = Equinox.Stream<Events.Event, Fold.State>(log, resolve id, maxAttempts)

member __.Sync(pickListId, allocatorId, assignedTickets) : Async<unit> =
let stream = resolve pickListId
Expand Down

0 comments on commit 02fe311

Please sign in to comment.