Add checkpoint mandatory option (#92)
* Add checkpoint mandatory option and change IT

Signed-off-by: Chen Dai <[email protected]>

* Add doc and fix scalafmt

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Oct 25, 2023
1 parent 62c11df commit 022b974
Showing 6 changed files with 55 additions and 5 deletions.
1 change: 1 addition & 0 deletions docs/index.md
@@ -345,6 +345,7 @@ In the index mapping, the `_meta` and `properties` fields store meta and schema info
- `spark.datasource.flint.read.scroll_size`: default value is 100.
- `spark.flint.optimizer.enabled`: default is true.
- `spark.flint.index.hybridscan.enabled`: default is false.
- `spark.flint.index.checkpoint.mandatory`: default is true.

#### Data Type Mapping

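The `spark.flint.index.checkpoint.mandatory` option documented above is set like any other Spark configuration. A minimal sketch (the app name and master are illustrative, not part of this change):

```scala
import org.apache.spark.sql.SparkSession

// Opt out of the new default, e.g. for local experiments where no
// durable checkpoint storage is available.
val spark = SparkSession
  .builder()
  .appName("flint-checkpoint-example") // illustrative name
  .master("local[*]")                  // illustrative master
  .config("spark.flint.index.checkpoint.mandatory", "false")
  .getOrCreate()
```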
6 changes: 6 additions & 0 deletions FlintSparkConf.scala
@@ -113,6 +113,10 @@ object FlintSparkConf {
  val HYBRID_SCAN_ENABLED = FlintConfig("spark.flint.index.hybridscan.enabled")
    .doc("Enable hybrid scan to include latest source data not refreshed to index yet")
    .createWithDefault("false")

  val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory")
    .doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
    .createWithDefault("true")
}

/**
@@ -137,6 +141,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

  def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean

  def isCheckpointMandatory: Boolean = CHECKPOINT_MANDATORY.readFrom(reader).toBoolean

  /**
   * spark.sql.session.timeZone
   */
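A hedged sketch of how the new flag surfaces at runtime, using only what the diff above shows: `FlintSparkConf` is a case class over a Java map, and `createWithDefault("true")` supplies the fallback when the property is absent.

```scala
import java.util.{HashMap => JHashMap}

// Assumes FlintSparkConf can be constructed directly from a Java map,
// as its case-class signature above suggests.
val explicit = new JHashMap[String, String]()
explicit.put("spark.flint.index.checkpoint.mandatory", "false")
assert(!FlintSparkConf(explicit).isCheckpointMandatory) // explicit override wins

val defaults = FlintSparkConf(new JHashMap[String, String]())
assert(defaults.isCheckpointMandatory) // falls back to createWithDefault("true")
```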
10 changes: 8 additions & 2 deletions FlintSpark.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
import org.apache.spark.sql.SaveMode._
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
-import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
+import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}

/**
@@ -247,7 +247,13 @@
  }

  def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = {
-   checkpointLocation.map(dataStream.option("checkpointLocation", _)).getOrElse(dataStream)
+   checkpointLocation match {
+     case Some(location) => dataStream.option("checkpointLocation", location)
+     case None if flintSparkConf.isCheckpointMandatory =>
+       throw new IllegalStateException(
+         s"Checkpoint location is mandatory for incremental refresh if ${CHECKPOINT_MANDATORY.key} is enabled")
+     case _ => dataStream
+   }
  }

  def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = {
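The three-way decision in `addCheckpointLocation` above, reduced to a self-contained sketch: the streaming writer is stubbed out as `Option[String]` so the logic runs without Spark, and the bucket path in the assertions is made up.

```scala
// Explicit location always wins; otherwise the mandatory flag decides
// between failing fast and running without a checkpoint.
def resolveCheckpoint(location: Option[String], mandatory: Boolean): Option[String] =
  location match {
    case some @ Some(_) => some
    case None if mandatory =>
      throw new IllegalStateException(
        "Checkpoint location is mandatory for incremental refresh if " +
          "spark.flint.index.checkpoint.mandatory is enabled")
    case None => None
  }

assert(resolveCheckpoint(Some("s3://my-bucket/cp"), mandatory = true).contains("s3://my-bucket/cp"))
assert(resolveCheckpoint(None, mandatory = false).isEmpty)
```

Failing fast is the point of the new default: an auto-refresh stream started without a checkpoint cannot recover its position after a restart.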
19 changes: 18 additions & 1 deletion FlintSparkCoveringIndexSqlITSuite.scala
@@ -15,10 +15,11 @@ import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.must.Matchers.{defined, have}
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY

class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {

@@ -104,6 +105,22 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
| """.stripMargin)
}

test("create skipping index with auto refresh should fail if mandatory checkpoint enabled") {
setFlintSparkConf(CHECKPOINT_MANDATORY, "true")
try {
the[IllegalStateException] thrownBy {
sql(s"""
| CREATE INDEX $testIndex ON $testTable
| (name, age)
| WITH (auto_refresh = true)
| """.stripMargin)
} should have message
"Checkpoint location is mandatory for incremental refresh if spark.flint.index.checkpoint.mandatory enabled"
} finally {
setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
}
}

test("create covering index with manual refresh") {
sql(s"""
| CREATE INDEX $testIndex ON $testTable
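The try/finally dance around `setFlintSparkConf` repeats in both IT suites. One possible refactor, purely hypothetical and not part of this commit, is a small loan-pattern helper; it assumes the suite's `spark` session and the `.key` accessor on `FlintConfig` seen in `FlintSpark.scala` above.

```scala
// Hypothetical helper: set a Spark conf for one block, then restore it
// so sibling tests keep the suite-wide default.
def withFlintConf[A](key: String, value: String, restoreTo: String)(body: => A): A = {
  spark.conf.set(key, value)
  try body
  finally spark.conf.set(key, restoreTo)
}

// Usage, mirroring the test above:
// withFlintConf(CHECKPOINT_MANDATORY.key, "true", "false") {
//   the[IllegalStateException] thrownBy sql(...) should have message ...
// }
```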
19 changes: 18 additions & 1 deletion FlintSparkSkippingIndexSqlITSuite.scala
@@ -14,11 +14,12 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.storage.FlintOpenSearchClient
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.must.Matchers.{defined, have}
import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}

import org.apache.spark.sql.Row
import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY

class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {

@@ -109,6 +110,22 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
| """.stripMargin)
}

test("create skipping index with auto refresh should fail if mandatory checkpoint enabled") {
setFlintSparkConf(CHECKPOINT_MANDATORY, "true")
try {
the[IllegalStateException] thrownBy {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
| ( year PARTITION )
| WITH (auto_refresh = true)
| """.stripMargin)
} should have message
"Checkpoint location is mandatory for incremental refresh if spark.flint.index.checkpoint.mandatory enabled"
} finally {
setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
}
}

test("create skipping index with manual refresh") {
sql(s"""
| CREATE SKIPPING INDEX ON $testTable
5 changes: 4 additions & 1 deletion FlintSparkSuite.scala
@@ -9,7 +9,7 @@ import org.opensearch.flint.OpenSearchSuite

import org.apache.spark.FlintSuite
import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
+import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
import org.apache.spark.sql.streaming.StreamTest

/**
@@ -26,6 +26,9 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
    setFlintSparkConf(HOST_ENDPOINT, openSearchHost)
    setFlintSparkConf(HOST_PORT, openSearchPort)
    setFlintSparkConf(REFRESH_POLICY, "true")

    // Disable mandatory checkpoint for test convenience
    setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
  }

  protected def awaitStreamingComplete(jobId: String): Unit = {
