Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

cache: add namespace to scores #864

Merged
merged 3 commits into from
Oct 15, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion master/rest.go
Original file line number Diff line number Diff line change
Expand Up @@ -881,7 +881,7 @@ func (m *Master) searchDocuments(collection, subset, category string, request *r
return
}
// Get the popular list
scores, err := m.CacheClient.SearchDocuments(ctx, collection, subset, []string{category}, offset, m.Config.Recommend.CacheSize)
scores, err := m.CacheClient.SearchScores(ctx, "", collection, subset, "", []string{category}, offset, m.Config.Recommend.CacheSize)
if err != nil {
server.InternalServerError(response, err)
return
Expand Down
20 changes: 10 additions & 10 deletions master/rest_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -648,14 +648,14 @@ func TestServer_SearchDocumentsOfItems(t *testing.T) {
for i, operator := range operators {
t.Run(operator.Name, func(t *testing.T) {
// Put scores
scores := []cache.Document{
scores := []cache.Score{
{Id: strconv.Itoa(i) + "0", Score: 100, Categories: []string{operator.Category}},
{Id: strconv.Itoa(i) + "1", Score: 99, Categories: []string{operator.Category}},
{Id: strconv.Itoa(i) + "2", Score: 98, Categories: []string{operator.Category}},
{Id: strconv.Itoa(i) + "3", Score: 97, Categories: []string{operator.Category}},
{Id: strconv.Itoa(i) + "4", Score: 96, Categories: []string{operator.Category}},
}
err := s.CacheClient.AddDocuments(ctx, operator.Collection, operator.Subset, scores)
err := s.CacheClient.AddScores(ctx, "", operator.Collection, operator.Subset, scores)
assert.NoError(t, err)
items := make([]ScoredItem, 0)
for _, score := range scores {
Expand Down Expand Up @@ -699,14 +699,14 @@ func TestServer_SearchDocumentsOfUsers(t *testing.T) {
for _, operator := range operators {
t.Logf("test RESTful API: %v", operator.Get)
// Put scores
scores := []cache.Document{
scores := []cache.Score{
{Id: "0", Score: 100, Categories: []string{""}},
{Id: "1", Score: 99, Categories: []string{""}},
{Id: "2", Score: 98, Categories: []string{""}},
{Id: "3", Score: 97, Categories: []string{""}},
{Id: "4", Score: 96, Categories: []string{""}},
}
err := s.CacheClient.AddDocuments(ctx, operator.Prefix, operator.Label, scores)
err := s.CacheClient.AddScores(ctx, "", operator.Prefix, operator.Label, scores)
assert.NoError(t, err)
users := make([]ScoreUser, 0)
for _, score := range scores {
Expand Down Expand Up @@ -758,7 +758,7 @@ func TestServer_GetRecommends(t *testing.T) {
s, cookie := newMockServer(t)
defer s.Close(t)
// inset recommendation
itemIds := []cache.Document{
itemIds := []cache.Score{
{Id: "1", Score: 99, Categories: []string{""}},
{Id: "2", Score: 98, Categories: []string{""}},
{Id: "3", Score: 97, Categories: []string{""}},
Expand All @@ -769,7 +769,7 @@ func TestServer_GetRecommends(t *testing.T) {
{Id: "8", Score: 92, Categories: []string{""}},
}
ctx := context.Background()
err := s.CacheClient.AddDocuments(ctx, cache.OfflineRecommend, "0", itemIds)
err := s.CacheClient.AddScores(ctx, "", cache.OfflineRecommend, "0", itemIds)
assert.NoError(t, err)
// insert feedback
feedback := []data.Feedback{
Expand Down Expand Up @@ -825,14 +825,14 @@ func TestMaster_Purge(t *testing.T) {
assert.NoError(t, err)
assert.ElementsMatch(t, []string{"a", "b", "c"}, set)

err = s.CacheClient.AddDocuments(ctx, "sorted", "", []cache.Document{
err = s.CacheClient.AddScores(ctx, "", "sorted", "", []cache.Score{
{Id: "a", Score: 1, Categories: []string{""}},
{Id: "b", Score: 2, Categories: []string{""}},
{Id: "c", Score: 3, Categories: []string{""}}})
assert.NoError(t, err)
z, err := s.CacheClient.SearchDocuments(ctx, "sorted", "", []string{""}, 0, -1)
z, err := s.CacheClient.SearchScores(ctx, "", "sorted", "", "", []string{""}, 0, -1)
assert.NoError(t, err)
assert.ElementsMatch(t, []cache.Document{
assert.ElementsMatch(t, []cache.Score{
{Id: "a", Score: 1, Categories: []string{""}},
{Id: "b", Score: 2, Categories: []string{""}},
{Id: "c", Score: 3, Categories: []string{""}}}, z)
Expand Down Expand Up @@ -869,7 +869,7 @@ func TestMaster_Purge(t *testing.T) {
set, err = s.CacheClient.GetSet(ctx, "set")
assert.NoError(t, err)
assert.Empty(t, set)
z, err = s.CacheClient.SearchDocuments(ctx, "sorted", "", []string{""}, 0, -1)
z, err = s.CacheClient.SearchScores(ctx, "", "sorted", "", "", []string{""}, 0, -1)
assert.NoError(t, err)
assert.Empty(t, z)

Expand Down
28 changes: 14 additions & 14 deletions master/tasks.go
Original file line number Diff line number Diff line change
Expand Up @@ -91,21 +91,21 @@ func (m *Master) runLoadDatasetTask() error {
}

// save popular items to cache
if err = m.CacheClient.AddDocuments(ctx, cache.PopularItems, "", popularItems.ToSlice()); err != nil {
if err = m.CacheClient.AddScores(ctx, "", cache.PopularItems, "", popularItems.ToSlice()); err != nil {
log.Logger().Error("failed to cache popular items", zap.Error(err))
}
if err = m.CacheClient.DeleteDocuments(ctx, []string{cache.PopularItems}, cache.DocumentCondition{Before: &popularItems.Timestamp}); err != nil {
if err = m.CacheClient.DeleteScores(ctx, "", []string{cache.PopularItems}, cache.ScoreCondition{Before: &popularItems.Timestamp}); err != nil {
log.Logger().Error("failed to reclaim outdated items", zap.Error(err))
}
if err = m.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.LastUpdatePopularItemsTime), time.Now())); err != nil {
log.Logger().Error("failed to write latest update popular items time", zap.Error(err))
}

// save the latest items to cache
if err = m.CacheClient.AddDocuments(ctx, cache.LatestItems, "", latestItems.ToSlice()); err != nil {
if err = m.CacheClient.AddScores(ctx, "", cache.LatestItems, "", latestItems.ToSlice()); err != nil {
log.Logger().Error("failed to cache latest items", zap.Error(err))
}
if err = m.CacheClient.DeleteDocuments(ctx, []string{cache.LatestItems}, cache.DocumentCondition{Before: &latestItems.Timestamp}); err != nil {
if err = m.CacheClient.DeleteScores(ctx, "", []string{cache.LatestItems}, cache.ScoreCondition{Before: &latestItems.Timestamp}); err != nil {
log.Logger().Error("failed to reclaim outdated items", zap.Error(err))
}
if err = m.CacheClient.Set(ctx, cache.Time(cache.Key(cache.GlobalMeta, cache.LastUpdateLatestItemsTime), time.Now())); err != nil {
Expand Down Expand Up @@ -397,10 +397,10 @@ func (m *Master) findItemNeighborsBruteForce(dataset *ranking.DataSet, labeledIt
}
aggregator.Add(category, recommends, scores)
}
if err := m.CacheClient.AddDocuments(ctx, cache.ItemNeighbors, itemId, aggregator.ToSlice()); err != nil {
if err := m.CacheClient.AddScores(ctx, "", cache.ItemNeighbors, itemId, aggregator.ToSlice()); err != nil {
return errors.Trace(err)
}
if err := m.CacheClient.DeleteDocuments(ctx, []string{cache.ItemNeighbors}, cache.DocumentCondition{
if err := m.CacheClient.DeleteScores(ctx, "", []string{cache.ItemNeighbors}, cache.ScoreCondition{
Subset: proto.String(itemId),
Before: &aggregator.Timestamp,
}); err != nil {
Expand Down Expand Up @@ -502,10 +502,10 @@ func (m *Master) findItemNeighborsIVF(dataset *ranking.DataSet, labelIDF, userID
aggregator.Add(category, resultValues, resultScores)
}
}
if err := m.CacheClient.AddDocuments(ctx, cache.ItemNeighbors, itemId, aggregator.ToSlice()); err != nil {
if err := m.CacheClient.AddScores(ctx, "", cache.ItemNeighbors, itemId, aggregator.ToSlice()); err != nil {
return errors.Trace(err)
}
if err := m.CacheClient.DeleteDocuments(ctx, []string{cache.ItemNeighbors}, cache.DocumentCondition{
if err := m.CacheClient.DeleteScores(ctx, "", []string{cache.ItemNeighbors}, cache.ScoreCondition{
Subset: proto.String(itemId),
Before: &aggregator.Timestamp,
}); err != nil {
Expand Down Expand Up @@ -716,10 +716,10 @@ func (m *Master) findUserNeighborsBruteForce(ctx context.Context, dataset *ranki
}
aggregator := cache.NewDocumentAggregator(startSearchTime)
aggregator.Add("", recommends, scores)
if err := m.CacheClient.AddDocuments(ctx, cache.UserNeighbors, userId, aggregator.ToSlice()); err != nil {
if err := m.CacheClient.AddScores(ctx, "", cache.UserNeighbors, userId, aggregator.ToSlice()); err != nil {
return errors.Trace(err)
}
if err := m.CacheClient.DeleteDocuments(ctx, []string{cache.UserNeighbors}, cache.DocumentCondition{
if err := m.CacheClient.DeleteScores(ctx, "", []string{cache.UserNeighbors}, cache.ScoreCondition{
Subset: proto.String(userId),
Before: &aggregator.Timestamp,
}); err != nil {
Expand Down Expand Up @@ -808,10 +808,10 @@ func (m *Master) findUserNeighborsIVF(ctx context.Context, dataset *ranking.Data
}
aggregator := cache.NewDocumentAggregator(startSearchTime)
aggregator.Add("", resultValues, resultScores)
if err := m.CacheClient.AddDocuments(ctx, cache.UserNeighbors, userId, aggregator.ToSlice()); err != nil {
if err := m.CacheClient.AddScores(ctx, "", cache.UserNeighbors, userId, aggregator.ToSlice()); err != nil {
return errors.Trace(err)
}
if err := m.CacheClient.DeleteDocuments(ctx, []string{cache.UserNeighbors}, cache.DocumentCondition{
if err := m.CacheClient.DeleteScores(ctx, "", []string{cache.UserNeighbors}, cache.ScoreCondition{
Subset: proto.String(userId),
Before: &aggregator.Timestamp,
}); err != nil {
Expand Down Expand Up @@ -872,7 +872,7 @@ func (m *Master) checkUserNeighborCacheTimeout(userId string) bool {
)
ctx := context.Background()
// check cache
if items, err := m.CacheClient.SearchDocuments(ctx, cache.UserNeighbors, userId, []string{""}, 0, -1); err != nil {
if items, err := m.CacheClient.SearchScores(ctx, "", cache.UserNeighbors, userId, "", []string{""}, 0, -1); err != nil {
log.Logger().Error("failed to load user neighbors", zap.String("user_id", userId), zap.Error(err))
return true
} else if len(items) == 0 {
Expand Down Expand Up @@ -927,7 +927,7 @@ func (m *Master) checkItemNeighborCacheTimeout(itemId string, categories []strin

// check cache
for _, category := range append([]string{""}, categories...) {
items, err := m.CacheClient.SearchDocuments(ctx, cache.ItemNeighbors, itemId, []string{category}, 0, -1)
items, err := m.CacheClient.SearchScores(ctx, "", cache.ItemNeighbors, itemId, "", []string{category}, 0, -1)
if err != nil {
log.Logger().Error("failed to load item neighbors", zap.String("item_id", itemId), zap.Error(err))
return true
Expand Down
Loading
Loading