Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix: make operators able to join all specified quorums #268

Merged
merged 1 commit into from
Mar 22, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions core/mock/tx.go
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,7 @@ func (t *MockTransactor) RegisterOperator(
operatorToAvsRegistrationSigSalt [32]byte,
operatorToAvsRegistrationSigExpiry *big.Int,
) error {
args := t.Called()
args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry)
return args.Error(0)
}

Expand All @@ -56,7 +56,7 @@ func (t *MockTransactor) RegisterOperatorWithChurn(
operatorToAvsRegistrationSigSalt [32]byte,
operatorToAvsRegistrationSigExpiry *big.Int,
churnReply *churner.ChurnReply) error {
args := t.Called()
args := t.Called(ctx, keypair, socket, quorumIds, operatorEcdsaPrivateKey, operatorToAvsRegistrationSigSalt, operatorToAvsRegistrationSigExpiry, churnReply)
return args.Error(0)
}

Expand Down
100 changes: 100 additions & 0 deletions node/churner_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package node

import (
"context"
"crypto/tls"
"errors"
"time"

churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/operators/churner"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type ChurnerClient interface {
// Churn sends a churn request to the churner service
// The quorumIDs cannot be empty, but may contain quorums that the operator is already registered in.
// If the operator is already registered in a quorum, the churner will ignore it and continue with the other quorums.
Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error)
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
}

type churnerClient struct {
churnerURL string
useSecureGrpc bool
timeout time.Duration
logger logging.Logger
}

func NewChurnerClient(churnerURL string, useSecureGrpc bool, timeout time.Duration, logger logging.Logger) ChurnerClient {
return &churnerClient{
churnerURL: churnerURL,
useSecureGrpc: useSecureGrpc,
timeout: timeout,
logger: logger,
}
}

func (c *churnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) {
if len(quorumIDs) == 0 {
return nil, errors.New("quorumIDs cannot be empty")
}
// generate salt
privateKeyBytes := []byte(keyPair.PrivKey.String())
salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumIDs[:], privateKeyBytes)

churnRequest := &churner.ChurnRequest{
OperatorAddress: gethcommon.HexToAddress(operatorAddress),
OperatorToRegisterPubkeyG1: keyPair.PubKey,
OperatorToRegisterPubkeyG2: keyPair.GetPubKeyG2(),
OperatorRequestSignature: &core.Signature{},
QuorumIDs: quorumIDs,
}

copy(churnRequest.Salt[:], salt)

// sign the request
churnRequest.OperatorRequestSignature = keyPair.SignMessage(churner.CalculateRequestHash(churnRequest))

// convert to protobuf
churnRequestPb := &churnerpb.ChurnRequest{
OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(),
OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(),
OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(),
Salt: salt[:],
OperatorAddress: operatorAddress,
}

churnRequestPb.QuorumIds = make([]uint32, len(quorumIDs))
for i, quorumID := range quorumIDs {
churnRequestPb.QuorumIds[i] = uint32(quorumID)
}
credential := insecure.NewCredentials()
if c.useSecureGrpc {
config := &tls.Config{}
credential = credentials.NewTLS(config)
}

conn, err := grpc.Dial(
c.churnerURL,
grpc.WithTransportCredentials(credential),
)
if err != nil {
c.logger.Error("Node cannot connect to churner", "err", err)
return nil, err
}
defer conn.Close()

gc := churnerpb.NewChurnerClient(conn)
ctx, cancel := context.WithTimeout(ctx, c.timeout)
defer cancel()

opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)

return gc.Churn(ctx, churnRequestPb, opt)
}
3 changes: 3 additions & 0 deletions node/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,6 +90,9 @@ func NewConfig(ctx *cli.Context) (*Config, error) {
}
ids = append(ids, core.QuorumID(val))
}
if len(ids) == 0 {
return nil, errors.New("no quorum ids provided")
}

expirationPollIntervalSec := ctx.GlobalUint64(flags.ExpirationPollIntervalSecFlag.Name)
if expirationPollIntervalSec <= minExpirationPollIntervalSec {
Expand Down
2 changes: 1 addition & 1 deletion node/flags/flags.go
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ var (
}
QuorumIDListFlag = cli.StringFlag{
Name: common.PrefixFlag(FlagPrefix, "quorum-id-list"),
Usage: "Comma separated list of quorum IDs that the node will participate in",
Usage: "Comma separated list of quorum IDs that the node will participate in. There should be at least one quorum ID. This list can contain quorums node is already registered with. If the node opts in to quorums already registered with, it will be a no-op.",
Required: true,
EnvVar: common.PrefixEnvVar(EnvVarPrefix, "QUORUM_ID_LIST"),
}
Expand Down
30 changes: 30 additions & 0 deletions node/mock/churner_client.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,30 @@
package mock

import (
"context"

churnerpb "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/node"
"github.com/stretchr/testify/mock"
)

type ChurnerClient struct {
mock.Mock
}

var _ node.ChurnerClient = (*ChurnerClient)(nil)

func (c *ChurnerClient) Churn(ctx context.Context, operatorAddress string, keyPair *core.KeyPair, quorumIDs []core.QuorumID) (*churnerpb.ChurnReply, error) {
args := c.Called()
var reply *churnerpb.ChurnReply
if args.Get(0) != nil {
reply = (args.Get(0)).(*churnerpb.ChurnReply)
}

var err error
if args.Get(1) != nil {
err = (args.Get(1)).(error)
}
return reply, err
}
3 changes: 2 additions & 1 deletion node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -200,7 +200,8 @@ func (n *Node) Start(ctx context.Context) error {
OperatorId: n.Config.ID,
QuorumIDs: n.Config.QuorumIDList,
}
err = RegisterOperator(ctx, operator, n.Transactor, n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Logger)
churnerClient := NewChurnerClient(n.Config.ChurnerUrl, n.Config.UseSecureGrpc, n.Config.Timeout, n.Logger)
err = RegisterOperator(ctx, operator, n.Transactor, churnerClient, n.Logger)
if err != nil {
return fmt.Errorf("failed to register the operator: %w", err)
}
Expand Down
103 changes: 23 additions & 80 deletions node/operator.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,21 +3,14 @@ package node
import (
"context"
"crypto/ecdsa"
"crypto/tls"
"errors"
"fmt"
"math/big"
"slices"
"time"

grpcchurner "github.com/Layr-Labs/eigenda/api/grpc/churner"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/operators/churner"
"github.com/Layr-Labs/eigensdk-go/logging"
gethcommon "github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/crypto"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials"
"google.golang.org/grpc/credentials/insecure"
)

type Operator struct {
Expand All @@ -31,27 +24,21 @@ type Operator struct {
}

// RegisterOperator operator registers the operator with the given public key for the given quorum IDs.
func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerUrl string, useSecureGrpc bool, logger logging.Logger) error {
registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, operator.OperatorId)
func RegisterOperator(ctx context.Context, operator *Operator, transactor core.Transactor, churnerClient ChurnerClient, logger logging.Logger) error {
quorumsToRegister, err := operator.getQuorumIdsToRegister(ctx, transactor)
if err != nil {
return fmt.Errorf("failed to get registered quorum ids for an operator: %w", err)
return fmt.Errorf("failed to get quorum ids to register: %w", err)
}

logger.Debug("Registered quorum ids", "registeredQuorumIds", registeredQuorumIds)
if len(registeredQuorumIds) != 0 {
if len(quorumsToRegister) == 0 {
return nil
}

logger.Info("Quorums to register for", "quorums", operator.QuorumIDs)

if len(operator.QuorumIDs) == 0 {
return errors.New("an operator should be in at least one quorum to be useful")
}
logger.Info("Quorums to register for", "quorums", quorumsToRegister)

// register for quorums
shouldCallChurner := false
// check if one of the quorums to register for is full
for _, quorumID := range operator.QuorumIDs {
for _, quorumID := range quorumsToRegister {
operatorSetParams, err := transactor.GetOperatorSetParams(ctx, quorumID)
if err != nil {
return err
Expand All @@ -75,22 +62,22 @@ func RegisterOperator(ctx context.Context, operator *Operator, transactor core.T

privateKeyBytes := []byte(operator.KeyPair.PrivKey.String())
salt := [32]byte{}
copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), operator.QuorumIDs[:], privateKeyBytes))
copy(salt[:], crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), quorumsToRegister, privateKeyBytes))

// Get the current block number
expiry := big.NewInt((time.Now().Add(10 * time.Minute)).Unix())

// if we should call the churner, call it
if shouldCallChurner {
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It looks like there could be another issue here. Suppose an operator has enough stake to enter quorum 0 but not enough for quorum 1, and both quorums are full. Right now, the operator will try to register for both quorums and be rejected by the churner.

We probably need to either:

  • Locally simulate the churner check and filter out quorums for which the operator will not be able to join. This is probably the best option, since there's no point in making a churner call if we can tell the operation will fail.
  • Update the churner so that it can handle partial failures.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is this already fixed?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No. This should be handled in a separate ticket.

churnReply, err := requestChurnApproval(ctx, operator, churnerUrl, useSecureGrpc, logger)
churnReply, err := churnerClient.Churn(ctx, operator.Address, operator.KeyPair, quorumsToRegister)
if err != nil {
return fmt.Errorf("failed to request churn approval: %w", err)
}

return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry, churnReply)
return transactor.RegisterOperatorWithChurn(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry, churnReply)
} else {
// other wise just register normally
return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, operator.QuorumIDs, operator.PrivKey, salt, expiry)
return transactor.RegisterOperator(ctx, operator.KeyPair, operator.Socket, quorumsToRegister, operator.PrivKey, salt, expiry)
}
}

Expand All @@ -108,67 +95,23 @@ func UpdateOperatorSocket(ctx context.Context, transactor core.Transactor, socke
return transactor.UpdateOperatorSocket(ctx, socket)
}

func requestChurnApproval(ctx context.Context, operator *Operator, churnerUrl string, useSecureGrpc bool, logger logging.Logger) (*grpcchurner.ChurnReply, error) {
logger.Info("churner url", "url", churnerUrl)

credential := insecure.NewCredentials()
if useSecureGrpc {
config := &tls.Config{}
credential = credentials.NewTLS(config)
// getQuorumIdsToRegister returns the quorum ids that the operator is not registered in.
func (c *Operator) getQuorumIdsToRegister(ctx context.Context, transactor core.Transactor) ([]core.QuorumID, error) {
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
if len(c.QuorumIDs) == 0 {
return nil, fmt.Errorf("an operator should be in at least one quorum to be useful")
}

conn, err := grpc.Dial(
churnerUrl,
grpc.WithTransportCredentials(credential),
)
registeredQuorumIds, err := transactor.GetRegisteredQuorumIdsForOperator(ctx, c.OperatorId)
if err != nil {
logger.Error("Node cannot connect to churner", "err", err)
return nil, err
return nil, fmt.Errorf("failed to get registered quorum ids for an operator: %w", err)
}
defer conn.Close()

gc := grpcchurner.NewChurnerClient(conn)
ctx, cancel := context.WithTimeout(ctx, operator.Timeout)
defer cancel()

request := newChurnRequest(operator.Address, operator.KeyPair, operator.QuorumIDs)
opt := grpc.MaxCallSendMsgSize(1024 * 1024 * 300)

return gc.Churn(ctx, request, opt)
}

func newChurnRequest(address string, KeyPair *core.KeyPair, QuorumIDs []core.QuorumID) *grpcchurner.ChurnRequest {

// generate salt
privateKeyBytes := []byte(KeyPair.PrivKey.String())
salt := crypto.Keccak256([]byte("churn"), []byte(time.Now().String()), QuorumIDs[:], privateKeyBytes)

churnRequest := &churner.ChurnRequest{
OperatorAddress: gethcommon.HexToAddress(address),
OperatorToRegisterPubkeyG1: KeyPair.PubKey,
OperatorToRegisterPubkeyG2: KeyPair.GetPubKeyG2(),
OperatorRequestSignature: &core.Signature{},
QuorumIDs: QuorumIDs,
}

copy(churnRequest.Salt[:], salt)

// sign the request
churnRequest.OperatorRequestSignature = KeyPair.SignMessage(churner.CalculateRequestHash(churnRequest))

// convert to protobuf
churnRequestPb := &grpcchurner.ChurnRequest{
OperatorToRegisterPubkeyG1: churnRequest.OperatorToRegisterPubkeyG1.Serialize(),
OperatorToRegisterPubkeyG2: churnRequest.OperatorToRegisterPubkeyG2.Serialize(),
OperatorRequestSignature: churnRequest.OperatorRequestSignature.Serialize(),
Salt: salt[:],
OperatorAddress: address,
}

churnRequestPb.QuorumIds = make([]uint32, len(QuorumIDs))
for i, quorumID := range QuorumIDs {
churnRequestPb.QuorumIds[i] = uint32(quorumID)
quorumIdsToRegister := make([]core.QuorumID, 0, len(c.QuorumIDs))
for _, quorumID := range c.QuorumIDs {
if !slices.Contains(registeredQuorumIds, quorumID) {
quorumIdsToRegister = append(quorumIdsToRegister, quorumID)
}
}

return churnRequestPb
return quorumIdsToRegister, nil
}
Loading
Loading