diff --git a/.env.example b/.env.example index b6386fee..33a3ba66 100644 --- a/.env.example +++ b/.env.example @@ -1,61 +1,49 @@ -## note: all nodes need to be staked to participate - -## BOOTNODE configuration -ENV=test -FILE_PATH=. -PORT=8080 -RPC_URL=https://ethereum-sepolia.publicnode.com - -## ORACLE without SCRAPER configuration +## Note: All nodes need to be staked to participate +## To stake your node: +## 1. Ensure you have Sepolia ETH from a public faucet +## 2. Ensure you have Sepolia MASA tokens run make faucet to get 1000 Sepolia MASA +## 3. Start your node and copy the Public Key from the startup logs +## 4. Use the following commands: make stake or ./masa-node --stake (e.g., ./masa-node --stake 1000 for 1000 MASA) +## 5. Wait for the transaction to be confirmed on the blockchain +## 6. Restart your node after staking +## Minimum stake required: 1000 Sepolia MASA + +## Minimum .env configuration for running a staked node and getting data from the network +## With this configuration, you can retrieve data but cannot provide data as a worker BOOTNODES=/ip4/35.223.224.220/udp/4001/quic-v1/p2p/16Uiu2HAmPxXXjR1XJEwckh6q1UStheMmGaGe8fyXdeRs3SejadSa ENV=test FILE_PATH=. PORT=8080 RPC_URL=https://ethereum-sepolia.publicnode.com -## VALIDATOR configuration -BOOTNODES=/ip4/35.223.224.220/udp/4001/quic-v1/p2p/16Uiu2HAmPxXXjR1XJEwckh6q1UStheMmGaGe8fyXdeRs3SejadSa -ENV=test -FILE_PATH=. -PORT=8080 -RPC_URL=https://ethereum-sepolia.publicnode.com -VALIDATOR=true -## ORACLE with SCRAPER configuration -BOOTNODES=/ip4/35.223.224.220/udp/4001/quic-v1/p2p/16Uiu2HAmPxXXjR1XJEwckh6q1UStheMmGaGe8fyXdeRs3SejadSa -ENV=test -FILE_PATH=. -PORT=8080 -RPC_URL=https://ethereum-sepolia.publicnode.com -TWITTER_USERNAME= your twitter username -TWITTER_PASSWORD= your twitter password -TWITTER_2FA_CODE= your twitter 2fa code -DISCORD_SCRAPER=true -DISCORD_BOT_TOKEN= your discord bot token -TWITTER_SCRAPER=true -WEB_SCRAPER=true -TELEGRAM_SCRAPER=false - -# Go to my.telegram.org/auth and after logging in click the developer option to get these -TELEGRAM_APP_ID= -TELEGRAM_APP_HASH= +# Worker Configuration +# Note: To become a worker and provide data to the network, you must configure the following settings -## Optional added features for ORACLE or VALIDATOR nodes only - -# Claude API and Elevenlabs API -CLAUDE_API_KEY= -CLAUDE_API_URL= -CLAUDE_API_VERSION= -ELAB_KEY= - -# For use with ollama https://hub.docker.com/r/ollama/ollama -LLM_SERVER=true -LLM_CHAT_URL=http://localhost:11434/api/chat +# Twitter Configuration +# Note: A pro-paid Twitter account is required to run a Twitter worker +TWITTER_SCRAPER=true +TWITTER_USERNAME=your pro-paid twitter username (without the '@' symbol) +TWITTER_PASSWORD=your twitter password +# Important: If your 2FA code times out, you'll need to restart your node and login by submitting a request. +# We recommend temporarily disabling 2FA to save your cookies locally to your .home or .masa directory, then re-enabling it afterwards. +# This will help avoid frequent login requests and potential timeouts. +TWITTER_2FA_CODE=your twitter 2fa code (if applicable) + +# Discord Configuration +# Note: You must have a bot in a Discord guild to scrape Discord channel messages +DISCORD_SCRAPER=true +DISCORD_BOT_TOKEN=your discord bot token -# Bring your own Cloudflare worker token -LLM_CF_URL= -LLM_CF_TOKEN= +# Web Scraper Configuration +WEB_SCRAPER=true -# Bring your own OpenAI api key -OPENAI_API_KEY= -PROMPT="You are a helpful assistant." +# Telegram Configuration +# Note: You must configure a bot as a developer and add it to a channel to scrape Telegram channel messages +TELEGRAM_SCRAPER=false +# To obtain these credentials, go to my.telegram.org/auth, log in, and select the API development tools +TELEGRAM_APP_ID=your telegram app id +TELEGRAM_APP_HASH=your telegram app hash +# Configure your Telegram bot and add it to the channel you want to scrape +TELEGRAM_BOT_TOKEN=your telegram bot token +TELEGRAM_CHANNEL_USERNAME=username of the channel to scrape (without the '@' symbol) \ No newline at end of file diff --git a/.github/workflows/static.yml b/.github/disabled/static.yml similarity index 100% rename from .github/workflows/static.yml rename to .github/disabled/static.yml diff --git a/.github/workflows/build-goreleaser-test.yml b/.github/workflows/build-goreleaser-test.yml index 761f013d..00af95d7 100644 --- a/.github/workflows/build-goreleaser-test.yml +++ b/.github/workflows/build-goreleaser-test.yml @@ -19,7 +19,7 @@ jobs: - name: Run GoReleaser uses: goreleaser/goreleaser-action@v6 with: - version: v2.1.0 + version: '~> v2' args: build --clean --snapshot env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.github/workflows/goreleaser.yml b/.github/workflows/goreleaser.yml index dccaa4f8..81c10279 100644 --- a/.github/workflows/goreleaser.yml +++ b/.github/workflows/goreleaser.yml @@ -24,7 +24,7 @@ jobs: - name: Run GoReleaser uses: goreleaser/goreleaser-action@v6 with: - version: v2.1.0 + version: '~> v2' args: release --clean env: GITHUB_TOKEN: ${{ secrets.GITHUB_TOKEN }} diff --git a/.goreleaser.yml b/.goreleaser.yml index 230d00e2..64b64cc6 100644 --- a/.goreleaser.yml +++ b/.goreleaser.yml @@ -1,7 +1,27 @@ # Make sure to check the documentation at http://goreleaser.com version: 2 builds: - - main: ./cmd/masa-node/main.go + - main: ./cmd/masa-node + id: "masa-node" + binary: masa-node + ldflags: + - -w -s + - -X github.com/masa-finance/masa-oracle/internal.Version={{.Tag}} + - -X github.com/masa-finance/masa-oracle/internal.Commit={{.Commit}} + env: + - CGO_ENABLED=0 + goos: + - linux + - windows + - darwin + - freebsd + goarch: + - amd64 + - arm + - arm64 + - main: ./cmd/masa-node-cli + id: "masa-node-cli" + binary: masa-node-cli ldflags: - -w -s - -X github.com/masa-finance/masa-oracle/internal.Version={{.Tag}} diff --git a/DOCKER.md b/DOCKER.md index 9fc01cd1..87f54a62 100644 --- a/DOCKER.md +++ b/DOCKER.md @@ -31,8 +31,8 @@ Example `.env` file content: BOOTNODES=/ip4/35.223.224.220/udp/4001/quic-v1/p2p/16Uiu2HAmPxXXjR1XJEwckh6q1UStheMmGaGe8fyXdeRs3SejadSa RPC_URL=https://ethereum-sepolia.publicnode.com ENV=test -TWITTER_USER="your_username" -TWITTER_PASS="your_password" +TWITTER_USERNAME="your_username" +TWITTER_PASSWORD="your_password" TWITTER_2FA_CODE="your_2fa_code" TWITTER_SCRAPER=True diff --git a/Makefile b/Makefile index 117222e3..620c0158 100644 --- a/Makefile +++ b/Makefile @@ -1,8 +1,22 @@ VERSION := $(shell git describe --tags --abbrev=0) +GORELEASER?= +# check if goreleaser exists +ifeq (, $(shell which goreleaser)) + GORELEASER=curl -sfL https://goreleaser.com/static/run | bash -s -- +else + GORELEASER=$(shell which goreleaser) +endif + print-version: @echo "Version: ${VERSION}" +dev-dist: + $(GORELEASER) build --snapshot --single-target --clean + +dist: + $(GORELEASER) build --single-target --clean + build: @go build -v -ldflags "-X github.com/masa-finance/masa-oracle/internal/versioning.ApplicationVersion=${VERSION}" -o ./bin/masa-node ./cmd/masa-node @go build -v -ldflags "-X github.com/masa-finance/masa-oracle/internal/versioning.ApplicationVersion=${VERSION}" -o ./bin/masa-node-cli ./cmd/masa-node-cli diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index ff934f8f..85f7e956 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -69,7 +69,7 @@ func main() { isValidator := cfg.Validator // Create a new OracleNode - node, err := masa.NewOracleNode(ctx, isStaked) + node, err := masa.NewOracleNode(ctx, config.EnableStaked) if err != nil { logrus.Fatal(err) } @@ -78,6 +78,7 @@ func main() { logrus.Fatal(err) } + node.NodeTracker.GetAllNodeData() if cfg.TwitterScraper && cfg.DiscordScraper && cfg.WebScraper { logrus.Warn("[+] Node is set as all types of scrapers. This may not be intended behavior.") } diff --git a/docs/guides-and-tutorials/runpod.md b/docs/guides-and-tutorials/runpod.md new file mode 100644 index 00000000..57fd2258 --- /dev/null +++ b/docs/guides-and-tutorials/runpod.md @@ -0,0 +1,83 @@ +# Quickstart: Deploying Masa Oracle Node on RunPod + +## Prerequisites + +1. A RunPod account (sign up at https://www.runpod.io/) +2. Basic knowledge of Docker and command-line interfaces + +## Step 1: Use the Official Masa Finance Docker Image + +Instead of building your own Docker image, we'll use the official Masa Finance image from Docker Hub. + +## Step 2: Create a RunPod Template + +1. Log in to your RunPod account. +2. Go to "Templates" in the left sidebar. +3. Click "New Template". +4. Fill in the template details: + - Name: Masa Protocol Node + - Image: masafinance/masa-node:latest + - Container Disk: 10 GB (or as needed) + - Volume Disk: 20 GB (or as needed) + - Ports: 4001/tcp, 4001/udp, 8080/tcp + +5. In the "Docker Command" field, enter: + ```bash + /usr/bin/masa-node --bootnodes="$BOOTNODES" --env="$ENV" --validator="$VALIDATOR" --cachePath="$CACHE_PATH" + ``` + +6. Add environment variables: + - BOOTNODES + - ENV + - RPC_URL + - FILE_PATH + - VALIDATOR + - CACHE_PATH + - TWITTER_PASSwORD + - TWITTER_USERNAME + - TWITTER_2FA_CODE + - TWITTER_SCRAPER + +7. Save the template. + +## Step 3: Deploy Your Node on RunPod + +1. Go to "Pods" in the left sidebar. +2. Click "Deploy". +3. Select your "Masa Oracle Node" template. +4. Choose a GPU type (CPU-only might be sufficient for basic node operation). +5. Set the values for your environment variables. +6. Deploy the pod. + +## Step 4: Access Your Node + +1. Once deployed, go to the "Pods" page. +2. Find your Masa Oracle Node pod and click on it. +3. You can access the node's logs and terminal from this page. + +## Step 5: Verify Node Operation + +1. Use the provided terminal to check if your node is running correctly: + ```bash + docker logs masa-node + ``` + +2. Look for the startup message indicating successful connection to the network. + +## Step 6: Stake Your Node (if not already staked) + +If your node isn't staked, you can stake it using the RunPod terminal: + +```bash +docker exec masa-node /usr/bin/masa-node --stake 1000 +``` + +Replace 1000 with the amount of MASA tokens you want to stake. + +## Conclusion + +You've now successfully deployed your Masa Oracle Node on RunPod using the official Masa Finance Docker image. Monitor your node's performance and logs regularly to ensure it's operating correctly. Remember to keep your environment variables and node software up to date. + +For more detailed information on node operation and troubleshooting, refer to the main Masa Oracle documentation. + +[Source: https://hub.docker.com/repositories/masafinance] \ No newline at end of file diff --git a/docs/oracle-node/quickstart.md b/docs/oracle-node/quickstart.md index fae4d0a8..a19c8e91 100644 --- a/docs/oracle-node/quickstart.md +++ b/docs/oracle-node/quickstart.md @@ -51,8 +51,8 @@ OPENAI_API_KEY= PROMPT="You are a helpful assistant." # X -TWITTER_USER="yourusername" -TWITTER_PASS="yourpassword" +TWITTER_USERNAME="yourusername" +TWITTER_PASSWORD="yourpassword" TWITTER_2FA_CODE="your2fa" # Worker node config; default = false diff --git a/docs/worker-node/quickstart.md b/docs/worker-node/quickstart.md index 0c84d2db..08fd73c2 100644 --- a/docs/worker-node/quickstart.md +++ b/docs/worker-node/quickstart.md @@ -51,8 +51,8 @@ OPENAI_API_KEY= PROMPT="You are a helpful assistant." # X -TWITTER_USER="yourusername" -TWITTER_PASS="yourpassword" +TWITTER_USERNAME="yourusername" +TWITTER_PASSWORD="yourpassword" TWITTER_2FA_CODE="your2fa" # Worker node config; default = false diff --git a/go.mod b/go.mod index ec90d8a5..b7f31610 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,8 @@ require ( github.com/multiformats/go-multiaddr v0.13.0 github.com/multiformats/go-multihash v0.2.3 github.com/ollama/ollama v0.3.1 + github.com/onsi/ginkgo/v2 v2.16.0 + github.com/onsi/gomega v1.30.0 github.com/rivo/tview v0.0.0-20240505185119-ed116790de0f github.com/sashabaranov/go-openai v1.28.2 github.com/sirupsen/logrus v1.9.3 @@ -104,6 +106,7 @@ require ( github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect github.com/golang/protobuf v1.5.4 // indirect github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect + github.com/google/go-cmp v0.6.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect @@ -170,7 +173,6 @@ require ( github.com/multiformats/go-multicodec v0.9.0 // indirect github.com/multiformats/go-multistream v0.5.0 // indirect github.com/multiformats/go-varint v0.0.7 // indirect - github.com/onsi/ginkgo/v2 v2.16.0 // indirect github.com/opencontainers/runtime-spec v1.2.0 // indirect github.com/opentracing/opentracing-go v1.2.0 // indirect github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect diff --git a/pkg/config/app.go b/pkg/config/app.go index 2e662f85..198cd800 100644 --- a/pkg/config/app.go +++ b/pkg/config/app.go @@ -110,20 +110,25 @@ type AppConfig struct { // If the unmarshalling fails, the instance is set to nil. // // Subsequent calls to GetInstance will return the same initialized instance. -func GetInstance() *AppConfig { +func GetInstance(options ...Option) *AppConfig { + o := &AppOption{} + o.Apply(options...) + once.Do(func() { instance = &AppConfig{} instance.setDefaultConfig() instance.setEnvVariableConfig() + instance.setFileConfig(viper.GetString("FILE_PATH")) - err := instance.setCommandLineConfig() - if err != nil { - logrus.Fatal(err) + + if !o.DisableCLIParse { + if err := instance.setCommandLineConfig(); err != nil { + logrus.Fatal(err) + } } - err = viper.Unmarshal(instance) - if err != nil { + if err := viper.Unmarshal(instance); err != nil { logrus.Errorf("[-] Unable to unmarshal config into struct, %v", err) instance = nil // Ensure instance is nil if unmarshalling fails } @@ -260,5 +265,9 @@ func (c *AppConfig) LogConfig() { // It returns true if there is at least one bootnode in the Bootnodes slice and it is not an empty string. // Otherwise, it returns false, indicating that no bootnodes are configured. func (c *AppConfig) HasBootnodes() bool { + if len(c.Bootnodes) == 0 { + return false + } + return c.Bootnodes[0] != "" } diff --git a/pkg/config/options.go b/pkg/config/options.go new file mode 100644 index 00000000..7dc14456 --- /dev/null +++ b/pkg/config/options.go @@ -0,0 +1,34 @@ +package config + +type AppOption struct { + DisableCLIParse bool + IsStaked bool + Bootnodes []string + RandomIdentity bool +} + +type Option func(*AppOption) + +var DisableCLIParse = func(o *AppOption) { + o.DisableCLIParse = true +} + +var EnableStaked = func(o *AppOption) { + o.IsStaked = true +} + +var EnableRandomIdentity = func(o *AppOption) { + o.RandomIdentity = true +} + +func (a *AppOption) Apply(opts ...Option) { + for _, opt := range opts { + opt(a) + } +} + +func WithBootNodes(bootnodes ...string) Option { + return func(o *AppOption) { + o.Bootnodes = append(o.Bootnodes, bootnodes...) + } +} diff --git a/pkg/network/kdht.go b/pkg/network/kdht.go index 5bb8a78e..1222e4d4 100644 --- a/pkg/network/kdht.go +++ b/pkg/network/kdht.go @@ -29,8 +29,9 @@ type dbValidator struct{} func (dbValidator) Validate(_ string, _ []byte) error { return nil } func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil } -func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, - protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool) (*dht.IpfsDHT, error) { +func WithDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr, + protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool, publicHexKey string) (*dht.IpfsDHT, error) { + options := make([]dht.Option, 0) options = append(options, dht.BucketSize(100)) // Adjust bucket size options = append(options, dht.Concurrency(100)) // Increase concurrency @@ -128,7 +129,7 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul return } multaddrString := GetPriorityAddress(multiaddr) - _, err = stream.Write(pubsub.GetSelfNodeDataJson(host, isStaked, multaddrString.String())) + _, err = stream.Write(pubsub.GetSelfNodeDataJson(host, isStaked, multaddrString.String(), publicHexKey)) if err != nil { logrus.Errorf("[-] Error writing to stream: %s", err) return diff --git a/pkg/oracle_node.go b/pkg/oracle_node.go index 1089c56f..f1833d1f 100644 --- a/pkg/oracle_node.go +++ b/pkg/oracle_node.go @@ -3,21 +3,23 @@ package masa import ( "bytes" "context" - "crypto/ecdsa" "encoding/base64" "encoding/json" "fmt" - "net" "os" "reflect" "strings" "sync" "time" + ethereumCrypto "github.com/ethereum/go-ethereum/crypto" + + "github.com/ethereum/go-ethereum/common" "github.com/libp2p/go-libp2p" dht "github.com/libp2p/go-libp2p-kad-dht" "github.com/libp2p/go-libp2p/core/host" "github.com/libp2p/go-libp2p/core/network" + "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager" "github.com/libp2p/go-libp2p/p2p/muxer/yamux" @@ -41,7 +43,6 @@ import ( type OracleNode struct { Host host.Host - PrivKey *ecdsa.PrivateKey Protocol protocol.ID priorityAddrs multiaddr.Multiaddr multiAddrs []multiaddr.Multiaddr @@ -62,6 +63,7 @@ type OracleNode struct { WorkerTracker *pubsub2.WorkerEventTracker BlockTracker *BlockEventTracker Blockchain *chain.Chain + options config.AppOption } // GetMultiAddrs returns the priority multiaddr for this node. @@ -76,30 +78,26 @@ func (node *OracleNode) GetMultiAddrs() multiaddr.Multiaddr { return node.priorityAddrs } -// getOutboundIP is a function that returns the outbound IP address of the current machine as a string. -func getOutboundIP() string { - conn, err := net.Dial("udp", "8.8.8.8:80") - if err != nil { - fmt.Println("[-] Error getting outbound IP") - return "" +// GetP2PMultiAddrs returns the multiaddresses for the host in P2P format. +func (node *OracleNode) GetP2PMultiAddrs() ([]multiaddr.Multiaddr, error) { + addrs := node.Host.Addrs() + pi := peer.AddrInfo{ + ID: node.Host.ID(), + Addrs: addrs, } - defer func(conn net.Conn) { - err := conn.Close() - if err != nil { - } - }(conn) - localAddr := conn.LocalAddr().String() - idx := strings.LastIndex(localAddr, ":") - return localAddr[0:idx] + return peer.AddrInfoToP2pAddrs(&pi) } // NewOracleNode creates a new OracleNode instance with the provided context and // staking status. It initializes the libp2p host, DHT, pubsub manager, and other // components needed for an Oracle node to join the network and participate. -func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { +func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, error) { + o := &config.AppOption{} + o.Apply(opts...) + // Start with the default scaling limits. - cfg := config.GetInstance() + cfg := config.GetInstance(opts...) scalingLimits := rcmgr.DefaultLimits concreteLimits := scalingLimits.AutoScale() limiter := rcmgr.NewFixedLimiter(concreteLimits) @@ -111,7 +109,6 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { var addrStr []string libp2pOptions := []libp2p.Option{ - libp2p.Identity(masacrypto.KeyManagerInstance().Libp2pPrivKey), libp2p.ResourceManager(resourceManager), libp2p.Ping(false), // disable built-in ping libp2p.EnableNATService(), @@ -119,6 +116,12 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { libp2p.EnableRelay(), // Enable Circuit Relay v2 with hop } + if o.RandomIdentity { + libp2pOptions = append(libp2pOptions, libp2p.RandomIdentity) + } else { + libp2pOptions = append(libp2pOptions, libp2p.Identity(masacrypto.KeyManagerInstance().Libp2pPrivKey)) + } + securityOptions := []libp2p.Option{ libp2p.Security(noise.ID, noise.New), } @@ -151,14 +154,13 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { return &OracleNode{ Host: hst, - PrivKey: masacrypto.KeyManagerInstance().EcdsaPrivKey, Protocol: config.ProtocolWithVersion(config.OracleProtocol), multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst), Context: ctx, PeerChan: make(chan myNetwork.PeerEvent), NodeTracker: pubsub2.NewNodeEventTracker(versioning.ProtocolVersion, cfg.Environment, hst.ID().String()), PubSubManager: subscriptionManager, - IsStaked: isStaked, + IsStaked: o.IsStaked, IsValidator: cfg.Validator, IsTwitterScraper: cfg.TwitterScraper, IsDiscordScraper: cfg.DiscordScraper, @@ -166,9 +168,24 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { IsWebScraper: cfg.WebScraper, IsLlmServer: cfg.LlmServer, Blockchain: &chain.Chain{}, + options: *o, }, nil } +func (node *OracleNode) generateEthHexKeyForRandomIdentity() (string, error) { + // If it's a random identity, get the pubkey from Libp2p + // and convert these to Ethereum public Hex types + pubkey, err := node.Host.ID().ExtractPublicKey() + if err != nil { + return "", fmt.Errorf("failed to extract public key from p2p identity: %w", err) + } + rawKey, err := pubkey.Raw() + if err != nil { + return "", fmt.Errorf("failed to extract public key from p2p identity: %w", err) + } + return common.BytesToAddress(ethereumCrypto.Keccak256(rawKey[1:])[12:]).Hex(), nil +} + // Start initializes the OracleNode by setting up libp2p stream handlers, // connecting to the DHT and bootnodes, and subscribing to topics. It launches // goroutines to handle discovered peers, listen to the node tracker, and @@ -176,7 +193,7 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) { func (node *OracleNode) Start() (err error) { logrus.Infof("[+] Starting node with ID: %s", node.GetMultiAddrs().String()) - bootNodeAddrs, err := myNetwork.GetBootNodesMultiAddress(config.GetInstance().Bootnodes) + bootNodeAddrs, err := myNetwork.GetBootNodesMultiAddress(append(config.GetInstance().Bootnodes, node.options.Bootnodes...)) if err != nil { return err } @@ -192,7 +209,15 @@ func (node *OracleNode) Start() (err error) { go node.ListenToNodeTracker() go node.handleDiscoveredPeers() - node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked) + var publicKeyHex string + if node.options.RandomIdentity { + publicKeyHex, _ = node.generateEthHexKeyForRandomIdentity() + } else { + publicKeyHex = masacrypto.KeyManagerInstance().EthAddress + } + + node.DHT, err = myNetwork.WithDHT( + node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked, publicKeyHex) if err != nil { return err } @@ -205,7 +230,6 @@ func (node *OracleNode) Start() (err error) { nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String()) if nodeData == nil { - publicKeyHex := masacrypto.KeyManagerInstance().EthAddress ma := myNetwork.GetMultiAddressesForHostQuiet(node.Host) nodeData = pubsub2.NewNodeData(ma[0], node.Host.ID(), publicKeyHex, pubsub2.ActivityJoined) nodeData.IsStaked = node.IsStaked diff --git a/pkg/pubsub/manager.go b/pkg/pubsub/manager.go index 59d1c4eb..9b704b23 100644 --- a/pkg/pubsub/manager.go +++ b/pkg/pubsub/manager.go @@ -9,8 +9,6 @@ import ( pubsub "github.com/libp2p/go-libp2p-pubsub" "github.com/libp2p/go-libp2p/core/host" "github.com/sirupsen/logrus" - - "github.com/masa-finance/masa-oracle/pkg/masacrypto" ) // SubscriptionHandler defines the interface for handling pubsub messages. @@ -20,13 +18,12 @@ type SubscriptionHandler interface { } type Manager struct { - ctx context.Context - topics map[string]*pubsub.Topic - subscriptions map[string]*pubsub.Subscription - handlers map[string]SubscriptionHandler - gossipSub *pubsub.PubSub - host host.Host - PublicKeyPublisher *PublicKeyPublisher // Add this line + ctx context.Context + topics map[string]*pubsub.Topic + subscriptions map[string]*pubsub.Subscription + handlers map[string]SubscriptionHandler + gossipSub *pubsub.PubSub + host host.Host } // NewPubSubManager creates a new PubSubManager instance. @@ -40,17 +37,14 @@ func NewPubSubManager(ctx context.Context, host host.Host) (*Manager, error) { / return nil, err } manager := &Manager{ - ctx: ctx, - subscriptions: make(map[string]*pubsub.Subscription), - topics: make(map[string]*pubsub.Topic), - handlers: make(map[string]SubscriptionHandler), - gossipSub: gossipSub, - host: host, - PublicKeyPublisher: NewPublicKeyPublisher(nil, masacrypto.KeyManagerInstance().Libp2pPubKey), // Initialize PublicKeyPublisher here + ctx: ctx, + subscriptions: make(map[string]*pubsub.Subscription), + topics: make(map[string]*pubsub.Topic), + handlers: make(map[string]SubscriptionHandler), + gossipSub: gossipSub, + host: host, } - manager.PublicKeyPublisher.pubSubManager = manager // Ensure the publisher has a reference back to the manager - return manager, nil } diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index 65ae139d..9917ca39 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -12,8 +12,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/multiformats/go-multiaddr" "github.com/sirupsen/logrus" - - "github.com/masa-finance/masa-oracle/pkg/masacrypto" ) const ( @@ -259,13 +257,13 @@ func (n *NodeData) UpdateAccumulatedUptime() { // It populates a NodeData struct with the node's ID, staking status, and Ethereum address. // The NodeData struct is then marshalled into a JSON byte array. // Returns nil if there is an error marshalling to JSON. -func GetSelfNodeDataJson(host host.Host, isStaked bool, multiaddrString string) []byte { +func GetSelfNodeDataJson(host host.Host, isStaked bool, multiaddrString, publicEthAddress string) []byte { // Create and populate NodeData nodeData := NodeData{ PeerId: host.ID(), MultiaddrsString: multiaddrString, IsStaked: isStaked, - EthAddress: masacrypto.KeyManagerInstance().EthAddress, + EthAddress: publicEthAddress, IsTwitterScraper: config.GetInstance().TwitterScraper, IsDiscordScraper: config.GetInstance().DiscordScraper, IsTelegramScraper: config.GetInstance().TelegramScraper, diff --git a/pkg/pubsub/public_key_topic.go b/pkg/pubsub/public_key_topic.go index 0f4d7c4a..b20b4757 100644 --- a/pkg/pubsub/public_key_topic.go +++ b/pkg/pubsub/public_key_topic.go @@ -3,7 +3,6 @@ package pubsub import ( "encoding/hex" "encoding/json" - "errors" pubsub "github.com/libp2p/go-libp2p-pubsub" libp2pCrypto "github.com/libp2p/go-libp2p/core/crypto" @@ -11,21 +10,12 @@ import ( "github.com/sirupsen/logrus" ) -// topicPublicKeyMap maps topics to their associated public keys. -var topicPublicKeyMap = make(map[string]string) - // PublicKeySubscriptionHandler handles incoming messages on public key topics. type PublicKeySubscriptionHandler struct { PublicKeys []PublicKeyMessage PubKeyTopic *pubsub.Topic } -type PublicKeyPublisher struct { - pubSubManager *Manager - pubKey libp2pCrypto.PubKey - publishedMessages []PublicKeyMessage -} - // PublicKeyMessage represents the structure of the public key messages. type PublicKeyMessage struct { PublicKey string `json:"publicKey"` @@ -33,96 +23,6 @@ type PublicKeyMessage struct { Data string `json:"data"` } -// NewPublicKeyPublisher creates a new instance of PublicKeyPublisher. -func NewPublicKeyPublisher(manager *Manager, pubKey libp2pCrypto.PubKey) *PublicKeyPublisher { - return &PublicKeyPublisher{ - pubSubManager: manager, - pubKey: pubKey, - } -} - -// PublishNodePublicKey publishes the node's public key to the designated topic. -func (p *PublicKeyPublisher) PublishNodePublicKey(publicKey string, data, signature []byte) error { - topicName := "bootNodePublicKey" - logrus.Infof("[+] Publishing node's public key to topic: %s", topicName) - - // Ensure the topic exists or create it - _, err := p.ensureTopic(topicName) - if err != nil { - logrus.WithError(err).Errorf("[-] Failed to ensure topic '%s' exists", topicName) - return err - } - - // Check if a public key has already been published to the topic - existingPubKey, exists := topicPublicKeyMap[topicName] - - if exists { - logrus.Infof("[+] Public key already published for topic: %s. Verifying signature.", topicName) - // If a public key exists, verify the signature against the existing public key - pubKeyBytes, err := hex.DecodeString(existingPubKey) - if err != nil { - logrus.WithError(err).Error("[-] Failed to decode existing public key for verification") - return err - } - pubKey, err := libp2pCrypto.UnmarshalPublicKey(pubKeyBytes) - if err != nil { - logrus.WithError(err).Error("[-] Failed to unmarshal existing public key for verification") - return err - } - isValid, err := pubKey.Verify(data, signature) - if err != nil || !isValid { - logrus.WithError(err).Error("[-] Unauthorized: Failed signature verification or signature is invalid") - return errors.New("unauthorized: only the owner of the public key can publish changes") - } - logrus.Infof("[+] Signature verified successfully for topic: %s", topicName) - } else { - logrus.Infof("[+] No existing public key for topic: %s. Proceeding with initial publication.", topicName) - // If no public key is associated with the topic, this is the initial publication - topicPublicKeyMap[topicName] = publicKey - } - - // Serialize the public key message - msg := PublicKeyMessage{ - PublicKey: publicKey, - Signature: hex.EncodeToString(signature), - Data: string(data), - } - msgBytes, err := json.Marshal(msg) - if err != nil { - logrus.WithError(err).Error("[-] Failed to marshal public key message") - return errors.New("failed to marshal message") - } - - // Use the existing Manager to publish the message - logrus.Infof("[+] Publishing serialized message to topic: %s", topicName) - if err := p.pubSubManager.Publish(topicName, msgBytes); err != nil { - return err - } - // Store the published message in the slice - p.publishedMessages = append(p.publishedMessages, msg) - logrus.Infof("[+] Stored published message for topic: %s", topicName) - - // Print the published data in the console - logrus.Infof("[+] Published data: PublicKey: %s, Signature: %s, Data: %s", msg.PublicKey, msg.Signature, msg.Data) - return nil -} - -// ensureTopic checks if a topic exists and creates it if not. -func (p *PublicKeyPublisher) ensureTopic(topicName string) (*pubsub.Topic, error) { - // Check if the topic already exists - if topic, exists := p.pubSubManager.topics[topicName]; exists { - return topic, nil - } - - // If the topic does not exist, attempt to create it - topic, err := p.pubSubManager.createTopic(topicName) - if err != nil { - return nil, err - } - - return topic, nil -} - // HandleMessage handles incoming public key messages, with verification and update logic. func (handler *PublicKeySubscriptionHandler) HandleMessage(m *pubsub.Message) { logrus.Info("[+] Handling incoming public key message") @@ -180,8 +80,3 @@ func (handler *PublicKeySubscriptionHandler) GetPublicKeys() []PublicKeyMessage logrus.Info("[+] Retrieving stored public keys") return handler.PublicKeys } - -// Optionally, add a method to retrieve the stored messages -func (p *PublicKeyPublisher) GetPublishedMessages() []PublicKeyMessage { - return p.publishedMessages -} diff --git a/pkg/tests/integration/oracle_suite_test.go b/pkg/tests/integration/oracle_suite_test.go new file mode 100644 index 00000000..b2e3566c --- /dev/null +++ b/pkg/tests/integration/oracle_suite_test.go @@ -0,0 +1,13 @@ +package masa_test + +import ( + "testing" + + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +func TestOracle(t *testing.T) { + RegisterFailHandler(Fail) + RunSpecs(t, "Oracle integration test suite") +} diff --git a/pkg/tests/integration/tracker_test.go b/pkg/tests/integration/tracker_test.go new file mode 100644 index 00000000..c45e6b13 --- /dev/null +++ b/pkg/tests/integration/tracker_test.go @@ -0,0 +1,77 @@ +// Nodetracker integration test +package masa_test + +import ( + "context" + "fmt" + + . "github.com/masa-finance/masa-oracle/pkg" + "github.com/masa-finance/masa-oracle/pkg/config" + . "github.com/onsi/ginkgo/v2" + . "github.com/onsi/gomega" +) + +var _ = Describe("Oracle integration tests", func() { + Context("NodeData distribution", func() { + It("is distributed across two nodes", func() { + ctx := context.Background() + ctx, cancel := context.WithCancel(ctx) + defer cancel() + + n, err := NewOracleNode( + ctx, + config.EnableStaked, + config.DisableCLIParse, + config.EnableRandomIdentity, + ) + Expect(err).ToNot(HaveOccurred()) + + err = n.Start() + Expect(err).ToNot(HaveOccurred()) + + addrs, err := n.GetP2PMultiAddrs() + Expect(err).ToNot(HaveOccurred()) + + var bootNodes []string + for _, addr := range addrs { + bootNodes = append(bootNodes, addr.String()) + } + + By(fmt.Sprintf("Generating second node with bootnodes %+v", bootNodes)) + n2, err := NewOracleNode(ctx, + config.EnableStaked, + config.DisableCLIParse, + config.WithBootNodes(bootNodes...), + config.EnableRandomIdentity, + ) + Expect(err).ToNot(HaveOccurred()) + Expect(n.Host.ID()).ToNot(Equal(n2.Host.ID())) + + err = n2.Start() + Expect(err).ToNot(HaveOccurred()) + + // Wait for the nodes to see each others in their respective + // nodeTracker + Eventually(func() bool { + datas := n2.NodeTracker.GetAllNodeData() + return len(datas) == 2 + }, "30s").Should(BeTrue()) + + Eventually(func() bool { + datas := n.NodeTracker.GetAllNodeData() + return len(datas) == 2 + }, "30s").Should(BeTrue()) + + data := n.NodeTracker.GetAllNodeData() + + peerIds := []string{} + for _, d := range data { + peerIds = append(peerIds, d.PeerId.String()) + } + + Expect(peerIds).To(ContainElement(n.Host.ID().String())) + Expect(peerIds).To(ContainElement(n2.Host.ID().String())) + + }) + }) +})