diff --git a/src/KafkaFlow/Clusters/ClusterManager.cs b/src/KafkaFlow/Clusters/ClusterManager.cs index 28e7381f0..88aaf17ae 100644 --- a/src/KafkaFlow/Clusters/ClusterManager.cs +++ b/src/KafkaFlow/Clusters/ClusterManager.cs @@ -99,8 +99,25 @@ public async Task> GetConsumerGroupOffsetsAsyn public async Task CreateIfNotExistsAsync(IEnumerable configurations) { try - { - var topics = configurations + { + var duplicatedTopics = configurations + .GroupBy(c => c, new DuplicateTopicConfigurationEqualityComparer()) + .Where(g => g.Count() > 1) + .Select(g => g.Key.Name) + .ToList(); + + foreach (var duplicatedTopic in duplicatedTopics) + { + _logHandler.Warning( + "Topic {Topic} declaration is duplicated. First topic configuration will be used", + new + { + duplicatedTopic, + }); + } + + var topics = configurations + .Distinct(new DuplicateTopicConfigurationEqualityComparer()) .Select( topicConfiguration => new TopicSpecification { @@ -152,5 +169,12 @@ public void Dispose() { _lazyAdminClient.Value.Dispose(); } + } + + private class DuplicateTopicConfigurationEqualityComparer : IEqualityComparer + { + public bool Equals(TopicConfiguration x, TopicConfiguration y) => x?.Name == y?.Name; + + public int GetHashCode(TopicConfiguration obj) => obj.Name.GetHashCode(); } }