From 10677ca04718f43f3f7b2e147eba802654ee7c29 Mon Sep 17 00:00:00 2001 From: Alex Sharov Date: Sat, 2 Nov 2024 15:50:40 +0700 Subject: [PATCH] kv: reduce kv interface step2 (#12580) --- cmd/integration/commands/stages.go | 6 - common/changeset/accs.go | 113 ----- common/changeset/storage_changeset_test.go | 456 ------------------ common/debug/memstats.go | 39 -- common/prque/lazyqueue.go | 200 -------- common/prque/lazyqueue_test.go | 127 ----- core/chain_makers.go | 5 +- core/state/recon_writer_inc.go | 137 ------ erigon-lib/kv/dbutils/composite_keys.go | 115 ----- erigon-lib/kv/dbutils/composite_keys_test.go | 77 --- erigon-lib/kv/dbutils/suffix_type.go | 86 ---- erigon-lib/kv/kv_interface.go | 25 +- erigon-lib/kv/mdbx/kv_abstract_test.go | 7 +- erigon-lib/kv/mdbx/kv_mdbx.go | 399 +++------------ erigon-lib/kv/mdbx/kv_mdbx_batch.go | 160 ++++++ erigon-lib/kv/mdbx/kv_mdbx_test.go | 8 +- erigon-lib/kv/membatch/mapmutation.go | 7 +- .../kv/membatchwithdb/memory_mutation.go | 28 +- erigon-lib/kv/order/order.go | 7 + erigon-lib/kv/remotedb/kv_remote.go | 16 +- .../kv/remotedbserver/remotedbserver.go | 13 +- erigon-lib/kv/stream/stream_test.go | 16 +- erigon-lib/state/aggregator.go | 1 + erigon-lib/state/history.go | 8 +- erigon-lib/state/reconst.go | 120 ----- erigon-lib/txpool/pool.go | 5 +- eth/stagedsync/stagedsynctest/harness.go | 7 +- polygon/bridge/mdbx_store.go | 5 +- polygon/heimdall/entity_store.go | 3 +- turbo/app/backup_cmd.go | 169 ------- turbo/trie/trie_root.go | 58 --- 31 files changed, 284 insertions(+), 2139 deletions(-) delete mode 100644 common/changeset/accs.go delete mode 100644 common/changeset/storage_changeset_test.go delete mode 100644 common/debug/memstats.go delete mode 100644 common/prque/lazyqueue.go delete mode 100644 common/prque/lazyqueue_test.go delete mode 100644 core/state/recon_writer_inc.go delete mode 100644 erigon-lib/kv/dbutils/composite_keys_test.go delete mode 100644 erigon-lib/kv/dbutils/suffix_type.go create mode 100644 erigon-lib/kv/mdbx/kv_mdbx_batch.go delete mode 100644 turbo/app/backup_cmd.go diff --git a/cmd/integration/commands/stages.go b/cmd/integration/commands/stages.go index 12b8c119920..6e6c72b5f76 100644 --- a/cmd/integration/commands/stages.go +++ b/cmd/integration/commands/stages.go @@ -720,12 +720,6 @@ func stageHeaders(db kv.RwDB, ctx context.Context, logger log.Logger) error { return db.Update(ctx, func(tx kv.RwTx) error { if reset { - if casted, ok := tx.(kv.CanWarmupDB); ok { - if err := casted.WarmupDB(false); err != nil { - return err - } - } - if err := reset2.ResetBlocks(tx, db, agg, br, bw, dirs, *chainConfig, logger); err != nil { return err } diff --git a/common/changeset/accs.go b/common/changeset/accs.go deleted file mode 100644 index 4ce76c693a0..00000000000 --- a/common/changeset/accs.go +++ /dev/null @@ -1,113 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package changeset - -import ( - common2 "github.com/erigontech/erigon-lib/common" - libcommon "github.com/erigontech/erigon-lib/common" - "github.com/erigontech/erigon-lib/common/hexutility" - "github.com/erigontech/erigon-lib/etl" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/kv/temporal/historyv2" - - "github.com/erigontech/erigon/ethdb" -) - -// GetModifiedAccounts returns a list of addresses that were modified in the block range -// [startNum:endNum) -func GetModifiedAccounts(db kv.Tx, startNum, endNum uint64) ([]libcommon.Address, error) { - changedAddrs := make(map[libcommon.Address]struct{}) - if err := ForRange(db, kv.AccountChangeSet, startNum, endNum, func(blockN uint64, k, v []byte) error { - changedAddrs[libcommon.BytesToAddress(k)] = struct{}{} - return nil - }); err != nil { - return nil, err - } - - if len(changedAddrs) == 0 { - return nil, nil - } - - idx := 0 - result := make([]libcommon.Address, len(changedAddrs)) - for addr := range changedAddrs { - copy(result[idx][:], addr[:]) - idx++ - } - - return result, nil -} - -// [from:to) -func ForRange(db kv.Tx, bucket string, from, to uint64, walker func(blockN uint64, k, v []byte) error) error { - var blockN uint64 - c, err := db.Cursor(bucket) - if err != nil { - return err - } - defer c.Close() - return ethdb.Walk(c, hexutility.EncodeTs(from), 0, func(k, v []byte) (bool, error) { - var err error - blockN, k, v, err = historyv2.FromDBFormat(k, v) - if err != nil { - return false, err - } - if blockN >= to { - return false, nil - } - if err = walker(blockN, k, v); err != nil { - return false, err - } - return true, nil - }) -} - -// RewindData generates rewind data for all plain buckets between the timestamp -// timestapSrc is the current timestamp, and timestamp Dst is where we rewind -func RewindData(db kv.Tx, timestampSrc, timestampDst uint64, changes *etl.Collector, quit <-chan struct{}) error { - if err := walkAndCollect( - changes.Collect, - db, kv.AccountChangeSet, - timestampDst+1, timestampSrc, - quit, - ); err != nil { - return err - } - - if err := walkAndCollect( - changes.Collect, - db, kv.StorageChangeSet, - timestampDst+1, timestampSrc, - quit, - ); err != nil { - return err - } - - return nil -} - -func walkAndCollect(collectorFunc func([]byte, []byte) error, db kv.Tx, bucket string, timestampDst, timestampSrc uint64, quit <-chan struct{}) error { - return ForRange(db, bucket, timestampDst, timestampSrc+1, func(bl uint64, k, v []byte) error { - if err := common2.Stopped(quit); err != nil { - return err - } - if innerErr := collectorFunc(common2.Copy(k), common2.Copy(v)); innerErr != nil { - return innerErr - } - return nil - }) -} diff --git a/common/changeset/storage_changeset_test.go b/common/changeset/storage_changeset_test.go deleted file mode 100644 index c14bb242acd..00000000000 --- a/common/changeset/storage_changeset_test.go +++ /dev/null @@ -1,456 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package changeset - -import ( - "bytes" - "fmt" - "math/rand" - "reflect" - "strconv" - "testing" - - "github.com/erigontech/erigon-lib/kv/dbutils" - - libcommon "github.com/erigontech/erigon-lib/common" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/kv/memdb" - historyv22 "github.com/erigontech/erigon-lib/kv/temporal/historyv2" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - - "github.com/erigontech/erigon/common" -) - -const ( - storageTable = kv.StorageChangeSet - defaultIncarnation = 1 -) - -var numOfChanges = []int{1, 3, 10, 100} - -func getDefaultIncarnation() uint64 { return defaultIncarnation } -func getRandomIncarnation() uint64 { return rand.Uint64() } - -func hashValueGenerator(j int) []byte { - val, _ := libcommon.HashData([]byte("val" + strconv.Itoa(j))) - return val.Bytes() -} - -func emptyValueGenerator(j int) []byte { - return nil -} - -func getTestDataAtIndex(i, j int, inc uint64) []byte { - address := libcommon.HexToAddress(fmt.Sprintf("0xBe828AD8B538D1D691891F6c725dEdc5989abBc%d", i)) - key, _ := libcommon.HashData([]byte("key" + strconv.Itoa(j))) - return dbutils.PlainGenerateCompositeStorageKey(address.Bytes(), inc, key.Bytes()) -} - -func TestEncodingStorageNewWithRandomIncarnation(t *testing.T) { - doTestEncodingStorageNew(t, getRandomIncarnation, hashValueGenerator) -} - -func TestEncodingStorageNewWithDefaultIncarnation(t *testing.T) { - doTestEncodingStorageNew(t, getDefaultIncarnation, hashValueGenerator) -} - -func TestEncodingStorageNewWithDefaultIncarnationAndEmptyValue(t *testing.T) { - doTestEncodingStorageNew(t, getDefaultIncarnation, emptyValueGenerator) -} - -func doTestEncodingStorageNew( - t *testing.T, - incarnationGenerator func() uint64, - valueGenerator func(int) []byte, -) { - m := historyv22.Mapper[storageTable] - - f := func(t *testing.T, numOfElements int, numOfKeys int) { - var err error - ch := m.New() - for i := 0; i < numOfElements; i++ { - inc := incarnationGenerator() - for j := 0; j < numOfKeys; j++ { - key := getTestDataAtIndex(i, j, inc) - val := valueGenerator(j) - err = ch.Add(key, val) - if err != nil { - t.Fatal(err) - } - - } - } - ch2 := m.New() - err = m.Encode(0, ch, func(k, v []byte) error { - var err error - _, k, v, err = m.Decode(k, v) - if err != nil { - return err - } - return ch2.Add(k, v) - }) - if err != nil { - t.Fatal(err) - } - - for i := range ch.Changes { - if !bytes.Equal(ch.Changes[i].Key, ch2.Changes[i].Key) { - t.Log(common.Bytes2Hex(ch.Changes[i].Key)) - t.Log(common.Bytes2Hex(ch2.Changes[i].Key)) - t.Error("not equal", i) - } - } - for i := range ch.Changes { - if !bytes.Equal(ch.Changes[i].Value, ch2.Changes[i].Value) { - t.Log(common.Bytes2Hex(ch.Changes[i].Value)) - t.Log(common.Bytes2Hex(ch2.Changes[i].Value)) - t.Fatal("not equal", i) - } - } - if !reflect.DeepEqual(ch, ch2) { - for i, v := range ch.Changes { - if !bytes.Equal(v.Key, ch2.Changes[i].Key) || !bytes.Equal(v.Value, ch2.Changes[i].Value) { - fmt.Println("Diff ", i) - fmt.Println("k1", common.Bytes2Hex(v.Key), len(v.Key)) - fmt.Println("k2", common.Bytes2Hex(ch2.Changes[i].Key)) - fmt.Println("v1", common.Bytes2Hex(v.Value)) - fmt.Println("v2", common.Bytes2Hex(ch2.Changes[i].Value)) - } - } - t.Error("not equal") - } - } - - for _, v := range numOfChanges { - v := v - t.Run(formatTestName(v, 1), func(t *testing.T) { - f(t, v, 1) - }) - } - - for _, v := range numOfChanges { - v := v - t.Run(formatTestName(v, 5), func(t *testing.T) { - f(t, v, 5) - }) - } - - t.Run(formatTestName(10, 10), func(t *testing.T) { - f(t, 10, 10) - }) - t.Run(formatTestName(50, 1000), func(t *testing.T) { - f(t, 50, 1000) - }) - t.Run(formatTestName(100, 1000), func(t *testing.T) { - f(t, 100, 1000) - }) -} - -func TestEncodingStorageNewWithoutNotDefaultIncarnationWalk(t *testing.T) { - m := historyv22.Mapper[storageTable] - - ch := m.New() - f := func(t *testing.T, numOfElements, numOfKeys int) { - for i := 0; i < numOfElements; i++ { - for j := 0; j < numOfKeys; j++ { - val := hashValueGenerator(j) - key := getTestDataAtIndex(i, j, defaultIncarnation) - err := ch.Add(key, val) - if err != nil { - t.Fatal(err) - } - } - } - - i := 0 - err := m.Encode(0, ch, func(k, v []byte) error { - var err error - _, k, v, err = m.Decode(k, v) - assert.NoError(t, err) - if !bytes.Equal(k, ch.Changes[i].Key) { - t.Log(common.Bytes2Hex(ch.Changes[i].Key)) - t.Log(common.Bytes2Hex(k)) - t.Error(i, "key was incorrect", common.Bytes2Hex(k), common.Bytes2Hex(ch.Changes[i].Key)) - } - if !bytes.Equal(v, ch.Changes[i].Value) { - t.Log(common.Bytes2Hex(ch.Changes[i].Value)) - t.Log(common.Bytes2Hex(v)) - t.Error(i, "val is incorrect", v, ch.Changes[i].Value) - } - i++ - return nil - }) - if err != nil { - t.Fatal(err) - } - } - - for _, v := range numOfChanges { - v := v - t.Run(fmt.Sprintf("elements: %d keys: %d", v, 1), func(t *testing.T) { - f(t, v, 1) - }) - } - - for _, v := range numOfChanges { - v := v - t.Run(fmt.Sprintf("elements: %d keys: %d", v, 5), func(t *testing.T) { - f(t, v, 5) - }) - } - - t.Run(formatTestName(50, 1000), func(t *testing.T) { - f(t, 50, 1000) - }) - t.Run(formatTestName(5, 1000), func(t *testing.T) { - f(t, 5, 1000) - }) -} - -func TestEncodingStorageNewWithoutNotDefaultIncarnationFind(t *testing.T) { - m := historyv22.Mapper[storageTable] - _, tx := memdb.NewTestTx(t) - - clear := func() { - c, err := tx.RwCursor(storageTable) - require.NoError(t, err) - defer c.Close() - for k, _, err := c.First(); k != nil; k, _, err = c.First() { - if err != nil { - t.Fatal(err) - } - err = c.DeleteCurrent() - if err != nil { - t.Fatal(err) - } - } - } - - doTestFind(t, tx, m.Find, clear) -} - -func TestEncodingStorageNewWithoutNotDefaultIncarnationFindWithoutIncarnation(t *testing.T) { - bkt := storageTable - m := historyv22.Mapper[bkt] - _, tx := memdb.NewTestTx(t) - - clear := func() { - c, err := tx.RwCursor(bkt) - require.NoError(t, err) - defer c.Close() - for k, _, err := c.First(); k != nil; k, _, err = c.First() { - if err != nil { - t.Fatal(err) - } - err = c.DeleteCurrent() - if err != nil { - t.Fatal(err) - } - } - } - - doTestFind(t, tx, m.Find, clear) -} - -func doTestFind( - t *testing.T, - tx kv.RwTx, - findFunc func(kv.CursorDupSort, uint64, []byte) ([]byte, error), - clear func(), -) { - m := historyv22.Mapper[storageTable] - t.Helper() - f := func(t *testing.T, numOfElements, numOfKeys int) { - defer clear() - ch := m.New() - for i := 0; i < numOfElements; i++ { - for j := 0; j < numOfKeys; j++ { - val := hashValueGenerator(j) - key := getTestDataAtIndex(i, j, defaultIncarnation) - err := ch.Add(key, val) - if err != nil { - t.Fatal(err) - } - } - } - - c, err := tx.RwCursorDupSort(storageTable) - require.NoError(t, err) - - err = m.Encode(1, ch, func(k, v []byte) error { - if err2 := c.Put(libcommon.Copy(k), libcommon.Copy(v)); err2 != nil { - return err2 - } - return nil - }) - if err != nil { - t.Fatal(err) - } - - for i, v := range ch.Changes { - val, err := findFunc(c, 1, v.Key) - if err != nil { - t.Error(err, i) - } - if !bytes.Equal(val, v.Value) { - t.Fatal("value not equal for ", v.Value, val) - } - } - } - - for _, v := range numOfChanges[:len(numOfChanges)-2] { - v := v - f(t, v, 1) - } - - for _, v := range numOfChanges[:len(numOfChanges)-2] { - v := v - f(t, v, 5) - } - - f(t, 50, 1000) - f(t, 100, 1000) -} - -func BenchmarkDecodeNewStorage(t *testing.B) { - numOfElements := 10 - // empty StorageChangeSet first - ch := historyv22.NewStorageChangeSet() - var err error - for i := 0; i < numOfElements; i++ { - address := []byte("0xa4e69cebbf4f8f3a1c6e493a6983d8a5879d22057a7c73b00e105d7c7e21ef" + strconv.Itoa(i)) - key, _ := libcommon.HashData([]byte("key" + strconv.Itoa(i))) - val, _ := libcommon.HashData([]byte("val" + strconv.Itoa(i))) - err = ch.Add(dbutils.PlainGenerateCompositeStorageKey(address, rand.Uint64(), key[:]), val.Bytes()) - if err != nil { - t.Fatal(err) - } - } - - t.ResetTimer() - var ch2 *historyv22.ChangeSet - for i := 0; i < t.N; i++ { - err := historyv22.EncodeStorage(1, ch, func(k, v []byte) error { - var err error - _, _, _, err = historyv22.DecodeStorage(k, v) - return err - }) - if err != nil { - t.Fatal(err) - } - } - _ = ch2 -} - -func BenchmarkEncodeNewStorage(t *testing.B) { - numOfElements := 10 - // empty StorageChangeSet first - ch := historyv22.NewStorageChangeSet() - var err error - for i := 0; i < numOfElements; i++ { - address := []byte("0xa4e69cebbf4f8f3a1c6e493a6983d8a5879d22057a7c73b00e105d7c7e21ef" + strconv.Itoa(i)) - key, _ := libcommon.HashData([]byte("key" + strconv.Itoa(i))) - val, _ := libcommon.HashData([]byte("val" + strconv.Itoa(i))) - err = ch.Add(dbutils.PlainGenerateCompositeStorageKey(address, rand.Uint64(), key[:]), val.Bytes()) - if err != nil { - t.Fatal(err) - } - } - - t.ResetTimer() - for i := 0; i < t.N; i++ { - err := historyv22.EncodeStorage(1, ch, func(k, v []byte) error { - return nil - }) - if err != nil { - t.Fatal(err) - } - } -} - -func formatTestName(elements, keys int) string { - return fmt.Sprintf("elements: %d keys: %d", elements, keys) -} - -func TestMultipleIncarnationsOfTheSameContract(t *testing.T) { - bkt := kv.StorageChangeSet - m := historyv22.Mapper[bkt] - _, tx := memdb.NewTestTx(t) - - c1, err := tx.CursorDupSort(bkt) - require.NoError(t, err) - defer c1.Close() - - contractA := libcommon.HexToAddress("0x6f0e0cdac6c716a00bd8db4d0eee4f2bfccf8e6a") - contractB := libcommon.HexToAddress("0xc5acb79c258108f288288bc26f7820d06f45f08c") - contractC := libcommon.HexToAddress("0x1cbdd8336800dc3fe27daf5fb5188f0502ac1fc7") - contractD := libcommon.HexToAddress("0xd88eba4c93123372a9f67215f80477bc3644e6ab") - - key1 := libcommon.HexToHash("0xa4e69cebbf4f8f3a1c6e493a6983d8a5879d22057a7c73b00e105d7c7e21efbc") - key2 := libcommon.HexToHash("0x0bece5a88f7b038f806dbef77c0b462506e4b566c5be7dd44e8e2fc7b1f6a99c") - key3 := libcommon.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000001") - key4 := libcommon.HexToHash("0x4fdf6c1878d2469b49684effe69db8689d88a4f1695055538501ff197bc9e30e") - key5 := libcommon.HexToHash("0xaa2703c3ae5d0024b2c3ab77e5200bb2a8eb39a140fad01e89a495d73760297c") - key6 := libcommon.HexToHash("0x000000000000000000000000000000000000000000000000000000000000df77") - key7 := libcommon.HexToHash("0x0000000000000000000000000000000000000000000000000000000000000000") - - val1 := common.FromHex("0x33bf0d0c348a2ef1b3a12b6a535e1e25a56d3624e45603e469626d80fd78c762") - val2 := common.FromHex("0x0000000000000000000000000000000000000000000000000000000000000459") - val3 := common.FromHex("0x0000000000000000000000000000002506e4b566c5be7dd44e8e2fc7b1f6a99c") - val4 := common.FromHex("0x207a386cdf40716455365db189633e822d3a7598558901f2255e64cb5e424714") - val5 := common.FromHex("0x0000000000000000000000000000000000000000000000000000000000000000") - val6 := common.FromHex("0xec89478783348038046b42cc126a3c4e351977b5f4cf5e3c4f4d8385adbf8046") - - c, err := tx.RwCursorDupSort(bkt) - require.NoError(t, err) - - ch := historyv22.NewStorageChangeSet() - assert.NoError(t, ch.Add(dbutils.PlainGenerateCompositeStorageKey(contractA.Bytes(), 2, key1.Bytes()), val1)) - assert.NoError(t, ch.Add(dbutils.PlainGenerateCompositeStorageKey(contractA.Bytes(), 1, key5.Bytes()), val5)) - assert.NoError(t, ch.Add(dbutils.PlainGenerateCompositeStorageKey(contractA.Bytes(), 2, key6.Bytes()), val6)) - - assert.NoError(t, ch.Add(dbutils.PlainGenerateCompositeStorageKey(contractB.Bytes(), 1, key2.Bytes()), val2)) - assert.NoError(t, ch.Add(dbutils.PlainGenerateCompositeStorageKey(contractB.Bytes(), 1, key3.Bytes()), val3)) - - assert.NoError(t, ch.Add(dbutils.PlainGenerateCompositeStorageKey(contractC.Bytes(), 5, key4.Bytes()), val4)) - - assert.NoError(t, historyv22.EncodeStorage(1, ch, func(k, v []byte) error { - return c.Put(k, v) - })) - - data1, err1 := m.Find(c, 1, dbutils.PlainGenerateCompositeStorageKey(contractA.Bytes(), 2, key1.Bytes())) - assert.NoError(t, err1) - assert.Equal(t, data1, val1) - - data3, err3 := m.Find(c, 1, dbutils.PlainGenerateCompositeStorageKey(contractB.Bytes(), 1, key3.Bytes())) - assert.NoError(t, err3) - assert.Equal(t, data3, val3) - - data5, err5 := m.Find(c, 1, dbutils.PlainGenerateCompositeStorageKey(contractA.Bytes(), 1, key5.Bytes())) - assert.NoError(t, err5) - assert.Equal(t, data5, val5) - - _, errA := m.Find(c, 1, dbutils.PlainGenerateCompositeStorageKey(contractA.Bytes(), 1, key1.Bytes())) - assert.Error(t, errA) - - _, errB := m.Find(c, 1, dbutils.PlainGenerateCompositeStorageKey(contractD.Bytes(), 2, key1.Bytes())) - assert.Error(t, errB) - - _, errC := m.Find(c, 1, dbutils.PlainGenerateCompositeStorageKey(contractB.Bytes(), 1, key7.Bytes())) - assert.Error(t, errC) -} diff --git a/common/debug/memstats.go b/common/debug/memstats.go deleted file mode 100644 index 73f3146a87c..00000000000 --- a/common/debug/memstats.go +++ /dev/null @@ -1,39 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package debug - -import ( - "fmt" - "runtime" - - "github.com/erigontech/erigon-lib/common/dbg" -) - -func PrintMemStats(short bool) { - var m runtime.MemStats - dbg.ReadMemStats(&m) - // For info on each, see: https://golang.org/pkg/runtime/#MemStats - if short { - fmt.Printf("HeapInuse: %vMb\n", ByteToMb(m.HeapInuse)) - } else { - fmt.Printf("HeapInuse: %vMb, Alloc: %vMb, TotalAlloc: %vMb, Sys: %vMb, NumGC: %v, PauseNs: %d\n", ByteToMb(m.HeapInuse), ByteToMb(m.Alloc), ByteToMb(m.TotalAlloc), ByteToMb(m.Sys), m.NumGC, m.PauseNs[(m.NumGC+255)%256]) - } -} - -func ByteToMb(b uint64) uint64 { - return b / 1024 / 1024 -} diff --git a/common/prque/lazyqueue.go b/common/prque/lazyqueue.go deleted file mode 100644 index 00ed608ba31..00000000000 --- a/common/prque/lazyqueue.go +++ /dev/null @@ -1,200 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// (original work) -// Copyright 2024 The Erigon Authors -// (modifications) -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package prque - -import ( - "container/heap" - "time" - - "github.com/erigontech/erigon/common/mclock" -) - -// LazyQueue is a priority queue data structure where priorities can change over -// time and are only evaluated on demand. -// Two callbacks are required: -// - priority evaluates the actual priority of an item -// - maxPriority gives an upper estimate for the priority in any moment between -// now and the given absolute time -// -// If the upper estimate is exceeded then Update should be called for that item. -// A global Refresh function should also be called periodically. -type LazyQueue struct { - clock mclock.Clock - // Items are stored in one of two internal queues ordered by estimated max - // priority until the next and the next-after-next refresh. Update and Refresh - // always places items in queue[1]. - queue [2]*sstack - popQueue *sstack - period time.Duration - maxUntil mclock.AbsTime - indexOffset int - setIndex SetIndexCallback - priority PriorityCallback - maxPriority MaxPriorityCallback - lastRefresh1, lastRefresh2 mclock.AbsTime -} - -type ( - PriorityCallback func(data interface{}) int64 // actual priority callback - MaxPriorityCallback func(data interface{}, until mclock.AbsTime) int64 // estimated maximum priority callback -) - -// NewLazyQueue creates a new lazy queue -func NewLazyQueue(setIndex SetIndexCallback, priority PriorityCallback, maxPriority MaxPriorityCallback, clock mclock.Clock, refreshPeriod time.Duration) *LazyQueue { - q := &LazyQueue{ - popQueue: newSstack(nil), - setIndex: setIndex, - priority: priority, - maxPriority: maxPriority, - clock: clock, - period: refreshPeriod, - lastRefresh1: clock.Now(), - lastRefresh2: clock.Now(), - } - q.Reset() - q.refresh(clock.Now()) - return q -} - -// Reset clears the contents of the queue -func (q *LazyQueue) Reset() { - q.queue[0] = newSstack(q.setIndex0) - q.queue[1] = newSstack(q.setIndex1) -} - -// Refresh performs queue re-evaluation if necessary -func (q *LazyQueue) Refresh() { - now := q.clock.Now() - for time.Duration(now-q.lastRefresh2) >= q.period*2 { - q.refresh(now) - q.lastRefresh2 = q.lastRefresh1 - q.lastRefresh1 = now - } -} - -// refresh re-evaluates items in the older queue and swaps the two queues -func (q *LazyQueue) refresh(now mclock.AbsTime) { - q.maxUntil = now + mclock.AbsTime(q.period) - for q.queue[0].Len() != 0 { - q.Push(heap.Pop(q.queue[0]).(*item).value) - } - q.queue[0], q.queue[1] = q.queue[1], q.queue[0] - q.indexOffset = 1 - q.indexOffset - q.maxUntil += mclock.AbsTime(q.period) -} - -// Push adds an item to the queue -func (q *LazyQueue) Push(data interface{}) { - heap.Push(q.queue[1], &item{data, q.maxPriority(data, q.maxUntil)}) -} - -// Update updates the upper priority estimate for the item with the given queue index -func (q *LazyQueue) Update(index int) { - q.Push(q.Remove(index)) -} - -// Pop removes and returns the item with the greatest actual priority -func (q *LazyQueue) Pop() (interface{}, int64) { - var ( - resData interface{} - resPri int64 - ) - q.MultiPop(func(data interface{}, priority int64) bool { - resData = data - resPri = priority - return false - }) - return resData, resPri -} - -// peekIndex returns the index of the internal queue where the item with the -// highest estimated priority is or -1 if both are empty -func (q *LazyQueue) peekIndex() int { - if q.queue[0].Len() != 0 { - if q.queue[1].Len() != 0 && q.queue[1].blocks[0][0].priority > q.queue[0].blocks[0][0].priority { - return 1 - } - return 0 - } - if q.queue[1].Len() != 0 { - return 1 - } - return -1 -} - -// MultiPop pops multiple items from the queue and is more efficient than calling -// Pop multiple times. Popped items are passed to the callback. MultiPop returns -// when the callback returns false or there are no more items to pop. -func (q *LazyQueue) MultiPop(callback func(data interface{}, priority int64) bool) { - nextIndex := q.peekIndex() - for nextIndex != -1 { - data := heap.Pop(q.queue[nextIndex]).(*item).value - heap.Push(q.popQueue, &item{data, q.priority(data)}) - nextIndex = q.peekIndex() - for q.popQueue.Len() != 0 && (nextIndex == -1 || q.queue[nextIndex].blocks[0][0].priority < q.popQueue.blocks[0][0].priority) { - i := heap.Pop(q.popQueue).(*item) - if !callback(i.value, i.priority) { - for q.popQueue.Len() != 0 { - q.Push(heap.Pop(q.popQueue).(*item).value) - } - return - } - nextIndex = q.peekIndex() // re-check because callback is allowed to push items back - } - } -} - -// PopItem pops the item from the queue only, dropping the associated priority value. -func (q *LazyQueue) PopItem() interface{} { - i, _ := q.Pop() - return i -} - -// Remove removes the item with the given index. -func (q *LazyQueue) Remove(index int) interface{} { - if index < 0 { - return nil - } - return heap.Remove(q.queue[index&1^q.indexOffset], index>>1).(*item).value -} - -// Empty checks whether the priority queue is empty. -func (q *LazyQueue) Empty() bool { - return q.queue[0].Len() == 0 && q.queue[1].Len() == 0 -} - -// Size returns the number of items in the priority queue. -func (q *LazyQueue) Size() int { - return q.queue[0].Len() + q.queue[1].Len() -} - -// setIndex0 translates internal queue item index to the virtual index space of LazyQueue -func (q *LazyQueue) setIndex0(data interface{}, index int) { - if index == -1 { - q.setIndex(data, -1) - } else { - q.setIndex(data, index+index) - } -} - -// setIndex1 translates internal queue item index to the virtual index space of LazyQueue -func (q *LazyQueue) setIndex1(data interface{}, index int) { - q.setIndex(data, index+index+1) -} diff --git a/common/prque/lazyqueue_test.go b/common/prque/lazyqueue_test.go deleted file mode 100644 index c4133970375..00000000000 --- a/common/prque/lazyqueue_test.go +++ /dev/null @@ -1,127 +0,0 @@ -// Copyright 2019 The go-ethereum Authors -// (original work) -// Copyright 2024 The Erigon Authors -// (modifications) -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package prque - -import ( - "math/rand" - "sync" - "testing" - "time" - - "github.com/erigontech/erigon/common/mclock" -) - -const ( - testItems = 1000 - testPriorityStep = 100 - testSteps = 1000000 - testStepPeriod = time.Millisecond - testQueueRefresh = time.Second - testAvgRate = float64(testPriorityStep) / float64(testItems) / float64(testStepPeriod) -) - -type lazyItem struct { - p, maxp int64 - last mclock.AbsTime - index int -} - -func testPriority(a interface{}) int64 { - return a.(*lazyItem).p -} - -func testMaxPriority(a interface{}, until mclock.AbsTime) int64 { - i := a.(*lazyItem) - dt := until - i.last - i.maxp = i.p + int64(float64(dt)*testAvgRate) - return i.maxp -} - -func testSetIndex(a interface{}, i int) { - a.(*lazyItem).index = i -} - -func TestLazyQueue(t *testing.T) { - rand.Seed(time.Now().UnixNano()) - clock := &mclock.Simulated{} - q := NewLazyQueue(testSetIndex, testPriority, testMaxPriority, clock, testQueueRefresh) - - var ( - items [testItems]lazyItem - maxPri int64 - ) - - for i := range items[:] { - items[i].p = rand.Int63n(testPriorityStep * 10) - if items[i].p > maxPri { - maxPri = items[i].p - } - items[i].index = -1 - q.Push(&items[i]) - } - - var ( - lock sync.Mutex - wg sync.WaitGroup - stopCh = make(chan chan struct{}) - ) - defer wg.Wait() - wg.Add(1) - go func() { - defer wg.Done() - for { - select { - case <-clock.After(testQueueRefresh): - lock.Lock() - q.Refresh() - lock.Unlock() - case <-stopCh: - return - } - } - }() - - for c := 0; c < testSteps; c++ { - i := rand.Intn(testItems) - lock.Lock() - items[i].p += rand.Int63n(testPriorityStep*2-1) + 1 - if items[i].p > maxPri { - maxPri = items[i].p - } - items[i].last = clock.Now() - if items[i].p > items[i].maxp { - q.Update(items[i].index) - } - if rand.Intn(100) == 0 { - p := q.PopItem().(*lazyItem) - if p.p != maxPri { - lock.Unlock() - close(stopCh) - t.Fatalf("incorrect item (best known priority %d, popped %d)", maxPri, p.p) - } - q.Push(p) - } - lock.Unlock() - clock.Run(testStepPeriod) - clock.WaitForTimers(1) - } - - close(stopCh) -} diff --git a/core/chain_makers.go b/core/chain_makers.go index 68e41a3f422..ad4013fdd08 100644 --- a/core/chain_makers.go +++ b/core/chain_makers.go @@ -26,6 +26,7 @@ import ( "fmt" "math/big" + "github.com/erigontech/erigon-lib/kv/order" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon-lib/chain" @@ -516,7 +517,7 @@ func CalcHashRootForTests(tx kv.RwTx, header *types.Header, histV4, trace bool) if trace { if GenerateTrace { fmt.Printf("State after %d================\n", header.Number) - it, err := tx.Range(kv.HashedAccounts, nil, nil) + it, err := tx.Range(kv.HashedAccounts, nil, nil, order.Asc, kv.Unlim) if err != nil { return hashRoot, err } @@ -528,7 +529,7 @@ func CalcHashRootForTests(tx kv.RwTx, header *types.Header, histV4, trace bool) fmt.Printf("%x: %x\n", k, v) } fmt.Printf("..................\n") - it, err = tx.Range(kv.HashedStorage, nil, nil) + it, err = tx.Range(kv.HashedStorage, nil, nil, order.Asc, kv.Unlim) if err != nil { return hashRoot, err } diff --git a/core/state/recon_writer_inc.go b/core/state/recon_writer_inc.go deleted file mode 100644 index f9f97b55136..00000000000 --- a/core/state/recon_writer_inc.go +++ /dev/null @@ -1,137 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package state - -import ( - "bytes" - - "github.com/holiman/uint256" - - libcommon "github.com/erigontech/erigon-lib/common" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon-lib/kv/dbutils" - libstate "github.com/erigontech/erigon-lib/state" - - "github.com/erigontech/erigon/core/types/accounts" -) - -type StateReconWriterInc struct { - as *libstate.AggregatorStep - rs *ReconState - txNum uint64 - tx kv.Tx - chainTx kv.Tx -} - -func NewStateReconWriterInc(as *libstate.AggregatorStep, rs *ReconState) *StateReconWriterInc { - return &StateReconWriterInc{ - as: as, - rs: rs, - } -} - -func (w *StateReconWriterInc) SetTxNum(txNum uint64) { - w.txNum = txNum -} - -func (w *StateReconWriterInc) SetTx(tx kv.Tx) { - w.tx = tx -} - -func (w *StateReconWriterInc) SetChainTx(chainTx kv.Tx) { - w.chainTx = chainTx -} - -func (w *StateReconWriterInc) UpdateAccountData(address libcommon.Address, original, account *accounts.Account) error { - addr := address.Bytes() - if ok, stateTxNum := w.as.MaxTxNumAccounts(addr); !ok || stateTxNum != w.txNum { - return nil - } - value := make([]byte, account.EncodingLengthForStorage()) - if account.Incarnation > 0 { - account.Incarnation = FirstContractIncarnation - } - account.EncodeForStorage(value) - w.rs.Put(kv.PlainStateR, addr, nil, value, w.txNum) - return nil -} - -func (w *StateReconWriterInc) UpdateAccountCode(address libcommon.Address, incarnation uint64, codeHash libcommon.Hash, code []byte) error { - addr, codeHashBytes := address.Bytes(), codeHash.Bytes() - if ok, stateTxNum := w.as.MaxTxNumCode(addr); !ok || stateTxNum != w.txNum { - return nil - } - if len(code) > 0 { - w.rs.Put(kv.CodeR, codeHashBytes, nil, libcommon.CopyBytes(code), w.txNum) - w.rs.Put(kv.PlainContractR, dbutils.PlainGenerateStoragePrefix(addr, FirstContractIncarnation), nil, codeHashBytes, w.txNum) - } else { - w.rs.Delete(kv.PlainContractD, dbutils.PlainGenerateStoragePrefix(addr, FirstContractIncarnation), nil, w.txNum) - } - return nil -} - -func (w *StateReconWriterInc) DeleteAccount(address libcommon.Address, original *accounts.Account) error { - addr := address.Bytes() - if ok, stateTxNum := w.as.MaxTxNumAccounts(addr); ok && stateTxNum == w.txNum { - //fmt.Printf("delete account [%x]=>{} txNum: %d\n", address, w.txNum) - w.rs.Delete(kv.PlainStateD, addr, nil, w.txNum) - } - // Iterate over storage of this contract and delete it too - var c kv.Cursor - var err error - if c, err = w.chainTx.Cursor(kv.PlainState); err != nil { - return err - } - defer c.Close() - var k []byte - for k, _, err = c.Seek(addr); err == nil && bytes.HasPrefix(k, addr); k, _, err = c.Next() { - //fmt.Printf("delete account storage [%x] [%x]=>{} txNum: %d\n", address, k[20+8:], w.txNum) - if len(k) > 20 { - w.rs.Delete(kv.PlainStateD, addr, libcommon.CopyBytes(k[20+8:]), w.txNum) - } - } - if err != nil { - return err - } - // Delete all pending storage for this contract - w.rs.RemoveAll(kv.PlainStateR, addr) - w.rs.RemoveAll(kv.PlainStateD, addr) - // Delete code - if ok, stateTxNum := w.as.MaxTxNumCode(addr); ok && stateTxNum == w.txNum { - w.rs.Delete(kv.PlainContractD, dbutils.PlainGenerateStoragePrefix(addr, FirstContractIncarnation), nil, w.txNum) - } - return nil -} - -func (w *StateReconWriterInc) WriteAccountStorage(address libcommon.Address, incarnation uint64, key *libcommon.Hash, original, value *uint256.Int) error { - addr, k := address.Bytes(), key.Bytes() - if ok, stateTxNum := w.as.MaxTxNumStorage(addr, k); !ok || stateTxNum != w.txNum { - return nil - } - if value.IsZero() { - w.rs.Delete(kv.PlainStateD, addr, k, w.txNum) - //fmt.Printf("delete storage [%x] [%x] => [%x], txNum: %d\n", address, *key, value.Bytes(), w.txNum) - } else { - //fmt.Printf("storage [%x] [%x] => [%x], txNum: %d\n", address, *key, value.Bytes(), w.txNum) - w.rs.Put(kv.PlainStateR, addr, k, value.Bytes(), w.txNum) - } - return nil -} - -func (w *StateReconWriterInc) CreateContract(address libcommon.Address) error { - return nil -} diff --git a/erigon-lib/kv/dbutils/composite_keys.go b/erigon-lib/kv/dbutils/composite_keys.go index 9e908ccd2be..a98152fb512 100644 --- a/erigon-lib/kv/dbutils/composite_keys.go +++ b/erigon-lib/kv/dbutils/composite_keys.go @@ -67,16 +67,6 @@ func LogKey(blockNumber uint64, txId uint32) []byte { return newK } -// bloomBitsKey = bloomBitsPrefix + bit (uint16 big endian) + section (uint64 big endian) + hash -func BloomBitsKey(bit uint, section uint64, hash libcommon.Hash) []byte { - key := append(make([]byte, 10), hash.Bytes()...) - - binary.BigEndian.PutUint16(key[0:], uint16(bit)) - binary.BigEndian.PutUint64(key[2:], section) - - return key -} - // AddrHash + KeyHash // Only for trie func GenerateCompositeTrieKey(addressHash libcommon.Hash, seckey libcommon.Hash) []byte { @@ -85,108 +75,3 @@ func GenerateCompositeTrieKey(addressHash libcommon.Hash, seckey libcommon.Hash) compositeKey = append(compositeKey, seckey[:]...) return compositeKey } - -// AddrHash + incarnation + KeyHash -// For contract storage -func GenerateCompositeStorageKey(addressHash libcommon.Hash, incarnation uint64, seckey libcommon.Hash) []byte { - compositeKey := make([]byte, length.Hash+length.Incarnation+length.Hash) - copy(compositeKey, addressHash[:]) - binary.BigEndian.PutUint64(compositeKey[length.Hash:], incarnation) - copy(compositeKey[length.Hash+length.Incarnation:], seckey[:]) - return compositeKey -} - -func ParseCompositeStorageKey(compositeKey []byte) (libcommon.Hash, uint64, libcommon.Hash) { - prefixLen := length.Hash + length.Incarnation - addrHash, inc := ParseStoragePrefix(compositeKey[:prefixLen]) - var key libcommon.Hash - copy(key[:], compositeKey[prefixLen:prefixLen+length.Hash]) - return addrHash, inc, key -} - -// AddrHash + incarnation + KeyHash -// For contract storage (for plain state) -func PlainGenerateCompositeStorageKey(address []byte, incarnation uint64, key []byte) []byte { - compositeKey := make([]byte, length.Addr+length.Incarnation+length.Hash) - copy(compositeKey, address) - binary.BigEndian.PutUint64(compositeKey[length.Addr:], incarnation) - copy(compositeKey[length.Addr+length.Incarnation:], key) - return compositeKey -} - -func PlainParseCompositeStorageKey(compositeKey []byte) (libcommon.Address, uint64, libcommon.Hash) { - prefixLen := length.Addr + length.Incarnation - addr, inc := PlainParseStoragePrefix(compositeKey[:prefixLen]) - var key libcommon.Hash - copy(key[:], compositeKey[prefixLen:prefixLen+length.Hash]) - return addr, inc, key -} - -// AddrHash + incarnation + StorageHashPrefix -func GenerateCompositeStoragePrefix(addressHash []byte, incarnation uint64, storageHashPrefix []byte) []byte { - key := make([]byte, length.Hash+length.Incarnation+len(storageHashPrefix)) - copy(key, addressHash) - binary.BigEndian.PutUint64(key[length.Hash:], incarnation) - copy(key[length.Hash+length.Incarnation:], storageHashPrefix) - return key -} - -// address hash + incarnation prefix -func GenerateStoragePrefix(addressHash []byte, incarnation uint64) []byte { - prefix := make([]byte, length.Hash+NumberLength) - copy(prefix, addressHash) - binary.BigEndian.PutUint64(prefix[length.Hash:], incarnation) - return prefix -} - -// address hash + incarnation prefix (for plain state) -func PlainGenerateStoragePrefix(address []byte, incarnation uint64) []byte { - prefix := make([]byte, length.Addr+NumberLength) - copy(prefix, address) - binary.BigEndian.PutUint64(prefix[length.Addr:], incarnation) - return prefix -} - -func PlainParseStoragePrefix(prefix []byte) (libcommon.Address, uint64) { - var addr libcommon.Address - copy(addr[:], prefix[:length.Addr]) - inc := binary.BigEndian.Uint64(prefix[length.Addr : length.Addr+length.Incarnation]) - return addr, inc -} - -func ParseStoragePrefix(prefix []byte) (libcommon.Hash, uint64) { - var addrHash libcommon.Hash - copy(addrHash[:], prefix[:length.Hash]) - inc := binary.BigEndian.Uint64(prefix[length.Hash : length.Hash+length.Incarnation]) - return addrHash, inc -} - -// Key + blockNum -func CompositeKeySuffix(key []byte, timestamp uint64) (composite, encodedTS []byte) { - encodedTS = encodeTimestamp(timestamp) - composite = make([]byte, len(key)+len(encodedTS)) - copy(composite, key) - copy(composite[len(key):], encodedTS) - return composite, encodedTS -} - -// encodeTimestamp has the property: if a < b, then Encoding(a) < Encoding(b) lexicographically -func encodeTimestamp(timestamp uint64) []byte { - var suffix []byte - var limit uint64 = 32 - - for bytecount := 1; bytecount <= 8; bytecount++ { - if timestamp < limit { - suffix = make([]byte, bytecount) - b := timestamp - for i := bytecount - 1; i > 0; i-- { - suffix[i] = byte(b & 0xff) - b >>= 8 - } - suffix[0] = byte(b) | (byte(bytecount) << 5) // 3 most significant bits of the first byte are bytecount - break - } - limit <<= 8 - } - return suffix -} diff --git a/erigon-lib/kv/dbutils/composite_keys_test.go b/erigon-lib/kv/dbutils/composite_keys_test.go deleted file mode 100644 index 9c4bebe2eef..00000000000 --- a/erigon-lib/kv/dbutils/composite_keys_test.go +++ /dev/null @@ -1,77 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package dbutils - -import ( - "testing" - - "github.com/erigontech/erigon-lib/common" - libcommon "github.com/erigontech/erigon-lib/common" - "github.com/stretchr/testify/assert" -) - -func TestPlainParseStoragePrefix(t *testing.T) { - expectedAddr := libcommon.HexToAddress("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c") - expectedIncarnation := uint64(999000999) - - prefix := PlainGenerateStoragePrefix(expectedAddr[:], expectedIncarnation) - - addr, incarnation := PlainParseStoragePrefix(prefix) - - assert.Equal(t, expectedAddr, addr, "address should be extracted") - assert.Equal(t, expectedIncarnation, incarnation, "incarnation should be extracted") -} - -func TestPlainParseCompositeStorageKey(t *testing.T) { - expectedAddr := libcommon.HexToAddress("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c") - expectedIncarnation := uint64(999000999) - expectedKey := libcommon.HexToHash("0x58833f949125129fb8c6c93d2c6003c5bab7c0b116d695f4ca137b1debf4e472") - - compositeKey := PlainGenerateCompositeStorageKey(expectedAddr.Bytes(), expectedIncarnation, expectedKey.Bytes()) - - addr, incarnation, key := PlainParseCompositeStorageKey(compositeKey) - - assert.Equal(t, expectedAddr, addr, "address should be extracted") - assert.Equal(t, expectedIncarnation, incarnation, "incarnation should be extracted") - assert.Equal(t, expectedKey, key, "key should be extracted") -} - -func TestParseStoragePrefix(t *testing.T) { - expectedAddrHash, _ := libcommon.HashData(libcommon.HexToAddress("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c").Bytes()) - expectedIncarnation := uint64(999000999) - - prefix := GenerateStoragePrefix(expectedAddrHash[:], expectedIncarnation) - - addrHash, incarnation := ParseStoragePrefix(prefix) - - assert.Equal(t, expectedAddrHash, addrHash, "address should be extracted") - assert.Equal(t, expectedIncarnation, incarnation, "incarnation should be extracted") -} - -func TestParseCompositeStorageKey(t *testing.T) { - expectedAddrHash, _ := common.HashData(libcommon.HexToAddress("0x5A0b54D5dc17e0AadC383d2db43B0a0D3E029c4c").Bytes()) - expectedIncarnation := uint64(999000999) - expectedKey := libcommon.HexToHash("0x58833f949125129fb8c6c93d2c6003c5bab7c0b116d695f4ca137b1debf4e472") - - compositeKey := GenerateCompositeStorageKey(expectedAddrHash, expectedIncarnation, expectedKey) - - addrHash, incarnation, key := ParseCompositeStorageKey(compositeKey) - - assert.Equal(t, expectedAddrHash, addrHash, "address should be extracted") - assert.Equal(t, expectedIncarnation, incarnation, "incarnation should be extracted") - assert.Equal(t, expectedKey, key, "key should be extracted") -} diff --git a/erigon-lib/kv/dbutils/suffix_type.go b/erigon-lib/kv/dbutils/suffix_type.go deleted file mode 100644 index 40dcd797dde..00000000000 --- a/erigon-lib/kv/dbutils/suffix_type.go +++ /dev/null @@ -1,86 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package dbutils - -import "encoding/binary" - -type Suffix []byte - -func ToSuffix(b []byte) Suffix { - return b -} - -func (s Suffix) Add(key []byte) Suffix { - var l int - if s == nil { - l = 4 - } else { - l = len(s) - } - dv := make([]byte, l+1+len(key)) - copy(dv, s) - binary.BigEndian.PutUint32(dv, 1+s.KeyCount()) // Increment the counter of keys - dv[l] = byte(len(key)) - copy(dv[l+1:], key) - return dv -} -func (s Suffix) MultiAdd(keys [][]byte) Suffix { - var l int - if s == nil { - l = 4 - } else { - l = len(s) - } - newLen := len(keys) - for _, key := range keys { - newLen += len(key) - } - dv := make([]byte, l+newLen) - copy(dv, s) - binary.BigEndian.PutUint32(dv, uint32(len(keys))+s.KeyCount()) - i := l - for _, key := range keys { - dv[i] = byte(len(key)) - i++ - copy(dv[i:], key) - i += len(key) - } - return dv -} - -func (s Suffix) KeyCount() uint32 { - if len(s) < 4 { - return 0 - } - return binary.BigEndian.Uint32(s) -} - -func (s Suffix) Walk(f func(k []byte) error) error { - keyCount := int(s.KeyCount()) - for i, ki := 4, 0; ki < keyCount; ki++ { - l := int(s[i]) - i++ - kk := make([]byte, l) - copy(kk, s[i:i+l]) - err := f(kk) - if err != nil { - return err - } - i += l - } - return nil -} diff --git a/erigon-lib/kv/kv_interface.go b/erigon-lib/kv/kv_interface.go index 27099f5aa15..e40e7f6fda6 100644 --- a/erigon-lib/kv/kv_interface.go +++ b/erigon-lib/kv/kv_interface.go @@ -77,9 +77,6 @@ import ( const ReadersLimit = 32000 // MDBX_READERS_LIMIT=32767 -// const Unbounded []byte = nil -const Unlim int = -1 - var ( ErrAttemptToDeleteNonDeprecatedBucket = errors.New("only buckets from dbutils.ChaindataDeprecatedTables can be deleted") @@ -345,6 +342,9 @@ type StatelessRwTx interface { Putter } +// const Unbounded/EOF/EndOfTable []byte = nil +const Unlim int = -1 + // Tx // WARNING: // - Tx is not threadsafe and may only be used in the goroutine that created it @@ -366,23 +366,16 @@ type Tx interface { Cursor(table string) (Cursor, error) CursorDupSort(table string) (CursorDupSort, error) // CursorDupSort - can be used if bucket has mdbx.DupSort flag - DBSize() (uint64, error) - // --- High-Level methods: 1request -> stream of server-side pushes --- // Range [from, to) // Range(from, nil) means [from, EndOfTable) // Range(nil, to) means [StartOfTable, to) - Range(table string, fromPrefix, toPrefix []byte) (stream.KV, error) - // Stream is like Range, but for requesting huge data (Example: full table scan). Client can't stop it. - // Stream(table string, fromPrefix, toPrefix []byte) (stream.KV, error) - // RangeAscend - like Range [from, to) but also allow pass Limit parameters + // if `order.Desc` expecing `from`<`to` // Limit -1 means Unlimited - RangeAscend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) - // StreamAscend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) - // RangeDescend - is like Range [from, to), but expecing `from`<`to` - // example: RangeDescend("Table", "B", "A", -1) - RangeDescend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) + // Designed for requesting huge data (Example: full table scan). Client can't stop it. + // Example: RangeDescend("Table", "B", "A", order.Asc, -1) + Range(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) //StreamDescend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) // Prefix - is exactly Range(Table, prefix, kv.NextSubtree(prefix)) Prefix(table string, prefix []byte) (stream.KV, error) @@ -569,10 +562,6 @@ type TemporalPutDel interface { type TxnId uint64 // internal auto-increment ID. can't cast to eth-network canonical blocks txNum -type CanWarmupDB interface { - WarmupDB(force bool) error - LockDBInRam() error -} type HasSpaceDirty interface { SpaceDirty() (uint64, uint64, error) } diff --git a/erigon-lib/kv/mdbx/kv_abstract_test.go b/erigon-lib/kv/mdbx/kv_abstract_test.go index 3aaa4fad0f3..8070e892b71 100644 --- a/erigon-lib/kv/mdbx/kv_abstract_test.go +++ b/erigon-lib/kv/mdbx/kv_abstract_test.go @@ -23,6 +23,7 @@ import ( "runtime" "testing" + "github.com/erigontech/erigon-lib/kv/order" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "google.golang.org/grpc" @@ -271,7 +272,7 @@ func TestRemoteKvRange(t *testing.T) { err = db.View(ctx, func(tx kv.Tx) error { cntRange := func(from, to []byte) (i int) { - it, err := tx.Range(kv.AccountChangeSet, from, to) + it, err := tx.Range(kv.AccountChangeSet, from, to, order.Asc, kv.Unlim) require.NoError(err) for it.HasNext() { _, _, err = it.Next() @@ -292,7 +293,7 @@ func TestRemoteKvRange(t *testing.T) { // Limit err = db.View(ctx, func(tx kv.Tx) error { cntRange := func(from, to []byte) (i int) { - it, err := tx.RangeAscend(kv.AccountChangeSet, from, to, 2) + it, err := tx.Range(kv.AccountChangeSet, from, to, order.Asc, 2) require.NoError(err) for it.HasNext() { _, _, err := it.Next() @@ -312,7 +313,7 @@ func TestRemoteKvRange(t *testing.T) { err = db.View(ctx, func(tx kv.Tx) error { cntRange := func(from, to []byte) (i int) { - it, err := tx.RangeDescend(kv.AccountChangeSet, from, to, 2) + it, err := tx.Range(kv.AccountChangeSet, from, to, order.Desc, 2) require.NoError(err) for it.HasNext() { _, _, err := it.Next() diff --git a/erigon-lib/kv/mdbx/kv_mdbx.go b/erigon-lib/kv/mdbx/kv_mdbx.go index 27779f57744..968a24e2fa7 100644 --- a/erigon-lib/kv/mdbx/kv_mdbx.go +++ b/erigon-lib/kv/mdbx/kv_mdbx.go @@ -97,41 +97,34 @@ func NewMDBX(log log.Logger) MdbxOpts { func (opts MdbxOpts) GetLabel() kv.Label { return opts.label } func (opts MdbxOpts) GetInMem() bool { return opts.inMem } func (opts MdbxOpts) GetPageSize() uint64 { return opts.pageSize } - -func (opts MdbxOpts) Label(label kv.Label) MdbxOpts { - opts.label = label - return opts -} - -func (opts MdbxOpts) DirtySpace(s uint64) MdbxOpts { - opts.dirtySpace = s - return opts -} - -func (opts MdbxOpts) RoTxsLimiter(l *semaphore.Weighted) MdbxOpts { - opts.roTxsLimiter = l - return opts -} - -func (opts MdbxOpts) PageSize(v uint64) MdbxOpts { - opts.pageSize = v - return opts -} - -func (opts MdbxOpts) GrowthStep(v datasize.ByteSize) MdbxOpts { - opts.growthStep = v - return opts +func (opts MdbxOpts) Set(opt MdbxOpts) MdbxOpts { + return opt } +func (opts MdbxOpts) HasFlag(flag uint) bool { return opts.flags&flag != 0 } -func (opts MdbxOpts) Path(path string) MdbxOpts { - opts.path = path +func (opts MdbxOpts) Label(label kv.Label) MdbxOpts { opts.label = label; return opts } +func (opts MdbxOpts) DirtySpace(s uint64) MdbxOpts { opts.dirtySpace = s; return opts } +func (opts MdbxOpts) RoTxsLimiter(l *semaphore.Weighted) MdbxOpts { opts.roTxsLimiter = l; return opts } +func (opts MdbxOpts) PageSize(v uint64) MdbxOpts { opts.pageSize = v; return opts } +func (opts MdbxOpts) GrowthStep(v datasize.ByteSize) MdbxOpts { opts.growthStep = v; return opts } +func (opts MdbxOpts) Path(path string) MdbxOpts { opts.path = path; return opts } +func (opts MdbxOpts) Exclusive() MdbxOpts { opts.flags = opts.flags | mdbx.Exclusive; return opts } +func (opts MdbxOpts) Flags(f func(uint) uint) MdbxOpts { opts.flags = f(opts.flags); return opts } +func (opts MdbxOpts) Readonly() MdbxOpts { opts.flags = opts.flags | mdbx.Readonly; return opts } +func (opts MdbxOpts) Accede() MdbxOpts { opts.flags = opts.flags | mdbx.Accede; return opts } +func (opts MdbxOpts) SyncPeriod(period time.Duration) MdbxOpts { opts.syncPeriod = period; return opts } +func (opts MdbxOpts) DBVerbosity(v kv.DBVerbosityLvl) MdbxOpts { opts.verbosity = v; return opts } +func (opts MdbxOpts) MapSize(sz datasize.ByteSize) MdbxOpts { opts.mapSize = sz; return opts } +func (opts MdbxOpts) LifoReclaim() MdbxOpts { opts.flags |= mdbx.LifoReclaim; return opts } +func (opts MdbxOpts) WriteMergeThreshold(v uint64) MdbxOpts { opts.mergeThreshold = v; return opts } +func (opts MdbxOpts) WithTableCfg(f TableCfgFunc) MdbxOpts { opts.bucketsCfg = f; return opts } +func (opts MdbxOpts) WriteMap(flag bool) MdbxOpts { + if flag { + opts.flags |= mdbx.WriteMap + } return opts } -func (opts MdbxOpts) Set(opt MdbxOpts) MdbxOpts { - return opt -} - func (opts MdbxOpts) InMem(tmpDir string) MdbxOpts { if tmpDir != "" { if err := os.MkdirAll(tmpDir, 0755); err != nil { @@ -153,62 +146,6 @@ func (opts MdbxOpts) InMem(tmpDir string) MdbxOpts { return opts } -func (opts MdbxOpts) Exclusive() MdbxOpts { - opts.flags = opts.flags | mdbx.Exclusive - return opts -} - -func (opts MdbxOpts) Flags(f func(uint) uint) MdbxOpts { - opts.flags = f(opts.flags) - return opts -} - -func (opts MdbxOpts) HasFlag(flag uint) bool { return opts.flags&flag != 0 } -func (opts MdbxOpts) Readonly() MdbxOpts { - opts.flags = opts.flags | mdbx.Readonly - return opts -} -func (opts MdbxOpts) Accede() MdbxOpts { - opts.flags = opts.flags | mdbx.Accede - return opts -} - -func (opts MdbxOpts) SyncPeriod(period time.Duration) MdbxOpts { - opts.syncPeriod = period - return opts -} - -func (opts MdbxOpts) DBVerbosity(v kv.DBVerbosityLvl) MdbxOpts { - opts.verbosity = v - return opts -} - -func (opts MdbxOpts) MapSize(sz datasize.ByteSize) MdbxOpts { - opts.mapSize = sz - return opts -} - -func (opts MdbxOpts) WriteMap(flag bool) MdbxOpts { - if flag { - opts.flags |= mdbx.WriteMap - } - return opts -} -func (opts MdbxOpts) LifoReclaim() MdbxOpts { - opts.flags |= mdbx.LifoReclaim - return opts -} - -func (opts MdbxOpts) WriteMergeThreshold(v uint64) MdbxOpts { - opts.mergeThreshold = v - return opts -} - -func (opts MdbxOpts) WithTableCfg(f TableCfgFunc) MdbxOpts { - opts.bucketsCfg = f - return opts -} - var pathDbMap = map[string]kv.RoDB{} var pathDbMapLock sync.Mutex @@ -514,139 +451,6 @@ type MdbxKV struct { batch *batch } -// Default values if not set in a DB instance. -const ( - DefaultMaxBatchSize int = 1000 - DefaultMaxBatchDelay = 10 * time.Millisecond -) - -type batch struct { - db *MdbxKV - timer *time.Timer - start sync.Once - calls []call -} - -type call struct { - fn func(kv.RwTx) error - err chan<- error -} - -// trigger runs the batch if it hasn't already been run. -func (b *batch) trigger() { - b.start.Do(b.run) -} - -// run performs the transactions in the batch and communicates results -// back to DB.Batch. -func (b *batch) run() { - b.db.batchMu.Lock() - b.timer.Stop() - // Make sure no new work is added to this batch, but don't break - // other batches. - if b.db.batch == b { - b.db.batch = nil - } - b.db.batchMu.Unlock() - -retry: - for len(b.calls) > 0 { - var failIdx = -1 - err := b.db.Update(context.Background(), func(tx kv.RwTx) error { - for i, c := range b.calls { - if err := safelyCall(c.fn, tx); err != nil { - failIdx = i - return err - } - } - return nil - }) - - if failIdx >= 0 { - // take the failing transaction out of the batch. it's - // safe to shorten b.calls here because db.batch no longer - // points to us, and we hold the mutex anyway. - c := b.calls[failIdx] - b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1] - // tell the submitter re-run it solo, continue with the rest of the batch - c.err <- trySolo - continue retry - } - - // pass success, or bolt internal errors, to all callers - for _, c := range b.calls { - c.err <- err - } - break retry - } -} - -// trySolo is a special sentinel error value used for signaling that a -// transaction function should be re-run. It should never be seen by -// callers. -var trySolo = errors.New("batch function returned an error and should be re-run solo") - -type panicked struct { - reason interface{} -} - -func (p panicked) Error() string { - if err, ok := p.reason.(error); ok { - return err.Error() - } - return fmt.Sprintf("panic: %v", p.reason) -} - -func safelyCall(fn func(tx kv.RwTx) error, tx kv.RwTx) (err error) { - defer func() { - if p := recover(); p != nil { - err = panicked{p} - } - }() - return fn(tx) -} - -// Batch is only useful when there are multiple goroutines calling it. -// It behaves similar to Update, except: -// -// 1. concurrent Batch calls can be combined into a single RwTx. -// -// 2. the function passed to Batch may be called multiple times, -// regardless of whether it returns error or not. -// -// This means that Batch function side effects must be idempotent and -// take permanent effect only after a successful return is seen in -// caller. -// -// Example of bad side-effects: print messages, mutate external counters `i++` -// -// The maximum batch size and delay can be adjusted with DB.MaxBatchSize -// and DB.MaxBatchDelay, respectively. -func (db *MdbxKV) Batch(fn func(tx kv.RwTx) error) error { - errCh := make(chan error, 1) - - db.batchMu.Lock() - if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) { - // There is no existing batch, or the existing batch is full; start a new one. - db.batch = &batch{ - db: db, - } - db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger) - } - db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) - if len(db.batch.calls) >= db.MaxBatchSize { - // wake up batch, it's ready to run - go db.batch.trigger() - } - db.batchMu.Unlock() - - err := <-errCh - if errors.Is(err, trySolo) { - err = db.Update(context.Background(), fn) - } - return err -} - func (db *MdbxKV) Path() string { return db.opts.path } func (db *MdbxKV) PageSize() uint64 { return db.opts.pageSize } func (db *MdbxKV) ReadOnly() bool { return db.opts.HasFlag(mdbx.Readonly) } @@ -841,9 +645,13 @@ type MdbxCursor struct { id uint64 } -func (db *MdbxKV) Env() *mdbx.Env { - return db.env +func (db *MdbxKV) Env() *mdbx.Env { return db.env } +func (db *MdbxKV) AllTables() kv.TableCfg { + return db.buckets } +func (tx *MdbxTx) IsRo() bool { return tx.readOnly } +func (tx *MdbxTx) ViewID() uint64 { return tx.tx.ID() } +func (tx *MdbxTx) ListBuckets() ([]string, error) { return tx.tx.ListDBI() } func (db *MdbxKV) AllDBI() map[string]kv.DBI { res := map[string]kv.DBI{} @@ -861,13 +669,6 @@ func (tx *MdbxTx) Count(bucket string) (uint64, error) { return st.Entries, nil } -func (db *MdbxKV) AllTables() kv.TableCfg { - return db.buckets -} - -func (tx *MdbxTx) IsRo() bool { return tx.readOnly } -func (tx *MdbxTx) ViewID() uint64 { return tx.tx.ID() } - func (tx *MdbxTx) CollectMetrics() { if tx.db.opts.label != kv.ChainDB { return @@ -914,9 +715,6 @@ func (tx *MdbxTx) CollectMetrics() { kv.GcPagesMetric.SetUint64((gc.LeafPages + gc.OverflowPages) * tx.db.opts.pageSize / 8) } -// ListBuckets - all buckets stored as keys of un-named bucket -func (tx *MdbxTx) ListBuckets() ([]string, error) { return tx.tx.ListDBI() } - func (tx *MdbxTx) WarmupDB(force bool) error { if force { return tx.tx.EnvWarmup(mdbx.WarmupForce|mdbx.WarmupOomSafe, time.Hour) @@ -1143,30 +941,6 @@ func (tx *MdbxTx) SpaceDirty() (uint64, uint64, error) { return txInfo.SpaceDirty, tx.db.txSize, nil } -func (tx *MdbxTx) PrintDebugInfo() { - /* - txInfo, err := tx.tx.Info(true) - if err != nil { - panic(err) - } - - txSize := uint(txInfo.SpaceDirty / 1024) - doPrint := debug.BigRoTxKb() == 0 && debug.BigRwTxKb() == 0 || - tx.readOnly && debug.BigRoTxKb() > 0 && txSize > debug.BigRoTxKb() || - (!tx.readOnly && debug.BigRwTxKb() > 0 && txSize > debug.BigRwTxKb()) - if doPrint { - tx.db.log.Info("Tx info", - "id", txInfo.Id, - "read_lag", txInfo.ReadLag, - "ro", tx.readOnly, - //"space_retired_mb", txInfo.SpaceRetired/1024/1024, - "space_dirty_mb", txInfo.SpaceDirty/1024/1024, - //"callers", debug.Callers(7), - ) - } - */ -} - func (tx *MdbxTx) closeCursors() { for _, c := range tx.toCloseMap { if c != nil { @@ -1199,7 +973,6 @@ func (tx *MdbxTx) Put(table string, k, v []byte) error { func (tx *MdbxTx) Delete(table string, k []byte) error { err := tx.tx.Del(mdbx.DBI(tx.db.buckets[table].DBI), k, nil) - //TODO: revise the logic, why we should drop not found err? maybe we need another function for get with key error if mdbx.IsNotFound(err) { return nil } @@ -1208,7 +981,6 @@ func (tx *MdbxTx) Delete(table string, k []byte) error { func (tx *MdbxTx) GetOne(bucket string, k []byte) ([]byte, error) { v, err := tx.tx.Get(mdbx.DBI(tx.db.buckets[bucket].DBI), k) - //TODO: revise the logic, why we should drop not found err? maybe we need another function for get with key error if mdbx.IsNotFound(err) { return nil, nil } @@ -1362,35 +1134,12 @@ func (tx *MdbxTx) CursorDupSort(bucket string) (kv.CursorDupSort, error) { return tx.RwCursorDupSort(bucket) } -// methods here help to see better pprof picture -func (c *MdbxCursor) set(k []byte) ([]byte, []byte, error) { return c.c.Get(k, nil, mdbx.Set) } -func (c *MdbxCursor) getCurrent() ([]byte, []byte, error) { return c.c.Get(nil, nil, mdbx.GetCurrent) } -func (c *MdbxCursor) next() ([]byte, []byte, error) { return c.c.Get(nil, nil, mdbx.Next) } -func (c *MdbxCursor) nextDup() ([]byte, []byte, error) { return c.c.Get(nil, nil, mdbx.NextDup) } -func (c *MdbxCursor) nextNoDup() ([]byte, []byte, error) { return c.c.Get(nil, nil, mdbx.NextNoDup) } -func (c *MdbxCursor) prev() ([]byte, []byte, error) { return c.c.Get(nil, nil, mdbx.Prev) } -func (c *MdbxCursor) prevDup() ([]byte, []byte, error) { return c.c.Get(nil, nil, mdbx.PrevDup) } -func (c *MdbxCursor) prevNoDup() ([]byte, []byte, error) { return c.c.Get(nil, nil, mdbx.PrevNoDup) } -func (c *MdbxCursor) last() ([]byte, []byte, error) { return c.c.Get(nil, nil, mdbx.Last) } -func (c *MdbxCursor) delCurrent() error { return c.c.Del(mdbx.Current) } -func (c *MdbxCursor) delAllDupData() error { return c.c.Del(mdbx.AllDups) } -func (c *MdbxCursor) put(k, v []byte) error { return c.c.Put(k, v, 0) } -func (c *MdbxCursor) putNoOverwrite(k, v []byte) error { return c.c.Put(k, v, mdbx.NoOverwrite) } -func (c *MdbxCursor) getBoth(k, v []byte) ([]byte, error) { - _, v, err := c.c.Get(k, v, mdbx.GetBoth) - return v, err -} -func (c *MdbxCursor) getBothRange(k, v []byte) ([]byte, error) { - _, v, err := c.c.Get(k, v, mdbx.GetBothRange) - return v, err -} - func (c *MdbxCursor) First() ([]byte, []byte, error) { return c.Seek(nil) } func (c *MdbxCursor) Last() ([]byte, []byte, error) { - k, v, err := c.last() + k, v, err := c.c.Get(nil, nil, mdbx.Last) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil @@ -1421,49 +1170,45 @@ func (c *MdbxCursor) Seek(seek []byte) (k, v []byte, err error) { } return []byte{}, nil, fmt.Errorf("cursor.SetRange: %w, bucket: %s, key: %x", err, c.bucketName, seek) } - return k, v, nil } func (c *MdbxCursor) Next() (k, v []byte, err error) { - k, v, err = c.next() + k, v, err = c.c.Get(nil, nil, mdbx.Next) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil } return []byte{}, nil, fmt.Errorf("failed MdbxKV cursor.Next(): %w", err) } - return k, v, nil } func (c *MdbxCursor) Prev() (k, v []byte, err error) { - k, v, err = c.prev() + k, v, err = c.c.Get(nil, nil, mdbx.Prev) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil } return []byte{}, nil, fmt.Errorf("failed MdbxKV cursor.Prev(): %w", err) } - return k, v, nil } // Current - return key/data at current cursor position func (c *MdbxCursor) Current() ([]byte, []byte, error) { - k, v, err := c.getCurrent() + k, v, err := c.c.Get(nil, nil, mdbx.GetCurrent) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil } return []byte{}, nil, err } - return k, v, nil } func (c *MdbxCursor) Delete(k []byte) error { - _, _, err := c.set(k) + _, _, err := c.c.Get(k, nil, mdbx.Set) if err != nil { if mdbx.IsNotFound(err) { return nil @@ -1472,10 +1217,10 @@ func (c *MdbxCursor) Delete(k []byte) error { } if c.bucketCfg.Flags&mdbx.DupSort != 0 { - return c.delAllDupData() + return c.c.Del(mdbx.AllDups) } - return c.delCurrent() + return c.c.Del(mdbx.Current) } // DeleteCurrent This function deletes the key/data pair to which the cursor refers. @@ -1483,23 +1228,18 @@ func (c *MdbxCursor) Delete(k []byte) error { // can still be used on it. // Both MDB_NEXT and MDB_GET_CURRENT will return the same record after // this operation. -func (c *MdbxCursor) DeleteCurrent() error { - return c.delCurrent() -} - -func (c *MdbxCursor) PutNoOverwrite(key []byte, value []byte) error { - return c.putNoOverwrite(key, value) -} +func (c *MdbxCursor) DeleteCurrent() error { return c.c.Del(mdbx.Current) } +func (c *MdbxCursor) PutNoOverwrite(k, v []byte) error { return c.c.Put(k, v, mdbx.NoOverwrite) } func (c *MdbxCursor) Put(key []byte, value []byte) error { - if err := c.put(key, value); err != nil { + if err := c.c.Put(key, value, 0); err != nil { return fmt.Errorf("label: %s, table: %s, err: %w", c.tx.db.opts.label, c.bucketName, err) } return nil } func (c *MdbxCursor) SeekExact(key []byte) ([]byte, []byte, error) { - k, v, err := c.set(key) + k, v, err := c.c.Get(key, nil, mdbx.Set) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil @@ -1531,24 +1271,20 @@ type MdbxDupSortCursor struct { *MdbxCursor } -func (c *MdbxDupSortCursor) Internal() *mdbx.Cursor { - return c.c -} - // DeleteExact - does delete func (c *MdbxDupSortCursor) DeleteExact(k1, k2 []byte) error { - _, err := c.getBoth(k1, k2) + _, _, err := c.c.Get(k1, k2, mdbx.GetBoth) if err != nil { // if key not found, or found another one - then nothing to delete if mdbx.IsNotFound(err) { return nil } return err } - return c.delCurrent() + return c.c.Del(mdbx.Current) } func (c *MdbxDupSortCursor) SeekBothExact(key, value []byte) ([]byte, []byte, error) { - v, err := c.getBoth(key, value) + _, v, err := c.c.Get(key, value, mdbx.GetBoth) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil @@ -1559,7 +1295,7 @@ func (c *MdbxDupSortCursor) SeekBothExact(key, value []byte) ([]byte, []byte, er } func (c *MdbxDupSortCursor) SeekBothRange(key, value []byte) ([]byte, error) { - v, err := c.getBothRange(key, value) + _, v, err := c.c.Get(key, value, mdbx.GetBothRange) if err != nil { if mdbx.IsNotFound(err) { return nil, nil @@ -1582,7 +1318,7 @@ func (c *MdbxDupSortCursor) FirstDup() ([]byte, error) { // NextDup - iterate only over duplicates of current key func (c *MdbxDupSortCursor) NextDup() ([]byte, []byte, error) { - k, v, err := c.nextDup() + k, v, err := c.c.Get(nil, nil, mdbx.NextDup) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil @@ -1594,7 +1330,7 @@ func (c *MdbxDupSortCursor) NextDup() ([]byte, []byte, error) { // NextNoDup - iterate with skipping all duplicates func (c *MdbxDupSortCursor) NextNoDup() ([]byte, []byte, error) { - k, v, err := c.nextNoDup() + k, v, err := c.c.Get(nil, nil, mdbx.NextNoDup) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil @@ -1605,7 +1341,7 @@ func (c *MdbxDupSortCursor) NextNoDup() ([]byte, []byte, error) { } func (c *MdbxDupSortCursor) PrevDup() ([]byte, []byte, error) { - k, v, err := c.prevDup() + k, v, err := c.c.Get(nil, nil, mdbx.PrevDup) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil @@ -1616,7 +1352,7 @@ func (c *MdbxDupSortCursor) PrevDup() ([]byte, []byte, error) { } func (c *MdbxDupSortCursor) PrevNoDup() ([]byte, []byte, error) { - k, v, err := c.prevNoDup() + k, v, err := c.c.Get(nil, nil, mdbx.PrevNoDup) if err != nil { if mdbx.IsNotFound(err) { return nil, nil, nil @@ -1661,7 +1397,7 @@ func (c *MdbxDupSortCursor) PutNoDupData(k, v []byte) error { // DeleteCurrentDuplicates - delete all of the data items for the current key. func (c *MdbxDupSortCursor) DeleteCurrentDuplicates() error { - if err := c.delAllDupData(); err != nil { + if err := c.c.Del(mdbx.AllDups); err != nil { return fmt.Errorf("label: %s,in DeleteCurrentDuplicates: %w", c.tx.db.opts.label, err) } return nil @@ -1708,19 +1444,23 @@ func (tx *MdbxTx) ForEach(bucket string, fromPrefix []byte, walker func(k, v []b func (tx *MdbxTx) Prefix(table string, prefix []byte) (stream.KV, error) { nextPrefix, ok := kv.NextSubtree(prefix) if !ok { - return tx.Range(table, prefix, nil) + return tx.Range(table, prefix, nil, order.Asc, -1) } - return tx.Range(table, prefix, nextPrefix) + return tx.Range(table, prefix, nextPrefix, order.Asc, -1) } -func (tx *MdbxTx) Range(table string, fromPrefix, toPrefix []byte) (stream.KV, error) { - return tx.RangeAscend(table, fromPrefix, toPrefix, -1) -} -func (tx *MdbxTx) RangeAscend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) { - return tx.rangeOrderLimit(table, fromPrefix, toPrefix, order.Asc, limit) -} -func (tx *MdbxTx) RangeDescend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) { - return tx.rangeOrderLimit(table, fromPrefix, toPrefix, order.Desc, limit) +func (tx *MdbxTx) Range(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) { + s := &cursor2iter{ctx: tx.ctx, tx: tx, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: asc, limit: int64(limit), id: tx.ID} + tx.ID++ + if tx.toCloseMap == nil { + tx.toCloseMap = make(map[uint64]kv.Closer) + } + tx.toCloseMap[s.id] = s + if err := s.init(table, tx); err != nil { + s.Close() //it's responsibility of constructor (our) to close resource on error + return nil, err + } + return s, nil } type cursor2iter struct { @@ -1734,19 +1474,6 @@ type cursor2iter struct { ctx context.Context } -func (tx *MdbxTx) rangeOrderLimit(table string, fromPrefix, toPrefix []byte, orderAscend order.By, limit int) (*cursor2iter, error) { - s := &cursor2iter{ctx: tx.ctx, tx: tx, fromPrefix: fromPrefix, toPrefix: toPrefix, orderAscend: orderAscend, limit: int64(limit), id: tx.ID} - tx.ID++ - if tx.toCloseMap == nil { - tx.toCloseMap = make(map[uint64]kv.Closer) - } - tx.toCloseMap[s.id] = s - if err := s.init(table, tx); err != nil { - s.Close() //it's responsibility of constructor (our) to close resource on error - return nil, err - } - return s, nil -} func (s *cursor2iter) init(table string, tx kv.Tx) error { if s.orderAscend && s.fromPrefix != nil && s.toPrefix != nil && bytes.Compare(s.fromPrefix, s.toPrefix) >= 0 { return fmt.Errorf("tx.Dual: %x must be lexicographicaly before %x", s.fromPrefix, s.toPrefix) diff --git a/erigon-lib/kv/mdbx/kv_mdbx_batch.go b/erigon-lib/kv/mdbx/kv_mdbx_batch.go new file mode 100644 index 00000000000..ba73b5b37b4 --- /dev/null +++ b/erigon-lib/kv/mdbx/kv_mdbx_batch.go @@ -0,0 +1,160 @@ +// Copyright 2021 The Erigon Authors +// This file is part of Erigon. +// +// Erigon is free software: you can redistribute it and/or modify +// it under the terms of the GNU Lesser General Public License as published by +// the Free Software Foundation, either version 3 of the License, or +// (at your option) any later version. +// +// Erigon is distributed in the hope that it will be useful, +// but WITHOUT ANY WARRANTY; without even the implied warranty of +// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the +// GNU Lesser General Public License for more details. +// +// You should have received a copy of the GNU Lesser General Public License +// along with Erigon. If not, see . + +package mdbx + +import ( + "context" + "errors" + "fmt" + "sync" + "time" + + "github.com/erigontech/erigon-lib/kv" +) + +// Batch is only useful when there are multiple goroutines calling it. +// It behaves similar to Update, except: +// +// 1. concurrent Batch calls can be combined into a single RwTx. +// +// 2. the function passed to Batch may be called multiple times, +// regardless of whether it returns error or not. +// +// This means that Batch function side effects must be idempotent and +// take permanent effect only after a successful return is seen in +// caller. +// +// Example of bad side-effects: print messages, mutate external counters `i++` +// +// The maximum batch size and delay can be adjusted with DB.MaxBatchSize +// and DB.MaxBatchDelay, respectively. +func (db *MdbxKV) Batch(fn func(tx kv.RwTx) error) error { + errCh := make(chan error, 1) + + db.batchMu.Lock() + if (db.batch == nil) || (db.batch != nil && len(db.batch.calls) >= db.MaxBatchSize) { + // There is no existing batch, or the existing batch is full; start a new one. + db.batch = &batch{ + db: db, + } + db.batch.timer = time.AfterFunc(db.MaxBatchDelay, db.batch.trigger) + } + db.batch.calls = append(db.batch.calls, call{fn: fn, err: errCh}) + if len(db.batch.calls) >= db.MaxBatchSize { + // wake up batch, it's ready to run + go db.batch.trigger() + } + db.batchMu.Unlock() + + err := <-errCh + if errors.Is(err, trySolo) { + err = db.Update(context.Background(), fn) + } + return err +} + +// Default values if not set in a DB instance. +const ( + DefaultMaxBatchSize int = 1000 + DefaultMaxBatchDelay = 10 * time.Millisecond +) + +type batch struct { + db *MdbxKV + timer *time.Timer + start sync.Once + calls []call +} + +type call struct { + fn func(kv.RwTx) error + err chan<- error +} + +// trigger runs the batch if it hasn't already been run. +func (b *batch) trigger() { + b.start.Do(b.run) +} + +// run performs the transactions in the batch and communicates results +// back to DB.Batch. +func (b *batch) run() { + b.db.batchMu.Lock() + b.timer.Stop() + // Make sure no new work is added to this batch, but don't break + // other batches. + if b.db.batch == b { + b.db.batch = nil + } + b.db.batchMu.Unlock() + +retry: + for len(b.calls) > 0 { + var failIdx = -1 + err := b.db.Update(context.Background(), func(tx kv.RwTx) error { + for i, c := range b.calls { + if err := safelyCall(c.fn, tx); err != nil { + failIdx = i + return err + } + } + return nil + }) + + if failIdx >= 0 { + // take the failing transaction out of the batch. it's + // safe to shorten b.calls here because db.batch no longer + // points to us, and we hold the mutex anyway. + c := b.calls[failIdx] + b.calls[failIdx], b.calls = b.calls[len(b.calls)-1], b.calls[:len(b.calls)-1] + // tell the submitter re-run it solo, continue with the rest of the batch + c.err <- trySolo + continue retry + } + + // pass success, or bolt internal errors, to all callers + for _, c := range b.calls { + c.err <- err + } + break retry + } +} + +// trySolo is a special sentinel error value used for signaling that a +// transaction function should be re-run. It should never be seen by +// callers. +var trySolo = errors.New("batch function returned an error and should be re-run solo") + +type panicked struct { + reason interface{} +} + +func (p panicked) Error() string { + if err, ok := p.reason.(error); ok { + return err.Error() + } + return fmt.Sprintf("panic: %v", p.reason) +} + +func safelyCall(fn func(tx kv.RwTx) error, tx kv.RwTx) (err error) { + defer func() { + if p := recover(); p != nil { + err = panicked{p} + } + }() + return fn(tx) +} diff --git a/erigon-lib/kv/mdbx/kv_mdbx_test.go b/erigon-lib/kv/mdbx/kv_mdbx_test.go index 664f5e6c299..1d42f22cb08 100644 --- a/erigon-lib/kv/mdbx/kv_mdbx_test.go +++ b/erigon-lib/kv/mdbx/kv_mdbx_test.go @@ -124,7 +124,7 @@ func TestRange(t *testing.T) { _, tx, _ := BaseCase(t) //[from, to) - it, err := tx.Range("Table", []byte("key1"), []byte("key3")) + it, err := tx.Range("Table", []byte("key1"), []byte("key3"), order.Asc, kv.Unlim) require.NoError(t, err) require.True(t, it.HasNext()) k, v, err := it.Next() @@ -142,7 +142,7 @@ func TestRange(t *testing.T) { require.False(t, it.HasNext()) // [from, nil) means [from, INF) - it, err = tx.Range("Table", []byte("key1"), nil) + it, err = tx.Range("Table", []byte("key1"), nil, order.Asc, kv.Unlim) require.NoError(t, err) cnt := 0 for it.HasNext() { @@ -156,7 +156,7 @@ func TestRange(t *testing.T) { _, tx, _ := BaseCase(t) //[from, to) - it, err := tx.RangeDescend("Table", []byte("key3"), []byte("key1"), kv.Unlim) + it, err := tx.Range("Table", []byte("key3"), []byte("key1"), order.Desc, kv.Unlim) require.NoError(t, err) require.True(t, it.HasNext()) k, v, err := it.Next() @@ -172,7 +172,7 @@ func TestRange(t *testing.T) { require.False(t, it.HasNext()) - it, err = tx.RangeDescend("Table", nil, nil, 2) + it, err = tx.Range("Table", nil, nil, order.Desc, 2) require.NoError(t, err) cnt := 0 diff --git a/erigon-lib/kv/membatch/mapmutation.go b/erigon-lib/kv/membatch/mapmutation.go index 2e603b50729..cc4069494df 100644 --- a/erigon-lib/kv/membatch/mapmutation.go +++ b/erigon-lib/kv/membatch/mapmutation.go @@ -80,12 +80,7 @@ func (m *Mapmutation) DBSize() (uint64, error) { panic("implement me") } -func (m *Mapmutation) Range(table string, fromPrefix, toPrefix []byte) (stream.KV, error) { - //TODO implement me - panic("implement me") -} - -func (m *Mapmutation) RangeAscend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) { +func (m *Mapmutation) Range(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) { //TODO implement me panic("implement me") } diff --git a/erigon-lib/kv/membatchwithdb/memory_mutation.go b/erigon-lib/kv/membatchwithdb/memory_mutation.go index 70d3df91530..9e2b2d37b42 100644 --- a/erigon-lib/kv/membatchwithdb/memory_mutation.go +++ b/erigon-lib/kv/membatchwithdb/memory_mutation.go @@ -242,9 +242,9 @@ func (m *MemoryMutation) ForEach(bucket string, fromPrefix []byte, walker func(k func (m *MemoryMutation) Prefix(table string, prefix []byte) (stream.KV, error) { nextPrefix, ok := kv.NextSubtree(prefix) if !ok { - return m.Range(table, prefix, nil) + return m.Range(table, prefix, nil, order.Asc, kv.Unlim) } - return m.Range(table, prefix, nextPrefix) + return m.Range(table, prefix, nextPrefix, order.Asc, kv.Unlim) } func (m *MemoryMutation) Stream(table string, fromPrefix, toPrefix []byte) (stream.KV, error) { panic("please implement me") @@ -255,31 +255,13 @@ func (m *MemoryMutation) StreamAscend(table string, fromPrefix, toPrefix []byte, func (m *MemoryMutation) StreamDescend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) { panic("please implement me") } -func (m *MemoryMutation) Range(table string, fromPrefix, toPrefix []byte) (stream.KV, error) { - return m.RangeAscend(table, fromPrefix, toPrefix, -1) -} -func (m *MemoryMutation) RangeAscend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) { +func (m *MemoryMutation) Range(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) { s := &rangeIter{orderAscend: true, limit: int64(limit)} var err error - if s.iterDb, err = m.db.RangeAscend(table, fromPrefix, toPrefix, limit); err != nil { - return s, err - } - if s.iterMem, err = m.memTx.RangeAscend(table, fromPrefix, toPrefix, limit); err != nil { - return s, err - } - if _, err := s.init(); err != nil { - s.Close() //it's responsibility of constructor (our) to close resource on error - return nil, err - } - return s, nil -} -func (m *MemoryMutation) RangeDescend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) { - s := &rangeIter{orderAscend: false, limit: int64(limit)} - var err error - if s.iterDb, err = m.db.RangeDescend(table, fromPrefix, toPrefix, limit); err != nil { + if s.iterDb, err = m.db.Range(table, fromPrefix, toPrefix, asc, limit); err != nil { return s, err } - if s.iterMem, err = m.memTx.RangeDescend(table, fromPrefix, toPrefix, limit); err != nil { + if s.iterMem, err = m.memTx.Range(table, fromPrefix, toPrefix, asc, limit); err != nil { return s, err } if _, err := s.init(); err != nil { diff --git a/erigon-lib/kv/order/order.go b/erigon-lib/kv/order/order.go index 47c1226b791..fa00afb97fb 100644 --- a/erigon-lib/kv/order/order.go +++ b/erigon-lib/kv/order/order.go @@ -22,3 +22,10 @@ const ( Asc By = true Desc By = false ) + +func FromBool(v bool) By { + if v { + return Asc + } + return Desc +} diff --git a/erigon-lib/kv/remotedb/kv_remote.go b/erigon-lib/kv/remotedb/kv_remote.go index a69a246003f..c06022bb728 100644 --- a/erigon-lib/kv/remotedb/kv_remote.go +++ b/erigon-lib/kv/remotedb/kv_remote.go @@ -276,7 +276,7 @@ func (tx *tx) Count(bucket string) (uint64, error) { func (tx *tx) BucketSize(name string) (uint64, error) { panic("not implemented") } func (tx *tx) ForEach(bucket string, fromPrefix []byte, walker func(k, v []byte) error) error { - it, err := tx.Range(bucket, fromPrefix, nil) + it, err := tx.Range(bucket, fromPrefix, nil, order.Asc, kv.Unlim) if err != nil { return err } @@ -681,9 +681,9 @@ func (tx *tx) IndexRange(name kv.InvertedIdx, k []byte, fromTs, toTs int, asc or func (tx *tx) Prefix(table string, prefix []byte) (stream.KV, error) { nextPrefix, ok := kv.NextSubtree(prefix) if !ok { - return tx.Range(table, prefix, nil) + return tx.Range(table, prefix, nil, order.Asc, kv.Unlim) } - return tx.Range(table, prefix, nextPrefix) + return tx.Range(table, prefix, nextPrefix, order.Asc, kv.Unlim) } func (tx *tx) rangeOrderLimit(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) { @@ -696,14 +696,8 @@ func (tx *tx) rangeOrderLimit(table string, fromPrefix, toPrefix []byte, asc ord return reply.Keys, reply.Values, reply.NextPageToken, nil }), nil } -func (tx *tx) Range(table string, fromPrefix, toPrefix []byte) (stream.KV, error) { - return tx.rangeOrderLimit(table, fromPrefix, toPrefix, order.Asc, -1) -} -func (tx *tx) RangeAscend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) { - return tx.rangeOrderLimit(table, fromPrefix, toPrefix, order.Asc, limit) -} -func (tx *tx) RangeDescend(table string, fromPrefix, toPrefix []byte, limit int) (stream.KV, error) { - return tx.rangeOrderLimit(table, fromPrefix, toPrefix, order.Desc, limit) +func (tx *tx) Range(table string, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) { + return tx.rangeOrderLimit(table, fromPrefix, toPrefix, asc, limit) } func (tx *tx) RangeDupSort(table string, key []byte, fromPrefix, toPrefix []byte, asc order.By, limit int) (stream.KV, error) { panic("not implemented yet") diff --git a/erigon-lib/kv/remotedbserver/remotedbserver.go b/erigon-lib/kv/remotedbserver/remotedbserver.go index efdf2da2a02..1450ccbb1b5 100644 --- a/erigon-lib/kv/remotedbserver/remotedbserver.go +++ b/erigon-lib/kv/remotedbserver/remotedbserver.go @@ -726,16 +726,9 @@ func (s *KvServer) Range(_ context.Context, req *remote.RangeReq) (*remote.Pairs var err error if err = s.with(req.TxId, func(tx kv.Tx) error { var it stream.KV - if req.OrderAscend { - it, err = tx.RangeAscend(req.Table, from, req.ToPrefix, limit) - if err != nil { - return err - } - } else { - it, err = tx.RangeDescend(req.Table, from, req.ToPrefix, limit) - if err != nil { - return err - } + it, err = tx.Range(req.Table, from, req.ToPrefix, order.FromBool(req.OrderAscend), limit) + if err != nil { + return err } for it.HasNext() { k, v, err := it.Next() diff --git a/erigon-lib/kv/stream/stream_test.go b/erigon-lib/kv/stream/stream_test.go index ff6e1f3e708..e42eba6c584 100644 --- a/erigon-lib/kv/stream/stream_test.go +++ b/erigon-lib/kv/stream/stream_test.go @@ -90,8 +90,8 @@ func TestUnionPairs(t *testing.T) { _ = tx.Put(kv.E2AccountsHistory, []byte{4}, []byte{1}) _ = tx.Put(kv.AccountChangeSet, []byte{2}, []byte{9}) _ = tx.Put(kv.AccountChangeSet, []byte{3}, []byte{9}) - it, _ := tx.Range(kv.E2AccountsHistory, nil, nil) - it2, _ := tx.Range(kv.AccountChangeSet, nil, nil) + it, _ := tx.Range(kv.E2AccountsHistory, nil, nil, order.Asc, kv.Unlim) + it2, _ := tx.Range(kv.AccountChangeSet, nil, nil, order.Asc, kv.Unlim) keys, values, err := stream.ToArrayKV(stream.UnionKV(it, it2, -1)) require.NoError(err) require.Equal([][]byte{{1}, {2}, {3}, {4}}, keys) @@ -103,8 +103,8 @@ func TestUnionPairs(t *testing.T) { defer tx.Rollback() _ = tx.Put(kv.AccountChangeSet, []byte{2}, []byte{9}) _ = tx.Put(kv.AccountChangeSet, []byte{3}, []byte{9}) - it, _ := tx.Range(kv.E2AccountsHistory, nil, nil) - it2, _ := tx.Range(kv.AccountChangeSet, nil, nil) + it, _ := tx.Range(kv.E2AccountsHistory, nil, nil, order.Asc, kv.Unlim) + it2, _ := tx.Range(kv.AccountChangeSet, nil, nil, order.Asc, kv.Unlim) keys, _, err := stream.ToArrayKV(stream.UnionKV(it, it2, -1)) require.NoError(err) require.Equal([][]byte{{2}, {3}}, keys) @@ -116,8 +116,8 @@ func TestUnionPairs(t *testing.T) { _ = tx.Put(kv.E2AccountsHistory, []byte{1}, []byte{1}) _ = tx.Put(kv.E2AccountsHistory, []byte{3}, []byte{1}) _ = tx.Put(kv.E2AccountsHistory, []byte{4}, []byte{1}) - it, _ := tx.Range(kv.E2AccountsHistory, nil, nil) - it2, _ := tx.Range(kv.AccountChangeSet, nil, nil) + it, _ := tx.Range(kv.E2AccountsHistory, nil, nil, order.Asc, kv.Unlim) + it2, _ := tx.Range(kv.AccountChangeSet, nil, nil, order.Asc, kv.Unlim) keys, _, err := stream.ToArrayKV(stream.UnionKV(it, it2, -1)) require.NoError(err) require.Equal([][]byte{{1}, {3}, {4}}, keys) @@ -126,8 +126,8 @@ func TestUnionPairs(t *testing.T) { require := require.New(t) tx, _ := db.BeginRw(ctx) defer tx.Rollback() - it, _ := tx.Range(kv.E2AccountsHistory, nil, nil) - it2, _ := tx.Range(kv.AccountChangeSet, nil, nil) + it, _ := tx.Range(kv.E2AccountsHistory, nil, nil, order.Asc, kv.Unlim) + it2, _ := tx.Range(kv.AccountChangeSet, nil, nil, order.Asc, kv.Unlim) m := stream.UnionKV(it, it2, -1) require.False(m.HasNext()) }) diff --git a/erigon-lib/state/aggregator.go b/erigon-lib/state/aggregator.go index 851e2fdbd78..6053ded41cc 100644 --- a/erigon-lib/state/aggregator.go +++ b/erigon-lib/state/aggregator.go @@ -283,6 +283,7 @@ func (a *Aggregator) registerII(idx kv.InvertedIdxPos, salt *uint32, dirs datadi return nil } +func (a *Aggregator) StepSize() uint64 { return a.aggregationStep } func (a *Aggregator) OnFreeze(f OnFreezeFunc) { a.onFreeze = f } func (a *Aggregator) DisableFsync() { for _, d := range a.d { diff --git a/erigon-lib/state/history.go b/erigon-lib/state/history.go index fac1beec9e7..31adb3cf01d 100644 --- a/erigon-lib/state/history.go +++ b/erigon-lib/state/history.go @@ -1733,13 +1733,7 @@ func (ht *HistoryRoTx) idxRangeRecent(key []byte, startTxNum, endTxNum int, asc toTxNum = uint64(endTxNum) } binary.BigEndian.PutUint64(to[len(key):], toTxNum) - var it stream.KV - var err error - if asc { - it, err = roTx.RangeAscend(ht.h.historyValsTable, from, to, limit) - } else { - it, err = roTx.RangeDescend(ht.h.historyValsTable, from, to, limit) - } + it, err := roTx.Range(ht.h.historyValsTable, from, to, asc, limit) if err != nil { return nil, err } diff --git a/erigon-lib/state/reconst.go b/erigon-lib/state/reconst.go index b99fb4196a8..7bf2df5b486 100644 --- a/erigon-lib/state/reconst.go +++ b/erigon-lib/state/reconst.go @@ -1,121 +1 @@ package state - -import ( - "fmt" - - "github.com/erigontech/erigon-lib/kv" -) - -// AggregatorStep is used for incremental reconstitution, it allows -// accessing history in isolated way for each step -type AggregatorStep struct { - a *Aggregator - accounts *HistoryStep - storage *HistoryStep - code *HistoryStep - commitment *HistoryStep - keyBuf []byte -} - -func (a *Aggregator) StepSize() uint64 { return a.aggregationStep } -func (a *Aggregator) MakeSteps() ([]*AggregatorStep, error) { - frozenAndIndexed := a.DirtyFilesEndTxNumMinimax() - accountSteps := a.d[kv.AccountsDomain].MakeSteps(frozenAndIndexed) - codeSteps := a.d[kv.CodeDomain].MakeSteps(frozenAndIndexed) - storageSteps := a.d[kv.StorageDomain].MakeSteps(frozenAndIndexed) - commitmentSteps := a.d[kv.CommitmentDomain].MakeSteps(frozenAndIndexed) - if len(accountSteps) != len(storageSteps) || len(storageSteps) != len(codeSteps) { - return nil, fmt.Errorf("different limit of steps (try merge snapshots): accountSteps=%d, storageSteps=%d, codeSteps=%d", len(accountSteps), len(storageSteps), len(codeSteps)) - } - steps := make([]*AggregatorStep, len(accountSteps)) - for i, accountStep := range accountSteps { - steps[i] = &AggregatorStep{ - a: a, - accounts: accountStep, - storage: storageSteps[i], - code: codeSteps[i], - commitment: commitmentSteps[i], - } - } - return steps, nil -} - -func (as *AggregatorStep) TxNumRange() (uint64, uint64) { - return as.accounts.indexFile.startTxNum, as.accounts.indexFile.endTxNum -} - -func (as *AggregatorStep) IterateAccountsTxs() *ScanIteratorInc { - return as.accounts.iterateTxs() -} - -func (as *AggregatorStep) IterateStorageTxs() *ScanIteratorInc { - return as.storage.iterateTxs() -} - -func (as *AggregatorStep) IterateCodeTxs() *ScanIteratorInc { - return as.code.iterateTxs() -} - -func (as *AggregatorStep) ReadAccountDataNoState(addr []byte, txNum uint64) ([]byte, bool, uint64) { - return as.accounts.GetNoState(addr, txNum) -} - -func (as *AggregatorStep) ReadAccountStorageNoState(addr []byte, loc []byte, txNum uint64) ([]byte, bool, uint64) { - if cap(as.keyBuf) < len(addr)+len(loc) { - as.keyBuf = make([]byte, len(addr)+len(loc)) - } else if len(as.keyBuf) != len(addr)+len(loc) { - as.keyBuf = as.keyBuf[:len(addr)+len(loc)] - } - copy(as.keyBuf, addr) - copy(as.keyBuf[len(addr):], loc) - return as.storage.GetNoState(as.keyBuf, txNum) -} - -func (as *AggregatorStep) ReadAccountCodeNoState(addr []byte, txNum uint64) ([]byte, bool, uint64) { - return as.code.GetNoState(addr, txNum) -} - -func (as *AggregatorStep) ReadAccountCodeSizeNoState(addr []byte, txNum uint64) (int, bool, uint64) { - code, noState, stateTxNum := as.code.GetNoState(addr, txNum) - return len(code), noState, stateTxNum -} - -func (as *AggregatorStep) MaxTxNumAccounts(addr []byte) (bool, uint64) { - return as.accounts.MaxTxNum(addr) -} - -func (as *AggregatorStep) MaxTxNumStorage(addr []byte, loc []byte) (bool, uint64) { - if cap(as.keyBuf) < len(addr)+len(loc) { - as.keyBuf = make([]byte, len(addr)+len(loc)) - } else if len(as.keyBuf) != len(addr)+len(loc) { - as.keyBuf = as.keyBuf[:len(addr)+len(loc)] - } - copy(as.keyBuf, addr) - copy(as.keyBuf[len(addr):], loc) - return as.storage.MaxTxNum(as.keyBuf) -} - -func (as *AggregatorStep) MaxTxNumCode(addr []byte) (bool, uint64) { - return as.code.MaxTxNum(addr) -} - -func (as *AggregatorStep) IterateAccountsHistory(txNum uint64) *HistoryIteratorInc { - return as.accounts.interateHistoryBeforeTxNum(txNum) -} - -func (as *AggregatorStep) IterateStorageHistory(txNum uint64) *HistoryIteratorInc { - return as.storage.interateHistoryBeforeTxNum(txNum) -} - -func (as *AggregatorStep) IterateCodeHistory(txNum uint64) *HistoryIteratorInc { - return as.code.interateHistoryBeforeTxNum(txNum) -} - -func (as *AggregatorStep) Clone() *AggregatorStep { - return &AggregatorStep{ - a: as.a, - accounts: as.accounts.Clone(), - storage: as.storage.Clone(), - code: as.code.Clone(), - } -} diff --git a/erigon-lib/txpool/pool.go b/erigon-lib/txpool/pool.go index 425baf84d99..6eb6794c336 100644 --- a/erigon-lib/txpool/pool.go +++ b/erigon-lib/txpool/pool.go @@ -35,6 +35,7 @@ import ( gokzg4844 "github.com/crate-crypto/go-kzg-4844" mapset "github.com/deckarep/golang-set/v2" + "github.com/erigontech/erigon-lib/kv/order" "github.com/go-stack/stack" "github.com/google/btree" "github.com/hashicorp/golang-lru/v2/simplelru" @@ -2108,7 +2109,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { if err != nil { return err } - it, err := tx.Range(kv.RecentLocalTransaction, nil, nil) + it, err := tx.Range(kv.RecentLocalTransaction, nil, nil, order.Asc, kv.Unlim) if err != nil { return err } @@ -2125,7 +2126,7 @@ func (p *TxPool) fromDB(ctx context.Context, tx kv.Tx, coreTx kv.Tx) error { parseCtx.WithSender(false) i := 0 - it, err = tx.Range(kv.PoolTransaction, nil, nil) + it, err = tx.Range(kv.PoolTransaction, nil, nil, order.Asc, kv.Unlim) if err != nil { return err } diff --git a/eth/stagedsync/stagedsynctest/harness.go b/eth/stagedsync/stagedsynctest/harness.go index bd958694086..facbd0f4d70 100644 --- a/eth/stagedsync/stagedsynctest/harness.go +++ b/eth/stagedsync/stagedsynctest/harness.go @@ -27,6 +27,7 @@ import ( "testing" "time" + "github.com/erigontech/erigon-lib/kv/order" "github.com/holiman/uint256" "github.com/stretchr/testify/require" "go.uber.org/mock/gomock" @@ -291,7 +292,7 @@ func (h *Harness) SetMiningBlockEmptyHeader(ctx context.Context, t *testing.T, p func (h *Harness) ReadSpansFromDB(ctx context.Context) (spans []*heimdall.Span, err error) { err = h.chainDataDB.View(ctx, func(tx kv.Tx) error { - spanIter, err := tx.Range(kv.BorSpans, nil, nil) + spanIter, err := tx.Range(kv.BorSpans, nil, nil, order.Asc, kv.Unlim) if err != nil { return err } @@ -326,7 +327,7 @@ func (h *Harness) ReadSpansFromDB(ctx context.Context) (spans []*heimdall.Span, func (h *Harness) ReadStateSyncEventsFromDB(ctx context.Context) (eventIDs []uint64, err error) { err = h.chainDataDB.View(ctx, func(tx kv.Tx) error { - eventsIter, err := tx.Range(kv.BorEvents, nil, nil) + eventsIter, err := tx.Range(kv.BorEvents, nil, nil, order.Asc, kv.Unlim) if err != nil { return err } @@ -352,7 +353,7 @@ func (h *Harness) ReadStateSyncEventsFromDB(ctx context.Context) (eventIDs []uin func (h *Harness) ReadLastStateSyncEventNumPerBlockFromDB(ctx context.Context) (nums map[uint64]uint64, err error) { nums = map[uint64]uint64{} err = h.chainDataDB.View(ctx, func(tx kv.Tx) error { - eventNumsIter, err := tx.Range(kv.BorEventNums, nil, nil) + eventNumsIter, err := tx.Range(kv.BorEventNums, nil, nil, order.Asc, kv.Unlim) if err != nil { return err } diff --git a/polygon/bridge/mdbx_store.go b/polygon/bridge/mdbx_store.go index cb044154bc5..a783a1461a4 100644 --- a/polygon/bridge/mdbx_store.go +++ b/polygon/bridge/mdbx_store.go @@ -28,6 +28,7 @@ import ( libcommon "github.com/erigontech/erigon-lib/common" "github.com/erigontech/erigon-lib/downloader/snaptype" "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/order" "github.com/erigontech/erigon-lib/log/v3" "github.com/erigontech/erigon/polygon/heimdall" "github.com/erigontech/erigon/polygon/polygoncommon" @@ -207,7 +208,7 @@ func lastEventIdWithinWindow(tx kv.Tx, fromId uint64, toTime time.Time) (uint64, k := make([]byte, 8) binary.BigEndian.PutUint64(k, fromId) - it, err := tx.RangeAscend(kv.BorEvents, k, nil, -1) + it, err := tx.Range(kv.BorEvents, k, nil, order.Asc, kv.Unlim) if err != nil { return 0, err } @@ -513,7 +514,7 @@ func (s txStore) Events(ctx context.Context, start, end uint64) ([][]byte, error kEnd := make([]byte, 8) binary.BigEndian.PutUint64(kEnd, end) - it, err := s.tx.Range(kv.BorEvents, kStart, kEnd) + it, err := s.tx.Range(kv.BorEvents, kStart, kEnd, order.Asc, kv.Unlim) if err != nil { return nil, err } diff --git a/polygon/heimdall/entity_store.go b/polygon/heimdall/entity_store.go index 41588242603..1a5ec6539a3 100644 --- a/polygon/heimdall/entity_store.go +++ b/polygon/heimdall/entity_store.go @@ -27,6 +27,7 @@ import ( "github.com/erigontech/erigon-lib/common/generics" "github.com/erigontech/erigon-lib/downloader/snaptype" "github.com/erigontech/erigon-lib/kv" + "github.com/erigontech/erigon-lib/kv/order" "github.com/erigontech/erigon/polygon/polygoncommon" ) @@ -379,7 +380,7 @@ func (s txEntityStore[TEntity]) PutEntity(ctx context.Context, id uint64, entity func (s txEntityStore[TEntity]) RangeFromId(ctx context.Context, startId uint64) ([]TEntity, error) { startKey := entityStoreKey(startId) - it, err := s.tx.Range(s.table, startKey[:], nil) + it, err := s.tx.Range(s.table, startKey[:], nil, order.Asc, kv.Unlim) if err != nil { return nil, err } diff --git a/turbo/app/backup_cmd.go b/turbo/app/backup_cmd.go deleted file mode 100644 index b239dd27f1c..00000000000 --- a/turbo/app/backup_cmd.go +++ /dev/null @@ -1,169 +0,0 @@ -// Copyright 2024 The Erigon Authors -// This file is part of Erigon. -// -// Erigon is free software: you can redistribute it and/or modify -// it under the terms of the GNU Lesser General Public License as published by -// the Free Software Foundation, either version 3 of the License, or -// (at your option) any later version. -// -// Erigon is distributed in the hope that it will be useful, -// but WITHOUT ANY WARRANTY; without even the implied warranty of -// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the -// GNU Lesser General Public License for more details. -// -// You should have received a copy of the GNU Lesser General Public License -// along with Erigon. If not, see . - -package app - -import ( - "fmt" - "os" - "path/filepath" - - "github.com/erigontech/erigon-lib/common" - "github.com/erigontech/erigon-lib/kv/backup" - - "github.com/c2h5oh/datasize" - "github.com/erigontech/erigon-lib/common/datadir" - "github.com/erigontech/erigon-lib/common/dir" - "github.com/erigontech/erigon-lib/kv" - "github.com/erigontech/erigon/cmd/utils" - "github.com/erigontech/erigon/cmd/utils/flags" - "github.com/erigontech/erigon/turbo/debug" - "github.com/urfave/cli/v2" -) - -// nolint -var backupCommand = cli.Command{ - Name: "alpha_backup", - Description: `Alpha verison of command. Backup all databases without stopping of Erigon. -While this command has Alpha prefix - we recommend to stop Erigon for backup. -Limitations: -- no support of datadir/snapshots folder. Recommendation: backup snapshots dir manually AFTER databases backup. Possible to implement in future. -- no support of Consensus DB (copy it manually if you need). Possible to implement in future. -- way to pipe output to compressor (lz4/zstd). Can compress target floder later or use zfs-with-enabled-compression. -- jwt tocken: copy it manually - if need. -- no support of SentryDB (datadir/nodes folder). Because seems no much reason to backup it. - -Example: erigon backup --datadir= --to.datadir= - -TODO: -- support of Consensus DB (copy it manually if you need). Possible to implement in future. -- support 2 use-cases: create new node (then remove jwt tocken, and nodes folder) and backup exising one (then backup jwt tocken, and nodes folder) -- support of datadir/snapshots folder. Possible to implement in future. Can copy it manually or rsync or symlink/mount. -`, - Action: doBackup, - Flags: joinFlags([]cli.Flag{ - &utils.DataDirFlag, - &ToDatadirFlag, - &BackupToPageSizeFlag, - &BackupLabelsFlag, - &BackupTablesFlag, - &WarmupThreadsFlag, - }), -} - -var ( - ToDatadirFlag = flags.DirectoryFlag{ - Name: "to.datadir", - Usage: "Target datadir", - Required: true, - } - BackupLabelsFlag = cli.StringFlag{ - Name: "labels", - Usage: "Name of component to backup. Example: chaindata,txpool,downloader", - } - BackupTablesFlag = cli.StringFlag{ - Name: "tables", - Usage: "One of: PlainState,HashedState", - } - BackupToPageSizeFlag = cli.StringFlag{ - Name: "to.pagesize", - Usage: utils.DbPageSizeFlag.Usage, - } - WarmupThreadsFlag = cli.Uint64Flag{ - Name: "warmup.threads", - Usage: `Erigon's db works as blocking-io: means it stops when read from disk. -It means backup speed depends on 'disk latency' (not throughput). -Can spawn many threads which will read-ahead the data and bring it to OS's PageCache. -CloudDrives (and ssd) have bad-latency and good-parallel-throughput - then having >1k of warmup threads will help.`, - Value: uint64(backup.ReadAheadThreads), - } -) - -func doBackup(cliCtx *cli.Context) error { - logger, _, _, err := debug.Setup(cliCtx, true /* rootLogger */) - if err != nil { - return err - } - - defer logger.Info("backup done") - - ctx := cliCtx.Context - dirs := datadir.New(cliCtx.String(utils.DataDirFlag.Name)) - toDirs := datadir.New(cliCtx.String(ToDatadirFlag.Name)) - - var targetPageSize datasize.ByteSize - if cliCtx.IsSet(BackupToPageSizeFlag.Name) { - targetPageSize = flags.DBPageSizeFlagUnmarshal(cliCtx, BackupToPageSizeFlag.Name, BackupToPageSizeFlag.Usage) - } - - var lables = []kv.Label{kv.ChainDB, kv.TxPoolDB, kv.DownloaderDB} - if cliCtx.IsSet(BackupToPageSizeFlag.Name) { - lables = lables[:0] - for _, l := range common.CliString2Array(cliCtx.String(BackupLabelsFlag.Name)) { - lables = append(lables, kv.UnmarshalLabel(l)) - } - } - - var tables []string - if cliCtx.IsSet(BackupTablesFlag.Name) { - tables = common.CliString2Array(cliCtx.String(BackupTablesFlag.Name)) - } - - readAheadThreads := backup.ReadAheadThreads - if cliCtx.IsSet(WarmupThreadsFlag.Name) { - readAheadThreads = int(cliCtx.Uint64(WarmupThreadsFlag.Name)) - } - - //kv.SentryDB no much reason to backup - //TODO: add support of kv.ConsensusDB - for _, label := range lables { - var from, to string - switch label { - case kv.ChainDB: - from, to = dirs.Chaindata, toDirs.Chaindata - case kv.TxPoolDB: - from, to = dirs.TxPool, toDirs.TxPool - case kv.DownloaderDB: - from, to = filepath.Join(dirs.Snap, "db"), filepath.Join(toDirs.Snap, "db") - default: - panic(fmt.Sprintf("unexpected: %+v", label)) - } - - exists, err := dir.Exist(from) - if err != nil { - return err - } - if !exists { - continue - } - - if len(tables) == 0 { // if not partial backup - just drop target dir, to make backup more compact/fast (instead of clean tables) - if err := os.RemoveAll(to); err != nil { - return fmt.Errorf("mkdir: %w, %s", err, to) - } - } - if err := os.MkdirAll(to, 0740); err != nil { //owner: rw, group: r, others: - - return fmt.Errorf("mkdir: %w, %s", err, to) - } - logger.Info("[backup] start", "label", label) - fromDB, toDB := backup.OpenPair(from, to, label, targetPageSize, logger) - if err := backup.Kv2kv(ctx, fromDB, toDB, nil, readAheadThreads, logger); err != nil { - return err - } - } - - return nil -} diff --git a/turbo/trie/trie_root.go b/turbo/trie/trie_root.go index 82d7383bc9f..fe89780a437 100644 --- a/turbo/trie/trie_root.go +++ b/turbo/trie/trie_root.go @@ -1477,19 +1477,6 @@ func keyIsBefore(k1, k2 []byte) bool { return bytes.Compare(k1, k2) < 0 } -func UnmarshalTrieNodeTyped(v []byte) (hasState, hasTree, hasHash uint16, hashes []libcommon.Hash, rootHash libcommon.Hash) { - hasState, hasTree, hasHash, v = binary.BigEndian.Uint16(v), binary.BigEndian.Uint16(v[2:]), binary.BigEndian.Uint16(v[4:]), v[6:] - if bits.OnesCount16(hasHash)+1 == len(v)/length2.Hash { - rootHash.SetBytes(libcommon.CopyBytes(v[:32])) - v = v[32:] - } - hashes = make([]libcommon.Hash, len(v)/length2.Hash) - for i := 0; i < len(hashes); i++ { - hashes[i].SetBytes(libcommon.CopyBytes(v[i*length2.Hash : (i+1)*length2.Hash])) - } - return -} - func UnmarshalTrieNode(v []byte) (hasState, hasTree, hasHash uint16, hashes, rootHash []byte) { hasState, hasTree, hasHash, hashes = binary.BigEndian.Uint16(v), binary.BigEndian.Uint16(v[2:]), binary.BigEndian.Uint16(v[4:]), v[6:] if bits.OnesCount16(hasHash)+1 == len(hashes)/length2.Hash { @@ -1499,51 +1486,6 @@ func UnmarshalTrieNode(v []byte) (hasState, hasTree, hasHash uint16, hashes, roo return } -func MarshalTrieNodeTyped(hasState, hasTree, hasHash uint16, h []libcommon.Hash, buf []byte) []byte { - buf = buf[:6+len(h)*length2.Hash] - meta, hashes := buf[:6], buf[6:] - binary.BigEndian.PutUint16(meta, hasState) - binary.BigEndian.PutUint16(meta[2:], hasTree) - binary.BigEndian.PutUint16(meta[4:], hasHash) - for i := 0; i < len(h); i++ { - copy(hashes[i*length2.Hash:(i+1)*length2.Hash], h[i].Bytes()) - } - return buf -} - -func StorageKey(addressHash []byte, incarnation uint64, prefix []byte) []byte { - return dbutils2.GenerateCompositeStoragePrefix(addressHash, incarnation, prefix) -} - -func MarshalTrieNode(hasState, hasTree, hasHash uint16, hashes, rootHash []byte, buf []byte) []byte { - buf = buf[:len(hashes)+len(rootHash)+6] - meta, hashesList := buf[:6], buf[6:] - binary.BigEndian.PutUint16(meta, hasState) - binary.BigEndian.PutUint16(meta[2:], hasTree) - binary.BigEndian.PutUint16(meta[4:], hasHash) - if len(rootHash) == 0 { - copy(hashesList, hashes) - } else { - copy(hashesList, rootHash) - copy(hashesList[32:], hashes) - } - return buf -} - -func CastTrieNodeValue(hashes, rootHash []byte) []libcommon.Hash { - to := make([]libcommon.Hash, len(hashes)/length2.Hash+len(rootHash)/length2.Hash) - i := 0 - if len(rootHash) > 0 { - to[0].SetBytes(libcommon.CopyBytes(rootHash)) - i++ - } - for j := 0; j < len(hashes)/length2.Hash; j++ { - to[i].SetBytes(libcommon.CopyBytes(hashes[j*length2.Hash : (j+1)*length2.Hash])) - i++ - } - return to -} - func makeCurrentKeyStr(k []byte) string { var currentKeyStr string if k == nil {