Skip to content

Commit

Permalink
#46 - Resolve compatibility issues between JSON and AVRO
Browse files Browse the repository at this point in the history
  • Loading branch information
ttimot24 committed Sep 20, 2024
1 parent 15b663c commit 4678bad
Show file tree
Hide file tree
Showing 13 changed files with 503 additions and 16 deletions.
19 changes: 19 additions & 0 deletions pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,25 @@

<build>
<plugins>
<!-- other maven plugins in the project -->
<plugin>
<groupId>org.apache.avro</groupId>
<artifactId>avro-maven-plugin</artifactId>
<version>1.12.0</version>
<executions>
<execution>
<phase>generate-sources</phase>
<goals>
<goal>schema</goal>
</goals>
<configuration>
<sourceDirectory>src/main/resources/avro</sourceDirectory>
<outputDirectory>${project.build.directory}/generated-sources</outputDirectory>
<stringType>String</stringType>
</configuration>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-assembly-plugin</artifactId>
<configuration>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,11 +2,12 @@

import java.util.Map;

import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.Producer;

/**
 * Factory abstraction for creating the Kafka producer used by the event
 * listener. Implementations configure and return a producer keyed by
 * {@code String} and carrying Avro-generated {@link SpecificRecordBase}
 * payloads (the DTOs generated from the project's *.avsc schemas).
 */
public interface KafkaProducerFactory {

	/**
	 * Creates a ready-to-use Kafka producer.
	 *
	 * @param clientId           value for {@code client.id}
	 * @param bootstrapServer    value for {@code bootstrap.servers}
	 * @param optionalProperties additional producer properties that override or
	 *                           extend the defaults
	 * @return a configured producer for Avro record values
	 */
	Producer<String, SpecificRecordBase> createProducer(String clientId, String bootstrapServer,
			Map<String, Object> optionalProperties);

}
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import java.util.Properties;

import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerConfig;
Expand All @@ -12,8 +13,8 @@
public final class KafkaStandardProducerFactory implements KafkaProducerFactory {

@Override
public Producer<String, Object> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties) {
public Producer<String, SpecificRecordBase> createProducer(String clientId, String bootstrapServer,
Map<String, Object> optionalProperties) {
Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer);
props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId);
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package com.github.snuk87.keycloak.kafka.mapper;

import com.github.snuk87.keycloak.kafka.dto.AuthDetails;
import com.github.snuk87.keycloak.kafka.dto.KeycloakAdminEvent;
import com.github.snuk87.keycloak.kafka.dto.KeycloakEvent;
import com.github.snuk87.keycloak.kafka.dto.OperationType;
import org.keycloak.events.Event;
import org.keycloak.events.admin.AdminEvent;

import java.util.Map;
import java.util.Optional;

/**
 * Maps Keycloak SPI event objects ({@link Event}, {@link AdminEvent}) to the
 * Avro-generated DTOs ({@link KeycloakEvent}, {@link KeycloakAdminEvent})
 * that are published to Kafka.
 *
 * <p>Stateless utility class; not instantiable.
 */
public final class KeycloakMapper {

    /** Utility class — prevent instantiation. */
    private KeycloakMapper() {
        throw new AssertionError("No instances");
    }

    /**
     * Converts a Keycloak admin event into its Avro representation.
     *
     * @param event the admin event emitted by Keycloak; must not be null
     * @return the populated {@link KeycloakAdminEvent}
     */
    public static KeycloakAdminEvent mapEventToKeycloakEvent(final AdminEvent event) {
        return KeycloakAdminEvent.newBuilder()
                .setId(event.getId())
                .setTime(event.getTime())
                .setRealmId(event.getRealmId())
                .setRealmName(event.getRealmName())
                .setError(event.getError())
                .setResourceType(event.getResourceTypeAsString())
                .setResourcePath(event.getResourcePath())
                .setRepresentation(event.getRepresentation())
                // Keycloak may report no operation type; fall back to ACTION so the
                // Avro enum lookup never receives null.
                .setOperationType(OperationType.valueOf(
                        Optional.ofNullable(event.getOperationType())
                                .map(Object::toString)
                                .orElse(OperationType.ACTION.toString())))
                .setAuthDetails(mapAuthDetails(event))
                .build();
    }

    /** Maps the nested auth-details record, or returns null when absent. */
    private static AuthDetails mapAuthDetails(final AdminEvent event) {
        return Optional.ofNullable(event.getAuthDetails())
                .map(authDetails -> AuthDetails.newBuilder()
                        .setClientId(authDetails.getClientId())
                        .setIpAddress(authDetails.getIpAddress())
                        .setRealmId(authDetails.getRealmId())
                        .setRealmName(authDetails.getRealmName())
                        .setUserId(authDetails.getUserId())
                        .build())
                .orElse(null);
    }

    /**
     * Converts a Keycloak user event into its Avro representation.
     *
     * @param event the user event emitted by Keycloak; must not be null
     * @return the populated {@link KeycloakEvent}
     */
    public static KeycloakEvent mapEventToKeycloakEvent(final Event event) {
        return KeycloakEvent.newBuilder()
                .setId(event.getId())
                .setTime(event.getTime())
                .setRealmId(event.getRealmId())
                .setRealmName(event.getRealmName())
                .setError(event.getError())
                .setClientId(event.getClientId())
                .setUserId(event.getUserId())
                .setSessionId(event.getSessionId())
                .setIpAddress(event.getIpAddress())
                .setDetails(event.getDetails())
                .build();
    }

}
Original file line number Diff line number Diff line change
Expand Up @@ -2,16 +2,20 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.github.snuk87.keycloak.kafka.serializer.mixin.JacksonIgnoreAvroPropertiesMixIn;
import org.apache.avro.specific.SpecificRecord;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.common.errors.SerializationException;
import org.apache.kafka.common.serialization.Serializer;

import java.util.Map;

public class JsonSerializer implements Serializer<Object> {
private final ObjectMapper objectMapper = new ObjectMapper();
public class JsonSerializer implements Serializer<SpecificRecordBase> {
private final ObjectMapper objectMapper;

/**
 * Creates the serializer with an {@link ObjectMapper} configured for
 * Avro-generated records: the mix-in registered for {@link SpecificRecord}
 * marks {@code getSchema()} and {@code getSpecificData()} with
 * {@code @JsonIgnore}, so only the record's actual data fields are written
 * to the JSON output.
 */
public JsonSerializer() {

// The mix-in applies to every Avro-generated class, since all implement SpecificRecord.
this.objectMapper = new ObjectMapper();
this.objectMapper.addMixIn(SpecificRecord.class, JacksonIgnoreAvroPropertiesMixIn.class);
}

@Override
Expand All @@ -20,7 +24,7 @@ public void configure(Map<String, ?> config, boolean isKey) {
}

@Override
public byte[] serialize(String topic, Object data) {
public byte[] serialize(String topic, SpecificRecordBase data) {
if (data == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,15 @@
package com.github.snuk87.keycloak.kafka.serializer.mixin;

import com.fasterxml.jackson.annotation.JsonIgnore;
import org.apache.avro.Schema;
import org.apache.avro.specific.SpecificData;

/**
 * Jackson mix-in for Avro-generated record classes.
 *
 * <p>Registered against {@code SpecificRecord} via
 * {@code ObjectMapper.addMixIn}, it hides the synthetic Avro accessors
 * ({@code getSchema()}, {@code getSpecificData()}) from JSON serialization so
 * only a record's real data fields appear in the output.
 */
public abstract class JacksonIgnoreAvroPropertiesMixIn {

    /** Suppress the record's runtime metadata accessor. */
    @JsonIgnore
    public abstract SpecificData getSpecificData();

    /** Suppress the embedded Avro schema accessor. */
    @JsonIgnore
    public abstract Schema getSchema();

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@
import java.util.concurrent.TimeoutException;

import com.github.snuk87.keycloak.kafka.KafkaProducerFactory;
import com.github.snuk87.keycloak.kafka.mapper.KeycloakMapper;
import org.apache.avro.specific.SpecificRecordBase;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.clients.producer.RecordMetadata;
Expand All @@ -28,7 +30,7 @@ public class KafkaEventListenerProvider implements EventListenerProvider {

private String topicAdminEvents;

private Producer<String, Object> producer;
private Producer<String, SpecificRecordBase> producer;

public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events,
String topicAdminEvents, Map<String, Object> kafkaProducerProperties, KafkaProducerFactory factory) {
Expand All @@ -48,10 +50,10 @@ public KafkaEventListenerProvider(String bootstrapServers, String clientId, Stri
producer = factory.createProducer(clientId, bootstrapServers, kafkaProducerProperties);
}

private void produceEvent(final Object event, final String topic)
private void produceEvent(final SpecificRecordBase event, final String topic)
throws InterruptedException, ExecutionException, TimeoutException {
LOG.debug("Produce to topic: " + topicEvents + " ...");
final ProducerRecord<String, Object> record = new ProducerRecord<>(topic, event);
final ProducerRecord<String, SpecificRecordBase> record = new ProducerRecord<>(topic, event);
Future<RecordMetadata> metaData = producer.send(record);
RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS);
LOG.debug("Produced to topic: " + recordMetadata.topic());
Expand All @@ -61,7 +63,7 @@ private void produceEvent(final Object event, final String topic)
public void onEvent(Event event) {
if (events.contains(event.getType())) {
try {
this.produceEvent(event, topicEvents);
this.produceEvent(KeycloakMapper.mapEventToKeycloakEvent(event), topicEvents);
} catch (ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
} catch (InterruptedException e) {
Expand All @@ -75,7 +77,7 @@ public void onEvent(Event event) {
public void onEvent(AdminEvent event, boolean includeRepresentation) {
if (topicAdminEvents != null) {
try {
this.produceEvent(event, topicAdminEvents);
this.produceEvent(KeycloakMapper.mapEventToKeycloakEvent(event), topicAdminEvents);
} catch (ExecutionException | TimeoutException e) {
LOG.error(e.getMessage(), e);
} catch (InterruptedException e) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1 +1 @@
com.github.snuk87.keycloak.kafka.KafkaEventListenerProviderFactory
com.github.snuk87.keycloak.kafka.spi.KafkaEventListenerProviderFactory
137 changes: 137 additions & 0 deletions src/main/resources/avro/keycloak-admin-events-value.avsc
Original file line number Diff line number Diff line change
@@ -0,0 +1,137 @@
{
"type": "record",
"name": "KeycloakAdminEvent",
"namespace": "com.github.snuk87.keycloak.kafka.dto",
"fields": [
{
"name": "authDetails",
"type": [
"null",
{
"type": "record",
"name": "AuthDetails",
"fields": [
{
"name": "clientId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "ipAddress",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "realmId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "realmName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "userId",
"type": [
"null",
"string"
],
"default": null
}
]
}
],
"default": null
},
{
"name": "error",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "id",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "operationType",
"type": [
"null",
{
"type": "enum",
"name": "OperationType",
"symbols": [
"CREATE",
"UPDATE",
"DELETE",
"ACTION"
]
}
],
"default": null
},
{
"name": "realmId",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "realmName",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "representation",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "resourcePath",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "resourceType",
"type": [
"null",
"string"
],
"default": null
},
{
"name": "time",
"type": "long"
}
]
}
Loading

0 comments on commit 4678bad

Please sign in to comment.