Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Introduce simplified producer syntax #2148

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,8 @@ class Program
try
{
var dr = await p.ProduceAsync("test-topic", new Message<Null, string> { Value="test" });
// or simplified syntax
var dr = await p.ProduceAsync("test-topic", "test");
Console.WriteLine($"Delivered '{dr.Value}' to '{dr.TopicPartitionOffset}'");
}
catch (ProduceException<Null, string> e)
Expand Down Expand Up @@ -136,6 +138,8 @@ class Program
for (int i=0; i<100; ++i)
{
p.Produce("my-topic", new Message<Null, string> { Value = i.ToString() }, handler);
// or simplified syntax
p.Produce("my-topic", i.ToString(), handler);
}

// wait for up to 10 seconds for any inflight messages to be delivered.
Expand Down
15 changes: 6 additions & 9 deletions examples/AvroBlogExamples/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ async static Task ProduceGeneric(string bootstrapServers, string schemaRegistryU
record.Add("Message", "a test log message");
record.Add("Severity", new GenericEnum(logLevelSchema, "Error"));
await producer
.ProduceAsync("log-messages", new Message<Null, GenericRecord> { Value = record })
.ProduceAsync("log-messages", record)
.ContinueWith(task => Console.WriteLine(
task.IsFaulted
? $"error producing message: {task.Exception.Message}"
Expand All @@ -75,15 +75,12 @@ async static Task ProduceSpecific(string bootstrapServers, string schemaRegistry
.Build())
{
await producer.ProduceAsync("log-messages",
new Message<Null, MessageTypes.LogMessage>
new MessageTypes.LogMessage
{
Value = new MessageTypes.LogMessage
{
IP = "192.168.0.1",
Message = "a test message 2",
Severity = MessageTypes.LogLevel.Info,
Tags = new Dictionary<string, string> { { "location", "CA" } }
}
IP = "192.168.0.1",
Message = "a test message 2",
Severity = MessageTypes.LogLevel.Info,
Tags = new Dictionary<string, string> { { "location", "CA" } }
});

producer.Flush(TimeSpan.FromSeconds(30));
Expand Down
2 changes: 1 addition & 1 deletion examples/AvroGeneric/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ static async Task Main(string[] args)

try
{
var dr = await producer.ProduceAsync(topicName, new Message<string, GenericRecord> { Key = text, Value = record });
var dr = await producer.ProduceAsync(topicName, (text, record));
Console.WriteLine($"produced to: {dr.TopicPartitionOffset}");
}
catch (ProduceException<string, GenericRecord> ex)
Expand Down
2 changes: 1 addition & 1 deletion examples/AvroSpecific/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ static void Main(string[] args)
{
User user = new User { name = text, favorite_color = "green", favorite_number = ++i, hourly_rate = new Avro.AvroDecimal(67.99) };
producer
.ProduceAsync(topicName, new Message<string, User> { Key = text, Value = user })
.ProduceAsync(topicName, (text, user))
.ContinueWith(task =>
{
if (!task.IsFaulted)
Expand Down
8 changes: 4 additions & 4 deletions examples/ConfluentCloud/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -23,9 +23,9 @@ namespace ConfluentCloudExample
/// <summary>
/// This is a simple example demonstrating how to produce a message to
/// Confluent Cloud then read it back again.
///
///
/// https://www.confluent.io/confluent-cloud/
///
///
/// Confluent Cloud does not auto-create topics. You will need to use the ccloud
/// cli to create the dotnet-test-topic topic before running this example. The
/// <ccloud bootstrap servers>, <ccloud key> and <ccloud secret> parameters are
Expand All @@ -51,11 +51,11 @@ static void Main(string[] args)

using (var producer = new ProducerBuilder<Null, string>(pConfig).Build())
{
producer.ProduceAsync("dotnet-test-topic", new Message<Null, string> { Value = "test value" })
producer.ProduceAsync("dotnet-test-topic", "test value")
.ContinueWith(task => task.IsFaulted
? $"error producing message: {task.Exception.Message}"
: $"produced to: {task.Result.TopicPartitionOffset}");

// block until all in-flight produce requests have completed (successfully
// or otherwise) or 10s has elapsed.
producer.Flush(TimeSpan.FromSeconds(10));
Expand Down
6 changes: 3 additions & 3 deletions examples/ExactlyOnce/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -263,7 +263,7 @@ static async Task Generator_LineInputData(string brokerList, string clientId, Ca
foreach (var l in lines)
{
await Task.Delay(TimeSpan.FromSeconds(1), ct); // slow down the calls to produce to make the output more interesting to watch.
await producer.ProduceAsync(Topic_InputLines, new Message<Null, string> { Value = l }, ct); // Note: producing synchronously is slow and should generally be avoided.
await producer.ProduceAsync(Topic_InputLines, l, ct); // Note: producing synchronously is slow and should generally be avoided.
lCount += 1;
if (lCount % 10 == 0)
{
Expand Down Expand Up @@ -406,7 +406,7 @@ static void Processor_MapWords(string brokerList, string clientId, CancellationT
{
try
{
producer.Produce(Topic_Words, new Message<string, Null> { Key = w });
producer.Produce(Topic_Words, (w, null));
// Note: when using transactions, there is no need to check for errors of individual
// produce call delivery reports because if the transaction commits successfully, you
// can be sure that all the constituent messages were delivered successfully and in order.
Expand Down Expand Up @@ -661,7 +661,7 @@ public static void Processor_AggregateWords(string brokerList, string clientId,
count += 1;
WordCountState[cr.Partition].Session.Upsert(key, count);

producer.Produce(Topic_Counts, new Message<string, int> { Key = cr.Message.Key, Value = count });
producer.Produce(Topic_Counts, (cr.Message.Key, count));

wCount += 1;
}
Expand Down
6 changes: 3 additions & 3 deletions examples/ExactlyOnceOldBroker/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -195,7 +195,7 @@ static async Task Generator_LineInputData(string brokerList, CancellationToken c
foreach (var l in lines)
{
await Task.Delay(TimeSpan.FromSeconds(1), ct); // slow down the calls to produce to make the output more interesting to watch.
await producer.ProduceAsync(Topic_InputLines, new Message<Null, string> { Value = l }, ct);
await producer.ProduceAsync(Topic_InputLines, l, ct);
lCount += 1;
if (lCount % 10 == 0)
{
Expand Down Expand Up @@ -330,7 +330,7 @@ static void Processor_MapWords(string brokerList, string clientId, CancellationT
{
try
{
producerState[cr.TopicPartition].Producer.Produce(Topic_Words, new Message<string, Null> { Key = w });
producerState[cr.TopicPartition].Producer.Produce(Topic_Words, (w, null));
// Note: when using transactions, there is no need to check for errors of individual
// produce call delivery reports because if the transaction commits successfully, you
// can be sure that all the constituent messages were delivered successfully and in order.
Expand Down Expand Up @@ -562,7 +562,7 @@ public static void Processor_AggregateWords(string brokerList, string clientId,
try
{
producerState[cr.TopicPartition].Producer.Produce(
Topic_Counts, new Message<string, int> { Key = cr.Message.Key, Value = updatedV });
Topic_Counts, (cr.Message.Key, updatedV));
}
catch (KafkaException e)
{
Expand Down
12 changes: 6 additions & 6 deletions examples/JsonSerialization/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -26,15 +26,15 @@


/// <summary>
/// An example of working with JSON data, Apache Kafka and
/// An example of working with JSON data, Apache Kafka and
/// Confluent Schema Registry (v5.5 or later required for
/// JSON schema support).
/// </summary>
namespace Confluent.Kafka.Examples.JsonSerialization
{
/// <summary>
/// A POCO class corresponding to the JSON data written
/// to Kafka, where the schema is implicitly defined through
/// to Kafka, where the schema is implicitly defined through
/// the class properties and their attributes.
/// </summary>
/// <remarks>
Expand Down Expand Up @@ -83,7 +83,7 @@ static async Task Main(string[] args)
var schemaRegistryConfig = new SchemaRegistryConfig
{
// Note: you can specify more than one schema registry url using the
// schema.registry.url property for redundancy (comma separated list).
// schema.registry.url property for redundancy (comma separated list).
// The property name is not plural to follow the convention set by
// the Java implementation.
Url = schemaRegistryUrl
Expand Down Expand Up @@ -149,11 +149,11 @@ static async Task Main(string[] args)
while ((text = Console.ReadLine()) != "q")
{
User user = new User { Name = text, FavoriteColor = "blue", FavoriteNumber = i++ };
try
try
{
await producer.ProduceAsync(topicName, new Message<string, User> { Value = user });
await producer.ProduceAsync(topicName, user);
}
catch (Exception e)
catch (Exception e)
{
Console.WriteLine($"error producing message: {e.Message}");
}
Expand Down
6 changes: 1 addition & 5 deletions examples/JsonWithReferences/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,7 @@ static async Task Main(string[] args)
};
try
{
await producer.ProduceAsync(topicName, new Message<long, Product>
{
Key = product.ProductId,
Value = product
});
await producer.ProduceAsync(topicName, (product.ProductId, product));
}
catch (Exception e)
{
Expand Down
2 changes: 1 addition & 1 deletion examples/OAuthOIDC/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ public static async Task Main(string[] args)
var msg = Guid.NewGuid().ToString();
try
{
var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Value = msg });
var deliveryReport = await producer.ProduceAsync(topicName, msg);
Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}, {msg}");
}
catch (ProduceException<string, string> e)
Expand Down
2 changes: 1 addition & 1 deletion examples/OAuthProducer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -113,7 +113,7 @@ void OauthCallback(IClient client, string cfg)

try
{
var deliveryReport = await producer.ProduceAsync(topicName, new Message<string, string> { Value = msg });
var deliveryReport = await producer.ProduceAsync(topicName, msg);
Console.WriteLine($"Produced message to {deliveryReport.TopicPartitionOffset}, {msg}");
}
catch (ProduceException<string, string> e)
Expand Down
2 changes: 1 addition & 1 deletion examples/Producer/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,7 @@ public static async Task Main(string[] args)
// from proceeding until the acknowledgement from the broker is received (at the
// expense of low throughput).
var deliveryReport = await producer.ProduceAsync(
topicName, new Message<string, string> { Key = key, Value = val });
topicName, (key, val));

Console.WriteLine($"delivered to: {deliveryReport.TopicPartitionOffset}");
}
Expand Down
2 changes: 1 addition & 1 deletion examples/Protobuf/Program.cs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ static async Task Main(string[] args)
{
User user = new User { Name = text, FavoriteColor = "green", FavoriteNumber = i++ };
await producer
.ProduceAsync(topicName, new Message<string, User> { Key = text, Value = user })
.ProduceAsync(topicName, (text, user))
.ContinueWith(task => task.IsFaulted
? $"error producing message: {task.Exception.Message}"
: $"produced to: {task.Result.TopicPartitionOffset}");
Expand Down
1 change: 1 addition & 0 deletions src/Confluent.Kafka/Confluent.Kafka.csproj
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,7 @@
<PackageReference Include="System.Runtime.InteropServices" Version="4.3.0" />
<PackageReference Include="System.Runtime.Extensions" Version="4.3.0" />
<PackageReference Include="System.Threading" Version="4.3.0" />
<PackageReference Include="System.ValueTuple" Version="4.5.0" />
</ItemGroup>

</Project>
Loading