From 97c2132cabf76d79ae1de27e302de2ff470fd7eb Mon Sep 17 00:00:00 2001
From: "opensearch-trigger-bot[bot]"
 <98922864+opensearch-trigger-bot[bot]@users.noreply.github.com>
Date: Thu, 6 Jun 2024 19:26:03 -0700
Subject: [PATCH] [Caching] Move cache removal notifications outside lru lock
 (#14017) (#14054)

---------


(cherry picked from commit 1852b7e4fe62711ac86107ee68823878fdf56bb4)

Signed-off-by: Sagar Upadhyaya <sagar.upadhyaya.121@gmail.com>
Signed-off-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
---
 CHANGELOG.md                                  |  1 +
 .../org/opensearch/common/cache/Cache.java    | 53 ++++++++++++++++---
 .../common/cache/RemovalListener.java         |  5 ++
 3 files changed, 52 insertions(+), 7 deletions(-)

diff --git a/CHANGELOG.md b/CHANGELOG.md
index 9ef586bef7528..5f71b3034fcd1 100644
--- a/CHANGELOG.md
+++ b/CHANGELOG.md
@@ -36,6 +36,7 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),
 - Add ability for Boolean and date field queries to run when only doc_values are enabled ([#11650](https://github.com/opensearch-project/OpenSearch/pull/11650))
 - Refactor implementations of query phase searcher, allow QueryCollectorContext to have zero collectors ([#13481](https://github.com/opensearch-project/OpenSearch/pull/13481))
 - Adds support to inject telemetry instances to plugins ([#13636](https://github.com/opensearch-project/OpenSearch/pull/13636))
+- Move cache removal notifications outside lru lock ([#14017](https://github.com/opensearch-project/OpenSearch/pull/14017))
 
 ### Deprecated
 
diff --git a/server/src/main/java/org/opensearch/common/cache/Cache.java b/server/src/main/java/org/opensearch/common/cache/Cache.java
index 6d346de25cadf..caae81e4387b4 100644
--- a/server/src/main/java/org/opensearch/common/cache/Cache.java
+++ b/server/src/main/java/org/opensearch/common/cache/Cache.java
@@ -36,9 +36,11 @@
 import org.opensearch.common.collect.Tuple;
 import org.opensearch.common.util.concurrent.ReleasableLock;
 
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.HashMap;
 import java.util.Iterator;
+import java.util.List;
 import java.util.Map;
 import java.util.Objects;
 import java.util.concurrent.CompletableFuture;
@@ -396,7 +398,12 @@ private V get(K key, long now, Consumer<Entry<K, V>> onExpiration) {
         if (entry == null) {
             return null;
         } else {
-            promote(entry, now);
+            List<RemovalNotification<K, V>> removalNotifications = promote(entry, now).v2();
+            if (!removalNotifications.isEmpty()) {
+                for (RemovalNotification<K, V> removalNotification : removalNotifications) {
+                    removalListener.onRemoval(removalNotification);
+                }
+            }
             return entry.value;
         }
     }
@@ -446,8 +453,14 @@ private V compute(K key, CacheLoader<K, V> loader) throws ExecutionException {
 
         BiFunction<? super Entry<K, V>, Throwable, ? extends V> handler = (ok, ex) -> {
             if (ok != null) {
+                List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
                 try (ReleasableLock ignored = lruLock.acquire()) {
-                    promote(ok, now);
+                    removalNotifications = promote(ok, now).v2();
+                }
+                if (!removalNotifications.isEmpty()) {
+                    for (RemovalNotification<K, V> removalNotification : removalNotifications) {
+                        removalListener.onRemoval(removalNotification);
+                    }
                 }
                 return ok.value;
             } else {
@@ -512,16 +525,22 @@ private void put(K key, V value, long now) {
         CacheSegment<K, V> segment = getCacheSegment(key);
         Tuple<Entry<K, V>, Entry<K, V>> tuple = segment.put(key, value, now);
         boolean replaced = false;
+        List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
         try (ReleasableLock ignored = lruLock.acquire()) {
             if (tuple.v2() != null && tuple.v2().state == State.EXISTING) {
                 if (unlink(tuple.v2())) {
                     replaced = true;
                 }
             }
-            promote(tuple.v1(), now);
+            removalNotifications = promote(tuple.v1(), now).v2();
         }
         if (replaced) {
-            removalListener.onRemoval(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
+            removalNotifications.add(new RemovalNotification<>(tuple.v2().key, tuple.v2().value, RemovalReason.REPLACED));
+        }
+        if (!removalNotifications.isEmpty()) {
+            for (RemovalNotification<K, V> removalNotification : removalNotifications) {
+                removalListener.onRemoval(removalNotification);
+            }
         }
     }
 
@@ -767,8 +786,17 @@ public long getEvictions() {
         }
     }
 
-    private boolean promote(Entry<K, V> entry, long now) {
+    /**
+     * Promotes the given entry to the head of the LRU list and, if it was promoted, evicts entries from the tail
+     * for as long as they should be pruned (e.g. the cache size is exceeded or the entry has expired).
+     * @param entry the entry to be promoted
+     * @param now the current time
+     * @return a tuple: v1 indicates whether the entry was promoted, v2 is the list of removal notifications
+     * that the caller needs to handle.
+     */
+    private Tuple<Boolean, List<RemovalNotification<K, V>>> promote(Entry<K, V> entry, long now) {
         boolean promoted = true;
+        List<RemovalNotification<K, V>> removalNotifications = new ArrayList<>();
         try (ReleasableLock ignored = lruLock.acquire()) {
             switch (entry.state) {
                 case DELETED:
@@ -782,10 +810,21 @@ private boolean promote(Entry<K, V> entry, long now) {
                     break;
             }
             if (promoted) {
-                evict(now);
+                while (tail != null && shouldPrune(tail, now)) {
+                    Entry<K, V> entryToBeRemoved = tail;
+                    CacheSegment<K, V> segment = getCacheSegment(entryToBeRemoved.key);
+                    if (segment != null) {
+                        segment.remove(entryToBeRemoved.key, entryToBeRemoved.value, f -> {});
+                    }
+                    if (unlink(entryToBeRemoved)) {
+                        removalNotifications.add(
+                            new RemovalNotification<>(entryToBeRemoved.key, entryToBeRemoved.value, RemovalReason.EVICTED)
+                        );
+                    }
+                }
             }
         }
-        return promoted;
+        return new Tuple<>(promoted, removalNotifications);
     }
 
     private void evict(long now) {
diff --git a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
index 68e1cdf6139e2..eaaaec2bb07e0 100644
--- a/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
+++ b/server/src/main/java/org/opensearch/common/cache/RemovalListener.java
@@ -42,5 +42,10 @@
 @ExperimentalApi
 @FunctionalInterface
 public interface RemovalListener<K, V> {
+
+    /**
+     * Handles a removal notification. May be called from multiple threads at once, so implementations must be thread-safe.
+     * @param notification the removal notification for the removed entry.
+     */
     void onRemoval(RemovalNotification<K, V> notification);
 }