
Service Overview

Dong Lin edited this page Jan 18, 2018 · 10 revisions

The most flexible way to start kafka-monitor is to run kafka-monitor-start.sh with a config file. The config file lets you instantiate multiple Services or Apps that are already implemented in Kafka Monitor and tune their configs to monitor your clusters. Each service has its own thread or scheduler that carries out pre-defined tasks, e.g., producing or consuming messages.
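To make the "one thread or scheduler per service" model concrete, here is a minimal Python sketch of a service that repeats a single task at a fixed interval on its own thread until it is stopped. The class and method names (`ScheduledService`, `start`, `stop`, `is_running`) are hypothetical; Kafka Monitor itself is written in Java and its Service classes are not reproduced here.

```python
import threading
import time
from typing import Callable


class ScheduledService:
    """Hypothetical service that runs one task periodically on a dedicated thread."""

    def __init__(self, name: str, task: Callable[[], None], interval_s: float) -> None:
        self._task = task
        self._interval_s = interval_s
        self._stop_event = threading.Event()
        self._thread = threading.Thread(target=self._run, name=name, daemon=True)

    def _run(self) -> None:
        # Event.wait returns False on timeout (run the task again) and
        # True once stop() has set the event (leave the loop).
        while not self._stop_event.wait(self._interval_s):
            self._task()

    def start(self) -> None:
        self._thread.start()

    def stop(self) -> None:
        self._stop_event.set()
        self._thread.join()

    def is_running(self) -> bool:
        return self._thread.is_alive()


if __name__ == "__main__":
    ticks = {"produce": 0}
    service = ScheduledService("produce-service", lambda: ticks.update(produce=ticks["produce"] + 1), 0.05)
    service.start()
    time.sleep(0.3)
    service.stop()
    print("produce task ran", ticks["produce"], "times")
```

Several such services can be started side by side (for example, one producing and one consuming), each on its own thread, which mirrors how a single config file can instantiate multiple services.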

In this section we introduce some Service classes that have been implemented in Kafka Monitor. See Service Configuration for their configs.

ProduceService

ProduceService produces messages at a regular interval to each partition of a fabricated topic. Because it produces at a regular interval, ProduceService can measure the availability of Kafka's produce service as a fraction between 0 and 1. In addition, each message payload contains an incrementing integer index and a timestamp, which ConsumeService can use to measure, e.g., end-to-end latency and message loss.
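The produce side can be sketched as follows, assuming a synchronous `send(partition, payload)` callable and a JSON payload with hypothetical `index` and `time` fields (Kafka Monitor's real payload format and producer API are not reproduced here). Each round sends one message to every partition; the per-partition index advances only after a successful send, and a failed send is counted as an error.

```python
import json
import time
from typing import Callable, Dict, Optional


def make_payload(topic: str, partition: int, index: int, now_ms: Optional[int] = None) -> bytes:
    """Build a hypothetical JSON payload carrying a per-partition index and a creation timestamp."""
    if now_ms is None:
        now_ms = int(time.time() * 1000)
    record = {"topic": topic, "partition": partition, "index": index, "time": now_ms}
    return json.dumps(record).encode("utf-8")


class ProduceLoop:
    """Sends one message to every partition per round and tracks a per-partition index."""

    def __init__(self, topic: str, num_partitions: int, send: Callable[[int, bytes], None]) -> None:
        self.topic = topic
        self.send = send
        self.next_index: Dict[int, int] = {p: 0 for p in range(num_partitions)}
        self.errors: Dict[int, int] = {p: 0 for p in range(num_partitions)}

    def produce_round(self) -> None:
        for partition in sorted(self.next_index):
            payload = make_payload(self.topic, partition, self.next_index[partition])
            try:
                self.send(partition, payload)
            except Exception:
                # A failed send counts as an error and does not advance the index.
                self.errors[partition] += 1
                continue
            self.next_index[partition] += 1
```

Calling `produce_round()` from a fixed-interval scheduler gives the "regular interval" behavior described above.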

For ProduceService to measure cluster availability accurately, the topic should be created in advance with the same replication factor as most other topics in the cluster, and leadership of its partitions should ideally be evenly distributed across all brokers. Users can use TopicManagementService to make sure that the fabricated topic meets these requirements.

By default, ProduceService uses the new producer to produce messages. To run ProduceService with a custom producer, users can write a thin wrapper around their existing producer implementation that implements the interface com.linkedin.kmf.producer.KMBaseProducer.
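The "thin wrapper" idea is the adapter pattern. The Python sketch below shows the shape of it: `BaseProducer` is a hypothetical stand-in for a producer interface such as KMBaseProducer (the real interface is Java and its method signatures are not reproduced here), and `LegacyClient` is an invented in-house client with a different API.

```python
from abc import ABC, abstractmethod
from typing import List, Tuple


class BaseProducer(ABC):
    """Hypothetical producer interface that the monitoring service calls."""

    @abstractmethod
    def send(self, topic: str, partition: int, value: bytes) -> None: ...

    @abstractmethod
    def close(self) -> None: ...


class LegacyClient:
    """Invented existing client with its own method names."""

    def __init__(self) -> None:
        self.published: List[Tuple[str, bytes]] = []
        self.closed = False

    def publish(self, destination: str, payload: bytes) -> None:
        self.published.append((destination, payload))

    def shutdown(self) -> None:
        self.closed = True


class LegacyProducerWrapper(BaseProducer):
    """Thin wrapper: translates interface calls into the existing client's API."""

    def __init__(self, client: LegacyClient) -> None:
        self._client = client

    def send(self, topic: str, partition: int, value: bytes) -> None:
        self._client.publish(f"{topic}-{partition}", value)

    def close(self) -> None:
        self._client.shutdown()
```

The wrapper holds no logic of its own; it only maps one API onto another, which keeps the monitoring code unaware of which producer is underneath.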

To measure the availability of the produce service, ProduceService tracks the message produce_rate and error_rate. Every exception that is thrown and caught while the producer sends a message counts toward error_rate. Availability is the average of per-partition availability, where per-partition availability is produce_rate / (produce_rate + error_rate) and both rates are measured over the past 60 seconds. If no message was produced and no exception was thrown in the past 60 seconds, both produce_rate and error_rate are 0, and the availability of that partition is taken to be 1. With this definition, availability drops below 1 only when an exception is caught, and ProduceService logs such exceptions for further investigation. Note that the Apache Kafka producer should throw an exception if a message cannot be sent within request.timeout.ms * (retries + 1) + retry.backoff.ms * retries.
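The availability arithmetic can be written out as a small Python sketch. It assumes the inputs are per-partition counts of successful sends and caught exceptions over the 60-second window (the function names and the count-based input are assumptions, not Kafka Monitor's API), and it also evaluates the producer timeout bound quoted above.

```python
from typing import Mapping, Tuple

WINDOW_SECONDS = 60.0  # rates are measured over the past 60 seconds


def partition_availability(produce_count: int, error_count: int,
                           window_s: float = WINDOW_SECONDS) -> float:
    """produce_rate / (produce_rate + error_rate), or 1.0 for an idle window."""
    produce_rate = produce_count / window_s
    error_rate = error_count / window_s
    if produce_rate + error_rate == 0:
        # No sends and no exceptions in the window: treated as fully available.
        return 1.0
    return produce_rate / (produce_rate + error_rate)


def produce_availability(counts: Mapping[int, Tuple[int, int]]) -> float:
    """Average of per-partition availability; counts maps partition -> (produced, errors)."""
    values = [partition_availability(p, e) for p, e in counts.values()]
    return sum(values) / len(values)


def max_send_time_ms(request_timeout_ms: int, retries: int, retry_backoff_ms: int) -> int:
    """request.timeout.ms * (retries + 1) + retry.backoff.ms * retries."""
    return request_timeout_ms * (retries + 1) + retry_backoff_ms * retries
```

For example, partitions with (600, 0), (540, 60) and (0, 0) sends/errors have availabilities 1.0, 0.9 and 1.0, so the topic-level value is their mean, about 0.967. Because both rates share the same window, the window length cancels out of the ratio.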

ConsumeService

ConsumeService consumes messages from a topic; these messages are expected to be produced by ProduceService. Using the incrementing integer index and timestamp in each message payload, ConsumeService measures the message loss rate, message duplication rate, end-to-end latency, and so on. ConsumeService also reports ConsumeAvailability, defined as the fraction of messages that were neither lost nor delivered with a latency exceeding the user-specified consume.latency.sla.ms.
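A minimal Python sketch of the ConsumeAvailability definition, assuming the inputs are the end-to-end latencies of consumed messages and a count of lost messages over some window. Returning `None` when nothing was consumed or lost is a choice made for this sketch, not documented Kafka Monitor behavior.

```python
from typing import Iterable, Optional


def consume_availability(latencies_ms: Iterable[float], lost: int, sla_ms: float) -> Optional[float]:
    """Fraction of messages that were neither lost nor slower than the SLA."""
    latencies = list(latencies_ms)
    total = len(latencies) + lost
    if total == 0:
        return None  # nothing consumed and nothing lost: undefined in this sketch
    within_sla = sum(1 for latency in latencies if latency <= sla_ms)
    return within_sla / total
```

With latencies of 10, 20 and 500 ms, one lost message and consume.latency.sla.ms = 100, two of four messages meet the definition, giving 0.5.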

ConsumeService has built-in support for both the old consumer and the new consumer, and users can run it with their choice of consumer and configuration. To run ConsumeService with a custom consumer, users can also write a thin wrapper around their existing consumer implementation that implements the interface com.linkedin.kmf.consumer.KMBaseConsumer.

To measure the message loss rate and message duplication rate, ProduceService includes an integer index in each message payload. This index is incremented by 1 for every successful send to a given partition. ConsumeService reads the index from the payload and compares it with the last index observed for the same partition to determine whether messages were lost or duplicated.
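The per-partition comparison can be sketched in Python as follows. Treating the first index seen on a partition as a baseline, and counting any index at or below the last one as a duplicate, are assumptions of this sketch; Kafka Monitor's own bookkeeping may differ in those edge cases.

```python
from typing import Dict


class LossDuplicateTracker:
    """Per-partition bookkeeping for detecting lost and duplicated messages by index."""

    def __init__(self) -> None:
        self.last_index: Dict[int, int] = {}
        self.consumed = 0
        self.lost = 0
        self.duplicated = 0

    def observe(self, partition: int, index: int) -> None:
        self.consumed += 1
        last = self.last_index.get(partition)
        if last is None:
            # First message seen on this partition: use it as the baseline.
            self.last_index[partition] = index
        elif index == last + 1:
            # Expected next index: nothing lost, nothing duplicated.
            self.last_index[partition] = index
        elif index > last + 1:
            # Indices last+1 .. index-1 never arrived.
            self.lost += index - last - 1
            self.last_index[partition] = index
        else:
            # index <= last: already seen (or older), counted as a duplicate.
            self.duplicated += 1
```

Feeding indices 0, 1, 2, 4, 4, 5 for one partition records one lost message (index 3) and one duplicate (the second 4).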

To measure end-to-end latency, the message payload contains the timestamp at which the message was constructed. ConsumeService parses the message to obtain this timestamp and computes end-to-end latency by subtracting it from the time the message is received. Because the timestamp travels inside the payload, this also lets users monitor the latency of a pipeline of Kafka clusters connected by MirrorMaker.
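A short Python sketch of the latency computation, assuming a JSON payload with a hypothetical `time` field holding the creation time in epoch milliseconds (the actual payload layout is not specified here).

```python
import json
import time
from typing import Optional


def end_to_end_latency_ms(payload: bytes, receive_time_ms: Optional[int] = None) -> int:
    """Receive time minus the creation timestamp carried in the payload."""
    record = json.loads(payload)
    if receive_time_ms is None:
        receive_time_ms = int(time.time() * 1000)
    return receive_time_ms - int(record["time"])
```

When the producer and consumer run on different hosts, the result also includes any clock skew between the two machines.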

TopicManagementService

TopicManagementService manages the monitor topic of a cluster to ensure that every broker is the leader of at least one partition of the monitor topic, so that the produce-availability metric drops below 1 if any broker fails. To achieve this, TopicManagementService monitors the number of brokers, the number of partitions, the assignment of partitions across brokers, and the distribution of leadership across brokers. When needed, it may expand partitions, reassign partitions, or trigger preferred leader election.
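The leadership condition can be checked with a few lines of Python. The input format (a mapping from partition id to leader broker id, plus the list of live brokers) is an assumption of this sketch, not how TopicManagementService obtains cluster metadata.

```python
from typing import Dict, Iterable, Set


def leader_counts(leaders: Dict[int, int], brokers: Iterable[int]) -> Dict[int, int]:
    """Count how many monitor-topic partitions each broker currently leads.

    `leaders` maps partition id -> leader broker id."""
    counts = {broker: 0 for broker in brokers}
    for leader in leaders.values():
        counts[leader] = counts.get(leader, 0) + 1
    return counts


def brokers_without_leadership(leaders: Dict[int, int], brokers: Iterable[int]) -> Set[int]:
    """Brokers that lead no partition; their failure might go unnoticed by produce availability."""
    return {broker for broker, n in leader_counts(leaders, brokers).items() if n == 0}
```

A non-empty result is the situation that reassignment or preferred leader election is meant to correct.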

TopicManagementService can create the topic with a user-specified config (e.g., replication factor and partition-to-broker ratio) if the fabricated topic doesn't exist yet. By default, it creates the topic using Apache Kafka's API. Users can also implement the interface com.linkedin.kmf.topicfactory.TopicFactory to provide custom topic creation logic, e.g., one that sets up access control lists (ACLs) for the fabricated topic in their cluster.

TopicManagementService automatically triggers preferred leader election, expands partitions of the monitor topic, or reassigns partitions across brokers so that leadership of the monitor topic's partitions stays evenly distributed across brokers. For example, if a new broker is added to the Kafka cluster, TopicManagementService reassigns some partitions of the monitor topic to the new broker, and expands the monitor topic's partitions if necessary to ensure that partitionNum > topic-management.partitionsToBrokersRatio * brokerNum.
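The expansion rule can be turned into arithmetic with a small Python sketch. It applies the relation partitionNum > topic-management.partitionsToBrokersRatio * brokerNum literally (strict inequality), so the smallest valid partition count is floor(ratio * brokerNum) + 1; the real service's rounding may differ, and the function names are hypothetical. Kafka can only add partitions, never remove them, so the sketch never returns a negative count.

```python
import math


def required_partitions(ratio: float, broker_num: int) -> int:
    """Smallest partition count strictly greater than ratio * broker_num."""
    return math.floor(ratio * broker_num) + 1


def partitions_to_add(partition_num: int, ratio: float, broker_num: int) -> int:
    """How many partitions to add so that partition_num > ratio * broker_num."""
    return max(0, required_partitions(ratio, broker_num) - partition_num)
```

For example, with a ratio of 2.0 and 3 brokers, a topic with 4 partitions would need 3 more (7 > 6), while a topic with 8 partitions needs none.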
