From 1cff2568a195e545245ec3e996227c77d7bf77c6 Mon Sep 17 00:00:00 2001 From: Yann ROBIN Date: Fri, 22 Dec 2023 23:40:18 +0100 Subject: [PATCH 1/2] fix: handle duplicated declaration of topic --- src/KafkaFlow/Clusters/ClusterManager.cs | 28 ++++++++++++++++++++++-- 1 file changed, 26 insertions(+), 2 deletions(-) diff --git a/src/KafkaFlow/Clusters/ClusterManager.cs b/src/KafkaFlow/Clusters/ClusterManager.cs index 28e7381f0..0ddbb9eff 100644 --- a/src/KafkaFlow/Clusters/ClusterManager.cs +++ b/src/KafkaFlow/Clusters/ClusterManager.cs @@ -99,8 +99,25 @@ public async Task> GetConsumerGroupOffsetsAsyn public async Task CreateIfNotExistsAsync(IEnumerable configurations) { try - { - var topics = configurations + { + var duplicatedTopics = configurations + .GroupBy(c => c, new DuplicateTopicConfigurationEqualityComparer()) + .Where(g => g.Count() > 1) + .Select(g => g.Key.Name) + .ToList(); + + foreach (var duplicatedTopic in duplicatedTopics) + { + _logHandler.Warning( + "Topic {Topic} declaration is duplicated. First one configuration will be used", + new + { + duplicatedTopic, + }); + } + + var topics = configurations + .Distinct(new DuplicateTopicConfigurationEqualityComparer()) .Select( topicConfiguration => new TopicSpecification { @@ -152,5 +169,12 @@ public void Dispose() { _lazyAdminClient.Value.Dispose(); } + } + + private class DuplicateTopicConfigurationEqualityComparer : IEqualityComparer + { + public bool Equals(TopicConfiguration x, TopicConfiguration y) => x?.Name == y?.Name; + + public int GetHashCode(TopicConfiguration obj) => obj.Name.GetHashCode(); } } From b4e2bd0cfbc9c631c683becb5aca1b96c30ec36d Mon Sep 17 00:00:00 2001 From: Yann ROBIN Date: Fri, 22 Dec 2023 23:55:05 +0100 Subject: [PATCH 2/2] fix: warning message for dedup --- src/KafkaFlow/Clusters/ClusterManager.cs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/KafkaFlow/Clusters/ClusterManager.cs b/src/KafkaFlow/Clusters/ClusterManager.cs index 0ddbb9eff..88aaf17ae 100644 --- a/src/KafkaFlow/Clusters/ClusterManager.cs +++ b/src/KafkaFlow/Clusters/ClusterManager.cs @@ -109,7 +109,7 @@ public async Task CreateIfNotExistsAsync(IEnumerable configu foreach (var duplicatedTopic in duplicatedTopics) { _logHandler.Warning( - "Topic {Topic} declaration is duplicated. First one configuration will be used", + "Topic {Topic} declaration is duplicated. First topic configuration will be used", new { duplicatedTopic,