From 9e399b34f9c406684b6542ea417fe76c42a3937f Mon Sep 17 00:00:00 2001 From: Julien Perrochet Date: Mon, 29 Jul 2024 14:50:11 +0200 Subject: [PATCH 1/2] [scd] inline cleanup of implicit subscription into CRDB call when removing OIR --- pkg/scd/operational_intents_handler.go | 14 ------ .../store/cockroach/operational_intents.go | 45 +++++++++++++++++-- 2 files changed, 41 insertions(+), 18 deletions(-) diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index 86b8732e5..c6494125b 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -111,11 +111,6 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest } } - removeImplicitSubscription, err := subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, id, previousSubscription) - if err != nil { - return stacktrace.Propagate(err, "Could not determine if Subscription can be removed") - } - // Gather the subscriptions that need to be notified notifyVolume := &dssmodels.Volume4D{ StartTime: old.StartTime, @@ -138,15 +133,6 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest return stacktrace.Propagate(err, "Unable to delete OperationalIntent from repo") } - // removeImplicitSubscription is only true if the OIR had a subscription defined - if removeImplicitSubscription { - // Automatically remove a now-unused implicit Subscription - err = r.DeleteSubscription(ctx, previousSubscription.ID) - if err != nil { - return stacktrace.Propagate(err, "Unable to delete associated implicit Subscription") - } - } - // Return response to client response = &restapi.ChangeOperationalIntentReferenceResponse{ OperationalIntentReference: *old.ToRest(), diff --git a/pkg/scd/store/cockroach/operational_intents.go b/pkg/scd/store/cockroach/operational_intents.go index 90b3082c7..d3d2a7dea 100644 --- a/pkg/scd/store/cockroach/operational_intents.go +++ b/pkg/scd/store/cockroach/operational_intents.go @@ -169,10 +169,32 @@ func (s *repo) GetOperationalIntent(ctx context.Context, id dssmodels.ID) (*scdm func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) error { var ( deleteOperationQuery = ` - DELETE FROM + WITH deleted_oir AS ( + DELETE FROM scd_operations WHERE id = $1 + RETURNING + id, subscription_id + ), + exists_and_is_implicit AS ( + SELECT subscription_id + FROM deleted_oir + JOIN scd_subscriptions ON scd_subscriptions.id = deleted_oir.subscription_id + WHERE scd_subscriptions.implicit = true + ), + dependent_oirs AS ( -- NOTE: this sub-query will still return the OIR being deleted (!) + SELECT id + FROM scd_operations + WHERE subscription_id = (SELECT subscription_id FROM deleted_oir) + ), + deleted_subscription_id AS ( + DELETE FROM scd_subscriptions + WHERE id = (SELECT subscription_id FROM exists_and_is_implicit) + AND (SELECT COUNT(*) FROM dependent_oirs) = 1 -- NOTE: see above, the OIR being removed is still counted here, hence a value of 1 + RETURNING id + ) + SELECT id FROM deleted_oir ` ) @@ -180,13 +202,28 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err if err != nil { return stacktrace.Propagate(err, "Failed to convert id to PgUUID") } - res, err := s.q.Exec(ctx, deleteOperationQuery, uid) + res, err := s.q.Query(ctx, deleteOperationQuery, uid) if err != nil { return stacktrace.Propagate(err, "Error in query: %s", deleteOperationQuery) } + defer res.Close() - if res.RowsAffected() == 0 { - return stacktrace.NewError("Could not delete Operation that does not exist") + // Check that the deleted OIR ID was returned: + var opID dssmodels.ID + if res.Next() { + err = res.Scan(&opID) + if err != nil { + return stacktrace.Propagate(err, "Error scanning deleted Operation ID") + } + } else if resErr := res.Err(); resErr != nil { + // Note: typically, res.Next() will be false and res.Err() non-nil when the query fails + // because it collided with another query and the whole transaction needs to be retried. + // This situation is extremely likely to occur when the DSS is under concurrent load. + return stacktrace.Propagate(resErr, "Error in query: %s", deleteOperationQuery) + } + + if opID == "" || opID != id { + return stacktrace.NewError("Could not delete Operation that does not exist. Delete: %s, returned opID: %s", id, opID) } return nil From fdb78496d0a4ee0239a755212cc7fafa2caba3cd Mon Sep 17 00:00:00 2001 From: Julien Perrochet Date: Mon, 5 Aug 2024 20:29:19 +0200 Subject: [PATCH 2/2] [scd] OIR upsert: push down cleanup of implicit subscription into CRDB --- pkg/scd/operational_intents_handler.go | 46 ---------------- .../store/cockroach/operational_intents.go | 52 ++++++++++++++++--- 2 files changed, 45 insertions(+), 53 deletions(-) diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index c6494125b..063c73cd5 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -16,37 +16,6 @@ import ( "github.com/interuss/stacktrace" ) -// subscriptionIsImplicitAndOnlyAttachedToOIR will check if: -// - the subscription is defined and is implicit -// - the subscription is attached to the specified operational intent -// - the subscription is not attached to any other operational intent -// -// This is to be used in contexts where an implicit subscription may need to be cleaned up: if true is returned, -// the subscription can be safely removed after the operational intent is deleted or attached to another subscription. -// -// NOTE: this should eventually be pushed down to CRDB as part of the queries being executed in the callers of this method. -// -// See https://github.com/interuss/dss/issues/1059 for more details -func subscriptionIsImplicitAndOnlyAttachedToOIR(ctx context.Context, r repos.Repository, oirID dssmodels.ID, subscription *scdmodels.Subscription) (bool, error) { - if subscription == nil { - return false, nil - } - if !subscription.ImplicitSubscription { - return false, nil - } - // Get the Subscription's dependent OperationalIntents - dependentOps, err := r.GetDependentOperationalIntents(ctx, subscription.ID) - if err != nil { - return false, stacktrace.Propagate(err, "Could not find dependent OperationalIntents") - } - if len(dependentOps) == 0 { - return false, stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents") - } else if len(dependentOps) == 1 && dependentOps[0] == oirID { - return true, nil - } - return false, nil -} - // DeleteOperationalIntentReference deletes a single operational intent ref for a given ID at // the specified version. func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *restapi.DeleteOperationalIntentReferenceRequest, @@ -837,13 +806,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize // Determine if the previous subscription is being replaced and if it will need to be cleaned up previousSubIsBeingReplaced := previousSub != nil && validParams.subscriptionID != previousSub.ID - removePreviousImplicitSubscription := false - if previousSubIsBeingReplaced { - removePreviousImplicitSubscription, err = subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, validParams.id, previousSub) - if err != nil { - return stacktrace.Propagate(err, "Could not determine if previous Subscription can be removed") - } - } // attachedSub is the subscription that will end up being attached to the OIR // it defaults to the previous subscription (which may be nil), and may be updated if required by the parameters @@ -916,14 +878,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo") } - // Check if the previously attached subscription should be removed - if removePreviousImplicitSubscription { - err = r.DeleteSubscription(ctx, previousSub.ID) - if err != nil { - return stacktrace.Propagate(err, "Unable to delete previous implicit Subscription") - } - } - notifyVolume, err := computeNotificationVolume(old, validParams.uExtent) if err != nil { return stacktrace.Propagate(err, "Failed to compute notification volume") diff --git a/pkg/scd/store/cockroach/operational_intents.go b/pkg/scd/store/cockroach/operational_intents.go index d3d2a7dea..d1f1b00cc 100644 --- a/pkg/scd/store/cockroach/operational_intents.go +++ b/pkg/scd/store/cockroach/operational_intents.go @@ -233,13 +233,51 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err func (s *repo) UpsertOperationalIntent(ctx context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) { var ( upsertOperationsQuery = fmt.Sprintf(` - UPSERT INTO - scd_operations - (%s) - VALUES - ($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11, $12, $13) - RETURNING - %s`, operationFieldsWithoutPrefix, operationFieldsWithPrefix) + WITH previous_implicit_sub AS ( + -- get the current subscription id if: + -- - it exists + -- - it is implicit + -- - the OIR's subscription is being updated (ie, the new subscription id is different from the old one) + SELECT + scd_subscriptions.id + FROM scd_operations + JOIN scd_subscriptions ON scd_operations.subscription_id = scd_subscriptions.id + WHERE + scd_operations.id = $1 + AND + scd_subscriptions.implicit = true + AND + -- in SQL, X != NULL will always be false: + -- this condition needs to cover cases where the new subscription is undefined, + -- so we add an explicit 'IS NULL' check. + (scd_subscriptions.id != $9 OR $9 IS NULL) + ), + upserted_oir AS ( + -- actual insertion/update statement + UPSERT INTO + scd_operations + (%s) + VALUES + ($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11, $12, $13) + RETURNING + %s + ), + dependent_oirs AS ( -- NOTE: this sub-query will still return the OIR being mutated (!) + SELECT id + FROM scd_operations + WHERE subscription_id = (SELECT id FROM previous_implicit_sub) + ), + deleted_subscription_id AS ( + -- We are guaranteed to only delete something here if the OIR is being updated. Upon creation + -- previous_implicit_sub will be empty + DELETE FROM scd_subscriptions + WHERE id = (SELECT id FROM previous_implicit_sub) + AND (SELECT COUNT(*) FROM dependent_oirs) = 1 -- NOTE: see above, the OIR being updated is still counted here, hence a value of 1 + RETURNING id + ) + -- return the upserted OIR + SELECT * FROM upserted_oir + `, operationFieldsWithoutPrefix, operationFieldsWithPrefix) ) cids := make([]int64, len(operation.Cells))