diff --git a/pkg/network/kdht.go b/pkg/network/kdht.go index 26e592fb..9b6cd1a1 100644 --- a/pkg/network/kdht.go +++ b/pkg/network/kdht.go @@ -65,9 +65,7 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul } peerChan <- pe if removePeerCallback != nil { - if p != "" { - removePeerCallback(p) - } + removePeerCallback(p) } } diff --git a/pkg/oracle_node.go b/pkg/oracle_node.go index 0346e504..ee93b70f 100644 --- a/pkg/oracle_node.go +++ b/pkg/oracle_node.go @@ -178,7 +178,7 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), Context: ctx, PeerChan: make(chan myNetwork.PeerEvent), - NodeTracker: pubsub2.NewNodeEventTracker(config.Version, cfg.Environment), + NodeTracker: pubsub2.NewNodeEventTracker(config.Version, cfg.Environment, hst.ID().String()), PubSubManager: subscriptionManager, IsStaked: isStaked, IsValidator: cfg.Validator, @@ -217,9 +217,7 @@ func (node *OracleNode) Start() (err error) { go node.handleDiscoveredPeers() removePeerCallback := func(p peer.ID) { - if p != "" { - node.NodeTracker.RemoveNodeData(p.String()) - } + node.NodeTracker.RemoveNodeData(p.String()) } node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked, removePeerCallback) diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index 14bee14b..f5402bda 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -32,7 +32,7 @@ type ConnectBufferEntry struct { // It initializes the node data map, node data channel, node data file path, // connect buffer map. It loads existing node data from file, starts a goroutine // to clear expired buffer entries, and returns the initialized instance. -func NewNodeEventTracker(version, environment string) *NodeEventTracker { +func NewNodeEventTracker(version, environment string, hostId string) *NodeEventTracker { net := &NodeEventTracker{ nodeData: NewSafeMap(), NodeDataChan: make(chan *NodeData), @@ -40,7 +40,7 @@ func NewNodeEventTracker(version, environment string) *NodeEventTracker { ConnectBuffer: make(map[string]ConnectBufferEntry), } go net.ClearExpiredBufferEntries() - go net.StartCleanupRoutine(context.Background()) + go net.StartCleanupRoutine(context.Background(), hostId) return net } @@ -336,8 +336,6 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip nd.LastUpdatedUnix = nodeData.LastUpdatedUnix net.nodeData.Set(nodeData.PeerId.String(), nd) - } else { - return nil } // If the node data exists, check if the multiaddress is already in the list @@ -420,19 +418,19 @@ func (net *NodeEventTracker) ClearExpiredWorkerTimeouts() { } const ( - maxDisconnectionTime = 3 * time.Minute - cleanupInterval = 5 * time.Minute + maxDisconnectionTime = 1 * time.Minute + cleanupInterval = 2 * time.Minute ) // StartCleanupRoutine starts a goroutine that periodically checks for and removes stale peers -func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context) { +func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context, hostId string) { ticker := time.NewTicker(cleanupInterval) defer ticker.Stop() for { select { case <-ticker.C: - net.cleanupStalePeers() + net.cleanupStalePeers(hostId) case <-ctx.Done(): return } @@ -440,13 +438,13 @@ func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context) { } // cleanupStalePeers checks for and removes stale peers from both the routing table and node data -func (net *NodeEventTracker) cleanupStalePeers() { +func (net *NodeEventTracker) cleanupStalePeers(hostId string) { now := time.Now() for _, nodeData := range net.GetAllNodeData() { if now.Sub(time.Unix(nodeData.LastUpdatedUnix, 0)) > maxDisconnectionTime { - logrus.Infof("Removing stale peer: %s", nodeData.PeerId) - if nodeData.PeerId.String() != "" { + if nodeData.PeerId.String() != hostId { + logrus.Infof("Removing stale peer: %s", nodeData.PeerId) net.RemoveNodeData(nodeData.PeerId.String()) delete(net.ConnectBuffer, nodeData.PeerId.String()) @@ -457,6 +455,10 @@ func (net *NodeEventTracker) cleanupStalePeers() { LastUpdatedUnix: now.Unix(), } } + + // Use the node parameter to access OracleNode methods if needed + // For example: + // node.SomeMethod(nodeData.PeerId) } } }