Skip to content

Commit

Permalink
Support of @ExecuteOn(Scheduler) (#865)
Browse files Browse the repository at this point in the history
  • Loading branch information
artem-v authored Oct 14, 2024
1 parent 07c9b05 commit a4d844d
Show file tree
Hide file tree
Showing 10 changed files with 450 additions and 49 deletions.
56 changes: 55 additions & 1 deletion services-api/src/main/java/io/scalecube/services/Reflect.java
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@
import static io.scalecube.services.CommunicationMode.REQUEST_RESPONSE;
import static io.scalecube.services.CommunicationMode.REQUEST_STREAM;

import io.scalecube.services.annotations.ExecuteOn;
import io.scalecube.services.annotations.RequestType;
import io.scalecube.services.annotations.ResponseType;
import io.scalecube.services.annotations.Service;
Expand All @@ -28,6 +29,8 @@
import org.reactivestreams.Publisher;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Scheduler;
import reactor.core.scheduler.Schedulers;

public class Reflect {

Expand Down Expand Up @@ -172,7 +175,8 @@ public static Map<Method, MethodInfo> methodsInfo(Class<?> serviceInterface) {
method.getParameterCount(),
requestType(method),
isRequestTypeServiceMessage(method),
isSecured(method)))));
isSecured(method),
null))));
}

/**
Expand Down Expand Up @@ -379,4 +383,54 @@ public static boolean isSecured(Method method) {
return method.isAnnotationPresent(Secured.class)
|| method.getDeclaringClass().isAnnotationPresent(Secured.class);
}

public static Scheduler executeOnScheduler(Method method, Map<String, Scheduler> schedulers) {
final Class<?> declaringClass = method.getDeclaringClass();

if (method.isAnnotationPresent(ExecuteOn.class)) {
final var executeOn = method.getAnnotation(ExecuteOn.class);
final var name = executeOn.value();
final var scheduler = schedulers.get(name);
if (scheduler == null) {
throw new IllegalArgumentException(
"Wrong @ExecuteOn definition on "
+ declaringClass.getName()
+ "."
+ method.getName()
+ ": scheduler (name="
+ name
+ ") cannot be found");
}
return scheduler;
}

// If @ExecuteOn annotation is not present on service method, then find it on service class

ExecuteOn executeOn = null;
for (var clazz = declaringClass; clazz != null; clazz = clazz.getSuperclass()) {
executeOn = clazz.getAnnotation(ExecuteOn.class);
if (executeOn != null) {
break;
}
}

if (executeOn == null) {
return Schedulers.immediate();
}

final var name = executeOn.value();
final var scheduler = schedulers.get(name);
if (scheduler == null) {
throw new IllegalArgumentException(
"Wrong @ExecuteOn definition on "
+ declaringClass.getName()
+ "."
+ method.getName()
+ ": scheduler (name="
+ name
+ ") cannot be found");
}

return scheduler;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,26 @@
package io.scalecube.services.annotations;

import static java.lang.annotation.ElementType.METHOD;
import static java.lang.annotation.ElementType.TYPE;
import static java.lang.annotation.RetentionPolicy.RUNTIME;

import java.lang.annotation.Documented;
import java.lang.annotation.Retention;
import java.lang.annotation.Target;

/**
* This annotation is used to mark that particular service method or all service methods will be
* executed in the specified scheduler.
*/
@Documented
@Target({METHOD, TYPE})
@Retention(RUNTIME)
public @interface ExecuteOn {

/**
* Returns scheduler name.
*
* @return scheduler name
*/
String value();
}
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/** Indicates that an annotated class is an Service Fabric service object. */
/** Indicates that annotated class is a ScaleCube service object. */
@Retention(RetentionPolicy.RUNTIME)
@Target(ElementType.TYPE)
public @interface Service {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
import java.lang.annotation.Target;

/**
* Indicates that an annotated method is a service method available via Service Fabric framework.
* Indicates that annotated method is a ScaleCube service method.
*/
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.ANNOTATION_TYPE})
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import io.scalecube.services.api.Qualifier;
import java.lang.reflect.Type;
import java.util.StringJoiner;
import reactor.core.scheduler.Scheduler;

public final class MethodInfo {

Expand All @@ -17,6 +18,7 @@ public final class MethodInfo {
private final Class<?> requestType;
private final boolean isRequestTypeServiceMessage;
private final boolean isSecured;
private final Scheduler scheduler;

/**
* Create a new service info.
Expand All @@ -30,6 +32,7 @@ public final class MethodInfo {
* @param requestType the type of the request
* @param isRequestTypeServiceMessage is request service message
* @param isSecured is method protected by authentication
* @param scheduler scheduler
*/
public MethodInfo(
String serviceName,
Expand All @@ -40,7 +43,8 @@ public MethodInfo(
int parameterCount,
Class<?> requestType,
boolean isRequestTypeServiceMessage,
boolean isSecured) {
boolean isSecured,
Scheduler scheduler) {
this.parameterizedReturnType = parameterizedReturnType;
this.isReturnTypeServiceMessage = isReturnTypeServiceMessage;
this.communicationMode = communicationMode;
Expand All @@ -51,6 +55,7 @@ public MethodInfo(
this.requestType = requestType;
this.isRequestTypeServiceMessage = isRequestTypeServiceMessage;
this.isSecured = isSecured;
this.scheduler = scheduler;
}

public String serviceName() {
Expand Down Expand Up @@ -101,19 +106,24 @@ public boolean isSecured() {
return isSecured;
}

public Scheduler scheduler() {
return scheduler;
}

@Override
public String toString() {
return new StringJoiner(", ", MethodInfo.class.getSimpleName() + "[", "]")
.add("serviceName=" + serviceName)
.add("methodName=" + methodName)
.add("qualifier=" + qualifier)
.add("serviceName='" + serviceName + "'")
.add("methodName='" + methodName + "'")
.add("qualifier='" + qualifier + "'")
.add("parameterizedReturnType=" + parameterizedReturnType)
.add("isReturnTypeServiceMessage=" + isReturnTypeServiceMessage)
.add("communicationMode=" + communicationMode)
.add("parameterCount=" + parameterCount)
.add("requestType=" + requestType)
.add("isRequestTypeServiceMessage=" + isRequestTypeServiceMessage)
.add("isSecured=" + isSecured)
.add("scheduler=" + scheduler)
.toString();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -70,7 +70,8 @@ public Mono<ServiceMessage> invokeOne(ServiceMessage message) {
.flatMap(authData -> deferWithContextOne(message, authData))
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
.onErrorResume(
throwable -> Mono.just(errorMapper.toMessage(message.qualifier(), throwable)));
throwable -> Mono.just(errorMapper.toMessage(message.qualifier(), throwable)))
.subscribeOn(methodInfo.scheduler());
}

/**
Expand All @@ -84,7 +85,8 @@ public Flux<ServiceMessage> invokeMany(ServiceMessage message) {
.flatMapMany(authData -> deferWithContextMany(message, authData))
.map(response -> toResponse(response, message.qualifier(), message.dataFormat()))
.onErrorResume(
throwable -> Flux.just(errorMapper.toMessage(message.qualifier(), throwable)));
throwable -> Flux.just(errorMapper.toMessage(message.qualifier(), throwable)))
.subscribeOn(methodInfo.scheduler());
}

/**
Expand All @@ -104,7 +106,8 @@ public Flux<ServiceMessage> invokeBidirectional(Publisher<ServiceMessage> publis
toResponse(response, first.get().qualifier(), first.get().dataFormat()))
.onErrorResume(
throwable ->
Flux.just(errorMapper.toMessage(first.get().qualifier(), throwable))));
Flux.just(errorMapper.toMessage(first.get().qualifier(), throwable)))
.subscribeOn(methodInfo.scheduler()));
}

private Mono<?> deferWithContextOne(ServiceMessage message, Object authData) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.mockito.Mockito;
import reactor.core.publisher.Flux;
import reactor.core.publisher.Mono;
import reactor.core.scheduler.Schedulers;
import reactor.test.StepVerifier;

class ServiceMethodInvokerTest {
Expand Down Expand Up @@ -62,7 +63,8 @@ void testInvokeOneWhenReturnNull() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -100,7 +102,8 @@ void testInvokeManyWhenReturnNull() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -138,7 +141,8 @@ void testInvokeBidirectionalWhenReturnNull() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -177,7 +181,8 @@ void testInvokeOneWhenThrowException() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -219,7 +224,8 @@ void testInvokeManyWhenThrowException() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -260,7 +266,8 @@ void testInvokeBidirectionalWhenThrowException() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -305,7 +312,8 @@ void testAuthMethodWhenNoContextAndNoAuthenticator() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -347,7 +355,8 @@ void testAuthMethodWhenThereIsContextAndNoAuthenticator() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

serviceMethodInvoker =
new ServiceMethodInvoker(
Expand Down Expand Up @@ -387,7 +396,8 @@ void testAuthMethodWhenNoContextButThereIsAuthenticator() throws Exception {
method.getParameterCount(),
Void.TYPE,
IS_REQUEST_TYPE_SERVICE_MESSAGE,
AUTH);
AUTH,
Schedulers.immediate());

//noinspection unchecked,rawtypes
Authenticator<Map> mockedAuthenticator = Mockito.mock(Authenticator.class);
Expand Down
Loading

0 comments on commit a4d844d

Please sign in to comment.