Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add TaskContext to tasks to be track e2e instant transfer information #203

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,30 +1,40 @@
package com.transferwise.tasks.testapp;

import static com.transferwise.tasks.dao.JdbcTaskDao.NULL_BLOB;
import static org.assertj.core.api.Assertions.assertThat;
import static org.junit.jupiter.api.Assertions.assertEquals;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Strings;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.CompressionAlgorithm;
import com.transferwise.tasks.ITaskDataSerializer;
import com.transferwise.tasks.ITasksService.AddTaskRequest;
import com.transferwise.tasks.ITasksService.AddTaskRequest.CompressionRequest;
import com.transferwise.tasks.ITasksService.AddTaskResponse;
import com.transferwise.tasks.TaskDataSerializer;
import com.transferwise.tasks.TasksProperties;
import com.transferwise.tasks.dao.ITaskDao;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer;
import com.transferwise.tasks.dao.ITaskDaoDataSerializer.SerializedData;
import com.transferwise.tasks.dao.ITaskSqlMapper;
import com.transferwise.tasks.dao.MySqlTaskTypesMapper;
import com.transferwise.tasks.domain.FullTaskRecord;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.test.ITestTasksService;
import com.transferwise.tasks.test.dao.ITestTaskDao;
import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.UUID;
import java.util.stream.Stream;
import lombok.SneakyThrows;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.Arguments;
import org.junit.jupiter.params.provider.CsvSource;
import org.junit.jupiter.params.provider.MethodSource;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.jdbc.core.JdbcTemplate;
Expand All @@ -43,6 +53,10 @@ public class TaskDataIntTest extends BaseIntTest {
private TasksProperties tasksProperties;
@Autowired
private JdbcTemplate jdbcTemplate;
@Autowired
private ObjectMapper objectMapper;
@Autowired
private ITaskDaoDataSerializer taskDataSerializer;

private CompressionAlgorithm originalAlgorithm;
private int originalMinSize;
Expand Down Expand Up @@ -137,4 +151,59 @@ void oldDataFieldIsNotSetForNewTasks() {
assertThat(oldData).isEqualTo("");
}

@ParameterizedTest
@MethodSource("providedAlgorithms")
@SneakyThrows
void testWritingTaskContextData() {
testTasksService.stopProcessing();

String data = "Hello World!";
var context = new TaskContext().setContextMap(Map.of("adam-jones", "jambi"));
AddTaskRequest addTaskRequest = new AddTaskRequest()
.setType("test_data")
.setData(data.getBytes(StandardCharsets.UTF_8))
.setTaskContext(context);
AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest);
UUID taskId = addTaskResponse.getTaskId();

FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8));

byte[] contextBlob = jdbcTemplate
.queryForObject("select task_context from tw_task_data where task_id=?", byte[].class, taskSqlMapper.uuidToSqlTaskId(taskId));

TaskContext contextData = objectMapper.readValue(contextBlob, TaskContext.class);
assertEquals(Map.of("adam-jones", "jambi"), contextData.getContextMap());

}


@ParameterizedTest
@MethodSource("providedAlgorithms")
void testReadingTaskContextData(String algorithm) {
testTasksService.stopProcessing();

String data = "Hello World!";
var context = new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge"));
AddTaskRequest addTaskRequest = new AddTaskRequest()
.setType("test_data")
.setData(data.getBytes(StandardCharsets.UTF_8))
.setTaskContext(context)
.setCompression(algorithm == null ? null : new CompressionRequest().setAlgorithm(CompressionAlgorithm.valueOf(algorithm)));
AddTaskResponse addTaskResponse = testTasksService.addTask(addTaskRequest);
UUID taskId = addTaskResponse.getTaskId();

FullTaskRecord fullTaskRecord = taskDao.getTask(taskId, FullTaskRecord.class);
assertThat(fullTaskRecord.getData()).isEqualTo(data.getBytes(StandardCharsets.UTF_8));
assertEquals(Map.of("danny-carey", "the-grudge"), fullTaskRecord.getTaskContext().getContextMap());

Task taskRecord = taskDao.getTask(taskId, Task.class);
assertEquals(new TaskContext().setContextMap(Map.of("danny-carey", "the-grudge")), taskRecord.getTaskContext());

}

private static Stream<String> providedAlgorithms() {
return Stream.of("GZIP", "LZ4", null, "NONE");
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,28 @@
package com.transferwise.tasks.testapp;

import static org.junit.jupiter.api.Assertions.assertEquals;

import com.transferwise.common.baseutils.UuidUtils;
import com.transferwise.tasks.BaseIntTest;
import com.transferwise.tasks.ITasksService.AddTaskRequest;
import com.transferwise.tasks.TasksService;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;

public class TaskInterceptionIntTest extends BaseIntTest {

@Autowired
private TasksService tasksService;

@Test
void jambiTaskIsInterceptedCorrectly() {
testTasksService.resetAndDeleteTasksWithTypes("test");
tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi"));
tasksService.addTask(new AddTaskRequest().setTaskId(UuidUtils.generatePrefixCombUuid()).setType("test").setSubType("Jambi"));
Awaitility.await().until(() -> testTasksService.getFinishedTasks("test", "Jambi").size() == 2);
assertEquals(2, meterRegistry.counter("tool", "song", "eulogy").count());

}

}
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,11 @@
import com.transferwise.tasks.helpers.kafka.messagetotask.IKafkaMessageHandler;
import com.transferwise.tasks.impl.jobs.interfaces.IJob;
import com.transferwise.tasks.processing.ITaskProcessingInterceptor;
import com.transferwise.tasks.processing.ITaskRegistrationDecorator;
import com.transferwise.tasks.testapp.testbeans.JambiTaskInterceptor;
import com.transferwise.tasks.testapp.testbeans.JambiTaskRegistrationDecorator;
import com.transferwise.tasks.utils.LogUtils;
import io.micrometer.core.instrument.MeterRegistry;
import java.time.ZonedDateTime;
import java.util.Arrays;
import java.util.Collections;
Expand Down Expand Up @@ -136,4 +140,14 @@ public IKafkaListenerConsumerPropertiesProvider twTasksKafkaListenerSpringKafkaC
return props;
};
}

@Bean
ITaskRegistrationDecorator jambiRegistrationInterceptor() {
return new JambiTaskRegistrationDecorator();
}

@Bean
ITaskProcessingInterceptor jambiProcessingInterceptor(MeterRegistry meterRegistry) {
return new JambiTaskInterceptor(meterRegistry);
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,25 @@
package com.transferwise.tasks.testapp.testbeans;

import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.processing.ITaskProcessingInterceptor;
import io.micrometer.core.instrument.MeterRegistry;
import lombok.RequiredArgsConstructor;
import org.springframework.stereotype.Component;

@Component
@RequiredArgsConstructor
public class JambiTaskInterceptor implements ITaskProcessingInterceptor {

private final MeterRegistry meterRegistry;

@Override
public void doProcess(Task task, Runnable processor) {
if ("Jambi".equals(task.getSubType())) {
if (task.getTaskContext() != null) {
var name = task.getTaskContext().get("adam-jones", String.class);
meterRegistry.counter("tool", "song", name).increment();
}
}
processor.run();
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,19 @@
package com.transferwise.tasks.testapp.testbeans;

import com.transferwise.tasks.ITasksService.AddTaskRequest;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.processing.ITaskRegistrationDecorator;
import java.util.Map;
import org.springframework.stereotype.Component;

@Component
public class JambiTaskRegistrationDecorator implements ITaskRegistrationDecorator {

@Override
public AddTaskRequest intercept(AddTaskRequest request) {
if ("Jambi".equals(request.getSubType())) {
return request.setTaskContext(new TaskContext().setContextMap(Map.of("adam-jones", "eulogy")));
}
return request;
}
}
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package com.transferwise.tasks;

import com.transferwise.tasks.ITasksService.AddTaskResponse.Result;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.domain.TaskStatus;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import java.time.Duration;
Expand Down Expand Up @@ -45,6 +46,7 @@ class AddTaskRequest {
private boolean warnWhenTaskExists;
private Duration expectedQueueTime;
private CompressionRequest compression;
private TaskContext taskContext;

@Data
@Accessors(chain = true)
Expand Down Expand Up @@ -95,6 +97,7 @@ class ResumeTaskRequest {
@Data
@Accessors(chain = true)
class RescheduleTaskRequest {

private UUID taskId;
private long version;
private ZonedDateTime runAfterTime;
Expand All @@ -103,6 +106,7 @@ class RescheduleTaskRequest {
@Data
@Accessors(chain = true)
class RescheduleTaskResponse {

private UUID taskId;
private Result result;

Expand All @@ -116,12 +120,14 @@ public enum Result {
@Data
@Accessors(chain = true)
class GetTaskRequest {

private UUID taskId;
}

@Data
@Accessors(chain = true)
class GetTaskResponse {

private UUID taskId;
private String type;
private long version;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -16,9 +16,12 @@
import com.transferwise.tasks.handler.interfaces.ITaskHandlerRegistry;
import com.transferwise.tasks.helpers.ICoreMetricsTemplate;
import com.transferwise.tasks.helpers.executors.IExecutorsHelper;
import com.transferwise.tasks.processing.ITaskRegistrationDecorator;
import com.transferwise.tasks.triggering.ITasksExecutionTriggerer;
import com.transferwise.tasks.utils.LogUtils;
import java.time.ZonedDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
Expand Down Expand Up @@ -58,6 +61,8 @@ public class TasksService implements ITasksService, GracefulShutdownStrategy, In
private IEnvironmentValidator environmentValidator;
@Autowired
private ICoreMetricsTemplate coreMetricsTemplate;
@Autowired(required = false)
private List<ITaskRegistrationDecorator> taskRegistrationInterceptors = new ArrayList<>();

private ExecutorService afterCommitExecutorService;
private TxSyncAdapterFactory txSyncAdapterFactory;
Expand Down Expand Up @@ -85,12 +90,18 @@ public void afterPropertiesSet() {
@Override
@EntryPoint(usesExisting = true)
@Transactional(rollbackFor = Exception.class)
public AddTaskResponse addTask(AddTaskRequest request) {
public AddTaskResponse addTask(AddTaskRequest requestParam) {
return entryPointsHelper.continueOrCreate(EntryPointsGroups.TW_TASKS_ENGINE, EntryPointsNames.ADD_TASK,
() -> {
AddTaskRequest request = requestParam;
mdcService.put(request.getTaskId(), 0L);
mdcService.putType(request.getType());
mdcService.putSubType(request.getSubType());

for (ITaskRegistrationDecorator interceptor : taskRegistrationInterceptors) {
request = interceptor.intercept(request);
}

ZonedDateTime now = ZonedDateTime.now(TwContextClockHolder.getClock());
final TaskStatus status =
request.getRunAfterTime() == null || !request.getRunAfterTime().isAfter(now) ? TaskStatus.SUBMITTED : TaskStatus.WAITING;
Expand All @@ -102,17 +113,19 @@ public AddTaskResponse addTask(AddTaskRequest request) {

ZonedDateTime maxStuckTime =
request.getExpectedQueueTime() == null ? now.plus(tasksProperties.getTaskStuckTimeout()) : now.plus(request.getExpectedQueueTime());
byte[] data = request.getData();

ITaskDao.InsertTaskResponse insertTaskResponse = taskDao.insertTask(
new ITaskDao.InsertTaskRequest().setData(data).setKey(request.getUniqueKey())
new ITaskDao.InsertTaskRequest().setData(request.getData()).setKey(request.getUniqueKey())
.setRunAfterTime(request.getRunAfterTime())
.setSubType(request.getSubType())
.setType(request.getType()).setTaskId(request.getTaskId())
.setMaxStuckTime(maxStuckTime).setStatus(status).setPriority(priority)
.setCompression(request.getCompression()));
.setCompression(request.getCompression())
.setTaskContext(request.getTaskContext())
);

coreMetricsTemplate
.registerTaskAdding(request.getType(), request.getUniqueKey(), insertTaskResponse.isInserted(), request.getRunAfterTime(), data);
coreMetricsTemplate.registerTaskAdding(request.getType(), request.getUniqueKey(),
insertTaskResponse.isInserted(), request.getRunAfterTime(), request.getData());

if (!insertTaskResponse.isInserted()) {
coreMetricsTemplate.registerDuplicateTask(request.getType(), !request.isWarnWhenTaskExists());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
import com.transferwise.tasks.domain.BaseTask;
import com.transferwise.tasks.domain.IBaseTask;
import com.transferwise.tasks.domain.Task;
import com.transferwise.tasks.domain.TaskContext;
import com.transferwise.tasks.domain.TaskStatus;
import com.transferwise.tasks.domain.TaskVersionId;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
Expand Down Expand Up @@ -54,6 +55,7 @@ class InsertTaskRequest {
private ZonedDateTime maxStuckTime;
private Integer priority;
private CompressionRequest compression;
private TaskContext taskContext;
}

@Data
Expand Down
Loading
Loading