diff --git a/gateway/connections/manager.go b/gateway/connections/manager.go index fff5ef6..b0c9e7b 100644 --- a/gateway/connections/manager.go +++ b/gateway/connections/manager.go @@ -46,9 +46,9 @@ import ( type ConnectionManager interface { // Cache returns the *cache.Cache that contains gNMI Notifications. Cache() *cache.Cache - // HasTargetLock returns true if this instance of the ConnectionManager - // holds the lock for the named target. - HasTargetLock(target string) bool + // Forwardable returns true if this instance of the ConnectionManager + // holds the lock for a non-cluster member connection for the named target. + Forwardable(target string) bool // Start will start the loop to listen for TargetConnectionControl messages // on TargetControlChan. Start() error diff --git a/gateway/connections/state.go b/gateway/connections/state.go index 5709c68..d2d8b14 100644 --- a/gateway/connections/state.go +++ b/gateway/connections/state.go @@ -24,7 +24,7 @@ // See the License for the specific language governing permissions and // limitations under the License. -// Portions of this file including TargetState and its receivers (excluding modifications) are from +// Portions of this file including ConnectionState and its receivers (excluding modifications) are from // https://github.com/openconfig/gnmi/blob/89b2bf29312cda887da916d0f3a32c1624b7935f/cmd/gnmi_collector/gnmi_collector.go package connections @@ -49,33 +49,36 @@ import ( "time" ) -// TargetState makes the calls to connect a target, tracks any associated connection state, and is the container for +// ConnectionState makes the calls to connect a target, tracks any associated connection state, and is the container for // the target's cache data. It is created once for every device and used as a closure parameter by ProtoHandler. -type TargetState struct { +type ConnectionState struct { ConnectionLockAcquired bool - config *configuration.GatewayConfig client *client.ReconnectClient - connManager ConnectionManager + clusterMember bool + config *configuration.GatewayConfig + // connected status is set to true when the first gnmi notification is received. + // it gets reset to false when disconnect call back of ReconnectClient is called. + connected bool + // connecting status is used to signal that some of the connection process has been started and + // full reconnection is necessary if the target configuration changes + connecting bool + connManager ConnectionManager // lock is the distributed lock that must be acquired before a connection is made if .connectWithLock() is called lock locking.DistributedLocker // The unique name of the target that is being connected to name string queryTarget string - targetCache *cache.Target - target *targetpb.Target - useLock bool request *gnmipb.SubscribeRequest - // connected status is set to true when the first gnmi notification is received. - // it gets reset to false when disconnect call back of ReconnectClient is called. - connected bool - // connecting status is used to signal that some of the connection process has been started and - // full reconnection is necessary if the target configuration changes - connecting bool + // seen is the list of targets that have been seen on this connection + seen map[string]bool // stopped status signals that .disconnect() has been called we no longer want to connect to this target so we // should stop trying to connect and release any locks that are being held stopped bool // synced status signals that a sync message was received from the target. - synced bool + synced bool + target *targetpb.Target + targetCache *cache.Target + useLock bool // metrics metricTags map[string]string @@ -87,7 +90,7 @@ type TargetState struct { timerLatency *histogram.PercentileTimer } -func (t *TargetState) InitializeMetrics() { +func (t *ConnectionState) InitializeMetrics() { t.metricTags = map[string]string{"gnmigateway.client.target": t.name} t.counterNotifications = stats.Registry.Counter("gnmigateway.client.subscribe.notifications", t.metricTags) t.counterRejected = stats.Registry.Counter("gnmigateway.client.subscribe.rejected", t.metricTags) @@ -99,8 +102,8 @@ func (t *TargetState) InitializeMetrics() { } // Equal returns true if the target config is different than the target config for -// this TargetState instance. -func (t *TargetState) Equal(other *targetpb.Target) bool { +// this ConnectionState instance. +func (t *ConnectionState) Equal(other *targetpb.Target) bool { if len(t.target.Addresses) != len(other.Addresses) { return false } @@ -125,7 +128,12 @@ func (t *TargetState) Equal(other *targetpb.Target) bool { return true } -func (t *TargetState) doConnect() { +// Seen returns true if the named target has been seen on this connection. +func (t *ConnectionState) Seen(target string) bool { + return t.seen[target] +} + +func (t *ConnectionState) doConnect() { t.connecting = true t.config.Log.Info().Msgf("Target %s: Connecting", t.name) query, err := client.NewQuery(t.request) @@ -184,9 +192,9 @@ func (t *TargetState) doConnect() { } } -// Attempt to acquire a connection slot and connect to the target. If TargetState.disconnect() is called +// Attempt to acquire a connection slot and connect to the target. If ConnectionState.disconnect() is called // all attempts and connections are aborted. -func (t *TargetState) connect(connectionSlot *semaphore.Weighted) { +func (t *ConnectionState) connect(connectionSlot *semaphore.Weighted) { var connectionSlotAcquired = false for !t.stopped { if !connectionSlotAcquired { @@ -202,9 +210,9 @@ func (t *TargetState) connect(connectionSlot *semaphore.Weighted) { } // Attempt to acquire a connection slot. After a connection slot is acquired attempt to grab the lock for the target. -// After the lock for the target is acquired connect to the target. If TargetState.disconnect() is called +// After the lock for the target is acquired connect to the target. If ConnectionState.disconnect() is called // all attempts and connections are aborted. -func (t *TargetState) connectWithLock(connectionSlot *semaphore.Weighted) { +func (t *ConnectionState) connectWithLock(connectionSlot *semaphore.Weighted) { var connectionSlotAcquired = false for !t.stopped { if !connectionSlotAcquired { @@ -232,23 +240,24 @@ func (t *TargetState) connectWithLock(connectionSlot *semaphore.Weighted) { } // Disconnect from the target or stop trying to connect. -func (t *TargetState) disconnect() error { +func (t *ConnectionState) disconnect() error { t.config.Log.Info().Msgf("Target %s: Disconnecting", t.name) t.stopped = true return t.client.Close() // this will disconnect and reset the cache via the disconnect callback } // Callback for gNMI client to signal that it has disconnected. -func (t *TargetState) disconnected() { +func (t *ConnectionState) disconnected() { t.connected = false t.synced = false + t.seen = map[string]bool{} if t.queryTarget != "*" { t.targetCache.Reset() } t.config.Log.Info().Msgf("Target %s: Disconnected", t.name) } -func (t *TargetState) reconnect() error { +func (t *ConnectionState) reconnect() error { t.config.Log.Info().Msgf("Target %s: Reconnecting", t.name) return t.client.Close() } @@ -257,7 +266,7 @@ func (t *TargetState) reconnect() error { // gNMI SubscribeResponse messages. When the message is an Update, the GnmiUpdate method of the // cache.Target is called to generate an update. If the message is a sync_response, then targetCache is // marked as synchronised. -func (t *TargetState) handleUpdate(msg proto.Message) error { +func (t *ConnectionState) handleUpdate(msg proto.Message) error { //fmt.Printf("%+v\n", msg) t.counterNotifications.Increment() if !t.connected { @@ -288,6 +297,7 @@ func (t *TargetState) handleUpdate(msg proto.Message) error { if targetCache == nil { targetCache = t.connManager.Cache().Add(v.Update.Prefix.Target) } + t.seen[v.Update.Prefix.Target] = true err := t.updateTargetCache(targetCache, v.Update) if err != nil { return err @@ -298,7 +308,7 @@ func (t *TargetState) handleUpdate(msg proto.Message) error { if v.Update.GetPrefix() == nil { v.Update.Prefix = &gnmipb.Path{} } - if v.Update.Prefix.Target == "" && t.queryTarget != "*" { + if v.Update.Prefix.Target == "" { v.Update.Prefix.Target = t.queryTarget } err := t.updateTargetCache(t.targetCache, v.Update) @@ -323,8 +333,8 @@ func (t *TargetState) handleUpdate(msg proto.Message) error { return nil } -// sync sets the state of the TargetState to synced. -func (t *TargetState) sync() { +// sync sets the state of the ConnectionState to synced. +func (t *ConnectionState) sync() { t.config.Log.Info().Msgf("Target %s: Synced", t.name) t.synced = true t.counterSync.Increment() @@ -339,7 +349,7 @@ func (t *TargetState) sync() { }() } -func (t *TargetState) updateTargetCache(cache *cache.Target, update *gnmipb.Notification) error { +func (t *ConnectionState) updateTargetCache(cache *cache.Target, update *gnmipb.Notification) error { err := cache.GnmiUpdate(update) if err != nil { // Some errors won't corrupt the cache so no need to return an error to the ProtoHandler caller. For these @@ -358,7 +368,7 @@ func (t *TargetState) updateTargetCache(cache *cache.Target, update *gnmipb.Noti // rejectUpdate returns true if the gNMI notification is unwanted based on the RejectUpdates // configuration in GatewayConfig. -func (t *TargetState) rejectUpdate(notification *gnmipb.Notification) bool { +func (t *ConnectionState) rejectUpdate(notification *gnmipb.Notification) bool { for _, update := range notification.GetUpdate() { path := update.GetPath().GetElem() for _, rejectionPath := range t.config.UpdateRejections { diff --git a/gateway/connections/zookeeper.go b/gateway/connections/zookeeper.go index 62efeff..24b4f86 100644 --- a/gateway/connections/zookeeper.go +++ b/gateway/connections/zookeeper.go @@ -34,8 +34,8 @@ type ZookeeperConnectionManager struct { cache *cache.Cache config *configuration.GatewayConfig connLimit *semaphore.Weighted - targets map[string]*TargetState - targetsMutex sync.Mutex + connections map[string]*ConnectionState + connectionsMutex sync.Mutex targetsConfigChan chan *TargetConnectionControl zkConn *zk.Conn } @@ -46,7 +46,7 @@ func NewZookeeperConnectionManagerDefault(config *configuration.GatewayConfig, z mgr := ZookeeperConnectionManager{ config: config, connLimit: semaphore.NewWeighted(int64(config.TargetLimit)), - targets: make(map[string]*TargetState), + connections: make(map[string]*ConnectionState), targetsConfigChan: make(chan *TargetConnectionControl, 10), zkConn: zkConn, } @@ -62,13 +62,13 @@ func (c *ZookeeperConnectionManager) eventListener(zkEvents <-chan zk.Event) { switch event.State { case zk.StateDisconnected: c.config.Log.Info().Msgf("Zookeeper disconnected. Resetting locked target connections.") - c.targetsMutex.Lock() - for _, targetConfig := range c.targets { + c.connectionsMutex.Lock() + for _, targetConfig := range c.connections { if targetConfig.useLock { _ = targetConfig.reconnect() } } - c.targetsMutex.Unlock() + c.connectionsMutex.Unlock() } } } @@ -78,14 +78,20 @@ func (c *ZookeeperConnectionManager) Cache() *cache.Cache { return c.cache } -func (c *ZookeeperConnectionManager) HasTargetLock(target string) bool { - c.targetsMutex.Lock() - targetState, exists := c.targets[target] - c.targetsMutex.Unlock() - if !exists || !targetState.ConnectionLockAcquired { - return false +// Forwardable returns true if Notifications from the named target can be +// forwarded to Exporters. +func (c *ZookeeperConnectionManager) Forwardable(target string) bool { + if target == "*" || target == "" { + return true } - return true + c.connectionsMutex.Lock() + defer c.connectionsMutex.Unlock() + for _, conn := range c.connections { + if !conn.clusterMember && conn.ConnectionLockAcquired && conn.Seen(target) { + return true + } + } + return false } func (c *ZookeeperConnectionManager) TargetControlChan() chan<- *TargetConnectionControl { @@ -106,9 +112,9 @@ func (c *ZookeeperConnectionManager) ReloadTargets() { } } - c.targetsMutex.Lock() - newTargets := make(map[string]*TargetState) - for name, currentConfig := range c.targets { + c.connectionsMutex.Lock() + newConnections := make(map[string]*ConnectionState) + for name, currentConfig := range c.connections { // Disconnect from everything we want to remove if stringInSlice(name, targetControlMsg.Remove) { err := currentConfig.disconnect() @@ -122,7 +128,7 @@ func (c *ZookeeperConnectionManager) ReloadTargets() { newConfig, exists := targetControlMsg.Insert.Target[name] if !exists { // no config for this target - newTargets[name] = currentConfig + newConnections[name] = currentConfig } else { if !currentConfig.Equal(newConfig) { // target is different; update the current config with the old one and reconnect @@ -135,7 +141,7 @@ func (c *ZookeeperConnectionManager) ReloadTargets() { c.config.Log.Error().Err(err).Msgf("Error reconnecting connection for %s", name) } } - newTargets[name] = currentConfig + newConnections[name] = currentConfig } } } @@ -143,37 +149,40 @@ func (c *ZookeeperConnectionManager) ReloadTargets() { if targetControlMsg.Insert != nil { // make new connections for name, newConfig := range targetControlMsg.Insert.Target { - if _, exists := newTargets[name]; exists { + if _, exists := newConnections[name]; exists { continue } else { // no previous targetCache existed c.config.Log.Info().Msgf("Initializing target %s (%v) %v.", name, newConfig.Addresses, newConfig.Meta) _, noLock := newConfig.Meta["NoLock"] - newTargets[name] = &TargetState{ - config: c.config, - connManager: c, - name: name, - targetCache: c.cache.Add(name), - target: newConfig, - request: targetControlMsg.Insert.Request[newConfig.Request], - useLock: c.zkConn != nil && !noLock, + _, clusterMember := newConfig.Meta["ClusterMember"] + newConnections[name] = &ConnectionState{ + clusterMember: clusterMember, + config: c.config, + connManager: c, + name: name, + targetCache: c.cache.Add(name), + target: newConfig, + request: targetControlMsg.Insert.Request[newConfig.Request], + seen: make(map[string]bool), + useLock: c.zkConn != nil && !noLock, } - newTargets[name].InitializeMetrics() - if newTargets[name].useLock { + newConnections[name].InitializeMetrics() + if newConnections[name].useLock { lockPath := MakeTargetLockPath(c.config.ZookeeperPrefix, name) clusterMemberAddress := c.config.ServerAddress + ":" + strconv.Itoa(c.config.ServerPort) - newTargets[name].lock = locking.NewZookeeperNonBlockingLock(c.zkConn, lockPath, clusterMemberAddress, zk.WorldACL(zk.PermAll)) - go newTargets[name].connectWithLock(c.connLimit) + newConnections[name].lock = locking.NewZookeeperNonBlockingLock(c.zkConn, lockPath, clusterMemberAddress, zk.WorldACL(zk.PermAll)) + go newConnections[name].connectWithLock(c.connLimit) } else { - go newTargets[name].connect(c.connLimit) + go newConnections[name].connect(c.connLimit) } } } } // save the target changes - c.targets = newTargets - c.targetsMutex.Unlock() + c.connections = newConnections + c.connectionsMutex.Unlock() } } } diff --git a/gateway/gateway.go b/gateway/gateway.go index 0690f01..956f65f 100644 --- a/gateway/gateway.go +++ b/gateway/gateway.go @@ -289,7 +289,7 @@ func (g *Gateway) sendUpdateToClients(leaf *ctree.Leaf) { if client.External { notification := leaf.Value().(*gnmi.Notification) target := notification.GetPrefix().GetTarget() - if g.connMgr.HasTargetLock(target) { + if g.connMgr.Forwardable(target) { client.Send(leaf) } } else { diff --git a/gateway/loaders/cluster/cluster.go b/gateway/loaders/cluster/cluster.go index 94d6148..b61eb69 100644 --- a/gateway/loaders/cluster/cluster.go +++ b/gateway/loaders/cluster/cluster.go @@ -110,7 +110,8 @@ func (c ClusterTargetLoader) WatchConfiguration(configChan chan<- *connections.T Credentials: nil, Request: "all", Meta: map[string]string{ - "NoLock": "yes", + "NoLock": "yes", + "ClusterMember": "yes", }, }, }, diff --git a/gateway/server/server.go b/gateway/server/server.go index aa02587..7f3ef29 100644 --- a/gateway/server/server.go +++ b/gateway/server/server.go @@ -500,7 +500,7 @@ func (s *Server) sendStreamingResults(c *streamClient, connMgr connections.Conne return } target := notification.GetPrefix().GetTarget() - if !connMgr.HasTargetLock(target) { + if !connMgr.Forwardable(target) { // Only forward messages to cluster members if we have a local lock for the target return } diff --git a/gateway/server/server_test.go b/gateway/server/server_test.go index b12baef..3453d24 100644 --- a/gateway/server/server_test.go +++ b/gateway/server/server_test.go @@ -1320,7 +1320,7 @@ func (m MockConnectionManager) Cache() *cache.Cache { panic("implement me") } -func (m MockConnectionManager) HasTargetLock(target string) bool { +func (m MockConnectionManager) Forwardable(target string) bool { panic("implement me") }