Skip to content

Commit

Permalink
operator register quorums
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim committed Feb 22, 2024
1 parent 8d5eb99 commit 1d25acf
Show file tree
Hide file tree
Showing 7 changed files with 235 additions and 88 deletions.
4 changes: 2 additions & 2 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (t *MockTransactor) RegisterOperator(
operatorToAvsRegistrationSigSalt [32]byte,
operatorToAvsRegistrationSigExpiry *big.Int,
) error {
args := t.Called()
args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry)
return args.Error(0)
}

Expand All @@ -56,7 +56,7 @@ func (t *MockTransactor) RegisterOperatorWithChurn(
operatorToAvsRegistrationSigSalt [32]byte,
operatorToAvsRegistrationSigExpiry *big.Int,
churnReply *churner.ChurnReply) error {
args := t.Called()
args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry, churnReply)
return args.Error(0)
}

Expand Down
93 changes: 93 additions & 0 deletions node/churner_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package node

import (
"context"
"crypto/tls"
"time"

churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/churner"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type ChurnerClient interface {
Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error)
}

type churnerClient struct {
churnerURL string
useSecureGrpc bool
timeout time.Duration
logger common.Logger
}

func NewChurnerClient(churnerURL string, useSecureGrpc bool, timeout time.Duration, logger common.Logger) ChurnerClient {
return &churnerClient{
churnerURL: churnerURL,
useSecureGrpc: useSecureGrpc,
timeout: timeout,
logger: logger,
}
}

func (c *churnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) {
// generate salt
privateKeyBytes := []byte(keyPair.PrivKey.String())
salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumIDs[:], privateKeyBytes)

churnRequest := &churner.ChurnRequest{
OperatorAddress: gethcommon.HexToAddress(operatorAddress),
OperatorToRegisterPubkeyG1: keyPair.PubKey,
OperatorToRegisterPubkeyG2: keyPair.GetPubKeyG2(),
OperatorRequestSignature: &core.Signature{},
QuorumIDs: quorumIDs,
}

copy(churnRequest.Salt[:], salt)

// sign the request
churnRequest.OperatorRequestSignature = keyPair.SignMessage(churner.CalculateRequestHash(churnRequest))

// convert to protobuf
churnRequestPb := &churnerpb.ChurnRequest{
OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(),
OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(),
OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(),
Salt: salt[:],
OperatorAddress: operatorAddress,
}

churnRequestPb.QuorumIds = make([]uint32, len(quorumIDs))
for i, quorumID := range quorumIDs {
churnRequestPb.QuorumIds[i] = uint32(quorumID)
}
credential := insecure.NewCredentials()
if c.useSecureGrpc {
config := &tls.Config{}
credential = credentials.NewTLS(config)
}

conn, err := grpc.Dial(
c.churnerURL,
grpc.WithTransportCredentials(credential),
)
if err != nil {
c.logger.Error("Node cannot connect to churner", "err", err)
return nil, err
}
defer conn.Close()

gc := churnerpb.NewChurnerClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)

return gc.Churn(ctx, churnRequestPb, opt)
}
30 changes: 30 additions & 0 deletions node/mock/churner_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package mock

import (
"context"

churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/node"
"github.com/stretchr/testify/mock"
)

type ChurnerClient struct {
mock.Mock
}

var _ node.ChurnerClient = (*ChurnerClient)(nil)

func (c *ChurnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) {
args := c.Called()
var reply *churnerpb.ChurnReply
if args.Get(0) != nil {
reply = (args.Get(0)).(*churnerpb.ChurnReply)
}

var err error
if args.Get(1) != nil {
err = (args.Get(1)).(error)
}
return reply, err
}
3 changes: 2 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -203,7 +203,8 @@ func (n *Node) Start(ctx context.Context) error {
OperatorId: n.Config.ID,
QuorumIDs: n.Config.QuorumIDList,
}
err = RegisterOperator(ctx, operator, n.Transactor, n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Logger)
churnerClient := NewChurnerClient(n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Config.Timeout, n.Logger)
err = RegisterOperator(ctx, operator, n.Transactor, churnerClient, n.Logger)
if err != nil {
return fmt.Errorf("failed to register the operator: %w", err)
}
Expand Down
108 changes: 25 additions & 83 deletions node/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@ package node
import (
"context"
"crypto/ecdsa"
"crypto/tls"
"errors"
"fmt"
"math/big"
"slices"
"time"

grpcchurner "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/churner"
"github.com/Layr-Labs/eigenda/common"
"github.com/Layr-Labs/eigenda/core"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type Operator struct {
Expand All @@ -31,27 +24,21 @@ type Operator struct {
}

// RegisterOperator operator registers the operator with the given public key for the given quorum IDs.
func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerUrl string, useSecureGrpc bool, logger common.Logger) error {
registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, operator.OperatorId)
func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerClient ChurnerClient, logger common.Logger) error {
quorumsToRegister, err := operator.getQuorumIdsToRegister(ctx, transactor)
if err != nil {
return fmt.Errorf("failed to get registered quorum ids for an operator: %w", err)
return fmt.Errorf("failed to get quorum ids to register: %w", err)
}

logger.Debug("Registered quorum ids", "registeredQuorumIds", registeredQuorumIds)
if len(registeredQuorumIds) != 0 {
if len(quorumsToRegister) == 0 {
return nil
}

logger.Info("Quorums to register for", "quorums", operator.QuorumIDs)

if len(operator.QuorumIDs) == 0 {
return errors.New("an operator should be in at least one quorum to be useful")
}
logger.Info("Quorums to register for", "quorums", quorumsToRegister)

// register for quorums
shouldCallChurner := false
// check if one of the quorums to register for is full
for _, quorumID := range operator.QuorumIDs {
for _, quorumID := range quorumsToRegister {
operatorSetParams, err := transactor.GetOperatorSetParams(ctx, quorumID)
if err != nil {
return err
Expand All @@ -75,22 +62,22 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T

privateKeyBytes := []byte(operator.KeyPair.PrivKey.String())
salt := [32]byte{}
copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), operator.QuorumIDs[:], privateKeyBytes))
copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumsToRegister, privateKeyBytes))

// Get the current block number
expiry := big.NewInt((time.Now().Add(10 * time.Minute)).Unix())

// if we should call the churner, call it
if shouldCallChurner {
churnReply, err := requestChurnApproval(ctx, operator, churnerUrl, useSecureGrpc, logger)
churnReply, err := churnerClient.Churn(ctx, operator.Address, operator.KeyPair, quorumsToRegister)
if err != nil {
return fmt.Errorf("failed to request churn approval: %w", err)
}

return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry, churnReply)
return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry, churnReply)
} else {
// other wise just register normally
return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry)
return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry)
}
}

Expand All @@ -108,83 +95,38 @@ func UpdateOperatorQuorums(
ctx context.Context,
operator *Operator,
transactor core.Transactor,
churnerUrl string,
useSecureGrpc bool,
churnerClient ChurnerClient,
logger common.Logger,
) error {
err := DeregisterOperator(ctx, operator.KeyPair, transactor)
if err != nil {
return fmt.Errorf("failed to deregister operator: %w", err)
}
return RegisterOperator(ctx, operator, transactor, churnerUrl, useSecureGrpc, logger)
return RegisterOperator(ctx, operator, transactor, churnerClient, logger)
}

// UpdateOperatorSocket updates the socket for the given operator
func UpdateOperatorSocket(ctx context.Context, transactor core.Transactor, socket string) error {
return transactor.UpdateOperatorSocket(ctx, socket)
}

func requestChurnApproval(ctx context.Context, operator *Operator, churnerUrl string, useSecureGrpc bool, logger common.Logger) (*grpcchurner.ChurnReply, error) {
logger.Info("churner url", "url", churnerUrl)

credential := insecure.NewCredentials()
if useSecureGrpc {
config := &tls.Config{}
credential = credentials.NewTLS(config)
// getQuorumIdsToRegister returns the quorum ids that the operator is not registered in.
func (c *Operator) getQuorumIdsToRegister(ctx context.Context, transactor core.Transactor) ([]core.QuorumID, error) {
if len(c.QuorumIDs) == 0 {
return nil, fmt.Errorf("an operator should be in at least one quorum to be useful")
}

conn, err := grpc.Dial(
churnerUrl,
grpc.WithTransportCredentials(credential),
)
registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, c.OperatorId)
if err != nil {
logger.Error("Node cannot connect to churner", "err", err)
return nil, err
return nil, fmt.Errorf("failed to get registered quorum ids for an operator: %w", err)
}
defer conn.Close()

gc := grpcchurner.NewChurnerClient(conn)
ctx, cancel := context.WithTimeout(ctx, operator.Timeout)
defer cancel()

request := newChurnRequest(operator.Address, operator.KeyPair, operator.QuorumIDs)
opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)

return gc.Churn(ctx, request, opt)
}

func newChurnRequest(address string, KeyPair *core.KeyPair, QuorumIDs []core.QuorumID) *grpcchurner.ChurnRequest {

// generate salt
privateKeyBytes := []byte(KeyPair.PrivKey.String())
salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), QuorumIDs[:], privateKeyBytes)

churnRequest := &churner.ChurnRequest{
OperatorAddress: gethcommon.HexToAddress(address),
OperatorToRegisterPubkeyG1: KeyPair.PubKey,
OperatorToRegisterPubkeyG2: KeyPair.GetPubKeyG2(),
OperatorRequestSignature: &core.Signature{},
QuorumIDs: QuorumIDs,
}

copy(churnRequest.Salt[:], salt)

// sign the request
churnRequest.OperatorRequestSignature = KeyPair.SignMessage(churner.CalculateRequestHash(churnRequest))

// convert to protobuf
churnRequestPb := &grpcchurner.ChurnRequest{
OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(),
OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(),
OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(),
Salt: salt[:],
OperatorAddress: address,
}

churnRequestPb.QuorumIds = make([]uint32, len(QuorumIDs))
for i, quorumID := range QuorumIDs {
churnRequestPb.QuorumIds[i] = uint32(quorumID)
quorumIdsToRegister := make([]core.QuorumID, 0, len(c.QuorumIDs))
for _, quorumID := range c.QuorumIDs {
if !slices.Contains(registeredQuorumIds, quorumID) {
quorumIdsToRegister = append(quorumIdsToRegister, quorumID)
}
}

return churnRequestPb
return quorumIdsToRegister, nil
}
Loading

0 comments on commit 1d25acf

Please sign in to comment.