Revert unused changes
Signed-off-by: Chen Dai <[email protected]>
dai-chen committed Jun 24, 2024
1 parent cca8711 · commit bad672e
Showing 2 changed files with 7 additions and 10 deletions.
Changed file 1 of 2 (class FlintSpark):
@@ -45,8 +45,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
       IGNORE_DOC_ID_COLUMN.optionKey -> "true").asJava)
 
   /** Flint client for low-level index operation */
-  protected val flintClient: FlintClient =
-    FlintClientBuilder.build(flintSparkConf.flintOptions())
+  private val flintClient: FlintClient = FlintClientBuilder.build(flintSparkConf.flintOptions())
 
   override protected val flintMetadataLogService: FlintMetadataLogService = {
     FlintMetadataLogServiceBuilder.build(
@@ -99,10 +98,9 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
    * @param ignoreIfExists
    *   Ignore existing index
    */
-  def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit = {
-    val indexName = index.name()
-    val opName = s"Create Flint index with ignoreIfExists $ignoreIfExists"
-    withTransaction[Unit](indexName, opName, forceInit = true) { tx =>
+  def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
+    withTransaction[Unit](index.name(), "Create Flint index", forceInit = true) { tx =>
+      val indexName = index.name()
       if (flintClient.exists(indexName)) {
         if (!ignoreIfExists) {
           throw new IllegalStateException(s"Flint index $indexName already exists")
@@ -122,7 +120,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
           })
       }
     }
-  }
 
   /**
    * Start refreshing index data according to the given mode.
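To make the net effect easier to read, here is a sketch of createIndex after this revert, stitched together from the hunks above. The parts hidden by the diff, including the exact brace layout and the else branch, are assumptions marked as such:

  def createIndex(index: FlintSparkIndex, ignoreIfExists: Boolean = false): Unit =
    withTransaction[Unit](index.name(), "Create Flint index", forceInit = true) { tx =>
      // indexName is now computed inside the transaction block, and the operation
      // name is a fixed string instead of interpolating ignoreIfExists
      val indexName = index.name()
      if (flintClient.exists(indexName)) {
        if (!ignoreIfExists) {
          throw new IllegalStateException(s"Flint index $indexName already exists")
        }
      } else {
        // ... actual index creation through tx (not shown by this diff) ...
      }
    }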
Changed file 2 of 2 (class FlintSparkIndexMonitor):
@@ -35,9 +35,8 @@ import org.apache.spark.sql.flint.newDaemonThreadPoolScheduledExecutor
  */
 class FlintSparkIndexMonitor(
     spark: SparkSession,
-    override val flintMetadataLogService: FlintMetadataLogService)
-  extends FlintSparkTransactionSupport
-  with Logging {
+    flintMetadataLogService: FlintMetadataLogService)
+  extends Logging {
 
   /** Task execution initial delay in seconds */
   private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds()
@@ -153,6 +152,7 @@ class FlintSparkIndexMonitor(
     private var errorCnt = 0
 
     override def run(): Unit = {
+      logInfo(s"Scheduler trigger index monitor task for $indexName")
       try {
         if (isStreamingJobActive(indexName)) {
           logInfo("Streaming job is still active")
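Similarly, a condensed view of FlintSparkIndexMonitor after this revert, reconstructed from the two hunks above; everything outside the shown lines is elided:

class FlintSparkIndexMonitor(
    spark: SparkSession,
    flintMetadataLogService: FlintMetadataLogService) // plain constructor parameter again, no override
  extends Logging { // FlintSparkTransactionSupport mixin dropped

  /** Task execution initial delay in seconds */
  private val INITIAL_DELAY_SECONDS = FlintSparkConf().monitorInitialDelaySeconds()

  // ... remaining members elided by the diff; the scheduled monitor task's run()
  // now starts with: logInfo(s"Scheduler trigger index monitor task for $indexName")
}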
