Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Adds Latency Metrics for Reroute Allocation actions #12364

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
30 changes: 29 additions & 1 deletion server/src/main/java/org/opensearch/cluster/ClusterModule.java
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.tasks.Task;
import org.opensearch.tasks.TaskResultsService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;

import java.util.ArrayList;
import java.util.Collection;
Expand Down Expand Up @@ -143,14 +145,40 @@ public ClusterModule(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext
) {
this(
settings,
clusterService,
clusterPlugins,
clusterInfoService,
snapshotsInfoService,
threadContext,
NoopMetricsRegistry.INSTANCE
);
}

public ClusterModule(
Settings settings,
ClusterService clusterService,
List<ClusterPlugin> clusterPlugins,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
ThreadContext threadContext,
MetricsRegistry metricsRegistry
) {
this.clusterPlugins = clusterPlugins;
this.deciderList = createAllocationDeciders(settings, clusterService.getClusterSettings(), clusterPlugins);
this.allocationDeciders = new AllocationDeciders(deciderList);
this.shardsAllocator = createShardsAllocator(settings, clusterService.getClusterSettings(), clusterPlugins);
this.clusterService = clusterService;
this.indexNameExpressionResolver = new IndexNameExpressionResolver(threadContext);
this.allocationService = new AllocationService(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this.allocationService = new AllocationService(
allocationDeciders,
shardsAllocator,
clusterInfoService,
snapshotsInfoService,
metricsRegistry
);
}

public static List<Entry> getNamedWriteables() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,9 +54,13 @@
import org.opensearch.cluster.routing.allocation.command.AllocationCommands;
import org.opensearch.cluster.routing.allocation.decider.AllocationDeciders;
import org.opensearch.cluster.routing.allocation.decider.Decision;
import org.opensearch.common.StopWatch;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.gateway.PriorityComparator;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;

import java.util.ArrayList;
import java.util.Collections;
Expand Down Expand Up @@ -91,6 +95,9 @@ public class AllocationService {
private final ShardsAllocator shardsAllocator;
private final ClusterInfoService clusterInfoService;
private SnapshotsInfoService snapshotsInfoService;
private final MetricsRegistry metricsRegistry;
private final Histogram rerouteAllocationActionLatency;
private static final String UNIT = "1";

// only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator
public AllocationService(
Expand All @@ -100,20 +107,39 @@ public AllocationService(
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService);
this(allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService, snapshotsInfoService, NoopMetricsRegistry.INSTANCE);
}

public AllocationService(
AllocationDeciders allocationDeciders,
GatewayAllocator gatewayAllocator,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
MetricsRegistry metricsRegistry
) {
this(allocationDeciders, shardsAllocator, clusterInfoService, snapshotsInfoService, metricsRegistry);
setExistingShardsAllocators(Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, gatewayAllocator));
}

public AllocationService(
AllocationDeciders allocationDeciders,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService
SnapshotsInfoService snapshotsInfoService,
MetricsRegistry metricsRegistry
) {
this.allocationDeciders = allocationDeciders;
this.shardsAllocator = shardsAllocator;
this.clusterInfoService = clusterInfoService;
this.snapshotsInfoService = snapshotsInfoService;
this.metricsRegistry = metricsRegistry;

this.rerouteAllocationActionLatency = metricsRegistry.createHistogram(
"leader.checker.success.count",
"Counter for number of successful leader checks",
UNIT
);
}

/**
Expand Down Expand Up @@ -534,10 +560,16 @@ private void reroute(RoutingAllocation allocation) {
: "auto-expand replicas out of sync with number of nodes in the cluster";
assert assertInitialized();

StopWatch stopWatch = new StopWatch();
stopWatch.start();

removeDelayMarkers(allocation);

allocateExistingUnassignedShards(allocation); // try to allocate existing shard copies first
shardsAllocator.allocate(allocation);
stopWatch.stop();
rerouteAllocationActionLatency.record(stopWatch.lastTaskTime().millis());
logger.info("Routing Action ran for {} millis", stopWatch.lastTaskTime().millis());
assert RoutingNodes.assertShardStats(allocation.routingNodes());
}

Expand Down
3 changes: 2 additions & 1 deletion server/src/main/java/org/opensearch/node/Node.java
Original file line number Diff line number Diff line change
Expand Up @@ -664,7 +664,8 @@ protected Node(
clusterPlugins,
clusterInfoService,
snapshotsInfoService,
threadPool.getThreadContext()
threadPool.getThreadContext(),
metricsRegistry
);
modules.add(clusterModule);
IndicesModule indicesModule = new IndicesModule(pluginsService.filterPlugins(MapperPlugin.class));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,8 @@
import org.opensearch.index.IndexNotFoundException;
import org.opensearch.index.shard.ShardNotFoundException;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.opensearch.telemetry.metrics.Histogram;
import org.opensearch.telemetry.metrics.MetricsRegistry;

import java.util.Arrays;
import java.util.Collections;
Expand All @@ -90,6 +92,13 @@
import static org.hamcrest.Matchers.hasSize;
import static org.hamcrest.Matchers.not;
import static org.hamcrest.Matchers.nullValue;
import static org.mockito.ArgumentMatchers.anyDouble;
import static org.mockito.Mockito.any;
import static org.mockito.Mockito.clearInvocations;
import static org.mockito.Mockito.mock;
import static org.mockito.Mockito.times;
import static org.mockito.Mockito.verify;
import static org.mockito.Mockito.when;

public class AllocationCommandsTests extends OpenSearchAllocationTestCase {
private final Logger logger = LogManager.getLogger(AllocationCommandsTests.class);
Expand Down Expand Up @@ -153,11 +162,15 @@ private AbstractAllocateAllocationCommand randomAllocateCommand(String index, in
}

public void testAllocateCommand() {
MetricsRegistry metricsRegistry = mock(MetricsRegistry.class);
Histogram mockHistogram = mock(Histogram.class);
when(metricsRegistry.createHistogram(any(), any(), any())).thenReturn(mockHistogram);
AllocationService allocation = createAllocationService(
Settings.builder()
.put(EnableAllocationDecider.CLUSTER_ROUTING_ALLOCATION_ENABLE_SETTING.getKey(), "none")
.put(EnableAllocationDecider.CLUSTER_ROUTING_REBALANCE_ENABLE_SETTING.getKey(), "none")
.build()
.build(),
metricsRegistry
);
final String index = "test";

Expand Down Expand Up @@ -193,7 +206,8 @@ public void testAllocateCommand() {
.build();
clusterState = allocation.reroute(clusterState, "reroute");
assertThat(clusterState.getRoutingNodes().shardsWithState(INITIALIZING).size(), equalTo(0));

verify(mockHistogram, times(1)).record(anyDouble());
clearInvocations(mockHistogram);
logger.info("--> allocating to non-existent node, should fail");
try {
allocation.reroute(clusterState, new AllocationCommands(randomAllocateCommand(index, shardId.id(), "node42")), false, false);
Expand Down Expand Up @@ -278,12 +292,16 @@ public void testAllocateCommand() {
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(INITIALIZING).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0));
verify(mockHistogram, times(1)).record(anyDouble());
clearInvocations(mockHistogram);

logger.info("--> start the primary shard");
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(0));
verify(mockHistogram, times(1)).record(anyDouble());
clearInvocations(mockHistogram);

logger.info("--> allocate the replica shard on the primary shard node, should fail");
try {
Expand All @@ -309,13 +327,17 @@ public void testAllocateCommand() {
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(INITIALIZING).size(), equalTo(1));
verify(mockHistogram, times(1)).record(anyDouble());
clearInvocations(mockHistogram);

logger.info("--> start the replica shard");
clusterState = startInitializingShardsAndReroute(allocation, clusterState);
assertThat(clusterState.getRoutingNodes().node("node1").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node1").shardsWithState(STARTED).size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").size(), equalTo(1));
assertThat(clusterState.getRoutingNodes().node("node2").shardsWithState(STARTED).size(), equalTo(1));
verify(mockHistogram, times(1)).record(anyDouble());
clearInvocations(mockHistogram);

logger.info("--> verify that we fail when there are no unassigned shards");
try {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
import org.opensearch.common.settings.Settings;
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.snapshots.EmptySnapshotsInfoService;
import org.opensearch.telemetry.metrics.noop.NoopMetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.gateway.TestGatewayAllocator;

Expand Down Expand Up @@ -158,7 +159,8 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}
},
new EmptyClusterInfoService(),
EmptySnapshotsInfoService.INSTANCE
EmptySnapshotsInfoService.INSTANCE,
NoopMetricsRegistry.INSTANCE
);

final String unrealisticAllocatorName = "unrealistic";
Expand Down Expand Up @@ -261,7 +263,13 @@ public ShardAllocationDecision decideShardAllocation(ShardRouting shard, Routing
}

public void testExplainsNonAllocationOfShardWithUnknownAllocator() {
final AllocationService allocationService = new AllocationService(null, null, null, null);
final AllocationService allocationService = new AllocationService(
null,
(ShardsAllocator) null,
null,
null,
NoopMetricsRegistry.INSTANCE
);
allocationService.setExistingShardsAllocators(
Collections.singletonMap(GatewayAllocator.ALLOCATOR_NAME, new TestGatewayAllocator())
);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.opensearch.gateway.GatewayAllocator;
import org.opensearch.snapshots.SnapshotShardSizeInfo;
import org.opensearch.snapshots.SnapshotsInfoService;
import org.opensearch.telemetry.metrics.MetricsRegistry;
import org.opensearch.test.OpenSearchTestCase;
import org.opensearch.test.gateway.TestGatewayAllocator;

Expand Down Expand Up @@ -118,6 +119,22 @@ public static MockAllocationService createAllocationService(Settings settings, C
);
}

public static MockAllocationService createAllocationService(Settings settings, MetricsRegistry metricsRegistry) {
return createAllocationService(settings, EMPTY_CLUSTER_SETTINGS, random(), metricsRegistry);
}

public static MockAllocationService createAllocationService(Settings settings, ClusterSettings clusterSettings,
Random random, MetricsRegistry metricsRegistry) {
return new MockAllocationService(
randomAllocationDeciders(settings, clusterSettings, random),
new TestGatewayAllocator(),
new BalancedShardsAllocator(settings),
EmptyClusterInfoService.INSTANCE,
SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES,
metricsRegistry
);
}

public static MockAllocationService createAllocationService(Settings settings, GatewayAllocator gatewayAllocator) {
return createAllocationService(settings, gatewayAllocator, SNAPSHOT_INFO_SERVICE_WITH_NO_SHARD_SIZES);
}
Expand Down Expand Up @@ -416,6 +433,17 @@ public MockAllocationService(
super(allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService, snapshotsInfoService);
}

public MockAllocationService(
AllocationDeciders allocationDeciders,
GatewayAllocator gatewayAllocator,
ShardsAllocator shardsAllocator,
ClusterInfoService clusterInfoService,
SnapshotsInfoService snapshotsInfoService,
MetricsRegistry metricsRegistry
) {
super(allocationDeciders, gatewayAllocator, shardsAllocator, clusterInfoService, snapshotsInfoService, metricsRegistry);
}

public void setNanoTimeOverride(long nanoTime) {
this.nanoTimeOverride = nanoTime;
}
Expand Down
Loading