first cut at automatic snapshotting
- had to make sure we don't snapshot until the directive is fully applied
on a compute node... otherwise there are races between loading the files and
truncating the write log.

- added a dirty bit to resources and a bool return to incrementing the
write log... don't snapshot if it returns false because that means
there have been no writes (see the sketch below). (but make sure you
close the storage transaction!)

- added the actual snapshotting routine, which just fires every
<timeout> and serially snapshots everything.

- tweaked some logging

- added the ability to get all tables in an org/db, or literally all. I
think I just needed the "literally all" case, but it was natural to allow
it to be scoped to org or DB as well.

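A rough sketch of the dirty-bit scheme described above; the type and field
names are illustrative and not the commit's actual serverless storage code:

package storage

import "sync"

// resource is a hypothetical stand-in for a shard/table-key/field-key
// storage resource.
type resource struct {
    mu        sync.Mutex
    dirty     bool // set on every write-log append
    wlVersion int
}

// markDirty would be called whenever a write is logged.
func (r *resource) markDirty() {
    r.mu.Lock()
    r.dirty = true
    r.mu.Unlock()
}

// IncrementWLVersion bumps the write-log version only when there have
// been writes since the last snapshot; the bool return tells the
// caller whether a snapshot is actually needed.
func (r *resource) IncrementWLVersion() (bool, error) {
    r.mu.Lock()
    defer r.mu.Unlock()
    if !r.dirty {
        return false, nil
    }
    r.wlVersion++
    r.dirty = false
    return true, nil
}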
(cherry picked from commit b8b08bc)
jaffee authored and ch7ck committed Jan 10, 2023
1 parent f8120d4 commit 0f67a0c
Showing 19 changed files with 231 additions and 38 deletions.
26 changes: 22 additions & 4 deletions api.go
@@ -3098,6 +3098,9 @@ func (api *API) DirectiveApplied(ctx context.Context) (bool, error) {
// SnapshotShardData triggers the node to perform a shard snapshot based on the
// provided SnapshotShardDataRequest.
func (api *API) SnapshotShardData(ctx context.Context, req *dax.SnapshotShardDataRequest) error {
if !api.holder.DirectiveApplied() {
return errors.New("don't have directive yet, can't snapshot shard")
}
// TODO(jaffee) confirm this node is actually responsible for the given
// shard? Not sure we need to given that this request comes from
// MDS, but might be a belt&suspenders situation.
@@ -3112,11 +3115,14 @@ func (api *API) SnapshotShardData(ctx context.Context, req *dax.SnapshotShardDat
if err != nil {
return errors.Wrap(err, "getting index/shard readcloser")
}
defer rc.Close()

resource := api.serverlessStorage.GetShardResource(qtid, partitionNum, req.ShardNum)
// Bump writelog version while write Tx is held.
if err := resource.IncrementWLVersion(); err != nil {
if ok, err := resource.IncrementWLVersion(); err != nil {
return errors.Wrap(err, "incrementing write log version")
} else if !ok {
return nil
}
// TODO(jaffee) look into downgrading Tx on RBF to read lock here now that WL version is incremented.
err = resource.Snapshot(rc)
@@ -3126,6 +3132,9 @@ func (api *API) SnapshotShardData(ctx context.Context, req *dax.SnapshotShardDat
// SnapshotTableKeys triggers the node to perform a table keys snapshot based on
// the provided SnapshotTableKeysRequest.
func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKeysRequest) error {
if !api.holder.DirectiveApplied() {
return errors.New("don't have directive yet, can't snapshot table keys")
}
// If the index is not keyed, no-op on snapshotting its keys.
if idx, err := api.Index(ctx, string(req.TableKey)); err != nil {
return newNotFoundError(ErrIndexNotFound, string(req.TableKey))
@@ -3148,8 +3157,11 @@ func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKey
defer wrTo.Rollback()

resource := api.serverlessStorage.GetTableKeyResource(qtid, req.PartitionNum)
if err := resource.IncrementWLVersion(); err != nil {
if ok, err := resource.IncrementWLVersion(); err != nil {
return errors.Wrap(err, "incrementing write log version")
} else if !ok {
// no need to snapshot, no writes
return nil
}
// TODO(jaffee) downgrade write tx to read-only
err = resource.SnapshotTo(wrTo)
@@ -3159,12 +3171,15 @@ func (api *API) SnapshotTableKeys(ctx context.Context, req *dax.SnapshotTableKey
// SnapshotFieldKeys triggers the node to perform a field keys snapshot based on
// the provided SnapshotFieldKeysRequest.
func (api *API) SnapshotFieldKeys(ctx context.Context, req *dax.SnapshotFieldKeysRequest) error {
if !api.holder.DirectiveApplied() {
return errors.New("don't have directive yet, can't snapshot field keys")
}
qtid := req.TableKey.QualifiedTableID()

// Create the snapshot for the current version.
trans, err := api.FieldTranslateData(ctx, string(req.TableKey), string(req.Field))
if err != nil {
return errors.Wrap(err, "getting index/field writeto")
return errors.Wrap(err, "getting index/field translator")
}
// get a write tx to ensure no other writes while incrementing WL version.
wrTo, err := trans.Begin(true)
@@ -3174,8 +3189,11 @@ func (api *API) SnapshotFieldKeys(ctx context.Context, req *dax.SnapshotFieldKey
defer wrTo.Rollback()

resource := api.serverlessStorage.GetFieldKeyResource(qtid, req.Field)
if err := resource.IncrementWLVersion(); err != nil {
if ok, err := resource.IncrementWLVersion(); err != nil {
return errors.Wrap(err, "incrementing writelog version")
} else if !ok {
// no need to snapshot, no writes
return nil
}
// TODO(jaffee) downgrade to read tx
err = resource.SnapshotTo(wrTo)
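All three snapshot endpoints above share the same shape: refuse to run until
a directive has been applied, hold a write transaction while the write-log
version is bumped, and return early when IncrementWLVersion reports no
writes, relying on the deferred Rollback to close the storage transaction. A
condensed, hypothetical version of that flow (the interface names are made
up, and the errors import assumes github.com/pkg/errors rather than whatever
FeatureBase actually uses):

package sketch

import "github.com/pkg/errors"

type holder interface{ DirectiveApplied() bool }

type tx interface{ Rollback() error }

type wlResource interface {
    IncrementWLVersion() (bool, error)
    SnapshotTo(tx) error
}

func snapshotSomething(h holder, begin func(writable bool) (tx, error), res wlResource) error {
    if !h.DirectiveApplied() {
        return errors.New("don't have directive yet, can't snapshot")
    }
    // A write Tx guarantees nothing lands between the version bump and the snapshot.
    wrTo, err := begin(true)
    if err != nil {
        return errors.Wrap(err, "beginning write tx")
    }
    defer wrTo.Rollback()

    if ok, err := res.IncrementWLVersion(); err != nil {
        return errors.Wrap(err, "incrementing write log version")
    } else if !ok {
        // No writes since the last snapshot; the deferred Rollback
        // still closes the storage transaction.
        return nil
    }
    return errors.Wrap(res.SnapshotTo(wrTo), "snapshotting")
}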
1 change: 1 addition & 0 deletions ctl/dax.go
@@ -18,6 +18,7 @@ func BuildDAXFlags(cmd *cobra.Command, srv *server.Command) {
flags.BoolVar(&srv.Config.MDS.Run, "mds.run", srv.Config.MDS.Run, "Run the MDS service in process.")
flags.DurationVar(&srv.Config.MDS.Config.RegistrationBatchTimeout, "mds.config.registration-batch-timeout", srv.Config.MDS.Config.RegistrationBatchTimeout, "Timeout for node registration batches.")
flags.StringVar(&srv.Config.MDS.Config.DataDir, "mds.config.data-dir", srv.Config.MDS.Config.DataDir, "MDS directory to use in process.")
flags.DurationVar(&srv.Config.MDS.Config.SnappingTurtleTimeout, "mds.config.snapping-turtle-timeout", srv.Config.MDS.Config.SnappingTurtleTimeout, "Period for running automatic snapshotting routine.")

// WriteLogger
flags.BoolVar(&srv.Config.WriteLogger.Run, "writelogger.run", srv.Config.WriteLogger.Run, "Run the WriteLogger service in process.")
2 changes: 1 addition & 1 deletion dax/Makefile
@@ -85,7 +85,7 @@ dc-cli:
# featurebase cli --host localhost --port 8080 --org-id=testorg --db-id=testdb
# create table keysidstbl2 (_id string, slice idset);
dc-datagen:
docker-compose run datagen --end-at=500 --pilosa.batch-size=500 --featurebase.table-name=keysidstbl2
docker-compose run datagen --end-at=500 --pilosa.batch-size=500 --featurebase.table-name=keysidstbl2

dc-exec-%:
docker-compose exec $* /bin/sh
7 changes: 7 additions & 0 deletions dax/mds/controller/config.go
@@ -25,5 +25,12 @@ type Config struct {
// have been registered.
RegistrationBatchTimeout time.Duration

// SnappingTurtleTimeout is the period on which the automatic
// snapshotting routine will run. If performing all the snapshots
// takes longer than this amount of time, snapshotting will run
// continuously. If it finishes before the timeout, it will wait
// until the timeout expires to start another round of snapshots.
SnappingTurtleTimeout time.Duration

Logger logger.Logger
}
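The SnappingTurtleTimeout comment above describes standard time.Ticker
behavior: at most one tick is buffered, so a round that overruns the period
is followed immediately by another round, while a fast round waits out the
remainder. A tiny standalone demonstration, not part of the commit:

package main

import (
    "fmt"
    "time"
)

func main() {
    period := 100 * time.Millisecond
    ticker := time.NewTicker(period)
    defer ticker.Stop()

    for round := 0; round < 3; round++ {
        <-ticker.C
        start := time.Now()
        time.Sleep(250 * time.Millisecond) // pretend the snapshot round overran the period
        // A tick accumulated while we were "snapshotting", so the next
        // iteration starts immediately rather than waiting a full period.
        fmt.Printf("round %d finished after %v\n", round, time.Since(start).Round(time.Millisecond))
    }
}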
12 changes: 7 additions & 5 deletions dax/mds/controller/controller.go
@@ -47,6 +47,7 @@ type Controller struct {

registrationBatchTimeout time.Duration
nodeChan chan *dax.Node
snappingTurtleTimeout time.Duration
stopping chan struct{}

logger logger.Logger
@@ -69,10 +70,12 @@ func New(cfg Config) *Controller {

poller: dax.NewNopAddressManager(),

logger: logger.NopLogger,
registrationBatchTimeout: cfg.RegistrationBatchTimeout,
nodeChan: make(chan *dax.Node, 10),
snappingTurtleTimeout: cfg.SnappingTurtleTimeout,

nodeChan: make(chan *dax.Node, 10),
stopping: make(chan struct{}),
logger: logger.NopLogger,
}

if cfg.Logger != nil {
@@ -106,14 +109,13 @@ func New(cfg Config) *Controller {
c.Schemar = cfg.Schemar
}

c.registrationBatchTimeout = cfg.RegistrationBatchTimeout

return c
}

// Run starts the node registration goroutine.
// Run starts long-running subroutines.
func (c *Controller) Run() error {
go c.nodeRegistrationRoutine(c.nodeChan, c.registrationBatchTimeout)
go c.snappingTurtleRoutine(c.snappingTurtleTimeout)

return nil
}
1 change: 0 additions & 1 deletion dax/mds/controller/http/director.go
@@ -100,7 +100,6 @@ func (d *Director) SendDirective(ctx context.Context, dir *dax.Directive) error

func (d *Director) SendSnapshotShardDataRequest(ctx context.Context, req *dax.SnapshotShardDataRequest) error {
url := fmt.Sprintf("%s/%s/shard-data", req.Address.WithScheme("http"), d.snapshotRequestPath)
d.logger.Printf("SEND HTTP snapshot shard data request to: %s\n", url)

// Encode the request.
postBody, err := json.Marshal(req)
110 changes: 110 additions & 0 deletions dax/mds/controller/snapping_turtle.go
@@ -0,0 +1,110 @@
package controller

import (
"context"
"time"

"github.com/molecula/featurebase/v3/dax"
)

func (c *Controller) snappingTurtleRoutine(period time.Duration) {
if period == 0 {
return // disable automatic snapshotting
}
ticker := time.NewTicker(period)
for {
select {
case <-c.stopping:
ticker.Stop()
c.logger.Debugf("TURTLE: Stopping Snapping Turtle")
return
case <-ticker.C:
c.snapAll()
}
}

}

func (c *Controller) snapAll() {
c.logger.Debugf("TURTLE: snapAll")
ctx := context.Background()
computeNodes, err := c.ComputeBalancer.CurrentState(ctx)
if err != nil {
c.logger.Printf("Error getting compute balancer state for snapping turtle: %v", err)
}

// Weird nested loop for snapshotting shard data. The reason for
// this is to avoid hotspotting each node in turn and spread the
// snapshotting load across all nodes rather than snapshotting all
// jobs on one node and then moving onto the next one.
i := 0
stillWorking := true
for stillWorking {
stillWorking = false
for _, workerInfo := range computeNodes {
if len(workerInfo.Jobs) <= i {
continue
}
stillWorking = true
j, err := decodeShard(workerInfo.Jobs[i])
if err != nil {
c.logger.Printf("couldn't decode a shard out of the job: '%s', err: %v", workerInfo.Jobs[i], err)
continue
}
c.SnapshotShardData(ctx, j.t.QualifiedTableID(), j.shardNum())
}
i++
}

// Get all tables across all orgs/dbs so we can snapshot all keyed
// fields and look up whether a table is keyed to snapshot its
// partitions.
tables, err := c.Schemar.Tables(ctx, dax.TableQualifier{})
if err != nil {
c.logger.Printf("Couldn't get schema for snapshotting keys: %v", err)
return
}
// snapshot keyed fields
tableMap := make(map[dax.TableKey]*dax.QualifiedTable)
for _, table := range tables {
tableMap[table.Key()] = table
for _, f := range table.Fields {
if f.StringKeys() && !f.IsPrimaryKey() {
err := c.SnapshotFieldKeys(ctx, table.QualifiedID(), f.Name)
if err != nil {
c.logger.Printf("Couldn't snapshot table: %s, field: %s, error: %v", table, f.Name, err)
}
}
}
}

// Get all partition jobs from balancer and snapshot table keys
// for any partition that goes with a keyed table. Doing the same
// weird nested loop thing to avoid doing all jobs on one node
// back to back.
translateNodes, err := c.TranslateBalancer.CurrentState(ctx)
if err != nil {
c.logger.Printf("Error getting translate balancer state for snapping turtle: %v", err)
}

i = 0
stillWorking = true
for stillWorking {
stillWorking = false
for _, workerInfo := range translateNodes {
if len(workerInfo.Jobs) <= i {
continue
}
stillWorking = true
j, err := decodePartition(workerInfo.Jobs[i])
if err != nil {
c.logger.Printf("couldn't decode a partition out of the job: '%s', err: %v", workerInfo.Jobs[i], err)
continue
}
table := tableMap[j.table()]
if table.StringKeys() {
c.SnapshotTableKeys(ctx, table.QualifiedID(), j.partitionNum())
}
}
i++
}
c.logger.Debugf("TURTLE: snapAll complete")
}
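The "weird nested loop" above walks job index i across every worker before
moving on to i+1, so the snapshot load interleaves across nodes instead of
draining one node at a time. A toy illustration with made-up workers and job
names:

package main

import "fmt"

type workerInfo struct {
    name string
    jobs []string
}

func main() {
    workers := []workerInfo{
        {name: "node-a", jobs: []string{"a1", "a2", "a3"}},
        {name: "node-b", jobs: []string{"b1"}},
        {name: "node-c", jobs: []string{"c1", "c2"}},
    }

    // Same shape as snapAll: take the i-th job from every worker before
    // moving on to index i+1.
    i := 0
    stillWorking := true
    for stillWorking {
        stillWorking = false
        for _, w := range workers {
            if len(w.jobs) <= i {
                continue
            }
            stillWorking = true
            fmt.Printf("snapshot %s on %s\n", w.jobs[i], w.name)
        }
        i++
    }
    // Prints a1 b1 c1 a2 c2 a3 (spread across nodes) rather than
    // a1 a2 a3 b1 c1 c2.
}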
3 changes: 3 additions & 0 deletions dax/mds/mds.go
@@ -29,6 +29,8 @@ type Config struct {
// have been registered.
RegistrationBatchTimeout time.Duration `toml:"registration-batch-timeout"`

SnappingTurtleTimeout time.Duration

// Poller
PollInterval time.Duration `toml:"poll-interval"`

@@ -107,6 +109,7 @@ func New(cfg Config) *MDS {
TranslateBalancer: naiveboltdb.NewBalancer("translate", controllerDB, logr),

RegistrationBatchTimeout: cfg.RegistrationBatchTimeout,
SnappingTurtleTimeout: cfg.SnappingTurtleTimeout,

StorageMethod: cfg.StorageMethod,
// just reusing this bolt for internal controller svcs
9 changes: 3 additions & 6 deletions dax/mds/poller/poller.go
@@ -133,24 +133,21 @@ func (p *Poller) pollAll() {
toRemove := []dax.Address{}

for _, addr := range addrs {
p.logger.Debugf("polling: %s", addr)
start := time.Now()
up := p.nodePoller.Poll(addr)
if !up {
p.logger.Printf("poller removing %s", addr)
toRemove = append(toRemove, addr)
}
p.logger.Debugf("done poll: %s, %s", addr, time.Since(start))
}

if len(toRemove) > 0 {
p.logger.Debugf("removing addresses: %v", toRemove)
p.logger.Debugf("POLLER: removing addresses: %v", toRemove)
start := time.Now()
err := p.addressManager.RemoveAddresses(ctx, toRemove...)
if err != nil {
p.logger.Printf("removing %s: %v", toRemove, err)
p.logger.Printf("POLLER: error removing %s: %v", toRemove, err)
}
p.logger.Debugf("remove complete: %s", time.Since(start))
p.logger.Debugf("POLLER removing %v complete: %s", toRemove, time.Since(start))
}

}
12 changes: 10 additions & 2 deletions dax/mds/schemar/boltdb/schemar.go
@@ -253,7 +253,7 @@ func (s *Schemar) tableIDByName(tx *boltdb.Tx, qual dax.TableQualifier, name dax
}

// Tables returns a list of Table for all existing tables. If one or more table
// names is provided, then only those will be included in the output.
// IDs is provided, then only those will be included in the output.
func (s *Schemar) Tables(ctx context.Context, qual dax.TableQualifier, ids ...dax.TableID) ([]*dax.QualifiedTable, error) {
tx, err := s.db.BeginTx(ctx, false)
if err != nil {
@@ -276,6 +276,12 @@ func (s *Schemar) getTables(ctx context.Context, tx *boltdb.Tx, qual dax.TableQu
}

prefix := []byte(fmt.Sprintf(prefixFmtTables, qual.OrganizationID, qual.DatabaseID))
if qual.OrganizationID == "" && qual.DatabaseID == "" {
prefix = []byte(prefixTables)
} else if qual.DatabaseID == "" {
prefix = []byte(fmt.Sprintf(prefixFmtTablesOrg, qual.OrganizationID))
}

for k, v := c.Seek(prefix); k != nil && bytes.HasPrefix(k, prefix); k, v = c.Next() {
if v == nil {
s.logger.Printf("nil value for key: %s", k)
@@ -346,7 +352,9 @@ func (s *Schemar) DropTable(ctx context.Context, qtid dax.QualifiedTableID) erro
}

const (
prefixFmtTables = "tables/%s/%s/"
prefixTables = "tables/"
prefixFmtTablesOrg = prefixTables + "%s/"
prefixFmtTables = prefixFmtTablesOrg + "%s/"
prefixFmtTableNames = "tablenames/%s/%s/"
)

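The getTables change above boils down to choosing a bolt key prefix from the
new constants based on which qualifier fields are set. A condensed sketch of
that decision; the helper function is hypothetical, not part of the commit:

package main

import "fmt"

const (
    prefixTables       = "tables/"
    prefixFmtTablesOrg = prefixTables + "%s/"
    prefixFmtTables    = prefixFmtTablesOrg + "%s/"
)

// tableScanPrefix mirrors the scoping in getTables: empty org and db
// means scan everything, empty db means scan one org, and both set
// means scan a single org/db.
func tableScanPrefix(orgID, dbID string) []byte {
    switch {
    case orgID == "" && dbID == "":
        return []byte(prefixTables)
    case dbID == "":
        return []byte(fmt.Sprintf(prefixFmtTablesOrg, orgID))
    default:
        return []byte(fmt.Sprintf(prefixFmtTables, orgID, dbID))
    }
}

func main() {
    fmt.Printf("%s\n", tableScanPrefix("", ""))        // tables/
    fmt.Printf("%s\n", tableScanPrefix("acme", ""))    // tables/acme/
    fmt.Printf("%s\n", tableScanPrefix("acme", "db1")) // tables/acme/db1/
}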
34 changes: 34 additions & 0 deletions dax/mds/schemar/boltdb/schemar_test.go
@@ -143,4 +143,38 @@ func TestSchemar(t *testing.T) {
assert.NoError(t, err)
assert.Equal(t, exp, tables)
})

t.Run("GetTablesAll", func(t *testing.T) {
// get a fresh DB
db := testbolt.MustOpenDB(t)
defer testbolt.MustCloseDB(t, db)

t.Cleanup(func() {
testbolt.CleanupDB(t, db.Path())
})
// Initialize the buckets.
assert.NoError(t, db.InitializeBuckets(boltdb.SchemarBuckets...))

s := boltdb.NewSchemar(db, logger.NopLogger)

qtbl0 := daxtest.TestQualifiedTableWithID(t, qual, tableID0, tableName0, partitionN, false)
orgID2 := dax.OrganizationID("acme2")
qual2 := dax.NewTableQualifier(orgID2, dbID)
tableID2 := "3"
qtbl2 := daxtest.TestQualifiedTableWithID(t, qual2, tableID2, dax.TableName("two"), partitionN, false)

assert.NoError(t, s.CreateTable(ctx, qtbl0))
assert.NoError(t, s.CreateTable(ctx, qtbl2))

exp := []*dax.QualifiedTable{qtbl0, qtbl2}

tables, err := s.Tables(ctx, dax.TableQualifier{})
assert.NoError(t, err)
assert.Equal(t, exp, tables)

tables, err = s.Tables(ctx, dax.TableQualifier{OrganizationID: orgID2})
assert.NoError(t, err)
assert.Equal(t, []*dax.QualifiedTable{qtbl2}, tables)

})
}
7 changes: 7 additions & 0 deletions dax/mds/schemar/schemar.go
@@ -13,6 +13,13 @@ type Schemar interface {
CreateField(context.Context, dax.QualifiedTableID, *dax.Field) error
DropField(context.Context, dax.QualifiedTableID, dax.FieldName) error
Table(context.Context, dax.QualifiedTableID) (*dax.QualifiedTable, error)

// Tables returns a list of tables. If the qualifier's DatabaseID
// is empty, all tables in the org will be returned. If the
// OrganizationID is empty, all tables will be returned. If both
// are populated, only tables in that database will be returned. If
// one or more table IDs are passed in the third argument, only
// tables matching those IDs will be returned.
Tables(context.Context, dax.TableQualifier, ...dax.TableID) ([]*dax.QualifiedTable, error)

// TableID is a reverse-lookup method to get the TableID for a given