diff --git a/pkg/scd/operational_intents_handler.go b/pkg/scd/operational_intents_handler.go index 8ff48deb4..68f0015ba 100644 --- a/pkg/scd/operational_intents_handler.go +++ b/pkg/scd/operational_intents_handler.go @@ -430,6 +430,13 @@ func (a *Server) PutOperationalIntentReference(ctx context.Context, manager stri action := func(ctx context.Context, r repos.Repository) (err error) { var version int32 // Version of the Operational Intent (0 means creation requested). + // Lock subscriptions based on the cell to reduce the number of retries under concurrent load. + // See issue #1002 for details. + err = r.LockSubscriptionsOnCells(ctx, cells) + if err != nil { + return stacktrace.Propagate(err, "Unable to acquire lock") + } + // Get existing OperationalIntent, if any, and validate request old, err := r.GetOperationalIntent(ctx, id) if err != nil { diff --git a/pkg/scd/repos/repos.go b/pkg/scd/repos/repos.go index 2440d1a27..ab1cc4a9f 100644 --- a/pkg/scd/repos/repos.go +++ b/pkg/scd/repos/repos.go @@ -2,7 +2,7 @@ package repos import ( "context" - + "github.com/golang/geo/s2" dssmodels "github.com/interuss/dss/pkg/models" scdmodels "github.com/interuss/dss/pkg/scd/models" ) @@ -51,6 +51,9 @@ type Subscription interface { // specified Subscription and returns the resulting corresponding // notification indices. IncrementNotificationIndices(ctx context.Context, subscriptionIds []dssmodels.ID) ([]int, error) + + // LockSubscriptionsOnCells locks the subscriptions of interest on specific cells. + LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error } type UssAvailability interface { diff --git a/pkg/scd/store/cockroach/subscriptions.go b/pkg/scd/store/cockroach/subscriptions.go index a9c9f182e..63fe5cf30 100644 --- a/pkg/scd/store/cockroach/subscriptions.go +++ b/pkg/scd/store/cockroach/subscriptions.go @@ -382,3 +382,34 @@ func (c *repo) IncrementNotificationIndices(ctx context.Context, subscriptionIds return indices, nil } + +func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error { + cids := make([]int64, len(cells)) + + for i, cell := range cells { + cids[i] = int64(cell) + } + + var pgCids pgtype.Int8Array + err := pgCids.Set(cids) + if err != nil { + return stacktrace.Propagate(err, "Failed to convert array to jackc/pgtype") + } + + const query = ` + SELECT + id + FROM + scd_subscriptions + WHERE + cells && $1 + FOR UPDATE + ` + + _, err = c.q.Exec(ctx, query, pgCids) + if err != nil { + return stacktrace.Propagate(err, "Error in query: %s", query) + } + + return nil +}