From 6ced8b5d1b63a7501d9023bbd64c4920d161a304 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:10:02 -0700 Subject: [PATCH 01/16] chore: cleanup and delete old tests --- tests/node1/private.key | Bin 1197 -> 0 bytes tests/node2/private.key | Bin 1195 -> 0 bytes tests/node_listener.go | 156 ---------------------------------------- 3 files changed, 156 deletions(-) delete mode 100644 tests/node1/private.key delete mode 100644 tests/node2/private.key delete mode 100644 tests/node_listener.go diff --git a/tests/node1/private.key b/tests/node1/private.key deleted file mode 100644 index c15295b1b9a32767c165474d8191c5b15cd274aa..0000000000000000000000000000000000000000 GIT binary patch literal 0 HcmV?d00001 literal 1197 zcmV;e1XBA501~JPFoFc60s#O5f&l>lxQLWrec2v>;OHSM=R;V~LuUn80HrA&Ja|X| zGBER#UpEVI*qtR7`_LmZ5ZO}Q=$91*{luFI{M1WP#-pULHzD1V<8mYdKeFt~JWzJgr+)Ti9@){__g zZKIS$Xy8wj8Sxeq&sMepTl#786uwoj2iNk;Nunq!`>cZ2Qr!L)aiE|@NZ4cb6ccUJ_|l9;?O5=0fNes*8TXvBW>M=uQP~K$Q~HUV{$lIbG8Gn4Gq;b8XhmmZ3-sbX+2lT26mPmC3 zElKKmBJ+&b!APfSHjTU!C*l|sw>_;~XY^d{mb6OS3RNN_|E9hKisB5*=gJNpHvW^I zT>^oD0N@4ytC%O;nh}Q>_O=ujmmU_BQFuTWxnrGsA)@zA2dh^|GCdm1;JH4R0J+-$ z)cYZhG_mww&|fnu+w|ky-EDN}R{1dpz3(OD)lttZ>O$|yB{^H?1HExev@L~^$z{Tf zH=T3k(Kcajx->NLb7VrTi?|5tUHv|wVCgfa@?CU)#}8EwUcAToQMr>t8M~MX_!GULm-4gPhPYS7Cp)^Q=wf#p z#O?OoodSV@0F%}SAX~6^Y?kt|>jJtEy)fcpyZB>4t7fH{#s$PUHv)lxTD)cf@h=)lLsC*{?X)>^ ztWf6qMX@aufnrDN=1S1YY}0oFZKRw=OW;ya%f9@&j=X)WB0gyR<>DtlUU0hDC+(T7 zmx)+2zMOMe0|7vYsh5p5)L=Sm$9%~`GXwAluql888ADm`?k9W=2l2t0FzWYzGzcWs L&zr=@CyQ;}w&_l%o?ku9=S)~+vi0?pVKKf^`l|RPNt760N22|{%jAoNKJ($ZlAUy7E8xlN z^e`CWdhW3yBf)1EA0fWa!Gs-#KstO zMG~G`gElb+L0r5QUo&D%AJ69P0s{d60Rn;n04$IqE8-$(3;S|vP!fbgL3lx+l@O?7 z$x2-M&hqrNQ(odIj;?u!z%f85ioQ-h#R_8jFkJ#X)o{5iTl8wEeYzK{wfqP#oUK%C@&2s%uz zNm6$rF+E8{N3fDpTDmN7_elbwXQoQt8U#_>7e=iiIYU( zJ_`?Kik%*N>0K~o!r<71sn6Opa8-PIhLABUXJq=O^NaYffxok9!QKC$oV}n~*Uc_Y2J#znqDFm{B$7+%nL>1|#Bg8D?Mj99Hvw zMpMJP#POz}Vy{?>@Oy~1Tn^@-4env0ZcKF)`vQT0EX-NLyX?=~^UQ{QrjY?se@z+L zxek8X;~@Mt$~bgmDd~Dw-Hiudchcko>(v;%l`beVPVzMyQGAkGe_IyP@V-td@>1S$ zV3Vz#A&^2Mg_wV|`&Sn>7Gjs18?6QGr{-{6vBoX-=p2%9gLNT!_)1t)sq%j#)U7aO zl(#bN0)c>9EJ=ncN=KT|ZssUwL95K5+E7{wGT?@R#Xx7`*^n4*sj_iz;i z=FgvYyhSvpk@U)DTV1{RJ+}5Rtw&A=T{P@djrb1&fq;1cQq#fmlh3&4s=c&2;66)J zLc9&MZ06@3-Al^#(k`?WOPS0q8^eT)jY1t{aY9 JdcPvWoD2IpLVW-L diff --git a/tests/node_listener.go b/tests/node_listener.go deleted file mode 100644 index c4378c29..00000000 --- a/tests/node_listener.go +++ /dev/null @@ -1,156 +0,0 @@ -package tests - -import ( - "crypto/rand" - "crypto/rsa" - "crypto/tls" - "crypto/x509" - "crypto/x509/pkix" - "math/big" - "time" - - "github.com/libp2p/go-libp2p" - "github.com/libp2p/go-libp2p/core/crypto" - "github.com/libp2p/go-libp2p/core/network" - "github.com/libp2p/go-libp2p/core/peer" - "github.com/libp2p/go-libp2p/core/sec" - "github.com/libp2p/go-libp2p/core/sec/insecure" - "github.com/libp2p/go-libp2p/core/transport" - rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" - "github.com/libp2p/go-libp2p/p2p/muxer/yamux" - "github.com/libp2p/go-libp2p/p2p/net/upgrader" - libp2ptls "github.com/libp2p/go-libp2p/p2p/security/tls" - "github.com/libp2p/go-libp2p/p2p/transport/websocket" - "github.com/multiformats/go-multiaddr" - "github.com/sirupsen/logrus" -) - -type NodeListener struct { - PrivKey crypto.PrivKey - Address multiaddr.Multiaddr - ServerId peer.ID - Upgrader transport.Upgrader - Listener transport.Listener -} - -func NewNodeListener(connString string) (*NodeListener, error) { - multiAddr := multiaddr.StringCast(connString) - privKey, _, err := crypto.GenerateKeyPair(crypto.RSA, 2048) - if err != nil { - return nil, err - } - id, u, err := newUpgrader(privKey) - if err != nil { - return nil, err - } - return &NodeListener{ - PrivKey: privKey, - Address: multiAddr, - ServerId: id, - Upgrader: u, - }, nil -} - -func (ml *NodeListener) Start() error { - logrus.Infof("[+] NodeListener --> Start()") - var opts []websocket.Option - tlsConf, err := generateTLSConfig() - if err != nil { - return err - } - opts = append(opts, websocket.WithTLSConfig(tlsConf)) - tpt, err := websocket.New(ml.Upgrader, &network.NullResourceManager{}, opts...) - if err != nil { - return err - } - - l, err := tpt.Listen(ml.Address) - if err != nil { - return err - } - // Start with the default scaling limits. - scalingLimits := rcmgr.DefaultLimits - concreteLimits := scalingLimits.AutoScale() - limiter := rcmgr.NewFixedLimiter(concreteLimits) - - rm, err := rcmgr.NewResourceManager(limiter) - if err != nil { - return err - } - - host, err := libp2p.New( - libp2p.Transport(websocket.New), - libp2p.ListenAddrStrings("/ip4/0.0.0.0/tcp/3001/ws"), - libp2p.ResourceManager(rm), - libp2p.Identity(ml.PrivKey), - libp2p.Ping(false), // disable built-in ping - libp2p.Security(libp2ptls.ID, libp2ptls.New), - ) - if err != nil { - return err - } - - host.SetStreamHandler("/websocket/1.0.0", handleStream) - ml.Listener = l - peerInfo := peer.AddrInfo{ - ID: host.ID(), - Addrs: host.Addrs(), - } - multiaddrs, err := peer.AddrInfoToP2pAddrs(&peerInfo) - if err != nil { - return err - } - addr1 := ml.Address.String() - addr2 := multiaddrs[0].String() - logrus.Infof("[+] libp2p host address: %s", addr1) - logrus.Infof("[+] libp2p host address: %s", addr2) - return nil -} - -func handleStream(stream network.Stream) { - defer stream.Close() - - buf := make([]byte, 1024) - n, err := stream.Read(buf) - if err != nil { - logrus.Errorf("[-] Error reading from stream: %s", err) - return - } - - logrus.Infof("[+] Received message: %s", string(buf[:n])) -} - -func generateTLSConfig() (*tls.Config, error) { - priv, err := rsa.GenerateKey(rand.Reader, 2048) - if err != nil { - return nil, err - } - tmpl := &x509.Certificate{ - SerialNumber: big.NewInt(1), - Subject: pkix.Name{}, - SignatureAlgorithm: x509.SHA256WithRSA, - NotBefore: time.Now(), - NotAfter: time.Now().Add(time.Hour), // valid for an hour - BasicConstraintsValid: true, - } - certDER, err := x509.CreateCertificate(rand.Reader, tmpl, tmpl, priv.Public(), priv) - return &tls.Config{ - Certificates: []tls.Certificate{{ - PrivateKey: priv, - Certificate: [][]byte{certDER}, - }}, - }, nil -} - -func newUpgrader(privKey crypto.PrivKey) (peer.ID, transport.Upgrader, error) { - id, err := peer.IDFromPrivateKey(privKey) - security := []sec.SecureTransport{insecure.NewWithIdentity(insecure.ID, id, privKey)} - if err != nil { - return "", nil, err - } - upgrader, err := upgrader.New(security, []upgrader.StreamMuxer{{ID: "/yamux", Muxer: yamux.DefaultTransport}}, nil, nil, nil) - if err != nil { - return "", nil, err - } - return id, upgrader, nil -} From 0cd796a2c984f9be93c789312ddfe8bc64180c24 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:11:17 -0700 Subject: [PATCH 02/16] chore: delete old tests --- pkg/tests/api_test.go | 25 --- pkg/tests/auth_test.go | 56 ------- pkg/tests/chain_test.go | 18 --- pkg/tests/scrape_test.go | 317 --------------------------------------- 4 files changed, 416 deletions(-) delete mode 100644 pkg/tests/api_test.go delete mode 100644 pkg/tests/auth_test.go delete mode 100644 pkg/tests/chain_test.go delete mode 100644 pkg/tests/scrape_test.go diff --git a/pkg/tests/api_test.go b/pkg/tests/api_test.go deleted file mode 100644 index 4784fd1a..00000000 --- a/pkg/tests/api_test.go +++ /dev/null @@ -1,25 +0,0 @@ -package tests - -import ( - "testing" - - "github.com/masa-finance/masa-oracle/node" - "github.com/masa-finance/masa-oracle/pkg/api" - "github.com/masa-finance/masa-oracle/pkg/pubsub" - "github.com/masa-finance/masa-oracle/pkg/workers" -) - -func TestAPI(t *testing.T) { - // Create a new OracleNode instance - n := &node.OracleNode{} - whm := &workers.WorkHandlerManager{} - pubKeySub := &pubsub.PublicKeySubscriptionHandler{} - - // Initialize the API - api := api.NewAPI(n, whm, pubKeySub) - - // Test API initialization - if api == nil { - t.Fatal("Failed to initialize API") - } -} diff --git a/pkg/tests/auth_test.go b/pkg/tests/auth_test.go deleted file mode 100644 index b598e4ec..00000000 --- a/pkg/tests/auth_test.go +++ /dev/null @@ -1,56 +0,0 @@ -package tests - -import ( - "os" - "testing" - - "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" - twitterscraper "github.com/masa-finance/masa-twitter-scraper" - "github.com/sirupsen/logrus" -) - -func TestAuth(t *testing.T) { - logrus.SetLevel(logrus.DebugLevel) - scraper := twitterscraper.New() - username := "" - password := "" - logrus.WithFields(logrus.Fields{"username": username}).Debug("Attempting to login") - - // Attempt to retrieve a 2FA code from environment variables - twoFACode := os.Getenv("TWITTER_2FA_CODE") - if twoFACode != "" { - logrus.WithField("2FA", "provided").Debug("2FA code is provided, attempting login with 2FA") - } else { - logrus.Debug("No 2FA code provided, attempting basic login") - } - - var err error - if twoFACode != "" { - // If a 2FA code is provided, use it for login - err = twitter.Login(scraper, username, password, twoFACode) - } else { - // Otherwise, proceed with basic login - err = twitter.Login(scraper, username, password) - } - - if err != nil { - logrus.WithError(err).Warning("[-] Login failed") - } else { - logrus.Debug("[+] Login successful") - } - - // Optionally, check if logged in - if twitter.IsLoggedIn(scraper) { - logrus.Debug("[+] Confirmed logged in.") - } else { - logrus.Debug("[-] Not logged in.") - } - - // Don't forget to logout after testing - err = twitter.Logout(scraper) - if err != nil { - logrus.WithError(err).Warn("[-] Logout failed") - } else { - logrus.Debug("[+] Logged out successfully.") - } -} diff --git a/pkg/tests/chain_test.go b/pkg/tests/chain_test.go deleted file mode 100644 index 5302ecc4..00000000 --- a/pkg/tests/chain_test.go +++ /dev/null @@ -1,18 +0,0 @@ -package tests - -import ( - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" -) - -// MockChain is a mock implementation of the Chain interface -type MockChain struct { - mock.Mock -} - -func TestGetLatestBlockNumber(t *testing.T) { - - assert.NotNil(t, 1) -} diff --git a/pkg/tests/scrape_test.go b/pkg/tests/scrape_test.go deleted file mode 100644 index 66650b19..00000000 --- a/pkg/tests/scrape_test.go +++ /dev/null @@ -1,317 +0,0 @@ -// go test -v ./pkg/tests -run TestScrapeTweetsByQuery -// export TWITTER_2FA_CODE="873855" -package tests - -import ( - "encoding/csv" - "encoding/json" - "fmt" - "net/http" - "net/http/httptest" - "os" - "path/filepath" - "runtime" - "testing" - "time" - - "github.com/joho/godotenv" - "github.com/masa-finance/masa-oracle/pkg/llmbridge" - twitterscraper "github.com/masa-finance/masa-twitter-scraper" - "github.com/sirupsen/logrus" - "github.com/stretchr/testify/assert" - - "github.com/masa-finance/masa-oracle/pkg/config" - "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" -) - -// Global scraper instance -var scraper *twitterscraper.Scraper - -func setup() { - var err error - _, b, _, _ := runtime.Caller(0) - rootDir := filepath.Join(filepath.Dir(b), "../..") - if _, _ = os.Stat(rootDir + "/.env"); !os.IsNotExist(err) { - _ = godotenv.Load() - } - - logrus.SetLevel(logrus.DebugLevel) - if scraper == nil { - scraper = twitterscraper.New() - } - - // Use GetInstance from config to access MasaDir - appConfig := config.GetInstance() - - // Construct the cookie file path using MasaDir from AppConfig - cookieFilePath := filepath.Join(appConfig.MasaDir, "twitter_cookies.json") - - // Attempt to load cookies - if err = twitter.LoadCookies(scraper, cookieFilePath); err == nil { - logrus.Debug("Cookies loaded successfully.") - if twitter.IsLoggedIn(scraper) { - logrus.Debug("Already logged in via cookies.") - return - } - } - - // If cookies are not valid or do not exist, proceed with login - username := appConfig.TwitterUsername - password := appConfig.TwitterPassword - logrus.WithFields(logrus.Fields{"username": username, "password": password}).Debug("Attempting to login") - - twoFACode := appConfig.Twitter2FaCode - - if twoFACode != "" { - logrus.WithField("2FA", "provided").Debug("2FA code is provided, attempting login with 2FA") - err = twitter.Login(scraper, username, password, twoFACode) - } else { - logrus.Debug("No 2FA code provided, attempting basic login") - err = twitter.Login(scraper, username, password) - } - - if err != nil { - logrus.WithError(err).Warning("[-] Login failed") - return - } - - // Save cookies after successful login - if err = twitter.SaveCookies(scraper, cookieFilePath); err != nil { - logrus.WithError(err).Error("[-] Failed to save cookies") - return - } - - logrus.Debug("[+] Login successful") -} - -func scrapeTweets(outputFile string) error { - // Implement the tweet scraping logic here - // This function should: - // 1. Make API calls to the MASA_NODE_URL - // 2. Process the responses - // 3. Write the tweets to the outputFile in CSV format - // 4. Handle rate limiting and retries - // 5. Return an error if something goes wrong - - // For now, we'll just create a dummy file - file, err := os.Create(outputFile) - if err != nil { - return fmt.Errorf("error creating file: %v", err) - } - defer file.Close() - - writer := csv.NewWriter(file) - defer writer.Flush() - - writer.Write([]string{"tweet", "datetime"}) - writer.Write([]string{"Test tweet #1", time.Now().Format(time.RFC3339)}) - writer.Write([]string{"Test tweet #2", time.Now().Format(time.RFC3339)}) - - return nil -} - -func TestSetup(t *testing.T) { - setup() -} - -func TestScrapeTweetsWithSentimentByQuery(t *testing.T) { - // Ensure setup is done before running the test - setup() - - query := "$MASA Token Masa" - count := 100 - tweets, err := twitter.ScrapeTweetsByQuery(query, count) - if err != nil { - logrus.WithError(err).Error("[-] Failed to scrape tweets") - return - } - - // Serialize the tweets data to JSON - tweetsData, err := json.Marshal(tweets) - if err != nil { - logrus.WithError(err).Error("[-] Failed to serialize tweets data") - return - } - - // Write the serialized data to a file - filePath := "scraped_tweets.json" - err = os.WriteFile(filePath, tweetsData, 0644) - if err != nil { - logrus.WithError(err).Error("[-] Failed to write tweets data to file") - return - } - logrus.WithField("file", filePath).Debug("[+] Tweets data written to file successfully.") - - // Read the serialized data from the file - fileData, err := os.ReadFile(filePath) - if err != nil { - logrus.WithError(err).Error("[-] Failed to read tweets data from file") - return - } - - // Correctly declare a new variable for the deserialized data - var deserializedTweets []*twitterscraper.Tweet - err = json.Unmarshal(fileData, &deserializedTweets) - if err != nil { - logrus.WithError(err).Error("[-] Failed to deserialize tweets data") - return - } - - // Now, deserializedTweets contains the tweets loaded from the file - // Send the tweets data to Claude for sentiment analysis - twitterScraperTweets := make([]*twitterscraper.TweetResult, len(deserializedTweets)) - for i, tweet := range deserializedTweets { - twitterScraperTweets[i] = &twitterscraper.TweetResult{ - Tweet: *tweet, - Error: nil, - } - } - sentimentRequest, sentimentSummary, err := llmbridge.AnalyzeSentimentTweets(twitterScraperTweets, "claude-3-opus-20240229", "Please perform a sentiment analysis on the following tweets, using an unbiased approach. Sentiment analysis involves identifying and categorizing opinions expressed in text, particularly to determine whether the writer's attitude towards a particular topic, product, etc., is positive, negative, or neutral. After analyzing, please provide a summary of the overall sentiment expressed in these tweets, including the proportion of positive, negative, and neutral sentiments if applicable.") - if err != nil { - logrus.WithError(err).Error("[-] Failed to analyze sentiment") - err = os.Remove(filePath) - if err != nil { - logrus.WithError(err).Error("[-] Failed to delete the temporary file") - } else { - logrus.WithField("file", filePath).Debug("[+] Temporary file deleted successfully") - } - return - } - logrus.WithFields(logrus.Fields{ - "sentimentRequest": sentimentRequest, - "sentimentSummary": sentimentSummary, - }).Debug("[+] Sentiment analysis completed successfully.") - - // Delete the created file after the test - err = os.Remove(filePath) - if err != nil { - logrus.WithError(err).Error("[-] Failed to delete the temporary file") - } else { - logrus.WithField("file", filePath).Debug("[+] Temporary file deleted successfully") - } -} - -func TestScrapeTweetsWithMockServer(t *testing.T) { - setup() - - // Mock server - server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { - if r.Method != "POST" { - t.Errorf("Expected POST request, got %s", r.Method) - } - - var requestBody map[string]interface{} - json.NewDecoder(r.Body).Decode(&requestBody) - - if query, ok := requestBody["query"].(string); !ok || query == "" { - t.Errorf("Expected query in request body") - } - - response := map[string]interface{}{ - "data": []map[string]interface{}{ - { - "Text": "Test tweet #1", - "Timestamp": time.Now().Unix(), - }, - { - "Text": "Test tweet #2", - "Timestamp": time.Now().Unix(), - }, - }, - } - - json.NewEncoder(w).Encode(response) - })) - defer server.Close() - - // Set up test environment - os.Setenv("MASA_NODE_URL", server.URL) - outputFile := "test_tweets.csv" - defer os.Remove(outputFile) - - // Run the scrape function (you'll need to implement this) - err := scrapeTweets(outputFile) - if err != nil { - t.Fatalf("Error scraping tweets: %v", err) - } - - // Verify the output - file, err := os.Open(outputFile) - if err != nil { - t.Fatalf("Error opening output file: %v", err) - } - defer file.Close() - - reader := csv.NewReader(file) - records, err := reader.ReadAll() - if err != nil { - t.Fatalf("Error reading CSV: %v", err) - } - - if len(records) != 3 { // Header + 2 tweets - t.Errorf("Expected 3 records, got %d", len(records)) - } - - if records[0][0] != "tweet" || records[0][1] != "datetime" { - t.Errorf("Unexpected header: %v", records[0]) - } - - for i, record := range records[1:] { - if record[0] == "" || record[1] == "" { - t.Errorf("Empty field in record %d: %v", i+1, record) - } - _, err := time.Parse(time.RFC3339, record[1]) - if err != nil { - t.Errorf("Invalid datetime format in record %d: %v", i+1, record[1]) - } - } -} - -func TestScrapeTweets(t *testing.T) { - setup() - - query := "Sadhguru" - count := 10 - - for i := 0; i < 100; i++ { - tweets, err := twitter.ScrapeTweetsByQuery(query, count) - if err != nil { - logrus.WithError(err).Error("[-] Failed to scrape tweets") - return - } - - var validTweets []*twitter.TweetResult - for _, tweet := range tweets { - if tweet.Error != nil { - logrus.WithError(tweet.Error).Warn("[-] Error in tweet") - continue - } - validTweets = append(validTweets, tweet) - } - - tweetsData, err := json.Marshal(validTweets) - if err != nil { - logrus.WithError(err).Error("[-] Failed to serialize tweets data") - return - } - - logrus.WithFields(logrus.Fields{ - "total_tweets": len(tweets), - "valid_tweets": len(validTweets), - "tweets_sample": string(tweetsData[:min(100, len(tweetsData))]), - }).Debug("[+] Tweets data") - - if len(tweetsData) > 0 { - assert.NotNil(t, tweetsData[:min(10, len(tweetsData))]) - } else { - assert.Nil(t, tweetsData) - } - } -} - -func min(a, b int) int { - if a < b { - return a - } - return b -} From 413ae8063c45ae6230bd2e5380b2a9b6da217adc Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 1 Oct 2024 11:19:04 -0700 Subject: [PATCH 03/16] refactor(twitter): remove sentiment and trends handlers - Delete TwitterSentimentHandler and TwitterTrendsHandler structs - Remove corresponding HandleWork functions for sentiment and trends - Update WorkerType constants and related functions to exclude sentiment and trends - Adjust WorkHandlerManager initialization to remove sentiment and trends handlers --- pkg/tests/scrapers/twitter_test.go | 0 pkg/workers/handlers/twitter.go | 29 ----------------------------- pkg/workers/types/work_types.go | 6 ++---- pkg/workers/worker_manager.go | 2 -- 4 files changed, 2 insertions(+), 35 deletions(-) create mode 100644 pkg/tests/scrapers/twitter_test.go diff --git a/pkg/tests/scrapers/twitter_test.go b/pkg/tests/scrapers/twitter_test.go new file mode 100644 index 00000000..e69de29b diff --git a/pkg/workers/handlers/twitter.go b/pkg/workers/handlers/twitter.go index 72d164ef..bd4cc00e 100644 --- a/pkg/workers/handlers/twitter.go +++ b/pkg/workers/handlers/twitter.go @@ -12,8 +12,6 @@ import ( type TwitterQueryHandler struct{} type TwitterFollowersHandler struct{} type TwitterProfileHandler struct{} -type TwitterSentimentHandler struct{} -type TwitterTrendsHandler struct{} func (h *TwitterQueryHandler) HandleWork(data []byte) data_types.WorkResponse { logrus.Infof("[+] TwitterQueryHandler input: %s", data) @@ -73,30 +71,3 @@ func (h *TwitterProfileHandler) HandleWork(data []byte) data_types.WorkResponse logrus.Infof("[+] TwitterProfileHandler Work response for %s: %d records returned", data_types.TwitterProfile, 1) return data_types.WorkResponse{Data: resp, RecordCount: 1} } - -func (h *TwitterSentimentHandler) HandleWork(data []byte) data_types.WorkResponse { - logrus.Infof("[+] TwitterSentimentHandler %s", data) - dataMap, err := JsonBytesToMap(data) - if err != nil { - return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse twitter sentiment data: %v", err)} - } - count := int(dataMap["count"].(float64)) - query := dataMap["query"].(string) - model := dataMap["model"].(string) - _, resp, err := twitter.ScrapeTweetsForSentiment(query, count, model) - if err != nil { - return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter sentiment: %v", err)} - } - logrus.Infof("[+] TwitterSentimentHandler Work response for %s: %d records returned", data_types.TwitterSentiment, 1) - return data_types.WorkResponse{Data: resp, RecordCount: 1} -} - -func (h *TwitterTrendsHandler) HandleWork(data []byte) data_types.WorkResponse { - logrus.Infof("[+] TwitterTrendsHandler %s", data) - resp, err := twitter.ScrapeTweetsByTrends() - if err != nil { - return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter trends: %v", err)} - } - logrus.Infof("[+] TwitterTrendsHandler Work response for %s: %d records returned", data_types.TwitterTrends, len(resp)) - return data_types.WorkResponse{Data: resp, RecordCount: len(resp)} -} diff --git a/pkg/workers/types/work_types.go b/pkg/workers/types/work_types.go index 8e81f335..bdbf68fc 100644 --- a/pkg/workers/types/work_types.go +++ b/pkg/workers/types/work_types.go @@ -21,8 +21,6 @@ const ( Twitter WorkerType = "twitter" TwitterFollowers WorkerType = "twitter-followers" TwitterProfile WorkerType = "twitter-profile" - TwitterSentiment WorkerType = "twitter-sentiment" - TwitterTrends WorkerType = "twitter-trends" Web WorkerType = "web" WebSentiment WorkerType = "web-sentiment" Test WorkerType = "test" @@ -43,7 +41,7 @@ func WorkerTypeToCategory(wt WorkerType) pubsub.WorkerCategory { case TelegramSentiment, TelegramChannelMessages: logrus.Info("WorkerType is related to Telegram") return pubsub.CategoryTelegram - case Twitter, TwitterFollowers, TwitterProfile, TwitterSentiment, TwitterTrends: + case Twitter, TwitterFollowers, TwitterProfile: logrus.Info("WorkerType is related to Twitter") return pubsub.CategoryTwitter case Web, WebSentiment: @@ -65,7 +63,7 @@ func WorkerTypeToDataSource(wt WorkerType) string { case TelegramSentiment, TelegramChannelMessages: logrus.Info("WorkerType is related to Telegram") return DataSourceTelegram - case Twitter, TwitterFollowers, TwitterProfile, TwitterSentiment, TwitterTrends: + case Twitter, TwitterFollowers, TwitterProfile: logrus.Info("WorkerType is related to Twitter") return DataSourceTwitter case Web, WebSentiment: diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index f44b1c66..7ea38831 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -34,8 +34,6 @@ func NewWorkHandlerManager(opts ...WorkerOptionFunc) *WorkHandlerManager { whm.addWorkHandler(data_types.Twitter, &handlers.TwitterQueryHandler{}) whm.addWorkHandler(data_types.TwitterFollowers, &handlers.TwitterFollowersHandler{}) whm.addWorkHandler(data_types.TwitterProfile, &handlers.TwitterProfileHandler{}) - whm.addWorkHandler(data_types.TwitterSentiment, &handlers.TwitterSentimentHandler{}) - whm.addWorkHandler(data_types.TwitterTrends, &handlers.TwitterTrendsHandler{}) } if options.isWebScraperWorker { From f5c5b2c750abd629c128e93fe27ea412f3491fc3 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 1 Oct 2024 12:07:01 -0700 Subject: [PATCH 04/16] refactor(twitter): export Auth function, update scrapers, and remove objx dependency - **Exported Auth Function:** - Renamed `auth` to `Auth` in `tweets.go` to make it publicly accessible. - Updated all scraper files (e.g., `followers.go`, `tweets.go`) to use the exported `Auth` function. - **Removed Unused Dependency:** - Eliminated `github.com/stretchr/objx` from `go.mod` as it was no longer needed. - **Optimized Sleep Durations:** - Reduced sleep durations in the `Auth` function from `500ms` to `100ms` for better performance. - **Cleaned Up Codebase:** - Removed obsolete sentiment analysis code from `tweets.go` to streamline the codebase. - **Enhanced Test Configuration:** - Fixed environment variable loading in `twitter_auth_test.go` by ensuring `.env` is correctly loaded via `scrapers_suite_test.go`. - Added and updated tests in `twitter_auth_test.go` and `scrapers_suite_test.go` to validate Twitter authentication and session reuse. --- go.mod | 1 - go.sum | 1 - pkg/scrapers/twitter/followers.go | 2 +- pkg/scrapers/twitter/tweets.go | 66 ++------------------- pkg/tests/scrapers/scrapers_suite_test.go | 55 ++++++++++++++++++ pkg/tests/scrapers/twitter_auth_test.go | 71 +++++++++++++++++++++++ pkg/tests/scrapers/twitter_test.go | 0 7 files changed, 133 insertions(+), 63 deletions(-) create mode 100644 pkg/tests/scrapers/scrapers_suite_test.go create mode 100644 pkg/tests/scrapers/twitter_auth_test.go delete mode 100644 pkg/tests/scrapers/twitter_test.go diff --git a/go.mod b/go.mod index 730e7072..2b5b5bf4 100644 --- a/go.mod +++ b/go.mod @@ -216,7 +216,6 @@ require ( github.com/spaolacci/murmur3 v1.1.0 // indirect github.com/spf13/afero v1.11.0 // indirect github.com/spf13/cast v1.6.0 // indirect - github.com/stretchr/objx v0.5.2 // indirect github.com/subosito/gotenv v1.6.0 // indirect github.com/supranational/blst v0.3.11 // indirect github.com/syndtr/goleveldb v1.0.1-0.20210819022825-2ae1ddf74ef7 // indirect diff --git a/go.sum b/go.sum index faeb6a6f..57701fab 100644 --- a/go.sum +++ b/go.sum @@ -713,7 +713,6 @@ github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+ github.com/stretchr/objx v0.2.0/go.mod h1:qt09Ya8vawLte6SNmTgCsAVtYtaKzEcn8ATUoHMkEqE= github.com/stretchr/objx v0.4.0/go.mod h1:YvHI0jy2hoMjB+UWwv71VJQ9isScKT/TqJzVSSt89Yw= github.com/stretchr/objx v0.5.0/go.mod h1:Yh+to48EsGEfYuaHDzXPcE3xhTkx73EhmCGUpEOglKo= -github.com/stretchr/objx v0.5.2 h1:xuMeJ0Sdp5ZMRXx/aWO6RZxdr3beISkG5/G/aIRr3pY= github.com/stretchr/objx v0.5.2/go.mod h1:FRsXN1f5AsAjCGJKqEizvkpNtU+EGNCLh3NxZ/8L+MA= github.com/stretchr/testify v1.2.2/go.mod h1:a8OnRcib4nhh0OaRAV+Yts87kKdq0PP7pXfy6kDkUVs= github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index 1d54c554..543adc56 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -12,7 +12,7 @@ import ( // ScrapeFollowersForProfile scrapes the profile and tweets of a specific Twitter user. // It takes the username as a parameter and returns the scraped profile information and an error if any. func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Legacy, error) { - scraper := auth() + scraper := Auth() if scraper == nil { return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index a4061095..482e2536 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -13,7 +13,6 @@ import ( "github.com/sirupsen/logrus" "github.com/masa-finance/masa-oracle/pkg/config" - "github.com/masa-finance/masa-oracle/pkg/llmbridge" ) type TweetResult struct { @@ -24,7 +23,7 @@ type TweetResult struct { // auth initializes and returns a new Twitter scraper instance. It attempts to load cookies from a file to reuse an existing session. // If no valid session is found, it performs a login with credentials specified in the application's configuration. // On successful login, it saves the session cookies for future use. If the login fails, it returns nil. -func auth() *twitterscraper.Scraper { +func Auth() *twitterscraper.Scraper { scraper := twitterscraper.New() appConfig := config.GetInstance() cookieFilePath := filepath.Join(appConfig.MasaDir, "twitter_cookies.json") @@ -41,7 +40,7 @@ func auth() *twitterscraper.Scraper { password := appConfig.TwitterPassword twoFACode := appConfig.Twitter2FaCode - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) var err error if twoFACode != "" { @@ -55,7 +54,7 @@ func auth() *twitterscraper.Scraper { return nil } - time.Sleep(500 * time.Millisecond) + time.Sleep(100 * time.Millisecond) if err = SaveCookies(scraper, cookieFilePath); err != nil { logrus.WithError(err).Error("[-] Failed to save cookies") @@ -69,59 +68,6 @@ func auth() *twitterscraper.Scraper { return scraper } -// ScrapeTweetsForSentiment is a function that scrapes tweets based on a given query, analyzes their sentiment using a specified model, and returns the sentiment analysis results. -// Parameters: -// - query: The search query string to find matching tweets. -// - count: The maximum number of tweets to retrieve and analyze. -// - model: The model to use for sentiment analysis. -// -// Returns: -// - A string representing the sentiment analysis prompt. -// - A string representing the sentiment analysis result. -// - An error if the scraping or sentiment analysis process encounters any issues. -func ScrapeTweetsForSentiment(query string, count int, model string) (string, string, error) { - scraper := auth() - var tweets []*TweetResult - - if scraper == nil { - return "", "", fmt.Errorf("there was an error authenticating with your Twitter credentials") - } - - // Set search mode - scraper.SetSearchMode(twitterscraper.SearchLatest) - - // Perform the search with the specified query and count - for tweetResult := range scraper.SearchTweets(context.Background(), query, count) { - var tweet TweetResult - if tweetResult.Error != nil { - tweet = TweetResult{ - Tweet: nil, - Error: tweetResult.Error, - } - } else { - tweet = TweetResult{ - Tweet: &tweetResult.Tweet, - Error: nil, - } - } - tweets = append(tweets, &tweet) - } - sentimentPrompt := "Please perform a sentiment analysis on the following tweets, using an unbiased approach. Sentiment analysis involves identifying and categorizing opinions expressed in text, particularly to determine whether the writer's attitude towards a particular topic, product, etc., is positive, negative, or neutral. After analyzing, please provide a summary of the overall sentiment expressed in these tweets, including the proportion of positive, negative, and neutral sentiments if applicable." - - twitterScraperTweets := make([]*twitterscraper.TweetResult, len(tweets)) - for i, tweet := range tweets { - twitterScraperTweets[i] = &twitterscraper.TweetResult{ - Tweet: *tweet.Tweet, - Error: tweet.Error, - } - } - prompt, sentiment, err := llmbridge.AnalyzeSentimentTweets(twitterScraperTweets, model, sentimentPrompt) - if err != nil { - return "", "", err - } - return prompt, sentiment, tweets[0].Error -} - // ScrapeTweetsByQuery performs a search on Twitter for tweets matching the specified query. // It fetches up to the specified count of tweets and returns a slice of Tweet pointers. // Parameters: @@ -132,7 +78,7 @@ func ScrapeTweetsForSentiment(query string, count int, model string) (string, st // - A slice of pointers to twitterscraper.Tweet objects that match the search query. // - An error if the scraping process encounters any issues. func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { - scraper := auth() + scraper := Auth() var tweets []*TweetResult var lastError error @@ -167,7 +113,7 @@ func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { // It returns a slice of strings representing the trending topics. // If an error occurs during the scraping process, it returns an error. func ScrapeTweetsByTrends() ([]*TweetResult, error) { - scraper := auth() + scraper := Auth() var trendResults []*TweetResult if scraper == nil { @@ -196,7 +142,7 @@ func ScrapeTweetsByTrends() ([]*TweetResult, error) { // ScrapeTweetsProfile scrapes the profile and tweets of a specific Twitter user. // It takes the username as a parameter and returns the scraped profile information and an error if any. func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { - scraper := auth() + scraper := Auth() if scraper == nil { return twitterscraper.Profile{}, fmt.Errorf("there was an error authenticating with your Twitter credentials") diff --git a/pkg/tests/scrapers/scrapers_suite_test.go b/pkg/tests/scrapers/scrapers_suite_test.go new file mode 100644 index 00000000..b27edc56 --- /dev/null +++ b/pkg/tests/scrapers/scrapers_suite_test.go @@ -0,0 +1,55 @@ +package scrapers_test + +import ( + "os" + "path/filepath" + "testing" + + "github.com/joho/godotenv" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" +) + +func TestScrapers(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Scrapers Suite") +} + +func TestMain(m *testing.M) { + // Override os.Args to prevent flag parsing errors + os.Args = []string{os.Args[0]} + + // Get the current working directory + cwd, err := os.Getwd() + if err != nil { + logrus.Fatalf("Failed to get current directory: %v", err) + } + + // Define the project root path + projectRoot := filepath.Join(cwd, "..", "..", "..") + envPath := filepath.Join(projectRoot, ".env") + + // Load the .env file + err = godotenv.Load(envPath) + if err != nil { + logrus.Warnf("Error loading .env file from %s: %v", envPath, err) + } else { + logrus.Info("Loaded .env from project root") + } + + // Verify that the required environment variables are set + requiredEnvVars := []string{"USER_AGENTS", "TWITTER_USERNAME", "TWITTER_PASSWORD"} + for _, envVar := range requiredEnvVars { + value := os.Getenv(envVar) + if value == "" { + logrus.Warnf("%s environment variable is not set", envVar) + } else { + logrus.Debugf("%s: %s", envVar, value) + } + } + + // Run the tests + exitCode := m.Run() + os.Exit(exitCode) +} diff --git a/pkg/tests/scrapers/twitter_auth_test.go b/pkg/tests/scrapers/twitter_auth_test.go new file mode 100644 index 00000000..59095519 --- /dev/null +++ b/pkg/tests/scrapers/twitter_auth_test.go @@ -0,0 +1,71 @@ +package scrapers_test + +import ( + "os" + "path/filepath" + + "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" + twitterscraper "github.com/masa-finance/masa-twitter-scraper" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" + "github.com/sirupsen/logrus" +) + +var _ = Describe("Twitter Auth Function", func() { + var ( + twitterUsername string + twitterPassword string + twoFACode string + ) + + BeforeEach(func() { + tempDir := GinkgoT().TempDir() + config.GetInstance().MasaDir = tempDir + + twitterUsername = os.Getenv("TWITTER_USERNAME") + twitterPassword = os.Getenv("TWITTER_PASSWORD") + twoFACode = os.Getenv("TWITTER_2FA_CODE") + + Expect(twitterUsername).NotTo(BeEmpty(), "TWITTER_USERNAME environment variable is not set") + Expect(twitterPassword).NotTo(BeEmpty(), "TWITTER_PASSWORD environment variable is not set") + + config.GetInstance().TwitterUsername = twitterUsername + config.GetInstance().TwitterPassword = twitterPassword + config.GetInstance().Twitter2FaCode = twoFACode + }) + + authenticate := func() *twitterscraper.Scraper { + return twitter.Auth() + } + + checkLoggedIn := func(scraper *twitterscraper.Scraper) bool { + return twitter.IsLoggedIn(scraper) + } + + It("authenticates and logs in successfully", func() { + scraper := authenticate() + Expect(scraper).NotTo(BeNil()) + + cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + Expect(cookieFile).To(BeAnExistingFile()) + + Expect(checkLoggedIn(scraper)).To(BeTrue()) + logrus.Info("Authenticated and logged in to Twitter") + }) + + It("reuses session from cookies", func() { + firstScraper := authenticate() + Expect(firstScraper).NotTo(BeNil()) + + secondScraper := authenticate() + Expect(secondScraper).NotTo(BeNil()) + + Expect(checkLoggedIn(secondScraper)).To(BeTrue()) + logrus.Info("Reused session from cookies") + }) + + AfterEach(func() { + os.RemoveAll(config.GetInstance().MasaDir) + }) +}) diff --git a/pkg/tests/scrapers/twitter_test.go b/pkg/tests/scrapers/twitter_test.go deleted file mode 100644 index e69de29b..00000000 From 107fc6771cc93d995af3f2c5b2a280f403f3d695 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 1 Oct 2024 13:49:00 -0700 Subject: [PATCH 05/16] chore: delete scrape tweets by trends (deprecated) --- pkg/scrapers/twitter/tweets.go | 30 ------------------------------ 1 file changed, 30 deletions(-) diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index 482e2536..30fed430 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -109,36 +109,6 @@ func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { return tweets, nil } -// ScrapeTweetsByTrends scrapes the current trending topics on Twitter. -// It returns a slice of strings representing the trending topics. -// If an error occurs during the scraping process, it returns an error. -func ScrapeTweetsByTrends() ([]*TweetResult, error) { - scraper := Auth() - var trendResults []*TweetResult - - if scraper == nil { - return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") - } - - // Set search mode - scraper.SetSearchMode(twitterscraper.SearchLatest) - - trends, err := scraper.GetTrends() - if err != nil { - return nil, err - } - - for _, trend := range trends { - trendResult := &TweetResult{ - Tweet: &twitterscraper.Tweet{Text: trend}, - Error: nil, - } - trendResults = append(trendResults, trendResult) - } - - return trendResults, trendResults[0].Error -} - // ScrapeTweetsProfile scrapes the profile and tweets of a specific Twitter user. // It takes the username as a parameter and returns the scraped profile information and an error if any. func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { From 6ca2fdf9caeb9f1137ae8329b88e59726da7a3d4 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 1 Oct 2024 14:03:36 -0700 Subject: [PATCH 06/16] feat(tests): enhance Twitter auth and scraping tests This commit improves the Twitter authentication and scraping tests in the pkg/tests/scrapers/twitter_auth_test.go file. The changes include: - Add godotenv package to load environment variables - Implement a loadEnv function to handle .env file loading - Enhance "authenticates and logs in successfully" test: - Verify cookie file doesn't exist before authentication - Check cookie file creation after authentication - Perform a simple profile scrape to validate the session - Improve "reuses session from cookies" test: - Verify cookie file creation - Force cookie reuse by clearing the first scraper - Validate the reused session with a profile scrape - Add new test "scrapes the profile of 'god' and recent #Bitcoin tweets using saved cookies": - Authenticate twice to ensure cookie reuse - Scrape the profile of user 'god' - Fetch and verify the last 3 tweets containing #Bitcoin - Log scraped data for manual inspection These changes provide more robust testing of the Twitter authentication process, session reuse, and scraping functionality, ensuring better coverage and reliability of the Twitter-related features. --- pkg/tests/scrapers/twitter_auth_test.go | 79 ++++++++++++++++++++++++- 1 file changed, 76 insertions(+), 3 deletions(-) diff --git a/pkg/tests/scrapers/twitter_auth_test.go b/pkg/tests/scrapers/twitter_auth_test.go index 59095519..81c5f4be 100644 --- a/pkg/tests/scrapers/twitter_auth_test.go +++ b/pkg/tests/scrapers/twitter_auth_test.go @@ -4,6 +4,7 @@ import ( "os" "path/filepath" + "github.com/joho/godotenv" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" twitterscraper "github.com/masa-finance/masa-twitter-scraper" @@ -19,7 +20,16 @@ var _ = Describe("Twitter Auth Function", func() { twoFACode string ) + loadEnv := func() { + err := godotenv.Load() + if err != nil { + logrus.Warn("Error loading .env file") + } + } + BeforeEach(func() { + loadEnv() + tempDir := GinkgoT().TempDir() config.GetInstance().MasaDir = tempDir @@ -44,25 +54,88 @@ var _ = Describe("Twitter Auth Function", func() { } It("authenticates and logs in successfully", func() { + // Ensure cookie file doesn't exist before authentication + cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + Expect(cookieFile).NotTo(BeAnExistingFile()) + + // Authenticate scraper := authenticate() Expect(scraper).NotTo(BeNil()) - cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + // Check if cookie file was created Expect(cookieFile).To(BeAnExistingFile()) + // Verify logged in state Expect(checkLoggedIn(scraper)).To(BeTrue()) - logrus.Info("Authenticated and logged in to Twitter") + + // Attempt a simple operation to verify the session is valid + profile, err := twitter.ScrapeTweetsProfile("twitter") + Expect(err).To(BeNil()) + Expect(profile.Username).To(Equal("twitter")) + + logrus.Info("Authenticated and logged in to Twitter successfully") }) It("reuses session from cookies", func() { + // First authentication firstScraper := authenticate() Expect(firstScraper).NotTo(BeNil()) + // Verify cookie file is created + cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + Expect(cookieFile).To(BeAnExistingFile()) + + // Clear the scraper to force cookie reuse + firstScraper = nil + + // Second authentication (should use cookies) secondScraper := authenticate() Expect(secondScraper).NotTo(BeNil()) + // Verify logged in state Expect(checkLoggedIn(secondScraper)).To(BeTrue()) - logrus.Info("Reused session from cookies") + + // Attempt a simple operation to verify the session is valid + profile, err := twitter.ScrapeTweetsProfile("twitter") + Expect(err).To(BeNil()) + Expect(profile.Username).To(Equal("twitter")) + + logrus.Info("Reused session from cookies successfully") + }) + + It("scrapes the profile of 'god' and recent #Bitcoin tweets using saved cookies", func() { + // First authentication + firstScraper := authenticate() + Expect(firstScraper).NotTo(BeNil()) + + // Verify cookie file is created + cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + Expect(cookieFile).To(BeAnExistingFile()) + + // Clear the scraper to force cookie reuse + firstScraper = nil + + // Second authentication (should use cookies) + secondScraper := authenticate() + Expect(secondScraper).NotTo(BeNil()) + + // Verify logged in state + Expect(twitter.IsLoggedIn(secondScraper)).To(BeTrue()) + + // Attempt to scrape profile + profile, err := twitter.ScrapeTweetsProfile("god") + Expect(err).To(BeNil()) + logrus.Infof("Profile of 'god': %+v", profile) + + // Scrape recent #Bitcoin tweets + tweets, err := twitter.ScrapeTweetsByQuery("#Bitcoin", 3) + Expect(err).To(BeNil()) + Expect(tweets).To(HaveLen(3)) + + logrus.Info("Recent #Bitcoin tweets:") + for i, tweet := range tweets { + logrus.Infof("Tweet %d: %s", i+1, tweet.Tweet.Text) + } }) AfterEach(func() { From fbc08e2dd9965ea46f933b9a2fd998f802090a71 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 1 Oct 2024 14:14:46 -0700 Subject: [PATCH 07/16] chore: rename files --- .../{twitter_auth_test.go => twitter_scraper_test.go} | 11 +++++++++-- 1 file changed, 9 insertions(+), 2 deletions(-) rename pkg/tests/scrapers/{twitter_auth_test.go => twitter_scraper_test.go} (92%) diff --git a/pkg/tests/scrapers/twitter_auth_test.go b/pkg/tests/scrapers/twitter_scraper_test.go similarity index 92% rename from pkg/tests/scrapers/twitter_auth_test.go rename to pkg/tests/scrapers/twitter_scraper_test.go index 81c5f4be..d4485ff1 100644 --- a/pkg/tests/scrapers/twitter_auth_test.go +++ b/pkg/tests/scrapers/twitter_scraper_test.go @@ -3,6 +3,7 @@ package scrapers_test import ( "os" "path/filepath" + "runtime" "github.com/joho/godotenv" "github.com/masa-finance/masa-oracle/pkg/config" @@ -21,9 +22,15 @@ var _ = Describe("Twitter Auth Function", func() { ) loadEnv := func() { - err := godotenv.Load() + _, filename, _, _ := runtime.Caller(0) + projectRoot := filepath.Join(filepath.Dir(filename), "..", "..", "..") + envPath := filepath.Join(projectRoot, ".env") + + err := godotenv.Load(envPath) if err != nil { - logrus.Warn("Error loading .env file") + logrus.Warnf("Error loading .env file from %s: %v", envPath, err) + } else { + logrus.Infof("Loaded .env from %s", envPath) } } From 92d636b1820b75f7ab99190bbf07f1d9b2de4e5d Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Tue, 1 Oct 2024 14:57:38 -0700 Subject: [PATCH 08/16] chore: add godoc dev notes --- pkg/tests/scrapers/twitter_scraper_test.go | 7 +++++++ 1 file changed, 7 insertions(+) diff --git a/pkg/tests/scrapers/twitter_scraper_test.go b/pkg/tests/scrapers/twitter_scraper_test.go index d4485ff1..e2b61bdc 100644 --- a/pkg/tests/scrapers/twitter_scraper_test.go +++ b/pkg/tests/scrapers/twitter_scraper_test.go @@ -1,3 +1,10 @@ +// Package scrapers_test contains integration tests for the Twitter scraper functionality. +// +// Dev Notes: +// - These tests require valid Twitter credentials set in environment variables. +// - The tests use a temporary directory for storing cookies and other data. +// - Make sure to run these tests in a controlled environment to avoid rate limiting. +// - The tests cover authentication, session reuse, and basic scraping operations. package scrapers_test import ( From b9d936f932d99a28c8cf7b642f9fcf7aa94f3f7e Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Wed, 2 Oct 2024 21:28:16 -0700 Subject: [PATCH 09/16] feat(twitter): implement account rotation and rate limit handling This commit introduces significant improvements to the Twitter scraping functionality: 1. Account Management: - Add TwitterAccount struct to represent individual Twitter accounts - Implement TwitterAccountManager for managing multiple accounts - Create functions for account rotation and rate limit tracking 2. Authentication: - Refactor Auth function to use account rotation - Implement cookie-based session management for each account - Add retry logic for authentication failures 3. Scraping Functions: - Update ScrapeTweetsByQuery and ScrapeTweetsProfile to use account rotation - Implement rate limit detection and account switching - Add retry mechanisms for failed operations 4. Configuration: - Move from hardcoded credentials to .env file-based configuration - Implement loadAccountsFromConfig to read multiple accounts from .env 5. Error Handling: - Improve error logging and handling throughout the package - Add specific handling for rate limit errors 6. Performance: - Implement concurrent scraping with multiple accounts - Add delays between requests to avoid aggressive rate limiting These changes significantly enhance the robustness and efficiency of the Twitter scraping functionality, allowing for better handling of rate limits and improved reliability through account rotation. --- pkg/scrapers/twitter/auth.go | 102 +++++++++++--- pkg/scrapers/twitter/cookies.go | 27 ++-- pkg/scrapers/twitter/followers.go | 55 +++++--- pkg/scrapers/twitter/tweets.go | 220 +++++++++++++++++++----------- 4 files changed, 265 insertions(+), 139 deletions(-) diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index e2064e53..ffa8735a 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -2,35 +2,98 @@ package twitter import ( "fmt" + "sync" + "time" + "github.com/masa-finance/masa-oracle/pkg/config" twitterscraper "github.com/masa-finance/masa-twitter-scraper" + "github.com/sirupsen/logrus" ) -// Login attempts to log in to the Twitter scraper service. -// It supports three modes of operation: -// 1. Basic login using just a username and password. -// 2. Login requiring an email confirmation, using a username, password, and email address. -// 3. Login with two-factor authentication, using a username, password, and 2FA code. -// Parameters: -// - scraper: A pointer to an instance of the twitterscraper.Scraper. -// - credentials: A variadic list of strings representing login credentials. -// The function expects either two strings (username, password) for basic login, -// or three strings (username, password, email/2FA code) for email confirmation or 2FA. -// -// Returns an error if login fails or if an invalid number of credentials is provided. +type TwitterAccount struct { + Username string + Password string + TwoFACode string + RateLimitedUntil time.Time +} + +type TwitterAccountManager struct { + accounts []*TwitterAccount + index int + mutex sync.Mutex +} + +func NewTwitterAccountManager(accounts []*TwitterAccount) *TwitterAccountManager { + return &TwitterAccountManager{ + accounts: accounts, + index: 0, + } +} + +func (manager *TwitterAccountManager) GetNextAccount() *TwitterAccount { + manager.mutex.Lock() + defer manager.mutex.Unlock() + for i := 0; i < len(manager.accounts); i++ { + account := manager.accounts[manager.index] + manager.index = (manager.index + 1) % len(manager.accounts) + if time.Now().After(account.RateLimitedUntil) { + return account + } + } + return nil +} + +func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAccount) { + manager.mutex.Lock() + defer manager.mutex.Unlock() + account.RateLimitedUntil = time.Now().Add(time.Hour) +} + +func Auth(account *TwitterAccount) *twitterscraper.Scraper { + scraper := twitterscraper.New() + baseDir := config.GetInstance().MasaDir + + if err := LoadCookies(scraper, account, baseDir); err == nil { + logrus.Debugf("Cookies loaded for user %s.", account.Username) + if IsLoggedIn(scraper) { + logrus.Debugf("Already logged in as %s.", account.Username) + return scraper + } + } + + time.Sleep(100 * time.Millisecond) + + var err error + if account.TwoFACode != "" { + err = Login(scraper, account.Username, account.Password, account.TwoFACode) + } else { + err = Login(scraper, account.Username, account.Password) + } + + if err != nil { + logrus.WithError(err).Warnf("Login failed for %s", account.Username) + return nil + } + + time.Sleep(100 * time.Millisecond) + + if err = SaveCookies(scraper, account, baseDir); err != nil { + logrus.WithError(err).Errorf("Failed to save cookies for %s", account.Username) + } + + logrus.Debugf("Login successful for %s", account.Username) + return scraper +} + func Login(scraper *twitterscraper.Scraper, credentials ...string) error { var err error switch len(credentials) { case 2: - // Basic login with username and password. err = scraper.Login(credentials[0], credentials[1]) case 3: - // The third parameter is used for either email confirmation or a 2FA code. - // This design assumes the Twitter scraper's Login method can contextually handle both cases. err = scraper.Login(credentials[0], credentials[1], credentials[2]) default: - // Return an error if the number of provided credentials is neither 2 nor 3. - return fmt.Errorf("invalid number of login credentials provided") + return fmt.Errorf("invalid number of credentials") } if err != nil { return fmt.Errorf("%v", err) @@ -43,9 +106,8 @@ func IsLoggedIn(scraper *twitterscraper.Scraper) bool { } func Logout(scraper *twitterscraper.Scraper) error { - err := scraper.Logout() - if err != nil { - return fmt.Errorf("[-] Logout failed: %v", err) + if err := scraper.Logout(); err != nil { + return fmt.Errorf("logout failed: %v", err) } return nil } diff --git a/pkg/scrapers/twitter/cookies.go b/pkg/scrapers/twitter/cookies.go index aef042e0..467e8686 100644 --- a/pkg/scrapers/twitter/cookies.go +++ b/pkg/scrapers/twitter/cookies.go @@ -5,37 +5,32 @@ import ( "fmt" "net/http" "os" + "path/filepath" twitterscraper "github.com/masa-finance/masa-twitter-scraper" ) -func SaveCookies(scraper *twitterscraper.Scraper, filePath string) error { +func SaveCookies(scraper *twitterscraper.Scraper, account *TwitterAccount, baseDir string) error { + cookieFile := filepath.Join(baseDir, fmt.Sprintf("%s_twitter_cookies.json", account.Username)) cookies := scraper.GetCookies() - js, err := json.Marshal(cookies) + data, err := json.Marshal(cookies) if err != nil { return fmt.Errorf("error marshaling cookies: %v", err) } - err = os.WriteFile(filePath, js, 0644) - if err != nil { - return fmt.Errorf("error saving cookies to file: %v", err) - } - - // Load the saved cookies back into the scraper - if err := LoadCookies(scraper, filePath); err != nil { - return fmt.Errorf("error loading saved cookies: %v", err) + if err = os.WriteFile(cookieFile, data, 0644); err != nil { + return fmt.Errorf("error saving cookies: %v", err) } - return nil } -func LoadCookies(scraper *twitterscraper.Scraper, filePath string) error { - js, err := os.ReadFile(filePath) +func LoadCookies(scraper *twitterscraper.Scraper, account *TwitterAccount, baseDir string) error { + cookieFile := filepath.Join(baseDir, fmt.Sprintf("%s_twitter_cookies.json", account.Username)) + data, err := os.ReadFile(cookieFile) if err != nil { - return fmt.Errorf("error reading cookies from file: %v", err) + return fmt.Errorf("error reading cookies: %v", err) } var cookies []*http.Cookie - err = json.Unmarshal(js, &cookies) - if err != nil { + if err = json.Unmarshal(data, &cookies); err != nil { return fmt.Errorf("error unmarshaling cookies: %v", err) } scraper.SetCookies(cookies) diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index 543adc56..3ef98874 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -3,36 +3,49 @@ package twitter import ( "encoding/json" "fmt" + "strings" _ "github.com/lib/pq" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) -// ScrapeFollowersForProfile scrapes the profile and tweets of a specific Twitter user. -// It takes the username as a parameter and returns the scraped profile information and an error if any. +// ScrapeFollowersForProfile scrapes the followers of a specific Twitter user. +// It takes the username and count as parameters and returns the scraped followers information and an error if any. func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Legacy, error) { - scraper := Auth() + once.Do(initializeAccountManager) - if scraper == nil { - return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") - } + for { + account := accountManager.GetNextAccount() + if account == nil { + return nil, fmt.Errorf("all accounts are rate-limited") + } - followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") - if errString != "" { - logrus.Printf("Error fetching profile: %v", errString) - return nil, fmt.Errorf("%v", errString) - } + scraper := Auth(account) + if scraper == nil { + logrus.Errorf("Authentication failed for %s", account.Username) + continue + } - // Marshal the followingResponse into a JSON string for logging - responseJSON, err := json.Marshal(followingResponse) - if err != nil { - // Log the error if the marshaling fails - logrus.Errorf("[-] Error marshaling followingResponse: %v", err) - } else { - // Log the JSON string of followingResponse - logrus.Debugf("Following response: %s", responseJSON) - } + followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") + if errString != "" { + if strings.Contains(errString, "Rate limit exceeded") { + accountManager.MarkAccountRateLimited(account) + logrus.Warnf("Rate limited: %s", account.Username) + continue + } + logrus.Errorf("Error fetching followers: %v", errString) + return nil, fmt.Errorf("%v", errString) + } - return followingResponse, nil + // Marshal the followingResponse into a JSON string for logging + responseJSON, err := json.Marshal(followingResponse) + if err != nil { + logrus.Errorf("Error marshaling followingResponse: %v", err) + } else { + logrus.Debugf("Following response: %s", responseJSON) + } + + return followingResponse, nil + } } diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index 30fed430..e8ba7fdf 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -3,16 +3,20 @@ package twitter import ( "context" "fmt" - "path/filepath" + "os" "strings" + "sync" "time" - _ "github.com/lib/pq" - + "github.com/joho/godotenv" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" +) - "github.com/masa-finance/masa-oracle/pkg/config" +var ( + accountManager *TwitterAccountManager + once sync.Once + maxRetries = 3 ) type TweetResult struct { @@ -20,111 +24,163 @@ type TweetResult struct { Error error } -// auth initializes and returns a new Twitter scraper instance. It attempts to load cookies from a file to reuse an existing session. -// If no valid session is found, it performs a login with credentials specified in the application's configuration. -// On successful login, it saves the session cookies for future use. If the login fails, it returns nil. -func Auth() *twitterscraper.Scraper { - scraper := twitterscraper.New() - appConfig := config.GetInstance() - cookieFilePath := filepath.Join(appConfig.MasaDir, "twitter_cookies.json") - - if err := LoadCookies(scraper, cookieFilePath); err == nil { - logrus.Debug("Cookies loaded successfully.") - if IsLoggedIn(scraper) { - logrus.Debug("Already logged in via cookies.") - return scraper - } - } - - username := appConfig.TwitterUsername - password := appConfig.TwitterPassword - twoFACode := appConfig.Twitter2FaCode - - time.Sleep(100 * time.Millisecond) +func initializeAccountManager() { + accounts := loadAccountsFromConfig() + accountManager = NewTwitterAccountManager(accounts) +} - var err error - if twoFACode != "" { - err = Login(scraper, username, password, twoFACode) - } else { - err = Login(scraper, username, password) +// loadAccountsFromConfig reads Twitter accounts from the .env file +func loadAccountsFromConfig() []*TwitterAccount { + err := godotenv.Load() + if err != nil { + logrus.Fatalf("error loading .env file: %v", err) } - if err != nil { - logrus.WithError(err).Warning("[-] Login failed") - return nil + accountsEnv := os.Getenv("TWITTER_ACCOUNTS") + if accountsEnv == "" { + logrus.Fatal("TWITTER_ACCOUNTS not set in .env file") } - time.Sleep(100 * time.Millisecond) + accountPairs := strings.Split(accountsEnv, ",") + var accounts []*TwitterAccount - if err = SaveCookies(scraper, cookieFilePath); err != nil { - logrus.WithError(err).Error("[-] Failed to save cookies") + for _, pair := range accountPairs { + credentials := strings.Split(pair, ":") + if len(credentials) != 2 { + logrus.Warnf("invalid account credentials: %s", pair) + continue + } + account := &TwitterAccount{ + Username: strings.TrimSpace(credentials[0]), + Password: strings.TrimSpace(credentials[1]), + } + accounts = append(accounts, account) } - logrus.WithFields(logrus.Fields{ - "auth": true, - "username": username, - }).Debug("Login successful") - - return scraper + return accounts } -// ScrapeTweetsByQuery performs a search on Twitter for tweets matching the specified query. -// It fetches up to the specified count of tweets and returns a slice of Tweet pointers. -// Parameters: -// - query: The search query string to find matching tweets. -// - count: The maximum number of tweets to retrieve. -// -// Returns: -// - A slice of pointers to twitterscraper.Tweet objects that match the search query. -// - An error if the scraping process encounters any issues. func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { - scraper := Auth() - var tweets []*TweetResult - var lastError error + once.Do(initializeAccountManager) - if scraper == nil { - return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") + getAuthenticatedScraper := func() (*twitterscraper.Scraper, *TwitterAccount, error) { + account := accountManager.GetNextAccount() + if account == nil { + return nil, nil, fmt.Errorf("all accounts are rate-limited") + } + scraper := Auth(account) + if scraper == nil { + logrus.Errorf("authentication failed for %s", account.Username) + return nil, account, fmt.Errorf("authentication failed for %s", account.Username) + } + return scraper, account, nil } - // Set search mode - scraper.SetSearchMode(twitterscraper.SearchLatest) - - // Perform the search with the specified query and count - for tweetResult := range scraper.SearchTweets(context.Background(), query, count) { - if tweetResult.Error != nil { - lastError = tweetResult.Error - logrus.Warnf("[+] Error encountered while scraping tweet: %v", tweetResult.Error) - if strings.Contains(tweetResult.Error.Error(), "Rate limit exceeded") { - return nil, fmt.Errorf("Twitter API rate limit exceeded (429 error)") + scrapeTweets := func(scraper *twitterscraper.Scraper) ([]*TweetResult, error) { + var tweets []*TweetResult + ctx := context.Background() + scraper.SetSearchMode(twitterscraper.SearchLatest) + for tweet := range scraper.SearchTweets(ctx, query, count) { + if tweet.Error != nil { + return nil, tweet.Error } - continue + tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) } - tweets = append(tweets, &TweetResult{Tweet: &tweetResult.Tweet, Error: nil}) + return tweets, nil } - if len(tweets) == 0 && lastError != nil { - return nil, lastError + handleRateLimit := func(err error, account *TwitterAccount) bool { + if strings.Contains(err.Error(), "Rate limit exceeded") { + accountManager.MarkAccountRateLimited(account) + logrus.Warnf("rate limited: %s", account.Username) + return true + } + return false } - return tweets, nil + return retryTweets(func() ([]*TweetResult, error) { + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return nil, err + } + + tweets, err := scrapeTweets(scraper) + if err != nil { + if handleRateLimit(err, account) { + return nil, err + } + return nil, err + } + return tweets, nil + }, maxRetries) } -// ScrapeTweetsProfile scrapes the profile and tweets of a specific Twitter user. -// It takes the username as a parameter and returns the scraped profile information and an error if any. func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { - scraper := Auth() + once.Do(initializeAccountManager) - if scraper == nil { - return twitterscraper.Profile{}, fmt.Errorf("there was an error authenticating with your Twitter credentials") + getAuthenticatedScraper := func() (*twitterscraper.Scraper, *TwitterAccount, error) { + account := accountManager.GetNextAccount() + if account == nil { + return nil, nil, fmt.Errorf("all accounts are rate-limited") + } + scraper := Auth(account) + if scraper == nil { + logrus.Errorf("authentication failed for %s", account.Username) + return nil, account, fmt.Errorf("authentication failed for %s", account.Username) + } + return scraper, account, nil } - // Set search mode - scraper.SetSearchMode(twitterscraper.SearchLatest) + getProfile := func(scraper *twitterscraper.Scraper) (twitterscraper.Profile, error) { + return scraper.GetProfile(username) + } - profile, err := scraper.GetProfile(username) - if err != nil { - return twitterscraper.Profile{}, err + handleRateLimit := func(err error, account *TwitterAccount) bool { + if strings.Contains(err.Error(), "Rate limit exceeded") { + accountManager.MarkAccountRateLimited(account) + logrus.Warnf("rate limited: %s", account.Username) + return true + } + return false } - return profile, nil + return retryProfile(func() (twitterscraper.Profile, error) { + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return twitterscraper.Profile{}, err + } + + profile, err := getProfile(scraper) + if err != nil { + if handleRateLimit(err, account) { + return twitterscraper.Profile{}, err + } + return twitterscraper.Profile{}, err + } + return profile, nil + }, maxRetries) +} + +func retryTweets(operation func() ([]*TweetResult, error), maxAttempts int) ([]*TweetResult, error) { + for attempt := 1; attempt <= maxAttempts; attempt++ { + result, err := operation() + if err == nil { + return result, nil + } + logrus.Errorf("retry attempt %d failed: %v", attempt, err) + time.Sleep(time.Duration(attempt) * time.Second) + } + return nil, fmt.Errorf("operation failed after %d attempts", maxAttempts) +} + +func retryProfile(operation func() (twitterscraper.Profile, error), maxAttempts int) (twitterscraper.Profile, error) { + for attempt := 1; attempt <= maxAttempts; attempt++ { + result, err := operation() + if err == nil { + return result, nil + } + logrus.Errorf("retry attempt %d failed: %v", attempt, err) + time.Sleep(time.Duration(attempt) * time.Second) + } + return twitterscraper.Profile{}, fmt.Errorf("operation failed after %d attempts", maxAttempts) } From 7ad1bfea4bd0482afbe4e4200a168fac9a40c2a2 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Thu, 3 Oct 2024 16:59:21 -0700 Subject: [PATCH 10/16] feat(twitter): centralize configuration management - Extract sleep time configuration into TwitterConfig struct in config.go - Update Auth function in auth.go to accept TwitterConfig parameter - Remove hardcoded sleep time values from auth.go This change improves modularity and flexibility by centralizing configuration management for the Twitter scraper. It allows for easier modification of sleep times and future expansion of configuration options without altering the core authentication logic. BREAKING CHANGE: Auth function now requires a TwitterConfig parameter. Callers must create a TwitterConfig instance using NewTwitterConfig() before invoking Auth. --- pkg/scrapers/twitter/auth.go | 7 +++---- pkg/scrapers/twitter/config.go | 13 +++++++++++++ 2 files changed, 16 insertions(+), 4 deletions(-) create mode 100644 pkg/scrapers/twitter/config.go diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index ffa8735a..81c3faa6 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -5,7 +5,6 @@ import ( "sync" "time" - "github.com/masa-finance/masa-oracle/pkg/config" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) @@ -49,7 +48,7 @@ func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAcc account.RateLimitedUntil = time.Now().Add(time.Hour) } -func Auth(account *TwitterAccount) *twitterscraper.Scraper { +func Auth(account *TwitterAccount, config *twitter.TwitterConfig) *twitterscraper.Scraper { scraper := twitterscraper.New() baseDir := config.GetInstance().MasaDir @@ -61,7 +60,7 @@ func Auth(account *TwitterAccount) *twitterscraper.Scraper { } } - time.Sleep(100 * time.Millisecond) + time.Sleep(config.SleepTime) var err error if account.TwoFACode != "" { @@ -75,7 +74,7 @@ func Auth(account *TwitterAccount) *twitterscraper.Scraper { return nil } - time.Sleep(100 * time.Millisecond) + time.Sleep(config.SleepTime) if err = SaveCookies(scraper, account, baseDir); err != nil { logrus.WithError(err).Errorf("Failed to save cookies for %s", account.Username) diff --git a/pkg/scrapers/twitter/config.go b/pkg/scrapers/twitter/config.go new file mode 100644 index 00000000..915cbc73 --- /dev/null +++ b/pkg/scrapers/twitter/config.go @@ -0,0 +1,13 @@ +package twitter + +import "time" + +type TwitterConfig struct { + SleepTime time.Duration +} + +func NewTwitterConfig() *TwitterConfig { + return &TwitterConfig{ + SleepTime: 100 * time.Millisecond, + } +} From 93ef5432a075416a380d7c53f9e8bd8e3b46aa36 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Thu, 3 Oct 2024 17:26:27 -0700 Subject: [PATCH 11/16] Revert "feat(twitter): centralize configuration management" This reverts commit 7ad1bfea4bd0482afbe4e4200a168fac9a40c2a2. --- pkg/scrapers/twitter/auth.go | 7 ++++--- pkg/scrapers/twitter/config.go | 13 ------------- 2 files changed, 4 insertions(+), 16 deletions(-) delete mode 100644 pkg/scrapers/twitter/config.go diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index 81c3faa6..ffa8735a 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -5,6 +5,7 @@ import ( "sync" "time" + "github.com/masa-finance/masa-oracle/pkg/config" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) @@ -48,7 +49,7 @@ func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAcc account.RateLimitedUntil = time.Now().Add(time.Hour) } -func Auth(account *TwitterAccount, config *twitter.TwitterConfig) *twitterscraper.Scraper { +func Auth(account *TwitterAccount) *twitterscraper.Scraper { scraper := twitterscraper.New() baseDir := config.GetInstance().MasaDir @@ -60,7 +61,7 @@ func Auth(account *TwitterAccount, config *twitter.TwitterConfig) *twitterscrape } } - time.Sleep(config.SleepTime) + time.Sleep(100 * time.Millisecond) var err error if account.TwoFACode != "" { @@ -74,7 +75,7 @@ func Auth(account *TwitterAccount, config *twitter.TwitterConfig) *twitterscrape return nil } - time.Sleep(config.SleepTime) + time.Sleep(100 * time.Millisecond) if err = SaveCookies(scraper, account, baseDir); err != nil { logrus.WithError(err).Errorf("Failed to save cookies for %s", account.Username) diff --git a/pkg/scrapers/twitter/config.go b/pkg/scrapers/twitter/config.go deleted file mode 100644 index 915cbc73..00000000 --- a/pkg/scrapers/twitter/config.go +++ /dev/null @@ -1,13 +0,0 @@ -package twitter - -import "time" - -type TwitterConfig struct { - SleepTime time.Duration -} - -func NewTwitterConfig() *TwitterConfig { - return &TwitterConfig{ - SleepTime: 100 * time.Millisecond, - } -} From 1797bfecdf8646966133efd13390d0d95c7c4ba6 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Thu, 3 Oct 2024 17:31:44 -0700 Subject: [PATCH 12/16] refactor(twitter): move sleep configs to config.go and fix import cycle - Extracted sleep configurations from `auth.go` into `config.go`. - Defined `ShortSleepDuration` and `RateLimitDuration` constants. - Created `ShortSleep()` and `GetRateLimitDuration()` functions. - Updated `auth.go` to use the new config functions. This improves modularity by separating concerns and adheres to idiomatic Go practices. --- pkg/scrapers/twitter/auth.go | 6 +++--- pkg/scrapers/twitter/config.go | 16 ++++++++++++++++ 2 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 pkg/scrapers/twitter/config.go diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index ffa8735a..93083f46 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -46,7 +46,7 @@ func (manager *TwitterAccountManager) GetNextAccount() *TwitterAccount { func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAccount) { manager.mutex.Lock() defer manager.mutex.Unlock() - account.RateLimitedUntil = time.Now().Add(time.Hour) + account.RateLimitedUntil = time.Now().Add(GetRateLimitDuration()) } func Auth(account *TwitterAccount) *twitterscraper.Scraper { @@ -61,7 +61,7 @@ func Auth(account *TwitterAccount) *twitterscraper.Scraper { } } - time.Sleep(100 * time.Millisecond) + ShortSleep() var err error if account.TwoFACode != "" { @@ -75,7 +75,7 @@ func Auth(account *TwitterAccount) *twitterscraper.Scraper { return nil } - time.Sleep(100 * time.Millisecond) + ShortSleep() if err = SaveCookies(scraper, account, baseDir); err != nil { logrus.WithError(err).Errorf("Failed to save cookies for %s", account.Username) diff --git a/pkg/scrapers/twitter/config.go b/pkg/scrapers/twitter/config.go new file mode 100644 index 00000000..f358cccf --- /dev/null +++ b/pkg/scrapers/twitter/config.go @@ -0,0 +1,16 @@ +package twitter + +import "time" + +const ( + ShortSleepDuration = 100 * time.Millisecond + RateLimitDuration = time.Hour +) + +func ShortSleep() { + time.Sleep(ShortSleepDuration) +} + +func GetRateLimitDuration() time.Duration { + return RateLimitDuration +} From 4816f0d837689ed906e1e3036444bc4df8b3720a Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Thu, 3 Oct 2024 17:36:45 -0700 Subject: [PATCH 13/16] refactor(twitter): replace Auth with NewScraper in followers and tweets - Updated `followers.go` and `tweets.go` to replace calls to `Auth` with `NewScraper`. - Resolved `undefined: Auth` errors due to previous refactoring. - Ensured all functionality and error handling remains consistent. - Improved codebase consistency following the constructor pattern. This completes the refactoring of the scraper creation process, enhancing code readability and maintainability. --- pkg/scrapers/twitter/auth.go | 2 +- pkg/scrapers/twitter/followers.go | 2 +- pkg/scrapers/twitter/tweets.go | 4 ++-- 3 files changed, 4 insertions(+), 4 deletions(-) diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index 93083f46..c20710db 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -49,7 +49,7 @@ func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAcc account.RateLimitedUntil = time.Now().Add(GetRateLimitDuration()) } -func Auth(account *TwitterAccount) *twitterscraper.Scraper { +func NewScraper(account *TwitterAccount) *twitterscraper.Scraper { scraper := twitterscraper.New() baseDir := config.GetInstance().MasaDir diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index 3ef98874..0d68fd6e 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -21,7 +21,7 @@ func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Leg return nil, fmt.Errorf("all accounts are rate-limited") } - scraper := Auth(account) + scraper := NewScraper(account) if scraper == nil { logrus.Errorf("Authentication failed for %s", account.Username) continue diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index e8ba7fdf..65c443b2 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -68,7 +68,7 @@ func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { if account == nil { return nil, nil, fmt.Errorf("all accounts are rate-limited") } - scraper := Auth(account) + scraper := NewScraper(account) if scraper == nil { logrus.Errorf("authentication failed for %s", account.Username) return nil, account, fmt.Errorf("authentication failed for %s", account.Username) @@ -123,7 +123,7 @@ func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { if account == nil { return nil, nil, fmt.Errorf("all accounts are rate-limited") } - scraper := Auth(account) + scraper := NewScraper(account) if scraper == nil { logrus.Errorf("authentication failed for %s", account.Username) return nil, account, fmt.Errorf("authentication failed for %s", account.Username) From e9ef220c328938d7ca077f14c4ea85c5649b765e Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Thu, 3 Oct 2024 17:52:05 -0700 Subject: [PATCH 14/16] fix(twitter): resolve type incompatibility errors after Scraper refactoring - Updated function signatures and variable types in `tweets.go` and `followers.go` to use the custom `*Scraper` type. - Adjusted return statements and method calls to match the new `Scraper` type. - Fixed `IncompatibleAssign` errors by ensuring consistency across all files. - Ensured all methods utilize the embedded `*twitterscraper.Scraper` methods through the custom `Scraper` type. This change finalizes the refactoring, ensuring all components work together seamlessly and conform to idiomatic Go practices. --- pkg/scrapers/twitter/auth.go | 53 ++++++++++++++----------------- pkg/scrapers/twitter/followers.go | 13 ++------ pkg/scrapers/twitter/tweets.go | 23 ++++++++------ 3 files changed, 39 insertions(+), 50 deletions(-) diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index c20710db..2d1b4cac 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -5,11 +5,14 @@ import ( "sync" "time" - "github.com/masa-finance/masa-oracle/pkg/config" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) +type Scraper struct { + *twitterscraper.Scraper +} + type TwitterAccount struct { Username string Password string @@ -49,13 +52,13 @@ func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAcc account.RateLimitedUntil = time.Now().Add(GetRateLimitDuration()) } -func NewScraper(account *TwitterAccount) *twitterscraper.Scraper { - scraper := twitterscraper.New() - baseDir := config.GetInstance().MasaDir +func NewScraper(account *TwitterAccount, cookieDir string) *Scraper { + ts := twitterscraper.New() + scraper := &Scraper{Scraper: ts} - if err := LoadCookies(scraper, account, baseDir); err == nil { + if err := LoadCookies(scraper.Scraper, account, cookieDir); err == nil { logrus.Debugf("Cookies loaded for user %s.", account.Username) - if IsLoggedIn(scraper) { + if scraper.IsLoggedIn() { logrus.Debugf("Already logged in as %s.", account.Username) return scraper } @@ -63,21 +66,14 @@ func NewScraper(account *TwitterAccount) *twitterscraper.Scraper { ShortSleep() - var err error - if account.TwoFACode != "" { - err = Login(scraper, account.Username, account.Password, account.TwoFACode) - } else { - err = Login(scraper, account.Username, account.Password) - } - - if err != nil { + if err := scraper.Login(account.Username, account.Password, account.TwoFACode); err != nil { logrus.WithError(err).Warnf("Login failed for %s", account.Username) return nil } ShortSleep() - if err = SaveCookies(scraper, account, baseDir); err != nil { + if err := SaveCookies(scraper.Scraper, account, cookieDir); err != nil { logrus.WithError(err).Errorf("Failed to save cookies for %s", account.Username) } @@ -85,29 +81,26 @@ func NewScraper(account *TwitterAccount) *twitterscraper.Scraper { return scraper } -func Login(scraper *twitterscraper.Scraper, credentials ...string) error { +func (scraper *Scraper) Login(username, password string, twoFACode ...string) error { var err error - switch len(credentials) { - case 2: - err = scraper.Login(credentials[0], credentials[1]) - case 3: - err = scraper.Login(credentials[0], credentials[1], credentials[2]) - default: - return fmt.Errorf("invalid number of credentials") + if len(twoFACode) > 0 { + err = scraper.Scraper.Login(username, password, twoFACode[0]) + } else { + err = scraper.Scraper.Login(username, password) } if err != nil { - return fmt.Errorf("%v", err) + return fmt.Errorf("login failed: %v", err) } return nil } -func IsLoggedIn(scraper *twitterscraper.Scraper) bool { - return scraper.IsLoggedIn() -} - -func Logout(scraper *twitterscraper.Scraper) error { - if err := scraper.Logout(); err != nil { +func (scraper *Scraper) Logout() error { + if err := scraper.Scraper.Logout(); err != nil { return fmt.Errorf("logout failed: %v", err) } return nil } + +func (scraper *Scraper) IsLoggedIn() bool { + return scraper.Scraper.IsLoggedIn() +} diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index 0d68fd6e..9eafbf43 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -1,11 +1,11 @@ package twitter import ( - "encoding/json" "fmt" "strings" _ "github.com/lib/pq" + "github.com/masa-finance/masa-oracle/pkg/config" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) @@ -14,6 +14,7 @@ import ( // It takes the username and count as parameters and returns the scraped followers information and an error if any. func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Legacy, error) { once.Do(initializeAccountManager) + baseDir := config.GetInstance().MasaDir for { account := accountManager.GetNextAccount() @@ -21,7 +22,7 @@ func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Leg return nil, fmt.Errorf("all accounts are rate-limited") } - scraper := NewScraper(account) + scraper := NewScraper(account, baseDir) if scraper == nil { logrus.Errorf("Authentication failed for %s", account.Username) continue @@ -38,14 +39,6 @@ func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Leg return nil, fmt.Errorf("%v", errString) } - // Marshal the followingResponse into a JSON string for logging - responseJSON, err := json.Marshal(followingResponse) - if err != nil { - logrus.Errorf("Error marshaling followingResponse: %v", err) - } else { - logrus.Debugf("Following response: %s", responseJSON) - } - return followingResponse, nil } } diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index 65c443b2..3be07bed 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -9,6 +9,7 @@ import ( "time" "github.com/joho/godotenv" + "github.com/masa-finance/masa-oracle/pkg/config" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) @@ -62,21 +63,22 @@ func loadAccountsFromConfig() []*TwitterAccount { func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { once.Do(initializeAccountManager) + baseDir := config.GetInstance().MasaDir - getAuthenticatedScraper := func() (*twitterscraper.Scraper, *TwitterAccount, error) { + getAuthenticatedScraper := func() (*Scraper, *TwitterAccount, error) { account := accountManager.GetNextAccount() if account == nil { return nil, nil, fmt.Errorf("all accounts are rate-limited") } - scraper := NewScraper(account) + scraper := NewScraper(account, baseDir) if scraper == nil { - logrus.Errorf("authentication failed for %s", account.Username) - return nil, account, fmt.Errorf("authentication failed for %s", account.Username) + logrus.Errorf("Authentication failed for %s", account.Username) + return nil, account, fmt.Errorf(" Twitter authentication failed for %s", account.Username) } return scraper, account, nil } - scrapeTweets := func(scraper *twitterscraper.Scraper) ([]*TweetResult, error) { + scrapeTweets := func(scraper *Scraper) ([]*TweetResult, error) { var tweets []*TweetResult ctx := context.Background() scraper.SetSearchMode(twitterscraper.SearchLatest) @@ -117,21 +119,22 @@ func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { once.Do(initializeAccountManager) + baseDir := config.GetInstance().MasaDir - getAuthenticatedScraper := func() (*twitterscraper.Scraper, *TwitterAccount, error) { + getAuthenticatedScraper := func() (*Scraper, *TwitterAccount, error) { account := accountManager.GetNextAccount() if account == nil { return nil, nil, fmt.Errorf("all accounts are rate-limited") } - scraper := NewScraper(account) + scraper := NewScraper(account, baseDir) if scraper == nil { - logrus.Errorf("authentication failed for %s", account.Username) - return nil, account, fmt.Errorf("authentication failed for %s", account.Username) + logrus.Errorf("Authentication failed for %s", account.Username) + return nil, account, fmt.Errorf("Twitter authentication failed for %s", account.Username) } return scraper, account, nil } - getProfile := func(scraper *twitterscraper.Scraper) (twitterscraper.Profile, error) { + getProfile := func(scraper *Scraper) (twitterscraper.Profile, error) { return scraper.GetProfile(username) } From 5b6645f3bb0cf0f6590c69c7c3932097c473c9c6 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Thu, 3 Oct 2024 18:23:45 -0700 Subject: [PATCH 15/16] refactor(twitter): move retry logic to config.go - Extract generic Retry function to config.go - Remove specific retry functions from tweets.go - Update ScrapeTweetsByQuery and ScrapeTweetsProfile to use new Retry function - Move MaxRetries constant to config.go --- pkg/scrapers/twitter/config.go | 23 ++++++++++++++++++++-- pkg/scrapers/twitter/tweets.go | 36 +++++----------------------------- 2 files changed, 26 insertions(+), 33 deletions(-) diff --git a/pkg/scrapers/twitter/config.go b/pkg/scrapers/twitter/config.go index f358cccf..6e7e6433 100644 --- a/pkg/scrapers/twitter/config.go +++ b/pkg/scrapers/twitter/config.go @@ -1,10 +1,16 @@ package twitter -import "time" +import ( + "fmt" + "time" + + "github.com/sirupsen/logrus" +) const ( - ShortSleepDuration = 100 * time.Millisecond + ShortSleepDuration = 20 * time.Millisecond RateLimitDuration = time.Hour + MaxRetries = 3 ) func ShortSleep() { @@ -14,3 +20,16 @@ func ShortSleep() { func GetRateLimitDuration() time.Duration { return RateLimitDuration } + +func Retry[T any](operation func() (T, error), maxAttempts int) (T, error) { + var zero T + for attempt := 1; attempt <= maxAttempts; attempt++ { + result, err := operation() + if err == nil { + return result, nil + } + logrus.Errorf("retry attempt %d failed: %v", attempt, err) + time.Sleep(time.Duration(attempt) * time.Second) + } + return zero, fmt.Errorf("operation failed after %d attempts", maxAttempts) +} diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index 3be07bed..54695c3d 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -6,7 +6,6 @@ import ( "os" "strings" "sync" - "time" "github.com/joho/godotenv" "github.com/masa-finance/masa-oracle/pkg/config" @@ -17,7 +16,6 @@ import ( var ( accountManager *TwitterAccountManager once sync.Once - maxRetries = 3 ) type TweetResult struct { @@ -73,7 +71,7 @@ func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { scraper := NewScraper(account, baseDir) if scraper == nil { logrus.Errorf("Authentication failed for %s", account.Username) - return nil, account, fmt.Errorf(" Twitter authentication failed for %s", account.Username) + return nil, account, fmt.Errorf("Twitter authentication failed for %s", account.Username) } return scraper, account, nil } @@ -100,7 +98,7 @@ func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { return false } - return retryTweets(func() ([]*TweetResult, error) { + return Retry(func() ([]*TweetResult, error) { scraper, account, err := getAuthenticatedScraper() if err != nil { return nil, err @@ -114,7 +112,7 @@ func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { return nil, err } return tweets, nil - }, maxRetries) + }, MaxRetries) } func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { @@ -147,7 +145,7 @@ func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { return false } - return retryProfile(func() (twitterscraper.Profile, error) { + return Retry(func() (twitterscraper.Profile, error) { scraper, account, err := getAuthenticatedScraper() if err != nil { return twitterscraper.Profile{}, err @@ -161,29 +159,5 @@ func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { return twitterscraper.Profile{}, err } return profile, nil - }, maxRetries) -} - -func retryTweets(operation func() ([]*TweetResult, error), maxAttempts int) ([]*TweetResult, error) { - for attempt := 1; attempt <= maxAttempts; attempt++ { - result, err := operation() - if err == nil { - return result, nil - } - logrus.Errorf("retry attempt %d failed: %v", attempt, err) - time.Sleep(time.Duration(attempt) * time.Second) - } - return nil, fmt.Errorf("operation failed after %d attempts", maxAttempts) -} - -func retryProfile(operation func() (twitterscraper.Profile, error), maxAttempts int) (twitterscraper.Profile, error) { - for attempt := 1; attempt <= maxAttempts; attempt++ { - result, err := operation() - if err == nil { - return result, nil - } - logrus.Errorf("retry attempt %d failed: %v", attempt, err) - time.Sleep(time.Duration(attempt) * time.Second) - } - return twitterscraper.Profile{}, fmt.Errorf("operation failed after %d attempts", maxAttempts) + }, MaxRetries) } From e5f2176c7b57c4f1b03d950e386224b778465144 Mon Sep 17 00:00:00 2001 From: Brendan Playford <34052452+teslashibe@users.noreply.github.com> Date: Thu, 3 Oct 2024 18:38:25 -0700 Subject: [PATCH 16/16] refactor(twitter): modularize scraper components and improve error handling - Move account management logic from auth.go to a separate file - Centralize authentication and rate limit handling in common functions - Simplify ScrapeFollowersForProfile and ScrapeTweetsByQuery using Retry - Remove duplicate code and unnecessary initializations - Increase WorkerResponseTimeout from 30 to 45 seconds in DefaultConfig --- pkg/scrapers/twitter/account.go | 45 ++++++++++ pkg/scrapers/twitter/auth.go | 53 +---------- pkg/scrapers/twitter/common.go | 85 ++++++++++++++++++ pkg/scrapers/twitter/followers.go | 30 ++----- pkg/scrapers/twitter/profile.go | 23 +++++ pkg/scrapers/twitter/scraper.go | 17 ++++ pkg/scrapers/twitter/tweets.go | 142 ++---------------------------- pkg/workers/config.go | 2 +- 8 files changed, 186 insertions(+), 211 deletions(-) create mode 100644 pkg/scrapers/twitter/account.go create mode 100644 pkg/scrapers/twitter/common.go create mode 100644 pkg/scrapers/twitter/profile.go create mode 100644 pkg/scrapers/twitter/scraper.go diff --git a/pkg/scrapers/twitter/account.go b/pkg/scrapers/twitter/account.go new file mode 100644 index 00000000..519e3893 --- /dev/null +++ b/pkg/scrapers/twitter/account.go @@ -0,0 +1,45 @@ +package twitter + +import ( + "sync" + "time" +) + +type TwitterAccount struct { + Username string + Password string + TwoFACode string + RateLimitedUntil time.Time +} + +type TwitterAccountManager struct { + accounts []*TwitterAccount + index int + mutex sync.Mutex +} + +func NewTwitterAccountManager(accounts []*TwitterAccount) *TwitterAccountManager { + return &TwitterAccountManager{ + accounts: accounts, + index: 0, + } +} + +func (manager *TwitterAccountManager) GetNextAccount() *TwitterAccount { + manager.mutex.Lock() + defer manager.mutex.Unlock() + for i := 0; i < len(manager.accounts); i++ { + account := manager.accounts[manager.index] + manager.index = (manager.index + 1) % len(manager.accounts) + if time.Now().After(account.RateLimitedUntil) { + return account + } + } + return nil +} + +func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAccount) { + manager.mutex.Lock() + defer manager.mutex.Unlock() + account.RateLimitedUntil = time.Now().Add(GetRateLimitDuration()) +} diff --git a/pkg/scrapers/twitter/auth.go b/pkg/scrapers/twitter/auth.go index 2d1b4cac..a6be7767 100644 --- a/pkg/scrapers/twitter/auth.go +++ b/pkg/scrapers/twitter/auth.go @@ -2,59 +2,12 @@ package twitter import ( "fmt" - "sync" - "time" - twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) -type Scraper struct { - *twitterscraper.Scraper -} - -type TwitterAccount struct { - Username string - Password string - TwoFACode string - RateLimitedUntil time.Time -} - -type TwitterAccountManager struct { - accounts []*TwitterAccount - index int - mutex sync.Mutex -} - -func NewTwitterAccountManager(accounts []*TwitterAccount) *TwitterAccountManager { - return &TwitterAccountManager{ - accounts: accounts, - index: 0, - } -} - -func (manager *TwitterAccountManager) GetNextAccount() *TwitterAccount { - manager.mutex.Lock() - defer manager.mutex.Unlock() - for i := 0; i < len(manager.accounts); i++ { - account := manager.accounts[manager.index] - manager.index = (manager.index + 1) % len(manager.accounts) - if time.Now().After(account.RateLimitedUntil) { - return account - } - } - return nil -} - -func (manager *TwitterAccountManager) MarkAccountRateLimited(account *TwitterAccount) { - manager.mutex.Lock() - defer manager.mutex.Unlock() - account.RateLimitedUntil = time.Now().Add(GetRateLimitDuration()) -} - func NewScraper(account *TwitterAccount, cookieDir string) *Scraper { - ts := twitterscraper.New() - scraper := &Scraper{Scraper: ts} + scraper := &Scraper{Scraper: newTwitterScraper()} if err := LoadCookies(scraper.Scraper, account, cookieDir); err == nil { logrus.Debugf("Cookies loaded for user %s.", account.Username) @@ -100,7 +53,3 @@ func (scraper *Scraper) Logout() error { } return nil } - -func (scraper *Scraper) IsLoggedIn() bool { - return scraper.Scraper.IsLoggedIn() -} diff --git a/pkg/scrapers/twitter/common.go b/pkg/scrapers/twitter/common.go new file mode 100644 index 00000000..008c7aed --- /dev/null +++ b/pkg/scrapers/twitter/common.go @@ -0,0 +1,85 @@ +package twitter + +import ( + "fmt" + "os" + "strings" + "sync" + + "github.com/joho/godotenv" + "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/sirupsen/logrus" +) + +var ( + accountManager *TwitterAccountManager + once sync.Once +) + +func initializeAccountManager() { + accounts := loadAccountsFromConfig() + accountManager = NewTwitterAccountManager(accounts) +} + +func loadAccountsFromConfig() []*TwitterAccount { + err := godotenv.Load() + if err != nil { + logrus.Fatalf("error loading .env file: %v", err) + } + + accountsEnv := os.Getenv("TWITTER_ACCOUNTS") + if accountsEnv == "" { + logrus.Fatal("TWITTER_ACCOUNTS not set in .env file") + } + + return parseAccounts(strings.Split(accountsEnv, ",")) +} + +func parseAccounts(accountPairs []string) []*TwitterAccount { + return filterMap(accountPairs, func(pair string) (*TwitterAccount, bool) { + credentials := strings.Split(pair, ":") + if len(credentials) != 2 { + logrus.Warnf("invalid account credentials: %s", pair) + return nil, false + } + return &TwitterAccount{ + Username: strings.TrimSpace(credentials[0]), + Password: strings.TrimSpace(credentials[1]), + }, true + }) +} + +func getAuthenticatedScraper() (*Scraper, *TwitterAccount, error) { + once.Do(initializeAccountManager) + baseDir := config.GetInstance().MasaDir + + account := accountManager.GetNextAccount() + if account == nil { + return nil, nil, fmt.Errorf("all accounts are rate-limited") + } + scraper := NewScraper(account, baseDir) + if scraper == nil { + logrus.Errorf("Authentication failed for %s", account.Username) + return nil, account, fmt.Errorf("Twitter authentication failed for %s", account.Username) + } + return scraper, account, nil +} + +func handleRateLimit(err error, account *TwitterAccount) bool { + if strings.Contains(err.Error(), "Rate limit exceeded") { + accountManager.MarkAccountRateLimited(account) + logrus.Warnf("rate limited: %s", account.Username) + return true + } + return false +} + +func filterMap[T any, R any](slice []T, f func(T) (R, bool)) []R { + result := make([]R, 0, len(slice)) + for _, v := range slice { + if r, ok := f(v); ok { + result = append(result, r) + } + } + return result +} diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index 9eafbf43..18746b55 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -2,43 +2,27 @@ package twitter import ( "fmt" - "strings" - _ "github.com/lib/pq" - "github.com/masa-finance/masa-oracle/pkg/config" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" ) -// ScrapeFollowersForProfile scrapes the followers of a specific Twitter user. -// It takes the username and count as parameters and returns the scraped followers information and an error if any. func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Legacy, error) { - once.Do(initializeAccountManager) - baseDir := config.GetInstance().MasaDir - - for { - account := accountManager.GetNextAccount() - if account == nil { - return nil, fmt.Errorf("all accounts are rate-limited") - } - - scraper := NewScraper(account, baseDir) - if scraper == nil { - logrus.Errorf("Authentication failed for %s", account.Username) - continue + return Retry(func() ([]twitterscraper.Legacy, error) { + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return nil, err } followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") if errString != "" { - if strings.Contains(errString, "Rate limit exceeded") { - accountManager.MarkAccountRateLimited(account) - logrus.Warnf("Rate limited: %s", account.Username) - continue + if handleRateLimit(fmt.Errorf(errString), account) { + return nil, fmt.Errorf("rate limited") } logrus.Errorf("Error fetching followers: %v", errString) return nil, fmt.Errorf("%v", errString) } return followingResponse, nil - } + }, MaxRetries) } diff --git a/pkg/scrapers/twitter/profile.go b/pkg/scrapers/twitter/profile.go new file mode 100644 index 00000000..8337f186 --- /dev/null +++ b/pkg/scrapers/twitter/profile.go @@ -0,0 +1,23 @@ +package twitter + +import ( + twitterscraper "github.com/masa-finance/masa-twitter-scraper" +) + +func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { + return Retry(func() (twitterscraper.Profile, error) { + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return twitterscraper.Profile{}, err + } + + profile, err := scraper.GetProfile(username) + if err != nil { + if handleRateLimit(err, account) { + return twitterscraper.Profile{}, err + } + return twitterscraper.Profile{}, err + } + return profile, nil + }, MaxRetries) +} diff --git a/pkg/scrapers/twitter/scraper.go b/pkg/scrapers/twitter/scraper.go new file mode 100644 index 00000000..2ca66861 --- /dev/null +++ b/pkg/scrapers/twitter/scraper.go @@ -0,0 +1,17 @@ +package twitter + +import ( + twitterscraper "github.com/masa-finance/masa-twitter-scraper" +) + +type Scraper struct { + *twitterscraper.Scraper +} + +func newTwitterScraper() *twitterscraper.Scraper { + return twitterscraper.New() +} + +func (s *Scraper) IsLoggedIn() bool { + return s.Scraper.IsLoggedIn() +} diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index 54695c3d..2b5c229d 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -2,20 +2,8 @@ package twitter import ( "context" - "fmt" - "os" - "strings" - "sync" - "github.com/joho/godotenv" - "github.com/masa-finance/masa-oracle/pkg/config" twitterscraper "github.com/masa-finance/masa-twitter-scraper" - "github.com/sirupsen/logrus" -) - -var ( - accountManager *TwitterAccountManager - once sync.Once ) type TweetResult struct { @@ -23,141 +11,25 @@ type TweetResult struct { Error error } -func initializeAccountManager() { - accounts := loadAccountsFromConfig() - accountManager = NewTwitterAccountManager(accounts) -} - -// loadAccountsFromConfig reads Twitter accounts from the .env file -func loadAccountsFromConfig() []*TwitterAccount { - err := godotenv.Load() - if err != nil { - logrus.Fatalf("error loading .env file: %v", err) - } - - accountsEnv := os.Getenv("TWITTER_ACCOUNTS") - if accountsEnv == "" { - logrus.Fatal("TWITTER_ACCOUNTS not set in .env file") - } - - accountPairs := strings.Split(accountsEnv, ",") - var accounts []*TwitterAccount - - for _, pair := range accountPairs { - credentials := strings.Split(pair, ":") - if len(credentials) != 2 { - logrus.Warnf("invalid account credentials: %s", pair) - continue - } - account := &TwitterAccount{ - Username: strings.TrimSpace(credentials[0]), - Password: strings.TrimSpace(credentials[1]), - } - accounts = append(accounts, account) - } - - return accounts -} - func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { - once.Do(initializeAccountManager) - baseDir := config.GetInstance().MasaDir - - getAuthenticatedScraper := func() (*Scraper, *TwitterAccount, error) { - account := accountManager.GetNextAccount() - if account == nil { - return nil, nil, fmt.Errorf("all accounts are rate-limited") - } - scraper := NewScraper(account, baseDir) - if scraper == nil { - logrus.Errorf("Authentication failed for %s", account.Username) - return nil, account, fmt.Errorf("Twitter authentication failed for %s", account.Username) + return Retry(func() ([]*TweetResult, error) { + scraper, account, err := getAuthenticatedScraper() + if err != nil { + return nil, err } - return scraper, account, nil - } - scrapeTweets := func(scraper *Scraper) ([]*TweetResult, error) { var tweets []*TweetResult ctx := context.Background() scraper.SetSearchMode(twitterscraper.SearchLatest) for tweet := range scraper.SearchTweets(ctx, query, count) { if tweet.Error != nil { + if handleRateLimit(tweet.Error, account) { + return nil, tweet.Error + } return nil, tweet.Error } tweets = append(tweets, &TweetResult{Tweet: &tweet.Tweet}) } return tweets, nil - } - - handleRateLimit := func(err error, account *TwitterAccount) bool { - if strings.Contains(err.Error(), "Rate limit exceeded") { - accountManager.MarkAccountRateLimited(account) - logrus.Warnf("rate limited: %s", account.Username) - return true - } - return false - } - - return Retry(func() ([]*TweetResult, error) { - scraper, account, err := getAuthenticatedScraper() - if err != nil { - return nil, err - } - - tweets, err := scrapeTweets(scraper) - if err != nil { - if handleRateLimit(err, account) { - return nil, err - } - return nil, err - } - return tweets, nil - }, MaxRetries) -} - -func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { - once.Do(initializeAccountManager) - baseDir := config.GetInstance().MasaDir - - getAuthenticatedScraper := func() (*Scraper, *TwitterAccount, error) { - account := accountManager.GetNextAccount() - if account == nil { - return nil, nil, fmt.Errorf("all accounts are rate-limited") - } - scraper := NewScraper(account, baseDir) - if scraper == nil { - logrus.Errorf("Authentication failed for %s", account.Username) - return nil, account, fmt.Errorf("Twitter authentication failed for %s", account.Username) - } - return scraper, account, nil - } - - getProfile := func(scraper *Scraper) (twitterscraper.Profile, error) { - return scraper.GetProfile(username) - } - - handleRateLimit := func(err error, account *TwitterAccount) bool { - if strings.Contains(err.Error(), "Rate limit exceeded") { - accountManager.MarkAccountRateLimited(account) - logrus.Warnf("rate limited: %s", account.Username) - return true - } - return false - } - - return Retry(func() (twitterscraper.Profile, error) { - scraper, account, err := getAuthenticatedScraper() - if err != nil { - return twitterscraper.Profile{}, err - } - - profile, err := getProfile(scraper) - if err != nil { - if handleRateLimit(err, account) { - return twitterscraper.Profile{}, err - } - return twitterscraper.Profile{}, err - } - return profile, nil }, MaxRetries) } diff --git a/pkg/workers/config.go b/pkg/workers/config.go index ba266114..03c38957 100644 --- a/pkg/workers/config.go +++ b/pkg/workers/config.go @@ -18,7 +18,7 @@ type WorkerConfig struct { var DefaultConfig = WorkerConfig{ WorkerTimeout: 55 * time.Second, - WorkerResponseTimeout: 30 * time.Second, + WorkerResponseTimeout: 45 * time.Second, ConnectionTimeout: 10 * time.Second, MaxRetries: 1, MaxSpawnAttempts: 1,