Skip to content

Commit

Permalink
feat: implement block iterator (minetest-go#56)
Browse files Browse the repository at this point in the history
* fix: sorry, too many connections.

database/Sql.Rows.Close() should be called as it is
a safe to call and idempotent; not calling it will keep the
underlying connection open.

Added an integration test to reproduce the issue
and a fix to it.

Fixes minetest-go#53

* feat: implement block iterator for map data.

The iterator returns a go channel to allow for
concurrent access over all the data in the map,
skipping any non-existing blocks.

Fix minetest-go#55

* refactor: renamed to AsBlockPos.

* testing: added error handling for iterator.

Fixed an infinite loop/invalid results returned by
the Iterator if database/data is corrupted.

* feature: allow early-stopping the iterator.

Implemented the Closer type and methods to allow
for callers to early stop the iteretor when they're done.
  • Loading branch information
ronoaldo authored Nov 28, 2023
1 parent ded6012 commit 990b478
Show file tree
Hide file tree
Showing 9 changed files with 418 additions and 12 deletions.
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
@@ -1 +1,2 @@
/mtdb
/mtdb
docker-compose.override.yml
40 changes: 40 additions & 0 deletions block/block.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ package block

import (
"database/sql"
"fmt"
"math"

"github.com/minetest-go/mtdb/types"
)
Expand All @@ -13,15 +15,46 @@ type Block struct {
Data []byte `json:"data"`
}

func (b *Block) String() string {
if b == nil {
return "nil"
}
v := b.Data
if len(b.Data) > 20 {
v = b.Data[:20]
}
return fmt.Sprintf("Block{X: %d, Y: %d, Z: %d, data: \"%v\"}", b.PosX, b.PosY, b.PosZ, string(v))
}

// BlockRepository implementes data access layer for the Minetest map data.
type BlockRepository interface {
types.Backup

// GetByPost returns the map block at positions X,Y,Z.
GetByPos(x, y, z int) (*Block, error)

// Iterator returns a channel to fetch all data from the starting position
// X,Y,Z, with the map blocks sorted by position ascending. Sorting is done
// by X,Y and Z coordinates.
Iterator(x, y, z int) (chan *Block, types.Closer, error)

// Update upserts the provided map block in the database, using the position
// as key.
Update(block *Block) error

// Delete removes the map block from the database denoted by the x,y,z
// coordinates.
Delete(x, y, z int) error

// Vacuum executes the storage layer vacuum command. Useful to reclaim
// storage space if not done automatically by the backend.
Vacuum() error

Count() (int64, error)
}

// NewBlockRepository initializes the connection with the appropriate database
// backend and returns the BlockRepository implementation suited for it.
func NewBlockRepository(db *sql.DB, dbtype types.DatabaseType) BlockRepository {
switch dbtype {
case types.DATABASE_POSTGRES:
Expand All @@ -32,3 +65,10 @@ func NewBlockRepository(db *sql.DB, dbtype types.DatabaseType) BlockRepository {
return nil
}
}

// AsBlockPos converts the coordinates from the given Node into the equivalent
// Block position. Each block contains 16x16x16 nodes.
func AsBlockPos(x, y, z int) (int, int, int) {
pos := func(x int) int { return int(math.Floor(float64(x) / 16.0)) }
return pos(x), pos(y), pos(z)
}
58 changes: 58 additions & 0 deletions block/block_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,11 @@ import (
"bufio"
"bytes"
"database/sql"

"encoding/json"

"github.com/minetest-go/mtdb/types"
"github.com/sirupsen/logrus"
)

type postgresBlockRepository struct {
Expand All @@ -26,6 +30,60 @@ func (repo *postgresBlockRepository) GetByPos(x, y, z int) (*Block, error) {
return entry, err
}

func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) {
rows, err := repo.db.Query(`
SELECT posX, posY, posZ, data
FROM blocks
WHERE posX >= $1 AND posY >= $2 AND posZ >= $3
ORDER BY posX, posY, posZ
`, x, y, z)
if err != nil {
return nil, nil, err
}

l := logrus.WithField("iterating_from", []int{x, y, z})
ch := make(chan *Block)
done := make(types.WhenDone, 1)
count := int64(0)

// Spawn go routine to fetch rows and send to channel
go func() {
defer close(ch)
defer rows.Close()

l.Debug("Retrieving database rows ...")
for {
select {
case <-done:
// We can now return, we are done
l.Debugf("Iterator closed by caller. Finishing up...")
return
default:
if rows.Next() {
// Debug progress while fetching rows every 100's
count++
if count%100 == 0 {
l.Debugf("Retrieved %d records so far", count)
}
// Fetch and send to channel
b := &Block{}
if err = rows.Scan(&b.PosX, &b.PosY, &b.PosZ, &b.Data); err != nil {
l.Errorf("Failed to read next item from iterator: %v; aborting", err)
return
}
ch <- b
} else {
l.Debug("Iterator finished, closing up rows and channel")
return
}
}
}
}()

// Return channel to main component
return ch, done, nil
}

func (repo *postgresBlockRepository) Update(block *Block) error {
_, err := repo.db.Exec("insert into blocks(posX,posY,posZ,data) values($1,$2,$3,$4) ON CONFLICT ON CONSTRAINT blocks_pkey DO UPDATE SET data = $4",
block.PosX, block.PosY, block.PosZ, block.Data)
Expand Down
43 changes: 34 additions & 9 deletions block/block_postgres_test.go
Original file line number Diff line number Diff line change
@@ -1,33 +1,39 @@
package block_test

import (
"database/sql"
"testing"

"github.com/minetest-go/mtdb/block"
"github.com/minetest-go/mtdb/types"
"github.com/stretchr/testify/assert"
)

func TestPostgresBlocksRepo(t *testing.T) {
func setupPostgress(t *testing.T) (block.BlockRepository, *sql.DB) {
db, err := getPostgresDB(t)
assert.NoError(t, err)

// Cleanup any previous data
db.Exec("delete from blocks")

assert.NoError(t, block.MigrateBlockDB(db, types.DATABASE_POSTGRES))
blocks_repo := block.NewBlockRepository(db, types.DATABASE_POSTGRES)
testBlocksRepository(t, blocks_repo)

assert.NotNil(t, blocks_repo)
return blocks_repo, db
}

func TestMaxConnections(t *testing.T) {
db, err := getPostgresDB(t)
assert.NoError(t, err)
func TestPostgresBlocksRepo(t *testing.T) {
blocks_repo, _ := setupPostgress(t)
testBlocksRepository(t, blocks_repo)
}

assert.NoError(t, block.MigrateBlockDB(db, types.DATABASE_POSTGRES))
blocks_repo := block.NewBlockRepository(db, types.DATABASE_POSTGRES)
assert.NotNil(t, blocks_repo)
func TestPostgresMaxConnections(t *testing.T) {
blocks_repo, db := setupPostgress(t)

var maxConnections int
row := db.QueryRow("show max_connections;")
err = row.Scan(&maxConnections)
err := row.Scan(&maxConnections)
assert.NoError(t, err)
t.Logf("Testing against %v max connections", maxConnections)

Expand Down Expand Up @@ -55,3 +61,22 @@ func TestMaxConnections(t *testing.T) {
}
}
}

func TestPostgresIterator(t *testing.T) {
blocks_repo, _ := setupPostgress(t)
testBlocksRepositoryIterator(t, blocks_repo)
}

func TestPostgresIteratorErrorHandling(t *testing.T) {
blocks_repo, db := setupPostgress(t)
defer db.Close()

testIteratorErrorHandling(t, blocks_repo, db, `
ALTER TABLE blocks ALTER COLUMN posX TYPE float;
UPDATE blocks SET posX = 18446744073709551615;`)
}

func TestPostgresIteratorCloser(t *testing.T) {
r, _ := setupPostgress(t)
testIteratorClose(t, r)
}
60 changes: 60 additions & 0 deletions block/block_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,9 @@ import (
"bytes"
"database/sql"
"encoding/json"

"github.com/minetest-go/mtdb/types"
"github.com/sirupsen/logrus"
)

type sqliteBlockRepository struct {
Expand Down Expand Up @@ -69,6 +72,63 @@ func (repo *sqliteBlockRepository) GetByPos(x, y, z int) (*Block, error) {
return entry, err
}

func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) {
pos := CoordToPlain(x, y, z)
rows, err := repo.db.Query(`
SELECT pos, data
FROM blocks
WHERE pos >= $1
ORDER BY pos
`, pos)
if err != nil {
return nil, nil, err
}

l := logrus.
WithField("iterating_from", []int{x, y, z}).
WithField("pos", pos)
ch := make(chan *Block)
done := make(types.WhenDone, 1)
count := int64(0)

// Spawn go routine to fetch rows and send to channel
go func() {
defer close(ch)
defer rows.Close()

l.Debug("Retrieving database rows")
for {
select {
case <-done:
l.Debugf("Iterator closed by caller. Finishing up...")
return
default:
if rows.Next() {
// Debug progress while fetching rows every 100's
count++
if count%100 == 0 {
l.Debugf("Retrieved %d records so far", count)
}
// Fetch and send to channel
b := &Block{}
if err = rows.Scan(&pos, &b.Data); err != nil {
l.Errorf("Failed to read next item from iterator: %v", err)
return
}
b.PosX, b.PosY, b.PosZ = PlainToCoord(pos)
ch <- b
} else {
l.Debug("Iterator finished, closing up rows and channel")
return
}
}
}
}()

// Return channel to main component
return ch, done, nil
}

func (repo *sqliteBlockRepository) Update(block *Block) error {
pos := CoordToPlain(block.PosX, block.PosY, block.PosZ)
_, err := repo.db.Exec("replace into blocks(pos,data) values($1, $2)", pos, block.Data)
Expand Down
55 changes: 53 additions & 2 deletions block/block_sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,7 @@ import (
"github.com/stretchr/testify/assert"
)

func TestSqliteBlockRepo(t *testing.T) {
// open db
func setupSqlite(t *testing.T) (block.BlockRepository, *sql.DB) {
dbfile, err := os.CreateTemp(os.TempDir(), "map.sqlite")
assert.NoError(t, err)
assert.NotNil(t, dbfile)
Expand All @@ -24,5 +23,57 @@ func TestSqliteBlockRepo(t *testing.T) {

assert.NoError(t, block.MigrateBlockDB(db, types.DATABASE_SQLITE))
blocks_repo := block.NewBlockRepository(db, types.DATABASE_SQLITE)
return blocks_repo, db
}

func TestSqliteBlockRepo(t *testing.T) {
// open db
blocks_repo, _ := setupSqlite(t)
testBlocksRepository(t, blocks_repo)
}

func TestSqliteIterator(t *testing.T) {
blocks_repo, _ := setupSqlite(t)
testBlocksRepositoryIterator(t, blocks_repo)
}

func TestSqliteIteratorErrorHandling(t *testing.T) {
blocks_repo, db := setupSqlite(t)
defer db.Close()

testIteratorErrorHandling(t, blocks_repo, db, `
UPDATE blocks SET pos = 18446744073709551615;
`)
}

func TestCoordToPlain(t *testing.T) {
nodes := []struct {
x, y, z int
}{
{0, 0, 0},
{1, -1, 1},
{-1, -1, -1},

{30912, 30912, 30912},
{-30912, -30912, -30912},
}

for i, tc := range nodes {
t.Logf("Test case: #%d", i)

x1, y1, z1 := block.AsBlockPos(tc.x, tc.y, tc.z)
pos := block.CoordToPlain(x1, y1, z1)
x2, y2, z2 := block.PlainToCoord(pos)

t.Logf("in=%v,%v,%v => pos=%v => out=%v,%v,%v", x1, y1, z1, pos, x2, y2, z2)
if x1 != x2 || y1 != y2 || z1 != z2 {
t.Errorf("Unexpected coord returned from pos:"+
"x=%v,y=%v=z=%v => x=%v, y=%v, z=%v", x1, y1, z1, x2, y2, z2)
}
}
}

func TestSqliteIteratorCloser(t *testing.T) {
r, _ := setupSqlite(t)
testIteratorClose(t, r)
}
Loading

0 comments on commit 990b478

Please sign in to comment.