Skip to content

Commit

Permalink
Merge branch 'main' into embed_contracts
Browse files Browse the repository at this point in the history
  • Loading branch information
mudler authored Aug 23, 2024
2 parents ec4cac2 + e8e1822 commit 131a621
Show file tree
Hide file tree
Showing 11 changed files with 208 additions and 162 deletions.
File renamed without changes.
3 changes: 2 additions & 1 deletion cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func main() {
isValidator := cfg.Validator

// Create a new OracleNode
node, err := masa.NewOracleNode(ctx, isStaked)
node, err := masa.NewOracleNode(ctx, config.EnableStaked)
if err != nil {
logrus.Fatal(err)
}
Expand All @@ -78,6 +78,7 @@ func main() {
logrus.Fatal(err)
}

node.NodeTracker.GetAllNodeData()
if cfg.TwitterScraper && cfg.DiscordScraper && cfg.WebScraper {
logrus.Warn("[+] Node is set as all types of scrapers. This may not be intended behavior.")
}
Expand Down
21 changes: 15 additions & 6 deletions pkg/config/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,20 +110,25 @@ type AppConfig struct {
// If the unmarshalling fails, the instance is set to nil.
//
// Subsequent calls to GetInstance will return the same initialized instance.
func GetInstance() *AppConfig {
func GetInstance(options ...Option) *AppConfig {
o := &AppOption{}
o.Apply(options...)

once.Do(func() {
instance = &AppConfig{}

instance.setDefaultConfig()
instance.setEnvVariableConfig()

instance.setFileConfig(viper.GetString("FILE_PATH"))
err := instance.setCommandLineConfig()
if err != nil {
logrus.Fatal(err)

if !o.DisableCLIParse {
if err := instance.setCommandLineConfig(); err != nil {
logrus.Fatal(err)
}
}

err = viper.Unmarshal(instance)
if err != nil {
if err := viper.Unmarshal(instance); err != nil {
logrus.Errorf("[-] Unable to unmarshal config into struct, %v", err)
instance = nil // Ensure instance is nil if unmarshalling fails
}
Expand Down Expand Up @@ -260,5 +265,9 @@ func (c *AppConfig) LogConfig() {
// It returns true if there is at least one bootnode in the Bootnodes slice and it is not an empty string.
// Otherwise, it returns false, indicating that no bootnodes are configured.
func (c *AppConfig) HasBootnodes() bool {
if len(c.Bootnodes) == 0 {
return false
}

return c.Bootnodes[0] != ""
}
34 changes: 34 additions & 0 deletions pkg/config/options.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
package config

type AppOption struct {
DisableCLIParse bool
IsStaked bool
Bootnodes []string
RandomIdentity bool
}

type Option func(*AppOption)

var DisableCLIParse = func(o *AppOption) {
o.DisableCLIParse = true
}

var EnableStaked = func(o *AppOption) {
o.IsStaked = true
}

var EnableRandomIdentity = func(o *AppOption) {
o.RandomIdentity = true
}

func (a *AppOption) Apply(opts ...Option) {
for _, opt := range opts {
opt(a)
}
}

func WithBootNodes(bootnodes ...string) Option {
return func(o *AppOption) {
o.Bootnodes = append(o.Bootnodes, bootnodes...)
}
}
7 changes: 4 additions & 3 deletions pkg/network/kdht.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,8 +29,9 @@ type dbValidator struct{}
func (dbValidator) Validate(_ string, _ []byte) error { return nil }
func (dbValidator) Select(_ string, _ [][]byte) (int, error) { return 0, nil }

func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr,
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool) (*dht.IpfsDHT, error) {
func WithDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Multiaddr,
protocolId, prefix protocol.ID, peerChan chan PeerEvent, isStaked bool, publicHexKey string) (*dht.IpfsDHT, error) {

options := make([]dht.Option, 0)
options = append(options, dht.BucketSize(100)) // Adjust bucket size
options = append(options, dht.Concurrency(100)) // Increase concurrency
Expand Down Expand Up @@ -128,7 +129,7 @@ func WithDht(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.Mul
return
}
multaddrString := GetPriorityAddress(multiaddr)
_, err = stream.Write(pubsub.GetSelfNodeDataJson(host, isStaked, multaddrString.String()))
_, err = stream.Write(pubsub.GetSelfNodeDataJson(host, isStaked, multaddrString.String(), publicHexKey))
if err != nil {
logrus.Errorf("[-] Error writing to stream: %s", err)
return
Expand Down
74 changes: 49 additions & 25 deletions pkg/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,23 @@ package masa
import (
"bytes"
"context"
"crypto/ecdsa"
"encoding/base64"
"encoding/json"
"fmt"
"net"
"os"
"reflect"
"strings"
"sync"
"time"

ethereumCrypto "github.com/ethereum/go-ethereum/crypto"

"github.com/ethereum/go-ethereum/common"
"github.com/libp2p/go-libp2p"
dht "github.com/libp2p/go-libp2p-kad-dht"
"github.com/libp2p/go-libp2p/core/host"
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
rcmgr "github.com/libp2p/go-libp2p/p2p/host/resource-manager"
"github.com/libp2p/go-libp2p/p2p/muxer/yamux"
Expand All @@ -41,7 +43,6 @@ import (

type OracleNode struct {
Host host.Host
PrivKey *ecdsa.PrivateKey
Protocol protocol.ID
priorityAddrs multiaddr.Multiaddr
multiAddrs []multiaddr.Multiaddr
Expand All @@ -62,6 +63,7 @@ type OracleNode struct {
WorkerTracker *pubsub2.WorkerEventTracker
BlockTracker *BlockEventTracker
Blockchain *chain.Chain
options config.AppOption
}

// GetMultiAddrs returns the priority multiaddr for this node.
Expand All @@ -76,30 +78,26 @@ func (node *OracleNode) GetMultiAddrs() multiaddr.Multiaddr {
return node.priorityAddrs
}

// getOutboundIP is a function that returns the outbound IP address of the current machine as a string.
func getOutboundIP() string {
conn, err := net.Dial("udp", "8.8.8.8:80")
if err != nil {
fmt.Println("[-] Error getting outbound IP")
return ""
// GetP2PMultiAddrs returns the multiaddresses for the host in P2P format.
func (node *OracleNode) GetP2PMultiAddrs() ([]multiaddr.Multiaddr, error) {
addrs := node.Host.Addrs()
pi := peer.AddrInfo{
ID: node.Host.ID(),
Addrs: addrs,
}
defer func(conn net.Conn) {
err := conn.Close()
if err != nil {

}
}(conn)
localAddr := conn.LocalAddr().String()
idx := strings.LastIndex(localAddr, ":")
return localAddr[0:idx]
return peer.AddrInfoToP2pAddrs(&pi)
}

// NewOracleNode creates a new OracleNode instance with the provided context and
// staking status. It initializes the libp2p host, DHT, pubsub manager, and other
// components needed for an Oracle node to join the network and participate.
func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {
func NewOracleNode(ctx context.Context, opts ...config.Option) (*OracleNode, error) {
o := &config.AppOption{}
o.Apply(opts...)

// Start with the default scaling limits.
cfg := config.GetInstance()
cfg := config.GetInstance(opts...)
scalingLimits := rcmgr.DefaultLimits
concreteLimits := scalingLimits.AutoScale()
limiter := rcmgr.NewFixedLimiter(concreteLimits)
Expand All @@ -111,14 +109,19 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {

var addrStr []string
libp2pOptions := []libp2p.Option{
libp2p.Identity(masacrypto.KeyManagerInstance().Libp2pPrivKey),
libp2p.ResourceManager(resourceManager),
libp2p.Ping(false), // disable built-in ping
libp2p.EnableNATService(),
libp2p.NATPortMap(),
libp2p.EnableRelay(), // Enable Circuit Relay v2 with hop
}

if o.RandomIdentity {
libp2pOptions = append(libp2pOptions, libp2p.RandomIdentity)
} else {
libp2pOptions = append(libp2pOptions, libp2p.Identity(masacrypto.KeyManagerInstance().Libp2pPrivKey))
}

securityOptions := []libp2p.Option{
libp2p.Security(noise.ID, noise.New),
}
Expand Down Expand Up @@ -151,32 +154,46 @@ func NewOracleNode(ctx context.Context, isStaked bool) (*OracleNode, error) {

return &OracleNode{
Host: hst,
PrivKey: masacrypto.KeyManagerInstance().EcdsaPrivKey,
Protocol: config.ProtocolWithVersion(config.OracleProtocol),
multiAddrs: myNetwork.GetMultiAddressesForHostQuiet(hst),
Context: ctx,
PeerChan: make(chan myNetwork.PeerEvent),
NodeTracker: pubsub2.NewNodeEventTracker(versioning.ProtocolVersion, cfg.Environment, hst.ID().String()),
PubSubManager: subscriptionManager,
IsStaked: isStaked,
IsStaked: o.IsStaked,
IsValidator: cfg.Validator,
IsTwitterScraper: cfg.TwitterScraper,
IsDiscordScraper: cfg.DiscordScraper,
IsTelegramScraper: cfg.TelegramScraper,
IsWebScraper: cfg.WebScraper,
IsLlmServer: cfg.LlmServer,
Blockchain: &chain.Chain{},
options: *o,
}, nil
}

func (node *OracleNode) generateEthHexKeyForRandomIdentity() (string, error) {
// If it's a random identity, get the pubkey from Libp2p
// and convert these to Ethereum public Hex types
pubkey, err := node.Host.ID().ExtractPublicKey()
if err != nil {
return "", fmt.Errorf("failed to extract public key from p2p identity: %w", err)
}
rawKey, err := pubkey.Raw()
if err != nil {
return "", fmt.Errorf("failed to extract public key from p2p identity: %w", err)
}
return common.BytesToAddress(ethereumCrypto.Keccak256(rawKey[1:])[12:]).Hex(), nil
}

// Start initializes the OracleNode by setting up libp2p stream handlers,
// connecting to the DHT and bootnodes, and subscribing to topics. It launches
// goroutines to handle discovered peers, listen to the node tracker, and
// discover peers. If this is a bootnode, it adds itself to the node tracker.
func (node *OracleNode) Start() (err error) {
logrus.Infof("[+] Starting node with ID: %s", node.GetMultiAddrs().String())

bootNodeAddrs, err := myNetwork.GetBootNodesMultiAddress(config.GetInstance().Bootnodes)
bootNodeAddrs, err := myNetwork.GetBootNodesMultiAddress(append(config.GetInstance().Bootnodes, node.options.Bootnodes...))
if err != nil {
return err
}
Expand All @@ -192,7 +209,15 @@ func (node *OracleNode) Start() (err error) {
go node.ListenToNodeTracker()
go node.handleDiscoveredPeers()

node.DHT, err = myNetwork.WithDht(node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked)
var publicKeyHex string
if node.options.RandomIdentity {
publicKeyHex, _ = node.generateEthHexKeyForRandomIdentity()
} else {
publicKeyHex = masacrypto.KeyManagerInstance().EthAddress
}

node.DHT, err = myNetwork.WithDHT(
node.Context, node.Host, bootNodeAddrs, node.Protocol, config.MasaPrefix, node.PeerChan, node.IsStaked, publicKeyHex)
if err != nil {
return err
}
Expand All @@ -205,7 +230,6 @@ func (node *OracleNode) Start() (err error) {

nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String())
if nodeData == nil {
publicKeyHex := masacrypto.KeyManagerInstance().EthAddress
ma := myNetwork.GetMultiAddressesForHostQuiet(node.Host)
nodeData = pubsub2.NewNodeData(ma[0], node.Host.ID(), publicKeyHex, pubsub2.ActivityJoined)
nodeData.IsStaked = node.IsStaked
Expand Down
30 changes: 12 additions & 18 deletions pkg/pubsub/manager.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,8 +9,6 @@ import (
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/host"
"github.com/sirupsen/logrus"

"github.com/masa-finance/masa-oracle/pkg/masacrypto"
)

// SubscriptionHandler defines the interface for handling pubsub messages.
Expand All @@ -20,13 +18,12 @@ type SubscriptionHandler interface {
}

type Manager struct {
ctx context.Context
topics map[string]*pubsub.Topic
subscriptions map[string]*pubsub.Subscription
handlers map[string]SubscriptionHandler
gossipSub *pubsub.PubSub
host host.Host
PublicKeyPublisher *PublicKeyPublisher // Add this line
ctx context.Context
topics map[string]*pubsub.Topic
subscriptions map[string]*pubsub.Subscription
handlers map[string]SubscriptionHandler
gossipSub *pubsub.PubSub
host host.Host
}

// NewPubSubManager creates a new PubSubManager instance.
Expand All @@ -40,17 +37,14 @@ func NewPubSubManager(ctx context.Context, host host.Host) (*Manager, error) { /
return nil, err
}
manager := &Manager{
ctx: ctx,
subscriptions: make(map[string]*pubsub.Subscription),
topics: make(map[string]*pubsub.Topic),
handlers: make(map[string]SubscriptionHandler),
gossipSub: gossipSub,
host: host,
PublicKeyPublisher: NewPublicKeyPublisher(nil, masacrypto.KeyManagerInstance().Libp2pPubKey), // Initialize PublicKeyPublisher here
ctx: ctx,
subscriptions: make(map[string]*pubsub.Subscription),
topics: make(map[string]*pubsub.Topic),
handlers: make(map[string]SubscriptionHandler),
gossipSub: gossipSub,
host: host,
}

manager.PublicKeyPublisher.pubSubManager = manager // Ensure the publisher has a reference back to the manager

return manager, nil
}

Expand Down
6 changes: 2 additions & 4 deletions pkg/pubsub/node_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"

"github.com/masa-finance/masa-oracle/pkg/masacrypto"
)

const (
Expand Down Expand Up @@ -259,13 +257,13 @@ func (n *NodeData) UpdateAccumulatedUptime() {
// It populates a NodeData struct with the node's ID, staking status, and Ethereum address.
// The NodeData struct is then marshalled into a JSON byte array.
// Returns nil if there is an error marshalling to JSON.
func GetSelfNodeDataJson(host host.Host, isStaked bool, multiaddrString string) []byte {
func GetSelfNodeDataJson(host host.Host, isStaked bool, multiaddrString, publicEthAddress string) []byte {
// Create and populate NodeData
nodeData := NodeData{
PeerId: host.ID(),
MultiaddrsString: multiaddrString,
IsStaked: isStaked,
EthAddress: masacrypto.KeyManagerInstance().EthAddress,
EthAddress: publicEthAddress,
IsTwitterScraper: config.GetInstance().TwitterScraper,
IsDiscordScraper: config.GetInstance().DiscordScraper,
IsTelegramScraper: config.GetInstance().TelegramScraper,
Expand Down
Loading

0 comments on commit 131a621

Please sign in to comment.