Skip to content

Commit

Permalink
Refactor FlintJob to support WP
Browse files Browse the repository at this point in the history
  • Loading branch information
Shri Saran Raj N committed Nov 21, 2024
1 parent b831d1e commit 8a1262f
Show file tree
Hide file tree
Showing 7 changed files with 512 additions and 37 deletions.
5 changes: 3 additions & 2 deletions build.sbt
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")

ThisBuild / organization := "org.opensearch"

ThisBuild / version := "0.6.0-SNAPSHOT"
ThisBuild / version := "0.7.0-SNAPSHOT"

ThisBuild / scalaVersion := scala212

Expand Down Expand Up @@ -269,6 +269,7 @@ lazy val integtest = (project in file("integ-test"))
inConfig(IntegrationTest)(Defaults.testSettings ++ Seq(
IntegrationTest / javaSource := baseDirectory.value / "src/integration/java",
IntegrationTest / scalaSource := baseDirectory.value / "src/integration/scala",
IntegrationTest / resourceDirectory := baseDirectory.value / "src/integration/resources",
IntegrationTest / parallelExecution := false,
IntegrationTest / fork := true,
)),
Expand Down Expand Up @@ -378,4 +379,4 @@ lazy val releaseSettings = Seq(
<scm>
<url>git@github.com:opensearch-project/opensearch-spark.git</url>
<connection>scm:git:git@github.com:opensearch-project/opensearch-spark.git</connection>
</scm>)
</scm>)
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,8 @@

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.core.JsonProcessingException;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.message.Message;

import java.util.HashMap;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -135,10 +135,30 @@ public final class MetricConstants {
*/
public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count";

public static final String STREAMING_EXECUTION_FAILED_METRIC = "streaming.execution.failed.count";

public static final String STREAMING_RESULT_WRITER_FAILED_METRIC = "streaming.writer.failed.count";

/**
* Metric for tracking the latency of query execution (start to complete query execution) excluding result write.
*/
public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime";
public static final String QUERY_EXECUTION_TIME_METRIC = "streaming.query.execution.processingTime";

public static final String RESULT_WRITER_TIME_METRIC = "streaming.result.writer.processingTime";

public static final String QUERY_TOTAL_TIME_METRIC = "streaming.query.total.processingTime";

// Interactive queries
public static final String STATEMENT_QUERY_EXECUTION_TIME_METRIC = "statement.query.execution.processingTime";

public static final String STATEMENT_RESULT_WRITER_TIME_METRIC = "statement.result.writer.processingTime";

public static final String STATEMENT_QUERY_TOTAL_TIME_METRIC = "statement.query.total.processingTime";

public static final String STATEMENT_EXECUTION_FAILED_METRIC = "statement.execution.failed.count";

public static final String STATEMENT_RESULT_WRITER_FAILED_METRIC = "statement.writer.failed.count";


/**
* Metric for query count of each query type (DROP/VACUUM/ALTER/REFRESH/CREATE INDEX)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,10 @@
import org.apache.http.client.CredentialsProvider;
import org.apache.http.impl.client.BasicCredentialsProvider;
import org.apache.http.impl.nio.client.HttpAsyncClientBuilder;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.apache.spark.SparkContext;
import org.apache.spark.sql.SparkSession;
import org.opensearch.client.RestClient;
import org.opensearch.client.RestClientBuilder;
import org.opensearch.client.RestHighLevelClient;
Expand All @@ -44,6 +48,7 @@ public class OpenSearchClientUtils {
*/
private final static Set<Character> INVALID_INDEX_NAME_CHARS =
Set.of(' ', ',', ':', '"', '+', '/', '\\', '|', '?', '#', '>', '<');
private static final Logger log = LogManager.getLogger(OpenSearchClientUtils.class);

/**
* Used in IT.
Expand Down Expand Up @@ -115,6 +120,7 @@ private static RestClientBuilder configureSigV4Auth(RestClientBuilder restClient
final AtomicReference<AWSCredentialsProvider> customAWSCredentialsProvider =
new AtomicReference<>(new DefaultAWSCredentialsProviderChain());
String customProviderClass = options.getCustomAwsCredentialsProvider();

if (!Strings.isNullOrEmpty(customProviderClass)) {
instantiateProvider(customProviderClass, customAWSCredentialsProvider);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.http.FlintRetryOptions
import org.opensearch.flint.core.logging.CustomLogging

import org.apache.spark.internal.config.ConfigReader
import org.apache.spark.network.util.ByteUnit
Expand Down Expand Up @@ -266,6 +267,10 @@ object FlintSparkConf {
val CUSTOM_QUERY_RESULT_WRITER =
FlintConfig("spark.flint.job.customQueryResultWriter")
.createOptional()
val WARMPOOL_ENABLED =
FlintConfig(s"spark.flint.job.warmpoolEnabled")
.createWithDefault("false")

}

/**
Expand Down
Loading

0 comments on commit 8a1262f

Please sign in to comment.