Skip to content

Commit

Permalink
#46 - Added AVRO serializer capability
Browse files Browse the repository at this point in the history
  • Loading branch information
ttimot24 committed Sep 20, 2024
1 parent 4de9668 commit 15b663c
Show file tree
Hide file tree
Showing 5 changed files with 83 additions and 14 deletions.
4 changes: 2 additions & 2 deletions Dockerfile
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
# https://www.keycloak.org/server/containers
FROM quay.io/keycloak/keycloak:19.0.3
RUN curl -sL https://github.com/SnuK87/keycloak-kafka/releases/download/1.1.1/keycloak-kafka-1.1.1-jar-with-dependencies.jar -o /opt/keycloak/providers/keycloak-kafka-1.1.1-jar-with-dependencies.jar
FROM quay.io/keycloak/keycloak:latest
COPY target/keycloak-kafka-1.2.0-jar-with-dependencies.jar /opt/keycloak/providers/keycloak-kafka-1.2.0-jar-with-dependencies.jar
RUN /opt/keycloak/bin/kc.sh build

ENTRYPOINT ["/opt/keycloak/bin/kc.sh"]
54 changes: 50 additions & 4 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,21 @@ version: '3'
services:
keycloak:
depends_on:
- "kafka"
- kafka
image: keycloak-kafka
build: .
ports:
- "8080:8080"
environment:
KEYCLOAK_ADMIN: admin
KEYCLOAK_ADMIN_PASSWORD: admin
KAFKA_TOPIC: keycloak-events
KAFKA_ADMIN_TOPIC: keycloak-admin-events
KAFKA_CLIENT_ID: keycloak
KAFKA_BOOTSTRAP_SERVERS: kafka:9094
KAFKA_BOOTSTRAP_SERVERS: kafka:9092
KAFKA_SCHEMA_REGISTRY_URL: http://schema-registry:8081
KAFKA_VALUE_SERIALIZER_CLASS: io.confluent.kafka.serializers.KafkaAvroSerializer
AUTO_REGISTER_SCHEMAS: 'true'
command: start-dev

zookeeper:
Expand All @@ -24,7 +29,7 @@ services:

kafka:
depends_on:
- "zookeeper"
- zookeeper
image: bitnami/kafka:latest
ports:
- "9092:9092"
Expand All @@ -38,4 +43,45 @@ services:
KAFKA_CREATE_TOPICS: "keycloak-events:1:1"
ALLOW_PLAINTEXT_LISTENER: "yes"
volumes:
- /var/run/docker.sock:/var/run/docker.sock
- /var/run/docker.sock:/var/run/docker.sock

schema-registry:
image: confluentinc/cp-schema-registry:7.3.3
restart: always
depends_on:
- kafka
ports:
- "8081:8081"
environment:
SCHEMA_REGISTRY_HOST_NAME: schema-registry
SCHEMA_REGISTRY_KAFKASTORE_BOOTSTRAP_SERVERS: 'kafka:9092'
SCHEMA_REGISTRY_LISTENERS: http://0.0.0.0:8081
deploy:
resources:
limits:
memory: 512MB

redpanda-console:
image: docker.redpanda.com/vectorized/console:latest
restart: always
depends_on:
- kafka
ports:
- "9000:8080"
environment:
SERVER_BASEPATH: /redpanda
#METRICSNAMESPACE: redpanda-console
KAFKA_BROKERS: ${KAFKA_BROKERS:-kafka:9092}
KAFKA_SCHEMAREGISTRY_ENABLED: "true"
KAFKA_SCHEMAREGISTRY_URLS: "http://schema-registry:8081"
CONNECT_ENABLED: "false"
CONNECT_CLUSTERS_NAME: connect-cluster
CONNECT_CLUSTERS_URL: "http://connect:8083"
deploy:
resources:
limits:
memory: 1G

networks:
default:
name: keycloak-kafka-network
18 changes: 18 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,13 @@
<jboss-logging.version>3.6.1.Final</jboss-logging.version>
</properties>

<repositories>
<repository>
<id>confluent</id>
<url>https://packages.confluent.io/maven/</url>
</repository>
</repositories>

<dependencies>
<dependency>
<groupId>org.keycloak</groupId>
Expand Down Expand Up @@ -62,6 +69,17 @@
<version>${junit.version}</version>
<scope>test</scope>
</dependency>

<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-schema-registry</artifactId>
<version>${confluent.version}</version>
</dependency>
<dependency>
<groupId>io.confluent</groupId>
<artifactId>kafka-avro-serializer</artifactId>
<version>${confluent.version}</version>
</dependency>
</dependencies>

<build>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,15 +2,17 @@

import java.util.HashMap;
import java.util.Map;

import org.apache.kafka.clients.producer.ProducerConfig;
import org.keycloak.Config.Scope;

public class KafkaProducerConfig {

// https://kafka.apache.org/documentation/#producerconfigs

public static Map<String, Object> init(Scope scope) {
Map<String, Object> propertyMap = new HashMap<>();
KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();
final Map<String, Object> propertyMap = new HashMap<>();
final KafkaProducerProperty[] producerProperties = KafkaProducerProperty.values();

for (KafkaProducerProperty property : producerProperties) {
String propertyEnv = System.getenv("KAFKA_" + property.name());
Expand All @@ -27,6 +29,13 @@ public static Map<String, Object> init(Scope scope) {
}

enum KafkaProducerProperty {
KEY_SERIALIZER_CLASS(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG),
VALUE_SERIALIZER_CLASS(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG),
AUTO_REGISTER_SCHEMAS("auto.register.schemas"),
USE_LATEST_VERSION("use.latest.version"),
SCHEMA_REGISTRY_URL("schema.registry.url"),
SCHEMA_REGISTRY_USER("schema.registry.user"),
SCHEMA_REGISTRY_PASSWORD("schema.registry.password"),
ACKS("acks"), //
BUFFER_MEMORY("buffer.memory"), //
COMPRESSION_TYPE("compression.type"), //
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,21 +8,17 @@
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public final class KafkaStandardProducerFactory implements KafkaProducerFactory {

private static final Logger log = LoggerFactory.getLogger(KafkaStandardProducerFactory.class);

@Override
public Producer<String, Object> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName());
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName());
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class.getName()));
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, optionalProperties.getOrDefault(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, JsonSerializer.class.getName()));
props.putAll(optionalProperties);

return new KafkaProducer<>(props);
Expand Down

0 comments on commit 15b663c

Please sign in to comment.