Skip to content

Commit

Permalink
Fix IOException
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Nov 1, 2024
1 parent 644d74f commit 5085615
Show file tree
Hide file tree
Showing 3 changed files with 17 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@
import org.opensearch.flint.common.scheduler.AsyncQueryScheduler;
import org.opensearch.flint.core.FlintOptions;

import java.io.IOException;
import java.lang.reflect.Constructor;

/**
Expand All @@ -30,7 +31,7 @@ public enum AsyncQuerySchedulerAction {
REMOVE
}

public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) {
public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) throws IOException {
return new AsyncQuerySchedulerBuilder().doBuild(sparkSession, options);
}

Expand All @@ -41,7 +42,7 @@ public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions
* @param options The FlintOptions containing configuration details.
* @return An instance of AsyncQueryScheduler.
*/
protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) {
protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) throws IOException {
String className = options.getCustomAsyncQuerySchedulerClass();

if (className.isEmpty()) {
Expand All @@ -68,7 +69,7 @@ protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(Flin
return new OpenSearchAsyncQueryScheduler(options);
}

protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) {
protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException {
return scheduler.hasAccessToSchedulerIndex();
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.rest.RestStatus;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand Down Expand Up @@ -140,14 +141,16 @@ void createAsyncQuerySchedulerIndex(IRestHighLevelClient client) {
* @see #createClient()
* @see #ensureIndexExists(IRestHighLevelClient)
*/
public boolean hasAccessToSchedulerIndex() {
public boolean hasAccessToSchedulerIndex() throws IOException {
IRestHighLevelClient client = createClient();
try {
IRestHighLevelClient client = createClient();
ensureIndexExists(client);
return true;
} catch (Throwable e) {
LOG.error("Failed to ensure index exists", e);
return false;
} finally {
client.close();
}
}
private void ensureIndexExists(IRestHighLevelClient client) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@

package org.opensearch.flint.core.scheduler;

import org.apache.spark.FlintSuite;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.junit.Before;
Expand All @@ -18,6 +17,8 @@
import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder;
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler;

import java.io.IOException;

import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
Expand All @@ -41,7 +42,7 @@ public void setUp() {
}

@Test
public void testBuildWithEmptyClassNameAndAccessibleIndex() {
public void testBuildWithEmptyClassNameAndAccessibleIndex() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("");
OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class);
Expand All @@ -52,7 +53,7 @@ public void testBuildWithEmptyClassNameAndAccessibleIndex() {
}

@Test
public void testBuildWithEmptyClassNameAndInaccessibleIndex() {
public void testBuildWithEmptyClassNameAndInaccessibleIndex() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("");
OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class);
Expand All @@ -63,7 +64,7 @@ public void testBuildWithEmptyClassNameAndInaccessibleIndex() {
}

@Test
public void testBuildWithCustomClassName() {
public void testBuildWithCustomClassName() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass())
.thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest");
Expand All @@ -73,7 +74,7 @@ public void testBuildWithCustomClassName() {
}

@Test(expected = RuntimeException.class)
public void testBuildWithInvalidClassName() {
public void testBuildWithInvalidClassName() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("invalid.ClassName");

Expand Down Expand Up @@ -124,11 +125,11 @@ protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(Flin
}

@Override
protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) {
protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException {
return mockHasAccess != null ? mockHasAccess : super.hasAccessToSchedulerIndex(scheduler);
}

public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) {
public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) throws IOException {
return new AsyncQuerySchedulerBuilderForLocalTest(asyncQueryScheduler, hasAccess).doBuild(sparkSession, options);
}
}
Expand Down

0 comments on commit 5085615

Please sign in to comment.