Commit: Helper naming consistency
bartelink committed Oct 4, 2019
1 parent 87e62f4 commit 7f509c6
Showing 24 changed files with 96 additions and 88 deletions.
4 changes: 2 additions & 2 deletions equinox-testbed/Infrastructure.fs
@@ -16,10 +16,10 @@ module Guid =
/// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant
type ClientId = Guid<clientId>
and [<Measure>] clientId
module ClientId = let toStringN (value : ClientId) : string = Guid.toStringN %value
module ClientId = let toString (value : ClientId) : string = Guid.toStringN %value

/// SkuId strongly typed id; represented internally as a Guid
// NB Perf is suboptimal as a key, see Equinox's samples/Store for expanded version
type SkuId = Guid<skuId>
and [<Measure>] skuId
module SkuId = let toStringN (value : SkuId) : string = Guid.toStringN %value
module SkuId = let toString (value : SkuId) : string = Guid.toStringN %value
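
Aside (not part of the diff): the rename is purely cosmetic, but for orientation here is a minimal, self-contained sketch of the strongly typed id pattern these helpers belong to — assuming FSharp.UMX supplies the measure-annotated Guid<'t> and the % untag operator (the template's open statements sit outside this hunk):

open System
open FSharp.UMX

module Guid =
    let inline toStringN (g : Guid) = g.ToString "N" // "N" = 32 hex digits, no hyphens

type ClientId = Guid<clientId>
and [<Measure>] clientId
module ClientId =
    // renamed from toStringN: callers only need "give me the string form of this id"
    let toString (value : ClientId) : string = Guid.toStringN %value
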
5 changes: 4 additions & 1 deletion equinox-testbed/Services.fs
@@ -7,6 +7,7 @@ module Domain =

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module Events =

type Favorited = { date: System.DateTimeOffset; skuId: SkuId }
type Unfavorited = { skuId: SkuId }
module Compaction =
@@ -20,6 +21,7 @@ module Domain =
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Folds =

type State = Events.Favorited []

type private InternalState(input: State) =
@@ -61,7 +63,8 @@ module Domain =
[ Events.Unfavorited { skuId = skuId } ]

type Service(log, resolveStream, ?maxAttempts) =
let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toStringN id)

let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toString id)
let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, defaultArg maxAttempts 2)
let execute (Stream stream) command : Async<unit> =
stream.Transact(Commands.interpret command)
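
Aside (not part of the diff): the `(|Stream|)` line above nests one single-case active pattern in the argument position of another, so a ClientId passed to `execute` has already been mapped to a resolved Equinox.Stream by the time the body runs. A tiny self-contained illustration of that chaining, using made-up pattern names:

let (|Upper|) (s : string) = s.ToUpperInvariant()
let (|Greeting|) (Upper u) = sprintf "Hello, %s!" u   // applies (|Upper|) to its input first
let greet (Greeting g) : string = g
// greet "world" evaluates to "Hello, WORLD!"
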
8 changes: 5 additions & 3 deletions equinox-web/Domain/Aggregate.fs
@@ -2,13 +2,14 @@

// NB - these types and names reflect the actual storage formats and hence need to be versioned with care
module Events =
type Compacted = { happened: bool }

type CompactedData = { happened: bool }

type Event =
| Happened
| Compacted of Compacted
| Compacted of CompactedData
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>(rejectNullaryCases=false)

module Folds =

@@ -33,6 +34,7 @@ module Commands =
type View = { sorted : bool }

type Service(handlerLog, resolve, ?maxAttempts) =

let (|AggregateId|) (id: string) = Equinox.AggregateId("Aggregate", id)
let (|Stream|) (AggregateId id) = Equinox.Stream(handlerLog, resolve id, maxAttempts = defaultArg maxAttempts 2)

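
Aside (not part of the diff): two things are going on above. Renaming the record to `CompactedData` stops the union case `Compacted` and the record it carries from sharing a name. The extra codec argument relates to `Happened` having no body; my reading of the parameter name — hedged, check FsCodec for the authoritative semantics — is that `rejectNullaryCases=false` allows such bodiless cases to be encoded rather than refused:

type CompactedData = { happened: bool }
type Event =
    | Happened                    // nullary case: no event body to serialize
    | Compacted of CompactedData
    interface TypeShape.UnionContract.IUnionContract
// as in the diff: false = accept nullary cases such as Happened
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>(rejectNullaryCases = false)
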
2 changes: 1 addition & 1 deletion equinox-web/Domain/Infrastructure.fs
@@ -10,4 +10,4 @@ module Guid =
type ClientId = Guid<clientId>
and [<Measure>] clientId
module ClientId =
let toStringN (value : ClientId) : string = Guid.toStringN %value
let toString (value : ClientId) : string = Guid.toStringN %value
3 changes: 2 additions & 1 deletion equinox-web/Domain/Todo.fs
@@ -72,8 +72,9 @@ type View = { id: int; order: int; title: string; completed: bool }

/// Defines operations that a Controller can perform on a Todo List
type Service(handlerLog, resolve, ?maxAttempts) =

/// Maps a ClientId to the AggregateId that specifies the Stream in which the data for that client will be held
let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId("Todos", ClientId.toStringN clientId)
let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId("Todos", ClientId.toString clientId)

/// Maps a ClientId to Handler for the relevant stream
let (|Stream|) (AggregateId id) = Equinox.Stream(handlerLog, resolve id, maxAttempts = defaultArg maxAttempts 2)
12 changes: 6 additions & 6 deletions propulsion-consumer/Examples.fs
@@ -57,7 +57,7 @@ module MultiStreams =
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
let tryDecode = StreamCodec.tryDecode codec
let [<Literal>] CategoryId = "SavedForLater"
let [<Literal>] categoryId = "SavedForLater"

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module Favorites =
@@ -70,7 +70,7 @@ module MultiStreams =
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
let tryDecode = StreamCodec.tryDecode codec
let [<Literal>] CategoryId = "Favorites"
let [<Literal>] categoryId = "Favorites"

type Stat = Faves of int | Saves of int | OtherCategory of string * int | OtherMessage of string

@@ -85,10 +85,10 @@ module MultiStreams =
let (|FavoritesEvents|SavedForLaterEvents|OtherCategory|UnknownMessage|) (streamName, span : Propulsion.Streams.StreamSpan<byte[]>) =
let decode tryDecode = span.events |> Seq.choose (tryDecode log streamName) |> Array.ofSeq
match category streamName with
| Category (Favorites.CategoryId, id) ->
| Category (Favorites.categoryId, id) ->
let s = match faves.TryGetValue id with true, value -> value | false, _ -> new HashSet<SkuId>()
FavoritesEvents (id, s, decode Favorites.tryDecode)
| Category (SavedForLater.CategoryId, id) ->
| Category (SavedForLater.categoryId, id) ->
let s = match saves.TryGetValue id with true, value -> value | false, _ -> []
SavedForLaterEvents (id, s, decode SavedForLater.tryDecode)
| Category (categoryName, _) -> OtherCategory (categoryName, Seq.length span.events)
@@ -185,8 +185,8 @@ module MultiMessages =
let span = Propulsion.Codec.NewtonsoftJson.RenderedSpan.Parse spanJson
let decode tryDecode wrap = RenderedSpan.enum span |> Seq.choose (fun x -> x.event |> tryDecode log streamName |> Option.map wrap)
match streamName with
| Category (Favorites.CategoryId,_) -> yield! decode Favorites.tryDecode Fave
| Category (SavedForLater.CategoryId,_) -> yield! decode SavedForLater.tryDecode Save
| Category (Favorites.categoryId,_) -> yield! decode Favorites.tryDecode Fave
| Category (SavedForLater.categoryId,_) -> yield! decode SavedForLater.tryDecode Save
| Category (otherCategoryName,_) -> yield OtherCat (otherCategoryName, Seq.length span.e)
| _ -> yield Unclassified streamName }

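
Aside (not part of the diff): the `Category`/`Unknown` active pattern matched throughout this file is defined in the template's infrastructure module (only fragments of a sibling template's StreamNameParser appear later in this diff). A reconstruction sketch, assuming stream names follow the usual Equinox "{category}-{id}" shape — the real module may differ in detail:

open System

let (|Category|Unknown|) (streamName : string) =
    match streamName.Split([|'-'|], 2, StringSplitOptions.RemoveEmptyEntries) with
    | [| category; id |] -> Category (category, id)
    | _ -> Unknown streamName
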
4 changes: 2 additions & 2 deletions propulsion-consumer/Program.fs
@@ -73,8 +73,8 @@ let start (args : CmdParser.Arguments) =

/// Handles command line parsing and running the program loop
// NOTE Any custom logic should go in main
let run argv =
try use consumer = argv |> CmdParser.parse |> start
let run args =
try use consumer = args |> CmdParser.parse |> start
consumer.AwaitCompletion() |> Async.RunSynchronously
if consumer.RanToCompletion then 0 else 2
with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
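
Aside (not part of the diff): `run` is written to be invoked from `main`, which the template deliberately keeps free for customisation (per the NOTE above). A hypothetical minimal entry point — the exit code and logging choices here are illustrative only:

[<EntryPoint>]
let main argv =
    try run argv                    // run handles Argu parse errors itself and returns 0/1/2
    with e -> eprintfn "%O" e; 1    // anything else: report and fail
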
4 changes: 2 additions & 2 deletions propulsion-consumer/Publisher.fs
@@ -36,7 +36,7 @@ module Input =

let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
let tryDecode = StreamCodec.tryDecode codec
let [<Literal>] CategoryId = "Inventory"
let [<Literal>] categoryId = "Inventory"

module Output =

@@ -113,7 +113,7 @@ module Processor =

let private enumStreamEvents(KeyValue (streamName : string, spanJson)) : seq<Propulsion.Streams.StreamEvent<_>> =
match streamName with
| Category (Input.CategoryId,_) -> Propulsion.Codec.NewtonsoftJson.RenderedSpan.parse spanJson
| Category (Input.categoryId,_) -> Propulsion.Codec.NewtonsoftJson.RenderedSpan.parse spanJson
| _ -> Seq.empty

let log = Log.ForContext<Handler>()
4 changes: 2 additions & 2 deletions propulsion-projector/Program.fs
@@ -213,8 +213,8 @@ let build (args : CmdParser.Arguments) =

/// Handles command line parsing and running the program loop
// NOTE Any custom logic should go in main
let run argv =
try let sink,runSourcePipeline = argv |> CmdParser.parse |> build
let run args =
try let sink,runSourcePipeline = args |> CmdParser.parse |> build
runSourcePipeline |> Async.Start
sink.AwaitCompletion() |> Async.RunSynchronously
if sink.RanToCompletion then 0 else 2
2 changes: 1 addition & 1 deletion propulsion-summary-consumer/Infrastructure.fs
@@ -32,5 +32,5 @@ module Guid =
type ClientId = Guid<clientId>
and [<Measure>] clientId
module ClientId =
let toStringN (value : ClientId) : string = Guid.toStringN %value
let toString (value : ClientId) : string = Guid.toStringN %value
let parse (value : string) : ClientId = let raw = Guid.Parse value in % raw
propulsion-summary-consumer/SumaryIngester.fs → propulsion-summary-consumer/Ingester.fs (renamed)
@@ -1,6 +1,6 @@
/// Follows a feed of updates, holding the most recently observed one; each update received is intended to completely supersede all previous updates
/// Due to this, we should ensure that writes only happen where the update is not redundant and/or a replay of a previous message
module ConsumerTemplate.SummaryIngester
module ConsumerTemplate.Ingester

open System

@@ -31,7 +31,7 @@ type Outcome = NoRelevantEvents of count : int | Ok of count : int | Skipped of
type Stats(log, ?statsInterval, ?stateInterval) =
inherit Propulsion.Kafka.StreamsConsumerStats<Outcome>(log, defaultArg statsInterval (TimeSpan.FromMinutes 1.), defaultArg stateInterval (TimeSpan.FromMinutes 5.))

let mutable (ok, na, redundant) = 0, 0, 0
let mutable ok, na, redundant = 0, 0, 0

override __.HandleOk res = res |> function
| Outcome.Ok count -> ok <- ok + 1; redundant <- redundant + count - 1
@@ -51,7 +51,7 @@ let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log
{ items =
[| for x in x.items ->
{ id = x.id; order = x.order; title = x.title; completed = x.completed } |]}
let (|ClientId|) (value : string) = ClientId.parse value
let (|ClientId|) = ClientId.parse
let (|DecodeNewest|_|) (codec : FsCodec.IUnionEncoder<_,_>) (stream, span : Propulsion.Streams.StreamSpan<_>) : 'summary option =
span.events |> Seq.rev |> Seq.tryPick (StreamCodec.tryDecode codec log stream)
let ingestIncomingSummaryMessage (stream, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
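
Aside (not part of the diff): the `(|ClientId|)` change above simply drops a redundant lambda — a single-case active pattern is an ordinary function value, so it can be bound point-free. A tiny self-contained illustration:

let (|Length|) = String.length                  // point-free, like (|ClientId|) = ClientId.parse
let describe (Length n) = sprintf "%d chars" n
// describe "propulsion" evaluates to "10 chars"
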
6 changes: 3 additions & 3 deletions propulsion-summary-consumer/Program.fs
@@ -112,12 +112,12 @@ let start (args : CmdParser.Arguments) =
Jet.ConfluentKafka.FSharp.KafkaConsumerConfig.Create(
appName, args.Broker, [args.Topic], args.Group,
maxInFlightBytes = args.MaxInFlightBytes, ?statisticsInterval = args.LagFrequency)
SummaryIngester.startConsumer config Log.Logger service args.MaxDop
Ingester.startConsumer config Log.Logger service args.MaxDop

/// Handles command line parsing and running the program loop
// NOTE Any custom logic should go in main
let run argv =
try use consumer = argv |> CmdParser.parse |> start
let run args =
try use consumer = args |> CmdParser.parse |> start
consumer.AwaitCompletion() |> Async.RunSynchronously
if consumer.RanToCompletion then 0 else 2
with :? Argu.ArguParseException as e -> eprintfn "%s" e.Message; 1
2 changes: 1 addition & 1 deletion propulsion-summary-consumer/SummaryConsumer.fsproj
@@ -10,7 +10,7 @@
<None Include="README.md" />
<Compile Include="Infrastructure.fs" />
<Compile Include="TodoSummary.fs" />
<Compile Include="SumaryIngester.fs" />
<Compile Include="Ingester.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

24 changes: 13 additions & 11 deletions propulsion-summary-consumer/TodoSummary.fs
@@ -5,9 +5,9 @@ module Events =

type ItemData = { id: int; order: int; title: string; completed: bool }
type SummaryData = { items : ItemData[] }

type IngestedData = { version : int64; value : SummaryData }
type Event =
| Ingested of {| version: int64; value : SummaryData |}
| Ingested of IngestedData
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

@@ -18,7 +18,7 @@ module Folds =
let evolve _state = function
| Events.Ingested e -> { version = e.version; value = Some e.value }
let fold (state : State) : Events.Event seq -> State = Seq.fold evolve state
let isOrigin = function _ -> true
let private isOrigin = function Events.Ingested _ -> true
// A `transmute` function gets presented with:
// a) events a command decided to generate (in its `interpret`)
// b) the state after applying them
@@ -31,18 +31,20 @@ module Folds =
// and use AccessStrategy.RollingUnfolds with this `transmute` function so we instead convey:
// a) "don't actually write these events we just decided on in `interpret` [and don't insert a new event batch document]"
// b) "can you treat these events as snapshots please"
let transmute events _state : Events.Event list * Events.Event list =
let private transmute events _state : Events.Event list * Events.Event list =
[],events
// We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
let accessStrategy = Equinox.Cosmos.AccessStrategy.RollingUnfolds (isOrigin,transmute)

module Commands =
type Command =
| Consume of version: int64 * Events.SummaryData
| Consume of version : int64 * value : Events.SummaryData

let decide command (state : Folds.State) =
match command with
| Consume (version,value) ->
if state.version <= version then false,[] else
true,[Events.Ingested {| version = version; value = value |}]
true,[Events.Ingested { version = version; value = value }]

type Item = { id: int; order: int; title: string; completed: bool }
let render : Folds.State -> Item[] = function
@@ -54,11 +56,12 @@ let render : Folds.State -> Item[] = function
completed = x.completed } |]
| _ -> [||]

let [<Literal>]categoryId = "TodoSummary"
let [<Literal>] categoryId = "TodoSummary"

/// Defines the operations that the Read side of a Controller and/or the Ingester can perform on the 'aggregate'
type Service(log, resolve, ?maxAttempts) =
let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toStringN clientId)

let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toString clientId)
let (|Stream|) (AggregateId id) = Equinox.Stream<Events.Event,Folds.State>(log, resolve id, maxAttempts = defaultArg maxAttempts 2)

let execute (Stream stream) command : Async<bool> =
@@ -74,10 +77,9 @@ type Service(log, resolve, ?maxAttempts) =
query clientId render

module Repository =

open Equinox.Cosmos // Everything until now is independent of a concrete store
let private resolve cache context =
// We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
let accessStrategy = AccessStrategy.RollingUnfolds (Folds.isOrigin, Folds.transmute)
let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, accessStrategy).Resolve
Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, Folds.accessStrategy).Resolve
let createService cache context = Service(Serilog.Log.ForContext<Service>(), resolve cache context)
1 change: 0 additions & 1 deletion propulsion-summary-projector/Infrastructure.fs
@@ -27,7 +27,6 @@ module StreamNameParser =
| [| category; id |] -> Category (category, id)
| _ -> Unknown streamName


module Guid =
let inline toStringN (x : Guid) = x.ToString "N"

2 changes: 1 addition & 1 deletion propulsion-summary-projector/Producer.fs
@@ -23,7 +23,7 @@ module Contract =
let ofState (state : Todo.Folds.State) : SummaryEvent =
Summary { items = [| for x in state.items -> render x |]}

let (|ClientId|) (value : string) = ClientId.parse value
let (|ClientId|) = ClientId.parse

let (|Decode|) (codec : FsCodec.IUnionEncoder<_,_>) stream (span : Propulsion.Streams.StreamSpan<_>) =
span.events |> Seq.choose (StreamCodec.tryDecodeSpan codec Serilog.Log.Logger stream)
4 changes: 2 additions & 2 deletions propulsion-summary-projector/Program.fs
@@ -177,8 +177,8 @@ let build (args : CmdParser.Arguments) =

/// Handles command line parsing and running the program loop
// NOTE Any custom logic should go in main
let run argv =
try let projector,runSourcePipeline = argv |> CmdParser.parse |> build
let run args =
try let projector,runSourcePipeline = args |> CmdParser.parse |> build
runSourcePipeline |> Async.Start
projector.AwaitCompletion() |> Async.RunSynchronously
if projector.RanToCompletion then 0 else 2
16 changes: 9 additions & 7 deletions propulsion-summary-projector/Todo.fs
@@ -14,7 +14,7 @@ module Events =
| Cleared of {| nextId: int |}
/// For EventStore, AccessStrategy.RollingSnapshots embeds these events every `batchSize` events
/// For Cosmos, AccessStrategy.Snapshot maintains this as an event in the `u`nfolds list in the Tip-document
| Snapshot of {| nextId: int; items: ItemData[] |}
| Snapshotted of {| nextId: int; items: ItemData[] |}
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

@@ -31,20 +31,21 @@ module Folds =
| Events.Updated value -> { s with items = s.items |> List.map (function { id = id } when id = value.id -> value | item -> item) }
| Events.Deleted e -> { s with items = s.items |> List.filter (fun x -> x.id <> e.id) }
| Events.Cleared e -> { nextId = e.nextId; items = [] }
| Events.Snapshot s -> { nextId = s.nextId; items = List.ofArray s.items }
| Events.Snapshotted s -> { nextId = s.nextId; items = List.ofArray s.items }
/// Folds a set of events from the store into a given `state`
let fold (state : State) : Events.Event seq -> State = Seq.fold evolve state
/// Determines whether a given event represents a checkpoint that implies we don't need to see any preceding events
let isOrigin = function Events.Cleared _ | Events.Snapshot _ -> true | _ -> false
let isOrigin = function Events.Cleared _ | Events.Snapshotted _ -> true | _ -> false
/// Prepares an Event that encodes all relevant aspects of a State such that `evolve` can rehydrate a complete State from it
let snapshot state = Events.Snapshot {| nextId = state.nextId; items = Array.ofList state.items |}
/// Allows us to slkip producing summaries for events that we know won't result in an externally discernable change to the summary output
let impliesStateChange = function Events.Snapshot _ -> false | _ -> true
let snapshot state = Events.Snapshotted {| nextId = state.nextId; items = Array.ofList state.items |}
/// Allows us to skip producing summaries for events that we know won't result in an externally discernable change to the summary output
let impliesStateChange = function Events.Snapshotted _ -> false | _ -> true

let [<Literal>]categoryId = "Todos"
let [<Literal>] categoryId = "Todos"

/// Defines operations that a Controller or Projector can perform on a Todo List
type Service(log, resolve, ?maxAttempts) =

/// Maps a ClientId to the AggregateId that specifies the Stream in which the data for that client will be held
let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toString clientId)

@@ -60,6 +61,7 @@ type Service(log, resolve, ?maxAttempts) =
queryEx clientId render

module Repository =

open Equinox.Cosmos // Everything until now is independent of a concrete store
let private resolve cache context =
let accessStrategy = AccessStrategy.Snapshot (Folds.isOrigin,Folds.snapshot)
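
Aside (not part of the diff): a hypothetical consumer of `Folds.impliesStateChange`, matching the intent stated in its doc comment — only produce a summary when at least one decoded event implies an externally visible change (the function name and wiring here are illustrative):

let worthSummarising (events : Events.Event seq) : bool =
    events |> Seq.exists Folds.impliesStateChange
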
4 changes: 2 additions & 2 deletions propulsion-sync/Program.fs
@@ -575,8 +575,8 @@ let build (args : CmdParser.Arguments) =

/// Handles command line parsing and running the program loop
// NOTE Any custom logic should go in main
let run argv =
try let sink,runSourcePipeline = CmdParser.parse argv |> build
let run args =
try let sink,runSourcePipeline = CmdParser.parse args |> build
runSourcePipeline |> Async.Start
sink.AwaitCompletion() |> Async.RunSynchronously
if sink.RanToCompletion then 0 else 2
(additional changed files not shown)
