diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
index 3f610d74b..b2e310cc7 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSpark.scala
@@ -45,8 +45,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
         IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)
 
   /** Flint client for low-level index operation */
-  protected val flintClient: FlintClient =
-    FlintClientBuilder.build(flintSparkConf.flintOptions())
+  private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
 
   override protected val flintMetadataLogService: FlintMetadataLogService = {
     FlintMetadataLogServiceBuilder.build(
@@ -99,10 +98,9 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
    * @param ignoreIfExists
    *   Ignore existing index
    */
-  def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
-    val indexName = index.name()
-    val opName = s"Create Flint index with ignoreIfExists $ignoreIfExists"
-    withTransaction[Unit](indexName, opName, forceInit = true) { tx =>
+  def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
+    withTransaction[Unit](index.name(), "Create Flint index", forceInit = true) { tx =>
+      val indexName = index.name()
       if (flintClient.exists(indexName)) {
         if (!ignoreIfExists) {
           throw new IllegalStateException(s"Flint index $indexName already exists")
@@ -122,7 +120,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
           })
       }
     }
-  }
 
   /**
    * Start refreshing index data according to the given mode.
diff --git a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
index 329cc94ca..343299a8c 100644
--- a/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
+++ b/flint-spark-integration/src/main/scala/org/opensearch/flint/spark/FlintSparkIndexMonitor.scala
@@ -35,9 +35,8 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
  */
 class FlintSparkIndexMonitor(
     spark: SparkSession,
-    override val flintMetadataLogService: FlintMetadataLogService)
-    extends FlintSparkTransactionSupport
-    with Logging {
+    flintMetadataLogService: FlintMetadataLogService)
+    extends Logging {
 
   /** Task execution initial delay in seconds */
   private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds()
@@ -153,6 +152,7 @@ class FlintSparkIndexMonitor(
     private var errorCnt = 0
 
     override def run(): Unit = {
+      logInfo(s"Scheduler trigger index monitor task for $indexName")
      try {
        if (isStreamingJobActive(indexName)) {
          logInfo("Streaming job is still active")
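
For readers unfamiliar with the transaction-template style that the refactored createIndex follows, the sketch below illustrates the shape of the pattern: the whole operation body runs inside withTransaction, and the index name is resolved within that body. This is a minimal, self-contained illustration only; TransactionSupport, Tx, and SampleIndexStore are simplified stand-ins invented for this sketch, not the real FlintSparkTransactionSupport or FlintClient APIs.

// Minimal sketch of the transaction-template pattern mirrored by the new createIndex.
// All names here are illustrative stand-ins, not actual Flint classes.
object TransactionPatternSketch extends App {

  // Stand-in for an optimistic-transaction handle passed to the body.
  final case class Tx(indexName: String, opName: String)

  // Stand-in for FlintSparkTransactionSupport: wraps a body with begin/commit bookkeeping.
  trait TransactionSupport {
    def withTransaction[T](indexName: String, opName: String, forceInit: Boolean = false)(
        body: Tx => T): T = {
      println(s"[$opName] begin on index '$indexName' (forceInit=$forceInit)")
      val result = body(Tx(indexName, opName))
      println(s"[$opName] committed on index '$indexName'")
      result
    }
  }

  // Stand-in index store mimicking flintClient.exists / flintClient.createIndex.
  object SampleIndexStore {
    private val indexes = scala.collection.mutable.Set.empty[String]
    def exists(name: String): Boolean = indexes.contains(name)
    def create(name: String): Unit = indexes += name
  }

  // Usage mirroring the refactored createIndex: the entire body, including the
  // existence check, executes inside the transaction.
  object Sample extends TransactionSupport {
    def createIndex(name: String, ignoreIfExists: Boolean = false): Unit =
      withTransaction[Unit](name, "Create Flint index", forceInit = true) { _ =>
        if (SampleIndexStore.exists(name)) {
          if (!ignoreIfExists) {
            throw new IllegalStateException(s"Flint index $name already exists")
          }
        } else {
          SampleIndexStore.create(name)
        }
      }
  }

  Sample.createIndex("flint_test_index")
  Sample.createIndex("flint_test_index", ignoreIfExists = true) // second call is a no-op
}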