Skip to content

Commit

Permalink
fixs: pure-go build, iterator for postgres
Browse files Browse the repository at this point in the history
- Uses modernc.org/sqlite package
- Properly used row filters for Iterator
- Small updates for easy local testing
Refs minetest-go#77
  • Loading branch information
ronoaldo committed Dec 2, 2023
1 parent 3b9c878 commit a49b3e7
Show file tree
Hide file tree
Showing 19 changed files with 239 additions and 84 deletions.
4 changes: 2 additions & 2 deletions auth/auth_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"database/sql"
"testing"

_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb/auth"
"github.com/minetest-go/mtdb/types"
Expand All @@ -13,7 +13,7 @@ import (

func TestMigrateAuthSQlite(t *testing.T) {
// open db
db, err := sql.Open("sqlite3", ":memory:")
db, err := sql.Open("sqlite", ":memory:")
assert.NoError(t, err)

assert.NoError(t, auth.MigrateAuthDB(db, types.DATABASE_SQLITE))
Expand Down
10 changes: 5 additions & 5 deletions auth/auth_sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"testing"

_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb/auth"
"github.com/minetest-go/mtdb/types"
Expand All @@ -15,7 +15,7 @@ import (

func TestEmptySQliteRepo(t *testing.T) {
// open db
db, err := sql.Open("sqlite3", ":memory:")
db, err := sql.Open("sqlite", ":memory:")
assert.NoError(t, err)
repo := auth.NewAuthRepository(db, types.DATABASE_SQLITE)
assert.NotNil(t, repo)
Expand All @@ -33,7 +33,7 @@ func TestSQliteRepo(t *testing.T) {
copyFileContents("testdata/auth.wal.sqlite", dbfile.Name())

// open db
db, err := sql.Open("sqlite3", "file:"+dbfile.Name())
db, err := sql.Open("sqlite", "file:"+dbfile.Name())
assert.NoError(t, err)
repo := auth.NewAuthRepository(db, types.DATABASE_SQLITE)
assert.NotNil(t, repo)
Expand Down Expand Up @@ -99,7 +99,7 @@ func TestSQlitePrivRepo(t *testing.T) {
copyFileContents("testdata/auth.wal.sqlite", dbfile.Name())

// open db
db, err := sql.Open("sqlite3", "file:"+dbfile.Name())
db, err := sql.Open("sqlite", "file:"+dbfile.Name())
assert.NoError(t, err)
repo := auth.NewPrivilegeRepository(db, types.DATABASE_SQLITE)
assert.NotNil(t, repo)
Expand Down Expand Up @@ -155,7 +155,7 @@ func TestSqliteAuthRepo(t *testing.T) {
dbfile, err := os.CreateTemp(os.TempDir(), "auth.sqlite")
assert.NoError(t, err)
assert.NotNil(t, dbfile)
db, err := sql.Open("sqlite3", "file:"+dbfile.Name())
db, err := sql.Open("sqlite", "file:"+dbfile.Name())
assert.NoError(t, err)
assert.NoError(t, auth.MigrateAuthDB(db, types.DATABASE_SQLITE))
assert.NoError(t, wal.EnableWAL(db))
Expand Down
4 changes: 2 additions & 2 deletions block/block_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@ import (
"database/sql"
"testing"

_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb/block"
"github.com/minetest-go/mtdb/types"
Expand All @@ -13,7 +13,7 @@ import (

func TestMigrateBlockSQlite(t *testing.T) {
// open db
db, err := sql.Open("sqlite3", ":memory:")
db, err := sql.Open("sqlite", ":memory:")
assert.NoError(t, err)

assert.NoError(t, block.MigrateBlockDB(db, types.DATABASE_SQLITE))
Expand Down
64 changes: 49 additions & 15 deletions block/block_postgres.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,51 +30,85 @@ func (repo *postgresBlockRepository) GetByPos(x, y, z int) (*Block, error) {
return entry, err
}

// IteratorBatchSize is a default value to be used while batching over Iterators.
var IteratorBatchSize = 4096

// iteratorQuery uses the keyset pagination as described in this SO question
// https://stackoverflow.com/q/57504274
//
// The WHERE (args) >= (params) works in a way to keep the sorting during comparision
// without the need to implement this as an offset/cursor
var iteratorQuery = `-- Query blocks to iterate over
SELECT posX, posY, posZ, data
FROM blocks
WHERE (posZ, posY, posX) > ($3, $2, $1)
ORDER BY posZ, posY, posX
LIMIT $4`

func (repo *postgresBlockRepository) Iterator(x, y, z int) (chan *Block, types.Closer, error) {
rows, err := repo.db.Query(`
SELECT posX, posY, posZ, data
FROM blocks
WHERE posX >= $1 AND posY >= $2 AND posZ >= $3
ORDER BY posX, posY, posZ
`, x, y, z)
// logging setup
l := logrus.WithField("iterating_from", []int{x, y, z})
count := int64(0)
page := 0
pageSize := 0

l.Debug("running database query")
rows, err := repo.db.Query(iteratorQuery, x, y, z, IteratorBatchSize)
if err != nil {
return nil, nil, err
}

l := logrus.WithField("iterating_from", []int{x, y, z})
ch := make(chan *Block)
ch := make(chan *Block, IteratorBatchSize)
done := make(types.WhenDone, 1)
count := int64(0)
lastPos := Block{}

// Spawn go routine to fetch rows and send to channel
go func() {
defer close(ch)
defer rows.Close()

l.Debug("Retrieving database rows ...")
l.Debug("retrieving database rows ...")
for {
select {
case <-done:
// We can now return, we are done
l.Debugf("Iterator closed by caller. Finishing up...")
l.Debugf("iterator closed by caller; finishing up...")
return
default:
if rows.Next() {
// Debug progress while fetching rows every 100's
count++
pageSize++
if count%100 == 0 {
l.Debugf("Retrieved %d records so far", count)
l.Debugf("retrieved %d records so far", count)
}
// Fetch and send to channel
b := &Block{}
if err = rows.Scan(&b.PosX, &b.PosY, &b.PosZ, &b.Data); err != nil {
l.Errorf("Failed to read next item from iterator: %v; aborting", err)
l.Errorf("failed to read next item from iterator: %v; aborting", err)
return
}
lastPos.PosX, lastPos.PosY, lastPos.PosZ = b.PosX, b.PosY, b.PosZ
ch <- b
} else {
l.Debug("Iterator finished, closing up rows and channel")
return
f := logrus.Fields{"last_pos": lastPos, "last_page_size": pageSize, "page": page}
if pageSize > 0 {
page++
// If the previous batch is > 0, restart the query from last position
if err := rows.Close(); err != nil {
l.WithField("err", err).Warning("error closing previous batch")
}
rows, err = repo.db.Query(iteratorQuery, lastPos.PosX, lastPos.PosY, lastPos.PosZ, IteratorBatchSize)
if err != nil {
l.WithField("err", err).Warning("error restarting query")
return
}
pageSize = 0
l.WithFields(f).Debug("batch finished, restarting next batch")
} else {
l.WithFields(f).Debug("iterator finished, closing up rows and channel")
return
}
}
}
}
Expand Down
18 changes: 18 additions & 0 deletions block/block_postgres_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,24 @@ func TestPostgresMaxConnections(t *testing.T) {
}

func TestPostgresIterator(t *testing.T) {
logToTesting(t)
blocks_repo, _ := setupPostgress(t)
testBlocksRepositoryIterator(t, blocks_repo)
}

func TestPostgresIteratorBatches(t *testing.T) {
logToTesting(t)

oldSize := block.IteratorBatchSize
setUp := func() {
block.IteratorBatchSize = 1
}
tearDown := func() {
block.IteratorBatchSize = oldSize
}

setUp()
defer tearDown()
blocks_repo, _ := setupPostgress(t)
testBlocksRepositoryIterator(t, blocks_repo)
}
Expand Down
2 changes: 1 addition & 1 deletion block/block_sqlite.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ func (repo *sqliteBlockRepository) Iterator(x, y, z int) (chan *Block, types.Clo
rows, err := repo.db.Query(`
SELECT pos, data
FROM blocks
WHERE pos >= $1
WHERE pos > $1
ORDER BY pos
`, pos)
if err != nil {
Expand Down
4 changes: 2 additions & 2 deletions block/block_sqlite_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ import (
"os"
"testing"

_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb/block"
"github.com/minetest-go/mtdb/types"
Expand All @@ -17,7 +17,7 @@ func setupSqlite(t *testing.T) (block.BlockRepository, *sql.DB) {
dbfile, err := os.CreateTemp(os.TempDir(), "map.sqlite")
assert.NoError(t, err)
assert.NotNil(t, dbfile)
db, err := sql.Open("sqlite3", "file:"+dbfile.Name())
db, err := sql.Open("sqlite", "file:"+dbfile.Name())
assert.NoError(t, err)
assert.NoError(t, wal.EnableWAL(db))

Expand Down
78 changes: 49 additions & 29 deletions block/block_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,25 +82,36 @@ func testBlocksRepository(t *testing.T, block_repo block.BlockRepository) {
}

func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepository) {
negX, negY, negZ := block.AsBlockPos(-32000, -32000, -32000)
posX, posY, posZ := block.AsBlockPos(32000, 32000, 32000)

testData := []block.Block{
{negX, negY, negZ, []byte("negative")},
{0, 0, 0, []byte("zero")},
{posX, posY, posZ, []byte("positive")},
// For readability, octals < 8 were used so they don't change the actuall value.
{-2000, -2000, -2000, []byte("negative[0]")},
{-1999, -0004, -2000, []byte("negative[1]")},
{+0000, +0000, +0000, []byte("zero")},
{-2001, +0000, +0001, []byte("negative[2]")},
{+0001, +0002, +2000, []byte("positive[0]")},
{+2000, +2000, +2000, []byte("positive[1]")},
}
setUp := func() {
for i := range testData {
b := testData[i]
blocks_repo.Update(&b)
}
}
for i := range testData {
b := testData[i]
blocks_repo.Update(&b)
tearDown := func() {
for _, b := range testData {
blocks_repo.Delete(b.PosX, b.PosY, b.PosZ)
}
}
// logrus.SetLevel(logrus.DebugLevel)

setUp()
defer tearDown()

// Helper function to loop over all channel data
// TODO(ronoaldo): simplify this so we don't have
// to work so verbose. Perhaps implement a wrapper
// to the channel?
consumeAll := func(it chan *block.Block) int {
consumeAll := func(tc string, it chan *block.Block) int {
t.Logf("Test Case: %s", tc)
t.Logf("consumeAll: fetching data from iterator")
count := 0
for {
Expand All @@ -121,28 +132,37 @@ func testBlocksRepositoryIterator(t *testing.T, blocks_repo block.BlockRepositor
}
}

// Fetch from neg -> pos, retrieves all three blocks
it, _, err := blocks_repo.Iterator(negX, negY, negZ)
if assert.NoError(t, err) {
assert.Equal(t, 3, consumeAll(it))
type testCase struct {
x int
y int
z int
name string
expected int
}

// Fetch from zero -> pos, retrieves two blocks
it, _, err = blocks_repo.Iterator(0, 0, 0)
if assert.NoError(t, err) {
assert.Equal(t, 2, consumeAll(it))
// Sort order should be (z, y, x) to keep consistency with how sqlite
// in minetest works (using a single pos field with z+y+x summed and byte-shifted)
testCases := []testCase{
{-2001, -2001, -2001, "starting from -2000,-2000,-2000 should return 6", 6},
{-2000, -2000, -2000, "starting from -2000,-2000,-2000 should return 5", 5},
{-0001, -0001, -0001, "starting from -0001,-0001,-0001 should return 4", 4},
{+0000, +0000, +0000, "starting from +0000,+0000,+0000 should return 3", 3},
{+0000, +0000, +0001, "starting from +0000,+0000,+0001 should return 2", 2},
{+0000, +0000, +1999, "starting from +2000,+2000,+1999 should return 2", 2},
{+1999, +1999, +1999, "starting from +1999,+1999,+1999 should return 2", 2},
{+2000, +2000, +2000, "starting from +2000,+2000,+2000 should return 0", 0},
{+2000, +2000, +2001, "starting from +2000,+2000,+2001 should return 0", 0},
{+2000, +2001, +2000, "starting from +2000,+2001,+2000 should return 0", 0},
{+2001, +2000, +2000, "starting from +2001,+2000,+2000 should return 0", 0},
}

// Fetch from zero +1 -> pos, retrieves only one
it, _, err = blocks_repo.Iterator(0, 0, 1)
if assert.NoError(t, err) {
assert.Equal(t, 1, consumeAll(it))
}

// Fetch from 2000,2000,2000, retrieves zero blocks
it, _, err = blocks_repo.Iterator(posX+1, posY+1, posZ+1)
if assert.NoError(t, err) {
assert.Equal(t, 0, consumeAll(it))
for _, tc := range testCases {
t.Run(tc.name, func(t *testing.T) {
it, _, err := blocks_repo.Iterator(tc.x, tc.y, tc.z)
if assert.NoError(t, err) {
assert.Equal(t, tc.expected, consumeAll(tc.name, it))
}
})
}
}

Expand Down
8 changes: 6 additions & 2 deletions context.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,10 +3,10 @@ package mtdb
import (
"archive/zip"
"database/sql"
"fmt"
"path"

_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
"github.com/minetest-go/mtdb/auth"
"github.com/minetest-go/mtdb/block"
"github.com/minetest-go/mtdb/mod_storage"
Expand All @@ -15,6 +15,7 @@ import (
"github.com/minetest-go/mtdb/wal"
"github.com/minetest-go/mtdb/worldconfig"
"github.com/sirupsen/logrus"
_ "modernc.org/sqlite"
)

type Context struct {
Expand Down Expand Up @@ -78,7 +79,7 @@ func connectAndMigrate(t types.DatabaseType, sqliteConn, psqlConn string, migFn
default:
// default to sqlite
datasource = sqliteConn
dbtype = "sqlite3"
dbtype = "sqlite"
}

if t == types.DATABASE_POSTGRES && datasource == "" {
Expand Down Expand Up @@ -136,6 +137,9 @@ func New(world_dir string) (*Context, error) {
}
if ctx.map_db != nil {
ctx.Blocks = block.NewBlockRepository(ctx.map_db, dbtype)
if ctx.Blocks == nil {
return nil, fmt.Errorf("invalid repository dbtype: %v", dbtype)
}
ctx.backuprepos = append(ctx.backuprepos, ctx.Blocks)
}

Expand Down
2 changes: 1 addition & 1 deletion context_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ import (
"testing"

_ "github.com/lib/pq"
_ "github.com/mattn/go-sqlite3"
_ "modernc.org/sqlite"

"github.com/minetest-go/mtdb"
"github.com/stretchr/testify/assert"
Expand Down
Loading

0 comments on commit a49b3e7

Please sign in to comment.