From f09fb2076d4c1376ea855a12e72692c76ef32803 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 8 Oct 2024 15:38:45 -0700 Subject: [PATCH] Feat(workers) implement adaptive worker selection for improved task distribution (#589) * feat(worker-selection): Implement performance-based worker sorting - Add performance metrics fields to NodeData struct - Implement NodeSorter for flexible sorting of worker nodes - Create SortNodesByTwitterReliability function for Twitter workers - Update GetEligibleWorkerNodes to use category-specific sorting - Modify GetEligibleWorkers to use sorted workers and add worker limit This commit enhances the worker selection process by prioritizing workers based on their performance metrics. It introduces a flexible sorting mechanism that can be easily extended to other worker categories in the future. The changes improve reliability and efficiency in task allocation across the Masa Oracle network. * feat(worker-selection): Implement priority-based selection for Twitter work - Update DistributeWork to use priority selection for Twitter category - Maintain round-robin selection for other work categories by shuffling workers - Integrate new GetEligibleWorkers function with work type-specific behavior - Respect MaxRemoteWorkers limit for all work types - Add distinct logging for Twitter and non-Twitter worker selection This commit enhances the work distribution process by implementing priority-based worker selection for Twitter-related tasks while preserving the existing round-robin behavior for other work types. It leverages the newly added performance metrics to choose the most reliable workers for Twitter tasks, and ensures consistent behavior for other categories by shuffling the worker list. This hybrid approach improves efficiency for Twitter tasks while maintaining the expected behavior for all other work types. * Update .gitignore * feat(worker-selection): Implement priority-based sorting for Twitter workers - Add LastNotFoundTime and NotFoundCount fields to NodeData struct - Enhance SortNodesByTwitterReliability function with multi-criteria sorting: 1. Prioritize nodes found more often (lower NotFoundCount) 2. Consider recency of last not-found occurrence 3. Sort by higher number of returned tweets 4. Consider recency of last returned tweet 5. Prioritize nodes with fewer timeouts 6. Consider recency of last timeout 7. Use PeerId for stable sorting when no performance data is available - Remove random shuffling from GetEligibleWorkers function This commit improves worker selection for Twitter tasks by implementing a more sophisticated sorting algorithm that takes into account node reliability and performance metrics. It aims to enhance the efficiency and reliability of task distribution in the Masa Oracle network. * feat(worker-selection): Update Twitter fields in NodeData and Worker Manager Add functions to update Twitter-related metrics in NodeData and integrate updates into Worker Manager processes. This ensures accurate tracking of tweet-related events and peer activity in the system. * feat(worker-selection): Add unit tests for NodeData and NodeDataTracker Introduce unit tests for the NodeData and NodeDataTracker functionalities, covering scenarios involving updates to Twitter-related fields. These tests ensure the correctness of the UpdateTwitterFields method in NodeData and the UpdateNodeDataTwitter method in NodeDataTracker. * chore(workers): update timeouts and bump version --------- Co-authored-by: Bob Stevens <35038919+restevens402@users.noreply.github.com> --- .gitignore | 1 + internal/versioning/version.go | 2 +- pkg/pubsub/node_data.go | 25 +++++ pkg/pubsub/node_event_tracker.go | 91 ++++++++++++++++++- pkg/tests/node_data/node_data_suite_test.go | 13 +++ pkg/tests/node_data/node_data_test.go | 41 +++++++++ pkg/tests/node_data/node_data_tracker_test.go | 60 ++++++++++++ pkg/workers/config.go | 8 +- pkg/workers/worker_manager.go | 47 +++++++++- pkg/workers/worker_selection.go | 13 ++- 10 files changed, 286 insertions(+), 15 deletions(-) create mode 100644 pkg/tests/node_data/node_data_suite_test.go create mode 100644 pkg/tests/node_data/node_data_test.go create mode 100644 pkg/tests/node_data/node_data_tracker_test.go diff --git a/.gitignore b/.gitignore index eaa12583..7b804f9f 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,4 @@ snippets.txt # Build result of goreleaser dist/ +bp-todo.md diff --git a/internal/versioning/version.go b/internal/versioning/version.go index 9c9ee19e..88dea086 100644 --- a/internal/versioning/version.go +++ b/internal/versioning/version.go @@ -5,5 +5,5 @@ var ( // XXX: Bump this value only when there are protocol changes that makes the oracle // incompatible between version! - ProtocolVersion = `v0.8.0` + ProtocolVersion = `v0.8.3` ) diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index 4bd69f6f..ff5ea41d 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -63,6 +63,13 @@ type NodeData struct { Records any `json:"records,omitempty"` Version string `json:"version"` WorkerTimeout time.Time `json:"workerTimeout,omitempty"` + ReturnedTweets int `json:"returnedTweets"` // a running count of the number of tweets returned + LastReturnedTweet time.Time `json:"lastReturnedTweet"` + TweetTimeout bool `json:"tweetTimeout"` + TweetTimeouts int `json:"tweetTimeouts"` // a running countthe number of times a tweet request times out + LastTweetTimeout time.Time `json:"lastTweetTimeout"` + LastNotFoundTime time.Time `json:"lastNotFoundTime"` + NotFoundCount int `json:"notFoundCount"` // a running count of the number of times a node is not found } // NewNodeData creates a new NodeData struct initialized with the given @@ -256,3 +263,21 @@ func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr) { n.Multiaddrs = append(n.Multiaddrs, JSONMultiaddr{Multiaddr: addr}) } } + +func (nd *NodeData) UpdateTwitterFields(fields NodeData) { + if fields.ReturnedTweets != 0 { + nd.ReturnedTweets += fields.ReturnedTweets + } + if !fields.LastReturnedTweet.IsZero() { + nd.LastReturnedTweet = fields.LastReturnedTweet + } + if fields.TweetTimeout { + nd.TweetTimeout = fields.TweetTimeout + nd.TweetTimeouts += fields.TweetTimeouts + nd.LastTweetTimeout = fields.LastTweetTimeout + } + if !fields.LastNotFoundTime.IsZero() { + nd.LastNotFoundTime = fields.LastNotFoundTime + nd.NotFoundCount += fields.NotFoundCount + } +} diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index 91231148..5a52d968 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -29,11 +29,72 @@ type ConnectBufferEntry struct { ConnectTime time.Time } +// NodeSorter provides methods for sorting NodeData slices +type NodeSorter struct { + nodes []NodeData + less func(i, j NodeData) bool +} + +// Len returns the length of the nodes slice +func (s NodeSorter) Len() int { return len(s.nodes) } + +// Swap swaps the nodes at indices i and j +func (s NodeSorter) Swap(i, j int) { s.nodes[i], s.nodes[j] = s.nodes[j], s.nodes[i] } + +// Less compares nodes at indices i and j using the provided less function +func (s NodeSorter) Less(i, j int) bool { return s.less(s.nodes[i], s.nodes[j]) } + +// SortNodesByTwitterReliability sorts the given nodes based on their Twitter reliability. +// It uses multiple criteria to determine the reliability and performance of nodes: +// 1. Prioritizes nodes that have been found more often (lower NotFoundCount) +// 2. Considers the last time a node was not found (earlier LastNotFoundTime is better) +// 3. Sorts by higher number of returned tweets +// 4. Then by more recent last returned tweet +// 5. Then by lower number of timeouts +// 6. Then by less recent last timeout +// 7. Finally, sorts by PeerId for stability when no performance data is available +// +// The function modifies the input slice in-place, sorting the nodes from most to least reliable. +func SortNodesByTwitterReliability(nodes []NodeData) { + sorter := NodeSorter{ + nodes: nodes, + less: func(i, j NodeData) bool { + // First, prioritize nodes that have been found more often + if i.NotFoundCount != j.NotFoundCount { + return i.NotFoundCount < j.NotFoundCount + } + // Then, consider the last time they were not found + if !i.LastNotFoundTime.Equal(j.LastNotFoundTime) { + return i.LastNotFoundTime.Before(j.LastNotFoundTime) + } + // Primary sort: Higher number of returned tweets + if i.ReturnedTweets != j.ReturnedTweets { + return i.ReturnedTweets > j.ReturnedTweets + } + // Secondary sort: More recent last returned tweet + if !i.LastReturnedTweet.Equal(j.LastReturnedTweet) { + return i.LastReturnedTweet.After(j.LastReturnedTweet) + } + // Tertiary sort: Lower number of timeouts + if i.TweetTimeouts != j.TweetTimeouts { + return i.TweetTimeouts < j.TweetTimeouts + } + // Quaternary sort: Less recent last timeout + if !i.LastTweetTimeout.Equal(j.LastTweetTimeout) { + return i.LastTweetTimeout.Before(j.LastTweetTimeout) + } + // Default sort: By PeerId (ensures stable sorting when no performance data is available) + return i.PeerId.String() < j.PeerId.String() + }, + } + sort.Sort(sorter) +} + // NewNodeEventTracker creates a new NodeEventTracker instance. // It initializes the node data map, node data channel, node data file path, // connect buffer map. It loads existing node data from file, starts a goroutine // to clear expired buffer entries, and returns the initialized instance. -func NewNodeEventTracker(version, environment string, hostId string) *NodeEventTracker { +func NewNodeEventTracker(version, environment, hostId string) *NodeEventTracker { net := &NodeEventTracker{ nodeData: NewSafeMap(), nodeVersion: version, @@ -284,6 +345,17 @@ func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []N result = append(result, nodeData) } } + + // Sort the eligible nodes based on the worker category + switch category { + case CategoryTwitter: + SortNodesByTwitterReliability(result) + // Add cases for other categories as needed such as + // web + // discord + // telegram + } + return result } @@ -477,3 +549,20 @@ func (net *NodeEventTracker) cleanupStalePeers(hostId string) { } } } + +func (net *NodeEventTracker) UpdateNodeDataTwitter(peerID string, updates NodeData) error { + nodeData, exists := net.nodeData.Get(peerID) + if !exists { + return fmt.Errorf("node data not found for peer ID: %s", peerID) + } + + // Update fields based on non-zero values + nodeData.UpdateTwitterFields(updates) + + // Save the updated node data + err := net.AddOrUpdateNodeData(nodeData, true) + if err != nil { + return fmt.Errorf("error updating node data: %v", err) + } + return nil +} diff --git a/pkg/tests/node_data/node_data_suite_test.go b/pkg/tests/node_data/node_data_suite_test.go new file mode 100644 index 00000000..9ae8c1aa --- /dev/null +++ b/pkg/tests/node_data/node_data_suite_test.go @@ -0,0 +1,13 @@ +package node_data + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestNodeData(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "NodeData Test Suite") +} diff --git a/pkg/tests/node_data/node_data_test.go b/pkg/tests/node_data/node_data_test.go new file mode 100644 index 00000000..090a8a0c --- /dev/null +++ b/pkg/tests/node_data/node_data_test.go @@ -0,0 +1,41 @@ +package node_data + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/masa-finance/masa-oracle/pkg/pubsub" +) + +var _ = Describe("NodeData", func() { + Describe("UpdateTwitterFields", func() { + It("should correctly update Twitter fields", func() { + initialData := pubsub.NodeData{ + ReturnedTweets: 10, + TweetTimeouts: 2, + NotFoundCount: 1, + } + + updates := pubsub.NodeData{ + ReturnedTweets: 5, + LastReturnedTweet: time.Now(), + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + } + + initialData.UpdateTwitterFields(updates) + + Expect(initialData.ReturnedTweets).To(Equal(15)) + Expect(initialData.TweetTimeouts).To(Equal(3)) + Expect(initialData.NotFoundCount).To(Equal(2)) + Expect(initialData.LastReturnedTweet.IsZero()).To(BeFalse()) + Expect(initialData.LastTweetTimeout.IsZero()).To(BeFalse()) + Expect(initialData.LastNotFoundTime.IsZero()).To(BeFalse()) + }) + }) +}) diff --git a/pkg/tests/node_data/node_data_tracker_test.go b/pkg/tests/node_data/node_data_tracker_test.go new file mode 100644 index 00000000..e43afbfb --- /dev/null +++ b/pkg/tests/node_data/node_data_tracker_test.go @@ -0,0 +1,60 @@ +package node_data + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + . "github.com/masa-finance/masa-oracle/node" + "github.com/masa-finance/masa-oracle/pkg/pubsub" +) + +var _ = Describe("NodeDataTracker", func() { + Context("UpdateNodeDataTwitter", func() { + It("should correctly update NodeData Twitter fields", func() { + testNode, err := NewOracleNode( + context.Background(), + EnableStaked, + EnableRandomIdentity, + ) + Expect(err).NotTo(HaveOccurred()) + + err = testNode.Start() + Expect(err).NotTo(HaveOccurred()) + + initialData := pubsub.NodeData{ + PeerId: testNode.Host.ID(), + LastReturnedTweet: time.Now().Add(-1 * time.Hour), + ReturnedTweets: 10, + TweetTimeout: true, + TweetTimeouts: 2, + LastTweetTimeout: time.Now().Add(-1 * time.Hour), + LastNotFoundTime: time.Now().Add(-1 * time.Hour), + NotFoundCount: 1, + } + + err = testNode.NodeTracker.UpdateNodeDataTwitter(testNode.Host.ID().String(), initialData) + Expect(err).NotTo(HaveOccurred()) + + updates := pubsub.NodeData{ + ReturnedTweets: 5, + LastReturnedTweet: time.Now(), + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + } + + err = testNode.NodeTracker.UpdateNodeDataTwitter(testNode.Host.ID().String(), updates) + Expect(err).NotTo(HaveOccurred()) + + updatedData := testNode.NodeTracker.GetNodeData(testNode.Host.ID().String()) + Expect(updatedData.ReturnedTweets).To(Equal(15)) + Expect(updatedData.TweetTimeouts).To(Equal(3)) + Expect(updatedData.NotFoundCount).To(Equal(2)) + }) + }) +}) diff --git a/pkg/workers/config.go b/pkg/workers/config.go index 03c38957..ed5b74e5 100644 --- a/pkg/workers/config.go +++ b/pkg/workers/config.go @@ -17,13 +17,13 @@ type WorkerConfig struct { } var DefaultConfig = WorkerConfig{ - WorkerTimeout: 55 * time.Second, - WorkerResponseTimeout: 45 * time.Second, - ConnectionTimeout: 10 * time.Second, + WorkerTimeout: 45 * time.Second, + WorkerResponseTimeout: 35 * time.Second, + ConnectionTimeout: 500 * time.Millisecond, MaxRetries: 1, MaxSpawnAttempts: 1, WorkerBufferSize: 100, - MaxRemoteWorkers: 10, + MaxRemoteWorkers: 100, } var workerConfig *WorkerConfig diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index 1665fc61..9aff4972 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math/rand" "strings" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/event" + "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/masa-finance/masa-oracle/pkg/workers/handlers" data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) @@ -97,11 +99,25 @@ func (whm *WorkHandlerManager) getWorkHandler(wType data_types.WorkerType) (Work func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse) { category := data_types.WorkerTypeToCategory(workRequest.WorkType) - remoteWorkers, localWorker := GetEligibleWorkers(node, category) + var remoteWorkers []data_types.Worker + var localWorker *data_types.Worker + + if category == pubsub.CategoryTwitter { + // Use priority-based selection for Twitter work + remoteWorkers, localWorker = GetEligibleWorkers(node, category, workerConfig.MaxRemoteWorkers) + logrus.Info("Starting priority-based worker selection for Twitter work") + } else { + // Use existing selection for other work types + remoteWorkers, localWorker = GetEligibleWorkers(node, category, 0) + // Shuffle the workers to maintain round-robin behavior + rand.Shuffle(len(remoteWorkers), func(i, j int) { + remoteWorkers[i], remoteWorkers[j] = remoteWorkers[j], remoteWorkers[i] + }) + logrus.Info("Starting round-robin worker selection for non-Twitter work") + } remoteWorkersAttempted := 0 var errorList []string - logrus.Info("Starting round-robin worker selection") // Try remote workers first, up to MaxRemoteWorkers for _, worker := range remoteWorkers { @@ -115,6 +131,15 @@ func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest peerInfo, err := node.DHT.FindPeer(context.Background(), worker.NodeData.PeerId) if err != nil { logrus.Warnf("Failed to find peer %s in DHT: %v", worker.NodeData.PeerId.String(), err) + if category == pubsub.CategoryTwitter { + err := node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + }) + if err != nil { + logrus.Warnf("Failed to update node data for peer %s: %v", worker.NodeData.PeerId.String(), err) + } + } continue } @@ -241,6 +266,24 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *node.OracleNode, worker da response.Error = fmt.Sprintf("error unmarshaling response: %v", err) return } + // Update metrics only if the work category is Twitter + if data_types.WorkerTypeToCategory(workRequest.WorkType) == pubsub.CategoryTwitter { + if response.Error == "" { + err = node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + ReturnedTweets: response.RecordCount, + LastReturnedTweet: time.Now(), + }) + } else { + err = node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + }) + } + if err != nil { + logrus.Warnf("Failed to update node data for peer %s: %v", worker.NodeData.PeerId.String(), err) + } + } } return response } diff --git a/pkg/workers/worker_selection.go b/pkg/workers/worker_selection.go index 25b2e468..90b14d34 100644 --- a/pkg/workers/worker_selection.go +++ b/pkg/workers/worker_selection.go @@ -1,8 +1,6 @@ package workers import ( - "math/rand/v2" - "github.com/libp2p/go-libp2p/core/peer" "github.com/sirupsen/logrus" @@ -13,15 +11,11 @@ import ( // GetEligibleWorkers Uses the new NodeTracker method to get the eligible workers for a given message type // I'm leaving this returning an array so that we can easily increase the number of workers in the future -func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory) ([]data_types.Worker, *data_types.Worker) { +func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory, limit int) ([]data_types.Worker, *data_types.Worker) { workers := make([]data_types.Worker, 0) nodes := node.NodeTracker.GetEligibleWorkerNodes(category) var localWorker *data_types.Worker - rand.Shuffle(len(nodes), func(i, j int) { - nodes[i], nodes[j] = nodes[j], nodes[i] - }) - logrus.Info("Getting eligible workers") for _, eligible := range nodes { if eligible.PeerId.String() == node.Host.ID().String() { @@ -33,6 +27,11 @@ func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory) ( continue } workers = append(workers, data_types.Worker{IsLocal: false, NodeData: eligible}) + + // Apply limit if specified + if limit > 0 && len(workers) >= limit { + break + } } logrus.Infof("Found %d eligible remote workers", len(workers))