Skip to content

Commit

Permalink
chore(refactor): create the node package
Browse files Browse the repository at this point in the history
Signed-off-by: mudler <[email protected]>
  • Loading branch information
mudler committed Sep 17, 2024
1 parent 13e2b2d commit 919f152
Show file tree
Hide file tree
Showing 17 changed files with 304 additions and 261 deletions.
26 changes: 13 additions & 13 deletions cmd/masa-node/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (

"github.com/sirupsen/logrus"

masa "github.com/masa-finance/masa-oracle/pkg"
"github.com/masa-finance/masa-oracle/node"
"github.com/masa-finance/masa-oracle/pkg/api"
"github.com/masa-finance/masa-oracle/pkg/config"
"github.com/masa-finance/masa-oracle/pkg/db"
Expand Down Expand Up @@ -74,39 +74,39 @@ func main() {
isValidator := cfg.Validator

// Create a new OracleNode
node, err := masa.NewOracleNode(ctx, config.EnableStaked)
masaNode, err := node.NewOracleNode(ctx, config.EnableStaked)
if err != nil {
logrus.Fatal(err)
}
err = node.Start()
err = masaNode.Start()
if err != nil {
logrus.Fatal(err)
}

node.NodeTracker.GetAllNodeData()
masaNode.NodeTracker.GetAllNodeData()
if cfg.TwitterScraper && cfg.DiscordScraper && cfg.WebScraper {
logrus.Warn("[+] Node is set as all types of scrapers. This may not be intended behavior.")
}

if cfg.AllowedPeer {
cfg.AllowedPeerId = node.Host.ID().String()
cfg.AllowedPeerId = masaNode.Host.ID().String()
cfg.AllowedPeerPublicKey = keyManager.HexPubKey
logrus.Infof("[+] Allowed peer with ID: %s and PubKey: %s", cfg.AllowedPeerId, cfg.AllowedPeerPublicKey)
} else {
logrus.Warn("[-] This node is not set as the allowed peer")
}

// Init cache resolver
db.InitResolverCache(node, keyManager)
db.InitResolverCache(masaNode, keyManager)

// Subscribe and if actor start monitoring actor workers
// considering all that matters is if the node is staked
// and other peers can do work we only need to check this here
// if this peer can or cannot scrape or write that is checked in other places
if node.IsStaked {
node.Host.SetStreamHandler(config.ProtocolWithVersion(config.WorkerProtocol), workers.GetWorkHandlerManager().HandleWorkerStream)
go masa.SubscribeToBlocks(ctx, node)
go node.NodeTracker.ClearExpiredWorkerTimeouts()
if masaNode.IsStaked {
masaNode.Host.SetStreamHandler(config.ProtocolWithVersion(config.WorkerProtocol), workers.GetWorkHandlerManager().HandleWorkerStream)
go node.SubscribeToBlocks(ctx, masaNode)
go masaNode.NodeTracker.ClearExpiredWorkerTimeouts()
}

// Listen for SIGINT (CTRL+C)
Expand All @@ -116,7 +116,7 @@ func main() {
// Cancel the context when SIGINT is received
go func() {
<-c
nodeData := node.NodeTracker.GetNodeData(node.Host.ID().String())
nodeData := masaNode.NodeTracker.GetNodeData(masaNode.Host.ID().String())
if nodeData != nil {
nodeData.Left()
}
Expand All @@ -130,7 +130,7 @@ func main() {
}
}()

router := api.SetupRoutes(node)
router := api.SetupRoutes(masaNode)
go func() {
err = router.Run()
if err != nil {
Expand All @@ -139,7 +139,7 @@ func main() {
}()

// Get the multiaddress and IP address of the node
multiAddr := node.GetMultiAddrs() // Get the multiaddress
multiAddr := masaNode.GetMultiAddrs() // Get the multiaddress
ipAddr, err := multiAddr.ValueForProtocol(multiaddr.P_IP4) // Get the IP address
// Display the welcome message with the multiaddress and IP address
config.DisplayWelcomeMessage(multiAddr.String(), ipAddr, keyManager.EthAddress, isStaked, isValidator, cfg.TwitterScraper, cfg.TelegramScraper, cfg.DiscordScraper, cfg.WebScraper, versioning.ApplicationVersion, versioning.ProtocolVersion)
Expand Down
214 changes: 214 additions & 0 deletions node/blockchain.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,214 @@
package node

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
"os"
"reflect"
"sync"
"time"

shell "github.com/ipfs/go-ipfs-api"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/masa-finance/masa-oracle/pkg/chain"
"github.com/sirupsen/logrus"
)

// Blockchain Implementation
var (
blocksCh = make(chan *pubsub.Message)
)

type BlockData struct {
Block uint64 `json:"block"`
InputData interface{} `json:"input_data"`
TransactionHash string `json:"transaction_hash"`
PreviousHash string `json:"previous_hash"`
TransactionNonce int `json:"nonce"`
}

type Blocks struct {
BlockData []BlockData `json:"blocks"`
}

type BlockEvents struct{}

type BlockEventTracker struct {
BlockEvents []BlockEvents
BlockTopic *pubsub.Topic
mu sync.Mutex
}

// HandleMessage processes incoming pubsub messages containing block events.
// It unmarshals the message data into a slice of BlockEvents and appends them
// to the tracker's BlockEvents slice.
func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) {
var blockEvents any

// Try to decode as base64 first
decodedData, err := base64.StdEncoding.DecodeString(string(m.Data))
if err == nil {
m.Data = decodedData
}

// Try to unmarshal as JSON
err = json.Unmarshal(m.Data, &blockEvents)
if err != nil {
// If JSON unmarshal fails, try to interpret as string
blockEvents = string(m.Data)
}

b.mu.Lock()
defer b.mu.Unlock()

switch v := blockEvents.(type) {
case []BlockEvents:
b.BlockEvents = append(b.BlockEvents, v...)
case BlockEvents:
b.BlockEvents = append(b.BlockEvents, v)
case map[string]interface{}:
// Convert map to BlockEvents struct
newBlockEvent := BlockEvents{}
// You might need to add logic here to properly convert the map to BlockEvents
b.BlockEvents = append(b.BlockEvents, newBlockEvent)
case []interface{}:
// Convert each item in the slice to BlockEvents
for _, item := range v {
if be, ok := item.(BlockEvents); ok {
b.BlockEvents = append(b.BlockEvents, be)
}
}
case string:
// Handle string data
newBlockEvent := BlockEvents{}
// You might need to add logic here to properly convert the string to BlockEvents
b.BlockEvents = append(b.BlockEvents, newBlockEvent)
default:
logrus.Warnf("[-] Unexpected data type in message: %v", reflect.TypeOf(v))
}

blocksCh <- m
}

func updateBlocks(ctx context.Context, node *OracleNode) error {

var existingBlocks Blocks
blocks := chain.GetBlockchain(node.Blockchain)

for _, block := range blocks {
var inputData interface{}
err := json.Unmarshal(block.Data, &inputData)
if err != nil {
inputData = string(block.Data) // Fallback to string if unmarshal fails
}

blockData := BlockData{
Block: block.Block,
InputData: base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf("%v", inputData))),
TransactionHash: fmt.Sprintf("%x", block.Hash),
PreviousHash: fmt.Sprintf("%x", block.Link),
TransactionNonce: int(block.Nonce),
}
existingBlocks.BlockData = append(existingBlocks.BlockData, blockData)
}
jsonData, err := json.Marshal(existingBlocks)
if err != nil {
return err
}

err = node.DHT.PutValue(ctx, "/db/blocks", jsonData)
if err != nil {
logrus.Warningf("[-] Unable to store block on DHT: %v", err)
}

if os.Getenv("IPFS_URL") != "" {

infuraURL := fmt.Sprintf("https://%s:%s@%s", os.Getenv("PID"), os.Getenv("PS"), os.Getenv("IPFS_URL"))
sh := shell.NewShell(infuraURL)

jsonBytes, err := json.Marshal(jsonData)
if err != nil {
logrus.Errorf("[-] Error marshalling JSON: %s", err)
}

reader := bytes.NewReader(jsonBytes)

hash, err := sh.AddWithOpts(reader, true, true)
if err != nil {
logrus.Errorf("[-] Error persisting to IPFS: %s", err)
} else {
logrus.Printf("[+] Ledger persisted with IPFS hash: https://dwn.infura-ipfs.io/ipfs/%s\n", hash)
_ = node.DHT.PutValue(ctx, "/db/ipfs", []byte(fmt.Sprintf("https://dwn.infura-ipfs.io/ipfs/%s", hash)))

}
}

return nil
}

func SubscribeToBlocks(ctx context.Context, node *OracleNode) {
if !node.IsValidator {
return
}

go func() {
err := node.Blockchain.Init()
if err != nil {
logrus.Error(err)
}
}()

updateTicker := time.NewTicker(time.Second * 60)
defer updateTicker.Stop()

for {
select {
case block, ok := <-blocksCh:
if !ok {
logrus.Error("[-] Block channel closed")
return
}
if err := processBlock(node, block); err != nil {
logrus.Errorf("[-] Error processing block: %v", err)
// Consider adding a retry mechanism or circuit breaker here
}

case <-updateTicker.C:
logrus.Info("[+] blockchain tick")
if err := updateBlocks(ctx, node); err != nil {
logrus.Errorf("[-] Error updating blocks: %v", err)
// Consider adding a retry mechanism or circuit breaker here
}

case <-ctx.Done():
logrus.Info("[+] Context cancelled, stopping block subscription")
return
}
}
}

func processBlock(node *OracleNode, block *pubsub.Message) error {
blocks := chain.GetBlockchain(node.Blockchain)
for _, b := range blocks {
if string(b.Data) == string(block.Data) {
return nil // Block already exists
}
}

if err := node.Blockchain.AddBlock(block.Data); err != nil {
return fmt.Errorf("[-] failed to add block: %w", err)
}

if node.Blockchain.LastHash != nil {
b, err := node.Blockchain.GetBlock(node.Blockchain.LastHash)
if err != nil {
return fmt.Errorf("[-] failed to get last block: %w", err)
}
b.Print()
}

return nil
}
Loading

0 comments on commit 919f152

Please sign in to comment.