From ff2e225dae76d4f6565933546dcc65cb0b37c443 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Kristo=20Kuusk=C3=BCll?= Date: Wed, 30 Oct 2024 14:25:30 +0200 Subject: [PATCH] Fix. --- CHANGELOG.md | 2 +- .../transferwise/tasks/processing/GlobalProcessingState.java | 2 +- .../transferwise/tasks/processing/TasksProcessingService.java | 1 - .../tasks/triggering/KafkaTasksExecutionTriggerer.java | 2 ++ 4 files changed, 4 insertions(+), 3 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 0a11d400..6cf07a4b 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -15,7 +15,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html). ## 1.44.0 - 2024/10/16 - When registering cron tasks, log error if job already exists, but task is in error state. - - If silent mode is turned on, then this log will not appear. +- If silent mode is turned on, then this log will not appear. ## 1.43.0 - 2024/08/09 diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/GlobalProcessingState.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/GlobalProcessingState.java index 04350b6f..a8b2e4ac 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/GlobalProcessingState.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/GlobalProcessingState.java @@ -49,7 +49,7 @@ public Bucket(int minPriority, int maxPriority) { } } - // Optimization to avoid waiting behind a lock. + // Optimization to avoid waiting behind a lock, but also reducing the amount of signals and context switches. // One ongoing version update is enough to wake up all necessary components. private AtomicBoolean versionUpdateInProgress = new AtomicBoolean(); diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java index c2d9c532..9736ab60 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/processing/TasksProcessingService.java @@ -299,7 +299,6 @@ protected void markAsError(IBaseTask task, String bucketId) { protected ProcessTaskResponse grabTaskForProcessing(String bucketId, BaseTask task) { GlobalProcessingState.Bucket bucket = globalProcessingState.getBuckets().get(bucketId); - BucketProperties bucketProperties = bucketsManager.getBucketProperties(bucketId); ITaskHandler taskHandler = taskHandlerRegistry.getTaskHandler(task); if (taskHandler == null) { diff --git a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java index 7d86710a..69760124 100644 --- a/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java +++ b/tw-tasks-core/src/main/java/com/transferwise/tasks/triggering/KafkaTasksExecutionTriggerer.java @@ -474,6 +474,8 @@ private void commitOffsetsWithLowFrequency(ConsumerBucket consumerBucket) { .map(e -> e.getKey() + ":" + e.getValue().offset()).collect(Collectors.joining(", "))); } + releaseCompletedOffsets(consumerBucket); + consumerBucket.getKafkaConsumer().commitAsync(consumerBucket.getOffsetsToBeCommitted(), (map, e) -> { if (e != null) { coreMetricsTemplate.registerKafkaTasksExecutionTriggererCommit(bucketId, false, false);