Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat!: adds support for gRPC streaming and removes dependency on RPC #3915

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
58 commits
Select commit Hold shift + click to select a range
e831ee3
feat!: use blocks streaming API instead of RPC in fetcher
rach-id Nov 2, 2024
ea9a2d7
test: node builder test using the new grpc endpoints
rach-id Nov 4, 2024
5cd2e94
chore: import
rach-id Nov 4, 2024
c30fdbc
chore: add stop function for fetcher
rach-id Nov 4, 2024
9a194b2
feat!: remove rpc port
rach-id Nov 4, 2024
8360531
chore: remove core rpc tests
rach-id Nov 4, 2024
2b71210
chore: remove core rpc tests
rach-id Nov 4, 2024
13bddc3
fix: test fill blocks failing after node stop
rach-id Nov 4, 2024
a568598
chore: use commits for deps
rach-id Nov 4, 2024
291952b
chore: fmt
rach-id Nov 4, 2024
3087a3a
chore: fix tests
rach-id Nov 5, 2024
cc81362
chore: fmt
rach-id Nov 5, 2024
7d89608
chore: fmt
rach-id Nov 5, 2024
51096af
chore: memory optimisation
rach-id Nov 6, 2024
fe54137
chore: rename to first part
rach-id Nov 8, 2024
62c1dc7
feat!: use a local signed block
rach-id Nov 8, 2024
184391b
chore: remove unnecessary select
rach-id Nov 8, 2024
e11d333
chore: fmt
rach-id Nov 8, 2024
b37c4b2
Merge branch 'main' into support-grpc-endpoints
rach-id Nov 11, 2024
b14cb16
chore: go mod tidy
rach-id Nov 11, 2024
e51f237
chore: remove unnecessary =nil
rach-id Nov 12, 2024
221253f
chore: fix test
rach-id Nov 12, 2024
fe20093
chore: rename grpc port to port
rach-id Nov 12, 2024
4a90605
fix(header)!: tendermint compatible json marshall (#3928)
zvolin Nov 13, 2024
f371c33
chore: use net.JoinHostPort as suggested by @cristaloleg
rach-id Nov 20, 2024
92c4f94
chore: context timeout as suggested by @walldiss
rach-id Nov 20, 2024
af7f2be
chore: remove unnecessary comment
rach-id Nov 21, 2024
7a62570
feat!: remove pointers in requests
rach-id Nov 21, 2024
1b41b56
Merge branch 'feature/api-breaks' into support-grpc-endpoints
rach-id Nov 21, 2024
a863f6b
chore: bump version
rach-id Nov 21, 2024
b5a545f
chore: fix remaining implementation to support removing pointers
rach-id Nov 21, 2024
a10ac73
chore: lint
rach-id Nov 21, 2024
d4243fa
chore: check if fetcher is listening for new blocks
rach-id Nov 21, 2024
511068a
fix: correctly set the is listening for blocks to false
rach-id Nov 22, 2024
a033feb
feat: support stopping the gRPC connection + subscription retry
rach-id Nov 22, 2024
c1970f0
Merge branch 'main' into support-grpc-endpoints
rach-id Nov 25, 2024
b640490
chore: remove unnecessary import
rach-id Nov 25, 2024
f59ca34
Merge branch 'main' into support-grpc-endpoints
rach-id Nov 25, 2024
4becb70
fix: testing bridge node initialisation
rach-id Nov 26, 2024
79089f9
fix: creating the client
rach-id Nov 26, 2024
3ca0dd1
Merge remote-tracking branch 'origin/support-grpc-endpoints' into sup…
rach-id Nov 26, 2024
8eecac0
fix: setting the right gRPC address when starting swamp
rach-id Nov 26, 2024
3329474
fix: initialising the bridge node correctly in swamp
rach-id Nov 26, 2024
258b02e
Merge branch 'main' into support-grpc-endpoints
rach-id Nov 26, 2024
5045918
chore: use atomic bool
rach-id Nov 27, 2024
58e7944
Merge remote-tracking branch 'origin/support-grpc-endpoints' into sup…
rach-id Nov 27, 2024
5634a4d
chore: use client not running error
rach-id Nov 27, 2024
c5a9abd
chore: remove unnecessary checks
rach-id Nov 27, 2024
45cb70f
chore: remove block meta and use SignedBlock as return
rach-id Nov 27, 2024
cd8092a
chore: update error message
rach-id Nov 27, 2024
130ee10
chore: revert unnecessary change
rach-id Nov 27, 2024
33677e3
Revert "fix(header)!: tendermint compatible json marshall (#3928)"
rach-id Nov 27, 2024
d7ff1d3
chore: use reference to data
rach-id Nov 27, 2024
3afda30
Merge branch 'main' into support-grpc-endpoints
rach-id Nov 27, 2024
77bb292
chore: merge conflicts
rach-id Nov 27, 2024
600b540
chore: remove unnecessary ctx cancellation check as suggested by @ren…
rach-id Nov 28, 2024
3bf91e2
Merge branch 'main' into support-grpc-endpoints
rach-id Nov 28, 2024
b68c3f9
chore: go mod tidy
rach-id Nov 28, 2024
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
71 changes: 56 additions & 15 deletions core/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,24 +3,65 @@ package core
import (
"fmt"

retryhttp "github.com/hashicorp/go-retryablehttp"
"github.com/tendermint/tendermint/rpc/client"
"github.com/tendermint/tendermint/rpc/client/http"
coregrpc "github.com/tendermint/tendermint/rpc/grpc"
"google.golang.org/grpc"
"google.golang.org/grpc/credentials/insecure"
)

// Client is an alias to Core Client.
type Client = client.Client
// Client is a core gRPC client.
type Client struct {
coregrpc.BlockAPIClient
host, port string
conn *grpc.ClientConn
}

// NewRemote creates a new Client that communicates with a remote Core endpoint over HTTP.
func NewRemote(ip, port string) (Client, error) {
httpClient := retryhttp.NewClient()
httpClient.RetryMax = 2
// suppress logging
httpClient.Logger = nil
// NewClient creates a new Client that communicates with a remote Core endpoint over gRPC.
// The connection is not started when creating the client.
// Use the Start method to start the connection.
func NewClient(host, port string) *Client {
return &Client{
host: host,
port: port,
}
}

return http.NewWithClient(
fmt.Sprintf("tcp://%s:%s", ip, port),
"/websocket",
httpClient.StandardClient(),
// Start created the Client's gRPC connection with optional dial options.
// If the connection is already started, it does nothing.
func (c *Client) Start(opts ...grpc.DialOption) error {
if c.IsRunning() {
return nil
}
if len(opts) == 0 {
opts = append(opts, grpc.WithTransportCredentials(insecure.NewCredentials()))
}
conn, err := grpc.NewClient(
fmt.Sprintf("%s:%s", c.host, c.port),
opts...,
)
if err != nil {
return err
}
c.conn = conn

c.BlockAPIClient = coregrpc.NewBlockAPIClient(conn)
return nil
}

// IsRunning checks if the client's connection is established and ready for use.
// It returns true if the connection is active, false otherwise.
func (c *Client) IsRunning() bool {
return c.conn != nil && c.BlockAPIClient != nil
}

// Stop terminates the Client's gRPC connection and releases all related resources.
// If the connection is already stopped, it does nothing.
func (c *Client) Stop() error {
if !c.IsRunning() {
return nil
}
defer func() {
c.conn = nil
c.BlockAPIClient = nil
}()
return c.conn.Close()
}
4 changes: 2 additions & 2 deletions core/eds.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,8 +24,8 @@ import (
// extendBlock extends the given block data, returning the resulting
// ExtendedDataSquare (EDS). If there are no transactions in the block,
// nil is returned in place of the eds.
func extendBlock(data types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) {
if app.IsEmptyBlockRef(&data, appVersion) {
func extendBlock(data *types.Data, appVersion uint64, options ...nmt.Option) (*rsmt2d.ExtendedDataSquare, error) {
if app.IsEmptyBlockRef(data, appVersion) {
return share.EmptyEDS(), nil
}

Expand Down
4 changes: 2 additions & 2 deletions core/eds_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@ func TestTrulyEmptySquare(t *testing.T) {
SquareSize: 1,
}

eds, err := extendBlock(data, appconsts.LatestVersion)
eds, err := extendBlock(&data, appconsts.LatestVersion)
require.NoError(t, err)
require.True(t, eds.Equals(share.EmptyEDS()))
}
Expand All @@ -38,7 +38,7 @@ func TestEmptySquareWithZeroTxs(t *testing.T) {
Txs: []types.Tx{},
}

eds, err := extendBlock(data, appconsts.LatestVersion)
eds, err := extendBlock(&data, appconsts.LatestVersion)
require.NoError(t, err)
require.True(t, eds.Equals(share.EmptyEDS()))

Expand Down
25 changes: 8 additions & 17 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,6 @@ import (
"fmt"
"time"

"github.com/tendermint/tendermint/types"
"golang.org/x/sync/errgroup"

libhead "github.com/celestiaorg/go-header"
Expand Down Expand Up @@ -60,8 +59,7 @@ func NewExchange(

func (ce *Exchange) GetByHeight(ctx context.Context, height uint64) (*header.ExtendedHeader, error) {
log.Debugw("requesting header", "height", height)
intHeight := int64(height)
return ce.getExtendedHeaderByHeight(ctx, &intHeight)
return ce.getExtendedHeaderByHeight(ctx, int64(height))
}

func (ce *Exchange) GetRangeByHeight(
Expand Down Expand Up @@ -127,12 +125,12 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
return nil, fmt.Errorf("fetching block by hash %s: %w", hash.String(), err)
}

comm, vals, err := ce.fetcher.GetBlockInfo(ctx, &block.Height)
comm, vals, err := ce.fetcher.GetBlockInfo(ctx, block.Height)
if err != nil {
return nil, fmt.Errorf("fetching block info for height %d: %w", &block.Height, err)
}

eds, err := extendBlock(block.Data, block.Header.Version.App)
eds, err := extendBlock(&block.Data, block.Header.Version.App)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
}
Expand Down Expand Up @@ -160,29 +158,22 @@ func (ce *Exchange) Head(
_ ...libhead.HeadOption[*header.ExtendedHeader],
) (*header.ExtendedHeader, error) {
log.Debug("requesting head")
return ce.getExtendedHeaderByHeight(ctx, nil)
return ce.getExtendedHeaderByHeight(ctx, 0)
}

func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64) (*header.ExtendedHeader, error) {
func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height int64) (*header.ExtendedHeader, error) {
b, err := ce.fetcher.GetSignedBlock(ctx, height)
if err != nil {
if height == nil {
return nil, fmt.Errorf("fetching signed block for head from core: %w", err)
}
return nil, fmt.Errorf("fetching signed block at height %d from core: %w", *height, err)
return nil, fmt.Errorf("fetching signed block at height %d from core: %w", height, err)
}
log.Debugw("fetched signed block from core", "height", b.Header.Height)

eds, err := extendBlock(b.Data, b.Header.Version.App)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}

// TODO(@Wondertan): This is a hack to deref Data, allowing GC to pick it up.
// The better footgun-less solution is to change core.ResultSignedBlock fields to be pointers instead of values.
b.Data = types.Data{}

eh, err := ce.construct(&b.Header, &b.Commit, &b.ValidatorSet, eds)
// create extended header
eh, err := ce.construct(b.Header, b.Commit, b.ValidatorSet, eds)
if err != nil {
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}
Expand Down
16 changes: 12 additions & 4 deletions core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,7 @@ package core
import (
"bytes"
"context"
"net"
"testing"
"time"

Expand Down Expand Up @@ -34,7 +35,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
genBlock, err := fetcher.GetBlock(ctx, genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)
Expand All @@ -61,6 +62,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {
require.NoError(t, err)
assert.True(t, has)
}
require.NoError(t, fetcher.Stop(ctx))
}

// TestExchange_DoNotStoreHistoric tests that the CoreExchange will not
Expand All @@ -87,7 +89,7 @@ func TestExchange_DoNotStoreHistoric(t *testing.T) {

// initialize store with genesis block
genHeight := int64(1)
genBlock, err := fetcher.GetBlock(ctx, &genHeight)
genBlock, err := fetcher.GetBlock(ctx, genHeight)
require.NoError(t, err)
genHeader, err := ce.Get(ctx, genBlock.Header.Hash().Bytes())
require.NoError(t, err)
Expand Down Expand Up @@ -117,7 +119,13 @@ func createCoreFetcher(t *testing.T, cfg *testnode.Config) (*BlockFetcher, testn
// flakiness with accessing account state)
_, err := cctx.WaitForHeightWithTimeout(2, time.Second*2) // TODO @renaynay: configure?
require.NoError(t, err)
return NewBlockFetcher(cctx.Client), cctx
host, port, err := net.SplitHostPort(cctx.GRPCClient.Target())
require.NoError(t, err)
client := NewClient(host, port)
require.NoError(t, client.Start())
fetcher, err := NewBlockFetcher(client)
require.NoError(t, err)
return fetcher, cctx
}

// fillBlocks fills blocks until the context is canceled.
Expand Down Expand Up @@ -153,7 +161,7 @@ func generateNonEmptyBlocks(
sub, err := fetcher.SubscribeNewBlockEvent(ctx)
require.NoError(t, err)
defer func() {
err = fetcher.UnsubscribeNewBlockEvent(ctx)
err = fetcher.Stop(ctx)
require.NoError(t, err)
}()

Expand Down
Loading