diff --git a/handlers/tx.go b/handlers/tx.go index 2ac01f6..62c88ec 100644 --- a/handlers/tx.go +++ b/handlers/tx.go @@ -162,21 +162,6 @@ func parseTx(txBytes types.Tx, txResult *types2.ResponseDeliverTx, block *types. return docTx, nil } - feeGranter := authTx.FeeGranter() - if feeGranter != nil { - docTx.FeeGranter = feeGranter.String() - } - - feePayer := authTx.FeePayer() - if feePayer != nil { - docTx.FeePayer = feePayer.String() - } - - feeGrantee := GetFeeGranteeFromEvents(txResult.Events) - if feeGrantee != "" { - docTx.FeeGrantee = feeGrantee - } - docTx.Fee = msgsdktypes.BuildFee(authTx.GetFee(), authTx.GetGas()) docTx.Memo = authTx.GetMemo() @@ -243,6 +228,29 @@ func parseTx(txBytes types.Tx, txResult *types2.ResponseDeliverTx, block *types. docTx.ContractAddrs = removeDuplicatesFromSlice(docTx.ContractAddrs) docTx.DocTxMsgs = docTxMsgs + feeGranter := authTx.FeeGranter() + if feeGranter != nil { + docTx.FeeGranter = feeGranter.String() + docTx.FeePayer = feeGranter.String() + } else { + feePayer := authTx.FeePayer() + if feePayer != nil { + docTx.FeePayer = feePayer.String() + } else { + if len(msgs) > 0 { + signers := msgs[0].GetSigners() + if len(signers) > 0 { + docTx.FeePayer = signers[0].String() + } + } + } + } + + feeGrantee := GetFeeGranteeFromEvents(txResult.Events) + if feeGrantee != "" { + docTx.FeeGrantee = feeGrantee + } + //// don't save txs which have not parsed //if docTx.Type == "" { // logger.Warn(constant.NoSupportMsgTypeTag, diff --git a/tasks/execute.go b/tasks/execute.go index b85984e..1697868 100644 --- a/tasks/execute.go +++ b/tasks/execute.go @@ -380,12 +380,7 @@ func saveDocsWithTxn(blockDoc *models.Block, txDocs []*models.Tx, taskDoc *model } if len(txEvms) > 0 { - if err = productMsgToMq(txEvms); err != nil { - logger.Error("saveDocsWithTxn productMsgToMq fail", - logger.Int64("height", blockDoc.Height), - logger.String("err", err.Error())) - return err - } + go productMsgToMq(txEvms) } return nil @@ -434,29 +429,16 @@ func dealEvmTx(txDocs []*models.Tx) []*models.EvmTx { return txEvms } -func productMsgToMq(txEvms []*models.EvmTx) error { - _, err := models.GetClient().DoTransaction(context.Background(), func(sessCtx context.Context) (interface{}, error) { +func productMsgToMq(txEvms []*models.EvmTx) { + _, _ = models.GetClient().DoTransaction(context.Background(), func(sessCtx context.Context) (interface{}, error) { for _, txEvm := range txEvms { - streamLen, err := stream.GetClient().GetStreamLen(config.GetConfig().Redis.StreamTxEvmKey) - if err != nil { - logger.Error("productMsgToMq stream GetStreamLen fail", - logger.String("err", err.Error())) - return nil, err - } - - if streamLen >= config.GetConfig().Redis.StreamMqMaxLen { - //TODO 队列满了,需要监控告警 - logger.Debug("productMsgToMq streamLen >= StreamMqMaxLen", logger.Int64("maxLen", config.GetConfig().Redis.StreamMqMaxLen)) - return nil, nil - } - - _, err = stream.GetClient().PutMsg(config.GetConfig().Redis.StreamTxEvmKey, txEvm.GetStreamMap()) + _, err := stream.GetClient().PutMsg(config.GetConfig().Redis.StreamTxEvmKey, txEvm.GetStreamMap()) if err != nil { logger.Error("productMsgToMq stream PutMsg fail", logger.Int64("height", txEvm.Height), logger.String("evm_tx_hash", txEvm.EvmTxHash), logger.String("err", err.Error())) - return nil, err + continue } txEvmCli := models.GetClient().Database(models.GetDbConf().Database).Collection(models.EvmTx{}.Name()) @@ -468,13 +450,15 @@ func productMsgToMq(txEvms []*models.EvmTx) error { "update_time": time.Now().Unix(), }, } - if err := txEvmCli.UpdateOne(sessCtx, query, update); err != nil { - return nil, err + if err = txEvmCli.UpdateOne(sessCtx, query, update); err != nil { + logger.Error("productMsgToMq txEvmCli UpdateOne", + logger.Int64("height", txEvm.Height), + logger.String("evm_tx_hash", txEvm.EvmTxHash), + logger.String("err", err.Error())) + continue } } return nil, nil }) - - return err }