Skip to content

Commit

Permalink
Stragglers
Browse files Browse the repository at this point in the history
  • Loading branch information
bartelink committed Mar 10, 2020
1 parent ba7c315 commit e73d6f3
Show file tree
Hide file tree
Showing 6 changed files with 21 additions and 21 deletions.
8 changes: 4 additions & 4 deletions equinox-fc/Domain.Tests/LocationTests.fs
Original file line number Diff line number Diff line change
Expand Up @@ -15,16 +15,16 @@ module Location =

module Series =

let resolve store = Resolver(store, Series.Events.codec, Series.Fold.fold, Series.Fold.initial).Resolve
let resolver store = Resolver(store, Series.Events.codec, Series.Fold.fold, Series.Fold.initial).Resolve

module Epoch =

let resolve store = Resolver(store, Epoch.Events.codec, Epoch.Fold.fold, Epoch.Fold.initial).Resolve
let resolver store = Resolver(store, Epoch.Events.codec, Epoch.Fold.fold, Epoch.Fold.initial).Resolve

let createService (zeroBalance, shouldClose) store =
let maxAttempts = Int32.MaxValue
let series = Series.create (Series.resolve store) maxAttempts
let epochs = Epoch.create (Epoch.resolve store) maxAttempts
let series = Series.create (Series.resolver store) maxAttempts
let epochs = Epoch.create (Epoch.resolver store) maxAttempts
create (zeroBalance, shouldClose) (series, epochs)

let run (service : Location.Service) (IdsAtLeastOne locations, deltas : _[], transactionId) = Async.RunSynchronously <| async {
Expand Down
10 changes: 5 additions & 5 deletions equinox-fc/Domain/InventoryEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,15 @@ type Service internal (resolve : InventoryId * InventoryEpochId -> Equinox.Strea
stream.Transact(decideSync capacity events)

let create resolver =
let resolve locationId =
let stream = resolver (streamName locationId)
let resolve ids =
let stream = resolver (streamName ids)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = 2)
Service (resolve)
Service(resolve)

module Cosmos =

let accessStrategy = Equinox.Cosmos.AccessStrategy.Snapshot (Fold.isOrigin, Fold.snapshot)
let resolve (context, cache) =
let resolver (context, cache) =
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
let create (context, cache) = create (resolve (context, cache))
let create (context, cache) = create (resolver (context, cache))
4 changes: 2 additions & 2 deletions equinox-fc/Domain/InventorySeries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -49,12 +49,12 @@ let create resolver =
module Cosmos =

let accessStrategy = Equinox.Cosmos.AccessStrategy.LatestKnownEvent
let resolve (context, cache) =
let resolver (context, cache) =
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
// For this stream, we uniformly use stale reads as:
// a) we don't require any information from competing writers
// b) while there are competing writers [which might cause us to have to retry a Transact], this should be infrequent
let opt = Equinox.ResolveOption.AllowStale
fun id -> Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
let create (context, cache) =
create (resolve (context, cache))
create (resolver (context, cache))
4 changes: 2 additions & 2 deletions equinox-fc/Domain/InventoryTransaction.fs
Original file line number Diff line number Diff line change
Expand Up @@ -148,10 +148,10 @@ module Cosmos =
let accessStrategy = Equinox.Cosmos.AccessStrategy.Unoptimized
// ... and there will generally be a single actor touching it at a given time, so we don't need to do a load (which would be more expensive than normal given the `accessStrategy`) before we sync
let opt = Equinox.AllowStale
let resolve (context, cache) =
let resolver (context, cache) =
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
fun id -> Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve(id, opt)
let createService (context, cache) = create (resolve (context, cache))
let createService (context, cache) = create (resolver (context, cache))

/// Handles requirement to infer when a transaction is 'stuck'
/// Note we don't want to couple to the state in a deep manner; thus we track:
Expand Down
8 changes: 4 additions & 4 deletions equinox-fc/Domain/LocationEpoch.fs
Original file line number Diff line number Diff line change
Expand Up @@ -134,16 +134,16 @@ type Service internal (resolve : LocationId * LocationEpochId -> Equinox.Stream<
stream.Transact(sync prevEpochBalanceCarriedForward decide shouldClose)

let create resolver maxAttempts =
let resolve locId =
let stream = resolver (streamName locId)
let resolve locationId =
let stream = resolver (streamName locationId)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = maxAttempts)
Service (resolve)

module Cosmos =

let accessStrategy = Equinox.Cosmos.AccessStrategy.Unoptimized
let resolve (context, cache) =
let resolver (context, cache) =
let cacheStrategy = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Equinox.Cosmos.Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, accessStrategy).Resolve
let create (context, cache, maxAttempts) =
create (resolve (context, cache)) maxAttempts
create (resolver (context, cache)) maxAttempts
8 changes: 4 additions & 4 deletions equinox-fc/Domain/LocationSeries.fs
Original file line number Diff line number Diff line change
Expand Up @@ -38,18 +38,18 @@ type Service internal (resolve : LocationId -> Equinox.Stream<Events.Event, Fold
stream.Transact(interpretAdvanceIngestionEpoch epochId)

let create resolver maxAttempts =
let resolve locId =
let stream = resolver (streamName locId)
let resolve locationId =
let stream = resolver (streamName locationId)
Equinox.Stream(Serilog.Log.ForContext<Service>(), stream, maxAttempts = maxAttempts)
Service (resolve)

module Cosmos =

open Equinox.Cosmos

let resolve (context, cache) =
let resolver (context, cache) =
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
let opt = Equinox.ResolveOption.AllowStale
fun id -> Resolver(context, Events.codec, Fold.fold, Fold.initial, cacheStrategy, AccessStrategy.LatestKnownEvent).Resolve(id, opt)
let createService (context, cache, maxAttempts) =
create (resolve (context, cache)) maxAttempts
create (resolver (context, cache)) maxAttempts

0 comments on commit e73d6f3

Please sign in to comment.