Add more index options in create statement #1

Draft · wants to merge 17 commits into base: add-covering-index-sql-support

FlintMetadata.java
@@ -14,11 +14,23 @@ public class FlintMetadata {
   // TODO: define metadata format and create strong-typed class
   private final String content;
 
+  // TODO: temporary and will be refactored along with entire metadata
+  /** Optional index settings */
+  private String indexSettings;
+
   public FlintMetadata(String content) {
     this.content = content;
   }
 
   public String getContent() {
     return content;
   }
+
+  public String getIndexSettings() {
+    return indexSettings;
+  }
+
+  public void setIndexSettings(String indexSettings) {
+    this.indexSettings = indexSettings;
+  }
 }

FlintOpenSearchClient.java
@@ -60,6 +60,9 @@ public FlintOpenSearchClient(FlintOptions options) {
   public void createIndex(String indexName, FlintMetadata metadata) {
     try (RestHighLevelClient client = createClient()) {
       CreateIndexRequest request = new CreateIndexRequest(indexName);
+      if (metadata.getIndexSettings() != null) {
+        request.settings(metadata.getIndexSettings(), XContentType.JSON);
+      }
       request.mapping(metadata.getContent(), XContentType.JSON);
 
       client.indices().create(request, RequestOptions.DEFAULT);
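For illustration, a minimal Scala sketch of how the optional settings flow through the two changes above. The mapping/settings JSON bodies and index name are made up, and `flintClient` is assumed to be a `FlintClient` built elsewhere via `FlintClientBuilder`:

```scala
import org.opensearch.flint.core.metadata.FlintMetadata

// Any valid OpenSearch mapping/settings JSON works here; these bodies are illustrative.
val metadata = new FlintMetadata("""{"properties": {"year": {"type": "integer"}}}""")
metadata.setIndexSettings("""{"index": {"number_of_shards": 3}}""")

// createIndex applies the settings only when present (see the null check above).
flintClient.createIndex("flint_alb_logs_index", metadata)
```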

FlintSparkSqlExtensions.g4
@@ -16,6 +16,7 @@ singleStatement

 statement
     : skippingIndexStatement
+    | coveringIndexStatement
     ;
 
 skippingIndexStatement
@@ -43,6 +44,26 @@ dropSkippingIndexStatement
     : DROP SKIPPING INDEX ON tableName=multipartIdentifier
     ;
 
+coveringIndexStatement
+    : createCoveringIndexStatement
+    | refreshCoveringIndexStatement
+    | dropCoveringIndexStatement
+    ;
+
+createCoveringIndexStatement
+    : CREATE INDEX indexName=identifier ON tableName=multipartIdentifier
+        LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
+        (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
+    ;
+
+refreshCoveringIndexStatement
+    : REFRESH INDEX indexName=identifier ON tableName=multipartIdentifier
+    ;
+
+dropCoveringIndexStatement
+    : DROP INDEX indexName=identifier ON tableName=multipartIdentifier
+    ;
+
 indexColTypeList
     : indexColType (COMMA indexColType)*
     ;
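Per the new grammar rules, the covering index DDL would look roughly like this. A sketch: the table and index names are made up, and the WITH option keys are assumptions based on the FlintSparkIndexOptions accessors, not anything shown in this diff:

```scala
// Sketch of the three new statements, run through spark.sql.
spark.sql("""
  CREATE INDEX elb_and_requests ON alb_logs (elb, requestUri)
  WITH (
    checkpoint_location = 's3://bucket/checkpoints/elb_and_requests',
    refresh_interval = '1 minute'
  )
""")
spark.sql("REFRESH INDEX elb_and_requests ON alb_logs")
spark.sql("DROP INDEX elb_and_requests ON alb_logs")
```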

flint-spark-integration/src/main/antlr4/SparkSqlBase.g4 (15 additions, 1 deletion)
@@ -85,6 +85,14 @@ grammar SparkSqlBase;
 }
 
 
+multipartIdentifierPropertyList
+    : multipartIdentifierProperty (COMMA multipartIdentifierProperty)*
+    ;
+
+multipartIdentifierProperty
+    : multipartIdentifier (options=propertyList)?
+    ;
+
 propertyList
     : property (COMMA property)*
     ;
@@ -155,7 +163,6 @@ INDEX: 'INDEX';
 ON: 'ON';
 PARTITION: 'PARTITION';
 REFRESH: 'REFRESH';
-STRING: 'STRING';
 TRUE: 'TRUE';
 WITH: 'WITH';
 
@@ -164,6 +171,13 @@ EQ : '=' | '==';
 MINUS: '-';
 
 
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    | 'R\'' (~'\'')* '\''
+    | 'R"'(~'"')* '"'
+    ;
+
 INTEGER_VALUE
     : DIGIT+
     ;
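Worth noting: STRING changes from a reserved keyword into a real string-literal token (single-, double-, or raw-quoted, with escapes), which is what allows quoted values such as an interval in WITH (...) to lex at all. A quick sketch, with the option key assumed as above:

```scala
// Each literal form below should now tokenize as STRING in SparkSqlBase.g4.
spark.sql("CREATE INDEX idx ON alb_logs (elb) WITH (refresh_interval = '10 seconds')")
spark.sql("""CREATE INDEX idx ON alb_logs (elb) WITH (refresh_interval = "10 seconds")""")
```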

FlintSpark.scala
@@ -7,15 +7,17 @@ package org.opensearch.flint.spark

 import scala.collection.JavaConverters._
 
-import org.json4s.{Formats, JArray, NoTypeHints}
+import org.json4s.{Formats, JArray, JObject, NoTypeHints}
+import org.json4s.JsonAST.JField
 import org.json4s.native.JsonMethods.parse
 import org.json4s.native.Serialization
 import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
 import org.opensearch.flint.core.metadata.FlintMetadata
 import org.opensearch.flint.spark.FlintSpark._
 import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
 import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
-import org.opensearch.flint.spark.skipping.{FlintSparkSkippingIndex, FlintSparkSkippingStrategy}
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
+import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex.{parseFlintIndexName, COVERING_INDEX_TYPE}
+import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingIndex.SKIPPING_INDEX_TYPE
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.{SkippingKind, SkippingKindSerializer}
 import org.opensearch.flint.spark.skipping.FlintSparkSkippingStrategy.SkippingKind.{MIN_MAX, PARTITION, VALUE_SET}
@@ -25,12 +27,11 @@
 import org.opensearch.flint.spark.skipping.valueset.ValueSetSkippingStrategy

 import org.apache.spark.sql.{DataFrame, SparkSession}
 import org.apache.spark.sql.SaveMode._
-import org.apache.spark.sql.catalog.Column
 import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
 import org.apache.spark.sql.streaming.OutputMode.Append
 import org.apache.spark.sql.streaming.StreamingQuery
+import org.apache.spark.sql.streaming.Trigger
 
 /**
  * Flint Spark integration API entrypoint.
@@ -42,8 +43,7 @@ class FlintSpark(val spark: SparkSession) {
   FlintSparkConf(
     Map(
       DOC_ID_COLUMN_NAME.optionKey -> ID_COLUMN,
-      IGNORE_DOC_ID_COLUMN.optionKey -> "true"
-    ).asJava)
+      IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)
 
   /** Flint client for low-level index operation */
   private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
@@ -57,8 +57,18 @@ class FlintSpark(val spark: SparkSession) {
    * @return
    *   index builder
    */
-  def skippingIndex(): IndexBuilder = {
-    new IndexBuilder(this)
+  def skippingIndex(): FlintSparkSkippingIndex.Builder = {
+    new FlintSparkSkippingIndex.Builder(this)
   }
 
+  /**
+   * Create index builder for creating index with fluent API.
+   *
+   * @return
+   *   index builder
+   */
+  def coveringIndex(): FlintSparkCoveringIndex.Builder = {
+    new FlintSparkCoveringIndex.Builder(this)
+  }
 
   /**
@@ -120,11 +130,21 @@ class FlintSpark(val spark: SparkSession) {
         .writeStream
         .queryName(indexName)
         .outputMode(Append())
-        .foreachBatch { (batchDF: DataFrame, _: Long) =>
-          writeFlintIndex(batchDF)
-        }
-        .start()
-      Some(job.id.toString)
+
+      index.options
+        .checkpointLocation()
+        .foreach(location => job.option("checkpointLocation", location))
+      index.options
+        .refreshInterval()
+        .foreach(interval => job.trigger(Trigger.ProcessingTime(interval)))
+
+      val jobId =
+        job
+          .foreachBatch { (batchDF: DataFrame, _: Long) =>
+            writeFlintIndex(batchDF)
+          }
+          .start()
+      Some(jobId.toString)
     }
   }
 
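The two options wired in above are standard Structured Streaming knobs: checkpointLocation() feeds the checkpointLocation writer option, and refreshInterval() becomes a processing-time trigger. A self-contained sketch of the equivalent plain-Spark wiring (the names here are illustrative, not the PR's code):

```scala
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.{StreamingQuery, Trigger}

// Equivalent wiring: both options are optional, hence the foreach over Option.
def startRefresh(
    df: DataFrame,
    checkpointLocation: Option[String],
    refreshInterval: Option[String]): StreamingQuery = {
  val writer = df.writeStream.outputMode("append")
  checkpointLocation.foreach(loc => writer.option("checkpointLocation", loc))
  refreshInterval.foreach(interval => writer.trigger(Trigger.ProcessingTime(interval)))

  // Typed handler; stand-in for the real Flint index sink.
  val handler: (DataFrame, Long) => Unit = (batch, _) => batch.count()
  writer.foreachBatch(handler).start()
}
```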
@@ -199,9 +219,18 @@ class FlintSpark(val spark: SparkSession) {
    */
   private def deserialize(metadata: FlintMetadata): FlintSparkIndex = {
     val meta = parse(metadata.getContent) \ "_meta"
+    val indexName = (meta \ "name").extract[String]
     val tableName = (meta \ "source").extract[String]
     val indexType = (meta \ "kind").extract[String]
     val indexedColumns = (meta \ "indexedColumns").asInstanceOf[JArray]
+    val indexOptions = FlintSparkIndexOptions(
+      (meta \ "options")
+        .asInstanceOf[JObject]
+        .obj
+        .map { case JField(key, value) =>
+          key -> value.values.toString
+        }
+        .toMap)
 
     indexType match {
       case SKIPPING_INDEX_TYPE =>
@@ -221,7 +250,14 @@ class FlintSpark(val spark: SparkSession) {
             throw new IllegalStateException(s"Unknown skipping strategy: $other")
           }
         }
-        new FlintSparkSkippingIndex(tableName, strategies)
+        new FlintSparkSkippingIndex(tableName, strategies, indexOptions)
+      case COVERING_INDEX_TYPE =>
+        new FlintSparkCoveringIndex(
+          parseFlintIndexName(indexName, tableName),
+          tableName,
+          indexedColumns.arr.map { obj =>
+            ((obj \ "columnName").extract[String], (obj \ "columnType").extract[String])
+          }.toMap)
     }
   }
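To make the new deserialization concrete, here is a sketch of the _meta document shape it reads. Every value is illustrative, and the exact kind string and index-name convention come from constants (COVERING_INDEX_TYPE, parseFlintIndexName) whose definitions are not in this diff:

```scala
// Illustrative Flint metadata; deserialize() reads name/source/kind/indexedColumns/options.
val exampleContent =
  """{
    |  "_meta": {
    |    "name": "flint_alb_logs_elb_and_requests_index",
    |    "source": "alb_logs",
    |    "kind": "covering",
    |    "indexedColumns": [
    |      { "columnName": "elb", "columnType": "string" }
    |    ],
    |    "options": { "refresh_interval": "1 minute" }
    |  }
    |}""".stripMargin
```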
@@ -236,102 +272,4 @@ object FlintSpark {
     type RefreshMode = Value
     val FULL, INCREMENTAL = Value
   }
-
-  /**
-   * Helper class for index class construct. For now only skipping index supported.
-   */
-  class IndexBuilder(flint: FlintSpark) {
-    var tableName: String = ""
-    var indexedColumns: Seq[FlintSparkSkippingStrategy] = Seq()
-
-    lazy val allColumns: Map[String, Column] = {
-      flint.spark.catalog
-        .listColumns(tableName)
-        .collect()
-        .map(col => (col.name, col))
-        .toMap
-    }
-
-    /**
-     * Configure which source table the index is based on.
-     *
-     * @param tableName
-     *   full table name
-     * @return
-     *   index builder
-     */
-    def onTable(tableName: String): IndexBuilder = {
-      this.tableName = tableName
-      this
-    }
-
-    /**
-     * Add partition skipping indexed columns.
-     *
-     * @param colNames
-     *   indexed column names
-     * @return
-     *   index builder
-     */
-    def addPartitions(colNames: String*): IndexBuilder = {
-      require(tableName.nonEmpty, "table name cannot be empty")
-
-      colNames
-        .map(findColumn)
-        .map(col => PartitionSkippingStrategy(columnName = col.name, columnType = col.dataType))
-        .foreach(addIndexedColumn)
-      this
-    }
-
-    /**
-     * Add value set skipping indexed column.
-     *
-     * @param colName
-     *   indexed column name
-     * @return
-     *   index builder
-     */
-    def addValueSet(colName: String): IndexBuilder = {
-      require(tableName.nonEmpty, "table name cannot be empty")
-
-      val col = findColumn(colName)
-      addIndexedColumn(ValueSetSkippingStrategy(columnName = col.name, columnType = col.dataType))
-      this
-    }
-
-    /**
-     * Add min max skipping indexed column.
-     *
-     * @param colName
-     *   indexed column name
-     * @return
-     *   index builder
-     */
-    def addMinMax(colName: String): IndexBuilder = {
-      val col = findColumn(colName)
-      indexedColumns =
-        indexedColumns :+ MinMaxSkippingStrategy(columnName = col.name, columnType = col.dataType)
-      this
-    }
-
-    /**
-     * Create index.
-     */
-    def create(): Unit = {
-      flint.createIndex(new FlintSparkSkippingIndex(tableName, indexedColumns))
-    }
-
-    private def findColumn(colName: String): Column =
-      allColumns.getOrElse(
-        colName,
-        throw new IllegalArgumentException(s"Column $colName does not exist"))
-
-    private def addIndexedColumn(indexedCol: FlintSparkSkippingStrategy): Unit = {
-      require(
-        indexedColumns.forall(_.columnName != indexedCol.columnName),
-        s"${indexedCol.columnName} is already indexed")
-
-      indexedColumns = indexedColumns :+ indexedCol
-    }
-  }
 }
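With the inline IndexBuilder removed, index construction is expected to look like the following. The skipping-index calls mirror the deleted builder's methods, while the covering-index method names (name, onTable, addIndexColumns) are assumptions about FlintSparkCoveringIndex.Builder, which this diff does not show:

```scala
val flint = new FlintSpark(spark)

// Skipping index: same fluent surface the removed IndexBuilder provided.
flint.skippingIndex()
  .onTable("alb_logs")
  .addPartitions("year", "month")
  .addValueSet("elb_status_code")
  .create()

// Covering index: builder method names assumed for illustration.
flint.coveringIndex()
  .name("elb_and_requests")
  .onTable("alb_logs")
  .addIndexColumns("elb", "requestUri")
  .create()
```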

FlintSparkIndex.scala
@@ -19,6 +19,11 @@ trait FlintSparkIndex {
    */
   val kind: String
 
+  /**
+   * Index options
+   */
+  val options: FlintSparkIndexOptions
+
   /**
    * @return
    *   Flint index name
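Finally, a small sketch of the FlintSparkIndexOptions contract implied by the diffs above: it wraps a String-to-String map and exposes optional accessors. The option key is an assumption:

```scala
// Construction mirrors the deserialize() call in FlintSpark.scala.
val options = FlintSparkIndexOptions(Map("refresh_interval" -> "1 minute"))
options.refreshInterval()    // Some("1 minute"), assuming this key
options.checkpointLocation() // None
```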