Skip to content

Commit

Permalink
add ProxyMaximumBatchSize configuration
Browse files Browse the repository at this point in the history
proxy service responds 413 if it receives a request with >ProxyMaximumBatchSize sub-requests
  • Loading branch information
pirtleshell committed Feb 8, 2024
1 parent 94ec347 commit 220e764
Show file tree
Hide file tree
Showing 6 changed files with 105 additions and 6 deletions.
2 changes: 2 additions & 0 deletions .env
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,8 @@ PROXY_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-validator:8545,localhost:7
# otherwise, it falls back to the value in PROXY_BACKEND_HOST_URL_MAP
PROXY_HEIGHT_BASED_ROUTING_ENABLED=true
PROXY_PRUNING_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-pruning:8545,localhost:7778>http://kava-pruning:8545
# PROXY_MAXIMUM_REQ_BATCH_SIZE is a proxy-enforced limit on the number of subrequest in a batch
PROXY_MAXIMUM_REQ_BATCH_SIZE=100
# Configuration for the servcie to connect to it's database
DATABASE_NAME=postgres
DATABASE_ENDPOINT_URL=postgres:5432
Expand Down
4 changes: 4 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ type Config struct {
EnableHeightBasedRouting bool
ProxyPruningBackendHostURLMapRaw string
ProxyPruningBackendHostURLMap map[string]url.URL
ProxyMaximumBatchSize int
EvmQueryServiceURL string
DatabaseName string
DatabaseEndpointURL string
Expand Down Expand Up @@ -64,6 +65,8 @@ const (
PROXY_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY = "PROXY_BACKEND_HOST_URL_MAP"
PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY = "PROXY_HEIGHT_BASED_ROUTING_ENABLED"
PROXY_PRUNING_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY = "PROXY_PRUNING_BACKEND_HOST_URL_MAP"
PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY = "PROXY_MAXIMUM_REQ_BATCH_SIZE"
DEFAULT_PROXY_MAXIMUM_BATCH_SIZE = 500
PROXY_SERVICE_PORT_ENVIRONMENT_KEY = "PROXY_SERVICE_PORT"
DATABASE_NAME_ENVIRONMENT_KEY = "DATABASE_NAME"
DATABASE_ENDPOINT_URL_ENVIRONMENT_KEY = "DATABASE_ENDPOINT_URL"
Expand Down Expand Up @@ -279,6 +282,7 @@ func ReadConfig() Config {
EnableHeightBasedRouting: EnvOrDefaultBool(PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY, false),
ProxyPruningBackendHostURLMapRaw: rawProxyPruningBackendHostURLMap,
ProxyPruningBackendHostURLMap: parsedProxyPruningBackendHostURLMap,
ProxyMaximumBatchSize: EnvOrDefaultInt(PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY, DEFAULT_PROXY_MAXIMUM_BATCH_SIZE),
DatabaseName: os.Getenv(DATABASE_NAME_ENVIRONMENT_KEY),
DatabaseEndpointURL: os.Getenv(DATABASE_ENDPOINT_URL_ENVIRONMENT_KEY),
DatabaseUserName: os.Getenv(DATABASE_USERNAME_ENVIRONMENT_KEY),
Expand Down
79 changes: 75 additions & 4 deletions main_batch_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,17 +3,38 @@ package main_test
import (
"bytes"
"encoding/json"
"fmt"
"io"
"net/http"
"strconv"
"testing"
"time"

"github.com/kava-labs/kava-proxy-service/clients/database"
"github.com/kava-labs/kava-proxy-service/decode"
"github.com/kava-labs/kava-proxy-service/service/cachemdw"
"github.com/redis/go-redis/v9"
"github.com/stretchr/testify/require"
)

func buildBigBatch(n int, sameBlock bool) []*decode.EVMRPCRequestEnvelope {
batch := make([]*decode.EVMRPCRequestEnvelope, 0, n)
// create n requests
for i := 0; i < n; i++ {
block := "0x1"
if !sameBlock {
block = fmt.Sprintf("0x%s", strconv.FormatInt(int64(i)+1, 16))
}
batch = append(batch, &decode.EVMRPCRequestEnvelope{
JSONRPCVersion: "2.0",
ID: i,
Method: "eth_getBlockByNumber",
Params: []interface{}{block, false},
})
}
return batch
}

func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
redisClient := redis.NewClient(&redis.Options{
Addr: redisURL,
Expand All @@ -23,11 +44,17 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
cleanUpRedis(t, redisClient)
expectKeysNum(t, redisClient, 0)

db, err := database.NewPostgresClient(databaseConfig)
require.NoError(t, err)
cleanMetricsDb(t, db)

// NOTE! ordering matters for these tests! earlier request responses may end up in the cache.
testCases := []struct {
name string
req []*decode.EVMRPCRequestEnvelope
expectedCacheHeader string
expectedErrStatus int
expectedNumMetrics int
}{
{
name: "first request, valid & not coming from the cache",
Expand All @@ -40,6 +67,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
},
},
expectedCacheHeader: cachemdw.CacheMissHeaderValue,
expectedNumMetrics: 1,
},
{
name: "multiple requests, valid & none coming from the cache",
Expand All @@ -58,6 +86,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
},
},
expectedCacheHeader: cachemdw.CacheMissHeaderValue,
expectedNumMetrics: 2,
},
{
name: "multiple requests, valid & some coming from the cache",
Expand All @@ -76,6 +105,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
},
},
expectedCacheHeader: cachemdw.CachePartialHeaderValue,
expectedNumMetrics: 2,
},
{
name: "multiple requests, valid & all coming from the cache",
Expand All @@ -90,21 +120,23 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
JSONRPCVersion: "2.0",
ID: nil,
Method: "eth_getBlockByNumber",
Params: []interface{}{"0x3", false},
Params: []interface{}{"0x1", false},
},
{
JSONRPCVersion: "2.0",
ID: 123456,
Method: "eth_getBlockByNumber",
Params: []interface{}{"0x4", false},
Params: []interface{}{"0x3", false},
},
},
expectedCacheHeader: cachemdw.CacheHitHeaderValue,
expectedNumMetrics: 3,
},
{
name: "empty request",
req: []*decode.EVMRPCRequestEnvelope{nil}, // <-- empty!
expectedCacheHeader: cachemdw.CacheMissHeaderValue,
expectedNumMetrics: 0,
},
{
name: "empty & non-empty requests, partial cache hit",
Expand All @@ -118,6 +150,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
},
},
expectedCacheHeader: cachemdw.CachePartialHeaderValue,
expectedNumMetrics: 1,
},
{
name: "empty & non-empty requests, cache miss",
Expand All @@ -131,16 +164,42 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
},
},
expectedCacheHeader: cachemdw.CacheMissHeaderValue,
expectedNumMetrics: 1,
},
{
name: "big-as-can-be batch, some cache hits",
req: buildBigBatch(proxyServiceMaxBatchSize, false),
expectedCacheHeader: cachemdw.CachePartialHeaderValue,
expectedNumMetrics: proxyServiceMaxBatchSize,
},
{
name: "big-as-can-be batch, all cache hits",
req: buildBigBatch(proxyServiceMaxBatchSize, true),
expectedCacheHeader: cachemdw.CacheHitHeaderValue,
expectedNumMetrics: proxyServiceMaxBatchSize,
},
{
name: "too-big batch => responds 413",
req: buildBigBatch(proxyServiceMaxBatchSize+1, false),
expectedCacheHeader: cachemdw.CacheHitHeaderValue,
expectedErrStatus: http.StatusRequestEntityTooLarge,
expectedNumMetrics: 0,
},
}

for _, tc := range testCases {
startTime := time.Now()
t.Run(tc.name, func(t *testing.T) {
reqInJSON, err := json.Marshal(tc.req)
require.NoError(t, err)

resp, err := http.Post(proxyServiceURL, "application/json", bytes.NewBuffer(reqInJSON))
require.NoError(t, err)

if tc.expectedErrStatus != 0 {
require.Equal(t, tc.expectedErrStatus, resp.StatusCode, "unexpected response status")
return
}
require.Equal(t, http.StatusOK, resp.StatusCode)

body, err := io.ReadAll(resp.Body)
Expand Down Expand Up @@ -169,10 +228,22 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
// verify CORS header
require.Equal(t, resp.Header[accessControlAllowOriginHeaderName], []string{"*"})

// sleep to let the cache catch up :)
time.Sleep(10 * time.Microsecond)
// wait for all metrics to be created.
// scale the timeout by the number of requests made during this test.
timeout := time.Duration((len(tc.req))+1) * 100 * time.Millisecond
// besides verification, waiting for the metrics ensures future tests don't fail b/c metrics are being processed
require.Eventually(t, func() bool {
numMetrics := len(findMetricsInWindowForMethods(db, startTime, []string{}))
return numMetrics >= tc.expectedNumMetrics
}, timeout, time.Millisecond,
fmt.Sprintf("failed to find %d metrics in %f seconds from start %s", tc.expectedNumMetrics, timeout.Seconds(), startTime))
})
}

// clear all metrics & cache state to make future metrics tests less finicky
// (more data increases the read/write to db & redis, and these tests make many db & cache entries)
cleanMetricsDb(t, db)
cleanUpRedis(t, redisClient)
}

func TestE2ETest_BatchEvmRequestErrorHandling(t *testing.T) {
Expand Down
18 changes: 16 additions & 2 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ import (
"github.com/stretchr/testify/require"

"github.com/kava-labs/kava-proxy-service/clients/database"
"github.com/kava-labs/kava-proxy-service/config"
"github.com/kava-labs/kava-proxy-service/decode"
"github.com/kava-labs/kava-proxy-service/logging"
"github.com/kava-labs/kava-proxy-service/service"
Expand Down Expand Up @@ -62,6 +63,7 @@ var (
proxyServicePruningURL = os.Getenv("TEST_PROXY_SERVICE_EVM_RPC_PRUNING_URL")

proxyServiceHeightBasedRouting, _ = strconv.ParseBool(os.Getenv("TEST_PROXY_HEIGHT_BASED_ROUTING_ENABLED"))
proxyServiceMaxBatchSize = config.EnvOrDefaultInt(config.PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY, config.DEFAULT_PROXY_MAXIMUM_BATCH_SIZE)

databaseURL = os.Getenv("TEST_DATABASE_ENDPOINT_URL")
databasePassword = os.Getenv("DATABASE_PASSWORD")
Expand All @@ -84,6 +86,7 @@ var (

// lookup all the request metrics in the database paging as necessary
// search for any request metrics between starTime and time.Now() for particular request methods
// if testedmethods is empty, all metrics in timeframe are returned.
func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Time, testedmethods []string) []database.ProxiedRequestMetric {
// on fast machines the expected metrics haven't finished being created by the time they are being queried.
// hackily sleep for 10 milliseconds & then get current time
Expand Down Expand Up @@ -113,7 +116,14 @@ func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Ti
// iterate in reverse order to start checking the most recent request metrics first
for i := len(proxiedRequestMetrics) - 1; i >= 0; i-- {
requestMetric := proxiedRequestMetrics[i]
if requestMetric.RequestTime.After(startTime) && requestMetric.RequestTime.Before(endTime) {
isBetween := requestMetric.RequestTime.After(startTime) && requestMetric.RequestTime.Before(endTime)
if isBetween || requestMetric.RequestTime.Equal(startTime) || requestMetric.RequestTime.Equal(endTime) {
// collect all metrics if testedmethods = []
if len(testedmethods) == 0 {
requestMetricsDuringRequestWindow = append(requestMetricsDuringRequestWindow, requestMetric)
continue
}
// collect metrics for any desired methods
for _, testedMethod := range testedmethods {
if requestMetric.MethodName == testedMethod {
requestMetricsDuringRequestWindow = append(requestMetricsDuringRequestWindow, requestMetric)
Expand Down Expand Up @@ -445,7 +455,6 @@ func TestE2ETest_HeightBasedRouting(t *testing.T) {
metrics := findMetricsInWindowForMethods(databaseClient, startTime, []string{tc.method})

require.Len(t, metrics, 1)
fmt.Printf("%+v\n", metrics[0])
require.Equal(t, tc.method, metrics[0].MethodName)
require.Equal(t, tc.expectRoute, metrics[0].ResponseBackend)
})
Expand Down Expand Up @@ -1132,6 +1141,11 @@ func cleanUpRedis(t *testing.T, redisClient *redis.Client) {
}
}

func cleanMetricsDb(t *testing.T, db database.PostgresClient) {
_, err := db.Exec("TRUNCATE proxied_request_metrics;")
require.NoError(t, err)
}

func mkJsonRpcRequest(t *testing.T, proxyServiceURL string, id interface{}, method string, params []interface{}) *http.Response {
req := newJsonRpcRequest(id, method, params)
reqInJSON, err := json.Marshal(req)
Expand Down
7 changes: 7 additions & 0 deletions service/batchmdw/batchmdw.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ type BatchMiddlewareConfig struct {

ContextKeyDecodedRequestBatch string
ContextKeyDecodedRequestSingle string
MaximumBatchSize int
}

// CreateBatchProcessingMiddleware handles batch EVM requests
Expand All @@ -38,6 +39,12 @@ func CreateBatchProcessingMiddleware(
// if we can't get decoded request then assign it empty structure to avoid panics
batchReq = []*decode.EVMRPCRequestEnvelope{}
}
if len(batchReq) > config.MaximumBatchSize {
config.ServiceLogger.Debug().Int("size", len(batchReq)).Int("max allowed", config.MaximumBatchSize).Msg("request batch size too large")
w.WriteHeader(http.StatusRequestEntityTooLarge)
w.Write([]byte(fmt.Sprintf("request batch size is too large (%d>%d)", len(batchReq), config.MaximumBatchSize)))
return
}

config.ServiceLogger.Trace().Any("batch", batchReq).Msg("[BatchProcessingMiddleware] process EVM batch request")

Expand Down
1 change: 1 addition & 0 deletions service/service.go
Original file line number Diff line number Diff line change
Expand Up @@ -87,6 +87,7 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi
ServiceLogger: serviceLogger,
ContextKeyDecodedRequestBatch: DecodedBatchRequestContextKey,
ContextKeyDecodedRequestSingle: DecodedRequestContextKey,
MaximumBatchSize: config.ProxyMaximumBatchSize,
}
batchProcessingMiddleware := batchmdw.CreateBatchProcessingMiddleware(cacheMiddleware, &batchMdwConfig)

Expand Down

0 comments on commit 220e764

Please sign in to comment.