From cc24ecaf2e6ead80d0bcdaf9db9c4a870e8338e7 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Fri, 9 Aug 2024 00:55:58 +0300 Subject: [PATCH 1/7] jambi --- .../tasks/testapp/TaskDataIntTest.java | 69 ++++++++++++++++++ .../testapp/TaskInterceptionIntTest.java | 28 ++++++++ .../testapp/config/TestConfiguration.java | 14 ++++ .../testbeans/JambiTaskInterceptor.java | 25 +++++++ .../JambiTaskRegistrationDecorator.java | 19 +++++ .../com/transferwise/tasks/ITasksService.java | 6 ++ .../com/transferwise/tasks/TasksService.java | 25 +++++-- .../com/transferwise/tasks/dao/ITaskDao.java | 2 + .../transferwise/tasks/dao/JdbcTaskDao.java | 59 ++++++++++----- .../tasks/domain/FullTaskRecord.java | 4 +- .../com/transferwise/tasks/domain/Task.java | 2 + .../tasks/domain/TaskContext.java | 28 ++++++++ .../ITaskRegistrationDecorator.java | 8 +++ .../db/changelog/db.tw-tasks-mysql.xml | 60 +++++++++------- .../db/changelog/db.tw-tasks-postgres.xml | 71 ++++++++++--------- 15 files changed, 337 insertions(+), 83 deletions(-) create mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java create mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java create mode 100644 integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java create mode 100644 tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java create mode 100644 tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java index 60c14ba8..45a6e194 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java @@ -1,30 +1,40 @@ package com.transferwise.tasks.testapp; +import static com.transferwise.tasks.dao.JdbcTaskDao.NULL_BLOB; import static org.assertj.core.api.Assertions.assertThat; +import static org.junit.jupiter.api.Assertions.assertEquals; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Strings; import com.transferwise.tasks.BaseIntTest; import com.transferwise.tasks.CompressionAlgorithm; +import com.transferwise.tasks.ITaskDataSerializer; import com.transferwise.tasks.ITasksService.AddTaskRequest; import com.transferwise.tasks.ITasksService.AddTaskRequest.CompressionRequest; import com.transferwise.tasks.ITasksService.AddTaskResponse; +import com.transferwise.tasks.TaskDataSerializer; import com.transferwise.tasks.TasksProperties; import com.transferwise.tasks.dao.ITaskDao; +import com.transferwise.tasks.dao.ITaskDaoDataSerializer; import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData; import com.transferwise.tasks.dao.ITaskSqlMapper; import com.transferwise.tasks.dao.MySqlTaskTypesMapper; import com.transferwise.tasks.domain.FullTaskRecord; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.test.ITestTasksService; import com.transferwise.tasks.test.dao.ITestTaskDao; import java.nio.charset.StandardCharsets; +import java.util.Map; import java.util.UUID; import java.util.stream.Stream; +import lombok.SneakyThrows; import org.junit.jupiter.api.AfterEach; import org.junit.jupiter.api.BeforeEach; import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; +import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; @@ -43,6 +53,10 @@ public class TaskDataIntTest extends BaseIntTest { private TasksProperties tasksProperties; @Autowired private JdbcTemplate jdbcTemplate; + @Autowired + private ObjectMapper objectMapper; + @Autowired + private ITaskDaoDataSerializer taskDataSerializer; private CompressionAlgorithm originalAlgorithm; private int originalMinSize; @@ -137,4 +151,59 @@ void oldDataFieldIsNotSetForNewTasks() { assertThat(oldData).isEqualTo(""); } + @ParameterizedTest + @MethodSource("providedAlgorithms") + @SneakyThrows + void testWritingTaskContextData() { + testTasksService.stopProcessing(); + + String data = "Hello World!"; + var context = new TaskContext().setContextMap(Map.of("adam-jones", "jambi")); + AddTaskRequest addTaskRequest = new AddTaskRequest() + .setType("test_data") + .setData(data.getBytes(StandardCharsets.UTF_8)) + .setTaskContext(context); + AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest); + UUID taskId = addTaskResponse.getTaskId(); + + FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class); + assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8)); + + byte[] contextBlob = jdbcTemplate + .queryForObject("select task_context from tw_task_data where task_id=?", byte[].class, taskSqlMapper.uuidToSqlTaskId(taskId)); + + TaskContext contextData = objectMapper.readValue(contextBlob, TaskContext.class); + assertEquals(Map.of("adam-jones", "jambi"), contextData.getContextMap()); + + } + + + @ParameterizedTest + @MethodSource("providedAlgorithms") + void testReadingTaskContextData(String algorithm) { + testTasksService.stopProcessing(); + + String data = "Hello World!"; + var context = new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")); + AddTaskRequest addTaskRequest = new AddTaskRequest() + .setType("test_data") + .setData(data.getBytes(StandardCharsets.UTF_8)) + .setTaskContext(context) + .setCompression(algorithm == null ? null : new CompressionRequest().setAlgorithm(CompressionAlgorithm.valueOf(algorithm))); + AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest); + UUID taskId = addTaskResponse.getTaskId(); + + FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class); + assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8)); + assertEquals(Map.of("danny-carey", "the-grudge"), fullTaskRecord.getTaskContext().getContextMap()); + + Task taskRecord = taskDao.getTask(taskId, Task.class); + assertEquals(new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")), taskRecord.getTaskContext()); + + } + + private static Stream providedAlgorithms() { + return Stream.of("GZIP", "LZ4", null, "NONE"); + } + } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java new file mode 100644 index 00000000..160971eb --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskInterceptionIntTest.java @@ -0,0 +1,28 @@ +package com.transferwise.tasks.testapp; + +import static org.junit.jupiter.api.Assertions.assertEquals; + +import com.transferwise.common.baseutils.UuidUtils; +import com.transferwise.tasks.BaseIntTest; +import com.transferwise.tasks.ITasksService.AddTaskRequest; +import com.transferwise.tasks.TasksService; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.Test; +import org.springframework.beans.factory.annotation.Autowired; + +public class TaskInterceptionIntTest extends BaseIntTest { + + @Autowired + private TasksService tasksService; + + @Test + void jambiTaskIsInterceptedCorrectly() { + testTasksService.resetAndDeleteTasksWithTypes("test"); + tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi")); + tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi")); + Awaitility.await().until(() -> testTasksService.getFinishedTasks("test", "Jambi").size() == 2); + assertEquals(2, meterRegistry.counter("tool", "song", "eulogy").count()); + + } + +} diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java index 5e42c05b..2b8c4136 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/config/TestConfiguration.java @@ -9,7 +9,11 @@ import com.transferwise.tasks.helpers.kafka.messagetotask.IKafkaMessageHandler; import com.transferwise.tasks.impl.jobs.interfaces.IJob; import com.transferwise.tasks.processing.ITaskProcessingInterceptor; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; +import com.transferwise.tasks.testapp.testbeans.JambiTaskInterceptor; +import com.transferwise.tasks.testapp.testbeans.JambiTaskRegistrationDecorator; import com.transferwise.tasks.utils.LogUtils; +import io.micrometer.core.instrument.MeterRegistry; import java.time.ZonedDateTime; import java.util.Arrays; import java.util.Collections; @@ -136,4 +140,14 @@ public IKafkaListenerConsumerPropertiesProvider twTasksKafkaListenerSpringKafkaC return props; }; } + + @Bean + ITaskRegistrationDecorator jambiRegistrationInterceptor() { + return new JambiTaskRegistrationDecorator(); + } + + @Bean + ITaskProcessingInterceptor jambiProcessingInterceptor(MeterRegistry meterRegistry) { + return new JambiTaskInterceptor(meterRegistry); + } } diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java new file mode 100644 index 00000000..36945087 --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskInterceptor.java @@ -0,0 +1,25 @@ +package com.transferwise.tasks.testapp.testbeans; + +import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.processing.ITaskProcessingInterceptor; +import io.micrometer.core.instrument.MeterRegistry; +import lombok.RequiredArgsConstructor; +import org.springframework.stereotype.Component; + +@Component +@RequiredArgsConstructor +public class JambiTaskInterceptor implements ITaskProcessingInterceptor { + + private final MeterRegistry meterRegistry; + + @Override + public void doProcess(Task task, Runnable processor) { + if ("Jambi".equals(task.getSubType())) { + if (task.getTaskContext() != null) { + var name = task.getTaskContext().get("adam-jones", String.class); + meterRegistry.counter("tool", "song", name).increment(); + } + } + processor.run(); + } +} diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java new file mode 100644 index 00000000..59faa5bc --- /dev/null +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java @@ -0,0 +1,19 @@ +package com.transferwise.tasks.testapp.testbeans; + +import com.transferwise.tasks.ITasksService.AddTaskRequest; +import com.transferwise.tasks.domain.TaskContext; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; +import java.util.Map; +import org.springframework.stereotype.Component; + +@Component +public class JambiTaskRegistrationDecorator implements ITaskRegistrationDecorator { + + @Override + public AddTaskRequest intercept(AddTaskRequest request) { + if ("Jambi".equals(request.getSubType())) { + return request.setTaskContext(new TaskContext().setContextMap(Map.of("adam-jones", "eulogy"))); + } + return request; + } +} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java index dce558f7..0515004c 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/ITasksService.java @@ -1,6 +1,7 @@ package com.transferwise.tasks; import com.transferwise.tasks.ITasksService.AddTaskResponse.Result; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.Duration; @@ -45,6 +46,7 @@ class AddTaskRequest { private boolean warnWhenTaskExists; private Duration expectedQueueTime; private CompressionRequest compression; + private TaskContext taskContext; @Data @Accessors(chain = true) @@ -95,6 +97,7 @@ class ResumeTaskRequest { @Data @Accessors(chain = true) class RescheduleTaskRequest { + private UUID taskId; private long version; private ZonedDateTime runAfterTime; @@ -103,6 +106,7 @@ class RescheduleTaskRequest { @Data @Accessors(chain = true) class RescheduleTaskResponse { + private UUID taskId; private Result result; @@ -116,12 +120,14 @@ public enum Result { @Data @Accessors(chain = true) class GetTaskRequest { + private UUID taskId; } @Data @Accessors(chain = true) class GetTaskResponse { + private UUID taskId; private String type; private long version; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 51bf53d2..6bef8fc2 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -16,9 +16,12 @@ import com.transferwise.tasks.handler.interfaces.ITaskHandlerRegistry; import com.transferwise.tasks.helpers.ICoreMetricsTemplate; import com.transferwise.tasks.helpers.executors.IExecutorsHelper; +import com.transferwise.tasks.processing.ITaskRegistrationDecorator; import com.transferwise.tasks.triggering.ITasksExecutionTriggerer; import com.transferwise.tasks.utils.LogUtils; import java.time.ZonedDateTime; +import java.util.ArrayList; +import java.util.List; import java.util.UUID; import java.util.concurrent.ExecutorService; import java.util.concurrent.Future; @@ -58,6 +61,8 @@ public class TasksService implements ITasksService, GracefulShutdownStrategy, In private IEnvironmentValidator environmentValidator; @Autowired private ICoreMetricsTemplate coreMetricsTemplate; + @Autowired(required = false) + private List taskRegistrationInterceptors = new ArrayList<>(); private ExecutorService afterCommitExecutorService; private TxSyncAdapterFactory txSyncAdapterFactory; @@ -85,12 +90,18 @@ public void afterPropertiesSet() { @Override @EntryPoint(usesExisting = true) @Transactional(rollbackFor = Exception.class) - public AddTaskResponse addTask(AddTaskRequest request) { + public AddTaskResponse addTask(AddTaskRequest requestParam) { return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.ADD_TASK, () -> { + AddTaskRequest request = requestParam; mdcService.put(request.getTaskId(), 0L); mdcService.putType(request.getType()); mdcService.putSubType(request.getSubType()); + + for (ITaskRegistrationDecorator interceptor : taskRegistrationInterceptors) { + request = interceptor.intercept(request); + } + ZonedDateTime now = ZonedDateTime.now(TwContextClockHolder.getClock()); final TaskStatus status = request.getRunAfterTime() == null || !request.getRunAfterTime().isAfter(now) ? TaskStatus.SUBMITTED : TaskStatus.WAITING; @@ -102,17 +113,19 @@ public AddTaskResponse addTask(AddTaskRequest request) { ZonedDateTime maxStuckTime = request.getExpectedQueueTime() == null ? now.plus(tasksProperties.getTaskStuckTimeout()) : now.plus(request.getExpectedQueueTime()); - byte[] data = request.getData(); + ITaskDao.InsertTaskResponse insertTaskResponse = taskDao.insertTask( - new ITaskDao.InsertTaskRequest().setData(data).setKey(request.getUniqueKey()) + new ITaskDao.InsertTaskRequest().setData(request.getData()).setKey(request.getUniqueKey()) .setRunAfterTime(request.getRunAfterTime()) .setSubType(request.getSubType()) .setType(request.getType()).setTaskId(request.getTaskId()) .setMaxStuckTime(maxStuckTime).setStatus(status).setPriority(priority) - .setCompression(request.getCompression())); + .setCompression(request.getCompression()) + .setTaskContext(request.getTaskContext()) + ); - coreMetricsTemplate - .registerTaskAdding(request.getType(), request.getUniqueKey(), insertTaskResponse.isInserted(), request.getRunAfterTime(), data); + coreMetricsTemplate.registerTaskAdding(request.getType(), request.getUniqueKey(), + insertTaskResponse.isInserted(), request.getRunAfterTime(), request.getData()); if (!insertTaskResponse.isInserted()) { coreMetricsTemplate.registerDuplicateTask(request.getType(), !request.isWarnWhenTaskExists()); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java index c5470d9c..4522fa1d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/ITaskDao.java @@ -4,6 +4,7 @@ import com.transferwise.tasks.domain.BaseTask; import com.transferwise.tasks.domain.IBaseTask; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.domain.TaskVersionId; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; @@ -54,6 +55,7 @@ class InsertTaskRequest { private ZonedDateTime maxStuckTime; private Integer priority; private CompressionRequest compression; + private TaskContext taskContext; } @Data diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java index 558460b7..3ab66017 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java @@ -2,6 +2,7 @@ import static com.transferwise.tasks.utils.TimeUtils.toZonedDateTime; +import com.fasterxml.jackson.databind.ObjectMapper; import com.google.common.base.Preconditions; import com.transferwise.common.baseutils.ExceptionUtils; import com.transferwise.common.baseutils.UuidUtils; @@ -12,6 +13,7 @@ import com.transferwise.tasks.domain.BaseTask1; import com.transferwise.tasks.domain.FullTaskRecord; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.domain.TaskVersionId; import com.transferwise.tasks.helpers.ICoreMetricsTemplate; @@ -30,6 +32,7 @@ import java.time.Instant; import java.time.ZonedDateTime; import java.util.ArrayList; +import java.util.Arrays; import java.util.HashMap; import java.util.List; import java.util.Map; @@ -74,6 +77,8 @@ public abstract class JdbcTaskDao implements ITaskDao, InitializingBean { protected ITaskDaoDataSerializer taskDataSerializer; @Autowired protected ICoreMetricsTemplate coreMetricsTemplate; + @Autowired + protected ObjectMapper objectMapper; private final ConcurrentHashMap sqlCache = new ConcurrentHashMap<>(); @@ -94,6 +99,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String insertTaskSql; protected String insertUniqueTaskKeySql; protected String insertTaskDataSql; + protected String insertTaskContext; protected String setToBeRetriedSql; protected String setToBeRetriedSql1; protected String grabForProcessingWithStatusAssertionSql; @@ -130,7 +136,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String getApproximateTaskDatasCountSql1; protected final int[] questionBuckets = {1, 5, 25, 125, 625}; - + public static final byte[] NULL_BLOB = "®".getBytes(StandardCharsets.UTF_8); protected final TaskStatus[] stuckStatuses = new TaskStatus[]{TaskStatus.NEW, TaskStatus.SUBMITTED, TaskStatus.WAITING, TaskStatus.PROCESSING}; protected ITwTaskTables twTaskTables(TasksProperties tasksProperties) { @@ -147,7 +153,7 @@ public void afterPropertiesSet() { insertTaskSql = "insert ignore into " + taskTable + "(id,type,sub_type,status,data,next_event_time" + ",state_time,time_created,time_updated,processing_tries_count,version,priority) values (?,?,?,?,?,?,?,?,?,?,?,?)"; insertUniqueTaskKeySql = "insert ignore into " + uniqueTaskKeyTable + "(task_id,key_hash,`key`) values (?, ?, ?)"; - insertTaskDataSql = "insert into " + taskDataTable + "(task_id,data_format,data) values (?,?,?)"; + insertTaskDataSql = "insert into " + taskDataTable + "(task_id,data_format,data,task_context_format,task_context) values (?,?,?,?,?)"; setToBeRetriedSql = "update " + taskTable + " set status=?,next_event_time=?,state_time=?,time_updated=?,version=? where id=? and version=?"; setToBeRetriedSql1 = "update " + taskTable + " set status=?,next_event_time=?" + ",processing_tries_count=?,state_time=?,time_updated=?,version=? where id=? and version=?"; @@ -174,11 +180,12 @@ public void afterPropertiesSet() { getStuckTasksCountGroupedSql = "select type, count(*) from (select type from " + taskTable + " where status=?" + " and next_event_time T getTask(UUID taskId, Class clazz) { } else if (clazz.equals(Task.class)) { List result = jdbcTemplate.query(getTaskSql1, args(taskId), (rs, rowNum) -> { byte[] data = getData(rs, 7, 9, 10); + TaskContext context = getContext(rs, 11, 12); return new Task().setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) .setVersion(rs.getLong(2)).setType(rs.getString(3)) .setStatus(rs.getString(4)).setPriority(rs.getInt(5)) .setSubType(rs.getString(6)).setData(data) - .setProcessingTriesCount(rs.getLong(8)); + .setProcessingTriesCount(rs.getLong(8)) + .setTaskContext(context); }); return (T) getFirst(result); } else if (clazz.equals(FullTaskRecord.class)) { List result = jdbcTemplate.query(getTaskSql2, args(taskId), (rs, rowNum) -> { byte[] data = getData(rs, 7, 12, 13); + TaskContext context = getContext(rs, 14, 15); return new FullTaskRecord().setId(sqlMapper.sqlTaskIdToUuid(rs.getObject(1))) .setVersion(rs.getLong(2)).setType(rs.getString(3)) .setStatus(rs.getString(4)).setPriority(rs.getInt(5)) @@ -475,7 +488,8 @@ public T getTask(UUID taskId, Class clazz) { .setProcessingTriesCount(rs.getLong(8)) .setStateTime(toZonedDateTime(rs.getTimestamp(9))) .setNextEventTime(toZonedDateTime(rs.getTimestamp(10))) - .setProcessingClientId(rs.getString(11)); + .setProcessingClientId(rs.getString(11)) + .setTaskContext(context); }); return (T) getFirst(result); } else { @@ -716,9 +730,22 @@ protected void assertIsolationLevel(Isolation isolation) { } } + protected TaskContext getContext(ResultSet rs, int contextFormatIdx, int contextIdx) throws SQLException { + return ExceptionUtils.doUnchecked(() -> { + var blob = taskDataSerializer.deserialize(new SerializedData() + .setDataFormat(rs.getInt(contextFormatIdx)) + .setData(rs.getBytes(contextIdx)) + ); + if (blob == null) { + return null; + } + return objectMapper.readValue(blob, TaskContext.class); + }); + } + protected byte[] getData(ResultSet rs, int deprecatedDataIdx, int dataFormatIdx, int dataIdx) throws SQLException { byte[] data = rs.getBytes(dataIdx); - if (data != null) { + if (data != null && !Arrays.equals(NULL_BLOB, data)) { return taskDataSerializer.deserialize(new SerializedData().setDataFormat(rs.getInt(dataFormatIdx)).setData(data)); } else { String deprecatedData = rs.getString(deprecatedDataIdx); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java index 4364e4bd..cc286d1d 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/FullTaskRecord.java @@ -2,6 +2,7 @@ import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; import java.time.ZonedDateTime; +import java.util.Objects; import java.util.UUID; import lombok.Data; import lombok.experimental.Accessors; @@ -22,7 +23,8 @@ public class FullTaskRecord implements ITask { private ZonedDateTime stateTime; private ZonedDateTime nextEventTime; private String processingClientId; - + private TaskContext taskContext; + @Override public ITaskVersionId getVersionId() { return new TaskVersionId(id, version); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java index 2e97e061..03f95d96 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/Task.java @@ -1,6 +1,7 @@ package com.transferwise.tasks.domain; import edu.umd.cs.findbugs.annotations.SuppressFBWarnings; +import java.util.Objects; import java.util.UUID; import lombok.Data; import lombok.experimental.Accessors; @@ -18,6 +19,7 @@ public class Task implements ITask { private long version; private long processingTriesCount; private int priority; + private TaskContext taskContext; // TODO: We should create an interface instead. public BaseTask toBaseTask() { diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java new file mode 100644 index 00000000..add371a7 --- /dev/null +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/domain/TaskContext.java @@ -0,0 +1,28 @@ +package com.transferwise.tasks.domain; + +import java.util.HashMap; +import java.util.Map; +import java.util.Optional; +import lombok.Data; +import lombok.experimental.Accessors; + +@Data +@Accessors(chain = true) +public class TaskContext { + + public static final TaskContext EMPTY = new TaskContext(); + + private Map contextMap = new HashMap<>(); + + public T get(String key, Class cls) { + return Optional.ofNullable(contextMap.get(key)).map(cls::cast).orElse(null); + } + + public void merge(TaskContext taskContext) { + if (taskContext != null) { + if (taskContext.contextMap != null) { + contextMap.putAll(taskContext.contextMap); + } + } + } +} diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java new file mode 100644 index 00000000..d593c600 --- /dev/null +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java @@ -0,0 +1,8 @@ +package com.transferwise.tasks.processing; + +import com.transferwise.tasks.ITasksService.AddTaskRequest; + +public interface ITaskRegistrationDecorator { + + AddTaskRequest intercept(AddTaskRequest request); +} diff --git a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml index 224c13b6..00628713 100644 --- a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml +++ b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-mysql.xml @@ -1,41 +1,47 @@ + xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" + xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd"> - CREATE TABLE tw_task ( - id BINARY(16) PRIMARY KEY NOT NULL, - status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), - -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 - next_event_time DATETIME(6) NOT NULL, - state_time DATETIME(6) NOT NULL, - version BIGINT NOT NULL, - priority INT NOT NULL DEFAULT 5, - processing_start_time DATETIME(6) NULL, - processing_tries_count BIGINT NOT NULL, - time_created DATETIME(6) NOT NULL, - time_updated DATETIME(6) NOT NULL, - type VARCHAR(250) CHARACTER SET latin1 NOT NULL, - sub_type VARCHAR(250) CHARACTER SET latin1 NULL, - processing_client_id VARCHAR(250) CHARACTER SET latin1 NULL, - data LONGTEXT NOT NULL); + CREATE TABLE tw_task + ( + id BINARY(16) PRIMARY KEY NOT NULL, + status ENUM('NEW', 'WAITING', 'SUBMITTED', 'PROCESSING', 'DONE', 'ERROR', 'FAILED'), + -- Microsecond precision (6) is strongly recommended here to reduce the chance of gap locks deadlocking on tw_task_idx1 + next_event_time DATETIME(6) NOT NULL, + state_time DATETIME(6) NOT NULL, + version BIGINT NOT NULL, + priority INT NOT NULL DEFAULT 5, + processing_start_time DATETIME(6) NULL, + processing_tries_count BIGINT NOT NULL, + time_created DATETIME(6) NOT NULL, + time_updated DATETIME(6) NOT NULL, + type VARCHAR(250) CHARACTER SET latin1 NOT NULL, + sub_type VARCHAR(250) CHARACTER SET latin1 NULL, + processing_client_id VARCHAR(250) CHARACTER SET latin1 NULL, + data LONGTEXT NOT NULL + ); CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); - CREATE TABLE tw_task_data ( - task_id BINARY(16) PRIMARY KEY NOT NULL, - data_format INT NOT NULL, - data LONGBLOB NOT NULL + CREATE TABLE tw_task_data + ( + task_id BINARY(16) PRIMARY KEY NOT NULL, + data_format INT NOT NULL, + data LONGBLOB NOT NULL, + task_context_format SMALLINT, + task_context BLOB ); - CREATE TABLE unique_tw_task_key ( - task_id BINARY(16) PRIMARY KEY, - key_hash INT NOT NULL, - `key` VARCHAR(150) CHARACTER SET latin1 NOT NULL, - UNIQUE KEY uidx1 (key_hash, `key`) + CREATE TABLE unique_tw_task_key + ( + task_id BINARY(16) PRIMARY KEY, + key_hash INT NOT NULL, + `key` VARCHAR(150) CHARACTER SET latin1 NOT NULL, + UNIQUE KEY uidx1 (key_hash, `key`) ); diff --git a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml index b7364a06..39cd09f2 100644 --- a/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml +++ b/tw-tasks-core/src/main/resources/db/changelog/db.tw-tasks-postgres.xml @@ -4,41 +4,46 @@ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://www.liquibase.org/xml/ns/dbchangelog http://www.liquibase.org/xml/ns/dbchangelog/dbchangelog-3.0.xsd"> - - - CREATE TABLE tw_task ( - id UUID PRIMARY KEY, - type TEXT NOT NULL, - sub_type TEXT NULL, - status TEXT NOT NULL, - data TEXT NOT NULL, - next_event_time TIMESTAMPTZ(6) NOT NULL, - state_time TIMESTAMPTZ(3) NOT NULL, - processing_client_id TEXT NULL, - processing_start_time TIMESTAMPTZ(3) NULL, - time_created TIMESTAMPTZ(3) NOT NULL, - time_updated TIMESTAMPTZ(3) NOT NULL, - processing_tries_count BIGINT NOT NULL, - version BIGINT NOT NULL, - priority INT NOT NULL DEFAULT 5 - ); + + + CREATE TABLE tw_task + ( + id UUID PRIMARY KEY, + type TEXT NOT NULL, + sub_type TEXT NULL, + status TEXT NOT NULL, + data TEXT NOT NULL, + next_event_time TIMESTAMPTZ(6) NOT NULL, + state_time TIMESTAMPTZ(3) NOT NULL, + processing_client_id TEXT NULL, + processing_start_time TIMESTAMPTZ(3) NULL, + time_created TIMESTAMPTZ(3) NOT NULL, + time_updated TIMESTAMPTZ(3) NOT NULL, + processing_tries_count BIGINT NOT NULL, + version BIGINT NOT NULL, + priority INT NOT NULL DEFAULT 5 + ); - CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); + CREATE INDEX tw_task_idx1 ON tw_task (status, next_event_time); - CREATE TABLE tw_task_data ( - task_id UUID PRIMARY KEY NOT NULL, - data_format INT NOT NULL, - data BYTEA NOT NULL - ) WITH (toast_tuple_target=8160); + CREATE TABLE tw_task_data + ( + task_id UUID PRIMARY KEY NOT NULL, + data_format INT NOT NULL, + data BYTEA NOT NULL, + task_context_format SMALLINT, + task_context BYTEA + ) WITH (toast_tuple_target = 8160); - ALTER TABLE tw_task_data ALTER COLUMN data SET STORAGE EXTERNAL; + ALTER TABLE tw_task_data ALTER COLUMN data SET STORAGE EXTERNAL; - CREATE TABLE unique_tw_task_key ( - task_id UUID PRIMARY KEY NOT NULL, - key_hash INT NOT NULL, - key TEXT NOT NULL, - unique (key_hash, key) - ); - - + CREATE TABLE unique_tw_task_key + ( + task_id UUID PRIMARY KEY NOT NULL, + key_hash INT NOT NULL, + key TEXT NOT NULL, + unique (key_hash, key) + ); + + From f250c43a2912fa81f2b8a579cbeccbb1deca530e Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Fri, 9 Aug 2024 11:34:32 +0300 Subject: [PATCH 2/7] spotgbugs --- .../java/com/transferwise/tasks/testapp/TaskDataIntTest.java | 4 ---- .../src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java | 2 +- 2 files changed, 1 insertion(+), 5 deletions(-) diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java index 45a6e194..c3b8539b 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/TaskDataIntTest.java @@ -1,6 +1,5 @@ package com.transferwise.tasks.testapp; -import static com.transferwise.tasks.dao.JdbcTaskDao.NULL_BLOB; import static org.assertj.core.api.Assertions.assertThat; import static org.junit.jupiter.api.Assertions.assertEquals; @@ -8,11 +7,9 @@ import com.google.common.base.Strings; import com.transferwise.tasks.BaseIntTest; import com.transferwise.tasks.CompressionAlgorithm; -import com.transferwise.tasks.ITaskDataSerializer; import com.transferwise.tasks.ITasksService.AddTaskRequest; import com.transferwise.tasks.ITasksService.AddTaskRequest.CompressionRequest; import com.transferwise.tasks.ITasksService.AddTaskResponse; -import com.transferwise.tasks.TaskDataSerializer; import com.transferwise.tasks.TasksProperties; import com.transferwise.tasks.dao.ITaskDao; import com.transferwise.tasks.dao.ITaskDaoDataSerializer; @@ -34,7 +31,6 @@ import org.junit.jupiter.api.Test; import org.junit.jupiter.params.ParameterizedTest; import org.junit.jupiter.params.provider.Arguments; -import org.junit.jupiter.params.provider.CsvSource; import org.junit.jupiter.params.provider.MethodSource; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.jdbc.core.JdbcTemplate; diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java index 3ab66017..bdf8d3fe 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java @@ -136,7 +136,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String getApproximateTaskDatasCountSql1; protected final int[] questionBuckets = {1, 5, 25, 125, 625}; - public static final byte[] NULL_BLOB = "®".getBytes(StandardCharsets.UTF_8); + static final byte[] NULL_BLOB = "./g".getBytes(StandardCharsets.UTF_8); protected final TaskStatus[] stuckStatuses = new TaskStatus[]{TaskStatus.NEW, TaskStatus.SUBMITTED, TaskStatus.WAITING, TaskStatus.PROCESSING}; protected ITwTaskTables twTaskTables(TasksProperties tasksProperties) { From 5989df3b15e6b7d12321c067317e2f5b0efc109c Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Fri, 9 Aug 2024 15:43:34 +0300 Subject: [PATCH 3/7] add changelog --- CHANGELOG.md | 54 +++++++++++++++++-- gradle.properties | 2 +- .../JambiTaskRegistrationDecorator.java | 2 +- .../com/transferwise/tasks/TasksService.java | 6 +-- .../transferwise/tasks/dao/JdbcTaskDao.java | 24 +++++---- .../ITaskRegistrationDecorator.java | 2 +- 6 files changed, 71 insertions(+), 19 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index ccc0f263..6e002033 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -5,52 +5,97 @@ All notable changes to this project will be documented in this file. The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/), and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). +#### 1.43.0 - 2024/08/09 + +- Added support for task context + +You will need to do the following migration: + +Postgres: + +``` +ALTER TABLE tw_task_data +ADD COLUMN task_context_format SMALLINT, +ADD COLUMN task_context BYTEA; +``` + +MariaDB: + +``` +ALTER TABLE tw_task_data +ADD COLUMN task_context_format SMALLINT, +ADD COLUMN task_context BLOB, +ALGORITHM=INPLACE, LOCK=NONE; +``` + #### 1.42.0 - 2024/07/16 + ### Added + - Support for Spring Boot 3.3. ### Removed + - Support for spring boot 3.1 and 2.7 versions. #### 1.41.6 - 2024/04/17 + ### Added -- `/getTaskTypes` endpoint may be disabled through configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with extreme amount of tasks might benefit from this. + +- `/getTaskTypes` endpoint may be disabled through configuration property `tw-tasks.core.tasks-management.enable-get-task-types: false`. Services with + extreme amount of tasks might benefit from this. #### 1.41.5 - 2024/04/05 + ### Changed + * Use static methods to create BeanPostProcessors. #### 1.41.4 - 2024/04/02 + ### Changed + - `/getTaskTypes` endpoint accepts optional query parameter `status` to filter only types of tasks in the particular status(es). - Fixed a bug with `taskType` and `taskSubType` filters on query endpoints when multiple values are supplied, where it would consider only one value. #### 1.41.3 - 2024/02/29 + ### Changed + * Add compatibility with Spring Boot 3.2. * Update dependencies #### 1.41.2 - 2024/02/16 + ### Changed -* Kafka producer instantiation will be attempted up to 5 times with a 500ms delay between each attempt. In some cases, it has been observed that the CI fails to start the Kafka producer because the kafka docker container itself seems to not be fully up & accessible yet. + +* Kafka producer instantiation will be attempted up to 5 times with a 500ms delay between each attempt. In some cases, it has been observed that the + CI fails to start the Kafka producer because the kafka docker container itself seems to not be fully up & accessible yet. #### 1.41.1 - 2023/12/19 + ### Changed + - When building a Spring `ResponseEntity` with an explicit status, provide an integer derived from the `HttpStatus` enum, rather than providing the `HttpStatus` directly, to handle binary incompatibility between Spring 5 and 6 causing NoSuchMethod errors when tw-tasks is used with Spring 6 #### 1.41.0 - 2023/11/16 + ### Added + - Added `taskType` and `taskSubType` parameters to management query endpoints. - Added `/getTaskTypes` endpoint to retrieve list of registered task types and sub-types ## 1.40.6 - 2023/11/16 + ### Fixed * NullPointerException in TaskManagementService.getTaskData in case task is not found #### 1.40.5 - 2023/10/30 + ### Added + - Setting METADATA_MAX_AGE_CONFIG to two minutes for producer #### 1.40.4 - 2023/10/06 @@ -75,7 +120,8 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ### Added -* introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no operation task handler to pick up deprecated task types in your service. +* introduced a new configuration parameter `tw-tasks.core.no-op-task-types` that allows a default no operation task handler to pick up deprecated task + types in your service. #### 1.40.1 - 2023/07/12 @@ -86,6 +132,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). #### 1.40.0 - 2023/06/12 ### Added + * CronJob annotation for Spring bean's methods #### 1.39.2 - 2023/06/06 @@ -99,6 +146,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). #### 1.39.1 - 2023/04/19 ### Changed + * Kafka consumer offset duration is always considered as positive since we cannot reset the offsets to future timestamps. * Both `PT1H` and `-PT1H` are treated the same ie `PT1H`. This value gets subtracted by now() timestamp. * Added second kafka consumer for the tests in `SeekToDurationOnRebalanceListenerIntTest` class diff --git a/gradle.properties b/gradle.properties index 168f4a0e..cc94780d 100644 --- a/gradle.properties +++ b/gradle.properties @@ -1,2 +1,2 @@ -version=1.42.0 +version=1.43.0 org.gradle.internal.http.socketTimeout=120000 diff --git a/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java index 59faa5bc..2acf4e84 100644 --- a/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java +++ b/integration-tests/src/test/java/com/transferwise/tasks/testapp/testbeans/JambiTaskRegistrationDecorator.java @@ -10,7 +10,7 @@ public class JambiTaskRegistrationDecorator implements ITaskRegistrationDecorator { @Override - public AddTaskRequest intercept(AddTaskRequest request) { + public AddTaskRequest decorate(AddTaskRequest request) { if ("Jambi".equals(request.getSubType())) { return request.setTaskContext(new TaskContext().setContextMap(Map.of("adam-jones", "eulogy"))); } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 6bef8fc2..37e40738 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -62,7 +62,7 @@ public class TasksService implements ITasksService, GracefulShutdownStrategy, In @Autowired private ICoreMetricsTemplate coreMetricsTemplate; @Autowired(required = false) - private List taskRegistrationInterceptors = new ArrayList<>(); + private List taskRegistrationDecorators = new ArrayList<>(); private ExecutorService afterCommitExecutorService; private TxSyncAdapterFactory txSyncAdapterFactory; @@ -98,8 +98,8 @@ public AddTaskResponse addTask(AddTaskRequest requestParam) { mdcService.putType(request.getType()); mdcService.putSubType(request.getSubType()); - for (ITaskRegistrationDecorator interceptor : taskRegistrationInterceptors) { - request = interceptor.intercept(request); + for (ITaskRegistrationDecorator interceptor : taskRegistrationDecorators) { + request = interceptor.decorate(request); } ZonedDateTime now = ZonedDateTime.now(TwContextClockHolder.getClock()); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java index bdf8d3fe..7b840a71 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java @@ -279,16 +279,20 @@ public InsertTaskResponse insertTask(InsertTaskRequest request) { DataSourceUtils.releaseConnection(con, dataSource); } - byte[] data = request.getData() == null ? NULL_BLOB : request.getData(); - SerializedData serializedData = taskDataSerializer.serialize(data, request.getCompression()); - byte[] contextBlob = objectMapper.writeValueAsBytes(request.getTaskContext()); - SerializedData serializedContext = taskDataSerializer.serialize(contextBlob, request.getCompression()); - jdbcTemplate.update( - insertTaskDataSql, - args(taskId, Integer.valueOf(serializedData.getDataFormat()), serializedData.getData(), serializedContext.getDataFormat(), - serializedContext.getData()) - ); - coreMetricsTemplate.registerDaoTaskDataSerialization(request.getType(), data.length, serializedData.getData().length); + if (request.getData() != null || request.getTaskContext() != null) { + byte[] data = request.getData() == null ? NULL_BLOB : request.getData(); + SerializedData serializedData = taskDataSerializer.serialize(data, request.getCompression()); + byte[] contextBlob = objectMapper.writeValueAsBytes(request.getTaskContext()); + SerializedData serializedContext = taskDataSerializer.serialize(contextBlob, request.getCompression()); + jdbcTemplate.update( + insertTaskDataSql, + args(taskId, Integer.valueOf(serializedData.getDataFormat()), serializedData.getData(), serializedContext.getDataFormat(), + serializedContext.getData()) + ); + if (request.getData() != null) { + coreMetricsTemplate.registerDaoTaskDataSerialization(request.getType(), data.length, serializedData.getData().length); + } + } return new InsertTaskResponse().setTaskId(taskId).setInserted(true); }); } diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java index d593c600..2f817d13 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/ITaskRegistrationDecorator.java @@ -4,5 +4,5 @@ public interface ITaskRegistrationDecorator { - AddTaskRequest intercept(AddTaskRequest request); + AddTaskRequest decorate(AddTaskRequest request); } From 865ed17dfe37cb6fb9ea41dade32a361c0cb259b Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Fri, 6 Sep 2024 15:56:37 +0300 Subject: [PATCH 4/7] hey --- .../src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java index 7b840a71..35fc50c6 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/dao/JdbcTaskDao.java @@ -136,7 +136,7 @@ public JdbcTaskDao(DataSource dataSource, ITaskSqlMapper sqlMapper) { protected String getApproximateTaskDatasCountSql1; protected final int[] questionBuckets = {1, 5, 25, 125, 625}; - static final byte[] NULL_BLOB = "./g".getBytes(StandardCharsets.UTF_8); + static final byte[] NULL_BLOB = "®".getBytes(StandardCharsets.UTF_8); protected final TaskStatus[] stuckStatuses = new TaskStatus[]{TaskStatus.NEW, TaskStatus.SUBMITTED, TaskStatus.WAITING, TaskStatus.PROCESSING}; protected ITwTaskTables twTaskTables(TasksProperties tasksProperties) { From 7af8dcd7fd03658f2ab4882d49ad69cdabe7b315 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Mon, 9 Sep 2024 11:10:46 +0300 Subject: [PATCH 5/7] minor change --- .../src/main/java/com/transferwise/tasks/TasksService.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java index 37e40738..c32b8077 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/TasksService.java @@ -98,8 +98,8 @@ public AddTaskResponse addTask(AddTaskRequest requestParam) { mdcService.putType(request.getType()); mdcService.putSubType(request.getSubType()); - for (ITaskRegistrationDecorator interceptor : taskRegistrationDecorators) { - request = interceptor.decorate(request); + for (ITaskRegistrationDecorator decorator : taskRegistrationDecorators) { + request = decorator.decorate(request); } ZonedDateTime now = ZonedDateTime.now(TwContextClockHolder.getClock()); From f7f1ade4fd5a5dbbecad8fbbcf796d987a94ef78 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Mon, 9 Sep 2024 14:22:13 +0300 Subject: [PATCH 6/7] fix testDao --- tw-tasks-core-test/build.gradle | 1 + .../tasks/test/dao/JdbcTestTaskDao.java | 18 ++++++++++++++++-- 2 files changed, 17 insertions(+), 2 deletions(-) diff --git a/tw-tasks-core-test/build.gradle b/tw-tasks-core-test/build.gradle index 91a966ef..2c89fcea 100644 --- a/tw-tasks-core-test/build.gradle +++ b/tw-tasks-core-test/build.gradle @@ -9,4 +9,5 @@ dependencies { implementation libraries.springBeans implementation libraries.springJdbc implementation libraries.springContext + implementation libraries.jacksonDatabind } diff --git a/tw-tasks-core-test/src/main/java/com/transferwise/tasks/test/dao/JdbcTestTaskDao.java b/tw-tasks-core-test/src/main/java/com/transferwise/tasks/test/dao/JdbcTestTaskDao.java index 83452730..cfdc93bb 100644 --- a/tw-tasks-core-test/src/main/java/com/transferwise/tasks/test/dao/JdbcTestTaskDao.java +++ b/tw-tasks-core-test/src/main/java/com/transferwise/tasks/test/dao/JdbcTestTaskDao.java @@ -1,10 +1,14 @@ package com.transferwise.tasks.test.dao; +import com.fasterxml.jackson.databind.ObjectMapper; +import com.transferwise.common.baseutils.jackson.DefaultJsonConverter; +import com.transferwise.common.baseutils.jackson.JsonConverter; import com.transferwise.tasks.dao.ITaskDaoDataSerializer; import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData; import com.transferwise.tasks.dao.ITaskSqlMapper; import com.transferwise.tasks.dao.ITwTaskTables; import com.transferwise.tasks.domain.Task; +import com.transferwise.tasks.domain.TaskContext; import com.transferwise.tasks.domain.TaskStatus; import com.transferwise.tasks.helpers.sql.ArgumentPreparedStatementSetter; import com.transferwise.tasks.helpers.sql.CacheKey; @@ -59,7 +63,7 @@ private static class Queries { " and status in (??)" }; getTasksByTypeAndStatusAndSubType = new String[]{ - "select id,type,sub_type,t.data,status,version,processing_tries_count,priority,d.data_format,d.data" + "select id,type,sub_type,t.data,status,version,processing_tries_count,priority,d.data_format,d.data,d.task_context_format,d.task_context" + " from " + tasksTable + " t left join " + dataTable + " d on t.id=d.task_id" + " where type=?", " and status in (??)", @@ -74,6 +78,7 @@ private static class Queries { private final ITaskSqlMapper sqlMapper; private final ConcurrentHashMap sqlCache; private final ITaskDaoDataSerializer taskDataSerializer; + private final JsonConverter jsonConverter = new DefaultJsonConverter(new ObjectMapper()); public JdbcTestTaskDao(DataSource dataSource, ITwTaskTables tables, ITaskSqlMapper sqlMapper, ITaskDaoDataSerializer taskDataSerializer) { this.sqlCache = new ConcurrentHashMap<>(); @@ -198,8 +203,16 @@ public List findTasksByTypeSubTypeAndStatus(String type, String subType, T (rs, rowNum) -> { byte[] data; byte[] newData = rs.getBytes(10); + TaskContext context = null; if (newData != null) { data = taskDataSerializer.deserialize(new SerializedData().setDataFormat(rs.getInt(9)).setData(newData)); + var contextBlob = taskDataSerializer.deserialize(new SerializedData() + .setDataFormat(rs.getInt(11)) + .setData(rs.getBytes(12)) + ); + if (contextBlob != null) { + context = jsonConverter.toObject(contextBlob, TaskContext.class); + } } else { data = rs.getBytes(4); } @@ -211,7 +224,8 @@ public List findTasksByTypeSubTypeAndStatus(String type, String subType, T .setStatus(rs.getString(5)) .setVersion(rs.getLong(6)) .setProcessingTriesCount(rs.getLong(7)) - .setPriority(rs.getInt(8)); + .setPriority(rs.getInt(8)) + .setTaskContext(context); } ); } From 95dc04bb76335dd0a7b42fb62346ed9468ab6e15 Mon Sep 17 00:00:00 2001 From: Hussain Kara Fallah Date: Mon, 9 Sep 2024 16:37:58 +0300 Subject: [PATCH 7/7] java-17 --- build.common.gradle | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/build.common.gradle b/build.common.gradle index ca7eab54..26a549f9 100644 --- a/build.common.gradle +++ b/build.common.gradle @@ -92,8 +92,8 @@ dependencies { java { if (springBootVersion.startsWith("2.")) { - sourceCompatibility = JavaVersion.VERSION_11 - targetCompatibility = JavaVersion.VERSION_11 + sourceCompatibility = JavaVersion.VERSION_17 + targetCompatibility = JavaVersion.VERSION_17 } else { sourceCompatibility = JavaVersion.VERSION_17 targetCompatibility = JavaVersion.VERSION_17