Skip to content

Commit

Permalink
Add eth accounts to allowlist (Layr-Labs#175)
Browse files Browse the repository at this point in the history
  • Loading branch information
mooselumph authored Jan 17, 2024
1 parent 1355bd9 commit 669be3d
Show file tree
Hide file tree
Showing 7 changed files with 401 additions and 121 deletions.
60 changes: 60 additions & 0 deletions api/grpc/mock/disperser.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
package mock

import (
"context"
"errors"

"github.com/Layr-Labs/eigenda/api/grpc/disperser"

"google.golang.org/grpc"
)

func MakeStreamMock(ctx context.Context) *StreamMock {
return &StreamMock{
ctx: ctx,
recvToServer: make(chan *disperser.AuthenticatedRequest, 10),
sentFromServer: make(chan *disperser.AuthenticatedReply, 10),
}
}

type StreamMock struct {
grpc.ServerStream
ctx context.Context
recvToServer chan *disperser.AuthenticatedRequest
sentFromServer chan *disperser.AuthenticatedReply
}

func (m *StreamMock) Context() context.Context {
return m.ctx
}

func (m *StreamMock) Send(resp *disperser.AuthenticatedReply) error {
m.sentFromServer <- resp
return nil
}

func (m *StreamMock) Recv() (*disperser.AuthenticatedRequest, error) {
req, more := <-m.recvToServer
if !more {
return nil, errors.New("empty")
}
return req, nil
}

func (m *StreamMock) SendFromClient(req *disperser.AuthenticatedRequest) error {
m.recvToServer <- req
return nil
}

func (m *StreamMock) RecvToClient() (*disperser.AuthenticatedReply, error) {
response, more := <-m.sentFromServer
if !more {
return nil, errors.New("empty")
}
return response, nil
}

func (m *StreamMock) Close() {
close(m.recvToServer)
close(m.sentFromServer)
}
2 changes: 2 additions & 0 deletions api/proto/disperser/disperser.proto
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,8 @@ message DisperseBlobRequest {
// within the same batch.
repeated SecurityParams security_params = 2;

// The account ID of the client. This should be a hex-encoded string of the ECSDA public key
// corresponding to the key used by the client to sign the BlobAuthHeader.
string account_id = 3;
}

Expand Down
4 changes: 4 additions & 0 deletions common/ratelimit/limiter.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,9 +53,13 @@ func (d *rateLimiter) AllowRequest(ctx context.Context, requesterID common.Reque
// Determine bucket deduction
deduction := time.Microsecond * time.Duration(1e6*float32(blobSize)/float32(rate)/d.globalRateParams.Multipliers[i])

prevLevel := bucketParams.BucketLevels[i]

// Update the bucket level
bucketParams.BucketLevels[i] = getBucketLevel(bucketParams.BucketLevels[i], size, interval, deduction)
allowed = allowed && bucketParams.BucketLevels[i] > 0

d.logger.Debug("Bucket level", "key", requesterID, "prevLevel", prevLevel, "level", bucketParams.BucketLevels[i], "size", size, "interval", interval, "deduction", deduction, "allowed", allowed)
}

// Update the bucket based on blob size and current rate
Expand Down
2 changes: 1 addition & 1 deletion disperser/apiserver/rate_config.go
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ func CLIFlags(envPrefix string) []cli.Flag {
},
cli.StringSliceFlag{
Name: AllowlistFlagName,
Usage: "Allowlist of IPs and corresponding blob/byte rates to bypass rate limiting. Format: <IP>:<quorum ID>:<blob rate>:<byte rate>. Example: 127.0.0.1:0:10:10485760",
Usage: "Allowlist of IPs or ethereum addresses (including initial \"0x\") and corresponding blob/byte rates to bypass rate limiting. Format: [<IP>||<ETH ADDRESS>]:<quorum ID>:<blob rate>:<byte rate>. Example: 127.0.0.1:0:10:10485760",
EnvVar: common.PrefixEnvVar(envPrefix, "ALLOWLIST"),
Required: false,
Value: &cli.StringSlice{},
Expand Down
253 changes: 253 additions & 0 deletions disperser/apiserver/ratelimit_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,253 @@
package apiserver_test

import (
"context"
"crypto/rand"
"net"
"strings"
"testing"

"github.com/Layr-Labs/eigenda/api/grpc/disperser"
pb "github.com/Layr-Labs/eigenda/api/grpc/disperser"
"github.com/Layr-Labs/eigenda/api/grpc/mock"
"github.com/Layr-Labs/eigenda/core"
"github.com/Layr-Labs/eigenda/core/auth"
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/peer"
)

func TestRatelimit(t *testing.T) {
data50KiB := make([]byte, 50*1024)
_, err := rand.Read(data50KiB)
assert.NoError(t, err)
data1KiB := make([]byte, 1024)
_, err = rand.Read(data1KiB)
assert.NoError(t, err)

// Try with a non-allowlisted IP
p := &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP("1.1.1.1"),
Port: 51001,
},
}
ctx := peer.NewContext(context.Background(), p)

// Try with non-allowlisted IP
// Should fail with account throughput limit because unauth throughput limit is 20 KiB/s for quorum 0
_, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{
Data: data50KiB,
SecurityParams: []*pb.SecurityParams{
{
QuorumId: 0,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
},
})
assert.ErrorContains(t, err, "account throughput limit")

// Try with non-allowlisted IP. Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 20 blobs.
numLimited := 0
for i := 0; i < 20; i++ {
_, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{
Data: data1KiB,
SecurityParams: []*pb.SecurityParams{
{
QuorumId: 1,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
},
})
if err != nil && strings.Contains(err.Error(), "account blob limit") {
numLimited++
}
}
assert.Greater(t, numLimited, 0)

// Now try with an allowlisted IP
// This should succeed because the account throughput limit is 100 KiB/s for quorum 0
p = &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP("1.2.3.4"),
Port: 51001,
},
}
ctx = peer.NewContext(context.Background(), p)

_, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{
Data: data50KiB,
SecurityParams: []*pb.SecurityParams{
{
QuorumId: 0,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
},
})
assert.NoError(t, err)

// This should succeed because the account blob limit (5 blobs/s) X bucket size (3s) is larger than 10 blobs.
for i := 0; i < 10; i++ {
_, err = dispersalServer.DisperseBlob(ctx, &pb.DisperseBlobRequest{
Data: data1KiB,
SecurityParams: []*pb.SecurityParams{
{
QuorumId: 1,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
},
})
assert.NoError(t, err)
}
}

func TestAuthRatelimit(t *testing.T) {

data50KiB := make([]byte, 50*1024)
_, err := rand.Read(data50KiB)
assert.NoError(t, err)
data1KiB := make([]byte, 1024)
_, err = rand.Read(data1KiB)
assert.NoError(t, err)

// Use an unauthenticated signer
privateKeyHex := "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcdeb"
signer := auth.NewSigner(privateKeyHex)

errorChan := make(chan error, 10)

// Should fail with account throughput limit because unauth throughput limit is 20 KiB/s for quorum 0
simulateClient(t, signer, "2.2.2.2", data50KiB, []*pb.SecurityParams{
{
QuorumId: 0,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
}, errorChan, false)

err = <-errorChan
assert.ErrorContains(t, err, "account throughput limit")

// Should fail with account blob limit because blob rate (3 blobs/s) X bucket size (3s) is smaller than 10 blobs.
for i := 0; i < 20; i++ {
simulateClient(t, signer, "3.3.3.3", data1KiB, []*pb.SecurityParams{
{
QuorumId: 1,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
}, errorChan, false)
}
numLimited := 0
for i := 0; i < 20; i++ {
err = <-errorChan
if err != nil && strings.Contains(err.Error(), "account blob limit") {
numLimited++
}
}
assert.Greater(t, numLimited, 0)

// Use an authenticated signer
privateKeyHex = "0x0123456789abcdef0123456789abcdef0123456789abcdef0123456789abcded"
signer = auth.NewSigner(privateKeyHex)

// This should succeed because the account throughput limit is 100 KiB/s for quorum 0
simulateClient(t, signer, "4.4.4.4", data50KiB, []*pb.SecurityParams{
{
QuorumId: 0,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
}, errorChan, false)

err = <-errorChan
assert.NoError(t, err)

// This should succeed because the account blob limit (5 blobs/s) X bucket size (3s) is larger than 10 blobs.
for i := 0; i < 10; i++ {
simulateClient(t, signer, "5.5.5.5", data1KiB, []*pb.SecurityParams{
{
QuorumId: 1,
AdversaryThreshold: 50,
QuorumThreshold: 100,
},
}, errorChan, false)
}
numLimited = 0
for i := 0; i < 10; i++ {
err = <-errorChan
if err != nil && strings.Contains(err.Error(), "account blob limit") {
numLimited++
}
}
assert.Equal(t, numLimited, 0)

}

func simulateClient(t *testing.T, signer core.BlobRequestSigner, origin string, data []byte, params []*pb.SecurityParams, errorChan chan error, shouldSucceed bool) {

p := &peer.Peer{
Addr: &net.TCPAddr{
IP: net.ParseIP(origin),
Port: 51001,
},
}
ctx := peer.NewContext(context.Background(), p)
stream := mock.MakeStreamMock(ctx)

go func() {
err := dispersalServer.DisperseBlobAuthenticated(stream)
errorChan <- err
stream.Close()
}()

err := stream.SendFromClient(&pb.AuthenticatedRequest{
Payload: &pb.AuthenticatedRequest_DisperseRequest{
DisperseRequest: &pb.DisperseBlobRequest{
Data: data,
SecurityParams: params,
AccountId: signer.GetAccountID(),
},
},
})
assert.NoError(t, err)

reply, err := stream.RecvToClient()
assert.NoError(t, err)

authHeaderReply, ok := reply.Payload.(*disperser.AuthenticatedReply_BlobAuthHeader)
assert.True(t, ok)

authHeader := core.BlobAuthHeader{
BlobCommitments: core.BlobCommitments{},
AccountID: "",
Nonce: authHeaderReply.BlobAuthHeader.ChallengeParameter,
}

authData, err := signer.SignBlobRequest(authHeader)
assert.NoError(t, err)

// Process challenge and send back challenge_reply
err = stream.SendFromClient(&disperser.AuthenticatedRequest{Payload: &disperser.AuthenticatedRequest_AuthenticationData{
AuthenticationData: &disperser.AuthenticationData{
AuthenticationData: authData,
},
}})
assert.NoError(t, err)

if shouldSucceed {

reply, err = stream.RecvToClient()
assert.NoError(t, err)

disperseReply, ok := reply.Payload.(*disperser.AuthenticatedReply_DisperseReply)
assert.True(t, ok)

assert.Equal(t, disperseReply.DisperseReply.Result, disperser.BlobStatus_PROCESSING)

}

}
Loading

0 comments on commit 669be3d

Please sign in to comment.