Skip to content

Commit

Permalink
[FLINK-24402][metric] Add a metric for back-pressure from the Changel…
Browse files Browse the repository at this point in the history
…ogStateBackend
  • Loading branch information
leiyanfei authored and pnowojski committed Jul 17, 2023
1 parent 5cacd12 commit bcdbc51
Show file tree
Hide file tree
Showing 6 changed files with 81 additions and 7 deletions.
7 changes: 6 additions & 1 deletion docs/content.zh/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1632,7 +1632,7 @@ Note that the metrics are only available via reporters.
<td>Histogram</td>
</tr>
<tr>
<th rowspan="23"><strong>Task</strong></th>
<th rowspan="24"><strong>Task</strong></th>
<td>numBytesInLocal</td>
<td><span class="label label-danger">Attention:</span> deprecated, use <a href="{{< ref "docs/ops/metrics" >}}#default-shuffle-service">Default shuffle service metrics</a>.</td>
<td>Counter</td>
Expand Down Expand Up @@ -1732,6 +1732,11 @@ Note that the metrics are only available via reporters.
<td>Maximum recorded duration of a single consecutive period of the task being in the hard back pressure state in the last sampling period. Please check softBackPressuredTimeMsPerSecond and hardBackPressuredTimeMsPerSecond for more information.</td>
<td>Gauge</td>
</tr>
<tr>
<td>changelogBusyTimeMsPerSecond</td>
<td>The time (in milliseconds) per second taken by the Changelog state backend to do I/O operations. It is only positive when the Changelog state backend is enabled. Please check 'dstl.dfs.upload.max-in-flight' for more information.</td>
<td>Gauge</td>
</tr>
<tr>
<td>mailboxMailsPerSecond</td>
<td>The number of actions processed from the task's mailbox per second which includes all actions, e.g., checkpointing, timer, or cancellation actions.</td>
Expand Down
7 changes: 6 additions & 1 deletion docs/content/docs/ops/metrics.md
Original file line number Diff line number Diff line change
Expand Up @@ -1617,7 +1617,7 @@ Note that the metrics are only available via reporters.
<td>Histogram</td>
</tr>
<tr>
<th rowspan="23"><strong>Task</strong></th>
<th rowspan="24"><strong>Task</strong></th>
<td>numBytesInLocal</td>
<td><span class="label label-danger">Attention:</span> deprecated, use <a href="{{< ref "docs/ops/metrics" >}}#default-shuffle-service">Default shuffle service metrics</a>.</td>
<td>Counter</td>
Expand Down Expand Up @@ -1717,6 +1717,11 @@ Note that the metrics are only available via reporters.
<td>Maximum recorded duration of a single consecutive period of the task being in the hard back pressure state in the last sampling period. Please check softBackPressuredTimeMsPerSecond and hardBackPressuredTimeMsPerSecond for more information.</td>
<td>Gauge</td>
</tr>
<tr>
<td>changelogBusyTimeMsPerSecond</td>
<td>The time (in milliseconds) per second taken by the Changelog state backend to do I/O operations. It is only positive when the Changelog state backend is enabled. Please check 'dstl.dfs.upload.max-in-flight' for more information.</td>
<td>Gauge</td>
</tr>
<tr>
<td>mailboxMailsPerSecond</td>
<td>The number of actions processed from the task's mailbox per second which includes all actions, e.g., checkpointing, timer, or cancellation actions.</td>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ public static String currentInputWatermarkName(int index) {
"softBackPressuredTimeMs" + SUFFIX_RATE;
public static final String TASK_HARD_BACK_PRESSURED_TIME =
"hardBackPressuredTimeMs" + SUFFIX_RATE;
public static final String CHANGELOG_BUSY_TIME = "changelogBusyTimeMs" + SUFFIX_RATE;
public static final String TASK_MAX_SOFT_BACK_PRESSURED_TIME = "maxSoftBackPressureTimeMs";
public static final String TASK_MAX_HARD_BACK_PRESSURED_TIME = "maxHardBackPressureTimeMs";

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@

package org.apache.flink.runtime.metrics.groups;

import org.apache.flink.annotation.VisibleForTesting;
import org.apache.flink.metrics.Counter;
import org.apache.flink.metrics.Gauge;
import org.apache.flink.metrics.Histogram;
Expand Down Expand Up @@ -59,6 +60,7 @@ public class TaskIOMetricGroup extends ProxyMetricGroup<TaskMetricGroup> {
private final Gauge<Long> backPressuredTimePerSecond;
private final TimerGauge softBackPressuredTimePerSecond;
private final TimerGauge hardBackPressuredTimePerSecond;
private final TimerGauge changelogBusyTimeMsPerSecond;
private final Gauge<Long> maxSoftBackPressuredTime;
private final Gauge<Long> maxHardBackPressuredTime;
private final Gauge<Long> accumulatedBackPressuredTime;
Expand Down Expand Up @@ -113,6 +115,9 @@ public TaskIOMetricGroup(TaskMetricGroup parent) {

this.busyTimePerSecond = gauge(MetricNames.TASK_BUSY_TIME, this::getBusyTimePerSecond);

this.changelogBusyTimeMsPerSecond =
gauge(MetricNames.CHANGELOG_BUSY_TIME, new TimerGauge());

this.accumulatedBusyTime =
gauge(MetricNames.ACC_TASK_BUSY_TIME, this::getAccumulatedBusyTime);
this.accumulatedBackPressuredTime =
Expand Down Expand Up @@ -182,6 +187,10 @@ public TimerGauge getHardBackPressuredTimePerSecond() {
return hardBackPressuredTimePerSecond;
}

/**
 * Returns the timer gauge registered under {@link MetricNames#CHANGELOG_BUSY_TIME}.
 *
 * <p>Callers bracket a measured period with {@code markStart()} / {@code markEnd()} on the
 * returned gauge.
 *
 * @return the gauge backing the {@code changelogBusyTimeMsPerSecond} metric
 */
public TimerGauge getChangelogBusyTimeMsPerSecond() {
    return this.changelogBusyTimeMsPerSecond;
}

public long getBackPressuredTimeMsPerSecond() {
return getSoftBackPressuredTimePerSecond().getValue()
+ getHardBackPressuredTimePerSecond().getValue();
Expand All @@ -200,12 +209,14 @@ public void setEnableBusyTime(boolean enabled) {
busyTimeEnabled = enabled;
}

private double getBusyTimePerSecond() {
@VisibleForTesting
double getBusyTimePerSecond() {
double busyTime = idleTimePerSecond.getValue() + getBackPressuredTimeMsPerSecond();
return busyTimeEnabled ? 1000.0 - Math.min(busyTime, 1000.0) : Double.NaN;
}

private double getAccumulatedBusyTime() {
@VisibleForTesting
double getAccumulatedBusyTime() {
return busyTimeEnabled
? Math.max(
System.currentTimeMillis()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,11 @@ void testTaskIOMetricGroup() throws InterruptedException {
Thread.sleep(hardSleepTime);
taskIO.getHardBackPressuredTimePerSecond().markEnd();

long ioSleepTime = 3L;
taskIO.getChangelogBusyTimeMsPerSecond().markStart();
Thread.sleep(ioSleepTime);
taskIO.getChangelogBusyTimeMsPerSecond().markEnd();

IOMetrics io = taskIO.createSnapshot();
assertThat(io.getNumRecordsIn()).isEqualTo(32L);
assertThat(io.getNumRecordsOut()).isEqualTo(64L);
Expand All @@ -95,6 +100,54 @@ void testTaskIOMetricGroup() throws InterruptedException {
.isGreaterThanOrEqualTo(softSleepTime);
assertThat(taskIO.getHardBackPressuredTimePerSecond().getCount())
.isGreaterThanOrEqualTo(hardSleepTime);
assertThat(taskIO.getChangelogBusyTimeMsPerSecond().getCount())
.isGreaterThanOrEqualTo(ioSleepTime);
}

/**
 * Verifies that the accumulated busy, back-pressured and idle times reported by {@link
 * TaskIOMetricGroup} add up to the wall-clock time that elapsed since the task was started.
 *
 * <p>The test marks one soft back-pressure, one hard back-pressure, one changelog-busy and one
 * idle period in sequence and checks each accumulated counter individually. The changelog-busy
 * period is neither idle nor back-pressured, so it is expected to be accounted as busy time.
 *
 * @throws InterruptedException if the test thread is interrupted while sleeping
 */
@Test
void testConsistencyOfTime() throws InterruptedException {
    TaskMetricGroup task = UnregisteredMetricGroups.createUnregisteredTaskMetricGroup();
    TaskIOMetricGroup taskIO = task.getIOMetricGroup();
    taskIO.setEnableBusyTime(true);
    taskIO.markTaskStart();
    final long startTime = System.currentTimeMillis();

    long softBackpressureTime = 100L;
    taskIO.getSoftBackPressuredTimePerSecond().markStart();
    Thread.sleep(softBackpressureTime);
    taskIO.getSoftBackPressuredTimePerSecond().markEnd();
    assertThat(taskIO.getSoftBackPressuredTimePerSecond().getAccumulatedCount())
            .isGreaterThanOrEqualTo(softBackpressureTime);

    long hardBackpressureTime = 200L;
    taskIO.getHardBackPressuredTimePerSecond().markStart();
    Thread.sleep(hardBackpressureTime);
    taskIO.getHardBackPressuredTimePerSecond().markEnd();
    assertThat(taskIO.getHardBackPressuredTimePerSecond().getAccumulatedCount())
            .isGreaterThanOrEqualTo(hardBackpressureTime);

    long changelogBusyTime = 300L;
    taskIO.getChangelogBusyTimeMsPerSecond().markStart();
    Thread.sleep(changelogBusyTime);
    taskIO.getChangelogBusyTimeMsPerSecond().markEnd();
    assertThat(taskIO.getChangelogBusyTimeMsPerSecond().getAccumulatedCount())
            .isGreaterThanOrEqualTo(changelogBusyTime);

    long idleTime = 200L;
    taskIO.getIdleTimeMsPerSecond().markStart();
    Thread.sleep(idleTime);
    taskIO.getIdleTimeMsPerSecond().markEnd();
    assertThat(taskIO.getIdleTimeMsPerSecond().getAccumulatedCount())
            .isGreaterThanOrEqualTo(idleTime);
    long totalDuration = System.currentTimeMillis() - startTime;

    // Theoretically: busy time = total time - idle time - back-pressure time.
    final double accountedTime =
            taskIO.getAccumulatedBusyTime()
                    + taskIO.getAccumulatedBackPressuredTimeMs()
                    + taskIO.getIdleTimeMsPerSecond().getAccumulatedCount();

    // For robustness, tolerate an error of up to 10 ms. The comparison is made on the
    // absolute difference: the accounted time is read after totalDuration was captured, so
    // the signed difference is normally not positive and a one-sided "less than" check
    // would pass no matter how far the accounted time overshoots.
    assertThat(Math.abs(totalDuration - accountedTime)).isLessThan(10);
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -597,9 +597,8 @@ protected void processInput(MailboxDefaultAction.Controller controller) throws E
timer = new GaugePeriodTimer(ioMetrics.getIdleTimeMsPerSecond());
resumeFuture = inputProcessor.getAvailableFuture();
} else if (changelogWriterAvailabilityProvider != null) {
// currently, waiting for changelog availability is reported as busy
// todo: add new metric (FLINK-24402)
timer = null;
// waiting for changelog availability still counts as busy time; it is additionally
// tracked by the changelogBusyTimeMsPerSecond gauge
timer = new GaugePeriodTimer(ioMetrics.getChangelogBusyTimeMsPerSecond());
resumeFuture = changelogWriterAvailabilityProvider.getAvailableFuture();
} else {
// data availability has changed in the meantime; retry immediately
Expand Down

0 comments on commit bcdbc51

Please sign in to comment.