Support refresh interval and checkpoint location option (#39)
* Add Flint index options and pass it from parser to index

Signed-off-by: Chen Dai <[email protected]>

* Update more javadoc

Signed-off-by: Chen Dai <[email protected]>

* Pass index option to Flint metadata and streaming job

Signed-off-by: Chen Dai <[email protected]>

* Fix IT and add new IT

Signed-off-by: Chen Dai <[email protected]>

* Add more IT

Signed-off-by: Chen Dai <[email protected]>

* Update job id var name

Signed-off-by: Chen Dai <[email protected]>

* Update doc with index options

Signed-off-by: Chen Dai <[email protected]>

* Update doc with default behavior

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Sep 26, 2023
1 parent eadb614 commit 896fa9f
Showing 15 changed files with 294 additions and 66 deletions.
32 changes: 30 additions & 2 deletions docs/index.md
@@ -125,7 +125,7 @@ CREATE SKIPPING INDEX
ON <object>
( column <index_type> [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))
WITH ( options )

REFRESH SKIPPING INDEX ON <object>

@@ -164,7 +164,7 @@ DROP SKIPPING INDEX ON alb_logs
CREATE INDEX name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))
WITH ( options )

REFRESH INDEX name ON <object>

@@ -190,6 +190,34 @@ DESCRIBE INDEX elb_and_requestUri ON alb_logs
DROP INDEX elb_and_requestUri ON alb_logs
```

#### Create Index Options

Users can provide the following options in the `WITH` clause of the create statement:

+ `auto_refresh`: if true, an incremental refresh is triggered immediately after index creation completes. Otherwise, the user has to trigger a full refresh manually with the `REFRESH` statement.
+ `refresh_interval`: a string specifying the time interval for incremental refresh, e.g. 1 minute, 10 seconds. Only applicable when auto refresh is enabled. See `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, the next micro-batch is generated as soon as the previous one completes processing.
+ `checkpoint_location`: a string specifying the path where the incremental refresh job stores its checkpoint. The location has to be a path in an HDFS-compatible file system and is only applicable when auto refresh is enabled. If unspecified, a temporary checkpoint directory is used, which may result in checkpoint data being lost upon restart.

```sql
WITH (
auto_refresh = [true|false],
refresh_interval = 'time interval expression',
checkpoint_location = 'checkpoint directory path'
)
```

Example:

```sql
CREATE INDEX elb_and_requestUri
ON alb_logs ( elb, requestUri )
WITH (
auto_refresh = true,
refresh_interval = '1 minute',
checkpoint_location = 's3://test/'
)
```
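
When `auto_refresh` is omitted or set to false (the default), the index is only populated on demand. A minimal sketch of that manual path, reusing the index and table names from the example above:

```sql
CREATE INDEX elb_and_requestUri
ON alb_logs ( elb, requestUri )
WITH ( auto_refresh = false )

REFRESH INDEX elb_and_requestUri ON alb_logs
```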

## Index Store

### OpenSearch
8 changes: 7 additions & 1 deletion flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -165,7 +165,6 @@ ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';

@@ -174,6 +173,13 @@ EQ : '=' | '==';
MINUS: '-';


STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
| '"' ( ~('"'|'\\') | ('\\' .) )* '"'
| 'R\'' (~'\'')* '\''
| 'R"'(~'"')* '"'
;

INTEGER_VALUE
: DIGIT+
;
@@ -8,6 +8,7 @@ package org.opensearch.flint.spark
import scala.collection.JavaConverters._

import org.json4s.{Formats, JArray, NoTypeHints}
import org.json4s.JsonAST.{JField, JObject}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
@@ -30,6 +31,7 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
import org.apache.spark.sql.streaming.OutputMode.Append
import org.apache.spark.sql.streaming.Trigger

/**
* Flint Spark integration API entrypoint.
@@ -128,11 +130,22 @@ class FlintSpark(val spark: SparkSession) {
.writeStream
.queryName(indexName)
.outputMode(Append())
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
}
.start()
Some(job.id.toString)

index.options
.checkpointLocation()
.foreach(location => job.option("checkpointLocation", location))
index.options
.refreshInterval()
.foreach(interval => job.trigger(Trigger.ProcessingTime(interval)))

val jobId =
job
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
}
.start()
.id
Some(jobId.toString)
}
}

@@ -175,8 +188,8 @@
*/
def deleteIndex(indexName: String): Boolean = {
if (flintClient.exists(indexName)) {
flintClient.deleteIndex(indexName)
stopRefreshingJob(indexName)
flintClient.deleteIndex(indexName)
true
} else {
false
@@ -223,6 +236,14 @@
val tableName = (meta \ "source").extract[String]
val indexType = (meta \ "kind").extract[String]
val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]
val indexOptions = FlintSparkIndexOptions(
(meta \ "options")
.asInstanceOf[JObject]
.obj
.map { case JField(key, value) =>
key -> value.values.toString
}
.toMap)

indexType match {
case SKIPPING_INDEX_TYPE =>
@@ -242,14 +263,15 @@
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
}
new FlintSparkSkippingIndex(tableName, strategies)
new FlintSparkSkippingIndex(tableName, strategies, indexOptions)
case COVERING_INDEX_TYPE =>
new FlintSparkCoveringIndex(
indexName,
tableName,
indexedColumns.arr.map { obj =>
((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
}.toMap)
}.toMap,
indexOptions)
}
}
}
@@ -19,6 +19,11 @@ trait FlintSparkIndex {
*/
val kind: String

/**
* Index options
*/
val options: FlintSparkIndexOptions

/**
* @return
* Flint index name
@@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.empty

import org.apache.spark.sql.catalog.Column

/**
@@ -18,6 +20,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
/** Source table name */
protected var tableName: String = ""

/** Index options */
protected var indexOptions: FlintSparkIndexOptions = empty

/** All columns of the given source table */
lazy protected val allColumns: Map[String, Column] = {
require(tableName.nonEmpty, "Source table name is not provided")
@@ -29,6 +34,19 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
.toMap
}

/**
* Add index options.
*
* @param options
* index options
* @return
* builder
*/
def options(options: FlintSparkIndexOptions): this.type = {
this.indexOptions = options
this
}

/**
* Create Flint index.
*/
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

/**
* Flint Spark index configurable options.
*
* @param options
* index option mappings
*/
case class FlintSparkIndexOptions(options: Map[String, String]) {

/**
* Whether the Flint index is auto refreshed or manually refreshed.
*
* @return
* auto refresh option value
*/
def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean

/**
* The refresh interval (only valid if auto refresh is enabled).
*
* @return
* refresh interval expression
*/
def refreshInterval(): Option[String] = options.get("refresh_interval")

/**
* The checkpoint location, which may be required by the Flint index's refresh job.
*
* @return
* checkpoint location path
*/
def checkpointLocation(): Option[String] = options.get("checkpoint_location")
}

object FlintSparkIndexOptions {

/**
* Empty options
*/
val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty)
}
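
For reference, a minimal usage sketch (illustrative, not part of the commit) of how these options could be constructed and read on the Scala side; the builder call mentioned in the trailing comment is assumed from `FlintSparkIndexBuilder.options(...)` above:

```scala
import org.opensearch.flint.spark.FlintSparkIndexOptions

object FlintSparkIndexOptionsExample extends App {

  // Keys mirror the WITH clause of the create statement; values stay as plain strings.
  val options = FlintSparkIndexOptions(
    Map(
      "auto_refresh" -> "true",
      "refresh_interval" -> "10 seconds",
      "checkpoint_location" -> "s3://test/"))

  assert(options.autoRefresh())                                // parsed to Boolean; defaults to false if absent
  assert(options.refreshInterval().contains("10 seconds"))     // Option[String]
  assert(options.checkpointLocation().contains("s3://test/"))  // Option[String]

  // A concrete skipping or covering index builder would then carry these options
  // into the index, e.g. builder.options(options) before creating the index
  // (hypothetical usage based on FlintSparkIndexBuilder.options above).
}
```
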
@@ -10,8 +10,9 @@ import org.json4s.JsonAST.{JArray, JObject, JString}
import org.json4s.native.JsonMethods.{compact, parse, render}
import org.json4s.native.Serialization
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

import org.apache.spark.sql.DataFrame
@@ -31,7 +32,8 @@ import org.apache.spark.sql.types.StructType
case class FlintSparkCoveringIndex(
indexName: String,
tableName: String,
indexedColumns: Map[String, String])
indexedColumns: Map[String, String],
override val options: FlintSparkIndexOptions = empty)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -49,7 +51,8 @@ case class FlintSparkCoveringIndex(
| "name": "$indexName",
| "kind": "$kind",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName"
| "source": "$tableName",
| "options": $getIndexOptions
| },
| "properties": $getSchema
| }
@@ -69,6 +72,10 @@ case class FlintSparkCoveringIndex(
Serialization.write(JArray(objects))
}

private def getIndexOptions: String = {
Serialization.write(options.options)
}

private def getSchema: String = {
val catalogDDL =
indexedColumns
@@ -154,6 +161,6 @@ object FlintSparkCoveringIndex {
}

override protected def buildIndex(): FlintSparkIndex =
new FlintSparkCoveringIndex(indexName, tableName, indexedColumns)
new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions)
}
}
@@ -10,8 +10,9 @@ import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
@@ -34,7 +35,8 @@ import org.apache.spark.sql.types.StructType
*/
class FlintSparkSkippingIndex(
tableName: String,
val indexedColumns: Seq[FlintSparkSkippingStrategy])
val indexedColumns: Seq[FlintSparkSkippingStrategy],
override val options: FlintSparkIndexOptions = empty)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
@@ -56,7 +58,8 @@ class FlintSparkSkippingIndex(
| "version": "${FlintVersion.current()}",
| "kind": "$SKIPPING_INDEX_TYPE",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName"
| "source": "$tableName",
| "options": $getIndexOptions
| },
| "properties": $getSchema
| }
@@ -82,6 +85,10 @@ class FlintSparkSkippingIndex(
Serialization.write(indexedColumns)
}

private def getIndexOptions: String = {
Serialization.write(options.options)
}

private def getSchema: String = {
val allFieldTypes =
indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
@@ -192,7 +199,7 @@ object FlintSparkSkippingIndex {
}

override def buildIndex(): FlintSparkIndex =
new FlintSparkSkippingIndex(tableName, indexedColumns)
new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions)

private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
require(