From 419e224b73106fa8458cd9de1dd44614a5a11e37 Mon Sep 17 00:00:00 2001
From: Brendan Playford <34052452+teslashibe@users.noreply.github.com>
Date: Thu, 3 Oct 2024 18:43:01 -0700
Subject: [PATCH] feat(twitter): Scraper Enhancements: Account Rotation and
 Rate Limit Handling (#576)

* chore: cleanup and delete old tests

* chore: delete old tests

* refactor(twitter): remove sentiment and trends handlers

- Delete TwitterSentimentHandler and TwitterTrendsHandler structs
- Remove corresponding HandleWork functions for sentiment and trends
- Update WorkerType constants and related functions to exclude sentiment and trends
- Adjust WorkHandlerManager initialization to remove sentiment and trends handlers

* refactor(twitter): export Auth function, update scrapers, and remove objx dependency

- **Exported Auth Function:**
  - Renamed `auth` to `Auth` in `tweets.go` to make it publicly accessible.
  - Updated all scraper files (e.g., `followers.go`, `tweets.go`) to use the exported `Auth` function.
- **Removed Unused Dependency:**
  - Eliminated `github.com/stretchr/objx` from `go.mod` as it was no longer needed.
- **Optimized Sleep Durations:**
  - Reduced sleep durations in the `Auth` function from `500ms` to `100ms` for better performance.
- **Cleaned Up Codebase:**
  - Removed obsolete sentiment analysis code from `tweets.go` to streamline the codebase.
- **Enhanced Test Configuration:**
  - Fixed environment variable loading in `twitter_auth_test.go` by ensuring `.env` is correctly loaded via `scrapers_suite_test.go`.
  - Added and updated tests in `twitter_auth_test.go` and `scrapers_suite_test.go` to validate Twitter authentication and session reuse.

* chore: delete scrape tweets by trends (deprecated)

* feat(tests): enhance Twitter auth and scraping tests

This commit improves the Twitter authentication and scraping tests in the
pkg/tests/scrapers/twitter_auth_test.go file. The changes include:

- Add godotenv package to load environment variables
- Implement a loadEnv function to handle .env file loading
- Enhance "authenticates and logs in successfully" test:
  - Verify cookie file doesn't exist before authentication
  - Check cookie file creation after authentication
  - Perform a simple profile scrape to validate the session
- Improve "reuses session from cookies" test:
  - Verify cookie file creation
  - Force cookie reuse by clearing the first scraper
  - Validate the reused session with a profile scrape
- Add new test "scrapes the profile of 'god' and recent #Bitcoin tweets using saved cookies":
  - Authenticate twice to ensure cookie reuse
  - Scrape the profile of user 'god'
  - Fetch and verify the last 3 tweets containing #Bitcoin
  - Log scraped data for manual inspection

These changes provide more robust testing of the Twitter authentication
process, session reuse, and scraping functionality, ensuring better coverage
and reliability of the Twitter-related features.

* chore: rename files

* chore: add godoc dev notes

* feat(twitter): implement account rotation and rate limit handling

This commit introduces significant improvements to the Twitter scraping
functionality:

1. Account Management:
   - Add TwitterAccount struct to represent individual Twitter accounts
   - Implement TwitterAccountManager for managing multiple accounts
   - Create functions for account rotation and rate limit tracking

2. Authentication:
   - Refactor Auth function to use account rotation
   - Implement cookie-based session management for each account
   - Add retry logic for authentication failures

3. Scraping Functions:
   - Update ScrapeTweetsByQuery and ScrapeTweetsProfile to use account rotation
   - Implement rate limit detection and account switching
   - Add retry mechanisms for failed operations

4. Configuration:
   - Move from hardcoded credentials to .env file-based configuration
   - Implement loadAccountsFromConfig to read multiple accounts from .env

5. Error Handling:
   - Improve error logging and handling throughout the package
   - Add specific handling for rate limit errors

6. Performance:
   - Implement concurrent scraping with multiple accounts
   - Add delays between requests to avoid aggressive rate limiting

These changes significantly enhance the robustness and efficiency of the
Twitter scraping functionality, allowing for better handling of rate limits
and improved reliability through account rotation.

* feat(twitter): centralize configuration management

- Extract sleep time configuration into TwitterConfig struct in config.go
- Update Auth function in auth.go to accept TwitterConfig parameter
- Remove hardcoded sleep time values from auth.go

This change improves modularity and flexibility by centralizing configuration
management for the Twitter scraper. It allows for easier modification of sleep
times and future expansion of configuration options without altering the core
authentication logic.

BREAKING CHANGE: Auth function now requires a TwitterConfig parameter.
Callers must create a TwitterConfig instance using NewTwitterConfig() before
invoking Auth.

* Revert "feat(twitter): centralize configuration management"

This reverts commit 7ad1bfea4bd0482afbe4e4200a168fac9a40c2a2.

* refactor(twitter): move sleep configs to config.go and fix import cycle

- Extracted sleep configurations from `auth.go` into `config.go`.
- Defined `ShortSleepDuration` and `RateLimitDuration` constants.
- Created `ShortSleep()` and `GetRateLimitDuration()` functions.
- Updated `auth.go` to use the new config functions.

This improves modularity by separating concerns and adheres to idiomatic Go
practices.

* refactor(twitter): replace Auth with NewScraper in followers and tweets

- Updated `followers.go` and `tweets.go` to replace calls to `Auth` with `NewScraper`.
- Resolved `undefined: Auth` errors due to previous refactoring.
- Ensured all functionality and error handling remains consistent.
- Improved codebase consistency following the constructor pattern.

This completes the refactoring of the scraper creation process, enhancing code
readability and maintainability.

* fix(twitter): resolve type incompatibility errors after Scraper refactoring

- Updated function signatures and variable types in `tweets.go` and `followers.go` to use the custom `*Scraper` type.
- Adjusted return statements and method calls to match the new `Scraper` type.
- Fixed `IncompatibleAssign` errors by ensuring consistency across all files.
- Ensured all methods utilize the embedded `*twitterscraper.Scraper` methods through the custom `Scraper` type.

This change finalizes the refactoring, ensuring all components work together
seamlessly and conform to idiomatic Go practices.
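For context, a minimal caller-side sketch of the API this series converges on.
It assumes a .env file containing TWITTER_ACCOUNTS=user1:pass1,user2:pass2
(the comma-separated user:password format parsed by parseAccounts in
common.go); authentication, cookie reuse, account rotation, and retries are
handled inside the package, so callers only invoke the exported scraping
functions:

    package main

    import (
        "fmt"
        "log"

        "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter"
    )

    func main() {
        // Each call draws the next non-rate-limited account, reuses a saved
        // cookie session when one is still valid, and retries up to
        // MaxRetries times on transient failures.
        tweets, err := twitter.ScrapeTweetsByQuery("#Bitcoin", 10)
        if err != nil {
            log.Fatalf("scrape failed: %v", err)
        }
        for _, t := range tweets {
            fmt.Println(t.Tweet.Text)
        }
    }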
* refactor(twitter): move retry logic to config.go

- Extract generic Retry function to config.go
- Remove specific retry functions from tweets.go
- Update ScrapeTweetsByQuery and ScrapeTweetsProfile to use new Retry function
- Move MaxRetries constant to config.go

* refactor(twitter): modularize scraper components and improve error handling

- Move account management logic from auth.go to a separate file
- Centralize authentication and rate limit handling in common functions
- Simplify ScrapeFollowersForProfile and ScrapeTweetsByQuery using Retry
- Remove duplicate code and unnecessary initializations
- Increase WorkerResponseTimeout from 30 to 45 seconds in DefaultConfig
---
 go.mod                                     |   1 -
 go.sum                                     |   1 -
 pkg/scrapers/twitter/account.go            |  45 +++
 pkg/scrapers/twitter/auth.go               |  72 ++---
 pkg/scrapers/twitter/common.go             |  85 ++++++
 pkg/scrapers/twitter/config.go             |  35 +++
 pkg/scrapers/twitter/cookies.go            |  27 +-
 pkg/scrapers/twitter/followers.go          |  40 +--
 pkg/scrapers/twitter/profile.go            |  23 ++
 pkg/scrapers/twitter/scraper.go            |  17 ++
 pkg/scrapers/twitter/tweets.go             | 213 ++------------
 pkg/tests/api_test.go                      |  25 --
 pkg/tests/auth_test.go                     |  56 ----
 pkg/tests/chain_test.go                    |  18 --
 pkg/tests/scrape_test.go                   | 317 ---------------------
 pkg/tests/scrapers/scrapers_suite_test.go  |  55 ++++
 pkg/tests/scrapers/twitter_scraper_test.go | 158 ++++++++++
 pkg/workers/config.go                      |   2 +-
 pkg/workers/handlers/twitter.go            |  29 --
 pkg/workers/types/work_types.go            |   6 +-
 pkg/workers/worker_manager.go              |   2 -
 tests/node1/private.key                    | Bin 1197 -> 0 bytes
 tests/node2/private.key                    | Bin 1195 -> 0 bytes
 tests/node_listener.go                     | 156 ----------
 24 files changed, 502 insertions(+), 881 deletions(-)
 create mode 100644 pkg/scrapers/twitter/account.go
 create mode 100644 pkg/scrapers/twitter/common.go
 create mode 100644 pkg/scrapers/twitter/config.go
 create mode 100644 pkg/scrapers/twitter/profile.go
 create mode 100644 pkg/scrapers/twitter/scraper.go
 delete mode 100644 pkg/tests/api_test.go
 delete mode 100644 pkg/tests/auth_test.go
 delete mode 100644 pkg/tests/chain_test.go
 delete mode 100644 pkg/tests/scrape_test.go
 create mode 100644 pkg/tests/scrapers/scrapers_suite_test.go
 create mode 100644 pkg/tests/scrapers/twitter_scraper_test.go
 delete mode 100644 tests/node1/private.key
 delete mode 100644 tests/node2/private.key
 delete mode 100644 tests/node_listener.go

diff --git a/go.mod b/go.mod
index 5f20b64c..f1ee6ac3 100644
--- a/go.mod
+++ b/go.mod
@@ -218,7 +218,6 @@ require (
 	github.com/spaolacci/murmur3 v1.1.0 // indirect
 	github.com/spf13/afero v1.11.0 // indirect
 	github.com/spf13/cast v1.6.0 // indirect
-	github.com/stretchr/objx v0.5.2 // indirect
 	github.com/subosito/gotenv v1.6.0 // indirect
 	github.com/supranational/blst v0.3.13 // indirect
 	github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect
diff --git a/go.sum b/go.sum
index 52acedb4..fb72a48b 100644
--- a/go.sum
+++ b/go.sum
@@ -713,7 +713,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+
 github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE=
 github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw=
 github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo=
-github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY=
 github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA=
 github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/pkg/scrapers/twitter/account.go b/pkg/scrapers/twitter/account.go new file mode 100644 index 00000000..519e3893 --- /dev/null +++ b/pkg/scrapers/twitter/account.go @@ -0,0 +1,45 @@ +package twitter + +import ( + "sync" + "time" +) + +type TwitterAccount struct { + Username string + Password string + TwoFACode string + RateLimitedUntil time.Time +} + +type TwitterAccountManager struct { + accounts []*TwitterAccount + index int + mutex sync.Mutex +} + +func NewTwitterAccountManager(accounts []*TwitterAccount) *TwitterAccountManager { + return &TwitterAccountManager{ + accounts: accounts, + index: 0, + } +} + +func (manager *TwitterAccountManager) GetNextAccount() *TwitterAccount { + manager.mutex.Lock() + defer manager.mutex.Unlock() + for i := 0; i < len(manager.accounts); i++ { + account := manager.accounts[manager.index] + manager.index = (manager.index + 1) % len(manager.accounts) + if time.Now().After(account.RateLimitedUntil) { + return account + } + } + return nil +} + +func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAccount) { + manager.mutex.Lock() + defer manager.mutex.Unlock() + account.RateLimitedUntil = time.Now().Add(GetRateLimitDuration()) +} diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index e2064e53..a6be7767 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -3,49 +3,53 @@ package twitter import ( "fmt" - twitterscraper "github.com/masa-finance/masa-twitter-scraper" + "github.com/sirupsen/logrus" ) -// Login attempts to log in to the Twitter scraper service. -// It supports three modes of operation: -// 1. Basic login using just a username and password. -// 2. Login requiring an email confirmation, using a username, password, and email address. -// 3. Login with two-factor authentication, using a username, password, and 2FA code. -// Parameters: -// - scraper: A pointer to an instance of the twitterscraper.Scraper. -// - credentials: A variadic list of strings representing login credentials. -// The function expects either two strings (username, password) for basic login, -// or three strings (username, password, email/2FA code) for email confirmation or 2FA. -// -// Returns an error if login fails or if an invalid number of credentials is provided. -func Login(scraper *twitterscraper.Scraper, credentials ...string) error { +func NewScraper(account *TwitterAccount, cookieDir string) *Scraper { + scraper := &Scraper{Scraper: newTwitterScraper()} + + if err := LoadCookies(scraper.Scraper, account, cookieDir); err == nil { + logrus.Debugf("Cookies loaded for user %s.", account.Username) + if scraper.IsLoggedIn() { + logrus.Debugf("Already logged in as %s.", account.Username) + return scraper + } + } + + ShortSleep() + + if err := scraper.Login(account.Username, account.Password, account.TwoFACode); err != nil { + logrus.WithError(err).Warnf("Login failed for %s", account.Username) + return nil + } + + ShortSleep() + + if err := SaveCookies(scraper.Scraper, account, cookieDir); err != nil { + logrus.WithError(err).Errorf("Failed to save cookies for %s", account.Username) + } + + logrus.Debugf("Login successful for %s", account.Username) + return scraper +} + +func (scraper *Scraper) Login(username, password string, twoFACode ...string) error { var err error - switch len(credentials) { - case 2: - // Basic login with username and password. 
- err = scraper.Login(credentials[0], credentials[1]) - case 3: - // The third parameter is used for either email confirmation or a 2FA code. - // This design assumes the Twitter scraper's Login method can contextually handle both cases. - err = scraper.Login(credentials[0], credentials[1], credentials[2]) - default: - // Return an error if the number of provided credentials is neither 2 nor 3. - return fmt.Errorf("invalid number of login credentials provided") + if len(twoFACode) > 0 { + err = scraper.Scraper.Login(username, password, twoFACode[0]) + } else { + err = scraper.Scraper.Login(username, password) } if err != nil { - return fmt.Errorf("%v", err) + return fmt.Errorf("login failed: %v", err) } return nil } -func IsLoggedIn(scraper *twitterscraper.Scraper) bool { - return scraper.IsLoggedIn() -} - -func Logout(scraper *twitterscraper.Scraper) error { - err := scraper.Logout() - if err != nil { - return fmt.Errorf("[-] Logout failed: %v", err) +func (scraper *Scraper) Logout() error { + if err := scraper.Scraper.Logout(); err != nil { + return fmt.Errorf("logout failed: %v", err) } return nil } diff --git a/pkg/scrapers/twitter/common.go b/pkg/scrapers/twitter/common.go new file mode 100644 index 00000000..008c7aed --- /dev/null +++ b/pkg/scrapers/twitter/common.go @@ -0,0 +1,85 @@ +package twitter + +import ( + "fmt" + "os" + "strings" + "sync" + + "github.com/joho/godotenv" + "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/sirupsen/logrus" +) + +var ( + accountManager *TwitterAccountManager + once sync.Once +) + +func initializeAccountManager() { + accounts := loadAccountsFromConfig() + accountManager = NewTwitterAccountManager(accounts) +} + +func loadAccountsFromConfig() []*TwitterAccount { + err := godotenv.Load() + if err != nil { + logrus.Fatalf("error loading .env file: %v", err) + } + + accountsEnv := os.Getenv("TWITTER_ACCOUNTS") + if accountsEnv == "" { + logrus.Fatal("TWITTER_ACCOUNTS not set in .env file") + } + + return parseAccounts(strings.Split(accountsEnv, ",")) +} + +func parseAccounts(accountPairs []string) []*TwitterAccount { + return filterMap(accountPairs, func(pair string) (*TwitterAccount, bool) { + credentials := strings.Split(pair, ":") + if len(credentials) != 2 { + logrus.Warnf("invalid account credentials: %s", pair) + return nil, false + } + return &TwitterAccount{ + Username: strings.TrimSpace(credentials[0]), + Password: strings.TrimSpace(credentials[1]), + }, true + }) +} + +func getAuthenticatedScraper() (*Scraper, *TwitterAccount, error) { + once.Do(initializeAccountManager) + baseDir := config.GetInstance().MasaDir + + account := accountManager.GetNextAccount() + if account == nil { + return nil, nil, fmt.Errorf("all accounts are rate-limited") + } + scraper := NewScraper(account, baseDir) + if scraper == nil { + logrus.Errorf("Authentication failed for %s", account.Username) + return nil, account, fmt.Errorf("Twitter authentication failed for %s", account.Username) + } + return scraper, account, nil +} + +func handleRateLimit(err error, account *TwitterAccount) bool { + if strings.Contains(err.Error(), "Rate limit exceeded") { + accountManager.MarkAccountRateLimited(account) + logrus.Warnf("rate limited: %s", account.Username) + return true + } + return false +} + +func filterMap[T any, R any](slice []T, f func(T) (R, bool)) []R { + result := make([]R, 0, len(slice)) + for _, v := range slice { + if r, ok := f(v); ok { + result = append(result, r) + } + } + return result +} diff --git a/pkg/scrapers/twitter/config.go 
b/pkg/scrapers/twitter/config.go new file mode 100644 index 00000000..6e7e6433 --- /dev/null +++ b/pkg/scrapers/twitter/config.go @@ -0,0 +1,35 @@ +package twitter + +import ( + "fmt" + "time" + + "github.com/sirupsen/logrus" +) + +const ( + ShortSleepDuration = 20 * time.Millisecond + RateLimitDuration = time.Hour + MaxRetries = 3 +) + +func ShortSleep() { + time.Sleep(ShortSleepDuration) +} + +func GetRateLimitDuration() time.Duration { + return RateLimitDuration +} + +func Retry[T any](operation func() (T, error), maxAttempts int) (T, error) { + var zero T + for attempt := 1; attempt <= maxAttempts; attempt++ { + result, err := operation() + if err == nil { + return result, nil + } + logrus.Errorf("retry attempt %d failed: %v", attempt, err) + time.Sleep(time.Duration(attempt) * time.Second) + } + return zero, fmt.Errorf("operation failed after %d attempts", maxAttempts) +} diff --git a/pkg/scrapers/twitter/cookies.go b/pkg/scrapers/twitter/cookies.go index aef042e0..467e8686 100644 --- a/pkg/scrapers/twitter/cookies.go +++ b/pkg/scrapers/twitter/cookies.go @@ -5,37 +5,32 @@ import ( "fmt" "net/http" "os" + "path/filepath" twitterscraper "github.com/masa-finance/masa-twitter-scraper" ) -func SaveCookies(scraper *twitterscraper.Scraper, filePath string) error { +func SaveCookies(scraper *twitterscraper.Scraper, account *TwitterAccount, baseDir string) error { + cookieFile := filepath.Join(baseDir, fmt.Sprintf("%s_twitter_cookies.json", account.Username)) cookies := scraper.GetCookies() - js, err := json.Marshal(cookies) + data, err := json.Marshal(cookies) if err != nil { return fmt.Errorf("error marshaling cookies: %v", err) } - err = os.WriteFile(filePath, js, 0644) - if err != nil { - return fmt.Errorf("error saving cookies to file: %v", err) - } - - // Load the saved cookies back into the scraper - if err := LoadCookies(scraper, filePath); err != nil { - return fmt.Errorf("error loading saved cookies: %v", err) + if err = os.WriteFile(cookieFile, data, 0644); err != nil { + return fmt.Errorf("error saving cookies: %v", err) } - return nil } -func LoadCookies(scraper *twitterscraper.Scraper, filePath string) error { - js, err := os.ReadFile(filePath) +func LoadCookies(scraper *twitterscraper.Scraper, account *TwitterAccount, baseDir string) error { + cookieFile := filepath.Join(baseDir, fmt.Sprintf("%s_twitter_cookies.json", account.Username)) + data, err := os.ReadFile(cookieFile) if err != nil { - return fmt.Errorf("error reading cookies from file: %v", err) + return fmt.Errorf("error reading cookies: %v", err) } var cookies []*http.Cookie - err = json.Unmarshal(js, &cookies) - if err != nil { + if err = json.Unmarshal(data, &cookies); err != nil { return fmt.Errorf("error unmarshaling cookies: %v", err) } scraper.SetCookies(cookies) diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index 1d54c554..18746b55 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -1,38 +1,28 @@ package twitter import ( - "encoding/json" "fmt" - _ "github.com/lib/pq" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) -// ScrapeFollowersForProfile scrapes the profile and tweets of a specific Twitter user. -// It takes the username as a parameter and returns the scraped profile information and an error if any. 
func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Legacy, error) { - scraper := auth() + return Retry(func() ([]twitterscraper.Legacy, error) { + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return nil, err + } - if scraper == nil { - return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") - } + followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") + if errString != "" { + if handleRateLimit(fmt.Errorf(errString), account) { + return nil, fmt.Errorf("rate limited") + } + logrus.Errorf("Error fetching followers: %v", errString) + return nil, fmt.Errorf("%v", errString) + } - followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") - if errString != "" { - logrus.Printf("Error fetching profile: %v", errString) - return nil, fmt.Errorf("%v", errString) - } - - // Marshal the followingResponse into a JSON string for logging - responseJSON, err := json.Marshal(followingResponse) - if err != nil { - // Log the error if the marshaling fails - logrus.Errorf("[-] Error marshaling followingResponse: %v", err) - } else { - // Log the JSON string of followingResponse - logrus.Debugf("Following response: %s", responseJSON) - } - - return followingResponse, nil + return followingResponse, nil + }, MaxRetries) } diff --git a/pkg/scrapers/twitter/profile.go b/pkg/scrapers/twitter/profile.go new file mode 100644 index 00000000..8337f186 --- /dev/null +++ b/pkg/scrapers/twitter/profile.go @@ -0,0 +1,23 @@ +package twitter + +import ( + twitterscraper "github.com/masa-finance/masa-twitter-scraper" +) + +func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { + return Retry(func() (twitterscraper.Profile, error) { + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return twitterscraper.Profile{}, err + } + + profile, err := scraper.GetProfile(username) + if err != nil { + if handleRateLimit(err, account) { + return twitterscraper.Profile{}, err + } + return twitterscraper.Profile{}, err + } + return profile, nil + }, MaxRetries) +} diff --git a/pkg/scrapers/twitter/scraper.go b/pkg/scrapers/twitter/scraper.go new file mode 100644 index 00000000..2ca66861 --- /dev/null +++ b/pkg/scrapers/twitter/scraper.go @@ -0,0 +1,17 @@ +package twitter + +import ( + twitterscraper "github.com/masa-finance/masa-twitter-scraper" +) + +type Scraper struct { + *twitterscraper.Scraper +} + +func newTwitterScraper() *twitterscraper.Scraper { + return twitterscraper.New() +} + +func (s *Scraper) IsLoggedIn() bool { + return s.Scraper.IsLoggedIn() +} diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index a4061095..2b5c229d 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -2,18 +2,8 @@ package twitter import ( "context" - "fmt" - "path/filepath" - "strings" - "time" - - _ "github.com/lib/pq" twitterscraper "github.com/masa-finance/masa-twitter-scraper" - "github.com/sirupsen/logrus" - - "github.com/masa-finance/masa-oracle/pkg/config" - "github.com/masa-finance/masa-oracle/pkg/llmbridge" ) type TweetResult struct { @@ -21,194 +11,25 @@ type TweetResult struct { Error error } -// auth initializes and returns a new Twitter scraper instance. It attempts to load cookies from a file to reuse an existing session. -// If no valid session is found, it performs a login with credentials specified in the application's configuration. -// On successful login, it saves the session cookies for future use. 
If the login fails, it returns nil. -func auth() *twitterscraper.Scraper { - scraper := twitterscraper.New() - appConfig := config.GetInstance() - cookieFilePath := filepath.Join(appConfig.MasaDir, "twitter_cookies.json") - - if err := LoadCookies(scraper, cookieFilePath); err == nil { - logrus.Debug("Cookies loaded successfully.") - if IsLoggedIn(scraper) { - logrus.Debug("Already logged in via cookies.") - return scraper - } - } - - username := appConfig.TwitterUsername - password := appConfig.TwitterPassword - twoFACode := appConfig.Twitter2FaCode - - time.Sleep(500 * time.Millisecond) - - var err error - if twoFACode != "" { - err = Login(scraper, username, password, twoFACode) - } else { - err = Login(scraper, username, password) - } - - if err != nil { - logrus.WithError(err).Warning("[-] Login failed") - return nil - } - - time.Sleep(500 * time.Millisecond) - - if err = SaveCookies(scraper, cookieFilePath); err != nil { - logrus.WithError(err).Error("[-] Failed to save cookies") - } - - logrus.WithFields(logrus.Fields{ - "auth": true, - "username": username, - }).Debug("Login successful") - - return scraper -} - -// ScrapeTweetsForSentiment is a function that scrapes tweets based on a given query, analyzes their sentiment using a specified model, and returns the sentiment analysis results. -// Parameters: -// - query: The search query string to find matching tweets. -// - count: The maximum number of tweets to retrieve and analyze. -// - model: The model to use for sentiment analysis. -// -// Returns: -// - A string representing the sentiment analysis prompt. -// - A string representing the sentiment analysis result. -// - An error if the scraping or sentiment analysis process encounters any issues. -func ScrapeTweetsForSentiment(query string, count int, model string) (string, string, error) { - scraper := auth() - var tweets []*TweetResult - - if scraper == nil { - return "", "", fmt.Errorf("there was an error authenticating with your Twitter credentials") - } - - // Set search mode - scraper.SetSearchMode(twitterscraper.SearchLatest) - - // Perform the search with the specified query and count - for tweetResult := range scraper.SearchTweets(context.Background(), query, count) { - var tweet TweetResult - if tweetResult.Error != nil { - tweet = TweetResult{ - Tweet: nil, - Error: tweetResult.Error, - } - } else { - tweet = TweetResult{ - Tweet: &tweetResult.Tweet, - Error: nil, - } - } - tweets = append(tweets, &tweet) - } - sentimentPrompt := "Please perform a sentiment analysis on the following tweets, using an unbiased approach. Sentiment analysis involves identifying and categorizing opinions expressed in text, particularly to determine whether the writer's attitude towards a particular topic, product, etc., is positive, negative, or neutral. After analyzing, please provide a summary of the overall sentiment expressed in these tweets, including the proportion of positive, negative, and neutral sentiments if applicable." - - twitterScraperTweets := make([]*twitterscraper.TweetResult, len(tweets)) - for i, tweet := range tweets { - twitterScraperTweets[i] = &twitterscraper.TweetResult{ - Tweet: *tweet.Tweet, - Error: tweet.Error, - } - } - prompt, sentiment, err := llmbridge.AnalyzeSentimentTweets(twitterScraperTweets, model, sentimentPrompt) - if err != nil { - return "", "", err - } - return prompt, sentiment, tweets[0].Error -} - -// ScrapeTweetsByQuery performs a search on Twitter for tweets matching the specified query. 
-// It fetches up to the specified count of tweets and returns a slice of Tweet pointers. -// Parameters: -// - query: The search query string to find matching tweets. -// - count: The maximum number of tweets to retrieve. -// -// Returns: -// - A slice of pointers to twitterscraper.Tweet objects that match the search query. -// - An error if the scraping process encounters any issues. func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { - scraper := auth() - var tweets []*TweetResult - var lastError error - - if scraper == nil { - return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") - } - - // Set search mode - scraper.SetSearchMode(twitterscraper.SearchLatest) - - // Perform the search with the specified query and count - for tweetResult := range scraper.SearchTweets(context.Background(), query, count) { - if tweetResult.Error != nil { - lastError = tweetResult.Error - logrus.Warnf("[+] Error encountered while scraping tweet: %v", tweetResult.Error) - if strings.Contains(tweetResult.Error.Error(), "Rate limit exceeded") { - return nil, fmt.Errorf("Twitter API rate limit exceeded (429 error)") - } - continue + return Retry(func() ([]*TweetResult, error) { + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return nil, err } - tweets = append(tweets, &TweetResult{Tweet: &tweetResult.Tweet, Error: nil}) - } - - if len(tweets) == 0 && lastError != nil { - return nil, lastError - } - - return tweets, nil -} - -// ScrapeTweetsByTrends scrapes the current trending topics on Twitter. -// It returns a slice of strings representing the trending topics. -// If an error occurs during the scraping process, it returns an error. -func ScrapeTweetsByTrends() ([]*TweetResult, error) { - scraper := auth() - var trendResults []*TweetResult - - if scraper == nil { - return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") - } - // Set search mode - scraper.SetSearchMode(twitterscraper.SearchLatest) - - trends, err := scraper.GetTrends() - if err != nil { - return nil, err - } - - for _, trend := range trends { - trendResult := &TweetResult{ - Tweet: &twitterscraper.Tweet{Text: trend}, - Error: nil, + var tweets []*TweetResult + ctx := context.Background() + scraper.SetSearchMode(twitterscraper.SearchLatest) + for tweet := range scraper.SearchTweets(ctx, query, count) { + if tweet.Error != nil { + if handleRateLimit(tweet.Error, account) { + return nil, tweet.Error + } + return nil, tweet.Error + } + tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) } - trendResults = append(trendResults, trendResult) - } - - return trendResults, trendResults[0].Error -} - -// ScrapeTweetsProfile scrapes the profile and tweets of a specific Twitter user. -// It takes the username as a parameter and returns the scraped profile information and an error if any. 
-func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { - scraper := auth() - - if scraper == nil { - return twitterscraper.Profile{}, fmt.Errorf("there was an error authenticating with your Twitter credentials") - } - - // Set search mode - scraper.SetSearchMode(twitterscraper.SearchLatest) - - profile, err := scraper.GetProfile(username) - if err != nil { - return twitterscraper.Profile{}, err - } - - return profile, nil + return tweets, nil + }, MaxRetries) } diff --git a/pkg/tests/api_test.go b/pkg/tests/api_test.go deleted file mode 100644 index 4784fd1a..00000000 --- a/pkg/tests/api_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package tests - -import ( - "testing" - - "github.com/masa-finance/masa-oracle/node" - "github.com/masa-finance/masa-oracle/pkg/api" - "github.com/masa-finance/masa-oracle/pkg/pubsub" - "github.com/masa-finance/masa-oracle/pkg/workers" -) - -func TestAPI(t *testing.T) { - // Create a new OracleNode instance - n := &node.OracleNode{} - whm := &workers.WorkHandlerManager{} - pubKeySub := &pubsub.PublicKeySubscriptionHandler{} - - // Initialize the API - api := api.NewAPI(n, whm, pubKeySub) - - // Test API initialization - if api == nil { - t.Fatal("Failed to initialize API") - } -} diff --git a/pkg/tests/auth_test.go b/pkg/tests/auth_test.go deleted file mode 100644 index b598e4ec..00000000 --- a/pkg/tests/auth_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package tests - -import ( - "os" - "testing" - - "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" - twitterscraper "github.com/masa-finance/masa-twitter-scraper" - "github.com/sirupsen/logrus" -) - -func TestAuth(t *testing.T) { - logrus.SetLevel(logrus.DebugLevel) - scraper := twitterscraper.New() - username := "" - password := "" - logrus.WithFields(logrus.Fields{"username": username}).Debug("Attempting to login") - - // Attempt to retrieve a 2FA code from environment variables - twoFACode := os.Getenv("TWITTER_2FA_CODE") - if twoFACode != "" { - logrus.WithField("2FA", "provided").Debug("2FA code is provided, attempting login with 2FA") - } else { - logrus.Debug("No 2FA code provided, attempting basic login") - } - - var err error - if twoFACode != "" { - // If a 2FA code is provided, use it for login - err = twitter.Login(scraper, username, password, twoFACode) - } else { - // Otherwise, proceed with basic login - err = twitter.Login(scraper, username, password) - } - - if err != nil { - logrus.WithError(err).Warning("[-] Login failed") - } else { - logrus.Debug("[+] Login successful") - } - - // Optionally, check if logged in - if twitter.IsLoggedIn(scraper) { - logrus.Debug("[+] Confirmed logged in.") - } else { - logrus.Debug("[-] Not logged in.") - } - - // Don't forget to logout after testing - err = twitter.Logout(scraper) - if err != nil { - logrus.WithError(err).Warn("[-] Logout failed") - } else { - logrus.Debug("[+] Logged out successfully.") - } -} diff --git a/pkg/tests/chain_test.go b/pkg/tests/chain_test.go deleted file mode 100644 index 5302ecc4..00000000 --- a/pkg/tests/chain_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package tests - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -// MockChain is a mock implementation of the Chain interface -type MockChain struct { - mock.Mock -} - -func TestGetLatestBlockNumber(t *testing.T) { - - assert.NotNil(t, 1) -} diff --git a/pkg/tests/scrape_test.go b/pkg/tests/scrape_test.go deleted file mode 100644 index 66650b19..00000000 --- a/pkg/tests/scrape_test.go +++ /dev/null @@ 
-1,317 +0,0 @@ -// go test -v ./pkg/tests -run TestScrapeTweetsByQuery -// export TWITTER_2FA_CODE="873855" -package tests - -import ( - "encoding/csv" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "os" - "path/filepath" - "runtime" - "testing" - "time" - - "github.com/joho/godotenv" - "github.com/masa-finance/masa-oracle/pkg/llmbridge" - twitterscraper "github.com/masa-finance/masa-twitter-scraper" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - - "github.com/masa-finance/masa-oracle/pkg/config" - "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" -) - -// Global scraper instance -var scraper *twitterscraper.Scraper - -func setup() { - var err error - _, b, _, _ := runtime.Caller(0) - rootDir := filepath.Join(filepath.Dir(b), "../..") - if _, _ = os.Stat(rootDir + "/.env"); !os.IsNotExist(err) { - _ = godotenv.Load() - } - - logrus.SetLevel(logrus.DebugLevel) - if scraper == nil { - scraper = twitterscraper.New() - } - - // Use GetInstance from config to access MasaDir - appConfig := config.GetInstance() - - // Construct the cookie file path using MasaDir from AppConfig - cookieFilePath := filepath.Join(appConfig.MasaDir, "twitter_cookies.json") - - // Attempt to load cookies - if err = twitter.LoadCookies(scraper, cookieFilePath); err == nil { - logrus.Debug("Cookies loaded successfully.") - if twitter.IsLoggedIn(scraper) { - logrus.Debug("Already logged in via cookies.") - return - } - } - - // If cookies are not valid or do not exist, proceed with login - username := appConfig.TwitterUsername - password := appConfig.TwitterPassword - logrus.WithFields(logrus.Fields{"username": username, "password": password}).Debug("Attempting to login") - - twoFACode := appConfig.Twitter2FaCode - - if twoFACode != "" { - logrus.WithField("2FA", "provided").Debug("2FA code is provided, attempting login with 2FA") - err = twitter.Login(scraper, username, password, twoFACode) - } else { - logrus.Debug("No 2FA code provided, attempting basic login") - err = twitter.Login(scraper, username, password) - } - - if err != nil { - logrus.WithError(err).Warning("[-] Login failed") - return - } - - // Save cookies after successful login - if err = twitter.SaveCookies(scraper, cookieFilePath); err != nil { - logrus.WithError(err).Error("[-] Failed to save cookies") - return - } - - logrus.Debug("[+] Login successful") -} - -func scrapeTweets(outputFile string) error { - // Implement the tweet scraping logic here - // This function should: - // 1. Make API calls to the MASA_NODE_URL - // 2. Process the responses - // 3. Write the tweets to the outputFile in CSV format - // 4. Handle rate limiting and retries - // 5. 
Return an error if something goes wrong - - // For now, we'll just create a dummy file - file, err := os.Create(outputFile) - if err != nil { - return fmt.Errorf("error creating file: %v", err) - } - defer file.Close() - - writer := csv.NewWriter(file) - defer writer.Flush() - - writer.Write([]string{"tweet", "datetime"}) - writer.Write([]string{"Test tweet #1", time.Now().Format(time.RFC3339)}) - writer.Write([]string{"Test tweet #2", time.Now().Format(time.RFC3339)}) - - return nil -} - -func TestSetup(t *testing.T) { - setup() -} - -func TestScrapeTweetsWithSentimentByQuery(t *testing.T) { - // Ensure setup is done before running the test - setup() - - query := "$MASA Token Masa" - count := 100 - tweets, err := twitter.ScrapeTweetsByQuery(query, count) - if err != nil { - logrus.WithError(err).Error("[-] Failed to scrape tweets") - return - } - - // Serialize the tweets data to JSON - tweetsData, err := json.Marshal(tweets) - if err != nil { - logrus.WithError(err).Error("[-] Failed to serialize tweets data") - return - } - - // Write the serialized data to a file - filePath := "scraped_tweets.json" - err = os.WriteFile(filePath, tweetsData, 0644) - if err != nil { - logrus.WithError(err).Error("[-] Failed to write tweets data to file") - return - } - logrus.WithField("file", filePath).Debug("[+] Tweets data written to file successfully.") - - // Read the serialized data from the file - fileData, err := os.ReadFile(filePath) - if err != nil { - logrus.WithError(err).Error("[-] Failed to read tweets data from file") - return - } - - // Correctly declare a new variable for the deserialized data - var deserializedTweets []*twitterscraper.Tweet - err = json.Unmarshal(fileData, &deserializedTweets) - if err != nil { - logrus.WithError(err).Error("[-] Failed to deserialize tweets data") - return - } - - // Now, deserializedTweets contains the tweets loaded from the file - // Send the tweets data to Claude for sentiment analysis - twitterScraperTweets := make([]*twitterscraper.TweetResult, len(deserializedTweets)) - for i, tweet := range deserializedTweets { - twitterScraperTweets[i] = &twitterscraper.TweetResult{ - Tweet: *tweet, - Error: nil, - } - } - sentimentRequest, sentimentSummary, err := llmbridge.AnalyzeSentimentTweets(twitterScraperTweets, "claude-3-opus-20240229", "Please perform a sentiment analysis on the following tweets, using an unbiased approach. Sentiment analysis involves identifying and categorizing opinions expressed in text, particularly to determine whether the writer's attitude towards a particular topic, product, etc., is positive, negative, or neutral. 
After analyzing, please provide a summary of the overall sentiment expressed in these tweets, including the proportion of positive, negative, and neutral sentiments if applicable.") - if err != nil { - logrus.WithError(err).Error("[-] Failed to analyze sentiment") - err = os.Remove(filePath) - if err != nil { - logrus.WithError(err).Error("[-] Failed to delete the temporary file") - } else { - logrus.WithField("file", filePath).Debug("[+] Temporary file deleted successfully") - } - return - } - logrus.WithFields(logrus.Fields{ - "sentimentRequest": sentimentRequest, - "sentimentSummary": sentimentSummary, - }).Debug("[+] Sentiment analysis completed successfully.") - - // Delete the created file after the test - err = os.Remove(filePath) - if err != nil { - logrus.WithError(err).Error("[-] Failed to delete the temporary file") - } else { - logrus.WithField("file", filePath).Debug("[+] Temporary file deleted successfully") - } -} - -func TestScrapeTweetsWithMockServer(t *testing.T) { - setup() - - // Mock server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("Expected POST request, got %s", r.Method) - } - - var requestBody map[string]interface{} - json.NewDecoder(r.Body).Decode(&requestBody) - - if query, ok := requestBody["query"].(string); !ok || query == "" { - t.Errorf("Expected query in request body") - } - - response := map[string]interface{}{ - "data": []map[string]interface{}{ - { - "Text": "Test tweet #1", - "Timestamp": time.Now().Unix(), - }, - { - "Text": "Test tweet #2", - "Timestamp": time.Now().Unix(), - }, - }, - } - - json.NewEncoder(w).Encode(response) - })) - defer server.Close() - - // Set up test environment - os.Setenv("MASA_NODE_URL", server.URL) - outputFile := "test_tweets.csv" - defer os.Remove(outputFile) - - // Run the scrape function (you'll need to implement this) - err := scrapeTweets(outputFile) - if err != nil { - t.Fatalf("Error scraping tweets: %v", err) - } - - // Verify the output - file, err := os.Open(outputFile) - if err != nil { - t.Fatalf("Error opening output file: %v", err) - } - defer file.Close() - - reader := csv.NewReader(file) - records, err := reader.ReadAll() - if err != nil { - t.Fatalf("Error reading CSV: %v", err) - } - - if len(records) != 3 { // Header + 2 tweets - t.Errorf("Expected 3 records, got %d", len(records)) - } - - if records[0][0] != "tweet" || records[0][1] != "datetime" { - t.Errorf("Unexpected header: %v", records[0]) - } - - for i, record := range records[1:] { - if record[0] == "" || record[1] == "" { - t.Errorf("Empty field in record %d: %v", i+1, record) - } - _, err := time.Parse(time.RFC3339, record[1]) - if err != nil { - t.Errorf("Invalid datetime format in record %d: %v", i+1, record[1]) - } - } -} - -func TestScrapeTweets(t *testing.T) { - setup() - - query := "Sadhguru" - count := 10 - - for i := 0; i < 100; i++ { - tweets, err := twitter.ScrapeTweetsByQuery(query, count) - if err != nil { - logrus.WithError(err).Error("[-] Failed to scrape tweets") - return - } - - var validTweets []*twitter.TweetResult - for _, tweet := range tweets { - if tweet.Error != nil { - logrus.WithError(tweet.Error).Warn("[-] Error in tweet") - continue - } - validTweets = append(validTweets, tweet) - } - - tweetsData, err := json.Marshal(validTweets) - if err != nil { - logrus.WithError(err).Error("[-] Failed to serialize tweets data") - return - } - - logrus.WithFields(logrus.Fields{ - "total_tweets": len(tweets), - "valid_tweets": 
len(validTweets), - "tweets_sample": string(tweetsData[:min(100, len(tweetsData))]), - }).Debug("[+] Tweets data") - - if len(tweetsData) > 0 { - assert.NotNil(t, tweetsData[:min(10, len(tweetsData))]) - } else { - assert.Nil(t, tweetsData) - } - } -} - -func min(a, b int) int { - if a < b { - return a - } - return b -} diff --git a/pkg/tests/scrapers/scrapers_suite_test.go b/pkg/tests/scrapers/scrapers_suite_test.go new file mode 100644 index 00000000..b27edc56 --- /dev/null +++ b/pkg/tests/scrapers/scrapers_suite_test.go @@ -0,0 +1,55 @@ +package scrapers_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/joho/godotenv" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" +) + +func TestScrapers(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Scrapers Suite") +} + +func TestMain(m *testing.M) { + // Override os.Args to prevent flag parsing errors + os.Args = []string{os.Args[0]} + + // Get the current working directory + cwd, err := os.Getwd() + if err != nil { + logrus.Fatalf("Failed to get current directory: %v", err) + } + + // Define the project root path + projectRoot := filepath.Join(cwd, "..", "..", "..") + envPath := filepath.Join(projectRoot, ".env") + + // Load the .env file + err = godotenv.Load(envPath) + if err != nil { + logrus.Warnf("Error loading .env file from %s: %v", envPath, err) + } else { + logrus.Info("Loaded .env from project root") + } + + // Verify that the required environment variables are set + requiredEnvVars := []string{"USER_AGENTS", "TWITTER_USERNAME", "TWITTER_PASSWORD"} + for _, envVar := range requiredEnvVars { + value := os.Getenv(envVar) + if value == "" { + logrus.Warnf("%s environment variable is not set", envVar) + } else { + logrus.Debugf("%s: %s", envVar, value) + } + } + + // Run the tests + exitCode := m.Run() + os.Exit(exitCode) +} diff --git a/pkg/tests/scrapers/twitter_scraper_test.go b/pkg/tests/scrapers/twitter_scraper_test.go new file mode 100644 index 00000000..e2b61bdc --- /dev/null +++ b/pkg/tests/scrapers/twitter_scraper_test.go @@ -0,0 +1,158 @@ +// Package scrapers_test contains integration tests for the Twitter scraper functionality. +// +// Dev Notes: +// - These tests require valid Twitter credentials set in environment variables. +// - The tests use a temporary directory for storing cookies and other data. +// - Make sure to run these tests in a controlled environment to avoid rate limiting. +// - The tests cover authentication, session reuse, and basic scraping operations. +package scrapers_test + +import ( + "os" + "path/filepath" + "runtime" + + "github.com/joho/godotenv" + "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" + twitterscraper "github.com/masa-finance/masa-twitter-scraper" + . "github.com/onsi/ginkgo/v2" + . 
"github.com/onsi/gomega" + "github.com/sirupsen/logrus" +) + +var _ = Describe("Twitter Auth Function", func() { + var ( + twitterUsername string + twitterPassword string + twoFACode string + ) + + loadEnv := func() { + _, filename, _, _ := runtime.Caller(0) + projectRoot := filepath.Join(filepath.Dir(filename), "..", "..", "..") + envPath := filepath.Join(projectRoot, ".env") + + err := godotenv.Load(envPath) + if err != nil { + logrus.Warnf("Error loading .env file from %s: %v", envPath, err) + } else { + logrus.Infof("Loaded .env from %s", envPath) + } + } + + BeforeEach(func() { + loadEnv() + + tempDir := GinkgoT().TempDir() + config.GetInstance().MasaDir = tempDir + + twitterUsername = os.Getenv("TWITTER_USERNAME") + twitterPassword = os.Getenv("TWITTER_PASSWORD") + twoFACode = os.Getenv("TWITTER_2FA_CODE") + + Expect(twitterUsername).NotTo(BeEmpty(), "TWITTER_USERNAME environment variable is not set") + Expect(twitterPassword).NotTo(BeEmpty(), "TWITTER_PASSWORD environment variable is not set") + + config.GetInstance().TwitterUsername = twitterUsername + config.GetInstance().TwitterPassword = twitterPassword + config.GetInstance().Twitter2FaCode = twoFACode + }) + + authenticate := func() *twitterscraper.Scraper { + return twitter.Auth() + } + + checkLoggedIn := func(scraper *twitterscraper.Scraper) bool { + return twitter.IsLoggedIn(scraper) + } + + It("authenticates and logs in successfully", func() { + // Ensure cookie file doesn't exist before authentication + cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + Expect(cookieFile).NotTo(BeAnExistingFile()) + + // Authenticate + scraper := authenticate() + Expect(scraper).NotTo(BeNil()) + + // Check if cookie file was created + Expect(cookieFile).To(BeAnExistingFile()) + + // Verify logged in state + Expect(checkLoggedIn(scraper)).To(BeTrue()) + + // Attempt a simple operation to verify the session is valid + profile, err := twitter.ScrapeTweetsProfile("twitter") + Expect(err).To(BeNil()) + Expect(profile.Username).To(Equal("twitter")) + + logrus.Info("Authenticated and logged in to Twitter successfully") + }) + + It("reuses session from cookies", func() { + // First authentication + firstScraper := authenticate() + Expect(firstScraper).NotTo(BeNil()) + + // Verify cookie file is created + cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + Expect(cookieFile).To(BeAnExistingFile()) + + // Clear the scraper to force cookie reuse + firstScraper = nil + + // Second authentication (should use cookies) + secondScraper := authenticate() + Expect(secondScraper).NotTo(BeNil()) + + // Verify logged in state + Expect(checkLoggedIn(secondScraper)).To(BeTrue()) + + // Attempt a simple operation to verify the session is valid + profile, err := twitter.ScrapeTweetsProfile("twitter") + Expect(err).To(BeNil()) + Expect(profile.Username).To(Equal("twitter")) + + logrus.Info("Reused session from cookies successfully") + }) + + It("scrapes the profile of 'god' and recent #Bitcoin tweets using saved cookies", func() { + // First authentication + firstScraper := authenticate() + Expect(firstScraper).NotTo(BeNil()) + + // Verify cookie file is created + cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + Expect(cookieFile).To(BeAnExistingFile()) + + // Clear the scraper to force cookie reuse + firstScraper = nil + + // Second authentication (should use cookies) + secondScraper := authenticate() + Expect(secondScraper).NotTo(BeNil()) + + // Verify logged in 
state + Expect(twitter.IsLoggedIn(secondScraper)).To(BeTrue()) + + // Attempt to scrape profile + profile, err := twitter.ScrapeTweetsProfile("god") + Expect(err).To(BeNil()) + logrus.Infof("Profile of 'god': %+v", profile) + + // Scrape recent #Bitcoin tweets + tweets, err := twitter.ScrapeTweetsByQuery("#Bitcoin", 3) + Expect(err).To(BeNil()) + Expect(tweets).To(HaveLen(3)) + + logrus.Info("Recent #Bitcoin tweets:") + for i, tweet := range tweets { + logrus.Infof("Tweet %d: %s", i+1, tweet.Tweet.Text) + } + }) + + AfterEach(func() { + os.RemoveAll(config.GetInstance().MasaDir) + }) +}) diff --git a/pkg/workers/config.go b/pkg/workers/config.go index ba266114..03c38957 100644 --- a/pkg/workers/config.go +++ b/pkg/workers/config.go @@ -18,7 +18,7 @@ type WorkerConfig struct { var DefaultConfig = WorkerConfig{ WorkerTimeout: 55 * time.Second, - WorkerResponseTimeout: 30 * time.Second, + WorkerResponseTimeout: 45 * time.Second, ConnectionTimeout: 10 * time.Second, MaxRetries: 1, MaxSpawnAttempts: 1, diff --git a/pkg/workers/handlers/twitter.go b/pkg/workers/handlers/twitter.go index 72d164ef..bd4cc00e 100644 --- a/pkg/workers/handlers/twitter.go +++ b/pkg/workers/handlers/twitter.go @@ -12,8 +12,6 @@ import ( type TwitterQueryHandler struct{} type TwitterFollowersHandler struct{} type TwitterProfileHandler struct{} -type TwitterSentimentHandler struct{} -type TwitterTrendsHandler struct{} func (h *TwitterQueryHandler) HandleWork(data []byte) data_types.WorkResponse { logrus.Infof("[+] TwitterQueryHandler input: %s", data) @@ -73,30 +71,3 @@ func (h *TwitterProfileHandler) HandleWork(data []byte) data_types.WorkResponse logrus.Infof("[+] TwitterProfileHandler Work response for %s: %d records returned", data_types.TwitterProfile, 1) return data_types.WorkResponse{Data: resp, RecordCount: 1} } - -func (h *TwitterSentimentHandler) HandleWork(data []byte) data_types.WorkResponse { - logrus.Infof("[+] TwitterSentimentHandler %s", data) - dataMap, err := JsonBytesToMap(data) - if err != nil { - return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse twitter sentiment data: %v", err)} - } - count := int(dataMap["count"].(float64)) - query := dataMap["query"].(string) - model := dataMap["model"].(string) - _, resp, err := twitter.ScrapeTweetsForSentiment(query, count, model) - if err != nil { - return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter sentiment: %v", err)} - } - logrus.Infof("[+] TwitterSentimentHandler Work response for %s: %d records returned", data_types.TwitterSentiment, 1) - return data_types.WorkResponse{Data: resp, RecordCount: 1} -} - -func (h *TwitterTrendsHandler) HandleWork(data []byte) data_types.WorkResponse { - logrus.Infof("[+] TwitterTrendsHandler %s", data) - resp, err := twitter.ScrapeTweetsByTrends() - if err != nil { - return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter trends: %v", err)} - } - logrus.Infof("[+] TwitterTrendsHandler Work response for %s: %d records returned", data_types.TwitterTrends, len(resp)) - return data_types.WorkResponse{Data: resp, RecordCount: len(resp)} -} diff --git a/pkg/workers/types/work_types.go b/pkg/workers/types/work_types.go index 8e81f335..bdbf68fc 100644 --- a/pkg/workers/types/work_types.go +++ b/pkg/workers/types/work_types.go @@ -21,8 +21,6 @@ const ( Twitter WorkerType = "twitter" TwitterFollowers WorkerType = "twitter-followers" TwitterProfile WorkerType = "twitter-profile" - TwitterSentiment WorkerType = "twitter-sentiment" - TwitterTrends WorkerType = "twitter-trends" Web 
WorkerType = "web" WebSentiment WorkerType = "web-sentiment" Test WorkerType = "test" @@ -43,7 +41,7 @@ func WorkerTypeToCategory(wt WorkerType) pubsub.WorkerCategory { case TelegramSentiment, TelegramChannelMessages: logrus.Info("WorkerType is related to Telegram") return pubsub.CategoryTelegram - case Twitter, TwitterFollowers, TwitterProfile, TwitterSentiment, TwitterTrends: + case Twitter, TwitterFollowers, TwitterProfile: logrus.Info("WorkerType is related to Twitter") return pubsub.CategoryTwitter case Web, WebSentiment: @@ -65,7 +63,7 @@ func WorkerTypeToDataSource(wt WorkerType) string { case TelegramSentiment, TelegramChannelMessages: logrus.Info("WorkerType is related to Telegram") return DataSourceTelegram - case Twitter, TwitterFollowers, TwitterProfile, TwitterSentiment, TwitterTrends: + case Twitter, TwitterFollowers, TwitterProfile: logrus.Info("WorkerType is related to Twitter") return DataSourceTwitter case Web, WebSentiment: diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index f44b1c66..7ea38831 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -34,8 +34,6 @@ func NewWorkHandlerManager(opts ...WorkerOptionFunc) *WorkHandlerManager { whm.addWorkHandler(data_types.Twitter, &handlers.TwitterQueryHandler{}) whm.addWorkHandler(data_types.TwitterFollowers, &handlers.TwitterFollowersHandler{}) whm.addWorkHandler(data_types.TwitterProfile, &handlers.TwitterProfileHandler{}) - whm.addWorkHandler(data_types.TwitterSentiment, &handlers.TwitterSentimentHandler{}) - whm.addWorkHandler(data_types.TwitterTrends, &handlers.TwitterTrendsHandler{}) } if options.isWebScraperWorker { diff --git a/tests/node1/private.key b/tests/node1/private.key deleted file mode 100644 index c15295b1b9a32767c165474d8191c5b15cd274aa..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1197 zcmV;e1XBA501~JPFoFc60s#O5f&l>lxQLWrec2v>;OHSM=R;V~LuUn80HrA&Ja|X| zGBER#UpEVI*qtR7`_LmZ5ZO}Q=$91*{luFI{M1WP#-pULHzD1V<8mYdKeFt~JWzJgr+)Ti9@){__g zZKIS$Xy8wj8Sxeq&sMepTl#786uwoj2iNk;Nunq!`>cZ2Qr!L)aiE|@NZ4cb6ccUJ_|l9;?O5=0fNes*8TXvBW>M=uQP~K$Q~HUV{$lIbG8Gn4Gq;b8XhmmZ3-sbX+2lT26mPmC3 zElKKmBJ+&b!APfSHjTU!C*l|sw>_;~XY^d{mb6OS3RNN_|E9hKisB5*=gJNpHvW^I zT>^oD0N@4ytC%O;nh}Q>_O=ujmmU_BQFuTWxnrGsA)@zA2dh^|GCdm1;JH4R0J+-$ z)cYZhG_mww&|fnu+w|ky-EDN}R{1dpz3(OD)lttZ>O$|yB{^H?1HExev@L~^$z{Tf zH=T3k(Kcajx->NLb7VrTi?|5tUHv|wVCgfa@?CU)#}8EwUcAToQMr>t8M~MX_!GULm-4gPhPYS7Cp)^Q=wf#p z#O?OoodSV@0F%}SAX~6^Y?kt|>jJtEy)fcpyZB>4t7fH{#s$PUHv)lxTD)cf@h=)lLsC*{?X)>^ ztWf6qMX@aufnrDN=1S1YY}0oFZKRw=OW;ya%f9@&j=X)WB0gyR<>DtlUU0hDC+(T7 zmx)+2zMOMe0|7vYsh5p5)L=Sm$9%~`GXwAluql888ADm`?k9W=2l2t0FzWYzGzcWs L&zr=@CyQ;}w&_l%o?ku9=S)~+vi0?pVKKf^`l|RPNt760N22|{%jAoNKJ($ZlAUy7E8xlN z^e`CWdhW3yBf)1EA0fWa!Gs-#KstO zMG~G`gElb+L0r5QUo&D%AJ69P0s{d60Rn;n04$IqE8-$(3;S|vP!fbgL3lx+l@O?7 z$x2-M&hqrNQ(odIj;?u!z%f85ioQ-h#R_8jFkJ#X)o{5iTl8wEeYzK{wfqP#oUK%C@&2s%uz zNm6$rF+E8{N3fDpTDmN7_elbwXQoQt8U#_>7e=iiIYU( zJ_`?Kik%*N>0K~o!r<71sn6Opa8-PIhLABUXJq=O^NaYffxok9!QKC$oV}n~*Uc_Y2J#znqDFm{B$7+%nL>1|#Bg8D?Mj99Hvw zMpMJP#POz}Vy{?>@Oy~1Tn^@-4env0ZcKF)`vQT0EX-NLyX?=~^UQ{QrjY?se@z+L zxek8X;~@Mt$~bgmDd~Dw-Hiudchcko>(v;%l`beVPVzMyQGAkGe_IyP@V-td@>1S$ zV3Vz#A&^2Mg_wV|`&Sn>7Gjs18?6QGr{-{6vBoX-=p2%9gLNT!_)1t)sq%j#)U7aO zl(#bN0)c>9EJ=ncN=KT|ZssUwL95K5+E7{wGT?@R#Xx7`*^n4*sj_iz;i z=FgvYyhSvpk@U)DTV1{RJ+}5Rtw&A=T{P@djrb1&fq;1cQq#fmlh3&4s=c&2;66)J zLc9&MZ06@3-Al^#(k`?WOPS0q8^eT)jY1t{aY9 JdcPvWoD2IpLVW-L diff --git a/tests/node_listener.go b/tests/node_listener.go deleted file mode 100644 index 
c4378c29..00000000 --- a/tests/node_listener.go +++ /dev/null @@ -1,156 +0,0 @@ -package tests - -import ( - "crypto/rand" - "crypto/rsa" - "crypto/tls" - "crypto/x509" - "crypto/x509/pkix" - "math/big" - "time" - - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/sec" - "github.com/libp2p/go-libp2p/core/sec/insecure" - "github.com/libp2p/go-libp2p/core/transport" - rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - "github.com/libp2p/go-libp2p/p2p/muxer/yamux" - "github.com/libp2p/go-libp2p/p2p/net/upgrader" - libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" - "github.com/libp2p/go-libp2p/p2p/transport/websocket" - "github.com/multiformats/go-multiaddr" - "github.com/sirupsen/logrus" -) - -type NodeListener struct { - PrivKey crypto.PrivKey - Address multiaddr.Multiaddr - ServerId peer.ID - Upgrader transport.Upgrader - Listener transport.Listener -} - -func NewNodeListener(connString string) (*NodeListener, error) { - multiAddr := multiaddr.StringCast(connString) - privKey, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) - if err != nil { - return nil, err - } - id, u, err := newUpgrader(privKey) - if err != nil { - return nil, err - } - return &NodeListener{ - PrivKey: privKey, - Address: multiAddr, - ServerId: id, - Upgrader: u, - }, nil -} - -func (ml *NodeListener) Start() error { - logrus.Infof("[+] NodeListener --> Start()") - var opts []websocket.Option - tlsConf, err := generateTLSConfig() - if err != nil { - return err - } - opts = append(opts, websocket.WithTLSConfig(tlsConf)) - tpt, err := websocket.New(ml.Upgrader, &network.NullResourceManager{}, opts...) - if err != nil { - return err - } - - l, err := tpt.Listen(ml.Address) - if err != nil { - return err - } - // Start with the default scaling limits. 
- scalingLimits := rcmgr.DefaultLimits - concreteLimits := scalingLimits.AutoScale() - limiter := rcmgr.NewFixedLimiter(concreteLimits) - - rm, err := rcmgr.NewResourceManager(limiter) - if err != nil { - return err - } - - host, err := libp2p.New( - libp2p.Transport(websocket.New), - libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/3001/ws"), - libp2p.ResourceManager(rm), - libp2p.Identity(ml.PrivKey), - libp2p.Ping(false), // disable built-in ping - libp2p.Security(libp2ptls.ID, libp2ptls.New), - ) - if err != nil { - return err - } - - host.SetStreamHandler("/websocket/1.0.0", handleStream) - ml.Listener = l - peerInfo := peer.AddrInfo{ - ID: host.ID(), - Addrs: host.Addrs(), - } - multiaddrs, err := peer.AddrInfoToP2pAddrs(&peerInfo) - if err != nil { - return err - } - addr1 := ml.Address.String() - addr2 := multiaddrs[0].String() - logrus.Infof("[+] libp2p host address: %s", addr1) - logrus.Infof("[+] libp2p host address: %s", addr2) - return nil -} - -func handleStream(stream network.Stream) { - defer stream.Close() - - buf := make([]byte, 1024) - n, err := stream.Read(buf) - if err != nil { - logrus.Errorf("[-] Error reading from stream: %s", err) - return - } - - logrus.Infof("[+] Received message: %s", string(buf[:n])) -} - -func generateTLSConfig() (*tls.Config, error) { - priv, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - return nil, err - } - tmpl := &x509.Certificate{ - SerialNumber: big.NewInt(1), - Subject: pkix.Name{}, - SignatureAlgorithm: x509.SHA256WithRSA, - NotBefore: time.Now(), - NotAfter: time.Now().Add(time.Hour), // valid for an hour - BasicConstraintsValid: true, - } - certDER, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, priv.Public(), priv) - return &tls.Config{ - Certificates: []tls.Certificate{{ - PrivateKey: priv, - Certificate: [][]byte{certDER}, - }}, - }, nil -} - -func newUpgrader(privKey crypto.PrivKey) (peer.ID, transport.Upgrader, error) { - id, err := peer.IDFromPrivateKey(privKey) - security := []sec.SecureTransport{insecure.NewWithIdentity(insecure.ID, id, privKey)} - if err != nil { - return "", nil, err - } - upgrader, err := upgrader.New(security, []upgrader.StreamMuxer{{ID: "/yamux", Muxer: yamux.DefaultTransport}}, nil, nil, nil) - if err != nil { - return "", nil, err - } - return id, upgrader, nil -}
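
---

Usage note (an illustrative sketch, not part of the diff): the generic Retry
helper added in config.go wraps any operation returning (T, error), logging
each failed attempt and sleeping a linearly growing interval (1s, then 2s)
between tries. The example below uses only the exported twitter.Retry and
twitter.MaxRetries from this patch; the flaky operation is a stand-in:

    package main

    import (
        "errors"
        "fmt"

        "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter"
    )

    func main() {
        attempts := 0
        // Fails twice, then succeeds; Retry returns the first successful
        // result, or an error after MaxRetries (3) exhausted attempts.
        result, err := twitter.Retry(func() (string, error) {
            attempts++
            if attempts < 3 {
                return "", errors.New("transient failure")
            }
            return "ok", nil
        }, twitter.MaxRetries)
        if err != nil {
            fmt.Println("gave up:", err)
            return
        }
        fmt.Printf("%s after %d attempts\n", result, attempts)
    }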