Skip to content

Commit

Permalink
feature: allow early-stopping the iterator.
Browse files Browse the repository at this point in the history
Implemented the Closer type and methods to allow
for callers to early stop the iteretor when they're done.
  • Loading branch information
ronoaldo committed Nov 28, 2023
1 parent 8727323 commit 9ea8596
Show file tree
Hide file tree
Showing 8 changed files with 155 additions and 39 deletions.
14 changes: 13 additions & 1 deletion block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package block

import (
"database/sql"
"fmt"
"math"

"github.com/minetest-go/mtdb/types"
Expand All @@ -14,6 +15,17 @@ type Block struct {
Data []byte `json:"data"`
}

func (b *Block) String() string {
if b == nil {
return "nil"
}
v := b.Data
if len(b.Data) > 20 {
v = b.Data[:20]
}
return fmt.Sprintf("Block{X: %d, Y: %d, Z: %d, data: \"%v\"}", b.PosX, b.PosY, b.PosZ, string(v))
}

// BlockRepository implementes data access layer for the Minetest map data.
type BlockRepository interface {
types.Backup
Expand All @@ -24,7 +36,7 @@ type BlockRepository interface {
// Iterator returns a channel to fetch all data from the starting position
// X,Y,Z, with the map blocks sorted by position ascending. Sorting is done
// by X,Y and Z coordinates.
Iterator(x, y, z int) (chan *Block, error)
Iterator(x, y, z int) (chan *Block, types.Closer, error)

// Update upserts the provided map block in the database, using the position
// as key.
Expand Down
45 changes: 29 additions & 16 deletions block/block_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"encoding/json"

"github.com/minetest-go/mtdb/types"
"github.com/sirupsen/logrus"
)

Expand All @@ -29,46 +30,58 @@ func (repo *postgresBlockRepository) GetByPos(x, y, z int) (*Block, error) {
return entry, err
}

func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, error) {
func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) {
rows, err := repo.db.Query(`
SELECT posX, posY, posZ, data
FROM blocks
WHERE posX >= $1 AND posY >= $2 AND posZ >= $3
ORDER BY posX, posY, posZ
`, x, y, z)
if err != nil {
return nil, err
return nil, nil, err
}

l := logrus.WithField("iterating_from", []int{x, y, z})
ch := make(chan *Block)
done := make(types.WhenDone, 1)
count := int64(0)

// Spawn go routine to fetch rows and send to channel
go func() {
defer close(ch)
defer rows.Close()

l.Debug("Retrieving database rows")
for rows.Next() {
// Debug progress while fetching rows every 100's
count++
if count%100 == 0 {
l.Debugf("Retrieved %d records so far", count)
}
// Fetch and send to channel
b := &Block{}
if err = rows.Scan(&b.PosX, &b.PosY, &b.PosZ, &b.Data); err != nil {
l.Errorf("Failed to read next item from iterator: %v", err)
l.Debug("Retrieving database rows ...")
for {
select {
case <-done:
// We can now return, we are done
l.Debugf("Iterator closed by caller. Finishing up...")
return
default:
if rows.Next() {
// Debug progress while fetching rows every 100's
count++
if count%100 == 0 {
l.Debugf("Retrieved %d records so far", count)
}
// Fetch and send to channel
b := &Block{}
if err = rows.Scan(&b.PosX, &b.PosY, &b.PosZ, &b.Data); err != nil {
l.Errorf("Failed to read next item from iterator: %v; aborting", err)
return
}
ch <- b
} else {
l.Debug("Iterator finished, closing up rows and channel")
return
}
}
ch <- b
}
l.Debug("Iterator finished, closing up rows and channel")
}()

// Return channel to main component
return ch, nil
return ch, done, nil
}

func (repo *postgresBlockRepository) Update(block *Block) error {
Expand Down
5 changes: 5 additions & 0 deletions block/block_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,3 +75,8 @@ func TestPostgresIteratorErrorHandling(t *testing.T) {
ALTER TABLE blocks ALTER COLUMN posX TYPE float;
UPDATE blocks SET posX = 18446744073709551615;`)
}

func TestPostgresIteratorCloser(t *testing.T) {
r, _ := setupPostgress(t)
testIteratorClose(t, r)
}
44 changes: 28 additions & 16 deletions block/block_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"database/sql"
"encoding/json"

"github.com/minetest-go/mtdb/types"
"github.com/sirupsen/logrus"
)

Expand Down Expand Up @@ -71,7 +72,7 @@ func (repo *sqliteBlockRepository) GetByPos(x, y, z int) (*Block, error) {
return entry, err
}

func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, error) {
func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) {
pos := CoordToPlain(x, y, z)
rows, err := repo.db.Query(`
SELECT pos, data
Expand All @@ -80,13 +81,14 @@ func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, error) {
ORDER BY pos
`, pos)
if err != nil {
return nil, err
return nil, nil, err
}

l := logrus.
WithField("iterating_from", []int{x, y, z}).
WithField("pos", pos)
ch := make(chan *Block)
done := make(types.WhenDone, 1)
count := int64(0)

// Spawn go routine to fetch rows and send to channel
Expand All @@ -95,26 +97,36 @@ func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, error) {
defer rows.Close()

l.Debug("Retrieving database rows")
for rows.Next() {
// Debug progress while fetching rows every 100's
count++
if count%100 == 0 {
l.Debugf("Retrieved %d records so far", count)
}
// Fetch and send to channel
b := &Block{}
if err = rows.Scan(&pos, &b.Data); err != nil {
l.Errorf("Failed to read next item from iterator: %v", err)
for {
select {
case <-done:
l.Debugf("Iterator closed by caller. Finishing up...")
return
default:
if rows.Next() {
// Debug progress while fetching rows every 100's
count++
if count%100 == 0 {
l.Debugf("Retrieved %d records so far", count)
}
// Fetch and send to channel
b := &Block{}
if err = rows.Scan(&pos, &b.Data); err != nil {
l.Errorf("Failed to read next item from iterator: %v", err)
return
}
b.PosX, b.PosY, b.PosZ = PlainToCoord(pos)
ch <- b
} else {
l.Debug("Iterator finished, closing up rows and channel")
return
}
}
b.PosX, b.PosY, b.PosZ = PlainToCoord(pos)
ch <- b
}
l.Debug("Iterator finished, closing up rows and channel")
}()

// Return channel to main component
return ch, nil
return ch, done, nil
}

func (repo *sqliteBlockRepository) Update(block *Block) error {
Expand Down
5 changes: 5 additions & 0 deletions block/block_sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,3 +72,8 @@ func TestCoordToPlain(t *testing.T) {
}
}
}

func TestSqliteIteratorCloser(t *testing.T) {
r, _ := setupSqlite(t)
testIteratorClose(t, r)
}
48 changes: 42 additions & 6 deletions block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -109,7 +109,7 @@ func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepositor
if !ok {
return count
}
t.Logf("consumeAll: got %#v", b)
t.Logf("consumeAll: got %v", b)
count++
case <-time.After(3 * time.Second):
t.Errorf("consumeAll: timed out")
Expand All @@ -122,31 +122,32 @@ func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepositor
}

// Fetch from neg -> pos, retrieves all three blocks
it, err := blocks_repo.Iterator(negX, negY, negZ)
it, _, err := blocks_repo.Iterator(negX, negY, negZ)
if assert.NoError(t, err) {
assert.Equal(t, 3, consumeAll(it))
}

// Fetch from zero -> pos, retrieves two blocks
it, err = blocks_repo.Iterator(0, 0, 0)
it, _, err = blocks_repo.Iterator(0, 0, 0)
if assert.NoError(t, err) {
assert.Equal(t, 2, consumeAll(it))
}

// Fetch from zero +1 -> pos, retrieves only one
it, err = blocks_repo.Iterator(0, 0, 1)
it, _, err = blocks_repo.Iterator(0, 0, 1)
if assert.NoError(t, err) {
assert.Equal(t, 1, consumeAll(it))
}

// Fetch from 2000,2000,2000, retrieves zero blocks
it, err = blocks_repo.Iterator(posX+1, posY+1, posZ+1)
it, _, err = blocks_repo.Iterator(posX+1, posY+1, posZ+1)
if assert.NoError(t, err) {
assert.Equal(t, 0, consumeAll(it))
}
}

func testIteratorErrorHandling(t *testing.T, blocks_repo block.BlockRepository, db *sql.DB, mockDataCorruption string) {
logToTesting(t)
setUp := func() {
if err := blocks_repo.Update(&block.Block{1, 2, 3, []byte("default:stone")}); err != nil {
t.Fatalf("setUp: error loading test data: %v", err)
Expand All @@ -167,7 +168,7 @@ func testIteratorErrorHandling(t *testing.T, blocks_repo block.BlockRepository,
setUp()
defer tearDown()

ch, err := blocks_repo.Iterator(0, 0, 0)
ch, _, err := blocks_repo.Iterator(0, 0, 0)
if err != nil {
t.Fatalf("Error loading the iterator: %v", err)
}
Expand All @@ -180,3 +181,38 @@ func testIteratorErrorHandling(t *testing.T, blocks_repo block.BlockRepository,

assert.Equal(t, 0, count, "should not return any blocks when data is corrupted")
}

func testIteratorClose(t *testing.T, r block.BlockRepository) {
logToTesting(t)

// setUp: Generates 1000+ blocks
for x := -10; x <= 10; x += 2 {
for y := -10; y <= 10; y += 2 {
for z := -10; z <= 10; z += 2 {
r.Update(&block.Block{x, y, z, []byte("default:stone")})
}
}
}

it, cl, err := r.Iterator(0, 0, 0)
assert.NoError(t, err, "no error should be returned when initializing iterator")
assert.NotNil(t, cl, "closer should not be nil")

count := 0
for b := range it {
t.Logf("Block received: %v", b)
assert.NotNil(t, b, "should not return a nil block from iterator")
count++

if count >= 10 {
t.Logf("Closing the bridge at %d", count)
assert.NoError(t, cl.Close(), "closer should not have any errors")
break
}
}

totalCount, err := r.Count()
assert.NoError(t, err, "should not return error when counting")

t.Logf("Retrieved %d blocks from a total of %d", count, totalCount)
}
15 changes: 15 additions & 0 deletions block/util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"testing"

_ "github.com/lib/pq"
"github.com/sirupsen/logrus"
)

func getPostgresDB(t *testing.T) (*sql.DB, error) {
Expand All @@ -24,3 +25,17 @@ func getPostgresDB(t *testing.T) (*sql.DB, error) {

return sql.Open("postgres", connStr)
}

type testingLogWriter struct {
t *testing.T
}

func (l testingLogWriter) Write(b []byte) (n int, err error) {
l.t.Logf(string(b))
return len(b), nil
}

func logToTesting(t *testing.T) {
logrus.SetOutput(testingLogWriter{t})
logrus.SetLevel(logrus.DebugLevel)
}
18 changes: 18 additions & 0 deletions types/closer.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package types

// Closer is a type that signals the implementation that we are finished
// consuming the data/performing work and the async operation can be stopped.
//
// Used by the block.BlockRepository.Iterator implementation in order to perform
// an early stop.
type Closer interface {
Close() error
}

// WhenDone is a simple channel-based implementation of the Closer interface
type WhenDone chan bool

func (w WhenDone) Close() error {
w <- true
return nil
}

0 comments on commit 9ea8596

Please sign in to comment.