Skip to content

Commit

Permalink
Move Options handling to the config package
Browse files Browse the repository at this point in the history
  • Loading branch information
mcamou committed Nov 13, 2024
1 parent b839305 commit 898b542
Show file tree
Hide file tree
Showing 15 changed files with 130 additions and 66 deletions.
23 changes: 9 additions & 14 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@ import (
"github.com/masa-finance/masa-oracle/pkg/api"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/db"
"github.com/masa-finance/masa-oracle/pkg/masacrypto"
"github.com/masa-finance/masa-oracle/pkg/staking"
)

Expand All @@ -34,19 +33,15 @@ func main() {
if err != nil {
logrus.Fatalf("[-] %v", err)

Check warning on line 34 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L34

Added line #L34 was not covered by tests
}
cfg.LogConfig()
cfg.SetupLogging()

keyManager, err := masacrypto.NewKeyManager(cfg.PrivateKey, cfg.PrivateKeyFile)
if err != nil {
logrus.Fatal("[-] Failed to initialize keys:", err)
}
cfg.SetupLogging()
cfg.LogConfig()

Check warning on line 39 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L37-L39

Added lines #L37 - L39 were not covered by tests
// Create a cancellable context
ctx, cancel := context.WithCancel(context.Background())

if cfg.Faucet {
err := handleFaucet(cfg.RpcUrl, keyManager.EcdsaPrivKey)
err := handleFaucet(cfg.RpcUrl, cfg.KeyManager.EcdsaPrivKey)

Check warning on line 44 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L44

Added line #L44 was not covered by tests
if err != nil {
logrus.Errorf("[-] %v", err)
os.Exit(1)
Expand All @@ -57,7 +52,7 @@ func main() {
}

if cfg.StakeAmount != "" {
err := handleStaking(cfg.RpcUrl, keyManager.EcdsaPrivKey, cfg.StakeAmount)
err := handleStaking(cfg.RpcUrl, cfg.KeyManager.EcdsaPrivKey, cfg.StakeAmount)

Check warning on line 55 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L55

Added line #L55 was not covered by tests
if err != nil {
logrus.Warningf("%v", err)
} else {
Expand All @@ -67,7 +62,7 @@ func main() {
}

// Verify the staking event
isStaked, err := staking.VerifyStakingEvent(cfg.RpcUrl, keyManager.EthAddress)
isStaked, err := staking.VerifyStakingEvent(cfg.RpcUrl, cfg.KeyManager.EthAddress)

Check warning on line 65 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L65

Added line #L65 was not covered by tests
if err != nil {
logrus.Error(err)
}
Expand All @@ -76,7 +71,7 @@ func main() {
logrus.Warn("No staking event found for this address")
}

masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg, keyManager)
masaNodeOptions, workHandlerManager, pubKeySub := config.InitOptions(cfg)

Check warning on line 74 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L74

Added line #L74 was not covered by tests
// Create a new OracleNode
masaNode, err := node.NewOracleNode(ctx, masaNodeOptions...)

Expand All @@ -94,14 +89,14 @@ func main() {

if cfg.AllowedPeer {
cfg.AllowedPeerId = masaNode.Host.ID().String()
cfg.AllowedPeerPublicKey = keyManager.HexPubKey
cfg.AllowedPeerPublicKey = cfg.KeyManager.HexPubKey

Check warning on line 92 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L92

Added line #L92 was not covered by tests
logrus.Infof("[+] Allowed peer with ID: %s and PubKey: %s", cfg.AllowedPeerId, cfg.AllowedPeerPublicKey)
} else {
logrus.Warn("[-] This node is not set as the allowed peer")
}

// Init cache resolver
db.InitResolverCache(masaNode, keyManager, cfg.AllowedPeerId, cfg.AllowedPeerPublicKey, cfg.Validator)
db.InitResolverCache(masaNode, cfg.KeyManager, cfg.AllowedPeerId, cfg.AllowedPeerPublicKey, cfg.Validator)

Check warning on line 99 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L99

Added line #L99 was not covered by tests

// Cancel the context when SIGINT is received
go handleSignals(cancel, masaNode, cfg)
Expand All @@ -125,7 +120,7 @@ func main() {
logrus.Errorf("[-] Error while getting node IP address from %v: %v", multiAddr, err)
}
// Display the welcome message with the multiaddress and IP address
config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, cfg.Validator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)
config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, cfg.KeyManager.EthAddress, isStaked, cfg.Validator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)

Check warning on line 123 in cmd/masa-node/main.go

View check run for this annotation

Codecov / codecov/patch

cmd/masa-node/main.go#L123

Added line #L123 was not covered by tests

<-ctx.Done()
}
Expand Down
46 changes: 45 additions & 1 deletion node/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,15 @@ type NodeOption struct {
Version string
MasaDir string
CachePath string
KeyManager *masacrypto.KeyManager

OracleProtocol string
NodeDataSyncProtocol string
NodeGossipTopic string
Rendezvous string
WorkerProtocol string
PageSize int

KeyManager *masacrypto.KeyManager
}

type PubSubHandlers struct {
Expand Down Expand Up @@ -167,3 +175,39 @@ func WithKeyManager(km *masacrypto.KeyManager) Option {
o.KeyManager = km
}
}

func WithOracleProtocol(s string) Option {
return func(o *NodeOption) {
o.OracleProtocol = s
}

Check warning on line 182 in node/options.go

View check run for this annotation

Codecov / codecov/patch

node/options.go#L179-L182

Added lines #L179 - L182 were not covered by tests
}

func WithNodeDataSyncProtocol(s string) Option {
return func(o *NodeOption) {
o.NodeDataSyncProtocol = s
}

Check warning on line 188 in node/options.go

View check run for this annotation

Codecov / codecov/patch

node/options.go#L185-L188

Added lines #L185 - L188 were not covered by tests
}

func WithNodeGossipTopic(s string) Option {
return func(o *NodeOption) {
o.NodeGossipTopic = s
}

Check warning on line 194 in node/options.go

View check run for this annotation

Codecov / codecov/patch

node/options.go#L191-L194

Added lines #L191 - L194 were not covered by tests
}

func WithRendezvous(s string) Option {
return func(o *NodeOption) {
o.Rendezvous = s
}

Check warning on line 200 in node/options.go

View check run for this annotation

Codecov / codecov/patch

node/options.go#L197-L200

Added lines #L197 - L200 were not covered by tests
}

func WithWorkerProtocol(s string) Option {
return func(o *NodeOption) {
o.WorkerProtocol = s
}

Check warning on line 206 in node/options.go

View check run for this annotation

Codecov / codecov/patch

node/options.go#L203-L206

Added lines #L203 - L206 were not covered by tests
}

func WithPageSize(size int) Option {
return func(o *NodeOption) {
o.PageSize = size
}

Check warning on line 212 in node/options.go

View check run for this annotation

Codecov / codecov/patch

node/options.go#L209-L212

Added lines #L209 - L212 were not covered by tests
}
10 changes: 4 additions & 6 deletions node/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (

"github.com/masa-finance/masa-oracle/internal/versioning"
"github.com/masa-finance/masa-oracle/pkg/chain"
"github.com/masa-finance/masa-oracle/pkg/config"
myNetwork "github.com/masa-finance/masa-oracle/pkg/network"
"github.com/masa-finance/masa-oracle/pkg/pubsub"
)
Expand All @@ -47,7 +46,6 @@ type OracleNode struct {
Blockchain *chain.Chain
Options NodeOption
Context context.Context
Config *config.AppConfig
}

// GetMultiAddrs returns the priority multiaddr for this node.
Expand Down Expand Up @@ -150,7 +148,7 @@ func NewOracleNode(ctx context.Context, opts ...Option) (*OracleNode, error) {
Options: *o,
}

n.Protocol = n.protocolWithVersion(config.OracleProtocol)
n.Protocol = n.protocolWithVersion(n.Options.OracleProtocol)

Check warning on line 151 in node/oracle_node.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node.go#L151

Added line #L151 was not covered by tests
return n, nil
}

Expand Down Expand Up @@ -204,7 +202,7 @@ func (node *OracleNode) Start() (err error) {
logrus.Infof("[+] Starting node with ID: %s", node.GetMultiAddrs().String())

node.Host.SetStreamHandler(node.Protocol, node.handleStream)
node.Host.SetStreamHandler(node.protocolWithVersion(config.NodeDataSyncProtocol), node.ReceiveNodeData)
node.Host.SetStreamHandler(node.protocolWithVersion(node.Options.NodeDataSyncProtocol), node.ReceiveNodeData)

Check warning on line 205 in node/oracle_node.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node.go#L205

Added line #L205 was not covered by tests

for pid, n := range node.Options.ProtocolHandlers {
node.Host.SetStreamHandler(pid, n)
Expand All @@ -215,7 +213,7 @@ func (node *OracleNode) Start() (err error) {
}

if node.Options.IsStaked {
node.Host.SetStreamHandler(node.protocolWithVersion(config.NodeGossipTopic), node.GossipNodeData)
node.Host.SetStreamHandler(node.protocolWithVersion(node.Options.NodeGossipTopic), node.GossipNodeData)

Check warning on line 216 in node/oracle_node.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node.go#L216

Added line #L216 was not covered by tests
}

node.Host.Network().Notify(node.NodeTracker)
Expand All @@ -236,7 +234,7 @@ func (node *OracleNode) Start() (err error) {
return err
}

err = myNetwork.EnableMDNS(node.Host, config.Rendezvous, node.PeerChan)
err = myNetwork.EnableMDNS(node.Host, node.Options.Rendezvous, node.PeerChan)

Check warning on line 237 in node/oracle_node.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node.go#L237

Added line #L237 was not covered by tests
if err != nil {
return err
}
Expand Down
13 changes: 6 additions & 7 deletions node/oracle_node_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,6 @@ import (
"github.com/libp2p/go-libp2p/core/peer"
"github.com/sirupsen/logrus"

"github.com/masa-finance/masa-oracle/pkg/config"
pubsub2 "github.com/masa-finance/masa-oracle/pkg/pubsub"
)

Expand Down Expand Up @@ -42,7 +41,7 @@ func (node *OracleNode) ListenToNodeTracker() {
continue
}
// Publish the JSON data on the node.topic
err = node.PublishTopic(config.NodeGossipTopic, jsonData)
err = node.PublishTopic(node.Options.NodeGossipTopic, jsonData)

Check warning on line 44 in node/oracle_node_listener.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node_listener.go#L44

Added line #L44 was not covered by tests
if err != nil {
logrus.Errorf("[-] Error publishing node data: %v", err)
}
Expand Down Expand Up @@ -88,10 +87,10 @@ type NodeDataPage struct {
func (node *OracleNode) SendNodeDataPage(allNodeData []pubsub2.NodeData, stream network.Stream, pageNumber int) {
logrus.Debugf("[+] SendNodeDataPage --> %s: Page: %d", stream.Conn().RemotePeer(), pageNumber)
totalRecords := len(allNodeData)
totalPages := int(math.Ceil(float64(totalRecords) / config.PageSize))
totalPages := int(math.Ceil(float64(totalRecords) / float64(node.Options.PageSize)))

Check warning on line 90 in node/oracle_node_listener.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node_listener.go#L90

Added line #L90 was not covered by tests

startIndex := pageNumber * config.PageSize
endIndex := startIndex + config.PageSize
startIndex := pageNumber * node.Options.PageSize
endIndex := startIndex + node.Options.PageSize

Check warning on line 93 in node/oracle_node_listener.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node_listener.go#L92-L93

Added lines #L92 - L93 were not covered by tests
if endIndex > totalRecords {
endIndex = totalRecords
}
Expand Down Expand Up @@ -133,9 +132,9 @@ func (node *OracleNode) SendNodeData(peerID peer.ID) {
nodeData = node.NodeTracker.GetUpdatedNodes(sinceTime)
}
totalRecords := len(nodeData)
totalPages := int(math.Ceil(float64(totalRecords) / float64(config.PageSize)))
totalPages := int(math.Ceil(float64(totalRecords) / float64(node.Options.PageSize)))

Check warning on line 135 in node/oracle_node_listener.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node_listener.go#L135

Added line #L135 was not covered by tests

stream, err := node.Host.NewStream(node.Context, peerID, node.protocolWithVersion(config.NodeDataSyncProtocol))
stream, err := node.Host.NewStream(node.Context, peerID, node.protocolWithVersion(node.Options.NodeDataSyncProtocol))

Check warning on line 137 in node/oracle_node_listener.go

View check run for this annotation

Codecov / codecov/patch

node/oracle_node_listener.go#L137

Added line #L137 was not covered by tests
if err != nil {
// node.NodeTracker.RemoveNodeData(peerID.String())
return
Expand Down
3 changes: 1 addition & 2 deletions node/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,6 @@ import (
"github.com/libp2p/go-libp2p/core/network"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/libp2p/go-libp2p/core/protocol"
"github.com/masa-finance/masa-oracle/pkg/config"
)

const (
Expand Down Expand Up @@ -50,7 +49,7 @@ func (node *OracleNode) subscribeToTopics() error {
}

// Subscribe to NodeGossipTopic to participate in the network's gossip protocol.
if err := node.SubscribeTopic(config.NodeGossipTopic, node.NodeTracker, false); err != nil {
if err := node.SubscribeTopic(node.Options.NodeGossipTopic, node.NodeTracker, false); err != nil {

Check warning on line 52 in node/protocol.go

View check run for this annotation

Codecov / codecov/patch

node/protocol.go#L52

Added line #L52 was not covered by tests
return err
}

Expand Down
13 changes: 10 additions & 3 deletions pkg/config/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"strings"

"github.com/masa-finance/masa-oracle/internal/versioning"
"github.com/masa-finance/masa-oracle/pkg/masacrypto"

"github.com/gotd/contrib/bg"
"github.com/joho/godotenv"
Expand Down Expand Up @@ -58,6 +59,7 @@ type AppConfig struct {
WebScraper bool `mapstructure:"webScraper"`
APIEnabled bool `mapstructure:"api_enabled"`

KeyManager *masacrypto.KeyManager
TelegramStop bg.StopFunc
}

Expand All @@ -72,22 +74,27 @@ type AppConfig struct {
// In case of any errors it returns nill with an error object
func GetConfig() (*AppConfig, error) {
instance := &AppConfig{}

instance.setDefaultConfig()
// TODO Shouldn't the env vars override the file config, instead of the other way around?
instance.setEnvVariableConfig()
instance.setFileConfig(viper.GetString("FILE_PATH"))

if err := instance.setCommandLineConfig(); err != nil {
return nil, fmt.Errorf("Error while setting command line options: %v", err)
return nil, err
}

Check warning on line 84 in pkg/config/app.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/app.go#L75-L84

Added lines #L75 - L84 were not covered by tests

if err := viper.Unmarshal(instance); err != nil {
return nil, fmt.Errorf("Unable to unmarshal config into struct: %v", err)
return nil, fmt.Errorf("Unable to unmarshal config into struct, %v", err)
}

Check warning on line 88 in pkg/config/app.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/app.go#L86-L88

Added lines #L86 - L88 were not covered by tests

instance.APIEnabled = viper.GetBool("api_enabled")

Check warning on line 90 in pkg/config/app.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/app.go#L90

Added line #L90 was not covered by tests

keyManager, err := masacrypto.NewKeyManager(instance.PrivateKey, instance.PrivateKeyFile)
if err != nil {
return nil, fmt.Errorf("Failed to initialize keys: %v", err)
}
instance.KeyManager = keyManager

Check warning on line 96 in pkg/config/app.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/app.go#L92-L96

Added lines #L92 - L96 were not covered by tests

return instance, nil

Check warning on line 98 in pkg/config/app.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/app.go#L98

Added line #L98 was not covered by tests
}

Expand Down
41 changes: 26 additions & 15 deletions cmd/masa-node/config.go → pkg/config/options.go
Original file line number Diff line number Diff line change
@@ -1,16 +1,27 @@
package main
package config

import (
"github.com/masa-finance/masa-oracle/node"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/masacrypto"
pubsub "github.com/masa-finance/masa-oracle/pkg/pubsub"
"github.com/masa-finance/masa-oracle/pkg/pubsub"
"github.com/masa-finance/masa-oracle/pkg/workers"
)

func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) {
var constantOptions = []node.Option{
node.WithOracleProtocol(OracleProtocol),
node.WithNodeDataSyncProtocol(NodeDataSyncProtocol),
node.WithNodeGossipTopic(NodeGossipTopic),
node.WithRendezvous(Rendezvous),
node.WithPageSize(PageSize),
}

// WithConstantOptions adds options that are set to constant values. We need to add them to
// the node to avoid a dependency loop.
func WithConstantOptions(nodes ...node.Option) []node.Option {
return append(nodes, constantOptions...)
}

func InitOptions(cfg *AppConfig) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) {

Check warning on line 23 in pkg/config/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/options.go#L23

Added line #L23 was not covered by tests
// WorkerManager configuration
// TODO: this needs to be moved under config, but now it's here as there are import cycles given singletons
workerManagerOptions := []workers.WorkerOptionFunc{
workers.WithMasaDir(cfg.MasaDir),
}
Expand All @@ -20,25 +31,26 @@ func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]no
cachePath = cfg.MasaDir + "/cache"
}

masaNodeOptions := []node.Option{
masaNodeOptions := WithConstantOptions(

Check warning on line 34 in pkg/config/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/options.go#L34

Added line #L34 was not covered by tests
node.EnableStaked,
// config.WithService(),
// WithService(),

Check warning on line 36 in pkg/config/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/options.go#L36

Added line #L36 was not covered by tests
node.WithEnvironment(cfg.Environment),
node.WithVersion(cfg.Version),
node.WithPort(cfg.PortNbr),
node.WithBootNodes(cfg.Bootnodes...),
node.WithMasaDir(cfg.MasaDir),
node.WithCachePath(cachePath),
node.WithKeyManager(keyManager),
}
node.WithKeyManager(cfg.KeyManager),
node.WithWorkerProtocol(WorkerProtocol),
)

Check warning on line 45 in pkg/config/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/options.go#L43-L45

Added lines #L43 - L45 were not covered by tests

if cfg.TwitterScraper {
workerManagerOptions = append(workerManagerOptions, workers.EnableTwitterWorker)
masaNodeOptions = append(masaNodeOptions, node.IsTwitterScraper)
}

if cfg.TelegramScraper {
// XXX: Telegram scraper is not implemented yet in the worker (?)
// TODO: Telegram scraper is not implemented yet in the worker (?)

Check warning on line 53 in pkg/config/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/options.go#L53

Added line #L53 was not covered by tests
masaNodeOptions = append(masaNodeOptions, node.IsTelegramScraper)
}

Expand All @@ -56,15 +68,14 @@ func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]no
blockChainEventTracker := node.NewBlockChain()
pubKeySub := &pubsub.PublicKeySubscriptionHandler{}

// TODO: Where the config is involved, move to the config the generation of Node options
masaNodeOptions = append(masaNodeOptions, []node.Option{
// Register the worker manager
node.WithMasaProtocolHandler(
config.WorkerProtocol,
WorkerProtocol,

Check warning on line 74 in pkg/config/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/options.go#L74

Added line #L74 was not covered by tests
workHandlerManager.HandleWorkerStream,
),
node.WithPubSubHandler(config.PublicKeyTopic, pubKeySub, false),
node.WithPubSubHandler(config.BlockTopic, blockChainEventTracker, true),
node.WithPubSubHandler(PublicKeyTopic, pubKeySub, false),
node.WithPubSubHandler(BlockTopic, blockChainEventTracker, true),

Check warning on line 78 in pkg/config/options.go

View check run for this annotation

Codecov / codecov/patch

pkg/config/options.go#L77-L78

Added lines #L77 - L78 were not covered by tests
}...)

if cfg.Validator {
Expand Down
1 change: 1 addition & 0 deletions pkg/network/kdht.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@ func EnableDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.M
options = append(options, dht.RoutingTableRefreshPeriod(time.Minute*5)) // Set refresh interval
options = append(options, dht.Mode(dht.ModeAutoServer))
options = append(options, dht.ProtocolPrefix(prefix))
// WTF: Why?
options = append(options, dht.NamespacedValidator("db", dbValidator{}))

kademliaDHT, err := dht.New(ctx, host, options...)
Expand Down
Loading

0 comments on commit 898b542

Please sign in to comment.