Skip to content

Commit

Permalink
fix: Enhanced Data Collection and Response Time for SendWork and Twit…
Browse files Browse the repository at this point in the history
…ter (#471)

* scraper tests

* scrape test

* handling worker errors, adding unit testing for 429

* added worker category and method in node data per bob to get capabilities

* updated get twitter trends

* merge

* local test

* testing *twitterscraper.TweetResult

* added missing cfg.TelegramScraper check

* fix: update timeout to 10 seconds and update worker selection to n=3

* fix: linting flags

* fix: linting in scrape_test.go

* fix: update sendWork to have a lower timeout than the handler timeout

* merge conflict

* modified error message to include peer id re processing

* Implement adaptive timeouts based on historical node performance in node tracker

* finished 429 unit test scrape_test and sending tweets error back to trigger worker timeout check

* updated timeout to 16 min

* collecting err in twitter calls into Tweet struct, err will be in idx 0

* parsing error for local and remote separately

* formatting error

* moved workertimeoutcheck to candowork

* removed notes

* removed obsoleted struct and commented code

* tests

* added cleanupStalePeers feat

* added responseCollector <- &pubsub2.Message{} when local worker hangs

* checking peerid for empty

* logging for nd parse err

* fixing panic on multiaddr issue in addorupdatenodedata

* dont remove self

* added error message in sendwork to Set WorkerTimeout for the node

* checking response for proper error msg

* checking response for proper error msg

---------

Co-authored-by: Brendan Playford <[email protected]>
Co-authored-by: J2D3 <[email protected]>
  • Loading branch information
3 people authored Aug 2, 2024
1 parent b212ff7 commit 5aa99ab
Show file tree
Hide file tree
Showing 18 changed files with 542 additions and 85 deletions.
5 changes: 5 additions & 0 deletions .env.example
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ DISCORD_SCRAPER=true
DISCORD_BOT_TOKEN= your discord bot token
TWITTER_SCRAPER=true
WEB_SCRAPER=true
TELEGRAM_SCRAPER=false

# Go to my.telegram.org/auth and after logging in click the developer option to get these
TELEGRAM_APP_ID=
TELEGRAM_APP_HASH=

## Optional added features for ORACLE or VALIDATOR nodes only

Expand Down
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -23,7 +23,7 @@ client: build
@./bin/masa-node-cli

test:
@go test ./...
@go test -v -count=1 ./...

clean:
@rm -rf bin
Expand Down
18 changes: 13 additions & 5 deletions RELEASE_NOTES.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,26 +2,34 @@

[All Releases](https://github.com/masa-finance/masa-oracle/releases)

## [0.0.9-beta](https://github.com/masa-finance/masa-oracle/releases) (2024)
## [0.5.0](https://github.com/masa-finance/masa-oracle/releases) (2024)

## Overview

This release of the Masa Oracle Node introduces the following new features and changes.

### Breaking Changes

* Protocol version change to 0.0.9
* Protocol version change to 0.5.0

### Bug fixes

* None
* Increased from selecting all available nodes to a maximum of XX nodes per request
* Reduced global timeout from 60 seconds to XX tbd seconds for faster response times

### New Features

* Added protocol blockchain ledger
* Node selection now considers worker category (Twitter, Telegram, Discord, Web) for more targeted task distribution

### Enhancements

* Improved Twitter scraping timeout and performance
* Implemented error handling in tweet collection
* Upgraded sentiment analysis to work with updated tweet data structures
* Optimized worker selection for faster and more efficient data processing
* Implemented reduced timeout handling to have faster response times
* Implemented per-node timeout checks to identify and temporarily exclude underperforming nodes

## Change Log

* Version: 0.0.9-beta
* Version: 0.5.0
1 change: 1 addition & 0 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,6 +99,7 @@ func main() {
if node.IsStaked {
go workers.MonitorWorkers(ctx, node)
go masa.SubscribeToBlocks(ctx, node)
go node.NodeTracker.ClearExpiredWorkerTimeouts()
}

// Listen for SIGINT (CTRL+C)
Expand Down
5 changes: 3 additions & 2 deletions pkg/api/handlers_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,8 +100,9 @@ func handleWorkResponse(c *gin.Context, responseCh chan []byte) {

c.JSON(http.StatusOK, result)
return
case <-time.After(90 * time.Second):
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out"})
// teslashibe: adjust to timeout after 10 seconds for performance testing
case <-time.After(10 * time.Second):
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out, check that port 4001 TCP inbound is open."})
return
case <-c.Done():
return
Expand Down
2 changes: 1 addition & 1 deletion pkg/config/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -112,8 +112,8 @@ func GetInstance() *AppConfig {
once.Do(func() {
instance = &AppConfig{}

instance.setDefaultConfig()
instance.setEnvVariableConfig()
instance.setDefaultConfig()
instance.setFileConfig(viper.GetString("FILE_PATH"))
err := instance.setCommandLineConfig()
if err != nil {
Expand Down
20 changes: 15 additions & 5 deletions pkg/llmbridge/sentiment.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,11 +19,21 @@ import (
// AnalyzeSentimentTweets analyzes the sentiment of the provided tweets by sending them to the Claude API.
// It concatenates the tweets, creates a payload, sends a request to Claude, parses the response,
// and returns the concatenated tweets content, a sentiment summary, and any error.
func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string, prompt string) (string, string, error) {
func AnalyzeSentimentTweets(tweets []*twitterscraper.TweetResult, model string, prompt string) (string, string, error) {
// check if we are using claude or gpt, can add others easily
if strings.Contains(model, "claude-") {
client := NewClaudeClient() // Adjusted to call without arguments
tweetsContent := ConcatenateTweets(tweets)

var validTweets []*twitterscraper.TweetResult
for _, tweet := range tweets {
if tweet.Error != nil {
logrus.WithError(tweet.Error).Warn("[-] Error in tweet")
continue
}
validTweets = append(validTweets, tweet)
}

tweetsContent := ConcatenateTweets(validTweets)
payloadBytes, err := CreatePayload(tweetsContent, model, prompt)
if err != nil {
logrus.Errorf("[-] Error creating payload: %v", err)
Expand Down Expand Up @@ -101,10 +111,10 @@ func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string, prompt

// ConcatenateTweets concatenates the text of the provided tweets into a single string,
// with each tweet separated by a newline character.
func ConcatenateTweets(tweets []*twitterscraper.Tweet) string {
func ConcatenateTweets(tweets []*twitterscraper.TweetResult) string {
var tweetsTexts []string
for _, tweet := range tweets {
tweetsTexts = append(tweetsTexts, tweet.Text)
for _, t := range tweets {
tweetsTexts = append(tweetsTexts, t.Tweet.Text)
}
return strings.Join(tweetsTexts, "\n")
}
Expand Down
7 changes: 5 additions & 2 deletions pkg/network/kdht.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@ func (dbValidator) Validate(_ string, _ []byte) error { return nil }
func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr,
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool) (*dht.IpfsDHT, error) {
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool, removePeerCallback func(peer.ID)) (*dht.IpfsDHT, error) {
options := make([]dht.Option, 0)
options = append(options, dht.BucketSize(100)) // Adjust bucket size
options = append(options, dht.Concurrency(100)) // Increase concurrency
Expand Down Expand Up @@ -64,6 +64,9 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
Source: "kdht",
}
peerChan <- pe
if removePeerCallback != nil {
removePeerCallback(p)
}
}

if err = kademliaDHT.Bootstrap(ctx); err != nil {
Expand Down Expand Up @@ -113,7 +116,7 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
if strings.Contains(err.Error(), "protocols not supported") {
logrus.Fatalf("[-] %s Please update to the latest version and make sure you are connecting to the correct network.", err.Error())
} else {
logrus.Error("[-] Error opening stream:", err)
logrus.Error("[-] Error opening stream: ", err)
}
return
}
Expand Down
9 changes: 7 additions & 2 deletions pkg/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
Expand Down Expand Up @@ -177,7 +178,7 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {
multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst),
Context: ctx,
PeerChan: make(chan myNetwork.PeerEvent),
NodeTracker: pubsub2.NewNodeEventTracker(config.Version, cfg.Environment),
NodeTracker: pubsub2.NewNodeEventTracker(config.Version, cfg.Environment, hst.ID().String()),
PubSubManager: subscriptionManager,
IsStaked: isStaked,
IsValidator: cfg.Validator,
Expand Down Expand Up @@ -215,7 +216,11 @@ func (node *OracleNode) Start() (err error) {
go node.ListenToNodeTracker()
go node.handleDiscoveredPeers()

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked)
removePeerCallback := func(p peer.ID) {
node.NodeTracker.RemoveNodeData(p.String())
}

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked, removePeerCallback)
if err != nil {
return err
}
Expand Down
6 changes: 6 additions & 0 deletions pkg/oracle_node_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,6 +161,12 @@ func (node *OracleNode) ReceiveNodeData(stream network.Stream) {
for scanner.Scan() {
data := scanner.Bytes()
var page NodeDataPage

if err := json.Unmarshal(data, &page); err != nil {
logrus.Errorf("[-] Failed to unmarshal NodeData page: %v %s %+v", err, string(data), page)
continue
}

if err := json.Unmarshal(data, &page); err != nil {
logrus.Errorf("[-] Failed to unmarshal NodeData page: %v", err)
continue
Expand Down
40 changes: 39 additions & 1 deletion pkg/pubsub/node_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,14 +66,14 @@ type NodeData struct {
IsWebScraper bool `json:"isWebScraper"`
Records any `json:"records,omitempty"`
Version string `json:"version"`
WorkerTimeout time.Time `json:"workerTimeout,omitempty"`
}

// NewNodeData creates a new NodeData struct initialized with the given
// parameters. It is used to represent data about a node in the network.
func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData {
multiaddrs := make([]JSONMultiaddr, 0)
multiaddrs = append(multiaddrs, JSONMultiaddr{addr})
// cfg := config.GetInstance()

return &NodeData{
PeerId: peerId,
Expand All @@ -94,6 +94,44 @@ func (n *NodeData) Address() string {
return fmt.Sprintf("%s/p2p/%s", n.Multiaddrs[0].String(), n.PeerId.String())
}

// WorkerCategory represents the main categories of workers.
type WorkerCategory int

// The known worker categories. Their numeric values index the name table
// used by String, so the order here must match the order of the names there.
const (
	CategoryDiscord WorkerCategory = iota
	CategoryTelegram
	CategoryTwitter
	CategoryWeb
)

// String returns the string representation of the WorkerCategory.
// Values outside the declared constants yield "Unknown" instead of panicking
// with an index-out-of-range error; String is invoked implicitly by fmt and
// logging calls, so it must be safe for any underlying integer.
func (wc WorkerCategory) String() string {
	names := [...]string{"Discord", "Telegram", "Twitter", "Web"}
	if wc < 0 || int(wc) >= len(names) {
		return "Unknown"
	}
	return names[wc]
}

// CanDoWork reports whether the node may be handed work of the given
// WorkerCategory.
//
// A node whose WorkerTimeout is set and lies less than 16 minutes in the past
// is skipped (and the skip is logged). Otherwise the node qualifies only when
// it is active and has the scraper flag matching the requested category;
// unrecognized categories never qualify.
func (n *NodeData) CanDoWork(workerType WorkerCategory) bool {
	const cooldown = 16 * time.Minute

	coolingDown := !n.WorkerTimeout.IsZero() && time.Since(n.WorkerTimeout) < cooldown
	if coolingDown {
		logrus.Infof("[+] Skipping worker %s due to timeout", n.PeerId)
		return false
	}

	// Every category requires an active node, so check that once up front.
	if !n.IsActive {
		return false
	}

	switch workerType {
	case CategoryDiscord:
		return n.IsDiscordScraper
	case CategoryTelegram:
		return n.IsTelegramScraper
	case CategoryTwitter:
		return n.IsTwitterScraper
	case CategoryWeb:
		return n.IsWebScraper
	}
	return false
}

// TwitterScraper checks if the current node is configured as a Twitter scraper.
// It retrieves the configuration instance and returns the value of the TwitterScraper field.
func (n *NodeData) TwitterScraper() bool {
Expand Down
Loading

0 comments on commit 5aa99ab

Please sign in to comment.