diff --git a/CHANGELOG.md b/CHANGELOG.md
index 8e7814e9d..7441d604f 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -9,6 +9,9 @@ The `Unreleased` section name is replaced by the expected version of next releas
 ## [Unreleased]
 
 ### Added
+
+- EventStore source support for `summaryProjector` [#31](https://github.com/jet/dotnet-templates/pull/31)
+
 ### Changed
 ### Removed
 ### Fixed
diff --git a/README.md b/README.md
index 9e1f0a8fa..b72f5c877 100644
--- a/README.md
+++ b/README.md
@@ -19,7 +19,7 @@ This repo hosts the source for Jet's [`dotnet new`](https://docs.microsoft.com/e
 
 ## Templates combining usage of Equinox and Propulsion
 
-- [`summaryProjector`](propulsion-summary-projector/README.md) - Boilerplate for an Azure CosmosDb ChangeFeedProcessor generating versioned [Summary Event](http://verraes.net/2019/05/patterns-for-decoupling-distsys-summary-event/) feed from an `Equinox.Cosmos` store using `Propulsion.Cosmos`.
+- [`summaryProjector`](propulsion-summary-projector/README.md) - Boilerplate for a Projector that can consume from a) an Azure CosmosDb ChangeFeedProcessor b) EventStore, generating a versioned [Summary Event](http://verraes.net/2019/05/patterns-for-decoupling-distsys-summary-event/) feed from an `Equinox.Cosmos`/`.EventStore` store using `Propulsion.Cosmos`/`.EventStore`.
 
 - [`summaryConsumer`](propulsion-summary-consumer/README.md) - Boilerplate for an Apache Kafka Consumer using [`Propulsion.Kafka`](https://github.com/jet/propulsion) to ingest versioned summaries produced by a `dotnet new summaryProjector`)
 
@@ -90,7 +90,7 @@ To use from the command line, the outline is:
 
 ## CONTRIBUTING
 
-Please don't hesitate to [create a GitHub issue](https://github.com/jet/dotnet-templates/issues/new) for any questions so others can benefit from the discussion. For any significant planned changes or additions, please err on the side of [reaching out early](https://github.com/jet/dotnet-templates/issues/new) so we can align expectationss - there's nothing more frustrating than having your hard work not yielding a mutually agreeable result ;)
+Please don't hesitate to [create a GitHub issue](https://github.com/jet/dotnet-templates/issues/new) for any questions so others can benefit from the discussion. For any significant planned changes or additions, please err on the side of [reaching out early](https://github.com/jet/dotnet-templates/issues/new) so we can align expectations - there's nothing more frustrating than having your hard work not yielding a mutually agreeable result ;)
 
 ### Contribution guidelines - `equinox-*` templates
 
diff --git a/propulsion-summary-projector/Program.fs b/propulsion-summary-projector/Program.fs
index 74ab7dc2d..2db61eaf4 100644
--- a/propulsion-summary-projector/Program.fs
+++ b/propulsion-summary-projector/Program.fs
@@ -1,7 +1,9 @@
 module ProjectorTemplate.Program
 
 open Equinox.Cosmos
+open Equinox.EventStore
 open Propulsion.Cosmos
+open Propulsion.EventStore
 open Serilog
 open System
 
@@ -21,6 +23,7 @@ module CmdParser =
         | [] MaxWriters of int
         | [] Verbose
         | [] VerboseConsole
+        | [] SrcEs of ParseResults
         | [] SrcCosmos of ParseResults
         interface IArgParserTemplate with
             member a.Usage =
@@ -31,6 +34,7 @@ module CmdParser =
                 | Verbose -> "request Verbose Logging. Default: off."
                 | VerboseConsole -> "request Verbose Console Logging. Default: off."
                 | SrcCosmos _ -> "specify CosmosDB input parameters."
+                | SrcEs _ -> "specify EventStore input parameters."
     and Arguments(a : ParseResults) =
         member __.ConsumerGroupName = a.GetResult ConsumerGroupName
         member __.Verbose = a.Contains Parameters.Verbose
@@ -38,12 +42,24 @@ module CmdParser =
         member __.MaxReadAhead = a.GetResult(MaxReadAhead,64)
         member __.MaxConcurrentStreams = a.GetResult(MaxWriters,1024)
         member __.StatsInterval = TimeSpan.FromMinutes 1.
-        member val Source : CosmosSourceArguments =
+        member val Source : Choice =
             match a.TryGetSubCommand() with
-            | Some (SrcCosmos cosmos) -> CosmosSourceArguments cosmos
-            | _ -> raise (MissingArg "Must specify one of cosmos for Src")
-        member x.SourceParams() =
-            let srcC = x.Source
+            | Some (SrcEs es) -> Choice1Of2 (EsSourceArguments es)
+            | Some (SrcCosmos cosmos) -> Choice2Of2 (CosmosSourceArguments cosmos)
+            | _ -> raise (MissingArg "Must specify one of cosmos or es for Src")
+        member x.SourceParams() : Choice =
+            match x.Source with
+            | Choice1Of2 srcE ->
+                let startPos,cosmos = srcE.StartPos, srcE.CheckpointStore
+                Log.Information("Processing Consumer Group {groupName} from {startPos} (force: {forceRestart}) in Database {db} Container {container}",
+                    x.ConsumerGroupName, startPos, srcE.ForceRestart, cosmos.Database, cosmos.Container)
+                Log.Information("Ingesting in batches of [{minBatchSize}..{batchSize}], reading up to {maxReadAhead} uncommitted batches ahead",
+                    srcE.MinBatchSize, srcE.StartingBatchSize, x.MaxReadAhead)
+                Choice1Of2 (srcE,cosmos,
+                    { groupName = x.ConsumerGroupName; start = startPos; checkpointInterval = srcE.CheckpointInterval; tailInterval = srcE.TailInterval
+                      forceRestart = srcE.ForceRestart
+                      batchSize = srcE.StartingBatchSize; minBatchSize = srcE.MinBatchSize; gorge = srcE.Gorge; streamReaders = srcE.StreamReaders })
+            | Choice2Of2 srcC ->
             let disco, auxColl =
                 match srcC.LeaseContainer with
                 | None -> srcC.Discovery, { database = srcC.Database; container = srcC.Container + "-aux" }
@@ -53,7 +69,128 @@ module CmdParser =
                 x.ConsumerGroupName, auxColl.database, auxColl.container, srcC.MaxDocuments)
             if srcC.FromTail then Log.Warning("(If new projector group) Skipping projection of all existing events.")
             srcC.LagFrequency |> Option.iter (fun s -> Log.Information("Dumping lag stats at {lagS:n0}s intervals", s.TotalSeconds))
-            srcC,(disco, auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency)
+            Choice2Of2 (srcC,(disco, auxColl, x.ConsumerGroupName, srcC.FromTail, srcC.MaxDocuments, srcC.LagFrequency))
+    and [] EsSourceParameters =
+        | [] FromTail
+        | [] Gorge of int
+        | [] StreamReaders of int
+        | [] Tail of intervalS: float
+        | [] ForceRestart
+        | [] BatchSize of int
+        | [] MinBatchSize of int
+        | [] Position of int64
+        | [] Chunk of int
+        | [] Percent of float
+
+        | [] Verbose
+        | [] Timeout of float
+        | [] Retries of int
+        | [] HeartbeatTimeout of float
+        | [] Host of string
+        | [] Port of int
+        | [] Username of string
+        | [] Password of string
+
+        | [] Cosmos of ParseResults
+        interface IArgParserTemplate with
+            member a.Usage = a |> function
+                | FromTail -> "Start the processing from the Tail"
+                | Gorge _ -> "Request Parallel readers phase during initial catchup, running one chunk (256MB) apart. Default: off"
+                | StreamReaders _ -> "number of concurrent readers that will fetch a missing stream when in tailing mode. Default: 1. TODO: IMPLEMENT!"
+                | Tail _ -> "attempt to read from tail at specified interval in Seconds. Default: 1"
+                | ForceRestart -> "Forget the current committed position; start from (and commit) specified position. Default: start from specified position or resume from committed."
+                | BatchSize _ -> "maximum item count to request from feed. Default: 4096"
+                | MinBatchSize _ -> "minimum item count to drop down to in reaction to read failures. Default: 512"
+                | Position _ -> "EventStore $all Stream Position to commence from"
+                | Chunk _ -> "EventStore $all Chunk to commence from"
+                | Percent _ -> "EventStore $all Stream Position to commence from (as a percentage of current tail position)"
+
+                | Verbose -> "Include low level Store logging."
+                | Host _ -> "specify a DNS query, using Gossip-driven discovery against all A records returned. Default: envvar:EQUINOX_ES_HOST."
+                | Port _ -> "specify a custom port. Defaults: envvar:EQUINOX_ES_PORT, 30778."
+                | Username _ -> "specify a username. Default: envvar:EQUINOX_ES_USERNAME."
+                | Password _ -> "specify a Password. Default: envvar:EQUINOX_ES_PASSWORD."
+                | Timeout _ -> "specify operation timeout in seconds. Default: 20."
+                | Retries _ -> "specify operation retries. Default: 3."
+                | HeartbeatTimeout _ -> "specify heartbeat timeout in seconds. Default: 1.5."
+
+                | Cosmos _ -> "CosmosDb Checkpoint Store parameters."
+    and EsSourceArguments(a : ParseResults) =
+        member __.Gorge = a.TryGetResult Gorge
+        member __.StreamReaders = a.GetResult(StreamReaders,1)
+        member __.TailInterval = a.GetResult(Tail,1.) |> TimeSpan.FromSeconds
+        member __.ForceRestart = a.Contains ForceRestart
+        member __.StartingBatchSize = a.GetResult(BatchSize,4096)
+        member __.MinBatchSize = a.GetResult(MinBatchSize,512)
+        member __.StartPos =
+            match a.TryGetResult Position, a.TryGetResult Chunk, a.TryGetResult Percent, a.Contains EsSourceParameters.FromTail with
+            | Some p, _, _, _ -> Absolute p
+            | _, Some c, _, _ -> StartPos.Chunk c
+            | _, _, Some p, _ -> Percentage p
+            | None, None, None, true -> StartPos.TailOrCheckpoint
+            | None, None, None, _ -> StartPos.StartOrCheckpoint
+
+        member __.Discovery = match __.Port with Some p -> Discovery.GossipDnsCustomPort (__.Host, p) | None -> Discovery.GossipDns __.Host
+        member __.Host = match a.TryGetResult Host with Some x -> x | None -> envBackstop "Host" "EQUINOX_ES_HOST"
+        member __.Port = match a.TryGetResult Port with Some x -> Some x | None -> Environment.GetEnvironmentVariable "EQUINOX_ES_PORT" |> Option.ofObj |> Option.map int
+        member __.User = match a.TryGetResult Username with Some x -> x | None -> envBackstop "Username" "EQUINOX_ES_USERNAME"
+        member __.Password = match a.TryGetResult Password with Some x -> x | None -> envBackstop "Password" "EQUINOX_ES_PASSWORD"
+        member __.Timeout = a.GetResult(EsSourceParameters.Timeout,20.) |> TimeSpan.FromSeconds
+        member __.Retries = a.GetResult(EsSourceParameters.Retries,3)
+        member __.Heartbeat = a.GetResult(HeartbeatTimeout,1.5) |> TimeSpan.FromSeconds
+        member __.Connect(log: ILogger, storeLog: ILogger, connectionStrategy) =
+            let s (x : TimeSpan) = x.TotalSeconds
+            log.Information("EventStore {host} heartbeat: {heartbeat}s Timeout: {timeout}s Retries {retries}", __.Host, s __.Heartbeat, s __.Timeout, __.Retries)
+            let log=if storeLog.IsEnabled Serilog.Events.LogEventLevel.Debug then Logger.SerilogVerbose storeLog else Logger.SerilogNormal storeLog
+            let tags=["M", Environment.MachineName; "I", Guid.NewGuid() |> string]
+            Connector(__.User, __.Password, __.Timeout, __.Retries, log=log, heartbeatTimeout=__.Heartbeat, tags=tags)
+                .Establish("SyncTemplate", __.Discovery, connectionStrategy) |> Async.RunSynchronously
+
+        member __.CheckpointInterval = TimeSpan.FromHours 1.
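+        // Checkpoints for an EventStore source are maintained in the Cosmos store supplied via the nested `cosmos` sub-command below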
+        member val CheckpointStore : CosmosArguments =
+            match a.TryGetSubCommand() with
+            | Some (EsSourceParameters.Cosmos cosmos) -> CosmosArguments cosmos
+            | _ -> raise (MissingArg "Must specify `cosmos` checkpoint store when source is `es`")
+    and [] CosmosParameters =
+        | [] Connection of string
+        | [] ConnectionMode of ConnectionMode
+        | [] Database of string
+        | [] Container of string
+        | [] Timeout of float
+        | [] Retries of int
+        | [] RetriesWaitTime of int
+        | [] Kafka of ParseResults
+        interface IArgParserTemplate with
+            member a.Usage =
+                match a with
+                | ConnectionMode _ -> "override the connection mode. Default: Direct."
+                | Connection _ -> "specify a connection string for a Cosmos account. Default: envvar:EQUINOX_COSMOS_CONNECTION."
+                | Database _ -> "specify a database name for Cosmos store. Default: envvar:EQUINOX_COSMOS_DATABASE."
+                | Container _ -> "specify a container name for Cosmos store. Default: envvar:EQUINOX_COSMOS_CONTAINER."
+                | Timeout _ -> "specify operation timeout in seconds. Default: 5."
+                | Retries _ -> "specify operation retries. Default: 1."
+                | RetriesWaitTime _ -> "specify max wait-time for retry when being throttled by Cosmos in seconds. Default: 5."
+                | Kafka _ -> "Kafka Sink parameters."
+    and CosmosArguments(a : ParseResults) =
+        member __.Mode = a.GetResult(CosmosParameters.ConnectionMode,Equinox.Cosmos.ConnectionMode.Direct)
+        member __.Connection = match a.TryGetResult CosmosParameters.Connection with Some x -> x | None -> envBackstop "Connection" "EQUINOX_COSMOS_CONNECTION"
+        member __.Database = match a.TryGetResult CosmosParameters.Database with Some x -> x | None -> envBackstop "Database" "EQUINOX_COSMOS_DATABASE"
+        member __.Container = match a.TryGetResult CosmosParameters.Container with Some x -> x | None -> envBackstop "Container" "EQUINOX_COSMOS_CONTAINER"
+        member __.Timeout = a.GetResult(CosmosParameters.Timeout,5.) |> TimeSpan.FromSeconds
+        member __.Retries = a.GetResult(CosmosParameters.Retries, 1)
+        member __.MaxRetryWaitTime = a.GetResult(CosmosParameters.RetriesWaitTime, 5)
+        member x.BuildConnectionDetails() =
+            let (Discovery.UriAndKey (endpointUri,_) as discovery) = Discovery.FromConnectionString x.Connection
+            Log.Information("CosmosDb {mode} {endpointUri} Database {database} Container {container}.",
+                x.Mode, endpointUri, x.Database, x.Container)
+            Log.Information("CosmosDb timeout {timeout}s; Throttling retries {retries}, max wait {maxRetryWaitTime}s",
+                (let t = x.Timeout in t.TotalSeconds), x.Retries, x.MaxRetryWaitTime)
+            let connector = Equinox.Cosmos.Connector(x.Timeout, x.Retries, x.MaxRetryWaitTime, Log.Logger, mode=x.Mode)
+            discovery, { database = x.Database; container = x.Container }, connector
+        member val Sink =
+            match a.TryGetSubCommand() with
+            | Some (CosmosParameters.Kafka kafka) -> KafkaSinkArguments kafka
+            | _ -> raise (MissingArg "Must specify `kafka` arguments")
     and [] CosmosSourceParameters =
         | [] FromTail
         | [] MaxDocuments of int
@@ -150,11 +287,17 @@
 
 let [] appName = "ProjectorTemplate"
 
+module EventStoreContext =
+    let cache = Equinox.EventStore.Caching.Cache(appName, sizeMb = 10)
+    let create connection = Context(connection, BatchingPolicy(maxBatchSize=500))
+
 let build (args : CmdParser.Arguments) =
     let log = Logging.initialize args.Verbose args.VerboseConsole
-    let (srcC,(auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) = args.SourceParams()
+    let src = args.SourceParams()
     let (discovery,cosmos,connector),(broker,topic) =
-        srcC.BuildConnectionDetails(),srcC.Sink.BuildTargetParams()
+        match src with
+        | Choice1Of2 (_srcE,cosmos,_spec) -> cosmos.BuildConnectionDetails(),cosmos.Sink.BuildTargetParams()
+        | Choice2Of2 (srcC,_srcSpec) -> srcC.BuildConnectionDetails(),srcC.Sink.BuildTargetParams()
     let producer = Propulsion.Kafka.Producer(Log.Logger, appName, broker, topic)
     let produceSummary (x : Propulsion.Codec.NewtonsoftJson.RenderedSummary) =
         producer.ProduceAsync(x.s, Propulsion.Codec.NewtonsoftJson.Serdes.Serialize x)
@@ -163,19 +306,47 @@ let build (args : CmdParser.Arguments) =
     let connection = connector.Connect(appName, discovery) |> Async.RunSynchronously
     let context = Equinox.Cosmos.Context(connection, cosmos.database, cosmos.container)
 
-    let service = Todo.Repository.createService cache context
-    let handle = Handler.handleCosmosStreamEvents (service,produceSummary)
-
-    let sink =
-        Propulsion.Streams.Sync.StreamsSync.Start(
-            log, args.MaxReadAhead, args.MaxConcurrentStreams, handle, category,
-            statsInterval=TimeSpan.FromMinutes 1., dumpExternalStats=producer.DumpStats)
-    let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq =
-        docs |> Seq.collect EquinoxCosmosParser.enumStreamEvents
-    let createObserver () = CosmosSource.CreateObserver(log, sink.StartIngester, mapToStreamItems)
-    sink,CosmosSource.Run(log, discovery, connector.ClientOptions, cosmos,
-        aux, leaseId, startFromTail, createObserver,
-        ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency, auxDiscovery=auxDiscovery)
+    match src with
+    | Choice1Of2 (srcE,_cosmos,spec) ->
+        let resolveCheckpointStream =
+            let codec = FsCodec.NewtonsoftJson.Codec.Create()
+            let caching = Equinox.Cosmos.CachingStrategy.SlidingWindow (cache, TimeSpan.FromMinutes 20.)
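+            // Adapt Checkpoint.Folds.transmute (which yields a single unfold) to the unfold-list shape RollingUnfolds expects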
+            let transmute' e s = let e,u = Checkpoint.Folds.transmute e s in e,List.singleton u // TODO fix at source
+            let access = Equinox.Cosmos.AccessStrategy.RollingUnfolds (Checkpoint.Folds.isOrigin, transmute')
+            Equinox.Cosmos.Resolver(context, codec, Checkpoint.Folds.fold, Checkpoint.Folds.initial, caching, access).Resolve
+        let checkpoints = Checkpoint.CheckpointSeries(spec.groupName, log.ForContext(), resolveCheckpointStream)
+        let service =
+            let connection = srcE.Connect(log, log, ConnectionStrategy.ClusterSingle NodePreference.PreferSlave)
+            let context = EventStoreContext.create connection
+            Todo.EventStoreRepository.createService EventStoreContext.cache context
+        let handle = Handler.handleEventStoreStreamEvents (service,produceSummary)
+
+        let sink =
+            Propulsion.Streams.Sync.StreamsSync.Start(
+                log, args.MaxReadAhead, args.MaxConcurrentStreams, handle, category,
+                statsInterval=TimeSpan.FromMinutes 1., dumpExternalStats=producer.DumpStats)
+        let connect () = let c = srcE.Connect(log, log, ConnectionStrategy.ClusterSingle NodePreference.PreferSlave) in c.ReadConnection
+        let tryMapEvent (x : EventStore.ClientAPI.ResolvedEvent) =
+            match x.Event with
+            | e when not e.IsJson || e.EventStreamId.StartsWith "$" -> None
+            | PropulsionStreamEvent e -> Some e
+        sink,EventStoreSource.Run(
+            log, sink, checkpoints, connect, spec, category, tryMapEvent,
+            args.MaxReadAhead, args.StatsInterval)
+    | Choice2Of2 (_srcC, (auxDiscovery, aux, leaseId, startFromTail, maxDocuments, lagFrequency)) ->
+        let service = Todo.CosmosRepository.createService cache context
+        let handle = Handler.handleCosmosStreamEvents (service,produceSummary)
+
+        let sink =
+            Propulsion.Streams.Sync.StreamsSync.Start(
+                log, args.MaxReadAhead, args.MaxConcurrentStreams, handle, category,
+                statsInterval=TimeSpan.FromMinutes 1., dumpExternalStats=producer.DumpStats)
+        let mapToStreamItems (docs : Microsoft.Azure.Documents.Document seq) : Propulsion.Streams.StreamEvent<_> seq =
+            docs |> Seq.collect EquinoxCosmosParser.enumStreamEvents
+        let createObserver () = CosmosSource.CreateObserver(log, sink.StartIngester, mapToStreamItems)
+        sink,CosmosSource.Run(log, discovery, connector.ClientOptions, cosmos,
+            aux, leaseId, startFromTail, createObserver,
+            ?maxDocuments=maxDocuments, ?lagReportFreq=lagFrequency, auxDiscovery=auxDiscovery)
 
 /// Handles command line parsing and running the program loop
 // NOTE Any custom logic should go in main
diff --git a/propulsion-summary-projector/README.md b/propulsion-summary-projector/README.md
index fe2b59304..66595c696 100644
--- a/propulsion-summary-projector/README.md
+++ b/propulsion-summary-projector/README.md
@@ -41,4 +41,22 @@ This project was generated using:
 
         # (assuming you've scaled up enough to have >1 range, you can run a second instance in a second console with the same arguments)
 
-2. To create a Consumer, use `dotnet new summaryConsumer` (see README therein for details)
\ No newline at end of file
+2. To run an instance of the Projector from EventStore
+
+        # (either add environment variables as per step 0, or use -g/-u/-p after the `es` token and -s/-d/-c after the `cosmos` argument token to specify them)
+
+        $env:EQUINOX_ES_USERNAME="admin" # or use -u
+        $env:EQUINOX_ES_PASSWORD="changeit" # or use -p
+        $env:EQUINOX_ES_HOST="localhost" # or use -g
+
+        $env:PROPULSION_KAFKA_BROKER="instance.kafka.mysite.com:9092" # or use -b
+
+        # `default` defines the Projector Group identity - each group name gets its own checkpoint state in the checkpoint store
+        # `es` specifies the source (if you have specified 3x EQUINOX_ES_* environment vars, no arguments are needed)
+        # `cosmos` specifies the destination and the checkpoint store (if you have specified 3x EQUINOX_COSMOS_* environment vars, no arguments are needed)
+        # `-t topic0` identifies the Kafka topic to which the Projector should write
+        dotnet run -- default es cosmos kafka -t topic0
+
+        # NB running more than one projector will cause them to duel, and is hence not advised
+
+3. To create a Consumer, use `dotnet new summaryConsumer` (see README therein for details)
\ No newline at end of file
diff --git a/propulsion-summary-projector/SummaryProjector.fsproj b/propulsion-summary-projector/SummaryProjector.fsproj
index c95db0d9a..5efac9111 100644
--- a/propulsion-summary-projector/SummaryProjector.fsproj
+++ b/propulsion-summary-projector/SummaryProjector.fsproj
@@ -20,6 +20,7 @@
+
diff --git a/propulsion-summary-projector/Todo.fs b/propulsion-summary-projector/Todo.fs
index dcb388f42..3847cd77e 100644
--- a/propulsion-summary-projector/Todo.fs
+++ b/propulsion-summary-projector/Todo.fs
@@ -62,11 +62,19 @@ type Service(log, resolve, ?maxAttempts) =
 
 let private createService resolve = Service(Serilog.Log.ForContext(), resolve)
 
-module Repository =
+module CosmosRepository =
 
     open Equinox.Cosmos // Everything until now is independent of a concrete store
     let private resolve cache context =
         let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
         let accessStrategy = AccessStrategy.Snapshot (Folds.isOrigin,Folds.snapshot)
         Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy, accessStrategy).Resolve
+    let createService cache context = resolve cache context |> createService
+
+module EventStoreRepository =
+
+    open Equinox.EventStore // Everything until now is independent of a concrete store
+    let private resolve cache context =
+        let cacheStrategy = CachingStrategy.SlidingWindow (cache, System.TimeSpan.FromMinutes 20.)
+        Resolver(context, Events.codec, Folds.fold, Folds.initial, cacheStrategy).Resolve
     let createService cache context = resolve cache context |> createService
\ No newline at end of file