From 5bacf2414a37fe6b51d348c3a909c14e56f1d685 Mon Sep 17 00:00:00 2001 From: Shaun Agostinho Date: Thu, 11 Jul 2024 12:38:48 -0700 Subject: [PATCH] updated trafficsim data format? --- probes/trafficsim.go | 104 ++++++++++++++++++++++++++++--------------- 1 file changed, 67 insertions(+), 37 deletions(-) diff --git a/probes/trafficsim.go b/probes/trafficsim.go index 21c9f15..4bb4690 100644 --- a/probes/trafficsim.go +++ b/probes/trafficsim.go @@ -4,7 +4,9 @@ import ( "encoding/json" "fmt" log "github.com/sirupsen/logrus" + "math" "net" + "sort" "strconv" "sync" "time" @@ -12,6 +14,9 @@ import ( "go.mongodb.org/mongo-driver/bson/primitive" ) +const TrafficSim_ReportSeq = 15 +const TrafficSim_DataInterval = 1 + type TrafficSim struct { Running bool Errored bool @@ -36,18 +41,18 @@ type TrafficSim struct { type Connection struct { Addr *net.UDPAddr LastResponse time.Time - ReceivedData map[int]TrafficSimData ExpectedSeq int AgentID primitive.ObjectID + ClientStats *ClientStats } type ClientStats struct { - SentPackets int `json:"sentPackets"` - ReceivedAcks int `json:"receivedAcks"` - LastReportTime time.Time `json:"lastReportTime"` - ReportInterval time.Duration `json:"reportInterval"` - PacketTimes map[int]PacketTime `json:"-"` - mu sync.Mutex + DuplicatePackets int `json:"duplicatePackets"` + OutOfSequence int `json:"outOfSequence"` + PacketTimes map[int]PacketTime `json:"-"` + LastReportTime time.Time `json:"lastReportTime"` + ReportInterval time.Duration `json:"reportInterval"` + mu sync.Mutex } type PacketTime struct { @@ -147,11 +152,11 @@ func (ts *TrafficSim) sendHello() error { func (ts *TrafficSim) sendDataLoop() { ts.Sequence = 0 for { - time.Sleep(1 * time.Second) + time.Sleep(TrafficSim_DataInterval * time.Second) + + ts.Mutex.Lock() ts.Sequence++ - if ts.Sequence > ts.MaxSequence { - ts.Sequence = 1 - } + ts.Mutex.Unlock() sentTime := time.Now().UnixMilli() data := TrafficSimData{Sent: sentTime, Seq: ts.Sequence} dataMsg, err := ts.buildMessage(TrafficSim_DATA, data) @@ -165,7 +170,6 @@ func (ts *TrafficSim) sendDataLoop() { log.Error("TrafficSim: Error sending data message:", err) } else { ts.ClientStats.mu.Lock() - ts.ClientStats.SentPackets++ ts.ClientStats.PacketTimes[ts.Sequence] = PacketTime{Sent: sentTime} ts.ClientStats.mu.Unlock() } @@ -201,7 +205,6 @@ func (ts *TrafficSim) receiveDataLoop() { if pTime, ok := ts.ClientStats.PacketTimes[seq]; ok { pTime.Received = receivedTime ts.ClientStats.PacketTimes[seq] = pTime - ts.ClientStats.ReceivedAcks++ } ts.ClientStats.mu.Unlock() ts.LastResponse = time.Now() @@ -210,21 +213,21 @@ func (ts *TrafficSim) receiveDataLoop() { } func (ts *TrafficSim) reportClientStats() { - ticker := time.NewTicker(ts.ClientStats.ReportInterval) + ticker := time.NewTicker(TrafficSim_ReportSeq * time.Second) defer ticker.Stop() for range ticker.C { ts.ClientStats.mu.Lock() stats := ts.calculateStats() - ts.ClientStats.SentPackets = 0 - ts.ClientStats.ReceivedAcks = 0 ts.ClientStats.PacketTimes = make(map[int]PacketTime) ts.ClientStats.LastReportTime = time.Now() ts.ClientStats.mu.Unlock() + ts.Sequence = 1 ts.DataChan <- ProbeData{ ProbeID: ts.Probe, Triggered: false, + CreatedAt: time.Now(), Data: stats, } } @@ -232,15 +235,28 @@ func (ts *TrafficSim) reportClientStats() { func (ts *TrafficSim) calculateStats() map[string]interface{} { var totalRTT, minRTT, maxRTT int64 + var rtts []float64 lostPackets := 0 outOfOrder := 0 + duplicatePackets := 0 lastReceivedTime := int64(0) + lastSeq := 0 + seenPackets := make(map[int]bool) - for _, pTime := range ts.ClientStats.PacketTimes { + // Sort keys to process packets in sequence order + var keys []int + for k := range ts.ClientStats.PacketTimes { + keys = append(keys, k) + } + sort.Ints(keys) + + for _, seq := range keys { + pTime := ts.ClientStats.PacketTimes[seq] if pTime.Received == 0 { lostPackets++ } else { rtt := pTime.Received - pTime.Sent + rtts = append(rtts, float64(rtt)) totalRTT += rtt if minRTT == 0 || rtt < minRTT { minRTT = rtt @@ -251,24 +267,42 @@ func (ts *TrafficSim) calculateStats() map[string]interface{} { if pTime.Received < lastReceivedTime { outOfOrder++ } + if seq < lastSeq { + outOfOrder++ + } lastReceivedTime = pTime.Received + lastSeq = seq + + // Check for duplicate packets + if seenPackets[seq] { + duplicatePackets++ + } else { + seenPackets[seq] = true + } } } - avgRTT := int64(0) - if ts.ClientStats.ReceivedAcks > 0 { - avgRTT = totalRTT / int64(ts.ClientStats.ReceivedAcks) + avgRTT := float64(0) + stdDevRTT := float64(0) + if len(rtts) > 0 { + avgRTT = float64(totalRTT) / float64(len(rtts)) + + // Calculate standard deviation + for _, rtt := range rtts { + stdDevRTT += math.Pow(rtt-avgRTT, 2) + } + stdDevRTT = math.Sqrt(stdDevRTT / float64(len(rtts))) } return map[string]interface{}{ - "sentPackets": ts.ClientStats.SentPackets, - "receivedAcks": ts.ClientStats.ReceivedAcks, - "lostPackets": lostPackets, - "outOfOrder": outOfOrder, - "averageRTT": avgRTT, - "minRTT": minRTT, - "maxRTT": maxRTT, - "reportInterval": ts.ClientStats.ReportInterval, + "lostPackets": lostPackets, + "outOfSequence": outOfOrder, + "duplicatePackets": duplicatePackets, + "averageRTT": avgRTT, + "minRTT": minRTT, + "maxRTT": maxRTT, + "stdDevRTT": stdDevRTT, + "totalPackets": len(ts.ClientStats.PacketTimes), } } @@ -315,8 +349,6 @@ func (ts *TrafficSim) handleConnection(conn *net.UDPConn, addr *net.UDPAddr, msg connection = &Connection{ Addr: addr, LastResponse: time.Now(), - ReceivedData: make(map[int]TrafficSimData), - ExpectedSeq: 1, AgentID: tsMsg.Src, } ts.Connections[tsMsg.Src] = connection @@ -346,7 +378,6 @@ func (ts *TrafficSim) sendACK(conn *net.UDPConn, addr *net.UDPAddr, data Traffic func (ts *TrafficSim) handleData(conn *net.UDPConn, addr *net.UDPAddr, data TrafficSimData, connection *Connection) { connection.LastResponse = time.Now() - connection.ReceivedData[data.Seq] = data log.Infof("TrafficSim: Received data from %s: Seq %d", addr.String(), data.Seq) @@ -357,10 +388,9 @@ func (ts *TrafficSim) handleData(conn *net.UDPConn, addr *net.UDPAddr, data Traf } ts.sendACK(conn, addr, ackData) - if data.Seq == 1 && connection.ExpectedSeq > 1 { + /*if data.Seq == 1 && connection.ExpectedSeq > 1 { log.Infof("TrafficSim: Client %s has reset its sequence", addr.String()) connection.ExpectedSeq = 1 - connection.ReceivedData = make(map[int]TrafficSimData) } if data.Seq > connection.ExpectedSeq { @@ -369,18 +399,18 @@ func (ts *TrafficSim) handleData(conn *net.UDPConn, addr *net.UDPAddr, data Traf log.Warnf("TrafficSim: Out of sequence packet received. Expected: %d, Got: %d", connection.ExpectedSeq, data.Seq) } else { connection.ExpectedSeq++ - } + }*/ - if len(connection.ReceivedData) >= 10 { + /*if len(connection.ReceivedData) >= 10 { ts.reportToController(connection) connection.ReceivedData = make(map[int]TrafficSimData) connection.ExpectedSeq = 1 - } + }*/ } func (ts *TrafficSim) reportToController(connection *Connection) { // Implement the actual reporting logic here - log.Infof("TrafficSim: Reporting stats for client %s", connection.AgentID.Hex()) + // log.Infof("TrafficSim: Reporting stats for client %s", connection.AgentID.Hex()) } func (ts *TrafficSim) isAgentAllowed(agentID primitive.ObjectID) bool {