diff --git a/.env b/.env index 6683e25..71c1871 100644 --- a/.env +++ b/.env @@ -33,6 +33,8 @@ PROXY_CONTAINER_DEBUG_PORT=2345 PROXY_HOST_DEBUG_PORT=2345 ##### E2E Testing Config +TEST_UNCONFIGURED_PROXY_PORT=7779 +TEST_UNCONFIGURED_PROXY_URL=http://localhost:7779 TEST_PROXY_SERVICE_EVM_RPC_URL=http://localhost:7777 TEST_PROXY_SERVICE_EVM_RPC_HOSTNAME=localhost:7777 TEST_PROXY_SERVICE_EVM_RPC_PRUNING_URL=http://localhost:7778 @@ -56,7 +58,7 @@ TEST_REDIS_ENDPOINT_URL=localhost:6379 ##### Kava Proxy Config # What port the proxy service listens on PROXY_SERVICE_PORT=7777 -LOG_LEVEL=TRACE +LOG_LEVEL=DEBUG HTTP_READ_TIMEOUT_SECONDS=30 HTTP_WRITE_TIMEOUT_SECONDS=60 # Address of the origin server to proxy all requests to @@ -66,6 +68,8 @@ PROXY_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-validator:8545,localhost:7 # otherwise, it falls back to the value in PROXY_BACKEND_HOST_URL_MAP PROXY_HEIGHT_BASED_ROUTING_ENABLED=true PROXY_PRUNING_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-pruning:8545,localhost:7778>http://kava-pruning:8545 +# PROXY_MAXIMUM_REQ_BATCH_SIZE is a proxy-enforced limit on the number of sub-requests in a batch +PROXY_MAXIMUM_REQ_BATCH_SIZE=100 # Configuration for the servcie to connect to it's database DATABASE_NAME=postgres DATABASE_ENDPOINT_URL=postgres:5432 diff --git a/architecture/MIDDLEWARE.MD b/architecture/MIDDLEWARE.MD index 034d1cf..ec5389b 100644 --- a/architecture/MIDDLEWARE.MD +++ b/architecture/MIDDLEWARE.MD @@ -20,49 +20,51 @@ Any modifications that the middleware function makes to the request or response The earlier the middleware is instantiated, the later it will run. For example the first middleware created by the proxy service is the middleware that will run after the request has been logged and proxied, thereby allowing it to access both the recorded request body and response body, and any context enrichment added by prior middleware. -```golang -service := ProxyService{} +https://github.com/Kava-Labs/kava-proxy-service/blob/847d7889bf5f37770d373d73cd4600a769ebd29c/service/service.go#L54-L110 -// create an http router for registering handlers for a given route -mux := http.NewServeMux() +## Middleware -// will run after the proxy middleware handler and is -// the final function called after all other middleware -// allowing it to access values added to the request context -// to do things like metric the response and cache the response -afterProxyFinalizer := createAfterProxyFinalizer(&service) +The middleware sequence for EVM requests to the proxy service: +![Middleware Sequence for Proxy Service](./images/proxy_service_middleware_sequence.jpg) -// create an http handler that will proxy any request to the specified URL -proxyMiddleware := createProxyRequestMiddleware(afterProxyFinalizer, config, serviceLogger) +### Decode Request Middleware -// create an http handler that will log the request to stdout -// this handler will run before the proxyMiddleware handler -requestLoggingMiddleware := createRequestLoggingMiddleware(proxyMiddleware, serviceLogger) +1. Captures the start time of the request for latency metric calculations +1. Attempts to decode the request (see the sketch below): + * As a single EVM request. If successful, forwards to Single Request Middleware Sequence with the request in the context. + * As a batch EVM request. If successful, forwards to Batch Processing Middleware with batch in context. + * On failure to decode, the request is sent down the Single Request Middleware Sequence, but with nothing in the context.
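In condensed form, the decode dispatch described above looks roughly like the sketch below (the `decodeDispatch` name and its `next`/`batchMdw` parameters are hypothetical; `createDecodeRequestMiddleware` in `service/middleware.go` is the real implementation, which also records the request start time and logs decode failures):

```golang
// condensed sketch of the decode dispatch; not the literal service code
func decodeDispatch(next, batchMdw http.HandlerFunc) http.HandlerFunc {
	return func(w http.ResponseWriter, r *http.Request) {
		rawBody, _ := io.ReadAll(r.Body)
		// repopulate the body for downstream consumers of the request
		r.Body = io.NopCloser(bytes.NewBuffer(rawBody))

		if single, err := decode.DecodeEVMRPCRequest(rawBody); err == nil {
			// single EVM request: forward with the decoded request in the context
			ctx := context.WithValue(r.Context(), DecodedRequestContextKey, single)
			next.ServeHTTP(w, r.WithContext(ctx))
			return
		}

		if batch, err := decode.DecodeEVMRPCRequestList(rawBody); err == nil && len(batch) > 0 {
			// batch EVM request: forward with the decoded batch in the context
			ctx := context.WithValue(r.Context(), DecodedBatchRequestContextKey, batch)
			batchMdw.ServeHTTP(w, r.WithContext(ctx))
			return
		}

		// failed to decode (or decoded an empty batch): fall through to the
		// single request sequence with nothing added to the context
		next.ServeHTTP(w, r)
	}
}
```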
-// register middleware chain as the default handler for any request to the proxy service -mux.HandleFunc("/", requestLoggingMiddleware) +### Single Request Middleware Sequence +If a single request is decoded (as opposed to a batch list), or the request fails to decode, it is forwarded down this middleware sequence. Additionally, each individual sub-request of a batch is routed through this sequence in order to leverage caching and metrics collection. -// create an http server for the caller to start on demand with a call to ProxyService.Run() -server := &http.Server{ - Addr: fmt.Sprintf(":%s", config.ProxyServicePort), - Handler: mux, -} -``` +This middleware sequence uses the decoded single request from the request context. -## Middleware +#### IsCached Middleware +The front part of the two-part caching middleware. Responsible for determining if an incoming request can be served from the cache. If it can, the cached response is put into the context. -### Request Logging Middleware +See [CACHING](./CACHING.md#iscachedmiddleware) for more details. -1. Logs the request body to stdout and stores a parsed version of the request body in the context key `X-KAVA-PROXY-DECODED-REQUEST-BODY` for use by other middleware. - -### Proxy Middleware +#### Proxy Middleware 1. Proxies the request to the configured backend origin server. +2. Times the roundtrip latency for the response from the backend origin server and stores the latency in the context key `X-KAVA-PROXY-ORIGIN-ROUNDTRIP-LATENCY-MILLISECONDS` for use by other middleware. -1. Times the roundtrip latency for the response from the backend origin server and stores the latency in the context key `X-KAVA-PROXY-ORIGIN-ROUNDTRIP-LATENCY-MILLISECONDS` for use by other middleware. +The Proxy middleware is responsible for writing the response to the requestor. Middleware that runs afterward does not block the response. See [Proxy Routing](./PROXY_ROUTING.md) for details on configuration and how requests are routed. -### After Proxy Middleware +#### Caching Middleware +Determines whether the response is cacheable and, if so, writes it to the cache when it is not already present. + +See [CACHING](./CACHING.md#cachingmiddleware) for more details. + +#### After Proxy Middleware 1. Parses the request body and latency from context key values and creates a request metric for the proxied request. + +### Batch Processing Middleware +1. Pulls the decoded batch out of the request context +2. Separates the batch into individual sub-requests +3. Routes each sub-request through the Single Request Middleware Sequence in order to leverage caching and metric creation. +4. Combines all sub-request responses into a single response to the client (see the example below).
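To make the batch flow concrete, here is a hypothetical client-side example (assuming the local dev setup from `.env`, where the proxy listens on `localhost:7777`). A JSON array goes in; a JSON array of sub-responses comes out, with each sub-request's id echoed back, plus a combined `X-Kava-Proxy-Cache-Status` header of `HIT`, `MISS`, or `PARTIAL`:

```golang
package main

import (
	"bytes"
	"fmt"
	"io"
	"net/http"
)

func main() {
	// two sub-requests in one batch; ids are preserved per sub-response
	batch := []byte(`[
		{"jsonrpc":"2.0","id":1,"method":"eth_getBlockByNumber","params":["0x1",false]},
		{"jsonrpc":"2.0","id":2,"method":"eth_getBlockByNumber","params":["0x2",false]}
	]`)

	resp, err := http.Post("http://localhost:7777", "application/json", bytes.NewBuffer(batch))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	// combined cache status for the whole batch: HIT, MISS, or PARTIAL
	fmt.Println(resp.Header.Get("X-Kava-Proxy-Cache-Status"))

	body, _ := io.ReadAll(resp.Body)
	fmt.Println(string(body)) // JSON array of sub-responses
}
```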
diff --git a/architecture/images/proxy_service_middleware_sequence.jpg b/architecture/images/proxy_service_middleware_sequence.jpg new file mode 100644 index 0000000..cc0ecf9 Binary files /dev/null and b/architecture/images/proxy_service_middleware_sequence.jpg differ diff --git a/ci.docker-compose.yml b/ci.docker-compose.yml index 5d3be4f..88c9797 100644 --- a/ci.docker-compose.yml +++ b/ci.docker-compose.yml @@ -31,5 +31,6 @@ services: EVM_QUERY_SERVICE_URL: https://evmrpc.internal.testnet.proxy.kava.io ports: - "${PROXY_HOST_PORT}:${PROXY_CONTAINER_PORT}" + - "${TEST_UNCONFIGURED_PROXY_PORT}:${PROXY_CONTAINER_PORT}" - "${PROXY_CONTAINER_EVM_RPC_PRUNING_PORT}:${PROXY_CONTAINER_PORT}" - "${PROXY_HOST_DEBUG_PORT}:${PROXY_CONTAINER_DEBUG_PORT}" diff --git a/config/config.go b/config/config.go index 6d91452..b244ce4 100644 --- a/config/config.go +++ b/config/config.go @@ -20,6 +20,7 @@ type Config struct { EnableHeightBasedRouting bool ProxyPruningBackendHostURLMapRaw string ProxyPruningBackendHostURLMap map[string]url.URL + ProxyMaximumBatchSize int EvmQueryServiceURL string DatabaseName string DatabaseEndpointURL string @@ -64,6 +65,8 @@ const ( PROXY_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY = "PROXY_BACKEND_HOST_URL_MAP" PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY = "PROXY_HEIGHT_BASED_ROUTING_ENABLED" PROXY_PRUNING_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY = "PROXY_PRUNING_BACKEND_HOST_URL_MAP" + PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY = "PROXY_MAXIMUM_REQ_BATCH_SIZE" + DEFAULT_PROXY_MAXIMUM_BATCH_SIZE = 500 PROXY_SERVICE_PORT_ENVIRONMENT_KEY = "PROXY_SERVICE_PORT" DATABASE_NAME_ENVIRONMENT_KEY = "DATABASE_NAME" DATABASE_ENDPOINT_URL_ENVIRONMENT_KEY = "DATABASE_ENDPOINT_URL" @@ -279,6 +282,7 @@ func ReadConfig() Config { EnableHeightBasedRouting: EnvOrDefaultBool(PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY, false), ProxyPruningBackendHostURLMapRaw: rawProxyPruningBackendHostURLMap, ProxyPruningBackendHostURLMap: parsedProxyPruningBackendHostURLMap, + ProxyMaximumBatchSize: EnvOrDefaultInt(PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY, DEFAULT_PROXY_MAXIMUM_BATCH_SIZE), DatabaseName: os.Getenv(DATABASE_NAME_ENVIRONMENT_KEY), DatabaseEndpointURL: os.Getenv(DATABASE_ENDPOINT_URL_ENVIRONMENT_KEY), DatabaseUserName: os.Getenv(DATABASE_USERNAME_ENVIRONMENT_KEY), diff --git a/decode/evm_rpc.go b/decode/evm_rpc.go index 51a832b..a50f4a3 100644 --- a/decode/evm_rpc.go +++ b/decode/evm_rpc.go @@ -254,6 +254,13 @@ func DecodeEVMRPCRequest(body []byte) (*EVMRPCRequestEnvelope, error) { return &request, err } +// DecodeEVMRPCRequestList attempts to decode raw bytes to a list of EVMRPCRequestEnvelopes +func DecodeEVMRPCRequestList(body []byte) ([]*EVMRPCRequestEnvelope, error) { + var request []*EVMRPCRequestEnvelope + err := json.Unmarshal(body, &request) + return request, err +} + // ExtractBlockNumberFromEVMRPCRequest attempts to extract the block number // associated with a request if // - the request is a valid evm rpc request diff --git a/docker-compose.yml b/docker-compose.yml index fc227e5..1731230 100644 --- a/docker-compose.yml +++ b/docker-compose.yml @@ -11,7 +11,7 @@ services: # run redis for proxy service to cache responses redis: - image: 'bitnami/redis:latest' + image: "bitnami/redis:latest" env_file: .env ports: - "${REDIS_HOST_PORT}:${REDIS_CONTAINER_PORT}" @@ -59,6 +59,7 @@ services: ports: - "${PROXY_HOST_PORT}:${PROXY_CONTAINER_PORT}" - "${PROXY_CONTAINER_EVM_RPC_PRUNING_PORT}:${PROXY_CONTAINER_PORT}" + - "${TEST_UNCONFIGURED_PROXY_PORT}:${PROXY_CONTAINER_PORT}" - 
"${PROXY_HOST_DEBUG_PORT}:${PROXY_CONTAINER_DEBUG_PORT}" cap_add: - SYS_PTRACE # Allows for attaching debugger to process in this container diff --git a/main_batch_test.go b/main_batch_test.go new file mode 100644 index 0000000..68028ab --- /dev/null +++ b/main_batch_test.go @@ -0,0 +1,299 @@ +package main_test + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "net/http" + "strconv" + "testing" + "time" + + "github.com/kava-labs/kava-proxy-service/clients/database" + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/require" +) + +func buildBigBatch(n int, sameBlock bool) []*decode.EVMRPCRequestEnvelope { + batch := make([]*decode.EVMRPCRequestEnvelope, 0, n) + // create n requests + for i := 0; i < n; i++ { + block := "0x1" + if !sameBlock { + block = fmt.Sprintf("0x%s", strconv.FormatInt(int64(i)+1, 16)) + } + batch = append(batch, &decode.EVMRPCRequestEnvelope{ + JSONRPCVersion: "2.0", + ID: i, + Method: "eth_getBlockByNumber", + Params: []interface{}{block, false}, + }) + } + return batch +} + +func TestE2ETest_ValidBatchEvmRequests(t *testing.T) { + redisClient := redis.NewClient(&redis.Options{ + Addr: redisURL, + Password: redisPassword, + DB: 0, + }) + cleanUpRedis(t, redisClient) + expectKeysNum(t, redisClient, 0) + + db, err := database.NewPostgresClient(databaseConfig) + require.NoError(t, err) + cleanMetricsDb(t, db) + + // NOTE! ordering matters for these tests! earlier request responses may end up in the cache. + testCases := []struct { + name string + req []*decode.EVMRPCRequestEnvelope + expectedCacheHeader string + expectedErrStatus int + expectedNumMetrics int + }{ + { + name: "first request, valid & not coming from the cache", + req: []*decode.EVMRPCRequestEnvelope{ + { + JSONRPCVersion: "2.0", + ID: "magic!", + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x1", false}, + }, + }, + expectedCacheHeader: cachemdw.CacheMissHeaderValue, + expectedNumMetrics: 1, + }, + { + name: "multiple requests, valid & none coming from the cache", + req: []*decode.EVMRPCRequestEnvelope{ + { + JSONRPCVersion: "2.0", + ID: "magic!", + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x2", false}, + }, + { + JSONRPCVersion: "2.0", + ID: 123456, + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x3", false}, + }, + }, + expectedCacheHeader: cachemdw.CacheMissHeaderValue, + expectedNumMetrics: 2, + }, + { + name: "multiple requests, valid & some coming from the cache", + req: []*decode.EVMRPCRequestEnvelope{ + { + JSONRPCVersion: "2.0", + ID: "magic!", + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x2", false}, + }, + { + JSONRPCVersion: "2.0", + ID: 123456, + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x4", false}, + }, + }, + expectedCacheHeader: cachemdw.CachePartialHeaderValue, + expectedNumMetrics: 2, + }, + { + name: "multiple requests, valid & all coming from the cache", + req: []*decode.EVMRPCRequestEnvelope{ + { + JSONRPCVersion: "2.0", + ID: "magic!", + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x2", false}, + }, + { + JSONRPCVersion: "2.0", + ID: nil, + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x1", false}, + }, + { + JSONRPCVersion: "2.0", + ID: 123456, + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x3", false}, + }, + }, + expectedCacheHeader: cachemdw.CacheHitHeaderValue, + expectedNumMetrics: 3, + }, + { + name: "empty request", + req: 
[]*decode.EVMRPCRequestEnvelope{nil}, // <-- empty! + expectedCacheHeader: cachemdw.CacheMissHeaderValue, + expectedNumMetrics: 0, + }, + { + name: "empty & non-empty requests, partial cache hit", + req: []*decode.EVMRPCRequestEnvelope{ + nil, // <-- empty! + { + JSONRPCVersion: "2.0", + ID: "this block is in the cache", + Method: "eth_getBlockByNumber", + Params: []interface{}{"0x1", false}, + }, + }, + expectedCacheHeader: cachemdw.CachePartialHeaderValue, + expectedNumMetrics: 1, + }, + { + name: "empty & non-empty requests, cache miss", + req: []*decode.EVMRPCRequestEnvelope{ + nil, // <-- empty! + { + JSONRPCVersion: "2.0", + ID: "this block is NOT in the cache", + Method: "eth_getBlockByNumber", + Params: []interface{}{"0xa", false}, + }, + }, + expectedCacheHeader: cachemdw.CacheMissHeaderValue, + expectedNumMetrics: 1, + }, + { + name: "big-as-can-be batch, some cache hits", + req: buildBigBatch(proxyServiceMaxBatchSize, false), + expectedCacheHeader: cachemdw.CachePartialHeaderValue, + expectedNumMetrics: proxyServiceMaxBatchSize, + }, + { + name: "big-as-can-be batch, all cache hits", + req: buildBigBatch(proxyServiceMaxBatchSize, true), + expectedCacheHeader: cachemdw.CacheHitHeaderValue, + expectedNumMetrics: proxyServiceMaxBatchSize, + }, + { + name: "too-big batch => responds 413", + req: buildBigBatch(proxyServiceMaxBatchSize+1, false), + expectedCacheHeader: cachemdw.CacheHitHeaderValue, + expectedErrStatus: http.StatusRequestEntityTooLarge, + expectedNumMetrics: 0, + }, + } + + for _, tc := range testCases { + startTime := time.Now() + t.Run(tc.name, func(t *testing.T) { + reqInJSON, err := json.Marshal(tc.req) + require.NoError(t, err) + + resp, err := http.Post(proxyServiceURL, "application/json", bytes.NewBuffer(reqInJSON)) + require.NoError(t, err) + + if tc.expectedErrStatus != 0 { + require.Equal(t, tc.expectedErrStatus, resp.StatusCode, "unexpected response status") + return + } + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err, "failed to read response body") + + var decoded []*jsonRpcResponse + err = json.Unmarshal(body, &decoded) + require.NoError(t, err, "failed to unmarshal response into array of responses") + + // expect same number of responses as requests + require.Len(t, decoded, len(tc.req)) + + // expect matching ids + for i, d := range decoded { + var reqId interface{} = nil + if tc.req[i] != nil { + reqId = tc.req[i].ID + } + // EqualValues here because json ints unmarshal as float64s + require.EqualValues(t, reqId, d.Id) + } + + // check expected cache status header + require.Equal(t, tc.expectedCacheHeader, resp.Header.Get(cachemdw.CacheHeaderKey)) + + // verify CORS header + require.Equal(t, resp.Header[accessControlAllowOriginHeaderName], []string{"*"}) + + // wait for all metrics to be created. 
+ // besides verification, waiting for the metrics ensures future tests don't fail b/c metrics are being processed + waitForMetricsInWindow(t, tc.expectedNumMetrics, db, startTime, []string{}) + }) + } + + // clear all metrics & cache state to make future metrics tests less finicky + // (more data increases the read/write to db & redis, and these tests make many db & cache entries) + cleanMetricsDb(t, db) + cleanUpRedis(t, redisClient) +} + +func TestE2ETest_BatchEvmRequestErrorHandling(t *testing.T) { + t.Run("no backend configured (bad gateway error)", func(t *testing.T) { + validReq := []*decode.EVMRPCRequestEnvelope{ + newJsonRpcRequest(123, "eth_getBlockByNumber", []interface{}{"0x1", false}), + newJsonRpcRequest("another-req", "eth_getBlockByNumber", []interface{}{"0x2", false}), + } + reqInJSON, err := json.Marshal(validReq) + require.NoError(t, err) + resp, err := http.Post(proxyUnconfiguredUrl, "application/json", bytes.NewBuffer(reqInJSON)) + + require.NoError(t, err) + require.Equal(t, http.StatusBadGateway, resp.StatusCode) + }) + + t.Run("empty batch", func(t *testing.T) { + emptyReq := []*decode.EVMRPCRequestEnvelope{} + reqInJSON, err := json.Marshal(emptyReq) + require.NoError(t, err) + + resp, err := http.Post(proxyServiceURL, "application/json", bytes.NewBuffer(reqInJSON)) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err, "failed to read response body") + + var decoded *jsonRpcResponse // <--- NOT an array response + err = json.Unmarshal(body, &decoded) + require.NoError(t, err, "failed to unmarshal response into a single response") + + require.Equal(t, -32600, decoded.JsonRpcError.Code) + require.Equal(t, "empty batch", decoded.JsonRpcError.Message) + }) + + t.Run("unsupported method", func(t *testing.T) { + resp, err := http.Get(proxyServiceURL) // <--- GET, not POST + require.NoError(t, err) + require.Equal(t, http.StatusMethodNotAllowed, resp.StatusCode) + }) + + t.Run("invalid JSON", func(t *testing.T) { + req := []byte(`[{id:"almost valid json (missing double quotes around id!)"}]`) + resp, err := http.Post(proxyServiceURL, "application/json", bytes.NewBuffer(req)) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + body, err := io.ReadAll(resp.Body) + require.NoError(t, err, "failed to read response body") + + var decoded *jsonRpcResponse // <--- NOT an array response + err = json.Unmarshal(body, &decoded) + require.NoError(t, err, "failed to unmarshal response into a single response") + + require.Equal(t, -32700, decoded.JsonRpcError.Code) + require.Equal(t, "parse error", decoded.JsonRpcError.Message) + }) +} diff --git a/main_test.go b/main_test.go index 6daf820..4758d1b 100644 --- a/main_test.go +++ b/main_test.go @@ -28,6 +28,7 @@ import ( "github.com/stretchr/testify/require" "github.com/kava-labs/kava-proxy-service/clients/database" + "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/decode" "github.com/kava-labs/kava-proxy-service/logging" "github.com/kava-labs/kava-proxy-service/service" @@ -55,11 +56,14 @@ var ( return logger }() + proxyUnconfiguredUrl = os.Getenv("TEST_UNCONFIGURED_PROXY_URL") + proxyServiceURL = os.Getenv("TEST_PROXY_SERVICE_EVM_RPC_URL") proxyServiceHostname = os.Getenv("TEST_PROXY_SERVICE_EVM_RPC_HOSTNAME") proxyServicePruningURL = os.Getenv("TEST_PROXY_SERVICE_EVM_RPC_PRUNING_URL") proxyServiceHeightBasedRouting, _ = 
strconv.ParseBool(os.Getenv("TEST_PROXY_HEIGHT_BASED_ROUTING_ENABLED")) + proxyServiceMaxBatchSize = config.EnvOrDefaultInt(config.PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY, config.DEFAULT_PROXY_MAXIMUM_BATCH_SIZE) databaseURL = os.Getenv("TEST_DATABASE_ENDPOINT_URL") databasePassword = os.Getenv("DATABASE_PASSWORD") @@ -81,12 +85,9 @@ var ( ) // lookup all the request metrics in the database paging as necessary -// search for any request metrics between starTime and time.Now() for particular request methods +// search for any request metrics between startTime and time.Now() for particular request methods +// if testedmethods is empty, all metrics in the timeframe are returned. func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Time, testedmethods []string) []database.ProxiedRequestMetric { - // on fast machines the expected metrics haven't finished being created by the time they are being queried. - // hackily sleep for 10 micro seconds & then get current time - adjustment := 10 * time.Microsecond - time.Sleep(adjustment) endTime := time.Now() var nextCursor int64 @@ -111,7 +112,14 @@ // iterate in reverse order to start checking the most recent request metrics first for i := len(proxiedRequestMetrics) - 1; i >= 0; i-- { requestMetric := proxiedRequestMetrics[i] - if requestMetric.RequestTime.After(startTime) && requestMetric.RequestTime.Before(endTime) { + isBetween := requestMetric.RequestTime.After(startTime) && requestMetric.RequestTime.Before(endTime) + if isBetween || requestMetric.RequestTime.Equal(startTime) || requestMetric.RequestTime.Equal(endTime) { + // collect all metrics if testedmethods = [] + if len(testedmethods) == 0 { + requestMetricsDuringRequestWindow = append(requestMetricsDuringRequestWindow, requestMetric) + continue + } + // collect metrics for any desired methods for _, testedMethod := range testedmethods { if requestMetric.MethodName == testedMethod { requestMetricsDuringRequestWindow = append(requestMetricsDuringRequestWindow, requestMetric) @@ -123,6 +131,34 @@ func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Ti return requestMetricsDuringRequestWindow } +// tests for metrics can frequently fail because the check for expected metrics runs +// before the writes to the metrics database have completed. +// reduce that flakiness without unnecessary sleeping by polling for the expected number of metrics +// and only failing if the poll times out. 
+func waitForMetricsInWindow( + t *testing.T, + expected int, + db database.PostgresClient, + startTime time.Time, + testedmethods []string, +) (metrics []database.ProxiedRequestMetric) { + timeoutMin := 1 * time.Second + // scale the timeout by the number of expected requests, or at least 1 second + timeout := time.Duration(expected+1) * 100 * time.Millisecond + if timeout < timeoutMin { + timeout = timeoutMin + } + + // besides verification, waiting for the metrics ensures future tests don't fail b/c metrics are being processed + require.Eventually(t, func() bool { + metrics = findMetricsInWindowForMethods(db, startTime, []string{}) + return len(metrics) >= expected + }, timeout, time.Millisecond, + fmt.Sprintf("failed to find %d metrics in %f seconds from start %s", expected, timeout.Seconds(), startTime)) + + return +} + func TestE2ETestProxyReturnsNonZeroLatestBlockHeader(t *testing.T) { client, err := ethclient.Dial(proxyServiceURL) @@ -172,13 +208,10 @@ func TestE2ETestProxyCreatesRequestMetricForEachRequest(t *testing.T) { require.NoError(t, err) - requestMetricsDuringRequestWindow := findMetricsInWindowForMethods(databaseClient, startTime, []string{testEthMethodName}) - - require.Greater(t, len(requestMetricsDuringRequestWindow), 0) + requestMetricsDuringRequestWindow := waitForMetricsInWindow(t, 1, databaseClient, startTime, []string{testEthMethodName}) requestMetricDuringRequestWindow := requestMetricsDuringRequestWindow[0] - require.Greater(t, requestMetricDuringRequestWindow.ResponseLatencyMilliseconds, int64(0)) require.Equal(t, requestMetricDuringRequestWindow.MethodName, testEthMethodName) require.Equal(t, requestMetricDuringRequestWindow.Hostname, proxyServiceHostname) require.NotEqual(t, requestMetricDuringRequestWindow.RequestIP, "") @@ -215,9 +248,10 @@ func TestE2ETestProxyTracksBlockNumberForEth_getBlockByNumberRequest(t *testing. require.NoError(t, err) - requestMetricsDuringRequestWindow := findMetricsInWindowForMethods(databaseClient, startTime, []string{testEthMethodName}) + requestMetricsDuringRequestWindow := waitForMetricsInWindow( + t, 1, databaseClient, startTime, []string{testEthMethodName}, + ) - require.Greater(t, len(requestMetricsDuringRequestWindow), 0) requestMetricDuringRequestWindow := requestMetricsDuringRequestWindow[0] require.Equal(t, requestMetricDuringRequestWindow.MethodName, testEthMethodName) @@ -244,9 +278,10 @@ func TestE2ETestProxyTracksBlockTagForEth_getBlockByNumberRequest(t *testing.T) require.NoError(t, err) - requestMetricsDuringRequestWindow := findMetricsInWindowForMethods(databaseClient, startTime, []string{testEthMethodName}) + requestMetricsDuringRequestWindow := waitForMetricsInWindow( + t, 1, databaseClient, startTime, []string{testEthMethodName}, + ) - require.Greater(t, len(requestMetricsDuringRequestWindow), 0) requestMetricDuringRequestWindow := requestMetricsDuringRequestWindow[0] require.Equal(t, requestMetricDuringRequestWindow.MethodName, testEthMethodName) @@ -303,10 +338,9 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockNumberParam(t *testing. 
// eth_call _, _ = client.CallContract(testContext, ethereum.CallMsg{}, requestBlockNumber) - requestMetricsDuringRequestWindow := findMetricsInWindowForMethods(databaseClient, startTime, testedmethods) - - // should be the above but geth doesn't implement client methods for all of them - require.GreaterOrEqual(t, len(requestMetricsDuringRequestWindow), 7) + requestMetricsDuringRequestWindow := waitForMetricsInWindow( + t, 7, databaseClient, startTime, testedmethods, + ) for _, requestMetricDuringRequestWindow := range requestMetricsDuringRequestWindow { require.NotNil(t, *requestMetricDuringRequestWindow.BlockNumber) @@ -356,11 +390,9 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockHashParam(t *testing.T) // eth_getTransactionByBlockHashAndIndex _, _ = client.TransactionInBlock(testContext, requestBlockHash, 0) - requestMetricsDuringRequestWindow := findMetricsInWindowForMethods(databaseClient, startTime, testedmethods) - - // require.GreaterOrEqual(t, len(requestMetricsDuringRequestWindow), len(testedmethods)) - // should be the above but geth doesn't implement client methods for all of them - require.GreaterOrEqual(t, len(requestMetricsDuringRequestWindow), 3) + requestMetricsDuringRequestWindow := waitForMetricsInWindow( + t, 3, databaseClient, startTime, testedmethods, + ) for _, requestMetricDuringRequestWindow := range requestMetricsDuringRequestWindow { require.NotNil(t, *requestMetricDuringRequestWindow.BlockNumber) @@ -441,10 +473,9 @@ func TestE2ETest_HeightBasedRouting(t *testing.T) { err := rpc.Call(nil, tc.method, tc.params...) require.NoError(t, err) - metrics := findMetricsInWindowForMethods(databaseClient, startTime, []string{tc.method}) + metrics := waitForMetricsInWindow(t, 1, databaseClient, startTime, []string{tc.method}) require.Len(t, metrics, 1) - fmt.Printf("%+v\n", metrics[0]) require.Equal(t, tc.method, metrics[0].MethodName) require.Equal(t, tc.expectRoute, metrics[0].ResponseBackend) }) @@ -494,6 +525,7 @@ func TestE2ETestCachingMdwWithBlockNumberParam(t *testing.T) { expectKeysNum(t, redisClient, tc.keysNum) expectedKey := "local-chain:evm-request:eth_getBlockByNumber:sha256:d08b426164eacf6646fb1817403ec0af5d37869a0f32a01ebfab3096fa4999be" containsKey(t, redisClient, expectedKey) + require.Equal(t, cacheMissResp.Header[accessControlAllowOriginHeaderName], []string{"*"}) // eth_getBlockByNumber - cache HIT cacheHitResp := mkJsonRpcRequest(t, proxyServiceURL, 1, tc.method, tc.params) @@ -673,7 +705,7 @@ func TestE2ETestCachingMdwWithBlockNumberParam_Metrics(t *testing.T) { } // get metrics between startTime & now for eth_getBlockByNumber requests - filteredMetrics := findMetricsInWindowForMethods(db, startTime, []string{"eth_getBlockByNumber"}) + filteredMetrics := waitForMetricsInWindow(t, 4, db, startTime, []string{"eth_getBlockByNumber"}) // we expect 4 metrics, 2 of them are cache hits and two of them are cache misses require.Len(t, filteredMetrics, 4) @@ -1130,6 +1162,11 @@ func cleanUpRedis(t *testing.T, redisClient *redis.Client) { } } +func cleanMetricsDb(t *testing.T, db database.PostgresClient) { + _, err := db.Exec("TRUNCATE proxied_request_metrics;") + require.NoError(t, err) +} + func mkJsonRpcRequest(t *testing.T, proxyServiceURL string, id interface{}, method string, params []interface{}) *http.Response { req := newJsonRpcRequest(id, method, params) reqInJSON, err := json.Marshal(req) @@ -1142,19 +1179,12 @@ func mkJsonRpcRequest(t *testing.T, proxyServiceURL string, id interface{}, meth return resp } -type jsonRpcRequest struct { - 
JsonRpc string `json:"jsonrpc"` - Id interface{} `json:"id"` - Method string `json:"method"` - Params []interface{} `json:"params"` -} - -func newJsonRpcRequest(id interface{}, method string, params []interface{}) *jsonRpcRequest { - return &jsonRpcRequest{ - JsonRpc: "2.0", - Id: id, - Method: method, - Params: params, +func newJsonRpcRequest(id interface{}, method string, params []interface{}) *decode.EVMRPCRequestEnvelope { + return &decode.EVMRPCRequestEnvelope{ + JSONRPCVersion: "2.0", + ID: id, + Method: method, + Params: params, } } diff --git a/routines/metric_partitioning_test.go b/routines/metric_partitioning_test.go index 458362f..436b96f 100644 --- a/routines/metric_partitioning_test.go +++ b/routines/metric_partitioning_test.go @@ -34,7 +34,7 @@ func TestE2ETestMetricPartitioningRoutinePrefillsExpectedPartitionsAfterStartupD // prepare time.Sleep(time.Duration(MetricPartitioningRoutineDelayFirstRunSeconds) * time.Second) - expectedPartitions, err := partitionsForPeriod(time.Now(), int(configuredPrefillDays)) + expectedPartitions, err := partitionsForPeriod(time.Now().UTC(), int(configuredPrefillDays)) assert.Nil(t, err) diff --git a/service/batchmdw/batch_processor.go b/service/batchmdw/batch_processor.go new file mode 100644 index 0000000..e88a943 --- /dev/null +++ b/service/batchmdw/batch_processor.go @@ -0,0 +1,142 @@ +package batchmdw + +import ( + "bytes" + "encoding/json" + "net/http" + "sync" + + "github.com/kava-labs/kava-proxy-service/service/cachemdw" +) + +// BatchProcessor makes multiple requests to the underlying handler and then combines all the +// responses into a single response. +// It assumes all individual responses are valid JSON. The responses are then combined into a single JSON array. +type BatchProcessor struct { + handler http.HandlerFunc + requests []*http.Request + responses []*bytes.Buffer + header http.Header + cacheHits int + status int + mu sync.Mutex +} + +// NewBatchProcessor creates a BatchProcessor for combining the responses of reqs to the handler +func NewBatchProcessor(handler http.HandlerFunc, reqs []*http.Request) *BatchProcessor { + return &BatchProcessor{ + handler: handler, + requests: reqs, + responses: make([]*bytes.Buffer, len(reqs)), + header: nil, + status: http.StatusOK, + mu: sync.Mutex{}, + } +} + +// RequestAndServe concurrently sends each request to the underlying handler +// Responses are then collated into a JSON array and written to the ResponseWriter +func (bp *BatchProcessor) RequestAndServe(w http.ResponseWriter) error { + wg := sync.WaitGroup{} + for i, r := range bp.requests { + wg.Add(1) + + go func(idx int, req *http.Request) { + + buf := new(bytes.Buffer) + frw := newFakeResponseWriter(buf, bp.setErrStatus) + bp.handler.ServeHTTP(frw, req) + + bp.setResponse(idx, buf) + bp.applyHeaders(frw.header) + + wg.Done() + }(i, r) + } + + wg.Wait() + + // write all headers + for k, v := range bp.header { + for _, val := range v { + w.Header().Set(k, val) + } + } + + // write cache hit header based on results of all requests + w.Header().Set(cachemdw.CacheHeaderKey, cacheHitValue(len(bp.requests), bp.cacheHits)) + + // return error status if any sub-request returned a non-200 response + if bp.status != http.StatusOK { + w.WriteHeader(bp.status) + w.Write(nil) + return nil + } + + // marshal results into a JSON array + rawMessages := make([]json.RawMessage, 0, len(bp.requests)) + for _, r := range bp.responses { + rawMessages = append(rawMessages, json.RawMessage(r.Bytes())) + } + res, err := json.Marshal(rawMessages) + if err != nil { 
+ return err + } + + w.WriteHeader(http.StatusOK) + w.Write(res) + + return nil +} + +// setResponse is a thread-safe method to set the response for the query with index idx +func (bp *BatchProcessor) setResponse(idx int, res *bytes.Buffer) { + bp.mu.Lock() + defer bp.mu.Unlock() + bp.responses[idx] = res +} + +// applyHeaders is a thread-safe method for combining new response headers with existing results. +// the headers of the first response are used, except for Content-Length and the cache hit status. +// Cache hits are tracked so a representative value can be set after all responses are received. +func (bp *BatchProcessor) applyHeaders(h http.Header) { + bp.mu.Lock() + defer bp.mu.Unlock() + + // initialize all headers with the value of the first response + if bp.header == nil { + bp.header = h.Clone() + // clear content length, will be set by actual Write to client + // must be cleared in order to prevent premature end of client read + bp.header.Del("Content-Length") + // clear cache hit header; a combined value is set once all responses are received + bp.header.Del(cachemdw.CacheHeaderKey) + } + + // track cache hits + if cachemdw.IsCacheHitHeaders(h) { + bp.cacheHits += 1 + } +} + +// setErrStatus tracks an error status code if any request returns a non-200 response +func (bp *BatchProcessor) setErrStatus(status int, _ http.Header, _ *bytes.Buffer) { + bp.mu.Lock() + defer bp.mu.Unlock() + bp.status = status +} + +// cacheHitValue determines the value for the combined response's CacheHeader +func cacheHitValue(totalNum, cacheHits int) string { + // NOTE: middleware assumes non-zero batch length. + // totalNum should never be 0. if it is, this will indicate a cache MISS. + if cacheHits == 0 || totalNum == 0 { + // case 1. no results from cache => MISS + return cachemdw.CacheMissHeaderValue + } else if cacheHits == totalNum { + // case 2: all results from cache => HIT + return cachemdw.CacheHitHeaderValue + } + // case 3: some results from cache => PARTIAL + return cachemdw.CachePartialHeaderValue +} diff --git a/service/batchmdw/batch_processor_test.go b/service/batchmdw/batch_processor_test.go new file mode 100644 index 0000000..42ab445 --- /dev/null +++ b/service/batchmdw/batch_processor_test.go @@ -0,0 +1,47 @@ +package batchmdw + +import ( + "testing" + + "github.com/kava-labs/kava-proxy-service/service/cachemdw" + "github.com/stretchr/testify/require" +) + +func TestUnitTest_cacheHitValue(t *testing.T) { + for _, tc := range []struct { + name string + total int + hits int + expected string + }{ + { + name: "no hits => MISS", + total: 5, + hits: 0, + expected: cachemdw.CacheMissHeaderValue, + }, + { + name: "all hits => HIT", + total: 5, + hits: 5, + expected: cachemdw.CacheHitHeaderValue, + }, + { + name: "some hits => PARTIAL", + total: 5, + hits: 3, + expected: cachemdw.CachePartialHeaderValue, + }, + { + name: "invalid 0 case => MISS", + total: 0, + hits: 0, + expected: cachemdw.CacheMissHeaderValue, + }, + } { + t.Run(tc.name, func(t *testing.T) { + actual := cacheHitValue(tc.total, tc.hits) + require.Equal(t, tc.expected, actual) + }) + } +} diff --git a/service/batchmdw/batchmdw.go b/service/batchmdw/batchmdw.go new file mode 100644 index 0000000..ed0a729 --- /dev/null +++ b/service/batchmdw/batchmdw.go @@ -0,0 +1,83 @@ +package batchmdw + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "net/http" + + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/logging" +) + +// BatchMiddlewareConfig holds the necessary configuration options for the Batch 
Processing Middleware +type BatchMiddlewareConfig struct { + ServiceLogger *logging.ServiceLogger + + ContextKeyDecodedRequestBatch string + ContextKeyDecodedRequestSingle string + MaximumBatchSize int +} + +// CreateBatchProcessingMiddleware handles batch EVM requests. +// The batched request is pulled from the context. +// Then, each request is proxied via the singleRequestHandler +// and the responses are collated into a single result which is served to the client. +func CreateBatchProcessingMiddleware( + singleRequestHandler http.HandlerFunc, + config *BatchMiddlewareConfig, +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + batch := r.Context().Value(config.ContextKeyDecodedRequestBatch) + batchReq, ok := (batch).([]*decode.EVMRPCRequestEnvelope) + if !ok { + // this should only happen if the service is misconfigured. + // the DecodeRequestMiddleware should only route to BatchProcessingMiddleware + // if it successfully decodes a non-zero length batch of EVM requests. + config.ServiceLogger.Error().Msg("BatchProcessingMiddleware expected batch EVM request in context but found none") + // if we can't get decoded request then assign it empty structure to avoid panics + batchReq = []*decode.EVMRPCRequestEnvelope{} + } + if len(batchReq) > config.MaximumBatchSize { + config.ServiceLogger.Debug().Int("size", len(batchReq)).Int("max allowed", config.MaximumBatchSize).Msg("request batch size too large") + w.WriteHeader(http.StatusRequestEntityTooLarge) + w.Write([]byte(fmt.Sprintf("request batch size is too large (%d>%d)", len(batchReq), config.MaximumBatchSize))) + return + } + + config.ServiceLogger.Trace().Any("batch", batchReq).Msg("[BatchProcessingMiddleware] process EVM batch request") + + reqs := make([]*http.Request, 0, len(batchReq)) + for _, single := range batchReq { + // proxy service middlewares expect decoded context key to not be set if the request is nil + // not setting it ensures no nil pointer panics if `null` is included in batch array of requests + singleRequestContext := r.Context() + if single != nil { + singleRequestContext = context.WithValue(r.Context(), config.ContextKeyDecodedRequestSingle, single) + } + + body, err := json.Marshal(single) + if err != nil { + // this shouldn't happen b/c we are marshaling something we unmarshaled. + config.ServiceLogger.Error().Err(err).Any("request", single).Msg("[BatchProcessingMiddleware] unable to marshal request in batch") + // if it does happen, degrade gracefully by skipping request. + continue + } + + // build request as if it's the only one being requested. + req, err := http.NewRequestWithContext(singleRequestContext, r.Method, r.URL.String(), bytes.NewBuffer(body)) + if err != nil { + panic(fmt.Sprintf("failed to build sub-request: %s", err)) + } + req.Host = r.Host + req.Header = r.Header + + reqs = append(reqs, req) + } + + // process all requests and respond with results in an array + batchProcessor := NewBatchProcessor(singleRequestHandler, reqs) + batchProcessor.RequestAndServe(w) + } +} diff --git a/service/batchmdw/doc.go b/service/batchmdw/doc.go new file mode 100644 index 0000000..c23a7ca --- /dev/null +++ b/service/batchmdw/doc.go @@ -0,0 +1,16 @@ +// Package batchmdw is responsible for the middleware used to handle batch requests. + +// The primary export is CreateBatchProcessingMiddleware which separates each individual request +// in the batch and proxies it as if it were a single request. +// The responses are then combined into a single JSON array before being sent to the client. 
+ +// A best effort is made to forward the appropriate response headers. +// The headers from the response for the first request are used with the exception of: +// - Content-Length, which is dropped to ensure client reads whole response +// - The cache status header set by cachemdw, which is updated to reflect the cache-hit status of _all_ requests. + +// The cache status header will be set to: +// - `HIT` when all requests are cache hits +// - `MISS` when all requests are cache misses +// - `PARTIAL` when there is a mix of cache hits and misses +package batchmdw diff --git a/service/batchmdw/fake_response_writer.go b/service/batchmdw/fake_response_writer.go new file mode 100644 index 0000000..036ea14 --- /dev/null +++ b/service/batchmdw/fake_response_writer.go @@ -0,0 +1,52 @@ +package batchmdw + +import ( + "bytes" + "net/http" +) + +type ErrorHandler = func(status int, headers http.Header, body *bytes.Buffer) + +// fakeResponseWriter is a custom implementation of http.ResponseWriter that writes all content +// to a buffer. +type fakeResponseWriter struct { + // body is the response body for the current request + body *bytes.Buffer + // header is the response headers for the current request + header http.Header + // onErrorHandler is a method for handling non-OK status responses + onErrorHandler ErrorHandler +} + +var _ http.ResponseWriter = &fakeResponseWriter{} + +// newFakeResponseWriter creates a new fakeResponseWriter that wraps the provided buffer. +func newFakeResponseWriter(buf *bytes.Buffer, onErrorHandler ErrorHandler) *fakeResponseWriter { + return &fakeResponseWriter{ + header: make(http.Header), + body: buf, + onErrorHandler: onErrorHandler, + } +} + +// Write implements the Write method of http.ResponseWriter +// it overrides the Write method to capture the response content for the current request +func (w *fakeResponseWriter) Write(b []byte) (int, error) { + // Write to the buffer + w.body.Write(b) + return len(b), nil +} + +// Header implements the Header method of http.ResponseWriter +// it overrides the Header method to capture the response headers for the current request +func (w *fakeResponseWriter) Header() http.Header { + return w.header +} + +// WriteHeader implements the WriteHeader method of http.ResponseWriter +// it overrides the WriteHeader method to prevent proxied requests from having finalized headers +func (w *fakeResponseWriter) WriteHeader(status int) { + if status != http.StatusOK { + w.onErrorHandler(status, w.header, w.body) + } +} diff --git a/service/cachemdw/cache.go b/service/cachemdw/cache.go index cc7ce50..8110d5b 100644 --- a/service/cachemdw/cache.go +++ b/service/cachemdw/cache.go @@ -80,6 +80,13 @@ func IsCacheable( logger *logging.ServiceLogger, req *decode.EVMRPCRequestEnvelope, ) bool { + // technically, we _could_ cache the "invalid request" response for `null` requests... + // however, doing so will result in different than expected responses for batch requests + // ie. 
{error} response vs [{error}] (the not-list is expected) + if req == nil { + return false + } + if req.Method == "" { return false } diff --git a/service/cachemdw/is_cached_middleware.go b/service/cachemdw/is_cached_middleware.go index e41b4c9..24e6887 100644 --- a/service/cachemdw/is_cached_middleware.go +++ b/service/cachemdw/is_cached_middleware.go @@ -12,9 +12,10 @@ const ( CachedContextKey = "X-KAVA-PROXY-CACHED" ResponseContextKey = "X-KAVA-PROXY-RESPONSE" - CacheHeaderKey = "X-Kava-Proxy-Cache-Status" - CacheHitHeaderValue = "HIT" - CacheMissHeaderValue = "MISS" + CacheHeaderKey = "X-Kava-Proxy-Cache-Status" + CacheHitHeaderValue = "HIT" + CacheMissHeaderValue = "MISS" + CachePartialHeaderValue = "PARTIAL" ) // IsCachedMiddleware returns kava-proxy-service compatible middleware which works in the following way: @@ -83,3 +84,9 @@ func IsRequestCached(ctx context.Context) bool { cached, ok := ctx.Value(CachedContextKey).(bool) return ok && cached } + +// IsCacheHitHeaders returns true when the passed in response headers are for a request that +// came from the cache. +func IsCacheHitHeaders(header http.Header) bool { + return header.Get(CacheHeaderKey) == CacheHitHeaderValue +} diff --git a/service/middleware.go b/service/middleware.go index 5b19e72..342e703 100644 --- a/service/middleware.go +++ b/service/middleware.go @@ -22,6 +22,7 @@ const ( DefaultAnonymousUserAgent = "anon" // Service defined context keys DecodedRequestContextKey = "X-KAVA-PROXY-DECODED-REQUEST-BODY" + DecodedBatchRequestContextKey = "X-KAVA-PROXY-DECODED-BATCH-REQUEST-BODY" OriginRoundtripLatencyMillisecondsKey = "X-KAVA-PROXY-ORIGIN-ROUNDTRIP-LATENCY-MILLISECONDS" RequestStartTimeContextKey = "X-KAVA-PROXY-REQUEST-START-TIME" RequestHostnameContextKey = "X-KAVA-PROXY-REQUEST-HOSTNAME" @@ -83,60 +84,70 @@ func (w bodySaverResponseWriter) Write(b []byte) (int, error) { w.serviceLogger.Trace().Msg("response body is empty, skipping after request interceptors") } - size, err := w.ResponseWriter.Write(b) - - return size, err + return w.ResponseWriter.Write(b) } -// createRequestLoggingMiddleware returns a handler that logs any request to stdout -// and if able to decode the request to a known type adds it as a context key -// To use the decoded request body, get the value from the context and then -// use type assertion to EVMRPCRequestEnvelope. With this middleware, the request body -// can be read once, and then accessed by all future middleware and the final -// http handler. 
-func createRequestLoggingMiddleware(h http.HandlerFunc, serviceLogger *logging.ServiceLogger) http.HandlerFunc { +// createDecodeRequestMiddleware is responsible for creating a middleware that +// - decodes the incoming EVM request +// - if successful, puts the decoded request into the context +// - determines if the request is for a single or batch request +// - routes batch requests to BatchProcessingMiddleware +// - routes single requests to next() +func createDecodeRequestMiddleware(next http.HandlerFunc, batchProcessingMiddleware http.HandlerFunc, serviceLogger *logging.ServiceLogger) http.HandlerFunc { return func(w http.ResponseWriter, r *http.Request) { + // capture the initial request time in order to calculate response time & latency at the end requestStartTimeContext := context.WithValue(r.Context(), RequestStartTimeContextKey, time.Now()) - var rawBody []byte - - if r.Body != nil { - var rawBodyBuffer bytes.Buffer - - // Read the request body - body := io.TeeReader(r.Body, &rawBodyBuffer) - - var err error - - rawBody, err = io.ReadAll(body) - - if err != nil { - serviceLogger.Debug().Msg(fmt.Sprintf("error %s reading request body %s", err, body)) - - h.ServeHTTP(w, r) + // skip processing if there is no body content + if r.Body == nil { + serviceLogger.Trace().Msg("no data in request") + next.ServeHTTP(w, r) + return + } - return - } + // read body content + var rawBodyBuffer bytes.Buffer + body := io.TeeReader(r.Body, &rawBodyBuffer) - // Repopulate the request body for the ultimate consumer of this request - r.Body = io.NopCloser(&rawBodyBuffer) + rawBody, err := io.ReadAll(body) + if err != nil { + serviceLogger.Trace().Msg(fmt.Sprintf("error %s reading request body %s", err, body)) + next.ServeHTTP(w, r) + return } + // Repopulate the request body for the ultimate consumer of this request + r.Body = io.NopCloser(&rawBodyBuffer) + + // attempt to decode as single EVM request decodedRequest, err := decode.DecodeEVMRPCRequest(rawBody) + if err == nil { + // successfully decoded request as a single valid EVM request + // forward along with decoded request in context + serviceLogger.Trace(). + Any("decoded request", decodedRequest). 
+ Msg("successfully decoded single EVM request") + singleDecodedReqContext := context.WithValue(requestStartTimeContext, DecodedRequestContextKey, decodedRequest) + next.ServeHTTP(w, r.WithContext(singleDecodedReqContext)) + return + } + // attempt to decode as list of requests + batchRequests, err := decode.DecodeEVMRPCRequestList(rawBody) if err != nil { serviceLogger.Debug().Msg(fmt.Sprintf("error %s parsing of request body %s", err, rawBody)) - - h.ServeHTTP(w, r) - + next.ServeHTTP(w, r) + return + } + if len(batchRequests) == 0 { + serviceLogger.Trace().Msg(fmt.Sprintf("request is for an empty batch: %s", rawBody)) + next.ServeHTTP(w, r) return } - serviceLogger.Trace().Msg(fmt.Sprintf("decoded request body %+v", decodedRequest)) - - decodedRequestBodyContext := context.WithValue(requestStartTimeContext, DecodedRequestContextKey, decodedRequest) - - h.ServeHTTP(w, r.WithContext(decodedRequestBodyContext)) + serviceLogger.Trace().Any("batch", batchRequests).Msg("successfully decoded batch of requests") + batchDecodedReqContext := context.WithValue(requestStartTimeContext, DecodedBatchRequestContextKey, batchRequests) + batchProcessingMiddleware.ServeHTTP(w, r.WithContext(batchDecodedReqContext)) } } @@ -270,6 +281,11 @@ func createProxyRequestMiddleware(next http.Handler, config config.Config, servi Msg("cache miss") w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheMissHeaderValue) + // add CORS headers (if not already added) + accessControlAllowOriginValue := config.GetAccessControlAllowOriginValue(r.Host) + if w.Header().Get("Access-Control-Allow-Origin") == "" && accessControlAllowOriginValue != "" { + w.Header().Set("Access-Control-Allow-Origin", accessControlAllowOriginValue) + } proxy.ServeHTTP(lrw, r) } diff --git a/service/service.go b/service/service.go index 35d1542..7b41c6c 100644 --- a/service/service.go +++ b/service/service.go @@ -9,11 +9,13 @@ import ( "time" "github.com/ethereum/go-ethereum/ethclient" + "github.com/kava-labs/kava-proxy-service/clients/cache" "github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/clients/database/migrations" "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service/batchmdw" "github.com/kava-labs/kava-proxy-service/service/cachemdw" ) @@ -51,7 +53,7 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi // create an http router for registering handlers for a given route mux := http.NewServeMux() - // will run after the proxy middleware handler and is + // AfterProxyFinalizer will run after the proxy middleware handler and is // the final function called after all other middleware // allowing it to access values added to the request context // to do things like metric the response and cache the response @@ -65,7 +67,10 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi // - response is present in context cacheAfterProxyMiddleware := serviceCache.CachingMiddleware(afterProxyFinalizer) - // create an http handler that will proxy any request to the specified URL + // ProxyRequestMiddleware responds to the client with + // - cached data if present in the context + // - a forwarded request to the appropriate backend + // Backend is decided by the Proxies configuration for a particular host. 
proxyMiddleware := createProxyRequestMiddleware(cacheAfterProxyMiddleware, config, serviceLogger, []RequestInterceptor{}, []RequestInterceptor{}) // IsCachedMiddleware works in the following way: @@ -74,9 +79,25 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi // - if not present marks as uncached in context and forwards to next middleware cacheMiddleware := serviceCache.IsCachedMiddleware(proxyMiddleware) - // create an http handler that will log the request to stdout - // this handler will run before the proxyMiddleware handler - requestLoggingMiddleware := createRequestLoggingMiddleware(cacheMiddleware, serviceLogger) + // BatchProcessingMiddleware separates a batch into multiple requests and routes each one + // through the single request middleware sequence. + // This allows the sub-requests of a batch to leverage the cache & metric recording. + // Expects non-zero length batch to be in the context. + batchMdwConfig := batchmdw.BatchMiddlewareConfig{ + ServiceLogger: serviceLogger, + ContextKeyDecodedRequestBatch: DecodedBatchRequestContextKey, + ContextKeyDecodedRequestSingle: DecodedRequestContextKey, + MaximumBatchSize: config.ProxyMaximumBatchSize, + } + batchProcessingMiddleware := batchmdw.CreateBatchProcessingMiddleware(cacheMiddleware, &batchMdwConfig) + + // DecodeRequestMiddleware captures the request start time & attempts to decode the request body. + // If successful, the decoded request is put into the request context: + // - if decoded as a single EVM request: it forwards it to the single request middleware sequence + // - if decoded as a batch EVM request: it forwards it to the batchProcessingMiddleware + // - if fails to decode: it passes to single request middleware sequence which will proxy the request + // When requests fail to decode, no context value is set. + decodeRequestMiddleware := createDecodeRequestMiddleware(cacheMiddleware, batchProcessingMiddleware, serviceLogger) // register healthcheck handler that can be used during deployment and operations // to determine if the service is ready to receive requests @@ -87,7 +108,7 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi mux.HandleFunc("/servicecheck", createServicecheckHandler(&service)) // register middleware chain as the default handler for any request to the proxy service - mux.HandleFunc("/", requestLoggingMiddleware) + mux.HandleFunc("/", decodeRequestMiddleware) // create an http server for the caller to start on demand with a call to ProxyService.Run() server := &http.Server{ diff --git a/service/shard.go b/service/shard.go index b4c49a6..ddb5710 100644 --- a/service/shard.go +++ b/service/shard.go @@ -44,13 +44,13 @@ func (hsp HeightShardingProxies) ProxyForRequest(r *http.Request) (*httputil.Rev // some RPC methods can always be routed to the latest block if decode.MethodRequiresNoHistory(decodedReq.Method) { - hsp.Debug().Msg(fmt.Sprintf("request method %s can always use latest block. routing to pruning proxy", decodedReq.Method)) + hsp.Trace().Msg(fmt.Sprintf("request method %s can always use latest block. routing to pruning proxy", decodedReq.Method)) return hsp.pruningProxies.ProxyForRequest(r) } // short circuit if requesting a method that doesn't include block height number if !decode.MethodHasBlockNumberParam(decodedReq.Method) { - hsp.Debug().Msg(fmt.Sprintf("request method does not include block height (%s). 
routing to default proxy", decodedReq.Method)) + hsp.Trace().Msg(fmt.Sprintf("request method does not include block height (%s). routing to default proxy", decodedReq.Method)) return hsp.defaultProxies.ProxyForRequest(r) } @@ -63,10 +63,10 @@ func (hsp HeightShardingProxies) ProxyForRequest(r *http.Request) (*httputil.Rev // route "latest" to pruning proxy, otherwise route to default if shouldRouteToPruning(height) { - hsp.Debug().Msg(fmt.Sprintf("request is for latest height (%d). routing to pruning proxy", height)) + hsp.Trace().Msg(fmt.Sprintf("request is for latest height (%d). routing to pruning proxy", height)) return hsp.pruningProxies.ProxyForRequest(r) } - hsp.Debug().Msg(fmt.Sprintf("request is for specific height (%d). routing to default proxy", height)) + hsp.Trace().Msg(fmt.Sprintf("request is for specific height (%d). routing to default proxy", height)) return hsp.defaultProxies.ProxyForRequest(r) }
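For reviewers exercising the new limit end to end, a minimal hypothetical snippet (assuming the `.env` default of `PROXY_MAXIMUM_REQ_BATCH_SIZE=100` and a locally running proxy on `localhost:7777`): a batch one request over the limit should be rejected by the Batch Processing Middleware with `413 Request Entity Too Large` before any sub-request is proxied.

```golang
package main

import (
	"bytes"
	"encoding/json"
	"fmt"
	"net/http"
)

func main() {
	// build a batch one larger than the configured maximum (100 in .env)
	batch := make([]map[string]interface{}, 0, 101)
	for i := 0; i < 101; i++ {
		batch = append(batch, map[string]interface{}{
			"jsonrpc": "2.0",
			"id":      i,
			"method":  "eth_getBlockByNumber",
			"params":  []interface{}{fmt.Sprintf("0x%x", i+1), false},
		})
	}
	body, err := json.Marshal(batch)
	if err != nil {
		panic(err)
	}

	resp, err := http.Post("http://localhost:7777", "application/json", bytes.NewBuffer(body))
	if err != nil {
		panic(err)
	}
	defer resp.Body.Close()

	fmt.Println(resp.StatusCode) // expect 413 (StatusRequestEntityTooLarge)
}
```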