Skip to content

Commit

Permalink
test changes
Browse files Browse the repository at this point in the history
Signed-off-by: Gagan Juneja <[email protected]>
  • Loading branch information
Gagan Juneja committed Jul 17, 2024
1 parent bf2b5ea commit 6a2f94d
Show file tree
Hide file tree
Showing 13 changed files with 767 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,9 @@
import org.opensearch.performanceanalyzer.http_action.whoami.TransportWhoAmIAction;
import org.opensearch.performanceanalyzer.http_action.whoami.WhoAmIAction;
import org.opensearch.performanceanalyzer.listener.PerformanceAnalyzerSearchListener;
import org.opensearch.performanceanalyzer.listener.RTFPerformanceAnalyzerSearchListener;
import org.opensearch.performanceanalyzer.transport.PerformanceAnalyzerTransportInterceptor;
import org.opensearch.performanceanalyzer.transport.RTFPerformanceAnalyzerTransportInterceptor;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.performanceanalyzer.writer.EventLogQueueProcessor;
import org.opensearch.plugins.ActionPlugin;
Expand Down Expand Up @@ -302,7 +304,10 @@ public List<ActionFilter> getActionFilters() {
public void onIndexModule(IndexModule indexModule) {
PerformanceAnalyzerSearchListener performanceanalyzerSearchListener =
new PerformanceAnalyzerSearchListener(performanceAnalyzerController);
RTFPerformanceAnalyzerSearchListener rtfPerformanceAnalyzerSearchListener =
new RTFPerformanceAnalyzerSearchListener(performanceAnalyzerController);
indexModule.addSearchOperationListener(performanceanalyzerSearchListener);
indexModule.addSearchOperationListener(rtfPerformanceAnalyzerSearchListener);
}

// follower check, leader check
Expand Down Expand Up @@ -330,8 +335,9 @@ public void onDiscovery(Discovery discovery) {
@Override
public List<TransportInterceptor> getTransportInterceptors(
NamedWriteableRegistry namedWriteableRegistry, ThreadContext threadContext) {
return singletonList(
new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController));
return Arrays.asList(
new PerformanceAnalyzerTransportInterceptor(performanceAnalyzerController),
new RTFPerformanceAnalyzerTransportInterceptor(performanceAnalyzerController));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -383,4 +383,13 @@ public boolean isCollectorDisabled(

return disabledCollectorsList.contains(collectorName);
}

/**
* Collectors Setting value.
*
* @return collectorsSettingValue
*/
public int getCollectorsSettingValue() {
return collectorsSettingValue;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
import org.opensearch.performanceanalyzer.commons.metrics.MetricsProcessor;
import org.opensearch.performanceanalyzer.commons.metrics.PerformanceAnalyzerMetrics;
import org.opensearch.performanceanalyzer.commons.util.ThreadIDUtil;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.search.internal.SearchContext;

Expand All @@ -37,7 +38,14 @@ public String toString() {
}

private SearchListener getSearchListener() {
return controller.isPerformanceAnalyzerEnabled() ? this : NO_OP_SEARCH_LISTENER;
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

private boolean isSearchListenerEnabled() {
return controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
== Util.CollectorMode.RCA.getValue());
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,240 @@
/*
* Copyright OpenSearch Contributors
* SPDX-License-Identifier: Apache-2.0
*/

package org.opensearch.performanceanalyzer.listener;

import static org.opensearch.performanceanalyzer.commons.stats.metrics.StatExceptionCode.OPENSEARCH_REQUEST_INTERCEPTOR_ERROR;

import com.sun.management.ThreadMXBean;
import java.lang.management.ManagementFactory;
import java.util.Map;
import org.apache.logging.log4j.LogManager;
import org.apache.logging.log4j.Logger;
import org.opensearch.core.action.NotifyOnceListener;
import org.opensearch.index.shard.SearchOperationListener;
import org.opensearch.performanceanalyzer.OpenSearchResources;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.metrics.RTFMetrics;
import org.opensearch.performanceanalyzer.commons.os.OSGlobals;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.performanceanalyzer.util.Utils;
import org.opensearch.search.internal.SearchContext;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;

public class RTFPerformanceAnalyzerSearchListener
implements SearchOperationListener, SearchListener {

private static final Logger LOG =
LogManager.getLogger(RTFPerformanceAnalyzerSearchListener.class);
private static final String OPERATION_SHARD_FETCH = "shard_fetch";
private static final String OPERATION_SHARD_QUERY = "shard_query";
public static final String QUERY_CPU_START_TIME = "query_cpu";
public static final String QUERY_START_TIME = "query_start_time";
public static final String FETCH_CPU_START_TIME = "fetch_cpu";
public static final String FETCH_START_TIME = "fetch_start_time";
private final ThreadLocal<Map<String, Long>> threadLocal = new ThreadLocal<>();
private static final SearchListener NO_OP_SEARCH_LISTENER = new NoOpSearchListener();
private static final ThreadMXBean threadMXBean =
(ThreadMXBean) ManagementFactory.getThreadMXBean();
private final long scClkTck;
private final PerformanceAnalyzerController controller;
private Histogram cpuUtilizationHistogram;

public RTFPerformanceAnalyzerSearchListener(final PerformanceAnalyzerController controller) {
this.controller = controller;
this.scClkTck = OSGlobals.getScClkTck();
this.cpuUtilizationHistogram = createCPUUtilizationHistogram();
}

private Histogram createCPUUtilizationHistogram() {
MetricsRegistry metricsRegistry = OpenSearchResources.INSTANCE.getMetricsRegistry();
if (metricsRegistry != null) {
return metricsRegistry.createHistogram(
RTFMetrics.OSMetrics.CPU_UTILIZATION.toString(),
"CPU Utilization per shard for an operation",
"rate");
} else {
return null;
}
}

@Override
public String toString() {
return RTFPerformanceAnalyzerSearchListener.class.getSimpleName();
}

private SearchListener getSearchListener() {
return isSearchListenerEnabled() ? this : NO_OP_SEARCH_LISTENER;
}

private boolean isSearchListenerEnabled() {
return cpuUtilizationHistogram != null
&& controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
== Util.CollectorMode.TELEMETRY.getValue());
}


@Override
public void onPreQueryPhase(SearchContext searchContext) {
try {
getSearchListener().preQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onQueryPhase(SearchContext searchContext, long tookInNanos) {
try {
getSearchListener().queryPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFailedQueryPhase(SearchContext searchContext) {
try {
getSearchListener().failedQueryPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onPreFetchPhase(SearchContext searchContext) {
try {
getSearchListener().preFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFetchPhase(SearchContext searchContext, long tookInNanos) {
try {
getSearchListener().fetchPhase(searchContext, tookInNanos);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void onFailedFetchPhase(SearchContext searchContext) {
try {
getSearchListener().failedFetchPhase(searchContext);
} catch (Exception ex) {
LOG.error(ex);
StatsCollector.instance().logException(OPENSEARCH_REQUEST_INTERCEPTOR_ERROR);
}
}

@Override
public void preQueryPhase(SearchContext searchContext) {
threadLocal.get().put(QUERY_START_TIME, System.nanoTime());
}

@Override
public void queryPhase(SearchContext searchContext, long tookInNanos) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
searchContext, queryStartTime, OPERATION_SHARD_FETCH, false);
}

@Override
public void failedQueryPhase(SearchContext searchContext) {
long queryStartTime = threadLocal.get().getOrDefault(QUERY_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
searchContext, queryStartTime, OPERATION_SHARD_FETCH, true);
}

@Override
public void preFetchPhase(SearchContext searchContext) {
threadLocal.get().put(FETCH_START_TIME, System.nanoTime());
}

@Override
public void fetchPhase(SearchContext searchContext, long tookInNanos) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
searchContext, fetchStartTime, OPERATION_SHARD_FETCH, false);
}

@Override
public void failedFetchPhase(SearchContext searchContext) {
long fetchStartTime = threadLocal.get().getOrDefault(FETCH_START_TIME, 0l);
addCPUResourceTrackingCompletionListener(
searchContext, fetchStartTime, OPERATION_SHARD_FETCH, true);
}

private double calculateCPUUtilization(long phaseStartTime, long phaseCPUStartTime) {
long totalCpuTime =
Math.min(0, (threadMXBean.getCurrentThreadCpuTime() - phaseCPUStartTime));
return Utils.calculateCPUUtilization(
totalCpuTime, scClkTck, phaseStartTime - System.nanoTime());
}

private void recordCPUUtilizationMetric(
SearchContext searchContext, double cpuUtilization, String operation) {
cpuUtilizationHistogram.record(
cpuUtilization,
Tags.create()
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.shardTarget().getShardId().getId())
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.shardTarget().getShardId().getIndex().getName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext.shardTarget().getShardId().getIndex().getUUID())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation));
}

private NotifyOnceListener<Task> addCPUResourceTrackingCompletionListener(
SearchContext searchContext, long startTime, String operation, boolean isFailed) {
return new NotifyOnceListener<Task>() {
@Override
protected void innerOnResponse(Task task) {
LOG.debug("Updating the counter for task {}", task.getId());
cpuUtilizationHistogram.record(
Utils.calculateCPUUtilization(
task.getTotalResourceStats().getCpuTimeInNanos(),
scClkTck,
(System.nanoTime() - startTime)),
Tags.create()
.addTag(
RTFMetrics.CommonDimension.INDEX_NAME.toString(),
searchContext.shardTarget().getShardId().getIndexName())
.addTag(
RTFMetrics.CommonDimension.INDEX_UUID.toString(),
searchContext
.shardTarget()
.getShardId()
.getIndex()
.getUUID())
.addTag(
RTFMetrics.CommonDimension.SHARD_ID.toString(),
searchContext.shardTarget().getShardId().getId())
.addTag(RTFMetrics.CommonDimension.OPERATION.toString(), operation)
.addTag(RTFMetrics.CommonDimension.FAILED.toString(), isFailed));
}

@Override
protected void innerOnFailure(Exception e) {}
};
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,7 @@
import org.opensearch.action.bulk.BulkShardRequest;
import org.opensearch.action.support.replication.TransportReplicationAction.ConcreteShardRequest;
import org.opensearch.performanceanalyzer.commons.collectors.StatsCollector;
import org.opensearch.performanceanalyzer.commons.util.Util;
import org.opensearch.performanceanalyzer.config.PerformanceAnalyzerController;
import org.opensearch.tasks.Task;
import org.opensearch.transport.TransportChannel;
Expand Down Expand Up @@ -45,7 +46,7 @@ public void messageReceived(T request, TransportChannel channel, Task task) thro

@VisibleForTesting
TransportChannel getChannel(T request, TransportChannel channel, Task task) {
if (!controller.isPerformanceAnalyzerEnabled()) {
if (!isCollectorEnabled()) {
return channel;
}

Expand All @@ -56,6 +57,13 @@ TransportChannel getChannel(T request, TransportChannel channel, Task task) {
}
}

private boolean isCollectorEnabled() {
return controller.isPerformanceAnalyzerEnabled()
&& (controller.getCollectorsSettingValue() == Util.CollectorMode.DUAL.getValue()
|| controller.getCollectorsSettingValue()
== Util.CollectorMode.RCA.getValue());
}

private TransportChannel getShardBulkChannel(T request, TransportChannel channel, Task task) {
String className = request.getClass().getName();
boolean bPrimary = false;
Expand Down
Loading

0 comments on commit 6a2f94d

Please sign in to comment.