Skip to content

Commit

Permalink
Remove manifest versions (#472)
Browse files Browse the repository at this point in the history
* Remove manifest versions

This also allows us to re-config without re-bootstrap (by restarting the
local F3 instance). This is only safe for non-consensus parameters and
will be horribly unsafe until we fix #392.

Also:

- Removes reliance on hashing json, relies on the network name and
  manual equality checks.
- Removes versions. We now expect the version to be explicitly specified
  in the network name.
- Starts message sequence numbers at the current time so we don't need
  to save them.
- Remove the EC from the dynamic manifest provider as it's unused.
- Tests.

fixes #468

* additional equality test

* send updates whenever the manifest changes
  • Loading branch information
Stebalien authored Jul 13, 2024
1 parent 6806dd3 commit 4aba72e
Show file tree
Hide file tree
Showing 10 changed files with 189 additions and 105 deletions.
27 changes: 7 additions & 20 deletions cmd/f3/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -173,20 +173,7 @@ var manifestServeCmd = cli.Command{
}

manifestPath := c.String("manifest")
loadManifestAndVersion := func() (*manifest.Manifest, manifest.Version, error) {

m, err := loadManifest(manifestPath)
if err != nil {
return nil, "", fmt.Errorf("loading manifest: %w", err)
}
version, err := m.Version()
if err != nil {
return nil, "", fmt.Errorf("versioning manifest: %w", err)
}
return m, version, nil
}

initManifest, manifestVersion, err := loadManifestAndVersion()
currentManifest, err := loadManifest(manifestPath)
if err != nil {
return fmt.Errorf("loading initial manifest: %w", err)
}
Expand All @@ -196,11 +183,11 @@ var manifestServeCmd = cli.Command{
return fmt.Errorf("initialzing pubsub: %w", err)
}

sender, err := manifest.NewManifestSender(host, pubSub, initManifest, c.Duration("publishInterval"))
sender, err := manifest.NewManifestSender(host, pubSub, currentManifest, c.Duration("publishInterval"))
if err != nil {
return fmt.Errorf("initialzing manifest sender: %w", err)
}
_, _ = fmt.Fprintf(c.App.Writer, "Started manifest sender with version: %s\n", manifestVersion)
_, _ = fmt.Fprintf(c.App.Writer, "Started manifest sender with network name: %s\n", currentManifest.NetworkName)

checkInterval := c.Duration("checkInterval")

Expand All @@ -215,12 +202,12 @@ var manifestServeCmd = cli.Command{
case <-ctx.Done():
return nil
case <-checkTicker.C:
if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
if nextManifest, err := loadManifest(manifestPath); err != nil {
_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err)
} else if manifestVersion != nextManifestVersion {
_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
} else if !nextManifest.Equal(currentManifest) {
_, _ = fmt.Fprintf(c.App.Writer, "Loaded changed manifest with network name: %q\n", nextManifest.NetworkName)
sender.UpdateManifest(nextManifest)
manifestVersion = nextManifestVersion
currentManifest = nextManifest
}
}
}
Expand Down
2 changes: 1 addition & 1 deletion cmd/f3/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,7 +104,7 @@ var runCmd = cli.Command{
if err != nil {
return fmt.Errorf("parsing manifest server ID: %w", err)
}
mprovider = manifest.NewDynamicManifestProvider(m, ps, nil, manifestServer)
mprovider = manifest.NewDynamicManifestProvider(m, ps, manifestServer)
} else {
mprovider = manifest.NewStaticManifestProvider(m)
}
Expand Down
17 changes: 17 additions & 0 deletions gpbft/powertable.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package gpbft

import (
"bytes"
"errors"
"fmt"
"maps"
Expand Down Expand Up @@ -31,6 +32,22 @@ type PowerTable struct {
ScaledTotal uint16
}

func (p *PowerEntry) Equal(o *PowerEntry) bool {
return p.ID == o.ID && p.Power.Cmp(o.Power) == 0 && bytes.Equal(p.PubKey, o.PubKey)
}

func (p PowerEntries) Equal(o PowerEntries) bool {
if len(p) != len(o) {
return false
}
for i := range p {
if !p[i].Equal(&o[i]) {
return false
}
}
return true
}

// Len returns the number of entries in this PowerTable.
func (p PowerEntries) Len() int {
return len(p)
Expand Down
1 change: 1 addition & 0 deletions gpbft/powertable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -191,6 +191,7 @@ func TestPowerTable(t *testing.T) {
gotCopy := subject.Copy()
require.Equal(t, subject, gotCopy)
require.NotSame(t, subject, gotCopy)
require.True(t, subject.Entries.Equal(gotCopy.Entries))
})
}
})
Expand Down
45 changes: 15 additions & 30 deletions manifest/dynamic_manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,6 @@ import (
"fmt"
"io"

"github.com/filecoin-project/go-f3/ec"
"github.com/filecoin-project/go-f3/gpbft"

logging "github.com/ipfs/go-log/v2"
pubsub "github.com/libp2p/go-libp2p-pubsub"
"github.com/libp2p/go-libp2p/core/peer"
Expand All @@ -28,7 +25,6 @@ const ManifestPubSubTopicName = "/f3/manifests/0.0.1"
// the manifest to be changed at runtime.
type DynamicManifestProvider struct {
pubsub *pubsub.PubSub
ec ec.Backend
manifestServerID peer.ID

runningCtx context.Context
Expand All @@ -41,29 +37,12 @@ type DynamicManifestProvider struct {

// ManifestUpdateMessage updates the GPBFT manifest.
type ManifestUpdateMessage struct {
// A monotonically increasing sequence number for ordering manifest
// updates received over the network.
// An increasing sequence number for ordering manifest updates received over the network.
MessageSequence uint64
// The manifest version changes each time we distribute a new manifest. Pausing/resuming
// does not update the version number.
ManifestVersion uint64
// The manifest to apply or nil to pause the network.
Manifest *Manifest
}

func (mu ManifestUpdateMessage) toManifest() *Manifest {
if mu.Manifest == nil {
return nil
}

// When a manifest configuration changes, a new network name is set that depends on the
// manifest version of the previous version to avoid overlapping previous configurations.
cpy := *mu.Manifest
newName := fmt.Sprintf("%s/%d", string(cpy.NetworkName), mu.ManifestVersion)
cpy.NetworkName = gpbft.NetworkName(newName)
return &cpy
}

func (m ManifestUpdateMessage) Marshal() ([]byte, error) {
b, err := json.Marshal(m)
if err != nil {
Expand All @@ -80,13 +59,12 @@ func (m *ManifestUpdateMessage) Unmarshal(r io.Reader) error {
return nil
}

func NewDynamicManifestProvider(initialManifest *Manifest, pubsub *pubsub.PubSub, ec ec.Backend, manifestServerID peer.ID) ManifestProvider {
func NewDynamicManifestProvider(initialManifest *Manifest, pubsub *pubsub.PubSub, manifestServerID peer.ID) *DynamicManifestProvider {
ctx, cancel := context.WithCancel(context.Background())
errgrp, ctx := errgroup.WithContext(ctx)

return &DynamicManifestProvider{
pubsub: pubsub,
ec: ec,
manifestServerID: manifestServerID,
runningCtx: ctx,
errgrp: errgrp,
Expand Down Expand Up @@ -123,11 +101,8 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) (_err error) {
// XXX: load the initial manifest from disk!
// And save it!

m.manifestChanges <- ManifestUpdateMessage{
MessageSequence: 0,
ManifestVersion: 0,
Manifest: m.initialManifest,
}.toManifest()
currentManifest := m.initialManifest
m.manifestChanges <- m.initialManifest

m.errgrp.Go(func() error {
defer func() {
Expand Down Expand Up @@ -167,8 +142,18 @@ func (m *DynamicManifestProvider) Start(startCtx context.Context) (_err error) {
}
log.Infof("received manifest update %d", update.MessageSequence)
msgSeqNumber = update.MessageSequence

oldManifest := currentManifest
currentManifest = update.Manifest

// If we're receiving the same manifest multiple times (manifest publisher
// could have restarted), don't re-apply it.
if oldManifest.Equal(currentManifest) {
continue
}

select {
case m.manifestChanges <- update.toManifest():
case m.manifestChanges <- update.Manifest:
case <-m.runningCtx.Done():
return nil
}
Expand Down
90 changes: 90 additions & 0 deletions manifest/dynamic_manifest_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
package manifest

import (
"context"
"testing"
"time"

pubsub "github.com/libp2p/go-libp2p-pubsub"
mocknetwork "github.com/libp2p/go-libp2p/p2p/net/mock"
"github.com/stretchr/testify/require"
)

func TestDynamicManifest(t *testing.T) {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

t.Cleanup(cancel)

mocknet := mocknetwork.New()
initialManifest := LocalDevnetManifest()

var (
sender *ManifestSender
provider *DynamicManifestProvider
)

{
host, err := mocknet.GenPeer()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, host.Close()) })

pubSub, err := pubsub.NewGossipSub(ctx, host, pubsub.WithPeerExchange(true))
require.NoError(t, err)
sender, err = NewManifestSender(host, pubSub, initialManifest, 10*time.Millisecond)
require.NoError(t, err)
}

{
host, err := mocknet.GenPeer()
require.NoError(t, err)
t.Cleanup(func() { require.NoError(t, host.Close()) })

pubSub, err := pubsub.NewGossipSub(ctx, host, pubsub.WithPeerExchange(true))
require.NoError(t, err)

provider = NewDynamicManifestProvider(initialManifest, pubSub, sender.SenderID())
}

mocknet.LinkAll()
mocknet.ConnectAllButSelf()

waitSender := make(chan error, 1)
senderCtx, cancelSender := context.WithCancel(ctx)
go func() { waitSender <- sender.Run(senderCtx) }()

require.NoError(t, provider.Start(ctx))
t.Cleanup(func() { require.NoError(t, provider.Stop(context.Background())) })

// Should receive the initial manifest.
require.True(t, initialManifest.Equal(<-provider.ManifestUpdates()))

// Pausing should send nil.
sender.Pause()
require.Nil(t, <-provider.ManifestUpdates())

// Should get the initial manifest again.
sender.Resume()
require.True(t, initialManifest.Equal(<-provider.ManifestUpdates()))

cancelSender()
require.Nil(t, <-waitSender)

// Re-start the sender. The client shouldn't see an update.
senderCtx, cancelSender = context.WithCancel(ctx)
go func() { waitSender <- sender.Run(senderCtx) }()

select {
case <-provider.ManifestUpdates():
t.Fatal("did not expect a manifest update when restarting manifest sender")
case <-time.After(1 * time.Second):
}
newManifest := *initialManifest
newManifest.NetworkName = "updated-name"
sender.UpdateManifest(&newManifest)

require.True(t, newManifest.Equal(<-provider.ManifestUpdates()))

cancelSender()
require.NoError(t, <-waitSender)
}
Loading

0 comments on commit 4aba72e

Please sign in to comment.