Skip to content

Commit

Permalink
Custom Astar finality (#14021)
Browse files Browse the repository at this point in the history
* Custom Astar finality

* fix merge artifact

* fix lint issue

* simplify isRequestingFinalizedBlock

* avoid iterating through the whole batch again

* fix errors wrapping
  • Loading branch information
dhaidashenko authored Aug 9, 2024
1 parent 95cb692 commit bd648bd
Show file tree
Hide file tree
Showing 11 changed files with 277 additions and 49 deletions.
5 changes: 5 additions & 0 deletions .changeset/warm-houses-build.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"chainlink": minor
---

Added custom finality calculation for Astar #internal
4 changes: 4 additions & 0 deletions core/chains/evm/client/chain_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -160,9 +160,13 @@ func (c *chainClient) BalanceAt(ctx context.Context, account common.Address, blo
return c.multiNode.BalanceAt(ctx, account, blockNumber)
}

// BatchCallContext - sends all given requests as a single batch.
// Request specific errors for batch calls are returned to the individual BatchElem.
// Ensure the same BatchElem slice provided by the caller is passed through the call stack
// to ensure the caller has access to the errors.
// Note: some chains (e.g Astar) have custom finality requests, so even when FinalityTagEnabled=true, finality tag
// might not be properly handled and returned results might have weaker finality guarantees. It's highly recommended
// to use HeadTracker to identify latest finalized block.
func (c *chainClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
return c.multiNode.BatchCallContext(ctx, b)
}
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/chain_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ func TestEthClient_HeaderByNumber(t *testing.T) {
`{"difficulty":"0xf3a00","extraData":"0xd883010503846765746887676f312e372e318664617277696e","gasLimit":"0xffc001","gasUsed":"0x0","hash":"0x41800b5c3f1717687d85fc9018faac0a6e90b39deaa0b99e7fe4fe796ddeb26a","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xd1aeb42885a43b72b518182ef893125814811048","mixHash":"0x0f98b15f1a4901a7e9204f3c500a7bd527b3fb2c3340e12176a44b83e414a69e","nonce":"0x0ece08ea8c49dfd9","number":"0x1","parentHash":"0x41941023680923e0fe4d74a34bdac8141f2540e3ae90623718e47d66d1ca4a2d","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x218","stateRoot":"0xc7b01007a10da045eacb90385887dd0c38fcb5db7393006bdde24b93873c334b","timestamp":"0x58318da2","totalDifficulty":"0x1f3a00","transactions":[],"transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","uncles":[]}`},
{"happy parity", expectedBlockNum, expectedBlockNum.Int64(), nil,
`{"author":"0xd1aeb42885a43b72b518182ef893125814811048","difficulty":"0xf3a00","extraData":"0xd883010503846765746887676f312e372e318664617277696e","gasLimit":"0xffc001","gasUsed":"0x0","hash":"0x41800b5c3f1717687d85fc9018faac0a6e90b39deaa0b99e7fe4fe796ddeb26a","logsBloom":"0x00000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000000","miner":"0xd1aeb42885a43b72b518182ef893125814811048","mixHash":"0x0f98b15f1a4901a7e9204f3c500a7bd527b3fb2c3340e12176a44b83e414a69e","nonce":"0x0ece08ea8c49dfd9","number":"0x1","parentHash":"0x41941023680923e0fe4d74a34bdac8141f2540e3ae90623718e47d66d1ca4a2d","receiptsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","sealFields":["0xa00f98b15f1a4901a7e9204f3c500a7bd527b3fb2c3340e12176a44b83e414a69e","0x880ece08ea8c49dfd9"],"sha3Uncles":"0x1dcc4de8dec75d7aab85b567b6ccd41ad312451b948a7413f0a142fd40d49347","size":"0x218","stateRoot":"0xc7b01007a10da045eacb90385887dd0c38fcb5db7393006bdde24b93873c334b","timestamp":"0x58318da2","totalDifficulty":"0x1f3a00","transactions":[],"transactionsRoot":"0x56e81f171bcc55a6ff8345e692c0f86e5b48e01b996cadc001622fb5e363b421","uncles":[]}`},
{"missing header", expectedBlockNum, 0, fmt.Errorf("no live nodes available for chain %s", testutils.FixtureChainID.String()),
{"missing header", expectedBlockNum, 0, fmt.Errorf("RPCClient returned error (eth-primary-rpc-0): not found"),
`null`},
}

Expand Down Expand Up @@ -366,7 +366,7 @@ func TestEthClient_HeaderByNumber(t *testing.T) {
ctx, cancel := context.WithTimeout(tests.Context(t), 5*time.Second)
result, err := ethClient.HeadByNumber(ctx, expectedBlockNum)
if test.error != nil {
require.Error(t, err, test.error)
require.EqualError(t, err, test.error.Error())
} else {
require.NoError(t, err)
require.Equal(t, expectedBlockHash, result.Hash.Hex())
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/evm_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ func NewEvmClient(cfg evmconfig.NodePool, chainCfg commonclient.ChainConfig, cli
for i, node := range nodes {
if node.SendOnly != nil && *node.SendOnly {
rpc := NewRPCClient(lggr, empty, (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID,
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout)
commonclient.Secondary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
sendonly := commonclient.NewSendOnlyNode(lggr, (url.URL)(*node.HTTPURL),
*node.Name, chainID, rpc)
sendonlys = append(sendonlys, sendonly)
} else {
rpc := NewRPCClient(lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i),
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout)
chainID, commonclient.Primary, cfg.FinalizedBlockPollInterval(), largePayloadRPCTimeout, defaultRPCTimeout, chainType)
primaryNode := commonclient.NewNode(cfg, chainCfg,
lggr, (url.URL)(*node.WSURL), (*url.URL)(node.HTTPURL), *node.Name, int32(i), chainID, *node.Order,
rpc, "EVM")
Expand Down
4 changes: 2 additions & 2 deletions core/chains/evm/client/helpers_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -140,7 +140,7 @@ func NewChainClientWithTestNode(
}

lggr := logger.Test(t)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout)
rpc := NewRPCClient(lggr, *parsed, rpcHTTPURL, "eth-primary-rpc-0", id, chainID, commonclient.Primary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")

n := commonclient.NewNode[*big.Int, *evmtypes.Head, RPCClient](
nodeCfg, clientMocks.ChainConfig{NoNewHeadsThresholdVal: noNewHeadsThreshold}, lggr, *parsed, rpcHTTPURL, "eth-primary-node-0", id, chainID, 1, rpc, "EVM")
Expand All @@ -152,7 +152,7 @@ func NewChainClientWithTestNode(
return nil, pkgerrors.Errorf("sendonly ethereum rpc url scheme must be http(s): %s", u.String())
}
var empty url.URL
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout)
rpc := NewRPCClient(lggr, empty, &sendonlyRPCURLs[i], fmt.Sprintf("eth-sendonly-rpc-%d", i), id, chainID, commonclient.Secondary, 0, commonclient.QueryTimeout, commonclient.QueryTimeout, "")
s := commonclient.NewSendOnlyNode[*big.Int, RPCClient](
lggr, u, fmt.Sprintf("eth-sendonly-%d", i), chainID, rpc)
sendonlys = append(sendonlys, s)
Expand Down
170 changes: 142 additions & 28 deletions core/chains/evm/client/rpc_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package client

import (
"context"
"encoding/json"
"errors"
"fmt"
"math/big"
Expand All @@ -28,6 +29,7 @@ import (
commonclient "github.com/smartcontractkit/chainlink/v2/common/client"
commontypes "github.com/smartcontractkit/chainlink/v2/common/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/assets"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/config/chaintype"
evmtypes "github.com/smartcontractkit/chainlink/v2/core/chains/evm/types"
"github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils"
ubig "github.com/smartcontractkit/chainlink/v2/core/chains/evm/utils/big"
Expand Down Expand Up @@ -120,6 +122,7 @@ type rpcClient struct {
largePayloadRpcTimeout time.Duration
rpcTimeout time.Duration
finalizedBlockPollInterval time.Duration
chainType chaintype.ChainType

ws rawclient
http *rawclient
Expand Down Expand Up @@ -156,10 +159,12 @@ func NewRPCClient(
finalizedBlockPollInterval time.Duration,
largePayloadRpcTimeout time.Duration,
rpcTimeout time.Duration,
chainType chaintype.ChainType,
) RPCClient {
r := &rpcClient{
largePayloadRpcTimeout: largePayloadRpcTimeout,
rpcTimeout: rpcTimeout,
chainType: chainType,
}
r.name = name
r.id = id
Expand Down Expand Up @@ -396,8 +401,28 @@ func (r *rpcClient) CallContext(ctx context.Context, result interface{}, method
return err
}

func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) error {
ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.largePayloadRpcTimeout)
func (r *rpcClient) BatchCallContext(rootCtx context.Context, b []rpc.BatchElem) error {
// Astar's finality tags provide weaker finality guarantees than we require.
// Fetch latest finalized block using Astar's custom requests and populate it after batch request completes
var astarRawLatestFinalizedBlock json.RawMessage
var requestedFinalizedBlock bool
if r.chainType == chaintype.ChainAstar {
for _, el := range b {
if !isRequestingFinalizedBlock(el) {
continue
}

requestedFinalizedBlock = true
err := r.astarLatestFinalizedBlock(rootCtx, &astarRawLatestFinalizedBlock)
if err != nil {
return fmt.Errorf("failed to get astar latest finalized block: %w", err)
}

break
}
}

ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(rootCtx, r.largePayloadRpcTimeout)
defer cancel()
lggr := r.newRqLggr().With("nBatchElems", len(b), "batchElems", b)

Expand All @@ -412,8 +437,46 @@ func (r *rpcClient) BatchCallContext(ctx context.Context, b []rpc.BatchElem) err
duration := time.Since(start)

r.logResult(lggr, err, duration, r.getRPCDomain(), "BatchCallContext")
if err != nil {
return err
}

return err
if r.chainType == chaintype.ChainAstar && requestedFinalizedBlock {
// populate requested finalized block with correct value
for _, el := range b {
if !isRequestingFinalizedBlock(el) {
continue
}

el.Error = nil
err = json.Unmarshal(astarRawLatestFinalizedBlock, el.Result)
if err != nil {
el.Error = fmt.Errorf("failed to unmarshal astar finalized block into provided struct: %w", err)
}
}
}

return nil
}

func isRequestingFinalizedBlock(el rpc.BatchElem) bool {
isGetBlock := el.Method == "eth_getBlockByNumber" && len(el.Args) > 0
if !isGetBlock {
return false
}

if el.Args[0] == rpc.FinalizedBlockNumber {
return true
}

switch arg := el.Args[0].(type) {
case string:
return arg == rpc.FinalizedBlockNumber.String()
case fmt.Stringer:
return arg.String() == rpc.FinalizedBlockNumber.String()
default:
return false
}
}

// TODO: Full transition from SubscribeNewHead to SubscribeToHeads is done in BCI-2875
Expand Down Expand Up @@ -601,17 +664,84 @@ func (r *rpcClient) HeaderByHash(ctx context.Context, hash common.Hash) (header
return
}

func (r *rpcClient) LatestFinalizedBlock(ctx context.Context) (*evmtypes.Head, error) {
return r.blockByNumber(ctx, rpc.FinalizedBlockNumber.String())
func (r *rpcClient) LatestFinalizedBlock(ctx context.Context) (head *evmtypes.Head, err error) {
// capture chStopInFlight to ensure we are not updating chainInfo with observations related to previous life cycle
ctx, cancel, chStopInFlight, _, _ := r.acquireQueryCtx(ctx, r.rpcTimeout)
defer cancel()
if r.chainType == chaintype.ChainAstar {
// astar's finality tags provide weaker guarantee. Use their custom request to request latest finalized block
err = r.astarLatestFinalizedBlock(ctx, &head)
} else {
err = r.ethGetBlockByNumber(ctx, rpc.FinalizedBlockNumber.String(), &head)
}

if err != nil {
return
}

if head == nil {
err = r.wrapRPCClientError(ethereum.NotFound)
return
}

head.EVMChainID = ubig.New(r.chainID)

r.onNewFinalizedHead(ctx, chStopInFlight, head)
return
}

func (r *rpcClient) astarLatestFinalizedBlock(ctx context.Context, result interface{}) (err error) {
var hashResult string
err = r.CallContext(ctx, &hashResult, "chain_getFinalizedHead")
if err != nil {
return fmt.Errorf("failed to get astar latest finalized hash: %w", err)
}

var astarHead struct {
Number *hexutil.Big `json:"number"`
}
err = r.CallContext(ctx, &astarHead, "chain_getHeader", hashResult, false)
if err != nil {
return fmt.Errorf("failed to get astar head by hash: %w", err)
}

if astarHead.Number == nil {
return r.wrapRPCClientError(fmt.Errorf("expected non empty head number of finalized block"))
}

err = r.ethGetBlockByNumber(ctx, astarHead.Number.String(), result)
if err != nil {
return fmt.Errorf("failed to get astar finalized block: %w", err)
}

return nil
}

func (r *rpcClient) BlockByNumber(ctx context.Context, number *big.Int) (head *evmtypes.Head, err error) {
hex := ToBlockNumArg(number)
return r.blockByNumber(ctx, hex)
ctx, cancel, chStopInFlight, _, _ := r.acquireQueryCtx(ctx, r.rpcTimeout)
defer cancel()
hexNumber := ToBlockNumArg(number)
err = r.ethGetBlockByNumber(ctx, hexNumber, &head)
if err != nil {
return
}

if head == nil {
err = r.wrapRPCClientError(ethereum.NotFound)
return
}

head.EVMChainID = ubig.New(r.chainID)

if hexNumber == rpc.LatestBlockNumber.String() {
r.onNewHead(ctx, chStopInFlight, head)
}

return
}

func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evmtypes.Head, err error) {
ctx, cancel, chStopInFlight, ws, http := r.acquireQueryCtx(ctx, r.rpcTimeout)
func (r *rpcClient) ethGetBlockByNumber(ctx context.Context, number string, result interface{}) (err error) {
ctx, cancel, ws, http := r.makeLiveQueryCtxAndSafeGetClients(ctx, r.rpcTimeout)
defer cancel()
const method = "eth_getBlockByNumber"
args := []interface{}{number, false}
Expand All @@ -623,30 +753,14 @@ func (r *rpcClient) blockByNumber(ctx context.Context, number string) (head *evm
lggr.Debug("RPC call: evmclient.Client#CallContext")
start := time.Now()
if http != nil {
err = r.wrapHTTP(http.rpc.CallContext(ctx, &head, method, args...))
err = r.wrapHTTP(http.rpc.CallContext(ctx, result, method, args...))
} else {
err = r.wrapWS(ws.rpc.CallContext(ctx, &head, method, args...))
err = r.wrapWS(ws.rpc.CallContext(ctx, result, method, args...))
}
duration := time.Since(start)

r.logResult(lggr, err, duration, r.getRPCDomain(), "CallContext")
if err != nil {
return nil, err
}
if head == nil {
err = r.wrapRPCClientError(ethereum.NotFound)
return
}
head.EVMChainID = ubig.New(r.chainID)

switch number {
case rpc.FinalizedBlockNumber.String():
r.onNewFinalizedHead(ctx, chStopInFlight, head)
case rpc.LatestBlockNumber.String():
r.onNewHead(ctx, chStopInFlight, head)
}

return
return err
}

func (r *rpcClient) BlockByHash(ctx context.Context, hash common.Hash) (head *evmtypes.Head, err error) {
Expand Down
Loading

0 comments on commit bd648bd

Please sign in to comment.