Skip to content

Commit

Permalink
feat(share/eds): add EdsMaker
Browse files Browse the repository at this point in the history
  • Loading branch information
vgonkivs committed Aug 19, 2023
1 parent 04e2928 commit c418b1b
Show file tree
Hide file tree
Showing 13 changed files with 201 additions and 144 deletions.
64 changes: 0 additions & 64 deletions core/eds.go

This file was deleted.

53 changes: 0 additions & 53 deletions core/eds_test.go

This file was deleted.

40 changes: 27 additions & 13 deletions core/exchange.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ import (

"golang.org/x/sync/errgroup"

"github.com/celestiaorg/celestia-app/pkg/appconsts"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"

Expand All @@ -19,20 +20,23 @@ import (
const concurrencyLimit = 4

type Exchange struct {
fetcher *BlockFetcher
store *eds.Store
construct header.ConstructFn
fetcher *BlockFetcher
store *eds.Store
headerConstruct header.ConstructFn
edsConstruct eds.ConstructFn
}

func NewExchange(
fetcher *BlockFetcher,
store *eds.Store,
construct header.ConstructFn,
headerConstruct header.ConstructFn,
edsConstruct eds.ConstructFn,
) *Exchange {
return &Exchange{
fetcher: fetcher,
store: store,
construct: construct,
fetcher: fetcher,
store: store,
headerConstruct: headerConstruct,
edsConstruct: edsConstruct,
}
}

Expand Down Expand Up @@ -110,12 +114,17 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
adder := ipld.NewProofsAdder(int(block.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(block.Data, block.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
dataSq, err := ce.edsConstruct(
block.Data.Txs.ToSliceOfBytes(),
block.Header.Version.App,
appconsts.SquareSizeUpperBound(block.Header.Version.App),
nmt.NodeVisitor(adder.VisitFn()),
)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", &block.Height, err)
}
// construct extended header
eh, err := ce.construct(ctx, &block.Header, comm, vals, eds)
eh, err := ce.headerConstruct(ctx, &block.Header, comm, vals, dataSq)
if err != nil {
panic(fmt.Errorf("constructing extended header for height %d: %w", &block.Height, err))
}
Expand All @@ -126,7 +135,7 @@ func (ce *Exchange) Get(ctx context.Context, hash libhead.Hash) (*header.Extende
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
err = eds.StoreEDS(ctx, eh.DAH.Hash(), dataSq, ce.store)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for height %d: %w", &block.Height, err)
}
Expand All @@ -152,18 +161,23 @@ func (ce *Exchange) getExtendedHeaderByHeight(ctx context.Context, height *int64
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
dataSq, err := ce.edsConstruct(
b.Data.Txs.ToSliceOfBytes(),
b.Header.Version.App,
appconsts.SquareSizeUpperBound(b.Header.Version.App),
nmt.NodeVisitor(adder.VisitFn()),
)
if err != nil {
return nil, fmt.Errorf("extending block data for height %d: %w", b.Header.Height, err)
}
// create extended header
eh, err := ce.construct(ctx, &b.Header, &b.Commit, &b.ValidatorSet, eds)
eh, err := ce.headerConstruct(ctx, &b.Header, &b.Commit, &b.ValidatorSet, dataSq)
if err != nil {
panic(fmt.Errorf("constructing extended header for height %d: %w", b.Header.Height, err))
}

ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, eh.DAH.Hash(), eds, ce.store)
err = eds.StoreEDS(ctx, eh.DAH.Hash(), dataSq, ce.store)
if err != nil {
return nil, fmt.Errorf("storing EDS to eds.Store for block height %d: %w", b.Header.Height, err)
}
Expand Down
2 changes: 1 addition & 1 deletion core/exchange_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func TestCoreExchange_RequestHeaders(t *testing.T) {

store := createStore(t)

ce := NewExchange(fetcher, store, header.MakeExtendedHeader)
ce := NewExchange(fetcher, store, header.MakeExtendedHeader, eds.MakeExtendedDataSquare)
headers, err := ce.GetRangeByHeight(context.Background(), 1, 10)
require.NoError(t, err)

Expand Down
9 changes: 8 additions & 1 deletion core/header_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,11 @@ import (
"github.com/stretchr/testify/require"
"github.com/tendermint/tendermint/libs/rand"

"github.com/celestiaorg/celestia-app/pkg/appconsts"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/header/headertest"
"github.com/celestiaorg/celestia-node/share/eds"
)

func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {
Expand All @@ -30,7 +33,11 @@ func TestMakeExtendedHeaderForEmptyBlock(t *testing.T) {
comm, val, err := fetcher.GetBlockInfo(ctx, &height)
require.NoError(t, err)

eds, err := extendBlock(b.Data, b.Header.Version.App)
eds, err := eds.MakeExtendedDataSquare(
b.Data.Txs.ToSliceOfBytes(),
b.Header.Version.App,
appconsts.SquareSizeUpperBound(b.Header.Version.App),
)
require.NoError(t, err)

headerExt, err := header.MakeExtendedHeader(ctx, &b.Header, comm, val, eds)
Expand Down
38 changes: 29 additions & 9 deletions core/listener.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,8 +11,11 @@ import (
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/attribute"

"github.com/celestiaorg/celestia-app/app"
"github.com/celestiaorg/celestia-app/pkg/appconsts"
libhead "github.com/celestiaorg/go-header"
"github.com/celestiaorg/nmt"
"github.com/celestiaorg/rsmt2d"

"github.com/celestiaorg/celestia-node/header"
"github.com/celestiaorg/celestia-node/share/eds"
Expand All @@ -32,8 +35,10 @@ var tracer = otel.Tracer("core/listener")
type Listener struct {
fetcher *BlockFetcher

construct header.ConstructFn
store *eds.Store
headerConstruct header.ConstructFn
edsConstruct eds.ConstructFn

store *eds.Store

headerBroadcaster libhead.Broadcaster[*header.ExtendedHeader]
hashBroadcaster shrexsub.BroadcastFn
Expand All @@ -47,15 +52,17 @@ func NewListener(
bcast libhead.Broadcaster[*header.ExtendedHeader],
fetcher *BlockFetcher,
hashBroadcaster shrexsub.BroadcastFn,
construct header.ConstructFn,
headerConstruct header.ConstructFn,
edsConstruct eds.ConstructFn,
store *eds.Store,
blocktime time.Duration,
) *Listener {
return &Listener{
fetcher: fetcher,
headerBroadcaster: bcast,
hashBroadcaster: hashBroadcaster,
construct: construct,
headerConstruct: headerConstruct,
edsConstruct: edsConstruct,
store: store,
listenerTimeout: 2 * blocktime,
}
Expand Down Expand Up @@ -155,19 +162,32 @@ func (cl *Listener) handleNewSignedBlock(ctx context.Context, b types.EventDataS
adder := ipld.NewProofsAdder(int(b.Data.SquareSize))
defer adder.Purge()

eds, err := extendBlock(b.Data, b.Header.Version.App, nmt.NodeVisitor(adder.VisitFn()))
if err != nil {
return fmt.Errorf("extending block data: %w", err)
var (
dataSq *rsmt2d.ExtendedDataSquare
err error
)

if !app.IsEmptyBlock(b.Data, b.Header.Version.App) {
dataSq, err = cl.edsConstruct(
b.Data.Txs.ToSliceOfBytes(),
b.Header.Version.App,
appconsts.SquareSizeUpperBound(b.Header.Version.App),
nmt.NodeVisitor(adder.VisitFn()),
)
if err != nil {
return fmt.Errorf("extending block data: %w", err)
}
}

// generate extended header
eh, err := cl.construct(ctx, &b.Header, &b.Commit, &b.ValidatorSet, eds)
eh, err := cl.headerConstruct(ctx, &b.Header, &b.Commit, &b.ValidatorSet, dataSq)
if err != nil {
panic(fmt.Errorf("making extended header: %w", err))
}

// attempt to store block data if not empty
ctx = ipld.CtxWithProofsAdder(ctx, adder)
err = storeEDS(ctx, b.Header.DataHash.Bytes(), eds, cl.store)
err = eds.StoreEDS(ctx, b.Header.DataHash.Bytes(), dataSq, cl.store)
if err != nil {
return fmt.Errorf("storing EDS: %w", err)
}
Expand Down
10 changes: 9 additions & 1 deletion core/listener_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -168,7 +168,15 @@ func createListener(
require.NoError(t, p2pSub.Stop(ctx))
})

return NewListener(p2pSub, fetcher, edsSub.Broadcast, header.MakeExtendedHeader, store, nodep2p.BlockTime)
return NewListener(
p2pSub,
fetcher,
edsSub.Broadcast,
header.MakeExtendedHeader,
eds.MakeExtendedDataSquare,
store,
nodep2p.BlockTime,
)
}

func createEdsPubSub(ctx context.Context, t *testing.T) *shrexsub.PubSub {
Expand Down
12 changes: 10 additions & 2 deletions nodebuilder/core/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,10 +41,18 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
bcast libhead.Broadcaster[*header.ExtendedHeader],
fetcher *core.BlockFetcher,
pubsub *shrexsub.PubSub,
construct header.ConstructFn,
hConstruct header.ConstructFn,
edsConstruct eds.ConstructFn,
store *eds.Store,
) *core.Listener {
return core.NewListener(bcast, fetcher, pubsub.Broadcast, construct, store, p2p.BlockTime)
return core.NewListener(bcast,
fetcher,
pubsub.Broadcast,
hConstruct,
edsConstruct,
store,
p2p.BlockTime,
)
},
fx.OnStart(func(ctx context.Context, listener *core.Listener) error {
return listener.Start(ctx)
Expand Down
5 changes: 5 additions & 0 deletions nodebuilder/core/opts.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,3 +17,8 @@ func WithClient(client core.Client) fx.Option {
func WithHeaderConstructFn(construct header.ConstructFn) fx.Option {
return fx.Replace(construct)
}

// WithEdsConstructFn sets custom func that creates extended data square
func WithEdsConstructFn(construct header.ConstructFn) fx.Option {
return fx.Replace(construct)
}
3 changes: 3 additions & 0 deletions nodebuilder/share/module.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,9 @@ func ConstructModule(tp node.Type, cfg *Config, options ...fx.Option) fx.Option
"share",
baseComponents,
bridgeAndFullComponents,
fx.Provide(func() eds.ConstructFn {
return eds.MakeExtendedDataSquare
}),
fxutil.ProvideAs(func(getter *getters.StoreGetter) share.Getter {
return getter
}),
Expand Down
1 change: 1 addition & 0 deletions nodebuilder/tests/swamp/swamp.go
Original file line number Diff line number Diff line change
Expand Up @@ -177,6 +177,7 @@ func (s *Swamp) setupGenesis() {
core.NewBlockFetcher(s.ClientContext.Client),
store,
header.MakeExtendedHeader,
eds.MakeExtendedDataSquare,
)

h, err := ex.GetByHeight(ctx, 1)
Expand Down
Loading

0 comments on commit c418b1b

Please sign in to comment.