Skip to content

Commit

Permalink
server: Implement WithMmapSize for backend config
Browse files Browse the repository at this point in the history
Revert to receiving the log and the path. Accept a third argument which
are options to be applied to the backend config.
Add a new funcion WithMmapSize, which modifies the backend config to
override this option.

Signed-off-by: Ivan Valdes <[email protected]>
  • Loading branch information
ivanvc committed Feb 12, 2024
1 parent f0ce28e commit cd9dfaf
Show file tree
Hide file tree
Showing 16 changed files with 38 additions and 33 deletions.
2 changes: 1 addition & 1 deletion etcdutl/etcdutl/migrate_command.go
Original file line number Diff line number Diff line change
Expand Up @@ -92,7 +92,7 @@ func (o *migrateOptions) Config() (*migrateConfig, error) {
}

dbPath := datadir.ToBackendFileName(o.dataDir)
c.be = backend.NewDefaultBackend(backend.BackendConfig{Logger: GetLogger(), Path: dbPath})
c.be = backend.NewDefaultBackend(GetLogger(), dbPath)

walPath := datadir.ToWalDir(o.dataDir)
w, err := wal.OpenForRead(c.lg, walPath, walpb.Snapshot{})
Expand Down
8 changes: 4 additions & 4 deletions etcdutl/snapshot/v3_snapshot.go
Original file line number Diff line number Diff line change
Expand Up @@ -325,7 +325,7 @@ func (s *v3Manager) saveDB() error {
return err
}

be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize})
be := backend.NewDefaultBackend(s.lg, s.outDbPath(), backend.WithMmapSize(s.initialMmapSize))
defer be.Close()

err = schema.NewMembershipBackend(s.lg, be).TrimMembershipFromBackend()
Expand All @@ -339,7 +339,7 @@ func (s *v3Manager) saveDB() error {
// modifyLatestRevision can increase the latest revision by the given amount and sets the scheduled compaction
// to that revision so that the server will consider this revision compacted.
func (s *v3Manager) modifyLatestRevision(bumpAmount uint64) error {
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath()})
be := backend.NewDefaultBackend(s.lg, s.outDbPath())
defer func() {
be.ForceCommit()
be.Close()
Expand Down Expand Up @@ -480,7 +480,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
}

// add members again to persist them to the store we create.
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize})
be := backend.NewDefaultBackend(s.lg, s.outDbPath(), backend.WithMmapSize(s.initialMmapSize))
defer be.Close()
s.cl.SetBackend(schema.NewMembershipBackend(s.lg, be))
for _, m := range s.cl.Members() {
Expand Down Expand Up @@ -559,7 +559,7 @@ func (s *v3Manager) saveWALAndSnap() (*raftpb.HardState, error) {
}

func (s *v3Manager) updateCIndex(commit uint64, term uint64) error {
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: s.lg, Path: s.outDbPath(), MmapSize: s.initialMmapSize})
be := backend.NewDefaultBackend(s.lg, s.outDbPath(), backend.WithMmapSize(s.initialMmapSize))
defer be.Close()

cindex.UpdateConsistentIndexForce(be.BatchTx(), commit, term)
Expand Down
4 changes: 2 additions & 2 deletions server/etcdserver/cindex/cindex_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@ func TestConsistentIndex(t *testing.T) {
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(backend.BackendConfig{Logger: zaptest.NewLogger(t), Path: tmpPath})
b := backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath)
defer b.Close()
ci.SetBackend(b)
index = ci.ConsistentIndex()
Expand Down Expand Up @@ -108,7 +108,7 @@ func TestConsistentIndexDecrease(t *testing.T) {
be.ForceCommit()
be.Close()

be = backend.NewDefaultBackend(backend.BackendConfig{Logger: zaptest.NewLogger(t), Path: tmpPath})
be = backend.NewDefaultBackend(zaptest.NewLogger(t), tmpPath)
defer be.Close()
ci := NewConsistentIndex(be)
ci.SetConsistentIndex(tc.index, tc.term)
Expand Down
20 changes: 12 additions & 8 deletions server/storage/backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,8 @@ type BackendConfig struct {
Hooks Hooks
}

type BackendConfigFunc func(*BackendConfig)

func DefaultBackendConfig(lg *zap.Logger) BackendConfig {
return BackendConfig{
BatchInterval: defaultBatchInterval,
Expand All @@ -164,15 +166,17 @@ func New(bcfg BackendConfig) Backend {
return newBackend(bcfg)
}

func NewDefaultBackend(bc BackendConfig) Backend {

bcfg := DefaultBackendConfig(bc.Logger)
bcfg.Path = bc.Path
func WithMmapSize(size uint64) BackendConfigFunc {
return func(bcfg *BackendConfig) {
bcfg.MmapSize = size
}
}

if bc.MmapSize > 0 {
bcfg.MmapSize = bc.MmapSize
} else {
bcfg.MmapSize = InitialMmapSize
func NewDefaultBackend(lg *zap.Logger, path string, bcfns ...BackendConfigFunc) Backend {
bcfg := DefaultBackendConfig(lg)
bcfg.Path = path
for _, fn := range bcfns {
fn(&bcfg)
}

return newBackend(bcfg)
Expand Down
4 changes: 2 additions & 2 deletions server/storage/mvcc/store_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ func TestScheduledCompact(t *testing.T) {
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
b := backend.NewDefaultBackend(lg, tmpPath)
defer b.Close()
v, found := UnsafeReadScheduledCompact(b.BatchTx())
assert.Equal(t, true, found)
Expand Down Expand Up @@ -103,7 +103,7 @@ func TestFinishedCompact(t *testing.T) {
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
b := backend.NewDefaultBackend(lg, tmpPath)
defer b.Close()
v, found := UnsafeReadFinishedCompact(b.BatchTx())
assert.Equal(t, true, found)
Expand Down
4 changes: 2 additions & 2 deletions server/storage/schema/auth_roles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ func TestGetAllRoles(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.GetAllRoles()
Expand Down Expand Up @@ -219,7 +219,7 @@ func TestGetRole(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.GetRole("role1")
Expand Down
4 changes: 2 additions & 2 deletions server/storage/schema/auth_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -66,7 +66,7 @@ func TestAuthEnabled(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
tx = abe2.BatchTx()
Expand Down Expand Up @@ -117,7 +117,7 @@ func TestAuthRevision(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
tx := abe2.BatchTx()
Expand Down
4 changes: 2 additions & 2 deletions server/storage/schema/auth_users_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,7 +110,7 @@ func TestGetAllUsers(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.ReadTx().UnsafeGetAllUsers()
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestGetUser(t *testing.T) {
abe.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
abe2 := NewAuthBackend(lg, be2)
users := abe2.GetUser("alice")
Expand Down
2 changes: 1 addition & 1 deletion server/storage/schema/lease_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ func TestLeaseBackend(t *testing.T) {
be.ForceCommit()
be.Close()

be2 := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
be2 := backend.NewDefaultBackend(lg, tmpPath)
defer be2.Close()
leases := MustUnsafeGetAllLeases(be2.ReadTx())

Expand Down
6 changes: 3 additions & 3 deletions server/storage/schema/schema_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,7 +83,7 @@ func TestValidate(t *testing.T) {
lg := zap.NewNop()
dataPath := setupBackendData(t, tc.version, tc.overrideKeys)

b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath})
b := backend.NewDefaultBackend(lg, dataPath)
defer b.Close()
err := Validate(lg, b.ReadTx())
if (err != nil) != tc.expectError {
Expand Down Expand Up @@ -211,7 +211,7 @@ func TestMigrate(t *testing.T) {
t.Fatal(err)
}

b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath})
b := backend.NewDefaultBackend(lg, dataPath)
defer b.Close()

err = Migrate(lg, b.BatchTx(), walVersion, tc.targetVersion)
Expand Down Expand Up @@ -255,7 +255,7 @@ func TestMigrateIsReversible(t *testing.T) {
lg := zap.NewNop()
dataPath := setupBackendData(t, tc.initialVersion, nil)

be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dataPath})
be := backend.NewDefaultBackend(lg, dataPath)
defer be.Close()
tx := be.BatchTx()
tx.Lock()
Expand Down
2 changes: 1 addition & 1 deletion server/storage/schema/version_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ func TestVersion(t *testing.T) {
be.ForceCommit()
be.Close()

b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: tmpPath})
b := backend.NewDefaultBackend(lg, tmpPath)
defer b.Close()
v := UnsafeReadStorageVersion(b.BatchTx())

Expand Down
3 changes: 2 additions & 1 deletion server/verify/verify.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,8 @@ func Verify(cfg Config) error {
lg.Info("verification of persisted state successful", zap.String("data-dir", cfg.DataDir))
}
}()
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: datadir.ToBackendFileName(cfg.DataDir)})

be := backend.NewDefaultBackend(lg, datadir.ToBackendFileName(cfg.DataDir))
defer be.Close()

snapshot, hardstate, err := validateWal(cfg)
Expand Down
2 changes: 1 addition & 1 deletion tests/e2e/utl_migrate_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -166,7 +166,7 @@ func TestEtctlutlMigrate(t *testing.T) {
}

t.Log("etcdutl migrate...")
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: filepath.Join(memberDataDir, "member/snap/db")})
be := backend.NewDefaultBackend(lg, filepath.Join(memberDataDir, "member/snap/db"))
defer be.Close()

ver := schema.ReadStorageVersion(be.ReadTx())
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/clientv3/maintenance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -277,7 +277,7 @@ func testMaintenanceSnapshotErrorInflight(t *testing.T, snapshot func(context.Co
// take about 1-second to read snapshot
clus.Members[0].Stop(t)
dpath := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: dpath})
b := backend.NewDefaultBackend(lg, dpath)
s := mvcc.NewStore(lg, b, &lease.FakeLessor{}, mvcc.StoreConfig{CompactionBatchLimit: math.MaxInt32})
rev := 100000
for i := 2; i <= rev; i++ {
Expand Down
2 changes: 1 addition & 1 deletion tests/integration/v3_alarm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestV3CorruptAlarm(t *testing.T) {
// Corrupt member 0 by modifying backend offline.
clus.Members[0].Stop(t)
fp := filepath.Join(clus.Members[0].DataDir, "member", "snap", "db")
be := backend.NewDefaultBackend(backend.BackendConfig{Logger: lg, Path: fp})
be := backend.NewDefaultBackend(lg, fp)
s := mvcc.NewStore(lg, be, nil, mvcc.StoreConfig{})
// NOTE: cluster_proxy mode with namespacing won't set 'k', but namespace/'k'.
s.Put([]byte("abc"), []byte("def"), 0)
Expand Down
2 changes: 1 addition & 1 deletion tools/etcd-dump-db/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -179,7 +179,7 @@ func iterateBucket(dbPath, bucket string, limit uint64, decode bool) (err error)
}

func getHash(dbPath string) (hash uint32, err error) {
b := backend.NewDefaultBackend(backend.BackendConfig{Logger: zap.NewNop(), Path: dbPath})
b := backend.NewDefaultBackend(zap.NewNop(), dbPath)
return b.Hash(schema.DefaultIgnores)
}

Expand Down

0 comments on commit cd9dfaf

Please sign in to comment.