From 898b54235bed429a7ac768e3a61ee3f9978d3f4a Mon Sep 17 00:00:00 2001 From: mcamou Date: Wed, 13 Nov 2024 18:35:11 +0100 Subject: [PATCH] Move Options handling to the config package --- cmd/masa-node/main.go | 23 ++++------ node/options.go | 46 ++++++++++++++++++- node/oracle_node.go | 10 ++-- node/oracle_node_listener.go | 13 +++--- node/protocol.go | 3 +- pkg/config/app.go | 13 ++++-- .../config.go => pkg/config/options.go | 41 +++++++++++------ pkg/network/kdht.go | 1 + pkg/pubsub/manager.go | 4 +- pkg/pubsub/node_event_tracker.go | 8 +++- pkg/tests/integration/tracker_test.go | 18 +++++--- pkg/tests/node_data/node_data_tracker_test.go | 7 ++- pkg/workers/handlers/web.go | 1 - pkg/workers/types/work_types.go | 5 +- pkg/workers/worker_manager.go | 3 +- 15 files changed, 130 insertions(+), 66 deletions(-) rename cmd/masa-node/config.go => pkg/config/options.go (68%) diff --git a/cmd/masa-node/main.go b/cmd/masa-node/main.go index be0c0d6d..176833a5 100644 --- a/cmd/masa-node/main.go +++ b/cmd/masa-node/main.go @@ -16,7 +16,6 @@ import ( "github.com/masa-finance/masa-oracle/pkg/api" "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/db" - "github.com/masa-finance/masa-oracle/pkg/masacrypto" "github.com/masa-finance/masa-oracle/pkg/staking" ) @@ -34,19 +33,15 @@ func main() { if err != nil { logrus.Fatalf("[-] %v", err) } - cfg.LogConfig() - cfg.SetupLogging() - keyManager, err := masacrypto.NewKeyManager(cfg.PrivateKey, cfg.PrivateKeyFile) - if err != nil { - logrus.Fatal("[-] Failed to initialize keys:", err) - } + cfg.SetupLogging() + cfg.LogConfig() // Create a cancellable context ctx, cancel := context.WithCancel(context.Background()) if cfg.Faucet { - err := handleFaucet(cfg.RpcUrl, keyManager.EcdsaPrivKey) + err := handleFaucet(cfg.RpcUrl, cfg.KeyManager.EcdsaPrivKey) if err != nil { logrus.Errorf("[-] %v", err) os.Exit(1) @@ -57,7 +52,7 @@ func main() { } if cfg.StakeAmount != "" { - err := handleStaking(cfg.RpcUrl, keyManager.EcdsaPrivKey, cfg.StakeAmount) + err := handleStaking(cfg.RpcUrl, cfg.KeyManager.EcdsaPrivKey, cfg.StakeAmount) if err != nil { logrus.Warningf("%v", err) } else { @@ -67,7 +62,7 @@ func main() { } // Verify the staking event - isStaked, err := staking.VerifyStakingEvent(cfg.RpcUrl, keyManager.EthAddress) + isStaked, err := staking.VerifyStakingEvent(cfg.RpcUrl, cfg.KeyManager.EthAddress) if err != nil { logrus.Error(err) } @@ -76,7 +71,7 @@ func main() { logrus.Warn("No staking event found for this address") } - masaNodeOptions, workHandlerManager, pubKeySub := initOptions(cfg, keyManager) + masaNodeOptions, workHandlerManager, pubKeySub := config.InitOptions(cfg) // Create a new OracleNode masaNode, err := node.NewOracleNode(ctx, masaNodeOptions...) @@ -94,14 +89,14 @@ func main() { if cfg.AllowedPeer { cfg.AllowedPeerId = masaNode.Host.ID().String() - cfg.AllowedPeerPublicKey = keyManager.HexPubKey + cfg.AllowedPeerPublicKey = cfg.KeyManager.HexPubKey logrus.Infof("[+] Allowed peer with ID: %s and PubKey: %s", cfg.AllowedPeerId, cfg.AllowedPeerPublicKey) } else { logrus.Warn("[-] This node is not set as the allowed peer") } // Init cache resolver - db.InitResolverCache(masaNode, keyManager, cfg.AllowedPeerId, cfg.AllowedPeerPublicKey, cfg.Validator) + db.InitResolverCache(masaNode, cfg.KeyManager, cfg.AllowedPeerId, cfg.AllowedPeerPublicKey, cfg.Validator) // Cancel the context when SIGINT is received go handleSignals(cancel, masaNode, cfg) @@ -125,7 +120,7 @@ func main() { logrus.Errorf("[-] Error while getting node IP address from %v: %v", multiAddr, err) } // Display the welcome message with the multiaddress and IP address - config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, cfg.Validator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) + config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, cfg.KeyManager.EthAddress, isStaked, cfg.Validator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion) <-ctx.Done() } diff --git a/node/options.go b/node/options.go index c2bf90cc..c6a6c826 100644 --- a/node/options.go +++ b/node/options.go @@ -32,7 +32,15 @@ type NodeOption struct { Version string MasaDir string CachePath string - KeyManager *masacrypto.KeyManager + + OracleProtocol string + NodeDataSyncProtocol string + NodeGossipTopic string + Rendezvous string + WorkerProtocol string + PageSize int + + KeyManager *masacrypto.KeyManager } type PubSubHandlers struct { @@ -167,3 +175,39 @@ func WithKeyManager(km *masacrypto.KeyManager) Option { o.KeyManager = km } } + +func WithOracleProtocol(s string) Option { + return func(o *NodeOption) { + o.OracleProtocol = s + } +} + +func WithNodeDataSyncProtocol(s string) Option { + return func(o *NodeOption) { + o.NodeDataSyncProtocol = s + } +} + +func WithNodeGossipTopic(s string) Option { + return func(o *NodeOption) { + o.NodeGossipTopic = s + } +} + +func WithRendezvous(s string) Option { + return func(o *NodeOption) { + o.Rendezvous = s + } +} + +func WithWorkerProtocol(s string) Option { + return func(o *NodeOption) { + o.WorkerProtocol = s + } +} + +func WithPageSize(size int) Option { + return func(o *NodeOption) { + o.PageSize = size + } +} diff --git a/node/oracle_node.go b/node/oracle_node.go index 60b1bf16..a3c588d6 100644 --- a/node/oracle_node.go +++ b/node/oracle_node.go @@ -26,7 +26,6 @@ import ( "github.com/masa-finance/masa-oracle/internal/versioning" "github.com/masa-finance/masa-oracle/pkg/chain" - "github.com/masa-finance/masa-oracle/pkg/config" myNetwork "github.com/masa-finance/masa-oracle/pkg/network" "github.com/masa-finance/masa-oracle/pkg/pubsub" ) @@ -47,7 +46,6 @@ type OracleNode struct { Blockchain *chain.Chain Options NodeOption Context context.Context - Config *config.AppConfig } // GetMultiAddrs returns the priority multiaddr for this node. @@ -150,7 +148,7 @@ func NewOracleNode(ctx context.Context, opts ...Option) (*OracleNode, error) { Options: *o, } - n.Protocol = n.protocolWithVersion(config.OracleProtocol) + n.Protocol = n.protocolWithVersion(n.Options.OracleProtocol) return n, nil } @@ -204,7 +202,7 @@ func (node *OracleNode) Start() (err error) { logrus.Infof("[+] Starting node with ID: %s", node.GetMultiAddrs().String()) node.Host.SetStreamHandler(node.Protocol, node.handleStream) - node.Host.SetStreamHandler(node.protocolWithVersion(config.NodeDataSyncProtocol), node.ReceiveNodeData) + node.Host.SetStreamHandler(node.protocolWithVersion(node.Options.NodeDataSyncProtocol), node.ReceiveNodeData) for pid, n := range node.Options.ProtocolHandlers { node.Host.SetStreamHandler(pid, n) @@ -215,7 +213,7 @@ func (node *OracleNode) Start() (err error) { } if node.Options.IsStaked { - node.Host.SetStreamHandler(node.protocolWithVersion(config.NodeGossipTopic), node.GossipNodeData) + node.Host.SetStreamHandler(node.protocolWithVersion(node.Options.NodeGossipTopic), node.GossipNodeData) } node.Host.Network().Notify(node.NodeTracker) @@ -236,7 +234,7 @@ func (node *OracleNode) Start() (err error) { return err } - err = myNetwork.EnableMDNS(node.Host, config.Rendezvous, node.PeerChan) + err = myNetwork.EnableMDNS(node.Host, node.Options.Rendezvous, node.PeerChan) if err != nil { return err } diff --git a/node/oracle_node_listener.go b/node/oracle_node_listener.go index f6ad7121..a11f1a70 100644 --- a/node/oracle_node_listener.go +++ b/node/oracle_node_listener.go @@ -14,7 +14,6 @@ import ( "github.com/libp2p/go-libp2p/core/peer" "github.com/sirupsen/logrus" - "github.com/masa-finance/masa-oracle/pkg/config" pubsub2 "github.com/masa-finance/masa-oracle/pkg/pubsub" ) @@ -42,7 +41,7 @@ func (node *OracleNode) ListenToNodeTracker() { continue } // Publish the JSON data on the node.topic - err = node.PublishTopic(config.NodeGossipTopic, jsonData) + err = node.PublishTopic(node.Options.NodeGossipTopic, jsonData) if err != nil { logrus.Errorf("[-] Error publishing node data: %v", err) } @@ -88,10 +87,10 @@ type NodeDataPage struct { func (node *OracleNode) SendNodeDataPage(allNodeData []pubsub2.NodeData, stream network.Stream, pageNumber int) { logrus.Debugf("[+] SendNodeDataPage --> %s: Page: %d", stream.Conn().RemotePeer(), pageNumber) totalRecords := len(allNodeData) - totalPages := int(math.Ceil(float64(totalRecords) / config.PageSize)) + totalPages := int(math.Ceil(float64(totalRecords) / float64(node.Options.PageSize))) - startIndex := pageNumber * config.PageSize - endIndex := startIndex + config.PageSize + startIndex := pageNumber * node.Options.PageSize + endIndex := startIndex + node.Options.PageSize if endIndex > totalRecords { endIndex = totalRecords } @@ -133,9 +132,9 @@ func (node *OracleNode) SendNodeData(peerID peer.ID) { nodeData = node.NodeTracker.GetUpdatedNodes(sinceTime) } totalRecords := len(nodeData) - totalPages := int(math.Ceil(float64(totalRecords) / float64(config.PageSize))) + totalPages := int(math.Ceil(float64(totalRecords) / float64(node.Options.PageSize))) - stream, err := node.Host.NewStream(node.Context, peerID, node.protocolWithVersion(config.NodeDataSyncProtocol)) + stream, err := node.Host.NewStream(node.Context, peerID, node.protocolWithVersion(node.Options.NodeDataSyncProtocol)) if err != nil { // node.NodeTracker.RemoveNodeData(peerID.String()) return diff --git a/node/protocol.go b/node/protocol.go index b6f6b3cf..b413dec6 100644 --- a/node/protocol.go +++ b/node/protocol.go @@ -9,7 +9,6 @@ import ( "github.com/libp2p/go-libp2p/core/network" "github.com/libp2p/go-libp2p/core/peer" "github.com/libp2p/go-libp2p/core/protocol" - "github.com/masa-finance/masa-oracle/pkg/config" ) const ( @@ -50,7 +49,7 @@ func (node *OracleNode) subscribeToTopics() error { } // Subscribe to NodeGossipTopic to participate in the network's gossip protocol. - if err := node.SubscribeTopic(config.NodeGossipTopic, node.NodeTracker, false); err != nil { + if err := node.SubscribeTopic(node.Options.NodeGossipTopic, node.NodeTracker, false); err != nil { return err } diff --git a/pkg/config/app.go b/pkg/config/app.go index e02ae0de..1b8d6881 100644 --- a/pkg/config/app.go +++ b/pkg/config/app.go @@ -9,6 +9,7 @@ import ( "strings" "github.com/masa-finance/masa-oracle/internal/versioning" + "github.com/masa-finance/masa-oracle/pkg/masacrypto" "github.com/gotd/contrib/bg" "github.com/joho/godotenv" @@ -58,6 +59,7 @@ type AppConfig struct { WebScraper bool `mapstructure:"webScraper"` APIEnabled bool `mapstructure:"api_enabled"` + KeyManager *masacrypto.KeyManager TelegramStop bg.StopFunc } @@ -72,22 +74,27 @@ type AppConfig struct { // In case of any errors it returns nill with an error object func GetConfig() (*AppConfig, error) { instance := &AppConfig{} - instance.setDefaultConfig() // TODO Shouldn't the env vars override the file config, instead of the other way around? instance.setEnvVariableConfig() instance.setFileConfig(viper.GetString("FILE_PATH")) if err := instance.setCommandLineConfig(); err != nil { - return nil, fmt.Errorf("Error while setting command line options: %v", err) + return nil, err } if err := viper.Unmarshal(instance); err != nil { - return nil, fmt.Errorf("Unable to unmarshal config into struct: %v", err) + return nil, fmt.Errorf("Unable to unmarshal config into struct, %v", err) } instance.APIEnabled = viper.GetBool("api_enabled") + keyManager, err := masacrypto.NewKeyManager(instance.PrivateKey, instance.PrivateKeyFile) + if err != nil { + return nil, fmt.Errorf("Failed to initialize keys: %v", err) + } + instance.KeyManager = keyManager + return instance, nil } diff --git a/cmd/masa-node/config.go b/pkg/config/options.go similarity index 68% rename from cmd/masa-node/config.go rename to pkg/config/options.go index d147e8e1..f0e185da 100644 --- a/cmd/masa-node/config.go +++ b/pkg/config/options.go @@ -1,16 +1,27 @@ -package main +package config import ( "github.com/masa-finance/masa-oracle/node" - "github.com/masa-finance/masa-oracle/pkg/config" - "github.com/masa-finance/masa-oracle/pkg/masacrypto" - pubsub "github.com/masa-finance/masa-oracle/pkg/pubsub" + "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/masa-finance/masa-oracle/pkg/workers" ) -func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) { +var constantOptions = []node.Option{ + node.WithOracleProtocol(OracleProtocol), + node.WithNodeDataSyncProtocol(NodeDataSyncProtocol), + node.WithNodeGossipTopic(NodeGossipTopic), + node.WithRendezvous(Rendezvous), + node.WithPageSize(PageSize), +} + +// WithConstantOptions adds options that are set to constant values. We need to add them to +// the node to avoid a dependency loop. +func WithConstantOptions(nodes ...node.Option) []node.Option { + return append(nodes, constantOptions...) +} + +func InitOptions(cfg *AppConfig) ([]node.Option, *workers.WorkHandlerManager, *pubsub.PublicKeySubscriptionHandler) { // WorkerManager configuration - // TODO: this needs to be moved under config, but now it's here as there are import cycles given singletons workerManagerOptions := []workers.WorkerOptionFunc{ workers.WithMasaDir(cfg.MasaDir), } @@ -20,17 +31,18 @@ func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]no cachePath = cfg.MasaDir + "/cache" } - masaNodeOptions := []node.Option{ + masaNodeOptions := WithConstantOptions( node.EnableStaked, - // config.WithService(), + // WithService(), node.WithEnvironment(cfg.Environment), node.WithVersion(cfg.Version), node.WithPort(cfg.PortNbr), node.WithBootNodes(cfg.Bootnodes...), node.WithMasaDir(cfg.MasaDir), node.WithCachePath(cachePath), - node.WithKeyManager(keyManager), - } + node.WithKeyManager(cfg.KeyManager), + node.WithWorkerProtocol(WorkerProtocol), + ) if cfg.TwitterScraper { workerManagerOptions = append(workerManagerOptions, workers.EnableTwitterWorker) @@ -38,7 +50,7 @@ func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]no } if cfg.TelegramScraper { - // XXX: Telegram scraper is not implemented yet in the worker (?) + // TODO: Telegram scraper is not implemented yet in the worker (?) masaNodeOptions = append(masaNodeOptions, node.IsTelegramScraper) } @@ -56,15 +68,14 @@ func initOptions(cfg *config.AppConfig, keyManager *masacrypto.KeyManager) ([]no blockChainEventTracker := node.NewBlockChain() pubKeySub := &pubsub.PublicKeySubscriptionHandler{} - // TODO: Where the config is involved, move to the config the generation of Node options masaNodeOptions = append(masaNodeOptions, []node.Option{ // Register the worker manager node.WithMasaProtocolHandler( - config.WorkerProtocol, + WorkerProtocol, workHandlerManager.HandleWorkerStream, ), - node.WithPubSubHandler(config.PublicKeyTopic, pubKeySub, false), - node.WithPubSubHandler(config.BlockTopic, blockChainEventTracker, true), + node.WithPubSubHandler(PublicKeyTopic, pubKeySub, false), + node.WithPubSubHandler(BlockTopic, blockChainEventTracker, true), }...) if cfg.Validator { diff --git a/pkg/network/kdht.go b/pkg/network/kdht.go index 49cab0e8..989a21de 100644 --- a/pkg/network/kdht.go +++ b/pkg/network/kdht.go @@ -37,6 +37,7 @@ func EnableDHT(ctx context.Context, host host.Host, bootstrapNodes []multiaddr.M options = append(options, dht.RoutingTableRefreshPeriod(time.Minute*5)) // Set refresh interval options = append(options, dht.Mode(dht.ModeAutoServer)) options = append(options, dht.ProtocolPrefix(prefix)) + // WTF: Why? options = append(options, dht.NamespacedValidator("db", dbValidator{})) kademliaDHT, err := dht.New(ctx, host, options...) diff --git a/pkg/pubsub/manager.go b/pkg/pubsub/manager.go index 5b8b3b96..5f326d9e 100644 --- a/pkg/pubsub/manager.go +++ b/pkg/pubsub/manager.go @@ -83,7 +83,7 @@ func (sm *Manager) AddSubscription(topicName string, handler types.SubscriptionH for { msg, err := sub.Next(sm.ctx) if err != nil { - logrus.Errorf("[-] Error reading from topic: %v", err) + logrus.Errorf("[-] AddSubscription: Error reading from topic: %v", err) if errors.Is(err, context.Canceled) { return } @@ -221,7 +221,7 @@ func (sm *Manager) Subscribe(topicName string, handler types.SubscriptionHandler for { msg, err := sub.Next(sm.ctx) if err != nil { - logrus.Errorf("[-] Error reading from topic: %v", err) + logrus.Errorf("[-] Subscribe: Error reading from topic: %v", err) if errors.Is(err, context.Canceled) { return } diff --git a/pkg/pubsub/node_event_tracker.go b/pkg/pubsub/node_event_tracker.go index 6abe70e3..022850ed 100644 --- a/pkg/pubsub/node_event_tracker.go +++ b/pkg/pubsub/node_event_tracker.go @@ -17,8 +17,10 @@ import ( ) type NodeEventTracker struct { - NodeDataChan chan *NodeData - nodeData *SafeMap + NodeDataChan chan *NodeData + // WTF: Do we really need this? Can't we store it in the libp2p PeerStore metadata? + nodeData *SafeMap + // WTF: Unused? nodeDataFile string ConnectBuffer map[string]ConnectBufferEntry nodeVersion string @@ -138,6 +140,7 @@ func (net *NodeEventTracker) Connected(n network.Network, c network.Conn) { nodeData, exists := net.nodeData.Get(peerID) if !exists { + // WTF: Shouldn't we add it? We don't yet have the NodeData but we can at least add it. return } else { if nodeData.IsActive { @@ -172,6 +175,7 @@ func (net *NodeEventTracker) Disconnected(n network.Network, c network.Conn) { nodeData, exists := net.nodeData.Get(peerID) if !exists { // this should never happen + // WTF: Since we're never adding it on `Connected`.... logrus.Debugf("Node data does not exist for disconnected node: %s", peerID) return } diff --git a/pkg/tests/integration/tracker_test.go b/pkg/tests/integration/tracker_test.go index 3a2cff69..40a21344 100644 --- a/pkg/tests/integration/tracker_test.go +++ b/pkg/tests/integration/tracker_test.go @@ -6,6 +6,7 @@ import ( "fmt" . "github.com/masa-finance/masa-oracle/node" + "github.com/masa-finance/masa-oracle/pkg/config" . "github.com/onsi/ginkgo/v2" . "github.com/onsi/gomega" ) @@ -19,8 +20,10 @@ var _ = Describe("Oracle integration tests", func() { n, err := NewOracleNode( ctx, - EnableStaked, - EnableRandomIdentity, + config.WithConstantOptions( + EnableStaked, + EnableRandomIdentity, + )..., ) Expect(err).ToNot(HaveOccurred()) @@ -36,10 +39,13 @@ var _ = Describe("Oracle integration tests", func() { } By(fmt.Sprintf("Generating second node with bootnodes %+v", bootNodes)) - n2, err := NewOracleNode(ctx, - EnableStaked, - WithBootNodes(bootNodes...), - EnableRandomIdentity, + n2, err := NewOracleNode( + ctx, + config.WithConstantOptions( + EnableStaked, + WithBootNodes(bootNodes...), + EnableRandomIdentity, + )..., ) Expect(err).ToNot(HaveOccurred()) Expect(n.Host.ID()).ToNot(Equal(n2.Host.ID())) diff --git a/pkg/tests/node_data/node_data_tracker_test.go b/pkg/tests/node_data/node_data_tracker_test.go index e43afbfb..ac101ab4 100644 --- a/pkg/tests/node_data/node_data_tracker_test.go +++ b/pkg/tests/node_data/node_data_tracker_test.go @@ -8,6 +8,7 @@ import ( . "github.com/onsi/gomega" . "github.com/masa-finance/masa-oracle/node" + "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/pubsub" ) @@ -16,8 +17,10 @@ var _ = Describe("NodeDataTracker", func() { It("should correctly update NodeData Twitter fields", func() { testNode, err := NewOracleNode( context.Background(), - EnableStaked, - EnableRandomIdentity, + config.WithConstantOptions( + EnableStaked, + EnableRandomIdentity, + )..., ) Expect(err).NotTo(HaveOccurred()) diff --git a/pkg/workers/handlers/web.go b/pkg/workers/handlers/web.go index 309c94ea..43190744 100644 --- a/pkg/workers/handlers/web.go +++ b/pkg/workers/handlers/web.go @@ -11,7 +11,6 @@ import ( // WebHandler - All the web handlers implement the WorkHandler interface. type WebHandler struct{} -type WebSentimentHandler struct{} func (h *WebHandler) HandleWork(data []byte) data_types.WorkResponse { logrus.Infof("[+] WebHandler %s", data) diff --git a/pkg/workers/types/work_types.go b/pkg/workers/types/work_types.go index 437ed402..9b2e6340 100644 --- a/pkg/workers/types/work_types.go +++ b/pkg/workers/types/work_types.go @@ -12,7 +12,6 @@ const ( Discord WorkerType = "discord" DiscordProfile WorkerType = "discord-profile" DiscordChannelMessages WorkerType = "discord-channel-messages" - TelegramSentiment WorkerType = "telegram-sentiment" TelegramChannelMessages WorkerType = "telegram-channel-messages" DiscordGuildChannels WorkerType = "discord-guild-channels" DiscordUserGuilds WorkerType = "discord-user-guilds" @@ -35,7 +34,7 @@ func WorkerTypeToCategory(wt WorkerType) pubsub.WorkerCategory { case Discord, DiscordProfile, DiscordChannelMessages, DiscordGuildChannels, DiscordUserGuilds: logrus.Info("WorkerType is related to Discord") return pubsub.CategoryDiscord - case TelegramSentiment, TelegramChannelMessages: + case TelegramChannelMessages: logrus.Info("WorkerType is related to Telegram") return pubsub.CategoryTelegram case Twitter, TwitterFollowers, TwitterProfile: @@ -57,7 +56,7 @@ func WorkerTypeToDataSource(wt WorkerType) string { case Discord, DiscordProfile, DiscordChannelMessages, DiscordGuildChannels, DiscordUserGuilds: logrus.Info("WorkerType is related to Discord") return DataSourceDiscord - case TelegramSentiment, TelegramChannelMessages: + case TelegramChannelMessages: logrus.Info("WorkerType is related to Telegram") return DataSourceTelegram case Twitter, TwitterFollowers, TwitterProfile: diff --git a/pkg/workers/worker_manager.go b/pkg/workers/worker_manager.go index 4a4797f2..e31c395f 100644 --- a/pkg/workers/worker_manager.go +++ b/pkg/workers/worker_manager.go @@ -16,7 +16,6 @@ import ( "github.com/sirupsen/logrus" "github.com/masa-finance/masa-oracle/node" - "github.com/masa-finance/masa-oracle/pkg/config" "github.com/masa-finance/masa-oracle/pkg/event" "github.com/masa-finance/masa-oracle/pkg/pubsub" "github.com/masa-finance/masa-oracle/pkg/workers/handlers" @@ -214,7 +213,7 @@ func (whm *WorkHandlerManager) sendWorkToWorker(node *node.OracleNode, worker da } else { //whm.eventTracker.TrackRemoteWorkerConnection(worker.AddrInfo.ID.String()) logrus.Debugf("[+] Connection established with node: %s", worker.AddrInfo.ID.String()) - stream, err := node.ProtocolStream(ctxWithTimeout, worker.AddrInfo.ID, config.WorkerProtocol) + stream, err := node.ProtocolStream(ctxWithTimeout, worker.AddrInfo.ID, node.Options.WorkerProtocol) if err != nil { response.Error = fmt.Sprintf("error opening stream: %v", err) whm.eventTracker.TrackWorkerFailure(workRequest.WorkType, response.Error, worker.AddrInfo.ID.String())