From 4dd70bda0776d6fcf685826311c65189ae2d3181 Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 23 Oct 2023 11:19:25 -0700
Subject: [PATCH 1/2] Add checkpoint mandatory option and change IT

Signed-off-by: Chen Dai
---
 .../sql/flint/config/FlintSparkConf.scala     |  6 ++++++
 .../opensearch/flint/spark/FlintSpark.scala   | 10 ++++++++--
 .../FlintSparkCoveringIndexSqlITSuite.scala   | 19 ++++++++++++++++++-
 .../FlintSparkSkippingIndexSqlITSuite.scala   | 19 ++++++++++++++++++-
 .../flint/spark/FlintSparkSuite.scala         |  6 ++++--
 5 files changed, 54 insertions(+), 6 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
index 45d80202a..b7b5a1f76 100644
--- a/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
+++ b/flint-spark-integration/src/main/scala/org/apache/spark/sql/flint/config/FlintSparkConf.scala
@@ -113,6 +113,10 @@ object FlintSparkConf {
   val HYBRID_SCAN_ENABLED = FlintConfig("spark.flint.index.hybridscan.enabled")
     .doc("Enable hybrid scan to include latest source data not refreshed to index yet")
     .createWithDefault("false")
+
+  val CHECKPOINT_MANDATORY = FlintConfig("spark.flint.index.checkpoint.mandatory")
+    .doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
+    .createWithDefault("true")
 }
 
 /**
@@ -137,6 +141,8 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
 
   def isHybridScanEnabled: Boolean = HYBRID_SCAN_ENABLED.readFrom(reader).toBoolean
 
+  def isCheckpointMandatory: Boolean = CHECKPOINT_MANDATORY.readFrom(reader).toBoolean
+
   /**
    * spark.sql.session.timeZone
    */
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 5bdab6276..632730afd 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -21,7 +21,7 @@ import org.apache.spark.sql.{DataFrame, Row, SparkSession}
 import org.apache.spark.sql.SaveMode._
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
 import org.apache.spark.sql.flint.config.FlintSparkConf
-import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
+import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
 import org.apache.spark.sql.streaming.{DataStreamWriter, Trigger}
 
 /**
@@ -247,7 +247,13 @@ class FlintSpark(val spark: SparkSession) {
     }
 
     def addCheckpointLocation(checkpointLocation: Option[String]): DataStreamWriter[Row] = {
-      checkpointLocation.map(dataStream.option("checkpointLocation", _)).getOrElse(dataStream)
+      checkpointLocation match {
+        case Some(location) => dataStream.option("checkpointLocation", location)
+        case None if flintSparkConf.isCheckpointMandatory =>
+          throw new IllegalStateException(
+            s"Checkpoint location is mandatory for incremental refresh if ${CHECKPOINT_MANDATORY.key} is enabled")
+        case _ => dataStream
+      }
     }
 
     def addRefreshInterval(refreshInterval: Option[String]): DataStreamWriter[Row] = {
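
The rewritten addCheckpointLocation above is the core of the change: an explicit checkpoint
location always wins, a missing one is fatal only when the new flag is on, and otherwise the
stream is left untouched. Below is a minimal, self-contained sketch of that decision, runnable
without Spark or Flint on the classpath; the names CheckpointGuardSketch and resolveCheckpoint
are illustrative only and not part of the Flint API.

object CheckpointGuardSketch {
  // The same three-way decision as addCheckpointLocation, reduced to plain Scala:
  // use an explicit location, fail on a missing one only when mandatory.
  def resolveCheckpoint(
      checkpointLocation: Option[String],
      checkpointMandatory: Boolean): Option[String] =
    checkpointLocation match {
      case Some(location) => Some(location) // explicit checkpoint always wins
      case None if checkpointMandatory =>
        throw new IllegalStateException(
          "Checkpoint location is mandatory for incremental refresh if " +
            "spark.flint.index.checkpoint.mandatory is enabled")
      case _ => None // flag disabled: caller may fall back to a default location
    }

  def main(args: Array[String]): Unit = {
    assert(resolveCheckpoint(Some("s3://bucket/ckpt"), checkpointMandatory = true)
      .contains("s3://bucket/ckpt"))
    assert(resolveCheckpoint(None, checkpointMandatory = false).isEmpty)
    val failed =
      try { resolveCheckpoint(None, checkpointMandatory = true); false }
      catch { case _: IllegalStateException => true }
    assert(failed, "expected IllegalStateException when checkpoint is mandatory")
  }
}
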
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
index 0d3f7a887..3b97f889b 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkCoveringIndexSqlITSuite.scala
@@ -15,10 +15,11 @@ import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.FlintOpenSearchClient
 import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.getFlintIndexName
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.must.Matchers.{defined, have}
 import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
 
 import org.apache.spark.sql.Row
+import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY
 
 class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
 
@@ -104,6 +105,22 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
          | """.stripMargin)
   }
 
+  test("create covering index with auto refresh should fail if mandatory checkpoint enabled") {
+    setFlintSparkConf(CHECKPOINT_MANDATORY, "true")
+    try {
+      the[IllegalStateException] thrownBy {
+        sql(s"""
+             | CREATE INDEX $testIndex ON $testTable
+             | (name, age)
+             | WITH (auto_refresh = true)
+             | """.stripMargin)
+      } should have message
+        "Checkpoint location is mandatory for incremental refresh if spark.flint.index.checkpoint.mandatory is enabled"
+    } finally {
+      setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
+    }
+  }
+
   test("create covering index with manual refresh") {
     sql(s"""
          | CREATE INDEX $testIndex ON $testTable
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
index dbd349b63..41e3539cd 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSkippingIndexSqlITSuite.scala
@@ -14,11 +14,12 @@ import org.json4s.native.Serialization
 import org.opensearch.flint.core.FlintOptions
 import org.opensearch.flint.core.storage.FlintOpenSearchClient
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.getSkippingIndexName
-import org.scalatest.matchers.must.Matchers.defined
+import org.scalatest.matchers.must.Matchers.{defined, have}
 import org.scalatest.matchers.should.Matchers.{convertToAnyShouldWrapper, the}
 
 import org.apache.spark.sql.Row
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
+import org.apache.spark.sql.flint.config.FlintSparkConf.CHECKPOINT_MANDATORY
 
 class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
 
@@ -109,6 +110,22 @@ class FlintSparkSkippingIndexSqlITSuite extends FlintSparkSuite {
          | """.stripMargin)
   }
 
+  test("create skipping index with auto refresh should fail if mandatory checkpoint enabled") {
+    setFlintSparkConf(CHECKPOINT_MANDATORY, "true")
+    try {
+      the[IllegalStateException] thrownBy {
+        sql(s"""
+             | CREATE SKIPPING INDEX ON $testTable
+             | ( year PARTITION )
+             | WITH (auto_refresh = true)
+             | """.stripMargin)
+      } should have message
+        "Checkpoint location is mandatory for incremental refresh if spark.flint.index.checkpoint.mandatory is enabled"
+    } finally {
+      setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
+    }
+  }
+
   test("create skipping index with manual refresh") {
     sql(s"""
          | CREATE SKIPPING INDEX ON $testTable
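
Both new tests assert only the failure path. To satisfy the guard in practice, an auto-refresh
index supplies a checkpoint location at creation time. A hedged sketch follows, assuming the
Flint SQL extensions are loaded in the session and that the WITH option is spelled
checkpoint_location as in the project docs; the catalog, table, and S3 path are placeholders.

import org.apache.spark.sql.SparkSession

object CreateIndexWithCheckpoint {
  def main(args: Array[String]): Unit = {
    // Assumes a session configured with the Flint Spark extensions.
    val spark = SparkSession.builder()
      .appName("flint-checkpoint-example")
      .getOrCreate()

    // With spark.flint.index.checkpoint.mandatory left at its new default (true),
    // an auto-refresh index must name an explicit checkpoint location up front.
    spark.sql("""
        | CREATE SKIPPING INDEX ON glue.default.http_logs
        | ( year PARTITION )
        | WITH (
        |   auto_refresh = true,
        |   checkpoint_location = 's3://my-bucket/flint/http_logs/_checkpoint'
        | )
        |""".stripMargin)

    spark.stop()
  }
}
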
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 2b93ca12a..11ab16e3a 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -6,10 +6,9 @@
 package org.opensearch.flint.spark
 
 import org.opensearch.flint.OpenSearchSuite
-
 import org.apache.spark.FlintSuite
 import org.apache.spark.sql.QueryTest
-import org.apache.spark.sql.flint.config.FlintSparkConf.{HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
+import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
 import org.apache.spark.sql.streaming.StreamTest
 
 /**
@@ -26,6 +25,9 @@ trait FlintSparkSuite extends QueryTest with FlintSuite with OpenSearchSuite wit
     setFlintSparkConf(HOST_ENDPOINT, openSearchHost)
     setFlintSparkConf(HOST_PORT, openSearchPort)
     setFlintSparkConf(REFRESH_POLICY, "true")
+
+    // Disable mandatory checkpoint for test convenience
+    setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
   }
 
   protected def awaitStreamingComplete(jobId: String): Unit = {

From 13db2a5590ec93fb265dfb8a54d5ba27f7aec61a Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Mon, 23 Oct 2023 11:25:21 -0700
Subject: [PATCH 2/2] Add doc and fix scalafmt

Signed-off-by: Chen Dai
---
 docs/index.md                                                    | 1 +
 .../test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala  | 1 +
 2 files changed, 2 insertions(+)

diff --git a/docs/index.md b/docs/index.md
index 3c1671c98..3c733ce8c 100644
--- a/docs/index.md
+++ b/docs/index.md
@@ -341,6 +341,7 @@ In the index mapping, the `_meta` and `properties`field stores meta and schema i
 - `spark.datasource.flint.read.scroll_size`: default value is 100.
 - `spark.flint.optimizer.enabled`: default is true.
 - `spark.flint.index.hybridscan.enabled`: default is false.
+- `spark.flint.index.checkpoint.mandatory`: default is true.
 
 #### Data Type Mapping
 
diff --git a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
index 11ab16e3a..168279eb3 100644
--- a/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
+++ b/integ-test/src/test/scala/org/opensearch/flint/spark/FlintSparkSuite.scala
@@ -6,6 +6,7 @@
 package org.opensearch.flint.spark
 
 import org.opensearch.flint.OpenSearchSuite
+
 import org.apache.spark.FlintSuite
 import org.apache.spark.sql.QueryTest
 import org.apache.spark.sql.flint.config.FlintSparkConf.{CHECKPOINT_MANDATORY, HOST_ENDPOINT, HOST_PORT, REFRESH_POLICY}
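
To round out the doc entry above: the flag is an ordinary Spark conf, so it can be flipped per
session exactly as FlintSparkSuite.beforeAll now does. A small sketch follows, using only the
stock Spark conf API (no Flint classes required) and assuming a local session.

import org.apache.spark.sql.SparkSession

object ToggleCheckpointMandatory {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .master("local[1]")
      .appName("toggle-checkpoint-mandatory")
      // Same effect as the suite's setFlintSparkConf(CHECKPOINT_MANDATORY, "false")
      .config("spark.flint.index.checkpoint.mandatory", "false")
      .getOrCreate()

    // FlintSparkConf.isCheckpointMandatory reads back this same key.
    println(spark.conf.get("spark.flint.index.checkpoint.mandatory")) // prints: false
    spark.stop()
  }
}
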