diff --git a/api/pom.xml b/api/pom.xml index f22ffef8..4cdd7364 100644 --- a/api/pom.xml +++ b/api/pom.xml @@ -22,6 +22,7 @@ src/main/java/ca/bc/gov/educ/api/trax/scheduler/**, src/main/java/ca/bc/gov/educ/api/trax/exception/**, src/main/java/ca/bc/gov/educ/api/trax/model/**, + src/main/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorService, src/main/java/ca/bc/gov/educ/api/trax/util/**, src/main/java/ca/bc/gov/educ/api/trax/repository/** diff --git a/api/src/main/java/ca/bc/gov/educ/api/trax/choreographer/ChoreographEventHandler.java b/api/src/main/java/ca/bc/gov/educ/api/trax/choreographer/ChoreographEventHandler.java index b06a9f0b..246bfa36 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/trax/choreographer/ChoreographEventHandler.java +++ b/api/src/main/java/ca/bc/gov/educ/api/trax/choreographer/ChoreographEventHandler.java @@ -10,6 +10,7 @@ import org.springframework.lang.NonNull; import org.springframework.stereotype.Component; +import java.time.Duration; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -25,19 +26,20 @@ @Component @Slf4j public class ChoreographEventHandler { - private final Executor singleTaskExecutor = new EnhancedQueueExecutor.Builder() - .setThreadFactory(new ThreadFactoryBuilder().setNameFormat("task-executor-%d").build()) - .setCorePoolSize(1).setMaximumPoolSize(1).build(); + private final Executor eventExecutor; private final Map eventServiceMap; public ChoreographEventHandler(final List eventServices) { this.eventServiceMap = new HashMap<>(); + this.eventExecutor = new EnhancedQueueExecutor.Builder() + .setThreadFactory(new ThreadFactoryBuilder().setNameFormat("event-executor-%d").build()) + .setCorePoolSize(10).setMaximumPoolSize(20).setKeepAliveTime(Duration.ofSeconds(60)).build(); eventServices.forEach(eventService -> this.eventServiceMap.put(eventService.getEventType(), eventService)); } public void handleEvent(@NonNull final Event event) { //only one thread will process all the request. since RDB won't handle concurrent requests. - this.singleTaskExecutor.execute(() -> { + this.eventExecutor.execute(() -> { try { switch (event.getEventType()) { case "GRAD_STUDENT_GRADUATED": diff --git a/api/src/main/java/ca/bc/gov/educ/api/trax/messaging/jetstream/Subscriber.java b/api/src/main/java/ca/bc/gov/educ/api/trax/messaging/jetstream/Subscriber.java index 0d61d97c..a8fa6e2a 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/trax/messaging/jetstream/Subscriber.java +++ b/api/src/main/java/ca/bc/gov/educ/api/trax/messaging/jetstream/Subscriber.java @@ -38,9 +38,7 @@ @Slf4j public class Subscriber { - private final Executor subscriberExecutor = new EnhancedQueueExecutor.Builder() - .setThreadFactory(new ThreadFactoryBuilder().setNameFormat("jet-stream-subscriber-%d").build()) - .setCorePoolSize(2).setMaximumPoolSize(2).setKeepAliveTime(Duration.ofMillis(1000)).build(); + private final Executor subscriberExecutor; private final EventHandlerDelegatorService eventHandlerDelegatorService; private final Map> streamTopicsMap = new HashMap<>(); // one stream can have multiple topics. private final Connection natsConnection; @@ -57,6 +55,9 @@ public Subscriber(final Connection natsConnection, final EventHandlerDelegatorSe this.eventHandlerDelegatorService = eventHandlerDelegatorService; this.natsConnection = natsConnection; this.constants = constants; + this.subscriberExecutor = new EnhancedQueueExecutor.Builder() + .setThreadFactory(new ThreadFactoryBuilder().setNameFormat("jet-stream-subscriber-%d").build()) + .setCorePoolSize(10).setMaximumPoolSize(10).setKeepAliveTime(Duration.ofSeconds(60)).build(); this.initializeStreamTopicMap(); } diff --git a/api/src/main/java/ca/bc/gov/educ/api/trax/scheduler/JetStreamEventScheduler.java b/api/src/main/java/ca/bc/gov/educ/api/trax/scheduler/JetStreamEventScheduler.java index b727e41d..89d7f90e 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/trax/scheduler/JetStreamEventScheduler.java +++ b/api/src/main/java/ca/bc/gov/educ/api/trax/scheduler/JetStreamEventScheduler.java @@ -13,6 +13,8 @@ import org.springframework.scheduling.annotation.Scheduled; import org.springframework.stereotype.Component; +import java.time.LocalDateTime; + import static ca.bc.gov.educ.api.trax.constant.EventStatus.DB_COMMITTED; @Component @@ -54,12 +56,13 @@ public JetStreamEventScheduler(final EventRepository eventRepository, @Scheduled(cron = "${cron.scheduled.process.events.grad-to-trax.run}") @SchedulerLock(name = "PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.grad-to-trax.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.grad-to-trax.lockAtMostFor}") public void findAndProcessEvents() { - log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getGradToTraxCronRun(), constants.getGradToTraxLockAtMostFor()); LockAssert.assertLocked(); + log.debug("PROCESS_CHOREOGRAPHED_EVENTS_FROM_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getGradToTraxCronRun(), constants.getGradToTraxLockAtMostFor()); final var results = this.eventRepository.findAllByEventStatusOrderByCreateDate(DB_COMMITTED.toString()); if (!results.isEmpty()) { + var filteredList = results.stream().filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5))).toList(); int cnt = 0; - for (Event e : results) { + for (Event e : filteredList) { if (cnt++ >= constants.getGradToTraxProcessingThreshold()) { log.info(" ==> Reached the processing threshold of {}", constants.getGradToTraxProcessingThreshold()); break; @@ -77,12 +80,13 @@ public void findAndProcessEvents() { @Scheduled(cron = "${cron.scheduled.process.events.trax-to-grad.run}") @SchedulerLock(name = "PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM", lockAtLeastFor = "${cron.scheduled.process.events.trax-to-grad.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.events.trax-to-grad.lockAtMostFor}") public void findAndPublishGradStatusEventsToJetStream() { - log.debug("PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor()); LockAssert.assertLocked(); + log.debug("PUBLISH_TRAX_UPDATED_EVENTS_TO_JET_STREAM: started - cron {}, lockAtMostFor {}", constants.getTraxToGradCronRun(), constants.getTraxToGradLockAtMostFor()); final var results = this.traxUpdatedPubEventRepository.findByEventStatusOrderByCreateDate(DB_COMMITTED.toString()); if (!results.isEmpty()) { + var filteredList = results.stream().filter(el -> el.getUpdateDate().isBefore(LocalDateTime.now().minusMinutes(5))).toList(); int cnt = 0; - for (TraxUpdatedPubEvent el : results) { + for (TraxUpdatedPubEvent el : filteredList) { if (cnt++ >= constants.getTraxToGradProcessingThreshold()) { log.info(" ==> Reached the processing threshold of {}", constants.getTraxToGradProcessingThreshold()); break; diff --git a/api/src/main/java/ca/bc/gov/educ/api/trax/scheduler/TraxUpdateTriggeredRecordScheduler.java b/api/src/main/java/ca/bc/gov/educ/api/trax/scheduler/TraxUpdateTriggeredRecordScheduler.java index 6d87b0e3..39d05b6c 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/trax/scheduler/TraxUpdateTriggeredRecordScheduler.java +++ b/api/src/main/java/ca/bc/gov/educ/api/trax/scheduler/TraxUpdateTriggeredRecordScheduler.java @@ -25,8 +25,8 @@ public TraxUpdateTriggeredRecordScheduler(final TraxUpdateService traxUpdateServ @Scheduled(cron = "${cron.scheduled.process.trigger-jobs.read-trax-update.run}") // every 5 minute @SchedulerLock(name = "PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS", lockAtLeastFor = "${cron.scheduled.process.trigger-jobs.read-trax-update.lockAtLeastFor}", lockAtMostFor = "${cron.scheduled.process.trigger-jobs.read-trax-update.lockAtMostFor}") public void scheduledRunForTraxUpdates() { - log.debug("PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS: started - cron {}, lockAtMostFor {}", constants.getTraxTriggersCronRun(), constants.getTraxTriggersLockAtMostFor()); LockAssert.assertLocked(); + log.debug("PROCESS_TRAX_UPDATE_IN_GRAD_RECORDS: started - cron {}, lockAtMostFor {}", constants.getTraxTriggersCronRun(), constants.getTraxTriggersLockAtMostFor()); final var results = traxUpdateService.getOutstandingList(); if (!results.isEmpty()) { int cnt = 0; diff --git a/api/src/main/java/ca/bc/gov/educ/api/trax/service/ChoreographedEventPersistenceService.java b/api/src/main/java/ca/bc/gov/educ/api/trax/service/ChoreographedEventPersistenceService.java index 46e6f2db..533bd86d 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/trax/service/ChoreographedEventPersistenceService.java +++ b/api/src/main/java/ca/bc/gov/educ/api/trax/service/ChoreographedEventPersistenceService.java @@ -53,7 +53,7 @@ public Event persistEventToDB(final ChoreographedEvent choreographedEvent) throw .createDate(LocalDateTime.now()) .updateDate(LocalDateTime.now()) .build(); - return eventRepository.save(event); + return this.eventRepository.save(event); } /** diff --git a/api/src/main/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorService.java b/api/src/main/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorService.java index 62c12ac8..a6c217e0 100644 --- a/api/src/main/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorService.java +++ b/api/src/main/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorService.java @@ -45,16 +45,12 @@ public void handleChoreographyEvent(@NonNull final ChoreographedEvent choreograp try { if (message.getSubject().equalsIgnoreCase(TRAX_UPDATE_EVENT_TOPIC.toString())) { this.choreographedEventPersistenceService.updateEventStatus(choreographedEvent); - if (message.isJetStream()) { - message.ack(); - log.warn("acknowledged to Jet Stream for TRAX UPDATE EVENT sent..."); - } + message.ack(); + log.warn("acknowledged to Jet Stream for TRAX UPDATE EVENT sent..."); } else { final var persistedEvent = this.choreographedEventPersistenceService.persistEventToDB(choreographedEvent); - if (message.isJetStream()) { - message.ack(); // acknowledge to Jet Stream that api got the message and it is now in DB. - log.warn("acknowledged to Jet Stream for GRAD STATUS EVENT received..."); - } + message.ack(); // acknowledge to Jet Stream that api got the message and it is now in DB. + log.warn("acknowledged to Jet Stream for GRAD STATUS EVENT received..."); this.choreographer.handleEvent(persistedEvent); } } catch (final BusinessException businessException) { diff --git a/api/src/test/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorServiceTest.java b/api/src/test/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorServiceTest.java deleted file mode 100644 index f03395da..00000000 --- a/api/src/test/java/ca/bc/gov/educ/api/trax/service/EventHandlerDelegatorServiceTest.java +++ /dev/null @@ -1,107 +0,0 @@ -package ca.bc.gov.educ.api.trax.service; - -import ca.bc.gov.educ.api.trax.choreographer.ChoreographEventHandler; -import ca.bc.gov.educ.api.trax.constant.EventOutcome; -import ca.bc.gov.educ.api.trax.constant.EventType; -import ca.bc.gov.educ.api.trax.constant.Topics; -import ca.bc.gov.educ.api.trax.exception.BusinessException; -import ca.bc.gov.educ.api.trax.messaging.NatsConnection; -import ca.bc.gov.educ.api.trax.messaging.jetstream.Publisher; -import ca.bc.gov.educ.api.trax.messaging.jetstream.Subscriber; -import ca.bc.gov.educ.api.trax.model.dto.ChoreographedEvent; -import ca.bc.gov.educ.api.trax.model.entity.Event; -import ca.bc.gov.educ.api.trax.model.entity.TraxUpdatedPubEvent; -import io.nats.client.Message; -import io.nats.client.impl.NatsMessage; -import org.junit.Test; -import org.junit.runner.RunWith; -import org.mockito.Mockito; -import org.springframework.beans.factory.annotation.Autowired; -import org.springframework.boot.test.context.SpringBootTest; -import org.springframework.boot.test.mock.mockito.MockBean; -import org.springframework.test.context.ActiveProfiles; -import org.springframework.test.context.junit4.SpringRunner; - -import java.io.IOException; -import java.util.UUID; - -import static ca.bc.gov.educ.api.trax.constant.EventStatus.DB_COMMITTED; -import static org.assertj.core.api.Assertions.assertThatNoException; - -@RunWith(SpringRunner.class) -@SpringBootTest -@ActiveProfiles("test") -public class EventHandlerDelegatorServiceTest { - - @Autowired - EventHandlerDelegatorService eventHandlerDelegatorService; - - @MockBean - ChoreographedEventPersistenceService choreographedEventPersistenceService; - - @MockBean - ChoreographEventHandler choreographer; - - // NATS - @MockBean - private NatsConnection natsConnection; - - @MockBean - private Publisher publisher; - - @MockBean - private Subscriber subscriber; - - @Test - public void testProcessEvent_givenEventAndMessage() throws IOException, BusinessException { - UUID eventId = UUID.randomUUID(); - - ChoreographedEvent choreographedEvent = new ChoreographedEvent(); - choreographedEvent.setEventID(eventId); - choreographedEvent.setEventType(EventType.GRAD_STUDENT_UPDATED); - - Event savedEvent = new Event(); - savedEvent.setEventType(EventType.GRAD_STUDENT_UPDATED.toString()); - savedEvent.setEventStatus(DB_COMMITTED.toString()); - savedEvent.setEventId(eventId); - savedEvent.setEventOutcome(EventOutcome.GRAD_STATUS_UPDATED.toString()); - savedEvent.setReplicationEventId(UUID.randomUUID()); - - Mockito.when(choreographedEventPersistenceService.persistEventToDB(choreographedEvent)).thenReturn(savedEvent); - - Message reply = NatsMessage.builder() - .subject(Topics.GRAD_STATUS_EVENT_TOPIC.toString()) - .data(savedEvent.getEventPayloadBytes()) - .build(); - - this.eventHandlerDelegatorService.handleChoreographyEvent(choreographedEvent, reply); - assertThatNoException(); - } - - @Test - public void testProcessEvent_givenTraxUpdatedEventAndMessage() throws IOException { - UUID eventId = UUID.randomUUID(); - - ChoreographedEvent choreographedEvent = new ChoreographedEvent(); - choreographedEvent.setEventID(eventId); - choreographedEvent.setEventType(EventType.UPD_GRAD); - - TraxUpdatedPubEvent savedEvent = new TraxUpdatedPubEvent(); - savedEvent.setEventType(EventType.UPD_GRAD.toString()); - savedEvent.setEventStatus(DB_COMMITTED.toString()); - savedEvent.setEventId(eventId); - savedEvent.setEventOutcome(EventOutcome.TRAX_STUDENT_MASTER_UPDATED.toString()); - - Mockito.doNothing().when(choreographedEventPersistenceService).updateEventStatus(choreographedEvent); - - Message reply = NatsMessage.builder() - .subject(Topics.TRAX_UPDATE_EVENT_TOPIC.toString()) - .data(savedEvent.getEventPayloadBytes()) - .build(); - - - this.eventHandlerDelegatorService.handleChoreographyEvent(choreographedEvent, reply); - assertThatNoException(); - } - -}