diff --git a/kop/java/README.md b/kop/java/README.md
index ff8ebaa..b968996 100644
--- a/kop/java/README.md
+++ b/kop/java/README.md
@@ -13,6 +13,8 @@ This document describes how to produce messages to and consume messages from a K
# Example
+## Example: Token authentication
+
See [KoP Security](https://github.com/streamnative/kop/blob/master/docs/security.md) for how to configure KoP with token authentication. This example takes a topic named `my-topic` under `public/default` namespace as reference.
1. Grant produce and consume permissions to the specific role.
@@ -64,3 +66,98 @@ See [KoP Security](https://github.com/streamnative/kop/blob/master/docs/security
```
Receive record: hello from persistent://public/default/my-topic-0@0
```
+
+## Example: OAuth2 authentication
+
+See [KoP Security](https://github.com/streamnative/kop/blob/master/docs/security.md#oauthbearer) for how to configure KoP with OAuth authentication. This example takes a topic named `my-topic` under `public/default` namespace as reference.
+
+1. Configure the pulsar broker, this example will use the follow values:
+
+ > **Note**
+ >
+ > Need to change the `credentials.json` and `kop-handler-oauth2.properties` paths to your local path. The example file can be found in `src/main/resources/`.
+ >
+ ```properties
+ # Enable KoP
+ messagingProtocols=kafka
+ protocolHandlerDirectory=./protocols
+ allowAutoTopicCreationType=partitioned
+
+ # Use `kafkaListeners` here for KoP 2.8.0 because `listeners` is marked as deprecated from KoP 2.8.0
+ kafkaListeners=PLAINTEXT://127.0.0.1:9092
+ # This config is not required unless you want to expose another address to the Kafka client.
+ # If it’s not configured, it will be the same with `kafkaListeners` config by default
+ kafkaAdvertisedListeners=PLAINTEXT://127.0.0.1:9092
+ brokerEntryMetadataInterceptors=org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor
+
+ brokerDeleteInactiveTopicsEnabled=false
+
+ # Enable the authentication
+ authenticationEnabled=true
+ authenticationProviders=org.apache.pulsar.broker.authentication.AuthenticationProviderToken
+ superUserRoles=Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x@clients
+ brokerClientAuthenticationPlugin=org.apache.pulsar.client.impl.auth.oauth2.AuthenticationOAuth2
+ brokerClientAuthenticationParameters={"type":"client_credentials","privateKey":"/path/to/credentials.json","issuerUrl":"https://dev-kt-aa9ne.us.auth0.com","audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/"}
+ tokenPublicKey=data:;base64,MIIBIjANBgkqhkiG9w0BAQEFAAOCAQ8AMIIBCgKCAQEA2tZd/4gJda3U2Pc3tpgRAN7JPGWx/Gn17v/0IiZlNNRbP/Mmf0Vc6G1qsnaRaWNWOR+t6/a6ekFHJMikQ1N2X6yfz4UjMc8/G2FDPRmWjA+GURzARjVhxc/BBEYGoD0Kwvbq/u9CZm2QjlKrYaLfg3AeB09j0btNrDJ8rBsNzU6AuzChRvXj9IdcE/A/4N/UQ+S9cJ4UXP6NJbToLwajQ5km+CnxdGE6nfB7LWHvOFHjn9C2Rb9e37CFlmeKmIVFkagFM0gbmGOb6bnGI8Bp/VNGV0APef4YaBvBTqwoZ1Z4aDHy5eRxXfAMdtBkBupmBXqL6bpd15XRYUbu/7ck9QIDAQAB
+
+ # Use the KoP's built-in handler
+ kopOauth2AuthenticateCallbackHandler=io.streamnative.pulsar.handlers.kop.security.oauth.OauthValidatorCallbackHandler
+
+ # Java property configuration file of OauthValidatorCallbackHandler
+ kopOauth2ConfigFile=/path/to/kop-handler-oauth2.properties
+
+ saslAllowedMechanisms=OAUTHBEARER
+ ```
+
+
+2. Configure the credentials in [credentials.json](src/main/resources/credentials.json).
+
+ ```json
+ {
+ "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+ "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+ "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
+ "grant_type":"client_credentials"
+ }
+ ```
+
+3. Configure the oauth in [oauth.properties](src/main/resources/oauth.properties).
+
+ ```properties
+ bootstrap.servers=localhost:9092
+ topic=persistent://public/default/my-topic
+ group=my-group
+ issuerUrl=https://dev-kt-aa9ne.us.auth0.com
+ credentialsUrl=/path/to/credentials.json
+ audience=https://dev-kt-aa9ne.us.auth0.com/api/v2/
+ ```
+
+4. Compile the project.
+
+ ```
+ mvn clean compile
+ ```
+
+5. Run a Kafka producer to produce a `hello` message.
+
+ ```bash
+ mvn exec:java -Dexec.mainClass=io.streamnative.examples.kafka.OAuthProducer
+ ```
+
+ **Output:**
+
+ ```
+ Send hello to persistent://public/default/my-topic-0@0
+ ```
+
+6. Run a Kafka consumer to consume some messages.
+
+ ```bash
+ mvn exec:java -Dexec.mainClass=io.streamnative.examples.kafka.OAuthConsumer
+ ```
+
+ **Output:**
+
+ ```
+ Receive record: hello from persistent://public/default/my-topic-0@0
+ ```
diff --git a/kop/java/pom.xml b/kop/java/pom.xml
index 25b458d..c9b4a3d 100644
--- a/kop/java/pom.xml
+++ b/kop/java/pom.xml
@@ -41,6 +41,12 @@
${slf4j.simple.version}
runtime
+
+
+ io.streamnative.pulsar.handlers
+ oauth-client
+ 2.8.3.1
+
diff --git a/kop/java/src/main/java/io/streamnative/examples/kafka/OAuthConsumer.java b/kop/java/src/main/java/io/streamnative/examples/kafka/OAuthConsumer.java
new file mode 100644
index 0000000..51bcc00
--- /dev/null
+++ b/kop/java/src/main/java/io/streamnative/examples/kafka/OAuthConsumer.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.examples.kafka;
+
+import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
+import org.apache.kafka.clients.consumer.ConsumerConfig;
+import org.apache.kafka.clients.consumer.ConsumerRecords;
+import org.apache.kafka.clients.consumer.KafkaConsumer;
+import org.apache.kafka.common.serialization.StringDeserializer;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.time.Duration;
+import java.util.Collections;
+import java.util.Properties;
+
+public class OAuthConsumer {
+
+ public static void main(String[] args) throws IOException {
+ // 1. Get the configured parameters from oauth.properties
+ final Properties properties = new Properties();
+ properties.load(TokenProducer.class.getClassLoader().getResourceAsStream("oauth.properties"));
+ String bootstrapServers = properties.getProperty("bootstrap.servers");
+ String topic = properties.getProperty("topic");
+ String group = properties.getProperty("group");
+ String issuerUrl = properties.getProperty("issuerUrl");
+ String credentialsUrl = properties.getProperty("credentialsUrl");
+ String audience = properties.getProperty("audience");
+
+ // 2. Create a consumer with OAuth authentication.
+ final Properties props = new Properties();
+ props.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
+ props.put(ConsumerConfig.GROUP_ID_CONFIG, group);
+ props.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
+ props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
+ props.setProperty("security.protocol", "SASL_PLAINTEXT");
+ props.setProperty("sasl.mechanism", "OAUTHBEARER");
+ final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
+ + " oauth.issuer.url=\"%s\""
+ + " oauth.credentials.url=\"%s\""
+ + " oauth.audience=\"%s\";";
+ props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
+ issuerUrl,
+ "file://" + Paths.get(credentialsUrl).toAbsolutePath(),
+ audience
+ ));
+ final KafkaConsumer consumer = new KafkaConsumer<>(props);
+ consumer.subscribe(Collections.singleton(topic));
+
+ // 3. Consume some messages and quit immediately
+ boolean running = true;
+ while (running) {
+ final ConsumerRecords records = consumer.poll(Duration.ofSeconds(1));
+ if (!records.isEmpty()) {
+ records.forEach(record -> System.out.println("Receive record: " + record.value() + " from "
+ + record.topic() + "-" + record.partition() + "@" + record.offset()));
+ running = false;
+ }
+ }
+ consumer.close();
+ }
+}
diff --git a/kop/java/src/main/java/io/streamnative/examples/kafka/OAuthProducer.java b/kop/java/src/main/java/io/streamnative/examples/kafka/OAuthProducer.java
new file mode 100644
index 0000000..1600fb3
--- /dev/null
+++ b/kop/java/src/main/java/io/streamnative/examples/kafka/OAuthProducer.java
@@ -0,0 +1,65 @@
+/**
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package io.streamnative.examples.kafka;
+
+import io.streamnative.pulsar.handlers.kop.security.oauth.OauthLoginCallbackHandler;
+import org.apache.kafka.clients.producer.KafkaProducer;
+import org.apache.kafka.clients.producer.ProducerConfig;
+import org.apache.kafka.clients.producer.ProducerRecord;
+import org.apache.kafka.clients.producer.RecordMetadata;
+import org.apache.kafka.common.serialization.StringSerializer;
+import java.io.IOException;
+import java.nio.file.Paths;
+import java.util.Properties;
+import java.util.concurrent.ExecutionException;
+import java.util.concurrent.Future;
+
+public class OAuthProducer {
+
+ public static void main(String[] args) throws IOException, ExecutionException, InterruptedException {
+ // 1. Get the configured parameters from oauth.properties
+ final Properties properties = new Properties();
+ properties.load(OAuthProducer.class.getClassLoader().getResourceAsStream("oauth.properties"));
+ String bootstrapServers = properties.getProperty("bootstrap.servers");
+ String topic = properties.getProperty("topic");
+ String issuerUrl = properties.getProperty("issuerUrl");
+ String credentialsUrl = properties.getProperty("credentialsUrl");
+ String audience = properties.getProperty("audience");
+
+ // 2. Create a producer with OAuth authentication.
+ final Properties props = new Properties();
+ props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServers);
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
+ props.setProperty("sasl.login.callback.handler.class", OauthLoginCallbackHandler.class.getName());
+ props.setProperty("security.protocol", "SASL_PLAINTEXT");
+ props.setProperty("sasl.mechanism", "OAUTHBEARER");
+ final String jaasTemplate = "org.apache.kafka.common.security.oauthbearer.OAuthBearerLoginModule required"
+ + " oauth.issuer.url=\"%s\""
+ + " oauth.credentials.url=\"%s\""
+ + " oauth.audience=\"%s\";";
+ props.setProperty("sasl.jaas.config", String.format(jaasTemplate,
+ issuerUrl,
+ "file://" + Paths.get(credentialsUrl).toAbsolutePath(),
+ audience
+ ));
+ final KafkaProducer producer = new KafkaProducer<>(props);
+
+ // 3. Produce one message
+ final Future recordMetadataFuture = producer.send(new ProducerRecord<>(topic, "hello"));
+ final RecordMetadata recordMetadata = recordMetadataFuture.get();
+ System.out.println("Send hello to " + recordMetadata);
+ producer.close();
+ }
+}
diff --git a/kop/java/src/main/resources/credentials.json b/kop/java/src/main/resources/credentials.json
new file mode 100644
index 0000000..ce61f69
--- /dev/null
+++ b/kop/java/src/main/resources/credentials.json
@@ -0,0 +1,6 @@
+{
+ "client_id":"Xd23RHsUnvUlP7wchjNYOaIfazgeHd9x",
+ "client_secret":"rT7ps7WY8uhdVuBTKWZkttwLdQotmdEliaM5rLfmgNibvqziZ-g07ZH52N_poGAb",
+ "audience":"https://dev-kt-aa9ne.us.auth0.com/api/v2/",
+ "grant_type":"client_credentials"
+}
\ No newline at end of file
diff --git a/kop/java/src/main/resources/kop-handler-oauth2.properties b/kop/java/src/main/resources/kop-handler-oauth2.properties
new file mode 100644
index 0000000..d110e59
--- /dev/null
+++ b/kop/java/src/main/resources/kop-handler-oauth2.properties
@@ -0,0 +1,15 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+oauth.validate.method=token
diff --git a/kop/java/src/main/resources/oauth.properties b/kop/java/src/main/resources/oauth.properties
new file mode 100644
index 0000000..c4aa39f
--- /dev/null
+++ b/kop/java/src/main/resources/oauth.properties
@@ -0,0 +1,20 @@
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+bootstrap.servers=localhost:9092
+topic=persistent://public/default/my-topic
+group=my-group
+issuerUrl=https://dev-kt-aa9ne.us.auth0.com
+credentialsUrl=/path/to/credentials.json
+audience=https://dev-kt-aa9ne.us.auth0.com/api/v2/
\ No newline at end of file