Skip to content

Commit

Permalink
[v2][node] Construct relay client from relay address from chain (#931)
Browse files Browse the repository at this point in the history
  • Loading branch information
ian-shim authored Dec 2, 2024
1 parent 4ec69dc commit f3a9c52
Show file tree
Hide file tree
Showing 15 changed files with 262 additions and 30 deletions.
9 changes: 9 additions & 0 deletions api/clients/mock/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,15 @@ func (c *MockRelayClient) GetChunksByIndex(ctx context.Context, relayKey corev2.
return args.Get(0).([][]byte), args.Error(1)
}

func (c *MockRelayClient) GetSockets() map[corev2.RelayKey]string {
args := c.Called()
if args.Get(0) == nil {
return nil
}

return args.Get(0).(map[corev2.RelayKey]string)
}

func (c *MockRelayClient) Close() error {
args := c.Called()
return args.Error(0)
Expand Down
10 changes: 9 additions & 1 deletion api/clients/relay_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ type RelayClient interface {
// The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request.
// Each bundle is a sequence of frames in raw form (i.e., serialized core.Bundle bytearray).
GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([][]byte, error)
// GetSockets returns the relay sockets
GetSockets() map[corev2.RelayKey]string
Close() error
}

Expand All @@ -65,6 +67,8 @@ func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayCli
return nil, fmt.Errorf("invalid config: %v", config)
}

logger.Info("creating relay client", "config", config)

initOnce := sync.Map{}
for key := range config.Sockets {
initOnce.Store(key, &sync.Once{})
Expand All @@ -73,7 +77,7 @@ func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayCli
config: config,

initOnce: &initOnce,
logger: logger,
logger: logger.With("component", "RelayClient"),
}, nil
}

Expand Down Expand Up @@ -196,6 +200,10 @@ func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error {
return initErr
}

func (c *relayClient) GetSockets() map[corev2.RelayKey]string {
return c.config.Sockets
}

func (c *relayClient) Close() error {
var errList *multierror.Error
c.conns.Range(func(k, v interface{}) bool {
Expand Down
12 changes: 10 additions & 2 deletions common/read_only_map.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,14 @@
package common

type ReadOnlyMap[K comparable, V any] struct {
import (
"maps"
)

type ReadOnlyMap[K comparable, V comparable] struct {
data map[K]V
}

func NewReadOnlyMap[K comparable, V any](data map[K]V) *ReadOnlyMap[K, V] {
func NewReadOnlyMap[K comparable, V comparable](data map[K]V) *ReadOnlyMap[K, V] {
return &ReadOnlyMap[K, V]{data: data}
}

Expand All @@ -24,3 +28,7 @@ func (m *ReadOnlyMap[K, V]) Keys() []K {
func (m *ReadOnlyMap[K, V]) Len() int {
return len(m.data)
}

func (m *ReadOnlyMap[K, V]) Equal(data map[K]V) bool {
return maps.Equal(m.data, data)
}
6 changes: 6 additions & 0 deletions core/chainio.go
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,12 @@ type Reader interface {

// GetOnDemandPaymentByAccount returns on-demand payment of an account
GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint32, accountID string) (OnDemandPayment, error)

// GetRelayURL returns the relay URL address for the given key.
GetRelayURL(ctx context.Context, key uint16) (string, error)

// GetRelayURLs returns the relay URL addresses for all relays.
GetRelayURLs(ctx context.Context) (map[uint16]string, error)
}

type Writer interface {
Expand Down
54 changes: 54 additions & 0 deletions core/eth/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ import (
delegationmgr "github.com/Layr-Labs/eigenda/contracts/bindings/DelegationManager"
eigendasrvmg "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager"
ejectionmg "github.com/Layr-Labs/eigenda/contracts/bindings/EjectionManager"
relayreg "github.com/Layr-Labs/eigenda/contracts/bindings/IEigenDARelayRegistry"
indexreg "github.com/Layr-Labs/eigenda/contracts/bindings/IIndexRegistry"
opstateretriever "github.com/Layr-Labs/eigenda/contracts/bindings/OperatorStateRetriever"
regcoordinator "github.com/Layr-Labs/eigenda/contracts/bindings/RegistryCoordinator"
Expand Down Expand Up @@ -39,6 +40,7 @@ type ContractBindings struct {
EjectionManager *ejectionmg.ContractEjectionManager
AVSDirectory *avsdir.ContractAVSDirectory
SocketRegistry *socketreg.ContractSocketRegistry
RelayRegistry *relayreg.ContractIEigenDARelayRegistry
}

type Reader struct {
Expand Down Expand Up @@ -178,6 +180,18 @@ func (t *Reader) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDASe
return err
}

var contractRelayRegistry *relayreg.ContractIEigenDARelayRegistry
relayRegistryAddr, err := contractEigenDAServiceManager.EigenDARelayRegistry(&bind.CallOpts{})
if err != nil {
t.logger.Error("Failed to fetch IEigenDARelayRegistry contract", "err", err)
// TODO(ian-shim): return err when the contract is deployed
} else {
contractRelayRegistry, err = relayreg.NewContractIEigenDARelayRegistry(relayRegistryAddr, t.ethClient)
if err != nil {
t.logger.Error("Failed to fetch IEigenDARelayRegistry contract", "err", err)
}
}

t.bindings = &ContractBindings{
ServiceManagerAddr: eigenDAServiceManagerAddr,
RegCoordinatorAddr: registryCoordinatorAddr,
Expand All @@ -191,6 +205,7 @@ func (t *Reader) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDASe
StakeRegistry: contractStakeRegistry,
EigenDAServiceManager: contractEigenDAServiceManager,
DelegationManager: contractDelegationManager,
RelayRegistry: contractRelayRegistry,
}
return nil
}
Expand Down Expand Up @@ -689,3 +704,42 @@ func (t *Reader) GetOperatorSocket(ctx context.Context, operatorId core.Operator
}
return socket, nil
}

func (t *Reader) GetRelayURL(ctx context.Context, key uint16) (string, error) {
if t.bindings.RelayRegistry == nil {
return "", errors.New("relay registry not deployed")
}

return t.bindings.RelayRegistry.GetRelayURL(&bind.CallOpts{
Context: ctx,
}, uint32(key))
}

func (t *Reader) GetRelayURLs(ctx context.Context) (map[uint16]string, error) {
if t.bindings.RelayRegistry == nil {
return nil, errors.New("relay registry not deployed")
}

res := make(map[uint16]string)
relayKey := uint16(0)
for {
url, err := t.bindings.RelayRegistry.GetRelayURL(&bind.CallOpts{
Context: ctx,
}, uint32(relayKey))

if err != nil && strings.Contains(err.Error(), "execution reverted") {
break
} else if err != nil {
return nil, err
}

res[relayKey] = url
relayKey++
}

if len(res) == 0 {
return nil, errors.New("no relay URLs found")
}

return res, nil
}
19 changes: 19 additions & 0 deletions core/mock/writer.go
Original file line number Diff line number Diff line change
Expand Up @@ -209,6 +209,9 @@ func (t *MockWriter) GetVersionedBlobParams(ctx context.Context, blobVersion uin
func (t *MockWriter) GetAllVersionedBlobParams(ctx context.Context) (map[uint8]*core.BlobVersionParameters, error) {
args := t.Called()
result := args.Get(0)
if result == nil {
return nil, args.Error(1)
}
return result.(map[uint8]*core.BlobVersionParameters), args.Error(1)
}

Expand Down Expand Up @@ -247,3 +250,19 @@ func (t *MockWriter) GetOperatorSocket(ctx context.Context, operatorID core.Oper
result := args.Get(0)
return result.(string), args.Error(1)
}

func (t *MockWriter) GetRelayURL(ctx context.Context, key uint16) (string, error) {
args := t.Called()
result := args.Get(0)
return result.(string), args.Error(1)
}

func (t *MockWriter) GetRelayURLs(ctx context.Context) (map[uint16]string, error) {
args := t.Called()
result := args.Get(0)
if result == nil {
return nil, args.Error(1)
}

return result.(map[uint16]string), args.Error(1)
}
2 changes: 1 addition & 1 deletion core/v2/types.go
Original file line number Diff line number Diff line change
Expand Up @@ -161,7 +161,7 @@ func (b *BlobHeader) GetEncodingParams(blobParams *core.BlobVersionParameters) (
}, nil
}

type RelayKey uint16
type RelayKey = uint16

type BlobCertificate struct {
BlobHeader *BlobHeader
Expand Down
1 change: 1 addition & 0 deletions disperser/controller/dispatcher.go
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@ func (d *Dispatcher) Start(ctx context.Context) error {
if err != nil {
d.logger.Error("failed to handle signatures", "err", err)
}
// TODO(ian-shim): handle errors and mark failed
}()
}
}
Expand Down
5 changes: 4 additions & 1 deletion node/grpc/server_v2_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"context"
"errors"
"os"
"sync/atomic"
"testing"

"github.com/Layr-Labs/eigenda/api/clients"
Expand Down Expand Up @@ -68,6 +69,8 @@ func newTestComponents(t *testing.T, config *node.Config) *testComponents {

s := nodemock.NewMockStoreV2()
relay := clientsmock.NewRelayClient()
var atomicRelayClient atomic.Value
atomicRelayClient.Store(relay)
node := &node.Node{
Config: config,
Logger: logger,
Expand All @@ -76,7 +79,7 @@ func newTestComponents(t *testing.T, config *node.Config) *testComponents {
StoreV2: s,
ChainState: chainState,
ValidatorV2: val,
RelayClient: relay,
RelayClient: atomicRelayClient,
}
node.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParamsMap))
server := grpc.NewServerV2(config, node, logger, ratelimiter)
Expand Down
68 changes: 60 additions & 8 deletions node/node.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"errors"
"fmt"
"io"
"maps"
"math"
"math/big"
"net/http"
Expand Down Expand Up @@ -78,7 +79,7 @@ type Node struct {
ChainID *big.Int
BLSSigner blssignerV1.SignerClient

RelayClient clients.RelayClient
RelayClient atomic.Value

mu sync.Mutex
CurrentSocket string
Expand Down Expand Up @@ -223,8 +224,6 @@ func NewNode(
"quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval,
"eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding)

var relayClient clients.RelayClient

n := &Node{
Config: config,
Logger: nodeLogger,
Expand All @@ -239,7 +238,6 @@ func NewNode(
PubIPProvider: pubIPProvider,
OperatorSocketsFilterer: socketsFilterer,
ChainID: chainID,
RelayClient: relayClient,
BLSSigner: blsClient,
}

Expand All @@ -262,15 +260,31 @@ func NewNode(
if err != nil {
return nil, fmt.Errorf("failed to create new tablestore: %w", err)
}
storeV2 = NewLevelDBStoreV2(dbV2, logger)
timeToExpire := (blockStaleMeasure + storeDurationBlocks) * 12 // 12s per block
storeV2 = NewLevelDBStoreV2(dbV2, logger, time.Duration(timeToExpire)*time.Second)

blobParams, err := tx.GetAllVersionedBlobParams(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to get versioned blob parameters: %w", err)
}
blobVersionParams = corev2.NewBlobVersionParameterMap(blobParams)

// TODO(ian-shim): Create a new relay client with relay addresses onchain
var relayClient clients.RelayClient
relayURLs, err := tx.GetRelayURLs(context.Background())
if err != nil {
return nil, fmt.Errorf("failed to get relay URLs: %w", err)
}

relayClient, err = clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relayURLs,
UseSecureGrpcFlag: config.UseSecureGrpc,
}, logger)

if err != nil {
return nil, fmt.Errorf("failed to create new relay client: %w", err)
}

n.RelayClient.Store(relayClient)
}

n.StoreV2 = storeV2
Expand Down Expand Up @@ -395,6 +409,10 @@ func (n *Node) expireLoop() {
}
}

// RefreshOnchainState refreshes the onchain state of the node.
// It fetches the latest blob parameters from the chain and updates the BlobVersionParams.
// It runs periodically based on the OnchainStateRefreshInterval.
// WARNING: this method is not thread-safe and should not be called concurrently.
func (n *Node) RefreshOnchainState(ctx context.Context) error {
if !n.Config.EnableV2 || n.Config.OnchainStateRefreshInterval <= 0 {
return nil
Expand All @@ -406,13 +424,47 @@ func (n *Node) RefreshOnchainState(ctx context.Context) error {
select {
case <-ticker.C:
n.Logger.Info("Refreshing onchain state")
existingBlobParams := n.BlobVersionParams.Load()
blobParams, err := n.Transactor.GetAllVersionedBlobParams(ctx)
if err != nil {
if err == nil {
if existingBlobParams == nil || !existingBlobParams.Equal(blobParams) {
n.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParams))
}
} else {
n.Logger.Error("error fetching blob params", "err", err)
}

existingRelayClient, ok := n.RelayClient.Load().(clients.RelayClient)
if !ok {
n.Logger.Error("error fetching relay client")
continue
}

existingURLs := map[v2.RelayKey]string{}
if existingRelayClient != nil {
existingURLs = existingRelayClient.GetSockets()
}
relayURLs, err := n.Transactor.GetRelayURLs(ctx)
if err != nil {
n.Logger.Error("error fetching relay URLs", "err", err)
continue
}

if maps.Equal(existingURLs, relayURLs) {
n.Logger.Info("No change in relay URLs")
continue
}

relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{
Sockets: relayURLs,
UseSecureGrpcFlag: n.Config.UseSecureGrpc,
}, n.Logger)
if err != nil {
n.Logger.Error("error creating relay client", "err", err)
continue
}

n.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParams))
n.RelayClient.Store(clients.RelayClient(relayClient))
case <-ctx.Done():
return ctx.Err()
}
Expand Down
Loading

0 comments on commit f3a9c52

Please sign in to comment.