
Commit

Add burrow module from wikimedia
paladox committed Oct 4, 2024
1 parent aefe56e commit d3cb6f7
Showing 11 changed files with 586 additions and 0 deletions.
1 change: 1 addition & 0 deletions manifests/site.pp
@@ -59,6 +59,7 @@
node 'kafka181.wikitide.net' {
    include base
    include role::kafka
    include role::burrow
}

node 'ldap171.wikitide.net' {
184 changes: 184 additions & 0 deletions modules/burrow/files/check_kafka_consumer_lag.py
@@ -0,0 +1,184 @@
#!/usr/bin/python
# SPDX-License-Identifier: BSD-2-Clause
# -*- coding: utf-8 -*-

# NOTE: This file is managed by puppet.
# This script was copied from https://github.com/williamsjj/kafka_health

####################################################################
# FILENAME: check_kafka_consumer_group.py
# PROJECT: kafka_health
# DESCRIPTION: Nagios check for monitoring Kafka consumer groups
#              via a Burrow server.
#
####################################################################
# (C)2016 DigiTar Inc., All Rights Reserved
# Licensed under the BSD license.
####################################################################

from argparse import ArgumentParser
import sys
import urllib2
import time
import json

NAGIOS_CRITICAL = 2
NAGIOS_WARNING = 1
NAGIOS_OK = 0
NAGIOS_UNKNOWN = -1

STATUS_MSG_PREFIX = {
    NAGIOS_CRITICAL: "CRITICAL",
    NAGIOS_WARNING: "WARNING",
    NAGIOS_OK: "OK",
    NAGIOS_UNKNOWN: "UNKNOWN"
}

parser = ArgumentParser(
    description="Monitor a Kafka consumer group using Burrow. "
                "(https://github.com/linkedin/Burrow)"
)
parser.add_argument("--base-url", dest="base_url", required=True,
                    help="Base URL of Burrow monitoring server without path.")
parser.add_argument("--kafka-cluster", dest="kafka_cluster",
                    help="Kafka cluster name (as defined in Burrow)",
                    required=True)
parser.add_argument("--consumer-group", dest="consumer_group",
                    help="Kafka consumer group to monitor.",
                    required=True)
parser.add_argument("--critical-lag", dest="critical_lag",
                    type=int, default=1000,
                    help="Critical threshold for consumer group lag.")


class Status (object):

    def __init__(self, status):
        self.status = status
        self.status_msg = u""

    def updateStatus(self, new_status, msg=None):
        # Statuses only escalate, never downgrade. NAGIOS_UNKNOWN (-1)
        # therefore never raises the status, but its message is still
        # appended; the main block uses this to accumulate detail text.
        if new_status > self.status:
            self.status = new_status
        if msg:
            self.status_msg = u"%s %s" % (self.status_msg, msg)

        return


if __name__ == "__main__":
args = parser.parse_args()

# Build Request
if args.base_url[-1] == "/":
args.base_url = args.base_url[:-1]
req = urllib2.Request(
url="%(base_url)s/v2/kafka/%(kafka_cluster)s/consumer/%(consumer_group)s/status" %
args.__dict__
)

# Run Check
try:
start = time.time()
res = urllib2.urlopen(req)
end = time.time()
except urllib2.HTTPError, e:
print(
"CRITICAL: Server %s returned error %d - %s (%s)" %
(args.base_url, e.code, e.msg, e.read())
)
sys.exit(NAGIOS_CRITICAL)
except urllib2.URLError, e:
print("CRITICAL: Problem connecting to %s - %s" % (args.base_url, e.reason))
sys.exit(NAGIOS_CRITICAL)
except Exception, e:
print("CRITICAL: Unknown error occurred. %s" % str(e))
sys.exit(NAGIOS_CRITICAL)

try:
output = res.read()
json_output = json.loads(output)
except Exception, e:
print(
"CRITICAL: Error decoding API result to JSON - %s (API result: %s)" % (str(e), output)
)
sys.exit(NAGIOS_CRITICAL)

if json_output["error"]:
print("CRITICAL: %s" % json_output["message"])
sys.exit(NAGIOS_CRITICAL)

# Set general check status
status_result = Status(NAGIOS_OK)
if json_output["status"]["status"] == "NOTFOUND":
status_result.updateStatus(
NAGIOS_WARNING, "%s consumer group not found in this cluster." % args.consumer_group
)
elif json_output["status"]["status"] == "WARN":
status_result.updateStatus(NAGIOS_WARNING, "Group or partition is in a warning state.")
elif json_output["status"]["status"] == "ERR":
# If maxlag is set, then choose critical.
if json_output["status"]["maxlag"] is not None:
status = NAGIOS_CRITICAL
# else if there are any partition statuses present other than STOP,
# then choose warning.
elif (
[p['status'] for p in json_output['status']['partitions']].count("STOP") !=
len(json_output['status']['partitions'])
):
status = NAGIOS_WARNING
# Else we are in an 'error' state, but everything is really fine.
# Stopped partitions with no lag just means there haven't been
# recent messages in these partitions.
else:
status = NAGIOS_OK

status_result.updateStatus(status, "Group is in an error state.")
elif json_output["status"]["status"] == "STOP":
status_result.updateStatus(NAGIOS_WARNING, "A partition has stopped.")
elif json_output["status"]["status"] == "STALL":
status_result.updateStatus(NAGIOS_CRITICAL, "A partition has stalled.")
elif json_output["status"]["status"] != "OK":
status_result.updateStatus(
NAGIOS_WARNING, "Unexpected status value: %s" % json_output["status"]["status"]
)

# Parse maxlag info
max_lag_detail = ""
if json_output["status"]["maxlag"]:
max_lag = json_output["status"]["maxlag"]
max_lag_detail = "Worst Lag: %s/p%d - lag:%d offset:%d" % (
max_lag["topic"],
max_lag["partition"],
max_lag["end"]["lag"],
max_lag["end"]["offset"]
)
if max_lag["end"]["lag"] >= args.critical_lag:
status_result.updateStatus(NAGIOS_CRITICAL)

status_result.updateStatus(NAGIOS_UNKNOWN, max_lag_detail)

# Compile problem partition stats
problem_topic_partitions = {}
for part in json_output["status"]["partitions"]:
if part["status"] in ["WARN", "STOP", "STALL"]:
if part["topic"] not in problem_topic_partitions:
problem_topic_partitions[part["topic"]] = {"WARN": [], "STOP": [], "STALL": []}
problem_topic_partitions[part["topic"]][part["status"]].append(part["partition"])

problem_partition_detail = ""
for topic in problem_topic_partitions.keys():
problem_partition_detail = "%s(%s WARN:%d STOP:%d STAL:%d) " % (
problem_partition_detail,
topic,
len(problem_topic_partitions[topic]["WARN"]),
len(problem_topic_partitions[topic]["STOP"]),
len(problem_topic_partitions[topic]["STALL"])
)

if problem_partition_detail:
status_result.updateStatus(NAGIOS_UNKNOWN, "|"+problem_partition_detail)

# Return status
print("%s: %s" % (STATUS_MSG_PREFIX[status_result.status], status_result.status_msg))
sys.exit(status_result.status)
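
For reference, the check above only reads a few fields from Burrow's v2 status payload. Below is a minimal sketch of that shape with made-up values, plus a condensed version of the ERR-branch logic (maxlag set means CRITICAL; any non-STOP partition means WARNING; all partitions stopped with no lag is OK):

# Illustrative Burrow v2 status payload; field values are made up.
sample = {
    "error": False,
    "message": "consumer group status returned",
    "status": {
        "cluster": "main",
        "group": "example-consumer",
        "status": "ERR",
        "maxlag": None,
        "partitions": [
            {"topic": "events", "partition": 0, "status": "STOP"},
            {"topic": "events", "partition": 1, "status": "STOP"},
        ],
    },
}


def classify_err(status):
    # Condensed form of the ERR branch in the check above.
    if status["maxlag"] is not None:
        return "CRITICAL"  # real lag is accumulating
    stopped = [p for p in status["partitions"] if p["status"] == "STOP"]
    if len(stopped) != len(status["partitions"]):
        return "WARNING"  # some partition is in a WARN/STALL state
    return "OK"  # all partitions stopped, no lag: topics are just idle


print(classify_err(sample["status"]))  # prints: OK

Invocation would look something like ./check_kafka_consumer_lag.py --base-url http://localhost:8000 --kafka-cluster main --consumer-group example-consumer, where the cluster and group names here are hypothetical.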
126 changes: 126 additions & 0 deletions modules/burrow/manifests/init.pp
@@ -0,0 +1,126 @@
# SPDX-License-Identifier: Apache-2.0
# == Define: burrow
#
# Burrow is a consumer offset lag monitoring tool for Kafka.
# This module helps set up a Burrow service that provides an
# HTTP endpoint to query, and sends email notifications about
# consumer group statuses.
# This module supports only Burrow >= 1.0, since older versions use
# completely different configuration formats.
#
# == Parameters
#
# [*client_id*]
#   The client ID string to provide to Kafka when consuming.
#
# [*httpserver_port*]
#   Port on which to expose the Burrow HTTP endpoint.
#
# [*lagcheck_intervals*]
#   Length of the window of offsets used to monitor lag.
#   See: https://github.com/linkedin/Burrow/wiki/Configuration#lagcheck
#
# [*zookeeper_hosts*]
#   Array of Zookeeper hosts (the port comes from zookeeper_port).
#
# [*zookeeper_path*]
#   The full path to the znode that is the root for the Kafka cluster.
#
# [*kafka_cluster_name*]
#   Name of the Kafka cluster to monitor.
#
# [*kafka_brokers*]
#   Array of Kafka brokers in the Kafka cluster.
#
# [*kafka_api_version*]
#   Kafka API version to use with the cluster.
#   The current maximum supported version is 1.0.0.
#   Default: '1.0.0'
#
# [*alert_whitelist*]
#   Regex whitelisting the consumer groups that may trigger
#   notifications via email.
#
# [*smtp_server*]
#   SMTP server to send emails from.
#
# [*from_email*]
#   'From' address for notification emails.
#
# [*to_email*]
#   Address to send notification emails to.
#
# [*email_template*]
#   The name of the email template to use for Burrow's alerts.
#
# [*consumer_groups_blacklist*]
#   Regex used to filter out temporary/not-relevant consumer groups.
#   Default: '^(console-consumer-|python-kafka-consumer-|test_).*$'
#
# [*kafka_brokers_port*]
#   Port used by the Kafka brokers.
#   Default: 9092
#
# [*zookeeper_port*]
#   Port used by Zookeeper.
#   Default: 2181
#
define burrow (
    $zookeeper_hosts,
    $zookeeper_path,
    $kafka_cluster_name,
    $kafka_brokers,
    $alert_whitelist,
    $smtp_server,
    $from_email,
    $to_email,
    $smtp_server_port = 25,
    $kafka_brokers_port = 9092,
    $zookeeper_port = 2181,
    $kafka_api_version = '1.0.0',
    $client_id = 'burrow-client',
    $httpserver_port = 8000,
    $lagcheck_intervals = 10,
    $email_template = 'burrow/email.tmpl.erb',
    $consumer_groups_blacklist = '^(console-consumer-|python-kafka-consumer-|test_).*$',
)
{
    ensure_packages('burrow')

    # Burrow 1.0 accepts one parameter named '--config-dir' that
    # expects a directory containing a file named 'burrow.toml'.
    # Since multiple instances of Burrow can run on the same host,
    # it is necessary to create the appropriate etc hierarchy.
    $config_dir = "/etc/burrow/${title}"
    file { $config_dir:
        ensure => 'directory',
    }

    $email_template_path = "${config_dir}/email.tmpl"
    if $to_email {
        file { $email_template_path:
            content => template($email_template),
        }
    }

    file { "${config_dir}/burrow.toml":
        content => template('burrow/burrow.toml.erb'),
    }

    systemd::service { "burrow-${title}":
        ensure    => present,
        content   => systemd_template('burrow'),
        restart   => true,
        subscribe => File["${config_dir}/burrow.toml"],
        require   => [
            Package['burrow'],
        ],
    }

    # Stop the default service shipped by the package; only the
    # per-title instances defined above should run.
    if ! defined(Service['burrow']) {
        service { 'burrow':
            ensure  => stopped,
            require => Package['burrow'],
        }
    }
}
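
Once an instance is up, the HTTP endpoint exposed on httpserver_port can be queried directly. A minimal sketch, assuming a local instance on the default port 8000 and hypothetical cluster and consumer group names:

import json

try:
    from urllib.request import urlopen  # Python 3
except ImportError:
    from urllib2 import urlopen  # Python 2

# Hypothetical values: adjust the port (httpserver_port above) and the
# cluster/group names to match the actual deployment.
url = "http://localhost:8000/v2/kafka/main/consumer/example-consumer/status"
payload = json.loads(urlopen(url).read())
print(payload["status"]["status"])  # e.g. "OK", "WARN" or "ERR"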
47 changes: 47 additions & 0 deletions modules/burrow/templates/burrow.toml.erb
@@ -0,0 +1,47 @@
<%#- SPDX-License-Identifier: Apache-2.0 -%>
[general]
pidfile="/run/burrow/burrow-<%= @title %>.pid"
client-id="<%= @client_id %>"

[logging]
filename="/var/log/burrow/<%= @title %>.log"
level="info"

[zookeeper]
servers=["<%= @zookeeper_hosts.map{ |host| "#{host}:#{@zookeeper_port}" }.join('","') -%>"]
timeout=6
root-path="/burrow/notifier/<%= @title %>"

[client-profile.<%= @title %>]
kafka-version="<%= @kafka_api_version %>"

[cluster.<%= @title %>]
class-name="kafka"
client-profile="<%= @title %>"
servers=["<%= @kafka_brokers.map{ |host| "#{host}:#{@kafka_brokers_port}" }.join('","') %>"]

[consumer.<%= @title %>]
class-name="kafka"
cluster="<%= @title %>"
servers=["<%= @kafka_brokers.map{ |host| "#{host}:#{@kafka_brokers_port}" }.join('","') %>"]
<% if @consumer_groups_blacklist -%>
group-blacklist="<%= @consumer_groups_blacklist -%>"
<% end -%>
start-latest=true

[httpserver.mylistener]
address=":<%= @httpserver_port -%>"
timeout=60

<% if @to_email -%>
[notifier.<%= @title %>]
class-name="email"
interval=30
threshold=3
group-whitelist="<%= @alert_whitelist -%>"
template-open="<%= @email_template_path -%>"
from="<%= @from_email %>"
to="<%= @to_email %>"
server="<%= @smtp_server %>"
port="<%= @smtp_server_port %>"
<% end -%>
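
The map/join calls above turn each host array into a quoted TOML list. A rough Python equivalent of that string construction, using one real broker name from this commit and one hypothetical one:

# Mirrors the ERB map/join above; kafka182 is a hypothetical second broker.
kafka_brokers = ["kafka181.wikitide.net", "kafka182.wikitide.net"]
kafka_brokers_port = 9092

servers = '","'.join("%s:%d" % (host, kafka_brokers_port) for host in kafka_brokers)
print('servers=["%s"]' % servers)
# servers=["kafka181.wikitide.net:9092","kafka182.wikitide.net:9092"]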