fixes: pure-go build, iterator for postgres #80

Merged: 2 commits, Dec 3, 2023
4 changes: 2 additions & 2 deletions auth/auth_migrate_test.go
@@ -4,7 +4,7 @@ import (
"database/sql"
"testing"

_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb/auth"
"github.com/minetest-go/mtdb/types"
@@ -13,7 +13,7 @@ import (

func TestMigrateAuthSQlite(t *testing.T) {
// open db
db, err := sql.Open("sqlite3", ":memory:")
db, err := sql.Open("sqlite", ":memory:")
assert.NoError(t, err)

assert.NoError(t, auth.MigrateAuthDB(db, types.DATABASE_SQLITE))
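The substance of the pure-go change is this driver swap, repeated across the test files: the cgo-based github.com/mattn/go-sqlite3 import is replaced with the pure-Go modernc.org/sqlite package, which registers itself under the driver name "sqlite" instead of "sqlite3". A minimal standalone sketch of the new open pattern:

```go
package main

import (
	"database/sql"
	"fmt"
	"log"

	_ "modernc.org/sqlite" // pure-Go driver; registers itself as "sqlite"
)

func main() {
	// Note the driver name: "sqlite", not "sqlite3".
	db, err := sql.Open("sqlite", ":memory:")
	if err != nil {
		log.Fatal(err)
	}
	defer db.Close()

	if err := db.Ping(); err != nil {
		log.Fatal(err)
	}
	fmt.Println("connected via the pure-Go sqlite driver")
}
```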
10 changes: 5 additions & 5 deletions auth/auth_sqlite_test.go
@@ -5,7 +5,7 @@ import (
"os"
"testing"

_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb/auth"
"github.com/minetest-go/mtdb/types"
@@ -15,7 +15,7 @@ import (

func TestEmptySQliteRepo(t *testing.T) {
// open db
db, err := sql.Open("sqlite3", ":memory:")
db, err := sql.Open("sqlite", ":memory:")
assert.NoError(t, err)
repo := auth.NewAuthRepository(db, types.DATABASE_SQLITE)
assert.NotNil(t, repo)
@@ -33,7 +33,7 @@ func TestSQliteRepo(t *testing.T) {
copyFileContents("testdata/auth.wal.sqlite", dbfile.Name())

// open db
db, err := sql.Open("sqlite3", "file:"+dbfile.Name())
db, err := sql.Open("sqlite", "file:"+dbfile.Name())
assert.NoError(t, err)
repo := auth.NewAuthRepository(db, types.DATABASE_SQLITE)
assert.NotNil(t, repo)
@@ -99,7 +99,7 @@ func TestSQlitePrivRepo(t *testing.T) {
copyFileContents("testdata/auth.wal.sqlite", dbfile.Name())

// open db
db, err := sql.Open("sqlite3", "file:"+dbfile.Name())
db, err := sql.Open("sqlite", "file:"+dbfile.Name())
assert.NoError(t, err)
repo := auth.NewPrivilegeRepository(db, types.DATABASE_SQLITE)
assert.NotNil(t, repo)
@@ -155,7 +155,7 @@ func TestSqliteAuthRepo(t *testing.T) {
dbfile, err := os.CreateTemp(os.TempDir(), "auth.sqlite")
assert.NoError(t, err)
assert.NotNil(t, dbfile)
db, err := sql.Open("sqlite3", "file:"+dbfile.Name())
db, err := sql.Open("sqlite", "file:"+dbfile.Name())
assert.NoError(t, err)
assert.NoError(t, auth.MigrateAuthDB(db, types.DATABASE_SQLITE))
assert.NoError(t, wal.EnableWAL(db))
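Besides the driver swap, these tests keep calling wal.EnableWAL on file-backed databases. The helper's implementation is not part of this diff; the sketch below only illustrates what enabling write-ahead logging on a SQLite handle typically amounts to, and the function name is hypothetical:

```go
package walsketch

import "database/sql"

// enableWAL is a hypothetical stand-in for the repository's wal.EnableWAL
// helper: it switches the SQLite journal mode to write-ahead logging, the
// mode Minetest uses for its map and auth databases.
func enableWAL(db *sql.DB) error {
	_, err := db.Exec("PRAGMA journal_mode=WAL")
	return err
}
```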
6 changes: 4 additions & 2 deletions block/block.go
@@ -27,15 +27,17 @@ func (b *Block) String() string {
}

// BlockRepository implements the data access layer for the Minetest map data.
// All positions are in mapblock coordinates, as described here:
// https://github.com/minetest/minetest/blob/master/doc/lua_api.md#mapblock-coordinates
type BlockRepository interface {
types.Backup

// GetByPos returns the map block at position X,Y,Z.
GetByPos(x, y, z int) (*Block, error)

// Iterator returns a channel to fetch all data from the starting position
// X,Y,Z, with the map blocks sorted by position ascending. Sorting is done
// by X,Y and Z coordinates.
// X,Y,Z (exclusive), with the map blocks sorted by position ascending.
// Sorting is done by Z, Y, X to keep consistency with Sqlite map format.
Iterator(x, y, z int) (chan *Block, types.Closer, error)

// Update upserts the provided map block in the database, using the position
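The updated contract makes the starting position exclusive and fixes the sort order to Z, Y, X. A sketch of how a caller might drain the iterator, mirroring the tests in this PR (the Closer is ignored here because the channel is read until it is closed):

```go
package itersketch

import "github.com/minetest-go/mtdb/block"

// countFrom drains the iterator channel starting just after (x, y, z) and
// returns how many map blocks were received. Sketch only; real callers may
// want to use the returned Closer to stop the iteration early.
func countFrom(repo block.BlockRepository, x, y, z int) (int, error) {
	it, _, err := repo.Iterator(x, y, z)
	if err != nil {
		return 0, err
	}
	count := 0
	for b := range it {
		if b == nil {
			break
		}
		count++
	}
	return count, nil
}
```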
4 changes: 2 additions & 2 deletions block/block_migrate_test.go
@@ -4,7 +4,7 @@ import (
"database/sql"
"testing"

_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb/block"
"github.com/minetest-go/mtdb/types"
@@ -13,7 +13,7 @@ import (

func TestMigrateBlockSQlite(t *testing.T) {
// open db
db, err := sql.Open("sqlite3", ":memory:")
db, err := sql.Open("sqlite", ":memory:")
assert.NoError(t, err)

assert.NoError(t, block.MigrateBlockDB(db, types.DATABASE_SQLITE))
64 changes: 49 additions & 15 deletions block/block_postgres.go
@@ -30,51 +30,85 @@ func (repo *postgresBlockRepository) GetByPos(x, y, z int) (*Block, error) {
return entry, err
}

// IteratorBatchSize is the default number of rows fetched per batch by the iterators.
var IteratorBatchSize = 4096

// iteratorQuery uses the keyset pagination as described in this SO question
// https://stackoverflow.com/q/57504274
//
// The row-value comparison WHERE (cols) > (params) preserves the sort order
// during the comparison, without the need to implement this as an offset/cursor.
var iteratorQuery = `-- Query blocks to iterate over
SELECT posX, posY, posZ, data
FROM blocks
WHERE (posZ, posY, posX) > ($3, $2, $1)
ORDER BY posZ, posY, posX
LIMIT $4`

func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) {
rows, err := repo.db.Query(`
SELECT posX, posY, posZ, data
FROM blocks
WHERE posX >= $1 AND posY >= $2 AND posZ >= $3
ORDER BY posX, posY, posZ
`, x, y, z)
// logging setup
l := logrus.WithField("iterating_from", []int{x, y, z})
count := int64(0)
page := 0
pageSize := 0

l.Debug("running database query")
rows, err := repo.db.Query(iteratorQuery, x, y, z, IteratorBatchSize)
if err != nil {
return nil, nil, err
}

l := logrus.WithField("iterating_from", []int{x, y, z})
ch := make(chan *Block)
ch := make(chan *Block, IteratorBatchSize)
done := make(types.WhenDone, 1)
count := int64(0)
lastPos := Block{}

// Spawn go routine to fetch rows and send to channel
go func() {
defer close(ch)
defer rows.Close()

l.Debug("Retrieving database rows ...")
l.Debug("retrieving database rows ...")
for {
select {
case <-done:
// We can now return, we are done
l.Debugf("Iterator closed by caller. Finishing up...")
l.Debugf("iterator closed by caller; finishing up...")
return
default:
if rows.Next() {
// Log fetch progress every 100 rows
count++
pageSize++
if count%100 == 0 {
l.Debugf("Retrieved %d records so far", count)
l.Debugf("retrieved %d records so far", count)
}
// Fetch and send to channel
b := &Block{}
if err = rows.Scan(&b.PosX, &b.PosY, &b.PosZ, &b.Data); err != nil {
l.Errorf("Failed to read next item from iterator: %v; aborting", err)
l.Errorf("failed to read next item from iterator: %v; aborting", err)
return
}
lastPos.PosX, lastPos.PosY, lastPos.PosZ = b.PosX, b.PosY, b.PosZ
ch <- b
} else {
l.Debug("Iterator finished, closing up rows and channel")
return
f := logrus.Fields{"last_pos": lastPos, "last_page_size": pageSize, "page": page}
if pageSize > 0 {
page++
// If the previous batch returned any rows, restart the query from the last position
if err := rows.Close(); err != nil {
l.WithField("err", err).Warning("error closing previous batch")
}
rows, err = repo.db.Query(iteratorQuery, lastPos.PosX, lastPos.PosY, lastPos.PosZ, IteratorBatchSize)
if err != nil {
l.WithField("err", err).Warning("error restarting query")
return
}
pageSize = 0
l.WithFields(f).Debug("batch finished, restarting next batch")
} else {
l.WithFields(f).Debug("iterator finished, closing up rows and channel")
return
}
}
}
}
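The key idea in the rewritten Postgres iterator is keyset pagination: each batch remembers the last (Z, Y, X) it produced, and the next batch's WHERE (posZ, posY, posX) > ($3, $2, $1) resumes strictly after it, so no OFFSET bookkeeping is needed. The row-value comparison is lexicographic; a small Go rendering of the same rule, for illustration only:

```go
package keysetsketch

// afterZYX reports whether position (z, y, x) sorts strictly after the cursor
// (cz, cy, cx) under the Z, then Y, then X ordering. This mirrors how SQL
// evaluates the row-value comparison (posZ, posY, posX) > ($3, $2, $1), which
// is what lets each batch resume from the last block of the previous one.
func afterZYX(z, y, x, cz, cy, cx int) bool {
	if z != cz {
		return z > cz
	}
	if y != cy {
		return y > cy
	}
	return x > cx
}
```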
18 changes: 18 additions & 0 deletions block/block_postgres_test.go
@@ -63,6 +63,24 @@ func TestPostgresMaxConnections(t *testing.T) {
}

func TestPostgresIterator(t *testing.T) {
logToTesting(t)
blocks_repo, _ := setupPostgress(t)
testBlocksRepositoryIterator(t, blocks_repo)
}

func TestPostgresIteratorBatches(t *testing.T) {
logToTesting(t)

oldSize := block.IteratorBatchSize
setUp := func() {
block.IteratorBatchSize = 1
}
tearDown := func() {
block.IteratorBatchSize = oldSize
}

setUp()
defer tearDown()
blocks_repo, _ := setupPostgress(t)
testBlocksRepositoryIterator(t, blocks_repo)
}
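The new batch test temporarily forces IteratorBatchSize to 1 so the iterator has to restart its query several times. The same override could also be written with t.Cleanup; this is only an alternative sketch, not what the PR does, and it still needs the Postgres-backed repository from the surrounding test helpers:

```go
package block_test

import (
	"testing"

	"github.com/minetest-go/mtdb/block"
)

// TestIteratorBatchesSketch restores the package-level batch size via
// t.Cleanup instead of an explicit tearDown closure.
func TestIteratorBatchesSketch(t *testing.T) {
	oldSize := block.IteratorBatchSize
	block.IteratorBatchSize = 1
	t.Cleanup(func() { block.IteratorBatchSize = oldSize })

	// testBlocksRepositoryIterator(t, blocks_repo) would run here against a
	// repository created by setupPostgress, as in the real test.
}
```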
2 changes: 1 addition & 1 deletion block/block_sqlite.go
@@ -78,7 +78,7 @@ func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, types.Clo
rows, err := repo.db.Query(`
SELECT pos, data
FROM blocks
WHERE pos >= $1
WHERE pos > $1
ORDER BY pos
`, pos)
if err != nil {
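On SQLite the position is a single integer column, so the only change needed for the new exclusive-start semantics is > instead of >=. Why ORDER BY pos already matches the Z, Y, X order used on Postgres follows from the conventional Minetest encoding, sketched below; mtdb has its own conversion helpers and this function is only illustrative:

```go
package possketch

// blockPosToPlain packs mapblock coordinates into the single integer stored
// in the SQLite pos column, with Z in the most significant position, then Y,
// then X (multipliers 2^24 and 2^12). Because Z dominates the value within
// Minetest's coordinate range, sorting by pos is equivalent to sorting by Z,
// then Y, then X. Illustrative sketch of the conventional Minetest encoding;
// mtdb's own helpers may differ in detail.
func blockPosToPlain(x, y, z int64) int64 {
	return z*0x1000000 + y*0x1000 + x
}
```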
4 changes: 2 additions & 2 deletions block/block_sqlite_test.go
@@ -5,7 +5,7 @@ import (
"os"
"testing"

_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb/block"
"github.com/minetest-go/mtdb/types"
@@ -17,7 +17,7 @@ func setupSqlite(t *testing.T) (block.BlockRepository, *sql.DB) {
dbfile, err := os.CreateTemp(os.TempDir(), "map.sqlite")
assert.NoError(t, err)
assert.NotNil(t, dbfile)
db, err := sql.Open("sqlite3", "file:"+dbfile.Name())
db, err := sql.Open("sqlite", "file:"+dbfile.Name())
assert.NoError(t, err)
assert.NoError(t, wal.EnableWAL(db))

78 changes: 49 additions & 29 deletions block/block_test.go
@@ -82,25 +82,36 @@ func testBlocksRepository(t *testing.T, block_repo block.BlockRepository) {
}

func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepository) {
negX, negY, negZ := block.AsBlockPos(-32000, -32000, -32000)
posX, posY, posZ := block.AsBlockPos(32000, 32000, 32000)

testData := []block.Block{
{negX, negY, negZ, []byte("negative")},
{0, 0, 0, []byte("zero")},
{posX, posY, posZ, []byte("positive")},
// For readability, the literals are zero-padded; since every digit is < 8, the octal values equal the intended decimal values.
{-2000, -2000, -2000, []byte("negative[0]")},
{-1999, -0004, -2000, []byte("negative[1]")},
{+0000, +0000, +0000, []byte("zero")},
{-2001, +0000, +0001, []byte("negative[2]")},
{+0001, +0002, +2000, []byte("positive[0]")},
{+2000, +2000, +2000, []byte("positive[1]")},
}
setUp := func() {
for i := range testData {
b := testData[i]
blocks_repo.Update(&b)
}
}
for i := range testData {
b := testData[i]
blocks_repo.Update(&b)
tearDown := func() {
for _, b := range testData {
blocks_repo.Delete(b.PosX, b.PosY, b.PosZ)
}
}
// logrus.SetLevel(logrus.DebugLevel)

setUp()
defer tearDown()

// Helper function to loop over all channel data
// TODO(ronoaldo): simplify this so we don't have
// to work so verbose. Perhaps implement a wrapper
// to the channel?
consumeAll := func(it chan *block.Block) int {
consumeAll := func(tc string, it chan *block.Block) int {
t.Logf("Test Case: %s", tc)
t.Logf("consumeAll: fetching data from iterator")
count := 0
for {
@@ -121,28 +121,37 @@ func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepositor
}
}

// Fetch from neg -> pos, retrieves all three blocks
it, _, err := blocks_repo.Iterator(negX, negY, negZ)
if assert.NoError(t, err) {
assert.Equal(t, 3, consumeAll(it))
type testCase struct {
x int
y int
z int
name string
expected int
}

// Fetch from zero -> pos, retrieves two blocks
it, _, err = blocks_repo.Iterator(0, 0, 0)
if assert.NoError(t, err) {
assert.Equal(t, 2, consumeAll(it))
// Sort order should be (z, y, x) to keep consistency with how sqlite
// in minetest works (a single pos field where z, y and x are bit-shifted and summed)
testCases := []testCase{
{-2001, -2001, -2001, "starting from -2001,-2001,-2001 should return 6", 6},
{-2000, -2000, -2000, "starting from -2000,-2000,-2000 should return 5", 5},
{-0001, -0001, -0001, "starting from -0001,-0001,-0001 should return 4", 4},
{+0000, +0000, +0000, "starting from +0000,+0000,+0000 should return 3", 3},
{+0000, +0000, +0001, "starting from +0000,+0000,+0001 should return 2", 2},
{+0000, +0000, +1999, "starting from +0000,+0000,+1999 should return 2", 2},
{+1999, +1999, +1999, "starting from +1999,+1999,+1999 should return 2", 2},
{+2000, +2000, +2000, "starting from +2000,+2000,+2000 should return 0", 0},
{+2000, +2000, +2001, "starting from +2000,+2000,+2001 should return 0", 0},
{+2000, +2001, +2000, "starting from +2000,+2001,+2000 should return 0", 0},
{+2001, +2000, +2000, "starting from +2001,+2000,+2000 should return 0", 0},
}

// Fetch from zero +1 -> pos, retrieves only one
it, _, err = blocks_repo.Iterator(0, 0, 1)
if assert.NoError(t, err) {
assert.Equal(t, 1, consumeAll(it))
}

// Fetch from 2000,2000,2000, retrieves zero blocks
it, _, err = blocks_repo.Iterator(posX+1, posY+1, posZ+1)
if assert.NoError(t, err) {
assert.Equal(t, 0, consumeAll(it))
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
it, _, err := blocks_repo.Iterator(tc.x, tc.y, tc.z)
if assert.NoError(t, err) {
assert.Equal(t, tc.expected, consumeAll(tc.name, it))
}
})
}
}

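The TODO in consumeAll asks for a wrapper around the iterator channel to cut down on boilerplate. One possible shape for such a wrapper, purely as a sketch and not part of this change:

```go
package itersketch

import "github.com/minetest-go/mtdb/block"

// BlockIter wraps the iterator channel so callers can use a Next/Block loop
// instead of handling the channel, the ok flag, and nil sentinels directly.
type BlockIter struct {
	ch  chan *block.Block
	cur *block.Block
}

// NewBlockIter wraps an iterator channel returned by BlockRepository.Iterator.
func NewBlockIter(ch chan *block.Block) *BlockIter {
	return &BlockIter{ch: ch}
}

// Next blocks until the next map block arrives and reports whether one was
// received; it returns false once the channel is closed or a nil is seen.
func (it *BlockIter) Next() bool {
	b, ok := <-it.ch
	if !ok || b == nil {
		return false
	}
	it.cur = b
	return true
}

// Block returns the map block fetched by the most recent successful Next.
func (it *BlockIter) Block() *block.Block { return it.cur }
```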
8 changes: 6 additions & 2 deletions context.go
@@ -3,10 +3,10 @@ package mtdb
import (
"archive/zip"
"database/sql"
"fmt"
"path"

_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"github.com/minetest-go/mtdb/auth"
"github.com/minetest-go/mtdb/block"
"github.com/minetest-go/mtdb/mod_storage"
@@ -15,6 +15,7 @@ import (
"github.com/minetest-go/mtdb/wal"
"github.com/minetest-go/mtdb/worldconfig"
"github.com/sirupsen/logrus"
_ "modernc.org/sqlite"
)

type Context struct {
@@ -78,7 +79,7 @@ func connectAndMigrate(t types.DatabaseType, sqliteConn, psqlConn string, migFn
default:
// default to sqlite
datasource = sqliteConn
dbtype = "sqlite3"
dbtype = "sqlite"
}

if t == types.DATABASE_POSTGRES && datasource == "" {
@@ -136,6 +137,9 @@ func New(world_dir string) (*Context, error) {
}
if ctx.map_db != nil {
ctx.Blocks = block.NewBlockRepository(ctx.map_db, dbtype)
if ctx.Blocks == nil {
return nil, fmt.Errorf("invalid repository dbtype: %v", dbtype)
}
ctx.backuprepos = append(ctx.backuprepos, ctx.Blocks)
}

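The added guard means New now returns an error for an unsupported map database type instead of handing back a context with a nil Blocks repository. A hedged usage sketch (the world path is hypothetical; error handling kept minimal):

```go
package main

import (
	"log"

	"github.com/minetest-go/mtdb"
)

func main() {
	// Open the repositories for a world directory; an unsupported map
	// database type now surfaces here as an error.
	ctx, err := mtdb.New("/var/lib/minetest/world") // hypothetical path
	if err != nil {
		log.Fatalf("opening world databases: %v", err)
	}

	// Iterate all map blocks; the start position is exclusive, so a value
	// below the minimum block coordinate covers the whole map.
	it, _, err := ctx.Blocks.Iterator(-2048, -2048, -2048)
	if err != nil {
		log.Fatalf("starting block iterator: %v", err)
	}
	count := 0
	for b := range it {
		if b == nil {
			break
		}
		count++
	}
	log.Printf("iterated over %d map blocks", count)
}
```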