From 0bcac5f0211af5b79ace4e1bb9694d745d466445 Mon Sep 17 00:00:00 2001 From: Tamal Saha Date: Mon, 16 Oct 2023 01:22:35 -0700 Subject: [PATCH] Use nats.go v1.31.0 Signed-off-by: Tamal Saha --- go.mod | 3 +- go.sum | 10 +- vendor/github.com/nats-io/nats.go/.travis.yml | 2 +- vendor/github.com/nats-io/nats.go/README.md | 2 +- .../nats-io/nats.go/dependencies.md | 22 +- .../nats-io/nats.go/jetstream/ordered.go | 19 +- .../nats-io/nats.go/jetstream/pull.go | 202 ++++++++++++++---- vendor/github.com/nats-io/nats.go/nats.go | 24 +-- .../nats-io/nats.go/testing_internal.go | 59 +++++ vendor/modules.txt | 4 +- 10 files changed, 255 insertions(+), 92 deletions(-) create mode 100644 vendor/github.com/nats-io/nats.go/testing_internal.go diff --git a/go.mod b/go.mod index 1550b16a..c954cf35 100644 --- a/go.mod +++ b/go.mod @@ -11,7 +11,7 @@ require ( github.com/google/go-github/v55 v55.0.0 github.com/klauspost/cpuid/v2 v2.2.3 github.com/linode/linodego v1.22.0 - github.com/nats-io/nats.go v1.30.3-0.20231012190013-e4ae3183a8b0 + github.com/nats-io/nats.go v1.31.0 github.com/olekukonko/tablewriter v0.0.6-0.20230925090304-df64c4bbad77 github.com/pkg/errors v0.9.1 github.com/sirupsen/logrus v1.9.0 @@ -94,7 +94,6 @@ require ( github.com/mitchellh/reflectwalk v1.0.0 // indirect github.com/modern-go/concurrent v0.0.0-20180306012644-bacd9c7ef1dd // indirect github.com/modern-go/reflect2 v1.0.2 // indirect - github.com/nats-io/nats-server/v2 v2.9.14 // indirect github.com/nats-io/nkeys v0.4.5 // indirect github.com/nats-io/nuid v1.0.1 // indirect github.com/oklog/ulid v1.3.1 // indirect diff --git a/go.sum b/go.sum index 8190f106..a2c99947 100644 --- a/go.sum +++ b/go.sum @@ -1395,7 +1395,6 @@ github.com/miekg/dns v1.1.41/go.mod h1:p6aan82bvRIyn+zDIv9xYNUpwa73JcSh9BKwknJys github.com/miekg/dns v1.1.48/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= github.com/miekg/dns v1.1.50/go.mod h1:e3IlAVfNqAllflbibAZEWOXOQ+Ynzk/dDozDxY7XnME= github.com/miekg/pkcs11 v1.0.3/go.mod h1:XsNlhZGX73bx86s2hdc/FuaLm2CPZJemRLMA+WTFxgs= -github.com/minio/highwayhash v1.0.2 h1:Aak5U0nElisjDCfPSG79Tgzkn2gl66NxOMspRrKnA/g= github.com/mistifyio/go-zfs v2.1.2-0.20190413222219-f784269be439+incompatible/go.mod h1:8AuVvqP/mXw1px98n46wfvcGfQ4ci2FwoAjKYxuo3Z4= github.com/mitchellh/cli v1.0.0/go.mod h1:hNIlj7HEI86fIcpObd7a0FcrxTWetlwJDGcceTlRvqc= github.com/mitchellh/cli v1.1.0/go.mod h1:xcISNoH86gajksDmfB23e/pu+B+GeFRMYmoHXxx3xhI= @@ -1448,15 +1447,11 @@ github.com/mwitkow/go-conntrack v0.0.0-20161129095857-cc309e4a2223/go.mod h1:qRW github.com/mwitkow/go-conntrack v0.0.0-20190716064945-2f068394615f/go.mod h1:qRWi+5nqEBWmkhHvq77mSJWrCKwh8bxhgT7d/eI7P4U= github.com/mxk/go-flowrate v0.0.0-20140419014527-cca7078d478f/go.mod h1:ZdcZmHo+o7JKHSa8/e818NopupXU1YMK5fe1lsApnBw= github.com/nats-io/jwt v0.3.0/go.mod h1:fRYCDE99xlTsqUzISS1Bi75UBJ6ljOJQOAAu5VglpSg= -github.com/nats-io/jwt v0.3.2 h1:+RB5hMpXUUA2dfxuhBTEkMOrYmM+gKIZYS1KjSostMI= github.com/nats-io/jwt v0.3.2/go.mod h1:/euKqTS1ZD+zzjYrY7pseZrTtWQSjujC7xjPc8wL6eU= -github.com/nats-io/jwt/v2 v2.3.0 h1:z2mA1a7tIf5ShggOFlR1oBPgd6hGqcDYsISxZByUzdI= github.com/nats-io/nats-server/v2 v2.1.2/go.mod h1:Afk+wRZqkMQs/p45uXdrVLuab3gwv3Z8C4HTBu8GD/k= -github.com/nats-io/nats-server/v2 v2.9.14 h1:n2GscWVgXpA14vQSRP/MM1SGi4wyazR9l19/gWxqgXQ= -github.com/nats-io/nats-server/v2 v2.9.14/go.mod h1:40ZwFm4npKdFBhOdY7rkh3YyI1oI91FzLvlYyB7HfzM= github.com/nats-io/nats.go v1.9.1/go.mod h1:ZjDU1L/7fJ09jvUSRVBR2e7+RnLiiIQyqyzEE/Zbp4w= -github.com/nats-io/nats.go v1.30.3-0.20231012190013-e4ae3183a8b0 h1:J2W5ZrIHKXQTNkkugS8HLQSslkcPj2r2Qk+J0byukog= -github.com/nats-io/nats.go v1.30.3-0.20231012190013-e4ae3183a8b0/go.mod h1:dcfhUgmQNN4GJEfIb2f9R7Fow+gzBF4emzDHrVBd5qM= +github.com/nats-io/nats.go v1.31.0 h1:/WFBHEc/dOKBF6qf1TZhrdEfTmOZ5JzdJ+Y3m6Y/p7E= +github.com/nats-io/nats.go v1.31.0/go.mod h1:di3Bm5MLsoB4Bx61CBTsxuarI36WbhAwOm8QrW39+i8= github.com/nats-io/nkeys v0.1.0/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.1.3/go.mod h1:xpnFELMwJABBLVhffcfd1MZx6VsNRFpEugbxziKVo7w= github.com/nats-io/nkeys v0.4.5 h1:Zdz2BUlFm4fJlierwvGK+yl20IAKUm7eV6AAZXEhkPk= @@ -2284,7 +2279,6 @@ golang.org/x/time v0.0.0-20220224211638-0e9765cccd65/go.mod h1:tRJNPiyCQ0inRvYxb golang.org/x/time v0.0.0-20220922220347-f3bd1da661af/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.1.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= golang.org/x/time v0.2.0/go.mod h1:tRJNPiyCQ0inRvYxbN9jk5I+vvW/OXSQhTDSoE431IQ= -golang.org/x/time v0.3.0 h1:rg5rLMjNzMS1RkNLzCG38eapWhnYLFYXDXj2gOlr8j4= golang.org/x/tools v0.0.0-20180221164845-07fd8470d635/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180828015842-6cd1fcedba52/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= golang.org/x/tools v0.0.0-20180917221912-90fa682c2a6e/go.mod h1:n7NCudcB/nEzxVGmLbDWY5pfWTLqBcC2KZ6jyYvM4mQ= diff --git a/vendor/github.com/nats-io/nats.go/.travis.yml b/vendor/github.com/nats-io/nats.go/.travis.yml index 36879705..1505f773 100644 --- a/vendor/github.com/nats-io/nats.go/.travis.yml +++ b/vendor/github.com/nats-io/nats.go/.travis.yml @@ -22,7 +22,7 @@ before_script: - golangci-lint run ./jetstream/... script: - go test -modfile=go_test.mod -v -run=TestNoRace -p=1 ./... --failfast -vet=off -- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off; fi +- if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then ./scripts/cov.sh TRAVIS; else go test -modfile=go_test.mod -race -v -p=1 ./... --failfast -vet=off -tags=internal_testing; fi after_success: - if [[ "$TRAVIS_GO_VERSION" =~ 1.21 ]]; then $HOME/gopath/bin/goveralls -coverprofile=acc.out -service travis-ci; fi diff --git a/vendor/github.com/nats-io/nats.go/README.md b/vendor/github.com/nats-io/nats.go/README.md index 108db4e3..042733da 100644 --- a/vendor/github.com/nats-io/nats.go/README.md +++ b/vendor/github.com/nats-io/nats.go/README.md @@ -29,7 +29,7 @@ When using or transitioning to Go modules support: ```bash # Go client latest or explicit version go get github.com/nats-io/nats.go/@latest -go get github.com/nats-io/nats.go/@v1.30.2 +go get github.com/nats-io/nats.go/@v1.31.0 # For latest NATS Server, add /v2 at the end go get github.com/nats-io/nats-server/v2 diff --git a/vendor/github.com/nats-io/nats.go/dependencies.md b/vendor/github.com/nats-io/nats.go/dependencies.md index d54bcf98..ec9ab3c6 100644 --- a/vendor/github.com/nats-io/nats.go/dependencies.md +++ b/vendor/github.com/nats-io/nats.go/dependencies.md @@ -2,14 +2,14 @@ This file lists the dependencies used in this repository. -| Dependency | License | -|-|-| -| Go | BSD 3-Clause "New" or "Revised" License | -| github.com/golang/protobuf v1.4.2 | BSD 3-Clause "New" or "Revised" License | -| github.com/nats-io/nats-server/v2 v2.10.0 | Apache License 2.0 | -| github.com/nats-io/nkeys v0.4.5 | Apache License 2.0 | -| github.com/nats-io/nuid v1.0.1 | Apache License 2.0 | -| google.golang.org/protobuf v1.23.0 | BSD 3-Clause License | -| github.com/klauspost/compress v1.17.0 | Apache License 2.0 | -| go.uber.org/goleak v1.2.1 | MIT License | -| golang.org/x/text | BSD 3-Clause License | +| Dependency | License | +|-----------------------------------|--------------| +| Go | BSD 3-Clause | +| github.com/golang/protobuf/proto | BSD-3-Clause | +| github.com/klauspost/compress | BSD-3-Clause | +| github.com/nats-io/nats-server/v2 | Apache-2.0 | +| github.com/nats-io/nkeys | Apache-2.0 | +| github.com/nats-io/nuid | Apache-2.0 | +| go.uber.org/goleak | MIT | +| golang.org/x/text | BSD-3-Clause | +| google.golang.org/protobuf | BSD-3-Clause | diff --git a/vendor/github.com/nats-io/nats.go/jetstream/ordered.go b/vendor/github.com/nats-io/nats.go/jetstream/ordered.go index e462981e..92474464 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/ordered.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/ordered.go @@ -69,7 +69,7 @@ var errOrderedSequenceMismatch = errors.New("sequence mismatch") // Consume can be used to continuously receive messages and handle them with the provided callback function func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) (ConsumeContext, error) { - if c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume && c.currentConsumer == nil { + if (c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume) && c.currentConsumer == nil { err := c.reset() if err != nil { return nil, err @@ -81,7 +81,7 @@ func (c *orderedConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt return nil, ErrOrderConsumerUsedAsFetch } c.consumerType = consumerTypeConsume - consumeOpts, err := parseConsumeOpts(opts...) + consumeOpts, err := parseConsumeOpts(true, opts...) if err != nil { return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) } @@ -178,7 +178,8 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err } if errors.Is(err, ErrNoHeartbeat) || errors.Is(err, errOrderedSequenceMismatch) || - errors.Is(err, ErrConsumerDeleted) { + errors.Is(err, ErrConsumerDeleted) || + errors.Is(err, ErrConsumerNotFound) { // only reset if serial matches the current consumer serial and there is no reset in progress if serial == c.serial && atomic.LoadUint32(&c.resetInProgress) == 0 { atomic.StoreUint32(&c.resetInProgress, 1) @@ -190,7 +191,7 @@ func (c *orderedConsumer) errHandler(serial int) func(cc ConsumeContext, err err // Messages returns [MessagesContext], allowing continuously iterating over messages on a stream. func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error) { - if c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume && c.currentConsumer == nil { + if (c.consumerType == consumerTypeNotSet || c.consumerType == consumerTypeConsume) && c.currentConsumer == nil { err := c.reset() if err != nil { return nil, err @@ -202,7 +203,7 @@ func (c *orderedConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, er return nil, ErrOrderConsumerUsedAsFetch } c.consumerType = consumerTypeConsume - consumeOpts, err := parseMessagesOpts(opts...) + consumeOpts, err := parseMessagesOpts(true, opts...) if err != nil { return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) } @@ -386,13 +387,19 @@ func (c *orderedConsumer) reset() error { defer c.Unlock() defer atomic.StoreUint32(&c.resetInProgress, 0) if c.currentConsumer != nil { + c.currentConsumer.Lock() + if c.currentConsumer.subscriptions[""] != nil { + c.currentConsumer.subscriptions[""].Stop() + } + consName := c.currentConsumer.CachedInfo().Name + c.currentConsumer.Unlock() var err error for i := 0; ; i++ { if c.cfg.MaxResetAttempts > 0 && i == c.cfg.MaxResetAttempts { return fmt.Errorf("%w: maximum number of delete attempts reached: %s", ErrOrderedConsumerReset, err) } ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second) - err = c.jetStream.DeleteConsumer(ctx, c.stream, c.currentConsumer.CachedInfo().Name) + err = c.jetStream.DeleteConsumer(ctx, c.stream, consName) cancel() if err != nil { if errors.Is(err, ErrConsumerNotFound) { diff --git a/vendor/github.com/nats-io/nats.go/jetstream/pull.go b/vendor/github.com/nats-io/nats.go/jetstream/pull.go index a3601ece..148d1281 100644 --- a/vendor/github.com/nats-io/nats.go/jetstream/pull.go +++ b/vendor/github.com/nats-io/nats.go/jetstream/pull.go @@ -156,7 +156,7 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( if handler == nil { return nil, ErrHandlerRequired } - consumeOpts, err := parseConsumeOpts(opts...) + consumeOpts, err := parseConsumeOpts(false, opts...) if err != nil { return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) } @@ -280,32 +280,46 @@ func (p *pullConsumer) Consume(handler MessageHandler, opts ...PullConsumeOpt) ( if !isConnected { isConnected = true // try fetching consumer info several times to make sure consumer is available after reconnect - for i := 0; i < 5; i++ { + backoffOpts := backoffOpts{ + attempts: 10, + initialInterval: 1 * time.Second, + disableInitialExecution: true, + factor: 2, + maxInterval: 10 * time.Second, + cancel: sub.done, + } + err = retryWithBackoff(func(attempt int) (bool, error) { + isClosed := atomic.LoadUint32(&sub.closed) == 1 + if isClosed { + return false, nil + } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() _, err := p.Info(ctx) - cancel() - if err == nil { - break - } if err != nil { - if i == 4 { - sub.cleanup() - if sub.consumeOpts.ErrHandler != nil { - sub.consumeOpts.ErrHandler(sub, err) + if sub.consumeOpts.ErrHandler != nil { + err = fmt.Errorf("[%d] attempting to fetch consumer info after reconnect: %w", attempt, err) + if attempt == backoffOpts.attempts-1 { + err = errors.Join(err, fmt.Errorf("maximum retry attempts reached")) } - sub.Unlock() - return + sub.consumeOpts.ErrHandler(sub, err) } + return true, err } - time.Sleep(5 * time.Second) - } - batchSize := sub.consumeOpts.MaxMessages - if sub.consumeOpts.StopAfter > 0 { - batchSize = min(batchSize, sub.consumeOpts.StopAfter-sub.delivered) + return false, nil + }, backoffOpts) + if err != nil { + if sub.consumeOpts.ErrHandler != nil { + sub.consumeOpts.ErrHandler(sub, err) + } + sub.Unlock() + sub.cleanup() + return } + sub.fetchNext <- &pullRequest{ Expires: sub.consumeOpts.Expires, - Batch: batchSize, + Batch: sub.consumeOpts.MaxMessages, MaxBytes: sub.consumeOpts.MaxBytes, Heartbeat: sub.consumeOpts.Heartbeat, } @@ -404,7 +418,7 @@ func (s *pullSubscription) checkPending() { // [PullHeartbeat] - sets an idle heartbeat setting for a pull request, default is set to 5s // [WithMessagesErrOnMissingHeartbeat] - sets whether a missing heartbeat error should be reported when calling Next func (p *pullConsumer) Messages(opts ...PullMessagesOpt) (MessagesContext, error) { - consumeOpts, err := parseMessagesOpts(opts...) + consumeOpts, err := parseMessagesOpts(false, opts...) if err != nil { return nil, fmt.Errorf("%w: %s", ErrInvalidOption, err) } @@ -527,21 +541,38 @@ func (s *pullSubscription) Next() (Msg, error) { if !isConnected { isConnected = true // try fetching consumer info several times to make sure consumer is available after reconnect - for i := 0; i < 5; i++ { + backoffOpts := backoffOpts{ + attempts: 10, + initialInterval: 1 * time.Second, + disableInitialExecution: true, + factor: 2, + maxInterval: 10 * time.Second, + cancel: s.done, + } + err = retryWithBackoff(func(attempt int) (bool, error) { + isClosed := atomic.LoadUint32(&s.closed) == 1 + if isClosed { + return false, nil + } ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second) + defer cancel() _, err := s.consumer.Info(ctx) - cancel() - if err == nil { - break - } if err != nil { - if i == 4 { - s.Stop() - return nil, err + if errors.Is(err, ErrConsumerNotFound) { + return false, err + } + if attempt == backoffOpts.attempts-1 { + return true, fmt.Errorf("could not get consumer info after server reconnect: %w", err) } + return true, err } - time.Sleep(5 * time.Second) + return false, nil + }, backoffOpts) + if err != nil { + s.Stop() + return nil, err } + s.pending.msgCount = 0 s.pending.byteCount = 0 if hbMonitor != nil { @@ -578,6 +609,7 @@ func (s *pullSubscription) handleStatusMsg(msg *nats.Msg, msgErr error) error { if s.consumeOpts.ErrHandler != nil { s.consumeOpts.ErrHandler(s, err) } + return err } s.pending.msgCount -= msgsLeft if s.pending.msgCount < 0 { @@ -608,6 +640,7 @@ func (s *pullSubscription) Stop() { if atomic.LoadUint32(&s.closed) == 1 { return } + atomic.StoreUint32(&s.closed, 1) close(s.done) if s.consumeOpts.stopAfterMsgsLeft != nil { if s.delivered >= s.consumeOpts.StopAfter { @@ -616,7 +649,6 @@ func (s *pullSubscription) Stop() { s.consumeOpts.stopAfterMsgsLeft <- s.consumeOpts.StopAfter - s.delivered } } - atomic.StoreUint32(&s.closed, 1) } // Fetch sends a single request to retrieve given number of messages. @@ -812,6 +844,7 @@ func (s *pullSubscription) cleanup() { close(s.connStatusChanged) s.subscription = nil delete(s.consumer.subscriptions, s.id) + atomic.StoreUint32(&s.closed, 1) } // pull sends a pull request to the server and waits for messages using a subscription from [pullSubscription]. @@ -837,7 +870,7 @@ func (s *pullSubscription) pull(req *pullRequest, subject string) error { return nil } -func parseConsumeOpts(opts ...PullConsumeOpt) (*consumeOpts, error) { +func parseConsumeOpts(ordered bool, opts ...PullConsumeOpt) (*consumeOpts, error) { consumeOpts := &consumeOpts{ MaxMessages: unset, MaxBytes: unset, @@ -851,13 +884,13 @@ func parseConsumeOpts(opts ...PullConsumeOpt) (*consumeOpts, error) { return nil, err } } - if err := consumeOpts.setDefaults(); err != nil { + if err := consumeOpts.setDefaults(ordered); err != nil { return nil, err } return consumeOpts, nil } -func parseMessagesOpts(opts ...PullMessagesOpt) (*consumeOpts, error) { +func parseMessagesOpts(ordered bool, opts ...PullMessagesOpt) (*consumeOpts, error) { consumeOpts := &consumeOpts{ MaxMessages: unset, MaxBytes: unset, @@ -871,13 +904,13 @@ func parseMessagesOpts(opts ...PullMessagesOpt) (*consumeOpts, error) { return nil, err } } - if err := consumeOpts.setDefaults(); err != nil { + if err := consumeOpts.setDefaults(ordered); err != nil { return nil, err } return consumeOpts, nil } -func (consumeOpts *consumeOpts) setDefaults() error { +func (consumeOpts *consumeOpts) setDefaults(ordered bool) error { if consumeOpts.MaxBytes != unset && consumeOpts.MaxMessages != unset { return fmt.Errorf("only one of MaxMessages and MaxBytes can be specified") } @@ -902,9 +935,16 @@ func (consumeOpts *consumeOpts) setDefaults() error { consumeOpts.ThresholdBytes = int(math.Ceil(float64(consumeOpts.MaxBytes) / 2)) } if consumeOpts.Heartbeat == unset { - consumeOpts.Heartbeat = consumeOpts.Expires / 2 - if consumeOpts.Heartbeat > 30*time.Second { - consumeOpts.Heartbeat = 30 * time.Second + if ordered { + consumeOpts.Heartbeat = 5 * time.Second + if consumeOpts.Expires < 10*time.Second { + consumeOpts.Heartbeat = consumeOpts.Expires / 2 + } + } else { + consumeOpts.Heartbeat = consumeOpts.Expires / 2 + if consumeOpts.Heartbeat > 30*time.Second { + consumeOpts.Heartbeat = 30 * time.Second + } } } if consumeOpts.Heartbeat > consumeOpts.Expires/2 { @@ -912,3 +952,91 @@ func (consumeOpts *consumeOpts) setDefaults() error { } return nil } + +type backoffOpts struct { + // total retry attempts + // -1 for unlimited + attempts int + // initial interval after which first retry will be performed + // defaults to 1s + initialInterval time.Duration + // determines whether first function execution should be performed immediately + disableInitialExecution bool + // multiplier on each attempt + // defaults to 2 + factor float64 + // max interval between retries + // after reaching this value, all subsequent + // retries will be performed with this interval + // defaults to 1 minute + maxInterval time.Duration + // custom backoff intervals + // if set, overrides all other options except attempts + // if attempts are set, then the last interval will be used + // for all subsequent retries after reaching the limit + customBackoff []time.Duration + // cancel channel + // if set, retry will be cancelled when this channel is closed + cancel <-chan struct{} +} + +func retryWithBackoff(f func(int) (bool, error), opts backoffOpts) error { + var err error + var shouldContinue bool + // if custom backoff is set, use it instead of other options + if len(opts.customBackoff) > 0 { + if opts.attempts != 0 { + return fmt.Errorf("cannot use custom backoff intervals when attempts are set") + } + for i, interval := range opts.customBackoff { + select { + case <-opts.cancel: + return nil + case <-time.After(interval): + } + shouldContinue, err = f(i) + if !shouldContinue { + return err + } + } + return err + } + + // set default options + if opts.initialInterval == 0 { + opts.initialInterval = 1 * time.Second + } + if opts.factor == 0 { + opts.factor = 2 + } + if opts.maxInterval == 0 { + opts.maxInterval = 1 * time.Minute + } + if opts.attempts == 0 { + return fmt.Errorf("retry attempts have to be set when not using custom backoff intervals") + } + interval := opts.initialInterval + for i := 0; ; i++ { + if i == 0 && opts.disableInitialExecution { + time.Sleep(interval) + continue + } + shouldContinue, err = f(i) + if !shouldContinue { + return err + } + if opts.attempts > 0 && i >= opts.attempts-1 { + break + } + select { + case <-opts.cancel: + return nil + case <-time.After(interval): + } + interval = time.Duration(float64(interval) * opts.factor) + if interval >= opts.maxInterval { + interval = opts.maxInterval + } + } + return err +} diff --git a/vendor/github.com/nats-io/nats.go/nats.go b/vendor/github.com/nats-io/nats.go/nats.go index 73709db8..da13692f 100644 --- a/vendor/github.com/nats-io/nats.go/nats.go +++ b/vendor/github.com/nats-io/nats.go/nats.go @@ -47,7 +47,7 @@ import ( // Default Constants const ( - Version = "1.30.2" + Version = "1.31.0" DefaultURL = "nats://127.0.0.1:4222" DefaultPort = 4222 DefaultMaxReconnect = 60 @@ -3077,28 +3077,6 @@ func (nc *Conn) waitForMsgs(s *Subscription) { // Return what is to be used. If we return nil the message will be dropped. type msgFilter func(m *Msg) *Msg -func (nc *Conn) addMsgFilter(subject string, filter msgFilter) { - nc.subsMu.Lock() - defer nc.subsMu.Unlock() - - if nc.filters == nil { - nc.filters = make(map[string]msgFilter) - } - nc.filters[subject] = filter -} - -func (nc *Conn) removeMsgFilter(subject string) { - nc.subsMu.Lock() - defer nc.subsMu.Unlock() - - if nc.filters != nil { - delete(nc.filters, subject) - if len(nc.filters) == 0 { - nc.filters = nil - } - } -} - // processMsg is called by parse and will place the msg on the // appropriate channel/pending queue for processing. If the channel is full, // or the pending queue is over the pending limits, the connection is diff --git a/vendor/github.com/nats-io/nats.go/testing_internal.go b/vendor/github.com/nats-io/nats.go/testing_internal.go new file mode 100644 index 00000000..18397026 --- /dev/null +++ b/vendor/github.com/nats-io/nats.go/testing_internal.go @@ -0,0 +1,59 @@ +// Copyright 2023 The NATS Authors +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build internal_testing +// +build internal_testing + +// Functions in this file are only available when building nats.go with the +// internal_testing build tag. They are used by the nats.go test suite. +package nats + +// AddMsgFilter adds a message filter for the given subject +// to the connection. The filter will be called for each +// message received on the subject. If the filter returns +// nil, the message will be dropped. +func (nc *Conn) AddMsgFilter(subject string, filter msgFilter) { + nc.subsMu.Lock() + defer nc.subsMu.Unlock() + + if nc.filters == nil { + nc.filters = make(map[string]msgFilter) + } + nc.filters[subject] = filter +} + +// RemoveMsgFilter removes a message filter for the given subject. +func (nc *Conn) RemoveMsgFilter(subject string) { + nc.subsMu.Lock() + defer nc.subsMu.Unlock() + + if nc.filters != nil { + delete(nc.filters, subject) + if len(nc.filters) == 0 { + nc.filters = nil + } + } +} + +// IsJSControlMessage returns true if the message is a JetStream control message. +func IsJSControlMessage(msg *Msg) (bool, int) { + return isJSControlMessage(msg) +} + +// CloseTCPConn closes the underlying TCP connection. +// It can be used to simulate a disconnect. +func (nc *Conn) CloseTCPConn() { + nc.mu.Lock() + defer nc.mu.Unlock() + nc.conn.Close() +} diff --git a/vendor/modules.txt b/vendor/modules.txt index fb478ecd..21d599c7 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -278,9 +278,7 @@ github.com/modern-go/concurrent # github.com/modern-go/reflect2 v1.0.2 ## explicit; go 1.12 github.com/modern-go/reflect2 -# github.com/nats-io/nats-server/v2 v2.9.14 -## explicit; go 1.19 -# github.com/nats-io/nats.go v1.30.3-0.20231012190013-e4ae3183a8b0 +# github.com/nats-io/nats.go v1.31.0 ## explicit; go 1.20 github.com/nats-io/nats.go github.com/nats-io/nats.go/encoders/builtin