Skip to content

Commit

Permalink
update traffic sim to alleviate lost connections?
Browse files Browse the repository at this point in the history
  • Loading branch information
sagostin committed Aug 29, 2024
1 parent 70b3657 commit c148c2d
Showing 1 changed file with 131 additions and 104 deletions.
235 changes: 131 additions & 104 deletions probes/trafficsim.go
Original file line number Diff line number Diff line change
Expand Up @@ -101,7 +101,9 @@ func (ts *TrafficSim) runClient() error {
for {
currentIP, err := getLocalIP()
if err != nil {
return fmt.Errorf("failed to get local IP: %v", err)
log.Errorf("TrafficSim: Failed to get local IP: %v", err)
time.Sleep(RetryInterval)
continue
}

if ts.localIP != currentIP {
Expand All @@ -111,19 +113,24 @@ func (ts *TrafficSim) runClient() error {

toAddr, err := net.ResolveUDPAddr("udp4", ts.IPAddress+":"+strconv.Itoa(int(ts.Port)))
if err != nil {
return fmt.Errorf("could not resolve %v:%d: %v", ts.IPAddress, ts.Port, err)
log.Errorf("TrafficSim: Could not resolve %v:%d: %v", ts.IPAddress, ts.Port, err)
time.Sleep(RetryInterval)
continue
}

localAddr, err := net.ResolveUDPAddr("udp4", ts.localIP+":0")
if err != nil {
return fmt.Errorf("could not resolve local address: %v", err)
log.Errorf("TrafficSim: Could not resolve local address: %v", err)
time.Sleep(RetryInterval)
continue
}

conn, err := net.DialUDP("udp4", localAddr, toAddr)
if err != nil {
return fmt.Errorf("unable to connect to %v:%d: %v", ts.IPAddress, ts.Port, err)
log.Errorf("TrafficSim: Unable to connect to %v:%d: %v", ts.IPAddress, ts.Port, err)
time.Sleep(RetryInterval)
continue
}
defer conn.Close()

ts.Conn = conn
ts.ClientStats = &ClientStats{
Expand All @@ -133,23 +140,27 @@ func (ts *TrafficSim) runClient() error {
}

if err := ts.sendHello(); err != nil {
log.Error("TrafficSim: Failed to establish connection:", err)
log.Errorf("TrafficSim: Failed to establish connection: %v", err)
ts.Conn.Close()
time.Sleep(RetryInterval)
continue
}

log.Infof("TrafficSim: Connection established successfully to %v", ts.OtherAgent.Hex())

errChan := make(chan error, 3)
go ts.sendDataLoop(errChan)
go ts.reportClientStats()
go ts.receiveDataLoop(errChan)
stopChan := make(chan struct{})

go ts.sendDataLoop(errChan, stopChan)
go ts.reportClientStats(stopChan)
go ts.receiveDataLoop(errChan, stopChan)

select {
case err := <-errChan:
log.Errorf("TrafficSim: Error in client loop: %v", err)
close(stopChan)
ts.Conn.Close()
time.Sleep(RetryInterval)
continue
}
}
}
Expand All @@ -175,74 +186,132 @@ func (ts *TrafficSim) sendHello() error {
return nil
}

func (ts *TrafficSim) sendDataLoop(errChan chan<- error) {
ts.Sequence = 0
func (ts *TrafficSim) sendDataLoop(errChan chan<- error, stopChan <-chan struct{}) {
ticker := time.NewTicker(TrafficSim_DataInterval * time.Second)
defer ticker.Stop()

for {
time.Sleep(TrafficSim_DataInterval * time.Second)

ts.Mutex.Lock()
ts.Sequence++
ts.Mutex.Unlock()
sentTime := time.Now().UnixMilli()
data := TrafficSimData{Sent: sentTime, Seq: ts.Sequence}
dataMsg, err := ts.buildMessage(TrafficSim_DATA, data)
if err != nil {
errChan <- fmt.Errorf("error building data message: %v", err)
select {
case <-stopChan:
return
}
case <-ticker.C:
ts.Mutex.Lock()
ts.Sequence++
ts.Mutex.Unlock()
sentTime := time.Now().UnixMilli()
data := TrafficSimData{Sent: sentTime, Seq: ts.Sequence}
dataMsg, err := ts.buildMessage(TrafficSim_DATA, data)
if err != nil {
errChan <- fmt.Errorf("error building data message: %v", err)
return
}

_, err = ts.Conn.Write([]byte(dataMsg))
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Warn("TrafficSim: Temporary error sending data message:", err)
continue
_, err = ts.Conn.Write([]byte(dataMsg))
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Warn("TrafficSim: Temporary error sending data message:", err)
continue
}
errChan <- fmt.Errorf("error sending data message: %v", err)
return
}
errChan <- fmt.Errorf("error sending data message: %v", err)
return
}

ts.ClientStats.mu.Lock()
ts.ClientStats.PacketTimes[ts.Sequence] = PacketTime{Sent: sentTime}
ts.ClientStats.mu.Unlock()
ts.ClientStats.mu.Lock()
ts.ClientStats.PacketTimes[ts.Sequence] = PacketTime{Sent: sentTime}
ts.ClientStats.mu.Unlock()
}
}
}

func (ts *TrafficSim) receiveDataLoop(errChan chan<- error) {
func (ts *TrafficSim) receiveDataLoop(errChan chan<- error, stopChan <-chan struct{}) {
for {
msgBuf := make([]byte, 256)
ts.Conn.SetReadDeadline(time.Now().Add(5 * time.Second))
msgLen, _, err := ts.Conn.ReadFromUDP(msgBuf)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Warn("TrafficSim: Timeout: No response received.")
continue
select {
case <-stopChan:
return
default:
msgBuf := make([]byte, 256)
ts.Conn.SetReadDeadline(time.Now().Add(5 * time.Second))
msgLen, _, err := ts.Conn.ReadFromUDP(msgBuf)
if err != nil {
if netErr, ok := err.(net.Error); ok && netErr.Timeout() {
log.Warn("TrafficSim: Timeout: No response received.")
continue
}
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Warn("TrafficSim: Temporary error reading from UDP:", err)
continue
}
errChan <- fmt.Errorf("error reading from UDP: %v", err)
return
}
if netErr, ok := err.(net.Error); ok && netErr.Temporary() {
log.Warn("TrafficSim: Temporary error reading from UDP:", err)

tsMsg := TrafficSimMsg{}
err = json.Unmarshal(msgBuf[:msgLen], &tsMsg)
if err != nil {
log.Error("TrafficSim: Error unmarshalling message:", err)
continue
}
errChan <- fmt.Errorf("error reading from UDP: %v", err)
return
}

tsMsg := TrafficSimMsg{}
err = json.Unmarshal(msgBuf[:msgLen], &tsMsg)
if err != nil {
log.Error("TrafficSim: Error unmarshalling message:", err)
continue
if tsMsg.Type == TrafficSim_ACK {
data := tsMsg.Data
seq := data.Seq
receivedTime := time.Now().UnixMilli()
ts.ClientStats.mu.Lock()
if pTime, ok := ts.ClientStats.PacketTimes[seq]; ok {
pTime.Received = receivedTime
ts.ClientStats.PacketTimes[seq] = pTime
}
ts.ClientStats.mu.Unlock()
ts.LastResponse = time.Now()
}
}
}
}

func (ts *TrafficSim) reportClientStats(stopChan <-chan struct{}) {
ticker := time.NewTicker(TrafficSim_ReportSeq * time.Second)
defer ticker.Stop()

if tsMsg.Type == TrafficSim_ACK {
data := tsMsg.Data
seq := data.Seq
receivedTime := time.Now().UnixMilli()
const MaxWaitTime = 500 * time.Millisecond

for {
select {
case <-stopChan:
return
case <-ticker.C:
ts.ClientStats.mu.Lock()
if pTime, ok := ts.ClientStats.PacketTimes[seq]; ok {
pTime.Received = receivedTime
ts.ClientStats.PacketTimes[seq] = pTime

// Find the highest sequence number
var maxSeq int
for seq := range ts.ClientStats.PacketTimes {
if seq > maxSeq {
maxSeq = seq
}
}

// Wait for the last packet, but no longer than MaxWaitTime
startWait := time.Now()
for time.Since(startWait) < MaxWaitTime {
if pTime, ok := ts.ClientStats.PacketTimes[maxSeq]; ok && pTime.Received != 0 {
break
}
ts.ClientStats.mu.Unlock()
time.Sleep(10 * time.Millisecond)
ts.ClientStats.mu.Lock()
}

stats := ts.calculateStats()
ts.ClientStats.PacketTimes = make(map[int]PacketTime)
ts.ClientStats.LastReportTime = time.Now()
ts.Sequence = 1
ts.ClientStats.mu.Unlock()
ts.LastResponse = time.Now()

ts.DataChan <- ProbeData{
ProbeID: ts.Probe,
Triggered: false,
CreatedAt: time.Now(),
Data: stats,
}
}
}
}
Expand All @@ -262,49 +331,6 @@ func getLocalIP() (string, error) {
return "", fmt.Errorf("no suitable local IP address found")
}

func (ts *TrafficSim) reportClientStats() {
ticker := time.NewTicker(TrafficSim_ReportSeq * time.Second)
defer ticker.Stop()

const MaxWaitTime = 500 * time.Millisecond

for range ticker.C {
ts.ClientStats.mu.Lock()

// Find the highest sequence number
var maxSeq int
for seq := range ts.ClientStats.PacketTimes {
if seq > maxSeq {
maxSeq = seq
}
}

// Wait for the last packet, but no longer than MaxWaitTime
startWait := time.Now()
for time.Since(startWait) < MaxWaitTime {
if pTime, ok := ts.ClientStats.PacketTimes[maxSeq]; ok && pTime.Received != 0 {
break
}
ts.ClientStats.mu.Unlock()
time.Sleep(10 * time.Millisecond)
ts.ClientStats.mu.Lock()
}

stats := ts.calculateStats()
ts.ClientStats.PacketTimes = make(map[int]PacketTime)
ts.ClientStats.LastReportTime = time.Now()
ts.Sequence = 1
ts.ClientStats.mu.Unlock()

ts.DataChan <- ProbeData{
ProbeID: ts.Probe,
Triggered: false,
CreatedAt: time.Now(),
Data: stats,
}
}
}

func (ts *TrafficSim) calculateStats() map[string]interface{} {
var totalRTT, minRTT, maxRTT int64
var rtts []float64
Expand Down Expand Up @@ -512,7 +538,8 @@ func (ts *TrafficSim) Start() {
ts.localIP, err = getLocalIP()
if err != nil {
log.Errorf("TrafficSim: Failed to get local IP: %v", err)
return
time.Sleep(RetryInterval)
continue
}

if ts.IsServer {
Expand Down

0 comments on commit c148c2d

Please sign in to comment.