Skip to content

Commit

Permalink
Add getLedgers implementation (#303)
Browse files Browse the repository at this point in the history
Add getLedgers implementation
  • Loading branch information
aditya1702 authored Nov 13, 2024
1 parent f768eb9 commit 4f10b54
Show file tree
Hide file tree
Showing 11 changed files with 821 additions and 42 deletions.
4 changes: 4 additions & 0 deletions cmd/soroban-rpc/internal/config/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ type Config struct {
CoreRequestTimeout time.Duration
DefaultEventsLimit uint
DefaultTransactionsLimit uint
DefaultLedgersLimit uint
FriendbotURL string
HistoryArchiveURLs []string
HistoryArchiveUserAgent string
Expand All @@ -34,6 +35,7 @@ type Config struct {
LogLevel logrus.Level
MaxEventsLimit uint
MaxTransactionsLimit uint
MaxLedgersLimit uint
MaxHealthyLedgerLatency time.Duration
NetworkPassphrase string
PreflightWorkerCount uint
Expand All @@ -52,6 +54,7 @@ type Config struct {
RequestBacklogGetLedgerEntriesQueueLimit uint
RequestBacklogGetTransactionQueueLimit uint
RequestBacklogGetTransactionsQueueLimit uint
RequestBacklogGetLedgersQueueLimit uint
RequestBacklogSendTransactionQueueLimit uint
RequestBacklogSimulateTransactionQueueLimit uint
RequestBacklogGetFeeStatsTransactionQueueLimit uint
Expand All @@ -65,6 +68,7 @@ type Config struct {
MaxGetLedgerEntriesExecutionDuration time.Duration
MaxGetTransactionExecutionDuration time.Duration
MaxGetTransactionsExecutionDuration time.Duration
MaxGetLedgersExecutionDuration time.Duration
MaxSendTransactionExecutionDuration time.Duration
MaxSimulateTransactionExecutionDuration time.Duration
MaxGetFeeStatsExecutionDuration time.Duration
Expand Down
35 changes: 35 additions & 0 deletions cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -282,6 +282,28 @@ func (cfg *Config) options() Options {
return nil
},
},
{
Name: "max-ledgers-limit",
Usage: "Maximum amount of ledgers allowed in a single getLedgers response",
ConfigKey: &cfg.MaxLedgersLimit,
DefaultValue: uint(200),
},
{
Name: "default-ledgers-limit",
Usage: "Default cap on the amount of ledgers included in a single getLedgers response",
ConfigKey: &cfg.DefaultLedgersLimit,
DefaultValue: uint(50),
Validate: func(_ *Option) error {
if cfg.DefaultLedgersLimit > cfg.MaxLedgersLimit {
return fmt.Errorf(
"default-ledgers-limit (%v) cannot exceed max-ledgers-limit (%v)",
cfg.DefaultLedgersLimit,
cfg.MaxLedgersLimit,
)
}
return nil
},
},
{
Name: "max-healthy-ledger-latency",
Usage: "maximum ledger latency (i.e. time elapsed since the last known ledger closing time) considered to be healthy" +
Expand Down Expand Up @@ -372,6 +394,13 @@ func (cfg *Config) options() Options {
DefaultValue: uint(1000),
Validate: positive,
},
{
TomlKey: strutils.KebabToConstantCase("request-backlog-get-ledgers-queue-limit"),
Usage: "Maximum number of outstanding getLedgers requests",
ConfigKey: &cfg.RequestBacklogGetLedgersQueueLimit,
DefaultValue: uint(1000),
Validate: positive,
},
{
TomlKey: strutils.KebabToConstantCase("request-backlog-send-transaction-queue-limit"),
Usage: "Maximum number of outstanding SendTransaction requests",
Expand Down Expand Up @@ -453,6 +482,12 @@ func (cfg *Config) options() Options {
ConfigKey: &cfg.MaxGetTransactionsExecutionDuration,
DefaultValue: 5 * time.Second,
},
{
TomlKey: strutils.KebabToConstantCase("max-get-ledgers-execution-duration"),
Usage: "The maximum duration of time allowed for processing a getLedgers request. When that time elapses, the rpc server would return -32001 and abort the request's execution",
ConfigKey: &cfg.MaxGetLedgersExecutionDuration,
DefaultValue: 5 * time.Second,
},
{
TomlKey: strutils.KebabToConstantCase("max-send-transaction-execution-duration"),
Usage: "The maximum duration of time allowed for processing a sendTransaction request. When that time elapses, the rpc server would return -32001 and abort the request's execution",
Expand Down
122 changes: 99 additions & 23 deletions cmd/soroban-rpc/internal/db/ledger.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,12 @@ package db

import (
"context"
"database/sql"
"fmt"

sq "github.com/Masterminds/squirrel"

"github.com/stellar/go/support/db"
"github.com/stellar/go/xdr"

"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ledgerbucketwindow"
Expand All @@ -22,20 +24,82 @@ type LedgerReader interface {
StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error
GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error)
StreamLedgerRange(ctx context.Context, startLedger uint32, endLedger uint32, f StreamLedgerFn) error
NewTx(ctx context.Context) (LedgerReaderTx, error)
}

type LedgerReaderTx interface {
GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error)
BatchGetLedgers(ctx context.Context, sequence uint32, batchSize uint) ([]xdr.LedgerCloseMeta, error)
Done() error
}

type LedgerWriter interface {
InsertLedger(ledger xdr.LedgerCloseMeta) error
}

type readDB interface {
Select(ctx context.Context, dest interface{}, query sq.Sqlizer) error
}

type ledgerReader struct {
db *DB
}

type ledgerReaderTx struct {
tx db.SessionInterface
latestLedgerSeq uint32
latestLedgerCloseTime int64
}

func (l ledgerReaderTx) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.LedgerRange, error) {
if l.latestLedgerSeq != 0 {
return getLedgerRangeWithCache(ctx, l.tx, l.latestLedgerSeq, l.latestLedgerCloseTime)
}
return getLedgerRangeWithoutCache(ctx, l.tx)
}

// BatchGetLedgers fetches ledgers in batches from the db.
func (l ledgerReaderTx) BatchGetLedgers(ctx context.Context, sequence uint32,
batchSize uint,
) ([]xdr.LedgerCloseMeta, error) {
sql := sq.Select("meta").
From(ledgerCloseMetaTableName).
Where(sq.And{
sq.GtOrEq{"sequence": sequence},
sq.LtOrEq{"sequence": sequence + uint32(batchSize) - 1},
})

results := make([]xdr.LedgerCloseMeta, 0, batchSize)
if err := l.tx.Select(ctx, &results, sql); err != nil {
return nil, err
}

return results, nil
}

func (l ledgerReaderTx) Done() error {
return l.tx.Rollback()
}

func NewLedgerReader(db *DB) LedgerReader {
return ledgerReader{db: db}
}

func (r ledgerReader) NewTx(ctx context.Context) (LedgerReaderTx, error) {
r.db.cache.RLock()
defer r.db.cache.RUnlock()
txSession := r.db.Clone()
if err := txSession.BeginTx(ctx, &sql.TxOptions{ReadOnly: true}); err != nil {
return nil, fmt.Errorf("failed to begin read transaction: %w", err)
}
tx := ledgerReaderTx{
tx: txSession,
latestLedgerSeq: r.db.cache.latestLedgerSeq,
latestLedgerCloseTime: r.db.cache.latestLedgerCloseTime,
}
return tx, nil
}

// StreamAllLedgers runs f over all the ledgers in the database (until f errors or signals it's done).
func (r ledgerReader) StreamAllLedgers(ctx context.Context, f StreamLedgerFn) error {
sql := sq.Select("meta").From(ledgerCloseMetaTableName).OrderBy("sequence asc")
Expand Down Expand Up @@ -112,32 +176,44 @@ func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.Le

// Make use of the cached latest ledger seq and close time to query only the oldest ledger details.
if latestLedgerSeqCache != 0 {
query := sq.Select("meta").
From(ledgerCloseMetaTableName).
Where(
fmt.Sprintf("sequence = (SELECT MIN(sequence) FROM %s)", ledgerCloseMetaTableName),
)
var lcm []xdr.LedgerCloseMeta
if err := r.db.Select(ctx, &lcm, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}
return getLedgerRangeWithCache(ctx, r.db, latestLedgerSeqCache, latestLedgerCloseTimeCache)
}
return getLedgerRangeWithoutCache(ctx, r.db)
}

if len(lcm) == 0 {
return ledgerbucketwindow.LedgerRange{}, ErrEmptyDB
}
// getLedgerRangeWithCache uses the latest ledger cache to optimize the query.
// It only needs to look up the first ledger since we have the latest cached.
func getLedgerRangeWithCache(ctx context.Context, db readDB,
latestSeq uint32, latestTime int64,
) (ledgerbucketwindow.LedgerRange, error) {
query := sq.Select("meta").
From(ledgerCloseMetaTableName).
Where(
fmt.Sprintf("sequence = (SELECT MIN(sequence) FROM %s)", ledgerCloseMetaTableName),
)
var lcm []xdr.LedgerCloseMeta
if err := db.Select(ctx, &lcm, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}

return ledgerbucketwindow.LedgerRange{
FirstLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcm[0].LedgerSequence(),
CloseTime: lcm[0].LedgerCloseTime(),
},
LastLedger: ledgerbucketwindow.LedgerInfo{
Sequence: latestLedgerSeqCache,
CloseTime: latestLedgerCloseTimeCache,
},
}, nil
if len(lcm) == 0 {
return ledgerbucketwindow.LedgerRange{}, ErrEmptyDB
}

return ledgerbucketwindow.LedgerRange{
FirstLedger: ledgerbucketwindow.LedgerInfo{
Sequence: lcm[0].LedgerSequence(),
CloseTime: lcm[0].LedgerCloseTime(),
},
LastLedger: ledgerbucketwindow.LedgerInfo{
Sequence: latestSeq,
CloseTime: latestTime,
},
}, nil
}

// getLedgerRangeWithoutCache queries both the first and last ledger when cache isn't available
func getLedgerRangeWithoutCache(ctx context.Context, db readDB) (ledgerbucketwindow.LedgerRange, error) {
query := sq.Select("lcm.meta").
From(ledgerCloseMetaTableName + " as lcm").
Where(sq.Or{
Expand All @@ -146,7 +222,7 @@ func (r ledgerReader) GetLedgerRange(ctx context.Context) (ledgerbucketwindow.Le
}).OrderBy("lcm.sequence ASC")

var lcms []xdr.LedgerCloseMeta
if err := r.db.Select(ctx, &lcms, query); err != nil {
if err := db.Select(ctx, &lcms, query); err != nil {
return ledgerbucketwindow.LedgerRange{}, fmt.Errorf("couldn't query ledger range: %w", err)
}

Expand Down
59 changes: 40 additions & 19 deletions cmd/soroban-rpc/internal/db/ledger_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -183,25 +183,8 @@ func TestGetLedgerRange_EmptyDB(t *testing.T) {
}

func BenchmarkGetLedgerRange(b *testing.B) {
db := NewTestDB(b)
logger := log.DefaultLogger
writer := NewReadWriter(logger, db, interfaces.MakeNoOpDeamon(), 100, 1_000_000, passphrase)
write, err := writer.NewTx(context.TODO())
require.NoError(b, err)

// create 100k tx rows
lcms := make([]xdr.LedgerCloseMeta, 0, 100_000)
for i := range cap(lcms) {
lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0))
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(b, ledgerW.InsertLedger(lcm))
require.NoError(b, txW.InsertTransactions(lcm))
}
require.NoError(b, write.Commit(lcms[len(lcms)-1]))
reader := NewLedgerReader(db)
testDB, lcms := setupBenchmarkingDB(b)
reader := NewLedgerReader(testDB)

b.ResetTimer()
for range b.N {
Expand All @@ -212,6 +195,22 @@ func BenchmarkGetLedgerRange(b *testing.B) {
}
}

func BenchmarkBatchGetLedgers(b *testing.B) {
testDB, lcms := setupBenchmarkingDB(b)
reader := NewLedgerReader(testDB)
readTx, err := reader.NewTx(context.Background())
require.NoError(b, err)
batchSize := uint(200) // using the current maximum value for getLedgers endpoint

b.ResetTimer()
for range b.N {
ledgers, err := readTx.BatchGetLedgers(context.TODO(), 1334, batchSize)
require.NoError(b, err)
assert.Equal(b, lcms[0].LedgerSequence(), ledgers[0].LedgerSequence())
assert.Equal(b, lcms[batchSize-1].LedgerSequence(), ledgers[batchSize-1].LedgerSequence())
}
}

func NewTestDB(tb testing.TB) *DB {
tmp := tb.TempDir()
dbPath := path.Join(tmp, "db.sqlite")
Expand All @@ -222,3 +221,25 @@ func NewTestDB(tb testing.TB) *DB {
})
return db
}

func setupBenchmarkingDB(b *testing.B) (*DB, []xdr.LedgerCloseMeta) {
testDB := NewTestDB(b)
logger := log.DefaultLogger
writer := NewReadWriter(logger, testDB, interfaces.MakeNoOpDeamon(),
100, 1_000_000, passphrase)
write, err := writer.NewTx(context.TODO())
require.NoError(b, err)

lcms := make([]xdr.LedgerCloseMeta, 0, 100_000)
for i := range cap(lcms) {
lcms = append(lcms, txMeta(uint32(1234+i), i%2 == 0))
}

ledgerW, txW := write.LedgerWriter(), write.TransactionWriter()
for _, lcm := range lcms {
require.NoError(b, ledgerW.InsertLedger(lcm))
require.NoError(b, txW.InsertTransactions(lcm))
}
require.NoError(b, write.Commit(lcms[len(lcms)-1]))
return testDB, lcms
}
4 changes: 4 additions & 0 deletions cmd/soroban-rpc/internal/db/mocks.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,6 +109,10 @@ func (m *MockLedgerReader) GetLedgerRange(_ context.Context) (ledgerbucketwindow
return m.txn.ledgerRange, nil
}

func (m *MockLedgerReader) NewTx(_ context.Context) (LedgerReaderTx, error) {
return nil, errors.New("mock NewTx error")
}

var (
_ TransactionReader = &MockTransactionHandler{}
_ TransactionWriter = &MockTransactionHandler{}
Expand Down
Loading

0 comments on commit 4f10b54

Please sign in to comment.