Skip to content

Commit

Permalink
Add lock at the beginning of the OI transaction
Browse files Browse the repository at this point in the history
  • Loading branch information
barroco committed Feb 23, 2024
1 parent b42f7e5 commit c1e2819
Show file tree
Hide file tree
Showing 3 changed files with 42 additions and 0 deletions.
7 changes: 7 additions & 0 deletions pkg/scd/operational_intents_handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -430,6 +430,13 @@ func (a *Server) PutOperationalIntentReference(ctx context.Context, manager stri
action := func(ctx context.Context, r repos.Repository) (err error) {
var version int32 // Version of the Operational Intent (0 means creation requested).

// Lock subscriptions based on the cell to reduce the number of retries under concurrent load.
// See issue #1002 for details.
err = r.LockSubscriptionsOnCells(ctx, cells)
if err != nil {
return stacktrace.Propagate(err, "Unable to acquire lock")
}

// Get existing OperationalIntent, if any, and validate request
old, err := r.GetOperationalIntent(ctx, id)
if err != nil {
Expand Down
4 changes: 4 additions & 0 deletions pkg/scd/repos/repos.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package repos

import (
"context"
"github.com/golang/geo/s2"

dssmodels "github.com/interuss/dss/pkg/models"
scdmodels "github.com/interuss/dss/pkg/scd/models"
Expand All @@ -27,6 +28,9 @@ type OperationalIntent interface {
// GetDependentOperationalIntents returns IDs of all operations dependent on
// subscription identified by "subscriptionID".
GetDependentOperationalIntents(ctx context.Context, subscriptionID dssmodels.ID) ([]dssmodels.ID, error)

// LockSubscriptionsOnCells locks the subscriptions of interest on specific cells.
LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error
}

// Subscription abstracts subscription-specific interactions with the backing repository.
Expand Down
31 changes: 31 additions & 0 deletions pkg/scd/store/cockroach/subscriptions.go
Original file line number Diff line number Diff line change
Expand Up @@ -382,3 +382,34 @@ func (c *repo) IncrementNotificationIndices(ctx context.Context, subscriptionIds

return indices, nil
}

func (c *repo) LockSubscriptionsOnCells(ctx context.Context, cells s2.CellUnion) error {
cids := make([]int64, len(cells))

for i, cell := range cells {
cids[i] = int64(cell)
}

var pgCids pgtype.Int8Array
err := pgCids.Set(cids)
if err != nil {
return stacktrace.Propagate(err, "Failed to convert array to jackc/pgtype")
}

const query = `
SELECT
id
FROM
scd_subscriptions
WHERE
cells && $1
FOR UPDATE
`

_, err = c.q.Exec(ctx, query, pgCids)
if err != nil {
return stacktrace.Propagate(err, "Error in query: %s", query)
}

return nil
}

0 comments on commit c1e2819

Please sign in to comment.