Skip to content
This repository has been archived by the owner on May 22, 2023. It is now read-only.

Prevent TBTC monitoring duplication #591

Merged
merged 13 commits into from
Nov 12, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
46 changes: 41 additions & 5 deletions pkg/extensions/tbtc/tbtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"math"
"math/big"
"math/rand"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -74,7 +76,8 @@ func Initialize(ctx context.Context, chain chain.TBTCHandle) error {
}

type tbtc struct {
chain chain.TBTCHandle
chain chain.TBTCHandle
monitoringLocks sync.Map
}

func newTBTC(chain chain.TBTCHandle) *tbtc {
Expand Down Expand Up @@ -414,10 +417,9 @@ type backoffFn func(iteration int) time.Duration

type timeoutFn func(depositAddress string) (time.Duration, error)

// TODO (keep-ecdsa/pull/585#discussion_r513447505):
// 1. Incoming events deduplication.
// 2. Handle chain reorgs (keep-ecdsa/pull/585#discussion_r511760283)
// 3. Resume monitoring after client restart.
// TODO:
// 1. Handle chain reorgs (keep-ecdsa/pull/585#discussion_r511760283 and keep-ecdsa/pull/585#discussion_r513447505)
// 2. Resume monitoring after client restart.
func (t *tbtc) monitorAndAct(
ctx context.Context,
monitoringName string,
Expand All @@ -434,6 +436,16 @@ func (t *tbtc) monitorAndAct(
return
}

if !t.acquireMonitoringLock(depositAddress, monitoringName) {
logger.Warningf(
"[%v] monitoring for deposit [%v] is already running",
monitoringName,
depositAddress,
)
return
}
defer t.releaseMonitoringLock(depositAddress, monitoringName)

logger.Infof(
"starting [%v] monitoring for deposit [%v]",
monitoringName,
Expand Down Expand Up @@ -646,6 +658,30 @@ func (t *tbtc) pastEventsLookupStartBlock() uint64 {
return currentBlock - pastEventsLookbackBlocks
}

func (t *tbtc) acquireMonitoringLock(depositAddress, monitoringName string) bool {
_, isExistingKey := t.monitoringLocks.LoadOrStore(
monitoringLockKey(depositAddress, monitoringName),
true,
)

return !isExistingKey
}

func (t *tbtc) releaseMonitoringLock(depositAddress, monitoringName string) {
t.monitoringLocks.Delete(monitoringLockKey(depositAddress, monitoringName))
}

func monitoringLockKey(
depositAddress string,
monitoringName string,
) string {
return fmt.Sprintf(
"%v-%v",
depositAddress,
strings.ReplaceAll(monitoringName, " ", ""),
)
}

// Computes the exponential backoff value for given iteration.
// For each iteration the result value will be in range:
// - iteration 1: [2000ms, 2100ms)
Expand Down
150 changes: 150 additions & 0 deletions pkg/extensions/tbtc/tbtc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,12 @@ import (
"math/big"
"math/rand"
"reflect"
"sync/atomic"
"testing"
"time"

"github.com/keep-network/keep-common/pkg/subscription"

"github.com/keep-network/keep-ecdsa/pkg/ecdsa"
"github.com/keep-network/keep-ecdsa/pkg/utils/byteutils"

Expand Down Expand Up @@ -1851,6 +1854,153 @@ func TestProvideRedemptionProof_OperatorNotInSigningGroup(
}
}

func TestMonitorAndActDeduplication(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

monitoringName := "monitoring"

shouldMonitorFn := func(depositAddress string) bool {
return true
}

monitoringStartFn := func(
handler depositEventHandler,
) (subscription.EventSubscription, error) {
for i := 0; i < 5; i++ {
handler("deposit") // simulate multiple start events
}

return subscription.NewEventSubscription(func() {}), nil
}

monitoringStopFn := func(
handler depositEventHandler,
) (subscription.EventSubscription, error) {
return subscription.NewEventSubscription(func() {}), nil
}

keepClosedFn := func(depositAddress string) (chan struct{}, func(), error) {
return make(chan struct{}), func() {}, nil
}

var actCounter uint64
actFn := func(depositAddress string) error {
atomic.AddUint64(&actCounter, 1)
return nil
}

timeoutFn := func(depositAddress string) (duration time.Duration, e error) {
return timeout, nil
}

monitoringSubscription, err := tbtc.monitorAndAct(
ctx,
monitoringName,
shouldMonitorFn,
monitoringStartFn,
monitoringStopFn,
keepClosedFn,
actFn,
constantBackoff,
timeoutFn,
)
if err != nil {
t.Fatal(err)
}
defer monitoringSubscription.Unsubscribe()

// wait a bit longer than the monitoring timeout
// to make sure the potential transaction completes
time.Sleep(2 * timeout)

expectedActCounter := uint64(1)
if actCounter != expectedActCounter {
t.Errorf(
"unexpected number of action invocations\n"+
"expected: [%v]\n"+
"actual: [%v]",
expectedActCounter,
actCounter,
)
}
}

func TestAcquireMonitoringLock(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !tbtc.acquireMonitoringLock("0xBB", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !tbtc.acquireMonitoringLock("0xAA", "monitoring two") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !tbtc.acquireMonitoringLock("0xBB", "monitoring two") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}
}

func TestAcquireMonitoringLock_Duplicate(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring was started before; lock attempt should be rejected")
}
}

func TestReleaseMonitoringLock(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

tbtc.releaseMonitoringLock("0xAA", "monitoring one")

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring lock has been released; should be locked successfully")
}
}

func TestReleaseMonitoringLock_WhenEmpty(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

tbtc.releaseMonitoringLock("0xAA", "monitoring one")

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}
}

func submitKeepPublicKey(
depositAddress string,
tbtcChain *local.TBTCLocalChain,
Expand Down