Skip to content

Commit

Permalink
CBG-3255: Add current version to log entry for population on the chan…
Browse files Browse the repository at this point in the history
…nel cache. Pre-requisite for my work on adding CV to change entries. Only adds CV to log entry from docs seen over DCP at this time pending work on channel cache backfill
  • Loading branch information
gregns1 committed Nov 10, 2023
1 parent 1de3947 commit 7cefd23
Show file tree
Hide file tree
Showing 7 changed files with 133 additions and 0 deletions.
2 changes: 2 additions & 0 deletions channels/log_entry.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@ type LogEntry struct {
PrevSequence uint64 // Sequence of previous active revision
IsPrincipal bool // Whether the log-entry is a tracking entry for a principal doc
CollectionID uint32 // Collection ID
SourceID string
Version uint64
}

func (l LogEntry) String() string {
Expand Down
2 changes: 2 additions & 0 deletions db/change_cache.go
Original file line number Diff line number Diff line change
Expand Up @@ -508,6 +508,8 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) {
TimeSaved: syncData.TimeSaved,
Channels: syncData.Channels,
CollectionID: event.CollectionID,
SourceID: syncData.HLV.SourceID,
Version: syncData.HLV.Version,
}

millisecondLatency := int(feedLatency / time.Millisecond)
Expand Down
18 changes: 18 additions & 0 deletions db/change_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -74,6 +74,24 @@ func logEntry(seq uint64, docid string, revid string, channelNames []string, col
return entry
}

func testLogEntryWithCV(seq uint64, docid string, revid string, channelNames []string, collectionID uint32, sourceID string, version uint64) *LogEntry {
entry := &LogEntry{
Sequence: seq,
DocID: docid,
RevID: revid,
TimeReceived: time.Now(),
CollectionID: collectionID,
SourceID: sourceID,
Version: version,
}
channelMap := make(channels.ChannelMap)
for _, channelName := range channelNames {
channelMap[channelName] = nil
}
entry.Channels = channelMap
return entry
}

func TestSkippedSequenceList(t *testing.T) {

skipList := NewSkippedSequenceList()
Expand Down
17 changes: 17 additions & 0 deletions db/channel_cache_single_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -951,6 +951,23 @@ func verifyChannelDocIDs(entries []*LogEntry, docIDs []string) bool {
return true
}

type cvValues struct {
source string
version uint64
}

func verifyCVEntries(entries []*LogEntry, cvs []cvValues) bool {
for index, cv := range cvs {
if entries[index].SourceID != cv.source {
return false
}
if entries[index].Version != cv.version {
return false
}
}
return true
}

func writeEntries(entries []*LogEntry) {
for index, entry := range entries {
log.Printf("%d:seq=%d, docID=%s, revID=%s", index, entry.Sequence, entry.DocID, entry.RevID)
Expand Down
48 changes: 48 additions & 0 deletions db/channel_cache_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,54 @@ func TestChannelCacheMaxSize(t *testing.T) {
assert.Equal(t, 4, int(maxEntries))
}

func TestChannelCacheCurrentVersion(t *testing.T) {
db, ctx := setupTestDB(t)
defer db.Close(ctx)

cache := db.changeCache.getChannelCache()

collectionID := GetSingleDatabaseCollection(t, db.DatabaseContext).GetCollectionID()

// Make channels active
_, err := cache.GetChanges(ctx, channels.NewID("chanA", collectionID), getChangesOptionsWithCtxOnly(t))
require.NoError(t, err)
_, err = cache.GetChanges(ctx, channels.NewID("chanB", collectionID), getChangesOptionsWithCtxOnly(t))
require.NoError(t, err)
_, err = cache.GetChanges(ctx, channels.NewID("chanC", collectionID), getChangesOptionsWithCtxOnly(t))
require.NoError(t, err)
_, err = cache.GetChanges(ctx, channels.NewID("chanD", collectionID), getChangesOptionsWithCtxOnly(t))
require.NoError(t, err)

cache.AddToCache(ctx, testLogEntryWithCV(1, "doc1", "1-a", []string{"chanB", "chanC", "chanD"}, collectionID, "test1", 123))
cache.AddToCache(ctx, testLogEntryWithCV(2, "doc2", "1-a", []string{"chanB", "chanC", "chanD"}, collectionID, "test2", 1234))
cache.AddToCache(ctx, testLogEntryWithCV(3, "doc3", "1-a", []string{"chanC", "chanD"}, collectionID, "test3", 12345))
cache.AddToCache(ctx, testLogEntryWithCV(4, "doc4", "1-a", []string{"chanC"}, collectionID, "test4", 123456))

// assert on channel cache entries for 'chanC'
entriesChanC, err := cache.GetChanges(ctx, channels.NewID("chanC", collectionID), getChangesOptionsWithZeroSeq(t))
assert.NoError(t, err)
require.Len(t, entriesChanC, 4)
assert.True(t, verifyChannelSequences(entriesChanC, []uint64{1, 2, 3, 4}))
assert.True(t, verifyChannelDocIDs(entriesChanC, []string{"doc1", "doc2", "doc3", "doc4"}))
assert.True(t, verifyCVEntries(entriesChanC, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}, {source: "test3", version: 12345}, {source: "test4", version: 123456}}))

// assert on channel cache entries for 'chanD'
entriesChanD, err := cache.GetChanges(ctx, channels.NewID("chanD", collectionID), getChangesOptionsWithZeroSeq(t))
assert.NoError(t, err)
require.Len(t, entriesChanD, 3)
assert.True(t, verifyChannelSequences(entriesChanD, []uint64{1, 2, 3}))
assert.True(t, verifyChannelDocIDs(entriesChanD, []string{"doc1", "doc2", "doc3"}))
assert.True(t, verifyCVEntries(entriesChanD, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}, {source: "test3", version: 12345}}))

// assert on channel cache entries for 'chanB'
entriesChanB, err := cache.GetChanges(ctx, channels.NewID("chanB", collectionID), getChangesOptionsWithZeroSeq(t))
assert.NoError(t, err)
require.Len(t, entriesChanB, 2)
assert.True(t, verifyChannelSequences(entriesChanB, []uint64{1, 2}))
assert.True(t, verifyChannelDocIDs(entriesChanB, []string{"doc1", "doc2"}))
assert.True(t, verifyCVEntries(entriesChanB, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}}))
}

func getCacheUtilization(stats *base.CacheStats) (active, tombstones, removals int) {
active = int(stats.ChannelCacheRevsActive.Value())
tombstones = int(stats.ChannelCacheRevsTombstone.Value())
Expand Down
4 changes: 4 additions & 0 deletions db/util_testing.go
Original file line number Diff line number Diff line change
Expand Up @@ -459,6 +459,10 @@ func (dbc *DatabaseContext) CollectionChannelViewForTest(tb testing.TB, collecti
return collection.getChangesInChannelFromQuery(base.TestCtx(tb), channelName, startSeq, endSeq, 0, false)
}

func (db *DatabaseContext) GetChannelCache() ChannelCache {
return db.channelCache
}

// Test-only version of GetPrincipal that doesn't trigger channel/role recalculation
func (dbc *DatabaseContext) GetPrincipalForTest(tb testing.TB, name string, isUser bool) (info *auth.PrincipalConfig, err error) {
ctx := base.TestCtx(tb)
Expand Down
42 changes: 42 additions & 0 deletions rest/changes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"time"

"github.com/couchbase/sync_gateway/base"
"github.com/couchbase/sync_gateway/channels"
"github.com/couchbase/sync_gateway/db"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
Expand Down Expand Up @@ -273,3 +274,44 @@ func TestWebhookWinningRevChangedEvent(t *testing.T) {
assert.Equal(t, 5, int(atomic.LoadUint32(&WinningRevChangedCount)))
assert.Equal(t, 6, int(atomic.LoadUint32(&DocumentChangedCount)))
}

func TestCurrentVersionPopulationOnChannelCache(t *testing.T) {
base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyDCP, base.KeyCache, base.KeyHTTP)
rt := NewRestTester(t, &RestTesterConfig{
SyncFn: channels.DocChannelsSyncFunction,
})
defer rt.Close()
channelCache := rt.GetDatabase().GetChannelCache()
ctx := base.TestCtx(t)
collectionID := rt.GetSingleTestDatabaseCollection().GetCollectionID()
bucketUUID := rt.GetDatabase().BucketUUID

// Make channel active
_, err := channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), db.ChangesOptions{ChangesCtx: ctx})
require.NoError(t, err)

// Put a doc that gets assigned a CV to populate the channel cache with
resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"channels": ["ABC"]}`)
RequireStatus(t, resp, http.StatusCreated)

syncData, err := rt.GetSingleTestDatabaseCollection().GetDocSyncData(ctx, "doc1")
require.NoError(t, err)
uintCAS := base.HexCasToUint64(syncData.Cas)

var logEntries []*db.LogEntry
// get entry of above doc from channel cache
err = rt.WaitForConditionWithOptions(func() bool {
entries, err := channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), db.ChangesOptions{ChangesCtx: ctx})
require.NoError(t, err)
logEntries = entries
return len(entries) != 0
}, 100, 1000)
require.NoError(t, err)

// assert that the source and version has been populated with the channel cache entry for the doc
assert.Equal(t, "doc1", logEntries[0].DocID)
assert.Equal(t, uintCAS, logEntries[0].Version)
assert.Equal(t, bucketUUID, logEntries[0].SourceID)
assert.Equal(t, syncData.HLV.SourceID, logEntries[0].SourceID)
assert.Equal(t, syncData.HLV.Version, logEntries[0].Version)
}

0 comments on commit 7cefd23

Please sign in to comment.