Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

feat: remote worker selection with local worker fallback & configuration and fine tuning #489

Closed
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
23 commits
Select commit Hold shift + click to select a range
571c4f2
feat: refactor sendWork function in workers.go to make modular
teslashibe Aug 5, 2024
349386f
feat: worker-sub selection refactor
teslashibe Aug 5, 2024
4db2ee0
feat: Implement round-robin worker selection for distributed task pro…
teslashibe Aug 5, 2024
d140ce2
Improve error handling and resilience in send_work.go
teslashibe Aug 5, 2024
1678449
feat: add config.go to worker package to simplify settings
teslashibe Aug 5, 2024
3d6ed60
refactor(workers): move worker selection logic to separate file
teslashibe Aug 6, 2024
4de845a
fix: duplication of getEligibleWorkers func
teslashibe Aug 6, 2024
8c2f849
feat: Enhance worker selection process with configurable remote worke…
teslashibe Aug 7, 2024
3e9c9d6
fix: fine tune timeouts with testing
teslashibe Aug 7, 2024
51a0421
Refactor worker selection and add eligibility check
restevens402 Aug 7, 2024
8a2010c
Merge branch 'test' into teslashibe/worker-remote-worker-selection-wi…
restevens402 Aug 7, 2024
9f28628
fixed case where err was reassigned and hiding the original
restevens402 Aug 7, 2024
a3b5c95
fix: local work is not timing out for long queries >20s
teslashibe Aug 7, 2024
cb0b78d
added more error handling and bubbled up to the tryWorker level
restevens402 Aug 7, 2024
e47f737
Enable randomized node selection and improve config settings
restevens402 Aug 8, 2024
0663ebc
x: add error handling for peer info creation and fine tune timeouts
teslashibe Aug 9, 2024
322dc29
Merge branch 'test' into teslashibe/worker-remote-worker-selection-wi…
teslashibe Aug 9, 2024
5c85feb
chore: add configurable connection timeout to config.go
teslashibe Aug 9, 2024
6f5e248
Merge branch 'teslashibe/worker-remote-worker-selection-with-local-fa…
teslashibe Aug 9, 2024
9beb6ce
chore: revert makefile version tagger
teslashibe Aug 9, 2024
12db5b5
Adjust time intervals and remove unnecessary peer removal
restevens402 Aug 9, 2024
d10c6f8
resolving comments in PR
restevens402 Aug 10, 2024
88911fa
fix: cleanup logs and tune timeouts
teslashibe Aug 12, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
22 changes: 22 additions & 0 deletions pkg/api/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,22 @@
package api

import "time"

// APIConfig contains configuration settings for the API
type APIConfig struct {
	// WorkerResponseTimeout bounds how long an API handler waits on the
	// worker response channel before answering 504 Gateway Timeout
	// (consumed via time.After in handleWorkResponse).
	WorkerResponseTimeout time.Duration
	// Add other API-specific configuration fields here
}

// DefaultConfig is the baseline API configuration; LoadConfig currently
// returns a copy of these values.
var DefaultConfig = APIConfig{
	WorkerResponseTimeout: 60 * time.Second,
	// Set default values for other fields here
}

// LoadConfig returns the active API configuration.
// It currently always yields a copy of DefaultConfig; the copy means
// callers cannot mutate the package-level defaults. Loading from
// environment variables or a file can be added here later.
func LoadConfig() (*APIConfig, error) {
	cfg := new(APIConfig)
	*cfg = DefaultConfig
	return cfg, nil
}
10 changes: 8 additions & 2 deletions pkg/api/handlers_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,13 @@ func publishWorkRequest(api *API, requestID string, request workers.WorkerType,
// - c: The gin.Context object, which provides the context for the HTTP request.
// - responseCh: A channel that receives the worker's response as a byte slice.
func handleWorkResponse(c *gin.Context, responseCh chan []byte) {
config, err := LoadConfig()
if err != nil {
logrus.Errorf("Failed to load API config: %v", err)
c.JSON(http.StatusInternalServerError, gin.H{"error": "Internal server error"})
return
}

for {
select {
case response := <-responseCh:
Expand All @@ -101,8 +108,7 @@ func handleWorkResponse(c *gin.Context, responseCh chan []byte) {

c.JSON(http.StatusOK, result)
return
// teslashibe: adjust to timeout after 10 seconds for performance testing
case <-time.After(10 * time.Second):
case <-time.After(config.WorkerResponseTimeout):
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out, check that port 4001 TCP inbound is open."})
return
case <-c.Done():
Expand Down
4 changes: 3 additions & 1 deletion pkg/config/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func GetInstance() *AppConfig {
once.Do(func() {
instance = &AppConfig{}

instance.setEnvVariableConfig()
instance.setDefaultConfig()
instance.setEnvVariableConfig()
instance.setFileConfig(viper.GetString("FILE_PATH"))
err := instance.setCommandLineConfig()
if err != nil {
Expand Down Expand Up @@ -194,6 +194,7 @@ func (c *AppConfig) setCommandLineConfig() error {
pflag.StringVar(&c.StakeAmount, "stake", viper.GetString(StakeAmount), "Amount of tokens to stake")
pflag.BoolVar(&c.Debug, "debug", viper.GetBool(Debug), "Override some protections for debugging (temporary)")
pflag.StringVar(&c.Environment, "env", viper.GetString(Environment), "Environment to connect to")
pflag.StringVar(&c.Version, "version", viper.GetString("VERSION"), "application version")
pflag.BoolVar(&c.AllowedPeer, "allowedPeer", viper.GetBool(AllowedPeer), "Set to true to allow setting this node as the allowed peer")
pflag.StringVar(&c.PrivateKey, "privateKey", viper.GetString(PrivateKey), "The private key")
pflag.StringVar(&c.PrivateKeyFile, "privKeyFile", viper.GetString(PrivKeyFile), "The private key file")
Expand Down Expand Up @@ -222,6 +223,7 @@ func (c *AppConfig) setCommandLineConfig() error {
pflag.BoolVar(&c.WebScraper, "webScraper", viper.GetBool(WebScraper), "WebScraper")
pflag.BoolVar(&c.LlmServer, "llmServer", viper.GetBool(LlmServer), "Can service LLM requests")
pflag.BoolVar(&c.Faucet, "faucet", viper.GetBool(Faucet), "Faucet")

pflag.Parse()

// Bind command line flags to Viper (optional, if you want to use Viper for additional configuration)
Expand Down
5 changes: 3 additions & 2 deletions pkg/network/discover.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ import (
"github.com/cenkalti/backoff/v4"

"github.com/libp2p/go-libp2p/core/peer"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/multiformats/go-multiaddr"

"github.com/masa-finance/masa-oracle/pkg/config"

dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
Expand Down Expand Up @@ -39,7 +40,7 @@ func Discover(ctx context.Context, host host.Host, dht *dht.IpfsDHT, protocol pr
logrus.Infof("[+] Successfully advertised protocol %s", protocolString)
}

ticker := time.NewTicker(time.Second * 10)
ticker := time.NewTicker(time.Minute * 1)
defer ticker.Stop()

var peerChan <-chan peer.AddrInfo
Expand Down
12 changes: 1 addition & 11 deletions pkg/network/kdht.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (dbValidator) Validate(_ string, _ []byte) error { return nil }
func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr,
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool, removePeerCallback func(peer.ID)) (*dht.IpfsDHT, error) {
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool) (*dht.IpfsDHT, error) {
options := make([]dht.Option, 0)
options = append(options, dht.BucketSize(100)) // Adjust bucket size
options = append(options, dht.Concurrency(100)) // Increase concurrency
Expand All @@ -46,8 +46,6 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
go monitorRoutingTable(ctx, kademliaDHT, time.Minute)

kademliaDHT.RoutingTable().PeerAdded = func(p peer.ID) {
logrus.Infof("[+] Peer added to DHT: %s", p.String())

pe := PeerEvent{
AddrInfo: peer.AddrInfo{ID: p},
Action: PeerAdded,
Expand All @@ -57,16 +55,12 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
}

kademliaDHT.RoutingTable().PeerRemoved = func(p peer.ID) {
logrus.Infof("[-] Peer removed from DHT: %s", p)
pe := PeerEvent{
AddrInfo: peer.AddrInfo{ID: p},
Action: PeerRemoved,
Source: "kdht",
}
peerChan <- pe
if removePeerCallback != nil {
removePeerCallback(p)
}
}

if err = kademliaDHT.Bootstrap(ctx); err != nil {
Expand Down Expand Up @@ -153,10 +147,6 @@ func monitorRoutingTable(ctx context.Context, dht *dht.IpfsDHT, interval time.Du
routingTable := dht.RoutingTable()
// Log the size of the routing table
logrus.Infof("[+] Routing table size: %d", routingTable.Size())
// Log the peer IDs in the routing table
for _, p := range routingTable.ListPeers() {
logrus.Debugf("[+] Peer in routing table: %s", p.String())
}
case <-ctx.Done():
// If the context is cancelled, stop the goroutine
return
Expand Down
10 changes: 3 additions & 7 deletions pkg/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ import (
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
Expand All @@ -37,7 +36,8 @@ import (

shell "github.com/ipfs/go-ipfs-api"
pubsub "github.com/libp2p/go-libp2p-pubsub"
chain "github.com/masa-finance/masa-oracle/pkg/chain"

"github.com/masa-finance/masa-oracle/pkg/chain"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/masacrypto"
myNetwork "github.com/masa-finance/masa-oracle/pkg/network"
Expand Down Expand Up @@ -216,11 +216,7 @@ func (node *OracleNode) Start() (err error) {
go node.ListenToNodeTracker()
go node.handleDiscoveredPeers()

removePeerCallback := func(p peer.ID) {
node.NodeTracker.RemoveNodeData(p.String())
}

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked, removePeerCallback)
node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked)
if err != nil {
return err
}
Expand Down
16 changes: 11 additions & 5 deletions pkg/pubsub/node_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,10 @@ func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, act
// and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC".
// This can be used by other nodes to connect to this node.
func (n *NodeData) Address() string {
	// A node with no known multiaddrs has no dialable address.
	if len(n.Multiaddrs) == 0 {
		return ""
	}
	// First multiaddr plus the p2p component, e.g.
	// "/ip4/127.0.0.1/tcp/4001/p2p/Qm...".
	return n.Multiaddrs[0].String() + "/p2p/" + n.PeerId.String()
}

Expand All @@ -117,16 +121,18 @@ func (n *NodeData) CanDoWork(workerType WorkerCategory) bool {
logrus.Infof("[+] Skipping worker %s due to timeout", n.PeerId)
return false
}

if !(n.IsStaked && n.IsActive) {
return false
}
switch workerType {
case CategoryTwitter:
return n.IsActive && n.IsTwitterScraper
return n.IsTwitterScraper
case CategoryDiscord:
return n.IsActive && n.IsDiscordScraper
return n.IsDiscordScraper
case CategoryTelegram:
return n.IsActive && n.IsTelegramScraper
return n.IsTelegramScraper
case CategoryWeb:
return n.IsActive && n.IsWebScraper
return n.IsWebScraper
default:
return false
}
Expand Down
29 changes: 21 additions & 8 deletions pkg/pubsub/node_event_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -249,7 +249,7 @@ func (net *NodeEventTracker) GetUpdatedNodes(since time.Time) []NodeData {
}

// GetEthAddress returns the Ethereum address for the given remote peer.
// It gets the peer's public key from the network's peerstore, converts
// It gets the peer's public key from the network's peer store, converts
// it to a hex string, and converts that to an Ethereum address.
// Returns an empty string if there is no public key for the peer.
func GetEthAddress(remotePeer peer.ID, n network.Network) string {
Expand All @@ -273,6 +273,18 @@ func GetEthAddress(remotePeer peer.ID, n network.Network) string {
return publicKeyHex
}

// GetEligibleWorkerNodes returns a slice of NodeData for nodes that are eligible to perform a specific category of work.
// Eligibility is decided per node by NodeData.CanDoWork; an empty (non-nil)
// slice is returned when no tracked node qualifies.
func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []NodeData {
	logrus.Debugf("Getting eligible worker nodes for category: %s", category)
	eligible := make([]NodeData, 0)
	for _, candidate := range net.GetAllNodeData() {
		if !candidate.CanDoWork(category) {
			continue
		}
		eligible = append(eligible, candidate)
	}
	return eligible
}

// IsStaked returns whether the node with the given peerID is marked as staked in the node data tracker.
// Returns false if no node data is found for the given peerID.
func (net *NodeEventTracker) IsStaked(peerID string) bool {
Expand Down Expand Up @@ -366,7 +378,7 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip
// entry, and if expired, processes the connect and removes the entry.
func (net *NodeEventTracker) ClearExpiredBufferEntries() {
for {
time.Sleep(30 * time.Second) // E.g., every 5 seconds
time.Sleep(1 * time.Minute)
now := time.Now()
for peerID, entry := range net.ConnectBuffer {
if now.Sub(entry.ConnectTime) > time.Minute*1 {
Expand All @@ -388,11 +400,13 @@ func (net *NodeEventTracker) ClearExpiredBufferEntries() {
//
// Parameters:
// - peerID: A string representing the ID of the peer to be removed.
func (net *NodeEventTracker) RemoveNodeData(peerID string) {
net.nodeData.Delete(peerID)
delete(net.ConnectBuffer, peerID)
logrus.Infof("[+] Removed peer %s from NodeTracker", peerID)
}
//
// TODO: we should never remove node data from the internal map. Otherwise we lose all tracking of activity.
//func (net *NodeEventTracker) RemoveNodeData(peerID string) {
// net.nodeData.Delete(peerID)
// delete(net.ConnectBuffer, peerID)
// logrus.Infof("[+] Removed peer %s from NodeTracker", peerID)
//}

// ClearExpiredWorkerTimeouts periodically checks and clears expired worker timeouts.
// It runs in an infinite loop, sleeping for 5 minutes between each iteration.
Expand Down Expand Up @@ -445,7 +459,6 @@ func (net *NodeEventTracker) cleanupStalePeers(hostId string) {
if now.Sub(time.Unix(nodeData.LastUpdatedUnix, 0)) > maxDisconnectionTime {
if nodeData.PeerId.String() != hostId {
logrus.Infof("Removing stale peer: %s", nodeData.PeerId)
net.RemoveNodeData(nodeData.PeerId.String())
delete(net.ConnectBuffer, nodeData.PeerId.String())

// Notify about peer removal
Expand Down
31 changes: 31 additions & 0 deletions pkg/workers/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
package workers

import (
"time"
)

// WorkerConfig groups the tunable timeouts and limits used by the workers
// package when dispatching work to local and remote workers. Defaults live
// in DefaultConfig; LoadConfig returns a copy of them.
type WorkerConfig struct {
	WorkerTimeout         time.Duration // overall time budget for a worker task — TODO confirm against dispatch code
	WorkerResponseTimeout time.Duration // how long to wait for a worker's reply — presumably per attempt; verify at call sites
	ConnectionTimeout     time.Duration // timeout when dialing/connecting to a remote worker — confirm usage
	MaxRetries            int           // retry attempts after a failed work request — confirm semantics at call sites
	MaxSpawnAttempts      int           // attempts to spawn a worker before giving up — confirm usage
	WorkerBufferSize      int           // buffer size for worker channels/queues — confirm usage
	MaxRemoteWorkers      int           // upper bound on remote workers tried per request — confirm usage
}

// DefaultConfig is the baseline worker configuration; LoadConfig currently
// returns a copy of these values.
var DefaultConfig = WorkerConfig{
	WorkerTimeout:         55 * time.Second,
	WorkerResponseTimeout: 30 * time.Second,
	ConnectionTimeout:     1 * time.Second,
	MaxRetries:            1,
	MaxSpawnAttempts:      1,
	WorkerBufferSize:      100,
	MaxRemoteWorkers:      1,
}

// LoadConfig returns the active worker configuration.
// It currently always yields a copy of DefaultConfig, so callers cannot
// mutate the package-level defaults; richer loading (env vars, files) can
// be added here without changing callers.
func LoadConfig() (*WorkerConfig, error) {
	cfg := new(WorkerConfig)
	*cfg = DefaultConfig
	return cfg, nil
}
10 changes: 7 additions & 3 deletions pkg/workers/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,15 +9,16 @@ import (

"github.com/asynkron/protoactor-go/actor"
pubsub2 "github.com/libp2p/go-libp2p-pubsub"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"

masa "github.com/masa-finance/masa-oracle/pkg"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/scrapers/discord"
"github.com/masa-finance/masa-oracle/pkg/scrapers/telegram"
"github.com/masa-finance/masa-oracle/pkg/scrapers/twitter"
"github.com/masa-finance/masa-oracle/pkg/scrapers/web"
"github.com/masa-finance/masa-oracle/pkg/workers/messages"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
)

type LLMChatBody struct {
Expand Down Expand Up @@ -158,7 +159,10 @@ func (a *Worker) HandleWork(ctx actor.Context, m *messages.Work, node *masa.Orac
}

if err != nil {
host, _, err := net.SplitHostPort(m.Sender.Address)
host, _, err2 := net.SplitHostPort(m.Sender.Address)
if err2 != nil {
logrus.Errorf("[-] Error splitting host and port: %v", err2)
}
addrs := node.Host.Addrs()
isLocalHost := false
for _, addr := range addrs {
Expand Down
Loading
Loading