From 8b5907a1f9b08af947f48e887d673c50fa28ac2c Mon Sep 17 00:00:00 2001 From: Imre Halasz Date: Tue, 3 Dec 2024 10:48:52 +0100 Subject: [PATCH] as: Add default pagination limit to pubsub --- .../io/pubsub/redis/registry.go | 56 +++++++++++++++---- 1 file changed, 46 insertions(+), 10 deletions(-) diff --git a/pkg/applicationserver/io/pubsub/redis/registry.go b/pkg/applicationserver/io/pubsub/redis/registry.go index ef718fbf4a5..b1b54db0fa6 100644 --- a/pkg/applicationserver/io/pubsub/redis/registry.go +++ b/pkg/applicationserver/io/pubsub/redis/registry.go @@ -16,6 +16,7 @@ package redis import ( "context" + "runtime/trace" "time" "github.com/redis/go-redis/v9" @@ -132,19 +133,54 @@ func (r PubSubRegistry) Range(ctx context.Context, paths []string, f func(contex func (r PubSubRegistry) List(ctx context.Context, ids *ttnpb.ApplicationIdentifiers, paths []string) ([]*ttnpb.ApplicationPubSub, error) { var pbs []*ttnpb.ApplicationPubSub appUID := unique.ID(ctx, ids) - err := ttnredis.FindProtos(ctx, r.Redis, r.appKey(appUID), r.makeUIDKeyFunc(appUID)).Range(func() (proto.Message, func() (bool, error)) { - pb := &ttnpb.ApplicationPubSub{} - return pb, func() (bool, error) { - pb, err := applyPubSubFieldMask(nil, pb, appendImplicitPubSubGetPaths(paths...)...) + uidKey := r.appKey(appUID) + + opts := []ttnredis.FindProtosOption{} + limit, offset := ttnredis.PaginationLimitAndOffsetFromContext(ctx) + if limit != 0 { + opts = append(opts, + ttnredis.FindProtosSorted(false), + ttnredis.FindProtosWithOffsetAndCount(offset, limit), + ) + } + + rangeProtos := func(c redis.Cmdable) error { + return ttnredis.FindProtos(ctx, c, uidKey, r.makeUIDKeyFunc(appUID), opts...).Range( + func() (proto.Message, func() (bool, error)) { + pb := &ttnpb.ApplicationPubSub{} + return pb, func() (bool, error) { + pb, err := applyPubSubFieldMask(nil, pb, appendImplicitPubSubGetPaths(paths...)...) + if err != nil { + return false, err + } + pbs = append(pbs, pb) + return true, nil + } + }) + } + + defer trace.StartRegion(ctx, "list pubsub by application id").End() + + var err error + if limit != 0 { + var lockerID string + lockerID, err = ttnredis.GenerateLockerID() + if err != nil { + return nil, err + } + err = ttnredis.LockedWatch(ctx, r.Redis, uidKey, lockerID, r.LockTTL, func(tx *redis.Tx) (err error) { + total, err := tx.SCard(ctx, uidKey).Result() if err != nil { - return false, err + return err } - pbs = append(pbs, pb) - return true, nil - } - }) + ttnredis.SetPaginationTotal(ctx, total) + return rangeProtos(tx) + }) + } else { + err = rangeProtos(r.Redis) + } if err != nil { - return nil, err + return nil, ttnredis.ConvertError(err) } return pbs, nil }