Skip to content

Commit

Permalink
Move fetchBlob to PrivilegedExceptionAction. Catch and unwrap IOExcep…
Browse files Browse the repository at this point in the history
…tion.

Signed-off-by: Finn Carroll <[email protected]>
  • Loading branch information
finnegancarroll committed Nov 5, 2024
1 parent 1fdfb53 commit b033248
Showing 1 changed file with 26 additions and 22 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,8 @@
import java.nio.file.Path;
import java.security.AccessController;
import java.security.PrivilegedAction;
import java.security.PrivilegedActionException;
import java.security.PrivilegedExceptionAction;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.atomic.AtomicBoolean;
Expand Down Expand Up @@ -70,30 +72,32 @@ public IndexInput fetchBlob(BlobFetchRequest blobFetchRequest) throws IOExceptio
final Path key = blobFetchRequest.getFilePath();
logger.trace("fetchBlob called for {}", key.toString());

return AccessController.doPrivileged((PrivilegedAction<IndexInput>) () -> {
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
try {
return AccessController.doPrivileged((PrivilegedExceptionAction<IndexInput>) () -> {
CachedIndexInput cacheEntry = fileCache.compute(key, (path, cachedIndexInput) -> {
if (cachedIndexInput == null || cachedIndexInput.isClosed()) {
logger.trace("Transfer Manager - IndexInput closed or not in cache");
// Doesn't exist or is closed, either way create a new one
return new DelayedCreationCachedIndexInput(fileCache, streamReader, blobFetchRequest);
} else {
logger.trace("Transfer Manager - Already in cache");
// already in the cache and ready to be used (open)
return cachedIndexInput;
}
});

// Cache entry was either retrieved from the cache or newly added, either
// way the reference count has been incremented by one. We can only
// decrement this reference _after_ creating the clone to be returned.
try {
return cacheEntry.getIndexInput().clone();
} finally {
fileCache.decRef(key);
}
});

// Cache entry was either retrieved from the cache or newly added, either
// way the reference count has been incremented by one. We can only
// decrement this reference _after_ creating the clone to be returned.
try {
return cacheEntry.getIndexInput().clone();
} catch (IOException e) {
throw new RuntimeException(e);
} finally {
fileCache.decRef(key);
}
});
} catch (PrivilegedActionException e) {
throw (IOException) e.getException();
}
}

@SuppressWarnings("removal")
Expand Down

0 comments on commit b033248

Please sign in to comment.