From 9ea8596d23fa92703a518365a782abc26b89e1fc Mon Sep 17 00:00:00 2001 From: Ronoaldo JLP Date: Mon, 27 Nov 2023 21:55:12 -0300 Subject: [PATCH] feature: allow early-stopping the iterator. Implemented the Closer type and methods to allow for callers to early stop the iteretor when they're done. --- block/block.go | 14 ++++++++++- block/block_postgres.go | 45 +++++++++++++++++++++------------ block/block_postgres_test.go | 5 ++++ block/block_sqlite.go | 44 +++++++++++++++++++++------------ block/block_sqlite_test.go | 5 ++++ block/block_test.go | 48 +++++++++++++++++++++++++++++++----- block/util_test.go | 15 +++++++++++ types/closer.go | 18 ++++++++++++++ 8 files changed, 155 insertions(+), 39 deletions(-) create mode 100644 types/closer.go diff --git a/block/block.go b/block/block.go index 6e106f7..3e30377 100644 --- a/block/block.go +++ b/block/block.go @@ -2,6 +2,7 @@ package block import ( "database/sql" + "fmt" "math" "github.com/minetest-go/mtdb/types" @@ -14,6 +15,17 @@ type Block struct { Data []byte `json:"data"` } +func (b *Block) String() string { + if b == nil { + return "nil" + } + v := b.Data + if len(b.Data) > 20 { + v = b.Data[:20] + } + return fmt.Sprintf("Block{X: %d, Y: %d, Z: %d, data: \"%v\"}", b.PosX, b.PosY, b.PosZ, string(v)) +} + // BlockRepository implementes data access layer for the Minetest map data. type BlockRepository interface { types.Backup @@ -24,7 +36,7 @@ type BlockRepository interface { // Iterator returns a channel to fetch all data from the starting position // X,Y,Z, with the map blocks sorted by position ascending. Sorting is done // by X,Y and Z coordinates. - Iterator(x, y, z int) (chan *Block, error) + Iterator(x, y, z int) (chan *Block, types.Closer, error) // Update upserts the provided map block in the database, using the position // as key. diff --git a/block/block_postgres.go b/block/block_postgres.go index bca33fb..60d2c89 100644 --- a/block/block_postgres.go +++ b/block/block_postgres.go @@ -8,6 +8,7 @@ import ( "encoding/json" + "github.com/minetest-go/mtdb/types" "github.com/sirupsen/logrus" ) @@ -29,7 +30,7 @@ func (repo *postgresBlockRepository) GetByPos(x, y, z int) (*Block, error) { return entry, err } -func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, error) { +func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) { rows, err := repo.db.Query(` SELECT posX, posY, posZ, data FROM blocks @@ -37,11 +38,12 @@ func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, error) ORDER BY posX, posY, posZ `, x, y, z) if err != nil { - return nil, err + return nil, nil, err } l := logrus.WithField("iterating_from", []int{x, y, z}) ch := make(chan *Block) + done := make(types.WhenDone, 1) count := int64(0) // Spawn go routine to fetch rows and send to channel @@ -49,26 +51,37 @@ func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, error) defer close(ch) defer rows.Close() - l.Debug("Retrieving database rows") - for rows.Next() { - // Debug progress while fetching rows every 100's - count++ - if count%100 == 0 { - l.Debugf("Retrieved %d records so far", count) - } - // Fetch and send to channel - b := &Block{} - if err = rows.Scan(&b.PosX, &b.PosY, &b.PosZ, &b.Data); err != nil { - l.Errorf("Failed to read next item from iterator: %v", err) + l.Debug("Retrieving database rows ...") + for { + select { + case <-done: + // We can now return, we are done + l.Debugf("Iterator closed by caller. Finishing up...") return + default: + if rows.Next() { + // Debug progress while fetching rows every 100's + count++ + if count%100 == 0 { + l.Debugf("Retrieved %d records so far", count) + } + // Fetch and send to channel + b := &Block{} + if err = rows.Scan(&b.PosX, &b.PosY, &b.PosZ, &b.Data); err != nil { + l.Errorf("Failed to read next item from iterator: %v; aborting", err) + return + } + ch <- b + } else { + l.Debug("Iterator finished, closing up rows and channel") + return + } } - ch <- b } - l.Debug("Iterator finished, closing up rows and channel") }() // Return channel to main component - return ch, nil + return ch, done, nil } func (repo *postgresBlockRepository) Update(block *Block) error { diff --git a/block/block_postgres_test.go b/block/block_postgres_test.go index b878201..80ddf1b 100644 --- a/block/block_postgres_test.go +++ b/block/block_postgres_test.go @@ -75,3 +75,8 @@ func TestPostgresIteratorErrorHandling(t *testing.T) { ALTER TABLE blocks ALTER COLUMN posX TYPE float; UPDATE blocks SET posX = 18446744073709551615;`) } + +func TestPostgresIteratorCloser(t *testing.T) { + r, _ := setupPostgress(t) + testIteratorClose(t, r) +} diff --git a/block/block_sqlite.go b/block/block_sqlite.go index 550ab5d..e498ea0 100644 --- a/block/block_sqlite.go +++ b/block/block_sqlite.go @@ -7,6 +7,7 @@ import ( "database/sql" "encoding/json" + "github.com/minetest-go/mtdb/types" "github.com/sirupsen/logrus" ) @@ -71,7 +72,7 @@ func (repo *sqliteBlockRepository) GetByPos(x, y, z int) (*Block, error) { return entry, err } -func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, error) { +func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) { pos := CoordToPlain(x, y, z) rows, err := repo.db.Query(` SELECT pos, data @@ -80,13 +81,14 @@ func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, error) { ORDER BY pos `, pos) if err != nil { - return nil, err + return nil, nil, err } l := logrus. WithField("iterating_from", []int{x, y, z}). WithField("pos", pos) ch := make(chan *Block) + done := make(types.WhenDone, 1) count := int64(0) // Spawn go routine to fetch rows and send to channel @@ -95,26 +97,36 @@ func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, error) { defer rows.Close() l.Debug("Retrieving database rows") - for rows.Next() { - // Debug progress while fetching rows every 100's - count++ - if count%100 == 0 { - l.Debugf("Retrieved %d records so far", count) - } - // Fetch and send to channel - b := &Block{} - if err = rows.Scan(&pos, &b.Data); err != nil { - l.Errorf("Failed to read next item from iterator: %v", err) + for { + select { + case <-done: + l.Debugf("Iterator closed by caller. Finishing up...") return + default: + if rows.Next() { + // Debug progress while fetching rows every 100's + count++ + if count%100 == 0 { + l.Debugf("Retrieved %d records so far", count) + } + // Fetch and send to channel + b := &Block{} + if err = rows.Scan(&pos, &b.Data); err != nil { + l.Errorf("Failed to read next item from iterator: %v", err) + return + } + b.PosX, b.PosY, b.PosZ = PlainToCoord(pos) + ch <- b + } else { + l.Debug("Iterator finished, closing up rows and channel") + return + } } - b.PosX, b.PosY, b.PosZ = PlainToCoord(pos) - ch <- b } - l.Debug("Iterator finished, closing up rows and channel") }() // Return channel to main component - return ch, nil + return ch, done, nil } func (repo *sqliteBlockRepository) Update(block *Block) error { diff --git a/block/block_sqlite_test.go b/block/block_sqlite_test.go index 5c5c091..447f20c 100644 --- a/block/block_sqlite_test.go +++ b/block/block_sqlite_test.go @@ -72,3 +72,8 @@ func TestCoordToPlain(t *testing.T) { } } } + +func TestSqliteIteratorCloser(t *testing.T) { + r, _ := setupSqlite(t) + testIteratorClose(t, r) +} diff --git a/block/block_test.go b/block/block_test.go index 8218c1e..7d23519 100644 --- a/block/block_test.go +++ b/block/block_test.go @@ -109,7 +109,7 @@ func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepositor if !ok { return count } - t.Logf("consumeAll: got %#v", b) + t.Logf("consumeAll: got %v", b) count++ case <-time.After(3 * time.Second): t.Errorf("consumeAll: timed out") @@ -122,31 +122,32 @@ func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepositor } // Fetch from neg -> pos, retrieves all three blocks - it, err := blocks_repo.Iterator(negX, negY, negZ) + it, _, err := blocks_repo.Iterator(negX, negY, negZ) if assert.NoError(t, err) { assert.Equal(t, 3, consumeAll(it)) } // Fetch from zero -> pos, retrieves two blocks - it, err = blocks_repo.Iterator(0, 0, 0) + it, _, err = blocks_repo.Iterator(0, 0, 0) if assert.NoError(t, err) { assert.Equal(t, 2, consumeAll(it)) } // Fetch from zero +1 -> pos, retrieves only one - it, err = blocks_repo.Iterator(0, 0, 1) + it, _, err = blocks_repo.Iterator(0, 0, 1) if assert.NoError(t, err) { assert.Equal(t, 1, consumeAll(it)) } // Fetch from 2000,2000,2000, retrieves zero blocks - it, err = blocks_repo.Iterator(posX+1, posY+1, posZ+1) + it, _, err = blocks_repo.Iterator(posX+1, posY+1, posZ+1) if assert.NoError(t, err) { assert.Equal(t, 0, consumeAll(it)) } } func testIteratorErrorHandling(t *testing.T, blocks_repo block.BlockRepository, db *sql.DB, mockDataCorruption string) { + logToTesting(t) setUp := func() { if err := blocks_repo.Update(&block.Block{1, 2, 3, []byte("default:stone")}); err != nil { t.Fatalf("setUp: error loading test data: %v", err) @@ -167,7 +168,7 @@ func testIteratorErrorHandling(t *testing.T, blocks_repo block.BlockRepository, setUp() defer tearDown() - ch, err := blocks_repo.Iterator(0, 0, 0) + ch, _, err := blocks_repo.Iterator(0, 0, 0) if err != nil { t.Fatalf("Error loading the iterator: %v", err) } @@ -180,3 +181,38 @@ func testIteratorErrorHandling(t *testing.T, blocks_repo block.BlockRepository, assert.Equal(t, 0, count, "should not return any blocks when data is corrupted") } + +func testIteratorClose(t *testing.T, r block.BlockRepository) { + logToTesting(t) + + // setUp: Generates 1000+ blocks + for x := -10; x <= 10; x += 2 { + for y := -10; y <= 10; y += 2 { + for z := -10; z <= 10; z += 2 { + r.Update(&block.Block{x, y, z, []byte("default:stone")}) + } + } + } + + it, cl, err := r.Iterator(0, 0, 0) + assert.NoError(t, err, "no error should be returned when initializing iterator") + assert.NotNil(t, cl, "closer should not be nil") + + count := 0 + for b := range it { + t.Logf("Block received: %v", b) + assert.NotNil(t, b, "should not return a nil block from iterator") + count++ + + if count >= 10 { + t.Logf("Closing the bridge at %d", count) + assert.NoError(t, cl.Close(), "closer should not have any errors") + break + } + } + + totalCount, err := r.Count() + assert.NoError(t, err, "should not return error when counting") + + t.Logf("Retrieved %d blocks from a total of %d", count, totalCount) +} diff --git a/block/util_test.go b/block/util_test.go index df61a8f..78305cd 100644 --- a/block/util_test.go +++ b/block/util_test.go @@ -7,6 +7,7 @@ import ( "testing" _ "github.com/lib/pq" + "github.com/sirupsen/logrus" ) func getPostgresDB(t *testing.T) (*sql.DB, error) { @@ -24,3 +25,17 @@ func getPostgresDB(t *testing.T) (*sql.DB, error) { return sql.Open("postgres", connStr) } + +type testingLogWriter struct { + t *testing.T +} + +func (l testingLogWriter) Write(b []byte) (n int, err error) { + l.t.Logf(string(b)) + return len(b), nil +} + +func logToTesting(t *testing.T) { + logrus.SetOutput(testingLogWriter{t}) + logrus.SetLevel(logrus.DebugLevel) +} diff --git a/types/closer.go b/types/closer.go new file mode 100644 index 0000000..9fddcab --- /dev/null +++ b/types/closer.go @@ -0,0 +1,18 @@ +package types + +// Closer is a type that signals the implementation that we are finished +// consuming the data/performing work and the async operation can be stopped. +// +// Used by the block.BlockRepository.Iterator implementation in order to perform +// an early stop. +type Closer interface { + Close() error +} + +// WhenDone is a simple channel-based implementation of the Closer interface +type WhenDone chan bool + +func (w WhenDone) Close() error { + w <- true + return nil +}