Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: Make LedgerTransactionReader seekable and lazy #5274

Merged
merged 11 commits into from
Apr 16, 2024
19 changes: 16 additions & 3 deletions ingest/CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,27 @@ All notable changes to this project will be documented in this file. This projec

## Unreleased

* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050)

### New Features
* **Performance improvement**: the Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later.
* Support for Soroban and Protocol 20!
* The `LedgerTransactionReader` now has a `Seek(index int)` method to provide reading from arbitrary parts of the ledger [5274](https://github.com/stellar/go/pull/5274).
* `Change` now has a canonical stringification and a set of them is deterministically sortable.
* `NewCompactingChangeReader` will give you a wrapped `ChangeReader` that compacts the changes.
* Let filewatcher use binary hash instead of timestamp to detect core version update [4050](https://github.com/stellar/go/pull/4050).

### Performance Improvements
* The Captive Core backend now reuses bucket files whenever it finds existing ones in the corresponding `--captive-core-storage-path` (introduced in [v2.0](#v2.0.0)) rather than generating a one-time temporary sub-directory ([#3670](https://github.com/stellar/go/pull/3670)). Note that taking advantage of this feature requires [Stellar-Core v17.1.0](https://github.com/stellar/stellar-core/releases/tag/v17.1.0) or later.
* There have been miscallaneous memory and processing speed improvements.

### Bug Fixes
* The Stellar Core runner now parses logs from its underlying subprocess better [#3746](https://github.com/stellar/go/pull/3746).
* Ensures that the underlying Stellar Core is terminated before restarting.
* Backends will now connect with a user agent.
* Better handling of various error and restart scenarios.

### Breaking Changes
* **Captive Core is now the only available backend.**
* The Captive Core configuration should be provided via a TOML file.
* `Change.AccountSignersChanged` has been removed.

## v2.0.0

Expand Down
6 changes: 3 additions & 3 deletions ingest/ledger_change_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -176,7 +176,7 @@ func (r *LedgerChangeReader) Read() (Change, error) {
}
return r.Read()
case evictionChangesState:
entries, err := r.ledgerCloseMeta.EvictedPersistentLedgerEntries()
entries, err := r.lcm.EvictedPersistentLedgerEntries()
if err != nil {
return Change{}, err
}
Expand All @@ -196,9 +196,9 @@ func (r *LedgerChangeReader) Read() (Change, error) {
return r.Read()
case upgradeChangesState:
// Get upgrade changes
if r.upgradeIndex < len(r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()) {
if r.upgradeIndex < len(r.LedgerTransactionReader.lcm.UpgradesProcessing()) {
changes := GetChangesFromLedgerEntryChanges(
r.LedgerTransactionReader.ledgerCloseMeta.UpgradesProcessing()[r.upgradeIndex].Changes,
r.LedgerTransactionReader.lcm.UpgradesProcessing()[r.upgradeIndex].Changes,
)
r.pending = append(r.pending, changes...)
r.upgradeIndex++
Expand Down
140 changes: 87 additions & 53 deletions ingest/ledger_transaction_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,17 +11,29 @@ import (
"github.com/stellar/go/xdr"
)

// LedgerTransactionReader reads transactions for a given ledger sequence from a backend.
// Use NewTransactionReader to create a new instance.
var badMetaVersionErr = errors.New(
"TransactionMeta.V=2 is required in protocol version older than version 10. " +
"Please process ledgers again using the latest stellar-core version.",
)

// LedgerTransactionReader reads transactions for a given ledger sequence from a
// backend. Use NewTransactionReader to create a new instance.
type LedgerTransactionReader struct {
ledgerCloseMeta xdr.LedgerCloseMeta
transactions []LedgerTransaction
readIdx int
lcm xdr.LedgerCloseMeta // read-only
envelopesByHash map[xdr.Hash]xdr.TransactionEnvelope // set once

readIdx int // tracks iteration & seeking
}

// NewLedgerTransactionReader creates a new TransactionReader instance.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.LedgerBackend, networkPassphrase string, sequence uint32) (*LedgerTransactionReader, error) {
// NewLedgerTransactionReader creates a new TransactionReader instance. Note
// that TransactionReader is not thread safe and should not be shared by
// multiple goroutines.
func NewLedgerTransactionReader(
ctx context.Context,
backend ledgerbackend.LedgerBackend,
networkPassphrase string,
sequence uint32,
) (*LedgerTransactionReader, error) {
ledgerCloseMeta, err := backend.GetLedger(ctx, sequence)
if err != nil {
return nil, errors.Wrap(err, "error getting ledger from the backend")
Expand All @@ -30,87 +42,109 @@ func NewLedgerTransactionReader(ctx context.Context, backend ledgerbackend.Ledge
return NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase, ledgerCloseMeta)
}

// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader instance from xdr.LedgerCloseMeta.
// Note that TransactionReader is not thread safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta(networkPassphrase string, ledgerCloseMeta xdr.LedgerCloseMeta) (*LedgerTransactionReader, error) {
reader := &LedgerTransactionReader{ledgerCloseMeta: ledgerCloseMeta}
if err := reader.storeTransactions(ledgerCloseMeta, networkPassphrase); err != nil {
// NewLedgerTransactionReaderFromLedgerCloseMeta creates a new TransactionReader
// instance from xdr.LedgerCloseMeta. Note that TransactionReader is not thread
// safe and should not be shared by multiple goroutines.
func NewLedgerTransactionReaderFromLedgerCloseMeta(
networkPassphrase string,
ledgerCloseMeta xdr.LedgerCloseMeta,
) (*LedgerTransactionReader, error) {
reader := &LedgerTransactionReader{
lcm: ledgerCloseMeta,
envelopesByHash: make(map[xdr.Hash]xdr.TransactionEnvelope, ledgerCloseMeta.CountTransactions()),
readIdx: 0,
}

if err := reader.storeTransactions(networkPassphrase); err != nil {
return nil, errors.Wrap(err, "error extracting transactions from ledger close meta")
}
return reader, nil
}

// GetSequence returns the sequence number of the ledger data stored by this object.
func (reader *LedgerTransactionReader) GetSequence() uint32 {
return reader.ledgerCloseMeta.LedgerSequence()
return reader.lcm.LedgerSequence()
}

// GetHeader returns the XDR Header data associated with the stored ledger.
func (reader *LedgerTransactionReader) GetHeader() xdr.LedgerHeaderHistoryEntry {
return reader.ledgerCloseMeta.LedgerHeaderHistoryEntry()
return reader.lcm.LedgerHeaderHistoryEntry()
}

// Read returns the next transaction in the ledger, ordered by tx number, each time
// it is called. When there are no more transactions to return, an EOF error is returned.
func (reader *LedgerTransactionReader) Read() (LedgerTransaction, error) {
if reader.readIdx < len(reader.transactions) {
reader.readIdx++
return reader.transactions[reader.readIdx-1], nil
if reader.readIdx >= reader.lcm.CountTransactions() {
return LedgerTransaction{}, io.EOF
}
return LedgerTransaction{}, io.EOF
i := reader.readIdx
reader.readIdx++ // next read will advance even on error

hash := reader.lcm.TransactionHash(i)
envelope, ok := reader.envelopesByHash[hash]
if !ok {
hexHash := hex.EncodeToString(hash[:])
return LedgerTransaction{}, errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
}

return LedgerTransaction{
Index: uint32(i + 1), // Transactions start at '1'
Envelope: envelope,
Result: reader.lcm.TransactionResultPair(i),
UnsafeMeta: reader.lcm.TxApplyProcessing(i),
FeeChanges: reader.lcm.FeeProcessing(i),
LedgerVersion: uint32(reader.lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
}, nil
}

// Rewind resets the reader back to the first transaction in the ledger
func (reader *LedgerTransactionReader) Rewind() {
reader.readIdx = 0
reader.Seek(0)
}

// Seek sets the reader back to a specific transaction in the ledger
func (reader *LedgerTransactionReader) Seek(index int) error {
if index >= reader.lcm.CountTransactions() || index < 0 {
return io.EOF
}

reader.readIdx = index
return nil
}

// storeTransactions maps the close meta data into a slice of LedgerTransaction structs, to provide
// a per-transaction view of the data when Read() is called.
func (reader *LedgerTransactionReader) storeTransactions(lcm xdr.LedgerCloseMeta, networkPassphrase string) error {
byHash := map[xdr.Hash]xdr.TransactionEnvelope{}
for i, tx := range lcm.TransactionEnvelopes() {
// storeHashes creates a mapping between hashes and envelopes in order to
// correctly provide a per-transaction view on-the-fly when Read() is called.
func (reader *LedgerTransactionReader) storeTransactions(networkPassphrase string) error {
// See https://github.com/stellar/go/pull/2720: envelopes in the meta (which
// just come straight from the agreed-upon transaction set) are not in the
// same order as the actual list of metas (which are sorted by hash), so we
// need to hash the envelopes *first* to properly associate them with their
// metas.
for i, tx := range reader.lcm.TransactionEnvelopes() {
hash, err := network.HashTransactionInEnvelope(tx, networkPassphrase)
if err != nil {
return errors.Wrapf(err, "could not hash transaction %d in TxSet", i)
}
byHash[hash] = tx
}
reader.envelopesByHash[xdr.Hash(hash)] = tx

for i := 0; i < lcm.CountTransactions(); i++ {
hash := lcm.TransactionHash(i)
envelope, ok := byHash[hash]
if !ok {
hexHash := hex.EncodeToString(hash[:])
return errors.Errorf("unknown tx hash in LedgerCloseMeta: %v", hexHash)
// We check the version only if FeeProcessing is non-empty, because some
// backends (like HistoryArchiveBackend) do not return meta.
//
// Note that the ordering differences are irrelevant here because all we
// care about is checking every meta for this condition.
if reader.lcm.ProtocolVersion() < 10 && reader.lcm.TxApplyProcessing(i).V < 2 &&
len(reader.lcm.FeeProcessing(i)) > 0 {
return badMetaVersionErr
}

// We check the version only if FeeProcessing are non empty because some backends
// (like HistoryArchiveBackend) do not return meta.
if lcm.ProtocolVersion() < 10 && lcm.TxApplyProcessing(i).V < 2 &&
len(lcm.FeeProcessing(i)) > 0 {
return errors.New(
"TransactionMeta.V=2 is required in protocol version older than version 10. " +
"Please process ledgers again using the latest stellar-core version.",
)
}

reader.transactions = append(reader.transactions, LedgerTransaction{
Index: uint32(i + 1), // Transactions start at '1'
Envelope: envelope,
Result: lcm.TransactionResultPair(i),
UnsafeMeta: lcm.TxApplyProcessing(i),
FeeChanges: lcm.FeeProcessing(i),
LedgerVersion: uint32(lcm.LedgerHeaderHistoryEntry().Header.LedgerVersion),
})
}

return nil
}

// Close should be called when reading is finished. This is especially
// helpful when there are still some transactions available so reader can stop
// streaming them.
func (reader *LedgerTransactionReader) Close() error {
reader.transactions = nil
reader.envelopesByHash = nil
return nil
}
143 changes: 143 additions & 0 deletions ingest/ledger_transaction_reader_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,143 @@
package ingest

import (
"io"
"testing"

"github.com/stellar/go/keypair"
"github.com/stellar/go/network"
"github.com/stellar/go/support/collections/set"
"github.com/stellar/go/xdr"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
passphrase = network.TestNetworkPassphrase
// Test prep:
// - two different envelopes which resolve to two different hashes
// - two basically-empty metas that contain the corresponding hashes
// - a ledger that has 5 txs with metas corresponding to these two envs
// - specifically, in the order [first, first, second, second, second]
//
// This tests both hash <--> envelope mapping and indexed iteration.
txEnvs, txHashes, txMetas = makeTransactions(5)
// barebones LCM structure so that the tx reader works w/o nil derefs, 5 txs
ledgerCloseMeta = xdr.LedgerCloseMeta{V: 1,
V1: &xdr.LedgerCloseMetaV1{
TxProcessing: txMetas,
TxSet: xdr.GeneralizedTransactionSet{V: 1,
V1TxSet: &xdr.TransactionSetV1{
Phases: []xdr.TransactionPhase{{V: 0,
V0Components: &[]xdr.TxSetComponent{{
TxsMaybeDiscountedFee: &xdr.TxSetComponentTxsMaybeDiscountedFee{
Txs: txEnvs,
}},
},
}},
},
},
},
}
)

func TestTransactionReader(t *testing.T) {
s := set.NewSet[xdr.Hash](5)
for _, hash := range txHashes {
s.Add(hash)
}
require.Lenf(t, s, len(txHashes), "precondition: hashes aren't unique, envs: %+v", txEnvs)

// simplest case: read from start

reader, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta)
require.NoError(t, err)

for i := 0; i < 5; i++ {
tx, ierr := reader.Read()
require.NoError(t, ierr)
assert.EqualValues(t, i+1, tx.Index, "iteration i=%d", i)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope)
assert.Equal(t, txHashes[tx.Index-1], thisHash)
}
_, err = reader.Read()
require.ErrorIs(t, err, io.EOF)

// start reading from the middle set of txs

require.NoError(t, reader.Seek(2))
for i := 0; i < 3; i++ {
tx, ierr := reader.Read()
require.NoError(t, ierr)
assert.EqualValues(t,
/* txIndex is 1-based, iter is 0-based, start at 3rd tx, 5 total */
1+(i+2)%5,
tx.Index,
"iteration i=%d", i)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
assert.Equal(t, txEnvs[tx.Index-1], tx.Envelope)
assert.Equal(t, txHashes[tx.Index-1], thisHash)
}
_, err = reader.Read()
require.ErrorIs(t, err, io.EOF)

// edge case: start from the last tx
require.NoError(t, reader.Seek(4))
tx, ierr := reader.Read()
require.NoError(t, ierr)
assert.EqualValues(t, 5, tx.Index)

thisHash, ierr := network.HashTransactionInEnvelope(tx.Envelope, passphrase)
require.NoError(t, ierr)
assert.Equal(t, txEnvs[4], tx.Envelope)
assert.Equal(t, txHashes[4], thisHash)
_, err = reader.Read()
require.ErrorIs(t, err, io.EOF)

// error case: too far or too close
for _, idx := range []int{-1, 5, 6} {
rdr, err := NewLedgerTransactionReaderFromLedgerCloseMeta(passphrase, ledgerCloseMeta)
require.NoError(t, err)
require.Error(t, rdr.Seek(idx), "no error when trying seek=%d", idx)
}
}

func makeTransactions(count int) (
envs []xdr.TransactionEnvelope,
hashes [][32]byte,
metas []xdr.TransactionResultMeta,
) {
seqNum := 123_456
for i := 0; i < count; i++ {
txEnv := xdr.TransactionEnvelope{
Type: xdr.EnvelopeTypeEnvelopeTypeTx,
V1: &xdr.TransactionV1Envelope{
Tx: xdr.Transaction{
Ext: xdr.TransactionExt{V: 0},
SourceAccount: xdr.MustMuxedAddress(keypair.MustRandom().Address()),
Operations: []xdr.Operation{},
Fee: xdr.Uint32(seqNum + i),
SeqNum: xdr.SequenceNumber(seqNum + i),
},
Signatures: []xdr.DecoratedSignature{},
},
}

txHash, _ := network.HashTransactionInEnvelope(txEnv, passphrase)
txMeta := xdr.TransactionResultMeta{
Result: xdr.TransactionResultPair{TransactionHash: xdr.Hash(txHash)},
TxApplyProcessing: xdr.TransactionMeta{V: 3, V3: &xdr.TransactionMetaV3{}},
}

envs = append(envs, txEnv)
hashes = append(hashes, txHash)
metas = append(metas, txMeta)
}

return
}
Loading