Skip to content

Commit

Permalink
Fixed some review issues
Browse files Browse the repository at this point in the history
  • Loading branch information
esuwu committed Jan 13, 2025
1 parent 52377b4 commit fb0e42f
Show file tree
Hide file tree
Showing 9 changed files with 111 additions and 202 deletions.
23 changes: 12 additions & 11 deletions cmd/blockchaininfo/nats_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ func printBlockInfo(blockInfoProto *g.BlockInfo) error {
if err != nil {
return err
}
log.Println(string(blockInfoJSON))
zap.S().Info(string(blockInfoJSON))
return nil
}

Expand All @@ -41,14 +41,14 @@ func printContractInfo(contractInfoProto *g.L2ContractDataEntries, scheme proto.
// Delete data entries are not going to have "type"
prettyJSON, err := json.MarshalIndent(contractInfo, "", " ")
if err != nil {
log.Println("Error converting to pretty JSON:", err)
zap.S().Errorf("failed converting to pretty JSON, %v", err)
return err
}
heightStr := strconv.FormatUint(contractInfoProto.Height, 10)
// Write the pretty JSON to a file
err = os.WriteFile(path+heightStr+".json", prettyJSON, 0600)
if err != nil {
log.Println("Error writing to file:", err)
zap.S().Errorf("failed writing to file: %v", err)
return err
}

Expand All @@ -59,15 +59,15 @@ func receiveBlockUpdates(msg *nats.Msg) {
blockUpdatesInfo := new(g.BlockInfo)
unmrshlErr := blockUpdatesInfo.UnmarshalVT(msg.Data)
if unmrshlErr != nil {
log.Printf("failed to unmarshal block updates, %v", unmrshlErr)
zap.S().Errorf("failed to unmarshal block updates, %v", unmrshlErr)
return
}

err := printBlockInfo(blockUpdatesInfo)
if err != nil {
return
}
log.Printf("Received on %s:\n", msg.Subject)
zap.S().Infof("Received on %s:\n", msg.Subject)
}

func receiveContractUpdates(msg *nats.Msg, contractMsg []byte, scheme proto.Scheme, path string) []byte {
Expand All @@ -78,11 +78,11 @@ func receiveContractUpdates(msg *nats.Msg, contractMsg []byte, scheme proto.Sche
contractMsg = msg.Data[1:]
contractUpdatesInfo := new(g.L2ContractDataEntries)
if err := contractUpdatesInfo.UnmarshalVT(contractMsg); err != nil {
log.Printf("Failed to unmarshal contract updates: %v", err)
zap.S().Errorf("Failed to unmarshal contract updates: %v", err)
return contractMsg
}
if err := printContractInfo(contractUpdatesInfo, scheme, path); err != nil {
log.Printf("Failed to print contract info: %v", err)
zap.S().Errorf("Failed to print contract info: %v", err)
return contractMsg
}
contractMsg = nil
Expand All @@ -95,13 +95,13 @@ func receiveContractUpdates(msg *nats.Msg, contractMsg []byte, scheme proto.Sche
contractMsg = append(contractMsg, msg.Data[1:]...)
contractUpdatesInfo := new(g.L2ContractDataEntries)
if err := contractUpdatesInfo.UnmarshalVT(contractMsg); err != nil {
log.Printf("Failed to unmarshal contract updates: %v", err)
zap.S().Errorf("Failed to unmarshal contract updates: %v", err)
return contractMsg
}

go func() {
if err := printContractInfo(contractUpdatesInfo, scheme, path); err != nil {
log.Printf("Failed to print contract info updates: %v", err)
zap.S().Errorf("Failed to print contract info updates: %v", err)
}
}()
contractMsg = nil
Expand Down Expand Up @@ -132,7 +132,7 @@ func main() {
l2ContractAddress string
)
// Initialize the zap logger
l, err := zap.NewProduction()
logger, err := zap.NewProduction()
if err != nil {
log.Fatalf("failed to initialize zap logger: %v", err)
}
Expand All @@ -141,7 +141,8 @@ func main() {
if syncErr != nil {
log.Fatalf("failed to sync zap logger %v", syncErr)
}
}(l)
}(logger)
zap.ReplaceGlobals(logger)

flag.StringVar(&blockchainType, "blockchain-type", "testnet", "Blockchain scheme (e.g., stagenet, testnet, mainnet)")
flag.StringVar(&updatesPath, "updates-path", "", "File path to store contract updates")
Expand Down
92 changes: 58 additions & 34 deletions pkg/blockchaininfo/blockchaininfo.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,37 +3,80 @@ package blockchaininfo
import (
"bytes"
"encoding/binary"
"errors"
"fmt"
"strconv"
"strings"

"github.com/pkg/errors"
"github.com/wavesplatform/gowaves/pkg/proto"
)

const EpochKeyPrefix = "epoch_"
const blockMeta0xKeyPrefix = "block_0x"

// Helper function to read uint64 from bytes.
func readInt64(data *bytes.Reader) int64 {
func readInt64(data *bytes.Reader) (int64, error) {
var num int64
err := binary.Read(data, binary.BigEndian, &num)
if err != nil {
panic(fmt.Sprintf("Failed to read uint64: %v", err))
return 0, err
}
return num
return num, nil
}

// Decode base64 and extract blockHeight and height.
func extractEpochFromBlockMeta(metaBlockValueBytes []byte) int64 {
func extractEpochFromBlockMeta(metaBlockValueBytes []byte) (int64, error) {
// Create a bytes reader for easier parsing.
reader := bytes.NewReader(metaBlockValueBytes)

// Extract blockHeight and epoch.
readInt64(reader)
epoch := readInt64(reader)
_, err := readInt64(reader)
if err != nil {
return 0, errors.Errorf("failed to read the block height from blockMeta, %v", err)
}
epoch, err := readInt64(reader)
if err != nil {
return 0, errors.Errorf("failed to read the epoch from blockMeta, %v", err)
}

return epoch, nil
}

func filterEpochEntry(entry proto.DataEntry, beforeHeight uint64) ([]proto.DataEntry, error) {
key := entry.GetKey()
// Extract the part after "epoch_"
epochStr := key[len(EpochKeyPrefix):]

return epoch
epochNumber, err := strconv.ParseUint(epochStr, 10, 64)
if err != nil {
return nil, err
}

// Return this entry only if epochNumber is greater than beforeHeight
if epochNumber > beforeHeight {
return []proto.DataEntry{entry}, nil
}
return nil, nil
}

func filterBlock0xEntry(entry proto.DataEntry, beforeHeight uint64) ([]proto.DataEntry, error) {
// Extract blockHeight and height from base64.
binaryEntry, ok := entry.(*proto.BinaryDataEntry)
if !ok {
return nil, errors.New("failed to convert block meta key to binary data entry")
}
epoch, err := extractEpochFromBlockMeta(binaryEntry.Value)
if err != nil {
return nil, errors.Errorf("failed to filter data entries, %v", err)
}

if epoch < 0 {
return nil, errors.New("epoch is less than 0")
}
// Return this entry only if epochNumber is greater than beforeHeight
if uint64(epoch) > beforeHeight {
return []proto.DataEntry{entry}, nil
}
return nil, nil
}

func filterDataEntries(beforeHeight uint64, dataEntries []proto.DataEntry) ([]proto.DataEntry, error) {
Expand All @@ -45,38 +88,19 @@ func filterDataEntries(beforeHeight uint64, dataEntries []proto.DataEntry) ([]pr
switch {
// Filter "epoch_" prefixed keys.
case strings.HasPrefix(key, EpochKeyPrefix):
// Extract the numeric part after "epoch_"
epochStr := key[len(EpochKeyPrefix):]

// Convert the epoch number to uint64.
epochNumber, err := strconv.ParseUint(epochStr, 10, 64)
entryOrNil, err := filterEpochEntry(entry, beforeHeight)
if err != nil {
return nil, err
}

// Compare epoch number with beforeHeight.
if epochNumber > beforeHeight {
// Add to filtered list if epochNumber is greater.
filteredDataEntries = append(filteredDataEntries, entry)
}
filteredDataEntries = append(filteredDataEntries, entryOrNil...)

// Filter block_0x binary entries.
case strings.HasPrefix(key, blockMeta0xKeyPrefix):
// Extract blockHeight and height from base64.
binaryEntry, ok := entry.(*proto.BinaryDataEntry)
if !ok {
return nil, errors.New("failed to convert block meta key to binary data entry")
}
epoch := extractEpochFromBlockMeta(binaryEntry.Value)

if epoch < 0 {
return nil, errors.New("epoch is less than 0")
}
// Compare height with beforeHeight.
if uint64(epoch) > beforeHeight {
// Add to filtered list if height is less than beforeHeight.
filteredDataEntries = append(filteredDataEntries, entry)
entryOrNil, err := filterBlock0xEntry(entry, beforeHeight)
if err != nil {
return nil, err
}
filteredDataEntries = append(filteredDataEntries, entryOrNil...)

// Default case to handle non-epoch and non-base64 entries.
default:
Expand Down
7 changes: 7 additions & 0 deletions pkg/blockchaininfo/bupdates.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,14 @@ package blockchaininfo

import (
"context"
"time"

"github.com/wavesplatform/gowaves/pkg/proto"
"go.uber.org/zap"
)

const ChannelWriteTimeout = 10 * time.Second

type BlockchainUpdatesExtension struct {
ctx context.Context
enableBlockchainUpdatesPlugin bool
Expand Down Expand Up @@ -70,6 +74,9 @@ func (e *BlockchainUpdatesExtension) WriteBUpdates(bUpdates BUpdatesInfo) {
}
select {
case e.bUpdatesChannel <- bUpdates:
case <-time.After(ChannelWriteTimeout):
zap.S().Errorf("failed to write into the blockchain updates channel, out of time")
return
case <-e.ctx.Done():
e.close()
return
Expand Down
Loading

0 comments on commit fb0e42f

Please sign in to comment.