From 4431a0adc84ae0ea3bdf2f286d822dfff40c10f6 Mon Sep 17 00:00:00 2001 From: Bazen <49089563+bazen-teklehaymanot@users.noreply.github.com> Date: Wed, 13 Mar 2024 23:45:04 +0200 Subject: [PATCH] Updated consumer and producer build options --- README.md | 18 ++------ examples/Consumer/Program.cs | 9 +--- examples/Producer/Program.cs | 7 +-- src/Superstream/BuildOptions.cs | 13 +++--- src/Superstream/SuperstreamInitializer.cs | 53 ++++------------------- 5 files changed, 23 insertions(+), 77 deletions(-) diff --git a/README.md b/README.md index 1405ca3..643ec80 100644 --- a/README.md +++ b/README.md @@ -18,12 +18,7 @@ To use `Superstream` with kafka producer, first define the producer configuratio ```c# var config = new ProducerConfig { BootstrapServers = brokerList }; -var options = new ProducerBuildOptions -{ - Token = "", - Host = "", - ProducerConfig = config -}; +var options = new ProducerBuildOptions(config); ``` Then create a new instance of kafka producer and use `SuperstreamInitializer.Init` to initialize the producer with `Superstream` options: @@ -31,7 +26,7 @@ Then create a new instance of kafka producer and use `SuperstreamInitializer.Ini ```c# var kafkaProducer = new ProducerBuilder(config) .Build(); -using var producer = SuperstreamInitializer.Init(kafkaProducer, options); +using var producer = SuperstreamInitializer.Init("", "", kafkaProducer, options); ``` Finally, to produce messages to kafka, use `ProduceAsync` or `Produce`: @@ -51,12 +46,7 @@ var config = new ConsumerConfig BootstrapServers = brokerList, EnableAutoCommit = false }; -var options = new ConsumerBuildOptions -{ - Token = "", - Host = "", - ConsumerConfig = config -}; +var options = new ConsumerBuildOptions(config); ``` Then create a new instance of kafka consumer and use `SuperstreamInitializer.Init` to initialize the consumer with `Superstream` options: @@ -65,7 +55,7 @@ Then create a new instance of kafka consumer and use `SuperstreamInitializer.Ini var kafkaConsumer = new ConsumerBuilder(config) .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")) .Build(); -using var consumer = SuperstreamInitializer.Init(kafkaConsumer, options); +using var consumer = SuperstreamInitializer.Init("", "",kafkaConsumer, options); ``` Finally, to consume messages from kafka, use `Consume`: diff --git a/examples/Consumer/Program.cs b/examples/Consumer/Program.cs index 51fa11c..5c8cab8 100644 --- a/examples/Consumer/Program.cs +++ b/examples/Consumer/Program.cs @@ -21,17 +21,12 @@ SecurityProtocol = SecurityProtocol.SaslSsl, SaslMechanism = SaslMechanism.Plain }; -var options = new ConsumerBuildOptions -{ - Token = token, - Host = host, - ConsumerConfig = config -}; +var options = new ConsumerBuildOptions(config); var kafkaConsumer = new ConsumerBuilder(config) .SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}")) .Build(); -using var consumer = SuperstreamInitializer.Init(kafkaConsumer, options); +using var consumer = SuperstreamInitializer.Init(token, host, kafkaConsumer, options); // consume by specific partition consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 1, Offset.Beginning)).ToList()); diff --git a/examples/Producer/Program.cs b/examples/Producer/Program.cs index 39627be..bc1db9d 100644 --- a/examples/Producer/Program.cs +++ b/examples/Producer/Program.cs @@ -18,17 +18,14 @@ SaslMechanism = SaslMechanism.Plain }; -var options = new ProducerBuildOptions +var options = new ProducerBuildOptions(config) { - Token = token, - Host = host, - ProducerConfig = config, LearningFactor = 250 // optional }; var kafkaProducer = new ProducerBuilder(config) .Build(); -using var producer = SuperstreamInitializer.Init(kafkaProducer, options); +using var producer = SuperstreamInitializer.Init(token, host, kafkaProducer, options); Console.WriteLine("\n-----------------------------------------------------------------------"); Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}."); diff --git a/src/Superstream/BuildOptions.cs b/src/Superstream/BuildOptions.cs index 630d699..87cb9aa 100644 --- a/src/Superstream/BuildOptions.cs +++ b/src/Superstream/BuildOptions.cs @@ -2,9 +2,10 @@ namespace Superstream; public class BuildOptions { - public string Token { get; set; } = null!; - public string Host { get; set; } = null!; + internal string Token { get; set; } = null!; + public string Host { get; internal set; } = null!; public int LearningFactor { get; set; } = 0; + public string ConsumerGroup { get; set; } = string.Empty; internal virtual void EnsureIsValid() { @@ -15,9 +16,9 @@ internal virtual void EnsureIsValid() } } -public sealed class ProducerBuildOptions : BuildOptions +public sealed class ProducerBuildOptions(ProducerConfig producerConfig) : BuildOptions { - public ProducerConfig ProducerConfig { get; set; } = null!; + public ProducerConfig ProducerConfig { get; set; } = producerConfig; internal override void EnsureIsValid() { @@ -27,9 +28,9 @@ internal override void EnsureIsValid() } } -public sealed class ConsumerBuildOptions : BuildOptions +public sealed class ConsumerBuildOptions(ConsumerConfig consumerConfig) : BuildOptions { - public ConsumerConfig ConsumerConfig { get; set; } = null!; + public ConsumerConfig ConsumerConfig { get; set; } = consumerConfig; internal override void EnsureIsValid() { diff --git a/src/Superstream/SuperstreamInitializer.cs b/src/Superstream/SuperstreamInitializer.cs index 3fd33d2..591c284 100644 --- a/src/Superstream/SuperstreamInitializer.cs +++ b/src/Superstream/SuperstreamInitializer.cs @@ -2,56 +2,15 @@ public static class SuperstreamInitializer { - internal static void Init( - ref IProducer producer, - ProducerBuildOptions options - ) - { - options.EnsureIsValid(); - producer = ProducerInterceptor.Init( - producer, - options.ProducerConfig, - options.Token, - options.Host, - options.LearningFactor - ); - } - - internal static void Init( - ref IConsumer consumer, - ConsumerBuildOptions options - ) - { - consumer = ConsumerInterceptor.Init( - consumer, - options.ConsumerConfig, - options.Token, - options.Host, - options.LearningFactor - ); - } - - internal static IConsumer Init( - ConsumerBuilder builder, - ConsumerBuildOptions options - ) - { - return builder.BuildWithSuperstream(options); - } - - internal static IProducer Init( - ProducerBuilder builder, - ProducerBuildOptions options - ) - { - return builder.BuildWithSuperstream(options); - } - public static IProducer Init( + string token, + string host, IProducer target, ProducerBuildOptions options ) { + options.Token = token; + options.Host = host; options.EnsureIsValid(); return ProducerInterceptor.Init( target, @@ -63,10 +22,14 @@ ProducerBuildOptions options } public static IConsumer Init( + string token, + string host, IConsumer target, ConsumerBuildOptions options ) { + options.Token = token; + options.Host = host; return ConsumerInterceptor.Init( target, options.ConsumerConfig,