proProjector: Add ES checkpoint storage mode #81

Open · wants to merge 3 commits into base: master
3 changes: 2 additions & 1 deletion CHANGELOG.md
@@ -10,7 +10,8 @@ The `Unreleased` section name is replaced by the expected version of next release

### Added

- `eqxProjector --source cosmos --kafka --synthesizeSequence`: Sample code for custom parsing of document changes [#84](https://github.com/jet/dotnet-templates/pull/84)
- `eqxProjector`: Added `--source cosmos --kafka --synthesizeSequence`: Sample code for custom parsing of document changes [#84](https://github.com/jet/dotnet-templates/pull/84)
- `proProjector`: Added `es` checkpoint storage mode [#81](https://github.com/jet/dotnet-templates/pull/81)

### Changed
### Removed
114 changes: 74 additions & 40 deletions propulsion-projector/Program.fs
@@ -42,10 +42,10 @@ module Args =
let private defaultWithEnvVar varName argName = function
| None -> getEnvVarForArgumentOrThrow varName argName
| Some x -> x
#if esdb
//#if esdb
let private isEnvVarTrue varName =
EnvVar.tryGet varName |> Option.exists (fun s -> String.Equals(s, bool.TrueString, StringComparison.OrdinalIgnoreCase))
#endif
//#endif
let private seconds (x : TimeSpan) = x.TotalSeconds
open Argu
#if cosmos
@@ -105,7 +105,7 @@ module Args =
let connector = Connector(x.Timeout, x.Retries, x.MaxRetryWaitTime, Log.Logger, mode=x.Mode)
discovery, { database = x.Database; container = x.Container }, connector
#endif
#if esdb
//#if esdb
open Equinox.EventStore
open Propulsion.EventStore
type [<NoEquality; NoComparison>] EsSourceParameters =
@@ -130,6 +130,7 @@
| [<AltCommandLine "-p">] Password of string

| [<CliPrefix(CliPrefix.None); Unique(*ExactlyOnce is not supported*); Last>] Cosmos of ParseResults<CosmosParameters>
| [<CliPrefix(CliPrefix.None); Unique(*ExactlyOnce is not supported*); Last>] Es
interface IArgParserTemplate with
member a.Usage = a |> function
| FromTail -> "Start the processing from the Tail"
@@ -153,6 +154,7 @@
| HeartbeatTimeout _ -> "specify heartbeat timeout in seconds. Default: 1.5."

| Cosmos _ -> "CosmosDB (Checkpoint) Store parameters."
| Es -> "Request storage of checkpoints in EventStore (not recommended due to feedback effects)."
and EsSourceArguments(a : ParseResults<EsSourceParameters>) =
let discovery (host, port, tcp) =
match tcp, port with
@@ -192,10 +194,11 @@
.Connect(appName, discovery, nodePreference) |> Async.RunSynchronously

member __.CheckpointInterval = TimeSpan.FromHours 1.
member val Cosmos : CosmosArguments =
member val Checkpoints : Choice<CosmosArguments, unit> =
match a.TryGetSubCommand() with
| Some (EsSourceParameters.Cosmos cosmos) -> CosmosArguments cosmos
| _ -> raise (MissingArg "Must specify `cosmos` checkpoint store when source is `es`")
| Some (EsSourceParameters.Cosmos cosmos) -> Choice1Of2 (CosmosArguments cosmos)
| Some (EsSourceParameters.Es) -> Choice2Of2 ()
| _ -> raise (MissingArg "Must specify `cosmos` or `es` checkpoint store when source is `es`")
and [<NoEquality; NoComparison>] CosmosParameters =
| [<AltCommandLine "-s">] Connection of string
| [<AltCommandLine "-m">] ConnectionMode of Equinox.Cosmos.ConnectionMode
@@ -230,8 +233,8 @@ module Args =
seconds x.Timeout, x.Retries, seconds x.MaxRetryWaitTime)
let connector = Equinox.Cosmos.Connector(x.Timeout, x.Retries, x.MaxRetryWaitTime, Log.Logger, mode=x.Mode)
discovery, x.Database, x.Container, connector
#endif
//#if sss
//#endif
#if sss
// TOCONSIDER: add DB connectors other than MsSql
type [<NoEquality; NoComparison>] SqlStreamStoreSourceParameters =
| [<AltCommandLine "-t"; Unique>] Tail of intervalS: float
@@ -272,7 +275,7 @@ module Args =
let sssConnectionString = String.Join(";", conn, creds)
Log.Information("SqlStreamStore MsSql Connection {connectionString} Schema {schema} AutoCreate {autoCreate}", conn, schema, autoCreate)
Equinox.SqlStreamStore.MsSql.Connector(sssConnectionString, schema, autoCreate=autoCreate).Connect() |> Async.RunSynchronously
//#endif
#endif // sss

[<NoEquality; NoComparison>]
type Parameters =
@@ -288,12 +291,12 @@
#if cosmos
| [<CliPrefix(CliPrefix.None); Last>] Cosmos of ParseResults<CosmosParameters>
#endif
#if esdb
//#if esdb
| [<CliPrefix(CliPrefix.None); Last>] Es of ParseResults<EsSourceParameters>
#endif
//#if sss
| [<CliPrefix(CliPrefix.None); AltCommandLine "ms"; Last>] SqlMs of ParseResults<SqlStreamStoreSourceParameters>
//#endif
#if sss
| [<CliPrefix(CliPrefix.None); AltCommandLine "ms"; Last>] SqlMs of ParseResults<SqlStreamStoreSourceParameters>
#endif
interface IArgParserTemplate with
member a.Usage =
match a with
@@ -308,12 +311,12 @@
#if cosmos
| Cosmos _ -> "specify CosmosDb input parameters"
#endif
#if esdb
//#if esdb
| Es _ -> "specify EventStore input parameters."
#endif
//#if sss
| SqlMs _ -> "specify SqlStreamStore input parameters."
//#endif
#if sss
| SqlMs _ -> "specify SqlStreamStore input parameters."
#endif
and Arguments(a : ParseResults<Parameters>) =
member __.Verbose = a.Contains Verbose
member __.ConsumerGroupName = a.GetResult ConsumerGroupName
@@ -336,21 +339,29 @@ module Args =
c.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds))
{ database = c.Database; container = c.AuxContainerName }, __.ConsumerGroupName, c.FromTail, c.MaxDocuments, c.LagFrequency
#endif
#if esdb
//#if esdb
member val Es = EsSourceArguments(a.GetResult Es)
member __.BuildEventStoreParams() =
let srcE = __.Es
let startPos, cosmos = srcE.StartPos, srcE.Cosmos
let es = __.Es
Log.Information("Processing Consumer Group {groupName} from {startPos} (force: {forceRestart}) in Database {db} Container {container}",
__.ConsumerGroupName, startPos, srcE.ForceRestart, cosmos.Database, cosmos.Container)
__.ConsumerGroupName, es.StartPos, es.ForceRestart)
Log.Information("Ingesting in batches of [{minBatchSize}..{batchSize}], reading up to {maxReadAhead} uncommitted batches ahead",
srcE.MinBatchSize, srcE.StartingBatchSize, __.MaxReadAhead)
srcE, cosmos,
{ groupName = __.ConsumerGroupName; start = startPos; checkpointInterval = srcE.CheckpointInterval; tailInterval = srcE.TailInterval
forceRestart = srcE.ForceRestart
batchSize = srcE.StartingBatchSize; minBatchSize = srcE.MinBatchSize; gorge = srcE.Gorge; streamReaders = 0 }
#endif
//#if sss
es.MinBatchSize, es.StartingBatchSize, __.MaxReadAhead)
es,
{ groupName = __.ConsumerGroupName; start = es.StartPos; checkpointInterval = es.CheckpointInterval; tailInterval = es.TailInterval
forceRestart = es.ForceRestart
batchSize = es.StartingBatchSize; minBatchSize = es.MinBatchSize; gorge = es.Gorge; streamReaders = 0 }
member __.BuildCheckpointStoreParams() =
let es = __.Es
match es.Checkpoints with
| Choice1Of2 cosmos ->
Log.Information("Checkpointing in Database {db} Container {container}", cosmos.Database, cosmos.Container)
Choice1Of2 cosmos
| Choice2Of2 () ->
Log.Information("Checkpointing in source EventStore")
Choice2Of2 ()
//#endif
#if sss
member val SqlStreamStore = SqlStreamStoreSourceArguments(a.GetResult SqlMs)
member __.BuildSqlStreamStoreParams() =
let src = __.SqlStreamStore
@@ -359,7 +370,7 @@ module Args =
maxBatchSize = src.MaxBatchSize
tailSleepInterval = src.TailInterval }
src, spec
//#endif
#endif
//#if kafka
member val Target = TargetInfo a
and TargetInfo(a : ParseResults<Parameters>) =
@@ -380,17 +391,35 @@ module Checkpoints =

open Propulsion.EventStore

let codec = FsCodec.NewtonsoftJson.Codec.Create<Checkpoint.Events.Event>()

// In this implementation, we keep the checkpoints in Cosmos when consuming from EventStore
module Cosmos =

let codec = FsCodec.NewtonsoftJson.Codec.Create<Checkpoint.Events.Event>()
let access = Equinox.Cosmos.AccessStrategy.Custom (Checkpoint.Fold.isOrigin, Checkpoint.Fold.transmute)
let create groupName (context, cache) =
let caching = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let resolver = Equinox.Cosmos.Resolver(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access)
let resolve streamName = resolver.Resolve(streamName, Equinox.AllowStale)
Checkpoint.CheckpointSeries(groupName, resolve)

// Alternately, one can keep checkpoints in EventStore, but that brings two factors into play:
// - the feedback effect of each write of a checkpoint triggering a read and so on will increase database traffic significantly
// - you'll want to limit the maximum retained events for checkpoint streams to 1 (or some other low number)
module EventStore =

let access = Equinox.EventStore.AccessStrategy.LatestKnownEvent
let create groupName (context, cache) =
let caching = Equinox.EventStore.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
let resolver = Equinox.EventStore.Resolver(context, codec, Checkpoint.Fold.fold, Checkpoint.Fold.initial, caching, access)
let resolve streamName = resolver.Resolve(streamName, Equinox.AllowStale)
Checkpoint.CheckpointSeries(groupName, resolve)
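        // Illustrative sketch only (not part of this PR): the note above recommends capping the retained
        // events for checkpoint streams at 1 (or another low number) when checkpointing in EventStore.
        // One way to do that is to set $maxCount via stream metadata through the EventStore.ClientAPI client.
        // The helper name and parameters below are assumptions for illustration; verify the call shapes
        // against the EventStore.ClientAPI version in use.
        let limitCheckpointRetention (conn : EventStore.ClientAPI.IEventStoreConnection) (streamName : string) =
            // Retain only the latest event in the checkpoint stream ($maxCount = 1)
            let metadata = EventStore.ClientAPI.StreamMetadata.Create(maxCount = Nullable 1L)
            conn.SetStreamMetadataAsync(streamName, int64 EventStore.ClientAPI.ExpectedVersion.Any, metadata)
            |> Async.AwaitTask
            |> Async.Ignore // caller runs the resulting Async<unit> once per checkpoint stream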

module EventStoreContext =

let create client =
Equinox.EventStore.Context(Equinox.EventStore.Connection(client), Equinox.EventStore.BatchingPolicy(maxBatchSize=500))

module CosmosContext =

let create appName (connector : Equinox.Cosmos.Connector) discovery (database, container) =
@@ -425,16 +454,21 @@ let build (args : Args.Arguments) =
aux, leaseId, startFromTail, createObserver,
?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency)
#endif // cosmos
#if esdb
let (srcE, cosmos, spec) = args.BuildEventStoreParams()

//#if esdb
let (srcE, spec) = args.BuildEventStoreParams()
let connectEs () = srcE.Connect(Log.Logger, Log.Logger, AppName, Equinox.EventStore.NodePreference.Master)
let (discovery, database, container, connector) = cosmos.BuildConnectionDetails()

let context = CosmosContext.create AppName connector discovery (database, container)
let cache = Equinox.Cache(AppName, sizeMb=10)

let checkpoints = Checkpoints.Cosmos.create spec.groupName (context, cache)
let checkpoints =
match args.BuildCheckpointStoreParams() with
| Choice1Of2 cosmos ->
let (discovery, database, container, connector) = cosmos.BuildConnectionDetails()
let context = CosmosContext.create AppName connector discovery (database, container)
Checkpoints.Cosmos.create spec.groupName (context, cache)
| Choice2Of2 () ->
let esClient = connectEs ()
let context = EventStoreContext.create esClient
Checkpoints.EventStore.create spec.groupName (context, cache)

#if kafka // esdb && kafka
let (broker, topic) = args.Target.BuildTargetParams()
@@ -450,8 +484,8 @@ let build (args : Args.Arguments) =
Propulsion.EventStore.EventStoreSource.Run(
Log.Logger, sink, checkpoints, connectEs, spec, Handler.tryMapEvent filterByStreamName,
args.MaxReadAhead, args.StatsInterval)
#endif // esdb
//#if sss
//#endif // esdb
#if sss
let (srcSql, spec) = args.BuildSqlStreamStoreParams()

let monitored = srcSql.Connect()
@@ -469,7 +503,7 @@ let build (args : Args.Arguments) =
let sink = Propulsion.Streams.StreamsProjector.Start(Log.Logger, maxReadAhead, maxConcurrentStreams, Handler.handle, stats, args.StatsInterval)
#endif // sss && !kafka
let pipeline = Propulsion.SqlStreamStore.SqlStreamStoreSource.Run(Log.Logger, monitored, checkpointer, spec, sink, args.StatsInterval)
//#endif // sss
#endif // sss
sink, pipeline

let run args = async {