From cc24ecaf2e6ead80d0bcdaf9db9c4a870e8338e7 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Fri, 9 Aug 2024 00:55:58 +0300 Subject: [PATCH] jambi --- .../tasks/testapp/TaskDataIntTest.java | 69 ++++++++++++++++++ .../testapp/TaskInterceptionIntTest.java | 28 ++++++++ .../testapp/config/TestConfiguration.java | 14 ++++ .../testbeans/JambiTaskInterceptor.java | 25 +++++++ .../JambiTaskRegistrationDecorator.java | 19 +++++ .../com/transferwise/tasks/ITasksService.java | 6 ++ .../com/transferwise/tasks/TasksService.java | 25 +++++-- .../com/transferwise/tasks/dao/ITaskDao.java | 2 + .../transferwise/tasks/dao/JdbcTaskDao.java | 59 ++++++++++----- .../tasks/domain/FullTaskRecord.java | 4 +- .../com/transferwise/tasks/domain/Task.java | 2 + .../tasks/domain/TaskContext.java | 28 ++++++++ .../ITaskRegistrationDecorator.java | 8 +++ .../db/changelog/db.tw-tasks-mysql.xml | 60 +++++++++------- .../db/changelog/db.tw-tasks-postgres.xml | 71 ++++++++++--------- 15 files changed, 337 insertions(+), 83 deletions(-) create mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java create mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java create mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java create mode 100644 tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java create mode 100644 tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java index 60c14ba8..45a6e194 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java @@ -1,30 +1,40 @@ package com.transferwise.tasks.testapp; +import static com.transferwise.tasks.dao.JdbcTaskDao.NULL_BLOB; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.transferwise.tasks.BaseIntTest; import com.transferwise.tasks.CompressionAlgorithm; +import com.transferwise.tasks.ITaskDataSerializer; import com.transferwise.tasks.ITasksService.AddTaskRequest; import com.transferwise.tasks.ITasksService.AddTaskRequest.CompressionRequest; import com.transferwise.tasks.ITasksService.AddTaskResponse; +import com.transferwise.tasks.TaskDataSerializer; import com.transferwise.tasks.TasksProperties; import com.transferwise.tasks.dao.ITaskDao; +import com.transferwise.tasks.dao.ITaskDaoDataSerializer; import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData; import com.transferwise.tasks.dao.ITaskSqlMapper; import com.transferwise.tasks.dao.MySqlTaskTypesMapper; import com.transferwise.tasks.domain.FullTaskRecord; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.test.ITestTasksService; import com.transferwise.tasks.test.dao.ITestTaskDao; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.UUID; import java.util.stream.Stream; +import lombok.SneakyThrows; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; @@ -43,6 +53,10 @@ public class TaskDataIntTest extends BaseIntTest { private TasksProperties tasksProperties; @Autowired private JdbcTemplate jdbcTemplate; + @Autowired + private ObjectMapper objectMapper; + @Autowired + private ITaskDaoDataSerializer taskDataSerializer; private CompressionAlgorithm originalAlgorithm; private int originalMinSize; @@ -137,4 +151,59 @@ void oldDataFieldIsNotSetForNewTasks() { assertThat(oldData).isEqualTo(""); } + @ParameterizedTest + @MethodSource("providedAlgorithms") + @SneakyThrows + void testWritingTaskContextData() { + testTasksService.stopProcessing(); + + String data = "Hello World!"; + var context = new TaskContext().setContextMap(Map.of("adam-jones", "jambi")); + AddTaskRequest addTaskRequest = new AddTaskRequest() + .setType("test_data") + .setData(data.getBytes(StandardCharsets.UTF_8)) + .setTaskContext(context); + AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest); + UUID taskId = addTaskResponse.getTaskId(); + + FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class); + assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8)); + + byte[] contextBlob = jdbcTemplate + .queryForObject("select task_context from tw_task_data where task_id=?", byte[].class, taskSqlMapper.uuidToSqlTaskId(taskId)); + + TaskContext contextData = objectMapper.readValue(contextBlob, TaskContext.class); + assertEquals(Map.of("adam-jones", "jambi"), contextData.getContextMap()); + + } + + + @ParameterizedTest + @MethodSource("providedAlgorithms") + void testReadingTaskContextData(String algorithm) { + testTasksService.stopProcessing(); + + String data = "Hello World!"; + var context = new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")); + AddTaskRequest addTaskRequest = new AddTaskRequest() + .setType("test_data") + .setData(data.getBytes(StandardCharsets.UTF_8)) + .setTaskContext(context) + .setCompression(algorithm == null ? null : new CompressionRequest().setAlgorithm(CompressionAlgorithm.valueOf(algorithm))); + AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest); + UUID taskId = addTaskResponse.getTaskId(); + + FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class); + assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8)); + assertEquals(Map.of("danny-carey", "the-grudge"), fullTaskRecord.getTaskContext().getContextMap()); + + Task taskRecord = taskDao.getTask(taskId, Task.class); + assertEquals(new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")), taskRecord.getTaskContext()); + + } + + private static Stream providedAlgorithms() { + return Stream.of("GZIP", "LZ4", null, "NONE"); + } + } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java new file mode 100644 index 00000000..160971eb --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java @@ -0,0 +1,28 @@ +package com.transferwise.tasks.testapp; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.transferwise.common.baseutils.UuidUtils; +import com.transferwise.tasks.BaseIntTest; +import com.transferwise.tasks.ITasksService.AddTaskRequest; +import com.transferwise.tasks.TasksService; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +public class TaskInterceptionIntTest extends BaseIntTest { + + @Autowired + private TasksService tasksService; + + @Test + void jambiTaskIsInterceptedCorrectly() { + testTasksService.resetAndDeleteTasksWithTypes("test"); + tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi")); + tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi")); + Awaitility.await().until(() -> testTasksService.getFinishedTasks("test", "Jambi").size() == 2); + assertEquals(2, meterRegistry.counter("tool", "song", "eulogy").count()); + + } + +} diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java index 5e42c05b..2b8c4136 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java @@ -9,7 +9,11 @@ import com.transferwise.tasks.helpers.kafka.messagetotask.IKafkaMessageHandler; import com.transferwise.tasks.impl.jobs.interfaces.IJob; import com.transferwise.tasks.processing.ITaskProcessingInterceptor; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; +import com.transferwise.tasks.testapp.testbeans.JambiTaskInterceptor; +import com.transferwise.tasks.testapp.testbeans.JambiTaskRegistrationDecorator; import com.transferwise.tasks.utils.LogUtils; +import io.micrometer.core.instrument.MeterRegistry; import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collections; @@ -136,4 +140,14 @@ public IKafkaListenerConsumerPropertiesProvider twTasksKafkaListenerSpringKafkaC return props; }; } + + @Bean + ITaskRegistrationDecorator jambiRegistrationInterceptor() { + return new JambiTaskRegistrationDecorator(); + } + + @Bean + ITaskProcessingInterceptor jambiProcessingInterceptor(MeterRegistry meterRegistry) { + return new JambiTaskInterceptor(meterRegistry); + } } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java new file mode 100644 index 00000000..36945087 --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java @@ -0,0 +1,25 @@ +package com.transferwise.tasks.testapp.testbeans; + +import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.processing.ITaskProcessingInterceptor; +import io.micrometer.core.instrument.MeterRegistry; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class JambiTaskInterceptor implements ITaskProcessingInterceptor { + + private final MeterRegistry meterRegistry; + + @Override + public void doProcess(Task task, Runnable processor) { + if ("Jambi".equals(task.getSubType())) { + if (task.getTaskContext() != null) { + var name = task.getTaskContext().get("adam-jones", String.class); + meterRegistry.counter("tool", "song", name).increment(); + } + } + processor.run(); + } +} diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java new file mode 100644 index 00000000..59faa5bc --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java @@ -0,0 +1,19 @@ +package com.transferwise.tasks.testapp.testbeans; + +import com.transferwise.tasks.ITasksService.AddTaskRequest; +import com.transferwise.tasks.domain.TaskContext; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class JambiTaskRegistrationDecorator implements ITaskRegistrationDecorator { + + @Override + public AddTaskRequest intercept(AddTaskRequest request) { + if ("Jambi".equals(request.getSubType())) { + return request.setTaskContext(new TaskContext().setContextMap(Map.of("adam-jones", "eulogy"))); + } + return request; + } +} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index dce558f7..0515004c 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -1,6 +1,7 @@ package com.transferwise.tasks; import com.transferwise.tasks.ITasksService.AddTaskResponse.Result; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Duration; @@ -45,6 +46,7 @@ class AddTaskRequest { private boolean warnWhenTaskExists; private Duration expectedQueueTime; private CompressionRequest compression; + private TaskContext taskContext; @Data @Accessors(chain = true) @@ -95,6 +97,7 @@ class ResumeTaskRequest { @Data @Accessors(chain = true) class RescheduleTaskRequest { + private UUID taskId; private long version; private ZonedDateTime runAfterTime; @@ -103,6 +106,7 @@ class RescheduleTaskRequest { @Data @Accessors(chain = true) class RescheduleTaskResponse { + private UUID taskId; private Result result; @@ -116,12 +120,14 @@ public enum Result { @Data @Accessors(chain = true) class GetTaskRequest { + private UUID taskId; } @Data @Accessors(chain = true) class GetTaskResponse { + private UUID taskId; private String type; private long version; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 51bf53d2..6bef8fc2 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -16,9 +16,12 @@ import com.transferwise.tasks.handler.interfaces.ITaskHandlerRegistry; import com.transferwise.tasks.helpers.ICoreMetricsTemplate; import com.transferwise.tasks.helpers.executors.IExecutorsHelper; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; import com.transferwise.tasks.triggering.ITasksExecutionTriggerer; import com.transferwise.tasks.utils.LogUtils; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -58,6 +61,8 @@ public class TasksService implements ITasksService, GracefulShutdownStrategy, In private IEnvironmentValidator environmentValidator; @Autowired private ICoreMetricsTemplate coreMetricsTemplate; + @Autowired(required = false) + private List taskRegistrationInterceptors = new ArrayList<>(); private ExecutorService afterCommitExecutorService; private TxSyncAdapterFactory txSyncAdapterFactory; @@ -85,12 +90,18 @@ public void afterPropertiesSet() { @Override @EntryPoint(usesExisting = true) @Transactional(rollbackFor = Exception.class) - public AddTaskResponse addTask(AddTaskRequest request) { + public AddTaskResponse addTask(AddTaskRequest requestParam) { return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.ADD_TASK, () -> { + AddTaskRequest request = requestParam; mdcService.put(request.getTaskId(), 0L); mdcService.putType(request.getType()); mdcService.putSubType(request.getSubType()); + + for (ITaskRegistrationDecorator interceptor : taskRegistrationInterceptors) { + request = interceptor.intercept(request); + } + ZonedDateTime now = ZonedDateTime.now(TwContextClockHolder.getClock()); final TaskStatus status = request.getRunAfterTime() == null || !request.getRunAfterTime().isAfter(now) ? TaskStatus.SUBMITTED : TaskStatus.WAITING; @@ -102,17 +113,19 @@ public AddTaskResponse addTask(AddTaskRequest request) { ZonedDateTime maxStuckTime = request.getExpectedQueueTime() == null ? now.plus(tasksProperties.getTaskStuckTimeout()) : now.plus(request.getExpectedQueueTime()); - byte[] data = request.getData(); + ITaskDao.InsertTaskResponse insertTaskResponse = taskDao.insertTask( - new ITaskDao.InsertTaskRequest().setData(data).setKey(request.getUniqueKey()) + new ITaskDao.InsertTaskRequest().setData(request.getData()).setKey(request.getUniqueKey()) .setRunAfterTime(request.getRunAfterTime()) .setSubType(request.getSubType()) .setType(request.getType()).setTaskId(request.getTaskId()) .setMaxStuckTime(maxStuckTime).setStatus(status).setPriority(priority) - .setCompression(request.getCompression())); + .setCompression(request.getCompression()) + .setTaskContext(request.getTaskContext()) + ); - coreMetricsTemplate - .registerTaskAdding(request.getType(), request.getUniqueKey(), insertTaskResponse.isInserted(), request.getRunAfterTime(), data); + coreMetricsTemplate.registerTaskAdding(request.getType(), request.getUniqueKey(), + insertTaskResponse.isInserted(), request.getRunAfterTime(), request.getData()); if (!insertTaskResponse.isInserted()) { coreMetricsTemplate.registerDuplicateTask(request.getType(), !request.isWarnWhenTaskExists()); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java index c5470d9c..4522fa1d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java @@ -4,6 +4,7 @@ import com.transferwise.tasks.domain.BaseTask; import com.transferwise.tasks.domain.IBaseTask; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.domain.TaskVersionId; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -54,6 +55,7 @@ class InsertTaskRequest { private ZonedDateTime maxStuckTime; private Integer priority; private CompressionRequest compression; + private TaskContext taskContext; } @Data diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java index 558460b7..3ab66017 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java @@ -2,6 +2,7 @@ import static com.transferwise.tasks.utils.TimeUtils.toZonedDateTime; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.transferwise.common.baseutils.ExceptionUtils; import com.transferwise.common.baseutils.UuidUtils; @@ -12,6 +13,7 @@ import com.transferwise.tasks.domain.BaseTask1; import com.transferwise.tasks.domain.FullTaskRecord; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.domain.TaskVersionId; import com.transferwise.tasks.helpers.ICoreMetricsTemplate; @@ -30,6 +32,7 @@ import java.time.Instant; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,6 +77,8 @@ public abstract class JdbcTaskDao implements ITaskDao, InitializingBean { protected ITaskDaoDataSerializer taskDataSerializer; @Autowired protected ICoreMetricsTemplate coreMetricsTemplate; + @Autowired + protected ObjectMapper objectMapper; private final ConcurrentHashMap sqlCache = new ConcurrentHashMap<>(); @@ -94,6 +99,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String insertTaskSql; protected String insertUniqueTaskKeySql; protected String insertTaskDataSql; + protected String insertTaskContext; protected String setToBeRetriedSql; protected String setToBeRetriedSql1; protected String grabForProcessingWithStatusAssertionSql; @@ -130,7 +136,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String getApproximateTaskDatasCountSql1; protected final int[] questionBuckets = {1, 5, 25, 125, 625}; - + public static final byte[] NULL_BLOB = "®".getBytes(StandardCharsets.UTF_8); protected final TaskStatus[] stuckStatuses = new TaskStatus[]{TaskStatus.NEW, TaskStatus.SUBMITTED, TaskStatus.WAITING, TaskStatus.PROCESSING}; protected ITwTaskTables twTaskTables(TasksProperties tasksProperties) { @@ -147,7 +153,7 @@ public void afterPropertiesSet() { insertTaskSql = "insert ignore into " + taskTable + "(id,type,sub_type,status,data,next_event_time" + ",state_time,time_created,time_updated,processing_tries_count,version,priority) values (?,?,?,?,?,?,?,?,?,?,?,?)"; insertUniqueTaskKeySql = "insert ignore into " + uniqueTaskKeyTable + "(task_id,key_hash,`key`) values (?, ?, ?)"; - insertTaskDataSql = "insert into " + taskDataTable + "(task_id,data_format,data) values (?,?,?)"; + insertTaskDataSql = "insert into " + taskDataTable + "(task_id,data_format,data,task_context_format,task_context) values (?,?,?,?,?)"; setToBeRetriedSql = "update " + taskTable + " set status=?,next_event_time=?,state_time=?,time_updated=?,version=? where id=? and version=?"; setToBeRetriedSql1 = "update " + taskTable + " set status=?,next_event_time=?" + ",processing_tries_count=?,state_time=?,time_updated=?,version=? where id=? and version=?"; @@ -174,11 +180,12 @@ public void afterPropertiesSet() { getStuckTasksCountGroupedSql = "select type, count(*) from (select type from " + taskTable + " where status=?" + " and next_event_time T getTask(UUID taskId, Class clazz) { } else if (clazz.equals(Task.class)) { List result = jdbcTemplate.query(getTaskSql1, args(taskId), (rs, rowNum) -> { byte[] data = getData(rs, 7, 9, 10); + TaskContext context = getContext(rs, 11, 12); return new Task().setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) .setVersion(rs.getLong(2)).setType(rs.getString(3)) .setStatus(rs.getString(4)).setPriority(rs.getInt(5)) .setSubType(rs.getString(6)).setData(data) - .setProcessingTriesCount(rs.getLong(8)); + .setProcessingTriesCount(rs.getLong(8)) + .setTaskContext(context); }); return (T) getFirst(result); } else if (clazz.equals(FullTaskRecord.class)) { List result = jdbcTemplate.query(getTaskSql2, args(taskId), (rs, rowNum) -> { byte[] data = getData(rs, 7, 12, 13); + TaskContext context = getContext(rs, 14, 15); return new FullTaskRecord().setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) .setVersion(rs.getLong(2)).setType(rs.getString(3)) .setStatus(rs.getString(4)).setPriority(rs.getInt(5)) @@ -475,7 +488,8 @@ public T getTask(UUID taskId, Class clazz) { .setProcessingTriesCount(rs.getLong(8)) .setStateTime(toZonedDateTime(rs.getTimestamp(9))) .setNextEventTime(toZonedDateTime(rs.getTimestamp(10))) - .setProcessingClientId(rs.getString(11)); + .setProcessingClientId(rs.getString(11)) + .setTaskContext(context); }); return (T) getFirst(result); } else { @@ -716,9 +730,22 @@ protected void assertIsolationLevel(Isolation isolation) { } } + protected TaskContext getContext(ResultSet rs, int contextFormatIdx, int contextIdx) throws SQLException { + return ExceptionUtils.doUnchecked(() -> { + var blob = taskDataSerializer.deserialize(new SerializedData() + .setDataFormat(rs.getInt(contextFormatIdx)) + .setData(rs.getBytes(contextIdx)) + ); + if (blob == null) { + return null; + } + return objectMapper.readValue(blob, TaskContext.class); + }); + } + protected byte[] getData(ResultSet rs, int deprecatedDataIdx, int dataFormatIdx, int dataIdx) throws SQLException { byte[] data = rs.getBytes(dataIdx); - if (data != null) { + if (data != null && !Arrays.equals(NULL_BLOB, data)) { return taskDataSerializer.deserialize(new SerializedData().setDataFormat(rs.getInt(dataFormatIdx)).setData(data)); } else { String deprecatedData = rs.getString(deprecatedDataIdx); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java index 4364e4bd..cc286d1d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java @@ -2,6 +2,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.ZonedDateTime; +import java.util.Objects; import java.util.UUID; import lombok.Data; import lombok.experimental.Accessors; @@ -22,7 +23,8 @@ public class FullTaskRecord implements ITask { private ZonedDateTime stateTime; private ZonedDateTime nextEventTime; private String processingClientId; - + private TaskContext taskContext; + @Override public ITaskVersionId getVersionId() { return new TaskVersionId(id, version); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java index 2e97e061..03f95d96 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java @@ -1,6 +1,7 @@ package com.transferwise.tasks.domain; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Objects; import java.util.UUID; import lombok.Data; import lombok.experimental.Accessors; @@ -18,6 +19,7 @@ public class Task implements ITask { private long version; private long processingTriesCount; private int priority; + private TaskContext taskContext; // TODO: We should create an interface instead. public BaseTask toBaseTask() { diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java new file mode 100644 index 00000000..add371a7 --- /dev/null +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java @@ -0,0 +1,28 @@ +package com.transferwise.tasks.domain; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import lombok.Data; +import lombok.experimental.Accessors; + +@Data +@Accessors(chain = true) +public class TaskContext { + + public static final TaskContext EMPTY = new TaskContext(); + + private Map contextMap = new HashMap<>(); + + public T get(String key, Class cls) { + return Optional.ofNullable(contextMap.get(key)).map(cls::cast).orElse(null); + } + + public void merge(TaskContext taskContext) { + if (taskContext != null) { + if (taskContext.contextMap != null) { + contextMap.putAll(taskContext.contextMap); + } + } + } +} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java new file mode 100644 index 00000000..d593c600 --- /dev/null +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java @@ -0,0 +1,8 @@ +package com.transferwise.tasks.processing; + +import com.transferwise.tasks.ITasksService.AddTaskRequest; + +public interface ITaskRegistrationDecorator { + + AddTaskRequest intercept(AddTaskRequest request); +} diff --git a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml index 224c13b6..00628713 100644 --- a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml +++ b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml @@ -1,41 +1,47 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd"> - CREATE TABLE tw_task ( - id BINARY(16) PRIMARY KEY NOT NULL, - status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), - -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 - next_event_time DATETIME(6) NOT NULL, - state_time DATETIME(6) NOT NULL, - version BIGINT NOT NULL, - priority INT NOT NULL DEFAULT 5, - processing_start_time DATETIME(6) NULL, - processing_tries_count BIGINT NOT NULL, - time_created DATETIME(6) NOT NULL, - time_updated DATETIME(6) NOT NULL, - type VARCHAR(250) CHARACTER SET latin1 NOT NULL, - sub_type VARCHAR(250) CHARACTER SET latin1 NULL, - processing_client_id VARCHAR(250) CHARACTER SET latin1 NULL, - data LONGTEXT NOT NULL); + CREATE TABLE tw_task + ( + id BINARY(16) PRIMARY KEY NOT NULL, + status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), + -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 + next_event_time DATETIME(6) NOT NULL, + state_time DATETIME(6) NOT NULL, + version BIGINT NOT NULL, + priority INT NOT NULL DEFAULT 5, + processing_start_time DATETIME(6) NULL, + processing_tries_count BIGINT NOT NULL, + time_created DATETIME(6) NOT NULL, + time_updated DATETIME(6) NOT NULL, + type VARCHAR(250) CHARACTER SET latin1 NOT NULL, + sub_type VARCHAR(250) CHARACTER SET latin1 NULL, + processing_client_id VARCHAR(250) CHARACTER SET latin1 NULL, + data LONGTEXT NOT NULL + ); CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); - CREATE TABLE tw_task_data ( - task_id BINARY(16) PRIMARY KEY NOT NULL, - data_format INT NOT NULL, - data LONGBLOB NOT NULL + CREATE TABLE tw_task_data + ( + task_id BINARY(16) PRIMARY KEY NOT NULL, + data_format INT NOT NULL, + data LONGBLOB NOT NULL, + task_context_format SMALLINT, + task_context BLOB ); - CREATE TABLE unique_tw_task_key ( - task_id BINARY(16) PRIMARY KEY, - key_hash INT NOT NULL, - `key` VARCHAR(150) CHARACTER SET latin1 NOT NULL, - UNIQUE KEY uidx1 (key_hash, `key`) + CREATE TABLE unique_tw_task_key + ( + task_id BINARY(16) PRIMARY KEY, + key_hash INT NOT NULL, + `key` VARCHAR(150) CHARACTER SET latin1 NOT NULL, + UNIQUE KEY uidx1 (key_hash, `key`) ); diff --git a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml index b7364a06..39cd09f2 100644 --- a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml +++ b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml @@ -4,41 +4,46 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd"> - - - CREATE TABLE tw_task ( - id UUID PRIMARY KEY, - type TEXT NOT NULL, - sub_type TEXT NULL, - status TEXT NOT NULL, - data TEXT NOT NULL, - next_event_time TIMESTAMPTZ(6) NOT NULL, - state_time TIMESTAMPTZ(3) NOT NULL, - processing_client_id TEXT NULL, - processing_start_time TIMESTAMPTZ(3) NULL, - time_created TIMESTAMPTZ(3) NOT NULL, - time_updated TIMESTAMPTZ(3) NOT NULL, - processing_tries_count BIGINT NOT NULL, - version BIGINT NOT NULL, - priority INT NOT NULL DEFAULT 5 - ); + + + CREATE TABLE tw_task + ( + id UUID PRIMARY KEY, + type TEXT NOT NULL, + sub_type TEXT NULL, + status TEXT NOT NULL, + data TEXT NOT NULL, + next_event_time TIMESTAMPTZ(6) NOT NULL, + state_time TIMESTAMPTZ(3) NOT NULL, + processing_client_id TEXT NULL, + processing_start_time TIMESTAMPTZ(3) NULL, + time_created TIMESTAMPTZ(3) NOT NULL, + time_updated TIMESTAMPTZ(3) NOT NULL, + processing_tries_count BIGINT NOT NULL, + version BIGINT NOT NULL, + priority INT NOT NULL DEFAULT 5 + ); - CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); + CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); - CREATE TABLE tw_task_data ( - task_id UUID PRIMARY KEY NOT NULL, - data_format INT NOT NULL, - data BYTEA NOT NULL - ) WITH (toast_tuple_target=8160); + CREATE TABLE tw_task_data + ( + task_id UUID PRIMARY KEY NOT NULL, + data_format INT NOT NULL, + data BYTEA NOT NULL, + task_context_format SMALLINT, + task_context BYTEA + ) WITH (toast_tuple_target = 8160); - ALTER TABLE tw_task_data ALTER COLUMN data SET STORAGE EXTERNAL; + ALTER TABLE tw_task_data ALTER COLUMN data SET STORAGE EXTERNAL; - CREATE TABLE unique_tw_task_key ( - task_id UUID PRIMARY KEY NOT NULL, - key_hash INT NOT NULL, - key TEXT NOT NULL, - unique (key_hash, key) - ); - - + CREATE TABLE unique_tw_task_key + ( + task_id UUID PRIMARY KEY NOT NULL, + key_hash INT NOT NULL, + key TEXT NOT NULL, + unique (key_hash, key) + ); + +