diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskReschedulingIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskReschedulingIntTest.java new file mode 100644 index 00000000..495ffd46 --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskReschedulingIntTest.java @@ -0,0 +1,191 @@ +package com.transferwise.tasks.testapp; + +import static com.transferwise.tasks.domain.TaskStatus.WAITING; +import static org.awaitility.Awaitility.await; +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import com.transferwise.common.baseutils.UuidUtils; +import com.transferwise.tasks.BaseIntTest; +import com.transferwise.tasks.ITaskDataSerializer; +import com.transferwise.tasks.ITasksService; +import com.transferwise.tasks.ITasksService.RescheduleTaskResponse.Result; +import com.transferwise.tasks.dao.ITaskDao; +import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskStatus; +import com.transferwise.tasks.management.ITasksManagementService; +import com.transferwise.tasks.test.ITestTasksService; +import io.micrometer.core.instrument.Counter; +import java.time.ZonedDateTime; +import java.util.List; +import java.util.UUID; +import lombok.extern.slf4j.Slf4j; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.params.ParameterizedTest; +import org.junit.jupiter.params.provider.EnumSource; +import org.springframework.beans.factory.annotation.Autowired; + +@Slf4j +public class TaskReschedulingIntTest extends BaseIntTest { + + @Autowired + private ITasksService tasksService; + @Autowired + private ITestTasksService testTasksService; + @Autowired + private ITaskDataSerializer taskDataSerializer; + @Autowired + private ITasksManagementService tasksManagementService; + @Autowired + private ITaskDao taskDao; + + @BeforeEach + void setup() { + transactionsHelper.withTransaction().asNew().call(() -> { + testTasksService.reset(); + return null; + }); + } + + @Test + void taskCanBeSuccessfullyRescheduled() { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I want to be rescheduled")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + + var task = tasksManagementService.getTasksById( + new ITasksManagementService.GetTasksByIdRequest().setTaskIds(List.of(taskId)) + ).getTasks().stream().filter(t -> t.getTaskVersionId().getId().equals(taskId)).findFirst().orElseThrow(); + + assertTrue(transactionsHelper.withTransaction().asNew().call(() -> + tasksService.rescheduleTask( + new ITasksService.RescheduleTaskRequest() + .setTaskId(taskId) + .setVersion(task.getTaskVersionId().getVersion()) + .setRunAfterTime(ZonedDateTime.now().minusHours(1)) + ).getResult() == Result.OK + )); + + await().until(() -> testTasksService.getTasks("test", null, WAITING).isEmpty()); + await().until(() -> resultRegisteringSyncTaskProcessor.getTaskResults().get(taskId) != null); + assertEquals(0, getFailedNextEventTimeChangeCount()); + assertEquals(1, getTaskRescheduledCount()); + } + + @Test + void taskWillNotBeRescheduleIfVersionHasAlreadyChanged() { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + final long initialFailedNextEventTimeChangeCount = getFailedNextEventTimeChangeCount(); + final UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I want to be rescheduled too!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(1))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + + var task = tasksManagementService.getTasksById( + new ITasksManagementService.GetTasksByIdRequest().setTaskIds(List.of(taskId)) + ).getTasks().stream().filter(t -> t.getTaskVersionId().getId().equals(taskId)).findFirst().orElseThrow(); + + assertFalse( + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.rescheduleTask( + new ITasksService.RescheduleTaskRequest() + .setTaskId(taskId) + .setVersion(task.getTaskVersionId().getVersion() - 1) + .setRunAfterTime(ZonedDateTime.now().plusHours(2)) + ).getResult() == Result.OK + ) + ); + assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedNextEventTimeChangeCount()); + assertEquals(0, getTaskRescheduledCount()); + } + + @ParameterizedTest + @EnumSource(value = TaskStatus.class, + names = {"WAITING", "UNKNOWN"}, + mode = EnumSource.Mode.EXCLUDE) + void taskWillNotBeRescheduleIfNotWaiting(TaskStatus status) { + testTaskHandlerAdapter.setProcessor(resultRegisteringSyncTaskProcessor); + final long initialFailedNextEventTimeChangeCount = getFailedNextEventTimeChangeCount(); + final UUID taskId = UuidUtils.generatePrefixCombUuid(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.addTask(new ITasksService.AddTaskRequest() + .setTaskId(taskId) + .setData(taskDataSerializer.serialize("I do not want to be rescheduled!")) + .setType("test").setRunAfterTime(ZonedDateTime.now().plusHours(2))) + ); + + await().until(() -> !testTasksService.getWaitingTasks("test", null).isEmpty()); + List tasks = testTasksService.getWaitingTasks("test", null); + Task task = tasks.stream().filter(t -> t.getId().equals(taskId)).findFirst().orElseThrow(); + + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.resumeTask(new ITasksService.ResumeTaskRequest().setTaskId(taskId).setVersion(task.getVersion())) + ); + + await().until(() -> testTasksService.getWaitingTasks("test", null).isEmpty()); + + var updateTask = tasksManagementService.getTasksById( + new ITasksManagementService.GetTasksByIdRequest().setTaskIds(List.of(taskId)) + ).getTasks().stream().filter(t -> t.getTaskVersionId().getId().equals(taskId)).findFirst().orElseThrow(); + + taskDao.setStatus(taskId, status, updateTask.getTaskVersionId().getVersion()); + + var finalTask = tasksManagementService.getTasksById( + new ITasksManagementService.GetTasksByIdRequest().setTaskIds(List.of(taskId)) + ).getTasks().stream().filter(t -> t.getTaskVersionId().getId().equals(taskId)).findFirst().orElseThrow(); + + assertFalse( + transactionsHelper.withTransaction().asNew().call(() -> + tasksService.rescheduleTask( + new ITasksService.RescheduleTaskRequest() + .setTaskId(taskId) + .setVersion(finalTask.getTaskVersionId().getVersion()) + .setRunAfterTime(ZonedDateTime.now().plusHours(2)) + ).getResult() == Result.OK + ) + ); + assertEquals(initialFailedNextEventTimeChangeCount + 1, getFailedNextEventTimeChangeCount()); + assertEquals(0, getTaskRescheduledCount()); + } + + private long getFailedNextEventTimeChangeCount() { + Counter counter = meterRegistry.find("twTasks.tasks.failedNextEventTimeChangeCount").tags( + "taskType", "test" + ).counter(); + + if (counter == null) { + return 0; + } else { + return (long) counter.count(); + } + } + + private long getTaskRescheduledCount() { + Counter counter = meterRegistry.find("twTasks.tasks.rescheduledCount").tags( + "taskType", "test" + ).counter(); + + if (counter == null) { + return 0; + } else { + return (long) counter.count(); + } + } +} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index 5ce12be2..7ba9a118 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -83,6 +83,32 @@ class ResumeTaskRequest { private boolean force; } + /** + * Reschedules a task in WAITING state. It is useful, when you want to change the next time the task is executed. + * + *

If the task in another state NOT_ALLOWED would be returned. + */ + RescheduleTaskResponse rescheduleTask(RescheduleTaskRequest request); + + @Data + @Accessors(chain = true) + class RescheduleTaskRequest { + private UUID taskId; + private long version; + private ZonedDateTime runAfterTime; + } + + @Data + @Accessors(chain = true) + class RescheduleTaskResponse { + private UUID taskId; + private Result result; + + public enum Result { + OK, NOT_FOUND, NOT_ALLOWED, FAILED + } + } + void startTasksProcessing(String bucketId); Future stopTasksProcessing(String bucketId); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 79a0fd3d..3415fdcc 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -3,9 +3,11 @@ import com.transferwise.common.context.TwContextClockHolder; import com.transferwise.common.context.UnitOfWorkManager; import com.transferwise.common.gracefulshutdown.GracefulShutdownStrategy; +import com.transferwise.tasks.ITasksService.RescheduleTaskResponse.Result; import com.transferwise.tasks.dao.ITaskDao; import com.transferwise.tasks.domain.BaseTask; import com.transferwise.tasks.domain.BaseTask1; +import com.transferwise.tasks.domain.FullTaskRecord; import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.entrypoints.EntryPoint; import com.transferwise.tasks.entrypoints.EntryPointsGroups; @@ -189,6 +191,47 @@ public boolean resumeTask(ResumeTaskRequest request) { }); } + @Override + @EntryPoint(usesExisting = true) + @Transactional(rollbackFor = Exception.class) + public RescheduleTaskResponse rescheduleTask(RescheduleTaskRequest request) { + return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.RESCHEDULE_TASK, + () -> { + UUID taskId = request.getTaskId(); + mdcService.put(request.getTaskId(), request.getVersion()); + + FullTaskRecord task = taskDao.getTask(taskId, FullTaskRecord.class); + + if (task == null) { + log.debug("Cannot reschedule task '" + taskId + "' as it was not found."); + return new RescheduleTaskResponse().setResult(Result.NOT_FOUND).setTaskId(taskId); + } + + mdcService.put(task); + + long version = task.getVersion(); + + if (version != request.getVersion()) { + coreMetricsTemplate.registerFailedNextEventTimeChange(task.getType(), task.getNextEventTime(), request.getRunAfterTime()); + log.debug("Expected version " + request.getVersion() + " does not match " + version + "."); + return new RescheduleTaskResponse().setResult(Result.NOT_FOUND).setTaskId(taskId); + } + + if (task.getStatus().equals(TaskStatus.WAITING.name())) { + if (!taskDao.setNextEventTime(taskId, request.getRunAfterTime(), version, TaskStatus.WAITING.name())) { + coreMetricsTemplate.registerFailedNextEventTimeChange(task.getType(), task.getNextEventTime(), request.getRunAfterTime()); + return new RescheduleTaskResponse().setResult(RescheduleTaskResponse.Result.FAILED).setTaskId(taskId); + } else { + coreMetricsTemplate.registerTaskRescheduled(null, task.getType()); + return new RescheduleTaskResponse().setResult(RescheduleTaskResponse.Result.OK).setTaskId(taskId); + } + } + + coreMetricsTemplate.registerFailedNextEventTimeChange(task.getType(), task.getNextEventTime(), request.getRunAfterTime()); + return new RescheduleTaskResponse().setResult(RescheduleTaskResponse.Result.NOT_ALLOWED).setTaskId(taskId); + }); + } + @Override public void startTasksProcessing(String bucketId) { tasksExecutionTriggerer.startTasksProcessing(bucketId); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java index e4670898..c5470d9c 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java @@ -116,6 +116,8 @@ class DeleteFinishedOldTasksResult { boolean setStatus(UUID taskId, TaskStatus status, long version); + boolean setNextEventTime(UUID taskId, ZonedDateTime nextEventTime, long version, String state); + boolean markAsSubmitted(UUID taskId, long version, ZonedDateTime maxStuckTime); Long getTaskVersion(UUID id); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java index 17c81f80..558460b7 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java @@ -99,6 +99,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String grabForProcessingWithStatusAssertionSql; protected String grabForProcessingSql; protected String setStatusSql; + protected String setNextEventTimeSql; protected String getStuckTasksSql; protected String prepareStuckOnProcessingTaskForResumingSql; protected String prepareStuckOnProcessingTaskForResumingSql1; @@ -157,6 +158,8 @@ public void afterPropertiesSet() { + ",processing_start_time=?,next_event_time=?,processing_tries_count=processing_tries_count+1" + ",state_time=?,time_updated=?,version=? where id=? and version=?"; setStatusSql = "update " + taskTable + " set status=?,next_event_time=?,state_time=?,time_updated=?,version=? where id=? and version=?"; + setNextEventTimeSql = "update " + taskTable + + " set next_event_time=?,state_time=?,time_updated=?,version=? where id=? and version=? and status=?"; getStuckTasksSql = "select id,version,type,priority,status from " + taskTable + " where status=?" + " and next_event_time