Add the Ability to Defragment the Beacon State (#13444)
* Defragment head state

* change log level

* change it to be more efficient

* add flag

* add tests and clean up

* fix it

* gosimple

* Update container/multi-value-slice/multi_value_slice.go

Co-authored-by: Radosław Kapka <[email protected]>

* radek's review

* unlock it

* remove from fc lock

---------

Co-authored-by: rkapka <[email protected]>
nisdas and rkapka authored Jan 13, 2024
1 parent 0cfbddc commit 1ff5a43
Showing 8 changed files with 288 additions and 1 deletion.
1 change: 1 addition & 0 deletions beacon-chain/blockchain/BUILD.bazel
@@ -6,6 +6,7 @@ go_library(
        "chain_info.go",
        "chain_info_forkchoice.go",
        "currently_syncing_block.go",
        "defragment.go",
        "error.go",
        "execution_engine.go",
        "forkchoice_update_execution.go",
27 changes: 27 additions & 0 deletions beacon-chain/blockchain/defragment.go
@@ -0,0 +1,27 @@
package blockchain

import (
    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
    "github.com/prysmaticlabs/prysm/v4/beacon-chain/state"
    "github.com/prysmaticlabs/prysm/v4/config/features"
    "github.com/prysmaticlabs/prysm/v4/time"
)

var stateDefragmentationTime = promauto.NewSummary(prometheus.SummaryOpts{
    Name: "head_state_defragmentation_milliseconds",
    Help: "Milliseconds it takes to defragment the head state",
})

// defragmentState defragments the provided state so that any field whose number
// of fragmented indexes exceeds our threshold is reallocated to a new, separate
// slice for that field.
func (s *Service) defragmentState(st state.BeaconState) {
    if !features.Get().EnableExperimentalState {
        return
    }
    startTime := time.Now()
    st.Defragment()
    elapsedTime := time.Since(startTime)
    stateDefragmentationTime.Observe(float64(elapsedTime.Milliseconds()))
}
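For context, defragmentState above combines a feature-flag gate with the usual Prometheus timing pattern: measure wall-clock time around the call and record the elapsed milliseconds in a summary. A minimal, self-contained sketch of that timing pattern follows; the package, metric, and function names here are hypothetical stand-ins, and only the promauto/prometheus and time calls mirror the committed code.

package main

import (
    "time"

    "github.com/prometheus/client_golang/prometheus"
    "github.com/prometheus/client_golang/prometheus/promauto"
)

// exampleDuration is a hypothetical summary, analogous to
// head_state_defragmentation_milliseconds in the file above.
var exampleDuration = promauto.NewSummary(prometheus.SummaryOpts{
    Name: "example_operation_milliseconds",
    Help: "Milliseconds it takes to run the example operation",
})

// timeOperation runs op and records its duration, in milliseconds, in the summary.
func timeOperation(op func()) {
    start := time.Now()
    op()
    exampleDuration.Observe(float64(time.Since(start).Milliseconds()))
}

func main() {
    timeOperation(func() { time.Sleep(10 * time.Millisecond) })
}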
3 changes: 3 additions & 0 deletions beacon-chain/blockchain/receive_block.go
@@ -123,6 +123,9 @@ func (s *Service) ReceiveBlock(ctx context.Context, block interfaces.ReadOnlySig
    }
    daWaitedTime := time.Since(daStartTime)

    // Defragment the state before continuing block processing.
    s.defragmentState(postState)

    // The rest of block processing takes a lock on forkchoice.
    s.cfg.ForkChoiceStore.Lock()
    defer s.cfg.ForkChoiceStore.Unlock()
1 change: 1 addition & 0 deletions beacon-chain/state/interfaces.go
@@ -22,6 +22,7 @@ type BeaconState interface {
    WriteOnlyBeaconState
    Copy() BeaconState
    CopyAllTries()
    Defragment()
    HashTreeRoot(ctx context.Context) ([32]byte, error)
    Prover
    json.Marshaler
49 changes: 49 additions & 0 deletions beacon-chain/state/state-native/multi_value_slices.go
@@ -123,6 +123,55 @@ func NewMultiValueValidators(vals []*ethpb.Validator) *MultiValueValidators {
    return mv
}

// Defragment checks whether each individual multi-value field in our state is
// fragmented, and if it is, 'resets' the field by creating a new multi-value object.
func (b *BeaconState) Defragment() {
    b.lock.Lock()
    defer b.lock.Unlock()
    if b.blockRootsMultiValue != nil && b.blockRootsMultiValue.IsFragmented() {
        initialMVslice := b.blockRootsMultiValue
        b.blockRootsMultiValue = b.blockRootsMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.BlockRoots.String()).Inc()
        runtime.SetFinalizer(b.blockRootsMultiValue, blockRootsFinalizer)
    }
    if b.stateRootsMultiValue != nil && b.stateRootsMultiValue.IsFragmented() {
        initialMVslice := b.stateRootsMultiValue
        b.stateRootsMultiValue = b.stateRootsMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.StateRoots.String()).Inc()
        runtime.SetFinalizer(b.stateRootsMultiValue, stateRootsFinalizer)
    }
    if b.randaoMixesMultiValue != nil && b.randaoMixesMultiValue.IsFragmented() {
        initialMVslice := b.randaoMixesMultiValue
        b.randaoMixesMultiValue = b.randaoMixesMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.RandaoMixes.String()).Inc()
        runtime.SetFinalizer(b.randaoMixesMultiValue, randaoMixesFinalizer)
    }
    if b.balancesMultiValue != nil && b.balancesMultiValue.IsFragmented() {
        initialMVslice := b.balancesMultiValue
        b.balancesMultiValue = b.balancesMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.Balances.String()).Inc()
        runtime.SetFinalizer(b.balancesMultiValue, balancesFinalizer)
    }
    if b.validatorsMultiValue != nil && b.validatorsMultiValue.IsFragmented() {
        initialMVslice := b.validatorsMultiValue
        b.validatorsMultiValue = b.validatorsMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.Validators.String()).Inc()
        runtime.SetFinalizer(b.validatorsMultiValue, validatorsFinalizer)
    }
    if b.inactivityScoresMultiValue != nil && b.inactivityScoresMultiValue.IsFragmented() {
        initialMVslice := b.inactivityScoresMultiValue
        b.inactivityScoresMultiValue = b.inactivityScoresMultiValue.Reset(b)
        initialMVslice.Detach(b)
        multiValueCountGauge.WithLabelValues(types.InactivityScores.String()).Inc()
        runtime.SetFinalizer(b.inactivityScoresMultiValue, inactivityScoresFinalizer)
    }
}

func randaoMixesFinalizer(m *MultiValueRandaoMixes) {
    multiValueCountGauge.WithLabelValues(types.RandaoMixes.String()).Dec()
}
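The Defragment method above repeats the same sequence for six fields: check fragmentation, rebuild a fresh slice for this state's id, detach the state from the old slice, and refresh the gauge and finalizer. Purely as illustration, the core of that pattern could be captured once by a generic helper. The sketch below is not part of the commit: it assumes the container package's exported Slice, Identifiable, IsFragmented, Reset, and Detach APIs as shown in this diff (import path inferred from the file path), and it deliberately omits the per-field gauge updates and runtime.SetFinalizer calls the committed code performs.

package example

import (
    mvslice "github.com/prysmaticlabs/prysm/v4/container/multi-value-slice"
)

// resetIfFragmented is a hypothetical helper: when the field is fragmented,
// rebuild a fresh slice for this owner's id and detach the owner from the
// old slice so its per-id entries can be released.
func resetIfFragmented[V any](owner mvslice.Identifiable, field **mvslice.Slice[V]) {
    old := *field
    if old == nil || !old.IsFragmented() {
        return
    }
    *field = old.Reset(owner)
    old.Detach(owner)
}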
2 changes: 1 addition & 1 deletion beacon-chain/sync/initial-sync/blocks_fetcher_test.go
@@ -606,7 +606,7 @@ func TestBlocksFetcher_WaitForBandwidth(t *testing.T) {
    p1.Connect(p2)
    require.Equal(t, 1, len(p1.BHost.Network().Peers()), "Expected peers to be connected")
    req := &ethpb.BeaconBlocksByRangeRequest{
        Count: 64,
        Count: 64,
    }

    topic := p2pm.RPCBlocksByRangeTopicV1
56 changes: 56 additions & 0 deletions container/multi-value-slice/multi_value_slice.go
@@ -96,6 +96,10 @@ import (
    "github.com/pkg/errors"
)

// Number of references beyond which a multi-value object is considered
// fragmented.
const fragmentationLimit = 50000

// Id is an object identifier.
type Id = uint64

@@ -424,6 +428,58 @@ func (s *Slice[V]) MultiValueStatistics() MultiValueStatistics {
    return stats
}

// IsFragmented checks if our multi-value object is fragmented (individual references held).
// If the number of references is at or above our threshold, we return true.
func (s *Slice[V]) IsFragmented() bool {
    stats := s.MultiValueStatistics()
    return stats.TotalIndividualElemReferences+stats.TotalAppendedElemReferences >= fragmentationLimit
}

// Reset builds a new multi-value object with respect to the
// provided object's id. The new base slice contains the values
// seen by that particular id.
func (s *Slice[V]) Reset(obj Identifiable) *Slice[V] {
    s.lock.RLock()
    defer s.lock.RUnlock()

    l, ok := s.cachedLengths[obj.Id()]
    if !ok {
        l = len(s.sharedItems)
    }

    items := make([]V, l)
    copy(items, s.sharedItems)
    for i, ind := range s.individualItems {
        for _, v := range ind.Values {
            _, found := containsId(v.ids, obj.Id())
            if found {
                items[i] = v.val
                break
            }
        }
    }

    index := len(s.sharedItems)
    for _, app := range s.appendedItems {
        found := true
        for _, v := range app.Values {
            _, found = containsId(v.ids, obj.Id())
            if found {
                items[index] = v.val
                index++
                break
            }
        }
        if !found {
            break
        }
    }

    reset := &Slice[V]{}
    reset.Init(items)
    return reset
}

func (s *Slice[V]) fillOriginalItems(obj Identifiable, items *[]V) {
    for i, item := range s.sharedItems {
        ind, ok := s.individualItems[uint64(i)]
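To make the fragmentation threshold concrete: every object id registered against an individual or appended value counts as one reference, and IsFragmented trips once the combined count reaches fragmentationLimit (50000). A hedged, test-style sketch of that arithmetic follows; it assumes it would sit alongside the package's existing tests so it can reuse their testObject helper and the package's unexported fields, and the test name is invented for illustration.

func TestIsFragmented_ThresholdArithmetic(t *testing.T) {
    s := &Slice[int]{}
    s.Init([]int{123, 123, 123})
    // One individual value, referenced by a single id: 1 reference so far.
    s.individualItems[0] = &MultiValueItem[int]{
        Values: []*Value[int]{
            {
                val: 7,
                ids: []uint64{1},
            },
        },
    }

    // Each Copy from id 1 registers one more id against that value,
    // adding exactly one individual-element reference.
    for i := 2; i < fragmentationLimit; i++ {
        s.Copy(&testObject{id: 1}, &testObject{id: uint64(i)})
    }
    // 49999 references: still below the 50000 threshold.
    assert.Equal(t, false, s.IsFragmented())

    // One more registered id reaches the threshold.
    s.Copy(&testObject{id: 1}, &testObject{id: uint64(fragmentationLimit)})
    assert.Equal(t, true, s.IsFragmented())
}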
150 changes: 150 additions & 0 deletions container/multi-value-slice/multi_value_slice_test.go
@@ -326,6 +326,156 @@ func TestDetach(t *testing.T) {
    assert.Equal(t, false, ok)
}

func TestReset(t *testing.T) {
    s := setup()
    obj := &testObject{id: 2}

    reset := s.Reset(obj)

    assert.Equal(t, 8, len(reset.sharedItems))
    assert.Equal(t, 123, reset.sharedItems[0])
    assert.Equal(t, 2, reset.sharedItems[1])
    assert.Equal(t, 3, reset.sharedItems[2])
    assert.Equal(t, 123, reset.sharedItems[3])
    assert.Equal(t, 2, reset.sharedItems[4])
    assert.Equal(t, 2, reset.sharedItems[5])
    assert.Equal(t, 3, reset.sharedItems[6])
    assert.Equal(t, 2, reset.sharedItems[7])
    assert.Equal(t, 0, len(reset.individualItems))
    assert.Equal(t, 0, len(reset.appendedItems))
}

func TestFragmentation_IndividualReferences(t *testing.T) {
    s := &Slice[int]{}
    s.Init([]int{123, 123, 123, 123, 123})
    s.individualItems[1] = &MultiValueItem[int]{
        Values: []*Value[int]{
            {
                val: 1,
                ids: []uint64{1},
            },
            {
                val: 2,
                ids: []uint64{2},
            },
        },
    }
    s.individualItems[2] = &MultiValueItem[int]{
        Values: []*Value[int]{
            {
                val: 3,
                ids: []uint64{1, 2},
            },
        },
    }

    numOfRefs := fragmentationLimit / 2
    for i := 3; i < numOfRefs; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }

    assert.Equal(t, false, s.IsFragmented())

    // Add more references to hit fragmentation limit. Id 1
    // has 2 references above.
    for i := numOfRefs; i < numOfRefs+3; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, true, s.IsFragmented())
}

func TestFragmentation_AppendedReferences(t *testing.T) {
    s := &Slice[int]{}
    s.Init([]int{123, 123, 123, 123, 123})
    s.appendedItems = []*MultiValueItem[int]{
        {
            Values: []*Value[int]{
                {
                    val: 1,
                    ids: []uint64{1},
                },
                {
                    val: 2,
                    ids: []uint64{2},
                },
            },
        },
        {
            Values: []*Value[int]{
                {
                    val: 3,
                    ids: []uint64{1, 2},
                },
            },
        },
    }
    s.cachedLengths[1] = 7
    s.cachedLengths[2] = 8

    numOfRefs := fragmentationLimit / 2
    for i := 3; i < numOfRefs; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }

    assert.Equal(t, false, s.IsFragmented())

    // Add more references to hit fragmentation limit. Id 1
    // has 2 references above.
    for i := numOfRefs; i < numOfRefs+3; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, true, s.IsFragmented())
}

func TestFragmentation_IndividualAndAppendedReferences(t *testing.T) {
    s := &Slice[int]{}
    s.Init([]int{123, 123, 123, 123, 123})
    s.individualItems[2] = &MultiValueItem[int]{
        Values: []*Value[int]{
            {
                val: 3,
                ids: []uint64{1, 2},
            },
        },
    }
    s.appendedItems = []*MultiValueItem[int]{
        {
            Values: []*Value[int]{
                {
                    val: 1,
                    ids: []uint64{1},
                },
                {
                    val: 2,
                    ids: []uint64{2},
                },
            },
        },
    }
    s.cachedLengths[1] = 7
    s.cachedLengths[2] = 8

    numOfRefs := fragmentationLimit / 2
    for i := 3; i < numOfRefs; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }

    assert.Equal(t, false, s.IsFragmented())

    // Add more references to hit fragmentation limit. Id 1
    // has 2 references above.
    for i := numOfRefs; i < numOfRefs+3; i++ {
        obj := &testObject{id: uint64(i)}
        s.Copy(&testObject{id: 1}, obj)
    }
    assert.Equal(t, true, s.IsFragmented())
}

// Share the slice between 2 objects.
// Index 0: Shared value
// Index 1: Different individual value
