diff --git a/.github/workflows/tests.yaml b/.github/workflows/tests.yaml index 10642528..97b256b1 100644 --- a/.github/workflows/tests.yaml +++ b/.github/workflows/tests.yaml @@ -9,6 +9,7 @@ on: - '**' jobs: test: + runs-on: ubuntu-latest steps: @@ -20,12 +21,16 @@ jobs: with: go-version: '^1.22' - - name: Install dependencies - run: go mod tidy + - name: Install golangci-lint + run: sudo snap install golangci-lint --classic + + - name: Code formatting and linting + run: make ci-lint + - name: Run tests - run: | - make test + run: make test + - name: Upload coverage to Codecov uses: codecov/codecov-action@v4 with: - token: ${{ secrets.CODECOV_TOKEN }} \ No newline at end of file + token: ${{ secrets.CODECOV_TOKEN }} diff --git a/.gitignore b/.gitignore index 7b804f9f..eb53ba71 100644 --- a/.gitignore +++ b/.gitignore @@ -73,3 +73,6 @@ snippets.txt # Build result of goreleaser dist/ bp-todo.md + +# Result of running tests with coverage +coverage.txt diff --git a/Makefile b/Makefile index e7c03c02..477b2c4e 100644 --- a/Makefile +++ b/Makefile @@ -28,7 +28,7 @@ build: contracts/node_modules install: @sh ./node_install.sh - + run: build @./bin/masa-node @@ -44,16 +44,25 @@ stake: build client: build @./bin/masa-node-cli +# TODO: Add -race and fix race conditions test: contracts/node_modules - @go test -coverprofile=coverage.txt -covermode=atomic -v ./... + @go test -coverprofile=coverage.txt -covermode=atomic -v -count=1 -shuffle=on ./... + +ci-lint: + go mod tidy && git diff --exit-code + go mod download + go mod verify + gofmt -s -w . && git diff --exit-code + go vet ./... + golangci-lint run clean: @rm -rf bin - + @if [ -d ~/.masa/blocks ]; then rm -rf ~/.masa/blocks; fi @if [ -d ~/.masa/cache ]; then rm -rf ~/.masa/cache; fi @if [ -f masa_node.log ]; then rm masa_node.log; fi - + proto: sh pkg/workers/messages/build.sh diff --git a/cmd/masa-node-cli/handlers.go b/cmd/masa-node-cli/handlers.go index 5ffd0da7..32d14c88 100644 --- a/cmd/masa-node-cli/handlers.go +++ b/cmd/masa-node-cli/handlers.go @@ -7,7 +7,6 @@ import ( "errors" "fmt" "io" - "log" "net/http" "os" "os/exec" @@ -43,21 +42,6 @@ func handleIPAddress(multiAddr string) string { return "" } -// handleOpenFile reads the content of a file specified by the filename 'f' and returns it as a string. -// If the file cannot be read, the function logs a fatal error and exits the program. -// Parameters: -// - f: The name of the file to read. -// Returns: -// - A string containing the content of the file. -// func handleOpenFile(f string) string { -// dat, err := os.ReadFile(f) -// if err != nil { -// log.Print(err) -// return "" -// } -// return string(dat) -// } - // handleSaveFile writes the provided content to a file specified by the filename 'f'. // It appends the content to the file if it already exists, or creates a new file with the content if it does not. // The file is created with permissions set to 0755. @@ -66,9 +50,14 @@ func handleIPAddress(multiAddr string) string { // - content: The content to write to the file. func handleSaveFile(f string, content string) { file, err := os.OpenFile(f, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0755) - file.WriteString(content + "\n") if err != nil { - log.Println(err) + logrus.Errorf("[-] Error while opening file %s for writing: %v", f, err) + return + } + + _, err = file.WriteString(content + "\n") + if err != nil { + logrus.Errorf("[-] Error while writing to file %s: %v", f, err) return } } @@ -84,7 +73,7 @@ func handleSaveFile(f string, content string) { func handleGPT(prompt string, userMessage string) (string, error) { key := os.Getenv("OPENAI_API_KEY") if key == "" { - log.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.") + logrus.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.") return "", errors.New("OPENAI_API_KEY is not set") } client := openai.NewClient(key) @@ -105,7 +94,7 @@ func handleGPT(prompt string, userMessage string) (string, error) { }, ) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while getting ChatGPT completion: %v", err) return "", err } return resp.Choices[0].Message.Content, nil @@ -131,7 +120,7 @@ func handleSpeak(response string) { req, err := http.NewRequest(http.MethodPost, os.Getenv("ELAB_URL"), bytes.NewBuffer(buf)) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while creating HTTP request to %s: %v", os.Getenv("ELAB_URL"), err) return } req.Header.Set("accept", "*/*") @@ -140,7 +129,7 @@ func handleSpeak(response string) { resp, err := http.DefaultClient.Do(req) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while sending HTTP POST to %s: %v", os.Getenv("ELAB_URL"), err) return } defer resp.Body.Close() @@ -148,19 +137,39 @@ func handleSpeak(response string) { if resp.StatusCode == http.StatusOK { bodyBytes, err := io.ReadAll(resp.Body) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while reading HTTP reply from ElevenLabs API: %v", err) return } + + // TODO: Configure filename file, err := os.Create("output.mp3") - file.Write(bodyBytes) if err != nil { - log.Print(err) + logrus.Errorf("[-] Error while opening output.mp3 for writing: %v", err) + return + } + + _, err = file.Write(bodyBytes) + if err != nil { + logrus.Errorf("[-] Error while writing voice data to output.mp3: %v", err) return - } else { - cmd := exec.Command("afplay", "output.mp3") - go cmd.Run() - go handleTranscribe("output.mp3", "transcription.txt") } + + // TODO: Is afplay available in all platforms? Perhaps configure? + cmd := exec.Command("afplay", "output.mp3") + go func() { + err := cmd.Run() + if err != nil { + logrus.Errorf("[-] Error while playing output using %s: %v", cmd, err) + } + }() + go func() { + err := handleTranscribe("output.mp3", "transcription.txt") + if err != nil { + logrus.Errorf("[-] Error while transcribing audio: %v", err) + } + }() + + // TODO: perhaps rm output.mp3? } } @@ -169,8 +178,7 @@ func handleSpeak(response string) { func handleTranscribe(audioFile string, txtFile string) error { key := os.Getenv("OPENAI_API_KEY") if key == "" { - log.Println("OPENAI_API_KEY is not set. Please set the environment variable and try again.") - return errors.New("OPENAI_API_KEY is not set") + return errors.New("OPENAI_API_KEY is not set. Please set the environment variable and try again.") } client := openai.NewClient(key) ctx := context.Background() @@ -178,13 +186,13 @@ func handleTranscribe(audioFile string, txtFile string) error { Model: openai.Whisper1, FilePath: audioFile, } + resp, err := client.CreateTranscription(ctx, req) if err != nil { - fmt.Printf("Transcription error: %v\n", err) return err - } else { - handleSaveFile(txtFile, resp.Text) } + + handleSaveFile(txtFile, resp.Text) return nil } diff --git a/cmd/masa-node/config.go b/cmd/masa-node/config.go index 1c394453..e995cbe6 100644 --- a/cmd/masa-node/config.go +++ b/cmd/masa-node/config.go @@ -3,14 +3,23 @@ package main import ( "github.com/masa-finance/masa-oracle/node" "github.com/masa-finance/masa-oracle/pkg/config" + "github.com/masa-finance/masa-oracle/pkg/masacrypto" "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/masa-finance/masa-oracle/pkg/workers" ) -func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) { +func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) { // WorkerManager configuration - // XXX: this needs to be moved under config, but now it's here as there are import cycles given singletons - workerManagerOptions := []workers.WorkerOptionFunc{} + // TODO: this needs to be moved under config, but now it's here as there are import cycles given singletons + workerManagerOptions := []workers.WorkerOptionFunc{ + workers.WithLlmChatUrl(cfg.LLMChatUrl), + workers.WithMasaDir(cfg.MasaDir), + } + + cachePath := cfg.CachePath + if cachePath == "" { + cachePath = cfg.MasaDir + "/cache" + } masaNodeOptions := []node.Option{ node.EnableStaked, @@ -19,6 +28,10 @@ func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerMana node.WithVersion(cfg.Version), node.WithPort(cfg.PortNbr), node.WithBootNodes(cfg.Bootnodes...), + node.WithMasaDir(cfg.MasaDir), + node.WithCachePath(cachePath), + node.WithLLMCloudFlareURL(cfg.LLMCfUrl), + node.WithKeyManager(keyManager), } if cfg.TwitterScraper { @@ -50,8 +63,7 @@ func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerMana blockChainEventTracker := node.NewBlockChain() pubKeySub := &pubsub.PublicKeySubscriptionHandler{} - // TODO: Where the config is involved, move to the config the generation of - // Node options + // TODO: Where the config is involved, move to the config the generation of Node options masaNodeOptions = append(masaNodeOptions, []node.Option{ // Register the worker manager node.WithMasaProtocolHandler( @@ -68,7 +80,7 @@ func initOptions(cfg *config.AppConfig) ([]node.Option, *workers.WorkHandlerMana // and other peers can do work we only need to check this here // if this peer can or cannot scrape or write that is checked in other places masaNodeOptions = append(masaNodeOptions, - node.WithService(blockChainEventTracker.Start(config.GetInstance().MasaDir)), + node.WithService(blockChainEventTracker.Start(cfg.MasaDir)), ) } diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index 511140e8..6443fb9b 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -34,13 +34,16 @@ func main() { cfg.LogConfig() cfg.SetupLogging() - keyManager := masacrypto.KeyManagerInstance() + keyManager, err := masacrypto.NewKeyManager(cfg.PrivateKey, cfg.PrivateKeyFile) + if err != nil { + logrus.Fatal("[-] Failed to initialize keys:", err) + } // Create a cancellable context ctx, cancel := context.WithCancel(context.Background()) if cfg.Faucet { - err := handleFaucet(keyManager.EcdsaPrivKey) + err := handleFaucet(cfg.RpcUrl, keyManager.EcdsaPrivKey) if err != nil { logrus.Errorf("[-] %v", err) os.Exit(1) @@ -51,7 +54,7 @@ func main() { } if cfg.StakeAmount != "" { - err := handleStaking(keyManager.EcdsaPrivKey) + err := handleStaking(cfg.RpcUrl, keyManager.EcdsaPrivKey, cfg.StakeAmount) if err != nil { logrus.Warningf("%v", err) } else { @@ -61,7 +64,7 @@ func main() { } // Verify the staking event - isStaked, err := staking.VerifyStakingEvent(keyManager.EthAddress) + isStaked, err := staking.VerifyStakingEvent(cfg.RpcUrl, keyManager.EthAddress) if err != nil { logrus.Error(err) } @@ -70,9 +73,7 @@ func main() { logrus.Warn("No staking event found for this address") } - isValidator := cfg.Validator - - masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg) + masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg, keyManager) // Create a new OracleNode masaNode, err := node.NewOracleNode(ctx, masaNodeOptions...) @@ -80,8 +81,7 @@ func main() { logrus.Fatal(err) } - err = masaNode.Start() - if err != nil { + if err = masaNode.Start(); err != nil { logrus.Fatal(err) } @@ -98,7 +98,7 @@ func main() { } // Init cache resolver - db.InitResolverCache(masaNode, keyManager) + db.InitResolverCache(masaNode, keyManager, cfg.AllowedPeerId, cfg.AllowedPeerPublicKey, cfg.Validator) // Cancel the context when SIGINT is received go handleSignals(cancel, masaNode, cfg) @@ -118,8 +118,11 @@ func main() { // Get the multiaddress and IP address of the node multiAddr := masaNode.GetMultiAddrs() // Get the multiaddress ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address + if err != nil { + logrus.Errorf("[-] Error while getting node IP address from %v: %v", multiAddr, err) + } // Display the welcome message with the multiaddress and IP address - config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) + config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, cfg.Validator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) <-ctx.Done() } diff --git a/cmd/masa-node/staking.go b/cmd/masa-node/staking.go index 19889b4c..36b07890 100644 --- a/cmd/masa-node/staking.go +++ b/cmd/masa-node/staking.go @@ -9,20 +9,19 @@ import ( "github.com/fatih/color" "github.com/sirupsen/logrus" - "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/staking" ) -func handleStaking(privateKey *ecdsa.PrivateKey) error { +func handleStaking(rpcUrl string, privateKey *ecdsa.PrivateKey, stakeAmount string) error { // Staking logic // Convert the stake amount to the smallest unit, assuming 18 decimal places - amountBigInt, ok := new(big.Int).SetString(config.GetInstance().StakeAmount, 10) + amountBigInt, ok := new(big.Int).SetString(stakeAmount, 10) if !ok { logrus.Fatal("Invalid stake amount") } amountInSmallestUnit := new(big.Int).Mul(amountBigInt, big.NewInt(1e18)) - stakingClient, err := staking.NewClient(privateKey) + stakingClient, err := staking.NewClient(rpcUrl, privateKey) if err != nil { return err } @@ -86,8 +85,8 @@ func handleStaking(privateKey *ecdsa.PrivateKey) error { return nil } -func handleFaucet(privateKey *ecdsa.PrivateKey) error { - faucetClient, err := staking.NewClient(privateKey) +func handleFaucet(rpcUrl string, privateKey *ecdsa.PrivateKey) error { + faucetClient, err := staking.NewClient(rpcUrl, privateKey) if err != nil { logrus.Error("[-] Failed to create staking client:", err) return err diff --git a/docs/oracle-node/twitter-sentiment.md b/docs/oracle-node/twitter-sentiment.md index a8b3fcb4..cee2dd8c 100644 --- a/docs/oracle-node/twitter-sentiment.md +++ b/docs/oracle-node/twitter-sentiment.md @@ -64,7 +64,7 @@ const ( #### Masa cli or code integration -Tweets are fetched using the Twitter Scraper library, as seen in the [llmbridge](file:///Users/john/Projects/masa/masa-oracle/pkg/llmbridge/sentiment_twitter.go#1%2C9-1%2C9) package. This process does not require Twitter API keys, making it accessible and straightforward. +Tweets are fetched using the Twitter Scraper library, as seen in the [llmbridge](../pkg/llmbridge/sentiment_twitter.go#1%2C9-1%2C9) package. This process does not require Twitter API keys, making it accessible and straightforward. ```go func AnalyzeSentimentTweets(tweets []*twitterscraper.Tweet, model string) (string, string, error) { ... } diff --git a/node/options.go b/node/options.go index 613e8f73..4f58ba2b 100644 --- a/node/options.go +++ b/node/options.go @@ -4,6 +4,7 @@ import ( "context" "github.com/masa-finance/masa-oracle/node/types" + "github.com/masa-finance/masa-oracle/pkg/masacrypto" "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/protocol" @@ -22,15 +23,18 @@ type NodeOption struct { IsWebScraper bool IsLlmServer bool - Bootnodes []string - RandomIdentity bool - UseLocalWorkerAsRemote bool - Services []func(ctx context.Context, node *OracleNode) - PubSubHandles []PubSubHandlers - ProtocolHandlers map[protocol.ID]network.StreamHandler - MasaProtocolHandlers map[string]network.StreamHandler - Environment string - Version string + Bootnodes []string + RandomIdentity bool + Services []func(ctx context.Context, node *OracleNode) + PubSubHandles []PubSubHandlers + ProtocolHandlers map[protocol.ID]network.StreamHandler + MasaProtocolHandlers map[string]network.StreamHandler + Environment string + Version string + MasaDir string + CachePath string + LLMCloudflareUrl string + KeyManager *masacrypto.KeyManager } type PubSubHandlers struct { @@ -155,3 +159,27 @@ func WithPort(port int) Option { o.PortNbr = port } } + +func WithMasaDir(directory string) Option { + return func(o *NodeOption) { + o.MasaDir = directory + } +} + +func WithCachePath(path string) Option { + return func(o *NodeOption) { + o.CachePath = path + } +} + +func WithLLMCloudFlareURL(url string) Option { + return func(o *NodeOption) { + o.LLMCloudflareUrl = url + } +} + +func WithKeyManager(km *masacrypto.KeyManager) Option { + return func(o *NodeOption) { + o.KeyManager = km + } +} diff --git a/node/oracle_node.go b/node/oracle_node.go index f5666335..8ccb535a 100644 --- a/node/oracle_node.go +++ b/node/oracle_node.go @@ -27,14 +27,14 @@ import ( "github.com/masa-finance/masa-oracle/internal/versioning" "github.com/masa-finance/masa-oracle/pkg/chain" "github.com/masa-finance/masa-oracle/pkg/config" - "github.com/masa-finance/masa-oracle/pkg/masacrypto" myNetwork "github.com/masa-finance/masa-oracle/pkg/network" "github.com/masa-finance/masa-oracle/pkg/pubsub" ) type OracleNode struct { - Host host.Host - Protocol protocol.ID + Host host.Host + Protocol protocol.ID + // TODO: Rm from here and from NodeData? Should not be necessary priorityAddrs multiaddr.Multiaddr multiAddrs []multiaddr.Multiaddr DHT *dht.IpfsDHT @@ -47,6 +47,7 @@ type OracleNode struct { Blockchain *chain.Chain Options NodeOption Context context.Context + Config *config.AppConfig } // GetMultiAddrs returns the priority multiaddr for this node. @@ -101,7 +102,7 @@ func NewOracleNode(ctx context.Context, opts ...Option) (*OracleNode, error) { if o.RandomIdentity { libp2pOptions = append(libp2pOptions, libp2p.RandomIdentity) } else { - libp2pOptions = append(libp2pOptions, libp2p.Identity(masacrypto.KeyManagerInstance().Libp2pPrivKey)) + libp2pOptions = append(libp2pOptions, libp2p.Identity(o.KeyManager.Libp2pPrivKey)) } securityOptions := []libp2p.Option{ @@ -134,9 +135,13 @@ func NewOracleNode(ctx context.Context, opts ...Option) (*OracleNode, error) { return nil, err } + ma, err := myNetwork.GetMultiAddressesForHost(hst) + if err != nil { + return nil, err + } n := &OracleNode{ Host: hst, - multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), + multiAddrs: ma, PeerChan: make(chan myNetwork.PeerEvent), NodeTracker: pubsub.NewNodeEventTracker(versioning.ProtocolVersion, o.Environment, hst.ID().String()), Context: ctx, @@ -163,14 +168,22 @@ func (node *OracleNode) generateEthHexKeyForRandomIdentity() (string, error) { return common.BytesToAddress(ethereumCrypto.Keccak256(rawKey[1:])[12:]).Hex(), nil } -func (node *OracleNode) getNodeData(host host.Host, addr multiaddr.Multiaddr, publicEthAddress string) *pubsub.NodeData { +func (node *OracleNode) getNodeData() *pubsub.NodeData { // GetSelfNodeData converts the local node's data into a JSON byte array. // It populates a NodeData struct with the node's ID, staking status, and Ethereum address. // The NodeData struct is then marshalled into a JSON byte array. // Returns nil if there is an error marshalling to JSON. // Create and populate NodeData - nodeData := pubsub.NewNodeData(addr, host.ID(), publicEthAddress, pubsub.ActivityJoined) - nodeData.MultiaddrsString = addr.String() + + var publicEthAddress string + if node.Options.RandomIdentity { + publicEthAddress, _ = node.generateEthHexKeyForRandomIdentity() + } else { + publicEthAddress = node.Options.KeyManager.EthAddress + } + + nodeData := pubsub.NewNodeData(node.priorityAddrs, node.Host.ID(), publicEthAddress, pubsub.ActivityJoined) + nodeData.MultiaddrsString = node.priorityAddrs.String() nodeData.IsStaked = node.Options.IsStaked nodeData.IsTwitterScraper = node.Options.IsTwitterScraper nodeData.IsDiscordScraper = node.Options.IsDiscordScraper @@ -211,26 +224,19 @@ func (node *OracleNode) Start() (err error) { go node.handleDiscoveredPeers() go node.NodeTracker.ClearExpiredWorkerTimeouts() - var publicKeyHex string - if node.Options.RandomIdentity { - publicKeyHex, _ = node.generateEthHexKeyForRandomIdentity() - } else { - publicKeyHex = masacrypto.KeyManagerInstance().EthAddress - } - - myNodeData := node.getNodeData(node.Host, node.priorityAddrs, publicKeyHex) + myNodeData := node.getNodeData() bootstrapNodes, err := myNetwork.GetBootNodesMultiAddress(node.Options.Bootnodes) if err != nil { return err } - node.DHT, err = myNetwork.WithDHT(node.Context, node.Host, bootstrapNodes, node.Protocol, masaPrefix, node.PeerChan, myNodeData) + node.DHT, err = myNetwork.EnableDHT(node.Context, node.Host, bootstrapNodes, node.Protocol, masaPrefix, node.PeerChan, myNodeData) if err != nil { return err } - err = myNetwork.WithMDNS(node.Host, config.Rendezvous, node.PeerChan) + err = myNetwork.EnableMDNS(node.Host, config.Rendezvous, node.PeerChan) if err != nil { return err } @@ -239,7 +245,7 @@ func (node *OracleNode) Start() (err error) { go p(node.Context, node) } - go myNetwork.Discover(node.Context, node.Host, node.DHT, node.Protocol) + go myNetwork.Discover(node.Context, node.Options.Bootnodes, node.Host, node.DHT, node.Protocol) nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String()) if nodeData == nil { @@ -322,15 +328,14 @@ func (node *OracleNode) handleStream(stream network.Stream) { // IsWorker determines if the OracleNode is configured to act as an actor. // An actor node is one that has at least one of the following scrapers enabled: -// TwitterScraper, DiscordScraper, or WebScraper. +// TwitterScraper, DiscordScraper, TelegramScraper or WebScraper. // It returns true if any of these scrapers are enabled, otherwise false. func (node *OracleNode) IsWorker() bool { // need to get this by node data - cfg := config.GetInstance() - if cfg.TwitterScraper || cfg.DiscordScraper || cfg.TelegramScraper || cfg.WebScraper { - return true - } - return false + return node.Options.IsTwitterScraper || + node.Options.IsDiscordScraper || + node.Options.IsTelegramScraper || + node.Options.IsWebScraper } // IsPublisher returns true if this node is a publisher node. @@ -340,24 +345,9 @@ func (node *OracleNode) IsPublisher() bool { return node.Signature != "" } -// FromUnixTime converts a Unix timestamp into a formatted string. -// The Unix timestamp is expected to be in seconds. -// The returned string is in the format "2006-01-02T15:04:05.000Z". -func (node *OracleNode) FromUnixTime(unixTime int64) string { - return time.Unix(unixTime, 0).Format("2006-01-02T15:04:05.000Z") -} - -// ToUnixTime converts a formatted string time into a Unix timestamp. -// The input string is expected to be in the format "2006-01-02T15:04:05.000Z". -// The returned Unix timestamp is in seconds. -func (node *OracleNode) ToUnixTime(stringTime string) int64 { - t, _ := time.Parse("2006-01-02T15:04:05.000Z", stringTime) - return t.Unix() -} - // Version returns the current version string of the oracle node software. func (node *OracleNode) Version() string { - return config.GetInstance().Version + return node.Options.Version } // LogActiveTopics logs the currently active topic names to the diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index fb8fb443..08f8d1ec 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -781,7 +781,7 @@ func (api *API) CfLlmChat() gin.HandlerFunc { } api.sendTrackingEvent(data_types.LLMChat, bodyBytes) - cfUrl := config.GetInstance().LLMCfUrl + cfUrl := api.Node.Options.LLMCloudflareUrl if cfUrl == "" { c.JSON(http.StatusBadRequest, gin.H{"error": fmt.Errorf("missing env LLM_CF_URL")}) return diff --git a/pkg/api/handlers_node.go b/pkg/api/handlers_node.go index f5f4ef75..7c8d789b 100644 --- a/pkg/api/handlers_node.go +++ b/pkg/api/handlers_node.go @@ -11,7 +11,6 @@ import ( "github.com/masa-finance/masa-oracle/pkg/consensus" "github.com/masa-finance/masa-oracle/pkg/db" - "github.com/masa-finance/masa-oracle/pkg/masacrypto" "github.com/sirupsen/logrus" "github.com/gin-gonic/gin" @@ -181,7 +180,7 @@ func (api *API) PublishPublicKeyHandler() gin.HandlerFunc { return } - keyManager := masacrypto.KeyManagerInstance() + keyManager := api.Node.Options.KeyManager // Set the data to be signed as the signer's Peer ID data := []byte(api.Node.Host.ID().String()) @@ -383,8 +382,8 @@ func (api *API) NodeStatusPageHandler() gin.HandlerFunc { "IsDiscordScraper": false, "IsTelegramScraper": false, "IsWebScraper": false, - "FirstJoined": api.Node.FromUnixTime(time.Now().Unix()), - "LastJoined": api.Node.FromUnixTime(time.Now().Unix()), + "FirstJoined": fromUnixTime(time.Now().Unix()), + "LastJoined": fromUnixTime(time.Now().Unix()), "CurrentUptime": "0", "TotalUptime": "0", "Rewards": "Coming Soon!", @@ -402,8 +401,8 @@ func (api *API) NodeStatusPageHandler() gin.HandlerFunc { templateData["IsDiscordScraper"] = nd.IsDiscordScraper templateData["IsTelegramScraper"] = nd.IsTelegramScraper templateData["IsWebScraper"] = nd.IsWebScraper - templateData["FirstJoined"] = api.Node.FromUnixTime(nd.FirstJoinedUnix) - templateData["LastJoined"] = api.Node.FromUnixTime(nd.LastJoinedUnix) + templateData["FirstJoined"] = fromUnixTime(nd.FirstJoinedUnix) + templateData["LastJoined"] = fromUnixTime(nd.LastJoinedUnix) templateData["CurrentUptime"] = pubsub.PrettyDuration(nd.GetCurrentUptime()) templateData["TotalUptime"] = pubsub.PrettyDuration(nd.GetAccumulatedUptime()) templateData["BytesScraped"] = "0 MB" @@ -434,3 +433,10 @@ func (api *API) GetNodeApiKey() gin.HandlerFunc { }) } } + +// fromUnixTime converts a Unix timestamp into a formatted string. +// The Unix timestamp is expected to be in seconds. +// The returned string is in the format "2006-01-02T15:04:05.000Z". +func fromUnixTime(unixTime int64) string { + return time.Unix(unixTime, 0).Format("2006-01-02T15:04:05.000Z") +} diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go index 6a3de7f1..c9608c9b 100644 --- a/pkg/chain/chain.go +++ b/pkg/chain/chain.go @@ -34,11 +34,12 @@ func (c *Chain) Init(path string) error { } logrus.WithFields(logrus.Fields{"block": Difficulty}).Info("[+] Initializing blockchain...") c.storage = &Persistance{} - c.storage.Init(dataDir, func() (Serializable, []byte) { + _, err := c.storage.Init(dataDir, func() (Serializable, []byte) { genesisBlock := makeGenesisBlock() return genesisBlock, genesisBlock.Hash }) - return nil + + return err } // makeGenesisBlock creates and returns the genesis block for the blockchain. @@ -136,7 +137,10 @@ func (c *Chain) getNextBlockNumber() uint64 { // Returns: // - error: An error if any operation fails during iteration, nil otherwise. func (c *Chain) IterateLink(each func(b *Block), pre, post func()) error { - c.UpdateLastHash() + if err := c.UpdateLastHash(); err != nil { + return err + } + currentHash := c.LastHash pre() for len(currentHash) > 0 { @@ -165,7 +169,10 @@ func (c *Chain) IterateLink(each func(b *Block), pre, post func()) error { // - *Block: A pointer to the last block in the chain. // - error: An error if the retrieval fails, nil otherwise. func (c *Chain) GetLastBlock() (*Block, error) { - c.UpdateLastHash() + if err := c.UpdateLastHash(); err != nil { + return nil, err + } + return c.GetBlock(c.LastHash) } diff --git a/pkg/config/app.go b/pkg/config/app.go index 1ecbebab..f6a74ec7 100644 --- a/pkg/config/app.go +++ b/pkg/config/app.go @@ -91,6 +91,7 @@ func GetInstance() *AppConfig { instance = &AppConfig{} instance.setDefaultConfig() + // TODO Shouldn't the env vars override the file config, instead of the other way around? instance.setEnvVariableConfig() instance.setFileConfig(viper.GetString("FILE_PATH")) diff --git a/pkg/db/access_control.go b/pkg/db/access_control.go index 9840998d..47ea5044 100644 --- a/pkg/db/access_control.go +++ b/pkg/db/access_control.go @@ -8,11 +8,11 @@ package db import ( "encoding/hex" + libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto" "github.com/libp2p/go-libp2p/core/host" "github.com/sirupsen/logrus" - "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/consensus" ) @@ -34,14 +34,7 @@ func isAuthorized(nodeID string) bool { } // Verifier checks if the given host is allowed to access to the database and verifies the signature -func Verifier(h host.Host, data []byte, signature []byte) bool { - // Load configuration instance - cfg := config.GetInstance() - - // Get allowed peer ID and public key from the configuration - allowedPeerID := cfg.AllowedPeerId - allowedPeerPubKeyString := cfg.AllowedPeerPublicKey - +func Verifier(h host.Host, data []byte, signature []byte, allowedPeerID string, allowedPeerPubKeyString string, isValidator bool) bool { if allowedPeerID == "" || allowedPeerPubKeyString == "" { logrus.Warn("[-] Allowed peer ID or public key not found in configuration") return false @@ -81,7 +74,7 @@ func Verifier(h host.Host, data []byte, signature []byte) bool { return false } - if cfg.Validator { + if isValidator { logrus.WithFields(logrus.Fields{ "hostID": h.ID().String(), diff --git a/pkg/db/operations.go b/pkg/db/operations.go index cd845076..91fd5e83 100644 --- a/pkg/db/operations.go +++ b/pkg/db/operations.go @@ -1,13 +1,10 @@ +// TODO: Rename this to something else, this is NOT a database (it just stores data in the DHT and in a cache) package db import ( - "bytes" "context" "encoding/json" "fmt" - "io" - "net/http" - "os" "time" "github.com/masa-finance/masa-oracle/node" @@ -39,20 +36,10 @@ func WriteData(node *node.OracleNode, key string, value []byte) error { var err error node.DHT.ForceRefresh() - if key != node.Host.ID().String() { - err = node.DHT.PutValue(ctx, "/db/"+key, value) // any key value so the data is public + err = node.DHT.PutValue(ctx, "/db/"+key, value) // any key value so the data is public - _, er := PutCache(ctx, key, value) - if er != nil { - logrus.Errorf("[-] Error putting cache: %v", er) - } - } else { - err = node.DHT.PutValue(ctx, "/db/"+node.Host.ID().String(), value) // nodes private data based on node id - - _, er := PutCache(ctx, node.Host.ID().String(), value) - if er != nil { - logrus.Errorf("[-] Error putting cache: %v", er) - } + if _, er := PutCache(ctx, key, value); er != nil { + logrus.Errorf("[-] Error putting cache: %v", er) } if err != nil { @@ -80,16 +67,9 @@ func ReadData(node *node.OracleNode, key string) ([]byte, error) { var err error var val []byte - if key != node.Host.ID().String() { - val, err = GetCache(ctx, key) - if val == nil || err != nil { - val, err = node.DHT.GetValue(ctx, "/db/"+key) - } - } else { - val, err = GetCache(ctx, node.Host.ID().String()) - if val == nil || err != nil { - val, err = node.DHT.GetValue(ctx, "/db/"+node.Host.ID().String()) - } + val, err = GetCache(ctx, key) + if val == nil || err != nil { + val, err = node.DHT.GetValue(ctx, "/db/"+key) } if err != nil { @@ -102,51 +82,3 @@ func ReadData(node *node.OracleNode, key string) ([]byte, error) { return val, nil } - -// SendToS3 sends a payload to an S3-compatible API. -// -// Parameters: -// - uid: The unique identifier for the payload. -// - payload: The payload to be sent, represented as a map of key-value pairs. -// -// Returns: -// - error: Returns an error if the operation fails, otherwise returns nil. -func SendToS3(uid string, payload map[string]string) error { - - apiURL := os.Getenv("API_URL") - authToken := "your-secret-token" - - // Creating the JSON payload - // payload := map[string]string{ - // "key1": "value1", - // "key2": "value2", - // } - - jsonPayload, err := json.Marshal(payload) - if err != nil { - return fmt.Errorf("[-] Failed to marshal JSON payload: %v", err) - } - - req, err := http.NewRequest("POST", apiURL, bytes.NewBuffer(jsonPayload)) - if err != nil { - return fmt.Errorf("[-] Failed to create HTTP request: %v", err) - } - - req.Header.Set("Content-Type", "application/json") - req.Header.Set("Authorization", authToken) - - // Send the request - client := &http.Client{} - resp, err := client.Do(req) - if err != nil { - return fmt.Errorf("[-] Failed to send HTTP request: %v", err) - } - defer resp.Body.Close() - - if resp.StatusCode != http.StatusOK { - bodyBytes, _ := io.ReadAll(resp.Body) - return fmt.Errorf("[-] Received non-OK response: %s, body: %s", resp.Status, string(bodyBytes)) - } - - return nil -} diff --git a/pkg/db/resolver_cache.go b/pkg/db/resolver_cache.go index 4473c8da..685fc242 100644 --- a/pkg/db/resolver_cache.go +++ b/pkg/db/resolver_cache.go @@ -43,13 +43,10 @@ type Record struct { // The purpose of this function is to initialize the resolver cache and perform any necessary setup or configuration. It associates the resolver cache with the provided Masa Oracle node and key manager. // // Note: The specific implementation details of the `InitResolverCache` function are not provided in the given code snippet. The function signature suggests that it initializes the resolver cache, but the actual initialization logic would be present in the function body. -func InitResolverCache(node *node.OracleNode, keyManager *masacrypto.KeyManager) { +func InitResolverCache(node *node.OracleNode, keyManager *masacrypto.KeyManager, allowedPeerID string, allowedPeerPubKeyString string, isValidator bool) { var err error - cachePath := config.GetInstance().CachePath - if cachePath == "" { - cachePath = config.GetInstance().MasaDir + "/cache" - } - cache, err = leveldb.NewDatastore(cachePath, nil) + + cache, err = leveldb.NewDatastore(node.Options.CachePath, nil) if err != nil { log.Fatal(err) } @@ -60,7 +57,7 @@ func InitResolverCache(node *node.OracleNode, keyManager *masacrypto.KeyManager) if err != nil { logrus.Errorf("[-] Error signing data: %v", err) } - _ = Verifier(node.Host, data, signature) + _ = Verifier(node.Host, data, signature, allowedPeerID, allowedPeerPubKeyString, isValidator) go monitorNodeData(context.Background(), node) diff --git a/pkg/event/event_client.go b/pkg/event/event_client.go index 2d36cc23..79041ded 100644 --- a/pkg/event/event_client.go +++ b/pkg/event/event_client.go @@ -45,7 +45,7 @@ func (c *EventClient) SendEvent(event Event) error { defer func(Body io.ReadCloser) { err := Body.Close() if err != nil { - + c.Logger.WithError(err).Error("Failed to close body") } }(resp.Body) diff --git a/pkg/llmbridge/client.go b/pkg/llmbridge/client.go index dd9b35a4..11000e80 100644 --- a/pkg/llmbridge/client.go +++ b/pkg/llmbridge/client.go @@ -11,7 +11,6 @@ import ( "net/http" "strings" - "github.com/masa-finance/masa-oracle/pkg/config" "github.com/sashabaranov/go-openai" ) @@ -24,15 +23,13 @@ type GPTClient struct { } // NewClaudeClient creates a new ClaudeClient instance with default configuration. -func NewClaudeClient() *ClaudeClient { - cnf := NewClaudeAPIConfig() - return &ClaudeClient{config: cnf} +func NewClaudeClient(config *ClaudeAPIConfig) *ClaudeClient { + return &ClaudeClient{config: config} } // NewGPTClient creates a new GPTClient instance with default configuration. -func NewGPTClient() *GPTClient { - cnf := NewGPTConfig() - return &GPTClient{config: cnf} +func NewGPTClient(config *GPTAPIConfig) *GPTClient { + return &GPTClient{config: config} } // SendRequest sends an HTTP request to the Claude API with the given payload. @@ -64,8 +61,7 @@ func (c *GPTClient) SendRequest(payload string, model string, prompt string) (st break } - cfg := config.GetInstance() - key := cfg.GPTApiKey + key := c.config.APIKey if key == "" { return "", errors.New("OPENAI_API_KEY is not set") } diff --git a/pkg/llmbridge/config.go b/pkg/llmbridge/config.go index 0b6e2b33..ea957e35 100644 --- a/pkg/llmbridge/config.go +++ b/pkg/llmbridge/config.go @@ -14,9 +14,7 @@ type GPTAPIConfig struct { // NewClaudeAPIConfig creates a new ClaudeAPIConfig instance with values loaded // from the application config. -func NewClaudeAPIConfig() *ClaudeAPIConfig { - appConfig := config.GetInstance() - +func NewClaudeAPIConfig(appConfig *config.AppConfig) *ClaudeAPIConfig { // need to add these to the config package return &ClaudeAPIConfig{ URL: appConfig.ClaudeApiURL, @@ -27,9 +25,7 @@ func NewClaudeAPIConfig() *ClaudeAPIConfig { // NewGPTConfig creates a new GPTConfig instance with values loaded // from the application config. -func NewGPTConfig() *GPTAPIConfig { - appConfig := config.GetInstance() - +func NewGPTConfig(appConfig *config.AppConfig) *GPTAPIConfig { // need to add these to the config package return &GPTAPIConfig{ APIKey: appConfig.GPTApiKey, diff --git a/pkg/llmbridge/sentiment.go b/pkg/llmbridge/sentiment.go index 5f780567..a2a7a24d 100644 --- a/pkg/llmbridge/sentiment.go +++ b/pkg/llmbridge/sentiment.go @@ -21,9 +21,17 @@ import ( // It concatenates the tweets, creates a payload, sends a request to Claude, parses the response, // and returns the concatenated tweets content, a sentiment summary, and any error. func AnalyzeSentimentTweets(tweets []*twitterscraper.TweetResult, model string, prompt string) (string, string, error) { + appConfig := config.GetInstance() + // check if we are using claude or gpt, can add others easily if strings.Contains(model, "claude-") { - client := NewClaudeClient() // Adjusted to call without arguments + client := NewClaudeClient( + &ClaudeAPIConfig{ + URL: appConfig.ClaudeApiURL, + APIKey: appConfig.ClaudeApiKey, + Version: appConfig.ClaudeApiVersion, + }, + ) var validTweets []*twitterscraper.TweetResult for _, tweet := range tweets { @@ -54,7 +62,11 @@ func AnalyzeSentimentTweets(tweets []*twitterscraper.TweetResult, model string, return tweetsContent, sentimentSummary, nil } else if strings.Contains(model, "gpt-") { - client := NewGPTClient() + client := NewGPTClient( + &GPTAPIConfig{ + APIKey: appConfig.GPTApiKey, + }, + ) tweetsContent := ConcatenateTweets(tweets) sentimentSummary, err := client.SendRequest(tweetsContent, model, prompt) if err != nil { @@ -84,7 +96,7 @@ func AnalyzeSentimentTweets(tweets []*twitterscraper.TweetResult, model string, if err != nil { return "", "", err } - uri := config.GetInstance().LLMChatUrl + uri := appConfig.LLMChatUrl if uri == "" { return "", "", errors.New("ollama api url not set") } @@ -124,9 +136,17 @@ func ConcatenateTweets(tweets []*twitterscraper.TweetResult) string { // It concatenates the text, creates a payload, sends a request to Claude, parses the response, // and returns the concatenated content, a sentiment summary, and any error. func AnalyzeSentimentWeb(data string, model string, prompt string) (string, string, error) { + appConfig := config.GetInstance() + // check if we are using claude or gpt, can add others easily if strings.Contains(model, "claude-") { - client := NewClaudeClient() // Adjusted to call without arguments + client := NewClaudeClient( + &ClaudeAPIConfig{ + URL: appConfig.ClaudeApiURL, + APIKey: appConfig.ClaudeApiKey, + Version: appConfig.ClaudeApiVersion, + }, + ) payloadBytes, err := CreatePayload(data, model, prompt) if err != nil { logrus.Errorf("[-] Error creating payload: %v", err) @@ -146,7 +166,11 @@ func AnalyzeSentimentWeb(data string, model string, prompt string) (string, stri return data, sentimentSummary, nil } else if strings.Contains(model, "gpt-") { - client := NewGPTClient() + client := NewGPTClient( + &GPTAPIConfig{ + APIKey: appConfig.GPTApiKey, + }, + ) sentimentSummary, err := client.SendRequest(data, model, prompt) if err != nil { logrus.Errorf("[-] Error sending request to GPT: %v", err) @@ -166,7 +190,7 @@ func AnalyzeSentimentWeb(data string, model string, prompt string) (string, stri if err != nil { return "", "", err } - cfUrl := config.GetInstance().LLMCfUrl + cfUrl := appConfig.LLMCfUrl if cfUrl == "" { return "", "", errors.New("cloudflare workers url not set") } @@ -210,7 +234,7 @@ func AnalyzeSentimentWeb(data string, model string, prompt string) (string, stri if err != nil { return "", "", err } - uri := config.GetInstance().LLMChatUrl + uri := appConfig.LLMChatUrl if uri == "" { return "", "", errors.New("ollama api url not set") } @@ -241,6 +265,8 @@ func AnalyzeSentimentWeb(data string, model string, prompt string) (string, stri // It concatenates the messages, creates a payload, sends a request to the sentiment analysis service, parses the response, // and returns the concatenated messages content, a sentiment summary, and any error. func AnalyzeSentimentDiscord(messages []string, model string, prompt string) (string, string, error) { + appConfig := config.GetInstance() + // Concatenate messages with a newline character messagesContent := strings.Join(messages, "\n") @@ -248,7 +274,14 @@ func AnalyzeSentimentDiscord(messages []string, model string, prompt string) (st // Replace with the actual logic you have for sending requests to your sentiment analysis service // For example, if you're using the Claude API: if strings.Contains(model, "claude-") { - client := NewClaudeClient() // Adjusted to call without arguments + client := NewClaudeClient( + &ClaudeAPIConfig{ + URL: appConfig.ClaudeApiURL, + APIKey: appConfig.ClaudeApiKey, + Version: appConfig.ClaudeApiVersion, + }, + ) + payloadBytes, err := CreatePayload(messagesContent, model, prompt) if err != nil { logrus.Errorf("[-] Error creating payload: %v", err) @@ -289,7 +322,7 @@ func AnalyzeSentimentDiscord(messages []string, model string, prompt string) (st logrus.Errorf("[-] Error marshaling request JSON: %v", err) return "", "", err } - uri := config.GetInstance().LLMChatUrl + uri := appConfig.LLMChatUrl if uri == "" { errMsg := "ollama api url not set" logrus.Errorf("[-] %v", errMsg) @@ -321,6 +354,8 @@ func AnalyzeSentimentDiscord(messages []string, model string, prompt string) (st // AnalyzeSentimentTelegram analyzes the sentiment of the provided Telegram messages by sending them to the sentiment analysis API. func AnalyzeSentimentTelegram(messages []*tg.Message, model string, prompt string) (string, string, error) { + appConfig := config.GetInstance() + // Concatenate messages with a newline character var messageTexts []string for _, msg := range messages { @@ -332,7 +367,13 @@ func AnalyzeSentimentTelegram(messages []*tg.Message, model string, prompt strin // The rest of the code follows the same pattern as AnalyzeSentimentDiscord if strings.Contains(model, "claude-") { - client := NewClaudeClient() // Adjusted to call without arguments + client := NewClaudeClient( + &ClaudeAPIConfig{ + URL: appConfig.ClaudeApiURL, + APIKey: appConfig.ClaudeApiKey, + Version: appConfig.ClaudeApiVersion, + }, + ) payloadBytes, err := CreatePayload(messagesContent, model, prompt) if err != nil { logrus.Errorf("Error creating payload: %v", err) @@ -370,31 +411,31 @@ func AnalyzeSentimentTelegram(messages []*tg.Message, model string, prompt strin requestJSON, err := json.Marshal(genReq) if err != nil { - logrus.Errorf("Error marshaling request JSON: %v", err) + logrus.Errorf("[-] Error marshaling request JSON: %v", err) return "", "", err } - uri := config.GetInstance().LLMChatUrl + uri := appConfig.LLMChatUrl if uri == "" { - errMsg := "ollama api url not set" - logrus.Errorf(errMsg) - return "", "", errors.New(errMsg) + err := errors.New("[-] ollama api url not set") + logrus.Errorf("%v", err) + return "", "", err } resp, err := http.Post(uri, "application/json", bytes.NewReader(requestJSON)) if err != nil { - logrus.Errorf("Error sending request to API: %v", err) + logrus.Errorf("[-] Error sending request to API: %v", err) return "", "", err } defer resp.Body.Close() body, err := io.ReadAll(resp.Body) if err != nil { - logrus.Errorf("Error reading response body: %v", err) + logrus.Errorf("[-] Error reading response body: %v", err) return "", "", err } var payload api.ChatResponse err = json.Unmarshal(body, &payload) if err != nil { - logrus.Errorf("Error unmarshaling response JSON: %v", err) + logrus.Errorf("[-] Error unmarshaling response JSON: %v", err) return "", "", err } diff --git a/pkg/masacrypto/key_manager.go b/pkg/masacrypto/key_manager.go index 426223d1..596922e6 100644 --- a/pkg/masacrypto/key_manager.go +++ b/pkg/masacrypto/key_manager.go @@ -3,13 +3,9 @@ package masacrypto import ( "crypto/ecdsa" "fmt" - "sync" ethCrypto "github.com/ethereum/go-ethereum/crypto" "github.com/libp2p/go-libp2p/core/crypto" - "github.com/sirupsen/logrus" - - "github.com/masa-finance/masa-oracle/pkg/config" ) // KeyManager is meant to simplify the management of cryptographic keys used in the application. @@ -32,19 +28,6 @@ import ( // to Ethereum address format. // - Ensures thread-safe initialization and access to the cryptographic keys through the // use of the sync.Once mechanism. -// -// Usage: -// To access the KeyManager and its functionalities, use the KeyManagerInstance() function -// which returns the singleton instance of KeyManager. This instance can then be used to -// perform various key management tasks, such as retrieving the application's cryptographic -// keys, converting key formats, and more. -// Example: -// keyManager := crypto.KeyManagerInstance() - -var ( - keyManagerInstance *KeyManager - once sync.Once -) // KeyManager holds all the cryptographic entities used in the application. type KeyManager struct { @@ -57,64 +40,58 @@ type KeyManager struct { EthAddress string // Ethereum format address } -// KeyManagerInstance returns the singleton instance of KeyManager, initializing it if necessary. -func KeyManagerInstance() *KeyManager { - once.Do(func() { - keyManagerInstance = &KeyManager{} - if err := keyManagerInstance.loadPrivateKey(); err != nil { - logrus.Fatal("[-] Failed to initialize keys:", err) - } - }) - return keyManagerInstance -} - -// loadPrivateKey loads the node's private key from the environment or a file. -// It first checks for a private key set via the PrivateKey config. If not found, -// it tries to load the key from the PrivateKeyFile. As a last resort, it -// generates a new key and saves it to the private key file. - +// NewKeyManager returns an initialized KeyManager. It first checks for a +// private key set via the PrivateKey config. If not found, it tries to +// load the key from the PrivateKeyFile. As a last resort, it generates +// a new key and saves it to the private key file. // The private key is loaded into both Libp2p and ECDSA formats for use by // different parts of the system. The public key and hex-encoded key representations // are also derived. -func (km *KeyManager) loadPrivateKey() (err error) { - var keyFile string - cfg := config.GetInstance() - if len(cfg.PrivateKey) > 0 { - km.Libp2pPrivKey, err = getPrivateKeyFromEnv(cfg.PrivateKey) +func NewKeyManager(privateKey string, privateKeyFile string) (*KeyManager, error) { + km := &KeyManager{} + + var err error + + if len(privateKey) > 0 { + km.Libp2pPrivKey, err = getPrivateKeyFromEnv(privateKey) if err != nil { - return err + return nil, err } } else { - keyFile = config.GetInstance().PrivateKeyFile // Check if the private key file exists - km.Libp2pPrivKey, err = getPrivateKeyFromFile(keyFile) + km.Libp2pPrivKey, err = getPrivateKeyFromFile(privateKeyFile) if err != nil { - km.Libp2pPrivKey, err = generateNewPrivateKey(keyFile) + km.Libp2pPrivKey, err = generateNewPrivateKey(privateKeyFile) if err != nil { - return err + return nil, err } } } + km.Libp2pPubKey = km.Libp2pPrivKey.GetPublic() + // After obtaining the libp2p privKey, convert it to an ECDSA private key km.EcdsaPrivKey, err = libp2pPrivateKeyToEcdsa(km.Libp2pPrivKey) if err != nil { - return err + return nil, err } - err = saveEcdesaPrivateKeyToFile(km.EcdsaPrivKey, fmt.Sprintf("%s.ecdsa", keyFile)) + err = saveEcdesaPrivateKeyToFile(km.EcdsaPrivKey, fmt.Sprintf("%s.ecdsa", privateKeyFile)) if err != nil { - return err + return nil, err } + km.HexPrivKey, err = getHexEncodedPrivateKey(km.Libp2pPrivKey) if err != nil { - return err + return nil, err } + km.EcdsaPubKey = &km.EcdsaPrivKey.PublicKey km.HexPubKey, err = getHexEncodedPublicKey(km.Libp2pPubKey) if err != nil { - return err + return nil, err } + km.EthAddress = ethCrypto.PubkeyToAddress(km.EcdsaPrivKey.PublicKey).Hex() - return nil + return km, nil } diff --git a/pkg/network/address.go b/pkg/network/address.go index 1741010c..57fe6260 100644 --- a/pkg/network/address.go +++ b/pkg/network/address.go @@ -38,16 +38,8 @@ func GetMultiAddressesForHost(host host.Host) ([]multiaddr.Multiaddr, error) { return addresses, nil } -// GetMultiAddressesForHostQuiet returns the multiaddresses for the host without logging -func GetMultiAddressesForHostQuiet(host host.Host) []multiaddr.Multiaddr { - ma, err := GetMultiAddressesForHost(host) - if err != nil { - logrus.Fatal(err) - } - return ma -} - -// getPublicMultiAddress returns the best public IP address +// getPublicMultiAddress returns the best public IP address (for some definition of "best") +// TODO: This is not guaranteed to work, and should not be necessary since we're using AutoNAT func getPublicMultiAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { ipBytes, err := Get("https://checkip.amazonaws.com", nil) externalIP := net.ParseIP(strings.TrimSpace(string(ipBytes))) @@ -60,9 +52,8 @@ func getPublicMultiAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { } var addrToCopy multiaddr.Multiaddr - for _, addr := range addrs { - addrToCopy = addr - break + if len(addrs) > 0 { + addrToCopy = addrs[0] } publicMultiaddr, err := replaceIPComponent(addrToCopy, externalIP.String()) if err != nil { @@ -73,6 +64,7 @@ func getPublicMultiAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { } // GetPriorityAddress returns the best public or private IP address +// TODO: rm? func GetPriorityAddress(addrs []multiaddr.Multiaddr) multiaddr.Multiaddr { var bestPrivateAddr multiaddr.Multiaddr bestPublicAddr := getPublicMultiAddress(addrs) @@ -156,15 +148,6 @@ func replaceIPComponent(maddr multiaddr.Multiaddr, newIP string) (multiaddr.Mult return multiaddr.Join(components...), nil } -//func contains(slice []multiaddr.Multiaddr, item multiaddr.Multiaddr) bool { -// for _, a := range slice { -// if a.Equal(item) { -// return true -// } -// } -// return false -//} - func getGCPExternalIP() (bool, string) { // Create a new HTTP client with a specific timeout diff --git a/pkg/network/discover.go b/pkg/network/discover.go index 5c25c385..757628eb 100644 --- a/pkg/network/discover.go +++ b/pkg/network/discover.go @@ -10,8 +10,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" - "github.com/masa-finance/masa-oracle/pkg/config" - dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" @@ -24,7 +22,7 @@ import ( // It initializes discovery via the DHT and advertises this node. // It runs discovery in a loop with a ticker, re-advertising and finding new peers. // For each discovered peer, it checks if already connected, and if not, dials them. -func Discover(ctx context.Context, host host.Host, dht *dht.IpfsDHT, protocol protocol.ID) { +func Discover(ctx context.Context, bootNodes []string, host host.Host, dht *dht.IpfsDHT, protocol protocol.ID) { var routingDiscovery *routing.RoutingDiscovery protocolString := string(protocol) logrus.Infof("[+] Discovering peers for protocol: %s", protocolString) @@ -97,16 +95,12 @@ func Discover(ctx context.Context, host host.Host, dht *dht.IpfsDHT, protocol pr ID: availPeer.ID, Addrs: availPeer.Addrs, } - hostAddrInfo := peer.AddrInfo{ - ID: host.ID(), - Addrs: host.Addrs(), - } - if availPeerAddrInfo.ID.String() == hostAddrInfo.ID.String() { + if availPeerAddrInfo.ID == host.ID() { logrus.Debugf("Skipping connect to self: %s", availPeerAddrInfo.ID.String()) continue } if len(availPeerAddrInfo.Addrs) == 0 { - for _, bn := range config.GetInstance().Bootnodes { + for _, bn := range bootNodes { bootNode := strings.Split(bn, "/")[len(strings.Split(bn, "/"))-1] if availPeerAddrInfo.ID.String() != bootNode { logrus.Warningf("Skipping connect to non bootnode peer with no multiaddress: %s", availPeerAddrInfo.ID.String()) @@ -117,7 +111,7 @@ func Discover(ctx context.Context, host host.Host, dht *dht.IpfsDHT, protocol pr logrus.Infof("[+] Available Peer: %s", availPeer.String()) if host.Network().Connectedness(availPeer.ID) != network.Connected { - if isConnectedToBootnode(host, config.GetInstance().Bootnodes) { + if isConnectedToBootnode(host, bootNodes) { _, err := host.Network().DialPeer(ctx, availPeer.ID) if err != nil { logrus.Warningf("[-] Failed to connect to peer %s, will retry...", availPeer.ID.String()) @@ -126,10 +120,10 @@ func Discover(ctx context.Context, host host.Host, dht *dht.IpfsDHT, protocol pr logrus.Infof("[+] Connected to peer %s", availPeer.ID.String()) } } else { - for _, bn := range config.GetInstance().Bootnodes { + for _, bn := range bootNodes { if len(bn) > 0 { logrus.Info("[-] Not connected to any bootnode. Attempting to reconnect...") - reconnectToBootnodes(ctx, host, config.GetInstance().Bootnodes) + reconnectToBootnodes(ctx, host, bootNodes) } } } diff --git a/pkg/network/kdht.go b/pkg/network/kdht.go index 6b4e97cb..49cab0e8 100644 --- a/pkg/network/kdht.go +++ b/pkg/network/kdht.go @@ -30,7 +30,7 @@ type dbValidator struct{} func (dbValidator) Validate(_ string, _ []byte) error { return nil } func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } -func WithDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, protocolId, prefix protocol.ID, peerChan chan PeerEvent, nodeData *pubsub.NodeData) (*dht.IpfsDHT, error) { +func EnableDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, protocolId, prefix protocol.ID, peerChan chan PeerEvent, nodeData *pubsub.NodeData) (*dht.IpfsDHT, error) { options := make([]dht.Option, 0) options = append(options, dht.BucketSize(100)) // Adjust bucket size options = append(options, dht.Concurrency(100)) // Increase concurrency diff --git a/pkg/network/mdns.go b/pkg/network/mdns.go index 6beab0b4..66ba65e7 100644 --- a/pkg/network/mdns.go +++ b/pkg/network/mdns.go @@ -24,7 +24,7 @@ func (n *discoveryNotifee) HandlePeerFound(pi peer.AddrInfo) { n.PeerChan <- pe } -func WithMDNS(host host.Host, rendezvous string, peerChan chan PeerEvent) error { +func EnableMDNS(host host.Host, rendezvous string, peerChan chan PeerEvent) error { notifee := &discoveryNotifee{ PeerChan: peerChan, Rendezvous: rendezvous, diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index a2b963b6..6abe70e3 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -382,11 +382,10 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip net.NodeDataChan <- nodeData net.nodeData.Set(nodeData.PeerId.String(), nodeData) } else { + dataChanged = true if !nd.SelfIdentified { - dataChanged = true nd.SelfIdentified = true } - dataChanged = true nd.IsStaked = nodeData.IsStaked nd.IsDiscordScraper = nodeData.IsDiscordScraper nd.IsTelegramScraper = nodeData.IsTelegramScraper @@ -396,7 +395,6 @@ func (net *NodeEventTracker) AddOrUpdateNodeData(nodeData *NodeData, forceGossip nd.Multiaddrs = nodeData.Multiaddrs nd.EthAddress = nodeData.EthAddress if nd.EthAddress == "" && nodeData.EthAddress != "" { - dataChanged = true nd.EthAddress = nodeData.EthAddress } diff --git a/pkg/scrapers/telegram/telegram_client.go b/pkg/scrapers/telegram/telegram_client.go index 836d2ea3..7b9dbde5 100644 --- a/pkg/scrapers/telegram/telegram_client.go +++ b/pkg/scrapers/telegram/telegram_client.go @@ -9,7 +9,6 @@ import ( "path/filepath" "strconv" "strings" - "sync" "github.com/gotd/contrib/bg" "github.com/gotd/td/session" @@ -21,7 +20,6 @@ import ( var ( client *telegram.Client - once sync.Once appID int // Your actual app ID appHash string // Your actual app hash sessionDir = filepath.Join(os.Getenv("HOME"), ".telegram-sessions") diff --git a/pkg/scrapers/twitter/common.go b/pkg/scrapers/twitter/common.go index 797a5ca8..34cd7f3b 100644 --- a/pkg/scrapers/twitter/common.go +++ b/pkg/scrapers/twitter/common.go @@ -50,9 +50,8 @@ func parseAccounts(accountPairs []string) []*TwitterAccount { }) } -func getAuthenticatedScraper() (*Scraper, *TwitterAccount, error) { +func getAuthenticatedScraper(baseDir string) (*Scraper, *TwitterAccount, error) { once.Do(initializeAccountManager) - baseDir := config.GetInstance().MasaDir account := accountManager.GetNextAccount() if account == nil { diff --git a/pkg/scrapers/twitter/followers.go b/pkg/scrapers/twitter/followers.go index c2acdd90..72020b8c 100644 --- a/pkg/scrapers/twitter/followers.go +++ b/pkg/scrapers/twitter/followers.go @@ -7,19 +7,21 @@ import ( "github.com/sirupsen/logrus" ) -func ScrapeFollowersForProfile(username string, count int) ([]twitterscraper.Legacy, error) { - scraper, account, err := getAuthenticatedScraper() +func ScrapeFollowersForProfile(baseDir string, username string, count int) ([]twitterscraper.Legacy, error) { + scraper, account, err := getAuthenticatedScraper(baseDir) if err != nil { return nil, err } followingResponse, errString, _ := scraper.FetchFollowers(username, count, "") if errString != "" { - if handleRateLimit(fmt.Errorf(errString), account) { - return nil, fmt.Errorf("rate limited") + err := fmt.Errorf("rate limited: %s", errString) + if handleRateLimit(err, account) { + return nil, err } - logrus.Errorf("Error fetching followers: %v", errString) - return nil, fmt.Errorf("%v", errString) + + logrus.Errorf("[-] Error fetching followers: %s", errString) + return nil, fmt.Errorf("error fetching followers: %s", errString) } return followingResponse, nil diff --git a/pkg/scrapers/twitter/profile.go b/pkg/scrapers/twitter/profile.go index cfe77096..547ae987 100644 --- a/pkg/scrapers/twitter/profile.go +++ b/pkg/scrapers/twitter/profile.go @@ -4,8 +4,8 @@ import ( twitterscraper "github.com/masa-finance/masa-twitter-scraper" ) -func ScrapeTweetsProfile(username string) (twitterscraper.Profile, error) { - scraper, account, err := getAuthenticatedScraper() +func ScrapeTweetsProfile(baseDir string, username string) (twitterscraper.Profile, error) { + scraper, account, err := getAuthenticatedScraper(baseDir) if err != nil { return twitterscraper.Profile{}, err } diff --git a/pkg/scrapers/twitter/tweets.go b/pkg/scrapers/twitter/tweets.go index b32b2c4d..e58506ac 100644 --- a/pkg/scrapers/twitter/tweets.go +++ b/pkg/scrapers/twitter/tweets.go @@ -11,8 +11,8 @@ type TweetResult struct { Error error } -func ScrapeTweetsByQuery(query string, count int) ([]*TweetResult, error) { - scraper, account, err := getAuthenticatedScraper() +func ScrapeTweetsByQuery(baseDir string, query string, count int) ([]*TweetResult, error) { + scraper, account, err := getAuthenticatedScraper(baseDir) if err != nil { return nil, err } diff --git a/pkg/scrapers/web/web.go b/pkg/scrapers/web/web.go index 45af5de1..e868d66f 100644 --- a/pkg/scrapers/web/web.go +++ b/pkg/scrapers/web/web.go @@ -66,11 +66,14 @@ func ScrapeWebData(uri []string, depth int) ([]byte, error) { ) // Adjust the parallelism and delay based on your needs and server capacity - c.Limit(&colly.LimitRule{ + limitRule := colly.LimitRule{ DomainGlob: "*", Parallelism: 4, // Increased parallelism Delay: 500 * time.Millisecond, // Reduced delay - }) + } + if err := c.Limit(&limitRule); err != nil { + logrus.Errorf("[-] Unable to set scraper limit. Using default. Error: %v", err) + } // Increase the timeout slightly if necessary c.SetRequestTimeout(240 * time.Second) // Increased to 4 minutes diff --git a/pkg/staking/contracts.go b/pkg/staking/contracts.go index d58eeb9e..5092da73 100644 --- a/pkg/staking/contracts.go +++ b/pkg/staking/contracts.go @@ -6,8 +6,6 @@ import ( "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" - - "github.com/masa-finance/masa-oracle/pkg/config" ) var MasaTokenAddress common.Address @@ -18,10 +16,10 @@ type Client struct { PrivateKey *ecdsa.PrivateKey } -// NewClient initializes a new Client instance with the provided private key. +// NewClient initializes a new ethClient.Client instance with the provided private key. // It loads the contract addresses, initializes an Ethereum client, and returns // a Client instance. -func NewClient(privateKey *ecdsa.PrivateKey) (*Client, error) { +func NewClient(rpcUrl string, privateKey *ecdsa.PrivateKey) (*Client, error) { addresses, err := LoadContractAddresses() if err != nil { return nil, fmt.Errorf("[-] Failed to load contract addresses: %v", err) @@ -30,7 +28,7 @@ func NewClient(privateKey *ecdsa.PrivateKey) (*Client, error) { MasaTokenAddress = common.HexToAddress(addresses.Sepolia.MasaToken) ProtocolStakingContractAddress = common.HexToAddress(addresses.Sepolia.ProtocolStaking) - client, err := ethclient.Dial(config.GetInstance().RpcUrl) + client, err := ethclient.Dial(rpcUrl) if err != nil { return nil, err } diff --git a/pkg/staking/verify.go b/pkg/staking/verify.go index 73019fad..c4ea6168 100644 --- a/pkg/staking/verify.go +++ b/pkg/staking/verify.go @@ -9,21 +9,14 @@ import ( "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" - - "github.com/masa-finance/masa-oracle/pkg/config" ) // VerifyStakingEvent checks if the given user address has staked tokens by // calling the stakes() view function on the ProtocolStaking contract. // It connects to an Ethereum node, encodes the stakes call, calls the contract, // unpacks the result, and returns true if the stakes amount is > 0. -func VerifyStakingEvent(userAddress string) (bool, error) { - rpcURL := config.GetInstance().RpcUrl - if rpcURL == "" { - return false, fmt.Errorf("%s is not set", config.RpcUrl) - } - - client, err := ethclient.Dial(rpcURL) +func VerifyStakingEvent(rpcUrl string, userAddress string) (bool, error) { + client, err := ethclient.Dial(rpcUrl) if err != nil { return false, fmt.Errorf("[-] Failed to connect to the Ethereum client: %v", err) } diff --git a/pkg/tests/scrapers/twitter_scraper_test.go b/pkg/tests/scrapers/twitter_scraper_test.go index 96a1c45f..42544978 100644 --- a/pkg/tests/scrapers/twitter_scraper_test.go +++ b/pkg/tests/scrapers/twitter_scraper_test.go @@ -1,3 +1,4 @@ +// TODO: This is a WIP // Package scrapers_test contains integration tests for the Twitter scraper functionality. // // Dev Notes: @@ -13,7 +14,6 @@ import ( "runtime" "github.com/joho/godotenv" - "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" twitterscraper "github.com/masa-finance/masa-twitter-scraper" . "github.com/onsi/ginkgo/v2" @@ -26,6 +26,7 @@ var _ = Describe("Twitter Auth Function", func() { twitterUsername string twitterPassword string twoFACode string + masaDir string ) loadEnv := func() { @@ -44,8 +45,7 @@ var _ = Describe("Twitter Auth Function", func() { BeforeEach(func() { loadEnv() - tempDir := GinkgoT().TempDir() - config.GetInstance().MasaDir = tempDir + masaDir = GinkgoT().TempDir() twitterUsername = os.Getenv("TWITTER_USERNAME") twitterPassword = os.Getenv("TWITTER_PASSWORD") @@ -53,20 +53,18 @@ var _ = Describe("Twitter Auth Function", func() { Expect(twitterUsername).NotTo(BeEmpty(), "TWITTER_USERNAME environment variable is not set") Expect(twitterPassword).NotTo(BeEmpty(), "TWITTER_PASSWORD environment variable is not set") - - config.GetInstance().TwitterUsername = twitterUsername - config.GetInstance().TwitterPassword = twitterPassword - config.GetInstance().Twitter2FaCode = twoFACode + Expect(twoFACode).NotTo(BeEmpty(), "TWITTER_PASSWORD environment variable is not set") }) authenticate := func() *twitterscraper.Scraper { + // TODO Actually authenticate return nil //return twitter.Auth() } PIt("authenticates and logs in successfully", func() { // Ensure cookie file doesn't exist before authentication - cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + cookieFile := filepath.Join(masaDir, "twitter_cookies.json") Expect(cookieFile).NotTo(BeAnExistingFile()) // Authenticate @@ -80,7 +78,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(scraper.IsLoggedIn()).To(BeTrue()) // Attempt a simple operation to verify the session is valid - profile, err := twitter.ScrapeTweetsProfile("twitter") + profile, err := twitter.ScrapeTweetsProfile(masaDir, "twitter") Expect(err).To(BeNil()) Expect(profile.Username).To(Equal("twitter")) @@ -93,11 +91,11 @@ var _ = Describe("Twitter Auth Function", func() { Expect(firstScraper).NotTo(BeNil()) // Verify cookie file is created - cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + cookieFile := filepath.Join(masaDir, "twitter_cookies.json") Expect(cookieFile).To(BeAnExistingFile()) // Clear the scraper to force cookie reuse - firstScraper = nil + firstScraper = nil // nolint: ineffassign // Second authentication (should use cookies) secondScraper := authenticate() @@ -107,7 +105,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(secondScraper.IsLoggedIn()).To(BeTrue()) // Attempt a simple operation to verify the session is valid - profile, err := twitter.ScrapeTweetsProfile("twitter") + profile, err := twitter.ScrapeTweetsProfile(masaDir, "twitter") Expect(err).To(BeNil()) Expect(profile.Username).To(Equal("twitter")) @@ -120,11 +118,11 @@ var _ = Describe("Twitter Auth Function", func() { Expect(firstScraper).NotTo(BeNil()) // Verify cookie file is created - cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + cookieFile := filepath.Join(masaDir, "twitter_cookies.json") Expect(cookieFile).To(BeAnExistingFile()) // Clear the scraper to force cookie reuse - firstScraper = nil + firstScraper = nil // nolint: ineffassign // Second authentication (should use cookies) secondScraper := authenticate() @@ -134,12 +132,12 @@ var _ = Describe("Twitter Auth Function", func() { Expect(secondScraper.IsLoggedIn()).To(BeTrue()) // Attempt to scrape profile - profile, err := twitter.ScrapeTweetsProfile("god") + profile, err := twitter.ScrapeTweetsProfile(masaDir, "god") Expect(err).To(BeNil()) logrus.Infof("Profile of 'god': %+v", profile) // Scrape recent #Bitcoin tweets - tweets, err := twitter.ScrapeTweetsByQuery("#Bitcoin", 3) + tweets, err := twitter.ScrapeTweetsByQuery(masaDir, "#Bitcoin", 3) Expect(err).To(BeNil()) Expect(tweets).To(HaveLen(3)) @@ -150,6 +148,6 @@ var _ = Describe("Twitter Auth Function", func() { }) AfterEach(func() { - os.RemoveAll(config.GetInstance().MasaDir) + os.RemoveAll(masaDir) }) }) diff --git a/pkg/tests/twitter/twitter_scraper_test.go b/pkg/tests/twitter/twitter_scraper_test.go index ee251b4b..b41587cb 100644 --- a/pkg/tests/twitter/twitter_scraper_test.go +++ b/pkg/tests/twitter/twitter_scraper_test.go @@ -6,7 +6,6 @@ import ( "runtime" "github.com/joho/godotenv" - "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/scrapers/twitter" twitterscraper "github.com/masa-finance/masa-twitter-scraper" . "github.com/onsi/ginkgo/v2" @@ -19,6 +18,7 @@ var _ = Describe("Twitter Auth Function", func() { twitterUsername string twitterPassword string twoFACode string + masaDir string ) loadEnv := func() { @@ -37,8 +37,7 @@ var _ = Describe("Twitter Auth Function", func() { BeforeEach(func() { loadEnv() - tempDir := GinkgoT().TempDir() - config.GetInstance().MasaDir = tempDir + masaDir = GinkgoT().TempDir() twitterUsername = os.Getenv("TWITTER_USERNAME") twitterPassword = os.Getenv("TWITTER_PASSWORD") @@ -46,20 +45,18 @@ var _ = Describe("Twitter Auth Function", func() { Expect(twitterUsername).NotTo(BeEmpty(), "TWITTER_USERNAME environment variable is not set") Expect(twitterPassword).NotTo(BeEmpty(), "TWITTER_PASSWORD environment variable is not set") - - config.GetInstance().TwitterUsername = twitterUsername - config.GetInstance().TwitterPassword = twitterPassword - config.GetInstance().Twitter2FaCode = twoFACode + Expect(twoFACode).NotTo(BeEmpty(), "TWITTER_PASSWORD environment variable is not set") }) authenticate := func() *twitterscraper.Scraper { + // TODO Actually authenticate return nil //return twitter.Auth() } PIt("authenticates and logs in successfully", func() { // Ensure cookie file doesn't exist before authentication - cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + cookieFile := filepath.Join(masaDir, "twitter_cookies.json") Expect(cookieFile).NotTo(BeAnExistingFile()) // Authenticate @@ -73,7 +70,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(scraper.IsLoggedIn()).To(BeTrue()) // Attempt a simple operation to verify the session is valid - profile, err := twitter.ScrapeTweetsProfile("twitter") + profile, err := twitter.ScrapeTweetsProfile(masaDir, "twitter") Expect(err).To(BeNil()) Expect(profile.Username).To(Equal("twitter")) @@ -86,11 +83,11 @@ var _ = Describe("Twitter Auth Function", func() { Expect(firstScraper).NotTo(BeNil()) // Verify cookie file is created - cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + cookieFile := filepath.Join(masaDir, "twitter_cookies.json") Expect(cookieFile).To(BeAnExistingFile()) // Clear the scraper to force cookie reuse - firstScraper = nil + firstScraper = nil // nolint: ineffassign // Second authentication (should use cookies) secondScraper := authenticate() @@ -100,7 +97,7 @@ var _ = Describe("Twitter Auth Function", func() { Expect(secondScraper.IsLoggedIn()).To(BeTrue()) // Attempt a simple operation to verify the session is valid - profile, err := twitter.ScrapeTweetsProfile("twitter") + profile, err := twitter.ScrapeTweetsProfile(masaDir, "twitter") Expect(err).To(BeNil()) Expect(profile.Username).To(Equal("twitter")) @@ -113,11 +110,11 @@ var _ = Describe("Twitter Auth Function", func() { Expect(firstScraper).NotTo(BeNil()) // Verify cookie file is created - cookieFile := filepath.Join(config.GetInstance().MasaDir, "twitter_cookies.json") + cookieFile := filepath.Join(masaDir, "twitter_cookies.json") Expect(cookieFile).To(BeAnExistingFile()) // Clear the scraper to force cookie reuse - firstScraper = nil + firstScraper = nil // nolint: ineffassign // Second authentication (should use cookies) secondScraper := authenticate() @@ -127,12 +124,12 @@ var _ = Describe("Twitter Auth Function", func() { Expect(secondScraper.IsLoggedIn()).To(BeTrue()) // Attempt to scrape profile - profile, err := twitter.ScrapeTweetsProfile("god") + profile, err := twitter.ScrapeTweetsProfile(masaDir, "god") Expect(err).To(BeNil()) logrus.Infof("Profile of 'god': %+v", profile) // Scrape recent #Bitcoin tweets - tweets, err := twitter.ScrapeTweetsByQuery("#Bitcoin", 3) + tweets, err := twitter.ScrapeTweetsByQuery(masaDir, "#Bitcoin", 3) Expect(err).To(BeNil()) Expect(tweets).To(HaveLen(3)) @@ -143,6 +140,6 @@ var _ = Describe("Twitter Auth Function", func() { }) AfterEach(func() { - os.RemoveAll(config.GetInstance().MasaDir) + os.RemoveAll(masaDir) }) }) diff --git a/pkg/workers/handlers/llm.go b/pkg/workers/handlers/llm.go index ed9687ae..e527a046 100644 --- a/pkg/workers/handlers/llm.go +++ b/pkg/workers/handlers/llm.go @@ -6,9 +6,8 @@ import ( "github.com/sirupsen/logrus" - "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/network" - "github.com/masa-finance/masa-oracle/pkg/workers/types" + data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) // TODO: LLMChatBody isn't used anywhere in the codebase. Remove after testing @@ -22,13 +21,20 @@ type LLMChatBody struct { Stream bool `json:"stream"` } -type LLMChatHandler struct{} +type LLMChatHandler struct { + llmChatUrl string +} + +func NewLLMChatHandler(llmChatUrl string) *LLMChatHandler { + return &LLMChatHandler{ + llmChatUrl: llmChatUrl, + } +} // HandleWork implements the WorkHandler interface for LLMChatHandler. func (h *LLMChatHandler) HandleWork(data []byte) data_types.WorkResponse { logrus.Infof("[+] LLM Chat %s", data) - uri := config.GetInstance().LLMChatUrl - if uri == "" { + if h.llmChatUrl == "" { return data_types.WorkResponse{Error: "missing env variable LLM_CHAT_URL"} } @@ -41,7 +47,7 @@ func (h *LLMChatHandler) HandleWork(data []byte) data_types.WorkResponse { if err != nil { return data_types.WorkResponse{Error: fmt.Sprintf("unable to marshal LLM chat data: %v", err)} } - resp, err := network.Post(uri, jsnBytes, nil) + resp, err := network.Post(h.llmChatUrl, jsnBytes, nil) if err != nil { return data_types.WorkResponse{Error: fmt.Sprintf("unable to post LLM chat data: %v", err)} } diff --git a/pkg/workers/handlers/twitter.go b/pkg/workers/handlers/twitter.go index bd4cc00e..5709a23e 100644 --- a/pkg/workers/handlers/twitter.go +++ b/pkg/workers/handlers/twitter.go @@ -9,9 +9,9 @@ import ( data_types "github.com/masa-finance/masa-oracle/pkg/workers/types" ) -type TwitterQueryHandler struct{} -type TwitterFollowersHandler struct{} -type TwitterProfileHandler struct{} +type TwitterQueryHandler struct{ MasaDir string } +type TwitterFollowersHandler struct{ MasaDir string } +type TwitterProfileHandler struct{ MasaDir string } func (h *TwitterQueryHandler) HandleWork(data []byte) data_types.WorkResponse { logrus.Infof("[+] TwitterQueryHandler input: %s", data) @@ -25,7 +25,7 @@ func (h *TwitterQueryHandler) HandleWork(data []byte) data_types.WorkResponse { logrus.Infof("[+] Scraping tweets for query: %s, count: %d", query, count) - resp, err := twitter.ScrapeTweetsByQuery(query, count) + resp, err := twitter.ScrapeTweetsByQuery(h.MasaDir, query, count) if err != nil { logrus.Errorf("[+] TwitterQueryHandler error scraping tweets: %v", err) return data_types.WorkResponse{Error: err.Error()} @@ -48,7 +48,7 @@ func (h *TwitterFollowersHandler) HandleWork(data []byte) data_types.WorkRespons } username := dataMap["username"].(string) count := int(dataMap["count"].(float64)) - resp, err := twitter.ScrapeFollowersForProfile(username, count) + resp, err := twitter.ScrapeFollowersForProfile(h.MasaDir, username, count) if err != nil { return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter followers: %v", err)} } @@ -64,7 +64,7 @@ func (h *TwitterProfileHandler) HandleWork(data []byte) data_types.WorkResponse return data_types.WorkResponse{Error: fmt.Sprintf("unable to parse twitter profile data: %v", err)} } username := dataMap["username"].(string) - resp, err := twitter.ScrapeTweetsProfile(username) + resp, err := twitter.ScrapeTweetsProfile(h.MasaDir, username) if err != nil { return data_types.WorkResponse{Error: fmt.Sprintf("unable to get twitter profile: %v", err)} } diff --git a/pkg/workers/options.go b/pkg/workers/options.go index 0f261748..4387edf6 100644 --- a/pkg/workers/options.go +++ b/pkg/workers/options.go @@ -5,6 +5,8 @@ type WorkerOption struct { isWebScraperWorker bool isLLMServerWorker bool isDiscordScraperWorker bool + llmChatUrl string + masaDir string } type WorkerOptionFunc func(*WorkerOption) @@ -25,6 +27,18 @@ var EnableDiscordScraperWorker = func(o *WorkerOption) { o.isDiscordScraperWorker = true } +func WithLlmChatUrl(url string) WorkerOptionFunc { + return func(o *WorkerOption) { + o.llmChatUrl = url + } +} + +func WithMasaDir(dir string) WorkerOptionFunc { + return func(o *WorkerOption) { + o.masaDir = dir + } +} + func (a *WorkerOption) Apply(opts ...WorkerOptionFunc) { for _, opt := range opts { opt(a) diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index 4fc87e0b..563c91aa 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -33,9 +33,9 @@ func NewWorkHandlerManager(opts ...WorkerOptionFunc) *WorkHandlerManager { } if options.isTwitterWorker { - whm.addWorkHandler(data_types.Twitter, &handlers.TwitterQueryHandler{}) - whm.addWorkHandler(data_types.TwitterFollowers, &handlers.TwitterFollowersHandler{}) - whm.addWorkHandler(data_types.TwitterProfile, &handlers.TwitterProfileHandler{}) + whm.addWorkHandler(data_types.Twitter, &handlers.TwitterQueryHandler{MasaDir: options.masaDir}) + whm.addWorkHandler(data_types.TwitterFollowers, &handlers.TwitterFollowersHandler{MasaDir: options.masaDir}) + whm.addWorkHandler(data_types.TwitterProfile, &handlers.TwitterProfileHandler{MasaDir: options.masaDir}) } if options.isWebScraperWorker { @@ -43,7 +43,7 @@ func NewWorkHandlerManager(opts ...WorkerOptionFunc) *WorkHandlerManager { } if options.isLLMServerWorker { - whm.addWorkHandler(data_types.LLMChat, &handlers.LLMChatHandler{}) + whm.addWorkHandler(data_types.LLMChat, handlers.NewLLMChatHandler(options.llmChatUrl)) } if options.isDiscordScraperWorker { diff --git a/pkg/workers/worker_selection.go b/pkg/workers/worker_selection.go index ed93d4c3..f96bb70e 100644 --- a/pkg/workers/worker_selection.go +++ b/pkg/workers/worker_selection.go @@ -2,7 +2,6 @@ package workers import ( "math/rand" - "time" "github.com/libp2p/go-libp2p/core/peer" "github.com/sirupsen/logrus" @@ -34,7 +33,6 @@ func getTwitterWorkers(node *node.OracleNode, nodes []pubsub.NodeData, limit int topPerformers := nodes[:poolSize] // Shuffle the top performers - rand.Seed(time.Now().UnixNano()) rand.Shuffle(len(topPerformers), func(i, j int) { topPerformers[i], topPerformers[j] = topPerformers[j], topPerformers[i] })