Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fallback to internal scheduler when index creation failed #861

Merged
merged 3 commits into from
Nov 1, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
}

private val flintAsyncQueryScheduler: AsyncQueryScheduler = {
AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())
AsyncQuerySchedulerBuilder.build(spark, flintSparkConf.flintOptions())
}

override protected val flintMetadataLogService: FlintMetadataLogService = {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@

import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.flint.config.FlintSparkConf;
import org.opensearch.flint.common.scheduler.AsyncQueryScheduler;
import org.opensearch.flint.core.FlintOptions;

import java.io.IOException;
import java.lang.reflect.Constructor;

/**
Expand All @@ -28,11 +31,27 @@ public enum AsyncQuerySchedulerAction {
REMOVE
}

public static AsyncQueryScheduler build(FlintOptions options) {
public static AsyncQueryScheduler build(SparkSession sparkSession, FlintOptions options) throws IOException {
return new AsyncQuerySchedulerBuilder().doBuild(sparkSession, options);
}

/**
* Builds an AsyncQueryScheduler based on the provided options.
*
* @param sparkSession The SparkSession to be used.
* @param options The FlintOptions containing configuration details.
* @return An instance of AsyncQueryScheduler.
*/
protected AsyncQueryScheduler doBuild(SparkSession sparkSession, FlintOptions options) throws IOException {
String className = options.getCustomAsyncQuerySchedulerClass();

if (className.isEmpty()) {
return new OpenSearchAsyncQueryScheduler(options);
OpenSearchAsyncQueryScheduler scheduler = createOpenSearchAsyncQueryScheduler(options);
// Check if the scheduler has access to the required index. Disable the external scheduler otherwise.
if (!hasAccessToSchedulerIndex(scheduler)){
setExternalSchedulerEnabled(sparkSession, false);
}
return scheduler;
}

// Attempts to instantiate AsyncQueryScheduler using reflection
Expand All @@ -45,4 +64,16 @@ public static AsyncQueryScheduler build(FlintOptions options) {
throw new RuntimeException("Failed to instantiate AsyncQueryScheduler: " + className, e);
}
}

protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(FlintOptions options) {
return new OpenSearchAsyncQueryScheduler(options);
}

protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException {
return scheduler.hasAccessToSchedulerIndex();
}

protected void setExternalSchedulerEnabled(SparkSession sparkSession, boolean enabled) {
sparkSession.sqlContext().setConf(FlintSparkConf.EXTERNAL_SCHEDULER_ENABLED().key(), String.valueOf(enabled));
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.google.common.annotations.VisibleForTesting;
import com.google.common.collect.ImmutableMap;
import org.apache.commons.io.IOUtils;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
Expand Down Expand Up @@ -37,6 +38,7 @@
import org.opensearch.jobscheduler.spi.schedule.Schedule;
import org.opensearch.rest.RestStatus;

import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.time.Instant;
Expand All @@ -55,6 +57,11 @@ public class OpenSearchAsyncQueryScheduler implements AsyncQueryScheduler {
private static final ObjectMapper mapper = new ObjectMapper();
private final FlintOptions flintOptions;

@VisibleForTesting
public OpenSearchAsyncQueryScheduler() {
this.flintOptions = new FlintOptions(ImmutableMap.of());
}

public OpenSearchAsyncQueryScheduler(FlintOptions options) {
this.flintOptions = options;
}
Expand Down Expand Up @@ -124,6 +131,28 @@ void createAsyncQuerySchedulerIndex(IRestHighLevelClient client) {
}
}

/**
* Checks if the current setup has access to the scheduler index.
*
* This method attempts to create a client and ensure that the scheduler index exists.
* If these operations succeed, it indicates that the user has the necessary permissions
* to access and potentially modify the scheduler index.
*
* @see #createClient()
* @see #ensureIndexExists(IRestHighLevelClient)
*/
public boolean hasAccessToSchedulerIndex() throws IOException {
IRestHighLevelClient client = createClient();
try {
ensureIndexExists(client);
return true;
} catch (Throwable e) {
LOG.error("Failed to ensure index exists", e);
return false;
} finally {
client.close();
}
}
private void ensureIndexExists(IRestHighLevelClient client) {
try {
if (!client.doesIndexExist(new GetIndexRequest(SCHEDULER_INDEX_NAME), RequestOptions.DEFAULT)) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,43 +5,80 @@

package org.opensearch.flint.core.scheduler;

import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.SQLContext;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mock;
import org.mockito.MockitoAnnotations;
import org.opensearch.flint.common.scheduler.AsyncQueryScheduler;
import org.opensearch.flint.common.scheduler.model.AsyncQuerySchedulerRequest;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.spark.scheduler.AsyncQuerySchedulerBuilder;
import org.opensearch.flint.spark.scheduler.OpenSearchAsyncQueryScheduler;

import java.io.IOException;

import static org.junit.Assert.assertTrue;
import static org.mockito.ArgumentMatchers.anyString;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.never;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AsyncQuerySchedulerBuilderTest {
@Mock
private SparkSession sparkSession;

@Mock
private SQLContext sqlContext;

private AsyncQuerySchedulerBuilderForLocalTest testBuilder;

@Before
public void setUp() {
MockitoAnnotations.openMocks(this);
when(sparkSession.sqlContext()).thenReturn(sqlContext);
}

@Test
public void testBuildWithEmptyClassNameAndAccessibleIndex() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("");
OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class);

AsyncQueryScheduler scheduler = testBuilder.build(mockScheduler, true, sparkSession, options);
assertTrue(scheduler instanceof OpenSearchAsyncQueryScheduler);
verify(sqlContext, never()).setConf(anyString(), anyString());
}

@Test
public void testBuildWithEmptyClassName() {
public void testBuildWithEmptyClassNameAndInaccessibleIndex() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("");
OpenSearchAsyncQueryScheduler mockScheduler = mock(OpenSearchAsyncQueryScheduler.class);

AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(options);
AsyncQueryScheduler scheduler = testBuilder.build(mockScheduler, false, sparkSession, options);
assertTrue(scheduler instanceof OpenSearchAsyncQueryScheduler);
verify(sqlContext).setConf("spark.flint.job.externalScheduler.enabled", "false");
}

@Test
public void testBuildWithCustomClassName() {
public void testBuildWithCustomClassName() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest");
when(options.getCustomAsyncQuerySchedulerClass())
.thenReturn("org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest");

AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(options);
AsyncQueryScheduler scheduler = AsyncQuerySchedulerBuilder.build(sparkSession, options);
assertTrue(scheduler instanceof AsyncQuerySchedulerForLocalTest);
}

@Test(expected = RuntimeException.class)
public void testBuildWithInvalidClassName() {
public void testBuildWithInvalidClassName() throws IOException {
FlintOptions options = mock(FlintOptions.class);
when(options.getCustomAsyncQuerySchedulerClass()).thenReturn("invalid.ClassName");

AsyncQuerySchedulerBuilder.build(options);
AsyncQuerySchedulerBuilder.build(sparkSession, options);
}

public static class AsyncQuerySchedulerForLocalTest implements AsyncQueryScheduler {
Expand All @@ -65,4 +102,35 @@ public void removeJob(AsyncQuerySchedulerRequest asyncQuerySchedulerRequest) {
// Custom implementation
}
}

public static class OpenSearchAsyncQuerySchedulerForLocalTest extends OpenSearchAsyncQueryScheduler {
@Override
public boolean hasAccessToSchedulerIndex() {
return true;
}
}

public static class AsyncQuerySchedulerBuilderForLocalTest extends AsyncQuerySchedulerBuilder {
private OpenSearchAsyncQueryScheduler mockScheduler;
private Boolean mockHasAccess;

public AsyncQuerySchedulerBuilderForLocalTest(OpenSearchAsyncQueryScheduler mockScheduler, Boolean mockHasAccess) {
this.mockScheduler = mockScheduler;
this.mockHasAccess = mockHasAccess;
}

@Override
protected OpenSearchAsyncQueryScheduler createOpenSearchAsyncQueryScheduler(FlintOptions options) {
return mockScheduler != null ? mockScheduler : super.createOpenSearchAsyncQueryScheduler(options);
}

@Override
protected boolean hasAccessToSchedulerIndex(OpenSearchAsyncQueryScheduler scheduler) throws IOException {
return mockHasAccess != null ? mockHasAccess : super.hasAccessToSchedulerIndex(scheduler);
}

public static AsyncQueryScheduler build(OpenSearchAsyncQueryScheduler asyncQueryScheduler, Boolean hasAccess, SparkSession sparkSession, FlintOptions options) throws IOException {
return new AsyncQuerySchedulerBuilderForLocalTest(asyncQueryScheduler, hasAccess).doBuild(sparkSession, options);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import org.opensearch.flint.spark.FlintSparkExtensions

import org.apache.spark.sql.catalyst.expressions.CodegenObjectFactoryMode
import org.apache.spark.sql.catalyst.optimizer.ConvertToLocalRelation
import org.apache.spark.sql.flint.config.FlintConfigEntry
import org.apache.spark.sql.flint.config.{FlintConfigEntry, FlintSparkConf}
import org.apache.spark.sql.flint.config.FlintSparkConf.HYBRID_SCAN_ENABLED
import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.test.SharedSparkSession
Expand All @@ -26,6 +26,10 @@ trait FlintSuite extends SharedSparkSession {
// ConstantPropagation etc.
.set(SQLConf.OPTIMIZER_EXCLUDED_RULES.key, ConvertToLocalRelation.ruleName)
.set("spark.sql.extensions", classOf[FlintSparkExtensions].getName)
// Override scheduler class for unit testing
.set(
FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key,
"org.opensearch.flint.core.scheduler.AsyncQuerySchedulerBuilderTest$AsyncQuerySchedulerForLocalTest")
conf
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ import org.scalatestplus.mockito.MockitoSugar.mock

import org.apache.spark.{FlintSuite, SparkConf}
import org.apache.spark.sql.QueryTest
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
import org.apache.spark.sql.streaming.StreamTest

Expand All @@ -49,6 +50,8 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit

override def beforeAll(): Unit = {
super.beforeAll()
// Revoke override in FlintSuite on IT
conf.unsetConf(FlintSparkConf.CUSTOM_FLINT_SCHEDULER_CLASS.key)

// Replace executor to avoid impact on IT.
// TODO: Currently no IT test scheduler so no need to restore it back.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import scala.jdk.CollectionConverters.mapAsJavaMapConverter

import com.stephenn.scalatest.jsonassert.JsonMatchers.matchJson
import org.json4s.native.JsonMethods._
import org.opensearch.OpenSearchException
import org.opensearch.action.get.GetRequest
import org.opensearch.client.RequestOptions
import org.opensearch.flint.core.FlintOptions
Expand Down Expand Up @@ -207,13 +206,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {

val indexInitial = flint.describeIndex(testIndex).get
indexInitial.options.refreshInterval() shouldBe Some("4 Minute")
the[OpenSearchException] thrownBy {
val client =
OpenSearchClientUtils.createClient(new FlintOptions(openSearchOptions.asJava))
client.get(
new GetRequest(OpenSearchAsyncQueryScheduler.SCHEDULER_INDEX_NAME, testIndex),
RequestOptions.DEFAULT)
}
indexInitial.options.isExternalSchedulerEnabled() shouldBe false

// Update Flint index to change refresh interval
val updatedIndex = flint
Expand All @@ -228,6 +221,7 @@ class FlintSparkUpdateIndexITSuite extends FlintSparkSuite {
val indexFinal = flint.describeIndex(testIndex).get
indexFinal.options.autoRefresh() shouldBe true
indexFinal.options.refreshInterval() shouldBe Some("5 Minutes")
indexFinal.options.isExternalSchedulerEnabled() shouldBe true
indexFinal.options.checkpointLocation() shouldBe Some(checkpointDir.getAbsolutePath)

// Verify scheduler index is updated
Expand Down
Loading