This guide walks through of using the Kubernetes-Kafka-Connect-Operator,
- [kubectl][kubectl_tool] version v1.12.0+.
- Access to a Kubernetes v1.12.0+ cluster.
This operator run kafka connect clusters in Kubernetes platform.
The kafka connect clusters are deployed by kubernetes Deployement
objects generated from the objects of the KafkaConnect
custom resource type.
This operator also expose the kafka connect rest api as a service and an ingress object
This operator can also expose the connector lag via the new kubernetes custom metrics API (optional)
The most common way of using a KafkaConnect
is store the KafkaConnect
specification in a YAML file.
For more information, check the API Specification.
apiVersion: kafkaconnect.operator.io/v1alpha1
kind: KafkaConnect
metadata:
name: kafkaconnectcloud
labels:
app: kafkaconnect
type: elasticsearch-sink
spec:
kafkaConnectRestAPIPort: 8083
ingressSpec:
parentDomain: apps-crc.testing
style: domainStyle
connectors:
taskPerPod: 1
initPodReplicas: 1
connectorConfigs:
- name: connector-elastic
exposeLagMetric: true
url: https://raw.githubusercontent.com/amadeusitgroup/kubernetes-kafka-connect-operator/master/connector-examples/connector1.json
podSpec:
containers:
- name: kafkaconnect
image: confluentinc/cp-kafka-connect:5.3.1
imagePullPolicy: IfNotPresent
ports:
- containerPort: 8083
env:
- name: CONNECT_BOOTSTRAP_SERVERS
value: localhost:9092
- name: CONNECT_GROUP_ID
value: connector-elastic
- name: CONNECT_CONFIG_STORAGE_TOPIC
value: connect-configs-logger
- name: CONNECT_STATUS_STORAGE_TOPIC
value: connect-status-logger
- name: CONNECT_OFFSET_STORAGE_TOPIC
value: connect-offsets-logger
- name: CONNECT_INTERNAL_KEY_CONVERTER
value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_INTERNAL_VALUE_CONVERTER
value: org.apache.kafka.connect.json.JsonConverter
- name: CONNECT_VALUE_CONVERTER
value: org.apache.kafka.connect.storage.StringConverter
- name: CONNECT_KEY_CONVERTER
value: org.apache.kafka.connect.storage.StringConverter
- name: CONNECT_PLUGIN_PATH
value: /usr/share/java,/usr/share/confluent-hub-components
- name: CONNECT_LOG4J_ROOT_LOGLEVEL
value: INFO
- name: CONNECT_LOG4J_LOGGERS
value: org.reflections=INFO
- name: CONNECT_REST_ADVERTISED_HOST_NAME
valueFrom:
fieldRef:
fieldPath: status.podIP
- name: KAFKA_HEAP_OPTS
value: "-Xms500M -Xmx800M"
readinessProbe:
httpGet:
path: /connectors
port: 8083
initialDelaySeconds: 120
periodSeconds: 5
timeoutSeconds: 1
livenessProbe:
httpGet:
path: /connectors
port: 8083
initialDelaySeconds: 120
periodSeconds: 20
timeoutSeconds: 1
resources:
limits:
cpu: 500m
memory: 1Gi
requests:
memory: 700Mi
cpu: 300m
This operator can also auto-scale Kafka Connect clusters by using the scale api of deployment objects and the connector's task by using the kafka connect cluster's api
The algo to auto-scale kafka connector task number is same as the Kubernetes Horizontal Pod Autoscaler and base on the taskPerPod
value in a KafkaConnect
object spec the Deployment
's replicas (Pod
number) will also be updated proportionally.
For more information, check the API Specification.
apiVersion: kafkaconnect.operator.io/v1alpha1
kind: KafkaConnectAutoScaler
metadata:
name: example-kafkaconnectautoscaler
spec:
kcScaleTargetRef:
apiVersion: kafkaconnect.operator.io/v1alpha1
name: kafkaconnectcloud
kafkaConnectorName: connector-elastic
minTasks: 1
maxTasks: 10
metrics:
- type: Object
object:
describedObject:
kind: KafkaConnect
name: kafkaconnectcloud
apiVersion: kafkaconnect.operator.io/v1alpha1
metric:
name: connector-elastic-lag
target:
type: Value
value: "10"