From 79a70da32c7ff3aa5f64f5b4ab64e83bb795f630 Mon Sep 17 00:00:00 2001 From: Katherine Stanley <11195226+katheris@users.noreply.github.com> Date: Thu, 26 Sep 2024 17:25:22 +0100 Subject: [PATCH 1/5] Blogpost: Managing connector offsets Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com> --- .../2024-09-26-managing-connector-offsets.md | 252 ++++++++++++++++++ 1 file changed, 252 insertions(+) create mode 100644 _posts/2024-09-26-managing-connector-offsets.md diff --git a/_posts/2024-09-26-managing-connector-offsets.md b/_posts/2024-09-26-managing-connector-offsets.md new file mode 100644 index 000000000..a188a93ae --- /dev/null +++ b/_posts/2024-09-26-managing-connector-offsets.md @@ -0,0 +1,252 @@ +--- +layout: post +title: "Managing Connector Offsets" +date: 2024-08-26 +author: kate_stanley +--- + +When building data pipelines using Kafka Connect, or replicating data using MirrorMaker, offsets are used to keep track of the flow of data. +Sink connectors use the usual Kafka consumer offset mechanism, while source connectors can store offsets with a custom format in a Kafka topic. + +To manage connector offsets, rather than directly interacting with the underlying Kafka topics, you can make use of the following endpoints from the Connect REST API: + +* `GET /connectors/{connector}/offsets` to list offsets +* `PATCH /connectors/{connector}/offsets` to alter offsets +* `DELETE /connectors/{connector}/offsets` to reset offsets + +In the next release of Strimzi you will also be able to manage connector offsets directly in the KafkaConnector and KafkaMirrorMaker2 custom resources. +This blog post steps through how you can use this new functionality. +The process is very similar for both Kafka Connect and MirrorMaker, so we'll demonstrate how to do it with KafkaConnect and then explain how the process applies to MirrorMaker. + +### Before we begin + +If you want to follow along the steps in this blog post you need a Kubernetes cluster containing the Strimzi operator, a Kafka cluster, a Connect cluster, and a running connector. +First run through the Strimzi [quickstart guide](https://strimzi.io/quickstarts/) to deploy your Strimzi operator and Kafka cluster. + +Once you have a Kafka cluster you can deploy Connect and a connector using the following commands: +1. `kubectl apply -f https://strimzi.io/examples/latest/connect/kafka-connect-build.yaml -n kafka` +2. `kubectl wait kafkaconnect/my-connect-cluster --for=condition=Ready --timeout=300s -n kafka` +3. `kubectl annotate kafkaconnect my-connect-cluster strimzi.io/use-connector-resources=true -n kafka` +4. `kubectl apply -f https://strimzi.io/examples/latest/connect/source-connector.yaml -n kafka` +5. `kubectl wait kafkaconnector/my-source-connector --for=condition=Ready --timeout=300s -n kafka` + +### Listing offsets + +To list offsets edit your `KafkaConnector` resource using `kubectl edit kafkaconnector my-source-connector -n kafka` to add configuration for where to output the offsets: + +```yaml +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaConnector +# ... +spec: + #... + listOffsets: + toConfigMap: + name: my-connector-offsets + #... +``` + +This tells Strimzi to write the offsets to a ConfigMap called `my-connector-offsets`. + +To trigger the Strimzi to get the latest offsets you need to annotate your `KafkaConnector` resource: + +```shell +$ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n kafka +``` + +After a couple of minutes you should have a new ConfigMap containing the output: + +```shell +$ kubectl get configmap my-connector-offsets -n kafka -oyaml +apiVersion: v1 +kind: ConfigMap +metadata: + creationTimestamp: "2024-09-26T10:20:15Z" (1) + labels: + strimzi.io/cluster: my-connect-cluster + name: my-connector-offsets + namespace: kafka + ownerReferences: (2) + - apiVersion: kafka.strimzi.io/v1beta2 + blockOwnerDeletion: false + controller: false + kind: KafkaConnector + name: my-source-connector + uid: 637e3be7-bd96-43ab-abde-c55b4c4550e0 + resourceVersion: "66951" + uid: 641d60a9-36eb-4f29-9895-8f2c1eb9638e +data: (3) + offsets.json: |- + { + "offsets" : [ { + "partition" : { + "filename" : "/opt/kafka/LICENSE" + }, + "offset" : { + "position" : 15295 + } + } ] + } +``` + +1. If the ConfigMap doesn't already exist Strimzi will create it. +2. The owner reference points to your KafkaConnector resource. To provide a custom owner reference, create the ConfigMap in advance and set an owner reference. +3. Strimzi puts the offsets into a field called `offsets.json`. It doesn't overwrite any other fields when updating an existing ConfigMap. + +You can check that the output matches the results from the Connect REST API by calling the `GET /connectors/{connector}/offsets` endpoint directly: + +```shell +$ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/connectors/my-source-connector/offsets +{"offsets":[{"partition":{"filename":"/opt/kafka/LICENSE"},"offset":{"position":15295}}]} +``` + +### Altering offsets + +To alter offsets for a connector you also need a ConfigMap, this time to tell Strimzi the new offsets to set. +All sink connectors use the same format, however for source connectors it varies. +The easiest way to create the ConfigMap for altering offsets, is actually to reuse the ConfigMap that Strimzi wrote the offsets into. + +To alter connector offsets the connector also needs to be stopped. + +Edit the `KafkaConnector` resource to set the `my-connector-offsets` ConfigMap as the source of offsets for the alter operation, and set the `state` as `stopped`: + +```yaml +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaConnector +# ... +spec: + #... + state: stopped + alterOffsets: + fromConfigMap: + name: my-connector-offsets + #... +``` + +The `status` in the `KafkaConnector` is updated once Strimzi has stopped the connector. + +Now edit the `my-connector-offsets` ConfigMap to change the `position` to, for example, 10: + +```yaml +apiVersion: v1 +kind: ConfigMap +# ... +data: + offsets.json: |- + { + "offsets" : [ { + "partition" : { + "filename" : "/opt/kafka/LICENSE" + }, + "offset" : { + "position" : 10 + } + } ] + } +``` + +Finally trigger the operator to alter offsets by annotating the resource: + +```shell +$ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=alter -n kafka +``` + +Strimzi removes the `strimzi.io/connector-offsets` annotation from the resource once the offsets have been successfully updated. +You can also verify this directly: + +```shell +$ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/connectors/my-source-connector/offsets +{"offsets":[{"partition":{"filename":"/opt/kafka/LICENSE"},"offset":{"position":10}}]} +``` + +### Resetting offsets + +The final action you can perform is to reset the connector offsets. +For this action the connector also needs to be in a `stopped` state, but you don't need any ConfigMap. + +Reset the offsets for your connector by annotating the resource: + +```shell +$ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=reset -n kafka +``` + +Once complete the connector offsets will be empty: + +```shell +$ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/connectors/my-source-connector/offsets +{"offsets":[]} +``` + +### Managing offsets for MirrorMaker + +In addition to connectors managed via the `KafkaConnector` resource, you can also manage the connectors that are deployed as part of a `KafkaMirrorMaker2` resource. +When using a `KafkaMirrorMaker2` resource the configurations for the ConfigMaps for listing and altering offsets are provided on a per-connector basis. For example: + +```yaml +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaMirrorMaker2 +metadata: + name: my-mirror-maker-2 +spec: + # ... + mirrors: + - # ... + sourceConnector: + listOffsets: + toConfigMap: + name: my-connector-offsets + alterOffsets: + fromConfigMap: + name: my-connector-offsets + # ... +``` + +Strimzi allows you to list, alter and reset offsets for all the MirrorMaker connectors (MirrorSourceConnector, MirrorCheckpointConnector, MirrorHeartbeatConnector). +However, be aware that the only connector that currently actively uses its offsets is the MirrorSourceConnector. +You may find listing offsets useful for the MirrorCheckpointConnector and MirrorHeartbeatConnector to track progress, but it is uncommon to need to alter or reset offsets for those connectors. + +Strimzi only allows you to perform an action on a single connector, in a single mirror at a time. +Therefore, to initiate an action from a `KafkaMirrorMaker2` resource you must apply two annotations: +* strimzi.io/connector-offsets +* strimzi.io/mirrormaker-connector + +Set `strimzi.io/connector-offsets` to one of `list`, `alter` or `reset`. +At the same time set `strimzi.io/mirrormaker-connector` to the name of your connector. + +Strimzi names the connectors using the format `->.`, for example `east-kafka->west-kafka.MirrorSourceConnector`. + +You can use a single command to annotate the resource with both annotations. +For example to list offsets for a connector called `east-kafka->west-kafka.MirrorSourceConnector`: + +```shell +$ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="east-kafka->west-kafka.MirrorSourceConnector" -n kafka +``` + +When listing and altering offsets for MirrorMaker connectors, Strimzi uses the connector name in the data field. +For example, the above command to list offsets for the connector `east-kafka->west-kafka.MirrorSourceConnector` results in a ConfigMap containing: + +```yaml +apiVersion: v1 +kind: ConfigMap +# ... +data: + east-kafka--west-kafka.MirrorSourceConnector.json: | + { + "offsets": [ + { + "partition": { + "cluster": "east-kafka", + "partition": 0, + "topic": "mirrormaker2-cluster-configs" + }, + "offset": { + "offset": 0 + } + } + ] + } +``` + +### Conclusion + +Now you can list, alter and reset offsets of your connectors using the `KafkaConnector` and `KafkaMirrorMaker2` custom resources. From ce0c5ec5a70f76e26ea9c3db33785c5acc1162f7 Mon Sep 17 00:00:00 2001 From: Katherine Stanley <11195226+katheris@users.noreply.github.com> Date: Mon, 30 Sep 2024 11:37:54 +0100 Subject: [PATCH 2/5] Address scholzj, see-quick, and PaulRMellor review comments Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com> --- .../2024-09-26-managing-connector-offsets.md | 57 ++++++++++++------- 1 file changed, 38 insertions(+), 19 deletions(-) diff --git a/_posts/2024-09-26-managing-connector-offsets.md b/_posts/2024-09-26-managing-connector-offsets.md index a188a93ae..6463fda29 100644 --- a/_posts/2024-09-26-managing-connector-offsets.md +++ b/_posts/2024-09-26-managing-connector-offsets.md @@ -6,7 +6,7 @@ author: kate_stanley --- When building data pipelines using Kafka Connect, or replicating data using MirrorMaker, offsets are used to keep track of the flow of data. -Sink connectors use the usual Kafka consumer offset mechanism, while source connectors can store offsets with a custom format in a Kafka topic. +Sink connectors use Kafka's standard consumer offset mechanism, while source connectors store offsets in a custom format within a Kafka topic. To manage connector offsets, rather than directly interacting with the underlying Kafka topics, you can make use of the following endpoints from the Connect REST API: @@ -14,7 +14,7 @@ To manage connector offsets, rather than directly interacting with the underlyin * `PATCH /connectors/{connector}/offsets` to alter offsets * `DELETE /connectors/{connector}/offsets` to reset offsets -In the next release of Strimzi you will also be able to manage connector offsets directly in the KafkaConnector and KafkaMirrorMaker2 custom resources. +From Strimzi 0.44 onwards you can also manage connector offsets directly in the `KafkaConnector` and `KafkaMirrorMaker2` custom resources. This blog post steps through how you can use this new functionality. The process is very similar for both Kafka Connect and MirrorMaker, so we'll demonstrate how to do it with KafkaConnect and then explain how the process applies to MirrorMaker. @@ -32,7 +32,7 @@ Once you have a Kafka cluster you can deploy Connect and a connector using the f ### Listing offsets -To list offsets edit your `KafkaConnector` resource using `kubectl edit kafkaconnector my-source-connector -n kafka` to add configuration for where to output the offsets: +To list offsets, edit your `KafkaConnector` resource by running `kubectl edit kafkaconnector my-source-connector -n kafka` and add the configuration to specify where the offsets should be output: ```yaml apiVersion: kafka.strimzi.io/v1beta2 @@ -48,7 +48,7 @@ spec: This tells Strimzi to write the offsets to a ConfigMap called `my-connector-offsets`. -To trigger the Strimzi to get the latest offsets you need to annotate your `KafkaConnector` resource: +To trigger Strimzi to get the latest offsets, annotate your `KafkaConnector` resource: ```shell $ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n kafka @@ -89,8 +89,8 @@ data: (3) } ``` -1. If the ConfigMap doesn't already exist Strimzi will create it. -2. The owner reference points to your KafkaConnector resource. To provide a custom owner reference, create the ConfigMap in advance and set an owner reference. +1. If the ConfigMap doesn't already exist, Strimzi will create it automatically. +2. The owner reference points to your `KafkaConnector` resource. To provide a custom owner reference, create the ConfigMap in advance and set an owner reference manually. 3. Strimzi puts the offsets into a field called `offsets.json`. It doesn't overwrite any other fields when updating an existing ConfigMap. You can check that the output matches the results from the Connect REST API by calling the `GET /connectors/{connector}/offsets` endpoint directly: @@ -102,9 +102,9 @@ $ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/ ### Altering offsets -To alter offsets for a connector you also need a ConfigMap, this time to tell Strimzi the new offsets to set. +To alter a connector's offsets, you need to create a ConfigMap that instructs Strimzi on the new offsets to apply. All sink connectors use the same format, however for source connectors it varies. -The easiest way to create the ConfigMap for altering offsets, is actually to reuse the ConfigMap that Strimzi wrote the offsets into. +The easiest way to create a ConfigMap for altering offsets is to reuse the one that Strimzi originally wrote the offsets to. To alter connector offsets the connector also needs to be stopped. @@ -159,10 +159,25 @@ $ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/ {"offsets":[{"partition":{"filename":"/opt/kafka/LICENSE"},"offset":{"position":10}}]} ``` +#### Resuming the connector + +Once Strimzi has altered the offsets you need to manually resume the connector. +Edit the `KafkaConnector` resource to set the `state` to `running`: + +```yaml +apiVersion: kafka.strimzi.io/v1beta2 +kind: KafkaConnector +# ... +spec: + #... + state: running + #... +``` + ### Resetting offsets The final action you can perform is to reset the connector offsets. -For this action the connector also needs to be in a `stopped` state, but you don't need any ConfigMap. +For this action the connector also needs to be in a `stopped` state, but you don't need a ConfigMap. Reset the offsets for your connector by annotating the resource: @@ -170,17 +185,19 @@ Reset the offsets for your connector by annotating the resource: $ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=reset -n kafka ``` -Once complete the connector offsets will be empty: +Once completed, the connector offsets will be empty: ```shell $ kubectl exec -n kafka -it my-connect-cluster-connect-0 -- curl localhost:8083/connectors/my-source-connector/offsets {"offsets":[]} ``` +Similar to altering offsets, make sure you manually resume the connector by changing the `state` in `KafkaConnector` to `running`. + ### Managing offsets for MirrorMaker In addition to connectors managed via the `KafkaConnector` resource, you can also manage the connectors that are deployed as part of a `KafkaMirrorMaker2` resource. -When using a `KafkaMirrorMaker2` resource the configurations for the ConfigMaps for listing and altering offsets are provided on a per-connector basis. For example: +When using a `KafkaMirrorMaker2` resource, the configurations for the ConfigMaps for listing and altering offsets, as well as the `state` configuration for stopping and resuming the connector, are specified on a per-connector basis. For example: ```yaml apiVersion: kafka.strimzi.io/v1beta2 @@ -198,17 +215,19 @@ spec: alterOffsets: fromConfigMap: name: my-connector-offsets + state: stopped # ... ``` -Strimzi allows you to list, alter and reset offsets for all the MirrorMaker connectors (MirrorSourceConnector, MirrorCheckpointConnector, MirrorHeartbeatConnector). -However, be aware that the only connector that currently actively uses its offsets is the MirrorSourceConnector. -You may find listing offsets useful for the MirrorCheckpointConnector and MirrorHeartbeatConnector to track progress, but it is uncommon to need to alter or reset offsets for those connectors. +Strimzi allows you to list, alter and reset offsets for all the MirrorMaker connectors (`MirrorSourceConnector`, `MirrorCheckpointConnector`, `MirrorHeartbeatConnector`). +However, be aware that the only connector that currently actively uses its offsets is the `MirrorSourceConnector`. +Listing offsets can be useful for the `MirrorCheckpointConnector` and `MirrorHeartbeatConnector` to track progress. +However, altering or resetting offsets for these connectors is rarely necessary. -Strimzi only allows you to perform an action on a single connector, in a single mirror at a time. +Strimzi only allows you to perform actions on one connector within a single mirror at a time. Therefore, to initiate an action from a `KafkaMirrorMaker2` resource you must apply two annotations: -* strimzi.io/connector-offsets -* strimzi.io/mirrormaker-connector +* `strimzi.io/connector-offsets` +* `strimzi.io/mirrormaker-connector` Set `strimzi.io/connector-offsets` to one of `list`, `alter` or `reset`. At the same time set `strimzi.io/mirrormaker-connector` to the name of your connector. @@ -216,7 +235,7 @@ At the same time set `strimzi.io/mirrormaker-connector` to the name of your conn Strimzi names the connectors using the format `->.`, for example `east-kafka->west-kafka.MirrorSourceConnector`. You can use a single command to annotate the resource with both annotations. -For example to list offsets for a connector called `east-kafka->west-kafka.MirrorSourceConnector`: +For example, this command lists offsets for a connector called `east-kafka->west-kafka.MirrorSourceConnector`: ```shell $ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="east-kafka->west-kafka.MirrorSourceConnector" -n kafka @@ -249,4 +268,4 @@ data: ### Conclusion -Now you can list, alter and reset offsets of your connectors using the `KafkaConnector` and `KafkaMirrorMaker2` custom resources. +Now you can list, alter, and reset offsets of your connectors using the `KafkaConnector` and `KafkaMirrorMaker2` custom resources. From dcbee165aedcbff2458648220f55b96d89e6211c Mon Sep 17 00:00:00 2001 From: Katherine Stanley <11195226+katheris@users.noreply.github.com> Date: Tue, 1 Oct 2024 10:53:50 +0100 Subject: [PATCH 3/5] Address mimaison review comments Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com> --- _posts/2024-09-26-managing-connector-offsets.md | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/_posts/2024-09-26-managing-connector-offsets.md b/_posts/2024-09-26-managing-connector-offsets.md index 6463fda29..11a719ee9 100644 --- a/_posts/2024-09-26-managing-connector-offsets.md +++ b/_posts/2024-09-26-managing-connector-offsets.md @@ -8,15 +8,20 @@ author: kate_stanley When building data pipelines using Kafka Connect, or replicating data using MirrorMaker, offsets are used to keep track of the flow of data. Sink connectors use Kafka's standard consumer offset mechanism, while source connectors store offsets in a custom format within a Kafka topic. -To manage connector offsets, rather than directly interacting with the underlying Kafka topics, you can make use of the following endpoints from the Connect REST API: +To manage connector offsets, rather than directly interacting with the underlying Kafka topics, you should make use of the following endpoints from the Connect REST API: * `GET /connectors/{connector}/offsets` to list offsets * `PATCH /connectors/{connector}/offsets` to alter offsets * `DELETE /connectors/{connector}/offsets` to reset offsets -From Strimzi 0.44 onwards you can also manage connector offsets directly in the `KafkaConnector` and `KafkaMirrorMaker2` custom resources. +Some common use-cases for managing connector offsets are: +* skipping a poison record +* replaying records +* specifying a starting offset for a new connector + +From Strimzi 0.44 onwards you can manage connector offsets directly in the `KafkaConnector` and `KafkaMirrorMaker2` custom resources. This blog post steps through how you can use this new functionality. -The process is very similar for both Kafka Connect and MirrorMaker, so we'll demonstrate how to do it with KafkaConnect and then explain how the process applies to MirrorMaker. +The process is very similar for both Kafka Connect and MirrorMaker, so we'll demonstrate how to do it with Kafka Connect and then explain how the process applies to MirrorMaker. ### Before we begin @@ -241,7 +246,7 @@ For example, this command lists offsets for a connector called `east-kafka->west $ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="east-kafka->west-kafka.MirrorSourceConnector" -n kafka ``` -When listing and altering offsets for MirrorMaker connectors, Strimzi uses the connector name in the data field. +When listing and altering offsets for MirrorMaker connectors, Strimzi uses the connector name in the data field replacing `->` with `--`. For example, the above command to list offsets for the connector `east-kafka->west-kafka.MirrorSourceConnector` results in a ConfigMap containing: ```yaml @@ -269,3 +274,4 @@ data: ### Conclusion Now you can list, alter, and reset offsets of your connectors using the `KafkaConnector` and `KafkaMirrorMaker2` custom resources. +Use this new feature to manage the flow of data in your Connect data pipelines, whether that's to skip poison records, replay records, or even to check on the status of your connector. From e6f8be5742f29365492e55495076008428ec89ec Mon Sep 17 00:00:00 2001 From: Katherine Stanley <11195226+katheris@users.noreply.github.com> Date: Tue, 1 Oct 2024 15:57:25 +0100 Subject: [PATCH 4/5] Address ppatierno review comments Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com> --- .../2024-09-26-managing-connector-offsets.md | 31 ++++++++++++++----- 1 file changed, 23 insertions(+), 8 deletions(-) diff --git a/_posts/2024-09-26-managing-connector-offsets.md b/_posts/2024-09-26-managing-connector-offsets.md index 11a719ee9..8960c2939 100644 --- a/_posts/2024-09-26-managing-connector-offsets.md +++ b/_posts/2024-09-26-managing-connector-offsets.md @@ -29,11 +29,26 @@ If you want to follow along the steps in this blog post you need a Kubernetes cl First run through the Strimzi [quickstart guide](https://strimzi.io/quickstarts/) to deploy your Strimzi operator and Kafka cluster. Once you have a Kafka cluster you can deploy Connect and a connector using the following commands: -1. `kubectl apply -f https://strimzi.io/examples/latest/connect/kafka-connect-build.yaml -n kafka` -2. `kubectl wait kafkaconnect/my-connect-cluster --for=condition=Ready --timeout=300s -n kafka` -3. `kubectl annotate kafkaconnect my-connect-cluster strimzi.io/use-connector-resources=true -n kafka` -4. `kubectl apply -f https://strimzi.io/examples/latest/connect/source-connector.yaml -n kafka` -5. `kubectl wait kafkaconnector/my-source-connector --for=condition=Ready --timeout=300s -n kafka` +1. Deploy a Connect cluster + ``` + kubectl apply -f https://strimzi.io/examples/latest/connect/kafka-connect-build.yaml -n kafka + ``` +2. Wait for Connect to be ready + ``` + kubectl wait kafkaconnect/my-connect-cluster --for=condition=Ready --timeout=300s -n kafka + ``` +3. Enable `KafkaConnector` resources on your Connect cluster + ``` + kubectl annotate kafkaconnect my-connect-cluster strimzi.io/use-connector-resources=true -n kafka + ``` +4. Create a source connector + ``` + kubectl apply -f https://strimzi.io/examples/latest/connect/source-connector.yaml -n kafka + ``` +5. Wait for the source connector to be ready + ``` + kubectl wait kafkaconnector/my-source-connector --for=condition=Ready --timeout=300s -n kafka + ``` ### Listing offsets @@ -59,7 +74,7 @@ To trigger Strimzi to get the latest offsets, annotate your `KafkaConnector` res $ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list -n kafka ``` -After a couple of minutes you should have a new ConfigMap containing the output: +Once you have annotated the resource, Strimzi creates a new ConfigMap containing the offsets: ```shell $ kubectl get configmap my-connector-offsets -n kafka -oyaml @@ -95,7 +110,7 @@ data: (3) ``` 1. If the ConfigMap doesn't already exist, Strimzi will create it automatically. -2. The owner reference points to your `KafkaConnector` resource. To provide a custom owner reference, create the ConfigMap in advance and set an owner reference manually. +2. The owner reference points to your `KafkaConnector` resource. To provide a custom owner reference, for example to prevent the ConfigMap being deleted when the `KafkaConnector` resource is, create the ConfigMap in advance and set an owner reference manually. 3. Strimzi puts the offsets into a field called `offsets.json`. It doesn't overwrite any other fields when updating an existing ConfigMap. You can check that the output matches the results from the Connect REST API by calling the `GET /connectors/{connector}/offsets` endpoint directly: @@ -237,7 +252,7 @@ Therefore, to initiate an action from a `KafkaMirrorMaker2` resource you must ap Set `strimzi.io/connector-offsets` to one of `list`, `alter` or `reset`. At the same time set `strimzi.io/mirrormaker-connector` to the name of your connector. -Strimzi names the connectors using the format `->.`, for example `east-kafka->west-kafka.MirrorSourceConnector`. +Strimzi names the connectors using the format `[SOURCE_ALIAS]->[TARGET_ALIAS].[CONNECTOR_TYPE]`, for example `east-kafka->west-kafka.MirrorSourceConnector`. You can use a single command to annotate the resource with both annotations. For example, this command lists offsets for a connector called `east-kafka->west-kafka.MirrorSourceConnector`: From fffa022720e1ef0cdbe896d786d891d41d43bb9e Mon Sep 17 00:00:00 2001 From: Katherine Stanley <11195226+katheris@users.noreply.github.com> Date: Thu, 10 Oct 2024 16:40:14 +0100 Subject: [PATCH 5/5] Fix KafkaMirrorMaker2 annotation command Signed-off-by: Katherine Stanley <11195226+katheris@users.noreply.github.com> --- _posts/2024-09-26-managing-connector-offsets.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/_posts/2024-09-26-managing-connector-offsets.md b/_posts/2024-09-26-managing-connector-offsets.md index 8960c2939..26aace302 100644 --- a/_posts/2024-09-26-managing-connector-offsets.md +++ b/_posts/2024-09-26-managing-connector-offsets.md @@ -258,7 +258,7 @@ You can use a single command to annotate the resource with both annotations. For example, this command lists offsets for a connector called `east-kafka->west-kafka.MirrorSourceConnector`: ```shell -$ kubectl annotate kafkaconnector my-source-connector strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="east-kafka->west-kafka.MirrorSourceConnector" -n kafka +$ kubectl annotate kafkamirrormaker2 my-mirror-maker-2 strimzi.io/connector-offsets=list strimzi.io/mirrormaker-connector="east-kafka->west-kafka.MirrorSourceConnector" -n kafka ``` When listing and altering offsets for MirrorMaker connectors, Strimzi uses the connector name in the data field replacing `->` with `--`.