[WIP] Flow Control Transformation for Backpressure Scenarios #1

Open · wants to merge 10 commits into base: runtimer
@@ -20,8 +20,11 @@
import com.google.gson.Gson;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.common.message.Message;
import org.apache.rocketmq.common.message.MessageQueue;
import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.dataobject.EventTopicDO;
import org.apache.rocketmq.eventbridge.adapter.persistence.data.mybatis.mapper.EventTopicMapper;
import org.apache.rocketmq.eventbridge.adapter.persistence.rpc.EventDataOnRocketMQConnectAPI;
@@ -35,6 +38,10 @@
import org.springframework.cache.annotation.Cacheable;
import org.springframework.stereotype.Repository;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

@Repository
@Slf4j
public class RocketMQEventDataRepository implements EventDataRepository {
@@ -103,4 +110,30 @@ public String getTopicName(String accountId, String eventBusName) {
return topicName;
}

    /**
     * Prepares the per-queue offset data used by the RocketMQ consumption flow-control algorithm.
     *
     * @param accountId    account that owns the event bus
     * @param eventBusName name of the event bus
     * @return a map keyed by queue id, each value holding (consume offset, max offset)
     */
    public Map<Integer, Pair<Long, Long>> fetchTopicStats(String accountId, String eventBusName) {
        Map<Integer, Pair<Long, Long>> consumerLagByQueue = new HashMap<>();
        String topicName;
        EventTopicDO eventTopicDO = eventTopicMapper.getTopic(accountId, eventBusName);
        if (eventTopicDO != null) {
            topicName = eventTopicDO.getName();
        } else {
            topicName = eventDataOnRocketMQConnectAPI.buildTopicName(accountId, eventBusName);
        }
        try {
            List<MessageQueue> messageQueueList = producer.fetchPublishMessageQueues(topicName);
            consumerLagByQueue = rocketMQMetaService.fetchPartitionInfo(topicName, messageQueueList);
        } catch (MQClientException e) {
            log.error("fetch publish message queues failed, topic - {}", topicName, e);
        }
        return consumerLagByQueue;
    }

}
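
Reviewer note: `fetchTopicStats` keys each pair by queue id, with the left value the consumer group's broker-side consume offset and the right value the queue's max offset. A minimal sketch of how a caller could turn that map into per-queue lag; the `ConsumerLagCalculator` class is illustrative, not part of this PR:

```java
import java.util.HashMap;
import java.util.Map;
import org.apache.commons.lang3.tuple.Pair;

public class ConsumerLagCalculator {

    /**
     * Converts the (consumeOffset, maxOffset) pairs returned by fetchTopicStats
     * into the number of unconsumed messages per queue: lag = maxOffset - consumeOffset.
     */
    public static Map<Integer, Long> lagPerQueue(Map<Integer, Pair<Long, Long>> stats) {
        Map<Integer, Long> lag = new HashMap<>();
        stats.forEach((queueId, offsets) ->
                lag.put(queueId, Math.max(0, offsets.getRight() - offsets.getLeft())));
        return lag;
    }
}
```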
@@ -18,17 +18,28 @@

import com.google.common.collect.Maps;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.client.exception.MQBrokerException;
import org.apache.rocketmq.client.exception.MQClientException;

import org.apache.rocketmq.common.message.MessageQueue;

import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
import org.apache.rocketmq.eventbridge.exception.code.DefaultErrorCode;
import org.apache.rocketmq.remoting.exception.RemotingConnectException;
import org.apache.rocketmq.remoting.exception.RemotingException;
import org.apache.rocketmq.remoting.exception.RemotingSendRequestException;
import org.apache.rocketmq.remoting.exception.RemotingTimeoutException;
import org.apache.rocketmq.remoting.protocol.admin.ConsumeStats;
import org.apache.rocketmq.remoting.protocol.admin.OffsetWrapper;
import org.apache.rocketmq.remoting.protocol.admin.TopicOffset;
import org.apache.rocketmq.remoting.protocol.admin.TopicStatsTable;
import org.apache.rocketmq.remoting.protocol.body.ClusterInfo;
import org.apache.rocketmq.remoting.protocol.route.BrokerData;
import org.apache.rocketmq.tools.admin.DefaultMQAdminExt;
@@ -89,4 +100,45 @@ private Set<String> fetchMasterAndSlaveAddrByClusterName(String clusterName)
}
return brokerAddressSet;
}

    /**
     * Fetches partition (queue) offset info used to estimate the consumer's progress.
     *
     * Unconsumed messages:      ConsumerLag           = MaxOffset  - ConsumerOffset
     * Messages being consumed:  InflightMessageCount  = PullOffset - ConsumerOffset
     * Messages waiting to pull: AvailableMessageCount = MaxOffset  - PullOffset
     * Consumption rate = InflightMessageCount / (ConsumerLag + InflightMessageCount + AvailableMessageCount)
     * The result feeds a Guava-based rate limiter.
     *
     * Why consumption appears delayed:
     * - Pull delay: ConsumerOffset is only updated when new messages are pulled, and RocketMQ uses long polling
     *   with a default timeout of 30s, so without enough new messages ConsumerOffset may only advance every 30s.
     * - Consume delay: the commitOffset a consumer reports is the offset of the first message not yet consumed,
     *   not that of the last successfully consumed message; while an earlier message is still being consumed,
     *   the offsets of later, already-consumed messages cannot advance.
     * PullOffset is the nextBeginOffset the broker returns in the pull response, i.e. the offset of the message
     * following the ones just pulled. Before RocketMQ 5.0 there is no PullOffset, so the larger of PullOffset
     * and MaxOffset is used instead.
     *
     * @param topic            topic whose queues are inspected
     * @param messageQueueList message queues of the topic
     * @return a map keyed by queue id, each value holding (consume offset, max offset)
     */
    public Map<Integer, Pair<Long, Long>> fetchPartitionInfo(String topic, List<MessageQueue> messageQueueList) {
        // key: queueId, value: (consume offset reported by the broker, max offset of the queue)
        Map<Integer, Pair<Long, Long>> offsetMap = new HashMap<>();
        try {
            // max/min offsets of every queue of the topic
            TopicStatsTable topicStatsTable = defaultMQAdminExt.examineTopicStats(topic);
            // consume offsets of every queue for the consumer group
            ConsumeStats consumeStats = defaultMQAdminExt.examineConsumeStats("consumerGroup");
            Map<MessageQueue, OffsetWrapper> offsetWrapperTable = consumeStats.getOffsetTable();
            Map<MessageQueue, TopicOffset> offsetTable = topicStatsTable.getOffsetTable();
            for (MessageQueue messageQueue : messageQueueList) {
                TopicOffset topicOffset = offsetTable.get(messageQueue);
                OffsetWrapper offsetWrapper = offsetWrapperTable.get(messageQueue);
                offsetMap.put(messageQueue.getQueueId(), Pair.of(offsetWrapper.getBrokerOffset(), topicOffset.getMaxOffset()));
            }
        } catch (RemotingException | InterruptedException | MQBrokerException | MQClientException e) {
            log.error("fetch partition info failed, topic - {}", topic, e);
        }
        return offsetMap;
    }
}
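
The Javadoc above says the progress ratio should feed a Guava-based limiter. Since `fetchPartitionInfo` only exposes the consume offset and the max offset (no PullOffset), a simplified sketch of that flow control might look like the following; the `FlowController` class and the base rate are assumptions, not code from this PR:

```java
import com.google.common.util.concurrent.RateLimiter;

public class FlowController {

    // Base permits per second when the consumer keeps up; an assumed tuning value.
    private static final double MAX_PERMITS_PER_SECOND = 1000.0;

    private final RateLimiter rateLimiter = RateLimiter.create(MAX_PERMITS_PER_SECOND);

    /**
     * Adjusts the send rate from the offsets of one queue:
     * consumerLag = maxOffset - consumerOffset; the larger the lag,
     * the smaller the share of the base rate that is allowed through.
     */
    public void adjust(long consumerOffset, long maxOffset) {
        long lag = Math.max(0, maxOffset - consumerOffset);
        double progress = maxOffset == 0 ? 1.0 : 1.0 - (double) lag / maxOffset;
        rateLimiter.setRate(Math.max(1.0, MAX_PERMITS_PER_SECOND * progress));
    }

    /** Blocks until one send permit is available. */
    public void acquire() {
        rateLimiter.acquire();
    }
}
```

Calling `adjust(...)` per queue before `acquire()` on the send path slows producers down as lag grows, which is the backpressure behaviour this PR targets.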
@@ -24,6 +24,7 @@
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;

import java.util.List;

@@ -40,6 +41,7 @@ public class EventBusListener extends ServiceThread {

private final EventSubscriber eventSubscriber;

@Autowired
public EventBusListener(CirculatorContext circulatorContext, EventSubscriber eventSubscriber) {
this.circulatorContext = circulatorContext;
this.eventSubscriber = eventSubscriber;
@@ -24,12 +24,18 @@
import org.apache.rocketmq.eventbridge.adapter.runtimer.boot.listener.CirculatorContext;
import org.apache.rocketmq.eventbridge.adapter.runtimer.common.ServiceThread;
import org.apache.rocketmq.eventbridge.adapter.runtimer.config.RuntimerConfigDefine;
import org.apache.rocketmq.eventbridge.exception.EventBridgeException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.atomic.AtomicReference;
import java.util.stream.Collectors;

/**
* event target push to sink task
@@ -42,14 +48,28 @@ public class EventTargetPusher extends ServiceThread{

private final CirculatorContext circulatorContext;

private final Object resultLock;
private final List<ConnectRecord> changeRecordBuffer;
private final int maxBufferSize;

private List<CompletableFuture<String>> completableFutures;
private AtomicReference<EventBridgeException> executionException = new AtomicReference<>();


public EventTargetPusher(CirculatorContext circulatorContext) {
resultLock = new Object();
this.circulatorContext = circulatorContext;
changeRecordBuffer = new ArrayList<>();
this.maxBufferSize = 100;
this.completableFutures = Lists.newArrayList();
}

@Override
public void run() {
while (!stopped) {
ConnectRecord targetRecord = circulatorContext.takeTargetMap();

            if (Objects.nonNull(targetRecord)) {
                processRecord(targetRecord);
            }
if (Objects.isNull(targetRecord)) {
logger.info("current target pusher is empty");
this.waitForRunning(1000);
@@ -59,16 +79,37 @@ public void run() {
logger.debug("start push content by pusher - {}", JSON.toJSONString(targetRecord));
}

ExecutorService executorService = circulatorContext.getExecutorService(targetRecord.getExtensions().getString(RuntimerConfigDefine.TASK_CLASS));
executorService.execute(() -> {
try {
String runnerName = targetRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);;
sinkTask.put(Lists.newArrayList(targetRecord));
}catch (Exception exception){
logger.error(getServiceName() + " push target exception, record - " + targetRecord + " , stackTrace-", exception);
}
            // Dispatch the buffered records as a batch, asynchronously and without blocking.
            List<ConnectRecord> result = retrieveChanges();
            completableFutures = Lists.newArrayList();
            result.forEach(connectRecord -> {
                String sinkTaskClass = connectRecord.getExtensions().getString(RuntimerConfigDefine.TASK_CLASS);
                // One thread pool per sinkTaskClass is too expensive; deliver via CompletableFuture in a
                // non-blocking way instead. CompletableFuture methods ending in "async" run the composed
                // stage on another thread.
                CompletableFuture<String> taskExecutionResultFuture = CompletableFuture.completedFuture(sinkTaskClass)
                    .thenCompose(sink -> CompletableFuture.supplyAsync(() -> {
                        String runnerName = connectRecord.getExtensions().getString(RuntimerConfigDefine.RUNNER_NAME);
                        SinkTask sinkTask = circulatorContext.getPusherTaskMap().get(runnerName);
                        sinkTask.put(Lists.newArrayList(connectRecord));
                        return sinkTaskClass;
                    }))
                    .whenComplete((unused, throwable) -> {
                        if (throwable != null) {
                            executionException.compareAndSet(
                                null,
                                new EventBridgeException("Error sink Task.", throwable));
                        }
                    });
                completableFutures.add(taskExecutionResultFuture);
            });
            try {
                CompletableFuture.allOf(completableFutures.toArray(new CompletableFuture[0])).get();
                // Should the flush instead notify a dedicated BlockingQueue, and could that queue grow without bound?
                // circulatorContext.offerTargetTaskQueue(result);
                logger.info("push target records succeed, records - {}", JSON.toJSONString(result));
            } catch (Exception exception) {
                logger.error("transfer event record failed, stackTrace-", exception);
            }
}
}

@@ -77,5 +118,61 @@ public String getServiceName() {
return EventTargetPusher.class.getSimpleName();
}

    private void processRecord(ConnectRecord change) {
        synchronized (resultLock) {
            // wait while the buffer is full, then buffer the record
            while (changeRecordBuffer.size() >= maxBufferSize) {
                try {
                    resultLock.wait();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                    return;
                }
            }
            changeRecordBuffer.add(change);
        }
    }

    public List<ConnectRecord> retrieveChanges() {
        synchronized (resultLock) {
            if (isStopped() && executionException.get() == null) {
                if (changeRecordBuffer.isEmpty()) {
                    return new ArrayList<>();
                } else {
                    final List<ConnectRecord> change = new ArrayList<>(changeRecordBuffer);
                    changeRecordBuffer.clear();
                    resultLock.notify();
                    return change;
                }
            } else if (!isStopped() && !changeRecordBuffer.isEmpty()) {
                final List<ConnectRecord> change = new ArrayList<>(changeRecordBuffer);
                changeRecordBuffer.clear();
                // wake a producer that may be blocked on a full buffer
                resultLock.notify();
                return change;
            } else {
                // no results can be returned anymore
                return handleMissingResult();
            }
        }
    }

    // Hand-off to the sink side is asynchronous and therefore delayed, so the state of every
    // taskExecutionResultFuture has to be checked before deciding what to return.
    private List<ConnectRecord> handleMissingResult() {

        // keep only the futures that have already completed;
        // we need to wait until we know what is going on
        completableFutures = completableFutures.stream().filter(CompletableFuture::isDone).collect(Collectors.toList());
        if (completableFutures.size() < maxBufferSize) {
            return new ArrayList<>();
        }

        if (executionException.get() != null) {
            throw executionException.get();
        }

        // we assume that a task finished
        return changeRecordBuffer;
    }

}
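
The new dispatch path above replaces one thread pool per `sinkTaskClass` with `CompletableFuture` composition and a bounded record buffer. A stripped-down sketch of the same batch-and-await pattern, with a plain `Consumer` standing in for `SinkTask`; the `AsyncBatchDispatcher` name is illustrative, not part of this PR:

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

public class AsyncBatchDispatcher<T> {

    /**
     * Dispatches every record asynchronously and waits for the whole batch,
     * mirroring the CompletableFuture.allOf(...) pattern used by EventTargetPusher.
     */
    public void dispatch(List<T> batch, Consumer<T> sink) {
        List<CompletableFuture<Void>> futures = new ArrayList<>();
        for (T record : batch) {
            // runAsync executes each delivery on the common ForkJoinPool
            futures.add(CompletableFuture.runAsync(() -> sink.accept(record)));
        }
        // Wait until every delivery in the batch has completed (or failed)
        CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
    }
}
```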
@@ -34,6 +34,9 @@
import org.apache.rocketmq.acl.common.AclClientRPCHook;
import org.apache.rocketmq.acl.common.SessionCredentials;
import org.apache.rocketmq.client.AccessChannel;
import org.apache.rocketmq.client.MQAdmin;
import org.apache.rocketmq.client.consumer.DefaultLitePullConsumer;
import org.apache.rocketmq.client.exception.MQClientException;
import org.apache.rocketmq.common.UtilAll;
import org.apache.rocketmq.common.message.MessageExt;
import org.apache.rocketmq.common.utils.NetworkUtil;
@@ -82,6 +82,7 @@ public Set<TargetRunnerConfig> getLatestTargetRunnerConfig() {
}
}

    // Add the listener for upstream delivery (config file changes)
public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
log.info("Watching task file changing:{}", pathName);
int index = pathName.lastIndexOf("/");
@@ -99,6 +100,7 @@ public void addListen(String pathName, TargetRunnerConfigOnFileObserver pusherConfigOnFileService) {
if (watchKey != null && !watchKey.pollEvents()
.isEmpty()) {
log.info("Watched the file changed events.");
    // Check whether target-runner.json has changed
pusherConfigOnFileService.diff();
}
watchKey.reset();
@@ -139,7 +141,7 @@ private Map<String, TargetRunnerConfig> toMap(Set<TargetRunnerConfig> targetRunn
}

private String getConfigFilePath() {
return this.getClass().getClassLoader().getResource(DEFAULT_TARGET_RUNNER_CONFIG_FILE_NAME).getPath();
return "F:\\gitrepo\\rocketmq-eventbridge\\adapter\\runtimer\\src\\main\\resources\\target-runner.json";
}

}
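
For context on the listener above: it relies on `java.nio.file.WatchService` to notice edits to `target-runner.json` and then calls `diff()`. A self-contained sketch of that watching mechanism, independent of the runtimer classes; the `ConfigFileWatcher` name and the println are assumptions:

```java
import java.nio.file.FileSystems;
import java.nio.file.Path;
import java.nio.file.Paths;
import java.nio.file.StandardWatchEventKinds;
import java.nio.file.WatchEvent;
import java.nio.file.WatchKey;
import java.nio.file.WatchService;

public class ConfigFileWatcher {

    public static void watch(String configFile) throws Exception {
        Path path = Paths.get(configFile).toAbsolutePath();
        Path dir = path.getParent();
        WatchService watchService = FileSystems.getDefault().newWatchService();
        // Watch the parent directory; events carry the changed file name
        dir.register(watchService, StandardWatchEventKinds.ENTRY_MODIFY);
        while (true) {
            WatchKey key = watchService.take();          // blocks until something changes
            for (WatchEvent<?> event : key.pollEvents()) {
                if (path.getFileName().equals(event.context())) {
                    System.out.println("config changed: " + event.context());
                    // reload / diff the runner config here
                }
            }
            key.reset();                                 // required to keep receiving events
        }
    }
}
```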
@@ -16,7 +16,11 @@
*/
package org.apache.rocketmq.eventbridge.domain.model.bus;

import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.eventbridge.domain.common.exception.EventBridgeErrorCode;
import org.apache.rocketmq.eventbridge.domain.model.AbstractResourceService;
import org.apache.rocketmq.eventbridge.domain.model.PaginationResult;
@@ -100,4 +104,14 @@ public void checkExist(String accountId, String eventBusName) {
}
}

    public Map<Integer, Pair<Long, Long>> fetchConsumeProcessInfo(String accountId, String eventBusName) {
        if (eventBusRepository.getEventBus(accountId, eventBusName) == null) {
            return new HashMap<>();
        }
        return eventDataRepository.fetchTopicStats(accountId, eventBusName);
    }

}
@@ -17,8 +17,12 @@

package org.apache.rocketmq.eventbridge.domain.repository;

import org.apache.commons.lang3.tuple.Pair;
import org.apache.rocketmq.eventbridge.domain.model.data.PutEventCallback;
import org.apache.rocketmq.eventbridge.event.EventBridgeEvent;

import java.util.Map;

public interface EventDataRepository {

@@ -52,4 +56,14 @@ public interface EventDataRepository {
*/
String getEventBusPersistentContext(String accountId, String eventBusName);


    /**
     * Fetches per-queue offset statistics for the topic backing the given event bus.
     *
     * @param accountId    account that owns the event bus
     * @param eventBusName name of the event bus
     * @return a map keyed by queue id, each value holding (consume offset, max offset)
     */
    Map<Integer, Pair<Long, Long>> fetchTopicStats(String accountId, String eventBusName);

}