From e54f64be56e84e155e9074a442ee976ae333abc0 Mon Sep 17 00:00:00 2001 From: Gao Hongtao Date: Wed, 25 Dec 2024 13:49:54 +0800 Subject: [PATCH] Improve removeExistingDocuments performance by adding cache Signed-off-by: Gao Hongtao --- .gitignore | 1 + batch.go | 4 +- go.mod | 3 + go.sum | 9 + index/batch.go | 6 + index/config.go | 2 + index/stats.go | 23 +- index/writer.go | 87 +++++-- index/writer_benchmark_test.go | 98 ++++++++ index/writer_test.go | 448 +++++++++++++++++---------------- writer.go | 4 + 11 files changed, 456 insertions(+), 229 deletions(-) create mode 100644 index/writer_benchmark_test.go diff --git a/.gitignore b/.gitignore index 3bd3ef8..ae5846e 100644 --- a/.gitignore +++ b/.gitignore @@ -17,3 +17,4 @@ vendor/** /search/query/y.output *.test tags +*.prof diff --git a/batch.go b/batch.go index d1f9d34..5344028 100644 --- a/batch.go +++ b/batch.go @@ -20,14 +20,14 @@ import ( const _idField = "_id" -type Identifier string +type Identifier []byte func (i Identifier) Field() string { return _idField } func (i Identifier) Term() []byte { - return []byte(i) + return i } // NewBatch creates a new empty batch. diff --git a/go.mod b/go.mod index e40dc2a..6741114 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,10 @@ require ( ) require ( + github.com/VictoriaMetrics/fastcache v1.12.2 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 // indirect + github.com/golang/snappy v0.0.4 // indirect github.com/inconshreveable/mousetrap v1.1.0 // indirect github.com/klauspost/compress v1.17.11 // indirect github.com/leesper/go_rng v0.0.0-20190531154944-a612b043e353 // indirect diff --git a/go.sum b/go.sum index ceedcd1..8b61322 100644 --- a/go.sum +++ b/go.sum @@ -3,7 +3,10 @@ github.com/RoaringBitmap/roaring v1.9.4 h1:yhEIoH4YezLYT04s1nHehNO64EKFTop/wBhxv github.com/RoaringBitmap/roaring v1.9.4/go.mod h1:6AXUsoIEzDTFFQCe1RbGA6uFONMhvejWj5rqITANK90= github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118 h1:Ja62sgOCp2qPTd8Xmldv1U83v11IRIsh6KlB7UaFLj4= github.com/SkyAPM/ice v0.0.0-20241108011032-c3d8eea75118/go.mod h1:DoQeb0Ee86LyruZSL77Ddscfk/THJ38x453CRCnGEPI= +github.com/VictoriaMetrics/fastcache v1.12.2 h1:N0y9ASrJ0F6h0QaC3o6uJb3NIZ9VKLjCM7NQbSmF7WI= +github.com/VictoriaMetrics/fastcache v1.12.2/go.mod h1:AmC+Nzz1+3G2eCPapF6UcsnkThDcMsQicp4xDukwJYI= github.com/ajstarks/svgo v0.0.0-20180226025133-644b8db467af/go.mod h1:K08gAheRH3/J6wwsYMMT4xOr94bZjxIelGM0+d/wbFw= +github.com/allegro/bigcache v1.2.1-0.20190218064605-e24eb225f156/go.mod h1:Cb/ax3seSYIx7SuZdm2G2xzfwmv3TPSk2ucNfQESPXM= github.com/axiomhq/hyperloglog v0.2.0 h1:u1XT3yyY1rjzlWuP6NQIrV4bRYHOaqZaovqjcBEvZJo= github.com/axiomhq/hyperloglog v0.2.0/go.mod h1:GcgMjz9gaDKZ3G0UMS6Fq/VkZ4l7uGgcJyxA7M+omIM= github.com/bits-and-blooms/bitset v1.2.0/go.mod h1:gIdJ4wp64HaoK2YrL1Q5/N7Y16edYb8uY+O0FJTyyDA= @@ -22,6 +25,8 @@ github.com/blevesearch/vellum v1.0.10 h1:HGPJDT2bTva12hrHepVT3rOyIKFFF4t7Gf6yMxy github.com/blevesearch/vellum v1.0.10/go.mod h1:ul1oT0FhSMDIExNjIxHqJoGpVrBpKCdgDQNxfqgJt7k= github.com/caio/go-tdigest v3.1.0+incompatible h1:uoVMJ3Q5lXmVLCCqaMGHLBWnbGoN6Lpu7OAUPR60cds= github.com/caio/go-tdigest v3.1.0+incompatible/go.mod h1:sHQM/ubZStBUmF1WbB8FAm8q9GjDajLC5T7ydxE3JHI= +github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cpuguy83/go-md2man/v2 v2.0.4/go.mod h1:tgQtvFlXSQOSOSIRvRPT7W67SCa46tRHOmNcaadrF8o= github.com/davecgh/go-spew v1.1.0/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38= github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c= @@ -30,6 +35,8 @@ github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140 h1:y7y0Oa6UawqTFP github.com/dgryski/go-metro v0.0.0-20211217172704-adc40b04c140/go.mod h1:c9O8+fpSOX1DM8cPNSkX/qsBWdkD4yd2dpciOWQjpBw= github.com/fogleman/gg v1.2.1-0.20190220221249-0403632d5b90/go.mod h1:R/bRT+9gY/C5z7JzPU0zXsXHKM4/ayA+zqcVNZzPa1k= github.com/golang/freetype v0.0.0-20170609003504-e2365dfdc4a0/go.mod h1:E/TSTwGwJL78qG/PmXZO1EjYhfJinVAhrmmHX6Z8B9k= +github.com/golang/snappy v0.0.4 h1:yAGX7huGHXlcLOEtBnF4w7FQwA26wojNCwOYAEhLjQM= +github.com/golang/snappy v0.0.4/go.mod h1:/XxbfmMg8lxefKM7IXC3fBNl/7bRcc72aCRzEWrmP2Q= github.com/inconshreveable/mousetrap v1.1.0 h1:wN+x4NVGpMsO7ErUn/mUI3vEoE6Jt13X2s0bqwp9tc8= github.com/inconshreveable/mousetrap v1.1.0/go.mod h1:vpF70FUmC8bwa3OWnCshd2FqLfsEA9PFc4w1p2J65bw= github.com/jung-kurt/gofpdf v1.0.3-0.20190309125859-24315acbbda5/go.mod h1:7Id9E/uU8ce6rXgefFLlgrJj/GYY22cpxn+r32jIOes= @@ -47,6 +54,7 @@ github.com/spf13/cobra v1.8.1/go.mod h1:wHxEcudfqmLYa8iTfL+OuZPbBZkmvliBWKIezN3k github.com/spf13/pflag v1.0.5 h1:iy+VFUOCP1a+8yFto/drg2CJ5u0yRoB7fZw3DKv/JXA= github.com/spf13/pflag v1.0.5/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= +github.com/stretchr/testify v1.3.0/go.mod h1:M5WIy9Dh21IEIfnGCwXGc5bZfKNJtfHm1UVUgZn+9EI= github.com/stretchr/testify v1.4.0/go.mod h1:j7eGeouHqKxXV5pUuKE4zz7dFj8WfuZ+81PSLYec5m4= github.com/stretchr/testify v1.7.0/go.mod h1:6Fq8oRcR53rry900zMqJjRRixrwX3KX962/h/Wwjteg= github.com/stretchr/testify v1.9.0 h1:HtqpIVDClZ4nwg75+f6Lvsy/wHu+3BoSGCbBAcpTsTg= @@ -58,6 +66,7 @@ golang.org/x/exp v0.0.0-20180807140117-3d87b88a115f/go.mod h1:CJ0aWSM057203Lf6IL golang.org/x/exp v0.0.0-20190125153040-c74c464bbbf2/go.mod h1:CJ0aWSM057203Lf6IL+f9T1iT9GByDxfZKAQTCR3kQA= golang.org/x/image v0.0.0-20180708004352-c73c2afc3b81/go.mod h1:ux5Hcp/YLpHSI86hEcLt0YII63i6oz57MZXIpbrjZUs= golang.org/x/sys v0.0.0-20220520151302-bc2c85ada10a/go.mod h1:oPkhp1MJrh7nUepCBck5+mAzfO9JrbApNNgaTdGDITg= +golang.org/x/sys v0.14.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/sys v0.26.0 h1:KHjCJyddX0LoSTb3J+vWpupP9p0oznkqVk/IfjymZbo= golang.org/x/sys v0.26.0/go.mod h1:/VUhepiaJMQUp4+oa/7Zr1D23ma6VTLIYjOOTFZPUcA= golang.org/x/text v0.19.0 h1:kTxAhCbGbxhK0IwgSKiMO5awPoDQ0RpfiVYBfK860YM= diff --git a/index/batch.go b/index/batch.go index 083d38e..77f5a6e 100644 --- a/index/batch.go +++ b/index/batch.go @@ -54,6 +54,12 @@ func (b *Batch) Reset() { b.persistedCallback = nil b.unparsedDocuments = b.unparsedDocuments[:0] b.unparsedIDs = b.unparsedIDs[:0] + b.fieldNames = b.fieldNames[:0] +} + +func (b *Batch) ResetDoc() { + b.documents = b.documents[:0] + b.ids = b.ids[:0] } func (b *Batch) SetPersistedCallback(f func(error)) { diff --git a/index/config.go b/index/config.go index 9a4b2d8..e9569d3 100644 --- a/index/config.go +++ b/index/config.go @@ -78,6 +78,8 @@ type Config struct { ValidateSnapshotCRC bool virtualFields map[string][]segment.Field + + CacheMaxBytes int } func (config Config) WithSegmentType(typ string) Config { diff --git a/index/stats.go b/index/stats.go index e3a9b97..1da3625 100644 --- a/index/stats.go +++ b/index/stats.go @@ -17,6 +17,8 @@ package index import ( "reflect" "sync/atomic" + + "github.com/VictoriaMetrics/fastcache" ) func (s *Writer) DirectoryStats() (numFilesOnDisk, numBytesUsedDisk uint64) { @@ -30,7 +32,19 @@ func (s *Writer) Stats() Stats { // Update the stats atomically atomic.StoreUint64(&s.stats.CurOnDiskBytes, numBytesUsedDisk) atomic.StoreUint64(&s.stats.CurOnDiskFiles, numFilesOnDisk) - return s.stats.Clone() + stats := s.stats.Clone() + c := s.cache.Load() + if c != nil { + var cs fastcache.Stats + c.UpdateStats(&cs) + stats.CacheGetCalls = cs.GetCalls + stats.CacheSetCalls = cs.SetCalls + stats.CacheMisses = cs.Misses + stats.CacheEntriesCount = cs.EntriesCount + stats.CacheBytesSize = cs.BytesSize + stats.CacheMaxBytesSize = cs.MaxBytesSize + } + return stats } // Stats tracks statistics about the index, fields that are @@ -152,6 +166,13 @@ type Stats struct { newSegBufBytesRemoved uint64 analysisBytesAdded uint64 analysisBytesRemoved uint64 + + CacheGetCalls uint64 + CacheSetCalls uint64 + CacheMisses uint64 + CacheEntriesCount uint64 + CacheBytesSize uint64 + CacheMaxBytesSize uint64 } func (s *Stats) ToMap() map[string]interface{} { diff --git a/index/writer.go b/index/writer.go index f7e3aec..91e1a52 100644 --- a/index/writer.go +++ b/index/writer.go @@ -24,6 +24,8 @@ import ( "sync/atomic" "time" + "github.com/VictoriaMetrics/fastcache" + "github.com/bits-and-blooms/bitset" segment "github.com/blugelabs/bluge_segment_api" "github.com/RoaringBitmap/roaring" @@ -53,6 +55,8 @@ type Writer struct { asyncTasks sync.WaitGroup closeOnce sync.Once + + cache atomic.Pointer[fastcache.Cache] } func OpenWriter(config Config) (*Writer, error) { @@ -63,6 +67,10 @@ func OpenWriter(config Config) (*Writer, error) { closeCh: make(chan struct{}), } + if config.CacheMaxBytes > 0 { + rv.cache.Store(fastcache.New(config.CacheMaxBytes)) + } + // start the requested number of analysis workers for i := 0; i < config.NumAnalysisWorkers; i++ { config.GoFunc(func() { @@ -195,10 +203,19 @@ func (s *Writer) fireAsyncError(err error) { func (s *Writer) Close() (err error) { s.closeOnce.Do(func() { err = s.close() + s.ResetCache() }) return err } +func (s *Writer) ResetCache() { + c := s.cache.Load() + if c != nil { + c.Reset() + s.cache.Store(nil) + } +} + func (s *Writer) close() (err error) { startTime := time.Now() defer func() { @@ -296,38 +313,76 @@ func (s *Writer) Batch(batch *Batch) (err error) { return err } +var id = "_id" + func (s *Writer) removeExistingDocuments(batch *Batch) error { + if len(batch.unparsedIDs) == 0 { + return nil + } + root := s.currentSnapshot() defer func() { _ = root.Close() }() + removeIDMap := bitset.New(uint(len(batch.unparsedIDs))) + var dict segment.Dictionary + var err error for _, seg := range root.segment { - dict, err := seg.segment.Dictionary(batch.unparsedIDs[0].Field()) - if err != nil { - return err - } - - for i := 0; i < len(batch.unparsedIDs); i++ { - if ok, _ := dict.Contains(batch.unparsedIDs[i].Term()); !ok { + dict = nil + ff := seg.segment.Fields() + for i := uint(0); i < uint(len(batch.unparsedIDs)); i++ { + if removeIDMap.Test(i) { continue } + idTerm := batch.unparsedIDs[i].Term() + c := s.cache.Load() + if c != nil { + if !c.Has(idTerm) { + if dict == nil { + dict, err = seg.segment.Dictionary(id) + if err != nil { + return err + } + } + if ok, _ := dict.Contains(idTerm); !ok { + continue + } + c.Set(idTerm, nil) + } + } else { + if dict == nil { + dict, err = seg.segment.Dictionary(id) + if err != nil { + return err + } + } + if ok, _ := dict.Contains(idTerm); !ok { + continue + } + } + fn := batch.fieldNames[i] if len(fn) > 0 { - if anyItemNotExist(fn, seg.segment.Fields()) { + if anyItemNotExist(fn, ff) { continue } } - batch.unparsedDocuments = append(batch.unparsedDocuments[:i], batch.unparsedDocuments[i+1:]...) - batch.unparsedIDs = append(batch.unparsedIDs[:i], batch.unparsedIDs[i+1:]...) - batch.fieldNames = append(batch.fieldNames[:i], batch.fieldNames[i+1:]...) - i-- - if len(batch.unparsedDocuments) == 0 { + removeIDMap.Set(i) + if removeIDMap.All() { return nil } } } - if len(batch.unparsedDocuments) > 0 { - batch.documents = append(batch.documents, batch.unparsedDocuments...) - batch.ids = append(batch.ids, batch.unparsedIDs...) + if removeIDMap.Any() { + for i := uint(0); i < uint(len(batch.unparsedIDs)); i++ { + if removeIDMap.Test(i) { + continue + } + batch.documents = append(batch.documents, batch.unparsedDocuments[i]) + batch.ids = append(batch.ids, batch.unparsedIDs[i]) + } + } else { + batch.documents = batch.unparsedDocuments + batch.ids = batch.unparsedIDs } return nil } diff --git a/index/writer_benchmark_test.go b/index/writer_benchmark_test.go new file mode 100644 index 0000000..7805588 --- /dev/null +++ b/index/writer_benchmark_test.go @@ -0,0 +1,98 @@ +// Copyright (c) 2020 Couchbase, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package index + +import ( + "fmt" + "os" + "runtime/pprof" + "testing" + "time" +) + +// goos: darwin +// goarch: arm64 +// pkg: github.com/blugelabs/bluge/index +// cpu: Apple M1 Pro +// BenchmarkWriter_removeExistingDocuments-NoCache 24632 41085 ns/op 18711 B/op 204 allocs/op +// BenchmarkWriter_removeExistingDocuments-Cache 161628 6865 ns/op 2456 B/op 102 allocs/op +func BenchmarkWriter_removeExistingDocuments(b *testing.B) { + cfg, cleanup := CreateConfig("BenchmarkWriter_removeExistingDocuments") + cfg.CacheMaxBytes = 100 << 20 + defer func() { + err := cleanup() + if err != nil { + b.Log(err) + } + }() + + idx, err := OpenWriter(cfg) + if err != nil { + b.Fatal(err) + } + defer func() { + err = idx.Close() + if err != nil { + b.Fatal(err) + } + }() + for i := 0; i < 3000; i += 100 { + batch := NewBatch() + for j := 0; j < 100; j++ { + serviceName := fmt.Sprintf("service-%d", (i+j)%10) // 10 different service names + ipAddress := fmt.Sprintf("192.168.%d.%d", (i+j)/256, (i+j)%256) // IP addresses + docID := fmt.Sprintf("%s-%s", serviceName, ipAddress) + doc := &FakeDocument{ + NewFakeField("_id", docID, true, false, true), + NewFakeField("title", fmt.Sprintf("mister-%d", i), true, false, true), + } + batch.Insert(doc) + } + if err := idx.Batch(batch); err != nil { + b.Fatalf("failed to apply batch: %v", err) + } + } + time.Sleep(1 * time.Second) + batchRemove := NewBatch() + for j := 0; j < 100; j++ { + serviceName := fmt.Sprintf("service-%d", j%10) // 10 different service names + ipAddress := fmt.Sprintf("192.168.%d.%d", j/256, j%256) // IP addresses + docID := fmt.Sprintf("%s-%s", serviceName, ipAddress) // Document ID composed of service name and IP address + doc := &FakeDocument{ + NewFakeField("_id", docID, true, false, true), + NewFakeField("title", fmt.Sprintf("mister-%d", j), true, false, true), + } + batchRemove.InsertIfAbsent(testIdentifier(docID), []string{"title"}, doc) + } + + // Start profiling + f, err := os.Create("cpu.prof") + if err != nil { + b.Fatal(err) + } + defer f.Close() + if err := pprof.StartCPUProfile(f); err != nil { + b.Fatal(err) + } + defer pprof.StopCPUProfile() + b.ResetTimer() + for i := 0; i < b.N; i++ { + batchRemove.ResetDoc() + err := idx.removeExistingDocuments(batchRemove) + if err != nil { + b.Fatal(err) + } + } +} diff --git a/index/writer_test.go b/index/writer_test.go index 2b8c2a7..1cb40e5 100644 --- a/index/writer_test.go +++ b/index/writer_test.go @@ -1633,242 +1633,270 @@ func TestIndexSeekBackwardsStats(t *testing.T) { } func TestBatch_InsertIfAbsent(t *testing.T) { - cfg, cleanup := CreateConfig("TestBatch_InsertIfAbsent") - defer func() { - err := cleanup() - if err != nil { - t.Log(err) - } - }() - - idx, err := OpenWriter(cfg) - if err != nil { - t.Fatal(err) - } - defer func() { - err := idx.Close() - if err != nil { - t.Fatal(err) - } - }() - - var expectedCount uint64 - - // Verify initial document count is zero - reader, err := idx.Reader() - if err != nil { - t.Fatal(err) - } - docCount, err := reader.Count() - if err != nil { - t.Error(err) - } - if docCount != expectedCount { - t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) - } - err = reader.Close() - if err != nil { - t.Fatal(err) - } + tests := []struct { + name string + cacheMaxBytes int + }{ + {"WithoutCache", 0}, + {"WithCache", 1024 * 1024}, // 1MB cache + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg, cleanup := CreateConfig("TestBatch_InsertIfAbsent") + defer func() { + err := cleanup() + if err != nil { + t.Log(err) + } + }() + + cfg.CacheMaxBytes = tt.cacheMaxBytes + + idx, err := OpenWriter(cfg) + if err != nil { + t.Fatal(err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() + + var expectedCount uint64 + + // Verify initial document count is zero + reader, err := idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err := reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } - // Insert a document using InsertIfAbsent - docID := "doc-1" - doc := &FakeDocument{ - NewFakeField("_id", docID, true, false, false), - NewFakeField("title", "mister", false, false, true), - } - batch := NewBatch() - batch.InsertIfAbsent(testIdentifier(docID), []string{"title"}, doc) + // Insert a document using InsertIfAbsent + docID := "doc-1" + doc := &FakeDocument{ + NewFakeField("_id", docID, true, false, false), + NewFakeField("title", "mister", false, false, true), + } + batch := NewBatch() + batch.InsertIfAbsent(testIdentifier(docID), []string{"title"}, doc) - // Apply the batch - if err := idx.Batch(batch); err != nil { - t.Fatalf("failed to apply batch: %v", err) - } - expectedCount++ + // Apply the batch + if err := idx.Batch(batch); err != nil { + t.Fatalf("failed to apply batch: %v", err) + } + expectedCount++ - // Verify document count after insertion - reader, err = idx.Reader() - if err != nil { - t.Fatal(err) - } - docCount, err = reader.Count() - if err != nil { - t.Error(err) - } - if docCount != expectedCount { - t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) - } - err = reader.Close() - if err != nil { - t.Fatal(err) - } + // Verify document count after insertion + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } - // Attempt to InsertIfAbsent with the same ID - docDuplicate := &FakeDocument{ - NewFakeField("_id", docID, true, false, false), - NewFakeField("title", "mister2", true, false, true), - } - batchDuplicate := NewBatch() - batchDuplicate.InsertIfAbsent(testIdentifier(docID), []string{"title"}, docDuplicate) + // Attempt to InsertIfAbsent with the same ID + docDuplicate := &FakeDocument{ + NewFakeField("_id", docID, true, false, false), + NewFakeField("title", "mister2", true, false, true), + } + batchDuplicate := NewBatch() + batchDuplicate.InsertIfAbsent(testIdentifier(docID), []string{"title"}, docDuplicate) - // Apply the duplicate batch - if err := idx.Batch(batchDuplicate); err != nil { - t.Fatalf("failed to apply duplicate batch: %v", err) - } + // Apply the duplicate batch + if err := idx.Batch(batchDuplicate); err != nil { + t.Fatalf("failed to apply duplicate batch: %v", err) + } - // Since it's InsertIfAbsent, the document should not be duplicated - // Verify document count remains the same - reader, err = idx.Reader() - if err != nil { - t.Fatal(err) - } - docCount, err = reader.Count() - if err != nil { - t.Error(err) - } - if docCount != expectedCount { - t.Errorf("Expected document count to be %d after duplicate insert, got %d", expectedCount, docCount) - } + // Since it's InsertIfAbsent, the document should not be duplicated + // Verify document count remains the same + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d after duplicate insert, got %d", expectedCount, docCount) + } - docNum1, err := findNumberByID(reader, docID) - if err != nil { - t.Fatal(err) - } + docNum1, err := findNumberByID(reader, docID) + if err != nil { + t.Fatal(err) + } - dvr, err := reader.DocumentValueReader([]string{"title"}) - if err != nil { - t.Fatal(err) - } - err = dvr.VisitDocumentValues(docNum1, func(field string, term []byte) { - if field == "title" { - if string(term) != "mister" { - t.Errorf("expected title to be 'First Document', got '%s'", string(term)) + dvr, err := reader.DocumentValueReader([]string{"title"}) + if err != nil { + t.Fatal(err) + } + err = dvr.VisitDocumentValues(docNum1, func(field string, term []byte) { + if field == "title" { + if string(term) != "mister" { + t.Errorf("expected title to be 'mister', got '%s'", string(term)) + } + } + }) + if err != nil { + t.Fatal(err) } - } - }) - if err != nil { - t.Fatal(err) - } - err = reader.VisitStoredFields(docNum1, func(field string, value []byte) bool { - if field == "title" { - if string(value) != "mister" { - t.Errorf("expected title to be 'mister', got '%s'", string(value)) + err = reader.VisitStoredFields(docNum1, func(field string, value []byte) bool { + if field == "title" { + if string(value) != "mister" { + t.Errorf("expected title to be 'mister', got '%s'", string(value)) + } + } + return true + }) + if err != nil { + t.Fatal(err) } - } - return true - }) - if err != nil { - t.Fatal(err) - } - err = reader.Close() - if err != nil { - t.Fatal(err) + err = reader.Close() + if err != nil { + t.Fatal(err) + } + }) } } func TestBatch_InsertAndUpdateContent(t *testing.T) { - cfg, cleanup := CreateConfig("TestBatch_InsertAndUpdateContent") - defer func() { - err := cleanup() - if err != nil { - t.Log(err) - } - }() - - idx, err := OpenWriter(cfg) - if err != nil { - t.Fatal(err) - } - defer func() { - err := idx.Close() - if err != nil { - t.Fatal(err) - } - }() + tests := []struct { + name string + cacheMaxBytes int + }{ + {"WithoutCache", 0}, + {"WithCache", 1024 * 1024}, // 1MB cache + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + cfg, cleanup := CreateConfig("TestBatch_InsertAndUpdateContent") + defer func() { + err := cleanup() + if err != nil { + t.Log(err) + } + }() + + cfg.CacheMaxBytes = tt.cacheMaxBytes + + idx, err := OpenWriter(cfg) + if err != nil { + t.Fatal(err) + } + defer func() { + err := idx.Close() + if err != nil { + t.Fatal(err) + } + }() - var expectedCount uint64 + var expectedCount uint64 - // Insert a document - docID := "doc-1" - doc := &FakeDocument{ - NewFakeField("_id", docID, true, false, false), - NewFakeField("title", "mister", false, false, true), - } - batch := NewBatch() - batch.InsertIfAbsent(testIdentifier(docID), []string{"title"}, doc) + // Insert a document + docID := "doc-1" + doc := &FakeDocument{ + NewFakeField("_id", docID, true, false, false), + NewFakeField("title", "mister", false, false, true), + } + batch := NewBatch() + batch.InsertIfAbsent(testIdentifier(docID), []string{"title"}, doc) - // Apply the batch - if err := idx.Batch(batch); err != nil { - t.Fatalf("failed to apply batch: %v", err) - } - expectedCount++ + // Apply the batch + if err := idx.Batch(batch); err != nil { + t.Fatalf("failed to apply batch: %v", err) + } + expectedCount++ - // Verify document count after insertion - reader, err := idx.Reader() - if err != nil { - t.Fatal(err) - } - docCount, err := reader.Count() - if err != nil { - t.Error(err) - } - if docCount != expectedCount { - t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) - } - err = reader.Close() - if err != nil { - t.Fatal(err) - } + // Verify document count after insertion + reader, err := idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err := reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d got %d", expectedCount, docCount) + } + err = reader.Close() + if err != nil { + t.Fatal(err) + } - // Update the document with new content - docUpdated := &FakeDocument{ - NewFakeField("_id", docID, true, false, false), - NewFakeField("title", "mister", false, false, true), - NewFakeField("content", "updated content", false, false, true), - } - batchUpdate := NewBatch() - batchUpdate.InsertIfAbsent(testIdentifier(docID), []string{"title", "content"}, docUpdated) + // Update the document with new content + docUpdated := &FakeDocument{ + NewFakeField("_id", docID, true, false, false), + NewFakeField("title", "mister", false, false, true), + NewFakeField("content", "updated content", false, false, true), + } + batchUpdate := NewBatch() + batchUpdate.InsertIfAbsent(testIdentifier(docID), []string{"title", "content"}, docUpdated) - // Apply the update batch - if err := idx.Batch(batchUpdate); err != nil { - t.Fatalf("failed to apply update batch: %v", err) - } + // Apply the update batch + if err := idx.Batch(batchUpdate); err != nil { + t.Fatalf("failed to apply update batch: %v", err) + } - // Verify document count remains the same - reader, err = idx.Reader() - if err != nil { - t.Fatal(err) - } - docCount, err = reader.Count() - if err != nil { - t.Error(err) - } - if docCount != expectedCount { - t.Errorf("Expected document count to be %d after update, got %d", expectedCount, docCount) - } + // Verify document count remains the same + reader, err = idx.Reader() + if err != nil { + t.Fatal(err) + } + docCount, err = reader.Count() + if err != nil { + t.Error(err) + } + if docCount != expectedCount { + t.Errorf("Expected document count to be %d after update, got %d", expectedCount, docCount) + } - docNum1, err := findNumberByID(reader, docID) - if err != nil { - t.Fatal(err) - } + docNum1, err := findNumberByID(reader, docID) + if err != nil { + t.Fatal(err) + } - // Verify the updated content - err = reader.VisitStoredFields(docNum1, func(field string, value []byte) bool { - if field == "content" { - if string(value) != "updated content" { - t.Errorf("expected content to be 'updated content', got '%s'", string(value)) + // Verify the updated content + err = reader.VisitStoredFields(docNum1, func(field string, value []byte) bool { + if field == "content" { + if string(value) != "updated content" { + t.Errorf("expected content to be 'updated content', got '%s'", string(value)) + } + } + return true + }) + if err != nil { + t.Fatal(err) } - } - return true - }) - if err != nil { - t.Fatal(err) - } - err = reader.Close() - if err != nil { - t.Fatal(err) + err = reader.Close() + if err != nil { + t.Fatal(err) + } + }) } } diff --git a/writer.go b/writer.go index 8a13c6d..9110700 100644 --- a/writer.go +++ b/writer.go @@ -67,6 +67,10 @@ func (w *Writer) Close() error { return w.chill.Close() } +func (w *Writer) ResetCache() { + w.chill.ResetCache() +} + func (w *Writer) Status() index.Stats { return w.chill.Stats() }