diff --git a/pom.xml b/pom.xml index 7bb1221..fd1a3d6 100644 --- a/pom.xml +++ b/pom.xml @@ -84,6 +84,25 @@ + + + org.apache.avro + avro-maven-plugin + 1.12.0 + + + generate-sources + + schema + + + src/main/resources/avro (5) + ${project.build.directory}/generated-sources + String + + + + maven-assembly-plugin diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java index 1f3ce03..2d1f9ee 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaProducerFactory.java @@ -2,11 +2,12 @@ import java.util.Map; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.producer.Producer; public interface KafkaProducerFactory { - Producer createProducer(String clientId, String bootstrapServer, - Map optionalProperties); + Producer createProducer(String clientId, String bootstrapServer, + Map optionalProperties); } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java index 72216f7..d4018d4 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/KafkaStandardProducerFactory.java @@ -4,6 +4,7 @@ import java.util.Properties; import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.producer.KafkaProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerConfig; @@ -12,8 +13,8 @@ public final class KafkaStandardProducerFactory implements KafkaProducerFactory { @Override - public Producer createProducer(String clientId, String bootstrapServer, - Map optionalProperties) { + public Producer createProducer(String clientId, String bootstrapServer, + Map optionalProperties) { Properties props = new Properties(); props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, bootstrapServer); props.put(ProducerConfig.CLIENT_ID_CONFIG, clientId); diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java b/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java new file mode 100644 index 0000000..ec95d72 --- /dev/null +++ b/src/main/java/com/github/snuk87/keycloak/kafka/mapper/KeycloakMapper.java @@ -0,0 +1,53 @@ +package com.github.snuk87.keycloak.kafka.mapper; + +import com.github.snuk87.keycloak.kafka.dto.AuthDetails; +import com.github.snuk87.keycloak.kafka.dto.KeycloakAdminEvent; +import com.github.snuk87.keycloak.kafka.dto.KeycloakEvent; +import com.github.snuk87.keycloak.kafka.dto.OperationType; +import org.keycloak.events.Event; +import org.keycloak.events.admin.AdminEvent; + +import java.util.Map; +import java.util.Optional; + +public class KeycloakMapper { + + public static KeycloakAdminEvent mapEventToKeycloakEvent(final AdminEvent event){ + return KeycloakAdminEvent.newBuilder() + .setId(event.getId()) + .setTime(event.getTime()) + .setRealmId(event.getRealmId()) + .setRealmName(event.getRealmName()) + .setError(event.getError()) + .setResourceType(event.getResourceTypeAsString()) + .setResourcePath(event.getResourcePath()) + .setRepresentation(event.getRepresentation()) + .setOperationType(OperationType.valueOf(Optional.ofNullable(event.getOperationType()).map(ot -> ot.toString()).orElse(OperationType.ACTION.toString()))) + .setAuthDetails( + Optional.ofNullable(event.getAuthDetails()).map(authDetails -> AuthDetails.newBuilder() + .setClientId(authDetails.getClientId()) + .setIpAddress(authDetails.getIpAddress()) + .setRealmId(authDetails.getRealmId()) + .setRealmName(authDetails.getRealmName()) + .setUserId(authDetails.getUserId()) + .build()) + .orElse(null) + ).build(); + } + + public static KeycloakEvent mapEventToKeycloakEvent(final Event event){ + return KeycloakEvent.newBuilder() + .setId(event.getId()) + .setTime(event.getTime()) + .setRealmId(event.getRealmId()) + .setRealmName(event.getRealmName()) + .setError(event.getError()) + .setClientId(event.getClientId()) + .setUserId(event.getUserId()) + .setSessionId(event.getSessionId()) + .setIpAddress(event.getIpAddress()) + .setDetails(event.getDetails()) + .build(); + } + +} diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java index fd455d5..2638b1b 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/JsonSerializer.java @@ -2,16 +2,20 @@ import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.ObjectMapper; +import com.github.snuk87.keycloak.kafka.serializer.mixin.JacksonIgnoreAvroPropertiesMixIn; +import org.apache.avro.specific.SpecificRecord; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.common.errors.SerializationException; import org.apache.kafka.common.serialization.Serializer; import java.util.Map; -public class JsonSerializer implements Serializer { - private final ObjectMapper objectMapper = new ObjectMapper(); +public class JsonSerializer implements Serializer { + private final ObjectMapper objectMapper; public JsonSerializer() { - + this.objectMapper = new ObjectMapper(); + this.objectMapper.addMixIn(SpecificRecord.class, JacksonIgnoreAvroPropertiesMixIn.class); } @Override @@ -20,7 +24,7 @@ public void configure(Map config, boolean isKey) { } @Override - public byte[] serialize(String topic, Object data) { + public byte[] serialize(String topic, SpecificRecordBase data) { if (data == null) { return null; } diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/serializer/mixin/JacksonIgnoreAvroPropertiesMixIn.java b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/mixin/JacksonIgnoreAvroPropertiesMixIn.java new file mode 100644 index 0000000..b8888a1 --- /dev/null +++ b/src/main/java/com/github/snuk87/keycloak/kafka/serializer/mixin/JacksonIgnoreAvroPropertiesMixIn.java @@ -0,0 +1,15 @@ +package com.github.snuk87.keycloak.kafka.serializer.mixin; + +import com.fasterxml.jackson.annotation.JsonIgnore; +import org.apache.avro.Schema; +import org.apache.avro.specific.SpecificData; + +public abstract class JacksonIgnoreAvroPropertiesMixIn { + + @JsonIgnore + public abstract Schema getSchema(); + + @JsonIgnore + public abstract SpecificData getSpecificData(); + +} diff --git a/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java index 6a41e54..758935b 100644 --- a/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java +++ b/src/main/java/com/github/snuk87/keycloak/kafka/spi/KafkaEventListenerProvider.java @@ -9,6 +9,8 @@ import java.util.concurrent.TimeoutException; import com.github.snuk87.keycloak.kafka.KafkaProducerFactory; +import com.github.snuk87.keycloak.kafka.mapper.KeycloakMapper; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.clients.producer.ProducerRecord; import org.apache.kafka.clients.producer.RecordMetadata; @@ -28,7 +30,7 @@ public class KafkaEventListenerProvider implements EventListenerProvider { private String topicAdminEvents; - private Producer producer; + private Producer producer; public KafkaEventListenerProvider(String bootstrapServers, String clientId, String topicEvents, String[] events, String topicAdminEvents, Map kafkaProducerProperties, KafkaProducerFactory factory) { @@ -48,10 +50,10 @@ public KafkaEventListenerProvider(String bootstrapServers, String clientId, Stri producer = factory.createProducer(clientId, bootstrapServers, kafkaProducerProperties); } - private void produceEvent(final Object event, final String topic) + private void produceEvent(final SpecificRecordBase event, final String topic) throws InterruptedException, ExecutionException, TimeoutException { LOG.debug("Produce to topic: " + topicEvents + " ..."); - final ProducerRecord record = new ProducerRecord<>(topic, event); + final ProducerRecord record = new ProducerRecord<>(topic, event); Future metaData = producer.send(record); RecordMetadata recordMetadata = metaData.get(30, TimeUnit.SECONDS); LOG.debug("Produced to topic: " + recordMetadata.topic()); @@ -61,7 +63,7 @@ private void produceEvent(final Object event, final String topic) public void onEvent(Event event) { if (events.contains(event.getType())) { try { - this.produceEvent(event, topicEvents); + this.produceEvent(KeycloakMapper.mapEventToKeycloakEvent(event), topicEvents); } catch (ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } catch (InterruptedException e) { @@ -75,7 +77,7 @@ public void onEvent(Event event) { public void onEvent(AdminEvent event, boolean includeRepresentation) { if (topicAdminEvents != null) { try { - this.produceEvent(event, topicAdminEvents); + this.produceEvent(KeycloakMapper.mapEventToKeycloakEvent(event), topicAdminEvents); } catch (ExecutionException | TimeoutException e) { LOG.error(e.getMessage(), e); } catch (InterruptedException e) { diff --git a/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory b/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory index d5d1328..8dea32f 100644 --- a/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory +++ b/src/main/resources/META-INF/services/org.keycloak.events.EventListenerProviderFactory @@ -1 +1 @@ -com.github.snuk87.keycloak.kafka.KafkaEventListenerProviderFactory \ No newline at end of file +com.github.snuk87.keycloak.kafka.spi.KafkaEventListenerProviderFactory \ No newline at end of file diff --git a/src/main/resources/avro/keycloak-admin-events-value.avsc b/src/main/resources/avro/keycloak-admin-events-value.avsc new file mode 100644 index 0000000..82f722f --- /dev/null +++ b/src/main/resources/avro/keycloak-admin-events-value.avsc @@ -0,0 +1,137 @@ +{ + "type": "record", + "name": "KeycloakAdminEvent", + "namespace": "com.github.snuk87.keycloak.kafka.dto", + "fields": [ + { + "name": "authDetails", + "type": [ + "null", + { + "type": "record", + "name": "AuthDetails", + "fields": [ + { + "name": "clientId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ipAddress", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmName", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "userId", + "type": [ + "null", + "string" + ], + "default": null + } + ] + } + ], + "default": null + }, + { + "name": "error", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "operationType", + "type": [ + "null", + { + "type": "enum", + "name": "OperationType", + "symbols": [ + "CREATE", + "UPDATE", + "DELETE", + "ACTION" + ] + } + ], + "default": null + }, + { + "name": "realmId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmName", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "representation", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "resourcePath", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "resourceType", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "time", + "type": "long" + } + ] +} \ No newline at end of file diff --git a/src/main/resources/avro/keycloak-events-value.avsc b/src/main/resources/avro/keycloak-events-value.avsc new file mode 100644 index 0000000..41d74f2 --- /dev/null +++ b/src/main/resources/avro/keycloak-events-value.avsc @@ -0,0 +1,213 @@ +{ + "type": "record", + "name": "KeycloakEvent", + "namespace": "com.github.snuk87.keycloak.kafka.dto", + "fields": [ + { + "name": "clientId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "details", + "type": [ + "null", + { + "type": "map", + "values": "string" + } + ], + "default": null + }, + { + "name": "error", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "id", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "ipAddress", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "realmName", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "sessionId", + "type": [ + "null", + "string" + ], + "default": null + }, + { + "name": "time", + "type": "long" + }, + { + "name": "type", + "type": [ + "null", + { + "type": "enum", + "name": "EventType", + "symbols": [ + "LOGIN", + "LOGIN_ERROR", + "REGISTER", + "REGISTER_ERROR", + "LOGOUT", + "LOGOUT_ERROR", + "CODE_TO_TOKEN", + "CODE_TO_TOKEN_ERROR", + "CLIENT_LOGIN", + "CLIENT_LOGIN_ERROR", + "REFRESH_TOKEN", + "REFRESH_TOKEN_ERROR", + "VALIDATE_ACCESS_TOKEN", + "VALIDATE_ACCESS_TOKEN_ERROR", + "INTROSPECT_TOKEN", + "INTROSPECT_TOKEN_ERROR", + "FEDERATED_IDENTITY_LINK", + "FEDERATED_IDENTITY_LINK_ERROR", + "REMOVE_FEDERATED_IDENTITY", + "REMOVE_FEDERATED_IDENTITY_ERROR", + "UPDATE_EMAIL", + "UPDATE_EMAIL_ERROR", + "UPDATE_PROFILE", + "UPDATE_PROFILE_ERROR", + "UPDATE_PASSWORD", + "UPDATE_PASSWORD_ERROR", + "UPDATE_TOTP", + "UPDATE_TOTP_ERROR", + "VERIFY_EMAIL", + "VERIFY_EMAIL_ERROR", + "VERIFY_PROFILE", + "VERIFY_PROFILE_ERROR", + "REMOVE_TOTP", + "REMOVE_TOTP_ERROR", + "GRANT_CONSENT", + "GRANT_CONSENT_ERROR", + "UPDATE_CONSENT", + "UPDATE_CONSENT_ERROR", + "REVOKE_GRANT", + "REVOKE_GRANT_ERROR", + "SEND_VERIFY_EMAIL", + "SEND_VERIFY_EMAIL_ERROR", + "SEND_RESET_PASSWORD", + "SEND_RESET_PASSWORD_ERROR", + "SEND_IDENTITY_PROVIDER_LINK", + "SEND_IDENTITY_PROVIDER_LINK_ERROR", + "RESET_PASSWORD", + "RESET_PASSWORD_ERROR", + "RESTART_AUTHENTICATION", + "RESTART_AUTHENTICATION_ERROR", + "INVALID_SIGNATURE", + "INVALID_SIGNATURE_ERROR", + "REGISTER_NODE", + "REGISTER_NODE_ERROR", + "UNREGISTER_NODE", + "UNREGISTER_NODE_ERROR", + "USER_INFO_REQUEST", + "USER_INFO_REQUEST_ERROR", + "IDENTITY_PROVIDER_LINK_ACCOUNT", + "IDENTITY_PROVIDER_LINK_ACCOUNT_ERROR", + "IDENTITY_PROVIDER_LOGIN", + "IDENTITY_PROVIDER_LOGIN_ERROR", + "IDENTITY_PROVIDER_FIRST_LOGIN", + "IDENTITY_PROVIDER_FIRST_LOGIN_ERROR", + "IDENTITY_PROVIDER_POST_LOGIN", + "IDENTITY_PROVIDER_POST_LOGIN_ERROR", + "IDENTITY_PROVIDER_RESPONSE", + "IDENTITY_PROVIDER_RESPONSE_ERROR", + "IDENTITY_PROVIDER_RETRIEVE_TOKEN", + "IDENTITY_PROVIDER_RETRIEVE_TOKEN_ERROR", + "IMPERSONATE", + "IMPERSONATE_ERROR", + "CUSTOM_REQUIRED_ACTION", + "CUSTOM_REQUIRED_ACTION_ERROR", + "EXECUTE_ACTIONS", + "EXECUTE_ACTIONS_ERROR", + "EXECUTE_ACTION_TOKEN", + "EXECUTE_ACTION_TOKEN_ERROR", + "CLIENT_INFO", + "CLIENT_INFO_ERROR", + "CLIENT_REGISTER", + "CLIENT_REGISTER_ERROR", + "CLIENT_UPDATE", + "CLIENT_UPDATE_ERROR", + "CLIENT_DELETE", + "CLIENT_DELETE_ERROR", + "CLIENT_INITIATED_ACCOUNT_LINKING", + "CLIENT_INITIATED_ACCOUNT_LINKING_ERROR", + "TOKEN_EXCHANGE", + "TOKEN_EXCHANGE_ERROR", + "OAUTH2_DEVICE_AUTH", + "OAUTH2_DEVICE_AUTH_ERROR", + "OAUTH2_DEVICE_VERIFY_USER_CODE", + "OAUTH2_DEVICE_VERIFY_USER_CODE_ERROR", + "OAUTH2_DEVICE_CODE_TO_TOKEN", + "OAUTH2_DEVICE_CODE_TO_TOKEN_ERROR", + "AUTHREQID_TO_TOKEN", + "AUTHREQID_TO_TOKEN_ERROR", + "PERMISSION_TOKEN", + "PERMISSION_TOKEN_ERROR", + "DELETE_ACCOUNT", + "DELETE_ACCOUNT_ERROR", + "PUSHED_AUTHORIZATION_REQUEST", + "PUSHED_AUTHORIZATION_REQUEST_ERROR", + "USER_DISABLED_BY_PERMANENT_LOCKOUT", + "USER_DISABLED_BY_PERMANENT_LOCKOUT_ERROR", + "USER_DISABLED_BY_TEMPORARY_LOCKOUT", + "USER_DISABLED_BY_TEMPORARY_LOCKOUT_ERROR", + "OAUTH2_EXTENSION_GRANT", + "OAUTH2_EXTENSION_GRANT_ERROR", + "FEDERATED_IDENTITY_OVERRIDE_LINK", + "FEDERATED_IDENTITY_OVERRIDE_LINK_ERROR", + "INVITE_ORG", + "INVITE_ORG_ERROR" + ] + } + ], + "default": null + }, + { + "name": "userId", + "type": [ + "null", + "string" + ], + "default": null + } + ] +} diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java index 66ea508..d820690 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaEventListenerProviderTests.java @@ -29,6 +29,7 @@ void setUp() throws Exception { @Test void shouldProduceEventWhenTypeIsDefined() throws Exception { Event event = new Event(); + event.setId("1"); event.setType(EventType.REGISTER); MockProducer producer = getProducerUsingReflection(); @@ -40,6 +41,7 @@ void shouldProduceEventWhenTypeIsDefined() throws Exception { @Test void shouldDoNothingWhenTypeIsNotDefined() throws Exception { Event event = new Event(); + event.setId("1"); event.setType(EventType.CLIENT_DELETE); MockProducer producer = getProducerUsingReflection(); @@ -51,6 +53,7 @@ void shouldDoNothingWhenTypeIsNotDefined() throws Exception { @Test void shouldProduceEventWhenTopicAdminEventsIsNotNull() throws Exception { AdminEvent event = new AdminEvent(); + event.setId("1"); MockProducer producer = getProducerUsingReflection(); listener.onEvent(event, false); @@ -62,6 +65,7 @@ void shouldProduceEventWhenTopicAdminEventsIsNotNull() throws Exception { void shouldDoNothingWhenTopicAdminEventsIsNull() throws Exception { listener = new KafkaEventListenerProvider("", "", "", new String[] { "REGISTER" }, null, Map.of(), factory); AdminEvent event = new AdminEvent(); + event.setId("1"); MockProducer producer = getProducerUsingReflection(); listener.onEvent(event, false); diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java index 64f7ea3..c36d5a0 100644 --- a/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KafkaMockProducerFactory.java @@ -3,6 +3,7 @@ import java.util.Map; import com.github.snuk87.keycloak.kafka.serializer.JsonSerializer; +import org.apache.avro.specific.SpecificRecordBase; import org.apache.kafka.clients.producer.MockProducer; import org.apache.kafka.clients.producer.Producer; import org.apache.kafka.common.serialization.StringSerializer; @@ -10,8 +11,8 @@ class KafkaMockProducerFactory implements KafkaProducerFactory { @Override - public Producer createProducer(String clientId, String bootstrapServer, - Map optionalProperties) { + public Producer createProducer(String clientId, String bootstrapServer, + Map optionalProperties) { return new MockProducer<>(true, new StringSerializer(), new JsonSerializer()); } diff --git a/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java b/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java new file mode 100644 index 0000000..66c932d --- /dev/null +++ b/src/test/java/com/github/snuk87/keycloak/kafka/KeycloakEventAvroSerializerTest.java @@ -0,0 +1,37 @@ +package com.github.snuk87.keycloak.kafka; + +import com.github.snuk87.keycloak.kafka.dto.KeycloakAdminEvent; +import org.apache.avro.Schema; +import org.apache.kafka.clients.admin.Admin; +import org.junit.jupiter.api.Test; +import io.confluent.kafka.schemaregistry.avro.AvroSchemaUtils; +import org.keycloak.events.Event; +import org.keycloak.events.EventType; +import org.keycloak.events.admin.AdminEvent; +import org.keycloak.events.admin.OperationType; + +import static org.junit.jupiter.api.Assertions.assertNotNull; + +public class KeycloakEventAvroSerializerTest { + + @Test + void eventCanBeSerialized() throws Exception { + + final AdminEvent adminevent = new AdminEvent(); + adminevent.setOperationType(OperationType.CREATE); + + final Event event = new Event(); + event.setType(EventType.REGISTER); + + final Schema schema1 = AvroSchemaUtils.getSchema(adminevent, true, false, false); + final Schema schema2= AvroSchemaUtils.getSchema(event, true, true, true); + + + System.out.println(schema1); + System.out.println(schema2); + + assertNotNull(schema1); + assertNotNull(schema2); + } + +}