Skip to content

Commit

Permalink
dispatcher: migrate to SubConn.Shutdown and deprecate Addresses
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Nov 15, 2023
1 parent c8a85a3 commit 701bd63
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 8 deletions.
7 changes: 5 additions & 2 deletions broker/protocol/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -141,6 +141,9 @@ func (d *dispatcher) updateSubConnState(sc balancer.SubConn, state balancer.SubC
})
}

// This method has been deprecated but may still be in use. See https://github.com/grpc/grpc-go/pull/6481
// For updates to the logic, apply them to `updateSubConnState` instead which has been integrated with the new
// StateListener interface
func (d *dispatcher) UpdateSubConnState(sc balancer.SubConn, state balancer.SubConnState) {
d.updateSubConnState(sc, state)
}
Expand Down Expand Up @@ -301,10 +304,10 @@ func (d *dispatcher) sweep() {
d.mu.Unlock()

for _, sc := range toSweep {
// RemoveSubConn begins SubConn shutdown. We expect to see a
// SubConn.Shutdown begins a shutdown. We expect to see a
// HandleSubConnStateChange with connectivity.Shutdown, at which
// point we'll de-index it.
d.cc.RemoveSubConn(sc)
sc.Shutdown()
}
}

Expand Down
12 changes: 6 additions & 6 deletions broker/protocol/dispatcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {

// Case: Specific remote peer is dispatched to.
ctx = WithDispatchRoute(context.Background(),
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})

result, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrNoSubConnAvailable)
Expand Down Expand Up @@ -130,7 +130,7 @@ func (s *DispatcherSuite) TestDispatchCases(c *gc.C) {
mockSubConn{Name: "remote.addr", disp: disp}.UpdateState(balancer.SubConnState{ConnectivityState: connectivity.TransientFailure})

ctx = WithDispatchRoute(context.Background(),
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})
buildRouteFixture(), ProcessSpec_ID{Zone: "remote", Suffix: "primary"})

_, err = disp.Pick(balancer.PickInfo{Ctx: ctx})
c.Check(err, gc.Equals, balancer.ErrTransientFailure)
Expand Down Expand Up @@ -253,7 +253,7 @@ func (s mockSubConn) UpdateAddresses([]resolver.Address) { panic("deprecated") }
func (s mockSubConn) UpdateState(state balancer.SubConnState) { s.disp.updateSubConnState(s, state) }
func (s mockSubConn) Connect() {}
func (s mockSubConn) GetOrBuildProducer(balancer.ProducerBuilder) (balancer.Producer, func()) {
return nil, func() {}
return nil, func() {}
}
func (s mockSubConn) Shutdown() {
var c = s.disp.cc.(*mockClientConn)
Expand All @@ -266,12 +266,12 @@ func (c *mockClientConn) NewSubConn(a []resolver.Address, _ balancer.NewSubConnO
return sc, c.err
}

func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) {}
func (c *mockClientConn) UpdateAddresses(balancer.SubConn, []resolver.Address) { panic("deprecated") }
func (c *mockClientConn) UpdateState(balancer.State) {}
func (c *mockClientConn) ResolveNow(resolver.ResolveNowOptions) {}
func (c *mockClientConn) Target() string { return "default.addr" }
func (c *mockClientConn) RemoveSubConn(balancer.SubConn) {
panic("deprecated")
func (c *mockClientConn) RemoveSubConn(sc balancer.SubConn) {
sc.Shutdown()
}

type mockRouter struct{ invalidated string }
Expand Down

0 comments on commit 701bd63

Please sign in to comment.