Skip to content

Commit

Permalink
Fallback to internal scheduler when index creation failed (#861)
Browse files Browse the repository at this point in the history
  • Loading branch information
noCharger authored Nov 1, 2024
1 parent d332ff0 commit 2ac46da
Show file tree
Hide file tree
Showing 7 changed files with 148 additions and 19 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}

private val flintAsyncQueryScheduler: AsyncQueryScheduler = {
AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())
AsyncQuerySchedulerBuilder.build(spark, flintSparkConf.flintOptions())
}

override protected val flintMetadataLogService: FlintMetadataLogService = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.flint.config.FlintSparkConf;
import org.opensearch.flint.common.scheduler.AsyncQueryScheduler;
import org.opensearch.flint.core.FlintOptions;

import java.io.IOException;
import java.lang.reflect.Constructor;

/**
Expand All @@ -28,11 +31,27 @@ public enum AsyncQuerySchedulerAction {
REMOVE
}

public static AsyncQueryScheduler build(FlintOptions options) {
public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) throws IOException {
return new AsyncQuerySchedulerBuilder().doBuild(sparkSession, options);
}

/**
* Builds an AsyncQueryScheduler based on the provided options.
*
* @param sparkSession The SparkSession to be used.
* @param options The FlintOptions containing configuration details.
* @return An instance of AsyncQueryScheduler.
*/
protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) throws IOException {
String className = options.getCustomAsyncQuerySchedulerClass();

if (className.isEmpty()) {
return new OpenSearchAsyncQueryScheduler(options);
OpenSearchAsyncQueryScheduler scheduler = createOpenSearchAsyncQueryScheduler(options);
// Check if the scheduler has access to the required index. Disable the external scheduler otherwise.
if (!hasAccessToSchedulerIndex(scheduler)){
setExternalSchedulerEnabled(sparkSession, false);
}
return scheduler;
}

// Attempts to instantiate AsyncQueryScheduler using reflection
Expand All @@ -45,4 +64,16 @@ public static AsyncQueryScheduler build(FlintOptions options) {
throw new RuntimeException("Failed to instantiate AsyncQueryScheduler: " + className, e);
}
}

// Factory hook for the OpenSearch-backed scheduler. Kept protected (not inlined)
// so tests can override it and substitute a mock scheduler.
protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(FlintOptions options) {
return new OpenSearchAsyncQueryScheduler(options);
}

// Probe hook: delegates to the scheduler's own index-access check. Protected so
// tests can override it and force either outcome without an OpenSearch cluster.
protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException {
return scheduler.hasAccessToSchedulerIndex();
}

// Writes the external-scheduler flag into the session's SQL conf (stringified
// boolean), so downstream code falls back to the internal scheduler when false.
protected void setExternalSchedulerEnabled(SparkSession sparkSession, boolean enabled) {
sparkSession.sqlContext().setConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED().key(), String.valueOf(enabled));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -37,6 +38,7 @@
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.rest.RestStatus;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand All @@ -55,6 +57,11 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {
private static final ObjectMapper mapper = new ObjectMapper();
private final FlintOptions flintOptions;

// Test-only constructor: initializes the scheduler with empty FlintOptions so
// unit tests can instantiate it without real configuration.
@VisibleForTesting
public OpenSearchAsyncQueryScheduler() {
this.flintOptions = new FlintOptions(ImmutableMap.of());
}

public OpenSearchAsyncQueryScheduler(FlintOptions options) {
this.flintOptions = options;
}
Expand Down Expand Up @@ -124,6 +131,28 @@ void createAsyncQuerySchedulerIndex(IRestHighLevelClient client) {
}
}

/**
* Checks if the current setup has access to the scheduler index.
*
* This method attempts to create a client and ensure that the scheduler index exists.
* If these operations succeed, it indicates that the user has the necessary permissions
* to access and potentially modify the scheduler index.
*
* @return {@code true} if the scheduler index exists or could be created;
*         {@code false} if any failure occurred while checking/creating it.
* @throws IOException if closing the client fails after the check completes.
* @see #createClient()
* @see #ensureIndexExists(IRestHighLevelClient)
*/
public boolean hasAccessToSchedulerIndex() throws IOException {
IRestHighLevelClient client = createClient();
try {
ensureIndexExists(client);
return true;
// Intentionally broad: ANY failure here (permissions, transport, mapping)
// should downgrade to the internal scheduler rather than abort index creation.
} catch (Throwable e) {
LOG.error("Failed to ensure index exists", e);
return false;
} finally {
client.close();
}
}
private void ensureIndexExists(IRestHighLevelClient client) {
try {
if (!client.doesIndexExist(new GetIndexRequest(SCHEDULER_INDEX_NAME), RequestOptions.DEFAULT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,80 @@

package org.opensearch.flint.core.scheduler;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.flint.common.scheduler.AsyncQueryScheduler;
import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder;
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler;

import java.io.IOException;

import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AsyncQuerySchedulerBuilderTest {
@Mock
private SparkSession sparkSession;

@Mock
private SQLContext sqlContext;

private AsyncQuerySchedulerBuilderForLocalTest testBuilder;

@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
when(sparkSession.sqlContext()).thenReturn(sqlContext);
}

// Empty custom class name + accessible scheduler index: the OpenSearch-backed
// scheduler must be returned and the external-scheduler conf left untouched.
@Test
public void testBuildWithEmptyClassNameAndAccessibleIndex() throws IOException {
  OpenSearchAsyncQueryScheduler openSearchScheduler = mock(OpenSearchAsyncQueryScheduler.class);
  FlintOptions flintOptions = mock(FlintOptions.class);
  when(flintOptions.getCustomAsyncQuerySchedulerClass()).thenReturn("");

  AsyncQueryScheduler built = testBuilder.build(openSearchScheduler, true, sparkSession, flintOptions);

  assertTrue(built instanceof OpenSearchAsyncQueryScheduler);
  // No fallback path taken, so the Spark SQL conf must never be written.
  verify(sqlContext, never()).setConf(anyString(), anyString());
}

@Test
public void testBuildWithEmptyClassName() {
public void testBuildWithEmptyClassNameAndInaccessibleIndex() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("");
OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class);

AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(options);
AsyncQueryScheduler scheduler = testBuilder.build(mockScheduler, false, sparkSession, options);
assertTrue(scheduler instanceof OpenSearchAsyncQueryScheduler);
verify(sqlContext).setConf("spark.flint.job.externalScheduler.enabled", "false");
}

@Test
public void testBuildWithCustomClassName() {
public void testBuildWithCustomClassName() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest");
when(options.getCustomAsyncQuerySchedulerClass())
.thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest");

AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(options);
AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(sparkSession, options);
assertTrue(scheduler instanceof AsyncQuerySchedulerForLocalTest);
}

@Test(expected = RuntimeException.class)
public void testBuildWithInvalidClassName() {
public void testBuildWithInvalidClassName() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("invalid.ClassName");

AsyncQuerySchedulerBuilder.build(options);
AsyncQuerySchedulerBuilder.build(sparkSession, options);
}

public static class AsyncQuerySchedulerForLocalTest implements AsyncQueryScheduler {
Expand All @@ -65,4 +102,35 @@ public void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
// Custom implementation
}
}

// Test double that always reports the scheduler index as accessible, so builder
// tests can exercise the "external scheduler enabled" path without OpenSearch.
public static class OpenSearchAsyncQuerySchedulerForLocalTest extends OpenSearchAsyncQueryScheduler {
@Override
public boolean hasAccessToSchedulerIndex() {
return true;
}
}

/**
 * Test-only builder that lets a test inject a pre-built scheduler and/or a fixed
 * index-access answer instead of contacting a real OpenSearch cluster. A null
 * override means "defer to the real superclass behavior".
 */
public static class AsyncQuerySchedulerBuilderForLocalTest extends AsyncQuerySchedulerBuilder {
  private final OpenSearchAsyncQueryScheduler schedulerOverride;
  private final Boolean accessOverride;

  public AsyncQuerySchedulerBuilderForLocalTest(OpenSearchAsyncQueryScheduler mockScheduler, Boolean mockHasAccess) {
    this.schedulerOverride = mockScheduler;
    this.accessOverride = mockHasAccess;
  }

  // Hand back the injected scheduler when one was supplied; otherwise build the real one.
  @Override
  protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(FlintOptions options) {
    if (schedulerOverride != null) {
      return schedulerOverride;
    }
    return super.createOpenSearchAsyncQueryScheduler(options);
  }

  // Report the injected access result when one was supplied; otherwise probe for real.
  @Override
  protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException {
    if (accessOverride != null) {
      return accessOverride;
    }
    return super.hasAccessToSchedulerIndex(scheduler);
  }

  // Convenience entry point mirroring AsyncQuerySchedulerBuilder.build for tests.
  public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) throws IOException {
    return new AsyncQuerySchedulerBuilderForLocalTest(asyncQueryScheduler, hasAccess).doBuild(sparkSession, options);
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions

import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.flint.config.FlintConfigEntry
import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
Expand All @@ -26,6 +26,10 @@ trait FlintSuite extends SharedSparkSession {
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
.set("spark.sql.extensions", classOf[FlintSparkExtensions].getName)
// Override scheduler class for unit testing
.set(
FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key,
"org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest")
conf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.{FlintSuite, SparkConf}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
import org.apache.spark.sql.streaming.StreamTest

Expand All @@ -49,6 +50,8 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit

override def beforeAll(): Unit = {
super.beforeAll()
// Revoke override in FlintSuite on IT
conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)

// Replace executor to avoid impact on IT.
// TODO: Currently no IT test scheduler so no need to restore it back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import scala.jdk.CollectionConverters.mapAsJavaMapConverter

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.native.JsonMethods._
import org.opensearch.OpenSearchException
import org.opensearch.action.get.GetRequest
import org.opensearch.client.RequestOptions
import org.opensearch.flint.core.FlintOptions
Expand Down Expand Up @@ -207,13 +206,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {

val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.refreshInterval() shouldBe Some("4 Minute")
the[OpenSearchException] thrownBy {
val client =
OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava))
client.get(
new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, testIndex),
RequestOptions.DEFAULT)
}
indexInitial.options.isExternalSchedulerEnabled() shouldBe false

// Update Flint index to change refresh interval
val updatedIndex = flint
Expand All @@ -228,6 +221,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some("5 Minutes")
indexFinal.options.isExternalSchedulerEnabled() shouldBe true
indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)

// Verify scheduler index is updated
Expand Down

0 comments on commit 2ac46da

Please sign in to comment.