Skip to content

Commit

Permalink
dont remove self
Browse files Browse the repository at this point in the history
  • Loading branch information
jdutchak committed Aug 2, 2024
1 parent 3fd2f06 commit 701776e
Show file tree
Hide file tree
Showing 3 changed files with 16 additions and 18 deletions.
4 changes: 1 addition & 3 deletions pkg/network/kdht.go
Original file line number Diff line number Diff line change
Expand Up @@ -65,9 +65,7 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
}
peerChan <- pe
if removePeerCallback != nil {
if p != "" {
removePeerCallback(p)
}
removePeerCallback(p)
}
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -178,7 +178,7 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {
multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst),
Context: ctx,
PeerChan: make(chan myNetwork.PeerEvent),
NodeTracker: pubsub2.NewNodeEventTracker(config.Version, cfg.Environment),
NodeTracker: pubsub2.NewNodeEventTracker(config.Version, cfg.Environment, hst.ID().String()),
PubSubManager: subscriptionManager,
IsStaked: isStaked,
IsValidator: cfg.Validator,
Expand Down Expand Up @@ -217,9 +217,7 @@ func (node *OracleNode) Start() (err error) {
go node.handleDiscoveredPeers()

removePeerCallback := func(p peer.ID) {
if p != "" {
node.NodeTracker.RemoveNodeData(p.String())
}
node.NodeTracker.RemoveNodeData(p.String())
}

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked, removePeerCallback)
Expand Down
24 changes: 13 additions & 11 deletions pkg/pubsub/node_event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,15 +32,15 @@ type ConnectBufferEntry struct {
// It initializes the node data map, node data channel, node data file path,
// connect buffer map. It loads existing node data from file, starts a goroutine
// to clear expired buffer entries, and returns the initialized instance.
func NewNodeEventTracker(version, environment string) *NodeEventTracker {
func NewNodeEventTracker(version, environment string, hostId string) *NodeEventTracker {
net := &NodeEventTracker{
nodeData: NewSafeMap(),
NodeDataChan: make(chan *NodeData),
nodeDataFile: fmt.Sprintf("%s_%s_node_data.json", version, environment),
ConnectBuffer: make(map[string]ConnectBufferEntry),
}
go net.ClearExpiredBufferEntries()
go net.StartCleanupRoutine(context.Background())
go net.StartCleanupRoutine(context.Background(), hostId)
return net
}

Expand Down Expand Up @@ -336,8 +336,6 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip
nd.LastUpdatedUnix = nodeData.LastUpdatedUnix
net.nodeData.Set(nodeData.PeerId.String(), nd)

} else {
return nil
}

// If the node data exists, check if the multiaddress is already in the list
Expand Down Expand Up @@ -420,33 +418,33 @@ func (net *NodeEventTracker) ClearExpiredWorkerTimeouts() {
}

const (
maxDisconnectionTime = 3 * time.Minute
cleanupInterval = 5 * time.Minute
maxDisconnectionTime = 1 * time.Minute
cleanupInterval = 2 * time.Minute
)

// StartCleanupRoutine starts a goroutine that periodically checks for and removes stale peers
func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context) {
func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context, hostId string) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
net.cleanupStalePeers()
net.cleanupStalePeers(hostId)
case <-ctx.Done():
return
}
}
}

// cleanupStalePeers checks for and removes stale peers from both the routing table and node data
func (net *NodeEventTracker) cleanupStalePeers() {
func (net *NodeEventTracker) cleanupStalePeers(hostId string) {
now := time.Now()

for _, nodeData := range net.GetAllNodeData() {
if now.Sub(time.Unix(nodeData.LastUpdatedUnix, 0)) > maxDisconnectionTime {
logrus.Infof("Removing stale peer: %s", nodeData.PeerId)
if nodeData.PeerId.String() != "" {
if nodeData.PeerId.String() != hostId {
logrus.Infof("Removing stale peer: %s", nodeData.PeerId)
net.RemoveNodeData(nodeData.PeerId.String())
delete(net.ConnectBuffer, nodeData.PeerId.String())

Expand All @@ -457,6 +455,10 @@ func (net *NodeEventTracker) cleanupStalePeers() {
LastUpdatedUnix: now.Unix(),
}
}

// Use the node parameter to access OracleNode methods if needed
// For example:
// node.SomeMethod(nodeData.PeerId)
}
}
}

0 comments on commit 701776e

Please sign in to comment.