diff --git a/.gitignore b/.gitignore index eaa12583..7b804f9f 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,4 @@ snippets.txt # Build result of goreleaser dist/ +bp-todo.md diff --git a/internal/versioning/version.go b/internal/versioning/version.go index 9c9ee19e..88dea086 100644 --- a/internal/versioning/version.go +++ b/internal/versioning/version.go @@ -5,5 +5,5 @@ var ( // XXX: Bump this value only when there are protocol changes that makes the oracle // incompatible between version! - ProtocolVersion = `v0.8.0` + ProtocolVersion = `v0.8.3` ) diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index 4bd69f6f..ff5ea41d 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -63,6 +63,13 @@ type NodeData struct { Records any `json:"records,omitempty"` Version string `json:"version"` WorkerTimeout time.Time `json:"workerTimeout,omitempty"` + ReturnedTweets int `json:"returnedTweets"` // a running count of the number of tweets returned + LastReturnedTweet time.Time `json:"lastReturnedTweet"` + TweetTimeout bool `json:"tweetTimeout"` + TweetTimeouts int `json:"tweetTimeouts"` // a running count of the number of times a tweet request times out + LastTweetTimeout time.Time `json:"lastTweetTimeout"` + LastNotFoundTime time.Time `json:"lastNotFoundTime"` + NotFoundCount int `json:"notFoundCount"` // a running count of the number of times a node is not found } // NewNodeData creates a new NodeData struct initialized with the given @@ -256,3 +263,21 @@ func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr) { n.Multiaddrs = append(n.Multiaddrs, JSONMultiaddr{Multiaddr: addr}) } } + +func (nd *NodeData) UpdateTwitterFields(fields NodeData) { + if fields.ReturnedTweets != 0 { + nd.ReturnedTweets += fields.ReturnedTweets + } + if !fields.LastReturnedTweet.IsZero() { + nd.LastReturnedTweet = fields.LastReturnedTweet + } + if fields.TweetTimeout { + nd.TweetTimeout = fields.TweetTimeout + nd.TweetTimeouts += fields.TweetTimeouts + 
nd.LastTweetTimeout = fields.LastTweetTimeout + } + if !fields.LastNotFoundTime.IsZero() { + nd.LastNotFoundTime = fields.LastNotFoundTime + nd.NotFoundCount += fields.NotFoundCount + } +} diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index 91231148..5a52d968 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -29,11 +29,72 @@ type ConnectBufferEntry struct { ConnectTime time.Time } +// NodeSorter implements sort.Interface over a NodeData slice, ordering it with the supplied less function +type NodeSorter struct { + nodes []NodeData + less func(i, j NodeData) bool +} + +// Len returns the length of the nodes slice +func (s NodeSorter) Len() int { return len(s.nodes) } + +// Swap swaps the nodes at indices i and j +func (s NodeSorter) Swap(i, j int) { s.nodes[i], s.nodes[j] = s.nodes[j], s.nodes[i] } + +// Less compares nodes at indices i and j using the provided less function +func (s NodeSorter) Less(i, j int) bool { return s.less(s.nodes[i], s.nodes[j]) } + +// SortNodesByTwitterReliability sorts the given nodes based on their Twitter reliability. +// It uses multiple criteria to determine the reliability and performance of nodes: +// 1. Prioritizes nodes that have been found more often (lower NotFoundCount) +// 2. Considers the last time a node was not found (earlier LastNotFoundTime is better) +// 3. Sorts by higher number of returned tweets +// 4. Then by more recent last returned tweet +// 5. Then by lower number of timeouts +// 6. Then by less recent last timeout +// 7. Finally, sorts by PeerId string so the order is deterministic when all other criteria tie +// +// The function modifies the input slice in-place, sorting the nodes from most to least reliable. 
+func SortNodesByTwitterReliability(nodes []NodeData) { + sorter := NodeSorter{ + nodes: nodes, + less: func(i, j NodeData) bool { + // 1. Lower NotFoundCount first + if i.NotFoundCount != j.NotFoundCount { + return i.NotFoundCount < j.NotFoundCount + } + // 2. Earlier LastNotFoundTime first (the zero time sorts before any recorded time) + if !i.LastNotFoundTime.Equal(j.LastNotFoundTime) { + return i.LastNotFoundTime.Before(j.LastNotFoundTime) + } + // 3. Higher number of returned tweets first + if i.ReturnedTweets != j.ReturnedTweets { + return i.ReturnedTweets > j.ReturnedTweets + } + // 4. More recent last returned tweet first + if !i.LastReturnedTweet.Equal(j.LastReturnedTweet) { + return i.LastReturnedTweet.After(j.LastReturnedTweet) + } + // 5. Lower number of timeouts first + if i.TweetTimeouts != j.TweetTimeouts { + return i.TweetTimeouts < j.TweetTimeouts + } + // 6. Less recent last timeout first + if !i.LastTweetTimeout.Equal(j.LastTweetTimeout) { + return i.LastTweetTimeout.Before(j.LastTweetTimeout) + } + // 7. Tie-break on the PeerId string so the order is deterministic when every other criterion ties + return i.PeerId.String() < j.PeerId.String() + }, + } + sort.Sort(sorter) +} + // NewNodeEventTracker creates a new NodeEventTracker instance. // It initializes the node data map, node data channel, node data file path, // connect buffer map. It loads existing node data from file, starts a goroutine // to clear expired buffer entries, and returns the initialized instance. 
-func NewNodeEventTracker(version, environment string, hostId string) *NodeEventTracker { +func NewNodeEventTracker(version, environment, hostId string) *NodeEventTracker { net := &NodeEventTracker{ nodeData: NewSafeMap(), nodeVersion: version, @@ -284,6 +345,17 @@ func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []N result = append(result, nodeData) } } + + // Sort the eligible nodes based on the worker category + switch category { + case CategoryTwitter: + SortNodesByTwitterReliability(result) + // Add cases for other categories as needed such as + // web + // discord + // telegram + } + return result } @@ -477,3 +549,20 @@ func (net *NodeEventTracker) cleanupStalePeers(hostId string) { } } } + +func (net *NodeEventTracker) UpdateNodeDataTwitter(peerID string, updates NodeData) error { + nodeData, exists := net.nodeData.Get(peerID) + if !exists { + return fmt.Errorf("node data not found for peer ID: %s", peerID) + } + + // Update fields based on non-zero values + nodeData.UpdateTwitterFields(updates) + + // Save the updated node data; the error is wrapped with %w so callers can still inspect the cause + err := net.AddOrUpdateNodeData(nodeData, true) + if err != nil { + return fmt.Errorf("error updating node data: %w", err) + } + return nil +} diff --git a/pkg/tests/node_data/node_data_suite_test.go b/pkg/tests/node_data/node_data_suite_test.go new file mode 100644 index 00000000..9ae8c1aa --- /dev/null +++ b/pkg/tests/node_data/node_data_suite_test.go @@ -0,0 +1,13 @@ +package node_data + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestNodeData(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "NodeData Test Suite") +} diff --git a/pkg/tests/node_data/node_data_test.go b/pkg/tests/node_data/node_data_test.go new file mode 100644 index 00000000..090a8a0c --- /dev/null +++ b/pkg/tests/node_data/node_data_test.go @@ -0,0 +1,41 @@ +package node_data + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + + "github.com/masa-finance/masa-oracle/pkg/pubsub" +) + +var _ = Describe("NodeData", func() { + Describe("UpdateTwitterFields", func() { + It("should correctly update Twitter fields", func() { + initialData := pubsub.NodeData{ + ReturnedTweets: 10, + TweetTimeouts: 2, + NotFoundCount: 1, + } + + updates := pubsub.NodeData{ + ReturnedTweets: 5, + LastReturnedTweet: time.Now(), + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + } + + initialData.UpdateTwitterFields(updates) + + Expect(initialData.ReturnedTweets).To(Equal(15)) + Expect(initialData.TweetTimeouts).To(Equal(3)) + Expect(initialData.NotFoundCount).To(Equal(2)) + Expect(initialData.LastReturnedTweet.IsZero()).To(BeFalse()) + Expect(initialData.LastTweetTimeout.IsZero()).To(BeFalse()) + Expect(initialData.LastNotFoundTime.IsZero()).To(BeFalse()) + }) + }) +}) diff --git a/pkg/tests/node_data/node_data_tracker_test.go b/pkg/tests/node_data/node_data_tracker_test.go new file mode 100644 index 00000000..e43afbfb --- /dev/null +++ b/pkg/tests/node_data/node_data_tracker_test.go @@ -0,0 +1,60 @@ +package node_data + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + . 
"github.com/masa-finance/masa-oracle/node" + "github.com/masa-finance/masa-oracle/pkg/pubsub" +) + +var _ = Describe("NodeDataTracker", func() { + Context("UpdateNodeDataTwitter", func() { + It("should correctly update NodeData Twitter fields", func() { + testNode, err := NewOracleNode( + context.Background(), + EnableStaked, + EnableRandomIdentity, + ) + Expect(err).NotTo(HaveOccurred()) + + err = testNode.Start() + Expect(err).NotTo(HaveOccurred()) + + initialData := pubsub.NodeData{ + PeerId: testNode.Host.ID(), + LastReturnedTweet: time.Now().Add(-1 * time.Hour), + ReturnedTweets: 10, + TweetTimeout: true, + TweetTimeouts: 2, + LastTweetTimeout: time.Now().Add(-1 * time.Hour), + LastNotFoundTime: time.Now().Add(-1 * time.Hour), + NotFoundCount: 1, + } + + err = testNode.NodeTracker.UpdateNodeDataTwitter(testNode.Host.ID().String(), initialData) + Expect(err).NotTo(HaveOccurred()) + + updates := pubsub.NodeData{ + ReturnedTweets: 5, + LastReturnedTweet: time.Now(), + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + } + + err = testNode.NodeTracker.UpdateNodeDataTwitter(testNode.Host.ID().String(), updates) + Expect(err).NotTo(HaveOccurred()) + + updatedData := testNode.NodeTracker.GetNodeData(testNode.Host.ID().String()) + Expect(updatedData.ReturnedTweets).To(Equal(15)) + Expect(updatedData.TweetTimeouts).To(Equal(3)) + Expect(updatedData.NotFoundCount).To(Equal(2)) + }) + }) +}) diff --git a/pkg/workers/config.go b/pkg/workers/config.go index 03c38957..ed5b74e5 100644 --- a/pkg/workers/config.go +++ b/pkg/workers/config.go @@ -17,13 +17,13 @@ type WorkerConfig struct { } var DefaultConfig = WorkerConfig{ - WorkerTimeout: 55 * time.Second, - WorkerResponseTimeout: 45 * time.Second, - ConnectionTimeout: 10 * time.Second, + WorkerTimeout: 45 * time.Second, + WorkerResponseTimeout: 35 * time.Second, + ConnectionTimeout: 500 * time.Millisecond, MaxRetries: 1, MaxSpawnAttempts: 1, 
WorkerBufferSize: 100, - MaxRemoteWorkers: 10, + MaxRemoteWorkers: 100, } var workerConfig *WorkerConfig diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index 1665fc61..9aff4972 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math/rand" "strings" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/event" + "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/masa-finance/masa-oracle/pkg/workers/handlers" data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) @@ -97,11 +99,25 @@ func (whm *WorkHandlerManager) getWorkHandler(wType data_types.WorkerType) (Work func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse) { category := data_types.WorkerTypeToCategory(workRequest.WorkType) - remoteWorkers, localWorker := GetEligibleWorkers(node, category) + var remoteWorkers []data_types.Worker + var localWorker *data_types.Worker + + if category == pubsub.CategoryTwitter { + // Use priority-based selection for Twitter work + remoteWorkers, localWorker = GetEligibleWorkers(node, category, workerConfig.MaxRemoteWorkers) + logrus.Info("Starting priority-based worker selection for Twitter work") + } else { + // Use existing selection for other work types + remoteWorkers, localWorker = GetEligibleWorkers(node, category, 0) + // Shuffle the workers to maintain round-robin behavior + rand.Shuffle(len(remoteWorkers), func(i, j int) { + remoteWorkers[i], remoteWorkers[j] = remoteWorkers[j], remoteWorkers[i] + }) + logrus.Info("Starting round-robin worker selection for non-Twitter work") + } remoteWorkersAttempted := 0 var errorList []string - logrus.Info("Starting round-robin worker selection") // Try remote workers first, up to MaxRemoteWorkers for _, 
worker := range remoteWorkers { @@ -115,6 +131,15 @@ func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest peerInfo, err := node.DHT.FindPeer(context.Background(), worker.NodeData.PeerId) if err != nil { logrus.Warnf("Failed to find peer %s in DHT: %v", worker.NodeData.PeerId.String(), err) + if category == pubsub.CategoryTwitter { + err := node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + }) + if err != nil { + logrus.Warnf("Failed to update node data for peer %s: %v", worker.NodeData.PeerId.String(), err) + } + } continue } @@ -241,6 +266,24 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *node.OracleNode, worker da response.Error = fmt.Sprintf("error unmarshaling response: %v", err) return } + // Update metrics only if the work category is Twitter + if data_types.WorkerTypeToCategory(workRequest.WorkType) == pubsub.CategoryTwitter { + if response.Error == "" { + err = node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + ReturnedTweets: response.RecordCount, + LastReturnedTweet: time.Now(), + }) + } else { + err = node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + }) + } + if err != nil { + logrus.Warnf("Failed to update node data for peer %s: %v", worker.NodeData.PeerId.String(), err) + } + } } return response } diff --git a/pkg/workers/worker_selection.go b/pkg/workers/worker_selection.go index 25b2e468..90b14d34 100644 --- a/pkg/workers/worker_selection.go +++ b/pkg/workers/worker_selection.go @@ -1,8 +1,6 @@ package workers import ( - "math/rand/v2" - "github.com/libp2p/go-libp2p/core/peer" "github.com/sirupsen/logrus" @@ -13,15 +11,11 @@ import ( // GetEligibleWorkers Uses the new NodeTracker method to get the eligible workers for a given message type // I'm leaving this returning an 
array so that we can easily increase the number of workers in the future -func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory) ([]data_types.Worker, *data_types.Worker) { +func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory, limit int) ([]data_types.Worker, *data_types.Worker) { workers := make([]data_types.Worker, 0) nodes := node.NodeTracker.GetEligibleWorkerNodes(category) var localWorker *data_types.Worker - rand.Shuffle(len(nodes), func(i, j int) { - nodes[i], nodes[j] = nodes[j], nodes[i] - }) - logrus.Info("Getting eligible workers") for _, eligible := range nodes { if eligible.PeerId.String() == node.Host.ID().String() { @@ -33,6 +27,11 @@ func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory) ( continue } workers = append(workers, data_types.Worker{IsLocal: false, NodeData: eligible}) + + // Apply limit if specified + if limit > 0 && len(workers) >= limit { + break + } } logrus.Infof("Found %d eligible remote workers", len(workers))