Skip to content

Commit

Permalink
Merge pull request #11 from bazen-teklehaymanot/bugfix
Browse files Browse the repository at this point in the history
Updated consumer and producer build options
  • Loading branch information
idanasulin2706 authored Mar 14, 2024
2 parents d01b800 + 4431a0a commit f03922c
Show file tree
Hide file tree
Showing 5 changed files with 23 additions and 77 deletions.
18 changes: 4 additions & 14 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -18,20 +18,15 @@ To use `Superstream` with kafka producer, first define the producer configuratio

```c#
var config = new ProducerConfig { BootstrapServers = brokerList };
var options = new ProducerBuildOptions
{
Token = "<superstream-token>",
Host = "<superstream-host>",
ProducerConfig = config
};
var options = new ProducerBuildOptions(config);
```

Then create a new instance of kafka producer and use `SuperstreamInitializer.Init` to initialize the producer with `Superstream` options:

```c#
var kafkaProducer = new ProducerBuilder<string?, byte[]>(config)
.Build();
using var producer = SuperstreamInitializer.Init(kafkaProducer, options);
using var producer = SuperstreamInitializer.Init("<superstream-token>", "<superstream-host>", kafkaProducer, options);
```

Finally, to produce messages to kafka, use `ProduceAsync` or `Produce`:
Expand All @@ -51,12 +46,7 @@ var config = new ConsumerConfig
BootstrapServers = brokerList,
EnableAutoCommit = false
};
var options = new ConsumerBuildOptions
{
Token = "<superstream-token>",
Host = "<superstream-host>",
ConsumerConfig = config
};
var options = new ConsumerBuildOptions(config);
```

Then create a new instance of kafka consumer and use `SuperstreamInitializer.Init` to initialize the consumer with `Superstream` options:
Expand All @@ -65,7 +55,7 @@ Then create a new instance of kafka consumer and use `SuperstreamInitializer.Ini
var kafkaConsumer = new ConsumerBuilder<Ignore, byte[]>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
using var consumer = SuperstreamInitializer.Init(kafkaConsumer, options);
using var consumer = SuperstreamInitializer.Init("<superstream-token>", "<superstream-host>",kafkaConsumer, options);
```

Finally, to consume messages from kafka, use `Consume`:
Expand Down
9 changes: 2 additions & 7 deletions examples/Consumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -21,17 +21,12 @@
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain
};
var options = new ConsumerBuildOptions
{
Token = token,
Host = host,
ConsumerConfig = config
};
var options = new ConsumerBuildOptions(config);

var kafkaConsumer = new ConsumerBuilder<Ignore, byte[]>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
using var consumer = SuperstreamInitializer.Init(kafkaConsumer, options);
using var consumer = SuperstreamInitializer.Init(token, host, kafkaConsumer, options);

// consume by specific partition
consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 1, Offset.Beginning)).ToList());
Expand Down
7 changes: 2 additions & 5 deletions examples/Producer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -18,17 +18,14 @@
SaslMechanism = SaslMechanism.Plain
};

var options = new ProducerBuildOptions
var options = new ProducerBuildOptions(config)
{
Token = token,
Host = host,
ProducerConfig = config,
LearningFactor = 250 // optional
};

var kafkaProducer = new ProducerBuilder<string?, byte[]>(config)
.Build();
using var producer = SuperstreamInitializer.Init(kafkaProducer, options);
using var producer = SuperstreamInitializer.Init(token, host, kafkaProducer, options);

Console.WriteLine("\n-----------------------------------------------------------------------");
Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
Expand Down
13 changes: 7 additions & 6 deletions src/Superstream/BuildOptions.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,9 +2,10 @@ namespace Superstream;

public class BuildOptions
{
public string Token { get; set; } = null!;
public string Host { get; set; } = null!;
internal string Token { get; set; } = null!;
public string Host { get; internal set; } = null!;
public int LearningFactor { get; set; } = 0;
public string ConsumerGroup { get; set; } = string.Empty;

internal virtual void EnsureIsValid()
{
Expand All @@ -15,9 +16,9 @@ internal virtual void EnsureIsValid()
}
}

public sealed class ProducerBuildOptions : BuildOptions
public sealed class ProducerBuildOptions(ProducerConfig producerConfig) : BuildOptions
{
public ProducerConfig ProducerConfig { get; set; } = null!;
public ProducerConfig ProducerConfig { get; set; } = producerConfig;

internal override void EnsureIsValid()
{
Expand All @@ -27,9 +28,9 @@ internal override void EnsureIsValid()
}
}

public sealed class ConsumerBuildOptions : BuildOptions
public sealed class ConsumerBuildOptions(ConsumerConfig consumerConfig) : BuildOptions
{
public ConsumerConfig ConsumerConfig { get; set; } = null!;
public ConsumerConfig ConsumerConfig { get; set; } = consumerConfig;

internal override void EnsureIsValid()
{
Expand Down
53 changes: 8 additions & 45 deletions src/Superstream/SuperstreamInitializer.cs
Original file line number Diff line number Diff line change
Expand Up @@ -2,56 +2,15 @@

public static class SuperstreamInitializer
{
internal static void Init<TKey, TValue>(
ref IProducer<TKey, TValue> producer,
ProducerBuildOptions options
)
{
options.EnsureIsValid();
producer = ProducerInterceptor<TKey, TValue>.Init(
producer,
options.ProducerConfig,
options.Token,
options.Host,
options.LearningFactor
);
}

internal static void Init<TKey, TValue>(
ref IConsumer<TKey, TValue> consumer,
ConsumerBuildOptions options
)
{
consumer = ConsumerInterceptor<TKey, TValue>.Init(
consumer,
options.ConsumerConfig,
options.Token,
options.Host,
options.LearningFactor
);
}

internal static IConsumer<TKey, TValue> Init<TKey, TValue>(
ConsumerBuilder<TKey, TValue> builder,
ConsumerBuildOptions options
)
{
return builder.BuildWithSuperstream(options);
}

internal static IProducer<TKey, TValue> Init<TKey, TValue>(
ProducerBuilder<TKey, TValue> builder,
ProducerBuildOptions options
)
{
return builder.BuildWithSuperstream(options);
}

public static IProducer<K, V> Init<K, V>(
string token,
string host,
IProducer<K, V> target,
ProducerBuildOptions options
)
{
options.Token = token;
options.Host = host;
options.EnsureIsValid();
return ProducerInterceptor<K, V>.Init(
target,
Expand All @@ -63,10 +22,14 @@ ProducerBuildOptions options
}

public static IConsumer<K, V> Init<K, V>(
string token,
string host,
IConsumer<K, V> target,
ConsumerBuildOptions options
)
{
options.Token = token;
options.Host = host;
return ConsumerInterceptor<K, V>.Init(
target,
options.ConsumerConfig,
Expand Down

0 comments on commit f03922c

Please sign in to comment.