The Consumer Freshness Tracker is a tool to track the "freshness" of a consumer - how far behind in time the consumer is from the head (Log End Offset) of a queue. It is separate from Burrow so that it can track the timestamp of the committed offset and then compute a delta against the LEO. Internally, it leverages both Burrow and direct queries against Kafka.
Metrics are exported via Prometheus at `localhost:8081/metrics` and are continuously updated until the Tracker is stopped.
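Once the Tracker is running, you can inspect the metrics with a plain HTTP request:

```sh
curl localhost:8081/metrics
```

The exported metric groups are: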
- Commit Lag
  - The amount of time the most recent committed offset spent in the queue before it was committed
- Freshness
  - The difference between when the most recent committed offset first entered the queue and the timestamp of the most recently added offset (the Log End Offset, or LEO, in Kafka parlance); see the worked example below this list.
- JVM
- Burrow
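As a rough illustration of how the two time-based metrics relate (the timestamps below are invented for the example; this is not the Tracker's actual code):

```java
public class FreshnessExample {
  public static void main(String[] args) {
    // Hypothetical timestamps in ms; illustration only.
    long enteredQueueMs = 100_000; // most recent committed offset was produced at t=100s
    long committedMs = 130_000;    // ...and the consumer committed it at t=130s
    long leoMs = 160_000;          // newest message (the LEO) was produced at t=160s

    long commitLagMs = committedMs - enteredQueueMs; // 30,000 ms in the queue before commit
    long freshnessMs = leoMs - enteredQueueMs;       // consumer is 60,000 ms behind the head
    System.out.printf("commit lag = %d ms, freshness = %d ms%n", commitLagMs, freshnessMs);
  }
}
```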
```sh
bazel build //kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness:ConsumerFreshness_deploy.jar
java -jar bazel-bin/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness_deploy.jar --help
```
Copy this jar to the desired path:

```
bazel-bin/kafka_consumer_freshness_tracker/src/main/java/com/tesla/data/consumer/freshness/ConsumerFreshness_deploy.jar
```
Note: publishing a pre-compiled binary is on the roadmap.
Bundled as a jar, the Tracker is run by pointing it at a configuration file with the `--conf <path to conf>` flag. For local testing, you can also add the `--once` flag to run the Tracker a single time, dump the metrics to the console, and then stop.
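For example, a one-shot local run might look like this (the config path is a placeholder):

```sh
java -jar ConsumerFreshness_deploy.jar --conf /path/to/conf.yml --once
```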
Configuration is read from a YAML file.
Because the Tracker reads from both Burrow and Kafka, you need to configure both. It currently only supports reading from a single Burrow instance. Below is a minimal configuration:
```yaml
burrow:
  url: "http://burrow.example.com"
clusters:
  - name: logs-cluster
    kafka:
      bootstrap.servers: "l1.example.com:9092, l2.example.com:9092, l3.example.com:9092"
  - name: metrics-cluster
    kafka:
      bootstrap.servers: "m1.example.com:9092, m2.example.com:9092, m3.example.com:9092"
```
The Kafka configs are passed directly to the consumer, so you can add any standard Kafka consumer config as needed.
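For example, a standard consumer property like `request.timeout.ms` (chosen here purely for illustration) would be passed straight through:

```yaml
clusters:
  - name: logs-cluster
    kafka:
      bootstrap.servers: "l1.example.com:9092"
      request.timeout.ms: 60000  # forwarded as-is to the KafkaConsumer
```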
port
- Port on which to expose metrics
- Default: 8081
frequency_sec
- Number of seconds between execution intervals; this should be approximately the same as the Burrow topic scrape interval
- Default: 30
workerThreadCount
- Number of concurrent worker threads accessing Kafka
- Default: 15
numConsumers
- Number of consumer instances to create per Kafka cluster. Set under each cluster entry, for example:

  ```yaml
  clusters:
    - name: analytics
      numConsumers: 10
      kafka:
        ...
  ```
- Default: 15
- For smaller clusters, or when monitoring multiple clusters, you will want to lower this to manage memory allocation (each consumer takes a non-trivial chunk of memory, which can add up quickly across clusters).
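Putting the options together, a fuller configuration might look like the sketch below (this assumes the options above sit at the top level of the YAML alongside `burrow` and `clusters`, as described here):

```yaml
port: 8081
frequency_sec: 30
workerThreadCount: 15
burrow:
  url: "http://burrow.example.com"
clusters:
  - name: logs-cluster
    numConsumers: 10
    kafka:
      bootstrap.servers: "l1.example.com:9092, l2.example.com:9092"
```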
Logically, the implementation is fairly simple: scrape Burrow for the current state of all the consumers, then for each consumer ask Kafka for the timestamp of the committed offset and of the latest offset, and calculate freshness and queue time from the two.
Our implementation is slightly more complicated because running single-threaded is too slow. The KafkaConsumer API is not thread-safe, so we do a little dance in which each compute task waits for an available KafkaConsumer, does its work, and then hands the consumer back to the Tracker for use by the next compute task.
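A minimal sketch of that hand-off, using a blocking queue as the consumer pool (an illustration of the pattern, not the Tracker's actual code; the class and method names are invented):

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.function.Function;

import org.apache.kafka.clients.consumer.KafkaConsumer;

// Hypothetical pool: compute tasks borrow a consumer, use it, and hand it back.
public class ConsumerPool {
  private final BlockingQueue<KafkaConsumer<byte[], byte[]>> pool;

  public ConsumerPool(List<KafkaConsumer<byte[], byte[]>> consumers) {
    // Pre-fill the queue with all the consumers created for this cluster.
    this.pool = new ArrayBlockingQueue<>(consumers.size(), false, consumers);
  }

  public <T> T withConsumer(Function<KafkaConsumer<byte[], byte[]>, T> work)
      throws InterruptedException {
    KafkaConsumer<byte[], byte[]> consumer = pool.take(); // block until one is free
    try {
      return work.apply(consumer); // e.g. look up offset timestamps
    } finally {
      pool.put(consumer); // return it for the next compute task
    }
  }
}
```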