From bad672e0226def9e482ac575864a4302613c4c57 Mon Sep 17 00:00:00 2001 From: Chen Dai Date: Mon, 24 Jun 2024 16:34:32 -0700 Subject: [PATCH] Revert unused changes Signed-off-by: Chen Dai --- .../scala/org/opensearch/flint/spark/FlintSpark.scala | 11 ++++------- .../flint/spark/FlintSparkIndexMonitor.scala | 6 +++--- 2 files changed, 7 insertions(+), 10 deletions(-) diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala index 3f610d74b..b2e310cc7 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala @@ -45,8 +45,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava) /** Flint client for low-level index operation */ - protected val flintClient: FlintClient = - FlintClientBuilder.build(flintSparkConf.flintOptions()) + private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions()) override protected val flintMetadataLogService: FlintMetadataLogService = { FlintMetadataLogServiceBuilder.build( @@ -99,10 +98,9 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w * @param ignoreIfExists * Ignore existing index */ - def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = { - val indexName = index.name() - val opName = s"Create Flint index with ignoreIfExists $ignoreIfExists" - withTransaction[Unit](indexName, opName, forceInit = true) { tx => + def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = + withTransaction[Unit](index.name(), "Create Flint index", forceInit = true) { tx => + val indexName = index.name() if (flintClient.exists(indexName)) { if (!ignoreIfExists) { throw new IllegalStateException(s"Flint index $indexName already exists") @@ -122,7 +120,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w }) } } - } /** * Start refreshing index data according to the given mode. diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala index 329cc94ca..343299a8c 100644 --- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala +++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala @@ -35,9 +35,8 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor */ class FlintSparkIndexMonitor( spark: SparkSession, - override val flintMetadataLogService: FlintMetadataLogService) - extends FlintSparkTransactionSupport - with Logging { + flintMetadataLogService: FlintMetadataLogService) + extends Logging { /** Task execution initial delay in seconds */ private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds() @@ -153,6 +152,7 @@ class FlintSparkIndexMonitor( private var errorCnt = 0 override def run(): Unit = { + logInfo(s"Scheduler trigger index monitor task for $indexName") try { if (isStreamingJobActive(indexName)) { logInfo("Streaming job is still active")