Skip to content

Commit

Permalink
Basic implementation of Caching Middleware
Browse files Browse the repository at this point in the history
  • Loading branch information
evgeniy-scherbina committed Oct 16, 2023
1 parent 69b86f0 commit 484f69e
Show file tree
Hide file tree
Showing 23 changed files with 1,493 additions and 31 deletions.
5 changes: 4 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ TEST_EVM_QUERY_SERVICE_URL=http://kava:8545
##### Kava Proxy Config
# What port the proxy service listens on
PROXY_SERVICE_PORT=7777
LOG_LEVEL=TRACE
LOG_LEVEL=ERROR
HTTP_READ_TIMEOUT_SECONDS=30
HTTP_WRITE_TIMEOUT_SECONDS=60
# Address of the origin server to proxy all requests to
Expand Down Expand Up @@ -89,6 +89,9 @@ METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS=10
METRIC_PARTITIONINING_PREFILL_PERIOD_DAYS=7
# Used by `ready` script to ensure metric partitions have been created.
MINIMUM_REQUIRED_PARTITIONS=30
REDIS_ENDPOINT_URL=redis:6379
REDIS_PASSWORD=
CHAIN_ID=local-chain

##### Database Config
POSTGRES_PASSWORD=password
Expand Down
3 changes: 2 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,8 @@ kava-proxy-service
cover.html

# Dependency directories (remove the comment below to include it)
# vendor/
vendor/

# ignore editor files
.vscode/
.idea/
4 changes: 2 additions & 2 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -27,7 +27,7 @@ install: lint
.PHONY: build
# build a development version docker image of the service
build: lint
docker build ./ -f local.Dockerfile -t ${IMAGE_NAME}:${LOCAL_IMAGE_TAG}
docker build --no-cache ./ -f local.Dockerfile -t ${IMAGE_NAME}:${LOCAL_IMAGE_TAG}

.PHONY: publish
# build a production version docker image of the service
Expand All @@ -42,7 +42,7 @@ unit-test:
.PHONY: e2e-test
# run tests that execute against a local or remote instance of the API
e2e-test:
go test -count=1 -v -cover -coverprofile cover.out --race ./... -run "^TestE2ETest*"
go test -count=1 -v -cover -coverprofile cover.out --race ./... -run "^TestE2ETestProxyCachesMethodsWithBlockNumberParam*"

.PHONY: it
# run any test matching the provided pattern, can pass a regex or a string
Expand Down
16 changes: 16 additions & 0 deletions clients/cache/cache.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
package cache

import (
"context"
"errors"
"time"
)

var ErrNotFound = errors.New("value not found in the cache")

type Cache interface {
Set(ctx context.Context, key string, data []byte, expiration time.Duration) error
Get(ctx context.Context, key string) ([]byte, error)
Delete(ctx context.Context, key string) error
Healthcheck(ctx context.Context) error
}
101 changes: 101 additions & 0 deletions clients/cache/inmemory.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
package cache

import (
"context"
"sync"
"time"
)

// InMemoryCache is an in-memory implementation of the Cache interface.
type InMemoryCache struct {
data map[string]cacheItem
mutex sync.RWMutex
}

// Ensure InMemoryCache implements the Cache interface.
var _ Cache = (*InMemoryCache)(nil)

// cacheItem represents an item stored in the cache.
type cacheItem struct {
data []byte
expiration time.Time
}

// NewInMemoryCache creates a new instance of InMemoryCache.
func NewInMemoryCache() *InMemoryCache {
return &InMemoryCache{
data: make(map[string]cacheItem),
}
}

// Set sets the value of a key in the cache.
func (c *InMemoryCache) Set(
ctx context.Context,
key string,
data []byte,
expiration time.Duration,
) error {
c.mutex.Lock()
defer c.mutex.Unlock()

expiry := time.Now().Add(expiration)

if expiration == 0 {
// 100 years in the future to prevent expiry
expiry = time.Now().AddDate(100, 0, 0)
}

c.data[key] = cacheItem{
data: data,
expiration: expiry,
}

return nil
}

// Get retrieves the value of a key from the cache.
func (c *InMemoryCache) Get(ctx context.Context, key string) ([]byte, error) {
c.mutex.RLock()
defer c.mutex.RUnlock()

item, ok := c.data[key]
if !ok || time.Now().After(item.expiration) {
// Not a real ttl but just replicates it for fetching
delete(c.data, key)

return nil, ErrNotFound
}

return item.data, nil
}

// GetAll returns all the non-expired data in the cache.
func (c *InMemoryCache) GetAll(ctx context.Context) map[string][]byte {
c.mutex.RLock()
defer c.mutex.RUnlock()

result := make(map[string][]byte)

for key, item := range c.data {
if time.Now().After(item.expiration) {
delete(c.data, key)
} else {
result[key] = item.data
}
}

return result
}

// Delete removes a key from the cache.
func (c *InMemoryCache) Delete(ctx context.Context, key string) error {
c.mutex.Lock()
defer c.mutex.Unlock()

delete(c.data, key)
return nil
}

func (c *InMemoryCache) Healthcheck(ctx context.Context) error {
return nil
}
114 changes: 114 additions & 0 deletions clients/cache/redis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,114 @@
package cache

import (
"context"
"fmt"
"time"

"github.com/kava-labs/kava-proxy-service/logging"
"github.com/redis/go-redis/v9"
)

type RedisConfig struct {
Address string
Password string
DB int
}

// RedisCache is an implementation of Cache that uses Redis as the caching backend.
type RedisCache struct {
client *redis.Client
*logging.ServiceLogger
}

var _ Cache = (*RedisCache)(nil)

func NewRedisCache(
cfg *RedisConfig,
logger *logging.ServiceLogger,
) (*RedisCache, error) {
client := redis.NewClient(&redis.Options{
Addr: cfg.Address,
Password: cfg.Password,
DB: cfg.DB,
})

return &RedisCache{
client: client,
ServiceLogger: logger,
}, nil
}

// Set sets the value for the given key in the cache with the given expiration.
func (rc *RedisCache) Set(
ctx context.Context,
key string,
value []byte,
expiration time.Duration,
) error {
rc.Logger.Trace().
Str("key", key).
Str("value", string(value)).
Dur("expiration", expiration).
Msg("setting value in redis")

return rc.client.Set(ctx, key, value, expiration).Err()
}

// Get gets the value for the given key in the cache.
func (rc *RedisCache) Get(
ctx context.Context,
key string,
) ([]byte, error) {
rc.Logger.Trace().
Str("key", key).
Msg("getting value from redis")

val, err := rc.client.Get(ctx, key).Bytes()
if err == redis.Nil {
rc.Logger.Trace().
Str("key", key).
Msgf("value not found in redis")
return nil, ErrNotFound
}
if err != nil {
rc.Logger.Error().
Str("key", key).
Err(err).
Msg("error during getting value from redis")
return nil, err
}

rc.Logger.Trace().
Str("key", key).
Str("value", string(val)).
Msg("successfully got value from redis")

return val, nil
}

// Delete deletes the value for the given key in the cache.
func (rc *RedisCache) Delete(ctx context.Context, key string) error {
rc.Logger.Trace().
Str("key", key).
Msg("deleting value from redis")

return rc.client.Del(ctx, key).Err()
}

func (rc *RedisCache) Healthcheck(ctx context.Context) error {
rc.Logger.Trace().Msg("redis healthcheck was called")

// Check if we can connect to Redis
_, err := rc.client.Ping(ctx).Result()
if err != nil {
rc.Logger.Error().
Err(err).
Msg("can't ping redis")
return fmt.Errorf("error connecting to Redis: %v", err)
}

rc.Logger.Trace().Msg("redis healthcheck was successful")

return nil
}
13 changes: 13 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,10 @@ type Config struct {
MetricPartitioningRoutineInterval time.Duration
MetricPartitioningRoutineDelayFirstRun time.Duration
MetricPartitioningPrefillPeriodDays int
RedisEndpointURL string
RedisPassword string
CacheTTL time.Duration
ChainID string
}

const (
Expand Down Expand Up @@ -79,6 +83,11 @@ const (
DEFAULT_DATABASE_READ_TIMEOUT_SECONDS = 60
DATABASE_WRITE_TIMEOUT_SECONDS_ENVIRONMENT_KEY = "DATABASE_WRITE_TIMEOUT_SECONDS"
DEFAULT_DATABASE_WRITE_TIMEOUT_SECONDS = 10
REDIS_ENDPOINT_URL_ENVIRONMENT_KEY = "REDIS_ENDPOINT_URL"
REDIS_PASSWORD_ENVIRONMENT_KEY = "REDIS_PASSWORD"
CACHE_TTL_ENVIRONMENT_KEY = "CACHE_TTL"
DEFAULT_CACHE_TTL_SECONDS = 600
CHAIN_ID_ENVIRONMENT_KEY = "CHAIN_ID"
)

// EnvOrDefault fetches an environment variable value, or if not set returns the fallback value
Expand Down Expand Up @@ -204,5 +213,9 @@ func ReadConfig() Config {
MetricPartitioningRoutineInterval: time.Duration(time.Duration(EnvOrDefaultInt(METRIC_PARTITIONING_ROUTINE_INTERVAL_SECONDS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_ROUTINE_INTERVAL_SECONDS)) * time.Second),
MetricPartitioningRoutineDelayFirstRun: time.Duration(time.Duration(EnvOrDefaultInt(METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_ROUTINE_DELAY_FIRST_RUN_SECONDS)) * time.Second),
MetricPartitioningPrefillPeriodDays: EnvOrDefaultInt(METRIC_PARTITIONING_PREFILL_PERIOD_DAYS_ENVIRONMENT_KEY, DEFAULT_METRIC_PARTITIONING_PREFILL_PERIOD_DAYS),
RedisEndpointURL: os.Getenv(REDIS_ENDPOINT_URL_ENVIRONMENT_KEY),
RedisPassword: os.Getenv(REDIS_PASSWORD_ENVIRONMENT_KEY),
CacheTTL: time.Duration(EnvOrDefaultInt(CACHE_TTL_ENVIRONMENT_KEY, DEFAULT_CACHE_TTL_SECONDS)) * time.Second,
ChainID: os.Getenv(CHAIN_ID_ENVIRONMENT_KEY),
}
}
19 changes: 12 additions & 7 deletions decode/evm_rpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,12 +6,17 @@ import (
"errors"
"fmt"

"github.com/ethereum/go-ethereum/common"
"github.com/ethereum/go-ethereum/ethclient"

cosmosmath "cosmossdk.io/math"
"github.com/ethereum/go-ethereum/common"
ethctypes "github.com/ethereum/go-ethereum/core/types"
)

// EVMBlockGetter defines an interface which can be implemented by any client capable of getting ethereum block by hash
type EVMBlockGetter interface {
// BlockByHash returns ethereum block by hash
BlockByHash(ctx context.Context, hash common.Hash) (*ethctypes.Block, error)
}

// Errors that might result from decoding parts or the whole of
// an EVM RPC request
var (
Expand Down Expand Up @@ -138,7 +143,7 @@ func DecodeEVMRPCRequest(body []byte) (*EVMRPCRequestEnvelope, error) {
// - the request is a valid evm rpc request
// - the method for the request supports specifying a block number
// - the provided block number is a valid tag or number
func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context.Context, evmClient *ethclient.Client) (int64, error) {
func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context.Context, blockGetter EVMBlockGetter) (int64, error) {
// only attempt to extract block number from a valid ethereum api request
if r.Method == "" {
return 0, ErrInvalidEthAPIRequest
Expand Down Expand Up @@ -173,12 +178,12 @@ func (r *EVMRPCRequestEnvelope) ExtractBlockNumberFromEVMRPCRequest(ctx context.
return parseBlockNumberFromParams(r.Method, r.Params)
}

return lookupBlockNumberFromHashParam(ctx, evmClient, r.Method, r.Params)
return lookupBlockNumberFromHashParam(ctx, blockGetter, r.Method, r.Params)
}

// Generic method to lookup the block number
// based on the hash value in a set of params
func lookupBlockNumberFromHashParam(ctx context.Context, evmClient *ethclient.Client, methodName string, params []interface{}) (int64, error) {
func lookupBlockNumberFromHashParam(ctx context.Context, blockGetter EVMBlockGetter, methodName string, params []interface{}) (int64, error) {
paramIndex, exists := MethodNameToBlockHashParamIndex[methodName]

if !exists {
Expand All @@ -191,7 +196,7 @@ func lookupBlockNumberFromHashParam(ctx context.Context, evmClient *ethclient.Cl
return 0, fmt.Errorf(fmt.Sprintf("error decoding block hash param from params %+v at index %d", params, paramIndex))
}

block, err := evmClient.BlockByHash(ctx, common.HexToHash(blockHash))
block, err := blockGetter.BlockByHash(ctx, common.HexToHash(blockHash))

if err != nil {
return 0, err
Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,7 @@ require (
cosmossdk.io/math v1.0.0
github.com/ethereum/go-ethereum v1.11.2
github.com/google/uuid v1.3.0
github.com/redis/go-redis/v9 v9.2.1
github.com/rs/zerolog v1.29.0
github.com/stretchr/testify v1.8.2
github.com/uptrace/bun v1.1.12
Expand All @@ -18,9 +19,11 @@ require (
require (
github.com/StackExchange/wmi v0.0.0-20180116203802-5d049714c4a6 // indirect
github.com/btcsuite/btcd/btcec/v2 v2.2.0 // indirect
github.com/cespare/xxhash/v2 v2.2.0 // indirect
github.com/davecgh/go-spew v1.1.1 // indirect
github.com/deckarep/golang-set/v2 v2.1.0 // indirect
github.com/decred/dcrd/dcrec/secp256k1/v4 v4.0.1 // indirect
github.com/dgryski/go-rendezvous v0.0.0-20200823014737-9f7001d12a5f // indirect
github.com/fatih/color v1.14.1 // indirect
github.com/go-ole/go-ole v1.2.1 // indirect
github.com/go-stack/stack v1.8.1 // indirect
Expand Down
Loading

0 comments on commit 484f69e

Please sign in to comment.