diff --git a/.env b/.env index 5197d89..00d97e9 100644 --- a/.env +++ b/.env @@ -48,7 +48,7 @@ TEST_EVM_QUERY_SERVICE_URL=http://kava:8545 ##### Kava Proxy Config # What port the proxy service listens on PROXY_SERVICE_PORT=7777 -LOG_LEVEL=TRACE +LOG_LEVEL=ERROR HTTP_READ_TIMEOUT_SECONDS=30 HTTP_WRITE_TIMEOUT_SECONDS=60 # Address of the origin server to proxy all requests to @@ -89,6 +89,9 @@ METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS=10 METRIC_PARTITIONINING_PREFILL_PERIOD_DAYS=7 # Used by `ready` script to ensure metric partitions have been created. MINIMUM_REQUIRED_PARTITIONS=30 +REDIS_ENDPOINT_URL=redis:6379 +REDIS_PASSWORD= +CHAIN_ID=local-chain ##### Database Config POSTGRES_PASSWORD=password diff --git a/.gitignore b/.gitignore index bb30b67..83ba227 100644 --- a/.gitignore +++ b/.gitignore @@ -17,7 +17,8 @@ kava-proxy-service cover.html # Dependency directories (remove the comment below to include it) -# vendor/ +vendor/ # ignore editor files .vscode/ +.idea/ \ No newline at end of file diff --git a/Makefile b/Makefile index d63769d..6291a8f 100644 --- a/Makefile +++ b/Makefile @@ -27,7 +27,7 @@ install: lint .PHONY: build # build a development version docker image of the service build: lint - docker build ./ -f local.Dockerfile -t ${IMAGE_NAME}:${LOCAL_IMAGE_TAG} + docker build --no-cache ./ -f local.Dockerfile -t ${IMAGE_NAME}:${LOCAL_IMAGE_TAG} .PHONY: publish # build a production version docker image of the service @@ -42,7 +42,7 @@ unit-test: .PHONY: e2e-test # run tests that execute against a local or remote instance of the API e2e-test: - go test -count=1 -v -cover -coverprofile cover.out --race ./... -run "^TestE2ETest*" + go test -count=1 -v -cover -coverprofile cover.out --race ./... -run "^TestE2ETestProxyCachesMethodsWithBlockNumberParam*" .PHONY: it # run any test matching the provided pattern, can pass a regex or a string diff --git a/clients/cache/cache.go b/clients/cache/cache.go new file mode 100644 index 0000000..cc6f9cb --- /dev/null +++ b/clients/cache/cache.go @@ -0,0 +1,16 @@ +package cache + +import ( + "context" + "errors" + "time" +) + +var ErrNotFound = errors.New("value not found in the cache") + +type Cache interface { + Set(ctx context.Context, key string, data []byte, expiration time.Duration) error + Get(ctx context.Context, key string) ([]byte, error) + Delete(ctx context.Context, key string) error + Healthcheck(ctx context.Context) error +} diff --git a/clients/cache/inmemory.go b/clients/cache/inmemory.go new file mode 100644 index 0000000..9293af2 --- /dev/null +++ b/clients/cache/inmemory.go @@ -0,0 +1,101 @@ +package cache + +import ( + "context" + "sync" + "time" +) + +// InMemoryCache is an in-memory implementation of the Cache interface. +type InMemoryCache struct { + data map[string]cacheItem + mutex sync.RWMutex +} + +// Ensure InMemoryCache implements the Cache interface. +var _ Cache = (*InMemoryCache)(nil) + +// cacheItem represents an item stored in the cache. +type cacheItem struct { + data []byte + expiration time.Time +} + +// NewInMemoryCache creates a new instance of InMemoryCache. +func NewInMemoryCache() *InMemoryCache { + return &InMemoryCache{ + data: make(map[string]cacheItem), + } +} + +// Set sets the value of a key in the cache. +func (c *InMemoryCache) Set( + ctx context.Context, + key string, + data []byte, + expiration time.Duration, +) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + expiry := time.Now().Add(expiration) + + if expiration == 0 { + // 100 years in the future to prevent expiry + expiry = time.Now().AddDate(100, 0, 0) + } + + c.data[key] = cacheItem{ + data: data, + expiration: expiry, + } + + return nil +} + +// Get retrieves the value of a key from the cache. +func (c *InMemoryCache) Get(ctx context.Context, key string) ([]byte, error) { + c.mutex.RLock() + defer c.mutex.RUnlock() + + item, ok := c.data[key] + if !ok || time.Now().After(item.expiration) { + // Not a real ttl but just replicates it for fetching + delete(c.data, key) + + return nil, ErrNotFound + } + + return item.data, nil +} + +// GetAll returns all the non-expired data in the cache. +func (c *InMemoryCache) GetAll(ctx context.Context) map[string][]byte { + c.mutex.RLock() + defer c.mutex.RUnlock() + + result := make(map[string][]byte) + + for key, item := range c.data { + if time.Now().After(item.expiration) { + delete(c.data, key) + } else { + result[key] = item.data + } + } + + return result +} + +// Delete removes a key from the cache. +func (c *InMemoryCache) Delete(ctx context.Context, key string) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + delete(c.data, key) + return nil +} + +func (c *InMemoryCache) Healthcheck(ctx context.Context) error { + return nil +} diff --git a/clients/cache/redis.go b/clients/cache/redis.go new file mode 100644 index 0000000..f5bc604 --- /dev/null +++ b/clients/cache/redis.go @@ -0,0 +1,114 @@ +package cache + +import ( + "context" + "fmt" + "time" + + "github.com/kava-labs/kava-proxy-service/logging" + "github.com/redis/go-redis/v9" +) + +type RedisConfig struct { + Address string + Password string + DB int +} + +// RedisCache is an implementation of Cache that uses Redis as the caching backend. +type RedisCache struct { + client *redis.Client + *logging.ServiceLogger +} + +var _ Cache = (*RedisCache)(nil) + +func NewRedisCache( + cfg *RedisConfig, + logger *logging.ServiceLogger, +) (*RedisCache, error) { + client := redis.NewClient(&redis.Options{ + Addr: cfg.Address, + Password: cfg.Password, + DB: cfg.DB, + }) + + return &RedisCache{ + client: client, + ServiceLogger: logger, + }, nil +} + +// Set sets the value for the given key in the cache with the given expiration. +func (rc *RedisCache) Set( + ctx context.Context, + key string, + value []byte, + expiration time.Duration, +) error { + rc.Logger.Trace(). + Str("key", key). + Str("value", string(value)). + Dur("expiration", expiration). + Msg("setting value in redis") + + return rc.client.Set(ctx, key, value, expiration).Err() +} + +// Get gets the value for the given key in the cache. +func (rc *RedisCache) Get( + ctx context.Context, + key string, +) ([]byte, error) { + rc.Logger.Trace(). + Str("key", key). + Msg("getting value from redis") + + val, err := rc.client.Get(ctx, key).Bytes() + if err == redis.Nil { + rc.Logger.Trace(). + Str("key", key). + Msgf("value not found in redis") + return nil, ErrNotFound + } + if err != nil { + rc.Logger.Error(). + Str("key", key). + Err(err). + Msg("error during getting value from redis") + return nil, err + } + + rc.Logger.Trace(). + Str("key", key). + Str("value", string(val)). + Msg("successfully got value from redis") + + return val, nil +} + +// Delete deletes the value for the given key in the cache. +func (rc *RedisCache) Delete(ctx context.Context, key string) error { + rc.Logger.Trace(). + Str("key", key). + Msg("deleting value from redis") + + return rc.client.Del(ctx, key).Err() +} + +func (rc *RedisCache) Healthcheck(ctx context.Context) error { + rc.Logger.Trace().Msg("redis healthcheck was called") + + // Check if we can connect to Redis + _, err := rc.client.Ping(ctx).Result() + if err != nil { + rc.Logger.Error(). + Err(err). + Msg("can't ping redis") + return fmt.Errorf("error connecting to Redis: %v", err) + } + + rc.Logger.Trace().Msg("redis healthcheck was successful") + + return nil +} diff --git a/config/config.go b/config/config.go index 06e2d73..4891135 100644 --- a/config/config.go +++ b/config/config.go @@ -37,6 +37,10 @@ type Config struct { MetricPartitioningRoutineInterval time.Duration MetricPartitioningRoutineDelayFirstRun time.Duration MetricPartitioningPrefillPeriodDays int + RedisEndpointURL string + RedisPassword string + CacheTTL time.Duration + ChainID string } const ( @@ -79,6 +83,11 @@ const ( DEFAULT_DATABASE_READ_TIMEOUT_SECONDS = 60 DATABASE_WRITE_TIMEOUT_SECONDS_ENVIRONMENT_KEY = "DATABASE_WRITE_TIMEOUT_SECONDS" DEFAULT_DATABASE_WRITE_TIMEOUT_SECONDS = 10 + REDIS_ENDPOINT_URL_ENVIRONMENT_KEY = "REDIS_ENDPOINT_URL" + REDIS_PASSWORD_ENVIRONMENT_KEY = "REDIS_PASSWORD" + CACHE_TTL_ENVIRONMENT_KEY = "CACHE_TTL" + DEFAULT_CACHE_TTL_SECONDS = 600 + CHAIN_ID_ENVIRONMENT_KEY = "CHAIN_ID" ) // EnvOrDefault fetches an environment variable value, or if not set returns the fallback value @@ -204,5 +213,9 @@ func ReadConfig() Config { MetricPartitioningRoutineInterval: time.Duration(time.Duration(EnvOrDefaultInt(METRIC_PARTITIONING_ROUTINE_INTERVAL_SECONDS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_ROUTINE_INTERVAL_SECONDS)) * time.Second), MetricPartitioningRoutineDelayFirstRun: time.Duration(time.Duration(EnvOrDefaultInt(METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS)) * time.Second), MetricPartitioningPrefillPeriodDays: EnvOrDefaultInt(METRIC_PARTITIONING_PREFILL_PERIOD_DAYS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_PREFILL_PERIOD_DAYS), + RedisEndpointURL: os.Getenv(REDIS_ENDPOINT_URL_ENVIRONMENT_KEY), + RedisPassword: os.Getenv(REDIS_PASSWORD_ENVIRONMENT_KEY), + CacheTTL: time.Duration(EnvOrDefaultInt(CACHE_TTL_ENVIRONMENT_KEY, DEFAULT_CACHE_TTL_SECONDS)) * time.Second, + ChainID: os.Getenv(CHAIN_ID_ENVIRONMENT_KEY), } } diff --git a/decode/evm_rpc.go b/decode/evm_rpc.go index c773792..4ead817 100644 --- a/decode/evm_rpc.go +++ b/decode/evm_rpc.go @@ -6,12 +6,17 @@ import ( "errors" "fmt" - "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/ethclient" - cosmosmath "cosmossdk.io/math" + "github.com/ethereum/go-ethereum/common" + ethctypes "github.com/ethereum/go-ethereum/core/types" ) +// EVMBlockGetter defines an interface which can be implemented by any client capable of getting ethereum block by hash +type EVMBlockGetter interface { + // BlockByHash returns ethereum block by hash + BlockByHash(ctx context.Context, hash common.Hash) (*ethctypes.Block, error) +} + // Errors that might result from decoding parts or the whole of // an EVM RPC request var ( @@ -138,7 +143,7 @@ func DecodeEVMRPCRequest(body []byte) (*EVMRPCRequestEnvelope, error) { // - the request is a valid evm rpc request // - the method for the request supports specifying a block number // - the provided block number is a valid tag or number -func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context.Context, evmClient *ethclient.Client) (int64, error) { +func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context.Context, blockGetter EVMBlockGetter) (int64, error) { // only attempt to extract block number from a valid ethereum api request if r.Method == "" { return 0, ErrInvalidEthAPIRequest @@ -173,12 +178,12 @@ func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context. return parseBlockNumberFromParams(r.Method, r.Params) } - return lookupBlockNumberFromHashParam(ctx, evmClient, r.Method, r.Params) + return lookupBlockNumberFromHashParam(ctx, blockGetter, r.Method, r.Params) } // Generic method to lookup the block number // based on the hash value in a set of params -func lookupBlockNumberFromHashParam(ctx context.Context, evmClient *ethclient.Client, methodName string, params []interface{}) (int64, error) { +func lookupBlockNumberFromHashParam(ctx context.Context, blockGetter EVMBlockGetter, methodName string, params []interface{}) (int64, error) { paramIndex, exists := MethodNameToBlockHashParamIndex[methodName] if !exists { @@ -191,7 +196,7 @@ func lookupBlockNumberFromHashParam(ctx context.Context, evmClient *ethclient.Cl return 0, fmt.Errorf(fmt.Sprintf("error decoding block hash param from params %+v at index %d", params, paramIndex)) } - block, err := evmClient.BlockByHash(ctx, common.HexToHash(blockHash)) + block, err := blockGetter.BlockByHash(ctx, common.HexToHash(blockHash)) if err != nil { return 0, err diff --git a/go.mod b/go.mod index 27a36ba..8bc47b3 100644 --- a/go.mod +++ b/go.mod @@ -6,6 +6,7 @@ require ( cosmossdk.io/math v1.0.0 github.com/ethereum/go-ethereum v1.11.2 github.com/google/uuid v1.3.0 + github.com/redis/go-redis/v9 v9.2.1 github.com/rs/zerolog v1.29.0 github.com/stretchr/testify v1.8.2 github.com/uptrace/bun v1.1.12 @@ -18,9 +19,11 @@ require ( require ( github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect + github.com/cespare/xxhash/v2 v2.2.0 // indirect github.com/davecgh/go-spew v1.1.1 // indirect github.com/deckarep/golang-set/v2 v2.1.0 // indirect github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect + github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect github.com/fatih/color v1.14.1 // indirect github.com/go-ole/go-ole v1.2.1 // indirect github.com/go-stack/stack v1.8.1 // indirect diff --git a/go.sum b/go.sum index a82a2c9..55e2f8e 100644 --- a/go.sum +++ b/go.sum @@ -5,10 +5,13 @@ github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 h1:fLjPD/aNc3UIO github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6/go.mod h1:3eOhrUMpNV+6aFIbp5/iudMxNCF27Vw2OZgy4xEx0Fg= github.com/VictoriaMetrics/fastcache v1.6.0 h1:C/3Oi3EiBCqufydp1neRZkqcwmEiuRT9c3fqvvgKm5o= github.com/beorn7/perks v1.0.1 h1:VlbKKnNfV8bJzeqoa4cOKqO6bYr3WgKZxO8Z16+hsOM= +github.com/bsm/ginkgo/v2 v2.12.0 h1:Ny8MWAHyOepLGlLKYmXG4IEkioBysk6GpaRTLC8zwWs= +github.com/bsm/gomega v1.27.10 h1:yeMWxP2pV2fG3FgAODIY8EiRE3dy0aeFYt4l7wh6yKA= github.com/btcsuite/btcd/btcec/v2 v2.2.0 h1:fzn1qaOt32TuLjFlkzYSsBC35Q3KUjT1SwPxiMSCF5k= github.com/btcsuite/btcd/btcec/v2 v2.2.0/go.mod h1:U7MHm051Al6XmscBQ0BoNydpOTsFAn707034b5nY8zU= github.com/btcsuite/btcd/chaincfg/chainhash v1.0.1 h1:q0rUy8C/TYNBQS1+CGKw68tLOFYSNEs0TFnxxnS9+4U= github.com/cespare/xxhash/v2 v2.2.0 h1:DC2CZ1Ep5Y4k3ZQ899DldepgrayRUGE6BBZ/cd9Cj44= +github.com/cespare/xxhash/v2 v2.2.0/go.mod h1:VGX0DQ3Q6kWi7AoAeZDth3/j3BFtOZR5XLFGgcrjCOs= github.com/cockroachdb/errors v1.9.1 h1:yFVvsI0VxmRShfawbt/laCIDy/mtTqqnvoNgiy5bEV8= github.com/cockroachdb/logtags v0.0.0-20230118201751-21c54148d20b h1:r6VH0faHjZeQy818SGhaone5OnYfxFR/+AzdY3sf5aE= github.com/cockroachdb/pebble v0.0.0-20230209160836-829675f94811 h1:ytcWPaNPhNoGMWEhDvS3zToKcDpRsLuRolQJBVGdozk= @@ -24,6 +27,8 @@ github.com/decred/dcrd/crypto/blake256 v1.0.0 h1:/8DMNYp9SGi5f0w7uCm6d6M4OU2rGFK github.com/decred/dcrd/crypto/blake256 v1.0.0/go.mod h1:sQl2p6Y26YV+ZOcSTP6thNdn47hh8kt6rqSlvmrXFAc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 h1:YLtO71vCjJRCBcrPMtQ9nqBsqpA1m5sE92cU+pd5Mcc= github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1/go.mod h1:hyedUtir6IdtD/7lIxGeCxkaw7y45JueMRL4DIyJDKs= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f h1:lO4WD4F/rVNCu3HqELle0jiPLLBs70cWOduZpkS1E78= +github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f/go.mod h1:cuUVRXasLTGF7a8hSLbxyZXjz+1KgoB3wDUb6vlszIc= github.com/edsrzf/mmap-go v1.0.0 h1:CEBF7HpRnUCSJgGUb5h1Gm7e3VkmVDrR8lvWVLtrOFw= github.com/ethereum/go-ethereum v1.11.2 h1:z/luyejbevDCAMUUiu0rc80dxJxOnpoG58k5o0tSawc= github.com/ethereum/go-ethereum v1.11.2/go.mod h1:DuefStAgaxoaYGLR0FueVcVbehmn5n9QUcVrMCuOvuc= @@ -78,6 +83,8 @@ github.com/prometheus/client_model v0.3.0 h1:UBgGFHqYdG/TPFD1B1ogZywDqEkwp3fBMvq github.com/prometheus/common v0.39.0 h1:oOyhkDq05hPZKItWVBkJ6g6AtGxi+fy7F4JvUV8uhsI= github.com/prometheus/procfs v0.9.0 h1:wzCHvIvM5SxWqYvwgVL7yJY8Lz3PKn49KQtpgMYJfhI= github.com/prometheus/tsdb v0.7.1 h1:YZcsG11NqnK4czYLrWd9mpEuAJIHVQLwdrleYfszMAA= +github.com/redis/go-redis/v9 v9.2.1 h1:WlYJg71ODF0dVspZZCpYmoF1+U1Jjk9Rwd7pq6QmlCg= +github.com/redis/go-redis/v9 v9.2.1/go.mod h1:hdY0cQFCN4fnSYT6TkisLufl/4W5UIXyv0b/CLO2V2M= github.com/rogpeppe/go-internal v1.9.0 h1:73kH8U+JUqXU8lRuOHeVHaa/SZPifC7BkcraZVejAe8= github.com/rs/cors v1.7.0 h1:+88SsELBHx5r+hZ8TCkggzSstaWNbDvThkVK8H6f9ik= github.com/rs/xid v1.4.0/go.mod h1:trrq9SKmegXys3aeAKXMUTdJsYXVwGY3RLcfgqegfbg= diff --git a/main_test.go b/main_test.go index 3bd2cff..f44bf6b 100644 --- a/main_test.go +++ b/main_test.go @@ -1,7 +1,16 @@ package main_test import ( + "bytes" "context" + "encoding/json" + "errors" + "fmt" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" + "github.com/redis/go-redis/v9" + "io" + "math/big" + "net/http" "os" "testing" "time" @@ -50,6 +59,9 @@ var ( Logger: &testServiceLogger, RunDatabaseMigrations: false, } + + redisHostPort = os.Getenv("REDIS_HOST_PORT") + redisPassword = os.Getenv("REDIS_PASSWORD") ) func TestE2ETestProxyReturnsNonZeroLatestBlockHeader(t *testing.T) { @@ -469,3 +481,176 @@ func TestE2ETestProxyTracksBlockNumberForMethodsWithBlockHashParam(t *testing.T) assert.Equal(t, *requestMetricDuringRequestWindow.BlockNumber, requestBlockNumber) } } + +func TestE2ETestProxyCachesMethodsWithBlockNumberParam(t *testing.T) { + testRandomAddressHex := "0x6767114FFAA17C6439D7AEA480738B982CE63A02" + testAddress := common.HexToAddress(testRandomAddressHex) + + // create api and database clients + client, err := ethclient.Dial(proxyServiceURL) + if err != nil { + t.Fatal(err) + } + + redisClient := redis.NewClient(&redis.Options{ + Addr: fmt.Sprintf("localhost:%v", redisHostPort), + Password: redisPassword, + DB: 0, + }) + cleanUpRedis(t, redisClient) + expectKeysNum(t, redisClient, 0) + + for _, tc := range []struct { + desc string + method string + params []interface{} + keysNum int + }{ + { + desc: "test case #1", + method: "eth_getTransactionCount", + params: []interface{}{testAddress, "0x1"}, + keysNum: 1, + }, + { + desc: "test case #2", + method: "eth_getBlockByNumber", + params: []interface{}{"0x1", true}, + keysNum: 2, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + // test cache MISS and cache HIT scenarios for specified method + // check corresponding values in cachemdw.CacheMissHeaderValue HTTP header + // check that cached and non-cached responses are equal + + // eth_getBlockByNumber - cache MISS + resp1 := mkJsonRpcRequest(t, proxyServiceURL, tc.method, tc.params) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp1.Header[cachemdw.CacheHeaderKey][0]) + body1, err := io.ReadAll(resp1.Body) + require.NoError(t, err) + err = checkJsonRpcErr(body1) + require.NoError(t, err) + expectKeysNum(t, redisClient, tc.keysNum) + + // eth_getBlockByNumber - cache HIT + resp2 := mkJsonRpcRequest(t, proxyServiceURL, tc.method, tc.params) + require.Equal(t, cachemdw.CacheHitHeaderValue, resp2.Header[cachemdw.CacheHeaderKey][0]) + body2, err := io.ReadAll(resp2.Body) + require.NoError(t, err) + err = checkJsonRpcErr(body2) + require.NoError(t, err) + expectKeysNum(t, redisClient, tc.keysNum) + + require.JSONEq(t, string(body1), string(body2), "blocks should be the same") + }) + } + + // test cache MISS and cache HIT scenarios for eth_getTransactionCount method + // check that cached and non-cached responses are equal + { + // eth_getTransactionCount - cache MISS + bal1, err := client.NonceAt(testContext, testAddress, big.NewInt(2)) + require.NoError(t, err) + expectKeysNum(t, redisClient, 3) + + // eth_getTransactionCount - cache HIT + bal2, err := client.NonceAt(testContext, testAddress, big.NewInt(2)) + require.NoError(t, err) + expectKeysNum(t, redisClient, 3) + + require.Equal(t, bal1, bal2, "balances should be the same") + } + + // test cache MISS and cache HIT scenarios for eth_getBlockByNumber method + // check that cached and non-cached responses are equal + { + // eth_getBlockByNumber - cache MISS + block1, err := client.BlockByNumber(testContext, big.NewInt(2)) + require.NoError(t, err) + expectKeysNum(t, redisClient, 4) + + // eth_getBlockByNumber - cache HIT + block2, err := client.BlockByNumber(testContext, big.NewInt(2)) + require.NoError(t, err) + expectKeysNum(t, redisClient, 4) + + require.Equal(t, block1, block2, "blocks should be the same") + } + + cleanUpRedis(t, redisClient) +} + +func expectKeysNum(t *testing.T, redisClient *redis.Client, keysNum int) { + keys, err := redisClient.Keys(context.Background(), "*").Result() + require.NoError(t, err) + + require.Equal(t, keysNum, len(keys)) +} + +func cleanUpRedis(t *testing.T, redisClient *redis.Client) { + keys, err := redisClient.Keys(context.Background(), "*").Result() + require.NoError(t, err) + + for _, key := range keys { + fmt.Printf("key: %v\n", key) + } + + if len(keys) != 0 { + _, err = redisClient.Del(context.Background(), keys...).Result() + require.NoError(t, err) + } +} + +func mkJsonRpcRequest(t *testing.T, proxyServiceURL, method string, params []interface{}) *http.Response { + req := newJsonRpcRequest(method, params) + reqInJSON, err := json.Marshal(req) + require.NoError(t, err) + reqReader := bytes.NewBuffer(reqInJSON) + resp, err := http.Post(proxyServiceURL, "application/json", reqReader) + require.NoError(t, err) + require.Equal(t, http.StatusOK, resp.StatusCode) + + return resp +} + +type jsonRpcRequest struct { + JsonRpc string `json:"jsonrpc"` + Method string `json:"method"` + Params []interface{} `json:"params"` + Id int `json:"id"` +} + +func newJsonRpcRequest(method string, params []interface{}) *jsonRpcRequest { + return &jsonRpcRequest{ + JsonRpc: "2.0", + Method: method, + Params: params, + Id: 1, + } +} + +type jsonRpcResponse struct { + Jsonrpc string `json:"jsonrpc"` + Id int `json:"id"` + Result interface{} `json:"result"` + Error string `json:"error"` +} + +func checkJsonRpcErr(body []byte) error { + var resp jsonRpcResponse + err := json.Unmarshal(body, &resp) + if err != nil { + return err + } + + if resp.Error != "" { + return errors.New(resp.Error) + } + + if resp.Result == "" { + return errors.New("result is empty") + } + + return nil +} diff --git a/rebuild.sh b/rebuild.sh new file mode 100755 index 0000000..71603d8 --- /dev/null +++ b/rebuild.sh @@ -0,0 +1,4 @@ +make down +docker rmi kava-proxy-service-proxy +make build +make up \ No newline at end of file diff --git a/service/cachemdw/cache.go b/service/cachemdw/cache.go new file mode 100644 index 0000000..ef25f25 --- /dev/null +++ b/service/cachemdw/cache.go @@ -0,0 +1,125 @@ +package cachemdw + +import ( + "context" + "errors" + "time" + + "github.com/kava-labs/kava-proxy-service/clients/cache" + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/logging" +) + +// ServiceCache is responsible for caching EVM requests and provides corresponding middleware +// ServiceCache can work with any underlying storage which implements simple cache.Cache interface +type ServiceCache struct { + cacheClient cache.Cache + blockGetter decode.EVMBlockGetter + cacheTTL time.Duration + decodedRequestContextKey any + // chainID is used as prefix for any key in the cache + chainID string + + *logging.ServiceLogger +} + +func NewServiceCache( + cacheClient cache.Cache, + blockGetter decode.EVMBlockGetter, + cacheTTL time.Duration, + decodedRequestContextKey any, + chainID string, + logger *logging.ServiceLogger, +) *ServiceCache { + return &ServiceCache{ + cacheClient: cacheClient, + blockGetter: blockGetter, + cacheTTL: cacheTTL, + decodedRequestContextKey: decodedRequestContextKey, + chainID: chainID, + ServiceLogger: logger, + } +} + +// IsCacheable checks if EVM request is cacheable. +// In current implementation we consider request is cacheable if it has specific block height +func IsCacheable( + ctx context.Context, + blockGetter decode.EVMBlockGetter, + logger *logging.ServiceLogger, + req *decode.EVMRPCRequestEnvelope, +) bool { + blockNumber, err := req.ExtractBlockNumberFromEVMRPCRequest(ctx, blockGetter) + if err != nil { + logger.Logger.Error(). + Err(err). + Msg("can't extract block number from EVM RPC request") + return false + } + + if blockNumber <= 0 { + return false + } + + return true +} + +// GetCachedQueryResponse calculates cache key for request and then tries to get it from cache. +func (c *ServiceCache) GetCachedQueryResponse( + ctx context.Context, + req *decode.EVMRPCRequestEnvelope, +) ([]byte, error) { + key, err := GetQueryKey(c.chainID, req) + if err != nil { + return nil, err + } + + value, err := c.cacheClient.Get(ctx, key) + if err != nil { + return nil, err + } + + return value, nil +} + +// CacheQueryResponse calculates cache key for request and then saves response to the cache. +func (c *ServiceCache) CacheQueryResponse( + ctx context.Context, + req *decode.EVMRPCRequestEnvelope, + chainID string, + response []byte, +) error { + if !IsCacheable(ctx, c.blockGetter, c.ServiceLogger, req) { + return errors.New("query isn't cacheable") + } + + key, err := GetQueryKey(chainID, req) + if err != nil { + return err + } + + return c.cacheClient.Set(ctx, key, response, c.cacheTTL) +} + +func (c *ServiceCache) ValidateAndCacheQueryResponse( + ctx context.Context, + req *decode.EVMRPCRequestEnvelope, + response []byte, +) error { + // TODO(yevhenii): add validation + + if err := c.CacheQueryResponse( + ctx, + req, + c.chainID, + response, + ); err != nil { + return err + } + + return nil +} + +func (c *ServiceCache) Healthcheck(ctx context.Context) error { + return c.cacheClient.Healthcheck(ctx) +} diff --git a/service/cachemdw/cache_test.go b/service/cachemdw/cache_test.go new file mode 100644 index 0000000..4675145 --- /dev/null +++ b/service/cachemdw/cache_test.go @@ -0,0 +1,132 @@ +package cachemdw_test + +import ( + "context" + "math/big" + "testing" + "time" + + "github.com/ethereum/go-ethereum/common" + ethctypes "github.com/ethereum/go-ethereum/core/types" + "github.com/stretchr/testify/require" + + "github.com/kava-labs/kava-proxy-service/clients/cache" + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" +) + +const ( + defaultChainIDString = "1" + defaultHost = "api.kava.io" + defaultBlockNumber = "42" +) + +var ( + defaultChainID = big.NewInt(1) + defaultQueryResp = []byte("resp") +) + +type MockEVMBlockGetter struct{} + +func NewMockEVMBlockGetter() *MockEVMBlockGetter { + return &MockEVMBlockGetter{} +} + +var _ decode.EVMBlockGetter = (*MockEVMBlockGetter)(nil) + +func (c *MockEVMBlockGetter) BlockByHash(ctx context.Context, hash common.Hash) (*ethctypes.Block, error) { + panic("not implemented") +} + +func (c *MockEVMBlockGetter) ChainID(ctx context.Context) (*big.Int, error) { + return defaultChainID, nil +} + +func TestUnitTestIsCacheable(t *testing.T) { + logger, err := logging.New("TRACE") + require.NoError(t, err) + + blockGetter := NewMockEVMBlockGetter() + ctxb := context.Background() + + for _, tc := range []struct { + desc string + req *decode.EVMRPCRequestEnvelope + cacheable bool + }{ + { + desc: "test case #1", + req: mkEVMRPCRequestEnvelope(defaultBlockNumber), + cacheable: true, + }, + { + desc: "test case #2", + req: mkEVMRPCRequestEnvelope("0"), + cacheable: false, + }, + } { + t.Run(tc.desc, func(t *testing.T) { + cacheable := cachemdw.IsCacheable(ctxb, blockGetter, &logger, tc.req) + require.Equal(t, tc.cacheable, cacheable) + }) + } +} + +func TestUnitTestCacheQueryResponse(t *testing.T) { + logger, err := logging.New("TRACE") + require.NoError(t, err) + + inMemoryCache := cache.NewInMemoryCache() + blockGetter := NewMockEVMBlockGetter() + cacheTTL := time.Hour + ctxb := context.Background() + + serviceCache := cachemdw.NewServiceCache(inMemoryCache, blockGetter, cacheTTL, service.DecodedRequestContextKey, defaultChainIDString, &logger) + + req := mkEVMRPCRequestEnvelope(defaultBlockNumber) + resp, err := serviceCache.GetCachedQueryResponse(ctxb, req) + require.Equal(t, cache.ErrNotFound, err) + require.Empty(t, resp) + + err = serviceCache.CacheQueryResponse(ctxb, req, defaultChainIDString, defaultQueryResp) + require.NoError(t, err) + + resp, err = serviceCache.GetCachedQueryResponse(ctxb, req) + require.NoError(t, err) + require.Equal(t, defaultQueryResp, resp) +} + +func TestUnitTestValidateAndCacheQueryResponse(t *testing.T) { + logger, err := logging.New("TRACE") + require.NoError(t, err) + + inMemoryCache := cache.NewInMemoryCache() + blockGetter := NewMockEVMBlockGetter() + cacheTTL := time.Hour + ctxb := context.Background() + + serviceCache := cachemdw.NewServiceCache(inMemoryCache, blockGetter, cacheTTL, service.DecodedRequestContextKey, defaultChainIDString, &logger) + + req := mkEVMRPCRequestEnvelope(defaultBlockNumber) + resp, err := serviceCache.GetCachedQueryResponse(ctxb, req) + require.Equal(t, cache.ErrNotFound, err) + require.Empty(t, resp) + + err = serviceCache.ValidateAndCacheQueryResponse(ctxb, req, defaultQueryResp) + require.NoError(t, err) + + resp, err = serviceCache.GetCachedQueryResponse(ctxb, req) + require.NoError(t, err) + require.Equal(t, defaultQueryResp, resp) +} + +func mkEVMRPCRequestEnvelope(blockNumber string) *decode.EVMRPCRequestEnvelope { + return &decode.EVMRPCRequestEnvelope{ + JSONRPCVersion: "2.0", + ID: 1, + Method: "eth_getBalance", + Params: []interface{}{"0x1234", blockNumber}, + } +} diff --git a/service/cachemdw/caching_middleware.go b/service/cachemdw/caching_middleware.go new file mode 100644 index 0000000..f739a0d --- /dev/null +++ b/service/cachemdw/caching_middleware.go @@ -0,0 +1,50 @@ +package cachemdw + +import ( + "net/http" + + "github.com/kava-labs/kava-proxy-service/decode" +) + +// CachingMiddleware returns kava-proxy-service compatible middleware which works in the following way: +// - tries to get decoded request from context (previous middleware should set it) +// - checks few conditions: +// - if request isn't already cached +// - if request is cacheable +// - if response is present in context +// +// - if all above is true - caches the response +// - calls next middleware +func (c *ServiceCache) CachingMiddleware( + next http.Handler, +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + // if we can't get decoded request then forward to next middleware + req := r.Context().Value(c.decodedRequestContextKey) + decodedReq, ok := (req).(*decode.EVMRPCRequestEnvelope) + if !ok { + c.Logger.Error().Msg("can't cast request to *EVMRPCRequestEnvelope type") + + next.ServeHTTP(w, r) + return + } + + isCached := IsRequestCached(r.Context()) + cacheable := IsCacheable(r.Context(), c.blockGetter, c.ServiceLogger, decodedReq) + response := r.Context().Value(ResponseContextKey) + typedResponse, ok := response.([]byte) + + // if request isn't already cached, request is cacheable and response is present in context - cache the response + if !isCached && cacheable && ok { + if err := c.ValidateAndCacheQueryResponse( + r.Context(), + decodedReq, + typedResponse, + ); err != nil { + c.Logger.Error().Msgf("can't validate and cache response: %v", err) + } + } + + next.ServeHTTP(w, r) + } +} diff --git a/service/cachemdw/keys.go b/service/cachemdw/keys.go new file mode 100644 index 0000000..54123a5 --- /dev/null +++ b/service/cachemdw/keys.go @@ -0,0 +1,66 @@ +package cachemdw + +import ( + "encoding/json" + "fmt" + "strings" + + "github.com/ethereum/go-ethereum/crypto" + + "github.com/kava-labs/kava-proxy-service/decode" +) + +type CacheItemType int + +const ( + CacheItemTypeQuery CacheItemType = iota + 1 +) + +func (t CacheItemType) String() string { + switch t { + case CacheItemTypeQuery: + return "query" + default: + return "unknown" + } +} + +func BuildCacheKey(cacheItemType CacheItemType, parts []string) string { + fullParts := append( + []string{ + cacheItemType.String(), + }, + parts..., + ) + + return strings.Join(fullParts, ":") +} + +// GetQueryKey calculates cache key for request +func GetQueryKey( + chainID string, + req *decode.EVMRPCRequestEnvelope, +) (string, error) { + if req == nil { + return "", fmt.Errorf("request shouldn't be nil") + } + + // TODO(yevhenii): use stable/sorted JSON serializer + serializedParams, err := json.Marshal(req.Params) + if err != nil { + return "", err + } + + data := make([]byte, 0) + data = append(data, []byte(req.Method)...) + data = append(data, serializedParams...) + + hashedReq := crypto.Keccak256Hash(data) + + parts := []string{ + chainID, + hashedReq.Hex(), + } + + return BuildCacheKey(CacheItemTypeQuery, parts), nil +} diff --git a/service/cachemdw/keys_test.go b/service/cachemdw/keys_test.go new file mode 100644 index 0000000..62b4b11 --- /dev/null +++ b/service/cachemdw/keys_test.go @@ -0,0 +1,71 @@ +package cachemdw_test + +import ( + "testing" + + "github.com/stretchr/testify/require" + + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" +) + +func TestUnitTestBuildCacheKey(t *testing.T) { + for _, tc := range []struct { + desc string + cacheItemType cachemdw.CacheItemType + parts []string + expectedCacheKey string + }{ + { + desc: "test case #1", + cacheItemType: cachemdw.CacheItemTypeQuery, + parts: []string{"1", "2", "3"}, + expectedCacheKey: "query:1:2:3", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + cacheKey := cachemdw.BuildCacheKey(tc.cacheItemType, tc.parts) + require.Equal(t, tc.expectedCacheKey, cacheKey) + }) + } +} + +func TestUnitTestGetQueryKey(t *testing.T) { + for _, tc := range []struct { + desc string + chainID string + req *decode.EVMRPCRequestEnvelope + expectedCacheKey string + errMsg string + }{ + { + desc: "test case #1", + chainID: "chain1", + req: &decode.EVMRPCRequestEnvelope{ + JSONRPCVersion: "2.0", + ID: 1, + Method: "eth_getBlockByHash", + Params: []interface{}{"0x1234", true}, + }, + expectedCacheKey: "query:chain1:0xb2b69f976d9aa41cd2065e2a2354254f6cba682a6fe2b3996571daa27ea4a6f4", + }, + { + desc: "test case #1", + chainID: "chain1", + req: nil, + errMsg: "request shouldn't be nil", + }, + } { + t.Run(tc.desc, func(t *testing.T) { + cacheKey, err := cachemdw.GetQueryKey(tc.chainID, tc.req) + if tc.errMsg == "" { + require.NoError(t, err) + require.Equal(t, tc.expectedCacheKey, cacheKey) + } else { + require.Error(t, err) + require.Contains(t, err.Error(), tc.errMsg) + require.Empty(t, cacheKey) + } + }) + } +} diff --git a/service/cachemdw/middleware.go b/service/cachemdw/middleware.go new file mode 100644 index 0000000..89afac4 --- /dev/null +++ b/service/cachemdw/middleware.go @@ -0,0 +1,73 @@ +package cachemdw + +import ( + "context" + "net/http" + + "github.com/kava-labs/kava-proxy-service/clients/cache" + "github.com/kava-labs/kava-proxy-service/decode" +) + +const ( + CachedContextKey = "X-KAVA-PROXY-CACHED" + ResponseContextKey = "X-KAVA-PROXY-RESPONSE" + + CacheHeaderKey = "X-Kava-Proxy-Cache-Status" + CacheHitHeaderValue = "HIT" + CacheMissHeaderValue = "MISS" +) + +// IsCachedMiddleware returns kava-proxy-service compatible middleware which works in the following way: +// - tries to get decoded request from context (previous middleware should set it) +// - tries to get response from the cache +// - if present sets cached response in context, marks as cached in context and forwards to next middleware +// - if not present marks as uncached in context and forwards to next middleware +// +// - next middleware should check whether request was cached and act accordingly: +func (c *ServiceCache) IsCachedMiddleware( + next http.Handler, +) http.HandlerFunc { + return func(w http.ResponseWriter, r *http.Request) { + uncachedContext := context.WithValue(r.Context(), CachedContextKey, false) + cachedContext := context.WithValue(r.Context(), CachedContextKey, true) + + // if we can't get decoded request then forward to next middleware + req := r.Context().Value(c.decodedRequestContextKey) + decodedReq, ok := (req).(*decode.EVMRPCRequestEnvelope) + if !ok { + c.Logger.Error().Msg("can't cast request to *EVMRPCRequestEnvelope type") + + next.ServeHTTP(w, r.WithContext(uncachedContext)) + return + } + + // Check if the request is cached: + // 1. if not cached or we encounter an error then mark as uncached and forward to next middleware + // 2. if cached then mark as cached, set cached response in context and forward to next middleware + cachedQueryResponse, err := c.GetCachedQueryResponse(r.Context(), decodedReq) + if err != nil && err != cache.ErrNotFound { + // log unexpected error + c.Logger.Error(). + Err(err). + Msg("error during getting response from cache") + } + if err != nil { + // 1. if not cached or we encounter an error then mark as uncached and forward to next middleware + next.ServeHTTP(w, r.WithContext(uncachedContext)) + return + } + + // 2. if cached then mark as cached, set cached response in context and forward to next middleware + responseContext := context.WithValue(cachedContext, ResponseContextKey, cachedQueryResponse) + next.ServeHTTP(w, r.WithContext(responseContext)) + } +} + +// IsRequestCached returns whether request was cached +// if returns true it means: +// - middleware marked that request was cached +// - value of cached response should be available in context via ResponseContextKey +func IsRequestCached(ctx context.Context) bool { + cached, ok := ctx.Value(CachedContextKey).(bool) + return ok && cached +} diff --git a/service/cachemdw/middleware_test.go b/service/cachemdw/middleware_test.go new file mode 100644 index 0000000..2e77a0c --- /dev/null +++ b/service/cachemdw/middleware_test.go @@ -0,0 +1,110 @@ +package cachemdw_test + +import ( + "context" + "net/http" + "net/http/httptest" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/kava-labs/kava-proxy-service/clients/cache" + "github.com/kava-labs/kava-proxy-service/decode" + "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" +) + +func TestE2ETestServiceCacheMiddleware(t *testing.T) { + logger, err := logging.New("TRACE") + require.NoError(t, err) + + inMemoryCache := cache.NewInMemoryCache() + blockGetter := NewMockEVMBlockGetter() + cacheTTL := time.Duration(0) // TTL: no expiry + + serviceCache := cachemdw.NewServiceCache(inMemoryCache, blockGetter, cacheTTL, service.DecodedRequestContextKey, defaultChainIDString, &logger) + + emptyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {}) + cachingMdw := serviceCache.CachingMiddleware(emptyHandler) + // proxyHandler emulates behaviour of actual service proxy handler + // sequence of execution: + // - isCachedMdw + // - proxyHandler + // - cachingMdw + // - emptyHandler + proxyHandler := http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + response := []byte(testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody) + if cachemdw.IsRequestCached(r.Context()) { + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheHitHeaderValue) + } else { + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheMissHeaderValue) + } + w.WriteHeader(http.StatusOK) + w.Write(response) + responseContext := context.WithValue(r.Context(), cachemdw.ResponseContextKey, response) + + cachingMdw.ServeHTTP(w, r.WithContext(responseContext)) + }) + isCachedMdw := serviceCache.IsCachedMiddleware(proxyHandler) + + t.Run("cache miss", func(t *testing.T) { + req := createTestHttpRequest( + t, + "https://api.kava.io:8545/thisshouldntshowup", + TestRequestEthBlockByNumberSpecific, + ) + resp := httptest.NewRecorder() + + isCachedMdw.ServeHTTP(resp, req) + + require.Equal(t, http.StatusOK, resp.Code) + require.JSONEq(t, testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody, resp.Body.String()) + require.Equal(t, cachemdw.CacheMissHeaderValue, resp.Header().Get(cachemdw.CacheHeaderKey)) + + cacheItems := inMemoryCache.GetAll(context.Background()) + require.Len(t, cacheItems, 1) + require.Contains(t, cacheItems, "query:1:0x885d3d84b42d647be47d94a001428be7e88ab787251031ddbfb247a581d0505a") + }) + + t.Run("cache hit", func(t *testing.T) { + req := createTestHttpRequest( + t, + "https://api.kava.io:8545/thisshouldntshowup", + TestRequestEthBlockByNumberSpecific, + ) + resp := httptest.NewRecorder() + + isCachedMdw.ServeHTTP(resp, req) + + require.Equal(t, http.StatusOK, resp.Code) + require.JSONEq(t, testEVMQueries[TestRequestEthBlockByNumberSpecific].ResponseBody, resp.Body.String()) + require.Equal(t, cachemdw.CacheHitHeaderValue, resp.Header().Get(cachemdw.CacheHeaderKey)) + }) +} + +func createTestHttpRequest( + t *testing.T, + url string, + reqName testReqName, +) *http.Request { + t.Helper() + + req, err := http.NewRequest(http.MethodGet, url, nil) + require.NoError(t, err) + + decodedReq, err := decode.DecodeEVMRPCRequest( + []byte(testEVMQueries[reqName].RequestBody), + ) + require.NoError(t, err) + + decodedReqCtx := context.WithValue( + req.Context(), + service.DecodedRequestContextKey, + decodedReq, + ) + req = req.WithContext(decodedReqCtx) + + return req +} diff --git a/service/cachemdw/testdata_test.go b/service/cachemdw/testdata_test.go new file mode 100644 index 0000000..cac9f24 --- /dev/null +++ b/service/cachemdw/testdata_test.go @@ -0,0 +1,298 @@ +package cachemdw_test + +type testHttpRequestResponse struct { + RequestBody string + ResponseBody string +} + +type testReqName string + +const ( + TestRequestWeb3ClientVersion testReqName = "web3_clientVersion" + TestRequestEthGetAccountsEmpty testReqName = "eth_getAccounts/empty" + TestRequestEthBlockByNumberSpecific testReqName = "eth_getBlockByNumber" + TestRequestEthBlockByNumberLatest testReqName = "eth_getBlockByNumber/latest" + TestRequestEthBlockByNumberFuture testReqName = "eth_getBlockByNumber/future" + TestRequestEthBlockByNumberError testReqName = "eth_getBlockByNumber/error" + TestRequestEthGetBalancePositive testReqName = "eth_getBalance/positive" + TestRequestEthGetBalanceZero testReqName = "eth_getBalance/zero" + TestRequestEthGetCodeEmpty testReqName = "eth_getCode/empty" +) + +// testEVMQueries is a map of testing json-rpc responses. These are copied from +// real requests to the Kava evm. +var testEVMQueries = map[testReqName]testHttpRequestResponse{ + TestRequestWeb3ClientVersion: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"web3_clientVersion", + "params":[], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": "Version dev ()\nCompiled at using Go go1.20.3 (amd64)" + }`, + }, + TestRequestEthGetAccountsEmpty: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_accounts", + "params":[], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": [] + }`, + }, + TestRequestEthBlockByNumberSpecific: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + "0x1b4", + true + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "difficulty": "0x0", + "extraData": "0x", + "gasLimit": "0x1312d00", + "gasUsed": "0x1afc2", + "hash": "0xcc6963a6d025ec2dad24373fbd5f2c3ab75b51ccb31f049682e2001b1a20322f", + "logsBloom": "0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000", + "miner": "0x7f73862f0672c066c3f6b4330a736479f0345cd7", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x0000000000000000", + "number": "0x1b4", + "parentHash": "0xd313a81b36d717e4ce67cb7d8f6560158bef9a25f8a4e1b63475050a4181102c", + "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size": "0x2431", + "stateRoot": "0x20197ba04e30d29a58b508b752d41f0614ceb8d47d2ea2544ff64a6490327625", + "timestamp": "0x628e85a0", + "totalDifficulty": "0x0", + "transactions": [], + "transactionsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "uncles": [] + } + }`, + }, + TestRequestEthBlockByNumberLatest: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + "latest", + true + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": { + "difficulty": "0x0", + "extraData": "0x", + "gasLimit": "0x1312d00", + "gasUsed": "0xffea13", + "hash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "logsBloom": "0x9030082000000200200000040002300000000000040008000000000000800100800001000040002008000400800080000880021000000802000002001000080000060040800000000140ac09010020200000a0100000000010000200040000004400048042040018008843800a00080080004000000c00001000001108800288000000014080008000001a80000040020400900000020000800201500044210880001000080040c10c081000000000400000040400000480000021001040000000000002000812002084000700430010000000410008028800c20000100020000001000001210040080010000000010240082880400000000000208000004008", + "miner": "0xb21adc77c091742783061ab15a0bd1c27efc7a81", + "mixHash": "0x0000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x0000000000000000", + "number": "0x49be70", + "parentHash": "0x7e30cc8b5f6208d0c07d7964930a8dc5d111e4f5830744121beeb5d028c8332d", + "receiptsRoot": "0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421", + "sha3Uncles": "0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347", + "size": "0x2364", + "stateRoot": "0x992ed1d2a240baf82ac21bd1d143f000181f9d15f0b8f1b03ee33b0b704d32ce", + "timestamp": "0x6465223a", + "totalDifficulty": "0x0", + "transactions": [ + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0xbfa2f9018a41a5419d38bf3e11e8651e998037c5", + "gas": "0x895440", + "gasPrice": "0x1e", + "hash": "0x58dc15c522cce394167619c3d80ed6c7645db4b43b4759d2808e7468be6808cf", + "input": "0xfdb5a03e", + "nonce": "0x5e3c6", + "to": "0x109f3289665a8f034e2cacdbcfb678cabe09f1d5", + "transactionIndex": "0x0", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x1180", + "r": "0x89834b451fd30d4c66e35b14af33d1759541f9758fac889a1fb47dab7759db64", + "s": "0x4511c49cc3ab4dcddcb8e427b3c3b3208c947899e32e461b694efa01d44b2d23" + }, + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0xd479f39e2d2cf61a6708d2a68b245ed04c10683d", + "gas": "0x4c4b40", + "gasPrice": "0x37", + "hash": "0xf71127d678911732c7654d4fcff7b7b690a552e2b1652d5b981a03b93d64bee0", + "input": "0xfdb5a03e", + "nonce": "0x71e2a", + "to": "0xbc50b9f7f8a4ac5cfbb02d214239033dd5a35527", + "transactionIndex": "0x1", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x117f", + "r": "0x19e8dfcca57dbad7359b4cd48347f184a08ab0b939ccf3a6978ec4bedf6507c3", + "s": "0xcd02607f329646b02fff56a211a5442f6c5b13fba6bc01a39edcf6c1d6bdd1e" + }, + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0xbfa2f9018a41a5419d38bf3e11e8651e998037c5", + "gas": "0x895440", + "gasPrice": "0x1e", + "hash": "0x2ee916a9b0732d7badd7d9f5bb7d933bb5344bb672f73bf741b922ff9a9d2252", + "input": "0xfdb5a03e", + "nonce": "0x5e3c7", + "to": "0x738114fc34d7b0d33f13d2b5c3d44484ec85c7f1", + "transactionIndex": "0x2", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x1180", + "r": "0x81dfe4351c448ccc6a5f6f2a2f866ad023df7843da56c38fb19dd0d5d90e22de", + "s": "0x1d770062f304732b4fa5544bd12bd4356f9a853d66bbdef547eab034b142897a" + }, + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0x07f92d445d1fa59059b50fb664a7633b86db1152", + "gas": "0x989680", + "gasPrice": "0x3c", + "hash": "0x9cb2439b6d4784d58118d3facb9beba77dc80fa4c998b281a4cf46e944298c1f", + "input": "0xfdb5a03e", + "nonce": "0x58885", + "to": "0xefa8952a4ab8b210a5f1dd2a378ed3d1200cf64b", + "transactionIndex": "0x3", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x117f", + "r": "0x515203b1e08dc06fa7a1fa4e029775a233aa6f9af419185d0b6b8a6407d27eb5", + "s": "0x52984774ebce17affe5f842ad930605381e5ee146074aa1adc171eb4cc128270" + }, + { + "blockHash": "0xe1cbbd4ba91685ce6c3fe51f2a64cc29d81beee2926d803f7f9ba59fba42fb43", + "blockNumber": "0x49be70", + "from": "0x6d4f641c7f86c5c76182066b7bc1023dfe51c8f0", + "gas": "0x424f3", + "gasPrice": "0x3b9aca00", + "hash": "0x102a60dcd2333651c5f13e53db5ffad994e3fd4d74d99eb1355562da0b1b4d8d", + "input": "0xabe50f1900000000000000000000000000000000000000000000010f0cf064dd592000000000000000000000000000000000000000000000000000000000000000000000", + "nonce": "0x376", + "to": "0x2911c3a3b497af71aacbb9b1e9fd3ee5d50f959d", + "transactionIndex": "0x4", + "value": "0x0", + "type": "0x0", + "chainId": "0x8ae", + "v": "0x1180", + "r": "0x1890b8bf21b7a50f4a55568535dcff47d89a257e780e51e419c237af943f2afe", + "s": "0x313b04a2e0ea400623fbcfd22af91ebd2593c3fdbd29fbe1004e81e54430770a" + } + ], + "transactionsRoot": "0xfa3bae7d2ee5eff10fe2ff44840d31755c56eb0dfd0827ebc5ad21ac628020d3", + "uncles": [] + } + }`, + }, + TestRequestEthBlockByNumberFuture: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + "0x59be70", + true + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": null + }`, + }, + TestRequestEthBlockByNumberError: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBlockByNumber", + "params":[ + oops + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": null, + "error": { + "code": -32700, + "message": "parse error" + } + }`, + }, + TestRequestEthGetBalancePositive: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBalance", + "params":[ + "0x373CE80dd1e921506EC5603290AF444e60CeF61F", + "0x49BCF0" + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": "0xdfe3285d58c7e365" + }`, + }, + TestRequestEthGetBalanceZero: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getBalance", + "params":[ + "0x1111111111111111111111111111111111111111", + "0x2" + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": "0x0" + }`, + }, + TestRequestEthGetCodeEmpty: { + RequestBody: `{ + "jsonrpc":"2.0", + "method":"eth_getCode", + "params":[ + "0x1111111111111111111111111111111111111111", + "0x2" + ], + "id":1 + }`, + ResponseBody: `{ + "jsonrpc": "2.0", + "id": 1, + "result": "0x" + }`, + }, +} diff --git a/service/handlers.go b/service/handlers.go index 338071e..5e778bf 100644 --- a/service/handlers.go +++ b/service/handlers.go @@ -1,6 +1,7 @@ package service import ( + "context" "encoding/json" "errors" "fmt" @@ -20,12 +21,21 @@ func createHealthcheckHandler(service *ProxyService) func(http.ResponseWriter, * // check that the database is reachable err := service.Database.HealthCheck() - if err != nil { errMsg := fmt.Errorf("proxy service unable to connect to database") combinedErrors = errors.Join(combinedErrors, errMsg) } + err = service.Cache.Healthcheck(context.Background()) + if err != nil { + service.Logger.Error(). + Err(err). + Msg("cache healthcheck failed") + + errMsg := fmt.Errorf("proxy service unable to connect to cache") + combinedErrors = errors.Join(combinedErrors, errMsg) + } + if combinedErrors != nil { w.WriteHeader(http.StatusInternalServerError) diff --git a/service/middleware.go b/service/middleware.go index b77f2f1..f1a2354 100644 --- a/service/middleware.go +++ b/service/middleware.go @@ -16,6 +16,7 @@ import ( "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/decode" "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" ) const ( @@ -228,8 +229,23 @@ func createProxyRequestMiddleware(next http.Handler, config config.Config, servi serviceLogger.Trace().Msg("request body is empty, skipping before request interceptors") } - // proxy request to backend origin servers - proxy.ServeHTTP(lrw, r) + isCached := cachemdw.IsRequestCached(r.Context()) + response := r.Context().Value(cachemdw.ResponseContextKey) + typedResponse, ok := response.([]byte) + + // if request is cached and response is present in context - serve the request from the cache + // otherwise proxy to the actual backend + if isCached && ok { + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheHitHeaderValue) + w.Header().Add("Content-Type", "application/json") + _, err := w.Write(typedResponse) + if err != nil { + serviceLogger.Logger.Error().Msg(fmt.Sprintf("can't write cached response: %v", err)) + } + } else { + w.Header().Add(cachemdw.CacheHeaderKey, cachemdw.CacheMissHeaderValue) + proxy.ServeHTTP(lrw, r) + } serviceLogger.Trace().Msg(fmt.Sprintf("response %+v \nheaders %+v \nstatus %+v for request %+v", lrw.Status(), lrw.Header(), lrw.body, r)) @@ -243,7 +259,13 @@ func createProxyRequestMiddleware(next http.Handler, config config.Config, servi // extract the original hostname the request was sent to requestHostnameContext := context.WithValue(originRoundtripLatencyContext, RequestHostnameContextKey, r.Host) - enrichedContext := requestHostnameContext + rawBody, err := io.ReadAll(lrw.body) + if err != nil { + serviceLogger.Error().Err(err).Msg("can't read lrw.body") + } + responseContext := context.WithValue(requestHostnameContext, cachemdw.ResponseContextKey, rawBody) + + enrichedContext := responseContext // parse the remote address of the request for use below remoteAddressParts := strings.Split(r.RemoteAddr, ":") @@ -383,6 +405,7 @@ func createAfterProxyFinalizer(service *ProxyService, config config.Config) http } var blockNumber *int64 + // TODO: Redundant ExtractBlockNumberFromEVMRPCRequest call here if request is cached rawBlockNumber, err := decodedRequestBody.ExtractBlockNumberFromEVMRPCRequest(r.Context(), service.evmClient) if err != nil { diff --git a/service/service.go b/service/service.go index ac91e82..26637a9 100644 --- a/service/service.go +++ b/service/service.go @@ -9,15 +9,18 @@ import ( "time" "github.com/ethereum/go-ethereum/ethclient" + "github.com/kava-labs/kava-proxy-service/clients/cache" "github.com/kava-labs/kava-proxy-service/clients/database" "github.com/kava-labs/kava-proxy-service/clients/database/migrations" "github.com/kava-labs/kava-proxy-service/config" "github.com/kava-labs/kava-proxy-service/logging" + "github.com/kava-labs/kava-proxy-service/service/cachemdw" ) // ProxyService represents an instance of the proxy service API type ProxyService struct { Database *database.PostgresClient + Cache *cachemdw.ServiceCache httpProxy *http.Server evmClient *ethclient.Client *logging.ServiceLogger @@ -27,6 +30,24 @@ type ProxyService struct { func New(ctx context.Context, config config.Config, serviceLogger *logging.ServiceLogger) (ProxyService, error) { service := ProxyService{} + // create database client + db, err := createDatabaseClient(ctx, config, serviceLogger) + if err != nil { + return ProxyService{}, err + } + + // create evm api client + evmClient, err := ethclient.Dial(config.EvmQueryServiceURL) + if err != nil { + return ProxyService{}, err + } + + // create cache client + serviceCache, err := createServiceCache(ctx, config, serviceLogger, evmClient) + if err != nil { + return ProxyService{}, err + } + // create an http router for registering handlers for a given route mux := http.NewServeMux() @@ -38,12 +59,24 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi // set up before and after request interceptors (a.k.a. raptors 🦖🦖) + // CachingMiddleware caches request in case of: + // - request isn't already cached + // - request is cacheable + // - response is present in context + cacheAfterProxyMiddleware := serviceCache.CachingMiddleware(afterProxyFinalizer) + // create an http handler that will proxy any request to the specified URL - proxyMiddleware := createProxyRequestMiddleware(afterProxyFinalizer, config, serviceLogger, []RequestInterceptor{}, []RequestInterceptor{}) + proxyMiddleware := createProxyRequestMiddleware(cacheAfterProxyMiddleware, config, serviceLogger, []RequestInterceptor{}, []RequestInterceptor{}) + + // IsCachedMiddleware works in the following way: + // - tries to get response from the cache + // - if present sets cached response in context, marks as cached in context and forwards to next middleware + // - if not present marks as uncached in context and forwards to next middleware + cacheMiddleware := serviceCache.IsCachedMiddleware(proxyMiddleware) // create an http handler that will log the request to stdout // this handler will run before the proxyMiddleware handler - requestLoggingMiddleware := createRequestLoggingMiddleware(proxyMiddleware, serviceLogger) + requestLoggingMiddleware := createRequestLoggingMiddleware(cacheMiddleware, serviceLogger) // register healthcheck handler that can be used during deployment and operations // to determine if the service is ready to receive requests @@ -64,13 +97,6 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi ReadTimeout: time.Duration(config.HTTPReadTimeoutSeconds) * time.Second, } - // create database client - db, err := createDatabaseClient(ctx, config, serviceLogger) - - if err != nil { - return ProxyService{}, err - } - // register database status handler // for responding to requests for the status // of database related operations such as @@ -78,17 +104,11 @@ func New(ctx context.Context, config config.Config, serviceLogger *logging.Servi // partitioning mux.HandleFunc("/status/database", createDatabaseStatusHandler(&service, db)) - // create evm api client - evmClient, err := ethclient.Dial(config.EvmQueryServiceURL) - - if err != nil { - return ProxyService{}, err - } - service = ProxyService{ httpProxy: server, ServiceLogger: serviceLogger, Database: db, + Cache: serviceCache, evmClient: evmClient, } @@ -167,6 +187,38 @@ func createDatabaseClient(ctx context.Context, config config.Config, logger *log return &serviceDatabase, err } +func createServiceCache( + ctx context.Context, + config config.Config, + logger *logging.ServiceLogger, + evmclient *ethclient.Client, +) (*cachemdw.ServiceCache, error) { + cfg := cache.RedisConfig{ + Address: config.RedisEndpointURL, + Password: config.RedisPassword, + DB: 0, + } + redisCache, err := cache.NewRedisCache( + &cfg, + logger, + ) + if err != nil { + logger.Error().Msg(fmt.Sprintf("error %s creating cache using endpoint %+v", err, config.RedisEndpointURL)) + return nil, err + } + + serviceCache := cachemdw.NewServiceCache( + redisCache, + evmclient, + config.CacheTTL, + DecodedRequestContextKey, + config.ChainID, + logger, + ) + + return serviceCache, nil +} + // Run runs the proxy service, returning error (if any) in the event // the proxy service stops func (p *ProxyService) Run() error {