Skip to content

Commit

Permalink
[FLINK-32662][test] Fixes JobMasterTest#testRetrievingCheckpointStats…
Browse files Browse the repository at this point in the history
… with the AdaptiveScheduler

The AdaptiveScheduler requires resources to be available before starting the job. The previous
version of the test didn't provide these resources which left the scheduler in WaitingForResources
state. That stage doesn't perform checkpoint creation for obvious reasons.
  • Loading branch information
XComp committed Aug 2, 2023
1 parent 99b9833 commit 74babda
Showing 1 changed file with 27 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2105,7 +2105,9 @@ void testRetrievingCheckpointStats() throws Exception {
// set savepoint settings
final SavepointRestoreSettings savepointRestoreSettings =
SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true);
final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings);
final int parallelism = 2;
final JobGraph jobGraph =
createJobGraphWithCheckpointing(parallelism, savepointRestoreSettings);

final StandaloneCompletedCheckpointStore completedCheckpointStore =
new StandaloneCompletedCheckpointStore(1);
Expand All @@ -2123,6 +2125,24 @@ void testRetrievingCheckpointStats() throws Exception {
// restore from the savepoint
jobMaster.start();

final JobMasterGateway jobMasterGateway = jobMaster.getGateway();

// AdaptiveScheduler-specific requirement: the AdaptiveScheduler triggers the
// ExecutionGraph creation only after it received the correct amount of slots
registerSlotsAtJobMaster(
parallelism,
jobMasterGateway,
jobGraph.getJobID(),
new TestingTaskExecutorGatewayBuilder()
.setAddress("firstTaskManager")
.createTestingTaskExecutorGateway(),
new LocalUnresolvedTaskManagerLocation());

CommonTestUtils.waitUntilCondition(
() ->
jobMasterGateway.requestJobStatus(testingTimeout).get()
== JobStatus.RUNNING);

CheckpointStatsSnapshot checkpointStatsSnapshot =
jobMaster.getGateway().requestCheckpointStats(testingTimeout).get();

Expand Down Expand Up @@ -2280,12 +2300,16 @@ private File createSavepoint(long savepointId) throws IOException {
savepointId);
}

@Nonnull
private JobGraph createJobGraphWithCheckpointing(
SavepointRestoreSettings savepointRestoreSettings) {
return createJobGraphWithCheckpointing(1, savepointRestoreSettings);
}

private JobGraph createJobGraphWithCheckpointing(
int parallelism, SavepointRestoreSettings savepointRestoreSettings) {
final JobVertex source = new JobVertex("source");
source.setInvokableClass(NoOpInvokable.class);
source.setParallelism(1);
source.setParallelism(parallelism);

return TestUtils.createJobGraphFromJobVerticesWithCheckpointing(
savepointRestoreSettings, source);
Expand Down

0 comments on commit 74babda

Please sign in to comment.