Skip to content

Commit

Permalink
fix: stop flow manager after calling revoke handlers
Browse files Browse the repository at this point in the history
  • Loading branch information
massada committed Jul 20, 2024
1 parent 433f337 commit e37c8b6
Showing 1 changed file with 1 addition and 1 deletion.
2 changes: 1 addition & 1 deletion src/KafkaFlow/Consumers/Consumer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -242,11 +242,11 @@ private void EnsureConsumer()
.SetPartitionsRevokedHandler(
(consumer, partitions) =>
{
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
this.Assignment = new List<TopicPartition>();
this.Subscription = new List<string>();
_currentPartitionsOffsets.Clear();
_flowManager.Stop();
_partitionsRevokedHandlers.ForEach(handler => handler(_dependencyResolver, consumer, partitions));
})
.SetErrorHandler((consumer, error) => _errorsHandlers.ForEach(x => x(consumer, error)))
.SetStatisticsHandler((consumer, statistics) => _statisticsHandlers.ForEach(x => x(consumer, statistics)));
Expand Down

0 comments on commit e37c8b6

Please sign in to comment.