Skip to content

Commit

Permalink
Merge branch 'main' into fix-multiaddress-not-constructed-on-startup
Browse files Browse the repository at this point in the history
  • Loading branch information
restevens402 committed Aug 23, 2024
2 parents b49a1e8 + e8e1822 commit aebdd6e
Show file tree
Hide file tree
Showing 11 changed files with 196 additions and 155 deletions.
File renamed without changes.
3 changes: 2 additions & 1 deletion cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ func main() {
isValidator := cfg.Validator

// Create a new OracleNode
node, err := masa.NewOracleNode(ctx, isStaked)
node, err := masa.NewOracleNode(ctx, config.EnableStaked)
if err != nil {
logrus.Fatal(err)
}
Expand All @@ -80,6 +80,7 @@ func main() {
logrus.Fatal(err)
}

node.NodeTracker.GetAllNodeData()
if cfg.TwitterScraper && cfg.DiscordScraper && cfg.WebScraper {
logrus.Warn("[+] Node is set as all types of scrapers. This may not be intended behavior.")
}
Expand Down
4 changes: 3 additions & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,8 @@ require (
github.com/multiformats/go-multiaddr v0.13.0
github.com/multiformats/go-multihash v0.2.3
github.com/ollama/ollama v0.3.1
github.com/onsi/ginkgo/v2 v2.16.0
github.com/onsi/gomega v1.30.0
github.com/rivo/tview v0.0.0-20240505185119-ed116790de0f
github.com/sashabaranov/go-openai v1.28.2
github.com/sirupsen/logrus v1.9.3
Expand Down Expand Up @@ -104,6 +106,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/golang/protobuf v1.5.4 // indirect
github.com/golang/snappy v0.0.5-0.20220116011046-fa5810519dcb // indirect
github.com/google/go-cmp v0.6.0 // indirect
github.com/google/gopacket v1.1.19 // indirect
github.com/google/pprof v0.0.0-20240207164012-fb44976bdcd5 // indirect
github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect
Expand Down Expand Up @@ -170,7 +173,6 @@ require (
github.com/multiformats/go-multicodec v0.9.0 // indirect
github.com/multiformats/go-multistream v0.5.0 // indirect
github.com/multiformats/go-varint v0.0.7 // indirect
github.com/onsi/ginkgo/v2 v2.16.0 // indirect
github.com/opencontainers/runtime-spec v1.2.0 // indirect
github.com/opentracing/opentracing-go v1.2.0 // indirect
github.com/pbnjay/memory v0.0.0-20210728143218-7b4eea64cf58 // indirect
Expand Down
21 changes: 15 additions & 6 deletions pkg/config/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,25 @@ type AppConfig struct {
// If the unmarshalling fails, the instance is set to nil.
//
// Subsequent calls to GetInstance will return the same initialized instance.
func GetInstance() *AppConfig {
func GetInstance(options ...Option) *AppConfig {
o := &AppOption{}
o.Apply(options...)

once.Do(func() {
instance = &AppConfig{}

instance.setDefaultConfig()
instance.setEnvVariableConfig()

instance.setFileConfig(viper.GetString("FILE_PATH"))
err := instance.setCommandLineConfig()
if err != nil {
logrus.Fatal(err)

if !o.DisableCLIParse {
if err := instance.setCommandLineConfig(); err != nil {
logrus.Fatal(err)
}
}

err = viper.Unmarshal(instance)
if err != nil {
if err := viper.Unmarshal(instance); err != nil {
logrus.Errorf("[-] Unable to unmarshal config into struct, %v", err)
instance = nil // Ensure instance is nil if unmarshalling fails
}
Expand Down Expand Up @@ -260,5 +265,9 @@ func (c *AppConfig) LogConfig() {
// It returns true if there is at least one bootnode in the Bootnodes slice and it is not an empty string.
// Otherwise, it returns false, indicating that no bootnodes are configured.
func (c *AppConfig) HasBootnodes() bool {
if len(c.Bootnodes) == 0 {
return false
}

return c.Bootnodes[0] != ""
}
34 changes: 34 additions & 0 deletions pkg/config/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package config

type AppOption struct {
DisableCLIParse bool
IsStaked bool
Bootnodes []string
RandomIdentity bool
}

type Option func(*AppOption)

var DisableCLIParse = func(o *AppOption) {
o.DisableCLIParse = true
}

var EnableStaked = func(o *AppOption) {
o.IsStaked = true
}

var EnableRandomIdentity = func(o *AppOption) {
o.RandomIdentity = true
}

func (a *AppOption) Apply(opts ...Option) {
for _, opt := range opts {
opt(a)
}
}

func WithBootNodes(bootnodes ...string) Option {
return func(o *AppOption) {
o.Bootnodes = append(o.Bootnodes, bootnodes...)
}
}
61 changes: 39 additions & 22 deletions pkg/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ package masa
import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/base64"
"encoding/json"
"fmt"
"net"
"os"
"reflect"
"strings"
"sync"
"time"

ethereumCrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
Expand All @@ -41,7 +43,6 @@ import (

type OracleNode struct {
Host host.Host
PrivKey *ecdsa.PrivateKey
Protocol protocol.ID
priorityAddrs multiaddr.Multiaddr
multiAddrs []multiaddr.Multiaddr
Expand All @@ -62,6 +63,7 @@ type OracleNode struct {
WorkerTracker *pubsub2.WorkerEventTracker
BlockTracker *BlockEventTracker
Blockchain *chain.Chain
options config.AppOption
}

// GetMultiAddrs returns the priority multiaddr for this node.
Expand All @@ -76,30 +78,26 @@ func (node *OracleNode) GetMultiAddrs() multiaddr.Multiaddr {
return node.priorityAddrs
}

// getOutboundIP is a function that returns the outbound IP address of the current machine as a string.
func getOutboundIP() string {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
fmt.Println("[-] Error getting outbound IP")
return ""
// GetP2PMultiAddrs returns the multiaddresses for the host in P2P format.
func (node *OracleNode) GetP2PMultiAddrs() ([]multiaddr.Multiaddr, error) {
addrs := node.Host.Addrs()
pi := peer.AddrInfo{
ID: node.Host.ID(),
Addrs: addrs,
}
defer func(conn net.Conn) {
err := conn.Close()
if err != nil {

}
}(conn)
localAddr := conn.LocalAddr().String()
idx := strings.LastIndex(localAddr, ":")
return localAddr[0:idx]
return peer.AddrInfoToP2pAddrs(&pi)
}

// NewOracleNode creates a new OracleNode instance with the provided context and
// staking status. It initializes the libp2p host, DHT, pubsub manager, and other
// components needed for an Oracle node to join the network and participate.
func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {
func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, error) {
o := &config.AppOption{}
o.Apply(opts...)

// Start with the default scaling limits.
cfg := config.GetInstance()
cfg := config.GetInstance(opts...)
scalingLimits := rcmgr.DefaultLimits
concreteLimits := scalingLimits.AutoScale()
limiter := rcmgr.NewFixedLimiter(concreteLimits)
Expand All @@ -111,14 +109,19 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {

var addrStr []string
libp2pOptions := []libp2p.Option{
libp2p.Identity(masacrypto.KeyManagerInstance().Libp2pPrivKey),
libp2p.ResourceManager(resourceManager),
libp2p.Ping(false), // disable built-in ping
libp2p.EnableNATService(),
libp2p.NATPortMap(),
libp2p.EnableRelay(), // Enable Circuit Relay v2 with hop
}

if o.RandomIdentity {
libp2pOptions = append(libp2pOptions, libp2p.RandomIdentity)
} else {
libp2pOptions = append(libp2pOptions, libp2p.Identity(masacrypto.KeyManagerInstance().Libp2pPrivKey))
}

securityOptions := []libp2p.Option{
libp2p.Security(noise.ID, noise.New),
}
Expand Down Expand Up @@ -151,24 +154,38 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {

return &OracleNode{
Host: hst,
PrivKey: masacrypto.KeyManagerInstance().EcdsaPrivKey,
Protocol: config.ProtocolWithVersion(config.OracleProtocol),
multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst),
Context: ctx,
PeerChan: make(chan myNetwork.PeerEvent),
NodeTracker: pubsub2.NewNodeEventTracker(versioning.ProtocolVersion, cfg.Environment, hst.ID().String()),
PubSubManager: subscriptionManager,
IsStaked: isStaked,
IsStaked: o.IsStaked,
IsValidator: cfg.Validator,
IsTwitterScraper: cfg.TwitterScraper,
IsDiscordScraper: cfg.DiscordScraper,
IsTelegramScraper: cfg.TelegramScraper,
IsWebScraper: cfg.WebScraper,
IsLlmServer: cfg.LlmServer,
Blockchain: &chain.Chain{},
options: *o,
}, nil
}

func (node *OracleNode) generateEthHexKeyForRandomIdentity() (string, error) {
// If it's a random identity, get the pubkey from Libp2p
// and convert these to Ethereum public Hex types
pubkey, err := node.Host.ID().ExtractPublicKey()
if err != nil {
return "", fmt.Errorf("failed to extract public key from p2p identity: %w", err)
}
rawKey, err := pubkey.Raw()
if err != nil {
return "", fmt.Errorf("failed to extract public key from p2p identity: %w", err)
}
return common.BytesToAddress(ethereumCrypto.Keccak256(rawKey[1:])[12:]).Hex(), nil
}

// Start initializes the OracleNode by setting up libp2p stream handlers,
// connecting to the DHT and bootnodes, and subscribing to topics. It launches
// goroutines to handle discovered peers, listen to the node tracker, and
Expand Down
30 changes: 12 additions & 18 deletions pkg/pubsub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/sirupsen/logrus"

"github.com/masa-finance/masa-oracle/pkg/masacrypto"
)

// SubscriptionHandler defines the interface for handling pubsub messages.
Expand All @@ -20,13 +18,12 @@ type SubscriptionHandler interface {
}

type Manager struct {
ctx context.Context
topics map[string]*pubsub.Topic
subscriptions map[string]*pubsub.Subscription
handlers map[string]SubscriptionHandler
gossipSub *pubsub.PubSub
host host.Host
PublicKeyPublisher *PublicKeyPublisher // Add this line
ctx context.Context
topics map[string]*pubsub.Topic
subscriptions map[string]*pubsub.Subscription
handlers map[string]SubscriptionHandler
gossipSub *pubsub.PubSub
host host.Host
}

// NewPubSubManager creates a new PubSubManager instance.
Expand All @@ -40,17 +37,14 @@ func NewPubSubManager(ctx context.Context, host host.Host) (*Manager, error) { /
return nil, err
}
manager := &Manager{
ctx: ctx,
subscriptions: make(map[string]*pubsub.Subscription),
topics: make(map[string]*pubsub.Topic),
handlers: make(map[string]SubscriptionHandler),
gossipSub: gossipSub,
host: host,
PublicKeyPublisher: NewPublicKeyPublisher(nil, masacrypto.KeyManagerInstance().Libp2pPubKey), // Initialize PublicKeyPublisher here
ctx: ctx,
subscriptions: make(map[string]*pubsub.Subscription),
topics: make(map[string]*pubsub.Topic),
handlers: make(map[string]SubscriptionHandler),
gossipSub: gossipSub,
host: host,
}

manager.PublicKeyPublisher.pubSubManager = manager // Ensure the publisher has a reference back to the manager

return manager, nil
}

Expand Down
3 changes: 1 addition & 2 deletions pkg/pubsub/node_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,13 +7,12 @@ import (

"github.com/masa-finance/masa-oracle/internal/versioning"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/masacrypto"

"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"

"github.com/masa-finance/masa-oracle/pkg/masacrypto"
)

const (
Expand Down
Loading

0 comments on commit aebdd6e

Please sign in to comment.