diff --git a/.gitignore b/.gitignore index 2d66cce..be85301 100644 --- a/.gitignore +++ b/.gitignore @@ -1 +1,2 @@ -/mtdb \ No newline at end of file +/mtdb +docker-compose.override.yml diff --git a/block/block.go b/block/block.go index 82124b7..3e30377 100644 --- a/block/block.go +++ b/block/block.go @@ -2,6 +2,8 @@ package block import ( "database/sql" + "fmt" + "math" "github.com/minetest-go/mtdb/types" ) @@ -13,15 +15,46 @@ type Block struct { Data []byte `json:"data"` } +func (b *Block) String() string { + if b == nil { + return "nil" + } + v := b.Data + if len(b.Data) > 20 { + v = b.Data[:20] + } + return fmt.Sprintf("Block{X: %d, Y: %d, Z: %d, data: \"%v\"}", b.PosX, b.PosY, b.PosZ, string(v)) +} + +// BlockRepository implementes data access layer for the Minetest map data. type BlockRepository interface { types.Backup + + // GetByPost returns the map block at positions X,Y,Z. GetByPos(x, y, z int) (*Block, error) + + // Iterator returns a channel to fetch all data from the starting position + // X,Y,Z, with the map blocks sorted by position ascending. Sorting is done + // by X,Y and Z coordinates. + Iterator(x, y, z int) (chan *Block, types.Closer, error) + + // Update upserts the provided map block in the database, using the position + // as key. Update(block *Block) error + + // Delete removes the map block from the database denoted by the x,y,z + // coordinates. Delete(x, y, z int) error + + // Vacuum executes the storage layer vacuum command. Useful to reclaim + // storage space if not done automatically by the backend. Vacuum() error + Count() (int64, error) } +// NewBlockRepository initializes the connection with the appropriate database +// backend and returns the BlockRepository implementation suited for it. func NewBlockRepository(db *sql.DB, dbtype types.DatabaseType) BlockRepository { switch dbtype { case types.DATABASE_POSTGRES: @@ -32,3 +65,10 @@ func NewBlockRepository(db *sql.DB, dbtype types.DatabaseType) BlockRepository { return nil } } + +// AsBlockPos converts the coordinates from the given Node into the equivalent +// Block position. Each block contains 16x16x16 nodes. +func AsBlockPos(x, y, z int) (int, int, int) { + pos := func(x int) int { return int(math.Floor(float64(x) / 16.0)) } + return pos(x), pos(y), pos(z) +} diff --git a/block/block_postgres.go b/block/block_postgres.go index 16b1279..60d2c89 100644 --- a/block/block_postgres.go +++ b/block/block_postgres.go @@ -5,7 +5,11 @@ import ( "bufio" "bytes" "database/sql" + "encoding/json" + + "github.com/minetest-go/mtdb/types" + "github.com/sirupsen/logrus" ) type postgresBlockRepository struct { @@ -26,6 +30,60 @@ func (repo *postgresBlockRepository) GetByPos(x, y, z int) (*Block, error) { return entry, err } +func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) { + rows, err := repo.db.Query(` + SELECT posX, posY, posZ, data + FROM blocks + WHERE posX >= $1 AND posY >= $2 AND posZ >= $3 + ORDER BY posX, posY, posZ + `, x, y, z) + if err != nil { + return nil, nil, err + } + + l := logrus.WithField("iterating_from", []int{x, y, z}) + ch := make(chan *Block) + done := make(types.WhenDone, 1) + count := int64(0) + + // Spawn go routine to fetch rows and send to channel + go func() { + defer close(ch) + defer rows.Close() + + l.Debug("Retrieving database rows ...") + for { + select { + case <-done: + // We can now return, we are done + l.Debugf("Iterator closed by caller. Finishing up...") + return + default: + if rows.Next() { + // Debug progress while fetching rows every 100's + count++ + if count%100 == 0 { + l.Debugf("Retrieved %d records so far", count) + } + // Fetch and send to channel + b := &Block{} + if err = rows.Scan(&b.PosX, &b.PosY, &b.PosZ, &b.Data); err != nil { + l.Errorf("Failed to read next item from iterator: %v; aborting", err) + return + } + ch <- b + } else { + l.Debug("Iterator finished, closing up rows and channel") + return + } + } + } + }() + + // Return channel to main component + return ch, done, nil +} + func (repo *postgresBlockRepository) Update(block *Block) error { _, err := repo.db.Exec("insert into blocks(posX,posY,posZ,data) values($1,$2,$3,$4) ON CONFLICT ON CONSTRAINT blocks_pkey DO UPDATE SET data = $4", block.PosX, block.PosY, block.PosZ, block.Data) diff --git a/block/block_postgres_test.go b/block/block_postgres_test.go index 5bda0b6..80ddf1b 100644 --- a/block/block_postgres_test.go +++ b/block/block_postgres_test.go @@ -1,6 +1,7 @@ package block_test import ( + "database/sql" "testing" "github.com/minetest-go/mtdb/block" @@ -8,26 +9,31 @@ import ( "github.com/stretchr/testify/assert" ) -func TestPostgresBlocksRepo(t *testing.T) { +func setupPostgress(t *testing.T) (block.BlockRepository, *sql.DB) { db, err := getPostgresDB(t) assert.NoError(t, err) + // Cleanup any previous data + db.Exec("delete from blocks") + assert.NoError(t, block.MigrateBlockDB(db, types.DATABASE_POSTGRES)) blocks_repo := block.NewBlockRepository(db, types.DATABASE_POSTGRES) - testBlocksRepository(t, blocks_repo) + + assert.NotNil(t, blocks_repo) + return blocks_repo, db } -func TestMaxConnections(t *testing.T) { - db, err := getPostgresDB(t) - assert.NoError(t, err) +func TestPostgresBlocksRepo(t *testing.T) { + blocks_repo, _ := setupPostgress(t) + testBlocksRepository(t, blocks_repo) +} - assert.NoError(t, block.MigrateBlockDB(db, types.DATABASE_POSTGRES)) - blocks_repo := block.NewBlockRepository(db, types.DATABASE_POSTGRES) - assert.NotNil(t, blocks_repo) +func TestPostgresMaxConnections(t *testing.T) { + blocks_repo, db := setupPostgress(t) var maxConnections int row := db.QueryRow("show max_connections;") - err = row.Scan(&maxConnections) + err := row.Scan(&maxConnections) assert.NoError(t, err) t.Logf("Testing against %v max connections", maxConnections) @@ -55,3 +61,22 @@ func TestMaxConnections(t *testing.T) { } } } + +func TestPostgresIterator(t *testing.T) { + blocks_repo, _ := setupPostgress(t) + testBlocksRepositoryIterator(t, blocks_repo) +} + +func TestPostgresIteratorErrorHandling(t *testing.T) { + blocks_repo, db := setupPostgress(t) + defer db.Close() + + testIteratorErrorHandling(t, blocks_repo, db, ` + ALTER TABLE blocks ALTER COLUMN posX TYPE float; + UPDATE blocks SET posX = 18446744073709551615;`) +} + +func TestPostgresIteratorCloser(t *testing.T) { + r, _ := setupPostgress(t) + testIteratorClose(t, r) +} diff --git a/block/block_sqlite.go b/block/block_sqlite.go index b4d6d06..e498ea0 100644 --- a/block/block_sqlite.go +++ b/block/block_sqlite.go @@ -6,6 +6,9 @@ import ( "bytes" "database/sql" "encoding/json" + + "github.com/minetest-go/mtdb/types" + "github.com/sirupsen/logrus" ) type sqliteBlockRepository struct { @@ -69,6 +72,63 @@ func (repo *sqliteBlockRepository) GetByPos(x, y, z int) (*Block, error) { return entry, err } +func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) { + pos := CoordToPlain(x, y, z) + rows, err := repo.db.Query(` + SELECT pos, data + FROM blocks + WHERE pos >= $1 + ORDER BY pos + `, pos) + if err != nil { + return nil, nil, err + } + + l := logrus. + WithField("iterating_from", []int{x, y, z}). + WithField("pos", pos) + ch := make(chan *Block) + done := make(types.WhenDone, 1) + count := int64(0) + + // Spawn go routine to fetch rows and send to channel + go func() { + defer close(ch) + defer rows.Close() + + l.Debug("Retrieving database rows") + for { + select { + case <-done: + l.Debugf("Iterator closed by caller. Finishing up...") + return + default: + if rows.Next() { + // Debug progress while fetching rows every 100's + count++ + if count%100 == 0 { + l.Debugf("Retrieved %d records so far", count) + } + // Fetch and send to channel + b := &Block{} + if err = rows.Scan(&pos, &b.Data); err != nil { + l.Errorf("Failed to read next item from iterator: %v", err) + return + } + b.PosX, b.PosY, b.PosZ = PlainToCoord(pos) + ch <- b + } else { + l.Debug("Iterator finished, closing up rows and channel") + return + } + } + } + }() + + // Return channel to main component + return ch, done, nil +} + func (repo *sqliteBlockRepository) Update(block *Block) error { pos := CoordToPlain(block.PosX, block.PosY, block.PosZ) _, err := repo.db.Exec("replace into blocks(pos,data) values($1, $2)", pos, block.Data) diff --git a/block/block_sqlite_test.go b/block/block_sqlite_test.go index cf634d4..447f20c 100644 --- a/block/block_sqlite_test.go +++ b/block/block_sqlite_test.go @@ -13,8 +13,7 @@ import ( "github.com/stretchr/testify/assert" ) -func TestSqliteBlockRepo(t *testing.T) { - // open db +func setupSqlite(t *testing.T) (block.BlockRepository, *sql.DB) { dbfile, err := os.CreateTemp(os.TempDir(), "map.sqlite") assert.NoError(t, err) assert.NotNil(t, dbfile) @@ -24,5 +23,57 @@ func TestSqliteBlockRepo(t *testing.T) { assert.NoError(t, block.MigrateBlockDB(db, types.DATABASE_SQLITE)) blocks_repo := block.NewBlockRepository(db, types.DATABASE_SQLITE) + return blocks_repo, db +} + +func TestSqliteBlockRepo(t *testing.T) { + // open db + blocks_repo, _ := setupSqlite(t) testBlocksRepository(t, blocks_repo) } + +func TestSqliteIterator(t *testing.T) { + blocks_repo, _ := setupSqlite(t) + testBlocksRepositoryIterator(t, blocks_repo) +} + +func TestSqliteIteratorErrorHandling(t *testing.T) { + blocks_repo, db := setupSqlite(t) + defer db.Close() + + testIteratorErrorHandling(t, blocks_repo, db, ` + UPDATE blocks SET pos = 18446744073709551615; + `) +} + +func TestCoordToPlain(t *testing.T) { + nodes := []struct { + x, y, z int + }{ + {0, 0, 0}, + {1, -1, 1}, + {-1, -1, -1}, + + {30912, 30912, 30912}, + {-30912, -30912, -30912}, + } + + for i, tc := range nodes { + t.Logf("Test case: #%d", i) + + x1, y1, z1 := block.AsBlockPos(tc.x, tc.y, tc.z) + pos := block.CoordToPlain(x1, y1, z1) + x2, y2, z2 := block.PlainToCoord(pos) + + t.Logf("in=%v,%v,%v => pos=%v => out=%v,%v,%v", x1, y1, z1, pos, x2, y2, z2) + if x1 != x2 || y1 != y2 || z1 != z2 { + t.Errorf("Unexpected coord returned from pos:"+ + "x=%v,y=%v=z=%v => x=%v, y=%v, z=%v", x1, y1, z1, x2, y2, z2) + } + } +} + +func TestSqliteIteratorCloser(t *testing.T) { + r, _ := setupSqlite(t) + testIteratorClose(t, r) +} diff --git a/block/block_test.go b/block/block_test.go index 5838a6d..7d23519 100644 --- a/block/block_test.go +++ b/block/block_test.go @@ -3,8 +3,10 @@ package block_test import ( "archive/zip" "bytes" + "database/sql" "os" "testing" + "time" "github.com/minetest-go/mtdb/block" "github.com/stretchr/testify/assert" @@ -78,3 +80,139 @@ func testBlocksRepository(t *testing.T, block_repo block.BlockRepository) { err = block_repo.Import(&z.Reader) assert.NoError(t, err) } + +func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepository) { + negX, negY, negZ := block.AsBlockPos(-32000, -32000, -32000) + posX, posY, posZ := block.AsBlockPos(32000, 32000, 32000) + + testData := []block.Block{ + {negX, negY, negZ, []byte("negative")}, + {0, 0, 0, []byte("zero")}, + {posX, posY, posZ, []byte("positive")}, + } + for i := range testData { + b := testData[i] + blocks_repo.Update(&b) + } + // logrus.SetLevel(logrus.DebugLevel) + + // Helper function to loop over all channel data + // TODO(ronoaldo): simplify this so we don't have + // to work so verbose. Perhaps implement a wrapper + // to the channel? + consumeAll := func(it chan *block.Block) int { + t.Logf("consumeAll: fetching data from iterator") + count := 0 + for { + select { + case b, ok := <-it: + if !ok { + return count + } + t.Logf("consumeAll: got %v", b) + count++ + case <-time.After(3 * time.Second): + t.Errorf("consumeAll: timed out") + return count + } + if count > 10 { + panic("consumeAll: too many items returned from channel") + } + } + } + + // Fetch from neg -> pos, retrieves all three blocks + it, _, err := blocks_repo.Iterator(negX, negY, negZ) + if assert.NoError(t, err) { + assert.Equal(t, 3, consumeAll(it)) + } + + // Fetch from zero -> pos, retrieves two blocks + it, _, err = blocks_repo.Iterator(0, 0, 0) + if assert.NoError(t, err) { + assert.Equal(t, 2, consumeAll(it)) + } + + // Fetch from zero +1 -> pos, retrieves only one + it, _, err = blocks_repo.Iterator(0, 0, 1) + if assert.NoError(t, err) { + assert.Equal(t, 1, consumeAll(it)) + } + + // Fetch from 2000,2000,2000, retrieves zero blocks + it, _, err = blocks_repo.Iterator(posX+1, posY+1, posZ+1) + if assert.NoError(t, err) { + assert.Equal(t, 0, consumeAll(it)) + } +} + +func testIteratorErrorHandling(t *testing.T, blocks_repo block.BlockRepository, db *sql.DB, mockDataCorruption string) { + logToTesting(t) + setUp := func() { + if err := blocks_repo.Update(&block.Block{1, 2, 3, []byte("default:stone")}); err != nil { + t.Fatalf("setUp: error loading test data: %v", err) + } + + // Forcing an error during iterator loop + if _, err := db.Exec(mockDataCorruption); err != nil { + t.Fatalf("Error renaming column: %v", err) + } + } + + tearDown := func() { + if _, err := db.Exec("DROP TABLE blocks"); err != nil { + t.Fatalf("tearDown: error resetting test db: %v", err) + } + } + + setUp() + defer tearDown() + + ch, _, err := blocks_repo.Iterator(0, 0, 0) + if err != nil { + t.Fatalf("Error loading the iterator: %v", err) + } + + count := 0 + for b := range ch { + t.Logf("Block: %v", b) + count++ + } + + assert.Equal(t, 0, count, "should not return any blocks when data is corrupted") +} + +func testIteratorClose(t *testing.T, r block.BlockRepository) { + logToTesting(t) + + // setUp: Generates 1000+ blocks + for x := -10; x <= 10; x += 2 { + for y := -10; y <= 10; y += 2 { + for z := -10; z <= 10; z += 2 { + r.Update(&block.Block{x, y, z, []byte("default:stone")}) + } + } + } + + it, cl, err := r.Iterator(0, 0, 0) + assert.NoError(t, err, "no error should be returned when initializing iterator") + assert.NotNil(t, cl, "closer should not be nil") + + count := 0 + for b := range it { + t.Logf("Block received: %v", b) + assert.NotNil(t, b, "should not return a nil block from iterator") + count++ + + if count >= 10 { + t.Logf("Closing the bridge at %d", count) + assert.NoError(t, cl.Close(), "closer should not have any errors") + break + } + } + + totalCount, err := r.Count() + assert.NoError(t, err, "should not return error when counting") + + t.Logf("Retrieved %d blocks from a total of %d", count, totalCount) +} diff --git a/block/util_test.go b/block/util_test.go index df61a8f..78305cd 100644 --- a/block/util_test.go +++ b/block/util_test.go @@ -7,6 +7,7 @@ import ( "testing" _ "github.com/lib/pq" + "github.com/sirupsen/logrus" ) func getPostgresDB(t *testing.T) (*sql.DB, error) { @@ -24,3 +25,17 @@ func getPostgresDB(t *testing.T) (*sql.DB, error) { return sql.Open("postgres", connStr) } + +type testingLogWriter struct { + t *testing.T +} + +func (l testingLogWriter) Write(b []byte) (n int, err error) { + l.t.Logf(string(b)) + return len(b), nil +} + +func logToTesting(t *testing.T) { + logrus.SetOutput(testingLogWriter{t}) + logrus.SetLevel(logrus.DebugLevel) +} diff --git a/types/closer.go b/types/closer.go new file mode 100644 index 0000000..9fddcab --- /dev/null +++ b/types/closer.go @@ -0,0 +1,18 @@ +package types + +// Closer is a type that signals the implementation that we are finished +// consuming the data/performing work and the async operation can be stopped. +// +// Used by the block.BlockRepository.Iterator implementation in order to perform +// an early stop. +type Closer interface { + Close() error +} + +// WhenDone is a simple channel-based implementation of the Closer interface +type WhenDone chan bool + +func (w WhenDone) Close() error { + w <- true + return nil +}