Skip to content

Commit

Permalink
Merge branch 'main' into PRT-replace-cookbook-with-lava-config
Browse files Browse the repository at this point in the history
  • Loading branch information
shleikes committed Dec 3, 2024
2 parents c6cd51f + e9cfc7d commit e4064e4
Show file tree
Hide file tree
Showing 29 changed files with 503 additions and 144 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ endpoints:
- api-interface: tendermintrpc
chain-id: LAV1
network-address:
address: "127.0.0.1:2220"
address: "127.0.0.1:2224"
node-urls:
- url: ws://127.0.0.1:26657/websocket
- url: http://127.0.0.1:26657
- url: http://127.0.0.1:26657
addons:
- archive
- url: https://trustless-api.com
- url: http://127.0.0.1:4444
methods:
- block
- block_by_hash
Expand All @@ -18,7 +18,7 @@ endpoints:
- api-interface: grpc
chain-id: LAV1
network-address:
address: "127.0.0.1:2220"
address: "127.0.0.1:2224"
node-urls:
- url: 127.0.0.1:9090
- url: 127.0.0.1:9090
Expand All @@ -27,7 +27,7 @@ endpoints:
- api-interface: rest
chain-id: LAV1
network-address:
address: "127.0.0.1:2220"
address: "127.0.0.1:2224"
node-urls:
- url: http://127.0.0.1:1317
- url: http://127.0.0.1:1317
Expand Down
20 changes: 20 additions & 0 deletions cookbook/projects/policy_all_chains_with_extension.yml
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,26 @@ Policy:
extensions:
- "archive"
mixed: true
- chain_id: OSMOSIS
requirements:
- collection:
api_interface: "rest"
type: "GET"
extensions:
- "archive"
mixed: true
- collection:
api_interface: "grpc"
type: ""
extensions:
- "archive"
mixed: true
- collection:
api_interface: "tendermintrpc"
type: ""
extensions:
- "archive"
mixed: true
- chain_id: COSMOSHUB
requirements:
- collection:
Expand Down
51 changes: 51 additions & 0 deletions protocol/chainlib/base_chain_parser.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,8 @@ package chainlib
import (
"errors"
"fmt"
"io"
"net/http"
"regexp"
"strings"
"sync"
Expand Down Expand Up @@ -356,6 +358,55 @@ func (apip *BaseChainParser) isValidInternalPath(path string) bool {
return ok
}

// take an http request and direct it through the consumer
func (apip *BaseChainParser) ExtractDataFromRequest(request *http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) {
// Extract relative URL path
url = request.URL.Path
// Extract connection type
connectionType = request.Method

// Extract metadata
for key, values := range request.Header {
for _, value := range values {
metadata = append(metadata, pairingtypes.Metadata{
Name: key,
Value: value,
})
}
}

// Extract data
if request.Body != nil {
bodyBytes, err := io.ReadAll(request.Body)
if err != nil {
return "", "", "", nil, err
}
data = string(bodyBytes)
}

return url, data, connectionType, metadata, nil
}

func (apip *BaseChainParser) SetResponseFromRelayResult(relayResult *common.RelayResult) (*http.Response, error) {
if relayResult == nil {
return nil, errors.New("relayResult is nil")
}
response := &http.Response{
StatusCode: relayResult.StatusCode,
Header: make(http.Header),
}

for _, values := range relayResult.Reply.Metadata {
response.Header.Add(values.Name, values.Value)
}

if relayResult.Reply != nil && relayResult.Reply.Data != nil {
response.Body = io.NopCloser(strings.NewReader(string(relayResult.Reply.Data)))
}

return response, nil
}

// getSupportedApi fetches service api from spec by name
func (apip *BaseChainParser) getApiCollection(connectionType, internalPath, addon string) (*spectypes.ApiCollection, error) {
// Guard that the GrpcChainParser instance exists
Expand Down
5 changes: 1 addition & 4 deletions protocol/chainlib/chain_router.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,14 +326,11 @@ func newChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint lavase
}
}
if hasSubscriptionInSpec && apiCollection.Enabled && !webSocketSupported {
err := utils.LavaFormatError("subscriptions are applicable for this chain, but websocket is not provided in 'supported' map. By not setting ws/wss your provider wont be able to accept ws subscriptions, therefore might receive less rewards and lower QOS score.", nil,
return nil, utils.LavaFormatError("subscriptions are applicable for this chain, but websocket is not provided in 'supported' map. By not setting ws/wss your provider wont be able to accept ws subscriptions, therefore might receive less rewards and lower QOS score.", nil,
utils.LogAttr("apiInterface", apiCollection.CollectionData.ApiInterface),
utils.LogAttr("supportedMap", supportedMap),
utils.LogAttr("required", WebSocketExtension),
)
if !IgnoreSubscriptionNotConfiguredError {
return nil, err
}
}

utils.LavaFormatDebug("router keys", utils.LogAttr("chainProxyRouter", chainProxyRouter))
Expand Down
8 changes: 0 additions & 8 deletions protocol/chainlib/chain_router_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,6 @@ func TestChainRouterWithDisabledWebSocketInSpec(t *testing.T) {
chainParser, err := NewChainParser(apiInterface)
require.NoError(t, err)

IgnoreSubscriptionNotConfiguredError = false

addonsOptions := []string{"-addon-", "-addon2-"}
extensionsOptions := []string{"-test-", "-test2-", "-test3-"}

Expand Down Expand Up @@ -400,8 +398,6 @@ func TestChainRouterWithEnabledWebSocketInSpec(t *testing.T) {
chainParser, err := NewChainParser(apiInterface)
require.NoError(t, err)

IgnoreSubscriptionNotConfiguredError = false

addonsOptions := []string{"-addon-", "-addon2-"}
extensionsOptions := []string{"-test-", "-test2-", "-test3-"}

Expand Down Expand Up @@ -795,8 +791,6 @@ func TestChainRouterWithMethodRoutes(t *testing.T) {
chainParser, err := NewChainParser(apiInterface)
require.NoError(t, err)

IgnoreSubscriptionNotConfiguredError = false

addonsOptions := []string{"-addon-", "-addon2-"}
extensionsOptions := []string{"-test-", "-test2-", "-test3-"}

Expand Down Expand Up @@ -2181,8 +2175,6 @@ func TestChainRouterWithInternalPaths(t *testing.T) {
chainParser, err := NewChainParser(play.apiInterface)
require.NoError(t, err)

IgnoreSubscriptionNotConfiguredError = false

spec := testcommon.CreateMockSpec()
spec.ApiCollections = play.specApiCollections
chainParser.SetSpec(spec)
Expand Down
27 changes: 24 additions & 3 deletions protocol/chainlib/chainlib.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package chainlib
import (
"context"
"fmt"
"net/http"
"time"

"github.com/lavanet/lava/v4/protocol/chainlib/chainproxy/rpcInterfaceMessages"
Expand All @@ -11,13 +12,13 @@ import (
"github.com/lavanet/lava/v4/protocol/common"
"github.com/lavanet/lava/v4/protocol/lavasession"
"github.com/lavanet/lava/v4/protocol/metrics"
"github.com/lavanet/lava/v4/utils"
pairingtypes "github.com/lavanet/lava/v4/x/pairing/types"
spectypes "github.com/lavanet/lava/v4/x/spec/types"
)

var (
IgnoreSubscriptionNotConfiguredError = true
IgnoreSubscriptionNotConfiguredErrorFlag = "ignore-subscription-not-configured-error"
const (
INTERNAL_ADDRESS = "internal-addr"
)

func NewChainParser(apiInterface string) (chainParser ChainParser, err error) {
Expand All @@ -44,6 +45,10 @@ func NewChainListener(
refererData *RefererData,
consumerWsSubscriptionManager *ConsumerWSSubscriptionManager,
) (ChainListener, error) {
if listenEndpoint.NetworkAddress == INTERNAL_ADDRESS {
utils.LavaFormatDebug("skipping chain listener for internal address")
return NewEmptyChainListener(), nil
}
switch listenEndpoint.ApiInterface {
case spectypes.APIInterfaceJsonRPC:
return NewJrpcChainListener(ctx, listenEndpoint, relaySender, healthReporter, rpcConsumerLogs, refererData, consumerWsSubscriptionManager), nil
Expand Down Expand Up @@ -76,6 +81,8 @@ type ChainParser interface {
UpdateBlockTime(newBlockTime time.Duration)
GetUniqueName() string
ExtensionsParser() *extensionslib.ExtensionParser
ExtractDataFromRequest(*http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error)
SetResponseFromRelayResult(*common.RelayResult) (*http.Response, error)
}

type ChainMessage interface {
Expand Down Expand Up @@ -173,3 +180,17 @@ func GetChainRouter(ctx context.Context, nConns uint, rpcProviderEndpoint *lavas
}
return newChainRouter(ctx, nConns, *rpcProviderEndpoint, chainParser, proxyConstructor)
}

type EmptyChainListener struct{}

func NewEmptyChainListener() ChainListener {
return &EmptyChainListener{}
}

func (*EmptyChainListener) Serve(ctx context.Context, cmdFlags common.ConsumerCmdFlags) {
// do nothing
}

func (*EmptyChainListener) GetListeningAddress() string {
return ""
}
24 changes: 21 additions & 3 deletions protocol/chainlib/chainproxy/rpcclient/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -237,13 +237,16 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool {
h.handleSubscriptionResultTendermint(msg)
return true
case msg.isEthereumNotification():
if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
if strings.HasSuffix(msg.Method, ethereumNotificationMethodSuffix) {
h.handleSubscriptionResultEthereum(msg)
return true
} else if strings.HasSuffix(msg.Method, solanaNotificationMethodSuffix) {
h.handleSubscriptionResultSolana(msg)
return true
}
return false
case msg.isStarkNetPathfinderNotification():
if strings.HasSuffix(msg.Method, notificationMethodSuffix) {
if strings.HasSuffix(msg.Method, ethereumNotificationMethodSuffix) {
h.handleSubscriptionResultStarkNetPathfinder(msg)
return true
}
Expand All @@ -258,7 +261,7 @@ func (h *handler) handleImmediate(msg *JsonrpcMessage) bool {
}

func (h *handler) handleSubscriptionResultStarkNetPathfinder(msg *JsonrpcMessage) {
var result starkNetPathfinderSubscriptionResult
var result integerIdSubscriptionResult
if err := json.Unmarshal(msg.Result, &result); err != nil {
utils.LavaFormatTrace("Dropping invalid starknet pathfinder subscription message",
utils.LogAttr("err", err),
Expand Down Expand Up @@ -290,6 +293,21 @@ func (h *handler) handleSubscriptionResultEthereum(msg *JsonrpcMessage) {
}
}

func (h *handler) handleSubscriptionResultSolana(msg *JsonrpcMessage) {
var result integerIdSubscriptionResult
if err := json.Unmarshal(msg.Params, &result); err != nil {
utils.LavaFormatTrace("Dropping invalid solana subscription message",
utils.LogAttr("err", err),
utils.LogAttr("params", string(msg.Params)),
)
h.log.Debug("Dropping invalid subscription message")
return
}
if h.clientSubs[strconv.Itoa(result.ID)] != nil {
h.clientSubs[strconv.Itoa(result.ID)].deliver(msg)
}
}

func (h *handler) handleSubscriptionResultTendermint(msg *JsonrpcMessage) {
var result tendermintSubscriptionResult
if err := json.Unmarshal(msg.Result, &result); err != nil {
Expand Down
13 changes: 7 additions & 6 deletions protocol/chainlib/chainproxy/rpcclient/json.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,11 +33,12 @@ import (
)

const (
Vsn = "2.0"
serviceMethodSeparator = "_"
subscribeMethodSuffix = "_subscribe"
unsubscribeMethodSuffix = "_unsubscribe"
notificationMethodSuffix = "_subscription"
Vsn = "2.0"
serviceMethodSeparator = "_"
subscribeMethodSuffix = "_subscribe"
unsubscribeMethodSuffix = "_unsubscribe"
ethereumNotificationMethodSuffix = "_subscription"
solanaNotificationMethodSuffix = "Notification"

defaultWriteTimeout = 10 * time.Second // used if context has no deadline
)
Expand All @@ -49,7 +50,7 @@ type ethereumSubscriptionResult struct {
Result json.RawMessage `json:"result,omitempty"`
}

type starkNetPathfinderSubscriptionResult struct {
type integerIdSubscriptionResult struct {
ID int `json:"subscription"`
Result json.RawMessage `json:"result,omitempty"`
}
Expand Down
2 changes: 1 addition & 1 deletion protocol/chainlib/chainproxy/rpcclient/subscription.go
Original file line number Diff line number Diff line change
Expand Up @@ -181,7 +181,7 @@ func (n *Notifier) send(sub *Subscription, data json.RawMessage) error {
ctx := context.Background()
return n.h.conn.writeJSON(ctx, &JsonrpcMessage{
Version: Vsn,
Method: n.namespace + notificationMethodSuffix,
Method: n.namespace + ethereumNotificationMethodSuffix,
Params: params,
})
}
Expand Down
6 changes: 6 additions & 0 deletions protocol/chainlib/tendermintRPC.go
Original file line number Diff line number Diff line change
Expand Up @@ -275,6 +275,12 @@ func (*TendermintChainParser) newBatchChainMessage(serviceApi *spectypes.Api, re
return nodeMsg, err
}

// overwritten because tendermintrpc doesnt use POST but an empty connecionType
func (apip *TendermintChainParser) ExtractDataFromRequest(request *http.Request) (url string, data string, connectionType string, metadata []pairingtypes.Metadata, err error) {
url, data, _, metadata, err = apip.BaseChainParser.ExtractDataFromRequest(request)
return url, data, "", metadata, err
}

func (*TendermintChainParser) newChainMessage(serviceApi *spectypes.Api, requestedBlock int64, requestedHashes []string, msg *rpcInterfaceMessages.TendermintrpcMessage, apiCollection *spectypes.ApiCollection, usedDefaultValue bool) *baseChainMessageContainer {
nodeMsg := &baseChainMessageContainer{
api: serviceApi,
Expand Down
2 changes: 1 addition & 1 deletion protocol/integration/protocol_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2154,7 +2154,7 @@ func TestArchiveProvidersRetryOnParsedHash(t *testing.T) {
ChainId: specId,
SeenBlock: 1005,
BlocksHashesToHeights: []*pairingtypes.BlockHashToHeight{{Hash: blockHash, Height: spectypes.NOT_APPLICABLE}},
}) // caching in the portal doesn't care about hashes, and we don't have data on finalization yet
}) // caching in the consumer doesn't care about hashes, and we don't have data on finalization yet
cancel()
if err != nil {
continue
Expand Down
2 changes: 1 addition & 1 deletion protocol/lavasession/provider_types.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ func (endpoint *RPCProviderEndpoint) AddonsString() string {
}

func (endpoint *RPCProviderEndpoint) String() string {
return endpoint.ChainID + ":" + endpoint.ApiInterface + " Network Address:" + endpoint.NetworkAddress.Address + " Node: " + endpoint.UrlsString() + " Geolocation:" + strconv.FormatUint(endpoint.Geolocation, 10) + " Addons:" + endpoint.AddonsString()
return endpoint.ChainID + ":" + endpoint.ApiInterface + " Network Address:" + endpoint.NetworkAddress.Address + " Node:" + endpoint.UrlsString() + " Geolocation:" + strconv.FormatUint(endpoint.Geolocation, 10) + " Addons:" + endpoint.AddonsString()
}

func (endpoint *RPCProviderEndpoint) Validate() error {
Expand Down
Loading

0 comments on commit e4064e4

Please sign in to comment.