Add repl metrics
Signed-off-by: Louis Chu <[email protected]>
noCharger committed Feb 2, 2024
1 parent 6f70884 commit e559374
Showing 4 changed files with 107 additions and 8 deletions.
@@ -21,4 +21,14 @@ public class MetricConstants {
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations.
*/
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";

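// Metric names for S3 error counting and for REPL session/statement lifecycle tracking.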
public static final String S3_ERR_CNT_METRIC = "s3.error.count";
public static final String REPL_RUNNING_METRIC = "session.running.count";
public static final String REPL_FAILED_METRIC = "session.failed.count";
public static final String REPL_SUCCESS_METRIC = "session.success.count";
public static final String REPL_PROCESSING_TIME_METRIC = "session.processingTime";
public static final String STATEMENT_RUNNING_METRIC = "statement.running.count";
public static final String STATEMENT_FAILED_METRIC = "statement.failed.count";
public static final String STATEMENT_SUCCESS_METRIC = "statement.success.count";
public static final String STATEMENT_PROCESSING_TIME_METRIC = "statement.processingTime";
}
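For orientation, a minimal sketch of how these names pair up through MetricsUtil, mirroring the FlintREPL changes below (the snippet itself is illustrative, not code from the repository):

```scala
import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil}

// The ".count" names back Codahale Counters; the ".processingTime" names back Timers.
MetricsUtil.incrementCounter(MetricConstants.REPL_RUNNING_METRIC) // session.running.count += 1
val sessionTimer = MetricsUtil.getTimerContext(MetricConstants.REPL_PROCESSING_TIME_METRIC)
try {
  // ... run the session ...
  MetricsUtil.incrementCounter(MetricConstants.REPL_SUCCESS_METRIC)
} finally {
  if (sessionTimer != null) sessionTimer.stop() // records elapsed time under session.processingTime
  MetricsUtil.decrementCounter(MetricConstants.REPL_RUNNING_METRIC)
}
```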
@@ -6,6 +6,7 @@
package org.opensearch.flint.core.metrics;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Timer;
import org.apache.spark.SparkEnv;
import org.apache.spark.metrics.source.FlintMetricSource;
import org.apache.spark.metrics.source.Source;
@@ -38,6 +39,25 @@ public static void incrementCounter(String metricName) {
}
}

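// Decrements the counter for the given metric name, never going below zero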
public static void decrementCounter(String metricName) {
    Counter counter = getOrCreateCounter(metricName);
    if (counter != null) {
        if (counter.getCount() > 0) {
            counter.dec();
            LOG.info(metricName + ": decreased");
        } else {
            LOG.info(metricName + ": already at minimum, cannot decrease");
        }
    }
}

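// Starts timing for the given metric and returns the Timer.Context, or null if no timer could be created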
public static Timer.Context getTimerContext(String metricName) {
    Timer timer = getOrCreateTimer(metricName);
    return timer != null ? timer.time() : null;
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
    SparkEnv sparkEnv = SparkEnv.get();
@@ -54,6 +74,22 @@ private static Counter getOrCreateCounter(String metricName) {
    return counter;
}

// Retrieves or creates a new Timer for the given metric name
private static Timer getOrCreateTimer(String metricName) {
    SparkEnv sparkEnv = SparkEnv.get();
    if (sparkEnv == null) {
        LOG.warning("Spark environment not available, cannot instrument metric: " + metricName);
        return null;
    }

    FlintMetricSource flintMetricSource = getOrInitFlintMetricSource(sparkEnv);
    Timer timer = flintMetricSource.metricRegistry().getTimers().get(metricName);
    if (timer == null) {
        timer = flintMetricSource.metricRegistry().timer(metricName);
    }
    return timer;
}

// Gets or initializes the FlintMetricSource
private static FlintMetricSource getOrInitFlintMetricSource(SparkEnv sparkEnv) {
Seq<Source> metricSourceSeq = sparkEnv.metricsSystem().getSourcesByName(FlintMetricSource.FLINT_METRIC_SOURCE_NAME());
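Since getTimerContext returns null whenever SparkEnv is unavailable, callers must guard the stop() call. A convenience wrapper one could layer on top (the timed helper is hypothetical, not part of this commit):

```scala
import com.codahale.metrics.Timer
import org.opensearch.flint.core.metrics.MetricsUtil

// Hypothetical helper: time an arbitrary block, tolerating a null context outside a Spark env.
def timed[T](metricName: String)(block: => T): T = {
  val ctx: Timer.Context = MetricsUtil.getTimerContext(metricName)
  try block
  finally {
    if (ctx != null) ctx.stop()
  }
}
```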
@@ -13,6 +13,8 @@ import scala.concurrent.duration.{Duration, MINUTES}
import com.amazonaws.services.s3.model.AmazonS3Exception
import org.opensearch.flint.core.FlintClient
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
import play.api.libs.json.{JsArray, JsBoolean, JsObject, Json, JsString, JsValue}

import org.apache.spark.{SparkConf, SparkException}
@@ -393,6 +395,7 @@ trait FlintJobExecutor {
case r: ParseException =>
handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
case r: AmazonS3Exception =>
incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
handleQueryException(
r,
"Fail to read data from S3. Cause",
@@ -13,11 +13,14 @@ import scala.concurrent.duration._
import scala.util.{Failure, Success, Try}
import scala.util.control.NonFatal

import com.codahale.metrics.Timer
import org.json4s.native.Serialization
import org.opensearch.action.get.GetResponse
import org.opensearch.common.Strings
import org.opensearch.flint.app.{FlintCommand, FlintInstance}
import org.opensearch.flint.app.FlintInstance.formats
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsUtil.{decrementCounter, getTimerContext, incrementCounter}
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}

import org.apache.spark.SparkConf
@@ -107,7 +110,14 @@ object FlintREPL extends Logging with FlintJobExecutor {
conf.getLong("spark.flint.job.queryWaitTimeoutMillis", DEFAULT_QUERY_WAIT_TIMEOUT_MILLIS)

val flintSessionIndexUpdater = osClient.createUpdater(sessionIndex.get)
addShutdownHook(flintSessionIndexUpdater, osClient, sessionIndex.get, sessionId.get)
val flintSessionContext = getTimerContext(MetricConstants.REPL_PROCESSING_TIME_METRIC)

addShutdownHook(
flintSessionIndexUpdater,
osClient,
sessionIndex.get,
sessionId.get,
flintSessionContext)

// 1 thread for updating heart beat
val threadPool =
@@ -155,6 +165,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
exponentialBackoffRetry(maxRetries = 5, initialDelay = 2.seconds) {
queryLoop(commandContext)
}
recordSessionSuccess(flintSessionContext)
} catch {
case e: Exception =>
handleSessionError(
@@ -165,13 +176,14 @@ object FlintREPL extends Logging with FlintJobExecutor {
jobStartTime,
flintSessionIndexUpdater,
osClient,
sessionIndex.get)
sessionIndex.get,
flintSessionContext)
} finally {
if (threadPool != null) {
heartBeatFuture.cancel(true) // Pass `true` to interrupt if running
threadPoolFactory.shutdownThreadPool(threadPool)
}

flintSessionContext.stop()
spark.stop()

// Check for non-daemon threads that may prevent the driver from shutting down.
@@ -364,6 +376,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
FlintInstance.serializeWithoutJobId(flintJob, currentTime)
}
flintSessionIndexUpdater.upsert(sessionId, serializedFlintInstance)
incrementCounter(MetricConstants.REPL_RUNNING_METRIC)
logInfo(
s"""Updated job: {"jobid": ${flintJob.jobId}, "sessionId": ${flintJob.sessionId}} from $sessionIndex""")
}
@@ -376,14 +389,18 @@ object FlintREPL extends Logging with FlintJobExecutor {
jobStartTime: Long,
flintSessionIndexUpdater: OpenSearchUpdater,
osClient: OSClient,
sessionIndex: String): Unit = {
sessionIndex: String,
flintSessionContext: Timer.Context): Unit = {
val error = s"Session error: ${e.getMessage}"
logError(error, e)

val flintInstance = getExistingFlintInstance(osClient, sessionIndex, sessionId)
.getOrElse(createFailedFlintInstance(applicationId, jobId, sessionId, jobStartTime, error))

updateFlintInstance(flintInstance, flintSessionIndexUpdater, sessionId)
if (flintInstance.state.equals("fail")) {
recordSessionFailed(flintSessionContext)
}
}

private def getExistingFlintInstance(
@@ -509,6 +526,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
} else if (!flintReader.hasNext) {
canProceed = false
} else {
val statementContext = getTimerContext(MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
val flintCommand = processCommandInitiation(flintReader, flintSessionIndexUpdater)

val (dataToWrite, returnedVerificationResult) = processStatementOnVerification(
@@ -529,7 +547,8 @@
flintCommand,
resultIndex,
flintSessionIndexUpdater,
osClient)
osClient,
statementContext)
// last query finish time is last activity time
lastActivityTime = currentTimeProvider.currentEpochMillis()
}
@@ -556,7 +575,8 @@
flintCommand: FlintCommand,
resultIndex: String,
flintSessionIndexUpdater: OpenSearchUpdater,
osClient: OSClient): Unit = {
osClient: OSClient,
statementContext: Timer.Context): Unit = {
try {
dataToWrite.foreach(df => writeDataFrameToOpensearch(df, resultIndex, osClient))
// todo. it is migration plan to handle https://github
@@ -567,6 +587,7 @@
flintCommand.complete()
}
updateSessionIndex(flintCommand, flintSessionIndexUpdater)
recordStatementStateChange(flintCommand, statementContext)
} catch {
// e.g., maybe due to authentication service connection issue
// or invalid catalog (e.g., we are operating on data not defined in provided data source)
@@ -575,6 +596,7 @@
logError(error, e)
flintCommand.fail()
updateSessionIndex(flintCommand, flintSessionIndexUpdater)
recordStatementStateChange(flintCommand, statementContext)
}
}

@@ -770,6 +792,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
flintCommand.running()
logDebug(s"command running: $flintCommand")
updateSessionIndex(flintCommand, flintSessionIndexUpdater)
incrementCounter(MetricConstants.STATEMENT_RUNNING_METRIC)
flintCommand
}

@@ -823,7 +846,8 @@
osClient: OSClient,
sessionIndex: String,
sessionId: String,
shutdownHookManager: ShutdownHookManagerTrait = DefaultShutdownHookManager): Unit = {
flintSessionContext: Timer.Context,
shutdownHookManager: ShutdownHookManagerTrait = DefaultShutdownHookManager): Unit = {

shutdownHookManager.addShutdownHook(() => {
logInfo("Shutting down REPL")
@@ -852,7 +876,8 @@
source: java.util.Map[String, AnyRef],
getResponse: GetResponse,
flintSessionIndexUpdater: OpenSearchUpdater,
sessionId: String): Unit = {
sessionId: String,
flintSessionContext: Timer.Context): Unit = {
val flintInstance = FlintInstance.deserializeFromMap(source)
flintInstance.state = "dead"
flintSessionIndexUpdater.updateIf(
Expand All @@ -862,6 +887,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
currentTimeProvider.currentEpochMillis()),
getResponse.getSeqNo,
getResponse.getPrimaryTerm)
recordSessionSuccess(flintSessionContext)
}

/**
@@ -1023,4 +1049,28 @@

result.getOrElse(throw new RuntimeException("Failed after retries"))
}

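// Metric helpers below: stop the processing-time timer, settle the running counter, and bump the terminal-state counter.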
private def recordSessionSuccess(sessionContext: Timer.Context): Unit = {
  sessionContext.stop()
  decrementCounter(MetricConstants.REPL_RUNNING_METRIC)
  incrementCounter(MetricConstants.REPL_SUCCESS_METRIC)
}

private def recordSessionFailed(sessionContext: Timer.Context): Unit = {
  sessionContext.stop()
  decrementCounter(MetricConstants.REPL_RUNNING_METRIC)
  incrementCounter(MetricConstants.REPL_FAILED_METRIC)
}

private def recordStatementStateChange(
    flintCommand: FlintCommand,
    statementContext: Timer.Context): Unit = {
  statementContext.stop()
  decrementCounter(MetricConstants.STATEMENT_RUNNING_METRIC)
  if (flintCommand.isComplete()) {
    incrementCounter(MetricConstants.STATEMENT_SUCCESS_METRIC)
  } else if (flintCommand.isFailed()) {
    incrementCounter(MetricConstants.STATEMENT_FAILED_METRIC)
  }
}
}
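These counters and timers register on the SparkEnv metrics system through FlintMetricSource, so they are reported by whatever Spark metrics sink is configured. One way to inspect them during development, assuming Spark's built-in console sink and the spark.metrics.conf passthrough (none of this is part of the commit):

```scala
import org.apache.spark.SparkConf

// Illustrative: route every metric source, including Flint's, to the console sink every 10 seconds.
val conf = new SparkConf()
  .set("spark.metrics.conf.*.sink.console.class", "org.apache.spark.metrics.sink.ConsoleSink")
  .set("spark.metrics.conf.*.sink.console.period", "10")
  .set("spark.metrics.conf.*.sink.console.unit", "seconds")
```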
