Skip to content

Commit

Permalink
chore: adds multiple topics benchamrks
Browse files Browse the repository at this point in the history
  • Loading branch information
acjzz committed Nov 20, 2023
1 parent 400f139 commit 4dcb94a
Show file tree
Hide file tree
Showing 2 changed files with 87 additions and 4 deletions.
2 changes: 1 addition & 1 deletion Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -3,4 +3,4 @@ test:
go test -timeout=10s -race -count=1 -failfast -shuffle=on ./...

bench:
go test -bench ^Benchmark* -run XXX -benchtime 1s -count 2 -cpu 1,2 -benchmem | tee benchmark_output.txt
go test -bench ^Benchmark* -run XXX -benchtime 1s -count 1 -cpu 1,2 -benchmem | tee benchmark_output.txt
89 changes: 86 additions & 3 deletions engine_bench_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -22,11 +22,22 @@ var topicBufferSize = []struct {
{input: 8192},
}

func BenchmarkEngine(b *testing.B) {
var topicsConsumersProdcuers = []struct {
numTopics int
numProducers int
numConsumers int
}{
{numTopics: 2, numProducers: 1, numConsumers: 2},
{numTopics: 2, numProducers: 2, numConsumers: 1},
{numTopics: 5, numProducers: 3, numConsumers: 6},
{numTopics: 5, numProducers: 6, numConsumers: 3},
}

func BenchmarkSingleTopic(b *testing.B) {
for _, ms := range messageBytes {
mbtitle := fmt.Sprintf("msg_bytes_%dk", ms.input/1024)
mbTitle := fmt.Sprintf("msg_%dkB", ms.input/1024)
for _, bts := range topicBufferSize {
b.Run(fmt.Sprintf("%s/buffer_topic_%dk", mbtitle, bts.input/1024), func(b *testing.B) {
b.Run(fmt.Sprintf("%s/buffer_%dkB", mbTitle, bts.input/1024), func(b *testing.B) {
// Create a new Engine
engine := NewEngine(mockLogger)

Expand Down Expand Up @@ -67,3 +78,75 @@ func BenchmarkEngine(b *testing.B) {
}
}
}

func BenchmarkMultipleTopics(b *testing.B) {
for _, ms := range messageBytes {
mbTitle := fmt.Sprintf("msg_%dkB", ms.input/1024)
for _, bts := range topicBufferSize {
btsTitle := fmt.Sprintf("buffer_%dkB", bts.input/1024)
for _, tpc := range topicsConsumersProdcuers {
tpcTitle := fmt.Sprintf("%dtop_%dprod_%dcon", tpc.numTopics, tpc.numProducers, tpc.numConsumers)
b.Run(fmt.Sprintf("%s/%s/%s", mbTitle, btsTitle, tpcTitle), func(b *testing.B) {
// Create a new Engine
engine := NewEngine(mockLogger)

counters := []*counter{}
dones := []chan struct{}{}

for topicIndex := 0; topicIndex < tpc.numTopics; topicIndex++ {
// Create a shared counter for the topic and consumers
topicName := fmt.Sprintf("Topic%d", topicIndex)
_ = engine.RegisterTopic(topicName, bts.input)

counters = append(counters, &counter{})
dones = append(dones, make(chan struct{}))

// Initialize consumers
for i := 0; i < tpc.numConsumers; i++ {
go func(topicIndex int) {
mockHandler := func(receivedMsg interface{}) {
counters[topicIndex].Increment()
if counters[topicIndex].Value() == tpc.numProducers*b.N {
select {
case <-dones[topicIndex]:
// Channel is already closed
default:
// Channel is open, so close it
close(dones[topicIndex])
}
}
}
consumer, _ := engine.GetConsumer(topicName, mockHandler)
consumer.Run()

<-dones[topicIndex]
}(topicIndex)
}
}

b.ResetTimer()
for topicIndex := 0; topicIndex < tpc.numTopics; topicIndex++ {
topicName := fmt.Sprintf("Topic%d", topicIndex)
// Initialize producers
for i := 0; i < tpc.numProducers; i++ {
go func(topicName string) {
for i := 0; i < b.N; i++ {
// Publish messages to topics
producer, _ := engine.GetProducer(topicName)
_ = producer.Publish(bytes.Repeat([]byte{'1'}, ms.input))
}
}(topicName)
}
}

for topicIndex := 0; topicIndex < tpc.numTopics; topicIndex++ {
<-dones[topicIndex]
}

// Stop the engine
engine.Stop()
})
}
}
}
}

0 comments on commit 4dcb94a

Please sign in to comment.