From 28013358fc234e6a423a99153296adf13a4c404d Mon Sep 17 00:00:00 2001 From: Bob Stevens <35038919+restevens402@users.noreply.github.com> Date: Tue, 22 Oct 2024 09:06:06 -0700 Subject: [PATCH] Squashed commit of the following: commit d654c567a3a1a998690edbc889bfc983d41c26b2 Author: smeb y <48400087+a3165458@users.noreply.github.com> Date: Mon Oct 21 15:55:05 2024 +0800 fix(Dockerfile): add ca-certificate (#604) Update Dockerfile Signed-off-by: smeb y <48400087+a3165458@users.noreply.github.com> commit c1e624ae25e3e2187353551615eda96fc7235f53 Author: Ettore Di Giacinto Date: Thu Oct 17 15:23:03 2024 +0200 chore(docs): drop unnecessary duplicated content Signed-off-by: Ettore Di Giacinto commit 9529e4ccd6336de6af0ed0be17230ad420de736e Author: Ettore Di Giacinto Date: Thu Oct 17 15:21:07 2024 +0200 chore(docs): update .env.example (#603) Signed-off-by: Ettore Di Giacinto commit 038fad6352f92e527a9482642163440362136bb6 Author: Ettore Di Giacinto Date: Thu Oct 17 14:55:25 2024 +0200 fix(contracts): load config from embedded (#602) * fix(contracts): load config from embedded There was still some code reading from the filesystem instead of the embedded files in the binary. Regression introduced in https://github.com/masa-finance/masa-oracle/pull/523. Fixes: https://github.com/masa-finance/masa-oracle/issues/578 See also: https://github.com/masa-finance/masa-oracle/pull/579 Signed-off-by: mudler * chore(tests): temporarly disable Twitter tests These are going to be taken care of as part of https://github.com/masa-finance/masa-oracle/pull/573 Signed-off-by: mudler --------- Signed-off-by: mudler commit a8a77a6f608aceb7056c197f457c83ee96c667db Author: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue Oct 15 14:32:29 2024 -0700 feat(twitter): Implement random sleep and improve login process (#601) - Add RandomSleep function to introduce variability in request timing - Update NewScraper to use RandomSleep before and after login attempts - Adjust sleep duration range to 500ms - 2s for more natural behavior - Improve error handling and logging in the login process commit 01ec8c4e7455ac9b5b2034315f4f5cc5b30f54f9 Author: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue Oct 15 08:59:08 2024 -0700 chore(version): update protocol version and update twitter_cookies.example.json commit 6f594e1f7204a0a8e1260ed9270f80f38e4548c6 Author: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue Oct 15 08:48:01 2024 -0700 feat(twitter): Enhanced Twitter Worker Selection Algorithm (#591) * Add detailed error logging and track worker update time Enhanced the worker manager to append specific error messages to a list for better debugging. Additionally, updated node data to track the last update time, improving data consistency and traceability. * Update version.go * refactor(twitter): remove retry functionality from scraper - Remove Retry function and MaxRetries constant from config.go - Update ScrapeFollowersForProfile, ScrapeTweetsProfile, and ScrapeTweetsByQuery to remove Retry wrapper - Adjust error handling in each function to directly return errors - Simplify code structure and reduce complexity - Maintain rate limit handling functionality * chore(workers): update max workers to 50 * chore(workers): upate to 25 * feat(pubsub): improve node sorting algorithm for Twitter reliability - Prioritize nodes with more recent last returned tweets - Maintain high importance for total returned tweet count - Consider time since last timeout to allow recovery from temporary issues - Deprioritize nodes with recent "not found" occurrences - Remove NotFoundCount from sorting criteria This change aims to better balance node performance and recent activity, while allowing nodes to recover quickly from temporary issues like rate limiting. * feat(workers): improve Twitter worker selection algorithm - Modify GetEligibleWorkers to use a specialized selection for Twitter workers - Introduce controlled randomness in Twitter worker selection - Balance between prioritizing high-performing Twitter workers and fair distribution - Maintain existing behavior for non-Twitter worker selection - Preserve handling of local worker and respect original worker limit This change enhances the worker selection algorithm for Twitter tasks to provide a better balance between utilizing top-performing nodes and ensuring fair work distribution. It introduces a dynamic pool size calculation and controlled randomness for Twitter workers, while maintaining the existing round-robin approach for other worker types. --------- Co-authored-by: Bob Stevens <35038919+restevens402@users.noreply.github.com> commit f09fb2076d4c1376ea855a12e72692c76ef32803 Author: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue Oct 8 15:38:45 2024 -0700 Feat(workers) implement adaptive worker selection for improved task distribution (#589) * feat(worker-selection): Implement performance-based worker sorting - Add performance metrics fields to NodeData struct - Implement NodeSorter for flexible sorting of worker nodes - Create SortNodesByTwitterReliability function for Twitter workers - Update GetEligibleWorkerNodes to use category-specific sorting - Modify GetEligibleWorkers to use sorted workers and add worker limit This commit enhances the worker selection process by prioritizing workers based on their performance metrics. It introduces a flexible sorting mechanism that can be easily extended to other worker categories in the future. The changes improve reliability and efficiency in task allocation across the Masa Oracle network. * feat(worker-selection): Implement priority-based selection for Twitter work - Update DistributeWork to use priority selection for Twitter category - Maintain round-robin selection for other work categories by shuffling workers - Integrate new GetEligibleWorkers function with work type-specific behavior - Respect MaxRemoteWorkers limit for all work types - Add distinct logging for Twitter and non-Twitter worker selection This commit enhances the work distribution process by implementing priority-based worker selection for Twitter-related tasks while preserving the existing round-robin behavior for other work types. It leverages the newly added performance metrics to choose the most reliable workers for Twitter tasks, and ensures consistent behavior for other categories by shuffling the worker list. This hybrid approach improves efficiency for Twitter tasks while maintaining the expected behavior for all other work types. * Update .gitignore * feat(worker-selection): Implement priority-based sorting for Twitter workers - Add LastNotFoundTime and NotFoundCount fields to NodeData struct - Enhance SortNodesByTwitterReliability function with multi-criteria sorting: 1. Prioritize nodes found more often (lower NotFoundCount) 2. Consider recency of last not-found occurrence 3. Sort by higher number of returned tweets 4. Consider recency of last returned tweet 5. Prioritize nodes with fewer timeouts 6. Consider recency of last timeout 7. Use PeerId for stable sorting when no performance data is available - Remove random shuffling from GetEligibleWorkers function This commit improves worker selection for Twitter tasks by implementing a more sophisticated sorting algorithm that takes into account node reliability and performance metrics. It aims to enhance the efficiency and reliability of task distribution in the Masa Oracle network. * feat(worker-selection): Update Twitter fields in NodeData and Worker Manager Add functions to update Twitter-related metrics in NodeData and integrate updates into Worker Manager processes. This ensures accurate tracking of tweet-related events and peer activity in the system. * feat(worker-selection): Add unit tests for NodeData and NodeDataTracker Introduce unit tests for the NodeData and NodeDataTracker functionalities, covering scenarios involving updates to Twitter-related fields. These tests ensure the correctness of the UpdateTwitterFields method in NodeData and the UpdateNodeDataTwitter method in NodeDataTracker. * chore(workers): update timeouts and bump version --------- Co-authored-by: Bob Stevens <35038919+restevens402@users.noreply.github.com> commit 0ef0df429ff3d031ca571bf0f0347079a84415c6 Author: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue Oct 8 14:37:01 2024 -0700 feat(api): Add configurable API server enablement (#586) * feat(api): Add configurable API server enablement This commit introduces a new feature that allows the API server to be conditionally enabled or disabled based on configuration. The changes include: 1. In cmd/masa-node/main.go: - Refactored signal handling into a separate function `handleSignals` - Added conditional logic to start the API server only if enabled - Improved logging to indicate API server status 2. In pkg/config/app.go: - Added `APIEnabled` field to the `AppConfig` struct - Set default value for `APIEnabled` to false in `setDefaultConfig` - Added command-line flag for `apiEnabled` in `setCommandLineConfig` 3. In pkg/config/constants.go: - Added `APIEnabled` constant for environment variable configuration These changes provide more flexibility in node configuration, allowing users to run the node with or without the API server. This can be useful for security purposes or in scenarios where the API is not needed. The API can now be enabled via: - Environment variable: API_ENABLED=true - Command-line flag: --apiEnabled - Configuration file: apiEnabled: true By default, the API server will be disabled for enhanced security. * chore(config): update to take api-enabled=true and update Makefile with run-api case * Update Makefile --- .env.example | 4 +- .gitignore | 1 + Dockerfile | 5 +- Makefile | 3 + cmd/masa-node/main.go | 55 ++++++------ docs/oracle-node/quickstart.md | 4 +- internal/versioning/version.go | 2 +- pkg/config/app.go | 59 +++++------- pkg/config/constants.go | 1 + pkg/pubsub/node_data.go | 26 ++++++ pkg/pubsub/node_event_tracker.go | 89 ++++++++++++++++++- pkg/scrapers/twitter/auth.go | 4 +- pkg/scrapers/twitter/config.go | 35 ++++---- pkg/scrapers/twitter/followers.go | 26 +++--- pkg/scrapers/twitter/profile.go | 22 +++-- pkg/scrapers/twitter/tweets.go | 31 ++++--- pkg/staking/config.go | 15 ++-- pkg/staking/config_test.go | 19 ++++ pkg/tests/node_data/node_data_suite_test.go | 13 +++ pkg/tests/node_data/node_data_test.go | 41 +++++++++ pkg/tests/node_data/node_data_tracker_test.go | 60 +++++++++++++ pkg/tests/scrapers/twitter_scraper_test.go | 19 ++-- pkg/tests/twitter/twitter_scraper_test.go | 19 ++-- pkg/workers/config.go | 8 +- pkg/workers/worker_manager.go | 50 ++++++++++- pkg/workers/worker_selection.go | 77 ++++++++++++++-- twitter_cookies.example.json | 32 ++++--- 27 files changed, 528 insertions(+), 192 deletions(-) create mode 100644 pkg/staking/config_test.go create mode 100644 pkg/tests/node_data/node_data_suite_test.go create mode 100644 pkg/tests/node_data/node_data_test.go create mode 100644 pkg/tests/node_data/node_data_tracker_test.go diff --git a/.env.example b/.env.example index 33a3ba66..4fa6a51b 100644 --- a/.env.example +++ b/.env.example @@ -23,7 +23,7 @@ RPC_URL=https://ethereum-sepolia.publicnode.com # Twitter Configuration # Note: A pro-paid Twitter account is required to run a Twitter worker TWITTER_SCRAPER=true -TWITTER_USERNAME=your pro-paid twitter username (without the '@' symbol) +TWITTER_ACCOUNTS=your pro-paid twitter username (without the '@' symbol) TWITTER_PASSWORD=your twitter password # Important: If your 2FA code times out, you'll need to restart your node and login by submitting a request. # We recommend temporarily disabling 2FA to save your cookies locally to your .home or .masa directory, then re-enabling it afterwards. @@ -46,4 +46,4 @@ TELEGRAM_APP_ID=your telegram app id TELEGRAM_APP_HASH=your telegram app hash # Configure your Telegram bot and add it to the channel you want to scrape TELEGRAM_BOT_TOKEN=your telegram bot token -TELEGRAM_CHANNEL_USERNAME=username of the channel to scrape (without the '@' symbol) \ No newline at end of file +TELEGRAM_CHANNEL_USERNAME=username of the channel to scrape (without the '@' symbol) diff --git a/.gitignore b/.gitignore index eaa12583..7b804f9f 100644 --- a/.gitignore +++ b/.gitignore @@ -72,3 +72,4 @@ snippets.txt # Build result of goreleaser dist/ +bp-todo.md diff --git a/Dockerfile b/Dockerfile index c775bcb5..41c1a84b 100644 --- a/Dockerfile +++ b/Dockerfile @@ -20,6 +20,9 @@ RUN make build # Use the official Ubuntu 22.04 image as a base for the final image FROM ubuntu:22.04 AS base +# Install ca-certificates to ensure TLS verification works +RUN apt-get update && apt-get install -y ca-certificates && update-ca-certificates + COPY --from=builder /app/bin/masa-node /usr/bin/masa-node RUN chmod +x /usr/bin/masa-node @@ -38,4 +41,4 @@ EXPOSE 4001 8080 # Set default command to start the Go application -CMD /usr/bin/masa-node --bootnodes="$BOOTNODES" --env="$ENV" --validator="$VALIDATOR" --cachePath="$CACHE_PATH" \ No newline at end of file +CMD /usr/bin/masa-node --bootnodes="$BOOTNODES" --env="$ENV" --validator="$VALIDATOR" --cachePath="$CACHE_PATH" diff --git a/Makefile b/Makefile index 7f925ade..e7c03c02 100644 --- a/Makefile +++ b/Makefile @@ -32,6 +32,9 @@ install: run: build @./bin/masa-node +run-api-enabled: build + @./bin/masa-node --api-enabled=true + faucet: build ./bin/masa-node --faucet diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index 409688ef..511140e8 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -100,34 +100,20 @@ func main() { // Init cache resolver db.InitResolverCache(masaNode, keyManager) - // Listen for SIGINT (CTRL+C) - c := make(chan os.Signal, 1) - signal.Notify(c, os.Interrupt, syscall.SIGTERM) - // Cancel the context when SIGINT is received - go func() { - <-c - nodeData := masaNode.NodeTracker.GetNodeData(masaNode.Host.ID().String()) - if nodeData != nil { - nodeData.Left() - } - cancel() - // Call the global StopFunc to stop the Telegram background connection - cfg := config.GetInstance() - if cfg.TelegramStop != nil { - if err := cfg.TelegramStop(); err != nil { - logrus.Errorf("Error stopping the background connection: %v", err) - } - } - }() + go handleSignals(cancel, masaNode, cfg) - router := api.SetupRoutes(masaNode, workHandlerManager, pubKeySub) - go func() { - err = router.Run() - if err != nil { - logrus.Fatal(err) - } - }() + if cfg.APIEnabled { + router := api.SetupRoutes(masaNode, workHandlerManager, pubKeySub) + go func() { + if err := router.Run(); err != nil { + logrus.Fatal(err) + } + }() + logrus.Info("API server started") + } else { + logrus.Info("API server is disabled") + } // Get the multiaddress and IP address of the node multiAddr := masaNode.GetMultiAddrs() // Get the multiaddress @@ -137,3 +123,20 @@ func main() { <-ctx.Done() } + +func handleSignals(cancel context.CancelFunc, masaNode *node.OracleNode, cfg *config.AppConfig) { + c := make(chan os.Signal, 1) + signal.Notify(c, os.Interrupt, syscall.SIGTERM) + + <-c + nodeData := masaNode.NodeTracker.GetNodeData(masaNode.Host.ID().String()) + if nodeData != nil { + nodeData.Left() + } + cancel() + if cfg.TelegramStop != nil { + if err := cfg.TelegramStop(); err != nil { + logrus.Errorf("Error stopping the background connection: %v", err) + } + } +} diff --git a/docs/oracle-node/quickstart.md b/docs/oracle-node/quickstart.md index a19c8e91..d2a1731b 100644 --- a/docs/oracle-node/quickstart.md +++ b/docs/oracle-node/quickstart.md @@ -32,7 +32,9 @@ Create a `.env` file in the root directory with the following content: ```plaintext # Default .env configuration -BOOTNODES=/ip4/35.223.224.220/udp/4001/quic-v1/p2p/16Uiu2HAmPxXXjR1XJEwckh6q1UStheMmGaGe8fyXdeRs3SejadSa,/ip4/34.121.111.128/udp/4001/quic-v1/p2p/16Uiu2HAmKULCxKgiQn1EcfKnq1Qam6psYLDTM99XsZFhr57wLadF + +# Check bootnodes addresses in the Masa documentation https://developers.masa.ai/docs/welcome-to-masa +BOOTNODES= API_KEY= RPC_URL=https://ethereum-sepolia.publicnode.com diff --git a/internal/versioning/version.go b/internal/versioning/version.go index 9c9ee19e..21e4407a 100644 --- a/internal/versioning/version.go +++ b/internal/versioning/version.go @@ -5,5 +5,5 @@ var ( // XXX: Bump this value only when there are protocol changes that makes the oracle // incompatible between version! - ProtocolVersion = `v0.8.0` + ProtocolVersion = `v0.8.4` ) diff --git a/pkg/config/app.go b/pkg/config/app.go index 3d7fdf87..1ecbebab 100644 --- a/pkg/config/app.go +++ b/pkg/config/app.go @@ -17,31 +17,6 @@ import ( "github.com/spf13/viper" ) -// AppConfig is designed as a singleton to ensure that there is only one instance -// of the configuration throughout the application. This design pattern is useful -// for managing global application settings, allowing various parts of the application -// to access configuration settings consistently without the need to pass the configuration -// object around or risk having multiple, potentially conflicting instances of the configuration. -// -// The singleton pattern is implemented using a combination of a private instance variable -// (`instance`) and a public `GetInstance` method. The `instance` variable holds the single -// instance of AppConfig, while the `GetInstance` method provides a global access point to that instance. -// Additionally, the `sync.Once` mechanism ensures that the AppConfig instance is initialized only once, -// making the initialization thread-safe. -// -// Usage: -// To access the AppConfig instance, call the GetInstance method from anywhere in your application: -// -// config := config.GetInstance() -// -// This call will return the singleton AppConfig instance. If the instance has not been initialized yet, -// `GetInstance` will initialize it by setting default values, reading configuration from files, -// environment variables, and command-line flags, and then return the instance. Subsequent calls to -// `GetInstance` will return the same instance without reinitializing it. -// -// It's important to note that since AppConfig is a singleton, any modifications to the configuration -// settings through the AppConfig instance will be reflected across the entire application. - var ( instance *AppConfig once sync.Once @@ -93,6 +68,7 @@ type AppConfig struct { LlmServer bool `mapstructure:"llmServer"` LLMChatUrl string `mapstructure:"llmChatUrl"` LLMCfUrl string `mapstructure:"llmCfUrl"` + APIEnabled bool `mapstructure:"api_enabled"` TelegramStop bg.StopFunc } @@ -125,9 +101,10 @@ func GetInstance() *AppConfig { if err := viper.Unmarshal(instance); err != nil { logrus.Errorf("[-] Unable to unmarshal config into struct, %v", err) - instance = nil // Ensure instance is nil if unmarshalling fails + instance = nil } + instance.APIEnabled = viper.GetBool("api_enabled") }) return instance } @@ -135,7 +112,6 @@ func GetInstance() *AppConfig { // setDefaultConfig sets the default configuration values for the AppConfig instance. // It retrieves the user's home directory and sets default values for various configuration options // such as the MasaDir, Bootnodes, RpcUrl, Environment, FilePath, Validator, and CachePath. -// It also fetches bootnode information from a remote URL based on the environment (dev, test, or main). func (c *AppConfig) setDefaultConfig() { usr, err := user.Current() @@ -147,7 +123,7 @@ func (c *AppConfig) setDefaultConfig() { viper.SetDefault(MasaDir, filepath.Join(usr.HomeDir, ".masa")) // Set defaults - viper.SetDefault("Version", versioning.ProtocolVersion) + viper.SetDefault("version", versioning.ProtocolVersion) viper.SetDefault(PortNbr, "4001") viper.SetDefault(UDP, true) viper.SetDefault(TCP, false) @@ -157,6 +133,8 @@ func (c *AppConfig) setDefaultConfig() { viper.SetDefault(LogLevel, "info") viper.SetDefault(LogFilePath, "masa_node.log") viper.SetDefault(PrivKeyFile, filepath.Join(viper.GetString(MasaDir), "masa_oracle_key")) + + viper.SetDefault("api_enabled", false) } // setFileConfig loads configuration from a YAML file. @@ -196,7 +174,7 @@ func (c *AppConfig) setCommandLineConfig() error { pflag.StringVar(&c.StakeAmount, "stake", viper.GetString(StakeAmount), "Amount of tokens to stake") pflag.BoolVar(&c.Debug, "debug", viper.GetBool(Debug), "Override some protections for debugging (temporary)") pflag.StringVar(&c.Environment, "env", viper.GetString(Environment), "Environment to connect to") - pflag.StringVar(&c.Version, "version", viper.GetString("VERSION"), "application version") + pflag.StringVar(&c.Version, "version", viper.GetString("version"), "Application version") pflag.BoolVar(&c.AllowedPeer, "allowedPeer", viper.GetBool(AllowedPeer), "Set to true to allow setting this node as the allowed peer") pflag.StringVar(&c.PrivateKey, "privateKey", viper.GetString(PrivateKey), "The private key") pflag.StringVar(&c.PrivateKeyFile, "privKeyFile", viper.GetString(PrivKeyFile), "The private key file") @@ -214,26 +192,32 @@ func (c *AppConfig) setCommandLineConfig() error { pflag.StringVar(&c.Twitter2FaCode, "twitter2FaCode", viper.GetString(Twitter2FaCode), "Twitter 2FA Code") pflag.StringVar(&c.DiscordBotToken, "discordBotToken", viper.GetString(DiscordBotToken), "Discord Bot Token") pflag.StringVar(&c.ClaudeApiKey, "claudeApiKey", viper.GetString(ClaudeApiKey), "Claude API Key") - pflag.StringVar(&c.ClaudeApiURL, "claudeApiUrl", viper.GetString(ClaudeApiURL), "Claude API Url") + pflag.StringVar(&c.ClaudeApiURL, "claudeApiUrl", viper.GetString(ClaudeApiURL), "Claude API URL") pflag.StringVar(&c.ClaudeApiVersion, "claudeApiVersion", viper.GetString(ClaudeApiVersion), "Claude API Version") pflag.StringVar(&c.GPTApiKey, "gptApiKey", viper.GetString(GPTApiKey), "OpenAI API Key") pflag.StringVar(&c.LLMChatUrl, "llmChatUrl", viper.GetString(LlmChatUrl), "URL for support LLM Chat calls") pflag.StringVar(&c.LLMCfUrl, "llmCfUrl", viper.GetString(LlmCfUrl), "URL for support LLM Cloudflare calls") - pflag.BoolVar(&c.TwitterScraper, "twitterScraper", viper.GetBool(TwitterScraper), "TwitterScraper") - pflag.BoolVar(&c.DiscordScraper, "discordScraper", viper.GetBool(DiscordScraper), "DiscordScraper") - pflag.BoolVar(&c.TelegramScraper, "telegramScraper", viper.GetBool(TelegramScraper), "TelegramScraper") - pflag.BoolVar(&c.WebScraper, "webScraper", viper.GetBool(WebScraper), "WebScraper") + pflag.BoolVar(&c.TwitterScraper, "twitterScraper", viper.GetBool(TwitterScraper), "Twitter Scraper") + pflag.BoolVar(&c.DiscordScraper, "discordScraper", viper.GetBool(DiscordScraper), "Discord Scraper") + pflag.BoolVar(&c.TelegramScraper, "telegramScraper", viper.GetBool(TelegramScraper), "Telegram Scraper") + pflag.BoolVar(&c.WebScraper, "webScraper", viper.GetBool(WebScraper), "Web Scraper") pflag.BoolVar(&c.LlmServer, "llmServer", viper.GetBool(LlmServer), "Can service LLM requests") pflag.BoolVar(&c.Faucet, "faucet", viper.GetBool(Faucet), "Faucet") + pflag.BoolVar(&c.APIEnabled, "api-enabled", viper.GetBool("api_enabled"), "Enable API server") pflag.Parse() - // Bind command line flags to Viper (optional, if you want to use Viper for additional configuration) + // Bind command line flags to Viper err := viper.BindPFlags(pflag.CommandLine) if err != nil { return err } + + // Add this line after binding flags + viper.Set("api_enabled", c.APIEnabled) + c.Bootnodes = strings.Split(bootnodes, ",") + return nil } @@ -243,13 +227,12 @@ func (c *AppConfig) LogConfig() { val := reflect.ValueOf(*c) typeOfStruct := val.Type() - // logrus.Info("Current AppConfig values:") for i := 0; i < val.NumField(); i++ { field := typeOfStruct.Field(i) value := val.Field(i).Interface() - // Example of skipping sensitive fields - if field.Name == "PrivateKeyFile" || field.Name == "Signature" { + // Skipping sensitive fields + if field.Name == "PrivateKey" || field.Name == "Signature" || field.Name == "PrivateKeyFile" { continue } logrus.Infof("%s: %v", field.Name, value) diff --git a/pkg/config/constants.go b/pkg/config/constants.go index f350ffe3..4772238a 100644 --- a/pkg/config/constants.go +++ b/pkg/config/constants.go @@ -121,6 +121,7 @@ const ( LlmServer = "LLM_SERVER" LlmChatUrl = "LLM_CHAT_URL" LlmCfUrl = "LLM_CF_URL" + APIEnabled = "API_ENABLED" ) // Function to call the Cloudflare API and parse the response diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index 4bd69f6f..37db8cf3 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -63,6 +63,13 @@ type NodeData struct { Records any `json:"records,omitempty"` Version string `json:"version"` WorkerTimeout time.Time `json:"workerTimeout,omitempty"` + ReturnedTweets int `json:"returnedTweets"` // a running count of the number of tweets returned + LastReturnedTweet time.Time `json:"lastReturnedTweet"` + TweetTimeout bool `json:"tweetTimeout"` + TweetTimeouts int `json:"tweetTimeouts"` // a running countthe number of times a tweet request times out + LastTweetTimeout time.Time `json:"lastTweetTimeout"` + LastNotFoundTime time.Time `json:"lastNotFoundTime"` + NotFoundCount int `json:"notFoundCount"` // a running count of the number of times a node is not found } // NewNodeData creates a new NodeData struct initialized with the given @@ -256,3 +263,22 @@ func (n *NodeData) MergeMultiaddresses(addr multiaddr.Multiaddr) { n.Multiaddrs = append(n.Multiaddrs, JSONMultiaddr{Multiaddr: addr}) } } + +func (nd *NodeData) UpdateTwitterFields(fields NodeData) { + if fields.ReturnedTweets != 0 { + nd.ReturnedTweets += fields.ReturnedTweets + } + if !fields.LastReturnedTweet.IsZero() { + nd.LastReturnedTweet = fields.LastReturnedTweet + } + if fields.TweetTimeout { + nd.TweetTimeout = fields.TweetTimeout + nd.TweetTimeouts += fields.TweetTimeouts + nd.LastTweetTimeout = fields.LastTweetTimeout + } + if !fields.LastNotFoundTime.IsZero() { + nd.LastNotFoundTime = fields.LastNotFoundTime + nd.NotFoundCount += fields.NotFoundCount + } + nd.LastUpdatedUnix = time.Now().Unix() +} diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index 91231148..a2b963b6 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -29,11 +29,70 @@ type ConnectBufferEntry struct { ConnectTime time.Time } +// NodeSorter provides methods for sorting NodeData slices +type NodeSorter struct { + nodes []NodeData + less func(i, j NodeData) bool +} + +// Len returns the length of the nodes slice +func (s NodeSorter) Len() int { return len(s.nodes) } + +// Swap swaps the nodes at indices i and j +func (s NodeSorter) Swap(i, j int) { s.nodes[i], s.nodes[j] = s.nodes[j], s.nodes[i] } + +// Less compares nodes at indices i and j using the provided less function +func (s NodeSorter) Less(i, j int) bool { return s.less(s.nodes[i], s.nodes[j]) } + +// SortNodesByTwitterReliability sorts the given nodes based on their Twitter reliability. +// It uses multiple criteria to determine the reliability and performance of nodes: +// 1. Prioritizes nodes with more recent last returned tweet +// 2. Then by higher number of returned tweets +// 3. Considers the time since last timeout (longer time is better) +// 4. Then by lower number of timeouts +// 5. Deprioritizes nodes with more recent last not found time +// 6. Finally, sorts by PeerId for stability when no performance data is available +// +// The function modifies the input slice in-place, sorting the nodes from most to least reliable. +func SortNodesByTwitterReliability(nodes []NodeData) { + now := time.Now() + sorter := NodeSorter{ + nodes: nodes, + less: func(i, j NodeData) bool { + // Primary sort: More recent last returned tweet + if !i.LastReturnedTweet.Equal(j.LastReturnedTweet) { + return i.LastReturnedTweet.After(j.LastReturnedTweet) + } + // Secondary sort: Higher number of returned tweets + if i.ReturnedTweets != j.ReturnedTweets { + return i.ReturnedTweets > j.ReturnedTweets + } + // Tertiary sort: Longer time since last timeout + iTimeSinceTimeout := now.Sub(i.LastTweetTimeout) + jTimeSinceTimeout := now.Sub(j.LastTweetTimeout) + if iTimeSinceTimeout != jTimeSinceTimeout { + return iTimeSinceTimeout > jTimeSinceTimeout + } + // Quaternary sort: Lower number of timeouts + if i.TweetTimeouts != j.TweetTimeouts { + return i.TweetTimeouts < j.TweetTimeouts + } + // Quinary sort: Earlier last not found time (deprioritize more recent not found) + if !i.LastNotFoundTime.Equal(j.LastNotFoundTime) { + return i.LastNotFoundTime.Before(j.LastNotFoundTime) + } + // Default sort: By PeerId (ensures stable sorting when no performance data is available) + return i.PeerId.String() < j.PeerId.String() + }, + } + sort.Sort(sorter) +} + // NewNodeEventTracker creates a new NodeEventTracker instance. // It initializes the node data map, node data channel, node data file path, // connect buffer map. It loads existing node data from file, starts a goroutine // to clear expired buffer entries, and returns the initialized instance. -func NewNodeEventTracker(version, environment string, hostId string) *NodeEventTracker { +func NewNodeEventTracker(version, environment, hostId string) *NodeEventTracker { net := &NodeEventTracker{ nodeData: NewSafeMap(), nodeVersion: version, @@ -284,6 +343,17 @@ func (net *NodeEventTracker) GetEligibleWorkerNodes(category WorkerCategory) []N result = append(result, nodeData) } } + + // Sort the eligible nodes based on the worker category + switch category { + case CategoryTwitter: + SortNodesByTwitterReliability(result) + // Add cases for other categories as needed such as + // web + // discord + // telegram + } + return result } @@ -477,3 +547,20 @@ func (net *NodeEventTracker) cleanupStalePeers(hostId string) { } } } + +func (net *NodeEventTracker) UpdateNodeDataTwitter(peerID string, updates NodeData) error { + nodeData, exists := net.nodeData.Get(peerID) + if !exists { + return fmt.Errorf("node data not found for peer ID: %s", peerID) + } + + // Update fields based on non-zero values + nodeData.UpdateTwitterFields(updates) + + // Save the updated node data + err := net.AddOrUpdateNodeData(nodeData, true) + if err != nil { + return fmt.Errorf("error updating node data: %v", err) + } + return nil +} diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index a6be7767..47163aeb 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -17,14 +17,14 @@ func NewScraper(account *TwitterAccount, cookieDir string) *Scraper { } } - ShortSleep() + RandomSleep() if err := scraper.Login(account.Username, account.Password, account.TwoFACode); err != nil { logrus.WithError(err).Warnf("Login failed for %s", account.Username) return nil } - ShortSleep() + RandomSleep() if err := SaveCookies(scraper.Scraper, account, cookieDir); err != nil { logrus.WithError(err).Errorf("Failed to save cookies for %s", account.Username) diff --git a/pkg/scrapers/twitter/config.go b/pkg/scrapers/twitter/config.go index 6e7e6433..42b69a88 100644 --- a/pkg/scrapers/twitter/config.go +++ b/pkg/scrapers/twitter/config.go @@ -1,35 +1,32 @@ package twitter import ( - "fmt" + "math/rand" "time" "github.com/sirupsen/logrus" ) const ( - ShortSleepDuration = 20 * time.Millisecond - RateLimitDuration = time.Hour - MaxRetries = 3 + minSleepDuration = 500 * time.Millisecond + maxSleepDuration = 2 * time.Second + RateLimitDuration = 15 * time.Minute ) -func ShortSleep() { - time.Sleep(ShortSleepDuration) +var ( + rng *rand.Rand +) + +func init() { + rng = rand.New(rand.NewSource(time.Now().UnixNano())) } -func GetRateLimitDuration() time.Duration { - return RateLimitDuration +func RandomSleep() { + duration := minSleepDuration + time.Duration(rng.Int63n(int64(maxSleepDuration-minSleepDuration))) + logrus.Debugf("Sleeping for %v", duration) + time.Sleep(duration) } -func Retry[T any](operation func() (T, error), maxAttempts int) (T, error) { - var zero T - for attempt := 1; attempt <= maxAttempts; attempt++ { - result, err := operation() - if err == nil { - return result, nil - } - logrus.Errorf("retry attempt %d failed: %v", attempt, err) - time.Sleep(time.Duration(attempt) * time.Second) - } - return zero, fmt.Errorf("operation failed after %d attempts", maxAttempts) +func GetRateLimitDuration() time.Duration { + return RateLimitDuration } diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index 18746b55..c2acdd90 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -8,21 +8,19 @@ import ( ) func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Legacy, error) { - return Retry(func() ([]twitterscraper.Legacy, error) { - scraper, account, err := getAuthenticatedScraper() - if err != nil { - return nil, err - } + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return nil, err + } - followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") - if errString != "" { - if handleRateLimit(fmt.Errorf(errString), account) { - return nil, fmt.Errorf("rate limited") - } - logrus.Errorf("Error fetching followers: %v", errString) - return nil, fmt.Errorf("%v", errString) + followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") + if errString != "" { + if handleRateLimit(fmt.Errorf(errString), account) { + return nil, fmt.Errorf("rate limited") } + logrus.Errorf("Error fetching followers: %v", errString) + return nil, fmt.Errorf("%v", errString) + } - return followingResponse, nil - }, MaxRetries) + return followingResponse, nil } diff --git a/pkg/scrapers/twitter/profile.go b/pkg/scrapers/twitter/profile.go index 8337f186..cfe77096 100644 --- a/pkg/scrapers/twitter/profile.go +++ b/pkg/scrapers/twitter/profile.go @@ -5,19 +5,17 @@ import ( ) func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { - return Retry(func() (twitterscraper.Profile, error) { - scraper, account, err := getAuthenticatedScraper() - if err != nil { - return twitterscraper.Profile{}, err - } + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return twitterscraper.Profile{}, err + } - profile, err := scraper.GetProfile(username) - if err != nil { - if handleRateLimit(err, account) { - return twitterscraper.Profile{}, err - } + profile, err := scraper.GetProfile(username) + if err != nil { + if handleRateLimit(err, account) { return twitterscraper.Profile{}, err } - return profile, nil - }, MaxRetries) + return twitterscraper.Profile{}, err + } + return profile, nil } diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index 322553dc..b32b2c4d 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -10,25 +10,24 @@ type TweetResult struct { Tweet *twitterscraper.Tweet Error error } + func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { - return Retry(func() ([]*TweetResult, error) { - scraper, account, err := getAuthenticatedScraper() - if err != nil { - return nil, err - } + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return nil, err + } - var tweets []*TweetResult - ctx := context.Background() - scraper.SetSearchMode(twitterscraper.SearchLatest) - for tweet := range scraper.SearchTweets(ctx, query, count) { - if tweet.Error != nil { - if handleRateLimit(tweet.Error, account) { - return nil, tweet.Error - } + var tweets []*TweetResult + ctx := context.Background() + scraper.SetSearchMode(twitterscraper.SearchLatest) + for tweet := range scraper.SearchTweets(ctx, query, count) { + if tweet.Error != nil { + if handleRateLimit(tweet.Error, account) { return nil, tweet.Error } - tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) + return nil, tweet.Error } - return tweets, nil - }, MaxRetries) + tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) + } + return tweets, nil } diff --git a/pkg/staking/config.go b/pkg/staking/config.go index ed2cf953..d2b2e029 100644 --- a/pkg/staking/config.go +++ b/pkg/staking/config.go @@ -2,23 +2,28 @@ package staking import ( "encoding/json" - "os" "path/filepath" + + "github.com/masa-finance/masa-oracle/contracts" ) // LoadContractAddresses loads the contract addresses from the addresses.json file. // It returns a ContractAddresses struct containing the loaded addresses. func LoadContractAddresses() (*ContractAddresses, error) { - masaOracleTokensPath := filepath.Join("contracts", "node_modules", "@masa-finance", "masa-contracts-oracle", "addresses.json") - masaTokenPath := filepath.Join("contracts", "node_modules", "@masa-finance", "masa-token", "addresses.json") - masaTokenData, err := os.ReadFile(masaTokenPath) + masaTokenPath := filepath.Join("node_modules", "@masa-finance", "masa-token", "addresses.json") + + masaTokenData, err := contracts.EmbeddedContracts.ReadFile(masaTokenPath) if err != nil { return nil, err } - masaOracleTokensData, err := os.ReadFile(masaOracleTokensPath) + + masaOracleTokensPath := filepath.Join("node_modules", "@masa-finance", "masa-contracts-oracle", "addresses.json") + + masaOracleTokensData, err := contracts.EmbeddedContracts.ReadFile(masaOracleTokensPath) if err != nil { return nil, err } + var tokenAddresses map[string]map[string]string var addresses ContractAddresses err = json.Unmarshal(masaTokenData, &tokenAddresses) diff --git a/pkg/staking/config_test.go b/pkg/staking/config_test.go new file mode 100644 index 00000000..d72641b3 --- /dev/null +++ b/pkg/staking/config_test.go @@ -0,0 +1,19 @@ +package staking_test + +import ( + . "github.com/masa-finance/masa-oracle/pkg/staking" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Staking tests", func() { + Context("LoadContractAddresses", func() { + It("Returns the contract addresses", func() { + cont, err := LoadContractAddresses() + Expect(err).ToNot(HaveOccurred()) + Expect(cont.Sepolia.MasaToken).ToNot(BeEmpty()) + Expect(cont.Sepolia.MasaFaucet).ToNot(BeEmpty()) + Expect(cont.Sepolia.ProtocolStaking).ToNot(BeEmpty()) + }) + }) +}) diff --git a/pkg/tests/node_data/node_data_suite_test.go b/pkg/tests/node_data/node_data_suite_test.go new file mode 100644 index 00000000..9ae8c1aa --- /dev/null +++ b/pkg/tests/node_data/node_data_suite_test.go @@ -0,0 +1,13 @@ +package node_data + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestNodeData(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "NodeData Test Suite") +} diff --git a/pkg/tests/node_data/node_data_test.go b/pkg/tests/node_data/node_data_test.go new file mode 100644 index 00000000..090a8a0c --- /dev/null +++ b/pkg/tests/node_data/node_data_test.go @@ -0,0 +1,41 @@ +package node_data + +import ( + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + "github.com/masa-finance/masa-oracle/pkg/pubsub" +) + +var _ = Describe("NodeData", func() { + Describe("UpdateTwitterFields", func() { + It("should correctly update Twitter fields", func() { + initialData := pubsub.NodeData{ + ReturnedTweets: 10, + TweetTimeouts: 2, + NotFoundCount: 1, + } + + updates := pubsub.NodeData{ + ReturnedTweets: 5, + LastReturnedTweet: time.Now(), + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + } + + initialData.UpdateTwitterFields(updates) + + Expect(initialData.ReturnedTweets).To(Equal(15)) + Expect(initialData.TweetTimeouts).To(Equal(3)) + Expect(initialData.NotFoundCount).To(Equal(2)) + Expect(initialData.LastReturnedTweet.IsZero()).To(BeFalse()) + Expect(initialData.LastTweetTimeout.IsZero()).To(BeFalse()) + Expect(initialData.LastNotFoundTime.IsZero()).To(BeFalse()) + }) + }) +}) diff --git a/pkg/tests/node_data/node_data_tracker_test.go b/pkg/tests/node_data/node_data_tracker_test.go new file mode 100644 index 00000000..e43afbfb --- /dev/null +++ b/pkg/tests/node_data/node_data_tracker_test.go @@ -0,0 +1,60 @@ +package node_data + +import ( + "context" + "time" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + + . "github.com/masa-finance/masa-oracle/node" + "github.com/masa-finance/masa-oracle/pkg/pubsub" +) + +var _ = Describe("NodeDataTracker", func() { + Context("UpdateNodeDataTwitter", func() { + It("should correctly update NodeData Twitter fields", func() { + testNode, err := NewOracleNode( + context.Background(), + EnableStaked, + EnableRandomIdentity, + ) + Expect(err).NotTo(HaveOccurred()) + + err = testNode.Start() + Expect(err).NotTo(HaveOccurred()) + + initialData := pubsub.NodeData{ + PeerId: testNode.Host.ID(), + LastReturnedTweet: time.Now().Add(-1 * time.Hour), + ReturnedTweets: 10, + TweetTimeout: true, + TweetTimeouts: 2, + LastTweetTimeout: time.Now().Add(-1 * time.Hour), + LastNotFoundTime: time.Now().Add(-1 * time.Hour), + NotFoundCount: 1, + } + + err = testNode.NodeTracker.UpdateNodeDataTwitter(testNode.Host.ID().String(), initialData) + Expect(err).NotTo(HaveOccurred()) + + updates := pubsub.NodeData{ + ReturnedTweets: 5, + LastReturnedTweet: time.Now(), + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + } + + err = testNode.NodeTracker.UpdateNodeDataTwitter(testNode.Host.ID().String(), updates) + Expect(err).NotTo(HaveOccurred()) + + updatedData := testNode.NodeTracker.GetNodeData(testNode.Host.ID().String()) + Expect(updatedData.ReturnedTweets).To(Equal(15)) + Expect(updatedData.TweetTimeouts).To(Equal(3)) + Expect(updatedData.NotFoundCount).To(Equal(2)) + }) + }) +}) diff --git a/pkg/tests/scrapers/twitter_scraper_test.go b/pkg/tests/scrapers/twitter_scraper_test.go index e2b61bdc..96a1c45f 100644 --- a/pkg/tests/scrapers/twitter_scraper_test.go +++ b/pkg/tests/scrapers/twitter_scraper_test.go @@ -60,14 +60,11 @@ var _ = Describe("Twitter Auth Function", func() { }) authenticate := func() *twitterscraper.Scraper { - return twitter.Auth() + return nil + //return twitter.Auth() } - checkLoggedIn := func(scraper *twitterscraper.Scraper) bool { - return twitter.IsLoggedIn(scraper) - } - - It("authenticates and logs in successfully", func() { + PIt("authenticates and logs in successfully", func() { // Ensure cookie file doesn't exist before authentication cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") Expect(cookieFile).NotTo(BeAnExistingFile()) @@ -80,7 +77,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(cookieFile).To(BeAnExistingFile()) // Verify logged in state - Expect(checkLoggedIn(scraper)).To(BeTrue()) + Expect(scraper.IsLoggedIn()).To(BeTrue()) // Attempt a simple operation to verify the session is valid profile, err := twitter.ScrapeTweetsProfile("twitter") @@ -90,7 +87,7 @@ var _ = Describe("Twitter Auth Function", func() { logrus.Info("Authenticated and logged in to Twitter successfully") }) - It("reuses session from cookies", func() { + PIt("reuses session from cookies", func() { // First authentication firstScraper := authenticate() Expect(firstScraper).NotTo(BeNil()) @@ -107,7 +104,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(secondScraper).NotTo(BeNil()) // Verify logged in state - Expect(checkLoggedIn(secondScraper)).To(BeTrue()) + Expect(secondScraper.IsLoggedIn()).To(BeTrue()) // Attempt a simple operation to verify the session is valid profile, err := twitter.ScrapeTweetsProfile("twitter") @@ -117,7 +114,7 @@ var _ = Describe("Twitter Auth Function", func() { logrus.Info("Reused session from cookies successfully") }) - It("scrapes the profile of 'god' and recent #Bitcoin tweets using saved cookies", func() { + PIt("scrapes the profile of 'god' and recent #Bitcoin tweets using saved cookies", func() { // First authentication firstScraper := authenticate() Expect(firstScraper).NotTo(BeNil()) @@ -134,7 +131,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(secondScraper).NotTo(BeNil()) // Verify logged in state - Expect(twitter.IsLoggedIn(secondScraper)).To(BeTrue()) + Expect(secondScraper.IsLoggedIn()).To(BeTrue()) // Attempt to scrape profile profile, err := twitter.ScrapeTweetsProfile("god") diff --git a/pkg/tests/twitter/twitter_scraper_test.go b/pkg/tests/twitter/twitter_scraper_test.go index d4485ff1..ee251b4b 100644 --- a/pkg/tests/twitter/twitter_scraper_test.go +++ b/pkg/tests/twitter/twitter_scraper_test.go @@ -53,14 +53,11 @@ var _ = Describe("Twitter Auth Function", func() { }) authenticate := func() *twitterscraper.Scraper { - return twitter.Auth() + return nil + //return twitter.Auth() } - checkLoggedIn := func(scraper *twitterscraper.Scraper) bool { - return twitter.IsLoggedIn(scraper) - } - - It("authenticates and logs in successfully", func() { + PIt("authenticates and logs in successfully", func() { // Ensure cookie file doesn't exist before authentication cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") Expect(cookieFile).NotTo(BeAnExistingFile()) @@ -73,7 +70,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(cookieFile).To(BeAnExistingFile()) // Verify logged in state - Expect(checkLoggedIn(scraper)).To(BeTrue()) + Expect(scraper.IsLoggedIn()).To(BeTrue()) // Attempt a simple operation to verify the session is valid profile, err := twitter.ScrapeTweetsProfile("twitter") @@ -83,7 +80,7 @@ var _ = Describe("Twitter Auth Function", func() { logrus.Info("Authenticated and logged in to Twitter successfully") }) - It("reuses session from cookies", func() { + PIt("reuses session from cookies", func() { // First authentication firstScraper := authenticate() Expect(firstScraper).NotTo(BeNil()) @@ -100,7 +97,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(secondScraper).NotTo(BeNil()) // Verify logged in state - Expect(checkLoggedIn(secondScraper)).To(BeTrue()) + Expect(secondScraper.IsLoggedIn()).To(BeTrue()) // Attempt a simple operation to verify the session is valid profile, err := twitter.ScrapeTweetsProfile("twitter") @@ -110,7 +107,7 @@ var _ = Describe("Twitter Auth Function", func() { logrus.Info("Reused session from cookies successfully") }) - It("scrapes the profile of 'god' and recent #Bitcoin tweets using saved cookies", func() { + PIt("scrapes the profile of 'god' and recent #Bitcoin tweets using saved cookies", func() { // First authentication firstScraper := authenticate() Expect(firstScraper).NotTo(BeNil()) @@ -127,7 +124,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(secondScraper).NotTo(BeNil()) // Verify logged in state - Expect(twitter.IsLoggedIn(secondScraper)).To(BeTrue()) + Expect(secondScraper.IsLoggedIn()).To(BeTrue()) // Attempt to scrape profile profile, err := twitter.ScrapeTweetsProfile("god") diff --git a/pkg/workers/config.go b/pkg/workers/config.go index 03c38957..70a34bd0 100644 --- a/pkg/workers/config.go +++ b/pkg/workers/config.go @@ -17,13 +17,13 @@ type WorkerConfig struct { } var DefaultConfig = WorkerConfig{ - WorkerTimeout: 55 * time.Second, - WorkerResponseTimeout: 45 * time.Second, - ConnectionTimeout: 10 * time.Second, + WorkerTimeout: 45 * time.Second, + WorkerResponseTimeout: 35 * time.Second, + ConnectionTimeout: 500 * time.Millisecond, MaxRetries: 1, MaxSpawnAttempts: 1, WorkerBufferSize: 100, - MaxRemoteWorkers: 10, + MaxRemoteWorkers: 25, } var workerConfig *WorkerConfig diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index e7558b3a..e48b320a 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "math/rand" "strings" "sync" "time" @@ -17,6 +18,7 @@ import ( "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/event" + "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/masa-finance/masa-oracle/pkg/workers/handlers" data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) @@ -97,11 +99,25 @@ func (whm *WorkHandlerManager) GetWorkHandler(wType data_types.WorkerType) (Work func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest data_types.WorkRequest) (response data_types.WorkResponse) { category := data_types.WorkerTypeToCategory(workRequest.WorkType) - remoteWorkers, localWorker := GetEligibleWorkers(node, category) + var remoteWorkers []data_types.Worker + var localWorker *data_types.Worker + + if category == pubsub.CategoryTwitter { + // Use priority-based selection for Twitter work + remoteWorkers, localWorker = GetEligibleWorkers(node, category, workerConfig.MaxRemoteWorkers) + logrus.Info("Starting priority-based worker selection for Twitter work") + } else { + // Use existing selection for other work types + remoteWorkers, localWorker = GetEligibleWorkers(node, category, 0) + // Shuffle the workers to maintain round-robin behavior + rand.Shuffle(len(remoteWorkers), func(i, j int) { + remoteWorkers[i], remoteWorkers[j] = remoteWorkers[j], remoteWorkers[i] + }) + logrus.Info("Starting round-robin worker selection for non-Twitter work") + } remoteWorkersAttempted := 0 var errorList []string - logrus.Info("Starting round-robin worker selection") // Try remote workers first, up to MaxRemoteWorkers for _, worker := range remoteWorkers { @@ -115,6 +131,15 @@ func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest peerInfo, err := node.DHT.FindPeer(context.Background(), worker.NodeData.PeerId) if err != nil { logrus.Warnf("Failed to find peer %s in DHT: %v", worker.NodeData.PeerId.String(), err) + if category == pubsub.CategoryTwitter { + err := node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + LastNotFoundTime: time.Now(), + NotFoundCount: 1, + }) + if err != nil { + logrus.Warnf("Failed to update node data for peer %s: %v", worker.NodeData.PeerId.String(), err) + } + } continue } @@ -131,6 +156,9 @@ func (whm *WorkHandlerManager) DistributeWork(node *node.OracleNode, workRequest logrus.Infof("Attempting remote worker %s (attempt %d/%d)", worker.NodeData.PeerId, remoteWorkersAttempted, workerConfig.MaxRemoteWorkers) response = whm.sendWorkToWorker(node, worker, workRequest) if response.Error != "" { + errorMsg := fmt.Sprintf("Worker %s: %s", worker.NodeData.PeerId, response.Error) + errorList = append(errorList, errorMsg) + whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String()) logrus.Errorf("error sending work to worker: %s: %s", response.WorkerPeerId, response.Error) logrus.Infof("Remote worker %s failed, moving to next worker", worker.NodeData.PeerId) @@ -241,6 +269,24 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *node.OracleNode, worker da response.Error = fmt.Sprintf("error unmarshaling response: %v", err) return } + // Update metrics only if the work category is Twitter + if data_types.WorkerTypeToCategory(workRequest.WorkType) == pubsub.CategoryTwitter { + if response.Error == "" { + err = node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + ReturnedTweets: response.RecordCount, + LastReturnedTweet: time.Now(), + }) + } else { + err = node.NodeTracker.UpdateNodeDataTwitter(worker.NodeData.PeerId.String(), pubsub.NodeData{ + TweetTimeout: true, + TweetTimeouts: 1, + LastTweetTimeout: time.Now(), + }) + } + if err != nil { + logrus.Warnf("Failed to update node data for peer %s: %v", worker.NodeData.PeerId.String(), err) + } + } } return response } diff --git a/pkg/workers/worker_selection.go b/pkg/workers/worker_selection.go index 25b2e468..fd05954b 100644 --- a/pkg/workers/worker_selection.go +++ b/pkg/workers/worker_selection.go @@ -1,7 +1,8 @@ package workers import ( - "math/rand/v2" + "math/rand" + "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/sirupsen/logrus" @@ -11,18 +12,46 @@ import ( data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) -// GetEligibleWorkers Uses the new NodeTracker method to get the eligible workers for a given message type -// I'm leaving this returning an array so that we can easily increase the number of workers in the future -func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory) ([]data_types.Worker, *data_types.Worker) { - workers := make([]data_types.Worker, 0) +// GetEligibleWorkers returns eligible workers for a given message type. +// For Twitter workers, it uses a balanced approach between high-performing workers and fair distribution. +// For other worker types, it returns all eligible workers without modification. +func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory, limit int) ([]data_types.Worker, *data_types.Worker) { nodes := node.NodeTracker.GetEligibleWorkerNodes(category) - var localWorker *data_types.Worker - rand.Shuffle(len(nodes), func(i, j int) { - nodes[i], nodes[j] = nodes[j], nodes[i] + logrus.Infof("Getting eligible workers for category: %s", category) + + if category == pubsub.CategoryTwitter { + return getTwitterWorkers(node, nodes, limit) + } + + // For non-Twitter categories, return all eligible workers without modification + return getAllWorkers(node, nodes, limit) +} + +// getTwitterWorkers selects and shuffles a pool of top-performing Twitter workers +func getTwitterWorkers(node *node.OracleNode, nodes []pubsub.NodeData, limit int) ([]data_types.Worker, *data_types.Worker) { + poolSize := calculatePoolSize(len(nodes), limit) + topPerformers := nodes[:poolSize] + + // Shuffle the top performers + rand.Seed(time.Now().UnixNano()) + rand.Shuffle(len(topPerformers), func(i, j int) { + topPerformers[i], topPerformers[j] = topPerformers[j], topPerformers[i] }) - logrus.Info("Getting eligible workers") + return createWorkerList(node, topPerformers, limit) +} + +// getAllWorkers returns all eligible workers for non-Twitter categories +func getAllWorkers(node *node.OracleNode, nodes []pubsub.NodeData, limit int) ([]data_types.Worker, *data_types.Worker) { + return createWorkerList(node, nodes, limit) +} + +// createWorkerList creates a list of workers from the given nodes, respecting the limit +func createWorkerList(node *node.OracleNode, nodes []pubsub.NodeData, limit int) ([]data_types.Worker, *data_types.Worker) { + workers := make([]data_types.Worker, 0, limit) + var localWorker *data_types.Worker + for _, eligible := range nodes { if eligible.PeerId.String() == node.Host.ID().String() { localAddrInfo := peer.AddrInfo{ @@ -33,8 +62,38 @@ func GetEligibleWorkers(node *node.OracleNode, category pubsub.WorkerCategory) ( continue } workers = append(workers, data_types.Worker{IsLocal: false, NodeData: eligible}) + + // Apply limit if specified + if limit > 0 && len(workers) >= limit { + break + } } logrus.Infof("Found %d eligible remote workers", len(workers)) return workers, localWorker } + +// calculatePoolSize determines the size of the top performers pool for Twitter workers +func calculatePoolSize(totalNodes, limit int) int { + if limit <= 0 { + return totalNodes // If no limit, consider all nodes + } + // Use the larger of 5, double the limit, or 20% of total nodes + poolSize := max(5, limit*2) + poolSize = max(poolSize, totalNodes/5) + return min(poolSize, totalNodes) +} + +func max(a, b int) int { + if a > b { + return a + } + return b +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/twitter_cookies.example.json b/twitter_cookies.example.json index b573bf73..b421c7f0 100644 --- a/twitter_cookies.example.json +++ b/twitter_cookies.example.json @@ -1,19 +1,3 @@ - // How to obtain this information from your browser: - // 1. Log in to Twitter in your web browser - // 2. Open the browser's developer tools (usually F12 or right-click > Inspect) - // 3. Go to the "Application" or "Storage" tab - // 4. In the left sidebar, expand "Cookies" and click on "https://twitter.com" - // 5. Look for the cookie names listed above and copy their values - // 6. Replace the "X" placeholders in the "Value" field with the actual values - // 7. Save the file as "twitter_cookies.json" (remove ".example" from the filename) - - // Note: Most browsers only show the Name, Value, Domain, and Path fields in the developer tools. - // The other fields (Expires, MaxAge, Secure, HttpOnly, SameSite) may not be visible or editable. - // You can leave these fields as they are in this template. - - // IMPORTANT: Be extremely cautious with your auth_token and other sensitive cookies. - // Never share them publicly or commit them to version control. - [ { "Name": "personalization_id", @@ -84,5 +68,19 @@ "SameSite": 0, "Raw": "", "Unparsed": null + }, + { + "Name": "att", + "Value": "XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX", + "Path": "", + "Domain": "twitter.com", + "Expires": "0001-01-01T00:00:00Z", + "RawExpires": "", + "MaxAge": 0, + "Secure": false, + "HttpOnly": false, + "SameSite": 0, + "Raw": "", + "Unparsed": null } - ] \ No newline at end of file +]