Skip to content

Commit

Permalink
Merge PR: push pendingtx to kafka (#942)
Browse files Browse the repository at this point in the history
* push pendingtx to kafka

* delete debug code

* add some log

* fix

* update kafka msg type

* change kafkaAddr

Co-authored-by: Zhong Qiu <[email protected]>
Co-authored-by: MengXiangJian <[email protected]>
  • Loading branch information
3 people authored Aug 17, 2021
1 parent 2836a14 commit 2799e58
Show file tree
Hide file tree
Showing 5 changed files with 146 additions and 6 deletions.
6 changes: 3 additions & 3 deletions app/app.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@ package app

import (
"fmt"
"github.com/spf13/viper"
"io"
"math/big"
"os"
Expand Down Expand Up @@ -48,6 +47,7 @@ import (
"github.com/okex/exchain/x/staking"
"github.com/okex/exchain/x/stream"
"github.com/okex/exchain/x/token"
"github.com/spf13/viper"
abci "github.com/tendermint/tendermint/abci/types"
"github.com/tendermint/tendermint/crypto/tmhash"
"github.com/tendermint/tendermint/libs/log"
Expand All @@ -63,7 +63,7 @@ func init() {
}

const (
appName = "OKExChain"
appName = "OKExChain"
DynamicGpWeight = "dynamic-gp-weight"
EnableDynamicGp = "enable-dynamic-gp"
)
Expand Down Expand Up @@ -226,7 +226,7 @@ func NewOKExChainApp(
}

gpWeight := viper.GetInt(DynamicGpWeight)
if gpWeight == 0 {
if gpWeight <= 0 {
gpWeight = 1
} else if gpWeight > 100 {
gpWeight = 100
Expand Down
17 changes: 14 additions & 3 deletions app/rpc/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,18 @@ import (
"os"
"strings"

sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/spf13/viper"

"github.com/cosmos/cosmos-sdk/client/flags"
"github.com/cosmos/cosmos-sdk/client/input"
"github.com/cosmos/cosmos-sdk/client/lcd"
"github.com/cosmos/cosmos-sdk/crypto/keys"
cmserver "github.com/cosmos/cosmos-sdk/server"
sdk "github.com/cosmos/cosmos-sdk/types"
"github.com/ethereum/go-ethereum/rpc"
"github.com/okex/exchain/app/crypto/ethsecp256k1"
"github.com/okex/exchain/app/crypto/hd"
"github.com/okex/exchain/app/rpc/pendingtx"
"github.com/okex/exchain/app/rpc/websockets"
"github.com/spf13/viper"
)

const (
Expand All @@ -30,6 +30,8 @@ const (
FlagRateLimitBurst = "rpc.rate-limit-burst"
FlagEnableMonitor = "rpc.enable-monitor"
FlagDisableAPI = "rpc.disable-api"
FlagKafkaAddr = "pendingtx.kafka-addr"
FlagKafkaTopic = "pendingtx.kafka-topic"

MetricsNamespace = "x"
// MetricsSubsystem is a subsystem shared by all metrics exposed by this package.
Expand Down Expand Up @@ -87,6 +89,15 @@ func RegisterRoutes(rs *lcd.RestServer) {
websocketAddr := viper.GetString(flagWebsocket)
ws := websockets.NewServer(rs.CliCtx, rs.Logger(), websocketAddr)
ws.Start()

// pending tx watcher
kafkaAddrs := viper.GetString(FlagKafkaAddr)
kafkaTopic := viper.GetString(FlagKafkaTopic)
if kafkaAddrs != "" && kafkaTopic != "" {
kafkaClient := pendingtx.NewKafkaClient(strings.Split(kafkaAddrs, ","), kafkaTopic)
ptw := pendingtx.NewWatcher(rs.CliCtx, rs.Logger(), kafkaClient)
ptw.Start()
}
}

func unlockKeyFromNameAndPassphrase(accountNames []string, passphrase string) ([]ethsecp256k1.PrivKey, error) {
Expand Down
49 changes: 49 additions & 0 deletions app/rpc/pendingtx/kafka.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,49 @@
package pendingtx

import (
"context"
"encoding/json"

rpctypes "github.com/okex/exchain/app/rpc/types"
"github.com/segmentio/kafka-go"
)

type KafkaClient struct {
Topic string
*kafka.Writer
}

func NewKafkaClient(addrs []string, topic string) *KafkaClient {
return &KafkaClient{
Topic: topic,
Writer: kafka.NewWriter(kafka.WriterConfig{
Brokers: addrs,
Topic: topic,
Balancer: &kafka.LeastBytes{},
}),
}
}

type KafkaMsg struct {
Topic string `json:"topic"`
Source interface{} `json:"source"`
Data *rpctypes.Transaction `json:"data"`
}

func (kc *KafkaClient) Send(hash []byte, tx *rpctypes.Transaction) error {
msg, err := json.Marshal(KafkaMsg{
Topic: kc.Topic,
Data: tx,
})
if err != nil {
return err
}

// Automatic retries and reconnections on errors.
return kc.WriteMessages(context.Background(),
kafka.Message{
Key: hash,
Value: msg,
},
)
}
77 changes: 77 additions & 0 deletions app/rpc/pendingtx/watcher.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,77 @@
package pendingtx

import (
"fmt"

"github.com/cosmos/cosmos-sdk/client/context"
"github.com/ethereum/go-ethereum/common"
rpcfilters "github.com/okex/exchain/app/rpc/namespaces/eth/filters"
rpctypes "github.com/okex/exchain/app/rpc/types"
"github.com/tendermint/tendermint/libs/log"
coretypes "github.com/tendermint/tendermint/rpc/core/types"
tmtypes "github.com/tendermint/tendermint/types"
)

type Watcher struct {
clientCtx context.CLIContext
events *rpcfilters.EventSystem
logger log.Logger

sender Sender
}

type Sender interface {
Send(hash []byte, tx *rpctypes.Transaction) error
}

func NewWatcher(clientCtx context.CLIContext, log log.Logger, sender Sender) *Watcher {
return &Watcher{
clientCtx: clientCtx,
events: rpcfilters.NewEventSystem(clientCtx.Client),
logger: log.With("module", "pendingtx-watcher"),

sender: sender,
}
}

func (w *Watcher) Start() {
sub, _, err := w.events.SubscribePendingTxs()
if err != nil {
w.logger.Error("error creating block filter", "error", err.Error())
}

go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) {
for {
select {
case ev := <-txsCh:
data, ok := ev.Data.(tmtypes.EventDataTx)
if !ok {
w.logger.Error(fmt.Sprintf("invalid data type %T, expected EventDataTx", ev.Data), "ID", sub.ID())
continue
}
txHash := common.BytesToHash(data.Tx.Hash())
w.logger.Debug("receive tx from mempool", "txHash=", txHash.String())

ethTx, err := rpctypes.RawTxToEthTx(w.clientCtx, data.Tx)
if err != nil {
w.logger.Error("failed to decode raw tx to eth tx", "hash", txHash.String(), "error", err)
continue
}

tx, err := rpctypes.NewTransaction(ethTx, txHash, common.Hash{}, uint64(data.Height), uint64(data.Index))
if err != nil {
w.logger.Error("failed to new transaction", "hash", txHash.String(), "error", err)
continue
}

go func(hash []byte, tx *rpctypes.Transaction) {
w.logger.Debug("push pending tx to MQ", "txHash=", txHash.String())
err = w.sender.Send(hash, tx)
if err != nil {
w.logger.Error("failed to send pending tx", "hash", txHash.String(), "error", err)
}
}(txHash.Bytes(), tx)
}
}
}(sub.Event(), sub.Err())
}
3 changes: 3 additions & 0 deletions cmd/client/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,7 @@ func RegisterAppFlag(cmd *cobra.Command) {
cmd.Flags().Int(eth.BroadcastPeriodSecond, 10, "every BroadcastPeriodSecond second check the txPool, and broadcast when it's eligible")

cmd.Flags().Bool(rpc.FlagEnableMonitor, false, "Enable the rpc monitor and register rpc metrics to prometheus")

cmd.Flags().String(rpc.FlagKafkaAddr, "", "The address of kafka cluster to consume pending txs")
cmd.Flags().String(rpc.FlagKafkaTopic, "", "The topic that the kafka writer will produce messages to")
}

0 comments on commit 2799e58

Please sign in to comment.