From 8aa121259ff6182ae8b67fe1e9dcbdf65793b6bd Mon Sep 17 00:00:00 2001
From: Whit Waldo
Date: Mon, 16 Sep 2024 14:47:24 -0500
Subject: [PATCH 1/3] Added .NET implementation detail for streaming subscription support

Signed-off-by: Whit Waldo
---
 .../pubsub/subscription-methods.md | 42 ++++++++++++++++++-
 1 file changed, 41 insertions(+), 1 deletion(-)

diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md
index 436c1629585..7c90a7c58c6 100644
--- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md
+++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md
@@ -204,7 +204,47 @@ As messages are sent to the given message handler code, there is no concept of r

The example below shows the different ways to stream subscribe to a topic.

-{{< tabs Go>}}
+{{< tabs ".NET" Go>}}

{{% codetab %}}

```csharp
using System.Text;
using Dapr.Messaging.PublishSubscribe;

var clientBuilder = new DaprPublishSubscribeClientBuilder();
var daprMessagingClient = clientBuilder.Build();

async Task<TopicResponseAction> HandleMessage(TopicMessage message, CancellationToken cancellationToken = default)
{
    try
    {
        // Do something with the message
        Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));

        return await Task.FromResult(TopicResponseAction.Success);
    }
    catch
    {
        return await Task.FromResult(TopicResponseAction.Retry);
    }
}

// Create a dynamic streaming subscription
var subscription = daprMessagingClient.Register("pubsub", "myTopic",
    new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(15), TopicResponseAction.Retry)),
    HandleMessage, CancellationToken.None);

// Subscribe to messages on it with a timeout of 30 seconds
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await subscription.SubscribeAsync(cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

// When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
```

{{% /codetab %}}

{{% codetab %}}

From 1ab3eca476b056b82d748acf2b55daebdc9ad680 Mon Sep 17 00:00:00 2001
From: Patrick Assuied
Date: Wed, 13 Nov 2024 08:05:01 -0800
Subject: [PATCH 2/3] Added missing information about kafka-pubsub special metadata headers

Signed-off-by: Patrick Assuied
---
 .../supported-pubsub/setup-apache-kafka.md | 48 ++++++++++++++++++-
 1 file changed, 46 insertions(+), 2 deletions(-)

diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
index 75b1b758c2b..002aa4d69e9 100644
--- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
@@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options:

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the `metadata` query param in the request url.

-The param name is `partitionKey`.
+The param name can either `partitionKey` or `__key`

Example:

@@ -484,7 +484,47 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partiti

### Message headers

All other metadata key/value pairs (that are not `partitionKey` or `__key`) are set as headers in the Kafka message.
Here is an example setting a `correlationId` for the message.

```shell
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
        "data": {
          "message": "Hi"
        }
      }'
```

### Kafka Pubsub special message headers received on consumer side

When consuming messages, special message metadata is automatically passed as headers. These are:

- `__key`: the message key if applicable
- `__topic`: the topic for the message
- `__partition`: the partition number for the message
- `__offset`: the offset of the message in the partition
- `__timestamp`: the timestamp for the message

You can access them within the consumer endpoint as follows:

{{< tabs "Python (FastAPI)" >}}

{{% codetab %}}

```python
from typing import Annotated

from fastapi import APIRouter, Body, FastAPI, Header, Response, status

app = FastAPI()

router = APIRouter()


@router.get('/dapr/subscribe')
def subscribe():
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'my-topic',
                      'route': 'my_topic_subscriber',
                      }]
    return subscriptions


@router.post('/my_topic_subscriber')
def my_topic_subscriber(
        key: Annotated[str, Header(alias="__key")],
        offset: Annotated[int, Header(alias="__offset")],
        event_data=Body()):
    # The special Kafka metadata arrives as regular HTTP headers on the request
    print(f"key={key} - offset={offset} - data={event_data}", flush=True)
    return Response(status_code=status.HTTP_200_OK)


app.include_router(router)
```

{{% /codetab %}}

{{< /tabs >}}

## Receiving message headers with special characters

The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.

From 224c7de4ce7aeef00b056f1605c67493beb3507c Mon Sep 17 00:00:00 2001
From: Patrick Assuied
Date: Wed, 13 Nov 2024 08:17:48 -0800
Subject: [PATCH 3/3] Typo

Signed-off-by: Patrick Assuied
---
 .../supported-pubsub/setup-apache-kafka.md | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
index 002aa4d69e9..c6f71888370 100644
--- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
+++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md
@@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options:

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the `metadata` query param in the request url.

-The param name can either `partitionKey` or `__key`
+The param name can either be `partitionKey` or `__key`

Example:

@@ -498,7 +498,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla

### Kafka Pubsub special message headers received on consumer side

When consuming messages, special message metadata is automatically passed as headers. These are:
-- `__key`: the message key if applicable
+- `__key`: the message key if available
- `__topic`: the topic for the message
- `__partition`: the partition number for the message
- `__offset`: the offset of the message in the partition
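
Taken together, the Kafka changes above mean a publisher can set the message key through either `metadata.partitionKey` or `metadata.__key`, and a subscriber gets that key (plus topic, partition, offset, and timestamp) back as headers. The sketch below is an illustrative end-to-end check rather than part of the patches themselves; it assumes the `requests` package, a Dapr sidecar listening on `localhost:3500`, and the `pubsub` component and `my-topic` topic used by the FastAPI subscriber above.

```python
import requests

# Publish through the Dapr HTTP API (sidecar assumed on localhost:3500).
# `metadata.__key` sets the Kafka message key; any other metadata entry
# (e.g. correlationId) is delivered to consumers as a Kafka message header.
resp = requests.post(
    "http://localhost:3500/v1.0/publish/pubsub/my-topic",
    params={"metadata.__key": "key1", "metadata.correlationId": "myCorrelationID"},
    json={"data": {"message": "Hi"}},
)
resp.raise_for_status()

# The FastAPI subscriber above should now log the `__key` header alongside the message body.
```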