Commit c22c7d1

Merge branch 'main' into implement-bloom-filter-query-rewrite-no-pushdown

dai-chen committed Feb 29, 2024
2 parents 1cc7cf1 + faa0d07
Showing 38 changed files with 1,896 additions and 215 deletions.
2 changes: 2 additions & 0 deletions .gitignore
@@ -19,3 +19,5 @@ logs/
*.class
*.log
*.zip

gen/
5 changes: 3 additions & 2 deletions README.md
@@ -19,6 +19,7 @@ Version compatibility:
|---------------|-------------|---------------|---------------|------------|
| 0.1.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
| 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
| 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.6+ |

## Flint Extension Usage

@@ -50,7 +51,7 @@ sbt clean standaloneCosmetic/publishM2
```
then add org.opensearch:opensearch-spark_2.12 when running your Spark application, for example:
```
-bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.2.0-SNAPSHOT"
+bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.3.0-SNAPSHOT"
```

### PPL Build & Run
@@ -62,7 +63,7 @@ sbt clean sparkPPLCosmetic/publishM2
```
then add org.opensearch:opensearch-spark-ppl_2.12 when running your Spark application, for example:
```
-bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.2.0-SNAPSHOT"
+bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT"
```

## Code of Conduct
11 changes: 7 additions & 4 deletions build.sbt
@@ -10,7 +10,7 @@ lazy val opensearchVersion = "2.6.0"

ThisBuild / organization := "org.opensearch"

-ThisBuild / version := "0.2.0-SNAPSHOT"
+ThisBuild / version := "0.3.0-SNAPSHOT"

ThisBuild / scalaVersion := scala212

@@ -42,8 +42,9 @@ lazy val commonSettings = Seq(
testScalastyle := (Test / scalastyle).toTask("").value,
Test / test := ((Test / test) dependsOn testScalastyle).value)

// running `scalafmtAll` includes all subprojects under root
lazy val root = (project in file("."))
-.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication)
+.aggregate(flintCore, flintSparkIntegration, pplSparkIntegration, sparkSqlApplication, integtest)
.disablePlugins(AssemblyPlugin)
.settings(name := "flint", publish / skip := true)

@@ -159,7 +160,7 @@ lazy val flintSparkIntegration = (project in file("flint-spark-integration"))

// Test assembly package with integration test.
lazy val integtest = (project in file("integ-test"))
-.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test" )
+.dependsOn(flintSparkIntegration % "test->test", pplSparkIntegration % "test->test", sparkSqlApplication % "test->test")
.settings(
commonSettings,
name := "integ-test",
@@ -175,7 +176,9 @@ lazy val integtest = (project in file("integ-test"))
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
libraryDependencies ++= deps(sparkVersion),
-Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value))
+Test / fullClasspath ++= Seq((flintSparkIntegration / assembly).value, (pplSparkIntegration / assembly).value,
+(sparkSqlApplication / assembly).value
+))

lazy val standaloneCosmetic = project
.settings(
4 changes: 2 additions & 2 deletions docs/PPL-on-Spark.md
@@ -34,7 +34,7 @@ sbt clean sparkPPLCosmetic/publishM2
```
then add org.opensearch:opensearch-spark-ppl_2.12 when running your Spark application, for example:
```
-bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.2.0-SNAPSHOT"
+bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT"
```

### PPL Extension Usage
@@ -46,7 +46,7 @@ spark-sql --conf "spark.sql.extensions=org.opensearch.flint.FlintPPLSparkExtensi
```

### Running With both Flint & PPL Extensions
-In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.2.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.2.0-SNAPSHOT`) to the cluster's
+In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.3.0-SNAPSHOT`) to the cluster's
classpath.

Next, configure both extensions:
6 changes: 3 additions & 3 deletions docs/index.md
@@ -54,7 +54,7 @@ Currently, Flint metadata is only static configuration without version control a

```json
{
-"version": "0.2.0",
+"version": "0.3.0",
"name": "...",
"kind": "skipping",
"source": "...",
@@ -570,7 +570,7 @@ For now, only single or conjunct conditions (conditions connected by AND) in WHE
### AWS EMR Spark Integration - Using execution role
Flint uses [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). When running in EMR Spark, Flint uses the execution role credentials:
```
---conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.2.0-SNAPSHOT \
+--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT \
--conf spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
@@ -612,7 +612,7 @@ Flint use [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJa
```
3. Set the spark.datasource.flint.customAWSCredentialsProvider property to com.amazonaws.emr.AssumeRoleAWSCredentialsProvider, and set the environment variable ASSUME_ROLE_CREDENTIALS_ROLE_ARN to the ARN of CrossAccountRoleB.
```
---conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.2.0-SNAPSHOT \
+--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT \
--conf spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
@@ -5,6 +5,7 @@

package org.opensearch.flint.core;

import org.opensearch.OpenSearchException;
import org.opensearch.action.bulk.BulkRequest;
import org.opensearch.action.bulk.BulkResponse;
import org.opensearch.action.delete.DeleteRequest;
@@ -26,6 +27,7 @@
import org.opensearch.client.indices.GetIndexResponse;
import org.opensearch.action.admin.indices.delete.DeleteIndexRequest;
import org.opensearch.client.RequestOptions;
import org.opensearch.flint.core.metrics.MetricsUtil;

import java.io.Closeable;
import java.io.IOException;
@@ -52,11 +54,62 @@ public interface IRestHighLevelClient extends Closeable {

IndexResponse index(IndexRequest indexRequest, RequestOptions options) throws IOException;

-Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;
+Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException;

SearchResponse search(SearchRequest searchRequest, RequestOptions options) throws IOException;

SearchResponse scroll(SearchScrollRequest searchScrollRequest, RequestOptions options) throws IOException;

DocWriteResponse update(UpdateRequest updateRequest, RequestOptions options) throws IOException;


/**
* Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
* This method constructs the metric name by appending ".200.count" to the provided metric name prefix.
* The metric name is then used to increment the counter, indicating a successful operation.
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success
*/
static void recordOperationSuccess(String metricNamePrefix) {
String successMetricName = metricNamePrefix + ".2xx.count";
MetricsUtil.incrementCounter(successMetricName);
}

/**
* Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
* If the exception is an OpenSearchException with a specific status code (e.g., 403),
* it increments a metric specifically for that status code.
* Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx).
*
* @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure
* @param e the exception encountered during the operation, used to determine the type of failure
*/
static void recordOperationFailure(String metricNamePrefix, Exception e) {
OpenSearchException openSearchException = extractOpenSearchException(e);
int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;

if (statusCode == 403) {
String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
MetricsUtil.incrementCounter(forbiddenErrorMetricName);
}

String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
MetricsUtil.incrementCounter(failureMetricName);
}

/**
* Extracts an OpenSearchException from the given Throwable.
* Checks if the Throwable is an instance of OpenSearchException or caused by one.
*
* @param ex the exception to be checked
* @return the extracted OpenSearchException, or null if not found
*/
private static OpenSearchException extractOpenSearchException(Throwable ex) {
if (ex instanceof OpenSearchException) {
return (OpenSearchException) ex;
} else if (ex.getCause() instanceof OpenSearchException) {
return (OpenSearchException) ex.getCause();
}
return null;
}
}
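The static helpers added above bucket failures by status-code class (with a dedicated 403 counter) and unwrap a nested `OpenSearchException` from a cause chain. A minimal standalone sketch of that logic, with a hypothetical `SimulatedOpenSearchException` standing in for `org.opensearch.OpenSearchException` since the OpenSearch classes are not on the classpath here:

```java
// Sketch of the metric-name bucketing and cause-unwrapping shown in the diff.
public class MetricNameSketch {
    // Stand-in for org.opensearch.OpenSearchException (assumption, not the real class).
    static class SimulatedOpenSearchException extends RuntimeException {
        final int status;
        SimulatedOpenSearchException(int status) { this.status = status; }
    }

    // Same bucketing as recordOperationFailure: unknown errors default to 500,
    // and every status collapses into its hundreds class (4xx, 5xx, ...).
    static String failureMetricName(String prefix, Exception e) {
        SimulatedOpenSearchException ose = extract(e);
        int statusCode = ose != null ? ose.status : 500;
        return prefix + "." + (statusCode / 100) + "xx.count";
    }

    // Mirrors extractOpenSearchException: check the exception itself, then its direct cause.
    static SimulatedOpenSearchException extract(Throwable ex) {
        if (ex instanceof SimulatedOpenSearchException) {
            return (SimulatedOpenSearchException) ex;
        } else if (ex.getCause() instanceof SimulatedOpenSearchException) {
            return (SimulatedOpenSearchException) ex.getCause();
        }
        return null;
    }

    public static void main(String[] args) {
        // 404 -> opensearch.read.4xx.count
        System.out.println(failureMetricName("opensearch.read", new SimulatedOpenSearchException(404)));
        // wrapped 503 -> opensearch.write.5xx.count
        System.out.println(failureMetricName("opensearch.write",
            new RuntimeException(new SimulatedOpenSearchException(503))));
        // unrelated exception falls back to 500 -> opensearch.read.5xx.count
        System.out.println(failureMetricName("opensearch.read", new IllegalStateException("no cause")));
    }
}
```

Note that only the direct cause is inspected, matching the diff: a deeper cause chain would fall through to the 500 default.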
@@ -20,15 +20,13 @@
import org.opensearch.action.search.SearchResponse;
import org.opensearch.action.search.SearchScrollRequest;
import org.opensearch.action.update.UpdateRequest;
-import org.opensearch.OpenSearchException;
import org.opensearch.action.update.UpdateResponse;
import org.opensearch.client.RequestOptions;
import org.opensearch.client.RestHighLevelClient;
import org.opensearch.client.indices.CreateIndexRequest;
import org.opensearch.client.indices.CreateIndexResponse;
import org.opensearch.client.indices.GetIndexRequest;
import org.opensearch.client.indices.GetIndexResponse;
-import org.opensearch.flint.core.metrics.MetricsUtil;

import java.io.IOException;

@@ -91,7 +89,7 @@ public IndexResponse index(IndexRequest indexRequest, RequestOptions options) th
}

@Override
-public Boolean isIndexExists(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
+public Boolean doesIndexExist(GetIndexRequest getIndexRequest, RequestOptions options) throws IOException {
return execute(OS_READ_OP_METRIC_PREFIX, () -> client.indices().exists(getIndexRequest, options));
}

@@ -122,64 +120,14 @@ public UpdateResponse update(UpdateRequest updateRequest, RequestOptions options
private <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
try {
T result = operation.call();
-recordOperationSuccess(metricNamePrefix);
+IRestHighLevelClient.recordOperationSuccess(metricNamePrefix);
return result;
} catch (Exception e) {
-recordOperationFailure(metricNamePrefix, e);
+IRestHighLevelClient.recordOperationFailure(metricNamePrefix, e);
throw e;
}
}

-/**
- * Records the success of an OpenSearch operation by incrementing the corresponding metric counter.
- * This method constructs the metric name by appending ".200.count" to the provided metric name prefix.
- * The metric name is then used to increment the counter, indicating a successful operation.
- *
- * @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for success
- */
-private void recordOperationSuccess(String metricNamePrefix) {
-String successMetricName = metricNamePrefix + ".2xx.count";
-MetricsUtil.incrementCounter(successMetricName);
-}
-
-/**
- * Records the failure of an OpenSearch operation by incrementing the corresponding metric counter.
- * If the exception is an OpenSearchException with a specific status code (e.g., 403),
- * it increments a metric specifically for that status code.
- * Otherwise, it increments a general failure metric counter based on the status code category (e.g., 4xx, 5xx).
- *
- * @param metricNamePrefix the prefix for the metric name which is used to construct the full metric name for failure
- * @param e the exception encountered during the operation, used to determine the type of failure
- */
-private void recordOperationFailure(String metricNamePrefix, Exception e) {
-OpenSearchException openSearchException = extractOpenSearchException(e);
-int statusCode = openSearchException != null ? openSearchException.status().getStatus() : 500;
-
-if (statusCode == 403) {
-String forbiddenErrorMetricName = metricNamePrefix + ".403.count";
-MetricsUtil.incrementCounter(forbiddenErrorMetricName);
-}
-
-String failureMetricName = metricNamePrefix + "." + (statusCode / 100) + "xx.count";
-MetricsUtil.incrementCounter(failureMetricName);
-}
-
-/**
- * Extracts an OpenSearchException from the given Throwable.
- * Checks if the Throwable is an instance of OpenSearchException or caused by one.
- *
- * @param ex the exception to be checked
- * @return the extracted OpenSearchException, or null if not found
- */
-private OpenSearchException extractOpenSearchException(Throwable ex) {
-if (ex instanceof OpenSearchException) {
-return (OpenSearchException) ex;
-} else if (ex.getCause() instanceof OpenSearchException) {
-return (OpenSearchException) ex.getCause();
-}
-return null;
-}

/**
* Functional interface for operations that can throw IOException.
*
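After this refactor, the wrapper's only metrics call site is the `execute` helper: run the operation, record a success or failure counter, and rethrow unchanged. A self-contained sketch of that wrapper pattern (hypothetical names; a plain list stands in for `MetricsUtil` counters, and all failures are bucketed as 5xx here for brevity):

```java
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of the execute() wrapper pattern from the diff:
// wrap an operation, record a metric on success or failure, rethrow errors.
public class ExecuteWrapperSketch {
    // Functional interface for operations that can throw IOException,
    // matching the IOCallable shape in the wrapper class.
    interface IOCallable<T> { T call() throws IOException; }

    static final List<String> RECORDED = new ArrayList<>(); // stand-in for metric counters

    static <T> T execute(String metricNamePrefix, IOCallable<T> operation) throws IOException {
        try {
            T result = operation.call();
            RECORDED.add(metricNamePrefix + ".2xx.count");
            return result;
        } catch (Exception e) {
            RECORDED.add(metricNamePrefix + ".5xx.count"); // real code buckets by status code
            throw e; // failures are recorded but never swallowed
        }
    }

    public static void main(String[] args) throws IOException {
        String ok = execute("opensearch.read", () -> "hit");
        System.out.println(ok + " -> " + RECORDED);
        try {
            execute("opensearch.write", () -> { throw new IOException("boom"); });
        } catch (IOException expected) {
            System.out.println(RECORDED);
        }
    }
}
```

Centralizing this in one generic method is what allowed the commit to delete the duplicated private helpers from the wrapper and share them via interface statics.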
@@ -8,7 +8,7 @@
/**
* This class defines custom metric constants used for monitoring flint operations.
*/
-public class MetricConstants {
+public final class MetricConstants {

/**
* The prefix for all read-related metrics in OpenSearch.
@@ -21,4 +93,93 @@ public class MetricConstants {
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations.
*/
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";

/**
* Metric name for counting the errors encountered with Amazon S3 operations.
*/
public static final String S3_ERR_CNT_METRIC = "s3.error.count";

/**
* Metric name for counting the number of sessions currently running.
*/
public static final String REPL_RUNNING_METRIC = "session.running.count";

/**
* Metric name for counting the number of sessions that have failed.
*/
public static final String REPL_FAILED_METRIC = "session.failed.count";

/**
* Metric name for counting the number of sessions that have successfully completed.
*/
public static final String REPL_SUCCESS_METRIC = "session.success.count";

/**
* Metric name for tracking the processing time of sessions.
*/
public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime";

/**
* Prefix for metrics related to the request metadata read operations.
*/
public static final String REQUEST_METADATA_READ_METRIC_PREFIX = "request.metadata.read";

/**
* Prefix for metrics related to the request metadata write operations.
*/
public static final String REQUEST_METADATA_WRITE_METRIC_PREFIX = "request.metadata.write";

/**
* Metric name for counting failed heartbeat operations on request metadata.
*/
public static final String REQUEST_METADATA_HEARTBEAT_FAILED_METRIC = "request.metadata.heartbeat.failed.count";

/**
* Prefix for metrics related to the result metadata write operations.
*/
public static final String RESULT_METADATA_WRITE_METRIC_PREFIX = "result.metadata.write";

/**
* Metric name for counting the number of statements currently running.
*/
public static final String STATEMENT_RUNNING_METRIC = "statement.running.count";

/**
* Metric name for counting the number of statements that have failed.
*/
public static final String STATEMENT_FAILED_METRIC = "statement.failed.count";

/**
* Metric name for counting the number of statements that have successfully completed.
*/
public static final String STATEMENT_SUCCESS_METRIC = "statement.success.count";

/**
* Metric name for tracking the processing time of statements.
*/
public static final String STATEMENT_PROCESSING_TIME_METRIC = "statement.processingTime";

/**
* Metric for tracking the count of currently running streaming jobs.
*/
public static final String STREAMING_RUNNING_METRIC = "streaming.running.count";

/**
* Metric for tracking the count of streaming jobs that have failed.
*/
public static final String STREAMING_FAILED_METRIC = "streaming.failed.count";

/**
* Metric for tracking the count of streaming jobs that have completed successfully.
*/
public static final String STREAMING_SUCCESS_METRIC = "streaming.success.count";

/**
* Metric for tracking the count of failed heartbeat signals in streaming jobs.
*/
public static final String STREAMING_HEARTBEAT_FAILED_METRIC = "streaming.heartbeat.failed.count";

private MetricConstants() {
// Private constructor to prevent instantiation
}
}
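`MetricsUtil.incrementCounter` itself is not part of this diff. As an illustration of how these string constants are consumed, here is a hypothetical in-memory counter keyed by metric name (an assumption about the pattern, not the project's actual implementation):

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical in-memory stand-in for MetricsUtil, keyed by the metric
// names defined in MetricConstants (e.g. "s3.error.count").
public class CounterSketch {
    private static final Map<String, Long> COUNTERS = new ConcurrentHashMap<>();

    static void incrementCounter(String metricName) {
        COUNTERS.merge(metricName, 1L, Long::sum); // atomic per-key increment
    }

    static long get(String metricName) {
        return COUNTERS.getOrDefault(metricName, 0L);
    }

    public static void main(String[] args) {
        String s3Errors = "s3.error.count"; // value of MetricConstants.S3_ERR_CNT_METRIC
        incrementCounter(s3Errors);
        incrementCounter(s3Errors);
        System.out.println(s3Errors + " = " + get(s3Errors));
    }
}
```

Keeping the names in one `final` class with a private constructor, as the commit does, means emitters and dashboards agree on a single spelling for each counter.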