From 97bbad39ba78ba3f1f2b10b886e0b6347a7250c0 Mon Sep 17 00:00:00 2001 From: Ruben Bartelink Date: Mon, 12 Dec 2022 17:16:20 +0000 Subject: [PATCH] Freshen/review comments/README --- DOCUMENTATION.md | 2 +- README.md | 170 +++++++++++++++--------------- src/Propulsion.Kafka/Consumers.fs | 6 +- src/Propulsion/Propulsion.fsproj | 1 - 4 files changed, 90 insertions(+), 89 deletions(-) diff --git a/DOCUMENTATION.md b/DOCUMENTATION.md index 97cffa69..86a8a4cd 100755 --- a/DOCUMENTATION.md +++ b/DOCUMENTATION.md @@ -84,7 +84,7 @@ As noted in the [Effect of ChangeFeed on Request Charges](https://github.com/jet Propulsion provides components that enable implementing such a strategy: - A publisher that efficiently publishes events in a canonical format ('Kafka StreamSpans') (see the `proProjector` template), with stateful de-duplication of events (important given the fact that the bulk of appends involve an update to the Tip document, and the current form of the changefeed does not intrinsically expose the before and after states) -- A consumer component that consumes and decodes the 'Kafka StreamSpans' for use by a `StreamsProjector` +- A consumer component that consumes and decodes the 'Kafka StreamSpans' for use by a `StreamsSink` It's important to consider deferring the running projections "over a longer wire" until the last responsible moment given: - making the availability and performance of your Reactions and Publishing contingent on the availability and performance of your Kafka cluster should not be considered lightly (introducing another component alongside the event store intrinsically reduces the achievable SLA of the system as a whole) diff --git a/README.md b/README.md index 4d02f547..c2d8c267 100644 --- a/README.md +++ b/README.md @@ -2,36 +2,36 @@ Propulsion provides a granular suite of .NET NuGet packages for building Reactive event processing pipelines. It caters for: -- **Event Sourcing Reactions**: Handling projections and reactions based on event feeds from stores such as EventStoreDB and the Equinox Stores (DynamoStore, CosmosStore, MemoryStore) +- **Event Sourcing Reactions**: Handling projections and reactions based on event feeds from stores such as EventStoreDB and MessageDB, and the Equinox Stores (DynamoStore, CosmosStore, MemoryStore). - **Generic Ingestion and Publishing pipelines**: The same abstractions can also be used for consuming and/or publishing to any target. - **Serverless event pipelines**: The core components do not assume a long-lived process. - The `DynamoStore`-related components implement support for running an end-to-end event sourced system on Amazon DynamoDB and Lambda. -- **Unit and Integration testing support**: The `AwaitCompletion` mechanisms in `MemoryStore` and `FeedSource` provide a clean way to structure test suites in a manner that achieves good test coverage without flaky tests or slow tests. -- **Strong metrics support**: Feed Sources and Projectors provide comprehensive logging and metrics. At present, the primary integration is with Prometheus. + The `DynamoStore`-related components implement support for running an end-to-end event sourced system using only Amazon DynamoDB and Lambda without requiring a long-lived host process. +- **Unit and Integration testing support**: The `AwaitCompletion` mechanisms in `MemoryStore` and `FeedSource` provide a clean way to structure test suites in a manner that achieves high test coverage without flaky tests or slow tests. 
+- **Strong metrics support**: Feed Sources and Projectors provide comprehensive logging and metrics. (At present, the primary integration is with Prometheus, but the mechanism is exposed in a pluggable manner). If you're looking for a good discussion forum on these kinds of topics, look no further than the [DDD-CQRS-ES Discord](https://github.com/ddd-cqrs-es/community)'s [#equinox channel](https://discord.com/channels/514783899440775168/1002635005429825657) ([invite link](https://discord.gg/sEZGSHNNbH)). ## Core Components -- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion. [Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics`, `Serilog` +- `Propulsion` [![NuGet](https://img.shields.io/nuget/v/Propulsion.svg)](https://www.nuget.org/packages/Propulsion/) Implements core functionality in a channel-independent fashion. [Depends](https://www.fuget.org/packages/Propulsion) on `MathNet.Numerics` - 1. `StreamsProjector`: High performance pipeline that handles parallelized event processing. Ingestion of events, and checkpointing of progress are handled asynchronously. Each aspect of the pipeline is decoupled such that it can be customized as desired, with good testing support. + 1. `StreamsSink`: High performance pipeline that handles parallelized event processing. Ingestion of events, and checkpointing of progress are handled asynchronously. Each aspect of the pipeline is decoupled such that it can be customized as desired. 2. `Streams.Prometheus`: Helper that exposes per-scheduler metrics for Prometheus scraping. - 3. `ParallelProjector`: Scaled down variant of `StreamsProjector` that does not preserve stream level ordering semantics + 3. `ParallelProjector`: Scaled down variant of `StreamsSink` that does not preserve stream level ordering semantics -- `Propulsion.Feed` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Feed.svg)](https://www.nuget.org/packages/Propulsion.Feed/) Provides helpers for streamwise consumption of a feed of information with an arbitrary interface (e.g. a third-party Feed API), including the maintenance of checkpoints referencing such a feed. [Depends](https://www.fuget.org/packages/Propulsion.Feed) on `Propulsion`, a `IFeedCheckpointStore` implementation (from e.g., `Propulsion.CosmosStore` or `Propulsion.DynamoStore`) +- `Propulsion.Feed` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Feed.svg)](https://www.nuget.org/packages/Propulsion.Feed/) Provides helpers for checkpointed consumption of a feed of stream-based inputs. Provides for custom bindings (e.g. a third-party Feed API) or various other input configurations (e.g. periodically correlating with inputs from a non-streamed source such as a SQL Database). Provides a generic API for checkpoint storage, with diverse implementations hosted in the sibling packages associated with each concrete store (supported stores include DynamoStore, CosmosStore, SQL Server, Postgres). [Depends](https://www.fuget.org/packages/Propulsion.Feed) on `Propulsion`, a `IFeedCheckpointStore` implementation (from e.g., `Propulsion.CosmosStore|DynamoStore|MessageDb|SqlStreamStore`) - 1. `FeedSource`: Handles continual reading and checkpointing of events from a set of feeds ('tranches' of a 'source') that collectively represent a change data capture source from a given system (roughly analogous to how a CosmosDB Container presents a changefeed). 
A `readTranches` function is expected to yield a `TrancheId` list; the Feed Source operates a logical reader thread per such tranche. Each individual Tranche is required to be able to represent its content as an incrementally retrievable change feed with a monotonically increasing `Index` per `FsCodec.ITimelineEvent` read from a given tranche. - 2. `Monitor.AwaitCompletion`: Enables efficient waiting for completion of reaction processing within an integration test - 3. `PeriodicSource`: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to isolate the changes since a given checkpoint (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an `IAsyncEnumerable` of `FsCodec.StreamName * FsCodec.IEventData * context`. Checkpointing occurs only when all events have been completely ingested by the Sink. + 1. `FeedSource`: Handles continual reading and checkpointing of events from a set of feeds ('tranches') of a 'source' that collectively represent a change data capture source for a given system (roughly analogous to how a CosmosDB Container presents a changefeed). A `readTranches` function is used to identify the Tranches (sub-feeds) on startup. The Feed Source then operates a logical reader thread per Tranche. Tranches represent content as an incrementally retrievable change feed consisting of batches of `FsCodec.ITimelineEvent` records. Each batch has an optional associated checkpointing callback that's triggered only when the Sink has handled all events within it. + 2. `Monitor.AwaitCompletion`: Enables efficient waiting for completion of reaction processing within an integration test. + 3. `PeriodicSource`: Handles regular crawling of an external datasource (such as a SQL database) where there is no way to save progress and then resume from that saved token (based on either the intrinsic properties of the data, or of the store itself). The source is expected to present its content as an `IAsyncEnumerable` of `FsCodec.StreamName * FsCodec.IEventData * context`. Checkpointing occurs only when all events have been deemed handled by the Sink. 4. `SinglePassFeedSource`: Handles single pass loading of large datasets (such as a SQL database), completing when the full data has been ingested. - 5. `Prometheus`: Exposes reading statistics to Prometheus (including metrics from `DynamoStore.DynamoStoreSource`, `EventStoreDb.EventStoreSource` and `SqlStreamStore.SqlStreamStoreSource`) + 5. `Prometheus`: Exposes reading statistics to Prometheus (including metrics from `DynamoStore.DynamoStoreSource`, `EventStoreDb.EventStoreSource`, `MessageDb.MessageDbSource` and `SqlStreamStore.SqlStreamStoreSource`). (NOTE all other statistics relating to processing throughput and latency etc are exposed from the Scheduler component on the Sink side) - `Propulsion.MemoryStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MemoryStore.svg)](https://www.nuget.org/packages/Propulsion.MemoryStore/). Provides bindings to `Equinox.MemoryStore`. [Depends](https://www.fuget.org/packages/Propulsion.MemoryStore) on `Equinox.MemoryStore` v `4.0.0`, `FsCodec.Box`, `Propulsion` - 1. `MemoryStoreSource`: Forwards from an `Equinox.MemoryStore` into a `Propulsion.Sink`, in order to enable maximum efficiency integration testing. + 1. `MemoryStoreSource`: Presents a Source that adapts an `Equinox.MemoryStore` to feed into a `Propulsion.Sink`. 
Typically used as part of an overall test suite to enable efficient and deterministic testing where reactions are relevant to a given scenario. 2. `Monitor.AwaitCompletion`: Enables efficient **deterministic** waits for Reaction processing within integration or unit tests. - 3. `ReaderCheckpoint`: ephemeral checkpoint storage for `Propulsion.DynamoStore`/`Feed`/`EventStoreDb`/`SqlStreamSteamStore` in test contexts. + 3. `ReaderCheckpoint`: ephemeral checkpoint storage for `Propulsion.DynamoStore`/`EventStoreDb`/`Feed`/`MessageDb`/`SqlStreamSteamStore` in test contexts. ## Store-specific Components @@ -40,7 +40,7 @@ If you're looking for a good discussion forum on these kinds of topics, look no 1. `CosmosStoreSource`: reading from CosmosDb's ChangeFeed using `Microsoft.Azure.Cosmos` 2. `CosmosStoreSink`: writing to `Equinox.CosmosStore` v `4.0.0`. 3. `CosmosStorePruner`: pruning from `Equinox.CosmosStore` v `4.0.0`. - 4. `ReaderCheckpoint`: checkpoint storage for `Propulsion.EventStoreDb`/`DynamoStore`/`Feed`/`SqlStreamSteamStore` using `Equinox.CosmosStore` v `4.0.0`. + 4. `ReaderCheckpoint`: checkpoint storage for `Propulsion.DynamoStore`/`EventStoreDb`/`Feed`/`MessageDb`/`SqlStreamSteamStore` using `Equinox.CosmosStore` v `4.0.0`. (Reading and position metrics are exposed via `Propulsion.CosmosStore.Prometheus`) @@ -49,7 +49,7 @@ If you're looking for a good discussion forum on these kinds of topics, look no 1. `AppendsIndex`/`AppendsEpoch`: `Equinox.DynamoStore` aggregates that together form the Index Event Store 2. `DynamoStoreIndexer`: writes to `AppendsIndex`/`AppendsEpoch` (used by `Propulsion.DynamoStore.Indexer`, `Propulsion.Tool`) 3. `DynamoStoreSource`: reads from `AppendsIndex`/`AppendsEpoch` (see `DynamoStoreIndexer`) - 4. `ReaderCheckpoint`: checkpoint storage for `Propulsion.DynamoStore`/`Feed`/`EventStoreDb`/`SqlStreamSteamStore` using `Equinox.DynamoStore` v `4.0.0`. + 4. `ReaderCheckpoint`: checkpoint storage for `Propulsion.DynamoStore`/`EventStoreDb`/`Feed`/`MessageDb`/`SqlStreamSteamStore` using `Equinox.DynamoStore` v `4.0.0`. 5. `Monitor.AwaitCompletion`: See `Propulsion.Feed` (Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`) @@ -82,23 +82,23 @@ If you're looking for a good discussion forum on these kinds of topics, look no (Used by [`eqxShipping` template](https://github.com/jet/dotnet-templates/tree/master/equinox-shipping)) -- `Propulsion.EventStoreDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStoreDb.svg)](https://www.nuget.org/packages/Propulsion.EventStoreDb/). Provides bindings to [EventStoreDB](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink`. [Depends](https://www.fuget.org/packages/Propulsion.EventStoreDb) on `Equinox.EventStoreDb` v `4.0.0`, `Serilog` - 1. `EventStoreSource`: reading from an EventStoreDB >= `20.10` `$all` stream using the gRPC interface into a `Propulsion.Sink`. Provides throughput metrics via `Propulsion.Feed.Prometheus` +- `Propulsion.EventStoreDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStoreDb.svg)](https://www.nuget.org/packages/Propulsion.EventStoreDb/). Provides bindings to [EventStoreDB](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink`. [Depends](https://www.fuget.org/packages/Propulsion.EventStoreDb) on `Equinox.EventStoreDb` v `4.0.0` + 1. `EventStoreSource`: reading from an EventStoreDB >= `20.10` `$all` stream using the gRPC interface into a `Propulsion.Sink`. 2. 
`EventStoreSink`: writing to `Equinox.EventStoreDb` v `4.0.0` 3. `Monitor.AwaitCompletion`: See `Propulsion.Feed` (Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`) -- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.7.0`-`1.9.99`, `Serilog` +- `Propulsion.Kafka` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Kafka.svg)](https://www.nuget.org/packages/Propulsion.Kafka/) Provides bindings for producing and consuming both streamwise and in parallel. Includes a standard codec for use with streamwise projection and consumption, `Propulsion.Kafka.Codec.NewtonsoftJson.RenderedSpan`. [Depends](https://www.fuget.org/packages/Propulsion.Kafka) on `FsKafka` v `1.7.0`-`1.9.99` -- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/) [#181](https://github.com/jet/propulsion/pull/181), maintaining checkpoints in a postgres table [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion.Feed`, `Npgsql` >= `6.0.7` - 1. `MessageDbSource`: reading from one or more MessageDb categories into a `Propulsion.Sink` +- `Propulsion.MessageDb` [![NuGet](https://img.shields.io/nuget/v/Propulsion.MessageDb.svg)](https://www.nuget.org/packages/Propulsion.MessageDb/). Provides bindings to [MessageDb](http://docs.eventide-project.org/user-guide/message-db/), maintaining checkpoints in a postgres table [Depends](https://www.fuget.org/packages/Propulsion.MessageDb) on `Propulsion.Feed`, `Npgsql` >= `6.0.7` [#181](https://github.com/jet/propulsion/pull/181) :pray: [@nordfjord](https://github.com/nordfjord) + 1. `MessageDbSource`: reading from one or more MessageDB categories into a `Propulsion.Sink` 2. `CheckpointStore`: checkpoint storage for `Propulsion.Feed` using `Npgsql` (can be initialized via `propulsion initpg -c connstr -s schema`) -- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL table using Dapper. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion.Feed`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3`, `Serilog` +- `Propulsion.SqlStreamStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.SqlStreamStore.svg)](https://www.nuget.org/packages/Propulsion.SqlStreamStore/). Provides bindings to [SqlStreamStore](https://github.com/SQLStreamStore/SQLStreamStore), maintaining checkpoints in a SQL Server table. [Depends](https://www.fuget.org/packages/Propulsion.SqlStreamStore) on `Propulsion.Feed`, `SqlStreamStore`, `Dapper` v `2.0`, `Microsoft.Data.SqlClient` v `1.1.3` 1. `SqlStreamStoreSource`: reading from a SqlStreamStore `$all` stream into a `Propulsion.Sink` - 2. `ReaderCheckpoint`: checkpoint storage for `Propulsion.Feed`/`SqlStreamSteamStore`/`EventStoreDb` using `Dapper`, `Microsoft.Data.SqlClient` + 2. 
`ReaderCheckpoint`: checkpoint storage for `Propulsion.EventStoreDb`/`Feed`/`SqlStreamSteamStore` using `Dapper`, `Microsoft.Data.SqlClient` 3. `Monitor.AwaitCompletion`: See `Propulsion.Feed` (Reading and position metrics are exposed via `Propulsion.Feed.Prometheus`) @@ -109,19 +109,19 @@ The ubiquitous `Serilog` dependency is solely on the core module, not any sinks. - `Propulsion.Tool` [![Tool NuGet](https://img.shields.io/nuget/v/Propulsion.Tool.svg)](https://www.nuget.org/packages/Propulsion.Tool/): Tool used to initialize a Change Feed Processor `aux` container for `Propulsion.Cosmos` and demonstrate basic projection, including to Kafka. See [quickstart](#quickstart). - - `init` CosmosDB: Initialize an `-aux` Container for ChangeFeedProcessor + - `init`: CosmosDB: Initialize an `-aux` Container for use by the CosmosDb client library ChangeFeedProcessor - `initpg` : MessageDb: Initialize a checkpoints table in a Postgres Database - `index`: DynamoStore: validate and/or reindex DynamoStore Index - - `checkpoint`: CosmosDB/DynamoStore/EventStoreDB/Feed/SqlStreamStore: adjust checkpoints in DynamoStore/CosmosStore/Postgres - - `project`: CosmosDB/DynamoStore/EventStoreDB/MessageDb: walk change feeds/indexes and/or project to Kafka + - `checkpoint`: CosmosStore/DynamoStore/EventStoreDb/Feed/MessageDb/SqlStreamStore: adjust checkpoints in DynamoStore/CosmosStore/SQL Server/Postgres + - `project`: CosmosDB/DynamoStore/EventStoreDb/MessageDb: walk change feeds/indexes and/or project to Kafka ## Deprecated components Propulsion supports recent versions of Equinox and other Store Clients within reason - these components are -intended for use on a short term basis as a way to manage phased updates from older clients to current ones, -adjusting package references while retaining source compatibility to the maximum degree possible. +intended for use on a short term basis as a way to manage phased updates from older clients to current ones by +adjusting package references while retaining source compatibility to the maximum degree possible. -- `Propulsion.Cosmos` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/) Provides bindings to Azure CosmosDB. [Depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDB.ChangeFeedProcessor`, `Serilog` +- `Propulsion.Cosmos` [![NuGet](https://img.shields.io/nuget/v/Propulsion.Cosmos.svg)](https://www.nuget.org/packages/Propulsion.Cosmos/) Provides bindings to Azure CosmosDB. [Depends](https://www.fuget.org/packages/Propulsion.Cosmos) on `Equinox.Cosmos`, `Microsoft.Azure.DocumentDB.ChangeFeedProcessor` - **Deprecated as Equinox.CosmosStore supersedes Equinox.Cosmos** 1. `CosmosSource`: reading from CosmosDb's ChangeFeed by wrapping the [`dotnet-changefeedprocessor` library](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet). @@ -141,7 +141,7 @@ adjusting package references while retaining source compatibility to the maximum (Reading and position metrics are exposed via `Propulsion.CosmosStore.Prometheus`) -- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). 
Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore` v `4.0.0`, `Serilog` +- `Propulsion.EventStore` [![NuGet](https://img.shields.io/nuget/v/Propulsion.EventStore.svg)](https://www.nuget.org/packages/Propulsion.EventStore/). Provides bindings to [EventStore](https://www.eventstore.org), writing via `Propulsion.EventStore.EventStoreSink` [Depends](https://www.fuget.org/packages/Propulsion.EventStore) on `Equinox.EventStore` v `4.0.0` - **Deprecated as reading (and writing) relies on the legacy EventStoreDB TCP interface** - Contains ultra-high throughput striped reader implementation @@ -156,13 +156,13 @@ adjusting package references while retaining source compatibility to the maximum - See [the `dotnet new` templates repo](https://github.com/jet/dotnet-templates) for examples using the packages herein: - [Propulsion-specific templates](https://github.com/jet/dotnet-templates#propulsion-related): - - `proProjector` template for `CosmosStoreSource`+`StreamsProjector` logic consuming from a CosmosDb `ChangeFeedProcessor`. + - `proProjector` template for `CosmosStoreSource`+`StreamsSink` logic consuming from a CosmosDb `ChangeFeedProcessor`. - `proProjector` template (in `--kafka` mode) for producer logic using `StreamsProducerSink` or `ParallelProducerSink`. - `proConsumer` template for example consumer logic using `ParallelConsumer` and `StreamsConsumer` etc. - [Propulsion+Equinox templates](https://github.com/jet/dotnet-templates#producerreactor-templates-combining-usage-of-equinox-and-propulsion): - - `eqxShipping`: Event-sourced example with a Process Manager. Includes a `Watchdog` component that uses a `StreamsProjector`, with example wiring for `CosmosStore`, `DynamoStore` and `EventStoreDb` - - `proCosmosReactor`. single-source `StreamsProjector` based Reactor. More legible version of `proReactor` template, currently only supports `Propulsion.CosmosStore` + - `eqxShipping`: Event-sourced example with a Process Manager. Includes a `Watchdog` component that uses a `StreamsSink`, with example wiring for `CosmosStore`, `DynamoStore` and `EventStoreDb` + - `proCosmosReactor`. single-source `StreamsSink` based Reactor. More legible version of `proReactor` template, currently only supports `Propulsion.CosmosStore` - `proReactor` generic template, supporting multiple sources and multiple processing modes - `summaryConsumer` consumes from the output of a `proReactor --kafka`, saving them in an `Equinox.CosmosStore` store - `trackingConsumer` consumes from Kafka, feeding into example Ingester logic in an `Equinox.CosmosStore` store @@ -177,16 +177,16 @@ adjusting package references while retaining source compatibility to the maximum ## The Equinox Perspective -Propulsion and Equinox have a [Yin and yang](https://en.wikipedia.org/wiki/Yin_and_yang) relationship; the use cases for both naturally interlock and overlap. +Propulsion and Equinox have a [Yin and yang](https://en.wikipedia.org/wiki/Yin_and_yang) relationship; their use cases naturally interlock and overlap. 
See [the Equinox Documentation's Overview Diagrams](https://github.com/jet/equinox/blob/master/DOCUMENTATION.md#overview) for the perspective from the other side (TL;DR the same topology, with elements that are de-emphasized here central over there, and vice versa) ## [C4](https://c4model.com) Context diagram -While Equinox focuses on the **Consistent Processing** element of building an event-sourced decision processing system, offering tailored components that interact with a specific **Consistent Event Store**, Propulsion elements support the building of complementary facilities as part of an overall Application. Conceptually one can group such processing based on high level roles such as: +Equinox focuses on the **Consistent Processing** element of building an event-sourced decision processing system, offering relevant components that interact with a specific **Consistent Event Store**. Propulsion elements support the building of complementary facilities as part of an overall Application. Conceptually one can group such processing based on high level roles such as: -- **Ingesters**: read stuff from outside the Bounded Context of the System. This kind of service covers aspects such as feeding reference data into **Read Models**, ingesting changes into a consistent model via **Consistent Processing**. _These services are not acting in reaction to events emanating from the **Consistent Event Store**, as opposed to..._ -- **Publishers**: react to events as they are arrive from the **Consistent Event Store** by filtering, rendering and producing to feeds for downstreams. _While these services may in some cases rely on synchronous queries via **Consistent Processing**, they are not transacting or driving follow-on work; which brings us to..._ +- **Ingesters**: gather/consume data/events from outside the Bounded Context of the System. This role covers aspects such as feeding reference data into **Read Models**, ingesting changes into a consistent model via **Consistent Processing**. _These services are not acting in reaction to events emanating from the system's **Consistent Event Store**, as opposed to..._ +- **Publishers**: react to events as they are fed from the **Consistent Event Store** by filtering, rendering and emitting to feeds for downstream systems. _While these services may in some cases rely on synchronous queries via **Consistent Processing**, they are not themselves transacting or driving follow-on work; which brings us to..._ - **Reactors**: drive reactive actions triggered based on upstream feeds, or events observed in the **Consistent Event Store**. _These services handle anything beyond the duties of **Ingesters** or **Publishers**, and will often drive follow-on processing via Process Managers and/or transacting via **Consistent Processing**. In some cases, a reactor app's function may be to progressively compose a notification for a **Publisher** to eventually publish._ The overall territory is laid out here in this [C4](https://c4model.com) System Context Diagram: @@ -270,7 +270,7 @@ See [CONTRIBUTING.md](CONTRIBUTING.md) ## TEMPLATES -The best place to start, sample-wise is with the [QuickStart](#quickstart), which walks you through sample code, tuned for approachability, from `dotnet new` templates stored [in a dedicated repo](https://github.com/jet/dotnet-templates). 
+The best place to start, sample-wise is with the [Equinox QuickStart](https://github.com/jet/equinox#quickstart), which walks you through sample code, tuned for approachability, from `dotnet new` templates stored [in a dedicated repo](https://github.com/jet/dotnet-templates).

## BUILDING

@@ -291,22 +291,22 @@ dotnet build build.proj -v n

Well, Kafka is definitely not a critical component or a panacea.

-You're correct that the bulk of things that can be achieved using Kafka can be accomplished via usage of the ChangeFeed. One thing to point out is that in the context of enterprise systems, having a well maintained Kafka cluster does have less incremental cost that it might do if you're building a smaller system from nothing.
+You're correct that the bulk of things that can be achieved using Kafka can be accomplished via usage of the ChangeFeed. One thing to point out is that in the context of enterprise systems, having a well maintained Kafka cluster does have less incremental (or total) cost than it might do if you're building a smaller system from nothing.

-Some of the negatives of consuming from the CF direct:
+Some negatives of consuming from the ChangeFeed directly:

-- each CFP reader imposes RU charges (its a set of continuous queries against each and every physical range of which the Cosmos Container is composed)
+- each CFP reader induces RU consumption (it's a set of continuous queries against each and every physical partition of which the Cosmos Container is composed)
- you can't apply a server-side filter, so you pay to see the full content of any document that's touched
- there's an elevated risk of implementation shortcuts that couple the reaction logic to low level specifics of the store or the data structures
- (as you alluded to), if there's some logic or work involved in the production of events you'd emit to Kafka, each consumer would need to duplicate that

-While many of these concerns can be alleviated to varying degrees by splitting the storage up into multiple Containers such that each consumer will intrinsically be interested in a large proportion of the data it will observe (potentially using database level RU allocations), the write amplification effects of having multiple consumers will always be more significant when reading directly than when using Kafka, the design of which is well suited to running lots of concurrent readers.
+Many of these concerns can be alleviated to varying degrees by splitting the storage up into multiple Containers (potentially using database level RU allocations) such that each consumer will intrinsically be interested in a large proportion of the data it will observe. Even so, the write amplification effects of having multiple consumers will always be more significant when reading directly than when having a single reader emit to Kafka. The design of Kafka is specifically geared to running lots of concurrent readers.
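To make the amplification point concrete, here is a small F# back-of-envelope sketch; every figure in it (the RU rate, the daily volume, the consumer count) is invented purely for illustration, not a measured number:

```fsharp
// Hypothetical figures only: N consumers reading the ChangeFeed directly each re-read
// every touched document, whereas a single reader that publishes to Kafka pays once.
let ruPerMBRead = 10.0          // assumed RU cost to page 1 MB out of the ChangeFeed
let touchedMBPerDay = 50_000.0  // assumed daily volume of touched documents
let consumers = 50.0

let directRu = consumers * touchedMBPerDay * ruPerMBRead   // every consumer pays full price
let viaKafkaRu = 1.0 * touchedMBPerDay * ruPerMBRead       // one reader pays; Kafka handles the fan-out

printfn "direct: %.0f RU/day; via Kafka: %.0f RU/day (%.0fx amplification)"
    directRu viaKafkaRu (directRu / viaKafkaRu)
```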
-Splitting event categories into Containers solely to optimize these effects can also make the management of the transactional workload more complex; the ideal for any given Container is to balance the concerns of:
+However, splitting event categories into Containers solely to optimize these effects can also make the management of the transactional workload more complex; the ideal for any given Container is thus to balance the concerns of:

-- ensuring that datasets for which you want to ringfence availability / RU allocations don't share with containers/databases for which running hot (potentially significant levels of rate limiting but overall high throughput in aggregate as a result of using a high percentage of the allocated capacity)
-- avoiding prematurely splitting data prior to it being required by the constraints of CosmosDB (i.e. you want to let splitting primarily be driven by reaching the [10GB] physical partition range)
-- not having logical partition hotspots that lead to a small number of physical partitions having significantly above average RU consumption
+- ensuring that datasets for which you want to ringfence availability / RU allocations don't share with containers/databases for which running hot (potentially significant levels of rate limiting but overall high throughput in aggregate as a result of maximizing the percentage of the allocated capacity that's being used over time) is acceptable
+- avoiding prematurely splitting data prior to it being required by the constraints of CosmosDB (i.e. you want to let splitting primarily be driven by reaching the [10GB] physical partition size limit)
+- not having logical partition hotspots that lead to a small subset of physical partitions having significantly above average RU consumption
- having relatively consistent document sizes
- economies of scale - if each container (or database if you provision at that level) needs to be individually managed (with a degree of headroom to ensure availability for load spikes etc), you'll tend to require higher aggregate RU assignment for a given overall workload based on a topology that has more containers

> I know for unit testing, I can just test the obvious parts. Or if end to end testing is even required

-Depends what you want to achieve. One important technique for doing end-to-end scenarios, especially where some reaction is supposed to feed back into Equinox is to use `Equinox.MemoryStore` as the store, and then wire Propulsion to consume from that using `Propulsion.MemoryStore.MemoryStoreProjector`.
+Depends what you want to achieve. One important technique for doing end-to-end scenarios, especially where some reaction is supposed to feed back into Equinox, is to use `Equinox.MemoryStore` as the store, and then wire the Propulsion Sink (that will be fed from your real store when deployed in a production scenario) to consume from that using `Propulsion.MemoryStore.MemoryStoreProjector`.

Other techniques I've seen/heard are:

-- rig things to use ephemeral ESDB or Cosmos databases (the CosmosDB emulator works but has restrictions; perhaps you can use serverless or database level RU allocated DBs in a shared environment) to run your system against an ephemeral store.
+- rig things to use ephemeral ESDB or Cosmos databases (the CosmosDB emulator works but has restrictions; perhaps you can use serverless or database level RU allocated DBs in a shared environment) to run your system with an isolated throwaway storage with better performance, stability and/or cost properties for test purposes. - Once you have a store, the next question is how to validate your projector (Publisher / Reactor) apps. In some cases, people opt to spin up a large subset of the production system (maybe in docker-compose etc) and then check for externally visible effects in tests. - While it's important to do end-to-end tests with as much of the whole system as possible, that does tend to make for a messy test suite that quickly becomes unmaintainable. In general, the solution is to do smaller test scenarios that achieve that same goal by triangulating on subsets of the overall reactions as smaller scenarios. See the `Shipping.Watchdog.Integration` test suite in the `equinox-shipping` template for an example. In general I'd be looking to use `MemoryStoreProjector` as a default technique, as it provides: - the best performance by far (all synchronous and in-memory, without any simulators) -- a deterministic wait mechanism; after arranging a particular system state, you can pause until a reaction has been processed by using the projector's `AwaitCompletion` facility to efficiently wait for the exact moment at which the event has been handled by the reactor component without padded Sleep seqences (or, worse, retry loops). +- a deterministic wait mechanism; after arranging a particular system state, you can pause until a reaction has been processed by using the projector's `AwaitCompletion` facility to efficiently wait for the exact moment at which the event has been handled by the reactor component without padded Sleep sequences (or, worse: retry loops). To answer more completely, I'd say given a scenario involving Propulsion and Equinox, you'll typically have the following ingredients: -1. writing to the store - you can either assume that's well-tested infra or say you want to know you wired it up properly -2. serialization/deserialization - you can either have unit tests and/or property tests to validate roundtripping as an orthogonal concern, or you can take the view that it's critical to know it really works with real data -3. reading from feed, propagating to handler - that's harder to config and has the biggest variability in a test scenario so either: +1. writing to the store - you can either assume that's well-tested infra or take the view that you need to validate that you wired it up properly +2. serialization/deserialization - you can either have unit tests and/or property tests to validate round-tripping as an orthogonal concern, or you can take the view that it's critical to know it really works with real data +3. reading from the store's change feed and propagating to handler - that's harder to config and has the biggest variability in a test scenario so either: - you want to take it out of the equation - OR you want to know its wired properly -4. does handler work complete cleanly - yes you can and should unit test that, but maybe you want to know it works end-to-end with a much larger proportion of the overall system in play +4. validating that triggered reactions are handled and complete cleanly - yes you can and should unit test that, but maybe you want to know it works end-to-end with a much larger proportion of the overall system in play -5. 
does it trigger follow-on work, i.e. a cascade of reactions. You can either do triangulation and say its proven if I observe the trigger for the next bit, or you can want to prove it end to end

-6. does the entire system as a whole really work - sometimes you want to be able to validate workflows rather than having to pay the tax of going in the front door for every aspect (though you'll typically want to have a meaningful set of smoke tests that validate basic system integrity without requiring manual testing)

+5. does it trigger follow-on work, i.e. a cascade of reactions. You can either do triangulation and say it's proven if I observe the trigger for the next bit, or you may want to prove that end to end

+6. does the entire system as a whole really work - sometimes you want to be able to validate workflows rather than having to pay the complexity tax of going in the front door for every aspect (though you'll typically want to have a meaningful set of smoke tests that validate basic system integrity without requiring manual testing or back-door interfaces)

### Any reason you didn’t use one of the different subscription models available in ESDB? :pray: [@James Booth](https://github.com/absolutejam)

While the implementation and patterns in Propulsion happen to overlap to a degree with the use cases of the ESDB's subscription mechanisms, the main reason they are not used directly stems from the needs and constraints that Propulsion was evolved to cover.

-One thing that should be clear is that Propulsion is definitely *not* solving for the need of being the simplest conceivable projection library with a low concept count that's easy to get started with. If you're looking to build such a library, you'll likely give yourself some important guiding non-goals, e.g., if you had to add 3 concepts to get a 50% improvement in throughput, whether or not that's worth it depends on the context.
+One thing that should be clear is that Propulsion is definitely *not* attempting to be the simplest conceivable projection library with a low concept count that's easy to get started with. If you were looking to build such a library, you'll likely give yourself some important guiding non-goals to enable that, e.g., if you had to add 3 concepts to get a 50% improvement in throughput, whether or not that's worth it depends on the context - if you're trying to have a low concept count, you might be prepared to leave some performance on the table to enable that.

-For Propulsion, almost literally, job one was to be able to shift 1TB of ordered events in streams to/from ESDB/Cosmos/Kafka in well under 24h - a naive implementation reading and writing in small batches takes more like 24d to do the same thing. A secondary goal is to keep them in sync continually after that point (it's definitely more than a one time bulk ingestion system).
+For Propulsion, almost literally, job one was to be able to shift 1TB of ordered events in streams to/from ESDB/Cosmos/Kafka in well under 24h - a naive implementation reading and writing in small batches takes more like 24d to do the same thing. A key secondary goal was to be able to keep them in sync continually after that point (it's definitely more than a one time bulk ingestion system).
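As a rough illustration of the scale gap described above, the following F# sketch compares a fully serialized small-batch loop with a pipelined reader/writer; every figure in it is an assumption chosen only to show the shape of the arithmetic, not a measurement:

```fsharp
// Hypothetical figures only: the point is the ratio, not the absolute numbers.
let totalBytes = 1e12                    // ~1TB of events to move
let avgEventBytes = 1_000.0              // assumed mean event size
let events = totalBytes / avgEventBytes  // ~1e9 events

// Naive: read a small batch, write it, checkpoint, repeat - nothing overlaps
let naiveBatch = 20.0                    // events per round trip
let naiveRoundTripSec = 0.04             // assumed read + write + checkpoint latency
let naiveDays = events / naiveBatch * naiveRoundTripSec / 86_400.0

// Pipelined: read-ahead, per-stream batched writes and async checkpointing overlap
let pipelinedEventsPerSec = 15_000.0     // assumed sustained throughput
let pipelinedHours = events / pipelinedEventsPerSec / 3_600.0

printfn "naive: ~%.0f days; pipelined: ~%.0f hours" naiveDays pipelinedHours
```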
While Propulsion scales down to running simple subscriptions, its got quite a few additional concepts compared to using something built literally for that exact job; the general case of arbitrary projections was almost literally an afterthought.

-That's not to say that Propulsion's concepts make for a more complex system when all is said and done; there are lots of scenarios where you avoid having to do concurrent/async tricks one might otherwise do more explicitly in a more basic subscription system.
+That's not to say that Propulsion's concepts make for a more complex system when all is said and done; there are lots of scenarios where you avoid having to do concurrent/async tricks one might otherwise do more explicitly in a more simplistic subscription system.

-When looking at the vast majority of typical projections/reactions/denormalizers one runs in an event-sourced system it should come as no surprise that EventStoreDB's subscription features offer lots of excellent ways of achieving those common goals with a good balance of:
+When looking at the vast majority of typical projections/reactions/denormalizers one runs in an event-sourced system it should come as no surprise that EventStoreDB's subscription features offer plenty of ways of achieving those common goals with a good balance of:

- time to implement
- ease of operation
- good enough performance

-_That's literally the company's goal: enabling rapidly building systems to solve business problems without overfitting to any specific industry or application's needs_.
+_That's literally the company's goal: enabling rapidly building systems to solve business problems, without overfitting to any specific industry or application's needs_.

-The potential upsides that Propulsion can offer when used as a projection system can definitely be valuable _when actually needed_, but on average, they'll equally they can be overkill for a given specific requirement.
+The potential upsides that Propulsion can offer when used as a projection system can definitely be valuable _when actually needed_, but on average, they can equally be overkill for a given specific requirement.

-With that context set, here are some notable aspects of using Propulsion for Projectors rather than building a minimal bespoke wiring on a case by case basis:
-- similar APIs regardless of whether events arrive via CosmosDB, DynamoDB, EventStoreDB or Kafka etc
+With that context set, here are some notable aspects of using Propulsion for Projectors rather than building bespoke wiring on a case by case basis:
+- similar API designs and concepts regardless of whether events arrive via CosmosDB, DynamoDB, EventStoreDB, MessageDB, SqlStreamStore, Kafka (or custom sources via `Propulsion.Feed`)
- consistent dashboards across all those sources
- generally excellent performance for high throughput scenarios (it was built for that)
-- good handling for processing of workloads that don't have uniform (and low) cost per handler invocation, i.e., rate-limited writes of events to `Equinox.CosmosStore` (compared to e.g. using a store such as Redis)
-- orthogonality to Equinox features but still offering a degree of commonality of concepts and terminology
-- provide a degree of isolation from the low level drivers, e.g.:
+- good handling for processing of workloads that don't have uniform (and low) cost per handler invocation, i.e., rate-limited writes of events to `Equinox.CosmosStore` or `Equinox.DynamoStore` (compared to e.g. using a store such as Redis)
+- orthogonality to Equinox features while still offering a degree of commonality of concepts and terminology
+- provide a degree of isolation from the low level drivers, e.g.
  - moving from the deprecated Cosmos CFP V2 to any future `Azure.Cosmos` V4 SDK will be a matter of changing package references and fixing some minimal compilation errors, as opposed to learning a whole new API set
-  - moving from EventStore's TCP API / EventStore.Client to the gRPC based >= v20 clients is a package switch
-  - migrating a workload from EventStoreDB to CosmosDB or vice versa can be accomplished more cleanly if you're only changing the wiring of your projector host while making no changes to the handler implementations
-  - SqlStreamStore fits logically in as well; using it gives a cleaner mix and match / onramp to/from ESDB (Note however that migrating SSS <-> ESDB is a relatively trivial operation vs migrating from raw EventStore usage to Equinox.CosmosStore, i.e. "we're using Propulsion to isolate us from deciding between SSS or ESDB" is not a good enough reason on its own)
+  - moving from EventStore's TCP API / `EventStore.ClientAPI` to the gRPC based >= v20 clients is a simple package switch
+  - migrating a workload from EventStoreDB to or from CosmosDB or DynamoDB can be accomplished more cleanly if you're only changing the wiring of your projector host while making no changes to the handler implementations or the bulk of the reactor application
+  - SqlStreamStore and MessageDB also fit into the overall picture; using Propulsion can give a cleaner mix and match / onramp to/from ESDB (Note however that migrating SSS <-> ESDB is a relatively trivial operation vs migrating from raw EventStore usage to `Equinox.CosmosStore` or `Equinox.DynamoStore`, i.e. "we're using Propulsion to isolate us from deciding between SSS or ESDB" may not be a good enough reason on its own)
  - Specifically when consuming from CosmosDB, being able to do that over a longer wire by feeding to Kafka to limit RU consumption from projections is a relatively minor change.

#### A Brief History of Propulsion's feature set

@@ -383,10 +383,11 @@ The order in which the need for various components arose (as a side effect of bu

- `Propulsion.Cosmos`'s `Source` was the first bit done; it's a light wrapper over the [CFP V2 client](https://github.com/Azure/azure-documentdb-changefeedprocessor-dotnet). Key implications from that are:
  - order of events in a logical partition can and should be maintained
  - global ordering of events across all logical streams is not achievable due to how CosmosDB works (the only ordering guarantees are at logical partition level, the data can physically split at any time as data grows)
-  - `Propulsion.Kafka`'s `Sink` was next; the central goal here is be able to replicate events being read from CosmosDB onto a Kafka Topic _maintaining the ordering guarantees_. Implications:
-    - There are two high level ways of achieving ordering guarantees in Kafka:
-      1. only ever have a single event in flight; only when you've got the ack for a write do you send the next one. *However, literally doing that compromises throughput massively*.
-      2. use Kafka's transaction facilities (not implemented in `Confluent.Kafka` at the time)
+  - `Propulsion.Kafka`'s `Sink` was next; the central goal here is to be able to replicate events being read from CosmosDB onto a Kafka Topic _maintaining the ordering guarantees_.
+    There are two high level ways of achieving ordering guarantees in Kafka:
+    1. only ever have a single event in flight; only when you've got the ack for a write do you send the next one. *However, literally doing that compromises throughput massively*.
+    2. use Kafka's transaction facilities (not implemented in `Confluent.Kafka` at the time)
+    => The approach used is to continuously emit messages concurrently in order to maintain throughput, but guarantee to never emit messages for the same _key_ at the same time.
- `Propulsion.Cosmos`'s `Sink` was next up. It writes to CosmosDB using `Equinox.CosmosStore`. Key implications:
  - because rate-limiting is at the physical partition level, it's crucial for throughput that you keep other partitions busy while wait/retry loops are triggered by hotspots (and you absolutely don't want to exacerbate this effect by competing with yourself)

@@ -400,30 +401,31 @@ The order in which the need for various components arose (as a side effect of bu
  You'll often need a small batch size, which implies larger per-event checkpointing overhead unless you make the checkpointing asynchronous

-  The implementation thus:
+  => The implementation thus:
  - manages reading asynchronously from the writing in order to maintain throughput (you define a batch size _and_ a number of batches to read ahead)
  - schedules write attempts at stream level (the reader concurrently ingests successor events, making all buffered events available when retrying)
  - writes checkpoints asynchronously as and when all the items involved complete within the (stream-level) processing
- At the point where `Propulsion.EventStore`'s `Source` and `Sink` were being implemented (within weeks of the `CosmosStore` equivalents; largely overlapping), the implications from realizing goals of providing good throughput while avoiding adding new concepts if that can be avoided are:
  - The cheapest (lowest impact in terms of triggering scattered reads across disks on an ESDB server, with associated latency implications) and most general API set for reading events is to read the `$all` stream
  - Maintaining checkpoints in an EventStoreDB that you're also monitoring is prone to feedback effects (so using the Async checkpointing strategy used for `.CosmosStore` but saving them in an external store such as an `Equinox.CosmosStore` makes sense)
  - If handlers and/or sinks don't have uniform processing time per message and/or are subject to rate limiting, most of the constraints of the `CosmosStoreSink` apply too; you don't want to sit around retrying the last request out of a batch of 
100 while tens of thousands of provisioned RUs are sitting idle in Cosmos with throughput sitting close to zero #### Conclusion/comparison checklist The things Propulsion in general accomplishes in the projections space: -- Uniform dashboards for throughput, successes vs failures, and latency distributions over CosmosDB, DynamoDB, EventStoreDB, Kafka and generic Feeds -- Metrics to support trustworthy alerting and detailed analysis of busy, failing and stuck projections +- Uniform dashboards for throughput, successes vs failures, and latency distributions over CosmosDB, DynamoDB, EventStoreDB, MessageDb, SqlStreamStore, Kafka (and custom application-specific Feeds via `Propulsion.Feed`) +- Source-neutral metrics to support trustworthy alerting and detailed analysis of busy, failing and stuck projections - reading, checkpointing, parsing and running handlers are all independent asynchronous activities -- enable handlers to handle backlog of accumulated items for a stream as a batch if desired +- enables handlers to handle backlog of accumulated items for a stream as a batch if desired - maximize concurrency across streams -- (for CosmosDB, but could be achieved for EventStoreDB) provides for running multiple instances of consumers leasing physical partitions roughly how Kafka does it (aka the ChangeFeedProcessor lease management - Propulsion just wraps that and does not seek to impose any significant semantics on top of it) +- strong intrinsic support for handling idempotent processing in the face of at least once delivery and/or catchup scenarios +- (for CosmosDB, but could be achieved for EventStoreDB, DynamoStore and MessageDB via a generic set of extensions to `Propulsion.Feed`) provides for running multiple instances of consumers leasing physical partitions roughly how Kafka does it (aka the ChangeFeedProcessor lease management - Propulsion just wraps that and does not seek to impose any significant semantics on top of it) - provide good instrumentation as to latency, errors, throughput in a pluggable way akin to how Equinox does stuff (e.g. it has built-in Prometheus support) +- handlers/reactors/projections can be ported from `Propulsion.Cosmos` to `Propulsion.CosmosStore` by swapping driver modules; similar to how `Equinox.Cosmos` vs `Equinox.EventStore` provides a common programming model despite the underpinnings being fundamentally quite different in nature - good stories for isolating from specific drivers - i.e., there's `Propulsion.Cosmos` (using the V2 SDK) and a `Propulsion.CosmosStore` (for the V3 SDK) with close-to-identical interfaces (Similarly there's a `Propulsion.EventStoreDb` using the gRPC-based SDKs, replacing the deprecated `Propulsion.EventStore`) -- handlers/reactors/the projections can be ported from `Propulsion.Cosmos` to `Propulsion.CosmosStore` by swapping driver modules; similar to how `Equinox.Cosmos` vs `Equinox.EventStore` provides a common programming model despite the underpinnings being fundamentally quite different in nature - Kafka reading and writing generally fits within the same patterns - i.e. 
if you want to push CosmosDb CFP output to Kafka and consume over that as a 'longer wire' without placing extra load on the source if you have 50 consumers, you can stand up a ~250 line `dotnet new proProjector` app, and tweak the ~30 lines of consumer app wireup to connect to Kafka instead of CosmosDB -Things EventStoreDB's subscriptions can do that are not covered in Propulsion: +Things EventStoreDB's subscriptions can do that are not presently covered in Propulsion: - `$et-`, `$ec-` streams - honoring the full `$all` order - and more; EventStoreDB is a well designed purpose-built solution catering for diverse system sizes and industries diff --git a/src/Propulsion.Kafka/Consumers.fs b/src/Propulsion.Kafka/Consumers.fs index 89d4980a..a1690975 100644 --- a/src/Propulsion.Kafka/Consumers.fs +++ b/src/Propulsion.Kafka/Consumers.fs @@ -249,7 +249,7 @@ module Core = ?logExternalState = logExternalState, ?purgeInterval = purgeInterval, ?wakeForResults = wakeForResults, ?idleDelay = idleDelay) - // Maps a (potentially `null`) message key to a valid {Category}-{StreamId} StreamName for routing and/or propagation through StreamsProjector + // Maps a (potentially `null`) message key to a valid {Category}-{StreamId} StreamName for routing and/or propagation through StreamsSink let parseMessageKey defaultCategory = function | null -> FsCodec.StreamName.create defaultCategory "" | key -> StreamName.parseWithDefaultCategory defaultCategory key @@ -272,8 +272,8 @@ module Core = let context = { topic = result.Topic; partition = Binding.partitionValue result.Partition; offset = Binding.offsetValue result.Offset } (ReadOnlyMemory data, box context) -/// StreamsProjector buffers and deduplicates messages from a contiguous stream with each event bearing a monotonically incrementing `index`. -/// Where the messages we consume don't have such characteristics, we need to maintain a fake `index` by keeping an int per stream in a dictionary +/// StreamsSink buffers and deduplicates messages from a contiguous stream with each event bearing a monotonically incrementing `Index`. +/// Where the messages we consume don't have such characteristics, we need to maintain a fake `Index` by keeping an int per stream in a dictionary /// Does not need to be thread-safe as the Propulsion.Submission.SubmissionEngine does not unpack input messages/documents in parallel. type StreamNameSequenceGenerator() = diff --git a/src/Propulsion/Propulsion.fsproj b/src/Propulsion/Propulsion.fsproj index 3197bd5e..37897a8e 100644 --- a/src/Propulsion/Propulsion.fsproj +++ b/src/Propulsion/Propulsion.fsproj @@ -21,7 +21,6 @@ -
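The comment on `StreamNameSequenceGenerator` in the `Consumers.fs` hunk above describes the underlying technique: where inputs don't carry a monotonic per-stream `Index`, synthesize one by keeping a counter per stream in a dictionary. The following is a minimal, self-contained F# sketch of that idea only - the type and member names are hypothetical, not the library's API - and it relies on the same single-threaded assumption the comment calls out:

```fsharp
open System.Collections.Generic

/// Minimal sketch: hands out a monotonically incrementing synthetic Index per stream name.
/// Not thread-safe, mirroring the assumption that inputs are unpacked on a single thread.
type SyntheticIndexGenerator() =
    let indices = Dictionary<string, int64>()
    member _.Next(streamName: string) : int64 =
        match indices.TryGetValue streamName with
        | true, current ->
            indices.[streamName] <- current + 1L
            current + 1L
        | false, _ ->
            indices.[streamName] <- 0L
            0L

// Usage: messages for the same stream get 0L, 1L, 2L, ...; other streams start again at 0L
let gen = SyntheticIndexGenerator()
let a0 = gen.Next "Cart-123"   // 0L
let a1 = gen.Next "Cart-123"   // 1L
let b0 = gen.Next "Cart-456"   // 0L
```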