Skip to content

Commit

Permalink
code review updates
Browse files Browse the repository at this point in the history
Signed-off-by: Merlin Ran <[email protected]>
  • Loading branch information
merlinran committed Jul 30, 2021
1 parent a279b5d commit f6dabd3
Show file tree
Hide file tree
Showing 2 changed files with 14 additions and 8 deletions.
14 changes: 7 additions & 7 deletions rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,9 +200,6 @@ func (t *Topic) Publish(
}

resultCh := make(chan Response)
if args.ignoreResponse {
close(resultCh)
}
go func() {
defer func() {
t.lk.Lock()
Expand Down Expand Up @@ -269,10 +266,11 @@ func (t *Topic) watch() {
}
}

func (t *Topic) republishTo(peer.ID) {
func (t *Topic) republishTo(p peer.ID) {
t.lk.Lock()
for _, m := range t.ongoing {
go func(m ongoingMessage) {
log.Debugf("republishing %s because peer %s newly joins", t.t, p)
if err := t.t.Publish(m.ctx, m.data, m.opts...); err != nil {
log.Errorf("republishing to topic: %v", err)
}
Expand Down Expand Up @@ -359,10 +357,12 @@ func (t *Topic) resMessageHandler(from peer.ID, topic string, msg []byte) ([]byt
t.lk.Lock()
m, exists := t.ongoing[id]
t.lk.Unlock()
if exists && m.respCh != nil {
m.respCh <- res
if exists {
if m.respCh != nil {
m.respCh <- res
}
} else {
log.Warnf("%s missed response from %s: %s", topic, from, res.ID)
log.Debugf("%s response from %s arrives too late, discarding", topic, from)
}
return nil, nil // no response to a response
}
8 changes: 7 additions & 1 deletion rpc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,6 +86,12 @@ func TestPingPong(t *testing.T) {
assert.NotEmpty(t, r2.ID)
assert.Equal(t, p1.Host().ID().String(), r2.From.String())

// test ignore response - make sure nothing weird happens.
ctx, cancel := context.WithTimeout(context.Background(), time.Second)
_, err = t1.Publish(ctx, []byte("ping"), rpc.WithIgnoreResponse(true))
require.NoError(t, err)
cancel()

// test retries; peer1 requests "pong" from peer2, but peer2 joins topic after the request
t3, err := p1.NewTopic(context.Background(), "topic2", true)
require.NoError(t, err)
Expand All @@ -107,7 +113,7 @@ func TestPingPong(t *testing.T) {
}()

// allow enough time for peer2 join event to be propagated.
ctx, cancel := context.WithTimeout(context.Background(), time.Second*2)
ctx, cancel = context.WithTimeout(context.Background(), time.Second*2)
defer cancel()
rc3, err := t3.Publish(ctx, []byte("ping"))
require.NoError(t, err)
Expand Down

0 comments on commit f6dabd3

Please sign in to comment.