diff --git a/.env.example b/.env.example index 8cc56f57..b6386fee 100644 --- a/.env.example +++ b/.env.example @@ -34,6 +34,11 @@ DISCORD_SCRAPER=true DISCORD_BOT_TOKEN= your discord bot token TWITTER_SCRAPER=true WEB_SCRAPER=true +TELEGRAM_SCRAPER=false + +# Go to my.telegram.org/auth and after logging in click the developer option to get these +TELEGRAM_APP_ID= +TELEGRAM_APP_HASH= ## Optional added features for ORACLE or VALIDATOR nodes only diff --git a/Makefile b/Makefile index 0ca862f4..29b9ba4c 100644 --- a/Makefile +++ b/Makefile @@ -23,7 +23,7 @@ client: build @./bin/masa-node-cli test: - @go test ./... + @go test -v -count=1 ./... clean: @rm -rf bin diff --git a/RELEASE_NOTES.md b/RELEASE_NOTES.md index d5052e6a..1c1a6175 100644 --- a/RELEASE_NOTES.md +++ b/RELEASE_NOTES.md @@ -2,7 +2,7 @@ [All Releases](https://github.com/masa-finance/masa-oracle/releases) -## [0.0.9-beta](https://github.com/masa-finance/masa-oracle/releases) (2024) +## [0.0.5.0](https://github.com/masa-finance/masa-oracle/releases) (2024) ## Overview @@ -10,18 +10,26 @@ This release of the Masa Oracle Node introduces the following new features and c ### Breaking Changes -* Protocol verion change to 0.0.9 +* Protocol verion change to 0.5.0 ### Bug fixes -* None +* Increased from selecting all available nodes to a maximum of XX nodes per request +* Reduced global timeout from 60 seconds to XX tbd seconds for faster response times ### New Features -* Added protocol blockchain ledger +* Node selection now considers worker category (Twitter, Telegram, Discord, Web) for more targeted task distribution ### Enhancements +* Improved Twitter scraping timeout and performance +* Implemented error handling in tweet collection +* Upgraded sentiment analysis to work updated tweet data structures +* Optimized worker selection for faster and more efficient data processing +* Implemented reduced timeout handling to have faster response times +* Implemented per-node timeout checks to identify and temporarily exclude underperforming nodes + ## Change Log -* Version: 0.0.9-beta +* Version: 0.5.0 diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index d4ff7088..ea416574 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -99,6 +99,7 @@ func main() { if node.IsStaked { go workers.MonitorWorkers(ctx, node) go masa.SubscribeToBlocks(ctx, node) + go node.NodeTracker.ClearExpiredWorkerTimeouts() } // Listen for SIGINT (CTRL+C) diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index 9c4e5e22..807cf456 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -100,8 +100,9 @@ func handleWorkResponse(c *gin.Context, responseCh chan []byte) { c.JSON(http.StatusOK, result) return - case <-time.After(90 * time.Second): - c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out"}) + // teslashibe: adjust to timeout after 10 seconds for performance testing + case <-time.After(10 * time.Second): + c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out, check that port 4001 TCP inbound is open."}) return case <-c.Done(): return diff --git a/pkg/config/app.go b/pkg/config/app.go index bba00661..aae42f03 100644 --- a/pkg/config/app.go +++ b/pkg/config/app.go @@ -112,8 +112,8 @@ func GetInstance() *AppConfig { once.Do(func() { instance = &AppConfig{} - instance.setDefaultConfig() instance.setEnvVariableConfig() + instance.setDefaultConfig() instance.setFileConfig(viper.GetString("FILE_PATH")) err := instance.setCommandLineConfig() if err != nil { diff --git a/pkg/llmbridge/sentiment.go b/pkg/llmbridge/sentiment.go index 504015f1..e4384241 100644 --- a/pkg/llmbridge/sentiment.go +++ b/pkg/llmbridge/sentiment.go @@ -19,11 +19,21 @@ import ( // AnalyzeSentimentTweets analyzes the sentiment of the provided tweets by sending them to the Claude API. // It concatenates the tweets, creates a payload, sends a request to Claude, parses the response, // and returns the concatenated tweets content, a sentiment summary, and any error. -func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string, prompt string) (string, string, error) { +func AnalyzeSentimentTweets(tweets []*twitterscraper.TweetResult, model string, prompt string) (string, string, error) { // check if we are using claude or gpt, can add others easily if strings.Contains(model, "claude-") { client := NewClaudeClient() // Adjusted to call without arguments - tweetsContent := ConcatenateTweets(tweets) + + var validTweets []*twitterscraper.TweetResult + for _, tweet := range tweets { + if tweet.Error != nil { + logrus.WithError(tweet.Error).Warn("[-] Error in tweet") + continue + } + validTweets = append(validTweets, tweet) + } + + tweetsContent := ConcatenateTweets(validTweets) payloadBytes, err := CreatePayload(tweetsContent, model, prompt) if err != nil { logrus.Errorf("[-] Error creating payload: %v", err) @@ -101,10 +111,10 @@ func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string, prompt // ConcatenateTweets concatenates the text of the provided tweets into a single string, // with each tweet separated by a newline character. -func ConcatenateTweets(tweets []*twitterscraper.Tweet) string { +func ConcatenateTweets(tweets []*twitterscraper.TweetResult) string { var tweetsTexts []string - for _, tweet := range tweets { - tweetsTexts = append(tweetsTexts, tweet.Text) + for _, t := range tweets { + tweetsTexts = append(tweetsTexts, t.Tweet.Text) } return strings.Join(tweetsTexts, "\n") } diff --git a/pkg/network/kdht.go b/pkg/network/kdht.go index 88d36bb9..9b6cd1a1 100644 --- a/pkg/network/kdht.go +++ b/pkg/network/kdht.go @@ -30,7 +30,7 @@ func (dbValidator) Validate(_ string, _ []byte) error { return nil } func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, - protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool) (*dht.IpfsDHT, error) { + protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool, removePeerCallback func(peer.ID)) (*dht.IpfsDHT, error) { options := make([]dht.Option, 0) options = append(options, dht.BucketSize(100)) // Adjust bucket size options = append(options, dht.Concurrency(100)) // Increase concurrency @@ -64,6 +64,9 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul Source: "kdht", } peerChan <- pe + if removePeerCallback != nil { + removePeerCallback(p) + } } if err = kademliaDHT.Bootstrap(ctx); err != nil { @@ -113,7 +116,7 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul if strings.Contains(err.Error(), "protocols not supported") { logrus.Fatalf("[-] %s Please update to the latest version and make sure you are connecting to the correct network.", err.Error()) } else { - logrus.Error("[-] Error opening stream:", err) + logrus.Error("[-] Error opening stream: ", err) } return } diff --git a/pkg/oracle_node.go b/pkg/oracle_node.go index 37c0658a..ee93b70f 100644 --- a/pkg/oracle_node.go +++ b/pkg/oracle_node.go @@ -24,6 +24,7 @@ import ( dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" @@ -177,7 +178,7 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), Context: ctx, PeerChan: make(chan myNetwork.PeerEvent), - NodeTracker: pubsub2.NewNodeEventTracker(config.Version, cfg.Environment), + NodeTracker: pubsub2.NewNodeEventTracker(config.Version, cfg.Environment, hst.ID().String()), PubSubManager: subscriptionManager, IsStaked: isStaked, IsValidator: cfg.Validator, @@ -215,7 +216,11 @@ func (node *OracleNode) Start() (err error) { go node.ListenToNodeTracker() go node.handleDiscoveredPeers() - node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked) + removePeerCallback := func(p peer.ID) { + node.NodeTracker.RemoveNodeData(p.String()) + } + + node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked, removePeerCallback) if err != nil { return err } diff --git a/pkg/oracle_node_listener.go b/pkg/oracle_node_listener.go index 7c2d387f..06598aa3 100644 --- a/pkg/oracle_node_listener.go +++ b/pkg/oracle_node_listener.go @@ -161,6 +161,12 @@ func (node *OracleNode) ReceiveNodeData(stream network.Stream) { for scanner.Scan() { data := scanner.Bytes() var page NodeDataPage + + if err := json.Unmarshal(data, &page); err != nil { + logrus.Errorf("[-] Failed to unmarshal NodeData page: %v %s %+v", err, string(data), page) + continue + } + if err := json.Unmarshal(data, &page); err != nil { logrus.Errorf("[-] Failed to unmarshal NodeData page: %v", err) continue diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index f12e8a6c..dfcd6dc7 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -66,6 +66,7 @@ type NodeData struct { IsWebScraper bool `json:"isWebScraper"` Records any `json:"records,omitempty"` Version string `json:"version"` + WorkerTimeout time.Time `json:"workerTimeout,omitempty"` } // NewNodeData creates a new NodeData struct initialized with the given @@ -73,7 +74,6 @@ type NodeData struct { func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData { multiaddrs := make([]JSONMultiaddr, 0) multiaddrs = append(multiaddrs, JSONMultiaddr{addr}) - // cfg := config.GetInstance() return &NodeData{ PeerId: peerId, @@ -94,6 +94,44 @@ func (n *NodeData) Address() string { return fmt.Sprintf("%s/p2p/%s", n.Multiaddrs[0].String(), n.PeerId.String()) } +// WorkerCategory represents the main categories of workers +type WorkerCategory int + +const ( + CategoryDiscord WorkerCategory = iota + CategoryTelegram + CategoryTwitter + CategoryWeb +) + +// String returns the string representation of the WorkerCategory +func (wc WorkerCategory) String() string { + return [...]string{"Discord", "Telegram", "Twitter", "Web"}[wc] +} + +// CanDoWork checks if the node can perform work of the specified WorkerType. +// It returns true if the node is configured for the given worker type, false otherwise. +func (n *NodeData) CanDoWork(workerType WorkerCategory) bool { + + if !n.WorkerTimeout.IsZero() && time.Since(n.WorkerTimeout) < 16*time.Minute { + logrus.Infof("[+] Skipping worker %s due to timeout", n.PeerId) + return false + } + + switch workerType { + case CategoryTwitter: + return n.IsActive && n.IsTwitterScraper + case CategoryDiscord: + return n.IsActive && n.IsDiscordScraper + case CategoryTelegram: + return n.IsActive && n.IsTelegramScraper + case CategoryWeb: + return n.IsActive && n.IsWebScraper + default: + return false + } +} + // TwitterScraper checks if the current node is configured as a Twitter scraper. // It retrieves the configuration instance and returns the value of the TwitterScraper field. func (n *NodeData) TwitterScraper() bool { diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index 5a6ca099..f5402bda 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -1,6 +1,7 @@ package pubsub import ( + "context" "encoding/json" "fmt" "sort" @@ -31,7 +32,7 @@ type ConnectBufferEntry struct { // It initializes the node data map, node data channel, node data file path, // connect buffer map. It loads existing node data from file, starts a goroutine // to clear expired buffer entries, and returns the initialized instance. -func NewNodeEventTracker(version, environment string) *NodeEventTracker { +func NewNodeEventTracker(version, environment string, hostId string) *NodeEventTracker { net := &NodeEventTracker{ nodeData: NewSafeMap(), NodeDataChan: make(chan *NodeData), @@ -39,6 +40,7 @@ func NewNodeEventTracker(version, environment string) *NodeEventTracker { ConnectBuffer: make(map[string]ConnectBufferEntry), } go net.ClearExpiredBufferEntries() + go net.StartCleanupRoutine(context.Background(), hostId) return net } @@ -313,24 +315,47 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip dataChanged = true nd.EthAddress = nodeData.EthAddress } - // If the node data exists, check if the multiaddress is already in the list - multiAddress := nodeData.Multiaddrs[0].Multiaddr - addrExists := false - for _, addr := range nodeData.Multiaddrs { - if addr.Equal(multiAddress) { - addrExists = true - break + + if len(nodeData.Multiaddrs) > 0 { + multiAddress := nodeData.Multiaddrs[0].Multiaddr + addrExists := false + for _, addr := range nodeData.Multiaddrs { + if addr.Equal(multiAddress) { + addrExists = true + break + } } - } - if !addrExists { - nodeData.Multiaddrs = append(nodeData.Multiaddrs, JSONMultiaddr{multiAddress}) - } - if dataChanged || forceGossip { - net.NodeDataChan <- nd + if !addrExists { + nodeData.Multiaddrs = append(nodeData.Multiaddrs, JSONMultiaddr{multiAddress}) + } + + if dataChanged || forceGossip { + net.NodeDataChan <- nd + } + + nd.LastUpdatedUnix = nodeData.LastUpdatedUnix + net.nodeData.Set(nodeData.PeerId.String(), nd) + } - nd.LastUpdatedUnix = nodeData.LastUpdatedUnix - net.nodeData.Set(nodeData.PeerId.String(), nd) + // If the node data exists, check if the multiaddress is already in the list + // multiAddress := nodeData.Multiaddrs[0].Multiaddr + // addrExists := false + // for _, addr := range nodeData.Multiaddrs { + // if addr.Equal(multiAddress) { + // addrExists = true + // break + // } + // } + // if !addrExists { + // nodeData.Multiaddrs = append(nodeData.Multiaddrs, JSONMultiaddr{multiAddress}) + // } + // if dataChanged || forceGossip { + // net.NodeDataChan <- nd + // } + + // nd.LastUpdatedUnix = nodeData.LastUpdatedUnix + // net.nodeData.Set(nodeData.PeerId.String(), nd) } return nil } @@ -356,8 +381,84 @@ func (net *NodeEventTracker) ClearExpiredBufferEntries() { } } +// RemoveNodeData removes the node data associated with the given peer ID from the NodeEventTracker. +// It deletes the node data from the internal map and removes any corresponding entry +// from the connect buffer. This function is typically called when a peer disconnects +// or is no longer part of the network. +// +// Parameters: +// - peerID: A string representing the ID of the peer to be removed. func (net *NodeEventTracker) RemoveNodeData(peerID string) { net.nodeData.Delete(peerID) delete(net.ConnectBuffer, peerID) logrus.Infof("[+] Removed peer %s from NodeTracker", peerID) } + +// ClearExpiredWorkerTimeouts periodically checks and clears expired worker timeouts. +// It runs in an infinite loop, sleeping for 5 minutes between each iteration. +// For each node in the network, it checks if the worker timeout has expired (after 60 minutes). +// If a timeout has expired, it resets the WorkerTimeout to zero and updates the node data. +// This function helps manage the availability of workers in the network by clearing +// temporary timeout states. +func (net *NodeEventTracker) ClearExpiredWorkerTimeouts() { + for { + time.Sleep(5 * time.Minute) // Check every 5 minutes + now := time.Now() + + for _, nodeData := range net.GetAllNodeData() { + if !nodeData.WorkerTimeout.IsZero() && now.Sub(nodeData.WorkerTimeout) >= 16*time.Minute { + nodeData.WorkerTimeout = time.Time{} // Reset to zero value + err := net.AddOrUpdateNodeData(&nodeData, true) + if err != nil { + logrus.Warnf("Error adding worker timeout %v", err) + } + } + } + } +} + +const ( + maxDisconnectionTime = 1 * time.Minute + cleanupInterval = 2 * time.Minute +) + +// StartCleanupRoutine starts a goroutine that periodically checks for and removes stale peers +func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context, hostId string) { + ticker := time.NewTicker(cleanupInterval) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + net.cleanupStalePeers(hostId) + case <-ctx.Done(): + return + } + } +} + +// cleanupStalePeers checks for and removes stale peers from both the routing table and node data +func (net *NodeEventTracker) cleanupStalePeers(hostId string) { + now := time.Now() + + for _, nodeData := range net.GetAllNodeData() { + if now.Sub(time.Unix(nodeData.LastUpdatedUnix, 0)) > maxDisconnectionTime { + if nodeData.PeerId.String() != hostId { + logrus.Infof("Removing stale peer: %s", nodeData.PeerId) + net.RemoveNodeData(nodeData.PeerId.String()) + delete(net.ConnectBuffer, nodeData.PeerId.String()) + + // Notify about peer removal + net.NodeDataChan <- &NodeData{ + PeerId: nodeData.PeerId, + Activity: ActivityLeft, + LastUpdatedUnix: now.Unix(), + } + } + + // Use the node parameter to access OracleNode methods if needed + // For example: + // node.SomeMethod(nodeData.PeerId) + } + } +} diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index 1779ffa8..8348d6de 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -13,6 +13,11 @@ import ( "github.com/sirupsen/logrus" ) +type TweetResult struct { + Tweet *twitterscraper.Tweet + Error error +} + // auth initializes and returns a new Twitter scraper instance. It attempts to load cookies from a file to reuse an existing session. // If no valid session is found, it performs a login with credentials specified in the application's configuration. // On successful login, it saves the session cookies for future use. If the login fails, it returns nil. @@ -69,7 +74,7 @@ func auth() *twitterscraper.Scraper { // - An error if the scraping or sentiment analysis process encounters any issues. func ScrapeTweetsForSentiment(query string, count int, model string) (string, string, error) { scraper := auth() - var tweets []*twitterscraper.Tweet + var tweets []*TweetResult if scraper == nil { return "", "", fmt.Errorf("there was an error authenticating with your Twitter credentials") @@ -80,18 +85,34 @@ func ScrapeTweetsForSentiment(query string, count int, model string) (string, st // Perform the search with the specified query and count for tweetResult := range scraper.SearchTweets(context.Background(), query, count) { + var tweet TweetResult if tweetResult.Error != nil { - logrus.Printf("Error fetching tweet: %v", tweetResult.Error) - continue + tweet = TweetResult{ + Tweet: nil, + Error: tweetResult.Error, + } + } else { + tweet = TweetResult{ + Tweet: &tweetResult.Tweet, + Error: nil, + } } - tweets = append(tweets, &tweetResult.Tweet) + tweets = append(tweets, &tweet) } sentimentPrompt := "Please perform a sentiment analysis on the following tweets, using an unbiased approach. Sentiment analysis involves identifying and categorizing opinions expressed in text, particularly to determine whether the writer's attitude towards a particular topic, product, etc., is positive, negative, or neutral. After analyzing, please provide a summary of the overall sentiment expressed in these tweets, including the proportion of positive, negative, and neutral sentiments if applicable." - prompt, sentiment, err := llmbridge.AnalyzeSentimentTweets(tweets, model, sentimentPrompt) + + twitterScraperTweets := make([]*twitterscraper.TweetResult, len(tweets)) + for i, tweet := range tweets { + twitterScraperTweets[i] = &twitterscraper.TweetResult{ + Tweet: *tweet.Tweet, + Error: tweet.Error, + } + } + prompt, sentiment, err := llmbridge.AnalyzeSentimentTweets(twitterScraperTweets, model, sentimentPrompt) if err != nil { return "", "", err } - return prompt, sentiment, nil + return prompt, sentiment, tweets[0].Error } // ScrapeTweetsByQuery performs a search on Twitter for tweets matching the specified query. @@ -103,9 +124,9 @@ func ScrapeTweetsForSentiment(query string, count int, model string) (string, st // Returns: // - A slice of pointers to twitterscraper.Tweet objects that match the search query. // - An error if the scraping process encounters any issues. -func ScrapeTweetsByQuery(query string, count int) ([]*twitterscraper.Tweet, error) { +func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { scraper := auth() - var tweets []*twitterscraper.Tweet + var tweets []*TweetResult if scraper == nil { return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") @@ -116,21 +137,29 @@ func ScrapeTweetsByQuery(query string, count int) ([]*twitterscraper.Tweet, erro // Perform the search with the specified query and count for tweetResult := range scraper.SearchTweets(context.Background(), query, count) { + var tweet TweetResult if tweetResult.Error != nil { - logrus.Printf("Error fetching tweet: %v", tweetResult.Error) - continue + tweet = TweetResult{ + Tweet: nil, + Error: tweetResult.Error, + } + } else { + tweet = TweetResult{ + Tweet: &tweetResult.Tweet, + Error: nil, + } } - tweets = append(tweets, &tweetResult.Tweet) + tweets = append(tweets, &tweet) } - return tweets, nil + return tweets, tweets[0].Error } // ScrapeTweetsByTrends scrapes the current trending topics on Twitter. // It returns a slice of strings representing the trending topics. // If an error occurs during the scraping process, it returns an error. -func ScrapeTweetsByTrends() ([]string, error) { +func ScrapeTweetsByTrends() ([]*TweetResult, error) { scraper := auth() - var tweets []string + var trendResults []*TweetResult if scraper == nil { return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials") @@ -141,13 +170,18 @@ func ScrapeTweetsByTrends() ([]string, error) { trends, err := scraper.GetTrends() if err != nil { - logrus.Printf("Error fetching tweet: %v", err) return nil, err } - tweets = append(tweets, trends...) + for _, trend := range trends { + trendResult := &TweetResult{ + Tweet: &twitterscraper.Tweet{Text: trend}, + Error: nil, + } + trendResults = append(trendResults, trendResult) + } - return tweets, nil + return trendResults, trendResults[0].Error } // ScrapeTweetsProfile scrapes the profile and tweets of a specific Twitter user. @@ -164,7 +198,6 @@ func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { profile, err := scraper.GetProfile(username) if err != nil { - logrus.Printf("Error fetching profile: %v", err) return twitterscraper.Profile{}, err } diff --git a/pkg/tests/scrape_test.go b/pkg/tests/scrape_test.go index d4e45dbe..66650b19 100644 --- a/pkg/tests/scrape_test.go +++ b/pkg/tests/scrape_test.go @@ -3,14 +3,22 @@ package tests import ( + "encoding/csv" "encoding/json" + "fmt" + "net/http" + "net/http/httptest" "os" "path/filepath" + "runtime" "testing" + "time" + "github.com/joho/godotenv" "github.com/masa-finance/masa-oracle/pkg/llmbridge" twitterscraper "github.com/masa-finance/masa-twitter-scraper" "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" @@ -20,6 +28,13 @@ import ( var scraper *twitterscraper.Scraper func setup() { + var err error + _, b, _, _ := runtime.Caller(0) + rootDir := filepath.Join(filepath.Dir(b), "../..") + if _, _ = os.Stat(rootDir + "/.env"); !os.IsNotExist(err) { + _ = godotenv.Load() + } + logrus.SetLevel(logrus.DebugLevel) if scraper == nil { scraper = twitterscraper.New() @@ -32,7 +47,7 @@ func setup() { cookieFilePath := filepath.Join(appConfig.MasaDir, "twitter_cookies.json") // Attempt to load cookies - if err := twitter.LoadCookies(scraper, cookieFilePath); err == nil { + if err = twitter.LoadCookies(scraper, cookieFilePath); err == nil { logrus.Debug("Cookies loaded successfully.") if twitter.IsLoggedIn(scraper) { logrus.Debug("Already logged in via cookies.") @@ -46,7 +61,7 @@ func setup() { logrus.WithFields(logrus.Fields{"username": username, "password": password}).Debug("Attempting to login") twoFACode := appConfig.Twitter2FaCode - var err error + if twoFACode != "" { logrus.WithField("2FA", "provided").Debug("2FA code is provided, attempting login with 2FA") err = twitter.Login(scraper, username, password, twoFACode) @@ -61,7 +76,7 @@ func setup() { } // Save cookies after successful login - if err := twitter.SaveCookies(scraper, cookieFilePath); err != nil { + if err = twitter.SaveCookies(scraper, cookieFilePath); err != nil { logrus.WithError(err).Error("[-] Failed to save cookies") return } @@ -69,7 +84,37 @@ func setup() { logrus.Debug("[+] Login successful") } -func TestScrapeTweetsByQuery(t *testing.T) { +func scrapeTweets(outputFile string) error { + // Implement the tweet scraping logic here + // This function should: + // 1. Make API calls to the MASA_NODE_URL + // 2. Process the responses + // 3. Write the tweets to the outputFile in CSV format + // 4. Handle rate limiting and retries + // 5. Return an error if something goes wrong + + // For now, we'll just create a dummy file + file, err := os.Create(outputFile) + if err != nil { + return fmt.Errorf("error creating file: %v", err) + } + defer file.Close() + + writer := csv.NewWriter(file) + defer writer.Flush() + + writer.Write([]string{"tweet", "datetime"}) + writer.Write([]string{"Test tweet #1", time.Now().Format(time.RFC3339)}) + writer.Write([]string{"Test tweet #2", time.Now().Format(time.RFC3339)}) + + return nil +} + +func TestSetup(t *testing.T) { + setup() +} + +func TestScrapeTweetsWithSentimentByQuery(t *testing.T) { // Ensure setup is done before running the test setup() @@ -114,7 +159,14 @@ func TestScrapeTweetsByQuery(t *testing.T) { // Now, deserializedTweets contains the tweets loaded from the file // Send the tweets data to Claude for sentiment analysis - sentimentRequest, sentimentSummary, err := llmbridge.AnalyzeSentimentTweets(deserializedTweets, "claude-3-opus-20240229", "Please perform a sentiment analysis on the following tweets, using an unbiased approach. Sentiment analysis involves identifying and categorizing opinions expressed in text, particularly to determine whether the writer's attitude towards a particular topic, product, etc., is positive, negative, or neutral. After analyzing, please provide a summary of the overall sentiment expressed in these tweets, including the proportion of positive, negative, and neutral sentiments if applicable.") + twitterScraperTweets := make([]*twitterscraper.TweetResult, len(deserializedTweets)) + for i, tweet := range deserializedTweets { + twitterScraperTweets[i] = &twitterscraper.TweetResult{ + Tweet: *tweet, + Error: nil, + } + } + sentimentRequest, sentimentSummary, err := llmbridge.AnalyzeSentimentTweets(twitterScraperTweets, "claude-3-opus-20240229", "Please perform a sentiment analysis on the following tweets, using an unbiased approach. Sentiment analysis involves identifying and categorizing opinions expressed in text, particularly to determine whether the writer's attitude towards a particular topic, product, etc., is positive, negative, or neutral. After analyzing, please provide a summary of the overall sentiment expressed in these tweets, including the proportion of positive, negative, and neutral sentiments if applicable.") if err != nil { logrus.WithError(err).Error("[-] Failed to analyze sentiment") err = os.Remove(filePath) @@ -138,3 +190,128 @@ func TestScrapeTweetsByQuery(t *testing.T) { logrus.WithField("file", filePath).Debug("[+] Temporary file deleted successfully") } } + +func TestScrapeTweetsWithMockServer(t *testing.T) { + setup() + + // Mock server + server := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + if r.Method != "POST" { + t.Errorf("Expected POST request, got %s", r.Method) + } + + var requestBody map[string]interface{} + json.NewDecoder(r.Body).Decode(&requestBody) + + if query, ok := requestBody["query"].(string); !ok || query == "" { + t.Errorf("Expected query in request body") + } + + response := map[string]interface{}{ + "data": []map[string]interface{}{ + { + "Text": "Test tweet #1", + "Timestamp": time.Now().Unix(), + }, + { + "Text": "Test tweet #2", + "Timestamp": time.Now().Unix(), + }, + }, + } + + json.NewEncoder(w).Encode(response) + })) + defer server.Close() + + // Set up test environment + os.Setenv("MASA_NODE_URL", server.URL) + outputFile := "test_tweets.csv" + defer os.Remove(outputFile) + + // Run the scrape function (you'll need to implement this) + err := scrapeTweets(outputFile) + if err != nil { + t.Fatalf("Error scraping tweets: %v", err) + } + + // Verify the output + file, err := os.Open(outputFile) + if err != nil { + t.Fatalf("Error opening output file: %v", err) + } + defer file.Close() + + reader := csv.NewReader(file) + records, err := reader.ReadAll() + if err != nil { + t.Fatalf("Error reading CSV: %v", err) + } + + if len(records) != 3 { // Header + 2 tweets + t.Errorf("Expected 3 records, got %d", len(records)) + } + + if records[0][0] != "tweet" || records[0][1] != "datetime" { + t.Errorf("Unexpected header: %v", records[0]) + } + + for i, record := range records[1:] { + if record[0] == "" || record[1] == "" { + t.Errorf("Empty field in record %d: %v", i+1, record) + } + _, err := time.Parse(time.RFC3339, record[1]) + if err != nil { + t.Errorf("Invalid datetime format in record %d: %v", i+1, record[1]) + } + } +} + +func TestScrapeTweets(t *testing.T) { + setup() + + query := "Sadhguru" + count := 10 + + for i := 0; i < 100; i++ { + tweets, err := twitter.ScrapeTweetsByQuery(query, count) + if err != nil { + logrus.WithError(err).Error("[-] Failed to scrape tweets") + return + } + + var validTweets []*twitter.TweetResult + for _, tweet := range tweets { + if tweet.Error != nil { + logrus.WithError(tweet.Error).Warn("[-] Error in tweet") + continue + } + validTweets = append(validTweets, tweet) + } + + tweetsData, err := json.Marshal(validTweets) + if err != nil { + logrus.WithError(err).Error("[-] Failed to serialize tweets data") + return + } + + logrus.WithFields(logrus.Fields{ + "total_tweets": len(tweets), + "valid_tweets": len(validTweets), + "tweets_sample": string(tweetsData[:min(100, len(tweetsData))]), + }).Debug("[+] Tweets data") + + if len(tweetsData) > 0 { + assert.NotNil(t, tweetsData[:min(10, len(tweetsData))]) + } else { + assert.Nil(t, tweetsData) + } + } +} + +func min(a, b int) int { + if a < b { + return a + } + return b +} diff --git a/pkg/workers/handler.go b/pkg/workers/handler.go index f641c891..75089170 100644 --- a/pkg/workers/handler.go +++ b/pkg/workers/handler.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "net" "strings" "github.com/asynkron/protoactor-go/actor" @@ -157,7 +158,26 @@ func (a *Worker) HandleWork(ctx actor.Context, m *messages.Work, node *masa.Orac } if err != nil { - logrus.Errorf("[-] Error processing request: %v", err) + host, _, err := net.SplitHostPort(m.Sender.Address) + addrs := node.Host.Addrs() + isLocalHost := false + for _, addr := range addrs { + addrStr := addr.String() + if strings.HasPrefix(addrStr, "/ip4/") { + ipStr := strings.Split(strings.Split(addrStr, "/")[2], "/")[0] + if host == ipStr { + isLocalHost = true + break + } + } + } + + if isLocalHost { + logrus.Errorf("[-] Local node: Error processing request: %s", err.Error()) + } else { + logrus.Errorf("[-] Remote node %s: Error processing request: %s", m.Sender, err.Error()) + } + chanResponse := ChanResponse{ Response: map[string]interface{}{"error": err.Error()}, ChannelId: workData["request_id"], @@ -187,6 +207,7 @@ func (a *Worker) HandleWork(ctx actor.Context, m *messages.Work, node *masa.Orac return } cfg := config.GetInstance() + if cfg.TwitterScraper || cfg.DiscordScraper || cfg.TelegramScraper || cfg.WebScraper { ctx.Respond(&messages.Response{RequestId: workData["request_id"], Value: string(jsn)}) } diff --git a/pkg/workers/messages/protos.pb.go b/pkg/workers/messages/protos.pb.go index 9fa2751e..b7fd1348 100644 --- a/pkg/workers/messages/protos.pb.go +++ b/pkg/workers/messages/protos.pb.go @@ -123,6 +123,7 @@ type Work struct { Sender *actor.PID `protobuf:"bytes,1,opt,name=Sender,proto3" json:"Sender,omitempty"` Data string `protobuf:"bytes,2,opt,name=Data,proto3" json:"Data,omitempty"` Id string `protobuf:"bytes,3,opt,name=Id,proto3" json:"Id,omitempty"` + Type int64 `protobuf:"varint,4,opt,name=Type,proto3" json:"Type,omitempty"` } func (x *Work) Reset() { @@ -178,6 +179,13 @@ func (x *Work) GetId() string { return "" } +func (x *Work) GetType() int64 { + if x != nil { + return x.Type + } + return 0 +} + type Response struct { state protoimpl.MessageState sizeCache protoimpl.SizeCache @@ -243,20 +251,21 @@ var file_protos_proto_rawDesc = []byte{ 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x06, 0x53, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x22, 0x25, 0x0a, 0x09, 0x43, 0x6f, 0x6e, 0x6e, 0x65, 0x63, 0x74, 0x65, 0x64, 0x12, 0x18, 0x0a, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x18, 0x01, 0x20, 0x01, - 0x28, 0x09, 0x52, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x4e, 0x0a, 0x04, 0x57, + 0x28, 0x09, 0x52, 0x07, 0x4d, 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x22, 0x62, 0x0a, 0x04, 0x57, 0x6f, 0x72, 0x6b, 0x12, 0x22, 0x0a, 0x06, 0x53, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x18, 0x01, 0x20, 0x01, 0x28, 0x0b, 0x32, 0x0a, 0x2e, 0x61, 0x63, 0x74, 0x6f, 0x72, 0x2e, 0x50, 0x49, 0x44, 0x52, 0x06, 0x53, 0x65, 0x6e, 0x64, 0x65, 0x72, 0x12, 0x12, 0x0a, 0x04, 0x44, 0x61, 0x74, 0x61, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, 0x52, 0x04, 0x44, 0x61, 0x74, 0x61, 0x12, 0x0e, 0x0a, 0x02, 0x49, - 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x64, 0x22, 0x3e, 0x0a, 0x08, 0x52, - 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, - 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, 0x65, 0x12, 0x1c, 0x0a, - 0x09, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x18, 0x02, 0x20, 0x01, 0x28, 0x09, - 0x52, 0x09, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x42, 0x36, 0x5a, 0x34, 0x67, - 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x61, 0x73, 0x61, 0x2d, 0x66, - 0x69, 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x2f, 0x6d, 0x61, 0x73, 0x61, 0x2d, 0x6f, 0x72, 0x61, 0x63, - 0x6c, 0x65, 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x2f, 0x6d, 0x65, 0x73, 0x73, 0x61, - 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, + 0x64, 0x18, 0x03, 0x20, 0x01, 0x28, 0x09, 0x52, 0x02, 0x49, 0x64, 0x12, 0x12, 0x0a, 0x04, 0x54, + 0x79, 0x70, 0x65, 0x18, 0x04, 0x20, 0x01, 0x28, 0x03, 0x52, 0x04, 0x54, 0x79, 0x70, 0x65, 0x22, + 0x3e, 0x0a, 0x08, 0x52, 0x65, 0x73, 0x70, 0x6f, 0x6e, 0x73, 0x65, 0x12, 0x14, 0x0a, 0x05, 0x56, + 0x61, 0x6c, 0x75, 0x65, 0x18, 0x01, 0x20, 0x01, 0x28, 0x09, 0x52, 0x05, 0x56, 0x61, 0x6c, 0x75, + 0x65, 0x12, 0x1c, 0x0a, 0x09, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x18, 0x02, + 0x20, 0x01, 0x28, 0x09, 0x52, 0x09, 0x52, 0x65, 0x71, 0x75, 0x65, 0x73, 0x74, 0x49, 0x64, 0x42, + 0x36, 0x5a, 0x34, 0x67, 0x69, 0x74, 0x68, 0x75, 0x62, 0x2e, 0x63, 0x6f, 0x6d, 0x2f, 0x6d, 0x61, + 0x73, 0x61, 0x2d, 0x66, 0x69, 0x6e, 0x61, 0x6e, 0x63, 0x65, 0x2f, 0x6d, 0x61, 0x73, 0x61, 0x2d, + 0x6f, 0x72, 0x61, 0x63, 0x6c, 0x65, 0x2f, 0x77, 0x6f, 0x72, 0x6b, 0x65, 0x72, 0x73, 0x2f, 0x6d, + 0x65, 0x73, 0x73, 0x61, 0x67, 0x65, 0x73, 0x62, 0x06, 0x70, 0x72, 0x6f, 0x74, 0x6f, 0x33, } var ( diff --git a/pkg/workers/messages/protos.proto b/pkg/workers/messages/protos.proto index 47b9f5ec..5fd0bc63 100644 --- a/pkg/workers/messages/protos.proto +++ b/pkg/workers/messages/protos.proto @@ -15,6 +15,7 @@ message Work { actor.PID Sender = 1; string Data = 2; string Id = 3; + int64 Type = 4; } message Response { diff --git a/pkg/workers/workers.go b/pkg/workers/workers.go index d7ac48cc..28da302b 100644 --- a/pkg/workers/workers.go +++ b/pkg/workers/workers.go @@ -76,15 +76,20 @@ var ( workerDoneCh = make(chan *pubsub2.Message) ) -type CID struct { - Duration float64 `json:"duration"` - RecordId string `json:"cid"` - Timestamp int64 `json:"timestamp"` -} - -type Record struct { - PeerId string `json:"peerid"` - CIDs []CID `json:"cids"` +// WorkerTypeToCategory maps WorkerType to WorkerCategory +func WorkerTypeToCategory(wt WorkerType) pubsub.WorkerCategory { + switch wt { + case Discord, DiscordProfile, DiscordChannelMessages, DiscordSentiment, DiscordGuildChannels, DiscordUserGuilds: + return pubsub.CategoryDiscord + case TelegramSentiment, TelegramChannelMessages: + return pubsub.CategoryTelegram + case Twitter, TwitterFollowers, TwitterProfile, TwitterSentiment, TwitterTrends: + return pubsub.CategoryTwitter + case Web, WebSentiment: + return pubsub.CategoryWeb + default: + return -1 // Invalid category + } } type ChanResponse struct { @@ -202,10 +207,11 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { var wg sync.WaitGroup props := actor.PropsFromProducer(NewWorker(node)) pid := node.ActorEngine.Spawn(props) - message := &messages.Work{Data: string(m.Data), Sender: pid, Id: m.ReceivedFrom.String()} + message := &messages.Work{Data: string(m.Data), Sender: pid, Id: m.ReceivedFrom.String(), Type: int64(pubsub.CategoryTwitter)} + n := 0 responseCollector := make(chan *pubsub2.Message, 100) // Buffered channel to collect responses - timeout := time.After(60 * time.Second) + timeout := time.After(8 * time.Second) // Local worker if node.IsStaked && node.IsWorker() { @@ -216,6 +222,9 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { result, err := future.Result() if err != nil { logrus.Errorf("[-] Error receiving response from local worker: %v", err) + responseCollector <- &pubsub2.Message{ + ValidatorData: map[string]interface{}{"error": err.Error()}, + } return } response := result.(*messages.Response) @@ -225,12 +234,15 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { gMsg, gErr := getResponseMessage(result.(*messages.Response)) if gErr != nil { logrus.Errorf("[-] Error getting response message: %v", gErr) - workerDoneCh <- &pubsub2.Message{} + responseCollector <- &pubsub2.Message{ + ValidatorData: map[string]interface{}{"error": gErr.Error()}, + } return } msg = gMsg } responseCollector <- msg + n++ }() } @@ -239,7 +251,9 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { for _, p := range peers { for _, addr := range p.Multiaddrs { ipAddr, _ := addr.ValueForProtocol(multiaddr.P_IP4) - if p.IsStaked && (p.IsTwitterScraper || p.IsWebScraper || p.IsDiscordScraper || p.IsTelegramScraper) { + if (p.PeerId.String() != node.Host.ID().String()) && + p.IsStaked && + node.NodeTracker.GetNodeData(p.PeerId.String()).CanDoWork(pubsub.WorkerCategory(message.Type)) { logrus.Infof("[+] Worker Address: %s", ipAddr) wg.Add(1) go func(p pubsub.NodeData) { @@ -247,12 +261,18 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { spawned, err := node.ActorRemote.SpawnNamed(fmt.Sprintf("%s:4001", ipAddr), "worker", "peer", -1) if err != nil { logrus.Debugf("[-] Error spawning remote worker: %v", err) + responseCollector <- &pubsub2.Message{ + ValidatorData: map[string]interface{}{"error": err.Error()}, + } return } spawnedPID := spawned.Pid logrus.Infof("[+] Worker Address: %s", spawnedPID) if spawnedPID == nil { logrus.Errorf("[-] Spawned PID is nil for IP: %s", ipAddr) + responseCollector <- &pubsub2.Message{ + ValidatorData: map[string]interface{}{"error": "Spawned PID is nil"}, + } return } client := node.ActorEngine.Spawn(props) @@ -261,6 +281,9 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { result, fErr := future.Result() if fErr != nil { logrus.Debugf("[-] Error receiving response from remote worker: %v", fErr) + responseCollector <- &pubsub2.Message{ + ValidatorData: map[string]interface{}{"error": fErr.Error()}, + } return } response := result.(*messages.Response) @@ -270,6 +293,9 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { gMsg, gErr := getResponseMessage(response) if gErr != nil { logrus.Errorf("[-] Error getting response message: %v", gErr) + responseCollector <- &pubsub2.Message{ + ValidatorData: map[string]interface{}{"error": gErr.Error()}, + } return } if gMsg != nil { @@ -277,6 +303,12 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { } } responseCollector <- msg + n++ + // cap at 3 for performance + if n == len(peers) || n == 3 { + logrus.Info("[+] All workers have responded") + responseCollector <- msg + } }(p) } } @@ -290,9 +322,7 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { case response := <-responseCollector: responses = append(responses, response) case <-timeout: - // Send queued responses to workerDoneCh for _, resp := range responses { - // @TODO add handling of failed responses here... workerDoneCh <- resp } return @@ -345,6 +375,7 @@ func MonitorWorkers(ctx context.Context, node *masa.OracleNode) { case work := <-node.WorkerTracker.WorkerStatusCh: logrus.Info("[+] Sending work to network") var workData map[string]string + err := json.Unmarshal(work.Data, &workData) if err != nil { logrus.Error("[-] Error unmarshalling work: ", err) @@ -392,11 +423,18 @@ func MonitorWorkers(ctx context.Context, node *masa.OracleNode) { * @param {masa.OracleNode} node - The OracleNode instance. */ func processValidatorData(data *pubsub2.Message, validatorDataMap map[string]interface{}, startTime *time.Time, node *masa.OracleNode) { + logrus.Infof("[+] Work validatorDataMap %s", validatorDataMap) if response, ok := validatorDataMap["Response"].(map[string]interface{}); ok { - if _, ok := response["error"].(string); ok { logrus.Infof("[+] Work failed %s", response["error"]) + // Set WorkerTimeout for the node + nodeData := node.NodeTracker.GetNodeData(data.ReceivedFrom.String()) + if nodeData != nil { + nodeData.WorkerTimeout = time.Now() + node.NodeTracker.AddOrUpdateNodeData(nodeData, true) + } + } else if work, ok := response["data"].(string); ok { processWork(data, work, startTime, node)