From e7103e24ea378dc13ab17daa948ff82b2edc9bc5 Mon Sep 17 00:00:00 2001 From: Julien Perrochet Date: Fri, 30 Aug 2024 16:21:57 +0200 Subject: [PATCH 1/3] [scd] oir upsert: factor out determination of subscriptions to notify --- pkg/scd/operational_intents_handler.go | 93 +++++++++++++++----------- 1 file changed, 54 insertions(+), 39 deletions(-) diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index e8e4d6e53..1235c94af 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -517,6 +517,56 @@ func validateUpsertRequestAgainstPreviousOIR( return nil } +func incrementIndicesAndGetRelevantSubscriptions( + ctx context.Context, + r repos.Repository, + previousOIR *scdmodels.OperationalIntent, + requestedExtent *dssmodels.Volume4D, +) (repos.Subscriptions, error) { + + // Compute total affected Volume4D for notification purposes + notifyVol4 := requestedExtent + if previousOIR != nil { + oldVol4 := &dssmodels.Volume4D{ + StartTime: previousOIR.StartTime, + EndTime: previousOIR.EndTime, + SpatialVolume: &dssmodels.Volume3D{ + AltitudeHi: previousOIR.AltitudeUpper, + AltitudeLo: previousOIR.AltitudeLower, + Footprint: dssmodels.GeometryFunc(func() (s2.CellUnion, error) { + return previousOIR.Cells, nil + }), + }, + } + var err error + notifyVol4, err = dssmodels.UnionVolumes4D(requestedExtent, oldVol4) + if err != nil { + return nil, stacktrace.Propagate(err, "Error constructing 4D volumes union") + } + } + + // Find Subscriptions that may need to be notified + allsubs, err := r.SearchSubscriptions(ctx, notifyVol4) + if err != nil { + return nil, stacktrace.Propagate(err, "Failed to search for impacted subscriptions.") + } + + // Limit Subscription notifications to only those interested in OperationalIntents + subs := repos.Subscriptions{} + for _, sub := range allsubs { + if sub.NotifyForOperationalIntents { + subs = append(subs, sub) + } + } + + // Increment notification indices for relevant Subscriptions + if err := subs.IncrementNotificationIndices(ctx, r); err != nil { + return nil, stacktrace.Propagate(err, "Failed to increment notification indices of relevant subscriptions") + } + + return subs, nil +} + // validateKeyAndProvideConflictResponse ensures that the provided key contains all the necessary OVNs relevant for the area covered by the OperationalIntent. // - If all required keys are provided, (nil, nil) will be returned. // - If keys are missing, the conflict response to be sent back as well as an error with the dsserr.MissingOVNs code will be returned. @@ -758,57 +808,22 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize return stacktrace.Propagate(err, "Error validating time range") } - // Compute total affected Volume4D for notification purposes - var notifyVol4 *dssmodels.Volume4D - if old == nil { - notifyVol4 = validParams.uExtent - } else { - oldVol4 := &dssmodels.Volume4D{ - StartTime: old.StartTime, - EndTime: old.EndTime, - SpatialVolume: &dssmodels.Volume3D{ - AltitudeHi: old.AltitudeUpper, - AltitudeLo: old.AltitudeLower, - Footprint: dssmodels.GeometryFunc(func() (s2.CellUnion, error) { - return old.Cells, nil - }), - }} - notifyVol4, err = dssmodels.UnionVolumes4D(validParams.uExtent, oldVol4) - if err != nil { - return stacktrace.Propagate(err, "Error constructing 4D volumes union") - } - } - // Upsert the OperationalIntent op, err = r.UpsertOperationalIntent(ctx, op) if err != nil { return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo") } - // Find Subscriptions that may need to be notified - allsubs, err := r.SearchSubscriptions(ctx, notifyVol4) + // Notify relevant Subscriptions + subsToNotify, err := incrementIndicesAndGetRelevantSubscriptions(ctx, r, old, validParams.uExtent) if err != nil { - return err - } - - // Limit Subscription notifications to only those interested in OperationalIntents - subs := repos.Subscriptions{} - for _, sub := range allsubs { - if sub.NotifyForOperationalIntents { - subs = append(subs, sub) - } - } - - // Increment notification indices for relevant Subscriptions - err = subs.IncrementNotificationIndices(ctx, r) - if err != nil { - return err + return stacktrace.Propagate(err, "Failed to notify relevant Subscriptions") } // Return response to client responseOK = &restapi.ChangeOperationalIntentReferenceResponse{ OperationalIntentReference: *op.ToRest(), - Subscribers: makeSubscribersToNotify(subs), + Subscribers: makeSubscribersToNotify(subsToNotify), } return nil From 47e45c3cf7b1d0a55eee053b2d72eee690c53f31 Mon Sep 17 00:00:00 2001 From: Julien Perrochet Date: Mon, 2 Sep 2024 09:30:48 +0200 Subject: [PATCH 2/3] comment --- pkg/scd/operational_intents_handler.go | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index 1235c94af..a3112f516 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -517,7 +517,7 @@ func validateUpsertRequestAgainstPreviousOIR( return nil } -func incrementIndicesAndGetRelevantSubscriptions( +func getRelevantSubscriptionsAndIncrementIndices( ctx context.Context, r repos.Repository, previousOIR *scdmodels.OperationalIntent, @@ -815,7 +815,7 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize } // Notify relevant Subscriptions - subsToNotify, err := incrementIndicesAndGetRelevantSubscriptions(ctx, r, old, validParams.uExtent) + subsToNotify, err := getRelevantSubscriptionsAndIncrementIndices(ctx, r, old, validParams.uExtent) if err != nil { return stacktrace.Propagate(err, "Failed to notify relevant Subscriptions") } From 46d34f04762d929d0ca6c254f87b10c8cc299e10 Mon Sep 17 00:00:00 2001 From: Julien Perrochet Date: Mon, 2 Sep 2024 20:56:43 +0200 Subject: [PATCH 3/3] cover delete --- pkg/scd/operational_intents_handler.go | 92 ++++++++++++++------------ 1 file changed, 49 insertions(+), 43 deletions(-) diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index a3112f516..226ed0440 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -94,8 +94,8 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest } } - // Find Subscriptions that may overlap the OperationalIntent's Volume4D - allsubs, err := r.SearchSubscriptions(ctx, &dssmodels.Volume4D{ + // Gather the subscriptions that need to be notified + notifyVolume := &dssmodels.Volume4D{ StartTime: old.StartTime, EndTime: old.EndTime, SpatialVolume: &dssmodels.Volume3D{ @@ -104,22 +104,11 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest Footprint: dssmodels.GeometryFunc(func() (s2.CellUnion, error) { return old.Cells, nil }), - }}) - if err != nil { - return stacktrace.Propagate(err, "Unable to search Subscriptions in repo") - } - - // Limit Subscription notifications to only those interested in OperationalIntents - subs := repos.Subscriptions{} - for _, s := range allsubs { - if s.NotifyForOperationalIntents { - subs = append(subs, s) - } - } + }} - // Increment notification indices for Subscriptions to be notified - if err := subs.IncrementNotificationIndices(ctx, r); err != nil { - return stacktrace.Propagate(err, "Unable to increment notification indices") + subsToNotify, err := getRelevantSubscriptionsAndIncrementIndices(ctx, r, notifyVolume) + if err != nil { + return stacktrace.Propagate(err, "could not obtain relevant subscriptions") } // Delete OperationalIntent from repo @@ -139,7 +128,7 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest // Return response to client response = &restapi.ChangeOperationalIntentReferenceResponse{ OperationalIntentReference: *old.ToRest(), - Subscribers: makeSubscribersToNotify(subs), + Subscribers: makeSubscribersToNotify(subsToNotify), } return nil @@ -517,36 +506,48 @@ func validateUpsertRequestAgainstPreviousOIR( return nil } -func getRelevantSubscriptionsAndIncrementIndices( - ctx context.Context, - r repos.Repository, +// computeNotificationVolume computes the volume that needs to be queried for subscriptions +// given the requested extent and the (possibly nil) previous operational intent. +// The returned volume is either the union of the requested extent and the previous OIR's extent, or just the requested extent +// if the previous OIR is nil. +func computeNotificationVolume( previousOIR *scdmodels.OperationalIntent, - requestedExtent *dssmodels.Volume4D, -) (repos.Subscriptions, error) { + requestedExtent *dssmodels.Volume4D) (*dssmodels.Volume4D, error) { + + if previousOIR == nil { + return requestedExtent, nil + } // Compute total affected Volume4D for notification purposes - notifyVol4 := requestedExtent - if previousOIR != nil { - oldVol4 := &dssmodels.Volume4D{ - StartTime: previousOIR.StartTime, - EndTime: previousOIR.EndTime, - SpatialVolume: &dssmodels.Volume3D{ - AltitudeHi: previousOIR.AltitudeUpper, - AltitudeLo: previousOIR.AltitudeLower, - Footprint: dssmodels.GeometryFunc(func() (s2.CellUnion, error) { - return previousOIR.Cells, nil - }), - }, - } - var err error - notifyVol4, err = dssmodels.UnionVolumes4D(requestedExtent, oldVol4) - if err != nil { - return nil, stacktrace.Propagate(err, "Error constructing 4D volumes union") - } + oldVolume := &dssmodels.Volume4D{ + StartTime: previousOIR.StartTime, + EndTime: previousOIR.EndTime, + SpatialVolume: &dssmodels.Volume3D{ + AltitudeHi: previousOIR.AltitudeUpper, + AltitudeLo: previousOIR.AltitudeLower, + Footprint: dssmodels.GeometryFunc(func() (s2.CellUnion, error) { + return previousOIR.Cells, nil + }), + }, + } + notifyVolume, err := dssmodels.UnionVolumes4D(requestedExtent, oldVolume) + if err != nil { + return nil, stacktrace.Propagate(err, "Error constructing 4D volumes union") } + return notifyVolume, nil +} + +// getRelevantSubscriptionsAndIncrementIndices retrieves the subscriptions relevant to the passed volume and increments their notification indices +// before returning them. +func getRelevantSubscriptionsAndIncrementIndices( + ctx context.Context, + r repos.Repository, + notifyVolume *dssmodels.Volume4D, +) (repos.Subscriptions, error) { + // Find Subscriptions that may need to be notified - allsubs, err := r.SearchSubscriptions(ctx, notifyVol4) + allsubs, err := r.SearchSubscriptions(ctx, notifyVolume) if err != nil { return nil, stacktrace.Propagate(err, "Failed to search for impacted subscriptions.") } @@ -814,8 +815,13 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo") } + notifyVolume, err := computeNotificationVolume(old, validParams.uExtent) + if err != nil { + return stacktrace.Propagate(err, "Failed to compute notification volume") + } + // Notify relevant Subscriptions - subsToNotify, err := getRelevantSubscriptionsAndIncrementIndices(ctx, r, old, validParams.uExtent) + subsToNotify, err := getRelevantSubscriptionsAndIncrementIndices(ctx, r, notifyVolume) if err != nil { return stacktrace.Propagate(err, "Failed to notify relevant Subscriptions") }