diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportChannel.java b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportChannel.java index cc9f57a8..46d98447 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportChannel.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportChannel.java @@ -5,31 +5,45 @@ package org.opensearch.performanceanalyzer.transport; - +import java.io.BufferedReader; +import java.io.File; +import java.io.FileNotFoundException; +import java.io.FileReader; import java.io.IOException; +import java.util.HashMap; +import java.util.Map; import java.util.concurrent.atomic.AtomicLong; import org.apache.logging.log4j.LogManager; import org.apache.logging.log4j.Logger; import org.opensearch.core.transport.TransportResponse; -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.ShardBulkDimension; -import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.ShardBulkMetric; +import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor; import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics; +import org.opensearch.performanceanalyzer.commons.os.OSGlobals; +import org.opensearch.performanceanalyzer.commons.os.ThreadDiskIO; +import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode; import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.tags.Tags; import org.opensearch.transport.TransportChannel; public class PerformanceAnalyzerTransportChannel implements TransportChannel, MetricsProcessor { private static final Logger LOG = LogManager.getLogger(PerformanceAnalyzerTransportChannel.class); private static final int KEYS_PATH_LENGTH = 3; + private static final String PID = OSGlobals.getPid(); private static final AtomicLong UNIQUE_ID = new AtomicLong(0); + private static Map startValue; + private static final Logger LOGGER = LogManager.getLogger(ThreadDiskIO.class); + private TransportChannel original; private String indexName; private int shardId; private boolean primary; private String id; private String threadID; + private Counter throughputCounter; void set( TransportChannel original, @@ -37,44 +51,16 @@ void set( String indexName, int shardId, int itemCount, - boolean bPrimary) { + boolean bPrimary, + Counter throughputCounter) { this.original = original; this.id = String.valueOf(UNIQUE_ID.getAndIncrement()); this.indexName = indexName; this.shardId = shardId; this.primary = bPrimary; this.threadID = String.valueOf(ThreadIDUtil.INSTANCE.getNativeCurrentThreadId()); - - StringBuilder value = - new StringBuilder() - .append(PerformanceAnalyzerMetrics.getCurrentTimeMetric()) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkMetric.START_TIME.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(startTime) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkMetric.ITEM_COUNT.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(itemCount) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.INDEX_NAME.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(indexName) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.SHARD_ID.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(shardId) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.PRIMARY.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(bPrimary); - - saveMetricValues( - value.toString(), - startTime, - threadID, - id, - PerformanceAnalyzerMetrics.START_FILE_NAME); + this.startValue = getIOUsage(threadID); + this.throughputCounter = throughputCounter; } @Override @@ -93,6 +79,43 @@ public void sendResponse(TransportResponse response) throws IOException { original.sendResponse(response); } + private Map getIOUsage(String tid) { + try (FileReader fileReader = + new FileReader(new File("/proc/" + PID + "/task/" + tid + "/io")); + BufferedReader bufferedReader = new BufferedReader(fileReader); ) { + String line = null; + Map kvmap = new HashMap<>(); + while ((line = bufferedReader.readLine()) != null) { + String[] toks = line.split("[: ]+"); + String key = toks[0]; + long val = Long.parseLong(toks[1]); + kvmap.put(key, val); + } + return kvmap; + } catch (FileNotFoundException e) { + LOGGER.debug("FileNotFound in parse with exception: {}", () -> e.toString()); + } catch (Exception e) { + LOGGER.debug( + "Error In addSample PID for: {} Tid for: {} with error: {} with ExceptionCode: {}", + () -> PID, + () -> tid, + () -> e.toString(), + () -> StatExceptionCode.THREAD_IO_ERROR.toString()); + StatsCollector.instance().logException(StatExceptionCode.THREAD_IO_ERROR); + } + return null; + } + + private double getIOUtilization(Map v, Map oldv) { + if (v != null && oldv != null) { + return v.get("write_bytes") - oldv.get("write_bytes"); + } else if (v != null) { + return v.get("write_bytes"); + } else { + return 0.0; + } + } + @Override public void sendResponse(Exception exception) throws IOException { emitMetricsFinish(exception); @@ -100,48 +123,18 @@ public void sendResponse(Exception exception) throws IOException { } private void emitMetricsFinish(Exception exception) { - long currTime = System.currentTimeMillis(); - StringBuilder value = - new StringBuilder() - .append(PerformanceAnalyzerMetrics.getCurrentTimeMetric()) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkMetric.FINISH_TIME.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(currTime) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.INDEX_NAME.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(indexName) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.SHARD_ID.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(shardId) - .append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.PRIMARY.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(primary); if (exception != null) { - value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.EXCEPTION.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(exception.getClass().getName()); - value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.FAILED.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(true); - } else { - value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor) - .append(ShardBulkDimension.FAILED.toString()) - .append(PerformanceAnalyzerMetrics.sKeyValueDelimitor) - .append(false); + Map finishValue = getIOUsage(threadID); + double writeThroughput = getIOUtilization(finishValue, startValue); + if (writeThroughput >= 0.0) { + throughputCounter.add( + writeThroughput, + Tags.create() + .addTag("IndexName", indexName) + .addTag("ShardId", shardId) + .addTag("Operation", "shardbulk")); + } } - - saveMetricValues( - value.toString(), - currTime, - threadID, - id, - PerformanceAnalyzerMetrics.FINISH_FILE_NAME); } // This function is called from the security plugin using reflection. Do not diff --git a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java index a4cd946e..7c47dde3 100644 --- a/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java +++ b/src/main/java/org/opensearch/performanceanalyzer/transport/PerformanceAnalyzerTransportRequestHandler.java @@ -12,9 +12,13 @@ import org.apache.logging.log4j.Logger; import org.opensearch.action.bulk.BulkShardRequest; import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest; +import org.opensearch.cluster.service.ClusterService; +import org.opensearch.performanceanalyzer.OpenSearchResources; import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector; import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController; import org.opensearch.tasks.Task; +import org.opensearch.telemetry.metrics.Counter; +import org.opensearch.telemetry.metrics.MetricsRegistry; import org.opensearch.transport.TransportChannel; import org.opensearch.transport.TransportRequest; import org.opensearch.transport.TransportRequestHandler; @@ -27,10 +31,19 @@ public class PerformanceAnalyzerTransportRequestHandler actualHandler; boolean logOnce = false; + private ClusterService clusterService = OpenSearchResources.INSTANCE.getClusterService(); + + private MetricsRegistry metricsRegistry = clusterService.getMetricsRegistry(); + private Counter throughputCounter; + PerformanceAnalyzerTransportRequestHandler( TransportRequestHandler actualHandler, PerformanceAnalyzerController controller) { this.actualHandler = actualHandler; this.controller = controller; + this.throughputCounter = + metricsRegistry.createCounter( + "pa.throughputCounter", "throughput counter", "bytes"); + metricsRegistry = clusterService.getMetricsRegistry(); } PerformanceAnalyzerTransportRequestHandler set(TransportRequestHandler actualHandler) { @@ -87,7 +100,8 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel bsr.index(), bsr.shardId().id(), bsr.items().length, - bPrimary); + bPrimary, + throughputCounter); } catch (Exception ex) { if (!logOnce) { LOG.error(ex);