From f11b92a5f8587ca5f1b3a7d86e23b203ed75d5e4 Mon Sep 17 00:00:00 2001 From: acjzz Date: Mon, 20 Nov 2023 14:27:37 +0400 Subject: [PATCH] chore: adds multiple topics benchamrks --- Makefile | 2 +- engine_bench_test.go | 89 ++++++++++++++++++++++++++++++++++++++++++-- 2 files changed, 87 insertions(+), 4 deletions(-) diff --git a/Makefile b/Makefile index ac8addc..d5d4fff 100644 --- a/Makefile +++ b/Makefile @@ -3,4 +3,4 @@ test: go test -timeout=10s -race -count=1 -failfast -shuffle=on ./... bench: - go test -bench ^Benchmark* -run XXX -benchtime 1s -count 2 -cpu 1,2 -benchmem | tee benchmark_output.txt + go test -bench ^Benchmark* -run XXX -benchtime 1s -count 1 -cpu 1,2 -benchmem | tee benchmark_output.txt diff --git a/engine_bench_test.go b/engine_bench_test.go index 9f62f5d..8b2a949 100644 --- a/engine_bench_test.go +++ b/engine_bench_test.go @@ -22,11 +22,22 @@ var topicBufferSize = []struct { {input: 8192}, } -func BenchmarkEngine(b *testing.B) { +var topicsConsumersProdcuers = []struct { + numTopics int + numProducers int + numConsumers int +}{ + {numTopics: 2, numProducers: 1, numConsumers: 2}, + {numTopics: 2, numProducers: 2, numConsumers: 1}, + {numTopics: 5, numProducers: 3, numConsumers: 6}, + {numTopics: 5, numProducers: 6, numConsumers: 3}, +} + +func BenchmarkSingleTopic(b *testing.B) { for _, ms := range messageBytes { - mbtitle := fmt.Sprintf("msg_bytes_%dk", ms.input/1024) + mbTitle := fmt.Sprintf("msg_%dkB", ms.input/1024) for _, bts := range topicBufferSize { - b.Run(fmt.Sprintf("%s/buffer_topic_%dk", mbtitle, bts.input/1024), func(b *testing.B) { + b.Run(fmt.Sprintf("%s/buffer_%dkB", mbTitle, bts.input/1024), func(b *testing.B) { // Create a new Engine engine := NewEngine(mockLogger) @@ -67,3 +78,75 @@ func BenchmarkEngine(b *testing.B) { } } } + +func BenchmarkMultipleTopics(b *testing.B) { + for _, ms := range messageBytes { + mbTitle := fmt.Sprintf("msg_%dkB", ms.input/1024) + for _, bts := range topicBufferSize { + btsTitle := fmt.Sprintf("buffer_%dkB", bts.input/1024) + for _, tpc := range topicsConsumersProdcuers { + tpcTitle := fmt.Sprintf("%dtop_%dprod_%dcon", tpc.numTopics, tpc.numProducers, tpc.numConsumers) + b.Run(fmt.Sprintf("%s/%s/%s", mbTitle, btsTitle, tpcTitle), func(b *testing.B) { + // Create a new Engine + engine := NewEngine(mockLogger) + + counters := []*counter{} + dones := []chan struct{}{} + + for topicIndex := 0; topicIndex < tpc.numTopics; topicIndex++ { + // Create a shared counter for the topic and consumers + topicName := fmt.Sprintf("Topic%d", topicIndex) + _ = engine.RegisterTopic(topicName, bts.input) + + counters = append(counters, &counter{}) + dones = append(dones, make(chan struct{})) + + // Initialize consumers + for i := 0; i < tpc.numConsumers; i++ { + go func(topicIndex int) { + mockHandler := func(receivedMsg interface{}) { + counters[topicIndex].Increment() + if counters[topicIndex].Value() == tpc.numProducers*b.N { + select { + case <-dones[topicIndex]: + // Channel is already closed + default: + // Channel is open, so close it + close(dones[topicIndex]) + } + } + } + consumer, _ := engine.GetConsumer(topicName, mockHandler) + consumer.Run() + + <-dones[topicIndex] + }(topicIndex) + } + } + + b.ResetTimer() + for topicIndex := 0; topicIndex < tpc.numTopics; topicIndex++ { + topicName := fmt.Sprintf("Topic%d", topicIndex) + // Initialize producers + for i := 0; i < tpc.numProducers; i++ { + go func(topicName string) { + for i := 0; i < b.N; i++ { + // Publish messages to topics + producer, _ := engine.GetProducer(topicName) + _ = producer.Publish(bytes.Repeat([]byte{'1'}, ms.input)) + } + }(topicName) + } + } + + for topicIndex := 0; topicIndex < tpc.numTopics; topicIndex++ { + <-dones[topicIndex] + } + + // Stop the engine + engine.Stop() + }) + } + } + } +}