Fix Session state bug and improve Query Efficiency in REPL #245

Merged 3 commits on Feb 9, 2024
build.sbt: 9 changes (6 additions, 3 deletions)
@@ -42,8 +42,9 @@ lazy val commonSettings = Seq(
testScalastyle := (Test / scalastyle).toTask("").value,
Test / test := ((Test / test) dependsOn testScalastyle).value)

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
- .aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication)
+ .aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

@@ -159,7 +160,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
- .dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test" )
+ .dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.settings(
commonSettings,
name := "integ-test",
@@ -175,7 +176,9 @@ lazy val integtest = (project in file("integ-test"))
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
libraryDependencies ++= deps(sparkVersion),
- Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value))
+ Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
+   (sparkSqlApplication / assembly).value
+ ))
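For readers less familiar with sbt, here is a minimal sketch, with hypothetical subproject names, of what the `test->test` dependency and the assembly-jar classpath additions above accomplish.

```scala
// Hypothetical two-project build illustrating the pattern used above.
// Assumes the sbt-assembly plugin is available for the `assembly` task.
lazy val app = (project in file("app"))

lazy val itests = (project in file("itests"))
  // "test->test": itests' Test configuration depends on app's Test configuration,
  // so test fixtures defined in app are visible to itests.
  .dependsOn(app % "test->test")
  .settings(
    // Put app's assembled jar on the test classpath, mirroring how the
    // sparkSqlApplication assembly is added for the integ-test project.
    Test / fullClasspath ++= Seq((app / assembly).value))
```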

lazy val standaloneCosmetic = project
.settings(
@@ -77,6 +77,8 @@ public class FlintOptions implements Serializable {

public static final int DEFAULT_SOCKET_TIMEOUT_MILLIS = 60000;

public static final int DEFAULT_INACTIVITY_LIMIT_MILLIS = 10 * 60 * 1000;

public FlintOptions(Map<String, String> options) {
this.options = options;
this.retryOptions = new FlintRetryOptions(options);
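For reference, the new default allows ten minutes of inactivity; a quick Scala check of the arithmetic (the value name below is illustrative, not the Java field itself):

```scala
// 10 min * 60 s/min * 1000 ms/s = 600,000 ms
val defaultInactivityLimitMillis: Int = 10 * 60 * 1000
assert(defaultInactivityLimitMillis == 600000)
```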
@@ -0,0 +1,48 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.storage;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.ClearScrollRequest;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.common.Strings;
import org.opensearch.common.unit.TimeValue;
import org.opensearch.flint.core.FlintOptions;
import org.opensearch.flint.core.IRestHighLevelClient;
import org.opensearch.search.builder.SearchSourceBuilder;

import java.io.IOException;
import java.util.Optional;
import java.util.logging.Level;
import java.util.logging.Logger;

/**
* {@link OpenSearchReader} using search. https://opensearch.org/docs/latest/api-reference/search/
*/
public class OpenSearchQueryReader extends OpenSearchReader {

private static final Logger LOG = Logger.getLogger(OpenSearchQueryReader.class.getName());

public OpenSearchQueryReader(IRestHighLevelClient client, String indexName, SearchSourceBuilder searchSourceBuilder) {
super(client, new SearchRequest().indices(indexName).source(searchSourceBuilder));
}

/**
* search.
*/
Optional<SearchResponse> search(SearchRequest request) throws IOException {
return Optional.of(client.search(request, RequestOptions.DEFAULT));
}

/**
* nothing to clean
*/
void clean() throws IOException {}
}
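A hedged sketch of how the new reader might be constructed: a one-shot query (no scroll) against a request index. The client, index name, and query field are placeholders, not values taken from this PR.

```scala
// Hypothetical caller constructing an OpenSearchQueryReader for a session lookup.
import org.opensearch.flint.core.IRestHighLevelClient
import org.opensearch.flint.core.storage.OpenSearchQueryReader
import org.opensearch.index.query.QueryBuilders
import org.opensearch.search.builder.SearchSourceBuilder

def sessionReader(client: IRestHighLevelClient, sessionId: String): OpenSearchQueryReader = {
  val source = new SearchSourceBuilder()
    .query(QueryBuilders.termQuery("sessionId", sessionId)) // placeholder field name
    .size(1)
  // "my-request-index" is a placeholder index name.
  new OpenSearchQueryReader(client, "my-request-index", source)
}
```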
@@ -5,6 +5,7 @@

package org.opensearch.flint.core.storage;

import org.opensearch.OpenSearchStatusException;
import org.opensearch.action.search.SearchRequest;
import org.opensearch.action.search.SearchResponse;
import org.opensearch.flint.core.IRestHighLevelClient;
@@ -48,6 +49,13 @@ public OpenSearchReader(IRestHighLevelClient client, SearchRequest searchRequest
iterator = searchHits.iterator();
}
return iterator.hasNext();
} catch (OpenSearchStatusException e) {
// e.g., org.opensearch.OpenSearchStatusException: OpenSearch exception [type=index_not_found_exception, reason=no such index [query_results2]]
if (e.getMessage() != null && (e.getMessage().contains("index_not_found_exception"))) {
return false;
} else {
throw e;
}
} catch (IOException e) {
// todo. log error.
throw new RuntimeException(e);
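With this change, a reader over an index that has not been created yet reports no results instead of throwing. A hedged sketch of a caller relying on that behavior; hasNext and next are assumed to be the iteration methods exposed by OpenSearchReader, which is only partially shown here.

```scala
// Hypothetical poll loop over a result index that may not exist yet.
// With the catch block above, hasNext() returns false on index_not_found_exception,
// so the loop simply sees "no results yet" rather than an exception.
def pollOnce(reader: org.opensearch.flint.core.storage.OpenSearchReader): Seq[String] = {
  val results = scala.collection.mutable.ArrayBuffer.empty[String]
  while (reader.hasNext) { // assumed API: false when the index is absent
    results += reader.next()
  }
  results.toSeq
}
```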
@@ -57,13 +57,20 @@ public void testGetDimensionsFromSystemEnv() throws NoSuchFieldException, Illega
Field field = classOfMap.getDeclaredField("m");
field.setAccessible(true);
Map<String, String> writeableEnvironmentVariables = (Map<String, String>)field.get(System.getenv());
writeableEnvironmentVariables.put("TEST_VAR", "dummy1");
writeableEnvironmentVariables.put("SERVERLESS_EMR_JOB_ID", "dummy2");
Dimension result1 = DimensionUtils.constructDimension("TEST_VAR", parts);
assertEquals("TEST_VAR", result1.getName());
assertEquals("dummy1", result1.getValue());
Dimension result2 = DimensionUtils.constructDimension("jobId", parts);
assertEquals("jobId", result2.getName());
assertEquals("dummy2", result2.getValue());
try {
writeableEnvironmentVariables.put("TEST_VAR", "dummy1");
writeableEnvironmentVariables.put("SERVERLESS_EMR_JOB_ID", "dummy2");
Dimension result1 = DimensionUtils.constructDimension("TEST_VAR", parts);
assertEquals("TEST_VAR", result1.getName());
assertEquals("dummy1", result1.getValue());
Dimension result2 = DimensionUtils.constructDimension("jobId", parts);
assertEquals("jobId", result2.getName());
assertEquals("dummy2", result2.getValue());
} finally {
// since system environment is shared by other tests. Make sure to remove them before exiting.
writeableEnvironmentVariables.remove("SERVERLESS_EMR_JOB_ID");
writeableEnvironmentVariables.remove("TEST_VAR");
}

}
}
@@ -146,7 +146,30 @@ object FlintSparkConf {
.datasourceOption()
.doc("socket duration in milliseconds")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_SOCKET_TIMEOUT_MILLIS))

val DATA_SOURCE_NAME =
FlintConfig(s"spark.flint.datasource.name")
.doc("data source name")
.createOptional()
val JOB_TYPE =
FlintConfig(s"spark.flint.job.type")
.doc("Flint job type. Including interactive and streaming")
.createWithDefault("interactive")
val SESSION_ID =
FlintConfig(s"spark.flint.job.sessionId")
.doc("Flint session id")
.createOptional()
val REQUEST_INDEX =
FlintConfig(s"spark.flint.job.requestIndex")
.doc("Request index")
.createOptional()
val EXCLUDE_JOB_IDS =
FlintConfig(s"spark.flint.deployment.excludeJobs")
.doc("Exclude job ids")
.createOptional()
val REPL_INACTIVITY_TIMEOUT_MILLIS =
FlintConfig(s"spark.flint.job.inactivityLimitMillis")
.doc("inactivity timeout")
.createWithDefault(String.valueOf(FlintOptions.DEFAULT_INACTIVITY_LIMIT_MILLIS))
}

/**
@@ -196,11 +219,18 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
CUSTOM_AWS_CREDENTIALS_PROVIDER,
USERNAME,
PASSWORD,
- SOCKET_TIMEOUT_MILLIS)
+ SOCKET_TIMEOUT_MILLIS,
+ JOB_TYPE,
+ REPL_INACTIVITY_TIMEOUT_MILLIS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.toMap

- val optionsWithoutDefault = Seq(RETRYABLE_EXCEPTION_CLASS_NAMES)
+ val optionsWithoutDefault = Seq(
+   RETRYABLE_EXCEPTION_CLASS_NAMES,
+   DATA_SOURCE_NAME,
+   SESSION_ID,
+   REQUEST_INDEX,
+   EXCLUDE_JOB_IDS)
.map(conf => (conf.optionKey, conf.readFrom(reader)))
.flatMap {
case (_, None) => None
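Taken together, the new FlintSparkConf entries expose the REPL session wiring as ordinary Spark configuration. A hedged sketch of how an interactive job might set them follows; the keys come from the additions above, while every value is an illustrative placeholder.

```scala
// Hypothetical SparkSession configuration for an interactive (REPL) Flint job.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("flint-repl")
  .config("spark.flint.datasource.name", "my_datasource")
  .config("spark.flint.job.type", "interactive")
  .config("spark.flint.job.sessionId", "session-0001")
  .config("spark.flint.job.requestIndex", "my-request-index")
  .config("spark.flint.deployment.excludeJobs", "old-job-1,old-job-2") // list format assumed
  .config("spark.flint.job.inactivityLimitMillis", "600000") // 10 minutes, the default
  .getOrCreate()
```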
@@ -25,7 +25,14 @@ class FlintInstance(
val lastUpdateTime: Long,
val jobStartTime: Long = 0,
val excludedJobIds: Seq[String] = Seq.empty[String],
- val error: Option[String] = None) {}
+ val error: Option[String] = None) {
+   override def toString: String = {
+     val excludedJobIdsStr = excludedJobIds.mkString("[", ", ", "]")
+     val errorStr = error.getOrElse("None")
+     s"FlintInstance(applicationId=$applicationId, jobId=$jobId, sessionId=$sessionId, state=$state, " +
+       s"lastUpdateTime=$lastUpdateTime, jobStartTime=$jobStartTime, excludedJobIds=$excludedJobIdsStr, error=$errorStr)"
+   }
+ }

object FlintInstance {

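The added toString gives session state a compact, log-friendly rendering. A hedged illustration follows; the order of the leading constructor parameters (applicationId through state) is assumed from the fields the toString references and is not shown in this hunk.

```scala
// Hypothetical construction, assuming FlintInstance is in scope and that the
// first four constructor parameters are applicationId, jobId, sessionId, state.
val instance = new FlintInstance(
  "application-123",
  "job-abc",
  "session-xyz",
  "running",
  lastUpdateTime = 1707436800000L)

println(instance)
// Expected shape of the output, given the assumptions above:
// FlintInstance(applicationId=application-123, jobId=job-abc, sessionId=session-xyz,
//   state=running, lastUpdateTime=1707436800000, jobStartTime=0, excludedJobIds=[], error=None)
```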