diff --git a/pkg/api/handlers_data.go b/pkg/api/handlers_data.go index d3e02d20..ae125e5b 100644 --- a/pkg/api/handlers_data.go +++ b/pkg/api/handlers_data.go @@ -2,6 +2,7 @@ package api import ( "bytes" + "context" "encoding/base64" "encoding/json" "fmt" @@ -880,12 +881,28 @@ func (api *API) Test() gin.HandlerFunc { c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) } - err = api.Node.PubSubManager.Publish(config.TopicWithVersion(config.BlockTopic), bodyBytes) + /// test + stream, err := api.Node.Host.NewStream(context.Background(), api.Node.Host.ID(), config.ProtocolWithVersion(config.BlockTopic)) if err != nil { - logrus.Errorf("Error publishing block: %v", err) - c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + logrus.Errorf("Failed to open stream: %v", err) + return + } + defer stream.Close() + + // Write the message data to the stream + _, err = stream.Write(bodyBytes) + if err != nil { + logrus.Errorf("Failed to write to stream: %v", err) return } + /// test + + // err = api.Node.PubSubManager.Publish(config.TopicWithVersion(config.BlockTopic), bodyBytes) + // if err != nil { + // logrus.Errorf("Error publishing block: %v", err) + // c.JSON(http.StatusBadRequest, gin.H{"error": err.Error()}) + // return + // } c.JSON(http.StatusOK, gin.H{"message": "message sent"}) } diff --git a/pkg/oracle_node.go b/pkg/oracle_node.go index d8f6c302..1497c6cd 100644 --- a/pkg/oracle_node.go +++ b/pkg/oracle_node.go @@ -209,6 +209,7 @@ func (node *OracleNode) Start() (err error) { // IsStaked if node.IsStaked { node.Host.SetStreamHandler(config.ProtocolWithVersion(config.NodeGossipTopic), node.GossipNodeData) + node.Host.SetStreamHandler(config.ProtocolWithVersion(config.BlockTopic), node.BlockData) } node.Host.Network().Notify(node.NodeTracker) diff --git a/pkg/oracle_node_listener.go b/pkg/oracle_node_listener.go index 649c161d..9e3e453e 100644 --- a/pkg/oracle_node_listener.go +++ b/pkg/oracle_node_listener.go @@ -5,6 +5,7 @@ import ( "bytes" "context" "encoding/json" + "fmt" "io" "math" "time" @@ -215,6 +216,145 @@ func (node *OracleNode) GossipNodeData(stream network.Stream) { } } +func (node *OracleNode) BlockData(stream network.Stream) { + logrus.Info("BlockData") + + // Read from the stream + // data, err := io.ReadAll(stream) + // if err != nil { + // logrus.Errorf("Failed to read stream: %v", err) + // return + // } + + remotePeerId, nodeData, err := node.handleStreamData(stream) + if err != nil { + logrus.Errorf("Failed to read stream: %v", err) + return + } + // Only allow create blocks from other nodes ? + if remotePeerId.String() != nodeData.PeerId.String() { + rw := bufio.NewReadWriter(bufio.NewReader(stream), bufio.NewWriter(stream)) + // go readData(node, rw) + // go writeData(node, rw) + logrus.Info("BlockData to be ledgered", rw) + } + err = stream.Close() + if err != nil { + logrus.Errorf("Failed to close stream: %v", err) + } +} + +func readData(node *OracleNode, rw *bufio.ReadWriter) { + + for { + str, err := rw.ReadString('\n') + if err != nil { + + logrus.Fatal(err) + } + + if str == "" { + return + } + if str != "\n" { + + logrus.Info("readData", node.multiAddrs, str) + + // chain := make([]Block, 0) + // if err := json.Unmarshal([]byte(str), &chain); err != nil { + // log.Fatal(err) + // } + + // mutex.Lock() + // if len(chain) > len(Blockchain) { + // Blockchain = chain + // bytes, err := json.MarshalIndent(Blockchain, "", " ") + // if err != nil { + + // log.Fatal(err) + // } + // // Green console color: \x1b[32m + // // Reset console color: \x1b[0m + // fmt.Printf("\x1b[32m%s\x1b[0m> ", string(bytes)) + // } + // mutex.Unlock() + } + } +} + +func writeData(node *OracleNode, rw *bufio.ReadWriter) { + for { + sendData := fmt.Sprintf("peer connected %s", node.multiAddrs) + + _, err := rw.WriteString(sendData) + if err != nil { + logrus.Error("Error writing to buffer:", err) + return + } + err = rw.Flush() + if err != nil { + logrus.Error("Error flushing buffer:", err) + return + } + + time.Sleep(time.Second * 10) + } + + // go func() { + // for { + // time.Sleep(5 * time.Second) + // mutex.Lock() + // bytes, err := json.Marshal(Blockchain) + // if err != nil { + // log.Println(err) + // } + // mutex.Unlock() + + // mutex.Lock() + // rw.WriteString(fmt.Sprintf("%s\n", string(bytes))) + // rw.Flush() + // mutex.Unlock() + + // } + // }() + + // stdReader := bufio.NewReader(os.Stdin) + + // for { + // fmt.Print("> ") + // sendData, err := stdReader.ReadString('\n') + // if err != nil { + // log.Fatal(err) + // } + + // sendData = strings.Replace(sendData, "\n", "", -1) + // bpm, err := strconv.Atoi(sendData) + // if err != nil { + // log.Fatal(err) + // } + // newBlock := generateBlock(Blockchain[len(Blockchain)-1], bpm) + + // if isBlockValid(newBlock, Blockchain[len(Blockchain)-1]) { + // mutex.Lock() + // Blockchain = append(Blockchain, newBlock) + // mutex.Unlock() + // } + + // bytes, err := json.Marshal(Blockchain) + // if err != nil { + // log.Println(err) + // } + + // spew.Dump(Blockchain) + + // mutex.Lock() + // rw.WriteString(fmt.Sprintf("%s\n", string(bytes))) + // rw.Flush() + // mutex.Unlock() + // } + +} + // handleStreamData reads a network stream to get the remote peer ID // and NodeData. It returns the remote peer ID, NodeData, and any error. func (node *OracleNode) handleStreamData(stream network.Stream) (peer.ID, pubsub2.NodeData, error) { diff --git a/pkg/pubsub/block_tracker.go b/pkg/pubsub/block_tracker.go index cb25c6a3..06e1db6b 100644 --- a/pkg/pubsub/block_tracker.go +++ b/pkg/pubsub/block_tracker.go @@ -22,6 +22,22 @@ type BlockEventTracker struct { // HandleMessage implements subscription BlockEventTracker handler func (b *BlockEventTracker) HandleMessage(m *pubsub.Message) { logrus.Infof("chain -> Received block from: %s", m.ReceivedFrom) + + // Create a new stream to handle the block data + // stream, err := b.Node.Host.NewStream(context.Background(), m.ReceivedFrom, config.ProtocolWithVersion(config.BlockTopic)) + // if err != nil { + // logrus.Errorf("Failed to open stream: %v", err) + // return + // } + // defer stream.Close() + + // // Write the message data to the stream + // _, err = stream.Write(m.Data) + // if err != nil { + // logrus.Errorf("Failed to write to stream: %v", err) + // return + // } + var blocks Blocks err := json.Unmarshal(m.Data, &blocks) if err != nil { diff --git a/pkg/pubsub/node_data.go b/pkg/pubsub/node_data.go index b7a57ee8..7a488c74 100644 --- a/pkg/pubsub/node_data.go +++ b/pkg/pubsub/node_data.go @@ -99,26 +99,6 @@ func NewNodeData(addr multiaddr.Multiaddr, peerId peer.ID, publicKey string, act } } -//// CalculateCurrentUptime calculates the current uptime based on Unix timestamps. -//func (n *NodeData) CalculateCurrentUptime() { -// if n.Activity == ActivityJoined { -// n.CurrentUptime = time.Duration(n.LastUpdatedUnix-n.LastJoinedUnix) * time.Second -// } else { -// n.CurrentUptime = 0 -// } -// n.CurrentUptimeStr = n.CurrentUptime.String() -//} - -//// CalculateAccumulatedUptime calculates the accumulated uptime based on Unix timestamps. -//func (n *NodeData) CalculateAccumulatedUptime() { -// if n.FirstJoinedUnix > 0 && n.LastLeftUnix > 0 { -// n.AccumulatedUptime = time.Duration(n.LastLeftUnix-n.FirstJoinedUnix) * time.Second -// } else { -// n.AccumulatedUptime = 0 -// } -// n.AccumulatedUptimeStr = n.AccumulatedUptime.String() -//} - // Address returns a string representation of the NodeData's multiaddress // and peer ID in the format "/ip4/127.0.0.1/tcp/4001/p2p/QmcgpsyWgH8Y8ajJz1Cu72KnS5uo2Aa2LpzU7kinSupNKC". // This can be used by other nodes to connect to this node. diff --git a/pkg/subscriptions.go b/pkg/subscriptions.go index b4bc77de..9d1ad0e1 100644 --- a/pkg/subscriptions.go +++ b/pkg/subscriptions.go @@ -15,6 +15,10 @@ func SubscribeToTopics(node *OracleNode) error { return err } + //if err := node.PubSubManager.AddSubscription(config.TopicWithVersion(config.BlockTopic), node.BlockData, true); err != nil { + // return err + //} + // Subscribe to PublicKeyTopic to manage and verify public keys within the network. if err := node.PubSubManager.AddSubscription(config.TopicWithVersion(config.PublicKeyTopic), &pubsub2.PublicKeySubscriptionHandler{}, false); err != nil { return err