Skip to content

Commit

Permalink
Changing listener logic, removing tiered cache integration with IRC
Browse files Browse the repository at this point in the history
Signed-off-by: Sagar Upadhyaya <[email protected]>
  • Loading branch information
sgup432 committed Dec 21, 2023
1 parent 13a03a9 commit 42a111d
Show file tree
Hide file tree
Showing 20 changed files with 159 additions and 487 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,16 +6,16 @@
* compatible open source license.
*/

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.LoadAwareCacheLoader;
package org.opensearch.common.cache;

/**
* Represents a cache interface.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface Cache<K, V> {
public interface ICache<K, V> {
V get(K key);

void put(K key, V value);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,20 +15,20 @@
import org.opensearch.common.cache.RemovalNotification;
import org.opensearch.common.cache.store.builders.StoreAwareCacheBuilder;
import org.opensearch.common.cache.store.enums.CacheStoreType;
import org.opensearch.common.cache.store.listeners.EventType;
import org.opensearch.common.cache.store.listeners.dispatchers.StoreAwareCacheEventListenerDispatcher;
import org.opensearch.common.cache.store.listeners.dispatchers.StoreAwareCacheListenerDispatcherDefaultImpl;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;

/**
* This variant of on-heap cache uses OpenSearch custom cache implementation.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public class OpenSearchOnHeapCache<K, V> implements StoreAwareCache<K, V>, RemovalListener<K, V> {

private final Cache<K, V> cache;

private final StoreAwareCacheEventListenerDispatcher<K, V> eventDispatcher;
private final StoreAwareCacheEventListener<K, V> eventListener;

public OpenSearchOnHeapCache(Builder<K, V> builder) {
CacheBuilder<K, V> cacheBuilder = CacheBuilder.<K, V>builder()
Expand All @@ -39,34 +39,34 @@ public OpenSearchOnHeapCache(Builder<K, V> builder) {
cacheBuilder.setExpireAfterAccess(builder.getExpireAfterAcess());
}
cache = cacheBuilder.build();
this.eventDispatcher = new StoreAwareCacheListenerDispatcherDefaultImpl<>(builder.getListenerConfiguration());
this.eventListener = builder.getEventListener();
}

@Override
public V get(K key) {
V value = cache.get(key);
if (value != null) {
eventDispatcher.dispatch(key, value, CacheStoreType.ON_HEAP, EventType.ON_HIT);
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventDispatcher.dispatch(key, null, CacheStoreType.ON_HEAP, EventType.ON_MISS);
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
}
return value;
}

@Override
public void put(K key, V value) {
cache.put(key, value);
eventDispatcher.dispatch(key, value, CacheStoreType.ON_HEAP, EventType.ON_CACHED);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}

@Override
public V computeIfAbsent(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
V value = cache.computeIfAbsent(key, key1 -> loader.load(key));
if (!loader.isLoaded()) {
eventDispatcher.dispatch(key, value, CacheStoreType.ON_HEAP, EventType.ON_HIT);
eventListener.onHit(key, value, CacheStoreType.ON_HEAP);
} else {
eventDispatcher.dispatch(key, value, CacheStoreType.ON_HEAP, EventType.ON_MISS);
eventDispatcher.dispatch(key, value, CacheStoreType.ON_HEAP, EventType.ON_CACHED);
eventListener.onMiss(key, CacheStoreType.ON_HEAP);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
}
return value;
}
Expand All @@ -79,7 +79,7 @@ public void invalidate(K key) {
@Override
public V compute(K key, LoadAwareCacheLoader<K, V> loader) throws Exception {
V value = cache.compute(key, key1 -> loader.load(key));
eventDispatcher.dispatch(key, value, CacheStoreType.ON_HEAP, EventType.ON_CACHED);
eventListener.onCached(key, value, CacheStoreType.ON_HEAP);
return value;
}

Expand Down Expand Up @@ -110,7 +110,7 @@ public CacheStoreType getTierType() {

@Override
public void onRemoval(RemovalNotification<K, V> notification) {
eventDispatcher.dispatchRemovalEvent(
eventListener.onRemoval(
new StoreAwareCacheRemovalNotification<>(
notification.getKey(),
notification.getValue(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,13 +8,16 @@

package org.opensearch.common.cache.store;

import org.opensearch.common.cache.ICache;
import org.opensearch.common.cache.store.enums.CacheStoreType;

/**
* Represents a cache with a specific type of store like onHeap, disk etc.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.experimental
*/
public interface StoreAwareCache<K, V> extends Cache<K, V> {
public interface StoreAwareCache<K, V> extends ICache<K, V> {
CacheStoreType getTierType();
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,8 @@
* Removal notification for store aware cache.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public class StoreAwareCacheRemovalNotification<K, V> extends RemovalNotification<K, V> {
private final CacheStoreType cacheStoreType;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@
/**
* Represents a store aware cache value.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public class StoreAwareCacheValue<V> {
private final V value;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@
package org.opensearch.common.cache.store.builders;

import org.opensearch.common.cache.store.StoreAwareCache;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListenerConfiguration;
import org.opensearch.common.cache.store.listeners.StoreAwareCacheEventListener;
import org.opensearch.common.unit.TimeValue;

import java.util.function.ToLongBiFunction;
Expand All @@ -18,6 +18,8 @@
* Builder for store aware cache.
* @param <K> Type of key.
* @param <V> Type of value.
*
* @opensearch.internal
*/
public abstract class StoreAwareCacheBuilder<K, V> {

Expand All @@ -27,7 +29,7 @@ public abstract class StoreAwareCacheBuilder<K, V> {

private TimeValue expireAfterAcess;

private StoreAwareCacheEventListenerConfiguration<K, V> listenerConfiguration;
private StoreAwareCacheEventListener<K, V> eventListener;

public StoreAwareCacheBuilder() {}

Expand All @@ -46,10 +48,8 @@ public StoreAwareCacheBuilder<K, V> setExpireAfterAccess(TimeValue expireAfterAc
return this;
}

public StoreAwareCacheBuilder<K, V> setEventListenerConfiguration(
StoreAwareCacheEventListenerConfiguration<K, V> listenerConfiguration
) {
this.listenerConfiguration = listenerConfiguration;
public StoreAwareCacheBuilder<K, V> setEventListener(StoreAwareCacheEventListener<K, V> eventListener) {
this.eventListener = eventListener;
return this;
}

Expand All @@ -65,8 +65,8 @@ public ToLongBiFunction<K, V> getWeigher() {
return weigher;
}

public StoreAwareCacheEventListenerConfiguration<K, V> getListenerConfiguration() {
return listenerConfiguration;
public StoreAwareCacheEventListener<K, V> getEventListener() {
return eventListener;
}

public abstract StoreAwareCache<K, V> build();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

/**
* Cache store types in tiered cache.
*
* @opensearch.internal
*/
public enum CacheStoreType {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@

/**
* Describes various event types which is used to notify the called via listener.
*
* @opensearch.internal
*/
public enum EventType {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@
* This can be used to listen to tiered caching events
* @param <K> Type of key
* @param <V> Type of value
*
* @opensearch.internal
*/
public interface StoreAwareCacheEventListener<K, V> {

Expand Down

This file was deleted.

This file was deleted.

This file was deleted.

This file was deleted.

Loading

0 comments on commit 42a111d

Please sign in to comment.