Skip to content

Commit

Permalink
Fix transaction deduplication bug in batch fetcher
Browse files Browse the repository at this point in the history
If the batch fetcher received transactions from the availability module
in a different order than the order in which they should be delivered,
it could happen that the deduplication code removed the transactions
from the wrong batches.
This is now fixed by performing deduplication immediately before
delivery instead of upon the reception of transaction payloads
from the availability module.

Signed-off-by: Matej Pavlovic <[email protected]>
  • Loading branch information
matejpavlovic committed Apr 5, 2023
1 parent eee7aaa commit 579269e
Showing 1 changed file with 46 additions and 18 deletions.
64 changes: 46 additions & 18 deletions pkg/batchfetcher/batchfetcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,9 @@ import (
bfevents "github.com/filecoin-project/mir/pkg/batchfetcher/events"
"github.com/filecoin-project/mir/pkg/checkpoint"
"github.com/filecoin-project/mir/pkg/logging"
"github.com/filecoin-project/mir/pkg/pb/batchfetcherpb"
checkpointpbtypes "github.com/filecoin-project/mir/pkg/pb/checkpointpb/types"
"github.com/filecoin-project/mir/pkg/pb/requestpb"

availabilitypbdsl "github.com/filecoin-project/mir/pkg/pb/availabilitypb/dsl"
apbtypes "github.com/filecoin-project/mir/pkg/pb/availabilitypb/types"
Expand All @@ -20,7 +22,6 @@ import (
"github.com/filecoin-project/mir/pkg/events"
"github.com/filecoin-project/mir/pkg/modules"
"github.com/filecoin-project/mir/pkg/pb/eventpb"
"github.com/filecoin-project/mir/pkg/pb/requestpb"
t "github.com/filecoin-project/mir/pkg/types"
)

Expand All @@ -45,6 +46,30 @@ func NewModule(mc *ModuleConfig, epochNr t.EpochNr, clientProgress *clientprogre
// In such a case, events received later must not be relayed until the pending certificate has been resolved.
var output outputQueue

// filterDuplicates takes a NewOrderedBatch event and removes all the contained transactions
// that have already been added to the clientProgress, i.e., that have already been delivered.
// filterDuplicates modification performs the modification in-place, on the provided batch.
// It is applied to each transaction batch immediately before delivering it to the application.
filterDuplicates := func(newOrderedBatch *batchfetcherpb.NewOrderedBatch) {

newTxs := make([]*requestpb.Request, 0, len(newOrderedBatch.Txs))

for _, tx := range newOrderedBatch.Txs {

// Convenience variables
clID := t.ClientID(tx.ClientId)
reqNo := t.ReqNo(tx.ReqNo)

// Only keep transaction if it has not yet been delivered.
if clientProgress.Add(clID, reqNo) {
newTxs = append(newTxs, tx)
}
}

// Replace the original list of transactions by the filtered one.
newOrderedBatch.Txs = newTxs
}

// The NewEpoch handler updates the current epoch number and forwards the event to the output.
eventpbdsl.UponNewEpoch(m, func(newEpochNr t.EpochNr) error {
epochNr = newEpochNr
Expand All @@ -61,7 +86,23 @@ func NewModule(mc *ModuleConfig, epochNr t.EpochNr, clientProgress *clientprogre
// Create an empty output item and enqueue it immediately.
// Actual output will be delayed until the transactions have been received.
// This is necessary to preserve the order of incoming and outgoing events.
item := outputItem{event: nil}
item := outputItem{
event: nil,

// At the time of delivering the batch,
// filter out transactions that have already been delivered in previous batches.
// Note that this must be done immediately before delivering the batch,
// NOT on reception of the transaction payloads.
// (Otherwise, delivering the transaction payloads from the availability module
// in different order at different nodes would lead to inconsistencies).
f: func(e *eventpb.Event) {
// Casting event to the NewOrderedBatch type is safe,
// because no other event type is ever saved in an output item created at certificate delivery.
filterDuplicates(e.
Type.(*eventpb.Event_BatchFetcher).BatchFetcher.
Type.(*batchfetcherpb.Event_NewOrderedBatch).NewOrderedBatch)
},
}
output.Enqueue(&item)

//TODO cleanup check for empty certificates and make consistent across modules
Expand Down Expand Up @@ -140,22 +181,9 @@ func NewModule(mc *ModuleConfig, epochNr t.EpochNr, clientProgress *clientprogre
// and flushes the output stream.
availabilitypbdsl.UponProvideTransactions(m, func(txs []*requestpbtypes.Request, context *txRequestContext) error {

// Filter out transactions that already have been delivered
newTxs := make([]*requestpb.Request, 0, len(txs))
for _, req := range txs {
// Runs for each received transaction.

// Convenience variables
clID := req.ClientId
reqNo := req.ReqNo

// Only keep request if it has not yet been delivered.
if clientProgress.Add(clID, reqNo) {
newTxs = append(newTxs, req.Pb())
}
}

context.queueItem.event = bfevents.NewOrderedBatch(mc.Destination, newTxs)
// Note that not necessarily all transactions will be part of the final batch.
// When the event leaves the output buffer, duplicates will be filtered out.
context.queueItem.event = bfeventstypes.NewOrderedBatch(mc.Destination, txs).Pb()
output.Flush(m)
return nil
})
Expand Down

0 comments on commit 579269e

Please sign in to comment.