diff --git a/channels/log_entry.go b/channels/log_entry.go index c5eb7f405a..655de3b23a 100644 --- a/channels/log_entry.go +++ b/channels/log_entry.go @@ -54,6 +54,8 @@ type LogEntry struct { PrevSequence uint64 // Sequence of previous active revision IsPrincipal bool // Whether the log-entry is a tracking entry for a principal doc CollectionID uint32 // Collection ID + SourceID string + Version uint64 } func (l LogEntry) String() string { diff --git a/db/change_cache.go b/db/change_cache.go index 8f42deeb1e..493e82d8a0 100644 --- a/db/change_cache.go +++ b/db/change_cache.go @@ -508,6 +508,8 @@ func (c *changeCache) DocChanged(event sgbucket.FeedEvent) { TimeSaved: syncData.TimeSaved, Channels: syncData.Channels, CollectionID: event.CollectionID, + SourceID: syncData.HLV.SourceID, + Version: syncData.HLV.Version, } millisecondLatency := int(feedLatency / time.Millisecond) diff --git a/db/change_cache_test.go b/db/change_cache_test.go index 5306f509a8..12b7ec3529 100644 --- a/db/change_cache_test.go +++ b/db/change_cache_test.go @@ -74,6 +74,24 @@ func logEntry(seq uint64, docid string, revid string, channelNames []string, col return entry } +func testLogEntryWithCV(seq uint64, docid string, revid string, channelNames []string, collectionID uint32, sourceID string, version uint64) *LogEntry { + entry := &LogEntry{ + Sequence: seq, + DocID: docid, + RevID: revid, + TimeReceived: time.Now(), + CollectionID: collectionID, + SourceID: sourceID, + Version: version, + } + channelMap := make(channels.ChannelMap) + for _, channelName := range channelNames { + channelMap[channelName] = nil + } + entry.Channels = channelMap + return entry +} + func TestSkippedSequenceList(t *testing.T) { skipList := NewSkippedSequenceList() diff --git a/db/channel_cache_single_test.go b/db/channel_cache_single_test.go index 7b2c883b92..d0431ab4c9 100644 --- a/db/channel_cache_single_test.go +++ b/db/channel_cache_single_test.go @@ -951,6 +951,23 @@ func verifyChannelDocIDs(entries []*LogEntry, docIDs []string) bool { return true } +type cvValues struct { + source string + version uint64 +} + +func verifyCVEntries(entries []*LogEntry, cvs []cvValues) bool { + for index, cv := range cvs { + if entries[index].SourceID != cv.source { + return false + } + if entries[index].Version != cv.version { + return false + } + } + return true +} + func writeEntries(entries []*LogEntry) { for index, entry := range entries { log.Printf("%d:seq=%d, docID=%s, revID=%s", index, entry.Sequence, entry.DocID, entry.RevID) diff --git a/db/channel_cache_test.go b/db/channel_cache_test.go index 4637539848..4cc482bed5 100644 --- a/db/channel_cache_test.go +++ b/db/channel_cache_test.go @@ -53,6 +53,54 @@ func TestChannelCacheMaxSize(t *testing.T) { assert.Equal(t, 4, int(maxEntries)) } +func TestChannelCacheCurrentVersion(t *testing.T) { + db, ctx := setupTestDB(t) + defer db.Close(ctx) + + cache := db.changeCache.getChannelCache() + + collectionID := GetSingleDatabaseCollection(t, db.DatabaseContext).GetCollectionID() + + // Make channels active + _, err := cache.GetChanges(ctx, channels.NewID("chanA", collectionID), getChangesOptionsWithCtxOnly(t)) + require.NoError(t, err) + _, err = cache.GetChanges(ctx, channels.NewID("chanB", collectionID), getChangesOptionsWithCtxOnly(t)) + require.NoError(t, err) + _, err = cache.GetChanges(ctx, channels.NewID("chanC", collectionID), getChangesOptionsWithCtxOnly(t)) + require.NoError(t, err) + _, err = cache.GetChanges(ctx, channels.NewID("chanD", collectionID), getChangesOptionsWithCtxOnly(t)) + require.NoError(t, err) + + cache.AddToCache(ctx, testLogEntryWithCV(1, "doc1", "1-a", []string{"chanB", "chanC", "chanD"}, collectionID, "test1", 123)) + cache.AddToCache(ctx, testLogEntryWithCV(2, "doc2", "1-a", []string{"chanB", "chanC", "chanD"}, collectionID, "test2", 1234)) + cache.AddToCache(ctx, testLogEntryWithCV(3, "doc3", "1-a", []string{"chanC", "chanD"}, collectionID, "test3", 12345)) + cache.AddToCache(ctx, testLogEntryWithCV(4, "doc4", "1-a", []string{"chanC"}, collectionID, "test4", 123456)) + + // assert on channel cache entries for 'chanC' + entriesChanC, err := cache.GetChanges(ctx, channels.NewID("chanC", collectionID), getChangesOptionsWithZeroSeq(t)) + assert.NoError(t, err) + require.Len(t, entriesChanC, 4) + assert.True(t, verifyChannelSequences(entriesChanC, []uint64{1, 2, 3, 4})) + assert.True(t, verifyChannelDocIDs(entriesChanC, []string{"doc1", "doc2", "doc3", "doc4"})) + assert.True(t, verifyCVEntries(entriesChanC, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}, {source: "test3", version: 12345}, {source: "test4", version: 123456}})) + + // assert on channel cache entries for 'chanD' + entriesChanD, err := cache.GetChanges(ctx, channels.NewID("chanD", collectionID), getChangesOptionsWithZeroSeq(t)) + assert.NoError(t, err) + require.Len(t, entriesChanD, 3) + assert.True(t, verifyChannelSequences(entriesChanD, []uint64{1, 2, 3})) + assert.True(t, verifyChannelDocIDs(entriesChanD, []string{"doc1", "doc2", "doc3"})) + assert.True(t, verifyCVEntries(entriesChanD, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}, {source: "test3", version: 12345}})) + + // assert on channel cache entries for 'chanB' + entriesChanB, err := cache.GetChanges(ctx, channels.NewID("chanB", collectionID), getChangesOptionsWithZeroSeq(t)) + assert.NoError(t, err) + require.Len(t, entriesChanB, 2) + assert.True(t, verifyChannelSequences(entriesChanB, []uint64{1, 2})) + assert.True(t, verifyChannelDocIDs(entriesChanB, []string{"doc1", "doc2"})) + assert.True(t, verifyCVEntries(entriesChanB, []cvValues{{source: "test1", version: 123}, {source: "test2", version: 1234}})) +} + func getCacheUtilization(stats *base.CacheStats) (active, tombstones, removals int) { active = int(stats.ChannelCacheRevsActive.Value()) tombstones = int(stats.ChannelCacheRevsTombstone.Value()) diff --git a/db/util_testing.go b/db/util_testing.go index 94b2a802e2..9a072e1437 100644 --- a/db/util_testing.go +++ b/db/util_testing.go @@ -459,6 +459,10 @@ func (dbc *DatabaseContext) CollectionChannelViewForTest(tb testing.TB, collecti return collection.getChangesInChannelFromQuery(base.TestCtx(tb), channelName, startSeq, endSeq, 0, false) } +func (db *DatabaseContext) GetChannelCache() ChannelCache { + return db.channelCache +} + // Test-only version of GetPrincipal that doesn't trigger channel/role recalculation func (dbc *DatabaseContext) GetPrincipalForTest(tb testing.TB, name string, isUser bool) (info *auth.PrincipalConfig, err error) { ctx := base.TestCtx(tb) diff --git a/rest/changes_test.go b/rest/changes_test.go index 0673ccfc7d..564bd2f3ec 100644 --- a/rest/changes_test.go +++ b/rest/changes_test.go @@ -19,6 +19,7 @@ import ( "time" "github.com/couchbase/sync_gateway/base" + "github.com/couchbase/sync_gateway/channels" "github.com/couchbase/sync_gateway/db" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" @@ -273,3 +274,44 @@ func TestWebhookWinningRevChangedEvent(t *testing.T) { assert.Equal(t, 5, int(atomic.LoadUint32(&WinningRevChangedCount))) assert.Equal(t, 6, int(atomic.LoadUint32(&DocumentChangedCount))) } + +func TestCurrentVersionPopulationOnChannelCache(t *testing.T) { + base.SetUpTestLogging(t, base.LevelDebug, base.KeyCRUD, base.KeyImport, base.KeyDCP, base.KeyCache, base.KeyHTTP) + rt := NewRestTester(t, &RestTesterConfig{ + SyncFn: channels.DocChannelsSyncFunction, + }) + defer rt.Close() + channelCache := rt.GetDatabase().GetChannelCache() + ctx := base.TestCtx(t) + collectionID := rt.GetSingleTestDatabaseCollection().GetCollectionID() + bucketUUID := rt.GetDatabase().BucketUUID + + // Make channel active + _, err := channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), db.ChangesOptions{ChangesCtx: ctx}) + require.NoError(t, err) + + // Put a doc that gets assigned a CV to populate the channel cache with + resp := rt.SendAdminRequest(http.MethodPut, "/{{.keyspace}}/doc1", `{"channels": ["ABC"]}`) + RequireStatus(t, resp, http.StatusCreated) + + syncData, err := rt.GetSingleTestDatabaseCollection().GetDocSyncData(ctx, "doc1") + require.NoError(t, err) + uintCAS := base.HexCasToUint64(syncData.Cas) + + var logEntries []*db.LogEntry + // get entry of above doc from channel cache + err = rt.WaitForConditionWithOptions(func() bool { + entries, err := channelCache.GetChanges(ctx, channels.NewID("ABC", collectionID), db.ChangesOptions{ChangesCtx: ctx}) + require.NoError(t, err) + logEntries = entries + return len(entries) != 0 + }, 100, 1000) + require.NoError(t, err) + + // assert that the source and version has been populated with the channel cache entry for the doc + assert.Equal(t, "doc1", logEntries[0].DocID) + assert.Equal(t, uintCAS, logEntries[0].Version) + assert.Equal(t, bucketUUID, logEntries[0].SourceID) + assert.Equal(t, syncData.HLV.SourceID, logEntries[0].SourceID) + assert.Equal(t, syncData.HLV.Version, logEntries[0].Version) +}