Commit

test changes for PA
Signed-off-by: Gagan Juneja <[email protected]>
Gagan Juneja committed Apr 19, 2024
1 parent eeec44a commit 49cdf9d
Showing 2 changed files with 83 additions and 76 deletions.
PerformanceAnalyzerTransportChannel.java
@@ -5,76 +5,62 @@

package org.opensearch.performanceanalyzer.transport;


import java.io.BufferedReader;
import java.io.File;
import java.io.FileNotFoundException;
import java.io.FileReader;
import java.io.IOException;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicLong;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.transport.TransportResponse;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.ShardBulkDimension;
import org.opensearch.performanceanalyzer.commons.metrics.AllMetrics.ShardBulkMetric;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.os.OSGlobals;
import org.opensearch.performanceanalyzer.commons.os.ThreadDiskIO;
import org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode;
import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.transport.TransportChannel;

public class PerformanceAnalyzerTransportChannel implements TransportChannel, MetricsProcessor {
private static final Logger LOG =
LogManager.getLogger(PerformanceAnalyzerTransportChannel.class);
private static final int KEYS_PATH_LENGTH = 3;
private static final String PID = OSGlobals.getPid();
private static final AtomicLong UNIQUE_ID = new AtomicLong(0);

private static Map<String, Long> startValue;
private static final Logger LOGGER = LogManager.getLogger(ThreadDiskIO.class);

private TransportChannel original;
private String indexName;
private int shardId;
private boolean primary;
private String id;
private String threadID;
private Counter throughputCounter;

void set(
TransportChannel original,
long startTime,
String indexName,
int shardId,
int itemCount,
- boolean bPrimary) {
+ boolean bPrimary,
+ Counter throughputCounter) {
this.original = original;
this.id = String.valueOf(UNIQUE_ID.getAndIncrement());
this.indexName = indexName;
this.shardId = shardId;
this.primary = bPrimary;
this.threadID = String.valueOf(ThreadIDUtil.INSTANCE.getNativeCurrentThreadId());

StringBuilder value =
new StringBuilder()
.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric())
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkMetric.START_TIME.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(startTime)
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkMetric.ITEM_COUNT.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(itemCount)
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.INDEX_NAME.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(indexName)
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.SHARD_ID.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(shardId)
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.PRIMARY.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(bPrimary);

saveMetricValues(
value.toString(),
startTime,
threadID,
id,
PerformanceAnalyzerMetrics.START_FILE_NAME);
this.startValue = getIOUsage(threadID);
this.throughputCounter = throughputCounter;
}

@Override
@@ -93,55 +79,62 @@ public void sendResponse(TransportResponse response) throws IOException {
original.sendResponse(response);
}

private Map<String, Long> getIOUsage(String tid) {
try (FileReader fileReader =
new FileReader(new File("/proc/" + PID + "/task/" + tid + "/io"));
BufferedReader bufferedReader = new BufferedReader(fileReader); ) {
String line = null;
Map<String, Long> kvmap = new HashMap<>();
while ((line = bufferedReader.readLine()) != null) {
String[] toks = line.split("[: ]+");
String key = toks[0];
long val = Long.parseLong(toks[1]);
kvmap.put(key, val);
}
return kvmap;
} catch (FileNotFoundException e) {
LOGGER.debug("FileNotFound in parse with exception: {}", () -> e.toString());
} catch (Exception e) {
LOGGER.debug(
"Error In addSample PID for: {} Tid for: {} with error: {} with ExceptionCode: {}",
() -> PID,
() -> tid,
() -> e.toString(),
() -> StatExceptionCode.THREAD_IO_ERROR.toString());
StatsCollector.instance().logException(StatExceptionCode.THREAD_IO_ERROR);
}
return null;
}
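
For reference, getIOUsage reads the kernel's per-thread I/O accounting file at /proc/<pid>/task/<tid>/io, whose lines are colon-separated key/value pairs; a typical snapshot looks like the following (values are illustrative):

rchar: 323934931
wchar: 323929600
syscr: 632687
syscw: 632675
read_bytes: 0
write_bytes: 323932160
cancelled_write_bytes: 0

Each line is split on "[: ]+" and collected into the map, so the write_bytes entry is available to the throughput calculation below.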

private double getIOUtilization(Map<String, Long> v, Map<String, Long> oldv) {
if (v != null && oldv != null) {
return v.get("write_bytes") - oldv.get("write_bytes");
} else if (v != null) {
return v.get("write_bytes");
} else {
return 0.0;
}
}
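
As a worked example: if the start snapshot captured in set() reports write_bytes = 1048576 and the finish snapshot reports write_bytes = 3145728, getIOUtilization returns 3145728 - 1048576 = 2097152.0 (about 2 MiB written while the bulk request was in flight). If the start snapshot could not be read, the finish value is reported as-is, and a missing finish snapshot yields 0.0.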

@Override
public void sendResponse(Exception exception) throws IOException {
emitMetricsFinish(exception);
original.sendResponse(exception);
}

private void emitMetricsFinish(Exception exception) {
long currTime = System.currentTimeMillis();
StringBuilder value =
new StringBuilder()
.append(PerformanceAnalyzerMetrics.getCurrentTimeMetric())
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkMetric.FINISH_TIME.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(currTime)
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.INDEX_NAME.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(indexName)
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.SHARD_ID.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(shardId)
.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.PRIMARY.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(primary);
if (exception != null) {
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.EXCEPTION.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(exception.getClass().getName());
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.FAILED.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(true);
} else {
value.append(PerformanceAnalyzerMetrics.sMetricNewLineDelimitor)
.append(ShardBulkDimension.FAILED.toString())
.append(PerformanceAnalyzerMetrics.sKeyValueDelimitor)
.append(false);
Map<String, Long> finishValue = getIOUsage(threadID);
double writeThroughput = getIOUtilization(finishValue, startValue);
if (writeThroughput >= 0.0) {
throughputCounter.add(
writeThroughput,
Tags.create()
.addTag("IndexName", indexName)
.addTag("ShardId", shardId)
.addTag("Operation", "shardbulk"));
}
}

saveMetricValues(
value.toString(),
currTime,
threadID,
id,
PerformanceAnalyzerMetrics.FINISH_FILE_NAME);
}

// This function is called from the security plugin using reflection. Do not
PerformanceAnalyzerTransportRequestHandler.java
@@ -12,9 +12,13 @@
import org.apache.logging.log4j.Logger;
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.opensearch.cluster.service.ClusterService;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.transport.TransportChannel;
import org.opensearch.transport.TransportRequest;
import org.opensearch.transport.TransportRequestHandler;
@@ -27,10 +31,19 @@ public class PerformanceAnalyzerTransportRequestHandler<T extends TransportReque
private TransportRequestHandler<T> actualHandler;
boolean logOnce = false;

private ClusterService clusterService = OpenSearchResources.INSTANCE.getClusterService();

private MetricsRegistry metricsRegistry = clusterService.getMetricsRegistry();
private Counter throughputCounter;

PerformanceAnalyzerTransportRequestHandler(
TransportRequestHandler<T> actualHandler, PerformanceAnalyzerController controller) {
this.actualHandler = actualHandler;
this.controller = controller;
this.throughputCounter =
metricsRegistry.createCounter(
"pa.throughputCounter", "throughput counter", "bytes");
metricsRegistry = clusterService.getMetricsRegistry();
}
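
Taken together with the channel changes above, the handler creates one counter via the MetricsRegistry and hands it to each PerformanceAnalyzerTransportChannel, which records the per-request write_bytes delta against it. A minimal sketch of that flow, assuming only the telemetry API already shown in this diff; the class and method names here are hypothetical stand-ins, not part of the commit:

import org.opensearch.telemetry.metrics.Counter;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

class ThroughputMetricSketch {
    // metricsRegistry would come from ClusterService#getMetricsRegistry(), as in the constructor above.
    static Counter createThroughputCounter(MetricsRegistry metricsRegistry) {
        return metricsRegistry.createCounter(
                "pa.throughputCounter", "throughput counter", "bytes");
    }

    // Mirrors the record path in emitMetricsFinish(): one data point per successful shard bulk request.
    static void recordWriteThroughput(
            Counter counter, double writeBytes, String indexName, int shardId) {
        counter.add(
                writeBytes,
                Tags.create()
                        .addTag("IndexName", indexName)
                        .addTag("ShardId", shardId)
                        .addTag("Operation", "shardbulk"));
    }
}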

PerformanceAnalyzerTransportRequestHandler<T> set(TransportRequestHandler<T> actualHandler) {
@@ -87,7 +100,8 @@ private TransportChannel getShardBulkChannel(T request, TransportChannel channel
bsr.index(),
bsr.shardId().id(),
bsr.items().length,
- bPrimary);
+ bPrimary,
+ throughputCounter);
} catch (Exception ex) {
if (!logOnce) {
LOG.error(ex);
