Add more info in Flint metadata log (#125)
* Add missing latestId field

Signed-off-by: Chen Dai <[email protected]>

* Add create time and UT

Signed-off-by: Chen Dai <[email protected]>

* Change FlintSpark API to update job start time

Signed-off-by: Chen Dai <[email protected]>

* Remove unused EMR info in Flint metadata

Signed-off-by: Chen Dai <[email protected]>

* Use singleton scheduler executor service and add shutdown hook

Signed-off-by: Chen Dai <[email protected]>

* Support recreating index from logically deleted index

Signed-off-by: Chen Dai <[email protected]>

* Add revert transient log capability for optimistic transaction

Signed-off-by: Chen Dai <[email protected]>

* Add transaction IT with recover command

Signed-off-by: Chen Dai <[email protected]>

* Extract new index monitor class

Signed-off-by: Chen Dai <[email protected]>

* Fix flaky IT due to background scheduler task

Signed-off-by: Chen Dai <[email protected]>

---------

Signed-off-by: Chen Dai <[email protected]>
dai-chen authored Nov 7, 2023
1 parent 5abdffc commit d1280cf
Showing 12 changed files with 400 additions and 90 deletions.
@@ -5,6 +5,8 @@
 
 package org.opensearch.flint.core.metadata.log;
 
+import static java.util.logging.Level.SEVERE;
+import static java.util.logging.Level.WARNING;
 import static org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState$;
 import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_PRIMARY_TERM;
 import static org.opensearch.index.seqno.SequenceNumbers.UNASSIGNED_SEQ_NO;
@@ -76,23 +78,46 @@ public T commit(Function<FlintMetadataLogEntry, T> operation) {
         metadataLog.getLatest().orElseGet(() -> metadataLog.add(emptyLogEntry()));
 
     // Perform initial log check
-    if (initialCondition.test(latest)) {
+    if (!initialCondition.test(latest)) {
+      LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
+      throw new IllegalStateException(
+          "Transaction failed due to initial log precondition not satisfied");
+    }
 
-      // Append optional transient log
-      if (transientAction != null) {
-        latest = metadataLog.add(transientAction.apply(latest));
-      }
+    // Append optional transient log
+    FlintMetadataLogEntry initialLog = latest;
+    if (transientAction != null) {
+      latest = metadataLog.add(transientAction.apply(latest));
+
+      // Copy latest seqNo and primaryTerm to initialLog for potential rollback use
+      initialLog = initialLog.copy(
+          initialLog.id(),
+          latest.seqNo(),
+          latest.primaryTerm(),
+          initialLog.createTime(),
+          initialLog.state(),
+          initialLog.dataSource(),
+          initialLog.error());
+    }
 
-      // Perform operation
+    // Perform operation
+    try {
       T result = operation.apply(latest);
 
       // Append final log
       metadataLog.add(finalAction.apply(latest));
       return result;
-    } else {
-      LOG.warning("Initial log entry doesn't satisfy precondition " + latest);
-      throw new IllegalStateException(
-          "Transaction failed due to initial log precondition not satisfied");
+    } catch (Exception e) {
+      LOG.log(SEVERE, "Rolling back transient log due to transaction operation failure", e);
+      try {
+        // Roll back transient log if any
+        if (transientAction != null) {
+          metadataLog.add(initialLog);
+        }
+      } catch (Exception ex) {
+        LOG.log(WARNING, "Failed to rollback transient log", ex);
+      }
+      throw new IllegalStateException("Failed to commit transaction operation");
     }
   }

@@ -101,6 +126,7 @@ private FlintMetadataLogEntry emptyLogEntry() {
         "",
         UNASSIGNED_SEQ_NO,
         UNASSIGNED_PRIMARY_TERM,
+        0L,
         IndexState$.MODULE$.EMPTY(),
         dataSourceName,
         "");
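For orientation, callers drive this transaction through the fluent builder shown in the FlintSpark changes further down. A minimal Scala sketch of that flow, with a placeholder commit body, to show where the new rollback applies (EMPTY, DELETED, CREATING and ACTIVE are the IndexState values used elsewhere in this commit; flintClient, indexName and dataSourceName are assumed to be in scope):

    // Sketch only, not code from this commit.
    flintClient
      .startTransaction(indexName, dataSourceName)
      .initialLog(latest => latest.state == EMPTY || latest.state == DELETED) // precondition
      .transientLog(latest => latest.copy(state = CREATING)) // intermediate entry
      .finalLog(latest => latest.copy(state = ACTIVE)) // entry written on success
      .commit(latest => {
        // ... perform the real work here, e.g. create the physical index ...
        // If this block throws, commit() now restores the pre-transient log entry
        // (reusing the latest seqNo and primaryTerm) before rethrowing.
        true
      })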
@@ -29,15 +29,22 @@ case class FlintMetadataLogEntry(
     id: String,
     seqNo: Long,
     primaryTerm: Long,
+    /**
+     * This is currently used as streaming job start time. In future, this should represent the
+     * create timestamp of the log entry
+     */
+    createTime: Long,
     state: IndexState,
-    dataSource: String, // TODO: get from Spark conf
+    dataSource: String,
     error: String) {
 
   def this(id: String, seqNo: Long, primaryTerm: Long, map: java.util.Map[String, AnyRef]) {
     this(
       id,
       seqNo,
       primaryTerm,
      /* getSourceAsMap() may use Integer or Long even though it's always long in index mapping */
+      map.get("jobStartTime").asInstanceOf[Number].longValue(),
       IndexState.from(map.get("state").asInstanceOf[String]),
       map.get("dataSourceName").asInstanceOf[String],
       map.get("error").asInstanceOf[String])
@@ -48,12 +55,14 @@
     s"""
        |{
        | "version": "1.0",
+       | "latestId": "$id",
        | "type": "flintindexstate",
        | "state": "$state",
        | "applicationId": "${sys.env.getOrElse("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "unknown")}",
        | "jobId": "${sys.env.getOrElse("SERVERLESS_EMR_JOB_ID", "unknown")}",
        | "dataSourceName": "$dataSource",
-       | "lastUpdateTime": "${System.currentTimeMillis()}",
+       | "jobStartTime": $createTime,
+       | "lastUpdateTime": ${System.currentTimeMillis()},
        | "error": "$error"
        |}
        |""".stripMargin
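One subtlety behind the new constructor argument above: getSourceAsMap() does not guarantee a Long for numeric fields, so the constructor goes through Number. A small self-contained Scala illustration (sketch only, the value is made up):

    import java.util.{HashMap => JHashMap, Map => JMap}

    // Simulated _source map of a metadata log document; a small epoch value may be
    // handed back by the JSON parser as java.lang.Integer rather than java.lang.Long.
    val source: JMap[String, AnyRef] = new JHashMap()
    source.put("jobStartTime", Integer.valueOf(1699000000))

    // The asInstanceOf[Number].longValue() pattern used in the constructor
    // normalizes either boxed type to a Scala Long.
    val createTime: Long = source.get("jobStartTime").asInstanceOf[Number].longValue()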
@@ -98,6 +98,7 @@ private FlintMetadataLogEntry createLogEntry(FlintMetadataLogEntry logEntry) {
         latestId,
         logEntry.seqNo(),
         logEntry.primaryTerm(),
+        logEntry.createTime(),
         logEntry.state(),
         logEntry.dataSource(),
         logEntry.error());
@@ -135,6 +136,7 @@ private FlintMetadataLogEntry writeLogEntry(
         logEntry.id(),
         response.getSeqNo(),
         response.getPrimaryTerm(),
+        logEntry.createTime(),
         logEntry.state(),
         logEntry.dataSource(),
         logEntry.error());
@@ -7,12 +7,25 @@ package org.apache.spark.sql
 
 import org.apache.spark.sql.catalyst.plans.logical.LogicalPlan
 import org.apache.spark.sql.connector.catalog._
+import org.apache.spark.util.ShutdownHookManager
 
 /**
  * Flint utility methods that rely on access to private code in Spark SQL package.
  */
 package object flint {
 
+  /**
+   * Add shutdown hook to SparkContext with default priority.
+   *
+   * @param hook
+   *   hook with the code to run during shutdown
+   * @return
+   *   a handle that can be used to unregister the shutdown hook.
+   */
+  def addShutdownHook(hook: () => Unit): AnyRef = {
+    ShutdownHookManager.addShutdownHook(hook)
+  }
+
   /**
    * Convert the given logical plan to Spark data frame.
   *
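Per the commit message, this helper exists so the now-singleton scheduler executor can be cleaned up on JVM exit. A hedged sketch of that usage; the executor here is illustrative rather than the actual field in the new monitor class:

    import java.util.concurrent.{Executors, ScheduledExecutorService}

    import org.apache.spark.sql.flint.addShutdownHook

    // Illustrative singleton scheduler, mirroring "Use singleton scheduler executor
    // service and add shutdown hook" from the commit message.
    val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)

    // Register cleanup with Spark's ShutdownHookManager through the new helper;
    // the returned handle could be kept to unregister the hook later.
    val hookHandle: AnyRef = addShutdownHook(() => executor.shutdownNow())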
@@ -5,8 +5,6 @@
 
 package org.opensearch.flint.spark
 
-import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}
-
 import scala.collection.JavaConverters._
 
 import org.json4s.{Formats, NoTypeHints}
@@ -46,16 +44,17 @@ class FlintSpark(val spark: SparkSession) extends Logging {
   /** Required by json4s parse function */
   implicit val formats: Formats = Serialization.formats(NoTypeHints) + SkippingKindSerializer
 
-  /** Scheduler for updating index state regularly as needed, such as incremental refreshing */
-  var executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
-
   /**
    * Data source name. Assign empty string in case of backward compatibility. TODO: remove this in
    * future
    */
   private val dataSourceName: String =
     spark.conf.getOption("spark.flint.datasource.name").getOrElse("")
 
+  /** Flint Spark index monitor */
+  private val flintIndexMonitor: FlintSparkIndexMonitor =
+    new FlintSparkIndexMonitor(spark, flintClient, dataSourceName)
+
   /**
    * Create index builder for creating index with fluent API.
    *
@@ -106,7 +105,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
     try {
       flintClient
         .startTransaction(indexName, dataSourceName)
-        .initialLog(latest => latest.state == EMPTY)
+        .initialLog(latest => latest.state == EMPTY || latest.state == DELETED)
         .transientLog(latest => latest.copy(state = CREATING))
         .finalLog(latest => latest.copy(state = ACTIVE))
         .commit(latest =>
@@ -144,16 +143,17 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       flintClient
         .startTransaction(indexName, dataSourceName)
         .initialLog(latest => latest.state == ACTIVE)
-        .transientLog(latest => latest.copy(state = REFRESHING))
+        .transientLog(latest =>
+          latest.copy(state = REFRESHING, createTime = System.currentTimeMillis()))
         .finalLog(latest => {
           // Change state to active if full, otherwise update index state regularly
           if (mode == FULL) {
             logInfo("Updating index state to active")
             latest.copy(state = ACTIVE)
           } else {
             // Schedule regular update and return log entry as refreshing state
-            logInfo("Scheduling index state updater")
-            scheduleIndexStateUpdate(indexName)
+            logInfo("Scheduling index state monitor")
+            flintIndexMonitor.startMonitor(indexName)
             latest
           }
         })
@@ -223,6 +223,7 @@ class FlintSpark(val spark: SparkSession) extends Logging {
         .finalLog(latest => latest.copy(state = DELETED))
         .commit(_ => {
           // TODO: share same transaction for now
+          flintIndexMonitor.stopMonitor(indexName)
           stopRefreshingJob(indexName)
           flintClient.deleteIndex(indexName)
           true
@@ -252,9 +253,10 @@ class FlintSpark(val spark: SparkSession) extends Logging {
       flintClient
         .startTransaction(indexName, dataSourceName)
         .initialLog(latest => Set(ACTIVE, REFRESHING, FAILED).contains(latest.state))
-        .transientLog(latest => latest.copy(state = RECOVERING))
+        .transientLog(latest =>
+          latest.copy(state = RECOVERING, createTime = System.currentTimeMillis()))
         .finalLog(latest => {
-          scheduleIndexStateUpdate(indexName)
+          flintIndexMonitor.startMonitor(indexName)
           latest.copy(state = REFRESHING)
         })
         .commit(_ => doRefreshIndex(index.get, indexName, INCREMENTAL))
@@ -287,40 +289,6 @@ class FlintSpark(val spark: SparkSession) extends Logging {
   private def isIncrementalRefreshing(indexName: String): Boolean =
     spark.streams.active.exists(_.name == indexName)
 
-  private def scheduleIndexStateUpdate(indexName: String): Unit = {
-    var task: ScheduledFuture[_] = null // avoid forward reference compile error at task.cancel()
-    task = executor.scheduleAtFixedRate(
-      () => {
-        logInfo(s"Scheduler triggers index log entry update for $indexName")
-        try {
-          if (isIncrementalRefreshing(indexName)) {
-            logInfo("Streaming job is still active")
-            flintClient
-              .startTransaction(indexName, dataSourceName)
-              .initialLog(latest => latest.state == REFRESHING)
-              .finalLog(latest => latest) // timestamp will update automatically
-              .commit(_ => {})
-          } else {
-            logError("Streaming job is not active. Cancelling update task")
-            flintClient
-              .startTransaction(indexName, dataSourceName)
-              .initialLog(_ => true)
-              .finalLog(latest => latest.copy(state = FAILED))
-              .commit(_ => {})
-            task.cancel(true)
-            logInfo("Update task is cancelled")
-          }
-        } catch {
-          case e: Exception =>
-            logError("Failed to update index log entry", e)
-            throw new IllegalStateException("Failed to update index log entry")
-        }
-      },
-      15, // Delay to ensure final logging is complete first, otherwise version conflicts
-      60, // TODO: make interval configurable
-      TimeUnit.SECONDS)
-  }
-
   // TODO: move to separate class
   private def doRefreshIndex(
       index: FlintSparkIndex,
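The extracted FlintSparkIndexMonitor class itself is in one of the changed files not expanded on this page. Based on the removed method above and its call sites (the constructor arguments plus startMonitor and stopMonitor), a plausible shape is sketched below; treat it as an approximation with assumed import paths, not the committed implementation:

    import java.util.concurrent.{Executors, ScheduledExecutorService, ScheduledFuture, TimeUnit}

    import scala.collection.concurrent.TrieMap

    import org.opensearch.flint.core.FlintClient
    import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState._

    import org.apache.spark.internal.Logging
    import org.apache.spark.sql.SparkSession

    class FlintSparkIndexMonitor(
        spark: SparkSession,
        flintClient: FlintClient,
        dataSourceName: String)
        extends Logging {

      /** Heartbeat task per index name, so stopMonitor can cancel it */
      private val tasks = TrieMap.empty[String, ScheduledFuture[_]]

      def startMonitor(indexName: String): Unit = {
        val task = FlintSparkIndexMonitor.executor.scheduleAtFixedRate(
          () => {
            if (spark.streams.active.exists(_.name == indexName)) {
              // Streaming job still alive: touch the log entry so lastUpdateTime advances
              flintClient
                .startTransaction(indexName, dataSourceName)
                .initialLog(latest => latest.state == REFRESHING)
                .finalLog(latest => latest)
                .commit(_ => {})
            } else {
              // Streaming job gone: mark the index failed and stop this task
              flintClient
                .startTransaction(indexName, dataSourceName)
                .initialLog(_ => true)
                .finalLog(latest => latest.copy(state = FAILED))
                .commit(_ => {})
              stopMonitor(indexName)
            }
          },
          15, // initial delay, as in the removed method above
          60, // interval in seconds
          TimeUnit.SECONDS)
        tasks.put(indexName, task)
      }

      def stopMonitor(indexName: String): Unit =
        tasks.remove(indexName).foreach(_.cancel(true))
    }

    object FlintSparkIndexMonitor {

      // Singleton scheduler shared across monitor instances; a hook registered via
      // the new addShutdownHook helper would call executor.shutdownNow() on JVM exit.
      val executor: ScheduledExecutorService = Executors.newScheduledThreadPool(1)
    }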
@@ -90,22 +90,6 @@ object FlintSparkIndex {
   def flintIndexNamePrefix(fullTableName: String): String =
     s"flint_${fullTableName.replace(".", "_")}_"
 
-  /**
-   * Populate environment variables to persist in Flint metadata.
-   *
-   * @return
-   *   env key value mapping to populate
-   */
-  def populateEnvToMetadata: Map[String, String] = {
-    // TODO: avoid hardcoding env name below by providing another config
-    val envNames = Seq("SERVERLESS_EMR_VIRTUAL_CLUSTER_ID", "SERVERLESS_EMR_JOB_ID")
-    envNames
-      .flatMap(key =>
-        Option(System.getenv(key))
-          .map(value => key -> value))
-      .toMap
-  }
-
   /**
    * Create Flint metadata builder with common fields.
   *
@@ -120,12 +104,6 @@
     builder.kind(index.kind)
     builder.options(index.options.optionsWithDefault.mapValues(_.asInstanceOf[AnyRef]).asJava)
 
-    // Index properties
-    val envs = populateEnvToMetadata
-    if (envs.nonEmpty) {
-      builder.addProperty("env", envs.asJava)
-    }
-
     // Optional index settings
     val settings = index.options.indexSettings()
     if (settings.isDefined) {
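With the env-variable properties removed from the index metadata, the EMR application and job IDs now surface only through the metadata log entry's JSON serialization shown earlier in this diff (assumed here to be exposed as a toJson method). A small illustration of producing that document; field values are placeholders and the id format is hypothetical:

    import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry
    import org.opensearch.flint.core.metadata.log.FlintMetadataLogEntry.IndexState

    val entry = FlintMetadataLogEntry(
      id = "flint_mydb_mytable_skipping_index_latest_id",
      seqNo = 0L,
      primaryTerm = 1L,
      createTime = System.currentTimeMillis(), // doubles as streaming job start time for now
      state = IndexState.REFRESHING,
      dataSource = "mys3",
      error = "")

    // Emits version, latestId, type, state, applicationId/jobId (taken from the
    // SERVERLESS_EMR_* environment variables, "unknown" when unset), dataSourceName,
    // jobStartTime, lastUpdateTime and error.
    println(entry.toJson)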