From 8ab4dd9532d0bd56391e22d843e35bf5764f2cec Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 10:12:14 +0000 Subject: [PATCH 01/12] reimplement dmsg-monitor with new logic --- cmd/dmsg-monitor/commands/root.go | 63 ++++----- cmd/dmsg-monitor/dmsg-monitor.go | 2 +- pkg/dmsg-monitor/api/api.go | 216 ++++++++++-------------------- 3 files changed, 102 insertions(+), 179 deletions(-) diff --git a/cmd/dmsg-monitor/commands/root.go b/cmd/dmsg-monitor/commands/root.go index ec48499e..53125bd4 100644 --- a/cmd/dmsg-monitor/commands/root.go +++ b/cmd/dmsg-monitor/commands/root.go @@ -2,7 +2,6 @@ package commands import ( - "context" "fmt" "log" "os" @@ -12,7 +11,6 @@ import ( "github.com/skycoin/skywire-utilities/pkg/buildinfo" "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/cmdutil" "github.com/skycoin/skywire-utilities/pkg/logging" "github.com/skycoin/skywire-utilities/pkg/tcpproxy" "github.com/spf13/cobra" @@ -28,18 +26,16 @@ var ( tag string logLvl string sleepDeregistration time.Duration - batchSize int ) func init() { - RootCmd.Flags().StringVarP(&addr, "addr", "a", ":9080", "address to bind to.\033[0m") - RootCmd.Flags().DurationVarP(&sleepDeregistration, "sleep-deregistration", "s", 60, "Sleep time for derigstration process in minutes\033[0m") - RootCmd.Flags().IntVarP(&batchSize, "batchsize", "b", 20, "Batch size of deregistration\033[0m") - RootCmd.Flags().StringVarP(&confPath, "config", "c", "dmsg-monitor.json", "config file location.\033[0m") + RootCmd.Flags().StringVarP(&addr, "addr", "a", "", "address to bind to.\033[0m") + RootCmd.Flags().DurationVarP(&sleepDeregistration, "sleep-deregistration", "s", 0, "Sleep time for derigstration process in minutes\033[0m") RootCmd.Flags().StringVarP(&dmsgURL, "dmsg-url", "d", "", "url to dmsg data.\033[0m") RootCmd.Flags().StringVarP(&utURL, "ut-url", "u", "", "url to uptime tracker visor data.\033[0m") + RootCmd.Flags().StringVarP(&confPath, "config", "c", "dmsg-monitor.json", "path of dmsg-monitor config\033[0m") RootCmd.Flags().StringVar(&tag, "tag", "dmsg_monitor", "logging tag\033[0m") - RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "info", "set log level one of: info, error, warn, debug, trace, panic") + RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "set log level one of: info, error, warn, debug, trace, panic") } // RootCmd contains the root command @@ -64,24 +60,33 @@ var RootCmd = &cobra.Command{ } mLogger := logging.NewMasterLogger() - lvl, err := logging.LevelFromString(logLvl) + conf, err := api.ReadConfig(confPath) if err != nil { - mLogger.Fatal("Invalid log level") + mLogger.Fatal("Invalid config file") } - logging.SetLevel(lvl) - - conf := api.InitConfig(confPath, mLogger) + // use overwrite config values if flags not set if dmsgURL == "" { - dmsgURL = conf.Dmsg.Discovery + dmsgURL = conf.DMSGUrl } if utURL == "" { - utURL = conf.UptimeTracker.Addr + "/uptimes" + utURL = conf.UTUrl + "/uptimes" + } + if addr == "" { + addr = conf.Addr + } + if sleepDeregistration == 0 { + sleepDeregistration = conf.SleepDeregistration + } + if logLvl == "" { + logLvl = conf.LogLevel } - var srvURLs api.ServicesURLs - srvURLs.DMSG = dmsgURL - srvURLs.UT = utURL + lvl, err := logging.LevelFromString(logLvl) + if err != nil { + mLogger.Fatal("Invalid log level") + } + logging.SetLevel(lvl) logger := mLogger.PackageLogger(tag) @@ -92,25 +97,15 @@ var RootCmd = &cobra.Command{ var monitorConfig api.DMSGMonitorConfig monitorConfig.PK = conf.PK monitorConfig.Sign = monitorSign - monitorConfig.BatchSize = batchSize - - dmsgMonitorAPI := api.New(logger, srvURLs, monitorConfig) - - ctx, cancel := cmdutil.SignalContext(context.Background(), logger) - defer cancel() + monitorConfig.DMSG = dmsgURL + monitorConfig.UT = utURL - go dmsgMonitorAPI.InitDeregistrationLoop(ctx, conf, sleepDeregistration) + dmsgMonitorAPI := api.New(logger, monitorConfig) - go func() { - if err := tcpproxy.ListenAndServe(addr, dmsgMonitorAPI); err != nil { - logger.Errorf("serve: %v", err) - cancel() - } - }() + go dmsgMonitorAPI.InitDeregistrationLoop(sleepDeregistration) - <-ctx.Done() - if err := dmsgMonitorAPI.Visor.Close(); err != nil { - logger.WithError(err).Error("Visor closed with error.") + if err := tcpproxy.ListenAndServe(addr, dmsgMonitorAPI); err != nil { + logger.Errorf("serve: %v", err) } }, } diff --git a/cmd/dmsg-monitor/dmsg-monitor.go b/cmd/dmsg-monitor/dmsg-monitor.go index a582269c..ca99a7b2 100644 --- a/cmd/dmsg-monitor/dmsg-monitor.go +++ b/cmd/dmsg-monitor/dmsg-monitor.go @@ -11,7 +11,7 @@ import ( func init() { var helpflag bool commands.RootCmd.SetUsageTemplate(help) - commands.RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsgpty-cli") + commands.RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsg-monitor") commands.RootCmd.SetHelpCommand(&cobra.Command{Hidden: true}) commands.RootCmd.PersistentFlags().MarkHidden("help") //nolint } diff --git a/pkg/dmsg-monitor/api/api.go b/pkg/dmsg-monitor/api/api.go index d377aafb..fed9749a 100644 --- a/pkg/dmsg-monitor/api/api.go +++ b/pkg/dmsg-monitor/api/api.go @@ -3,13 +3,13 @@ package api import ( "bytes" - "context" "encoding/json" "fmt" "io" "math/rand" "net/http" "net/url" + "os" "strings" "sync" "time" @@ -22,9 +22,6 @@ import ( "github.com/skycoin/skywire-utilities/pkg/httputil" "github.com/skycoin/skywire-utilities/pkg/logging" utilenv "github.com/skycoin/skywire-utilities/pkg/skyenv" - "github.com/skycoin/skywire/pkg/app/appserver" - "github.com/skycoin/skywire/pkg/visor" - "github.com/skycoin/skywire/pkg/visor/visorconfig" ) // API register all the API endpoints. @@ -32,8 +29,6 @@ import ( type API struct { http.Handler - Visor *visor.Visor - dmsgURL string utURL string logger logging.Logger @@ -42,19 +37,13 @@ type API struct { nmPk cipher.PubKey nmSign cipher.Sig - batchSize int whitelistedPKs map[string]bool } // DMSGMonitorConfig is struct for Keys and Sign value of dmsg monitor type DMSGMonitorConfig struct { - PK cipher.PubKey - Sign cipher.Sig - BatchSize int -} - -// ServicesURLs is struct for organize URL of services -type ServicesURLs struct { + PK cipher.PubKey + Sign cipher.Sig DMSG string UT string } @@ -71,16 +60,15 @@ type Error struct { } // New returns a new *chi.Mux object, which can be started as a server -func New(logger *logging.Logger, srvURLs ServicesURLs, monitorConfig DMSGMonitorConfig) *API { +func New(logger *logging.Logger, monitorConfig DMSGMonitorConfig) *API { api := &API{ - dmsgURL: srvURLs.DMSG, - utURL: srvURLs.UT, logger: *logger, startedAt: time.Now(), nmPk: monitorConfig.PK, nmSign: monitorConfig.Sign, - batchSize: monitorConfig.BatchSize, + dmsgURL: monitorConfig.DMSG, + utURL: monitorConfig.UT, whitelistedPKs: whitelistedPKs(), } r := chi.NewRouter() @@ -127,23 +115,16 @@ func (api *API) log(r *http.Request) logrus.FieldLogger { } // InitDeregistrationLoop is function which runs periodic background tasks of API. -func (api *API) InitDeregistrationLoop(ctx context.Context, conf *visorconfig.V1, sleepDeregistration time.Duration) { - // Start a visor - api.startVisor(ctx, conf) - +func (api *API) InitDeregistrationLoop(sleepDeregistration time.Duration) { + deadDmsgCandidate := make(map[string]bool) for { - select { - case <-ctx.Done(): - return - default: - api.deregister() - time.Sleep(sleepDeregistration * time.Minute) - } + api.deregister(deadDmsgCandidate) + time.Sleep(sleepDeregistration * time.Minute) } } // deregister use as routine to deregister old/dead entries in the network -func (api *API) deregister() { +func (api *API) deregister(deadDmsgCandidate map[string]bool) { api.logger.Info("Deregistration routine start.") defer api.dMu.Unlock() api.dMu.Lock() @@ -155,13 +136,13 @@ func (api *API) deregister() { return } - api.dmsgDeregistration(uptimes) + api.dmsgDeregistration(deadDmsgCandidate, uptimes) api.logger.Info("Deregistration routine completed.") } // dmsgDeregistration is a routine to deregister dead dmsg entries in dmsg discovery -func (api *API) dmsgDeregistration(uptimes map[string]bool) { +func (api *API) dmsgDeregistration(deadDmsgCandidate, uptimes map[string]bool) { api.logger.Info("DMSGD Deregistration started.") // get list of all dmsg clients, not servers @@ -176,36 +157,26 @@ func (api *API) dmsgDeregistration(uptimes map[string]bool) { }) // check dmsg clients either alive or dead checkerConfig := dmsgCheckerConfig{ - wg: new(sync.WaitGroup), - locker: new(sync.Mutex), - uptimes: uptimes, - transport: "dmsg", + wg: new(sync.WaitGroup), + locker: new(sync.Mutex), + uptimes: uptimes, } deadDmsg := []string{} - var tmpBatchSize, deadDmsgCount int - for i, client := range clients { + for _, client := range clients { if _, ok := api.whitelistedPKs[client]; !ok { checkerConfig.wg.Add(1) checkerConfig.client = client - go api.dmsgChecker(checkerConfig, &deadDmsg) - } - tmpBatchSize++ - if tmpBatchSize == api.batchSize || i == len(clients)-1 { - checkerConfig.wg.Wait() - // deregister clients from dmsg-discovery - if len(deadDmsg) > 0 { - api.dmsgDeregister(deadDmsg) - deadDmsgCount += len(deadDmsg) - } - deadDmsg = []string{} - tmpBatchSize = 0 + go api.dmsgChecker(checkerConfig, deadDmsgCandidate, &deadDmsg) } } - - api.logger.WithField("Number of dead DMSG entries", deadDmsgCount).Info("DMSGD Deregistration completed.") + checkerConfig.wg.Wait() + if len(deadDmsg) > 0 { + api.dmsgDeregister(deadDmsg) + } + api.logger.WithField("List of dead DMSG entries", deadDmsg).WithField("Number of dead DMSG entries", len(deadDmsg)).Info("DMSGD Deregistration completed.") } -func (api *API) dmsgChecker(cfg dmsgCheckerConfig, deadDmsg *[]string) { +func (api *API) dmsgChecker(cfg dmsgCheckerConfig, deadDmsgCandidate map[string]bool, deadDmsg *[]string) { defer cfg.wg.Done() key := cipher.PubKey{} @@ -215,34 +186,15 @@ func (api *API) dmsgChecker(cfg dmsgCheckerConfig, deadDmsg *[]string) { return } - var trp bool - retrier := 3 - for retrier > 0 { - tp, err := api.Visor.AddTransport(key, cfg.transport, time.Second*3) - if err != nil { - api.logger.WithField("Retry", 4-retrier).WithError(err).Warnf("Failed to establish %v transport to %v", cfg.transport, key) - retrier-- - if strings.Contains(err.Error(), "unknown network type") { - trp = true - retrier = 0 - } - } else { - api.logger.Infof("Established %v transport to %v", cfg.transport, key) - trp = true - err = api.Visor.RemoveTransport(tp.ID) - if err != nil { - api.logger.Warnf("Error removing %v transport of %v: %v", cfg.transport, key, err) - } - retrier = 0 - } - } - - if !trp { - if status, ok := cfg.uptimes[key.Hex()]; !ok || !status { - cfg.locker.Lock() + if status, ok := cfg.uptimes[key.Hex()]; !ok || !status { + cfg.locker.Lock() + if _, ok := deadDmsgCandidate[key.Hex()]; ok { *deadDmsg = append(*deadDmsg, key.Hex()) - cfg.locker.Unlock() + delete(deadDmsgCandidate, key.Hex()) + } else { + deadDmsgCandidate[key.Hex()] = true } + cfg.locker.Unlock() } } @@ -256,11 +208,10 @@ func (api *API) dmsgDeregister(keys []string) { } type dmsgCheckerConfig struct { - client string - transport string - uptimes map[string]bool - wg *sync.WaitGroup - locker *sync.Mutex + client string + uptimes map[string]bool + wg *sync.WaitGroup + locker *sync.Mutex } // deregisterRequest is dereigstration handler for all services @@ -350,74 +301,51 @@ type uptimes struct { Online bool `json:"online"` } -func (api *API) startVisor(ctx context.Context, conf *visorconfig.V1) { - conf.SetLogger(logging.NewMasterLogger()) - v, ok := visor.NewVisor(ctx, conf) - if !ok { - api.logger.Fatal("Failed to start visor.") - } - api.Visor = v +// MonitorConfig is the structure of dmsg-monitor's config +type MonitorConfig struct { + SK cipher.SecKey `json:"sk,omitempty"` + PK cipher.PubKey `json:"pk,omitempty"` + DMSGUrl string `json:"dmsg_url,omitempty"` + UTUrl string `json:"ut_url,omitempty"` + Addr string `json:"addr,omitempty"` + LogLevel string `json:"log_level,omitempty"` + SleepDeregistration time.Duration `json:"sleep_deregistration,omitempty"` } -// InitConfig to initilise config -func InitConfig(confPath string, mLog *logging.MasterLogger) *visorconfig.V1 { - log := mLog.PackageLogger("network_monitor:config") - log.Info("Reading config from file.") - log.WithField("filepath", confPath).Info() +func (c *MonitorConfig) ensureKeys() error { + if !c.PK.Null() { + return nil + } + if c.SK.Null() { + c.PK, c.SK = cipher.GenerateKeyPair() + return nil + } + var err error + if c.PK, err = c.SK.PubKey(); err != nil { + return err + } + return nil +} - oldConf, err := visorconfig.ReadFile(confPath) +// ReadConfig reads the config file without opening or writing to it +func ReadConfig(confPath string) (*MonitorConfig, error) { + f, err := os.ReadFile(confPath) //nolint if err != nil { - log.WithError(err).Fatal("Failed to read config file.") - } - var testEnv bool - if oldConf.Dmsg.Discovery == utilenv.TestDmsgDiscAddr { - testEnv = true - } - // have same services as old config - services := &visorconfig.Services{ - DmsgDiscovery: oldConf.Dmsg.Discovery, - TransportDiscovery: oldConf.Transport.Discovery, - AddressResolver: oldConf.Transport.AddressResolver, - RouteFinder: oldConf.Routing.RouteFinder, - RouteSetupNodes: oldConf.Routing.RouteSetupNodes, - UptimeTracker: oldConf.UptimeTracker.Addr, - ServiceDiscovery: oldConf.Launcher.ServiceDisc, - } - // update oldconfig - conf, err := visorconfig.MakeDefaultConfig(mLog, &oldConf.SK, false, false, testEnv, false, false, confPath, "", services) + return nil, fmt.Errorf("%w", err) + } + raw, err := io.ReadAll(bytes.NewReader(f)) if err != nil { - log.WithError(err).Fatal("Failed to create config.") + return nil, fmt.Errorf("%w", err) } - - // have the same apps that the old config had - var newConfLauncherApps []appserver.AppConfig - for _, app := range conf.Launcher.Apps { - for _, oldApp := range oldConf.Launcher.Apps { - if app.Name == oldApp.Name { - newConfLauncherApps = append(newConfLauncherApps, app) - } - } + var conf *MonitorConfig + dec := json.NewDecoder(bytes.NewReader(raw)) + if err := dec.Decode(&conf); err != nil { + return nil, fmt.Errorf("failed to decode json: %w", err) } - conf.Launcher.Apps = newConfLauncherApps - - conf.Version = oldConf.Version - conf.LocalPath = oldConf.LocalPath - conf.Launcher.BinPath = oldConf.Launcher.BinPath - conf.Launcher.ServerAddr = oldConf.Launcher.ServerAddr - conf.CLIAddr = oldConf.CLIAddr - conf.Transport.TransportSetupPKs = oldConf.Transport.TransportSetupPKs - - // following services are not needed - conf.STCP = nil - conf.Dmsgpty = nil - conf.Transport.PublicAutoconnect = false - - // save the config file - if err := conf.Flush(); err != nil { - log.WithError(err).Fatal("Failed to flush config to file.") + if err := conf.ensureKeys(); err != nil { + return nil, fmt.Errorf("%v: %w", "config has invalid secret key", err) } - - return conf + return conf, nil } func whitelistedPKs() map[string]bool { From ddca1e599fee04ec4811b922ccd2af7a2128c468 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 10:33:33 +0000 Subject: [PATCH 02/12] create internal monitors for all monitor services, as config reader and struct --- cmd/dmsg-monitor/commands/root.go | 3 +- internal/monitors/lig.go | 62 +++++++++++++++++++++++++++++++ pkg/dmsg-monitor/api/api.go | 48 ------------------------ 3 files changed, 64 insertions(+), 49 deletions(-) create mode 100644 internal/monitors/lig.go diff --git a/cmd/dmsg-monitor/commands/root.go b/cmd/dmsg-monitor/commands/root.go index 53125bd4..57fba3d2 100644 --- a/cmd/dmsg-monitor/commands/root.go +++ b/cmd/dmsg-monitor/commands/root.go @@ -15,6 +15,7 @@ import ( "github.com/skycoin/skywire-utilities/pkg/tcpproxy" "github.com/spf13/cobra" + "github.com/skycoin/skywire-services/internal/monitors" "github.com/skycoin/skywire-services/pkg/dmsg-monitor/api" ) @@ -60,7 +61,7 @@ var RootCmd = &cobra.Command{ } mLogger := logging.NewMasterLogger() - conf, err := api.ReadConfig(confPath) + conf, err := monitors.ReadConfig(confPath) if err != nil { mLogger.Fatal("Invalid config file") } diff --git a/internal/monitors/lig.go b/internal/monitors/lig.go new file mode 100644 index 00000000..47cf644e --- /dev/null +++ b/internal/monitors/lig.go @@ -0,0 +1,62 @@ +// Package monitors internal/monitors/lib.go +package monitors + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "os" + "time" + + "github.com/skycoin/skywire-utilities/pkg/cipher" +) + +// MonitorConfig is the structure of dmsg-monitor's config +type MonitorConfig struct { + SK cipher.SecKey `json:"sk,omitempty"` + PK cipher.PubKey `json:"pk,omitempty"` + DMSGUrl string `json:"dmsg_url,omitempty"` + UTUrl string `json:"ut_url,omitempty"` + ARUrl string `json:"ar_url,omitempty"` + TPDUrl string `json:"tpd_url,omitempty"` + Addr string `json:"addr,omitempty"` + LogLevel string `json:"log_level,omitempty"` + SleepDeregistration time.Duration `json:"sleep_deregistration,omitempty"` +} + +func (c *MonitorConfig) ensureKeys() error { + if !c.PK.Null() { + return nil + } + if c.SK.Null() { + c.PK, c.SK = cipher.GenerateKeyPair() + return nil + } + var err error + if c.PK, err = c.SK.PubKey(); err != nil { + return err + } + return nil +} + +// ReadConfig reads the config file without opening or writing to it +func ReadConfig(confPath string) (*MonitorConfig, error) { + f, err := os.ReadFile(confPath) //nolint + if err != nil { + return nil, fmt.Errorf("%w", err) + } + raw, err := io.ReadAll(bytes.NewReader(f)) + if err != nil { + return nil, fmt.Errorf("%w", err) + } + var conf *MonitorConfig + dec := json.NewDecoder(bytes.NewReader(raw)) + if err := dec.Decode(&conf); err != nil { + return nil, fmt.Errorf("failed to decode json: %w", err) + } + if err := conf.ensureKeys(); err != nil { + return nil, fmt.Errorf("%v: %w", "config has invalid secret key", err) + } + return conf, nil +} diff --git a/pkg/dmsg-monitor/api/api.go b/pkg/dmsg-monitor/api/api.go index fed9749a..1568f863 100644 --- a/pkg/dmsg-monitor/api/api.go +++ b/pkg/dmsg-monitor/api/api.go @@ -9,7 +9,6 @@ import ( "math/rand" "net/http" "net/url" - "os" "strings" "sync" "time" @@ -301,53 +300,6 @@ type uptimes struct { Online bool `json:"online"` } -// MonitorConfig is the structure of dmsg-monitor's config -type MonitorConfig struct { - SK cipher.SecKey `json:"sk,omitempty"` - PK cipher.PubKey `json:"pk,omitempty"` - DMSGUrl string `json:"dmsg_url,omitempty"` - UTUrl string `json:"ut_url,omitempty"` - Addr string `json:"addr,omitempty"` - LogLevel string `json:"log_level,omitempty"` - SleepDeregistration time.Duration `json:"sleep_deregistration,omitempty"` -} - -func (c *MonitorConfig) ensureKeys() error { - if !c.PK.Null() { - return nil - } - if c.SK.Null() { - c.PK, c.SK = cipher.GenerateKeyPair() - return nil - } - var err error - if c.PK, err = c.SK.PubKey(); err != nil { - return err - } - return nil -} - -// ReadConfig reads the config file without opening or writing to it -func ReadConfig(confPath string) (*MonitorConfig, error) { - f, err := os.ReadFile(confPath) //nolint - if err != nil { - return nil, fmt.Errorf("%w", err) - } - raw, err := io.ReadAll(bytes.NewReader(f)) - if err != nil { - return nil, fmt.Errorf("%w", err) - } - var conf *MonitorConfig - dec := json.NewDecoder(bytes.NewReader(raw)) - if err := dec.Decode(&conf); err != nil { - return nil, fmt.Errorf("failed to decode json: %w", err) - } - if err := conf.ensureKeys(); err != nil { - return nil, fmt.Errorf("%v: %w", "config has invalid secret key", err) - } - return conf, nil -} - func whitelistedPKs() map[string]bool { whitelistedPKs := make(map[string]bool) for _, pk := range strings.Split(utilenv.NetworkMonitorPKs, ",") { From ee3302540df9bf7d656df135ea66a9e796b14a1f Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 11:44:22 +0000 Subject: [PATCH 03/12] reimplement ar and merge to dmsg as one monitor service --- cmd/dmsg-monitor/commands/root.go | 23 +++-- pkg/dmsg-monitor/api/api.go | 154 +++++++++++++++++++++++------- 2 files changed, 131 insertions(+), 46 deletions(-) diff --git a/cmd/dmsg-monitor/commands/root.go b/cmd/dmsg-monitor/commands/root.go index 57fba3d2..ee3c5e61 100644 --- a/cmd/dmsg-monitor/commands/root.go +++ b/cmd/dmsg-monitor/commands/root.go @@ -23,6 +23,7 @@ var ( confPath string dmsgURL string utURL string + arURL string addr string tag string logLvl string @@ -32,10 +33,11 @@ var ( func init() { RootCmd.Flags().StringVarP(&addr, "addr", "a", "", "address to bind to.\033[0m") RootCmd.Flags().DurationVarP(&sleepDeregistration, "sleep-deregistration", "s", 0, "Sleep time for derigstration process in minutes\033[0m") - RootCmd.Flags().StringVarP(&dmsgURL, "dmsg-url", "d", "", "url to dmsg data.\033[0m") - RootCmd.Flags().StringVarP(&utURL, "ut-url", "u", "", "url to uptime tracker visor data.\033[0m") - RootCmd.Flags().StringVarP(&confPath, "config", "c", "dmsg-monitor.json", "path of dmsg-monitor config\033[0m") - RootCmd.Flags().StringVar(&tag, "tag", "dmsg_monitor", "logging tag\033[0m") + RootCmd.Flags().StringVar(&dmsgURL, "dmsg-url", "", "url to dmsg data.\033[0m") + RootCmd.Flags().StringVar(&utURL, "ut-url", "", "url to uptime tracker visor data.\033[0m") + RootCmd.Flags().StringVar(&arURL, "ar-url", "", "url to ar data.\033[0m") + RootCmd.Flags().StringVarP(&confPath, "config", "c", "network-monitor.json", "path of network-monitor config\033[0m") + RootCmd.Flags().StringVar(&tag, "tag", "network_monitor", "logging tag\033[0m") RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "set log level one of: info, error, warn, debug, trace, panic") } @@ -46,10 +48,9 @@ var RootCmd = &cobra.Command{ }(), Short: "DMSG monitor of DMSG discovery entries.", Long: ` - ┌┬┐┌┬┐┌─┐┌─┐ ┌┬┐┌─┐┌┐┌┬┌┬┐┌─┐┬─┐ - │││││└─┐│ ┬───││││ │││││ │ │ │├┬┘ - ─┴┘┴ ┴└─┘└─┘ ┴ ┴└─┘┘└┘┴ ┴ └─┘┴└─ -`, + ┌┐┌┌─┐┌┬┐┬ ┬┌─┐┬─┐┬┌─ ┌┬┐┌─┐┌┐┌┬┌┬┐┌─┐┬─┐ + │││├┤ │ ││││ │├┬┘├┴┐───││││ │││││ │ │ │├┬┘ + ┘└┘└─┘ ┴ └┴┘└─┘┴└─┴ ┴ ┴ ┴└─┘┘└┘┴ ┴ └─┘┴└─`, SilenceErrors: true, SilenceUsage: true, DisableSuggestions: true, @@ -73,6 +74,9 @@ var RootCmd = &cobra.Command{ if utURL == "" { utURL = conf.UTUrl + "/uptimes" } + if arURL == "" { + arURL = conf.ARUrl + } if addr == "" { addr = conf.Addr } @@ -95,11 +99,12 @@ var RootCmd = &cobra.Command{ monitorSign, _ := cipher.SignPayload([]byte(conf.PK.Hex()), conf.SK) //nolint - var monitorConfig api.DMSGMonitorConfig + var monitorConfig api.MonitorConfig monitorConfig.PK = conf.PK monitorConfig.Sign = monitorSign monitorConfig.DMSG = dmsgURL monitorConfig.UT = utURL + monitorConfig.AR = arURL dmsgMonitorAPI := api.New(logger, monitorConfig) diff --git a/pkg/dmsg-monitor/api/api.go b/pkg/dmsg-monitor/api/api.go index 1568f863..b3afb5aa 100644 --- a/pkg/dmsg-monitor/api/api.go +++ b/pkg/dmsg-monitor/api/api.go @@ -6,7 +6,6 @@ import ( "encoding/json" "fmt" "io" - "math/rand" "net/http" "net/url" "strings" @@ -30,6 +29,7 @@ type API struct { dmsgURL string utURL string + arURL string logger logging.Logger dMu sync.RWMutex startedAt time.Time @@ -39,12 +39,13 @@ type API struct { whitelistedPKs map[string]bool } -// DMSGMonitorConfig is struct for Keys and Sign value of dmsg monitor -type DMSGMonitorConfig struct { +// MonitorConfig is struct for Keys and Sign value of network monitor +type MonitorConfig struct { PK cipher.PubKey Sign cipher.Sig DMSG string UT string + AR string } // HealthCheckResponse is struct of /health endpoint @@ -59,7 +60,7 @@ type Error struct { } // New returns a new *chi.Mux object, which can be started as a server -func New(logger *logging.Logger, monitorConfig DMSGMonitorConfig) *API { +func New(logger *logging.Logger, monitorConfig MonitorConfig) *API { api := &API{ logger: *logger, @@ -68,6 +69,7 @@ func New(logger *logging.Logger, monitorConfig DMSGMonitorConfig) *API { nmSign: monitorConfig.Sign, dmsgURL: monitorConfig.DMSG, utURL: monitorConfig.UT, + arURL: monitorConfig.AR, whitelistedPKs: whitelistedPKs(), } r := chi.NewRouter() @@ -116,15 +118,17 @@ func (api *API) log(r *http.Request) logrus.FieldLogger { // InitDeregistrationLoop is function which runs periodic background tasks of API. func (api *API) InitDeregistrationLoop(sleepDeregistration time.Duration) { deadDmsgCandidate := make(map[string]bool) + deadStcprCandidate := make(map[string]bool) + deadSudphCandidate := make(map[string]bool) for { - api.deregister(deadDmsgCandidate) + api.deregister(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate) time.Sleep(sleepDeregistration * time.Minute) } } // deregister use as routine to deregister old/dead entries in the network -func (api *API) deregister(deadDmsgCandidate map[string]bool) { - api.logger.Info("Deregistration routine start.") +func (api *API) deregister(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate map[string]bool) { + api.logger.Info("Deregistration routine started.") defer api.dMu.Unlock() api.dMu.Lock() @@ -135,51 +139,94 @@ func (api *API) deregister(deadDmsgCandidate map[string]bool) { return } - api.dmsgDeregistration(deadDmsgCandidate, uptimes) + api.networkDeregistration(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate, uptimes) api.logger.Info("Deregistration routine completed.") } // dmsgDeregistration is a routine to deregister dead dmsg entries in dmsg discovery -func (api *API) dmsgDeregistration(deadDmsgCandidate, uptimes map[string]bool) { - api.logger.Info("DMSGD Deregistration started.") +func (api *API) networkDeregistration(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate, uptimes map[string]bool) { + conf := checkerConfig{ + wg: new(sync.WaitGroup), + locker: new(sync.Mutex), + uptimes: uptimes, + } + api.dmsgDeregistration(conf, deadDmsgCandidate) + api.arDeregistration(conf, deadStcprCandidate, deadSudphCandidate) +} +func (api *API) dmsgDeregistration(conf checkerConfig, deadDmsgCandidate map[string]bool) { + api.logger.Info("DMSG deregistraion routine started.") // get list of all dmsg clients, not servers - clients, err := getClients(api.dmsgURL) + dmsgEntries, err := getDMSGEntries(api.dmsgURL) if err != nil { - api.logger.Warnf("Error occur during get dmsg clients list due to %s", err) + api.logger.Warnf("Error occur during get dmsg entries list due to %s", err) return } - //randomize the order of the dmsg entries - rand.Shuffle(len(clients), func(i, j int) { - clients[i], clients[j] = clients[j], clients[i] - }) - // check dmsg clients either alive or dead - checkerConfig := dmsgCheckerConfig{ - wg: new(sync.WaitGroup), - locker: new(sync.Mutex), - uptimes: uptimes, - } + + // DMSG deregistration deadDmsg := []string{} - for _, client := range clients { - if _, ok := api.whitelistedPKs[client]; !ok { - checkerConfig.wg.Add(1) - checkerConfig.client = client - go api.dmsgChecker(checkerConfig, deadDmsgCandidate, &deadDmsg) + for _, entry := range dmsgEntries { + if _, ok := api.whitelistedPKs[entry]; !ok { + conf.wg.Add(1) + conf.entry = entry + go api.entryChecker(conf, deadDmsgCandidate, &deadDmsg) } } - checkerConfig.wg.Wait() + conf.wg.Wait() if len(deadDmsg) > 0 { api.dmsgDeregister(deadDmsg) } api.logger.WithField("List of dead DMSG entries", deadDmsg).WithField("Number of dead DMSG entries", len(deadDmsg)).Info("DMSGD Deregistration completed.") + api.logger.Info("DMSG deregistraion routine completed.") +} + +func (api *API) arDeregistration(conf checkerConfig, deadStcprCandidate, deadSudphCandidate map[string]bool) { + api.logger.Info("AR deregistraion routine started.") + // get list of all ar entries + arEntries, err := getAREntries(api.arURL) + if err != nil { + api.logger.Warnf("error occur during get ar entries list due to %s", err) + return + } + + // STCPR deregistration + deadStcpr := []string{} + for _, entry := range arEntries.Stcpr { + if _, ok := api.whitelistedPKs[entry]; !ok { + conf.wg.Add(1) + conf.entry = entry + go api.entryChecker(conf, deadStcprCandidate, &deadStcpr) + } + } + conf.wg.Wait() + if len(deadStcpr) > 0 { + api.arDeregister(deadStcpr, "stcpr") + } + api.logger.WithField("list of dead stcpr entries", deadStcpr).WithField("number of dead stcpr entries", len(deadStcpr)).Info("stcpr deregistration completed.") + + // SUDPH deregistration + deadSudph := []string{} + for _, entry := range arEntries.Sudph { + if _, ok := api.whitelistedPKs[entry]; !ok { + conf.wg.Add(1) + conf.entry = entry + go api.entryChecker(conf, deadSudphCandidate, &deadSudph) + } + } + conf.wg.Wait() + if len(deadSudph) > 0 { + api.arDeregister(deadSudph, "sudph") + } + api.logger.WithField("list of dead sudph entries", deadSudph).WithField("number of dead sudph entries", len(deadSudph)).Info("sudph deregistration completed.") + api.logger.Info("AR deregistraion routine completed.") } -func (api *API) dmsgChecker(cfg dmsgCheckerConfig, deadDmsgCandidate map[string]bool, deadDmsg *[]string) { +func (api *API) entryChecker(cfg checkerConfig, deadCandidate map[string]bool, deadEntries *[]string) { defer cfg.wg.Done() key := cipher.PubKey{} - err := key.UnmarshalText([]byte(cfg.client)) + err := key.UnmarshalText([]byte(cfg.entry)) if err != nil { api.logger.Warnf("Error marshaling key: %s", err) return @@ -187,11 +234,11 @@ func (api *API) dmsgChecker(cfg dmsgCheckerConfig, deadDmsgCandidate map[string] if status, ok := cfg.uptimes[key.Hex()]; !ok || !status { cfg.locker.Lock() - if _, ok := deadDmsgCandidate[key.Hex()]; ok { - *deadDmsg = append(*deadDmsg, key.Hex()) - delete(deadDmsgCandidate, key.Hex()) + if _, ok := deadCandidate[key.Hex()]; ok { + *deadEntries = append(*deadEntries, key.Hex()) + delete(deadCandidate, key.Hex()) } else { - deadDmsgCandidate[key.Hex()] = true + deadCandidate[key.Hex()] = true } cfg.locker.Unlock() } @@ -206,8 +253,17 @@ func (api *API) dmsgDeregister(keys []string) { api.logger.Info("Deregister request send to DSMGD") } -type dmsgCheckerConfig struct { - client string +func (api *API) arDeregister(keys []string, entryType string) { + err := api.deregisterRequest(keys, fmt.Sprintf(api.arURL+"/deregister/%s", entryType), "address resolver") + if err != nil { + api.logger.Warn(err) + return + } + api.logger.Info("Deregister request send to DSMGD") +} + +type checkerConfig struct { + entry string uptimes map[string]bool wg *sync.WaitGroup locker *sync.Mutex @@ -251,7 +307,7 @@ func (api *API) deregisterRequest(keys []string, rawReqURL, service string) erro type clientList []string -func getClients(dmsgURL string) (data clientList, err error) { +func getDMSGEntries(dmsgURL string) (data clientList, err error) { res, err := http.Get(dmsgURL + "/dmsg-discovery/visorEntries") //nolint if err != nil { @@ -300,6 +356,30 @@ type uptimes struct { Online bool `json:"online"` } +type visorTransports struct { + Sudph []string `json:"sudph"` + Stcpr []string `json:"stcpr"` +} + +func getAREntries(arURL string) (data visorTransports, err error) { + res, err := http.Get(arURL + "/transports") //nolint + if err != nil { + return visorTransports{}, err + } + + body, err := io.ReadAll(res.Body) + if err != nil { + return visorTransports{}, err + } + + err = json.Unmarshal(body, &data) + if err != nil { + return visorTransports{}, err + } + + return data, err +} + func whitelistedPKs() map[string]bool { whitelistedPKs := make(map[string]bool) for _, pk := range strings.Split(utilenv.NetworkMonitorPKs, ",") { From c67002795ca0dac106479a9f0823e805b199b04c Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 11:47:23 +0000 Subject: [PATCH 04/12] combine ar and dmsg monitor as network-monitor --- cmd/dmsg-monitor/README.md | 16 - cmd/dmsg-monitor/commands/root.go | 124 ----- cmd/dmsg-monitor/dmsg-monitor.go | 43 -- cmd/network-monitor/README.md | 5 +- cmd/network-monitor/commands/root.go | 143 ++--- cmd/network-monitor/network-monitor.go | 2 +- pkg/dmsg-monitor/api/api.go | 410 -------------- pkg/network-monitor/api/api.go | 510 ++++++------------ pkg/network-monitor/store/memory_store.go | 59 -- .../store/memory_store_test.go | 13 - pkg/network-monitor/store/redis_store.go | 101 ---- pkg/network-monitor/store/redis_store_test.go | 54 -- pkg/network-monitor/store/store.go | 40 -- 13 files changed, 210 insertions(+), 1310 deletions(-) delete mode 100644 cmd/dmsg-monitor/README.md delete mode 100644 cmd/dmsg-monitor/commands/root.go delete mode 100644 cmd/dmsg-monitor/dmsg-monitor.go delete mode 100644 pkg/dmsg-monitor/api/api.go delete mode 100644 pkg/network-monitor/store/memory_store.go delete mode 100644 pkg/network-monitor/store/memory_store_test.go delete mode 100644 pkg/network-monitor/store/redis_store.go delete mode 100644 pkg/network-monitor/store/redis_store_test.go delete mode 100644 pkg/network-monitor/store/store.go diff --git a/cmd/dmsg-monitor/README.md b/cmd/dmsg-monitor/README.md deleted file mode 100644 index 247b3f98..00000000 --- a/cmd/dmsg-monitor/README.md +++ /dev/null @@ -1,16 +0,0 @@ -# DMSG Monitor - -## API endpoints - -### GET `/health` -Gets the health info of the service. e.g. -``` -{ - "build_info": { - "version": "v1.0.1-267-ge1617c5b", - "commit": "e1617c5b0121182cfd2b610dc518e4753e56440e", - "date": "2022-10-25T11:01:52Z" - }, - "started_at": "2022-10-25T11:10:45.152629597Z" -} -``` diff --git a/cmd/dmsg-monitor/commands/root.go b/cmd/dmsg-monitor/commands/root.go deleted file mode 100644 index ee3c5e61..00000000 --- a/cmd/dmsg-monitor/commands/root.go +++ /dev/null @@ -1,124 +0,0 @@ -// Package commands cmd/dmsg-monitor/commands/root.go -package commands - -import ( - "fmt" - "log" - "os" - "path/filepath" - "strings" - "time" - - "github.com/skycoin/skywire-utilities/pkg/buildinfo" - "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/logging" - "github.com/skycoin/skywire-utilities/pkg/tcpproxy" - "github.com/spf13/cobra" - - "github.com/skycoin/skywire-services/internal/monitors" - "github.com/skycoin/skywire-services/pkg/dmsg-monitor/api" -) - -var ( - confPath string - dmsgURL string - utURL string - arURL string - addr string - tag string - logLvl string - sleepDeregistration time.Duration -) - -func init() { - RootCmd.Flags().StringVarP(&addr, "addr", "a", "", "address to bind to.\033[0m") - RootCmd.Flags().DurationVarP(&sleepDeregistration, "sleep-deregistration", "s", 0, "Sleep time for derigstration process in minutes\033[0m") - RootCmd.Flags().StringVar(&dmsgURL, "dmsg-url", "", "url to dmsg data.\033[0m") - RootCmd.Flags().StringVar(&utURL, "ut-url", "", "url to uptime tracker visor data.\033[0m") - RootCmd.Flags().StringVar(&arURL, "ar-url", "", "url to ar data.\033[0m") - RootCmd.Flags().StringVarP(&confPath, "config", "c", "network-monitor.json", "path of network-monitor config\033[0m") - RootCmd.Flags().StringVar(&tag, "tag", "network_monitor", "logging tag\033[0m") - RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "set log level one of: info, error, warn, debug, trace, panic") -} - -// RootCmd contains the root command -var RootCmd = &cobra.Command{ - Use: func() string { - return strings.Split(filepath.Base(strings.ReplaceAll(strings.ReplaceAll(fmt.Sprintf("%v", os.Args), "[", ""), "]", "")), " ")[0] - }(), - Short: "DMSG monitor of DMSG discovery entries.", - Long: ` - ┌┐┌┌─┐┌┬┐┬ ┬┌─┐┬─┐┬┌─ ┌┬┐┌─┐┌┐┌┬┌┬┐┌─┐┬─┐ - │││├┤ │ ││││ │├┬┘├┴┐───││││ │││││ │ │ │├┬┘ - ┘└┘└─┘ ┴ └┴┘└─┘┴└─┴ ┴ ┴ ┴└─┘┘└┘┴ ┴ └─┘┴└─`, - SilenceErrors: true, - SilenceUsage: true, - DisableSuggestions: true, - DisableFlagsInUseLine: true, - Version: buildinfo.Version(), - Run: func(_ *cobra.Command, _ []string) { - if _, err := buildinfo.Get().WriteTo(os.Stdout); err != nil { - log.Printf("Failed to output build info: %v", err) - } - - mLogger := logging.NewMasterLogger() - conf, err := monitors.ReadConfig(confPath) - if err != nil { - mLogger.Fatal("Invalid config file") - } - - // use overwrite config values if flags not set - if dmsgURL == "" { - dmsgURL = conf.DMSGUrl - } - if utURL == "" { - utURL = conf.UTUrl + "/uptimes" - } - if arURL == "" { - arURL = conf.ARUrl - } - if addr == "" { - addr = conf.Addr - } - if sleepDeregistration == 0 { - sleepDeregistration = conf.SleepDeregistration - } - if logLvl == "" { - logLvl = conf.LogLevel - } - - lvl, err := logging.LevelFromString(logLvl) - if err != nil { - mLogger.Fatal("Invalid log level") - } - logging.SetLevel(lvl) - - logger := mLogger.PackageLogger(tag) - - logger.WithField("addr", addr).Info("Serving DMSG-Monitor API...") - - monitorSign, _ := cipher.SignPayload([]byte(conf.PK.Hex()), conf.SK) //nolint - - var monitorConfig api.MonitorConfig - monitorConfig.PK = conf.PK - monitorConfig.Sign = monitorSign - monitorConfig.DMSG = dmsgURL - monitorConfig.UT = utURL - monitorConfig.AR = arURL - - dmsgMonitorAPI := api.New(logger, monitorConfig) - - go dmsgMonitorAPI.InitDeregistrationLoop(sleepDeregistration) - - if err := tcpproxy.ListenAndServe(addr, dmsgMonitorAPI); err != nil { - logger.Errorf("serve: %v", err) - } - }, -} - -// Execute executes root CLI command. -func Execute() { - if err := RootCmd.Execute(); err != nil { - log.Fatal("Failed to execute command: ", err) - } -} diff --git a/cmd/dmsg-monitor/dmsg-monitor.go b/cmd/dmsg-monitor/dmsg-monitor.go deleted file mode 100644 index ca99a7b2..00000000 --- a/cmd/dmsg-monitor/dmsg-monitor.go +++ /dev/null @@ -1,43 +0,0 @@ -// Package main cmd/dmsg-monitor/dmsg-monitor.go -package main - -import ( - cc "github.com/ivanpirog/coloredcobra" - "github.com/spf13/cobra" - - "github.com/skycoin/skywire-services/cmd/dmsg-monitor/commands" -) - -func init() { - var helpflag bool - commands.RootCmd.SetUsageTemplate(help) - commands.RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsg-monitor") - commands.RootCmd.SetHelpCommand(&cobra.Command{Hidden: true}) - commands.RootCmd.PersistentFlags().MarkHidden("help") //nolint -} - -func main() { - cc.Init(&cc.Config{ - RootCmd: commands.RootCmd, - Headings: cc.HiBlue + cc.Bold, - Commands: cc.HiBlue + cc.Bold, - CmdShortDescr: cc.HiBlue, - Example: cc.HiBlue + cc.Italic, - ExecName: cc.HiBlue + cc.Bold, - Flags: cc.HiBlue + cc.Bold, - FlagsDescr: cc.HiBlue, - NoExtraNewlines: true, - NoBottomNewline: true, - }) - commands.Execute() -} - -const help = "Usage:\r\n" + - " {{.UseLine}}{{if .HasAvailableSubCommands}}{{end}} {{if gt (len .Aliases) 0}}\r\n\r\n" + - "{{.NameAndAliases}}{{end}}{{if .HasAvailableSubCommands}}\r\n\r\n" + - "Available Commands:{{range .Commands}}{{if (or .IsAvailableCommand)}}\r\n " + - "{{rpad .Name .NamePadding }} {{.Short}}{{end}}{{end}}{{end}}{{if .HasAvailableLocalFlags}}\r\n\r\n" + - "Flags:\r\n" + - "{{.LocalFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}{{if .HasAvailableInheritedFlags}}\r\n\r\n" + - "Global Flags:\r\n" + - "{{.InheritedFlags.FlagUsages | trimTrailingWhitespaces}}{{end}}\r\n\r\n" diff --git a/cmd/network-monitor/README.md b/cmd/network-monitor/README.md index 19584653..247b3f98 100644 --- a/cmd/network-monitor/README.md +++ b/cmd/network-monitor/README.md @@ -1,4 +1,4 @@ -# Network Monitor +# DMSG Monitor ## API endpoints @@ -14,6 +14,3 @@ Gets the health info of the service. e.g. "started_at": "2022-10-25T11:10:45.152629597Z" } ``` - -### GET `/status` -Gets the status of all the transports in the deployment. diff --git a/cmd/network-monitor/commands/root.go b/cmd/network-monitor/commands/root.go index c787d39f..0d175db9 100644 --- a/cmd/network-monitor/commands/root.go +++ b/cmd/network-monitor/commands/root.go @@ -2,7 +2,6 @@ package commands import ( - "context" "fmt" "log" "os" @@ -12,52 +11,34 @@ import ( "github.com/skycoin/skywire-utilities/pkg/buildinfo" "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/cmdutil" "github.com/skycoin/skywire-utilities/pkg/logging" - "github.com/skycoin/skywire-utilities/pkg/metricsutil" - "github.com/skycoin/skywire-utilities/pkg/storeconfig" "github.com/skycoin/skywire-utilities/pkg/tcpproxy" "github.com/spf13/cobra" - "github.com/skycoin/skywire-services/internal/nmmetrics" + "github.com/skycoin/skywire-services/internal/monitors" "github.com/skycoin/skywire-services/pkg/network-monitor/api" - "github.com/skycoin/skywire-services/pkg/network-monitor/store" -) - -const ( - redisScheme = "redis://" ) var ( - sleepDeregistration time.Duration confPath string - sdURL string - arURL string + dmsgURL string utURL string + arURL string addr string tag string logLvl string - metricsAddr string - redisURL string - testing bool - redisPoolSize int - batchSize int + sleepDeregistration time.Duration ) func init() { - RootCmd.Flags().StringVarP(&addr, "addr", "a", ":9080", "address to bind to.\033[0m") - RootCmd.Flags().DurationVar(&sleepDeregistration, "sleep-deregistration", 10, "Sleep time for derigstration process in minutes\033[0m") - RootCmd.Flags().IntVarP(&batchSize, "batchsize", "b", 30, "Batch size of deregistration\033[0m") - RootCmd.Flags().StringVarP(&confPath, "config", "c", "network-monitor.json", "config file location.\033[0m") - RootCmd.Flags().StringVarP(&sdURL, "sd-url", "n", "", "url to service discovery.\033[0m") - RootCmd.Flags().StringVarP(&arURL, "ar-url", "v", "", "url to address resolver.\033[0m") - RootCmd.Flags().StringVarP(&utURL, "ut-url", "u", "", "url to uptime tracker visor data.\033[0m") + RootCmd.Flags().StringVarP(&addr, "addr", "a", "", "address to bind to.\033[0m") + RootCmd.Flags().DurationVarP(&sleepDeregistration, "sleep-deregistration", "s", 0, "Sleep time for derigstration process in minutes\033[0m") + RootCmd.Flags().StringVar(&dmsgURL, "dmsg-url", "", "url to dmsg data.\033[0m") + RootCmd.Flags().StringVar(&utURL, "ut-url", "", "url to uptime tracker visor data.\033[0m") + RootCmd.Flags().StringVar(&arURL, "ar-url", "", "url to ar data.\033[0m") + RootCmd.Flags().StringVarP(&confPath, "config", "c", "network-monitor.json", "path of network-monitor config\033[0m") RootCmd.Flags().StringVar(&tag, "tag", "network_monitor", "logging tag\033[0m") - RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "info", "set log level one of: info, error, warn, debug, trace, panic") - RootCmd.Flags().StringVarP(&metricsAddr, "metrics", "m", "", "address to bind metrics API to\033[0m") - RootCmd.Flags().StringVar(&redisURL, "redis", "redis://localhost:6379", "connections string for a redis store\033[0m") - RootCmd.Flags().BoolVarP(&testing, "testing", "t", false, "enable testing to start without redis\033[0m") - RootCmd.Flags().IntVar(&redisPoolSize, "redis-pool-size", 10, "redis connection pool size\033[0m") + RootCmd.Flags().StringVarP(&logLvl, "loglvl", "l", "", "set log level one of: info, error, warn, debug, trace, panic") } // RootCmd contains the root command @@ -65,7 +46,7 @@ var RootCmd = &cobra.Command{ Use: func() string { return strings.Split(filepath.Base(strings.ReplaceAll(strings.ReplaceAll(fmt.Sprintf("%v", os.Args), "[", ""), "]", "")), " ")[0] }(), - Short: "Network monitor for skywire VPN and Visor.", + Short: "DMSG monitor of DMSG discovery entries.", Long: ` ┌┐┌┌─┐┌┬┐┬ ┬┌─┐┬─┐┬┌─ ┌┬┐┌─┐┌┐┌┬┌┬┐┌─┐┬─┐ │││├┤ │ ││││ │├┬┘├┴┐───││││ │││││ │ │ │├┬┘ @@ -80,89 +61,57 @@ var RootCmd = &cobra.Command{ log.Printf("Failed to output build info: %v", err) } - if !strings.HasPrefix(redisURL, redisScheme) { - redisURL = redisScheme + redisURL - } - - storeConfig := storeconfig.Config{ - Type: storeconfig.Redis, - URL: redisURL, - Password: storeconfig.RedisPassword(), - PoolSize: redisPoolSize, - } - - if testing { - storeConfig.Type = storeconfig.Memory - } - - s, err := store.New(storeConfig) - if err != nil { - log.Fatal("Failed to initialize redis store: ", err) - } - mLogger := logging.NewMasterLogger() - lvl, err := logging.LevelFromString(logLvl) + conf, err := monitors.ReadConfig(confPath) if err != nil { - mLogger.Fatal("Invalid loglvl detected") + mLogger.Fatal("Invalid config file") } - logging.SetLevel(lvl) - conf := api.InitConfig(confPath, mLogger) - - if sdURL == "" { - sdURL = conf.Launcher.ServiceDisc + // use overwrite config values if flags not set + if dmsgURL == "" { + dmsgURL = conf.DMSGUrl + } + if utURL == "" { + utURL = conf.UTUrl + "/uptimes" } if arURL == "" { - arURL = conf.Transport.AddressResolver + arURL = conf.ARUrl } - if utURL == "" { - utURL = conf.UptimeTracker.Addr + "/uptimes" + if addr == "" { + addr = conf.Addr + } + if sleepDeregistration == 0 { + sleepDeregistration = conf.SleepDeregistration + } + if logLvl == "" { + logLvl = conf.LogLevel } - var srvURLs api.ServicesURLs - srvURLs.SD = sdURL - srvURLs.AR = arURL - srvURLs.UT = utURL - - logger := mLogger.PackageLogger("network_monitor") - - logger.WithField("addr", addr).Info("Serving discovery API...") - - metricsutil.ServeHTTPMetrics(logger, metricsAddr) - - var m nmmetrics.Metrics - if metricsAddr == "" { - m = nmmetrics.NewEmpty() - } else { - m = nmmetrics.NewVictoriaMetrics() + lvl, err := logging.LevelFromString(logLvl) + if err != nil { + mLogger.Fatal("Invalid log level") } - enableMetrics := metricsAddr != "" + logging.SetLevel(lvl) - nmSign, _ := cipher.SignPayload([]byte(conf.PK.Hex()), conf.SK) //nolint + logger := mLogger.PackageLogger(tag) - var nmConfig api.NetworkMonitorConfig - nmConfig.PK = conf.PK - nmConfig.SK = conf.SK - nmConfig.Sign = nmSign - nmConfig.BatchSize = batchSize + logger.WithField("addr", addr).Info("Serving DMSG-Monitor API...") - nmAPI := api.New(s, logger, srvURLs, enableMetrics, m, nmConfig) + monitorSign, _ := cipher.SignPayload([]byte(conf.PK.Hex()), conf.SK) //nolint - ctx, cancel := cmdutil.SignalContext(context.Background(), logger) - defer cancel() + var monitorConfig api.MonitorConfig + monitorConfig.PK = conf.PK + monitorConfig.Sign = monitorSign + monitorConfig.DMSG = dmsgURL + monitorConfig.UT = utURL + monitorConfig.AR = arURL - go nmAPI.InitDeregistrationLoop(ctx, conf, sleepDeregistration) + dmsgMonitorAPI := api.New(logger, monitorConfig) - go func() { - if err := tcpproxy.ListenAndServe(addr, nmAPI); err != nil { - logger.Errorf("serve: %v", err) - cancel() - } - }() + go dmsgMonitorAPI.InitDeregistrationLoop(sleepDeregistration) - <-ctx.Done() - if err := nmAPI.Visor.Close(); err != nil { - logger.WithError(err).Error("Visor closed with error.") + if err := tcpproxy.ListenAndServe(addr, dmsgMonitorAPI); err != nil { + logger.Errorf("serve: %v", err) } }, } diff --git a/cmd/network-monitor/network-monitor.go b/cmd/network-monitor/network-monitor.go index 02b1aea1..fb353c8a 100644 --- a/cmd/network-monitor/network-monitor.go +++ b/cmd/network-monitor/network-monitor.go @@ -11,7 +11,7 @@ import ( func init() { var helpflag bool commands.RootCmd.SetUsageTemplate(help) - commands.RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsgpty-cli") + commands.RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsg-monitor") commands.RootCmd.SetHelpCommand(&cobra.Command{Hidden: true}) commands.RootCmd.PersistentFlags().MarkHidden("help") //nolint } diff --git a/pkg/dmsg-monitor/api/api.go b/pkg/dmsg-monitor/api/api.go deleted file mode 100644 index b3afb5aa..00000000 --- a/pkg/dmsg-monitor/api/api.go +++ /dev/null @@ -1,410 +0,0 @@ -// Package api pkg/dmsg-monitor/api.go -package api - -import ( - "bytes" - "encoding/json" - "fmt" - "io" - "net/http" - "net/url" - "strings" - "sync" - "time" - - "github.com/go-chi/chi/v5" - "github.com/go-chi/chi/v5/middleware" - "github.com/sirupsen/logrus" - "github.com/skycoin/skywire-utilities/pkg/buildinfo" - "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/httputil" - "github.com/skycoin/skywire-utilities/pkg/logging" - utilenv "github.com/skycoin/skywire-utilities/pkg/skyenv" -) - -// API register all the API endpoints. -// It implements a net/http.Handler. -type API struct { - http.Handler - - dmsgURL string - utURL string - arURL string - logger logging.Logger - dMu sync.RWMutex - startedAt time.Time - - nmPk cipher.PubKey - nmSign cipher.Sig - whitelistedPKs map[string]bool -} - -// MonitorConfig is struct for Keys and Sign value of network monitor -type MonitorConfig struct { - PK cipher.PubKey - Sign cipher.Sig - DMSG string - UT string - AR string -} - -// HealthCheckResponse is struct of /health endpoint -type HealthCheckResponse struct { - BuildInfo *buildinfo.Info `json:"build_info,omitempty"` - StartedAt time.Time `json:"started_at,omitempty"` -} - -// Error is the object returned to the client when there's an error. -type Error struct { - Error string `json:"error"` -} - -// New returns a new *chi.Mux object, which can be started as a server -func New(logger *logging.Logger, monitorConfig MonitorConfig) *API { - - api := &API{ - logger: *logger, - startedAt: time.Now(), - nmPk: monitorConfig.PK, - nmSign: monitorConfig.Sign, - dmsgURL: monitorConfig.DMSG, - utURL: monitorConfig.UT, - arURL: monitorConfig.AR, - whitelistedPKs: whitelistedPKs(), - } - r := chi.NewRouter() - - r.Use(middleware.RequestID) - r.Use(middleware.RealIP) - r.Use(middleware.Logger) - r.Use(middleware.Recoverer) - r.Use(httputil.SetLoggerMiddleware(logger)) - r.Get("/health", api.health) - api.Handler = r - - return api -} - -func (api *API) health(w http.ResponseWriter, r *http.Request) { - info := buildinfo.Get() - api.writeJSON(w, r, http.StatusOK, HealthCheckResponse{ - BuildInfo: info, - StartedAt: api.startedAt, - }) -} - -func (api *API) writeJSON(w http.ResponseWriter, r *http.Request, code int, object interface{}) { - jsonObject, err := json.Marshal(object) - if err != nil { - api.log(r).WithError(err).Errorf("failed to encode json response") - w.WriteHeader(http.StatusInternalServerError) - - return - } - - w.Header().Set("Content-Type", "application/json") - w.WriteHeader(code) - - _, err = w.Write(jsonObject) - if err != nil { - api.log(r).WithError(err).Errorf("failed to write json response") - } -} - -func (api *API) log(r *http.Request) logrus.FieldLogger { - return httputil.GetLogger(r) -} - -// InitDeregistrationLoop is function which runs periodic background tasks of API. -func (api *API) InitDeregistrationLoop(sleepDeregistration time.Duration) { - deadDmsgCandidate := make(map[string]bool) - deadStcprCandidate := make(map[string]bool) - deadSudphCandidate := make(map[string]bool) - for { - api.deregister(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate) - time.Sleep(sleepDeregistration * time.Minute) - } -} - -// deregister use as routine to deregister old/dead entries in the network -func (api *API) deregister(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate map[string]bool) { - api.logger.Info("Deregistration routine started.") - defer api.dMu.Unlock() - api.dMu.Lock() - - // get uptimes data to check online/offline of visor based on uptime tracker - uptimes, err := getUptimeTracker(api.utURL) - if err != nil { - api.logger.Warnf("Error occur during get uptime tracker status list due to %s", err) - return - } - - api.networkDeregistration(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate, uptimes) - - api.logger.Info("Deregistration routine completed.") -} - -// dmsgDeregistration is a routine to deregister dead dmsg entries in dmsg discovery -func (api *API) networkDeregistration(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate, uptimes map[string]bool) { - conf := checkerConfig{ - wg: new(sync.WaitGroup), - locker: new(sync.Mutex), - uptimes: uptimes, - } - api.dmsgDeregistration(conf, deadDmsgCandidate) - api.arDeregistration(conf, deadStcprCandidate, deadSudphCandidate) -} - -func (api *API) dmsgDeregistration(conf checkerConfig, deadDmsgCandidate map[string]bool) { - api.logger.Info("DMSG deregistraion routine started.") - // get list of all dmsg clients, not servers - dmsgEntries, err := getDMSGEntries(api.dmsgURL) - if err != nil { - api.logger.Warnf("Error occur during get dmsg entries list due to %s", err) - return - } - - // DMSG deregistration - deadDmsg := []string{} - for _, entry := range dmsgEntries { - if _, ok := api.whitelistedPKs[entry]; !ok { - conf.wg.Add(1) - conf.entry = entry - go api.entryChecker(conf, deadDmsgCandidate, &deadDmsg) - } - } - conf.wg.Wait() - if len(deadDmsg) > 0 { - api.dmsgDeregister(deadDmsg) - } - api.logger.WithField("List of dead DMSG entries", deadDmsg).WithField("Number of dead DMSG entries", len(deadDmsg)).Info("DMSGD Deregistration completed.") - api.logger.Info("DMSG deregistraion routine completed.") -} - -func (api *API) arDeregistration(conf checkerConfig, deadStcprCandidate, deadSudphCandidate map[string]bool) { - api.logger.Info("AR deregistraion routine started.") - // get list of all ar entries - arEntries, err := getAREntries(api.arURL) - if err != nil { - api.logger.Warnf("error occur during get ar entries list due to %s", err) - return - } - - // STCPR deregistration - deadStcpr := []string{} - for _, entry := range arEntries.Stcpr { - if _, ok := api.whitelistedPKs[entry]; !ok { - conf.wg.Add(1) - conf.entry = entry - go api.entryChecker(conf, deadStcprCandidate, &deadStcpr) - } - } - conf.wg.Wait() - if len(deadStcpr) > 0 { - api.arDeregister(deadStcpr, "stcpr") - } - api.logger.WithField("list of dead stcpr entries", deadStcpr).WithField("number of dead stcpr entries", len(deadStcpr)).Info("stcpr deregistration completed.") - - // SUDPH deregistration - deadSudph := []string{} - for _, entry := range arEntries.Sudph { - if _, ok := api.whitelistedPKs[entry]; !ok { - conf.wg.Add(1) - conf.entry = entry - go api.entryChecker(conf, deadSudphCandidate, &deadSudph) - } - } - conf.wg.Wait() - if len(deadSudph) > 0 { - api.arDeregister(deadSudph, "sudph") - } - api.logger.WithField("list of dead sudph entries", deadSudph).WithField("number of dead sudph entries", len(deadSudph)).Info("sudph deregistration completed.") - api.logger.Info("AR deregistraion routine completed.") -} - -func (api *API) entryChecker(cfg checkerConfig, deadCandidate map[string]bool, deadEntries *[]string) { - defer cfg.wg.Done() - - key := cipher.PubKey{} - err := key.UnmarshalText([]byte(cfg.entry)) - if err != nil { - api.logger.Warnf("Error marshaling key: %s", err) - return - } - - if status, ok := cfg.uptimes[key.Hex()]; !ok || !status { - cfg.locker.Lock() - if _, ok := deadCandidate[key.Hex()]; ok { - *deadEntries = append(*deadEntries, key.Hex()) - delete(deadCandidate, key.Hex()) - } else { - deadCandidate[key.Hex()] = true - } - cfg.locker.Unlock() - } -} - -func (api *API) dmsgDeregister(keys []string) { - err := api.deregisterRequest(keys, api.dmsgURL+"/dmsg-discovery/deregister", "dmsg discovery") - if err != nil { - api.logger.Warn(err) - return - } - api.logger.Info("Deregister request send to DSMGD") -} - -func (api *API) arDeregister(keys []string, entryType string) { - err := api.deregisterRequest(keys, fmt.Sprintf(api.arURL+"/deregister/%s", entryType), "address resolver") - if err != nil { - api.logger.Warn(err) - return - } - api.logger.Info("Deregister request send to DSMGD") -} - -type checkerConfig struct { - entry string - uptimes map[string]bool - wg *sync.WaitGroup - locker *sync.Mutex -} - -// deregisterRequest is dereigstration handler for all services -func (api *API) deregisterRequest(keys []string, rawReqURL, service string) error { - reqURL, err := url.Parse(rawReqURL) - if err != nil { - return fmt.Errorf("Error on parsing deregistration URL : %v", err) - } - - jsonData, err := json.Marshal(keys) - if err != nil { - return fmt.Errorf("Error on parsing deregistration keys : %v", err) - } - body := bytes.NewReader(jsonData) - - req := &http.Request{ - Method: "DELETE", - URL: reqURL, - Header: map[string][]string{ - "NM-PK": {api.nmPk.Hex()}, - "NM-Sign": {api.nmSign.Hex()}, - }, - Body: io.NopCloser(body), - } - - res, err := http.DefaultClient.Do(req) - if err != nil { - return fmt.Errorf("Error on send deregistration request : %s", err) - } - defer res.Body.Close() //nolint - - if res.StatusCode != http.StatusOK { - return fmt.Errorf("Error deregister keys from %s : %s", service, err) - } - - return nil -} - -type clientList []string - -func getDMSGEntries(dmsgURL string) (data clientList, err error) { - res, err := http.Get(dmsgURL + "/dmsg-discovery/visorEntries") //nolint - - if err != nil { - return nil, err - } - - body, err := io.ReadAll(res.Body) - - if err != nil { - return nil, err - } - - err = json.Unmarshal(body, &data) - if err != nil { - return nil, err - } - return data, nil -} - -func getUptimeTracker(utURL string) (map[string]bool, error) { - response := make(map[string]bool) - res, err := http.Get(utURL) //nolint - if err != nil { - return response, err - } - - body, err := io.ReadAll(res.Body) - if err != nil { - return response, err - } - var data []uptimes - err = json.Unmarshal(body, &data) - if err != nil { - return response, err - } - - for _, visor := range data { - response[visor.Key] = visor.Online - } - - return response, nil -} - -type uptimes struct { - Key string `json:"key"` - Online bool `json:"online"` -} - -type visorTransports struct { - Sudph []string `json:"sudph"` - Stcpr []string `json:"stcpr"` -} - -func getAREntries(arURL string) (data visorTransports, err error) { - res, err := http.Get(arURL + "/transports") //nolint - if err != nil { - return visorTransports{}, err - } - - body, err := io.ReadAll(res.Body) - if err != nil { - return visorTransports{}, err - } - - err = json.Unmarshal(body, &data) - if err != nil { - return visorTransports{}, err - } - - return data, err -} - -func whitelistedPKs() map[string]bool { - whitelistedPKs := make(map[string]bool) - for _, pk := range strings.Split(utilenv.NetworkMonitorPKs, ",") { - whitelistedPKs[pk] = true - } - for _, pk := range strings.Split(utilenv.TestNetworkMonitorPKs, ",") { - whitelistedPKs[pk] = true - } - for _, pk := range strings.Split(utilenv.RouteSetupPKs, ",") { - whitelistedPKs[pk] = true - } - for _, pk := range strings.Split(utilenv.TestRouteSetupPKs, ",") { - whitelistedPKs[pk] = true - } - for _, pk := range strings.Split(utilenv.TPSetupPKs, ",") { - whitelistedPKs[pk] = true - } - for _, pk := range strings.Split(utilenv.TestTPSetupPKs, ",") { - whitelistedPKs[pk] = true - } - for _, pk := range strings.Split(utilenv.SurveyWhitelistPKs, ",") { - whitelistedPKs[pk] = true - } - for _, pk := range strings.Split(utilenv.RewardSystemPKs, ",") { - whitelistedPKs[pk] = true - } - return whitelistedPKs -} diff --git a/pkg/network-monitor/api/api.go b/pkg/network-monitor/api/api.go index f337fa26..a3b915dd 100644 --- a/pkg/network-monitor/api/api.go +++ b/pkg/network-monitor/api/api.go @@ -3,7 +3,6 @@ package api import ( "bytes" - "context" "encoding/json" "fmt" "io" @@ -20,16 +19,7 @@ import ( "github.com/skycoin/skywire-utilities/pkg/cipher" "github.com/skycoin/skywire-utilities/pkg/httputil" "github.com/skycoin/skywire-utilities/pkg/logging" - "github.com/skycoin/skywire-utilities/pkg/metricsutil" utilenv "github.com/skycoin/skywire-utilities/pkg/skyenv" - "github.com/skycoin/skywire/pkg/app/appserver" - "github.com/skycoin/skywire/pkg/transport/network" - "github.com/skycoin/skywire/pkg/visor" - "github.com/skycoin/skywire/pkg/visor/visorconfig" - - "github.com/skycoin/skywire-services/internal/nm" - "github.com/skycoin/skywire-services/internal/nmmetrics" - "github.com/skycoin/skywire-services/pkg/network-monitor/store" ) // API register all the API endpoints. @@ -37,46 +27,25 @@ import ( type API struct { http.Handler - Visor *visor.Visor - VisorSummaries map[cipher.PubKey]nm.VisorSummary - - visorDetails map[cipher.PubKey]visorDetails - sdURL string - arURL string - utURL string - logger logging.Logger - store store.Store - mu sync.RWMutex - dMu sync.RWMutex - startedAt time.Time - - reqsInFlightCountMiddleware *metricsutil.RequestsInFlightCountMiddleware - metrics nmmetrics.Metrics - nmPk cipher.PubKey - nmSk cipher.SecKey - nmSign cipher.Sig - batchSize int - whitelistedPKs map[string]bool -} - -type visorDetails struct { - IsOnline bool - IsStcpr bool -} + dmsgURL string + utURL string + arURL string + logger logging.Logger + dMu sync.RWMutex + startedAt time.Time -// NetworkMonitorConfig is struct for Keys and Sign value of NM -type NetworkMonitorConfig struct { - PK cipher.PubKey - SK cipher.SecKey - Sign cipher.Sig - BatchSize int + nmPk cipher.PubKey + nmSign cipher.Sig + whitelistedPKs map[string]bool } -// ServicesURLs is struct for organize URL of services -type ServicesURLs struct { - SD string - AR string - UT string +// MonitorConfig is struct for Keys and Sign value of network monitor +type MonitorConfig struct { + PK cipher.PubKey + Sign cipher.Sig + DMSG string + UT string + AR string } // HealthCheckResponse is struct of /health endpoint @@ -91,24 +60,17 @@ type Error struct { } // New returns a new *chi.Mux object, which can be started as a server -func New(s store.Store, logger *logging.Logger, srvURLs ServicesURLs, enableMetrics bool, m nmmetrics.Metrics, nmConfig NetworkMonitorConfig) *API { +func New(logger *logging.Logger, monitorConfig MonitorConfig) *API { api := &API{ - VisorSummaries: make(map[cipher.PubKey]nm.VisorSummary), - visorDetails: make(map[cipher.PubKey]visorDetails), - sdURL: srvURLs.SD, - arURL: srvURLs.AR, - utURL: srvURLs.UT, - logger: *logger, - store: s, - startedAt: time.Now(), - reqsInFlightCountMiddleware: metricsutil.NewRequestsInFlightCountMiddleware(), - metrics: m, - nmPk: nmConfig.PK, - nmSk: nmConfig.SK, - nmSign: nmConfig.Sign, - batchSize: nmConfig.BatchSize, - whitelistedPKs: whitelistedPKs(), + logger: *logger, + startedAt: time.Now(), + nmPk: monitorConfig.PK, + nmSign: monitorConfig.Sign, + dmsgURL: monitorConfig.DMSG, + utURL: monitorConfig.UT, + arURL: monitorConfig.AR, + whitelistedPKs: whitelistedPKs(), } r := chi.NewRouter() @@ -116,28 +78,13 @@ func New(s store.Store, logger *logging.Logger, srvURLs ServicesURLs, enableMetr r.Use(middleware.RealIP) r.Use(middleware.Logger) r.Use(middleware.Recoverer) - if enableMetrics { - r.Use(api.reqsInFlightCountMiddleware.Handle) - r.Use(metricsutil.RequestDurationMiddleware) - } r.Use(httputil.SetLoggerMiddleware(logger)) - r.Get("/status", api.getStatus) r.Get("/health", api.health) api.Handler = r return api } -func (api *API) getStatus(w http.ResponseWriter, r *http.Request) { - data, err := api.store.GetAllSummaries() - if err != nil { - api.logger.WithError(err).Warnf("Error Getting all summaries") - } - if err := json.NewEncoder(w).Encode(data); err != nil { - api.writeError(w, r, err) - } -} - func (api *API) health(w http.ResponseWriter, r *http.Request) { info := buildinfo.Get() api.writeJSON(w, r, http.StatusOK, HealthCheckResponse{ @@ -164,66 +111,27 @@ func (api *API) writeJSON(w http.ResponseWriter, r *http.Request, code int, obje } } -// ServeHTTP implements http.Handler. -func (api *API) writeError(w http.ResponseWriter, r *http.Request, err error) { - var status int - - if err == context.DeadlineExceeded { - status = http.StatusRequestTimeout - } - - // we still haven't found the error - if status == 0 { - if _, ok := err.(*json.SyntaxError); ok { - status = http.StatusBadRequest - } - } - - // we fallback to 500 - if status == 0 { - status = http.StatusInternalServerError - } - - if status != http.StatusNotFound { - api.log(r).Warnf("%d: %s", status, err) - } - - w.WriteHeader(status) - w.Header().Set("Content-Type", "application/json") - if err := json.NewEncoder(w).Encode(&Error{Error: err.Error()}); err != nil { - api.log(r).WithError(err).Warn("Failed to encode error") - } -} - func (api *API) log(r *http.Request) logrus.FieldLogger { return httputil.GetLogger(r) } // InitDeregistrationLoop is function which runs periodic background tasks of API. -func (api *API) InitDeregistrationLoop(ctx context.Context, conf *visorconfig.V1, sleepDeregistration time.Duration) { - // Start a visor - api.startVisor(ctx, conf) - +func (api *API) InitDeregistrationLoop(sleepDeregistration time.Duration) { + deadDmsgCandidate := make(map[string]bool) + deadStcprCandidate := make(map[string]bool) + deadSudphCandidate := make(map[string]bool) for { - select { - case <-ctx.Done(): - return - default: - api.deregister(ctx) - time.Sleep(sleepDeregistration * time.Minute) - } + api.deregister(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate) + time.Sleep(sleepDeregistration * time.Minute) } } // deregister use as routine to deregister old/dead entries in the network -func (api *API) deregister(ctx context.Context) { - api.logger.Info("Deregistration routine start.") +func (api *API) deregister(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate map[string]bool) { + api.logger.Info("Deregistration routine started.") defer api.dMu.Unlock() api.dMu.Lock() - // reload keys - api.getVisorKeys() - // get uptimes data to check online/offline of visor based on uptime tracker uptimes, err := getUptimeTracker(api.utURL) if err != nil { @@ -231,167 +139,134 @@ func (api *API) deregister(ctx context.Context) { return } - api.arDeregistration(ctx, uptimes) + api.networkDeregistration(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate, uptimes) api.logger.Info("Deregistration routine completed.") - - // reload keys - api.getVisorKeys() } -// arDeregistration is a routine to deregister dead entries in address resolver transports -func (api *API) arDeregistration(ctx context.Context, uptimes map[string]bool) { - api.logger.Info("AR Deregistration started.") - allSudphCount, allStcprCount := 0, 0 - arKeys := make(map[cipher.PubKey]visorDetails) - for key, details := range api.visorDetails { - arKeys[key] = details - - if details.IsStcpr { - allStcprCount++ - } - if details.IsOnline { - allSudphCount++ - } - } - if len(arKeys) == 0 { - api.logger.Warn("No visor keys found") - return - } - - checkRes := arCheckerResult{ - deadStcpr: &[]string{}, - deadSudph: &[]string{}, - } - - checkConf := arChekerConfig{ - ctx: ctx, +// dmsgDeregistration is a routine to deregister dead dmsg entries in dmsg discovery +func (api *API) networkDeregistration(deadDmsgCandidate, deadStcprCandidate, deadSudphCandidate, uptimes map[string]bool) { + conf := checkerConfig{ wg: new(sync.WaitGroup), + locker: new(sync.Mutex), uptimes: uptimes, } + api.dmsgDeregistration(conf, deadDmsgCandidate) + api.arDeregistration(conf, deadStcprCandidate, deadSudphCandidate) +} - tmpBatchSize := 0 - for key, details := range arKeys { - if _, ok := api.whitelistedPKs[key.Hex()]; ok { - continue - } - tmpBatchSize++ - checkConf.wg.Add(1) - checkConf.key = key - checkConf.details = details - go api.arChecker(checkConf, &checkRes) - if tmpBatchSize == api.batchSize { - time.Sleep(time.Minute) - tmpBatchSize = 0 - } +func (api *API) dmsgDeregistration(conf checkerConfig, deadDmsgCandidate map[string]bool) { + api.logger.Info("DMSG deregistraion routine started.") + // get list of all dmsg clients, not servers + dmsgEntries, err := getDMSGEntries(api.dmsgURL) + if err != nil { + api.logger.Warnf("Error occur during get dmsg entries list due to %s", err) + return } - checkConf.wg.Wait() - - stcprCounter := int64(allStcprCount - len(*checkRes.deadStcpr)) - sudphCounter := int64(allSudphCount - len(*checkRes.deadSudph)) - api.logger.WithField("sudph", sudphCounter).WithField("stcpr", stcprCounter).Info("Transports online.") - api.metrics.SetTpCount(stcprCounter, sudphCounter) - - if len(*checkRes.deadStcpr) > 0 { - api.arDeregister(*checkRes.deadStcpr, "stcpr") + // DMSG deregistration + deadDmsg := []string{} + for _, entry := range dmsgEntries { + if _, ok := api.whitelistedPKs[entry]; !ok { + conf.wg.Add(1) + conf.entry = entry + go api.entryChecker(conf, deadDmsgCandidate, &deadDmsg) + } } - api.logger.WithField("Number of dead Stcpr", len(*checkRes.deadStcpr)).WithField("PKs", checkRes.deadStcpr).Info("STCPR deregistration complete.") - - if len(*checkRes.deadSudph) > 0 { - api.arDeregister(*checkRes.deadSudph, "sudph") + conf.wg.Wait() + if len(deadDmsg) > 0 { + api.dmsgDeregister(deadDmsg) } - api.logger.WithField("Number of dead Sudph", len(*checkRes.deadSudph)).WithField("PKs", checkRes.deadSudph).Info("SUDPH deregistration complete.") - - api.logger.Info("AR Deregistration completed.") + api.logger.WithField("List of dead DMSG entries", deadDmsg).WithField("Number of dead DMSG entries", len(deadDmsg)).Info("DMSGD Deregistration completed.") + api.logger.Info("DMSG deregistraion routine completed.") } -func (api *API) arChecker(cfg arChekerConfig, res *arCheckerResult) { - defer cfg.wg.Done() - visorSum, err := api.store.GetVisorByPk(cfg.key.String()) +func (api *API) arDeregistration(conf checkerConfig, deadStcprCandidate, deadSudphCandidate map[string]bool) { + api.logger.Info("AR deregistraion routine started.") + // get list of all ar entries + arEntries, err := getAREntries(api.arURL) if err != nil { - api.logger.WithError(err).Debugf("Failed to fetch visor summary of PK %s in AR deregister procces.", cfg.key.Hex()) - if err != store.ErrVisorSumNotFound { - return - } + api.logger.Warnf("error occur during get ar entries list due to %s", err) + return } - stcprC := make(chan bool) - sudphC := make(chan bool) - if cfg.details.IsStcpr { - go api.testTransport(cfg.key, network.STCPR, stcprC) + // STCPR deregistration + deadStcpr := []string{} + for _, entry := range arEntries.Stcpr { + if _, ok := api.whitelistedPKs[entry]; !ok { + conf.wg.Add(1) + conf.entry = entry + go api.entryChecker(conf, deadStcprCandidate, &deadStcpr) + } } - if cfg.details.IsOnline { - go api.testTransport(cfg.key, network.SUDPH, sudphC) + conf.wg.Wait() + if len(deadStcpr) > 0 { + api.arDeregister(deadStcpr, "stcpr") } + api.logger.WithField("list of dead stcpr entries", deadStcpr).WithField("number of dead stcpr entries", len(deadStcpr)).Info("stcpr deregistration completed.") - if cfg.details.IsStcpr { - visorSum.Stcpr = <-stcprC - } - if cfg.details.IsOnline { - visorSum.Sudph = <-sudphC + // SUDPH deregistration + deadSudph := []string{} + for _, entry := range arEntries.Sudph { + if _, ok := api.whitelistedPKs[entry]; !ok { + conf.wg.Add(1) + conf.entry = entry + go api.entryChecker(conf, deadSudphCandidate, &deadSudph) + } } - visorSum.Timestamp = time.Now().Unix() - api.mu.Lock() - err = api.store.AddVisorSummary(cfg.ctx, cfg.key, visorSum) - if err != nil { - api.logger.WithError(err).Warnf("Failed to save Visor summary of %v", cfg.key) + conf.wg.Wait() + if len(deadSudph) > 0 { + api.arDeregister(deadSudph, "sudph") } + api.logger.WithField("list of dead sudph entries", deadSudph).WithField("number of dead sudph entries", len(deadSudph)).Info("sudph deregistration completed.") + api.logger.Info("AR deregistraion routine completed.") +} - if cfg.details.IsStcpr && !visorSum.Stcpr { - *res.deadStcpr = append(*res.deadStcpr, cfg.key.Hex()) - } +func (api *API) entryChecker(cfg checkerConfig, deadCandidate map[string]bool, deadEntries *[]string) { + defer cfg.wg.Done() - if cfg.details.IsOnline && !visorSum.Sudph { - *res.deadSudph = append(*res.deadSudph, cfg.key.Hex()) + key := cipher.PubKey{} + err := key.UnmarshalText([]byte(cfg.entry)) + if err != nil { + api.logger.Warnf("Error marshaling key: %s", err) + return } - api.mu.Unlock() -} - -func (api *API) testTransport(key cipher.PubKey, transport network.Type, ch chan bool) { - var isUp bool - retrier := 3 - for retrier > 0 { - tp, err := api.Visor.AddTransport(key, string(transport), time.Second*3) - if err != nil { - api.logger.WithField("Retry", 4-retrier).WithError(err).Warnf("Failed to establish %v transport to %v", transport, key) - retrier-- - continue - } - - api.logger.Infof("Established %v transport to %v", transport, key) - isUp = true - err = api.Visor.RemoveTransport(tp.ID) - if err != nil { - api.logger.Warnf("Error removing %v transport of %v: %v", transport, key, err) + if status, ok := cfg.uptimes[key.Hex()]; !ok || !status { + cfg.locker.Lock() + if _, ok := deadCandidate[key.Hex()]; ok { + *deadEntries = append(*deadEntries, key.Hex()) + delete(deadCandidate, key.Hex()) + } else { + deadCandidate[key.Hex()] = true } - retrier = 0 + cfg.locker.Unlock() } - - ch <- isUp } -func (api *API) arDeregister(keys []string, transport string) { - err := api.deregisterRequest(keys, fmt.Sprintf(api.arURL+"/deregister/%s", transport), "address resolver") +func (api *API) dmsgDeregister(keys []string) { + err := api.deregisterRequest(keys, api.dmsgURL+"/dmsg-discovery/deregister", "dmsg discovery") if err != nil { api.logger.Warn(err) return } - api.logger.Info("Deregister request send to AR") + api.logger.Info("Deregister request send to DSMGD") } -type arChekerConfig struct { - ctx context.Context - wg *sync.WaitGroup - key cipher.PubKey - details visorDetails - uptimes map[string]bool +func (api *API) arDeregister(keys []string, entryType string) { + err := api.deregisterRequest(keys, fmt.Sprintf(api.arURL+"/deregister/%s", entryType), "address resolver") + if err != nil { + api.logger.Warn(err) + return + } + api.logger.Info("Deregister request send to DSMGD") } -type arCheckerResult struct { - deadStcpr *[]string - deadSudph *[]string +type checkerConfig struct { + entry string + uptimes map[string]bool + wg *sync.WaitGroup + locker *sync.Mutex } // deregisterRequest is dereigstration handler for all services @@ -430,28 +305,26 @@ func (api *API) deregisterRequest(keys []string, rawReqURL, service string) erro return nil } -type visorTransports struct { - Sudph []cipher.PubKey `json:"sudph"` - Stcpr []cipher.PubKey `json:"stcpr"` -} +type clientList []string -func getVisors(arURL string) (data visorTransports, err error) { - res, err := http.Get(arURL + "/transports") //nolint +func getDMSGEntries(dmsgURL string) (data clientList, err error) { + res, err := http.Get(dmsgURL + "/dmsg-discovery/visorEntries") //nolint if err != nil { - return visorTransports{}, err + return nil, err } body, err := io.ReadAll(res.Body) if err != nil { - return visorTransports{}, err + return nil, err } + err = json.Unmarshal(body, &data) if err != nil { - return visorTransports{}, err + return nil, err } - return data, err + return data, nil } func getUptimeTracker(utURL string) (map[string]bool, error) { @@ -483,103 +356,28 @@ type uptimes struct { Online bool `json:"online"` } -func (api *API) getVisorKeys() { - api.visorDetails = make(map[cipher.PubKey]visorDetails) - visorTs, err := getVisors(api.arURL) - if err != nil { - api.logger.Warnf("Error while fetching visors: %v", err) - return - } - if len(visorTs.Stcpr) == 0 && len(visorTs.Sudph) == 0 { - api.logger.Warn("No visors found... Will try again") - } - for _, visorPk := range visorTs.Stcpr { - if visorPk != api.nmPk { - detail := api.visorDetails[visorPk] - detail.IsStcpr = true - api.visorDetails[visorPk] = detail - } - } - for _, visorPk := range visorTs.Sudph { - if visorPk != api.nmPk { - detail := api.visorDetails[visorPk] - detail.IsOnline = true - api.visorDetails[visorPk] = detail - } - } - - api.logger.WithField("visors", len(api.visorDetails)).Info("Visor keys updated.") - api.metrics.SetTotalVisorCount(int64(len(api.visorDetails))) -} - -func (api *API) startVisor(ctx context.Context, conf *visorconfig.V1) { - conf.SetLogger(logging.NewMasterLogger()) - v, ok := visor.NewVisor(ctx, conf) - if !ok { - api.logger.Fatal("Failed to start visor.") - } - api.Visor = v +type visorTransports struct { + Sudph []string `json:"sudph"` + Stcpr []string `json:"stcpr"` } -// InitConfig to initilise config -func InitConfig(confPath string, mLog *logging.MasterLogger) *visorconfig.V1 { - log := mLog.PackageLogger("network_monitor:config") - log.Info("Reading config from file.") - log.WithField("filepath", confPath).Info() - - oldConf, err := visorconfig.ReadFile(confPath) - if err != nil { - log.WithError(err).Fatal("Failed to read config file.") - } - var testEnv bool - if oldConf.Dmsg.Discovery == utilenv.TestDmsgDiscAddr { - testEnv = true - } - // have same services as old config - services := &visorconfig.Services{ - DmsgDiscovery: oldConf.Dmsg.Discovery, - TransportDiscovery: oldConf.Transport.Discovery, - AddressResolver: oldConf.Transport.AddressResolver, - RouteFinder: oldConf.Routing.RouteFinder, - RouteSetupNodes: oldConf.Routing.RouteSetupNodes, - UptimeTracker: oldConf.UptimeTracker.Addr, - ServiceDiscovery: oldConf.Launcher.ServiceDisc, - } - // update oldconfig - conf, err := visorconfig.MakeDefaultConfig(mLog, &oldConf.SK, false, false, testEnv, false, false, confPath, "", services) +func getAREntries(arURL string) (data visorTransports, err error) { + res, err := http.Get(arURL + "/transports") //nolint if err != nil { - log.WithError(err).Fatal("Failed to create config.") + return visorTransports{}, err } - // have the same apps that the old config had - var newConfLauncherApps []appserver.AppConfig - for _, app := range conf.Launcher.Apps { - for _, oldApp := range oldConf.Launcher.Apps { - if app.Name == oldApp.Name { - newConfLauncherApps = append(newConfLauncherApps, app) - } - } + body, err := io.ReadAll(res.Body) + if err != nil { + return visorTransports{}, err } - conf.Launcher.Apps = newConfLauncherApps - - conf.Version = oldConf.Version - conf.LocalPath = oldConf.LocalPath - conf.Launcher.BinPath = oldConf.Launcher.BinPath - conf.Launcher.ServerAddr = oldConf.Launcher.ServerAddr - conf.CLIAddr = oldConf.CLIAddr - conf.Transport.TransportSetupPKs = oldConf.Transport.TransportSetupPKs - // following services are not needed - conf.STCP = nil - conf.Dmsgpty = nil - conf.Transport.PublicAutoconnect = false - - // save the config file - if err := conf.Flush(); err != nil { - log.WithError(err).Fatal("Failed to flush config to file.") + err = json.Unmarshal(body, &data) + if err != nil { + return visorTransports{}, err } - return conf + return data, err } func whitelistedPKs() map[string]bool { @@ -590,7 +388,23 @@ func whitelistedPKs() map[string]bool { for _, pk := range strings.Split(utilenv.TestNetworkMonitorPKs, ",") { whitelistedPKs[pk] = true } - whitelistedPKs[utilenv.RouteSetupPKs] = true - whitelistedPKs[utilenv.TestRouteSetupPKs] = true + for _, pk := range strings.Split(utilenv.RouteSetupPKs, ",") { + whitelistedPKs[pk] = true + } + for _, pk := range strings.Split(utilenv.TestRouteSetupPKs, ",") { + whitelistedPKs[pk] = true + } + for _, pk := range strings.Split(utilenv.TPSetupPKs, ",") { + whitelistedPKs[pk] = true + } + for _, pk := range strings.Split(utilenv.TestTPSetupPKs, ",") { + whitelistedPKs[pk] = true + } + for _, pk := range strings.Split(utilenv.SurveyWhitelistPKs, ",") { + whitelistedPKs[pk] = true + } + for _, pk := range strings.Split(utilenv.RewardSystemPKs, ",") { + whitelistedPKs[pk] = true + } return whitelistedPKs } diff --git a/pkg/network-monitor/store/memory_store.go b/pkg/network-monitor/store/memory_store.go deleted file mode 100644 index a94fb184..00000000 --- a/pkg/network-monitor/store/memory_store.go +++ /dev/null @@ -1,59 +0,0 @@ -// Package store pkg/network-monitor/store/memory_store.go -package store - -import ( - "context" - "errors" - "sync" - - "github.com/skycoin/skywire-utilities/pkg/cipher" - - "github.com/skycoin/skywire-services/internal/nm" -) - -type memStore struct { - visorSummaries map[string]*nm.VisorSummary - mu sync.RWMutex -} - -// newMemoryStore creates new uptimes memory store. -func newMemoryStore() Store { - return &memStore{ - visorSummaries: make(map[string]*nm.VisorSummary), - } -} - -func (s *memStore) AddVisorSummary(_ context.Context, key cipher.PubKey, visorSum *nm.VisorSummary) error { - s.mu.Lock() - defer s.mu.Unlock() - s.visorSummaries[key.String()] = visorSum - - return nil -} - -func (s *memStore) GetVisorByPk(pk string) (entry *nm.VisorSummary, err error) { - s.mu.Lock() - defer s.mu.Unlock() - sum, ok := s.visorSummaries[pk] - if !ok { - return &nm.VisorSummary{}, errors.New("No visor entry found") - } - return sum, nil -} - -func (s *memStore) GetAllSummaries() (map[string]nm.Summary, error) { - s.mu.Lock() - defer s.mu.Unlock() - response := make(map[string]nm.Summary) - - for key, visorSum := range s.visorSummaries { - response[key] = nm.Summary{ - Visor: visorSum, - } - } - return response, nil -} - -func (s *memStore) Close() { - -} diff --git a/pkg/network-monitor/store/memory_store_test.go b/pkg/network-monitor/store/memory_store_test.go deleted file mode 100644 index 1c6ba0be..00000000 --- a/pkg/network-monitor/store/memory_store_test.go +++ /dev/null @@ -1,13 +0,0 @@ -//go:build !no_ci -// +build !no_ci - -package store - -import ( - "testing" -) - -func TestMemory(t *testing.T) { - s := newMemoryStore() - testNetwork(t, s) -} diff --git a/pkg/network-monitor/store/redis_store.go b/pkg/network-monitor/store/redis_store.go deleted file mode 100644 index 06a4197f..00000000 --- a/pkg/network-monitor/store/redis_store.go +++ /dev/null @@ -1,101 +0,0 @@ -package store - -import ( - "context" - "encoding/json" - "fmt" - "log" - "strings" - - "github.com/go-redis/redis" - "github.com/skycoin/skywire-utilities/pkg/cipher" - - "github.com/skycoin/skywire-services/internal/nm" -) - -const ( - networkKey = "nm" - visorKey = "visor" -) - -type redisStore struct { - client *redis.Client -} - -func newRedisStore(addr, password string, poolSize int) (*redisStore, error) { - opt, err := redis.ParseURL(addr) - if err != nil { - return nil, fmt.Errorf("addr: %w", err) - } - - opt.Password = password - if poolSize != 0 { - opt.PoolSize = poolSize - } - redisCl := redis.NewClient(opt) - - if err := redisCl.Ping().Err(); err != nil { - log.Fatalf("Failed to connect to Redis cluster: %v", err) - } - - return &redisStore{redisCl}, nil -} - -func (s *redisStore) AddVisorSummary(_ context.Context, key cipher.PubKey, visorSum *nm.VisorSummary) error { - - data, err := json.Marshal(visorSum) - if err != nil { - return err - } - - if _, err := s.client.Set(nmKey(visorKey, key.String()), string(data), 0).Result(); err != nil { - return err - } - - return nil -} - -func (s *redisStore) GetVisorByPk(pk string) (entry *nm.VisorSummary, err error) { - data, err := s.client.Get(nmKey(visorKey, pk)).Result() - if err != nil { - return &nm.VisorSummary{}, ErrVisorSumNotFound - } - - if err := json.Unmarshal([]byte(data), &entry); err != nil { - return nil, err - } - - return entry, nil -} - -func (s *redisStore) GetAllSummaries() (map[string]nm.Summary, error) { - var visorKeys []string - var err error - response := make(map[string]nm.Summary) - - visorKeys, err = s.client.Keys(s.searchKey(visorKey)).Result() - if err != nil { - return response, err - } - - for _, visorKey := range visorKeys { - key := strings.Split(visorKey, ":") - vSum, err := s.GetVisorByPk(key[2]) - if err != nil { - return response, err - } - response[key[2]] = nm.Summary{ - Visor: vSum, - } - } - - return response, err -} - -func nmKey(t string, key string) string { - return fmt.Sprintf("nm:%v:%s", t, key) -} - -func (s *redisStore) searchKey(key string) string { - return fmt.Sprintf("*%v:%v*", networkKey, key) -} diff --git a/pkg/network-monitor/store/redis_store_test.go b/pkg/network-monitor/store/redis_store_test.go deleted file mode 100644 index 3824f395..00000000 --- a/pkg/network-monitor/store/redis_store_test.go +++ /dev/null @@ -1,54 +0,0 @@ -//go:build !no_ci -// +build !no_ci - -package store - -import ( - "context" - "testing" - "time" - - "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/stretchr/testify/require" - - "github.com/skycoin/skywire-services/internal/nm" -) - -func testNetwork(t *testing.T, store Store) { - visorSumObj := make(map[cipher.PubKey]*nm.VisorSummary) - - const iterations = 3 - for i := 0; i < iterations; i++ { - visorSummary := &nm.VisorSummary{ - Sudph: true, - Stcpr: true, - Timestamp: time.Now().Unix(), - } - pk, _ := cipher.GenerateKeyPair() - visorSumObj[pk] = visorSummary - } - - conn := context.Background() - - t.Run("add visor summaries", func(t *testing.T) { - for pk, sum := range visorSumObj { - err := store.AddVisorSummary(conn, pk, sum) - require.NoError(t, err) - } - }) - - t.Run("all summaries", func(t *testing.T) { - summaries, err := store.GetAllSummaries() - require.NoError(t, err) - require.Len(t, summaries, 3) - }) - - t.Run("specific visor summary by pub key", func(t *testing.T) { - for pk, sum := range visorSumObj { - summary, err := store.GetVisorByPk(pk.String()) - require.NoError(t, err) - require.Equal(t, summary, sum) - } - }) - -} diff --git a/pkg/network-monitor/store/store.go b/pkg/network-monitor/store/store.go deleted file mode 100644 index b53fee1a..00000000 --- a/pkg/network-monitor/store/store.go +++ /dev/null @@ -1,40 +0,0 @@ -package store - -import ( - "context" - "errors" - - "github.com/skycoin/skywire-utilities/pkg/cipher" - "github.com/skycoin/skywire-utilities/pkg/storeconfig" - - "github.com/skycoin/skywire-services/internal/nm" -) - -var ( - // ErrVisorSumNotFound indicates that requested visor summary is not registered. - ErrVisorSumNotFound = errors.New("Visor summary not found") -) - -// Store stores Transport metadata and generated nonce values. -type Store interface { - TransportStore -} - -// TransportStore stores Transport metadata. -type TransportStore interface { - AddVisorSummary(context.Context, cipher.PubKey, *nm.VisorSummary) error - GetVisorByPk(string) (*nm.VisorSummary, error) - GetAllSummaries() (map[string]nm.Summary, error) -} - -// New constructs a new Store of requested type. -func New(config storeconfig.Config) (Store, error) { - switch config.Type { - case storeconfig.Memory: - return newMemoryStore(), nil - case storeconfig.Redis: - return newRedisStore(config.URL, config.Password, config.PoolSize) - default: - return nil, errors.New("unknown store type") - } -} From bccb0ae49afc1ff8aad3b3812dc2fe60af3cd582 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 11:52:16 +0000 Subject: [PATCH 05/12] fix docker configs for new combined network monitor --- docker/config/dmsg-monitor.json | 51 -------------------- docker/config/network-monitor.json | 59 ++++-------------------- docker/docker-compose.yml | 29 ++---------- docker/docker_build.sh | 8 ---- docker/docker_clean.sh | 1 - docker/docker_push.sh | 1 - docker/images/dmsg-monitor/Dockerfile | 34 -------------- docker/images/network-monitor/Dockerfile | 6 +-- 8 files changed, 14 insertions(+), 175 deletions(-) delete mode 100644 docker/config/dmsg-monitor.json delete mode 100644 docker/images/dmsg-monitor/Dockerfile diff --git a/docker/config/dmsg-monitor.json b/docker/config/dmsg-monitor.json deleted file mode 100644 index 318d6e03..00000000 --- a/docker/config/dmsg-monitor.json +++ /dev/null @@ -1,51 +0,0 @@ -{ - "version": "v1.3.10", - "sk": "66475bf8ce6140792ffed0cba14490a93d9b033ca1f1811ca88d69df72c76d2f", - "pk": "02704c20a061d0a97f2ac8efc9e7c96fcf9bb54fcdfcc96bec83fb4b9b3b2f7d3f", - "dmsg": { - "discovery": "http://dmsg-discovery:9090", - "sessions_count": 1, - "servers": [] - }, - "transport": { - "discovery": "http://transport-discovery:9091", - "address_resolver": "http://address-resolver:9093", - "public_autoconnect": false, - "transport_setup": null, - "log_store": { - "type": "file", - "location": "./local/transport_logs", - "rotation_interval": "168h0m0s" - }, - "stcpr_port": 0, - "sudph_port": 0 - }, - "routing": { - "route_setup_nodes": [ - "0324579f003e6b4048bae2def4365e634d8e0e3054a20fc7af49daf2a179658557" - ], - "route_finder": "http://route-finder:9092", - "route_finder_timeout": "10s", - "min_hops": 0 - }, - "uptime_tracker": { - "addr": "http://uptime-tracker:9096" - }, - "launcher": { - "service_discovery": "http://service-discovery:9098", - "apps": null, - "server_addr": "localhost:5510", - "bin_path": "./release", - "display_node_ip": false - }, - "survey_whitelist": null, - "hypervisors": [], - "cli_addr": "localhost:3435", - "log_level": "info", - "local_path": "./local/network-monitor", - "dmsghttp_server_path": "./local/custom", - "stun_servers": null, - "shutdown_timeout": "10s", - "is_public": false, - "persistent_transports": null -} \ No newline at end of file diff --git a/docker/config/network-monitor.json b/docker/config/network-monitor.json index e35cc507..58a442a9 100644 --- a/docker/config/network-monitor.json +++ b/docker/config/network-monitor.json @@ -1,51 +1,10 @@ { - "version": "v1.3.10", - "sk": "6e4afd961a83a04690e0562e0b7688c52e05efcbf8ef5748f3a15a6001d7fb01", - "pk": "027306a7dee4908adb0f3a828a2482cbc889e0a061b1320fd6a60a37bfc6b5f747", - "dmsg": { - "discovery": "http://dmsg-discovery:9090", - "sessions_count": 1, - "servers": [] - }, - "transport": { - "discovery": "http://transport-discovery:9091", - "address_resolver": "http://address-resolver:9093", - "public_autoconnect": false, - "transport_setup": null, - "log_store": { - "type": "file", - "location": "./local/transport_logs", - "rotation_interval": "168h0m0s" - }, - "stcpr_port": 0, - "sudph_port": 0 - }, - "routing": { - "route_setup_nodes": [ - "0324579f003e6b4048bae2def4365e634d8e0e3054a20fc7af49daf2a179658557" - ], - "route_finder": "http://route-finder:9092", - "route_finder_timeout": "10s", - "min_hops": 0 - }, - "uptime_tracker": { - "addr": "http://uptime-tracker:9096" - }, - "launcher": { - "service_discovery": "http://service-discovery:9098", - "apps": null, - "server_addr": "localhost:5510", - "bin_path": "./release", - "display_node_ip": false - }, - "survey_whitelist": null, - "hypervisors": [], - "cli_addr": "localhost:3435", - "log_level": "info", - "local_path": "./local/network-monitor", - "dmsghttp_server_path": "./local/custom", - "stun_servers": null, - "shutdown_timeout": "10s", - "is_public": false, - "persistent_transports": null -} \ No newline at end of file + "sk":"f9fee80460daa7b1648753d5145f99a0869c3c22c747ce9e3b9c7626671263d8", + "pk":"03ba70a1430a3c3d0ffef56a309139cae32809de06f930421b0f7cdb909238491f", + "dmsg_url":"http://dmsgd.skywire.skycoin.com", + "ut_url":"http://ut.skywire.skycoin.com", + "ar_url":"http://ar.skywire.skycoin.com", + "sleep_deregistration":1, + "addr":":8080", + "log_level":"debug" +} diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index cf85559c..a546294b 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -461,29 +461,6 @@ services: image: "${REGISTRY}/network-monitor:${DOCKER_TAG}" hostname: network-monitor container_name: "network-monitor" - networks: - srv: - ipv4_address: 175.0.0.30 - intra: - ipv4_address: 174.0.0.30 - ports: - - "9080:9080" - entrypoint: /release/network-monitor --redis redis://nm-redis:6379 --config /release/network-monitor.json - depends_on: - - nm-redis - - visor-c - volumes: - - type: bind - source: ../docker/config/network-monitor.json - target: /release/network-monitor.json - read_only: false - stdin_open: true # docker run -i - tty: true # docker run -t - - dmsg-monitor: - image: "${REGISTRY}/dmsg-monitor:${DOCKER_TAG}" - hostname: dmsg-monitor - container_name: "dmsg-monitor" networks: srv: ipv4_address: 175.0.0.31 @@ -491,12 +468,12 @@ services: ipv4_address: 174.0.0.31 volumes: - type: bind - source: ../docker/config/dmsg-monitor.json - target: /release/dmsg-monitor.json + source: ../docker/config/network-monitor.json + target: /release/network-monitor.json read_only: false ports: - "9081:9081" - entrypoint: /release/dmsg-monitor -d http://dmsg-discovery:9090 -u http://uptime-tracker:9096 -a :9081 --config /release/dmsg-monitor.json + entrypoint: /release/network-monitor --dmsg-url http://dmsg-discovery:9090 --ut-url http://uptime-tracker:9096 --ar-url http://address-resolver:9093 -a :9081 --config /release/network-monitor.json depends_on: - visor-c restart: always diff --git a/docker/docker_build.sh b/docker/docker_build.sh index a96f20d3..3ec0cd33 100755 --- a/docker/docker_build.sh +++ b/docker/docker_build.sh @@ -237,14 +237,6 @@ DOCKER_BUILDKIT="$bldkit" docker build -f docker/images/public-visor-monitor/Doc $platform \ -t "$registry"/public-visor-monitor:"$image_tag" . -echo "building dmsg monitor image" -DOCKER_BUILDKIT="$bldkit" docker build -f docker/images/dmsg-monitor/Dockerfile \ - --build-arg base_image="$base_image" \ - --build-arg build_opts="$go_buildopts" \ - --build-arg image_tag="$image_tag" \ - $platform \ - -t "$registry"/dmsg-monitor:"$image_tag" . - echo "building tpd monitor image" DOCKER_BUILDKIT="$bldkit" docker build -f docker/images/tpd-monitor/Dockerfile \ --build-arg base_image="$base_image" \ diff --git a/docker/docker_clean.sh b/docker/docker_clean.sh index 931bda42..92946c39 100755 --- a/docker/docker_clean.sh +++ b/docker/docker_clean.sh @@ -22,7 +22,6 @@ declare -a images_arr=( "skycoin/liveness-checker:${image_tag}" "skycoin/vpn-monitor:${image_tag}" "skycoin/public-visor-monitor:${image_tag}" - "skycoin/dmsg-monitor:${image_tag}" "skycoin/tpd-monitor:${image_tag}" "skycoin/transport-setup:${image_tag}" "skycoin/skysocks-monitor:${image_tag}" diff --git a/docker/docker_push.sh b/docker/docker_push.sh index a8d57434..4fad3210 100755 --- a/docker/docker_push.sh +++ b/docker/docker_push.sh @@ -26,7 +26,6 @@ declare -a images_arr=( "liveness-checker" "vpn-monitor" "public-visor-monitor" - "dmsg-monitor" "tpd-monitor" "transport-setup" "skysocks-monitor" diff --git a/docker/images/dmsg-monitor/Dockerfile b/docker/images/dmsg-monitor/Dockerfile deleted file mode 100644 index 85847326..00000000 --- a/docker/images/dmsg-monitor/Dockerfile +++ /dev/null @@ -1,34 +0,0 @@ -ARG image_tag -ARG base_image - -FROM ${base_image} as builder - -ARG build_opts - -COPY . /skywire-services -WORKDIR /skywire-services - -RUN go build "${build_opts}" -o /release/dmsg-monitor ./cmd/dmsg-monitor - -FROM alpine as prod - -WORKDIR /release -COPY --from=builder /release/dmsg-monitor /release/dmsg-monitor -ENTRYPOINT ["/release/dmsg-monitor"] - -FROM prod as test - -# OS image -FROM alpine as e2e -WORKDIR /release - -COPY ./docker/common/install-prequisites.sh /release/install-prequisites.sh -RUN sh -c /release/install-prequisites.sh cert-only \ - && rm -rf /release/install-prequisites.sh - -COPY --from=builder /release/dmsg-monitor /release/dmsg-monitor -ENTRYPOINT ["/release/dmsg-monitor"] - -FROM e2e as integration - -FROM ${image_tag} diff --git a/docker/images/network-monitor/Dockerfile b/docker/images/network-monitor/Dockerfile index 459d346d..f855ec6d 100644 --- a/docker/images/network-monitor/Dockerfile +++ b/docker/images/network-monitor/Dockerfile @@ -8,12 +8,11 @@ ARG build_opts COPY . /skywire-services WORKDIR /skywire-services -RUN go build "${build_opts}" -o /release/network-monitor ./cmd/network-monitor && \ - go build "${build_opts}" -o /release/vpn-client ./cmd/vpn-lite-client +RUN go build "${build_opts}" -o /release/network-monitor ./cmd/network-monitor FROM alpine as prod + WORKDIR /release -COPY --from=builder /release/vpn-client /apps/vpn-client COPY --from=builder /release/network-monitor /release/network-monitor ENTRYPOINT ["/release/network-monitor"] @@ -27,7 +26,6 @@ COPY ./docker/common/install-prequisites.sh /release/install-prequisites.sh RUN sh -c /release/install-prequisites.sh cert-only \ && rm -rf /release/install-prequisites.sh -COPY --from=builder /release/vpn-client /release/vpn-client COPY --from=builder /release/network-monitor /release/network-monitor ENTRYPOINT ["/release/network-monitor"] From 622a4667665ed3686dd0c546f17379faee887b63 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 12:19:44 +0000 Subject: [PATCH 06/12] fix removed dmsg monitor from skywire-services cmd --- cmd/skywire-services/services.go | 3 --- 1 file changed, 3 deletions(-) diff --git a/cmd/skywire-services/services.go b/cmd/skywire-services/services.go index 8f441692..6946cc8e 100644 --- a/cmd/skywire-services/services.go +++ b/cmd/skywire-services/services.go @@ -13,7 +13,6 @@ import ( ar "github.com/skycoin/skywire-services/cmd/address-resolver/commands" confbs "github.com/skycoin/skywire-services/cmd/config-bootstrapper/commands" - dmsgm "github.com/skycoin/skywire-services/cmd/dmsg-monitor/commands" kg "github.com/skycoin/skywire-services/cmd/keys-gen/commands" lc "github.com/skycoin/skywire-services/cmd/liveness-checker/commands" nv "github.com/skycoin/skywire-services/cmd/node-visualizer/commands" @@ -38,7 +37,6 @@ func init() { nv.RootCmd, pvm.RootCmd, se.RootCmd, - dmsgm.RootCmd, ) tpd.RootCmd.Use = "tpd" tps.RootCmd.Use = "tps" @@ -51,7 +49,6 @@ func init() { nv.RootCmd.Use = "nv" pvm.RootCmd.Use = "pvm" se.RootCmd.Use = "se" - dmsgm.RootCmd.Use = "dmsgm" var helpflag bool RootCmd.SetUsageTemplate(help) From ea94699aab7f4f00ae0eb93bf149b927b682f527 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 12:25:38 +0000 Subject: [PATCH 07/12] fix issue on ut url --- cmd/network-monitor/commands/root.go | 30 +++++++++++----------------- pkg/network-monitor/api/api.go | 2 +- pkg/node-visualizer/web/.env | 2 +- 3 files changed, 14 insertions(+), 20 deletions(-) diff --git a/cmd/network-monitor/commands/root.go b/cmd/network-monitor/commands/root.go index 0d175db9..a882023b 100644 --- a/cmd/network-monitor/commands/root.go +++ b/cmd/network-monitor/commands/root.go @@ -7,7 +7,6 @@ import ( "os" "path/filepath" "strings" - "time" "github.com/skycoin/skywire-utilities/pkg/buildinfo" "github.com/skycoin/skywire-utilities/pkg/cipher" @@ -20,19 +19,17 @@ import ( ) var ( - confPath string - dmsgURL string - utURL string - arURL string - addr string - tag string - logLvl string - sleepDeregistration time.Duration + confPath string + dmsgURL string + utURL string + arURL string + addr string + tag string + logLvl string ) func init() { RootCmd.Flags().StringVarP(&addr, "addr", "a", "", "address to bind to.\033[0m") - RootCmd.Flags().DurationVarP(&sleepDeregistration, "sleep-deregistration", "s", 0, "Sleep time for derigstration process in minutes\033[0m") RootCmd.Flags().StringVar(&dmsgURL, "dmsg-url", "", "url to dmsg data.\033[0m") RootCmd.Flags().StringVar(&utURL, "ut-url", "", "url to uptime tracker visor data.\033[0m") RootCmd.Flags().StringVar(&arURL, "ar-url", "", "url to ar data.\033[0m") @@ -72,7 +69,7 @@ var RootCmd = &cobra.Command{ dmsgURL = conf.DMSGUrl } if utURL == "" { - utURL = conf.UTUrl + "/uptimes" + utURL = conf.UTUrl } if arURL == "" { arURL = conf.ARUrl @@ -80,9 +77,6 @@ var RootCmd = &cobra.Command{ if addr == "" { addr = conf.Addr } - if sleepDeregistration == 0 { - sleepDeregistration = conf.SleepDeregistration - } if logLvl == "" { logLvl = conf.LogLevel } @@ -95,7 +89,7 @@ var RootCmd = &cobra.Command{ logger := mLogger.PackageLogger(tag) - logger.WithField("addr", addr).Info("Serving DMSG-Monitor API...") + logger.WithField("addr", addr).Info("Serving Network-Monitor API...") monitorSign, _ := cipher.SignPayload([]byte(conf.PK.Hex()), conf.SK) //nolint @@ -106,11 +100,11 @@ var RootCmd = &cobra.Command{ monitorConfig.UT = utURL monitorConfig.AR = arURL - dmsgMonitorAPI := api.New(logger, monitorConfig) + networkMonitorAPI := api.New(logger, monitorConfig) - go dmsgMonitorAPI.InitDeregistrationLoop(sleepDeregistration) + go networkMonitorAPI.InitDeregistrationLoop(conf.SleepDeregistration) - if err := tcpproxy.ListenAndServe(addr, dmsgMonitorAPI); err != nil { + if err := tcpproxy.ListenAndServe(addr, networkMonitorAPI); err != nil { logger.Errorf("serve: %v", err) } }, diff --git a/pkg/network-monitor/api/api.go b/pkg/network-monitor/api/api.go index a3b915dd..98e50f7f 100644 --- a/pkg/network-monitor/api/api.go +++ b/pkg/network-monitor/api/api.go @@ -329,7 +329,7 @@ func getDMSGEntries(dmsgURL string) (data clientList, err error) { func getUptimeTracker(utURL string) (map[string]bool, error) { response := make(map[string]bool) - res, err := http.Get(utURL) //nolint + res, err := http.Get(utURL + "/uptimes") //nolint if err != nil { return response, err } diff --git a/pkg/node-visualizer/web/.env b/pkg/node-visualizer/web/.env index 9f1fdb5b..a978aa45 100644 --- a/pkg/node-visualizer/web/.env +++ b/pkg/node-visualizer/web/.env @@ -1 +1 @@ -REACT_APP_SKY_NODEVIZ_URL=https://nv.skywire.dev/map +REACT_APP_SKY_NODEVIZ_URL=https://localhost:9081/map From 4eb401b09b5771833510e2b2aa26dc9204971d3d Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 12:25:50 +0000 Subject: [PATCH 08/12] remove useless dmsgmonitor commands --- Makefile | 5 +---- 1 file changed, 1 insertion(+), 4 deletions(-) diff --git a/Makefile b/Makefile index befef9e0..885d7a66 100644 --- a/Makefile +++ b/Makefile @@ -47,7 +47,7 @@ export REGISTRY=${DOCKER_REGISTRY} dep: ## Sorts dependencies # GO111MODULE=on GOPRIVATE=github.com/skycoin/* go get -v github.com/skycoin/skywire@master GO111MODULE=on GOPRIVATE=github.com/skycoin/* go mod vendor -v - yarn --cwd ./pkg/node-visualizer/web install + # yarn --cwd ./pkg/node-visualizer/web install format: dep ## Formats the code. Must have goimports and goimports-reviser installed (use make install-linters). goimports -w -local github.com/skycoin/skywire-services ./pkg @@ -69,7 +69,6 @@ build: dep ## Build binaries ${OPTS} go build ${BUILD_OPTS} -o ./bin/transport-setup ./cmd/transport-setup ${OPTS} go build ${BUILD_OPTS} -o ./bin/config-bootstrapper ./cmd/config-bootstrapper ${OPTS} go build ${BUILD_OPTS} -o ./bin/liveness-checker ./cmd/liveness-checker - ${OPTS} go build ${BUILD_OPTS} -o ./bin/dmsg-monitor ./cmd/dmsg-monitor ${OPTS} go build ${BUILD_OPTS} -o ./bin/tpd-monitor ./cmd/tpd-monitor ${OPTS} go build ${BUILD_OPTS} -o ./bin/vpn-monitor ./cmd/vpn-monitor ${OPTS} go build ${BUILD_OPTS} -o ./bin/skysocks-monitor ./cmd/skysocks-monitor @@ -90,7 +89,6 @@ build-deploy: ## Build for deployment Docker images ${DOCKER_OPTS} go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o ./release/vpn-client ./cmd/vpn-lite-client ${DOCKER_OPTS} go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o ./release/transport-setup ./cmd/transport-setup ${DOCKER_OPTS} go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o ./release/node-visualizer ./cmd/node-visualizer - ${DOCKER_OPTS} go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o ./release/dmsg-monitor ./cmd/dmsg-monitor ${DOCKER_OPTS} go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o ./release/tpd-monitor ./cmd/tpd-monitor ${DOCKER_OPTS} go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o ./release/vpn-monitor ./cmd/vpn-monitor ${DOCKER_OPTS} go build ${BUILD_OPTS_DEPLOY} -mod=vendor -o /release/skysocks-monitor ./cmd/skysocks-monitor @@ -109,7 +107,6 @@ build-race: dep ## Build binaries ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/vpn-client ./cmd/vpn-lite-client ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/transport-setup ./cmd/transport-setup ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/node-visualizer ./cmd/node-visualizer - ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/dmsg-monitor ./cmd/dmsg-monitor ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/tpd-monitor ./cmd/tpd-monitor ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/vpn-monitor ./cmd/vpn-monitor ${OPTS} go build ${BUILD_OPTS} -race -o ./bin/skysocks-monitor ./cmd/skysocks-monitor From a3757f2a83aff2dd8005ec05b06c4430dad4d2c3 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 12:32:04 +0000 Subject: [PATCH 09/12] fix network-monitor.json config for e2e and integration test --- docker/config/network-monitor.json | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/docker/config/network-monitor.json b/docker/config/network-monitor.json index 58a442a9..fbf9c973 100644 --- a/docker/config/network-monitor.json +++ b/docker/config/network-monitor.json @@ -1,10 +1,10 @@ { "sk":"f9fee80460daa7b1648753d5145f99a0869c3c22c747ce9e3b9c7626671263d8", "pk":"03ba70a1430a3c3d0ffef56a309139cae32809de06f930421b0f7cdb909238491f", - "dmsg_url":"http://dmsgd.skywire.skycoin.com", - "ut_url":"http://ut.skywire.skycoin.com", - "ar_url":"http://ar.skywire.skycoin.com", - "sleep_deregistration":1, - "addr":":8080", + "dmsg_url":"http://dmsg-discovery:9090", + "ut_url":"http://uptime-tracker:9096", + "ar_url":"http://address-resolver:9093", + "sleep_deregistration":10, + "addr":":9081", "log_level":"debug" } From c4e140f68b1ced90fa4a9d491a4af29ffb1d2d43 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 12:48:00 +0000 Subject: [PATCH 10/12] remove useless nm-redis from docker-compose config --- docker/docker-compose.yml | 14 -------------- 1 file changed, 14 deletions(-) diff --git a/docker/docker-compose.yml b/docker/docker-compose.yml index a546294b..0b8b156c 100644 --- a/docker/docker-compose.yml +++ b/docker/docker-compose.yml @@ -99,20 +99,6 @@ services: depends_on: - hoster - nm-redis: - image: "redis:alpine" - networks: - srv: - ipv4_address: 175.0.0.254 - intra: - ipv4_address: 174.0.0.254 - ports: - - "6383:6379" - container_name: "nm-redis" - hostname: nm-redis - depends_on: - - hoster - ut-redis: image: "redis:alpine" networks: From 5cd587ac462847798df828b0567aabcdc1cd3aa0 Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 12:52:57 +0000 Subject: [PATCH 11/12] remove dmsg-monitor from env_test cases --- internal/integration/env_test.go | 1 - 1 file changed, 1 deletion(-) diff --git a/internal/integration/env_test.go b/internal/integration/env_test.go index 05941bff..0346d007 100644 --- a/internal/integration/env_test.go +++ b/internal/integration/env_test.go @@ -65,7 +65,6 @@ func NewEnv() *TestEnv { "/address-resolver", "/service-discovery", "/network-monitor", - "/dmsg-monitor", "/tpd-monitor", "/vpn-monitor", "/public-visor-monitor", From 0de62c84ed824ac9aa96bcafb4e22588288e8eec Mon Sep 17 00:00:00 2001 From: Mohammed Date: Tue, 26 Mar 2024 12:54:09 +0000 Subject: [PATCH 12/12] fix removed dmsg-monitor codes --- cmd/network-monitor/network-monitor.go | 2 +- internal/monitors/{lig.go => lib.go} | 2 +- pkg/tpd-monitor/api/api.go | 2 +- 3 files changed, 3 insertions(+), 3 deletions(-) rename internal/monitors/{lig.go => lib.go} (96%) diff --git a/cmd/network-monitor/network-monitor.go b/cmd/network-monitor/network-monitor.go index fb353c8a..c3e4de5a 100644 --- a/cmd/network-monitor/network-monitor.go +++ b/cmd/network-monitor/network-monitor.go @@ -11,7 +11,7 @@ import ( func init() { var helpflag bool commands.RootCmd.SetUsageTemplate(help) - commands.RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for dmsg-monitor") + commands.RootCmd.PersistentFlags().BoolVarP(&helpflag, "help", "h", false, "help for network-monitor") commands.RootCmd.SetHelpCommand(&cobra.Command{Hidden: true}) commands.RootCmd.PersistentFlags().MarkHidden("help") //nolint } diff --git a/internal/monitors/lig.go b/internal/monitors/lib.go similarity index 96% rename from internal/monitors/lig.go rename to internal/monitors/lib.go index 47cf644e..0111b7e8 100644 --- a/internal/monitors/lig.go +++ b/internal/monitors/lib.go @@ -12,7 +12,7 @@ import ( "github.com/skycoin/skywire-utilities/pkg/cipher" ) -// MonitorConfig is the structure of dmsg-monitor's config +// MonitorConfig is the structure of monitor's config type MonitorConfig struct { SK cipher.SecKey `json:"sk,omitempty"` PK cipher.PubKey `json:"pk,omitempty"` diff --git a/pkg/tpd-monitor/api/api.go b/pkg/tpd-monitor/api/api.go index 60fce30b..ea964007 100644 --- a/pkg/tpd-monitor/api/api.go +++ b/pkg/tpd-monitor/api/api.go @@ -1,4 +1,4 @@ -// Package api pkg/dmsg-monitor/api.go +// Package api pkg/tpd-monitor/api.go package api import (