Merge branch 'v1.14' into issue_3321
yaron2 authored Nov 18, 2024
2 parents 348fd1c + 3f719f2 commit 3c548c8
Showing 2 changed files with 87 additions and 3 deletions.
@@ -203,7 +203,47 @@ As messages are sent to the given message handler code, there is no concept of r

The example below shows the different ways to subscribe to a topic using a streaming subscription.

{{< tabs ".NET" Go>}}

{{% codetab %}}

```csharp
using System.Text;
using Dapr.Messaging.PublishSubscribe;

var clientBuilder = new DaprPublishSubscribeClientBuilder();
var daprMessagingClient = clientBuilder.Build();

// Handle each message as it arrives, acknowledging success or requesting a retry
async Task<TopicResponseAction> HandleMessage(TopicMessage message, CancellationToken cancellationToken = default)
{
    try
    {
        // Do something with the message
        Console.WriteLine(Encoding.UTF8.GetString(message.Data.Span));
        return await Task.FromResult(TopicResponseAction.Success);
    }
    catch
    {
        return await Task.FromResult(TopicResponseAction.Retry);
    }
}

// Create a dynamic streaming subscription
var subscription = daprMessagingClient.Register("pubsub", "myTopic",
    new DaprSubscriptionOptions(new MessageHandlingPolicy(TimeSpan.FromSeconds(15), TopicResponseAction.Retry)),
    HandleMessage, CancellationToken.None);

// Subscribe to messages on it with a timeout of 30 seconds
var cancellationTokenSource = new CancellationTokenSource(TimeSpan.FromSeconds(30));
await subscription.SubscribeAsync(cancellationTokenSource.Token);

await Task.Delay(TimeSpan.FromMinutes(1));

// When you're done with the subscription, simply dispose of it
await subscription.DisposeAsync();
```

{{% /codetab %}}

{{% codetab %}}

@@ -468,7 +468,7 @@ Apache Kafka supports the following bulk metadata options:

When invoking the Kafka pub/sub, it's possible to provide an optional partition key by using the `metadata` query param in the request URL.

The param name can be either `partitionKey` or `__key`.

Example:

@@ -484,7 +484,7 @@ curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.partiti
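
The body of this example is collapsed in the diff view above. As a rough equivalent, here is a minimal Python sketch that publishes through the same documented HTTP endpoint with a partition key; the `requests` dependency, the sample payload, and the sidecar port `3500` are illustrative assumptions modeled on the surrounding curl examples.

```python
# Minimal sketch: publish to the Kafka pub/sub component with a partition key.
# Assumes a Dapr sidecar listening on localhost:3500 and a component named "myKafka".
import requests

url = "http://localhost:3500/v1.0/publish/myKafka/myTopic"
params = {"metadata.partitionKey": "key1"}  # "metadata.__key" is accepted as well
payload = {"data": {"message": "Hi"}}       # illustrative payload

resp = requests.post(url, params=params, json=payload)
resp.raise_for_status()
print(f"Published with partition key, status={resp.status_code}")
```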

### Message headers

All other metadata key/value pairs (that are not `partitionKey` or `__key`) are set as headers in the Kafka message. Here is an example setting a `correlationId` for the message.

```shell
curl -X POST http://localhost:3500/v1.0/publish/myKafka/myTopic?metadata.correlationId=myCorrelationID&metadata.partitionKey=key1 \
  -H "Content-Type: application/json" \
  -d '{
    "data": {
      "message": "Hi"
    }
  }'
```

### Kafka pub/sub special message headers received on the consumer side

When consuming messages, special message metadata is automatically passed as headers. These are:
- `__key`: the message key if available
- `__topic`: the topic for the message
- `__partition`: the partition number for the message
- `__offset`: the offset of the message in the partition
- `__timestamp`: the timestamp for the message

You can access them within the consumer endpoint as follows:

{{< tabs "Python (FastAPI)" >}}

{{% codetab %}}

```python
from typing import Annotated

from fastapi import APIRouter, Body, FastAPI, Header, Response, status

app = FastAPI()
router = APIRouter()


@router.get('/dapr/subscribe')
def subscribe():
    # Programmatic subscription: tell Dapr which topic to deliver to which route
    subscriptions = [{'pubsubname': 'pubsub',
                      'topic': 'my-topic',
                      'route': 'my_topic_subscriber'}]
    return subscriptions


@router.post('/my_topic_subscriber')
def my_topic_subscriber(
        key: Annotated[str, Header(alias="__key")],
        offset: Annotated[int, Header(alias="__offset")],
        event_data=Body()):
    # The special Kafka metadata arrives as the __key and __offset headers
    print(f"key={key} - offset={offset} - data={event_data}", flush=True)
    return Response(status_code=status.HTTP_200_OK)


app.include_router(router)
```
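
For local testing, the app above must be reachable on the app port given to the Dapr sidecar. Below is a minimal, hedged sketch of serving it with `uvicorn`; the port `8000` and the matching `dapr run --app-port 8000` invocation are assumptions, not part of the original example.

```python
# Minimal sketch: serve the FastAPI app so the Dapr sidecar can deliver messages.
# Assumes this file is run directly and that `dapr run --app-port 8000 ...` points at it.
if __name__ == "__main__":
    import uvicorn

    uvicorn.run(app, host="0.0.0.0", port=8000)
```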

{{% /codetab %}}

{{< /tabs >}}

## Receiving message headers with special characters

The consumer application may be required to receive message headers that include special characters, which may cause HTTP protocol validation errors.
