Skip to content

Commit

Permalink
[scd] OIR upsert: push down cleanup of implicit subscription into CRDB
Browse files Browse the repository at this point in the history
  • Loading branch information
Shastick committed Aug 6, 2024
1 parent ad28b74 commit 17c419f
Show file tree
Hide file tree
Showing 2 changed files with 42 additions and 59 deletions.
52 changes: 0 additions & 52 deletions pkg/scd/operational_intents_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,42 +16,6 @@ import (
"github.com/interuss/stacktrace"
)

// subscriptionCanBeRemoved will check if:
// - a previous subscription was attached,
// - if so, if it was an implicit subscription
// - if so, if we can remove it after creating the new implicit subscription
//
// This is to be used in contexts where an implicit subscription may need to be cleaned up.
// NOTE: this should eventually be pushed down to CRDB as part of the queries being executed in the callers of this method.
//
// See https://github.com/interuss/dss/issues/1059 for more details
func subscriptionCanBeRemoved(ctx context.Context, r repos.Repository, subscriptionID *dssmodels.ID) (bool, error) {
// Get the Subscription supporting the OperationalIntent, if one is defined
if subscriptionID != nil {
sub, err := r.GetSubscription(ctx, *subscriptionID)
if err != nil {
return false, stacktrace.Propagate(err, "Unable to get OperationalIntent's Subscription from repo")
}
if sub == nil {
return false, stacktrace.NewError("OperationalIntent's Subscription missing from repo")
}

if sub.ImplicitSubscription {
// Get the Subscription's dependent OperationalIntents
dependentOps, err := r.GetDependentOperationalIntents(ctx, sub.ID)
if err != nil {
return false, stacktrace.Propagate(err, "Could not find dependent OperationalIntents")
}
if len(dependentOps) == 0 {
return false, stacktrace.NewError("An implicit Subscription had no dependent OperationalIntents")
} else if len(dependentOps) == 1 {
return true, nil
}
}
}
return false, nil
}

// DeleteOperationalIntentReference deletes a single operational intent ref for a given ID at
// the specified version.
func (a *Server) DeleteOperationalIntentReference(ctx context.Context, req *restapi.DeleteOperationalIntentReferenceRequest,
Expand Down Expand Up @@ -459,7 +423,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
return stacktrace.Propagate(err, "Could not get OperationalIntent from repo")
}

var previousSubscriptionID *dssmodels.ID
if old != nil {
if old.Manager != manager {
return stacktrace.NewErrorWithCode(dsserr.PermissionDenied,
Expand All @@ -471,7 +434,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
}

version = int32(old.Version)
previousSubscriptionID = old.SubscriptionID
} else {
if ovn != "" {
return stacktrace.NewErrorWithCode(dsserr.NotFound, "OperationalIntent does not exist and therefore is not version %s", ovn)
Expand All @@ -481,7 +443,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
}

var sub *scdmodels.Subscription
removePreviousImplicitSubscription := false
if subscriptionID.Empty() {
// Create an implicit subscription if the implicit subscription params are set:
// for situations where these params are required but have not been set,
Expand All @@ -495,11 +456,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
}
}

removePreviousImplicitSubscription, err = subscriptionCanBeRemoved(ctx, r, previousSubscriptionID)
if err != nil {
return stacktrace.Propagate(err, "Could not determine if previous Subscription can be removed")
}

// Note: parameters for a new implicit subscription have been passed, so we will create
// a new implicit subscription even if another subscription was attaches to this OIR before,
// (and regardless of whether it was an implicit subscription or not).
Expand Down Expand Up @@ -705,14 +661,6 @@ func (a *Server) upsertOperationalIntentReference(ctx context.Context, authorize
return stacktrace.Propagate(err, "Failed to upsert OperationalIntent in repo")
}

// Check if the previously attached subscription should be removed
if removePreviousImplicitSubscription {
err = r.DeleteSubscription(ctx, *previousSubscriptionID)
if err != nil {
return stacktrace.Propagate(err, "Unable to delete previous implicit Subscription")
}
}

// Find Subscriptions that may need to be notified
allsubs, err := r.SearchSubscriptions(ctx, notifyVol4)
if err != nil {
Expand Down
49 changes: 42 additions & 7 deletions pkg/scd/store/cockroach/operational_intents.go
Original file line number Diff line number Diff line change
Expand Up @@ -210,13 +210,48 @@ func (s *repo) DeleteOperationalIntent(ctx context.Context, id dssmodels.ID) err
func (s *repo) UpsertOperationalIntent(ctx context.Context, operation *scdmodels.OperationalIntent) (*scdmodels.OperationalIntent, error) {
var (
upsertOperationsQuery = fmt.Sprintf(`
UPSERT INTO
scd_operations
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11)
RETURNING
%s`, operationFieldsWithoutPrefix, operationFieldsWithPrefix)
WITH previous_implicit_sub AS (
-- get the current subscription id if:
-- - it exists
-- - it is implicit
-- - the OIR's subscription is being updated (ie, the new subscription id is different from the old one)
SELECT
scd_subscriptions.id
FROM scd_operations
JOIN scd_subscriptions ON scd_operations.subscription_id = scd_subscriptions.id
WHERE
scd_operations.id = $1
AND
scd_subscriptions.implicit = true
AND
scd_subscriptions.id != $9
),
upserted_oir AS (
-- actual insertion/update statement
UPSERT INTO
scd_operations
(%s)
VALUES
($1, $2, $3, $4, $5, $6, $7, $8, $9, transaction_timestamp(), $10, $11)
RETURNING
%s
),
dependent_oirs AS ( -- NOTE: this sub-query will still return the OIR being mutated (!)
SELECT id
FROM scd_operations
WHERE subscription_id = (SELECT id FROM previous_implicit_sub)
),
deleted_subscription_id AS (
-- We are guaranteed to only delete something here if the OIR is being updated. Upon creation
-- previous_implicit_sub will be empty
DELETE FROM scd_subscriptions
WHERE id = (SELECT id FROM previous_implicit_sub)
AND (SELECT COUNT(*) FROM dependent_oirs) = 1 -- NOTE: see above, the OIR being updated is still counted here, hence a value of 1
RETURNING id
)
-- return the upserted OIR
SELECT * FROM upserted_oir
`, operationFieldsWithoutPrefix, operationFieldsWithPrefix)
)

cids := make([]int64, len(operation.Cells))
Expand Down

0 comments on commit 17c419f

Please sign in to comment.