Skip to content

Commit

Permalink
feat: worker selection tuning (#617)
Browse files (browse the repository at this point in the history)
* tests(worker-selection): fine-tune api timeout and worker selection timings

* feat(twitter-worker): worker selection optimizations for multi-threading 👍

---------

Co-authored-by: Brendan Playford <[email protected]>
  • Loading branch information
mudler and teslashibe authored Oct 31, 2024
1 parent d654c56 commit 119f740
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 7 deletions.
2 changes: 1 addition & 1 deletion pkg/api/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ type APIConfig struct {
}

// DefaultConfig is the package-level default APIConfig value.
//
// NOTE(review): this is a rendered diff hunk without +/- markers (the file
// header reports 1 addition & 1 deletion), which is why WorkerResponseTimeout
// appears twice. Presumably the 60s line is the removed value and the 120s
// line is the one that remains after the commit — confirm against the
// repository before relying on either value.
var DefaultConfig = APIConfig{
WorkerResponseTimeout: 60 * time.Second,
WorkerResponseTimeout: 120 * time.Second,
// Set default values for other fields here
}

Expand Down
10 changes: 6 additions & 4 deletions pkg/workers/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,20 +10,22 @@ type WorkerConfig struct {
WorkerTimeout time.Duration
WorkerResponseTimeout time.Duration
ConnectionTimeout time.Duration
FindPeerTimeout time.Duration
MaxRetries int
MaxSpawnAttempts int
WorkerBufferSize int
MaxRemoteWorkers int
}

// DefaultConfig is the package-level default WorkerConfig value.
//
// NOTE(review): this is a rendered diff hunk without +/- markers, so
// WorkerTimeout, WorkerResponseTimeout, ConnectionTimeout and MaxRemoteWorkers
// each appear twice. Presumably, within each run of duplicates, the earlier
// lines are the removed values and the later lines are the values that remain
// after the commit — confirm against the repository.
var DefaultConfig = WorkerConfig{
// First run of timeout lines (presumably the pre-change values).
WorkerTimeout: 45 * time.Second,
WorkerResponseTimeout: 35 * time.Second,
ConnectionTimeout: 500 * time.Millisecond,
// Second run of timeout lines (presumably the post-change values);
// FindPeerTimeout has no earlier counterpart in this hunk.
WorkerTimeout: 55 * time.Second,
WorkerResponseTimeout: 45 * time.Second,
ConnectionTimeout: 75 * time.Millisecond,
FindPeerTimeout: 50 * time.Millisecond,
MaxRetries: 1,
MaxSpawnAttempts: 1,
WorkerBufferSize: 100,
// MaxRemoteWorkers is listed twice: 25, then 10 (presumably old, then new).
MaxRemoteWorkers: 25,
MaxRemoteWorkers: 10,
}

// workerConfig is a package-level pointer to a WorkerConfig. Where it is
// assigned is not visible in this hunk; the worker_manager.go hunk reads
// workerConfig.FindPeerTimeout from it.
var workerConfig *WorkerConfig
Expand Down
10 changes: 8 additions & 2 deletions pkg/workers/worker_manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -128,9 +128,15 @@ func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest
remoteWorkersAttempted++

// Attempt to connect to the worker
peerInfo, err := node.DHT.FindPeer(context.Background(), worker.NodeData.PeerId)
ctx, cancel := context.WithTimeout(context.Background(), workerConfig.FindPeerTimeout)
peerInfo, err := node.DHT.FindPeer(ctx, worker.NodeData.PeerId)
cancel()
if err != nil {
logrus.Warnf("Failed to find peer %s in DHT: %v", worker.NodeData.PeerId.String(), err)
if err == context.DeadlineExceeded {
logrus.Warnf("Timeout while finding peer %s in DHT", worker.NodeData.PeerId.String())
} else {
logrus.Warnf("Failed to find peer %s in DHT: %v", worker.NodeData.PeerId.String(), err)
}
if category == pubsub.CategoryTwitter {
err := node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{
LastNotFoundTime: time.Now(),
Expand Down

0 comments on commit 119f740

Please sign in to comment.