Skip to content

Commit

Permalink
added retryer for traffic sim and tested mtr with winders
Browse files Browse the repository at this point in the history
  • Loading branch information
sagostin committed Jul 12, 2024
1 parent f8c36b8 commit de53aad
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 70 deletions.
2 changes: 1 addition & 1 deletion env.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ const (
defaultConfig = "HOST=https://api.netwatcher.io\nHOST_WS=wss://api.netwatcher.io/agent_ws\nID=PUT AGENT ID/SECRET HERE\nPIN=PUT PIN HERE\n"
)

const VERSION = "1.2.0rc1"
const VERSION = "1.2.1b1"

func loadConfig(configFile string) error {
fmt.Printf("NetWatcher v%s - Copyright (c) 2023-%d Shaun Agostinho\n", VERSION, time.Now().Year())
Expand Down
64 changes: 50 additions & 14 deletions main.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package main

import (
"archive/tar"
"archive/zip"
"compress/gzip"
"crypto/sha256"
"encoding/hex"
Expand Down Expand Up @@ -101,11 +102,11 @@ func downloadTrippyDependency() error {
switch runtime.GOOS {
case "windows":
if runtime.GOARCH == "amd64" {
fileName = "trippy-VER-x86_64-pc-windows-msvc.exe"
fileName = "trippy-VER-x86_64-pc-windows-msvc.zip"
} else {
fileName = "trippy-VER-i686-pc-windows-msvc.exe"
fileName = "trippy-VER-aarch64-pc-windows-msvc.zip"
}
extractedName = fileName
extractedName = "trip.exe"
case "darwin":
fileName = "trippy-VER-x86_64-apple-darwin.tar.gz"
extractedName = "trip"
Expand Down Expand Up @@ -144,26 +145,23 @@ func downloadTrippyDependency() error {
}

var newHash string
if runtime.GOOS != "windows" {
// Extract the tar.gz for Linux and macOS
newHash, err = extractTarGzAndHash(tempFilePath, libPath)
if runtime.GOOS == "windows" {
newHash, err = extractZipAndHash(tempFilePath, libPath)
if err != nil {
os.Remove(tempFilePath)
return fmt.Errorf("failed to extract archive: %v", err)
}
// Remove the temporary tar.gz file
// Remove the temporary zip file
os.Remove(tempFilePath)
} else {
// For Windows, just rename the downloaded file and get its hash
err = os.Rename(tempFilePath, filePath)
// Extract the tar.gz for Linux and macOS
newHash, err = extractTarGzAndHash(tempFilePath, libPath)
if err != nil {
os.Remove(tempFilePath)
return fmt.Errorf("failed to rename file: %v", err)
}
newHash, err = getFileHash(filePath)
if err != nil {
return fmt.Errorf("failed to get file hash: %v", err)
return fmt.Errorf("failed to extract archive: %v", err)
}
// Remove the temporary tar.gz file
os.Remove(tempFilePath)
}

log.Printf("Downloaded trippy binary: %s\n", filePath)
Expand Down Expand Up @@ -230,6 +228,44 @@ func extractTarGzAndHash(archivePath, destPath string) (string, error) {
return "", fmt.Errorf("'trip' binary not found in archive")
}

func extractZipAndHash(archivePath, destPath string) (string, error) {
reader, err := zip.OpenReader(archivePath)
if err != nil {
return "", err
}
defer reader.Close()

for _, file := range reader.File {
if filepath.Base(file.Name) == "trip.exe" {
outPath := filepath.Join(destPath, "trip.exe")

src, err := file.Open()
if err != nil {
return "", err
}
defer src.Close()

dst, err := os.Create(outPath)
if err != nil {
return "", err
}
defer dst.Close()

hasher := sha256.New()
writer := io.MultiWriter(dst, hasher)

if _, err := io.Copy(writer, src); err != nil {
return "", err
}

fmt.Printf("Extracted file: %s\n", file.Name)
return hex.EncodeToString(hasher.Sum(nil)), nil
}
}

return "", fmt.Errorf("'netwatcher-agent.exe' not found in archive")
}

func getFileHash(filePath string) (string, error) {
f, err := os.Open(filePath)
if err != nil {
Expand Down
17 changes: 9 additions & 8 deletions probes/mtr.go
Original file line number Diff line number Diff line change
Expand Up @@ -90,9 +90,9 @@ func Mtr(cd *Probe, triggered bool) (MtrResult, error) {
switch runtime.GOOS {
case "windows":
if runtime.GOARCH == "amd64" {
trippyBinary = "trippy-x86_64-pc-windows-msvc.exe"
trippyBinary = "trip.exe"
} else {
trippyBinary = "trippy-i686-pc-windows-msvc.exe"
trippyBinary = "trip.exe"
}
case "darwin":
trippyBinary = "trip"
Expand All @@ -110,26 +110,27 @@ func Mtr(cd *Probe, triggered bool) (MtrResult, error) {

trippyPath = filepath.Join(trippyPath, trippyBinary)

args := []string{
/*args := []string{
"--icmp",
"--mode json",
"--multipath-strategy dublin",
"--dns-resolve-method cloudflare",
"--dns-lookup-as-info",
"--report-cycles " + strconv.Itoa(triggeredCount),
cd.Config.Target[0].Target,
}
}*/

ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60)*time.Second)
defer cancel()
/*ctx, cancel := context.WithTimeout(context.Background(), time.Duration(60)*time.Second)
defer cancel()*/

var cmd *exec.Cmd
if runtime.GOOS == "windows" {
cmd = exec.CommandContext(ctx, trippyPath, args...)
shellArgs := append([]string{"/c", trippyPath + " --icmp --mode json --multipath-strategy classic --dns-resolve-method cloudflare --report-cycles " + strconv.Itoa(triggeredCount) + " --dns-lookup-as-info " + cd.Config.Target[0].Target})
cmd = exec.CommandContext(context.TODO(), "cmd.exe", shellArgs...)
} else {
// For Linux and macOS, use /bin/bash
shellArgs := append([]string{"-c", trippyPath + " --icmp --mode json --multipath-strategy classic --dns-resolve-method cloudflare --report-cycles " + strconv.Itoa(triggeredCount) + " --dns-lookup-as-info " + cd.Config.Target[0].Target})
cmd = exec.CommandContext(ctx, "/bin/bash", shellArgs...)
cmd = exec.CommandContext(context.TODO(), "/bin/bash", shellArgs...)
}

output, err := cmd.CombinedOutput()
Expand Down
122 changes: 75 additions & 47 deletions probes/trafficsim.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ import (

const TrafficSim_ReportSeq = 60
const TrafficSim_DataInterval = 1
const RetryInterval = 5 * time.Second

type TrafficSim struct {
Running bool
Expand Down Expand Up @@ -95,37 +96,46 @@ func (ts *TrafficSim) buildMessage(msgType TrafficSimMsgType, data TrafficSimDat
return string(msgBytes), nil
}

func (ts *TrafficSim) runClient() {
toAddr, err := net.ResolveUDPAddr("udp4", ts.IPAddress+":"+strconv.Itoa(int(ts.Port)))
if err != nil {
log.Errorf("TrafficSim: Could not resolve %v:%d", ts.IPAddress, ts.Port)
return
}
func (ts *TrafficSim) runClient() error {
for {
toAddr, err := net.ResolveUDPAddr("udp4", ts.IPAddress+":"+strconv.Itoa(int(ts.Port)))
if err != nil {
return fmt.Errorf("could not resolve %v:%d: %v", ts.IPAddress, ts.Port, err)
}

conn, err := net.DialUDP("udp4", nil, toAddr)
if err != nil {
log.Errorf("TrafficSim: Unable to connect to %v:%d", ts.IPAddress, ts.Port)
return
}
defer conn.Close()
conn, err := net.DialUDP("udp4", nil, toAddr)
if err != nil {
return fmt.Errorf("unable to connect to %v:%d: %v", ts.IPAddress, ts.Port, err)
}
defer conn.Close()

ts.Conn = conn
ts.ClientStats = &ClientStats{
LastReportTime: time.Now(),
ReportInterval: 15 * time.Second,
PacketTimes: make(map[int]PacketTime),
}
ts.Conn = conn
ts.ClientStats = &ClientStats{
LastReportTime: time.Now(),
ReportInterval: 15 * time.Second,
PacketTimes: make(map[int]PacketTime),
}

if err := ts.sendHello(); err != nil {
log.Error("TrafficSim: Failed to establish connection:", err)
return
}
if err := ts.sendHello(); err != nil {
log.Error("TrafficSim: Failed to establish connection:", err)
time.Sleep(RetryInterval)
continue
}

log.Infof("TrafficSim: Connection established successfully to %v", ts.OtherAgent.Hex())
log.Infof("TrafficSim: Connection established successfully to %v", ts.OtherAgent.Hex())

go ts.sendDataLoop()
go ts.reportClientStats()
ts.receiveDataLoop()
errChan := make(chan error, 3)
go ts.sendDataLoop(errChan)
go ts.reportClientStats()
go ts.receiveDataLoop(errChan)

select {
case err := <-errChan:
log.Errorf("TrafficSim: Error in client loop: %v", err)
time.Sleep(RetryInterval)
continue
}
}
}

func (ts *TrafficSim) sendHello() error {
Expand All @@ -149,7 +159,7 @@ func (ts *TrafficSim) sendHello() error {
return nil
}

func (ts *TrafficSim) sendDataLoop() {
func (ts *TrafficSim) sendDataLoop(errChan chan<- error) {
ts.Sequence = 0
for {
time.Sleep(TrafficSim_DataInterval * time.Second)
Expand All @@ -161,33 +171,42 @@ func (ts *TrafficSim) sendDataLoop() {
data := TrafficSimData{Sent: sentTime, Seq: ts.Sequence}
dataMsg, err := ts.buildMessage(TrafficSim_DATA, data)
if err != nil {
log.Error("TrafficSim: Error building data message:", err)
continue
errChan <- fmt.Errorf("error building data message: %v", err)
return
}

_, err = ts.Conn.Write([]byte(dataMsg))
if err != nil {
log.Error("TrafficSim: Error sending data message:", err)
} else {
ts.ClientStats.mu.Lock()
ts.ClientStats.PacketTimes[ts.Sequence] = PacketTime{Sent: sentTime}
ts.ClientStats.mu.Unlock()
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Warn("TrafficSim: Temporary error sending data message:", err)
continue
}
errChan <- fmt.Errorf("error sending data message: %v", err)
return
}

ts.ClientStats.mu.Lock()
ts.ClientStats.PacketTimes[ts.Sequence] = PacketTime{Sent: sentTime}
ts.ClientStats.mu.Unlock()
}
}

func (ts *TrafficSim) receiveDataLoop() {
func (ts *TrafficSim) receiveDataLoop(errChan chan<- error) {
for {
msgBuf := make([]byte, 256)
ts.Conn.SetReadDeadline(time.Now().Add(5 * time.Second))
msgLen, _, err := ts.Conn.ReadFromUDP(msgBuf)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Error("TrafficSim: Timeout: No response received.")
log.Warn("TrafficSim: Timeout: No response received.")
continue
}
log.Error("TrafficSim: Error reading from UDP:", err)
continue
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Warn("TrafficSim: Temporary error reading from UDP:", err)
continue
}
errChan <- fmt.Errorf("error reading from UDP: %v", err)
return
}

tsMsg := TrafficSimMsg{}
Expand Down Expand Up @@ -332,11 +351,10 @@ func (ts *TrafficSim) calculateStats() map[string]interface{} {
}
}

func (ts *TrafficSim) runServer() {
func (ts *TrafficSim) runServer() error {
ln, err := net.ListenUDP("udp4", &net.UDPAddr{Port: int(ts.Port)})
if err != nil {
log.Errorf("Unable to listen on :%d", ts.Port)
return
return fmt.Errorf("unable to listen on :%d: %v", ts.Port, err)
}
defer ln.Close()

Expand All @@ -348,8 +366,11 @@ func (ts *TrafficSim) runServer() {
msgBuf := make([]byte, 256)
msgLen, addr, err := ln.ReadFromUDP(msgBuf)
if err != nil {
log.Error("TrafficSim: Error reading from UDP:", err)
continue
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Warn("TrafficSim: Temporary error reading from UDP:", err)
continue
}
return fmt.Errorf("error reading from UDP: %v", err)
}

go ts.handleConnection(ln, addr, msgBuf[:msgLen])
Expand Down Expand Up @@ -449,9 +470,16 @@ func (ts *TrafficSim) isAgentAllowed(agentID primitive.ObjectID) bool {
}

func (ts *TrafficSim) Start() {
if ts.IsServer {
ts.runServer()
} else {
ts.runClient()
for {
var err error
if ts.IsServer {
err = ts.runServer()
} else {
err = ts.runClient()
}
if err != nil {
log.Errorf("TrafficSim: Error occurred: %v. Retrying in %v...", err, RetryInterval)
time.Sleep(RetryInterval)
}
}
}

0 comments on commit de53aad

Please sign in to comment.