Skip to content

Commit

Permalink
[apache-kafka] Fix the configuration to allow kafka to run on non-def…
Browse files Browse the repository at this point in the history
  • Loading branch information
shivaraj-bh authored Jan 16, 2024
1 parent 2e58b4a commit c7bed59
Show file tree
Hide file tree
Showing 2 changed files with 120 additions and 64 deletions.
160 changes: 99 additions & 61 deletions nix/apache-kafka.nix
Original file line number Diff line number Diff line change
@@ -1,64 +1,124 @@
# Based on: https://github.com/NixOS/nixpkgs/blob/master/nixos/modules/services/misc/apache-kafka.nix
{ config, lib, pkgs, name, ... }:
let
mkPropertyString =
let
render = {
bool = lib.boolToString;
int = toString;
list = lib.concatMapStringsSep "," mkPropertyString;
string = lib.id;
};
in
v: render.${lib.strings.typeOf v} v;

stringlySettings = lib.mapAttrs (_: mkPropertyString)
(lib.filterAttrs (_: v: v != null) config.settings);

generator = (pkgs.formats.javaProperties { }).generate;
in
with lib;
{
options = {
enable = mkOption {
description = lib.mdDoc "Whether to enable Apache Kafka.";
default = false;
type = types.bool;
};

brokerId = mkOption {
description = lib.mdDoc "Broker ID.";
default = -1;
type = types.int;
};
enable = mkEnableOption (lib.mdDoc "Apache Kafka event streaming broker");

port = mkOption {
description = lib.mdDoc "Port number the broker should listen on.";
default = 9092;
type = types.port;
};

hostname = mkOption {
description = lib.mdDoc "Hostname the broker should bind to.";
default = "localhost";
type = types.str;
};

dataDir = lib.mkOption {
type = types.str;
default = "./data/${name}";
description = "The apache-kafka data directory";
description = lib.mdDoc "The apache-kafka data directory";
};

logDirs = mkOption {
description = lib.mdDoc "Log file directories inside the data directory.";
default = [ "/kafka-logs" ];
type = types.listOf types.path;
};
settings = mkOption {
description = lib.mdDoc ''
[Kafka broker configuration](https://kafka.apache.org/documentation.html#brokerconfigs)
{file}`server.properties`.
zookeeper = mkOption {
description = lib.mdDoc "Zookeeper connection string";
default = "localhost:2181";
type = types.str;
Note that .properties files contain mappings from string to string.
Keys with dots are NOT represented by nested attrs in these settings,
but instead as quoted strings (ie. `settings."broker.id"`, NOT
`settings.broker.id`).
'';
type = types.submodule {
freeformType = with types; let
primitive = oneOf [ bool int str ];
in
lazyAttrsOf (nullOr (either primitive (listOf primitive)));

options = {
"broker.id" = mkOption {
description = lib.mdDoc "Broker ID. -1 or null to auto-allocate in zookeeper mode.";
default = null;
type = with types; nullOr int;
};

"log.dirs" = mkOption {
description = lib.mdDoc "Log file directories.";
# Deliberaly leave out old default and use the rewrite opportunity
# to have users choose a safer value -- /tmp might be volatile and is a
# slightly scary default choice.
# default = [ "/tmp/apache-kafka" ];
type = with types; listOf string;
default = [ (config.dataDir + "/logs") ];
};

"listeners" = mkOption {
description = lib.mdDoc ''
Kafka Listener List.
See [listeners](https://kafka.apache.org/documentation/#brokerconfigs_listeners).
'';
type = types.listOf types.str;
default = [ "PLAINTEXT://localhost:${builtins.toString config.port}" ];
};
};
};
};

extraProperties = mkOption {
description = lib.mdDoc "Extra properties for server.properties.";
type = types.nullOr types.lines;
clusterId = mkOption {
description = lib.mdDoc ''
KRaft mode ClusterId used for formatting log directories. Can be generated with `kafka-storage.sh random-uuid`
'';
type = with types; nullOr str;
default = null;
};

serverProperties = mkOption {
configFiles.serverProperties = mkOption {
description = lib.mdDoc ''
Complete server.properties content. Other server.properties config
options will be ignored if this option is used.
Kafka server.properties configuration file path.
Defaults to the rendered `settings`.
'';
type = types.nullOr types.lines;
default = null;
type = types.path;
default = generator "server.properties" stringlySettings;
};

configFiles.log4jProperties = mkOption {
description = lib.mdDoc "Kafka log4j property configuration file path";
type = types.path;
default = pkgs.writeText "log4j.properties" config.log4jProperties;
defaultText = ''pkgs.writeText "log4j.properties" config.log4jProperties'';
};

formatLogDirs = mkOption {
description = lib.mdDoc ''
Whether to format log dirs in KRaft mode if all log dirs are
unformatted, ie. they contain no meta.properties.
'';
type = types.bool;
default = false;
};

formatLogDirsIgnoreFormatted = mkOption {
description = lib.mdDoc ''
Whether to ignore already formatted log dirs when formatting log dirs,
instead of failing. Useful when replacing or adding disks.
'';
type = types.bool;
default = false;
};

log4jProperties = mkOption {
Expand All @@ -84,12 +144,7 @@ with lib;
];
};

package = mkOption {
description = lib.mdDoc "The kafka package to use";
default = pkgs.apacheKafka;
defaultText = literalExpression "pkgs.apacheKafka";
type = types.package;
};
package = mkPackageOption pkgs "apacheKafka" { };

jre = mkOption {
description = lib.mdDoc "The JRE with which to run Kafka";
Expand All @@ -106,38 +161,21 @@ with lib;
processes = {
"${name}" =
let
serverProperties =
if config.serverProperties != null then
config.serverProperties
else
''
# Generated by services-flake
broker.id=${toString config.brokerId}
port=${toString config.port}
host.name=${config.hostname}
log.dirs=${concatStringsSep "," (builtins.map (dir: "${config.dataDir}${dir}") config.logDirs)}
zookeeper.connect=${config.zookeeper}
${toString config.extraProperties}
'';

serverConfig = pkgs.writeText "server.properties" serverProperties;
logConfig = pkgs.writeText "log4j.properties" config.log4jProperties;

startScript = pkgs.writeShellScriptBin "start-kafka" ''
${config.jre}/bin/java \
-cp "${config.package}/libs/*" \
-Dlog4j.configuration=file:${logConfig} \
-Dlog4j.configuration=file:${config.configFiles.log4jProperties} \
${toString config.jvmOptions} \
kafka.Kafka \
${serverConfig}
${config.configFiles.serverProperties}
'';
in
{
command = "${startScript}/bin/start-kafka";

readiness_probe = {
# TODO: need to find a better way to check if kafka is ready. Maybe use one of the scripts in bin?
exec.command = "${pkgs.netcat.nc}/bin/nc -z ${config.hostname} ${toString config.port}";
exec.command = "${pkgs.netcat.nc}/bin/nc -z localhost ${builtins.toString config.port}";
initial_delay_seconds = 2;
period_seconds = 10;
timeout_seconds = 4;
Expand Down
24 changes: 21 additions & 3 deletions nix/apache-kafka_test.nix
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,32 @@
services.zookeeper."z1".enable = true;
# To avoid conflicting with `zookeeper_test.nix` in case the tests are run in parallel
services.zookeeper."z1".port = 2182;
services.apache-kafka."k1".enable = true;
services.apache-kafka."k1".zookeeper = "localhost:2182";
services.apache-kafka."k1" = {
enable = true;
port = 9094;
settings = {
# Since the available brokers are only 1
"offsets.topic.replication.factor" = 1;
"zookeeper.connect" = [ "localhost:2182" ];
};
};
# kafka should start only after zookeeper is healthy
settings.processes.k1.depends_on."z1".condition = "process_healthy";
settings.processes.test =
{
command = pkgs.writeShellApplication {
runtimeInputs = [ pkgs.bash config.services.apache-kafka.k1.package ];
text = ''
bash kafka-topics.sh --list --bootstrap-server localhost:9092
# Create a topic
kafka-topics.sh --create --bootstrap-server localhost:9094 --partitions 1 \
--replication-factor 1 --topic testtopic
# Producer
echo 'test 1' | kafka-console-producer.sh --broker-list localhost:9094 --topic testtopic
# Consumer
kafka-console-consumer.sh --bootstrap-server localhost:9094 --topic testtopic \
--from-beginning --max-messages 1 | grep -q "test 1"
'';
name = "kafka-test";
};
Expand Down

0 comments on commit c7bed59

Please sign in to comment.