diff --git a/probes/speedtest.go b/probes/speedtest.go index b7797cd..daf5824 100644 --- a/probes/speedtest.go +++ b/probes/speedtest.go @@ -1,57 +1,116 @@ package probes import ( + "github.com/showwin/speedtest-go/speedtest" + "github.com/showwin/speedtest-go/speedtest/transport" + "strconv" "time" ) type SpeedTestResult struct { - Latency time.Duration `json:"latency"bson:"latency"` - DLSpeed float64 `json:"dl_speed"bson:"dl_speed"` - ULSpeed float64 `json:"ul_speed"bson:"ul_speed"` - Server string `json:"server"bson:"server"` - Host string `json:"host"bson:"host"` - Timestamp time.Time `json:"timestamp"bson:"timestamp"` + TestData []speedtest.Server `json:"test_data"` + Timestamp time.Time `json:"timestamp" bson:"timestamp"` +} + +type Server struct { + URL string `xml:"url,attr" json:"url"` + Lat string `xml:"lat,attr" json:"lat"` + Lon string `xml:"lon,attr" json:"lon"` + Name string `xml:"name,attr" json:"name"` + Country string `xml:"country,attr" json:"country"` + Sponsor string `xml:"sponsor,attr" json:"sponsor"` + ID string `xml:"id,attr" json:"id"` + Host string `xml:"host,attr" json:"host"` + Distance float64 `json:"distance"` + Latency time.Duration `json:"latency"` + MaxLatency time.Duration `json:"max_latency"` + MinLatency time.Duration `json:"min_latency"` + Jitter time.Duration `json:"jitter"` + DLSpeed ByteRate `json:"dl_speed"` + ULSpeed ByteRate `json:"ul_speed"` + TestDuration TestDuration `json:"test_duration"` + PacketLoss transport.PLoss `json:"packet_loss"` +} + +type ByteRate float64 + +type TestDuration struct { + Ping *time.Duration `json:"ping"` + Download *time.Duration `json:"download"` + Upload *time.Duration `json:"upload"` + Total *time.Duration `json:"total"` +} + +type PLoss struct { + Sent int `json:"sent"` // Number of sent packets acknowledged by the remote. + Dup int `json:"dup"` // Number of duplicate packets acknowledged by the remote. + Max int `json:"max"` // The maximum index value received by the remote. } func SpeedTest(cd *Probe) (SpeedTestResult, error) { - var s1 SpeedTestResult - //user, err := speedtest.FetchUserInfo() - /*if err != nil { - return s1, err - }*/ - /*serverList, err := speedtest.FetchServers(user) - if err != nil { - return s1, err - } - targets, err := serverList.FindServer([]int{}) - if err != nil { - return s1, err - } + var s1 []speedtest.Server + var speedtestClient = speedtest.New() - if len(targets) <= 0 { - return s1, errors.New("unable to reach Ookla") - } + // Use a proxy for the speedtest. eg: socks://127.0.0.1:7890 + // speedtest.WithUserConfig(&speedtest.UserConfig{Proxy: "socks://127.0.0.1:7890"})(speedtestClient) + + // Select a network card as the data interface. + // speedtest.WithUserConfig(&speedtest.UserConfig{Source: "192.168.1.101"})(speedtestClient) + + // Get user's network information + // user, _ := speedtestClient.FetchUserInfo() + + // Get a list of servers near a specified location + // user.SetLocationByCity("Tokyo") + // user.SetLocation("Osaka", 34.6952, 135.5006) - mainT := targets[0] + // Search server using serverID. + // eg: fetch server with ID 28910. + // speedtest.ErrServerNotFound will be returned if the server cannot be found. + // server, err := speedtest.FetchServerByID("28910") - // todo make this direct p2p connections for testing between agents?? + serverList, _ := speedtestClient.FetchServers() + var targets []*speedtest.Server - err := mainT.PingTest() - if err != nil { - return SpeedTestResult{}, err + primaryTarget := cd.Config.Target[0].Target + if cd.Config.Target[0].Target == "" { + targets2, _ := serverList.FindServer([]int{}) + targets = append(targets, targets2...) + + } else if primaryTarget != "" && primaryTarget != "expired" && primaryTarget != "ok" { + atoi, err := strconv.Atoi(cd.Config.Target[0].Target) + if err != nil { + return SpeedTestResult{}, err + } + targets2, _ := serverList.FindServer([]int{atoi}) + targets = append(targets, targets2...) } - err := mainT.DownloadTest(false) - if err != nil { - return SpeedTestResult{}, err + + for _, s := range targets { + // Please make sure your host can access this test server, + // otherwise you will get an error. + // It is recommended to replace a server at this time + err := s.PingTest(nil) + if err != nil { + return SpeedTestResult{}, err + } + err = s.DownloadTest() + if err != nil { + return SpeedTestResult{}, err + } + err = s.UploadTest() + if err != nil { + return SpeedTestResult{}, err + } + + s1 = append(s1, *s) + s.Context.Reset() // reset counter } - mainT.UploadTest(false) - s1.Latency = mainT.Latency - s1.DLSpeed = mainT.DLSpeed - s1.ULSpeed = mainT.ULSpeed - s1.Server = mainT.Name - s1.Host = mainT.Host - s1.Timestamp = time.Now()*/ + result := SpeedTestResult{ + TestData: s1, + Timestamp: time.Now(), + } - return s1, nil + return result, nil } diff --git a/workers/probe.go b/workers/probe.go index cef39a7..e606820 100644 --- a/workers/probe.go +++ b/workers/probe.go @@ -123,6 +123,8 @@ func contains(ids []primitive.ObjectID, id primitive.ObjectID) bool { return false } +var speedTestRunning = false + var trafficSimServer *probes.TrafficSim var trafficSimClients []*probes.TrafficSim @@ -218,11 +220,8 @@ func startCheckWorker(id primitive.ObjectID, dataChan chan probes.ProbeData, thi Data: mtr, } - //fmt.Println("Sending apiClient to the channel (Sysinfo) for ", agentCheck.Config.Interval, "...") dC <- cD - //fmt.Println("sleeping for " + strconv.Itoa(agentCheck.Config.Interval) + " minutes") time.Sleep(time.Duration(agentCheck.Config.Interval) * time.Minute) - // todo push continue case probes.ProbeType_MTR: log.Info("MTR: Running test for ", agentCheck.Config.Target[0].Target, "...") @@ -231,22 +230,13 @@ func startCheckWorker(id primitive.ObjectID, dataChan chan probes.ProbeData, thi fmt.Println(err) } - /*m, err := json.Marshal(mtr) - if err != nil { - fmt.Print(err) - }*/ - cD := probes.ProbeData{ ProbeID: agentCheck.ID, Triggered: false, Data: mtr, } - - //fmt.Println("Sending apiClient to the channel (MTR) for ", agentCheck.Config.Interval, "...") dC <- cD - //fmt.Println("sleeping for " + strconv.Itoa(agentCheck.Config.Interval) + " minutes") time.Sleep(time.Duration(agentCheck.Config.Interval) * time.Minute) - // todo push continue /*case probes.ProbeType_RPERF: // if check says its a server, start a iperf server based on the bind and port provided in target @@ -283,25 +273,17 @@ func startCheckWorker(id primitive.ObjectID, dataChan chan probes.ProbeData, thi continue*/ case probes.ProbeType_SPEEDTEST: // todo make this dynamic and on demand - /*if agentCheck.Config.Pending { + if !speedTestRunning { fmt.Println("Running speed test...") - speedtest, err := checks.SpeedTest(&agentCheck) + speedTestResult, err := probes.SpeedTest(&agentCheck) if err != nil { - fmt.Println(err) + log.Error(err) return } - m, err := json.Marshal(speedtest) - if err != nil { - fmt.Print(err) - } - - cD := api.CheckData{ - Target: agentCheck.Target, - CheckID: agentCheck.ID, - AgentID: agentCheck.AgentID, - Result: string(m), - Type: api.CtSpeedtest, + cD := probes.ProbeData{ + ProbeID: agentCheck.ID, + Data: speedTestResult, } dC <- cD @@ -310,15 +292,22 @@ func startCheckWorker(id primitive.ObjectID, dataChan chan probes.ProbeData, thi //todo preventing it from being in the configuration after //time.Sleep(time.Minute * 5) //} - }*/ - continue + } + return case probes.ProbeType_SPEEDTEST_SERVERS: // todo make this dynamic and on demand var speedtestClient = speedtest.New() serverList, _ := speedtestClient.FetchServers() //targets, _ := serverList.FindServer([]int{}) // todo ship this off to the backend so we can display "speedtest" servers near the agent, and periodically refresh the options - log.Warn(serverList) + + cD := probes.ProbeData{ + ProbeID: agentCheck.ID, + Data: serverList, + } + + dC <- cD + break case probes.ProbeType_PING: log.Infof("Ping: Running test for %v...", agentCheck.Config.Target[0].Target)