Skip to content

Commit

Permalink
feat(proxy): optimize redis-cluster-mode online/offline graceful on ConsensusProxyClusterModeProvider (#367)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Dec 23, 2024
1 parent c84dd1e commit c876dd3
Show file tree
Hide file tree
Showing 6 changed files with 257 additions and 18 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,199 @@
package com.netease.nim.camellia.redis.proxy.cluster;

import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap;
import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.enums.RedisCommand;
import com.netease.nim.camellia.redis.proxy.enums.RedisKeyword;
import com.netease.nim.camellia.redis.proxy.netty.GlobalRedisProxyEnv;
import com.netease.nim.camellia.redis.proxy.reply.BulkReply;
import com.netease.nim.camellia.redis.proxy.reply.ErrorReply;
import com.netease.nim.camellia.redis.proxy.reply.Reply;
import com.netease.nim.camellia.redis.proxy.upstream.connection.RedisConnection;
import com.netease.nim.camellia.redis.proxy.upstream.connection.RedisConnectionAddr;
import com.netease.nim.camellia.redis.proxy.upstream.connection.RedisConnectionHub;
import com.netease.nim.camellia.redis.proxy.util.Utils;
import com.netease.nim.camellia.tools.utils.SysUtils;
import io.netty.util.concurrent.DefaultThreadFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.HashSet;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;

/**
 * Answers commands whose slot is owned by another proxy node with a delayed {@code MOVED} reply.
 * <p>
 * Instead of redirecting the client immediately, {@link #gracefulCommandMove(int)} schedules a task that
 * asks the target node (via {@code CLUSTER NODES} sent to its cport) whether it already lists the slot as
 * its own. The {@code MOVED} reply is released as soon as the target confirms, or once the configured
 * number of retries is exceeded, whichever happens first.
 * <p>
 * Created by caojiajun on 2024/12/23
 */
public class ClusterModeCommandMoveInvoker {

    private static final Logger logger = LoggerFactory.getLogger(ClusterModeCommandMoveInvoker.class);

    /** Number of lock stripes used to serialize probes that target the same node. */
    private static final int LOCK_STRIPES = 32;
    /** Upper bound, in milliseconds, on how long one CLUSTER NODES probe may block an executor thread. */
    private static final long PROBE_TIMEOUT_MILLIS = 1000L;
    /** Link-state token of a CLUSTER NODES line; the slot list is whatever follows it. */
    private static final String LINK_STATE_MARKER = "connected";

    private static final int executorSize;
    static {
        executorSize = Math.max(4, Math.min(SysUtils.getCpuNum(), 8));
    }

    private final ScheduledExecutorService commandMoveExecutor;

    private final Map<ProxyNode, SlotCache> slotCacheMap = new ConcurrentLinkedHashMap.Builder<ProxyNode, SlotCache>()
            .initialCapacity(1024)
            .maximumWeightedCapacity(1024)
            .build();
    private final ReentrantLock[] lockArray = new ReentrantLock[LOCK_STRIPES];

    // Written by the ProxyDynamicConf callback and read by callers/executor threads, hence volatile.
    private volatile long delayMillis;
    private volatile int maxRetry;
    private volatile int cacheMillis;
    private final ProxyClusterModeProcessor processor;

    /**
     * Creates the invoker, starts its scheduled executor and subscribes to dynamic-config changes.
     *
     * @param processor source of the current slot map used to resolve the owner of a slot
     */
    public ClusterModeCommandMoveInvoker(ProxyClusterModeProcessor processor) {
        this.processor = processor;
        int poolSize = ProxyDynamicConf.getInt("cluster.mode.command.move.graceful.execute.pool.size", executorSize);
        this.commandMoveExecutor = Executors.newScheduledThreadPool(poolSize, new DefaultThreadFactory("command-move-task"));
        for (int i = 0; i < LOCK_STRIPES; i++) {
            lockArray[i] = new ReentrantLock();
        }
        updateConf();
        ProxyDynamicConf.registerCallback(this::updateConf);
    }

    /** Re-reads delay, retry limit and cache lifetime from {@link ClusterModeConfig}. */
    private void updateConf() {
        delayMillis = ClusterModeConfig.clusterModeCommandMoveDelayMillis();
        maxRetry = ClusterModeConfig.clusterModeCommandMoveMaxRetry();
        cacheMillis = ClusterModeConfig.clusterModeCommandMoveCacheMillis();
    }

    /**
     * graceful command move
     * <p>
     * The first ownership check runs after {@code delayMillis}; the returned future is completed by the
     * scheduled task, never by the calling thread.
     *
     * @param slot slot
     * @return move reply
     */
    public CompletableFuture<Reply> gracefulCommandMove(int slot) {
        CompletableFuture<Reply> future = new CompletableFuture<>();
        // Snapshot the config once so the task and its first schedule use the same values.
        long delay = delayMillis;
        CommandMoveTask task = new CommandMoveTask(processor, delay, maxRetry, slot, future);
        commandMoveExecutor.schedule(task, delay, TimeUnit.MILLISECONDS);
        return future;
    }

    /**
     * One pending {@code MOVED} reply. Re-schedules itself until the target node confirms the slot or the
     * retry budget captured at creation time is used up.
     */
    private class CommandMoveTask implements Runnable {

        private final ProxyClusterModeProcessor processor;
        private final long delayMillis;
        private final int maxRetry;
        private final CompletableFuture<Reply> future;
        private final int slot;

        private int retry;

        public CommandMoveTask(ProxyClusterModeProcessor processor, long delayMillis, int maxRetry, int slot, CompletableFuture<Reply> future) {
            this.processor = processor;
            this.delayMillis = delayMillis;
            this.maxRetry = maxRetry;
            this.slot = slot;
            this.future = future;
        }

        @Override
        public void run() {
            try {
                if (!execute()) {
                    commandMoveExecutor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
                }
            } catch (Exception e) {
                // A scheduled executor silently drops task exceptions; without this the caller's
                // future would never complete and the client would wait forever.
                logger.error("command move task error, slot = {}, retry = {}", slot, retry, e);
                future.complete(ErrorReply.NOT_AVAILABLE);
            }
        }

        /**
         * Runs one attempt.
         *
         * @return {@code true} when the future has been completed, {@code false} when another attempt is needed
         */
        private boolean execute() {
            retry++;
            ProxyClusterSlotMap clusterSlotMap = processor.getSlotMap();
            ProxyNode node = clusterSlotMap == null ? null : clusterSlotMap.getBySlot(slot);
            if (node == null) {
                // No owner can be named, so a MOVED reply cannot be built.
                future.complete(ErrorReply.NOT_AVAILABLE);
                return true;
            }
            if (retry > maxRetry || checkSlotInProxyNode(clusterSlotMap, node, slot)) {
                Reply reply = new ErrorReply("MOVED " + slot + " " + node.getHost() + ":" + node.getPort());
                future.complete(reply);
                return true;
            }
            return false;
        }
    }

    /** Slots a node reported as its own, together with the time the report was taken. */
    private static class SlotCache {
        final long updateTime;
        final Set<Integer> slots;

        public SlotCache(long updateTime, Set<Integer> slots) {
            this.updateTime = updateTime;
            this.slots = slots;
        }
    }

    /**
     * Checks whether {@code node} itself reports {@code slot} as one of its slots.
     * <p>
     * Results are cached per node for {@code cacheMillis}. Cache misses are serialized per lock stripe so
     * that concurrent tasks for the same node trigger a single probe; note the probe blocks while the
     * stripe lock is held.
     *
     * @param clusterSlotMap slot map the owner was resolved from
     * @param node expected owner of the slot
     * @param slot slot to look for
     * @return {@code true} if the node is the current node or reports the slot; {@code false} otherwise,
     *         including on any probe failure
     */
    private boolean checkSlotInProxyNode(ProxyClusterSlotMap clusterSlotMap, ProxyNode node, int slot) {
        try {
            if (clusterSlotMap.getCurrentNode().equals(node)) {
                return true;
            }
            SlotCache slotCache = freshCache(node);
            if (slotCache != null) {
                return slotCache.slots.contains(slot);
            }
            // floorMod, not abs(): Math.abs(Integer.MIN_VALUE) is negative and would index out of bounds.
            int lockIndex = Math.floorMod(node.toString().hashCode(), LOCK_STRIPES);
            ReentrantLock lock = lockArray[lockIndex];
            lock.lock();
            try {
                // Another task may have refreshed the cache while this one waited for the lock.
                slotCache = freshCache(node);
                if (slotCache != null) {
                    return slotCache.slots.contains(slot);
                }
                return probeNode(node, slot);
            } finally {
                lock.unlock();
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}", node, slot, e);
            return false;
        } catch (Exception e) {
            logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}", node, slot, e);
            return false;
        }
    }

    /** Returns the cache entry of {@code node} if it is not older than {@code cacheMillis}, else {@code null}. */
    private SlotCache freshCache(ProxyNode node) {
        SlotCache slotCache = slotCacheMap.get(node);
        if (slotCache != null && System.currentTimeMillis() - slotCache.updateTime <= cacheMillis) {
            return slotCache;
        }
        return null;
    }

    /** Sends CLUSTER NODES to the node's cport, caches the slots of its "myself" line and tests {@code slot}. */
    private boolean probeNode(ProxyNode node, int slot) throws Exception {
        RedisConnectionAddr target = new RedisConnectionAddr(node.getHost(), node.getCport(), null, GlobalRedisProxyEnv.getCportPassword());
        RedisConnection connection = RedisConnectionHub.getInstance().get(target);
        if (connection == null) {
            logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}, connection null", node, slot);
            return false;
        }
        CompletableFuture<Reply> future = connection.sendCommand(RedisCommand.CLUSTER.raw(), RedisKeyword.NODES.getRaw());
        Reply reply = future.get(PROBE_TIMEOUT_MILLIS, TimeUnit.MILLISECONDS);
        if (reply instanceof ErrorReply) {
            logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}, error-reply = {}", node, slot, reply);
            return false;
        }
        if (!(reply instanceof BulkReply)) {
            logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}, reply = {}", node, slot, reply);
            return false;
        }
        String clusterNodes = Utils.bytesToString(((BulkReply) reply).getRaw());
        String targetLine = null;
        for (String line : clusterNodes.split("\n")) {
            if (line.contains("myself")) {
                targetLine = line;
                break;
            }
        }
        if (targetLine == null) {
            logger.warn("checkSlotInProxyNode error, proxyNode = {}, slot = {}, clusterNodes:\n{}", node, slot, clusterNodes);
            return false;
        }
        Set<Integer> slots = parseOwnedSlots(targetLine);
        slotCacheMap.put(node, new SlotCache(System.currentTimeMillis(), slots));
        boolean result = slots.contains(slot);
        logger.info("checkSlotInProxyNode success, proxyNode = {}, slot = {}, slot.size = {}, result = {}", node, slot, slots.size(), result);
        return result;
    }

    /**
     * Extracts the slots listed after the link-state token of one CLUSTER NODES line.
     * <p>
     * Accepts ranges ({@code 0-5460}) and single slots ({@code 5461}); bracketed entries
     * ({@code [slot->-node]}) are skipped. A line without any slot yields an empty set rather than an error.
     *
     * @param line one line of CLUSTER NODES output
     * @return mutable set of slots found on the line, possibly empty
     * @throws NumberFormatException if a slot token is not numeric
     */
    private static Set<Integer> parseOwnedSlots(String line) {
        Set<Integer> slots = new HashSet<>();
        int marker = line.indexOf(LINK_STATE_MARKER);
        if (marker < 0) {
            return slots;
        }
        String slotStr = line.substring(marker + LINK_STATE_MARKER.length()).trim();
        if (slotStr.isEmpty()) {
            return slots;
        }
        for (String token : slotStr.split("\\s+")) {
            if (token.isEmpty() || token.charAt(0) == '[') {
                continue;
            }
            int dash = token.indexOf('-');
            if (dash < 0) {
                slots.add(Integer.parseInt(token));
                continue;
            }
            int start = Integer.parseInt(token.substring(0, dash));
            int end = Integer.parseInt(token.substring(dash + 1));
            for (int i = start; i <= end; i++) {
                slots.add(i);
            }
        }
        return slots;
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -26,4 +26,17 @@ public static int clusterModeCommandMoveIntervalSeconds() {
public static boolean clusterModeCommandMoveAlways() {
    // Dynamic-config lookup; false is the value passed as the fallback argument.
    final String key = "proxy.cluster.mode.command.move.always";
    final boolean fallback = false;
    return ProxyDynamicConf.getBoolean(key, fallback);
}

/**
 * Reads {@code cluster.mode.command.move.graceful.delay.millis} from the dynamic configuration.
 *
 * @return the configured value; 100 is passed to the lookup as the fallback argument
 */
public static long clusterModeCommandMoveDelayMillis() {
    final String key = "cluster.mode.command.move.graceful.delay.millis";
    final long fallback = 100L;
    return ProxyDynamicConf.getLong(key, fallback);
}

/**
 * Reads {@code cluster.mode.command.move.graceful.max.retry} from the dynamic configuration.
 *
 * @return the configured value; 5 is passed to the lookup as the fallback argument
 */
public static int clusterModeCommandMoveMaxRetry() {
    final String key = "cluster.mode.command.move.graceful.max.retry";
    final int fallback = 5;
    return ProxyDynamicConf.getInt(key, fallback);
}

/**
 * Reads {@code cluster.mode.command.move.graceful.cache.millis} from the dynamic configuration.
 *
 * @return the configured value; 50 is passed to the lookup as the fallback argument
 */
public static int clusterModeCommandMoveCacheMillis() {
    final String key = "cluster.mode.command.move.graceful.cache.millis";
    final int fallback = 50;
    return ProxyDynamicConf.getInt(key, fallback);
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.List;
import java.util.*;
import java.util.concurrent.*;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.ReentrantLock;
Expand All @@ -37,12 +37,15 @@ public class DefaultProxyClusterModeProcessor implements ProxyClusterModeProcess
private static final ThreadPoolExecutor heartbeatExecutor = new ThreadPoolExecutor(executorSize, executorSize,
0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000), new DefaultThreadFactory("proxy-heartbeat-receiver"), new ThreadPoolExecutor.AbortPolicy());


private final ReentrantLock initLock = new ReentrantLock();
private final ReentrantLock refreshLock = new ReentrantLock();
private final AtomicBoolean refreshing = new AtomicBoolean(false);

private final ProxyClusterModeProvider provider;

private ClusterModeCommandMoveInvoker commandMoveInvoker;

private ProxyClusterSlotMap clusterSlotMap;

private Reply clusterInfo;
Expand All @@ -64,6 +67,7 @@ private void init() {
try {
if (init) return;
provider.init();
commandMoveInvoker = new ClusterModeCommandMoveInvoker(this);
refresh();
provider.addSlotMapChangeListener(this::refresh);
reloadConf();
Expand Down Expand Up @@ -125,7 +129,7 @@ private void refresh() {
* @return reply
*/
@Override
public Reply isCommandMove(Command command) {
public CompletableFuture<Reply> isCommandMove(Command command) {
try {
if (!init) return null;
if (!clusterModeCommandMoveEnable) return null;//不开启move,则直接返回null
Expand All @@ -135,20 +139,23 @@ public Reply isCommandMove(Command command) {
return null;
}
if (clusterSlotMap == null) {
return ErrorReply.NOT_AVAILABLE;
return CompletableFuture.completedFuture(ErrorReply.NOT_AVAILABLE);
}
int lasSlot = -1;
boolean move = false;
for (byte[] key : keys) {
int slot = RedisClusterCRC16Utils.getSlot(key);
if (lasSlot != -1 && lasSlot != slot) {
return ErrorReply.CROSS_SLOT_ERROR;
return CompletableFuture.completedFuture(ErrorReply.CROSS_SLOT_ERROR);
}
lasSlot = slot;
if (clusterSlotMap.isSlotInCurrentNode(slot)) {
continue;
}
ProxyNode node = clusterSlotMap.getBySlot(slot);
return new ErrorReply("MOVED " + slot + " " + node.getHost() + ":" + node.getPort());
move = true;
}
if (move) {
return commandMoveInvoker.gracefulCommandMove(lasSlot);
}
return null;
}
Expand All @@ -169,7 +176,7 @@ public Reply isCommandMove(Command command) {
if (!clusterSlotMap.isSlotInCurrentNode(slot)) {
ProxyNode node = clusterSlotMap.getBySlot(slot);
channelInfo.setLastCommandMoveTime(TimeCache.currentMillis);//记录一下上一次move的时间
return new ErrorReply("MOVED " + slot + " " + node.getHost() + ":" + node.getPort());
return CompletableFuture.completedFuture(new ErrorReply("MOVED " + slot + " " + node.getHost() + ":" + node.getPort()));
}
return null;
} catch (Exception e) {
Expand Down Expand Up @@ -255,4 +262,13 @@ public ProxyNode getCurrentNode() {
public List<ProxyNode> getOnlineNodes() {
return clusterSlotMap.getOnlineNodes();
}

/**
 * Returns the slot map currently held by this processor.
 *
 * @return the value of the {@code clusterSlotMap} field as it is at the time of the call
 */
@Override
public ProxyClusterSlotMap getSlotMap() {
    return this.clusterSlotMap;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ public interface ProxyClusterModeProcessor {
* @param command Command
* @return reply
*/
Reply isCommandMove(Command command);
CompletableFuture<Reply> isCommandMove(Command command);

/**
* cluster相关命令
Expand All @@ -35,4 +35,10 @@ public interface ProxyClusterModeProcessor {
* @return 节点列表
*/
List<ProxyNode> getOnlineNodes();

/**
 * Get the slot-map.
 * @return slot-map
 */
ProxyClusterSlotMap getSlotMap();
}
Original file line number Diff line number Diff line change
Expand Up @@ -171,7 +171,7 @@ private ProxyClusterSlotMap getSlotMapFromLeader() {
if (currentNodeLeader()) {
return ProxyClusterSlotMapUtils.localSlotMap(current(), leaderSelector.getSlotMap());
}
reply = sendCmd(leader, ClusterModeCmd.send_get_slot_map_from_leader, "{}");
reply = sync(sendCmd(leader, ClusterModeCmd.send_get_slot_map_from_leader, "{}"));
if (reply instanceof BulkReply) {
String data = Utils.bytesToString(((BulkReply) reply).getRaw());
return ProxyClusterSlotMapUtils.localSlotMap(current(), ProxyClusterSlotMap.parseString(data));
Expand Down Expand Up @@ -359,7 +359,7 @@ private void sendHeartbeatToLeader0() {
}
JSONObject data = new JSONObject();
data.put("status", ClusterModeStatus.getStatus().getValue());
Reply reply = sendCmd(targetLeader, ClusterModeCmd.send_heartbeat_to_leader, data.toString());
Reply reply = sync(sendCmd(targetLeader, ClusterModeCmd.send_heartbeat_to_leader, data.toString()));
if (reply instanceof ErrorReply) {
logger.error("send heartbeat to leader error, leader = {}, error = {}", targetLeader, ((ErrorReply) reply).getError());
}
Expand All @@ -386,7 +386,7 @@ private boolean checkNode(ProxyNode node, boolean strict) {
data.put("md5", slotMap.getMd5());
}
try {
Reply reply = sendCmd(node, ClusterModeCmd.send_heartbeat_to_follower, data.toString());
Reply reply = sync(sendCmd(node, ClusterModeCmd.send_heartbeat_to_follower, data.toString()));
if (reply instanceof ErrorReply) {
logger.error("send heartbeat to follower error, follower = {}, error = {}", node, ((ErrorReply) reply).getError());
}
Expand Down Expand Up @@ -441,20 +441,25 @@ private void refreshSlotMap(ProxyClusterSlotMap slotMap, List<ProxyNode> onlineN
data.put("md5", newSlotMap.getMd5());
data.put("slotMap", newSlotMap.toString());
logger.info("send_slot_map_to_follower, target = {}, md5 = {}, requestId = {}", node, newSlotMap.getMd5(), requestId);
sendCmd(node, ClusterModeCmd.send_slot_map_to_follower, data.toString());
CompletableFuture<Reply> future = sendCmd(node, ClusterModeCmd.send_slot_map_to_follower, data.toString());
future.thenAccept(reply -> {
if (reply instanceof ErrorReply) {
logger.error("send_slot_map_to_follower error, target = {}, md5 = {}, requestId = {}, reply = {}",
node, newSlotMap.getMd5(), requestId, reply);
}
});
} catch (Exception e) {
logger.error("send_slot_map_to_follower error, target = {}, md5 = {}, requestId = {}", node, newSlotMap.getMd5(), requestId, e);
}
}
}

private Reply sendCmd(ProxyNode target, ClusterModeCmd cmd, String data) {
private CompletableFuture<Reply> sendCmd(ProxyNode target, ClusterModeCmd cmd, String data) {
try {
byte[][] args = new byte[][]{RedisCommand.CLUSTER.raw(), RedisKeyword.PROXY_HEARTBEAT.getRaw(), Utils.stringToBytes(current().toString()),
Utils.stringToBytes(String.valueOf(cmd.getValue())), Utils.stringToBytes(data)};
RedisConnection connection = RedisConnectionHub.getInstance().get(toAddr(target));
CompletableFuture<Reply> future = connection.sendCommand(args);
return sync(future);
return connection.sendCommand(args);
} catch (Exception e) {
logger.error("send cmd error, target = {}, cmd = {}, data = {}", target, cmd, data, e);
throw new CamelliaRedisException(e);
Expand Down
Loading

0 comments on commit c876dd3

Please sign in to comment.