Skip to content

Commit

Permalink
add params max_retry_channel
Browse files Browse the repository at this point in the history
  • Loading branch information
bagusandrian committed Nov 29, 2023
1 parent 4b7e99c commit bf066d0
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 15 deletions.
8 changes: 8 additions & 0 deletions nsqd/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -245,9 +245,17 @@ func (s *httpServer) doPUB(w http.ResponseWriter, req *http.Request, ps httprout
return nil, http_api.Err{400, "INVALID_DEFER"}
}
}
var maxRetryChannel int
if mrc, ok := reqParams["max_retry_channel"]; ok {
maxRetryChannel, err = strconv.Atoi(mrc[0])
if err != nil {
return nil, http_api.Err{400, "INVALID_DEFER_MAX_RETRY_CHANNEL"}
}
}

msg := NewMessage(topic.GenerateID(), body)
msg.deferred = deferred
msg.maxRetryChannel = maxRetryChannel
err = topic.PutMessage(msg)
if err != nil {
return nil, http_api.Err{503, "EXITING"}
Expand Down
11 changes: 6 additions & 5 deletions nsqd/message.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,12 @@ type Message struct {
Attempts uint16

// for in-flight handling
deliveryTS time.Time
clientID int64
pri int64
index int
deferred time.Duration
deliveryTS time.Time
clientID int64
pri int64
index int
deferred time.Duration
maxRetryChannel int
}

func NewMessage(id MessageID, body []byte) *Message {
Expand Down
19 changes: 9 additions & 10 deletions nsqd/topic.go
Original file line number Diff line number Diff line change
Expand Up @@ -330,20 +330,19 @@ func (t *Topic) messagePump() {
channel.PutMessageDeferred(chanMsg, chanMsg.deferred)
continue
}
var attempts uint16
if msg.Attempts == 0 {
attempts = 2
} else {
attempts = msg.Attempts
}

for retry := uint16(0); retry < attempts; i++ {
checkRetry := 0
for {
err := channel.PutMessage(chanMsg)
if err != nil {
t.nsqd.logf(LOG_ERROR,
"TOPIC(%s) ERROR: failed to put msg(%s) to channel(%s) - %s will retry[%d]",
t.name, msg.ID, channel.name, err, retry+1)
continue
t.name, msg.ID, channel.name, err)
if checkRetry >= msg.maxRetryChannel {
break
} else {
checkRetry++
continue
}
}
break
}
Expand Down

0 comments on commit bf066d0

Please sign in to comment.