Skip to content

Commit

Permalink
add coroutines support. (#15)
Browse files Browse the repository at this point in the history
* add coroutines support.

* simplify.

* simplify artifact-ids in gradle.properties.

* seperate the codec.
  • Loading branch information
portlek authored Jun 14, 2024
1 parent 59b8e1a commit 9bfde17
Show file tree
Hide file tree
Showing 31 changed files with 438 additions and 109 deletions.
10 changes: 8 additions & 2 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,9 @@ repositories {
}
dependencies {
// Base module
// Base modules
implementation "net.infumia:pubsub:VERSION"
implementation "net.infumia:pubsub-codec:VERSION"
// Required, https://mvnrepository.com/artifact/com.github.ben-manes.caffeine/caffeine/
implementation "com.github.ben-manes.caffeine:caffeine:2.9.3" // for java-8+
implementation "com.github.ben-manes.caffeine:caffeine:3.1.8" // for java-11+
Expand All @@ -21,12 +22,17 @@ dependencies {
// A simple codec using Jackson (Optional)
implementation "net.infumia:pubsub-jackson:VERSION"
// Required, https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind
// Required, https://mvnrepository.com/artifact/com.fasterxml.jackson.core/jackson-databind/
implementation "com.fasterxml.jackson.core:jackson-databind:2.17.1"
// Kotlin extensions (Optional)
implementation "net.infumia:pubsub-kotlin:VERSION"
// Kotlin coroutines (Optional)
implementation "net.infumia:pubsub-kotlin-coroutines:VERSION"
// Required, https://mvnrepository.com/artifact/org.jetbrains.kotlinx/kotlinx-coroutines-core/
implementation "org.jetbrains.kotlinx:kotlinx-coroutines-core:1.8.1"
// Kotlin protobuf serializer (Optional)
implementation "net.infumia:pubsub-kotlin-protobuf:VERSION"
// Required, https://mvnrepository.com/artifact/org.jetbrains.kotlin/kotlin-reflect/
Expand Down
3 changes: 2 additions & 1 deletion build.gradle.kts
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ subprojects {
}
}

val projectName = project.property("artifact-id") as String
val moduleName = project.findProperty("artifact-id") as String?
val projectName = "pubsub${if (moduleName == null) "" else "-$moduleName"}"
val signRequired = project.hasProperty("sign-required")

extensions.configure<MavenPublishBaseExtension> {
Expand Down
1 change: 1 addition & 0 deletions codec/gradle.properties
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
artifact-id=codec
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,7 @@
/**
* The interface for encoding and decoding objects of type {@link T} to and from byte arrays.
*
* @param <T> the type of the object to be encoded and decoded.
* @param <T> type of the object to be encoded and decoded.
*/
public interface Codec<T> {
/**
Expand All @@ -17,7 +17,7 @@ public interface Codec<T> {
/**
* Decodes a byte array into an object of type {@link T}.
*
* @param bytes the byte array to decode.
* @param bytes the bytes array to decode.
* @return the decoded object.
*/
T decode(byte[] bytes);
Expand Down
2 changes: 2 additions & 0 deletions common/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,3 +1,5 @@
dependencies {
compileOnly(project(":codec"))

compileOnly(libs.caffeine)
}
1 change: 0 additions & 1 deletion common/gradle.properties

This file was deleted.

63 changes: 15 additions & 48 deletions common/src/main/java/net/infumia/pubsub/Broker.java
Original file line number Diff line number Diff line change
@@ -1,7 +1,6 @@
package net.infumia.pubsub;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
Expand Down Expand Up @@ -34,17 +33,16 @@ public interface Broker extends AutoCloseable {
* @param message the message to send. Cannot be {@code null}
* @param targets the targets to send the message to. Can be empty, cannot be {@code null}
*/
default void send(final Object message, final Target... targets) {
this.send(message, Arrays.asList(targets));
}
void send(Object message, Target... targets);

/**
* Listens for messages using a handler.
*
* @param handler the handler to process incoming messages. Cannot be {@code null}
* @param <T> the type of the message to listen for.
* @return an {@link AutoCloseable} to stop listening. Cannot be {@code null}
*/
AutoCloseable listen(Handler<?> handler);
<T> AutoCloseable listen(Handler<T> handler);

/**
* Listens for messages of a specific type using a consumer.
Expand All @@ -54,19 +52,7 @@ default void send(final Object message, final Target... targets) {
* @param <T> the type of the message to listen for.
* @return an {@link AutoCloseable} to stop listening. Cannot be {@code null}
*/
default <T> AutoCloseable listen(final Class<T> type, final Consumer<T> handler) {
return this.listen(new Handler<T>() {
@Override
public Class<T> type() {
return type;
}

@Override
public void handle(final T t) {
handler.accept(t);
}
});
}
<T> AutoCloseable listen(Class<T> type, Consumer<T> handler);

/**
* Sends a request to targets and awaits a response within a timeout.
Expand All @@ -91,46 +77,39 @@ <R> CompletableFuture<R> request(Object message, Class<R> responseType, Duration
* @param <R> the type of the response.
* @return a {@link CompletableFuture} representing the response. Cannot be {@code null}
*/
default <R> CompletableFuture<R> request(final Object message, final Class<R> responseType, final Duration timeout,
final Target... targets) {
return this.request(message, responseType, timeout, Arrays.asList(targets));
}
<R> CompletableFuture<R> request(Object message, Class<R> responseType, Duration timeout, Target... targets);

/**
* Sends a request and awaits a response using the default timeout.
* Sends a request to targets and awaits a response using the default timeout.
*
* @param message the request message. Cannot be {@code null}
* @param responseType the class object representing the response type {@link R}. Cannot be {@code null}
* @param targets the targets to send the request to. Can be empty, cannot be {@code null}
* @param targets the targets to send the request to. Cannot be {@code null}
* @param <R> the type of the response.
* @return a {@link CompletableFuture} representing the response. Cannot be {@code null}
*/
default <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Target... targets) {
return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets);
}
<R> CompletableFuture<R> request(Object message, Class<R> responseType, Collection<Target> targets);

/**
* Sends a request to targets and awaits a response using the default timeout.
* Sends a request and awaits a response using the default timeout.
*
* @param message the request message. Cannot be {@code null}
* @param responseType the class object representing the response type {@link R}. Cannot be {@code null}
* @param targets the targets to send the request to. Cannot be {@code null}
* @param targets the targets to send the request to. Can be empty, cannot be {@code null}
* @param <R> the type of the response.
* @return a {@link CompletableFuture} representing the response. Cannot be {@code null}
*/
default <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Collection<Target> targets) {
return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets);
}
<R> CompletableFuture<R> request(Object message, Class<R> responseType, Target... targets);

/**
* Registers a responder to handle requests of a specific type.
*
* @param responder the responder to handle requests. Cannot be {@code null}
* @param <T> the type of the request message.
* @param <Y> the type of the response.
* @return an {@link AutoCloseable} to stop responding. Cannot be {@code null}
*/
AutoCloseable respond(Responder<?, ?> responder);
<T, Y> AutoCloseable respond(Responder<T, Y> responder);

/**
* Registers a function to respond to requests of a specific type.
Expand All @@ -141,19 +120,7 @@ default <R> CompletableFuture<R> request(final Object message, final Class<R> re
* @param <Y> the type of the response.
* @return an {@link AutoCloseable} to stop responding. Cannot be {@code null}
*/
default <T, Y> AutoCloseable respond(final Class<T> type, final Function<T, Y> responder) {
return this.respond(new Responder<T, Y>() {
@Override
public Class<T> type() {
return type;
}

@Override
public Y handle(final T t) {
return responder.apply(t);
}
});
}
<T, Y> AutoCloseable respond(Class<T> type, Function<T, Y> responder);

@Override
void close();
Expand Down
63 changes: 57 additions & 6 deletions common/src/main/java/net/infumia/pubsub/BrokerStringAbstract.java
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,17 @@
import com.github.benmanes.caffeine.cache.Caffeine;

import java.time.Duration;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Abstract class for a message broker that operates with strings and supports sending, receiving, and responding to messages.
* Abstract class for a message broker that operates with strings and supports sending, receiving,
* and responding to messages.
*/
public abstract class BrokerStringAbstract implements Broker {
private final UUID brokerId = UUID.randomUUID();
Expand Down Expand Up @@ -45,12 +49,26 @@ public final void send(final Object message, final Collection<Target> targets) {
}

@Override
public final AutoCloseable listen(final Handler<?> handler) {
return this.respond(new HandlerToResponder<>(handler));
public void send(final Object message, final Target... targets) {
this.send(message, Arrays.asList(targets));
}

@Override
public final <R> CompletableFuture<R> request(final Object message, final Class<R> responseType, final Duration timeout, final Collection<Target> targets) {
public final <T> AutoCloseable listen(final Handler<T> handler) {
return this.listen(handler.type(), handler);
}

@Override
public <T> AutoCloseable listen(final Class<T> type, final Consumer<T> handler) {
return this.respond(type, message -> {
handler.accept(message);
return null;
});
}

@Override
public final <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Duration timeout, final Collection<Target> targets) {
final Envelope envelope = Internal.newEnvelope(this.codecProvider, this.brokerId, message);
final AwaitingResponder<R> responder = new AwaitingResponder<>(responseType);
this.awaitingResponders.put(envelope.messageId, responder);
Expand All @@ -59,10 +77,43 @@ public final <R> CompletableFuture<R> request(final Object message, final Class<
}

@Override
public final AutoCloseable respond(final Responder<?, ?> responder) {
public <R> CompletableFuture<R> request(final Object message, final Class<R> responseType, final Duration timeout,
final Target... targets) {
return this.request(message, responseType, timeout, Arrays.asList(targets));
}

@Override
public <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Collection<Target> targets) {
return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets);
}

@Override
public <R> CompletableFuture<R> request(final Object message, final Class<R> responseType,
final Target... targets) {
return this.request(message, responseType, Internal.REQUEST_TIMEOUT, targets);
}

@Override
public final <T, Y> AutoCloseable respond(final Responder<T, Y> responder) {
return this.handlerRegistry.register(this.messageTypeId(responder.type()), responder);
}

@Override
public <T, Y> AutoCloseable respond(final Class<T> type, final Function<T, Y> responder) {
return this.respond(new Responder<T, Y>() {
@Override
public Class<T> type() {
return type;
}

@Override
public Y apply(final T message) {
return responder.apply(message);
}
});
}

@Override
public void close() {
this.handlerRegistry.close();
Expand Down Expand Up @@ -146,7 +197,7 @@ private <T, Y> Envelope handleEnvelope(
final Responder<T, Y> responder
) {
final T decoded = this.codecProvider.provide(responder.type()).decode(envelope.messagePayload);
final Y response = responder.handle(decoded);
final Y response = responder.apply(decoded);
if (response == null) {
return null;
} else {
Expand Down
11 changes: 3 additions & 8 deletions common/src/main/java/net/infumia/pubsub/Handler.java
Original file line number Diff line number Diff line change
@@ -1,22 +1,17 @@
package net.infumia.pubsub;

import java.util.function.Consumer;

/**
* The interface for handling Pub/Sub messages of type {@link T}.
*
* @param <T> the type of the message to be handled.
*/
public interface Handler<T> {
public interface Handler<T> extends Consumer<T> {
/**
* Retrieves the class type of the message being handled.
*
* @return the {@link Class} object representing the type {@link T}.
*/
Class<T> type();

/**
* Handles a message of type {@link T} received from Pub/Sub.
*
* @param t the message to handle.
*/
void handle(T t);
}
20 changes: 0 additions & 20 deletions common/src/main/java/net/infumia/pubsub/HandlerToResponder.java

This file was deleted.

12 changes: 3 additions & 9 deletions common/src/main/java/net/infumia/pubsub/Responder.java
Original file line number Diff line number Diff line change
@@ -1,25 +1,19 @@
package net.infumia.pubsub;

import java.util.function.Function;

/**
* The interface for handling Pub/Sub messages of type {@code T} and providing a response of type {@code Y}.
*
* @param <T> the type of the message to be handled.
* @param <Y> the type of the response.
*/
public interface Responder<T, Y> {
public interface Responder<T, Y> extends Function<T, Y> {

/**
* Retrieves the class type of the message being handled.
*
* @return the {@link Class} object representing the type {@link T}.
*/
Class<T> type();

/**
* Handles a message of type {@link T} received from Pub/Sub and provides a response of type {@link Y}.
*
* @param t the message to handle.
* @return the response of type {@link Y}.
*/
Y handle(T t);
}
3 changes: 2 additions & 1 deletion gradle/libs.versions.toml
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,11 @@ indra = "3.1.3"
[libraries]
caffeine = { module = "com.github.ben-manes.caffeine:caffeine", version = "2.9.3" }
redis = { module = "io.lettuce:lettuce-core", version = "6.3.2.RELEASE" }
jackson = { module = "com.fasterxml.jackson.core:jackson-databind", version = "2.17.1" }
kotlin-reflect = { module = "org.jetbrains.kotlin:kotlin-reflect", version.ref = "kotlin" }
kotlinx-serialization-core = { module = "org.jetbrains.kotlinx:kotlinx-serialization-core", version.ref = "kotlinserialization" }
kotlinx-serialization-protobuf = { module = "org.jetbrains.kotlinx:kotlinx-serialization-protobuf", version.ref = "kotlinserialization" }
jackson = { module = "com.fasterxml.jackson.core:jackson-databind", version = "2.17.1" }
kotlinx-coroutines = { module = "org.jetbrains.kotlinx:kotlinx-coroutines-core", version = "1.8.1" }

[plugins]
kotlin = { id = "org.jetbrains.kotlin.jvm", version.ref = "kotlin" }
Expand Down
2 changes: 1 addition & 1 deletion jackson/build.gradle.kts
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
dependencies {
compileOnly(project(":common"))
compileOnly(project(":codec"))

compileOnly(libs.jackson)
}
2 changes: 1 addition & 1 deletion jackson/gradle.properties
Original file line number Diff line number Diff line change
@@ -1 +1 @@
artifact-id=pubsub-jackson
artifact-id=jackson
Loading

0 comments on commit 9bfde17

Please sign in to comment.