diff --git a/pkg/extensions/tbtc/tbtc.go b/pkg/extensions/tbtc/tbtc.go index 710e70d56..56fb494cf 100644 --- a/pkg/extensions/tbtc/tbtc.go +++ b/pkg/extensions/tbtc/tbtc.go @@ -8,6 +8,8 @@ import ( "math" "math/big" "math/rand" + "strings" + "sync" "time" "github.com/ethereum/go-ethereum/common" @@ -74,7 +76,8 @@ func Initialize(ctx context.Context, chain chain.TBTCHandle) error { } type tbtc struct { - chain chain.TBTCHandle + chain chain.TBTCHandle + monitoringLocks sync.Map } func newTBTC(chain chain.TBTCHandle) *tbtc { @@ -414,10 +417,9 @@ type backoffFn func(iteration int) time.Duration type timeoutFn func(depositAddress string) (time.Duration, error) -// TODO (keep-ecdsa/pull/585#discussion_r513447505): -// 1. Incoming events deduplication. -// 2. Handle chain reorgs (keep-ecdsa/pull/585#discussion_r511760283) -// 3. Resume monitoring after client restart. +// TODO: +// 1. Handle chain reorgs (keep-ecdsa/pull/585#discussion_r511760283 and keep-ecdsa/pull/585#discussion_r513447505) +// 2. Resume monitoring after client restart. func (t *tbtc) monitorAndAct( ctx context.Context, monitoringName string, @@ -434,6 +436,16 @@ func (t *tbtc) monitorAndAct( return } + if !t.acquireMonitoringLock(depositAddress, monitoringName) { + logger.Warningf( + "[%v] monitoring for deposit [%v] is already running", + monitoringName, + depositAddress, + ) + return + } + defer t.releaseMonitoringLock(depositAddress, monitoringName) + logger.Infof( "starting [%v] monitoring for deposit [%v]", monitoringName, @@ -646,6 +658,30 @@ func (t *tbtc) pastEventsLookupStartBlock() uint64 { return currentBlock - pastEventsLookbackBlocks } +func (t *tbtc) acquireMonitoringLock(depositAddress, monitoringName string) bool { + _, isExistingKey := t.monitoringLocks.LoadOrStore( + monitoringLockKey(depositAddress, monitoringName), + true, + ) + + return !isExistingKey +} + +func (t *tbtc) releaseMonitoringLock(depositAddress, monitoringName string) { + t.monitoringLocks.Delete(monitoringLockKey(depositAddress, monitoringName)) +} + +func monitoringLockKey( + depositAddress string, + monitoringName string, +) string { + return fmt.Sprintf( + "%v-%v", + depositAddress, + strings.ReplaceAll(monitoringName, " ", ""), + ) +} + // Computes the exponential backoff value for given iteration. // For each iteration the result value will be in range: // - iteration 1: [2000ms, 2100ms) diff --git a/pkg/extensions/tbtc/tbtc_test.go b/pkg/extensions/tbtc/tbtc_test.go index e84cbc295..db67c0775 100644 --- a/pkg/extensions/tbtc/tbtc_test.go +++ b/pkg/extensions/tbtc/tbtc_test.go @@ -7,9 +7,12 @@ import ( "math/big" "math/rand" "reflect" + "sync/atomic" "testing" "time" + "github.com/keep-network/keep-common/pkg/subscription" + "github.com/keep-network/keep-ecdsa/pkg/ecdsa" "github.com/keep-network/keep-ecdsa/pkg/utils/byteutils" @@ -1851,6 +1854,153 @@ func TestProvideRedemptionProof_OperatorNotInSigningGroup( } } +func TestMonitorAndActDeduplication(t *testing.T) { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + tbtcChain := local.NewTBTCLocalChain(ctx) + tbtc := newTBTC(tbtcChain) + + monitoringName := "monitoring" + + shouldMonitorFn := func(depositAddress string) bool { + return true + } + + monitoringStartFn := func( + handler depositEventHandler, + ) (subscription.EventSubscription, error) { + for i := 0; i < 5; i++ { + handler("deposit") // simulate multiple start events + } + + return subscription.NewEventSubscription(func() {}), nil + } + + monitoringStopFn := func( + handler depositEventHandler, + ) (subscription.EventSubscription, error) { + return subscription.NewEventSubscription(func() {}), nil + } + + keepClosedFn := func(depositAddress string) (chan struct{}, func(), error) { + return make(chan struct{}), func() {}, nil + } + + var actCounter uint64 + actFn := func(depositAddress string) error { + atomic.AddUint64(&actCounter, 1) + return nil + } + + timeoutFn := func(depositAddress string) (duration time.Duration, e error) { + return timeout, nil + } + + monitoringSubscription, err := tbtc.monitorAndAct( + ctx, + monitoringName, + shouldMonitorFn, + monitoringStartFn, + monitoringStopFn, + keepClosedFn, + actFn, + constantBackoff, + timeoutFn, + ) + if err != nil { + t.Fatal(err) + } + defer monitoringSubscription.Unsubscribe() + + // wait a bit longer than the monitoring timeout + // to make sure the potential transaction completes + time.Sleep(2 * timeout) + + expectedActCounter := uint64(1) + if actCounter != expectedActCounter { + t.Errorf( + "unexpected number of action invocations\n"+ + "expected: [%v]\n"+ + "actual: [%v]", + expectedActCounter, + actCounter, + ) + } +} + +func TestAcquireMonitoringLock(t *testing.T) { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + tbtcChain := local.NewTBTCLocalChain(ctx) + tbtc := newTBTC(tbtcChain) + + if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") { + t.Errorf("monitoring wasn't started before; should be locked successfully") + } + + if !tbtc.acquireMonitoringLock("0xBB", "monitoring one") { + t.Errorf("monitoring wasn't started before; should be locked successfully") + } + + if !tbtc.acquireMonitoringLock("0xAA", "monitoring two") { + t.Errorf("monitoring wasn't started before; should be locked successfully") + } + + if !tbtc.acquireMonitoringLock("0xBB", "monitoring two") { + t.Errorf("monitoring wasn't started before; should be locked successfully") + } +} + +func TestAcquireMonitoringLock_Duplicate(t *testing.T) { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + tbtcChain := local.NewTBTCLocalChain(ctx) + tbtc := newTBTC(tbtcChain) + + if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") { + t.Errorf("monitoring wasn't started before; should be locked successfully") + } + + if tbtc.acquireMonitoringLock("0xAA", "monitoring one") { + t.Errorf("monitoring was started before; lock attempt should be rejected") + } +} + +func TestReleaseMonitoringLock(t *testing.T) { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + tbtcChain := local.NewTBTCLocalChain(ctx) + tbtc := newTBTC(tbtcChain) + + if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") { + t.Errorf("monitoring wasn't started before; should be locked successfully") + } + + tbtc.releaseMonitoringLock("0xAA", "monitoring one") + + if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") { + t.Errorf("monitoring lock has been released; should be locked successfully") + } +} + +func TestReleaseMonitoringLock_WhenEmpty(t *testing.T) { + ctx, cancelCtx := context.WithCancel(context.Background()) + defer cancelCtx() + + tbtcChain := local.NewTBTCLocalChain(ctx) + tbtc := newTBTC(tbtcChain) + + tbtc.releaseMonitoringLock("0xAA", "monitoring one") + + if !tbtc.acquireMonitoringLock("0xAA", "monitoring one") { + t.Errorf("monitoring wasn't started before; should be locked successfully") + } +} + func submitKeepPublicKey( depositAddress string, tbtcChain *local.TBTCLocalChain,