Skip to content

Commit

Permalink
Feat(workers) implement adaptive worker selection for improved task d…
Browse files Browse the repository at this point in the history
…istribution (#589)

* feat(worker-selection): Implement performance-based worker sorting

- Add performance metrics fields to NodeData struct
- Implement NodeSorter for flexible sorting of worker nodes
- Create SortNodesByTwitterReliability function for Twitter workers
- Update GetEligibleWorkerNodes to use category-specific sorting
- Modify GetEligibleWorkers to use sorted workers and add worker limit

This commit enhances the worker selection process by prioritizing workers
based on their performance metrics. It introduces a flexible sorting
mechanism that can be easily extended to other worker categories in the
future. The changes improve reliability and efficiency in task allocation
across the Masa Oracle network.

* feat(worker-selection): Implement priority-based selection for Twitter work

- Update DistributeWork to use priority selection for Twitter category
- Maintain round-robin selection for other work categories by shuffling workers
- Integrate new GetEligibleWorkers function with work type-specific behavior
- Respect MaxRemoteWorkers limit for all work types
- Add distinct logging for Twitter and non-Twitter worker selection

This commit enhances the work distribution process by implementing
priority-based worker selection for Twitter-related tasks while
preserving the existing round-robin behavior for other work types.
It leverages the newly added performance metrics to choose the most
reliable workers for Twitter tasks, and ensures consistent behavior
for other categories by shuffling the worker list. This hybrid approach
improves efficiency for Twitter tasks while maintaining the expected
behavior for all other work types.

* Update .gitignore

* feat(worker-selection): Implement priority-based sorting for Twitter workers

- Add LastNotFoundTime and NotFoundCount fields to NodeData struct
- Enhance SortNodesByTwitterReliability function with multi-criteria sorting:
  1. Prioritize nodes found more often (lower NotFoundCount)
  2. Consider recency of last not-found occurrence
  3. Sort by higher number of returned tweets
  4. Consider recency of last returned tweet
  5. Prioritize nodes with fewer timeouts
  6. Consider recency of last timeout
  7. Use PeerId for stable sorting when no performance data is available
- Remove random shuffling from GetEligibleWorkers function

This commit improves worker selection for Twitter tasks by implementing
a more sophisticated sorting algorithm that takes into account node
reliability and performance metrics. It aims to enhance the efficiency
and reliability of task distribution in the Masa Oracle network.

* feat(worker-selection): Update Twitter fields in NodeData and Worker Manager

Add functions to update Twitter-related metrics in NodeData and integrate updates into Worker Manager processes. This ensures accurate tracking of tweet-related events and peer activity in the system.

* feat(worker-selection): Add unit tests for NodeData and NodeDataTracker

Introduce unit tests for the NodeData and NodeDataTracker functionalities, covering scenarios involving updates to Twitter-related fields. These tests ensure the correctness of the UpdateTwitterFields method in NodeData and the UpdateNodeDataTwitter method in NodeDataTracker.

* chore(workers): update timeouts and bump version

---------

Co-authored-by: Bob Stevens <[email protected]>
  • Loading branch information
teslashibe and restevens402 authored Oct 8, 2024
1 parent 0ef0df4 commit f09fb20
Show file tree
Hide file tree
Showing 10 changed files with 286 additions and 15 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,4 @@ snippets.txt

# Build result of goreleaser
dist/
bp-todo.md
2 changes: 1 addition & 1 deletion internal/versioning/version.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,5 +5,5 @@ var (

// XXX: Bump this value only when there are protocol changes that makes the oracle
// incompatible between version!
ProtocolVersion = `v0.8.0`
ProtocolVersion = `v0.8.3`
)
25 changes: 25 additions & 0 deletions pkg/pubsub/node_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,13 @@ type NodeData struct {
Records any `json:"records,omitempty"`
Version string `json:"version"`
WorkerTimeout time.Time `json:"workerTimeout,omitempty"`
ReturnedTweets int `json:"returnedTweets"` // a running count of the number of tweets returned
LastReturnedTweet time.Time `json:"lastReturnedTweet"`
TweetTimeout bool `json:"tweetTimeout"`
TweetTimeouts int `json:"tweetTimeouts"` // a running countthe number of times a tweet request times out
LastTweetTimeout time.Time `json:"lastTweetTimeout"`
LastNotFoundTime time.Time `json:"lastNotFoundTime"`
NotFoundCount int `json:"notFoundCount"` // a running count of the number of times a node is not found
}

// NewNodeData creates a new NodeData struct initialized with the given
Expand Down Expand Up @@ -256,3 +263,21 @@ func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr) {
n.Multiaddrs = append(n.Multiaddrs, JSONMultiaddr{Multiaddr: addr})
}
}

func (nd *NodeData) UpdateTwitterFields(fields NodeData) {
if fields.ReturnedTweets != 0 {
nd.ReturnedTweets += fields.ReturnedTweets
}
if !fields.LastReturnedTweet.IsZero() {
nd.LastReturnedTweet = fields.LastReturnedTweet
}
if fields.TweetTimeout {
nd.TweetTimeout = fields.TweetTimeout
nd.TweetTimeouts += fields.TweetTimeouts
nd.LastTweetTimeout = fields.LastTweetTimeout
}
if !fields.LastNotFoundTime.IsZero() {
nd.LastNotFoundTime = fields.LastNotFoundTime
nd.NotFoundCount += fields.NotFoundCount
}
}
91 changes: 90 additions & 1 deletion pkg/pubsub/node_event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,11 +29,72 @@ type ConnectBufferEntry struct {
ConnectTime time.Time
}

// NodeSorter provides methods for sorting NodeData slices
type NodeSorter struct {
nodes []NodeData
less func(i, j NodeData) bool
}

// Len returns the length of the nodes slice
func (s NodeSorter) Len() int { return len(s.nodes) }

// Swap swaps the nodes at indices i and j
func (s NodeSorter) Swap(i, j int) { s.nodes[i], s.nodes[j] = s.nodes[j], s.nodes[i] }

// Less compares nodes at indices i and j using the provided less function
func (s NodeSorter) Less(i, j int) bool { return s.less(s.nodes[i], s.nodes[j]) }

// SortNodesByTwitterReliability sorts the given nodes based on their Twitter reliability.
// It uses multiple criteria to determine the reliability and performance of nodes:
// 1. Prioritizes nodes that have been found more often (lower NotFoundCount)
// 2. Considers the last time a node was not found (earlier LastNotFoundTime is better)
// 3. Sorts by higher number of returned tweets
// 4. Then by more recent last returned tweet
// 5. Then by lower number of timeouts
// 6. Then by less recent last timeout
// 7. Finally, sorts by PeerId for stability when no performance data is available
//
// The function modifies the input slice in-place, sorting the nodes from most to least reliable.
func SortNodesByTwitterReliability(nodes []NodeData) {
sorter := NodeSorter{
nodes: nodes,
less: func(i, j NodeData) bool {
// First, prioritize nodes that have been found more often
if i.NotFoundCount != j.NotFoundCount {
return i.NotFoundCount < j.NotFoundCount
}
// Then, consider the last time they were not found
if !i.LastNotFoundTime.Equal(j.LastNotFoundTime) {
return i.LastNotFoundTime.Before(j.LastNotFoundTime)
}
// Primary sort: Higher number of returned tweets
if i.ReturnedTweets != j.ReturnedTweets {
return i.ReturnedTweets > j.ReturnedTweets
}
// Secondary sort: More recent last returned tweet
if !i.LastReturnedTweet.Equal(j.LastReturnedTweet) {
return i.LastReturnedTweet.After(j.LastReturnedTweet)
}
// Tertiary sort: Lower number of timeouts
if i.TweetTimeouts != j.TweetTimeouts {
return i.TweetTimeouts < j.TweetTimeouts
}
// Quaternary sort: Less recent last timeout
if !i.LastTweetTimeout.Equal(j.LastTweetTimeout) {
return i.LastTweetTimeout.Before(j.LastTweetTimeout)
}
// Default sort: By PeerId (ensures stable sorting when no performance data is available)
return i.PeerId.String() < j.PeerId.String()
},
}
sort.Sort(sorter)
}

// NewNodeEventTracker creates a new NodeEventTracker instance.
// It initializes the node data map, node data channel, node data file path,
// connect buffer map. It loads existing node data from file, starts a goroutine
// to clear expired buffer entries, and returns the initialized instance.
func NewNodeEventTracker(version, environment string, hostId string) *NodeEventTracker {
func NewNodeEventTracker(version, environment, hostId string) *NodeEventTracker {
net := &NodeEventTracker{
nodeData: NewSafeMap(),
nodeVersion: version,
Expand Down Expand Up @@ -284,6 +345,17 @@ func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []N
result = append(result, nodeData)
}
}

// Sort the eligible nodes based on the worker category
switch category {
case CategoryTwitter:
SortNodesByTwitterReliability(result)
// Add cases for other categories as needed such as
// web
// discord
// telegram
}

return result
}

Expand Down Expand Up @@ -477,3 +549,20 @@ func (net *NodeEventTracker) cleanupStalePeers(hostId string) {
}
}
}

func (net *NodeEventTracker) UpdateNodeDataTwitter(peerID string, updates NodeData) error {
nodeData, exists := net.nodeData.Get(peerID)
if !exists {
return fmt.Errorf("node data not found for peer ID: %s", peerID)
}

// Update fields based on non-zero values
nodeData.UpdateTwitterFields(updates)

// Save the updated node data
err := net.AddOrUpdateNodeData(nodeData, true)
if err != nil {
return fmt.Errorf("error updating node data: %v", err)
}
return nil
}
13 changes: 13 additions & 0 deletions pkg/tests/node_data/node_data_suite_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
package node_data

import (
"testing"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"
)

func TestNodeData(t *testing.T) {
RegisterFailHandler(Fail)
RunSpecs(t, "NodeData Test Suite")
}
41 changes: 41 additions & 0 deletions pkg/tests/node_data/node_data_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package node_data

import (
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

"github.com/masa-finance/masa-oracle/pkg/pubsub"
)

var _ = Describe("NodeData", func() {
Describe("UpdateTwitterFields", func() {
It("should correctly update Twitter fields", func() {
initialData := pubsub.NodeData{
ReturnedTweets: 10,
TweetTimeouts: 2,
NotFoundCount: 1,
}

updates := pubsub.NodeData{
ReturnedTweets: 5,
LastReturnedTweet: time.Now(),
TweetTimeout: true,
TweetTimeouts: 1,
LastTweetTimeout: time.Now(),
LastNotFoundTime: time.Now(),
NotFoundCount: 1,
}

initialData.UpdateTwitterFields(updates)

Expect(initialData.ReturnedTweets).To(Equal(15))
Expect(initialData.TweetTimeouts).To(Equal(3))
Expect(initialData.NotFoundCount).To(Equal(2))
Expect(initialData.LastReturnedTweet.IsZero()).To(BeFalse())
Expect(initialData.LastTweetTimeout.IsZero()).To(BeFalse())
Expect(initialData.LastNotFoundTime.IsZero()).To(BeFalse())
})
})
})
60 changes: 60 additions & 0 deletions pkg/tests/node_data/node_data_tracker_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package node_data

import (
"context"
"time"

. "github.com/onsi/ginkgo/v2"
. "github.com/onsi/gomega"

. "github.com/masa-finance/masa-oracle/node"
"github.com/masa-finance/masa-oracle/pkg/pubsub"
)

var _ = Describe("NodeDataTracker", func() {
Context("UpdateNodeDataTwitter", func() {
It("should correctly update NodeData Twitter fields", func() {
testNode, err := NewOracleNode(
context.Background(),
EnableStaked,
EnableRandomIdentity,
)
Expect(err).NotTo(HaveOccurred())

err = testNode.Start()
Expect(err).NotTo(HaveOccurred())

initialData := pubsub.NodeData{
PeerId: testNode.Host.ID(),
LastReturnedTweet: time.Now().Add(-1 * time.Hour),
ReturnedTweets: 10,
TweetTimeout: true,
TweetTimeouts: 2,
LastTweetTimeout: time.Now().Add(-1 * time.Hour),
LastNotFoundTime: time.Now().Add(-1 * time.Hour),
NotFoundCount: 1,
}

err = testNode.NodeTracker.UpdateNodeDataTwitter(testNode.Host.ID().String(), initialData)
Expect(err).NotTo(HaveOccurred())

updates := pubsub.NodeData{
ReturnedTweets: 5,
LastReturnedTweet: time.Now(),
TweetTimeout: true,
TweetTimeouts: 1,
LastTweetTimeout: time.Now(),
LastNotFoundTime: time.Now(),
NotFoundCount: 1,
}

err = testNode.NodeTracker.UpdateNodeDataTwitter(testNode.Host.ID().String(), updates)
Expect(err).NotTo(HaveOccurred())

updatedData := testNode.NodeTracker.GetNodeData(testNode.Host.ID().String())
Expect(updatedData.ReturnedTweets).To(Equal(15))
Expect(updatedData.TweetTimeouts).To(Equal(3))
Expect(updatedData.NotFoundCount).To(Equal(2))
})
})
})
8 changes: 4 additions & 4 deletions pkg/workers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,13 @@ type WorkerConfig struct {
}

var DefaultConfig = WorkerConfig{
WorkerTimeout: 55 * time.Second,
WorkerResponseTimeout: 45 * time.Second,
ConnectionTimeout: 10 * time.Second,
WorkerTimeout: 45 * time.Second,
WorkerResponseTimeout: 35 * time.Second,
ConnectionTimeout: 500 * time.Millisecond,
MaxRetries: 1,
MaxSpawnAttempts: 1,
WorkerBufferSize: 100,
MaxRemoteWorkers: 10,
MaxRemoteWorkers: 100,
}

var workerConfig *WorkerConfig
Expand Down
47 changes: 45 additions & 2 deletions pkg/workers/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"math/rand"
"strings"
"sync"
"time"
Expand All @@ -17,6 +18,7 @@ import (
"github.com/masa-finance/masa-oracle/node"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/event"
"github.com/masa-finance/masa-oracle/pkg/pubsub"
"github.com/masa-finance/masa-oracle/pkg/workers/handlers"
data_types "github.com/masa-finance/masa-oracle/pkg/workers/types"
)
Expand Down Expand Up @@ -97,11 +99,25 @@ func (whm *WorkHandlerManager) getWorkHandler(wType data_types.WorkerType) (Work

func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse) {
category := data_types.WorkerTypeToCategory(workRequest.WorkType)
remoteWorkers, localWorker := GetEligibleWorkers(node, category)
var remoteWorkers []data_types.Worker
var localWorker *data_types.Worker

if category == pubsub.CategoryTwitter {
// Use priority-based selection for Twitter work
remoteWorkers, localWorker = GetEligibleWorkers(node, category, workerConfig.MaxRemoteWorkers)
logrus.Info("Starting priority-based worker selection for Twitter work")
} else {
// Use existing selection for other work types
remoteWorkers, localWorker = GetEligibleWorkers(node, category, 0)
// Shuffle the workers to maintain round-robin behavior
rand.Shuffle(len(remoteWorkers), func(i, j int) {
remoteWorkers[i], remoteWorkers[j] = remoteWorkers[j], remoteWorkers[i]
})
logrus.Info("Starting round-robin worker selection for non-Twitter work")
}

remoteWorkersAttempted := 0
var errorList []string
logrus.Info("Starting round-robin worker selection")

// Try remote workers first, up to MaxRemoteWorkers
for _, worker := range remoteWorkers {
Expand All @@ -115,6 +131,15 @@ func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest
peerInfo, err := node.DHT.FindPeer(context.Background(), worker.NodeData.PeerId)
if err != nil {
logrus.Warnf("Failed to find peer %s in DHT: %v", worker.NodeData.PeerId.String(), err)
if category == pubsub.CategoryTwitter {
err := node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{
LastNotFoundTime: time.Now(),
NotFoundCount: 1,
})
if err != nil {
logrus.Warnf("Failed to update node data for peer %s: %v", worker.NodeData.PeerId.String(), err)
}
}
continue
}

Expand Down Expand Up @@ -241,6 +266,24 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *node.OracleNode, worker da
response.Error = fmt.Sprintf("error unmarshaling response: %v", err)
return
}
// Update metrics only if the work category is Twitter
if data_types.WorkerTypeToCategory(workRequest.WorkType) == pubsub.CategoryTwitter {
if response.Error == "" {
err = node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{
ReturnedTweets: response.RecordCount,
LastReturnedTweet: time.Now(),
})
} else {
err = node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{
TweetTimeout: true,
TweetTimeouts: 1,
LastTweetTimeout: time.Now(),
})
}
if err != nil {
logrus.Warnf("Failed to update node data for peer %s: %v", worker.NodeData.PeerId.String(), err)
}
}
}
return response
}
Expand Down
Loading

0 comments on commit f09fb20

Please sign in to comment.