Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Relay timeouts #918

Merged
merged 43 commits into from
Nov 25, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
43 commits
Select commit Hold shift + click to select a range
02e9069
Added rate limiting.
cody-littley Nov 15, 2024
dc39c21
Properly handle blob sizes.
cody-littley Nov 15, 2024
d99fd39
Incremental progress.
cody-littley Nov 15, 2024
e39eaf9
Incremental progress.
cody-littley Nov 15, 2024
876ec69
Merge branch 'master' into relay-rate-limits2
cody-littley Nov 18, 2024
9500f4f
unit tests
cody-littley Nov 18, 2024
ffe18c6
Unit tests.
cody-littley Nov 18, 2024
79d9614
Fix tests.
cody-littley Nov 18, 2024
c1d83e6
Cleanup.
cody-littley Nov 18, 2024
ed4daca
Added get chunks request hashing.
cody-littley Nov 18, 2024
3438b92
Start work on authenticator.
cody-littley Nov 18, 2024
b5dc37c
Fix test issue.
cody-littley Nov 18, 2024
b420cca
Cleanup
cody-littley Nov 18, 2024
517b78e
Merge branch 'master' into relay-rate-limits2
cody-littley Nov 19, 2024
7982aec
Convert config to flag pattern.
cody-littley Nov 19, 2024
366fdf7
Simplify rate limiter classes.
cody-littley Nov 19, 2024
f2c10e4
Made suggested changes.
cody-littley Nov 19, 2024
0f95ce4
Merge branch 'relay-rate-limits2' into relay-authentication
cody-littley Nov 19, 2024
c2bb9c3
Shorten package name.
cody-littley Nov 19, 2024
151305b
Started testing
cody-littley Nov 19, 2024
29fd940
Finished unit tests.
cody-littley Nov 19, 2024
3993af4
Nil authenticator test.
cody-littley Nov 19, 2024
d094b80
Test with authentication saving disabled.
cody-littley Nov 19, 2024
d8691e1
Tie together config.
cody-littley Nov 19, 2024
8b9512b
Add method for convenient signing.
cody-littley Nov 19, 2024
fb7ec51
Made requested changes.
cody-littley Nov 19, 2024
6325f0c
Merge branch 'master' into relay-authentication
cody-littley Nov 19, 2024
5f853e0
Revert unintentional changes.
cody-littley Nov 19, 2024
f22544c
Fix bug.
cody-littley Nov 20, 2024
7a26c27
Made requested changes.
cody-littley Nov 20, 2024
d21e0bc
Update proto documentation.
cody-littley Nov 20, 2024
98a8469
Add key caching.
cody-littley Nov 20, 2024
5948e73
lint
cody-littley Nov 20, 2024
2f2209c
Added sane timeouts for relay.
cody-littley Nov 20, 2024
4204ea0
Added moar timeouts.
cody-littley Nov 20, 2024
0892c9c
Add flags.
cody-littley Nov 20, 2024
e6f4e8e
Unit tests.
cody-littley Nov 20, 2024
a23b905
Merge branch 'master' into relay-timeouts
cody-littley Nov 21, 2024
3f06fff
fix bugs
cody-littley Nov 21, 2024
43d8c91
lint
cody-littley Nov 21, 2024
9969859
Made suggested changes.
cody-littley Nov 22, 2024
46b02e2
Made suggested change.
cody-littley Nov 25, 2024
b24f23d
Merge branch 'master' into relay-timeouts
cody-littley Nov 25, 2024
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
10 changes: 0 additions & 10 deletions common/aws/cli.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,6 @@ type ClientConfig struct {
// FragmentParallelismConstant helps determine the size of the pool of workers to help upload/download files.
// A non-zero value for this parameter adds a constant number of workers. Default is 0.
FragmentParallelismConstant int
// FragmentReadTimeout is used to bound the maximum time to wait for a single fragmented read.
// Default is 30 seconds.
FragmentReadTimeout time.Duration
// FragmentWriteTimeout is used to bound the maximum time to wait for a single fragmented write.
// Default is 30 seconds.
FragmentWriteTimeout time.Duration
}

func ClientFlags(envPrefix string, flagPrefix string) []cli.Flag {
Expand Down Expand Up @@ -120,8 +114,6 @@ func ReadClientConfig(ctx *cli.Context, flagPrefix string) ClientConfig {
EndpointURL: ctx.GlobalString(common.PrefixFlag(flagPrefix, EndpointURLFlagName)),
FragmentParallelismFactor: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismFactorFlagName)),
FragmentParallelismConstant: ctx.GlobalInt(common.PrefixFlag(flagPrefix, FragmentParallelismConstantFlagName)),
FragmentReadTimeout: ctx.GlobalDuration(common.PrefixFlag(flagPrefix, FragmentReadTimeoutFlagName)),
FragmentWriteTimeout: ctx.GlobalDuration(common.PrefixFlag(flagPrefix, FragmentWriteTimeoutFlagName)),
}
}

Expand All @@ -131,7 +123,5 @@ func DefaultClientConfig() *ClientConfig {
Region: "us-east-2",
FragmentParallelismFactor: 8,
FragmentParallelismConstant: 0,
FragmentReadTimeout: 30 * time.Second,
FragmentWriteTimeout: 30 * time.Second,
}
}
6 changes: 0 additions & 6 deletions common/aws/s3/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,6 @@ func (s *client) FragmentedUploadObject(
}
resultChannel := make(chan error, len(fragments))

ctx, cancel := context.WithTimeout(ctx, s.cfg.FragmentWriteTimeout)
defer cancel()

for _, fragment := range fragments {
fragmentCapture := fragment
s.concurrencyLimiter <- struct{}{}
Expand Down Expand Up @@ -301,9 +298,6 @@ func (s *client) FragmentedDownloadObject(
}
resultChannel := make(chan *readResult, len(fragmentKeys))

ctx, cancel := context.WithTimeout(ctx, s.cfg.FragmentWriteTimeout)
defer cancel()

for i, fragmentKey := range fragmentKeys {
boundFragmentKey := fragmentKey
boundI := i
Expand Down
15 changes: 9 additions & 6 deletions relay/auth/authenticator.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type RequestAuthenticator interface {
// The origin is the address of the peer that sent the request. This may be used to cache auth results
ian-shim marked this conversation as resolved.
Show resolved Hide resolved
// in order to save server resources.
AuthenticateGetChunksRequest(
ctx context.Context,
origin string,
request *pb.GetChunksRequest,
now time.Time) error
Expand Down Expand Up @@ -53,6 +54,7 @@ type requestAuthenticator struct {

// NewRequestAuthenticator creates a new RequestAuthenticator.
func NewRequestAuthenticator(
ctx context.Context,
ics core.IndexedChainState,
keyCacheSize int,
authenticationTimeoutDuration time.Duration) (RequestAuthenticator, error) {
Expand All @@ -70,20 +72,20 @@ func NewRequestAuthenticator(
keyCache: keyCache,
}

err = authenticator.preloadCache()
err = authenticator.preloadCache(ctx)
if err != nil {
return nil, fmt.Errorf("failed to preload cache: %w", err)
}

return authenticator, nil
}

func (a *requestAuthenticator) preloadCache() error {
func (a *requestAuthenticator) preloadCache(ctx context.Context) error {
blockNumber, err := a.ics.GetCurrentBlockNumber()
if err != nil {
return fmt.Errorf("failed to get current block number: %w", err)
}
operators, err := a.ics.GetIndexedOperators(context.Background(), blockNumber)
operators, err := a.ics.GetIndexedOperators(ctx, blockNumber)
if err != nil {
return fmt.Errorf("failed to get operators: %w", err)
}
Expand All @@ -96,6 +98,7 @@ func (a *requestAuthenticator) preloadCache() error {
}

func (a *requestAuthenticator) AuthenticateGetChunksRequest(
ctx context.Context,
origin string,
request *pb.GetChunksRequest,
now time.Time) error {
Expand All @@ -105,7 +108,7 @@ func (a *requestAuthenticator) AuthenticateGetChunksRequest(
return nil
}

key, err := a.getOperatorKey(core.OperatorID(request.OperatorId))
key, err := a.getOperatorKey(ctx, core.OperatorID(request.OperatorId))
if err != nil {
return fmt.Errorf("failed to get operator key: %w", err)
}
Expand All @@ -131,7 +134,7 @@ func (a *requestAuthenticator) AuthenticateGetChunksRequest(
}

// getOperatorKey returns the public key of the operator with the given ID, caching the result.
func (a *requestAuthenticator) getOperatorKey(operatorID core.OperatorID) (*core.G2Point, error) {
func (a *requestAuthenticator) getOperatorKey(ctx context.Context, operatorID core.OperatorID) (*core.G2Point, error) {
key, ok := a.keyCache.Get(operatorID)
if ok {
return key, nil
Expand All @@ -141,7 +144,7 @@ func (a *requestAuthenticator) getOperatorKey(operatorID core.OperatorID) (*core
if err != nil {
return nil, fmt.Errorf("failed to get current block number: %w", err)
}
operators, err := a.ics.GetIndexedOperators(context.Background(), blockNumber)
operators, err := a.ics.GetIndexedOperators(ctx, blockNumber)
if err != nil {
return nil, fmt.Errorf("failed to get operators: %w", err)
}
Expand Down
29 changes: 24 additions & 5 deletions relay/auth/authenticator_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,8 @@ import (
func TestMockSigning(t *testing.T) {
tu.InitializeRandom()

ctx := context.Background()

operatorID := mock.MakeOperatorId(0)
stakes := map[core.QuorumID]map[core.OperatorID]int{
core.QuorumID(0): {
Expand All @@ -24,7 +26,7 @@ func TestMockSigning(t *testing.T) {
ics, err := mock.NewChainDataMock(stakes)
require.NoError(t, err)

operators, err := ics.GetIndexedOperators(context.Background(), 0)
operators, err := ics.GetIndexedOperators(ctx, 0)
require.NoError(t, err)

operator, ok := operators[operatorID]
Expand All @@ -46,6 +48,8 @@ func TestMockSigning(t *testing.T) {
func TestValidRequest(t *testing.T) {
tu.InitializeRandom()

ctx := context.Background()

operatorID := mock.MakeOperatorId(0)
stakes := map[core.QuorumID]map[core.OperatorID]int{
core.QuorumID(0): {
Expand All @@ -58,7 +62,7 @@ func TestValidRequest(t *testing.T) {

timeout := 10 * time.Second

authenticator, err := NewRequestAuthenticator(ics, 1024, timeout)
authenticator, err := NewRequestAuthenticator(ctx, ics, 1024, timeout)
require.NoError(t, err)

request := randomGetChunksRequest()
Expand All @@ -69,6 +73,7 @@ func TestValidRequest(t *testing.T) {
now := time.Now()

err = authenticator.AuthenticateGetChunksRequest(
ctx,
"foobar",
request,
now)
Expand All @@ -83,12 +88,14 @@ func TestValidRequest(t *testing.T) {
start := now
for now.Before(start.Add(timeout)) {
err = authenticator.AuthenticateGetChunksRequest(
ctx,
"foobar",
invalidRequest,
now)
require.NoError(t, err)

err = authenticator.AuthenticateGetChunksRequest(
ctx,
"baz",
invalidRequest,
now)
Expand All @@ -99,6 +106,7 @@ func TestValidRequest(t *testing.T) {

// After the timeout elapses, new requests should trigger authentication.
err = authenticator.AuthenticateGetChunksRequest(
ctx,
"foobar",
invalidRequest,
now)
Expand All @@ -108,6 +116,8 @@ func TestValidRequest(t *testing.T) {
func TestAuthenticationSavingDisabled(t *testing.T) {
tu.InitializeRandom()

ctx := context.Background()

operatorID := mock.MakeOperatorId(0)
stakes := map[core.QuorumID]map[core.OperatorID]int{
core.QuorumID(0): {
Expand All @@ -121,7 +131,7 @@ func TestAuthenticationSavingDisabled(t *testing.T) {
// This disables saving of authentication results.
timeout := time.Duration(0)

authenticator, err := NewRequestAuthenticator(ics, 1024, timeout)
authenticator, err := NewRequestAuthenticator(ctx, ics, 1024, timeout)
require.NoError(t, err)

request := randomGetChunksRequest()
Expand All @@ -132,6 +142,7 @@ func TestAuthenticationSavingDisabled(t *testing.T) {
now := time.Now()

err = authenticator.AuthenticateGetChunksRequest(
ctx,
"foobar",
request,
now)
Expand All @@ -144,6 +155,7 @@ func TestAuthenticationSavingDisabled(t *testing.T) {
invalidRequest.OperatorSignature = signature // the previous signature is invalid here

err = authenticator.AuthenticateGetChunksRequest(
ctx,
"foobar",
invalidRequest,
now)
Expand All @@ -153,6 +165,8 @@ func TestAuthenticationSavingDisabled(t *testing.T) {
func TestNonExistingClient(t *testing.T) {
tu.InitializeRandom()

ctx := context.Background()

operatorID := mock.MakeOperatorId(0)
stakes := map[core.QuorumID]map[core.OperatorID]int{
core.QuorumID(0): {
Expand All @@ -165,7 +179,7 @@ func TestNonExistingClient(t *testing.T) {

timeout := 10 * time.Second

authenticator, err := NewRequestAuthenticator(ics, 1024, timeout)
authenticator, err := NewRequestAuthenticator(ctx, ics, 1024, timeout)
require.NoError(t, err)

invalidOperatorID := tu.RandomBytes(32)
Expand All @@ -174,6 +188,7 @@ func TestNonExistingClient(t *testing.T) {
request.OperatorId = invalidOperatorID

err = authenticator.AuthenticateGetChunksRequest(
ctx,
"foobar",
request,
time.Now())
Expand All @@ -183,6 +198,8 @@ func TestNonExistingClient(t *testing.T) {
func TestBadSignature(t *testing.T) {
tu.InitializeRandom()

ctx := context.Background()

operatorID := mock.MakeOperatorId(0)
stakes := map[core.QuorumID]map[core.OperatorID]int{
core.QuorumID(0): {
Expand All @@ -195,7 +212,7 @@ func TestBadSignature(t *testing.T) {

timeout := 10 * time.Second

authenticator, err := NewRequestAuthenticator(ics, 1024, timeout)
authenticator, err := NewRequestAuthenticator(ctx, ics, 1024, timeout)
require.NoError(t, err)

request := randomGetChunksRequest()
Expand All @@ -205,6 +222,7 @@ func TestBadSignature(t *testing.T) {
now := time.Now()

err = authenticator.AuthenticateGetChunksRequest(
ctx,
"foobar",
request,
now)
Expand All @@ -217,6 +235,7 @@ func TestBadSignature(t *testing.T) {
request.OperatorSignature[0] = request.OperatorSignature[0] ^ 1

err = authenticator.AuthenticateGetChunksRequest(
ctx,
"foobar",
request,
now)
Expand Down
24 changes: 16 additions & 8 deletions relay/blob_provider.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"github.com/Layr-Labs/eigenda/disperser/common/v2/blobstore"
"github.com/Layr-Labs/eigenda/relay/cache"
"github.com/Layr-Labs/eigensdk-go/logging"
"time"
)

// blobProvider encapsulates logic for fetching blobs. Utilized by the relay Server.
Expand All @@ -20,6 +21,9 @@ type blobProvider struct {

// blobCache is an LRU cache of blobs.
blobCache cache.CachedAccessor[v2.BlobKey, []byte]

ian-shim marked this conversation as resolved.
Show resolved Hide resolved
// fetchTimeout is the maximum time to wait for a blob fetch operation to complete.
fetchTimeout time.Duration
}

// newBlobProvider creates a new blobProvider.
Expand All @@ -28,12 +32,14 @@ func newBlobProvider(
logger logging.Logger,
blobStore *blobstore.BlobStore,
blobCacheSize int,
maxIOConcurrency int) (*blobProvider, error) {
maxIOConcurrency int,
fetchTimeout time.Duration) (*blobProvider, error) {

server := &blobProvider{
ctx: ctx,
logger: logger,
blobStore: blobStore,
ctx: ctx,
logger: logger,
blobStore: blobStore,
fetchTimeout: fetchTimeout,
}

c, err := cache.NewCachedAccessor[v2.BlobKey, []byte](blobCacheSize, maxIOConcurrency, server.fetchBlob)
Expand All @@ -46,9 +52,8 @@ func newBlobProvider(
}

// GetBlob retrieves a blob from the blob store.
func (s *blobProvider) GetBlob(blobKey v2.BlobKey) ([]byte, error) {

data, err := s.blobCache.Get(blobKey)
func (s *blobProvider) GetBlob(ctx context.Context, blobKey v2.BlobKey) ([]byte, error) {
data, err := s.blobCache.Get(ctx, blobKey)

if err != nil {
// It should not be possible for external users to force an error here since we won't
Expand All @@ -62,7 +67,10 @@ func (s *blobProvider) GetBlob(blobKey v2.BlobKey) ([]byte, error) {

// fetchBlob retrieves a single blob from the blob store.
func (s *blobProvider) fetchBlob(blobKey v2.BlobKey) ([]byte, error) {
data, err := s.blobStore.GetBlob(s.ctx, blobKey)
ctx, cancel := context.WithTimeout(s.ctx, s.fetchTimeout)
defer cancel()

data, err := s.blobStore.GetBlob(ctx, blobKey)
if err != nil {
s.logger.Errorf("Failed to fetch blob: %v", err)
return nil, err
Expand Down
Loading
Loading