diff --git a/common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java b/common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java index 7a00ab9..c3fd87a 100644 --- a/common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java +++ b/common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java @@ -159,6 +159,6 @@ private Collection channelsForMessage(final Object message, final Collec } private String messageTypeId(final Class messageType) { - return messageType.toString(); + return messageType.getTypeName(); } } diff --git a/common/src/main/java/net/infumia/pubsub/Envelope.java b/common/src/main/java/net/infumia/pubsub/Envelope.java index b81a1e1..27170ed 100644 --- a/common/src/main/java/net/infumia/pubsub/Envelope.java +++ b/common/src/main/java/net/infumia/pubsub/Envelope.java @@ -5,13 +5,15 @@ import java.util.UUID; final class Envelope { - final UUID brokerId; - final UUID messageId; + UUID brokerId; + UUID messageId; /** * Can be {@code null}. */ - final UUID respondsTo; - final byte[] messagePayload; + UUID respondsTo; + byte[] messagePayload; + + Envelope() {} Envelope( final UUID brokerId, diff --git a/gradle/libs.versions.toml b/gradle/libs.versions.toml index af6ef45..65ec3f4 100644 --- a/gradle/libs.versions.toml +++ b/gradle/libs.versions.toml @@ -9,6 +9,7 @@ redis = { module = "io.lettuce:lettuce-core", version = "6.3.2.RELEASE" } kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" } kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core", version.ref = "kotlinserialization" } kotlinx-serialization-protobuf = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinserialization" } +jackson = { module = "com.fasterxml.jackson.core:jackson-databind", version = "2.17.1" } [plugins] kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" } diff --git a/jackson/build.gradle.kts b/jackson/build.gradle.kts new file mode 100644 index 0000000..c778490 --- /dev/null +++ b/jackson/build.gradle.kts @@ -0,0 +1,5 @@ +dependencies { + compileOnly(project(":common")) + + compileOnly(libs.jackson) +} diff --git a/jackson/gradle.properties b/jackson/gradle.properties new file mode 100644 index 0000000..469fd60 --- /dev/null +++ b/jackson/gradle.properties @@ -0,0 +1 @@ +artifact-id=pubsub-jackson diff --git a/jackson/src/main/java/net/infumia/pubsub/CodecJackson.java b/jackson/src/main/java/net/infumia/pubsub/CodecJackson.java new file mode 100644 index 0000000..a4c2086 --- /dev/null +++ b/jackson/src/main/java/net/infumia/pubsub/CodecJackson.java @@ -0,0 +1,34 @@ +package net.infumia.pubsub; + +import com.fasterxml.jackson.core.JsonProcessingException; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.IOException; + +final class CodecJackson implements Codec { + private final ObjectMapper mapper; + private final Class type; + + CodecJackson(final ObjectMapper mapper, final Class type) { + this.mapper = mapper; + this.type = type; + } + + @Override + public byte[] encode(final T t) { + try { + return this.mapper.writerFor(this.type).writeValueAsBytes(t); + } catch (final JsonProcessingException e) { + throw new RuntimeException("Failed to serialize " + this.type, e); + } + } + + @Override + public T decode(final byte[] bytes) { + try { + return this.mapper.readerFor(this.type).readValue(bytes); + } catch (final IOException e) { + throw new RuntimeException("Failed to deserialize " + this.type, e); + } + } +} diff --git a/jackson/src/main/java/net/infumia/pubsub/CodecProviderJackson.java b/jackson/src/main/java/net/infumia/pubsub/CodecProviderJackson.java new file mode 100644 index 0000000..90853af --- /dev/null +++ b/jackson/src/main/java/net/infumia/pubsub/CodecProviderJackson.java @@ -0,0 +1,24 @@ +package net.infumia.pubsub; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A {@link CodecProvider} implementation that uses Jackson for JSON serialization and deserialization. + */ +public final class CodecProviderJackson implements CodecProvider { + private final JacksonProvider provider; + + /** + * Ctor. + * + * @param provider the {@link JacksonProvider} used to provide {@link ObjectMapper} instances. + */ + public CodecProviderJackson(final JacksonProvider provider) { + this.provider = provider; + } + + @Override + public Codec provide(final Class type) { + return new CodecJackson<>(this.provider.provide(), type); + } +} diff --git a/jackson/src/main/java/net/infumia/pubsub/JacksonProvider.java b/jackson/src/main/java/net/infumia/pubsub/JacksonProvider.java new file mode 100644 index 0000000..e7ecb58 --- /dev/null +++ b/jackson/src/main/java/net/infumia/pubsub/JacksonProvider.java @@ -0,0 +1,15 @@ +package net.infumia.pubsub; + +import com.fasterxml.jackson.databind.ObjectMapper; + +/** + * A provider interface for supplying {@link ObjectMapper} instances. + */ +public interface JacksonProvider { + /** + * Provides an {@link ObjectMapper} instance. + * + * @return an {@link ObjectMapper} instance. + */ + ObjectMapper provide(); +} diff --git a/redis/src/main/java/net/infumia/pubsub/BrokerRedis.java b/redis/src/main/java/net/infumia/pubsub/BrokerRedis.java index c756332..a53ed8a 100644 --- a/redis/src/main/java/net/infumia/pubsub/BrokerRedis.java +++ b/redis/src/main/java/net/infumia/pubsub/BrokerRedis.java @@ -56,7 +56,10 @@ public void message(final String pattern, final String channel, final String mes BrokerRedis.this.callHandlers(channel, message); } }); - this.subscribeConnection.sync().psubscribe(this.channelPrefixes.get().toArray(new String[0])); + final String[] channels = this.channelPrefixes.get().stream() + .map(s -> s + "*") + .toArray(String[]::new); + this.subscribeConnection.sync().psubscribe(channels); } @Override diff --git a/settings.gradle.kts b/settings.gradle.kts index 16b9972..38c6d16 100644 --- a/settings.gradle.kts +++ b/settings.gradle.kts @@ -4,7 +4,7 @@ plugins { rootProject.name = "pubsub" -include("common", "redis", "kotlin-extensions", "kotlin-protobuf") +include("common", "redis", "jackson", "kotlin-extensions", "kotlin-protobuf") project(":kotlin-extensions").projectDir = file("kotlin/extensions") project(":kotlin-protobuf").projectDir = file("kotlin/protobuf")