Skip to content

Commit

Permalink
feat: add warm logic (#364)
Browse files Browse the repository at this point in the history
  • Loading branch information
caojiajun committed Jan 10, 2025
1 parent 8704cae commit d077774
Show file tree
Hide file tree
Showing 8 changed files with 96 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,18 +37,25 @@ private SlotKeyReadWrite get(short slot) {
}

public ValueWrapper<KeyInfo> getForRunToCompletion(short slot, Key key) {
KeyInfo keyInfo = cache.get(key);
if (keyInfo != null) {
if (keyInfo == KeyInfo.DELETE) {
KeyInfo keyInfo1 = cache.get(key);
if (keyInfo1 != null) {
if (keyInfo1 == KeyInfo.DELETE) {
return () -> null;
}
if (keyInfo.isExpire()) {
if (keyInfo1.isExpire()) {
cache.put(key, KeyInfo.DELETE);
return () -> null;
}
return () -> keyInfo;
return () -> keyInfo1;
}
ValueWrapper<KeyInfo> valueWrapper = get(slot).getForRunToCompletion(key);
if (valueWrapper != null) {
KeyInfo keyInfo2 = valueWrapper.get();
if (keyInfo2 != null && keyInfo2.isExpire()) {
return () -> null;
}
}
return get(slot).getForRunToCompletion(key);
return valueWrapper;
}

public KeyInfo get(short slot, Key key) throws IOException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,9 +10,12 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot.IKeyManifest;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot.SlotInfo;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.util.KeyHashUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Map;
import java.util.Set;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants.*;

Expand All @@ -21,6 +24,8 @@
*/
public class KeyBlockReadWrite implements IKeyBlockReadWrite {

private static final Logger logger = LoggerFactory.getLogger(KeyBlockReadWrite.class);

private static final String READ_CACHE_CONFIG_KEY = "local.storage.key.block.read.cache.capacity";
private static final String WRITE_CACHE_CONFIG_KEY = "local.storage.key.block.write.cache.capacity";

Expand All @@ -34,12 +39,28 @@ public KeyBlockReadWrite(IKeyManifest keyManifest) {
this.keyManifest = keyManifest;
this.readCache = new LRUCache<>("key-read-block-cache", READ_CACHE_CONFIG_KEY, "32M", _4k, SizeCalculator.STRING_INSTANCE, SizeCalculator.BYTES_INSTANCE);
this.writeCache = new LRUCache<>("key-write-block-cache", WRITE_CACHE_CONFIG_KEY, "32M", _4k, SizeCalculator.STRING_INSTANCE, SizeCalculator.BYTES_INSTANCE);
warm();
}

private String file(long fileId) {
return FileNames.keyFile(keyManifest.dir(), fileId);
}

private void warm() {
try {
Set<Long> fileIds = keyManifest.getFileIds();
for (Long fileId : fileIds) {
try {
readBlocks(fileId, 0, _4k);
} catch (Exception e) {
logger.error("warm error, fileId = {}", fileId, e);
}
}
} catch (Exception e) {
logger.error("warm error");
}
}

@Override
public KeyInfo get(short slot, Key key) throws IOException {
SlotInfo slotInfo = keyManifest.get(slot);
Expand Down
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.netease.nim.camellia.redis.proxy.upstream.local.storage.key.slot;

import java.io.IOException;
import java.util.Set;

/**
* Created by caojiajun on 2025/1/6
Expand All @@ -13,6 +14,12 @@ public interface IKeyManifest {
*/
String dir();

/**
* get fileId list
* @return list
*/
Set<Long> getFileIds();

/**
* init and load
* 初始化
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,11 @@ public void load() throws IOException {
}
}

@Override
public Set<Long> getFileIds() {
return new HashSet<>(fileBitsMap.keySet());
}

@Override
public SlotInfo get(short slot) {
readWriteLock.readLock().lock();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -28,17 +28,17 @@ public class SlotKeyReadWrite {

private final short slot;
private final KeyFlushExecutor executor;
private final KeyBlockReadWrite blockCache;
private final KeyBlockReadWrite keyBlockReadWrite;

private final Map<Key, KeyInfo> mutable = new HashMap<>();
private final Map<Key, KeyInfo> immutable = new HashMap<>();

private volatile FlushStatus flushStatus = FlushStatus.FLUSH_OK;

public SlotKeyReadWrite(short slot, KeyFlushExecutor executor, KeyBlockReadWrite blockCache) {
public SlotKeyReadWrite(short slot, KeyFlushExecutor executor, KeyBlockReadWrite keyBlockReadWrite) {
this.slot = slot;
this.executor = executor;
this.blockCache = blockCache;
this.keyBlockReadWrite = keyBlockReadWrite;
}

/**
Expand All @@ -55,7 +55,7 @@ public KeyInfo get(Key key) throws IOException {
if (keyInfo == KeyInfo.DELETE) {
return null;
}
keyInfo = blockCache.get(slot, key);
keyInfo = keyBlockReadWrite.get(slot, key);
if (keyInfo == KeyInfo.DELETE) {
return null;
}
Expand All @@ -76,7 +76,7 @@ public KeyInfo getForCompact(Key key) throws IOException {
if (keyInfo == KeyInfo.DELETE) {
return null;
}
keyInfo = blockCache.getForCompact(slot, key);
keyInfo = keyBlockReadWrite.getForCompact(slot, key);
if (keyInfo == KeyInfo.DELETE) {
return null;
}
Expand All @@ -89,16 +89,16 @@ public KeyInfo getForCompact(Key key) throws IOException {
* @return key
*/
public ValueWrapper<KeyInfo> getForRunToCompletion(Key key) {
KeyInfo keyInfo = mutable.get(key);
if (keyInfo == KeyInfo.DELETE) {
KeyInfo keyInfo1 = mutable.get(key);
if (keyInfo1 == KeyInfo.DELETE) {
return () -> null;
}
keyInfo = immutable.get(key);
if (keyInfo == KeyInfo.DELETE) {
KeyInfo keyInfo2 = immutable.get(key);
if (keyInfo2 == KeyInfo.DELETE) {
return () -> null;
}
if (keyInfo.isExpire()) {
return () -> null;
if (keyInfo2 != null) {
return () -> keyInfo2;
}
return null;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,9 @@

import java.io.IOException;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

/**
* Created by caojiajun on 2025/1/6
Expand All @@ -24,6 +26,14 @@ default String dir() {
default void load() throws IOException {
}

/**
* get fileId list
* @return list
*/
default Map<Long, BlockType> getFileIds() {
return new HashMap<>();
}

/**
* allocate block
* @param slot slot
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,7 @@
import java.nio.channels.FileChannel;
import java.nio.file.Paths;
import java.nio.file.StandardOpenOption;
import java.util.ArrayList;
import java.util.BitSet;
import java.util.List;
import java.util.Map;
import java.util.*;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.locks.ReentrantLock;
Expand Down Expand Up @@ -66,6 +63,11 @@ public void load() throws IOException {
logger.info("value manifest load success, dir = {}, index.file.count = {}", dir, typeMap.size());
}

@Override
public Map<Long, BlockType> getFileIds() {
return new HashMap<>(typeMap);
}

@Override
public BlockLocation allocate(short slot, BlockType blockType) throws IOException {
int index = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,10 +11,13 @@
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.BlockLocation;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.BlockType;
import com.netease.nim.camellia.redis.proxy.upstream.local.storage.value.block.IValueManifest;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Map;

import static com.netease.nim.camellia.redis.proxy.upstream.local.storage.constants.LocalStorageConstants._4k;

Expand All @@ -23,6 +26,8 @@
*/
public class StringBlockReadWrite implements IStringBlockReadWrite {

private static final Logger logger = LoggerFactory.getLogger(StringBlockReadWrite.class);

private static final String READ_CACHE_CONFIG_KEY = "local.storage.string.block.read.cache.capacity";
private static final String WRITE_CACHE_CONFIG_KEY = "local.storage.string.block.write.cache.capacity";

Expand All @@ -36,12 +41,30 @@ public StringBlockReadWrite(IValueManifest valueManifest) {
this.valueManifest = valueManifest;
this.readCache = new LRUCache<>("string-read-block-cache", READ_CACHE_CONFIG_KEY, "32M", _4k, SizeCalculator.STRING_INSTANCE, SizeCalculator.BYTES_INSTANCE);
this.writeCache = new LRUCache<>("string-write-block-cache", WRITE_CACHE_CONFIG_KEY, "32M", _4k, SizeCalculator.STRING_INSTANCE, SizeCalculator.BYTES_INSTANCE);
warm();
}

private String file(BlockType blockType, long fileId) {
return FileNames.stringBlockFile(valueManifest.dir(), blockType, fileId);
}

private void warm() {
try {
Map<Long, BlockType> fileIds = valueManifest.getFileIds();
for (Map.Entry<Long, BlockType> entry : fileIds.entrySet()) {
Long fileId = entry.getKey();
BlockType blockType = entry.getValue();
try {
readBlocks(fileId, 0, blockType.getBlockSize());
} catch (Exception e) {
logger.error("warm error, fileId = {}, blockType = {}", fileId, blockType, e);
}
}
} catch (Exception e) {
logger.error("warm error", e);
}
}

@Override
public byte[] get(KeyInfo keyInfo) throws IOException {
BlockLocation blockLocation = keyInfo.getValueLocation().blockLocation();
Expand Down

0 comments on commit d077774

Please sign in to comment.