diff --git a/report/reporter.go b/report/reporter.go index 9b97f14..16cc555 100644 --- a/report/reporter.go +++ b/report/reporter.go @@ -11,7 +11,6 @@ type Report struct { Response ResponseMetrics } -// TODO: Total connections type LoadMetrics struct { TotalRequests uint SuccessCount uint @@ -94,7 +93,6 @@ func (reporter *Reporter) Run() { } func (reporter *Reporter) PrintReport(writer io.Writer) { - println("printing report...") <-reporter.loadMetricsDoneChannel if reporter.responseMetricsDoneChannel != nil { <-reporter.responseMetricsDoneChannel @@ -118,11 +116,8 @@ func (reporter *Reporter) collectLoadMetrics() { reporter.report.Load.ErrorCountByType[load.Err.Error()]++ } else { reporter.report.Load.SuccessCount++ + reporter.report.Load.TotalPayloadLengthBytes += load.PayloadLengthBytes } - reporter.report.Load.TotalPayloadLengthBytes += load.PayloadLengthBytes - reporter.report.Load.AveragePayloadLengthBytes = reporter.report.Load.TotalPayloadLengthBytes / int64( - totalGeneratedLoad, - ) if reporter.report.Load.EarliestLoadSendTime.IsZero() || load.LoadGenerationTime.Before(reporter.report.Load.EarliestLoadSendTime) { @@ -137,6 +132,13 @@ func (reporter *Reporter) collectLoadMetrics() { startTime := reporter.report.Load.EarliestLoadSendTime timeToCompleteLoad := reporter.report.Load.LatestLoadSendTime.Sub(startTime) + if reporter.report.Load.SuccessCount != 0 { + reporter.report.Load.AveragePayloadLengthBytes = reporter.report.Load.TotalPayloadLengthBytes / int64( + reporter.report.Load.SuccessCount, + ) + } else { + reporter.report.Load.AveragePayloadLengthBytes = 0 + } reporter.report.Load.TotalTime = timeToCompleteLoad reporter.report.Load.TotalRequests = totalGeneratedLoad close(reporter.loadMetricsDoneChannel) @@ -154,12 +156,8 @@ func (reporter *Reporter) collectResponseMetrics() { reporter.report.Response.ErrorCountByType[response.Err.Error()]++ } else { reporter.report.Response.SuccessCount++ + reporter.report.Response.TotalResponsePayloadLengthBytes += response.PayloadLengthBytes } - reporter.report.Response.TotalResponsePayloadLengthBytes += response.PayloadLengthBytes - reporter.report.Response.AverageResponsePayloadLengthBytes = reporter.report.Response.TotalResponsePayloadLengthBytes / - int64( - totalResponses, - ) if reporter.report.Response.EarliestResponseReceivedTime.IsZero() || response.ResponseTime.Before( @@ -176,6 +174,14 @@ func (reporter *Reporter) collectResponseMetrics() { } } reporter.report.Response.TotalResponses = uint(totalResponses) + if reporter.report.Response.SuccessCount != 0 { + reporter.report.Response.AverageResponsePayloadLengthBytes = reporter.report.Response.TotalResponsePayloadLengthBytes / + int64( + reporter.report.Response.SuccessCount, + ) + } else { + reporter.report.Response.AverageResponsePayloadLengthBytes = 0 + } timeToCompleteResponses := reporter.report.Response.LatestResponseReceivedTime. Sub(reporter.report.Response.EarliestResponseReceivedTime) diff --git a/report/reporter_test.go b/report/reporter_test.go index 3e899b9..d9c4a13 100644 --- a/report/reporter_test.go +++ b/report/reporter_test.go @@ -72,7 +72,7 @@ func TestReportWithTotalRequests(t *testing.T) { } func TestReportWithPayloadLengthInGeneratingLoad(t *testing.T) { - loadGenerationChannel := make(chan LoadGenerationResponse, 1) + loadGenerationChannel := make(chan LoadGenerationResponse, 2) reporter := NewLoadGenerationMetricsCollectingReporter(loadGenerationChannel) reporter.Run() @@ -83,13 +83,57 @@ func TestReportWithPayloadLengthInGeneratingLoad(t *testing.T) { PayloadLengthBytes: 10, } - time.Sleep(2 * time.Millisecond) + time.Sleep(4 * time.Millisecond) close(loadGenerationChannel) + time.Sleep(2 * time.Millisecond) assert.Equal(t, int64(20), reporter.report.Load.TotalPayloadLengthBytes) assert.Equal(t, int64(10), reporter.report.Load.AveragePayloadLengthBytes) } +func TestReportWithPayloadLengthInGeneratingLoadWithAnError(t *testing.T) { + loadGenerationChannel := make(chan LoadGenerationResponse, 2) + reporter := NewLoadGenerationMetricsCollectingReporter(loadGenerationChannel) + reporter.Run() + + loadGenerationChannel <- LoadGenerationResponse{ + PayloadLengthBytes: 10, + } + loadGenerationChannel <- LoadGenerationResponse{ + Err: errors.New("test error"), + PayloadLengthBytes: 10, + } + + time.Sleep(4 * time.Millisecond) + close(loadGenerationChannel) + + time.Sleep(2 * time.Millisecond) + assert.Equal(t, int64(10), reporter.report.Load.TotalPayloadLengthBytes) + assert.Equal(t, int64(10), reporter.report.Load.AveragePayloadLengthBytes) +} + +func TestReportWithPayloadLengthInGeneratingLoadWithAllErrors(t *testing.T) { + loadGenerationChannel := make(chan LoadGenerationResponse, 2) + reporter := NewLoadGenerationMetricsCollectingReporter(loadGenerationChannel) + reporter.Run() + + loadGenerationChannel <- LoadGenerationResponse{ + Err: errors.New("test error"), + PayloadLengthBytes: 10, + } + loadGenerationChannel <- LoadGenerationResponse{ + Err: errors.New("test error"), + PayloadLengthBytes: 10, + } + + time.Sleep(4 * time.Millisecond) + close(loadGenerationChannel) + + time.Sleep(2 * time.Millisecond) + assert.Equal(t, int64(0), reporter.report.Load.TotalPayloadLengthBytes) + assert.Equal(t, int64(0), reporter.report.Load.AveragePayloadLengthBytes) +} + func TestReportWithLoadTimeInGeneratingLoad(t *testing.T) { loadGenerationChannel := make(chan LoadGenerationResponse, 1) reporter := NewLoadGenerationMetricsCollectingReporter(loadGenerationChannel) @@ -182,7 +226,7 @@ func TestReportWithTotalResponses(t *testing.T) { } func TestReportWithResponsePayloadLengthInReceivingResponse(t *testing.T) { - responseChannel := make(chan SubjectServerResponse, 1) + responseChannel := make(chan SubjectServerResponse, 2) reporter := NewResponseMetricsCollectingReporter(nil, responseChannel) reporter.Run() @@ -196,6 +240,8 @@ func TestReportWithResponsePayloadLengthInReceivingResponse(t *testing.T) { time.Sleep(2 * time.Millisecond) close(responseChannel) + time.Sleep(2 * time.Millisecond) + assert.Equal(t, int64(20), reporter.report.Response.TotalResponsePayloadLengthBytes) assert.Equal( t, @@ -204,6 +250,59 @@ func TestReportWithResponsePayloadLengthInReceivingResponse(t *testing.T) { ) } +func TestReportWithResponsePayloadLengthInReceivingResponseWithAnError(t *testing.T) { + responseChannel := make(chan SubjectServerResponse, 2) + reporter := NewResponseMetricsCollectingReporter(nil, responseChannel) + reporter.Run() + + responseChannel <- SubjectServerResponse{ + PayloadLengthBytes: 10, + } + responseChannel <- SubjectServerResponse{ + Err: errors.New("test error"), + PayloadLengthBytes: 10, + } + + time.Sleep(2 * time.Millisecond) + close(responseChannel) + + time.Sleep(2 * time.Millisecond) + + assert.Equal(t, int64(10), reporter.report.Response.TotalResponsePayloadLengthBytes) + assert.Equal( + t, + int64(10), + reporter.report.Response.AverageResponsePayloadLengthBytes, + ) +} + +func TestReportWithResponsePayloadLengthInReceivingResponseWithAllErrors(t *testing.T) { + responseChannel := make(chan SubjectServerResponse, 2) + reporter := NewResponseMetricsCollectingReporter(nil, responseChannel) + reporter.Run() + + responseChannel <- SubjectServerResponse{ + Err: errors.New("test error"), + PayloadLengthBytes: 10, + } + responseChannel <- SubjectServerResponse{ + Err: errors.New("test error"), + PayloadLengthBytes: 10, + } + + time.Sleep(2 * time.Millisecond) + close(responseChannel) + + time.Sleep(2 * time.Millisecond) + + assert.Equal(t, int64(0), reporter.report.Response.TotalResponsePayloadLengthBytes) + assert.Equal( + t, + int64(0), + reporter.report.Response.AverageResponsePayloadLengthBytes, + ) +} + func TestReportWithLoadTimeInReceivingResponse(t *testing.T) { responseChannel := make(chan SubjectServerResponse, 1) reporter := NewResponseMetricsCollectingReporter(nil, responseChannel) diff --git a/tests/blast_integration_test.go b/tests/blast_integration_test.go index d8ff3d2..a4f0d2a 100644 --- a/tests/blast_integration_test.go +++ b/tests/blast_integration_test.go @@ -151,3 +151,38 @@ func TestBlastWithLoadGenerationAndResponseReadingForMaximumDuration(t *testing. assert.True(t, totalRequestsMade < 2_00_000) } + +func TestBlastWithResponseReadingGivenTheTargetServerFailsInSendingResponses(t *testing.T) { + payloadSizeBytes := int64(10) + server, err := NewEchoServerWithNoWriteback("tcp", "localhost:10005", payloadSizeBytes, 2) + assert.Nil(t, err) + + server.accept(t) + defer server.stop() + + concurrency, totalRequests := uint(10), uint(20) + + groupOptions := workers.NewGroupOptions( + concurrency, + totalRequests, + []byte("HelloWorld"), + "localhost:10005", + ) + responseOptions := blast.ResponseOptions{ + ResponsePayloadSizeBytes: payloadSizeBytes, + TotalResponsesToRead: 20, + ReadingOption: blast.ReadTotalResponses, + } + buffer := &bytes.Buffer{} + blast.OutputStream = buffer + blast.NewBlastWithResponseReading(groupOptions, responseOptions, 5*time.Second) + + output := string(buffer.Bytes()) + + assert.True(t, strings.Contains(output, "ResponseMetrics")) + assert.True(t, strings.Contains(output, "TotalResponses: 20")) + assert.True(t, strings.Contains(output, "SuccessCount: 10")) + assert.True(t, strings.Contains(output, "ErrorCount: 10")) + assert.True(t, strings.Contains(output, "TotalResponsePayloadSize: 100 B")) + assert.True(t, strings.Contains(output, "AveragePayloadSize: 10 B")) +} diff --git a/tests/echo_server.go b/tests/echo_server.go index 2ad396c..5fe2a8c 100644 --- a/tests/echo_server.go +++ b/tests/echo_server.go @@ -9,10 +9,11 @@ import ( ) type EchoServer struct { - listener net.Listener - payloadSizeBytes int64 - stopChannel chan struct{} - totalRequests atomic.Uint32 + listener net.Listener + payloadSizeBytes int64 + stopChannel chan struct{} + totalRequests atomic.Uint32 + donotWritebackEveryKRequests uint } func NewEchoServer(network, address string, payloadSizeBytes int64) (*EchoServer, error) { @@ -28,6 +29,24 @@ func NewEchoServer(network, address string, payloadSizeBytes int64) (*EchoServer }, nil } +func NewEchoServerWithNoWriteback( + network, address string, + payloadSizeBytes int64, + donotWritebackEveryKRequests uint, +) (*EchoServer, error) { + listener, err := net.Listen(network, address) + if err != nil { + return nil, err + } + + return &EchoServer{ + listener: listener, + payloadSizeBytes: payloadSizeBytes, + stopChannel: make(chan struct{}), + donotWritebackEveryKRequests: donotWritebackEveryKRequests, + }, nil +} + func (server *EchoServer) accept(t *testing.T) { go func() { for { @@ -46,6 +65,7 @@ func (server *EchoServer) accept(t *testing.T) { func (server *EchoServer) handleConnection(connection net.Conn) { go func() { + requestCount := uint(0) for { select { case <-server.stopChannel: @@ -56,7 +76,12 @@ func (server *EchoServer) handleConnection(connection net.Conn) { _, _ = connection.Read(payload) server.totalRequests.Add(1) - _, _ = connection.Write(payload) + requestCount = requestCount + 1 + if server.donotWritebackEveryKRequests != 0 && + requestCount%server.donotWritebackEveryKRequests == 0 { + } else { + _, _ = connection.Write(payload) + } } } }()