Skip to content

Commit

Permalink
WIP blockdata
Browse files Browse the repository at this point in the history
  • Loading branch information
jdutchak committed Jul 18, 2024
1 parent d38a9f6 commit a20187c
Show file tree
Hide file tree
Showing 6 changed files with 181 additions and 23 deletions.
23 changes: 20 additions & 3 deletions pkg/api/handlers_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package api

import (
"bytes"
"context"
"encoding/base64"
"encoding/json"
"fmt"
Expand Down Expand Up @@ -880,12 +881,28 @@ func (api *API) Test() gin.HandlerFunc {
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
}

err = api.Node.PubSubManager.Publish(config.TopicWithVersion(config.BlockTopic), bodyBytes)
/// test
stream, err := api.Node.Host.NewStream(context.Background(), api.Node.Host.ID(), config.ProtocolWithVersion(config.BlockTopic))
if err != nil {
logrus.Errorf("Error publishing block: %v", err)
c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
logrus.Errorf("Failed to open stream: %v", err)
return
}
defer stream.Close()

// Write the message data to the stream
_, err = stream.Write(bodyBytes)
if err != nil {
logrus.Errorf("Failed to write to stream: %v", err)
return
}
/// test

// err = api.Node.PubSubManager.Publish(config.TopicWithVersion(config.BlockTopic), bodyBytes)
// if err != nil {
// logrus.Errorf("Error publishing block: %v", err)
// c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()})
// return
// }

c.JSON(http.StatusOK, gin.H{"message": "message sent"})
}
Expand Down
1 change: 1 addition & 0 deletions pkg/oracle_node.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,7 @@ func (node *OracleNode) Start() (err error) {
// IsStaked
if node.IsStaked {
node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeGossipTopic), node.GossipNodeData)
node.Host.SetStreamHandler(config.ProtocolWithVersion(config.BlockTopic), node.BlockData)
}
node.Host.Network().Notify(node.NodeTracker)

Expand Down
140 changes: 140 additions & 0 deletions pkg/oracle_node_listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"math"
"time"
Expand Down Expand Up @@ -215,6 +216,145 @@ func (node *OracleNode) GossipNodeData(stream network.Stream) {
}
}

func (node *OracleNode) BlockData(stream network.Stream) {
logrus.Info("BlockData")

// Read from the stream
// data, err := io.ReadAll(stream)
// if err != nil {
// logrus.Errorf("Failed to read stream: %v", err)
// return
// }

remotePeerId, nodeData, err := node.handleStreamData(stream)
if err != nil {
logrus.Errorf("Failed to read stream: %v", err)
return
}
// Only allow create blocks from other nodes ?
if remotePeerId.String() != nodeData.PeerId.String() {
rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream))
// go readData(node, rw)
// go writeData(node, rw)
logrus.Info("BlockData to be ledgered", rw)
}
err = stream.Close()
if err != nil {
logrus.Errorf("Failed to close stream: %v", err)
}
}

func readData(node *OracleNode, rw *bufio.ReadWriter) {

for {
str, err := rw.ReadString('\n')
if err != nil {

logrus.Fatal(err)
}

if str == "" {
return
}
if str != "\n" {

logrus.Info("readData", node.multiAddrs, str)

// chain := make([]Block, 0)
// if err := json.Unmarshal([]byte(str), &chain); err != nil {
// log.Fatal(err)
// }

// mutex.Lock()
// if len(chain) > len(Blockchain) {
// Blockchain = chain
// bytes, err := json.MarshalIndent(Blockchain, "", " ")
// if err != nil {

// log.Fatal(err)
// }
// // Green console color: \x1b[32m
// // Reset console color: \x1b[0m
// fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes))
// }
// mutex.Unlock()
}
}
}

func writeData(node *OracleNode, rw *bufio.ReadWriter) {
for {
sendData := fmt.Sprintf("peer connected %s", node.multiAddrs)

_, err := rw.WriteString(sendData)
if err != nil {
logrus.Error("Error writing to buffer:", err)
return
}
err = rw.Flush()
if err != nil {
logrus.Error("Error flushing buffer:", err)
return
}

time.Sleep(time.Second * 10)
}

// go func() {
// for {
// time.Sleep(5 * time.Second)
// mutex.Lock()
// bytes, err := json.Marshal(Blockchain)
// if err != nil {
// log.Println(err)
// }
// mutex.Unlock()

// mutex.Lock()
// rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
// rw.Flush()
// mutex.Unlock()

// }
// }()

// stdReader := bufio.NewReader(os.Stdin)

// for {
// fmt.Print("> ")
// sendData, err := stdReader.ReadString('\n')
// if err != nil {
// log.Fatal(err)
// }

// sendData = strings.Replace(sendData, "\n", "", -1)
// bpm, err := strconv.Atoi(sendData)
// if err != nil {
// log.Fatal(err)
// }
// newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm)

// if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) {
// mutex.Lock()
// Blockchain = append(Blockchain, newBlock)
// mutex.Unlock()
// }

// bytes, err := json.Marshal(Blockchain)
// if err != nil {
// log.Println(err)
// }

// spew.Dump(Blockchain)

// mutex.Lock()
// rw.WriteString(fmt.Sprintf("%s\n", string(bytes)))
// rw.Flush()
// mutex.Unlock()
// }

}

// handleStreamData reads a network stream to get the remote peer ID
// and NodeData. It returns the remote peer ID, NodeData, and any error.
func (node *OracleNode) handleStreamData(stream network.Stream) (peer.ID, pubsub2.NodeData, error) {
Expand Down
16 changes: 16 additions & 0 deletions pkg/pubsub/block_tracker.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,22 @@ type BlockEventTracker struct {
// HandleMessage implements subscription BlockEventTracker handler
func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) {
logrus.Infof("chain -> Received block from: %s", m.ReceivedFrom)

// Create a new stream to handle the block data
// stream, err := b.Node.Host.NewStream(context.Background(), m.ReceivedFrom, config.ProtocolWithVersion(config.BlockTopic))
// if err != nil {
// logrus.Errorf("Failed to open stream: %v", err)
// return
// }
// defer stream.Close()

// // Write the message data to the stream
// _, err = stream.Write(m.Data)
// if err != nil {
// logrus.Errorf("Failed to write to stream: %v", err)
// return
// }

var blocks Blocks
err := json.Unmarshal(m.Data, &blocks)
if err != nil {
Expand Down
20 changes: 0 additions & 20 deletions pkg/pubsub/node_data.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,26 +99,6 @@ func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, act
}
}

//// CalculateCurrentUptime calculates the current uptime based on Unix timestamps.
//func (n *NodeData) CalculateCurrentUptime() {
// if n.Activity == ActivityJoined {
// n.CurrentUptime = time.Duration(n.LastUpdatedUnix-n.LastJoinedUnix) * time.Second
// } else {
// n.CurrentUptime = 0
// }
// n.CurrentUptimeStr = n.CurrentUptime.String()
//}

//// CalculateAccumulatedUptime calculates the accumulated uptime based on Unix timestamps.
//func (n *NodeData) CalculateAccumulatedUptime() {
// if n.FirstJoinedUnix > 0 && n.LastLeftUnix > 0 {
// n.AccumulatedUptime = time.Duration(n.LastLeftUnix-n.FirstJoinedUnix) * time.Second
// } else {
// n.AccumulatedUptime = 0
// }
// n.AccumulatedUptimeStr = n.AccumulatedUptime.String()
//}

// Address returns a string representation of the NodeData's multiaddress
// and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC".
// This can be used by other nodes to connect to this node.
Expand Down
4 changes: 4 additions & 0 deletions pkg/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@ func SubscribeToTopics(node *OracleNode) error {
return err
}

//if err := node.PubSubManager.AddSubscription(config.TopicWithVersion(config.BlockTopic), node.BlockData, true); err != nil {
// return err
//}

// Subscribe to PublicKeyTopic to manage and verify public keys within the network.
if err := node.PubSubManager.AddSubscription(config.TopicWithVersion(config.PublicKeyTopic), &pubsub2.PublicKeySubscriptionHandler{}, false); err != nil {
return err
Expand Down

0 comments on commit a20187c

Please sign in to comment.