fix: Enhanced Data Collection and Response Time for SendWork and Twitter #471

Merged (38 commits) on Aug 2, 2024
Changes shown below are from 29 of the 38 commits.

Commits
5f83a76
scraper tests
jdutchak Jul 30, 2024
d219340
scrape test
jdutchak Jul 31, 2024
04c06b7
handling worker errors, adding unit testing for 429
jdutchak Jul 31, 2024
1d4280d
added worker category and method in node data per bob to get capabili…
jdutchak Aug 1, 2024
789a39b
updated get twitter trends
jdutchak Aug 1, 2024
bd130a4
merge
jdutchak Aug 1, 2024
bafca74
Merge branch 'fix/tweet-errors' of https://github.com/masa-finance/ma…
jdutchak Aug 1, 2024
d91712c
local test
jdutchak Aug 1, 2024
36cf87d
testing *twitterscraper.TweetResult
jdutchak Aug 1, 2024
e03d9fc
added missing cfg.TelegramScraper check
jdutchak Aug 1, 2024
4c01581
Merge branch 'test' into fix/tweet-errors
jdutchak Aug 1, 2024
0964727
fix: update timeout to 10 seconds and update worker selection to n=3
teslashibe Aug 1, 2024
3ccec6a
Merge branch 'fix/tweet-errors' of https://github.com/masa-finance/ma…
teslashibe Aug 1, 2024
a910a76
fix: linting flags
teslashibe Aug 1, 2024
0caafeb
fix: linting in scrape_test.go
teslashibe Aug 1, 2024
4412ad2
fix: update sendWork to have a lower timeout than the handler timeout
teslashibe Aug 1, 2024
c112b94
merge conflict
jdutchak Aug 1, 2024
52abaee
modified error message to include peer id re processing
jdutchak Aug 1, 2024
a65940c
Implement adaptive timeouts based on historical node performance in n…
jdutchak Aug 1, 2024
30965ff
finished 429 unit test scrape_test and sending tweets error back to t…
jdutchak Aug 1, 2024
0b5831d
updated timeout to 16 min
jdutchak Aug 1, 2024
c2b7524
collecting err in twitter calls into Tweet struct, err will be in idx 0
jdutchak Aug 1, 2024
e29339c
parsing error for local and remote separately
jdutchak Aug 1, 2024
a9ed152
formatting error
jdutchak Aug 1, 2024
75f446c
moved workertimeoutcheck to candowork
jdutchak Aug 1, 2024
680e8b6
removed notes
jdutchak Aug 1, 2024
272ece4
removed obsoleted struct and commented code
jdutchak Aug 2, 2024
7fcd2a7
tests
jdutchak Aug 2, 2024
8bc5d80
added cleanupStalePeers feat
jdutchak Aug 2, 2024
f789126
added responseCollector <- &pubsub2.Message{} when local worker hangs
jdutchak Aug 2, 2024
280b764
checking peerid for empty
jdutchak Aug 2, 2024
29393a8
logging for nd parse err
jdutchak Aug 2, 2024
776bfac
fixing panic on multiaddr issue in addorupdatenodedata
jdutchak Aug 2, 2024
3fd2f06
Merge branch 'test' into fix/tweet-errors
5u6r054 Aug 2, 2024
701776e
dont remove self
jdutchak Aug 2, 2024
646cd99
added error message in sendwork to Set WorkerTimeout for the node
jdutchak Aug 2, 2024
421353d
checking response for proper error msg
jdutchak Aug 2, 2024
5bfa147
checking response for proper error msg
jdutchak Aug 2, 2024
Files changed
5 changes: 5 additions & 0 deletions .env.example
@@ -34,6 +34,11 @@ DISCORD_SCRAPER=true
DISCORD_BOT_TOKEN= your discord bot token
TWITTER_SCRAPER=true
WEB_SCRAPER=true
TELEGRAM_SCRAPER=false

# Go to my.telegram.org/auth and after logging in click the developer option to get these
TELEGRAM_APP_ID=
TELEGRAM_APP_HASH=

## Optional added features for ORACLE or VALIDATOR nodes only

2 changes: 1 addition & 1 deletion Makefile
@@ -23,7 +23,7 @@ client: build
@./bin/masa-node-cli

test:
@go test ./...
@go test -v -count=1 ./...

clean:
@rm -rf bin
18 changes: 13 additions & 5 deletions RELEASE_NOTES.md
@@ -2,26 +2,34 @@

[All Releases](https://github.com/masa-finance/masa-oracle/releases)

## [0.0.9-beta](https://github.com/masa-finance/masa-oracle/releases) (2024)
## [0.0.5.x](https://github.com/masa-finance/masa-oracle/releases) (2024)

## Overview

This release of the Masa Oracle Node introduces the following new features and changes.

### Breaking Changes

* Protocol version change to 0.0.9
* Protocol version change to 0.5.x

### Bug fixes

* None
* Increased from selecting all available nodes to a maximum of XX nodes per request
* Reduced global timeout from 60 seconds to XX tbd seconds for faster response times

### New Features

* Added protocol blockchain ledger
* Node selection now considers worker category (Twitter, Telegram, Discord, Web) for more targeted task distribution

### Enhancements

* Improved Twitter scraping timeout and performance
* Implemented error handling in tweet collection
* Upgraded sentiment analysis to work with updated tweet data structures
* Optimized worker selection for faster and more efficient data processing
* Implemented reduced timeout handling to have faster response times
* Implemented per-node timeout checks to identify and temporarily exclude underperforming nodes

## Change Log

* Version: 0.0.9-beta
* Version: 0.5.x
1 change: 1 addition & 0 deletions cmd/masa-node/main.go
@@ -99,6 +99,7 @@ func main() {
if node.IsStaked {
go workers.MonitorWorkers(ctx, node)
go masa.SubscribeToBlocks(ctx, node)
go node.NodeTracker.ClearExpiredWorkerTimeouts()
}

// Listen for SIGINT (CTRL+C)
3 changes: 2 additions & 1 deletion pkg/api/handlers_data.go
@@ -100,7 +100,8 @@ func handleWorkResponse(c *gin.Context, responseCh chan []byte) {

c.JSON(http.StatusOK, result)
return
case <-time.After(90 * time.Second):
// teslashibe: adjust to timeout after 10 seconds for performance testing
case <-time.After(10 * time.Second):
c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out"})
return
case <-c.Done():
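The handler above bounds how long the API waits for a worker result by racing the response channel against time.After in a select. A minimal sketch of that pattern, assuming a gin handler and a responseCh chan []byte as in the diff (the function name is illustrative):

package api

import (
	"encoding/json"
	"net/http"
	"time"

	"github.com/gin-gonic/gin"
)

// handleWorkResponseSketch returns the first worker result from responseCh,
// or a 504 if nothing arrives within 10 seconds.
func handleWorkResponseSketch(c *gin.Context, responseCh chan []byte) {
	select {
	case response := <-responseCh:
		var result map[string]interface{}
		if err := json.Unmarshal(response, &result); err != nil {
			c.JSON(http.StatusInternalServerError, gin.H{"error": "failed to parse worker response"})
			return
		}
		c.JSON(http.StatusOK, result)
	case <-time.After(10 * time.Second):
		// The worker-side sendWork timeout is kept lower than this handler timeout
		// (commit 4412ad2), so workers give up before the API does.
		c.JSON(http.StatusGatewayTimeout, gin.H{"error": "Request timed out"})
	case <-c.Done():
		// Client disconnected; nothing left to write.
		return
	}
}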
2 changes: 1 addition & 1 deletion pkg/config/app.go
@@ -112,8 +112,8 @@ func GetInstance() *AppConfig {
once.Do(func() {
instance = &AppConfig{}

instance.setDefaultConfig()
instance.setEnvVariableConfig()
instance.setDefaultConfig()
instance.setFileConfig(viper.GetString("FILE_PATH"))
err := instance.setCommandLineConfig()
if err != nil {
20 changes: 15 additions & 5 deletions pkg/llmbridge/sentiment.go
@@ -19,11 +19,21 @@ import (
// AnalyzeSentimentTweets analyzes the sentiment of the provided tweets by sending them to the Claude API.
// It concatenates the tweets, creates a payload, sends a request to Claude, parses the response,
// and returns the concatenated tweets content, a sentiment summary, and any error.
func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string, prompt string) (string, string, error) {
func AnalyzeSentimentTweets(tweets []*twitterscraper.TweetResult, model string, prompt string) (string, string, error) {
// check if we are using claude or gpt, can add others easily
if strings.Contains(model, "claude-") {
client := NewClaudeClient() // Adjusted to call without arguments
tweetsContent := ConcatenateTweets(tweets)

var validTweets []*twitterscraper.TweetResult
for _, tweet := range tweets {
if tweet.Error != nil {
logrus.WithError(tweet.Error).Warn("[-] Error in tweet")
continue
}
validTweets = append(validTweets, tweet)
}

tweetsContent := ConcatenateTweets(validTweets)
payloadBytes, err := CreatePayload(tweetsContent, model, prompt)
if err != nil {
logrus.Errorf("[-] Error creating payload: %v", err)
@@ -101,10 +111,10 @@ func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string, prompt

// ConcatenateTweets concatenates the text of the provided tweets into a single string,
// with each tweet separated by a newline character.
func ConcatenateTweets(tweets []*twitterscraper.Tweet) string {
func ConcatenateTweets(tweets []*twitterscraper.TweetResult) string {
var tweetsTexts []string
for _, tweet := range tweets {
tweetsTexts = append(tweetsTexts, tweet.Text)
for _, t := range tweets {
tweetsTexts = append(tweetsTexts, t.Tweet.Text)
}
return strings.Join(tweetsTexts, "\n")
}
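For reference, a hypothetical caller of the updated signature. This is a fragment rather than a complete program; the imports, model string, and variable names are placeholders, and per the diff above the Claude path drops any TweetResult whose Error field is set before concatenating tweet text:

// tweets were collected by the Twitter worker; entries may carry a non-nil Error.
var tweets []*twitterscraper.TweetResult

// Any model string containing "claude-" routes to the Claude client (see the branch above).
model := "claude-3-5-sonnet" // placeholder
content, summary, err := llmbridge.AnalyzeSentimentTweets(tweets, model, "Summarize the overall sentiment of these tweets")
if err != nil {
	logrus.Errorf("[-] Sentiment analysis failed: %v", err)
	return
}
logrus.Infof("[+] Sentiment summary: %s (from %d characters of tweet text)", summary, len(content))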
7 changes: 5 additions & 2 deletions pkg/network/kdht.go
@@ -30,7 +30,7 @@ func (dbValidator) Validate(_ string, _ []byte) error { return nil }
func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr,
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool) (*dht.IpfsDHT, error) {
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool, removePeerCallback func(peer.ID)) (*dht.IpfsDHT, error) {
options := make([]dht.Option, 0)
options = append(options, dht.BucketSize(100)) // Adjust bucket size
options = append(options, dht.Concurrency(100)) // Increase concurrency
@@ -64,6 +64,9 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
Source: "kdht",
}
peerChan <- pe
if removePeerCallback != nil {
removePeerCallback(p)
}
}

if err = kademliaDHT.Bootstrap(ctx); err != nil {
@@ -113,7 +116,7 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
if strings.Contains(err.Error(), "protocols not supported") {
logrus.Fatalf("[-] %s Please update to the latest version and make sure you are connecting to the correct network.", err.Error())
} else {
logrus.Error("[-] Error opening stream:", err)
logrus.Error("[-] Error opening stream: ", err)
}
return
}
7 changes: 6 additions & 1 deletion pkg/oracle_node.go
@@ -24,6 +24,7 @@ import (
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
@@ -215,7 +216,11 @@ func (node *OracleNode) Start() (err error) {
go node.ListenToNodeTracker()
go node.handleDiscoveredPeers()

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked)
removePeerCallback := func(p peer.ID) {
node.NodeTracker.RemoveNodeData(p.String())
}

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked, removePeerCallback)
if err != nil {
return err
}
40 changes: 39 additions & 1 deletion pkg/pubsub/node_data.go
@@ -66,14 +66,14 @@ type NodeData struct {
IsWebScraper bool `json:"isWebScraper"`
Records any `json:"records,omitempty"`
Version string `json:"version"`
WorkerTimeout time.Time `json:"workerTimeout,omitempty"`
}

// NewNodeData creates a new NodeData struct initialized with the given
// parameters. It is used to represent data about a node in the network.
func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, activity int) *NodeData {
multiaddrs := make([]JSONMultiaddr, 0)
multiaddrs = append(multiaddrs, JSONMultiaddr{addr})
// cfg := config.GetInstance()

return &NodeData{
PeerId: peerId,
@@ -94,6 +94,44 @@ func (n *NodeData) Address() string {
return fmt.Sprintf("%s/p2p/%s", n.Multiaddrs[0].String(), n.PeerId.String())
}

// WorkerCategory represents the main categories of workers
type WorkerCategory int

const (
CategoryDiscord WorkerCategory = iota
CategoryTelegram
CategoryTwitter
CategoryWeb
)

// String returns the string representation of the WorkerCategory
func (wc WorkerCategory) String() string {
return [...]string{"Discord", "Telegram", "Twitter", "Web"}[wc]
}

// CanDoWork checks if the node can perform work of the specified WorkerType.
// It returns true if the node is configured for the given worker type, false otherwise.
func (n *NodeData) CanDoWork(workerType WorkerCategory) bool {

if !n.WorkerTimeout.IsZero() && time.Since(n.WorkerTimeout) < 16*time.Minute {
logrus.Infof("[+] Skipping worker %s due to timeout", n.PeerId)
return false
}

switch workerType {
case CategoryTwitter:
return n.IsActive && n.IsTwitterScraper
case CategoryDiscord:
return n.IsActive && n.IsDiscordScraper
case CategoryTelegram:
return n.IsActive && n.IsTelegramScraper
case CategoryWeb:
return n.IsActive && n.IsWebScraper
default:
return false
}
}

// TwitterScraper checks if the current node is configured as a Twitter scraper.
// It retrieves the configuration instance and returns the value of the TwitterScraper field.
func (n *NodeData) TwitterScraper() bool {
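A hypothetical selection loop showing how CanDoWork gates peers both by capability category and by the 16-minute WorkerTimeout window. The tracker and eligible names are illustrative; tracker is assumed to be the node's NodeEventTracker (GetAllNodeData appears in the next file):

// Keep only active, Twitter-capable peers that are not sitting in a timeout window.
var eligible []pubsub.NodeData
for _, nd := range tracker.GetAllNodeData() {
	if nd.CanDoWork(pubsub.CategoryTwitter) {
		eligible = append(eligible, nd)
	}
}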
69 changes: 69 additions & 0 deletions pkg/pubsub/node_event_tracker.go
@@ -1,6 +1,7 @@
package pubsub

import (
"context"
"encoding/json"
"fmt"
"sort"
@@ -39,6 +40,7 @@
ConnectBuffer: make(map[string]ConnectBufferEntry),
}
go net.ClearExpiredBufferEntries()
go net.StartCleanupRoutine(context.Background())
return net
}

@@ -356,8 +358,75 @@
}
}

// RemoveNodeData removes the node data associated with the given peer ID from the NodeEventTracker.
// It deletes the node data from the internal map and removes any corresponding entry
// from the connect buffer. This function is typically called when a peer disconnects
// or is no longer part of the network.
//
// Parameters:
// - peerID: A string representing the ID of the peer to be removed.
func (net *NodeEventTracker) RemoveNodeData(peerID string) {
net.nodeData.Delete(peerID)
delete(net.ConnectBuffer, peerID)
logrus.Infof("[+] Removed peer %s from NodeTracker", peerID)
}

// ClearExpiredWorkerTimeouts periodically checks and clears expired worker timeouts.
// It runs in an infinite loop, sleeping for 5 minutes between each iteration.
// For each node in the network, it checks if the worker timeout has expired (after 60 minutes).
// If a timeout has expired, it resets the WorkerTimeout to zero and updates the node data.
// This function helps manage the availability of workers in the network by clearing
// temporary timeout states.
func (net *NodeEventTracker) ClearExpiredWorkerTimeouts() {
for {
time.Sleep(5 * time.Minute) // Check every 5 minutes
now := time.Now()

for _, nodeData := range net.GetAllNodeData() {
if !nodeData.WorkerTimeout.IsZero() && now.Sub(nodeData.WorkerTimeout) >= 16*time.Minute {
nodeData.WorkerTimeout = time.Time{} // Reset to zero value
net.AddOrUpdateNodeData(&nodeData, true)
}
}
}
}

const (
maxDisconnectionTime = 2 * time.Minute
cleanupInterval = 1 * time.Minute
)

// StartCleanupRoutine starts a goroutine that periodically checks for and removes stale peers
func (net *NodeEventTracker) StartCleanupRoutine(ctx context.Context) {
ticker := time.NewTicker(cleanupInterval)
defer ticker.Stop()

for {
select {
case <-ticker.C:
net.cleanupStalePeers()
case <-ctx.Done():
return
}
}
}

// cleanupStalePeers checks for and removes stale peers from both the routing table and node data
func (net *NodeEventTracker) cleanupStalePeers() {
now := time.Now()

for _, nodeData := range net.GetAllNodeData() {
if now.Sub(time.Unix(nodeData.LastUpdatedUnix, 0)) > maxDisconnectionTime {
logrus.Infof("Removing stale peer: %s", nodeData.PeerId)
net.RemoveNodeData(nodeData.PeerId.String())
delete(net.ConnectBuffer, nodeData.PeerId.String())

// Notify about peer removal
net.NodeDataChan <- &NodeData{
PeerId: nodeData.PeerId,
Activity: ActivityLeft,
LastUpdatedUnix: now.Unix(),
}
}
}
}
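Several commits in this PR record a failure by setting WorkerTimeout when sendWork gets an error back from a peer, which is what CanDoWork and ClearExpiredWorkerTimeouts then act on. A minimal sketch of that write path, assuming nd was obtained from GetAllNodeData and tracker is the *NodeEventTracker (the surrounding sendWork error handling is outside this diff):

// Mark the peer as timed out: CanDoWork skips it for roughly 16 minutes,
// and ClearExpiredWorkerTimeouts resets the field once the window has passed.
nd.WorkerTimeout = time.Now()
tracker.AddOrUpdateNodeData(&nd, true)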