Merge branch 'main' into feature-support-sigv4a-signer
Signed-off-by: Louis Chu <[email protected]>
noCharger authored Apr 27, 2024
2 parents 03f7210 + 222e473 commit b86b5e7
Showing 45 changed files with 972 additions and 472 deletions.
7 changes: 4 additions & 3 deletions README.md
@@ -19,7 +19,8 @@ Version compatibility:
|---------------|-------------|---------------|---------------|------------|
| 0.1.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
| 0.2.0 | 11+ | 3.3.1 | 2.12.14 | 2.6+ |
| 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.6+ |
| 0.3.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |
| 0.4.0 | 11+ | 3.3.2 | 2.12.14 | 2.13+ |

## Flint Extension Usage

@@ -51,7 +52,7 @@ sbt clean standaloneCosmetic/publishM2
```
then add org.opensearch:opensearch-spark_2.12 when running the Spark application, for example:
```
bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.3.0-SNAPSHOT"
bin/spark-shell --packages "org.opensearch:opensearch-spark_2.12:0.4.0-SNAPSHOT"
```

### PPL Build & Run
@@ -63,7 +64,7 @@ sbt clean sparkPPLCosmetic/publishM2
```
then add org.opensearch:opensearch-spark-ppl_2.12 when running the Spark application, for example:
```
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT"
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.4.0-SNAPSHOT"
```

## Code of Conduct
8 changes: 7 additions & 1 deletion build.sbt
@@ -7,10 +7,14 @@ import Dependencies._
lazy val scala212 = "2.12.14"
lazy val sparkVersion = "3.3.2"
lazy val opensearchVersion = "2.6.0"
lazy val icebergVersion = "1.5.0"

val scalaMinorVersion = scala212.split("\\.").take(2).mkString(".")
val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".")

ThisBuild / organization := "org.opensearch"

ThisBuild / version := "0.3.0-SNAPSHOT"
ThisBuild / version := "0.4.0-SNAPSHOT"

ThisBuild / scalaVersion := scala212

@@ -173,6 +177,8 @@ lazy val integtest = (project in file("integ-test"))
"org.scalatest" %% "scalatest" % "3.2.15" % "test",
"com.stephenn" %% "scalatest-json-jsonassert" % "0.2.5" % "test",
"org.testcontainers" % "testcontainers" % "1.18.0" % "test",
"org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % icebergVersion % "test",
"org.scala-lang.modules" %% "scala-collection-compat" % "2.11.0" % "test",
// add opensearch-java client to get node stats
"org.opensearch.client" % "opensearch-java" % "2.6.0" % "test"
exclude ("com.fasterxml.jackson.core", "jackson-databind")),
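A side note on the new `iceberg-spark-runtime-$sparkMinorVersion` test dependency above: sbt's `%%` appends the Scala binary version, and `sparkMinorVersion` is derived near the top of this file, so the coordinate that should resolve is roughly the following (a sketch; the artifact naming is Iceberg's convention, not something this commit defines):

```scala
// Minor-version derivation, as in build.sbt above:
val sparkVersion = "3.3.2"
val sparkMinorVersion = sparkVersion.split("\\.").take(2).mkString(".") // "3.3"

// With Scala 2.12, "org.apache.iceberg" %% s"iceberg-spark-runtime-$sparkMinorVersion" % "1.5.0"
// is expected to resolve to org.apache.iceberg:iceberg-spark-runtime-3.3_2.12:1.5.0 (test scope).
```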
4 changes: 2 additions & 2 deletions docs/PPL-on-Spark.md
@@ -34,7 +34,7 @@ sbt clean sparkPPLCosmetic/publishM2
```
then add org.opensearch:opensearch-spark-ppl_2.12 when running the Spark application, for example:
```
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT"
bin/spark-shell --packages "org.opensearch:opensearch-spark-ppl_2.12:0.4.0-SNAPSHOT"
```

### PPL Extension Usage
@@ -46,7 +46,7 @@ spark-sql --conf "spark.sql.extensions=org.opensearch.flint.spark.FlintPPLSparkE
```

### Running With both Flint & PPL Extensions
In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.3.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.3.0-SNAPSHOT`) to the cluster's
In order to make use of both flint and ppl extension, one can simply add both jars (`org.opensearch:opensearch-spark-ppl_2.12:0.4.0-SNAPSHOT`,`org.opensearch:opensearch-spark_2.12:0.4.0-SNAPSHOT`) to the cluster's
classpath.

Next, configure both extensions:
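A rough Scala-side sketch of that configuration (the fully qualified extension class names are assumptions based on the surrounding docs, not something this hunk shows):

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical: register both the PPL and Flint extensions on one session.
val spark = SparkSession
  .builder()
  .appName("flint-ppl-demo")
  .config(
    "spark.sql.extensions",
    "org.opensearch.flint.spark.FlintPPLSparkExtensions," +
      "org.opensearch.flint.spark.FlintSparkExtensions")
  .getOrCreate()
```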
6 changes: 3 additions & 3 deletions docs/index.md
@@ -54,7 +54,7 @@ Currently, Flint metadata is only static configuration without version control a

```json
{
"version": "0.3.0",
"version": "0.4.0",
"name": "...",
"kind": "skipping",
"source": "...",
@@ -666,7 +666,7 @@ For now, only single or conjunct conditions (conditions connected by AND) in WHE
### AWS EMR Spark Integration - Using execution role
Flint uses [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJavaSDK/latest/javadoc/com/amazonaws/auth/DefaultAWSCredentialsProviderChain.html). When running in EMR Spark, Flint uses the execution role credentials:
```
--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT \
--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.4.0-SNAPSHOT \
--conf spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
@@ -708,7 +708,7 @@ Flint use [DefaultAWSCredentialsProviderChain](https://docs.aws.amazon.com/AWSJa
```
3. Set the spark.datasource.flint.customAWSCredentialsProvider property to com.amazonaws.emr.AssumeRoleAWSCredentialsProvider, and set the environment variable ASSUME_ROLE_CREDENTIALS_ROLE_ARN to the ARN of CrossAccountRoleB.
```
--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.3.0-SNAPSHOT \
--conf spark.jars.packages=org.opensearch:opensearch-spark-standalone_2.12:0.4.0-SNAPSHOT \
--conf spark.jars.repositories=https://aws.oss.sonatype.org/content/repositories/snapshots \
--conf spark.emr-serverless.driverEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
--conf spark.executorEnv.JAVA_HOME=/usr/lib/jvm/java-17-amazon-corretto.x86_64 \
FlintVersion.scala
@@ -17,6 +17,7 @@ object FlintVersion {
val V_0_1_0: FlintVersion = FlintVersion("0.1.0")
val V_0_2_0: FlintVersion = FlintVersion("0.2.0")
val V_0_3_0: FlintVersion = FlintVersion("0.3.0")
val V_0_4_0: FlintVersion = FlintVersion("0.4.0")

def current(): FlintVersion = V_0_3_0
def current(): FlintVersion = V_0_4_0
}
FlintSpark.scala
@@ -407,9 +407,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
case (true, false) => AUTO
case (false, false) => FULL
case (false, true) => INCREMENTAL
case (true, true) =>
throw new IllegalArgumentException(
"auto_refresh and incremental_refresh options cannot both be true")
}

// validate allowed options depending on refresh mode
FlintSparkIndexBuilder.scala
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
import scala.collection.JavaConverters.mapAsJavaMapConverter

import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh

import org.apache.spark.sql.catalog.Column
import org.apache.spark.sql.catalyst.util.CharVarcharUtils
@@ -59,7 +60,7 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
* ignore existing index
*/
def create(ignoreIfExists: Boolean = false): Unit =
flint.createIndex(buildIndex(), ignoreIfExists)
flint.createIndex(validateIndex(buildIndex()), ignoreIfExists)

/**
* Copy Flint index with updated options.
@@ -80,7 +81,24 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
val updatedMetadata = index
.metadata()
.copy(options = updatedOptions.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
FlintSparkIndexFactory.create(updatedMetadata).get
validateIndex(FlintSparkIndexFactory.create(updatedMetadata).get)
}

/**
* Pre-validates the index to ensure its validity. By default, this method validates index
* options by delegating to the specific index refresh implementation (index options mostly
* serve index refresh). Subclasses can extend this method to add further validation logic.
*
* @param index
*   Flint index to be validated
* @return
*   the validated index; an IllegalArgumentException is thrown if validation fails
*/
protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
FlintSparkIndexRefresh
.create(index.name(), index) // TODO: remove first argument?
.validate(flint.spark)
index
}
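As a hedged illustration of the extension point described in the scaladoc above, a concrete builder could layer an extra rule on top of the default refresh-option validation. The subclass and its additional check are hypothetical, not part of this commit:

```scala
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}

// Hypothetical builder subclass, shown only to illustrate overriding validateIndex.
class MyIndexBuilder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {

  override protected def validateIndex(index: FlintSparkIndex): FlintSparkIndex = {
    val validated = super.validateIndex(index) // default: delegate to index refresh validation
    require(validated.name().nonEmpty, "index name must not be empty") // extra, builder-specific rule
    validated
  }

  override def buildIndex(): FlintSparkIndex =
    throw new UnsupportedOperationException("omitted in this sketch")
}
```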

/**
FlintSparkValidationHelper.scala (new file)
@@ -0,0 +1,90 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

import java.io.IOException

import org.apache.hadoop.fs.Path
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex

import org.apache.spark.internal.Logging
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.catalyst.analysis.UnresolvedRelation
import org.apache.spark.sql.execution.command.DDLUtils
import org.apache.spark.sql.execution.streaming.CheckpointFileManager
import org.apache.spark.sql.flint.{loadTable, parseTableName, qualifyTableName}

/**
* Flint Spark validation helper.
*/
trait FlintSparkValidationHelper extends Logging {

/**
* Determines whether any source table of the given Flint index uses an unsupported provider (currently Hive).
*
* @param spark
* Spark session
* @param index
* Flint index
* @return
* true if at least one source table is a Hive table (currently unsupported), otherwise false
*/
def isTableProviderSupported(spark: SparkSession, index: FlintSparkIndex): Boolean = {
// Extract source table name (possibly more than one for MV query)
val tableNames = index match {
case skipping: FlintSparkSkippingIndex => Seq(skipping.tableName)
case covering: FlintSparkCoveringIndex => Seq(covering.tableName)
case mv: FlintSparkMaterializedView =>
spark.sessionState.sqlParser
.parsePlan(mv.query)
.collect { case relation: UnresolvedRelation =>
qualifyTableName(spark, relation.tableName)
}
}

// Validate if any source table is not supported (currently Hive only)
tableNames.exists { tableName =>
val (catalog, ident) = parseTableName(spark, tableName)
val table = loadTable(catalog, ident).get

// TODO: add allowed table provider list
DDLUtils.isHiveTable(Option(table.properties().get("provider")))
}
}

/**
* Checks whether a specified checkpoint location is accessible. Accessibility, in this context,
* means that the folder exists and the current Spark session has the necessary permissions to
* access it.
*
* @param spark
* Spark session
* @param checkpointLocation
* checkpoint location
* @return
* true if accessible, otherwise false
*/
def isCheckpointLocationAccessible(spark: SparkSession, checkpointLocation: String): Boolean = {
try {
val checkpointManager =
CheckpointFileManager.create(
new Path(checkpointLocation),
spark.sessionState.newHadoopConf())

// The primary intent here is to catch any exceptions during the accessibility check.
// The actual result is ignored, as Spark can create any necessary sub-folders
// when the streaming job starts.
checkpointManager.exists(new Path(checkpointLocation))
true
} catch {
case e: IOException =>
logWarning(s"Failed to check if checkpoint location $checkpointLocation exists", e)
false
}
}
}
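A minimal, hypothetical usage sketch of the new helper, using a local filesystem path rather than the production S3 case (the demo object and path are made up for illustration):

```scala
import org.apache.spark.sql.SparkSession
import org.opensearch.flint.spark.FlintSparkValidationHelper

// Hypothetical demo: prints true when the Hadoop FS layer can create a
// CheckpointFileManager for the location and probe it without an IOException.
object ValidationHelperDemo extends FlintSparkValidationHelper {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[1]").appName("demo").getOrCreate()
    println(isCheckpointLocationAccessible(spark, "/tmp/flint-checkpoint-demo"))
    spark.stop()
  }
}
```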
AutoIndexRefresh.scala
@@ -5,7 +5,9 @@

package org.opensearch.flint.spark.refresh

import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions}
import java.util.Collections

import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}

@@ -23,10 +25,41 @@ import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
* @param index
* Flint index
*/
class AutoIndexRefresh(indexName: String, index: FlintSparkIndex) extends FlintSparkIndexRefresh {
class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
extends FlintSparkIndexRefresh
with FlintSparkValidationHelper {

override def refreshMode: RefreshMode = AUTO

override def validate(spark: SparkSession): Unit = {
// Incremental refresh cannot be enabled at the same time as auto refresh
val options = index.options
require(
!options.incrementalRefresh(),
"Incremental refresh cannot be enabled if auto refresh is enabled")

// Hive table doesn't support auto refresh
require(
!isTableProviderSupported(spark, index),
"Index auto refresh doesn't support Hive table")

// Checkpoint location is required if mandatory option set
val flintSparkConf = new FlintSparkConf(Collections.emptyMap[String, String])
val checkpointLocation = options.checkpointLocation()
if (flintSparkConf.isCheckpointMandatory) {
require(
checkpointLocation.isDefined,
s"Checkpoint location is required if ${CHECKPOINT_MANDATORY.key} option enabled")
}

// Checkpoint location must be accessible
if (checkpointLocation.isDefined) {
require(
isCheckpointLocationAccessible(spark, checkpointLocation.get),
s"No permission to access the checkpoint location ${checkpointLocation.get}")
}
}
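To make the first check concrete: the option combination below is what `validate` now rejects up front. The `FlintSparkIndexOptions(Map(...))` construction and the option key names are assumptions based on the wider Flint codebase, not something this hunk shows:

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

// Hypothetical option map that AutoIndexRefresh.validate is expected to reject.
val conflictingOptions = FlintSparkIndexOptions(
  Map("auto_refresh" -> "true", "incremental_refresh" -> "true"))

// Scala's require(cond, msg) throws IllegalArgumentException("requirement failed: " + msg),
// so creating an index with these options should fail with:
// "requirement failed: Incremental refresh cannot be enabled if auto refresh is enabled"
```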

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
val options = index.options
val tableName = index.metadata().source
FlintSparkIndexRefresh.scala
@@ -24,6 +24,20 @@ trait FlintSparkIndexRefresh extends Logging {
*/
def refreshMode: RefreshMode

/**
* Validates the current index refresh settings before the actual execution begins. This method
* checks for the integrity of the index refresh configurations and ensures that all options set
* for the current refresh mode are valid. This preemptive validation helps in identifying
* configuration issues before the refresh operation is initiated, minimizing runtime errors and
* potential inconsistencies.
*
* @param spark
* Spark session
* @throws IllegalArgumentException
* if any invalid or inapplicable config identified
*/
def validate(spark: SparkSession): Unit
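A sketch of the intended calling pattern for this contract (the wrapper function below is illustrative only, not an API added by this commit):

```scala
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh

// Hypothetical wrapper: fail fast on bad configuration, then kick off the refresh.
def refreshWithValidation(
    refresh: FlintSparkIndexRefresh,
    spark: SparkSession,
    flintSparkConf: FlintSparkConf): Option[String] = {
  refresh.validate(spark) // throws IllegalArgumentException on invalid or inapplicable options
  refresh.start(spark, flintSparkConf) // returns the streaming job ID, if any
}
```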

/**
* Start refreshing the index.
*
FullIndexRefresh.scala
@@ -31,6 +31,11 @@ class FullIndexRefresh(

override def refreshMode: RefreshMode = FULL

override def validate(spark: SparkSession): Unit = {
// Full refresh performs no validation for now, not even the Hive table check.
// This allows users to keep using their existing Hive tables with full refresh only.
}

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
logInfo(s"Start refreshing index $indexName in full mode")
index
IncrementalIndexRefresh.scala
@@ -5,7 +5,7 @@

package org.opensearch.flint.spark.refresh

import org.opensearch.flint.spark.FlintSparkIndex
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkValidationHelper}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{INCREMENTAL, RefreshMode}

import org.apache.spark.sql.SparkSession
@@ -20,18 +20,31 @@ import org.apache.spark.sql.flint.config.FlintSparkConf
* Flint index
*/
class IncrementalIndexRefresh(indexName: String, index: FlintSparkIndex)
extends FlintSparkIndexRefresh {
extends FlintSparkIndexRefresh
with FlintSparkValidationHelper {

override def refreshMode: RefreshMode = INCREMENTAL

override def validate(spark: SparkSession): Unit = {
// Non-Hive table is required for incremental refresh
require(
!isTableProviderSupported(spark, index),
"Index incremental refresh doesn't support Hive table")

// Checkpoint location is required regardless of mandatory option
val options = index.options
val checkpointLocation = options.checkpointLocation()
require(
options.checkpointLocation().nonEmpty,
"Checkpoint location is required by incremental refresh")
require(
isCheckpointLocationAccessible(spark, checkpointLocation.get),
s"No permission to access the checkpoint location ${checkpointLocation.get}")
}
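For contrast with auto refresh: an option map like the one below should pass this validation, provided the caller can actually access the checkpoint path (the `checkpoint_location` key name and the S3 path are assumptions for illustration):

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

// Hypothetical options for an incremental-refresh index: a checkpoint location is
// now required and must be accessible, regardless of the checkpoint-mandatory setting.
val incrementalOptions = FlintSparkIndexOptions(
  Map(
    "incremental_refresh" -> "true",
    "checkpoint_location" -> "s3://my-bucket/flint/checkpoints/my_index" // hypothetical path
  ))
```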

override def start(spark: SparkSession, flintSparkConf: FlintSparkConf): Option[String] = {
logInfo(s"Start refreshing index $indexName in incremental mode")

// TODO: move this to validation method together in future
if (index.options.checkpointLocation().isEmpty) {
throw new IllegalStateException("Checkpoint location is required by incremental refresh")
}

// Reuse auto refresh which uses AvailableNow trigger and will stop once complete
val jobId =
new AutoIndexRefresh(indexName, index)
FlintJobITSuite.scala
@@ -38,7 +38,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
super.beforeAll()
// initialized after the container is started
osClient = new OSClient(new FlintOptions(openSearchOptions.asJava))
createPartitionedMultiRowTable(testTable)
createPartitionedMultiRowAddressTable(testTable)
}

protected override def afterEach(): Unit = {