Skip to content

Commit

Permalink
TPD concurrency (#1808)
Browse files Browse the repository at this point in the history
* fix skywire cli tp disc

* add initEnsureTPDConcurrency module
  • Loading branch information
0pcom authored Apr 13, 2024
1 parent 53d15f1 commit f9455ce
Show file tree
Hide file tree
Showing 2 changed files with 80 additions and 9 deletions.
19 changes: 11 additions & 8 deletions cmd/skywire-cli/commands/tp/tp.go
Original file line number Diff line number Diff line change
Expand Up @@ -314,20 +314,23 @@ var discTpCmd = &cobra.Command{
Short: "Discover remote transport(s)",
Long: "\n Discover remote transport(s) by ID or public key",
DisableFlagsInUseLine: true,
Args: func(_ *cobra.Command, _ []string) error {
Run: func(cmd *cobra.Command, _ []string) {
if tpID == "" && tpPK == "" {
return errors.New("must specify either transport id or public key")
internal.PrintFatalError(cmd.Flags(), errors.New("must specify either transport id or public key"))
return
}
if tpID != "" && tpPK != "" {
return errors.New("cannot specify both transport id and public key")
internal.PrintFatalError(cmd.Flags(), errors.New("cannot specify both transport id and public key"))
return
}
return nil
},
Run: func(cmd *cobra.Command, _ []string) {
var tppk cipher.PubKey
var tpid transportID
internal.Catch(cmd.Flags(), tpid.Set(tpID))
internal.Catch(cmd.Flags(), tppk.Set(tpPK))
if tpID != "" {
internal.Catch(cmd.Flags(), tpid.Set(tpID))
}
if tpPK != "" {
internal.Catch(cmd.Flags(), tppk.Set(tpPK))
}
rpcClient, err := clirpc.Client(cmd.Flags())
if err != nil {
os.Exit(1)
Expand Down
70 changes: 69 additions & 1 deletion pkg/visor/init.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@ import (
"time"

"github.com/ccding/go-stun/stun"
"github.com/google/uuid"
"github.com/sirupsen/logrus"
"github.com/skycoin/dmsg/pkg/direct"
dmsgdisc "github.com/skycoin/dmsg/pkg/disc"
Expand Down Expand Up @@ -95,6 +96,8 @@ var (
dmsgC vinit.Module
// Transportability checker ensures the visor can accept transports by creating a self-transport or exiting after 3 failed attempts to create one
tc vinit.Module
// TPD concurrency checker removes transports from tpd that the visor does not have registered locally
tpdco vinit.Module
// Transport manager
tr vinit.Module
// Transport setup
Expand Down Expand Up @@ -175,8 +178,9 @@ func registerModules(logger *logging.MasterLogger) {
skyFwd = maker("sky_forward_conn", initSkywireForwardConn, &dmsgC, &dmsgCtrl, &tr, &launch)
pi = maker("ping", initPing, &dmsgC, &tm)
tc = maker("transportable", initEnsureVisorIsTransportable, &dmsgC, &tm)
tpdco = maker("tpd_concurrency", initEnsureTPDConcurrency, &dmsgC, &tm)
vis = vinit.MakeModule("visor", vinit.DoNothing, logger, &ebc, &ar, &disc, &pty,
&tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC, &skyFwd, &pi, &systemSurvey, &tc)
&tr, &rt, &launch, &cli, &hvs, &ut, &pv, &pvs, &trs, &stcpC, &stcprC, &skyFwd, &pi, &systemSurvey, &tc, &tpdco)

hv = maker("hypervisor", initHypervisor, &vis)
}
Expand Down Expand Up @@ -1294,6 +1298,70 @@ func initEnsureVisorIsTransportable(ctx context.Context, v *Visor, log *logging.
return nil
}

func initEnsureTPDConcurrency(ctx context.Context, v *Visor, log *logging.Logger) error { //nolint:all
const tickDuration = 5 * time.Minute
ticker := time.NewTicker(tickDuration)
go func() {
time.Sleep(time.Minute)
for range ticker.C {
entries, err := v.DiscoverTransportsByPK(v.conf.PK)
if err != nil {
v.isServicesHealthy.unset()
log.WithError(err).Warn("Cannot ensure concurrency with TPD")
//reduce tick duration on non nil error
ticker.Reset(time.Minute)
} else {
var dtpids []uuid.UUID
var rmtpids []uuid.UUID
var tpids []uuid.UUID
for _, e := range entries {
if e.Edges[0] != e.Edges[1] {
dtpids = append(dtpids, e.ID)
}
}
transports, err := v.Transports(nil, nil, false)
for _, t := range transports {
if t.Local != t.Remote {
tpids = append(tpids, t.ID)
}
}
for _, t := range dtpids {
var found bool
for _, tt := range tpids {
if tt == t {
found = true
}
}
if !found {
rmtpids = append(rmtpids, t)
}
}
if 0 < len(rmtpids) {
log.WithError(err).Warn(fmt.Sprintf("Found %v transports in transport discovery not registered locally", len(rmtpids)))
tpdC, err := connectToTpDisc(ctx, v, v.MasterLogger().PackageLogger("tpd_concurrency"))
if err != nil {
log.WithError(err).Warn("failed to create transport discovery client")
} else {
for _, rm := range rmtpids {
err = tpdC.DeleteTransport(ctx, rm)
if err != nil {
log.WithError(err).Warn(fmt.Sprintf("Failed to remove transport from tpd %v", rm))
}
}
}
}
ticker.Reset(tickDuration)
}
}
}()

v.pushCloseStack("tpd_concurrency", func() error {
ticker.Stop()
return nil
})
return nil
}

// advertise this visor as public in service discovery
// this service is not considered critical and always returns true
func initPublicVisor(_ context.Context, v *Visor, log *logging.Logger) error { //nolint:all
Expand Down

0 comments on commit f9455ce

Please sign in to comment.