Skip to content
This repository has been archived by the owner on Nov 14, 2023. It is now read-only.

Commit

Permalink
Merge pull request #245 from luyuan-li/opb-bsn
Browse files Browse the repository at this point in the history
modify productMsgToMq && parseTx
  • Loading branch information
kaifei Hu authored Feb 13, 2023
2 parents b65b366 + 6074d41 commit 206d98a
Show file tree
Hide file tree
Showing 2 changed files with 34 additions and 42 deletions.
38 changes: 23 additions & 15 deletions handlers/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,21 +162,6 @@ func parseTx(txBytes types.Tx, txResult *types2.ResponseDeliverTx, block *types.
return docTx, nil
}

feeGranter := authTx.FeeGranter()
if feeGranter != nil {
docTx.FeeGranter = feeGranter.String()
}

feePayer := authTx.FeePayer()
if feePayer != nil {
docTx.FeePayer = feePayer.String()
}

feeGrantee := GetFeeGranteeFromEvents(txResult.Events)
if feeGrantee != "" {
docTx.FeeGrantee = feeGrantee
}

docTx.Fee = msgsdktypes.BuildFee(authTx.GetFee(), authTx.GetGas())
docTx.Memo = authTx.GetMemo()

Expand Down Expand Up @@ -243,6 +228,29 @@ func parseTx(txBytes types.Tx, txResult *types2.ResponseDeliverTx, block *types.
docTx.ContractAddrs = removeDuplicatesFromSlice(docTx.ContractAddrs)
docTx.DocTxMsgs = docTxMsgs

feeGranter := authTx.FeeGranter()
if feeGranter != nil {
docTx.FeeGranter = feeGranter.String()
docTx.FeePayer = feeGranter.String()
} else {
feePayer := authTx.FeePayer()
if feePayer != nil {
docTx.FeePayer = feePayer.String()
} else {
if len(msgs) > 0 {
signers := msgs[0].GetSigners()
if len(signers) > 0 {
docTx.FeePayer = signers[0].String()
}
}
}
}

feeGrantee := GetFeeGranteeFromEvents(txResult.Events)
if feeGrantee != "" {
docTx.FeeGrantee = feeGrantee
}

//// don't save txs which have not parsed
//if docTx.Type == "" {
// logger.Warn(constant.NoSupportMsgTypeTag,
Expand Down
38 changes: 11 additions & 27 deletions tasks/execute.go
Original file line number Diff line number Diff line change
Expand Up @@ -380,12 +380,7 @@ func saveDocsWithTxn(blockDoc *models.Block, txDocs []*models.Tx, taskDoc *model
}

if len(txEvms) > 0 {
if err = productMsgToMq(txEvms); err != nil {
logger.Error("saveDocsWithTxn productMsgToMq fail",
logger.Int64("height", blockDoc.Height),
logger.String("err", err.Error()))
return err
}
go productMsgToMq(txEvms)
}

return nil
Expand Down Expand Up @@ -434,29 +429,16 @@ func dealEvmTx(txDocs []*models.Tx) []*models.EvmTx {
return txEvms
}

func productMsgToMq(txEvms []*models.EvmTx) error {
_, err := models.GetClient().DoTransaction(context.Background(), func(sessCtx context.Context) (interface{}, error) {
func productMsgToMq(txEvms []*models.EvmTx) {
_, _ = models.GetClient().DoTransaction(context.Background(), func(sessCtx context.Context) (interface{}, error) {
for _, txEvm := range txEvms {
streamLen, err := stream.GetClient().GetStreamLen(config.GetConfig().Redis.StreamTxEvmKey)
if err != nil {
logger.Error("productMsgToMq stream GetStreamLen fail",
logger.String("err", err.Error()))
return nil, err
}

if streamLen >= config.GetConfig().Redis.StreamMqMaxLen {
//TODO 队列满了,需要监控告警
logger.Debug("productMsgToMq streamLen >= StreamMqMaxLen", logger.Int64("maxLen", config.GetConfig().Redis.StreamMqMaxLen))
return nil, nil
}

_, err = stream.GetClient().PutMsg(config.GetConfig().Redis.StreamTxEvmKey, txEvm.GetStreamMap())
_, err := stream.GetClient().PutMsg(config.GetConfig().Redis.StreamTxEvmKey, txEvm.GetStreamMap())
if err != nil {
logger.Error("productMsgToMq stream PutMsg fail",
logger.Int64("height", txEvm.Height),
logger.String("evm_tx_hash", txEvm.EvmTxHash),
logger.String("err", err.Error()))
return nil, err
continue
}

txEvmCli := models.GetClient().Database(models.GetDbConf().Database).Collection(models.EvmTx{}.Name())
Expand All @@ -468,13 +450,15 @@ func productMsgToMq(txEvms []*models.EvmTx) error {
"update_time": time.Now().Unix(),
},
}
if err := txEvmCli.UpdateOne(sessCtx, query, update); err != nil {
return nil, err
if err = txEvmCli.UpdateOne(sessCtx, query, update); err != nil {
logger.Error("productMsgToMq txEvmCli UpdateOne",
logger.Int64("height", txEvm.Height),
logger.String("evm_tx_hash", txEvm.EvmTxHash),
logger.String("err", err.Error()))
continue
}
}

return nil, nil
})

return err
}

0 comments on commit 206d98a

Please sign in to comment.