From b5d112009239c9f1fb207dde7dd6b8415d824f36 Mon Sep 17 00:00:00 2001 From: Mario Camou Date: Thu, 7 Nov 2024 16:52:57 +0100 Subject: [PATCH] chore(all): Cleanup and linting (#623) --- .github/workflows/tests.yaml | 15 ++-- .gitignore | 3 + Makefile | 17 +++-- cmd/masa-node-cli/handlers.go | 76 +++++++++++--------- cmd/masa-node/main.go | 10 +-- go.mod | 1 - go.sum | 2 - node/oracle_node.go | 53 ++++++-------- pkg/api/handlers_node.go | 15 ++-- pkg/chain/chain.go | 15 ++-- pkg/db/operations.go | 82 ++-------------------- pkg/event/event_client.go | 2 +- pkg/llmbridge/sentiment.go | 14 ++-- pkg/network/address.go | 27 ++----- pkg/network/kdht.go | 2 +- pkg/network/mdns.go | 2 +- pkg/pubsub/node_event_tracker.go | 4 +- pkg/scrapers/telegram/telegram_client.go | 2 - pkg/scrapers/twitter/followers.go | 10 +-- pkg/scrapers/web/web.go | 7 +- pkg/tests/scrapers/twitter_scraper_test.go | 5 +- pkg/tests/twitter/twitter_scraper_test.go | 4 +- pkg/workers/worker_selection.go | 2 - 23 files changed, 156 insertions(+), 214 deletions(-) diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 10642528..97b256b1 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -9,6 +9,7 @@ on: - '**' jobs: test: + runs-on: ubuntu-latest steps: @@ -20,12 +21,16 @@ jobs: with: go-version: '^1.22' - - name: Install dependencies - run: go mod tidy + - name: Install golangci-lint + run: sudo snap install golangci-lint --classic + + - name: Code formatting and linting + run: make ci-lint + - name: Run tests - run: | - make test + run: make test + - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: - token: ${{ secrets.CODECOV_TOKEN }} \ No newline at end of file + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.gitignore b/.gitignore index 7b804f9f..eb53ba71 100644 --- a/.gitignore +++ b/.gitignore @@ -73,3 +73,6 @@ snippets.txt # Build result of goreleaser dist/ bp-todo.md + +# Result of running tests with coverage +coverage.txt diff --git a/Makefile b/Makefile index e7c03c02..186f6bcd 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ build: contracts/node_modules install: @sh ./node_install.sh - + run: build @./bin/masa-node @@ -44,16 +44,25 @@ stake: build client: build @./bin/masa-node-cli +# TODO Add -race and fix race conditions test: contracts/node_modules - @go test -coverprofile=coverage.txt -covermode=atomic -v ./... + @go test -coverprofile=coverage.txt -covermode=atomic -v -count=1 -shuffle=on ./... + +ci-lint: + go mod tidy && git diff --exit-code + go mod download + go mod verify + gofmt -s -w . && git diff --exit-code + go vet ./... + golangci-lint run clean: @rm -rf bin - + @if [ -d ~/.masa/blocks ]; then rm -rf ~/.masa/blocks; fi @if [ -d ~/.masa/cache ]; then rm -rf ~/.masa/cache; fi @if [ -f masa_node.log ]; then rm masa_node.log; fi - + proto: sh pkg/workers/messages/build.sh diff --git a/cmd/masa-node-cli/handlers.go b/cmd/masa-node-cli/handlers.go index 5ffd0da7..32d14c88 100644 --- a/cmd/masa-node-cli/handlers.go +++ b/cmd/masa-node-cli/handlers.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "log" "net/http" "os" "os/exec" @@ -43,21 +42,6 @@ func handleIPAddress(multiAddr string) string { return "" } -// handleOpenFile reads the content of a file specified by the filename 'f' and returns it as a string. -// If the file cannot be read, the function logs a fatal error and exits the program. -// Parameters: -// - f: The name of the file to read. -// Returns: -// - A string containing the content of the file. -// func handleOpenFile(f string) string { -// dat, err := os.ReadFile(f) -// if err != nil { -// log.Print(err) -// return "" -// } -// return string(dat) -// } - // handleSaveFile writes the provided content to a file specified by the filename 'f'. // It appends the content to the file if it already exists, or creates a new file with the content if it does not. // The file is created with permissions set to 0755. @@ -66,9 +50,14 @@ func handleIPAddress(multiAddr string) string { // - content: The content to write to the file. func handleSaveFile(f string, content string) { file, err := os.OpenFile(f, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755) - file.WriteString(content + "\n") if err != nil { - log.Println(err) + logrus.Errorf("[-] Error while opening file %s for writing: %v", f, err) + return + } + + _, err = file.WriteString(content + "\n") + if err != nil { + logrus.Errorf("[-] Error while writing to file %s: %v", f, err) return } } @@ -84,7 +73,7 @@ func handleSaveFile(f string, content string) { func handleGPT(prompt string, userMessage string) (string, error) { key := os.Getenv("OPENAI_API_KEY") if key == "" { - log.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.") + logrus.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.") return "", errors.New("OPENAI_API_KEY is not set") } client := openai.NewClient(key) @@ -105,7 +94,7 @@ func handleGPT(prompt string, userMessage string) (string, error) { }, ) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while getting ChatGPT completion: %v", err) return "", err } return resp.Choices[0].Message.Content, nil @@ -131,7 +120,7 @@ func handleSpeak(response string) { req, err := http.NewRequest(http.MethodPost, os.Getenv("ELAB_URL"), bytes.NewBuffer(buf)) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while creating HTTP request to %s: %v", os.Getenv("ELAB_URL"), err) return } req.Header.Set("accept", "*/*") @@ -140,7 +129,7 @@ func handleSpeak(response string) { resp, err := http.DefaultClient.Do(req) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while sending HTTP POST to %s: %v", os.Getenv("ELAB_URL"), err) return } defer resp.Body.Close() @@ -148,19 +137,39 @@ func handleSpeak(response string) { if resp.StatusCode == http.StatusOK { bodyBytes, err := io.ReadAll(resp.Body) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while reading HTTP reply from ElevenLabs API: %v", err) return } + + // TODO: Configure filename file, err := os.Create("output.mp3") - file.Write(bodyBytes) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while opening output.mp3 for writing: %v", err) + return + } + + _, err = file.Write(bodyBytes) + if err != nil { + logrus.Errorf("[-] Error while writing voice data to output.mp3: %v", err) return - } else { - cmd := exec.Command("afplay", "output.mp3") - go cmd.Run() - go handleTranscribe("output.mp3", "transcription.txt") } + + // TODO: Is afplay available in all platforms? Perhaps configure? + cmd := exec.Command("afplay", "output.mp3") + go func() { + err := cmd.Run() + if err != nil { + logrus.Errorf("[-] Error while playing output using %s: %v", cmd, err) + } + }() + go func() { + err := handleTranscribe("output.mp3", "transcription.txt") + if err != nil { + logrus.Errorf("[-] Error while transcribing audio: %v", err) + } + }() + + // TODO: perhaps rm output.mp3? } } @@ -169,8 +178,7 @@ func handleSpeak(response string) { func handleTranscribe(audioFile string, txtFile string) error { key := os.Getenv("OPENAI_API_KEY") if key == "" { - log.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.") - return errors.New("OPENAI_API_KEY is not set") + return errors.New("OPENAI_API_KEY is not set. Please set the environment variable and try again.") } client := openai.NewClient(key) ctx := context.Background() @@ -178,13 +186,13 @@ func handleTranscribe(audioFile string, txtFile string) error { Model: openai.Whisper1, FilePath: audioFile, } + resp, err := client.CreateTranscription(ctx, req) if err != nil { - fmt.Printf("Transcription error: %v\n", err) return err - } else { - handleSaveFile(txtFile, resp.Text) } + + handleSaveFile(txtFile, resp.Text) return nil } diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index 511140e8..4ce683ca 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -70,8 +70,6 @@ func main() { logrus.Warn("No staking event found for this address") } - isValidator := cfg.Validator - masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg) // Create a new OracleNode masaNode, err := node.NewOracleNode(ctx, masaNodeOptions...) @@ -80,8 +78,7 @@ func main() { logrus.Fatal(err) } - err = masaNode.Start() - if err != nil { + if err = masaNode.Start(); err != nil { logrus.Fatal(err) } @@ -118,8 +115,11 @@ func main() { // Get the multiaddress and IP address of the node multiAddr := masaNode.GetMultiAddrs() // Get the multiaddress ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address + if err != nil { + logrus.Errorf("[-] Error while getting node IP address from %v: %v", multiAddr, err) + } // Display the welcome message with the multiaddress and IP address - config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) + config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, cfg.Validator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) <-ctx.Done() } diff --git a/go.mod b/go.mod index f1ee6ac3..f815b6c7 100644 --- a/go.mod +++ b/go.mod @@ -23,7 +23,6 @@ require ( github.com/ipfs/go-ds-leveldb v0.5.0 github.com/ipfs/go-ipfs-api v0.7.0 github.com/joho/godotenv v1.5.1 - github.com/lib/pq v1.10.9 github.com/libp2p/go-libp2p v0.36.3 github.com/libp2p/go-libp2p-kad-dht v0.26.1 github.com/libp2p/go-libp2p-pubsub v0.12.0 diff --git a/go.sum b/go.sum index fb72a48b..88e349ba 100644 --- a/go.sum +++ b/go.sum @@ -404,8 +404,6 @@ github.com/leanovate/gopter v0.2.9 h1:fQjYxZaynp97ozCzfOyOuAGOU4aU/z37zf/tOujFk7 github.com/leanovate/gopter v0.2.9/go.mod h1:U2L/78B+KVFIx2VmW6onHJQzXtFb+p5y3y2Sh+Jxxv8= github.com/leodido/go-urn v1.4.0 h1:WT9HwE9SGECu3lg4d/dIA+jxlljEa1/ffXKmRjqdmIQ= github.com/leodido/go-urn v1.4.0/go.mod h1:bvxc+MVxLKB4z00jd1z+Dvzr47oO32F/QSNjSBOlFxI= -github.com/lib/pq v1.10.9 h1:YXG7RB+JIjhP29X+OtkiDnYaXQwpS4JEWq7dtCCRUEw= -github.com/lib/pq v1.10.9/go.mod h1:AlVN5x4E4T544tWzH6hKfbfQvm3HdbOxrmggDNAPY9o= github.com/libp2p/go-buffer-pool v0.1.0 h1:oK4mSFcQz7cTQIfqbe4MIj9gLW+mnanjyFtc6cdF0Y8= github.com/libp2p/go-buffer-pool v0.1.0/go.mod h1:N+vh8gMqimBzdKkSMVuydVDq+UV5QTWy5HSiZacSbPg= github.com/libp2p/go-cidranger v1.1.0 h1:ewPN8EZ0dd1LSnrtuwd4709PXVcITVeuwbag38yPW7c= diff --git a/node/oracle_node.go b/node/oracle_node.go index f5666335..762ff7c9 100644 --- a/node/oracle_node.go +++ b/node/oracle_node.go @@ -33,8 +33,9 @@ import ( ) type OracleNode struct { - Host host.Host - Protocol protocol.ID + Host host.Host + Protocol protocol.ID + // TODO: Rm from here and from NodeData? Should not be necessary priorityAddrs multiaddr.Multiaddr multiAddrs []multiaddr.Multiaddr DHT *dht.IpfsDHT @@ -134,9 +135,13 @@ func NewOracleNode(ctx context.Context, opts ...Option) (*OracleNode, error) { return nil, err } + ma, err := myNetwork.GetMultiAddressesForHost(hst) + if err != nil { + return nil, err + } n := &OracleNode{ Host: hst, - multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), + multiAddrs: ma, PeerChan: make(chan myNetwork.PeerEvent), NodeTracker: pubsub.NewNodeEventTracker(versioning.ProtocolVersion, o.Environment, hst.ID().String()), Context: ctx, @@ -163,14 +168,22 @@ func (node *OracleNode) generateEthHexKeyForRandomIdentity() (string, error) { return common.BytesToAddress(ethereumCrypto.Keccak256(rawKey[1:])[12:]).Hex(), nil } -func (node *OracleNode) getNodeData(host host.Host, addr multiaddr.Multiaddr, publicEthAddress string) *pubsub.NodeData { +func (node *OracleNode) getNodeData() *pubsub.NodeData { // GetSelfNodeData converts the local node's data into a JSON byte array. // It populates a NodeData struct with the node's ID, staking status, and Ethereum address. // The NodeData struct is then marshalled into a JSON byte array. // Returns nil if there is an error marshalling to JSON. // Create and populate NodeData - nodeData := pubsub.NewNodeData(addr, host.ID(), publicEthAddress, pubsub.ActivityJoined) - nodeData.MultiaddrsString = addr.String() + + var publicEthAddress string + if node.Options.RandomIdentity { + publicEthAddress, _ = node.generateEthHexKeyForRandomIdentity() + } else { + publicEthAddress = masacrypto.KeyManagerInstance().EthAddress + } + + nodeData := pubsub.NewNodeData(node.priorityAddrs, node.Host.ID(), publicEthAddress, pubsub.ActivityJoined) + nodeData.MultiaddrsString = node.priorityAddrs.String() nodeData.IsStaked = node.Options.IsStaked nodeData.IsTwitterScraper = node.Options.IsTwitterScraper nodeData.IsDiscordScraper = node.Options.IsDiscordScraper @@ -211,26 +224,19 @@ func (node *OracleNode) Start() (err error) { go node.handleDiscoveredPeers() go node.NodeTracker.ClearExpiredWorkerTimeouts() - var publicKeyHex string - if node.Options.RandomIdentity { - publicKeyHex, _ = node.generateEthHexKeyForRandomIdentity() - } else { - publicKeyHex = masacrypto.KeyManagerInstance().EthAddress - } - - myNodeData := node.getNodeData(node.Host, node.priorityAddrs, publicKeyHex) + myNodeData := node.getNodeData() bootstrapNodes, err := myNetwork.GetBootNodesMultiAddress(node.Options.Bootnodes) if err != nil { return err } - node.DHT, err = myNetwork.WithDHT(node.Context, node.Host, bootstrapNodes, node.Protocol, masaPrefix, node.PeerChan, myNodeData) + node.DHT, err = myNetwork.EnableDHT(node.Context, node.Host, bootstrapNodes, node.Protocol, masaPrefix, node.PeerChan, myNodeData) if err != nil { return err } - err = myNetwork.WithMDNS(node.Host, config.Rendezvous, node.PeerChan) + err = myNetwork.EnableMDNS(node.Host, config.Rendezvous, node.PeerChan) if err != nil { return err } @@ -340,21 +346,6 @@ func (node *OracleNode) IsPublisher() bool { return node.Signature != "" } -// FromUnixTime converts a Unix timestamp into a formatted string. -// The Unix timestamp is expected to be in seconds. -// The returned string is in the format "2006-01-02T15:04:05.000Z". -func (node *OracleNode) FromUnixTime(unixTime int64) string { - return time.Unix(unixTime, 0).Format("2006-01-02T15:04:05.000Z") -} - -// ToUnixTime converts a formatted string time into a Unix timestamp. -// The input string is expected to be in the format "2006-01-02T15:04:05.000Z". -// The returned Unix timestamp is in seconds. -func (node *OracleNode) ToUnixTime(stringTime string) int64 { - t, _ := time.Parse("2006-01-02T15:04:05.000Z", stringTime) - return t.Unix() -} - // Version returns the current version string of the oracle node software. func (node *OracleNode) Version() string { return config.GetInstance().Version diff --git a/pkg/api/handlers_node.go b/pkg/api/handlers_node.go index f5f4ef75..d50d926f 100644 --- a/pkg/api/handlers_node.go +++ b/pkg/api/handlers_node.go @@ -383,8 +383,8 @@ func (api *API) NodeStatusPageHandler() gin.HandlerFunc { "IsDiscordScraper": false, "IsTelegramScraper": false, "IsWebScraper": false, - "FirstJoined": api.Node.FromUnixTime(time.Now().Unix()), - "LastJoined": api.Node.FromUnixTime(time.Now().Unix()), + "FirstJoined": fromUnixTime(time.Now().Unix()), + "LastJoined": fromUnixTime(time.Now().Unix()), "CurrentUptime": "0", "TotalUptime": "0", "Rewards": "Coming Soon!", @@ -402,8 +402,8 @@ func (api *API) NodeStatusPageHandler() gin.HandlerFunc { templateData["IsDiscordScraper"] = nd.IsDiscordScraper templateData["IsTelegramScraper"] = nd.IsTelegramScraper templateData["IsWebScraper"] = nd.IsWebScraper - templateData["FirstJoined"] = api.Node.FromUnixTime(nd.FirstJoinedUnix) - templateData["LastJoined"] = api.Node.FromUnixTime(nd.LastJoinedUnix) + templateData["FirstJoined"] = fromUnixTime(nd.FirstJoinedUnix) + templateData["LastJoined"] = fromUnixTime(nd.LastJoinedUnix) templateData["CurrentUptime"] = pubsub.PrettyDuration(nd.GetCurrentUptime()) templateData["TotalUptime"] = pubsub.PrettyDuration(nd.GetAccumulatedUptime()) templateData["BytesScraped"] = "0 MB" @@ -434,3 +434,10 @@ func (api *API) GetNodeApiKey() gin.HandlerFunc { }) } } + +// fromUnixTime converts a Unix timestamp into a formatted string. +// The Unix timestamp is expected to be in seconds. +// The returned string is in the format "2006-01-02T15:04:05.000Z". +func fromUnixTime(unixTime int64) string { + return time.Unix(unixTime, 0).Format("2006-01-02T15:04:05.000Z") +} diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go index 6a3de7f1..c9608c9b 100644 --- a/pkg/chain/chain.go +++ b/pkg/chain/chain.go @@ -34,11 +34,12 @@ func (c *Chain) Init(path string) error { } logrus.WithFields(logrus.Fields{"block": Difficulty}).Info("[+] Initializing blockchain...") c.storage = &Persistance{} - c.storage.Init(dataDir, func() (Serializable, []byte) { + _, err := c.storage.Init(dataDir, func() (Serializable, []byte) { genesisBlock := makeGenesisBlock() return genesisBlock, genesisBlock.Hash }) - return nil + + return err } // makeGenesisBlock creates and returns the genesis block for the blockchain. @@ -136,7 +137,10 @@ func (c *Chain) getNextBlockNumber() uint64 { // Returns: // - error: An error if any operation fails during iteration, nil otherwise. func (c *Chain) IterateLink(each func(b *Block), pre, post func()) error { - c.UpdateLastHash() + if err := c.UpdateLastHash(); err != nil { + return err + } + currentHash := c.LastHash pre() for len(currentHash) > 0 { @@ -165,7 +169,10 @@ func (c *Chain) IterateLink(each func(b *Block), pre, post func()) error { // - *Block: A pointer to the last block in the chain. // - error: An error if the retrieval fails, nil otherwise. func (c *Chain) GetLastBlock() (*Block, error) { - c.UpdateLastHash() + if err := c.UpdateLastHash(); err != nil { + return nil, err + } + return c.GetBlock(c.LastHash) } diff --git a/pkg/db/operations.go b/pkg/db/operations.go index cd845076..31ad8da7 100644 --- a/pkg/db/operations.go +++ b/pkg/db/operations.go @@ -1,13 +1,10 @@ +// TODO Rename this to something else, this is NOT a database (it just stores data in the DHT and in a cache) package db import ( - "bytes" "context" "encoding/json" "fmt" - "io" - "net/http" - "os" "time" "github.com/masa-finance/masa-oracle/node" @@ -39,20 +36,10 @@ func WriteData(node *node.OracleNode, key string, value []byte) error { var err error node.DHT.ForceRefresh() - if key != node.Host.ID().String() { - err = node.DHT.PutValue(ctx, "/db/"+key, value) // any key value so the data is public + err = node.DHT.PutValue(ctx, "/db/"+key, value) // any key value so the data is public - _, er := PutCache(ctx, key, value) - if er != nil { - logrus.Errorf("[-] Error putting cache: %v", er) - } - } else { - err = node.DHT.PutValue(ctx, "/db/"+node.Host.ID().String(), value) // nodes private data based on node id - - _, er := PutCache(ctx, node.Host.ID().String(), value) - if er != nil { - logrus.Errorf("[-] Error putting cache: %v", er) - } + if _, er := PutCache(ctx, key, value); er != nil { + logrus.Errorf("[-] Error putting cache: %v", er) } if err != nil { @@ -80,16 +67,9 @@ func ReadData(node *node.OracleNode, key string) ([]byte, error) { var err error var val []byte - if key != node.Host.ID().String() { - val, err = GetCache(ctx, key) - if val == nil || err != nil { - val, err = node.DHT.GetValue(ctx, "/db/"+key) - } - } else { - val, err = GetCache(ctx, node.Host.ID().String()) - if val == nil || err != nil { - val, err = node.DHT.GetValue(ctx, "/db/"+node.Host.ID().String()) - } + val, err = GetCache(ctx, key) + if val == nil || err != nil { + val, err = node.DHT.GetValue(ctx, "/db/"+key) } if err != nil { @@ -102,51 +82,3 @@ func ReadData(node *node.OracleNode, key string) ([]byte, error) { return val, nil } - -// SendToS3 sends a payload to an S3-compatible API. -// -// Parameters: -// - uid: The unique identifier for the payload. -// - payload: The payload to be sent, represented as a map of key-value pairs. -// -// Returns: -// - error: Returns an error if the operation fails, otherwise returns nil. -func SendToS3(uid string, payload map[string]string) error { - - apiURL := os.Getenv("API_URL") - authToken := "your-secret-token" - - // Creating the JSON payload - // payload := map[string]string{ - // "key1": "value1", - // "key2": "value2", - // } - - jsonPayload, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("[-] Failed to marshal JSON payload: %v", err) - } - - req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(jsonPayload)) - if err != nil { - return fmt.Errorf("[-] Failed to create HTTP request: %v", err) - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", authToken) - - // Send the request - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("[-] Failed to send HTTP request: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) - return fmt.Errorf("[-] Received non-OK response: %s, body: %s", resp.Status, string(bodyBytes)) - } - - return nil -} diff --git a/pkg/event/event_client.go b/pkg/event/event_client.go index 2d36cc23..79041ded 100644 --- a/pkg/event/event_client.go +++ b/pkg/event/event_client.go @@ -45,7 +45,7 @@ func (c *EventClient) SendEvent(event Event) error { defer func(Body io.ReadCloser) { err := Body.Close() if err != nil { - + c.Logger.WithError(err).Error("Failed to close body") } }(resp.Body) diff --git a/pkg/llmbridge/sentiment.go b/pkg/llmbridge/sentiment.go index 5f780567..799e50cc 100644 --- a/pkg/llmbridge/sentiment.go +++ b/pkg/llmbridge/sentiment.go @@ -370,31 +370,31 @@ func AnalyzeSentimentTelegram(messages []*tg.Message, model string, prompt strin requestJSON, err := json.Marshal(genReq) if err != nil { - logrus.Errorf("Error marshaling request JSON: %v", err) + logrus.Errorf("[-] Error marshaling request JSON: %v", err) return "", "", err } uri := config.GetInstance().LLMChatUrl if uri == "" { - errMsg := "ollama api url not set" - logrus.Errorf(errMsg) - return "", "", errors.New(errMsg) + err := errors.New("[-] ollama api url not set") + logrus.Errorf("%v", err) + return "", "", err } resp, err := http.Post(uri, "application/json", bytes.NewReader(requestJSON)) if err != nil { - logrus.Errorf("Error sending request to API: %v", err) + logrus.Errorf("[-] Error sending request to API: %v", err) return "", "", err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - logrus.Errorf("Error reading response body: %v", err) + logrus.Errorf("[-] Error reading response body: %v", err) return "", "", err } var payload api.ChatResponse err = json.Unmarshal(body, &payload) if err != nil { - logrus.Errorf("Error unmarshaling response JSON: %v", err) + logrus.Errorf("[-] Error unmarshaling response JSON: %v", err) return "", "", err } diff --git a/pkg/network/address.go b/pkg/network/address.go index 7b7995db..a1674dbb 100644 --- a/pkg/network/address.go +++ b/pkg/network/address.go @@ -38,16 +38,8 @@ func GetMultiAddressesForHost(host host.Host) ([]multiaddr.Multiaddr, error) { return addresses, nil } -// GetMultiAddressesForHostQuiet returns the multiaddresses for the host without logging -func GetMultiAddressesForHostQuiet(host host.Host) []multiaddr.Multiaddr { - ma, err := GetMultiAddressesForHost(host) - if err != nil { - logrus.Fatal(err) - } - return ma -} - -// getPublicMultiAddress returns the best public IP address +// getPublicMultiAddress returns the best public IP address (for some definition of "best") +// TODO: This is not guaranteed to work, and should not be necessary since we're using AutoNAT func getPublicMultiAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { ipBytes, err := Get("https://api.ipify.org?format=text", nil) externalIP := net.ParseIP(string(ipBytes)) @@ -60,9 +52,8 @@ func getPublicMultiAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { } var addrToCopy multiaddr.Multiaddr - for _, addr := range addrs { - addrToCopy = addr - break + if len(addrs) > 0 { + addrToCopy = addrs[0] } publicMultiaddr, err := replaceIPComponent(addrToCopy, externalIP.String()) if err != nil { @@ -73,6 +64,7 @@ func getPublicMultiAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { } // GetPriorityAddress returns the best public or private IP address +// TODO rm? func GetPriorityAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { var bestPrivateAddr multiaddr.Multiaddr bestPublicAddr := getPublicMultiAddress(addrs) @@ -155,15 +147,6 @@ func replaceIPComponent(maddr multiaddr.Multiaddr, newIP string) (multiaddr.Mult return multiaddr.Join(components...), nil } -//func contains(slice []multiaddr.Multiaddr, item multiaddr.Multiaddr) bool { -// for _, a := range slice { -// if a.Equal(item) { -// return true -// } -// } -// return false -//} - func getGCPExternalIP() (bool, string) { // Create a new HTTP client with a specific timeout diff --git a/pkg/network/kdht.go b/pkg/network/kdht.go index 6b4e97cb..49cab0e8 100644 --- a/pkg/network/kdht.go +++ b/pkg/network/kdht.go @@ -30,7 +30,7 @@ type dbValidator struct{} func (dbValidator) Validate(_ string, _ []byte) error { return nil } func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } -func WithDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, protocolId, prefix protocol.ID, peerChan chan PeerEvent, nodeData *pubsub.NodeData) (*dht.IpfsDHT, error) { +func EnableDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, protocolId, prefix protocol.ID, peerChan chan PeerEvent, nodeData *pubsub.NodeData) (*dht.IpfsDHT, error) { options := make([]dht.Option, 0) options = append(options, dht.BucketSize(100)) // Adjust bucket size options = append(options, dht.Concurrency(100)) // Increase concurrency diff --git a/pkg/network/mdns.go b/pkg/network/mdns.go index 6beab0b4..66ba65e7 100644 --- a/pkg/network/mdns.go +++ b/pkg/network/mdns.go @@ -24,7 +24,7 @@ func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) { n.PeerChan <- pe } -func WithMDNS(host host.Host, rendezvous string, peerChan chan PeerEvent) error { +func EnableMDNS(host host.Host, rendezvous string, peerChan chan PeerEvent) error { notifee := &discoveryNotifee{ PeerChan: peerChan, Rendezvous: rendezvous, diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index a2b963b6..6abe70e3 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -382,11 +382,10 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip net.NodeDataChan <- nodeData net.nodeData.Set(nodeData.PeerId.String(), nodeData) } else { + dataChanged = true if !nd.SelfIdentified { - dataChanged = true nd.SelfIdentified = true } - dataChanged = true nd.IsStaked = nodeData.IsStaked nd.IsDiscordScraper = nodeData.IsDiscordScraper nd.IsTelegramScraper = nodeData.IsTelegramScraper @@ -396,7 +395,6 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip nd.Multiaddrs = nodeData.Multiaddrs nd.EthAddress = nodeData.EthAddress if nd.EthAddress == "" && nodeData.EthAddress != "" { - dataChanged = true nd.EthAddress = nodeData.EthAddress } diff --git a/pkg/scrapers/telegram/telegram_client.go b/pkg/scrapers/telegram/telegram_client.go index 836d2ea3..7b9dbde5 100644 --- a/pkg/scrapers/telegram/telegram_client.go +++ b/pkg/scrapers/telegram/telegram_client.go @@ -9,7 +9,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "github.com/gotd/contrib/bg" "github.com/gotd/td/session" @@ -21,7 +20,6 @@ import ( var ( client *telegram.Client - once sync.Once appID int // Your actual app ID appHash string // Your actual app hash sessionDir = filepath.Join(os.Getenv("HOME"), ".telegram-sessions") diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index c2acdd90..25617904 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -15,11 +15,13 @@ func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Leg followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") if errString != "" { - if handleRateLimit(fmt.Errorf(errString), account) { - return nil, fmt.Errorf("rate limited") + err := fmt.Errorf("rate limited: %s", errString) + if handleRateLimit(err, account) { + return nil, err } - logrus.Errorf("Error fetching followers: %v", errString) - return nil, fmt.Errorf("%v", errString) + + logrus.Errorf("[-] Error fetching followers: %s", errString) + return nil, fmt.Errorf("error fetching followers: %s", errString) } return followingResponse, nil diff --git a/pkg/scrapers/web/web.go b/pkg/scrapers/web/web.go index 45af5de1..e868d66f 100644 --- a/pkg/scrapers/web/web.go +++ b/pkg/scrapers/web/web.go @@ -66,11 +66,14 @@ func ScrapeWebData(uri []string, depth int) ([]byte, error) { ) // Adjust the parallelism and delay based on your needs and server capacity - c.Limit(&colly.LimitRule{ + limitRule := colly.LimitRule{ DomainGlob: "*", Parallelism: 4, // Increased parallelism Delay: 500 * time.Millisecond, // Reduced delay - }) + } + if err := c.Limit(&limitRule); err != nil { + logrus.Errorf("[-] Unable to set scraper limit. Using default. Error: %v", err) + } // Increase the timeout slightly if necessary c.SetRequestTimeout(240 * time.Second) // Increased to 4 minutes diff --git a/pkg/tests/scrapers/twitter_scraper_test.go b/pkg/tests/scrapers/twitter_scraper_test.go index 96a1c45f..1031ee6d 100644 --- a/pkg/tests/scrapers/twitter_scraper_test.go +++ b/pkg/tests/scrapers/twitter_scraper_test.go @@ -1,3 +1,4 @@ +// TODO: This is a WIP // Package scrapers_test contains integration tests for the Twitter scraper functionality. // // Dev Notes: @@ -97,7 +98,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(cookieFile).To(BeAnExistingFile()) // Clear the scraper to force cookie reuse - firstScraper = nil + firstScraper = nil // nolint: ineffassign // Second authentication (should use cookies) secondScraper := authenticate() @@ -124,7 +125,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(cookieFile).To(BeAnExistingFile()) // Clear the scraper to force cookie reuse - firstScraper = nil + firstScraper = nil // nolint: ineffassign // Second authentication (should use cookies) secondScraper := authenticate() diff --git a/pkg/tests/twitter/twitter_scraper_test.go b/pkg/tests/twitter/twitter_scraper_test.go index ee251b4b..9ede2a0f 100644 --- a/pkg/tests/twitter/twitter_scraper_test.go +++ b/pkg/tests/twitter/twitter_scraper_test.go @@ -90,7 +90,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(cookieFile).To(BeAnExistingFile()) // Clear the scraper to force cookie reuse - firstScraper = nil + firstScraper = nil // nolint: ineffassign // Second authentication (should use cookies) secondScraper := authenticate() @@ -117,7 +117,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(cookieFile).To(BeAnExistingFile()) // Clear the scraper to force cookie reuse - firstScraper = nil + firstScraper = nil // nolint: ineffassign // Second authentication (should use cookies) secondScraper := authenticate() diff --git a/pkg/workers/worker_selection.go b/pkg/workers/worker_selection.go index fd05954b..665819de 100644 --- a/pkg/workers/worker_selection.go +++ b/pkg/workers/worker_selection.go @@ -2,7 +2,6 @@ package workers import ( "math/rand" - "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/sirupsen/logrus" @@ -34,7 +33,6 @@ func getTwitterWorkers(node *node.OracleNode, nodes []pubsub.NodeData, limit int topPerformers := nodes[:poolSize] // Shuffle the top performers - rand.Seed(time.Now().UnixNano()) rand.Shuffle(len(topPerformers), func(i, j int) { topPerformers[i], topPerformers[j] = topPerformers[j], topPerformers[i] })