Skip to content

Commit

Permalink
Changes to forwardable messages.
Browse files Browse the repository at this point in the history
  • Loading branch information
colinmcintosh committed Aug 20, 2020
1 parent 62449e0 commit 59ccefb
Show file tree
Hide file tree
Showing 7 changed files with 93 additions and 73 deletions.
6 changes: 3 additions & 3 deletions gateway/connections/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,9 +46,9 @@ import (
type ConnectionManager interface {
// Cache returns the *cache.Cache that contains gNMI Notifications.
Cache() *cache.Cache
// HasTargetLock returns true if this instance of the ConnectionManager
// holds the lock for the named target.
HasTargetLock(target string) bool
// Forwardable returns true if this instance of the ConnectionManager
// holds the lock for a non-cluster member connection for the named target.
Forwardable(target string) bool
// Start will start the loop to listen for TargetConnectionControl messages
// on TargetControlChan.
Start() error
Expand Down
74 changes: 42 additions & 32 deletions gateway/connections/state.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@
// See the License for the specific language governing permissions and
// limitations under the License.

// Portions of this file including TargetState and its receivers (excluding modifications) are from
// Portions of this file including ConnectionState and its receivers (excluding modifications) are from
// https://github.com/openconfig/gnmi/blob/89b2bf29312cda887da916d0f3a32c1624b7935f/cmd/gnmi_collector/gnmi_collector.go

package connections
Expand All @@ -49,33 +49,36 @@ import (
"time"
)

// TargetState makes the calls to connect a target, tracks any associated connection state, and is the container for
// ConnectionState makes the calls to connect a target, tracks any associated connection state, and is the container for
// the target's cache data. It is created once for every device and used as a closure parameter by ProtoHandler.
type TargetState struct {
type ConnectionState struct {
ConnectionLockAcquired bool
config *configuration.GatewayConfig
client *client.ReconnectClient
connManager ConnectionManager
clusterMember bool
config *configuration.GatewayConfig
// connected status is set to true when the first gnmi notification is received.
// it gets reset to false when disconnect call back of ReconnectClient is called.
connected bool
// connecting status is used to signal that some of the connection process has been started and
// full reconnection is necessary if the target configuration changes
connecting bool
connManager ConnectionManager
// lock is the distributed lock that must be acquired before a connection is made if .connectWithLock() is called
lock locking.DistributedLocker
// The unique name of the target that is being connected to
name string
queryTarget string
targetCache *cache.Target
target *targetpb.Target
useLock bool
request *gnmipb.SubscribeRequest
// connected status is set to true when the first gnmi notification is received.
// it gets reset to false when disconnect call back of ReconnectClient is called.
connected bool
// connecting status is used to signal that some of the connection process has been started and
// full reconnection is necessary if the target configuration changes
connecting bool
// seen is the list of targets that have been seen on this connection
seen map[string]bool
// stopped status signals that .disconnect() has been called we no longer want to connect to this target so we
// should stop trying to connect and release any locks that are being held
stopped bool
// synced status signals that a sync message was received from the target.
synced bool
synced bool
target *targetpb.Target
targetCache *cache.Target
useLock bool

// metrics
metricTags map[string]string
Expand All @@ -87,7 +90,7 @@ type TargetState struct {
timerLatency *histogram.PercentileTimer
}

func (t *TargetState) InitializeMetrics() {
func (t *ConnectionState) InitializeMetrics() {
t.metricTags = map[string]string{"gnmigateway.client.target": t.name}
t.counterNotifications = stats.Registry.Counter("gnmigateway.client.subscribe.notifications", t.metricTags)
t.counterRejected = stats.Registry.Counter("gnmigateway.client.subscribe.rejected", t.metricTags)
Expand All @@ -99,8 +102,8 @@ func (t *TargetState) InitializeMetrics() {
}

// Equal returns true if the target config is different than the target config for
// this TargetState instance.
func (t *TargetState) Equal(other *targetpb.Target) bool {
// this ConnectionState instance.
func (t *ConnectionState) Equal(other *targetpb.Target) bool {
if len(t.target.Addresses) != len(other.Addresses) {
return false
}
Expand All @@ -125,7 +128,12 @@ func (t *TargetState) Equal(other *targetpb.Target) bool {
return true
}

func (t *TargetState) doConnect() {
// Seen returns true if the named target has been seen on this connection.
func (t *ConnectionState) Seen(target string) bool {
return t.seen[target]
}

func (t *ConnectionState) doConnect() {
t.connecting = true
t.config.Log.Info().Msgf("Target %s: Connecting", t.name)
query, err := client.NewQuery(t.request)
Expand Down Expand Up @@ -184,9 +192,9 @@ func (t *TargetState) doConnect() {
}
}

// Attempt to acquire a connection slot and connect to the target. If TargetState.disconnect() is called
// Attempt to acquire a connection slot and connect to the target. If ConnectionState.disconnect() is called
// all attempts and connections are aborted.
func (t *TargetState) connect(connectionSlot *semaphore.Weighted) {
func (t *ConnectionState) connect(connectionSlot *semaphore.Weighted) {
var connectionSlotAcquired = false
for !t.stopped {
if !connectionSlotAcquired {
Expand All @@ -202,9 +210,9 @@ func (t *TargetState) connect(connectionSlot *semaphore.Weighted) {
}

// Attempt to acquire a connection slot. After a connection slot is acquired attempt to grab the lock for the target.
// After the lock for the target is acquired connect to the target. If TargetState.disconnect() is called
// After the lock for the target is acquired connect to the target. If ConnectionState.disconnect() is called
// all attempts and connections are aborted.
func (t *TargetState) connectWithLock(connectionSlot *semaphore.Weighted) {
func (t *ConnectionState) connectWithLock(connectionSlot *semaphore.Weighted) {
var connectionSlotAcquired = false
for !t.stopped {
if !connectionSlotAcquired {
Expand Down Expand Up @@ -232,23 +240,24 @@ func (t *TargetState) connectWithLock(connectionSlot *semaphore.Weighted) {
}

// Disconnect from the target or stop trying to connect.
func (t *TargetState) disconnect() error {
func (t *ConnectionState) disconnect() error {
t.config.Log.Info().Msgf("Target %s: Disconnecting", t.name)
t.stopped = true
return t.client.Close() // this will disconnect and reset the cache via the disconnect callback
}

// Callback for gNMI client to signal that it has disconnected.
func (t *TargetState) disconnected() {
func (t *ConnectionState) disconnected() {
t.connected = false
t.synced = false
t.seen = map[string]bool{}
if t.queryTarget != "*" {
t.targetCache.Reset()
}
t.config.Log.Info().Msgf("Target %s: Disconnected", t.name)
}

func (t *TargetState) reconnect() error {
func (t *ConnectionState) reconnect() error {
t.config.Log.Info().Msgf("Target %s: Reconnecting", t.name)
return t.client.Close()
}
Expand All @@ -257,7 +266,7 @@ func (t *TargetState) reconnect() error {
// gNMI SubscribeResponse messages. When the message is an Update, the GnmiUpdate method of the
// cache.Target is called to generate an update. If the message is a sync_response, then targetCache is
// marked as synchronised.
func (t *TargetState) handleUpdate(msg proto.Message) error {
func (t *ConnectionState) handleUpdate(msg proto.Message) error {
//fmt.Printf("%+v\n", msg)
t.counterNotifications.Increment()
if !t.connected {
Expand Down Expand Up @@ -288,6 +297,7 @@ func (t *TargetState) handleUpdate(msg proto.Message) error {
if targetCache == nil {
targetCache = t.connManager.Cache().Add(v.Update.Prefix.Target)
}
t.seen[v.Update.Prefix.Target] = true
err := t.updateTargetCache(targetCache, v.Update)
if err != nil {
return err
Expand All @@ -298,7 +308,7 @@ func (t *TargetState) handleUpdate(msg proto.Message) error {
if v.Update.GetPrefix() == nil {
v.Update.Prefix = &gnmipb.Path{}
}
if v.Update.Prefix.Target == "" && t.queryTarget != "*" {
if v.Update.Prefix.Target == "" {
v.Update.Prefix.Target = t.queryTarget
}
err := t.updateTargetCache(t.targetCache, v.Update)
Expand All @@ -323,8 +333,8 @@ func (t *TargetState) handleUpdate(msg proto.Message) error {
return nil
}

// sync sets the state of the TargetState to synced.
func (t *TargetState) sync() {
// sync sets the state of the ConnectionState to synced.
func (t *ConnectionState) sync() {
t.config.Log.Info().Msgf("Target %s: Synced", t.name)
t.synced = true
t.counterSync.Increment()
Expand All @@ -339,7 +349,7 @@ func (t *TargetState) sync() {
}()
}

func (t *TargetState) updateTargetCache(cache *cache.Target, update *gnmipb.Notification) error {
func (t *ConnectionState) updateTargetCache(cache *cache.Target, update *gnmipb.Notification) error {
err := cache.GnmiUpdate(update)
if err != nil {
// Some errors won't corrupt the cache so no need to return an error to the ProtoHandler caller. For these
Expand All @@ -358,7 +368,7 @@ func (t *TargetState) updateTargetCache(cache *cache.Target, update *gnmipb.Noti

// rejectUpdate returns true if the gNMI notification is unwanted based on the RejectUpdates
// configuration in GatewayConfig.
func (t *TargetState) rejectUpdate(notification *gnmipb.Notification) bool {
func (t *ConnectionState) rejectUpdate(notification *gnmipb.Notification) bool {
for _, update := range notification.GetUpdate() {
path := update.GetPath().GetElem()
for _, rejectionPath := range t.config.UpdateRejections {
Expand Down
77 changes: 43 additions & 34 deletions gateway/connections/zookeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,8 +34,8 @@ type ZookeeperConnectionManager struct {
cache *cache.Cache
config *configuration.GatewayConfig
connLimit *semaphore.Weighted
targets map[string]*TargetState
targetsMutex sync.Mutex
connections map[string]*ConnectionState
connectionsMutex sync.Mutex
targetsConfigChan chan *TargetConnectionControl
zkConn *zk.Conn
}
Expand All @@ -46,7 +46,7 @@ func NewZookeeperConnectionManagerDefault(config *configuration.GatewayConfig, z
mgr := ZookeeperConnectionManager{
config: config,
connLimit: semaphore.NewWeighted(int64(config.TargetLimit)),
targets: make(map[string]*TargetState),
connections: make(map[string]*ConnectionState),
targetsConfigChan: make(chan *TargetConnectionControl, 10),
zkConn: zkConn,
}
Expand All @@ -62,13 +62,13 @@ func (c *ZookeeperConnectionManager) eventListener(zkEvents <-chan zk.Event) {
switch event.State {
case zk.StateDisconnected:
c.config.Log.Info().Msgf("Zookeeper disconnected. Resetting locked target connections.")
c.targetsMutex.Lock()
for _, targetConfig := range c.targets {
c.connectionsMutex.Lock()
for _, targetConfig := range c.connections {
if targetConfig.useLock {
_ = targetConfig.reconnect()
}
}
c.targetsMutex.Unlock()
c.connectionsMutex.Unlock()
}
}
}
Expand All @@ -78,14 +78,20 @@ func (c *ZookeeperConnectionManager) Cache() *cache.Cache {
return c.cache
}

func (c *ZookeeperConnectionManager) HasTargetLock(target string) bool {
c.targetsMutex.Lock()
targetState, exists := c.targets[target]
c.targetsMutex.Unlock()
if !exists || !targetState.ConnectionLockAcquired {
return false
// Forwardable returns true if Notifications from the named target can be
// forwarded to Exporters.
func (c *ZookeeperConnectionManager) Forwardable(target string) bool {
if target == "*" || target == "" {
return true
}
return true
c.connectionsMutex.Lock()
defer c.connectionsMutex.Unlock()
for _, conn := range c.connections {
if !conn.clusterMember && conn.ConnectionLockAcquired && conn.Seen(target) {
return true
}
}
return false
}

func (c *ZookeeperConnectionManager) TargetControlChan() chan<- *TargetConnectionControl {
Expand All @@ -106,9 +112,9 @@ func (c *ZookeeperConnectionManager) ReloadTargets() {
}
}

c.targetsMutex.Lock()
newTargets := make(map[string]*TargetState)
for name, currentConfig := range c.targets {
c.connectionsMutex.Lock()
newConnections := make(map[string]*ConnectionState)
for name, currentConfig := range c.connections {
// Disconnect from everything we want to remove
if stringInSlice(name, targetControlMsg.Remove) {
err := currentConfig.disconnect()
Expand All @@ -122,7 +128,7 @@ func (c *ZookeeperConnectionManager) ReloadTargets() {
newConfig, exists := targetControlMsg.Insert.Target[name]
if !exists {
// no config for this target
newTargets[name] = currentConfig
newConnections[name] = currentConfig
} else {
if !currentConfig.Equal(newConfig) {
// target is different; update the current config with the old one and reconnect
Expand All @@ -135,45 +141,48 @@ func (c *ZookeeperConnectionManager) ReloadTargets() {
c.config.Log.Error().Err(err).Msgf("Error reconnecting connection for %s", name)
}
}
newTargets[name] = currentConfig
newConnections[name] = currentConfig
}
}
}

if targetControlMsg.Insert != nil {
// make new connections
for name, newConfig := range targetControlMsg.Insert.Target {
if _, exists := newTargets[name]; exists {
if _, exists := newConnections[name]; exists {
continue
} else {
// no previous targetCache existed
c.config.Log.Info().Msgf("Initializing target %s (%v) %v.", name, newConfig.Addresses, newConfig.Meta)
_, noLock := newConfig.Meta["NoLock"]
newTargets[name] = &TargetState{
config: c.config,
connManager: c,
name: name,
targetCache: c.cache.Add(name),
target: newConfig,
request: targetControlMsg.Insert.Request[newConfig.Request],
useLock: c.zkConn != nil && !noLock,
_, clusterMember := newConfig.Meta["ClusterMember"]
newConnections[name] = &ConnectionState{
clusterMember: clusterMember,
config: c.config,
connManager: c,
name: name,
targetCache: c.cache.Add(name),
target: newConfig,
request: targetControlMsg.Insert.Request[newConfig.Request],
seen: make(map[string]bool),
useLock: c.zkConn != nil && !noLock,
}
newTargets[name].InitializeMetrics()
if newTargets[name].useLock {
newConnections[name].InitializeMetrics()
if newConnections[name].useLock {
lockPath := MakeTargetLockPath(c.config.ZookeeperPrefix, name)
clusterMemberAddress := c.config.ServerAddress + ":" + strconv.Itoa(c.config.ServerPort)
newTargets[name].lock = locking.NewZookeeperNonBlockingLock(c.zkConn, lockPath, clusterMemberAddress, zk.WorldACL(zk.PermAll))
go newTargets[name].connectWithLock(c.connLimit)
newConnections[name].lock = locking.NewZookeeperNonBlockingLock(c.zkConn, lockPath, clusterMemberAddress, zk.WorldACL(zk.PermAll))
go newConnections[name].connectWithLock(c.connLimit)
} else {
go newTargets[name].connect(c.connLimit)
go newConnections[name].connect(c.connLimit)
}
}
}
}

// save the target changes
c.targets = newTargets
c.targetsMutex.Unlock()
c.connections = newConnections
c.connectionsMutex.Unlock()
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion gateway/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ func (g *Gateway) sendUpdateToClients(leaf *ctree.Leaf) {
if client.External {
notification := leaf.Value().(*gnmi.Notification)
target := notification.GetPrefix().GetTarget()
if g.connMgr.HasTargetLock(target) {
if g.connMgr.Forwardable(target) {
client.Send(leaf)
}
} else {
Expand Down
3 changes: 2 additions & 1 deletion gateway/loaders/cluster/cluster.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,8 @@ func (c ClusterTargetLoader) WatchConfiguration(configChan chan<- *connections.T
Credentials: nil,
Request: "all",
Meta: map[string]string{
"NoLock": "yes",
"NoLock": "yes",
"ClusterMember": "yes",
},
},
},
Expand Down
2 changes: 1 addition & 1 deletion gateway/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -500,7 +500,7 @@ func (s *Server) sendStreamingResults(c *streamClient, connMgr connections.Conne
return
}
target := notification.GetPrefix().GetTarget()
if !connMgr.HasTargetLock(target) {
if !connMgr.Forwardable(target) {
// Only forward messages to cluster members if we have a local lock for the target
return
}
Expand Down
Loading

0 comments on commit 59ccefb

Please sign in to comment.