Skip to content

Commit

Permalink
Sarthak | Changes the logic in reporter to measure average payload size
Browse files Browse the repository at this point in the history
  • Loading branch information
SarthakMakhija committed Aug 19, 2023
1 parent 86fd29d commit dd0fa95
Show file tree
Hide file tree
Showing 4 changed files with 184 additions and 19 deletions.
28 changes: 17 additions & 11 deletions report/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ type Report struct {
Response ResponseMetrics
}

// TODO: Total connections
type LoadMetrics struct {
TotalRequests uint
SuccessCount uint
Expand Down Expand Up @@ -94,7 +93,6 @@ func (reporter *Reporter) Run() {
}

func (reporter *Reporter) PrintReport(writer io.Writer) {
println("printing report...")
<-reporter.loadMetricsDoneChannel
if reporter.responseMetricsDoneChannel != nil {
<-reporter.responseMetricsDoneChannel
Expand All @@ -118,11 +116,8 @@ func (reporter *Reporter) collectLoadMetrics() {
reporter.report.Load.ErrorCountByType[load.Err.Error()]++
} else {
reporter.report.Load.SuccessCount++
reporter.report.Load.TotalPayloadLengthBytes += load.PayloadLengthBytes
}
reporter.report.Load.TotalPayloadLengthBytes += load.PayloadLengthBytes
reporter.report.Load.AveragePayloadLengthBytes = reporter.report.Load.TotalPayloadLengthBytes / int64(
totalGeneratedLoad,
)

if reporter.report.Load.EarliestLoadSendTime.IsZero() ||
load.LoadGenerationTime.Before(reporter.report.Load.EarliestLoadSendTime) {
Expand All @@ -137,6 +132,13 @@ func (reporter *Reporter) collectLoadMetrics() {
startTime := reporter.report.Load.EarliestLoadSendTime
timeToCompleteLoad := reporter.report.Load.LatestLoadSendTime.Sub(startTime)

if reporter.report.Load.SuccessCount != 0 {
reporter.report.Load.AveragePayloadLengthBytes = reporter.report.Load.TotalPayloadLengthBytes / int64(
reporter.report.Load.SuccessCount,
)
} else {
reporter.report.Load.AveragePayloadLengthBytes = 0
}
reporter.report.Load.TotalTime = timeToCompleteLoad
reporter.report.Load.TotalRequests = totalGeneratedLoad
close(reporter.loadMetricsDoneChannel)
Expand All @@ -154,12 +156,8 @@ func (reporter *Reporter) collectResponseMetrics() {
reporter.report.Response.ErrorCountByType[response.Err.Error()]++
} else {
reporter.report.Response.SuccessCount++
reporter.report.Response.TotalResponsePayloadLengthBytes += response.PayloadLengthBytes
}
reporter.report.Response.TotalResponsePayloadLengthBytes += response.PayloadLengthBytes
reporter.report.Response.AverageResponsePayloadLengthBytes = reporter.report.Response.TotalResponsePayloadLengthBytes /
int64(
totalResponses,
)

if reporter.report.Response.EarliestResponseReceivedTime.IsZero() ||
response.ResponseTime.Before(
Expand All @@ -176,6 +174,14 @@ func (reporter *Reporter) collectResponseMetrics() {
}
}
reporter.report.Response.TotalResponses = uint(totalResponses)
if reporter.report.Response.SuccessCount != 0 {
reporter.report.Response.AverageResponsePayloadLengthBytes = reporter.report.Response.TotalResponsePayloadLengthBytes /
int64(
reporter.report.Response.SuccessCount,
)
} else {
reporter.report.Response.AverageResponsePayloadLengthBytes = 0
}

timeToCompleteResponses := reporter.report.Response.LatestResponseReceivedTime.
Sub(reporter.report.Response.EarliestResponseReceivedTime)
Expand Down
105 changes: 102 additions & 3 deletions report/reporter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ func TestReportWithTotalRequests(t *testing.T) {
}

func TestReportWithPayloadLengthInGeneratingLoad(t *testing.T) {
loadGenerationChannel := make(chan LoadGenerationResponse, 1)
loadGenerationChannel := make(chan LoadGenerationResponse, 2)
reporter := NewLoadGenerationMetricsCollectingReporter(loadGenerationChannel)
reporter.Run()

Expand All @@ -83,13 +83,57 @@ func TestReportWithPayloadLengthInGeneratingLoad(t *testing.T) {
PayloadLengthBytes: 10,
}

time.Sleep(2 * time.Millisecond)
time.Sleep(4 * time.Millisecond)
close(loadGenerationChannel)

time.Sleep(2 * time.Millisecond)
assert.Equal(t, int64(20), reporter.report.Load.TotalPayloadLengthBytes)
assert.Equal(t, int64(10), reporter.report.Load.AveragePayloadLengthBytes)
}

func TestReportWithPayloadLengthInGeneratingLoadWithAnError(t *testing.T) {
loadGenerationChannel := make(chan LoadGenerationResponse, 2)
reporter := NewLoadGenerationMetricsCollectingReporter(loadGenerationChannel)
reporter.Run()

loadGenerationChannel <- LoadGenerationResponse{
PayloadLengthBytes: 10,
}
loadGenerationChannel <- LoadGenerationResponse{
Err: errors.New("test error"),
PayloadLengthBytes: 10,
}

time.Sleep(4 * time.Millisecond)
close(loadGenerationChannel)

time.Sleep(2 * time.Millisecond)
assert.Equal(t, int64(10), reporter.report.Load.TotalPayloadLengthBytes)
assert.Equal(t, int64(10), reporter.report.Load.AveragePayloadLengthBytes)
}

func TestReportWithPayloadLengthInGeneratingLoadWithAllErrors(t *testing.T) {
loadGenerationChannel := make(chan LoadGenerationResponse, 2)
reporter := NewLoadGenerationMetricsCollectingReporter(loadGenerationChannel)
reporter.Run()

loadGenerationChannel <- LoadGenerationResponse{
Err: errors.New("test error"),
PayloadLengthBytes: 10,
}
loadGenerationChannel <- LoadGenerationResponse{
Err: errors.New("test error"),
PayloadLengthBytes: 10,
}

time.Sleep(4 * time.Millisecond)
close(loadGenerationChannel)

time.Sleep(2 * time.Millisecond)
assert.Equal(t, int64(0), reporter.report.Load.TotalPayloadLengthBytes)
assert.Equal(t, int64(0), reporter.report.Load.AveragePayloadLengthBytes)
}

func TestReportWithLoadTimeInGeneratingLoad(t *testing.T) {
loadGenerationChannel := make(chan LoadGenerationResponse, 1)
reporter := NewLoadGenerationMetricsCollectingReporter(loadGenerationChannel)
Expand Down Expand Up @@ -182,7 +226,7 @@ func TestReportWithTotalResponses(t *testing.T) {
}

func TestReportWithResponsePayloadLengthInReceivingResponse(t *testing.T) {
responseChannel := make(chan SubjectServerResponse, 1)
responseChannel := make(chan SubjectServerResponse, 2)
reporter := NewResponseMetricsCollectingReporter(nil, responseChannel)
reporter.Run()

Expand All @@ -196,6 +240,8 @@ func TestReportWithResponsePayloadLengthInReceivingResponse(t *testing.T) {
time.Sleep(2 * time.Millisecond)
close(responseChannel)

time.Sleep(2 * time.Millisecond)

assert.Equal(t, int64(20), reporter.report.Response.TotalResponsePayloadLengthBytes)
assert.Equal(
t,
Expand All @@ -204,6 +250,59 @@ func TestReportWithResponsePayloadLengthInReceivingResponse(t *testing.T) {
)
}

func TestReportWithResponsePayloadLengthInReceivingResponseWithAnError(t *testing.T) {
responseChannel := make(chan SubjectServerResponse, 2)
reporter := NewResponseMetricsCollectingReporter(nil, responseChannel)
reporter.Run()

responseChannel <- SubjectServerResponse{
PayloadLengthBytes: 10,
}
responseChannel <- SubjectServerResponse{
Err: errors.New("test error"),
PayloadLengthBytes: 10,
}

time.Sleep(2 * time.Millisecond)
close(responseChannel)

time.Sleep(2 * time.Millisecond)

assert.Equal(t, int64(10), reporter.report.Response.TotalResponsePayloadLengthBytes)
assert.Equal(
t,
int64(10),
reporter.report.Response.AverageResponsePayloadLengthBytes,
)
}

func TestReportWithResponsePayloadLengthInReceivingResponseWithAllErrors(t *testing.T) {
responseChannel := make(chan SubjectServerResponse, 2)
reporter := NewResponseMetricsCollectingReporter(nil, responseChannel)
reporter.Run()

responseChannel <- SubjectServerResponse{
Err: errors.New("test error"),
PayloadLengthBytes: 10,
}
responseChannel <- SubjectServerResponse{
Err: errors.New("test error"),
PayloadLengthBytes: 10,
}

time.Sleep(2 * time.Millisecond)
close(responseChannel)

time.Sleep(2 * time.Millisecond)

assert.Equal(t, int64(0), reporter.report.Response.TotalResponsePayloadLengthBytes)
assert.Equal(
t,
int64(0),
reporter.report.Response.AverageResponsePayloadLengthBytes,
)
}

func TestReportWithLoadTimeInReceivingResponse(t *testing.T) {
responseChannel := make(chan SubjectServerResponse, 1)
reporter := NewResponseMetricsCollectingReporter(nil, responseChannel)
Expand Down
35 changes: 35 additions & 0 deletions tests/blast_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -151,3 +151,38 @@ func TestBlastWithLoadGenerationAndResponseReadingForMaximumDuration(t *testing.

assert.True(t, totalRequestsMade < 2_00_000)
}

func TestBlastWithResponseReadingGivenTheTargetServerFailsInSendingResponses(t *testing.T) {
payloadSizeBytes := int64(10)
server, err := NewEchoServerWithNoWriteback("tcp", "localhost:10005", payloadSizeBytes, 2)
assert.Nil(t, err)

server.accept(t)
defer server.stop()

concurrency, totalRequests := uint(10), uint(20)

groupOptions := workers.NewGroupOptions(
concurrency,
totalRequests,
[]byte("HelloWorld"),
"localhost:10005",
)
responseOptions := blast.ResponseOptions{
ResponsePayloadSizeBytes: payloadSizeBytes,
TotalResponsesToRead: 20,
ReadingOption: blast.ReadTotalResponses,
}
buffer := &bytes.Buffer{}
blast.OutputStream = buffer
blast.NewBlastWithResponseReading(groupOptions, responseOptions, 5*time.Second)

output := string(buffer.Bytes())

assert.True(t, strings.Contains(output, "ResponseMetrics"))
assert.True(t, strings.Contains(output, "TotalResponses: 20"))
assert.True(t, strings.Contains(output, "SuccessCount: 10"))
assert.True(t, strings.Contains(output, "ErrorCount: 10"))
assert.True(t, strings.Contains(output, "TotalResponsePayloadSize: 100 B"))
assert.True(t, strings.Contains(output, "AveragePayloadSize: 10 B"))
}
35 changes: 30 additions & 5 deletions tests/echo_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,10 +9,11 @@ import (
)

type EchoServer struct {
listener net.Listener
payloadSizeBytes int64
stopChannel chan struct{}
totalRequests atomic.Uint32
listener net.Listener
payloadSizeBytes int64
stopChannel chan struct{}
totalRequests atomic.Uint32
donotWritebackEveryKRequests uint
}

func NewEchoServer(network, address string, payloadSizeBytes int64) (*EchoServer, error) {
Expand All @@ -28,6 +29,24 @@ func NewEchoServer(network, address string, payloadSizeBytes int64) (*EchoServer
}, nil
}

func NewEchoServerWithNoWriteback(
network, address string,
payloadSizeBytes int64,
donotWritebackEveryKRequests uint,
) (*EchoServer, error) {
listener, err := net.Listen(network, address)
if err != nil {
return nil, err
}

return &EchoServer{
listener: listener,
payloadSizeBytes: payloadSizeBytes,
stopChannel: make(chan struct{}),
donotWritebackEveryKRequests: donotWritebackEveryKRequests,
}, nil
}

func (server *EchoServer) accept(t *testing.T) {
go func() {
for {
Expand All @@ -46,6 +65,7 @@ func (server *EchoServer) accept(t *testing.T) {

func (server *EchoServer) handleConnection(connection net.Conn) {
go func() {
requestCount := uint(0)
for {
select {
case <-server.stopChannel:
Expand All @@ -56,7 +76,12 @@ func (server *EchoServer) handleConnection(connection net.Conn) {
_, _ = connection.Read(payload)
server.totalRequests.Add(1)

_, _ = connection.Write(payload)
requestCount = requestCount + 1
if server.donotWritebackEveryKRequests != 0 &&
requestCount%server.donotWritebackEveryKRequests == 0 {
} else {
_, _ = connection.Write(payload)
}
}
}
}()
Expand Down

0 comments on commit dd0fa95

Please sign in to comment.