
fix: Enhanced Data Collection and Response Time for SendWork and Twitter #471

Merged
merged 38 commits into from
Aug 2, 2024

Changes from 11 commits

Commits (38)
5f83a76
scraper tests
jdutchak Jul 30, 2024
d219340
scrape test
jdutchak Jul 31, 2024
04c06b7
handling worker errors, adding unit testing for 429
jdutchak Jul 31, 2024
1d4280d
added worker category and method in node data per bob to get capabili…
jdutchak Aug 1, 2024
789a39b
updated get twitter trends
jdutchak Aug 1, 2024
bd130a4
merge
jdutchak Aug 1, 2024
bafca74
Merge branch 'fix/tweet-errors' of https://github.com/masa-finance/ma…
jdutchak Aug 1, 2024
d91712c
local test
jdutchak Aug 1, 2024
36cf87d
testing *twitterscraper.TweetResult
jdutchak Aug 1, 2024
e03d9fc
added missing cfg.TelegramScraper check
jdutchak Aug 1, 2024
4c01581
Merge branch 'test' into fix/tweet-errors
jdutchak Aug 1, 2024
0964727
fix: update timeout to 10 seconds and update worker selection to n=3
teslashibe Aug 1, 2024
3ccec6a
Merge branch 'fix/tweet-errors' of https://github.com/masa-finance/ma…
teslashibe Aug 1, 2024
a910a76
fix: linting flags
teslashibe Aug 1, 2024
0caafeb
fix: linting in scrape_test.go
teslashibe Aug 1, 2024
4412ad2
fix: update sendWork to have a lower timeout than the hander timeout
teslashibe Aug 1, 2024
c112b94
merge conflict
jdutchak Aug 1, 2024
52abaee
modified error message to include peer id re processing
jdutchak Aug 1, 2024
a65940c
Implement adaptive timeouts based on historical node performance in n…
jdutchak Aug 1, 2024
30965ff
finished 429 unit test scrape_test and sending tweets error back to t…
jdutchak Aug 1, 2024
0b5831d
updated timeout to 16 min
jdutchak Aug 1, 2024
c2b7524
collecting err in twitter calls into Tweet struct, err will be in idx 0
jdutchak Aug 1, 2024
e29339c
parsing error for local and remote separately
jdutchak Aug 1, 2024
a9ed152
formatting error
jdutchak Aug 1, 2024
75f446c
moved workertimeoutcheck to candowork
jdutchak Aug 1, 2024
680e8b6
removed notes
jdutchak Aug 1, 2024
272ece4
removed obsoleted struct and commented code
jdutchak Aug 2, 2024
7fcd2a7
tests
jdutchak Aug 2, 2024
8bc5d80
added cleanupStalePeers feat
jdutchak Aug 2, 2024
f789126
added responseCollector <- &pubsub2.Message{} when local worker hangs
jdutchak Aug 2, 2024
280b764
checking peerid for empty
jdutchak Aug 2, 2024
29393a8
logging for nd parse err
jdutchak Aug 2, 2024
776bfac
fixing panic on multiaddr issue in addorupdatenodedata
jdutchak Aug 2, 2024
3fd2f06
Merge branch 'test' into fix/tweet-errors
5u6r054 Aug 2, 2024
701776e
dont remove self
jdutchak Aug 2, 2024
646cd99
added error message in sendwork to Set WorkerTimeout for the node
jdutchak Aug 2, 2024
421353d
checking response for proper error msg
jdutchak Aug 2, 2024
5bfa147
checking response for proper error msg
jdutchak Aug 2, 2024
5 changes: 5 additions & 0 deletions .env.example
@@ -34,6 +34,11 @@ DISCORD_SCRAPER=true
DISCORD_BOT_TOKEN= your discord bot token
TWITTER_SCRAPER=true
WEB_SCRAPER=true
TELEGRAM_SCRAPER=false

# Go to my.telegram.org/auth and after logging in click the developer option to get these
TELEGRAM_APP_ID=
TELEGRAM_APP_HASH=

## Optional added features for ORACLE or VALIDATOR nodes only

2 changes: 1 addition & 1 deletion Makefile
@@ -23,7 +23,7 @@ client: build
@./bin/masa-node-cli

test:
@go test ./...
@go test -v -count=1 ./...

clean:
@rm -rf bin
2 changes: 1 addition & 1 deletion pkg/api/handlers_data.go
@@ -100,7 +100,7 @@ func handleWorkResponse(c *gin.Context, responseCh chan []byte) {

c.JSON(http.StatusOK, result)
return
case <-time.After(90 * time.Second):
case <-time.After(60 * time.Second):
jdutchak marked this conversation as resolved.
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out"})
return
case <-c.Done():
2 changes: 1 addition & 1 deletion pkg/config/app.go
@@ -112,8 +112,8 @@ func GetInstance() *AppConfig {
once.Do(func() {
instance = &AppConfig{}

instance.setDefaultConfig()
instance.setEnvVariableConfig()
instance.setDefaultConfig()
instance.setFileConfig(viper.GetString("FILE_PATH"))
err := instance.setCommandLineConfig()
if err != nil {
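The reordering above runs setEnvVariableConfig before setDefaultConfig. For that ordering to leave environment-supplied values intact, defaults must only fill keys that are still unset. A hedged sketch of that fill-if-unset pattern (the Config type and setters here are invented for illustration, not the real AppConfig/viper implementation):

```go
package main

import "fmt"

// Config holds a single illustrative setting.
type Config struct {
	Timeout string
}

// setEnv copies an externally supplied value into the config, if any.
func (c *Config) setEnv(v string) {
	if v != "" {
		c.Timeout = v
	}
}

// setDefault fills Timeout only when nothing set it earlier, so
// running it after setEnv cannot clobber an explicit value.
func (c *Config) setDefault() {
	if c.Timeout == "" {
		c.Timeout = "60s"
	}
}

func main() {
	a := &Config{}
	a.setEnv("90s")
	a.setDefault()
	fmt.Println(a.Timeout) // 90s: the env value survives the later default

	b := &Config{}
	b.setEnv("")
	b.setDefault()
	fmt.Println(b.Timeout) // 60s: the default fills the gap
}
```

Note that viper's own SetDefault already sits at the bottom of its precedence order, so with viper the call order matters less than this sketch implies; the sketch only shows why the swapped order is safe under a fill-if-unset assumption.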
20 changes: 15 additions & 5 deletions pkg/llmbridge/sentiment.go
@@ -19,11 +19,21 @@ import (
// AnalyzeSentimentTweets analyzes the sentiment of the provided tweets by sending them to the Claude API.
// It concatenates the tweets, creates a payload, sends a request to Claude, parses the response,
// and returns the concatenated tweets content, a sentiment summary, and any error.
func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string, prompt string) (string, string, error) {
func AnalyzeSentimentTweets(tweets []*twitterscraper.TweetResult, model string, prompt string) (string, string, error) {
// check if we are using claude or gpt, can add others easily
if strings.Contains(model, "claude-") {
client := NewClaudeClient() // Adjusted to call without arguments
tweetsContent := ConcatenateTweets(tweets)

var validTweets []*twitterscraper.TweetResult
for _, tweet := range tweets {
if tweet.Error != nil {
logrus.WithError(tweet.Error).Warn("[-] Error in tweet")
continue
}
validTweets = append(validTweets, tweet)
}

tweetsContent := ConcatenateTweets(validTweets)
payloadBytes, err := CreatePayload(tweetsContent, model, prompt)
if err != nil {
logrus.Errorf("[-] Error creating payload: %v", err)
@@ -101,10 +111,10 @@ func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string, prompt

// ConcatenateTweets concatenates the text of the provided tweets into a single string,
// with each tweet separated by a newline character.
func ConcatenateTweets(tweets []*twitterscraper.Tweet) string {
func ConcatenateTweets(tweets []*twitterscraper.TweetResult) string {
var tweetsTexts []string
for _, tweet := range tweets {
tweetsTexts = append(tweetsTexts, tweet.Text)
for _, t := range tweets {
tweetsTexts = append(tweetsTexts, t.Tweet.Text)
}
return strings.Join(tweetsTexts, "\n")
}
78 changes: 55 additions & 23 deletions pkg/pubsub/node_data.go
@@ -44,36 +44,36 @@ func (m *JSONMultiaddr) UnmarshalJSON(b []byte) error {
}

type NodeData struct {
Multiaddrs []JSONMultiaddr `json:"multiaddrs,omitempty"`
PeerId peer.ID `json:"peerId"`
FirstJoinedUnix int64 `json:"firstJoined,omitempty"`
LastJoinedUnix int64 `json:"lastJoined,omitempty"`
LastLeftUnix int64 `json:"-"`
LastUpdatedUnix int64 `json:"lastUpdated,omitempty"`
CurrentUptime time.Duration `json:"uptime,omitempty"`
CurrentUptimeStr string `json:"uptimeStr,omitempty"`
AccumulatedUptime time.Duration `json:"accumulatedUptime,omitempty"`
AccumulatedUptimeStr string `json:"accumulatedUptimeStr,omitempty"`
EthAddress string `json:"ethAddress,omitempty"`
Activity int `json:"activity,omitempty"`
IsActive bool `json:"isActive"`
IsStaked bool `json:"isStaked"`
SelfIdentified bool `json:"-"`
IsValidator bool `json:"isValidator"`
IsTwitterScraper bool `json:"isTwitterScraper"`
IsDiscordScraper bool `json:"isDiscordScraper"`
IsTelegramScraper bool `json:"isTelegramScraper"`
IsWebScraper bool `json:"isWebScraper"`
Records any `json:"records,omitempty"`
Version string `json:"version"`
Multiaddrs []JSONMultiaddr `json:"multiaddrs,omitempty"`
PeerId peer.ID `json:"peerId"`
FirstJoinedUnix int64 `json:"firstJoined,omitempty"`
LastJoinedUnix int64 `json:"lastJoined,omitempty"`
LastLeftUnix int64 `json:"-"`
LastUpdatedUnix int64 `json:"lastUpdated,omitempty"`
CurrentUptime time.Duration `json:"uptime,omitempty"`
CurrentUptimeStr string `json:"uptimeStr,omitempty"`
AccumulatedUptime time.Duration `json:"accumulatedUptime,omitempty"`
AccumulatedUptimeStr string `json:"accumulatedUptimeStr,omitempty"`
EthAddress string `json:"ethAddress,omitempty"`
Activity int `json:"activity,omitempty"`
IsActive bool `json:"isActive"`
IsStaked bool `json:"isStaked"`
SelfIdentified bool `json:"-"`
IsValidator bool `json:"isValidator"`
IsTwitterScraper bool `json:"isTwitterScraper"`
IsDiscordScraper bool `json:"isDiscordScraper"`
IsTelegramScraper bool `json:"isTelegramScraper"`
IsWebScraper bool `json:"isWebScraper"`
Records any `json:"records,omitempty"`
Version string `json:"version"`
TimeoutChecks map[string]time.Time `json:"timeoutChecks,omitempty"`
}

// NewNodeData creates a new NodeData struct initialized with the given
// parameters. It is used to represent data about a node in the network.
func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData {
multiaddrs := make([]JSONMultiaddr, 0)
multiaddrs = append(multiaddrs, JSONMultiaddr{addr})
// cfg := config.GetInstance()

return &NodeData{
PeerId: peerId,
@@ -94,6 +94,38 @@ func (n *NodeData) Address() string {
return fmt.Sprintf("%s/p2p/%s", n.Multiaddrs[0].String(), n.PeerId.String())
}

// WorkerCategory represents the main categories of workers
type WorkerCategory int

const (
CategoryDiscord WorkerCategory = iota
CategoryTelegram
CategoryTwitter
CategoryWeb
)

// String returns the string representation of the WorkerCategory
func (wc WorkerCategory) String() string {
return [...]string{"Discord", "Telegram", "Twitter", "Web"}[wc]
}

// CanDoWork checks if the node can perform work of the specified WorkerType.
// It returns true if the node is configured for the given worker type, false otherwise.
func (n *NodeData) CanDoWork(workerType WorkerCategory) bool {
switch workerType {
case CategoryTwitter:
return n.IsActive && n.IsTwitterScraper
case CategoryDiscord:
return n.IsActive && n.IsDiscordScraper
case CategoryTelegram:
return n.IsActive && n.IsTelegramScraper
case CategoryWeb:
return n.IsActive && n.IsWebScraper
default:
return false
}
}

// TwitterScraper checks if the current node is configured as a Twitter scraper.
// It retrieves the configuration instance and returns the value of the TwitterScraper field.
func (n *NodeData) TwitterScraper() bool {
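The new WorkerCategory enum and CanDoWork method gate work assignment on both IsActive and the per-scraper capability flags. A self-contained sketch of the same dispatch check, with NodeData trimmed to just the fields CanDoWork consults:

```go
package main

import "fmt"

// WorkerCategory enumerates the scraper types a node may support.
type WorkerCategory int

const (
	CategoryDiscord WorkerCategory = iota
	CategoryTelegram
	CategoryTwitter
	CategoryWeb
)

// String returns the human-readable category name.
func (wc WorkerCategory) String() string {
	return [...]string{"Discord", "Telegram", "Twitter", "Web"}[wc]
}

// NodeData is trimmed to the capability flags used by CanDoWork.
type NodeData struct {
	IsActive          bool
	IsTwitterScraper  bool
	IsDiscordScraper  bool
	IsTelegramScraper bool
	IsWebScraper      bool
}

// CanDoWork reports whether the node is active and configured for the
// requested category; unknown categories are rejected.
func (n *NodeData) CanDoWork(c WorkerCategory) bool {
	switch c {
	case CategoryTwitter:
		return n.IsActive && n.IsTwitterScraper
	case CategoryDiscord:
		return n.IsActive && n.IsDiscordScraper
	case CategoryTelegram:
		return n.IsActive && n.IsTelegramScraper
	case CategoryWeb:
		return n.IsActive && n.IsWebScraper
	default:
		return false
	}
}

func main() {
	n := &NodeData{IsActive: true, IsTwitterScraper: true}
	fmt.Println(CategoryTwitter, n.CanDoWork(CategoryTwitter)) // Twitter true
	fmt.Println(CategoryWeb, n.CanDoWork(CategoryWeb))         // Web false
}
```

Requiring IsActive in every branch means an inactive node is never selected even if its scraper flag is set, which pairs with the worker-timeout checks this PR moves into CanDoWork.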
64 changes: 49 additions & 15 deletions pkg/scrapers/twitter/tweets.go
@@ -13,6 +13,11 @@
"github.com/sirupsen/logrus"
)

type TweetResult struct {
Tweet *twitterscraper.Tweet
Error error
}

// auth initializes and returns a new Twitter scraper instance. It attempts to load cookies from a file to reuse an existing session.
// If no valid session is found, it performs a login with credentials specified in the application's configuration.
// On successful login, it saves the session cookies for future use. If the login fails, it returns nil.
@@ -69,7 +74,7 @@
// - An error if the scraping or sentiment analysis process encounters any issues.
func ScrapeTweetsForSentiment(query string, count int, model string) (string, string, error) {
scraper := auth()
var tweets []*twitterscraper.Tweet
var tweets []*TweetResult

if scraper == nil {
return "", "", fmt.Errorf("there was an error authenticating with your Twitter credentials")
@@ -80,14 +85,30 @@

// Perform the search with the specified query and count
for tweetResult := range scraper.SearchTweets(context.Background(), query, count) {
var tweet TweetResult
if tweetResult.Error != nil {
logrus.Printf("Error fetching tweet: %v", tweetResult.Error)
continue
tweet = TweetResult{
Tweet: nil,
Error: tweetResult.Error,
}
} else {
tweet = TweetResult{
Tweet: tweetResult.Tweet,
Check failure on line 96 in pkg/scrapers/twitter/tweets.go (GitHub Actions / test, build-pr): cannot use tweetResult.Tweet (variable of type twitterscraper.Tweet) as *twitterscraper.Tweet value in struct literal
Error: nil,
}
}
tweets = append(tweets, &tweetResult.Tweet)
tweets = append(tweets, &tweet)
}
sentimentPrompt := "Please perform a sentiment analysis on the following tweets, using an unbiased approach. Sentiment analysis involves identifying and categorizing opinions expressed in text, particularly to determine whether the writer's attitude towards a particular topic, product, etc., is positive, negative, or neutral. After analyzing, please provide a summary of the overall sentiment expressed in these tweets, including the proportion of positive, negative, and neutral sentiments if applicable."
prompt, sentiment, err := llmbridge.AnalyzeSentimentTweets(tweets, model, sentimentPrompt)

twitterScraperTweets := make([]*twitterscraper.TweetResult, len(tweets))
for i, tweet := range tweets {
twitterScraperTweets[i] = &twitterscraper.TweetResult{
Tweet: tweet.Tweet,
Check failure on line 107 in pkg/scrapers/twitter/tweets.go (GitHub Actions / test, build-pr): cannot use tweet.Tweet (variable of type *twitterscraper.Tweet) as twitterscraper.Tweet value in struct literal
Error: tweet.Error,
}
}
prompt, sentiment, err := llmbridge.AnalyzeSentimentTweets(twitterScraperTweets, model, sentimentPrompt)
if err != nil {
return "", "", err
}
@@ -103,9 +124,9 @@
// Returns:
// - A slice of pointers to twitterscraper.Tweet objects that match the search query.
// - An error if the scraping process encounters any issues.
func ScrapeTweetsByQuery(query string, count int) ([]*twitterscraper.Tweet, error) {
func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) {
scraper := auth()
var tweets []*twitterscraper.Tweet
var tweets []*TweetResult

if scraper == nil {
return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials")
@@ -116,21 +137,29 @@

// Perform the search with the specified query and count
for tweetResult := range scraper.SearchTweets(context.Background(), query, count) {
var tweet TweetResult
if tweetResult.Error != nil {
logrus.Printf("Error fetching tweet: %v", tweetResult.Error)
continue
tweet = TweetResult{
Tweet: nil,
Error: tweetResult.Error,
}
} else {
tweet = TweetResult{
Tweet: tweetResult.Tweet,
Check failure on line 148 in pkg/scrapers/twitter/tweets.go (GitHub Actions / test, build-pr): cannot use tweetResult.Tweet (variable of type twitterscraper.Tweet) as *twitterscraper.Tweet value in struct literal
Error: nil,
}
}
tweets = append(tweets, &tweetResult.Tweet)
tweets = append(tweets, &tweet)
}
return tweets, nil
}

// ScrapeTweetsByTrends scrapes the current trending topics on Twitter.
// It returns a slice of strings representing the trending topics.
// If an error occurs during the scraping process, it returns an error.
func ScrapeTweetsByTrends() ([]string, error) {
func ScrapeTweetsByTrends() ([]*TweetResult, error) {
scraper := auth()
var tweets []string
var trendResults []*TweetResult

if scraper == nil {
return nil, fmt.Errorf("there was an error authenticating with your Twitter credentials")
@@ -141,13 +170,18 @@

trends, err := scraper.GetTrends()
if err != nil {
logrus.Printf("Error fetching tweet: %v", err)
return nil, err
}

tweets = append(tweets, trends...)
for _, trend := range trends {
trendResult := &TweetResult{
Tweet: &twitterscraper.Tweet{Text: trend},
Error: nil,
}
trendResults = append(trendResults, trendResult)
}

return tweets, nil
return trendResults, nil
}

// ScrapeTweetsProfile scrapes the profile and tweets of a specific Twitter user.