Skip to content

Commit

Permalink
operator register quorums
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Mar 20, 2024
1 parent 43b5464 commit 821ffe9
Show file tree
Hide file tree
Showing 7 changed files with 230 additions and 84 deletions.
4 changes: 2 additions & 2 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (t *MockTransactor) RegisterOperator(
operatorToAvsRegistrationSigSalt [32]byte,
operatorToAvsRegistrationSigExpiry *big.Int,
) error {
args := t.Called()
args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry)
return args.Error(0)
}

Expand All @@ -56,7 +56,7 @@ func (t *MockTransactor) RegisterOperatorWithChurn(
operatorToAvsRegistrationSigSalt [32]byte,
operatorToAvsRegistrationSigExpiry *big.Int,
churnReply *churner.ChurnReply) error {
args := t.Called()
args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry, churnReply)
return args.Error(0)
}

Expand Down
93 changes: 93 additions & 0 deletions node/churner_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package node

import (
"context"
"crypto/tls"
"time"

churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/operators/churner"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type ChurnerClient interface {
Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error)
}

type churnerClient struct {
churnerURL string
useSecureGrpc bool
timeout time.Duration
logger logging.Logger
}

func NewChurnerClient(churnerURL string, useSecureGrpc bool, timeout time.Duration, logger logging.Logger) ChurnerClient {
return &churnerClient{
churnerURL: churnerURL,
useSecureGrpc: useSecureGrpc,
timeout: timeout,
logger: logger,
}
}

func (c *churnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) {
// generate salt
privateKeyBytes := []byte(keyPair.PrivKey.String())
salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumIDs[:], privateKeyBytes)

churnRequest := &churner.ChurnRequest{
OperatorAddress: gethcommon.HexToAddress(operatorAddress),
OperatorToRegisterPubkeyG1: keyPair.PubKey,
OperatorToRegisterPubkeyG2: keyPair.GetPubKeyG2(),
OperatorRequestSignature: &core.Signature{},
QuorumIDs: quorumIDs,
}

copy(churnRequest.Salt[:], salt)

// sign the request
churnRequest.OperatorRequestSignature = keyPair.SignMessage(churner.CalculateRequestHash(churnRequest))

// convert to protobuf
churnRequestPb := &churnerpb.ChurnRequest{
OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(),
OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(),
OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(),
Salt: salt[:],
OperatorAddress: operatorAddress,
}

churnRequestPb.QuorumIds = make([]uint32, len(quorumIDs))
for i, quorumID := range quorumIDs {
churnRequestPb.QuorumIds[i] = uint32(quorumID)
}
credential := insecure.NewCredentials()
if c.useSecureGrpc {
config := &tls.Config{}
credential = credentials.NewTLS(config)
}

conn, err := grpc.Dial(
c.churnerURL,
grpc.WithTransportCredentials(credential),
)
if err != nil {
c.logger.Error("Node cannot connect to churner", "err", err)
return nil, err
}
defer conn.Close()

gc := churnerpb.NewChurnerClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)

return gc.Churn(ctx, churnRequestPb, opt)
}
30 changes: 30 additions & 0 deletions node/mock/churner_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package mock

import (
"context"

churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/node"
"github.com/stretchr/testify/mock"
)

type ChurnerClient struct {
mock.Mock
}

var _ node.ChurnerClient = (*ChurnerClient)(nil)

func (c *ChurnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) {
args := c.Called()
var reply *churnerpb.ChurnReply
if args.Get(0) != nil {
reply = (args.Get(0)).(*churnerpb.ChurnReply)
}

var err error
if args.Get(1) != nil {
err = (args.Get(1)).(error)
}
return reply, err
}
3 changes: 2 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ func (n *Node) Start(ctx context.Context) error {
OperatorId: n.Config.ID,
QuorumIDs: n.Config.QuorumIDList,
}
err = RegisterOperator(ctx, operator, n.Transactor, n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Logger)
churnerClient := NewChurnerClient(n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Config.Timeout, n.Logger)
err = RegisterOperator(ctx, operator, n.Transactor, churnerClient, n.Logger)
if err != nil {
return fmt.Errorf("failed to register the operator: %w", err)
}
Expand Down
103 changes: 23 additions & 80 deletions node/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@ package node
import (
"context"
"crypto/ecdsa"
"crypto/tls"
"errors"
"fmt"
"math/big"
"slices"
"time"

grpcchurner "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/operators/churner"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type Operator struct {
Expand All @@ -31,27 +24,21 @@ type Operator struct {
}

// RegisterOperator operator registers the operator with the given public key for the given quorum IDs.
func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerUrl string, useSecureGrpc bool, logger logging.Logger) error {
registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, operator.OperatorId)
func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerClient ChurnerClient, logger logging.Logger) error {
quorumsToRegister, err := operator.getQuorumIdsToRegister(ctx, transactor)
if err != nil {
return fmt.Errorf("failed to get registered quorum ids for an operator: %w", err)
return fmt.Errorf("failed to get quorum ids to register: %w", err)
}

logger.Debug("Registered quorum ids", "registeredQuorumIds", registeredQuorumIds)
if len(registeredQuorumIds) != 0 {
if len(quorumsToRegister) == 0 {
return nil
}

logger.Info("Quorums to register for", "quorums", operator.QuorumIDs)

if len(operator.QuorumIDs) == 0 {
return errors.New("an operator should be in at least one quorum to be useful")
}
logger.Info("Quorums to register for", "quorums", quorumsToRegister)

// register for quorums
shouldCallChurner := false
// check if one of the quorums to register for is full
for _, quorumID := range operator.QuorumIDs {
for _, quorumID := range quorumsToRegister {
operatorSetParams, err := transactor.GetOperatorSetParams(ctx, quorumID)
if err != nil {
return err
Expand All @@ -75,22 +62,22 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T

privateKeyBytes := []byte(operator.KeyPair.PrivKey.String())
salt := [32]byte{}
copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), operator.QuorumIDs[:], privateKeyBytes))
copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumsToRegister, privateKeyBytes))

// Get the current block number
expiry := big.NewInt((time.Now().Add(10 * time.Minute)).Unix())

// if we should call the churner, call it
if shouldCallChurner {
churnReply, err := requestChurnApproval(ctx, operator, churnerUrl, useSecureGrpc, logger)
churnReply, err := churnerClient.Churn(ctx, operator.Address, operator.KeyPair, quorumsToRegister)
if err != nil {
return fmt.Errorf("failed to request churn approval: %w", err)
}

return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry, churnReply)
return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry, churnReply)
} else {
// other wise just register normally
return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry)
return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry)
}
}

Expand All @@ -108,67 +95,23 @@ func UpdateOperatorSocket(ctx context.Context, transactor core.Transactor, socke
return transactor.UpdateOperatorSocket(ctx, socket)
}

func requestChurnApproval(ctx context.Context, operator *Operator, churnerUrl string, useSecureGrpc bool, logger logging.Logger) (*grpcchurner.ChurnReply, error) {
logger.Info("churner url", "url", churnerUrl)

credential := insecure.NewCredentials()
if useSecureGrpc {
config := &tls.Config{}
credential = credentials.NewTLS(config)
// getQuorumIdsToRegister returns the quorum ids that the operator is not registered in.
func (c *Operator) getQuorumIdsToRegister(ctx context.Context, transactor core.Transactor) ([]core.QuorumID, error) {
if len(c.QuorumIDs) == 0 {
return nil, fmt.Errorf("an operator should be in at least one quorum to be useful")
}

conn, err := grpc.Dial(
churnerUrl,
grpc.WithTransportCredentials(credential),
)
registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, c.OperatorId)
if err != nil {
logger.Error("Node cannot connect to churner", "err", err)
return nil, err
return nil, fmt.Errorf("failed to get registered quorum ids for an operator: %w", err)
}
defer conn.Close()

gc := grpcchurner.NewChurnerClient(conn)
ctx, cancel := context.WithTimeout(ctx, operator.Timeout)
defer cancel()

request := newChurnRequest(operator.Address, operator.KeyPair, operator.QuorumIDs)
opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)

return gc.Churn(ctx, request, opt)
}

func newChurnRequest(address string, KeyPair *core.KeyPair, QuorumIDs []core.QuorumID) *grpcchurner.ChurnRequest {

// generate salt
privateKeyBytes := []byte(KeyPair.PrivKey.String())
salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), QuorumIDs[:], privateKeyBytes)

churnRequest := &churner.ChurnRequest{
OperatorAddress: gethcommon.HexToAddress(address),
OperatorToRegisterPubkeyG1: KeyPair.PubKey,
OperatorToRegisterPubkeyG2: KeyPair.GetPubKeyG2(),
OperatorRequestSignature: &core.Signature{},
QuorumIDs: QuorumIDs,
}

copy(churnRequest.Salt[:], salt)

// sign the request
churnRequest.OperatorRequestSignature = KeyPair.SignMessage(churner.CalculateRequestHash(churnRequest))

// convert to protobuf
churnRequestPb := &grpcchurner.ChurnRequest{
OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(),
OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(),
OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(),
Salt: salt[:],
OperatorAddress: address,
}

churnRequestPb.QuorumIds = make([]uint32, len(QuorumIDs))
for i, quorumID := range QuorumIDs {
churnRequestPb.QuorumIds[i] = uint32(quorumID)
quorumIdsToRegister := make([]core.QuorumID, 0, len(c.QuorumIDs))
for _, quorumID := range c.QuorumIDs {
if !slices.Contains(registeredQuorumIds, quorumID) {
quorumIdsToRegister = append(quorumIdsToRegister, quorumID)
}
}

return churnRequestPb
return quorumIdsToRegister, nil
}
78 changes: 78 additions & 0 deletions node/operator_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
package node_test

import (
"context"
"testing"
"time"

"github.com/Layr-Labs/eigenda/core"
coremock "github.com/Layr-Labs/eigenda/core/mock"
"github.com/Layr-Labs/eigenda/node"
nodemock "github.com/Layr-Labs/eigenda/node/mock"
"github.com/Layr-Labs/eigensdk-go/logging"
"github.com/ethereum/go-ethereum/common/hexutil"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/mock"
)

func TestRegisterOperator(t *testing.T) {
logger := logging.NewNoopLogger()
operatorID := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad"))
keyPair, err := core.GenRandomBlsKeys()
assert.NoError(t, err)
// Create a new operator
operator := &node.Operator{
Address: "0xB7Ad27737D88B07De48CDc2f379917109E993Be4",
Socket: "localhost:50051",
Timeout: 10 * time.Second,
PrivKey: nil,
KeyPair: keyPair,
OperatorId: operatorID,
QuorumIDs: []core.QuorumID{0, 1},
}
tx := &coremock.MockTransactor{}
tx.On("GetRegisteredQuorumIdsForOperator").Return([]uint8{0}, nil)
tx.On("GetOperatorSetParams", mock.Anything, mock.Anything).Return(&core.OperatorSetParam{
MaxOperatorCount: 1,
ChurnBIPsOfOperatorStake: 20,
ChurnBIPsOfTotalStake: 20000,
}, nil)
tx.On("GetNumberOfRegisteredOperatorForQuorum").Return(uint32(0), nil)
tx.On("RegisterOperator", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
churnerClient := &nodemock.ChurnerClient{}
churnerClient.On("Churn").Return(nil, nil)
err = node.RegisterOperator(context.Background(), operator, tx, churnerClient, logger)
assert.NoError(t, err)
tx.AssertCalled(t, "RegisterOperator", mock.Anything, mock.Anything, mock.Anything, []core.QuorumID{1}, mock.Anything, mock.Anything, mock.Anything)
}

func TestRegisterOperatorWithChurn(t *testing.T) {
logger := logging.NewNoopLogger()
operatorID := [32]byte(hexutil.MustDecode("0x3fbfefcdc76462d2cdb7d0cea75f27223829481b8b4aa6881c94cb2126a316ad"))
keyPair, err := core.GenRandomBlsKeys()
assert.NoError(t, err)
// Create a new operator
operator := &node.Operator{
Address: "0xB7Ad27737D88B07De48CDc2f379917109E993Be4",
Socket: "localhost:50051",
Timeout: 10 * time.Second,
PrivKey: nil,
KeyPair: keyPair,
OperatorId: operatorID,
QuorumIDs: []core.QuorumID{0, 1},
}
tx := &coremock.MockTransactor{}
tx.On("GetRegisteredQuorumIdsForOperator").Return([]uint8{0}, nil)
tx.On("GetOperatorSetParams", mock.Anything, mock.Anything).Return(&core.OperatorSetParam{
MaxOperatorCount: 1,
ChurnBIPsOfOperatorStake: 20,
ChurnBIPsOfTotalStake: 20000,
}, nil)
tx.On("GetNumberOfRegisteredOperatorForQuorum").Return(uint32(1), nil)
tx.On("RegisterOperatorWithChurn", mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything, mock.Anything).Return(nil)
churnerClient := &nodemock.ChurnerClient{}
churnerClient.On("Churn").Return(nil, nil)
err = node.RegisterOperator(context.Background(), operator, tx, churnerClient, logger)
assert.NoError(t, err)
tx.AssertCalled(t, "RegisterOperatorWithChurn", mock.Anything, mock.Anything, mock.Anything, []core.QuorumID{1}, mock.Anything, mock.Anything, mock.Anything, mock.Anything)
}
Loading

0 comments on commit 821ffe9

Please sign in to comment.