Skip to content

Commit

Permalink
Adds latency metrics for ClusterState Appliers and Listeners
Browse files Browse the repository at this point in the history
Signed-off-by: Harsh Garg <[email protected]>
  • Loading branch information
Harsh Garg committed Mar 1, 2024
1 parent bb0b4b0 commit 07f2cc1
Show file tree
Hide file tree
Showing 36 changed files with 277 additions and 66 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.plugins.ActionPlugin;
import org.opensearch.rest.RestHandler;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ExecutorBuilder;
import org.opensearch.threadpool.ScalingExecutorBuilder;
Expand Down Expand Up @@ -50,7 +51,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);

clusterService = new ClusterService(settings, clusterSettings, threadPool);
clusterService = new ClusterService(settings, clusterSettings, threadPool, mock(MetricsRegistry.class));

}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@
import org.opensearch.search.aggregations.bucket.terms.TermsAggregationBuilder;
import org.opensearch.search.aggregations.support.ValueType;
import org.opensearch.search.builder.SearchSourceBuilder;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.junit.Before;

Expand Down Expand Up @@ -57,7 +58,7 @@ public void setup() {
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_ENABLED);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_SIZE);
clusterSettings.registerSetting(QueryInsightsSettings.TOP_N_LATENCY_QUERIES_WINDOW_SIZE);
clusterService = new ClusterService(settings, clusterSettings, null);
clusterService = new ClusterService(settings, clusterSettings, null, mock(MetricsRegistry.class));
when(queryInsightsService.isCollectionEnabled(MetricType.LATENCY)).thenReturn(true);
when(queryInsightsService.getTopQueriesService(MetricType.LATENCY)).thenReturn(topQueriesService);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
import org.opensearch.plugin.insights.rules.action.top_queries.TopQueriesResponse;
import org.opensearch.plugin.insights.rules.model.MetricType;
import org.opensearch.plugin.insights.settings.QueryInsightsSettings;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.threadpool.ThreadPool;
import org.opensearch.transport.TransportService;
Expand All @@ -33,7 +34,7 @@ public class TransportTopQueriesActionTests extends OpenSearchTestCase {
private final Settings.Builder settingsBuilder = Settings.builder();
private final Settings settings = settingsBuilder.build();
private final ClusterSettings clusterSettings = new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS);
private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool);
private final ClusterService clusterService = new ClusterService(settings, clusterSettings, threadPool, mock(MetricsRegistry.class));
private final TransportService transportService = mock(TransportService.class);
private final QueryInsightsService topQueriesByLatencyService = mock(QueryInsightsService.class);
private final ActionFilters actionFilters = mock(ActionFilters.class);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -61,6 +61,9 @@
import org.opensearch.common.util.concurrent.PrioritizedOpenSearchThreadPoolExecutor;
import org.opensearch.common.util.concurrent.ThreadContext;
import org.opensearch.core.concurrency.OpenSearchRejectedExecutionException;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.tags.Tags;
import org.opensearch.threadpool.Scheduler;
import org.opensearch.threadpool.ThreadPool;

Expand Down Expand Up @@ -89,6 +92,10 @@
public class ClusterApplierService extends AbstractLifecycleComponent implements ClusterApplier {
private static final Logger logger = LogManager.getLogger(ClusterApplierService.class);

private static final String LATENCY_METRIC_UNIT = "ms";

private static final String LATENCY_METRIC_OPERATION_TAG_KEY = "Operation";

public static final Setting<TimeValue> CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING = Setting.positiveTimeSetting(
"cluster.service.slow_task_logging_threshold",
TimeValue.timeValueSeconds(30),
Expand Down Expand Up @@ -121,7 +128,17 @@ public class ClusterApplierService extends AbstractLifecycleComponent implements

private NodeConnectionsService nodeConnectionsService;

public ClusterApplierService(String nodeName, Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
private final Histogram clusterStateAppliersHistogram;

private final Histogram clusterStateListenersHistogram;

public ClusterApplierService(
String nodeName,
Settings settings,
ClusterSettings clusterSettings,
ThreadPool threadPool,
MetricsRegistry metricsRegistry
) {
this.clusterSettings = clusterSettings;
this.threadPool = threadPool;
this.state = new AtomicReference<>();
Expand All @@ -132,6 +149,17 @@ public ClusterApplierService(String nodeName, Settings settings, ClusterSettings
CLUSTER_SERVICE_SLOW_TASK_LOGGING_THRESHOLD_SETTING,
this::setSlowTaskLoggingThreshold
);

this.clusterStateAppliersHistogram = metricsRegistry.createHistogram(
"cluster.state.appliers.latency",
"Histogram for tracking the latency of cluster state appliers",
LATENCY_METRIC_UNIT
);
this.clusterStateListenersHistogram = metricsRegistry.createHistogram(
"cluster.state.listeners.latency",
"Histogram for tracking the latency of cluster state listeners",
LATENCY_METRIC_UNIT
);
}

private void setSlowTaskLoggingThreshold(TimeValue slowTaskLoggingThreshold) {
Expand Down Expand Up @@ -597,15 +625,21 @@ private void callClusterStateAppliers(ClusterChangedEvent clusterChangedEvent, S
callClusterStateAppliers(clusterChangedEvent, stopWatch, lowPriorityStateAppliers);
}

private static void callClusterStateAppliers(
/**
 * Runs every applier in the given collection, in iteration order, against the supplied
 * cluster changed event and records how long each call took in the appliers latency histogram.
 * <p>
 * A latency sample is recorded only when the applier returns normally; an exception thrown by
 * an applier is not caught here and propagates to the caller.
 *
 * @param clusterChangedEvent the event handed to each applier
 * @param stopWatch stop watch used to time each applier under a per-applier label
 * @param clusterStateAppliers the appliers to invoke
 */
private void callClusterStateAppliers(
    ClusterChangedEvent clusterChangedEvent,
    StopWatch stopWatch,
    Collection<ClusterStateApplier> clusterStateAppliers
) {
    for (ClusterStateApplier stateApplier : clusterStateAppliers) {
        logger.trace("calling [{}] with change to version [{}]", stateApplier, clusterChangedEvent.state().version());
        final String timingLabel = "running applier [" + stateApplier + "]";
        try (TimingHandle ignored = stopWatch.timing(timingLabel)) {
            final long startedAtMillis = currentTimeInMillis();
            stateApplier.applyClusterState(clusterChangedEvent);
            // Clamp at zero so the histogram never receives a negative duration.
            final long elapsedMillis = Math.max(0, currentTimeInMillis() - startedAtMillis);
            // Each sample is tagged with the applier's simple class name.
            final String applierName = stateApplier.getClass().getSimpleName();
            clusterStateAppliersHistogram.record(
                (double) elapsedMillis,
                Tags.create().addTag(LATENCY_METRIC_OPERATION_TAG_KEY, applierName)
            );
        }
    }
}
Expand All @@ -624,7 +658,13 @@ private void callClusterStateListener(
try {
logger.trace("calling [{}] with change to version [{}]", listener, clusterChangedEvent.state().version());
try (TimingHandle ignored = stopWatch.timing("notifying listener [" + listener + "]")) {
long listenerStartTimeMS = currentTimeInMillis();
listener.clusterChanged(clusterChangedEvent);
double listenerExecutionTimeMS = (double) Math.max(0, currentTimeInMillis() - listenerStartTimeMS);
clusterStateListenersHistogram.record(
listenerExecutionTimeMS,
Tags.create().addTag(LATENCY_METRIC_OPERATION_TAG_KEY, listener.getClass().getSimpleName())
);
}
} catch (Exception ex) {
logger.warn("failed to notify ClusterStateListener", ex);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.index.IndexingPressureService;
import org.opensearch.node.Node;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.threadpool.ThreadPool;

import java.util.Collections;
Expand Down Expand Up @@ -91,12 +92,12 @@ public class ClusterService extends AbstractLifecycleComponent {

private IndexingPressureService indexingPressureService;

public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool) {
public ClusterService(Settings settings, ClusterSettings clusterSettings, ThreadPool threadPool, MetricsRegistry metricsRegistry) {
this(
settings,
clusterSettings,
new ClusterManagerService(settings, clusterSettings, threadPool),
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool)
new ClusterApplierService(Node.NODE_NAME_SETTING.get(settings), settings, clusterSettings, threadPool, metricsRegistry)
);
}

Expand Down
29 changes: 17 additions & 12 deletions server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -595,21 +595,10 @@ protected Node(
getCustomNameResolvers(pluginsService.filterPlugins(DiscoveryPlugin.class))
);

List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(settings, settingsModule.getClusterSettings(), threadPool);
clusterService.addStateApplier(scriptService);
resourcesToClose.add(clusterService);
final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
if (consistentSettings.isEmpty() == false) {
clusterService.addLocalNodeMasterListener(
new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher()
);
}

TracerFactory tracerFactory;
MetricsRegistryFactory metricsRegistryFactory;
if (FeatureFlags.isEnabled(TELEMETRY)) {
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, clusterService.getClusterSettings());
final TelemetrySettings telemetrySettings = new TelemetrySettings(settings, settingsModule.getClusterSettings());
if (telemetrySettings.isTracingFeatureEnabled() || telemetrySettings.isMetricsFeatureEnabled()) {
List<TelemetryPlugin> telemetryPlugins = pluginsService.filterPlugins(TelemetryPlugin.class);
TelemetryModule telemetryModule = new TelemetryModule(telemetryPlugins, telemetrySettings);
Expand Down Expand Up @@ -637,6 +626,22 @@ protected Node(
resourcesToClose.add(tracer::close);
resourcesToClose.add(metricsRegistry::close);

List<ClusterPlugin> clusterPlugins = pluginsService.filterPlugins(ClusterPlugin.class);
final ClusterService clusterService = new ClusterService(
settings,
settingsModule.getClusterSettings(),
threadPool,
metricsRegistry
);
clusterService.addStateApplier(scriptService);
resourcesToClose.add(clusterService);
final Set<Setting<?>> consistentSettings = settingsModule.getConsistentSettings();
if (consistentSettings.isEmpty() == false) {
clusterService.addLocalNodeMasterListener(
new ConsistentSettingsService(settings, clusterService, consistentSettings).newHashPublisher()
);
}

final ClusterInfoService clusterInfoService = newClusterInfoService(settings, clusterService, threadPool, client);
final UsageService usageService = new UsageService();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.opensearch.index.VersionType;
import org.opensearch.indices.SystemIndices;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
Expand Down Expand Up @@ -153,7 +154,12 @@ private void indicesThatCannotBeCreatedTestCase(
null,
new IndexingPressureService(
Settings.EMPTY,
new ClusterService(Settings.EMPTY, new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
new ClusterService(
Settings.EMPTY,
new ClusterSettings(Settings.EMPTY, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null,
mock(MetricsRegistry.class)
)
),
null,
new SystemIndices(emptyMap()),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,7 @@
import org.opensearch.indices.SystemIndices;
import org.opensearch.ingest.IngestService;
import org.opensearch.tasks.Task;
import org.opensearch.telemetry.metrics.NoopMetricsRegistryFactory;
import org.opensearch.telemetry.tracing.noop.NoopTracer;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.VersionUtils;
Expand Down Expand Up @@ -170,7 +171,12 @@ class TestTransportBulkAction extends TransportBulkAction {
),
new IndexingPressureService(
SETTINGS,
new ClusterService(SETTINGS, new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS), null)
new ClusterService(
SETTINGS,
new ClusterSettings(SETTINGS, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS),
null,
new NoopMetricsRegistryFactory().getMetricsRegistry()
)
),
null,
new SystemIndices(emptyMap()),
Expand Down
Loading

0 comments on commit 07f2cc1

Please sign in to comment.