Skip to content

Commit

Permalink
[scd] oir upsert: factor out determination of subscriptions to notify (
Browse files Browse the repository at this point in the history
…#1092)

* [scd] oir upsert: factor out determination of subscriptions to notify

* comment

* cover delete
  • Loading branch information
Shastick authored Sep 3, 2024
1 parent 02f8ac9 commit 7029a60
Showing 1 changed file with 75 additions and 54 deletions.
129 changes: 75 additions & 54 deletions pkg/scd/operational_intents_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -94,8 +94,8 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest
}
}

// Find Subscriptions that may overlap the OperationalIntent's Volume4D
allsubs, err := r.SearchSubscriptions(ctx, &dssmodels.Volume4D{
// Gather the subscriptions that need to be notified
notifyVolume := &dssmodels.Volume4D{
StartTime: old.StartTime,
EndTime: old.EndTime,
SpatialVolume: &dssmodels.Volume3D{
Expand All @@ -104,22 +104,11 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest
Footprint: dssmodels.GeometryFunc(func() (s2.CellUnion, error) {
return old.Cells, nil
}),
}})
if err != nil {
return stacktrace.Propagate(err, "Unable to search Subscriptions in repo")
}

// Limit Subscription notifications to only those interested in OperationalIntents
subs := repos.Subscriptions{}
for _, s := range allsubs {
if s.NotifyForOperationalIntents {
subs = append(subs, s)
}
}
}}

// Increment notification indices for Subscriptions to be notified
if err := subs.IncrementNotificationIndices(ctx, r); err != nil {
return stacktrace.Propagate(err, "Unable to increment notification indices")
subsToNotify, err := getRelevantSubscriptionsAndIncrementIndices(ctx, r, notifyVolume)
if err != nil {
return stacktrace.Propagate(err, "could not obtain relevant subscriptions")
}

// Delete OperationalIntent from repo
Expand All @@ -139,7 +128,7 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest
// Return response to client
response = &restapi.ChangeOperationalIntentReferenceResponse{
OperationalIntentReference: *old.ToRest(),
Subscribers: makeSubscribersToNotify(subs),
Subscribers: makeSubscribersToNotify(subsToNotify),
}

return nil
Expand Down Expand Up @@ -517,6 +506,68 @@ func validateUpsertRequestAgainstPreviousOIR(
return nil
}

// computeNotificationVolume computes the volume that needs to be queried for subscriptions
// given the requested extent and the (possibly nil) previous operational intent.
// The returned volume is either the union of the requested extent and the previous OIR's extent, or just the requested extent
// if the previous OIR is nil.
func computeNotificationVolume(
previousOIR *scdmodels.OperationalIntent,
requestedExtent *dssmodels.Volume4D) (*dssmodels.Volume4D, error) {

if previousOIR == nil {
return requestedExtent, nil
}

// Compute total affected Volume4D for notification purposes
oldVolume := &dssmodels.Volume4D{
StartTime: previousOIR.StartTime,
EndTime: previousOIR.EndTime,
SpatialVolume: &dssmodels.Volume3D{
AltitudeHi: previousOIR.AltitudeUpper,
AltitudeLo: previousOIR.AltitudeLower,
Footprint: dssmodels.GeometryFunc(func() (s2.CellUnion, error) {
return previousOIR.Cells, nil
}),
},
}
notifyVolume, err := dssmodels.UnionVolumes4D(requestedExtent, oldVolume)
if err != nil {
return nil, stacktrace.Propagate(err, "Error constructing 4D volumes union")
}

return notifyVolume, nil
}

// getRelevantSubscriptionsAndIncrementIndices retrieves the subscriptions relevant to the passed volume and increments their notification indices
// before returning them.
func getRelevantSubscriptionsAndIncrementIndices(
ctx context.Context,
r repos.Repository,
notifyVolume *dssmodels.Volume4D,
) (repos.Subscriptions, error) {

// Find Subscriptions that may need to be notified
allsubs, err := r.SearchSubscriptions(ctx, notifyVolume)
if err != nil {
return nil, stacktrace.Propagate(err, "Failed to search for impacted subscriptions.")
}

// Limit Subscription notifications to only those interested in OperationalIntents
subs := repos.Subscriptions{}
for _, sub := range allsubs {
if sub.NotifyForOperationalIntents {
subs = append(subs, sub)
}
}

// Increment notification indices for relevant Subscriptions
if err := subs.IncrementNotificationIndices(ctx, r); err != nil {
return nil, stacktrace.Propagate(err, "Failed to increment notification indices of relevant subscriptions")
}

return subs, nil
}

// validateKeyAndProvideConflictResponse ensures that the provided key contains all the necessary OVNs relevant for the area covered by the OperationalIntent.
// - If all required keys are provided, (nil, nil) will be returned.
// - If keys are missing, the conflict response to be sent back as well as an error with the dsserr.MissingOVNs code will be returned.
Expand Down Expand Up @@ -758,57 +809,27 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
return stacktrace.Propagate(err, "Error validating time range")
}

// Compute total affected Volume4D for notification purposes
var notifyVol4 *dssmodels.Volume4D
if old == nil {
notifyVol4 = validParams.uExtent
} else {
oldVol4 := &dssmodels.Volume4D{
StartTime: old.StartTime,
EndTime: old.EndTime,
SpatialVolume: &dssmodels.Volume3D{
AltitudeHi: old.AltitudeUpper,
AltitudeLo: old.AltitudeLower,
Footprint: dssmodels.GeometryFunc(func() (s2.CellUnion, error) {
return old.Cells, nil
}),
}}
notifyVol4, err = dssmodels.UnionVolumes4D(validParams.uExtent, oldVol4)
if err != nil {
return stacktrace.Propagate(err, "Error constructing 4D volumes union")
}
}

// Upsert the OperationalIntent
op, err = r.UpsertOperationalIntent(ctx, op)
if err != nil {
return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo")
}

// Find Subscriptions that may need to be notified
allsubs, err := r.SearchSubscriptions(ctx, notifyVol4)
notifyVolume, err := computeNotificationVolume(old, validParams.uExtent)
if err != nil {
return err
}

// Limit Subscription notifications to only those interested in OperationalIntents
subs := repos.Subscriptions{}
for _, sub := range allsubs {
if sub.NotifyForOperationalIntents {
subs = append(subs, sub)
}
return stacktrace.Propagate(err, "Failed to compute notification volume")
}

// Increment notification indices for relevant Subscriptions
err = subs.IncrementNotificationIndices(ctx, r)
// Notify relevant Subscriptions
subsToNotify, err := getRelevantSubscriptionsAndIncrementIndices(ctx, r, notifyVolume)
if err != nil {
return err
return stacktrace.Propagate(err, "Failed to notify relevant Subscriptions")
}

// Return response to client
responseOK = &restapi.ChangeOperationalIntentReferenceResponse{
OperationalIntentReference: *op.ToRest(),
Subscribers: makeSubscribersToNotify(subs),
Subscribers: makeSubscribersToNotify(subsToNotify),
}

return nil
Expand Down

0 comments on commit 7029a60

Please sign in to comment.