diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQEventDataRepository.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQEventDataRepository.java index 9566f386..8edf2b67 100644 --- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQEventDataRepository.java +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQEventDataRepository.java @@ -20,8 +20,11 @@ import com.google.gson.Gson; import lombok.SneakyThrows; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.client.producer.DefaultMQProducer; import org.apache.rocketmq.common.message.Message; +import org.apache.rocketmq.common.message.MessageQueue; import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.dataobject.EventTopicDO; import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.mapper.EventTopicMapper; import org.apache.rocketmq.eventbridge.adapter.persistence.rpc.EventDataOnRocketMQConnectAPI; @@ -35,6 +38,10 @@ import org.springframework.cache.annotation.Cacheable; import org.springframework.stereotype.Repository; +import java.util.HashMap; +import java.util.List; +import java.util.Map; + @Repository @Slf4j public class RocketMQEventDataRepository implements EventDataRepository { @@ -103,4 +110,30 @@ public String getTopicName(String accountId, String eventBusName) { return topicName; } + /** + * Rocketmq消息消费算法数据准备 + * @param accountId + * @param eventBusName + * @return + * @throws MQClientException + */ + public Map> fetchTopicStats(String accountId, String eventBusName) { + Map> calcuteConsumerLag = new HashMap<>(); + String topicName = null; + EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName); + if (eventTopicDO != null) { + topicName = eventTopicDO.getName(); + } else { + topicName = eventDataOnRocketMQConnectAPI.buildTopicName(accountId, eventBusName); + List messageQueueList = null; + try { + messageQueueList = producer.fetchPublishMessageQueues(topicName); + } catch (MQClientException e) { + e.printStackTrace(); + } + calcuteConsumerLag = rocketMQMetaService.fetchPartitionInfo(topicName, messageQueueList); + } + return calcuteConsumerLag; + } + } diff --git a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java index bdd586ea..24b4aa47 100644 --- a/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java +++ b/adapter/persistence/src/main/java/org/apache/rocketmq/eventbridge/adapter/persistence/data/rocketmq/repository/RocketMQMetaService.java @@ -18,17 +18,28 @@ import com.google.common.collect.Maps; import java.util.Collection; +import java.util.HashMap; import java.util.HashSet; import java.util.List; +import java.util.Map; import java.util.Set; import lombok.extern.slf4j.Slf4j; +import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.client.exception.MQBrokerException; import org.apache.rocketmq.client.exception.MQClientException; + +import org.apache.rocketmq.common.message.MessageQueue; + import org.apache.rocketmq.eventbridge.exception.EventBridgeException; import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode; import org.apache.rocketmq.remoting.exception.RemotingConnectException; +import org.apache.rocketmq.remoting.exception.RemotingException; import org.apache.rocketmq.remoting.exception.RemotingSendRequestException; import org.apache.rocketmq.remoting.exception.RemotingTimeoutException; +import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats; +import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper; +import org.apache.rocketmq.remoting.protocol.admin.TopicOffset; +import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable; import org.apache.rocketmq.remoting.protocol.body.ClusterInfo; import org.apache.rocketmq.remoting.protocol.route.BrokerData; import org.apache.rocketmq.tools.admin.DefaultMQAdminExt; @@ -89,4 +100,45 @@ private Set fetchMasterAndSlaveAddrByClusterName(String clusterName) } return brokerAddressSet; } + + /** + * 获取分区信息,计算consumer的消费速度 + * 未消费的消息量 ConsumerLag = MaxOffset - consumerOffset + * 正在消费的消息量 InflightMessageCount = PullOffset - ConsumerOffset + * 等待拉取的消息量 AvaliableMessageCount = MaxOffset- PullOffset + * 消费速度 = 正在消费的消息量 / 未消费的消息量 + 正在消费的消息量 + 等待拉取的消息量 + * 基于Guava计算限流 + * 消费延迟的情况 + * - 拉取延时:ConsumerOffset 在每次拉取新消息时更新,而 RocketMQ 是使用长轮询方式更新消息,每次长轮询的默认超时时间是 30s。 + * 也就是说如果没有足够数量的消息产生,ConsumerOffset 要 30s 才能更新一次 + * - 消费延时:消费者每次提交的 commitOffset 字段是当前还未消费的第一个消息的位点而不是最后一个消费成功的消息的位点。 + * 之前的消息未能结束消费的情况下后面已经消费完的消息位点就迟迟得不到更新 + * pullOffset的意义为 Broker 在回复消费者拉取消息的响应中有 nextBeginOffset 字段,即当前拉取到的消息的下一条消息的位点 + * 5.0之前没有pullOffset,只有在pullOffset与MaxOffset中取最大值 + * @param topic + * @param messageQueueList + * @return + */ + public Map> fetchPartitionInfo (String topic, List messageQueueList) { + // > + // queueId为consumerOffset + Map> offsetMap =new HashMap<>(); + try { + //获取每个分区的最大最小位移 + TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic); + //获取每个队列的 + ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats("consumerGroup"); + Map offsetWrapperTable = consumeStats.getOffsetTable(); + Map offsetTable = topicStatsTable.getOffsetTable(); + for (MessageQueue messageQueue : messageQueueList) { + TopicOffset topicOffset = offsetTable.get(messageQueue); + OffsetWrapper offsetWrapper = offsetWrapperTable.get(messageQueue); + offsetMap.put(messageQueue.getQueueId(), Pair.of(offsetWrapper.getBrokerOffset(), topicOffset.getMaxOffset())); + } + + } catch (RemotingException | InterruptedException | MQBrokerException| MQClientException e) { + e.printStackTrace(); + } + return offsetMap; + } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java index 7a34aa33..90af35d4 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventBusListener.java @@ -24,6 +24,7 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import org.springframework.beans.factory.annotation.Autowired; import java.util.List; @@ -40,6 +41,7 @@ public class EventBusListener extends ServiceThread { private final EventSubscriber eventSubscriber; + @Autowired public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber) { this.circulatorContext = circulatorContext; this.eventSubscriber = eventSubscriber; diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java index 43e6b29b..df0369f8 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/EventTargetPusher.java @@ -24,12 +24,18 @@ import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext; import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread; import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine; +import org.apache.rocketmq.eventbridge.exception.EventBridgeException; import org.slf4j.Logger; import org.slf4j.LoggerFactory; +import java.util.ArrayList; +import java.util.List; import java.util.Map; import java.util.Objects; +import java.util.concurrent.CompletableFuture; import java.util.concurrent.ExecutorService; +import java.util.concurrent.atomic.AtomicReference; +import java.util.stream.Collectors; /** * event target push to sink task @@ -42,14 +48,28 @@ public class EventTargetPusher extends ServiceThread{ private final CirculatorContext circulatorContext; + private final Object resultLock; + private final List changeRecordBuffer; + private final int maxBufferSize; + + private List> completableFutures; + private AtomicReference executionException = new AtomicReference<>(); + + public EventTargetPusher(CirculatorContext circulatorContext) { + resultLock = new Object(); this.circulatorContext = circulatorContext; + changeRecordBuffer = new ArrayList<>(); + this.maxBufferSize = 100; + this.completableFutures = Lists.newArrayList(); } @Override public void run() { while (!stopped) { ConnectRecord targetRecord = circulatorContext.takeTargetMap(); + + processRecord(targetRecord); if (Objects.isNull(targetRecord)) { logger.info("current target pusher is empty"); this.waitForRunning(1000); @@ -59,16 +79,37 @@ public void run() { logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecord)); } - ExecutorService executorService = circulatorContext.getExecutorService(targetRecord.getExtensions().getString(RuntimerConfigDefine.TASK_CLASS)); - executorService.execute(() -> { - try { - String runnerName = targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME); - SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);; - sinkTask.put(Lists.newArrayList(targetRecord)); - }catch (Exception exception){ - logger.error(getServiceName() + " push target exception, record - " + targetRecord + " , stackTrace-", exception); - } + //以异步非阻塞的方式执行批量递送任务 + List result = retrieveChanges(); + List> completableFutures = Lists.newArrayList(); + result.forEach(connectRecord -> { + String sinkTaskClass = connectRecord.getExtensions().getString(RuntimerConfigDefine.TASK_CLASS); + //一个sinkTaskClass用一个线程池, 太耗费资源了, 用CompletedFuture异步非阻塞的方式投递 + //CompletableFuture中以async结尾的方法将会在一个新的线程中执行组合操作 + CompletableFuture taskExecutionResultFuture = CompletableFuture.completedFuture(sinkTaskClass) + .thenCompose(sink -> CompletableFuture.supplyAsync( () -> { + String runnerName = targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME); + SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);; + sinkTask.put(Lists.newArrayList(targetRecord)); + return sinkTaskClass; + })) + .whenComplete((unused, throwable) -> { + if (throwable != null) { + executionException.compareAndSet( + null, + new EventBridgeException("Error sink Task.", throwable)); + } + }); + completableFutures.add(taskExecutionResultFuture); }); + try{ + CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[result.size()])).get(); + // 是否需要在刷盘的时候通知到一个单独的BlockingQueue, BlockingQueue是否会膨胀 + // circulatorContext.offerTargetTaskQueue(result); + logger.info("offer target task queues succeed, transforms - {}", JSON.toJSONString(result)); + }catch (Exception exception){ + logger.error("transfer event record failed, stackTrace-", exception); + } } } @@ -77,5 +118,61 @@ public String getServiceName() { return EventTargetPusher.class.getSimpleName(); } + private void processRecord(ConnectRecord change) { + synchronized (resultLock) { + // wait if the buffer is full + if (changeRecordBuffer.size() >= maxBufferSize) { + try { + resultLock.wait(); + } catch (InterruptedException e) { + // ignore + } + } else { + changeRecordBuffer.add(change); + } + } + } + + public List retrieveChanges() { + synchronized (resultLock) { + if (isStopped() && executionException.get() == null) { + if (changeRecordBuffer.isEmpty()) { + return new ArrayList<>(); + } else { + final List change = new ArrayList<>(changeRecordBuffer); + changeRecordBuffer.clear(); + resultLock.notify(); + return change; + } + } + else if (!isStopped() && !changeRecordBuffer.isEmpty()) { + final List change = new ArrayList<>(changeRecordBuffer); + changeRecordBuffer.clear(); + return change; + } + // no results can be returned anymore + else { + return handleMissingResult(); + } + } + } + + //异步的方式提交到sink端存在延时,需要将taskExecutionResultFuture的信息做实时判断 + private List handleMissingResult() { + + // check if the monitoring thread is still there + // we need to wait until we know what is going on + completableFutures = completableFutures.stream().filter( x -> x.isDone()).collect(Collectors.toList()); + if (completableFutures.size() < maxBufferSize) { + return new ArrayList<>(); + } + + if (executionException.get() != null) { + throw executionException.get(); + } + + // we assume that a task finished + return changeRecordBuffer; + } } diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java index 10d25ac8..4cf844f7 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/boot/listener/RocketMQEventSubscriber.java @@ -34,6 +34,9 @@ import org.apache.rocketmq.acl.common.AclClientRPCHook; import org.apache.rocketmq.acl.common.SessionCredentials; import org.apache.rocketmq.client.AccessChannel; +import org.apache.rocketmq.client.MQAdmin; +import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer; +import org.apache.rocketmq.client.exception.MQClientException; import org.apache.rocketmq.common.UtilAll; import org.apache.rocketmq.common.message.MessageExt; import org.apache.rocketmq.common.utils.NetworkUtil; diff --git a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java index 63f1e9d8..745beb91 100644 --- a/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java +++ b/adapter/runtimer/src/main/java/org/apache/rocketmq/eventbridge/adapter/runtimer/service/TargetRunnerConfigOnFileObserver.java @@ -82,6 +82,7 @@ public Set getLatestTargetRunnerConfig() { } } + //添加上游投递的监听器 public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherConfigOnFileService) { log.info("Watching task file changing:{}", pathName); int index = pathName.lastIndexOf("/"); @@ -99,6 +100,7 @@ public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherCo if (watchKey != null && !watchKey.pollEvents() .isEmpty()) { log.info("Watched the file changed events."); + //判断target-runner.json是否有存在更改 pusherConfigOnFileService.diff(); } watchKey.reset(); @@ -139,7 +141,7 @@ private Map toMap(Set targetRunn } private String getConfigFilePath() { - return this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME).getPath(); + return "F:\\gitrepo\\rocketmq-eventbridge\\adapter\\runtimer\\src\\main\\resources\\target-runner.json"; } } \ No newline at end of file diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/bus/EventBusService.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/bus/EventBusService.java index eadea4ff..258a3ff4 100644 --- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/bus/EventBusService.java +++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/model/bus/EventBusService.java @@ -16,7 +16,11 @@ */ package org.apache.rocketmq.eventbridge.domain.model.bus; +import java.util.HashMap; import java.util.List; +import java.util.Map; + +import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode; import org.apache.rocketmq.eventbridge.domain.model.AbstractResourceService; import org.apache.rocketmq.eventbridge.domain.model.PaginationResult; @@ -100,4 +104,14 @@ public void checkExist(String accountId, String eventBusName) { } } + public Map> fetchConsumeProcessInfo(String accountId, String eventBusName) { + Map> consumerProcessInfo = new HashMap<>(); + if (eventBusRepository.getEventBus(accountId, eventBusName) == null) { + return consumerProcessInfo; + } else { + consumerProcessInfo = eventDataRepository.fetchTopicStats(accountId, eventBusName); + } + return consumerProcessInfo; + } + } \ No newline at end of file diff --git a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventDataRepository.java b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventDataRepository.java index 0d775f61..19846055 100644 --- a/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventDataRepository.java +++ b/domain/src/main/java/org/apache/rocketmq/eventbridge/domain/repository/EventDataRepository.java @@ -17,8 +17,12 @@ package org.apache.rocketmq.eventbridge.domain.repository; +import org.apache.commons.lang3.tuple.Pair; import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback; import org.apache.rocketmq.eventbridge.event.EventBridgeEvent; +import org.apache.rocketmq.common.*; + +import java.util.Map; public interface EventDataRepository { @@ -52,4 +56,14 @@ public interface EventDataRepository { */ String getEventBusPersistentContext(String accountId, String eventBusName); + + /** + * + * @param accountId + * @param eventBusName + * @return + * @throws MQClientException + */ + public Map> fetchTopicStats(String accountId, String eventBusName); + }