Skip to content

Commit

Permalink
Fix.
Browse files Browse the repository at this point in the history
  • Loading branch information
onukristo committed Oct 30, 2024
1 parent 170bd55 commit ff2e225
Show file tree
Hide file tree
Showing 4 changed files with 4 additions and 3 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ to [Semantic Versioning](https://semver.org/spec/v2.0.0.html).
## 1.44.0 - 2024/10/16

- When registering cron tasks, log error if job already exists, but task is in error state.
- If silent mode is turned on, then this log will not appear.
- If silent mode is turned on, then this log will not appear.

## 1.43.0 - 2024/08/09

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -49,7 +49,7 @@ public Bucket(int minPriority, int maxPriority) {
}
}

// Optimization to avoid waiting behind a lock.
// Optimization to avoid waiting behind a lock, but also reducing the amount of signals and context switches.
// One ongoing version update is enough to wake up all necessary components.
private AtomicBoolean versionUpdateInProgress = new AtomicBoolean();

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -299,7 +299,6 @@ protected void markAsError(IBaseTask task, String bucketId) {

protected ProcessTaskResponse grabTaskForProcessing(String bucketId, BaseTask task) {
GlobalProcessingState.Bucket bucket = globalProcessingState.getBuckets().get(bucketId);
BucketProperties bucketProperties = bucketsManager.getBucketProperties(bucketId);

ITaskHandler taskHandler = taskHandlerRegistry.getTaskHandler(task);
if (taskHandler == null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -474,6 +474,8 @@ private void commitOffsetsWithLowFrequency(ConsumerBucket consumerBucket) {
.map(e -> e.getKey() + ":" + e.getValue().offset()).collect(Collectors.joining(", ")));
}

releaseCompletedOffsets(consumerBucket);

consumerBucket.getKafkaConsumer().commitAsync(consumerBucket.getOffsetsToBeCommitted(), (map, e) -> {
if (e != null) {
coreMetricsTemplate.registerKafkaTasksExecutionTriggererCommit(bucketId, false, false);
Expand Down

0 comments on commit ff2e225

Please sign in to comment.