
feat: remote worker selection with local worker fallback & configuration and fine tuning #489

Conversation

@teslashibe (Contributor) commented Aug 7, 2024

Description

Final implementation

  • We use nodeData to filter peers that are able to do the work.
  • We then use the libp2p DHT to run a connection check and select a nearby eligible peer, verifying that it can accept a connection. This assumes port 4001 is open, which is a requirement.
  • If we cannot connect to the node within 1000 ms, or if another error occurs within 20 s, we fail over to the local worker.

In the GetEligibleWorkers function, the connection to eligible workers is checked using the following process:

for _, addr := range eligible.Multiaddrs {
    // Extract the IPv4 address from the multiaddress; non-IPv4 addresses are skipped.
    ipAddr, err := addr.ValueForProtocol(multiaddr.P_IP4)
    if err != nil {
        continue
    }
    // Build a full QUIC multiaddress that includes the peer ID.
    realAddr := fmt.Sprintf("/ip4/%s/udp/4001/quic-v1/p2p/%s", ipAddr, eligible.PeerId.String())
    fullAddr, err := multiaddr.NewMultiaddr(realAddr)
    if err != nil {
        logrus.Errorf("[-] kdht: %s", err.Error())
        continue
    }
    peerInfo, err := peer.AddrInfoFromP2pAddr(fullAddr)
    if err != nil {
        logrus.Errorf("[-] Failed to get peer info: %s", err)
        continue
    }
    // Attempt the connection with a 1-second timeout so unresponsive nodes are skipped quickly.
    ctxWithTimeout, cancel := context.WithTimeout(context.Background(), time.Second*1)
    defer cancel() // Cancel the context when done to release resources
    if err := node.Host.Connect(ctxWithTimeout, *peerInfo); err != nil {
        logrus.Debugf("[-] Failed to connect to peer: %v", err)
        continue
    }
    // Connection successful: record the peer as a remote worker candidate.
    workers = append(workers, Worker{IsLocal: false, NodeData: eligible, IPAddr: ipAddr})
    // ...
}

Here's a breakdown of the connection check process:

  1. For each multiaddress of an eligible worker:

    • Extract the IP address.
    • Construct a full address string including the QUIC protocol and peer ID.
    • Create a new multiaddress from this string.
  2. Convert the multiaddress to peer info.

  3. Attempt to connect to the peer:

    • Use node.Host.Connect() with a 1-second timeout.
    • If the connection fails, log a debug message and continue to the next address.
  4. If the connection succeeds:

    • Add the worker to the workers slice.
    • Log the time taken for worker selection.
    • Set workerFound to true and break the loop.

This process ensures that only workers with successful connections are added to the list of eligible workers. The 1-second timeout prevents the function from hanging too long on unresponsive nodes.

Overview

This PR introduces a new worker selection strategy for our distributed task processing system. The main goal is to increase the overall throughput of our network by distributing work more effectively across all available nodes, while respecting individual node processing limits and accounting for node latency.

Key Changes

  1. Worker Selection Algorithm

    • Implemented a new GetEligibleWorkers function that collects both local and remote workers.
    • Added random shuffling of the worker list to improve load balancing and spread work requests more evenly across nodes.
    • Introduced a round-robin iterator for worker selection, prioritizing remote workers.
  2. Configuration Updates

    • Added a new WorkerConfig struct with the following parameters:
      • WorkerTimeout: 500ms
      • WorkerResponseTimeout: 250ms
      • MaxRetries: 1
      • MaxSpawnAttempts: 3
      • WorkerBufferSize: 100
      • MaxRemoteWorkers: 1
  3. Remote Worker Handling

    • Implemented logic to attempt remote workers before falling back to the local worker.
    • Added a limit (MaxRemoteWorkers) on the number of remote workers to attempt. This is currently synchronous, and each worker is given a set time to respond, as defined in the config file, so developers can tune their nodes to their own use cases.
  4. Local Worker Fallback

    • Ensured that the local worker is always available as a fallback option if there are no remote workers or if port 4001 is not open.
  5. Timeout and Retry Mechanism

    • Implemented short timeouts for worker operations to quickly identify and bypass slow or unresponsive workers.
    • Added a retry mechanism with a configurable maximum number of retries (a rough sketch of this flow follows this list).
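
As a rough sketch (the function name trySendWork, the send callback, and the exact control flow are illustrative assumptions, not the PR's literal code), the remote-then-local flow with retries could be wired together like this:

// trySendWork walks the worker list, attempting at most cfg.MaxRemoteWorkers
// remote workers per pass (each bounded by cfg.WorkerResponseTimeout) and
// retrying up to cfg.MaxRetries times before falling back to the local worker.
func trySendWork(workers []Worker, cfg WorkerConfig, send func(Worker, time.Duration) error) error {
    for attempt := 0; attempt <= cfg.MaxRetries; attempt++ {
        remoteTried := 0
        for _, w := range workers {
            if !w.IsLocal {
                if remoteTried >= cfg.MaxRemoteWorkers {
                    continue // respect the remote worker limit for this pass
                }
                remoteTried++
            }
            if err := send(w, cfg.WorkerResponseTimeout); err == nil {
                return nil // first worker that responds in time wins
            }
        }
    }
    // Last resort: the local worker is always available.
    return send(Worker{IsLocal: true}, cfg.WorkerTimeout)
}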

Implementation Details

Worker Selection

func GetEligibleWorkers(node *masa.OracleNode, message *messages.Work) []Worker {
    // Implementation details...
}

This function now collects all eligible workers, including both local and remote, and shuffles the list for better load distribution.
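
Sketching the overall shape of that flow (GetEligibleWorkerNodes is borrowed from a later commit in this PR; categoryFromMessage and connectWithTimeout are hypothetical helpers, the latter standing in for the 1-second connection check shown earlier):

func GetEligibleWorkers(node *masa.OracleNode, message *messages.Work) []Worker {
    var workers []Worker

    // Ask the NodeTracker for remote candidates eligible for this category of
    // work (categoryFromMessage is a hypothetical helper).
    eligible := node.NodeTracker.GetEligibleWorkerNodes(categoryFromMessage(message))

    // Shuffle the candidates before selection so repeated requests are spread
    // across the network rather than always hitting the same node first.
    rand.Shuffle(len(eligible), func(i, j int) {
        eligible[i], eligible[j] = eligible[j], eligible[i]
    })

    for _, candidate := range eligible {
        // connectWithTimeout is a hypothetical stand-in for the connection
        // check in the earlier snippet; only reachable peers are kept.
        if ipAddr, ok := connectWithTimeout(node, candidate); ok {
            workers = append(workers, Worker{IsLocal: false, NodeData: candidate, IPAddr: ipAddr})
        }
    }

    // The local worker is always included so a fallback exists.
    workers = append(workers, Worker{IsLocal: true})
    return workers
}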

Round-Robin Iterator

type roundRobinIterator struct {
    // Implementation details...
}

func NewRoundRobinIterator(workers []Worker) *roundRobinIterator {
    // Implementation details...
}

The round-robin iterator ensures a fair distribution of work among available workers.
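
A minimal, self-contained sketch of what the elided iterator might look like (the HasNext/Next method names are assumptions):

// roundRobinIterator hands out workers one at a time, wrapping around to the
// start of the slice so work is spread evenly over repeated calls.
type roundRobinIterator struct {
    workers []Worker
    index   int
}

func NewRoundRobinIterator(workers []Worker) *roundRobinIterator {
    return &roundRobinIterator{workers: workers, index: -1}
}

// HasNext reports whether there is at least one worker to hand out.
func (r *roundRobinIterator) HasNext() bool {
    return len(r.workers) > 0
}

// Next advances the index and returns the next worker in round-robin order.
func (r *roundRobinIterator) Next() Worker {
    r.index = (r.index + 1) % len(r.workers)
    return r.workers[r.index]
}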

Worker Configuration

type WorkerConfig struct {
    // Configuration fields...
}

var DefaultConfig = WorkerConfig{
    // Default values...
}

This new configuration structure allows for easy tuning of worker behavior.
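
The struct body and defaults are elided above; as a sketch, the parameters listed under Key Changes might map to Go like this (field types are assumptions, and the ConnectionTimeout added in a later commit is assumed to default to the one-second connect timeout used in the earlier snippet):

// WorkerConfig tunes worker selection and retry behavior; field names follow
// the parameters listed under Key Changes, types are assumed.
type WorkerConfig struct {
    WorkerTimeout         time.Duration // total budget for a single worker attempt
    WorkerResponseTimeout time.Duration // how long to wait for a worker's response
    ConnectionTimeout     time.Duration // libp2p connect timeout (added in a later commit)
    MaxRetries            int           // retries per work request
    MaxSpawnAttempts      int           // attempts to spawn a remote worker
    WorkerBufferSize      int           // size of the response buffer
    MaxRemoteWorkers      int           // remote workers tried before local fallback
}

var DefaultConfig = WorkerConfig{
    WorkerTimeout:         500 * time.Millisecond,
    WorkerResponseTimeout: 250 * time.Millisecond,
    ConnectionTimeout:     time.Second,
    MaxRetries:            1,
    MaxSpawnAttempts:      3,
    WorkerBufferSize:      100,
    MaxRemoteWorkers:      1,
}

Note that a later commit in this PR changes the worker timeouts to 20 s and 15 s, so the 500 ms / 250 ms values above reflect this description rather than the final state of the branch.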

Rationale

The new implementation aims to increase network throughput by:

  1. Distributing work across multiple nodes, utilizing the full capacity of the network.
  2. Implementing a basic load balancing mechanism through random shuffling and round-robin selection.
  3. Quickly bypassing slow or unresponsive workers to maintain system responsiveness.
  4. Providing a fallback mechanism to ensure task completion even when remote workers are unavailable.

Performance Implications

  • Throughput: Expected to increase as work is distributed across multiple nodes, each with a 1M request cap in the case of X/Twitter.
  • Latency: May slightly increase due to remote worker attempts, but mitigated by short timeouts. This is acceptable for now and can be tuned by a developer.
  • Resilience: Improved system resilience to individual node failures or capacity issues.

Future Improvements

  • Implement more sophisticated load balancing based on current worker load and performance history.
  • Consider increasing MaxRemoteWorkers (e.g. to 3) to distribute work concurrently across a limited set of peers.
  • Explore parallel execution of tasks on multiple workers, using a request queue or request pool, for potential additional throughput gains.

Notes for Reviewers

Signed commits

  • Yes, I signed my commits.

- Separate out localWorker to send local work requests
- Separate out remoteWorker to enable selection and handling of remote work requests
- Lay foundation for remote worker selection - even though the queue is limited, all remoteWorkers still get the same request, which is inefficient
- Log number of responses in the queue
- Make logs verbose for easy debugging of localWork and remoteWork
…cessing

- Add round-robin iterator for fair worker distribution
- Prioritize local worker when eligible
- Cycle through remote workers in subsequent calls
- Improve error handling and logging for worker responses
- Enhance code readability and maintainability

This update ensures a balanced workload across all available workers
over time, while still prioritizing local processing when possible.
- Add maxSpawnAttempts constant to limit remote worker spawn attempts
- Implement retry mechanism for spawning remote workers with exponential backoff
- Introduce spawnRemoteWorker function for better organization and error handling
- Enhance logging for better visibility into worker spawning and processing
- Improve handling of dead letters and timeouts in remote worker operations
- Refactor handleRemoteWorker to be more robust against transient failures
- Update tryWorker function to handle both local and remote worker scenarios
- Implement round-robin worker selection with retries in SendWork function

These changes aim to increase the reliability of the worker system,
particularly when dealing with remote workers, and provide better
insights into error scenarios for easier debugging and monitoring.
- Created new file worker_selection.go to house worker selection logic
- Moved GetEligibleWorkers, isEligibleRemoteWorker, and RoundRobinIterator to worker_selection.go
- Updated send_work.go to use new exported functions from worker_selection.go
- Renamed newRoundRobinIterator to NewRoundRobinIterator for proper exporting
- Updated imports and function calls in send_work.go to reflect new structure
- Improved code organization and modularity
…r limit

- Add MaxRemoteWorkers to WorkerConfig in config.go
- Update tryWorkersRoundRobin function in send_work.go to respect MaxRemoteWorkers limit
- Implement fallback mechanism to local worker when remote workers fail or are unavailable
- Add detailed logging throughout the worker selection process for better debugging
- Ensure a last resort local worker is always available if no other workers are found
@teslashibe (Contributor, Author) commented:

@teslashibe can you take a look at this today? I didn't pull in your nodeData updates that you put in here: https://github.com/masa-finance/masa-oracle/pull/482/files#diff-6eb837343e9a3348891e7ffe57d5f47407edf4ee00d8d5f74018ca6640f9c58d. Should they go into this PR?

@teslashibe teslashibe added the enhancement, DO NOT MERGE, and Feature labels Aug 7, 2024
@teslashibe teslashibe self-assigned this Aug 7, 2024
teslashibe and others added 2 commits August 7, 2024 12:51
Refactored worker selection to use NodeTracker's new GetEligibleWorkerNodes method and introduced an eligibility check for staked workers. Added a new utility function for converting strings to WorkerTypes and a method in NodeEventTracker to retrieve eligible nodes based on work categories.
@teslashibe teslashibe changed the base branch from main to test August 7, 2024 20:32
pkg/api/handlers_data.go (outdated review comment, resolved)
pkg/workers/send_work.go (outdated review comment, resolved)
@mudler (Contributor) left a comment

From a quick scan it looks ok - however I'd like to give it another look tomorrow with a fresh mind

Makefile (outdated review comment, resolved)
Shuffle eligible nodes before selecting workers. Added context-based connection attempts to ensure we can connect to selected worker before returning.
Adjusted AppConfig initialization order and added version flag for CLI configuration.
- Check and log errors from NewMultiaddr and AddrInfoFromP2pAddr
- Resolve "value of err is never used" linter warning
- Change timeouts on workers to 20s and 15s respectively
@teslashibe teslashibe requested a review from mudler August 9, 2024 13:24
@teslashibe teslashibe requested review from restevens402 and jdutchak and removed request for restevens402 August 9, 2024 13:25
@teslashibe teslashibe removed the DO NOT MERGE label Aug 9, 2024
@teslashibe teslashibe marked this pull request as ready for review August 9, 2024 13:25
teslashibe and others added 4 commits August 9, 2024 10:03
- Add ConnectionTimeout to WorkerConfig in config.go
- Update GetEligibleWorkers to use the configurable timeout
…llback' of https://github.com/masa-finance/masa-oracle into teslashibe/worker-remote-worker-selection-with-local-fallback
Updated sleep durations to one minute for efficiency and commented out peer removal logic to retain node activity tracking. Also removed the callback for peer removal in DHT initialization.
@restevens402 restevens402 self-requested a review August 10, 2024 05:47
restevens402 previously approved these changes Aug 10, 2024

@restevens402 (Contributor) left a comment

This is a big update. Tests well. The targeted work distribution is a good improvement.

pkg/workers/worker_selection.go (outdated review comment, resolved)
restevens402 previously approved these changes Aug 10, 2024

@restevens402 (Contributor) left a comment

All comments covered and resolved. I think we are good to go.

@teslashibe teslashibe force-pushed the teslashibe/worker-remote-worker-selection-with-local-fallback branch from 755064c to 88911fa August 12, 2024 23:02
@teslashibe teslashibe closed this Aug 14, 2024
@teslashibe (Contributor, Author) commented Aug 14, 2024

I closed this PR: the worker selection code is fundamentally broken and refactoring is not an option. This should not go into a patch release because it will never work in the way it is intended: #504

@teslashibe teslashibe deleted the teslashibe/worker-remote-worker-selection-with-local-fallback branch August 22, 2024 05:24