Skip to content

Commit

Permalink
[#20,#26] Use Memoized Handler for send, always use timeouts for replies
Browse files Browse the repository at this point in the history
  • Loading branch information
petermd committed Nov 19, 2014
1 parent 79e299e commit 0fc2dfc
Show file tree
Hide file tree
Showing 3 changed files with 90 additions and 35 deletions.
54 changes: 42 additions & 12 deletions src/main/java/io/vertx/rxcore/java/eventbus/RxEventBus.java
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package io.vertx.rxcore.java.eventbus;

import io.vertx.rxcore.java.impl.HandlerSubscription;
import io.vertx.rxcore.java.impl.MemoizeHandler;
import io.vertx.rxcore.java.impl.SingleSubscriptionHandler;
import org.vertx.java.core.AsyncResult;
import org.vertx.java.core.Handler;
Expand Down Expand Up @@ -34,17 +35,17 @@ public class RxEventBus {
// Customer handlers

/** Standard SendHandler */
protected static class SendHandler<R> extends SingleSubscriptionHandler<RxMessage<R>,Message<R>> {
protected class SendHandler<R> extends MemoizeHandler<RxMessage<R>,Message<R>> {
@Override public void handle(Message m) {
fireResult(new RxMessage(m));
complete(new RxMessageImpl(m));
}
}

/** Async SendHandler */
protected static class AsyncSendHandler<R> extends SingleSubscriptionHandler<RxMessage<R>, AsyncResult<Message<R>>> {
protected class AsyncSendHandler<R> extends SingleSubscriptionHandler<RxMessage<R>, AsyncResult<Message<R>>> {
@Override public void handle(AsyncResult<Message<R>> r) {
if (r.succeeded()) {
fireResult(new RxMessage(r.result()));
fireResult(new RxMessageImpl(r.result()));
}
else {
fireError(r.cause());
Expand All @@ -53,7 +54,7 @@ protected static class AsyncSendHandler<R> extends SingleSubscriptionHandler<RxM
}

/** Async HandlerSubscription */
protected static class AsyncSendSubscription<R> extends HandlerSubscription<AsyncResult<Message<R>>,RxMessage<R>> {
protected class AsyncSendSubscription<R> extends HandlerSubscription<AsyncResult<Message<R>>,RxMessage<R>> {

/** Create new AsyncSendSubscription */
public AsyncSendSubscription(Subscriber<RxMessage<R>> s) {
Expand All @@ -63,7 +64,7 @@ public AsyncSendSubscription(Subscriber<RxMessage<R>> s) {
/** Handle event */
public void handle(AsyncResult<Message<R>> evt) {
if (evt.succeeded()) {
fireComplete(new RxMessage(evt.result()));
fireComplete(new RxMessageImpl(evt.result()));
}
else {
fireError(evt.cause());
Expand All @@ -72,19 +73,48 @@ public void handle(AsyncResult<Message<R>> evt) {
}

/** Receive handler */
protected static class ReceiveHandler<R> extends SingleSubscriptionHandler<RxMessage<R>,Message> {
protected class ReceiveHandler<R> extends SingleSubscriptionHandler<RxMessage<R>,Message> {
@Override public void handle(Message m) {
fireNext(new RxMessage(m));
fireNext(new RxMessageImpl(m));
}
}


/** RxMessage implementation with inherited timeouts */
protected class RxMessageImpl<R> extends RxMessage<R>
{
/** Create new RxMessageImpl */
public RxMessageImpl(Message<R> coreMessage) {
super(coreMessage);
}

/** Observe a reply */
public <R,T> Observable<RxMessage<T>> observeReply(final R msg)
{
return Observable.create(new AsyncSendHandler<T>() {
@Override public void execute() {
coreMessage.replyWithTimeout(msg,defaultTimeout,this);
}
});
}

/** Observe a reply with timeout */
public <R,T> Observable<RxMessage<T>> observeReplyWithTimeout(final R msg, final long timeout) {
return Observable.create(new AsyncSendHandler<T>() {
@Override public void execute() {
coreMessage.replyWithTimeout(msg,timeout,this);
}
});
}

}

// Instance variables

/** Core bus */
private final EventBus eventBus;

/** Default timeout */
private final int defaultTimeout;
protected final int defaultTimeout;

// Public

Expand All @@ -103,7 +133,7 @@ public RxEventBus(EventBus eventBus, int defaultTimeout) {
public <S,R> Observable<RxMessage<R>> send(final String address, final S msg) {
SendHandler<R> h=new SendHandler<R>();
this.eventBus.send(address,msg,(Handler)h);
return Observable.create(h);
return Observable.create(h.subscribe);
}

/** Send a message with timeout */
Expand Down Expand Up @@ -131,7 +161,7 @@ public <S,R> Observable<RxMessage<R>> observeSendWithTimeout(final String addres
/** Send message for each subscription */
public void call(Subscriber<? super RxMessage<R>> subscriber) {
AsyncSendSubscription hs=new AsyncSendSubscription(subscriber);
eventBus.sendWithTimeout(address, (Object)msg, timeout, (Handler)hs);
eventBus.sendWithTimeout(address, (Object)msg, timeout, hs);
subscriber.add(hs);
}
});
Expand Down
22 changes: 5 additions & 17 deletions src/main/java/io/vertx/rxcore/java/eventbus/RxMessage.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package io.vertx.rxcore.java.eventbus;

import org.vertx.java.core.Handler;
import org.vertx.java.core.eventbus.Message;
import rx.Observable;

Expand All @@ -20,10 +21,10 @@
*
* @author <a href="http://tfox.org">Tim Fox</a>
*/
public class RxMessage<T> {
public abstract class RxMessage<T> {

/** Core Message */
private final Message<T> coreMessage;
protected final Message<T> coreMessage;

/** Wrap Message with RxMessage */
RxMessage(Message<T> coreMessage) {
Expand All @@ -49,7 +50,6 @@ public String replyAddress() {
return coreMessage.replyAddress();
}


/**
* @return The underlying core message
*/
Expand All @@ -68,20 +68,8 @@ public <R> void reply(final R msg) {
}

/** Observe a reply */
public <R,T> Observable<RxMessage<T>> observeReply(final R msg) {
return Observable.create(new RxEventBus.SendHandler<T>() {
@Override public void execute() {
coreMessage.reply(msg,this);
}
});
}
public abstract <R,T> Observable<RxMessage<T>> observeReply(final R msg);

/** Observe a reply with timeout */
public <R,T> Observable<RxMessage<T>> observerReplyWithTimeout(final R msg, final long timeout) {
return Observable.create(new RxEventBus.AsyncSendHandler<T>() {
@Override public void execute() {
coreMessage.replyWithTimeout(msg,timeout,this);
}
});
}
public abstract <R,T> Observable<RxMessage<T>> observeReplyWithTimeout(final R msg, final long timeout);
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
* @author <a href="http://tfox.org">Tim Fox</a>
*/

import java.util.concurrent.TimeoutException;
import java.util.concurrent.atomic.AtomicInteger;

import io.vertx.rxcore.RxSupport;
import io.vertx.rxcore.java.eventbus.RxEventBus;
import io.vertx.rxcore.java.eventbus.RxMessage;
import io.vertx.rxcore.java.eventbus.RxStream;
Expand All @@ -29,17 +29,15 @@
import org.vertx.java.core.Handler;
import org.vertx.java.core.buffer.Buffer;
import org.vertx.java.core.eventbus.Message;
import org.vertx.java.core.eventbus.ReplyException;
import org.vertx.java.core.json.JsonArray;
import org.vertx.testtools.TestVerticle;
import rx.Observable;
import rx.Subscriber;
import rx.Subscription;
import rx.functions.*;
import rx.subscriptions.Subscriptions;

import static io.vertx.rxcore.test.integration.java.RxAssert.assertCountThenComplete;
import static io.vertx.rxcore.test.integration.java.RxAssert.assertMessageThenComplete;
import static io.vertx.rxcore.test.integration.java.RxAssert.assertSingle;
import static io.vertx.rxcore.test.integration.java.RxAssert.assertError;
import static org.vertx.testtools.VertxAssert.assertEquals;
import static org.vertx.testtools.VertxAssert.testComplete;

Expand All @@ -64,6 +62,31 @@ public void call(RxMessage<String> message) {
});
}

@Test
public void testDeferredSubscribe() {
RxEventBus rxEventBus = new RxEventBus(vertx.eventBus());
rxEventBus.<String>registerHandler("foo").subscribe(new Action1<RxMessage<String>>() {
@Override
public void call(RxMessage<String> message) {
message.reply("pong!");
}
});
// Message is sent on send
final Observable<RxMessage<String>> obs = rxEventBus.send("foo", "ping!");
// Defer the subscribe
vertx.setTimer(100,new Handler<Long>() {
public void handle(Long ev) {
obs.subscribe(new Action1<RxMessage<String>>() {
@Override
public void call(RxMessage<String> message) {
assertEquals("pong!", message.body());
testComplete();
}
});
}
});
}

@Test
// Send some messages in series - i.e. wait for result of previous one before sending next one
// PMCD: Added check to enforce 1-at-a-time
Expand Down Expand Up @@ -232,6 +255,21 @@ public Observable<RxMessage<String>> call(RxMessage<String> stringRxMessage) {
assertMessageThenComplete(obsSend2,"goodday2");
}

@Test
public void testTimeout() {
final RxEventBus rx=new RxEventBus(vertx.eventBus());

// Register handler that timesout
rx.<String>registerHandler("thewall").subscribe(new Action1<RxMessage<String>>() {
public void call(RxMessage<String> req) {
// No-one listens
assertError(req.observeReplyWithTimeout("pong",200),ReplyException.class);
}
});

rx.send("thewall","ping");
}

@Test
public void testRetry() {

Expand Down Expand Up @@ -359,5 +397,4 @@ public Buffer call(JsonArray data) {

assertCountThenComplete(regulator.stream(res,out),401);
}

}

0 comments on commit 0fc2dfc

Please sign in to comment.