Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve lifecycle management #416

Merged
merged 5 commits into from
Jul 7, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
64 changes: 35 additions & 29 deletions cmd/f3/manifest.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,7 @@
"github.com/libp2p/go-libp2p/core/crypto"
"github.com/libp2p/go-libp2p/core/peer"
"github.com/urfave/cli/v2"
"golang.org/x/sync/errgroup"
"golang.org/x/xerrors"
)

Expand Down Expand Up @@ -137,6 +138,8 @@
return xerrors.Errorf("initializing libp2p host: %w", err)
}

defer func() { _ = host.Close() }()

Check warning on line 141 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L141

Added line #L141 was not covered by tests

// Connect to all bootstrap addresses once. This should be sufficient to build
// the pubsub mesh, if not then we need to periodically re-connect and/or pull in
// the Lotus bootstrapping, which includes DHT connectivity.
Expand Down Expand Up @@ -171,15 +174,15 @@
}

manifestPath := c.String("manifest")
loadManifestAndVersion := func() (manifest.Manifest, manifest.Version, error) {
loadManifestAndVersion := func() (*manifest.Manifest, manifest.Version, error) {

Check warning on line 177 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L177

Added line #L177 was not covered by tests
Kubuxu marked this conversation as resolved.
Show resolved Hide resolved

m, err := loadManifest(manifestPath)
if err != nil {
return manifest.Manifest{}, "", xerrors.Errorf("loading manifest: %w", err)
return nil, "", xerrors.Errorf("loading manifest: %w", err)

Check warning on line 181 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L181

Added line #L181 was not covered by tests
}
version, err := m.Version()
if err != nil {
return manifest.Manifest{}, "", xerrors.Errorf("versioning manifest: %w", err)
return nil, "", xerrors.Errorf("versioning manifest: %w", err)

Check warning on line 185 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L185

Added line #L185 was not covered by tests
}
return m, version, nil
}
Expand All @@ -200,46 +203,49 @@
}
_, _ = fmt.Fprintf(c.App.Writer, "Started manifest sender with version: %s\n", manifestVersion)

go func() {
sender.Start(c.Context)
}()
Kubuxu marked this conversation as resolved.
Show resolved Hide resolved
defer func() {
sender.Stop()
_ = host.Close()
}()

checkTicker := time.NewTicker(c.Duration("checkInterval"))
for c.Context.Err() == nil {
select {
case <-c.Context.Done():
return c.Context.Err()
case <-checkTicker.C:
if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err)
} else if manifestVersion != nextManifestVersion {
_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
sender.UpdateManifest(nextManifest)
manifestVersion = nextManifestVersion
checkInterval := c.Duration("checkInterval")

Check warning on line 206 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L206

Added line #L206 was not covered by tests

errgrp, ctx := errgroup.WithContext(c.Context)
errgrp.Go(func() error { return sender.Run(ctx) })
errgrp.Go(func() error {
checkTicker := time.NewTicker(checkInterval)
defer checkTicker.Stop()

Check warning on line 212 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L208-L212

Added lines #L208 - L212 were not covered by tests

for ctx.Err() == nil {
select {
case <-ctx.Done():
return nil
Kubuxu marked this conversation as resolved.
Show resolved Hide resolved
case <-checkTicker.C:
if nextManifest, nextManifestVersion, err := loadManifestAndVersion(); err != nil {
_, _ = fmt.Fprintf(c.App.ErrWriter, "Failed reload manifest: %v\n", err)
} else if manifestVersion != nextManifestVersion {
_, _ = fmt.Fprintf(c.App.Writer, "Loaded manifest with version: %s\n", nextManifestVersion)
sender.UpdateManifest(nextManifest)
manifestVersion = nextManifestVersion

Check warning on line 224 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L214-L224

Added lines #L214 - L224 were not covered by tests
}
}
}
}
return nil

return nil

Check warning on line 229 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L229

Added line #L229 was not covered by tests
})

return errgrp.Wait()

Check warning on line 232 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L232

Added line #L232 was not covered by tests
},
}

func getManifest(c *cli.Context) (manifest.Manifest, error) {
func getManifest(c *cli.Context) (*manifest.Manifest, error) {

Check warning on line 236 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L236

Added line #L236 was not covered by tests
manifestPath := c.String("manifest")
return loadManifest(manifestPath)
}

func loadManifest(path string) (manifest.Manifest, error) {
func loadManifest(path string) (*manifest.Manifest, error) {

Check warning on line 241 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L241

Added line #L241 was not covered by tests
f, err := os.Open(path)
if err != nil {
return manifest.Manifest{}, xerrors.Errorf("opening %s to load manifest: %w", path, err)
return nil, xerrors.Errorf("opening %s to load manifest: %w", path, err)

Check warning on line 244 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L244

Added line #L244 was not covered by tests
}
defer f.Close()
var m manifest.Manifest

err = m.Unmarshal(f)
return m, err
return &m, err

Check warning on line 250 in cmd/f3/manifest.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/manifest.go#L250

Added line #L250 was not covered by tests
}
14 changes: 6 additions & 8 deletions cmd/f3/run.go
Original file line number Diff line number Diff line change
Expand Up @@ -121,20 +121,18 @@
return xerrors.Errorf("creating module: %w", err)
}

mprovider.SetManifestChangeCallback(f3.ManifestChangeCallback(module))
go runMessageSubscription(ctx, module, gpbft.ActorID(id), signingBackend)

return module.Run(ctx)
if err := module.Start(ctx); err != nil {
return nil

Check warning on line 127 in cmd/f3/run.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/run.go#L126-L127

Added lines #L126 - L127 were not covered by tests
}
<-ctx.Done()
return module.Stop(context.Background())

Check warning on line 130 in cmd/f3/run.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/run.go#L129-L130

Added lines #L129 - L130 were not covered by tests
},
Stebalien marked this conversation as resolved.
Show resolved Hide resolved
}

func runMessageSubscription(ctx context.Context, module *f3.F3, actorID gpbft.ActorID, signer gpbft.Signer) {
for {
select {
case <-ctx.Done():
return
default:
}
for ctx.Err() == nil {

Check warning on line 135 in cmd/f3/run.go

View check run for this annotation

Codecov / codecov/patch

cmd/f3/run.go#L135

Added line #L135 was not covered by tests

ch := make(chan *gpbft.MessageBuilder, 4)
module.SubscribeForMessagesToSign(ch)
Expand Down
29 changes: 29 additions & 0 deletions ec/powerdelta.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,29 @@
package ec

import (
"context"
"fmt"

"github.com/filecoin-project/go-f3/certs"
"github.com/filecoin-project/go-f3/gpbft"
)

func WithModifiedPower(backend Backend, delta []certs.PowerTableDelta) Backend {
return &withModifiedPower{
Backend: backend,
delta: delta,
}
}

type withModifiedPower struct {
Backend
delta []certs.PowerTableDelta
}

func (b *withModifiedPower) GetPowerTable(ctx context.Context, ts gpbft.TipSetKey) (gpbft.PowerEntries, error) {
pt, err := b.Backend.GetPowerTable(ctx, ts)
if err != nil {
return nil, fmt.Errorf("getting power table: %w", err)

Check warning on line 26 in ec/powerdelta.go

View check run for this annotation

Codecov / codecov/patch

ec/powerdelta.go#L26

Added line #L26 was not covered by tests
}
return certs.ApplyPowerTableDiffs(pt, b.delta)
}
Loading
Loading