Add jvmGCTime metrics
Signed-off-by: Tomoyuki Morita <[email protected]>
ykmr1224 committed Nov 11, 2024
1 parent 182689c commit 7ec12fd
Showing 5 changed files with 34 additions and 11 deletions.
@@ -175,6 +175,16 @@ public final class MetricConstants {
*/
public static final String INITIAL_CONDITION_CHECK_FAILED_PREFIX = "initialConditionCheck.failed.";

+/**
+ * Metric for tracking the JVM GC time per task
+ */
+public static final String TASK_JVM_GC_TIME_METRIC = "task.jvmGCTime.count";
+
+/**
+ * Metric for tracking the total JVM GC time for a query
+ */
+public static final String TOTAL_JVM_GC_TIME_METRIC = "query.totalJvmGCTime.count";

private MetricConstants() {
// Private constructor to prevent instantiation
}
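For orientation: both constants end up as historic gauges emitted from a Spark listener (next file). The standalone sketch below is illustrative only and not part of this commit (the class name GcTimeProbe is made up). It shows where the underlying numbers come from: Spark reports per-task GC time in milliseconds via TaskMetrics.jvmGCTime, and the query-level metric is just the sum of those per-task values.

import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

// Illustrative sketch (not part of this commit): accumulate the same value that
// task.jvmGCTime.count and query.totalJvmGCTime.count are built from.
class GcTimeProbe extends SparkListener {
  // Running total of GC time across all finished tasks, in milliseconds
  var totalJvmGcTimeMs: Long = 0L

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val taskGcMs = taskEnd.taskMetrics.jvmGCTime // per-task GC time (ms)
    totalJvmGcTimeMs += taskGcMs                 // query/job-level total
  }
}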
@@ -6,17 +6,18 @@
package org.opensearch.flint.core.metrics

import org.apache.spark.internal.Logging
-import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
+import org.apache.spark.scheduler.{SparkListener, SparkListenerExecutorMetricsUpdate, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

/**
- * Collect and emit bytesRead/Written and recordsRead/Written metrics
+ * Collect and emit metrics by listening to Spark events
*/
-class ReadWriteBytesSparkListener extends SparkListener with Logging {
+class MetricsSparkListener extends SparkListener with Logging {
var bytesRead: Long = 0
var recordsRead: Long = 0
var bytesWritten: Long = 0
var recordsWritten: Long = 0
+var totalJvmGcTime: Long = 0

override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
val inputMetrics = taskEnd.taskMetrics.inputMetrics
@@ -31,21 +32,33 @@ class ReadWriteBytesSparkListener extends SparkListener with Logging {
recordsRead += inputMetrics.recordsRead
bytesWritten += outputMetrics.bytesWritten
recordsWritten += outputMetrics.recordsWritten
+totalJvmGcTime += taskEnd.taskMetrics.jvmGCTime
+
+MetricsUtil.addHistoricGauge(MetricConstants.TASK_JVM_GC_TIME_METRIC, taskEnd.taskMetrics.jvmGCTime)
}

+override def onExecutorMetricsUpdate(executorMetricsUpdate: SparkListenerExecutorMetricsUpdate): Unit = {
+executorMetricsUpdate.executorUpdates.foreach { case (taskId, metrics) =>
+val totalGcTime = metrics.getMetricValue("totalGCTime")
+logInfo(s"ExecutorID: ${executorMetricsUpdate.execId}, Task ID: $taskId, Executor totalGcTime: $totalGcTime")
+}
+}
+
def emitMetrics(): Unit = {
logInfo(s"Input: totalBytesRead=${bytesRead}, totalRecordsRead=${recordsRead}")
logInfo(s"Output: totalBytesWritten=${bytesWritten}, totalRecordsWritten=${recordsWritten}")
logInfo(s"totalJvmGcTime=${totalJvmGcTime}")
MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_BYTES_READ, bytesRead)
MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_RECORDS_READ, recordsRead)
MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_BYTES_WRITTEN, bytesWritten)
MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_RECORDS_WRITTEN, recordsWritten)
+MetricsUtil.addHistoricGauge(MetricConstants.TOTAL_JVM_GC_TIME_METRIC, totalJvmGcTime)
}
}

-object ReadWriteBytesSparkListener {
+object MetricsSparkListener {
def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
-val listener = new ReadWriteBytesSparkListener()
+val listener = new MetricsSparkListener()
spark.sparkContext.addSparkListener(listener)

val result = lambda()
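The withMetrics helper above is cut off right after val result = lambda(). As a hedged reconstruction (the tail is assumed, not shown in this diff), the rest of the helper presumably emits the accumulated gauges and detaches the listener before returning the result:

import org.apache.spark.sql.SparkSession
import org.opensearch.flint.core.metrics.MetricsSparkListener

// Hedged reconstruction; only the first half of withMetrics appears in the diff,
// and the object is renamed here to MetricsSparkListenerSketch to make clear this
// is not the committed code.
object MetricsSparkListenerSketch {
  def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
    val listener = new MetricsSparkListener()
    spark.sparkContext.addSparkListener(listener)    // start collecting task metrics
    val result = lambda()                            // run the wrapped query/job
    listener.emitMetrics()                           // publish the historic gauges
    spark.sparkContext.removeSparkListener(listener) // stop receiving events
    result
  }
}

A production version would likely wrap lambda() in try/finally so the listener is always removed even when the wrapped work throws.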
@@ -7,7 +7,7 @@ package org.opensearch.flint.spark.refresh

import java.util.Collections

-import org.opensearch.flint.core.metrics.ReadWriteBytesSparkListener
+import org.opensearch.flint.core.metrics.MetricsSparkListener
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
@@ -68,7 +68,7 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
// Flint index has specialized logic and capability for incremental refresh
case refresh: StreamingRefresh =>
logInfo("Start refreshing index in streaming style")
-val job = ReadWriteBytesSparkListener.withMetrics(
+val job = MetricsSparkListener.withMetrics(
spark,
() =>
refresh
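For completeness, here is a usage sketch of the renamed helper from application code. It mirrors the call sites in AutoIndexRefresh above and FlintREPL below; the demo job (spark.range(...).count()) and the main class are made up for illustration and assume the Flint jars are on the classpath:

import org.apache.spark.sql.SparkSession
import org.opensearch.flint.core.metrics.MetricsSparkListener

object WithMetricsUsageSketch {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder().master("local[*]").appName("metrics-demo").getOrCreate()
    // Any Spark work wrapped this way gets bytes/records read/written and the new
    // GC-time gauges emitted once the lambda finishes.
    val rowCount = MetricsSparkListener.withMetrics(spark, () => spark.range(1000).count())
    println(s"rows=$rowCount")
    spark.stop()
  }
}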
@@ -17,7 +17,7 @@ import com.codahale.metrics.Timer
import org.opensearch.flint.common.model.{FlintStatement, InteractiveSession, SessionStates}
import org.opensearch.flint.core.FlintOptions
import org.opensearch.flint.core.logging.CustomLogging
-import org.opensearch.flint.core.metrics.{MetricConstants, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsSparkListener}
import org.opensearch.flint.core.metrics.MetricsUtil.{getTimerContext, incrementCounter, registerGauge, stopTimer}

import org.apache.spark.SparkConf
@@ -525,7 +525,7 @@ object FlintREPL extends Logging with FlintJobExecutor {
val statementTimerContext = getTimerContext(
MetricConstants.STATEMENT_PROCESSING_TIME_METRIC)
val (dataToWrite, returnedVerificationResult) =
-ReadWriteBytesSparkListener.withMetrics(
+MetricsSparkListener.withMetrics(
spark,
() => {
processStatementOnVerification(
@@ -14,7 +14,7 @@ import scala.util.{Failure, Success, Try}

import org.opensearch.flint.common.model.FlintStatement
import org.opensearch.flint.common.scheduler.model.LangType
-import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, ReadWriteBytesSparkListener}
+import org.opensearch.flint.core.metrics.{MetricConstants, MetricsUtil, MetricsSparkListener}
import org.opensearch.flint.core.metrics.MetricsUtil.incrementCounter
import org.opensearch.flint.spark.FlintSpark

@@ -70,7 +70,7 @@ case class JobOperator(
val statementExecutionManager =
instantiateStatementExecutionManager(commandContext, resultIndex, osClient)

-val readWriteBytesSparkListener = new ReadWriteBytesSparkListener()
+val readWriteBytesSparkListener = new MetricsSparkListener()
sparkSession.sparkContext.addSparkListener(readWriteBytesSparkListener)

val statement =
