Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events db backend #230

Merged
merged 17 commits into from
Aug 24, 2024
Merged
Show file tree
Hide file tree
Changes from 10 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 2 additions & 1 deletion cmd/soroban-rpc/internal/config/options.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ import (

const (
// OneDayOfLedgers is (roughly) a 24 hour window of ledgers.
OneDayOfLedgers = 17280
OneDayOfLedgers = 17280
SevenDayOfLedgers = OneDayOfLedgers * 7

defaultHTTPEndpoint = "localhost:8000"
)
Expand Down
90 changes: 48 additions & 42 deletions cmd/soroban-rpc/internal/daemon/daemon.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,6 @@ import (
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/config"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/db"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/events"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/feewindow"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/ingest"
"github.com/stellar/soroban-rpc/cmd/soroban-rpc/internal/preflight"
Expand Down Expand Up @@ -199,7 +198,7 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}, metricsRegistry),
}

feewindows, eventStore := daemon.mustInitializeStorage(cfg)
feewindows := daemon.mustInitializeStorage(cfg)

onIngestionRetry := func(err error, dur time.Duration) {
logger.WithError(err).Error("could not run ingestion. Retrying")
Expand All @@ -215,7 +214,6 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
cfg.HistoryRetentionWindow,
cfg.NetworkPassphrase,
),
EventStore: eventStore,
NetworkPassPhrase: cfg.NetworkPassphrase,
Archive: historyArchive,
LedgerBackend: core,
Expand All @@ -240,12 +238,12 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {

jsonRPCHandler := internal.NewJSONRPCHandler(cfg, internal.HandlerParams{
Daemon: daemon,
EventStore: eventStore,
FeeStatWindows: feewindows,
Logger: logger,
LedgerReader: db.NewLedgerReader(dbConn),
LedgerEntryReader: db.NewLedgerEntryReader(dbConn),
TransactionReader: db.NewTransactionReader(logger, dbConn, cfg.NetworkPassphrase),
EventReader: db.NewEventReader(logger, dbConn, cfg.NetworkPassphrase),
PreflightGetter: preflightWorkerPool,
})

Expand Down Expand Up @@ -290,56 +288,64 @@ func MustNew(cfg *config.Config, logger *supportlog.Entry) *Daemon {
}

// mustInitializeStorage initializes the storage using what was on the DB
func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindows, *events.MemoryStore) {
eventStore := events.NewMemoryStore(
d,
cfg.NetworkPassphrase,
cfg.HistoryRetentionWindow,
)
feewindows := feewindow.NewFeeWindows(
func (d *Daemon) mustInitializeStorage(cfg *config.Config) *feewindow.FeeWindows {
feeWindows := feewindow.NewFeeWindows(
cfg.ClassicFeeStatsLedgerRetentionWindow,
cfg.SorobanFeeStatsLedgerRetentionWindow,
cfg.NetworkPassphrase,
d.db,
)

readTxMetaCtx, cancelReadTxMeta := context.WithTimeout(context.Background(), cfg.IngestionTimeout)
defer cancelReadTxMeta()
var initialSeq uint32
var currentSeq uint32
dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg)
applicableRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, cfg.HistoryRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for migration")
}

maxFeeRetentionWindow := max(cfg.ClassicFeeStatsLedgerRetentionWindow, cfg.SorobanFeeStatsLedgerRetentionWindow)
feeStatsRange, err := db.GetMigrationLedgerRange(readTxMetaCtx, d.db, maxFeeRetentionWindow)
if err != nil {
d.logger.WithError(err).Fatal("could not get ledger range for fee stats")
}

// Combine the ledger range for fees, events and transactions
ledgerSeqRange := feeStatsRange.Merge(applicableRange)

dataMigrations, err := db.BuildMigrations(readTxMetaCtx, d.logger, d.db, cfg.NetworkPassphrase, ledgerSeqRange)
if err != nil {
d.logger.WithError(err).Fatal("could not build migrations")
}
// NOTE: We could optimize this to avoid unnecessary ingestion calls
// (the range of txmetas can be larger than the individual store retention windows)
// but it's probably not worth the pain.
err = db.NewLedgerReader(d.db).StreamAllLedgers(readTxMetaCtx, func(txmeta xdr.LedgerCloseMeta) error {
currentSeq = txmeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store and applying DB data migrations")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
if err := eventStore.IngestEvents(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize event memory store")
}
if err := feewindows.IngestFees(txmeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}
// TODO: clean up once we remove the in-memory storage.
// (we should only stream over the required range)
if r := dataMigrations.ApplicableRange(); r.IsLedgerIncluded(currentSeq) {
if err := dataMigrations.Apply(readTxMetaCtx, txmeta); err != nil {
d.logger.WithError(err).Fatal("could not run migrations")

// Apply migration for events, transactions and fee stats
err = db.NewLedgerReader(d.db).StreamLedgerRange(
readTxMetaCtx,
ledgerSeqRange.FirstLedgerSeq,
ledgerSeqRange.LastLedgerSeq,
func(txMeta xdr.LedgerCloseMeta) error {
currentSeq = txMeta.LedgerSequence()
if initialSeq == 0 {
initialSeq = currentSeq
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Info("initializing in-memory store")
} else if (currentSeq-initialSeq)%inMemoryInitializationLedgerLogPeriod == 0 {
d.logger.WithFields(supportlog.F{
"seq": currentSeq,
}).Debug("still initializing in-memory store")
}
}
return nil
})

if err = feeWindows.IngestFees(txMeta); err != nil {
d.logger.WithError(err).Fatal("could not initialize fee stats")
}

if err := dataMigrations.Apply(readTxMetaCtx, txMeta); err != nil {
d.logger.WithError(err).Fatal("could not apply migration for ledger ", currentSeq)
}
return nil
})
if err != nil {
d.logger.WithError(err).Fatal("could not obtain txmeta cache from the database")
}
Expand All @@ -353,7 +359,7 @@ func (d *Daemon) mustInitializeStorage(cfg *config.Config) (*feewindow.FeeWindow
}).Info("finished initializing in-memory store and applying DB data migrations")
}

return feewindows, eventStore
return feeWindows
}

func (d *Daemon) Run() {
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down Expand Up @@ -27,6 +27,13 @@ type Cursor struct {
Event uint32
}

type CursorRange struct {
// Start defines the (inclusive) start of the range.
Start Cursor
// End defines the (exclusive) end of the range.
End Cursor
}

// String returns a string representation of this cursor
func (c Cursor) String() string {
return fmt.Sprintf(
Expand Down
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
package events
package db

import (
"encoding/json"
Expand Down
16 changes: 16 additions & 0 deletions cmd/soroban-rpc/internal/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,7 @@ type ReadWriter interface {

type WriteTx interface {
TransactionWriter() TransactionWriter
EventWriter() EventWriter
LedgerEntryWriter() LedgerEntryWriter
LedgerWriter() LedgerWriter

Expand Down Expand Up @@ -250,6 +251,12 @@ func (rw *readWriter) NewTx(ctx context.Context) (WriteTx, error) {
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
eventWriter: eventHandler{
log: rw.log,
db: txSession,
stmtCache: stmtCache,
passphrase: rw.passphrase,
},
}
writer.txWriter.RegisterMetrics(
rw.metrics.TxIngestDuration,
Expand All @@ -266,6 +273,7 @@ type writeTx struct {
ledgerEntryWriter ledgerEntryWriter
ledgerWriter ledgerWriter
txWriter transactionHandler
eventWriter eventHandler
ledgerRetentionWindow uint32
}

Expand All @@ -281,6 +289,10 @@ func (w writeTx) TransactionWriter() TransactionWriter {
return &w.txWriter
}

func (w writeTx) EventWriter() EventWriter {
return &w.eventWriter
}

func (w writeTx) Commit(ledgerSeq uint32) error {
if err := w.ledgerEntryWriter.flush(); err != nil {
return err
Expand All @@ -293,6 +305,10 @@ func (w writeTx) Commit(ledgerSeq uint32) error {
return err
}

if err := w.eventWriter.trimEvents(ledgerSeq, w.ledgerRetentionWindow); err != nil {
psheth9 marked this conversation as resolved.
Show resolved Hide resolved
return err
}

_, err := sq.Replace(metaTableName).
Values(latestLedgerSequenceMetaKey, strconv.FormatUint(uint64(ledgerSeq), 10)).
RunWith(w.stmtCache).
Expand Down
Loading
Loading