Skip to content

Commit

Permalink
Improve removeExistingDocuments performance by adding cache
Browse files Browse the repository at this point in the history
Signed-off-by: Gao Hongtao <[email protected]>
  • Loading branch information
hanahmily committed Dec 25, 2024
1 parent 279547f commit e54f64b
Show file tree
Hide file tree
Showing 11 changed files with 456 additions and 229 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,4 @@ vendor/**
/search/query/y.output
*.test
tags
*.prof
4 changes: 2 additions & 2 deletions batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,14 +20,14 @@ import (

const _idField = "_id"

type Identifier string
type Identifier []byte

func (i Identifier) Field() string {
return _idField
}

func (i Identifier) Term() []byte {
return []byte(i)
return i
}

// NewBatch creates a new empty batch.
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ require (
)

require (
github.com/VictoriaMetrics/fastcache v1.12.2 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect
github.com/golang/snappy v0.0.4 // indirect
github.com/inconshreveable/mousetrap v1.1.0 // indirect
github.com/klauspost/compress v1.17.11 // indirect
github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 // indirect
Expand Down
9 changes: 9 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -3,7 +3,10 @@ github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv
github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90=
github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 h1:Ja62sgOCp2qPTd8Xmldv1U83v11IRIsh6KlB7UaFLj4=
github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118/go.mod h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI=
github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI=
github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI=
github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw=
github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM=
github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo=
github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM=
github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA=
Expand All @@ -22,6 +25,8 @@ github.com/blevesearch/vellum v1.0.10 h1:HGPJDT2bTva12hrHepVT3rOyIKFFF4t7Gf6yMxy
github.com/blevesearch/vellum v1.0.10/go.mod h1:ul1oT0FhSMDIExNjIxHqJoGpVrBpKCdgDQNxfqgJt7k=
github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds=
github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI=
github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44=
github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs=
github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o=
github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
Expand All @@ -30,6 +35,8 @@ github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 h1:y7y0Oa6UawqTFP
github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw=
github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k=
github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k=
github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM=
github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q=
github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8=
github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw=
github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes=
Expand All @@ -47,6 +54,7 @@ github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3k
github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA=
github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg=
github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME=
github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI=
github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4=
github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg=
github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg=
Expand All @@ -58,6 +66,7 @@ golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL
golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA=
golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs=
golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg=
golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo=
golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA=
golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM=
Expand Down
6 changes: 6 additions & 0 deletions index/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,12 @@ func (b *Batch) Reset() {
b.persistedCallback = nil
b.unparsedDocuments = b.unparsedDocuments[:0]
b.unparsedIDs = b.unparsedIDs[:0]
b.fieldNames = b.fieldNames[:0]
}

func (b *Batch) ResetDoc() {
b.documents = b.documents[:0]
b.ids = b.ids[:0]
}

func (b *Batch) SetPersistedCallback(f func(error)) {
Expand Down
2 changes: 2 additions & 0 deletions index/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -78,6 +78,8 @@ type Config struct {
ValidateSnapshotCRC bool

virtualFields map[string][]segment.Field

CacheMaxBytes int
}

func (config Config) WithSegmentType(typ string) Config {
Expand Down
23 changes: 22 additions & 1 deletion index/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,8 @@ package index
import (
"reflect"
"sync/atomic"

"github.com/VictoriaMetrics/fastcache"
)

func (s *Writer) DirectoryStats() (numFilesOnDisk, numBytesUsedDisk uint64) {
Expand All @@ -30,7 +32,19 @@ func (s *Writer) Stats() Stats {
// Update the stats atomically
atomic.StoreUint64(&s.stats.CurOnDiskBytes, numBytesUsedDisk)
atomic.StoreUint64(&s.stats.CurOnDiskFiles, numFilesOnDisk)
return s.stats.Clone()
stats := s.stats.Clone()
c := s.cache.Load()
if c != nil {
var cs fastcache.Stats
c.UpdateStats(&cs)
stats.CacheGetCalls = cs.GetCalls
stats.CacheSetCalls = cs.SetCalls
stats.CacheMisses = cs.Misses
stats.CacheEntriesCount = cs.EntriesCount
stats.CacheBytesSize = cs.BytesSize
stats.CacheMaxBytesSize = cs.MaxBytesSize
}
return stats
}

// Stats tracks statistics about the index, fields that are
Expand Down Expand Up @@ -152,6 +166,13 @@ type Stats struct {
newSegBufBytesRemoved uint64
analysisBytesAdded uint64
analysisBytesRemoved uint64

CacheGetCalls uint64
CacheSetCalls uint64
CacheMisses uint64
CacheEntriesCount uint64
CacheBytesSize uint64
CacheMaxBytesSize uint64
}

func (s *Stats) ToMap() map[string]interface{} {
Expand Down
87 changes: 71 additions & 16 deletions index/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ import (
"sync/atomic"
"time"

"github.com/VictoriaMetrics/fastcache"
"github.com/bits-and-blooms/bitset"
segment "github.com/blugelabs/bluge_segment_api"

"github.com/RoaringBitmap/roaring"
Expand Down Expand Up @@ -53,6 +55,8 @@ type Writer struct {
asyncTasks sync.WaitGroup

closeOnce sync.Once

cache atomic.Pointer[fastcache.Cache]
}

func OpenWriter(config Config) (*Writer, error) {
Expand All @@ -63,6 +67,10 @@ func OpenWriter(config Config) (*Writer, error) {
closeCh: make(chan struct{}),
}

if config.CacheMaxBytes > 0 {
rv.cache.Store(fastcache.New(config.CacheMaxBytes))
}

// start the requested number of analysis workers
for i := 0; i < config.NumAnalysisWorkers; i++ {
config.GoFunc(func() {
Expand Down Expand Up @@ -195,10 +203,19 @@ func (s *Writer) fireAsyncError(err error) {
func (s *Writer) Close() (err error) {
s.closeOnce.Do(func() {
err = s.close()
s.ResetCache()
})
return err
}

func (s *Writer) ResetCache() {
c := s.cache.Load()
if c != nil {
c.Reset()
s.cache.Store(nil)
}
}

func (s *Writer) close() (err error) {
startTime := time.Now()
defer func() {
Expand Down Expand Up @@ -296,38 +313,76 @@ func (s *Writer) Batch(batch *Batch) (err error) {
return err
}

var id = "_id"

func (s *Writer) removeExistingDocuments(batch *Batch) error {
if len(batch.unparsedIDs) == 0 {
return nil
}

root := s.currentSnapshot()
defer func() { _ = root.Close() }()
removeIDMap := bitset.New(uint(len(batch.unparsedIDs)))

var dict segment.Dictionary
var err error
for _, seg := range root.segment {
dict, err := seg.segment.Dictionary(batch.unparsedIDs[0].Field())
if err != nil {
return err
}

for i := 0; i < len(batch.unparsedIDs); i++ {
if ok, _ := dict.Contains(batch.unparsedIDs[i].Term()); !ok {
dict = nil
ff := seg.segment.Fields()
for i := uint(0); i < uint(len(batch.unparsedIDs)); i++ {
if removeIDMap.Test(i) {
continue
}
idTerm := batch.unparsedIDs[i].Term()
c := s.cache.Load()
if c != nil {
if !c.Has(idTerm) {
if dict == nil {
dict, err = seg.segment.Dictionary(id)
if err != nil {
return err
}
}
if ok, _ := dict.Contains(idTerm); !ok {
continue
}
c.Set(idTerm, nil)
}
} else {
if dict == nil {
dict, err = seg.segment.Dictionary(id)
if err != nil {
return err
}
}
if ok, _ := dict.Contains(idTerm); !ok {
continue
}
}

fn := batch.fieldNames[i]
if len(fn) > 0 {
if anyItemNotExist(fn, seg.segment.Fields()) {
if anyItemNotExist(fn, ff) {
continue
}
}
batch.unparsedDocuments = append(batch.unparsedDocuments[:i], batch.unparsedDocuments[i+1:]...)
batch.unparsedIDs = append(batch.unparsedIDs[:i], batch.unparsedIDs[i+1:]...)
batch.fieldNames = append(batch.fieldNames[:i], batch.fieldNames[i+1:]...)
i--
if len(batch.unparsedDocuments) == 0 {
removeIDMap.Set(i)
if removeIDMap.All() {
return nil
}
}
}
if len(batch.unparsedDocuments) > 0 {
batch.documents = append(batch.documents, batch.unparsedDocuments...)
batch.ids = append(batch.ids, batch.unparsedIDs...)
if removeIDMap.Any() {
for i := uint(0); i < uint(len(batch.unparsedIDs)); i++ {
if removeIDMap.Test(i) {
continue
}
batch.documents = append(batch.documents, batch.unparsedDocuments[i])
batch.ids = append(batch.ids, batch.unparsedIDs[i])
}
} else {
batch.documents = batch.unparsedDocuments
batch.ids = batch.unparsedIDs
}
return nil
}
Expand Down
98 changes: 98 additions & 0 deletions index/writer_benchmark_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,98 @@
// Copyright (c) 2020 Couchbase, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package index

import (
"fmt"
"os"
"runtime/pprof"
"testing"
"time"
)

// goos: darwin
// goarch: arm64
// pkg: github.com/blugelabs/bluge/index
// cpu: Apple M1 Pro
// BenchmarkWriter_removeExistingDocuments-NoCache 24632 41085 ns/op 18711 B/op 204 allocs/op
// BenchmarkWriter_removeExistingDocuments-Cache 161628 6865 ns/op 2456 B/op 102 allocs/op
func BenchmarkWriter_removeExistingDocuments(b *testing.B) {
cfg, cleanup := CreateConfig("BenchmarkWriter_removeExistingDocuments")
cfg.CacheMaxBytes = 100 << 20
defer func() {
err := cleanup()
if err != nil {
b.Log(err)
}
}()

idx, err := OpenWriter(cfg)
if err != nil {
b.Fatal(err)
}
defer func() {
err = idx.Close()
if err != nil {
b.Fatal(err)
}
}()
for i := 0; i < 3000; i += 100 {
batch := NewBatch()
for j := 0; j < 100; j++ {
serviceName := fmt.Sprintf("service-%d", (i+j)%10) // 10 different service names
ipAddress := fmt.Sprintf("192.168.%d.%d", (i+j)/256, (i+j)%256) // IP addresses
docID := fmt.Sprintf("%s-%s", serviceName, ipAddress)
doc := &FakeDocument{
NewFakeField("_id", docID, true, false, true),
NewFakeField("title", fmt.Sprintf("mister-%d", i), true, false, true),
}
batch.Insert(doc)
}
if err := idx.Batch(batch); err != nil {
b.Fatalf("failed to apply batch: %v", err)
}
}
time.Sleep(1 * time.Second)
batchRemove := NewBatch()
for j := 0; j < 100; j++ {
serviceName := fmt.Sprintf("service-%d", j%10) // 10 different service names
ipAddress := fmt.Sprintf("192.168.%d.%d", j/256, j%256) // IP addresses
docID := fmt.Sprintf("%s-%s", serviceName, ipAddress) // Document ID composed of service name and IP address
doc := &FakeDocument{
NewFakeField("_id", docID, true, false, true),
NewFakeField("title", fmt.Sprintf("mister-%d", j), true, false, true),
}
batchRemove.InsertIfAbsent(testIdentifier(docID), []string{"title"}, doc)
}

// Start profiling
f, err := os.Create("cpu.prof")
if err != nil {
b.Fatal(err)
}
defer f.Close()
if err := pprof.StartCPUProfile(f); err != nil {
b.Fatal(err)
}
defer pprof.StopCPUProfile()
b.ResetTimer()
for i := 0; i < b.N; i++ {
batchRemove.ResetDoc()
err := idx.removeExistingDocuments(batchRemove)
if err != nil {
b.Fatal(err)
}
}
}
Loading

0 comments on commit e54f64b

Please sign in to comment.