diff --git a/.env b/.env
index 87343e1..71c1871 100644
--- a/.env
+++ b/.env
@@ -68,6 +68,8 @@ PROXY_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-validator:8545,localhost:7
 # otherwise, it falls back to the value in PROXY_BACKEND_HOST_URL_MAP
 PROXY_HEIGHT_BASED_ROUTING_ENABLED=true
 PROXY_PRUNING_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-pruning:8545,localhost:7778>http://kava-pruning:8545
+# PROXY_MAXIMUM_REQ_BATCH_SIZE is a proxy-enforced limit on the number of subrequests in a batch
+PROXY_MAXIMUM_REQ_BATCH_SIZE=100
 # Configuration for the servcie to connect to it's database
 DATABASE_NAME=postgres
 DATABASE_ENDPOINT_URL=postgres:5432
diff --git a/config/config.go b/config/config.go
index 6d91452..b244ce4 100644
--- a/config/config.go
+++ b/config/config.go
@@ -20,6 +20,7 @@ type Config struct {
 	EnableHeightBasedRouting         bool
 	ProxyPruningBackendHostURLMapRaw string
 	ProxyPruningBackendHostURLMap    map[string]url.URL
+	ProxyMaximumBatchSize            int
 	EvmQueryServiceURL               string
 	DatabaseName                     string
 	DatabaseEndpointURL              string
@@ -64,6 +65,8 @@ const (
 	PROXY_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY         = "PROXY_BACKEND_HOST_URL_MAP"
 	PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY             = "PROXY_HEIGHT_BASED_ROUTING_ENABLED"
 	PROXY_PRUNING_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY = "PROXY_PRUNING_BACKEND_HOST_URL_MAP"
+	PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY           = "PROXY_MAXIMUM_REQ_BATCH_SIZE"
+	DEFAULT_PROXY_MAXIMUM_BATCH_SIZE                   = 500
 	PROXY_SERVICE_PORT_ENVIRONMENT_KEY                 = "PROXY_SERVICE_PORT"
 	DATABASE_NAME_ENVIRONMENT_KEY                      = "DATABASE_NAME"
 	DATABASE_ENDPOINT_URL_ENVIRONMENT_KEY              = "DATABASE_ENDPOINT_URL"
@@ -279,6 +282,7 @@ func ReadConfig() Config {
 		EnableHeightBasedRouting:         EnvOrDefaultBool(PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY, false),
 		ProxyPruningBackendHostURLMapRaw: rawProxyPruningBackendHostURLMap,
 		ProxyPruningBackendHostURLMap:    parsedProxyPruningBackendHostURLMap,
+		ProxyMaximumBatchSize:            EnvOrDefaultInt(PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY, DEFAULT_PROXY_MAXIMUM_BATCH_SIZE),
 		DatabaseName:                     os.Getenv(DATABASE_NAME_ENVIRONMENT_KEY),
 		DatabaseEndpointURL:              os.Getenv(DATABASE_ENDPOINT_URL_ENVIRONMENT_KEY),
 		DatabaseUserName:                 os.Getenv(DATABASE_USERNAME_ENVIRONMENT_KEY),
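`ReadConfig` leans on an `EnvOrDefaultInt` helper that is outside this diff. For readers following along, here is a minimal sketch of what such a helper presumably looks like; the real one in the `config` package may differ (for example, by logging parse failures):

```go
import (
	"os"
	"strconv"
)

// EnvOrDefaultInt returns the named environment variable parsed as an int,
// falling back to defaultVal when the variable is unset or unparsable.
// Hypothetical sketch of the helper used by ReadConfig above.
func EnvOrDefaultInt(key string, defaultVal int) int {
	raw, ok := os.LookupEnv(key)
	if !ok {
		return defaultVal
	}
	parsed, err := strconv.Atoi(raw)
	if err != nil {
		return defaultVal
	}
	return parsed
}
```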
diff --git a/main_batch_test.go b/main_batch_test.go
index 22d4b6e..f244215 100644
--- a/main_batch_test.go
+++ b/main_batch_test.go
@@ -3,17 +3,38 @@ package main_test
 import (
 	"bytes"
 	"encoding/json"
+	"fmt"
 	"io"
 	"net/http"
+	"strconv"
 	"testing"
 	"time"

+	"github.com/kava-labs/kava-proxy-service/clients/database"
 	"github.com/kava-labs/kava-proxy-service/decode"
 	"github.com/kava-labs/kava-proxy-service/service/cachemdw"
 	"github.com/redis/go-redis/v9"
 	"github.com/stretchr/testify/require"
 )

+func buildBigBatch(n int, sameBlock bool) []*decode.EVMRPCRequestEnvelope {
+	batch := make([]*decode.EVMRPCRequestEnvelope, 0, n)
+	// create n requests
+	for i := 0; i < n; i++ {
+		block := "0x1"
+		if !sameBlock {
+			block = fmt.Sprintf("0x%s", strconv.FormatInt(int64(i)+1, 16))
+		}
+		batch = append(batch, &decode.EVMRPCRequestEnvelope{
+			JSONRPCVersion: "2.0",
+			ID:             i,
+			Method:         "eth_getBlockByNumber",
+			Params:         []interface{}{block, false},
+		})
+	}
+	return batch
+}
+
 func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 	redisClient := redis.NewClient(&redis.Options{
 		Addr: redisURL,
@@ -23,11 +44,17 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 	cleanUpRedis(t, redisClient)
 	expectKeysNum(t, redisClient, 0)

+	db, err := database.NewPostgresClient(databaseConfig)
+	require.NoError(t, err)
+	cleanMetricsDb(t, db)
+
 	// NOTE! ordering matters for these tests! earlier request responses may end up in the cache.
 	testCases := []struct {
 		name                string
 		req                 []*decode.EVMRPCRequestEnvelope
 		expectedCacheHeader string
+		expectedErrStatus   int
+		expectedNumMetrics  int
 	}{
 		{
 			name: "first request, valid & not coming from the cache",
@@ -40,6 +67,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 				},
 			},
 			expectedCacheHeader: cachemdw.CacheMissHeaderValue,
+			expectedNumMetrics:  1,
 		},
 		{
 			name: "multiple requests, valid & none coming from the cache",
@@ -58,6 +86,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 				},
 			},
 			expectedCacheHeader: cachemdw.CacheMissHeaderValue,
+			expectedNumMetrics:  2,
 		},
 		{
 			name: "multiple requests, valid & some coming from the cache",
@@ -76,6 +105,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 				},
 			},
 			expectedCacheHeader: cachemdw.CachePartialHeaderValue,
+			expectedNumMetrics:  2,
 		},
 		{
 			name: "multiple requests, valid & all coming from the cache",
@@ -90,21 +120,23 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 					JSONRPCVersion: "2.0",
 					ID:             nil,
 					Method:         "eth_getBlockByNumber",
-					Params:         []interface{}{"0x3", false},
+					Params:         []interface{}{"0x1", false},
 				},
 				{
 					JSONRPCVersion: "2.0",
 					ID:             123456,
 					Method:         "eth_getBlockByNumber",
-					Params:         []interface{}{"0x4", false},
+					Params:         []interface{}{"0x3", false},
 				},
 			},
 			expectedCacheHeader: cachemdw.CacheHitHeaderValue,
+			expectedNumMetrics:  3,
 		},
 		{
 			name:                "empty request",
 			req:                 []*decode.EVMRPCRequestEnvelope{nil}, // <-- empty!
 			expectedCacheHeader: cachemdw.CacheMissHeaderValue,
+			expectedNumMetrics:  0,
 		},
 		{
 			name: "empty & non-empty requests, partial cache hit",
@@ -118,6 +150,7 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 				},
 			},
 			expectedCacheHeader: cachemdw.CachePartialHeaderValue,
+			expectedNumMetrics:  1,
 		},
 		{
 			name: "empty & non-empty requests, cache miss",
@@ -131,16 +164,42 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 				},
 			},
 			expectedCacheHeader: cachemdw.CacheMissHeaderValue,
+			expectedNumMetrics:  1,
+		},
+		{
+			name:                "big-as-can-be batch, some cache hits",
+			req:                 buildBigBatch(proxyServiceMaxBatchSize, false),
+			expectedCacheHeader: cachemdw.CachePartialHeaderValue,
+			expectedNumMetrics:  proxyServiceMaxBatchSize,
+		},
+		{
+			name:                "big-as-can-be batch, all cache hits",
+			req:                 buildBigBatch(proxyServiceMaxBatchSize, true),
+			expectedCacheHeader: cachemdw.CacheHitHeaderValue,
+			expectedNumMetrics:  proxyServiceMaxBatchSize,
+		},
+		{
+			name: "too-big batch => responds 413",
+			req:  buildBigBatch(proxyServiceMaxBatchSize+1, false),
+			// cache header is never checked here: the request is rejected before reaching the cache
+			expectedCacheHeader: cachemdw.CacheHitHeaderValue,
+			expectedErrStatus:   http.StatusRequestEntityTooLarge,
+			expectedNumMetrics:  0,
 		},
 	}

 	for _, tc := range testCases {
+		startTime := time.Now()
 		t.Run(tc.name, func(t *testing.T) {
 			reqInJSON, err := json.Marshal(tc.req)
 			require.NoError(t, err)

 			resp, err := http.Post(proxyServiceURL, "application/json", bytes.NewBuffer(reqInJSON))
 			require.NoError(t, err)
+
+			if tc.expectedErrStatus != 0 {
+				require.Equal(t, tc.expectedErrStatus, resp.StatusCode, "unexpected response status")
+				return
+			}
 			require.Equal(t, http.StatusOK, resp.StatusCode)

 			body, err := io.ReadAll(resp.Body)
@@ -169,10 +228,22 @@ func TestE2ETest_ValidBatchEvmRequests(t *testing.T) {
 			// verify CORS header
 			require.Equal(t, resp.Header[accessControlAllowOriginHeaderName], []string{"*"})

-			// sleep to let the cache catch up :)
-			time.Sleep(10 * time.Microsecond)
+			// wait for all metrics to be created.
+			// scale the timeout by the number of requests made during this test.
+			timeout := time.Duration(len(tc.req)+1) * 100 * time.Millisecond
+			// besides verification, waiting for the metrics ensures future tests don't fail because metrics are still being processed
+			require.Eventually(t, func() bool {
+				numMetrics := len(findMetricsInWindowForMethods(db, startTime, []string{}))
+				return numMetrics >= tc.expectedNumMetrics
+			}, timeout, time.Millisecond,
+				fmt.Sprintf("failed to find %d metrics in %f seconds from start %s", tc.expectedNumMetrics, timeout.Seconds(), startTime))
 		})
 	}
+
+	// clear all metrics & cache state to make future metrics tests less finicky
+	// (more data increases the read/write load on the db & redis, and these tests create many db & cache entries)
+	cleanMetricsDb(t, db)
+	cleanUpRedis(t, redisClient)
 }

 func TestE2ETest_BatchEvmRequestErrorHandling(t *testing.T) {
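Because oversized batches are now rejected outright with a 413 (see the final test case above), a client holding more than `PROXY_MAXIMUM_REQ_BATCH_SIZE` subrequests has to split them itself. Below is a hedged sketch of one way to do that, reusing the envelope type from the tests; the function name and error handling are illustrative only:

```go
import (
	"bytes"
	"encoding/json"
	"net/http"

	"github.com/kava-labs/kava-proxy-service/decode"
)

// postInChunks splits reqs into batches of at most maxBatchSize and posts each
// chunk separately so that no single request trips the proxy's 413 guard.
// Hypothetical client-side helper, not part of this PR.
func postInChunks(url string, reqs []*decode.EVMRPCRequestEnvelope, maxBatchSize int) ([]*http.Response, error) {
	var responses []*http.Response
	for start := 0; start < len(reqs); start += maxBatchSize {
		end := start + maxBatchSize
		if end > len(reqs) {
			end = len(reqs)
		}
		payload, err := json.Marshal(reqs[start:end])
		if err != nil {
			return nil, err
		}
		resp, err := http.Post(url, "application/json", bytes.NewBuffer(payload))
		if err != nil {
			return nil, err
		}
		responses = append(responses, resp)
	}
	return responses, nil
}
```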
diff --git a/main_test.go b/main_test.go
index 8212b47..9959aaf 100644
--- a/main_test.go
+++ b/main_test.go
@@ -28,6 +28,7 @@ import (
 	"github.com/stretchr/testify/require"

 	"github.com/kava-labs/kava-proxy-service/clients/database"
+	"github.com/kava-labs/kava-proxy-service/config"
 	"github.com/kava-labs/kava-proxy-service/decode"
 	"github.com/kava-labs/kava-proxy-service/logging"
 	"github.com/kava-labs/kava-proxy-service/service"
@@ -62,6 +63,7 @@ var (
 	proxyServicePruningURL = os.Getenv("TEST_PROXY_SERVICE_EVM_RPC_PRUNING_URL")

 	proxyServiceHeightBasedRouting, _ = strconv.ParseBool(os.Getenv("TEST_PROXY_HEIGHT_BASED_ROUTING_ENABLED"))
+	proxyServiceMaxBatchSize          = config.EnvOrDefaultInt(config.PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY, config.DEFAULT_PROXY_MAXIMUM_BATCH_SIZE)

 	databaseURL      = os.Getenv("TEST_DATABASE_ENDPOINT_URL")
 	databasePassword = os.Getenv("DATABASE_PASSWORD")
@@ -84,6 +86,7 @@ var (

 // lookup all the request metrics in the database paging as necessary
 // search for any request metrics between starTime and time.Now() for particular request methods
+// if testedmethods is empty, all metrics in the timeframe are returned.
 func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Time, testedmethods []string) []database.ProxiedRequestMetric {
 	// on fast machines the expected metrics haven't finished being created by the time they are being queried.
 	// hackily sleep for 10 milliseconds & then get current time
@@ -113,7 +116,14 @@ func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Ti
 	// iterate in reverse order to start checking the most recent request metrics first
 	for i := len(proxiedRequestMetrics) - 1; i >= 0; i-- {
 		requestMetric := proxiedRequestMetrics[i]
-		if requestMetric.RequestTime.After(startTime) && requestMetric.RequestTime.Before(endTime) {
+		isBetween := requestMetric.RequestTime.After(startTime) && requestMetric.RequestTime.Before(endTime)
+		if isBetween || requestMetric.RequestTime.Equal(startTime) || requestMetric.RequestTime.Equal(endTime) {
+			// collect all metrics if testedmethods = []
+			if len(testedmethods) == 0 {
+				requestMetricsDuringRequestWindow = append(requestMetricsDuringRequestWindow, requestMetric)
+				continue
+			}
+			// collect metrics for any desired methods
 			for _, testedMethod := range testedmethods {
 				if requestMetric.MethodName == testedMethod {
 					requestMetricsDuringRequestWindow = append(requestMetricsDuringRequestWindow, requestMetric)
@@ -445,7 +455,6 @@ func TestE2ETest_HeightBasedRouting(t *testing.T) {
 			metrics := findMetricsInWindowForMethods(databaseClient, startTime, []string{tc.method})

 			require.Len(t, metrics, 1)
-			fmt.Printf("%+v\n", metrics[0])
 			require.Equal(t, tc.method, metrics[0].MethodName)
 			require.Equal(t, tc.expectRoute, metrics[0].ResponseBackend)
 		})
@@ -1132,6 +1141,11 @@ func cleanUpRedis(t *testing.T, redisClient *redis.Client) {
 	}
 }

+func cleanMetricsDb(t *testing.T, db database.PostgresClient) {
+	_, err := db.Exec("TRUNCATE proxied_request_metrics;")
+	require.NoError(t, err)
+}
+
 func mkJsonRpcRequest(t *testing.T, proxyServiceURL string, id interface{}, method string, params []interface{}) *http.Response {
 	req := newJsonRpcRequest(id, method, params)
 	reqInJSON, err := json.Marshal(req)
diff --git a/service/batchmdw/batchmdw.go b/service/batchmdw/batchmdw.go
index b5ff27e..ed0a729 100644
--- a/service/batchmdw/batchmdw.go
+++ b/service/batchmdw/batchmdw.go
@@ -17,6 +17,7 @@ type BatchMiddlewareConfig struct {

 	ContextKeyDecodedRequestBatch  string
 	ContextKeyDecodedRequestSingle string
+	MaximumBatchSize               int
 }

 // CreateBatchProcessingMiddleware handles batch EVM requests
@@ -38,6 +39,12 @@ func CreateBatchProcessingMiddleware(
 			// if we can't get decoded request then assign it empty structure to avoid panics
 			batchReq = []*decode.EVMRPCRequestEnvelope{}
 		}
+		if len(batchReq) > config.MaximumBatchSize {
+			config.ServiceLogger.Debug().Int("size", len(batchReq)).Int("max allowed", config.MaximumBatchSize).Msg("request batch size too large")
+			w.WriteHeader(http.StatusRequestEntityTooLarge)
+			w.Write([]byte(fmt.Sprintf("request batch size is too large (%d>%d)", len(batchReq), config.MaximumBatchSize)))
+			return
+		}

 		config.ServiceLogger.Trace().Any("batch", batchReq).Msg("[BatchProcessingMiddleware] process EVM batch request")
diff --git a/service/service.go b/service/service.go
index e853a03..7b41c6c 100644
--- a/service/service.go
+++ b/service/service.go
@@ -87,6 +87,7 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi
 		ServiceLogger:                  serviceLogger,
 		ContextKeyDecodedRequestBatch:  DecodedBatchRequestContextKey,
 		ContextKeyDecodedRequestSingle: DecodedRequestContextKey,
+		MaximumBatchSize:               config.ProxyMaximumBatchSize,
 	}
 	batchProcessingMiddleware := batchmdw.CreateBatchProcessingMiddleware(cacheMiddleware, &batchMdwConfig)
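For readers who want the enforcement pattern in isolation, the guard boils down to a small middleware. The sketch below is a hypothetical standalone version, not the service's actual wiring: the proxy decodes the batch once upstream and hands it to the batch middleware via the request context, whereas this version re-decodes the body itself.

```go
import (
	"bytes"
	"encoding/json"
	"fmt"
	"io"
	"net/http"
)

// limitBatchSize rejects JSON-RPC batches containing more than max subrequests
// with 413 Request Entity Too Large, mirroring the proxy's new behavior.
// Hypothetical standalone sketch.
func limitBatchSize(next http.Handler, max int) http.Handler {
	return http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		body, err := io.ReadAll(r.Body)
		if err != nil {
			http.Error(w, "unable to read request body", http.StatusBadRequest)
			return
		}
		// restore the body so downstream handlers can re-read it
		r.Body = io.NopCloser(bytes.NewReader(body))

		// single (non-array) requests fail to unmarshal into a slice and pass through
		var batch []json.RawMessage
		if err := json.Unmarshal(body, &batch); err == nil && len(batch) > max {
			w.WriteHeader(http.StatusRequestEntityTooLarge)
			fmt.Fprintf(w, "request batch size is too large (%d>%d)", len(batch), max)
			return
		}
		next.ServeHTTP(w, r)
	})
}
```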