Add new Spark conf for initial delay too
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed May 17, 2024
1 parent 4836c6e commit 2990adf
Showing 3 changed files with 32 additions and 14 deletions.
FlintSparkConf.scala
```diff
@@ -154,14 +154,18 @@ object FlintSparkConf {
     .doc("Checkpoint location for incremental refresh index will be mandatory if enabled")
     .createWithDefault("true")
 
-  val MONITOR_MAX_ERROR_COUNT = FlintConfig("spark.flint.monitor.maxErrorCount")
-    .doc("Maximum number of consecutive errors allowed in index monitor")
-    .createWithDefault("5")
+  val MONITOR_INITIAL_DELAY_SECONDS = FlintConfig("spark.flint.monitor.initialDelaySeconds")
+    .doc("Initial delay in seconds before starting the monitoring task")
+    .createWithDefault("15")
 
-  val MONITOR_INTERVAL_SECONDS = FlintConfig("spark.flint.monitor.interval")
+  val MONITOR_INTERVAL_SECONDS = FlintConfig("spark.flint.monitor.intervalSeconds")
     .doc("Interval in seconds for scheduling the monitoring task")
     .createWithDefault("60")
 
+  val MONITOR_MAX_ERROR_COUNT = FlintConfig("spark.flint.monitor.maxErrorCount")
+    .doc("Maximum number of consecutive errors allowed in index monitor")
+    .createWithDefault("5")
+
   val SOCKET_TIMEOUT_MILLIS =
     FlintConfig(s"spark.datasource.flint.${FlintOptions.SOCKET_TIMEOUT_MILLIS}")
       .datasourceOption()
@@ -231,10 +235,12 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable
 
   def isCheckpointMandatory: Boolean = CHECKPOINT_MANDATORY.readFrom(reader).toBoolean
 
-  def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt
+  def monitorInitialDelaySeconds(): Int = MONITOR_INITIAL_DELAY_SECONDS.readFrom(reader).toInt
 
   def monitorIntervalSeconds(): Int = MONITOR_INTERVAL_SECONDS.readFrom(reader).toInt
 
+  def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt
+
   /**
    * spark.sql.session.timeZone
    */
```
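With this change the three monitor settings are ordinary Spark configs read through FlintSparkConf. Below is a minimal sketch of how they could be supplied when building a session: the key names and defaults come from the diff above, while the object name, app name, and override values are only illustrative.

```scala
import org.apache.spark.sql.SparkSession

// Illustrative only: the keys are the ones defined in FlintSparkConf above;
// the values override the defaults (15 s initial delay, 60 s interval, 5 errors).
object MonitorConfSketch extends App {
  val spark = SparkSession
    .builder()
    .appName("flint-monitor-conf-sketch")
    .config("spark.flint.monitor.initialDelaySeconds", "30") // wait 30 s before the first monitor run
    .config("spark.flint.monitor.intervalSeconds", "120") // then schedule a run every 120 s
    .config("spark.flint.monitor.maxErrorCount", "3") // allow at most 3 consecutive errors
    .getOrCreate()
}
```

Since these are plain Spark configuration entries, the same keys could equally be passed with --conf on spark-submit.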
FlintSparkIndexMonitor.scala
```diff
@@ -35,23 +35,32 @@ class FlintSparkIndexMonitor(
     dataSourceName: String)
     extends Logging {
 
+  /** Task execution initial delay in seconds */
+  private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds()
+
+  /** Task execution interval in seconds */
+  private val INTERVAL_SECONDS = FlintSparkConf().monitorIntervalSeconds()
+
   /** Max error count allowed */
   private val MAX_ERROR_COUNT = FlintSparkConf().monitorMaxErrorCount()
 
-  /** Task execution interval */
-  private val INTERVAL = FlintSparkConf().monitorIntervalSeconds()
-
   /**
    * Start monitoring task on the given Flint index.
    *
    * @param indexName
    *   Flint index name
    */
   def startMonitor(indexName: String): Unit = {
+    logInfo(s"""Starting index monitor for $indexName with configuration:
+               | - Initial delay: $INITIAL_DELAY_SECONDS seconds
+               | - Interval: $INTERVAL_SECONDS seconds
+               | - Max error count: $MAX_ERROR_COUNT
+               |""".stripMargin)
+
     val task = FlintSparkIndexMonitor.executor.scheduleWithFixedDelay(
       new FlintSparkIndexMonitorTask(indexName),
-      15, // Delay to ensure final logging is complete first, otherwise version conflicts
-      INTERVAL,
+      INITIAL_DELAY_SECONDS, // Delay to ensure final logging is complete first, otherwise version conflicts
+      INTERVAL_SECONDS,
       TimeUnit.SECONDS)
 
     FlintSparkIndexMonitor.indexMonitorTracker.put(indexName, task)
```
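The task is scheduled with scheduleWithFixedDelay, whose java.util.concurrent contract is that the first numeric argument delays only the first execution while the second is the pause between the end of one run and the start of the next; that is why the previously hardcoded 15 now comes from INITIAL_DELAY_SECONDS. Below is a standalone sketch of that contract using plain JDK classes, independent of the Flint code; the 15/60 values simply mirror the defaults above.

```scala
import java.util.concurrent.{Executors, TimeUnit}

// Standalone sketch of scheduleWithFixedDelay semantics; nothing here touches
// the Flint classes, and the timings only mirror the Flint defaults.
object FixedDelayDemo extends App {
  val executor = Executors.newSingleThreadScheduledExecutor()
  val start = System.currentTimeMillis()

  val task: Runnable = () =>
    println(s"monitor tick at ${(System.currentTimeMillis() - start) / 1000} s")

  // First execution after 15 s (initial delay); afterwards the executor waits
  // 60 s between the end of one execution and the start of the next.
  executor.scheduleWithFixedDelay(task, 15, 60, TimeUnit.SECONDS)

  Thread.sleep(3 * 60 * 1000) // let a few ticks happen
  executor.shutdownNow()
}
```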
FlintSparkConfSuite.scala
```diff
@@ -13,7 +13,7 @@ import org.opensearch.flint.core.http.FlintRetryOptions._
 import org.scalatest.matchers.should.Matchers.convertToAnyShouldWrapper
 
 import org.apache.spark.FlintSuite
-import org.apache.spark.sql.flint.config.FlintSparkConf.{MONITOR_INTERVAL_SECONDS, MONITOR_MAX_ERROR_COUNT}
+import org.apache.spark.sql.flint.config.FlintSparkConf.{MONITOR_INITIAL_DELAY_SECONDS, MONITOR_INTERVAL_SECONDS, MONITOR_MAX_ERROR_COUNT}
 
 class FlintSparkConfSuite extends FlintSuite {
   test("test spark conf") {
@@ -87,16 +87,19 @@ class FlintSparkConfSuite extends FlintSuite {
 
   test("test index monitor options") {
     val defaultConf = FlintSparkConf()
-    defaultConf.monitorMaxErrorCount() shouldBe 5
+    defaultConf.monitorInitialDelaySeconds() shouldBe 15
     defaultConf.monitorIntervalSeconds() shouldBe 60
+    defaultConf.monitorMaxErrorCount() shouldBe 5
 
     withSparkConf(MONITOR_MAX_ERROR_COUNT.key, MONITOR_INTERVAL_SECONDS.key) {
-      setFlintSparkConf(MONITOR_MAX_ERROR_COUNT, 10)
+      setFlintSparkConf(MONITOR_INITIAL_DELAY_SECONDS, 5)
       setFlintSparkConf(MONITOR_INTERVAL_SECONDS, 30)
+      setFlintSparkConf(MONITOR_MAX_ERROR_COUNT, 10)
 
       val overrideConf = FlintSparkConf()
-      overrideConf.monitorMaxErrorCount() shouldBe 10
+      defaultConf.monitorInitialDelaySeconds() shouldBe 5
       overrideConf.monitorIntervalSeconds() shouldBe 30
+      overrideConf.monitorMaxErrorCount() shouldBe 10
     }
   }
 
```
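withSparkConf and setFlintSparkConf are test helpers inherited from FlintSuite and are not part of this diff; judging from their use, withSparkConf scopes overrides of the named keys to the block and restores the previous values afterwards. The sketch below only illustrates that save-set-restore pattern with a hypothetical helper name; it is not the actual FlintSuite implementation.

```scala
import org.apache.spark.sql.SparkSession

// Hypothetical helper illustrating the scoping pattern: set confs for the
// duration of a block, then restore (or clear) whatever was there before.
def withTempConf(spark: SparkSession)(pairs: (String, String)*)(block: => Unit): Unit = {
  val previous = pairs.map { case (key, _) => key -> spark.conf.getOption(key) }
  try {
    pairs.foreach { case (key, value) => spark.conf.set(key, value) }
    block
  } finally {
    previous.foreach {
      case (key, Some(oldValue)) => spark.conf.set(key, oldValue)
      case (key, None) => spark.conf.unset(key)
    }
  }
}
```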
