diff --git a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md index 62ed2811ebe..ad5745abc37 100644 --- a/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md +++ b/daprdocs/content/en/developing-applications/building-blocks/pubsub/subscription-methods.md @@ -203,7 +203,47 @@ As messages are sent to the given message handler code, there is no concept of r The example below shows the different ways to stream subscribe to a topic. -{{< tabs Go>}} +{{< tabs ".NET" Go>}} + +{{% codetab %}} + +```csharp +using Dapr.Messaging.PublishSubscribe; + +var clientBuilder = new DaprPublishSubscribeClientBuilder(); +var daprMessagingClient = clientBuilder.Build(); + +async Task HandleMessage(TopicMessage message, CancellationToken cancellationToken = default) +{ + try + { + //Do something with the message + Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span)); + + return await Task.FromResult(TopicResponseAction.Success); + } + catch + { + return await Task.FromResult(TopicResponseAction.Retry); + } +} + +//Create a dynamic streaming subscription +var subscription = daprMessagingClient.Register("pubsub", "myTopic", + new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(15), TopicResponseAction.Retry)), + HandleMessage, CancellationToken.None); + +//Subscribe to messages on it with a timeout of 30 seconds +var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30)); +await subscription.SubscribeAsync(cancellationTokenSource.Token); + +await Task.Delay(TimeSpan.FromMinutes(1)); + +//When you're done with the subscription, simply dispose of it +await subscription.DisposeAsync(); +``` + +{{% /codetab %}} {{% codetab %}} diff --git a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md index 75b1b758c2b..c6f71888370 100644 --- a/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md +++ b/daprdocs/content/en/reference/components-reference/supported-pubsub/setup-apache-kafka.md @@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options: When invoking the Kafka pub/sub, its possible to provide an optional partition key by using the `metadata` query param in the request url. -The param name is `partitionKey`. +The param name can either be `partitionKey` or `__key` Example: @@ -484,7 +484,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partiti ### Message headers -All other metadata key/value pairs (that are not `partitionKey`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message. +All other metadata key/value pairs (that are not `partitionKey` or `__key`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message. ```shell curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \ @@ -495,7 +495,51 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correla } }' ``` +### Kafka Pubsub special message headers received on consumer side +When consuming messages, special message metadata are being automatically passed as headers. These are: +- `__key`: the message key if available +- `__topic`: the topic for the message +- `__partition`: the partition number for the message +- `__offset`: the offset of the message in the partition +- `__timestamp`: the timestamp for the message + +You can access them within the consumer endpoint as follows: +{{< tabs "Python (FastAPI)" >}} + +{{% codetab %}} + +```python +from fastapi import APIRouter, Body, Response, status +import json +import sys + +app = FastAPI() + +router = APIRouter() + + +@router.get('/dapr/subscribe') +def subscribe(): + subscriptions = [{'pubsubname': 'pubsub', + 'topic': 'my-topic', + 'route': 'my_topic_subscriber', + }] + return subscriptions + +@router.post('/my_topic_subscriber') +def my_topic_subscriber( + key: Annotated[str, Header(alias="__key")], + offset: Annotated[int, Header(alias="__offset")], + event_data=Body()): + print(f"key={key} - offset={offset} - data={event_data}", flush=True) + return Response(status_code=status.HTTP_200_OK) + +app.include_router(router) + +``` + +{{% /codetab %}} ## Receiving message headers with special characters The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.