Skip to content

Commit

Permalink
Add DB
Browse files Browse the repository at this point in the history
  • Loading branch information
neekolas committed Jul 24, 2024
1 parent 8c35627 commit 11c141d
Show file tree
Hide file tree
Showing 9 changed files with 110 additions and 8 deletions.
12 changes: 10 additions & 2 deletions dev/docker/docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -6,18 +6,26 @@ services:
ports:
- 15432:5432
authz-db:
image: postgres:13
image: postgres:16
environment:
POSTGRES_PASSWORD: xmtp
ports:
- 6543:5432

mls-db:
image: postgres:13
image: postgres:16
environment:
POSTGRES_PASSWORD: xmtp
ports:
- 7654:5432

replication-db:
image: postgres:16
environment:
POSTGRES_PASSWORD: xmtp
ports:
- 8765:5432

prometheus:
image: prom/prometheus
ports:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,8 @@
SET statement_timeout = 0;

--bun:split
SELECT
1
--bun:split
SELECT
2
6 changes: 6 additions & 0 deletions pkg/migrations/replication/20240528181852_init-schema.up.sql
Original file line number Diff line number Diff line change
@@ -0,0 +1,6 @@
SET statement_timeout = 0;

--bun:split
SELECT
1;

18 changes: 18 additions & 0 deletions pkg/migrations/replication/migrate.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,18 @@
package replication

import (
"embed"

"github.com/uptrace/bun/migrate"
)

var Migrations = migrate.NewMigrations()

//go:embed *.sql
var sqlMigrations embed.FS

func init() {
if err := Migrations.Discover(sqlMigrations); err != nil {
panic(err)
}
}
10 changes: 8 additions & 2 deletions pkg/replication/options.go
Original file line number Diff line number Diff line change
@@ -1,12 +1,18 @@
package replication

import "time"

type ApiOptions struct {
Port int `short:"p" long:"port" description:"Port to listen on" default:"5050"`
}

type DbOptions struct {
ReaderConnectionString string `long:"reader-connection-string" description:"Reader connection string"`
WriterConnectionString string `long:"writer-connection-string" description:"Writer connection string" required:"true"`
ReaderConnectionString string `long:"reader-connection-string" description:"Reader connection string"`
WriterConnectionString string `long:"writer-connection-string" description:"Writer connection string" required:"true"`
ReadTimeout time.Duration `long:"read-timeout" description:"Timeout for reading from the database" default:"10s"`
WriteTimeout time.Duration `long:"write-timeout" description:"Timeout for writing to the database" default:"10s"`
MaxOpenConns int `long:"max-open-conns" description:"Maximum number of open connections" default:"80"`
WaitForDB time.Duration `long:"wait-for" description:"wait for DB on start, up to specified duration"`
}

type Options struct {
Expand Down
40 changes: 40 additions & 0 deletions pkg/replication/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,14 +3,20 @@ package replication
import (
"context"
"crypto/ecdsa"
"database/sql"
"net"
"os"
"os/signal"
"syscall"

"github.com/ethereum/go-ethereum/crypto"
"github.com/uptrace/bun"
"github.com/uptrace/bun/dialect/pgdialect"
"github.com/uptrace/bun/migrate"
"github.com/xmtp/xmtp-node-go/pkg/migrations/replication"
"github.com/xmtp/xmtp-node-go/pkg/replication/api"
"github.com/xmtp/xmtp-node-go/pkg/replication/registry"
legacyServer "github.com/xmtp/xmtp-node-go/pkg/server"
"go.uber.org/zap"
)

Expand All @@ -22,6 +28,8 @@ type Server struct {
apiServer *api.ApiServer
nodeRegistry registry.NodeRegistry
privateKey *ecdsa.PrivateKey
writerDb *sql.DB
// Can add reader DB later if needed
}

func New(ctx context.Context, log *zap.Logger, options Options, nodeRegistry registry.NodeRegistry) (*Server, error) {
Expand All @@ -35,6 +43,10 @@ func New(ctx context.Context, log *zap.Logger, options Options, nodeRegistry reg
if err != nil {
return nil, err
}
s.writerDb, err = getWriterDb(options.DB)
if err != nil {
return nil, err
}

s.ctx, s.cancel = context.WithCancel(ctx)
s.apiServer, err = api.NewAPIServer(ctx, log, options.API.Port)
Expand All @@ -45,6 +57,25 @@ func New(ctx context.Context, log *zap.Logger, options Options, nodeRegistry reg
return s, nil
}

func (s *Server) Migrate() error {
db := bun.NewDB(s.writerDb, pgdialect.New())
migrator := migrate.NewMigrator(db, replication.Migrations)
err := migrator.Init(s.ctx)
if err != nil {
return err
}

group, err := migrator.Migrate(s.ctx)
if err != nil {
return err
}
if group.IsZero() {
s.log.Info("No new migrations to run")
}

return nil
}

func (s *Server) Addr() net.Addr {
return s.apiServer.Addr()
}
Expand All @@ -63,3 +94,12 @@ func (s *Server) Shutdown() {
func parsePrivateKey(privateKeyString string) (*ecdsa.PrivateKey, error) {
return crypto.HexToECDSA(privateKeyString)
}

func getWriterDb(options DbOptions) (*sql.DB, error) {
db, err := legacyServer.NewPGXDB(options.WriterConnectionString, options.WaitForDB, options.ReadTimeout)
if err != nil {
return nil, err
}

return db, nil
}
16 changes: 16 additions & 0 deletions pkg/replication/server_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,13 +4,16 @@ import (
"context"
"encoding/hex"
"testing"
"time"

"github.com/ethereum/go-ethereum/crypto"
"github.com/stretchr/testify/require"
"github.com/xmtp/xmtp-node-go/pkg/replication/registry"
test "github.com/xmtp/xmtp-node-go/pkg/testing"
)

const WRITER_DB_CONNECTION_STRING = "postgres://postgres:xmtp@localhost:8765/postgres?sslmode=disable"

func NewTestServer(t *testing.T, registry registry.NodeRegistry) *Server {
log := test.NewLog(t)
privateKey, err := crypto.GenerateKey()
Expand All @@ -21,6 +24,13 @@ func NewTestServer(t *testing.T, registry registry.NodeRegistry) *Server {
API: ApiOptions{
Port: 0,
},
DB: DbOptions{
WriterConnectionString: WRITER_DB_CONNECTION_STRING,
ReadTimeout: time.Second * 10,
WriteTimeout: time.Second * 10,
MaxOpenConns: 10,
WaitForDB: time.Second * 10,
},
}, registry)
require.NoError(t, err)

Expand All @@ -33,3 +43,9 @@ func TestCreateServer(t *testing.T) {
server2 := NewTestServer(t, registry)
require.NotEqual(t, server1.Addr(), server2.Addr())
}

func TestMigrate(t *testing.T) {
registry := registry.NewFixedNodeRegistry([]registry.Node{})
server1 := NewTestServer(t, registry)
require.NoError(t, server1.Migrate())
}
6 changes: 3 additions & 3 deletions pkg/server/pgxdb.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ import (
"github.com/uptrace/bun/dialect/pgdialect"
)

func newPGXDB(dsn string, waitForDB, statementTimeout time.Duration) (*sql.DB, error) {
func NewPGXDB(dsn string, waitForDB, statementTimeout time.Duration) (*sql.DB, error) {
config, err := pgxpool.ParseConfig(dsn)
if err != nil {
return nil, err
Expand All @@ -38,8 +38,8 @@ func newPGXDB(dsn string, waitForDB, statementTimeout time.Duration) (*sql.DB, e
return db, nil
}

func newBunPGXDb(dsn string, waitForDB, statementTimeout time.Duration) (*bun.DB, error) {
pgxDb, err := newPGXDB(dsn, waitForDB, statementTimeout)
func NewBunPGXDb(dsn string, waitForDB, statementTimeout time.Duration) (*bun.DB, error) {
pgxDb, err := NewPGXDB(dsn, waitForDB, statementTimeout)
if err != nil {
return nil, err
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -231,7 +231,7 @@ func New(ctx context.Context, log *zap.Logger, options Options) (*Server, error)

var MLSStore *mlsstore.Store
if options.MLSStore.DbConnectionString != "" {
if s.mlsDB, err = newBunPGXDb(options.MLSStore.DbConnectionString, options.WaitForDB, options.MLSStore.ReadTimeout); err != nil {
if s.mlsDB, err = NewBunPGXDb(options.MLSStore.DbConnectionString, options.WaitForDB, options.MLSStore.ReadTimeout); err != nil {
return nil, errors.Wrap(err, "creating mls db")
}

Expand Down

0 comments on commit 11c141d

Please sign in to comment.