Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Ziga/og deduplicate events #1628

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 2 additions & 4 deletions integration/obscurogateway/obscurogateway_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -234,11 +234,9 @@ func testMultipleAccountsSubscription(t *testing.T, httpURL, wsURL string, w wal
// user0 should see two lifecycle events (1 for each interaction with setMessage2)
assert.Equal(t, 2, len(user0logs))
// user1 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage
// TODO: 5 events as expected (2*2 lifecycle events + 1 event specific to an address0 - change after deduplication
assert.Equal(t, 5, len(user1logs))
assert.Equal(t, 3, len(user1logs))
// user2 should see three events (two lifecycle events - same as user0) and event with his interaction with setMessage
// TODO: 5 events as expected (2*2 lifecycle events + 1 event specific to an address0 - change after deduplication
assert.Equal(t, 5, len(user2logs))
assert.Equal(t, 3, len(user2logs))
}

func testAreTxsMinted(t *testing.T, httpURL, wsURL string, w wallet.Wallet) { //nolint: unused
Expand Down
1 change: 1 addition & 0 deletions tools/walletextension/common/constants.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ const (
APIVersion1 = "/v1"
MethodEthSubscription = "eth_subscription"
PathVersion = "/version/"
DeduplicationBufferSize = 20
)

var ReaderHeadTimeout = 10 * time.Second
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
package subscriptions

import "github.com/ethereum/go-ethereum/common"

// LogKey uniquely represents a log (consists of BlockHash, TxHash, and Index)
type LogKey struct {
BlockHash common.Hash // Not necessary, but can be helpful in edge case of block reorg.
TxHash common.Hash
Index uint
}

// CircularBuffer is a data structure that uses a single, fixed-size buffer as if it was connected end-to-end.
type CircularBuffer struct {
data []LogKey
size int
end int
}

// NewCircularBuffer initializes a new CircularBuffer of the given size.
func NewCircularBuffer(size int) *CircularBuffer {
otherview marked this conversation as resolved.
Show resolved Hide resolved
return &CircularBuffer{
data: make([]LogKey, size),
size: size,
end: 0,
}
}

// Push adds a new LogKey to the end of the buffer. If the buffer is full,
// it overwrites the oldest data with the new LogKey.
func (cb *CircularBuffer) Push(key LogKey) {
cb.data[cb.end] = key
cb.end = (cb.end + 1) % cb.size
}

// Contains checks if the given LogKey exists in the buffer
func (cb *CircularBuffer) Contains(key LogKey) bool {
for _, item := range cb.data {
if item == key {
return true
}
}
return false
}
17 changes: 16 additions & 1 deletion tools/walletextension/subscriptions/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,6 @@ func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req
go readFromChannelAndWriteToUserConn(funnelMultipleAccountsChan, userConn, userSubscriptionID, sm.logger)

// iterate over all clients and subscribe for each of them
// TODO: currently we use only first client (enabling subscriptions for all of them will be part of future PR)
for _, client := range clients {
subscription, err := client.Subscribe(context.Background(), resp, rpc.SubscribeNamespace, funnelMultipleAccountsChan, req.Params...)
if err != nil {
Expand All @@ -75,13 +74,29 @@ func (sm *SubscriptionManager) HandleNewSubscriptions(clients []rpc.Client, req
}

func readFromChannelAndWriteToUserConn(channel chan common.IDAndLog, userConn userconn.UserConn, userSubscriptionID gethrpc.ID, logger gethlog.Logger) {
buffer := NewCircularBuffer(wecommon.DeduplicationBufferSize)
for data := range channel {
// create unique identifier for current log
zkokelj marked this conversation as resolved.
Show resolved Hide resolved
uniqueLogKey := LogKey{
BlockHash: data.Log.BlockHash,
TxHash: data.Log.TxHash,
Index: data.Log.Index,
}

// check if the current event is a duplicate (and skip it if it is)
if buffer.Contains(uniqueLogKey) {
continue
}

jsonResponse, err := prepareLogResponse(data, userSubscriptionID)
if err != nil {
logger.Error("could not marshal log response to JSON on subscription.", log.SubIDKey, data.SubID, log.ErrKey, err)
continue
}

// the current log is unique, and we want to add it to our buffer and proceed with forwarding to the user
buffer.Push(uniqueLogKey)

logger.Trace(fmt.Sprintf("Forwarding log from Obscuro node: %s", jsonResponse), log.SubIDKey, data.SubID)
err = userConn.WriteResponse(jsonResponse)
if err != nil {
Expand Down
Loading