Skip to content

Commit

Permalink
Merge pull request #12 from dartartem/master
Browse files Browse the repository at this point in the history
Extended logging.
  • Loading branch information
cer authored Apr 14, 2021
2 parents 75e0673 + 0827dda commit e106c08
Show file tree
Hide file tree
Showing 6 changed files with 99 additions and 1 deletion.
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.eventuate.tram.messaging.proxy.consumer;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class CommandSubscriptionData {
private String channel;
private String resource;
Expand Down Expand Up @@ -37,4 +39,9 @@ public String getCommands() {
public void setCommands(String commands) {
this.commands = commands;
}

@Override
public String toString() {
return ReflectionToStringBuilder.toString(this);
}
}
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.eventuate.tram.messaging.proxy.consumer;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class EventSubscriptionData {
private String aggregate;
private String events;
Expand Down Expand Up @@ -28,4 +30,9 @@ public String getBaseUrl() {
public void setBaseUrl(String baseUrl) {
this.baseUrl = baseUrl;
}

@Override
public String toString() {
return ReflectionToStringBuilder.toString(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@

import io.eventuate.tram.messaging.proxy.service.ProxyConfiguration;
import io.eventuate.tram.messaging.proxy.service.SubscriptionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
Expand All @@ -12,9 +14,18 @@
@Import(ProxyConfiguration.class)
public class EventuateMessageSubscriberConfiguration {

private Logger logger = LoggerFactory.getLogger(getClass());

@Bean
public EventuateTramHttpMessageSubscriptionInitializer eventuateTramRestMessageSubscriptionInitializer(SubscriptionService subscriptionService,
EventuateSubscriptionProperties eventuateSubscriptionProperties) {
return new EventuateTramHttpMessageSubscriptionInitializer(eventuateSubscriptionProperties, subscriptionService);
logger.info("Creating EventuateTramHttpMessageSubscriptionInitializer bean");

EventuateTramHttpMessageSubscriptionInitializer eventuateTramHttpMessageSubscriptionInitializer =
new EventuateTramHttpMessageSubscriptionInitializer(eventuateSubscriptionProperties, subscriptionService);

logger.info("Created EventuateTramHttpMessageSubscriptionInitializer bean");

return eventuateTramHttpMessageSubscriptionInitializer;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package io.eventuate.tram.messaging.proxy.consumer;

import io.eventuate.tram.messaging.proxy.service.SubscriptionService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.PostConstruct;
import java.util.Arrays;
Expand All @@ -9,6 +11,8 @@
import java.util.stream.Collectors;

public class EventuateTramHttpMessageSubscriptionInitializer {
private Logger logger = LoggerFactory.getLogger(getClass());

private EventuateSubscriptionProperties eventuateSubscriptionProperties;
private SubscriptionService subscriptionService;

Expand All @@ -21,56 +25,92 @@ public EventuateTramHttpMessageSubscriptionInitializer(EventuateSubscriptionProp

@PostConstruct
public void subscribe() {
logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribe() started");

subscribeToMessages();
subscribeToEvents();
subscribeToCommands();
subscribeToReplies();

logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribe() finished");
}

private void subscribeToMessages() {
logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribeToMessages() started");

eventuateSubscriptionProperties.getMessage().keySet().forEach(subscriberId -> {
MessageSubscriptionData messageSubscriptionData = eventuateSubscriptionProperties.getMessage().get(subscriberId);

logger.info("Subscribing to message {} with subscriber id {}", messageSubscriptionData, subscriberId);

subscriptionService.subscribeToMessage(subscriberId,
stringToSet(messageSubscriptionData.getChannels()),
messageSubscriptionData.getBaseUrl(),
subscriberId);

logger.info("Subscribed to message {} with id {}", messageSubscriptionData, subscriberId);
});

logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribeToMessages() finished");
}

private void subscribeToEvents() {
logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribeToEvents() started");

eventuateSubscriptionProperties.getEvent().keySet().forEach(subscriberId -> {
EventSubscriptionData eventSubscriptionData = eventuateSubscriptionProperties.getEvent().get(subscriberId);

logger.info("Subscribing to event {} with subscriber id {}", eventSubscriptionData, subscriberId);

subscriptionService.subscribeToEvent(subscriberId,
eventSubscriptionData.getAggregate(),
stringToSet(eventSubscriptionData.getEvents()),
eventSubscriptionData.getBaseUrl());

logger.info("Subscribed to event {} with subscriber id {}", eventSubscriptionData, subscriberId);
});

logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribeToEvents() finished");
}

private void subscribeToCommands() {
logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribeToCommands() started");

eventuateSubscriptionProperties.getCommand().keySet().forEach(dispatcherId -> {
CommandSubscriptionData commandSubscriptionData = eventuateSubscriptionProperties.getCommand().get(dispatcherId);

logger.info("Subscribing to command {} with dispatcher id {}", commandSubscriptionData, dispatcherId);

subscriptionService.subscribeToCommand(dispatcherId,
commandSubscriptionData.getChannel(),
Optional.ofNullable(commandSubscriptionData.getResource()),
stringToSet(commandSubscriptionData.getCommands()),
commandSubscriptionData.getBaseUrl());

logger.info("Subscribed to command {} with dispatcher id {}", commandSubscriptionData, dispatcherId);
});

logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribeToCommands() finished");
}

private void subscribeToReplies() {
logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribeToReplies() started");

eventuateSubscriptionProperties.getReply().keySet().forEach(subscriberId -> {
ReplySubscriptionData replySubscriptionData = eventuateSubscriptionProperties.getReply().get(subscriberId);

logger.info("Subscribing to reply {} with subscriber id {}", replySubscriptionData, subscriberId);

subscriptionService.subscribeToReply(subscriberId,
replySubscriptionData.getReplyChannel(),
Optional.ofNullable(replySubscriptionData.getResource()),
stringToSet(replySubscriptionData.getCommands()),
replySubscriptionData.getBaseUrl());

logger.info("Subscribed to reply {} with subscriber id {}", replySubscriptionData, subscriberId);
});

logger.info("EventuateTramHttpMessageSubscriptionInitializer.subscribeToReplies() finished");
}

private Set<String> stringToSet(String value) {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,5 +1,7 @@
package io.eventuate.tram.messaging.proxy.consumer;

import org.apache.commons.lang.builder.ReflectionToStringBuilder;

public class MessageSubscriptionData {
private String channels;
private String baseUrl;
Expand All @@ -19,4 +21,9 @@ public String getBaseUrl() {
public void setBaseUrl(String baseUrl) {
this.baseUrl = baseUrl;
}

@Override
public String toString() {
return ReflectionToStringBuilder.toString(this);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
import io.eventuate.tram.messaging.common.Message;
import io.eventuate.tram.messaging.consumer.MessageSubscription;
import io.eventuate.tram.messaging.producer.MessageBuilder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.http.HttpEntity;
import org.springframework.http.HttpHeaders;
import org.springframework.http.MediaType;
Expand All @@ -28,6 +30,8 @@
import java.util.stream.Collectors;

public class SubscriptionService {
private Logger logger = LoggerFactory.getLogger(getClass());

private SubscriptionPersistenceService subscriptionPersistenceService;
private SubscriptionRequestManager subscriptionRequestManager;
private RestTemplate restTemplate;
Expand Down Expand Up @@ -79,6 +83,8 @@ private void publishReply(Message message,
String subscriberId,
Set<String> commands,
Optional<String> resource) {
logger.debug("publishing reply {}", message);

String command = message.getRequiredHeader(CommandMessageHeaders.inReply(CommandMessageHeaders.COMMAND_TYPE));

if (!commands.contains(command)) {
Expand All @@ -98,9 +104,13 @@ private void publishReply(Message message,
message.getRequiredHeader(ReplyMessageHeaders.REPLY_OUTCOME),
message.getHeader(CommandMessageHeaders.inReply(CommandMessageHeaders.RESOURCE)).orElse(""));

logger.debug("sending reply {} to location {}", message, location);

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
restTemplate.postForLocation(location, new HttpEntity<>(message.getPayload(), headers));

logger.debug("sent reply {} to location {}", message, location);
}

public String subscribeToCommand(String commandDispatcherId,
Expand Down Expand Up @@ -155,13 +165,17 @@ private void publishMessage(Message message,
String subscriptionInstanceId) {
String location = callbackUrl + "/" + subscriptionInstanceId;

logger.debug("sending message {} to location {}", message, location);

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
addCommonHeaders(headers, subscriberId, message.getId());

HttpMessage httpMessage = new HttpMessage(message.getId(), message.getHeaders(), message.getPayload());

restTemplate.postForLocation(location, new HttpEntity<>(httpMessage, headers));

logger.debug("sent message {} to location {}", message, location);
}

private void publishEvent(Message message,
Expand All @@ -170,6 +184,8 @@ private void publishEvent(Message message,
String callbackUrl,
String subscriberId) {

logger.debug("publishing event {}", message);

String event = message.getRequiredHeader(EventMessageHeaders.EVENT_TYPE);

if (!events.contains(event)) {
Expand All @@ -184,10 +200,14 @@ private void publishEvent(Message message,
event,
message.getRequiredHeader(Message.ID));

logger.debug("sending event {} to location {}", message, location);

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
addCommonHeaders(headers, subscriberId, message.getId());
restTemplate.postForLocation(location, new HttpEntity<>(message.getPayload(), headers));

logger.debug("sent event {} to location {}", message, location);
}

private void publishCommand(Message message,
Expand All @@ -196,6 +216,8 @@ private void publishCommand(Message message,
Set<String> commands,
String callbackUrl) {

logger.debug("publishing command {}", message);

String command = message.getRequiredHeader(CommandMessageHeaders.COMMAND_TYPE);

if (!commands.contains(command)) {
Expand All @@ -217,12 +239,16 @@ private void publishCommand(Message message,
replyChannel,
resource.isPresent() ? message.getRequiredHeader(CommandMessageHeaders.RESOURCE) : "");

logger.debug("sending command {} to location {}", message, location);

HttpHeaders headers = new HttpHeaders();
headers.setContentType(MediaType.APPLICATION_JSON);
Map<String, String> correlationHeaders = correlationHeaders(message.getHeaders());
headers.add(EventuateHttpHeaders.COMMAND_REPLY_HEADERS, JSonMapper.toJson(correlationHeaders));
addCommonHeaders(headers, commandDispatcherId, message.getId());
restTemplate.postForLocation(location, new HttpEntity<>(message.getPayload(), headers));

logger.debug("sent command {} to location {}", message, location);
}

private boolean shouldPublishResource(Optional<String> resource, Optional<String> messageResource) {
Expand Down

0 comments on commit e106c08

Please sign in to comment.