Skip to content

Commit

Permalink
Merge PR: broadcast optimize (#1597)
Browse files Browse the repository at this point in the history
* chID str optimized

* optimized prometheus Counter

* updateSendBytesTotalMetrics

* TxMessage amino marshaller

* update test

* mempool reactor encodeMsg

* mempool reactor encodeMsg optimize

Co-authored-by: xiangjianmeng <[email protected]>
  • Loading branch information
cwbhhjl and xiangjianmeng authored Feb 25, 2022
1 parent 4ef1ff9 commit 4abdcfb
Show file tree
Hide file tree
Showing 7 changed files with 338 additions and 14 deletions.
19 changes: 19 additions & 0 deletions libs/tendermint/mempool/codec.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,25 @@ import (

var cdc = amino.NewCodec()

var txMessageAminoTypePrefix []byte

func init() {
RegisterMessages(cdc)

txMessageAminoTypePrefix = initTxMessageAminoTypePrefix(cdc)
}

func initTxMessageAminoTypePrefix(cdc *amino.Codec) []byte {
txMessageAminoTypePrefix := make([]byte, 8)
tpl, err := cdc.GetTypePrefix(&TxMessage{}, txMessageAminoTypePrefix)
if err != nil {
panic(err)
}
txMessageAminoTypePrefix = txMessageAminoTypePrefix[:tpl]
return txMessageAminoTypePrefix
}

// getTxMessageAminoTypePrefix returns the amino type prefix of TxMessage, the result is readonly!
func getTxMessageAminoTypePrefix() []byte {
return txMessageAminoTypePrefix
}
71 changes: 69 additions & 2 deletions libs/tendermint/mempool/reactor.go
Original file line number Diff line number Diff line change
@@ -1,13 +1,15 @@
package mempool

import (
"bytes"
"fmt"
"github.com/ethereum/go-ethereum/common"
"math"
"reflect"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"

abci "github.com/okex/exchain/libs/tendermint/abci/types"
cfg "github.com/okex/exchain/libs/tendermint/config"
"github.com/okex/exchain/libs/tendermint/libs/clist"
Expand Down Expand Up @@ -296,7 +298,7 @@ func (memR *Reactor) broadcastTxRoutine(peer p2p.Peer) {
}
}

success := peer.Send(MempoolChannel, cdc.MustMarshalBinaryBare(msg))
success := peer.Send(MempoolChannel, memR.encodeMsg(msg))
if !success {
time.Sleep(peerCatchupSleepIntervalMS * time.Millisecond)
continue
Expand Down Expand Up @@ -325,6 +327,18 @@ func RegisterMessages(cdc *amino.Codec) {
cdc.RegisterInterface((*Message)(nil), nil)
cdc.RegisterConcrete(&TxMessage{}, "tendermint/mempool/TxMessage", nil)
cdc.RegisterConcrete(&WtxMessage{}, "tendermint/mempool/WtxMessage", nil)

cdc.RegisterConcreteMarshaller("tendermint/mempool/TxMessage", func(codec *amino.Codec, i interface{}) ([]byte, error) {
txmp, ok := i.(*TxMessage)
if ok {
return txmp.MarshalToAmino(codec)
}
txm, ok := i.(TxMessage)
if ok {
return txm.MarshalToAmino(codec)
}
return nil, fmt.Errorf("%T is not a TxMessage", i)
})
}

func (memR *Reactor) decodeMsg(bz []byte) (msg Message, err error) {
Expand All @@ -336,13 +350,66 @@ func (memR *Reactor) decodeMsg(bz []byte) (msg Message, err error) {
return
}

func (memR *Reactor) encodeMsg(msg Message) []byte {
var ok bool
var txmp *TxMessage
var txm TxMessage
if txmp, ok = msg.(*TxMessage); !ok {
txmp = nil
if txm, ok = msg.(TxMessage); ok {
txmp = &txm
}
}
if txmp != nil {
buf := &bytes.Buffer{}
tp := getTxMessageAminoTypePrefix()
buf.Grow(len(tp) + txmp.AminoSize(cdc))
// we manually assemble the encoded bytes for performance
buf.Write(tp)
err := txmp.MarshalAminoTo(cdc, buf)
if err == nil {
return buf.Bytes()
}
}
return cdc.MustMarshalBinaryBare(msg)
}

//-------------------------------------

// TxMessage is a Message containing a transaction.
type TxMessage struct {
Tx types.Tx
}

func (m TxMessage) AminoSize(_ *amino.Codec) int {
size := 0
if len(m.Tx) > 0 {
size += 1 + amino.ByteSliceSize(m.Tx)
}
return size
}

func (m TxMessage) MarshalToAmino(cdc *amino.Codec) ([]byte, error) {
buf := new(bytes.Buffer)
buf.Grow(m.AminoSize(cdc))
err := m.MarshalAminoTo(cdc, buf)
if err != nil {
return nil, err
}
return buf.Bytes(), nil
}

func (m TxMessage) MarshalAminoTo(_ *amino.Codec, buf *bytes.Buffer) error {
if len(m.Tx) != 0 {
const pbKey = byte(1<<3 | amino.Typ3_ByteLength)
err := amino.EncodeByteSliceWithKeyToBuffer(buf, m.Tx, pbKey)
if err != nil {
return err
}
}
return nil
}

// String returns a string representation of the TxMessage.
func (m *TxMessage) String() string {
return fmt.Sprintf("[TxMessage %v]", m.Tx)
Expand Down
69 changes: 69 additions & 0 deletions libs/tendermint/mempool/reactor_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,14 @@
package mempool

import (
"math/rand"
"net"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"

"github.com/okex/exchain/libs/tendermint/crypto/ed25519"

"github.com/fortytw2/leaktest"
Expand Down Expand Up @@ -263,3 +266,69 @@ func TestVerifyWtx(t *testing.T) {
err = wtx.verify(nodeKeyWhitelist)
assert.Nil(t, err)
}

func TestTxMessageAmino(t *testing.T) {
testcases := []TxMessage{
{},
{[]byte{}},
{[]byte{1, 2, 3, 4, 5, 6, 7}},
}

var typePrefix = make([]byte, 8)
tpLen, err := cdc.GetTypePrefix(TxMessage{}, typePrefix)
require.NoError(t, err)
typePrefix = typePrefix[:tpLen]
reactor := Reactor{}

for _, tx := range testcases {
var m Message
m = tx
expectBz, err := cdc.MarshalBinaryBare(m)
require.NoError(t, err)
actualBz, err := tx.MarshalToAmino(cdc)
require.NoError(t, err)

require.Equal(t, expectBz, append(typePrefix, actualBz...))
require.Equal(t, len(expectBz), tpLen+tx.AminoSize(cdc))

actualBz, err = cdc.MarshalBinaryBareWithRegisteredMarshaller(tx)
require.NoError(t, err)

require.Equal(t, expectBz, actualBz)
require.Equal(t, cdc.MustMarshalBinaryBare(m), reactor.encodeMsg(&tx))
require.Equal(t, cdc.MustMarshalBinaryBare(m), reactor.encodeMsg(tx))
}
}

func BenchmarkTxMessageAminoMarshal(b *testing.B) {
var bz = make([]byte, 256)
rand.Read(bz)
txm := TxMessage{bz}
reactor := &Reactor{}
b.ResetTimer()

b.Run("amino", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, err := cdc.MarshalBinaryBare(&txm)
if err != nil {
b.Fatal(err)
}
}
})
b.Run("marshaller", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
_, err := cdc.MarshalBinaryBareWithRegisteredMarshaller(&txm)
if err != nil {
b.Fatal(err)
}
}
})
b.Run("encodeMsg", func(b *testing.B) {
b.ReportAllocs()
for i := 0; i < b.N; i++ {
reactor.encodeMsg(&txm)
}
})
}
7 changes: 6 additions & 1 deletion libs/tendermint/p2p/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,11 @@ type Metrics struct {
NumTxs metrics.Gauge
}

type peerChMetric struct {
PeerReceiveBytesTotal map[byte]metrics.Counter
PeerSendBytesTotal map[byte]metrics.Counter
}

// PrometheusMetrics returns Metrics build using Prometheus client library.
// Optionally, labels can be provided along with their values ("foo",
// "fooValue").
Expand All @@ -48,7 +53,7 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics {
Name: "peer_receive_bytes_total",
Help: "Number of bytes received from a given peer.",
}, append(labels, "peer_id", "chID")).With(labelsAndValues...),
PeerSendBytesTotal: prometheus.NewCounterFrom(stdprometheus.CounterOpts{
PeerSendBytesTotal: NewCounterFrom(stdprometheus.CounterOpts{
Namespace: namespace,
Subsystem: MetricsSubsystem,
Name: "peer_send_bytes_total",
Expand Down
66 changes: 55 additions & 11 deletions libs/tendermint/p2p/peer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,8 @@ import (
"net"
"time"

"github.com/go-kit/kit/metrics"

"github.com/okex/exchain/libs/tendermint/libs/cmap"
"github.com/okex/exchain/libs/tendermint/libs/log"
"github.com/okex/exchain/libs/tendermint/libs/service"
Expand All @@ -14,6 +16,30 @@ import (

const metricsTickerDuration = 10 * time.Second

var chIdStrTable = [256]string{
"0x0", "0x1", "0x2", "0x3", "0x4", "0x5", "0x6", "0x7", "0x8", "0x9", "0xa", "0xb", "0xc", "0xd", "0xe", "0xf",
"0x10", "0x11", "0x12", "0x13", "0x14", "0x15", "0x16", "0x17", "0x18", "0x19", "0x1a", "0x1b", "0x1c", "0x1d", "0x1e", "0x1f",
"0x20", "0x21", "0x22", "0x23", "0x24", "0x25", "0x26", "0x27", "0x28", "0x29", "0x2a", "0x2b", "0x2c", "0x2d", "0x2e", "0x2f",
"0x30", "0x31", "0x32", "0x33", "0x34", "0x35", "0x36", "0x37", "0x38", "0x39", "0x3a", "0x3b", "0x3c", "0x3d", "0x3e", "0x3f",
"0x40", "0x41", "0x42", "0x43", "0x44", "0x45", "0x46", "0x47", "0x48", "0x49", "0x4a", "0x4b", "0x4c", "0x4d", "0x4e", "0x4f",
"0x50", "0x51", "0x52", "0x53", "0x54", "0x55", "0x56", "0x57", "0x58", "0x59", "0x5a", "0x5b", "0x5c", "0x5d", "0x5e", "0x5f",
"0x60", "0x61", "0x62", "0x63", "0x64", "0x65", "0x66", "0x67", "0x68", "0x69", "0x6a", "0x6b", "0x6c", "0x6d", "0x6e", "0x6f",
"0x70", "0x71", "0x72", "0x73", "0x74", "0x75", "0x76", "0x77", "0x78", "0x79", "0x7a", "0x7b", "0x7c", "0x7d", "0x7e", "0x7f",
"0x80", "0x81", "0x82", "0x83", "0x84", "0x85", "0x86", "0x87", "0x88", "0x89", "0x8a", "0x8b", "0x8c", "0x8d", "0x8e", "0x8f",
"0x90", "0x91", "0x92", "0x93", "0x94", "0x95", "0x96", "0x97", "0x98", "0x99", "0x9a", "0x9b", "0x9c", "0x9d", "0x9e", "0x9f",
"0xa0", "0xa1", "0xa2", "0xa3", "0xa4", "0xa5", "0xa6", "0xa7", "0xa8", "0xa9", "0xaa", "0xab", "0xac", "0xad", "0xae", "0xaf",
"0xb0", "0xb1", "0xb2", "0xb3", "0xb4", "0xb5", "0xb6", "0xb7", "0xb8", "0xb9", "0xba", "0xbb", "0xbc", "0xbd", "0xbe", "0xbf",
"0xc0", "0xc1", "0xc2", "0xc3", "0xc4", "0xc5", "0xc6", "0xc7", "0xc8", "0xc9", "0xca", "0xcb", "0xcc", "0xcd", "0xce", "0xcf",
"0xd0", "0xd1", "0xd2", "0xd3", "0xd4", "0xd5", "0xd6", "0xd7", "0xd8", "0xd9", "0xda", "0xdb", "0xdc", "0xdd", "0xde", "0xdf",
"0xe0", "0xe1", "0xe2", "0xe3", "0xe4", "0xe5", "0xe6", "0xe7", "0xe8", "0xe9", "0xea", "0xeb", "0xec", "0xed", "0xee", "0xef",
"0xf0", "0xf1", "0xf2", "0xf3", "0xf4", "0xf5", "0xf6", "0xf7", "0xf8", "0xf9", "0xfa", "0xfb", "0xfc", "0xfd", "0xfe", "0xff",
}

func getChIdStr(chID byte) string {
// fmt.Sprintf("%#x", chID),
return chIdStrTable[chID]
}

// Peer is an interface representing a peer connected on a reactor.
type Peer interface {
service.Service
Expand Down Expand Up @@ -115,6 +141,7 @@ type peer struct {

metrics *Metrics
metricsTicker *time.Ticker
chMetrics peerChMetric
}

type PeerOption func(*peer)
Expand Down Expand Up @@ -150,6 +177,19 @@ func newPeer(
option(p)
}

p.chMetrics = peerChMetric{
PeerSendBytesTotal: make(map[byte]metrics.Counter),
PeerReceiveBytesTotal: make(map[byte]metrics.Counter),
}

if p.metrics != nil {
pid := string(p.ID())
for _, ch := range p.channels {
p.chMetrics.PeerSendBytesTotal[ch] = p.metrics.PeerSendBytesTotal.With("peer_id", pid, "chID", getChIdStr(ch))
p.chMetrics.PeerReceiveBytesTotal[ch] = p.metrics.PeerReceiveBytesTotal.With("peer_id", pid, "chID", getChIdStr(ch))
}
}

return p
}

Expand Down Expand Up @@ -249,11 +289,7 @@ func (p *peer) Send(chID byte, msgBytes []byte) bool {
}
res := p.mconn.Send(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.updateSendBytesTotalMetrics(chID, len(msgBytes))
}
return res
}
Expand All @@ -268,11 +304,7 @@ func (p *peer) TrySend(chID byte, msgBytes []byte) bool {
}
res := p.mconn.TrySend(chID, msgBytes)
if res {
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(len(msgBytes)))
p.updateSendBytesTotalMetrics(chID, len(msgBytes))
}
return res
}
Expand Down Expand Up @@ -359,6 +391,18 @@ func (p *peer) metricsReporter() {
}
}

func (p *peer) updateSendBytesTotalMetrics(chID byte, msgBytesLen int) {
if counter, ok := p.chMetrics.PeerSendBytesTotal[chID]; ok {
counter.Add(float64(msgBytesLen))
} else {
labels := []string{
"peer_id", string(p.ID()),
"chID", getChIdStr(chID),
}
p.metrics.PeerSendBytesTotal.With(labels...).Add(float64(msgBytesLen))
}
}

//------------------------------------------------------------------
// helper funcs

Expand All @@ -380,7 +424,7 @@ func createMConnection(
}
labels := []string{
"peer_id", string(p.ID()),
"chID", fmt.Sprintf("%#x", chID),
"chID", getChIdStr(chID),
}
p.metrics.PeerReceiveBytesTotal.With(labels...).Add(float64(len(msgBytes)))
reactor.Receive(chID, p, msgBytes)
Expand Down
Loading

0 comments on commit 4abdcfb

Please sign in to comment.