Skip to content

Commit

Permalink
CBG-4432: Implement Delete on the BlipTesterClient and enable delete-…
Browse files Browse the repository at this point in the history
…related topology tests (#7251)

* Implement Delete on the BlipTesterClient and enable delete-related topology tests

* Skip flaky TestMultiActorConflictDelete

* Enforce non-nil parentVersion in BlipTesterCollectionClient Delete()

* Use sequence number in HLV for BlipTesterClient instead of UnixNano because of poor Windows time resolution

* Put back to UnixNano and add TODO

* Skip on Windows for flaky CV comparison
  • Loading branch information
bbrks authored Dec 21, 2024
1 parent f62a7e6 commit 61b0628
Show file tree
Hide file tree
Showing 11 changed files with 93 additions and 120 deletions.
6 changes: 3 additions & 3 deletions rest/blip_api_attachment_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -379,7 +379,7 @@ func TestBlipPushPullNewAttachmentNoCommonAncestor(t *testing.T) {
bodyText := `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"data":"aGVsbG8gd29ybGQ="}}}`
rev := NewDocVersionFromFakeRev("2-abc")
// FIXME CBG-4400: docID: doc1 was not found on the client - expecting to update doc based on parentVersion RevID: 2-abc
err := btcRunner.StoreRevOnClient(btc.id, docID, &rev, []byte(bodyText))
_, err := btcRunner.AddRev(btc.id, docID, &rev, []byte(bodyText))
require.NoError(t, err)

bodyText = `{"greetings":[{"hi":"alice"}],"_attachments":{"hello.txt":{"revpos":2,"length":11,"stub":true,"digest":"sha1-Kq5sNclPz7QV2+lfQIuc6R7oRu0="}}}`
Expand Down Expand Up @@ -608,7 +608,7 @@ func TestBlipLegacyAttachNameChange(t *testing.T) {
docVersion := client1.GetDocVersion(docID)

// Store the document and attachment on the test client
err := btcRunner.StoreRevOnClient(client1.id, docID, &docVersion, rawDoc)
_, err := btcRunner.AddRev(client1.id, docID, &docVersion, rawDoc)
// FIXME CBG-4400: docID: doc was not found on the client - expecting to update doc based on parentVersion RevID: 1-5fc93bd36377008f96fdae2719c174ed
require.NoError(t, err)

Expand Down Expand Up @@ -672,7 +672,7 @@ func TestBlipLegacyAttachDocUpdate(t *testing.T) {

// Store the document and attachment on the test client
// FIXME CBG-4400: docID: doc was not found on the client - expecting to update doc based on parentVersion RevID: 1-5fc93bd36377008f96fdae2719c174ed
err := btcRunner.StoreRevOnClient(client1.id, docID, &version, rawDoc)
_, err := btcRunner.AddRev(client1.id, docID, &version, rawDoc)
require.NoError(t, err)
btcRunner.AttachmentsLock(client1.id).Lock()
btcRunner.Attachments(client1.id)[digest] = attBody
Expand Down
64 changes: 53 additions & 11 deletions rest/utilities_testing_blip_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,6 +314,32 @@ func (btcc *BlipTesterCollectionClient) GetDoc(docID string) ([]byte, *DocVersio
return latestRev.body, &latestRev.version
}

// IsTombstoned returns true if the latest version of the doc is a tombstone.
func (btcc *BlipTesterCollectionClient) IsTombstoned(docID string) (bool, error) {
doc, exists := btcc.getClientDoc(docID)
if !exists {
return false, base.ErrNotFound
}
rev, err := doc.latestRev()
if err != nil {
return false, err
}
return rev.isDelete, nil
}

// IsVersionTombstone returns true if the given version is found and is a tombstone.
func (btcc *BlipTesterCollectionClient) IsVersionTombstone(docID string, version DocVersion) (bool, error) {
doc, exists := btcc.getClientDoc(docID)
if !exists {
return false, base.ErrNotFound
}
rev, err := doc.getRev(version)
if err != nil {
return false, err
}
return rev.isDelete, nil
}

// getClientDoc returns the clientDoc for the given docID, if it exists.
func (btcc *BlipTesterCollectionClient) getClientDoc(docID string) (*clientDoc, bool) {
btcc.seqLock.RLock()
Expand Down Expand Up @@ -1175,6 +1201,7 @@ type proposeChangeBatchEntry struct {
revTreeIDHistory []string
hlvHistory db.HybridLogicalVector
latestServerVersion DocVersion
isDelete bool
}

func (e proposeChangeBatchEntry) historyStr() string {
Expand All @@ -1200,7 +1227,7 @@ func proposeChangesEntryForDoc(doc *clientDoc) proposeChangeBatchEntry {
}
revisionHistory = append(revisionHistory, doc._revisionsBySeq[seq].version.RevTreeID)
}
return proposeChangeBatchEntry{docID: doc.id, version: latestRev.version, revTreeIDHistory: revisionHistory, hlvHistory: latestRev.HLV, latestServerVersion: doc._latestServerVersion}
return proposeChangeBatchEntry{docID: doc.id, version: latestRev.version, revTreeIDHistory: revisionHistory, hlvHistory: latestRev.HLV, latestServerVersion: doc._latestServerVersion, isDelete: latestRev.isDelete}
}

// StartPull will begin a push replication with the given options between the client and server
Expand Down Expand Up @@ -1318,6 +1345,12 @@ func (btcc *BlipTesterCollectionClient) StartPushWithOpts(opts BlipTesterPushOpt
docBody := doc._revisionsBySeq[doc._seqsByVersions[change.version]].body
doc.lock.RUnlock()

if change.isDelete {
revRequest.Properties[db.RevMessageDeleted] = "1"
// SG doesn't like nil bodies - transform the tombstone into an empty body
docBody = []byte(base.EmptyDocument)
}

if serverDeltas && btcc.parent.ClientDeltas && ok && !serverRev.isDelete {
base.DebugfCtx(ctx, base.KeySGTest, "specifying last known server version as deltaSrc for doc %s = %v", change.docID, change.latestServerVersion)
var deltaSrc string
Expand Down Expand Up @@ -1511,6 +1544,7 @@ func (btr *BlipTesterReplicator) sendMsg(msg *blip.Message) (err error) {
}

// upsertDoc will create or update the doc based on whether parentVersion is passed or not. Enforces MVCC update.
// body can be nil and the update will be treated as a tombstone/delete.
func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *DocVersion, body []byte) (*clientDocRev, error) {
btc.seqLock.Lock()
defer btc.seqLock.Unlock()
Expand Down Expand Up @@ -1561,6 +1595,7 @@ func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *Do

var docVersion DocVersion
if btc.UseHLV() {
// TODO: CBG-4440 Construct a HLC for Value - UnixNano is not accurate enough on Windows to generate unique values, and seq is not comparable across clients.
newVersion := db.Version{SourceID: fmt.Sprintf("btc-%d", btc.parent.id), Value: uint64(time.Now().UnixNano())}
if err := hlv.AddVersion(newVersion); err != nil {
return nil, err
Expand All @@ -1574,7 +1609,7 @@ func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *Do

btc._seqLast++
newSeq := btc._seqLast
rev := clientDocRev{clientSeq: newSeq, version: docVersion, body: body, HLV: hlv}
rev := clientDocRev{clientSeq: newSeq, version: docVersion, body: body, HLV: hlv, isDelete: body == nil}
doc.addNewRev(rev)

btc._seqStore[newSeq] = doc
Expand All @@ -1587,6 +1622,18 @@ func (btc *BlipTesterCollectionClient) upsertDoc(docID string, parentVersion *Do
return &rev, nil
}

// Delete creates a tombstone for the document.
func (btc *BlipTesterCollectionClient) Delete(docID string, parentVersion *DocVersion) (DocVersion, error) {
if parentVersion == nil {
return DocVersion{}, fmt.Errorf("parentVersion must be provided for delete operation")
}
newRev, err := btc.upsertDoc(docID, parentVersion, nil)
if err != nil {
return DocVersion{}, err
}
return newRev.version, nil
}

// AddRev creates a revision on the client.
// The rev ID is always: "N-abc", where N is rev generation for predictability.
func (btc *BlipTesterCollectionClient) AddRev(docID string, parentVersion *DocVersion, body []byte) (DocVersion, error) { // Inline attachment processing
Expand Down Expand Up @@ -1743,11 +1790,6 @@ func (btc *BlipTesterCollectionClient) PushRevWithHistory(docID string, parentVe
return &newRev.version, nil
}

func (btc *BlipTesterCollectionClient) StoreRevOnClient(docID string, parentVersion *DocVersion, body []byte) error {
_, err := btc.upsertDoc(docID, parentVersion, body)
return err
}

func (btc *BlipTesterCollectionClient) ProcessInlineAttachments(inputBody []byte, revGen int) (outputBody []byte, err error) {
if bytes.Contains(inputBody, []byte(db.BodyAttachments)) {
var newDocJSON map[string]interface{}
Expand Down Expand Up @@ -1999,6 +2041,10 @@ func (btcRunner *BlipTestClientRunner) AddRev(clientID uint32, docID string, ver
return btcRunner.SingleCollection(clientID).AddRev(docID, version, body)
}

func (btcRunner *BlipTestClientRunner) Delete(clientID uint32, docID string, version *DocVersion) (DocVersion, error) {
return btcRunner.SingleCollection(clientID).Delete(docID, version)
}

func (btcRunner *BlipTestClientRunner) PushUnsolicitedRev(clientID uint32, docID string, parentVersion *DocVersion, body []byte) (*DocVersion, error) {
return btcRunner.SingleCollection(clientID).PushUnsolicitedRev(docID, parentVersion, body)
}
Expand All @@ -2015,10 +2061,6 @@ func (btcRunner *BlipTestClientRunner) saveAttachment(clientID uint32, contentTy
return btcRunner.SingleCollection(clientID).saveAttachment(contentType, attachmentData)
}

func (btcRunner *BlipTestClientRunner) StoreRevOnClient(clientID uint32, docID string, parentVersion *DocVersion, body []byte) error {
return btcRunner.SingleCollection(clientID).StoreRevOnClient(docID, parentVersion, body)
}

func (btcRunner *BlipTestClientRunner) PushRevWithHistory(clientID uint32, docID string, parentVersion *DocVersion, body []byte, revCount, prunedRevCount int) (*DocVersion, error) {
return btcRunner.SingleCollection(clientID).PushRevWithHistory(docID, parentVersion, body, revCount, prunedRevCount)
}
Expand Down
35 changes: 19 additions & 16 deletions topologytest/couchbase_lite_mock_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -113,8 +113,17 @@ func (p *CouchbaseLiteMockPeer) WriteDocument(dsName sgbucket.DataStoreName, doc
}

// DeleteDocument deletes a document on the peer. The test will fail if the document does not exist.
func (p *CouchbaseLiteMockPeer) DeleteDocument(sgbucket.DataStoreName, string) DocMetadata {
return DocMetadata{}
func (p *CouchbaseLiteMockPeer) DeleteDocument(dsName sgbucket.DataStoreName, docID string) DocMetadata {
p.TB().Logf("%s: Deleting document %s", p, docID)
client := p.getSingleSGBlipClient().CollectionClient(dsName)
_, parentMeta := p.getLatestDocVersion(dsName, docID)
parentVersion := rest.EmptyDocVersion()
if parentMeta != nil {
parentVersion = &db.DocVersion{CV: parentMeta.CV(p.TB())}
}
docVersion, err := client.Delete(docID, parentVersion)
require.NoError(p.TB(), err)
return DocMetadataFromDocVersion(p.TB(), docID, docVersion)
}

// WaitForDocVersion waits for a document to reach a specific version. The test will fail if the document does not reach the expected version in 20s.
Expand All @@ -134,21 +143,15 @@ func (p *CouchbaseLiteMockPeer) WaitForDocVersion(dsName sgbucket.DataStoreName,
return body
}

// WaitForDeletion waits for a document to be deleted. This document must be a tombstone. The test will fail if the document still exists after 20s.
func (p *CouchbaseLiteMockPeer) WaitForDeletion(_ sgbucket.DataStoreName, _ string) {
require.Fail(p.TB(), "WaitForDeletion not yet implemented CBG-4257")
}

// WaitForTombstoneVersion waits for a document to reach a specific version, this must be a tombstone. The test will fail if the document does not reach the expected version in 20s.
func (p *CouchbaseLiteMockPeer) WaitForTombstoneVersion(_ sgbucket.DataStoreName, _ string, _ DocMetadata) {
require.Fail(p.TB(), "WaitForTombstoneVersion not yet implemented CBG-4257")
}

// RequireDocNotFound asserts that a document does not exist on the peer.
func (p *CouchbaseLiteMockPeer) RequireDocNotFound(sgbucket.DataStoreName, string) {
// not implemented yet in blip client tester
// _, err := p.btcRunner.GetDoc(p.btc.id, docID)
// base.RequireDocNotFoundError(p.btcRunner.TB(), err)
func (p *CouchbaseLiteMockPeer) WaitForTombstoneVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata) {
client := p.getSingleSGBlipClient().CollectionClient(dsName)
expectedVersion := db.DocVersion{CV: expected.CV(p.TB())}
require.EventuallyWithT(p.TB(), func(c *assert.CollectT) {
isTombstone, err := client.IsVersionTombstone(docID, expectedVersion)
require.NoError(c, err)
require.True(c, isTombstone, "expected docID %s on peer %s to be deleted", docID, p)
}, totalWaitTime, pollInterval)
}

// Close will shut down the peer and close any active replications on the peer.
Expand Down
14 changes: 0 additions & 14 deletions topologytest/couchbase_server_peer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -163,14 +163,6 @@ func (p *CouchbaseServerPeer) WaitForDocVersion(dsName sgbucket.DataStoreName, d
return body
}

// WaitForDeletion waits for a document to be deleted. This document must be a tombstone. The test will fail if the document still exists after 20s.
func (p *CouchbaseServerPeer) WaitForDeletion(dsName sgbucket.DataStoreName, docID string) {
require.EventuallyWithT(p.tb, func(c *assert.CollectT) {
_, err := p.getCollection(dsName).Get(docID, nil)
assert.True(c, base.IsDocNotFoundError(err), "expected docID %s to be deleted from peer %s, found err=%v", docID, p.name, err)
}, totalWaitTime, pollInterval)
}

// WaitForTombstoneVersion waits for a document to reach a specific version, this must be a tombstone. The test will fail if the document does not reach the expected version in 20s.
func (p *CouchbaseServerPeer) WaitForTombstoneVersion(dsName sgbucket.DataStoreName, docID string, expected DocMetadata) {
docBytes := p.waitForDocVersion(dsName, docID, expected)
Expand Down Expand Up @@ -198,12 +190,6 @@ func (p *CouchbaseServerPeer) waitForDocVersion(dsName sgbucket.DataStoreName, d
return docBytes
}

// RequireDocNotFound asserts that a document does not exist on the peer.
func (p *CouchbaseServerPeer) RequireDocNotFound(dsName sgbucket.DataStoreName, docID string) {
_, err := p.getCollection(dsName).Get(docID, nil)
base.RequireDocNotFoundError(p.tb, err)
}

// Close will shut down the peer and close any active replications on the peer.
func (p *CouchbaseServerPeer) Close() {
for _, r := range p.pullReplications {
Expand Down
12 changes: 2 additions & 10 deletions topologytest/hlv_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,11 +59,7 @@ func waitForVersionAndBody(t *testing.T, dsName base.ScopeAndCollectionName, pee
}

func waitForTombstoneVersion(t *testing.T, dsName base.ScopeAndCollectionName, peers Peers, docID string, expectedVersion BodyAndVersion) {
for peerName, peer := range peers.SortedPeers() {
if peer.Type() == PeerTypeCouchbaseLite {
t.Logf("skipping deletion check for Couchbase Lite peer %s, CBG-4432", peerName)
continue
}
for _, peer := range peers.SortedPeers() {
t.Logf("waiting for tombstone version %#v on %s, written from %s", expectedVersion, peer, expectedVersion.updatePeer)
peer.WaitForTombstoneVersion(dsName, docID, expectedVersion.docMeta)
}
Expand Down Expand Up @@ -138,11 +134,7 @@ func deleteConflictDocs(t *testing.T, dsName base.ScopeAndCollectionName, peers
continue
}
deleteVersion := peer.DeleteDocument(dsName, docID)
if peer.Type() == PeerTypeCouchbaseLite {
t.Logf("Don't include deleteVersion from Couchbase Lite peers when determining lastWrite %s, CBG-4432", peerName)
continue
}
t.Logf("deleteVersion on peer %s: %+v", peerName, deleteVersion)
t.Logf("deleteVersion: %+v", deleteVersion)
documentVersion = append(documentVersion, BodyAndVersion{docMeta: deleteVersion, updatePeer: peerName})
}
index := len(documentVersion) - 1
Expand Down
4 changes: 1 addition & 3 deletions topologytest/multi_actor_conflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -83,9 +83,7 @@ func TestMultiActorConflictUpdate(t *testing.T) {
// 6. start replications
// 7. assert that the documents are deleted on all peers and have hlv sources equal to the number of active peers
func TestMultiActorConflictDelete(t *testing.T) {
if !base.UnitTestUrlIsWalrus() {
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
}
t.Skip("Flakey failures on multi actor conflicting writes, CBG-4379")
for _, topology := range append(simpleTopologies, Topologies...) {
t.Run(topology.description, func(t *testing.T) {
collectionName, peers, replications := setupTests(t, topology)
Expand Down
9 changes: 0 additions & 9 deletions topologytest/multi_actor_no_conflict_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,11 +56,6 @@ func TestMultiActorDelete(t *testing.T) {

for createPeerName, createPeer := range peers.ActivePeers() {
for deletePeerName, deletePeer := range peers {
// CBG-4432: implement delete document in blip tester
if deletePeer.Type() == PeerTypeCouchbaseLite {
continue
}

docID := getDocID(t) + "_create=" + createPeerName + ",update=" + deletePeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "deletePeer": "%s", "topology": "%s", "action": "create"}`, createPeerName, createPeerName, deletePeer, topology.description))
createVersion := createPeer.CreateDocument(collectionName, docID, body1)
Expand Down Expand Up @@ -93,10 +88,6 @@ func TestMultiActorResurrect(t *testing.T) {

for createPeerName, createPeer := range peers.ActivePeers() {
for deletePeerName, deletePeer := range peers {
// CBG-4432: implement delete document in blip tester
if deletePeer.Type() == PeerTypeCouchbaseLite {
continue
}
for resurrectPeerName, resurrectPeer := range peers {
docID := getDocID(t) + "_create=" + createPeerName + ",delete=" + deletePeerName + ",resurrect=" + resurrectPeerName
body1 := []byte(fmt.Sprintf(`{"activePeer": "%s", "createPeer": "%s", "deletePeer": "%s", "resurrectPeer": "%s", "topology": "%s", "action": "create"}`, createPeerName, createPeerName, deletePeer, resurrectPeer, topology.description))
Expand Down
Loading

0 comments on commit 61b0628

Please sign in to comment.