Skip to content

Commit

Permalink
Merge pull request #972 from dusk-network/fix-971
Browse files Browse the repository at this point in the history
Introduce TTL property to Cucko Filter instance per round
  • Loading branch information
goshawk-3 authored Mar 20, 2021
2 parents b971b86 + b328375 commit 02bc3a5
Show file tree
Hide file tree
Showing 11 changed files with 132 additions and 284 deletions.
2 changes: 1 addition & 1 deletion cmd/dusk/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -359,7 +359,7 @@ func (s *Server) Close() {
func registerPeerServices(processor *peer.MessageProcessor, db database.DB, eventBus *eventbus.EventBus, rpcBus *rpcbus.RPCBus) {
processor.Register(topics.Ping, responding.ProcessPing)
dataBroker := responding.NewDataBroker(db, rpcBus)
dataRequestor := responding.NewDataRequestor(db, rpcBus, eventBus)
dataRequestor := responding.NewDataRequestor(db, rpcBus)
bhb := responding.NewBlockHashBroker(db)
cb := responding.NewCandidateBroker(db)
cp := consensus.NewPublisher(eventBus)
Expand Down
3 changes: 2 additions & 1 deletion pkg/config/groups.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,8 @@ type networkConfiguration struct {
Monitor monitorConfiguration
Port string

MaxDupeMapItems uint32
MaxDupeMapItems uint32
MaxDupeMapExpire uint32
}

type kadcastConfiguration struct {
Expand Down
7 changes: 5 additions & 2 deletions pkg/config/samples/default.dusk.toml
Original file line number Diff line number Diff line change
Expand Up @@ -56,9 +56,12 @@ streamErrors=true
# port for the node to bind on
port=7000

# Maximum number of items that can be registered per round in any DupeMap
# Up to 1MB per DupeMap instance could be allocated if maxDupeMapItems=300000
# Maximum number of items that can be registered by a single DupeMap
# Up to ~0.3MB per DupeMap instance could be allocated if maxDupeMapItems=300000
maxDupeMapItems=300000
# Number of seconds before dupemap expires and gets reset.
# Ideally should be less than 15s - average consensus time
maxDupeMapExpire=5

[network.seeder]
# array of seeder servers
Expand Down
4 changes: 2 additions & 2 deletions pkg/core/consensus/testing/utils.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,7 +71,7 @@ type gossipRouter struct {

func (g *gossipRouter) route(m message.Message) {
b := m.Payload().(message.SafeBuffer).Buffer
if g.d.CanFwd(&b) {
if g.d.HasAnywhere(&b) {
// The incoming message will be a message.SafeBuffer, as it's coming from
// the consensus.Emitter.
m, err := message.Unmarshal(&b)
Expand All @@ -84,7 +84,7 @@ func (g *gossipRouter) route(m message.Message) {
}

func rerouteGossip(eb *eventbus.EventBus) {
router := &gossipRouter{eb, dupemap.Launch(eb)}
router := &gossipRouter{eb, dupemap.NewDupeMap(5, 100)}
gossipListener := eventbus.NewSafeCallbackListener(router.route)
eb.Subscribe(topics.Gossip, gossipListener)
}
Expand Down
72 changes: 22 additions & 50 deletions pkg/p2p/peer/dupemap/dupemap.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,83 +10,55 @@ import (
"bytes"

cfg "github.com/dusk-network/dusk-blockchain/pkg/config"
"github.com/dusk-network/dusk-blockchain/pkg/core/consensus"
"github.com/dusk-network/dusk-blockchain/pkg/util/nativeutils/eventbus"
log "github.com/sirupsen/logrus"
)

const (
defaultTolerance = uint64(3)
defaultCapacity = uint32(100000)
defaultCapacity = uint32(300000)
defaultExpire = int64(5)
)

// TODO: DupeMap should deal with value bytes.Buffer rather than pointers as it is not supposed to mutate the struct.
//nolint:golint
type DupeMap struct {
round uint64
tmpMap *TmpMap
tolerance uint64
tmpMap *TmpMap
}

// Launch returns a dupemap which is self-cleaning, by launching a goroutine
// which listens for accepted blocks, and updates the height upon receipt.
func Launch(eventBus eventbus.Broker) *DupeMap {
acceptedBlockChan, _ := consensus.InitAcceptedBlockUpdate(eventBus)

// NewDupeMapDefault returns a dupemap instance with default config.
func NewDupeMapDefault() *DupeMap {
capacity := cfg.Get().Network.MaxDupeMapItems
if capacity == 0 {
capacity = defaultCapacity
}

log.WithField("cap", capacity).Info("launch dupemap instance")

// NB defaultTolerance is number of rounds before data expires. That's
// said, the overall number of items per DupeMap instance is
// defaultTolerance*maxItemsPerRound
//
// E.g 3 * 300,000 ~ 1MB max memory allocated by a DupeMap instance

dupeBlacklist := NewDupeMap(1, capacity)

go func() {
for {
blk := <-acceptedBlockChan
// NOTE: do we need locking?
dupeBlacklist.UpdateHeight(blk.Header.Height)
}
}()
expire := int64(cfg.Get().Network.MaxDupeMapExpire)
if expire == 0 {
expire = defaultExpire
}

return dupeBlacklist
return NewDupeMap(expire, capacity)
}

// NewDupeMap returns a DupeMap.
func NewDupeMap(round uint64, capacity uint32) *DupeMap {
tmpMap := NewTmpMap(defaultTolerance, capacity)
// NewDupeMap creates new dupemap instance.
func NewDupeMap(expire int64, capacity uint32) *DupeMap {
log.WithField("cap", capacity).Info("create dupemap instance")

tmpMap := NewTmpMap(capacity, expire)

return &DupeMap{
round,
tmpMap,
defaultTolerance,
}
}

// UpdateHeight for a round.
func (d *DupeMap) UpdateHeight(round uint64) {
d.tmpMap.UpdateHeight(round)
}

// SetTolerance for a round.
func (d *DupeMap) SetTolerance(roundNr uint64) {
threshold := d.tmpMap.Height() - roundNr
d.tmpMap.DeleteBefore(threshold)
d.tmpMap.SetTolerance(roundNr)
}

// CanFwd tests if any of Cuckoo Filters (a filter per round) knows already
// HasAnywhere tests if any of Cuckoo Filters (a filter per round) knows already
// this payload. Similarly to Bloom Filters, False positive matches are
// possible, but false negatives are not.
func (d *DupeMap) CanFwd(payload *bytes.Buffer) bool {
found := d.tmpMap.HasAnywhere(payload)
// In addition, it also resets all expired items.
func (d *DupeMap) HasAnywhere(payload *bytes.Buffer) bool {
// Reset any bloom filters that have expired
d.tmpMap.CleanExpired()

found := d.tmpMap.Has(payload)
if found {
return false
}
Expand Down
63 changes: 13 additions & 50 deletions pkg/p2p/peer/dupemap/dupemap_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,22 +15,6 @@ import (
"github.com/stretchr/testify/assert"
)

var dupeTests = []struct {
height uint64
tolerance uint64
canFwd bool
}{
{1, 3, true},
{1, 3, false},
{2, 3, false},
{4, 3, true},
{4, 3, false},
{5, 3, false},
{7, 1, true},
{8, 1, false},
{9, 1, true},
}

var dupeFilterTests = []struct {
data uint16
canFwd bool
Expand All @@ -48,40 +32,21 @@ var dupeFilterTests = []struct {
{9, true},
}

func TestDupeMap(t *testing.T) {
dupeMap := dupemap.NewDupeMap(1, 100)

test := bytes.NewBufferString("This is a test")

for _, tt := range dupeTests {
dupeMap.UpdateHeight(tt.height)
dupeMap.SetTolerance(tt.tolerance)

res := dupeMap.CanFwd(test)
if !assert.Equal(t, tt.canFwd, res) {
assert.FailNowf(t, "failure", "DupeMap.CanFwd: expected %t, got %t with height %d and tolerance %d", res, tt.canFwd, tt.height, tt.tolerance)
}
}

t.Log("Size: ", dupeMap.Size())
}

func TestCanFwd(t *testing.T) {
dupeMap := dupemap.NewDupeMap(1, 100)
dupeMap.SetTolerance(10)
func TestHasAnywhere(t *testing.T) {
dupeMap := dupemap.NewDupeMap(5, 100)

for i, tt := range dupeFilterTests {
test := make([]byte, 2)
binary.BigEndian.PutUint16(test, tt.data)

res := dupeMap.CanFwd(bytes.NewBuffer(test))
res := dupeMap.HasAnywhere(bytes.NewBuffer(test))
if !assert.Equal(t, tt.canFwd, res) {
assert.FailNowf(t, "failure", "DupeMap.CanFwd: expected %t, got %t, index %d", res, tt.canFwd, i)
assert.FailNowf(t, "failure", "DupeMap.HasAnywhere: expected %t, got %t, index %d", res, tt.canFwd, i)
}
}
}

func TestCanFwdBigData(t *testing.T) {
func TestHasAnywhereBigData(t *testing.T) {
type testu struct {
payload *bytes.Buffer
canFwd bool
Expand All @@ -99,15 +64,14 @@ func TestCanFwdBigData(t *testing.T) {

// Initialize a dupemap with 1M capacity per round-filter
itemsCount := uint32(1000 * 1000)
dupeMap := dupemap.NewDupeMap(1, itemsCount)
dupeMap.SetTolerance(10)
dupeMap := dupemap.NewDupeMap(10, itemsCount)

falsePositiveCount := uint(0)

for _, d := range testData {
// underlying filter structure is a probabilistic data structure
// That's said, Few false positive are possible.
if !dupeMap.CanFwd(d.payload) {
if !dupeMap.HasAnywhere(d.payload) {
falsePositiveCount++
}
}
Expand All @@ -118,11 +82,11 @@ func TestCanFwdBigData(t *testing.T) {
assert.Failf(t, "failure", "false positive are too many %f", falsePositiveRate)
}

// Now CanFwd should always returns false
// Now HasAnywhere should always returns false
for _, d := range testData {
// Ensure that the underlying filter structure supports "definitely
// no" a.k.a no false negative
if dupeMap.CanFwd(d.payload) != false {
if dupeMap.HasAnywhere(d.payload) != false {
t.FailNow()
}
}
Expand All @@ -131,7 +95,7 @@ func TestCanFwdBigData(t *testing.T) {
assert.LessOrEqual(t, dupeMap.Size(), 1024*1024)
}

func BenchmarkCanFwd(b *testing.B) {
func BenchmarkHasAnywhere(b *testing.B) {
b.StopTimer()

type testu struct {
Expand All @@ -151,18 +115,17 @@ func BenchmarkCanFwd(b *testing.B) {
for i := 0; i < b.N; i++ {
b.StopTimer()

dupeMap := dupemap.NewDupeMap(1, 1000000)
dupeMap.SetTolerance(10)
dupeMap := dupemap.NewDupeMap(5, 1000000)

b.StartTimer()

// CanFwd always returns true
for _, t := range testData {
_ = dupeMap.CanFwd(t.payload)
_ = dupeMap.HasAnywhere(t.payload)
}

for _, t := range testData {
_ = dupeMap.CanFwd(t.payload)
_ = dupeMap.HasAnywhere(t.payload)
}
}
}
Loading

0 comments on commit 02bc3a5

Please sign in to comment.