Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Support refresh interval and checkpoint location option #39

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
32 changes: 30 additions & 2 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -125,7 +125,7 @@ CREATE SKIPPING INDEX
ON <object>
( column <index_type> [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))
WITH ( options )

REFRESH SKIPPING INDEX ON <object>

Expand Down Expand Up @@ -164,7 +164,7 @@ DROP SKIPPING INDEX ON alb_logs
CREATE INDEX name ON <object>
( column [, ...] )
WHERE <filter_predicate>
WITH (auto_refresh = (true|false))
WITH ( options )

REFRESH INDEX name ON <object>

Expand All @@ -190,6 +190,34 @@ DESCRIBE INDEX elb_and_requestUri ON alb_logs
DROP INDEX elb_and_requestUri ON alb_logs
```

#### Create Index Options

User can provide the following options in `WITH` clause of create statement:

+ `auto_refresh`: triggers Incremental Refresh immediately after index create complete if true. Otherwise, user has to trigger Full Refresh by `REFRESH` statement manually.
+ `refresh_interval`: a string as the time interval for incremental refresh, e.g. 1 minute, 10 seconds. This is only applicable when auto refresh enabled. Please check `org.apache.spark.unsafe.types.CalendarInterval` for valid duration identifiers. By default, next micro batch will be generated as soon as the previous one complete processing.
+ `checkpoint_location`: a string as the location path for incremental refresh job checkpoint. The location has to be a path in an HDFS compatible file system and only applicable when auto refresh enabled. If unspecified, temporary checkpoint directory will be used and may result in checkpoint data lost upon restart.

```sql
WITH (
auto_refresh = [true|false],
refresh_interval = 'time interval expression',
checkpoint_location = 'checkpoint directory path'
)
```

Example:

```sql
CREATE INDEX elb_and_requestUri
ON alb_logs ( elb, requestUri )
WITH (
auto_refresh = true,
refresh_interval = '1 minute',
checkpoint_location = 's3://test/'
)
```

## Index Store

### OpenSearch
Expand Down
8 changes: 7 additions & 1 deletion flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -165,7 +165,6 @@ ON: 'ON';
PARTITION: 'PARTITION';
REFRESH: 'REFRESH';
SHOW: 'SHOW';
STRING: 'STRING';
TRUE: 'TRUE';
WITH: 'WITH';

Expand All @@ -174,6 +173,13 @@ EQ : '=' | '==';
MINUS: '-';


STRING
: '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
| '"' ( ~('"'|'\\') | ('\\' .) )* '"'
| 'R\'' (~'\'')* '\''
| 'R"'(~'"')* '"'
;

INTEGER_VALUE
: DIGIT+
;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package org.opensearch.flint.spark
import scala.collection.JavaConverters._

import org.json4s.{Formats, JArray, NoTypeHints}
import org.json4s.JsonAST.{JField, JObject}
import org.json4s.native.JsonMethods.parse
import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
Expand All @@ -30,6 +31,7 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
import org.apache.spark.sql.flint.config.FlintSparkConf
import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
import org.apache.spark.sql.streaming.OutputMode.Append
import org.apache.spark.sql.streaming.Trigger

/**
* Flint Spark integration API entrypoint.
Expand Down Expand Up @@ -128,11 +130,22 @@ class FlintSpark(val spark: SparkSession) {
.writeStream
.queryName(indexName)
.outputMode(Append())
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
}
.start()
Some(job.id.toString)

index.options
.checkpointLocation()
.foreach(location => job.option("checkpointLocation", location))
index.options
.refreshInterval()
.foreach(interval => job.trigger(Trigger.ProcessingTime(interval)))
dai-chen marked this conversation as resolved.
Show resolved Hide resolved

val jobId =
job
.foreachBatch { (batchDF: DataFrame, _: Long) =>
writeFlintIndex(batchDF)
}
.start()
.id
Some(jobId.toString)
}
}

Expand Down Expand Up @@ -175,8 +188,8 @@ class FlintSpark(val spark: SparkSession) {
*/
def deleteIndex(indexName: String): Boolean = {
if (flintClient.exists(indexName)) {
flintClient.deleteIndex(indexName)
stopRefreshingJob(indexName)
flintClient.deleteIndex(indexName)
true
} else {
false
Expand Down Expand Up @@ -223,6 +236,14 @@ class FlintSpark(val spark: SparkSession) {
val tableName = (meta \ "source").extract[String]
val indexType = (meta \ "kind").extract[String]
val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]
val indexOptions = FlintSparkIndexOptions(
(meta \ "options")
.asInstanceOf[JObject]
.obj
.map { case JField(key, value) =>
key -> value.values.toString
}
.toMap)

indexType match {
case SKIPPING_INDEX_TYPE =>
Expand All @@ -242,14 +263,15 @@ class FlintSpark(val spark: SparkSession) {
throw new IllegalStateException(s"Unknown skipping strategy: $other")
}
}
new FlintSparkSkippingIndex(tableName, strategies)
new FlintSparkSkippingIndex(tableName, strategies, indexOptions)
case COVERING_INDEX_TYPE =>
new FlintSparkCoveringIndex(
indexName,
tableName,
indexedColumns.arr.map { obj =>
((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
}.toMap)
}.toMap,
indexOptions)
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,11 @@ trait FlintSparkIndex {
*/
val kind: String

/**
* Index options
*/
val options: FlintSparkIndexOptions

/**
* @return
* Flint index name
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@

package org.opensearch.flint.spark

import org.opensearch.flint.spark.FlintSparkIndexOptions.empty

import org.apache.spark.sql.catalog.Column

/**
Expand All @@ -18,6 +20,9 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
/** Source table name */
protected var tableName: String = ""

/** Index options */
protected var indexOptions: FlintSparkIndexOptions = empty

/** All columns of the given source table */
lazy protected val allColumns: Map[String, Column] = {
require(tableName.nonEmpty, "Source table name is not provided")
Expand All @@ -29,6 +34,19 @@ abstract class FlintSparkIndexBuilder(flint: FlintSpark) {
.toMap
}

/**
* Add index options.
*
* @param options
* index options
* @return
* builder
*/
def options(options: FlintSparkIndexOptions): this.type = {
this.indexOptions = options
this
}

/**
* Create Flint index.
*/
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.spark

/**
* Flint Spark index configurable options.
*
* @param options
* index option mappings
*/
case class FlintSparkIndexOptions(options: Map[String, String]) {

/**
* Is Flint index auto refreshed or manual refreshed.
*
* @return
* auto refresh option value
*/
def autoRefresh(): Boolean = options.getOrElse("auto_refresh", "false").toBoolean

/**
* The refresh interval (only valid if auto refresh enabled).
*
* @return
* refresh interval expression
*/
def refreshInterval(): Option[String] = options.get("refresh_interval")

/**
* The checkpoint location which maybe required by Flint index's refresh.
*
* @return
* checkpoint location path
*/
def checkpointLocation(): Option[String] = options.get("checkpoint_location")
}

object FlintSparkIndexOptions {

/**
* Empty options
*/
val empty: FlintSparkIndexOptions = FlintSparkIndexOptions(Map.empty)
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import org.json4s.JsonAST.{JArray, JObject, JString}
import org.json4s.native.JsonMethods.{compact, parse, render}
import org.json4s.native.Serialization
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.flintIndexNamePrefix
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{getFlintIndexName, COVERING_INDEX_TYPE}

import org.apache.spark.sql.DataFrame
Expand All @@ -31,7 +32,8 @@ import org.apache.spark.sql.types.StructType
case class FlintSparkCoveringIndex(
indexName: String,
tableName: String,
indexedColumns: Map[String, String])
indexedColumns: Map[String, String],
override val options: FlintSparkIndexOptions = empty)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
Expand All @@ -49,7 +51,8 @@ case class FlintSparkCoveringIndex(
| "name": "$indexName",
| "kind": "$kind",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName"
| "source": "$tableName",
| "options": $getIndexOptions
| },
| "properties": $getSchema
| }
Expand All @@ -69,6 +72,10 @@ case class FlintSparkCoveringIndex(
Serialization.write(JArray(objects))
}

private def getIndexOptions: String = {
Serialization.write(options.options)
}

private def getSchema: String = {
val catalogDDL =
indexedColumns
Expand Down Expand Up @@ -154,6 +161,6 @@ object FlintSparkCoveringIndex {
}

override protected def buildIndex(): FlintSparkIndex =
new FlintSparkCoveringIndex(indexName, tableName, indexedColumns)
new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,9 @@ import org.json4s.native.JsonMethods._
import org.json4s.native.Serialization
import org.opensearch.flint.core.FlintVersion
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder}
import org.opensearch.flint.spark.{FlintSpark, FlintSparkIndex, FlintSparkIndexBuilder, FlintSparkIndexOptions}
import org.opensearch.flint.spark.FlintSparkIndex.{flintIndexNamePrefix, ID_COLUMN}
import org.opensearch.flint.spark.FlintSparkIndexOptions.empty
import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.{getSkippingIndexName, FILE_PATH_COLUMN, SKIPPING_INDEX_TYPE}
import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKindSerializer
import org.opensearch.flint.spark.skipping.minmax.MinMaxSkippingStrategy
Expand All @@ -34,7 +35,8 @@ import org.apache.spark.sql.types.StructType
*/
class FlintSparkSkippingIndex(
tableName: String,
val indexedColumns: Seq[FlintSparkSkippingStrategy])
val indexedColumns: Seq[FlintSparkSkippingStrategy],
override val options: FlintSparkIndexOptions = empty)
extends FlintSparkIndex {

require(indexedColumns.nonEmpty, "indexed columns must not be empty")
Expand All @@ -56,7 +58,8 @@ class FlintSparkSkippingIndex(
| "version": "${FlintVersion.current()}",
| "kind": "$SKIPPING_INDEX_TYPE",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName"
| "source": "$tableName",
| "options": $getIndexOptions
| },
| "properties": $getSchema
| }
Expand All @@ -82,6 +85,10 @@ class FlintSparkSkippingIndex(
Serialization.write(indexedColumns)
}

private def getIndexOptions: String = {
Serialization.write(options.options)
}

private def getSchema: String = {
val allFieldTypes =
indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
Expand Down Expand Up @@ -192,7 +199,7 @@ object FlintSparkSkippingIndex {
}

override def buildIndex(): FlintSparkIndex =
new FlintSparkSkippingIndex(tableName, indexedColumns)
new FlintSparkSkippingIndex(tableName, indexedColumns, indexOptions)

private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
require(
Expand Down
Loading
Loading