From d07777443043d9feefacece06288ee1aa0b82861 Mon Sep 17 00:00:00 2001 From: caojiajun Date: Fri, 10 Jan 2025 17:51:24 +0800 Subject: [PATCH] feat: add warm logic (#364) --- .../local/storage/key/KeyReadWrite.java | 19 ++++++++++----- .../storage/key/block/KeyBlockReadWrite.java | 21 +++++++++++++++++ .../local/storage/key/slot/IKeyManifest.java | 7 ++++++ .../local/storage/key/slot/KeyManifest.java | 5 ++++ .../storage/key/slot/SlotKeyReadWrite.java | 22 +++++++++--------- .../storage/value/block/IValueManifest.java | 10 ++++++++ .../storage/value/block/ValueManifest.java | 10 ++++---- .../string/block/StringBlockReadWrite.java | 23 +++++++++++++++++++ 8 files changed, 96 insertions(+), 21 deletions(-) diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java index 38047b5c1..a6df69567 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java @@ -37,18 +37,25 @@ private SlotKeyReadWrite get(short slot) { } public ValueWrapper getForRunToCompletion(short slot, Key key) { - KeyInfo keyInfo = cache.get(key); - if (keyInfo != null) { - if (keyInfo == KeyInfo.DELETE) { + KeyInfo keyInfo1 = cache.get(key); + if (keyInfo1 != null) { + if (keyInfo1 == KeyInfo.DELETE) { return () -> null; } - if (keyInfo.isExpire()) { + if (keyInfo1.isExpire()) { cache.put(key, KeyInfo.DELETE); return () -> null; } - return () -> keyInfo; + return () -> keyInfo1; + } + ValueWrapper valueWrapper = get(slot).getForRunToCompletion(key); + if (valueWrapper != null) { + KeyInfo keyInfo2 = valueWrapper.get(); + if (keyInfo2 != null && keyInfo2.isExpire()) { + return () -> null; + } } - return get(slot).getForRunToCompletion(key); + return valueWrapper; } public KeyInfo get(short slot, Key key) throws IOException { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/KeyBlockReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/KeyBlockReadWrite.java index 56cb5aaad..fa22b7c26 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/KeyBlockReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/KeyBlockReadWrite.java @@ -10,9 +10,12 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot.IKeyManifest; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot.SlotInfo; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.util.KeyHashUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Map; +import java.util.Set; import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants.*; @@ -21,6 +24,8 @@ */ public class KeyBlockReadWrite implements IKeyBlockReadWrite { + private static final Logger logger = LoggerFactory.getLogger(KeyBlockReadWrite.class); + private static final String READ_CACHE_CONFIG_KEY = "local.storage.key.block.read.cache.capacity"; private static final String WRITE_CACHE_CONFIG_KEY = "local.storage.key.block.write.cache.capacity"; @@ -34,12 +39,28 @@ public KeyBlockReadWrite(IKeyManifest keyManifest) { this.keyManifest = keyManifest; this.readCache = new LRUCache<>("key-read-block-cache", READ_CACHE_CONFIG_KEY, "32M", _4k, SizeCalculator.STRING_INSTANCE, SizeCalculator.BYTES_INSTANCE); this.writeCache = new LRUCache<>("key-write-block-cache", WRITE_CACHE_CONFIG_KEY, "32M", _4k, SizeCalculator.STRING_INSTANCE, SizeCalculator.BYTES_INSTANCE); + warm(); } private String file(long fileId) { return FileNames.keyFile(keyManifest.dir(), fileId); } + private void warm() { + try { + Set fileIds = keyManifest.getFileIds(); + for (Long fileId : fileIds) { + try { + readBlocks(fileId, 0, _4k); + } catch (Exception e) { + logger.error("warm error, fileId = {}", fileId, e); + } + } + } catch (Exception e) { + logger.error("warm error"); + } + } + @Override public KeyInfo get(short slot, Key key) throws IOException { SlotInfo slotInfo = keyManifest.get(slot); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/IKeyManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/IKeyManifest.java index da8b5524a..0392ed508 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/IKeyManifest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/IKeyManifest.java @@ -1,6 +1,7 @@ package com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot; import java.io.IOException; +import java.util.Set; /** * Created by caojiajun on 2025/1/6 @@ -13,6 +14,12 @@ public interface IKeyManifest { */ String dir(); + /** + * get fileId list + * @return list + */ + Set getFileIds(); + /** * init and load * 初始化 diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/KeyManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/KeyManifest.java index 5e6791595..52b88b2c4 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/KeyManifest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/KeyManifest.java @@ -105,6 +105,11 @@ public void load() throws IOException { } } + @Override + public Set getFileIds() { + return new HashSet<>(fileBitsMap.keySet()); + } + @Override public SlotInfo get(short slot) { readWriteLock.readLock().lock(); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java index 0400d52e3..8aa2f9105 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java @@ -28,17 +28,17 @@ public class SlotKeyReadWrite { private final short slot; private final KeyFlushExecutor executor; - private final KeyBlockReadWrite blockCache; + private final KeyBlockReadWrite keyBlockReadWrite; private final Map mutable = new HashMap<>(); private final Map immutable = new HashMap<>(); private volatile FlushStatus flushStatus = FlushStatus.FLUSH_OK; - public SlotKeyReadWrite(short slot, KeyFlushExecutor executor, KeyBlockReadWrite blockCache) { + public SlotKeyReadWrite(short slot, KeyFlushExecutor executor, KeyBlockReadWrite keyBlockReadWrite) { this.slot = slot; this.executor = executor; - this.blockCache = blockCache; + this.keyBlockReadWrite = keyBlockReadWrite; } /** @@ -55,7 +55,7 @@ public KeyInfo get(Key key) throws IOException { if (keyInfo == KeyInfo.DELETE) { return null; } - keyInfo = blockCache.get(slot, key); + keyInfo = keyBlockReadWrite.get(slot, key); if (keyInfo == KeyInfo.DELETE) { return null; } @@ -76,7 +76,7 @@ public KeyInfo getForCompact(Key key) throws IOException { if (keyInfo == KeyInfo.DELETE) { return null; } - keyInfo = blockCache.getForCompact(slot, key); + keyInfo = keyBlockReadWrite.getForCompact(slot, key); if (keyInfo == KeyInfo.DELETE) { return null; } @@ -89,16 +89,16 @@ public KeyInfo getForCompact(Key key) throws IOException { * @return key */ public ValueWrapper getForRunToCompletion(Key key) { - KeyInfo keyInfo = mutable.get(key); - if (keyInfo == KeyInfo.DELETE) { + KeyInfo keyInfo1 = mutable.get(key); + if (keyInfo1 == KeyInfo.DELETE) { return () -> null; } - keyInfo = immutable.get(key); - if (keyInfo == KeyInfo.DELETE) { + KeyInfo keyInfo2 = immutable.get(key); + if (keyInfo2 == KeyInfo.DELETE) { return () -> null; } - if (keyInfo.isExpire()) { - return () -> null; + if (keyInfo2 != null) { + return () -> keyInfo2; } return null; } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/IValueManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/IValueManifest.java index cfe8b05f7..0cf807099 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/IValueManifest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/IValueManifest.java @@ -2,7 +2,9 @@ import java.io.IOException; import java.util.ArrayList; +import java.util.HashMap; import java.util.List; +import java.util.Map; /** * Created by caojiajun on 2025/1/6 @@ -24,6 +26,14 @@ default String dir() { default void load() throws IOException { } + /** + * get fileId list + * @return list + */ + default Map getFileIds() { + return new HashMap<>(); + } + /** * allocate block * @param slot slot diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/ValueManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/ValueManifest.java index bfdb9b938..e0eaf8a29 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/ValueManifest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/ValueManifest.java @@ -11,10 +11,7 @@ import java.nio.channels.FileChannel; import java.nio.file.Paths; import java.nio.file.StandardOpenOption; -import java.util.ArrayList; -import java.util.BitSet; -import java.util.List; -import java.util.Map; +import java.util.*; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.CopyOnWriteArrayList; import java.util.concurrent.locks.ReentrantLock; @@ -66,6 +63,11 @@ public void load() throws IOException { logger.info("value manifest load success, dir = {}, index.file.count = {}", dir, typeMap.size()); } + @Override + public Map getFileIds() { + return new HashMap<>(typeMap); + } + @Override public BlockLocation allocate(short slot, BlockType blockType) throws IOException { int index = 0; diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java index a4643f476..2cde37cdf 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java @@ -11,10 +11,13 @@ import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.BlockLocation; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.BlockType; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.IValueManifest; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; import java.io.IOException; import java.util.Arrays; import java.util.List; +import java.util.Map; import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._4k; @@ -23,6 +26,8 @@ */ public class StringBlockReadWrite implements IStringBlockReadWrite { + private static final Logger logger = LoggerFactory.getLogger(StringBlockReadWrite.class); + private static final String READ_CACHE_CONFIG_KEY = "local.storage.string.block.read.cache.capacity"; private static final String WRITE_CACHE_CONFIG_KEY = "local.storage.string.block.write.cache.capacity"; @@ -36,12 +41,30 @@ public StringBlockReadWrite(IValueManifest valueManifest) { this.valueManifest = valueManifest; this.readCache = new LRUCache<>("string-read-block-cache", READ_CACHE_CONFIG_KEY, "32M", _4k, SizeCalculator.STRING_INSTANCE, SizeCalculator.BYTES_INSTANCE); this.writeCache = new LRUCache<>("string-write-block-cache", WRITE_CACHE_CONFIG_KEY, "32M", _4k, SizeCalculator.STRING_INSTANCE, SizeCalculator.BYTES_INSTANCE); + warm(); } private String file(BlockType blockType, long fileId) { return FileNames.stringBlockFile(valueManifest.dir(), blockType, fileId); } + private void warm() { + try { + Map fileIds = valueManifest.getFileIds(); + for (Map.Entry entry : fileIds.entrySet()) { + Long fileId = entry.getKey(); + BlockType blockType = entry.getValue(); + try { + readBlocks(fileId, 0, blockType.getBlockSize()); + } catch (Exception e) { + logger.error("warm error, fileId = {}, blockType = {}", fileId, blockType, e); + } + } + } catch (Exception e) { + logger.error("warm error", e); + } + } + @Override public byte[] get(KeyInfo keyInfo) throws IOException { BlockLocation blockLocation = keyInfo.getValueLocation().blockLocation();