From fcaa6e10c87057b089d7787baf4415abfa872083 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Thu, 21 Sep 2023 09:10:59 -0700 Subject: [PATCH] Pass index option to Flint metadata and streaming job Signed-off-by: Chen Dai --- .../src/main/antlr4/SparkSqlBase.g4 | 8 +++++++- .../org/opensearch/flint/spark/FlintSpark.scala | 12 +++++++++++- .../spark/covering/FlintSparkCoveringIndex.scala | 7 ++++++- .../spark/skipping/FlintSparkSkippingIndex.scala | 7 ++++++- 4 files changed, 30 insertions(+), 4 deletions(-) diff --git a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 index 17627c190..edba43200 100644 --- a/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 +++ b/flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 @@ -163,7 +163,6 @@ INDEX: 'INDEX'; ON: 'ON'; PARTITION: 'PARTITION'; REFRESH: 'REFRESH'; -STRING: 'STRING'; TRUE: 'TRUE'; WITH: 'WITH'; @@ -172,6 +171,13 @@ EQ : '=' | '=='; MINUS: '-'; +STRING + : '\'' ( ~('\''|'\\') | ('\\' .) )* '\'' + | '"' ( ~('"'|'\\') | ('\\' .) )* '"' + | 'R\'' (~'\'')* '\'' + | 'R"'(~'"')* '"' + ; + INTEGER_VALUE : DIGIT+ ; diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 3aceeadb5..f18357615 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -31,6 +31,7 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE import org.apache.spark.sql.flint.config.FlintSparkConf import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN} import org.apache.spark.sql.streaming.OutputMode.Append +import org.apache.spark.sql.streaming.Trigger /** * Flint Spark integration API entrypoint. @@ -129,11 +130,20 @@ class FlintSpark(val spark: SparkSession) { .writeStream .queryName(indexName) .outputMode(Append()) + + index.options + .checkpointLocation() + .foreach(location => job.option("checkpointLocation", location)) + index.options + .refreshInterval() + .foreach(interval => job.trigger(Trigger.ProcessingTime(interval))) + + val jobId = job .foreachBatch { (batchDF: DataFrame, _: Long) => writeFlintIndex(batchDF) } .start() - Some(job.id.toString) + Some(jobId.toString) } } diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala index 96b7f39bf..4c8ca7ecf 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala @@ -51,7 +51,8 @@ class FlintSparkCoveringIndex( | "name": "$indexName", | "kind": "$kind", | "indexedColumns": $getMetaInfo, - | "source": "$tableName" + | "source": "$tableName", + | "options": $getIndexOptions | }, | "properties": $getSchema | } @@ -71,6 +72,10 @@ class FlintSparkCoveringIndex( Serialization.write(JArray(objects)) } + private def getIndexOptions: String = { + Serialization.write(options.options) + } + private def getSchema: String = { val catalogDDL = indexedColumns diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala index 2eebffebf..da69cc1fa 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala @@ -58,7 +58,8 @@ class FlintSparkSkippingIndex( | "version": "${FlintVersion.current()}", | "kind": "$SKIPPING_INDEX_TYPE", | "indexedColumns": $getMetaInfo, - | "source": "$tableName" + | "source": "$tableName", + | "options": $getIndexOptions | }, | "properties": $getSchema | } @@ -84,6 +85,10 @@ class FlintSparkSkippingIndex( Serialization.write(indexedColumns) } + private def getIndexOptions: String = { + Serialization.write(options.options) + } + private def getSchema: String = { val allFieldTypes = indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")