Helper naming consistency

bartelink committed Oct 4, 2019
1 parent 87e62f4 commit b6f0edd
Showing 18 changed files with 65 additions and 57 deletions.
4 changes: 2 additions & 2 deletions equinox-testbed/Infrastructure.fs
@@ -16,10 +16,10 @@ module Guid =
/// ClientId strongly typed id; represented internally as a Guid; not used for storage so rendering is not significant
type ClientId = Guid<clientId>
and [<Measure>] clientId
-module ClientId = let toStringN (value : ClientId) : string = Guid.toStringN %value
+module ClientId = let toString (value : ClientId) : string = Guid.toStringN %value

/// SkuId strongly typed id; represented internally as a Guid
// NB Perf is suboptimal as a key, see Equinox's samples/Store for expanded version
type SkuId = Guid<skuId>
and [<Measure>] skuId
-module SkuId = let toStringN (value : SkuId) : string = Guid.toStringN %value
+module SkuId = let toString (value : SkuId) : string = Guid.toStringN %value
5 changes: 4 additions & 1 deletion equinox-testbed/Services.fs
@@ -7,6 +7,7 @@ module Domain =

// NB - these schemas reflect the actual storage formats and hence need to be versioned with care
module Events =
+
type Favorited = { date: System.DateTimeOffset; skuId: SkuId }
type Unfavorited = { skuId: SkuId }
module Compaction =
@@ -20,6 +21,7 @@ module Domain =
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

module Folds =
+
type State = Events.Favorited []

type private InternalState(input: State) =
@@ -61,7 +63,8 @@ module Domain =
[ Events.Unfavorited { skuId = skuId } ]

type Service(log, resolveStream, ?maxAttempts) =
-let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toStringN id)
+
+let (|AggregateId|) (id: ClientId) = Equinox.AggregateId("Favorites", ClientId.toString id)
let (|Stream|) (AggregateId id) = Equinox.Stream(log, resolveStream id, defaultArg maxAttempts 2)
let execute (Stream stream) command : Async<unit> =
stream.Transact(Commands.interpret command)
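Aside: the `(|Stream|)` definition composes two single-case active patterns - writing `(AggregateId id)` in the parameter position runs the first pattern on the incoming `ClientId` before the body executes. A self-contained toy version of the mechanism (illustrative names, not the template's):

```fsharp
// Toy illustration of composing single-case active patterns
let (|Trimmed|) (s : string) = s.Trim()
// (Trimmed t) in the parameter position applies the first pattern to the
// argument, so Length only ever sees trimmed input
let (|Length|) (Trimmed t) = t.Length
let describe (Length n) = sprintf "%d significant chars" n
printfn "%s" (describe "  abc  ")   // prints "3 significant chars"
```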
8 changes: 5 additions & 3 deletions equinox-web/Domain/Aggregate.fs
@@ -2,13 +2,14 @@

// NB - these types and names reflect the actual storage formats and hence need to be versioned with care
module Events =
-type Compacted = { happened: bool }
+
+type CompactedData = { happened: bool }

type Event =
| Happened
-| Compacted of Compacted
+| Compacted of CompactedData
interface TypeShape.UnionContract.IUnionContract
-let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()
+let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>(rejectNullaryCases=false)

module Folds =

@@ -33,6 +34,7 @@ module Commands =
type View = { sorted : bool }

type Service(handlerLog, resolve, ?maxAttempts) =
+
let (|AggregateId|) (id: string) = Equinox.AggregateId("Aggregate", id)
let (|Stream|) (AggregateId id) = Equinox.Stream(handlerLog, resolve id, maxAttempts = defaultArg maxAttempts 2)

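Aside on the codec change: `Happened` is a bodiless (nullary) union case, and the added `rejectNullaryCases=false` presumably opts out of the codec's guard against such cases. A hedged sketch of the contract shape, mirroring the lines above (assumes the FsCodec.NewtonsoftJson and TypeShape packages):

```fsharp
// The union case names become the event-type discriminators on the wire;
// bodies are named, serializable records rather than anonymous ones
type CompactedData = { happened : bool }

type Event =
    | Happened                    // nullary case - no event body
    | Compacted of CompactedData
    interface TypeShape.UnionContract.IUnionContract

let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>(rejectNullaryCases = false)
```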
2 changes: 1 addition & 1 deletion equinox-web/Domain/Infrastructure.fs
@@ -10,4 +10,4 @@ module Guid =
type ClientId = Guid<clientId>
and [<Measure>] clientId
module ClientId =
-let toStringN (value : ClientId) : string = Guid.toStringN %value
+let toString (value : ClientId) : string = Guid.toStringN %value
3 changes: 2 additions & 1 deletion equinox-web/Domain/Todo.fs
@@ -72,8 +72,9 @@ type View = { id: int; order: int; title: string; completed: bool }

/// Defines operations that a Controller can perform on a Todo List
type Service(handlerLog, resolve, ?maxAttempts) =
+
/// Maps a ClientId to the AggregateId that specifies the Stream in which the data for that client will be held
-let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId("Todos", ClientId.toStringN clientId)
+let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId("Todos", ClientId.toString clientId)

/// Maps a ClientId to Handler for the relevant stream
let (|Stream|) (AggregateId id) = Equinox.Stream(handlerLog, resolve id, maxAttempts = defaultArg maxAttempts 2)
2 changes: 1 addition & 1 deletion propulsion-summary-consumer/Infrastructure.fs
@@ -32,5 +32,5 @@ module Guid =
type ClientId = Guid<clientId>
and [<Measure>] clientId
module ClientId =
-let toStringN (value : ClientId) : string = Guid.toStringN %value
+let toString (value : ClientId) : string = Guid.toStringN %value
let parse (value : string) : ClientId = let raw = Guid.Parse value in % raw
propulsion-summary-consumer/SumaryIngester.fs → propulsion-summary-consumer/Ingester.fs
@@ -1,6 +1,6 @@
/// Follows a feed of updates, holding the most recently observed one; each update received is intended to completely supersede all previous updates
/// Due to this, we should ensure that writes only happen where the update is not redundant and/or a replay of a previous message
-module ConsumerTemplate.SummaryIngester
+module ConsumerTemplate.Ingester

open System

@@ -31,7 +31,7 @@ type Outcome = NoRelevantEvents of count : int | Ok of count : int | Skipped of
type Stats(log, ?statsInterval, ?stateInterval) =
inherit Propulsion.Kafka.StreamsConsumerStats<Outcome>(log, defaultArg statsInterval (TimeSpan.FromMinutes 1.), defaultArg stateInterval (TimeSpan.FromMinutes 5.))

-let mutable (ok, na, redundant) = 0, 0, 0
+let mutable ok, na, redundant = 0, 0, 0

override __.HandleOk res = res |> function
| Outcome.Ok count -> ok <- ok + 1; redundant <- redundant + count - 1
@@ -51,7 +51,7 @@ let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log
{ items =
[| for x in x.items ->
{ id = x.id; order = x.order; title = x.title; completed = x.completed } |]}
-let (|ClientId|) (value : string) = ClientId.parse value
+let (|ClientId|) = ClientId.parse
let (|DecodeNewest|_|) (codec : FsCodec.IUnionEncoder<_,_>) (stream, span : Propulsion.Streams.StreamSpan<_>) : 'summary option =
span.events |> Seq.rev |> Seq.tryPick (StreamCodec.tryDecode codec log stream)
let ingestIncomingSummaryMessage (stream, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
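The `(|ClientId|)` rewrite above is a point-free respelling; both forms declare the same single-case, parse-on-match pattern. A small sketch (reusing the `ClientId` module from this template's Infrastructure.fs):

```fsharp
// Equivalent spellings of a parse-on-match active pattern
let (|ClientIdExplicit|) (value : string) = ClientId.parse value
let (|ClientIdPointFree|) = ClientId.parse
// Either allows a handler to accept the raw string and bind the parsed id
let handle (ClientIdPointFree clientId) = ClientId.toString clientId
```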
2 changes: 1 addition & 1 deletion propulsion-summary-consumer/Program.fs
@@ -112,7 +112,7 @@ let start (args : CmdParser.Arguments) =
Jet.ConfluentKafka.FSharp.KafkaConsumerConfig.Create(
appName, args.Broker, [args.Topic], args.Group,
maxInFlightBytes = args.MaxInFlightBytes, ?statisticsInterval = args.LagFrequency)
-SummaryIngester.startConsumer config Log.Logger service args.MaxDop
+Ingester.startConsumer config Log.Logger service args.MaxDop

/// Handles command line parsing and running the program loop
// NOTE Any custom logic should go in main
2 changes: 1 addition & 1 deletion propulsion-summary-consumer/SummaryConsumer.fsproj
@@ -10,7 +10,7 @@
<None Include="README.md" />
<Compile Include="Infrastructure.fs" />
<Compile Include="TodoSummary.fs" />
-<Compile Include="SumaryIngester.fs" />
+<Compile Include="Ingester.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

14 changes: 8 additions & 6 deletions propulsion-summary-consumer/TodoSummary.fs
@@ -5,9 +5,9 @@ module Events =

type ItemData = { id: int; order: int; title: string; completed: bool }
type SummaryData = { items : ItemData[] }
-
+type IngestedData = { version : int64; value : SummaryData }
type Event =
-| Ingested of {| version: int64; value : SummaryData |}
+| Ingested of IngestedData
interface TypeShape.UnionContract.IUnionContract
let codec = FsCodec.NewtonsoftJson.Codec.Create<Event>()

@@ -36,13 +36,13 @@ module Folds =

module Commands =
type Command =
-| Consume of version: int64 * Events.SummaryData
+| Consume of version : int64 * value : Events.SummaryData

let decide command (state : Folds.State) =
match command with
| Consume (version,value) ->
if state.version <= version then false,[] else
-true,[Events.Ingested {| version = version; value = value |}]
+true,[Events.Ingested { version = version; value = value }]

type Item = { id: int; order: int; title: string; completed: bool }
let render : Folds.State -> Item[] = function
@@ -54,11 +54,12 @@ let render : Folds.State -> Item[] = function
completed = x.completed } |]
| _ -> [||]

-let [<Literal>]categoryId = "TodoSummary"
+let [<Literal>] categoryId = "TodoSummary"

/// Defines the operations that the Read side of a Controller and/or the Ingester can perform on the 'aggregate'
type Service(log, resolve, ?maxAttempts) =
-let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toStringN clientId)
+
+let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toString clientId)
let (|Stream|) (AggregateId id) = Equinox.Stream<Events.Event,Folds.State>(log, resolve id, maxAttempts = defaultArg maxAttempts 2)

let execute (Stream stream) command : Async<bool> =
@@ -74,6 +75,7 @@ type Service(log, resolve, ?maxAttempts) =
query clientId render

module Repository =
+
open Equinox.Cosmos // Everything until now is independent of a concrete store
let private resolve cache context =
// We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
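Aside: replacing the anonymous record (`{| version; value |}`) with the named `IngestedData` gives the event body a nominal type that can be declared and versioned alongside the contract. A quick sketch of the distinction, with toy fields:

```fsharp
// Named record: a stable, declared contract type
type IngestedData = { version : int64; value : string }
let named = { version = 1L; value = "payload" }
// Anonymous record: structural, spelled out inline at every use site
let anon = {| version = 1L; value = "payload" |}
printfn "%d %d" named.version anon.version
```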
1 change: 0 additions & 1 deletion propulsion-summary-projector/Infrastructure.fs
@@ -27,7 +27,6 @@ module StreamNameParser =
| [| category; id |] -> Category (category, id)
| _ -> Unknown streamName

-
module Guid =
let inline toStringN (x : Guid) = x.ToString "N"

2 changes: 1 addition & 1 deletion propulsion-summary-projector/Producer.fs
@@ -23,7 +23,7 @@ module Contract =
let ofState (state : Todo.Folds.State) : SummaryEvent =
Summary { items = [| for x in state.items -> render x |]}

-let (|ClientId|) (value : string) = ClientId.parse value
+let (|ClientId|) = ClientId.parse

let (|Decode|) (codec : FsCodec.IUnionEncoder<_,_>) stream (span : Propulsion.Streams.StreamSpan<_>) =
span.events |> Seq.choose (StreamCodec.tryDecodeSpan codec Serilog.Log.Logger stream)
4 changes: 3 additions & 1 deletion propulsion-summary-projector/Todo.fs
@@ -41,10 +41,11 @@ module Folds =
/// Allows us to skip producing summaries for events that we know won't result in an externally discernable change to the summary output
let impliesStateChange = function Events.Snapshot _ -> false | _ -> true

-let [<Literal>]categoryId = "Todos"
+let [<Literal>] categoryId = "Todos"

/// Defines operations that a Controller or Projector can perform on a Todo List
type Service(log, resolve, ?maxAttempts) =
+
/// Maps a ClientId to the AggregateId that specifies the Stream in which the data for that client will be held
let (|AggregateId|) (clientId: ClientId) = Equinox.AggregateId(categoryId, ClientId.toString clientId)

@@ -60,6 +61,7 @@ type Service(log, resolve, ?maxAttempts) =
queryEx clientId render

module Repository =
+
open Equinox.Cosmos // Everything until now is independent of a concrete store
let private resolve cache context =
let accessStrategy = AccessStrategy.Snapshot (Folds.isOrigin,Folds.snapshot)
18 changes: 16 additions & 2 deletions propulsion-tracking-consumer/Infrastructure.fs
@@ -15,8 +15,22 @@ module StreamCodec =
None
| x -> x

-module Guid =
-let inline toStringN (x : Guid) = x.ToString "N"
+// TODO use one included in Propulsion.Kafka.Core
+/// StreamsConsumer buffers and deduplicates messages from a contiguous stream with each message bearing an index.
+/// The messages we consume don't have such characteristics, so we generate a fake `index` by keeping an int per stream in a dictionary
+type StreamKeyEventSequencer() =
+// we synthesize a monotonically increasing index to render the deduplication facility inert
+let indices = System.Collections.Generic.Dictionary()
+let genIndex streamName =
+match indices.TryGetValue streamName with
+| true, v -> let x = v + 1 in indices.[streamName] <- x; int64 x
+| false, _ -> let x = 0 in indices.[streamName] <- x; int64 x
+
+// Stuff the full content of the message into an Event record - we'll parse it when it comes out the other end in a span
+member __.ToStreamEvent(KeyValue (k,v : string), ?eventType) : Propulsion.Streams.StreamEvent<byte[]> seq =
+let eventType = defaultArg eventType String.Empty
+let e = FsCodec.Core.IndexedEventData(genIndex k,false,eventType,System.Text.Encoding.UTF8.GetBytes v,null,DateTimeOffset.UtcNow)
+Seq.singleton { stream=k; event=e }

/// SkuId strongly typed id; represented internally as a string
type SkuId = string<skuId>
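For orientation, a hedged usage sketch of `StreamKeyEventSequencer` (the `stream`/`event` field names come from the construction above; the `Index` property on the resulting event is assumed from `FsCodec.Core.IndexedEventData`):

```fsharp
// Two messages on the same key come out with indexes 0L then 1L, so the
// downstream index-based deduplication never discards either of them
let sequencer = StreamKeyEventSequencer()
let msg n = System.Collections.Generic.KeyValuePair("sku-1", sprintf """{"qty":%d}""" n)
for m in [ msg 1; msg 2 ] do
    for se in sequencer.ToStreamEvent m do
        printfn "%s -> index %d" se.stream se.event.Index
```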
propulsion-tracking-consumer/SkuIngester.fs → propulsion-tracking-consumer/Ingester.fs
@@ -1,12 +1,12 @@
/// Follows a feed of messages representing items being added/updated on an aggregate that maintains a list of child items
-/// Compared to the SummaryIngester in the `summaryProjector` template, each event is potentially relevant
-module ConsumerTemplate.SkuIngester
+/// Compared to the Ingester in the `summaryProjector` template, each event is potentially relevant
+module ConsumerTemplate.Ingester

open ConsumerTemplate.SkuSummary.Events
open System

/// Defines the shape of input messages on the topic we're consuming
-module SkuUpdates =
+module Contract =

type OrderInfo = { poNumber : string; reservedUnitQuantity : int }
type Message =
@@ -36,29 +36,14 @@ type Stats(log, ?statsInterval, ?stateInterval) =
log.Information(" Used {ok} Ignored {skipped}", ok, skipped)
ok <- 0; skipped <- 0

-/// StreamsConsumer buffers and deduplicates messages from a contiguous stream with each message bearing an index.
-/// The messages we consume don't have such characteristics, so we generate a fake `index` by keeping an int per stream in a dictionary
-type MessagesByArrivalOrder() =
-// we synthesize a monotonically increasing index to render the deduplication facility inert
-let indices = System.Collections.Generic.Dictionary()
-let genIndex streamName =
-match indices.TryGetValue streamName with
-| true, v -> let x = v + 1 in indices.[streamName] <- x; int64 x
-| false, _ -> let x = 0 in indices.[streamName] <- x; int64 x
-
-// Stuff the full content of the message into an Event record - we'll parse it when it comes out the other end in a span
-member __.ToStreamEvent (KeyValue (k,v : string)) : Propulsion.Streams.StreamEvent<byte[]> seq =
-let e = FsCodec.Core.IndexedEventData(genIndex k,false,String.Empty,System.Text.Encoding.UTF8.GetBytes v,null,DateTimeOffset.UtcNow)
-Seq.singleton { stream=k; event=e }
-
-let (|SkuId|) (value : string) = SkuId.parse value
+let (|SkuId|) = SkuId.parse

/// Starts a processing loop accumulating messages by stream - each time we handle all the incoming updates for a given Sku as a single transaction
let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log : Serilog.ILogger) (service : SkuSummary.Service) maxDop =
let ingestIncomingSummaryMessage(SkuId skuId, span : Propulsion.Streams.StreamSpan<_>) : Async<Outcome> = async {
let items =
[ for e in span.events do
-let x = SkuUpdates.parse e.Data
+let x = Contract.parse e.Data
for o in x.purchaseOrderInfo do
yield { locationId = x.locationId
messageIndex = x.messageIndex
@@ -69,7 +54,7 @@ let startConsumer (config : Jet.ConfluentKafka.FSharp.KafkaConsumerConfig) (log
return Outcome.Completed(used,List.length items)
}
let stats = Stats(log)
-// No categorization required, out inputs are all one big family defying categorization
+// No categorization required, our inputs are all one big family defying categorization
let category _streamName = "Sku"
-let sequencer = MessagesByArrivalOrder()
+let sequencer = StreamKeyEventSequencer()
Propulsion.Kafka.StreamsConsumer.Start(log, config, sequencer.ToStreamEvent, ingestIncomingSummaryMessage, maxDop, stats, category)
2 changes: 1 addition & 1 deletion propulsion-tracking-consumer/Program.fs
@@ -112,7 +112,7 @@ let start (args : CmdParser.Arguments) =
Jet.ConfluentKafka.FSharp.KafkaConsumerConfig.Create(
appName, args.Broker, [args.Topic], args.Group,
maxInFlightBytes = args.MaxInFlightBytes, ?statisticsInterval = args.LagFrequency)
-SkuIngester.startConsumer config Log.Logger service args.MaxDop
+Ingester.startConsumer config Log.Logger service args.MaxDop

/// Handles command line parsing and running the program loop
// NOTE Any custom logic should go in main
16 changes: 8 additions & 8 deletions propulsion-tracking-consumer/SkuSummary.fs
@@ -17,9 +17,7 @@ module Events =

module Folds =

-open Events
-
-type State = ItemData list
+type State = Events.ItemData list
module State =
let equals (x : Events.ItemData) (y : Events.ItemData) =
x.locationId = y.locationId
@@ -36,10 +34,10 @@ module Folds =
| Events.Snapshotted _ -> true // Yes, a snapshot is enough info
| Events.Ingested _ -> false
let evolve state = function
-| Ingested e -> e :: state
-| Snapshotted items -> List.ofArray items
-let fold (state : State) : Event seq -> State = Seq.fold evolve state
-let snapshot (x : State) : Event = Snapshotted (Array.ofList x)
+| Events.Ingested e -> e :: state
+| Events.Snapshotted items -> List.ofArray items
+let fold (state : State) : Events.Event seq -> State = Seq.fold evolve state
+let snapshot (x : State) : Events.Event = Events.Snapshotted (Array.ofList x)

module Commands =

@@ -51,9 +49,10 @@ module Commands =
| Consume updates ->
[for x in updates do if x |> Folds.State.isNewOrUpdated state then yield Events.Ingested x]

-let [<Literal>]categoryId = "SkuSummary"
+let [<Literal>] categoryId = "SkuSummary"

type Service(log, resolve, ?maxAttempts) =
+
let (|AggregateId|) (id : SkuId) = Equinox.AggregateId(categoryId, SkuId.toString id)
let (|Stream|) (AggregateId id) = Equinox.Stream<Events.Event,Folds.State>(log, resolve id, maxAttempts = defaultArg maxAttempts 2)

@@ -74,6 +73,7 @@ type Service(log, resolve, ?maxAttempts) =
query skuId id

module Repository =
+
open Equinox.Cosmos // Everything until now is independent of a concrete store
let private resolve cache context =
// We don't want to write any events, so here we supply the `transmute` function to teach it how to treat our events as snapshots
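As a recap of the evolve/fold shape this hunk re-qualifies, a toy, self-contained version (not the template's own types):

```fsharp
// Toy illustration of the evolve/fold pattern used across these aggregates
type ToyEvent = Added of int | Cleared
type ToyState = int list
let evolve (state : ToyState) = function
    | Added n -> n :: state
    | Cleared -> []
let fold (state : ToyState) : ToyEvent seq -> ToyState = Seq.fold evolve state
printfn "%A" (fold [] [ Added 1; Added 2; Cleared; Added 3 ])   // [3]
```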
2 changes: 1 addition & 1 deletion propulsion-tracking-consumer/TrackingConsumer.fsproj
@@ -10,7 +10,7 @@
<None Include="README.md" />
<Compile Include="Infrastructure.fs" />
<Compile Include="SkuSummary.fs" />
-<Compile Include="SkuIngester.fs" />
+<Compile Include="Ingester.fs" />
<Compile Include="Program.fs" />
</ItemGroup>

