Skip to content

Commit

Permalink
13486 tentative backpressure fix
Browse files Browse the repository at this point in the history
Signed-off-by: Oleg Mazurov <[email protected]>
  • Loading branch information
OlegMazurov committed May 28, 2024
1 parent 3d97a2b commit ed686e4
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

import com.swirlds.config.api.ConfigData;
import com.swirlds.config.api.ConfigProperty;
import java.time.Duration;

/**
* Basic configuration data record. This record contains all general config properties that can not be defined for a
Expand All @@ -36,4 +37,5 @@
@ConfigData
public record BasicCommonConfig(
@ConfigProperty(defaultValue = "true") boolean showInternalStats,
@ConfigProperty(defaultValue = "false") boolean verboseStatistics) {}
@ConfigProperty(defaultValue = "false") boolean verboseStatistics,
@ConfigProperty(defaultValue = "1ms") Duration backPressureSleepInterval) {}
Original file line number Diff line number Diff line change
Expand Up @@ -90,15 +90,15 @@ public boolean block() throws InterruptedException {
*/
@Override
public boolean isReleasable() {
final long resultingCount = count.incrementAndGet();
if (resultingCount <= capacity) {
// We didn't violate capacity by incrementing the count, so we're done.
return true;
} else {
// We may have violated capacity restrictions by incrementing the count.
// Decrement count and take the slow pathway.
count.decrementAndGet();
return false;
for (; ; ) {
final long resultingCount = count.get();
if (resultingCount < capacity) {
if (count.compareAndSet(resultingCount, resultingCount + 1)) {
return true;
}
} else {
return false;
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -77,16 +77,16 @@ public BackpressureObjectCounter(
*/
@Override
public void onRamp() {
final long resultingCount = count.incrementAndGet();
if (resultingCount <= capacity) {
// We didn't violate capacity by incrementing the count, so we're done.
return;
} else {
// We may have violated capacity restrictions by incrementing the count.
// Decrement count and take the slow pathway.
count.decrementAndGet();
for (; ; ) {
final long resultingCount = count.get();
if (resultingCount < capacity) {
if (count.compareAndSet(resultingCount, resultingCount + 1)) {
return;
}
} else {
break;
}
}

// Slow case. Capacity wasn't reserved, so we need to block.

while (true) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import static com.swirlds.common.wiring.schedulers.builders.TaskSchedulerType.NO_OP;
import static com.swirlds.logging.legacy.LogMarker.EXCEPTION;

import com.swirlds.common.config.BasicCommonConfig;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.wiring.counters.BackpressureObjectCounter;
import com.swirlds.common.wiring.counters.MultiObjectCounter;
Expand Down Expand Up @@ -68,7 +69,7 @@ public abstract class AbstractTaskSchedulerBuilder<OUT> implements TaskScheduler
protected boolean unhandledTaskMetricEnabled = false;
protected boolean busyFractionMetricEnabled = false;

protected Duration sleepDuration = Duration.ofNanos(100);
protected Duration sleepDuration;

protected final PlatformContext platformContext;

Expand Down Expand Up @@ -101,6 +102,10 @@ public AbstractTaskSchedulerBuilder(
}
this.name = name;
this.pool = Objects.requireNonNull(defaultPool);
sleepDuration = platformContext
.getConfiguration()
.getConfigData(BasicCommonConfig.class)
.backPressureSleepInterval();
}

/**
Expand Down

0 comments on commit ed686e4

Please sign in to comment.