Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KRaft for dev clusters #349

Open
wants to merge 9 commits into
base: no-namespaces-for-kustomize
Choose a base branch
from
2 changes: 1 addition & 1 deletion consumers-prometheus/kminion.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,7 @@ spec:
spec:
containers:
- name: kminion
image: quay.io/cloudhut/kminion:v2.1.0@sha256:99e3217ec4a6a61a0eb68da86d3f2c16fa9847a6db8498fea74044935096353b
image: solsson/kafka-consumers-prometheus@sha256:3d902ed8fa9e2cee7f44ff8a5567a5562a21d6bbfb11fe12faea49e85bd65059
env:
- name: TELEMETRY_HOST
value: 0.0.0.0
Expand Down
15 changes: 7 additions & 8 deletions events-kube/topic-create.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -7,15 +7,14 @@ spec:
spec:
containers:
- name: topic-create
image: solsson/kafka:native-cli@sha256:fbf29c59182fb87921c5199783d2d5796856ecbfe34a9c03eca658b3cf50f3c4
image: vectorized/redpanda:v21.11.10@sha256:0841e5a06ee5ee60385121aa2e766ceee5964c359a4ab42e73475f6d25fdd057
command:
- ./bin/kafka-topics.sh
- --zookeeper
- zookeeper.kafka.svc.cluster.local:2181
- --create
- --if-not-exists
- --topic
- ops.kube-events.stream.json
- rpk
- topic
- create
- --brokers
- bootstrap:9092
- ops.kube-events.stream.json
resources:
limits:
cpu: 100m
Expand Down
90 changes: 59 additions & 31 deletions kafka/10broker-config.yml
Original file line number Diff line number Diff line change
Expand Up @@ -10,13 +10,38 @@ data:
cp /etc/kafka-configmap/log4j.properties /etc/kafka/

KAFKA_BROKER_ID=${HOSTNAME##*-}
SEDS=("s/#init#broker.id=#init#/broker.id=$KAFKA_BROKER_ID/")
LABELS="kafka-broker-id=$KAFKA_BROKER_ID"
ANNOTATIONS=""

hash kubectl 2>/dev/null || {
# A custom ADVERTISE_ADDR can be set as env on the init container using for example value: $(POD_NAME).kafka.$(POD_NAMESPACE).custom.cluster.dns
[ -n "$ADVERTISE_ADDR" ] && echo "ADVERTISE_ADDR=$ADVERTISE_ADDR" || echo "ADVERTISE_ADDR is empty, Kafka will detect a hostname to advertise"

ADVERTISED_LISTENERS="PLAINTEXT://${ADVERTISE_ADDR}:9092"

# OUTSIDE_HOST and OUTSIDE_PORT can be set as envs, or set OUTSIDE_HOST_JSONPATH to get the value from the node, for example: '{.status.addresses[?(@.type=="InternalIP")].address}'
[ -z "$OUTSIDE_HOST_JSONPATH" ] || OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath="$OUTSIDE_HOST_JSONPATH")
[ -n "$OUTSIDE_PORT" ] || OUTSIDE_PORT=$((32400 + ${KAFKA_BROKER_ID}))
[ -z "$OUTSIDE_HOST" ] || ADVERTISED_LISTENERS="$ADVERTISED_LISTENERS,OUTSIDE://${OUTSIDE_HOST}:${OUTSIDE_PORT}"

if grep 'node.id' /etc/kafka-configmap/server.properties; then
echo "KRaft config detected"
SEDS=("s/#init#node.id=#init#/node.id=$KAFKA_BROKER_ID/")
SEDS+=("s/#init#controller.quorum.voters=#init#/controller.quorum.voters=$KAFKA_BROKER_ID@${ADVERTISE_ADDR}:9093/")
# We don't need to, actually should not, advertise controllers
# ADVERTISED_LISTENERS="$ADVERTISED_LISTENERS,CONTROLLER://${ADVERTISE_ADDR}:9093"
else
SEDS=("s/#init#broker.id=#init#/broker.id=$KAFKA_BROKER_ID/")
fi

# Note how the outside-N services have a fixed target port
[ -z "$OUTSIDE_HOST" ] || SEDS+=("s|^listeners=\(.*\)|listeners=\1,OUTSIDE://:9094|")

SEDS+=("s|#init#advertised.listeners=PLAINTEXT://#init#|advertised.listeners=$ADVERTISED_LISTENERS|")
ANNOTATIONS="$ANNOTATIONS kafka-listener-outside-host=$OUTSIDE_HOST kafka-listener-outside-port=$OUTSIDE_PORT"

if ! command -v kubectl >/dev/null; then
SEDS+=("s/#init#broker.rack=#init#/#init#broker.rack=# kubectl not found in path/")
} && {
else
ZONE=$(kubectl get node "$NODE_NAME" -o=go-template='{{index .metadata.labels "topology.kubernetes.io/zone"}}')
if [ "x$ZONE" == "x<no value>" ]; then
SEDS+=("s/#init#broker.rack=#init#/#init#broker.rack=# zone label not found for node $NODE_NAME/")
Expand All @@ -25,29 +50,37 @@ data:
LABELS="$LABELS kafka-broker-rack=$ZONE"
fi

[ -z "$ADVERTISE_ADDR" ] && echo "ADVERTISE_ADDR is empty, will advertise detected DNS name"
OUTSIDE_HOST=$(kubectl get node "$NODE_NAME" -o jsonpath='{.status.addresses[?(@.type=="InternalIP")].address}')
OUTSIDE_PORT=$((32400 + ${KAFKA_BROKER_ID}))
SEDS+=("s|#init#advertised.listeners=PLAINTEXT://#init#|advertised.listeners=PLAINTEXT://${ADVERTISE_ADDR}:9092,OUTSIDE://${OUTSIDE_HOST}:${OUTSIDE_PORT}|")
ANNOTATIONS="$ANNOTATIONS kafka-listener-outside-host=$OUTSIDE_HOST kafka-listener-outside-port=$OUTSIDE_PORT"

if [ ! -z "$LABELS" ]; then
kubectl -n $POD_NAMESPACE label pod $POD_NAME $LABELS || echo "Failed to label $POD_NAMESPACE.$POD_NAME - RBAC issue?"
fi
if [ ! -z "$ANNOTATIONS" ]; then
kubectl -n $POD_NAMESPACE annotate pod $POD_NAME $ANNOTATIONS || echo "Failed to annotate $POD_NAMESPACE.$POD_NAME - RBAC issue?"
fi
}
fi
printf '%s\n' "${SEDS[@]}" | sed -f - /etc/kafka-configmap/server.properties > /etc/kafka/server.properties.tmp
[ $? -eq 0 ] && mv /etc/kafka/server.properties.tmp /etc/kafka/server.properties
ln -s /etc/kafka/server.properties /etc/kafka/server.properties.$POD_NAME

init-kraft.sh: |-
#!/bin/bash
set -e
set -x
if [ -f /var/lib/kafka/data/kraft-combined/meta.properties ]; then
echo "Storage dir appears initialized already"
elif [ ! -f ./bin/kafka-storage.sh ]; then
echo "./bin/kafka-storage.sh not found, unable to set up KRaft storage directories"
else
echo "See https://github.com/apache/kafka/tree/trunk/config/kraft#format-storage-directories"
UUID=$(./bin/kafka-storage.sh random-uuid)
./bin/kafka-storage.sh format -t $UUID -c /etc/kafka/server.properties
fi

server.properties: |-
############################# Log Basics #############################

# A comma separated list of directories under which to store log files
# Overrides log.dir
log.dirs=/var/lib/kafka/data/topics
log.dirs=/var/lib/kafka/data/kraft-combined

# The default number of log partitions per topic. More partitions allow greater
# parallelism for consumption, but this will also result in more files across
Expand All @@ -66,31 +99,39 @@ data:

############################# Server Basics #############################

# The role of this server. Setting this puts us in KRaft mode
process.roles=broker,controller

# The id of the broker. This must be set to a unique integer for each broker.
#init#broker.id=#init#
#init#node.id=#init#
#init#controller.quorum.voters=#init#

#init#broker.rack=#init#

############################# Socket Server Settings #############################

# The address the socket server listens on. It will get the value returned from
# The address the socket server listens on. It will get the value returned from
# java.net.InetAddress.getCanonicalHostName() if not configured.
# FORMAT:
# listeners = listener_name://host_name:port
# EXAMPLE:
# listeners = PLAINTEXT://your.host.name:9092
#listeners=PLAINTEXT://:9092
listeners=PLAINTEXT://:9092,OUTSIDE://:9094
listeners=PLAINTEXT://:9092,CONTROLLER://:9093

# Hostname and port the broker will advertise to producers and consumers. If not set,
# Hostname and port the broker will advertise to producers and consumers. If not set,
# it uses the value for "listeners" if configured. Otherwise, it will use the value
# returned from java.net.InetAddress.getCanonicalHostName().
#advertised.listeners=PLAINTEXT://your.host.name:9092
#advertised.listeners=PLAINTEXT://your.host.name:9092,CONTROLLER://:9093
#init#advertised.listeners=PLAINTEXT://#init#

# Listener, host name, and port for the controller to advertise to the brokers. If
# this server is a controller, this listener must be configured.
controller.listener.names=CONTROLLER

# Maps listener names to security protocols, the default is for them to be the same. See the config documentation for more details
#listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:PLAINTEXT
#listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL
listener.security.protocol.map=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT,SSL:SSL,SASL_PLAINTEXT:SASL_PLAINTEXT,SASL_SSL:SASL_SSL,OUTSIDE:PLAINTEXT
inter.broker.listener.name=PLAINTEXT

# The number of threads that the server uses for receiving requests from the network and sending responses to the network
Expand Down Expand Up @@ -156,19 +197,6 @@ data:
# to the retention policies
#log.retention.check.interval.ms=300000

############################# Zookeeper #############################

# Zookeeper connection string (see zookeeper docs for details).
# This is a comma-separated list of host:port pairs, each corresponding to a zk
# server. e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002".
# You can also append an optional chroot string to the urls to specify the
# root directory for all kafka znodes.
zookeeper.connect=zookeeper:2181

# Timeout in ms for connecting to zookeeper
#zookeeper.connection.timeout.ms=18000


############################# Group Coordinator Settings #############################

# The following configuration specifies the time, in milliseconds, that the GroupCoordinator will delay the initial consumer rebalance.
Expand Down
23 changes: 21 additions & 2 deletions kafka/50kafka.yml
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ spec:
annotations:
spec:
terminationGracePeriodSeconds: 30
securityContext:
fsGroup: 65534
initContainers:
- name: init-config
image: solsson/kafka:initutils@sha256:0cd27e24dea0b27e17e72cd05259a4de394d527473c6707e89f26b87e356a04d
image: docker.io/yolean/toil:ab798035e825381728daffe35ade7c9c836021aa@sha256:eb1dd353e46270d7cdc69a118198bfafa9c2861520680276421c34f1d2b5c1a2
env:
- name: NODE_NAME
valueFrom:
Expand All @@ -34,6 +36,8 @@ spec:
valueFrom:
fieldRef:
fieldPath: metadata.namespace
- name: OUTSIDE_HOST_JSONPATH
value: '{.status.addresses[?(@.type=="InternalIP")].address}'
command: ['/bin/bash', '/etc/kafka-configmap/init.sh']
volumeMounts:
- name: configmap
Expand All @@ -42,9 +46,21 @@ spec:
mountPath: /etc/kafka
- name: extensions
mountPath: /opt/kafka/libs/extensions
- name: init-kraft
image: yolean/kafka:3.1.0-java11
command: ['/bin/bash', '/etc/kafka-configmap/init-kraft.sh']
volumeMounts:
- name: configmap
mountPath: /etc/kafka-configmap
- name: config
mountPath: /etc/kafka
- name: data
mountPath: /var/lib/kafka/data
- name: extensions
mountPath: /opt/kafka/libs/extensions
containers:
- name: broker
image: solsson/kafka:2.8.0@sha256:69274245ad16fba37e5b277335502a02e41c0efeae3ccbc3374f6cf624b58347
image: yolean/kafka:3.1.0-java11
env:
- name: POD_NAME
valueFrom:
Expand All @@ -63,13 +79,16 @@ spec:
ports:
- name: inside
containerPort: 9092
- name: controller
containerPort: 9093
- name: outside
containerPort: 9094
- name: jmx
containerPort: 5555
command:
- ./bin/kafka-server-start.sh
- /etc/kafka/server.properties.$(POD_NAME)
args: []
lifecycle:
preStop:
exec:
Expand Down
2 changes: 0 additions & 2 deletions nonroot/entrypoint-from-image.yaml

This file was deleted.

4 changes: 0 additions & 4 deletions nonroot/fsgroup-65534.yaml

This file was deleted.

44 changes: 0 additions & 44 deletions nonroot/kustomization.yaml

This file was deleted.

15 changes: 0 additions & 15 deletions nonroot/nonroot-image-kafka.yaml

This file was deleted.

31 changes: 0 additions & 31 deletions nonroot/nonroot-image-zookeeper.yaml

This file was deleted.

4 changes: 2 additions & 2 deletions variants/dev-small/listener-localhost.json
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
[
{"op": "add", "path": "/spec/template/spec/containers/0/args/1", "value": "--override"},
{"op": "add", "path": "/spec/template/spec/containers/0/args/2", "value": "advertised.listeners=PLAINTEXT://:9092,OUTSIDE://localhost:9094"}
{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--override"},
{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "advertised.listeners=PLAINTEXT://:9092,OUTSIDE://localhost:9094"}
]
2 changes: 0 additions & 2 deletions variants/scale-1/kafka-scale1-overrides.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
[
{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--override"},
{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "zookeeper.connect=zoo-0.zoo.$(POD_NAMESPACE).svc.cluster.local:2181" },
{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--override"},
{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "default.replication.factor=1"},
{"op": "add", "path": "/spec/template/spec/containers/0/args/-", "value": "--override"},
Expand Down
3 changes: 1 addition & 2 deletions variants/scale-1/kustomization.yaml
Original file line number Diff line number Diff line change
@@ -1,8 +1,7 @@
bases:
- ../../native
- ../../kafka
patchesStrategicMerge:
- kafka.yaml
- zookeeper.yaml
patchesJson6902:
- target:
group: apps
Expand Down