diff --git a/Dockerfile b/Dockerfile
index 73e7f99..cba2b28 100644
--- a/Dockerfile
+++ b/Dockerfile
@@ -1,6 +1,6 @@
# https://www.keycloak.org/server/containers
-FROM quay.io/keycloak/keycloak:19.0.3
-RUN curl -sL https://github.com/SnuK87/keycloak-kafka/releases/download/1.1.1/keycloak-kafka-1.1.1-jar-with-dependencies.jar -o /opt/keycloak/providers/keycloak-kafka-1.1.1-jar-with-dependencies.jar
+FROM quay.io/keycloak/keycloak:latest
+COPY target/keycloak-kafka-1.2.0-jar-with-dependencies.jar /opt/keycloak/providers/keycloak-kafka-1.2.0-jar-with-dependencies.jar
RUN /opt/keycloak/bin/kc.sh build
ENTRYPOINT ["/opt/keycloak/bin/kc.sh"]
\ No newline at end of file
diff --git a/docker-compose.yml b/docker-compose.yml
index b44d7b2..226a7b0 100644
--- a/docker-compose.yml
+++ b/docker-compose.yml
@@ -2,16 +2,21 @@ version: '3'
services:
keycloak:
depends_on:
- - "kafka"
+ - kafka
image: keycloak-kafka
+ build: .
ports:
- "8080:8080"
environment:
KEYCLOAK_ADMIN: admin
KEYCLOAK_ADMIN_PASSWORD: admin
KAFKA_TOPIC: keycloak-events
+ KAFKA_ADMIN_TOPIC: keycloak-admin-events
KAFKA_CLIENT_ID: keycloak
- KAFKA_BOOTSTRAP_SERVERS: kafka:9094
+ KAFKA_BOOTSTRAP_SERVERS: kafka:9092
+ KAFKA_SCHEMA_REGISTRY_URL: http://schema-registry:8081
+ KAFKA_VALUE_SERIALIZER_CLASS: io.confluent.kafka.serializers.KafkaAvroSerializer
+      KAFKA_AUTO_REGISTER_SCHEMAS: 'true'
command: start-dev
zookeeper:
@@ -24,7 +29,7 @@ services:
kafka:
depends_on:
- - "zookeeper"
+ - zookeeper
image: bitnami/kafka:latest
ports:
- "9092:9092"
@@ -38,4 +43,45 @@ services:
KAFKA_CREATE_TOPICS: "keycloak-events:1:1"
ALLOW_PLAINTEXT_LISTENER: "yes"
volumes:
- - /var/run/docker.sock:/var/run/docker.sock
\ No newline at end of file
+ - /var/run/docker.sock:/var/run/docker.sock
+
+ schema-registry:
+ image: confluentinc/cp-schema-registry:7.3.3
+ restart: always
+ depends_on:
+ - kafka
+ ports:
+ - "8081:8081"
+ environment:
+ SCHEMA_REGISTRY_HOST_NAME: schema-registry
+ SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
+ SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
+ deploy:
+ resources:
+ limits:
+ memory: 512MB
+
+ redpanda-console:
+ image: docker.redpanda.com/vectorized/console:latest
+ restart: always
+ depends_on:
+ - kafka
+ ports:
+ - "9000:8080"
+ environment:
+ SERVER_BASEPATH: /redpanda
+ #METRICSNAMESPACE: redpanda-console
+ KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:9092}
+ KAFKA_SCHEMAREGISTRY_ENABLED: "true"
+ KAFKA_SCHEMAREGISTRY_URLS: "http://schema-registry:8081"
+ CONNECT_ENABLED: "false"
+ CONNECT_CLUSTERS_NAME: connect-cluster
+ CONNECT_CLUSTERS_URL: "http://connect:8083"
+ deploy:
+ resources:
+ limits:
+ memory: 1G
+
+networks:
+ default:
+ name: keycloak-kafka-network
\ No newline at end of file
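
For reference, the compose environment above is meant to flow through KafkaProducerConfig into the Kafka producer. Below is a minimal sketch (not part of this diff) of the client configuration those KAFKA_*-prefixed variables are expected to resolve to; the class name AvroProducerConfigSketch is illustrative only, and the Avro serializer requires the kafka-avro-serializer dependency on the classpath at runtime.

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;

public class AvroProducerConfigSketch {

	public static void main(String[] args) {
		Properties props = new Properties();
		// Values mirror the keycloak service environment in docker-compose.yml.
		props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "kafka:9092");      // KAFKA_BOOTSTRAP_SERVERS
		props.put(ProducerConfig.CLIENT_ID_CONFIG, "keycloak");                // KAFKA_CLIENT_ID
		props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
		props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				"io.confluent.kafka.serializers.KafkaAvroSerializer");         // KAFKA_VALUE_SERIALIZER_CLASS
		props.put("schema.registry.url", "http://schema-registry:8081");       // KAFKA_SCHEMA_REGISTRY_URL
		props.put("auto.register.schemas", "true");                            // KAFKA_AUTO_REGISTER_SCHEMAS

		// Avro values go over the wire as Object (GenericRecord or generated SpecificRecord types).
		try (Producer<String, Object> producer = new KafkaProducer<>(props)) {
			// producer.send(new ProducerRecord<>("keycloak-events", key, value));
		}
	}
}
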
diff --git a/pom.xml b/pom.xml
index 61d575f..7bb1221 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,6 +16,13 @@
3.6.1.Final
+	<repositories>
+		<repository>
+			<id>confluent</id>
+			<url>https://packages.confluent.io/maven/</url>
+		</repository>
+	</repositories>
+
			<groupId>org.keycloak</groupId>
@@ -62,6 +69,17 @@
			<version>${junit.version}</version>
			<scope>test</scope>
+
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-schema-registry</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>io.confluent</groupId>
+			<artifactId>kafka-avro-serializer</artifactId>
+			<version>${confluent.version}</version>
+		</dependency>
diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java
index 3b67b8d..0308b27 100644
--- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java
+++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java
@@ -2,6 +2,8 @@
import java.util.HashMap;
import java.util.Map;
+
+import org.apache.kafka.clients.producer.ProducerConfig;
import org.keycloak.Config.Scope;
public class KafkaProducerConfig {
@@ -9,8 +11,8 @@ public class KafkaProducerConfig {
// https://kafka.apache.org/documentation/#producerconfigs
	public static Map<String, Object> init(Scope scope) {
-		Map<String, Object> propertyMap = new HashMap<>();
- KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();
+		final Map<String, Object> propertyMap = new HashMap<>();
+ final KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();
for (KafkaProducerProperty property : producerProperties) {
String propertyEnv = System.getenv("KAFKA_" + property.name());
@@ -27,6 +29,13 @@ public static Map<String, Object> init(Scope scope) {
}
enum KafkaProducerProperty {
+ KEY_SERIALIZER_CLASS(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG),
+ VALUE_SERIALIZER_CLASS(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG),
+ AUTO_REGISTER_SCHEMAS("auto.register.schemas"),
+ USE_LATEST_VERSION("use.latest.version"),
+ SCHEMA_REGISTRY_URL("schema.registry.url"),
+ SCHEMA_REGISTRY_USER("schema.registry.user"),
+ SCHEMA_REGISTRY_PASSWORD("schema.registry.password"),
ACKS("acks"), //
BUFFER_MEMORY("buffer.memory"), //
COMPRESSION_TYPE("compression.type"), //
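
The new enum constants rely on the naming convention already used by init: an environment variable named KAFKA_<ENUM_NAME> is stored under the constant's Kafka property key, so KAFKA_SCHEMA_REGISTRY_URL becomes schema.registry.url and KAFKA_VALUE_SERIALIZER_CLASS becomes value.serializer. A stripped-down, hypothetical illustration of that lookup (the names ProducerPropertyMappingSketch and fromEnvironment are not part of the codebase):

import java.util.HashMap;
import java.util.Map;

public class ProducerPropertyMappingSketch {

	enum Property {
		VALUE_SERIALIZER_CLASS("value.serializer"),
		SCHEMA_REGISTRY_URL("schema.registry.url"),
		AUTO_REGISTER_SCHEMAS("auto.register.schemas");

		private final String key;

		Property(String key) {
			this.key = key;
		}
	}

	public static Map<String, Object> fromEnvironment() {
		Map<String, Object> propertyMap = new HashMap<>();
		for (Property property : Property.values()) {
			// e.g. KAFKA_SCHEMA_REGISTRY_URL -> "schema.registry.url"
			String value = System.getenv("KAFKA_" + property.name());
			if (value != null) {
				propertyMap.put(property.key, value);
			}
		}
		return propertyMap;
	}
}
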
diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java
index 9e45c74..72216f7 100644
--- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java
+++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java
@@ -8,21 +8,17 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
public final class KafkaStandardProducerFactory implements KafkaProducerFactory {
- private static final Logger log = LoggerFactory.getLogger(KafkaStandardProducerFactory.class);
-
@Override
	public Producer<String, Object> createProducer(String clientId, String bootstrapServer,
			Map<String, Object> optionalProperties) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
- props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
- props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
+ props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));
+ props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()));
props.putAll(optionalProperties);
return new KafkaProducer<>(props);
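
A hedged usage sketch of the updated factory (assuming the Producer<String, Object> signature shown above and a call site in the same package; ProducerFactoryUsageSketch is illustrative only): with an empty map the serializer defaults are unchanged, while optionalProperties can now switch the value serializer to Avro and carry the registry settings.

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;

public class ProducerFactoryUsageSketch {

	public static void main(String[] args) {
		Map<String, Object> optional = new HashMap<>();
		optional.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
				"io.confluent.kafka.serializers.KafkaAvroSerializer");
		optional.put("schema.registry.url", "http://schema-registry:8081");
		optional.put("auto.register.schemas", "true");

		// Defaults (StringSerializer / JsonSerializer) apply only to keys the map does not carry.
		Producer<String, Object> producer = new KafkaStandardProducerFactory()
				.createProducer("keycloak", "kafka:9092", optional);
		producer.close();
	}
}
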