From 2388c4ad9ccb44704bee0186232a0f3852285693 Mon Sep 17 00:00:00 2001 From: Samuel Vazquez Date: Fri, 12 Jul 2024 15:54:21 -0700 Subject: [PATCH] feat(batching): v6 synchronize executions when attempting to remove an entry (#2014) ### :pencil: Description cherry pick https://github.com/ExpediaGroup/graphql-kotlin/pull/2013 --------- Co-authored-by: Samuel Vazquez --- .../extensions/CompletableFutureExtensions.kt | 20 +++---- .../state/SyncExecutionExhaustedState.kt | 52 +++++++------------ ...ncExecutionExhaustedInstrumentationTest.kt | 8 +-- 3 files changed, 34 insertions(+), 46 deletions(-) diff --git a/executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/extensions/CompletableFutureExtensions.kt b/executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/extensions/CompletableFutureExtensions.kt index 98df382ed5..52fb37f17f 100644 --- a/executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/extensions/CompletableFutureExtensions.kt +++ b/executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/extensions/CompletableFutureExtensions.kt @@ -37,23 +37,25 @@ fun CompletableFuture.dispatchIfNeeded( val dataLoaderRegistry = environment.dataLoaderRegistry as? KotlinDataLoaderRegistry ?: throw MissingKotlinDataLoaderRegistryException() if (dataLoaderRegistry.dataLoadersInvokedOnDispatch()) { - val cantContinueExecution = when { + when { environment.graphQlContext.hasKey(ExecutionLevelDispatchedState::class) -> { - environment - .graphQlContext.get(ExecutionLevelDispatchedState::class) - .allExecutionsDispatched(Level(environment.executionStepInfo.path.level)) + val cantContinueExecution = + environment + .graphQlContext.get(ExecutionLevelDispatchedState::class) + .allExecutionsDispatched(Level(environment.executionStepInfo.path.level)) + if (cantContinueExecution) { + dataLoaderRegistry.dispatchAll() + } } environment.graphQlContext.hasKey(SyncExecutionExhaustedState::class) -> { environment .graphQlContext.get(SyncExecutionExhaustedState::class) - .allSyncExecutionsExhausted() + .ifAllSyncExecutionsExhausted { + dataLoaderRegistry.dispatchAll() + } } else -> throw MissingInstrumentationStateException() } - - if (cantContinueExecution) { - dataLoaderRegistry.dispatchAll() - } } return this } diff --git a/executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/state/SyncExecutionExhaustedState.kt b/executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/state/SyncExecutionExhaustedState.kt index b5fbc4bd2b..3105b2e2f8 100644 --- a/executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/state/SyncExecutionExhaustedState.kt +++ b/executions/graphql-kotlin-dataloader-instrumentation/src/main/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/state/SyncExecutionExhaustedState.kt @@ -46,20 +46,6 @@ class SyncExecutionExhaustedState( private val totalExecutions: AtomicReference = AtomicReference(totalOperations) val executions = ConcurrentHashMap() - /** - * Remove an [ExecutionBatchState] from the state in case operation does not qualify for starting an execution, - * for example: - * - parsing, validation errors - * - persisted query errors - * - an exception during execution was thrown - */ - private fun removeExecution(executionId: ExecutionId) { - if (executions.containsKey(executionId)) { - executions.remove(executionId) - totalExecutions.set(totalExecutions.get() - 1) - } - } - /** * Create the [ExecutionBatchState] When a specific [ExecutionInput] starts his execution * @@ -84,11 +70,12 @@ class SyncExecutionExhaustedState( override fun onCompleted(result: ExecutionResult?, t: Throwable?) { if ((result != null && result.errors.size > 0) || t != null) { if (executions.containsKey(parameters.executionInput.executionId)) { - executions.remove(parameters.executionInput.executionId) - totalExecutions.set(totalExecutions.get() - 1) - val allSyncExecutionsExhausted = allSyncExecutionsExhausted() - if (allSyncExecutionsExhausted) { - onSyncExecutionExhausted(executions.keys().toList()) + synchronized(executions) { + executions.remove(parameters.executionInput.executionId) + totalExecutions.set(totalExecutions.get() - 1) + } + ifAllSyncExecutionsExhausted { executionIds -> + onSyncExecutionExhausted(executionIds) } } } @@ -147,9 +134,8 @@ class SyncExecutionExhaustedState( executionState } - val allSyncExecutionsExhausted = allSyncExecutionsExhausted() - if (allSyncExecutionsExhausted) { - onSyncExecutionExhausted(executions.keys().toList()) + ifAllSyncExecutionsExhausted { executionIds -> + onSyncExecutionExhausted(executionIds) } } override fun onCompleted(result: Any?, t: Throwable?) { @@ -158,26 +144,26 @@ class SyncExecutionExhaustedState( executionState } - val allSyncExecutionsExhausted = allSyncExecutionsExhausted() - if (allSyncExecutionsExhausted) { - onSyncExecutionExhausted(executions.keys().toList()) + ifAllSyncExecutionsExhausted { executionIds -> + onSyncExecutionExhausted(executionIds) } } } } /** - * Provide the information about when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution + * execute a given [predicate] when all [ExecutionInput] sharing a [GraphQLContext] exhausted their execution. * A Synchronous Execution is considered Exhausted when all [DataFetcher]s of all paths were executed up until * a scalar leaf or a [DataFetcher] that returns a [CompletableFuture] */ - fun allSyncExecutionsExhausted(): Boolean = synchronized(executions) { - val operationsToExecute = totalExecutions.get() - when { - executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled() -> false - else -> { - executions.values.all(ExecutionBatchState::isSyncExecutionExhausted) + fun ifAllSyncExecutionsExhausted(predicate: (List) -> Unit) = + synchronized(executions) { + val operationsToExecute = totalExecutions.get() + if (executions.size < operationsToExecute || !dataLoaderRegistry.onDispatchFuturesHandled()) + return@synchronized + + if (executions.values.all(ExecutionBatchState::isSyncExecutionExhausted)) { + predicate(executions.keys().toList()) } } - } } diff --git a/executions/graphql-kotlin-dataloader-instrumentation/src/test/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentationTest.kt b/executions/graphql-kotlin-dataloader-instrumentation/src/test/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentationTest.kt index 5f5a8b74fd..89c4fa5d6b 100644 --- a/executions/graphql-kotlin-dataloader-instrumentation/src/test/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentationTest.kt +++ b/executions/graphql-kotlin-dataloader-instrumentation/src/test/kotlin/com/expediagroup/graphql/dataloader/instrumentation/syncexhaustion/DataLoaderSyncExecutionExhaustedInstrumentationTest.kt @@ -616,9 +616,9 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest { fun `Instrumentation should not consider executions that thrown exceptions`() { val executions = listOf( ExecutionInput.newExecutionInput("query test1 { astronaut(id: 1) { id name } }").operationName("test1").build(), - ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("test2").build(), - ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("test3").build(), - ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build() + ExecutionInput.newExecutionInput("query test2 { astronaut(id: 2) { id name } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(), + ExecutionInput.newExecutionInput("query test3 { mission(id: 3) { id designation } }").operationName("OPERATION_NOT_IN_DOCUMENT").build(), + ExecutionInput.newExecutionInput("query test4 { mission(id: 4) { designation } }").operationName("test4").build() ) val (results, kotlinDataLoaderRegistry) = AstronautGraphQL.execute( @@ -633,7 +633,7 @@ class DataLoaderSyncExecutionExhaustedInstrumentationTest { val missionStatistics = kotlinDataLoaderRegistry.dataLoadersMap["MissionDataLoader"]?.statistics assertEquals(1, astronautStatistics?.batchInvokeCount) - assertEquals(2, astronautStatistics?.batchLoadCount) + assertEquals(1, astronautStatistics?.batchLoadCount) assertEquals(1, missionStatistics?.batchInvokeCount) assertEquals(1, missionStatistics?.batchLoadCount)