Skip to content

Commit

Permalink
Merge pull request #6641 from TheThingsNetwork/feature/remove-ns-appl…
Browse files Browse the repository at this point in the history
…ication-uplink-queue

Backport: Remove NS application uplink queue
  • Loading branch information
cvetkovski98 authored Oct 19, 2023
2 parents d23194d + 9899aea commit 179cc39
Show file tree
Hide file tree
Showing 10 changed files with 702 additions and 574 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ For details about compatibility between different releases, see the **Commitment
- Updated Japanese translations for the Console and backend.
- `--grpc.correlation-ids-ignore-methods` configuration option, which allows certain gRPC methods to be skipped from the correlation ID middleware which adds a correlation ID with the name of the gRPC method. Methods bear the format used by `--grpc.log-ignore-methods`, such as `/ttn.lorawan.v3.GsNs/HandleUplink`.
- Support for setting multiple frequency plans for gateways from the Console.
- The `ns-db purge` command to purge unused data from the Network Server database.

### Changed

Expand Down
49 changes: 49 additions & 0 deletions cmd/ttn-lw-stack/commands/ns_db.go
Original file line number Diff line number Diff line change
Expand Up @@ -211,6 +211,54 @@ var (
return nil
},
}
nsDBPurgeCommand = &cobra.Command{
Use: "purge",
Short: "Purge Network Server application data",
RunE: func(cmd *cobra.Command, args []string) error {
if config.Redis.IsZero() {
panic("Only Redis is supported by this command")
}

logger.Info("Connecting to Redis database...")
cl := NewNetworkServerApplicationUplinkQueueRedis(config)
defer cl.Close()

var purged uint64

genericUIDKeys := nsredis.ApplicationUplinkQueueUIDGenericUplinkKey(cl, "*")
invalidationUIDKeys := ttnredis.Key(genericUIDKeys, "invalidation")
joinAcceptUIDKeys := ttnredis.Key(genericUIDKeys, "join-accept")
taskQueueKeys := ttnredis.Key(cl.Key("application"), "*")

targets := []string{
genericUIDKeys,
invalidationUIDKeys,
joinAcceptUIDKeys,
taskQueueKeys,
}

pipeliner := cl.Pipeline()
for _, target := range targets {
err := ttnredis.RangeRedisKeys(ctx, cl, target, ttnredis.DefaultRangeCount,
func(k string) (bool, error) {
pipeliner.Del(ctx, k)
purged++
return true, nil
})
if err != nil {
logger.WithError(err).Error("Failed to purge Network Server application data")
return err
}
}
if _, err := pipeliner.Exec(ctx); err != nil {
logger.WithError(err).Error("Failed to purge Network Server application data")
return err
}

logger.WithField("records_purged_count", purged).Info("Purged Network Server application data")
return nil
},
}
)

func init() {
Expand All @@ -221,4 +269,5 @@ func init() {
nsDBCleanupCommand.Flags().Bool("dry-run", false, "Dry run")
nsDBCleanupCommand.Flags().Duration("pagination-delay", 100, "Delay between batch requests")
nsDBCommand.AddCommand(nsDBCleanupCommand)
nsDBCommand.AddCommand(nsDBPurgeCommand)
}
18 changes: 18 additions & 0 deletions config/messages.json
Original file line number Diff line number Diff line change
Expand Up @@ -7370,6 +7370,15 @@
"file": "application_uplink_queue.go"
}
},
"error:pkg/networkserver/redis:invalid_uid": {
"translations": {
"en": "invalid UID"
},
"description": {
"package": "pkg/networkserver/redis",
"file": "application_uplink_queue.go"
}
},
"error:pkg/networkserver/redis:missing_downlink_correlation_id": {
"translations": {
"en": "missing identifier correlation ID on downlink message"
Expand Down Expand Up @@ -7397,6 +7406,15 @@
"file": "registry.go"
}
},
"error:pkg/networkserver/redis:missing_uid": {
"translations": {
"en": "missing UID"
},
"description": {
"package": "pkg/networkserver/redis",
"file": "application_uplink_queue.go"
}
},
"error:pkg/networkserver/redis:no_uplink_match": {
"translations": {
"en": "no device matches uplink"
Expand Down
40 changes: 14 additions & 26 deletions pkg/networkserver/grpc_asns.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,14 +40,11 @@ type ApplicationUplinkQueue interface {
// Implementations must ensure that Add returns fast.
Add(ctx context.Context, ups ...*ttnpb.ApplicationUp) error

// Dispatch dispatches the tasks in the queue.
Dispatch(ctx context.Context, consumerID string) error

// PopApplication calls f on the most recent application uplink task in the schedule, for which timestamp is in range [0, time.Now()],
// if such is available, otherwise it blocks until it is.
// Pop groups up to limit most recent application uplinks in the queue
// by their application ID and calls f on each group.
// Context passed to f must be derived from ctx.
// Implementations must respect ctx.Done() value on best-effort basis.
Pop(ctx context.Context, consumerID string, f func(context.Context, *ttnpb.ApplicationIdentifiers, ApplicationUplinkQueueDrainFunc) (time.Time, error)) error
Pop(ctx context.Context, consumerID string, limit int, f func(context.Context, []*ttnpb.ApplicationUp) error) error
}

func applicationJoinAcceptWithoutAppSKey(pld *ttnpb.ApplicationJoinAccept) *ttnpb.ApplicationJoinAccept {
Expand Down Expand Up @@ -99,29 +96,20 @@ func (ns *NetworkServer) createProcessApplicationUplinkTask(consumerID string) f
}

func (ns *NetworkServer) processApplicationUplinkTask(ctx context.Context, consumerID string) error {
return ns.applicationUplinks.Pop(ctx, consumerID, func(ctx context.Context, appID *ttnpb.ApplicationIdentifiers, drain ApplicationUplinkQueueDrainFunc) (time.Time, error) {
conn, err := ns.GetPeerConn(ctx, ttnpb.ClusterRole_APPLICATION_SERVER, nil)
if err != nil {
log.FromContext(ctx).WithError(err).Warn("Failed to get Application Server peer")
return time.Now().Add(applicationUplinkTaskRetryInterval), nil
}

cl := ttnpb.NewNsAsClient(conn)
var sendErr bool
if err := drain(applicationUplinkLimit, func(ups ...*ttnpb.ApplicationUp) error {
err := ns.sendApplicationUplinks(ctx, cl, ups...)
return ns.applicationUplinks.Pop(ctx, consumerID, applicationUplinkLimit,
func(ctx context.Context, ups []*ttnpb.ApplicationUp) error {
conn, err := ns.GetPeerConn(ctx, ttnpb.ClusterRole_APPLICATION_SERVER, nil)
if err != nil {
sendErr = true
log.FromContext(ctx).WithError(err).Warn("Failed to get Application Server peer")
return err
}
return err
}); err != nil {
if !sendErr {
log.FromContext(ctx).WithError(err).Error("Failed to drain application uplinks")
cl := ttnpb.NewNsAsClient(conn)
if err := ns.sendApplicationUplinks(ctx, cl, ups...); err != nil {
log.FromContext(ctx).WithError(err).Error("Failed to send application uplinks")
return err
}
return time.Now().Add(applicationUplinkTaskRetryInterval), nil
}
return time.Time{}, nil
})
return nil
})
}

func minAFCntDown(session *ttnpb.Session, macState *ttnpb.MACState) (uint32, error) {
Expand Down
Loading

0 comments on commit 179cc39

Please sign in to comment.