Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Break getTransactionsByLedgerSequence into smaller sub-functions #244

Merged
merged 14 commits into from
Jul 25, 2024
Merged
210 changes: 121 additions & 89 deletions cmd/soroban-rpc/internal/methods/get_transactions.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/stellar/go/ingest"
"github.com/stellar/go/support/log"
"github.com/stellar/go/toid"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
Expand Down Expand Up @@ -93,38 +94,19 @@ type transactionsRPCHandler struct {
networkPassphrase string
}

// getTransactionsByLedgerSequence fetches transactions between the start and end ledgers, inclusive of both.
// The number of ledgers returned can be tuned using the pagination options - cursor and limit.
func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Context, request GetTransactionsRequest) (GetTransactionsResponse, error) {
ledgerRange, err := h.ledgerReader.GetLedgerRange(ctx)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

err = request.isValid(h.maxLimit, ledgerRange)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidRequest,
Message: err.Error(),
}
}

// Move start to pagination cursor
// initializePagination sets the pagination limit and cursor
func (h transactionsRPCHandler) initializePagination(request GetTransactionsRequest) (toid.ID, uint, error) {
start := toid.New(int32(request.StartLedger), 1, 1)
limit := h.defaultLimit
if request.Pagination != nil {
if request.Pagination.Cursor != "" {
cursorInt, err := strconv.ParseInt(request.Pagination.Cursor, 10, 64)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
return toid.ID{}, 0, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: err.Error(),
}
}

*start = toid.Parse(cursorInt)
// increment tx index because, when paginating,
// we start with the item right after the cursor
Expand All @@ -134,92 +116,142 @@ func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Cont
limit = request.Pagination.Limit
}
}
return *start, limit, nil
}

// Iterate through each ledger and its transactions until limit or end range is reached.
// The latest ledger acts as the end ledger range for the request.
var txns []TransactionInfo
cursor := toid.New(0, 0, 0)
LedgerLoop:
for ledgerSeq := start.LedgerSequence; ledgerSeq <= int32(ledgerRange.LastLedger.Sequence); ledgerSeq++ {
// Get ledger close meta from db
ledger, found, err := h.ledgerReader.GetLedger(ctx, uint32(ledgerSeq))
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
// fetchLedgerData calls the meta table to fetch the corresponding ledger data.
func (h transactionsRPCHandler) fetchLedgerData(ctx context.Context, ledgerSeq uint32) (xdr.LedgerCloseMeta, error) {
ledger, found, err := h.ledgerReader.GetLedger(ctx, ledgerSeq)
if err != nil {
return ledger, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
} else if !found {
return ledger, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: fmt.Sprintf("database does not contain metadata for ledger: %d", ledgerSeq),
}
}
return ledger, nil
}

// processTransactionsInLedger cycles through all the transactions in a ledger, extracts the transaction info
// and builds the list of transactions.
func (h transactionsRPCHandler) processTransactionsInLedger(ledger xdr.LedgerCloseMeta, start toid.ID,
txns *[]TransactionInfo, limit uint,
) (*toid.ID, bool, error) {
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(h.networkPassphrase, ledger)
if err != nil {
return nil, false, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

startTxIdx := 1
ledgerSeq := ledger.LedgerSequence()
if int32(ledgerSeq) == start.LedgerSequence {
startTxIdx = int(start.TransactionOrder)
if ierr := reader.Seek(startTxIdx - 1); ierr != nil && !errors.Is(ierr, io.EOF) {
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
return nil, false, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
Message: ierr.Error(),
}
}
}

txCount := ledger.CountTransactions()
cursor := toid.New(int32(ledgerSeq), 0, 1)
for i := startTxIdx; i <= txCount; i++ {
cursor.TransactionOrder = int32(i)

ingestTx, err := reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
break
}
} else if !found {
return GetTransactionsResponse{}, &jrpc2.Error{
return nil, false, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: fmt.Sprintf("ledger close meta not found: %d", ledgerSeq),
Message: err.Error(),
}
}

// Initialize tx reader.
reader, err := ingest.NewLedgerTransactionReaderFromLedgerCloseMeta(h.networkPassphrase, ledger)
tx, err := db.ParseTransaction(ledger, ingestTx)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
return nil, false, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

// Move the reader to specific tx idx
startTxIdx := 1
if ledgerSeq == start.LedgerSequence {
startTxIdx = int(start.TransactionOrder)
if ierr := reader.Seek(startTxIdx - 1); ierr != nil && ierr != io.EOF {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: ierr.Error(),
}
}
txInfo := TransactionInfo{
ApplicationOrder: tx.ApplicationOrder,
FeeBump: tx.FeeBump,
ResultXdr: base64.StdEncoding.EncodeToString(tx.Result),
ResultMetaXdr: base64.StdEncoding.EncodeToString(tx.Meta),
EnvelopeXdr: base64.StdEncoding.EncodeToString(tx.Envelope),
DiagnosticEventsXDR: base64EncodeSlice(tx.Events),
Ledger: tx.Ledger.Sequence,
LedgerCloseTime: tx.Ledger.CloseTime,
}
txInfo.Status = TransactionStatusFailed
if tx.Successful {
txInfo.Status = TransactionStatusSuccess
}

// Decode transaction info from ledger meta
txCount := ledger.CountTransactions()
for i := startTxIdx; i <= txCount; i++ {
cursor = toid.New(int32(ledger.LedgerSequence()), int32(i), 1)
*txns = append(*txns, txInfo)
if len(*txns) >= int(limit) {
return cursor, true, nil
}
}

ingestTx, err := reader.Read()
if err != nil {
if errors.Is(err, io.EOF) {
// No more transactions to read. Start from next ledger
break
}
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidParams,
Message: err.Error(),
}
}
return cursor, false, nil
}

tx, err := db.ParseTransaction(ledger, ingestTx)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}
// getTransactionsByLedgerSequence fetches transactions between the start and end ledgers, inclusive of both.
// The number of ledgers returned can be tuned using the pagination options - cursor and limit.
func (h transactionsRPCHandler) getTransactionsByLedgerSequence(ctx context.Context,
request GetTransactionsRequest,
) (GetTransactionsResponse, error) {
ledgerRange, err := h.ledgerReader.GetLedgerRange(ctx)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InternalError,
Message: err.Error(),
}
}

txInfo := TransactionInfo{
ApplicationOrder: tx.ApplicationOrder,
FeeBump: tx.FeeBump,
ResultXdr: base64.StdEncoding.EncodeToString(tx.Result),
ResultMetaXdr: base64.StdEncoding.EncodeToString(tx.Meta),
EnvelopeXdr: base64.StdEncoding.EncodeToString(tx.Envelope),
DiagnosticEventsXDR: base64EncodeSlice(tx.Events),
Ledger: tx.Ledger.Sequence,
LedgerCloseTime: tx.Ledger.CloseTime,
}
txInfo.Status = TransactionStatusFailed
if tx.Successful {
txInfo.Status = TransactionStatusSuccess
}
err = request.isValid(h.maxLimit, ledgerRange)
if err != nil {
return GetTransactionsResponse{}, &jrpc2.Error{
Code: jrpc2.InvalidRequest,
Message: err.Error(),
}
}

txns = append(txns, txInfo)
if len(txns) >= int(limit) {
break LedgerLoop
}
start, limit, err := h.initializePagination(request)
if err != nil {
return GetTransactionsResponse{}, err
}

// Iterate through each ledger and its transactions until limit or end range is reached.
// The latest ledger acts as the end ledger range for the request.
var txns []TransactionInfo
aditya1702 marked this conversation as resolved.
Show resolved Hide resolved
var done bool
cursor := toid.New(0, 0, 0)
for ledgerSeq := start.LedgerSequence; ledgerSeq <= int32(ledgerRange.LastLedger.Sequence); ledgerSeq++ {
ledger, err := h.fetchLedgerData(ctx, uint32(ledgerSeq))
if err != nil {
return GetTransactionsResponse{}, err
}

cursor, done, err = h.processTransactionsInLedger(ledger, start, &txns, limit)
if err != nil {
return GetTransactionsResponse{}, err
}
if done {
break
}
}

Expand Down
2 changes: 1 addition & 1 deletion cmd/soroban-rpc/internal/methods/get_transactions_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -238,7 +238,7 @@ func TestGetTransactions_LedgerNotFound(t *testing.T) {
}

response, err := handler.getTransactionsByLedgerSequence(context.TODO(), request)
expectedErr := fmt.Errorf("[%d] ledger close meta not found: 2", jrpc2.InvalidParams)
expectedErr := fmt.Errorf("[%d] database does not contain metadata for ledger: 2", jrpc2.InvalidParams)
assert.Equal(t, expectedErr.Error(), err.Error())
assert.Nil(t, response.Transactions)
}
Expand Down
Loading