Skip to content

Commit

Permalink
Dual transmission key protection (#15969)
Browse files Browse the repository at this point in the history
* -Add tags to eth keystore state
-Check primary/secondary addresses

* -Update to mutex

* -Add test for resource mutex

* -Fix tests

* -Update keystore.eth mock

* -Fix tests

* -Remove unused files

* -Fix import cycle

* -Fix lint

* -Fix lint

* -Update to RW lock

* Implement feedback

* Implement feedback

* Implement feedback
  • Loading branch information
george-dorin authored Jan 21, 2025
1 parent f7eef5c commit eef9932
Show file tree
Hide file tree
Showing 17 changed files with 535 additions and 16 deletions.
2 changes: 1 addition & 1 deletion core/internal/features/ocr2/features_ocr2_helper.go
Original file line number Diff line number Diff line change
Expand Up @@ -610,7 +610,7 @@ updateInterval = "1m"
contractABI, err2 := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorABI))
require.NoError(t, err2)
apps[0].GetRelayers().LegacyEVMChains().Slice()
ct, err2 := evm.NewOCRContractTransmitter(testutils.Context(t), ocrContractAddress, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].Client(), contractABI, nil, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].LogPoller(), lggr)
ct, err2 := evm.NewOCRContractTransmitter(testutils.Context(t), ocrContractAddress, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].Client(), contractABI, nil, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].LogPoller(), lggr, apps[0].KeyStore.Eth())
require.NoError(t, err2)
configDigest, epoch, err2 := ct.LatestConfigDigestAndEpoch(testutils.Context(t))
require.NoError(t, err2)
Expand Down
2 changes: 1 addition & 1 deletion core/internal/features/ocr2/features_ocr2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -264,7 +264,7 @@ updateInterval = "1m"
// Assert we can read the latest config digest and epoch after a report has been submitted.
contractABI, err := abi.JSON(strings.NewReader(ocr2aggregator.OCR2AggregatorABI))
require.NoError(t, err)
ct, err := evm.NewOCRContractTransmitter(testutils.Context(t), ocrContractAddress, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].Client(), contractABI, nil, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].LogPoller(), lggr)
ct, err := evm.NewOCRContractTransmitter(testutils.Context(t), ocrContractAddress, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].Client(), contractABI, nil, apps[0].GetRelayers().LegacyEVMChains().Slice()[0].LogPoller(), lggr, apps[0].KeyStore.Eth())
require.NoError(t, err)
configDigest, epoch, err := ct.LatestConfigDigestAndEpoch(testutils.Context(t))
require.NoError(t, err)
Expand Down
106 changes: 106 additions & 0 deletions core/services/job/job_orm_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@ import (
"github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/utils/jsonserializable"
pkgworkflows "github.com/smartcontractkit/chainlink-common/pkg/workflows"
"github.com/smartcontractkit/chainlink/v2/core/services/keystore"

"github.com/smartcontractkit/chainlink/v2/core/bridges"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
Expand Down Expand Up @@ -2233,3 +2234,108 @@ func TestORM_CreateJob_OCR2_With_DualTransmission(t *testing.T) {
keyStore.Eth().XXXTestingOnlyAdd(ctx, dtTransmitterAddress)
require.NoError(t, jobORM.CreateJob(ctx, &jb))
}

func TestORM_CreateJob_KeyLocking(t *testing.T) {
ctx := testutils.Context(t)
customChainID := big.New(testutils.NewRandomEVMChainID())

config := configtest.NewGeneralConfig(t, func(c *chainlink.Config, s *chainlink.Secrets) {
enabled := true
c.EVM = append(c.EVM, &evmcfg.EVMConfig{
ChainID: customChainID,
Chain: evmcfg.Defaults(customChainID),
Enabled: &enabled,
Nodes: evmcfg.EVMNodes{{}},
})
})
db := pgtest.NewSqlxDB(t)
ks := cltest.NewKeyStore(t, db)
require.NoError(t, ks.OCR2().Add(ctx, cltest.DefaultOCR2Key))
_, transmitterID := cltest.MustInsertRandomKey(t, ks.Eth())
dtTransmitterAddress := cltest.MustGenerateRandomKey(t)
ks.Eth().XXXTestingOnlyAdd(ctx, dtTransmitterAddress)

baseJobSpec := fmt.Sprintf(testspecs.OCR2EVMDualTransmissionSpecMinimalTemplate, transmitterID.String())

lggr := logger.TestLogger(t)
pipelineORM := pipeline.NewORM(db, lggr, config.JobPipeline().MaxSuccessfulRuns())
bridgesORM := bridges.NewORM(db)

jobORM := NewTestORM(t, db, pipelineORM, bridgesORM, ks)

t.Run("keys not locked", func(t *testing.T) {
completeDualTransmissionSpec := fmt.Sprintf(`
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress = '0x613a38AC1659769640aaE063C651F48E0250454C'
transmitterAddress = '%s'
[relayConfig.dualTransmission.meta]
key1 = ['val1']
key2 = ['val2','val3']
`,
dtTransmitterAddress.Address.String())

jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+completeDualTransmissionSpec, nil)
require.NoError(t, err)

jb.OCR2OracleSpec.TransmitterID = null.StringFrom(transmitterID.String())
jb.Name = null.StringFrom(uuid.NewString())

require.NoError(t, jobORM.CreateJob(ctx, &jb))
})

t.Run("keys locked", func(t *testing.T) {
completeDualTransmissionSpec := fmt.Sprintf(`
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress = '0x613a38AC1659769640aaE063C651F48E0250454C'
transmitterAddress = '%s'
[relayConfig.dualTransmission.meta]
key1 = ['val1']
key2 = ['val2','val3']
`,
dtTransmitterAddress.Address.String())

rm, err := ks.Eth().GetResourceMutex(ctx, transmitterID)
require.NoError(t, err)
require.NoError(t, rm.TryLock(keystore.TXMv1))
rm, err = ks.Eth().GetResourceMutex(ctx, dtTransmitterAddress.Address)
require.NoError(t, err)
require.NoError(t, rm.TryLock(keystore.TXMv2))
jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+completeDualTransmissionSpec, nil)
require.NoError(t, err)

jb.OCR2OracleSpec.TransmitterID = null.StringFrom(transmitterID.String())
jb.Name = null.StringFrom(uuid.NewString())

require.NoError(t, jobORM.CreateJob(ctx, &jb))
})

t.Run("keys locked but job spec misconfigured", func(t *testing.T) {
rm, err := ks.Eth().GetResourceMutex(ctx, transmitterID)
require.NoError(t, err)
require.NoError(t, rm.TryLock(keystore.TXMv1))
rm, err = ks.Eth().GetResourceMutex(ctx, dtTransmitterAddress.Address)
require.NoError(t, err)
require.NoError(t, rm.TryLock(keystore.TXMv2))

completeDualTransmissionSpec := fmt.Sprintf(`
enableDualTransmission=true
[relayConfig.dualTransmission]
contractAddress = '0x613a38AC1659769640aaE063C651F48E0250454C'
transmitterAddress = '%s'
[relayConfig.dualTransmission.meta]
key1 = ['val1']
key2 = ['val2','val3']
`,
transmitterID.String())

jb, err := ocr2validate.ValidatedOracleSpecToml(testutils.Context(t), config.OCR2(), config.Insecure(), baseJobSpec+completeDualTransmissionSpec, nil)
require.NoError(t, err)

jb.OCR2OracleSpec.TransmitterID = null.StringFrom(dtTransmitterAddress.Address.String())
jb.Name = null.StringFrom(uuid.NewString())

require.ErrorContains(t, jobORM.CreateJob(ctx, &jb), "cannot be a secondary transmitter address because it's used a primary transmitter in another job")
})
}
30 changes: 30 additions & 0 deletions core/services/job/orm.go
Original file line number Diff line number Diff line change
Expand Up @@ -308,6 +308,10 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
}

if enableDualTransmission, ok := jb.OCR2OracleSpec.RelayConfig["enableDualTransmission"]; ok && enableDualTransmission != nil {
if jb.OCR2OracleSpec.Relay != relay.NetworkEVM {
return errors.New("dual transmission is enabled only for EVM")
}

rawDualTransmissionConfig, ok := jb.OCR2OracleSpec.RelayConfig["dualTransmission"]
if !ok {
return errors.New("dual transmission is enabled but no dual transmission config present")
Expand Down Expand Up @@ -341,6 +345,23 @@ func (o *orm) CreateJob(ctx context.Context, jb *Job) error {
return errors.Wrap(err, "unknown dual transmission transmitterAddress")
}

// Check if secondary transmitter address is used as primary somewhere else
hasLock, err2 := checkIfKeyHasLock(ctx, tx.keyStore.Eth(), common.HexToAddress(dtTransmitterAddress), keystore.TXMv1)
if err2 != nil {
return err2
} else if hasLock {
return errors.Errorf("key %s cannot be a secondary transmitter address because it's used a primary transmitter in another job", dtTransmitterAddress)
}
}

// Check if primary transmitter address is used as secondary somewhere else, don't check for mercury as it uses CSA keys for transmitters
if jb.OCR2OracleSpec.PluginType != types.Mercury {
hasLock, err2 := checkIfKeyHasLock(ctx, tx.keyStore.Eth(), common.HexToAddress(jb.OCR2OracleSpec.TransmitterID.String), keystore.TXMv2)
if err2 != nil {
return err2
} else if hasLock {
return errors.Errorf("key %s cannot be a (primary) transmitter address because it's used a secondary transmitter address in another job", jb.OCR2OracleSpec.TransmitterID.String)
}
}

specID, err := tx.insertOCR2OracleSpec(ctx, jb.OCR2OracleSpec)
Expand Down Expand Up @@ -1745,3 +1766,12 @@ func validateDualTransmissionMeta(meta map[string]interface{}) error {

return nil
}

func checkIfKeyHasLock(ctx context.Context, ks keystore.Eth, address common.Address, usage keystore.ServiceType) (bool, error) {
rm, err := ks.GetResourceMutex(ctx, address)
if err != nil {
return false, err
}

return rm.IsLocked(usage)
}
21 changes: 21 additions & 0 deletions core/services/keystore/eth.go
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ type Eth interface {
GetStateForKey(ctx context.Context, key ethkey.KeyV2) (ethkey.State, error)
GetStatesForChain(ctx context.Context, chainID *big.Int) ([]ethkey.State, error)
EnabledAddressesForChain(ctx context.Context, chainID *big.Int) (addresses []common.Address, err error)
GetResourceMutex(ctx context.Context, address common.Address) (*ResourceMutex, error)

XXXTestingOnlySetState(ctx context.Context, keyState ethkey.State)
XXXTestingOnlyAdd(ctx context.Context, key ethkey.KeyV2)
Expand All @@ -59,6 +60,26 @@ type eth struct {
ds sqlutil.DataSource
subscribers [](chan struct{})
subscribersMu *sync.RWMutex
resourceMutex map[common.Address]*ResourceMutex // ResourceMutex is an internal field and ought not be persisted to the database. Its main usage is to verify that the same key is not used for both TXMv1 and TXMv2 (usage in both TXMs will cause nonce drift and will lead to missing transactions). This functionality should be removed after we completely switch to TXMv2
}

// GetResourceMutex gets the resource mutex associates with the address if no resource mutex is found a new one is created
func (ks *eth) GetResourceMutex(ctx context.Context, address common.Address) (*ResourceMutex, error) {
ks.lock.Lock()
defer ks.lock.Unlock()
if ks.isLocked() {
return nil, ErrLocked
}

if ks.resourceMutex == nil {
ks.resourceMutex = make(map[common.Address]*ResourceMutex)
}

_, exists := ks.resourceMutex[address]
if !exists {
ks.resourceMutex[address] = NewResourceMutex()
}
return ks.resourceMutex[address], nil
}

var _ Eth = &eth{}
Expand Down
61 changes: 61 additions & 0 deletions core/services/keystore/mocks/eth.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

68 changes: 68 additions & 0 deletions core/services/keystore/models.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"encoding/json"
"fmt"
"math/big"
"sync"
"time"

gethkeystore "github.com/ethereum/go-ethereum/accounts/keystore"
Expand Down Expand Up @@ -423,3 +424,70 @@ func (rawKeys rawKeyRing) keys() (*keyRing, error) {
func adulteratedPassword(password string) string {
return "master-password-" + password
}

type ResourceMutex struct {
mu sync.Mutex
serviceType ServiceType
count int // Tracks active users per service type
}
type ServiceType int

const (
TXMv1 ServiceType = iota
TXMv2
)

// TryLock attempts to lock the resource for the specified service type.
// It returns an error if the resource is locked by a different service type.
func (rm *ResourceMutex) TryLock(serviceType ServiceType) error {
rm.mu.Lock()
defer rm.mu.Unlock()

if rm.count == 0 {
rm.serviceType = serviceType
}

// Check if other service types are using the resource
if rm.serviceType != serviceType && rm.count > 0 {
return errors.New("resource is locked by another service type")
}

// Increment active count for the current service type
rm.count++
return nil
}

// Unlock releases the lock for the service type
func (rm *ResourceMutex) Unlock(serviceType ServiceType) error {
rm.mu.Lock()
defer rm.mu.Unlock()

// Check if the service type has an active lock
if rm.count == 0 {
return errors.New("no active lock")
}

if rm.serviceType != serviceType {
return errors.New("no active lock for this service type")
}

// Decrement active count for the service type
rm.count--
return nil
}

// IsLocked checks if the resource is locked by a specific service type.
func (rm *ResourceMutex) IsLocked(serviceType ServiceType) (bool, error) {
rm.mu.Lock()
defer rm.mu.Unlock()

if rm.count == 0 || rm.serviceType != serviceType {
return false, nil
}

return true, nil
}

func NewResourceMutex() *ResourceMutex {
return &ResourceMutex{}
}
Loading

0 comments on commit eef9932

Please sign in to comment.