Skip to content

Commit

Permalink
Sarthak | ResponseReader now tracks the number of responses read
Browse files Browse the repository at this point in the history
  • Loading branch information
SarthakMakhija committed Aug 16, 2023
1 parent ee78801 commit c37b3e1
Show file tree
Hide file tree
Showing 4 changed files with 53 additions and 10 deletions.
9 changes: 5 additions & 4 deletions report/reporter.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,8 +42,8 @@ type Reporter struct {

func NewLoadGenerationMetricsCollectingReporter(
loadGenerationChannel chan LoadGenerationResponse,
) Reporter {
return Reporter{
) *Reporter {
return &Reporter{
report: &Report{
Load: LoadMetrics{
ErrorCountByType: make(map[string]uint),
Expand All @@ -53,14 +53,15 @@ func NewLoadGenerationMetricsCollectingReporter(
},
},
loadGenerationChannel: loadGenerationChannel,
responseChannel: nil,
}
}

func NewResponseMetricsCollectingReporter(
loadGenerationChannel chan LoadGenerationResponse,
responseChannel chan SubjectServerResponse,
) Reporter {
return Reporter{
) *Reporter {
return &Reporter{
report: &Report{
Load: LoadMetrics{
ErrorCountByType: make(map[string]uint),
Expand Down
22 changes: 16 additions & 6 deletions report/response_reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import (
"errors"
"io"
"net"
"sync/atomic"
"time"
)

Expand All @@ -21,19 +22,23 @@ type SubjectServerResponse struct {
}

type ResponseReader struct {
responseSizeBytes int64
stopChannel chan struct{}
responseChannel chan SubjectServerResponse
responseSizeBytes int64
totalResponsesToRead uint32
readResponses atomic.Uint32
stopChannel chan struct{}
responseChannel chan SubjectServerResponse
}

func NewResponseReader(
responseSizeBytes int64,
totalResponsesToRead uint,
responseChannel chan SubjectServerResponse,
) *ResponseReader {
return &ResponseReader{
responseSizeBytes: responseSizeBytes,
stopChannel: make(chan struct{}),
responseChannel: responseChannel,
responseSizeBytes: responseSizeBytes,
totalResponsesToRead: uint32(totalResponsesToRead),
stopChannel: make(chan struct{}),
responseChannel: responseChannel,
}
}

Expand Down Expand Up @@ -69,6 +74,7 @@ func (responseReader *ResponseReader) StartReading(connection net.Conn) {
PayloadLengthBytes: int64(len(buffer)),
}
}
responseReader.readResponses.Add(1)
}
}
}(connection)
Expand All @@ -77,3 +83,7 @@ func (responseReader *ResponseReader) StartReading(connection net.Conn) {
func (responseReader *ResponseReader) close() {
close(responseReader.stopChannel)
}

func (responseReader *ResponseReader) TotalResponsesRead() uint32 {
return responseReader.readResponses.Load()
}
31 changes: 31 additions & 0 deletions tests/response_reader_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ func TestReadsResponseFromASingleConnection(t *testing.T) {

responseReader := report.NewResponseReader(
payloadSizeBytes,
1,
responseChannel,
)
responseReader.StartReading(connection)
Expand Down Expand Up @@ -65,6 +66,7 @@ func TestReadsResponseFromTwoConnections(t *testing.T) {

responseReader := report.NewResponseReader(
payloadSizeBytes,
2,
responseChannel,
)
responseReader.StartReading(connection)
Expand All @@ -75,6 +77,35 @@ func TestReadsResponseFromTwoConnections(t *testing.T) {
assert.Equal(t, []byte("HelloWorld"), responses[1])
}

func TestTracksTheNumberOfResponsesRead(t *testing.T) {
payloadSizeBytes := int64(10)
server, err := NewEchoServer("tcp", "localhost:9092", payloadSizeBytes)
assert.Nil(t, err)

server.accept(t)

connection := connectTo(t, "localhost:9092")
writeTo(t, connection, []byte("HelloWorld"))

responseChannel := make(chan report.SubjectServerResponse)

defer func() {
server.stop()
close(responseChannel)
_ = connection.Close()
}()

responseReader := report.NewResponseReader(
payloadSizeBytes,
1,
responseChannel,
)
responseReader.StartReading(connection)

_ = <-responseChannel
assert.Equal(t, uint32(1), responseReader.TotalResponsesRead())
}

func connectTo(t *testing.T, address string) net.Conn {
connection, err := net.Dial("tcp", address)
assert.Nil(t, err)
Expand Down
1 change: 1 addition & 0 deletions tests/worker_group_integration_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@ func TestSendsARequestAndReadsResponseWithSingleConnection(t *testing.T) {
),
report.NewResponseReader(
responseSizeBytes,
totalRequests,
responseChannel,
),
).Run()
Expand Down

0 comments on commit c37b3e1

Please sign in to comment.