From ece555ecf2c574340922cd37e742049784045c0d Mon Sep 17 00:00:00 2001 From: Athena Brown Date: Fri, 2 Aug 2024 14:50:22 -0600 Subject: [PATCH] Improve security-crypto threadpool overflow handling (#111369) Prior to this PR, when the security-crypto threadpool queue overflows and rejects API key hashing submissions, a toxic value (specifically, a future which will never be completed) is added to the API key auth cache. Until that value is invalidated and removed from the cache, this toxic cache value causes subsequent authentication attempts with that API key to fail by timeout, because they will attempt to wait for the toxic future. Additionally, this will hold on to memory for each request that waits on the toxic future, even after the request has timed out. This PR adds a unit test to replicate this case, and adjusts the code which submits the key hashing task to the security-crypto threadpool to properly handle this point of failure by invalidating the cached future and notifying waiting handlers that the computation has failed. 
--- docs/changelog/111369.yaml | 5 ++ .../xpack/security/authc/ApiKeyService.java | 13 ++- .../security/authc/ApiKeyServiceTests.java | 90 ++++++++++++++++++- 3 files changed, 106 insertions(+), 2 deletions(-) create mode 100644 docs/changelog/111369.yaml diff --git a/docs/changelog/111369.yaml b/docs/changelog/111369.yaml new file mode 100644 index 0000000000000..1a638abea4e1d --- /dev/null +++ b/docs/changelog/111369.yaml @@ -0,0 +1,5 @@ +pr: 111369 +summary: Improve security-crypto threadpool overflow handling +area: Authentication +type: bug +issues: [] diff --git a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java index aaa1841bd2354..d88577f905e96 100644 --- a/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java +++ b/x-pack/plugin/security/src/main/java/org/elasticsearch/xpack/security/authc/ApiKeyService.java @@ -1315,7 +1315,18 @@ void validateApiKeyCredentials( AuthenticationResult.unsuccessful("invalid credentials for API key [" + credentials.getId() + "]", null) ); } - }, listener::onFailure)); + }, exception -> { + // Crypto threadpool queue is full, invalidate this cache entry and make sure nothing is going to wait on it + logger.warn( + Strings.format( + "rejecting possibly valid API key authentication because the [%s] threadpool is full", + SECURITY_CRYPTO_THREAD_POOL_NAME + ) + ); + apiKeyAuthCache.invalidate(credentials.getId(), listenableCacheEntry); + listenableCacheEntry.onFailure(exception); + listener.onFailure(exception); + })); } } else { verifyKeyAgainstHash(apiKeyDoc.hash, credentials, ActionListener.wrap(verified -> { diff --git a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java index 
1dce6a038638b..f4d75434b92de 100644 --- a/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java +++ b/x-pack/plugin/security/src/test/java/org/elasticsearch/xpack/security/authc/ApiKeyServiceTests.java @@ -146,6 +146,7 @@ import java.util.Map; import java.util.Set; import java.util.concurrent.CompletableFuture; +import java.util.concurrent.CyclicBarrier; import java.util.concurrent.ExecutionException; import java.util.concurrent.ExecutorService; import java.util.concurrent.Semaphore; @@ -230,6 +231,9 @@ public class ApiKeyServiceTests extends ESTestCase { "search": [ {"names": ["logs"]} ], "replication": [ {"names": ["archive"]} ] }"""); + + private static final int TEST_THREADPOOL_QUEUE_SIZE = 1000; + private ThreadPool threadPool; private Client client; private SecurityIndexManager securityIndex; @@ -245,7 +249,7 @@ public void createThreadPool() { Settings.EMPTY, SECURITY_CRYPTO_THREAD_POOL_NAME, 1, - 1000, + TEST_THREADPOOL_QUEUE_SIZE, "xpack.security.crypto.thread_pool", EsExecutors.TaskTrackingConfig.DO_NOT_TRACK ) @@ -268,6 +272,90 @@ public void setupMocks() { doAnswer(invocation -> Instant.now()).when(clock).instant(); } + public void testFloodThreadpool() throws Exception { + // We're going to be blocking the security-crypto threadpool so we need a new one for the client + ThreadPool clientThreadpool = new TestThreadPool( + this.getTestName(), + new FixedExecutorBuilder( + Settings.EMPTY, + this.getTestName(), + 1, + 100, + "no_settings_used", + EsExecutors.TaskTrackingConfig.DO_NOT_TRACK + ) + ); + try { + when(client.threadPool()).thenReturn(clientThreadpool); + + // setup copied from testAuthenticateWithApiKey + final Settings settings = Settings.builder().put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true).build(); + final ApiKeyService service = createApiKeyService(settings); + + final String id = randomAlphaOfLength(12); + final String key = randomAlphaOfLength(16); + + final User user, 
authUser; + if (randomBoolean()) { + user = new User("hulk", new String[] { "superuser" }, "Bruce Banner", "hulk@test.com", Map.of(), true); + authUser = new User("authenticated_user", "other"); + } else { + user = new User("hulk", new String[] { "superuser" }, "Bruce Banner", "hulk@test.com", Map.of(), true); + authUser = null; + } + final ApiKey.Type type = randomFrom(ApiKey.Type.values()); + final Map metadata = mockKeyDocument(id, key, user, authUser, false, Duration.ofSeconds(3600), null, type); + + // Block the security crypto threadpool + CyclicBarrier barrier = new CyclicBarrier(2); + threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(() -> safeAwait(barrier)); + // Now fill it up while the one thread is blocked + for (int i = 0; i < TEST_THREADPOOL_QUEUE_SIZE; i++) { + threadPool.executor(SECURITY_CRYPTO_THREAD_POOL_NAME).execute(() -> {}); + } + + // Check that it's full + for (var stat : threadPool.stats().stats()) { + if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) { + assertThat(stat.queue(), equalTo(TEST_THREADPOOL_QUEUE_SIZE)); + assertThat(stat.rejected(), equalTo(0L)); + } + } + + // now try to auth with an API key + final AuthenticationResult auth = tryAuthenticate(service, id, key, type); + assertThat(auth.getStatus(), is(AuthenticationResult.Status.TERMINATE)); + + // Make sure one was rejected and the queue is still full + for (var stat : threadPool.stats().stats()) { + if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) { + assertThat(stat.queue(), equalTo(TEST_THREADPOOL_QUEUE_SIZE)); + assertThat(stat.rejected(), equalTo(1L)); + } + } + ListenableFuture cachedValue = service.getApiKeyAuthCache().get(id); + assertThat("since the request was rejected, there should be no cache entry for this key", cachedValue, nullValue()); + + // unblock the threadpool + safeAwait(barrier); + + // wait for the threadpool queue to drain & check that the stats are as expected + flushThreadPoolExecutor(threadPool, 
SECURITY_CRYPTO_THREAD_POOL_NAME); + for (var stat : threadPool.stats().stats()) { + if (stat.name().equals(SECURITY_CRYPTO_THREAD_POOL_NAME)) { + assertThat(stat.rejected(), equalTo(1L)); + assertThat(stat.queue(), equalTo(0)); + } + } + + // try to authenticate again with the same key - if this hangs, check the future caching + final AuthenticationResult shouldSucceed = tryAuthenticate(service, id, key, type); + assertThat(shouldSucceed.getStatus(), is(AuthenticationResult.Status.SUCCESS)); + } finally { + terminate(clientThreadpool); + } + } + public void testCreateApiKeyUsesBulkIndexAction() throws Exception { final Settings settings = Settings.builder().put(XPackSettings.API_KEY_SERVICE_ENABLED_SETTING.getKey(), true).build(); final ApiKeyService service = createApiKeyService(settings);