Skip to content

Commit

Permalink
fix[cache]: fixed a bug where expired data could not be deleted
Browse files Browse the repository at this point in the history
  • Loading branch information
jaysunxiao committed Aug 16, 2024
1 parent 02f6574 commit 96485d1
Show file tree
Hide file tree
Showing 2 changed files with 28 additions and 13 deletions.
17 changes: 13 additions & 4 deletions scheduler/src/main/java/com/zfoo/scheduler/util/LazyCache.java
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
package com.zfoo.scheduler.util;

import com.zfoo.protocol.collection.CollectionUtils;
import com.zfoo.protocol.model.Pair;
import com.zfoo.protocol.util.AssertionUtils;

Expand Down Expand Up @@ -123,6 +124,16 @@ private void removeForCause(K key, RemovalCause removalCause) {
}
}

private void removeForCause(List<Pair<K, V>> list, RemovalCause removalCause) {
if (CollectionUtils.isEmpty(list)) {
return;
}
var removeList = list.stream()
.filter(it -> cacheMap.remove(it.getKey()) != null)
.toList();
removeListener.accept(removeList, removalCause);
}

public void forEach(BiConsumer<K, V> biConsumer) {
for (var entry : cacheMap.entrySet()) {
biConsumer.accept(entry.getKey(), entry.getValue().value);
Expand All @@ -143,8 +154,7 @@ private void checkMaximumSize() {
.limit(Math.max(0, cacheMap.size() - maximumSize))
.map(it -> new Pair<>(it.getKey(), it.getValue().value))
.toList();
removeList.forEach(it -> cacheMap.remove(it.getKey()));
removeListener.accept(removeList, RemovalCause.SIZE);
removeForCause(removeList, RemovalCause.SIZE);
}
}

Expand All @@ -166,8 +176,7 @@ private void checkExpire() {
minTimestamp = expireTime;
}
}
removeList.forEach(it -> cacheMap.remove(it.getKey()));
removeListener.accept(removeList, RemovalCause.EXPIRED);
removeForCause(removeList, RemovalCause.EXPIRED);
if (minTimestamp < Long.MAX_VALUE) {
this.minExpireTime = minTimestamp;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ public class LazyCacheTesting {
private static final BiConsumer<List<Pair<Integer, String>>, LazyCache.RemovalCause> myRemoveCallback = new BiConsumer<List<Pair<Integer, String>>, LazyCache.RemovalCause>() {
@Override
public void accept(List<Pair<Integer, String>> pairs, LazyCache.RemovalCause removalCause) {
for(var pair : pairs) {
for (var pair : pairs) {
logger.info("remove key:[{}] value:[{}] removalCause:[{}]", pair.getKey(), pair.getValue(), removalCause);
}
}
Expand All @@ -46,8 +46,10 @@ public void putTest() {
lazyCache.put(11, "k");
lazyCache.put(12, "l");
ThreadUtils.sleep(3000);
System.out.println("first ->");
lazyCache.put(13, "m");
ThreadUtils.sleep(3000);
System.out.println("second ->");
lazyCache.put(14, "n");
ThreadUtils.sleep(3000);
}
Expand Down Expand Up @@ -109,17 +111,18 @@ public void multipleThreadMaxSizeTest() {
for (int i = 0; i < executors.length; i++) {
executors[i] = Executors.newSingleThreadExecutor();
}
var lazyCache = new LazyCache<Integer, String>(1_00, 10000000 * TimeUtils.MILLIS_PER_SECOND, 5 * TimeUtils.MILLIS_PER_SECOND, myRemoveCallback);
var lazyCache = new LazyCache<Integer, String>(1_00, 10000000 * TimeUtils.MILLIS_PER_SECOND, 5 * TimeUtils.MILLIS_PER_SECOND, null);
for (int i = 0; i < executors.length; i++) {

var executor = executors[i];
int i1 = i;
executor.execute(new Runnable() {
@Override
public void run() {
var startIndex = i1 * 1_0000;
for (int j = i1 * 1_0000; j < startIndex + 1_0000; j++) {
lazyCache.put(j, String.valueOf(j));
while (true) {
for (int j = i1 * 1_0000; j < startIndex + 1_0000; j++) {
lazyCache.put(j, String.valueOf(j));
}
}
}
});
Expand All @@ -138,7 +141,7 @@ public void multipleThreadCheckIntervalTest() {
for (int i = 0; i < executors.length; i++) {
executors[i] = Executors.newSingleThreadExecutor();
}
var lazyCache = new LazyCache<Integer, String>(1_0000, 10000000 * TimeUtils.MILLIS_PER_SECOND, 0, myRemoveCallback);
var lazyCache = new LazyCache<Integer, String>(1_0000, 10000000 * TimeUtils.MILLIS_PER_SECOND, 0, null);
for (int i = 0; i < executors.length; i++) {

var executor = executors[i];
Expand All @@ -147,8 +150,11 @@ public void multipleThreadCheckIntervalTest() {
@Override
public void run() {
var startIndex = i1 * 1_0000;
for (int j = i1 * 1_0000; j < startIndex + 1_0000; j++) {
lazyCache.put(j, String.valueOf(j));
while (true) {
for (int j = i1 * 1_0000; j < startIndex + 1_0000; j++) {
lazyCache.put(j, String.valueOf(j));
}

}
}
});
Expand Down Expand Up @@ -197,7 +203,7 @@ public void multipleThreadNotExpiredTest() {
for (int i = 0; i < executors.length; i++) {
executors[i] = Executors.newSingleThreadExecutor();
}
var lazyCache = new LazyCache<Integer, String>(1_0000, 100000 * TimeUtils.MILLIS_PER_SECOND, 5 * TimeUtils.MILLIS_PER_SECOND, myRemoveCallback);
var lazyCache = new LazyCache<Integer, String>(1_0000, 100000 * TimeUtils.MILLIS_PER_SECOND, 5 * TimeUtils.MILLIS_PER_SECOND, null);
for (int i = 0; i < executors.length; i++) {

var executor = executors[i];
Expand Down

0 comments on commit 96485d1

Please sign in to comment.