Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor Pusher Engine with updated interface #6780

Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/collection/pusher/engine.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,7 +146,7 @@ func (e *Engine) processOutboundMessages(ctx context.Context) error {
// Because the pusher engine does not accept inputs from the network,
// always drop any messages and return an error.
func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
return fmt.Errorf("pusher engine should only receive local messages on the same node")
return fmt.Errorf("pusher engine should only receive local messages on the same node: got message %T on channel %v from origin %v", message, channel, originID)
}

// SubmitCollectionGuarantee adds a collection guarantee to the engine's queue
Expand Down
2 changes: 0 additions & 2 deletions engine/collection/pusher/engine_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -97,8 +97,6 @@ func (suite *Suite) TestSubmitCollectionGuarantee() {
Run(func(_ mock.Arguments) { close(done) }).Return(nil).Once()

suite.engine.SubmitCollectionGuarantee(guarantee)
// TODO signature?
//suite.Require().Nil(err)

unittest.RequireCloseBefore(suite.T(), done, time.Second, "message not sent")

Expand Down
8 changes: 4 additions & 4 deletions module/finalizer/collection/finalizer.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,21 +21,21 @@ import (
type Finalizer struct {
db *badger.DB
transactions mempool.Transactions
prov collection.GuaranteedCollectionPublisher
pusher collection.GuaranteedCollectionPublisher
metrics module.CollectionMetrics
}

// NewFinalizer creates a new finalizer for collection nodes.
func NewFinalizer(
db *badger.DB,
transactions mempool.Transactions,
prov collection.GuaranteedCollectionPublisher,
pusher collection.GuaranteedCollectionPublisher,
metrics module.CollectionMetrics,
) *Finalizer {
f := &Finalizer{
db: db,
transactions: transactions,
prov: prov,
pusher: pusher,
metrics: metrics,
}
return f
Expand Down Expand Up @@ -159,7 +159,7 @@ func (f *Finalizer) MakeFinal(blockID flow.Identifier) error {
// collection.

// TODO add real signatures here (2711)
tim-barry marked this conversation as resolved.
Show resolved Hide resolved
f.prov.SubmitCollectionGuarantee(&flow.CollectionGuarantee{
f.pusher.SubmitCollectionGuarantee(&flow.CollectionGuarantee{
CollectionID: payload.Collection.ID(),
ReferenceBlockID: payload.ReferenceBlockID,
ChainID: header.ChainID,
Expand Down
132 changes: 61 additions & 71 deletions module/finalizer/collection/finalizer_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,9 +64,8 @@ func TestFinalizer(t *testing.T) {
bootstrap()
defer cleanup()

prov := new(collectionmock.GuaranteedCollectionPublisher)
prov.On("SubmitCollectionGuarantee", mock.Anything)
finalizer := collection.NewFinalizer(db, pool, prov, metrics)
pusher := collectionmock.NewGuaranteedCollectionPublisher(t)
finalizer := collection.NewFinalizer(db, pool, pusher, metrics)

fakeBlockID := unittest.IdentifierFixture()
err := finalizer.MakeFinal(fakeBlockID)
Expand All @@ -77,9 +76,9 @@ func TestFinalizer(t *testing.T) {
bootstrap()
defer cleanup()

prov := new(collectionmock.GuaranteedCollectionPublisher)
prov.On("SubmitCollectionGuarantee", mock.Anything)
finalizer := collection.NewFinalizer(db, pool, prov, metrics)
pusher := collectionmock.NewGuaranteedCollectionPublisher(t)
pusher.On("SubmitCollectionGuarantee", mock.Anything).Once()
finalizer := collection.NewFinalizer(db, pool, pusher, metrics)

// tx1 is included in the finalized block
tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 })
Expand All @@ -103,9 +102,8 @@ func TestFinalizer(t *testing.T) {
bootstrap()
defer cleanup()

prov := new(collectionmock.GuaranteedCollectionPublisher)
prov.On("SubmitCollectionGuarantee", mock.Anything)
finalizer := collection.NewFinalizer(db, pool, prov, metrics)
pusher := collectionmock.NewGuaranteedCollectionPublisher(t)
finalizer := collection.NewFinalizer(db, pool, pusher, metrics)

// create a new block that isn't connected to a parent
block := unittest.ClusterBlockWithParent(genesis)
Expand All @@ -122,8 +120,8 @@ func TestFinalizer(t *testing.T) {
bootstrap()
defer cleanup()

prov := new(collectionmock.GuaranteedCollectionPublisher)
finalizer := collection.NewFinalizer(db, pool, prov, metrics)
pusher := collectionmock.NewGuaranteedCollectionPublisher(t)
finalizer := collection.NewFinalizer(db, pool, pusher, metrics)

// create a block with empty payload on genesis
block := unittest.ClusterBlockWithParent(genesis)
Expand All @@ -140,16 +138,15 @@ func TestFinalizer(t *testing.T) {
assert.Equal(t, block.ID(), final.ID())

// collection should not have been propagated
prov.AssertNotCalled(t, "SubmitCollectionGuarantee", mock.Anything)
pusher.AssertNotCalled(t, "SubmitCollectionGuarantee", mock.Anything)
})

t.Run("finalize single block", func(t *testing.T) {
bootstrap()
defer cleanup()

prov := new(collectionmock.GuaranteedCollectionPublisher)
prov.On("SubmitCollectionGuarantee", mock.Anything)
finalizer := collection.NewFinalizer(db, pool, prov, metrics)
pusher := collectionmock.NewGuaranteedCollectionPublisher(t)
finalizer := collection.NewFinalizer(db, pool, pusher, metrics)

// tx1 is included in the finalized block and mempool
tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 })
Expand All @@ -163,6 +160,15 @@ func TestFinalizer(t *testing.T) {
block.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx1))
insert(block)

// block should be passed to pusher
pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block.Header.ChainID,
SignerIndices: block.Header.ParentVoterIndices,
Signature: nil,
}).Once()

// finalize the block
err := finalizer.MakeFinal(block.ID())
assert.Nil(t, err)
Expand All @@ -177,26 +183,15 @@ func TestFinalizer(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, block.ID(), final.ID())
assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, final.ID())

// block should be passed to provider
prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1)
prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block.Header.ChainID,
SignerIndices: block.Header.ParentVoterIndices,
Signature: nil,
})
})

// when finalizing a block with un-finalized ancestors, those ancestors should be finalized as well
t.Run("finalize multiple blocks together", func(t *testing.T) {
bootstrap()
defer cleanup()

prov := new(collectionmock.GuaranteedCollectionPublisher)
prov.On("SubmitCollectionGuarantee", mock.Anything)
finalizer := collection.NewFinalizer(db, pool, prov, metrics)
pusher := collectionmock.NewGuaranteedCollectionPublisher(t)
finalizer := collection.NewFinalizer(db, pool, pusher, metrics)

// tx1 is included in the first finalized block and mempool
tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 })
Expand All @@ -215,6 +210,22 @@ func TestFinalizer(t *testing.T) {
block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2))
insert(block2)

// both blocks should be passed to pusher
pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block1.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block1.Header.ChainID,
SignerIndices: block1.Header.ParentVoterIndices,
Signature: nil,
}).Once()
pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block2.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block2.Header.ChainID,
SignerIndices: block2.Header.ParentVoterIndices,
Signature: nil,
}).Once()

// finalize block2 (should indirectly finalize block1 as well)
err := finalizer.MakeFinal(block2.ID())
assert.Nil(t, err)
Expand All @@ -228,32 +239,14 @@ func TestFinalizer(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, block2.ID(), final.ID())
assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID(), block2.ID())

// both blocks should be passed to provider
prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 2)
prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block1.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block1.Header.ChainID,
SignerIndices: block1.Header.ParentVoterIndices,
Signature: nil,
})
prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block2.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block2.Header.ChainID,
SignerIndices: block2.Header.ParentVoterIndices,
Signature: nil,
})
})

t.Run("finalize with un-finalized child", func(t *testing.T) {
bootstrap()
defer cleanup()

prov := new(collectionmock.GuaranteedCollectionPublisher)
prov.On("SubmitCollectionGuarantee", mock.Anything)
finalizer := collection.NewFinalizer(db, pool, prov, metrics)
pusher := collectionmock.NewGuaranteedCollectionPublisher(t)
finalizer := collection.NewFinalizer(db, pool, pusher, metrics)

// tx1 is included in the finalized parent block and mempool
tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 })
Expand All @@ -272,6 +265,15 @@ func TestFinalizer(t *testing.T) {
block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2))
insert(block2)

// block should be passed to pusher
pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block1.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block1.Header.ChainID,
SignerIndices: block1.Header.ParentVoterIndices,
Signature: nil,
}).Once()

// finalize block1 (should NOT finalize block2)
err := finalizer.MakeFinal(block1.ID())
assert.Nil(t, err)
Expand All @@ -286,26 +288,15 @@ func TestFinalizer(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, block1.ID(), final.ID())
assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID())

// block should be passed to provider
prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1)
prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block1.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block1.Header.ChainID,
SignerIndices: block1.Header.ParentVoterIndices,
Signature: nil,
})
})

// when finalizing a block with a conflicting fork, the fork should not be finalized.
t.Run("conflicting fork", func(t *testing.T) {
bootstrap()
defer cleanup()

prov := new(collectionmock.GuaranteedCollectionPublisher)
prov.On("SubmitCollectionGuarantee", mock.Anything)
finalizer := collection.NewFinalizer(db, pool, prov, metrics)
pusher := collectionmock.NewGuaranteedCollectionPublisher(t)
finalizer := collection.NewFinalizer(db, pool, pusher, metrics)

// tx1 is included in the finalized block and mempool
tx1 := unittest.TransactionBodyFixture(func(tx *flow.TransactionBody) { tx.ProposalKey.SequenceNumber = 1 })
Expand All @@ -324,6 +315,15 @@ func TestFinalizer(t *testing.T) {
block2.SetPayload(model.PayloadFromTransactions(refBlock.ID(), &tx2))
insert(block2)

// block should be passed to pusher
pusher.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block1.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block1.Header.ChainID,
SignerIndices: block1.Header.ParentVoterIndices,
Signature: nil,
}).Once()

// finalize block1
err := finalizer.MakeFinal(block1.ID())
assert.Nil(t, err)
Expand All @@ -338,16 +338,6 @@ func TestFinalizer(t *testing.T) {
assert.Nil(t, err)
assert.Equal(t, block1.ID(), final.ID())
assertClusterBlocksIndexedByReferenceHeight(t, db, refBlock.Height, block1.ID())

// block should be passed to provider
prov.AssertNumberOfCalls(t, "SubmitCollectionGuarantee", 1)
prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{
CollectionID: block1.Payload.Collection.ID(),
ReferenceBlockID: refBlock.ID(),
ChainID: block1.Header.ChainID,
SignerIndices: block1.Header.ParentVoterIndices,
Signature: nil,
})
})
})
}
Expand Down
Loading