Skip to content

Commit

Permalink
Merge branch 'master' of https://github.com/jovezhao/nest
Browse files Browse the repository at this point in the history
  • Loading branch information
jovezhao committed Feb 9, 2022
2 parents 3e47136 + 8f44b01 commit d738acf
Show file tree
Hide file tree
Showing 3 changed files with 31 additions and 3 deletions.
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.zhaofujun.nest.context.appservice;


import com.zhaofujun.nest.context.event.delay.DelayMessageBacklog;
import com.zhaofujun.nest.context.event.message.MessageInfo;
import com.zhaofujun.nest.context.model.BaseEntity;
import com.zhaofujun.nest.standard.Identifier;
Expand Down Expand Up @@ -46,5 +47,9 @@ public <T extends BaseEntity> T getEntity(Class tClass, Identifier identifier) {
public void addMessageBacklog(String messageGroup, MessageInfo messageInfo) {
this.unitOfWork.addMessageBacklog(messageGroup, messageInfo);
}

public void addDelayMessageBacklog(DelayMessageBacklog delayMessageBacklog) {
this.unitOfWork.addDelayMessageBacklog(delayMessageBacklog);
}
}

Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,9 @@
import com.zhaofujun.nest.context.event.EventConfiguration;
import com.zhaofujun.nest.context.event.channel.MessageChannelProviderFactory;
import com.zhaofujun.nest.context.event.channel.distribute.DistributeMessageChannel;
import com.zhaofujun.nest.context.event.delay.DelayMessageBacklog;
import com.zhaofujun.nest.context.event.delay.DelayMessageStore;
import com.zhaofujun.nest.context.event.delay.DelayMessageStoreFactory;
import com.zhaofujun.nest.context.event.message.MessageBacklog;
import com.zhaofujun.nest.context.event.message.MessageConverterFactory;
import com.zhaofujun.nest.context.event.message.MessageInfo;
Expand Down Expand Up @@ -100,7 +103,7 @@ private void commitEntity() {
clearCache(repositoryMap, cacheClient);
throw ex;
}

}

private void clearCache(Map<Repository, Map<EntityOperateEnum, List<BaseEntity>>> repositoryMap, CacheClient cacheClient) {
Expand All @@ -115,6 +118,9 @@ private void clearCache(Map<Repository, Map<EntityOperateEnum, List<BaseEntity>>

private Set<MessageBacklog> messageBacklogs = new HashSet<>();


private Set<DelayMessageBacklog> delayMessageBacklogs = new HashSet<>();

public void addMessageBacklog(String eventCode, MessageInfo messageInfo) {
String messageInfoString = MessageConverterFactory.create().messageToString(messageInfo);
messageBacklogs.add(new MessageBacklog(eventCode, messageInfoString,messageInfo.getData().getClass().getName(),messageInfo.getMessageId()));
Expand All @@ -137,6 +143,15 @@ private void commitMessage() {
messageResendStore.add(p);
}
});
delayMessageBacklogs.forEach(p -> {
try {
DelayMessageStore delayMessageStore = DelayMessageStoreFactory.create();
delayMessageStore.add(p);
} catch (Exception ex) {
//考虑一般情况下,DelayMessageStore和MessageResendStore存储中间件相同,所以加到待发送区域也应该会失败,所以暂不做重试逻辑
logger.warn("提交延时消息失败,失败原因", ex);
}
});
}

void commit() {
Expand Down Expand Up @@ -174,6 +189,7 @@ void commit() {
throw new SystemException("提交工作单元时失败", ex);
} finally {
messageBacklogs.clear();
delayMessageBacklogs.clear();
}

NestApplication.current().committed(serviceContext);
Expand Down Expand Up @@ -245,6 +261,10 @@ private void entityNotify(EntityOperateEnum operateEnum, List<BaseEntity> entiti


}

public void addDelayMessageBacklog(DelayMessageBacklog delayMessageBacklog) {
delayMessageBacklogs.add(delayMessageBacklog);
}
}


Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.zhaofujun.nest.context.event;

import com.zhaofujun.nest.NestApplication;
import com.zhaofujun.nest.context.appservice.ServiceContextManager;
import com.zhaofujun.nest.context.event.channel.MessageChannelProvider;
import com.zhaofujun.nest.context.event.channel.MessageChannelProviderFactory;
import com.zhaofujun.nest.context.event.channel.MessageConsumer;
Expand All @@ -22,6 +23,7 @@
public class DefaultEventBus implements EventBus {


@Override
public void publish(EventData eventData, String eventSource, long delaySecond) {


Expand All @@ -38,8 +40,8 @@ public void publish(EventData eventData, String eventSource, long delaySecond) {
MessageProducer messageProducer = messageChannel.getMessageProducer();
messageProducer.send(eventData.getEventCode(), messageInfo);
} else {
DelayMessageStore delayMessageStore = DelayMessageStoreFactory.create();
delayMessageStore.add(new DelayMessageBacklog(new MessageBacklog(eventData.getEventCode(), messageString,eventData.getClass().getName(),messageInfo.getMessageId()), LocalDateTime.now().plusSeconds(delaySecond)));
//延时事件先提交到当前上下文,待事务提交后再save到DelayMessageStore,保证事务提交成功再发送消息
ServiceContextManager.get().addDelayMessageBacklog(new DelayMessageBacklog(new MessageBacklog(eventData.getEventCode(), messageString, eventData.getClass().getName(), messageInfo.getMessageId()), LocalDateTime.now().plusSeconds(delaySecond)));
}
}

Expand All @@ -48,6 +50,7 @@ private EventConfiguration getEventConfigurationByEventCode(String eventCode) {
}


@Override
public void registerHandler(EventHandler eventHandler) {

EventConfiguration eventConfiguration = getEventConfigurationByEventCode(eventHandler.getEventCode());
Expand Down

0 comments on commit d738acf

Please sign in to comment.