diff --git a/api/clients/mock/relay_client.go b/api/clients/mock/relay_client.go index b268a430f..e97e1e540 100644 --- a/api/clients/mock/relay_client.go +++ b/api/clients/mock/relay_client.go @@ -39,6 +39,15 @@ func (c *MockRelayClient) GetChunksByIndex(ctx context.Context, relayKey corev2. return args.Get(0).([][]byte), args.Error(1) } +func (c *MockRelayClient) GetSockets() map[corev2.RelayKey]string { + args := c.Called() + if args.Get(0) == nil { + return nil + } + + return args.Get(0).(map[corev2.RelayKey]string) +} + func (c *MockRelayClient) Close() error { args := c.Called() return args.Error(0) diff --git a/api/clients/relay_client.go b/api/clients/relay_client.go index 5ed6a0ead..f43d747ab 100644 --- a/api/clients/relay_client.go +++ b/api/clients/relay_client.go @@ -39,6 +39,8 @@ type RelayClient interface { // The returned slice has the same length and ordering as the input slice, and the i-th element is the bundle for the i-th request. // Each bundle is a sequence of frames in raw form (i.e., serialized core.Bundle bytearray). GetChunksByIndex(ctx context.Context, relayKey corev2.RelayKey, requests []*ChunkRequestByIndex) ([][]byte, error) + // GetSockets returns the relay sockets + GetSockets() map[corev2.RelayKey]string Close() error } @@ -65,6 +67,8 @@ func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayCli return nil, fmt.Errorf("invalid config: %v", config) } + logger.Info("creating relay client", "config", config) + initOnce := sync.Map{} for key := range config.Sockets { initOnce.Store(key, &sync.Once{}) @@ -73,7 +77,7 @@ func NewRelayClient(config *RelayClientConfig, logger logging.Logger) (*relayCli config: config, initOnce: &initOnce, - logger: logger, + logger: logger.With("component", "RelayClient"), }, nil } @@ -196,6 +200,10 @@ func (c *relayClient) initOnceGrpcConnection(key corev2.RelayKey) error { return initErr } +func (c *relayClient) GetSockets() map[corev2.RelayKey]string { + return c.config.Sockets +} + func (c *relayClient) Close() error { var errList *multierror.Error c.conns.Range(func(k, v interface{}) bool { diff --git a/common/read_only_map.go b/common/read_only_map.go index 3e6e09705..4469c4623 100644 --- a/common/read_only_map.go +++ b/common/read_only_map.go @@ -1,10 +1,14 @@ package common -type ReadOnlyMap[K comparable, V any] struct { +import ( + "maps" +) + +type ReadOnlyMap[K comparable, V comparable] struct { data map[K]V } -func NewReadOnlyMap[K comparable, V any](data map[K]V) *ReadOnlyMap[K, V] { +func NewReadOnlyMap[K comparable, V comparable](data map[K]V) *ReadOnlyMap[K, V] { return &ReadOnlyMap[K, V]{data: data} } @@ -24,3 +28,7 @@ func (m *ReadOnlyMap[K, V]) Keys() []K { func (m *ReadOnlyMap[K, V]) Len() int { return len(m.data) } + +func (m *ReadOnlyMap[K, V]) Equal(data map[K]V) bool { + return maps.Equal(m.data, data) +} diff --git a/core/chainio.go b/core/chainio.go index cb76848ef..e0f5e7db9 100644 --- a/core/chainio.go +++ b/core/chainio.go @@ -120,6 +120,12 @@ type Reader interface { // GetOnDemandPaymentByAccount returns on-demand payment of an account GetOnDemandPaymentByAccount(ctx context.Context, blockNumber uint32, accountID string) (OnDemandPayment, error) + + // GetRelayURL returns the relay URL address for the given key. + GetRelayURL(ctx context.Context, key uint16) (string, error) + + // GetRelayURLs returns the relay URL addresses for all relays. + GetRelayURLs(ctx context.Context) (map[uint16]string, error) } type Writer interface { diff --git a/core/eth/reader.go b/core/eth/reader.go index bf5aa5bb0..b22435468 100644 --- a/core/eth/reader.go +++ b/core/eth/reader.go @@ -12,6 +12,7 @@ import ( delegationmgr "github.com/Layr-Labs/eigenda/contracts/bindings/DelegationManager" eigendasrvmg "github.com/Layr-Labs/eigenda/contracts/bindings/EigenDAServiceManager" ejectionmg "github.com/Layr-Labs/eigenda/contracts/bindings/EjectionManager" + relayreg "github.com/Layr-Labs/eigenda/contracts/bindings/IEigenDARelayRegistry" indexreg "github.com/Layr-Labs/eigenda/contracts/bindings/IIndexRegistry" opstateretriever "github.com/Layr-Labs/eigenda/contracts/bindings/OperatorStateRetriever" regcoordinator "github.com/Layr-Labs/eigenda/contracts/bindings/RegistryCoordinator" @@ -39,6 +40,7 @@ type ContractBindings struct { EjectionManager *ejectionmg.ContractEjectionManager AVSDirectory *avsdir.ContractAVSDirectory SocketRegistry *socketreg.ContractSocketRegistry + RelayRegistry *relayreg.ContractIEigenDARelayRegistry } type Reader struct { @@ -178,6 +180,18 @@ func (t *Reader) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDASe return err } + var contractRelayRegistry *relayreg.ContractIEigenDARelayRegistry + relayRegistryAddr, err := contractEigenDAServiceManager.EigenDARelayRegistry(&bind.CallOpts{}) + if err != nil { + t.logger.Error("Failed to fetch IEigenDARelayRegistry contract", "err", err) + // TODO(ian-shim): return err when the contract is deployed + } else { + contractRelayRegistry, err = relayreg.NewContractIEigenDARelayRegistry(relayRegistryAddr, t.ethClient) + if err != nil { + t.logger.Error("Failed to fetch IEigenDARelayRegistry contract", "err", err) + } + } + t.bindings = &ContractBindings{ ServiceManagerAddr: eigenDAServiceManagerAddr, RegCoordinatorAddr: registryCoordinatorAddr, @@ -191,6 +205,7 @@ func (t *Reader) updateContractBindings(blsOperatorStateRetrieverAddr, eigenDASe StakeRegistry: contractStakeRegistry, EigenDAServiceManager: contractEigenDAServiceManager, DelegationManager: contractDelegationManager, + RelayRegistry: contractRelayRegistry, } return nil } @@ -689,3 +704,42 @@ func (t *Reader) GetOperatorSocket(ctx context.Context, operatorId core.Operator } return socket, nil } + +func (t *Reader) GetRelayURL(ctx context.Context, key uint16) (string, error) { + if t.bindings.RelayRegistry == nil { + return "", errors.New("relay registry not deployed") + } + + return t.bindings.RelayRegistry.GetRelayURL(&bind.CallOpts{ + Context: ctx, + }, uint32(key)) +} + +func (t *Reader) GetRelayURLs(ctx context.Context) (map[uint16]string, error) { + if t.bindings.RelayRegistry == nil { + return nil, errors.New("relay registry not deployed") + } + + res := make(map[uint16]string) + relayKey := uint16(0) + for { + url, err := t.bindings.RelayRegistry.GetRelayURL(&bind.CallOpts{ + Context: ctx, + }, uint32(relayKey)) + + if err != nil && strings.Contains(err.Error(), "execution reverted") { + break + } else if err != nil { + return nil, err + } + + res[relayKey] = url + relayKey++ + } + + if len(res) == 0 { + return nil, errors.New("no relay URLs found") + } + + return res, nil +} diff --git a/core/mock/writer.go b/core/mock/writer.go index c70db8fc0..bf729a555 100644 --- a/core/mock/writer.go +++ b/core/mock/writer.go @@ -209,6 +209,9 @@ func (t *MockWriter) GetVersionedBlobParams(ctx context.Context, blobVersion uin func (t *MockWriter) GetAllVersionedBlobParams(ctx context.Context) (map[uint8]*core.BlobVersionParameters, error) { args := t.Called() result := args.Get(0) + if result == nil { + return nil, args.Error(1) + } return result.(map[uint8]*core.BlobVersionParameters), args.Error(1) } @@ -247,3 +250,19 @@ func (t *MockWriter) GetOperatorSocket(ctx context.Context, operatorID core.Oper result := args.Get(0) return result.(string), args.Error(1) } + +func (t *MockWriter) GetRelayURL(ctx context.Context, key uint16) (string, error) { + args := t.Called() + result := args.Get(0) + return result.(string), args.Error(1) +} + +func (t *MockWriter) GetRelayURLs(ctx context.Context) (map[uint16]string, error) { + args := t.Called() + result := args.Get(0) + if result == nil { + return nil, args.Error(1) + } + + return result.(map[uint16]string), args.Error(1) +} diff --git a/core/v2/types.go b/core/v2/types.go index 796ab06b5..fedf3fcce 100644 --- a/core/v2/types.go +++ b/core/v2/types.go @@ -161,7 +161,7 @@ func (b *BlobHeader) GetEncodingParams(blobParams *core.BlobVersionParameters) ( }, nil } -type RelayKey uint16 +type RelayKey = uint16 type BlobCertificate struct { BlobHeader *BlobHeader diff --git a/disperser/controller/dispatcher.go b/disperser/controller/dispatcher.go index c5b58ca91..cc7b7da40 100644 --- a/disperser/controller/dispatcher.go +++ b/disperser/controller/dispatcher.go @@ -107,6 +107,7 @@ func (d *Dispatcher) Start(ctx context.Context) error { if err != nil { d.logger.Error("failed to handle signatures", "err", err) } + // TODO(ian-shim): handle errors and mark failed }() } } diff --git a/node/grpc/server_v2_test.go b/node/grpc/server_v2_test.go index 249bd6bb7..11ef1fce3 100644 --- a/node/grpc/server_v2_test.go +++ b/node/grpc/server_v2_test.go @@ -4,6 +4,7 @@ import ( "context" "errors" "os" + "sync/atomic" "testing" "github.com/Layr-Labs/eigenda/api/clients" @@ -68,6 +69,8 @@ func newTestComponents(t *testing.T, config *node.Config) *testComponents { s := nodemock.NewMockStoreV2() relay := clientsmock.NewRelayClient() + var atomicRelayClient atomic.Value + atomicRelayClient.Store(relay) node := &node.Node{ Config: config, Logger: logger, @@ -76,7 +79,7 @@ func newTestComponents(t *testing.T, config *node.Config) *testComponents { StoreV2: s, ChainState: chainState, ValidatorV2: val, - RelayClient: relay, + RelayClient: atomicRelayClient, } node.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParamsMap)) server := grpc.NewServerV2(config, node, logger, ratelimiter) diff --git a/node/node.go b/node/node.go index 07055e55c..62845f134 100644 --- a/node/node.go +++ b/node/node.go @@ -7,6 +7,7 @@ import ( "errors" "fmt" "io" + "maps" "math" "math/big" "net/http" @@ -78,7 +79,7 @@ type Node struct { ChainID *big.Int BLSSigner blssignerV1.SignerClient - RelayClient clients.RelayClient + RelayClient atomic.Value mu sync.Mutex CurrentSocket string @@ -223,8 +224,6 @@ func NewNode( "quorumIDs", fmt.Sprint(config.QuorumIDList), "registerNodeAtStart", config.RegisterNodeAtStart, "pubIPCheckInterval", config.PubIPCheckInterval, "eigenDAServiceManagerAddr", config.EigenDAServiceManagerAddr, "blockStaleMeasure", blockStaleMeasure, "storeDurationBlocks", storeDurationBlocks, "enableGnarkBundleEncoding", config.EnableGnarkBundleEncoding) - var relayClient clients.RelayClient - n := &Node{ Config: config, Logger: nodeLogger, @@ -239,7 +238,6 @@ func NewNode( PubIPProvider: pubIPProvider, OperatorSocketsFilterer: socketsFilterer, ChainID: chainID, - RelayClient: relayClient, BLSSigner: blsClient, } @@ -262,7 +260,8 @@ func NewNode( if err != nil { return nil, fmt.Errorf("failed to create new tablestore: %w", err) } - storeV2 = NewLevelDBStoreV2(dbV2, logger) + timeToExpire := (blockStaleMeasure + storeDurationBlocks) * 12 // 12s per block + storeV2 = NewLevelDBStoreV2(dbV2, logger, time.Duration(timeToExpire)*time.Second) blobParams, err := tx.GetAllVersionedBlobParams(context.Background()) if err != nil { @@ -270,7 +269,22 @@ func NewNode( } blobVersionParams = corev2.NewBlobVersionParameterMap(blobParams) - // TODO(ian-shim): Create a new relay client with relay addresses onchain + var relayClient clients.RelayClient + relayURLs, err := tx.GetRelayURLs(context.Background()) + if err != nil { + return nil, fmt.Errorf("failed to get relay URLs: %w", err) + } + + relayClient, err = clients.NewRelayClient(&clients.RelayClientConfig{ + Sockets: relayURLs, + UseSecureGrpcFlag: config.UseSecureGrpc, + }, logger) + + if err != nil { + return nil, fmt.Errorf("failed to create new relay client: %w", err) + } + + n.RelayClient.Store(relayClient) } n.StoreV2 = storeV2 @@ -395,6 +409,10 @@ func (n *Node) expireLoop() { } } +// RefreshOnchainState refreshes the onchain state of the node. +// It fetches the latest blob parameters from the chain and updates the BlobVersionParams. +// It runs periodically based on the OnchainStateRefreshInterval. +// WARNING: this method is not thread-safe and should not be called concurrently. func (n *Node) RefreshOnchainState(ctx context.Context) error { if !n.Config.EnableV2 || n.Config.OnchainStateRefreshInterval <= 0 { return nil @@ -406,13 +424,47 @@ func (n *Node) RefreshOnchainState(ctx context.Context) error { select { case <-ticker.C: n.Logger.Info("Refreshing onchain state") + existingBlobParams := n.BlobVersionParams.Load() blobParams, err := n.Transactor.GetAllVersionedBlobParams(ctx) - if err != nil { + if err == nil { + if existingBlobParams == nil || !existingBlobParams.Equal(blobParams) { + n.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParams)) + } + } else { n.Logger.Error("error fetching blob params", "err", err) + } + + existingRelayClient, ok := n.RelayClient.Load().(clients.RelayClient) + if !ok { + n.Logger.Error("error fetching relay client") + continue + } + + existingURLs := map[v2.RelayKey]string{} + if existingRelayClient != nil { + existingURLs = existingRelayClient.GetSockets() + } + relayURLs, err := n.Transactor.GetRelayURLs(ctx) + if err != nil { + n.Logger.Error("error fetching relay URLs", "err", err) + continue + } + + if maps.Equal(existingURLs, relayURLs) { + n.Logger.Info("No change in relay URLs") + continue + } + + relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{ + Sockets: relayURLs, + UseSecureGrpcFlag: n.Config.UseSecureGrpc, + }, n.Logger) + if err != nil { + n.Logger.Error("error creating relay client", "err", err) continue } - n.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParams)) + n.RelayClient.Store(clients.RelayClient(relayClient)) case <-ctx.Done(): return ctx.Err() } diff --git a/node/node_test.go b/node/node_test.go index e66a287b9..8000b2e2d 100644 --- a/node/node_test.go +++ b/node/node_test.go @@ -82,23 +82,21 @@ func newComponents(t *testing.T) *components { panic("failed to create a new levelDB store") } defer os.Remove(dbPath) - relayClient := clientsmock.NewRelayClient() n := &node.Node{ - Config: config, - Logger: logger, - KeyPair: keyPair, - Metrics: nil, - Store: store, - ChainState: chainState, - Validator: mockVal, - Transactor: tx, - RelayClient: relayClient, + Config: config, + Logger: logger, + KeyPair: keyPair, + Metrics: nil, + Store: store, + ChainState: chainState, + Validator: mockVal, + Transactor: tx, } n.BlobVersionParams.Store(v2.NewBlobVersionParameterMap(blobParamsMap)) return &components{ node: n, tx: tx, - relayClient: relayClient, + relayClient: clientsmock.NewRelayClient(), } } diff --git a/node/node_v2.go b/node/node_v2.go index c37c4c464..af9e9cf6e 100644 --- a/node/node_v2.go +++ b/node/node_v2.go @@ -34,7 +34,8 @@ type RawBundles struct { } func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch, operatorState *core.OperatorState) ([]*corev2.BlobShard, []*RawBundles, error) { - if n.RelayClient == nil { + relayClient, ok := n.RelayClient.Load().(clients.RelayClient) + if !ok || relayClient == nil { return nil, nil, fmt.Errorf("relay client is not set") } @@ -102,7 +103,7 @@ func (n *Node) DownloadBundles(ctx context.Context, batch *corev2.Batch, operato relayKey := relayKey req := requests[relayKey] pool.Submit(func() { - bundles, err := n.RelayClient.GetChunksByRange(ctx, relayKey, req.chunkRequests) + bundles, err := relayClient.GetChunksByRange(ctx, relayKey, req.chunkRequests) if err != nil { n.Logger.Errorf("failed to get chunks from relays: %v", err) bundleChan <- response{ diff --git a/node/node_v2_test.go b/node/node_v2_test.go index cc5655043..14e1b0259 100644 --- a/node/node_v2_test.go +++ b/node/node_v2_test.go @@ -11,12 +11,14 @@ import ( v2 "github.com/Layr-Labs/eigenda/core/v2" "github.com/Layr-Labs/eigenda/encoding" nodemock "github.com/Layr-Labs/eigenda/node/mock" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/mock" "github.com/stretchr/testify/require" ) func TestDownloadBundles(t *testing.T) { c := newComponents(t) + c.node.RelayClient.Store(c.relayClient) ctx := context.Background() blobKeys, batch, bundles := nodemock.MockBatch(t) blobCerts := batch.BlobCertificates @@ -89,6 +91,7 @@ func TestDownloadBundles(t *testing.T) { func TestDownloadBundlesFail(t *testing.T) { c := newComponents(t) + c.node.RelayClient.Store(c.relayClient) ctx := context.Background() blobKeys, batch, bundles := nodemock.MockBatch(t) @@ -123,9 +126,10 @@ func TestDownloadBundlesFail(t *testing.T) { require.Nil(t, rawBundles) } -func TestRefreshOnchainState(t *testing.T) { +func TestRefreshOnchainStateFailure(t *testing.T) { c := newComponents(t) c.node.Config.EnableV2 = true + c.node.RelayClient.Store(c.relayClient) c.node.Config.OnchainStateRefreshInterval = time.Millisecond ctx := context.Background() bp, ok := c.node.BlobVersionParams.Load().Get(0) @@ -133,9 +137,67 @@ func TestRefreshOnchainState(t *testing.T) { require.Equal(t, bp, blobParams) _, ok = c.node.BlobVersionParams.Load().Get(1) require.False(t, ok) + relayClient, ok := c.node.RelayClient.Load().(clients.RelayClient) + require.True(t, ok) + require.NotNil(t, relayClient) + + // Both updates fail + newCtx, cancel := context.WithTimeout(ctx, c.node.Config.OnchainStateRefreshInterval*2) + defer cancel() + + c.tx.On("GetAllVersionedBlobParams", mock.Anything).Return(nil, assert.AnError) + c.relayClient.On("GetSockets").Return(nil) + c.tx.On("GetRelayURLs", mock.Anything).Return(nil, assert.AnError) + err := c.node.RefreshOnchainState(newCtx) + require.ErrorIs(t, err, context.DeadlineExceeded) + bp, ok = c.node.BlobVersionParams.Load().Get(0) + require.True(t, ok) + require.Equal(t, bp, blobParams) + _, ok = c.node.BlobVersionParams.Load().Get(1) + require.False(t, ok) + newRelayClient := c.node.RelayClient.Load().(clients.RelayClient) + require.Same(t, relayClient, newRelayClient) + + // Same relay URLs shouldn't trigger update + newCtx1, cancel1 := context.WithTimeout(ctx, c.node.Config.OnchainStateRefreshInterval*2) + defer cancel1() + + c.tx.On("GetAllVersionedBlobParams", mock.Anything).Return(nil, assert.AnError) + relayURLs := map[v2.RelayKey]string{ + 0: "http://localhost:8080", + } + c.relayClient.On("GetSockets").Return(relayURLs).Once() + c.tx.On("GetRelayURLs", mock.Anything).Return(relayURLs, nil) + err = c.node.RefreshOnchainState(newCtx1) + require.ErrorIs(t, err, context.DeadlineExceeded) + newRelayClient = c.node.RelayClient.Load().(clients.RelayClient) + require.Same(t, relayClient, newRelayClient) +} + +func TestRefreshOnchainStateSuccess(t *testing.T) { + c := newComponents(t) + c.node.Config.EnableV2 = true + c.node.Config.OnchainStateRefreshInterval = time.Millisecond + relayURLs := map[v2.RelayKey]string{ + 0: "http://localhost:8080", + } + relayClient, err := clients.NewRelayClient(&clients.RelayClientConfig{ + Sockets: relayURLs, + }, c.node.Logger) + require.NoError(t, err) + // set up non-mock client + c.node.RelayClient.Store(relayClient) + ctx := context.Background() + bp, ok := c.node.BlobVersionParams.Load().Get(0) + require.True(t, ok) + require.Equal(t, bp, blobParams) + _, ok = c.node.BlobVersionParams.Load().Get(1) + require.False(t, ok) + // Blob params updated successfully newCtx, cancel := context.WithTimeout(ctx, c.node.Config.OnchainStateRefreshInterval*2) defer cancel() + blobParams2 := &core.BlobVersionParameters{ NumChunks: 111, CodingRate: 1, @@ -145,7 +207,12 @@ func TestRefreshOnchainState(t *testing.T) { 0: blobParams, 1: blobParams2, }, nil) - err := c.node.RefreshOnchainState(newCtx) + newRelayURLs := map[v2.RelayKey]string{ + 1: "http://localhost:8081", + 2: "http://localhost:8082", + } + c.tx.On("GetRelayURLs", mock.Anything).Return(newRelayURLs, nil) + err = c.node.RefreshOnchainState(newCtx) require.ErrorIs(t, err, context.DeadlineExceeded) bp, ok = c.node.BlobVersionParams.Load().Get(0) require.True(t, ok) @@ -153,6 +220,9 @@ func TestRefreshOnchainState(t *testing.T) { bp, ok = c.node.BlobVersionParams.Load().Get(1) require.True(t, ok) require.Equal(t, bp, blobParams2) + newRelayClient := c.node.RelayClient.Load().(clients.RelayClient) + require.NotSame(t, relayClient, newRelayClient) + require.Equal(t, newRelayURLs, newRelayClient.GetSockets()) } func bundleEqual(t *testing.T, expected, actual core.Bundle) { diff --git a/node/store_v2.go b/node/store_v2.go index 62da00f54..7b435adeb 100644 --- a/node/store_v2.go +++ b/node/store_v2.go @@ -33,10 +33,12 @@ type storeV2 struct { var _ StoreV2 = &storeV2{} -func NewLevelDBStoreV2(db kvstore.TableStore, logger logging.Logger) *storeV2 { +func NewLevelDBStoreV2(db kvstore.TableStore, logger logging.Logger, ttl time.Duration) *storeV2 { return &storeV2{ db: db, logger: logger, + + ttl: ttl, } } diff --git a/node/store_v2_test.go b/node/store_v2_test.go index 2a78c8080..50fc74d4d 100644 --- a/node/store_v2_test.go +++ b/node/store_v2_test.go @@ -2,6 +2,7 @@ package node_test import ( "testing" + "time" "github.com/Layr-Labs/eigenda/common/kvstore" "github.com/Layr-Labs/eigenda/common/kvstore/tablestore" @@ -189,6 +190,6 @@ func createStoreV2(t *testing.T) (node.StoreV2, kvstore.TableStore) { config.Schema = []string{node.BatchHeaderTableName, node.BlobCertificateTableName, node.BundleTableName} tStore, err := tablestore.Start(logger, config) require.NoError(t, err) - s := node.NewLevelDBStoreV2(tStore, logger) + s := node.NewLevelDBStoreV2(tStore, logger, 10*time.Second) return s, tStore }