From f9fdb3d1147c7ebe31ec678671a0fd7c9249f330 Mon Sep 17 00:00:00 2001 From: Rishab Nahata Date: Mon, 22 Jul 2024 11:59:14 +0530 Subject: [PATCH] Add settings for allocator timeout and add additional logs Signed-off-by: Rishab Nahata --- .../routing/allocation/AllocationService.java | 18 +++-- .../allocation/ExistingShardsAllocator.java | 9 +++ .../allocator/BalancedShardsAllocator.java | 26 +++++-- .../allocator/LocalShardsBalancer.java | 19 ++++- .../allocator/RemoteShardsBalancer.java | 2 +- .../allocation/allocator/ShardsBalancer.java | 2 +- .../common/settings/ClusterSettings.java | 3 + .../gateway/ShardsBatchGatewayAllocator.java | 69 ++++++++++++++++++- 8 files changed, 134 insertions(+), 14 deletions(-) diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java index 9eeb75d4f05aa..b888a6854c91c 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/AllocationService.java @@ -73,6 +73,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Supplier; @@ -102,7 +103,6 @@ public class AllocationService { private final ClusterInfoService clusterInfoService; private SnapshotsInfoService snapshotsInfoService; private final ClusterManagerMetrics clusterManagerMetrics; - private final long maxRunTimeoutInMillis = 5; // only for tests that use the GatewayAllocator as the unique ExistingShardsAllocator public AllocationService( @@ -568,8 +568,14 @@ private void reroute(RoutingAllocation allocation) { long rerouteStartTimeNS = System.nanoTime(); removeDelayMarkers(allocation); + long startTime = System.nanoTime(); allocateExistingUnassignedShards(allocation); // try to allocate 
existing shard copies first + logger.info("Completing allocate unassigned, elapsed time: [{}]", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); + startTime = System.nanoTime(); shardsAllocator.allocate(allocation); + logger.info("Shard allocator to allocate all shards, elapsed time: [{}]", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); clusterManagerMetrics.recordLatency( clusterManagerMetrics.rerouteHistogram, (double) Math.max(0, TimeValue.nsecToMSec(System.nanoTime() - rerouteStartTimeNS)) @@ -620,22 +626,26 @@ private void allocateExistingUnassignedShards(RoutingAllocation allocation) { private void allocateAllUnassignedShards(RoutingAllocation allocation) { ExistingShardsAllocator allocator = existingShardsAllocators.get(ShardsBatchGatewayAllocator.ALLOCATOR_NAME); - executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, true), () -> maxRunTimeoutInMillis); + executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, true), () -> allocator.getPrimaryBatchAllocatorTimeout().millis(), true); allocator.afterPrimariesBeforeReplicas(allocation); // Replicas Assignment - executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, false), () -> maxRunTimeoutInMillis); + executeTimedRunnables(allocator.allocateAllUnassignedShards(allocation, false), () -> allocator.getReplicaBatchAllocatorTimeout().millis(), false); } - private void executeTimedRunnables(List> runnables, Supplier maxRunTimeSupplier) { + private void executeTimedRunnables(List> runnables, Supplier maxRunTimeSupplier, boolean primary) { + logger.info("Executing timed runnables for primary [{}] of size [{}]", primary, runnables.size()); Collections.shuffle(runnables); long startTime = System.nanoTime(); for (Consumer workQueue : runnables) { if (System.nanoTime() - startTime < TimeValue.timeValueMillis(maxRunTimeSupplier.get()).nanos()) { + logger.info("Starting primary [{}] batch to allocate", primary); workQueue.accept(false); 
} else { + logger.info("Timing out primary [{}] batch to allocate", primary); workQueue.accept(true); } } + logger.info("Time taken to execute timed runnables in this cycle:[{}ms]", TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } private void disassociateDeadNodes(RoutingAllocation allocation) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java index 097ee95d186fa..a70fc6e4bc926 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/ExistingShardsAllocator.java @@ -38,6 +38,7 @@ import org.opensearch.cluster.routing.UnassignedInfo; import org.opensearch.common.Nullable; import org.opensearch.common.settings.Setting; +import org.opensearch.common.unit.TimeValue; import org.opensearch.gateway.GatewayAllocator; import org.opensearch.gateway.ShardsBatchGatewayAllocator; @@ -122,6 +123,14 @@ default List> allocateAllUnassignedShards(RoutingAllocation al return runnables; } + default TimeValue getPrimaryBatchAllocatorTimeout() { + return TimeValue.MINUS_ONE; + } + + default TimeValue getReplicaBatchAllocatorTimeout() { + return TimeValue.MINUS_ONE; + } + /** * Returns an explanation for a single unassigned shard. 
*/ diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java index b2443490dd973..47fe51013b38f 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/BalancedShardsAllocator.java @@ -54,6 +54,7 @@ import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Setting.Property; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import java.util.HashMap; import java.util.HashSet; @@ -162,6 +163,13 @@ public class BalancedShardsAllocator implements ShardsAllocator { Property.NodeScope ); + public static final Setting ALLOCATE_UNASSIGNED_TIMEOUT_SETTING = Setting.timeSetting( + "cluster.routing.allocation.allocate_unassigned_timeout", + TimeValue.MINUS_ONE, + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private volatile boolean movePrimaryFirst; private volatile ShardMovementStrategy shardMovementStrategy; @@ -172,6 +180,7 @@ public class BalancedShardsAllocator implements ShardsAllocator { private volatile float shardBalanceFactor; private volatile WeightFunction weightFunction; private volatile float threshold; + private volatile TimeValue allocateUnassignedTimeout; public BalancedShardsAllocator(Settings settings) { this(settings, new ClusterSettings(settings, ClusterSettings.BUILT_IN_CLUSTER_SETTINGS)); @@ -187,6 +196,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting setPreferPrimaryShardBalance(PREFER_PRIMARY_SHARD_BALANCE.get(settings)); setPreferPrimaryShardRebalance(PREFER_PRIMARY_SHARD_REBALANCE.get(settings)); setShardMovementStrategy(SHARD_MOVEMENT_STRATEGY_SETTING.get(settings)); + 
setAllocateUnassignedTimeout(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING.get(settings)); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_BALANCE, this::setPreferPrimaryShardBalance); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVE_PRIMARY_FIRST_SETTING, this::setMovePrimaryFirst); clusterSettings.addSettingsUpdateConsumer(SHARD_MOVEMENT_STRATEGY_SETTING, this::setShardMovementStrategy); @@ -195,6 +205,7 @@ public BalancedShardsAllocator(Settings settings, ClusterSettings clusterSetting clusterSettings.addSettingsUpdateConsumer(PRIMARY_SHARD_REBALANCE_BUFFER, this::updatePreferPrimaryShardBalanceBuffer); clusterSettings.addSettingsUpdateConsumer(PREFER_PRIMARY_SHARD_REBALANCE, this::setPreferPrimaryShardRebalance); clusterSettings.addSettingsUpdateConsumer(THRESHOLD_SETTING, this::setThreshold); + clusterSettings.addSettingsUpdateConsumer(ALLOCATE_UNASSIGNED_TIMEOUT_SETTING, this::setAllocateUnassignedTimeout); } /** @@ -269,6 +280,10 @@ private void setThreshold(float threshold) { this.threshold = threshold; } + private void setAllocateUnassignedTimeout(TimeValue allocateUnassignedTimeout) { + this.allocateUnassignedTimeout = allocateUnassignedTimeout; + } + @Override public void allocate(RoutingAllocation allocation) { if (allocation.routingNodes().size() == 0) { @@ -282,7 +297,8 @@ public void allocate(RoutingAllocation allocation) { weightFunction, threshold, preferPrimaryShardBalance, - preferPrimaryShardRebalance + preferPrimaryShardRebalance, + allocateUnassignedTimeout ); localShardsBalancer.allocateUnassigned(); localShardsBalancer.moveShards(); @@ -297,6 +313,7 @@ public void allocate(RoutingAllocation allocation) { @Override public ShardAllocationDecision decideShardAllocation(final ShardRouting shard, final RoutingAllocation allocation) { + long startTime = System.nanoTime(); ShardsBalancer localShardsBalancer = new LocalShardsBalancer( logger, allocation, @@ -304,12 +321,13 @@ public ShardAllocationDecision decideShardAllocation(final 
ShardRouting shard, f weightFunction, threshold, preferPrimaryShardBalance, - preferPrimaryShardRebalance + preferPrimaryShardRebalance, + allocateUnassignedTimeout ); AllocateUnassignedDecision allocateUnassignedDecision = AllocateUnassignedDecision.NOT_TAKEN; MoveDecision moveDecision = MoveDecision.NOT_TAKEN; if (shard.unassigned()) { - allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard); + allocateUnassignedDecision = localShardsBalancer.decideAllocateUnassigned(shard, startTime); } else { moveDecision = localShardsBalancer.decideMove(shard); if (moveDecision.isDecisionTaken() && moveDecision.canRemain()) { @@ -558,7 +576,7 @@ public Balancer( float threshold, boolean preferPrimaryBalance ) { - super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false); + super(logger, allocation, shardMovementStrategy, weight, threshold, preferPrimaryBalance, false, TimeValue.MINUS_ONE); } } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java index 00eb79add9f1d..d2336608a9c23 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/LocalShardsBalancer.java @@ -29,6 +29,7 @@ import org.opensearch.cluster.routing.allocation.decider.Decision; import org.opensearch.cluster.routing.allocation.decider.DiskThresholdDecider; import org.opensearch.common.collect.Tuple; +import org.opensearch.common.unit.TimeValue; import org.opensearch.gateway.PriorityComparator; import java.util.ArrayList; @@ -40,6 +41,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.concurrent.TimeUnit; import java.util.stream.Stream; import java.util.stream.StreamSupport; @@ -60,6 +62,7 @@ public class LocalShardsBalancer 
extends ShardsBalancer { private final boolean preferPrimaryBalance; private final boolean preferPrimaryRebalance; + private final TimeValue allocateUnassignedTimeout; private final BalancedShardsAllocator.WeightFunction weight; private final float threshold; @@ -77,7 +80,8 @@ public LocalShardsBalancer( BalancedShardsAllocator.WeightFunction weight, float threshold, boolean preferPrimaryBalance, - boolean preferPrimaryRebalance + boolean preferPrimaryRebalance, + TimeValue allocateUnassignedTimeout ) { this.logger = logger; this.allocation = allocation; @@ -94,6 +98,7 @@ public LocalShardsBalancer( this.preferPrimaryBalance = preferPrimaryBalance; this.preferPrimaryRebalance = preferPrimaryRebalance; this.shardMovementStrategy = shardMovementStrategy; + this.allocateUnassignedTimeout = allocateUnassignedTimeout; } /** @@ -742,6 +747,7 @@ private Map buildModelFromAssigned() */ @Override void allocateUnassigned() { + long startTime = System.nanoTime(); RoutingNodes.UnassignedShards unassigned = routingNodes.unassigned(); assert !nodes.isEmpty(); if (logger.isTraceEnabled()) { @@ -797,7 +803,7 @@ void allocateUnassigned() { do { for (int i = 0; i < primaryLength; i++) { ShardRouting shard = primary[i]; - final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard); + final AllocateUnassignedDecision allocationDecision = decideAllocateUnassigned(shard, startTime); final String assignedNodeId = allocationDecision.getTargetNode() != null ? allocationDecision.getTargetNode().getId() : null; @@ -870,6 +876,8 @@ void allocateUnassigned() { secondaryLength = 0; } while (primaryLength > 0); // clear everything we have either added it or moved to ignoreUnassigned + logger.debug("Time taken in allocate unassigned [{}]", + TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } /** @@ -879,12 +887,17 @@ void allocateUnassigned() { * is of type {@link Decision.Type#NO}, then the assigned node will be null. 
*/ @Override - AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard) { + AllocateUnassignedDecision decideAllocateUnassigned(final ShardRouting shard, long startTime) { if (shard.assignedToNode()) { // we only make decisions for unassigned shards here return AllocateUnassignedDecision.NOT_TAKEN; } + if (allocateUnassignedTimeout.nanos() >= 0 && System.nanoTime() - startTime > allocateUnassignedTimeout.nanos()) { + logger.info("Timed out while running Local shard balancer allocate unassigned - outer loop"); + return AllocateUnassignedDecision.throttle(null); + } + final boolean explain = allocation.debugDecision(); Decision shardLevelDecision = allocation.deciders().canAllocate(shard, allocation); if (shardLevelDecision.type() == Decision.Type.NO && explain == false) { diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java index a05938c176678..0fc48092c95a2 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java +++ b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/RemoteShardsBalancer.java @@ -288,7 +288,7 @@ private Map calculateNodePrimaryShardCount(List re } @Override - AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting) { + AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting, long startTime) { throw new UnsupportedOperationException("remote shards balancer does not support decision operations"); } diff --git a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java index ef2dbd34644a7..f6284797a303d 100644 --- a/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java +++ 
b/server/src/main/java/org/opensearch/cluster/routing/allocation/allocator/ShardsBalancer.java @@ -42,7 +42,7 @@ public abstract class ShardsBalancer { * @param shardRouting the shard for which the decision has to be made * @return the allocation decision */ - abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting); + abstract AllocateUnassignedDecision decideAllocateUnassigned(ShardRouting shardRouting, long startTime); /** * Makes a decision on whether to move a started shard to another node. diff --git a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java index 5dcf23ae52294..d8af832d0d9b2 100644 --- a/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java +++ b/server/src/main/java/org/opensearch/common/settings/ClusterSettings.java @@ -267,6 +267,7 @@ public void apply(Settings value, Settings current, Settings previous) { BalancedShardsAllocator.SHARD_MOVE_PRIMARY_FIRST_SETTING, BalancedShardsAllocator.SHARD_MOVEMENT_STRATEGY_SETTING, BalancedShardsAllocator.THRESHOLD_SETTING, + BalancedShardsAllocator.ALLOCATE_UNASSIGNED_TIMEOUT_SETTING, BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING, BreakerSettings.CIRCUIT_BREAKER_OVERHEAD_SETTING, BreakerSettings.CIRCUIT_BREAKER_TYPE, @@ -342,6 +343,8 @@ public void apply(Settings value, Settings current, Settings previous) { GatewayService.RECOVER_AFTER_NODES_SETTING, GatewayService.RECOVER_AFTER_TIME_SETTING, ShardsBatchGatewayAllocator.GATEWAY_ALLOCATOR_BATCH_SIZE, + ShardsBatchGatewayAllocator.PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, + ShardsBatchGatewayAllocator.REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, PersistedClusterStateService.SLOW_WRITE_LOGGING_THRESHOLD, NetworkModule.HTTP_DEFAULT_TYPE_SETTING, NetworkModule.TRANSPORT_DEFAULT_TYPE_SETTING, diff --git a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java 
b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java index 6247f4a554f9f..f47a4bdf73a9f 100644 --- a/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java +++ b/server/src/main/java/org/opensearch/gateway/ShardsBatchGatewayAllocator.java @@ -27,8 +27,10 @@ import org.opensearch.common.UUIDs; import org.opensearch.common.inject.Inject; import org.opensearch.common.lease.Releasables; +import org.opensearch.common.settings.ClusterSettings; import org.opensearch.common.settings.Setting; import org.opensearch.common.settings.Settings; +import org.opensearch.common.unit.TimeValue; import org.opensearch.common.util.concurrent.ConcurrentCollections; import org.opensearch.common.util.set.Sets; import org.opensearch.core.action.ActionListener; @@ -52,6 +54,7 @@ import java.util.Set; import java.util.Spliterators; import java.util.concurrent.ConcurrentMap; +import java.util.concurrent.TimeUnit; import java.util.function.Consumer; import java.util.function.Predicate; import java.util.stream.Collectors; @@ -70,6 +73,12 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { private final long maxBatchSize; private static final short DEFAULT_SHARD_BATCH_SIZE = 2000; + private static final String PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = "cluster.routing.allocation.shards_batch_gateway_allocator.primary_allocator_timeout"; + private static final String REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY = "cluster.routing.allocation.shards_batch_gateway_allocator.replica_allocator_timeout"; + + private volatile TimeValue primaryShardsBatchGatewayAllocatorTimeout; + private volatile TimeValue replicaShardsBatchGatewayAllocatorTimeout; + /** * Number of shards we send in one batch to data nodes for fetching metadata */ @@ -81,6 +90,20 @@ public class ShardsBatchGatewayAllocator implements ExistingShardsAllocator { Setting.Property.NodeScope ); + public static final Setting PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING = 
Setting.timeSetting( + PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, + TimeValue.timeValueSeconds(20), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + + public static final Setting REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING = Setting.timeSetting( + REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING_KEY, + TimeValue.timeValueSeconds(20), + Setting.Property.NodeScope, + Setting.Property.Dynamic + ); + private final RerouteService rerouteService; private final PrimaryShardBatchAllocator primaryShardBatchAllocator; private final ReplicaShardBatchAllocator replicaShardBatchAllocator; @@ -99,7 +122,8 @@ public ShardsBatchGatewayAllocator( RerouteService rerouteService, TransportNodesListGatewayStartedShardsBatch batchStartedAction, TransportNodesListShardStoreMetadataBatch batchStoreAction, - Settings settings + Settings settings, + ClusterSettings clusterSettings ) { this.rerouteService = rerouteService; this.primaryShardBatchAllocator = new InternalPrimaryBatchShardAllocator(); @@ -107,6 +131,16 @@ public ShardsBatchGatewayAllocator( this.batchStartedAction = batchStartedAction; this.batchStoreAction = batchStoreAction; this.maxBatchSize = GATEWAY_ALLOCATOR_BATCH_SIZE.get(settings); + this.primaryShardsBatchGatewayAllocatorTimeout = PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + PRIMARY_BATCH_ALLOCATOR_TIMEOUT_SETTING, + this::setPrimaryBatchAllocatorTimeout + ); + this.replicaShardsBatchGatewayAllocatorTimeout = REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING.get(settings); + clusterSettings.addSettingsUpdateConsumer( + REPLICA_BATCH_ALLOCATOR_TIMEOUT_SETTING, + this::setReplicaBatchAllocatorTimeout + ); } @Override @@ -129,7 +163,28 @@ protected ShardsBatchGatewayAllocator(long batchSize) { this.batchStoreAction = null; this.replicaShardBatchAllocator = null; this.maxBatchSize = batchSize; + this.primaryShardsBatchGatewayAllocatorTimeout = null; + this.replicaShardsBatchGatewayAllocatorTimeout = null; } + + @Override + 
public TimeValue getPrimaryBatchAllocatorTimeout() { + return this.primaryShardsBatchGatewayAllocatorTimeout; + } + + public void setPrimaryBatchAllocatorTimeout(TimeValue primaryShardsBatchGatewayAllocatorTimeout) { + this.primaryShardsBatchGatewayAllocatorTimeout = primaryShardsBatchGatewayAllocatorTimeout; + } + + @Override + public TimeValue getReplicaBatchAllocatorTimeout() { + return this.replicaShardsBatchGatewayAllocatorTimeout; + } + + public void setReplicaBatchAllocatorTimeout(TimeValue replicaShardsBatchGatewayAllocatorTimeout) { + this.replicaShardsBatchGatewayAllocatorTimeout = replicaShardsBatchGatewayAllocatorTimeout; + } + // for tests @Override @@ -215,9 +270,15 @@ protected List> innerAllocateUnassignedBatch( .forEach( shardsBatch -> runnables.add((timedOut) -> { if(timedOut) { + long startTime = System.nanoTime(); primaryBatchShardAllocator.allocateUnassignedBatchOnTimeout(shardsBatch.getBatchedShardRoutings(), allocation); + logger.info("Time taken to execute allocateUnassignedBatchOnTimeout for unassigned primary batch with id [{}], size : [{}] in this cycle:[{}ms]", + shardsBatch.batchId, shardsBatch.getBatchedShardRoutings().size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } else { + long startTime = System.nanoTime(); primaryBatchShardAllocator.allocateUnassignedBatch(shardsBatch.getBatchedShardRoutings(), allocation); + logger.info("Time taken to allocate unassigned primary batch with id [{}], size : [{}] in this cycle:[{}ms]", + shardsBatch.batchId, shardsBatch.getBatchedShardRoutings().size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } }) ); @@ -228,9 +289,15 @@ protected List> innerAllocateUnassignedBatch( .forEach( batch -> runnables.add((timedOut) -> { if(timedOut) { + long startTime = System.nanoTime(); replicaBatchShardAllocator.allocateUnassignedBatchOnTimeout(batch.getBatchedShardRoutings(), allocation); + logger.info("Time taken to execute allocateUnassignedBatchOnTimeout for unassigned 
replica batch with id [{}], size : [{}] in this cycle:[{}ms]", + batch.batchId, batch.getBatchedShardRoutings().size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } else { + long startTime = System.nanoTime(); replicaBatchShardAllocator.allocateUnassignedBatch(batch.getBatchedShardRoutings(), allocation); + logger.info("Time taken to allocate unassigned replica batch with id [{}], size : [{}] in this cycle:[{}ms]", + batch.batchId, batch.getBatchedShardRoutings().size(), TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - startTime)); } }) );