Skip to content

Commit

Permalink
Address PR comments
Browse files Browse the repository at this point in the history
Signed-off-by: Chen Dai <[email protected]>
  • Loading branch information
dai-chen committed Jun 1, 2024
1 parent bafba6b commit 6bae459
Show file tree
Hide file tree
Showing 2 changed files with 6 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@
package org.opensearch.flint.spark

import java.time.Duration
import java.time.temporal.ChronoUnit.SECONDS
import java.util.Collections.singletonList
import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}

Expand Down Expand Up @@ -119,7 +120,8 @@ class FlintSparkIndexMonitor(
} catch {
case e: Throwable =>
/**
* Transition the index state to FAILED upon encountering an exception.
* Transition the index state to FAILED upon encountering an exception. Retry in case
* conflicts with final transaction in scheduled task.
* ```
* TODO:
* 1) Determine the appropriate state code based on the type of exception encountered
Expand Down Expand Up @@ -202,7 +204,8 @@ class FlintSparkIndexMonitor(
val retryPolicy = RetryPolicy
.builder[Unit]()
.handle(classOf[Throwable])
.withDelay(Duration.ofSeconds(1))
.withBackoff(1, 30, SECONDS)
.withJitter(Duration.ofMillis(100))
.withMaxRetries(3)
.onFailedAttempt((event: ExecutionAttemptedEvent[Unit]) =>
logError("Attempt to update index state failed: " + event))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -84,7 +84,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
* all Spark conf required by Flint code underlying manually.
*/
spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
spark.conf.set(JOB_TYPE.key, "batch")
spark.conf.set(JOB_TYPE.key, "streaming")

/**
* FlintJob.main() is not called because we need to manually set these variables within a
Expand Down

0 comments on commit 6bae459

Please sign in to comment.