Skip to content

Commit

Permalink
Add read/write bytes metrics
Browse files Browse the repository at this point in the history
Signed-off-by: Tomoyuki Morita <[email protected]>
  • Loading branch information
ykmr1224 committed Oct 24, 2024
1 parent 7546f58 commit 4348e58
Show file tree
Hide file tree
Showing 9 changed files with 315 additions and 24 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,68 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics;

import com.codahale.metrics.Gauge;
import java.util.ArrayList;
import java.util.Collections;
import java.util.LinkedList;
import java.util.List;

/**
* Gauge which stores historic data points with timestamps.
* This is used for emitting separate data points per request, instead of single aggregated metrics.
*/
/**
 * Gauge which stores historic data points with timestamps.
 * This is used for emitting separate data points per request, instead of single aggregated metrics.
 *
 * <p>Thread-safety: the backing list is a {@link Collections#synchronizedList synchronized list};
 * points appended concurrently during {@link #pollDataPoints()} survive to the next poll because
 * only the snapshotted prefix is cleared.
 */
public class HistoricGauge implements Gauge<Long> {
  /** A single observed value together with the epoch-millis time it was recorded. */
  public static class DataPoint {
    private final Long value;
    private final long timestamp;

    DataPoint(long value, long timestamp) {
      this.value = value;
      this.timestamp = timestamp;
    }

    /** @return the recorded value */
    public Long getValue() {
      return value;
    }

    /** @return epoch milliseconds at which the value was recorded */
    public long getTimestamp() {
      return timestamp;
    }
  }

  private final List<DataPoint> dataPoints = Collections.synchronizedList(new LinkedList<>());

  /**
   * Required by the {@link Gauge} interface; reporting of the full history is done via
   * {@link #pollDataPoints()} instead.
   *
   * @return the oldest recorded value, or {@code null} if no data point has been added
   */
  @Override
  public Long getValue() {
    if (!dataPoints.isEmpty()) {
      return dataPoints.get(0).value;
    } else {
      return null;
    }
  }

  /**
   * Records {@code value} with the current system time.
   *
   * @param value the value to record
   */
  public void addDataPoint(Long value) {
    dataPoints.add(new DataPoint(value, System.currentTimeMillis()));
  }

  /**
   * Returns a copy of the accumulated data points and removes them from the internal list.
   * Points added concurrently while the copy is taken are retained for the next poll.
   *
   * @return copy of the data points accumulated since the last poll (possibly empty)
   */
  public List<DataPoint> pollDataPoints() {
    // Capture the size once and reuse a single subList view for both copy and clear,
    // so exactly the copied prefix is removed.
    List<DataPoint> polled = dataPoints.subList(0, dataPoints.size());
    List<DataPoint> result = new ArrayList<>(polled);
    polled.clear();
    return result;
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,26 @@ public final class MetricConstants {
*/
public static final String QUERY_EXECUTION_TIME_METRIC = "query.execution.processingTime";

/**
 * Metric for tracking the total bytes read from input (emitted per request via HistoricGauge).
 */
public static final String INPUT_TOTAL_BYTES_READ = "input.totalBytesRead.count";

/**
 * Metric for tracking the total records read from input (emitted per request via HistoricGauge).
 */
public static final String INPUT_TOTAL_RECORDS_READ = "input.totalRecordsRead.count";

/**
 * Metric for tracking the total bytes written to output (emitted per request via HistoricGauge).
 */
public static final String OUTPUT_TOTAL_BYTES_WRITTEN = "output.totalBytesWritten.count";

/**
 * Metric for tracking the total records written to output (emitted per request via HistoricGauge).
 */
public static final String OUTPUT_TOTAL_RECORDS_WRITTEN = "output.totalRecordsWritten.count";

private MetricConstants() {
// Private constructor to prevent instantiation
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,15 @@ public static void decrementCounter(String metricName, boolean isIndexMetric) {
}
}

/**
 * Sets the named counter to an absolute value by decrementing its current count and then
 * incrementing by {@code n}.
 *
 * <p>NOTE(review): the dec-then-inc pair is not atomic — a concurrent reader or writer may
 * observe or interleave with an intermediate count of zero. Confirm callers tolerate this.
 *
 * @param metricName name of the counter metric
 * @param isIndexMetric whether the metric belongs to the index metric registry
 * @param n the absolute value to set the counter to
 */
public static void setCounter(String metricName, boolean isIndexMetric, long n) {
Counter counter = getOrCreateCounter(metricName, isIndexMetric);
if (counter != null) {
counter.dec(counter.getCount());
counter.inc(n);
// NOTE(review): info-level log on every set looks like a debug leftover; consider
// lowering to fine/debug or removing.
LOG.info("counter: " + counter.getCount());
}
}

/**
* Retrieves a {@link Timer.Context} for the specified metric name, creating a new timer if one does not already exist.
*
Expand Down Expand Up @@ -117,8 +126,16 @@ public static Timer getTimer(String metricName, boolean isIndexMetric) {
* @param metricName The name of the gauge metric to register.
* @param value The AtomicInteger whose current value should be reflected by the gauge.
*/
public static void registerGauge(String metricName, final AtomicInteger value) {
registerGauge(metricName, value, false);
/**
 * Records a data point on the {@link HistoricGauge} registered under the given name,
 * creating and registering the gauge on first use. Each data point is later emitted
 * as a separate metric datum by the reporter.
 *
 * @param metricName name of the historic gauge metric
 * @param value the value to record with the current timestamp
 */
public static void addHistoricGauge(String metricName, final long value) {
HistoricGauge historicGauge = getOrCreateHistoricGauge(metricName);
if (historicGauge != null) {
historicGauge.addDataPoint(value);
}
}

/**
 * Looks up (or lazily registers) a {@link HistoricGauge} in the non-index metric registry.
 *
 * @param metricName name of the gauge metric
 * @return the gauge, or {@code null} if no metric registry is available
 */
private static HistoricGauge getOrCreateHistoricGauge(String metricName) {
MetricRegistry metricRegistry = getMetricRegistry(false);
return metricRegistry != null ? metricRegistry.gauge(metricName, HistoricGauge::new) : null;
}

/**
Expand All @@ -137,6 +154,17 @@ public static void registerGauge(String metricName, final AtomicInteger value, b
metricRegistry.register(metricName, (Gauge<Integer>) value::get);
}

/**
 * Registers a gauge metric with the provided name and value in the non-index registry.
 * Convenience overload delegating to {@code registerGauge(metricName, value, false)}.
 *
 * @param metricName The name of the gauge metric to register.
 * @param value The AtomicInteger whose current value should be reflected by the gauge.
 */
public static void registerGauge(String metricName, final AtomicInteger value) {
registerGauge(metricName, value, false);
}


private static Counter getOrCreateCounter(String metricName, boolean isIndexMetric) {
MetricRegistry metricRegistry = getMetricRegistry(isIndexMetric);
return metricRegistry != null ? metricRegistry.counter(metricName) : null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -47,6 +47,7 @@
import java.util.stream.LongStream;
import java.util.stream.Stream;
import org.apache.spark.metrics.sink.CloudWatchSink.DimensionNameGroups;
import org.opensearch.flint.core.metrics.HistoricGauge;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down Expand Up @@ -145,7 +146,11 @@ public void report(final SortedMap<String, Gauge> gauges,
gauges.size() + counters.size() + 10 * histograms.size() + 10 * timers.size());

for (final Map.Entry<String, Gauge> gaugeEntry : gauges.entrySet()) {
processGauge(gaugeEntry.getKey(), gaugeEntry.getValue(), metricData);
if (gaugeEntry.getValue() instanceof HistoricGauge) {
processHistoricGauge(gaugeEntry.getKey(), (HistoricGauge) gaugeEntry.getValue(), metricData);
} else {
processGauge(gaugeEntry.getKey(), gaugeEntry.getValue(), metricData);
}
}

for (final Map.Entry<String, Counter> counterEntry : counters.entrySet()) {
Expand Down Expand Up @@ -227,6 +232,13 @@ private void processGauge(final String metricName, final Gauge gauge, final List
}
}

/**
 * Stages one metric datum per accumulated data point of a {@link HistoricGauge}, using each
 * data point's own timestamp rather than the reporter clock. Polling drains the gauge, so
 * each data point is reported exactly once.
 */
private void processHistoricGauge(final String metricName, final HistoricGauge gauge, final List<MetricDatum> metricData) {
for (HistoricGauge.DataPoint dataPoint: gauge.pollDataPoints()) {
stageMetricDatum(true, metricName, dataPoint.getValue().doubleValue(), StandardUnit.None, DIMENSION_GAUGE, metricData,
dataPoint.getTimestamp());
}
}

private void processCounter(final String metricName, final Counting counter, final List<MetricDatum> metricData) {
long currentCount = counter.getCount();
Long lastCount = lastPolledCounts.get(counter);
Expand Down Expand Up @@ -333,12 +345,25 @@ private void processHistogram(final String metricName, final Histogram histogram
* <p>
* If {@link Builder#withZeroValuesSubmission()} is {@code true}, then all values will be submitted
*/
// Convenience overload: stages the datum timestamped with the reporter clock's current time.
private void stageMetricDatum(final boolean metricConfigured,
final String metricName,
final double metricValue,
final StandardUnit standardUnit,
final String dimensionValue,
final List<MetricDatum> metricData
) {
stageMetricDatum(metricConfigured, metricName, metricValue, standardUnit,
dimensionValue, metricData, builder.clock.getTime());
}

private void stageMetricDatum(final boolean metricConfigured,
final String metricName,
final double metricValue,
final StandardUnit standardUnit,
final String dimensionValue,
final List<MetricDatum> metricData) {
final List<MetricDatum> metricData,
final Long timestamp
) {
// Only submit metrics that show some data, so let's save some money
if (metricConfigured && (builder.withZeroValuesSubmission || metricValue > 0)) {
final DimensionedName dimensionedName = DimensionedName.decode(metricName);
Expand All @@ -351,7 +376,7 @@ private void stageMetricDatum(final boolean metricConfigured,
MetricInfo metricInfo = getMetricInfo(dimensionedName, dimensions);
for (Set<Dimension> dimensionSet : metricInfo.getDimensionSets()) {
MetricDatum datum = new MetricDatum()
.withTimestamp(new Date(builder.clock.getTime()))
.withTimestamp(new Date(timestamp))
.withValue(cleanMetricValue(metricValue))
.withMetricName(metricInfo.getMetricName())
.withDimensions(dimensionSet)
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics

import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}
import org.apache.spark.sql.SparkSession

/**
* Collect and emit bytesRead/Written and recordsRead/Written metrics
*/
class ReadWriteBytesSparkListener extends SparkListener with Logging {
  // Running totals accumulated across all completed tasks.
  // NOTE(review): plain vars assume onTaskEnd callbacks are not delivered concurrently
  // (i.e. a single-threaded listener bus) — confirm against the Spark version in use.
  var bytesRead: Long = 0
  var recordsRead: Long = 0
  var bytesWritten: Long = 0
  var recordsWritten: Long = 0

  // Accumulate per-task input/output metrics as each task finishes.
  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = {
    val inputMetrics = taskEnd.taskMetrics.inputMetrics
    val outputMetrics = taskEnd.taskMetrics.outputMetrics
    val ids = s"(${taskEnd.taskInfo.taskId}, ${taskEnd.taskInfo.partitionId})"
    logInfo(
      s"${ids} Input: bytesRead=${inputMetrics.bytesRead}, recordsRead=${inputMetrics.recordsRead}")
    logInfo(
      s"${ids} Output: bytesWritten=${outputMetrics.bytesWritten}, recordsWritten=${outputMetrics.recordsWritten}")

    bytesRead += inputMetrics.bytesRead
    recordsRead += inputMetrics.recordsRead
    bytesWritten += outputMetrics.bytesWritten
    recordsWritten += outputMetrics.recordsWritten
  }

  // Publishes the accumulated totals as HistoricGauge data points (one per call).
  def emitMetrics(): Unit = {
    logInfo(s"Input: totalBytesRead=${bytesRead}, totalRecordsRead=${recordsRead}")
    logInfo(s"Output: totalBytesWritten=${bytesWritten}, totalRecordsWritten=${recordsWritten}")
    MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_BYTES_READ, bytesRead)
    MetricsUtil.addHistoricGauge(MetricConstants.INPUT_TOTAL_RECORDS_READ, recordsRead)
    MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_BYTES_WRITTEN, bytesWritten)
    MetricsUtil.addHistoricGauge(MetricConstants.OUTPUT_TOTAL_RECORDS_WRITTEN, recordsWritten)
  }
}

object ReadWriteBytesSparkListener {

  /**
   * Runs `lambda` with a fresh [[ReadWriteBytesSparkListener]] attached to the session's
   * SparkContext, then detaches the listener and emits the accumulated read/write metrics.
   *
   * Fix: the original removed the listener and emitted metrics only on the success path,
   * so an exception from `lambda` leaked the listener on the SparkContext and dropped the
   * metrics gathered so far. Cleanup and emission now happen in `finally`.
   *
   * @param spark session whose SparkContext the listener is registered on
   * @param lambda the work to execute while task metrics are being collected
   * @return the result of `lambda`
   */
  def withMetrics[T](spark: SparkSession, lambda: () => T): T = {
    val listener = new ReadWriteBytesSparkListener()
    spark.sparkContext.addSparkListener(listener)
    try {
      lambda()
    } finally {
      spark.sparkContext.removeSparkListener(listener)
      listener.emitMetrics()
    }
  }
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,79 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.flint.core.metrics;

import org.junit.Test;
import static org.junit.Assert.*;
import org.opensearch.flint.core.metrics.HistoricGauge.DataPoint;

import java.util.List;

/** Unit tests for {@link HistoricGauge}. */
public class HistoricGaugeTest {

  @Test
  public void testGetValue_EmptyGauge_ShouldReturnNull() {
    HistoricGauge emptyGauge = new HistoricGauge();

    assertNull(emptyGauge.getValue());
  }

  @Test
  public void testGetValue_WithSingleDataPoint_ShouldReturnFirstValue() {
    HistoricGauge gauge = new HistoricGauge();
    Long recorded = 100L;

    gauge.addDataPoint(recorded);

    assertEquals(recorded, gauge.getValue());
  }

  @Test
  public void testGetValue_WithMultipleDataPoints_ShouldReturnFirstValue() {
    HistoricGauge gauge = new HistoricGauge();
    Long oldest = 100L;
    Long newest = 200L;

    gauge.addDataPoint(oldest);
    gauge.addDataPoint(newest);

    // getValue reports the oldest (first) data point.
    assertEquals(oldest, gauge.getValue());
  }

  @Test
  public void testPollDataPoints_WithMultipleDataPoints_ShouldReturnAndClearDataPoints() {
    HistoricGauge gauge = new HistoricGauge();
    gauge.addDataPoint(100L);
    gauge.addDataPoint(200L);
    gauge.addDataPoint(300L);

    List<DataPoint> polled = gauge.pollDataPoints();

    // All three points come back in insertion order...
    assertEquals(3, polled.size());
    assertEquals(Long.valueOf(100L), polled.get(0).getValue());
    assertEquals(Long.valueOf(200L), polled.get(1).getValue());
    assertEquals(Long.valueOf(300L), polled.get(2).getValue());
    // ...and polling drains the gauge.
    assertTrue(gauge.pollDataPoints().isEmpty());
  }

  @Test
  public void testAddDataPoint_ShouldAddDataPointWithCorrectValueAndTimestamp() {
    HistoricGauge gauge = new HistoricGauge();
    Long recorded = 100L;

    gauge.addDataPoint(recorded);
    List<DataPoint> polled = gauge.pollDataPoints();

    assertEquals(1, polled.size());
    assertEquals(recorded, polled.get(0).getValue());
    assertTrue(polled.get(0).getTimestamp() > 0);
  }

  @Test
  public void testPollDataPoints_EmptyGauge_ShouldReturnEmptyList() {
    HistoricGauge gauge = new HistoricGauge();

    assertTrue(gauge.pollDataPoints().isEmpty());
  }
}
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ package org.opensearch.flint.spark.refresh

import java.util.Collections

import org.opensearch.flint.core.metrics.ReadWriteBytesSparkListener
import org.opensearch.flint.spark.{FlintSparkIndex, FlintSparkIndexOptions, FlintSparkValidationHelper}
import org.opensearch.flint.spark.FlintSparkIndex.{quotedTableName, StreamingRefresh}
import org.opensearch.flint.spark.refresh.FlintSparkIndexRefresh.RefreshMode.{AUTO, RefreshMode}
Expand Down Expand Up @@ -67,15 +68,17 @@ class AutoIndexRefresh(indexName: String, index: FlintSparkIndex)
// Flint index has specialized logic and capability for incremental refresh
case refresh: StreamingRefresh =>
logInfo("Start refreshing index in streaming style")
val job =
refresh
.buildStream(spark)
.writeStream
.queryName(indexName)
.format(FLINT_DATASOURCE)
.options(flintSparkConf.properties)
.addSinkOptions(options, flintSparkConf)
.start(indexName)
val job = ReadWriteBytesSparkListener.withMetrics(
spark,
() =>
refresh
.buildStream(spark)
.writeStream
.queryName(indexName)
.format(FLINT_DATASOURCE)
.options(flintSparkConf.properties)
.addSinkOptions(options, flintSparkConf)
.start(indexName))
Some(job.id.toString)

// Otherwise, fall back to foreachBatch + batch refresh
Expand Down
Loading

0 comments on commit 4348e58

Please sign in to comment.