Skip to content

Commit

Permalink
added running speedtests and
Browse files Browse the repository at this point in the history
  • Loading branch information
sagostin committed Jul 13, 2024
1 parent efbaece commit a93b91e
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 67 deletions.
135 changes: 97 additions & 38 deletions probes/speedtest.go
Original file line number Diff line number Diff line change
@@ -1,57 +1,116 @@
package probes

import (
"github.com/showwin/speedtest-go/speedtest"
"github.com/showwin/speedtest-go/speedtest/transport"
"strconv"
"time"
)

type SpeedTestResult struct {
Latency time.Duration `json:"latency"bson:"latency"`
DLSpeed float64 `json:"dl_speed"bson:"dl_speed"`
ULSpeed float64 `json:"ul_speed"bson:"ul_speed"`
Server string `json:"server"bson:"server"`
Host string `json:"host"bson:"host"`
Timestamp time.Time `json:"timestamp"bson:"timestamp"`
TestData []speedtest.Server `json:"test_data"`
Timestamp time.Time `json:"timestamp" bson:"timestamp"`
}

type Server struct {
URL string `xml:"url,attr" json:"url"`
Lat string `xml:"lat,attr" json:"lat"`
Lon string `xml:"lon,attr" json:"lon"`
Name string `xml:"name,attr" json:"name"`
Country string `xml:"country,attr" json:"country"`
Sponsor string `xml:"sponsor,attr" json:"sponsor"`
ID string `xml:"id,attr" json:"id"`
Host string `xml:"host,attr" json:"host"`
Distance float64 `json:"distance"`
Latency time.Duration `json:"latency"`
MaxLatency time.Duration `json:"max_latency"`
MinLatency time.Duration `json:"min_latency"`
Jitter time.Duration `json:"jitter"`
DLSpeed ByteRate `json:"dl_speed"`
ULSpeed ByteRate `json:"ul_speed"`
TestDuration TestDuration `json:"test_duration"`
PacketLoss transport.PLoss `json:"packet_loss"`
}

type ByteRate float64

type TestDuration struct {
Ping *time.Duration `json:"ping"`
Download *time.Duration `json:"download"`
Upload *time.Duration `json:"upload"`
Total *time.Duration `json:"total"`
}

type PLoss struct {
Sent int `json:"sent"` // Number of sent packets acknowledged by the remote.
Dup int `json:"dup"` // Number of duplicate packets acknowledged by the remote.
Max int `json:"max"` // The maximum index value received by the remote.
}

func SpeedTest(cd *Probe) (SpeedTestResult, error) {
var s1 SpeedTestResult
//user, err := speedtest.FetchUserInfo()
/*if err != nil {
return s1, err
}*/
/*serverList, err := speedtest.FetchServers(user)
if err != nil {
return s1, err
}
targets, err := serverList.FindServer([]int{})
if err != nil {
return s1, err
}
var s1 []speedtest.Server
var speedtestClient = speedtest.New()

if len(targets) <= 0 {
return s1, errors.New("unable to reach Ookla")
}
// Use a proxy for the speedtest. eg: socks://127.0.0.1:7890
// speedtest.WithUserConfig(&speedtest.UserConfig{Proxy: "socks://127.0.0.1:7890"})(speedtestClient)

// Select a network card as the data interface.
// speedtest.WithUserConfig(&speedtest.UserConfig{Source: "192.168.1.101"})(speedtestClient)

// Get user's network information
// user, _ := speedtestClient.FetchUserInfo()

// Get a list of servers near a specified location
// user.SetLocationByCity("Tokyo")
// user.SetLocation("Osaka", 34.6952, 135.5006)

mainT := targets[0]
// Search server using serverID.
// eg: fetch server with ID 28910.
// speedtest.ErrServerNotFound will be returned if the server cannot be found.
// server, err := speedtest.FetchServerByID("28910")

// todo make this direct p2p connections for testing between agents??
serverList, _ := speedtestClient.FetchServers()
var targets []*speedtest.Server

err := mainT.PingTest()
if err != nil {
return SpeedTestResult{}, err
primaryTarget := cd.Config.Target[0].Target
if cd.Config.Target[0].Target == "" {
targets2, _ := serverList.FindServer([]int{})
targets = append(targets, targets2...)

} else if primaryTarget != "" && primaryTarget != "expired" && primaryTarget != "ok" {
atoi, err := strconv.Atoi(cd.Config.Target[0].Target)
if err != nil {
return SpeedTestResult{}, err
}
targets2, _ := serverList.FindServer([]int{atoi})
targets = append(targets, targets2...)
}
err := mainT.DownloadTest(false)
if err != nil {
return SpeedTestResult{}, err

for _, s := range targets {
// Please make sure your host can access this test server,
// otherwise you will get an error.
// It is recommended to replace a server at this time
err := s.PingTest(nil)
if err != nil {
return SpeedTestResult{}, err
}
err = s.DownloadTest()
if err != nil {
return SpeedTestResult{}, err
}
err = s.UploadTest()
if err != nil {
return SpeedTestResult{}, err
}

s1 = append(s1, *s)
s.Context.Reset() // reset counter
}
mainT.UploadTest(false)

s1.Latency = mainT.Latency
s1.DLSpeed = mainT.DLSpeed
s1.ULSpeed = mainT.ULSpeed
s1.Server = mainT.Name
s1.Host = mainT.Host
s1.Timestamp = time.Now()*/
result := SpeedTestResult{
TestData: s1,
Timestamp: time.Now(),
}

return s1, nil
return result, nil
}
47 changes: 18 additions & 29 deletions workers/probe.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,8 @@ func contains(ids []primitive.ObjectID, id primitive.ObjectID) bool {
return false
}

var speedTestRunning = false

var trafficSimServer *probes.TrafficSim
var trafficSimClients []*probes.TrafficSim

Expand Down Expand Up @@ -218,11 +220,8 @@ func startCheckWorker(id primitive.ObjectID, dataChan chan probes.ProbeData, thi
Data: mtr,
}

//fmt.Println("Sending apiClient to the channel (Sysinfo) for ", agentCheck.Config.Interval, "...")
dC <- cD
//fmt.Println("sleeping for " + strconv.Itoa(agentCheck.Config.Interval) + " minutes")
time.Sleep(time.Duration(agentCheck.Config.Interval) * time.Minute)
// todo push
continue
case probes.ProbeType_MTR:
log.Info("MTR: Running test for ", agentCheck.Config.Target[0].Target, "...")
Expand All @@ -231,22 +230,13 @@ func startCheckWorker(id primitive.ObjectID, dataChan chan probes.ProbeData, thi
fmt.Println(err)
}

/*m, err := json.Marshal(mtr)
if err != nil {
fmt.Print(err)
}*/

cD := probes.ProbeData{
ProbeID: agentCheck.ID,
Triggered: false,
Data: mtr,
}

//fmt.Println("Sending apiClient to the channel (MTR) for ", agentCheck.Config.Interval, "...")
dC <- cD
//fmt.Println("sleeping for " + strconv.Itoa(agentCheck.Config.Interval) + " minutes")
time.Sleep(time.Duration(agentCheck.Config.Interval) * time.Minute)
// todo push
continue
/*case probes.ProbeType_RPERF:
// if check says its a server, start a iperf server based on the bind and port provided in target
Expand Down Expand Up @@ -283,25 +273,17 @@ func startCheckWorker(id primitive.ObjectID, dataChan chan probes.ProbeData, thi
continue*/
case probes.ProbeType_SPEEDTEST:
// todo make this dynamic and on demand
/*if agentCheck.Config.Pending {
if !speedTestRunning {
fmt.Println("Running speed test...")
speedtest, err := checks.SpeedTest(&agentCheck)
speedTestResult, err := probes.SpeedTest(&agentCheck)
if err != nil {
fmt.Println(err)
log.Error(err)
return
}

m, err := json.Marshal(speedtest)
if err != nil {
fmt.Print(err)
}
cD := api.CheckData{
Target: agentCheck.Target,
CheckID: agentCheck.ID,
AgentID: agentCheck.AgentID,
Result: string(m),
Type: api.CtSpeedtest,
cD := probes.ProbeData{
ProbeID: agentCheck.ID,
Data: speedTestResult,
}

dC <- cD
Expand All @@ -310,15 +292,22 @@ func startCheckWorker(id primitive.ObjectID, dataChan chan probes.ProbeData, thi
//todo preventing it from being in the configuration after
//time.Sleep(time.Minute * 5)
//}
}*/
continue
}
return
case probes.ProbeType_SPEEDTEST_SERVERS:
// todo make this dynamic and on demand
var speedtestClient = speedtest.New()
serverList, _ := speedtestClient.FetchServers()
//targets, _ := serverList.FindServer([]int{})
// todo ship this off to the backend so we can display "speedtest" servers near the agent, and periodically refresh the options
log.Warn(serverList)

cD := probes.ProbeData{
ProbeID: agentCheck.ID,
Data: serverList,
}

dC <- cD

break
case probes.ProbeType_PING:
log.Infof("Ping: Running test for %v...", agentCheck.Config.Target[0].Target)
Expand Down

0 comments on commit a93b91e

Please sign in to comment.