Add more javadoc
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Oct 13, 2023
1 parent a6121fd commit 24f2296
Showing 4 changed files with 26 additions and 5 deletions.
@@ -137,6 +137,7 @@ class FlintSpark(val spark: SparkSession) {
         batchRefresh()
         None
 
+      // Flint index has specialized logic and capability for incremental refresh
       case INCREMENTAL if index.isInstanceOf[StreamingRefresh] =>
         val job =
           index
@@ -152,6 +153,7 @@ class FlintSpark(val spark: SparkSession) {
             .start(indexName)
         Some(job.id.toString)
 
+      // Otherwise, fall back to foreachBatch + batch refresh
       case INCREMENTAL =>
         val job = spark.readStream
           .table(tableName)
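
The fallback named in the new comment pairs Spark's foreachBatch sink with ordinary batch refresh logic: the source table is read as a stream, but every micro-batch is processed by the same code a one-shot batch refresh would run. A minimal sketch of the pattern, assuming a hypothetical writeFlint function for the batch step:

import org.apache.spark.sql.{DataFrame, SparkSession}

// Stream from the source table, but hand each micro-batch to plain batch logic.
def incrementalFallback(
    spark: SparkSession,
    tableName: String,
    checkpointLocation: String,
    writeFlint: DataFrame => Unit): String = { // writeFlint is hypothetical
  val job = spark.readStream
    .table(tableName)
    .writeStream
    .option("checkpointLocation", checkpointLocation)
    .foreachBatch { (batch: DataFrame, _: Long) =>
      writeFlint(batch) // same code path as a one-shot batch refresh
    }
    .start()
  job.id.toString
}
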
@@ -256,6 +258,7 @@ class FlintSpark(val spark: SparkSession) {
     val indexOptions = FlintSparkIndexOptions(
       metadata.options.asScala.mapValues(_.asInstanceOf[String]).toMap)
 
+    // Convert generic Map[String,AnyRef] in metadata to specific data structure in Flint index
     metadata.kind match {
       case SKIPPING_INDEX_TYPE =>
         val strategies = metadata.indexedColumns.map { colInfo =>
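
The new comment marks the point where the generic metadata map is narrowed into Flint's typed structures; the conversion itself is the asScala/mapValues pattern visible in the hunk above, shown standalone here:

import scala.collection.JavaConverters._

// Narrow a generic java.util.Map[String, AnyRef], as stored in Flint metadata,
// into an immutable Scala Map[String, String] for the typed options class.
def toOptionsMap(options: java.util.Map[String, AnyRef]): Map[String, String] =
  options.asScala.mapValues(_.asInstanceOf[String]).toMap
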
@@ -120,7 +120,7 @@ object FlintSparkIndex {
     builder.kind(index.kind)
     builder.options(index.options.options.mapValues(_.asInstanceOf[AnyRef]).asJava)
 
-    // Index properties
+    // Optional index properties
     val envs = populateEnvToMetadata
     if (envs.nonEmpty) {
       builder.addProperty("env", envs.asJava)
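
The reworded comment makes explicit that the env entry is attached only when present. A hypothetical sketch of a helper in that spirit (the actual environment variable names Flint reads are not shown in this diff):

// Hypothetical: collect selected environment variables, skipping unset ones,
// so the "env" property is added to metadata only when non-empty.
def populateEnvToMetadata: Map[String, String] =
  Seq("SERVERLESS_EMR_JOB_ID", "FLINT_CLUSTER_NAME") // illustrative names only
    .flatMap(name => Option(System.getenv(name)).map(name -> _))
    .toMap
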
@@ -22,6 +22,18 @@ import org.apache.spark.sql.catalyst.util.IntervalUtils
 import org.apache.spark.sql.flint.logicalPlanToDataFrame
 import org.apache.spark.unsafe.types.UTF8String
 
+/**
+ * Flint materialized view in Spark.
+ *
+ * @param mvName
+ *   MV name
+ * @param query
+ *   source query that generates MV data
+ * @param outputSchema
+ *   output schema
+ * @param options
+ *   index options
+ */
 case class FlintSparkMaterializedView(
     mvName: String,
     query: String,
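
A hypothetical construction matching the javadoc above; the outputSchema shape (column name to type) is an assumption inferred from the indexedColumns JSON in the test suite below:

// Illustrative values only; not taken from the Flint codebase.
val mv = FlintSparkMaterializedView(
  mvName = "spark_catalog.default.mv_test_metrics",
  query = "SELECT COUNT(*) AS count FROM source GROUP BY window",
  outputSchema = Map("count" -> "long"),
  options = FlintSparkIndexOptions(Map("auto_refresh" -> "true")))
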
@@ -42,10 +42,13 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
   }
 
   test("create materialized view with metadata successfully") {
+    val indexOptions =
+      FlintSparkIndexOptions(Map("auto_refresh" -> "true", "checkpoint_location" -> "s3://test/"))
     flint
       .materializedView()
       .name(testMvName)
       .query(testQuery)
+      .options(indexOptions)
       .create()
 
     val index = flint.describeIndex(testFlintIndex)
@@ -65,7 +68,10 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
        |   "columnName": "count",
        |   "columnType": "long"
        | }],
-       | "options": {},
+       | "options": {
+       |   "auto_refresh": "true",
+       |   "checkpoint_location": "s3://test/"
+       | },
        | "properties": {}
        | },
        | "properties": {
@@ -102,13 +108,13 @@ class FlintSparkMaterializedViewITSuite extends FlintSparkSuite {
 
   test("incremental refresh materialized view") {
     withTempDir { checkpointDir =>
-      val checkpointOption =
-        FlintSparkIndexOptions(Map("checkpoint_location" -> checkpointDir.getAbsolutePath))
+      val indexOptions = FlintSparkIndexOptions(
+        Map("auto_refresh" -> "true", "checkpoint_location" -> checkpointDir.getAbsolutePath))
       flint
         .materializedView()
         .name(testMvName)
         .query(testQuery)
-        .options(checkpointOption)
+        .options(indexOptions)
         .create()
 
       flint