diff --git a/app/app.go b/app/app.go index 30cc946588..e194e03b2f 100644 --- a/app/app.go +++ b/app/app.go @@ -2,7 +2,6 @@ package app import ( "fmt" - "github.com/spf13/viper" "io" "math/big" "os" @@ -48,6 +47,7 @@ import ( "github.com/okex/exchain/x/staking" "github.com/okex/exchain/x/stream" "github.com/okex/exchain/x/token" + "github.com/spf13/viper" abci "github.com/tendermint/tendermint/abci/types" "github.com/tendermint/tendermint/crypto/tmhash" "github.com/tendermint/tendermint/libs/log" @@ -63,7 +63,7 @@ func init() { } const ( - appName = "OKExChain" + appName = "OKExChain" DynamicGpWeight = "dynamic-gp-weight" EnableDynamicGp = "enable-dynamic-gp" ) @@ -226,7 +226,7 @@ func NewOKExChainApp( } gpWeight := viper.GetInt(DynamicGpWeight) - if gpWeight == 0 { + if gpWeight <= 0 { gpWeight = 1 } else if gpWeight > 100 { gpWeight = 100 diff --git a/app/rpc/config.go b/app/rpc/config.go index 6e0210a67e..2effbdbe13 100644 --- a/app/rpc/config.go +++ b/app/rpc/config.go @@ -6,18 +6,18 @@ import ( "os" "strings" - sdk "github.com/cosmos/cosmos-sdk/types" - "github.com/spf13/viper" - "github.com/cosmos/cosmos-sdk/client/flags" "github.com/cosmos/cosmos-sdk/client/input" "github.com/cosmos/cosmos-sdk/client/lcd" "github.com/cosmos/cosmos-sdk/crypto/keys" cmserver "github.com/cosmos/cosmos-sdk/server" + sdk "github.com/cosmos/cosmos-sdk/types" "github.com/ethereum/go-ethereum/rpc" "github.com/okex/exchain/app/crypto/ethsecp256k1" "github.com/okex/exchain/app/crypto/hd" + "github.com/okex/exchain/app/rpc/pendingtx" "github.com/okex/exchain/app/rpc/websockets" + "github.com/spf13/viper" ) const ( @@ -30,6 +30,8 @@ const ( FlagRateLimitBurst = "rpc.rate-limit-burst" FlagEnableMonitor = "rpc.enable-monitor" FlagDisableAPI = "rpc.disable-api" + FlagKafkaAddr = "pendingtx.kafka-addr" + FlagKafkaTopic = "pendingtx.kafka-topic" MetricsNamespace = "x" // MetricsSubsystem is a subsystem shared by all metrics exposed by this package. @@ -87,6 +89,15 @@ func RegisterRoutes(rs *lcd.RestServer) { websocketAddr := viper.GetString(flagWebsocket) ws := websockets.NewServer(rs.CliCtx, rs.Logger(), websocketAddr) ws.Start() + + // pending tx watcher + kafkaAddrs := viper.GetString(FlagKafkaAddr) + kafkaTopic := viper.GetString(FlagKafkaTopic) + if kafkaAddrs != "" && kafkaTopic != "" { + kafkaClient := pendingtx.NewKafkaClient(strings.Split(kafkaAddrs, ","), kafkaTopic) + ptw := pendingtx.NewWatcher(rs.CliCtx, rs.Logger(), kafkaClient) + ptw.Start() + } } func unlockKeyFromNameAndPassphrase(accountNames []string, passphrase string) ([]ethsecp256k1.PrivKey, error) { diff --git a/app/rpc/pendingtx/kafka.go b/app/rpc/pendingtx/kafka.go new file mode 100644 index 0000000000..48a1f86a57 --- /dev/null +++ b/app/rpc/pendingtx/kafka.go @@ -0,0 +1,49 @@ +package pendingtx + +import ( + "context" + "encoding/json" + + rpctypes "github.com/okex/exchain/app/rpc/types" + "github.com/segmentio/kafka-go" +) + +type KafkaClient struct { + Topic string + *kafka.Writer +} + +func NewKafkaClient(addrs []string, topic string) *KafkaClient { + return &KafkaClient{ + Topic: topic, + Writer: kafka.NewWriter(kafka.WriterConfig{ + Brokers: addrs, + Topic: topic, + Balancer: &kafka.LeastBytes{}, + }), + } +} + +type KafkaMsg struct { + Topic string `json:"topic"` + Source interface{} `json:"source"` + Data *rpctypes.Transaction `json:"data"` +} + +func (kc *KafkaClient) Send(hash []byte, tx *rpctypes.Transaction) error { + msg, err := json.Marshal(KafkaMsg{ + Topic: kc.Topic, + Data: tx, + }) + if err != nil { + return err + } + + // Automatic retries and reconnections on errors. + return kc.WriteMessages(context.Background(), + kafka.Message{ + Key: hash, + Value: msg, + }, + ) +} diff --git a/app/rpc/pendingtx/watcher.go b/app/rpc/pendingtx/watcher.go new file mode 100644 index 0000000000..a65183fb41 --- /dev/null +++ b/app/rpc/pendingtx/watcher.go @@ -0,0 +1,77 @@ +package pendingtx + +import ( + "fmt" + + "github.com/cosmos/cosmos-sdk/client/context" + "github.com/ethereum/go-ethereum/common" + rpcfilters "github.com/okex/exchain/app/rpc/namespaces/eth/filters" + rpctypes "github.com/okex/exchain/app/rpc/types" + "github.com/tendermint/tendermint/libs/log" + coretypes "github.com/tendermint/tendermint/rpc/core/types" + tmtypes "github.com/tendermint/tendermint/types" +) + +type Watcher struct { + clientCtx context.CLIContext + events *rpcfilters.EventSystem + logger log.Logger + + sender Sender +} + +type Sender interface { + Send(hash []byte, tx *rpctypes.Transaction) error +} + +func NewWatcher(clientCtx context.CLIContext, log log.Logger, sender Sender) *Watcher { + return &Watcher{ + clientCtx: clientCtx, + events: rpcfilters.NewEventSystem(clientCtx.Client), + logger: log.With("module", "pendingtx-watcher"), + + sender: sender, + } +} + +func (w *Watcher) Start() { + sub, _, err := w.events.SubscribePendingTxs() + if err != nil { + w.logger.Error("error creating block filter", "error", err.Error()) + } + + go func(txsCh <-chan coretypes.ResultEvent, errCh <-chan error) { + for { + select { + case ev := <-txsCh: + data, ok := ev.Data.(tmtypes.EventDataTx) + if !ok { + w.logger.Error(fmt.Sprintf("invalid data type %T, expected EventDataTx", ev.Data), "ID", sub.ID()) + continue + } + txHash := common.BytesToHash(data.Tx.Hash()) + w.logger.Debug("receive tx from mempool", "txHash=", txHash.String()) + + ethTx, err := rpctypes.RawTxToEthTx(w.clientCtx, data.Tx) + if err != nil { + w.logger.Error("failed to decode raw tx to eth tx", "hash", txHash.String(), "error", err) + continue + } + + tx, err := rpctypes.NewTransaction(ethTx, txHash, common.Hash{}, uint64(data.Height), uint64(data.Index)) + if err != nil { + w.logger.Error("failed to new transaction", "hash", txHash.String(), "error", err) + continue + } + + go func(hash []byte, tx *rpctypes.Transaction) { + w.logger.Debug("push pending tx to MQ", "txHash=", txHash.String()) + err = w.sender.Send(hash, tx) + if err != nil { + w.logger.Error("failed to send pending tx", "hash", txHash.String(), "error", err) + } + }(txHash.Bytes(), tx) + } + } + }(sub.Event(), sub.Err()) +} diff --git a/cmd/client/flags.go b/cmd/client/flags.go index bc9196db1b..c226878483 100644 --- a/cmd/client/flags.go +++ b/cmd/client/flags.go @@ -43,4 +43,7 @@ func RegisterAppFlag(cmd *cobra.Command) { cmd.Flags().Int(eth.BroadcastPeriodSecond, 10, "every BroadcastPeriodSecond second check the txPool, and broadcast when it's eligible") cmd.Flags().Bool(rpc.FlagEnableMonitor, false, "Enable the rpc monitor and register rpc metrics to prometheus") + + cmd.Flags().String(rpc.FlagKafkaAddr, "", "The address of kafka cluster to consume pending txs") + cmd.Flags().String(rpc.FlagKafkaTopic, "", "The topic that the kafka writer will produce messages to") }