From 6ff1b2d058d004f533c605345372104ec0f2fd93 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Wed, 30 Oct 2024 16:53:41 -0700 Subject: [PATCH 1/3] Fallback to internal scheduler when index creation failed Signed-off-by: Louis Chu --- .../opensearch/flint/spark/FlintSpark.scala | 2 +- .../scheduler/AsyncQuerySchedulerBuilder.java | 34 +++++++- .../OpenSearchAsyncQueryScheduler.java | 26 +++++++ .../AsyncQuerySchedulerBuilderTest.java | 77 +++++++++++++++++-- .../scala/org/apache/spark/FlintSuite.scala | 6 +- .../flint/spark/FlintSparkSuite.scala | 3 + 6 files changed, 139 insertions(+), 9 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 1525b55b0..3f08b79f6 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -56,7 +56,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w } private val flintAsyncQueryScheduler: AsyncQueryScheduler = { - AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions()) + AsyncQuerySchedulerBuilder.build(spark, flintSparkConf.flintOptions()) } override protected val flintMetadataLogService: FlintMetadataLogService = { diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java index 3620608b0..6249784ee 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java @@ -7,6 +7,8 @@ import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.flint.config.FlintSparkConf; import org.opensearch.flint.common.scheduler.AsyncQueryScheduler; import org.opensearch.flint.core.FlintOptions; @@ -28,11 +30,27 @@ public enum AsyncQuerySchedulerAction { REMOVE } - public static AsyncQueryScheduler build(FlintOptions options) { + public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) { + return new AsyncQuerySchedulerBuilder().doBuild(sparkSession, options); + } + + /** + * Builds an AsyncQueryScheduler based on the provided options. + * + * @param sparkSession The SparkSession to be used. + * @param options The FlintOptions containing configuration details. + * @return An instance of AsyncQueryScheduler. + */ + protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) { String className = options.getCustomAsyncQuerySchedulerClass(); if (className.isEmpty()) { - return new OpenSearchAsyncQueryScheduler(options); + OpenSearchAsyncQueryScheduler scheduler = createOpenSearchAsyncQueryScheduler(options); + // Check if the scheduler has access to the required index. Disable the external scheduler otherwise. + if (!hasAccessToSchedulerIndex(scheduler)){ + setExternalSchedulerEnabled(sparkSession, false); + } + return scheduler; } // Attempts to instantiate AsyncQueryScheduler using reflection @@ -45,4 +63,16 @@ public static AsyncQueryScheduler build(FlintOptions options) { throw new RuntimeException("Failed to instantiate AsyncQueryScheduler: " + className, e); } } + + protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(FlintOptions options) { + return new OpenSearchAsyncQueryScheduler(options); + } + + protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) { + return scheduler.hasAccessToSchedulerIndex(); + } + + protected void setExternalSchedulerEnabled(SparkSession sparkSession, boolean enabled) { + sparkSession.sqlContext().setConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED().key(), String.valueOf(enabled)); + } } \ No newline at end of file diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java index 19532254b..d8c423657 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -9,6 +9,7 @@ import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.google.common.annotations.VisibleForTesting; +import com.google.common.collect.ImmutableMap; import org.apache.commons.io.IOUtils; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; @@ -55,6 +56,11 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler { private static final ObjectMapper mapper = new ObjectMapper(); private final FlintOptions flintOptions; + @VisibleForTesting + public OpenSearchAsyncQueryScheduler() { + this.flintOptions = new FlintOptions(ImmutableMap.of()); + } + public OpenSearchAsyncQueryScheduler(FlintOptions options) { this.flintOptions = options; } @@ -124,6 +130,26 @@ void createAsyncQuerySchedulerIndex(IRestHighLevelClient client) { } } + /** + * Checks if the current setup has access to the scheduler index. + * + * This method attempts to create a client and ensure that the scheduler index exists. + * If these operations succeed, it indicates that the user has the necessary permissions + * to access and potentially modify the scheduler index. + * + * @see #createClient() + * @see #ensureIndexExists(IRestHighLevelClient) + */ + public boolean hasAccessToSchedulerIndex() { + try { + IRestHighLevelClient client = createClient(); + ensureIndexExists(client); + return true; + } catch (Throwable e) { + LOG.error("Failed to ensure index exists", e); + return false; + } + } private void ensureIndexExists(IRestHighLevelClient client) { try { if (!client.doesIndexExist(new GetIndexRequest(SCHEDULER_INDEX_NAME), RequestOptions.DEFAULT)) { diff --git a/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java index 67b5afee5..ea7b37f6a 100644 --- a/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java +++ b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java @@ -5,7 +5,13 @@ package org.opensearch.flint.core.scheduler; +import org.apache.spark.FlintSuite; +import org.apache.spark.sql.SparkSession; +import org.apache.spark.sql.SQLContext; +import org.junit.Before; import org.junit.Test; +import org.mockito.Mock; +import org.mockito.MockitoAnnotations; import org.opensearch.flint.common.scheduler.AsyncQueryScheduler; import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest; import org.opensearch.flint.core.FlintOptions; @@ -13,26 +19,56 @@ import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler; import static org.junit.Assert.assertTrue; +import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; +import static org.mockito.Mockito.never; +import static org.mockito.Mockito.verify; import static org.mockito.Mockito.when; public class AsyncQuerySchedulerBuilderTest { + @Mock + private SparkSession sparkSession; + + @Mock + private SQLContext sqlContext; + + private AsyncQuerySchedulerBuilderForLocalTest testBuilder; + + @Before + public void setUp() { + MockitoAnnotations.openMocks(this); + when(sparkSession.sqlContext()).thenReturn(sqlContext); + } + + @Test + public void testBuildWithEmptyClassNameAndAccessibleIndex() { + FlintOptions options = mock(FlintOptions.class); + when(options.getCustomAsyncQuerySchedulerClass()).thenReturn(""); + OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class); + + AsyncQueryScheduler scheduler = testBuilder.build(mockScheduler, true, sparkSession, options); + assertTrue(scheduler instanceof OpenSearchAsyncQueryScheduler); + verify(sqlContext, never()).setConf(anyString(), anyString()); + } @Test - public void testBuildWithEmptyClassName() { + public void testBuildWithEmptyClassNameAndInaccessibleIndex() { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()).thenReturn(""); + OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class); - AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(options); + AsyncQueryScheduler scheduler = testBuilder.build(mockScheduler, false, sparkSession, options); assertTrue(scheduler instanceof OpenSearchAsyncQueryScheduler); + verify(sqlContext).setConf("spark.flint.job.externalScheduler.enabled", "false"); } @Test public void testBuildWithCustomClassName() { FlintOptions options = mock(FlintOptions.class); - when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest"); + when(options.getCustomAsyncQuerySchedulerClass()) + .thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest"); - AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(options); + AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(sparkSession, options); assertTrue(scheduler instanceof AsyncQuerySchedulerForLocalTest); } @@ -41,7 +77,7 @@ public void testBuildWithInvalidClassName() { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("invalid.ClassName"); - AsyncQuerySchedulerBuilder.build(options); + AsyncQuerySchedulerBuilder.build(sparkSession, options); } public static class AsyncQuerySchedulerForLocalTest implements AsyncQueryScheduler { @@ -65,4 +101,35 @@ public void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) { // Custom implementation } } + + public static class OpenSearchAsyncQuerySchedulerForLocalTest extends OpenSearchAsyncQueryScheduler { + @Override + public boolean hasAccessToSchedulerIndex() { + return true; + } + } + + public static class AsyncQuerySchedulerBuilderForLocalTest extends AsyncQuerySchedulerBuilder { + private OpenSearchAsyncQueryScheduler mockScheduler; + private Boolean mockHasAccess; + + public AsyncQuerySchedulerBuilderForLocalTest(OpenSearchAsyncQueryScheduler mockScheduler, Boolean mockHasAccess) { + this.mockScheduler = mockScheduler; + this.mockHasAccess = mockHasAccess; + } + + @Override + protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(FlintOptions options) { + return mockScheduler != null ? mockScheduler : super.createOpenSearchAsyncQueryScheduler(options); + } + + @Override + protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) { + return mockHasAccess != null ? mockHasAccess : super.hasAccessToSchedulerIndex(scheduler); + } + + public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) { + return new AsyncQuerySchedulerBuilderForLocalTest(asyncQueryScheduler, hasAccess).doBuild(sparkSession, options); + } + } } \ No newline at end of file diff --git a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala index ee8a52d96..15dc3cbdb 100644 --- a/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala +++ b/flint-spark-integration/src/test/scala/org/apache/spark/FlintSuite.scala @@ -9,7 +9,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation -import org.apache.spark.sql.flint.config.FlintConfigEntry +import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf} import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED import org.apache.spark.sql.internal.SQLConf import org.apache.spark.sql.test.SharedSparkSession @@ -26,6 +26,10 @@ trait FlintSuite extends SharedSparkSession { // ConstantPropagation etc. .set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName) .set("spark.sql.extensions", classOf[FlintSparkExtensions].getName) + // Override scheduler class for unit testing + .set( + FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key, + "org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest") conf } diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala index 3f843dbe4..95b8c2046 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkSuite.scala @@ -23,6 +23,7 @@ import org.scalatestplus.mockito.MockitoSugar.mock import org.apache.spark.{FlintSuite, SparkConf} import org.apache.spark.sql.QueryTest +import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY} import org.apache.spark.sql.streaming.StreamTest @@ -49,6 +50,8 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit override def beforeAll(): Unit = { super.beforeAll() + // Revoke override in FlintSuite on IT + conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key) // Replace executor to avoid impact on IT. // TODO: Currently no IT test scheduler so no need to restore it back. From 7aab1cf2c5e4f537c5f278bf36056cb89c8a3e5e Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Fri, 1 Nov 2024 00:18:33 -0700 Subject: [PATCH 2/3] Fix IT Signed-off-by: Louis Chu --- .../flint/spark/FlintSparkUpdateIndexITSuite.scala | 10 ++-------- 1 file changed, 2 insertions(+), 8 deletions(-) diff --git a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala index a6f7e0ed0..c9f6c47f7 100644 --- a/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala +++ b/integ-test/src/integration/scala/org/opensearch/flint/spark/FlintSparkUpdateIndexITSuite.scala @@ -9,7 +9,6 @@ import scala.jdk.CollectionConverters.mapAsJavaMapConverter import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson import org.json4s.native.JsonMethods._ -import org.opensearch.OpenSearchException import org.opensearch.action.get.GetRequest import org.opensearch.client.RequestOptions import org.opensearch.flint.core.FlintOptions @@ -207,13 +206,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { val indexInitial = flint.describeIndex(testIndex).get indexInitial.options.refreshInterval() shouldBe Some("4 Minute") - the[OpenSearchException] thrownBy { - val client = - OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava)) - client.get( - new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, testIndex), - RequestOptions.DEFAULT) - } + indexInitial.options.isExternalSchedulerEnabled() shouldBe false // Update Flint index to change refresh interval val updatedIndex = flint @@ -228,6 +221,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite { val indexFinal = flint.describeIndex(testIndex).get indexFinal.options.autoRefresh() shouldBe true indexFinal.options.refreshInterval() shouldBe Some("5 Minutes") + indexFinal.options.isExternalSchedulerEnabled() shouldBe true indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath) // Verify scheduler index is updated From 4efb5b0b27366db4ff4068a95c985b4c16fd7301 Mon Sep 17 00:00:00 2001 From: Louis Chu Date: Fri, 1 Nov 2024 05:11:41 -0700 Subject: [PATCH 3/3] Fix IOException Signed-off-by: Louis Chu --- .../scheduler/AsyncQuerySchedulerBuilder.java | 7 ++++--- .../scheduler/OpenSearchAsyncQueryScheduler.java | 7 +++++-- .../scheduler/AsyncQuerySchedulerBuilderTest.java | 15 ++++++++------- 3 files changed, 17 insertions(+), 12 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java index 6249784ee..330b38f02 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java @@ -12,6 +12,7 @@ import org.opensearch.flint.common.scheduler.AsyncQueryScheduler; import org.opensearch.flint.core.FlintOptions; +import java.io.IOException; import java.lang.reflect.Constructor; /** @@ -30,7 +31,7 @@ public enum AsyncQuerySchedulerAction { REMOVE } - public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) { + public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) throws IOException { return new AsyncQuerySchedulerBuilder().doBuild(sparkSession, options); } @@ -41,7 +42,7 @@ public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions * @param options The FlintOptions containing configuration details. * @return An instance of AsyncQueryScheduler. */ - protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) { + protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) throws IOException { String className = options.getCustomAsyncQuerySchedulerClass(); if (className.isEmpty()) { @@ -68,7 +69,7 @@ protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(Flin return new OpenSearchAsyncQueryScheduler(options); } - protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) { + protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException { return scheduler.hasAccessToSchedulerIndex(); } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java index d8c423657..a1ef45825 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -38,6 +38,7 @@ import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.rest.RestStatus; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Instant; @@ -140,14 +141,16 @@ void createAsyncQuerySchedulerIndex(IRestHighLevelClient client) { * @see #createClient() * @see #ensureIndexExists(IRestHighLevelClient) */ - public boolean hasAccessToSchedulerIndex() { + public boolean hasAccessToSchedulerIndex() throws IOException { + IRestHighLevelClient client = createClient(); try { - IRestHighLevelClient client = createClient(); ensureIndexExists(client); return true; } catch (Throwable e) { LOG.error("Failed to ensure index exists", e); return false; + } finally { + client.close(); } } private void ensureIndexExists(IRestHighLevelClient client) { diff --git a/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java index ea7b37f6a..3c65a96a5 100644 --- a/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java +++ b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java @@ -5,7 +5,6 @@ package org.opensearch.flint.core.scheduler; -import org.apache.spark.FlintSuite; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SQLContext; import org.junit.Before; @@ -18,6 +17,8 @@ import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder; import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler; +import java.io.IOException; + import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; @@ -41,7 +42,7 @@ public void setUp() { } @Test - public void testBuildWithEmptyClassNameAndAccessibleIndex() { + public void testBuildWithEmptyClassNameAndAccessibleIndex() throws IOException { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()).thenReturn(""); OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class); @@ -52,7 +53,7 @@ public void testBuildWithEmptyClassNameAndAccessibleIndex() { } @Test - public void testBuildWithEmptyClassNameAndInaccessibleIndex() { + public void testBuildWithEmptyClassNameAndInaccessibleIndex() throws IOException { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()).thenReturn(""); OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class); @@ -63,7 +64,7 @@ public void testBuildWithEmptyClassNameAndInaccessibleIndex() { } @Test - public void testBuildWithCustomClassName() { + public void testBuildWithCustomClassName() throws IOException { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()) .thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest"); @@ -73,7 +74,7 @@ public void testBuildWithCustomClassName() { } @Test(expected = RuntimeException.class) - public void testBuildWithInvalidClassName() { + public void testBuildWithInvalidClassName() throws IOException { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("invalid.ClassName"); @@ -124,11 +125,11 @@ protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(Flin } @Override - protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) { + protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException { return mockHasAccess != null ? mockHasAccess : super.hasAccessToSchedulerIndex(scheduler); } - public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) { + public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) throws IOException { return new AsyncQuerySchedulerBuilderForLocalTest(asyncQueryScheduler, hasAccess).doBuild(sparkSession, options); } }