diff --git a/docker-compose-mysql-binlog.yml b/docker-compose-mysql-binlog.yml index 6989642..fe5876a 100755 --- a/docker-compose-mysql-binlog.yml +++ b/docker-compose-mysql-binlog.yml @@ -45,10 +45,6 @@ services: environment: EVENTUATELOCAL_KAFKA_BOOTSTRAP_SERVERS: kafka:29092 EVENTUATE_HTTP_PROXY_ZOOKEEPER_CONNECTION_STRING: zookeeper:2181 - SPRING_DATASOURCE_URL: jdbc:mysql://mysql/eventuate - SPRING_DATASOURCE_USERNAME: mysqluser - SPRING_DATASOURCE_PASSWORD: mysqlpw - SPRING_DATASOURCE_DRIVER_CLASS_NAME: com.mysql.jdbc.Driver EVENTUATE_LOCAL_KAFKA_CONSUMER_PROPERTIES_SESSION_TIMEOUT_MS: 6000 proxy-follower: diff --git a/eventuate-tram-http-spring-consumer/build.gradle b/eventuate-tram-http-spring-consumer/build.gradle deleted file mode 100644 index 6bb5821..0000000 --- a/eventuate-tram-http-spring-consumer/build.gradle +++ /dev/null @@ -1,13 +0,0 @@ -apply plugin: "io.spring.dependency-management" - -dependencies { - compile (project(":eventuate-tram-consumer-http-common")) { - exclude group: "io.micronaut" - } - - compile "io.eventuate.tram.core:eventuate-tram-spring-consumer-jdbc:$eventuateTramVersion" - - compile "org.springframework.boot:spring-boot-starter-web:$springBootVersion" - compile "org.springframework.boot:spring-boot-starter-aop:$springBootVersion" - compile "org.springframework.boot:spring-boot-starter-jdbc:$springBootVersion" -} diff --git a/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandler.java b/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandler.java deleted file mode 100644 index 2865bf1..0000000 --- a/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandler.java +++ /dev/null @@ -1,8 +0,0 @@ -package io.eventuate.tram.http.spring.consumer.duplicatedetection; - -import java.lang.annotation.Retention; -import java.lang.annotation.RetentionPolicy; - -@Retention(RetentionPolicy.RUNTIME) -public @interface IdempotentHandler { -} diff --git a/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandlerAspect.java b/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandlerAspect.java deleted file mode 100644 index 534a621..0000000 --- a/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandlerAspect.java +++ /dev/null @@ -1,44 +0,0 @@ -package io.eventuate.tram.http.spring.consumer.duplicatedetection; - -import io.eventuate.tram.consumer.common.DuplicateMessageDetector; -import io.eventuate.tram.consumer.http.common.EventuateHttpHeaders; -import org.aspectj.lang.ProceedingJoinPoint; -import org.aspectj.lang.annotation.Around; -import org.aspectj.lang.annotation.Aspect; -import org.springframework.transaction.support.TransactionTemplate; -import org.springframework.web.context.request.RequestContextHolder; -import org.springframework.web.context.request.ServletRequestAttributes; - -import javax.servlet.http.HttpServletRequest; - -@Aspect -public class IdempotentHandlerAspect { - private TransactionTemplate transactionTemplate; - private DuplicateMessageDetector duplicateMessageDetector; - - public IdempotentHandlerAspect(TransactionTemplate transactionTemplate, DuplicateMessageDetector duplicateMessageDetector) { - this.transactionTemplate = transactionTemplate; - this.duplicateMessageDetector = duplicateMessageDetector; - } - - @Around("@annotation(io.eventuate.tram.http.spring.consumer.duplicatedetection.IdempotentHandler)") - public Object check(ProceedingJoinPoint proceedingJoinPoint) { - return transactionTemplate.execute(status -> { - try { - HttpServletRequest request = - ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest(); - - String subscriberId = request.getHeader(EventuateHttpHeaders.SUBSCRIBER_ID); - String messageId = request.getHeader(EventuateHttpHeaders.MESSAGE_ID); - - if (messageId == null || !duplicateMessageDetector.isDuplicate(subscriberId, messageId)) { - return proceedingJoinPoint.proceed(); - } - - return null; - } catch (Throwable t) { - throw new RuntimeException(t); - } - }); - } -} diff --git a/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandlerConfiguration.java b/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandlerConfiguration.java deleted file mode 100644 index 529b5e9..0000000 --- a/eventuate-tram-http-spring-consumer/src/main/java/io/eventuate/tram/http/spring/consumer/duplicatedetection/IdempotentHandlerConfiguration.java +++ /dev/null @@ -1,15 +0,0 @@ -package io.eventuate.tram.http.spring.consumer.duplicatedetection; - -import io.eventuate.tram.consumer.common.DuplicateMessageDetector; -import org.springframework.context.annotation.Bean; -import org.springframework.context.annotation.Configuration; -import org.springframework.transaction.support.TransactionTemplate; - -@Configuration -public class IdempotentHandlerConfiguration { - @Bean - public IdempotentHandlerAspect duplicatePublishingAspect(TransactionTemplate transactionTemplate, - DuplicateMessageDetector duplicateMessageDetector) { - return new IdempotentHandlerAspect(transactionTemplate, duplicateMessageDetector); - } -} diff --git a/eventuate-tram-messaging-proxy-service/build.gradle b/eventuate-tram-messaging-proxy-service/build.gradle index ff8331a..3019c87 100644 --- a/eventuate-tram-messaging-proxy-service/build.gradle +++ b/eventuate-tram-messaging-proxy-service/build.gradle @@ -6,8 +6,6 @@ dependencies { exclude group: "io.micronaut" } - - compile "io.eventuate.tram.core:eventuate-tram-spring-consumer-jdbc:$eventuateTramVersion" compile "io.eventuate.tram.core:eventuate-tram-spring-consumer-kafka:$eventuateTramVersion" compile "io.eventuate.tram.core:eventuate-tram-events:$eventuateTramVersion" compile "io.eventuate.tram.core:eventuate-tram-commands:$eventuateTramVersion" @@ -19,8 +17,6 @@ dependencies { compile('org.apache.curator:curator-recipes:4.2.0') - testCompile project(":eventuate-tram-http-spring-consumer") - testCompile "junit:junit:4.12" testCompile "org.springframework.boot:spring-boot-starter-test" testCompile "io.eventuate.util:eventuate-util-test:$eventuateUtilVersion" diff --git a/eventuate-tram-messaging-proxy-service/src/main/java/io/eventuate/tram/messaging/proxy/service/ProxyConfiguration.java b/eventuate-tram-messaging-proxy-service/src/main/java/io/eventuate/tram/messaging/proxy/service/ProxyConfiguration.java index e6e1e61..dcc7fb7 100644 --- a/eventuate-tram-messaging-proxy-service/src/main/java/io/eventuate/tram/messaging/proxy/service/ProxyConfiguration.java +++ b/eventuate-tram-messaging-proxy-service/src/main/java/io/eventuate/tram/messaging/proxy/service/ProxyConfiguration.java @@ -1,10 +1,9 @@ package io.eventuate.tram.messaging.proxy.service; -import io.eventuate.common.spring.jdbc.EventuateCommonJdbcOperationsConfiguration; import io.eventuate.tram.consumer.common.MessageConsumerImplementation; import io.eventuate.tram.messaging.common.ChannelMapping; import io.eventuate.tram.messaging.common.DefaultChannelMapping; -import io.eventuate.tram.spring.consumer.jdbc.TramConsumerJdbcAutoConfiguration; +import io.eventuate.tram.spring.consumer.common.TramNoopDuplicateMessageDetectorConfiguration; import io.eventuate.tram.spring.consumer.kafka.EventuateTramKafkaMessageConsumerConfiguration; import org.apache.curator.RetryPolicy; import org.apache.curator.framework.CuratorFramework; @@ -19,9 +18,8 @@ import org.springframework.web.client.RestTemplate; @Configuration -@Import({TramConsumerJdbcAutoConfiguration.class, - EventuateTramKafkaMessageConsumerConfiguration.class, - EventuateCommonJdbcOperationsConfiguration.class}) +@Import({EventuateTramKafkaMessageConsumerConfiguration.class, + TramNoopDuplicateMessageDetectorConfiguration.class}) public class ProxyConfiguration { @Bean diff --git a/eventuate-tram-messaging-proxy-service/src/main/resources/application.properties b/eventuate-tram-messaging-proxy-service/src/main/resources/application.properties index c5f4521..790f959 100644 --- a/eventuate-tram-messaging-proxy-service/src/main/resources/application.properties +++ b/eventuate-tram-messaging-proxy-service/src/main/resources/application.properties @@ -1,8 +1,4 @@ # logging.level.root=DEBUG eventuate.http.proxy.zookeeper.connection.string=${DOCKER_HOST_IP:localhost}:2181 -spring.datasource.url=jdbc:mysql://${DOCKER_HOST_IP:localhost}/eventuate -spring.datasource.username=mysqluser -spring.datasource.password=mysqlpw -spring.datasource.driver-class-name=com.mysql.cj.jdbc.Driver eventuatelocal.kafka.bootstrap.servers=${DOCKER_HOST_IP:localhost}:9092 \ No newline at end of file diff --git a/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/EventuateHttpMessageSubscriberTest.java b/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/EventuateHttpMessageSubscriberTest.java index 832bcdb..9c8096b 100644 --- a/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/EventuateHttpMessageSubscriberTest.java +++ b/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/EventuateHttpMessageSubscriberTest.java @@ -4,7 +4,6 @@ import io.eventuate.tram.commands.producer.CommandProducer; import io.eventuate.tram.consumer.http.common.HttpMessage; import io.eventuate.tram.events.publisher.DomainEventPublisher; -import io.eventuate.tram.http.spring.consumer.duplicatedetection.IdempotentHandlerConfiguration; import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.messaging.producer.MessageBuilder; import io.eventuate.tram.messaging.producer.common.MessageProducerImplementation; @@ -13,7 +12,6 @@ import io.eventuate.tram.spring.commands.producer.TramCommandProducerConfiguration; import io.eventuate.tram.spring.events.publisher.TramEventsPublisherConfiguration; import io.eventuate.tram.spring.messaging.producer.jdbc.TramMessageProducerJdbcConfiguration; -import io.eventuate.util.test.async.Eventually; import org.junit.Before; import org.junit.Test; import org.junit.runner.RunWith; @@ -25,7 +23,6 @@ import org.springframework.context.annotation.Configuration; import org.springframework.context.annotation.EnableAspectJAutoProxy; import org.springframework.context.annotation.Import; -import org.springframework.jdbc.core.JdbcTemplate; import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.Collections; @@ -45,8 +42,7 @@ public class EventuateHttpMessageSubscriberTest { EventuateMessageSubscriberConfiguration.class, TramMessageProducerJdbcConfiguration.class, TramEventsPublisherConfiguration.class, - TramCommandProducerConfiguration.class, - IdempotentHandlerConfiguration.class}) + TramCommandProducerConfiguration.class}) @EnableAutoConfiguration @ComponentScan @EnableAspectJAutoProxy @@ -69,9 +65,6 @@ public SubscriptionController subscriptionController() { @Autowired private CommandProducer commandProducer; - @Autowired - private JdbcTemplate jdbcTemplate; - private String messageChannel = "test-channel"; private String commandChannel = "test-command-channel"; private String commandReplyChannel = "test-reply-channel"; @@ -93,14 +86,12 @@ public void init() { public void testMessageHandled() throws InterruptedException { sendMessage(); assertMessage(); - assertMessageCheckedForDuplicate(messageId); } @Test public void testEventHandled() throws InterruptedException { sendEvent(); assertEvent(); - assertMessageCheckedForDuplicate(messageId); } @Test @@ -108,7 +99,6 @@ public void testCommandHandled() throws InterruptedException { sendCommand(); assertCommand(); assertReply(); - assertMessageCheckedForDuplicate(messageId); } private void sendCommand() { @@ -180,11 +170,6 @@ private void assertMessage() throws InterruptedException { assertEquals(messageChannel, message.getHeaders().get(Message.DESTINATION)); } - private void assertMessageCheckedForDuplicate(String id) { - Eventually.eventually(() -> - assertEquals(1, jdbcTemplate.queryForList("select * from eventuate.received_messages where message_id = ?", id).size())); - } - private String generateId() { return UUID.randomUUID().toString(); } diff --git a/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/TestController.java b/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/TestController.java index fd53d26..a904170 100644 --- a/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/TestController.java +++ b/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/TestController.java @@ -8,7 +8,6 @@ import io.eventuate.tram.messaging.common.Message; import io.eventuate.tram.messaging.producer.MessageBuilder; import io.eventuate.tram.messaging.producer.MessageProducer; -import io.eventuate.tram.http.spring.consumer.duplicatedetection.IdempotentHandler; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.web.bind.annotation.PathVariable; import org.springframework.web.bind.annotation.PostMapping; @@ -48,19 +47,16 @@ public BlockingQueue getReceivedReplies() { } @PostMapping(path = "/messages/s3") - @IdempotentHandler public void handleMessages(@RequestBody HttpMessage httpMessage) { receivedMessages.add(httpMessage); } @PostMapping(path = "/events/s4/TestAggregate/{aggregateId}/io.eventuate.tram.messaging.proxy.consumer.TestEvent/{eventId}") - @IdempotentHandler public void handleEvent(@PathVariable String aggregateId, @PathVariable String eventId, @RequestBody TestEvent testEvent) { receivedEvents.add(new TestEventInfo(testEvent, aggregateId, eventId)); } @PostMapping(path = "/commands/d1/{messageId}/io.eventuate.tram.messaging.proxy.consumer.TestCommand/{replyChannel}/test-resource/{value}") - @IdempotentHandler public void handleCommand(@PathVariable String messageId, @PathVariable String replyChannel, @PathVariable String value, diff --git a/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/customereventexample/CustomerEventSubscriberTest.java b/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/customereventexample/CustomerEventSubscriberTest.java index f99bbef..59e6c46 100644 --- a/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/customereventexample/CustomerEventSubscriberTest.java +++ b/eventuate-tram-messaging-proxy-service/src/test/java/io/eventuate/tram/messaging/proxy/consumer/customereventexample/CustomerEventSubscriberTest.java @@ -17,6 +17,7 @@ import org.springframework.test.context.junit4.SpringJUnit4ClassRunner; import java.util.UUID; +import java.util.concurrent.TimeUnit; import static java.util.Collections.singletonList; import static org.junit.Assert.assertEquals; @@ -67,7 +68,7 @@ private void sendEvents() { } private void assertEvents() { - Eventually.eventually(() -> { + Eventually.eventually(60, 500, TimeUnit.MILLISECONDS, () -> { assertEquals(3, customerEventController.getReceivedEvents().size()); CustomerCreditReservedEvent customerCreditReservedEvent = diff --git a/settings.gradle b/settings.gradle index 806a769..db39c07 100644 --- a/settings.gradle +++ b/settings.gradle @@ -1,4 +1,3 @@ include 'eventuate-tram-consumer-http-common' include 'eventuate-tram-messaging-proxy-service' include 'eventuate-tram-consumer-http-micronaut' -include 'eventuate-tram-http-spring-consumer'