Fix Session state bug and improve Query Efficiency in REPL (#245) (#250)
* Fix Session state bug and improve Query Efficiency in REPL

This PR fixes a bug in FlintREPL's session management and optimizes how queries are executed. It addresses an issue where marking a session as 'dead' inadvertently triggered the creation of a new, unnecessary session; the new session then entered a spin-wait state, producing duplicate jobs.

The improvements include:
- **Introduction of `earlyExitFlag`**: A new flag, `earlyExitFlag`, is set to `true` in two cases: when a job is excluded, or when it is not associated with the current session's job run ID. The shutdown hook evaluates this flag to decide whether the session state should be marked as 'dead'. This prevents the SQL plugin from creating unintended duplicate sessions, so resources are used more efficiently (see the sketch after this list).
- **Query Method Optimization**: Query execution has been switched from scrolling to plain search, eliminating the creation of unnecessary scroll contexts and improving the performance and efficiency of query operations (a usage sketch follows the new `OpenSearchQueryReader` file in the diff below).
- **Reversion of Previous Commit**: The PR reverts a previous change (be82024) now that the related SQL plugin issue (opensearch-project/sql#2436) is resolved, simplifying operation and maintenance of the system.
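
Below is a minimal, illustrative sketch (Scala) of the shutdown-hook interplay described in the first bullet; the names `ReplShutdownSketch`, `checkOwnership`, and `markSessionDead` are hypothetical stand-ins, not FlintREPL's actual internals:

    object ReplShutdownSketch {
      @volatile private var earlyExitFlag: Boolean = false

      // Called from the query loop when this job learns it should not own the
      // session: either the job is excluded, or the session's job run ID
      // belongs to another job.
      def checkOwnership(excluded: Boolean, sessionJobRunId: String, myJobRunId: String): Unit = {
        if (excluded || sessionJobRunId != myJobRunId) {
          earlyExitFlag = true
        }
      }

      sys.addShutdownHook {
        // Only the legitimate owner marks the session 'dead'; otherwise the
        // SQL plugin would observe the dead state and spawn a duplicate session.
        if (!earlyExitFlag) {
          markSessionDead()
        }
      }

      private def markSessionDead(): Unit = {
        // placeholder: update the session document's state to 'dead'
      }
    }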

**Testing**:
1. Integration tests were added to cover both REPL and streaming job functionality, ensuring the robustness of the fixes.
2. Manual testing was conducted to validate the bug fix.



* Added Javadoc



* Fix IT by restoring the env variable change



---------

Signed-off-by: Kaituo Li <[email protected]>
kaituo authored Feb 12, 2024
1 parent bca6681 commit 7e00cc9
Showing 22 changed files with 1,278 additions and 76 deletions.
9 changes: 6 additions & 3 deletions build.sbt
@@ -42,8 +42,9 @@ lazy val commonSettings = Seq(
testScalastyle := (Test / scalastyle).toTask("").value,
Test / test := ((Test / test) dependsOn testScalastyle).value)

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication)
.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

@@ -158,7 +159,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test" )
.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.settings(
commonSettings,
name := "integ-test",
@@ -174,7 +175,9 @@ lazy val integtest = (project in file("integ-test"))
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
libraryDependencies ++= deps(sparkVersion),
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value))
Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
(sparkSqlApplication / assembly).value
))

lazy val standaloneCosmetic = project
.settings(
FlintOptions.java
@@ -77,6 +77,8 @@ public class FlintOptions implements Serializable {

public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000;

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
OpenSearchQueryReader.java (new file)
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.logging.Logger;

/**
* {@link OpenSearchReader} using search. https://opensearch.org/docs/latest/api-reference/search/
*/
public class OpenSearchQueryReader extends OpenSearchReader {

private static final Logger LOG = Logger.getLogger(OpenSearchQueryReader.class.getName());

public OpenSearchQueryReader(RestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder));
}

/**
* Execute a one-shot search request; no scroll context is created.
*/
Optional<SearchResponse> search(SearchRequest request) throws IOException {
return Optional.of(client.search(request, RequestOptions.DEFAULT));
}

/**
* Nothing to clean up: a plain search holds no server-side context.
*/
void clean() throws IOException {}
}
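
For context, a hedged usage sketch in Scala (the index name, field, and value below are placeholders, not taken from the PR): a one-shot search like this holds no server-side scroll context, which is why `clean()` has nothing to release.

    import org.opensearch.client.RestHighLevelClient
    import org.opensearch.index.query.QueryBuilders
    import org.opensearch.search.builder.SearchSourceBuilder

    // Hypothetical example: read a single session document with a one-shot search.
    def openReader(client: RestHighLevelClient): OpenSearchQueryReader = {
      val source = new SearchSourceBuilder()
        .query(QueryBuilders.termQuery("sessionId", "session-123")) // placeholder field/value
        .size(1)
      new OpenSearchQueryReader(client, "query_requests", source) // placeholder index name
    }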
OpenSearchReader.java
@@ -5,6 +5,7 @@

package org.opensearch.flint.core.storage;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.client.RestHighLevelClient;
@@ -48,6 +49,13 @@ public OpenSearchReader(RestHighLevelClient client, SearchRequest searchRequest)
iterator = searchHits.iterator();
}
return iterator.hasNext();
} catch (OpenSearchStatusException e) {
// e.g., org.opensearch.OpenSearchStatusException: OpenSearch exception [type=index_not_found_exception, reason=no such index [query_results2]]
if (e.getMessage() != null && (e.getMessage().contains("index_not_found_exception"))) {
return false;
} else {
throw e;
}
} catch (IOException e) {
// TODO: log error.
throw new RuntimeException(e);
FlintSparkConf.scala
@@ -146,7 +146,30 @@ object FlintSparkConf {
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))

val DATA_SOURCE_NAME =
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
.createOptional()
val JOB_TYPE =
FlintConfig(s"spark.flint.job.type")
.doc("Flint job type: 'interactive' or 'streaming'")
.createWithDefault("interactive")
val SESSION_ID =
FlintConfig(s"spark.flint.job.sessionId")
.doc("Flint session id")
.createOptional()
val REQUEST_INDEX =
FlintConfig(s"spark.flint.job.requestIndex")
.doc("Request index")
.createOptional()
val EXCLUDE_JOB_IDS =
FlintConfig(s"spark.flint.deployment.excludeJobs")
.doc("Exclude job ids")
.createOptional()
val REPL_INACTIVITY_TIMEOUT_MILLIS =
FlintConfig(s"spark.flint.job.inactivityLimitMillis")
.doc("REPL inactivity timeout in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_INACTIVITY_LIMIT_MILLIS))
}
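
A small sketch of how a job might supply these options; only the config keys come from the definitions above, and all values are placeholders:

    import org.apache.spark.sql.SparkSession

    val spark = SparkSession
      .builder()
      .appName("flint-repl-sketch")
      .config("spark.flint.job.type", "interactive")
      .config("spark.flint.datasource.name", "my_datasource") // placeholder
      .config("spark.flint.job.sessionId", "session-123") // placeholder
      .config("spark.flint.job.requestIndex", "query_requests") // placeholder
      .config("spark.flint.job.inactivityLimitMillis", (10 * 60 * 1000).toString)
      .getOrCreate()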

/**
@@ -196,11 +219,18 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
CUSTOM_AWS_CREDENTIALS_PROVIDER,
USERNAME,
PASSWORD,
SOCKET_TIMEOUT_MILLIS)
SOCKET_TIMEOUT_MILLIS,
JOB_TYPE,
REPL_INACTIVITY_TIMEOUT_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

val optionsWithoutDefault = Seq(RETRYABLE_EXCEPTION_CLASS_NAMES)
val optionsWithoutDefault = Seq(
RETRYABLE_EXCEPTION_CLASS_NAMES,
DATA_SOURCE_NAME,
SESSION_ID,
REQUEST_INDEX,
EXCLUDE_JOB_IDS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.flatMap {
case (_, None) => None
FlintInstance.scala
@@ -25,7 +25,14 @@ class FlintInstance(
val lastUpdateTime: Long,
val jobStartTime: Long = 0,
val excludedJobIds: Seq[String] = Seq.empty[String],
val error: Option[String] = None) {}
val error: Option[String] = None) {
override def toString: String = {
val excludedJobIdsStr = excludedJobIds.mkString("[", ", ", "]")
val errorStr = error.getOrElse("None")
s"FlintInstance(applicationId=$applicationId, jobId=$jobId, sessionId=$sessionId, state=$state, " +
s"lastUpdateTime=$lastUpdateTime, jobStartTime=$jobStartTime, excludedJobIds=$excludedJobIdsStr, error=$errorStr)"
}
}

object FlintInstance {

(diffs for the remaining changed files not shown)
