diff --git a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt index c45c1d71b..9cf07f9b8 100644 --- a/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt +++ b/src/main/kotlin/org/opensearch/indexmanagement/rollup/model/Rollup.kt @@ -330,7 +330,7 @@ data class Rollup( // TODO: Make startTime public in Job Scheduler so we can just directly check the value if (seqNo == SequenceNumbers.UNASSIGNED_SEQ_NO || primaryTerm == SequenceNumbers.UNASSIGNED_PRIMARY_TERM) { if (schedule is IntervalSchedule) { - schedule = IntervalSchedule(Instant.now(), schedule.interval, schedule.unit, schedule.delay ?: 0) + schedule = IntervalSchedule(schedule.startTime, schedule.interval, schedule.unit, schedule.delay ?: 0) } } return Rollup( diff --git a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt index e34dd687e..65f18a1dd 100644 --- a/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt +++ b/src/test/kotlin/org/opensearch/indexmanagement/rollup/runner/RollupRunnerIT.kt @@ -639,7 +639,7 @@ class RollupRunnerIT : RollupRestTestCase() { // Tests that a continuous rollup will not be processed until the end of the interval plus delay passes fun `test delaying continuous execution`() { val indexName = "test_index_runner_eighth" - val delay: Long = 15000 + val delay: Long = 7_500 // Define rollup var rollup = randomRollup().copy( id = "$testName-4", @@ -663,40 +663,29 @@ class RollupRunnerIT : RollupRestTestCase() { putDateDocumentInSourceIndex(rollup) // Create rollup job - rollup = createRollup(rollup = rollup, rollupId = rollup.id) + val jobStartTime = Instant.now() + val rollupNow = rollup.copy( + jobSchedule = IntervalSchedule(jobStartTime, 1, ChronoUnit.MINUTES), + jobEnabledTime = jobStartTime + ) + rollup = createRollup(rollup = rollupNow, rollupId = rollupNow.id) - var nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli() - val expectedExecutionTime = rollup.jobEnabledTime!!.plusMillis(delay).toEpochMilli() - val delayIsCorrect = ((expectedExecutionTime - nextExecutionTime) > -500) && ((expectedExecutionTime - nextExecutionTime) < 500) - assertTrue("Delay was not correctly applied", delayIsCorrect) + val expectedFirstExecutionTime = rollup.jobSchedule.getNextExecutionTime(null).toEpochMilli() + assertTrue("The first job execution time should be equal [job start time] + [delay].", expectedFirstExecutionTime == jobStartTime.toEpochMilli() + delay) - waitFor { - // Wait until half a second before the intended execution time - assertTrue(Instant.now().toEpochMilli() >= nextExecutionTime - 500) - // Still should not have run at this point - assertFalse("Target rollup index was created before the delay should allow", indexExists(rollup.targetIndex)) - } - val rollupMetadata = waitFor { + waitFor() { assertTrue("Target rollup index was not created", indexExists(rollup.targetIndex)) val rollupJob = getRollup(rollupId = rollup.id) assertNotNull("Rollup job doesn't have metadata set", rollupJob.metadataID) val rollupMetadata = getRollupMetadata(rollupJob.metadataID!!) assertNotNull("Rollup metadata not found", rollupMetadata) - rollupMetadata } - nextExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli() - val nextExecutionOffset = (nextExecutionTime - Instant.now().toEpochMilli()) - 60000 - val nextExecutionIsCorrect = nextExecutionOffset < 5000 && nextExecutionOffset > -5000 - assertTrue("Next execution time not updated correctly", nextExecutionIsCorrect) - val nextWindowStartTime: Instant = rollupMetadata.continuous!!.nextWindowStartTime - val nextWindowEndTime: Instant = rollupMetadata.continuous!!.nextWindowEndTime - // Assert that after the window was updated, it falls approximately around 'now' - assertTrue("Rollup window start time is incorrect", nextWindowStartTime.plusMillis(delay).minusMillis(1000) < Instant.now()) - assertTrue("Rollup window end time is incorrect", nextWindowEndTime.plusMillis(delay).plusMillis(1000) > Instant.now()) - - // window length should be 5 seconds - val expectedWindowEnd = nextWindowStartTime.plusMillis(5000) - assertEquals("Rollup window length applied incorrectly", expectedWindowEnd, nextWindowEndTime) + + val now = Instant.now().toEpochMilli() + assertTrue("The first job execution must happen after [job start time] + [delay]", now > jobStartTime.toEpochMilli() + delay) + + val secondExecutionTime = rollup.schedule.getNextExecutionTime(null).toEpochMilli() + assertTrue("The second job execution time should be not earlier than a minute after the first execution.", secondExecutionTime - expectedFirstExecutionTime == 60_000L) } fun `test non continuous delay does nothing`() {