Skip to content

Commit

Permalink
updated trafficsim data format?
Browse files Browse the repository at this point in the history
  • Loading branch information
sagostin committed Jul 11, 2024
1 parent 2aff63c commit 5bacf24
Showing 1 changed file with 67 additions and 37 deletions.
104 changes: 67 additions & 37 deletions probes/trafficsim.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,14 +4,19 @@ import (
"encoding/json"
"fmt"
log "github.com/sirupsen/logrus"
"math"
"net"
"sort"
"strconv"
"sync"
"time"

"go.mongodb.org/mongo-driver/bson/primitive"
)

const TrafficSim_ReportSeq = 15
const TrafficSim_DataInterval = 1

type TrafficSim struct {
Running bool
Errored bool
Expand All @@ -36,18 +41,18 @@ type TrafficSim struct {
type Connection struct {
Addr *net.UDPAddr
LastResponse time.Time
ReceivedData map[int]TrafficSimData
ExpectedSeq int
AgentID primitive.ObjectID
ClientStats *ClientStats
}

type ClientStats struct {
SentPackets int `json:"sentPackets"`
ReceivedAcks int `json:"receivedAcks"`
LastReportTime time.Time `json:"lastReportTime"`
ReportInterval time.Duration `json:"reportInterval"`
PacketTimes map[int]PacketTime `json:"-"`
mu sync.Mutex
DuplicatePackets int `json:"duplicatePackets"`
OutOfSequence int `json:"outOfSequence"`
PacketTimes map[int]PacketTime `json:"-"`
LastReportTime time.Time `json:"lastReportTime"`
ReportInterval time.Duration `json:"reportInterval"`
mu sync.Mutex
}

type PacketTime struct {
Expand Down Expand Up @@ -147,11 +152,11 @@ func (ts *TrafficSim) sendHello() error {
func (ts *TrafficSim) sendDataLoop() {
ts.Sequence = 0
for {
time.Sleep(1 * time.Second)
time.Sleep(TrafficSim_DataInterval * time.Second)

ts.Mutex.Lock()
ts.Sequence++
if ts.Sequence > ts.MaxSequence {
ts.Sequence = 1
}
ts.Mutex.Unlock()
sentTime := time.Now().UnixMilli()
data := TrafficSimData{Sent: sentTime, Seq: ts.Sequence}
dataMsg, err := ts.buildMessage(TrafficSim_DATA, data)
Expand All @@ -165,7 +170,6 @@ func (ts *TrafficSim) sendDataLoop() {
log.Error("TrafficSim: Error sending data message:", err)
} else {
ts.ClientStats.mu.Lock()
ts.ClientStats.SentPackets++
ts.ClientStats.PacketTimes[ts.Sequence] = PacketTime{Sent: sentTime}
ts.ClientStats.mu.Unlock()
}
Expand Down Expand Up @@ -201,7 +205,6 @@ func (ts *TrafficSim) receiveDataLoop() {
if pTime, ok := ts.ClientStats.PacketTimes[seq]; ok {
pTime.Received = receivedTime
ts.ClientStats.PacketTimes[seq] = pTime
ts.ClientStats.ReceivedAcks++
}
ts.ClientStats.mu.Unlock()
ts.LastResponse = time.Now()
Expand All @@ -210,37 +213,50 @@ func (ts *TrafficSim) receiveDataLoop() {
}

func (ts *TrafficSim) reportClientStats() {
ticker := time.NewTicker(ts.ClientStats.ReportInterval)
ticker := time.NewTicker(TrafficSim_ReportSeq * time.Second)
defer ticker.Stop()

for range ticker.C {
ts.ClientStats.mu.Lock()
stats := ts.calculateStats()
ts.ClientStats.SentPackets = 0
ts.ClientStats.ReceivedAcks = 0
ts.ClientStats.PacketTimes = make(map[int]PacketTime)
ts.ClientStats.LastReportTime = time.Now()
ts.ClientStats.mu.Unlock()
ts.Sequence = 1

ts.DataChan <- ProbeData{
ProbeID: ts.Probe,
Triggered: false,
CreatedAt: time.Now(),
Data: stats,
}
}
}

func (ts *TrafficSim) calculateStats() map[string]interface{} {
var totalRTT, minRTT, maxRTT int64
var rtts []float64
lostPackets := 0
outOfOrder := 0
duplicatePackets := 0
lastReceivedTime := int64(0)
lastSeq := 0
seenPackets := make(map[int]bool)

for _, pTime := range ts.ClientStats.PacketTimes {
// Sort keys to process packets in sequence order
var keys []int
for k := range ts.ClientStats.PacketTimes {
keys = append(keys, k)
}
sort.Ints(keys)

for _, seq := range keys {
pTime := ts.ClientStats.PacketTimes[seq]
if pTime.Received == 0 {
lostPackets++
} else {
rtt := pTime.Received - pTime.Sent
rtts = append(rtts, float64(rtt))
totalRTT += rtt
if minRTT == 0 || rtt < minRTT {
minRTT = rtt
Expand All @@ -251,24 +267,42 @@ func (ts *TrafficSim) calculateStats() map[string]interface{} {
if pTime.Received < lastReceivedTime {
outOfOrder++
}
if seq < lastSeq {
outOfOrder++
}
lastReceivedTime = pTime.Received
lastSeq = seq

// Check for duplicate packets
if seenPackets[seq] {
duplicatePackets++
} else {
seenPackets[seq] = true
}
}
}

avgRTT := int64(0)
if ts.ClientStats.ReceivedAcks > 0 {
avgRTT = totalRTT / int64(ts.ClientStats.ReceivedAcks)
avgRTT := float64(0)
stdDevRTT := float64(0)
if len(rtts) > 0 {
avgRTT = float64(totalRTT) / float64(len(rtts))

// Calculate standard deviation
for _, rtt := range rtts {
stdDevRTT += math.Pow(rtt-avgRTT, 2)
}
stdDevRTT = math.Sqrt(stdDevRTT / float64(len(rtts)))
}

return map[string]interface{}{
"sentPackets": ts.ClientStats.SentPackets,
"receivedAcks": ts.ClientStats.ReceivedAcks,
"lostPackets": lostPackets,
"outOfOrder": outOfOrder,
"averageRTT": avgRTT,
"minRTT": minRTT,
"maxRTT": maxRTT,
"reportInterval": ts.ClientStats.ReportInterval,
"lostPackets": lostPackets,
"outOfSequence": outOfOrder,
"duplicatePackets": duplicatePackets,
"averageRTT": avgRTT,
"minRTT": minRTT,
"maxRTT": maxRTT,
"stdDevRTT": stdDevRTT,
"totalPackets": len(ts.ClientStats.PacketTimes),
}
}

Expand Down Expand Up @@ -315,8 +349,6 @@ func (ts *TrafficSim) handleConnection(conn *net.UDPConn, addr *net.UDPAddr, msg
connection = &Connection{
Addr: addr,
LastResponse: time.Now(),
ReceivedData: make(map[int]TrafficSimData),
ExpectedSeq: 1,
AgentID: tsMsg.Src,
}
ts.Connections[tsMsg.Src] = connection
Expand Down Expand Up @@ -346,7 +378,6 @@ func (ts *TrafficSim) sendACK(conn *net.UDPConn, addr *net.UDPAddr, data Traffic

func (ts *TrafficSim) handleData(conn *net.UDPConn, addr *net.UDPAddr, data TrafficSimData, connection *Connection) {
connection.LastResponse = time.Now()
connection.ReceivedData[data.Seq] = data

log.Infof("TrafficSim: Received data from %s: Seq %d", addr.String(), data.Seq)

Expand All @@ -357,10 +388,9 @@ func (ts *TrafficSim) handleData(conn *net.UDPConn, addr *net.UDPAddr, data Traf
}
ts.sendACK(conn, addr, ackData)

if data.Seq == 1 && connection.ExpectedSeq > 1 {
/*if data.Seq == 1 && connection.ExpectedSeq > 1 {
log.Infof("TrafficSim: Client %s has reset its sequence", addr.String())
connection.ExpectedSeq = 1
connection.ReceivedData = make(map[int]TrafficSimData)
}
if data.Seq > connection.ExpectedSeq {
Expand All @@ -369,18 +399,18 @@ func (ts *TrafficSim) handleData(conn *net.UDPConn, addr *net.UDPAddr, data Traf
log.Warnf("TrafficSim: Out of sequence packet received. Expected: %d, Got: %d", connection.ExpectedSeq, data.Seq)
} else {
connection.ExpectedSeq++
}
}*/

if len(connection.ReceivedData) >= 10 {
/*if len(connection.ReceivedData) >= 10 {
ts.reportToController(connection)
connection.ReceivedData = make(map[int]TrafficSimData)
connection.ExpectedSeq = 1
}
}*/
}

func (ts *TrafficSim) reportToController(connection *Connection) {
// Implement the actual reporting logic here
log.Infof("TrafficSim: Reporting stats for client %s", connection.AgentID.Hex())
// log.Infof("TrafficSim: Reporting stats for client %s", connection.AgentID.Hex())
}

func (ts *TrafficSim) isAgentAllowed(agentID primitive.ObjectID) bool {
Expand Down

0 comments on commit 5bacf24

Please sign in to comment.