diff --git a/cmd/substreams-sink-sql/common_flags.go b/cmd/substreams-sink-sql/common_flags.go index 5c63407..a8f6892 100644 --- a/cmd/substreams-sink-sql/common_flags.go +++ b/cmd/substreams-sink-sql/common_flags.go @@ -57,13 +57,15 @@ func extractSinkConfig(pkg *pbsubstreams.Package) (*pbsql.Service, error) { func newDBLoader( cmd *cobra.Command, psqlDSN string, - flushInterval time.Duration, + batchBlockFlushInterval int, + batchRowFlushInterval int, + liveBlockFlushInterval int, handleReorgs bool, ) (*db.Loader, error) { moduleMismatchMode, err := db.ParseOnModuleHashMismatch(sflags.MustGetString(cmd, onModuleHashMistmatchFlag)) cli.NoError(err, "invalid mistmatch mode") - dbLoader, err := db.NewLoader(psqlDSN, flushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer) + dbLoader, err := db.NewLoader(psqlDSN, batchBlockFlushInterval, batchRowFlushInterval, liveBlockFlushInterval, moduleMismatchMode, &handleReorgs, zlog, tracer) if err != nil { return nil, fmt.Errorf("new psql loader: %w", err) } diff --git a/cmd/substreams-sink-sql/create_user.go b/cmd/substreams-sink-sql/create_user.go index f8a91a4..5eb1a55 100644 --- a/cmd/substreams-sink-sql/create_user.go +++ b/cmd/substreams-sink-sql/create_user.go @@ -45,7 +45,7 @@ func createUserE(cmd *cobra.Command, args []string) error { } if err := retry(ctx, func(ctx context.Context) error { - dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) + dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) if err != nil { return fmt.Errorf("new psql loader: %w", err) } diff --git a/cmd/substreams-sink-sql/generate_csv.go b/cmd/substreams-sink-sql/generate_csv.go index 0637043..929415b 100644 --- a/cmd/substreams-sink-sql/generate_csv.go +++ b/cmd/substreams-sink-sql/generate_csv.go @@ -107,7 +107,7 @@ func generateCsvE(cmd *cobra.Command, args []string) error { return fmt.Errorf("new base sinker: %w", err) } - dbLoader, err := newDBLoader(cmd, dsn, 0, false) // flush interval not used in CSV mode + dbLoader, err := newDBLoader(cmd, dsn, 0, 0, 0, false) // flush interval not used in CSV mode if err != nil { return fmt.Errorf("new db loader: %w", err) } diff --git a/cmd/substreams-sink-sql/run.go b/cmd/substreams-sink-sql/run.go index 927321e..d2bfc3c 100644 --- a/cmd/substreams-sink-sql/run.go +++ b/cmd/substreams-sink-sql/run.go @@ -28,7 +28,9 @@ var sinkRunCmd = Command(sinkRunE, AddCommonSinkerFlags(flags) flags.Int("undo-buffer-size", 0, "If non-zero, handling of reorgs in the database is disabled. Instead, a buffer is introduced to only process a blocks once it has been confirmed by that many blocks, introducing a latency but slightly reducing the load on the database when close to head.") - flags.Int("flush-interval", 1000, "When in catch up mode, flush every N blocks") + flags.Int("batch-block-flush-interval", 1_000, "When in catch up mode, flush every N blocks or after batch-row-flush-interval, whichever comes first. Set to 0 to disable and only use batch-row-flush-interval (default 1,000).") + flags.Int("batch-row-flush-interval", 100_000, "When in catch up mode, flush every N rows or after batch-block-flush-interval, whichever comes first. Set to 0 to disable and only use batch-block-flush-interval (default 100,000).") + flags.Int("live-block-flush-interval", 1, "When processing in live mode, flush every N blocks (default 1).") flags.StringP("endpoint", "e", "", "Specify the substreams endpoint, ex: `mainnet.eth.streamingfast.io:443`") }), OnCommandErrorLogAndExit(zlog), @@ -84,7 +86,7 @@ func sinkRunE(cmd *cobra.Command, args []string) error { return fmt.Errorf("new base sinker: %w", err) } - dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetDuration(cmd, "flush-interval"), handleReorgs) + dbLoader, err := newDBLoader(cmd, dsn, sflags.MustGetInt(cmd, "batch-block-flush-interval"), sflags.MustGetInt(cmd, "batch-row-flush-interval"), sflags.MustGetInt(cmd, "live-block-flush-interval"), handleReorgs) if err != nil { return fmt.Errorf("new db loader: %w", err) } diff --git a/cmd/substreams-sink-sql/setup.go b/cmd/substreams-sink-sql/setup.go index afdd591..c4687f0 100644 --- a/cmd/substreams-sink-sql/setup.go +++ b/cmd/substreams-sink-sql/setup.go @@ -46,7 +46,7 @@ func sinkSetupE(cmd *cobra.Command, args []string) error { return fmt.Errorf("extract sink config: %w", err) } - dbLoader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) + dbLoader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchError, nil, zlog, tracer) if err != nil { return fmt.Errorf("new psql loader: %w", err) } diff --git a/cmd/substreams-sink-sql/tools.go b/cmd/substreams-sink-sql/tools.go index 398228c..c67a900 100644 --- a/cmd/substreams-sink-sql/tools.go +++ b/cmd/substreams-sink-sql/tools.go @@ -145,7 +145,7 @@ func toolsDeleteCursorE(cmd *cobra.Command, args []string) error { func toolsCreateLoader() *db.Loader { dsn := viper.GetString("tools-global-dsn") - loader, err := db.NewLoader(dsn, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer) + loader, err := db.NewLoader(dsn, 0, 0, 0, db.OnModuleHashMismatchIgnore, nil, zlog, tracer) cli.NoError(err, "Unable to instantiate database manager from DSN %q", dsn) if err := loader.LoadTables(); err != nil { diff --git a/db/db.go b/db/db.go index 0a45630..4469cc5 100644 --- a/db/db.go +++ b/db/db.go @@ -4,8 +4,6 @@ import ( "context" "database/sql" "fmt" - "time" - "github.com/jimsmart/schema" "github.com/streamingfast/logging" orderedmap "github.com/wk8/go-ordered-map/v2" @@ -39,8 +37,11 @@ type Loader struct { tables map[string]*TableInfo cursorTable *TableInfo - handleReorgs bool - flushInterval time.Duration + handleReorgs bool + batchBlockFlushInterval int + batchRowFlushInterval int + liveBlockFlushInterval int + moduleMismatchMode OnModuleHashMismatch logger *zap.Logger @@ -51,7 +52,9 @@ type Loader struct { func NewLoader( psqlDsn string, - flushInterval time.Duration, + batchBlockFlushInterval int, + batchRowFlushInterval int, + liveBlockFlushInterval int, moduleMismatchMode OnModuleHashMismatch, handleReorgs *bool, logger *zap.Logger, @@ -68,15 +71,17 @@ func NewLoader( } l := &Loader{ - DB: db, - database: dsn.database, - schema: dsn.schema, - entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](), - tables: map[string]*TableInfo{}, - flushInterval: flushInterval, - moduleMismatchMode: moduleMismatchMode, - logger: logger, - tracer: tracer, + DB: db, + database: dsn.database, + schema: dsn.schema, + entries: NewOrderedMap[string, *OrderedMap[string, *Operation]](), + tables: map[string]*TableInfo{}, + batchBlockFlushInterval: batchBlockFlushInterval, + batchRowFlushInterval: batchRowFlushInterval, + liveBlockFlushInterval: liveBlockFlushInterval, + moduleMismatchMode: moduleMismatchMode, + logger: logger, + tracer: tracer, } _, err = l.tryDialect() if err != nil { @@ -95,7 +100,9 @@ func NewLoader( } logger.Info("created new DB loader", - zap.Duration("flush_interval", flushInterval), + zap.Int("batch_block_flush_interval", batchBlockFlushInterval), + zap.Int("batch_row_flush_interval", batchRowFlushInterval), + zap.Int("live_block_flush_interval", liveBlockFlushInterval), zap.String("driver", dsn.driver), zap.String("database", dsn.database), zap.String("schema", dsn.schema), @@ -129,8 +136,18 @@ func (l *Loader) BeginTx(ctx context.Context, opts *sql.TxOptions) (Tx, error) { return l.DB.BeginTx(ctx, opts) } -func (l *Loader) FlushInterval() time.Duration { - return l.flushInterval +func (l *Loader) BatchBlockFlushInterval() int { + return l.batchBlockFlushInterval +} + +func (l *Loader) LiveBlockFlushInterval() int { return l.liveBlockFlushInterval } + +func (l *Loader) FlushNeeded() bool { + totalRows := 0 + for pair := l.entries.Oldest(); pair != nil; pair = pair.Next() { + totalRows += pair.Value.Len() + } + return totalRows > l.batchRowFlushInterval } func (l *Loader) LoadTables() error { diff --git a/db/testing.go b/db/testing.go index 252207a..523d0d4 100644 --- a/db/testing.go +++ b/db/testing.go @@ -15,7 +15,7 @@ func NewTestLoader( tables map[string]*TableInfo, ) (*Loader, *TestTx) { - loader, err := NewLoader("psql://x:5432/x", 0, OnModuleHashMismatchIgnore, nil, zlog, tracer) + loader, err := NewLoader("psql://x:5432/x", 0, 0, 0, OnModuleHashMismatchIgnore, nil, zlog, tracer) if err != nil { panic(err) } diff --git a/sinker/sinker.go b/sinker/sinker.go index 24b0639..93bbbcb 100644 --- a/sinker/sinker.go +++ b/sinker/sinker.go @@ -118,8 +118,13 @@ func (s *SQLSinker) HandleBlockScopedData(ctx context.Context, data *pbsubstream return fmt.Errorf("apply database changes: %w", err) } - if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 { - s.logger.Debug("flushing to database", zap.Stringer("block", cursor.Block()), zap.Bool("is_live", *isLive)) + if data.Clock.Number%s.batchBlockModulo(data, isLive) == 0 || s.loader.FlushNeeded() { + s.logger.Debug("flushing to database", + zap.Stringer("block", cursor.Block()), + zap.Bool("is_live", *isLive), + zap.Bool("block_flush_interval_reached", data.Clock.Number%s.batchBlockModulo(data, isLive) == 0), + zap.Bool("row_flush_interval_reached", s.loader.FlushNeeded()), + ) flushStart := time.Now() rowFlushedCount, err := s.loader.Flush(ctx, s.OutputModuleHash(), cursor, data.FinalBlockHeight) @@ -219,8 +224,8 @@ func (s *SQLSinker) batchBlockModulo(blockData *pbsubstreamsrpc.BlockScopedData, return LIVE_BLOCK_FLUSH_EACH } - if s.loader.FlushInterval() > 0 { - return uint64(s.loader.FlushInterval()) + if s.loader.BatchBlockFlushInterval() > 0 { + return uint64(s.loader.BatchBlockFlushInterval()) } return HISTORICAL_BLOCK_FLUSH_EACH diff --git a/sinker/stats.go b/sinker/stats.go index 6d9e502..88af764 100644 --- a/sinker/stats.go +++ b/sinker/stats.go @@ -24,8 +24,8 @@ func NewStats(logger *zap.Logger) *Stats { return &Stats{ Shutter: shutter.New(), - dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Second, 30*time.Second, "flush"), - dbFlushAvgDuration: dmetrics.NewAvgDurationCounter(30*time.Second, dmetrics.InferUnit, "per flush"), + dbFlushRate: dmetrics.MustNewAvgRateFromPromCounter(FlushCount, 1*time.Minute, 30*time.Minute, "flush"), + dbFlushAvgDuration: dmetrics.NewAvgDurationCounter(1*time.Minute, dmetrics.InferUnit, "per flush"), flusehdRows: dmetrics.NewValueFromMetric(FlushedRowsCount, "rows"), logger: logger,