[TEST only] A test for memory leak when creating/deleting consumers (#5607)

Verifies the fix in #5600

When run locally, the test was using ~180MB of extra memory before the fix, and
under 40MB after.

(This is a replacement for #5603, which ran into some git issues.)
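For context, the memory check in the test boils down to comparing runtime.MemStats.HeapInuse before and after the consumer churn, with an explicit GC ahead of each read. A minimal sketch of that pattern (the heapInuse helper is illustrative only, not part of this change):

```go
package main

import (
	"fmt"
	"runtime"
)

// heapInuse forces a GC and returns the current heap-in-use bytes,
// mirroring the before/after measurement the test performs.
func heapInuse() uint64 {
	var m runtime.MemStats
	runtime.GC()
	runtime.ReadMemStats(&m)
	return m.HeapInuse
}

func main() {
	before := heapInuse()
	// ... create and delete many consumers here ...
	after := heapInuse()
	fmt.Printf("heap in use: before=%d bytes, after=%d bytes\n", before, after)
}
```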
derekcollison authored Jul 22, 2024
2 parents a14c364 + a1e2de7 commit 181e79d
Showing 1 changed file with 117 additions and 0 deletions.
server/jetstream_cluster_4_test.go: 117 additions & 0 deletions
@@ -24,6 +24,7 @@ import (
"math/rand"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
@@ -2329,3 +2330,119 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) {
}
}
}

// https://github.com/nats-io/nats-server/pull/5600
func TestJetStreamClusterConsumerLeak(t *testing.T) {
	N := 2000 // runs in under 10s, but significant enough to see the difference.
	NConcurrent := 100

	clusterConf := `
	listen: 127.0.0.1:-1
	server_name: %s
	jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
	leafnodes {
		listen: 127.0.0.1:-1
	}
	cluster {
		name: %s
		listen: 127.0.0.1:%d
		routes = [%s]
	}
	accounts {
		ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
		$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
	}
	`

	cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3)
	defer cl.shutdown()
	cl.waitOnLeader()

	s := cl.randomNonLeader()

	// Create the test stream.
	streamName := "LEAK_TEST_STREAM"
	nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p"))
	defer nc.Close()
	_, err := js.AddStream(&nats.StreamConfig{
		Name:      streamName,
		Subjects:  []string{"$SOMETHING.>"},
		Storage:   nats.FileStorage,
		Retention: nats.InterestPolicy,
		Replicas:  3,
	})
	if err != nil {
		t.Fatalf("Error creating stream: %v", err)
	}

	// Semaphore channel that caps the number of in-flight create/delete cycles.
	concurrent := make(chan struct{}, NConcurrent)
	for i := 0; i < NConcurrent; i++ {
		concurrent <- struct{}{}
	}
	errors := make(chan error, N)

	wg := sync.WaitGroup{}
	wg.Add(N)

	// Gather the stats for comparison.
	before := &runtime.MemStats{}
	runtime.GC()
	runtime.ReadMemStats(before)

	for i := 0; i < N; {
		// Wait for a slot to open up.
		<-concurrent
		i++
		go func() {
			defer func() {
				concurrent <- struct{}{}
				wg.Done()
			}()

			nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p"))
			defer nc.Close()

			// Create a durable push consumer, then delete it right away.
			consumerName := "sessid_" + nuid.Next()
			_, err := js.AddConsumer(streamName, &nats.ConsumerConfig{
				DeliverSubject: "inbox",
				Durable:        consumerName,
				AckPolicy:      nats.AckExplicitPolicy,
				DeliverPolicy:  nats.DeliverNewPolicy,
				FilterSubject:  "$SOMETHING.ELSE.subject",
				AckWait:        30 * time.Second,
				MaxAckPending:  1024,
			})
			if err != nil {
				errors <- fmt.Errorf("Error on JetStream consumer creation: %v", err)
				return
			}

			err = js.DeleteConsumer(streamName, consumerName)
			if err != nil {
				errors <- fmt.Errorf("Error on JetStream consumer deletion: %v", err)
			}
		}()
	}

	wg.Wait()
	if len(errors) > 0 {
		for err := range errors {
			t.Fatalf("%v", err)
		}
	}

	after := &runtime.MemStats{}
	runtime.GC()
	runtime.ReadMemStats(after)

	// Before https://github.com/nats-io/nats-server/pull/5600 this test was
	// adding 180MB+ to HeapInuse. Now it's under 40MB (run locally on a Mac).
	limit := before.HeapInuse + 100*1024*1024 // allow up to 100MB of extra heap in use
	if after.HeapInuse > limit {
		t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse)
	}
}
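As a usage note (not part of the commit): the test can be run on its own with something like `go test -run TestJetStreamClusterConsumerLeak ./server` from the nats-server repository root.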
