Skip to content

Commit

Permalink
refactor(interx): improve transaction handling and direction processi…
Browse files Browse the repository at this point in the history
…ng.Removed the redundant TxsResponse definition and replaced it with types.TxsResponse for consistency.Refactored GetTransactionsWithSync. Consolidated outbound and inbound logic with improved query construction.Eliminated unnecessary checks for empty addresses.Replaced repetitive endpoint construction with BuildTxSearchEndpoint.

Simplified GetFilteredTransactions.Introduced processDirection to handle direction-specific logic modularly.Removed redundant loops and conditions for processing inbound and outbound transactions.Added processDirection as a reusable function for transaction direction handling.Enhanced QueryBlockTransactionsHandler.Removed redundant address validation.Updated responses to use types.TxsResponse for consistency.Removed the obsolete getBlockHeight function, as it is no longer needed.
  • Loading branch information
golnar-boosty committed Nov 21, 2024
1 parent a0ab572 commit aaa1a97
Showing 1 changed file with 49 additions and 73 deletions.
122 changes: 49 additions & 73 deletions gateway/interx/interx.tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ import (
"golang.org/x/exp/slices"
)

type TxsResponse struct {
Transactions []types.TransactionResponse `json:"transactions"`
TotalCount int `json:"total_count"`
}

// RegisterInterxTxRoutes registers tx query routers.
func RegisterInterxTxRoutes(r *mux.Router, gwCosmosmux *runtime.ServeMux, rpcAddr string) {
r.HandleFunc(config.QueryUnconfirmedTxs, QueryUnconfirmedTxs(rpcAddr)).Methods("GET")
Expand All @@ -47,10 +42,6 @@ func GetTransactionsWithSync(rpcAddr string, address string, isOutbound bool) (*
var limit = 100
var limitPages = 100

if address == "" {
return &tmTypes.ResultTxSearch{}, nil
}

lastBlock := database.GetLastBlockFetched(address, isOutbound)
totalResult := tmTypes.ResultTxSearch{
Txs: []*tmTypes.ResultTx{},
Expand All @@ -59,15 +50,23 @@ func GetTransactionsWithSync(rpcAddr string, address string, isOutbound bool) (*

for page < limitPages {
var events = make([]string, 0, 5)
if isOutbound {
events = append(events, fmt.Sprintf("message.sender='%s'", address))
if address != "" {
if isOutbound {
events = append(events, fmt.Sprintf("message.sender='%s'", address))
} else {
events = append(events, fmt.Sprintf("transfer.recipient='%s'", address))
}
events = append(events, fmt.Sprintf("tx.height>%d", lastBlock))
}

var query string
if address == "" {
query = "tx.height>0"
} else {
events = append(events, fmt.Sprintf("transfer.recipient='%s'", address))
query = strings.Join(events, "%20AND%20")
}
events = append(events, fmt.Sprintf("tx.height>%d", lastBlock))

// search transactions
endpoint := fmt.Sprintf("%s/tx_search?query=\"%s\"&page=%d&per_page=%d&order_by=\"desc\"", rpcAddr, strings.Join(events, "%20AND%20"), page, limit)
endpoint := common.BuildTxSearchEndpoint(rpcAddr, query, page, limit, "desc")
common.GetLogger().Info("[query-transaction] Entering transaction search: ", endpoint)

resp, err := http.Get(endpoint)
Expand Down Expand Up @@ -131,31 +130,23 @@ func GetFilteredTransactions(rpcAddr string, address string, txtypes []string, d
Txs: []*tmTypes.ResultTx{},
TotalCount: 0,
}

if len(directions) == 0 {
directions = []string{"inbound", "outbound"}
}
if slices.Contains(directions, "inbound") {
cachedTxs1, err := GetTransactionsWithSync(rpcAddr, address, false)
for _, cachedTx := range cachedTxs1.Txs {
hashToDirectionMap[cachedTx.Hash.String()] = append(hashToDirectionMap[cachedTx.Hash.String()], "inbound")
}
if err != nil {
return nil, err
}
cachedTxs.TotalCount += cachedTxs1.TotalCount
cachedTxs.Txs = append(cachedTxs.Txs, cachedTxs1.Txs...)
}

if slices.Contains(directions, "outbound") {
cachedTxs2, err := GetTransactionsWithSync(rpcAddr, address, true)
for _, cachedTx := range cachedTxs2.Txs {
hashToDirectionMap[cachedTx.Hash.String()] = append(hashToDirectionMap[cachedTx.Hash.String()], "outbound")
}
if err != nil {
return nil, err
if address != "" {
for _, direction := range directions {
switch direction {
case "inbound":
cachedTxs, _ = processDirection(rpcAddr, address, false, "inbound", hashToDirectionMap, &cachedTxs)

case "outbound":
cachedTxs, _ = processDirection(rpcAddr, address, true, "outbound", hashToDirectionMap, &cachedTxs)
}
}
cachedTxs.TotalCount += cachedTxs2.TotalCount
cachedTxs.Txs = append(cachedTxs.Txs, cachedTxs2.Txs...)
} else {
cachedTxs, _ = processDirection(rpcAddr, address, true, "outbound", hashToDirectionMap, &cachedTxs)
}

var res []types.TransactionResponse
Expand Down Expand Up @@ -224,6 +215,7 @@ func GetFilteredTransactions(rpcAddr string, address string, txtypes []string, d
Status: hashStatus,
Direction: hashToDirectionMap[cachedTx.Hash.String()][0],
Hash: fmt.Sprintf("0x%X", cachedTx.Hash),
Height: cachedTx.Height,
Txs: txResponses,
}
if len(hashToDirectionMap[cachedTx.Hash.String()]) > 1 {
Expand Down Expand Up @@ -306,39 +298,6 @@ func SearchTxHashHandle(rpcAddr string, sender string, recipient string, txType
return result, nil
}

// Get block height for tx hash from cache or tendermint
func getBlockHeight(rpcAddr string, hash string) (int64, error) {
endpoint := fmt.Sprintf("%s/tx?hash=%s", rpcAddr, hash)
common.GetLogger().Info("[query-block] Entering block query: ", endpoint)

resp, err := http.Get(endpoint)
if err != nil {
common.GetLogger().Error("[query-block] Unable to connect to ", endpoint)
return 0, err
}
defer resp.Body.Close()

respBody, _ := ioutil.ReadAll(resp.Body)
response := new(tmJsonRPCTypes.RPCResponse)

if err := json.Unmarshal(respBody, response); err != nil {
common.GetLogger().Error("[query-block] Unable to decode response: ", err)
return 0, err
}
if response.Error != nil {
common.GetLogger().Error("[query-block] Error response:", response.Error.Message)
return 0, errors.New(response.Error.Message)
}

result := new(tmTypes.ResultTx)
if err := tmjson.Unmarshal(response.Result, result); err != nil {
common.GetLogger().Error("[query-block] Failed to unmarshal result:", err)
return 0, fmt.Errorf("error unmarshalling result: %w", err)
}

return result.Height, nil
}

func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{}, interface{}, int) {
err := r.ParseForm()
if err != nil {
Expand Down Expand Up @@ -371,10 +330,6 @@ func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{}

//------------ Address ------------
account = r.FormValue("address")
if account == "" {
common.GetLogger().Error("[query-transactions] 'address' is not set")
return common.ServeError(0, "'address' is not set", "", http.StatusBadRequest)
}

//------------ Direction ------------
directionsParam := r.FormValue("direction")
Expand Down Expand Up @@ -509,7 +464,7 @@ func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{}
}
txResults = txResults[offset:int(math.Min(float64(offset+limit), float64(len(txResults))))]

res := TxsResponse{
res := types.TxsResponse{
TotalCount: totalCount,
Transactions: txResults,
}
Expand Down Expand Up @@ -655,3 +610,24 @@ func QueryUnconfirmedTxs(rpcAddr string) http.HandlerFunc {
common.WrapResponse(w, request, *response, statusCode, false)
}
}

func processDirection(
rpcAddr, address string,
isOutbound bool,
direction string,
hashToDirectionMap map[string][]string,
cachedTxs *tmTypes.ResultTxSearch,
) (tmTypes.ResultTxSearch, error) {
transactions, err := GetTransactionsWithSync(rpcAddr, address, isOutbound)
if err != nil {
return tmTypes.ResultTxSearch{}, err
}

for _, tx := range transactions.Txs {
hashToDirectionMap[tx.Hash.String()] = append(hashToDirectionMap[tx.Hash.String()], direction)
}

cachedTxs.TotalCount += transactions.TotalCount
cachedTxs.Txs = append(cachedTxs.Txs, transactions.Txs...)
return *cachedTxs, nil
}

0 comments on commit aaa1a97

Please sign in to comment.