From 74babdab7fd5b72d5eec5bff4070532a9621b8a4 Mon Sep 17 00:00:00 2001 From: Matthias Pohl Date: Tue, 1 Aug 2023 15:44:25 +0200 Subject: [PATCH] [FLINK-32662][test] Fixes JobMasterTest#testRetrievingCheckpointStats with the AdaptiveScheduler The AdaptiveScheduler requires resources to be available before starting the job. The previous version of the test didn't provide these resources which left the scheduler in WaitingForResources state. That stage doesn't perform checkpoint creation for obvious reasons. --- .../runtime/jobmaster/JobMasterTest.java | 30 +++++++++++++++++-- 1 file changed, 27 insertions(+), 3 deletions(-) diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java index e9889970a6c5d..64d462a9090a6 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/jobmaster/JobMasterTest.java @@ -2105,7 +2105,9 @@ void testRetrievingCheckpointStats() throws Exception { // set savepoint settings final SavepointRestoreSettings savepointRestoreSettings = SavepointRestoreSettings.forPath(savepointFile.getAbsolutePath(), true); - final JobGraph jobGraph = createJobGraphWithCheckpointing(savepointRestoreSettings); + final int parallelism = 2; + final JobGraph jobGraph = + createJobGraphWithCheckpointing(parallelism, savepointRestoreSettings); final StandaloneCompletedCheckpointStore completedCheckpointStore = new StandaloneCompletedCheckpointStore(1); @@ -2123,6 +2125,24 @@ void testRetrievingCheckpointStats() throws Exception { // restore from the savepoint jobMaster.start(); + final JobMasterGateway jobMasterGateway = jobMaster.getGateway(); + + // AdaptiveScheduler-specific requirement: the AdaptiveScheduler triggers the + // ExecutionGraph creation only after it received the correct amount of slots + registerSlotsAtJobMaster( + parallelism, + jobMasterGateway, + jobGraph.getJobID(), + new TestingTaskExecutorGatewayBuilder() + .setAddress("firstTaskManager") + .createTestingTaskExecutorGateway(), + new LocalUnresolvedTaskManagerLocation()); + + CommonTestUtils.waitUntilCondition( + () -> + jobMasterGateway.requestJobStatus(testingTimeout).get() + == JobStatus.RUNNING); + CheckpointStatsSnapshot checkpointStatsSnapshot = jobMaster.getGateway().requestCheckpointStats(testingTimeout).get(); @@ -2280,12 +2300,16 @@ private File createSavepoint(long savepointId) throws IOException { savepointId); } - @Nonnull private JobGraph createJobGraphWithCheckpointing( SavepointRestoreSettings savepointRestoreSettings) { + return createJobGraphWithCheckpointing(1, savepointRestoreSettings); + } + + private JobGraph createJobGraphWithCheckpointing( + int parallelism, SavepointRestoreSettings savepointRestoreSettings) { final JobVertex source = new JobVertex("source"); source.setInvokableClass(NoOpInvokable.class); - source.setParallelism(1); + source.setParallelism(parallelism); return TestUtils.createJobGraphFromJobVerticesWithCheckpointing( savepointRestoreSettings, source);