feat: remote worker selection with local worker fallback & configuration and fine tuning #489

Closed
Changes from 12 of 23 commits
571c4f2
feat: refactor sendWork function in workers.go to make modular
teslashibe Aug 5, 2024
349386f
feat: worker-sub selection refactor
teslashibe Aug 5, 2024
4db2ee0
feat: Implement round-robin worker selection for distributed task pro…
teslashibe Aug 5, 2024
d140ce2
Improve error handling and resilience in send_work.go
teslashibe Aug 5, 2024
1678449
feat: add config.go to worker package to simplify settings
teslashibe Aug 5, 2024
3d6ed60
refactor(workers): move worker selection logic to separate file
teslashibe Aug 6, 2024
4de845a
fix: duplication of getEligibleWorkers func
teslashibe Aug 6, 2024
8c2f849
feat: Enhance worker selection process with configurable remote worke…
teslashibe Aug 7, 2024
3e9c9d6
fix: fine tune timeouts with testing
teslashibe Aug 7, 2024
51a0421
Refactor worker selection and add eligibility check
restevens402 Aug 7, 2024
8a2010c
Merge branch 'test' into teslashibe/worker-remote-worker-selection-wi…
restevens402 Aug 7, 2024
9f28628
fixed case where err was reassigned and hiding the original
restevens402 Aug 7, 2024
a3b5c95
fix: local work is not timing out for long queries >20s
teslashibe Aug 7, 2024
cb0b78d
added more error handling and bubbled up to the tryWorker level
restevens402 Aug 7, 2024
e47f737
Enable randomized node selection and improve config settings
restevens402 Aug 8, 2024
0663ebc
x: add error handling for peer info creation and fine tune timeouts
teslashibe Aug 9, 2024
322dc29
Merge branch 'test' into teslashibe/worker-remote-worker-selection-wi…
teslashibe Aug 9, 2024
5c85feb
chore: add configurable connection timeout to config.go
teslashibe Aug 9, 2024
6f5e248
Merge branch 'teslashibe/worker-remote-worker-selection-with-local-fa…
teslashibe Aug 9, 2024
9beb6ce
chore: revert makefile version tagger
teslashibe Aug 9, 2024
12db5b5
Adjust time intervals and remove unnecessary peer removal
restevens402 Aug 9, 2024
d10c6f8
resolving comments in PR
restevens402 Aug 10, 2024
88911fa
fix: cleanup logs and tune timeouts
teslashibe Aug 12, 2024
2 changes: 1 addition & 1 deletion pkg/api/handlers_data.go
@@ -102,7 +102,7 @@ func handleWorkResponse(c *gin.Context, responseCh chan []byte) {
c.JSON(http.StatusOK, result)
return
// teslashibe: adjust to timeout after 10 seconds for performance testing
- case <-time.After(10 * time.Second):
+ case <-time.After(15 * time.Second):
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out, check that port 4001 TCP inbound is open."})
return
case <-c.Done():
16 changes: 11 additions & 5 deletions pkg/pubsub/node_data.go
@@ -91,6 +91,10 @@ func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, act
// and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC".
// This can be used by other nodes to connect to this node.
func (n *NodeData) Address() string {
+ // Add a check for empty addresses
+ if len(n.Multiaddrs) == 0 {
+ return ""
+ }
return fmt.Sprintf("%s/p2p/%s", n.Multiaddrs[0].String(), n.PeerId.String())
}

@@ -117,16 +121,18 @@ func (n *NodeData) CanDoWork(workerType WorkerCategory) bool {
logrus.Infof("[+] Skipping worker %s due to timeout", n.PeerId)
return false
}

+ if !(n.IsStaked && n.IsActive) {
+ return false
+ }
switch workerType {
case CategoryTwitter:
- return n.IsActive && n.IsTwitterScraper
+ return n.IsTwitterScraper
case CategoryDiscord:
- return n.IsActive && n.IsDiscordScraper
+ return n.IsDiscordScraper
case CategoryTelegram:
- return n.IsActive && n.IsTelegramScraper
+ return n.IsTelegramScraper
case CategoryWeb:
- return n.IsActive && n.IsWebScraper
+ return n.IsWebScraper
default:
return false
}
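
The CanDoWork refactor above checks staking and activity once, up front, before the per-category scraper flags. A minimal behavioural sketch (not part of this PR), assuming a zero-valued NodeData passes the timeout check at the top of the function:

package main

import (
    "fmt"

    "github.com/masa-finance/masa-oracle/pkg/pubsub"
)

func main() {
    // Staked but inactive: rejected before the scraper flags are consulted.
    n := pubsub.NodeData{IsStaked: true, IsActive: false, IsTwitterScraper: true}
    fmt.Println(n.CanDoWork(pubsub.CategoryTwitter)) // false

    // Staked, active, and a Twitter scraper: eligible for Twitter work.
    n.IsActive = true
    fmt.Println(n.CanDoWork(pubsub.CategoryTwitter)) // true
}
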
14 changes: 13 additions & 1 deletion pkg/pubsub/node_event_tracker.go
@@ -249,7 +249,7 @@ func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData {
}

// GetEthAddress returns the Ethereum address for the given remote peer.
- // It gets the peer's public key from the network's peerstore, converts
+ // It gets the peer's public key from the network's peer store, converts
// it to a hex string, and converts that to an Ethereum address.
// Returns an empty string if there is no public key for the peer.
func GetEthAddress(remotePeer peer.ID, n network.Network) string {
@@ -273,6 +273,18 @@ func GetEthAddress(remotePeer peer.ID, n network.Network) string {
return publicKeyHex
}

+ // GetEligibleWorkerNodes returns a slice of NodeData for nodes that are eligible to perform a specific category of work.
+ func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []NodeData {
+ logrus.Debugf("Getting eligible worker nodes for category: %s", category)
+ result := make([]NodeData, 0)
+ for _, nodeData := range net.GetAllNodeData() {
+ if nodeData.CanDoWork(category) {
+ result = append(result, nodeData)
+ }
+ }
+ return result
+ }

// IsStaked returns whether the node with the given peerID is marked as staked in the node data tracker.
// Returns false if no node data is found for the given peerID.
func (net *NodeEventTracker) IsStaked(peerID string) bool {
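
The new GetEligibleWorkerNodes helper filters the tracker's full node list through CanDoWork. A short usage sketch (illustrative, not part of the diff), using only names visible above and assuming the pubsub and logrus imports already used elsewhere in this PR:

// logEligibleTwitterWorkers lists every node currently eligible for Twitter work.
func logEligibleTwitterWorkers(tracker *pubsub.NodeEventTracker) {
    for _, n := range tracker.GetEligibleWorkerNodes(pubsub.CategoryTwitter) {
        logrus.Infof("eligible worker %s at %s", n.PeerId, n.Address())
    }
}
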
29 changes: 29 additions & 0 deletions pkg/workers/config.go
@@ -0,0 +1,29 @@
package workers

import (
"time"
)

type WorkerConfig struct {
WorkerTimeout time.Duration
WorkerResponseTimeout time.Duration
MaxRetries int
MaxSpawnAttempts int
WorkerBufferSize int
MaxRemoteWorkers int
}

var DefaultConfig = WorkerConfig{
WorkerTimeout: 2000 * time.Millisecond,
WorkerResponseTimeout: 1250 * time.Millisecond,
MaxRetries: 1,
MaxSpawnAttempts: 3,
WorkerBufferSize: 100,
MaxRemoteWorkers: 1,
}

func LoadConfig() (*WorkerConfig, error) {
// For now, we'll just return the default config
config := DefaultConfig
return &config, nil
}
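
LoadConfig currently returns DefaultConfig unchanged. A hypothetical follow-up (not part of this PR) could layer environment-variable overrides on top of the defaults; the WORKER_TIMEOUT name below is illustrative only, and the sketch assumes "fmt" and "os" are added to the imports:

// loadConfigFromEnv starts from DefaultConfig and overrides WorkerTimeout
// when a duration string such as "2500ms" is supplied via WORKER_TIMEOUT.
func loadConfigFromEnv() (*WorkerConfig, error) {
    config := DefaultConfig
    if v := os.Getenv("WORKER_TIMEOUT"); v != "" {
        d, err := time.ParseDuration(v)
        if err != nil {
            return nil, fmt.Errorf("invalid WORKER_TIMEOUT %q: %w", v, err)
        }
        config.WorkerTimeout = d
    }
    return &config, nil
}
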
10 changes: 7 additions & 3 deletions pkg/workers/handler.go
@@ -9,15 +9,16 @@ import (

"github.com/asynkron/protoactor-go/actor"
pubsub2 "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"

masa "github.com/masa-finance/masa-oracle/pkg"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/scrapers/discord"
"github.com/masa-finance/masa-oracle/pkg/scrapers/telegram"
"github.com/masa-finance/masa-oracle/pkg/scrapers/twitter"
"github.com/masa-finance/masa-oracle/pkg/scrapers/web"
"github.com/masa-finance/masa-oracle/pkg/workers/messages"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
)

type LLMChatBody struct {
@@ -158,7 +159,10 @@ func (a *Worker) HandleWork(ctx actor.Context, m *messages.Work, node *masa.OracleNode
}

if err != nil {
- host, _, err := net.SplitHostPort(m.Sender.Address)
+ host, _, err2 := net.SplitHostPort(m.Sender.Address)
+ if err2 != nil {
+ logrus.Errorf("[-] Error splitting host and port: %v", err2)
+ }
addrs := node.Host.Addrs()
isLocalHost := false
for _, addr := range addrs {
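
The err2 rename above sidesteps a Go shadowing pitfall: with :=, every name on the left of a declaration inside the if block is new, so a second err would hide the error actually being handled. A standalone illustration (not repository code), using the standard library plus logrus:

err := errors.New("original work error")
if err != nil {
    // This err shadows the outer one; the original failure is no longer visible here.
    host, _, err := net.SplitHostPort("address-without-port")
    logrus.Errorf("host %q, inner err: %v", host, err) // reports the SplitHostPort error, not the original
}
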
263 changes: 263 additions & 0 deletions pkg/workers/send_work.go
@@ -0,0 +1,263 @@
package workers

import (
"fmt"
"time"

masa "github.com/masa-finance/masa-oracle/pkg"
"github.com/masa-finance/masa-oracle/pkg/pubsub"
"github.com/masa-finance/masa-oracle/pkg/workers/messages"

"github.com/asynkron/protoactor-go/actor"

pubsub2 "github.com/libp2p/go-libp2p-pubsub"
"github.com/sirupsen/logrus"
)

var workerConfig *WorkerConfig

func init() {
var err error
workerConfig, err = LoadConfig()
if err != nil {
logrus.Fatalf("Failed to load worker config: %v", err)
}
workerDoneCh = make(chan *pubsub2.Message, workerConfig.WorkerBufferSize)
}

type Worker struct {
IsLocal bool
NodeData pubsub.NodeData
IPAddr string
Node *masa.OracleNode
}

func SendWork(node *masa.OracleNode, m *pubsub2.Message) {
logrus.Infof("Sending work to node %s", node.Host.ID())
props := actor.PropsFromProducer(NewWorker(node))
pid := node.ActorEngine.Spawn(props)
message := createWorkMessage(m, pid)

responseCollector := make(chan *pubsub2.Message, 1)

eligibleWorkers := GetEligibleWorkers(node, message)

success := tryWorkersRoundRobin(node, eligibleWorkers, message, responseCollector)
if !success {
logrus.Error("Failed to process the work")
}
}

func tryWorkersRoundRobin(node *masa.OracleNode, workers []Worker, message *messages.Work, responseCollector chan *pubsub2.Message) bool {
var localWorker *Worker
remoteWorkersAttempted := 0

logrus.Info("Starting round-robin worker selection")

// Try remote workers first, up to MaxRemoteWorkers
for _, worker := range workers {
if !worker.IsLocal {
if remoteWorkersAttempted >= workerConfig.MaxRemoteWorkers {
logrus.Infof("Reached maximum remote workers (%d), stopping remote worker attempts", workerConfig.MaxRemoteWorkers)
break
}
remoteWorkersAttempted++
logrus.Infof("Attempting remote worker %s (attempt %d/%d)", worker.NodeData.PeerId, remoteWorkersAttempted, workerConfig.MaxRemoteWorkers)
if tryWorker(node, worker, message, responseCollector) {
logrus.Infof("Remote worker %s successfully completed the work", worker.NodeData.PeerId)
return true
}
logrus.Infof("Remote worker %s failed, moving to next worker", worker.NodeData.PeerId)
} else {
localWorker = &worker
logrus.Info("Found local worker, saving for later if needed")
}
}

// If remote workers fail or don't exist, try local worker
if localWorker != nil {
logrus.Info("Attempting local worker")
return tryWorker(node, *localWorker, message, responseCollector)
}

// If no workers are available, create a local worker as last resort
logrus.Warn("No workers available, creating last resort local worker")
lastResortLocalWorker := Worker{
IsLocal: true,
NodeData: pubsub.NodeData{PeerId: node.Host.ID()},
Node: node,
}
return tryWorker(node, lastResortLocalWorker, message, responseCollector)
}

func tryWorker(node *masa.OracleNode, worker Worker, message *messages.Work, responseCollector chan *pubsub2.Message) bool {
workerDone := make(chan bool, 1)

go func() {
if worker.IsLocal {
handleLocalWorker(node, node.ActorEngine.Spawn(actor.PropsFromProducer(NewWorker(node))), message, responseCollector)
} else {
handleRemoteWorker(node, worker.NodeData, worker.IPAddr, actor.PropsFromProducer(NewWorker(node)), message, responseCollector)
}
workerDone <- true
}()

select {
case <-workerDone:
select {
case response := <-responseCollector:
if isSuccessfulResponse(response) {
if worker.IsLocal {
logrus.Infof("Local worker with PeerID %s successfully completed the work", node.Host.ID())
} else {
logrus.Infof("Remote worker with PeerID %s and IP %s successfully completed the work", worker.NodeData.PeerId, worker.IPAddr)
}
processAndSendResponse(response)
return true
}
case <-time.After(workerConfig.WorkerResponseTimeout):
if worker.IsLocal {
logrus.Warnf("Local worker with PeerID %s failed to respond in time", node.Host.ID())
} else {
logrus.Warnf("Remote worker with PeerID %s and IP %s failed to respond in time", worker.NodeData.PeerId, worker.IPAddr)
}
}
case <-time.After(workerConfig.WorkerTimeout):
if worker.IsLocal {
logrus.Warnf("Local worker with PeerID %s timed out", node.Host.ID())
} else {
logrus.Warnf("Remote worker with PeerID %s and IP %s timed out", worker.NodeData.PeerId, worker.IPAddr)
}
}

return false
}

func createWorkMessage(m *pubsub2.Message, pid *actor.PID) *messages.Work {
return &messages.Work{
Data: string(m.Data),
Sender: pid,
Id: m.ReceivedFrom.String(),
Type: int64(pubsub.CategoryTwitter),
}
}

func handleLocalWorker(node *masa.OracleNode, pid *actor.PID, message *messages.Work, responseCollector chan<- *pubsub2.Message) {
logrus.Info("Sending work to local worker")
future := node.ActorEngine.RequestFuture(pid, message, workerConfig.WorkerTimeout)
result, err := future.Result()
if err != nil {
handleWorkerError(err, responseCollector)
return
}

// Log the full response from the local worker
logrus.WithField("full_response", result).Info("Full response from local worker")

processWorkerResponse(result, node.Host.ID(), responseCollector)
}

func handleRemoteWorker(node *masa.OracleNode, p pubsub.NodeData, ipAddr string, props *actor.Props, message *messages.Work, responseCollector chan<- *pubsub2.Message) {
logrus.WithFields(logrus.Fields{
"ip": ipAddr,
"peer": p.PeerId,
}).Info("Handling remote worker")

var spawned *actor.PID
var err error

// Attempt to spawn the remote worker multiple times
for attempt := 1; attempt <= workerConfig.MaxSpawnAttempts; attempt++ {
spawned, err = spawnRemoteWorker(node, ipAddr)
if err == nil {
break
}
logrus.WithError(err).WithFields(logrus.Fields{
"ip": ipAddr,
"attempt": attempt,
}).Warn("Failed to spawn remote worker, retrying")
time.Sleep(time.Second * time.Duration(attempt)) // Back off linearly: wait one extra second per failed attempt
}

if err != nil {
logrus.WithError(err).WithField("ip", ipAddr).Error("Failed to spawn remote worker after multiple attempts")
handleWorkerError(err, responseCollector)
return
}

client := node.ActorEngine.Spawn(props)
node.ActorEngine.Send(spawned, &messages.Connect{Sender: client})

future := node.ActorEngine.RequestFuture(spawned, message, workerConfig.WorkerTimeout)
result, err := future.Result()
if err != nil {
logrus.WithError(err).WithFields(logrus.Fields{
"ip": ipAddr,
"peer": p.PeerId,
}).Error("Error getting result from remote worker")
handleWorkerError(err, responseCollector)
return
}

logrus.WithFields(logrus.Fields{
"ip": ipAddr,
"peer": p.PeerId,
}).Info("Successfully processed remote worker response")
processWorkerResponse(result, p.PeerId, responseCollector)
}

func spawnRemoteWorker(node *masa.OracleNode, ipAddr string) (*actor.PID, error) {
spawned, err := node.ActorRemote.SpawnNamed(fmt.Sprintf("%s:4001", ipAddr), "worker", "peer", -1)
if err != nil {
return nil, err
}

if spawned == nil || spawned.Pid == nil {
return nil, fmt.Errorf("failed to spawn remote worker: PID is nil for IP %s", ipAddr)
}

return spawned.Pid, nil
}

func handleWorkerError(err error, responseCollector chan<- *pubsub2.Message) {
logrus.Errorf("[-] Error with worker: %v", err)
responseCollector <- &pubsub2.Message{
ValidatorData: map[string]interface{}{"error": err.Error()},
}
}

func processWorkerResponse(result interface{}, workerID interface{}, responseCollector chan<- *pubsub2.Message) {
response, ok := result.(*messages.Response)
if !ok {
logrus.Errorf("[-] Invalid response type from worker")
return
}
msg, err := getResponseMessage(response)
if err != nil {
logrus.Errorf("[-] Error converting worker response: %v", err)
return
}
logrus.Infof("Received response from worker %v, sending to responseCollector", workerID)
responseCollector <- msg
}

func isSuccessfulResponse(response *pubsub2.Message) bool {
if response.ValidatorData == nil {
return true
}
validatorData, ok := response.ValidatorData.(map[string]interface{})
if !ok {
return false
}
errorVal, exists := validatorData["error"]
return !exists || errorVal == nil
}

func processAndSendResponse(response *pubsub2.Message) {
logrus.Infof("Processing and sending successful response")
workerDoneCh <- response
}

func init() {
workerDoneCh = make(chan *pubsub2.Message, 100)
}
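
A note on the error convention above: handleWorkerError signals failure by placing an "error" key in the message's ValidatorData, and isSuccessfulResponse checks only for that marker. A minimal sketch of the contract (not part of the diff; assumes the fmt import and the pubsub2 alias for github.com/libp2p/go-libp2p-pubsub used above):

// No ValidatorData at all counts as success.
fmt.Println(isSuccessfulResponse(&pubsub2.Message{})) // true

// The marker written by handleWorkerError flips the result.
fmt.Println(isSuccessfulResponse(&pubsub2.Message{
    ValidatorData: map[string]interface{}{"error": "worker timed out"},
})) // false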