diff --git a/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/adapters/AsyncToSyncAggregateEventsAdapter.java b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/adapters/AsyncToSyncAggregateEventsAdapter.java index 3e9a085..0991404 100644 --- a/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/adapters/AsyncToSyncAggregateEventsAdapter.java +++ b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/adapters/AsyncToSyncAggregateEventsAdapter.java @@ -4,6 +4,8 @@ import io.eventuate.SubscriberOptions; import io.eventuate.javaclient.commonimpl.AggregateEvents; import io.eventuate.javaclient.commonimpl.SerializedEvent; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.Set; @@ -11,6 +13,7 @@ import java.util.function.Function; public class AsyncToSyncAggregateEventsAdapter implements io.eventuate.javaclient.commonimpl.sync.AggregateEvents { + private Logger logger = LoggerFactory.getLogger(getClass()); private io.eventuate.javaclient.commonimpl.AggregateEvents target; private AsyncToSyncTimeoutOptions timeoutOptions = new AsyncToSyncTimeoutOptions(); @@ -21,9 +24,13 @@ public AsyncToSyncAggregateEventsAdapter(AggregateEvents target) { @Override public void subscribe(String subscriberId, Map> aggregatesAndEvents, SubscriberOptions subscriberOptions, Function> handler) { + logger.info("Subscribing: subscriberId = {}, aggregatesAndEvents = {}, options = {}", subscriberId, aggregatesAndEvents, subscriberOptions); try { target.subscribe(subscriberId, aggregatesAndEvents, subscriberOptions, handler).get(timeoutOptions.getTimeout(), timeoutOptions.getTimeUnit()); + logger.info("Subscribed: subscriberId = {}, aggregatesAndEvents = {}, options = {}", subscriberId, aggregatesAndEvents, subscriberOptions); } catch (Throwable e) { + logger.error("Subscription failed", e); + Throwable unwrapped = CompletableFutureUtil.unwrap(e); if (unwrapped instanceof RuntimeException) throw (RuntimeException)unwrapped; diff --git a/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/adapters/SyncToAsyncAggregateEventsAdapter.java b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/adapters/SyncToAsyncAggregateEventsAdapter.java index 32987c0..5685bd8 100644 --- a/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/adapters/SyncToAsyncAggregateEventsAdapter.java +++ b/eventuate-client-java-common-impl/src/main/java/io/eventuate/javaclient/commonimpl/adapters/SyncToAsyncAggregateEventsAdapter.java @@ -4,6 +4,8 @@ import io.eventuate.SubscriberOptions; import io.eventuate.javaclient.commonimpl.SerializedEvent; import io.eventuate.javaclient.commonimpl.sync.AggregateEvents; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.util.Map; import java.util.Set; @@ -11,6 +13,7 @@ import java.util.function.Function; public class SyncToAsyncAggregateEventsAdapter implements io.eventuate.javaclient.commonimpl.AggregateEvents { + private Logger logger = LoggerFactory.getLogger(getClass()); private io.eventuate.javaclient.commonimpl.sync.AggregateEvents target; @@ -21,9 +24,12 @@ public SyncToAsyncAggregateEventsAdapter(AggregateEvents target) { @Override public CompletableFuture subscribe(String subscriberId, Map> aggregatesAndEvents, SubscriberOptions subscriberOptions, Function> handler) { try { + logger.info("Subscribing: subscriberId = {}, aggregatesAndEvents = {}, options = {}", subscriberId, aggregatesAndEvents, subscriberOptions); target.subscribe(subscriberId, aggregatesAndEvents, subscriberOptions, handler); + logger.info("Subscribed: subscriberId = {}, aggregatesAndEvents = {}, options = {}", subscriberId, aggregatesAndEvents, subscriberOptions); return CompletableFuture.completedFuture(null); } catch (RuntimeException e) { + logger.error("Subscription failed", e); return CompletableFutureUtil.failedFuture(e); } } diff --git a/eventuate-client-java-event-dispatching/src/main/java/io/eventuate/javaclient/eventdispatcher/EventDispatcherInitializer.java b/eventuate-client-java-event-dispatching/src/main/java/io/eventuate/javaclient/eventdispatcher/EventDispatcherInitializer.java index e7586ca..6e0b089 100644 --- a/eventuate-client-java-event-dispatching/src/main/java/io/eventuate/javaclient/eventdispatcher/EventDispatcherInitializer.java +++ b/eventuate-client-java-event-dispatching/src/main/java/io/eventuate/javaclient/eventdispatcher/EventDispatcherInitializer.java @@ -8,6 +8,8 @@ import io.eventuate.javaclient.eventhandling.exceptionhandling.EventDeliveryExceptionHandlerManagerImpl; import io.eventuate.javaclient.eventhandling.exceptionhandling.EventDeliveryExceptionHandler; import org.apache.commons.lang.StringUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import org.springframework.core.annotation.AnnotationUtils; import org.springframework.util.ReflectionUtils; @@ -29,6 +31,7 @@ import static java.util.stream.Collectors.toList; public class EventDispatcherInitializer { + protected Logger logger = LoggerFactory.getLogger(getClass()); private EventHandlerProcessor[] processors; private EventuateAggregateStore aggregateStore; @@ -46,6 +49,7 @@ public EventDispatcherInitializer(EventHandlerProcessor[] processors, EventuateA public void registerEventHandler(Object eventHandlerBean, String beanName, Class beanClass) { + logger.info("registering event handler: bean: {}, name: {}, class", eventHandlerBean, beanName, beanClass); List fieldsAndMethods = Stream.concat(Arrays.stream(ReflectionUtils.getUniqueDeclaredMethods(beanClass)), Arrays.stream(beanClass.getDeclaredFields())) @@ -104,8 +108,11 @@ public void registerEventHandler(Object eventHandlerBean, String beanName, Class subscriberOptions, de -> swimlaneBasedDispatcher.dispatch(de, eventDispatcher::dispatch)).get(20, TimeUnit.SECONDS); subscriptionsRegistry.add(new RegisteredSubscription(subscriberId, aggregatesAndEvents, beanClass)); } catch (InterruptedException | TimeoutException | ExecutionException e) { + logger.error("registering event handler failed", e); throw new EventuateSubscriptionFailedException(subscriberId, e); } + + logger.info("registered event handler: bean: {}, name: {}, class", eventHandlerBean, beanName, beanClass); } private boolean isExceptionHandlerField(Field f) { diff --git a/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/EventuateSTOMPClient.java b/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/EventuateSTOMPClient.java index 6d016c5..578434e 100644 --- a/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/EventuateSTOMPClient.java +++ b/eventuate-client-java-http-stomp/src/main/java/io/eventuate/javaclient/stompclient/EventuateSTOMPClient.java @@ -50,10 +50,12 @@ public EventuateSTOMPClient(Vertx vertx, EventuateCredentials eventuateCredentia this.useSsl = uri.getScheme().startsWith("stomp+ssl"); this.customConnectHeaders = customConnectHeaders; if (logger.isInfoEnabled()) - logger.debug("STOMP connection: " + Arrays.asList(host, port, useSsl)); + logger.info("STOMP connection: " + Arrays.asList(host, port, useSsl)); } public void initialize() { + logger.info("Initializing stomp client"); + state.status = ConnectionStatus.CONNECTING; StompClientOptions options = new StompClientOptions(); @@ -67,6 +69,7 @@ public void initialize() { f.setAccessible(true); f.set(options, useSsl); } catch (NoSuchFieldException | IllegalAccessException e) { + logger.error("Initialization of stomp client failed", e); throw new RuntimeException(e); } @@ -83,7 +86,7 @@ public void initialize() { stompClient.connect(x -> { if (x.succeeded()) { - logger.debug("Connected!"); + logger.info("Connected!"); handleConnectSucceeded(x.result()); } else { logger.error("Connect attempt failed", x.cause()); @@ -108,7 +111,7 @@ private void handleConnectFailed() { public void handleClose(Void x) { if (state.status != ConnectionStatus.CLOSED) { - logger.debug("Reconnecting..."); + logger.info("Reconnecting..."); state.status = ConnectionStatus.CONNECTING; initialize(); } @@ -188,6 +191,8 @@ public boolean isConnected() throws ExecutionException, InterruptedException { } public CompletableFuture close() { + logger.info("closing eventuate stomp client"); + CompletableFuture outcome = new CompletableFuture<>(); context.runOnContext(x -> { switch (state.status) { @@ -207,6 +212,9 @@ public CompletableFuture close() { outcome.completeExceptionally(new UnsupportedOperationException("Do not know what to do with this state: " + state.status)); } }); + + logger.info("closed eventuate stomp client"); + return outcome; } @@ -291,7 +299,7 @@ public CompletableFuture subscribe(String subscriberId, Map headers = new HashMap<>(); headers.put(Frame.ID, sub.uniqueId); @@ -300,7 +308,7 @@ private void doSubscribe(Subscription sub) { state.connection.subscribe(destination, headers, frame -> frameHandler(frame, sub), rh -> { if (logger.isInfoEnabled()) - logger.debug("Subscribed: " + sub.subscriberId); + logger.info("Subscribed: " + sub.subscriberId); sub.noteSubscribed(); }); diff --git a/eventuate-client-java/src/main/java/io/eventuate/SubscriberOptions.java b/eventuate-client-java/src/main/java/io/eventuate/SubscriberOptions.java index 540e607..5671c99 100644 --- a/eventuate-client-java/src/main/java/io/eventuate/SubscriberOptions.java +++ b/eventuate-client-java/src/main/java/io/eventuate/SubscriberOptions.java @@ -1,5 +1,7 @@ package io.eventuate; +import org.apache.commons.lang.builder.ToStringBuilder; + public class SubscriberOptions { private SubscriberDurability durability; @@ -28,6 +30,11 @@ public SubscriberInitialPosition getReadFrom() { public boolean isProgressNotifications() { return progressNotifications; } + + @Override + public String toString() { + return ToStringBuilder.reflectionToString(this); + } }