move archive getter/writer to seg package (#11817)
and rename them to `Reader`/`Writer`
remove the related interfaces to improve inlining
AskAlexSharov authored Sep 4, 2024
1 parent ce2ecf8 commit 4b7d8c0
Showing 26 changed files with 449 additions and 468 deletions.
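For call sites the change is mechanical: the archive decorators move from the `state` package to `seg`, and `NewArchiveGetter`/`NewArchiveWriter` become `seg.NewReader`/`seg.NewWriter`. A minimal before/after sketch drawn from the diff below (the `dec` decompressor and `comp` compressor are illustrative placeholders):

// before this commit:
getter := state.NewArchiveGetter(dec.MakeGetter(), state.CompressKeys)
writer := state.NewArchiveWriter(comp, state.CompressVals)

// after this commit: same behavior, shorter names in the seg package
getter := seg.NewReader(dec.MakeGetter(), seg.CompressKeys)
writer := seg.NewWriter(comp, seg.CompressVals)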
5 changes: 2 additions & 3 deletions cmd/commitment-prefix/main.go
@@ -35,7 +35,6 @@ import (

"github.com/erigontech/erigon-lib/commitment"
"github.com/erigontech/erigon-lib/seg"
"github.com/erigontech/erigon-lib/state"
)

var (
@@ -174,15 +173,15 @@ func extractKVPairFromCompressed(filename string, keysSink chan commitment.Branc
defer dec.Close()
tv := commitment.ParseTrieVariant(*flagTrieVariant)

- fc, err := state.ParseFileCompression(*flagCompression)
+ fc, err := seg.ParseFileCompression(*flagCompression)
if err != nil {
return err
}
size := dec.Size()
pairs := dec.Count() / 2
cpair := 0

- getter := state.NewArchiveGetter(dec.MakeGetter(), fc)
+ getter := seg.NewReader(dec.MakeGetter(), fc)
for getter.HasNext() {
key, _ := getter.Next(nil)
if !getter.HasNext() {
11 changes: 5 additions & 6 deletions cmd/state/commands/cat_snapshot.go
@@ -28,7 +28,6 @@ import (
"github.com/spf13/cobra"

"github.com/erigontech/erigon-lib/seg"
"github.com/erigontech/erigon-lib/state"
)

func init() {
@@ -72,21 +71,21 @@ var catSnapshot = &cobra.Command{

fmt.Printf("File %s modtime %s (%s ago) size %v pairs %d \n", fpath, d.ModTime(), time.Since(d.ModTime()), (datasize.B * datasize.ByteSize(d.Size())).HR(), d.Count()/2)

- compFlags := state.CompressNone
+ compFlags := seg.CompressNone
switch strings.ToLower(compressed) {
case "k":
- compFlags = state.CompressKeys
+ compFlags = seg.CompressKeys
case "v":
- compFlags = state.CompressVals
+ compFlags = seg.CompressVals
case "kv":
- compFlags = state.CompressKeys | state.CompressVals
+ compFlags = seg.CompressKeys | seg.CompressVals
case "":
break
default:
return fmt.Errorf("unknown compression flags %s", compressed)
}

- rd := state.NewArchiveGetter(d.MakeGetter(), compFlags)
+ rd := seg.NewReader(d.MakeGetter(), compFlags)

pbytes := []byte{}
if pick != "" {
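Incidentally, the manual switch above duplicates the `seg.ParseFileCompression` helper added by this commit (it accepts the same "", "k", "v", "kv" values); a call site could be reduced to something like:

compFlags, err := seg.ParseFileCompression(strings.ToLower(compressed))
if err != nil {
	return err
}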
195 changes: 195 additions & 0 deletions erigon-lib/seg/seg_auto_rw.go
@@ -0,0 +1,195 @@
// Copyright 2024 The Erigon Authors
// This file is part of Erigon.
//
// Erigon is free software: you can redistribute it and/or modify
// it under the terms of the GNU Lesser General Public License as published by
// the Free Software Foundation, either version 3 of the License, or
// (at your option) any later version.
//
// Erigon is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU Lesser General Public License for more details.
//
// You should have received a copy of the GNU Lesser General Public License
// along with Erigon. If not, see <http://www.gnu.org/licenses/>.

package seg

import (
"fmt"
)

// Reader and Writer are decorators over Getter and Compressor which
// automatically choose between Next/NextUncompressed and AddWord/AddUncompressedWord,
// based on the `FileCompression` passed to the constructor.

// Maybe in the future we will add io.Reader/io.Writer support to these decorators
// Maybe in the future we will merge the decorators into their parents

type FileCompression uint8

const (
CompressNone FileCompression = 0b0 // no compression
CompressKeys FileCompression = 0b1 // compress keys only
CompressVals FileCompression = 0b10 // compress values only
)

func ParseFileCompression(s string) (FileCompression, error) {
switch s {
case "none", "":
return CompressNone, nil
case "k":
return CompressKeys, nil
case "v":
return CompressVals, nil
case "kv":
return CompressKeys | CompressVals, nil
default:
return 0, fmt.Errorf("invalid file compression type: %s", s)
}
}

func (c FileCompression) String() string {
switch c {
case CompressNone:
return "none"
case CompressKeys:
return "k"
case CompressVals:
return "v"
case CompressKeys | CompressVals:
return "kv"
default:
return ""
}
}

type Reader struct {
*Getter
nextValue bool // when true, the next call to Next() is expected to return a value
c FileCompression // which parts of the file are compressed
}

func NewReader(g *Getter, c FileCompression) *Reader {
return &Reader{Getter: g, c: c}
}

func (g *Reader) MatchPrefix(prefix []byte) bool {
if g.c&CompressKeys != 0 {
return g.Getter.MatchPrefix(prefix)
}
return g.Getter.MatchPrefixUncompressed(prefix)
}

func (g *Reader) Next(buf []byte) ([]byte, uint64) {
fl := CompressKeys
if g.nextValue {
fl = CompressVals
g.nextValue = false
} else {
g.nextValue = true
}

if g.c&fl != 0 {
return g.Getter.Next(buf)
}
return g.Getter.NextUncompressed()
}

func (g *Reader) Reset(offset uint64) {
g.nextValue = false
g.Getter.Reset(offset)
}
func (g *Reader) Skip() (uint64, int) {
fl := CompressKeys
if g.nextValue {
fl = CompressVals
g.nextValue = false
} else {
g.nextValue = true
}

if g.c&fl != 0 {
return g.Getter.Skip()
}
return g.Getter.SkipUncompressed()
}

type Writer struct {
*Compressor
keyWritten bool
c FileCompression
}

func NewWriter(kv *Compressor, compress FileCompression) *Writer {
return &Writer{kv, false, compress}
}

func (c *Writer) AddWord(word []byte) error {
fl := CompressKeys
if c.keyWritten {
fl = CompressVals
c.keyWritten = false
} else {
c.keyWritten = true
}

if c.c&fl != 0 {
return c.Compressor.AddWord(word)
}
return c.Compressor.AddUncompressedWord(word)
}

func (c *Writer) Close() {
if c.Compressor != nil {
c.Compressor.Close()
}
}

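// DetectCompressType probes the first ~100 words of a file to guess which
// halves (keys, values) are compressed: it attempts uncompressed reads in both
// key/value alternations and treats a recovered panic as evidence of compression.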
func DetectCompressType(getter *Getter) (compressed FileCompression) {
keyCompressed := func() (compressed bool) {
defer func() {
if rec := recover(); rec != nil {
compressed = true
}
}()
getter.Reset(0)
for i := 0; i < 100; i++ {
if getter.HasNext() {
_, _ = getter.NextUncompressed()
}
if getter.HasNext() {
_, _ = getter.Skip()
}
}
return compressed
}()

valCompressed := func() (compressed bool) {
defer func() {
if rec := recover(); rec != nil {
compressed = true
}
}()
getter.Reset(0)
for i := 0; i < 100; i++ {
if getter.HasNext() {
_, _ = getter.Skip()
}
if getter.HasNext() {
_, _ = getter.NextUncompressed()
}
}
return compressed
}()
getter.Reset(0)

if keyCompressed {
compressed |= CompressKeys
}
if valCompressed {
compressed |= CompressVals
}
return compressed
}
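Taken together, this file gives a self-describing read path: `DetectCompressType` guesses the flags, and `Reader` then alternates compressed/uncompressed reads per word. A sketch of dumping key/value pairs from an existing `.kv` file, assuming `seg.NewDecompressor` as the opener and a hypothetical file name:

d, err := seg.NewDecompressor("accounts.0-32.kv") // hypothetical file name
if err != nil {
	return err
}
defer d.Close()

compression := seg.DetectCompressType(d.MakeGetter())
r := seg.NewReader(d.MakeGetter(), compression)
r.Reset(0)
for r.HasNext() {
	key, _ := r.Next(nil) // key position
	if !r.HasNext() {
		break
	}
	val, _ := r.Next(nil) // value position
	fmt.Printf("%x => %x\n", key, val)
}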
18 changes: 9 additions & 9 deletions erigon-lib/state/aggregator.go
@@ -160,7 +160,7 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
cfg := domainCfg{
hist: histCfg{
iiCfg: iiCfg{salt: salt, dirs: dirs, db: db},
- withLocalityIndex: false, withExistenceIndex: false, compression: CompressNone, historyLargeValues: false,
+ withLocalityIndex: false, withExistenceIndex: false, compression: seg.CompressNone, historyLargeValues: false,
},
restrictSubsetFileDeletions: a.commitmentValuesTransform,
}
@@ -170,18 +170,18 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
cfg = domainCfg{
hist: histCfg{
iiCfg: iiCfg{salt: salt, dirs: dirs, db: db},
- withLocalityIndex: false, withExistenceIndex: false, compression: CompressNone, historyLargeValues: false,
+ withLocalityIndex: false, withExistenceIndex: false, compression: seg.CompressNone, historyLargeValues: false,
},
restrictSubsetFileDeletions: a.commitmentValuesTransform,
- compress: CompressKeys,
+ compress: seg.CompressKeys,
}
if a.d[kv.StorageDomain], err = NewDomain(cfg, aggregationStep, kv.StorageDomain, kv.TblStorageVals, kv.TblStorageHistoryKeys, kv.TblStorageHistoryVals, kv.TblStorageIdx, integrityCheck, logger); err != nil {
return nil, err
}
cfg = domainCfg{
hist: histCfg{
iiCfg: iiCfg{salt: salt, dirs: dirs, db: db},
- withLocalityIndex: false, withExistenceIndex: false, compression: CompressKeys | CompressVals, historyLargeValues: true,
+ withLocalityIndex: false, withExistenceIndex: false, compression: seg.CompressKeys | seg.CompressVals, historyLargeValues: true,
},
largeVals: true,
}
@@ -191,12 +191,12 @@ func NewAggregator(ctx context.Context, dirs datadir.Dirs, aggregationStep uint6
cfg = domainCfg{
hist: histCfg{
iiCfg: iiCfg{salt: salt, dirs: dirs, db: db},
- withLocalityIndex: false, withExistenceIndex: false, compression: CompressNone, historyLargeValues: false,
+ withLocalityIndex: false, withExistenceIndex: false, compression: seg.CompressNone, historyLargeValues: false,
snapshotsDisabled: true,
},
replaceKeysInValues: a.commitmentValuesTransform,
restrictSubsetFileDeletions: a.commitmentValuesTransform,
- compress: CompressKeys,
+ compress: seg.CompressKeys,
}
if a.d[kv.CommitmentDomain], err = NewDomain(cfg, aggregationStep, kv.CommitmentDomain, kv.TblCommitmentVals, kv.TblCommitmentHistoryKeys, kv.TblCommitmentHistoryVals, kv.TblCommitmentIdx, integrityCheck, logger); err != nil {
return nil, err
@@ -1591,7 +1591,7 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles(mergedAgg *AggregatorRoTx) erro
steps := cf.endTxNum/ac.a.aggregationStep - cf.startTxNum/ac.a.aggregationStep
compression := commitment.d.compression
if steps < DomainMinStepsToCompress {
- compression = CompressNone
+ compression = seg.CompressNone
}
ac.a.logger.Info("[sqeeze_migration] file start", "original", cf.decompressor.FileName(),
"progress", fmt.Sprintf("%d/%d", ci+1, len(mergedAccountFiles)), "compress_cfg", commitment.d.compressCfg, "compress", compression)
@@ -1606,10 +1606,10 @@ func (ac *AggregatorRoTx) SqueezeCommitmentFiles(mergedAgg *AggregatorRoTx) erro
}
defer squeezedCompr.Close()

- reader := NewArchiveGetter(cf.decompressor.MakeGetter(), compression)
+ reader := seg.NewReader(cf.decompressor.MakeGetter(), compression)
reader.Reset(0)

- writer := NewArchiveWriter(squeezedCompr, commitment.d.compression)
+ writer := seg.NewWriter(squeezedCompr, commitment.d.compression)
rng := MergeRange{needMerge: true, from: af.startTxNum, to: af.endTxNum}
vt, err := commitment.commitmentValTransformDomain(rng, accounts, storage, af, sf)
if err != nil {
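The squeeze migration pairs the two decorators: words are read back with the file's original compression and re-added under the target compression (the real loop, elided above, also routes values through the `vt` transform built from `commitmentValTransformDomain`). The general shape is a sketch like this, not the exact Erigon code:

for reader.HasNext() {
	word, _ := reader.Next(nil)
	if err := writer.AddWord(word); err != nil {
		return err
	}
}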
10 changes: 5 additions & 5 deletions erigon-lib/state/aggregator_bench_test.go
@@ -145,7 +145,7 @@ func Benchmark_BtreeIndex_Search(b *testing.B) {
dataPath := "../../data/storage.256-288.kv"

indexPath := path.Join(tmp, filepath.Base(dataPath)+".bti")
- comp := CompressKeys | CompressVals
+ comp := seg.CompressKeys | seg.CompressVals
buildBtreeIndex(b, dataPath, indexPath, comp, 1, logger, true)

M := 1024
@@ -156,7 +156,7 @@

keys, err := pivotKeysFromKV(dataPath)
require.NoError(b, err)
- getter := NewArchiveGetter(kv.MakeGetter(), comp)
+ getter := seg.NewReader(kv.MakeGetter(), comp)

for i := 0; i < b.N; i++ {
p := rnd.Intn(len(keys))
@@ -167,7 +167,7 @@
}
}

- func benchInitBtreeIndex(b *testing.B, M uint64, compression FileCompression) (*seg.Decompressor, *BtIndex, [][]byte, string) {
+ func benchInitBtreeIndex(b *testing.B, M uint64, compression seg.FileCompression) (*seg.Decompressor, *BtIndex, [][]byte, string) {
b.Helper()

logger := log.New()
@@ -191,10 +191,10 @@

func Benchmark_BTree_Seek(b *testing.B) {
M := uint64(1024)
- compress := CompressNone
+ compress := seg.CompressNone
kv, bt, keys, _ := benchInitBtreeIndex(b, M, compress)
rnd := rand.New(rand.NewSource(time.Now().UnixNano()))
- getter := NewArchiveGetter(kv.MakeGetter(), compress)
+ getter := seg.NewReader(kv.MakeGetter(), compress)

b.Run("seek_only", func(b *testing.B) {
for i := 0; i < b.N; i++ {
8 changes: 4 additions & 4 deletions erigon-lib/state/aggregator_test.go
@@ -417,11 +417,11 @@ func aggregatorV3_RestartOnDatadir(t *testing.T, rc runCfg) {

func TestNewBtIndex(t *testing.T) {
keyCount := 10000
- kvPath := generateKV(t, t.TempDir(), 20, 10, keyCount, log.New(), CompressNone)
+ kvPath := generateKV(t, t.TempDir(), 20, 10, keyCount, log.New(), seg.CompressNone)

indexPath := strings.TrimSuffix(kvPath, ".kv") + ".bt"

- kv, bt, err := OpenBtreeIndexAndDataFile(indexPath, kvPath, DefaultBtreeM, CompressNone, false)
+ kv, bt, err := OpenBtreeIndexAndDataFile(indexPath, kvPath, DefaultBtreeM, seg.CompressNone, false)
require.NoError(t, err)
defer bt.Close()
defer kv.Close()
@@ -1036,7 +1036,7 @@ func pivotKeysFromKV(dataPath string) ([][]byte, error) {
return listing, nil
}

- func generateKV(tb testing.TB, tmp string, keySize, valueSize, keyCount int, logger log.Logger, compressFlags FileCompression) string {
+ func generateKV(tb testing.TB, tmp string, keySize, valueSize, keyCount int, logger log.Logger, compressFlags seg.FileCompression) string {
tb.Helper()

rnd := rand.New(rand.NewSource(0))
@@ -1066,7 +1066,7 @@ func generateKV(tb testing.TB, tmp string, keySize, valueSize, keyCount int, log
require.NoError(tb, err)
}

- writer := NewArchiveWriter(comp, compressFlags)
+ writer := seg.NewWriter(comp, compressFlags)

loader := func(k, v []byte, _ etl.CurrentTableReader, _ etl.LoadNextFunc) error {
err = writer.AddWord(k)
