From c876dd3da91d3b31a75feb5ba1efd2066e047c46 Mon Sep 17 00:00:00 2001 From: caojiajun Date: Mon, 23 Dec 2024 14:19:42 +0800 Subject: [PATCH] feat(proxy): optimize redis-cluster-mode online/offline graceful on ConsensusProxyClusterModeProvider (#367) --- .../ClusterModeCommandMoveInvoker.java | 199 ++++++++++++++++++ .../proxy/cluster/ClusterModeConfig.java | 13 ++ .../DefaultProxyClusterModeProcessor.java | 30 ++- .../cluster/ProxyClusterModeProcessor.java | 8 +- .../ConsensusProxyClusterModeProvider.java | 19 +- .../proxy/command/CommandsTransponder.java | 6 +- 6 files changed, 257 insertions(+), 18 deletions(-) create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ClusterModeCommandMoveInvoker.java diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ClusterModeCommandMoveInvoker.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ClusterModeCommandMoveInvoker.java new file mode 100644 index 000000000..2da15605a --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ClusterModeCommandMoveInvoker.java @@ -0,0 +1,199 @@ +package com.netease.nim.camellia.redis.proxy.cluster; + +import com.googlecode.concurrentlinkedhashmap.ConcurrentLinkedHashMap; +import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf; +import com.netease.nim.camellia.redis.proxy.enums.RedisCommand; +import com.netease.nim.camellia.redis.proxy.enums.RedisKeyword; +import com.netease.nim.camellia.redis.proxy.netty.GlobalRedisProxyEnv; +import com.netease.nim.camellia.redis.proxy.reply.BulkReply; +import com.netease.nim.camellia.redis.proxy.reply.ErrorReply; +import com.netease.nim.camellia.redis.proxy.reply.Reply; +import com.netease.nim.camellia.redis.proxy.upstream.connection.RedisConnection; +import 
com.netease.nim.camellia.redis.proxy.upstream.connection.RedisConnectionAddr;
+import com.netease.nim.camellia.redis.proxy.upstream.connection.RedisConnectionHub;
+import com.netease.nim.camellia.redis.proxy.util.Utils;
+import com.netease.nim.camellia.tools.utils.SysUtils;
+import io.netty.util.concurrent.DefaultThreadFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.locks.ReentrantLock;
+
+/**
+ * Produces the MOVED reply for a slot that is not served by the current proxy node, but only
+ * after a delay: the reply is held back until the target node itself reports (through
+ * CLUSTER NODES on its cport) that it owns the slot, or until the retry budget is used up.
+ * <p>
+ * Created by caojiajun on 2024/12/23
+ */
+public class ClusterModeCommandMoveInvoker {
+
+    private static final Logger logger = LoggerFactory.getLogger(ClusterModeCommandMoveInvoker.class);
+
+    private static final int executorSize;
+    static {
+        executorSize = Math.max(4, Math.min(SysUtils.getCpuNum(), 8));
+    }
+
+    /** Number of lock stripes used to serialize CLUSTER NODES lookups per target node. */
+    private static final int LOCK_STRIPES = 32;
+
+    private final ScheduledExecutorService commandMoveExecutor;
+
+    /** Slots last reported by each remote node, bounded to 1024 entries. */
+    private final Map<ProxyNode, SlotCache> slotCacheMap = new ConcurrentLinkedHashMap.Builder<ProxyNode, SlotCache>()
+            .initialCapacity(1024)
+            .maximumWeightedCapacity(1024)
+            .build();
+    private final ReentrantLock[] lockArray = new ReentrantLock[LOCK_STRIPES];
+
+    // Rewritten by the ProxyDynamicConf callback and read by the executor threads; volatile so
+    // that a reload done on another thread is visible to them.
+    private volatile long delayMillis;
+    private volatile int maxRetry;
+    private volatile int cacheMillis;
+    private final ProxyClusterModeProcessor processor;
+
+    /**
+     * @param processor source of the current slot map
+     */
+    public ClusterModeCommandMoveInvoker(ProxyClusterModeProcessor processor) {
+        this.processor = processor;
+        int poolSize = ProxyDynamicConf.getInt("cluster.mode.command.move.graceful.execute.pool.size", executorSize);
+        this.commandMoveExecutor = Executors.newScheduledThreadPool(poolSize, new DefaultThreadFactory("command-move-task"));
+        for (int i = 0; i < LOCK_STRIPES; i++) {
+            lockArray[i] = new ReentrantLock();
+        }
+        updateConf();
+        ProxyDynamicConf.registerCallback(this::updateConf);
+    }
+
+    /** Reloads delay, retry limit and cache lifetime from {@link ClusterModeConfig}. */
+    private void updateConf() {
+        delayMillis = ClusterModeConfig.clusterModeCommandMoveDelayMillis();
+        maxRetry = ClusterModeConfig.clusterModeCommandMoveMaxRetry();
+        cacheMillis = ClusterModeConfig.clusterModeCommandMoveCacheMillis();
+    }
+
+    /**
+     * graceful command move
+     * @param slot slot
+     * @return move reply; the future is always completed normally (MOVED or an error reply)
+     */
+    public CompletableFuture<Reply> gracefulCommandMove(int slot) {
+        CompletableFuture<Reply> future = new CompletableFuture<>();
+        // Snapshot the delay once so the first schedule and the task's retries use the same value.
+        long delay = delayMillis;
+        CommandMoveTask task = new CommandMoveTask(processor, delay, maxRetry, slot, future);
+        commandMoveExecutor.schedule(task, delay, TimeUnit.MILLISECONDS);
+        return future;
+    }
+
+    /**
+     * One pending MOVED reply. Re-schedules itself every {@code delayMillis} until the target
+     * node confirms the slot or {@code maxRetry} attempts have been made.
+     */
+    private class CommandMoveTask implements Runnable {
+
+        private final ProxyClusterModeProcessor processor;
+        private final long delayMillis;
+        private final int maxRetry;
+        private final CompletableFuture<Reply> future;
+        private final int slot;
+
+        private int retry;
+
+        public CommandMoveTask(ProxyClusterModeProcessor processor, long delayMillis, int maxRetry, int slot, CompletableFuture<Reply> future) {
+            this.processor = processor;
+            this.delayMillis = delayMillis;
+            this.maxRetry = maxRetry;
+            this.slot = slot;
+            this.future = future;
+        }
+
+        @Override
+        public void run() {
+            try {
+                if (!execute()) {
+                    commandMoveExecutor.schedule(this, delayMillis, TimeUnit.MILLISECONDS);
+                }
+            } catch (Exception e) {
+                // An exception escaping a scheduled task is swallowed by the executor, which would
+                // leave the future (and the client command waiting on it) pending forever.
+                logger.error("command move task error, slot = {}", slot, e);
+                future.complete(ErrorReply.NOT_AVAILABLE);
+            }
+        }
+
+        /**
+         * @return true when the future has been completed, false when another attempt is needed
+         */
+        private boolean execute() {
+            retry++;
+            ProxyClusterSlotMap clusterSlotMap = processor.getSlotMap();
+            ProxyNode node = clusterSlotMap == null ? null : clusterSlotMap.getBySlot(slot);
+            if (node == null) {
+                // No slot map or no owner for the slot: there is no address to redirect to.
+                future.complete(ErrorReply.NOT_AVAILABLE);
+                return true;
+            }
+            if (retry > maxRetry || checkSlotInProxyNode(clusterSlotMap, node, slot)) {
+                Reply reply = new ErrorReply("MOVED " + slot + " " + node.getHost() + ":" + node.getPort());
+                future.complete(reply);
+                return true;
+            }
+            return false;
+        }
+    }
+
+    /** Slots reported by one node together with the time they were fetched. */
+    private static class SlotCache {
+        final long updateTime;
+        final Set<Integer> slots;
+
+        public SlotCache(long updateTime, Set<Integer> slots) {
+            this.updateTime = updateTime;
+            this.slots = slots;
+        }
+    }
+
+    /** Whether a cache entry exists and is no older than {@code cacheMillis}. */
+    private boolean isFresh(SlotCache slotCache) {
+        return slotCache != null && System.currentTimeMillis() - slotCache.updateTime <= cacheMillis;
+    }
+
+    /**
+     * Checks whether {@code node} itself reports that it serves {@code slot}.
+     * The answer comes from a short-lived per-node cache, refreshed by sending CLUSTER NODES to
+     * the node's cport; refreshes are serialized through a striped lock.
+     *
+     * @param clusterSlotMap slot map of the current proxy
+     * @param node           owner of the slot according to {@code clusterSlotMap}
+     * @param slot           slot
+     * @return true if the node is the current node or reports the slot; false otherwise,
+     *         including on any lookup failure
+     */
+    private boolean checkSlotInProxyNode(ProxyClusterSlotMap clusterSlotMap, ProxyNode node, int slot) {
+        try {
+            if (clusterSlotMap.getCurrentNode().equals(node)) {
+                return true;
+            }
+            SlotCache slotCache = slotCacheMap.get(node);
+            if (isFresh(slotCache)) {
+                return slotCache.slots.contains(slot);
+            }
+            // floorMod instead of Math.abs(hash) % n: Math.abs(Integer.MIN_VALUE) is negative.
+            int lockIndex = Math.floorMod(node.toString().hashCode(), LOCK_STRIPES);
+            ReentrantLock lock = lockArray[lockIndex];
+            lock.lock();
+            try {
+                // Re-check under the lock: another task may have refreshed the entry meanwhile.
+                slotCache = slotCacheMap.get(node);
+                if (isFresh(slotCache)) {
+                    return slotCache.slots.contains(slot);
+                }
+                RedisConnectionAddr target = new RedisConnectionAddr(node.getHost(), node.getCport(), null, GlobalRedisProxyEnv.getCportPassword());
+                RedisConnection connection = RedisConnectionHub.getInstance().get(target);
+                if (connection == null) {
+                    logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}, connection null", node, slot);
+                    return false;
+                }
+                CompletableFuture<Reply> future = connection.sendCommand(RedisCommand.CLUSTER.raw(), RedisKeyword.NODES.getRaw());
+                Reply reply = future.get(1000, TimeUnit.MILLISECONDS);
+                if (reply instanceof ErrorReply) {
+                    logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}, error-reply = {}", node, slot, reply);
+                    return false;
+                }
+                if (reply instanceof BulkReply) {
+                    String clusterNodes = Utils.bytesToString(((BulkReply) reply).getRaw());
+                    Set<Integer> slots = parseMyselfSlots(clusterNodes);
+                    if (slots == null) {
+                        logger.warn("checkSlotInProxyNode error, proxyNode = {}, slot = {}, clusterNodes:\n{}", node, slot, clusterNodes);
+                        return false;
+                    }
+                    slotCacheMap.put(node, new SlotCache(System.currentTimeMillis(), slots));
+                    boolean result = slots.contains(slot);
+                    logger.info("checkSlotInProxyNode success, proxyNode = {}, slot = {}, slot.size = {}, result = {}", node, slot, slots.size(), result);
+                    return result;
+                }
+                logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}, reply = {}", node, slot, reply);
+                return false;
+            } finally {
+                lock.unlock();
+            }
+        } catch (InterruptedException e) {
+            // Keep the interrupt visible to the executor instead of silently clearing it.
+            Thread.currentThread().interrupt();
+            logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}", node, slot, e);
+            return false;
+        } catch (Exception e) {
+            logger.error("checkSlotInProxyNode error, proxyNode = {}, slot = {}", node, slot, e);
+            return false;
+        }
+    }
+
+    /**
+     * Extracts the slots listed on the "myself" line of a CLUSTER NODES reply.
+     * Accepts ranges ("0-100") and single slots ("42"); bracketed entries (slot migration
+     * markers) are skipped. A node that lists no slots yields an empty set.
+     *
+     * @param clusterNodes raw CLUSTER NODES text
+     * @return the slots, or null when there is no "myself" line or it has no "connected" marker
+     * @throws NumberFormatException if a slot entry is not numeric
+     */
+    private static Set<Integer> parseMyselfSlots(String clusterNodes) {
+        String targetLine = null;
+        for (String line : clusterNodes.split("\n")) {
+            if (line.contains("myself")) {
+                targetLine = line;
+                break;
+            }
+        }
+        if (targetLine == null) {
+            return null;
+        }
+        int marker = targetLine.indexOf("connected");
+        if (marker < 0) {
+            return null;
+        }
+        Set<Integer> slots = new HashSet<>();
+        String slotStr = targetLine.substring(marker + "connected".length()).trim();
+        if (slotStr.isEmpty()) {
+            return slots;
+        }
+        for (String str : slotStr.split("\\s+")) {
+            if (str.isEmpty() || str.startsWith("[")) {
+                continue;
+            }
+            int dash = str.indexOf('-');
+            int start;
+            int end;
+            if (dash < 0) {
+                start = Integer.parseInt(str);
+                end = start;
+            } else {
+                start = Integer.parseInt(str.substring(0, dash));
+                end = Integer.parseInt(str.substring(dash + 1));
+            }
+            for (int i = start; i <= end; i++) {
+                slots.add(i);
+            }
+        }
+        return slots;
+    }
+}
diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ClusterModeConfig.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ClusterModeConfig.java
index 626b4a4db..2ade5754a 100644
--- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ClusterModeConfig.java
+++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ClusterModeConfig.java
@@ -26,4 +26,17 @@ public static int clusterModeCommandMoveIntervalSeconds() {
     public static boolean clusterModeCommandMoveAlways() {
         return ProxyDynamicConf.getBoolean("proxy.cluster.mode.command.move.always", false);
     }
+
+    public static long clusterModeCommandMoveDelayMillis() {
+        return ProxyDynamicConf.getLong("cluster.mode.command.move.graceful.delay.millis", 100L);
+    }
+
+    public static int clusterModeCommandMoveMaxRetry() {
+        return ProxyDynamicConf.getInt("cluster.mode.command.move.graceful.max.retry", 5);
+    }
+
+    public static int clusterModeCommandMoveCacheMillis() {
+        return ProxyDynamicConf.getInt("cluster.mode.command.move.graceful.cache.millis", 50);
+    }
+
 }
diff --git
a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/DefaultProxyClusterModeProcessor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/DefaultProxyClusterModeProcessor.java index 36312dccd..88ce5ca04 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/DefaultProxyClusterModeProcessor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/DefaultProxyClusterModeProcessor.java @@ -17,7 +17,7 @@ import org.slf4j.Logger; import org.slf4j.LoggerFactory; -import java.util.List; +import java.util.*; import java.util.concurrent.*; import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.locks.ReentrantLock; @@ -37,12 +37,15 @@ public class DefaultProxyClusterModeProcessor implements ProxyClusterModeProcess private static final ThreadPoolExecutor heartbeatExecutor = new ThreadPoolExecutor(executorSize, executorSize, 0, TimeUnit.SECONDS, new LinkedBlockingQueue<>(10000), new DefaultThreadFactory("proxy-heartbeat-receiver"), new ThreadPoolExecutor.AbortPolicy()); + private final ReentrantLock initLock = new ReentrantLock(); private final ReentrantLock refreshLock = new ReentrantLock(); private final AtomicBoolean refreshing = new AtomicBoolean(false); private final ProxyClusterModeProvider provider; + private ClusterModeCommandMoveInvoker commandMoveInvoker; + private ProxyClusterSlotMap clusterSlotMap; private Reply clusterInfo; @@ -64,6 +67,7 @@ private void init() { try { if (init) return; provider.init(); + commandMoveInvoker = new ClusterModeCommandMoveInvoker(this); refresh(); provider.addSlotMapChangeListener(this::refresh); reloadConf(); @@ -125,7 +129,7 @@ private void refresh() { * @return reply */ @Override - public Reply isCommandMove(Command command) { + public CompletableFuture 
isCommandMove(Command command) { try { if (!init) return null; if (!clusterModeCommandMoveEnable) return null;//不开启move,则直接返回null @@ -135,20 +139,23 @@ public Reply isCommandMove(Command command) { return null; } if (clusterSlotMap == null) { - return ErrorReply.NOT_AVAILABLE; + return CompletableFuture.completedFuture(ErrorReply.NOT_AVAILABLE); } int lasSlot = -1; + boolean move = false; for (byte[] key : keys) { int slot = RedisClusterCRC16Utils.getSlot(key); if (lasSlot != -1 && lasSlot != slot) { - return ErrorReply.CROSS_SLOT_ERROR; + return CompletableFuture.completedFuture(ErrorReply.CROSS_SLOT_ERROR); } lasSlot = slot; if (clusterSlotMap.isSlotInCurrentNode(slot)) { continue; } - ProxyNode node = clusterSlotMap.getBySlot(slot); - return new ErrorReply("MOVED " + slot + " " + node.getHost() + ":" + node.getPort()); + move = true; + } + if (move) { + return commandMoveInvoker.gracefulCommandMove(lasSlot); } return null; } @@ -169,7 +176,7 @@ public Reply isCommandMove(Command command) { if (!clusterSlotMap.isSlotInCurrentNode(slot)) { ProxyNode node = clusterSlotMap.getBySlot(slot); channelInfo.setLastCommandMoveTime(TimeCache.currentMillis);//记录一下上一次move的时间 - return new ErrorReply("MOVED " + slot + " " + node.getHost() + ":" + node.getPort()); + return CompletableFuture.completedFuture(new ErrorReply("MOVED " + slot + " " + node.getHost() + ":" + node.getPort())); } return null; } catch (Exception e) { @@ -255,4 +262,13 @@ public ProxyNode getCurrentNode() { public List getOnlineNodes() { return clusterSlotMap.getOnlineNodes(); } + + /** + * 获取slot-map + * @return slot-map + */ + @Override + public ProxyClusterSlotMap getSlotMap() { + return clusterSlotMap; + } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ProxyClusterModeProcessor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ProxyClusterModeProcessor.java index 
20c4c9c29..09b1e0c4c 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ProxyClusterModeProcessor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/ProxyClusterModeProcessor.java @@ -15,7 +15,7 @@ public interface ProxyClusterModeProcessor { * @param command Command * @return reply */ - Reply isCommandMove(Command command); + CompletableFuture isCommandMove(Command command); /** * cluster相关命令 @@ -35,4 +35,10 @@ public interface ProxyClusterModeProcessor { * @return 节点列表 */ List getOnlineNodes(); + + /** + * 获取slot-map + * @return slot-map + */ + ProxyClusterSlotMap getSlotMap(); } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/provider/ConsensusProxyClusterModeProvider.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/provider/ConsensusProxyClusterModeProvider.java index a28ad164f..6e0009776 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/provider/ConsensusProxyClusterModeProvider.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/cluster/provider/ConsensusProxyClusterModeProvider.java @@ -171,7 +171,7 @@ private ProxyClusterSlotMap getSlotMapFromLeader() { if (currentNodeLeader()) { return ProxyClusterSlotMapUtils.localSlotMap(current(), leaderSelector.getSlotMap()); } - reply = sendCmd(leader, ClusterModeCmd.send_get_slot_map_from_leader, "{}"); + reply = sync(sendCmd(leader, ClusterModeCmd.send_get_slot_map_from_leader, "{}")); if (reply instanceof BulkReply) { String data = Utils.bytesToString(((BulkReply) reply).getRaw()); return ProxyClusterSlotMapUtils.localSlotMap(current(), ProxyClusterSlotMap.parseString(data)); @@ -359,7 +359,7 @@ private void 
sendHeartbeatToLeader0() { } JSONObject data = new JSONObject(); data.put("status", ClusterModeStatus.getStatus().getValue()); - Reply reply = sendCmd(targetLeader, ClusterModeCmd.send_heartbeat_to_leader, data.toString()); + Reply reply = sync(sendCmd(targetLeader, ClusterModeCmd.send_heartbeat_to_leader, data.toString())); if (reply instanceof ErrorReply) { logger.error("send heartbeat to leader error, leader = {}, error = {}", targetLeader, ((ErrorReply) reply).getError()); } @@ -386,7 +386,7 @@ private boolean checkNode(ProxyNode node, boolean strict) { data.put("md5", slotMap.getMd5()); } try { - Reply reply = sendCmd(node, ClusterModeCmd.send_heartbeat_to_follower, data.toString()); + Reply reply = sync(sendCmd(node, ClusterModeCmd.send_heartbeat_to_follower, data.toString())); if (reply instanceof ErrorReply) { logger.error("send heartbeat to follower error, follower = {}, error = {}", node, ((ErrorReply) reply).getError()); } @@ -441,20 +441,25 @@ private void refreshSlotMap(ProxyClusterSlotMap slotMap, List onlineN data.put("md5", newSlotMap.getMd5()); data.put("slotMap", newSlotMap.toString()); logger.info("send_slot_map_to_follower, target = {}, md5 = {}, requestId = {}", node, newSlotMap.getMd5(), requestId); - sendCmd(node, ClusterModeCmd.send_slot_map_to_follower, data.toString()); + CompletableFuture future = sendCmd(node, ClusterModeCmd.send_slot_map_to_follower, data.toString()); + future.thenAccept(reply -> { + if (reply instanceof ErrorReply) { + logger.error("send_slot_map_to_follower error, target = {}, md5 = {}, requestId = {}, reply = {}", + node, newSlotMap.getMd5(), requestId, reply); + } + }); } catch (Exception e) { logger.error("send_slot_map_to_follower error, target = {}, md5 = {}, requestId = {}", node, newSlotMap.getMd5(), requestId, e); } } } - private Reply sendCmd(ProxyNode target, ClusterModeCmd cmd, String data) { + private CompletableFuture sendCmd(ProxyNode target, ClusterModeCmd cmd, String data) { try { byte[][] args = new 
byte[][]{RedisCommand.CLUSTER.raw(), RedisKeyword.PROXY_HEARTBEAT.getRaw(), Utils.stringToBytes(current().toString()), Utils.stringToBytes(String.valueOf(cmd.getValue())), Utils.stringToBytes(data)}; RedisConnection connection = RedisConnectionHub.getInstance().get(toAddr(target)); - CompletableFuture future = connection.sendCommand(args); - return sync(future); + return connection.sendCommand(args); } catch (Exception e) { logger.error("send cmd error, target = {}, cmd = {}, data = {}", target, cmd, data, e); throw new CamelliaRedisException(e); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/command/CommandsTransponder.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/command/CommandsTransponder.java index feb963301..772a3107a 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/command/CommandsTransponder.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/command/CommandsTransponder.java @@ -418,9 +418,9 @@ public void transpond(ChannelInfo channelInfo, List commands) { } if (clusterModeProcessor != null) { - Reply moveReply = clusterModeProcessor.isCommandMove(command); - if (moveReply != null) { - task.replyCompleted(moveReply); + CompletableFuture future = clusterModeProcessor.isCommandMove(command); + if (future != null) { + future.thenAccept(task::replyCompleted); hasCommandsSkip = true; continue; }