Pass index option to Flint metadata and streaming job
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Sep 21, 2023
1 parent 519f1fb commit fcaa6e1
Showing 4 changed files with 30 additions and 4 deletions.
8 changes: 7 additions & 1 deletion flint-spark-integration/src/main/antlr4/SparkSqlBase.g4
@@ -163,7 +163,6 @@ INDEX: 'INDEX';
 ON: 'ON';
 PARTITION: 'PARTITION';
 REFRESH: 'REFRESH';
-STRING: 'STRING';
 TRUE: 'TRUE';
 WITH: 'WITH';

@@ -172,6 +171,13 @@ EQ : '=' | '==';
 MINUS: '-';
 
 
+STRING
+    : '\'' ( ~('\''|'\\') | ('\\' .) )* '\''
+    | '"' ( ~('"'|'\\') | ('\\' .) )* '"'
+    | 'R\'' (~'\'')* '\''
+    | 'R"' (~'"')* '"'
+    ;
+
 INTEGER_VALUE
     : DIGIT+
     ;
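The new STRING rule replaces the deleted STRING keyword token with a real string-literal token: single- and double-quoted forms that honor backslash escapes, plus raw R'...' / R"..." forms that do not. As a rough illustration (a sketch, not part of the commit), the single-quoted and raw alternatives correspond to Scala regex patterns like these:

// Illustrative patterns (not from the commit) mirroring two of the
// STRING alternatives; String.matches requires a full match.
val quoted = """'(?:[^'\\]|\\.)*'""" // '...' allowing backslash escapes
val raw    = """R'[^']*'"""          // R'...': backslash stays literal

assert("""'it\'s'""".matches(quoted)) // escaped quote consumed by \\.
assert("R'a\\b'".matches(raw))        // backslash is an ordinary character
assert(!"R'a'b'".matches(raw))        // a raw literal cannot embed a quote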
12 changes: 11 additions & 1 deletion flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -31,6 +31,7 @@ import org.apache.spark.sql.flint.FlintDataSourceV2.FLINT_DATASOURCE
 import org.apache.spark.sql.flint.config.FlintSparkConf
 import org.apache.spark.sql.flint.config.FlintSparkConf.{DOC_ID_COLUMN_NAME, IGNORE_DOC_ID_COLUMN}
 import org.apache.spark.sql.streaming.OutputMode.Append
+import org.apache.spark.sql.streaming.Trigger
 
 /**
  * Flint Spark integration API entrypoint.
@@ -129,11 +130,20 @@ class FlintSpark(val spark: SparkSession) {
         .writeStream
         .queryName(indexName)
         .outputMode(Append())
+
+      index.options
+        .checkpointLocation()
+        .foreach(location => job.option("checkpointLocation", location))
+      index.options
+        .refreshInterval()
+        .foreach(interval => job.trigger(Trigger.ProcessingTime(interval)))
+
+      val jobId = job
         .foreachBatch { (batchDF: DataFrame, _: Long) =>
           writeFlintIndex(batchDF)
         }
         .start()
-      Some(job.id.toString)
+      Some(jobId.toString)
     }
   }
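Both new options are optional, so each is applied to the streaming writer only when present: checkpointLocation maps onto Spark's standard checkpointLocation option, and refreshInterval becomes a processing-time trigger. A self-contained sketch of the same pattern (the source table, query name, and console sink here are hypothetical stand-ins, not Flint's actual wiring):

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

// Minimal sketch: conditionally apply checkpoint and trigger settings,
// then start the query and return its id, as the changed code does.
def startDemoJob(spark: SparkSession,
                 checkpointLocation: Option[String],
                 refreshInterval: Option[String]): String = {
  var writer = spark.readStream
    .table("demo_source") // hypothetical streaming source
    .writeStream
    .queryName("demo_index")
    .format("console") // hypothetical sink; Flint writes via foreachBatch
  checkpointLocation.foreach(loc => writer = writer.option("checkpointLocation", loc))
  refreshInterval.foreach(ivl => writer = writer.trigger(Trigger.ProcessingTime(ivl)))
  writer.start().id.toString
}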

7 changes: 6 additions & 1 deletion flint-spark-integration/src/main/scala/org/opensearch/flint/spark/covering/FlintSparkCoveringIndex.scala
@@ -51,7 +51,8 @@ class FlintSparkCoveringIndex(
| "name": "$indexName",
| "kind": "$kind",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName"
| "source": "$tableName",
| "options": $getIndexOptions
| },
| "properties": $getSchema
| }
@@ -71,6 +72,10 @@
     Serialization.write(JArray(objects))
   }
 
+  private def getIndexOptions: String = {
+    Serialization.write(options.options)
+  }
+
   private def getSchema: String = {
     val catalogDDL =
       indexedColumns
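The new getIndexOptions serializes the raw option map straight into the "options" field of the metadata. Assuming options.options is a Map[String, String] built from the index's WITH clause (an assumption based on the call shape, not shown in this diff), the serialization behaves like this json4s sketch:

import org.json4s.{Formats, NoTypeHints}
import org.json4s.native.Serialization

// Sketch: serializing a (hypothetical) option map the way getIndexOptions does.
implicit val formats: Formats = Serialization.formats(NoTypeHints)
val opts = Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://bucket/cp")
println(Serialization.write(opts))
// {"auto_refresh":"true","checkpoint_location":"s3://bucket/cp"}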
7 changes: 6 additions & 1 deletion flint-spark-integration/src/main/scala/org/opensearch/flint/spark/skipping/FlintSparkSkippingIndex.scala
@@ -58,7 +58,8 @@ class FlintSparkSkippingIndex(
| "version": "${FlintVersion.current()}",
| "kind": "$SKIPPING_INDEX_TYPE",
| "indexedColumns": $getMetaInfo,
| "source": "$tableName"
| "source": "$tableName",
| "options": $getIndexOptions
| },
| "properties": $getSchema
| }
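With the added "options" entry, the _meta template above assembles into an index mapping document of roughly this shape (field values below are illustrative placeholders, not taken from the commit):

{
  "_meta": {
    "version": "...",
    "kind": "skipping",
    "indexedColumns": [ ... ],
    "source": "db.table",
    "options": { "auto_refresh": "true" }
  },
  "properties": { ... }
}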
@@ -84,6 +85,10 @@
     Serialization.write(indexedColumns)
   }
 
+  private def getIndexOptions: String = {
+    Serialization.write(options.options)
+  }
+
   private def getSchema: String = {
     val allFieldTypes =
       indexedColumns.flatMap(_.outputSchema()).toMap + (FILE_PATH_COLUMN -> "string")
