diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java index 6249784ee..330b38f02 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/AsyncQuerySchedulerBuilder.java @@ -12,6 +12,7 @@ import org.opensearch.flint.common.scheduler.AsyncQueryScheduler; import org.opensearch.flint.core.FlintOptions; +import java.io.IOException; import java.lang.reflect.Constructor; /** @@ -30,7 +31,7 @@ public enum AsyncQuerySchedulerAction { REMOVE } - public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) { + public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) throws IOException { return new AsyncQuerySchedulerBuilder().doBuild(sparkSession, options); } @@ -41,7 +42,7 @@ public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions * @param options The FlintOptions containing configuration details. * @return An instance of AsyncQueryScheduler. */ - protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) { + protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) throws IOException { String className = options.getCustomAsyncQuerySchedulerClass(); if (className.isEmpty()) { @@ -68,7 +69,7 @@ protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(Flin return new OpenSearchAsyncQueryScheduler(options); } - protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) { + protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException { return scheduler.hasAccessToSchedulerIndex(); } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java index d8c423657..a1ef45825 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/scheduler/OpenSearchAsyncQueryScheduler.java @@ -38,6 +38,7 @@ import org.opensearch.jobscheduler.spi.schedule.Schedule; import org.opensearch.rest.RestStatus; +import java.io.IOException; import java.io.InputStream; import java.nio.charset.StandardCharsets; import java.time.Instant; @@ -140,14 +141,16 @@ void createAsyncQuerySchedulerIndex(IRestHighLevelClient client) { * @see #createClient() * @see #ensureIndexExists(IRestHighLevelClient) */ - public boolean hasAccessToSchedulerIndex() { + public boolean hasAccessToSchedulerIndex() throws IOException { + IRestHighLevelClient client = createClient(); try { - IRestHighLevelClient client = createClient(); ensureIndexExists(client); return true; } catch (Throwable e) { LOG.error("Failed to ensure index exists", e); return false; + } finally { + client.close(); } } private void ensureIndexExists(IRestHighLevelClient client) { diff --git a/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java index ea7b37f6a..3c65a96a5 100644 --- a/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java +++ b/flint-spark-integration/src/test/java/org/opensearch/flint/core/scheduler/AsyncQuerySchedulerBuilderTest.java @@ -5,7 +5,6 @@ package org.opensearch.flint.core.scheduler; -import org.apache.spark.FlintSuite; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.SQLContext; import org.junit.Before; @@ -18,6 +17,8 @@ import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder; import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler; +import java.io.IOException; + import static org.junit.Assert.assertTrue; import static org.mockito.ArgumentMatchers.anyString; import static org.mockito.Mockito.mock; @@ -41,7 +42,7 @@ public void setUp() { } @Test - public void testBuildWithEmptyClassNameAndAccessibleIndex() { + public void testBuildWithEmptyClassNameAndAccessibleIndex() throws IOException { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()).thenReturn(""); OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class); @@ -52,7 +53,7 @@ public void testBuildWithEmptyClassNameAndAccessibleIndex() { } @Test - public void testBuildWithEmptyClassNameAndInaccessibleIndex() { + public void testBuildWithEmptyClassNameAndInaccessibleIndex() throws IOException { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()).thenReturn(""); OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class); @@ -63,7 +64,7 @@ public void testBuildWithEmptyClassNameAndInaccessibleIndex() { } @Test - public void testBuildWithCustomClassName() { + public void testBuildWithCustomClassName() throws IOException { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()) .thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest"); @@ -73,7 +74,7 @@ public void testBuildWithCustomClassName() { } @Test(expected = RuntimeException.class) - public void testBuildWithInvalidClassName() { + public void testBuildWithInvalidClassName() throws IOException { FlintOptions options = mock(FlintOptions.class); when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("invalid.ClassName"); @@ -124,11 +125,11 @@ protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(Flin } @Override - protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) { + protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException { return mockHasAccess != null ? mockHasAccess : super.hasAccessToSchedulerIndex(scheduler); } - public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) { + public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) throws IOException { return new AsyncQuerySchedulerBuilderForLocalTest(asyncQueryScheduler, hasAccess).doBuild(sparkSession, options); } }