Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

support shard-based routing #89

Merged
merged 14 commits into from
Mar 6, 2024
Merged
6 changes: 5 additions & 1 deletion .env
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ TEST_DATABASE_ENDPOINT_URL=localhost:5432
TEST_PROXY_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-validator:8545,localhost:7778>http://kava-pruning:8545
TEST_PROXY_HEIGHT_BASED_ROUTING_ENABLED=true
TEST_PROXY_PRUNING_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-pruning:8545,localhost:7778>http://kava-pruning:8545
TEST_PROXY_SHARD_BACKEND_HOST_URL_MAP=localhost:7777>10|http://kava-shard-10:8545|20|http://kava-shard-20:8545
# What level of logging to use for service objects constructed during
# unit tests
TEST_SERVICE_LOG_LEVEL=ERROR
Expand Down Expand Up @@ -71,9 +72,12 @@ PROXY_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-validator:8545,localhost:7
# otherwise, it falls back to the value in PROXY_BACKEND_HOST_URL_MAP
PROXY_HEIGHT_BASED_ROUTING_ENABLED=true
PROXY_PRUNING_BACKEND_HOST_URL_MAP=localhost:7777>http://kava-pruning:8545,localhost:7778>http://kava-pruning:8545
# enable shard routing for hosts defined in PROXY_SHARD_BACKEND_HOST_URL_MAP
PROXY_SHARDED_ROUTING_ENABLED=true
PROXY_SHARD_BACKEND_HOST_URL_MAP=localhost:7777>10|http://kava-shard-10:8545|20|http://kava-shard-20:8545
# PROXY_MAXIMUM_REQ_BATCH_SIZE is a proxy-enforced limit on the number of subrequest in a batch
PROXY_MAXIMUM_REQ_BATCH_SIZE=100
# Configuration for the servcie to connect to it's database
# Configuration for the service to connect to it's database
DATABASE_NAME=postgres
DATABASE_ENDPOINT_URL=postgres:5432
DATABASE_USERNAME=postgres
Expand Down
2 changes: 1 addition & 1 deletion architecture/PROXY_ROUTING.md
Original file line number Diff line number Diff line change
Expand Up @@ -77,7 +77,7 @@ Now suppose you want multiple backends for the same host.

The proxy service supports height-based routing to direct requests that only require the most recent
block to a different cluster.
This support is handled via the [`HeightShardingProxies` implementation](../service/shard.go#L16).
This support is handled via the [`PruningOrDefaultProxies` implementation](../service/shard.go#L16).

This is configured via the `PROXY_HEIGHT_BASED_ROUTING_ENABLED` and `PROXY_PRUNING_BACKEND_HOST_URL_MAP`
environment variables.
Expand Down
3 changes: 3 additions & 0 deletions ci.docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -23,11 +23,14 @@ services:
env_file: .env
environment:
PROXY_HEIGHT_BASED_ROUTING_ENABLED: "true"
PROXY_SHARDED_ROUTING_ENABLED: "true"
# use public testnet as backend origin server to avoid having
# to self-host a beefy Github Action runner
# to build and run a kava node each execution
PROXY_BACKEND_HOST_URL_MAP: localhost:7777>https://evmrpcdata.internal.testnet.proxy.kava.io,localhost:7778>https://evmrpc.internal.testnet.proxy.kava.io
PROXY_PRUNING_BACKEND_HOST_URL_MAP: localhost:7777>https://evmrpc.internal.testnet.proxy.kava.io
# fake the shards by defining shards with existing backends
PROXY_SHARD_BACKEND_HOST_URL_MAP: localhost:7777>10|https://evmrpc.internal.testnet.proxy.kava.io|20|https://evmrpc.internal.testnet.proxy.kava.io
EVM_QUERY_SERVICE_URL: https://evmrpc.internal.testnet.proxy.kava.io
ports:
- "${PROXY_HOST_PORT}:${PROXY_CONTAINER_PORT}"
Expand Down
54 changes: 54 additions & 0 deletions config/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,9 @@ type Config struct {
EnableHeightBasedRouting bool
ProxyPruningBackendHostURLMapRaw string
ProxyPruningBackendHostURLMap map[string]url.URL
EnableShardedRouting bool
ProxyShardBackendHostURLMapRaw string
ProxyShardBackendHostURLMap map[string]IntervalURLMap
ProxyMaximumBatchSize int
EvmQueryServiceURL string
DatabaseName string
Expand Down Expand Up @@ -65,6 +68,8 @@ const (
PROXY_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY = "PROXY_BACKEND_HOST_URL_MAP"
PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY = "PROXY_HEIGHT_BASED_ROUTING_ENABLED"
PROXY_PRUNING_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY = "PROXY_PRUNING_BACKEND_HOST_URL_MAP"
PROXY_SHARDED_ROUTING_ENABLED_ENVIRONMENT_KEY = "PROXY_SHARDED_ROUTING_ENABLED"
PROXY_SHARD_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY = "PROXY_SHARD_BACKEND_HOST_URL_MAP"
PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY = "PROXY_MAXIMUM_REQ_BATCH_SIZE"
DEFAULT_PROXY_MAXIMUM_BATCH_SIZE = 500
PROXY_SERVICE_PORT_ENVIRONMENT_KEY = "PROXY_SERVICE_PORT"
Expand Down Expand Up @@ -220,6 +225,50 @@ func ParseRawProxyBackendHostURLMap(raw string) (map[string]url.URL, error) {
return hostURLMap, combinedErr
}

// ParseRawShardRoutingBackendHostURLMap attempts to parse backend host URL mapping for shards.
// The shard map is a map of host name => (map of end block => backend route)
// returning the mapping and error (if any)
func ParseRawShardRoutingBackendHostURLMap(raw string) (map[string]IntervalURLMap, error) {
parsed := make(map[string]IntervalURLMap)
hostConfigs := strings.Split(raw, ",")
for _, hc := range hostConfigs {
pieces := strings.Split(hc, ">")
if len(pieces) != 2 {
return parsed, fmt.Errorf("expected shard definition like <host>:<end-height>|<backend-route>, found '%s'", hc)
}

host := pieces[0]
endpointBackendValues := strings.Split(pieces[1], "|")
if len(endpointBackendValues)%2 != 0 {
return parsed, fmt.Errorf("unexpected <end-height>|<backend-route> sequence for %s: %s",
host, pieces[1],
)
}

backendByEndHeight := make(map[uint64]*url.URL, len(endpointBackendValues)/2)
for i := 0; i < len(endpointBackendValues); i += 2 {
endHeight, err := strconv.ParseUint(endpointBackendValues[i], 10, 64)
if err != nil || endHeight == 0 {
return parsed, fmt.Errorf("invalid shard end height (%s) for host %s: %s",
endpointBackendValues[i], host, err,
)
}

backendRoute, err := url.Parse(endpointBackendValues[i+1])
if err != nil || backendRoute.String() == "" {
return parsed, fmt.Errorf("invalid shard backend route (%s) for height %d of host %s: %s",
endpointBackendValues[i+1], endHeight, host, err,
)
}
backendByEndHeight[endHeight] = backendRoute
}

parsed[host] = NewIntervalURLMap(backendByEndHeight)
}

return parsed, nil
}

// ParseRawHostnameToHeaderValueMap attempts to parse mappings of hostname to corresponding header value.
// For example hostname to access-control-allow-origin header value.
func ParseRawHostnameToHeaderValueMap(raw string) (map[string]string, error) {
Expand Down Expand Up @@ -257,10 +306,12 @@ func ParseRawHostnameToHeaderValueMap(raw string) (map[string]string, error) {
func ReadConfig() Config {
rawProxyBackendHostURLMap := os.Getenv(PROXY_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY)
rawProxyPruningBackendHostURLMap := os.Getenv(PROXY_PRUNING_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY)
rawProxyShardedBackendHostURLMap := os.Getenv(PROXY_SHARD_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY)
// best effort to parse, callers are responsible for validating
// before using any values read
parsedProxyBackendHostURLMap, _ := ParseRawProxyBackendHostURLMap(rawProxyBackendHostURLMap)
parsedProxyPruningBackendHostURLMap, _ := ParseRawProxyBackendHostURLMap(rawProxyPruningBackendHostURLMap)
parsedProxyShardedBackendHostURLMap, _ := ParseRawShardRoutingBackendHostURLMap(rawProxyShardedBackendHostURLMap)

whitelistedHeaders := os.Getenv(WHITELISTED_HEADERS_ENVIRONMENT_KEY)
parsedWhitelistedHeaders := strings.Split(whitelistedHeaders, ",")
Expand All @@ -282,6 +333,9 @@ func ReadConfig() Config {
EnableHeightBasedRouting: EnvOrDefaultBool(PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY, false),
ProxyPruningBackendHostURLMapRaw: rawProxyPruningBackendHostURLMap,
ProxyPruningBackendHostURLMap: parsedProxyPruningBackendHostURLMap,
EnableShardedRouting: EnvOrDefaultBool(PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY, false),
ProxyShardBackendHostURLMapRaw: rawProxyShardedBackendHostURLMap,
ProxyShardBackendHostURLMap: parsedProxyShardedBackendHostURLMap,
ProxyMaximumBatchSize: EnvOrDefaultInt(PROXY_MAXIMUM_BATCH_SIZE_ENVIRONMENT_KEY, DEFAULT_PROXY_MAXIMUM_BATCH_SIZE),
DatabaseName: os.Getenv(DATABASE_NAME_ENVIRONMENT_KEY),
DatabaseEndpointURL: os.Getenv(DATABASE_ENDPOINT_URL_ENVIRONMENT_KEY),
Expand Down
30 changes: 30 additions & 0 deletions config/config_test.go
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
package config_test

import (
"net/url"
"os"
"testing"

"github.com/kava-labs/kava-proxy-service/config"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)

var (
Expand All @@ -14,6 +16,8 @@ var (
proxyServiceBackendHostURLMap = os.Getenv("TEST_PROXY_BACKEND_HOST_URL_MAP")
proxyServiceHeightBasedRouting = os.Getenv("TEST_PROXY_HEIGHT_BASED_ROUTING_ENABLED")
proxyServicePruningBackendHostURLMap = os.Getenv("TEST_PROXY_PRUNING_BACKEND_HOST_URL_MAP")
proxyServiceShardedRoutingEnabled = os.Getenv("TEST_PROXY_HEIGHT_BASED_ROUTING_ENABLED")
proxyServiceShardBackendHostURLMap = os.Getenv("TEST_PROXY_SHARD_BACKEND_HOST_URL_MAP")
)

func TestUnitTestEnvODefaultReturnsDefaultIfEnvironmentVariableNotSet(t *testing.T) {
Expand Down Expand Up @@ -53,10 +57,36 @@ func TestUnitTestParseHostMapReturnsErrEmptyHostMapWhenEmpty(t *testing.T) {
assert.ErrorIs(t, err, config.ErrEmptyHostMap)
}

func TestUnitTestParseRawShardRoutingBackendHostURLMap(t *testing.T) {
parsed, err := config.ParseRawShardRoutingBackendHostURLMap("localhost:7777>10|http://kava-shard-10:8545|20|http://kava-shard-20:8545")
require.NoError(t, err)
expected := map[string]config.IntervalURLMap{
"localhost:7777": config.NewIntervalURLMap(map[uint64]*url.URL{
10: mustUrl("http://kava-shard-10:8545"),
20: mustUrl("http://kava-shard-20:8545"),
}),
}
require.Equal(t, expected, parsed)

_, err = config.ParseRawShardRoutingBackendHostURLMap("no-shard-def")
require.ErrorContains(t, err, "expected shard definition like <host>:<end-height>|<backend-route>")

_, err = config.ParseRawShardRoutingBackendHostURLMap("invalid-shard-def>odd|number|bad")
require.ErrorContains(t, err, "unexpected <end-height>|<backend-route> sequence for invalid-shard-def")

_, err = config.ParseRawShardRoutingBackendHostURLMap("invalid-height>NaN|backend-host")
require.ErrorContains(t, err, "invalid shard end height (NaN) for host invalid-height")

_, err = config.ParseRawShardRoutingBackendHostURLMap("invalid-backend-host>100|")
require.ErrorContains(t, err, "invalid shard backend route () for height 100 of host invalid-backend-host")
}

func setDefaultEnv() {
os.Setenv(config.PROXY_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY, proxyServiceBackendHostURLMap)
os.Setenv(config.PROXY_HEIGHT_BASED_ROUTING_ENABLED_KEY, proxyServiceHeightBasedRouting)
os.Setenv(config.PROXY_PRUNING_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY, proxyServicePruningBackendHostURLMap)
os.Setenv(config.PROXY_SHARDED_ROUTING_ENABLED_ENVIRONMENT_KEY, proxyServiceShardedRoutingEnabled)
os.Setenv(config.PROXY_SHARD_BACKEND_HOST_URL_MAP_ENVIRONMENT_KEY, proxyServiceShardBackendHostURLMap)
os.Setenv(config.PROXY_SERVICE_PORT_ENVIRONMENT_KEY, proxyServicePort)
os.Setenv(config.LOG_LEVEL_ENVIRONMENT_KEY, config.DEFAULT_LOG_LEVEL)
}
41 changes: 41 additions & 0 deletions config/intervalmap.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,41 @@
package config

import (
"net/url"
"sort"
)

// IntervalURLMap stores URLs associated with a range of numbers.
// The intervals are defined by their endpoints and must not overlap.
// The intervals are exclusive of the endpoints.
type IntervalURLMap struct {
UrlByEndHeight map[uint64]*url.URL
endpoints []uint64
}

// NewIntervalURLMap creates a new IntervalMap from a map of interval endpoint => url.
// The intervals are exclusive of their endpoint.
// ie. if the lowest value endpoint in the map is 10, the interval is for all numbers 1 through 9.
func NewIntervalURLMap(urlByEndHeight map[uint64]*url.URL) IntervalURLMap {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

to enforce line 9 comments of // The intervals are defined by their endpoints and must not overlap. might be handy to validate the map is discontinuous and return an error in this new function

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, i don't follow. the intervals are only defined by one point (the endpoint), so the only way you couldn't create a valid interval is if you set the same endpoint to two different URLs.

are you suggesting i panic if they set the same endpoint multiple times for the same host?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

thanks though! drawing your attention to this made me realize i was still on the branch that had exclusive endpoints. because of some simplifications i made to the shard command, it was actually easier to make them inclusive of the end block. just updated the code & tests to reflect that change (that i already had in the docs 😅)

Copy link
Contributor

@galxy25 galxy25 Mar 4, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

basically I'm saying there are properties we are expecting this type to have that will effect how the service functions and is configured by hands by human, those two things in conjunction make me think it's worth adding some validation that the parsed config is not only syntactically correct but semantically correct - whether that be inclusive / exclusive or unique (fwiw I was referring to testing the inclusive property in terms that the supplied interval map doesn't have any gaps, e.g. config for a shard for blocks 10-50, and for shard 100-latest since this is something configured by a human operator and human operators err but the case of duplicate endpoints is also a good edge case to validate)

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

gotcha. at this point the value has already been parsed into a valid map. the encoding is such that you can't define non-continuous ranges of shards (only endblocks are defined. the first shard starts at height 1, and then each shard covers from the previous shard's endBlock + 1 to its defined end block).

i've added some more validations to the parsing of the env value to hopefully catch misunderstandings or misconfigurations:

  • error if a host's shards are not ordered by end block (implies a misunderstanding in how the shard block ranges are defined)
  • error if a host has multiple shards defined for the same end block (prevents quiet overrides of the first defined value(s))

endpoints := make([]uint64, 0, len(urlByEndHeight))
for e := range urlByEndHeight {
endpoints = append(endpoints, e)
}
sort.Slice(endpoints, func(i, j int) bool { return endpoints[i] < endpoints[j] })

return IntervalURLMap{
UrlByEndHeight: urlByEndHeight,
endpoints: endpoints,
}
}

// Lookup finds the value associated with the interval containing the number, if it exists.
func (im *IntervalURLMap) Lookup(num uint64) (*url.URL, uint64, bool) {
i := sort.Search(len(im.endpoints), func(i int) bool { return im.endpoints[i] > num })

if i < len(im.endpoints) && num < im.endpoints[i] {
return im.UrlByEndHeight[im.endpoints[i]], im.endpoints[i], true
}

return nil, 0, false
}
56 changes: 56 additions & 0 deletions config/intervalmap_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,56 @@
package config_test

import (
"fmt"
"net/url"
"testing"

"github.com/kava-labs/kava-proxy-service/config"
"github.com/stretchr/testify/require"
)

func mustUrl(s string) *url.URL {
u, err := url.Parse(s)
if err != nil {
panic(fmt.Sprintf("failed to parse url %s: %s", s, err))
}
return u
}

func TestUnitTestIntervalMap(t *testing.T) {
valueByEndpoint := map[uint64]*url.URL{
10: mustUrl("A"),
20: mustUrl("B"),
100: mustUrl("C"),
}
intervalmap := config.NewIntervalURLMap(valueByEndpoint)

testCases := []struct {
value uint64
expectFound bool
expectEndHeight uint64
expectResult string
}{
{1, true, 10, "A"},
{9, true, 10, "A"},
{10, true, 20, "B"},
{15, true, 20, "B"},
{20, true, 100, "C"},
{75, true, 100, "C"},
{100, false, 0, ""},
{300, false, 0, ""},
}

for _, tc := range testCases {
t.Run(fmt.Sprintf("Lookup(%d)", tc.value), func(t *testing.T) {
result, endHeight, found := intervalmap.Lookup(tc.value)
require.Equal(t, tc.expectFound, found)
require.Equal(t, tc.expectEndHeight, endHeight)
if tc.expectResult == "" {
require.Nil(t, result)
} else {
require.Equal(t, tc.expectResult, result.String())
}
})
}
}
28 changes: 28 additions & 0 deletions docker-compose.yml
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,34 @@ services:
- "${KAVA_CONTAINER_COSMOS_RPC_PORT}"
- "${KAVA_CONTAINER_EVM_RPC_PORT}"

# shards are in name only. they are standard peer nodes, but will only recieve traffic
pirtleshell marked this conversation as resolved.
Show resolved Hide resolved
# for a specific block range. kava-shard-10 receives requests for heights 1-9
kava-shard-10:
image: kava/kava:${KAVA_CONTAINER_TAG}
entrypoint: /docker/shared/kava-entrypoint.sh
env_file: .env
volumes:
- ./docker/shared:/docker/shared
# expose ports for other services to be able to connect to within
# the default docker-compose network
expose:
- "${KAVA_CONTAINER_COSMOS_RPC_PORT}"
- "${KAVA_CONTAINER_EVM_RPC_PORT}"

# shards are in name only. they are standard peer nodes, but will only recieve traffic
# for a specific block range. kava-shard-20 receives requests for heights 10-19
kava-shard-20:
image: kava/kava:${KAVA_CONTAINER_TAG}
entrypoint: /docker/shared/kava-entrypoint.sh
env_file: .env
volumes:
- ./docker/shared:/docker/shared
# expose ports for other services to be able to connect to within
# the default docker-compose network
expose:
- "${KAVA_CONTAINER_COSMOS_RPC_PORT}"
- "${KAVA_CONTAINER_EVM_RPC_PORT}"

# run proxy service to observe, route, and scale requests to kava api endpoints
proxy:
build:
Expand Down
23 changes: 19 additions & 4 deletions main_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -89,8 +89,9 @@ var (
// search for any request metrics between startTime and time.Now() for particular request methods
// if testedmethods is empty, all metrics in timeframe are returned.
func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Time, testedmethods []string) []database.ProxiedRequestMetric {
extension := time.Duration(testExtendMetricWindowMs) * time.Millisecond
// add small buffer into future in case metrics are still being created
endTime := time.Now().Add(time.Duration(testExtendMetricWindowMs) * time.Millisecond)
endTime := time.Now().Add(extension)

var nextCursor int64
var proxiedRequestMetrics []database.ProxiedRequestMetric
Expand Down Expand Up @@ -130,6 +131,8 @@ func findMetricsInWindowForMethods(db database.PostgresClient, startTime time.Ti
}
}

// ensure next window has no overlap with current one
time.Sleep(extension)
return requestMetricsDuringRequestWindow
}

Expand Down Expand Up @@ -422,14 +425,26 @@ func TestE2ETest_HeightBasedRouting(t *testing.T) {
{
name: "request for non-latest height -> default",
method: "eth_getBlockByNumber",
params: []interface{}{"0x2", false},
params: []interface{}{"0x15", false}, // block 21 is beyond shards
expectRoute: service.ResponseBackendDefault,
},
{
name: "request for earliest height -> default",
name: "request for height in 1st shard -> shard",
method: "eth_getBlockByNumber",
params: []interface{}{"0x2", false}, // block 2
expectRoute: service.ResponseBackendShard,
},
{
name: "request for height in 2nd shard -> shard",
method: "eth_getBlockByNumber",
params: []interface{}{"0xF", false}, // block 15
expectRoute: service.ResponseBackendShard,
},
{
name: "request for earliest height -> 1st shard",
method: "eth_getBlockByNumber",
params: []interface{}{"earliest", false},
expectRoute: service.ResponseBackendDefault,
expectRoute: service.ResponseBackendShard,
},
{
name: "request for latest height -> pruning",
Expand Down
Loading
Loading