Skip to content

Latest commit



420 lines (292 loc) · 14.6 KB

File metadata and controls

420 lines (292 loc) · 14.6 KB



  • Run build steps on Node 16 instead of Node 10. Minimum Node is now 12.0.0.
  • Keep a build step running on Node 12 for backwards compatibility.

All packages

  • Update most dependencies to the latest patch version.


  • Fix: message encoding/decoding inconsistency between @azure/storage-queue (in 1.4.0) and azure-storage (in 1.3.0 and lower).


  • Add logs to composite input source to debug erroring sources.


  • Add a PubSub source.


  • Remove the result of call.getPeer() from the metrics tags to reduce the cardinality of metrics.


  • Add an optional client id prefix for the automatically generated clientId.
  • Add an optional parameter that specifies additional header names to be pulled into the message metadata.


  • Replace shortid with nanoid for generating the consumerId.



  • Switch from Travis to Github Actions


  • Expose IBlobClient interface

    export interface IBlobClient extends IRequireInitialization {
        createContainerIfNotExists(context?: SpanContext): Promise<boolean>;
        write(context: SpanContext, blobId: string, content: Buffer | string): Promise<void>;
        readAsText(context: SpanContext, blobId: string): Promise<string>;
        exists(context: SpanContext, blobId: string): Promise<boolean>;
        deleteFolderIfExists(context: SpanContext, folderId: string): Promise<boolean>;
        deleteBlobIfExists(context: SpanContext, blobId: string): Promise<boolean>;
        listBlobs(context: SpanContext, prefix: string): AsyncIterableIterator<string>;
  • Run integration tests via emulators for Cosmos DB and Azure Storage in a Windows VM

  • Expose ICosmosDocument field ttl in order to allow setting time-to-live for Cosmos DB documents

  • Fix: don't extract data field in QueueInputSource when encoder isEmbeddable

  • Fix: handle the url and connectionString fields of IQueueConfiguration correctly as separate fields


  • Log a detailed error message when an error occurs while inserting data into BiqQuery
  • Fix: pass error.message instead of error object to increment metric


  • Fix: pass correct time value to the RequestProcessingTime timing metric


  • Add TLS support
  • Expose compression modes setting in kafkaSink: None, Gzip, Snappy, LZ4
  • Expose KafkaJS Client Configuration fields: connectionTimeout and requestTimeout
  • Fix: properly connect/disconnect KafkaJS producer
  • Fix: properly export IRawKafkaMessage


  • Update mssql from 4.x.x to 6.x.x
  • Expose connection configuration fields: connectionTimeout, requestTimeout and stream
  • Add schema support for Sproc and Table mode
  • Add bulk insert support for Table mode


  • Fix: properly export IPrometheusConfiguration


  • Fix: redis client no longer stops processing messages in a batch when it encounters an invalid message
  • Fix: force application to restart on unexpected redis disconnect because reconnect doesn't seem to work properly
  • Fix: log correct error when redis ack fails
  • Fix: pass RedisError type instead of Error object to increment metric



  • fixed bug where BoundedPriorityQueue could de-prioritize elements under certain circumstances which could lead to out-of-order processing
  • deprecated support for Node v8


  • detect stale broker metadata and crash kafkajs client; this seems to be a bug in kafkajs where the metadata can get stale and all reconnect attempts of kafkajs will fail.


  • allow negative values for histograms


  • less verbose logging


  • new module with sink and source for AMQP compatible message buses


  • support embedding encoded protobuf data as base64 in JSON documents


  • support multiple streams for source and sink
  • added missing metrics
  • fix messages getting lost when receiving in batches


  • support accessing multiple cosmos collection from within the same service
  • added support for dead letter queues for Azure Queues


  • fixed sink swallows actual error message when a transaction fails to commit



Richer API for Retries

The retry feature that was added in version 1.1 has been extended to allow message handlers and sinks to get access to metadata about the retry loop and overwrite retry intervals.

export class MyMessageHandler {
    public async function onSomeInput(msg: SomeInput, ctx: IDispatchContext) {
        try {
            // currentAttempt, maxAttempts, isFinalAttempt
            // are automatically included in all errors
            // logged by Cookie Cutter
  "attempting to invoke API", {
                currentAttempt: ctx.retry.currentAttempt,
                maxAttempts: ctx.retry.maxAttempts,
                isFinalAttempt: ctx.retry.isFinalAttempt(),

            await someApi.invoke(msg);
        } catch (e) {
            if (e instanceof ServiceUnavailableError) {
            } else if (e instanceof RetryLaterError) {

            throw e;

export class MySink implements IOutputSink<IPublishedMessage> {
    public async sink(items: IterableIterator<IPublishedMessage>, retry: IRetrierContext) {
        // use `retry` the same way as in message handler example above

Evicting In-Flight Messages

Kafka's rebalance protocol dictates that every consumer must complete processing all in-flight messages and commit its offsets before acknowledging a rebalance back to the broker. In order to be compliant with this requirement we added a new feature for input sources that allows them to evict pending messages from the input queue and wait for all messages, that are currently being processed, to complete.

Full compliance for the Kafka input source is currently still blocked by an open ticket in the kafkajs side. There is currently a race condition where evicted items may still be around after the synchronization barrier for the rebalance was crossed.

export class MyInputSource implements IInputSource {
    public async *start(ctx: IInputSourceContext): AsyncIterableIterator<MessageRef> {
        // --- PSEUDO code for a Kafka client ---

        this.client.onRebalance(async () => {
            // this will clear all messages from the input queue
            // and wait for all other messages to complete processing
            await ctx.evict((msg) => isFromKafkaInputSource(msg));
            await this.client.commitOffsets();

        while (const msg = await {
            yield new MessageRef({}, msg);


You can now test applications with materialized views more easily.

function createTestApp(): IApplicationBuilder {
    return Application.create()
        .dispatch(new MessageHandler())
            mockMaterializedState(Customer, {
                "customer-1": new Customer({ name: "John Doe" }),
                "customer-2": new Customer({ name: "Jane Doe" }),

Better Throughout in RPC Mode

When a Cookie Cutter Applications runs in RPC mode and encounters a Sequence Conflict, it will no longer retry all messages that follow the one that triggered the Sequence Conflict. However, Cookie Cutter still guarantees that the output state is correct, even when competing events are handled in parallel.

Mode Guarantees Notes
Serial * Ordering
* Correctness of State
Inefficient due to serial nature of I/O, this mode should generally not be used
Concurrent * Ordering
* Correctness of State
Efficient I/O with guaranteed ordering, good for stream processing
RPC * Correctness of State Efficient I/O, handling multiple requests in parallel while guaranteeing correctness of state -> good for gRPC services

Custom Handling of Invalid Messages

It is now possible to customize how invalid messages are handled. The example below reroutes all invalid messages to a dedicated Kafka queue.

        .add(new SomeInput())
        invalid: async (msg: IMessage, ctx: IDispatchContext) {
            ctx.publish(msg.payload, { [KafkaMetadata.Topic]: "invalid-messages" });


  • use the new retry API (see above) to overwrite the next retry interval when reaching RU limits with Microsoft's suggested retry interval from the HTTP header of the response
  • upgrade to @cosmos/azure version 3.0
  • expose url as configuration for Blob Storage (useful for testing with Microsoft's Emulator Docker image)
  • support for message pre-processing for Azure Queue data (similar to the pre-processor in the Kafka module)
  • Allow opt-in to creating Azure queues dynamically on write


  • Implemented eviction of pending messages with new mechanism described above. KafkaJS currently does not provide an async callback on rebalance, so this is still not compliant with Kafka's rebalance protocol until that is resolved (see tulios/kafkajs#592), but should already make rebalances smoother for slow consumers and avoid committed offsets moving backwards
  • support multiple bootstrap brokers
  • make session timeout configurable so slow consumers don't trigger unnecessary rebalances.


  • implemented workaround for watch stopping to work after some time period


  • better error message when connection fails
  • automatically retry intermittent errors


  • initial support for collecting metrics via Prometheus



Redesign of Retry Logic

The retry logic was redesigned to make it easier to distinguish between errors that should be retried and those that shouldn't. Sinks receive a bail function as a parameter now that can be called when retries should stop.

class MySink implements IOutputSink<IPublishedMessage> {
    public async sink(items: IterableIterator<IPublishedMessage>, bail: (err: any) => never) {
        try {
            await someOperation(...);
        } catch (e) {
            if (e instanceof SomethingThatDoesNotMakeSenseToRetry) {

            throw e; // this will be retried

The same concept can be applied to message handlers

class MyHandler {
    public async function onSomeInput(msg: ISomeInput, ctx: IDispatchContext) {
        if (msg.apiVersion > 2) {
            ctx.bail(new Error("API version not supported"));

In addition to this the number of retries per message is now configurable via IApplicationBehavior and the ErrorHandlingModes were changed to

export enum ErrorHandlingMode {
    LogAndContinue = 1,

LogAndRetry maps to an infinite number of retries (the application fails when a non-retriable error is encountered). LogAndRetryOrContinue vs. LogAndRetryOrFail define what is supposed to happen if the allowed number of reties is exceeded (either fail the application or continue with the next message). The default number of retries is 5. LogAndContinue and LogAndFail work the same but set the retries to zero.

Metric for Number of Concurrently Active Message Handlers in RPC Mode

A new gauge metric is emitted when running in RPC mode that shows the number of concurrently active message handler functions.


The DefaultComponentContext class was added that can be used as a default by any implementor of IRequireInitialization until initialize is called. This is mainly handy for unit tests as it removes the requirement to explicitly initialize components with NullLoggers, NullMetrics, etc ...


Timeout for BlobClient

The timeout for put requests is now configurable for the Blob Storage client.

JSON Embeddings

JSON Embeddings are now supported by all Azure Cosmos sinks. When the JsonMessageEncoder is used for any sink the document in Cosmos DB will properly reflect the JSON vs. storing the JSON in serialized form as a UInt8Array.


  • LZ4 compressed messages are supported out of the box now
  • new metrics for low/high watermarks, committed offsets and current lag are emitted
  • kafkajs was upgraded to 1.10.0


  • Kubernetes Validation Hooks are now supported


  • improve definition of 'required' to catch empty strings


  • initial version


  • initial version


  • initial version


  • initial release

Bug Fixes


  • 1.0.1: ensure SequenceConflictError excludes its 'context' property when serialized
  • 1.0.2: add missing optimistic concurrency check when event handler does not produce any output


  • 1.0.1: handle response being null/undefined in BlobClient callbacks
  • 1.0.2: update QueueInputSource to support protobuf encoded data that is stored as an object
  • 1.0.3: fixed typo in variable name in upsert stored procedure
  • 1.0.4: support new optimistic concurrency checks (see core v1.0.2)
  • 1.0.5: upgrade @cookie-cutter/core dependency
  • 1.0.6: fixed infinite loop in materialized view sink introduced with 1.0.4
  • 1.0.7: fixed bad deployment of 1.0.6 package


  • 1.0.1: upgrade to @instana/[email protected] for bug fix for errors with cyclic data
  • 1.0.2: upgrade @cookie-cutter/core dependency


  • 1.0.1: fixed Kafka headers aren't properly decoded
  • 1.0.2: upgrade @cookie-cutter/core dependency
  • 1.0.3: fixed dispose function to check whether the kafka client was initialized before closing it
  • 1.0.4: fixed span hydration to correctly use default headers
  • 1.0.5: fixed snappy codec setup that did not work properly
  • 1.0.6: fixed bad deployment of 1.0.5 package


  • 1.0.1: upgrade @cookie-cutter/core dependency


  • 1.0.1: upgrade @cookie-cutter/core dependency
  • 1.0.2: fixed dispose function to check whether the mssql client was initialized before closing it


  • 1.0.1: upgrade @cookie-cutter/core dependency


  • 1.0.1: upgrade @cookie-cutter/core dependency


  • 1.0.1: upgrade @cookie-cutter/core dependency
  • 1.0.2: fixed dispose function to check whether the statsd client was initialized before closing it


  • 1.0.1: upgrade @cookie-cutter/core dependency


  • 1.0.1: upgrade @cookie-cutter/core dependency