Skip to content

Commit

Permalink
CBG-3355: Add current version to channel cache (#6571)
Browse files Browse the repository at this point in the history
* CBG-3255: Add current version to log entry for population on the channel cache. Pre-requisite for my work on adding CV to change entries. Only adds CV to log entry from docs seen over DCP at this time pending work on channel cache backfill

* add comments and protect against panic in channel cache population

* add more commnets

* updated to move test and few lines populating log entry
  • Loading branch information
gregns1 committed Dec 1, 2023
1 parent 071c677 commit 45b1d3f
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 0 deletions.
2 changes: 2 additions & 0 deletions channels/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type LogEntry struct {
PrevSequence uint64 // Sequence of previous active revision
IsPrincipal bool // Whether the log-entry is a tracking entry for a principal doc
CollectionID uint32 // Collection ID
SourceID string // SourceID allocated to the doc's Current Version on the HLV
Version uint64 // Version allocated to the doc's Current Version on the HLV
}

func (l LogEntry) String() string {
Expand Down
5 changes: 5 additions & 0 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -499,6 +499,7 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
if len(rawUserXattr) > 0 {
collection.revisionCache.RemoveWithRev(docID, syncData.CurrentRev)
}

change := &LogEntry{
Sequence: syncData.Sequence,
DocID: docID,
Expand All @@ -509,6 +510,10 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
Channels: syncData.Channels,
CollectionID: event.CollectionID,
}
if syncData.HLV != nil {
change.SourceID = syncData.HLV.SourceID
change.Version = syncData.HLV.Version
}

millisecondLatency := int(feedLatency / time.Millisecond)

Expand Down
18 changes: 18 additions & 0 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ func logEntry(seq uint64, docid string, revid string, channelNames []string, col
return entry
}

func testLogEntryWithCV(seq uint64, docid string, revid string, channelNames []string, collectionID uint32, sourceID string, version uint64) *LogEntry {
entry := &LogEntry{
Sequence: seq,
DocID: docid,
RevID: revid,
TimeReceived: time.Now(),
CollectionID: collectionID,
SourceID: sourceID,
Version: version,
}
channelMap := make(channels.ChannelMap)
for _, channelName := range channelNames {
channelMap[channelName] = nil
}
entry.Channels = channelMap
return entry
}

func TestSkippedSequenceList(t *testing.T) {

skipList := NewSkippedSequenceList()
Expand Down
42 changes: 42 additions & 0 deletions db/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -521,3 +521,45 @@ func BenchmarkChangesFeedDocUnmarshalling(b *testing.B) {
}

}

// TestCurrentVersionPopulationOnChannelCache:
// - Make channel active on cache
// - Add a doc that is assigned this channel
// - Get the sync data of that doc to assert against the HLV defined on it
// - Wait for the channel cache to be populated with this doc write
// - Assert the CV in the entry fetched from channel cache matches the sync data CV and the bucket UUID on the database context
func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyDCP, base.KeyCache, base.KeyHTTP)
db, ctx := setupTestDB(t)
defer db.Close(ctx)
collection := GetSingleDatabaseCollectionWithUser(t, db)
collectionID := collection.GetCollectionID()
bucketUUID := db.BucketUUID
collection.ChannelMapper = channels.NewChannelMapper(ctx, channels.DocChannelsSyncFunction, db.Options.JavascriptTimeout)

// Make channel active
_, err := db.channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), getChangesOptionsWithZeroSeq(t))
require.NoError(t, err)

// Put a doc that gets assigned a CV to populate the channel cache with
_, _, err = collection.Put(ctx, "doc1", Body{"channels": []string{"ABC"}})
require.NoError(t, err)
err = collection.WaitForPendingChanges(base.TestCtx(t))
require.NoError(t, err)

syncData, err := collection.GetDocSyncData(ctx, "doc1")
require.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)

// get entry of above doc from channel cache
entries, err := db.channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), getChangesOptionsWithZeroSeq(t))
require.NoError(t, err)
require.NotNil(t, entries)

// assert that the source and version has been populated with the channel cache entry for the doc
assert.Equal(t, "doc1", entries[0].DocID)
assert.Equal(t, uintCAS, entries[0].Version)
assert.Equal(t, bucketUUID, entries[0].SourceID)
assert.Equal(t, syncData.HLV.SourceID, entries[0].SourceID)
assert.Equal(t, syncData.HLV.Version, entries[0].Version)
}
17 changes: 17 additions & 0 deletions db/channel_cache_single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,23 @@ func verifyChannelDocIDs(entries []*LogEntry, docIDs []string) bool {
return true
}

type cvValues struct {
source string
version uint64
}

func verifyCVEntries(entries []*LogEntry, cvs []cvValues) bool {
for index, cv := range cvs {
if entries[index].SourceID != cv.source {
return false
}
if entries[index].Version != cv.version {
return false
}
}
return true
}

func writeEntries(entries []*LogEntry) {
for index, entry := range entries {
log.Printf("%d:seq=%d, docID=%s, revID=%s", index, entry.Sequence, entry.DocID, entry.RevID)
Expand Down
52 changes: 52 additions & 0 deletions db/channel_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,58 @@ func TestChannelCacheMaxSize(t *testing.T) {
assert.Equal(t, 4, int(maxEntries))
}

// TestChannelCacheCurrentVersion:
// - Makes channel channels active for channels used in test by requesting changes on each channel
// - Add 4 docs to the channel cache with CV defined in the log entry
// - Get changes for each channel in question and assert that the CV is populated in each entry expected
func TestChannelCacheCurrentVersion(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

cache := db.changeCache.getChannelCache()

collectionID := GetSingleDatabaseCollection(t, db.DatabaseContext).GetCollectionID()

// Make channels active
_, err := cache.GetChanges(ctx, channels.NewID("chanA", collectionID), getChangesOptionsWithCtxOnly(t))
require.NoError(t, err)
_, err = cache.GetChanges(ctx, channels.NewID("chanB", collectionID), getChangesOptionsWithCtxOnly(t))
require.NoError(t, err)
_, err = cache.GetChanges(ctx, channels.NewID("chanC", collectionID), getChangesOptionsWithCtxOnly(t))
require.NoError(t, err)
_, err = cache.GetChanges(ctx, channels.NewID("chanD", collectionID), getChangesOptionsWithCtxOnly(t))
require.NoError(t, err)

cache.AddToCache(ctx, testLogEntryWithCV(1, "doc1", "1-a", []string{"chanB", "chanC", "chanD"}, collectionID, "test1", 123))
cache.AddToCache(ctx, testLogEntryWithCV(2, "doc2", "1-a", []string{"chanB", "chanC", "chanD"}, collectionID, "test2", 1234))
cache.AddToCache(ctx, testLogEntryWithCV(3, "doc3", "1-a", []string{"chanC", "chanD"}, collectionID, "test3", 12345))
cache.AddToCache(ctx, testLogEntryWithCV(4, "doc4", "1-a", []string{"chanC"}, collectionID, "test4", 123456))

// assert on channel cache entries for 'chanC'
entriesChanC, err := cache.GetChanges(ctx, channels.NewID("chanC", collectionID), getChangesOptionsWithZeroSeq(t))
assert.NoError(t, err)
require.Len(t, entriesChanC, 4)
assert.True(t, verifyChannelSequences(entriesChanC, []uint64{1, 2, 3, 4}))
assert.True(t, verifyChannelDocIDs(entriesChanC, []string{"doc1", "doc2", "doc3", "doc4"}))
assert.True(t, verifyCVEntries(entriesChanC, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}, {source: "test3", version: 12345}, {source: "test4", version: 123456}}))

// assert on channel cache entries for 'chanD'
entriesChanD, err := cache.GetChanges(ctx, channels.NewID("chanD", collectionID), getChangesOptionsWithZeroSeq(t))
assert.NoError(t, err)
require.Len(t, entriesChanD, 3)
assert.True(t, verifyChannelSequences(entriesChanD, []uint64{1, 2, 3}))
assert.True(t, verifyChannelDocIDs(entriesChanD, []string{"doc1", "doc2", "doc3"}))
assert.True(t, verifyCVEntries(entriesChanD, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}, {source: "test3", version: 12345}}))

// assert on channel cache entries for 'chanB'
entriesChanB, err := cache.GetChanges(ctx, channels.NewID("chanB", collectionID), getChangesOptionsWithZeroSeq(t))
assert.NoError(t, err)
require.Len(t, entriesChanB, 2)
assert.True(t, verifyChannelSequences(entriesChanB, []uint64{1, 2}))
assert.True(t, verifyChannelDocIDs(entriesChanB, []string{"doc1", "doc2"}))
assert.True(t, verifyCVEntries(entriesChanB, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}}))
}

func getCacheUtilization(stats *base.CacheStats) (active, tombstones, removals int) {
active = int(stats.ChannelCacheRevsActive.Value())
tombstones = int(stats.ChannelCacheRevsTombstone.Value())
Expand Down
4 changes: 4 additions & 0 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ func (dbc *DatabaseContext) CollectionChannelViewForTest(tb testing.TB, collecti
return collection.getChangesInChannelFromQuery(base.TestCtx(tb), channelName, startSeq, endSeq, 0, false)
}

func (db *DatabaseContext) GetChannelCache() ChannelCache {
return db.channelCache
}

// Test-only version of GetPrincipal that doesn't trigger channel/role recalculation
func (dbc *DatabaseContext) GetPrincipalForTest(tb testing.TB, name string, isUser bool) (info *auth.PrincipalConfig, err error) {
ctx := base.TestCtx(tb)
Expand Down

0 comments on commit 45b1d3f

Please sign in to comment.