Skip to content
This repository has been archived by the owner on May 22, 2023. It is now read-only.

Prevent TBTC monitoring duplication #591

Merged
merged 13 commits into from
Nov 12, 2020
Merged
55 changes: 55 additions & 0 deletions pkg/extensions/tbtc/monitoring_lock.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
package tbtc

import (
"fmt"
"strings"
"sync"
)

type monitoringLockKey string

func newMonitoringLockKey(
depositAddress string,
monitoringName string,
) monitoringLockKey {
return monitoringLockKey(fmt.Sprintf(
"%v-%v",
depositAddress,
strings.ReplaceAll(monitoringName, " ", ""),
))
}
pdyraga marked this conversation as resolved.
Show resolved Hide resolved

type monitoringLock struct {
pdyraga marked this conversation as resolved.
Show resolved Hide resolved
locks map[monitoringLockKey]bool
mutex sync.Mutex
}

func newMonitoringLock() *monitoringLock {
return &monitoringLock{
locks: make(map[monitoringLockKey]bool),
}
}

func (ml *monitoringLock) tryLock(depositAddress, monitoringName string) bool {
ml.mutex.Lock()
defer ml.mutex.Unlock()

key := newMonitoringLockKey(depositAddress, monitoringName)

if ml.locks[key] == true {
return false
}

ml.locks[key] = true

return true
}

func (ml *monitoringLock) release(depositAddress, monitoringName string) {
ml.mutex.Lock()
defer ml.mutex.Unlock()

key := newMonitoringLockKey(depositAddress, monitoringName)

delete(ml.locks, key)
}
59 changes: 59 additions & 0 deletions pkg/extensions/tbtc/monitoring_lock_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,59 @@
package tbtc

import "testing"

func TestMonitoringLock_TryLock(t *testing.T) {
monitoringLock := newMonitoringLock()

if !monitoringLock.tryLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !monitoringLock.tryLock("0xBB", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !monitoringLock.tryLock("0xAA", "monitoring two") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if !monitoringLock.tryLock("0xBB", "monitoring two") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}
}

func TestMonitoringLock_TryLock_Duplicate(t *testing.T) {
monitoringLock := newMonitoringLock()

if !monitoringLock.tryLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

if monitoringLock.tryLock("0xAA", "monitoring one") {
t.Errorf("monitoring was started before; lock attempt should be rejected")
}
}

func TestMonitoringLock_Release(t *testing.T) {
monitoringLock := newMonitoringLock()

if !monitoringLock.tryLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}

monitoringLock.release("0xAA", "monitoring one")

if !monitoringLock.tryLock("0xAA", "monitoring one") {
t.Errorf("monitoring lock has been released; should be locked successfully")
}
}

func TestMonitoringLock_Release_WhenEmpty(t *testing.T) {
monitoringLock := newMonitoringLock()

monitoringLock.release("0xAA", "monitoring one")

if !monitoringLock.tryLock("0xAA", "monitoring one") {
t.Errorf("monitoring wasn't started before; should be locked successfully")
}
}
28 changes: 20 additions & 8 deletions pkg/extensions/tbtc/tbtc.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,14 +82,16 @@ func Initialize(
}

type tbtc struct {
chain chain.TBTCHandle
keepsRegistry KeepsRegistry
chain chain.TBTCHandle
keepsRegistry KeepsRegistry
monitoringLock *monitoringLock
}

func newTBTC(chain chain.TBTCHandle, keepsRegistry KeepsRegistry) *tbtc {
return &tbtc{
chain: chain,
keepsRegistry: keepsRegistry,
chain: chain,
keepsRegistry: keepsRegistry,
monitoringLock: newMonitoringLock(),
}
}

Expand Down Expand Up @@ -419,10 +421,9 @@ type backoffFn func(iteration int) time.Duration

type timeoutFn func(depositAddress string) (time.Duration, error)

// TODO (keep-ecdsa/pull/585#discussion_r513447505):
// 1. Incoming events deduplication.
// 2. Handle chain reorgs (keep-ecdsa/pull/585#discussion_r511760283)
// 3. Resume monitoring after client restart.
// TODO:
// 1. Handle chain reorgs (keep-ecdsa/pull/585#discussion_r511760283 and keep-ecdsa/pull/585#discussion_r513447505)
// 2. Resume monitoring after client restart.
func (t *tbtc) monitorAndAct(
ctx context.Context,
monitoringName string,
Expand All @@ -439,6 +440,17 @@ func (t *tbtc) monitorAndAct(
return
}

if !t.monitoringLock.tryLock(depositAddress, monitoringName) {
pdyraga marked this conversation as resolved.
Show resolved Hide resolved
logger.Warningf(
"lock for [%v] monitoring for deposit [%v] has been "+
"already acquired; could not start the monitoring",
pdyraga marked this conversation as resolved.
Show resolved Hide resolved
monitoringName,
depositAddress,
)
return
}
defer t.monitoringLock.release(depositAddress, monitoringName)

logger.Infof(
"starting [%v] monitoring for deposit [%v]",
monitoringName,
Expand Down