diff --git a/tests/robustness/client/client.go b/tests/robustness/client/client.go index bd3ccb5b9f3d..516e5d5df8dc 100644 --- a/tests/robustness/client/client.go +++ b/tests/robustness/client/client.go @@ -193,9 +193,13 @@ func (c *RecordingClient) Defragment(ctx context.Context) (*clientv3.DefragmentR func (c *RecordingClient) Compact(ctx context.Context, rev int64) (*clientv3.CompactResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() + callTime := time.Since(c.baseTime) resp, err := c.client.Compact(ctx, rev) + returnTime := time.Since(c.baseTime) + c.kvOperations.AppendCompact(rev, callTime, returnTime, resp, err) return resp, err } + func (c *RecordingClient) MemberList(ctx context.Context, opts ...clientv3.OpOption) (*clientv3.MemberListResponse, error) { c.kvMux.Lock() defer c.kvMux.Unlock() diff --git a/tests/robustness/failpoint/failpoint.go b/tests/robustness/failpoint/failpoint.go index 14e6ddf7e940..3c1d2b1bf979 100644 --- a/tests/robustness/failpoint/failpoint.go +++ b/tests/robustness/failpoint/failpoint.go @@ -39,9 +39,11 @@ var ( KillFailpoint, BeforeCommitPanic, AfterCommitPanic, RaftBeforeSavePanic, RaftAfterSavePanic, DefragBeforeCopyPanic, DefragBeforeRenamePanic, BackendBeforePreCommitHookPanic, BackendAfterPreCommitHookPanic, BackendBeforeStartDBTxnPanic, BackendAfterStartDBTxnPanic, BackendBeforeWritebackBufPanic, - BackendAfterWritebackBufPanic, CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic, + BackendAfterWritebackBufPanic, + CompactBeforeCommitScheduledCompactPanic, CompactAfterCommitScheduledCompactPanic, CompactBeforeSetFinishedCompactPanic, CompactAfterSetFinishedCompactPanic, CompactBeforeCommitBatchPanic, - CompactAfterCommitBatchPanic, RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork, + CompactAfterCommitBatchPanic, + RaftBeforeLeaderSendPanic, BlackholePeerNetwork, DelayPeerNetwork, RaftBeforeFollowerSendPanic, RaftBeforeApplySnapPanic, RaftAfterApplySnapPanic, RaftAfterWALReleasePanic, RaftBeforeSaveSnapPanic, RaftAfterSaveSnapPanic, BlackholeUntilSnapshot, BeforeApplyOneConfChangeSleep, diff --git a/tests/robustness/model/describe.go b/tests/robustness/model/describe.go index 42c23ddd53ec..f6694eb51a49 100644 --- a/tests/robustness/model/describe.go +++ b/tests/robustness/model/describe.go @@ -25,6 +25,9 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin if response.Error != "" { return fmt.Sprintf("err: %q", response.Error) } + if response.ClientError != "" { + return fmt.Sprintf("err: %q", response.ClientError) + } if response.PartialResponse { return fmt.Sprintf("unknown, rev: %d", response.Revision) } @@ -33,7 +36,7 @@ func describeEtcdResponse(request EtcdRequest, response MaybeEtcdResponse) strin return fmt.Sprintf("%s, rev: %d", describeRangeResponse(request.Range.RangeOptions, *response.Range), response.Revision) case Txn: return fmt.Sprintf("%s, rev: %d", describeTxnResponse(request.Txn, response.Txn), response.Revision) - case LeaseGrant, LeaseRevoke, Defragment: + case LeaseGrant, LeaseRevoke, Defragment, Compact: if response.Revision == 0 { return "ok" } @@ -67,6 +70,8 @@ func describeEtcdRequest(request EtcdRequest) string { return fmt.Sprintf("leaseRevoke(%d)", request.LeaseRevoke.LeaseID) case Defragment: return fmt.Sprintf("defragment()") + case Compact: + return fmt.Sprintf("compact(%d)", request.Compact.Revision) default: return fmt.Sprintf("", request.Type) } diff --git a/tests/robustness/model/deterministic.go b/tests/robustness/model/deterministic.go index f59bb91ab541..fdfa095e5276 100644 --- a/tests/robustness/model/deterministic.go +++ b/tests/robustness/model/deterministic.go @@ -23,6 +23,8 @@ import ( "sort" "github.com/anishathalye/porcupine" + + "go.etcd.io/etcd/server/v3/storage/mvcc" ) // DeterministicModel assumes a deterministic execution of etcd requests. All @@ -64,10 +66,11 @@ var DeterministicModel = porcupine.Model{ } type EtcdState struct { - Revision int64 - KeyValues map[string]ValueRevision - KeyLeases map[string]int64 - Leases map[int64]EtcdLease + Revision int64 + CompactRevision int64 + KeyValues map[string]ValueRevision + KeyLeases map[string]int64 + Leases map[int64]EtcdLease } func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, EtcdState) { @@ -77,10 +80,12 @@ func (s EtcdState) apply(request EtcdRequest, response EtcdResponse) (bool, Etcd func freshEtcdState() EtcdState { return EtcdState{ - Revision: 1, - KeyValues: map[string]ValueRevision{}, - KeyLeases: map[string]int64{}, - Leases: map[int64]EtcdLease{}, + Revision: 1, + // Start from CompactRevision equal -1 as etcd allows client to compact revision 0 for some reason. + CompactRevision: -1, + KeyValues: map[string]ValueRevision{}, + KeyLeases: map[string]int64{}, + Leases: map[int64]EtcdLease{}, } } @@ -100,6 +105,9 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { if request.Range.Revision > s.Revision { return s, MaybeEtcdResponse{Error: ErrEtcdFutureRev.Error()} } + if request.Range.Revision < s.CompactRevision { + return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}} + } return s, MaybeEtcdResponse{PartialResponse: true, EtcdResponse: EtcdResponse{Revision: s.Revision}} case Txn: failure := false @@ -178,6 +186,12 @@ func (s EtcdState) Step(request EtcdRequest) (EtcdState, MaybeEtcdResponse) { return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Revision: s.Revision, LeaseRevoke: &LeaseRevokeResponse{}}} case Defragment: return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: s.Revision}} + case Compact: + if request.Compact.Revision <= s.CompactRevision { + return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}} + } + s.CompactRevision = request.Compact.Revision + return s, MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: s.Revision}} default: panic(fmt.Sprintf("Unknown request type: %v", request.Type)) } @@ -237,6 +251,7 @@ const ( LeaseGrant RequestType = "leaseGrant" LeaseRevoke RequestType = "leaseRevoke" Defragment RequestType = "defragment" + Compact RequestType = "compact" ) type EtcdRequest struct { @@ -246,6 +261,7 @@ type EtcdRequest struct { Range *RangeRequest Txn *TxnRequest Defragment *DefragmentRequest + Compact *CompactRequest } func (r *EtcdRequest) IsRead() bool { @@ -337,6 +353,8 @@ type EtcdResponse struct { LeaseGrant *LeaseGrantReponse LeaseRevoke *LeaseRevokeResponse Defragment *DefragmentResponse + Compact *CompactResponse + ClientError string Revision int64 } @@ -398,3 +416,10 @@ func ToValueOrHash(value string) ValueOrHash { } return v } + +type CompactResponse struct { +} + +type CompactRequest struct { + Revision int64 +} diff --git a/tests/robustness/model/history.go b/tests/robustness/model/history.go index edf2c778f051..a11e1b8e1eff 100644 --- a/tests/robustness/model/history.go +++ b/tests/robustness/model/history.go @@ -16,6 +16,7 @@ package model import ( "fmt" + "strings" "time" "github.com/anishathalye/porcupine" @@ -23,6 +24,7 @@ import ( "go.etcd.io/etcd/api/v3/etcdserverpb" "go.etcd.io/etcd/api/v3/mvccpb" clientv3 "go.etcd.io/etcd/client/v3" + "go.etcd.io/etcd/server/v3/storage/mvcc" "go.etcd.io/etcd/tests/v3/robustness/identity" ) @@ -259,6 +261,25 @@ func (h *AppendableHistory) AppendDefragment(start, end time.Duration, resp *cli h.appendSuccessful(request, start, end, defragmentResponse(revision)) } +func (h *AppendableHistory) AppendCompact(rev int64, start, end time.Duration, resp *clientv3.CompactResponse, err error) { + request := compactRequest(rev) + if err != nil { + if strings.Contains(err.Error(), mvcc.ErrCompacted.Error()) { + h.appendSuccessful(request, start, end, MaybeEtcdResponse{ + EtcdResponse: EtcdResponse{ClientError: mvcc.ErrCompacted.Error()}, + }) + return + } + h.appendFailed(request, start, end, err) + return + } + var revision int64 + if resp != nil && resp.Header != nil { + revision = resp.Header.Revision + } + h.appendSuccessful(request, start, end, compactResponse(revision)) +} + func (h *AppendableHistory) appendFailed(request EtcdRequest, start, end time.Duration, err error) { op := porcupine.Operation{ ClientId: h.streamID, @@ -444,6 +465,14 @@ func defragmentResponse(revision int64) MaybeEtcdResponse { return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Defragment: &DefragmentResponse{}, Revision: revision}} } +func compactRequest(rev int64) EtcdRequest { + return EtcdRequest{Type: Compact, Compact: &CompactRequest{Revision: rev}} +} + +func compactResponse(revision int64) MaybeEtcdResponse { + return MaybeEtcdResponse{EtcdResponse: EtcdResponse{Compact: &CompactResponse{}, Revision: revision}} +} + type History struct { operations []porcupine.Operation } diff --git a/tests/robustness/report/wal.go b/tests/robustness/report/wal.go index c1d8e5decbee..ea0c0f187137 100644 --- a/tests/robustness/report/wal.go +++ b/tests/robustness/report/wal.go @@ -184,7 +184,11 @@ func parseEntryNormal(ent raftpb.Entry) (*model.EtcdRequest, error) { case raftReq.ClusterVersionSet != nil: return nil, nil case raftReq.Compaction != nil: - return nil, nil + request := model.EtcdRequest{ + Type: model.Compact, + Compact: &model.CompactRequest{Revision: raftReq.Compaction.Revision}, + } + return &request, nil case raftReq.Txn != nil: txn := model.TxnRequest{ Conditions: []model.EtcdCondition{}, diff --git a/tests/robustness/traffic/etcd.go b/tests/robustness/traffic/etcd.go index 4e80c633d571..d72ead4a2856 100644 --- a/tests/robustness/traffic/etcd.go +++ b/tests/robustness/traffic/etcd.go @@ -38,13 +38,14 @@ var ( {choice: List, weight: 15}, {choice: StaleGet, weight: 10}, {choice: StaleList, weight: 10}, - {choice: Put, weight: 23}, - {choice: LargePut, weight: 2}, {choice: Delete, weight: 5}, {choice: MultiOpTxn, weight: 5}, {choice: PutWithLease, weight: 5}, {choice: LeaseRevoke, weight: 5}, {choice: CompareAndSet, weight: 5}, + {choice: Put, weight: 22}, + {choice: LargePut, weight: 2}, + {choice: Compact, weight: 1}, }, } EtcdPut = etcdTraffic{ @@ -56,9 +57,10 @@ var ( {choice: List, weight: 15}, {choice: StaleGet, weight: 10}, {choice: StaleList, weight: 10}, - {choice: Put, weight: 40}, {choice: MultiOpTxn, weight: 5}, {choice: LargePut, weight: 5}, + {choice: Put, weight: 39}, + {choice: Compact, weight: 1}, }, } ) @@ -89,6 +91,7 @@ const ( LeaseRevoke etcdRequestType = "leaseRevoke" CompareAndSet etcdRequestType = "compareAndSet" Defragment etcdRequestType = "defragment" + Compact etcdRequestType = "compact" ) func (t etcdTraffic) Name() string { @@ -264,6 +267,12 @@ func (c etcdTrafficClient) Request(ctx context.Context, request etcdRequestType, if resp != nil { rev = resp.Header.Revision } + case Compact: + var resp *clientv3.CompactResponse + resp, err = c.client.Compact(opCtx, lastRev) + if resp != nil { + rev = resp.Header.Revision + } default: panic("invalid choice") } diff --git a/tests/robustness/traffic/kubernetes.go b/tests/robustness/traffic/kubernetes.go index 7ba278a90c9f..ede4720724ff 100644 --- a/tests/robustness/traffic/kubernetes.go +++ b/tests/robustness/traffic/kubernetes.go @@ -37,9 +37,10 @@ var ( resource: "pods", namespace: "default", writeChoices: []choiceWeight[KubernetesRequestType]{ - {choice: KubernetesUpdate, weight: 90}, + {choice: KubernetesUpdate, weight: 89}, {choice: KubernetesDelete, weight: 5}, {choice: KubernetesCreate, weight: 5}, + {choice: KubernetesCompact, weight: 5}, }, } ) @@ -167,6 +168,8 @@ func (t kubernetesTraffic) Write(ctx context.Context, kc *kubernetesClient, ids _, err = kc.OptimisticUpdate(writeCtx, key, fmt.Sprintf("%d", ids.NewRequestID()), rev) case KubernetesCreate: err = kc.OptimisticCreate(writeCtx, t.generateKey(), fmt.Sprintf("%d", ids.NewRequestID())) + case KubernetesCompact: + err = kc.Compact(writeCtx, rev) default: panic(fmt.Sprintf("invalid choice: %q", op)) } @@ -209,9 +212,10 @@ func (t kubernetesTraffic) generateKey() string { type KubernetesRequestType string const ( - KubernetesDelete KubernetesRequestType = "delete" - KubernetesUpdate KubernetesRequestType = "update" - KubernetesCreate KubernetesRequestType = "create" + KubernetesDelete KubernetesRequestType = "delete" + KubernetesUpdate KubernetesRequestType = "update" + KubernetesCreate KubernetesRequestType = "create" + KubernetesCompact KubernetesRequestType = "compact" ) type kubernetesClient struct { @@ -250,6 +254,11 @@ func (k kubernetesClient) RequestProgress(ctx context.Context) error { return k.client.RequestProgress(clientv3.WithRequireLeader(ctx)) } +func (k kubernetesClient) Compact(ctx context.Context, rev int64) error { + _, err := k.client.Compact(ctx, rev) + return err +} + // Kubernetes optimistically assumes that key didn't change since it was last observed, so it executes operations within a transaction conditioned on key not changing. // However, if the keys value changed it wants imminently to read it, thus the Get operation on failure. func (k kubernetesClient) optimisticOperationOrGet(ctx context.Context, key string, operation clientv3.Op, expectedRevision int64) (*mvccpb.KeyValue, error) { diff --git a/tests/robustness/validate/patch_history.go b/tests/robustness/validate/patch_history.go index 44185de99eb9..81754bcbfda1 100644 --- a/tests/robustness/validate/patch_history.go +++ b/tests/robustness/validate/patch_history.go @@ -192,6 +192,7 @@ func persistedOperationsReturnTime(allOperations []porcupine.Operation, persiste } } case model.LeaseGrant: + case model.Compact: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } @@ -216,6 +217,7 @@ func operationReturnTime(operations []porcupine.Operation) map[model.EtcdOperati } case model.Range: case model.LeaseGrant: + case model.Compact: default: panic(fmt.Sprintf("Unknown request type: %q", request.Type)) } diff --git a/tests/robustness/validate/validate_test.go b/tests/robustness/validate/validate_test.go index 1ebf90750d18..d752b8bb2576 100644 --- a/tests/robustness/validate/validate_test.go +++ b/tests/robustness/validate/validate_test.go @@ -49,11 +49,9 @@ func TestDataReports(t *testing.T) { } visualize := ValidateAndReturnVisualize(t, zaptest.NewLogger(t), Config{}, reports, persistedRequests, 5*time.Minute) - if t.Failed() { - err := visualize(filepath.Join(path, "history.html")) - if err != nil { - t.Fatal(err) - } + err = visualize(filepath.Join(path, "history.html")) + if err != nil { + t.Fatal(err) } }) } diff --git a/tests/robustness/watch.go b/tests/robustness/watch.go index f19fced73740..45869446fbda 100644 --- a/tests/robustness/watch.go +++ b/tests/robustness/watch.go @@ -67,45 +67,59 @@ type watchConfig struct { // watchUntilRevision watches all changes until context is cancelled, it has observed revision provided via maxRevisionChan or maxRevisionChan was closed. func watchUntilRevision(ctx context.Context, t *testing.T, c *client.RecordingClient, maxRevisionChan <-chan int64, cfg watchConfig) { var maxRevision int64 - var lastRevision int64 + var lastRevision int64 = 1 ctx, cancel := context.WithCancel(ctx) defer cancel() - watch := c.Watch(ctx, "", 1, true, true, false) +resetWatch: for { - select { - case <-ctx.Done(): - if maxRevision == 0 { - t.Errorf("Client didn't collect all events, max revision not set") - } - if lastRevision < maxRevision { - t.Errorf("Client didn't collect all events, revision got %d, expected: %d", lastRevision, maxRevision) - } - return - case revision, ok := <-maxRevisionChan: - if ok { - maxRevision = revision - if lastRevision >= maxRevision { - cancel() - } - } else { - // Only cancel if maxRevision was never set. + watch := c.Watch(ctx, "", lastRevision+1, true, true, false) + for { + select { + case <-ctx.Done(): if maxRevision == 0 { + t.Errorf("Client didn't collect all events, max revision not set") + } + if lastRevision < maxRevision { + t.Errorf("Client didn't collect all events, revision got %d, expected: %d", lastRevision, maxRevision) + } + return + case revision, ok := <-maxRevisionChan: + if ok { + maxRevision = revision + if lastRevision >= maxRevision { + cancel() + } + } else { + // Only cancel if maxRevision was never set. + if maxRevision == 0 { + cancel() + } + } + case resp, ok := <-watch: + if !ok { + t.Logf("Watch channel closed") + continue resetWatch + } + if cfg.requestProgress { + c.RequestProgress(ctx) + } + + if resp.Err() != nil { + if resp.Canceled { + if resp.CompactRevision > lastRevision { + lastRevision = resp.CompactRevision + } + continue resetWatch + } + t.Errorf("Watch stream received error, err %v", resp.Err()) + } + if len(resp.Events) > 0 { + lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision + } + if maxRevision != 0 && lastRevision >= maxRevision { cancel() } } - case resp := <-watch: - if cfg.requestProgress { - c.RequestProgress(ctx) - } - if resp.Err() != nil && !resp.Canceled { - t.Errorf("Watch stream received error, err %v", resp.Err()) - } - if len(resp.Events) > 0 { - lastRevision = resp.Events[len(resp.Events)-1].Kv.ModRevision - } - if maxRevision != 0 && lastRevision >= maxRevision { - cancel() - } } } }