Skip to content

Commit

Permalink
feat: optimize compact (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 9, 2025
1 parent 9800701 commit bc5bf94
Show file tree
Hide file tree
Showing 7 changed files with 108 additions and 22 deletions.
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.compact;

import com.netease.nim.camellia.redis.proxy.conf.ProxyDynamicConf;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.cache.CacheKey;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueCodec;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.codec.StringValueDecodeResult;
Expand All @@ -24,38 +25,52 @@ public class CompactExecutor {

private static final Logger logger = LoggerFactory.getLogger(CompactExecutor.class);

private KeyReadWrite keyReadWrite;
private StringReadWrite stringReadWrite;
private final IValueManifest valueManifest;

private IValueManifest valueManifest;
private StringBlockReadWrite stringBlockReadWrite;
private final KeyReadWrite keyReadWrite;
private final StringReadWrite stringReadWrite;
private final StringBlockReadWrite stringBlockReadWrite;

private int compactIntervalSeconds;
private Map<BlockType, Integer> blockLimit;

private final ConcurrentHashMap<Short, Long> lastCompactTimeMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<String, Integer> nextOffsetMap = new ConcurrentHashMap<>();
private final ConcurrentHashMap<Short, BlockType> nextBlockTypeMap = new ConcurrentHashMap<>();

public CompactExecutor(IValueManifest valueManifest, KeyReadWrite keyReadWrite,
StringReadWrite stringReadWrite, StringBlockReadWrite stringBlockReadWrite) {
this.valueManifest = valueManifest;
this.keyReadWrite = keyReadWrite;
this.stringReadWrite = stringReadWrite;
this.stringBlockReadWrite = stringBlockReadWrite;
updateConf();
ProxyDynamicConf.registerCallback(this::updateConf);
}

public void compact(short slot) {
Long lastCompactTime = lastCompactTimeMap.get(slot);
if (lastCompactTime != null && TimeCache.currentMillis - lastCompactTime < 10*1000) {
if (lastCompactTime != null && TimeCache.currentMillis - lastCompactTime < compactIntervalSeconds*1000L) {
return;
}
BlockType blockType = nextBlockType(slot);
int offset = nextOffset(blockType, slot);
int limit = blockLimit.getOrDefault(blockType, 1);
try {
BlockType blockType = nextBlockType(slot);
int offset = nextOffset(blockType, slot);
List<BlockLocation> blocks = valueManifest.getBlocks(slot, blockType, offset, 4);
List<BlockLocation> blocks = valueManifest.getBlocks(slot, blockType, offset, limit);
if (blocks.isEmpty()) {
updateNextOffset(blockType, slot, 0);
return;
}
List<Pair<KeyInfo, byte[]>> values = new ArrayList<>();
List<BlockLocation> recycleBlocks = new ArrayList<>();

for (BlockLocation block : blocks) {
long fileId = block.fileId();
int blockId = block.blockId();
byte[] bytes = stringBlockReadWrite.getBlock(blockType, fileId, (long) blockId * blockType.getBlockSize());
for (BlockLocation blockLocation : blocks) {
long fileId = blockLocation.fileId();
int blockId = blockLocation.blockId();
byte[] block = stringBlockReadWrite.getBlock(blockType, fileId, (long) blockId * blockType.getBlockSize());

StringValueDecodeResult decodeResult = StringValueCodec.decode(bytes, blockType);
StringValueDecodeResult decodeResult = StringValueCodec.decode(block, blockType);
List<byte[]> list = decodeResult.values();

List<Pair<KeyInfo, byte[]>> surviving = new ArrayList<>();
Expand All @@ -68,8 +83,7 @@ public void compact(short slot) {
continue;
}
if (keyInfo.getValueLocation() != null) {
BlockLocation blockLocation = keyInfo.getValueLocation().blockLocation();
if (blockLocation.equals(block)) {
if (keyInfo.getValueLocation().blockLocation().equals(blockLocation)) {
surviving.add(new Pair<>(keyInfo, stringValue.value()));
}
}
Expand All @@ -90,7 +104,7 @@ public void compact(short slot) {
}
if (recycle) {
values.addAll(surviving);
recycleBlocks.add(block);
recycleBlocks.add(blockLocation);
}
}
if (!values.isEmpty()) {
Expand All @@ -103,15 +117,29 @@ public void compact(short slot) {
valueManifest.recycle(slot, block);
}
if (recycleBlocks.isEmpty()) {
updateNextOffset(blockType, slot, offset + 4);
updateNextOffset(blockType, slot, offset + limit);
}
} catch (Exception e) {
logger.error("compact error, slot = {}", slot, e);
logger.error("compact error, slot = {}, blockType = {}, offset = {}, limit = {}", slot, blockType, offset, limit, e);
} finally {
lastCompactTimeMap.put(slot, TimeCache.currentMillis);
}
}

private void updateConf() {
compactIntervalSeconds = ProxyDynamicConf.getInt("local.storage.compact.interval.seconds", 10);
Map<BlockType, Integer> blockLimit = new HashMap<>();
for (BlockType type : BlockType.values()) {
String key = "local.storage.compact.block.type." + type.getType() + ".limit";
if (type == BlockType._4k) {
blockLimit.put(type, ProxyDynamicConf.getInt(key, 4));
} else {
blockLimit.put(type, ProxyDynamicConf.getInt(key, 1));
}
}
this.blockLimit = blockLimit;
}

private BlockType nextBlockType(short slot) {
BlockType blockType = nextBlockTypeMap.get(slot);
if (blockType == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,14 +58,14 @@ public KeyInfo get(short slot, CacheKey key) throws IOException {
}

public KeyInfo getForCompact(short slot, CacheKey key) throws IOException {
KeyInfo keyInfo = cache.get(new CacheKey(key.key()));
KeyInfo keyInfo = cache.get(key);
if (keyInfo != null) {
if (keyInfo == KeyInfo.DELETE) {
return null;
}
return keyInfo;
}
keyInfo = get(slot).get(key);
keyInfo = get(slot).getForCompact(key);
if (keyInfo == null) {
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,15 @@ public interface IKeyBlockReadWrite extends IBlockReadWrite {
*/
KeyInfo get(short slot, CacheKey key) throws IOException;

/**
* 获取一个key
* @param slot slot
* @param key key
* @return key
* @throws IOException exception
*/
KeyInfo getForCompact(short slot, CacheKey key) throws IOException;

/**
* get block
* @param fileId fileId
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,32 @@ public KeyInfo get(short slot, CacheKey key) throws IOException {
return data;
}

@Override
public KeyInfo getForCompact(short slot, CacheKey key) throws IOException {
SlotInfo slotInfo = keyManifest.get(slot);
long fileId = slotInfo.fileId();
long offset = slotInfo.offset();
int capacity = slotInfo.capacity();
int bucketSize = capacity / _4k;
int bucket = KeyHashUtils.hash(key.key()) % bucketSize;

String cacheKey = fileId + "|" + offset;
byte[] block = readCache.get(cacheKey);
if (block == null) {
block = writeCache.get(cacheKey);
}
if (block == null) {
block = fileReadWrite.read(file(fileId), offset, bucket * _4k);
writeCache.put(cacheKey, block);
}
Map<CacheKey, KeyInfo> map = KeyCodec.decodeBucket(block);
KeyInfo data = map.get(key);
if (data == null) {
return KeyInfo.DELETE;
}
return data;
}

@Override
public void updateBlockCache(long fileId, long offset, byte[] block) {
if (block.length != _4k) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,27 @@ public KeyInfo get(CacheKey key) throws IOException {
return keyInfo;
}

/**
* 获取一个key
* @param key key
* @return key
*/
public KeyInfo getForCompact(CacheKey key) throws IOException {
KeyInfo keyInfo = mutable.get(key);
if (keyInfo == KeyInfo.DELETE) {
return null;
}
keyInfo = immutable.get(key);
if (keyInfo == KeyInfo.DELETE) {
return null;
}
keyInfo = blockCache.getForCompact(slot, key);
if (keyInfo == KeyInfo.DELETE) {
return null;
}
return keyInfo;
}

/**
* 写入一个key
* @param key key
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ public void recycle(short slot, BlockLocation blockLocation) throws IOException
//bits1
BitSet bitSet1 = bits1Map.get(fileId);
if (!bitSet1.get(blockId)) {
return;
throw new IOException("fileId=" + fileId + ",blockId=" + blockId + " not allocated");
}
bitSet1.set(blockId, false);
Integer offset = allocateOffsetMap.get(fileId);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -93,7 +93,9 @@ public byte[] getBlock(BlockType blockType, long fileId, long offset) throws IOE
return block;
}
String file = file(blockType, fileId);
return fileReadWrite.read(file, offset, blockType.getBlockSize());
block = fileReadWrite.read(file, offset, blockType.getBlockSize());
writeCache.put(cacheKey, block);;
return block;
}

@Override
Expand Down

0 comments on commit bc5bf94

Please sign in to comment.