From c1e2819a6e2c05e76d0545f6beb6dfe7610ba8d1 Mon Sep 17 00:00:00 2001 From: Michael Barroco Date: Fri, 23 Feb 2024 16:49:07 +0100 Subject: [PATCH] Add lock at the beginning of the OI transaction --- pkg/scd/operational_intents_handler.go | 7 ++++++ pkg/scd/repos/repos.go | 4 +++ pkg/scd/store/cockroach/subscriptions.go | 31 ++++++++++++++++++++++++ 3 files changed, 42 insertions(+) diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index 8ff48deb4..68f0015ba 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -430,6 +430,13 @@ func (a *Server) PutOperationalIntentReference(ctx context.Context, manager stri action := func(ctx context.Context, r repos.Repository) (err error) { var version int32 // Version of the Operational Intent (0 means creation requested). + // Lock subscriptions based on the cell to reduce the number of retries under concurrent load. + // See issue #1002 for details. + err = r.LockSubscriptionsOnCells(ctx, cells) + if err != nil { + return stacktrace.Propagate(err, "Unable to acquire lock") + } + // Get existing OperationalIntent, if any, and validate request old, err := r.GetOperationalIntent(ctx, id) if err != nil { diff --git a/pkg/scd/repos/repos.go b/pkg/scd/repos/repos.go index 2440d1a27..9838fc118 100644 --- a/pkg/scd/repos/repos.go +++ b/pkg/scd/repos/repos.go @@ -2,6 +2,7 @@ package repos import ( "context" + "github.com/golang/geo/s2" dssmodels "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" @@ -27,6 +28,9 @@ type OperationalIntent interface { // GetDependentOperationalIntents returns IDs of all operations dependent on // subscription identified by "subscriptionID". GetDependentOperationalIntents(ctx context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error) + + // LockSubscriptionsOnCells locks the subscriptions of interest on specific cells. + LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error } // Subscription abstracts subscription-specific interactions with the backing repository. diff --git a/pkg/scd/store/cockroach/subscriptions.go b/pkg/scd/store/cockroach/subscriptions.go index a9c9f182e..63fe5cf30 100644 --- a/pkg/scd/store/cockroach/subscriptions.go +++ b/pkg/scd/store/cockroach/subscriptions.go @@ -382,3 +382,34 @@ func (c *repo) IncrementNotificationIndices(ctx context.Context, subscriptionIds return indices, nil } + +func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error { + cids := make([]int64, len(cells)) + + for i, cell := range cells { + cids[i] = int64(cell) + } + + var pgCids pgtype.Int8Array + err := pgCids.Set(cids) + if err != nil { + return stacktrace.Propagate(err, "Failed to convert array to jackc/pgtype") + } + + const query = ` + SELECT + id + FROM + scd_subscriptions + WHERE + cells && $1 + FOR UPDATE + ` + + _, err = c.q.Exec(ctx, query, pgCids) + if err != nil { + return stacktrace.Propagate(err, "Error in query: %s", query) + } + + return nil +}