Skip to content

Commit

Permalink
udpates docs
Browse files Browse the repository at this point in the history
  • Loading branch information
Shubhang Balkundi committed Mar 26, 2024
1 parent 0100059 commit 6ca2a70
Show file tree
Hide file tree
Showing 2 changed files with 51 additions and 2 deletions.
51 changes: 50 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -225,4 +225,53 @@ router.HandlerFunc("mobile_app_log_consumer/mobile-application-logs/(1|3|5|7|9|1
router.HandlerFunc("mobile_app_log_consumer/mobile-application-logs/(2|4|6|8|10|12)", func (ctx, *ziggurat.Event) error {....})
```

Based on how the routing path is set by the message consumer implementation, you can define your routing paths.
Based on how the routing path is set by the message consumer implementation, you can define your routing paths.

### Retries
Ziggurat-Go includes rabbitmq as the backend for message retries. Message retries are useful when message processing from one message consumer fails and needs to be retried.

The `rabbitMQ.AutoRetry(qc QueueConfig,opts ...Opts)` function creates an instance of the `rabbitmq.ARetry struct`

Required params
```go
type QueueConfig struct {
QueueKey string // A queue key to be used for retries, any arbitrary string would do
DelayExpirationInMS string // The time in milliseconds after which to reconsume the message for processing
RetryCount int // The number of times to retry the message
ConsumerPrefetchCount int // The number of messages in batch to be consumed from RabbitMQ
ConsumerCount int // Number of concurrent consumer instances
}

type Queues []QueueConfig
```
#### How does queue key work
A practical example
Suppose your queue key is called `foo_retries`. The RabbitMQ retry module will automatically create 3 queues namely
- `foo_retries_instant_queue`
- `foo_retries_delay_queue`
- `foo_retries_dlq`
It will also create an exchange by the name `foo_retries_exchange`. This exchange is internally used to send messages to the right queue.
Consumption only happens from the instant queue. The delay queue is where the retried message is sent and once the retries are exhausted they are sent to the dlq.

#### I have a lot of messages in my dead letter queue, how do I replay them
The RabbitMQ package provides HTTP handlers which clear the messages on the RabbitMQ queues. These handlers can be used with any HTTP server.

Example usage:
```go
router := http.NewServeMux()
ar := rabbitmq.AutoRetry()
router.Handle("POST /ds_replay", ar.DSReplayHandler(context.Background()))
router.Handle("POST /ds_view", ar.DSViewHandler(context.Background()))
http.ListenAndServe("localhost:8080", router)
```
Just invoke the API with the following query params
| Param | Example |
| ------- | --------- |
| count\* | 100 |
| queue\* | foo_retry |

> [!NOTE]
> * indicates a required param
> [!CAUTION]
> Using a Prefetch of 1 is not beneficial for consumption and can fill up the RabbitMQ queues, use a higher value from 10 to 300.
2 changes: 1 addition & 1 deletion mw/rabbitmq/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ func createQueuesAndExchanges(ch *amqp.Channel, queueName string, logger ziggura
for _, qt := range queueTypes {
args := amqp.Table{}
logger.Info("creating queue", map[string]interface{}{"queue": queueName, "type": qt})
if qt == "delay" {
if qt == QueueTypeDelay {
args = amqp.Table{
"x-dead-letter-exchange": fmt.Sprintf("%s_%s", queueName, "exchange"),
"x-dead-letter-routing-key": QueueTypeInstant,
Expand Down

0 comments on commit 6ca2a70

Please sign in to comment.