Skip to content

Commit

Permalink
add USING statement for covering / MV indices for using existing index / index template in opensearch
Browse files Browse the repository at this point in the history

Signed-off-by: YANGDB <[email protected]>
  • Loading branch information
YANG-DB committed Oct 18, 2023
1 parent 3fcf926 commit 79936ae
Show file tree
Hide file tree
Showing 9 changed files with 85 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import org.opensearch.flint.core.metadata.FlintJsonHelper._
case class FlintMetadata(
/** Flint spec version */
version: FlintVersion,
/** Flint index target name */
targetName: Option[String],
/** Flint index name */
name: String,
/** Flint index kind */
Expand Down Expand Up @@ -53,11 +55,11 @@ case class FlintMetadata(
objectField(builder, "_meta") {
builder
.field("version", version.version)
.field("targetName", targetName.getOrElse(name) )
.field("name", name)
.field("kind", kind)
.field("source", source)
.field("indexedColumns", indexedColumns)

optionalObjectField(builder, "options", options)
optionalObjectField(builder, "properties", properties)
}
Expand Down Expand Up @@ -109,6 +111,7 @@ object FlintMetadata {
innerFieldName match {
case "version" => builder.version(FlintVersion.apply(parser.text()))
case "name" => builder.name(parser.text())
case "targetName" => builder.targetName(parser.text())
case "kind" => builder.kind(parser.text())
case "source" => builder.source(parser.text())
case "indexedColumns" =>
Expand Down Expand Up @@ -142,6 +145,7 @@ object FlintMetadata {
class Builder {
private var version: FlintVersion = FlintVersion.current()
private var name: String = ""
private var targetName: Option[String] = None
private var kind: String = ""
private var source: String = ""
private var options: util.Map[String, AnyRef] = new util.HashMap[String, AnyRef]()
Expand All @@ -160,6 +164,11 @@ object FlintMetadata {
this
}

/**
 * Set the optional target index name (the existing OpenSearch index or index
 * template the Flint index should be created with).
 *
 * Null AND empty values are normalized to None: Option(null) is already None,
 * and filtering out "" prevents a blank target from shadowing the index name
 * when serialization later does targetName.getOrElse(name).
 *
 * @param name
 *   target index name; may be null or empty (treated as absent)
 * @return
 *   builder
 */
def targetName(name: String): this.type = {
  this.targetName = Option(name).filter(_.nonEmpty)
  this
}

def kind(kind: String): this.type = {
this.kind = kind
this
Expand Down Expand Up @@ -219,6 +228,7 @@ object FlintMetadata {
def build(): FlintMetadata = {
FlintMetadata(
if (version == null) current() else version,
targetName,
name,
kind,
source,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,7 +55,7 @@ coveringIndexStatement

// CREATE INDEX [IF NOT EXISTS] idx ON tbl [USING targetIdx] (col, ...) [WITH (props)]
// NOTE: the optional USING clause reuses the `indexName` rule, so ANTLR exposes
// ctx.indexName() as a list: get(0) = covering index name, get(1) = target index
// (present only when USING() != null). Visitors must index accordingly.
createCoveringIndexStatement
    : CREATE INDEX (IF NOT EXISTS)? indexName
        ON tableName (USING indexName)?
        LEFT_PAREN indexColumns=multipartIdentifierPropertyList RIGHT_PAREN
        (WITH LEFT_PAREN propertyList RIGHT_PAREN)?
    ;
Expand Down
1 change: 1 addition & 0 deletions flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
Original file line number Diff line number Diff line change
Expand Up @@ -139,6 +139,7 @@ nonReserved

// Flint lexical tokens

USING: 'USING';
MIN_MAX: 'MIN_MAX';
SKIPPING: 'SKIPPING';
VALUE_SET: 'VALUE_SET';
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,12 @@ trait FlintSparkIndex {
*/
def name(): String

/**
 * Target index name: the OpenSearch index (or index template) this Flint index
 * is created with — supplied via the SQL USING clause — which may differ from
 * [[name]]. Implementations fall back to [[name]] when no target was given.
 *
 * @return
 *   Flint target index name
 */
def targetName(): String

/**
* @return
* Flint index metadata
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@ object FlintSparkIndexFactory {
FlintSparkSkippingIndex(metadata.source, strategies, indexOptions)
case COVERING_INDEX_TYPE =>
FlintSparkCoveringIndex(
metadata.targetName,
metadata.name,
metadata.source,
metadata.indexedColumns.map { colInfo =>
Expand All @@ -68,6 +69,7 @@ object FlintSparkIndexFactory {
indexOptions)
case MV_INDEX_TYPE =>
FlintSparkMaterializedView(
metadata.targetName,
metadata.name,
metadata.source,
metadata.indexedColumns.map { colInfo =>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import org.apache.spark.sql._
/**
* Flint covering index in Spark.
*
* @param targetIndexName
* optional index target name
* @param indexName
* index name
* @param tableName
Expand All @@ -26,6 +28,7 @@ import org.apache.spark.sql._
* indexed column list
*/
case class FlintSparkCoveringIndex(
targetIndexName: Option[String],
indexName: String,
tableName: String,
indexedColumns: Map[String, String],
Expand All @@ -38,6 +41,14 @@ case class FlintSparkCoveringIndex(

override def name(): String = getFlintIndexName(indexName, tableName)

/**
 * Target index name: an index that already exists (or has an index template)
 * in OpenSearch, supplied via the SQL USING clause.
 *
 * Blank targets are ignored: the SQL builder wraps its "" default in Option,
 * which yields Some("") rather than None, so without the filter an unset
 * target would shadow the generated Flint index name.
 *
 * @return
 *   the explicit target index name, or [[name]] when none was provided
 */
override def targetName(): String = {
  targetIndexName.filter(_.trim.nonEmpty).getOrElse(name())
}

override def metadata(): FlintMetadata = {
val indexColumnMaps = {
indexedColumns.map { case (colName, colType) =>
Expand Down Expand Up @@ -93,6 +104,7 @@ object FlintSparkCoveringIndex {

/** Builder class for covering index build */
class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
private var targetIndexName: String = ""
private var indexName: String = ""
private var indexedColumns: Map[String, String] = Map()

Expand All @@ -109,6 +121,19 @@ object FlintSparkCoveringIndex {
this
}

/**
 * Set the covering index target name: an existing OpenSearch index (or index
 * template) the covering index should be created with (SQL USING clause).
 * The default empty string means "no explicit target".
 *
 * @param indexName
 *   target index name
 * @return
 *   index builder
 */
def targetName(indexName: String): Builder = {
this.targetIndexName = indexName
this
}

/**
* Configure which source table the index is based on.
*
Expand Down Expand Up @@ -138,6 +163,6 @@ object FlintSparkCoveringIndex {
}

/**
 * Build the covering index from the accumulated builder state.
 *
 * The builder's default target is "", and Option("") is Some(""), not None —
 * so normalize blanks to None here to signal a genuinely absent target.
 */
override protected def buildIndex(): FlintSparkIndex =
  new FlintSparkCoveringIndex(
    Option(targetIndexName).filter(_.nonEmpty),
    indexName,
    tableName,
    indexedColumns,
    indexOptions)
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ import org.apache.spark.sql.flint.{logicalPlanToDataFrame, qualifyTableName}
* index options
*/
case class FlintSparkMaterializedView(
targetIndexName: Option[String],
mvName: String,
query: String,
outputSchema: Map[String, String],
Expand All @@ -51,6 +52,14 @@ case class FlintSparkMaterializedView(

override def name(): String = getFlintIndexName(mvName)

/**
 * Target index name: an index that already exists (or has an index template)
 * in OpenSearch, supplied via the SQL USING clause.
 *
 * Blank targets are ignored: the SQL builder wraps its "" default in Option,
 * which yields Some("") rather than None, so without the filter an unset
 * target would shadow the generated Flint index name.
 *
 * @return
 *   the explicit target index name, or [[name]] when none was provided
 */
override def targetName(): String = {
  targetIndexName.filter(_.trim.nonEmpty).getOrElse(name())
}

override def metadata(): FlintMetadata = {
val indexColumnMaps =
outputSchema.map { case (colName, colType) =>
Expand Down Expand Up @@ -150,9 +159,23 @@ object FlintSparkMaterializedView {

/** Builder class for MV build */
class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
private var targetIndexName: String = ""
private var mvName: String = ""
private var query: String = ""

/**
 * Set the materialized view's target index name: an existing OpenSearch index
 * (or index template) the MV index should be created with (SQL USING clause).
 * The default empty string means "no explicit target".
 *
 * @param indexName
 *   target index name
 * @return
 *   index builder
 */
def targetName(indexName: String): Builder = {
this.targetIndexName = indexName
this
}

/**
* Set MV name.
*
Expand Down Expand Up @@ -188,7 +211,7 @@ object FlintSparkMaterializedView {
field.name -> field.dataType.typeName
}
.toMap
FlintSparkMaterializedView(mvName, query, outputSchema, indexOptions)
FlintSparkMaterializedView(Option.apply(targetIndexName), mvName, query, outputSchema, indexOptions)
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,14 @@ case class FlintSparkSkippingIndex(
/** Skipping index type */
override val kind: String = SKIPPING_INDEX_TYPE

/**
 * Skipping indexes do not allow using an existing index / index template, so
 * the target is always the generated Flint index name itself.
 *
 * @return
 *   Flint target index name (always equal to [[name]])
 */
// `override` added for consistency with the covering / MV implementations of
// the same FlintSparkIndex trait method.
override def targetName(): String = {
  name()
}

override def name(): String = {
getSkippingIndexName(tableName)
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
override def visitCreateCoveringIndexStatement(
ctx: CreateCoveringIndexStatementContext): Command = {
FlintSparkSqlCommand() { flint =>
val indexName = ctx.indexName.getText
val indexName = ctx.indexName.get(0).getText
val tableName = getFullTableName(flint, ctx.tableName)
val indexBuilder =
flint
Expand All @@ -41,14 +41,18 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
}

val ignoreIfExists = ctx.EXISTS() != null
if (ctx.USING() != null) {
indexBuilder.targetName(ctx.indexName().get(1).getText)
}

val indexOptions = visitPropertyList(ctx.propertyList())
indexBuilder
.options(indexOptions)
.create(ignoreIfExists)

// Trigger auto refresh if enabled
if (indexOptions.autoRefresh()) {
val flintIndexName = getFlintIndexName(flint, ctx.indexName, ctx.tableName)
val flintIndexName = getFlintIndexName(flint, ctx.indexName.get(0), ctx.tableName)
flint.refreshIndex(flintIndexName, RefreshMode.INCREMENTAL)
}
Seq.empty
Expand Down

0 comments on commit 79936ae

Please sign in to comment.