From 338157a1289356dbe5defde0b51cdc2b196888f9 Mon Sep 17 00:00:00 2001 From: caojiajun Date: Wed, 8 Jan 2025 15:57:42 +0800 Subject: [PATCH] feat: ValueManifest (#364) --- .../constants/EmbeddedStorageConstants.java | 4 +- .../storage/key/slot/KeyManifest.java | 16 +- .../storage/value/block/BlockType.java | 22 +- .../storage/value/block/IValueManifest.java | 16 +- .../storage/value/block/IndexFile.java | 21 ++ .../storage/value/block/ValueManifest.java | 239 +++++++++++++++++- .../value/persist/ValueFlushExecutor.java | 4 +- .../value/string/StringBlockCache.java | 26 +- .../redis/proxy/test/KeyManifestTest.java | 50 ++-- .../proxy/test/StringValueCodecTest.java | 7 +- .../redis/proxy/test/TestFileSize.java | 23 ++ .../redis/proxy/test/ValueManifestTest.java | 77 ++++++ 12 files changed, 447 insertions(+), 58 deletions(-) create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/value/block/IndexFile.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/TestFileSize.java create mode 100644 camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/ValueManifestTest.java diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java index 682be689f..b69fdd4b9 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/constants/EmbeddedStorageConstants.java @@ -10,7 +10,7 @@ public class EmbeddedStorageConstants { public static final int _64k = 64*1024; public static final int _256k = 256*1024; public static final int _1024k = 1024*1024; - public static final int bit_size = (int)(16*1024*1024*1024L / _64k);//16Gib - public static final long block_size = 128*1024*1024*1024L;//128Gib + public static final int key_manifest_bit_size = (int)(16*1024*1024*1024L / _64k);//16Gib + public static final long data_file_size = 192*1024*1024*1024L;//128Gib public static final int block_header_len = 4+2+1+4+4; } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyManifest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyManifest.java index 1610cd61f..786ef642c 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyManifest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/main/java/com/netease/nim/camellia/redis/proxy/upstream/embedded/storage/key/slot/KeyManifest.java @@ -35,8 +35,8 @@ public class KeyManifest implements IKeyManifest { private final String fileName; private FileChannel fileChannel; - public KeyManifest(String fileName) { - this.fileName = fileName; + public KeyManifest(String dir) { + this.fileName = dir + "/key.manifest"; } @Override @@ -80,7 +80,7 @@ public void load() throws IOException { if (fileId == 0) { continue; } - BitSet bits = fileBitsMap.computeIfAbsent(fileId, k -> new BitSet(EmbeddedStorageConstants.bit_size)); + BitSet bits = fileBitsMap.computeIfAbsent(fileId, k -> new BitSet(EmbeddedStorageConstants.key_manifest_bit_size)); int bitsStart = (int)(offset / EmbeddedStorageConstants._64k); int bitsEnd = (int)((offset + capacity) / EmbeddedStorageConstants._64k); for (int index=bitsStart; index entry : fileBitsMap.entrySet()) { Long fileId = entry.getKey(); BitSet bits = entry.getValue(); - for (int i = 0; i< EmbeddedStorageConstants.bit_size; i++) { + for (int i = 0; i< EmbeddedStorageConstants.key_manifest_bit_size; i++) { boolean used = bits.get(i); if (!used) { update(slot, fileId, (long) i * EmbeddedStorageConstants._64k, EmbeddedStorageConstants._64k); @@ -157,7 +157,7 @@ public SlotInfo expand(short slot) throws IOException { int bitsStart = (int)(offset / EmbeddedStorageConstants._64k); int bitsEnd = (int)((offset + capacity) / EmbeddedStorageConstants._64k); //直接顺延扩容 - if (bitsStart + (bitsEnd - bitsStart) * 2 < EmbeddedStorageConstants.bit_size) { + if (bitsStart + (bitsEnd - bitsStart) * 2 < EmbeddedStorageConstants.key_manifest_bit_size) { boolean directExpand = true; for (int i=bitsEnd; i= bitsStep*2) { - for (int i = 0; i< EmbeddedStorageConstants.bit_size-bitsStep*2; i++) { + if (EmbeddedStorageConstants.key_manifest_bit_size - bits.cardinality() >= bitsStep*2) { + for (int i = 0; i< EmbeddedStorageConstants.key_manifest_bit_size -bitsStep*2; i++) { if (bits.get(i, bitsStep * 2).cardinality() == 0) { //clear old for (int j=bitsStart; j typeMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap> fileIdMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap allocateOffsetMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap bits1Map = new ConcurrentHashMap<>(); + private final ConcurrentHashMap bits2Map = new ConcurrentHashMap<>(); + private final ConcurrentHashMap lockMap = new ConcurrentHashMap<>(); + private final ConcurrentHashMap bitsMmp = new ConcurrentHashMap<>(); + private final ConcurrentHashMap slotsMmp = new ConcurrentHashMap<>(); + + private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock(); + + public ValueManifest(String dir) { + this.dir = dir; + } + @Override public void load() throws IOException { - + File dict = new File(dir); + if (dict.isFile()) { + throw new IOException(dir + " is not dict"); + } + File[] files = dict.listFiles(); + if (files == null) { + return; + } + for (File file : files) { + loadIndexFile(file); + } + logger.info("value manifest load success, dir = {}, index.file.count = {}", dir, typeMap.size()); } @Override public BlockLocation allocate(short slot, BlockType blockType) throws IOException { - return null; + int index = 0; + while (true) { + long fileId = selectFileId(blockType, index); + BlockLocation blockLocation = allocate0(fileId); + if (blockLocation == null) { + index ++; + continue; + } + return blockLocation; + } } @Override - public BlockType blockType(long fileId) throws IOException { - return null; + public void commit(short slot, BlockLocation blockLocation) throws IOException { + long fileId = blockLocation.fileId(); + int blockId = blockLocation.blockId(); + // + BitSet bitSet1 = bits1Map.get(fileId); + if (!bitSet1.get(blockId)) { + throw new IOException("blockId not allocated"); + } + //bits2 + BitSet bitSet2 = bits2Map.get(fileId); + bitSet2.set(blockId, true); + //update file + //bits + int index = blockId / 64; + long changed = bitSet2.toLongArray()[index]; + MappedByteBuffer buffer1 = bitsMmp.get(fileId); + buffer1.putLong(index*8, changed); + //slot + MappedByteBuffer buffer2 = slotsMmp.get(fileId); + buffer2.putShort(blockId*2, slot); } @Override - public void flush(short slot, BlockLocation location) throws IOException { + public void clear(short slot, BlockLocation blockLocation) throws IOException { + long fileId = blockLocation.fileId(); + int blockId = blockLocation.blockId(); + //bits1 + bits1Map.get(fileId).set(blockId, false); + Integer offset = allocateOffsetMap.get(fileId); + if (offset > blockId) { + allocateOffsetMap.put(fileId, blockId); + } + //bits2 + BitSet bitSet = bits2Map.get(fileId); + bitSet.set(blockId, false); + //update file + int index = blockId / 64; + long[] longArray = bitSet.toLongArray(); + long changed; + if (longArray.length > index) { + changed = longArray[index]; + } else { + changed = 0; + } + MappedByteBuffer buffer1 = bitsMmp.get(fileId); + buffer1.putLong(index*8, changed); + + MappedByteBuffer buffer2 = slotsMmp.get(fileId); + buffer2.putShort(blockId*2, (short) -1); + } + + @Override + public BlockType blockType(long fileId) throws IOException { + return typeMap.get(fileId); + } + + private BlockLocation allocate0(long fileId) { + ReentrantLock lock = lockMap.get(fileId); + lock.lock(); + try { + BitSet bitSet = bits1Map.get(fileId); + Integer start = allocateOffsetMap.get(fileId); + for (int i=start; i list = fileIdMap.computeIfAbsent(blockType, k -> new CopyOnWriteArrayList<>()); + lock.readLock().lock(); + try { + if (index < list.size()) { + return list.get(index); + } + } finally { + lock.readLock().unlock(); + } + lock.writeLock().lock(); + try { + long fileId = init(blockType); + list.add(fileId); + return fileId; + } finally { + lock.writeLock().unlock(); + } + } + + private long init(BlockType blockType) throws IOException { + long fileId = System.currentTimeMillis(); + typeMap.put(fileId, blockType); + int bitSize = (int) (data_file_size / blockType.getBlockSize()); + bits1Map.put(fileId, new BitSet(bitSize)); + bits2Map.put(fileId, new BitSet(bitSize)); + lockMap.put(fileId, new ReentrantLock()); + allocateOffsetMap.put(fileId, 0); + String indexFileName = dir + "/" + fileId + "_" + blockType.getType() + ".index"; + String slotFileName = dir + "/" + fileId + "_" + blockType.getType() + ".slot"; + String dataFileName = dir + "/" + fileId + "_" + blockType.getType() + ".data"; + File indexFile = new File(indexFileName); + if (!indexFile.exists()) { + boolean newFile = indexFile.createNewFile(); + logger.info("create index.file = {}, result = {}", indexFileName, newFile); + } + File slotFile = new File(slotFileName); + if (!slotFile.exists()) { + boolean newFile = slotFile.createNewFile(); + logger.info("create slot.file = {}, result = {}", slotFileName, newFile); + } + File dataFile = new File(dataFileName); + if (!dataFile.exists()) { + boolean newFile = dataFile.createNewFile(); + logger.info("create data.file = {}, result = {}", indexFileName, newFile); + } + { + FileChannel fileChannel = FileChannel.open(Paths.get(indexFileName), StandardOpenOption.READ, StandardOpenOption.WRITE); + MappedByteBuffer map = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, bitSize / 8); + bitsMmp.put(fileId, map); + } + { + FileChannel fileChannel = FileChannel.open(Paths.get(slotFileName), StandardOpenOption.READ, StandardOpenOption.WRITE); + MappedByteBuffer map = fileChannel.map(FileChannel.MapMode.READ_WRITE, 0, bitSize * 2L); + slotsMmp.put(fileId, map); + } + return fileId; + } + + private void loadIndexFile(File file) throws IOException { + IndexFile indexFile = IndexFile.parse(file); + if (indexFile == null) { + return; + } + long fileId = indexFile.fileId(); + BlockType blockType = indexFile.blockType(); + FileChannel fileChannel = FileChannel.open(Paths.get(file.getPath()), StandardOpenOption.READ, StandardOpenOption.WRITE); + ByteBuffer buffer = ByteBuffer.allocate((int) fileChannel.size()); + fileChannel.read(buffer); + buffer.flip(); + long[] longArray = new long[(int) (fileChannel.size() / 8)]; + int index = 0; + while (buffer.hasRemaining()) { + long l = buffer.getLong(); + longArray[index] = l; + index ++; + } + BitSet bitSet1 = BitSet.valueOf(longArray); + BitSet bitSet2 = BitSet.valueOf(longArray); + bits1Map.put(fileId, bitSet1); + bits2Map.put(fileId, bitSet2); + typeMap.put(fileId, blockType); + List list = fileIdMap.computeIfAbsent(blockType, k -> new ArrayList<>()); + list.add(fileId); + int offset = 0; + for (int i=0; i list = StringValueCodec.decode(data, blockType); + if (list.isEmpty()) { + return null; + } + if (list.size() > keyInfo.getValueLocation().offset()) { + return null; + } + return list.get(keyInfo.getValueLocation().offset()); } public void updateBlockCache(short slot, long fileId, long offset, byte[] block) { diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyManifestTest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyManifestTest.java index f714d4f92..b979c645a 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyManifestTest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/KeyManifestTest.java @@ -10,7 +10,6 @@ import java.io.File; import java.io.IOException; -import java.util.UUID; import java.util.concurrent.ThreadLocalRandom; /** @@ -18,33 +17,19 @@ */ public class KeyManifestTest { - private final String fileName1 = "/tmp/" + UUID.randomUUID(); - private final String fileName2 = "/tmp/" + UUID.randomUUID(); + private final String dir = "/tmp"; private static final int _64k = 64*1024; @Before public void before() { - { - File file = new File(fileName1); - if (file.exists()) { - boolean delete = file.delete(); - System.out.println("delete tmp file1 = " + delete); - } - } - { - File file = new File(fileName2); - if (file.exists()) { - boolean delete = file.delete(); - System.out.println("delete tmp file2 = " + delete); - } - } + delete(); } @Test public void test() throws IOException { - KeyManifest keySlotMap = new KeyManifest(fileName1); + KeyManifest keySlotMap = new KeyManifest(dir); keySlotMap.load(); short slot = (short)ThreadLocalRandom.current().nextInt(RedisClusterCRC16Utils.SLOT_SIZE / 2); @@ -68,7 +53,7 @@ public void test() throws IOException { SlotInfo slotInfo3 = keySlotMap.init((short) (slot + 200)); Assert.assertEquals(slotInfo3, slotInfo); - KeyManifest keySlotMap2 = new KeyManifest(fileName1); + KeyManifest keySlotMap2 = new KeyManifest(dir); keySlotMap2.load(); SlotInfo slotInfo11 = keySlotMap2.get(slot); @@ -77,11 +62,13 @@ public void test() throws IOException { Assert.assertEquals(slotInfo1, slotInfo11); Assert.assertEquals(slotInfo2, slotInfo22); Assert.assertEquals(slotInfo3, slotInfo33); + + delete(); } @Test public void test2() throws IOException { - KeyManifest keySlotMap = new KeyManifest(fileName2); + KeyManifest keySlotMap = new KeyManifest(dir); keySlotMap.load(); long time1 = System.nanoTime(); @@ -97,23 +84,20 @@ public void test2() throws IOException { } long time3 = System.nanoTime(); System.out.println("rand expand 1000 slot, spend = " + (time3 - time2) / 1000000.0); + + delete(); } @After public void after() { - { - File file = new File(fileName1); - if (file.exists()) { - boolean delete = file.delete(); - System.out.println("delete tmp file1 = " + delete); - } - } - { - File file = new File(fileName2); - if (file.exists()) { - boolean delete = file.delete(); - System.out.println("delete tmp file2 = " + delete); - } + delete(); + } + + private void delete() { + File file = new File(dir + "/key.manifest"); + if (file.exists()) { + boolean delete = file.delete(); + System.out.println("delete " + delete); } } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java index d60d6740a..753ee2031 100644 --- a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/StringValueCodecTest.java @@ -120,7 +120,12 @@ public BlockType blockType(long fileId) throws IOException { } @Override - public void flush(short slot, BlockLocation location) throws IOException { + public void commit(short slot, BlockLocation blockLocation) throws IOException { + + } + + @Override + public void clear(short slot, BlockLocation blockLocation) throws IOException { } } diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/TestFileSize.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/TestFileSize.java new file mode 100644 index 000000000..62e955441 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/TestFileSize.java @@ -0,0 +1,23 @@ +package com.netease.nim.camellia.redis.proxy.test; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.constants.EmbeddedStorageConstants; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.BlockType; +import com.netease.nim.camellia.redis.proxy.util.Utils; +import org.junit.Test; + +/** + * Created by caojiajun on 2025/1/8 + */ +public class TestFileSize { + + @Test + public void test() { + int size = BlockType._4k.valueManifestSize(EmbeddedStorageConstants.data_file_size); + String s = Utils.humanReadableByteCountBin(size);//*.index + System.out.println(s); + + int i = (int) (2 * EmbeddedStorageConstants.data_file_size / 4096); + String s1 = Utils.humanReadableByteCountBin(i);//*.slot + System.out.println(s1); + } +} diff --git a/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/ValueManifestTest.java b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/ValueManifestTest.java new file mode 100644 index 000000000..12ed64a80 --- /dev/null +++ b/camellia-redis-proxy/camellia-redis-proxy-core/src/test/java/com/netease/nim/camellia/redis/proxy/test/ValueManifestTest.java @@ -0,0 +1,77 @@ +package com.netease.nim.camellia.redis.proxy.test; + +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.BlockLocation; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.BlockType; +import com.netease.nim.camellia.redis.proxy.upstream.embedded.storage.value.block.ValueManifest; +import org.junit.After; +import org.junit.Assert; +import org.junit.Before; +import org.junit.Test; + +import java.io.File; +import java.io.IOException; + +/** + * Created by caojiajun on 2025/1/8 + */ +public class ValueManifestTest { + + private static final String dir = "/tmp"; + + @Before + public void before() { + clearIndexFiles(); + } + + @Test + public void test() throws IOException { + { + ValueManifest valueManifest = new ValueManifest(dir); + valueManifest.load(); + + short slot = 1; + + BlockLocation location1 = valueManifest.allocate(slot, BlockType._4k); + Assert.assertEquals(0, location1.blockId()); + + BlockLocation location2 = valueManifest.allocate(slot, BlockType._4k); + Assert.assertEquals(1, location2.blockId()); + + valueManifest.commit(slot, location1); + valueManifest.commit(slot, location2); + } + + { + ValueManifest valueManifest = new ValueManifest(dir); + valueManifest.load(); + + short slot = 1; + + BlockLocation location1 = valueManifest.allocate(slot, BlockType._4k); + Assert.assertEquals(2, location1.blockId()); + + BlockLocation location2 = valueManifest.allocate(slot, BlockType._4k); + Assert.assertEquals(3, location2.blockId()); + + valueManifest.commit(slot, location1); + valueManifest.commit(slot, location2); + } + } + + @After + public void after() { + clearIndexFiles(); + } + + private void clearIndexFiles() { + File file = new File(dir); + File[] files = file.listFiles(); + if (files == null) return; + for (File file1 : files) { + if (file1.getName().endsWith(".index") || file1.getName().endsWith(".data") || file1.getName().endsWith(".slot")) { + boolean delete = file1.delete(); + System.out.println("delete " + file1.getName() + ", result = " + delete); + } + } + } +}