Skip to content

Commit

Permalink
Merge pull request #2 from pinax-network/feature/rework_flush_intervals
Browse files Browse the repository at this point in the history
rework flush intervals
  • Loading branch information
fschoell authored Apr 15, 2024
2 parents 8dd82c8 + 66b66b3 commit f08eadc
Show file tree
Hide file tree
Showing 10 changed files with 58 additions and 32 deletions.
6 changes: 4 additions & 2 deletions cmd/substreams-sink-sql/common_flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,13 +57,15 @@ func extractSinkConfig(pkg *pbsubstreams.Package) (*pbsql.Service, error) {
func newDBLoader(
cmd *cobra.Command,
psqlDSN string,
flushInterval time.Duration,
batchBlockFlushInterval int,
batchRowFlushInterval int,
liveBlockFlushInterval int,
handleReorgs bool,
) (*db.Loader, error) {
moduleMismatchMode, err := db.ParseOnModuleHashMismatch(sflags.MustGetString(cmd, onModuleHashMistmatchFlag))
cli.NoError(err, "invalid mistmatch mode")

dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer)
dbLoader, err := db.NewLoader(psqlDSN, batchBlockFlushInterval, batchRowFlushInterval, liveBlockFlushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer)
if err != nil {
return nil, fmt.Errorf("new psql loader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/create_user.go
Original file line number Diff line number Diff line change
Expand Up @@ -45,7 +45,7 @@ func createUserE(cmd *cobra.Command, args []string) error {
}

if err := retry(ctx, func(ctx context.Context) error {
dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
if err != nil {
return fmt.Errorf("new psql loader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/generate_csv.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,7 +107,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new base sinker: %w", err)
}

dbLoader, err := newDBLoader(cmd, dsn, 0, false) // flush interval not used in CSV mode
dbLoader, err := newDBLoader(cmd, dsn, 0, 0, 0, false) // flush interval not used in CSV mode
if err != nil {
return fmt.Errorf("new db loader: %w", err)
}
Expand Down
6 changes: 4 additions & 2 deletions cmd/substreams-sink-sql/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,9 @@ var sinkRunCmd = Command(sinkRunE,
AddCommonSinkerFlags(flags)

flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.")
flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks")
flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval (default 1,000).")
flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval (default 100,000).")
flags.Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks (default 1).")
flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`")
}),
OnCommandErrorLogAndExit(zlog),
Expand Down Expand Up @@ -84,7 +86,7 @@ func sinkRunE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("new base sinker: %w", err)
}

dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval"), handleReorgs)
dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetInt(cmd, "batch-block-flush-interval"), sflags.MustGetInt(cmd, "batch-row-flush-interval"), sflags.MustGetInt(cmd, "live-block-flush-interval"), handleReorgs)
if err != nil {
return fmt.Errorf("new db loader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/setup.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,7 @@ func sinkSetupE(cmd *cobra.Command, args []string) error {
return fmt.Errorf("extract sink config: %w", err)
}

dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer)
if err != nil {
return fmt.Errorf("new psql loader: %w", err)
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/substreams-sink-sql/tools.go
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ func toolsDeleteCursorE(cmd *cobra.Command, args []string) error {

func toolsCreateLoader() *db.Loader {
dsn := viper.GetString("tools-global-dsn")
loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer)
loader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer)
cli.NoError(err, "Unable to instantiate database manager from DSN %q", dsn)

if err := loader.LoadTables(); err != nil {
Expand Down
51 changes: 34 additions & 17 deletions db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,8 +4,6 @@ import (
"context"
"database/sql"
"fmt"
"time"

"github.com/jimsmart/schema"
"github.com/streamingfast/logging"
orderedmap "github.com/wk8/go-ordered-map/v2"
Expand Down Expand Up @@ -39,8 +37,11 @@ type Loader struct {
tables map[string]*TableInfo
cursorTable *TableInfo

handleReorgs bool
flushInterval time.Duration
handleReorgs bool
batchBlockFlushInterval int
batchRowFlushInterval int
liveBlockFlushInterval int

moduleMismatchMode OnModuleHashMismatch

logger *zap.Logger
Expand All @@ -51,7 +52,9 @@ type Loader struct {

func NewLoader(
psqlDsn string,
flushInterval time.Duration,
batchBlockFlushInterval int,
batchRowFlushInterval int,
liveBlockFlushInterval int,
moduleMismatchMode OnModuleHashMismatch,
handleReorgs *bool,
logger *zap.Logger,
Expand All @@ -68,15 +71,17 @@ func NewLoader(
}

l := &Loader{
DB: db,
database: dsn.database,
schema: dsn.schema,
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
tables: map[string]*TableInfo{},
flushInterval: flushInterval,
moduleMismatchMode: moduleMismatchMode,
logger: logger,
tracer: tracer,
DB: db,
database: dsn.database,
schema: dsn.schema,
entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](),
tables: map[string]*TableInfo{},
batchBlockFlushInterval: batchBlockFlushInterval,
batchRowFlushInterval: batchRowFlushInterval,
liveBlockFlushInterval: liveBlockFlushInterval,
moduleMismatchMode: moduleMismatchMode,
logger: logger,
tracer: tracer,
}
_, err = l.tryDialect()
if err != nil {
Expand All @@ -95,7 +100,9 @@ func NewLoader(
}

logger.Info("created new DB loader",
zap.Duration("flush_interval", flushInterval),
zap.Int("batch_block_flush_interval", batchBlockFlushInterval),
zap.Int("batch_row_flush_interval", batchRowFlushInterval),
zap.Int("live_block_flush_interval", liveBlockFlushInterval),
zap.String("driver", dsn.driver),
zap.String("database", dsn.database),
zap.String("schema", dsn.schema),
Expand Down Expand Up @@ -129,8 +136,18 @@ func (l *Loader) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) {
return l.DB.BeginTx(ctx, opts)
}

func (l *Loader) FlushInterval() time.Duration {
return l.flushInterval
// BatchBlockFlushInterval returns the configured number of blocks between
// flushes while the sinker is catching up; 0 disables block-based flushing
// (callers fall back to their own default in that case).
func (l *Loader) BatchBlockFlushInterval() int { return l.batchBlockFlushInterval }

func (l *Loader) LiveBlockFlushInterval() int { return l.liveBlockFlushInterval }

// FlushNeeded reports whether the number of currently buffered rows exceeds
// the configured batch row flush interval.
//
// A batchRowFlushInterval of 0 (or less) means row-based flushing is disabled
// (see the batch-row-flush-interval flag help), so this never requests a
// flush in that case. Without this guard, a disabled interval would make
// `totalRows > 0` trigger a flush on every block that buffered any rows —
// the opposite of "disabled".
func (l *Loader) FlushNeeded() bool {
	if l.batchRowFlushInterval <= 0 {
		return false
	}

	// Sum the pending operations across all tables; entries is keyed by
	// table, each value holding that table's buffered operations.
	totalRows := 0
	for pair := l.entries.Oldest(); pair != nil; pair = pair.Next() {
		totalRows += pair.Value.Len()
	}
	return totalRows > l.batchRowFlushInterval
}

func (l *Loader) LoadTables() error {
Expand Down
2 changes: 1 addition & 1 deletion db/testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ func NewTestLoader(
tables map[string]*TableInfo,
) (*Loader, *TestTx) {

loader, err := NewLoader("psql://x:5432/x", 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
loader, err := NewLoader("psql://x:5432/x", 0, 0, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer)
if err != nil {
panic(err)
}
Expand Down
13 changes: 9 additions & 4 deletions sinker/sinker.go
Original file line number Diff line number Diff line change
Expand Up @@ -118,8 +118,13 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream
return fmt.Errorf("apply database changes: %w", err)
}

if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 {
s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive))
if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 || s.loader.FlushNeeded() {
s.logger.Debug("flushing to database",
zap.Stringer("block", cursor.Block()),
zap.Bool("is_live", *isLive),
zap.Bool("block_flush_interval_reached", data.Clock.Number%s.batchBlockModulo(data, isLive) == 0),
zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()),
)

flushStart := time.Now()
rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight)
Expand Down Expand Up @@ -219,8 +224,8 @@ func (s *SQLSinker) batchBlockModulo(blockData *pbsubstreamsrpc.BlockScopedData,
return LIVE_BLOCK_FLUSH_EACH
}

if s.loader.FlushInterval() > 0 {
return uint64(s.loader.FlushInterval())
if s.loader.BatchBlockFlushInterval() > 0 {
return uint64(s.loader.BatchBlockFlushInterval())
}

return HISTORICAL_BLOCK_FLUSH_EACH
Expand Down
4 changes: 2 additions & 2 deletions sinker/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ func NewStats(logger *zap.Logger) *Stats {
return &Stats{
Shutter: shutter.New(),

dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Second, 30*time.Second, "flush"),
dbFlushAvgDuration: dmetrics.NewAvgDurationCounter(30*time.Second, dmetrics.InferUnit, "per flush"),
dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Minute, 30*time.Minute, "flush"),
dbFlushAvgDuration: dmetrics.NewAvgDurationCounter(1*time.Minute, dmetrics.InferUnit, "per flush"),
flusehdRows: dmetrics.NewValueFromMetric(FlushedRowsCount, "rows"),
logger: logger,

Expand Down

0 comments on commit f08eadc

Please sign in to comment.