diff --git a/integration/obscurogateway/obscurogateway_test.go b/integration/obscurogateway/obscurogateway_test.go index ef5d16fae1..5b07dd19b6 100644 --- a/integration/obscurogateway/obscurogateway_test.go +++ b/integration/obscurogateway/obscurogateway_test.go @@ -234,11 +234,9 @@ func testMultipleAccountsSubscription(t *testing.T, httpURL, wsURL string, w wal // user0 should see two lifecycle events (1 for each interaction with setMessage2) assert.Equal(t, 2, len(user0logs)) // user1 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage - // TODO: 5 events as expected (2*2 lifecycle events + 1 event specific to an address0 - change after deduplication - assert.Equal(t, 5, len(user1logs)) + assert.Equal(t, 3, len(user1logs)) // user2 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage - // TODO: 5 events as expected (2*2 lifecycle events + 1 event specific to an address0 - change after deduplication - assert.Equal(t, 5, len(user2logs)) + assert.Equal(t, 3, len(user2logs)) } func testAreTxsMinted(t *testing.T, httpURL, wsURL string, w wallet.Wallet) { //nolint: unused diff --git a/tools/walletextension/common/constants.go b/tools/walletextension/common/constants.go index b49f964728..f7ab956396 100644 --- a/tools/walletextension/common/constants.go +++ b/tools/walletextension/common/constants.go @@ -49,6 +49,7 @@ const ( APIVersion1 = "/v1" MethodEthSubscription = "eth_subscription" PathVersion = "/version/" + DeduplicationBufferSize = 20 ) var ReaderHeadTimeout = 10 * time.Second diff --git a/tools/walletextension/subscriptions/deduplication_circular_buffer.go b/tools/walletextension/subscriptions/deduplication_circular_buffer.go new file mode 100644 index 0000000000..52a2e23cfe --- /dev/null +++ b/tools/walletextension/subscriptions/deduplication_circular_buffer.go @@ -0,0 +1,43 @@ +package subscriptions + +import "github.com/ethereum/go-ethereum/common" + +// LogKey uniquely represents a log (consists of BlockHash, TxHash, and Index) +type LogKey struct { + BlockHash common.Hash // Not necessary, but can be helpful in edge case of block reorg. + TxHash common.Hash + Index uint +} + +// CircularBuffer is a data structure that uses a single, fixed-size buffer as if it was connected end-to-end. +type CircularBuffer struct { + data []LogKey + size int + end int +} + +// NewCircularBuffer initializes a new CircularBuffer of the given size. +func NewCircularBuffer(size int) *CircularBuffer { + return &CircularBuffer{ + data: make([]LogKey, size), + size: size, + end: 0, + } +} + +// Push adds a new LogKey to the end of the buffer. If the buffer is full, +// it overwrites the oldest data with the new LogKey. +func (cb *CircularBuffer) Push(key LogKey) { + cb.data[cb.end] = key + cb.end = (cb.end + 1) % cb.size +} + +// Contains checks if the given LogKey exists in the buffer +func (cb *CircularBuffer) Contains(key LogKey) bool { + for _, item := range cb.data { + if item == key { + return true + } + } + return false +} diff --git a/tools/walletextension/subscriptions/subscriptions.go b/tools/walletextension/subscriptions/subscriptions.go index e4d25cf374..6e1a29636c 100644 --- a/tools/walletextension/subscriptions/subscriptions.go +++ b/tools/walletextension/subscriptions/subscriptions.go @@ -50,7 +50,6 @@ func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req go readFromChannelAndWriteToUserConn(funnelMultipleAccountsChan, userConn, userSubscriptionID, sm.logger) // iterate over all clients and subscribe for each of them - // TODO: currently we use only first client (enabling subscriptions for all of them will be part of future PR) for _, client := range clients { subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, funnelMultipleAccountsChan, req.Params...) if err != nil { @@ -75,13 +74,29 @@ func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req } func readFromChannelAndWriteToUserConn(channel chan common.IDAndLog, userConn userconn.UserConn, userSubscriptionID gethrpc.ID, logger gethlog.Logger) { + buffer := NewCircularBuffer(wecommon.DeduplicationBufferSize) for data := range channel { + // create unique identifier for current log + uniqueLogKey := LogKey{ + BlockHash: data.Log.BlockHash, + TxHash: data.Log.TxHash, + Index: data.Log.Index, + } + + // check if the current event is a duplicate (and skip it if it is) + if buffer.Contains(uniqueLogKey) { + continue + } + jsonResponse, err := prepareLogResponse(data, userSubscriptionID) if err != nil { logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, data.SubID, log.ErrKey, err) continue } + // the current log is unique, and we want to add it to our buffer and proceed with forwarding to the user + buffer.Push(uniqueLogKey) + logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, data.SubID) err = userConn.WriteResponse(jsonResponse) if err != nil {