Skip to content
This repository has been archived by the owner on May 22, 2023. It is now read-only.

Prevent TBTC monitoring duplication #591

Merged
merged 13 commits into from
Nov 12, 2020
Merged
47 changes: 42 additions & 5 deletions pkg/extensions/tbtc/tbtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,8 @@ import (
"math"
"math/big"
"math/rand"
"strings"
"sync"
"time"

"github.com/ethereum/go-ethereum/common"
Expand Down Expand Up @@ -77,7 +79,8 @@ func Initialize(
}

type tbtc struct {
chain chain.TBTCHandle
chain chain.TBTCHandle
monitoringLocks sync.Map
}

func newTBTC(chain chain.TBTCHandle) *tbtc {
Expand Down Expand Up @@ -417,10 +420,9 @@ type backoffFn func(iteration int) time.Duration

type timeoutFn func(depositAddress string) (time.Duration, error)

// TODO (keep-ecdsa/pull/585#discussion_r513447505):
// 1. Incoming events deduplication.
// 2. Handle chain reorgs (keep-ecdsa/pull/585#discussion_r511760283)
// 3. Resume monitoring after client restart.
// TODO:
// 1. Handle chain reorgs (keep-ecdsa/pull/585#discussion_r511760283 and keep-ecdsa/pull/585#discussion_r513447505)
// 2. Resume monitoring after client restart.
func (t *tbtc) monitorAndAct(
ctx context.Context,
monitoringName string,
Expand All @@ -437,6 +439,17 @@ func (t *tbtc) monitorAndAct(
return
}

if !t.acquireMonitoringLock(depositAddress, monitoringName) {
logger.Warningf(
"[%v] monitoring for deposit [%v] is already running; "+
"could not start new monitoring instance",
pdyraga marked this conversation as resolved.
Show resolved Hide resolved
monitoringName,
depositAddress,
)
return
}
defer t.releaseMonitoringLock(depositAddress, monitoringName)

logger.Infof(
"starting [%v] monitoring for deposit [%v]",
monitoringName,
Expand Down Expand Up @@ -651,6 +664,30 @@ func (t *tbtc) pastEventsLookupStartBlock() uint64 {
return currentBlock - pastEventsLookbackBlocks
}

func (t *tbtc) acquireMonitoringLock(depositAddress, monitoringName string) bool {
_, isExistingKey := t.monitoringLocks.LoadOrStore(
monitoringLockKey(depositAddress, monitoringName),
true,
)

return !isExistingKey
}

func (t *tbtc) releaseMonitoringLock(depositAddress, monitoringName string) {
t.monitoringLocks.Delete(monitoringLockKey(depositAddress, monitoringName))
}

func monitoringLockKey(
depositAddress string,
monitoringName string,
) string {
return fmt.Sprintf(
"%v-%v",
depositAddress,
strings.ReplaceAll(monitoringName, " ", ""),
)
}

// Computes the exponential backoff value for given iteration.
// For each iteration the result value will be in range:
// - iteration 1: [2000ms, 2100ms)
Expand Down
149 changes: 149 additions & 0 deletions pkg/extensions/tbtc/tbtc_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,8 @@ import (
"testing"
"time"

"github.com/keep-network/keep-common/pkg/subscription"

"github.com/keep-network/keep-ecdsa/pkg/ecdsa"
"github.com/keep-network/keep-ecdsa/pkg/utils/byteutils"

Expand Down Expand Up @@ -1851,6 +1853,153 @@ func TestProvideRedemptionProof_OperatorNotInSigningGroup(
}
}

func TestMonitorAndActDeduplication(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

monitoringName := "monitoring"

shouldMonitorFn := func(depositAddress string) bool {
return true
}

monitoringStartFn := func(
handler depositEventHandler,
) (subscription.EventSubscription, error) {
for i := 0; i < 5; i++ {
handler("deposit") // simulate multiple start events
}

return subscription.NewEventSubscription(func() {}), nil
}

monitoringStopFn := func(
handler depositEventHandler,
) (subscription.EventSubscription, error) {
return subscription.NewEventSubscription(func() {}), nil
}

keepClosedFn := func(depositAddress string) (chan struct{}, func(), error) {
return make(chan struct{}), func() {}, nil
}

actCounter := 0
actFn := func(depositAddress string) error {
actCounter++
pdyraga marked this conversation as resolved.
Show resolved Hide resolved
return nil
}

timeoutFn := func(depositAddress string) (duration time.Duration, e error) {
return timeout, nil
}

monitoringSubscription, err := tbtc.monitorAndAct(
ctx,
monitoringName,
shouldMonitorFn,
monitoringStartFn,
monitoringStopFn,
keepClosedFn,
actFn,
constantBackoff,
timeoutFn,
)
if err != nil {
t.Fatal(err)
}
defer monitoringSubscription.Unsubscribe()

// wait a bit longer than the monitoring timeout
// to make sure the potential transaction completes
time.Sleep(2 * timeout)

expectedActCounter := 1
if actCounter != expectedActCounter {
t.Errorf(
"unexpected number of action invocations\n"+
"expected: [%v]\n"+
"actual: [%v]",
expectedActCounter,
actCounter,
)
}
}

func TestAcquireMonitoringLock(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !tbtc.acquireMonitoringLock("0xBB", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !tbtc.acquireMonitoringLock("0xAA", "monitoring two") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !tbtc.acquireMonitoringLock("0xBB", "monitoring two") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}
}

func TestAcquireMonitoringLock_Duplicate(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring was started before; lock attempt should be rejected")
}
}

func TestReleaseMonitoringLock(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

tbtc.releaseMonitoringLock("0xAA", "monitoring one")

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring lock has been released; should be locked successfully")
}
}

func TestReleaseMonitoringLock_WhenEmpty(t *testing.T) {
ctx, cancelCtx := context.WithCancel(context.Background())
defer cancelCtx()

tbtcChain := local.NewTBTCLocalChain(ctx)
tbtc := newTBTC(tbtcChain)

tbtc.releaseMonitoringLock("0xAA", "monitoring one")

if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}
}

func submitKeepPublicKey(
depositAddress string,
tbtcChain *local.TBTCLocalChain,
Expand Down