Skip to content

Commit

Permalink
Add repl metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Louis Chu <[email protected]>
  • Loading branch information
noCharger committed Feb 1, 2024
1 parent 6f70884 commit 1a923be
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,4 +21,12 @@ public class MetricConstants {
* Similar to OS_READ_METRIC_PREFIX, this constant is used for categorizing and identifying metrics that pertain to write operations.
*/
public static final String OS_WRITE_OP_METRIC_PREFIX = "opensearch.write";

public static final String S3_ERR_CNT_METRIC = "s3.error.count";
public static final String REPL_RUNNING_METRIC = "session.running.count";
public static final String REPL_FAILED_METRIC = "session.failed.count";
public static final String REPL_SUCCESS_METRIC = "session.success.count";
public static final String STATEMENT_RUNNING_METRIC = "statement.running.count";
public static final String STATEMENT_FAILED_METRIC = "statement.failed.count";
public static final String STATEMENT_SUCCESS_METRIC = "statement.success.count";
}
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,18 @@ public static void incrementCounter(String metricName) {
}
}

public static void decrementCounter(String metricName) {
Counter counter = getOrCreateCounter(metricName);
if (counter != null) {
if (counter.getCount() > 0) {
counter.dec();
LOG.info(metricName + ": decreased");
} else {
LOG.info(metricName + ": already at minimum, cannot decrease");
}
}
}

// Retrieves or creates a new counter for the given metric name
private static Counter getOrCreateCounter(String metricName) {
SparkEnv sparkEnv = SparkEnv.get();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,8 @@ import scala.concurrent.duration.{Duration, MINUTES}
import com.amazonaws.services.s3.model.AmazonS3Exception
import org.opensearch.flint.core.FlintClient
import org.opensearch.flint.core.metadata.FlintMetadata
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
import play.api.libs.json.{JsArray, JsBoolean, JsObject, Json, JsString, JsValue}

import org.apache.spark.{SparkConf, SparkException}
Expand Down Expand Up @@ -393,6 +395,7 @@ trait FlintJobExecutor {
case r: ParseException =>
handleQueryException(r, "Syntax error", spark, dataSource, query, queryId, sessionId)
case r: AmazonS3Exception =>
incrementCounter(MetricConstants.S3_ERR_CNT_METRIC)
handleQueryException(
r,
"Fail to read data from S3. Cause",
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@ import org.opensearch.action.get.GetResponse
import org.opensearch.common.Strings
import org.opensearch.flint.app.{FlintCommand, FlintInstance}
import org.opensearch.flint.app.FlintInstance.formats
import org.opensearch.flint.core.metrics.MetricConstants
import org.opensearch.flint.core.metrics.MetricsUtil.{decrementCounter, incrementCounter}
import org.opensearch.flint.core.storage.{FlintReader, OpenSearchUpdater}

import org.apache.spark.SparkConf
Expand Down Expand Up @@ -155,6 +157,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
exponentialBackoffRetry(maxRetries = 5, initialDelay = 2.seconds) {
queryLoop(commandContext)
}
recordSessionSuccess()
} catch {
case e: Exception =>
handleSessionError(
Expand Down Expand Up @@ -364,6 +367,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
FlintInstance.serializeWithoutJobId(flintJob, currentTime)
}
flintSessionIndexUpdater.upsert(sessionId, serializedFlintInstance)
incrementCounter(MetricConstants.REPL_RUNNING_METRIC)
logInfo(
s"""Updated job: {"jobid": ${flintJob.jobId}, "sessionId": ${flintJob.sessionId}} from $sessionIndex""")
}
Expand All @@ -384,6 +388,9 @@ object FlintREPL extends Logging with FlintJobExecutor {
.getOrElse(createFailedFlintInstance(applicationId, jobId, sessionId, jobStartTime, error))

updateFlintInstance(flintInstance, flintSessionIndexUpdater, sessionId)
if (flintInstance.state.equals("fail")) {
recordSessionFailed()
}
}

private def getExistingFlintInstance(
Expand Down Expand Up @@ -567,6 +574,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
flintCommand.complete()
}
updateSessionIndex(flintCommand, flintSessionIndexUpdater)
recordCommandStateChange(flintCommand)
} catch {
// e.g., maybe due to authentication service connection issue
// or invalid catalog (e.g., we are operating on data not defined in provided data source)
Expand All @@ -575,6 +583,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
logError(error, e)
flintCommand.fail()
updateSessionIndex(flintCommand, flintSessionIndexUpdater)
recordCommandStateChange(flintCommand)
}
}

Expand Down Expand Up @@ -770,6 +779,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
flintCommand.running()
logDebug(s"command running: $flintCommand")
updateSessionIndex(flintCommand, flintSessionIndexUpdater)
incrementCounter(MetricConstants.STATEMENT_SUCCESS_METRIC)
flintCommand
}

Expand Down Expand Up @@ -862,6 +872,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
currentTimeProvider.currentEpochMillis()),
getResponse.getSeqNo,
getResponse.getPrimaryTerm)
recordSessionSuccess()
}

/**
Expand Down Expand Up @@ -1023,4 +1034,23 @@ object FlintREPL extends Logging with FlintJobExecutor {

result.getOrElse(throw new RuntimeException("Failed after retries"))
}

private def recordSessionSuccess(): Unit = {
decrementCounter(MetricConstants.REPL_RUNNING_METRIC)
incrementCounter(MetricConstants.REPL_SUCCESS_METRIC)
}

private def recordSessionFailed(): Unit = {
decrementCounter(MetricConstants.REPL_RUNNING_METRIC)
incrementCounter(MetricConstants.REPL_FAILED_METRIC)
}

private def recordCommandStateChange(flintCommand: FlintCommand): Unit = {
decrementCounter(MetricConstants.STATEMENT_RUNNING_METRIC)
if (flintCommand.isComplete()) {
incrementCounter(MetricConstants.STATEMENT_SUCCESS_METRIC)
} else if (flintCommand.isFailed()) {
incrementCounter(MetricConstants.STATEMENT_FAILED_METRIC)
}
}
}

0 comments on commit 1a923be

Please sign in to comment.