From c1f0054d3de63417b53b4a24700834285cf53026 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 4 Sep 2024 12:03:49 -0700 Subject: [PATCH] Use SQL thread pool --- .../spark/scheduler/job/ScheduledAsyncQueryJobRunner.java | 6 +++++- .../scheduler/job/ScheduledAsyncQueryJobRunnerTest.java | 7 +++++-- 2 files changed, 10 insertions(+), 3 deletions(-) diff --git a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java index 1134c05039..3652acf295 100644 --- a/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java +++ b/async-query/src/main/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunner.java @@ -13,6 +13,7 @@ import org.opensearch.jobscheduler.spi.ScheduledJobParameter; import org.opensearch.jobscheduler.spi.ScheduledJobRunner; import org.opensearch.plugins.Plugin; +import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; @@ -34,6 +35,9 @@ * plugin. */ public class ScheduledAsyncQueryJobRunner implements ScheduledJobRunner { + // Share SQL plugin thread pool + private static final String ASYNC_QUERY_THREAD_POOL_NAME = + AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME; private static final Logger LOGGER = LogManager.getLogger(ScheduledAsyncQueryJobRunner.class); private static ScheduledAsyncQueryJobRunner INSTANCE = new ScheduledAsyncQueryJobRunner(); @@ -96,7 +100,7 @@ public void runJob(ScheduledJobParameter jobParameter, JobExecutionContext conte LOGGER.error(throwable); } }; - threadPool.generic().submit(runnable); + threadPool.executor(ASYNC_QUERY_THREAD_POOL_NAME).submit(runnable); } void doRefresh(ScheduledAsyncQueryJobRequest request) { diff --git a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java index e8d2760dff..cba8d43a2a 100644 --- a/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java +++ b/async-query/src/test/java/org/opensearch/sql/spark/scheduler/job/ScheduledAsyncQueryJobRunnerTest.java @@ -31,6 +31,7 @@ import org.opensearch.cluster.service.ClusterService; import org.opensearch.jobscheduler.spi.JobExecutionContext; import org.opensearch.jobscheduler.spi.ScheduledJobParameter; +import org.opensearch.sql.legacy.executor.AsyncRestExecutor; import org.opensearch.sql.spark.asyncquery.AsyncQueryExecutorService; import org.opensearch.sql.spark.asyncquery.model.NullAsyncQueryRequestContext; import org.opensearch.sql.spark.rest.model.CreateAsyncQueryRequest; @@ -86,7 +87,8 @@ public void testRunJobWithCorrectParameter() { spyJobRunner.runJob(request, context); ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); - verify(threadPool.generic()).submit(captor.capture()); + verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME)) + .submit(captor.capture()); Runnable runnable = captor.getValue(); runnable.run(); @@ -143,7 +145,8 @@ public void testDoRefreshThrowsException() { spyJobRunner.runJob(request, context); ArgumentCaptor captor = ArgumentCaptor.forClass(Runnable.class); - verify(threadPool.generic()).submit(captor.capture()); + verify(threadPool.executor(AsyncRestExecutor.SQL_WORKER_THREAD_POOL_NAME)) + .submit(captor.capture()); Runnable runnable = captor.getValue(); runnable.run();