Confluent Cloud custom connector tutorial (#1553)
davetroiano authored May 12, 2023
1 parent 450a9a1 commit 1eda0fb
Showing 24 changed files with 444 additions and 2 deletions.
53 changes: 53 additions & 0 deletions _data/harnesses/custom-connector/confluent.yml
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
dev:
steps:
- title: Initialize the project
content:
- action: execute
file: tutorial-steps/dev/init.sh
render:
file: tutorials/custom-connector/confluent/markup/dev/init.adoc

- title: Provision your Kafka cluster
content:
- action: skip
render:
file: tutorials/custom-connector/confluent/markup/dev/ccloud-supported-provider-region.adoc
- action: skip
render:
file: shared/markup/ccloud/ccloud-setup-self.adoc

- title: Develop the custom connector
content:
- action: skip
render:
file: tutorials/custom-connector/confluent/markup/dev/write-connector.adoc

- title: Package the custom connector
content:
- action: skip
render:
file: tutorials/custom-connector/confluent/markup/dev/package-connector.adoc

- title: Add the custom connector plugin to Confluent Cloud
content:
- action: skip
render:
file: tutorials/custom-connector/confluent/markup/dev/add-connector-plugin.adoc

- title: Run the custom connector on Confluent Cloud
content:
- action: skip
render:
file: tutorials/custom-connector/confluent/markup/dev/provision-connector.adoc

- title: Monitor the custom connector on Confluent Cloud
content:
- action: skip
render:
file: tutorials/custom-connector/confluent/markup/dev/verify-connector.adoc

- title: Delete the custom connector
content:
- action: skip
render:
file: tutorials/custom-connector/confluent/markup/dev/delete-connector.adoc
16 changes: 14 additions & 2 deletions _data/tutorials.yaml
@@ -206,8 +206,8 @@ aggregating-count:
criteria?
introduction: Suppose you have a topic with events that represent ticket sales for
movies. In this tutorial, you'll see an example of 'groupby count' in Kafka Streams,
ksqlDB, and Flink SQL. We'll write a program that calculates the total number
of tickets sold per movie.
status:
ksql: enabled
kstreams: enabled
@@ -358,6 +358,18 @@ generate-test-data-streams:
ksql: enabled
kstreams: disabled
kafka: enabled
custom-connector:
title: Build and run a custom connector on Confluent Cloud
meta-description: Build and run a custom connector on Confluent Cloud
canonical: confluent
slug: /custom-connector
introduction: 'While Confluent Cloud offers many pre-built <a href=''https://docs.confluent.io/cloud/current/connectors/index.html#supported-connectors''>managed
connectors</a>, you may also upload and run custom connectors on Confluent Cloud — either an existing open source connector that you''d like to use,
or a connector that you''ve written. In this tutorial, we''ll write a simple source connector plugin, package it so that it can be uploaded to Confluent Cloud, and then run
the connector. As a developer, you want to write code. Let Confluent Cloud do the rest.'
question: How do I write, package, and run a custom connector on Confluent Cloud?
status:
confluent: enabled
aggregating-average:
title: Compute an average aggregation
meta-description: compute an average aggregation like count or sum
@@ -0,0 +1 @@
tutorial-steps/dev/outputs/
8 changes: 8 additions & 0 deletions _includes/tutorials/custom-connector/confluent/code/Makefile
@@ -0,0 +1,8 @@
STEPS_DIR := tutorial-steps
DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs
TEMP_DIR := $(shell mktemp -d)
SEQUENCE := "dev, test, prod, ccloud"

tutorial:
harness-runner ../../../../../_data/harnesses/custom-connector/confluent.yml $(TEMP_DIR) $(SEQUENCE)
reset
89 changes: 89 additions & 0 deletions _includes/tutorials/custom-connector/confluent/code/pom.xml
@@ -0,0 +1,89 @@
<?xml version="1.0" encoding="UTF-8"?>
<!--~
~ Copyright 2023 Confluent Inc.
~
~ Licensed under the Apache License, Version 2.0 (the "License");
~ you may not use this file except in compliance with the License.
~ You may obtain a copy of the License at
~
~ http://www.apache.org/licenses/LICENSE-2.0
~
~ Unless required by applicable law or agreed to in writing, software
~ distributed under the License is distributed on an "AS IS" BASIS,
~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
~ See the License for the specific language governing permissions and
~ limitations under the License.
~-->
<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
<modelVersion>4.0.0</modelVersion>

<groupId>io.confluent.developer</groupId>
<artifactId>kafka-connect-counter</artifactId>
<version>0.0.1-SNAPSHOT</version>
<packaging>jar</packaging>

<properties>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
</properties>

<name>kafka-connect-counter</name>

<description>
A dummy Kafka Connect source connector that emits events containing incrementing numbers
</description>

<dependencies>
<dependency>
<groupId>org.apache.kafka</groupId>
<artifactId>connect-api</artifactId>
<version>3.4.0</version>
</dependency>

<!-- logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
<version>2.0.7</version>
</dependency>
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-reload4j</artifactId>
<version>2.0.7</version>
</dependency>
</dependencies>

<build>
<plugins>
<plugin>
<groupId>io.confluent</groupId>
<artifactId>kafka-connect-maven-plugin</artifactId>
<version>0.11.1</version>
<executions>
<execution>
<goals>
<goal>kafka-connect</goal>
</goals>
<configuration>
<title>Counter Kafka Connector</title>
<description>Demo connector that emits events with incrementing long values</description>
<ownerUsername>confluentinc</ownerUsername>
<ownerType>organization</ownerType>
<ownerName>Confluent, Inc.</ownerName>
<ownerUrl>https://confluent.io/</ownerUrl>
<componentTypes>
<componentType>source</componentType>
</componentTypes>
<confluentControlCenterIntegration>true</confluentControlCenterIntegration>
<singleMessageTransforms>false</singleMessageTransforms>
<supportedEncodings>UTF-8</supportedEncodings>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
src/main/java/io/confluent/developer/connect/CounterConnector.java
@@ -0,0 +1,50 @@
package io.confluent.developer.connect;

import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.connect.connector.Task;
import org.apache.kafka.connect.source.SourceConnector;

import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class CounterConnector extends SourceConnector {

private Map<String, String> props;

@Override
public String version() {
return CounterConnector.class.getPackage().getImplementationVersion();
}

@Override
public void start(Map<String, String> props) {
this.props = props;
}

@Override
public void stop() {
}

@Override
public Class<? extends Task> taskClass() {
return CounterSourceTask.class;
}

@Override
public List<Map<String, String>> taskConfigs(int maxTasks) {
List<Map<String, String>> taskConfigs = new ArrayList<>();
for (int i = 0; i < maxTasks; i++) {
Map<String, String> taskConfig = new HashMap<>(this.props);
taskConfig.put(CounterSourceTask.TASK_ID, Integer.toString(i));
taskConfigs.add(taskConfig);
}
return taskConfigs;
}

@Override
public ConfigDef config() {
return CounterConnectorConfig.conf();
}
}
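The `taskConfigs` method above fans the connector's configuration out to its tasks: each task gets a copy of the connector properties plus a unique `task.id`. A minimal standalone sketch of that fan-out logic, using plain collections and no Kafka Connect dependency (class and method names here are illustrative, not part of the tutorial code):

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class TaskConfigSketch {

    static final String TASK_ID = "task.id"; // mirrors CounterSourceTask.TASK_ID

    // Mirrors CounterConnector.taskConfigs: one config map per task,
    // each a copy of the connector props tagged with its own task id.
    static List<Map<String, String>> taskConfigs(Map<String, String> props, int maxTasks) {
        List<Map<String, String>> configs = new ArrayList<>();
        for (int i = 0; i < maxTasks; i++) {
            Map<String, String> taskConfig = new HashMap<>(props);
            taskConfig.put(TASK_ID, Integer.toString(i));
            configs.add(taskConfig);
        }
        return configs;
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        props.put("kafka.topic", "counter-topic");
        List<Map<String, String>> configs = taskConfigs(props, 2);
        System.out.println(configs.size());              // 2
        System.out.println(configs.get(1).get(TASK_ID)); // 1
    }
}
```

Because every task receives the full connector config, each `CounterSourceTask` can independently read `kafka.topic` and `interval.ms` in its own `start` method.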
src/main/java/io/confluent/developer/connect/CounterConnectorConfig.java
@@ -0,0 +1,38 @@
package io.confluent.developer.connect;

import org.apache.kafka.common.config.AbstractConfig;
import org.apache.kafka.common.config.ConfigDef;
import org.apache.kafka.common.config.ConfigDef.Importance;
import org.apache.kafka.common.config.ConfigDef.Type;

import java.util.Map;

public class CounterConnectorConfig extends AbstractConfig {

public static final String KAFKA_TOPIC_CONF = "kafka.topic";
private static final String KAFKA_TOPIC_DOC = "Topic to write to";
public static final String INTERVAL_CONF = "interval.ms";
private static final String INTERVAL_DOC = "Interval between messages (ms)";

public CounterConnectorConfig(ConfigDef config, Map<String, String> parsedConfig) {
super(config, parsedConfig);
}

public CounterConnectorConfig(Map<String, String> parsedConfig) {
this(conf(), parsedConfig);
}

public static ConfigDef conf() {
return new ConfigDef()
.define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC)
.define(INTERVAL_CONF, Type.LONG, 1_000L, Importance.HIGH, INTERVAL_DOC);
}

public String getKafkaTopic() {
return this.getString(KAFKA_TOPIC_CONF);
}

public Long getInterval() {
return this.getLong(INTERVAL_CONF);
}
}
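The `ConfigDef` above declares `kafka.topic` as required and gives `interval.ms` a default of 1,000 ms, so a user may omit the interval entirely. A rough standalone sketch of that default-resolution behavior, without the Kafka `AbstractConfig` machinery (names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class ConfigDefaultSketch {

    // Mimics how the config resolves interval.ms: use the supplied
    // value if present, otherwise fall back to the declared default.
    static long interval(Map<String, String> props) {
        String raw = props.get("interval.ms");
        return raw == null ? 1_000L : Long.parseLong(raw);
    }

    public static void main(String[] args) {
        Map<String, String> props = new HashMap<>();
        System.out.println(interval(props)); // 1000 (the default)
        props.put("interval.ms", "250");
        System.out.println(interval(props)); // 250
    }
}
```

The real `AbstractConfig` adds type validation and error reporting on top of this; the sketch only shows the default-vs-override decision.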
src/main/java/io/confluent/developer/connect/CounterSourceTask.java
@@ -0,0 +1,72 @@
package io.confluent.developer.connect;

import org.apache.kafka.connect.data.Schema;
import org.apache.kafka.connect.source.SourceRecord;
import org.apache.kafka.connect.source.SourceTask;

import java.util.Collections;
import java.util.List;
import java.util.Map;

public class CounterSourceTask extends SourceTask {

public static final String TASK_ID = "task.id";
public static final String CURRENT_ITERATION = "current.iteration";

private CounterConnectorConfig config;
private String topic;
private long interval;
private long count = 0L;
private int taskId;
private Map<String, Integer> sourcePartition;

@Override
public String version() {
return CounterSourceTask.class.getPackage().getImplementationVersion();
}

@Override
public void start(Map<String, String> props) {
config = new CounterConnectorConfig(props);
topic = config.getKafkaTopic();
interval = config.getInterval();
taskId = Integer.parseInt(props.get(TASK_ID));
sourcePartition = Collections.singletonMap(TASK_ID, taskId);

Map<String, Object> offset = context.offsetStorageReader().offset(sourcePartition);
if (offset != null) {
// the offset contains our next state, so restore it as is
count = ((Long) offset.get(CURRENT_ITERATION));
}
}

@Override
public List<SourceRecord> poll() throws InterruptedException {

if (interval > 0) {
try {
Thread.sleep(interval);
} catch (InterruptedException e) {
Thread.currentThread().interrupt(); // restore the interrupt flag rather than clearing it
return null;
}
}

Map<String, Long> sourceOffset = Collections.singletonMap(CURRENT_ITERATION, count + 1);

final List<SourceRecord> records = Collections.singletonList(new SourceRecord(
sourcePartition,
sourceOffset,
topic,
null,
Schema.INT64_SCHEMA,
count
));
count++;
return records;
}

@Override
public void stop() {
}
}
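Each `poll` above emits one record whose source offset is `count + 1`, so after a restart the task resumes at the next number instead of replaying values it already produced. A standalone sketch of that resume logic, with a `HashMap` standing in for Connect's offset storage (class and method names are illustrative):

```java
import java.util.HashMap;
import java.util.Map;

public class OffsetResumeSketch {

    long count = 0L;

    // Mimics CounterSourceTask.start: restore count from a stored offset, if any.
    void start(Map<String, Long> offsetStore) {
        Long stored = offsetStore.get("current.iteration");
        if (stored != null) {
            count = stored;
        }
    }

    // Mimics poll: record the *next* value as the offset, then emit the current one.
    long poll(Map<String, Long> offsetStore) {
        offsetStore.put("current.iteration", count + 1);
        return count++;
    }

    public static void main(String[] args) {
        Map<String, Long> store = new HashMap<>();
        OffsetResumeSketch task = new OffsetResumeSketch();
        task.start(store);
        System.out.println(task.poll(store)); // 0
        System.out.println(task.poll(store)); // 1

        // Simulate a restart: a fresh task restores from the stored offset.
        OffsetResumeSketch restarted = new OffsetResumeSketch();
        restarted.start(store);
        System.out.println(restarted.poll(store)); // 2
    }
}
```

Storing the *next* iteration (rather than the last emitted one) is what lets `start` restore the offset "as is", exactly as the comment in the real task notes.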
@@ -0,0 +1 @@
mkdir -p custom-connector/src/main/java/io/confluent/developer/connect/ && cd custom-connector
@@ -0,0 +1 @@
mkdir -p src/main/java/io/confluent/developer/connect
@@ -0,0 +1 @@
mvn package
@@ -0,0 +1,20 @@
Now we'll add the connector plugin to Confluent Cloud. We're not actually running the connector in this step; we're just uploading the plugin so that we can run the connector in the next section.

Go ahead and create a Kafka cluster in the Confluent Cloud Console if you didn't already do so link:#provision-your-kafka-cluster[above]. Your Kafka cluster *must* reside in a https://docs.confluent.io/cloud/current/connectors/bring-your-connector/custom-connector-fands.html#cc-byoc-regions[supported cloud provider and region] in order for you to be able to upload the plugin to Confluent Cloud.

On the cluster homepage, select `Connectors` from the left-hand navigation, and then click the `Add plugin` button on the top right.

+++++
<img src="{{ "/assets/img/custom-connector-add-plugin.png" | relative_url }}" alt="Add Custom Connector Plugin" width=800 />
+++++

Fill in the form as follows:

1. Name: `Counter Source`
2. Description: `Source connector that emits incrementing numbers`
3. Connector class: the class that extends Kafka Connect's `SourceConnector` class, i.e., `io.confluent.developer.connect.CounterConnector`
4. Connector type: Select `Source`
5. Click `Select connector archive` and choose the zip file built in the previous step
6. Sensitive properties: leave this section alone since the connector doesn't make use of any sensitive properties like credentials to connect to an external system
7. Review the custom connector disclaimer and check the box agreeing that you are responsible for the code uploaded to Confluent Cloud
8. Click `Submit`!
@@ -0,0 +1,2 @@
**Important:**
In order to run a custom connector, your Kafka cluster must reside in a https://docs.confluent.io/cloud/current/connectors/bring-your-connector/custom-connector-fands.html#cc-byoc-regions[supported cloud provider and region].
@@ -0,0 +1,13 @@
To stop the connector:

1. Select `Connectors` in the left-hand navigation, and then click the tile corresponding to the custom connector that's running
2. Click the `Settings` tab
3. Click the `Delete connector` link on the bottom left
4. Enter the connector name and then click `Confirm`

To also uninstall the custom connector plugin:

1. Find the `Counter Source` connector on the `Connectors` page
2. Hover over the connector tile, and then click `Edit plugin`
3. Click the `Delete plugin` link on the bottom left
4. Enter the plugin name and then click `Confirm`
