Skip to content

Commit

Permalink
wip
Browse files Browse the repository at this point in the history
  • Loading branch information
bbrks committed Jan 19, 2024
1 parent b685e80 commit 056fecb
Show file tree
Hide file tree
Showing 4 changed files with 76 additions and 35 deletions.
4 changes: 4 additions & 0 deletions db/blip_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -995,6 +995,10 @@ func (bh *blipHandler) processRev(rq *blip.Message, stats *processRevStats) (err
// include current rev property in history
history = append(history, rev)
}

// TODO: Extract/parse HLV from history for MV and PV
// CV kept in rev property on its own

var versionVectorStr string
if historyStr := rq.Properties[RevMessageHistory]; historyStr != "" && bh.activeCBMobileSubprotocol < CBMobileReplicationV4 {
history = append(history, strings.Split(historyStr, ",")...)
Expand Down
10 changes: 5 additions & 5 deletions db/crud.go
Original file line number Diff line number Diff line change
Expand Up @@ -2768,16 +2768,16 @@ func (db *DatabaseCollectionWithUser) CheckProposedRev(ctx context.Context, doci

// CheckProposedVersion Given DocID and a version vector in string form (as seen over Blip), check whether
// it can be added without conflict.
func (db *DatabaseCollectionWithUser) CheckProposedVersion(ctx context.Context, docid, versionVector string) (status ProposedRevStatus, currentVersion string) {
func (db *DatabaseCollectionWithUser) CheckProposedVersion(ctx context.Context, docid, proposedVersionStr string) (status ProposedRevStatus, currentVersion string) {
if strings.HasPrefix(docid, "_design/") && db.user != nil {
return ProposedRev_OK, "" // Users can't upload design docs, so ignore them
}

incomingHLV, err := extractHLVFromBlipMessage(versionVector)
proposedVersion, err := CreateVersionFromString(proposedVersionStr)
if err != nil {
base.WarnfCtx(ctx, "Couldn't parse proposed version for doc %q / %q: %v", base.UD(docid), proposedVersionStr, err)
return ProposedRev_Error, ""
}
incomingDocCV := Version{SourceID: incomingHLV.SourceID, Value: incomingHLV.Version}

localDocCV := Version{}
doc, err := db.GetDocSyncDataNoImport(ctx, docid, DocUnmarshalNoHistory)
Expand All @@ -2790,10 +2790,10 @@ func (db *DatabaseCollectionWithUser) CheckProposedVersion(ctx context.Context,
return ProposedRev_Error, ""
}
return ProposedRev_OK_IsNew, ""
} else if localDocCV == incomingDocCV {
} else if localDocCV == proposedVersion {

return ProposedRev_Exists, ""
} else if doc.HLV.IsInConflict(incomingHLV) {
} else if doc.HLV.IsVersionInConflict(proposedVersion) {
return ProposedRev_Conflict, localDocCV.String()
}

Expand Down
72 changes: 42 additions & 30 deletions db/database_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -307,14 +307,7 @@ func TestDatabase(t *testing.T) {

}

// TestCheckProposedVersion:
// - Create a doc on database
// - Run doc ID through CheckProposedVersion with the same CV as the local doc
// - Run doc ID through CheckProposedVersion with the non conflict hlv present
// - Run new doc ID through CheckProposedVersion
// - Run doc ID through CheckProposedVersion with conflicting hlv
// - Run doc ID through CheckProposedVersion with invalid HLV string
// - Assert all above scenarios return appropriate status
// TestCheckProposedVersion ensures that a given CV will return the appropriate status based on the information present in the HLV.
func TestCheckProposedVersion(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyAll)

Expand All @@ -328,32 +321,51 @@ func TestCheckProposedVersion(t *testing.T) {
_, doc, err := collection.Put(ctx, "doc1", body)
require.NoError(t, err)

src, vrs := doc.HLV.GetCurrentVersion()
hlvString := fmt.Sprint(vrs, "@", src)

// version exists scenario
status, _ := collection.CheckProposedVersion(ctx, "doc1", hlvString)
assert.Equal(t, ProposedRev_Exists, status)
t.Run("version exists", func(t *testing.T) {
var hlv HybridLogicalVector
assert.NoError(t, base.DeepCopyInefficient(&hlv, doc.HLV))
hlvString := hlv.GetCurrentVersionString()
status, _ := collection.CheckProposedVersion(ctx, "doc1", hlvString)
assert.Equal(t, ProposedRev_Exists, status)
})

// non conflict new version scenario
hlvString = fmt.Sprint(vrs+100, "@", bucketUUID, ";", hlvString)
status, _ = collection.CheckProposedVersion(ctx, "doc1", hlvString)
assert.Equal(t, ProposedRev_OK, status)
t.Run("non conflict new version", func(t *testing.T) {
var hlv HybridLogicalVector
assert.NoError(t, base.DeepCopyInefficient(&hlv, doc.HLV))
err = hlv.AddVersion(Version{bucketUUID, hlv.CurrentVersionCAS + 100})
require.NoError(t, err)
// TODO: Get full BLIP formatted HLV
hlvString := hlv.GetCurrentVersionString()
status, _ := collection.CheckProposedVersion(ctx, "doc1", hlvString)
assert.Equal(t, ProposedRev_OK, status)

Check failure on line 340 in db/database_test.go

View workflow job for this annotation

GitHub Actions / test (ubuntu)

database_test.go:340: Error Trace: /home/runner/work/sync_gateway/sync_gateway/db/database_test.go:340 Error: Not equal: expected: 0 actual : 409 Test: TestCheckProposedVersion/non_conflict_new_version

Check failure on line 340 in db/database_test.go

View workflow job for this annotation

GitHub Actions / test (windows)

database_test.go:340: Error Trace: D:/a/sync_gateway/sync_gateway/db/database_test.go:340 Error: Not equal: expected: 0 actual : 409 Test: TestCheckProposedVersion/non_conflict_new_version

Check failure on line 340 in db/database_test.go

View workflow job for this annotation

GitHub Actions / test-race

database_test.go:340: Error Trace: /home/runner/work/sync_gateway/sync_gateway/db/database_test.go:340 Error: Not equal: expected: 0 actual : 409 Test: TestCheckProposedVersion/non_conflict_new_version

Check failure on line 340 in db/database_test.go

View workflow job for this annotation

GitHub Actions / test (macos)

database_test.go:340: Error Trace: /Users/runner/work/sync_gateway/sync_gateway/db/database_test.go:340 Error: Not equal: expected: 0 actual : 409 Test: TestCheckProposedVersion/non_conflict_new_version
})

// new doc scenario
hlvString = fmt.Sprint(100, "@", bucketUUID)
status, _ = collection.CheckProposedVersion(ctx, "doc2", hlvString)
assert.Equal(t, ProposedRev_OK_IsNew, status)
t.Run("new doc same cv", func(t *testing.T) {
var hlv HybridLogicalVector
assert.NoError(t, base.DeepCopyInefficient(&hlv, doc.HLV))
hlvString := hlv.GetCurrentVersionString()
status, _ := collection.CheckProposedVersion(ctx, "doc2", hlvString)
assert.Equal(t, ProposedRev_OK_IsNew, status)
})

// conflict scenario
hlvString = fmt.Sprint(4, "@", "cluster2", ";", 1, "@", "cluster2")
status, _ = collection.CheckProposedVersion(ctx, "doc1", hlvString)
assert.Equal(t, ProposedRev_Conflict, status)
t.Run("conflict", func(t *testing.T) {
var hlv HybridLogicalVector
assert.NoError(t, base.DeepCopyInefficient(&hlv, doc.HLV))
err = hlv.AddVersion(Version{"cluster2", hlv.CurrentVersionCAS + 1})
require.NoError(t, err)
err = hlv.AddVersion(Version{"cluster2", hlv.CurrentVersionCAS + 4})
require.NoError(t, err)
// TODO: Get full BLIP formatted HLV
hlvString := hlv.GetCurrentVersionString()
status, _ := collection.CheckProposedVersion(ctx, "doc1", hlvString)
assert.Equal(t, ProposedRev_Conflict, status)
})

// test error scenario
hlvString = fmt.Sprint("")
status, _ = collection.CheckProposedVersion(ctx, "doc1", hlvString)
assert.Equal(t, ProposedRev_Error, status)
t.Run("invalid hlv", func(t *testing.T) {
hlvString := ""
status, _ := collection.CheckProposedVersion(ctx, "doc1", hlvString)
assert.Equal(t, ProposedRev_Error, status)
})

}

Expand Down
25 changes: 25 additions & 0 deletions db/hybrid_logical_vector.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,6 +104,15 @@ func (hlv *HybridLogicalVector) GetCurrentVersionString() string {
return version.String()
}

// IsVersionInConflict tests to see if a given version would be in conflict with the in memory HLV.
func (hlv *HybridLogicalVector) IsVersionInConflict(version Version) bool {
// TODO: Complete
if hlv.isVersionDominating(version) {
return false
}
return true
}

// IsInConflict tests to see if in memory HLV is conflicting with another HLV
func (hlv *HybridLogicalVector) IsInConflict(otherVector HybridLogicalVector) bool {
// test if either HLV(A) or HLV(B) are dominating over each other. If so they are not in conflict
Expand Down Expand Up @@ -179,6 +188,22 @@ func (hlv *HybridLogicalVector) isDominating(otherVector HybridLogicalVector) bo
return false
}

// isVersionDominating tests if in memory HLV is dominating the given version
func (hlv *HybridLogicalVector) isVersionDominating(version Version) bool {
// Dominating Criteria:
// HLV A dominates HLV B if source(A) == source(B) and version(A) > version(B)
// If there is an entry in pv(B) for A's current source and version(A) > B's version for that pv entry then A is dominating
// if there is an entry in mv(B) for A's current source and version(A) > B's version for that pv entry then A is dominating

// Grab the latest CAS version for HLV(A)'s sourceID in HLV(B), if HLV(A) version CAS is > HLV(B)'s then it is dominating
// If 0 CAS is returned then the sourceID does not exist on HLV(B)
if hlv.Version > version.Value {
return true
}
// HLV A is not dominating over version
return false
}

// isEqual tests if in memory HLV is equal to another
func (hlv *HybridLogicalVector) isEqual(otherVector HybridLogicalVector) bool {
// if in HLV(A) sourceID the same as HLV(B) sourceID and HLV(A) CAS is equal to HLV(B) CAS then the two HLV's are equal
Expand Down

0 comments on commit 056fecb

Please sign in to comment.