Skip to content

Commit

Permalink
feat(workers): enhance error handling and worker selection in Distrib…
Browse files Browse the repository at this point in the history
…uteWork (#540)

* feat(workers): enhance error handling and worker selection in DistributeWork

- Collect errors from all failed worker attempts
- Continue trying workers after failures until max attempts reached
- Include all collected errors in final error message
- Improve logging for detailed error information
- Maintain existing behavior of returning on first success

Ensures all eligible workers are attempted before failing, improving
reliability and providing comprehensive error reports for debugging.

* feat: add Twitter cookies example template

Add twitter_cookies.example.json as a template for storing Twitter authentication cookies.
This file provides a structure for users to input their own cookie values securely.

* feat(workers): Improve worker selection for Twitter authentication errors

- Modify DistributeWork function to continue trying workers on Twitter auth failures
- Implement specific error handling for Twitter credential authentication errors
- Ensure up to MaxRemoteWorkers (10) attempts before giving up
- Enhance logging to provide better visibility into worker selection process

This change aims to increase resilience in the face of Twitter API authentication issues by allowing the system to try multiple workers before failing.

* Enhance worker selection to try all available remote workers

- Modify DistributeWork function in worker_manager.go to attempt all remote workers
- Continue worker selection loop on Twitter authentication errors
- Only return early on successful worker response
- Improve error handling and logging for better debugging
- Ensure MaxRemoteWorkers limit is respected

This change addresses an issue where the system was prematurely returning after
the first worker failure, instead of trying all available workers. Now, it will
attempt to use each eligible remote worker before falling back to local execution
or reporting a failure, improving the robustness of the worker selection process.

* feat(workers): Enhance worker selection to try all available remote workers

- Modify GetEligibleWorkers in worker_selection.go to return all eligible workers
- Update DistributeWork in worker_manager.go to attempt connection with each worker
- Improve error handling and logging for better visibility into worker selection process

This change addresses an issue where the system was only attempting to use a single
remote worker before falling back to local execution. Now, it will attempt to
connect to and use each eligible remote worker before giving up, improving the
robustness of the worker selection process and increasing the chances of successful
task completion.

Part of #XXX (replace with actual ticket number)
  • Loading branch information
teslashibe authored and mudler committed Sep 17, 2024
1 parent 8b505fe commit 8afdc49
Show file tree
Hide file tree
Showing 3 changed files with 134 additions and 42 deletions.
50 changes: 43 additions & 7 deletions pkg/workers/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"strings"
"sync"
"time"

Expand Down Expand Up @@ -100,6 +101,7 @@ func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest
remoteWorkers, localWorker := GetEligibleWorkers(node, category, workerConfig)

remoteWorkersAttempted := 0
var errors []string
logrus.Info("Starting round-robin worker selection")

// Try remote workers first, up to MaxRemoteWorkers
Expand All @@ -109,23 +111,57 @@ func (whm *WorkHandlerManager) DistributeWork(node *masa.OracleNode, workRequest
break
}
remoteWorkersAttempted++

// Attempt to connect to the worker
peerInfo, err := node.DHT.FindPeer(context.Background(), worker.NodeData.PeerId)
if err != nil {
logrus.Warnf("Failed to find peer %s in DHT: %v", worker.NodeData.PeerId.String(), err)
continue
}

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), workerConfig.ConnectionTimeout)
err = node.Host.Connect(ctxWithTimeout, peerInfo)
cancel()
if err != nil {
logrus.Warnf("Failed to connect to peer %s: %v", worker.NodeData.PeerId.String(), err)
continue
}

worker.AddrInfo = &peerInfo

logrus.Infof("Attempting remote worker %s (attempt %d/%d)", worker.NodeData.PeerId, remoteWorkersAttempted, workerConfig.MaxRemoteWorkers)
response = whm.sendWorkToWorker(node, worker, workRequest)
if response.Error != "" {
logrus.Errorf("error sending work to worker: %s: %s", response.WorkerPeerId, response.Error)
errorMsg := fmt.Sprintf("Worker %s: %s", worker.NodeData.PeerId, response.Error)
errors = append(errors, errorMsg)
logrus.Errorf("error sending work to worker: %s", errorMsg)
logrus.Infof("Remote worker %s failed, moving to next worker", worker.NodeData.PeerId)
continue

// Check if the error is related to Twitter authentication
if strings.Contains(response.Error, "unable to get twitter profile: there was an error authenticating with your Twitter credentials") {
logrus.Warnf("Worker %s failed due to Twitter authentication error. Skipping to the next worker.", worker.NodeData.PeerId)
continue
}
} else {
return response
}
return response
}
// Fallback to local execution if local worker is eligible

// Fallback to local execution if local worker is eligible and all remote workers failed
if localWorker != nil {
return whm.ExecuteWork(workRequest)
response = whm.ExecuteWork(workRequest)
if response.Error != "" {
errors = append(errors, fmt.Sprintf("Local worker: %s", response.Error))
} else {
return response
}
}
if response.Error == "" {

// If we reach here, all attempts failed
if len(errors) == 0 {
response.Error = "no eligible workers found"
} else {
response.Error = fmt.Sprintf("no workers could process: remote attempt failed due to: %s", response.Error)
response.Error = fmt.Sprintf("All workers failed. Errors: %s", strings.Join(errors, "; "))
}
return response
}
Expand Down
38 changes: 3 additions & 35 deletions pkg/workers/worker_selection.go
Original file line number Diff line number Diff line change
@@ -1,9 +1,7 @@
package workers

import (
"context"
"math/rand/v2"
"time"

"github.com/sirupsen/logrus"

Expand All @@ -23,45 +21,15 @@ func GetEligibleWorkers(node *masa.OracleNode, category pubsub.WorkerCategory, c
nodes[i], nodes[j] = nodes[j], nodes[i]
})

logrus.Info("Checking connections to eligible workers")
start := time.Now()
logrus.Info("Getting eligible workers")
for _, eligible := range nodes {
if eligible.PeerId.String() == node.Host.ID().String() {
localWorker = &data_types.Worker{IsLocal: true, NodeData: eligible}
continue
}

// Use the DHT to find the peer's address information
peerInfo, err := node.DHT.FindPeer(context.Background(), eligible.PeerId)
if err != nil {
logrus.Warnf("Failed to find peer %s in DHT: %v", eligible.PeerId.String(), err)
continue
}

ctxWithTimeout, cancel := context.WithTimeout(context.Background(), config.ConnectionTimeout)
err = node.Host.Connect(ctxWithTimeout, peerInfo)
cancel()
if err != nil {
logrus.Warnf("Failed to connect to peer %s: %v", eligible.PeerId.String(), err)
continue
}

workers = append(workers, data_types.Worker{IsLocal: false, NodeData: eligible, AddrInfo: &peerInfo})
dur := time.Since(start).Milliseconds()
logrus.Infof("Worker selection took %v milliseconds", dur)
break
}

if localWorker == nil {
nd := node.NodeTracker.GetNodeData(node.Host.ID().String())
if nd != nil && nd.CanDoWork(category) {
localWorker = &data_types.Worker{IsLocal: true, NodeData: *nd}
}
}

if len(workers) == 0 && localWorker == nil {
logrus.Warn("No eligible workers found, including local worker")
workers = append(workers, data_types.Worker{IsLocal: false, NodeData: eligible})
}

logrus.Infof("Found %d eligible remote workers", len(workers))
return workers, localWorker
}
88 changes: 88 additions & 0 deletions twitter_cookies.example.json
Original file line number Diff line number Diff line change
@@ -0,0 +1,88 @@
// How to obtain this information from your browser:
// 1. Log in to Twitter in your web browser
// 2. Open the browser's developer tools (usually F12 or right-click > Inspect)
// 3. Go to the "Application" or "Storage" tab
// 4. In the left sidebar, expand "Cookies" and click on "https://twitter.com"
// 5. Look for the cookie names listed above and copy their values
// 6. Replace the "X" placeholders in the "Value" field with the actual values
// 7. Save the file as "twitter_cookies.json" (remove ".example" from the filename)

// Note: Most browsers only show the Name, Value, Domain, and Path fields in the developer tools.
// The other fields (Expires, MaxAge, Secure, HttpOnly, SameSite) may not be visible or editable.
// You can leave these fields as they are in this template.

// IMPORTANT: Be extremely cautious with your auth_token and other sensitive cookies.
// Never share them publicly or commit them to version control.

[
{
"Name": "personalization_id",
"Value": "v1_XXXXXXXXXXXXXXXXXXXXXXXX==",
"Path": "",
"Domain": "twitter.com",
"Expires": "0001-01-01T00:00:00Z",
"RawExpires": "",
"MaxAge": 0,
"Secure": false,
"HttpOnly": false,
"SameSite": 0,
"Raw": "",
"Unparsed": null
},
{
"Name": "kdt",
"Value": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"Path": "",
"Domain": "twitter.com",
"Expires": "0001-01-01T00:00:00Z",
"RawExpires": "",
"MaxAge": 0,
"Secure": false,
"HttpOnly": false,
"SameSite": 0,
"Raw": "",
"Unparsed": null
},
{
"Name": "twid",
"Value": "u=XXXXXXXXX",
"Path": "",
"Domain": "twitter.com",
"Expires": "0001-01-01T00:00:00Z",
"RawExpires": "",
"MaxAge": 0,
"Secure": false,
"HttpOnly": false,
"SameSite": 0,
"Raw": "",
"Unparsed": null
},
{
"Name": "ct0",
"Value": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"Path": "",
"Domain": "twitter.com",
"Expires": "0001-01-01T00:00:00Z",
"RawExpires": "",
"MaxAge": 0,
"Secure": false,
"HttpOnly": false,
"SameSite": 0,
"Raw": "",
"Unparsed": null
},
{
"Name": "auth_token",
"Value": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX",
"Path": "",
"Domain": "twitter.com",
"Expires": "0001-01-01T00:00:00Z",
"RawExpires": "",
"MaxAge": 0,
"Secure": false,
"HttpOnly": false,
"SameSite": 0,
"Raw": "",
"Unparsed": null
}
]

0 comments on commit 8afdc49

Please sign in to comment.