Skip to content

Commit

Permalink
Merge pull request #209 from mreiferson/retry-lookup
Browse files Browse the repository at this point in the history
consumer: retry nsqlookupd queries
  • Loading branch information
jehiah authored Jun 4, 2017
2 parents 3c0361b + 29f287e commit 973bf9a
Show file tree
Hide file tree
Showing 4 changed files with 16 additions and 5 deletions.
1 change: 1 addition & 0 deletions api_request.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ func (c *deadlinedConn) Write(b []byte) (n int, err error) {

func newDeadlineTransport(timeout time.Duration) *http.Transport {
transport := &http.Transport{
DisableKeepAlives: true,
Dial: func(netw, addr string) (net.Conn, error) {
c, err := net.DialTimeout(netw, addr, timeout)
if err != nil {
Expand Down
1 change: 0 additions & 1 deletion conn.go
Original file line number Diff line number Diff line change
Expand Up @@ -487,7 +487,6 @@ func (c *Conn) readLoop() {
}

frameType, data, err := ReadUnpackedResponse(c)

if err != nil {
if err == io.EOF && atomic.LoadInt32(&c.closeFlag) == 1 {
goto exit
Expand Down
8 changes: 8 additions & 0 deletions consumer.go
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,9 @@ type peerInfo struct {
//
// initiate a connection to any new producers that are identified.
func (r *Consumer) queryLookupd() {
retries := 0

retry:
endpoint := r.nextLookupdEndpoint()

r.log(LogLevelInfo, "querying nsqlookupd %s", endpoint)
Expand All @@ -462,6 +465,11 @@ func (r *Consumer) queryLookupd() {
err := apiRequestNegotiateV1("GET", endpoint, nil, &data)
if err != nil {
r.log(LogLevelError, "error querying nsqlookupd (%s) - %s", endpoint, err)
retries++
if retries < 3 {
r.log(LogLevelInfo, "retrying with next nsqlookupd")
goto retry
}
return
}

Expand Down
11 changes: 7 additions & 4 deletions consumer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ func SendMessage(t *testing.T, port int, topic string, method string, body []byt
t.Fatalf(err.Error())
return
}
if resp.StatusCode != 200 {
t.Fatalf("%s status code: %d", method, resp.StatusCode)
}
resp.Body.Close()
}

Expand Down Expand Up @@ -165,17 +168,17 @@ func consumerTest(t *testing.T, cb func(c *Config)) {
}
topicName = topicName + strconv.Itoa(int(time.Now().Unix()))
q, _ := NewConsumer(topicName, "ch", config)
// q.SetLogger(nullLogger, LogLevelInfo)
q.SetLogger(log.New(os.Stderr, "", log.Flags()), LogLevelDebug)

h := &MyTestHandler{
t: t,
q: q,
}
q.AddHandler(h)

SendMessage(t, 4151, topicName, "put", []byte(`{"msg":"single"}`))
SendMessage(t, 4151, topicName, "mput", []byte("{\"msg\":\"double\"}\n{\"msg\":\"double\"}"))
SendMessage(t, 4151, topicName, "put", []byte("TOBEFAILED"))
SendMessage(t, 4151, topicName, "pub", []byte(`{"msg":"single"}`))
SendMessage(t, 4151, topicName, "mpub", []byte("{\"msg\":\"double\"}\n{\"msg\":\"double\"}"))
SendMessage(t, 4151, topicName, "pub", []byte("TOBEFAILED"))
h.messagesSent = 4

addr := "127.0.0.1:4150"
Expand Down

0 comments on commit 973bf9a

Please sign in to comment.