Skip to content

Commit

Permalink
persisting to chain and added get block from api and ipfs
Browse files Browse the repository at this point in the history
  • Loading branch information
jdutchak committed Jul 19, 2024
1 parent 34b0ed5 commit bec2123
Show file tree
Hide file tree
Showing 6 changed files with 101 additions and 182 deletions.
49 changes: 49 additions & 0 deletions pkg/api/handlers_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"os"
"sync"

"github.com/masa-finance/masa-oracle/pkg/chain"
"github.com/masa-finance/masa-oracle/pkg/workers"
"github.com/multiformats/go-multiaddr"
"github.com/sirupsen/logrus"
Expand Down Expand Up @@ -859,6 +860,54 @@ func (api *API) CfLlmChat() gin.HandlerFunc {
}
}

func (api *API) GetBlocks() gin.HandlerFunc {
return func(c *gin.Context) {

type BlockData struct {
InputData interface{} `json:"input_data"`
TransactionHash string `json:"transaction_hash"`
PreviousHash string `json:"previous_hash"`
TransactionNonce int `json:"transaction_nonce"`
}

type Blocks struct {
BlockData []BlockData `json:"block_data"`
}
var existingBlocks Blocks
blocks := chain.GetBlockchain(api.Node.Blockchain)

for _, block := range blocks {
var inputData interface{}
err := json.Unmarshal(block.Data, &inputData)
if err != nil {
inputData = string(block.Data) // Fallback to string if unmarshal fails
}

blockData := BlockData{
InputData: inputData,
TransactionHash: fmt.Sprintf("%x", block.Hash),
PreviousHash: fmt.Sprintf("%x", block.Link),
TransactionNonce: int(block.Nonce),
}
existingBlocks.BlockData = append(existingBlocks.BlockData, blockData)
}

jsonData, err := json.Marshal(existingBlocks)
if err != nil {
logrus.Error(err)
return
}
var blocksResponse Blocks
err = json.Unmarshal(jsonData, &blocksResponse)
if err != nil {
logrus.Error(err)
c.JSON(http.StatusInternalServerError, gin.H{"error": err.Error()})
return
}
c.JSON(http.StatusOK, blocksResponse)
}
}

func (api *API) Test() gin.HandlerFunc {
return func(c *gin.Context) {
var reqBody struct {
Expand Down
11 changes: 11 additions & 0 deletions pkg/api/routes.go
Original file line number Diff line number Diff line change
Expand Up @@ -434,8 +434,19 @@ func SetupRoutes(node *masa.OracleNode) *gin.Engine {
// @Router /chat/cf [post]
v1.POST("/chat/cf", API.CfLlmChat())

// @Summary Get Blocks
// @Description Retrieves the list of blocks from the blockchain
// @Tags Blocks
// @Accept json
// @Produce json
// @Success 200 {object} Blocks "Successfully retrieved blocks"
// @Failure 400 {object} ErrorResponse "Error retrieving blocks"
// @Router /blocks [get]
v1.GET("/blocks", API.GetBlocks())

// @note a test route
v1.POST("/test", API.Test())

}

// @Summary Node Status Page
Expand Down
16 changes: 16 additions & 0 deletions pkg/chain/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,3 +110,19 @@ func (c *Chain) GetBlock(hash []byte) (*Block, error) {
}
return block, nil
}

func GetBlockchain(c *Chain) []*Block {
var blockchain []*Block

each := func(b *Block) {
blockchain = append(blockchain, b)
}

err := c.IterateLink(each, func() {}, func() {})
if err != nil {
logrus.Errorf("Error iterating through blockchain: %v", err)
return nil
}

return blockchain
}
59 changes: 22 additions & 37 deletions pkg/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,7 +210,6 @@ func (node *OracleNode) Start() (err error) {

if node.IsStaked {
node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeGossipTopic), node.GossipNodeData)
node.Host.SetStreamHandler(config.ProtocolWithVersion(config.BlockTopic), node.BlockData)
}
node.Host.Network().Notify(node.NodeTracker)

Expand Down Expand Up @@ -368,15 +367,14 @@ var (
)

type BlockData struct {
InputData string `json:"input_data"`
TransactionHash string `json:"transaction_hash"`
PreviousHash string `json:"previous_hash"`
TransactionNonce int `json:"transaction_nonce"`
InputData interface{} `json:"input_data"`
TransactionHash string `json:"transaction_hash"`
PreviousHash string `json:"previous_hash"`
TransactionNonce int `json:"transaction_nonce"`
}

type Blocks struct {
TransactionHash string `json:"last_transaction_hash"`
BlockData []BlockData `json:"block_data"`
BlockData []BlockData `json:"block_data"`
}

type BlockEvents struct{}
Expand All @@ -389,8 +387,6 @@ type BlockEventTracker struct {
}

func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) {
logrus.Infof("chain -> Received block from: %s", m.ReceivedFrom)

var blockEvents BlockEvents
err := json.Unmarshal(m.Data, &blockEvents)
if err != nil {
Expand All @@ -403,37 +399,26 @@ func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) {
b.BlocksCh <- m
}

func updateBlocks(ctx context.Context, node *OracleNode, block *chain.Block) {

blockData := BlockData{
InputData: string(block.Data),
TransactionHash: fmt.Sprintf("%x", block.Hash),
PreviousHash: fmt.Sprintf("%x", block.Link),
TransactionNonce: int(block.Nonce),
}
func updateBlocks(ctx context.Context, node *OracleNode) {

var existingBlocks Blocks
existingBlocks.BlockData = append(existingBlocks.BlockData, blockData)
existingBlocks = Blocks{
TransactionHash: blockData.TransactionHash,
BlockData: []BlockData{blockData},
}
blocks := chain.GetBlockchain(node.Blockchain)

// exists, _ := node.DHT.GetValue(ctx, "/db/blocks")
// var existingBlocks Blocks
// if exists != nil {
// err := json.Unmarshal(exists, &existingBlocks)
// if err != nil {
// logrus.Errorf("Error unmarshalling existing block data: %v", err)
// }
// existingBlocks.BlockData = append(existingBlocks.BlockData, blockData)
// } else {
// existingBlocks = Blocks{
// TransactionHash: blockData.TransactionHash,
// BlockData: []BlockData{blockData},
// }
// }
for _, block := range blocks {
var inputData interface{}
err := json.Unmarshal(block.Data, &inputData)
if err != nil {
inputData = string(block.Data) // Fallback to string if unmarshal fails
}

blockData := BlockData{
InputData: inputData,
TransactionHash: fmt.Sprintf("%x", block.Hash),
PreviousHash: fmt.Sprintf("%x", block.Link),
TransactionNonce: int(block.Nonce),
}
existingBlocks.BlockData = append(existingBlocks.BlockData, blockData)
}
jsonData, err := json.Marshal(existingBlocks)
if err != nil {
logrus.Error(err)
Expand Down Expand Up @@ -495,7 +480,7 @@ func SubscribeToBlocks(ctx context.Context, node *OracleNode) {
}
b.Print()

updateBlocks(ctx, node, b)
updateBlocks(ctx, node)
}

case <-ctx.Done():
Expand Down
128 changes: 0 additions & 128 deletions pkg/oracle_node_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,134 +210,6 @@ func (node *OracleNode) GossipNodeData(stream network.Stream) {
}
}

func (node *OracleNode) BlockData(stream network.Stream) {
logrus.Info("BlockData stream from -> ", stream.Conn().RemotePeer())

data, err := io.ReadAll(stream)
if err != nil {
logrus.Errorf("Failed to read stream: %v", err)
return
}
logrus.Info("stream -> BlockData", data)
// go readData(node, rw)
// go writeData(node, rw)
err = stream.Close()
if err != nil {
logrus.Errorf("Failed to close stream: %v", err)
}
}

// func readData(node *OracleNode, rw *bufio.ReadWriter) {
//
// for {
// str, err := rw.ReadString('\n')
// if err != nil {
//
// logrus.Fatal(err)
// }
//
// if str == "" {
// return
// }
// if str != "\n" {
//
// logrus.Info("readData", node.multiAddrs, str)
//
// // chain := make([]Block, 0)
// // if err := json.Unmarshal([]byte(str), &chain); err != nil {
// // log.Fatal(err)
// // }
//
// // mutex.Lock()
// // if len(chain) > len(Blockchain) {
// // Blockchain = chain
// // bytes, err := json.MarshalIndent(Blockchain, "", " ")
// // if err != nil {
//
// // log.Fatal(err)
// // }
// // // Green console color: \x1b[32m
// // // Reset console color: \x1b[0m
// // fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
// // }
// // mutex.Unlock()
// }
// }
// }

//func writeData(node *OracleNode, rw *bufio.ReadWriter) {
// for {
// sendData := fmt.Sprintf("peer connected %s", node.multiAddrs)
//
// _, err := rw.WriteString(sendData)
// if err != nil {
// logrus.Error("Error writing to buffer:", err)
// return
// }
// err = rw.Flush()
// if err != nil {
// logrus.Error("Error flushing buffer:", err)
// return
// }
//
// time.Sleep(time.Second * 10)
// }
//
// // go func() {
// // for {
// // time.Sleep(5 * time.Second)
// // mutex.Lock()
// // bytes, err := json.Marshal(Blockchain)
// // if err != nil {
// // log.Println(err)
// // }
// // mutex.Unlock()
//
// // mutex.Lock()
// // rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
// // rw.Flush()
// // mutex.Unlock()
//
// // }
// // }()
//
// // stdReader := bufio.NewReader(os.Stdin)
//
// // for {
// // fmt.Print("> ")
// // sendData, err := stdReader.ReadString('\n')
// // if err != nil {
// // log.Fatal(err)
// // }
//
// // sendData = strings.Replace(sendData, "\n", "", -1)
// // bpm, err := strconv.Atoi(sendData)
// // if err != nil {
// // log.Fatal(err)
// // }
// // newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)
//
// // if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
// // mutex.Lock()
// // Blockchain = append(Blockchain, newBlock)
// // mutex.Unlock()
// // }
//
// // bytes, err := json.Marshal(Blockchain)
// // if err != nil {
// // log.Println(err)
// // }
//
// // spew.Dump(Blockchain)
//
// // mutex.Lock()
// // rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
// // rw.Flush()
// // mutex.Unlock()
// // }
//
//}

// handleStreamData reads a network stream to get the remote peer ID
// and NodeData. It returns the remote peer ID, NodeData, and any error.
func (node *OracleNode) handleStreamData(stream network.Stream) (peer.ID, pubsub2.NodeData, error) {
Expand Down
20 changes: 3 additions & 17 deletions pkg/workers/workers.go
Original file line number Diff line number Diff line change
Expand Up @@ -313,7 +313,7 @@ func SendWork(node *masa.OracleNode, m *pubsub2.Message) {
msg := &pubsub2.Message{}
err = json.Unmarshal([]byte(response.Value), msg)
if err != nil {
msg, err = getResponseMessage(result.(*messages.Response))
_, err = getResponseMessage(result.(*messages.Response))
if err != nil {
logrus.Debugf("Error getting response message: %v", err)
return
Expand Down Expand Up @@ -510,23 +510,9 @@ func processWork(data *pubsub2.Message, work string, startTime *time.Time, node
Duration: duration.Seconds(),
Timestamp: time.Now().Unix(),
}
logrus.Infof("[+] Work event: %v", workEvent)

// @todo
// Create a new stream to handle the block data
stream, err := node.Host.NewStream(context.Background(), peer.ID(workEvent.PeerId), config.ProtocolWithVersion(config.BlockTopic))
if err != nil {
logrus.Errorf("Failed to open stream: %v", err)
return
}
defer stream.Close()

// Write the message data to the stream
_, err = stream.Write(workEvent.Payload)
if err != nil {
logrus.Errorf("Failed to write to stream: %v", err)
return
}
// @todo
_ = node.PubSubManager.Publish(config.TopicWithVersion(config.BlockTopic), workEvent.Payload)

// updateRecords(node, workEvent)
}

0 comments on commit bec2123

Please sign in to comment.