Skip to content

Commit

Permalink
Revert "[0.5-nexus] Write mock metadata cache data to mappings _meta (o…
Browse files Browse the repository at this point in the history
…pensearch-project#744)"

This reverts commit b3adf46.
  • Loading branch information
seankao-az committed Oct 30, 2024
1 parent 8e72f85 commit a562501
Show file tree
Hide file tree
Showing 5 changed files with 9 additions and 324 deletions.

This file was deleted.

This file was deleted.

Original file line number Diff line number Diff line change
Expand Up @@ -253,10 +253,6 @@ object FlintSparkConf {
FlintConfig("spark.metadata.accessAWSCredentialsProvider")
.doc("AWS credentials provider for metadata access permission")
.createOptional()
val METADATA_CACHE_WRITE = FlintConfig("spark.flint.metadataCacheWrite.enabled")
.doc("Enable Flint metadata cache write to Flint index mappings")
.createWithDefault("true")

val CUSTOM_SESSION_MANAGER =
FlintConfig("spark.flint.job.customSessionManager")
.createOptional()
Expand Down Expand Up @@ -313,8 +309,6 @@ case class FlintSparkConf(properties: JMap[String, String]) extends Serializable

def monitorMaxErrorCount(): Int = MONITOR_MAX_ERROR_COUNT.readFrom(reader).toInt

def isMetadataCacheWriteEnabled: Boolean = METADATA_CACHE_WRITE.readFrom(reader).toBoolean

/**
* spark.sql.session.timeZone
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,6 @@ import org.opensearch.flint.common.scheduler.AsyncQueryScheduler
import org.opensearch.flint.core.{FlintClient, FlintClientBuilder}
import org.opensearch.flint.core.metadata.FlintIndexMetadataServiceBuilder
import org.opensearch.flint.core.metadata.log.FlintMetadataLogServiceBuilder
import org.opensearch.flint.core.storage.FlintOpenSearchMetadataCacheWriter
import org.opensearch.flint.spark.FlintSparkIndex.ID_COLUMN
import org.opensearch.flint.spark.FlintSparkIndexOptions.OptionName._
import org.opensearch.flint.spark.covering.FlintSparkCoveringIndex
Expand Down Expand Up @@ -55,9 +54,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
FlintIndexMetadataServiceBuilder.build(flintSparkConf.flintOptions())
}

private val flintMetadataCacheWriteService = new FlintOpenSearchMetadataCacheWriter(
flintSparkConf.flintOptions())

private val flintAsyncQueryScheduler: AsyncQueryScheduler = {
AsyncQuerySchedulerBuilder.build(flintSparkConf.flintOptions())
}
Expand Down Expand Up @@ -119,6 +115,7 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
throw new IllegalStateException(s"Flint index $indexName already exists")
}
} else {
val metadata = index.metadata()
val jobSchedulingService = FlintSparkJobSchedulingService.create(
index,
spark,
Expand All @@ -130,17 +127,14 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
.transientLog(latest => latest.copy(state = CREATING))
.finalLog(latest => latest.copy(state = ACTIVE))
.commit(latest => {
val metadata = latest match {
case null => // in case transaction capability is disabled
index.metadata()
case latestEntry =>
logInfo(s"Creating index with metadata log entry ID ${latestEntry.id}")
index.metadata().copy(latestId = Some(latestEntry.id))
}
flintClient.createIndex(indexName, metadata)
flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
if (isMetadataCacheWriteEnabled) {
flintMetadataCacheWriteService.updateMetadataCache(indexName, metadata)
if (latest == null) { // in case transaction capability is disabled
flintClient.createIndex(indexName, metadata)
flintIndexMetadataService.updateIndexMetadata(indexName, metadata)
} else {
logInfo(s"Creating index with metadata log entry ID ${latest.id}")
flintClient.createIndex(indexName, metadata.copy(latestId = Some(latest.id)))
flintIndexMetadataService
.updateIndexMetadata(indexName, metadata.copy(latestId = Some(latest.id)))
}
jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.SCHEDULE)
})
Expand Down Expand Up @@ -405,10 +399,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
new DataTypeSkippingStrategy().analyzeSkippingIndexColumns(tableName, spark)
}

private def isMetadataCacheWriteEnabled: Boolean = {
flintSparkConf.isMetadataCacheWriteEnabled
}

private def getAllIndexMetadata(indexNamePattern: String): Map[String, FlintMetadata] = {
if (flintIndexMetadataService.supportsGetByIndexPattern) {
flintIndexMetadataService
Expand Down Expand Up @@ -578,9 +568,6 @@ class FlintSpark(val spark: SparkSession) extends FlintSparkTransactionSupport w
})
.commit(_ => {
flintIndexMetadataService.updateIndexMetadata(indexName, index.metadata)
if (isMetadataCacheWriteEnabled) {
flintMetadataCacheWriteService.updateMetadataCache(indexName, index.metadata)
}
logInfo("Update index options complete")
jobSchedulingService.handleJob(index, AsyncQuerySchedulerAction.UPDATE)
})
Expand Down

This file was deleted.

0 comments on commit a562501

Please sign in to comment.