update the flint client api
Signed-off-by: YANGDB <[email protected]>
YANG-DB committed Oct 18, 2023
1 parent 79936ae commit d2677d0
Showing 8 changed files with 260 additions and 156 deletions.
@@ -26,6 +26,13 @@ public interface FlintClient {
*/
void createIndex(String indexName, FlintMetadata metadata);

/**
 * Create an alias for the given index.
 *
 * @param indexName the existing physical index the alias points to
 * @param aliasName the alias name to register for the Flint index
 * @param metadata  metadata of the Flint index backing the alias
 */
void alias(String indexName, String aliasName, FlintMetadata metadata);

/**
* Does a Flint index with the given name exist?
*
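The new alias call maps a logical Flint index name onto a pre-existing physical index rather than creating a new one. A minimal usage sketch (hypothetical: the client, metadata value, and index names below are illustrative, not from this change):

val client: FlintClient = ???       // however the caller obtains a FlintClient
val metadata: FlintMetadata = ???   // metadata of the logical Flint index
// Register the logical Flint name as an alias of an existing physical index:
client.alias("existing_physical_index", "flint_mv_logical_name", metadata)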

Large diffs are not rendered by default.

@@ -82,7 +82,11 @@ class FlintSpark(val spark: SparkSession) {
*/
def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
val indexName = index.name()
if (flintClient.exists(indexName)) {
val targetName = index.targetName()
if (targetName.nonEmpty) {
// use the target index to store the acceleration data
flintClient.alias(targetName.get, indexName, index.metadata())
} else if (flintClient.exists(indexName)) {
if (!ignoreIfExists) {
throw new IllegalStateException(s"Flint index $indexName already exists")
}
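Note the ordering in the new branch: when a target index is declared, the alias path is taken unconditionally and the exists/ignoreIfExists check is never reached. A sketch of the resulting contract (illustrative helper, not part of the change):

// Summarizes how createIndex now routes, per the diff above:
def createBehavior(target: Option[String], exists: Boolean, ignoreIfExists: Boolean): String =
  target match {
    case Some(t) => s"alias onto existing index $t"           // no exists check
    case None if exists && !ignoreIfExists => "throw IllegalStateException"
    case None => "create a new physical index"
  }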
@@ -38,7 +38,7 @@ trait FlintSparkIndex {
* @return
* Flint target index name
*/
def targetName(): String
def targetName(): Option[String]

/**
* @return
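Because targetName() now returns Option[String], a caller that still needs a concrete physical index name falls back to name() itself. A one-line sketch (assumes an index value in scope):

val index: FlintSparkIndex = ???
val physicalName: String = index.targetName().getOrElse(index.name())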
@@ -28,7 +28,7 @@ import org.apache.spark.sql._
* indexed column list
*/
case class FlintSparkCoveringIndex(
targetIndexName: Option[String],
targetIndexName: Option[String] = None,
indexName: String,
tableName: String,
indexedColumns: Map[String, String],
@@ -45,8 +45,8 @@
* @return
* Flint target index name - an index that already exists or that has an existing template to create it from
*/
override def targetName(): String = {
targetIndexName.getOrElse(name())
override def targetName(): Option[String] = {
targetIndexName
}

override def metadata(): FlintMetadata = {
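The new = None default keeps existing call sites compiling unchanged when they never pass a target. A hypothetical construction (assumes the remaining constructor parameters also have defaults; field values are illustrative):

val covering = FlintSparkCoveringIndex(
  indexName = "ci_test",
  tableName = "spark_catalog.default.test",
  indexedColumns = Map("name" -> "string", "age" -> "int"))
assert(covering.targetName().isEmpty) // no target: Flint creates its own index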
@@ -37,7 +37,7 @@ import org.apache.spark.sql.flint.{logicalPlanToDataFrame, qualifyTableName}
* index options
*/
case class FlintSparkMaterializedView(
targetIndexName: Option[String],
targetIndexName: Option[String] = None,
mvName: String,
query: String,
outputSchema: Map[String, String],
@@ -56,8 +56,8 @@
* @return
* Flint target index name - an index that already exists or that has an existing template to create it from
*/
override def targetName(): String = {
targetIndexName.getOrElse(name())
override def targetName(): Option[String] = {
targetIndexName
}

override def metadata(): FlintMetadata = {
@@ -43,8 +43,8 @@ case class FlintSparkSkippingIndex(
* @return
* Flint target index name (skipping indexes do not allow reusing existing indices)
*/
def targetName(): String = {
name()
def targetName(): Option[String] = {
None
}

override def name(): String = {
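Skipping indexes hard-code None, so they always take the create path in FlintSpark.createIndex and never alias onto a user-supplied index. A quick check (sketch; the instance is assumed):

val skipping: FlintSparkSkippingIndex = ???
assert(skipping.targetName().isEmpty) // always None by design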
@@ -86,6 +86,53 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
| }
|""".stripMargin)
}

test("create materialized view using existing OpebnSearch index successfully") {
val indexOptions =
FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
flint
.materializedView()
.targetName("existing_index")
.name(testMvName)
.query(testQuery)
.options(indexOptions)
.create()

val index = flint.describeIndex("existing_index")
index shouldBe defined
index.get.metadata().getContent should matchJson(s"""
| {
| "_meta": {
| "version": "${current()}",
| "name": "spark_catalog.default.mv_test_metrics",
| "kind": "mv",
| "source": "$testQuery",
| "indexedColumns": [
| {
| "columnName": "startTime",
| "columnType": "timestamp"
| },{
| "columnName": "count",
| "columnType": "long"
| }],
| "options": {
| "auto_refresh": "true",
| "checkpoint_location": "s3://test/"
| },
| "properties": {}
| },
| "properties": {
| "startTime": {
| "type": "date",
| "format": "strict_date_optional_time_nanos"
| },
| "count": {
| "type": "long"
| }
| }
| }
|""".stripMargin)
}

// TODO: fix this - windowing function cannot be used in GROUP BY
ignore("full refresh materialized view") {
