diff --git a/_data/harnesses/custom-connector/confluent.yml b/_data/harnesses/custom-connector/confluent.yml new file mode 100644 index 0000000000..b685feb923 --- /dev/null +++ b/_data/harnesses/custom-connector/confluent.yml @@ -0,0 +1,53 @@ +dev: + steps: + - title: Initialize the project + content: + - action: execute + file: tutorial-steps/dev/init.sh + render: + file: tutorials/custom-connector/confluent/markup/dev/init.adoc + + - title: Provision your Kafka cluster + content: + - action: skip + render: + file: tutorials/custom-connector/confluent/markup/dev/ccloud-supported-provider-region.adoc + - action: skip + render: + file: shared/markup/ccloud/ccloud-setup-self.adoc + + - title: Develop the custom connector + content: + - action: skip + render: + file: tutorials/custom-connector/confluent/markup/dev/write-connector.adoc + + - title: Package the custom connector + content: + - action: skip + render: + file: tutorials/custom-connector/confluent/markup/dev/package-connector.adoc + + - title: Add the custom connector plugin to Confluent Cloud + content: + - action: skip + render: + file: tutorials/custom-connector/confluent/markup/dev/add-connector-plugin.adoc + + - title: Run the custom connector on Confluent Cloud + content: + - action: skip + render: + file: tutorials/custom-connector/confluent/markup/dev/provision-connector.adoc + + - title: Monitor the custom connector on Confluent Cloud + content: + - action: skip + render: + file: tutorials/custom-connector/confluent/markup/dev/verify-connector.adoc + + - title: Delete the custom connector + content: + - action: skip + render: + file: tutorials/custom-connector/confluent/markup/dev/delete-connector.adoc diff --git a/_data/tutorials.yaml b/_data/tutorials.yaml index 59feb6c1d6..f89f04c097 100644 --- a/_data/tutorials.yaml +++ b/_data/tutorials.yaml @@ -206,8 +206,8 @@ aggregating-count: criteria? introduction: Suppose you have a topic with events that represent ticket sales for movies. In this tutorial, you'll see an example of 'groupby count' in Kafka Streams, - ksqlDB, and Flink SQL. We'll write a program that calculates the total number of tickets - sold per movie. + ksqlDB, and Flink SQL. We'll write a program that calculates the total number + of tickets sold per movie. status: ksql: enabled kstreams: enabled @@ -358,6 +358,18 @@ generate-test-data-streams: ksql: enabled kstreams: disabled kafka: enabled +custom-connector: + title: Build and run a custom connector on Confluent Cloud + meta-description: Build and run a custom connector on Confluent Cloud + canonical: confluent + slug: /custom-connector + introduction: 'While Confluent Cloud offers many pre-built managed + connectors, you may also upload and run custom connectors on Confluent Cloud — either an existing open source connector that you''d like to use, + or a connector that you''ve written. In this tutorial, we''ll write a simple source connector plugin, package it so that it can be uploaded to Confluent Cloud, and then run + the connector. As a developer, you want to write code. Let Confluent Cloud do the rest.' + question: How do I write, package, and run a custom connector on Confluent Cloud? + status: + confluent: enabled aggregating-average: title: Compute an average aggregation meta-description: compute an average aggregation like count or sum diff --git a/_includes/tutorials/custom-connector/confluent/code/.gitignore b/_includes/tutorials/custom-connector/confluent/code/.gitignore new file mode 100644 index 0000000000..62eb2be5aa --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/.gitignore @@ -0,0 +1 @@ +tutorial-steps/dev/outputs/ diff --git a/_includes/tutorials/custom-connector/confluent/code/Makefile b/_includes/tutorials/custom-connector/confluent/code/Makefile new file mode 100644 index 0000000000..2ec7991715 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/Makefile @@ -0,0 +1,8 @@ +STEPS_DIR := tutorial-steps +DEV_OUTPUTS_DIR := $(STEPS_DIR)/dev/outputs +TEMP_DIR := $(shell mktemp -d) +SEQUENCE := "dev, test, prod, ccloud" + +tutorial: + harness-runner ../../../../../_data/harnesses/custom-connector/confluent.yml $(TEMP_DIR) $(SEQUENCE) + reset diff --git a/_includes/tutorials/custom-connector/confluent/code/pom.xml b/_includes/tutorials/custom-connector/confluent/code/pom.xml new file mode 100644 index 0000000000..7ac1becfb3 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/pom.xml @@ -0,0 +1,89 @@ + + + + 4.0.0 + + io.confluent.developer + kafka-connect-counter + 0.0.1-SNAPSHOT + jar + + + UTF-8 + 1.8 + 1.8 + + + kafka-connect-counter + + + A dummy Kafka Connect source connector that emits events containing incrementing numbers + + + + + org.apache.kafka + connect-api + 3.4.0 + + + + + org.slf4j + slf4j-api + 2.0.7 + + + org.slf4j + slf4j-reload4j + 2.0.7 + + + + + + + io.confluent + 0.11.1 + kafka-connect-maven-plugin + + + + kafka-connect + + + Counter Kafka Connector + Demo connector that emits events with incrementing long values + confluentinc + organization + Confluent, Inc. + https://confluent.io/ + + source + + true + + false + + UTF-8 + + + + + + + diff --git a/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterConnector.java b/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterConnector.java new file mode 100644 index 0000000000..8ad3a30488 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterConnector.java @@ -0,0 +1,50 @@ +package io.confluent.developer.connect; + +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.connect.connector.Task; +import org.apache.kafka.connect.source.SourceConnector; + +import java.util.ArrayList; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + +public class CounterConnector extends SourceConnector { + + private Map props; + + @Override + public String version() { + return CounterConnector.class.getPackage().getImplementationVersion(); + } + + @Override + public void start(Map props) { + this.props = props; + } + + @Override + public void stop() { + } + + @Override + public Class taskClass() { + return CounterSourceTask.class; + } + + @Override + public List> taskConfigs(int maxTasks) { + List> taskConfigs = new ArrayList<>(); + for (int i = 0; i < maxTasks; i++) { + Map taskConfig = new HashMap<>(this.props); + taskConfig.put(CounterSourceTask.TASK_ID, Integer.toString(i)); + taskConfigs.add(taskConfig); + } + return taskConfigs; + } + + @Override + public ConfigDef config() { + return CounterConnectorConfig.conf(); + } +} diff --git a/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterConnectorConfig.java b/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterConnectorConfig.java new file mode 100644 index 0000000000..d588b435a0 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterConnectorConfig.java @@ -0,0 +1,38 @@ +package io.confluent.developer.connect; + +import org.apache.kafka.common.config.AbstractConfig; +import org.apache.kafka.common.config.ConfigDef; +import org.apache.kafka.common.config.ConfigDef.Importance; +import org.apache.kafka.common.config.ConfigDef.Type; + +import java.util.Map; + +public class CounterConnectorConfig extends AbstractConfig { + + public static final String KAFKA_TOPIC_CONF = "kafka.topic"; + private static final String KAFKA_TOPIC_DOC = "Topic to write to"; + public static final String INTERVAL_CONF = "interval.ms"; + private static final String INTERVAL_DOC = "Interval between messages (ms)"; + + public CounterConnectorConfig(ConfigDef config, Map parsedConfig) { + super(config, parsedConfig); + } + + public CounterConnectorConfig(Map parsedConfig) { + this(conf(), parsedConfig); + } + + public static ConfigDef conf() { + return new ConfigDef() + .define(KAFKA_TOPIC_CONF, Type.STRING, Importance.HIGH, KAFKA_TOPIC_DOC) + .define(INTERVAL_CONF, Type.LONG, 1_000L, Importance.HIGH, INTERVAL_DOC); + } + + public String getKafkaTopic() { + return this.getString(KAFKA_TOPIC_CONF); + } + + public Long getInterval() { + return this.getLong(INTERVAL_CONF); + } +} diff --git a/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterSourceTask.java b/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterSourceTask.java new file mode 100644 index 0000000000..ff057da9b9 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterSourceTask.java @@ -0,0 +1,72 @@ +package io.confluent.developer.connect; + +import org.apache.kafka.connect.data.Schema; +import org.apache.kafka.connect.source.SourceRecord; +import org.apache.kafka.connect.source.SourceTask; + +import java.util.Collections; +import java.util.List; +import java.util.Map; + +public class CounterSourceTask extends SourceTask { + + public static final String TASK_ID = "task.id"; + public static final String CURRENT_ITERATION = "current.iteration"; + + private CounterConnectorConfig config; + private String topic; + private long interval; + private long count = 0L; + private int taskId; + private Map sourcePartition; + + @Override + public String version() { + return CounterSourceTask.class.getPackage().getImplementationVersion(); + } + + @Override + public void start(Map props) { + config = new CounterConnectorConfig(props); + topic = config.getKafkaTopic(); + interval = config.getInterval(); + taskId = Integer.parseInt(props.get(TASK_ID)); + sourcePartition = Collections.singletonMap(TASK_ID, taskId); + + Map offset = context.offsetStorageReader().offset(sourcePartition); + if (offset != null) { + // the offset contains our next state, so restore it as is + count = ((Long) offset.get(CURRENT_ITERATION)); + } + } + + @Override + public List poll() throws InterruptedException { + + if (interval > 0) { + try { + Thread.sleep(interval); + } catch (InterruptedException e) { + Thread.interrupted(); + return null; + } + } + + Map sourceOffset = Collections.singletonMap(CURRENT_ITERATION, count + 1); + + final List records = Collections.singletonList(new SourceRecord( + sourcePartition, + sourceOffset, + topic, + null, + Schema.INT64_SCHEMA, + count + )); + count++; + return records; + } + + @Override + public void stop() { + } +} diff --git a/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/init.sh b/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/init.sh new file mode 100644 index 0000000000..8b87274995 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/init.sh @@ -0,0 +1 @@ +mkdir -p custom-connector/src/main/java/io/confluent/developer/connect/ && cd custom-connector diff --git a/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/make-src-dir.sh b/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/make-src-dir.sh new file mode 100644 index 0000000000..67eb8ee52b --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/make-src-dir.sh @@ -0,0 +1 @@ +mkdir -p src/main/java/io/confluent/developer/connect diff --git a/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/package-connector.sh b/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/package-connector.sh new file mode 100644 index 0000000000..5b5d577e71 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/code/tutorial-steps/dev/package-connector.sh @@ -0,0 +1 @@ +mvn package \ No newline at end of file diff --git a/_includes/tutorials/custom-connector/confluent/markup/dev/add-connector-plugin.adoc b/_includes/tutorials/custom-connector/confluent/markup/dev/add-connector-plugin.adoc new file mode 100644 index 0000000000..322b88c289 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/markup/dev/add-connector-plugin.adoc @@ -0,0 +1,20 @@ +Now we'll add the connector plugin to Confluent Cloud. We're not actually running the connector in this step; we're just uploading the plugin so that we can run the connector in the next section. + +Go ahead and create a Kafka cluster in the Confluent Cloud Console if you didn't already do so link:#provision-your-kafka-cluster[above]. Your Kafka cluster *must* reside in a https://docs.confluent.io/cloud/current/connectors/bring-your-connector/custom-connector-fands.html#cc-byoc-regions[supported cloud provider and region] in order for you to be able to upload the plugin to Confluent Cloud. + +On the cluster homepage, select `Connectors` from the lefthand navigation, and then click the `Add plugin` button on the top right. + ++++++ +Add Custom Connector Plugin ++++++ + +Fill in the form as follows: + +1. Name: `Counter Source` +2. Description: `Source connector that emits incrementing numbers` +3. Connector class: the class that extends Kafka Connect's `SourceConnector` class, i.e., `io.confluent.developer.connect.CounterConnector` +4. Connector type: Select `Source` +5. Click `Select connector archive` and choose the zip file built in the previous step +6. Sensitive properties: leave this section alone since the connector doesn't make use of any sensitive properties like credentials to connect to an external system +7. Review the custom connector disclaimer and check the box agreeing that you are responsible for the code uploaded to Confluent Cloud +8. Click `Submit`! diff --git a/_includes/tutorials/custom-connector/confluent/markup/dev/ccloud-supported-provider-region.adoc b/_includes/tutorials/custom-connector/confluent/markup/dev/ccloud-supported-provider-region.adoc new file mode 100644 index 0000000000..b661305a83 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/markup/dev/ccloud-supported-provider-region.adoc @@ -0,0 +1,2 @@ +**Important:** +In order to run a custom connector, your Kafka cluster must reside in a https://docs.confluent.io/cloud/current/connectors/bring-your-connector/custom-connector-fands.html#cc-byoc-regions[supported cloud provider and region]. \ No newline at end of file diff --git a/_includes/tutorials/custom-connector/confluent/markup/dev/delete-connector.adoc b/_includes/tutorials/custom-connector/confluent/markup/dev/delete-connector.adoc new file mode 100644 index 0000000000..9a53c5995c --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/markup/dev/delete-connector.adoc @@ -0,0 +1,13 @@ +To stop the connector: + +1. Select `Connectors` in the lefthand navigation, and then click the tile corresponding to the custom connector that's running +2. Click the `Settings` tab +3. Click the `Delete connector` link on the bottom left +4. Enter the connector name and then click `Confirm` + +To also uninstall the custom connector plugin: + +1. Find the `Counter Source` connector on the `Connectors` page +2. Hover over the connector tile, and then click `Edit plugin` +3. Click the `Delete plugin` link on the bottom left +4. Enter the plugin name and then click `Confirm` \ No newline at end of file diff --git a/_includes/tutorials/custom-connector/confluent/markup/dev/init.adoc b/_includes/tutorials/custom-connector/confluent/markup/dev/init.adoc new file mode 100644 index 0000000000..148e6001b0 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/markup/dev/init.adoc @@ -0,0 +1,5 @@ +To get started, make a new directory anywhere you'd like for this project: + ++++++ +
{% include_raw tutorials/custom-connector/confluent/code/tutorial-steps/dev/init.sh %}
++++++ diff --git a/_includes/tutorials/custom-connector/confluent/markup/dev/package-connector.adoc b/_includes/tutorials/custom-connector/confluent/markup/dev/package-connector.adoc new file mode 100644 index 0000000000..a2cb79c2a0 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/markup/dev/package-connector.adoc @@ -0,0 +1,7 @@ +In your terminal, run the following command. You can ignore the warnings generated by the `kafka-connect-maven-plugin`. + ++++++ +
{% include_raw tutorials/custom-connector/confluent/code/tutorial-steps/dev/package-connector.sh %}
++++++ + +This generates the connector zip in the `target/components/packages/` directory. diff --git a/_includes/tutorials/custom-connector/confluent/markup/dev/provision-connector.adoc b/_includes/tutorials/custom-connector/confluent/markup/dev/provision-connector.adoc new file mode 100644 index 0000000000..6d2c07694a --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/markup/dev/provision-connector.adoc @@ -0,0 +1,10 @@ +Let's now run the connector. + +1. On the cluster homepage, select `Connectors` from the lefthand navigation +2. In the `Filter by:` section, click the `Deployment` dropdown and select `Custom` +3. Click the `Counter Source` connector tile +4. Click `Generate API key & download` in order to create Kafka credentials that the connector will use, and then click `Continue` on the bottom right +5. In the `Configuration` step, enter `kafka.topic` as the key and `counter-topic` as the value. This is the only required property. There is also an `interval.ms` property to specify the number of milliseconds to wait between events, but we can leave that out and accept the default of 1000 (1 second). +6. Click `Continue` to proceed to the `Networking` section, and then `Continue` again to proceed to the `Sizing` section since there are no connection endpoints to allow list +7. Click `Continue` to run the connector with a single task +8. Click `Continue` once more to provision the connector. You'll see a tile for the connector on the Connectors page showing that the connector is `Provisioning`. diff --git a/_includes/tutorials/custom-connector/confluent/markup/dev/verify-connector.adoc b/_includes/tutorials/custom-connector/confluent/markup/dev/verify-connector.adoc new file mode 100644 index 0000000000..a977b2cacc --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/markup/dev/verify-connector.adoc @@ -0,0 +1,16 @@ +Once the connector has been provisioned, click on its tile on the `Connectors` page. + +In the `Overview` tab, you'll see that the connector is generating messages at a throughput of 1 message per second. + ++++++ +Custom Connector Overview ++++++ + +Now click the `Logs` tab, where you can search and filter log messages that the connector is generating in order to monitor or debug. + ++++++ +Custom Connector Logs ++++++ + +Now, click on `Topics` in the lefthand navigation and select the topic that the source connector is writing to (`counter-topic`). In the `Messages` viewer, you'll see incrementing +numbers being emitted. diff --git a/_includes/tutorials/custom-connector/confluent/markup/dev/write-connector.adoc b/_includes/tutorials/custom-connector/confluent/markup/dev/write-connector.adoc new file mode 100644 index 0000000000..d969c5ad60 --- /dev/null +++ b/_includes/tutorials/custom-connector/confluent/markup/dev/write-connector.adoc @@ -0,0 +1,36 @@ +We'll start by developing a simple source connector that emits numeric data to a Kafka topic. + +We'll build the connector with Maven, so ensure that you have it https://maven.apache.org/install.html[installed] and on +your path. Before proceeding, verify that `mvn -version` succeeds when you run it on the command line. + +First, create the following `pom.xml` file in the `custom-connector` directory: + ++++++ +
{% include_raw tutorials/custom-connector/confluent/code/pom.xml %}
++++++ + +Most of the POM is boilerplate, but notice two important sections that any custom connector should have: + +1. A dependency on the Kafka Connect API, `connect-api` + +2. The `kafka-connect-maven-plugin` plugin, which does the heavy lifting to build the connector zip following https://docs.confluent.io/kafka-connectors/self-managed/confluent-hub/component-archive.html[this spec]. This lets us simply run `mvn package` later in this tutorial in order to build the plugin zip that we will upload to Confluent Cloud. + +Now we'll create the three classes that comprise our connector: a configuration class, the connector class that most notably builds connector task configurations, and the source task class that generates records. + +Go ahead and create the following file at `src/main/java/io/confluent/developer/connect/CounterConnectorConfig.java`. This class defines the two configuration properties for our connector: the topic to write to (`kafka.topic`), and the number of milliseconds to wait between emitting events (`interval.ms`). + ++++++ +
{% include_raw tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterConnectorConfig.java %}
++++++ + +Next, create the following file at `src/main/java/io/confluent/developer/connect/CounterConnector.java`. You can think of this class as the glue between the connector configuration and the task implementation. We'll provide the fully qualified name of this class (`io.confluent.developer.connect.CounterConnector`) later in this tutorial when adding the connector plugin to Confluent Cloud. + ++++++ +
{% include_raw tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterConnector.java %}
++++++ + +Finally, create the following file at `src/main/java/io/confluent/developer/connect/CounterSourceTask.java`. The `poll()` method in this class generates our numeric events. + ++++++ +
{% include_raw tutorials/custom-connector/confluent/code/src/main/java/io/confluent/developer/connect/CounterSourceTask.java %}
++++++ diff --git a/assets/img/custom-connector-add-plugin.png b/assets/img/custom-connector-add-plugin.png new file mode 100644 index 0000000000..6fc138f715 Binary files /dev/null and b/assets/img/custom-connector-add-plugin.png differ diff --git a/assets/img/custom-connector-logs.png b/assets/img/custom-connector-logs.png new file mode 100644 index 0000000000..096a41f00d Binary files /dev/null and b/assets/img/custom-connector-logs.png differ diff --git a/assets/img/custom-connector-overview.png b/assets/img/custom-connector-overview.png new file mode 100644 index 0000000000..9ca75a88fc Binary files /dev/null and b/assets/img/custom-connector-overview.png differ diff --git a/index.html b/index.html index 4431e91f2d..dc6c6b02b6 100644 --- a/index.html +++ b/index.html @@ -200,6 +200,7 @@

From Idea to Proof-of-concept: Learn the Apache Kafka® fun
  • Generate complex mock Kafka data
  • Add a key to a stream from a JDBC source
  • Convert timezone before sending to a database
  • +
  • Build and deploy a custom connector on Confluent Cloud
  • diff --git a/tutorials/custom-connector/confluent.html b/tutorials/custom-connector/confluent.html new file mode 100644 index 0000000000..defb0d804e --- /dev/null +++ b/tutorials/custom-connector/confluent.html @@ -0,0 +1,6 @@ +--- +layout: tutorial +permalink: /custom-connector/confluent +stack: confluent +static_data: custom-connector +---