Skip to content

Commit

Permalink
Improve security-crypto threadpool overflow handling (elastic#111369) (
Browse files Browse the repository at this point in the history
…elastic#111564)

Prior to this PR, when the security-crypto threadpool queue overflows and rejects API key hashing submissions, a toxic value (specifically, a future which will never be completed) is added to the API key auth cache. This toxic cache value causes future authentication attempts with that API key to fail by timeout, because they will attempt to wait for the toxic future, until that value is invalidated and removed from the cache. Additionally, this will hold on to memory for each request that waits on the toxic future, even after the request has timed out.

This PR adds a unit test to replicate this case, and adjusts the code which submits the key hashing task to the security-crypto threadpool to properly handle this point of failure by invalidating the cached future and notifying waiting handlers that the computation has failed.
  • Loading branch information
gwbrown authored Aug 2, 2024
1 parent dde00b6 commit 1a77947
Show file tree
Hide file tree
Showing 3 changed files with 106 additions and 2 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/111369.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
pr: 111369
summary: Improve security-crypto threadpool overflow handling
area: Authentication
type: bug
issues: []
Original file line number Diff line number Diff line change
Expand Up @@ -1315,7 +1315,18 @@ void validateApiKeyCredentials(
AuthenticationResult.unsuccessful("invalid credentials for API key [" + credentials.getId() + "]", null)
);
}
}, listener::onFailure));
}, exception -> {
// Crypto threadpool queue is full, invalidate this cache entry and make sure nothing is going to wait on it
logger.warn(
Strings.format(
"rejecting possibly valid API key authentication because the [%s] threadpool is full",
SECURITY_CRYPTO_THREAD_POOL_NAME
)
);
apiKeyAuthCache.invalidate(credentials.getId(), listenableCacheEntry);
listenableCacheEntry.onFailure(exception);
listener.onFailure(exception);
}));
}
} else {
verifyKeyAgainstHash(apiKeyDoc.hash, credentials, ActionListener.wrap(verified -> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,7 @@
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Semaphore;
Expand Down Expand Up @@ -230,6 +231,9 @@ public class ApiKeyServiceTests extends ESTestCase {
"search": [ {"names": ["logs"]} ],
"replication": [ {"names": ["archive"]} ]
}""");

private static final int TEST_THREADPOOL_QUEUE_SIZE = 1000;

private ThreadPool threadPool;
private Client client;
private SecurityIndexManager securityIndex;
Expand All @@ -245,7 +249,7 @@ public void createThreadPool() {
Settings.EMPTY,
SECURITY_CRYPTO_THREAD_POOL_NAME,
1,
1000,
TEST_THREADPOOL_QUEUE_SIZE,
"xpack.security.crypto.thread_pool",
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
)
Expand All @@ -268,6 +272,90 @@ public void setupMocks() {
doAnswer(invocation -> Instant.now()).when(clock).instant();
}

public void testFloodThreadpool() throws Exception {
// We're going to be blocking the security-crypto threadpool so we need a new one for the client
ThreadPool clientThreadpool = new TestThreadPool(
this.getTestName(),
new FixedExecutorBuilder(
Settings.EMPTY,
this.getTestName(),
1,
100,
"no_settings_used",
EsExecutors.TaskTrackingConfig.DO_NOT_TRACK
)
);
try {
when(client.threadPool()).thenReturn(clientThreadpool);

// setup copied from testAuthenticateWithApiKey
final Settings settings = Settings.builder().put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true).build();
final ApiKeyService service = createApiKeyService(settings);

final String id = randomAlphaOfLength(12);
final String key = randomAlphaOfLength(16);

final User user, authUser;
if (randomBoolean()) {
user = new User("hulk", new String[] { "superuser" }, "Bruce Banner", "[email protected]", Map.of(), true);
authUser = new User("authenticated_user", "other");
} else {
user = new User("hulk", new String[] { "superuser" }, "Bruce Banner", "[email protected]", Map.of(), true);
authUser = null;
}
final ApiKey.Type type = randomFrom(ApiKey.Type.values());
final Map<String, Object> metadata = mockKeyDocument(id, key, user, authUser, false, Duration.ofSeconds(3600), null, type);

// Block the security crypto threadpool
CyclicBarrier barrier = new CyclicBarrier(2);
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(() -> safeAwait(barrier));
// Now fill it up while the one thread is blocked
for (int i = 0; i < TEST_THREADPOOL_QUEUE_SIZE; i++) {
threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(() -> {});
}

// Check that it's full
for (var stat : threadPool.stats().stats()) {
if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) {
assertThat(stat.queue(), equalTo(TEST_THREADPOOL_QUEUE_SIZE));
assertThat(stat.rejected(), equalTo(0L));
}
}

// now try to auth with an API key
final AuthenticationResult<User> auth = tryAuthenticate(service, id, key, type);
assertThat(auth.getStatus(), is(AuthenticationResult.Status.TERMINATE));

// Make sure one was rejected and the queue is still full
for (var stat : threadPool.stats().stats()) {
if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) {
assertThat(stat.queue(), equalTo(TEST_THREADPOOL_QUEUE_SIZE));
assertThat(stat.rejected(), equalTo(1L));
}
}
ListenableFuture<CachedApiKeyHashResult> cachedValue = service.getApiKeyAuthCache().get(id);
assertThat("since the request was rejected, there should be no cache entry for this key", cachedValue, nullValue());

// unblock the threadpool
safeAwait(barrier);

// wait for the threadpool queue to drain & check that the stats as as expected
flushThreadPoolExecutor(threadPool, SECURITY_CRYPTO_THREAD_POOL_NAME);
for (var stat : threadPool.stats().stats()) {
if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) {
assertThat(stat.rejected(), equalTo(1L));
assertThat(stat.queue(), equalTo(0));
}
}

// try to authenticate again with the same key - if this hangs, check the future caching
final AuthenticationResult<User> shouldSucceed = tryAuthenticate(service, id, key, type);
assertThat(shouldSucceed.getStatus(), is(AuthenticationResult.Status.SUCCESS));
} finally {
terminate(clientThreadpool);
}
}

public void testCreateApiKeyUsesBulkIndexAction() throws Exception {
final Settings settings = Settings.builder().put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true).build();
final ApiKeyService service = createApiKeyService(settings);
Expand Down

0 comments on commit 1a77947

Please sign in to comment.