Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Release #8

Merged
merged 3 commits into from
Mar 12, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
31 changes: 0 additions & 31 deletions .vscode/settings.json
Original file line number Diff line number Diff line change
Expand Up @@ -22,31 +22,6 @@
"**/node_modules": true,
"**/build": true
},
"[typescript]": {
"editor.defaultFormatter": "esbenp.prettier-vscode",
"editor.codeActionsOnSave": {
"source.organizeImports": true,
"source.fixAll": true
}
},
"[typescriptreact]": {
"editor.defaultFormatter": "esbenp.prettier-vscode",
"editor.codeActionsOnSave": {
"source.organizeImports": true,
"source.fixAll": true
}
},
"typescript.updateImportsOnFileMove.enabled": "always",
"eslint.enable": true,
"eslint.lintTask.enable": true,
"eslint.workingDirectories": [
{
"mode": "auto"
}
],
"files.associations": {
"*.json": "jsonc"
},
"files.exclude": {
"**/.git": true,
"**/.hg": true,
Expand All @@ -58,10 +33,4 @@
"editor.tabSize": 2,
"editor.insertSpaces": true,
"editor.detectIndentation": false,
"[typescript][typescriptreact]": {
"editor.codeActionsOnSave": {
"source.organizeImports": "explicit",
"source.fixAll": "explicit"
}
},
}
16 changes: 10 additions & 6 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,14 @@ var options = new ProducerBuildOptions
};
```

Then create a new instance of kafka producer use `BuildWithSuperstream()`:
Then create a new instance of kafka producer and use `SuperstreamInitializer.Init` to initialize the producer with `Superstream` options:

```c#
using var producer = new ProducerBuilder<string?, byte[]>(config)
.BuildWithSuperstream(options);
var kafkaProducer = new ProducerBuilder<string?, byte[]>(config)
.Build();
using var producer = SuperstreamInitializer.Init(kafkaProducer, options);
```

Finally, to produce messages to kafka, use `ProduceAsync` or `Produce`:

```c#
Expand All @@ -57,11 +59,13 @@ var options = new ConsumerBuildOptions
};
```

Then create a new instance of kafka consumer use `BuildWithSuperstream()`:
Then create a new instance of kafka consumer and use `SuperstreamInitializer.Init` to initialize the consumer with `Superstream` options:

```c#
using var consumer = new ConsumerBuilder<Ignore, byte[]>(config)
.BuildWithSuperstream(options);
var kafkaConsumer = new ConsumerBuilder<Ignore, byte[]>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.Build();
using var consumer = SuperstreamInitializer.Init(kafkaConsumer, options);
```

Finally, to consume messages from kafka, use `Consume`:
Expand Down
11 changes: 6 additions & 5 deletions examples/Consumer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@
var token = "<superstream-token>";
var host = "<superstream-host>";
string brokerList = "<brokers>";
var topics = new List<string> {"test"};
var topics = new List<string> { "test" };

Console.WriteLine($"Started consumer, Ctrl-C to stop consuming");

Expand All @@ -16,9 +16,9 @@
GroupId = "cg",
BootstrapServers = brokerList,
EnableAutoCommit = false,
SaslPassword= "...",
SaslPassword = "...",
SaslUsername = "...",
SecurityProtocol = SecurityProtocol.SaslSsl,
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain
};
var options = new ConsumerBuildOptions
Expand All @@ -28,9 +28,10 @@
ConsumerConfig = config
};

using var consumer = new ConsumerBuilder<Ignore, byte[]>(config)
var kafkaConsumer = new ConsumerBuilder<Ignore, byte[]>(config)
.SetErrorHandler((_, e) => Console.WriteLine($"Error: {e.Reason}"))
.BuildWithSuperstream(options);
.Build();
using var consumer = SuperstreamInitializer.Init(kafkaConsumer, options);

// consume by specific partition
consumer.Assign(topics.Select(topic => new TopicPartitionOffset(topic, 1, Offset.Beginning)).ToList());
Expand Down
25 changes: 15 additions & 10 deletions examples/Producer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -6,15 +6,17 @@

var token = "<superstream-token>";
var host = "<superstream-host>";
string brokerList = "<brokers>";
string brokerList = "";
string topicName = "topic_1";

var config = new ProducerConfig {
BootstrapServers = brokerList,
SaslPassword= "...",
SaslUsername= "...",
SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain};
var config = new ProducerConfig
{
BootstrapServers = brokerList,
// SaslPassword= "...",
// SaslUsername= "...",
// SecurityProtocol = SecurityProtocol.SaslSsl,
SaslMechanism = SaslMechanism.Plain
};

var options = new ProducerBuildOptions
{
Expand All @@ -23,8 +25,11 @@
ProducerConfig = config,
LearningFactor = 250 // optional
};
using var producer = new ProducerBuilder<string?, byte[]>(config)
.BuildWithSuperstream(options);

var kafkaProducer = new ProducerBuilder<string?, byte[]>(config)
.Build();
using var producer = SuperstreamInitializer.Init(kafkaProducer, options);

Console.WriteLine("\n-----------------------------------------------------------------------");
Console.WriteLine($"Producer {producer.Name} producing on topic {topicName}.");
Console.WriteLine("-----------------------------------------------------------------------");
Expand All @@ -49,7 +54,7 @@
var deliveryReport = await producer.ProduceAsync(
topicName, new() { Key = key, Value = JsonSerializer.SerializeToUtf8Bytes(person) });

await producer.ProduceAsync("<topic>", new() { Value = Encoding.UTF8.GetBytes("{\"test_key\":\"test_value\"}")});
await producer.ProduceAsync("<topic>", new() { Value = Encoding.UTF8.GetBytes("{\"test_key\":\"test_value\"}") });
Console.WriteLine($"Delivered to : {deliveryReport.TopicPartitionOffset}");

}
Expand Down
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace Superstream;

public static class SuperstreamInitializerExtensions
internal static class SuperstreamInitializerExtensions
{
public static IProducer<TKey, TValue> BuildWithSuperstream<TKey, TValue>(
internal static IProducer<TKey, TValue> BuildWithSuperstream<TKey, TValue>(
this ProducerBuilder<TKey, TValue> builder,
ProducerBuildOptions options
)
Expand All @@ -17,7 +17,7 @@ ProducerBuildOptions options
);
}

public static IConsumer<TKey, TValue> BuildWithSuperstream<TKey, TValue>(
internal static IConsumer<TKey, TValue> BuildWithSuperstream<TKey, TValue>(
this ConsumerBuilder<TKey, TValue> builder,
ConsumerBuildOptions options
)
Expand Down
22 changes: 21 additions & 1 deletion src/Superstream/Interceptors/ConsumerInterceptor.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,8 @@
namespace Superstream.Interceptors;

internal class ConsumerInterceptor<TKey, TValue> : DispatchProxy
internal class ConsumerInterceptor<TKey, TValue> : DispatchProxy, IDisposable
{
private bool _disposed = false;
#nullable disable
public IConsumer<TKey, TValue> Target { get; set; }
public SuperstreamClient Client { get; set; }
Expand All @@ -20,7 +21,7 @@
if (typeof(TValue) != typeof(byte[]))
return result;
int partition = -1;
OnConsume(result as ConsumeResult<TKey, byte[]>, partition);

Check warning on line 24 in src/Superstream/Interceptors/ConsumerInterceptor.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'result' in 'void ConsumerInterceptor<TKey, TValue>.OnConsume(ConsumeResult<TKey, byte[]> result, int partition)'.
return result;
}

Expand Down Expand Up @@ -133,4 +134,23 @@
return proxy as IConsumer<K, V>
?? throw new InvalidOperationException(typeof(IConsumer<K, V>).Name);
}

public void Dispose()
{
Dispose(true);
GC.SuppressFinalize(this);
}

public void Dispose(bool disposing)
{
if (_disposed)
{
return;
}
if (disposing)
{
Target.Dispose();
}
_disposed = true;
}
}
25 changes: 24 additions & 1 deletion src/Superstream/Interceptors/ProducerInterceptor.cs
Original file line number Diff line number Diff line change
@@ -1,7 +1,11 @@
namespace Superstream.Interceptors;

internal class ProducerInterceptor<TKey, TValue> : DispatchProxy
internal class ProducerInterceptor<TKey, TValue> : DispatchProxy, IDisposable
{
private bool _disposed;



#nullable disable
public IProducer<TKey, TValue> Target { get; set; }
public SuperstreamClient Client { get; set; }
Expand All @@ -20,16 +24,16 @@
if (typeof(TValue) != typeof(byte[]))
return targetMethod?.Invoke(Target, args);

int partition = default;

Check warning on line 27 in src/Superstream/Interceptors/ProducerInterceptor.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.

Check warning on line 27 in src/Superstream/Interceptors/ProducerInterceptor.cs

View workflow job for this annotation

GitHub Actions / build

Dereference of a possibly null reference.
string topic = string.Empty;
if (args[0].GetType() == typeof(TopicPartition))
{
partition = (args[0] as TopicPartition)!.Partition;
topic = (args[0] as TopicPartition)!.Topic;
}
else

Check warning on line 34 in src/Superstream/Interceptors/ProducerInterceptor.cs

View workflow job for this annotation

GitHub Actions / build

Converting null literal or possible null value to non-nullable type.
{
topic = args[0] as string;

Check warning on line 36 in src/Superstream/Interceptors/ProducerInterceptor.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'topic' in 'void ProducerInterceptor<TKey, TValue>.OnSend(string topic, Message<TKey, byte[]> message, int partition = 0)'.

Check warning on line 36 in src/Superstream/Interceptors/ProducerInterceptor.cs

View workflow job for this annotation

GitHub Actions / build

Possible null reference argument for parameter 'message' in 'void ProducerInterceptor<TKey, TValue>.OnSend(string topic, Message<TKey, byte[]> message, int partition = 0)'.
}
OnSend(topic, args[1] as Message<TKey, byte[]>, partition);
var result = targetMethod?.Invoke(Target, args);
Expand Down Expand Up @@ -96,7 +100,7 @@
Client.ProducerProtoDescriptor.MasterMessageName
)
.GetAwaiter()
.GetResult() ?? throw new Exception("Failed to serialize message");

Check warning on line 103 in src/Superstream/Interceptors/ProducerInterceptor.cs

View workflow job for this annotation

GitHub Actions / build

The variable 'ex' is declared but never used
}
catch (Exception ex)
{
Expand Down Expand Up @@ -148,4 +152,23 @@
return proxy as IProducer<K, V>
?? throw new InvalidOperationException(typeof(IProducer<K, V>).Name);
}

public void Dispose()
{
Dispose(disposing: true);
GC.SuppressFinalize(this);
}

protected virtual void Dispose(bool disposing)
{
if (!_disposed)
{
if (disposing)
{
Target.Dispose();
}

_disposed = true;
}
}
}
4 changes: 2 additions & 2 deletions src/Superstream/Superstream.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -7,14 +7,14 @@
<LangVersion>12</LangVersion>

<PackageId>Superstream</PackageId>
<Version>1.0.0</Version>
<Version>1.0.1</Version>
<Authors>Memphis.dev team</Authors>
<Company>Memphis.dev</Company>
<PackageTags>Superstream</PackageTags>
<PackageReadmeFile>README.md</PackageReadmeFile>
<PackageLicenseFile>LICENSE</PackageLicenseFile>
<EnablePackageValidation>false</EnablePackageValidation>
<PackageValidationBaselineVersion>1.0.0</PackageValidationBaselineVersion>
<PackageValidationBaselineVersion>1.0.1</PackageValidationBaselineVersion>

</PropertyGroup>

Expand Down
39 changes: 34 additions & 5 deletions src/Superstream/SuperstreamInitializer.cs
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
namespace Superstream;

internal static class SuperstreamInitializer
public static class SuperstreamInitializer
{
public static void Init<TKey, TValue>(
internal static void Init<TKey, TValue>(
ref IProducer<TKey, TValue> producer,
ProducerBuildOptions options
)
Expand All @@ -17,7 +17,7 @@ ProducerBuildOptions options
);
}

public static void Init<TKey, TValue>(
internal static void Init<TKey, TValue>(
ref IConsumer<TKey, TValue> consumer,
ConsumerBuildOptions options
)
Expand All @@ -31,19 +31,48 @@ ConsumerBuildOptions options
);
}

public static IConsumer<TKey, TValue> Init<TKey, TValue>(
internal static IConsumer<TKey, TValue> Init<TKey, TValue>(
ConsumerBuilder<TKey, TValue> builder,
ConsumerBuildOptions options
)
{
return builder.BuildWithSuperstream(options);
}

public static IProducer<TKey, TValue> Init<TKey, TValue>(
internal static IProducer<TKey, TValue> Init<TKey, TValue>(
ProducerBuilder<TKey, TValue> builder,
ProducerBuildOptions options
)
{
return builder.BuildWithSuperstream(options);
}

public static IProducer<K, V> Init<K, V>(
IProducer<K, V> target,
ProducerBuildOptions options
)
{
options.EnsureIsValid();
return ProducerInterceptor<K, V>.Init(
target,
options.ProducerConfig,
options.Token,
options.Host,
options.LearningFactor
);
}

public static IConsumer<K, V> Init<K, V>(
IConsumer<K, V> target,
ConsumerBuildOptions options
)
{
return ConsumerInterceptor<K, V>.Init(
target,
options.ConsumerConfig,
options.Token,
options.Host,
options.LearningFactor
);
}
}
2 changes: 1 addition & 1 deletion src/Superstream/Utils/Constants.cs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,6 @@ internal static class Subjects

internal static class SdkInfo
{
public const string SdkVersion = "1.0.0";
public const string SdkVersion = "1.0.1";
public const string SdkLanguage = "C#";
}
2 changes: 1 addition & 1 deletion version.conf
Original file line number Diff line number Diff line change
@@ -1 +1 @@
1.0.0
1.0.1
Loading