
Wrapper for kafka consumer which supports health check, retries and offset diff report

kafka-consumer-manager


This package simplifies the common use of a Kafka consumer by:

  • Supporting multiple consumer managers via instance creation
  • Supporting autoCommit: false with throttling, by queueing messages per partition and handling them one message at a time per partition (concurrency level equals the number of partitions)
  • Providing an API to detect out-of-sync consumer offsets, by checking that the partition offset is synced to ZooKeeper
  • Accepting a promise-returning function with the business logic each consumed message should go through
  • Accepting a promise-returning function that decides when to pause and when to resume consuming
  • Providing an API for sending a message back to the topic (usually for retries)

Install

npm install --save kafka-consumer-manager

API

How to use

let KafkaConsumerManager = require('kafka-consumer-manager');

let configuration = {
    KafkaUrl: "localhost:9092",
    GroupId: "some-group-id",
    KafkaConnectionTimeout: 10000,
    KafkaOffsetDiffThreshold: 3,
    Topics: ["TOPIC-A", "TOPIC-B"],
    ResumePauseIntervalMs: 30000,
    ResumePauseCheckFunction: (consumer) => {
        return shouldPauseConsuming(consumer);
    },
    MessageFunction: (msg) => { return handleMessage(msg); },
    MaxMessagesInMemory: 100,
    ResumeMaxMessagesRatio: 0.25,
    CreateProducer: false
};

(async () => {
    let kafkaConsumerManager = new KafkaConsumerManager();
    await kafkaConsumerManager.init(configuration);
})();
Configuration
  • KafkaUrl – URL of the Kafka broker.
  • GroupId – The consumer group this process consumes on behalf of.
  • KafkaConnectionTimeout – Maximum time (ms) to wait for Kafka to connect.
  • KafkaOffsetDiffThreshold – Tolerance for how far the consumer's partition offset may be from the real offset; the health check rejects when the offset is out of sync beyond this value.
  • Topics – Array of topics to consume.
  • ResumePauseIntervalMs – Interval at which to run the ResumePauseCheckFunction (Optional).
  • ResumePauseCheckFunction – Promise-returning function that should always resolve. If it resolves with true, the consumer is resumed; if false, it is paused (mandatory if ResumePauseIntervalMs is provided). The function accepts one parameter (consumer).
  • MessageFunction – Promise-returning function that should always resolve. It is applied to each consumed message and accepts one parameter (message); make sure to resolve only after the message is considered done. Do not mutate the original message, as this may cause unstable behaviour in the getLastMessage function.
  • FetchMaxBytes – The maximum bytes to include in the message set for this partition. This helps bound the size of the response. (Default 1024^2)
  • WriteBackDelay – Delay produced messages by this many ms (Optional).
  • AutoCommit – Boolean. If AutoCommit is false, the consumer queues messages from each partition into a dedicated queue, handles them in order, and commits the offset when done.
  • LoggerName – String, the value of the consumer_name field of the internal logger; if empty, the field is omitted.
  • CreateProducer – Boolean. If true, a producer instance is created. (Default true)
AutoCommit: true settings
  • MaxMessagesInMemory – If set, the consumer pauses after this many messages are in memory; call the finishedHandlingMessage function to lower the counter (Optional).
  • ResumeMaxMessagesRatio – If set, a paused consumer resumes only when CurrentMessagesInMemory < MaxMessagesInMemory * ResumeMaxMessagesRatio; the value should be below 1 (Optional).
AutoCommit: false settings
  • ThrottlingThreshold – If the consumer holds more messages than this value it pauses, and it resumes consuming once the count drops below the threshold.
  • ThrottlingCheckIntervalMs – Interval in ms at which to check whether the message count is above or below the threshold.
  • CommitEachMessage – Boolean. If false, commits happen every AutoCommitIntervalMs instead of per message. (Default true)
  • AutoCommitIntervalMs – Interval in ms at which to commit to the broker; relevant only when CommitEachMessage is false. (Default 5000)
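For example, with MaxMessagesInMemory: 100 and ResumeMaxMessagesRatio: 0.25, a paused consumer resumes once fewer than 25 messages remain in memory. A minimal sketch of that check (shouldResume is illustrative, not part of the package API):

```javascript
// Sketch of the resume condition used with AutoCommit: true throttling.
// The package applies this logic internally; the function is illustrative only.
function shouldResume(currentMessagesInMemory, maxMessagesInMemory, resumeMaxMessagesRatio) {
    return currentMessagesInMemory < maxMessagesInMemory * resumeMaxMessagesRatio;
}

// With MaxMessagesInMemory: 100 and ResumeMaxMessagesRatio: 0.25:
shouldResume(30, 100, 0.25); // false – still above the 25-message watermark
shouldResume(10, 100, 0.25); // true – below the watermark, safe to resume
```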

await kafka-consumer-manager.init(configuration)

Initializes the consumer and the producer. Make sure to pass the full configuration object, otherwise you will get exceptions.

The function returns a Promise.

kafka-consumer-manager.validateOffsetsAreSynced()

Checks that the partition offsets are synced and moving as expected, by comparing consumer progress against the ZooKeeper offsets.

The function returns a Promise.
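A typical use is wiring this check into a service health endpoint. A minimal sketch, where the healthCheck wrapper is an illustration and not part of the package:

```javascript
// Illustrative health-check wrapper around validateOffsetsAreSynced().
// Resolves to an HTTP-style status object instead of letting the rejection propagate.
async function healthCheck(manager) {
    try {
        await manager.validateOffsetsAreSynced();
        return { status: 200, body: 'offsets in sync' };
    } catch (err) {
        return { status: 500, body: 'offsets out of sync: ' + err.message };
    }
}
```

The rejection from validateOffsetsAreSynced carries the out-of-sync details, so surfacing err.message in the health response is usually enough for monitoring.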

await kafka-consumer-manager.closeConnection()

Closes the connection to Kafka. Returns a Promise.

kafka-consumer-manager.pause()

Pauses consumption of new messages.

kafka-consumer-manager.resume()

Resumes consumption of new messages.

kafka-consumer-manager.send(message, topic)

Sends a message back to a topic. Returns a Promise.
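A common retry pattern is to resolve the MessageFunction even on failure and requeue the message with send. A sketch under assumptions: handleMessage and the retries field in the payload are inventions of this example, not part of the package.

```javascript
// Illustrative retry wrapper: on failure, requeue the message to its own topic
// with an incremented retry counter. Resolving either way lets consumption continue.
function makeMessageFunction(manager, handleMessage, maxRetries) {
    return async (msg) => {
        try {
            await handleMessage(msg);
        } catch (err) {
            const payload = JSON.parse(msg.value);
            const retries = (payload.retries || 0) + 1;
            if (retries <= maxRetries) {
                await manager.send(JSON.stringify({ ...payload, retries }), msg.topic);
            }
            // resolve anyway so the offset is committed and the consumer moves on
        }
    };
}
```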

kafka-consumer-manager.finishedHandlingMessage()

Decreases the counter of how many messages are currently being processed by the service. Used together with the configuration params ResumeMaxMessagesRatio and MaxMessagesInMemory. Only relevant for autoCommit: true.
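With MaxMessagesInMemory set, each consumed message occupies an in-memory slot until finishedHandlingMessage is called. A sketch of pairing the two so the counter always goes back down, even on failure (processMessage is an assumption of this example):

```javascript
// Illustrative MessageFunction that always releases its in-memory slot via
// finishedHandlingMessage, even when processing throws. Without the finally,
// a failing message would leak a slot and eventually pause the consumer for good.
function makeCountedMessageFunction(manager, processMessage) {
    return async (msg) => {
        try {
            await processMessage(msg);
        } finally {
            manager.finishedHandlingMessage();
        }
    };
}
```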

kafka-consumer-manager.getLastMessage()

Gets the last message the consumer received. Do not mutate the original message, as this may cause unstable behaviour in the MessageFunction.
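If you need to inspect or alter the returned message, copy it first. A minimal sketch, assuming the message object is JSON-serializable (binary Buffer values would need different handling):

```javascript
// Deep-copy a message before touching it, so the consumer's original stays intact.
// JSON round-trip is a simple deep copy for plain-object messages; it does not
// preserve Buffers, Dates, or functions.
function snapshotMessage(message) {
    return JSON.parse(JSON.stringify(message));
}
```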

Running Tests

Using mocha and istanbul

npm test
