Skip to content

Commit

Permalink
Merge branch 'develop' into bump-mcms-for-salt-fix
Browse files Browse the repository at this point in the history
  • Loading branch information
connorwstein committed Dec 13, 2024
2 parents 1253d1d + 2ec4d6c commit 8bc5127
Show file tree
Hide file tree
Showing 13 changed files with 228 additions and 22 deletions.
2 changes: 2 additions & 0 deletions core/capabilities/compute/compute_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@ import (
"github.com/smartcontractkit/chainlink/v2/core/capabilities"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/wasmtest"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"

cappkg "github.com/smartcontractkit/chainlink-common/pkg/capabilities"
"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
Expand Down Expand Up @@ -188,6 +189,7 @@ func TestComputeFetch(t *testing.T) {
th := setup(t, defaultConfig)

th.connector.EXPECT().DonID().Return("don-id")
th.connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
th.connector.EXPECT().GatewayIDs().Return([]string{"gateway1", "gateway2"})

msgID := strings.Join([]string{
Expand Down
11 changes: 9 additions & 2 deletions core/capabilities/webapi/outgoing_connector_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,15 @@ func (c *OutgoingConnectorHandler) HandleSingleNodeRequest(ctx context.Context,
}
sort.Strings(gatewayIDs)

err = c.gc.SignAndSendToGateway(ctx, gatewayIDs[0], body)
if err != nil {
selectedGateway := gatewayIDs[0]

l.Infow("selected gateway, awaiting connection", "gatewayID", selectedGateway)

if err := c.gc.AwaitConnection(ctx, selectedGateway); err != nil {
return nil, errors.Wrap(err, "await connection canceled")
}

if err := c.gc.SignAndSendToGateway(ctx, selectedGateway, body); err != nil {
return nil, errors.Wrap(err, "failed to send request to gateway")
}

Expand Down
3 changes: 3 additions & 0 deletions core/capabilities/webapi/outgoing_connector_handler_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/smartcontractkit/chainlink-common/pkg/utils/tests"
"github.com/smartcontractkit/chainlink/v2/core/logger"
"github.com/smartcontractkit/chainlink/v2/core/utils/matches"

"github.com/smartcontractkit/chainlink/v2/core/services/gateway/api"
gcmocks "github.com/smartcontractkit/chainlink/v2/core/services/gateway/connector/mocks"
Expand All @@ -36,6 +37,7 @@ func TestHandleSingleNodeRequest(t *testing.T) {
msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the default timeout
Expand Down Expand Up @@ -82,6 +84,7 @@ func TestHandleSingleNodeRequest(t *testing.T) {
msgID := "msgID"
testURL := "http://localhost:8080"
connector.EXPECT().DonID().Return("donID")
connector.EXPECT().AwaitConnection(matches.AnyContext, "gateway1").Return(nil)
connector.EXPECT().GatewayIDs().Return([]string{"gateway1"})

// build the expected body with the defined timeout
Expand Down
2 changes: 1 addition & 1 deletion core/capabilities/webapi/target/target_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -194,7 +194,7 @@ func TestCapability_Execute(t *testing.T) {
require.NoError(t, err)

gatewayResp := gatewayResponse(t, msgID)

th.connector.EXPECT().AwaitConnection(mock.Anything, "gateway1").Return(nil)
th.connector.On("SignAndSendToGateway", mock.Anything, "gateway1", mock.Anything).Return(nil).Run(func(args mock.Arguments) {
th.connectorHandler.HandleGatewayMessage(ctx, "gateway1", gatewayResp)
}).Once()
Expand Down
6 changes: 4 additions & 2 deletions core/chains/evm/logpoller/log_poller.go
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,8 @@ import (
pkgerrors "github.com/pkg/errors"
"golang.org/x/exp/maps"

commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/services"
"github.com/smartcontractkit/chainlink-common/pkg/timeutil"
Expand Down Expand Up @@ -91,6 +93,7 @@ type Client interface {
}

type HeadTracker interface {
services.Service
LatestAndFinalizedBlock(ctx context.Context) (latest, finalized *evmtypes.Head, err error)
}

Expand All @@ -99,7 +102,6 @@ var (
ErrReplayRequestAborted = pkgerrors.New("aborted, replay request cancelled")
ErrReplayInProgress = pkgerrors.New("replay request cancelled, but replay is already in progress")
ErrLogPollerShutdown = pkgerrors.New("replay aborted due to log poller shutdown")
ErrFinalityViolated = pkgerrors.New("finality violated")
)

type logPoller struct {
Expand Down Expand Up @@ -525,7 +527,7 @@ func (lp *logPoller) Close() error {

func (lp *logPoller) Healthy() error {
if lp.finalityViolated.Load() {
return ErrFinalityViolated
return commontypes.ErrFinalityViolated
}
return nil
}
Expand Down
9 changes: 7 additions & 2 deletions core/chains/evm/logpoller/log_poller_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,10 +22,13 @@ import (
"github.com/stretchr/testify/require"
"go.uber.org/zap/zapcore"

commontypes "github.com/smartcontractkit/chainlink-common/pkg/types"

"github.com/smartcontractkit/chainlink-common/pkg/logger"
"github.com/smartcontractkit/chainlink-common/pkg/types/query"
"github.com/smartcontractkit/chainlink-common/pkg/types/query/primitives"
commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils"

"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/chaintype"

htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks"
Expand Down Expand Up @@ -1106,7 +1109,8 @@ func TestLogPoller_ReorgDeeperThanFinality(t *testing.T) {

secondPoll := th.PollAndSaveLogs(testutils.Context(t), firstPoll)
assert.Equal(t, firstPoll, secondPoll)
assert.Equal(t, logpoller.ErrFinalityViolated, th.LogPoller.Healthy())
require.Equal(t, commontypes.ErrFinalityViolated, th.LogPoller.Healthy())
require.Equal(t, commontypes.ErrFinalityViolated, th.LogPoller.HealthReport()[th.LogPoller.Name()])

// Manually remove re-org'd chain from the log poller to bring it back to life
// LogPoller should be healthy again after first poll
Expand All @@ -1116,7 +1120,8 @@ func TestLogPoller_ReorgDeeperThanFinality(t *testing.T) {
// Poll from latest
recoveryPoll := th.PollAndSaveLogs(testutils.Context(t), 1)
assert.Equal(t, int64(35), recoveryPoll)
assert.NoError(t, th.LogPoller.Healthy())
require.NoError(t, th.LogPoller.Healthy())
require.NoError(t, th.LogPoller.HealthReport()[th.LogPoller.Name()])
})
}
}
Expand Down
45 changes: 39 additions & 6 deletions core/services/gateway/connector/connector.go
Original file line number Diff line number Diff line change
Expand Up @@ -28,13 +28,14 @@ type GatewayConnector interface {

AddHandler(methods []string, handler GatewayConnectorHandler) error
// SendToGateway takes a signed message as argument and sends it to the specified gateway
SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error
SendToGateway(ctx context.Context, gatewayID string, msg *api.Message) error
// SignAndSendToGateway signs the message and sends the message to the specified gateway
SignAndSendToGateway(ctx context.Context, gatewayID string, msg *api.MessageBody) error
// GatewayIDs returns the list of Gateway IDs
GatewayIDs() []string
// DonID returns the DON ID
DonID() string
AwaitConnection(ctx context.Context, gatewayID string) error
}

// Signer implementation needs to be provided by a GatewayConnector user (node)
Expand Down Expand Up @@ -78,12 +79,30 @@ func (c *gatewayConnector) HealthReport() map[string]error {
func (c *gatewayConnector) Name() string { return c.lggr.Name() }

type gatewayState struct {
// signal channel is closed once the gateway is connected
signalCh chan struct{}

conn network.WSConnectionWrapper
config ConnectorGatewayConfig
url *url.URL
wsClient network.WebSocketClient
}

// A gatewayState is connected when the signal channel is closed
func (gs *gatewayState) signal() {
close(gs.signalCh)
}

// awaitConn blocks until the gateway is connected or the context is done
func (gs *gatewayState) awaitConn(ctx context.Context) error {
select {
case <-ctx.Done():
return fmt.Errorf("await connection failed: %w", ctx.Err())
case <-gs.signalCh:
return nil
}
}

func NewGatewayConnector(config *ConnectorConfig, signer Signer, clock clockwork.Clock, lggr logger.Logger) (GatewayConnector, error) {
if config == nil || signer == nil || clock == nil || lggr == nil {
return nil, errors.New("nil dependency")
Expand Down Expand Up @@ -125,6 +144,7 @@ func NewGatewayConnector(config *ConnectorConfig, signer Signer, clock clockwork
config: gw,
url: parsedURL,
wsClient: network.NewWebSocketClient(config.WsClientConfig, connector, lggr),
signalCh: make(chan struct{}),
}
gateways[gw.Id] = gateway
urlToId[gw.URL] = gw.Id
Expand All @@ -150,17 +170,25 @@ func (c *gatewayConnector) AddHandler(methods []string, handler GatewayConnector
return nil
}

func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayId string, msg *api.Message) error {
func (c *gatewayConnector) AwaitConnection(ctx context.Context, gatewayID string) error {
gateway, ok := c.gateways[gatewayID]
if !ok {
return fmt.Errorf("invalid Gateway ID %s", gatewayID)
}
return gateway.awaitConn(ctx)
}

func (c *gatewayConnector) SendToGateway(ctx context.Context, gatewayID string, msg *api.Message) error {
data, err := c.codec.EncodeResponse(msg)
if err != nil {
return fmt.Errorf("error encoding response for gateway %s: %v", gatewayId, err)
return fmt.Errorf("error encoding response for gateway %s: %w", gatewayID, err)
}
gateway, ok := c.gateways[gatewayId]
gateway, ok := c.gateways[gatewayID]
if !ok {
return fmt.Errorf("invalid Gateway ID %s", gatewayId)
return fmt.Errorf("invalid Gateway ID %s", gatewayID)
}
if gateway.conn == nil {
return fmt.Errorf("connector not started")
return errors.New("connector not started")
}
return gateway.conn.Write(ctx, websocket.BinaryMessage, data)
}
Expand Down Expand Up @@ -242,10 +270,15 @@ func (c *gatewayConnector) reconnectLoop(gatewayState *gatewayState) {
} else {
c.lggr.Infow("connected successfully", "url", gatewayState.url)
closeCh := gatewayState.conn.Reset(conn)
gatewayState.signal()
<-closeCh
c.lggr.Infow("connection closed", "url", gatewayState.url)

// reset backoff
redialBackoff = utils.NewRedialBackoff()

// reset signal channel
gatewayState.signalCh = make(chan struct{})
}
select {
case <-c.shutdownCh:
Expand Down
63 changes: 55 additions & 8 deletions core/services/gateway/connector/mocks/gateway_connector.go

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

20 changes: 20 additions & 0 deletions core/services/relay/evm/chain_components_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package evm_test
import (
"context"
"crypto/ecdsa"
"errors"
"fmt"
"math"
"math/big"
Expand All @@ -12,22 +13,28 @@ import (
"time"

"github.com/ethereum/go-ethereum/accounts/abi/bind"
"github.com/ethereum/go-ethereum/common"
evmtypes "github.com/ethereum/go-ethereum/core/types"
"github.com/ethereum/go-ethereum/crypto"
"github.com/ethereum/go-ethereum/ethclient/simulated"
"github.com/jmoiron/sqlx"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"

"github.com/smartcontractkit/chainlink-common/pkg/services"

commonconfig "github.com/smartcontractkit/chainlink-common/pkg/config"
commontestutils "github.com/smartcontractkit/chainlink-common/pkg/loop/testutils"
clcommontypes "github.com/smartcontractkit/chainlink-common/pkg/types"
"github.com/smartcontractkit/chainlink-common/pkg/types/interfacetests"

htMocks "github.com/smartcontractkit/chainlink/v2/common/headtracker/mocks"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/client"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller"
lpMocks "github.com/smartcontractkit/chainlink/v2/core/chains/evm/logpoller/mocks"
evmtxmgr "github.com/smartcontractkit/chainlink/v2/core/chains/evm/txmgr"
clevmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/internal/cltest"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils"
"github.com/smartcontractkit/chainlink/v2/core/internal/testutils/configtest"
Expand Down Expand Up @@ -206,6 +213,19 @@ func TestContractReaderEventsInitValidation(t *testing.T) {
}
}

func TestChainReader_HealthReport(t *testing.T) {
lp := lpMocks.NewLogPoller(t)
lp.EXPECT().HealthReport().Return(map[string]error{"lp_name": clcommontypes.ErrFinalityViolated}).Once()
ht := htMocks.NewHeadTracker[*clevmtypes.Head, common.Hash](t)
htError := errors.New("head tracker error")
ht.EXPECT().HealthReport().Return(map[string]error{"ht_name": htError}).Once()
cr, err := evm.NewChainReaderService(testutils.Context(t), logger.NullLogger, lp, ht, nil, types.ChainReaderConfig{Contracts: nil})
require.NoError(t, err)
healthReport := cr.HealthReport()
require.True(t, services.ContainsError(healthReport, clcommontypes.ErrFinalityViolated), "expected chain reader to propagate logpoller's error")
require.True(t, services.ContainsError(healthReport, htError), "expected chain reader to propagate headtracker's error")
}

func TestChainComponents(t *testing.T) {
testutils.SkipFlakey(t, "https://smartcontract-it.atlassian.net/browse/BCFR-1083")
t.Parallel()
Expand Down
Loading

0 comments on commit 8bc5127

Please sign in to comment.