Skip to content

Commit

Permalink
as: Add default pagination limit to pubsub
Browse files Browse the repository at this point in the history
  • Loading branch information
halimi committed Dec 3, 2024
1 parent 2e8fc9d commit 8b5907a
Showing 1 changed file with 46 additions and 10 deletions.
56 changes: 46 additions & 10 deletions pkg/applicationserver/io/pubsub/redis/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ package redis

import (
"context"
"runtime/trace"
"time"

"github.com/redis/go-redis/v9"
Expand Down Expand Up @@ -132,19 +133,54 @@ func (r PubSubRegistry) Range(ctx context.Context, paths []string, f func(contex
func (r PubSubRegistry) List(ctx context.Context, ids *ttnpb.ApplicationIdentifiers, paths []string) ([]*ttnpb.ApplicationPubSub, error) {
var pbs []*ttnpb.ApplicationPubSub
appUID := unique.ID(ctx, ids)
err := ttnredis.FindProtos(ctx, r.Redis, r.appKey(appUID), r.makeUIDKeyFunc(appUID)).Range(func() (proto.Message, func() (bool, error)) {
pb := &ttnpb.ApplicationPubSub{}
return pb, func() (bool, error) {
pb, err := applyPubSubFieldMask(nil, pb, appendImplicitPubSubGetPaths(paths...)...)
uidKey := r.appKey(appUID)

opts := []ttnredis.FindProtosOption{}
limit, offset := ttnredis.PaginationLimitAndOffsetFromContext(ctx)
if limit != 0 {
opts = append(opts,
ttnredis.FindProtosSorted(false),
ttnredis.FindProtosWithOffsetAndCount(offset, limit),
)
}

rangeProtos := func(c redis.Cmdable) error {
return ttnredis.FindProtos(ctx, c, uidKey, r.makeUIDKeyFunc(appUID), opts...).Range(
func() (proto.Message, func() (bool, error)) {
pb := &ttnpb.ApplicationPubSub{}
return pb, func() (bool, error) {
pb, err := applyPubSubFieldMask(nil, pb, appendImplicitPubSubGetPaths(paths...)...)
if err != nil {
return false, err
}
pbs = append(pbs, pb)
return true, nil
}
})
}

defer trace.StartRegion(ctx, "list pubsub by application id").End()

var err error
if limit != 0 {
var lockerID string
lockerID, err = ttnredis.GenerateLockerID()
if err != nil {
return nil, err
}
err = ttnredis.LockedWatch(ctx, r.Redis, uidKey, lockerID, r.LockTTL, func(tx *redis.Tx) (err error) {
total, err := tx.SCard(ctx, uidKey).Result()
if err != nil {
return false, err
return err
}
pbs = append(pbs, pb)
return true, nil
}
})
ttnredis.SetPaginationTotal(ctx, total)
return rangeProtos(tx)
})
} else {
err = rangeProtos(r.Redis)
}
if err != nil {
return nil, err
return nil, ttnredis.ConvertError(err)
}
return pbs, nil
}
Expand Down

0 comments on commit 8b5907a

Please sign in to comment.