From 15b663c21bc11a7ef761497b460731272f709ebd Mon Sep 17 00:00:00 2001 From: ttimot24 Date: Fri, 20 Sep 2024 11:02:17 +0200 Subject: [PATCH] #46 - Added AVRO serializer capability --- Dockerfile | 4 +- docker-compose.yml | 54 +++++++++++++++++-- pom.xml | 18 +++++++ .../keycloak/kafka/KafkaProducerConfig.java | 13 ++++- .../kafka/KafkaStandardProducerFactory.java | 8 +-- 5 files changed, 83 insertions(+), 14 deletions(-) diff --git a/Dockerfile b/Dockerfile index 73e7f99..cba2b28 100644 --- a/Dockerfile +++ b/Dockerfile @@ -1,6 +1,6 @@ # https://www.keycloak.org/server/containers -FROM quay.io/keycloak/keycloak:19.0.3 -RUN curl -sL https://github.com/SnuK87/keycloak-kafka/releases/download/1.1.1/keycloak-kafka-1.1.1-jar-with-dependencies.jar -o /opt/keycloak/providers/keycloak-kafka-1.1.1-jar-with-dependencies.jar +FROM quay.io/keycloak/keycloak:latest +COPY target/keycloak-kafka-1.2.0-jar-with-dependencies.jar /opt/keycloak/providers/keycloak-kafka-1.2.0-jar-with-dependencies.jar RUN /opt/keycloak/bin/kc.sh build ENTRYPOINT ["/opt/keycloak/bin/kc.sh"] \ No newline at end of file diff --git a/docker-compose.yml b/docker-compose.yml index b44d7b2..226a7b0 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -2,16 +2,21 @@ version: '3' services: keycloak: depends_on: - - "kafka" + - kafka image: keycloak-kafka + build: . ports: - "8080:8080" environment: KEYCLOAK_ADMIN: admin KEYCLOAK_ADMIN_PASSWORD: admin KAFKA_TOPIC: keycloak-events + KAFKA_ADMIN_TOPIC: keycloak-admin-events KAFKA_CLIENT_ID: keycloak - KAFKA_BOOTSTRAP_SERVERS: kafka:9094 + KAFKA_BOOTSTRAP_SERVERS: kafka:9092 + KAFKA_SCHEMA_REGISTRY_URL: http://schema-registry:8081 + KAFKA_VALUE_SERIALIZER_CLASS: io.confluent.kafka.serializers.KafkaAvroSerializer + AUTO_REGISTER_SCHEMAS: 'true' command: start-dev zookeeper: @@ -24,7 +29,7 @@ services: kafka: depends_on: - - "zookeeper" + - zookeeper image: bitnami/kafka:latest ports: - "9092:9092" @@ -38,4 +43,45 @@ services: KAFKA_CREATE_TOPICS: "keycloak-events:1:1" ALLOW_PLAINTEXT_LISTENER: "yes" volumes: - - /var/run/docker.sock:/var/run/docker.sock \ No newline at end of file + - /var/run/docker.sock:/var/run/docker.sock + + schema-registry: + image: confluentinc/cp-schema-registry:7.3.3 + restart: always + depends_on: + - kafka + ports: + - "8081:8081" + environment: + SCHEMA_REGISTRY_HOST_NAME: schema-registry + SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092' + SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081 + deploy: + resources: + limits: + memory: 512MB + + redpanda-console: + image: docker.redpanda.com/vectorized/console:latest + restart: always + depends_on: + - kafka + ports: + - "9000:8080" + environment: + SERVER_BASEPATH: /redpanda + #METRICSNAMESPACE: redpanda-console + KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:9092} + KAFKA_SCHEMAREGISTRY_ENABLED: "true" + KAFKA_SCHEMAREGISTRY_URLS: "http://schema-registry:8081" + CONNECT_ENABLED: "false" + CONNECT_CLUSTERS_NAME: connect-cluster + CONNECT_CLUSTERS_URL: "http://connect:8083" + deploy: + resources: + limits: + memory: 1G + +networks: + default: + name: keycloak-kafka-network \ No newline at end of file diff --git a/pom.xml b/pom.xml index 61d575f..7bb1221 100644 --- a/pom.xml +++ b/pom.xml @@ -16,6 +16,13 @@ 3.6.1.Final + + + confluent + https://packages.confluent.io/maven/ + + + org.keycloak @@ -62,6 +69,17 @@ ${junit.version} test + + + io.confluent + kafka-schema-registry + ${confluent.version} + + + io.confluent + kafka-avro-serializer + ${confluent.version} + diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java index 3b67b8d..0308b27 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerConfig.java @@ -2,6 +2,8 @@ import java.util.HashMap; import java.util.Map; + +import org.apache.kafka.clients.producer.ProducerConfig; import org.keycloak.Config.Scope; public class KafkaProducerConfig { @@ -9,8 +11,8 @@ public class KafkaProducerConfig { // https://kafka.apache.org/documentation/#producerconfigs public static Map init(Scope scope) { - Map propertyMap = new HashMap<>(); - KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values(); + final Map propertyMap = new HashMap<>(); + final KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values(); for (KafkaProducerProperty property : producerProperties) { String propertyEnv = System.getenv("KAFKA_" + property.name()); @@ -27,6 +29,13 @@ public static Map init(Scope scope) { } enum KafkaProducerProperty { + KEY_SERIALIZER_CLASS(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG), + VALUE_SERIALIZER_CLASS(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG), + AUTO_REGISTER_SCHEMAS("auto.register.schemas"), + USE_LATEST_VERSION("use.latest.version"), + SCHEMA_REGISTRY_URL("schema.registry.url"), + SCHEMA_REGISTRY_USER("schema.registry.user"), + SCHEMA_REGISTRY_PASSWORD("schema.registry.password"), ACKS("acks"), // BUFFER_MEMORY("buffer.memory"), // COMPRESSION_TYPE("compression.type"), // diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java index 9e45c74..72216f7 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java @@ -8,21 +8,17 @@ import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; import org.apache.kafka.common.serialization.StringSerializer; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; public final class KafkaStandardProducerFactory implements KafkaProducerFactory { - private static final Logger log = LoggerFactory.getLogger(KafkaStandardProducerFactory.class); - @Override public Producer createProducer(String clientId, String bootstrapServer, Map optionalProperties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); - props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()); - props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()); + props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName())); + props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName())); props.putAll(optionalProperties); return new KafkaProducer<>(props);