From 79189c9c340c6bc01b34d0f0f0aa9efa7e9682b6 Mon Sep 17 00:00:00 2001 From: Bogdan Kanivets Date: Thu, 1 Sep 2022 00:58:44 -0700 Subject: [PATCH] server: don't panic in readonly serializable txn Problem: We pass grpc context down to applier in readonly serializable txn. This context can be cancelled for example due to timeout. This will trigger panic inside applyTxn Solution: Only panic for transactions with write operations fixes https://github.com/etcd-io/etcd/issues/14110 main PR https://github.com/etcd-io/etcd/pull/14149 Signed-off-by: Bogdan Kanivets --- server/etcdserver/apply.go | 33 ++++++++---- server/etcdserver/apply_test.go | 96 +++++++++++++++++++++++++++++++++ server/mvcc/kvstore_txn.go | 3 +- 3 files changed, 122 insertions(+), 10 deletions(-) create mode 100644 server/etcdserver/apply_test.go diff --git a/server/etcdserver/apply.go b/server/etcdserver/apply.go index 5a77ef37734..2ef151259c1 100644 --- a/server/etcdserver/apply.go +++ b/server/etcdserver/apply.go @@ -428,6 +428,7 @@ func (a *applierV3backend) Range(ctx context.Context, txn mvcc.TxnRead, r *pb.Ra } func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnResponse, *traceutil.Trace, error) { + lg := a.s.Logger() trace := traceutil.Get(ctx) if trace.IsEmpty() { trace = traceutil.New("transaction", a.s.Logger()) @@ -474,7 +475,18 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR txn.End() txn = a.s.KV().Write(trace) } - a.applyTxn(ctx, txn, rt, txnPath, txnResp) + _, err := a.applyTxn(ctx, txn, rt, txnPath, txnResp) + if err != nil { + if isWrite { + // end txn to release locks before panic + txn.End() + // When txn with write operations starts it has to be successful + // We don't have a way to recover state in case of write failure + lg.Panic("unexpected error during txn with writes", zap.Error(err)) + } else { + lg.Error("unexpected error during readonly txn", zap.Error(err)) + } + } rev := txn.Rev() if len(txn.Changes()) != 0 { rev++ @@ -486,7 +498,7 @@ func (a *applierV3backend) Txn(ctx context.Context, rt *pb.TxnRequest) (*pb.TxnR traceutil.Field{Key: "number_of_response", Value: len(txnResp.Responses)}, traceutil.Field{Key: "response_revision", Value: txnResp.Header.Revision}, ) - return txnResp, trace, nil + return txnResp, trace, err } // newTxnResp allocates a txn response for a txn request given a path. @@ -617,14 +629,13 @@ func compareKV(c *pb.Compare, ckv mvccpb.KeyValue) bool { return true } -func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int) { +func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt *pb.TxnRequest, txnPath []bool, tresp *pb.TxnResponse) (txns int, err error) { trace := traceutil.Get(ctx) reqs := rt.Success if !txnPath[0] { reqs = rt.Failure } - lg := a.s.Logger() for i, req := range reqs { respi := tresp.Responses[i].Response switch tv := req.Request.(type) { @@ -635,7 +646,7 @@ func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt * traceutil.Field{Key: "range_end", Value: string(tv.RequestRange.RangeEnd)}) resp, err := a.Range(ctx, txn, tv.RequestRange) if err != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) + return 0, fmt.Errorf("applyTxn: failed Range: %w", err) } respi.(*pb.ResponseOp_ResponseRange).ResponseRange = resp trace.StopSubTrace() @@ -646,26 +657,30 @@ func (a *applierV3backend) applyTxn(ctx context.Context, txn mvcc.TxnWrite, rt * traceutil.Field{Key: "req_size", Value: tv.RequestPut.Size()}) resp, _, err := a.Put(ctx, txn, tv.RequestPut) if err != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) + return 0, fmt.Errorf("applyTxn: failed Put: %w", err) } respi.(*pb.ResponseOp_ResponsePut).ResponsePut = resp trace.StopSubTrace() case *pb.RequestOp_RequestDeleteRange: resp, err := a.DeleteRange(txn, tv.RequestDeleteRange) if err != nil { - lg.Panic("unexpected error during txn", zap.Error(err)) + return 0, fmt.Errorf("applyTxn: failed DeleteRange: %w", err) } respi.(*pb.ResponseOp_ResponseDeleteRange).ResponseDeleteRange = resp case *pb.RequestOp_RequestTxn: resp := respi.(*pb.ResponseOp_ResponseTxn).ResponseTxn - applyTxns := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp) + applyTxns, err := a.applyTxn(ctx, txn, tv.RequestTxn, txnPath[1:], resp) + if err != nil { + // don't wrap the error. It's a recursive call and err should be already wrapped + return 0, err + } txns += applyTxns + 1 txnPath = txnPath[applyTxns+1:] default: // empty union } } - return txns + return txns, nil } func (a *applierV3backend) Compaction(compaction *pb.CompactionRequest) (*pb.CompactionResponse, <-chan struct{}, *traceutil.Trace, error) { diff --git a/server/etcdserver/apply_test.go b/server/etcdserver/apply_test.go new file mode 100644 index 00000000000..65704fd4e78 --- /dev/null +++ b/server/etcdserver/apply_test.go @@ -0,0 +1,96 @@ +package etcdserver + +import ( + "context" + "strings" + "sync" + "testing" + + "github.com/stretchr/testify/assert" + "go.uber.org/zap" + + pb "go.etcd.io/etcd/api/v3/etcdserverpb" + "go.etcd.io/etcd/server/v3/lease" + "go.etcd.io/etcd/server/v3/mvcc" + betesting "go.etcd.io/etcd/server/v3/mvcc/backend/testing" +) + +func TestReadonlyTxnError(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + s := mvcc.New(zap.NewExample(), b, &lease.FakeLessor{}, mvcc.StoreConfig{}) + defer s.Close() + + // setup minimal server to get access to applier + srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeRecorder()})} + srv.kv = s + srv.be = b + + a := srv.newApplierV3Backend() + + // setup cancelled context + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + + // put some data to prevent early termination in rangeKeys + // we are expecting failure on cancelled context check + s.Put([]byte("foo"), []byte("bar"), lease.NoLease) + + txn := &pb.TxnRequest{ + Success: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestRange{ + RequestRange: &pb.RangeRequest{ + Key: []byte("foo"), + }, + }, + }, + }, + } + + _, _, err := a.Txn(ctx, txn) + if err == nil || !strings.Contains(err.Error(), "applyTxn: failed Range: rangeKeys: context cancelled: context canceled") { + t.Fatalf("Expected context canceled error, got %v", err) + } +} + +func TestWriteTxnPanic(t *testing.T) { + b, _ := betesting.NewDefaultTmpBackend(t) + defer betesting.Close(t, b) + s := mvcc.New(zap.NewExample(), b, &lease.FakeLessor{}, mvcc.StoreConfig{}) + defer s.Close() + + // setup minimal server to get access to applier + srv := &EtcdServer{lgMu: new(sync.RWMutex), lg: zap.NewExample(), r: *newRaftNode(raftNodeConfig{lg: zap.NewExample(), Node: newNodeRecorder()})} + srv.kv = s + srv.be = b + + a := srv.newApplierV3Backend() + + // setup cancelled context + ctx, cancel := context.WithCancel(context.TODO()) + cancel() + + // write txn that puts some data and then fails in range due to cancelled context + txn := &pb.TxnRequest{ + Success: []*pb.RequestOp{ + { + Request: &pb.RequestOp_RequestPut{ + RequestPut: &pb.PutRequest{ + Key: []byte("foo"), + Value: []byte("bar"), + }, + }, + }, + { + Request: &pb.RequestOp_RequestRange{ + RequestRange: &pb.RangeRequest{ + Key: []byte("foo"), + }, + }, + }, + }, + } + + assert.Panics(t, func() { a.Txn(ctx, txn) }, "Expected panic in Txn with writes") +} diff --git a/server/mvcc/kvstore_txn.go b/server/mvcc/kvstore_txn.go index 9df7b79410f..8d243363547 100644 --- a/server/mvcc/kvstore_txn.go +++ b/server/mvcc/kvstore_txn.go @@ -16,6 +16,7 @@ package mvcc import ( "context" + "fmt" "go.etcd.io/etcd/api/v3/mvccpb" "go.etcd.io/etcd/pkg/v3/traceutil" @@ -156,7 +157,7 @@ func (tr *storeTxnRead) rangeKeys(ctx context.Context, key, end []byte, curRev i for i, revpair := range revpairs[:len(kvs)] { select { case <-ctx.Done(): - return nil, ctx.Err() + return nil, fmt.Errorf("rangeKeys: context cancelled: %w", ctx.Err()) default: } revToBytes(revpair, revBytes)