diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java index cb6df14f4..50eeb5cab 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/compact/CompactExecutor.java @@ -1,5 +1,6 @@ package com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact; +import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.CacheKey; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueCodec; import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueDecodeResult; @@ -24,25 +25,39 @@ public class CompactExecutor { private static final Logger logger = LoggerFactory.getLogger(CompactExecutor.class); - private KeyReadWrite keyReadWrite; - private StringReadWrite stringReadWrite; + private final IValueManifest valueManifest; - private IValueManifest valueManifest; - private StringBlockReadWrite stringBlockReadWrite; + private final KeyReadWrite keyReadWrite; + private final StringReadWrite stringReadWrite; + private final StringBlockReadWrite stringBlockReadWrite; + + private int compactIntervalSeconds; + private Map blockLimit; private final ConcurrentHashMap lastCompactTimeMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap nextOffsetMap = new ConcurrentHashMap<>(); private final ConcurrentHashMap nextBlockTypeMap = new ConcurrentHashMap<>(); + public CompactExecutor(IValueManifest valueManifest, KeyReadWrite keyReadWrite, + StringReadWrite stringReadWrite, StringBlockReadWrite stringBlockReadWrite) { + this.valueManifest = valueManifest; + this.keyReadWrite = keyReadWrite; + this.stringReadWrite = stringReadWrite; + this.stringBlockReadWrite = stringBlockReadWrite; + updateConf(); + ProxyDynamicConf.registerCallback(this::updateConf); + } + public void compact(short slot) { Long lastCompactTime = lastCompactTimeMap.get(slot); - if (lastCompactTime != null && TimeCache.currentMillis - lastCompactTime < 10*1000) { + if (lastCompactTime != null && TimeCache.currentMillis - lastCompactTime < compactIntervalSeconds*1000L) { return; } + BlockType blockType = nextBlockType(slot); + int offset = nextOffset(blockType, slot); + int limit = blockLimit.getOrDefault(blockType, 1); try { - BlockType blockType = nextBlockType(slot); - int offset = nextOffset(blockType, slot); - List blocks = valueManifest.getBlocks(slot, blockType, offset, 4); + List blocks = valueManifest.getBlocks(slot, blockType, offset, limit); if (blocks.isEmpty()) { updateNextOffset(blockType, slot, 0); return; @@ -50,12 +65,12 @@ public void compact(short slot) { List> values = new ArrayList<>(); List recycleBlocks = new ArrayList<>(); - for (BlockLocation block : blocks) { - long fileId = block.fileId(); - int blockId = block.blockId(); - byte[] bytes = stringBlockReadWrite.getBlock(blockType, fileId, (long) blockId * blockType.getBlockSize()); + for (BlockLocation blockLocation : blocks) { + long fileId = blockLocation.fileId(); + int blockId = blockLocation.blockId(); + byte[] block = stringBlockReadWrite.getBlock(blockType, fileId, (long) blockId * blockType.getBlockSize()); - StringValueDecodeResult decodeResult = StringValueCodec.decode(bytes, blockType); + StringValueDecodeResult decodeResult = StringValueCodec.decode(block, blockType); List list = decodeResult.values(); List> surviving = new ArrayList<>(); @@ -68,8 +83,7 @@ public void compact(short slot) { continue; } if (keyInfo.getValueLocation() != null) { - BlockLocation blockLocation = keyInfo.getValueLocation().blockLocation(); - if (blockLocation.equals(block)) { + if (keyInfo.getValueLocation().blockLocation().equals(blockLocation)) { surviving.add(new Pair<>(keyInfo, stringValue.value())); } } @@ -90,7 +104,7 @@ public void compact(short slot) { } if (recycle) { values.addAll(surviving); - recycleBlocks.add(block); + recycleBlocks.add(blockLocation); } } if (!values.isEmpty()) { @@ -103,15 +117,29 @@ public void compact(short slot) { valueManifest.recycle(slot, block); } if (recycleBlocks.isEmpty()) { - updateNextOffset(blockType, slot, offset + 4); + updateNextOffset(blockType, slot, offset + limit); } } catch (Exception e) { - logger.error("compact error, slot = {}", slot, e); + logger.error("compact error, slot = {}, blockType = {}, offset = {}, limit = {}", slot, blockType, offset, limit, e); } finally { lastCompactTimeMap.put(slot, TimeCache.currentMillis); } } + private void updateConf() { + compactIntervalSeconds = ProxyDynamicConf.getInt("local.storage.compact.interval.seconds", 10); + Map blockLimit = new HashMap<>(); + for (BlockType type : BlockType.values()) { + String key = "local.storage.compact.block.type." + type.getType() + ".limit"; + if (type == BlockType._4k) { + blockLimit.put(type, ProxyDynamicConf.getInt(key, 4)); + } else { + blockLimit.put(type, ProxyDynamicConf.getInt(key, 1)); + } + } + this.blockLimit = blockLimit; + } + private BlockType nextBlockType(short slot) { BlockType blockType = nextBlockTypeMap.get(slot); if (blockType == null) { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java index 021d2f5c6..1e58b42cd 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/KeyReadWrite.java @@ -58,14 +58,14 @@ public KeyInfo get(short slot, CacheKey key) throws IOException { } public KeyInfo getForCompact(short slot, CacheKey key) throws IOException { - KeyInfo keyInfo = cache.get(new CacheKey(key.key())); + KeyInfo keyInfo = cache.get(key); if (keyInfo != null) { if (keyInfo == KeyInfo.DELETE) { return null; } return keyInfo; } - keyInfo = get(slot).get(key); + keyInfo = get(slot).getForCompact(key); if (keyInfo == null) { return null; } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/IKeyBlockReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/IKeyBlockReadWrite.java index a251da4e8..3b69a328b 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/IKeyBlockReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/IKeyBlockReadWrite.java @@ -21,6 +21,15 @@ public interface IKeyBlockReadWrite extends IBlockReadWrite { */ KeyInfo get(short slot, CacheKey key) throws IOException; + /** + * 获取一个key + * @param slot slot + * @param key key + * @return key + * @throws IOException exception + */ + KeyInfo getForCompact(short slot, CacheKey key) throws IOException; + /** * get block * @param fileId fileId diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/KeyBlockReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/KeyBlockReadWrite.java index 3edf97614..3aa25c066 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/KeyBlockReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/block/KeyBlockReadWrite.java @@ -70,6 +70,32 @@ public KeyInfo get(short slot, CacheKey key) throws IOException { return data; } + @Override + public KeyInfo getForCompact(short slot, CacheKey key) throws IOException { + SlotInfo slotInfo = keyManifest.get(slot); + long fileId = slotInfo.fileId(); + long offset = slotInfo.offset(); + int capacity = slotInfo.capacity(); + int bucketSize = capacity / _4k; + int bucket = KeyHashUtils.hash(key.key()) % bucketSize; + + String cacheKey = fileId + "|" + offset; + byte[] block = readCache.get(cacheKey); + if (block == null) { + block = writeCache.get(cacheKey); + } + if (block == null) { + block = fileReadWrite.read(file(fileId), offset, bucket * _4k); + writeCache.put(cacheKey, block); + } + Map map = KeyCodec.decodeBucket(block); + KeyInfo data = map.get(key); + if (data == null) { + return KeyInfo.DELETE; + } + return data; + } + @Override public void updateBlockCache(long fileId, long offset, byte[] block) { if (block.length != _4k) { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java index 040db8aa2..9f78f140a 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/key/slot/SlotKeyReadWrite.java @@ -58,6 +58,27 @@ public KeyInfo get(CacheKey key) throws IOException { return keyInfo; } + /** + * 获取一个key + * @param key key + * @return key + */ + public KeyInfo getForCompact(CacheKey key) throws IOException { + KeyInfo keyInfo = mutable.get(key); + if (keyInfo == KeyInfo.DELETE) { + return null; + } + keyInfo = immutable.get(key); + if (keyInfo == KeyInfo.DELETE) { + return null; + } + keyInfo = blockCache.getForCompact(slot, key); + if (keyInfo == KeyInfo.DELETE) { + return null; + } + return keyInfo; + } + /** * 写入一个key * @param key key diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/ValueManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/ValueManifest.java index 04a117b43..5661b3b41 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/ValueManifest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/block/ValueManifest.java @@ -109,7 +109,7 @@ public void recycle(short slot, BlockLocation blockLocation) throws IOException //bits1 BitSet bitSet1 = bits1Map.get(fileId); if (!bitSet1.get(blockId)) { - return; + throw new IOException("fileId=" + fileId + ",blockId=" + blockId + " not allocated"); } bitSet1.set(blockId, false); Integer offset = allocateOffsetMap.get(fileId); diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java index 2c3c3734d..eef2fbffa 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/local/storage/value/string/block/StringBlockReadWrite.java @@ -93,7 +93,9 @@ public byte[] getBlock(BlockType blockType, long fileId, long offset) throws IOE return block; } String file = file(blockType, fileId); - return fileReadWrite.read(file, offset, blockType.getBlockSize()); + block = fileReadWrite.read(file, offset, blockType.getBlockSize()); + writeCache.put(cacheKey, block);; + return block; } @Override