From 956b227d0e611ed287cdef6c9772e3928eeae952 Mon Sep 17 00:00:00 2001 From: Evgeniy Scherbina Date: Fri, 20 Oct 2023 13:50:01 -0400 Subject: [PATCH] Basic implementation of Caching Middleware (#39) * Basic implementation of Caching Middleware * Minor fix * Added comments * Update CACHING.md * Update CACHING.md * Update CACHING.md * Add response parser * Integrate response parser and adjust tests accordingly * Fix tests * Improve tests * CR's fixes * CR's fixes * Change Key Structure * Update CACHING.md * Rename ChainID -> CachePrefix and add validation * Add is_cache_enabled boolean flag * Added test for cache disabled scenario * CR's fixes * Improve tests * Update healthcheck * CR's fixes * CR's fixes: improve logging * Fixes after merge * Improve IsCacheable method * Improve logging * CR's fixes * Fix JSON-RPC response's ID flow * Remove deprecated TODO * Small fix --- .env | 15 ++ .gitignore | 3 +- architecture/CACHING.md | 90 +++++++ clients/cache/cache.go | 16 ++ clients/cache/inmemory.go | 101 +++++++ clients/cache/redis.go | 114 ++++++++ config/config.go | 15 ++ config/validate.go | 14 + decode/evm_rpc.go | 20 +- go.mod | 3 + go.sum | 7 + main_test.go | 324 ++++++++++++++++++++++- service/cachemdw/cache.go | 152 +++++++++++ service/cachemdw/cache_test.go | 106 ++++++++ service/cachemdw/caching_middleware.go | 66 +++++ service/cachemdw/doc.go | 10 + service/cachemdw/is_cached_middleware.go | 89 +++++++ service/cachemdw/keys.go | 68 +++++ service/cachemdw/keys_test.go | 73 +++++ service/cachemdw/middleware_test.go | 206 ++++++++++++++ service/cachemdw/response.go | 99 +++++++ service/cachemdw/response_test.go | 166 ++++++++++++ service/cachemdw/testdata_test.go | 298 +++++++++++++++++++++ service/handlers.go | 15 +- service/middleware.go | 64 ++++- service/service.go | 85 ++++-- 26 files changed, 2188 insertions(+), 31 deletions(-) create mode 100644 architecture/CACHING.md create mode 100644 clients/cache/cache.go create mode 100644 clients/cache/inmemory.go create mode 100644 clients/cache/redis.go create mode 100644 service/cachemdw/cache.go create mode 100644 service/cachemdw/cache_test.go create mode 100644 service/cachemdw/caching_middleware.go create mode 100644 service/cachemdw/doc.go create mode 100644 service/cachemdw/is_cached_middleware.go create mode 100644 service/cachemdw/keys.go create mode 100644 service/cachemdw/keys_test.go create mode 100644 service/cachemdw/middleware_test.go create mode 100644 service/cachemdw/response.go create mode 100644 service/cachemdw/response_test.go create mode 100644 service/cachemdw/testdata_test.go diff --git a/.env b/.env index d7b38f5..c7c2375 100644 --- a/.env +++ b/.env @@ -47,6 +47,8 @@ TEST_SERVICE_LOG_LEVEL=ERROR # endpoint the proxy service should use for querying # evm blockchain information related to proxied requests TEST_EVM_QUERY_SERVICE_URL=http://kava-validator:8545 +# TEST_REDIS_ENDPOINT_URL is an url of redis +TEST_REDIS_ENDPOINT_URL=localhost:6379 ##### Kava Node Config @@ -100,6 +102,19 @@ METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS=10 METRIC_PARTITIONINING_PREFILL_PERIOD_DAYS=7 # Used by `ready` script to ensure metric partitions have been created. MINIMUM_REQUIRED_PARTITIONS=30 +# CACHE_ENABLED specifies if cache should be enabled. By default cache is disabled. +CACHE_ENABLED=true +# REDIS_ENDPOINT_URL is an url of redis +REDIS_ENDPOINT_URL=redis:6379 +REDIS_PASSWORD= +# CACHE_TTL_SECONDS is a TTL for cached evm requests +# CACHE_TTL_SECONDS should be specified in seconds +CACHE_TTL_SECONDS=600 +# CACHE_PREFIX is used as prefix for any key in the cache, key has such structure: +# :evm-request::sha256: +# Possible values are testnet, mainnet, etc... +# CACHE_PREFIX must not contain colon symbol +CACHE_PREFIX=local-chain ##### Database Config POSTGRES_PASSWORD=password diff --git a/.gitignore b/.gitignore index 89c231d..b6aeeef 100644 --- a/.gitignore +++ b/.gitignore @@ -17,10 +17,11 @@ kava-proxy-service cover.html # Dependency directories (remove the comment below to include it) -# vendor/ +vendor/ # ignore editor files .vscode/ +.idea/ # ignore e2e test validator files docker/shared/genesis.json diff --git a/architecture/CACHING.md b/architecture/CACHING.md new file mode 100644 index 0000000..421e138 --- /dev/null +++ b/architecture/CACHING.md @@ -0,0 +1,90 @@ +## Caching Middleware Architecture + +Package `cachemdw` is responsible for caching EVM requests and provides corresponding middleware + +package can work with any underlying storage which implements simple `cache.Cache` interface + +package provides two different middlewares: +- `IsCachedMiddleware` (should be run before proxy middleware) +- `CachingMiddleware` (should be run after proxy middleware) + +`IsCachedMiddleware` is responsible for setting response in the context if it's in the cache + +`CachingMiddleware` is responsible for caching response by taking a value from context (should be set by `ProxyMiddleware`) and setting in the cache + +## CachingMiddleware + +`CachingMiddleware` returns kava-proxy-service compatible middleware which works in the following way: +- tries to get decoded request from context (previous middleware should set it) +- checks few conditions: + - if request isn't already cached + - if request is cacheable + - if response is present in context +- if all above is true - caches the response +- calls next middleware + +## IsCachedMiddleware + +`IsCachedMiddleware` returns kava-proxy-service compatible middleware which works in the following way: +- tries to get decoded request from context (previous middleware should set it) +- tries to get response from the cache + - if present sets cached response in context, marks as cached in context and forwards to next middleware + - if not present marks as uncached in context and forwards to next middleware +- next middleware should check whether request was cached and act accordingly: + +## What requests are cached? + +As of now we cache requests which has `specific block number` in request, for example: +```json +{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + "0x1b4", // specific block number + true + ], + "id":1 +} +``` + +we don't cache requests without `specific block number` or requests which uses magic tags as a block number: "latest", "pending", "earliest", etc... + +## Cache Invalidation + +### Keys Structure + +Keys have such format: + +`:evm-request::sha256:` + +For example: + +`local-chain:evm-request:eth_getBlockByHash:sha256:2db366278f2cb463f92147bd888bdcad528b44baa94b7920fdff35f4c11ee617` + +### Invalidation for specific method + +If you want to invalidate cache for specific method you may run such command: + +`redis-cli KEYS ":evm-request::sha256:*" | xargs redis-cli DEL` + +For example: + +`redis-cli KEYS "local-chain:evm-request:eth_getBlockByNumber:sha256:*" | xargs redis-cli DEL` + +### Invalidation for all methods + +If you want to invalidate cache for all methods you may run such command: + +`redis-cli KEYS ":evm-request:*" | xargs redis-cli DEL` + +For example: + +`redis-cli KEYS "local-chain:evm-request:*" | xargs redis-cli DEL` + +## Architecture Diagrams + +### Serve request from the cache (avoiding call to actual backend) +![image](https://github.com/Kava-Labs/kava-proxy-service/assets/37836031/1bd8cb8e-6a9e-45a6-b698-3f99eaab2aa2) + +### Serve request from the backend and then cache the response +![image](https://github.com/Kava-Labs/kava-proxy-service/assets/37836031/b0eb5cb9-51da-43f9-bb7d-b94bf482f366) diff --git a/clients/cache/cache.go b/clients/cache/cache.go new file mode 100644 index 0000000..cc6f9cb --- /dev/null +++ b/clients/cache/cache.go @@ -0,0 +1,16 @@ +package cache + +import ( + "context" + "errors" + "time" +) + +var ErrNotFound = errors.New("value not found in the cache") + +type Cache interface { + Set(ctx context.Context, key string, data []byte, expiration time.Duration) error + Get(ctx context.Context, key string) ([]byte, error) + Delete(ctx context.Context, key string) error + Healthcheck(ctx context.Context) error +} diff --git a/clients/cache/inmemory.go b/clients/cache/inmemory.go new file mode 100644 index 0000000..9293af2 --- /dev/null +++ b/clients/cache/inmemory.go @@ -0,0 +1,101 @@ +package cache + +import ( + "context" + "sync" + "time" +) + +// InMemoryCache is an in-memory implementation of the Cache interface. +type InMemoryCache struct { + data map[string]cacheItem + mutex sync.RWMutex +} + +// Ensure InMemoryCache implements the Cache interface. +var _ Cache = (*InMemoryCache)(nil) + +// cacheItem represents an item stored in the cache. +type cacheItem struct { + data []byte + expiration time.Time +} + +// NewInMemoryCache creates a new instance of InMemoryCache. +func NewInMemoryCache() *InMemoryCache { + return &InMemoryCache{ + data: make(map[string]cacheItem), + } +} + +// Set sets the value of a key in the cache. +func (c *InMemoryCache) Set( + ctx context.Context, + key string, + data []byte, + expiration time.Duration, +) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + expiry := time.Now().Add(expiration) + + if expiration == 0 { + // 100 years in the future to prevent expiry + expiry = time.Now().AddDate(100, 0, 0) + } + + c.data[key] = cacheItem{ + data: data, + expiration: expiry, + } + + return nil +} + +// Get retrieves the value of a key from the cache. +func (c *InMemoryCache) Get(ctx context.Context, key string) ([]byte, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + + item, ok := c.data[key] + if !ok || time.Now().After(item.expiration) { + // Not a real ttl but just replicates it for fetching + delete(c.data, key) + + return nil, ErrNotFound + } + + return item.data, nil +} + +// GetAll returns all the non-expired data in the cache. +func (c *InMemoryCache) GetAll(ctx context.Context) map[string][]byte { + c.mutex.RLock() + defer c.mutex.RUnlock() + + result := make(map[string][]byte) + + for key, item := range c.data { + if time.Now().After(item.expiration) { + delete(c.data, key) + } else { + result[key] = item.data + } + } + + return result +} + +// Delete removes a key from the cache. +func (c *InMemoryCache) Delete(ctx context.Context, key string) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + delete(c.data, key) + return nil +} + +func (c *InMemoryCache) Healthcheck(ctx context.Context) error { + return nil +} diff --git a/clients/cache/redis.go b/clients/cache/redis.go new file mode 100644 index 0000000..f5bc604 --- /dev/null +++ b/clients/cache/redis.go @@ -0,0 +1,114 @@ +package cache + +import ( + "context" + "fmt" + "time" + + "github.com/kava-labs/kava-proxy-service/logging" + "github.com/redis/go-redis/v9" +) + +type RedisConfig struct { + Address string + Password string + DB int +} + +// RedisCache is an implementation of Cache that uses Redis as the caching backend. +type RedisCache struct { + client *redis.Client + *logging.ServiceLogger +} + +var _ Cache = (*RedisCache)(nil) + +func NewRedisCache( + cfg *RedisConfig, + logger *logging.ServiceLogger, +) (*RedisCache, error) { + client := redis.NewClient(&redis.Options{ + Addr: cfg.Address, + Password: cfg.Password, + DB: cfg.DB, + }) + + return &RedisCache{ + client: client, + ServiceLogger: logger, + }, nil +} + +// Set sets the value for the given key in the cache with the given expiration. +func (rc *RedisCache) Set( + ctx context.Context, + key string, + value []byte, + expiration time.Duration, +) error { + rc.Logger.Trace(). + Str("key", key). + Str("value", string(value)). + Dur("expiration", expiration). + Msg("setting value in redis") + + return rc.client.Set(ctx, key, value, expiration).Err() +} + +// Get gets the value for the given key in the cache. +func (rc *RedisCache) Get( + ctx context.Context, + key string, +) ([]byte, error) { + rc.Logger.Trace(). + Str("key", key). + Msg("getting value from redis") + + val, err := rc.client.Get(ctx, key).Bytes() + if err == redis.Nil { + rc.Logger.Trace(). + Str("key", key). + Msgf("value not found in redis") + return nil, ErrNotFound + } + if err != nil { + rc.Logger.Error(). + Str("key", key). + Err(err). + Msg("error during getting value from redis") + return nil, err + } + + rc.Logger.Trace(). + Str("key", key). + Str("value", string(val)). + Msg("successfully got value from redis") + + return val, nil +} + +// Delete deletes the value for the given key in the cache. +func (rc *RedisCache) Delete(ctx context.Context, key string) error { + rc.Logger.Trace(). + Str("key", key). + Msg("deleting value from redis") + + return rc.client.Del(ctx, key).Err() +} + +func (rc *RedisCache) Healthcheck(ctx context.Context) error { + rc.Logger.Trace().Msg("redis healthcheck was called") + + // Check if we can connect to Redis + _, err := rc.client.Ping(ctx).Result() + if err != nil { + rc.Logger.Error(). + Err(err). + Msg("can't ping redis") + return fmt.Errorf("error connecting to Redis: %v", err) + } + + rc.Logger.Trace().Msg("redis healthcheck was successful") + + return nil +} diff --git a/config/config.go b/config/config.go index 25a6f80..f9593a6 100644 --- a/config/config.go +++ b/config/config.go @@ -40,6 +40,11 @@ type Config struct { MetricPartitioningRoutineInterval time.Duration MetricPartitioningRoutineDelayFirstRun time.Duration MetricPartitioningPrefillPeriodDays int + CacheEnabled bool + RedisEndpointURL string + RedisPassword string + CacheTTL time.Duration + CachePrefix string } const ( @@ -84,6 +89,11 @@ const ( DEFAULT_DATABASE_READ_TIMEOUT_SECONDS = 60 DATABASE_WRITE_TIMEOUT_SECONDS_ENVIRONMENT_KEY = "DATABASE_WRITE_TIMEOUT_SECONDS" DEFAULT_DATABASE_WRITE_TIMEOUT_SECONDS = 10 + CACHE_ENABLED_ENVIRONMENT_KEY = "CACHE_ENABLED" + REDIS_ENDPOINT_URL_ENVIRONMENT_KEY = "REDIS_ENDPOINT_URL" + REDIS_PASSWORD_ENVIRONMENT_KEY = "REDIS_PASSWORD" + CACHE_TTL_ENVIRONMENT_KEY = "CACHE_TTL_SECONDS" + CACHE_PREFIX_ENVIRONMENT_KEY = "CACHE_PREFIX" ) var ErrEmptyHostMap = errors.New("backend host url map is empty") @@ -217,5 +227,10 @@ func ReadConfig() Config { MetricPartitioningRoutineInterval: time.Duration(time.Duration(EnvOrDefaultInt(METRIC_PARTITIONING_ROUTINE_INTERVAL_SECONDS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_ROUTINE_INTERVAL_SECONDS)) * time.Second), MetricPartitioningRoutineDelayFirstRun: time.Duration(time.Duration(EnvOrDefaultInt(METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS)) * time.Second), MetricPartitioningPrefillPeriodDays: EnvOrDefaultInt(METRIC_PARTITIONING_PREFILL_PERIOD_DAYS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_PREFILL_PERIOD_DAYS), + CacheEnabled: EnvOrDefaultBool(CACHE_ENABLED_ENVIRONMENT_KEY, false), + RedisEndpointURL: os.Getenv(REDIS_ENDPOINT_URL_ENVIRONMENT_KEY), + RedisPassword: os.Getenv(REDIS_PASSWORD_ENVIRONMENT_KEY), + CacheTTL: time.Duration(EnvOrDefaultInt(CACHE_TTL_ENVIRONMENT_KEY, 0)) * time.Second, + CachePrefix: os.Getenv(CACHE_PREFIX_ENVIRONMENT_KEY), } } diff --git a/config/validate.go b/config/validate.go index 901fb37..7a70ca0 100644 --- a/config/validate.go +++ b/config/validate.go @@ -5,6 +5,7 @@ import ( "fmt" "net/url" "strconv" + "strings" ) var ( @@ -60,6 +61,19 @@ func Validate(config Config) error { allErrs = errors.Join(allErrs, fmt.Errorf("invalid %s specified %d, must be non-zero and less than or equal to %d", METRIC_PARTITIONING_PREFILL_PERIOD_DAYS_ENVIRONMENT_KEY, config.MetricPartitioningPrefillPeriodDays, MaxMetricPartitioningPrefillPeriodDays)) } + if config.RedisEndpointURL == "" { + allErrs = errors.Join(allErrs, fmt.Errorf("invalid %s specified %s, must not be empty", REDIS_ENDPOINT_URL_ENVIRONMENT_KEY, config.RedisEndpointURL)) + } + if config.CacheTTL <= 0 { + allErrs = errors.Join(allErrs, fmt.Errorf("invalid %s specified %s, must be greater than zero", CACHE_TTL_ENVIRONMENT_KEY, config.CacheTTL)) + } + if strings.Contains(config.CachePrefix, ":") { + allErrs = errors.Join(allErrs, fmt.Errorf("invalid %s specified %s, must not contain colon symbol", CACHE_PREFIX_ENVIRONMENT_KEY, config.CachePrefix)) + } + if config.CachePrefix == "" { + allErrs = errors.Join(allErrs, fmt.Errorf("invalid %s specified %s, must not be empty", CACHE_PREFIX_ENVIRONMENT_KEY, config.CachePrefix)) + } + return allErrs } diff --git a/decode/evm_rpc.go b/decode/evm_rpc.go index 0b54b05..5b97d14 100644 --- a/decode/evm_rpc.go +++ b/decode/evm_rpc.go @@ -6,12 +6,17 @@ import ( "errors" "fmt" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" - cosmosmath "cosmossdk.io/math" + "github.com/ethereum/go-ethereum/common" + ethctypes "github.com/ethereum/go-ethereum/core/types" ) +// EVMBlockGetter defines an interface which can be implemented by any client capable of getting ethereum block header by hash +type EVMBlockGetter interface { + // HeaderByHash returns ethereum block header by hash + HeaderByHash(ctx context.Context, hash common.Hash) (*ethctypes.Header, error) +} + // These block tags are special strings used to reference blocks in JSON-RPC // see https://ethereum.org/en/developers/docs/apis/json-rpc/#default-block const ( @@ -217,7 +222,7 @@ func DecodeEVMRPCRequest(body []byte) (*EVMRPCRequestEnvelope, error) { // - the request is a valid evm rpc request // - the method for the request supports specifying a block number // - the provided block number is a valid tag or number -func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context.Context, evmClient *ethclient.Client) (int64, error) { +func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context.Context, blockGetter EVMBlockGetter) (int64, error) { // only attempt to extract block number from a valid ethereum api request if r.Method == "" { return 0, ErrInvalidEthAPIRequest @@ -228,7 +233,7 @@ func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context. } // handle cacheable by block hash if MethodHasBlockHashParam(r.Method) { - return lookupBlockNumberFromHashParam(ctx, evmClient, r.Method, r.Params) + return lookupBlockNumberFromHashParam(ctx, blockGetter, r.Method, r.Params) } // handle unable to cached return 0, ErrUncachaebleByBlockNumberEthRequest @@ -236,7 +241,7 @@ func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context. // Generic method to lookup the block number // based on the hash value in a set of params -func lookupBlockNumberFromHashParam(ctx context.Context, evmClient *ethclient.Client, methodName string, params []interface{}) (int64, error) { +func lookupBlockNumberFromHashParam(ctx context.Context, blockGetter EVMBlockGetter, methodName string, params []interface{}) (int64, error) { paramIndex, exists := MethodNameToBlockHashParamIndex[methodName] if !exists { @@ -249,8 +254,7 @@ func lookupBlockNumberFromHashParam(ctx context.Context, evmClient *ethclient.Cl return 0, fmt.Errorf(fmt.Sprintf("error decoding block hash param from params %+v at index %d", params, paramIndex)) } - header, err := evmClient.HeaderByHash(ctx, common.HexToHash(blockHash)) - + header, err := blockGetter.HeaderByHash(ctx, common.HexToHash(blockHash)) if err != nil { return 0, err } diff --git a/go.mod b/go.mod index 27a36ba..8bc47b3 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cosmossdk.io/math v1.0.0 github.com/ethereum/go-ethereum v1.11.2 github.com/google/uuid v1.3.0 + github.com/redis/go-redis/v9 v9.2.1 github.com/rs/zerolog v1.29.0 github.com/stretchr/testify v1.8.2 github.com/uptrace/bun v1.1.12 @@ -18,9 +19,11 @@ require ( require ( github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/color v1.14.1 // indirect github.com/go-ole/go-ole v1.2.1 // indirect github.com/go-stack/stack v1.8.1 // indirect diff --git a/go.sum b/go.sum index a82a2c9..55e2f8e 100644 --- a/go.sum +++ b/go.sum @@ -5,10 +5,13 @@ github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIO github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VictoriaMetrics/fastcache v1.6.0 h1:C/3Oi3EiBCqufydp1neRZkqcwmEiuRT9c3fqvvgKm5o= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cockroachdb/errors v1.9.1 h1:yFVvsI0VxmRShfawbt/laCIDy/mtTqqnvoNgiy5bEV8= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 h1:ytcWPaNPhNoGMWEhDvS3zToKcDpRsLuRolQJBVGdozk= @@ -24,6 +27,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/ethereum/go-ethereum v1.11.2 h1:z/luyejbevDCAMUUiu0rc80dxJxOnpoG58k5o0tSawc= github.com/ethereum/go-ethereum v1.11.2/go.mod h1:DuefStAgaxoaYGLR0FueVcVbehmn5n9QUcVrMCuOvuc= @@ -78,6 +83,8 @@ github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvq github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= +github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= +github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/main_test.go b/main_test.go index 560db62..a50161a 100644 --- a/main_test.go +++ b/main_test.go @@ -1,25 +1,32 @@ package main_test import ( + "bytes" "context" + "encoding/json" + "errors" "fmt" + "io" + "math/big" + "net/http" "os" "strconv" "testing" "time" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" - "github.com/ethereum/go-ethereum" "github.com/ethereum/go-ethereum/common" "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" + "github.com/redis/go-redis/v9" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" "github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/decode" "github.com/kava-labs/kava-proxy-service/logging" "github.com/kava-labs/kava-proxy-service/service" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" ) const ( @@ -58,6 +65,9 @@ var ( Logger: &testServiceLogger, RunDatabaseMigrations: false, } + + redisURL = os.Getenv("TEST_REDIS_ENDPOINT_URL") + redisPassword = os.Getenv("REDIS_PASSWORD") ) // lookup all the request metrics in the database paging as necessary @@ -435,3 +445,311 @@ func TestE2ETest_HeightBasedRouting(t *testing.T) { }) } } + +func TestE2ETestCachingMdwWithBlockNumberParam(t *testing.T) { + // create api and database clients + client, err := ethclient.Dial(proxyServiceURL) + if err != nil { + t.Fatal(err) + } + + redisClient := redis.NewClient(&redis.Options{ + Addr: redisURL, + Password: redisPassword, + DB: 0, + }) + cleanUpRedis(t, redisClient) + expectKeysNum(t, redisClient, 0) + + for _, tc := range []struct { + desc string + method string + params []interface{} + keysNum int + }{ + { + desc: "test case #1", + method: "eth_getBlockByNumber", + params: []interface{}{"0x1", true}, + keysNum: 1, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // test cache MISS and cache HIT scenarios for specified method + // check corresponding values in cachemdw.CacheHeaderKey HTTP header + // check that cached and non-cached responses are equal + + // eth_getBlockByNumber - cache MISS + resp1 := mkJsonRpcRequest(t, proxyServiceURL, 1, tc.method, tc.params) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp1.Header[cachemdw.CacheHeaderKey][0]) + body1, err := io.ReadAll(resp1.Body) + require.NoError(t, err) + err = checkJsonRpcErr(body1) + require.NoError(t, err) + expectKeysNum(t, redisClient, tc.keysNum) + expectedKey := "local-chain:evm-request:eth_getBlockByNumber:sha256:d08b426164eacf6646fb1817403ec0af5d37869a0f32a01ebfab3096fa4999be" + containsKey(t, redisClient, expectedKey) + + // eth_getBlockByNumber - cache HIT + resp2 := mkJsonRpcRequest(t, proxyServiceURL, 1, tc.method, tc.params) + require.Equal(t, cachemdw.CacheHitHeaderValue, resp2.Header[cachemdw.CacheHeaderKey][0]) + body2, err := io.ReadAll(resp2.Body) + require.NoError(t, err) + err = checkJsonRpcErr(body2) + require.NoError(t, err) + expectKeysNum(t, redisClient, tc.keysNum) + containsKey(t, redisClient, expectedKey) + + require.JSONEq(t, string(body1), string(body2), "blocks should be the same") + }) + } + + // test cache MISS and cache HIT scenarios for eth_getBlockByNumber method + // check that cached and non-cached responses are equal + { + // eth_getBlockByNumber - cache MISS + block1, err := client.BlockByNumber(testContext, big.NewInt(2)) + require.NoError(t, err) + expectKeysNum(t, redisClient, 2) + expectedKey := "local-chain:evm-request:eth_getBlockByNumber:sha256:0bfa7c5affc525ed731803c223042b4b1eb16ee7a6a539ae213b47a3ef6e3a7d" + containsKey(t, redisClient, expectedKey) + + // eth_getBlockByNumber - cache HIT + block2, err := client.BlockByNumber(testContext, big.NewInt(2)) + require.NoError(t, err) + expectKeysNum(t, redisClient, 2) + containsKey(t, redisClient, expectedKey) + + require.Equal(t, block1, block2, "blocks should be the same") + } + + cleanUpRedis(t, redisClient) +} + +func TestE2ETestCachingMdwWithBlockNumberParam_EmptyResult(t *testing.T) { + testRandomAddressHex := "0x6767114FFAA17C6439D7AEA480738B982CE63A02" + testAddress := common.HexToAddress(testRandomAddressHex) + + // create api and database clients + client, err := ethclient.Dial(proxyServiceURL) + if err != nil { + t.Fatal(err) + } + + redisClient := redis.NewClient(&redis.Options{ + Addr: redisURL, + Password: redisPassword, + DB: 0, + }) + cleanUpRedis(t, redisClient) + expectKeysNum(t, redisClient, 0) + + for _, tc := range []struct { + desc string + method string + params []interface{} + keysNum int + }{ + { + desc: "test case #1", + method: "eth_getTransactionCount", + params: []interface{}{testAddress, "0x1"}, + keysNum: 0, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // both calls should lead to cache MISS scenario, because empty results aren't cached + // check corresponding values in cachemdw.CacheHeaderKey HTTP header + // check that responses are equal + + // eth_getBlockByNumber - cache MISS + resp1 := mkJsonRpcRequest(t, proxyServiceURL, 1, tc.method, tc.params) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp1.Header[cachemdw.CacheHeaderKey][0]) + body1, err := io.ReadAll(resp1.Body) + require.NoError(t, err) + err = checkJsonRpcErr(body1) + require.NoError(t, err) + expectKeysNum(t, redisClient, tc.keysNum) + + // eth_getBlockByNumber - cache MISS again (empty results aren't cached) + resp2 := mkJsonRpcRequest(t, proxyServiceURL, 1, tc.method, tc.params) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp2.Header[cachemdw.CacheHeaderKey][0]) + body2, err := io.ReadAll(resp2.Body) + require.NoError(t, err) + err = checkJsonRpcErr(body2) + require.NoError(t, err) + expectKeysNum(t, redisClient, tc.keysNum) + + require.JSONEq(t, string(body1), string(body2), "blocks should be the same") + }) + } + + // both calls should lead to cache MISS scenario, because empty results aren't cached + // check that responses are equal + { + // eth_getTransactionCount - cache MISS + bal1, err := client.NonceAt(testContext, testAddress, big.NewInt(2)) + require.NoError(t, err) + expectKeysNum(t, redisClient, 0) + + // eth_getTransactionCount - cache MISS again (empty results aren't cached) + bal2, err := client.NonceAt(testContext, testAddress, big.NewInt(2)) + require.NoError(t, err) + expectKeysNum(t, redisClient, 0) + + require.Equal(t, bal1, bal2, "balances should be the same") + } + + cleanUpRedis(t, redisClient) +} + +func TestE2ETestCachingMdwWithBlockNumberParam_DiffJsonRpcReqIDs(t *testing.T) { + redisClient := redis.NewClient(&redis.Options{ + Addr: redisURL, + Password: redisPassword, + DB: 0, + }) + cleanUpRedis(t, redisClient) + expectKeysNum(t, redisClient, 0) + + for _, tc := range []struct { + desc string + method string + params []interface{} + keysNum int + }{ + { + desc: "test case #1", + method: "eth_getBlockByNumber", + params: []interface{}{"0x1", true}, + keysNum: 1, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // test cache MISS and cache HIT scenarios for specified method + // check corresponding values in cachemdw.CacheHeaderKey HTTP header + // NOTE: JSON-RPC request IDs are different + // check that cached and non-cached responses differ only in response ID + + // eth_getBlockByNumber - cache MISS + resp1 := mkJsonRpcRequest(t, proxyServiceURL, 1, tc.method, tc.params) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp1.Header[cachemdw.CacheHeaderKey][0]) + body1, err := io.ReadAll(resp1.Body) + require.NoError(t, err) + err = checkJsonRpcErr(body1) + require.NoError(t, err) + expectKeysNum(t, redisClient, tc.keysNum) + expectedKey := "local-chain:evm-request:eth_getBlockByNumber:sha256:d08b426164eacf6646fb1817403ec0af5d37869a0f32a01ebfab3096fa4999be" + containsKey(t, redisClient, expectedKey) + + // eth_getBlockByNumber - cache HIT + resp2 := mkJsonRpcRequest(t, proxyServiceURL, 2, tc.method, tc.params) + require.Equal(t, cachemdw.CacheHitHeaderValue, resp2.Header[cachemdw.CacheHeaderKey][0]) + body2, err := io.ReadAll(resp2.Body) + require.NoError(t, err) + err = checkJsonRpcErr(body2) + require.NoError(t, err) + expectKeysNum(t, redisClient, tc.keysNum) + containsKey(t, redisClient, expectedKey) + + rpcResp1, err := cachemdw.UnmarshalJsonRpcResponse(body1) + require.NoError(t, err) + rpcResp2, err := cachemdw.UnmarshalJsonRpcResponse(body2) + require.NoError(t, err) + + // JSON-RPC Version and Result should be equal + require.Equal(t, rpcResp1.Version, rpcResp2.Version) + require.Equal(t, rpcResp1.Result, rpcResp2.Result) + + // JSON-RPC response ID should correspond to JSON-RPC request ID + require.Equal(t, string(rpcResp1.ID), "1") + require.Equal(t, string(rpcResp2.ID), "2") + + // JSON-RPC error should be empty + require.Empty(t, rpcResp1.JsonRpcError) + require.Empty(t, rpcResp2.JsonRpcError) + + // Double-check that JSON-RPC responses differ only in response ID + rpcResp2.ID = []byte("1") + require.Equal(t, rpcResp1, rpcResp2) + }) + } + + cleanUpRedis(t, redisClient) +} + +func expectKeysNum(t *testing.T, redisClient *redis.Client, keysNum int) { + keys, err := redisClient.Keys(context.Background(), "*").Result() + require.NoError(t, err) + + require.Equal(t, keysNum, len(keys)) +} + +func containsKey(t *testing.T, redisClient *redis.Client, key string) { + keys, err := redisClient.Keys(context.Background(), key).Result() + require.NoError(t, err) + require.GreaterOrEqual(t, len(keys), 1) +} + +func cleanUpRedis(t *testing.T, redisClient *redis.Client) { + keys, err := redisClient.Keys(context.Background(), "*").Result() + require.NoError(t, err) + + if len(keys) != 0 { + _, err = redisClient.Del(context.Background(), keys...).Result() + require.NoError(t, err) + } +} + +func mkJsonRpcRequest(t *testing.T, proxyServiceURL string, id int, method string, params []interface{}) *http.Response { + req := newJsonRpcRequest(id, method, params) + reqInJSON, err := json.Marshal(req) + require.NoError(t, err) + reqReader := bytes.NewBuffer(reqInJSON) + resp, err := http.Post(proxyServiceURL, "application/json", reqReader) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + return resp +} + +type jsonRpcRequest struct { + JsonRpc string `json:"jsonrpc"` + Id int `json:"id"` + Method string `json:"method"` + Params []interface{} `json:"params"` +} + +func newJsonRpcRequest(id int, method string, params []interface{}) *jsonRpcRequest { + return &jsonRpcRequest{ + JsonRpc: "2.0", + Id: id, + Method: method, + Params: params, + } +} + +type jsonRpcResponse struct { + Jsonrpc string `json:"jsonrpc"` + Id int `json:"id"` + Result interface{} `json:"result"` + Error string `json:"error"` +} + +func checkJsonRpcErr(body []byte) error { + var resp jsonRpcResponse + err := json.Unmarshal(body, &resp) + if err != nil { + return err + } + + if resp.Error != "" { + return errors.New(resp.Error) + } + + if resp.Result == "" { + return errors.New("result is empty") + } + + return nil +} diff --git a/service/cachemdw/cache.go b/service/cachemdw/cache.go new file mode 100644 index 0000000..17f597f --- /dev/null +++ b/service/cachemdw/cache.go @@ -0,0 +1,152 @@ +package cachemdw + +import ( + "context" + "encoding/json" + "errors" + "fmt" + "strconv" + "time" + + "github.com/kava-labs/kava-proxy-service/clients/cache" + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/logging" +) + +// ServiceCache is responsible for caching EVM requests and provides corresponding middleware +// ServiceCache can work with any underlying storage which implements simple cache.Cache interface +type ServiceCache struct { + cacheClient cache.Cache + blockGetter decode.EVMBlockGetter + // TTL for cached evm requests + cacheTTL time.Duration + decodedRequestContextKey any + // cachePrefix is used as prefix for any key in the cache + cachePrefix string + cacheEnabled bool + + *logging.ServiceLogger +} + +func NewServiceCache( + cacheClient cache.Cache, + blockGetter decode.EVMBlockGetter, + cacheTTL time.Duration, + decodedRequestContextKey any, + cachePrefix string, + cacheEnabled bool, + logger *logging.ServiceLogger, +) *ServiceCache { + return &ServiceCache{ + cacheClient: cacheClient, + blockGetter: blockGetter, + cacheTTL: cacheTTL, + decodedRequestContextKey: decodedRequestContextKey, + cachePrefix: cachePrefix, + cacheEnabled: cacheEnabled, + ServiceLogger: logger, + } +} + +// IsCacheable checks if EVM request is cacheable. +// In current implementation we consider request is cacheable if it has specific block height +func IsCacheable( + logger *logging.ServiceLogger, + req *decode.EVMRPCRequestEnvelope, +) bool { + if req.Method == "" { + return false + } + + if decode.MethodHasBlockHashParam(req.Method) { + return true + } + + if decode.MethodHasBlockNumberParam(req.Method) { + blockNumber, err := decode.ParseBlockNumberFromParams(req.Method, req.Params) + if err != nil { + logger.Logger.Error(). + Err(err). + Msg("can't parse block number from params") + return false + } + + // blockNumber < 0 means magic tag was used, one of the "latest", "pending", "earliest", etc... + // we cache requests without magic tag or with the earliest magic tag + return blockNumber > 0 || blockNumber == decode.BlockTagToNumberCodec[decode.BlockTagEarliest] + } + + return false +} + +// GetCachedQueryResponse calculates cache key for request and then tries to get it from cache. +// NOTE: only JSON-RPC response's result will be taken from the cache. +// JSON-RPC response's ID and Version will be constructed on the fly to match JSON-RPC request. +func (c *ServiceCache) GetCachedQueryResponse( + ctx context.Context, + req *decode.EVMRPCRequestEnvelope, +) ([]byte, error) { + key, err := GetQueryKey(c.cachePrefix, req) + if err != nil { + return nil, err + } + + // get JSON-RPC response's result from the cache + result, err := c.cacheClient.Get(ctx, key) + if err != nil { + return nil, err + } + + // JSON-RPC response's ID and Version should match JSON-RPC request + id := strconv.Itoa(int(req.ID)) + response := JsonRpcResponse{ + Version: req.JSONRPCVersion, + ID: []byte(id), + Result: result, + } + responseInJSON, err := json.Marshal(response) + if err != nil { + return nil, err + } + + return responseInJSON, nil +} + +// CacheQueryResponse calculates cache key for request and then saves response to the cache. +// NOTE: only JSON-RPC response's result is cached. +// There is no point to cache JSON-RPC response's ID (because it should correspond to request's ID, which constantly changes). +// Same with JSON-RPC response's Version. +func (c *ServiceCache) CacheQueryResponse( + ctx context.Context, + req *decode.EVMRPCRequestEnvelope, + responseInBytes []byte, +) error { + // don't cache uncacheable requests + if !IsCacheable(c.ServiceLogger, req) { + return errors.New("query isn't cacheable") + } + + response, err := UnmarshalJsonRpcResponse(responseInBytes) + if err != nil { + return fmt.Errorf("can't unmarshal json-rpc response: %w", err) + } + // don't cache uncacheable responses + if !response.IsCacheable() { + return fmt.Errorf("response isn't cacheable") + } + + key, err := GetQueryKey(c.cachePrefix, req) + if err != nil { + return err + } + + return c.cacheClient.Set(ctx, key, response.Result, c.cacheTTL) +} + +func (c *ServiceCache) Healthcheck(ctx context.Context) error { + return c.cacheClient.Healthcheck(ctx) +} + +func (c *ServiceCache) IsCacheEnabled() bool { + return c.cacheEnabled +} diff --git a/service/cachemdw/cache_test.go b/service/cachemdw/cache_test.go new file mode 100644 index 0000000..3d39e5c --- /dev/null +++ b/service/cachemdw/cache_test.go @@ -0,0 +1,106 @@ +package cachemdw_test + +import ( + "context" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + ethctypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + + "github.com/kava-labs/kava-proxy-service/clients/cache" + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" +) + +const ( + defaultCachePrefixString = "1" + defaultBlockNumber = "42" +) + +var ( + defaultQueryResp = []byte(testEVMQueries[TestRequestWeb3ClientVersion].ResponseBody) +) + +type MockEVMBlockGetter struct{} + +func NewMockEVMBlockGetter() *MockEVMBlockGetter { + return &MockEVMBlockGetter{} +} + +var _ decode.EVMBlockGetter = (*MockEVMBlockGetter)(nil) + +func (c *MockEVMBlockGetter) HeaderByHash(ctx context.Context, hash common.Hash) (*ethctypes.Header, error) { + panic("not implemented") +} + +func TestUnitTestIsCacheable(t *testing.T) { + logger, err := logging.New("TRACE") + require.NoError(t, err) + + for _, tc := range []struct { + desc string + req *decode.EVMRPCRequestEnvelope + cacheable bool + }{ + { + desc: "test case #1", + req: mkEVMRPCRequestEnvelope(defaultBlockNumber), + cacheable: true, + }, + { + desc: "test case #2", + req: mkEVMRPCRequestEnvelope("0"), + cacheable: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + cacheable := cachemdw.IsCacheable(&logger, tc.req) + require.Equal(t, tc.cacheable, cacheable) + }) + } +} + +func TestUnitTestCacheQueryResponse(t *testing.T) { + logger, err := logging.New("TRACE") + require.NoError(t, err) + + inMemoryCache := cache.NewInMemoryCache() + blockGetter := NewMockEVMBlockGetter() + cacheTTL := time.Hour + ctxb := context.Background() + + serviceCache := cachemdw.NewServiceCache( + inMemoryCache, + blockGetter, + cacheTTL, + service.DecodedRequestContextKey, + defaultCachePrefixString, + true, + &logger, + ) + + req := mkEVMRPCRequestEnvelope(defaultBlockNumber) + resp, err := serviceCache.GetCachedQueryResponse(ctxb, req) + require.Equal(t, cache.ErrNotFound, err) + require.Empty(t, resp) + + err = serviceCache.CacheQueryResponse(ctxb, req, defaultQueryResp) + require.NoError(t, err) + + resp, err = serviceCache.GetCachedQueryResponse(ctxb, req) + require.NoError(t, err) + require.JSONEq(t, string(defaultQueryResp), string(resp)) +} + +func mkEVMRPCRequestEnvelope(blockNumber string) *decode.EVMRPCRequestEnvelope { + return &decode.EVMRPCRequestEnvelope{ + JSONRPCVersion: "2.0", + ID: 1, + Method: "eth_getBalance", + Params: []interface{}{"0x1234", blockNumber}, + } +} diff --git a/service/cachemdw/caching_middleware.go b/service/cachemdw/caching_middleware.go new file mode 100644 index 0000000..63f237c --- /dev/null +++ b/service/cachemdw/caching_middleware.go @@ -0,0 +1,66 @@ +package cachemdw + +import ( + "net/http" + + "github.com/kava-labs/kava-proxy-service/decode" +) + +// CachingMiddleware returns kava-proxy-service compatible middleware which works in the following way: +// - tries to get decoded request from context (previous middleware should set it) +// - checks few conditions: +// - if request isn't already cached +// - if request is cacheable +// - if response is present in context +// +// - if all above is true - caches the response +// - calls next middleware +func (c *ServiceCache) CachingMiddleware( + next http.Handler, +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // if cache is not enabled - do nothing and forward to next middleware + if !c.cacheEnabled { + c.Logger.Trace(). + Str("method", r.Method). + Str("url", r.URL.String()). + Str("host", r.Host). + Msg("cache is disabled skipping caching-middleware") + + next.ServeHTTP(w, r) + return + } + + // if we can't get decoded request then forward to next middleware + req := r.Context().Value(c.decodedRequestContextKey) + decodedReq, ok := (req).(*decode.EVMRPCRequestEnvelope) + if !ok { + c.Logger.Error(). + Str("method", r.Method). + Str("url", r.URL.String()). + Str("host", r.Host). + Msg("can't cast request to *EVMRPCRequestEnvelope type") + + next.ServeHTTP(w, r) + return + } + + isCached := IsRequestCached(r.Context()) + cacheable := IsCacheable(c.ServiceLogger, decodedReq) + response := r.Context().Value(ResponseContextKey) + typedResponse, ok := response.([]byte) + + // if request isn't already cached, request is cacheable and response is present in context - cache the response + if !isCached && cacheable && ok { + if err := c.CacheQueryResponse( + r.Context(), + decodedReq, + typedResponse, + ); err != nil { + c.Logger.Error().Msgf("can't validate and cache response: %v", err) + } + } + + next.ServeHTTP(w, r) + } +} diff --git a/service/cachemdw/doc.go b/service/cachemdw/doc.go new file mode 100644 index 0000000..aafc683 --- /dev/null +++ b/service/cachemdw/doc.go @@ -0,0 +1,10 @@ +// Package cachemdw is responsible for caching EVM requests and provides corresponding middleware +// package can work with any underlying storage which implements simple cache.Cache interface +// +// package provides two different middlewares: +// - IsCachedMiddleware (should be run before proxy middleware) +// - CachingMiddleware (should be run after proxy middleware) +// +// IsCachedMiddleware is responsible for setting response in the context if it's in the cache +// CachingMiddleware is responsible for caching response by taking a value from context (should be set by proxy mdw) and setting in the cache +package cachemdw diff --git a/service/cachemdw/is_cached_middleware.go b/service/cachemdw/is_cached_middleware.go new file mode 100644 index 0000000..64a3b6c --- /dev/null +++ b/service/cachemdw/is_cached_middleware.go @@ -0,0 +1,89 @@ +package cachemdw + +import ( + "context" + "net/http" + + "github.com/kava-labs/kava-proxy-service/clients/cache" + "github.com/kava-labs/kava-proxy-service/decode" +) + +const ( + CachedContextKey = "X-KAVA-PROXY-CACHED" + ResponseContextKey = "X-KAVA-PROXY-RESPONSE" + + CacheHeaderKey = "X-Kava-Proxy-Cache-Status" + CacheHitHeaderValue = "HIT" + CacheMissHeaderValue = "MISS" +) + +// IsCachedMiddleware returns kava-proxy-service compatible middleware which works in the following way: +// - tries to get decoded request from context (previous middleware should set it) +// - tries to get response from the cache +// - if present sets cached response in context, marks as cached in context and forwards to next middleware +// - if not present marks as uncached in context and forwards to next middleware +// +// - next middleware should check whether request was cached and act accordingly: +func (c *ServiceCache) IsCachedMiddleware( + next http.Handler, +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // if cache is not enabled - do nothing and forward to next middleware + if !c.cacheEnabled { + c.Logger.Trace(). + Str("method", r.Method). + Str("url", r.URL.String()). + Str("host", r.Host). + Msg("cache is disabled skipping is-cached-middleware") + + next.ServeHTTP(w, r) + return + } + + uncachedContext := context.WithValue(r.Context(), CachedContextKey, false) + cachedContext := context.WithValue(r.Context(), CachedContextKey, true) + + // if we can't get decoded request then forward to next middleware + req := r.Context().Value(c.decodedRequestContextKey) + decodedReq, ok := (req).(*decode.EVMRPCRequestEnvelope) + if !ok { + c.Logger.Error(). + Str("method", r.Method). + Str("url", r.URL.String()). + Str("host", r.Host). + Msg("can't cast request to *EVMRPCRequestEnvelope type") + + next.ServeHTTP(w, r.WithContext(uncachedContext)) + return + } + + // Check if the request is cached: + // 1. if not cached or we encounter an error then mark as uncached and forward to next middleware + // 2. if cached then mark as cached, set cached response in context and forward to next middleware + cachedQueryResponse, err := c.GetCachedQueryResponse(r.Context(), decodedReq) + if err != nil && err != cache.ErrNotFound { + // log unexpected error + c.Logger.Error(). + Err(err). + Msg("error during getting response from cache") + } + if err != nil { + // 1. if not cached or we encounter an error then mark as uncached and forward to next middleware + next.ServeHTTP(w, r.WithContext(uncachedContext)) + return + } + + // 2. if cached then mark as cached, set cached response in context and forward to next middleware + responseContext := context.WithValue(cachedContext, ResponseContextKey, cachedQueryResponse) + next.ServeHTTP(w, r.WithContext(responseContext)) + } +} + +// IsRequestCached returns whether request was cached +// if returns true it means: +// - middleware marked that request was cached +// - value of cached response should be available in context via ResponseContextKey +func IsRequestCached(ctx context.Context) bool { + cached, ok := ctx.Value(CachedContextKey).(bool) + return ok && cached +} diff --git a/service/cachemdw/keys.go b/service/cachemdw/keys.go new file mode 100644 index 0000000..7cd5ead --- /dev/null +++ b/service/cachemdw/keys.go @@ -0,0 +1,68 @@ +package cachemdw + +import ( + "crypto/sha256" + "encoding/hex" + "encoding/json" + "fmt" + "strings" + + "github.com/kava-labs/kava-proxy-service/decode" +) + +type CacheItemType int + +const ( + CacheItemTypeEVMRequest CacheItemType = iota + 1 +) + +func (t CacheItemType) String() string { + switch t { + case CacheItemTypeEVMRequest: + return "evm-request" + default: + return "unknown" + } +} + +func BuildCacheKey(cachePrefix string, cacheItemType CacheItemType, parts []string) string { + fullParts := append( + []string{ + cachePrefix, + cacheItemType.String(), + }, + parts..., + ) + + return strings.Join(fullParts, ":") +} + +// GetQueryKey calculates cache key for request +func GetQueryKey( + cachePrefix string, + req *decode.EVMRPCRequestEnvelope, +) (string, error) { + if req == nil { + return "", fmt.Errorf("request shouldn't be nil") + } + + serializedParams, err := json.Marshal(req.Params) + if err != nil { + return "", err + } + + data := make([]byte, 0) + data = append(data, []byte(req.Method)...) + data = append(data, serializedParams...) + + hashedReq := sha256.Sum256(data) + hashedReqInHex := hex.EncodeToString(hashedReq[:]) + + parts := []string{ + req.Method, + "sha256", + hashedReqInHex, + } + + return BuildCacheKey(cachePrefix, CacheItemTypeEVMRequest, parts), nil +} diff --git a/service/cachemdw/keys_test.go b/service/cachemdw/keys_test.go new file mode 100644 index 0000000..703dd94 --- /dev/null +++ b/service/cachemdw/keys_test.go @@ -0,0 +1,73 @@ +package cachemdw_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" +) + +func TestUnitTestBuildCacheKey(t *testing.T) { + for _, tc := range []struct { + desc string + cachePrefix string + cacheItemType cachemdw.CacheItemType + parts []string + expectedCacheKey string + }{ + { + desc: "test case #1", + cachePrefix: "chain1", + cacheItemType: cachemdw.CacheItemTypeEVMRequest, + parts: []string{"1", "2", "3"}, + expectedCacheKey: "chain1:evm-request:1:2:3", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + cacheKey := cachemdw.BuildCacheKey(tc.cachePrefix, tc.cacheItemType, tc.parts) + require.Equal(t, tc.expectedCacheKey, cacheKey) + }) + } +} + +func TestUnitTestGetQueryKey(t *testing.T) { + for _, tc := range []struct { + desc string + cachePrefix string + req *decode.EVMRPCRequestEnvelope + expectedCacheKey string + errMsg string + }{ + { + desc: "test case #1", + cachePrefix: "chain1", + req: &decode.EVMRPCRequestEnvelope{ + JSONRPCVersion: "2.0", + ID: 1, + Method: "eth_getBlockByHash", + Params: []interface{}{"0x1234", true}, + }, + expectedCacheKey: "chain1:evm-request:eth_getBlockByHash:sha256:2db366278f2cb463f92147bd888bdcad528b44baa94b7920fdff35f4c11ee617", + }, + { + desc: "test case #1", + cachePrefix: "chain1", + req: nil, + errMsg: "request shouldn't be nil", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + cacheKey, err := cachemdw.GetQueryKey(tc.cachePrefix, tc.req) + if tc.errMsg == "" { + require.NoError(t, err) + require.Equal(t, tc.expectedCacheKey, cacheKey) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), tc.errMsg) + require.Empty(t, cacheKey) + } + }) + } +} diff --git a/service/cachemdw/middleware_test.go b/service/cachemdw/middleware_test.go new file mode 100644 index 0000000..8476e8c --- /dev/null +++ b/service/cachemdw/middleware_test.go @@ -0,0 +1,206 @@ +package cachemdw_test + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/kava-labs/kava-proxy-service/clients/cache" + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" +) + +func TestUnitTestServiceCacheMiddleware(t *testing.T) { + logger, err := logging.New("TRACE") + require.NoError(t, err) + + inMemoryCache := cache.NewInMemoryCache() + blockGetter := NewMockEVMBlockGetter() + cacheTTL := time.Duration(0) // TTL: no expiry + + serviceCache := cachemdw.NewServiceCache( + inMemoryCache, + blockGetter, + cacheTTL, + service.DecodedRequestContextKey, + defaultCachePrefixString, + true, + &logger, + ) + + emptyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}) + cachingMdw := serviceCache.CachingMiddleware(emptyHandler) + // proxyHandler emulates behaviour of actual service proxy handler + // sequence of execution: + // - isCachedMdw + // - proxyHandler + // - cachingMdw + // - emptyHandler + proxyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := []byte(testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody) + if cachemdw.IsRequestCached(r.Context()) { + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheHitHeaderValue) + } else { + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheMissHeaderValue) + } + w.WriteHeader(http.StatusOK) + w.Write(response) + responseContext := context.WithValue(r.Context(), cachemdw.ResponseContextKey, response) + + cachingMdw.ServeHTTP(w, r.WithContext(responseContext)) + }) + isCachedMdw := serviceCache.IsCachedMiddleware(proxyHandler) + + // test cache MISS and cache HIT scenarios for specified method + // check corresponding values in cachemdw.CacheHeaderKey HTTP header + + t.Run("cache miss", func(t *testing.T) { + req := createTestHttpRequest( + t, + "https://api.kava.io:8545/thisshouldntshowup", + TestRequestEthBlockByNumberSpecific, + ) + resp := httptest.NewRecorder() + + isCachedMdw.ServeHTTP(resp, req) + + require.Equal(t, http.StatusOK, resp.Code) + require.JSONEq(t, testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody, resp.Body.String()) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp.Header().Get(cachemdw.CacheHeaderKey)) + + cacheItems := inMemoryCache.GetAll(context.Background()) + require.Len(t, cacheItems, 1) + require.Contains(t, cacheItems, "1:evm-request:eth_getBlockByNumber:sha256:bf79de57723b25b85391513b470ea6989e7c44dd9afc0c270ee961c9f12f578d") + }) + + t.Run("cache hit", func(t *testing.T) { + req := createTestHttpRequest( + t, + "https://api.kava.io:8545/thisshouldntshowup", + TestRequestEthBlockByNumberSpecific, + ) + resp := httptest.NewRecorder() + + isCachedMdw.ServeHTTP(resp, req) + + require.Equal(t, http.StatusOK, resp.Code) + require.JSONEq(t, testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody, resp.Body.String()) + require.Equal(t, cachemdw.CacheHitHeaderValue, resp.Header().Get(cachemdw.CacheHeaderKey)) + + cacheItems := inMemoryCache.GetAll(context.Background()) + require.Len(t, cacheItems, 1) + require.Contains(t, cacheItems, "1:evm-request:eth_getBlockByNumber:sha256:bf79de57723b25b85391513b470ea6989e7c44dd9afc0c270ee961c9f12f578d") + }) +} + +func TestUnitTestServiceCacheMiddleware_CacheIsDisabled(t *testing.T) { + logger, err := logging.New("TRACE") + require.NoError(t, err) + + inMemoryCache := cache.NewInMemoryCache() + blockGetter := NewMockEVMBlockGetter() + cacheTTL := time.Duration(0) // TTL: no expiry + + serviceCache := cachemdw.NewServiceCache( + inMemoryCache, + blockGetter, + cacheTTL, + service.DecodedRequestContextKey, + defaultCachePrefixString, + false, + &logger, + ) + + emptyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}) + cachingMdw := serviceCache.CachingMiddleware(emptyHandler) + // proxyHandler emulates behaviour of actual service proxy handler + // sequence of execution: + // - isCachedMdw + // - proxyHandler + // - cachingMdw + // - emptyHandler + proxyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := []byte(testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody) + if cachemdw.IsRequestCached(r.Context()) { + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheHitHeaderValue) + } else { + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheMissHeaderValue) + } + w.WriteHeader(http.StatusOK) + w.Write(response) + responseContext := context.WithValue(r.Context(), cachemdw.ResponseContextKey, response) + + cachingMdw.ServeHTTP(w, r.WithContext(responseContext)) + }) + isCachedMdw := serviceCache.IsCachedMiddleware(proxyHandler) + + // both calls should lead to cache MISS scenario, because cache is disabled + // check corresponding values in cachemdw.CacheHeaderKey HTTP header + + t.Run("cache miss", func(t *testing.T) { + req := createTestHttpRequest( + t, + "https://api.kava.io:8545/thisshouldntshowup", + TestRequestEthBlockByNumberSpecific, + ) + resp := httptest.NewRecorder() + + isCachedMdw.ServeHTTP(resp, req) + + require.Equal(t, http.StatusOK, resp.Code) + require.JSONEq(t, testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody, resp.Body.String()) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp.Header().Get(cachemdw.CacheHeaderKey)) + + cacheItems := inMemoryCache.GetAll(context.Background()) + require.Len(t, cacheItems, 0) + }) + + t.Run("cache miss again (cache is disabled)", func(t *testing.T) { + req := createTestHttpRequest( + t, + "https://api.kava.io:8545/thisshouldntshowup", + TestRequestEthBlockByNumberSpecific, + ) + resp := httptest.NewRecorder() + + isCachedMdw.ServeHTTP(resp, req) + + require.Equal(t, http.StatusOK, resp.Code) + require.JSONEq(t, testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody, resp.Body.String()) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp.Header().Get(cachemdw.CacheHeaderKey)) + + cacheItems := inMemoryCache.GetAll(context.Background()) + require.Len(t, cacheItems, 0) + }) +} + +func createTestHttpRequest( + t *testing.T, + url string, + reqName testReqName, +) *http.Request { + t.Helper() + + req, err := http.NewRequest(http.MethodGet, url, nil) + require.NoError(t, err) + + decodedReq, err := decode.DecodeEVMRPCRequest( + []byte(testEVMQueries[reqName].RequestBody), + ) + require.NoError(t, err) + + decodedReqCtx := context.WithValue( + req.Context(), + service.DecodedRequestContextKey, + decodedReq, + ) + req = req.WithContext(decodedReqCtx) + + return req +} diff --git a/service/cachemdw/response.go b/service/cachemdw/response.go new file mode 100644 index 0000000..d4f9dcd --- /dev/null +++ b/service/cachemdw/response.go @@ -0,0 +1,99 @@ +package cachemdw + +import ( + "encoding/json" + "errors" + "fmt" +) + +type JsonRpcError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +// String returns the string representation of the error +func (e *JsonRpcError) String() string { + return fmt.Sprintf("%s (code: %d)", e.Message, e.Code) +} + +// JsonRpcResponse is a EVM JSON-RPC response +type JsonRpcResponse struct { + Version string `json:"jsonrpc,omitempty"` + ID json.RawMessage `json:"id,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + JsonRpcError *JsonRpcError `json:"error,omitempty"` +} + +// UnmarshalJsonRpcResponse unmarshals a JSON-RPC response +func UnmarshalJsonRpcResponse(data []byte) (*JsonRpcResponse, error) { + var msg JsonRpcResponse + err := json.Unmarshal(data, &msg) + return &msg, err +} + +// Marshal marshals a JSON-RPC response to JSON +func (resp *JsonRpcResponse) Marshal() ([]byte, error) { + return json.Marshal(resp) +} + +// Error returns the json-rpc error if any +func (resp *JsonRpcResponse) Error() error { + if resp.JsonRpcError == nil { + return nil + } + + return errors.New(resp.JsonRpcError.String()) +} + +// IsResultEmpty checks if the response's result is empty +func (resp *JsonRpcResponse) IsResultEmpty() bool { + if len(resp.Result) == 0 { + // empty response's result + return true + } + + var result interface{} + err := json.Unmarshal(resp.Result, &result) + if err != nil { + // consider result as empty if it's malformed + return true + } + + switch r := result.(type) { + case []interface{}: + // consider result as empty if it's empty slice + return len(r) == 0 + case string: + // Matches: + // - "" - Empty string + // - "0x0" - Represents zero in official json-rpc conventions. See: + // https://ethereum.org/en/developers/docs/apis/json-rpc/#conventions + // + // - "0x" - Empty response from some endpoints like getCode + + return r == "" || r == "0x0" || r == "0x" + case bool: + // consider result as empty if it's false + return !r + case nil: + // consider result as empty if it's null + return true + default: + return false + } +} + +// IsCacheable returns true in case of: +// - json-rpc response doesn't contain an error +// - json-rpc response's result isn't empty +func (resp *JsonRpcResponse) IsCacheable() bool { + if err := resp.Error(); err != nil { + return false + } + + if resp.IsResultEmpty() { + return false + } + + return true +} diff --git a/service/cachemdw/response_test.go b/service/cachemdw/response_test.go new file mode 100644 index 0000000..978be5a --- /dev/null +++ b/service/cachemdw/response_test.go @@ -0,0 +1,166 @@ +package cachemdw_test + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kava-labs/kava-proxy-service/service/cachemdw" +) + +func TestUnitTestJsonRpcResponse_IsResultEmpty(t *testing.T) { + toJSON := func(t *testing.T, result any) []byte { + resultInJSON, err := json.Marshal(result) + require.NoError(t, err) + + return resultInJSON + } + + mkResp := func(result []byte) *cachemdw.JsonRpcResponse { + return &cachemdw.JsonRpcResponse{ + Version: "2.0", + ID: []byte("1"), + Result: result, + } + } + + tests := []struct { + name string + resp *cachemdw.JsonRpcResponse + isEmpty bool + }{ + { + name: "empty result", + resp: mkResp([]byte("")), + isEmpty: true, + }, + { + name: "invalid json", + resp: mkResp([]byte("invalid json")), + isEmpty: true, + }, + { + name: "empty slice", + resp: mkResp(toJSON(t, []interface{}{})), + isEmpty: true, + }, + { + name: "empty string", + resp: mkResp(toJSON(t, "")), + isEmpty: true, + }, + { + name: "0x0 string", + resp: mkResp(toJSON(t, "0x0")), + isEmpty: true, + }, + { + name: "0x string", + resp: mkResp(toJSON(t, "0x")), + isEmpty: true, + }, + { + name: "empty bool", + resp: mkResp(toJSON(t, false)), + isEmpty: true, + }, + { + name: "nil", + resp: mkResp(nil), + isEmpty: true, + }, + { + name: "null", + resp: mkResp(toJSON(t, nil)), + isEmpty: true, + }, + { + name: "non-empty slice", + resp: mkResp(toJSON(t, []interface{}{1})), + isEmpty: false, + }, + { + name: "non-empty string", + resp: mkResp(toJSON(t, "0x1234")), + isEmpty: false, + }, + { + name: "non-empty bool", + resp: mkResp(toJSON(t, true)), + isEmpty: false, + }, + { + name: "unsupported empty object", + resp: mkResp(toJSON(t, map[string]interface{}{})), + isEmpty: false, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal( + t, + tc.isEmpty, + tc.resp.IsResultEmpty(), + ) + }) + } +} + +func TestUnitTestJsonRpcResponse_IsCacheable(t *testing.T) { + toJSON := func(t *testing.T, result any) []byte { + resultInJSON, err := json.Marshal(result) + require.NoError(t, err) + + return resultInJSON + } + + tests := []struct { + name string + resp *cachemdw.JsonRpcResponse + isCacheable bool + }{ + { + name: "empty result", + resp: &cachemdw.JsonRpcResponse{ + Version: "2.0", + ID: []byte("1"), + Result: []byte{}, + }, + isCacheable: false, + }, + { + name: "non-empty error", + resp: &cachemdw.JsonRpcResponse{ + Version: "2.0", + ID: []byte("1"), + Result: toJSON(t, "0x1234"), + JsonRpcError: &cachemdw.JsonRpcError{ + Code: 1, + Message: "error", + }, + }, + isCacheable: false, + }, + { + name: "valid response", + resp: &cachemdw.JsonRpcResponse{ + Version: "2.0", + ID: []byte("1"), + Result: toJSON(t, "0x1234"), + }, + isCacheable: true, + }, + } + + for _, tc := range tests { + t.Run(tc.name, func(t *testing.T) { + require.Equal( + t, + tc.isCacheable, + tc.resp.IsCacheable(), + ) + }) + } +} diff --git a/service/cachemdw/testdata_test.go b/service/cachemdw/testdata_test.go new file mode 100644 index 0000000..cac9f24 --- /dev/null +++ b/service/cachemdw/testdata_test.go @@ -0,0 +1,298 @@ +package cachemdw_test + +type testHttpRequestResponse struct { + RequestBody string + ResponseBody string +} + +type testReqName string + +const ( + TestRequestWeb3ClientVersion testReqName = "web3_clientVersion" + TestRequestEthGetAccountsEmpty testReqName = "eth_getAccounts/empty" + TestRequestEthBlockByNumberSpecific testReqName = "eth_getBlockByNumber" + TestRequestEthBlockByNumberLatest testReqName = "eth_getBlockByNumber/latest" + TestRequestEthBlockByNumberFuture testReqName = "eth_getBlockByNumber/future" + TestRequestEthBlockByNumberError testReqName = "eth_getBlockByNumber/error" + TestRequestEthGetBalancePositive testReqName = "eth_getBalance/positive" + TestRequestEthGetBalanceZero testReqName = "eth_getBalance/zero" + TestRequestEthGetCodeEmpty testReqName = "eth_getCode/empty" +) + +// testEVMQueries is a map of testing json-rpc responses. These are copied from +// real requests to the Kava evm. +var testEVMQueries = map[testReqName]testHttpRequestResponse{ + TestRequestWeb3ClientVersion: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"web3_clientVersion", + "params":[], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": "Version dev ()\nCompiled at using Go go1.20.3 (amd64)" + }`, + }, + TestRequestEthGetAccountsEmpty: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_accounts", + "params":[], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": [] + }`, + }, + TestRequestEthBlockByNumberSpecific: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + "0x1b4", + true + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "difficulty": "0x0", + "extraData": "0x", + "gasLimit": "0x1312d00", + "gasUsed": "0x1afc2", + "hash": "0xcc6963a6d025ec2dad24373fbd5f2c3ab75b51ccb31f049682e2001b1a20322f", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner": "0x7f73862f0672c066c3f6b4330a736479f0345cd7", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x0000000000000000", + "number": "0x1b4", + "parentHash": "0xd313a81b36d717e4ce67cb7d8f6560158bef9a25f8a4e1b63475050a4181102c", + "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size": "0x2431", + "stateRoot": "0x20197ba04e30d29a58b508b752d41f0614ceb8d47d2ea2544ff64a6490327625", + "timestamp": "0x628e85a0", + "totalDifficulty": "0x0", + "transactions": [], + "transactionsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles": [] + } + }`, + }, + TestRequestEthBlockByNumberLatest: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + "latest", + true + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "difficulty": "0x0", + "extraData": "0x", + "gasLimit": "0x1312d00", + "gasUsed": "0xffea13", + "hash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "logsBloom": "0x9030082000000200200000040002300000000000040008000000000000800100800001000040002008000400800080000880021000000802000002001000080000060040800000000140ac09010020200000a0100000000010000200040000004400048042040018008843800a00080080004000000c00001000001108800288000000014080008000001a80000040020400900000020000800201500044210880001000080040c10c081000000000400000040400000480000021001040000000000002000812002084000700430010000000410008028800c20000100020000001000001210040080010000000010240082880400000000000208000004008", + "miner": "0xb21adc77c091742783061ab15a0bd1c27efc7a81", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x0000000000000000", + "number": "0x49be70", + "parentHash": "0x7e30cc8b5f6208d0c07d7964930a8dc5d111e4f5830744121beeb5d028c8332d", + "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size": "0x2364", + "stateRoot": "0x992ed1d2a240baf82ac21bd1d143f000181f9d15f0b8f1b03ee33b0b704d32ce", + "timestamp": "0x6465223a", + "totalDifficulty": "0x0", + "transactions": [ + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0xbfa2f9018a41a5419d38bf3e11e8651e998037c5", + "gas": "0x895440", + "gasPrice": "0x1e", + "hash": "0x58dc15c522cce394167619c3d80ed6c7645db4b43b4759d2808e7468be6808cf", + "input": "0xfdb5a03e", + "nonce": "0x5e3c6", + "to": "0x109f3289665a8f034e2cacdbcfb678cabe09f1d5", + "transactionIndex": "0x0", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x1180", + "r": "0x89834b451fd30d4c66e35b14af33d1759541f9758fac889a1fb47dab7759db64", + "s": "0x4511c49cc3ab4dcddcb8e427b3c3b3208c947899e32e461b694efa01d44b2d23" + }, + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0xd479f39e2d2cf61a6708d2a68b245ed04c10683d", + "gas": "0x4c4b40", + "gasPrice": "0x37", + "hash": "0xf71127d678911732c7654d4fcff7b7b690a552e2b1652d5b981a03b93d64bee0", + "input": "0xfdb5a03e", + "nonce": "0x71e2a", + "to": "0xbc50b9f7f8a4ac5cfbb02d214239033dd5a35527", + "transactionIndex": "0x1", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x117f", + "r": "0x19e8dfcca57dbad7359b4cd48347f184a08ab0b939ccf3a6978ec4bedf6507c3", + "s": "0xcd02607f329646b02fff56a211a5442f6c5b13fba6bc01a39edcf6c1d6bdd1e" + }, + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0xbfa2f9018a41a5419d38bf3e11e8651e998037c5", + "gas": "0x895440", + "gasPrice": "0x1e", + "hash": "0x2ee916a9b0732d7badd7d9f5bb7d933bb5344bb672f73bf741b922ff9a9d2252", + "input": "0xfdb5a03e", + "nonce": "0x5e3c7", + "to": "0x738114fc34d7b0d33f13d2b5c3d44484ec85c7f1", + "transactionIndex": "0x2", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x1180", + "r": "0x81dfe4351c448ccc6a5f6f2a2f866ad023df7843da56c38fb19dd0d5d90e22de", + "s": "0x1d770062f304732b4fa5544bd12bd4356f9a853d66bbdef547eab034b142897a" + }, + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0x07f92d445d1fa59059b50fb664a7633b86db1152", + "gas": "0x989680", + "gasPrice": "0x3c", + "hash": "0x9cb2439b6d4784d58118d3facb9beba77dc80fa4c998b281a4cf46e944298c1f", + "input": "0xfdb5a03e", + "nonce": "0x58885", + "to": "0xefa8952a4ab8b210a5f1dd2a378ed3d1200cf64b", + "transactionIndex": "0x3", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x117f", + "r": "0x515203b1e08dc06fa7a1fa4e029775a233aa6f9af419185d0b6b8a6407d27eb5", + "s": "0x52984774ebce17affe5f842ad930605381e5ee146074aa1adc171eb4cc128270" + }, + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0x6d4f641c7f86c5c76182066b7bc1023dfe51c8f0", + "gas": "0x424f3", + "gasPrice": "0x3b9aca00", + "hash": "0x102a60dcd2333651c5f13e53db5ffad994e3fd4d74d99eb1355562da0b1b4d8d", + "input": "0xabe50f1900000000000000000000000000000000000000000000010f0cf064dd592000000000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x376", + "to": "0x2911c3a3b497af71aacbb9b1e9fd3ee5d50f959d", + "transactionIndex": "0x4", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x1180", + "r": "0x1890b8bf21b7a50f4a55568535dcff47d89a257e780e51e419c237af943f2afe", + "s": "0x313b04a2e0ea400623fbcfd22af91ebd2593c3fdbd29fbe1004e81e54430770a" + } + ], + "transactionsRoot": "0xfa3bae7d2ee5eff10fe2ff44840d31755c56eb0dfd0827ebc5ad21ac628020d3", + "uncles": [] + } + }`, + }, + TestRequestEthBlockByNumberFuture: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + "0x59be70", + true + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": null + }`, + }, + TestRequestEthBlockByNumberError: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + oops + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": null, + "error": { + "code": -32700, + "message": "parse error" + } + }`, + }, + TestRequestEthGetBalancePositive: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBalance", + "params":[ + "0x373CE80dd1e921506EC5603290AF444e60CeF61F", + "0x49BCF0" + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": "0xdfe3285d58c7e365" + }`, + }, + TestRequestEthGetBalanceZero: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBalance", + "params":[ + "0x1111111111111111111111111111111111111111", + "0x2" + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": "0x0" + }`, + }, + TestRequestEthGetCodeEmpty: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getCode", + "params":[ + "0x1111111111111111111111111111111111111111", + "0x2" + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": "0x" + }`, + }, +} diff --git a/service/handlers.go b/service/handlers.go index 338071e..b7aa38c 100644 --- a/service/handlers.go +++ b/service/handlers.go @@ -1,6 +1,7 @@ package service import ( + "context" "encoding/json" "errors" "fmt" @@ -20,12 +21,24 @@ func createHealthcheckHandler(service *ProxyService) func(http.ResponseWriter, * // check that the database is reachable err := service.Database.HealthCheck() - if err != nil { errMsg := fmt.Errorf("proxy service unable to connect to database") combinedErrors = errors.Join(combinedErrors, errMsg) } + if service.Cache.IsCacheEnabled() { + // check that the cache is reachable + err := service.Cache.Healthcheck(context.Background()) + if err != nil { + service.Logger.Error(). + Err(err). + Msg("cache healthcheck failed") + + errMsg := fmt.Errorf("proxy service unable to connect to cache: %v", err) + combinedErrors = errors.Join(combinedErrors, errMsg) + } + } + if combinedErrors != nil { w.WriteHeader(http.StatusInternalServerError) diff --git a/service/middleware.go b/service/middleware.go index 61da20a..9103721 100644 --- a/service/middleware.go +++ b/service/middleware.go @@ -15,6 +15,7 @@ import ( "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/decode" "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" ) const ( @@ -153,6 +154,19 @@ func createProxyRequestMiddleware(next http.Handler, config config.Config, servi handler := func(proxies Proxies) func(http.ResponseWriter, *http.Request) { return func(w http.ResponseWriter, r *http.Request) { + req := r.Context().Value(DecodedRequestContextKey) + decodedReq, ok := (req).(*decode.EVMRPCRequestEnvelope) + if !ok { + serviceLogger.Logger.Error(). + Str("method", r.Method). + Str("url", r.URL.String()). + Str("host", r.Host). + Msg("can't cast request to *EVMRPCRequestEnvelope type") + + // if we can't get decoded request then assign it empty structure to avoid panics + decodedReq = new(decode.EVMRPCRequestEnvelope) + } + serviceLogger.Trace().Msg(fmt.Sprintf("proxying request %+v", r)) proxyRequestAt := time.Now() @@ -220,8 +234,37 @@ func createProxyRequestMiddleware(next http.Handler, config config.Config, servi serviceLogger.Trace().Msg("request body is empty, skipping before request interceptors") } - // proxy request to backend origin servers - proxy.ServeHTTP(lrw, r) + isCached := cachemdw.IsRequestCached(r.Context()) + cachedResponse := r.Context().Value(cachemdw.ResponseContextKey) + typedCachedResponse, ok := cachedResponse.([]byte) + + // if cache is enabled, request is cached and response is present in context - serve the request from the cache + // otherwise proxy to the actual backend + if config.CacheEnabled && isCached && ok { + serviceLogger.Logger.Trace(). + Str("method", r.Method). + Str("url", r.URL.String()). + Str("host", r.Host). + Str("evm-method", decodedReq.Method). + Msg("cache hit") + + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheHitHeaderValue) + w.Header().Add("Content-Type", "application/json") + _, err := w.Write(typedCachedResponse) + if err != nil { + serviceLogger.Logger.Error().Msg(fmt.Sprintf("can't write cached response: %v", err)) + } + } else { + serviceLogger.Logger.Trace(). + Str("method", r.Method). + Str("url", r.URL.String()). + Str("host", r.Host). + Str("evm-method", decodedReq.Method). + Msg("cache miss") + + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheMissHeaderValue) + proxy.ServeHTTP(lrw, r) + } serviceLogger.Trace().Msg(fmt.Sprintf("response %+v \nheaders %+v \nstatus %+v for request %+v", lrw.Status(), lrw.Header(), lrw.body, r)) @@ -240,6 +283,22 @@ func createProxyRequestMiddleware(next http.Handler, config config.Config, servi // add response backend name to context enrichedContext = context.WithValue(enrichedContext, ProxyMetadataContextKey, proxyMetadata) + // if cache is enabled, update enrichedContext with cachemdw.ResponseContextKey -> bodyBytes key-value pair + if config.CacheEnabled { + var bodyCopy bytes.Buffer + tee := io.TeeReader(lrw.body, &bodyCopy) + // read all body from reader into bodyBytes, and copy into bodyCopy + bodyBytes, err := io.ReadAll(tee) + if err != nil { + serviceLogger.Error().Err(err).Msg("can't read lrw.body") + } + + // replace empty body reader with fresh copy + lrw.body = &bodyCopy + // set body in context + enrichedContext = context.WithValue(enrichedContext, cachemdw.ResponseContextKey, bodyBytes) + } + // parse the remote address of the request for use below remoteAddressParts := strings.Split(r.RemoteAddr, ":") @@ -386,6 +445,7 @@ func createAfterProxyFinalizer(service *ProxyService, config config.Config) http } var blockNumber *int64 + // TODO: Redundant ExtractBlockNumberFromEVMRPCRequest call here if request is cached rawBlockNumber, err := decodedRequestBody.ExtractBlockNumberFromEVMRPCRequest(r.Context(), service.evmClient) if err != nil { diff --git a/service/service.go b/service/service.go index ac91e82..dbefc22 100644 --- a/service/service.go +++ b/service/service.go @@ -9,15 +9,18 @@ import ( "time" "github.com/ethereum/go-ethereum/ethclient" + "github.com/kava-labs/kava-proxy-service/clients/cache" "github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/clients/database/migrations" "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" ) // ProxyService represents an instance of the proxy service API type ProxyService struct { Database *database.PostgresClient + Cache *cachemdw.ServiceCache httpProxy *http.Server evmClient *ethclient.Client *logging.ServiceLogger @@ -27,6 +30,24 @@ type ProxyService struct { func New(ctx context.Context, config config.Config, serviceLogger *logging.ServiceLogger) (ProxyService, error) { service := ProxyService{} + // create database client + db, err := createDatabaseClient(ctx, config, serviceLogger) + if err != nil { + return ProxyService{}, err + } + + // create evm api client + evmClient, err := ethclient.Dial(config.EvmQueryServiceURL) + if err != nil { + return ProxyService{}, err + } + + // create cache client + serviceCache, err := createServiceCache(ctx, config, serviceLogger, evmClient) + if err != nil { + return ProxyService{}, err + } + // create an http router for registering handlers for a given route mux := http.NewServeMux() @@ -38,12 +59,24 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi // set up before and after request interceptors (a.k.a. raptors 🦖🦖) + // CachingMiddleware caches request in case of: + // - request isn't already cached + // - request is cacheable + // - response is present in context + cacheAfterProxyMiddleware := serviceCache.CachingMiddleware(afterProxyFinalizer) + // create an http handler that will proxy any request to the specified URL - proxyMiddleware := createProxyRequestMiddleware(afterProxyFinalizer, config, serviceLogger, []RequestInterceptor{}, []RequestInterceptor{}) + proxyMiddleware := createProxyRequestMiddleware(cacheAfterProxyMiddleware, config, serviceLogger, []RequestInterceptor{}, []RequestInterceptor{}) + + // IsCachedMiddleware works in the following way: + // - tries to get response from the cache + // - if present sets cached response in context, marks as cached in context and forwards to next middleware + // - if not present marks as uncached in context and forwards to next middleware + cacheMiddleware := serviceCache.IsCachedMiddleware(proxyMiddleware) // create an http handler that will log the request to stdout // this handler will run before the proxyMiddleware handler - requestLoggingMiddleware := createRequestLoggingMiddleware(proxyMiddleware, serviceLogger) + requestLoggingMiddleware := createRequestLoggingMiddleware(cacheMiddleware, serviceLogger) // register healthcheck handler that can be used during deployment and operations // to determine if the service is ready to receive requests @@ -64,13 +97,6 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi ReadTimeout: time.Duration(config.HTTPReadTimeoutSeconds) * time.Second, } - // create database client - db, err := createDatabaseClient(ctx, config, serviceLogger) - - if err != nil { - return ProxyService{}, err - } - // register database status handler // for responding to requests for the status // of database related operations such as @@ -78,17 +104,11 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi // partitioning mux.HandleFunc("/status/database", createDatabaseStatusHandler(&service, db)) - // create evm api client - evmClient, err := ethclient.Dial(config.EvmQueryServiceURL) - - if err != nil { - return ProxyService{}, err - } - service = ProxyService{ httpProxy: server, ServiceLogger: serviceLogger, Database: db, + Cache: serviceCache, evmClient: evmClient, } @@ -167,6 +187,39 @@ func createDatabaseClient(ctx context.Context, config config.Config, logger *log return &serviceDatabase, err } +func createServiceCache( + ctx context.Context, + config config.Config, + logger *logging.ServiceLogger, + evmclient *ethclient.Client, +) (*cachemdw.ServiceCache, error) { + cfg := cache.RedisConfig{ + Address: config.RedisEndpointURL, + Password: config.RedisPassword, + DB: 0, + } + redisCache, err := cache.NewRedisCache( + &cfg, + logger, + ) + if err != nil { + logger.Error().Msg(fmt.Sprintf("error %s creating cache using endpoint %+v", err, config.RedisEndpointURL)) + return nil, err + } + + serviceCache := cachemdw.NewServiceCache( + redisCache, + evmclient, + config.CacheTTL, + DecodedRequestContextKey, + config.CachePrefix, + config.CacheEnabled, + logger, + ) + + return serviceCache, nil +} + // Run runs the proxy service, returning error (if any) in the event // the proxy service stops func (p *ProxyService) Run() error {