Add filtering condition support for covering index
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Oct 23, 2023
1 parent 5e92e29 commit 4695f4b
Showing 7 changed files with 72 additions and 13 deletions.
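
In short, this change lets a covering index carry an optional filtering condition, exposed both through the SQL extension (a WHERE clause on CREATE INDEX) and through the Scala builder (a new filterBy method). A minimal sketch of the two entry points, based on the tests in this commit; the index name is illustrative, and the spark / flint handles and the coveringIndex() entry point are assumed to be in scope as in the test suites:

// SQL path: the grammar now accepts a WHERE clause on CREATE INDEX.
spark.sql("""
     | CREATE INDEX idx_name_age ON spark_catalog.default.ci_test
     | (name, age)
     | WHERE address = 'Portland'
     | WITH (auto_refresh = true)
     |""".stripMargin)

// Builder path: filterBy() stores the condition on FlintSparkCoveringIndex.
flint
  .coveringIndex()
  .name("idx_name_age")
  .onTable("spark_catalog.default.ci_test")
  .addIndexColumns("name", "age")
  .filterBy("address = 'Portland'")
  .create()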
@@ -113,7 +113,11 @@ materializedViewQuery
     ;
 
 whereClause
-    : WHERE .+?
+    : WHERE filterCondition
     ;
 
+filterCondition
+    : .+?
+    ;
+
 indexColTypeList
@@ -65,6 +65,7 @@ object FlintSparkIndexFactory {
         metadata.indexedColumns.map { colInfo =>
           getString(colInfo, "columnName") -> getString(colInfo, "columnType")
         }.toMap,
+        getOptString(metadata.properties, "filterCondition"),
         indexOptions)
       case MV_INDEX_TYPE =>
         FlintSparkMaterializedView(
@@ -80,4 +81,13 @@ object FlintSparkIndexFactory {
   private def getString(map: java.util.Map[String, AnyRef], key: String): String = {
     map.get(key).asInstanceOf[String]
   }
+
+  private def getOptString(map: java.util.Map[String, AnyRef], key: String): Option[String] = {
+    val value = map.get(key)
+    if (value == null) {
+      None
+    } else {
+      Some(value.asInstanceOf[String])
+    }
+  }
 }
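
The new getOptString helper keeps deserialization backward compatible: index metadata written before this change has no filterCondition property, so the lookup yields None instead of failing. A small self-contained sketch of the same behavior (the helper is restated here only because the real one is private to the factory):

import java.util.{HashMap => JHashMap, Map => JMap}

// Same null-safe lookup as the factory's private helper, restated for illustration.
def getOptString(map: JMap[String, AnyRef], key: String): Option[String] =
  Option(map.get(key)).map(_.asInstanceOf[String])

val legacyProps = new JHashMap[String, AnyRef]()   // metadata written before this change
val newProps = new JHashMap[String, AnyRef]()
newProps.put("filterCondition", "age > 30")

assert(getOptString(legacyProps, "filterCondition").isEmpty)            // old index still loads
assert(getOptString(newProps, "filterCondition").contains("age > 30"))  // new property picked up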
@@ -29,6 +29,7 @@ case class FlintSparkCoveringIndex(
     indexName: String,
     tableName: String,
     indexedColumns: Map[String, String],
+    filterCondition: Option[String] = None,
     override val options: FlintSparkIndexOptions = empty)
     extends FlintSparkIndex {
 
@@ -46,17 +47,25 @@ case class FlintSparkCoveringIndex(
     }
     val schemaJson = generateSchemaJSON(indexedColumns)
 
-    metadataBuilder(this)
+    val builder = metadataBuilder(this)
       .name(indexName)
       .source(tableName)
       .indexedColumns(indexColumnMaps)
       .schema(schemaJson)
-      .build()
+
+    // Add optional index properties
+    filterCondition.map(builder.addProperty("filterCondition", _))
+    builder.build()
   }
 
   override def build(spark: SparkSession, df: Option[DataFrame]): DataFrame = {
     val colNames = indexedColumns.keys.toSeq
-    df.getOrElse(spark.read.table(tableName))
+    val job = df.getOrElse(spark.read.table(tableName))
+
+    // Add optional filtering condition
+    filterCondition
+      .map(job.where)
+      .getOrElse(job)
       .select(colNames.head, colNames.tail: _*)
   }
 }
@@ -95,6 +104,7 @@ object FlintSparkCoveringIndex {
   class Builder(flint: FlintSpark) extends FlintSparkIndexBuilder(flint) {
     private var indexName: String = ""
     private var indexedColumns: Map[String, String] = Map()
+    private var filterCondition: Option[String] = None
 
     /**
      * Set covering index name.
Expand Down Expand Up @@ -137,7 +147,25 @@ object FlintSparkCoveringIndex {
this
}

/**
* Add filtering condition.
*
* @param condition
* filter condition
* @return
* index builder
*/
def filterBy(condition: String): Builder = {
filterCondition = Some(condition)
this
}

override protected def buildIndex(): FlintSparkIndex =
new FlintSparkCoveringIndex(indexName, tableName, indexedColumns, indexOptions)
new FlintSparkCoveringIndex(
indexName,
tableName,
indexedColumns,
filterCondition,
indexOptions)
}
}
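
Taken together, metadata() records the condition under properties.filterCondition and build() applies it before the column projection, so a refresh only indexes rows that satisfy the condition. A minimal sketch of the DataFrame that build(spark, None) would produce for an index on (name, age) filtered by "age > 30", assuming the source table used in the IT suite:

import org.apache.spark.sql.{DataFrame, SparkSession}

def filteredIndexDf(spark: SparkSession): DataFrame =
  spark.read
    .table("spark_catalog.default.ci_test") // source table, as in the IT suite
    .where("age > 30")                      // applied only when filterCondition is defined
    .select("name", "age")                  // projection of the indexed columns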
@@ -27,12 +27,6 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
   override def visitCreateCoveringIndexStatement(
       ctx: CreateCoveringIndexStatementContext): Command = {
     FlintSparkSqlCommand() { flint =>
-      // TODO: support filtering condition
-      if (ctx.whereClause() != null) {
-        throw new UnsupportedOperationException(
-          s"Filtering condition is not supported: ${getSqlText(ctx.whereClause())}")
-      }
-
       val indexName = ctx.indexName.getText
       val tableName = getFullTableName(flint, ctx.tableName)
       val indexBuilder =
@@ -46,6 +40,10 @@ trait FlintSparkCoveringIndexAstBuilder extends FlintSparkSqlExtensionsVisitor[A
         indexBuilder.addIndexColumns(colName)
       }
 
+      if (ctx.whereClause() != null) {
+        indexBuilder.filterBy(getSqlText(ctx.whereClause().filterCondition()))
+      }
+
       val ignoreIfExists = ctx.EXISTS() != null
       val indexOptions = visitPropertyList(ctx.propertyList())
       indexBuilder
@@ -22,7 +22,7 @@ class FlintSparkSqlParserSuite extends FlintSuite with Matchers {
     } should have message "Filtering condition is not supported: WHERE status != 200"
   }
 
-  test("create covering index with filtering condition") {
+  ignore("create covering index with filtering condition") {
     the[UnsupportedOperationException] thrownBy {
       sql("""
           | CREATE INDEX test ON alb_logs
@@ -40,6 +40,7 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
       .name(testIndex)
       .onTable(testTable)
       .addIndexColumns("name", "age")
+      .filterBy("age > 30")
      .create()
 
     val index = flint.describeIndex(testFlintIndex)
@@ -60,7 +61,9 @@ class FlintSparkCoveringIndexITSuite extends FlintSparkSuite {
         |   }],
         |   "source": "spark_catalog.default.ci_test",
         |   "options": { "auto_refresh": "false" },
-        |   "properties": {}
+        |   "properties": {
+        |     "filterCondition": "age > 30"
+        |   }
         | },
         | "properties": {
         |   "name": {
@@ -58,6 +58,22 @@ class FlintSparkCoveringIndexSqlITSuite extends FlintSparkSuite {
     indexData.count() shouldBe 2
   }
 
+  test("create covering index with filtering condition") {
+    sql(s"""
+         | CREATE INDEX $testIndex ON $testTable
+         | (name, age)
+         | WHERE address = 'Portland'
+         | WITH (auto_refresh = true)
+         |""".stripMargin)
+
+    // Wait for streaming job complete current micro batch
+    val job = spark.streams.active.find(_.name == testFlintIndex)
+    awaitStreamingComplete(job.get.id.toString)
+
+    val indexData = flint.queryIndex(testFlintIndex)
+    indexData.count() shouldBe 1
+  }
+
   test("create covering index with streaming job options") {
     withTempDir { checkpointDir =>
       sql(s"""
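
The count of 1 asserted in the new SQL test reflects the filter: only the source rows satisfying address = 'Portland' reach the Flint index, whereas the unfiltered index in the preceding test holds both rows. A rough cross-check of that expectation against the source table directly; the table name is spelled out here for illustration, while the suite refers to it via $testTable:

// Rows the filtered covering index should contain, computed from the source table.
val expected = spark.read
  .table("spark_catalog.default.ci_test")
  .where("address = 'Portland'")
  .count()

assert(expected == 1)  // matches "indexData.count() shouldBe 1" above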
