From b033248a20351fe837be1600b082178c205d0807 Mon Sep 17 00:00:00 2001 From: Finn Carroll Date: Tue, 5 Nov 2024 11:24:02 -0800 Subject: [PATCH] Move fetchBlob to PrivilegedExceptionAction. Catch and unwrap IOException. Signed-off-by: Finn Carroll --- .../store/remote/utils/TransferManager.java | 48 ++++++++++--------- 1 file changed, 26 insertions(+), 22 deletions(-) diff --git a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java index 09355c18ace5a..110fcf0fd4b4b 100644 --- a/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java +++ b/server/src/main/java/org/opensearch/index/store/remote/utils/TransferManager.java @@ -25,6 +25,8 @@ import java.nio.file.Path; import java.security.AccessController; import java.security.PrivilegedAction; +import java.security.PrivilegedActionException; +import java.security.PrivilegedExceptionAction; import java.util.concurrent.CompletableFuture; import java.util.concurrent.CompletionException; import java.util.concurrent.atomic.AtomicBoolean; @@ -70,30 +72,32 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio final Path key = blobFetchRequest.getFilePath(); logger.trace("fetchBlob called for {}", key.toString()); - return AccessController.doPrivileged((PrivilegedAction) () -> { - CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { - if (cachedIndexInput == null || cachedIndexInput.isClosed()) { - logger.trace("Transfer Manager - IndexInput closed or not in cache"); - // Doesn't exist or is closed, either way create a new one - return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); - } else { - logger.trace("Transfer Manager - Already in cache"); - // already in the cache and ready to be used (open) - return cachedIndexInput; + try { + return AccessController.doPrivileged((PrivilegedExceptionAction) () -> { + CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> { + if (cachedIndexInput == null || cachedIndexInput.isClosed()) { + logger.trace("Transfer Manager - IndexInput closed or not in cache"); + // Doesn't exist or is closed, either way create a new one + return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest); + } else { + logger.trace("Transfer Manager - Already in cache"); + // already in the cache and ready to be used (open) + return cachedIndexInput; + } + }); + + // Cache entry was either retrieved from the cache or newly added, either + // way the reference count has been incremented by one. We can only + // decrement this reference _after_ creating the clone to be returned. + try { + return cacheEntry.getIndexInput().clone(); + } finally { + fileCache.decRef(key); } }); - - // Cache entry was either retrieved from the cache or newly added, either - // way the reference count has been incremented by one. We can only - // decrement this reference _after_ creating the clone to be returned. - try { - return cacheEntry.getIndexInput().clone(); - } catch (IOException e) { - throw new RuntimeException(e); - } finally { - fileCache.decRef(key); - } - }); + } catch (PrivilegedActionException e) { + throw (IOException) e.getException(); + } } @SuppressWarnings("removal")