Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feature/add_all_transaction_list_query -> release/v0.4.50 #242

Open
wants to merge 8 commits into
base: release/v0.4.50
Choose a base branch
from
3 changes: 2 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
# INTERX

For test
INTERX is an interchain engine, proxy, load balancer & security gateway service for communication between backend and frontend.
It will connect to the node using the GRPC endpoint as well as the RPC endpoint ([`Tendermint RPC`](https://docs.tendermint.com/master/rpc/)).

Expand Down Expand Up @@ -509,4 +510,4 @@ Remember this settings when you set/update manually from `config.json`.

### How to update caching configurations
All caching configurations are set in `config.json` file.
`config.json` file includes `rpc_methods` field and there you can set/update caching config of each endpoint.
`config.json` file includes `rpc_methods` field and there you can set/update caching config of each endpoint.
7 changes: 7 additions & 0 deletions common/gateway.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package common
import (
"encoding/base64"
"encoding/json"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
Expand Down Expand Up @@ -344,3 +345,9 @@ func RosettaBuildError(code int, message string, description string, retriable b
func RosettaServeError(code int, data string, message string, statusCode int) (interface{}, interface{}, int) {
return nil, RosettaBuildError(code, message, data, true, nil), statusCode
}

// BuildTxSearchEndpoint creates a tx_search endpoint.
func BuildTxSearchEndpoint(rpcAddr string, query string, page int, limit int, orderBy string) string {
return fmt.Sprintf("%s/tx_search?query=\"%s\"&page=%d&per_page=%d&order_by=\"%s\"",
rpcAddr, query, page, limit, orderBy)
}
4 changes: 2 additions & 2 deletions config/constants.go
Original file line number Diff line number Diff line change
@@ -1,8 +1,8 @@
package config

const (
InterxVersion = "v0.4.48"
SekaiVersion = "v0.3.42"
InterxVersion = "v0.4.50"
SekaiVersion = "v0.4.1"
CosmosVersion = "v0.47.6"

QueryDashboard = "/api/dashboard"
Expand Down
46 changes: 34 additions & 12 deletions database/transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,9 +13,18 @@ import (

// GetTransactions is a function to get user transactions from cache
func GetTransactions(address string, isWithdraw bool) (*tmTypes.ResultTxSearch, error) {
filePath := fmt.Sprintf("%s/transactions/%s", config.GetDbCacheDir(), address)
if !isWithdraw {
filePath = filePath + "-inbound"
var filePath string

basePath := fmt.Sprintf("%s/transactions", config.GetDbCacheDir())
suffix := "-inbound"
if isWithdraw {
suffix = ""
}

if address == "" {
filePath = fmt.Sprintf("%s/all-transactions%s", basePath, suffix)
} else {
filePath = fmt.Sprintf("%s/%s%s", basePath, address, suffix)
}

data := tmTypes.ResultTxSearch{}
Expand Down Expand Up @@ -55,10 +64,12 @@ func GetLastBlockFetched(address string, isWithdraw bool) int64 {
func SaveTransactions(address string, txsData tmTypes.ResultTxSearch, isWithdraw bool) error {
cachedData, _ := GetTransactions(address, isWithdraw)

// Append new txs to the cached txs array
if cachedData.TotalCount > 0 {
txsData.Txs = append(txsData.Txs, cachedData.Txs...)
txsData.TotalCount = txsData.TotalCount + cachedData.TotalCount
if address != "" {
// Append new txs to the cached txs array
if cachedData.TotalCount > 0 {
txsData.Txs = append(txsData.Txs, cachedData.Txs...)
txsData.TotalCount = txsData.TotalCount + cachedData.TotalCount
}
}

data, err := json.Marshal(txsData)
Expand All @@ -67,10 +78,8 @@ func SaveTransactions(address string, txsData tmTypes.ResultTxSearch, isWithdraw
}

folderPath := fmt.Sprintf("%s/transactions", config.GetDbCacheDir())
filePath := fmt.Sprintf("%s/%s", folderPath, address)
if !isWithdraw {
filePath = filePath + "-inbound"
}
fileName := resolveFileName(address, isWithdraw)
filePath := fmt.Sprintf("%s/%s", folderPath, fileName)

global.Mutex.Lock()
err = os.MkdirAll(folderPath, os.ModePerm)
Expand All @@ -85,8 +94,21 @@ func SaveTransactions(address string, txsData tmTypes.ResultTxSearch, isWithdraw
global.Mutex.Unlock()

if err != nil {
fmt.Println("[cache] Unable to save response: ", filePath)
fmt.Println("[SaveTransactions][cache] Unable to save response: ", filePath)
}

return err
}

// Helper function to determine the file name
func resolveFileName(address string, isWithdraw bool) string {
if address == "" {
address = "all-transactions"
}

if !isWithdraw {
return fmt.Sprintf("%s-inbound", address)
}

return address
}
122 changes: 49 additions & 73 deletions gateway/interx/interx.tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,11 +26,6 @@ import (
"golang.org/x/exp/slices"
)

type TxsResponse struct {
Transactions []types.TransactionResponse `json:"transactions"`
TotalCount int `json:"total_count"`
}

// RegisterInterxTxRoutes registers tx query routers.
func RegisterInterxTxRoutes(r *mux.Router, gwCosmosmux *runtime.ServeMux, rpcAddr string) {
r.HandleFunc(config.QueryUnconfirmedTxs, QueryUnconfirmedTxs(rpcAddr)).Methods("GET")
Expand All @@ -47,10 +42,6 @@ func GetTransactionsWithSync(rpcAddr string, address string, isOutbound bool) (*
var limit = 100
var limitPages = 100

if address == "" {
return &tmTypes.ResultTxSearch{}, nil
}

lastBlock := database.GetLastBlockFetched(address, isOutbound)
totalResult := tmTypes.ResultTxSearch{
Txs: []*tmTypes.ResultTx{},
Expand All @@ -59,15 +50,23 @@ func GetTransactionsWithSync(rpcAddr string, address string, isOutbound bool) (*

for page < limitPages {
var events = make([]string, 0, 5)
if isOutbound {
events = append(events, fmt.Sprintf("message.sender='%s'", address))
if address != "" {
if isOutbound {
events = append(events, fmt.Sprintf("message.sender='%s'", address))
} else {
events = append(events, fmt.Sprintf("transfer.recipient='%s'", address))
}
events = append(events, fmt.Sprintf("tx.height>%d", lastBlock))
}

var query string
if address == "" {
query = "tx.height>0"
} else {
events = append(events, fmt.Sprintf("transfer.recipient='%s'", address))
query = strings.Join(events, "%20AND%20")
}
events = append(events, fmt.Sprintf("tx.height>%d", lastBlock))

// search transactions
endpoint := fmt.Sprintf("%s/tx_search?query=\"%s\"&page=%d&per_page=%d&order_by=\"desc\"", rpcAddr, strings.Join(events, "%20AND%20"), page, limit)
endpoint := common.BuildTxSearchEndpoint(rpcAddr, query, page, limit, "desc")
common.GetLogger().Info("[query-transaction] Entering transaction search: ", endpoint)

resp, err := http.Get(endpoint)
Expand Down Expand Up @@ -131,31 +130,23 @@ func GetFilteredTransactions(rpcAddr string, address string, txtypes []string, d
Txs: []*tmTypes.ResultTx{},
TotalCount: 0,
}

if len(directions) == 0 {
directions = []string{"inbound", "outbound"}
}
if slices.Contains(directions, "inbound") {
cachedTxs1, err := GetTransactionsWithSync(rpcAddr, address, false)
for _, cachedTx := range cachedTxs1.Txs {
hashToDirectionMap[cachedTx.Hash.String()] = append(hashToDirectionMap[cachedTx.Hash.String()], "inbound")
}
if err != nil {
return nil, err
}
cachedTxs.TotalCount += cachedTxs1.TotalCount
cachedTxs.Txs = append(cachedTxs.Txs, cachedTxs1.Txs...)
}

if slices.Contains(directions, "outbound") {
cachedTxs2, err := GetTransactionsWithSync(rpcAddr, address, true)
for _, cachedTx := range cachedTxs2.Txs {
hashToDirectionMap[cachedTx.Hash.String()] = append(hashToDirectionMap[cachedTx.Hash.String()], "outbound")
}
if err != nil {
return nil, err
if address != "" {
for _, direction := range directions {
switch direction {
case "inbound":
cachedTxs, _ = processDirection(rpcAddr, address, false, "inbound", hashToDirectionMap, &cachedTxs)

case "outbound":
cachedTxs, _ = processDirection(rpcAddr, address, true, "outbound", hashToDirectionMap, &cachedTxs)
}
}
cachedTxs.TotalCount += cachedTxs2.TotalCount
cachedTxs.Txs = append(cachedTxs.Txs, cachedTxs2.Txs...)
} else {
cachedTxs, _ = processDirection(rpcAddr, address, true, "outbound", hashToDirectionMap, &cachedTxs)
}

var res []types.TransactionResponse
Expand Down Expand Up @@ -224,6 +215,7 @@ func GetFilteredTransactions(rpcAddr string, address string, txtypes []string, d
Status: hashStatus,
Direction: hashToDirectionMap[cachedTx.Hash.String()][0],
Hash: fmt.Sprintf("0x%X", cachedTx.Hash),
Height: cachedTx.Height,
Txs: txResponses,
}
if len(hashToDirectionMap[cachedTx.Hash.String()]) > 1 {
Expand Down Expand Up @@ -306,39 +298,6 @@ func SearchTxHashHandle(rpcAddr string, sender string, recipient string, txType
return result, nil
}

// Get block height for tx hash from cache or tendermint
func getBlockHeight(rpcAddr string, hash string) (int64, error) {
endpoint := fmt.Sprintf("%s/tx?hash=%s", rpcAddr, hash)
common.GetLogger().Info("[query-block] Entering block query: ", endpoint)

resp, err := http.Get(endpoint)
if err != nil {
common.GetLogger().Error("[query-block] Unable to connect to ", endpoint)
return 0, err
}
defer resp.Body.Close()

respBody, _ := ioutil.ReadAll(resp.Body)
response := new(tmJsonRPCTypes.RPCResponse)

if err := json.Unmarshal(respBody, response); err != nil {
common.GetLogger().Error("[query-block] Unable to decode response: ", err)
return 0, err
}
if response.Error != nil {
common.GetLogger().Error("[query-block] Error response:", response.Error.Message)
return 0, errors.New(response.Error.Message)
}

result := new(tmTypes.ResultTx)
if err := tmjson.Unmarshal(response.Result, result); err != nil {
common.GetLogger().Error("[query-block] Failed to unmarshal result:", err)
return 0, fmt.Errorf("error unmarshalling result: %w", err)
}

return result.Height, nil
}

func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{}, interface{}, int) {
err := r.ParseForm()
if err != nil {
Expand Down Expand Up @@ -371,10 +330,6 @@ func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{}

//------------ Address ------------
account = r.FormValue("address")
if account == "" {
common.GetLogger().Error("[query-transactions] 'address' is not set")
return common.ServeError(0, "'address' is not set", "", http.StatusBadRequest)
}

//------------ Direction ------------
directionsParam := r.FormValue("direction")
Expand Down Expand Up @@ -509,7 +464,7 @@ func QueryBlockTransactionsHandler(rpcAddr string, r *http.Request) (interface{}
}
txResults = txResults[offset:int(math.Min(float64(offset+limit), float64(len(txResults))))]

res := TxsResponse{
res := types.TxsResponse{
TotalCount: totalCount,
Transactions: txResults,
}
Expand Down Expand Up @@ -655,3 +610,24 @@ func QueryUnconfirmedTxs(rpcAddr string) http.HandlerFunc {
common.WrapResponse(w, request, *response, statusCode, false)
}
}

func processDirection(
rpcAddr, address string,
isOutbound bool,
direction string,
hashToDirectionMap map[string][]string,
cachedTxs *tmTypes.ResultTxSearch,
) (tmTypes.ResultTxSearch, error) {
transactions, err := GetTransactionsWithSync(rpcAddr, address, isOutbound)
if err != nil {
return tmTypes.ResultTxSearch{}, err
}

for _, tx := range transactions.Txs {
hashToDirectionMap[tx.Hash.String()] = append(hashToDirectionMap[tx.Hash.String()], direction)
}

cachedTxs.TotalCount += transactions.TotalCount
cachedTxs.Txs = append(cachedTxs.Txs, transactions.Txs...)
return *cachedTxs, nil
}
41 changes: 39 additions & 2 deletions gateway/interx/interx.tx_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,12 +2,16 @@ package interx

import (
"encoding/json"
"errors"
"fmt"
"io/ioutil"
"net/http"
"net/http/httptest"
"os"
"testing"
"time"

"github.com/KiraCore/interx/common"
"github.com/KiraCore/interx/config"
"github.com/KiraCore/interx/database"
"github.com/KiraCore/interx/test"
Expand Down Expand Up @@ -96,7 +100,7 @@ func (suite *InterxTxTestSuite) TestBlockTransactionsHandler() {
suite.Assert()
}

resultTxSearch := TxsResponse{}
resultTxSearch := types.TxsResponse{}
err = json.Unmarshal(suite.blockTransactionsQueryResponse.Result, &resultTxSearch)
suite.Require().NoError(err)
suite.Require().EqualValues(result.TotalCount, resultTxSearch.TotalCount)
Expand Down Expand Up @@ -140,7 +144,7 @@ func TestInterxTxTestSuite(t *testing.T) {

txMsg := make(map[string]interface{})
txMsg["type"] = "send"
resBytes, err = json.Marshal(TxsResponse{
resBytes, err = json.Marshal(types.TxsResponse{
TotalCount: 1,
Transactions: []types.TransactionResponse{
{
Expand Down Expand Up @@ -229,3 +233,36 @@ func TestInterxTxTestSuite(t *testing.T) {

tendermintServer.Close()
}

// Get block height for tx hash from cache or tendermint
func getBlockHeight(rpcAddr string, hash string) (int64, error) {
endpoint := fmt.Sprintf("%s/tx?hash=%s", rpcAddr, hash)
common.GetLogger().Info("[query-block] Entering block query: ", endpoint)

resp, err := http.Get(endpoint)
if err != nil {
common.GetLogger().Error("[query-block] Unable to connect to ", endpoint)
return 0, err
}
defer resp.Body.Close()

respBody, _ := ioutil.ReadAll(resp.Body)
response := new(tmJsonRPCTypes.RPCResponse)

if err := json.Unmarshal(respBody, response); err != nil {
common.GetLogger().Error("[query-block] Unable to decode response: ", err)
return 0, err
}
if response.Error != nil {
common.GetLogger().Error("[query-block] Error response:", response.Error.Message)
return 0, errors.New(response.Error.Message)
}

result := new(tmRPCTypes.ResultTx)
if err := tmjson.Unmarshal(response.Result, result); err != nil {
common.GetLogger().Error("[query-block] Failed to unmarshal result:", err)
return 0, fmt.Errorf("error unmarshalling result: %w", err)
}

return result.Height, nil
}
2 changes: 1 addition & 1 deletion go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ module github.com/KiraCore/interx
go 1.19

require (
cosmossdk.io/math v1.2.0
github.com/KeisukeYamashita/go-jsonrpc v1.0.1
github.com/KiraCore/sekai v0.3.38
github.com/btcsuite/btcd v0.22.1
Expand Down Expand Up @@ -35,7 +36,6 @@ require (
cosmossdk.io/depinject v1.0.0-alpha.4 // indirect
cosmossdk.io/errors v1.0.0 // indirect
cosmossdk.io/log v1.2.1 // indirect
cosmossdk.io/math v1.2.0 // indirect
cosmossdk.io/tools/rosetta v0.2.1 // indirect
filippo.io/edwards25519 v1.0.0 // indirect
github.com/99designs/go-keychain v0.0.0-20191008050251-8e49817e8af4 // indirect
Expand Down
Loading
Loading