Skip to content

Commit

Permalink
Add mutex locks for thread safety in network operations
Browse files Browse the repository at this point in the history
Introduce mutex locks in the peer connection count and node data update functions to ensure thread safety. These changes prevent potential race conditions when multiple goroutines access shared resources. Note that some outdated comments have been updated to reflect the need for careful cleanup.
  • Loading branch information
restevens402 committed Dec 3, 2024
1 parent 290c72a commit 8476646
Show file tree
Hide file tree
Showing 2 changed files with 11 additions and 6 deletions.
6 changes: 5 additions & 1 deletion pkg/network/kdht.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,14 +30,16 @@ type dbValidator struct{}
func (dbValidator) Validate(_ string, _ []byte) error { return nil }
func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

var peerCountMutex sync.Mutex

func EnableDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, protocolId, prefix protocol.ID, peerChan chan PeerEvent, nodeData *pubsub.NodeData) (*dht.IpfsDHT, error) {
options := make([]dht.Option, 0)
options = append(options, dht.BucketSize(100)) // Adjust bucket size
options = append(options, dht.Concurrency(100)) // Increase concurrency
options = append(options, dht.RoutingTableRefreshPeriod(time.Minute*5)) // Set refresh interval
options = append(options, dht.Mode(dht.ModeAutoServer))
options = append(options, dht.ProtocolPrefix(prefix))
// WTF: Why?
// TODO: If we are no longer using this it should be cleaned up, but carefully. the Node Listener assumes this is there
options = append(options, dht.NamespacedValidator("db", dbValidator{}))

kademliaDHT, err := dht.New(ctx, host, options...)
Expand Down Expand Up @@ -115,7 +117,9 @@ func EnableDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.M
}
return
}
peerCountMutex.Lock()
peerConnectionCount++
peerCountMutex.Unlock()
defer func(stream network.Stream) {
err := stream.Close()
if err != nil {
Expand Down
11 changes: 6 additions & 5 deletions pkg/pubsub/node_event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"encoding/json"
"fmt"
"sort"
"sync"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
Expand All @@ -18,10 +19,9 @@ import (

type NodeEventTracker struct {
NodeDataChan chan *NodeData
// WTF: Do we really need this? Can't we store it in the libp2p PeerStore metadata?
nodeData *SafeMap
// WTF: Unused?
nodeDataFile string
// TODO: Do we really need this? Can't we store it in the libp2p PeerStore metadata?
nodeData *SafeMap
NodeDataMutex sync.Mutex
ConnectBuffer map[string]ConnectBufferEntry
nodeVersion string
}
Expand Down Expand Up @@ -99,7 +99,6 @@ func NewNodeEventTracker(version, environment, hostId string) *NodeEventTracker
nodeData: NewSafeMap(),
nodeVersion: version,
NodeDataChan: make(chan *NodeData),
nodeDataFile: fmt.Sprintf("%s_%s_node_data.json", version, environment),
ConnectBuffer: make(map[string]ConnectBufferEntry),
}
go net.ClearExpiredBufferEntries()
Expand Down Expand Up @@ -377,6 +376,8 @@ func (net *NodeEventTracker) IsStaked(peerID string) bool {
// It also sends the updated node data to the NodeDataChan if the data changed or forceGossip is true.
func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip bool) error {
logrus.Debugf("Handling node data for: %s", nodeData.PeerId)
net.NodeDataMutex.Lock()
defer net.NodeDataMutex.Unlock()
dataChanged := false

nd, ok := net.nodeData.Get(nodeData.PeerId.String())
Expand Down

0 comments on commit 8476646

Please sign in to comment.