Skip to content

Commit

Permalink
Merge branch 'main' into transforming_rank_to_retriever
Browse files Browse the repository at this point in the history
  • Loading branch information
elasticmachine authored Nov 6, 2024
2 parents 9d10872 + 8f24e8e commit fefabe6
Show file tree
Hide file tree
Showing 8 changed files with 345 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,16 @@ public void onNewInput(T input) {
}
}

/**
* enqueues {@code input} if {@code expectedLatestKnownInput} is the latest known input.
* Neither of the parameters can be null.
*/
protected boolean compareAndEnqueue(T expectedLatestKnownInput, T input) {
assert expectedLatestKnownInput != null;
assert input != null;
return enqueuedInput.compareAndSet(Objects.requireNonNull(expectedLatestKnownInput), Objects.requireNonNull(input));
}

/**
* @return {@code false} iff there are no active/enqueued computations
*/
Expand All @@ -67,7 +77,7 @@ protected boolean isFresh(T input) {
/**
* Process the given input.
*
* @param input the value that was last received by {@link #onNewInput} before invocation.
* @param input the value that was last received by {@link #onNewInput} or {@link #compareAndEnqueue} before invocation.
*/
protected abstract void processInput(T input);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,18 @@
public record DesiredBalance(
long lastConvergedIndex,
Map<ShardId, ShardAssignment> assignments,
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> weightsPerNode
Map<DiscoveryNode, DesiredBalanceMetrics.NodeWeightStats> weightsPerNode,
ComputationFinishReason finishReason
) {

enum ComputationFinishReason {
CONVERGED,
YIELD_TO_NEW_INPUT,
STOP_EARLY
}

public DesiredBalance(long lastConvergedIndex, Map<ShardId, ShardAssignment> assignments) {
this(lastConvergedIndex, assignments, Map.of());
this(lastConvergedIndex, assignments, Map.of(), ComputationFinishReason.CONVERGED);
}

public static final DesiredBalance INITIAL = new DesiredBalance(-1, Map.of());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;
import java.util.function.LongSupplier;
import java.util.function.Predicate;

import static java.util.stream.Collectors.toUnmodifiableSet;
Expand All @@ -49,8 +50,8 @@ public class DesiredBalanceComputer {

private static final Logger logger = LogManager.getLogger(DesiredBalanceComputer.class);

private final ThreadPool threadPool;
private final ShardsAllocator delegateAllocator;
private final LongSupplier timeSupplierMillis;

// stats
protected final MeanMetric iterations = new MeanMetric();
Expand All @@ -63,12 +64,28 @@ public class DesiredBalanceComputer {
Setting.Property.NodeScope
);

public static final Setting<TimeValue> MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING = Setting.timeSetting(
"cluster.routing.allocation.desired_balance.max_balance_computation_time_during_index_creation",
TimeValue.timeValueSeconds(1),
Setting.Property.Dynamic,
Setting.Property.NodeScope
);

private TimeValue progressLogInterval;
private long maxBalanceComputationTimeDuringIndexCreationMillis;

public DesiredBalanceComputer(ClusterSettings clusterSettings, ThreadPool threadPool, ShardsAllocator delegateAllocator) {
this.threadPool = threadPool;
this(clusterSettings, delegateAllocator, threadPool::relativeTimeInMillis);
}

DesiredBalanceComputer(ClusterSettings clusterSettings, ShardsAllocator delegateAllocator, LongSupplier timeSupplierMillis) {
this.delegateAllocator = delegateAllocator;
this.timeSupplierMillis = timeSupplierMillis;
clusterSettings.initializeAndWatch(PROGRESS_LOG_INTERVAL_SETTING, value -> this.progressLogInterval = value);
clusterSettings.initializeAndWatch(
MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
value -> this.maxBalanceComputationTimeDuringIndexCreationMillis = value.millis()
);
}

public DesiredBalance compute(
Expand All @@ -77,7 +94,6 @@ public DesiredBalance compute(
Queue<List<MoveAllocationCommand>> pendingDesiredBalanceMoves,
Predicate<DesiredBalanceInput> isFresh
) {

if (logger.isTraceEnabled()) {
logger.trace(
"Recomputing desired balance for [{}]: {}, {}, {}, {}",
Expand All @@ -97,9 +113,10 @@ public DesiredBalance compute(
final var changes = routingAllocation.changes();
final var ignoredShards = getIgnoredShardsWithDiscardedAllocationStatus(desiredBalanceInput.ignoredShards());
final var clusterInfoSimulator = new ClusterInfoSimulator(routingAllocation);
DesiredBalance.ComputationFinishReason finishReason = DesiredBalance.ComputationFinishReason.CONVERGED;

if (routingNodes.size() == 0) {
return new DesiredBalance(desiredBalanceInput.index(), Map.of());
return new DesiredBalance(desiredBalanceInput.index(), Map.of(), Map.of(), finishReason);
}

// we assume that all ongoing recoveries will complete
Expand Down Expand Up @@ -263,11 +280,12 @@ public DesiredBalance compute(

final int iterationCountReportInterval = computeIterationCountReportInterval(routingAllocation);
final long timeWarningInterval = progressLogInterval.millis();
final long computationStartedTime = threadPool.relativeTimeInMillis();
final long computationStartedTime = timeSupplierMillis.getAsLong();
long nextReportTime = computationStartedTime + timeWarningInterval;

int i = 0;
boolean hasChanges = false;
boolean assignedNewlyCreatedPrimaryShards = false;
while (true) {
if (hasChanges) {
// Not the first iteration, so every remaining unassigned shard has been ignored, perhaps due to throttling. We must bring
Expand All @@ -293,6 +311,15 @@ public DesiredBalance compute(
for (final var shardRouting : routingNode) {
if (shardRouting.initializing()) {
hasChanges = true;
if (shardRouting.primary()
&& shardRouting.unassignedInfo() != null
&& shardRouting.unassignedInfo().reason() == UnassignedInfo.Reason.INDEX_CREATED) {
// TODO: we could include more cases that would cause early publishing of desired balance in case of a long
// computation. e.g.:
// - unassigned search replicas in case the shard has no assigned shard replicas
// - other reasons for an unassigned shard such as NEW_INDEX_RESTORED
assignedNewlyCreatedPrimaryShards = true;
}
clusterInfoSimulator.simulateShardStarted(shardRouting);
routingNodes.startShard(shardRouting, changes, 0L);
}
Expand All @@ -301,14 +328,14 @@ public DesiredBalance compute(

i++;
final int iterations = i;
final long currentTime = threadPool.relativeTimeInMillis();
final long currentTime = timeSupplierMillis.getAsLong();
final boolean reportByTime = nextReportTime <= currentTime;
final boolean reportByIterationCount = i % iterationCountReportInterval == 0;
if (reportByTime || reportByIterationCount) {
nextReportTime = currentTime + timeWarningInterval;
}

if (hasChanges == false) {
if (hasComputationConverged(hasChanges, i)) {
logger.debug(
"Desired balance computation for [{}] converged after [{}] and [{}] iterations",
desiredBalanceInput.index(),
Expand All @@ -324,9 +351,25 @@ public DesiredBalance compute(
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations as newer cluster state received. "
+ "Publishing intermediate desired balance and restarting computation",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i
);
finishReason = DesiredBalance.ComputationFinishReason.YIELD_TO_NEW_INPUT;
break;
}

if (assignedNewlyCreatedPrimaryShards
&& currentTime - computationStartedTime >= maxBalanceComputationTimeDuringIndexCreationMillis) {
logger.info(
"Desired balance computation for [{}] interrupted after [{}] and [{}] iterations "
+ "in order to not delay assignment of newly created index shards for more than [{}]. "
+ "Publishing intermediate desired balance and restarting computation",
desiredBalanceInput.index(),
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString(),
i,
TimeValue.timeValueMillis(currentTime - computationStartedTime).toString()
TimeValue.timeValueMillis(maxBalanceComputationTimeDuringIndexCreationMillis).toString()
);
finishReason = DesiredBalance.ComputationFinishReason.STOP_EARLY;
break;
}

Expand Down Expand Up @@ -368,7 +411,12 @@ public DesiredBalance compute(
}

long lastConvergedIndex = hasChanges ? previousDesiredBalance.lastConvergedIndex() : desiredBalanceInput.index();
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode());
return new DesiredBalance(lastConvergedIndex, assignments, routingNodes.getBalanceWeightStatsPerNode(), finishReason);
}

// visible for testing
boolean hasComputationConverged(boolean hasRoutingChanges, int currentIteration) {
return hasRoutingChanges == false;
}

private static Map<ShardId, ShardAssignment> collectShardAssignments(RoutingNodes routingNodes) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -136,7 +136,16 @@ protected void processInput(DesiredBalanceInput desiredBalanceInput) {
)
);
computationsExecuted.inc();
if (isFresh(desiredBalanceInput)) {

if (currentDesiredBalance.finishReason() == DesiredBalance.ComputationFinishReason.STOP_EARLY) {
logger.debug(
"Desired balance computation for [{}] terminated early with partial result, scheduling reconciliation",
index
);
submitReconcileTask(currentDesiredBalance);
var newInput = DesiredBalanceInput.create(indexGenerator.incrementAndGet(), desiredBalanceInput.routingAllocation());
desiredBalanceComputation.compareAndEnqueue(desiredBalanceInput, newInput);
} else if (isFresh(desiredBalanceInput)) {
logger.debug("Desired balance computation for [{}] is completed, scheduling reconciliation", index);
computationsConverged.inc();
submitReconcileTask(currentDesiredBalance);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,7 @@ public void apply(Settings value, Settings current, Settings previous) {
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MAX_WRITE_THREADS,
DataStreamAutoShardingService.CLUSTER_AUTO_SHARDING_MIN_WRITE_THREADS,
DesiredBalanceComputer.PROGRESS_LOG_INTERVAL_SETTING,
DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_INTERVAL_SETTING,
DesiredBalanceReconciler.UNDESIRED_ALLOCATIONS_LOG_THRESHOLD_SETTING,
BreakerSettings.CIRCUIT_BREAKER_LIMIT_SETTING,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.util.concurrent.CyclicBarrier;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -73,6 +74,68 @@ protected void processInput(Integer input) {
assertTrue(Arrays.toString(valuePerThread) + " vs " + result.get(), Arrays.stream(valuePerThread).anyMatch(i -> i == result.get()));
}

public void testCompareAndEnqueue() throws Exception {
final var initialInput = new Object();
final var compareAndEnqueueCount = between(1, 10);
final var remaining = new AtomicInteger(compareAndEnqueueCount);
final var computationsExecuted = new AtomicInteger();
final var result = new AtomicReference<>();
final var computation = new ContinuousComputation<>(threadPool.generic()) {
@Override
protected void processInput(Object input) {
result.set(input);
if (remaining.decrementAndGet() >= 0) {
compareAndEnqueue(input, new Object());
}
computationsExecuted.incrementAndGet();
}
};
computation.onNewInput(initialInput);
assertBusy(() -> assertFalse(computation.isActive()));
assertNotEquals(result.get(), initialInput);
assertEquals(computationsExecuted.get(), 1 + compareAndEnqueueCount);
}

public void testCompareAndEnqueueSkipped() throws Exception {
final var barrier = new CyclicBarrier(2);
final var computationsExecuted = new AtomicInteger();
final var initialInput = new Object();
final var conditionalInput = new Object();
final var newInput = new Object();
final var submitConditional = new AtomicBoolean(true);
final var result = new AtomicReference<>();

final var computation = new ContinuousComputation<>(threadPool.generic()) {
@Override
protected void processInput(Object input) {
assertNotEquals(input, conditionalInput);
safeAwait(barrier); // start
safeAwait(barrier); // continue
if (submitConditional.getAndSet(false)) {
compareAndEnqueue(input, conditionalInput);
}
result.set(input);
safeAwait(barrier); // finished
computationsExecuted.incrementAndGet();
}
};
computation.onNewInput(initialInput);

safeAwait(barrier); // start
computation.onNewInput(newInput);
safeAwait(barrier); // continue
safeAwait(barrier); // finished
assertEquals(result.get(), initialInput);

safeAwait(barrier); // start
safeAwait(barrier); // continue
safeAwait(barrier); // finished

assertBusy(() -> assertFalse(computation.isActive()));
assertEquals(result.get(), newInput);
assertEquals(computationsExecuted.get(), 2);
}

public void testSkipsObsoleteValues() throws Exception {
final var barrier = new CyclicBarrier(2);
final Runnable await = () -> safeAwait(barrier);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1210,7 +1210,12 @@ private void checkIterationLogging(int iterations, long eachIterationDuration, M
var currentTime = new AtomicLong(0L);
when(mockThreadPool.relativeTimeInMillis()).thenAnswer(invocation -> currentTime.addAndGet(eachIterationDuration));

var desiredBalanceComputer = new DesiredBalanceComputer(createBuiltInClusterSettings(), mockThreadPool, new ShardsAllocator() {
// Some runs of this test try to simulate a long desired balance computation. Setting a high value on the following setting
// prevents interrupting a long computation.
var clusterSettings = createBuiltInClusterSettings(
Settings.builder().put(DesiredBalanceComputer.MAX_BALANCE_COMPUTATION_TIME_DURING_INDEX_CREATION_SETTING.getKey(), "2m").build()
);
var desiredBalanceComputer = new DesiredBalanceComputer(clusterSettings, mockThreadPool, new ShardsAllocator() {
@Override
public void allocate(RoutingAllocation allocation) {
final var unassignedIterator = allocation.routingNodes().unassigned().iterator();
Expand Down
Loading

0 comments on commit fefabe6

Please sign in to comment.