Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[dss] scd: oir upsert - inline cleanup of implicit subscription into CRDB #1064

Open
wants to merge 2 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
60 changes: 0 additions & 60 deletions pkg/scd/operational_intents_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,37 +16,6 @@ import (
"github.com/interuss/stacktrace"
)

// subscriptionIsImplicitAndOnlyAttachedToOIR will check if:
// - the subscription is defined and is implicit
// - the subscription is attached to the specified operational intent
// - the subscription is not attached to any other operational intent
//
// This is to be used in contexts where an implicit subscription may need to be cleaned up: if true is returned,
// the subscription can be safely removed after the operational intent is deleted or attached to another subscription.
//
// NOTE: this should eventually be pushed down to CRDB as part of the queries being executed in the callers of this method.
//
// See https://github.com/interuss/dss/issues/1059 for more details
func subscriptionIsImplicitAndOnlyAttachedToOIR(ctx context.Context, r repos.Repository, oirID dssmodels.ID, subscription *scdmodels.Subscription) (bool, error) {
if subscription == nil {
return false, nil
}
if !subscription.ImplicitSubscription {
return false, nil
}
// Get the Subscription's dependent OperationalIntents
dependentOps, err := r.GetDependentOperationalIntents(ctx, subscription.ID)
if err != nil {
return false, stacktrace.Propagate(err, "Could not find dependent OperationalIntents")
}
if len(dependentOps) == 0 {
return false, stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents")
} else if len(dependentOps) == 1 && dependentOps[0] == oirID {
return true, nil
}
return false, nil
}

// DeleteOperationalIntentReference deletes a single operational intent ref for a given ID at
// the specified version.
func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *restapi.DeleteOperationalIntentReferenceRequest,
Expand Down Expand Up @@ -111,11 +80,6 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest
}
}

removeImplicitSubscription, err := subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, id, previousSubscription)
if err != nil {
return stacktrace.Propagate(err, "Could not determine if Subscription can be removed")
}

// Gather the subscriptions that need to be notified
notifyVolume := &dssmodels.Volume4D{
StartTime: old.StartTime,
Expand All @@ -138,15 +102,6 @@ func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *rest
return stacktrace.Propagate(err, "Unable to delete OperationalIntent from repo")
}

// removeImplicitSubscription is only true if the OIR had a subscription defined
if removeImplicitSubscription {
// Automatically remove a now-unused implicit Subscription
err = r.DeleteSubscription(ctx, previousSubscription.ID)
if err != nil {
return stacktrace.Propagate(err, "Unable to delete associated implicit Subscription")
}
}

// Return response to client
response = &restapi.ChangeOperationalIntentReferenceResponse{
OperationalIntentReference: *old.ToRest(),
Expand Down Expand Up @@ -851,13 +806,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize

// Determine if the previous subscription is being replaced and if it will need to be cleaned up
previousSubIsBeingReplaced := previousSub != nil && validParams.subscriptionID != previousSub.ID
removePreviousImplicitSubscription := false
if previousSubIsBeingReplaced {
removePreviousImplicitSubscription, err = subscriptionIsImplicitAndOnlyAttachedToOIR(ctx, r, validParams.id, previousSub)
if err != nil {
return stacktrace.Propagate(err, "Could not determine if previous Subscription can be removed")
}
}

// attachedSub is the subscription that will end up being attached to the OIR
// it defaults to the previous subscription (which may be nil), and may be updated if required by the parameters
Expand Down Expand Up @@ -930,14 +878,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo")
}

// Check if the previously attached subscription should be removed
if removePreviousImplicitSubscription {
err = r.DeleteSubscription(ctx, previousSub.ID)
if err != nil {
return stacktrace.Propagate(err, "Unable to delete previous implicit Subscription")
}
}

notifyVolume, err := computeNotificationVolume(old, validParams.uExtent)
if err != nil {
return stacktrace.Propagate(err, "Failed to compute notification volume")
Expand Down
97 changes: 86 additions & 11 deletions pkg/scd/store/cockroach/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -169,24 +169,61 @@ func (s *repo) GetOperationalIntent(ctx context.Context, id dssmodels.ID) (*scdm
func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) error {
var (
deleteOperationQuery = `
DELETE FROM
WITH deleted_oir AS (
DELETE FROM
scd_operations
WHERE
id = $1
RETURNING
id, subscription_id
),
exists_and_is_implicit AS (
SELECT subscription_id
FROM deleted_oir
JOIN scd_subscriptions ON scd_subscriptions.id = deleted_oir.subscription_id
WHERE scd_subscriptions.implicit = true
),
dependent_oirs AS ( -- NOTE: this sub-query will still return the OIR being deleted (!)
SELECT id
FROM scd_operations
WHERE subscription_id = (SELECT subscription_id FROM deleted_oir)
),
deleted_subscription_id AS (
DELETE FROM scd_subscriptions
WHERE id = (SELECT subscription_id FROM exists_and_is_implicit)
AND (SELECT COUNT(*) FROM dependent_oirs) = 1 -- NOTE: see above, the OIR being removed is still counted here, hence a value of 1
RETURNING id
)
SELECT id FROM deleted_oir
`
)

uid, err := id.PgUUID()
if err != nil {
return stacktrace.Propagate(err, "Failed to convert id to PgUUID")
}
res, err := s.q.Exec(ctx, deleteOperationQuery, uid)
res, err := s.q.Query(ctx, deleteOperationQuery, uid)
if err != nil {
return stacktrace.Propagate(err, "Error in query: %s", deleteOperationQuery)
}
defer res.Close()

if res.RowsAffected() == 0 {
return stacktrace.NewError("Could not delete Operation that does not exist")
// Check that the deleted OIR ID was returned:
var opID dssmodels.ID
if res.Next() {
err = res.Scan(&opID)
if err != nil {
return stacktrace.Propagate(err, "Error scanning deleted Operation ID")
}
} else if resErr := res.Err(); resErr != nil {
// Note: typically, res.Next() will be false and res.Err() non-nil when the query fails
// because it collided with another query and the whole transaction needs to be retried.
// This situation is extremely likely to occur when the DSS is under concurrent load.
return stacktrace.Propagate(resErr, "Error in query: %s", deleteOperationQuery)
}

if opID == "" || opID != id {
return stacktrace.NewError("Could not delete Operation that does not exist. Delete: %s, returned opID: %s", id, opID)
}

return nil
Expand All @@ -196,13 +233,51 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err
func (s *repo) UpsertOperationalIntent(ctx context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) {
var (
upsertOperationsQuery = fmt.Sprintf(`
UPSERT INTO
scd_operations
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11, $12, $13)
RETURNING
%s`, operationFieldsWithoutPrefix, operationFieldsWithPrefix)
WITH previous_implicit_sub AS (
-- get the current subscription id if:
-- - it exists
-- - it is implicit
-- - the OIR's subscription is being updated (ie, the new subscription id is different from the old one)
SELECT
scd_subscriptions.id
FROM scd_operations
JOIN scd_subscriptions ON scd_operations.subscription_id = scd_subscriptions.id
WHERE
scd_operations.id = $1
AND
scd_subscriptions.implicit = true
AND
-- in SQL, X != NULL will always be false:
-- this condition needs to cover cases where the new subscription is undefined,
-- so we add an explicit 'IS NULL' check.
(scd_subscriptions.id != $9 OR $9 IS NULL)
),
upserted_oir AS (
-- actual insertion/update statement
UPSERT INTO
scd_operations
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11, $12, $13)
RETURNING
%s
),
dependent_oirs AS ( -- NOTE: this sub-query will still return the OIR being mutated (!)
SELECT id
FROM scd_operations
WHERE subscription_id = (SELECT id FROM previous_implicit_sub)
),
deleted_subscription_id AS (
-- We are guaranteed to only delete something here if the OIR is being updated. Upon creation
-- previous_implicit_sub will be empty
DELETE FROM scd_subscriptions
WHERE id = (SELECT id FROM previous_implicit_sub)
AND (SELECT COUNT(*) FROM dependent_oirs) = 1 -- NOTE: see above, the OIR being updated is still counted here, hence a value of 1
RETURNING id
)
-- return the upserted OIR
SELECT * FROM upserted_oir
`, operationFieldsWithoutPrefix, operationFieldsWithPrefix)
)

cids := make([]int64, len(operation.Cells))
Expand Down
Loading