Skip to content

Commit

Permalink
Moved to jetstream_cluster_4_test.go and simplified
Browse files Browse the repository at this point in the history
  • Loading branch information
levb committed Jun 28, 2024
1 parent 6af9b25 commit a1e2de7
Show file tree
Hide file tree
Showing 4 changed files with 136 additions and 252 deletions.
117 changes: 117 additions & 0 deletions server/jetstream_cluster_4_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"math/rand"
"os"
"path/filepath"
"runtime"
"strconv"
"strings"
"sync"
Expand Down Expand Up @@ -2329,3 +2330,119 @@ func TestJetStreamClusterAckFloorBetweenLeaderAndFollowers(t *testing.T) {
}
}
}

// https://github.com/nats-io/nats-server/pull/5600
func TestJetStreamClusterConsumerLeak(t *testing.T) {
N := 2000 // runs in under 10s, but significant enough to see the difference.
NConcurrent := 100

clusterConf := `
listen: 127.0.0.1:-1
server_name: %s
jetstream: {max_mem_store: 256MB, max_file_store: 2GB, store_dir: '%s'}
leafnodes {
listen: 127.0.0.1:-1
}
cluster {
name: %s
listen: 127.0.0.1:%d
routes = [%s]
}
accounts {
ONE { users = [ { user: "one", pass: "p" } ]; jetstream: enabled }
$SYS { users = [ { user: "admin", pass: "s3cr3t!" } ] }
}
`

cl := createJetStreamClusterWithTemplate(t, clusterConf, "Leak-test", 3)
defer cl.shutdown()
cl.waitOnLeader()

s := cl.randomNonLeader()

// Create the test stream.
streamName := "LEAK_TEST_STREAM"
nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p"))
defer nc.Close()
_, err := js.AddStream(&nats.StreamConfig{
Name: streamName,
Subjects: []string{"$SOMETHING.>"},
Storage: nats.FileStorage,
Retention: nats.InterestPolicy,
Replicas: 3,
})
if err != nil {
t.Fatalf("Error creating stream: %v", err)
}

concurrent := make(chan struct{}, NConcurrent)
for i := 0; i < NConcurrent; i++ {
concurrent <- struct{}{}
}
errors := make(chan error, N)

wg := sync.WaitGroup{}
wg.Add(N)

// Gather the stats for comparison.
before := &runtime.MemStats{}
runtime.GC()
runtime.ReadMemStats(before)

for i := 0; i < N; {
// wait for a slot to open up
<-concurrent
i++
go func() {
defer func() {
concurrent <- struct{}{}
wg.Done()
}()

nc, js := jsClientConnect(t, s, nats.UserInfo("one", "p"))
defer nc.Close()

consumerName := "sessid_" + nuid.Next()
_, err := js.AddConsumer(streamName, &nats.ConsumerConfig{
DeliverSubject: "inbox",
Durable: consumerName,
AckPolicy: nats.AckExplicitPolicy,
DeliverPolicy: nats.DeliverNewPolicy,
FilterSubject: "$SOMETHING.ELSE.subject",
AckWait: 30 * time.Second,
MaxAckPending: 1024,
})
if err != nil {
errors <- fmt.Errorf("Error on JetStream consumer creation: %v", err)
return
}

err = js.DeleteConsumer(streamName, consumerName)
if err != nil {
errors <- fmt.Errorf("Error on JetStream consumer deletion: %v", err)
}
}()
}

wg.Wait()
if len(errors) > 0 {
for err := range errors {
t.Fatalf("%v", err)
}
}

after := &runtime.MemStats{}
runtime.GC()
runtime.ReadMemStats(after)

// Before https://github.com/nats-io/nats-server/pull/5600 this test was
// adding 180Mb+ to HeapInuse. Now it's under 40Mb (ran locally on a Mac)
limit := before.HeapInuse + 100*1024*1024 // 100MB
if after.HeapInuse > before.HeapInuse+limit {
t.Fatalf("Extra memory usage too high: %v", after.HeapInuse-before.HeapInuse)
}
}
205 changes: 0 additions & 205 deletions server/mqtt_ex_leak_investigate_test.go

This file was deleted.

15 changes: 0 additions & 15 deletions server/mqtt_ex_test_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,6 @@ import (
"encoding/json"
"fmt"
"io"
"net"
"os"
"os/exec"
"strconv"
Expand All @@ -38,7 +37,6 @@ type mqttTarget struct {
clusters []*cluster
configs []mqttTestConfig
all []mqttDial
allNATS []string
}

type mqttTestConfig struct {
Expand Down Expand Up @@ -196,16 +194,6 @@ func (d mqttDial) Get() (u, p, s, c string) {
return u, p, s, c
}

func (d mqttDial) GetHostPort() (host string, port int) {
_, _, s, _ := d.Get()
host, portS, err := net.SplitHostPort(s)
if err != nil {
return s, 0
}
port, _ = strconv.Atoi(portS)
return host, port
}

func (d mqttDial) Name() string {
_, _, _, c := d.Get()
return c
Expand Down Expand Up @@ -297,16 +285,13 @@ func mqttMakeTestCluster(size int, domain string) func(tb testing.TB) *mqttTarge
cl.waitOnLeader()

all := []mqttDial{}
allNATS := []string{}
for _, s := range cl.servers {
all = append(all, mqttNewDialForServer(s, "one", "p"))
allNATS = append(allNATS, string(mqttNewDial("one", "p", s.getOpts().Host, s.getOpts().Port, "")))
}

return &mqttTarget{
clusters: []*cluster{cl},
all: all,
allNATS: allNATS,
configs: []mqttTestConfig{
{
name: "publish to one",
Expand Down
Loading

0 comments on commit a1e2de7

Please sign in to comment.