Refactor Pusher Engine with updated interface #6780

Conversation

tim-barry (Author):

- Remove pusher engine implementation of network.Engine
  - Replace with network.MessageProcessor
  - See: #6747 (comment)
- Remove SubmitCollectionGuarantee message type
  - Was only used between Finalizer and Pusher engine
  - New interface passes and stores collection guarantees directly,
    instead of wrapping and then unwrapping them
  - See: #6747 (comment)
- Add GuaranteedCollectionPublisher interface, implemented by pusher engine (see the sketch after this list)
  - Only used by the Finalizer (and intermediate constructors)
  - Mocks are generated for it, used in Finalizer unit tests
  - See: #6747 (comment)
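
For reference, a minimal sketch of the interface shape described above. This is a hedged illustration only: the method name mirrors the removed message type, and the exact signature, return type, and package in the repository may differ.

// GuaranteedCollectionPublisher is implemented by the pusher engine and used by
// the Finalizer (and intermediate constructors) to hand over finalized
// collection guarantees for broadcasting.
// Sketch only: the signature in the repository may differ.
type GuaranteedCollectionPublisher interface {
	// SubmitCollectionGuarantee accepts the guarantee directly, replacing the
	// removed messages.SubmitCollectionGuarantee wrapper type.
	SubmitCollectionGuarantee(guarantee *flow.CollectionGuarantee)
}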
codecov-commenter commented Dec 4, 2024:

Codecov Report

Attention: Patch coverage is 73.52941% with 9 lines in your changes missing coverage. Please review.

Project coverage is 41.25%. Comparing base (7e4258a) to head (f58ccaf).

Files with missing lines                               Patch %   Lines
...collection/mock/guaranteed_collection_publisher.go    0.00%   8 Missing ⚠️
engine/collection/pusher/engine.go                      94.73%   1 Missing ⚠️
Additional details and impacted files
@@                       Coverage Diff                       @@
##           feature/pusher-engine-refactor    #6780   +/-   ##
===============================================================
  Coverage                           41.24%   41.25%           
===============================================================
  Files                                2061     2062    +1     
  Lines                              182737   182738    +1     
===============================================================
+ Hits                                75375    75382    +7     
+ Misses                             101045   101044    -1     
+ Partials                             6317     6312    -5     
Flag        Coverage Δ
unittests   41.25% <73.52%> (+<0.01%) ⬆️

Flags with carried forward coverage won't be shown.


tim-barry marked this pull request as ready for review December 4, 2024 17:07
tim-barry (Author) commented:

Issue: #6765

Comment on lines 183 to 189
prov.AssertCalled(t, "SubmitCollectionGuarantee", &flow.CollectionGuarantee{
	CollectionID:     block.Payload.Collection.ID(),
	ReferenceBlockID: refBlock.ID(),
	ChainID:          block.Header.ChainID,
	SignerIndices:    block.Header.ParentVoterIndices,
	Signature:        nil,
})
Reviewer (Member):
We can similarly change this call to be part of the assertion at the beginning of the test. We would remove lines 183-189 and replace line 150 with:

prov.On("SubmitCollectionGuarantee", &flow.CollectionGuarantee{
	CollectionID:     block.Payload.Collection.ID(),
	ReferenceBlockID: refBlock.ID(),
	ChainID:          block.Header.ChainID,
	SignerIndices:    block.Header.ParentVoterIndices,
	Signature:        nil,
}).Once()

This is again a sign of older code using old patterns (not a problem with your PR: just pointing out something that is already present). When you write new tests, you should generally write specific assertions before execution, rather than afterward. It is less code, and having the assertions before execution avoids the potential pitfall of using the test output itself as the expected value.

I'm pointing this out mainly as a pattern to avoid directly replicating in your own code. But I do think it would be a good exercise to update the assertions in this test file as described above, to get a sense for how the library works and when/how assertions are validated.

tim-barry (Author) replied:

I did this in d157f48, and also made sure to see the error messages when the test fails because those assertions aren't met; definitely useful that it tells you when the number of expected calls doesn't match.

I did run into what I think is a VSCode issue: when the tests are run from VSCode and fail, the file that the error occurred in is given the wrong path (it's assumed to be in the "module/finalizer/collection/" directory, the same one as the test is in); when the tests are instead run on the command line via go test, it only reports the filename where the error occurred (not a full path).

- Construct the mock objects with their `.New___()` constructors instead of using
  Go's built-in `new` function, which enables automatic cleanup.
- Replace explicit AssertCalled checks at the end of tests with `.On()` and `.Once()`,
  which will automatically be checked at the end of the test (sketched below).
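
A minimal sketch of both points together. Hedged assumptions: the constructor name NewGuaranteedCollectionPublisher matches the mockery-generated mock added in this PR, the import path is guessed from the file listing above, SubmitCollectionGuarantee is assumed to return nothing (otherwise a .Return(...) would be needed), and the test body is illustrative rather than the actual finalizer test.

package example

import (
	"testing"

	mockcollection "github.com/onflow/flow-go/engine/collection/mock" // assumed path
	"github.com/onflow/flow-go/model/flow"
)

func TestSubmitsGuaranteeOnce(t *testing.T) {
	// The mockery-generated constructor registers a t.Cleanup hook that calls
	// AssertExpectations(t), so unmet expectations fail the test automatically,
	// with no manual teardown to forget.
	pusher := mockcollection.NewGuaranteedCollectionPublisher(t)

	// Declare the expectation *before* exercising the code under test.
	// .Once() makes the cleanup hook fail unless exactly one matching call occurred.
	guarantee := &flow.CollectionGuarantee{}
	pusher.On("SubmitCollectionGuarantee", guarantee).Once()

	// Stand-in for running the finalizer, which should submit the guarantee once.
	pusher.SubmitCollectionGuarantee(guarantee)
}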
AlexHentschel (Member) left a comment:

Very nice work 👏

Comment on lines 144 to 150
 // Process processes the given event from the node with the given origin ID in
-// a blocking manner. It returns the potential processing error when done.
+// a non-blocking manner. It returns the potential processing error when done.
+// Because the pusher engine does not accept inputs from the network,
+// always drop any messages and return an error.
 func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
-	return fmt.Errorf("pusher engine should only receive local messages on the same node")
+	return fmt.Errorf("pusher engine should only receive local messages on the same node: got message %T on channel %v from origin %v", message, channel, originID)
 }
AlexHentschel (Member):
This suggestion is largely about mildly adjusting the framing of Process to align with the protocol. Jordan had a helpful explanation in his comment on the prior PR, which might be a useful reference here.

Here’s how I’d frame the context:

  • The Flow protocol requires that honest Collector nodes broadcast Collection guarantees to Consensus Nodes (and only those). "Broadcasting" here refers to epidemic gossip or similar algorithms used to disseminate messages efficiently while minimizing bandwidth usage. You don’t need to dive into the details for this PR, but I thought a quick note on terminology might be helpful. The relevant broadcasting operation is this:
    err = e.conduit.Publish(guarantee, consensusNodes.NodeIDs()...)
  • Now, consider the case where an honest Collector node receives a message from another node—let’s call it "Byzantine Bob." Bob, being byzantine, might deviate from protocol rules when broadcasting messages. While the protocol specifies that Collection Guarantees should be sent to the consensus committee, Bob might specify a different set of recipients for its message.
  • From the perspective of the pusher.Engine (only running within honest Collector nodes), we would receive such messages via the channels.PushGuarantees networking channel. It’s important to note that the networking layer operates as a low-level tool and doesn’t understand the higher-level protocol logic. Its role is simply to relay messages, even if they’re outside the protocol rules (there are only very very basic checks in the networking layer, as briefly explained in this notion doc).

However, there’s an opportunity to refine this behavior. Returning an error here informs the networking layer of an issue, but since the networking layer doesn’t understand the meaning of those errors, it’s essentially acting as a "glorified logger." Instead, I think it would be cleaner and more maintainable for the pusher.Engine, which has the detailed protocol knowledge, to handle this edge case directly. For example:

  • We could use the dedicated logging keys for flagging suspected protocol violations:
    // KeySuspicious is a logging label that is used to flag the log event as suspicious behavior
    // This is used to add an easily searchable label to the log event
    KeySuspicious = "suspicious"
    These keys make it easier to identify and analyze specific issues in the logs.
  • Additionally, we can add an explanatory log entry here to provide context for the behavior without needing to involve the networking layer.
Suggested change
-// Process processes the given event from the node with the given origin ID in
-// a non-blocking manner. It returns the potential processing error when done.
-// Because the pusher engine does not accept inputs from the network,
-// always drop any messages and return an error.
-func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
-	return fmt.Errorf("pusher engine should only receive local messages on the same node: got message %T on channel %v from origin %v", message, channel, originID)
-}
+// Process is called by the networking layer, when peers broadcast messages with this node
+// as one of the recipients. The protocol specifies that Collector nodes broadcast Collection
+// Guarantees to Consensus Nodes and _only_ those. When the pusher engine (running only on
+// Collectors) receives a message, this message is evidence of byzantine behavior.
+// Byzantine inputs are internally handled by the pusher.Engine and do *not* result in
+// error returns. No errors expected during normal operation (including byzantine inputs).
+func (e *Engine) Process(channel channels.Channel, originID flow.Identifier, message any) error {
+	// Targeting a collector node's pusher.Engine with messages could be considered as a slashable offense.
+	// Though, for generating cryptographic evidence, we need Message Forensics - see reference [1].
+	// Much further into the future, when we are implementing slashing challenges, we'll probably implement a
+	// dedicated consumer to post-process evidence of protocol violations into slashing challenges. For now,
+	// we just log this with the `KeySuspicious` key to alert the node operator.
+	// [1] Message Forensics FLIP: https://github.com/onflow/flips/pull/195
+	errs := fmt.Errorf("collector node's pusher.Engine was targeted by message %T on channel %v", message, channel)
+	e.log.Warn().
+		Err(errs).
+		Bool(logging.KeySuspicious, true).
+		Str("peer_id", originID.String()).
+		Msg("potentially byzantine networking traffic detected")
+	return nil
+}

Comment on lines 111 to 114

 // send from a non-allowed role
 sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0]

-msg := &messages.SubmitCollectionGuarantee{
-	Guarantee: *guarantee,
-}
-err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, msg)
+err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee)
AlexHentschel (Member):

Unfortunately, with the change I suggested above, we would hide the case where pusher.Engine rejected the input from the external-facing MessageProcessor interface:

// TODO: This function should not return an error.
// The networking layer's responsibility is fulfilled once it delivers a message to an engine.
// It does not possess the context required to handle errors that may arise during an engine's processing
// of the message, as error handling for message processing falls outside the domain of the networking layer.
// Consequently, it is reasonable to remove the error from the Process function's signature,
// since returning an error to the networking layer would not be useful in this context.
Process(channel channels.Channel, originID flow.Identifier, message interface{}) error

Trying to adjust the test so we verify that pusher.Engine handles any input interface and does not broadcast when Process is called (?) ... maybe something like 👇 ?

Suggested change
-// send from a non-allowed role
-sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0]
-err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee)
+// verify that pusher.Engine handles any (potentially byzantine) input:
+// A byzantine peer could target the collector node's pusher engine with messages.
+// The pusher should discard those and explicitly not get tricked into broadcasting
+// collection guarantees which a byzantine peer might try to inject into the system.
+sender := suite.identities.Filter(filter.HasRole[flow.Identity](flow.RoleVerification))[0]
+err := suite.engine.Process(channels.PushGuarantees, sender.NodeID, guarantee)
+suite.Require().NoError(err)

AlexHentschel (Member):

Forgot to compliment you on this earlier: your PR description is great - very precise, thorough and concise.

Improve documentation for Process method of pusher engine,
and log an error instead of returning an error.
See: #6780 (comment)

Co-authored-by: Alexander Hentschel <[email protected]>
tim-barry merged commit 656cbd5 into feature/pusher-engine-refactor Dec 10, 2024
55 checks passed
tim-barry deleted the tim/6765-pusher-engine-update-interface branch December 10, 2024 20:42