
Merge pull request #17195 from siyuanfoundation/txBuf1
Fix delete inconsistencies in read buffer
ahrtr authored Jan 11, 2024
2 parents 85bc293 + db61c96 commit b3bf59a
Showing 6 changed files with 211 additions and 3 deletions.
1 change: 1 addition & 0 deletions server/go.mod
@@ -11,6 +11,7 @@ require (
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da
github.com/golang/protobuf v1.5.3
github.com/google/btree v1.1.2
github.com/google/go-cmp v0.6.0
github.com/grpc-ecosystem/go-grpc-middleware v1.3.0
github.com/grpc-ecosystem/go-grpc-prometheus v1.2.0
github.com/grpc-ecosystem/grpc-gateway/v2 v2.19.0
36 changes: 34 additions & 2 deletions server/storage/backend/batch_tx.go
@@ -288,7 +288,8 @@ func (t *batchTx) commit(stop bool) {

type batchTxBuffered struct {
batchTx
buf txWriteBuffer
buf txWriteBuffer
pendingDeleteOperations int
}

func newBatchTxBuffered(backend *backend) *batchTxBuffered {
@@ -310,7 +311,27 @@ func (t *batchTxBuffered) Unlock() {
t.buf.writeback(&t.backend.readTx.buf)
// gofail: var afterWritebackBuf struct{}
t.backend.readTx.Unlock()
if t.pending >= t.backend.batchLimit {
// We commit the transaction when the number of pending operations
// reaches the configured limit (batchLimit) to prevent it from
// becoming excessively large.
//
// But we also need to commit the transaction immediately if there
// is any pending delete operation; otherwise etcd might not have
// finished committing the data into the backend storage (note: etcd
// commits the bbolt transactions periodically instead of on each
// request) by the time it applies the next request. Accordingly,
// etcd may still read stale data from bbolt when processing the
// next request, which breaks linearizability.
//
// Note we don't need to commit the transaction for put requests as
// long as the batch limit isn't exceeded, because there is a buffer
// on top of bbolt. Each time etcd reads data from the backend
// storage, it reads from both bbolt and the buffer. But there is no
// such buffer for delete requests.
//
// Please also refer to
// https://github.com/etcd-io/etcd/pull/17119#issuecomment-1857547158
if t.pending >= t.backend.batchLimit || t.pendingDeleteOperations > 0 {
t.commit(false)
}
}
@@ -356,6 +377,7 @@ func (t *batchTxBuffered) unsafeCommit(stop bool) {
}

t.batchTx.commit(stop)
t.pendingDeleteOperations = 0

if !stop {
t.backend.readTx.tx = t.backend.begin(false)
@@ -371,3 +393,13 @@ func (t *batchTxBuffered) UnsafeSeqPut(bucket Bucket, key []byte, value []byte)
t.batchTx.UnsafeSeqPut(bucket, key, value)
t.buf.putSeq(bucket, key, value)
}

func (t *batchTxBuffered) UnsafeDelete(bucketType Bucket, key []byte) {
t.batchTx.UnsafeDelete(bucketType, key)
t.pendingDeleteOperations++
}

func (t *batchTxBuffered) UnsafeDeleteBucket(bucket Bucket) {
t.batchTx.UnsafeDeleteBucket(bucket)
t.pendingDeleteOperations++
}
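
To see why deletes need an immediate commit while puts don't, here is a minimal, self-contained sketch. The two maps are hypothetical stand-ins for the committed bbolt data and the read buffer (txWriteBuffer), not etcd's actual types:

package main

import "fmt"

func main() {
	// Data already committed to bbolt (the periodic commit hasn't run yet).
	committed := map[string]string{"foo": "bar"}
	// Hypothetical stand-in for the read buffer layered on top of bbolt;
	// writeback copies pending puts into it.
	buffer := map[string]string{}

	read := func(key string) (string, bool) {
		// Readers consult the buffer first, then fall back to bbolt.
		if v, ok := buffer[key]; ok {
			return v, true
		}
		v, ok := committed[key]
		return v, ok
	}

	// A pending put is visible immediately through the buffer,
	// so deferring the bbolt commit is safe.
	buffer["foo"] = "baz"
	fmt.Println(read("foo")) // baz true

	// A delete removes the key from the buffer and from the uncommitted
	// bbolt transaction, but there is no tombstone in the buffer, so the
	// reader falls through to the stale committed value.
	delete(buffer, "foo")
	fmt.Println(read("foo")) // bar true -- stale read
}

This is the gap the patch closes: any pending delete now forces t.commit(false) in Unlock rather than waiting for the batch limit or the periodic commit to refresh the read transaction.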
89 changes: 89 additions & 0 deletions server/storage/backend/batch_tx_test.go
@@ -19,6 +19,8 @@ import (
"testing"
"time"

"github.com/google/go-cmp/cmp"

bolt "go.etcd.io/bbolt"
"go.etcd.io/etcd/server/v3/storage/backend"
betesting "go.etcd.io/etcd/server/v3/storage/backend/testing"
@@ -205,3 +207,90 @@ func TestBatchTxBatchLimitCommit(t *testing.T) {
return nil
})
}

func TestRangeAfterDeleteBucketMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
tx.Commit()

checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})

tx.Lock()
tx.UnsafeDeleteBucket(schema.Test)
tx.Unlock()

checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
}

func TestRangeAfterDeleteMatch(t *testing.T) {
b, _ := betesting.NewTmpBackend(t, time.Hour, 10000)
defer betesting.Close(t, b)

tx := b.BatchTx()

tx.Lock()
tx.UnsafeCreateBucket(schema.Test)
tx.UnsafePut(schema.Test, []byte("foo"), []byte("bar"))
tx.Unlock()
tx.Commit()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), [][]byte{[]byte("foo")}, [][]byte{[]byte("bar")})

tx.Lock()
tx.UnsafeDelete(schema.Test, []byte("foo"))
tx.Unlock()

checkRangeResponseMatch(t, b.BatchTx(), b.ReadTx(), []byte("foo"), nil, 0)
checkForEach(t, b.BatchTx(), b.ReadTx(), nil, nil)
}

func checkRangeResponseMatch(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, key, endKey []byte, limit int64) {
tx.Lock()
ks1, vs1 := tx.UnsafeRange(schema.Test, key, endKey, limit)
tx.Unlock()

rtx.RLock()
ks2, vs2 := rtx.UnsafeRange(schema.Test, key, endKey, limit)
rtx.RUnlock()

if diff := cmp.Diff(ks1, ks2); diff != "" {
t.Errorf("keys on read and batch transaction doesn't match, diff: %s", diff)
}
if diff := cmp.Diff(vs1, vs2); diff != "" {
t.Errorf("values on read and batch transaction doesn't match, diff: %s", diff)
}
}

func checkForEach(t *testing.T, tx backend.BatchTx, rtx backend.ReadTx, expectedKeys, expectedValues [][]byte) {
tx.Lock()
checkUnsafeForEach(t, tx, expectedKeys, expectedValues)
tx.Unlock()

rtx.RLock()
checkUnsafeForEach(t, rtx, expectedKeys, expectedValues)
rtx.RUnlock()
}

func checkUnsafeForEach(t *testing.T, tx backend.UnsafeReader, expectedKeys, expectedValues [][]byte) {
var ks, vs [][]byte
tx.UnsafeForEach(schema.Test, func(k, v []byte) error {
ks = append(ks, k)
vs = append(vs, v)
return nil
})

if diff := cmp.Diff(ks, expectedKeys); diff != "" {
t.Errorf("keys on transaction doesn't match expected, diff: %s", diff)
}
if diff := cmp.Diff(vs, expectedValues); diff != "" {
t.Errorf("values on transaction doesn't match expected, diff: %s", diff)
}
}
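
These helpers compare the batch transaction's view (bbolt plus pending writes) against the read transaction's view (the bbolt snapshot plus the read buffer); before this fix the two views could diverge after UnsafeDelete or UnsafeDeleteBucket, which is exactly what the tests above assert no longer happens.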
6 changes: 5 additions & 1 deletion tests/framework/integration/cluster.go
@@ -142,7 +142,8 @@ type ClusterConfig struct {
AuthToken string
AuthTokenTTL uint

QuotaBackendBytes int64
QuotaBackendBytes int64
BackendBatchInterval time.Duration

MaxTxnOps uint
MaxRequestBytes uint
@@ -271,6 +272,7 @@ func (c *Cluster) mustNewMember(t testutil.TB) *Member {
PeerTLS: c.Cfg.PeerTLS,
ClientTLS: c.Cfg.ClientTLS,
QuotaBackendBytes: c.Cfg.QuotaBackendBytes,
BackendBatchInterval: c.Cfg.BackendBatchInterval,
MaxTxnOps: c.Cfg.MaxTxnOps,
MaxRequestBytes: c.Cfg.MaxRequestBytes,
SnapshotCount: c.Cfg.SnapshotCount,
@@ -598,6 +600,7 @@ type MemberConfig struct {
AuthToken string
AuthTokenTTL uint
QuotaBackendBytes int64
BackendBatchInterval time.Duration
MaxTxnOps uint
MaxRequestBytes uint
SnapshotCount uint64
@@ -671,6 +674,7 @@ func MustNewMember(t testutil.TB, mcfg MemberConfig) *Member {
m.TickMs = uint(framecfg.TickDuration / time.Millisecond)
m.PreVote = true
m.QuotaBackendBytes = mcfg.QuotaBackendBytes
m.BackendBatchInterval = mcfg.BackendBatchInterval
m.MaxTxnOps = mcfg.MaxTxnOps
if m.MaxTxnOps == 0 {
m.MaxTxnOps = embed.DefaultMaxTxnOps
51 changes: 51 additions & 0 deletions tests/integration/clientv3/user_test.go
@@ -19,6 +19,7 @@ import (
"testing"
"time"

"github.com/stretchr/testify/require"
"google.golang.org/grpc"

"go.etcd.io/etcd/api/v3/v3rpc/rpctypes"
@@ -55,6 +56,56 @@ func TestUserError(t *testing.T) {
}
}

func TestAddUserAfterDelete(t *testing.T) {
integration2.BeforeTest(t)

clus := integration2.NewCluster(t, &integration2.ClusterConfig{Size: 1})
defer clus.Terminate(t)

authapi := clus.RandClient()
authSetupRoot(t, authapi.Auth)
cfg := clientv3.Config{
Endpoints: authapi.Endpoints(),
DialTimeout: 5 * time.Second,
DialOptions: []grpc.DialOption{grpc.WithBlock()},
}
cfg.Username, cfg.Password = "root", "123"
authed, err := integration2.NewClient(t, cfg)
require.NoError(t, err)
defer authed.Close()

// add user
_, err = authed.UserAdd(context.TODO(), "foo", "bar")
require.NoError(t, err)
_, err = authapi.Authenticate(context.TODO(), "foo", "bar")
require.NoError(t, err)
// delete user
_, err = authed.UserDelete(context.TODO(), "foo")
require.NoError(t, err)
if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil {
t.Errorf("expect Authenticate error for old password")
}
// add user back
_, err = authed.UserAdd(context.TODO(), "foo", "bar")
require.NoError(t, err)
_, err = authed.Authenticate(context.TODO(), "foo", "bar")
require.NoError(t, err)
// change password
_, err = authed.UserChangePassword(context.TODO(), "foo", "bar2")
require.NoError(t, err)
_, err = authed.UserChangePassword(context.TODO(), "foo", "bar1")
require.NoError(t, err)

if _, err = authed.Authenticate(context.TODO(), "foo", "bar"); err == nil {
t.Errorf("expect Authenticate error for old password")
}
if _, err = authed.Authenticate(context.TODO(), "foo", "bar2"); err == nil {
t.Errorf("expect Authenticate error for old password")
}
_, err = authed.Authenticate(context.TODO(), "foo", "bar1")
require.NoError(t, err)
}
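
This test is a regression check for the delete inconsistency at the auth layer: UserDelete issues a backend delete, and without the immediate commit a subsequent UserAdd or Authenticate could observe stale auth data read back from the backend.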

func TestUserErrorAuth(t *testing.T) {
integration2.BeforeTest(t)

31 changes: 31 additions & 0 deletions tests/integration/member_test.go
@@ -18,9 +18,13 @@ import (
"context"
"fmt"
"testing"
"time"

"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"go.etcd.io/etcd/server/v3/etcdserver"
"go.etcd.io/etcd/server/v3/storage/schema"
"go.etcd.io/etcd/tests/v3/framework/integration"
)

@@ -115,3 +119,30 @@ func TestSnapshotAndRestartMember(t *testing.T) {
}
}
}

func TestRemoveMember(t *testing.T) {
integration.BeforeTest(t)
c := integration.NewCluster(t, &integration.ClusterConfig{Size: 3, UseBridge: true, BackendBatchInterval: 1000 * time.Second})
defer c.Terminate(t)
// membership changes additionally require cluster to be stable for etcdserver.HealthInterval
time.Sleep(etcdserver.HealthInterval)

err := c.RemoveMember(t, c.Client(2), uint64(c.Members[0].ID()))
require.NoError(t, err)

checkMemberCount(t, c.Members[0], 2)
checkMemberCount(t, c.Members[1], 2)
}

func checkMemberCount(t *testing.T, m *integration.Member, expectedMemberCount int) {
be := schema.NewMembershipBackend(m.Logger, m.Server.Backend())
membersFromBackend, _ := be.MustReadMembersFromBackend()
if len(membersFromBackend) != expectedMemberCount {
t.Errorf("Expect member count read from backend=%d, got %d", expectedMemberCount, len(membersFromBackend))
}
membersResp, err := m.Client.MemberList(context.Background())
require.NoError(t, err)
if len(membersResp.Members) != expectedMemberCount {
t.Errorf("Expect len(MemberList)=%d, got %d", expectedMemberCount, len(membersResp.Members))
}
}
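
Note the BackendBatchInterval of 1000 * time.Second: it effectively disables periodic backend commits, so the delete issued by member removal is flushed only through the new pending-delete commit path. Without the fix, the direct backend read in checkMemberCount could still return the removed member.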
