forked from onflow/flow-go
-
Notifications
You must be signed in to change notification settings - Fork 0
/
engine.go
182 lines (163 loc) · 6.45 KB
/
engine.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
package compliance
import (
"fmt"
"github.com/rs/zerolog"
"github.com/onflow/flow-go/consensus/hotstuff"
"github.com/onflow/flow-go/consensus/hotstuff/model"
"github.com/onflow/flow-go/engine"
"github.com/onflow/flow-go/engine/common/fifoqueue"
"github.com/onflow/flow-go/engine/consensus"
"github.com/onflow/flow-go/model/flow"
"github.com/onflow/flow-go/model/messages"
"github.com/onflow/flow-go/module"
"github.com/onflow/flow-go/module/component"
"github.com/onflow/flow-go/module/events"
"github.com/onflow/flow-go/module/irrecoverable"
"github.com/onflow/flow-go/module/metrics"
"github.com/onflow/flow-go/state/protocol"
"github.com/onflow/flow-go/storage"
)
// defaultBlockQueueCapacity maximum capacity of inbound queue for `messages.BlockProposal`s
const defaultBlockQueueCapacity = 10_000
// Engine is a wrapper around `compliance.Core`. The Engine queues inbound messages, relevant
// node-internal notifications, and manages the worker routines processing the inbound events,
// and forwards outbound messages to the networking layer.
// `compliance.Core` implements the actual compliance logic.
// Implements consensus.Compliance interface.
type Engine struct {
component.Component
hotstuff.FinalizationConsumer
log zerolog.Logger
mempoolMetrics module.MempoolMetrics
engineMetrics module.EngineMetrics
me module.Local
headers storage.Headers
payloads storage.Payloads
tracer module.Tracer
state protocol.State
core *Core
pendingBlocks *fifoqueue.FifoQueue // queue for processing inbound blocks
pendingBlocksNotifier engine.Notifier
}
var _ consensus.Compliance = (*Engine)(nil)
func NewEngine(
log zerolog.Logger,
me module.Local,
core *Core,
) (*Engine, error) {
// Inbound FIFO queue for `messages.BlockProposal`s
blocksQueue, err := fifoqueue.NewFifoQueue(
defaultBlockQueueCapacity,
fifoqueue.WithLengthObserver(func(len int) { core.mempoolMetrics.MempoolEntries(metrics.ResourceBlockProposalQueue, uint(len)) }),
)
if err != nil {
return nil, fmt.Errorf("failed to create queue for inbound block proposals: %w", err)
}
eng := &Engine{
log: log.With().Str("compliance", "engine").Logger(),
me: me,
mempoolMetrics: core.mempoolMetrics,
engineMetrics: core.engineMetrics,
headers: core.headers,
payloads: core.payloads,
pendingBlocks: blocksQueue,
state: core.state,
tracer: core.tracer,
core: core,
pendingBlocksNotifier: engine.NewNotifier(),
}
finalizationActor, finalizationWorker := events.NewFinalizationActor(eng.processOnFinalizedBlock)
eng.FinalizationConsumer = finalizationActor
// create the component manager and worker threads
eng.Component = component.NewComponentManagerBuilder().
AddWorker(eng.processBlocksLoop).
AddWorker(finalizationWorker).
Build()
return eng, nil
}
// processBlocksLoop processes available block, vote, and timeout messages as they are queued.
func (e *Engine) processBlocksLoop(ctx irrecoverable.SignalerContext, ready component.ReadyFunc) {
ready()
doneSignal := ctx.Done()
newMessageSignal := e.pendingBlocksNotifier.Channel()
for {
select {
case <-doneSignal:
return
case <-newMessageSignal:
err := e.processQueuedBlocks(doneSignal) // no errors expected during normal operations
if err != nil {
ctx.Throw(err)
}
}
}
}
// processQueuedBlocks processes any available messages until the message queue is empty.
// Only returns when all inbound queues are empty (or the engine is terminated).
// No errors are expected during normal operation. All returned exceptions are potential
// symptoms of internal state corruption and should be fatal.
func (e *Engine) processQueuedBlocks(doneSignal <-chan struct{}) error {
for {
select {
case <-doneSignal:
return nil
default:
}
msg, ok := e.pendingBlocks.Pop()
if ok {
batch := msg.(flow.Slashable[[]*messages.BlockProposal])
for _, block := range batch.Message {
err := e.core.OnBlockProposal(flow.Slashable[*messages.BlockProposal]{
OriginID: batch.OriginID,
Message: block,
})
e.core.engineMetrics.MessageHandled(metrics.EngineCompliance, metrics.MessageBlockProposal)
if err != nil {
return fmt.Errorf("could not handle block proposal: %w", err)
}
}
continue
}
// when there are no more messages in the queue, back to the processBlocksLoop to wait
// for the next incoming message to arrive.
return nil
}
}
// OnBlockProposal feeds a new block proposal into the processing pipeline.
// Incoming proposals are queued and eventually dispatched by worker.
func (e *Engine) OnBlockProposal(proposal flow.Slashable[*messages.BlockProposal]) {
e.core.engineMetrics.MessageReceived(metrics.EngineCompliance, metrics.MessageBlockProposal)
proposalAsList := flow.Slashable[[]*messages.BlockProposal]{
OriginID: proposal.OriginID,
Message: []*messages.BlockProposal{proposal.Message},
}
if e.pendingBlocks.Push(proposalAsList) {
e.pendingBlocksNotifier.Notify()
} else {
e.core.engineMetrics.InboundMessageDropped(metrics.EngineCompliance, metrics.MessageBlockProposal)
}
}
// OnSyncedBlocks feeds a batch of blocks obtained via sync into the processing pipeline.
// Blocks in batch aren't required to be in any particular order.
// Incoming proposals are queued and eventually dispatched by worker.
func (e *Engine) OnSyncedBlocks(blocks flow.Slashable[[]*messages.BlockProposal]) {
e.core.engineMetrics.MessageReceived(metrics.EngineCompliance, metrics.MessageSyncedBlocks)
if e.pendingBlocks.Push(blocks) {
e.pendingBlocksNotifier.Notify()
} else {
e.core.engineMetrics.InboundMessageDropped(metrics.EngineCompliance, metrics.MessageSyncedBlocks)
}
}
// processOnFinalizedBlock informs compliance.Core about finalization of the respective block.
// The input to this callback is treated as trusted. This method should be executed on
// `OnFinalizedBlock` notifications from the node-internal consensus instance.
// No errors expected during normal operations.
func (e *Engine) processOnFinalizedBlock(block *model.Block) error {
// retrieve the latest finalized header, so we know the height
finalHeader, err := e.headers.ByBlockID(block.BlockID)
if err != nil { // no expected errors
return fmt.Errorf("could not get finalized header: %w", err)
}
e.core.ProcessFinalizedBlock(finalHeader)
return nil
}