From 67978141c01b14f487665952c7039a6a94e8c77b Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Sun, 1 Oct 2023 20:07:30 +0200 Subject: [PATCH 01/11] ns: Remove application uplink queues --- config/messages.json | 18 ++ pkg/networkserver/grpc_asns.go | 40 +-- .../test/shared/application_uplink_queue.go | 39 +-- .../internal/test/shared/redis.go | 2 +- pkg/networkserver/networkserver.go | 3 +- .../redis/application_uplink_queue.go | 271 +++++++++--------- .../redis/uplink_queue_batcher.go | 56 ++++ pkg/networkserver/utils.go | 20 +- 8 files changed, 255 insertions(+), 194 deletions(-) create mode 100644 pkg/networkserver/redis/uplink_queue_batcher.go diff --git a/config/messages.json b/config/messages.json index d66e8d4649..9f89f6d34a 100644 --- a/config/messages.json +++ b/config/messages.json @@ -7370,6 +7370,15 @@ "file": "application_uplink_queue.go" } }, + "error:pkg/networkserver/redis:invalid_uid": { + "translations": { + "en": "invalid UID" + }, + "description": { + "package": "pkg/networkserver/redis", + "file": "application_uplink_queue.go" + } + }, "error:pkg/networkserver/redis:missing_downlink_correlation_id": { "translations": { "en": "missing identifier correlation ID on downlink message" @@ -7397,6 +7406,15 @@ "file": "registry.go" } }, + "error:pkg/networkserver/redis:missing_uid": { + "translations": { + "en": "missing UID" + }, + "description": { + "package": "pkg/networkserver/redis", + "file": "application_uplink_queue.go" + } + }, "error:pkg/networkserver/redis:no_uplink_match": { "translations": { "en": "no device matches uplink" diff --git a/pkg/networkserver/grpc_asns.go b/pkg/networkserver/grpc_asns.go index a91accfcac..e67cae7018 100644 --- a/pkg/networkserver/grpc_asns.go +++ b/pkg/networkserver/grpc_asns.go @@ -40,14 +40,11 @@ type ApplicationUplinkQueue interface { // Implementations must ensure that Add returns fast. Add(ctx context.Context, ups ...*ttnpb.ApplicationUp) error - // Dispatch dispatches the tasks in the queue. - Dispatch(ctx context.Context, consumerID string) error - - // PopApplication calls f on the most recent application uplink task in the schedule, for which timestamp is in range [0, time.Now()], - // if such is available, otherwise it blocks until it is. + // Pop groups up to limit most recent application uplinks in the queue + // by their application ID and calls f on each group. // Context passed to f must be derived from ctx. // Implementations must respect ctx.Done() value on best-effort basis. - Pop(ctx context.Context, consumerID string, f func(context.Context, *ttnpb.ApplicationIdentifiers, ApplicationUplinkQueueDrainFunc) (time.Time, error)) error + Pop(ctx context.Context, consumerID string, limit int, f func(context.Context, []*ttnpb.ApplicationUp) error) error } func applicationJoinAcceptWithoutAppSKey(pld *ttnpb.ApplicationJoinAccept) *ttnpb.ApplicationJoinAccept { @@ -99,29 +96,20 @@ func (ns *NetworkServer) createProcessApplicationUplinkTask(consumerID string) f } func (ns *NetworkServer) processApplicationUplinkTask(ctx context.Context, consumerID string) error { - return ns.applicationUplinks.Pop(ctx, consumerID, func(ctx context.Context, appID *ttnpb.ApplicationIdentifiers, drain ApplicationUplinkQueueDrainFunc) (time.Time, error) { - conn, err := ns.GetPeerConn(ctx, ttnpb.ClusterRole_APPLICATION_SERVER, nil) - if err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to get Application Server peer") - return time.Now().Add(applicationUplinkTaskRetryInterval), nil - } - - cl := ttnpb.NewNsAsClient(conn) - var sendErr bool - if err := drain(applicationUplinkLimit, func(ups ...*ttnpb.ApplicationUp) error { - err := ns.sendApplicationUplinks(ctx, cl, ups...) + return ns.applicationUplinks.Pop(ctx, consumerID, applicationUplinkLimit, + func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + conn, err := ns.GetPeerConn(ctx, ttnpb.ClusterRole_APPLICATION_SERVER, nil) if err != nil { - sendErr = true + log.FromContext(ctx).WithError(err).Warn("Failed to get Application Server peer") + return err } - return err - }); err != nil { - if !sendErr { - log.FromContext(ctx).WithError(err).Error("Failed to drain application uplinks") + cl := ttnpb.NewNsAsClient(conn) + if err := ns.sendApplicationUplinks(ctx, cl, ups...); err != nil { + log.FromContext(ctx).WithError(err).Error("Failed to send application uplinks") + return err } - return time.Now().Add(applicationUplinkTaskRetryInterval), nil - } - return time.Time{}, nil - }) + return nil + }) } func minAFCntDown(session *ttnpb.Session, macState *ttnpb.MACState) (uint32, error) { diff --git a/pkg/networkserver/internal/test/shared/application_uplink_queue.go b/pkg/networkserver/internal/test/shared/application_uplink_queue.go index fc809fee2d..838b9ff80a 100644 --- a/pkg/networkserver/internal/test/shared/application_uplink_queue.go +++ b/pkg/networkserver/internal/test/shared/application_uplink_queue.go @@ -18,7 +18,6 @@ import ( "bytes" "context" "testing" - "time" "github.com/smarty/assertions" "go.thethings.network/lorawan-stack/v3/pkg/errors" @@ -53,30 +52,6 @@ func handleApplicationUplinkQueueTest(ctx context.Context, q ApplicationUplinkQu t, a := test.MustNewTFromContext(ctx) t.Helper() - dispatchErrCh := make(chan error, len(consumerIDs)) - - dispatchCtx, cancelDispatchCtx := context.WithCancel(ctx) - defer cancelDispatchCtx() - for _, consumerID := range consumerIDs { - go func(consumerID string) { - select { - case <-ctx.Done(): - case dispatchErrCh <- q.Dispatch(dispatchCtx, consumerID): - } - }(consumerID) - } - defer func() { - cancelDispatchCtx() - - for range consumerIDs { - select { - case <-ctx.Done(): - case err := <-dispatchErrCh: - a.So(errors.IsCanceled(err), should.BeTrue) - } - } - }() - type popFuncReq struct { Context context.Context ApplicationIdentifiers *ttnpb.ApplicationIdentifiers @@ -89,23 +64,21 @@ func handleApplicationUplinkQueueTest(ctx context.Context, q ApplicationUplinkQu defer cancelPopCtx() for _, consumerID := range consumerIDs { go func(consumerID string) { - errCh <- q.Pop(popCtx, consumerID, func(ctx context.Context, appID *ttnpb.ApplicationIdentifiers, f ApplicationUplinkQueueDrainFunc) (time.Time, error) { + errCh <- q.Pop(popCtx, consumerID, int(testStreamBlockLimit()), func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { respCh := make(chan TaskPopFuncResponse, 1) select { case <-popCtx.Done(): - return time.Time{}, popCtx.Err() + return popCtx.Err() case reqCh <- popFuncReq{ - Context: ctx, - ApplicationIdentifiers: appID, - Func: f, - Response: respCh, + Context: ctx, + Response: respCh, }: } select { case <-popCtx.Done(): - return time.Time{}, popCtx.Err() + return popCtx.Err() case resp := <-respCh: - return resp.Time, resp.Error + return resp.Error } }) }(consumerID) diff --git a/pkg/networkserver/internal/test/shared/redis.go b/pkg/networkserver/internal/test/shared/redis.go index fb40f1102b..5ffccf960a 100644 --- a/pkg/networkserver/internal/test/shared/redis.go +++ b/pkg/networkserver/internal/test/shared/redis.go @@ -36,7 +36,7 @@ func testStreamBlockLimit() time.Duration { func NewRedisApplicationUplinkQueue(ctx context.Context) (ApplicationUplinkQueue, func()) { tb := test.MustTBFromContext(ctx) cl, flush := test.NewRedis(ctx, append(redisNamespace[:], "application-uplinks")...) - q := redis.NewApplicationUplinkQueue(cl, 100, redisConsumerGroup, 0, testStreamBlockLimit()) + q := redis.NewApplicationUplinkQueue(cl, 100, redisConsumerGroup, 0) if err := q.Init(ctx); err != nil { tb.Fatalf("Failed to initialize Redis application uplink queue: %s", test.FormatError(err)) } diff --git a/pkg/networkserver/networkserver.go b/pkg/networkserver/networkserver.go index 8ff73d33ea..b0335dbe3d 100644 --- a/pkg/networkserver/networkserver.go +++ b/pkg/networkserver/networkserver.go @@ -349,8 +349,7 @@ func New(c *component.Component, conf *Config, opts ...Option) (*NetworkServer, for id, dispatcher := range map[string]interface { Dispatch(context.Context, string) error }{ - downlinkDispatchTaskName: ns.downlinkTasks, - applicationUplinkDispatchTaskName: ns.applicationUplinks, + downlinkDispatchTaskName: ns.downlinkTasks, } { dispatcher := dispatcher ns.RegisterTask(&task.Config{ diff --git a/pkg/networkserver/redis/application_uplink_queue.go b/pkg/networkserver/redis/application_uplink_queue.go index 87630eca76..5b467f55ac 100644 --- a/pkg/networkserver/redis/application_uplink_queue.go +++ b/pkg/networkserver/redis/application_uplink_queue.go @@ -16,51 +16,62 @@ package redis import ( "context" + "sync" "github.com/redis/go-redis/v9" "go.thethings.network/lorawan-stack/v3/pkg/errors" - "go.thethings.network/lorawan-stack/v3/pkg/networkserver" + "go.thethings.network/lorawan-stack/v3/pkg/log" "go.thethings.network/lorawan-stack/v3/pkg/networkserver/internal/time" ttnredis "go.thethings.network/lorawan-stack/v3/pkg/redis" "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" "go.thethings.network/lorawan-stack/v3/pkg/unique" ) -type ApplicationUplinkQueue struct { - applicationQueue *ttnredis.TaskQueue - - redis *ttnredis.Client - maxLen int64 - group string - key string - minIdle time.Duration +type contextualUplinkBatch struct { + ctx context.Context + confirmIDs []string + uplinks []*ttnpb.ApplicationUp } const ( - payloadKey = "payload" + payloadKey = "payload" + payloadUIDKey = "uid" ) +var ( + errMissingPayload = errors.DefineDataLoss("missing_payload", "missing payload") + errInvalidPayload = errors.DefineCorruption("invalid_payload", "invalid payload") + errMissingUID = errors.DefineDataLoss("missing_uid", "missing UID") + errInvalidUID = errors.DefineCorruption("invalid_uid", "invalid UID") +) + +// ApplicationUplinkQueue is an implementation of ApplicationUplinkQueue. +type ApplicationUplinkQueue struct { + redis *ttnredis.Client + maxLen int64 + groupID string + streamID string + minIdle time.Duration + streamBlockLimit time.Duration + consumers sync.Map +} + // NewApplicationUplinkQueue returns new application uplink queue. func NewApplicationUplinkQueue( cl *ttnredis.Client, maxLen int64, - group string, + groupID string, minIdle time.Duration, streamBlockLimit time.Duration, ) *ApplicationUplinkQueue { return &ApplicationUplinkQueue{ - applicationQueue: &ttnredis.TaskQueue{ - Redis: cl, - MaxLen: maxLen, - Group: group, - Key: cl.Key("application"), - StreamBlockLimit: streamBlockLimit, - }, - redis: cl, - maxLen: maxLen, - group: group, - key: cl.Key("application-uplink"), - minIdle: minIdle, + redis: cl, + maxLen: maxLen, + groupID: groupID, + streamID: cl.Key("uplinks"), + minIdle: minIdle, + streamBlockLimit: streamBlockLimit, + consumers: sync.Map{}, } } @@ -68,68 +79,48 @@ func ApplicationUplinkQueueUIDGenericUplinkKey(r keyer, uid string) string { return ttnredis.Key(UIDKey(r, uid), "uplinks") } -func (q *ApplicationUplinkQueue) uidGenericUplinkKey(uid string) string { - return ApplicationUplinkQueueUIDGenericUplinkKey(q.redis, uid) -} - -func (q *ApplicationUplinkQueue) uidInvalidationKey(uid string) string { - return ttnredis.Key(q.uidGenericUplinkKey(uid), "invalidation") -} - -func (q *ApplicationUplinkQueue) uidJoinAcceptKey(uid string) string { - return ttnredis.Key(q.uidGenericUplinkKey(uid), "join-accept") -} - // Init initializes the ApplicationUplinkQueue. func (q *ApplicationUplinkQueue) Init(ctx context.Context) error { - return q.applicationQueue.Init(ctx) + cmd := q.redis.XGroupCreateMkStream(ctx, q.streamID, q.groupID, "0") + if err := cmd.Err(); err != nil && !ttnredis.IsConsumerGroupExistsErr(err) { + return ttnredis.ConvertError(err) + } + return nil } -// Close closes the ApplicationUplinkQueue. +// Close removes all consumers from the consumer group. func (q *ApplicationUplinkQueue) Close(ctx context.Context) error { - return q.applicationQueue.Close(ctx) + pipeline := q.redis.Pipeline() + q.consumers.Range(func(key, value any) bool { + pipeline.XGroupDelConsumer(ctx, q.streamID, q.groupID, key.(string)) + return true + }) + if _, err := pipeline.Exec(ctx); err != nil { + return ttnredis.ConvertError(err) + } + return nil } +// Add implements ApplicationUplinkQueue interface. func (q *ApplicationUplinkQueue) Add(ctx context.Context, ups ...*ttnpb.ApplicationUp) error { if len(ups) == 0 { return nil } _, err := q.redis.Pipelined(ctx, func(p redis.Pipeliner) error { - now := time.Now() - taskMap := map[string]time.Time{} for _, up := range ups { - uid := unique.ID(ctx, up.EndDeviceIds.ApplicationIds) - s, err := ttnredis.MarshalProto(up) if err != nil { return err } - - var uidStreamID string - switch up.Up.(type) { - case *ttnpb.ApplicationUp_JoinAccept: - uidStreamID = q.uidJoinAcceptKey(uid) - case *ttnpb.ApplicationUp_DownlinkQueueInvalidated: - uidStreamID = q.uidInvalidationKey(uid) - default: - uidStreamID = q.uidGenericUplinkKey(uid) - } p.XAdd(ctx, &redis.XAddArgs{ - Stream: uidStreamID, + Stream: q.streamID, MaxLen: q.maxLen, Approx: true, Values: map[string]any{ - payloadKey: s, + payloadUIDKey: unique.ID(ctx, up.EndDeviceIds), + payloadKey: s, }, }) - if _, ok := taskMap[uid]; !ok { - taskMap[uid] = now - } - } - for uid, t := range taskMap { - if err := q.applicationQueue.Add(ctx, p, uid, t, false); err != nil { - return err - } } return nil }) @@ -139,83 +130,101 @@ func (q *ApplicationUplinkQueue) Add(ctx context.Context, ups ...*ttnpb.Applicat return nil } -var ( - errInvalidPayload = errors.DefineCorruption("invalid_payload", "invalid payload") - errMissingPayload = errors.DefineDataLoss("missing_payload", "missing payload") -) +func uidStrFrom(values map[string]any) (string, error) { + uidValue, ok := values[payloadUIDKey] + if !ok { + return "", errMissingUID.New() + } + uid, ok := uidValue.(string) + if !ok { + return "", errInvalidUID.New() + } + return uid, nil +} -func (q *ApplicationUplinkQueue) Dispatch(ctx context.Context, consumerID string) error { - return q.applicationQueue.Dispatch(ctx, consumerID, nil) +func applicationUpFrom(values map[string]any) (*ttnpb.ApplicationUp, error) { + payloadValue, ok := values[payloadKey] + if !ok { + return nil, errMissingPayload.New() + } + payload, ok := payloadValue.(string) + if !ok { + return nil, errInvalidPayload.New() + } + up := &ttnpb.ApplicationUp{} + if err := ttnredis.UnmarshalProto(payload, up); err != nil { + return nil, errInvalidPayload.WithCause(err) + } + return up, nil } -func (q *ApplicationUplinkQueue) Pop(ctx context.Context, consumerID string, f func(context.Context, *ttnpb.ApplicationIdentifiers, networkserver.ApplicationUplinkQueueDrainFunc) (time.Time, error)) error { - return q.applicationQueue.Pop(ctx, consumerID, nil, func(p redis.Pipeliner, uid string, _ time.Time) error { - appID, err := unique.ToApplicationID(uid) +// Pop implements ApplicationUplinkQueue interface. +func (q *ApplicationUplinkQueue) Pop( + ctx context.Context, consumerID string, limit int, + f func(context.Context, []*ttnpb.ApplicationUp) error, +) error { + q.consumers.Store(consumerID, struct{}{}) + + msgs, _, err := q.redis.XAutoClaim(ctx, &redis.XAutoClaimArgs{ + Group: q.groupID, + Consumer: consumerID, + Stream: q.streamID, + Start: "-", + MinIdle: q.minIdle, + Count: int64(limit), + }).Result() + if err != nil && !errors.Is(err, redis.Nil) { + return ttnredis.ConvertError(err) + } + + remainingCount := limit - len(msgs) + if remainingCount <= 0 { + remainingCount = 0 + } + + streams, err := q.redis.XReadGroup(ctx, &redis.XReadGroupArgs{ + Group: q.groupID, + Consumer: consumerID, + Streams: []string{q.streamID, ">"}, + Count: int64(remainingCount), + Block: q.streamBlockLimit, + }).Result() + if err != nil && !errors.Is(err, redis.Nil) { + return ttnredis.ConvertError(err) + } + batches := map[string]*contextualUplinkBatch{} + if len(streams) > 0 { + stream := streams[0] + msgs = append(msgs, stream.Messages...) + } + + for _, msg := range msgs { + uid, err := uidStrFrom(msg.Values) if err != nil { - return err + log.FromContext(ctx).Warn("Malformed uplink UID. Skipping message.") + continue } - ctx, err := unique.WithContext(ctx, uid) + up, err := applicationUpFrom(msg.Values) if err != nil { - return err - } - joinAcceptUpStream := q.uidJoinAcceptKey(uid) - invalidationUpStream := q.uidInvalidationKey(uid) - genericUpStream := q.uidGenericUplinkKey(uid) - - streams := [...]string{ - joinAcceptUpStream, - invalidationUpStream, - genericUpStream, - } - - cmds, err := q.redis.Pipelined(ctx, func(pp redis.Pipeliner) error { - for _, stream := range streams { - pp.XGroupCreateMkStream(ctx, stream, q.group, "0") - } - return nil - }) - if err != nil && !ttnredis.IsConsumerGroupExistsErr(err) { - return ttnredis.ConvertError(err) + log.FromContext(ctx).WithError(err).Warn("Malformed uplink payload. Skipping message.") + continue } - var initErr error - for i, cmd := range cmds { - if err := cmd.Err(); err != nil && !ttnredis.IsConsumerGroupExistsErr(err) { - initErr = err - continue - } - p.XGroupDelConsumer(ctx, streams[i], q.group, consumerID) + if err := addToBatch(ctx, batches, msg.ID, uid, up); err != nil { + return err } - if initErr != nil { - return ttnredis.ConvertError(initErr) + } + pipeliner := q.redis.Pipeline() + for _, batch := range batches { + if err := f(batch.ctx, batch.uplinks); err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to process uplink batch") + continue // Do not confirm messages that failed to process. } - t, err := f(ctx, appID, func(limit int, g func(...*ttnpb.ApplicationUp) error) error { - ups := make([]*ttnpb.ApplicationUp, 0, limit) - - processMessages := func(stream string, msgs ...redis.XMessage) error { - ups = ups[:0] - for _, msg := range msgs { - v, ok := msg.Values[payloadKey] - if !ok { - return errMissingPayload.New() - } - s, ok := v.(string) - if !ok { - return errInvalidPayload.New() - } - up := &ttnpb.ApplicationUp{} - if err = ttnredis.UnmarshalProto(s, up); err != nil { - return err - } - ups = append(ups, up) - } - return g(ups...) - } - return ttnredis.RangeStreams(ctx, q.redis, q.group, consumerID, int64(limit), q.minIdle, processMessages, streams[:]...) - }) - if err != nil || t.IsZero() { - return err - } - return q.applicationQueue.Add(ctx, p, uid, t, true) - }) + pipeliner.XAck(ctx, q.streamID, q.groupID, batch.confirmIDs...) + pipeliner.XDel(ctx, q.streamID, batch.confirmIDs...) + } + if _, err := pipeliner.Exec(ctx); err != nil { + return ttnredis.ConvertError(err) + } + return nil } diff --git a/pkg/networkserver/redis/uplink_queue_batcher.go b/pkg/networkserver/redis/uplink_queue_batcher.go new file mode 100644 index 0000000000..495c4b950e --- /dev/null +++ b/pkg/networkserver/redis/uplink_queue_batcher.go @@ -0,0 +1,56 @@ +// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. +// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +//go:build !tti +// +build !tti + +package redis + +import ( + "context" + + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" +) + +func getBatchKey(uid string) (string, error) { + devIDs, err := unique.ToDeviceID(uid) + if err != nil { + return "", errInvalidUID.WithCause(err) + } + + return devIDs.ApplicationIds.GetApplicationId(), nil +} + +func addToBatch( + ctx context.Context, m map[string]*contextualUplinkBatch, + confirmID string, uid string, up *ttnpb.ApplicationUp, +) error { + key, err := getBatchKey(uid) + if err != nil { + return err + } + batch, ok := m[key] + if !ok { + batch = &contextualUplinkBatch{ + ctx: ctx, + confirmIDs: make([]string, 0), + uplinks: make([]*ttnpb.ApplicationUp, 0), + } + m[key] = batch + } + batch.uplinks = append(batch.uplinks, up) + batch.confirmIDs = append(batch.confirmIDs, confirmID) + return nil +} diff --git a/pkg/networkserver/utils.go b/pkg/networkserver/utils.go index 9259e56994..256896f7d3 100644 --- a/pkg/networkserver/utils.go +++ b/pkg/networkserver/utils.go @@ -19,6 +19,7 @@ import ( "fmt" "go.thethings.network/lorawan-stack/v3/pkg/band" + "go.thethings.network/lorawan-stack/v3/pkg/errors" "go.thethings.network/lorawan-stack/v3/pkg/events" "go.thethings.network/lorawan-stack/v3/pkg/log" . "go.thethings.network/lorawan-stack/v3/pkg/networkserver/internal" @@ -411,6 +412,10 @@ func (ns *NetworkServer) submitApplicationUplinks(ctx context.Context, ups ...*t )) if err := ns.uplinkSubmissionPool.Publish(ctx, ups); err != nil { log.FromContext(ctx).WithError(err).Warn("Failed to enqueue application uplinks in submission pool") + if nonRetryableUplinkError(err) { + log.FromContext(ctx).Warn("Error is non-retryable, dropping application uplinks") + return + } ns.enqueueApplicationUplinks(ctx, ups...) return } @@ -425,8 +430,11 @@ func (ns *NetworkServer) handleUplinkSubmission(ctx context.Context, ups []*ttnp } if err := ns.sendApplicationUplinks(ctx, ttnpb.NewNsAsClient(conn), ups...); err != nil { log.FromContext(ctx).WithError(err).Warn("Failed to send application uplinks to Application Server") + if nonRetryableUplinkError(err) { + log.FromContext(ctx).Warn("Error is non-retryable, dropping application uplinks") + return + } ns.enqueueApplicationUplinks(ctx, ups...) - return } } @@ -462,3 +470,13 @@ var ( "session", } ) + +func nonRetryableUplinkError(err error) bool { + return errors.IsFailedPrecondition(err) || + errors.IsResourceExhausted(err) || + errors.IsAborted(err) || + errors.IsUnauthenticated(err) || + errors.IsPermissionDenied(err) || + errors.IsUnimplemented(err) || + errors.IsInternal(err) +} From 4178873b5d74517b537fb8a33555eb4d1c39b0bd Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Sun, 15 Oct 2023 16:29:21 +0200 Subject: [PATCH 02/11] ns: Restructure application uplink queue tests --- .../test/shared/application_uplink_queue.go | 375 ------------------ .../internal/test/shared/redis.go | 2 +- .../redis/application_uplink_queue_test.go | 372 ++++++++++++++++- 3 files changed, 359 insertions(+), 390 deletions(-) delete mode 100644 pkg/networkserver/internal/test/shared/application_uplink_queue.go diff --git a/pkg/networkserver/internal/test/shared/application_uplink_queue.go b/pkg/networkserver/internal/test/shared/application_uplink_queue.go deleted file mode 100644 index 838b9ff80a..0000000000 --- a/pkg/networkserver/internal/test/shared/application_uplink_queue.go +++ /dev/null @@ -1,375 +0,0 @@ -// Copyright © 2019 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -package test - -import ( - "bytes" - "context" - "testing" - - "github.com/smarty/assertions" - "go.thethings.network/lorawan-stack/v3/pkg/errors" - . "go.thethings.network/lorawan-stack/v3/pkg/networkserver" - "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" - "go.thethings.network/lorawan-stack/v3/pkg/util/test" - "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" - "google.golang.org/protobuf/proto" -) - -// handleApplicationUplinkQueueTest runs a test suite on q. -func handleApplicationUplinkQueueTest(ctx context.Context, q ApplicationUplinkQueue, consumerIDs []string) { - assertAdd := func(ctx context.Context, ups ...*ttnpb.ApplicationUp) bool { - t, a := test.MustNewTFromContext(ctx) - t.Helper() - - errCh := make(chan error, 1) - go func() { - errCh <- q.Add(ctx, ups...) - }() - select { - case <-ctx.Done(): - t.Error("Timed out while waiting for Add to return") - return false - - case err := <-errCh: - return a.So(err, should.BeNil) - } - } - - assertDrainApplication := func(ctx context.Context, withError bool, expected ...*ttnpb.ApplicationUp) bool { - t, a := test.MustNewTFromContext(ctx) - t.Helper() - - type popFuncReq struct { - Context context.Context - ApplicationIdentifiers *ttnpb.ApplicationIdentifiers - Func ApplicationUplinkQueueDrainFunc - Response chan<- TaskPopFuncResponse - } - reqCh := make(chan popFuncReq, 1) - errCh := make(chan error, 1) - popCtx, cancelPopCtx := context.WithCancel(ctx) - defer cancelPopCtx() - for _, consumerID := range consumerIDs { - go func(consumerID string) { - errCh <- q.Pop(popCtx, consumerID, int(testStreamBlockLimit()), func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { - respCh := make(chan TaskPopFuncResponse, 1) - select { - case <-popCtx.Done(): - return popCtx.Err() - case reqCh <- popFuncReq{ - Context: ctx, - Response: respCh, - }: - } - select { - case <-popCtx.Done(): - return popCtx.Err() - case resp := <-respCh: - return resp.Error - } - }) - }(consumerID) - } - - var collected []*ttnpb.ApplicationUp - var requests int - for ; len(collected) < len(expected); requests++ { - select { - case <-ctx.Done(): - t.Error("Timed out while waiting for Pop callback to be called") - return false - - case req := <-reqCh: - if !test.AllTrue( - a.So(req.Context, should.HaveParentContextOrEqual, ctx), - a.So(req.ApplicationIdentifiers, should.Resemble, expected[0].EndDeviceIds.ApplicationIds), - ) { - t.Error("Pop callback assertion failed") - return false - } - - if withError { - if !a.So(req.Func(2, func(ups ...*ttnpb.ApplicationUp) error { - a.So(ups, should.NotBeEmpty) - return test.ErrInternal - }), should.HaveSameErrorDefinitionAs, test.ErrInternal) { - return false - } - } - - if !a.So(req.Func(2, func(ups ...*ttnpb.ApplicationUp) error { - a.So(ups, should.NotBeEmpty) - collected = append(collected, ups...) - return nil - }), should.BeNil) { - return false - } - close(req.Response) - } - } - - for i := 0; i < requests; i++ { - select { - case <-ctx.Done(): - t.Error("Timed out while waiting for Pop to return") - return false - case err := <-errCh: - if !a.So(err, should.BeNil) { - return false - } - } - } - - cancelPopCtx() - - for i := requests; i < len(consumerIDs); i++ { - select { - case <-ctx.Done(): - t.Error("Timed out while waiting for Pop to return") - return false - case err := <-errCh: - if !a.So(errors.IsCanceled(err), should.BeTrue) { - return false - } - } - } - - if a.So(len(collected), should.Equal, len(expected)) { - for _, ex := range expected { - expectedB, err := proto.Marshal(ex) - if !a.So(err, should.BeNil) { - return false - } - - var found bool - for _, coll := range collected { - collectedB, err := proto.Marshal(coll) - if !a.So(err, should.BeNil) { - return false - } - - if bytes.Equal(expectedB, collectedB) { - found = true - } - } - - if !a.So(found, should.BeTrue) { - return false - } - } - } - - return true - } - - appID1 := &ttnpb.ApplicationIdentifiers{ - ApplicationId: "application-uplink-queue-app-1", - } - - appID2 := &ttnpb.ApplicationIdentifiers{ - ApplicationId: "application-uplink-queue-app-2", - } - - invalidations := [...]*ttnpb.ApplicationUp{ - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev2", - }, - CorrelationIds: []string{"invalidations[0]"}, - Up: &ttnpb.ApplicationUp_DownlinkQueueInvalidated{ - DownlinkQueueInvalidated: &ttnpb.ApplicationInvalidatedDownlinks{}, - }, - }, - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev2", - }, - CorrelationIds: []string{"invalidations[1]"}, - Up: &ttnpb.ApplicationUp_DownlinkQueueInvalidated{ - DownlinkQueueInvalidated: &ttnpb.ApplicationInvalidatedDownlinks{}, - }, - }, - } - joinAccepts := [...]*ttnpb.ApplicationUp{ - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev", - }, - CorrelationIds: []string{"joinAccepts[0]"}, - Up: &ttnpb.ApplicationUp_JoinAccept{ - JoinAccept: &ttnpb.ApplicationJoinAccept{}, - }, - }, - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev2", - }, - CorrelationIds: []string{"joinAccepts[1]"}, - Up: &ttnpb.ApplicationUp_JoinAccept{ - JoinAccept: &ttnpb.ApplicationJoinAccept{}, - }, - }, - } - genericApp1Ups := [...]*ttnpb.ApplicationUp{ - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev", - }, - CorrelationIds: []string{"genericApp1Ups[0]"}, - Up: &ttnpb.ApplicationUp_DownlinkFailed{ - DownlinkFailed: &ttnpb.ApplicationDownlinkFailed{}, - }, - }, - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev", - }, - CorrelationIds: []string{"genericApp1Ups[1]"}, - Up: &ttnpb.ApplicationUp_LocationSolved{ - LocationSolved: &ttnpb.ApplicationLocation{}, - }, - }, - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev2", - }, - CorrelationIds: []string{"genericApp1Ups[2]"}, - Up: &ttnpb.ApplicationUp_DownlinkFailed{ - DownlinkFailed: &ttnpb.ApplicationDownlinkFailed{}, - }, - }, - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev", - }, - CorrelationIds: []string{"genericApp1Ups[3]"}, - Up: &ttnpb.ApplicationUp_DownlinkAck{ - DownlinkAck: &ttnpb.ApplicationDownlink{}, - }, - }, - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID1, - DeviceId: "test-dev2", - }, - CorrelationIds: []string{"genericApp1Ups[4]"}, - Up: &ttnpb.ApplicationUp_DownlinkAck{ - DownlinkAck: &ttnpb.ApplicationDownlink{}, - }, - }, - } - - genericApp2Ups := [...]*ttnpb.ApplicationUp{ - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID2, - DeviceId: "test-dev2", - }, - CorrelationIds: []string{"genericApp2Ups[0]"}, - Up: &ttnpb.ApplicationUp_LocationSolved{ - LocationSolved: &ttnpb.ApplicationLocation{}, - }, - }, - { - EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ - ApplicationIds: appID2, - DeviceId: "test-dev", - }, - CorrelationIds: []string{"genericApp2Ups[1]"}, - Up: &ttnpb.ApplicationUp_DownlinkFailed{ - DownlinkFailed: &ttnpb.ApplicationDownlinkFailed{}, - }, - }, - } - - _, a := test.MustNewTFromContext(ctx) - switch { - case - !a.So(assertAdd(ctx, - genericApp1Ups[0], - ), should.BeTrue), - !a.So(assertDrainApplication(ctx, false, - genericApp1Ups[0], - ), should.BeTrue), - - !a.So(assertAdd(ctx, - genericApp2Ups[0], - ), should.BeTrue), - !a.So(assertAdd(ctx, - genericApp2Ups[1], - ), should.BeTrue), - !a.So(assertDrainApplication(ctx, true, - genericApp2Ups[0], - genericApp2Ups[1], - ), should.BeTrue), - - !a.So(assertAdd(ctx, - genericApp1Ups[1], - genericApp1Ups[2], - invalidations[0], - genericApp1Ups[3], - invalidations[1], - joinAccepts[0], - ), should.BeTrue), - !a.So(assertAdd(ctx, - genericApp1Ups[4], - joinAccepts[1], - ), should.BeTrue), - !a.So(assertDrainApplication(ctx, false, - joinAccepts[0], - joinAccepts[1], - invalidations[0], - invalidations[1], - genericApp1Ups[1], - genericApp1Ups[2], - genericApp1Ups[3], - genericApp1Ups[4], - ), should.BeTrue): - } -} - -// HandleApplicationUplinkQueueTest runs a ApplicationUplinkQueue test suite on reg. -func HandleApplicationUplinkQueueTest(t *testing.T, q ApplicationUplinkQueue, consumerIDs []string) { - t.Helper() - test.RunTest(t, test.TestConfig{ - Func: func(ctx context.Context, a *assertions.Assertion) { - t.Helper() - test.RunSubtestFromContext(ctx, test.SubtestConfig{ - Name: "1st run", - Func: func(ctx context.Context, t *testing.T, a *assertions.Assertion) { - handleApplicationUplinkQueueTest(ctx, q, consumerIDs) - }, - }) - if t.Failed() { - t.Skip("Skipping 2nd run") - } - test.RunSubtestFromContext(ctx, test.SubtestConfig{ - Name: "2nd run", - Func: func(ctx context.Context, t *testing.T, a *assertions.Assertion) { - handleApplicationUplinkQueueTest(ctx, q, consumerIDs) - }, - }) - }, - }) -} diff --git a/pkg/networkserver/internal/test/shared/redis.go b/pkg/networkserver/internal/test/shared/redis.go index 5ffccf960a..fb40f1102b 100644 --- a/pkg/networkserver/internal/test/shared/redis.go +++ b/pkg/networkserver/internal/test/shared/redis.go @@ -36,7 +36,7 @@ func testStreamBlockLimit() time.Duration { func NewRedisApplicationUplinkQueue(ctx context.Context) (ApplicationUplinkQueue, func()) { tb := test.MustTBFromContext(ctx) cl, flush := test.NewRedis(ctx, append(redisNamespace[:], "application-uplinks")...) - q := redis.NewApplicationUplinkQueue(cl, 100, redisConsumerGroup, 0) + q := redis.NewApplicationUplinkQueue(cl, 100, redisConsumerGroup, 0, testStreamBlockLimit()) if err := q.Init(ctx); err != nil { tb.Fatalf("Failed to initialize Redis application uplink queue: %s", test.FormatError(err)) } diff --git a/pkg/networkserver/redis/application_uplink_queue_test.go b/pkg/networkserver/redis/application_uplink_queue_test.go index 4b5adc0a04..9354c214d0 100644 --- a/pkg/networkserver/redis/application_uplink_queue_test.go +++ b/pkg/networkserver/redis/application_uplink_queue_test.go @@ -15,28 +15,372 @@ package redis_test import ( + "context" "fmt" + "sync" "testing" + "github.com/smarty/assertions" "go.thethings.network/lorawan-stack/v3/pkg/networkserver" - . "go.thethings.network/lorawan-stack/v3/pkg/networkserver/internal/test/shared" - . "go.thethings.network/lorawan-stack/v3/pkg/networkserver/redis" + nsredis "go.thethings.network/lorawan-stack/v3/pkg/networkserver/redis" + ttnredis "go.thethings.network/lorawan-stack/v3/pkg/redis" + "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" + "go.thethings.network/lorawan-stack/v3/pkg/unique" "go.thethings.network/lorawan-stack/v3/pkg/util/test" + "go.thethings.network/lorawan-stack/v3/pkg/util/test/assertions/should" ) -var _ networkserver.ApplicationUplinkQueue = &ApplicationUplinkQueue{} +var _ networkserver.ApplicationUplinkQueue = &nsredis.ApplicationUplinkQueue{} -func TestApplicationUplinkQueue(t *testing.T) { - for _, consumers := range []int{1, 2, 4, 8} { - t.Run(fmt.Sprintf("Consumers=%d", consumers), func(t *testing.T) { - _, ctx := test.New(t) - consumerIDs := make([]string, 0, consumers) - for i := 0; i < consumers; i++ { - consumerIDs = append(consumerIDs, fmt.Sprintf("consumer-%d-%d", consumers, i)) +var ( + redisNamespace = [...]string{ + "redis_test_uplink_queue", + } + readLimit = 7 + maxLen = int64(100) + groupID = "ns-test" + minIdle = (1 << 8) * test.Delay + streamBlockLimit = (1 << 5) * test.Delay + + appCount = 5 + devCountPerApp = 3 +) + +func setupRedusApplicationUplinkQueue(t *testing.T, cl *ttnredis.Client) (*nsredis.ApplicationUplinkQueue, func()) { + t.Helper() + + _, ctx := test.New(t) + + q := nsredis.NewApplicationUplinkQueue(cl, maxLen, groupID, minIdle, streamBlockLimit) + + return q, func() { + if err := q.Close(ctx); err != nil { + t.Errorf("Failed to close Redis application uplink queue: %s", err) + } + } +} + +func TestApplicationUplinkQueueInitCreatesConsumerGroup(t *testing.T) { + t.Parallel() + a, ctx := test.New(t) + + cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + t.Cleanup(redisCloseFn) + + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) + t.Cleanup(qCloseFn) + + if !a.So(q.Init(ctx), should.BeNil) { + t.FailNow() + } + + streamID := cl.Key("uplinks") + groups, err := cl.XInfoGroups(ctx, streamID).Result() + if !a.So(err, should.BeNil) { + t.FailNow() + } + a.So(groups, should.HaveLength, 1) + a.So(groups[0].Name, should.Equal, groupID) + a.So(groups[0].Consumers, should.Equal, 0) +} + +func TestApplicationUplinkQueueCloseRemovesAllConsumers(t *testing.T) { + t.Parallel() + a, ctx := test.New(t) + + cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + t.Cleanup(redisCloseFn) + + q, _ := setupRedusApplicationUplinkQueue(t, cl) + + if !a.So(q.Init(ctx), should.BeNil) { + t.FailNow() + } + + consumerIDs := []string{"test-consumer-1", "test-consumer-2"} + up := &ttnpb.ApplicationUp{ + EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ + DeviceId: "test-device", + ApplicationIds: &ttnpb.ApplicationIdentifiers{ + ApplicationId: "test-application", + }, + }, + } + for _, consumerID := range consumerIDs { + if err := q.Add(ctx, up); !a.So(err, should.BeNil) { + t.FailNow() + } + if err := q.Pop(ctx, consumerID, 1, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + return nil + }); !a.So(err, should.BeNil) { + t.FailNow() + } + } + + streamID := cl.Key("uplinks") + consumers, err := cl.XInfoConsumers(ctx, streamID, groupID).Result() + if !a.So(err, should.BeNil) { + t.FailNow() + } + a.So(consumers, should.HaveLength, 2) + + if !a.So(q.Close(ctx), should.BeNil) { + t.FailNow() + } + + consumers, err = cl.XInfoConsumers(ctx, streamID, groupID).Result() + if !a.So(err, should.BeNil) { + t.FailNow() + } + a.So(consumers, should.HaveLength, 0) +} + +func generateRandomUplinks(t *testing.T, applicationCount, deviceCount int) []*ttnpb.ApplicationUp { + t.Helper() + + ups := make([]*ttnpb.ApplicationUp, 0, applicationCount*deviceCount) + for i := 0; i < applicationCount; i++ { + applicationID := fmt.Sprintf("test-application-%d", i) + for j := 0; j < deviceCount; j++ { + deviceID := fmt.Sprintf("test-device-%d", j) + ups = append(ups, &ttnpb.ApplicationUp{ + EndDeviceIds: &ttnpb.EndDeviceIdentifiers{ + ApplicationIds: &ttnpb.ApplicationIdentifiers{ + ApplicationId: applicationID, + }, + DeviceId: deviceID, + }, + }) + } + } + return ups +} + +func assertAllEqualAppIDs(t *testing.T, ups []*ttnpb.ApplicationUp) { + t.Helper() + + a := assertions.New(t) + if !a.So(ups, should.NotBeEmpty) { + t.FailNow() + } + + expected := ups[0].EndDeviceIds.ApplicationIds + for _, up := range ups[1:] { + actual := up.EndDeviceIds.ApplicationIds + if !a.So(actual, should.Resemble, expected) { + t.FailNow() + } + } +} + +func assertStreamUplinkCount(t *testing.T, cl *ttnredis.Client, expected int) { + t.Helper() + + a, ctx := test.New(t) + streamID := cl.Key("uplinks") + entries, err := cl.XRange(ctx, streamID, "-", "+").Result() + if !a.So(err, should.BeNil) { + t.FailNow() + } + a.So(entries, should.HaveLength, expected) +} + +func TestApplicationUplinkQueueAdd(t *testing.T) { + t.Parallel() + a, ctx := test.New(t) + + cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + t.Cleanup(redisCloseFn) + + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) + t.Cleanup(qCloseFn) + + if !a.So(q.Init(ctx), should.BeNil) { + t.FailNow() + } + + expectedUps := generateRandomUplinks(t, appCount, devCountPerApp) + expectedUIDs := make([]string, 0, len(expectedUps)) + for _, up := range expectedUps { + expectedUIDs = append(expectedUIDs, unique.ID(ctx, up.EndDeviceIds)) + } + + if err := q.Add(ctx, expectedUps...); !a.So(err, should.BeNil) { + t.FailNow() + } + + streamID := cl.Key("uplinks") + entries, err := cl.XRange(ctx, streamID, "-", "+").Result() + if !a.So(err, should.BeNil) { + t.FailNow() + } + if !a.So(entries, should.HaveLength, len(expectedUps)) { + t.FailNow() + } + + actualUIDs := make([]string, 0, len(entries)) + actualUps := make([]*ttnpb.ApplicationUp, 0, 4) + for _, entry := range entries { + a.So(entry.Values, should.HaveLength, 2) + a.So(entry.Values["payload"], should.NotBeEmpty) + a.So(entry.Values["uid"], should.NotBeEmpty) + actualUIDs = append(actualUIDs, entry.Values["uid"].(string)) + up := &ttnpb.ApplicationUp{} + if err := ttnredis.UnmarshalProto(entry.Values["payload"].(string), up); !a.So(err, should.BeNil) { + t.FailNow() + } + actualUps = append(actualUps, up) + } + a.So(actualUIDs, should.HaveSameElementsDeep, expectedUIDs) + a.So(actualUps, should.HaveSameElementsDeep, expectedUps) +} + +func TestApplicationUplinkQueuePopAll(t *testing.T) { + t.Parallel() + a, ctx := test.New(t) + + cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + t.Cleanup(redisCloseFn) + + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) + t.Cleanup(qCloseFn) + + if !a.So(q.Init(ctx), should.BeNil) { + t.FailNow() + } + + consumerCount := 3 + uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) + errCh := make(chan error, consumerCount) + wg := sync.WaitGroup{} + + for i := 0; i < consumerCount; i++ { + consumerID := fmt.Sprintf("test-consumer-%d", i) + wg.Add(1) + go func() { + defer wg.Done() + + errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + assertAllEqualAppIDs(t, ups) + uplinkCh <- ups + return nil + }) + }() + } + + expected := generateRandomUplinks(t, appCount, devCountPerApp) + actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) + var err error + + go func() { + for { + select { + case ups := <-uplinkCh: + actual = append(actual, ups...) + case <-ctx.Done(): + errCh <- ctx.Err() } - q, closeFn := NewRedisApplicationUplinkQueue(ctx) - defer closeFn() - HandleApplicationUplinkQueueTest(t, q, consumerIDs) - }) + } + }() + + go func() { + for { + select { + case err = <-errCh: + return + case <-ctx.Done(): + errCh <- ctx.Err() + } + } + }() + + if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { + t.FailNow() } + + wg.Wait() + + a.So(err, should.BeNil) + a.So(actual, should.HaveLength, len(expected)) + assertStreamUplinkCount(t, cl, 0) +} + +func TestApplicationUplinkQueuePopErr(t *testing.T) { + t.Parallel() + a, ctx := test.New(t) + + cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + t.Cleanup(redisCloseFn) + + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) + t.Cleanup(qCloseFn) + + if !a.So(q.Init(ctx), should.BeNil) { + t.FailNow() + } + + generateError := func(ups []*ttnpb.ApplicationUp) error { + appID := ups[0].EndDeviceIds.ApplicationIds.ApplicationId + if appID == "test-application-1" || appID == "test-application-2" { + return fmt.Errorf("test error") + } + return nil + } + + consumerCount := 3 + uplinkCh := make(chan []*ttnpb.ApplicationUp, appCount) + errCh := make(chan error, consumerCount) + wg := sync.WaitGroup{} + + for i := 0; i < consumerCount; i++ { + consumerID := fmt.Sprintf("test-consumer-%d", i) + wg.Add(1) + go func() { + defer wg.Done() + + errCh <- q.Pop(ctx, consumerID, readLimit, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + assertAllEqualAppIDs(t, ups) + uplinkCh <- ups + return generateError(ups) + }) + }() + } + + expected := generateRandomUplinks(t, appCount, devCountPerApp) + actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) + var err error + + go func() { + for { + select { + case ups := <-uplinkCh: + actual = append(actual, ups...) + case <-ctx.Done(): + errCh <- ctx.Err() + } + } + }() + + go func() { + for { + select { + case err = <-errCh: + return + case <-ctx.Done(): + errCh <- ctx.Err() + } + } + }() + + if err := q.Add(ctx, expected...); !a.So(err, should.BeNil) { + t.FailNow() + } + + wg.Wait() + + expectedFailCount := devCountPerApp * 2 + + a.So(err, should.BeNil) + a.So(actual, should.HaveLength, len(expected)) // All uplinks should have been processed + assertStreamUplinkCount(t, cl, expectedFailCount) // Only failed uplinks should remain in the stream } From 5e297f95cb1ebe41fccde887b27f70dcc64ef921 Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Sun, 1 Oct 2023 20:08:25 +0200 Subject: [PATCH 03/11] ns,cli: Add ns-db purge command --- cmd/ttn-lw-stack/commands/ns_db.go | 49 ++++++++++++++++++++++++++++++ 1 file changed, 49 insertions(+) diff --git a/cmd/ttn-lw-stack/commands/ns_db.go b/cmd/ttn-lw-stack/commands/ns_db.go index aac3270f3f..f5d5bbfefe 100644 --- a/cmd/ttn-lw-stack/commands/ns_db.go +++ b/cmd/ttn-lw-stack/commands/ns_db.go @@ -211,6 +211,54 @@ var ( return nil }, } + nsDBPurgeCommand = &cobra.Command{ + Use: "purge", + Short: "Purge Network Server application data", + RunE: func(cmd *cobra.Command, args []string) error { + if config.Redis.IsZero() { + panic("Only Redis is supported by this command") + } + + logger.Info("Connecting to Redis database...") + cl := NewNetworkServerApplicationUplinkQueueRedis(config) + defer cl.Close() + + var purged uint64 + + genericUIDKeys := nsredis.ApplicationUplinkQueueUIDGenericUplinkKey(cl, "*") + invalidationUIDKeys := ttnredis.Key(genericUIDKeys, "invalidation") + joinAcceptUIDKeys := ttnredis.Key(genericUIDKeys, "join-accept") + taskQueueKeys := ttnredis.Key(cl.Key("application"), "*") + + targets := []string{ + genericUIDKeys, + invalidationUIDKeys, + joinAcceptUIDKeys, + taskQueueKeys, + } + + pipeliner := cl.Pipeline() + for _, target := range targets { + err := ttnredis.RangeRedisKeys(ctx, cl, target, ttnredis.DefaultRangeCount, + func(k string) (bool, error) { + pipeliner.Del(ctx, k) + purged++ + return true, nil + }) + if err != nil { + logger.WithError(err).Error("Failed to purge Network Server application data") + return err + } + } + if _, err := pipeliner.Exec(ctx); err != nil { + logger.WithError(err).Error("Failed to purge Network Server application data") + return err + } + + logger.WithField("records_purged_count", purged).Info("Purged Network Server application data") + return nil + }, + } ) func init() { @@ -221,4 +269,5 @@ func init() { nsDBCleanupCommand.Flags().Bool("dry-run", false, "Dry run") nsDBCleanupCommand.Flags().Duration("pagination-delay", 100, "Delay between batch requests") nsDBCommand.AddCommand(nsDBCleanupCommand) + nsDBCommand.AddCommand(nsDBPurgeCommand) } From c42e705fb2f748539fc8978640f249abe5e94f66 Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Wed, 18 Oct 2023 12:28:28 +0200 Subject: [PATCH 04/11] ns: Fix uplink queue test race condition --- .../redis/application_uplink_queue_test.go | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/pkg/networkserver/redis/application_uplink_queue_test.go b/pkg/networkserver/redis/application_uplink_queue_test.go index 9354c214d0..04273c963d 100644 --- a/pkg/networkserver/redis/application_uplink_queue_test.go +++ b/pkg/networkserver/redis/application_uplink_queue_test.go @@ -64,7 +64,7 @@ func TestApplicationUplinkQueueInitCreatesConsumerGroup(t *testing.T) { t.Parallel() a, ctx := test.New(t) - cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "init")...) t.Cleanup(redisCloseFn) q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) @@ -88,7 +88,7 @@ func TestApplicationUplinkQueueCloseRemovesAllConsumers(t *testing.T) { t.Parallel() a, ctx := test.New(t) - cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "close")...) t.Cleanup(redisCloseFn) q, _ := setupRedusApplicationUplinkQueue(t, cl) @@ -189,7 +189,7 @@ func TestApplicationUplinkQueueAdd(t *testing.T) { t.Parallel() a, ctx := test.New(t) - cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "add")...) t.Cleanup(redisCloseFn) q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) @@ -239,7 +239,7 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { t.Parallel() a, ctx := test.New(t) - cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_all")...) t.Cleanup(redisCloseFn) q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) @@ -309,7 +309,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { t.Parallel() a, ctx := test.New(t) - cl, redisCloseFn := test.NewRedis(ctx, redisNamespace[:]...) + cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_err")...) t.Cleanup(redisCloseFn) q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) From 205f90462fb4a01ffb34792ec357f5b2937274b7 Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Wed, 18 Oct 2023 12:53:37 +0200 Subject: [PATCH 05/11] ns: Return on error when reading uplink queue --- pkg/networkserver/redis/application_uplink_queue.go | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/pkg/networkserver/redis/application_uplink_queue.go b/pkg/networkserver/redis/application_uplink_queue.go index 5b467f55ac..a22eccd52f 100644 --- a/pkg/networkserver/redis/application_uplink_queue.go +++ b/pkg/networkserver/redis/application_uplink_queue.go @@ -201,13 +201,11 @@ func (q *ApplicationUplinkQueue) Pop( for _, msg := range msgs { uid, err := uidStrFrom(msg.Values) if err != nil { - log.FromContext(ctx).Warn("Malformed uplink UID. Skipping message.") - continue + return err } up, err := applicationUpFrom(msg.Values) if err != nil { - log.FromContext(ctx).WithError(err).Warn("Malformed uplink payload. Skipping message.") - continue + return err } if err := addToBatch(ctx, batches, msg.ID, uid, up); err != nil { return err From 29737de3a1a089c38fe478f48d5a1971583d27d3 Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Wed, 18 Oct 2023 12:58:02 +0200 Subject: [PATCH 06/11] ns: Remove excess if check --- pkg/networkserver/redis/application_uplink_queue.go | 4 ---- 1 file changed, 4 deletions(-) diff --git a/pkg/networkserver/redis/application_uplink_queue.go b/pkg/networkserver/redis/application_uplink_queue.go index a22eccd52f..e6ece47312 100644 --- a/pkg/networkserver/redis/application_uplink_queue.go +++ b/pkg/networkserver/redis/application_uplink_queue.go @@ -178,10 +178,6 @@ func (q *ApplicationUplinkQueue) Pop( } remainingCount := limit - len(msgs) - if remainingCount <= 0 { - remainingCount = 0 - } - streams, err := q.redis.XReadGroup(ctx, &redis.XReadGroupArgs{ Group: q.groupID, Consumer: consumerID, From 16310c17c79640da4540aa080d0d29e99cca32b3 Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Wed, 18 Oct 2023 12:59:09 +0200 Subject: [PATCH 07/11] ns: Improve uplink queue batching logic --- .../redis/application_uplink_queue.go | 30 ++++++++++ .../redis/uplink_queue_batcher.go | 56 ------------------- 2 files changed, 30 insertions(+), 56 deletions(-) delete mode 100644 pkg/networkserver/redis/uplink_queue_batcher.go diff --git a/pkg/networkserver/redis/application_uplink_queue.go b/pkg/networkserver/redis/application_uplink_queue.go index e6ece47312..cc8b60a42f 100644 --- a/pkg/networkserver/redis/application_uplink_queue.go +++ b/pkg/networkserver/redis/application_uplink_queue.go @@ -158,6 +158,36 @@ func applicationUpFrom(values map[string]any) (*ttnpb.ApplicationUp, error) { return up, nil } +func addToBatch( + ctx context.Context, + m map[string]*contextualUplinkBatch, + confirmID string, + uid string, + up *ttnpb.ApplicationUp, +) error { + ctx, err := unique.WithContext(ctx, uid) + if err != nil { + return errInvalidUID.WithCause(err) + } + ids, err := unique.ToDeviceID(uid) + if err != nil { + return errInvalidUID.WithCause(err) + } + key := unique.ID(ctx, ids.ApplicationIds) + batch, ok := m[key] + if !ok { + batch = &contextualUplinkBatch{ + ctx: ctx, + confirmIDs: make([]string, 0), + uplinks: make([]*ttnpb.ApplicationUp, 0), + } + m[key] = batch + } + batch.uplinks = append(batch.uplinks, up) + batch.confirmIDs = append(batch.confirmIDs, confirmID) + return nil +} + // Pop implements ApplicationUplinkQueue interface. func (q *ApplicationUplinkQueue) Pop( ctx context.Context, consumerID string, limit int, diff --git a/pkg/networkserver/redis/uplink_queue_batcher.go b/pkg/networkserver/redis/uplink_queue_batcher.go deleted file mode 100644 index 495c4b950e..0000000000 --- a/pkg/networkserver/redis/uplink_queue_batcher.go +++ /dev/null @@ -1,56 +0,0 @@ -// Copyright © 2023 The Things Network Foundation, The Things Industries B.V. -// -// Licensed under the Apache License, Version 2.0 (the "License"); -// you may not use this file except in compliance with the License. -// You may obtain a copy of the License at -// -// http://www.apache.org/licenses/LICENSE-2.0 -// -// Unless required by applicable law or agreed to in writing, software -// distributed under the License is distributed on an "AS IS" BASIS, -// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. -// See the License for the specific language governing permissions and -// limitations under the License. - -//go:build !tti -// +build !tti - -package redis - -import ( - "context" - - "go.thethings.network/lorawan-stack/v3/pkg/ttnpb" - "go.thethings.network/lorawan-stack/v3/pkg/unique" -) - -func getBatchKey(uid string) (string, error) { - devIDs, err := unique.ToDeviceID(uid) - if err != nil { - return "", errInvalidUID.WithCause(err) - } - - return devIDs.ApplicationIds.GetApplicationId(), nil -} - -func addToBatch( - ctx context.Context, m map[string]*contextualUplinkBatch, - confirmID string, uid string, up *ttnpb.ApplicationUp, -) error { - key, err := getBatchKey(uid) - if err != nil { - return err - } - batch, ok := m[key] - if !ok { - batch = &contextualUplinkBatch{ - ctx: ctx, - confirmIDs: make([]string, 0), - uplinks: make([]*ttnpb.ApplicationUp, 0), - } - m[key] = batch - } - batch.uplinks = append(batch.uplinks, up) - batch.confirmIDs = append(batch.confirmIDs, confirmID) - return nil -} From d80b652db3ca593658ddfefae7efdb5133f370ee Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Wed, 18 Oct 2023 13:32:10 +0200 Subject: [PATCH 08/11] dev: Generate locales --- pkg/webui/locales/ja.json | 2 ++ 1 file changed, 2 insertions(+) diff --git a/pkg/webui/locales/ja.json b/pkg/webui/locales/ja.json index 12329034c1..e667bfce9a 100644 --- a/pkg/webui/locales/ja.json +++ b/pkg/webui/locales/ja.json @@ -2406,9 +2406,11 @@ "error:pkg/networkserver/redis:invalid_identifiers": "無効な識別子", "error:pkg/networkserver/redis:invalid_member_type": "無効なメンバータイプ", "error:pkg/networkserver/redis:invalid_payload": "無効なペイロード", + "error:pkg/networkserver/redis:invalid_uid": "", "error:pkg/networkserver/redis:missing_downlink_correlation_id": "ダウンリンクメッセージの識別子相関IDがありません", "error:pkg/networkserver/redis:missing_payload": "ペイロードが見つかりません", "error:pkg/networkserver/redis:missing_session_data": "セッションデータがありません", + "error:pkg/networkserver/redis:missing_uid": "", "error:pkg/networkserver/redis:no_uplink_match": "デバイスがアップリンクと一致しません", "error:pkg/networkserver/redis:read_only_field": "読み取り専用フィールド `{field}`", "error:pkg/networkserver:abp_join_request": "ABPデバイスからジョインリクエストを受信しました", From ea51853d63faa805e9b8140dfff07f876bba9091 Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Wed, 18 Oct 2023 15:56:30 +0200 Subject: [PATCH 09/11] ns: Add uplink queue consumer claiming test --- .../redis/application_uplink_queue_test.go | 89 +++++++++++++++++-- 1 file changed, 80 insertions(+), 9 deletions(-) diff --git a/pkg/networkserver/redis/application_uplink_queue_test.go b/pkg/networkserver/redis/application_uplink_queue_test.go index 04273c963d..d695cca9d2 100644 --- a/pkg/networkserver/redis/application_uplink_queue_test.go +++ b/pkg/networkserver/redis/application_uplink_queue_test.go @@ -19,7 +19,9 @@ import ( "fmt" "sync" "testing" + "time" + "github.com/redis/go-redis/v9" "github.com/smarty/assertions" "go.thethings.network/lorawan-stack/v3/pkg/networkserver" nsredis "go.thethings.network/lorawan-stack/v3/pkg/networkserver/redis" @@ -40,13 +42,15 @@ var ( maxLen = int64(100) groupID = "ns-test" minIdle = (1 << 8) * test.Delay - streamBlockLimit = (1 << 5) * test.Delay + streamBlockLimit = (1 << 6) * test.Delay appCount = 5 devCountPerApp = 3 ) -func setupRedusApplicationUplinkQueue(t *testing.T, cl *ttnredis.Client) (*nsredis.ApplicationUplinkQueue, func()) { +func setupRedusApplicationUplinkQueue( + t *testing.T, cl *ttnredis.Client, minIdle, streamBlockLimit time.Duration, +) (*nsredis.ApplicationUplinkQueue, func()) { t.Helper() _, ctx := test.New(t) @@ -60,14 +64,14 @@ func setupRedusApplicationUplinkQueue(t *testing.T, cl *ttnredis.Client) (*nsred } } -func TestApplicationUplinkQueueInitCreatesConsumerGroup(t *testing.T) { +func TestApplicationUplinkQueueInit(t *testing.T) { t.Parallel() a, ctx := test.New(t) cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "init")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -84,14 +88,14 @@ func TestApplicationUplinkQueueInitCreatesConsumerGroup(t *testing.T) { a.So(groups[0].Consumers, should.Equal, 0) } -func TestApplicationUplinkQueueCloseRemovesAllConsumers(t *testing.T) { +func TestApplicationUplinkQueueClose(t *testing.T) { t.Parallel() a, ctx := test.New(t) cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "close")...) t.Cleanup(redisCloseFn) - q, _ := setupRedusApplicationUplinkQueue(t, cl) + q, _ := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) if !a.So(q.Init(ctx), should.BeNil) { t.FailNow() @@ -192,7 +196,7 @@ func TestApplicationUplinkQueueAdd(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "add")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -242,7 +246,7 @@ func TestApplicationUplinkQueuePopAll(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_all")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -312,7 +316,7 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "pop_err")...) t.Cleanup(redisCloseFn) - q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl) + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, minIdle, streamBlockLimit) t.Cleanup(qCloseFn) if !a.So(q.Init(ctx), should.BeNil) { @@ -384,3 +388,70 @@ func TestApplicationUplinkQueuePopErr(t *testing.T) { a.So(actual, should.HaveLength, len(expected)) // All uplinks should have been processed assertStreamUplinkCount(t, cl, expectedFailCount) // Only failed uplinks should remain in the stream } + +func TestApplicationUplinkQueueClaiming(t *testing.T) { + t.Parallel() + a, ctx := test.New(t) + + cl, redisCloseFn := test.NewRedis(ctx, append(redisNamespace[:], "claiming")...) + t.Cleanup(redisCloseFn) + + q, qCloseFn := setupRedusApplicationUplinkQueue(t, cl, 0, streamBlockLimit) + t.Cleanup(qCloseFn) + + if !a.So(q.Init(ctx), should.BeNil) { + t.FailNow() + } + + expected := generateRandomUplinks(t, appCount, devCountPerApp) + totalCount := len(expected) + subsetCount := totalCount / 2 + subset, rest := expected[:subsetCount], expected[subsetCount:] + + if err := q.Add(ctx, subset...); !a.So(err, should.BeNil) { + t.FailNow() + } + + consumerID1 := fmt.Sprintf("test-consumer-%d", 1) + err := q.Pop(ctx, consumerID1, totalCount, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + return fmt.Errorf("consumer error") + }) + if !a.So(err, should.BeNil) { + t.FailNow() + } + + streamID := cl.Key("uplinks") + pending, err := cl.XPendingExt(ctx, &redis.XPendingExtArgs{ + Stream: streamID, + Group: groupID, + Start: "-", + End: "+", + Count: int64(totalCount), + }).Result() + if !a.So(err, should.BeNil) { + t.FailNow() + } + a.So(len(pending), should.Equal, subsetCount) + + if err := q.Add(ctx, rest...); !a.So(err, should.BeNil) { + t.FailNow() + } + + actual := make([]*ttnpb.ApplicationUp, 0, len(expected)) + invokeCount := 0 + consumerID2 := fmt.Sprintf("test-consumer-%d", 2) + err = q.Pop(ctx, consumerID2, 100, func(ctx context.Context, ups []*ttnpb.ApplicationUp) error { + assertAllEqualAppIDs(t, ups) + invokeCount++ + actual = append(actual, ups...) + return nil + }) + if !a.So(err, should.BeNil) { + t.FailNow() + } + + a.So(err, should.BeNil) + a.So(invokeCount, should.Equal, appCount) + a.So(actual, should.HaveLength, len(expected)) + assertStreamUplinkCount(t, cl, 0) +} From d6bcce0afcf8a483d4d6df799ec5331d9a61b6eb Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Wed, 18 Oct 2023 15:58:15 +0200 Subject: [PATCH 10/11] ns: Extract uplink queue message processing logic --- .../redis/application_uplink_queue.go | 65 ++++++++++--------- 1 file changed, 36 insertions(+), 29 deletions(-) diff --git a/pkg/networkserver/redis/application_uplink_queue.go b/pkg/networkserver/redis/application_uplink_queue.go index cc8b60a42f..6bef2f3c1c 100644 --- a/pkg/networkserver/redis/application_uplink_queue.go +++ b/pkg/networkserver/redis/application_uplink_queue.go @@ -188,6 +188,41 @@ func addToBatch( return nil } +func (q *ApplicationUplinkQueue) processMessages( + ctx context.Context, + msgs []redis.XMessage, + f func(context.Context, []*ttnpb.ApplicationUp) error, +) error { + batches := map[string]*contextualUplinkBatch{} + for _, msg := range msgs { + uid, err := uidStrFrom(msg.Values) + if err != nil { + return err + } + up, err := applicationUpFrom(msg.Values) + if err != nil { + return err + } + if err := addToBatch(ctx, batches, msg.ID, uid, up); err != nil { + return err + } + } + pipeliner := q.redis.Pipeline() + for _, batch := range batches { + if err := f(batch.ctx, batch.uplinks); err != nil { + log.FromContext(ctx).WithError(err).Warn("Failed to process uplink batch") + continue // Do not confirm messages that failed to process. + } + + pipeliner.XAck(ctx, q.streamID, q.groupID, batch.confirmIDs...) + pipeliner.XDel(ctx, q.streamID, batch.confirmIDs...) + } + if _, err := pipeliner.Exec(ctx); err != nil { + return ttnredis.ConvertError(err) + } + return nil +} + // Pop implements ApplicationUplinkQueue interface. func (q *ApplicationUplinkQueue) Pop( ctx context.Context, consumerID string, limit int, @@ -218,37 +253,9 @@ func (q *ApplicationUplinkQueue) Pop( if err != nil && !errors.Is(err, redis.Nil) { return ttnredis.ConvertError(err) } - batches := map[string]*contextualUplinkBatch{} if len(streams) > 0 { stream := streams[0] msgs = append(msgs, stream.Messages...) } - - for _, msg := range msgs { - uid, err := uidStrFrom(msg.Values) - if err != nil { - return err - } - up, err := applicationUpFrom(msg.Values) - if err != nil { - return err - } - if err := addToBatch(ctx, batches, msg.ID, uid, up); err != nil { - return err - } - } - pipeliner := q.redis.Pipeline() - for _, batch := range batches { - if err := f(batch.ctx, batch.uplinks); err != nil { - log.FromContext(ctx).WithError(err).Warn("Failed to process uplink batch") - continue // Do not confirm messages that failed to process. - } - - pipeliner.XAck(ctx, q.streamID, q.groupID, batch.confirmIDs...) - pipeliner.XDel(ctx, q.streamID, batch.confirmIDs...) - } - if _, err := pipeliner.Exec(ctx); err != nil { - return ttnredis.ConvertError(err) - } - return nil + return q.processMessages(ctx, msgs, f) } From 9899aea1a04e754ed33fab5a2fe59087c2f73bd8 Mon Sep 17 00:00:00 2001 From: Oliver Cvetkovski Date: Thu, 19 Oct 2023 11:15:18 +0200 Subject: [PATCH 11/11] dev: Update CHANGELOG.md --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index 76b8756ff6..39be902f14 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -17,6 +17,7 @@ For details about compatibility between different releases, see the **Commitment - Updated Japanese translations for the Console and backend. - `--grpc.correlation-ids-ignore-methods` configuration option, which allows certain gRPC methods to be skipped from the correlation ID middleware which adds a correlation ID with the name of the gRPC method. Methods bear the format used by `--grpc.log-ignore-methods`, such as `/ttn.lorawan.v3.GsNs/HandleUplink`. - Support for setting multiple frequency plans for gateways from the Console. +- The `ns-db purge` command to purge unused data from the Network Server database. ### Changed