diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index e287d1f43..56352afcd 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -16,6 +16,37 @@ import ( "github.com/interuss/stacktrace" ) +// subscriptionIsImplicitAndOnlyAttachedToOIR will check if: +// - the subscription is defined and is implicit +// - the subscription is attached to the specified operational intent +// - the subscription is not attached to any other operational intent +// +// This is to be used in contexts where an implicit subscription may need to be cleaned up: if true is returned, +// the subscription can be safely removed after the operational intent is deleted or attached to another subscription. +// +// NOTE: this should eventually be pushed down to CRDB as part of the queries being executed in the callers of this method. +// +// See https://github.com/interuss/dss/issues/1059 for more details +func subscriptionIsImplicitAndOnlyAttachedToOIR(ctx context.Context, r repos.Repository, oirID dssmodels.ID, subscription *scdmodels.Subscription) (bool, error) { + if subscription == nil { + return false, nil + } + if !subscription.ImplicitSubscription { + return false, nil + } + // Get the Subscription's dependent OperationalIntents + dependentOps, err := r.GetDependentOperationalIntents(ctx, subscription.ID) + if err != nil { + return false, stacktrace.Propagate(err, "Could not find dependent OperationalIntents") + } + if len(dependentOps) == 0 { + return false, stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents") + } else if len(dependentOps) == 1 && dependentOps[0] == oirID { + return true, nil + } + return false, nil +} + // DeleteOperationalIntentReference deletes a single operational intent ref for a given ID at // the specified version. func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *restapi.DeleteOperationalIntentReferenceRequest, @@ -69,29 +100,15 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest } // Get the Subscription supporting the OperationalIntent, if one is defined - var sub *scdmodels.Subscription - removeImplicitSubscription := false + var previousSubscription *scdmodels.Subscription if old.SubscriptionID != nil { - sub, err = r.GetSubscription(ctx, *old.SubscriptionID) + previousSubscription, err = r.GetSubscription(ctx, *old.SubscriptionID) if err != nil { return stacktrace.Propagate(err, "Unable to get OperationalIntent's Subscription from repo") } - if sub == nil { + if previousSubscription == nil { return stacktrace.NewError("OperationalIntent's Subscription missing from repo") } - - if sub.ImplicitSubscription { - // Get the Subscription's dependent OperationalIntents - dependentOps, err := r.GetDependentOperationalIntents(ctx, sub.ID) - if err != nil { - return stacktrace.Propagate(err, "Could not find dependent OperationalIntents") - } - if len(dependentOps) == 0 { - return stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents") - } else if len(dependentOps) == 1 { - removeImplicitSubscription = true - } - } } // Gather the subscriptions that need to be notified @@ -116,15 +133,6 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest return stacktrace.Propagate(err, "Unable to delete OperationalIntent from repo") } - // removeImplicitSubscription is only true if the OIR had a subscription defined - if removeImplicitSubscription { - // Automatically remove a now-unused implicit Subscription - err = r.DeleteSubscription(ctx, sub.ID) - if err != nil { - return stacktrace.Propagate(err, "Unable to delete associated implicit Subscription") - } - } - // Return response to client response = &restapi.ChangeOperationalIntentReferenceResponse{ OperationalIntentReference: *old.ToRest(), @@ -810,33 +818,63 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize // For an OIR being created, version starts at 0 version := int32(0) + var previousSub *scdmodels.Subscription if old != nil { version = int32(old.Version) + // Fetch the previous OIR's subscription if it exists + if old.SubscriptionID != nil { + previousSub, err = r.GetSubscription(ctx, *old.SubscriptionID) + if err != nil { + return stacktrace.Propagate(err, "Unable to get OperationalIntent's Subscription from repo") + } + } + } + + // Determine if the previous subscription is being replaced and if it will need to be cleaned up + previousSubIsBeingReplaced := previousSub != nil && validParams.subscriptionID != previousSub.ID + removePreviousImplicitSubscription := false + if previousSubIsBeingReplaced { + removePreviousImplicitSubscription, err = subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, validParams.id, previousSub) + if err != nil { + return stacktrace.Propagate(err, "Could not determine if previous Subscription can be removed") + } } - var sub *scdmodels.Subscription + // attachedSub is the subscription that will end up being attached to the OIR + // it defaults to the previous subscription (which may be nil), and may be updated if required by the parameters + attachedSub := previousSub if validParams.subscriptionID.Empty() { - // Create an implicit subscription if one has been requested. - // Requesting neither an explicit nor an implicit subscription is allowed for ACCEPTED states: - // for other states, an error will have been returned earlier. - // if no implicit subscription is requested and we reached this point, we will proceed without subscription + // No subscription ID was provided: + // check if an implicit subscription should be created, otherwise do nothing if validParams.implicitSubscription.requested { - if sub, err = createAndStoreNewImplicitSubscription(ctx, r, manager, validParams); err != nil { + // Parameters for a new implicit subscription have been passed: we will create + // a new implicit subscription even if another subscription was attached to this OIR before, + // regardless of whether it was an implicit subscription or not. + if attachedSub, err = createAndStoreNewImplicitSubscription(ctx, r, manager, validParams); err != nil { return stacktrace.Propagate(err, "Failed to create implicit subscription") } + } else { + // If no subscription ID is provided and no implicit subscription is requested, + // the OIR should have no attached subscription + attachedSub = nil } } else { // Attempt to rely on the specified subscription - sub, err = r.GetSubscription(ctx, validParams.subscriptionID) - if err != nil { - return stacktrace.Propagate(err, "Unable to get depended-upon Subscription") - } - if sub == nil { - return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Depended-upon Subscription %s does not exist", validParams.subscriptionID) + // If it is different from the previous subscription, we need to fetch it from the store + // in order to ensure it correctly covers the OIR. + // We do the check below in order to avoid re-fetching the subscription if it has not changed + if attachedSub == nil || previousSubIsBeingReplaced { + attachedSub, err = r.GetSubscription(ctx, validParams.subscriptionID) + if err != nil { + return stacktrace.Propagate(err, "Unable to get requested Subscription from store") + } + if attachedSub == nil { + return stacktrace.NewErrorWithCode(dsserr.BadRequest, "Specified Subscription %s does not exist", validParams.subscriptionID) + } } // We need to confirm that it is owned by the calling manager - if sub.Manager != manager { + if attachedSub.Manager != manager { return stacktrace.Propagate( // We do a bit of wrapping gymnastics because the root error message will be sent in the response, // and we don't want to include the effective manager in there. @@ -845,27 +883,27 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize // The propagation message will end in the logs and help with debugging. "Subscription %s owned by %s, but %s attempted to use it for an OperationalIntent", validParams.subscriptionID, - sub.Manager, + attachedSub.Manager, manager, ) } // We need to ensure the subscription covers the OIR's geo-temporal extent - sub, err = ensureSubscriptionCoversOIR(ctx, r, sub, validParams) + attachedSub, err = ensureSubscriptionCoversOIR(ctx, r, attachedSub, validParams) if err != nil { return stacktrace.Propagate(err, "Failed to ensure subscription covers OIR") } } if validParams.state.RequiresKey() { - responseConflict, err = validateKeyAndProvideConflictResponse(ctx, r, manager, validParams, sub) + responseConflict, err = validateKeyAndProvideConflictResponse(ctx, r, manager, validParams, attachedSub) if err != nil { return stacktrace.PropagateWithCode(err, stacktrace.GetCode(err), "Failed to validate key") } } // Construct the new OperationalIntent - op := validParams.toOIR(manager, sub, version+1) + op := validParams.toOIR(manager, attachedSub, version+1) // Upsert the OperationalIntent op, err = r.UpsertOperationalIntent(ctx, op) @@ -873,6 +911,14 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo") } + // Check if the previously attached subscription should be removed + if removePreviousImplicitSubscription { + err = r.DeleteSubscription(ctx, previousSub.ID) + if err != nil { + return stacktrace.Propagate(err, "Unable to delete previous implicit Subscription") + } + } + notifyVolume, err := computeNotificationVolume(old, validParams.uExtent) if err != nil { return stacktrace.Propagate(err, "Failed to compute notification volume") diff --git a/pkg/scd/store/cockroach/operational_intents.go b/pkg/scd/store/cockroach/operational_intents.go index e98d3d280..22bceb9c2 100644 --- a/pkg/scd/store/cockroach/operational_intents.go +++ b/pkg/scd/store/cockroach/operational_intents.go @@ -146,10 +146,32 @@ func (s *repo) GetOperationalIntent(ctx context.Context, id dssmodels.ID) (*scdm func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) error { var ( deleteOperationQuery = ` - DELETE FROM + WITH deleted_oir AS ( + DELETE FROM scd_operations WHERE id = $1 + RETURNING + id, subscription_id + ), + exists_and_is_implicit AS ( + SELECT subscription_id + FROM deleted_oir + JOIN scd_subscriptions ON scd_subscriptions.id = deleted_oir.subscription_id + WHERE scd_subscriptions.implicit = true + ), + dependent_oirs AS ( -- NOTE: this sub-query will still return the OIR being deleted (!) + SELECT id + FROM scd_operations + WHERE subscription_id = (SELECT subscription_id FROM deleted_oir) + ), + deleted_subscription_id AS ( + DELETE FROM scd_subscriptions + WHERE id = (SELECT subscription_id FROM exists_and_is_implicit) + AND (SELECT COUNT(*) FROM dependent_oirs) = 1 -- NOTE: see above, the OIR being removed is still counted here, hence a value of 1 + RETURNING id + ) + SELECT id FROM deleted_oir ` ) @@ -157,13 +179,28 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err if err != nil { return stacktrace.Propagate(err, "Failed to convert id to PgUUID") } - res, err := s.q.Exec(ctx, deleteOperationQuery, uid) + res, err := s.q.Query(ctx, deleteOperationQuery, uid) if err != nil { return stacktrace.Propagate(err, "Error in query: %s", deleteOperationQuery) } + defer res.Close() - if res.RowsAffected() == 0 { - return stacktrace.NewError("Could not delete Operation that does not exist") + // Check that the deleted OIR ID was returned: + var opID dssmodels.ID + if res.Next() { + err = res.Scan(&opID) + if err != nil { + return stacktrace.Propagate(err, "Error scanning deleted Operation ID") + } + } else if resErr := res.Err(); resErr != nil { + // Note: typically, res.Next() will be false and res.Err() non-nil when the query fails + // because it collided with another query and the whole transaction needs to be retried. + // This situation is extremely likely to occur when the DSS is under concurrent load. + return stacktrace.Propagate(resErr, "Error in query: %s", deleteOperationQuery) + } + + if opID == "" || opID != id { + return stacktrace.NewError("Could not delete Operation that does not exist. Delete: %s, returned opID: %s", id, opID) } return nil