From bec2123ab52c5b8c53a37a6323c377a6bd565a43 Mon Sep 17 00:00:00 2001 From: jdutchak Date: Fri, 19 Jul 2024 10:15:52 -0400 Subject: [PATCH] persisting to chain and added get block from api and ipfs --- pkg/api/handlers_data.go | 49 ++++++++++++++ pkg/api/routes.go | 11 ++++ pkg/chain/chain.go | 16 +++++ pkg/oracle_node.go | 59 +++++++---------- pkg/oracle_node_listener.go | 128 ------------------------------------ pkg/workers/workers.go | 20 +----- 6 files changed, 101 insertions(+), 182 deletions(-) diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index d3e02d20..bb3680b4 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -10,6 +10,7 @@ import ( "os" "sync" + "github.com/masa-finance/masa-oracle/pkg/chain" "github.com/masa-finance/masa-oracle/pkg/workers" "github.com/multiformats/go-multiaddr" "github.com/sirupsen/logrus" @@ -859,6 +860,54 @@ func (api *API) CfLlmChat() gin.HandlerFunc { } } +func (api *API) GetBlocks() gin.HandlerFunc { + return func(c *gin.Context) { + + type BlockData struct { + InputData interface{} `json:"input_data"` + TransactionHash string `json:"transaction_hash"` + PreviousHash string `json:"previous_hash"` + TransactionNonce int `json:"transaction_nonce"` + } + + type Blocks struct { + BlockData []BlockData `json:"block_data"` + } + var existingBlocks Blocks + blocks := chain.GetBlockchain(api.Node.Blockchain) + + for _, block := range blocks { + var inputData interface{} + err := json.Unmarshal(block.Data, &inputData) + if err != nil { + inputData = string(block.Data) // Fallback to string if unmarshal fails + } + + blockData := BlockData{ + InputData: inputData, + TransactionHash: fmt.Sprintf("%x", block.Hash), + PreviousHash: fmt.Sprintf("%x", block.Link), + TransactionNonce: int(block.Nonce), + } + existingBlocks.BlockData = append(existingBlocks.BlockData, blockData) + } + + jsonData, err := json.Marshal(existingBlocks) + if err != nil { + logrus.Error(err) + return + } + var blocksResponse Blocks + err = json.Unmarshal(jsonData, &blocksResponse) + if err != nil { + logrus.Error(err) + c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()}) + return + } + c.JSON(http.StatusOK, blocksResponse) + } +} + func (api *API) Test() gin.HandlerFunc { return func(c *gin.Context) { var reqBody struct { diff --git a/pkg/api/routes.go b/pkg/api/routes.go index aec39add..dc76ae05 100644 --- a/pkg/api/routes.go +++ b/pkg/api/routes.go @@ -434,8 +434,19 @@ func SetupRoutes(node *masa.OracleNode) *gin.Engine { // @Router /chat/cf [post] v1.POST("/chat/cf", API.CfLlmChat()) + // @Summary Get Blocks + // @Description Retrieves the list of blocks from the blockchain + // @Tags Blocks + // @Accept json + // @Produce json + // @Success 200 {object} Blocks "Successfully retrieved blocks" + // @Failure 400 {object} ErrorResponse "Error retrieving blocks" + // @Router /blocks [get] + v1.GET("/blocks", API.GetBlocks()) + // @note a test route v1.POST("/test", API.Test()) + } // @Summary Node Status Page diff --git a/pkg/chain/chain.go b/pkg/chain/chain.go index 427fd477..b2dd3bcb 100644 --- a/pkg/chain/chain.go +++ b/pkg/chain/chain.go @@ -110,3 +110,19 @@ func (c *Chain) GetBlock(hash []byte) (*Block, error) { } return block, nil } + +func GetBlockchain(c *Chain) []*Block { + var blockchain []*Block + + each := func(b *Block) { + blockchain = append(blockchain, b) + } + + err := c.IterateLink(each, func() {}, func() {}) + if err != nil { + logrus.Errorf("Error iterating through blockchain: %v", err) + return nil + } + + return blockchain +} diff --git a/pkg/oracle_node.go b/pkg/oracle_node.go index f4e72c9d..a9a8285a 100644 --- a/pkg/oracle_node.go +++ b/pkg/oracle_node.go @@ -210,7 +210,6 @@ func (node *OracleNode) Start() (err error) { if node.IsStaked { node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeGossipTopic), node.GossipNodeData) - node.Host.SetStreamHandler(config.ProtocolWithVersion(config.BlockTopic), node.BlockData) } node.Host.Network().Notify(node.NodeTracker) @@ -368,15 +367,14 @@ var ( ) type BlockData struct { - InputData string `json:"input_data"` - TransactionHash string `json:"transaction_hash"` - PreviousHash string `json:"previous_hash"` - TransactionNonce int `json:"transaction_nonce"` + InputData interface{} `json:"input_data"` + TransactionHash string `json:"transaction_hash"` + PreviousHash string `json:"previous_hash"` + TransactionNonce int `json:"transaction_nonce"` } type Blocks struct { - TransactionHash string `json:"last_transaction_hash"` - BlockData []BlockData `json:"block_data"` + BlockData []BlockData `json:"block_data"` } type BlockEvents struct{} @@ -389,8 +387,6 @@ type BlockEventTracker struct { } func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) { - logrus.Infof("chain -> Received block from: %s", m.ReceivedFrom) - var blockEvents BlockEvents err := json.Unmarshal(m.Data, &blockEvents) if err != nil { @@ -403,37 +399,26 @@ func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) { b.BlocksCh <- m } -func updateBlocks(ctx context.Context, node *OracleNode, block *chain.Block) { - - blockData := BlockData{ - InputData: string(block.Data), - TransactionHash: fmt.Sprintf("%x", block.Hash), - PreviousHash: fmt.Sprintf("%x", block.Link), - TransactionNonce: int(block.Nonce), - } +func updateBlocks(ctx context.Context, node *OracleNode) { var existingBlocks Blocks - existingBlocks.BlockData = append(existingBlocks.BlockData, blockData) - existingBlocks = Blocks{ - TransactionHash: blockData.TransactionHash, - BlockData: []BlockData{blockData}, - } + blocks := chain.GetBlockchain(node.Blockchain) - // exists, _ := node.DHT.GetValue(ctx, "/db/blocks") - // var existingBlocks Blocks - // if exists != nil { - // err := json.Unmarshal(exists, &existingBlocks) - // if err != nil { - // logrus.Errorf("Error unmarshalling existing block data: %v", err) - // } - // existingBlocks.BlockData = append(existingBlocks.BlockData, blockData) - // } else { - // existingBlocks = Blocks{ - // TransactionHash: blockData.TransactionHash, - // BlockData: []BlockData{blockData}, - // } - // } + for _, block := range blocks { + var inputData interface{} + err := json.Unmarshal(block.Data, &inputData) + if err != nil { + inputData = string(block.Data) // Fallback to string if unmarshal fails + } + blockData := BlockData{ + InputData: inputData, + TransactionHash: fmt.Sprintf("%x", block.Hash), + PreviousHash: fmt.Sprintf("%x", block.Link), + TransactionNonce: int(block.Nonce), + } + existingBlocks.BlockData = append(existingBlocks.BlockData, blockData) + } jsonData, err := json.Marshal(existingBlocks) if err != nil { logrus.Error(err) @@ -495,7 +480,7 @@ func SubscribeToBlocks(ctx context.Context, node *OracleNode) { } b.Print() - updateBlocks(ctx, node, b) + updateBlocks(ctx, node) } case <-ctx.Done(): diff --git a/pkg/oracle_node_listener.go b/pkg/oracle_node_listener.go index 6517cb66..1bc66bc7 100644 --- a/pkg/oracle_node_listener.go +++ b/pkg/oracle_node_listener.go @@ -210,134 +210,6 @@ func (node *OracleNode) GossipNodeData(stream network.Stream) { } } -func (node *OracleNode) BlockData(stream network.Stream) { - logrus.Info("BlockData stream from -> ", stream.Conn().RemotePeer()) - - data, err := io.ReadAll(stream) - if err != nil { - logrus.Errorf("Failed to read stream: %v", err) - return - } - logrus.Info("stream -> BlockData", data) - // go readData(node, rw) - // go writeData(node, rw) - err = stream.Close() - if err != nil { - logrus.Errorf("Failed to close stream: %v", err) - } -} - -// func readData(node *OracleNode, rw *bufio.ReadWriter) { -// -// for { -// str, err := rw.ReadString('\n') -// if err != nil { -// -// logrus.Fatal(err) -// } -// -// if str == "" { -// return -// } -// if str != "\n" { -// -// logrus.Info("readData", node.multiAddrs, str) -// -// // chain := make([]Block, 0) -// // if err := json.Unmarshal([]byte(str), &chain); err != nil { -// // log.Fatal(err) -// // } -// -// // mutex.Lock() -// // if len(chain) > len(Blockchain) { -// // Blockchain = chain -// // bytes, err := json.MarshalIndent(Blockchain, "", " ") -// // if err != nil { -// -// // log.Fatal(err) -// // } -// // // Green console color: \x1b[32m -// // // Reset console color: \x1b[0m -// // fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes)) -// // } -// // mutex.Unlock() -// } -// } -// } - -//func writeData(node *OracleNode, rw *bufio.ReadWriter) { -// for { -// sendData := fmt.Sprintf("peer connected %s", node.multiAddrs) -// -// _, err := rw.WriteString(sendData) -// if err != nil { -// logrus.Error("Error writing to buffer:", err) -// return -// } -// err = rw.Flush() -// if err != nil { -// logrus.Error("Error flushing buffer:", err) -// return -// } -// -// time.Sleep(time.Second * 10) -// } -// -// // go func() { -// // for { -// // time.Sleep(5 * time.Second) -// // mutex.Lock() -// // bytes, err := json.Marshal(Blockchain) -// // if err != nil { -// // log.Println(err) -// // } -// // mutex.Unlock() -// -// // mutex.Lock() -// // rw.WriteString(fmt.Sprintf("%s\n", string(bytes))) -// // rw.Flush() -// // mutex.Unlock() -// -// // } -// // }() -// -// // stdReader := bufio.NewReader(os.Stdin) -// -// // for { -// // fmt.Print("> ") -// // sendData, err := stdReader.ReadString('\n') -// // if err != nil { -// // log.Fatal(err) -// // } -// -// // sendData = strings.Replace(sendData, "\n", "", -1) -// // bpm, err := strconv.Atoi(sendData) -// // if err != nil { -// // log.Fatal(err) -// // } -// // newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm) -// -// // if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) { -// // mutex.Lock() -// // Blockchain = append(Blockchain, newBlock) -// // mutex.Unlock() -// // } -// -// // bytes, err := json.Marshal(Blockchain) -// // if err != nil { -// // log.Println(err) -// // } -// -// // spew.Dump(Blockchain) -// -// // mutex.Lock() -// // rw.WriteString(fmt.Sprintf("%s\n", string(bytes))) -// // rw.Flush() -// // mutex.Unlock() -// // } -// -//} - // handleStreamData reads a network stream to get the remote peer ID // and NodeData. It returns the remote peer ID, NodeData, and any error. func (node *OracleNode) handleStreamData(stream network.Stream) (peer.ID, pubsub2.NodeData, error) { diff --git a/pkg/workers/workers.go b/pkg/workers/workers.go index 9b9d4fa8..8731d2e7 100644 --- a/pkg/workers/workers.go +++ b/pkg/workers/workers.go @@ -313,7 +313,7 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) { msg := &pubsub2.Message{} err = json.Unmarshal([]byte(response.Value), msg) if err != nil { - msg, err = getResponseMessage(result.(*messages.Response)) + _, err = getResponseMessage(result.(*messages.Response)) if err != nil { logrus.Debugf("Error getting response message: %v", err) return @@ -510,23 +510,9 @@ func processWork(data *pubsub2.Message, work string, startTime *time.Time, node Duration: duration.Seconds(), Timestamp: time.Now().Unix(), } + logrus.Infof("[+] Work event: %v", workEvent) - // @todo - // Create a new stream to handle the block data - stream, err := node.Host.NewStream(context.Background(), peer.ID(workEvent.PeerId), config.ProtocolWithVersion(config.BlockTopic)) - if err != nil { - logrus.Errorf("Failed to open stream: %v", err) - return - } - defer stream.Close() - - // Write the message data to the stream - _, err = stream.Write(workEvent.Payload) - if err != nil { - logrus.Errorf("Failed to write to stream: %v", err) - return - } - // @todo + _ = node.PubSubManager.Publish(config.TopicWithVersion(config.BlockTopic), workEvent.Payload) // updateRecords(node, workEvent) }