From 6bae45968e9b123ab024453ce050d01ff35e1b2a Mon Sep 17 00:00:00 2001
From: Chen Dai
Date: Fri, 31 May 2024 17:27:58 -0700
Subject: [PATCH] Address PR comments

Signed-off-by: Chen Dai
---
 .../opensearch/flint/spark/FlintSparkIndexMonitor.scala   | 7 +++++--
 .../test/scala/org/apache/spark/sql/FlintJobITSuite.scala | 2 +-
 2 files changed, 6 insertions(+), 3 deletions(-)

diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 434feb980..d3f3ff0ee 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -6,6 +6,7 @@
 package org.opensearch.flint.spark
 
 import java.time.Duration
+import java.time.temporal.ChronoUnit.SECONDS
 import java.util.Collections.singletonList
 import java.util.concurrent.{ScheduledExecutorService, ScheduledFuture, TimeUnit}
 
@@ -119,7 +120,8 @@ class FlintSparkIndexMonitor(
       } catch {
         case e: Throwable =>
           /**
-           * Transition the index state to FAILED upon encountering an exception.
+           * Transition the index state to FAILED upon encountering an exception. Retry in case
+           * conflicts with final transaction in scheduled task.
            * ```
            * TODO:
            * 1) Determine the appropriate state code based on the type of exception encountered
@@ -202,7 +204,8 @@ class FlintSparkIndexMonitor(
     val retryPolicy = RetryPolicy
       .builder[Unit]()
       .handle(classOf[Throwable])
-      .withDelay(Duration.ofSeconds(1))
+      .withBackoff(1, 30, SECONDS)
+      .withJitter(Duration.ofMillis(100))
       .withMaxRetries(3)
       .onFailedAttempt((event: ExecutionAttemptedEvent[Unit]) =>
         logError("Attempt to update index state failed: " + event))
diff --git a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
index bcb1a16c5..990b9e449 100644
--- a/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
+++ b/integ-test/src/test/scala/org/apache/spark/sql/FlintJobITSuite.scala
@@ -84,7 +84,7 @@ class FlintJobITSuite extends FlintSparkSuite with JobTest {
      * all Spark conf required by Flint code underlying manually.
      */
     spark.conf.set(DATA_SOURCE_NAME.key, dataSourceName)
-    spark.conf.set(JOB_TYPE.key, "batch")
+    spark.conf.set(JOB_TYPE.key, "streaming")
 
     /**
      * FlintJob.main() is not called because we need to manually set these variables within a
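
Editorial note (not part of the commit above): the following is a minimal, self-contained sketch of how the retry policy configured in FlintSparkIndexMonitor behaves once the fixed 1-second delay is replaced with exponential backoff plus jitter. It assumes the Failsafe 3.x library is on the classpath; the object name RetryPolicySketch and the updateIndexState helper are hypothetical stand-ins, and only the policy values (1s to 30s backoff, 100ms jitter, 3 retries) come from the patch.

import java.time.Duration
import java.time.temporal.ChronoUnit.SECONDS

import dev.failsafe.{Failsafe, RetryPolicy}
import dev.failsafe.event.ExecutionAttemptedEvent

object RetryPolicySketch extends App {

  // Exponential backoff starting at 1s and capped at 30s, plus up to 100ms of
  // random jitter, for at most 3 retries: the same values the patch introduces.
  val retryPolicy = RetryPolicy
    .builder[Unit]()
    .handle(classOf[Throwable])
    .withBackoff(1, 30, SECONDS)
    .withJitter(Duration.ofMillis(100))
    .withMaxRetries(3)
    .onFailedAttempt((event: ExecutionAttemptedEvent[Unit]) =>
      println("Attempt to update index state failed: " + event))
    .build()

  // Hypothetical flaky operation standing in for the index state update that
  // the monitor retries; it fails twice before succeeding.
  var attempts = 0
  def updateIndexState(): Unit = {
    attempts += 1
    if (attempts < 3) throw new RuntimeException(s"version conflict on attempt $attempts")
  }

  // `with` is a Scala keyword, hence the backticks when calling Failsafe's Java API.
  Failsafe.`with`[Unit, RetryPolicy[Unit]](retryPolicy).run(() => updateIndexState())
  println(s"Succeeded after $attempts attempts")
}

Compared with the previous fixed 1-second delay, backoff spreads repeated attempts out when the failure persists, and the jitter makes it less likely that concurrent retries hit the index state document at exactly the same moment.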