Skip to content

Commit

Permalink
Rename refresh mode to manual and auto
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jan 18, 2024
1 parent 15c990e commit 2d7b174
Showing 1 changed file with 9 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ import org.json4s.native.Serialization
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._
import org.opensearch.flint.core.metadata.log.OptimisticTransaction.NO_LOG_ENTRY
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{FULL, INCREMENTAL, RefreshMode}
import org.opensearch.flint.spark.FlintSpark.RefreshMode.{AUTO, MANUAL, RefreshMode}
import org.opensearch.flint.spark.FlintSparkIndex.{ID_COLUMN, StreamingRefresh}
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
import org.opensearch.flint.spark.mv.FlintSparkMaterializedView
Expand Down Expand Up @@ -139,7 +139,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
logInfo(s"Refreshing Flint index $indexName")
val index = describeIndex(indexName)
.getOrElse(throw new IllegalStateException(s"Index $indexName doesn't exist"))
val mode = if (index.options.autoRefresh()) INCREMENTAL else FULL
val mode = if (index.options.autoRefresh()) AUTO else MANUAL

try {
flintClient
Expand All @@ -149,7 +149,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
.finalLog(latest => {
// Change state to active if full, otherwise update index state regularly
if (mode == FULL) {
if (mode == MANUAL) {
logInfo("Updating index state to active")
latest.copy(state = ACTIVE)
} else {
Expand Down Expand Up @@ -293,7 +293,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
flintIndexMonitor.startMonitor(indexName)
latest.copy(state = REFRESHING)
})
.commit(_ => doRefreshIndex(index.get, indexName, INCREMENTAL))
.commit(_ => doRefreshIndex(index.get, indexName, AUTO))

logInfo("Recovery complete")
true
Expand Down Expand Up @@ -344,17 +344,17 @@ class FlintSpark(val spark: SparkSession) extends Logging {
}

val jobId = mode match {
case FULL if isIncrementalRefreshing(indexName) =>
case MANUAL if isIncrementalRefreshing(indexName) =>
throw new IllegalStateException(
s"Index $indexName is incremental refreshing and cannot be manual refreshed")

case FULL =>
case MANUAL =>
logInfo("Start refreshing index in batch style")
batchRefresh()
None

// Flint index has specialized logic and capability for incremental refresh
case INCREMENTAL if index.isInstanceOf[StreamingRefresh] =>
case AUTO if index.isInstanceOf[StreamingRefresh] =>
logInfo("Start refreshing index in streaming style")
val job =
index
Expand All @@ -369,7 +369,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
Some(job.id.toString)

// Otherwise, fall back to foreachBatch + batch refresh
case INCREMENTAL =>
case AUTO =>
logInfo("Start refreshing index in foreach streaming style")
val job = spark.readStream
.options(options.extraSourceOptions(tableName))
Expand Down Expand Up @@ -439,6 +439,6 @@ object FlintSpark {
*/
object RefreshMode extends Enumeration {
type RefreshMode = Value
val FULL, INCREMENTAL = Value
val MANUAL, AUTO = Value
}
}

0 comments on commit 2d7b174

Please sign in to comment.