Skip to content

Commit

Permalink
Refactor upserts for instance ip addresses
Browse files Browse the repository at this point in the history
Enable the use of SELECT FOR UPDATE when finding existing instance ip
address rows that may be changed during an upsert transaction. This will
lock the rows, even from reads, until the transaction is committed. Based
on the combination of this and the previous commit where we enforce ip
address uniqueness in the db, we should have good protection from upsert
race conditions.

Also refactor the retry logic into its own wrapper function for
improved clarity.
  • Loading branch information
ScottGarman committed Sep 1, 2023
1 parent a58c2f3 commit aca8032
Show file tree
Hide file tree
Showing 2 changed files with 73 additions and 84 deletions.
22 changes: 2 additions & 20 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -197,10 +197,6 @@ github.com/go-playground/universal-translator v0.18.1 h1:Bcnm0ZwsGyWbCzImXv+pAJn
github.com/go-playground/universal-translator v0.18.1/go.mod h1:xekY+UJKNuX9WP91TpwSH2VMlDf28Uj24BCp08ZFTUY=
github.com/go-playground/validator/v10 v10.2.0/go.mod h1:uOYAAleCW8F/7oMFd6aG0GOhaH6EGOAJShg8Id5JGkI=
github.com/go-playground/validator/v10 v10.10.0/go.mod h1:74x4gJWsvQexRdW8Pn3dXSGrTK4nAUsbPlLADvpJkos=
github.com/go-playground/validator/v10 v10.15.0 h1:nDU5XeOKtB3GEa+uB7GNYwhVKsgjAR7VgKoNB6ryXfw=
github.com/go-playground/validator/v10 v10.15.0/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-playground/validator/v10 v10.15.1 h1:BSe8uhN+xQ4r5guV/ywQI4gO59C2raYcGffYWZEjZzM=
github.com/go-playground/validator/v10 v10.15.1/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-playground/validator/v10 v10.15.3 h1:S+sSpunYjNPDuXkWbK+x+bA7iXiW296KG4dL3X7xUZo=
github.com/go-playground/validator/v10 v10.15.3/go.mod h1:9iXMNT7sEkjXb0I+enO7QXmzG6QCsPWY4zveKFVRSyU=
github.com/go-sql-driver/mysql v1.6.0/go.mod h1:DCzpHaOWr8IXmIStZouvnhqoel9Qv2LBy8hT2VhHyBg=
Expand Down Expand Up @@ -299,7 +295,6 @@ github.com/google/pprof v0.0.0-20210720184732-4bb14d4b1be1/go.mod h1:kpwsk12EmLe
github.com/google/renameio v0.1.0/go.mod h1:KWCgfxg9yswjAJkECMjeO8J8rahYeXnNhOm40UhjYkI=
github.com/google/uuid v1.1.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.1.2/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.0 h1:t6JiXgmwXMjEs8VusXIJk2BXHsn+wx8BZdTaoZ5fu7I=
github.com/google/uuid v1.3.0/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
github.com/google/uuid v1.3.1 h1:KjJaJ9iWZ3jOFZIf1Lqf4laDRCasjl0BCmnEGxkdLb4=
github.com/google/uuid v1.3.1/go.mod h1:TIyPZe4MgqvfeYDBFedMoGGpEw/LqOeaOT+nhxU+yHo=
Expand Down Expand Up @@ -524,8 +519,6 @@ github.com/pmezard/go-difflib v1.0.0/go.mod h1:iKH77koFhYxTK1pcRnkKkqfTogsbg7gZN
github.com/posener/complete v1.1.1/go.mod h1:em0nMJCgc9GFtwrmVmEMR/ZL6WyhyjMBndrE9hABlRI=
github.com/posener/complete v1.2.3/go.mod h1:WZIdtGGp+qx0sLrYKtIRAruyNpv6hFCicSgv7Sy7s/s=
github.com/prashantv/gostub v1.1.0 h1:BTyx3RfQjRHnUWaGF9oQos79AlQ5k8WNktv7VGvVH4g=
github.com/pressly/goose/v3 v3.13.4 h1:9xRcg/hEU9HqeRNeKh69VLtPWCKAYTX6l2VsXWOX86A=
github.com/pressly/goose/v3 v3.13.4/go.mod h1:Fo8rYaf9tYfQiDpo+ymrnZi8vvLkvguRl16nu7QnUT4=
github.com/pressly/goose/v3 v3.15.0 h1:6tY5aDqFknY6VZkorFGgZtWygodZQxfmmEF4rqyJW9k=
github.com/pressly/goose/v3 v3.15.0/go.mod h1:LlIo3zGccjb/YUgG+Svdb9Er14vefRdlDI7URCDrwYo=
github.com/prometheus/client_golang v0.9.1/go.mod h1:7SWBe2y4D6OKWSNQJUaRYU/AaXPKyh/dDVn+NZz0KFw=
Expand Down Expand Up @@ -639,12 +632,9 @@ github.com/volatiletech/null/v8 v8.1.2 h1:kiTiX1PpwvuugKwfvUNX/SU/5A2KGZMXfGD0DU
github.com/volatiletech/null/v8 v8.1.2/go.mod h1:98DbwNoKEpRrYtGjWFctievIfm4n4MxG0A6EBUcoS5g=
github.com/volatiletech/randomize v0.0.1 h1:eE5yajattWqTB2/eN8df4dw+8jwAzBtbdo5sbWC4nMk=
github.com/volatiletech/randomize v0.0.1/go.mod h1:GN3U0QYqfZ9FOJ67bzax1cqZ5q2xuj2mXrXBjWaRTlY=
github.com/volatiletech/sqlboiler/v4 v4.14.2 h1:j5QnlR5/wYDmGDDTutI3BO+4oPBiqYoVrfReVr7VSxA=
github.com/volatiletech/sqlboiler/v4 v4.14.2/go.mod h1:65288sb8jBLnTynTumBK6eU8C2JwWsiPjoPihEfC0/A=
github.com/volatiletech/sqlboiler/v4 v4.15.0 h1:+twm3mA34SaUF6wB9U6QkXxkK8AKkV5EfgMSvcKWeY4=
github.com/volatiletech/sqlboiler/v4 v4.15.0/go.mod h1:s643wqYyCQ7Ak2hMVxH7kTS0+lFPNlj+gHKUIukJ0YA=
github.com/volatiletech/strmangle v0.0.1/go.mod h1:F6RA6IkB5vq0yTG4GQ0UsbbRcl3ni9P76i+JrTBKFFg=
github.com/volatiletech/strmangle v0.0.4/go.mod h1:ycDvbDkjDvhC0NUU8w3fWwl5JEMTV56vTKXzR3GeR+0=
github.com/volatiletech/strmangle v0.0.5 h1:CompJPy+lAi9h+YU/IzBR4X2RDRuAuEIP+kjFdyZXcU=
github.com/volatiletech/strmangle v0.0.5/go.mod h1:ycDvbDkjDvhC0NUU8w3fWwl5JEMTV56vTKXzR3GeR+0=
github.com/yuin/goldmark v1.1.25/go.mod h1:3hX8gzYuyVAZsxl0MRgGTJEmQBFcNTphYh9decYSb74=
Expand All @@ -662,10 +652,6 @@ go.etcd.io/etcd/client/v2 v2.305.4/go.mod h1:Ud+VUwIi9/uQHOMA+4ekToJ12lTxlv0zB/+
go.etcd.io/etcd/client/v3 v3.5.4/go.mod h1:ZaRkVgBZC+L+dLCjTcF1hRXpgZXQPOvnA/Ak/gq3kiY=
go.hollow.sh/toolbox v0.6.1 h1:3E6JofImSCe63XayczbGfDxIXUjmBziMBBmbwook8WA=
go.hollow.sh/toolbox v0.6.1/go.mod h1:nl+5RDDyYY/+wukOUzHHX2mOyWKRjlTOXUcGxny+tns=
go.infratographer.com/x v0.3.4 h1:K7azcoiLZPRdOnr4M7DMyB2DjZzXRVcfr7G6FeQd16o=
go.infratographer.com/x v0.3.4/go.mod h1:pXXSdeJBisAK3AdED5EFj7Yo8z8td7fOWDkNl4Dkp0s=
go.infratographer.com/x v0.3.7 h1:kkykoVtC8XrmvC4oZwHWa/15+dv9RhQHgSm8KoEb/Nc=
go.infratographer.com/x v0.3.7/go.mod h1:/zbDM9njbWzUDCA9pkbi1z/v4VZjGsVHx+SPycSgIhg=
go.infratographer.com/x v0.3.8 h1:ZKL/oeTO8an4p58ZXtDdCMl9DVr7Y+RAY2EVeTf1/Uc=
go.infratographer.com/x v0.3.8/go.mod h1:H8O2vkWmo26WNuQEFS2PlJoms9YLJ7BNiwFNMTwCuuA=
go.opencensus.io v0.21.0/go.mod h1:mSImk1erAIZhrmZN+AvHh14ztQfjbGwt4TtuofqLduU=
Expand Down Expand Up @@ -1049,7 +1035,7 @@ golang.org/x/tools v0.1.3/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.4/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.5/go.mod h1:o0xws9oXOQQZyjljx8fwUC0k7L1pTE6eaCbjGeHmOkk=
golang.org/x/tools v0.1.12/go.mod h1:hNGJHUnrk76NpqgfD5Aqm5Crs+Hm0VOH/i9J2+nxYbc=
golang.org/x/tools v0.10.0 h1:tvDr/iQoUqNdohiYm0LmmKcBk+q86lb9EprIUFhHHGg=
golang.org/x/tools v0.12.1-0.20230815132531-74c255bcf846 h1:Vve/L0v7CXXuxUmaMGIEK/dEeq7uiqb5qBgQrZzIE7E=
golang.org/x/xerrors v0.0.0-20190410155217-1f06c39b4373/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190513163551-3ee3066db522/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
Expand Down Expand Up @@ -1186,8 +1172,6 @@ google.golang.org/genproto v0.0.0-20220421151946-72621c1f0bd3/go.mod h1:8w6bsBMX
google.golang.org/genproto v0.0.0-20220429170224-98d788798c3e/go.mod h1:8w6bsBMX6yCPbAVTeqQHvzxW0EIFigd5lZyahWgyfDo=
google.golang.org/genproto v0.0.0-20220505152158-f39f71e6c8f3/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20220519153652-3a47de7e79bd/go.mod h1:RAyBrSAP7Fh3Nc84ghnVLDPuV51xc9agzmm4Ph6i0Q4=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1 h1:KpwkzHKEF7B9Zxg18WzOa7djJ+Ha5DzthMyZYQfEn2A=
google.golang.org/genproto v0.0.0-20230410155749-daa745c078e1/go.mod h1:nKE/iIaLqn2bQwXBg8f1g2Ylh6r5MN5CmZvuzZCgsCU=
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130 h1:Au6te5hbKUV8pIYWHqOUZ1pva5qK/rwbIhoXEUB9Lu8=
google.golang.org/genproto v0.0.0-20230706204954-ccb25ca9f130/go.mod h1:O9kGHb51iE/nOGvQaDUuadVYqovW56s5emA88lQnj6Y=
google.golang.org/genproto/googleapis/api v0.0.0-20230706204954-ccb25ca9f130 h1:XVeBY8d/FaK4848myy41HBqnDwvxeV3zMZhwN1TvAMU=
Expand Down Expand Up @@ -1225,8 +1209,6 @@ google.golang.org/grpc v1.44.0/go.mod h1:k+4IHHFw41K8+bbowsex27ge2rCb65oeWqe4jJ5
google.golang.org/grpc v1.45.0/go.mod h1:lN7owxKUQEqMfSyQikvvk5tf/6zMPsrK+ONuO11+0rQ=
google.golang.org/grpc v1.46.0/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.46.2/go.mod h1:vN9eftEi1UMyUsIF80+uQXhHjbXYbm0uXoFCACuMGWk=
google.golang.org/grpc v1.55.0 h1:3Oj82/tFSCeUrRTg/5E/7d/W5A1tj6Ky1ABAuZuv5ag=
google.golang.org/grpc v1.55.0/go.mod h1:iYEXKGkEBhg1PjZQvoYEVPTDkHo1/bjTnfwTeGONTY8=
google.golang.org/grpc v1.57.0 h1:kfzNeI/klCGD2YPMUlaGNT3pxvYfga7smW3Vth8Zsiw=
google.golang.org/grpc v1.57.0/go.mod h1:Sd+9RMTACXwmub0zcNY2c4arhtrbBYD1AUHI/dt16Mo=
google.golang.org/grpc/cmd/protoc-gen-go-grpc v1.1.0/go.mod h1:6Kw0yEErY5E/yWrBtf03jp27GLLJujG4z/JK95pnjjw=
Expand Down Expand Up @@ -1316,7 +1298,7 @@ modernc.org/opt v0.1.1/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/opt v0.1.3 h1:3XOZf2yznlhC+ibLltsDGzABUGVx8J6pnFMS3E4dcq4=
modernc.org/opt v0.1.3/go.mod h1:WdSiB5evDcignE70guQKxYUl14mgWtbClRi5wmkkTX0=
modernc.org/sqlite v1.18.1/go.mod h1:6ho+Gow7oX5V+OiOQ6Tr4xeqbx13UZ6t+Fw9IRUG4d4=
modernc.org/sqlite v1.23.1 h1:nrSBg4aRQQwq59JpvGEQ15tNxoO5pX/kUjcRNwSAGQM=
modernc.org/sqlite v1.25.0 h1:AFweiwPNd/b3BoKnBOfFm+Y260guGMF+0UFk0savqeA=
modernc.org/strutil v1.1.1/go.mod h1:DE+MQQ/hjKBZS2zNInV5hhcipt5rLPWkmpbGeW5mmdw=
modernc.org/strutil v1.1.3 h1:fNMm+oJklMGYfU9Ylcywl0CO5O6nTfaowNsh2wpPjzY=
modernc.org/strutil v1.1.3/go.mod h1:MEHNA7PdEnEwLvspRMtWTNnp2nnyvMfkimT1NKNAGbw=
Expand Down
135 changes: 71 additions & 64 deletions internal/upserter/upserter.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ import (
"github.com/jmoiron/sqlx"
"github.com/spf13/viper"
"github.com/volatiletech/sqlboiler/v4/boil"
"github.com/volatiletech/sqlboiler/v4/queries/qm"
"go.uber.org/zap"

"go.hollow.sh/metadataservice/internal/models"
Expand All @@ -33,7 +34,7 @@ func UpsertMetadata(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id str

logger.Sugar().Info("Starting metadata upsert for uuid: ", id)

return doUpsert(ctx, db, logger, id, ipAddresses, metadataUpserter)
return doUpsertWithRetries(ctx, db, logger, id, ipAddresses, metadataUpserter)
}

// UpsertUserdata is used to upsert (update or insert) an instance_userdata
Expand All @@ -46,29 +47,85 @@ func UpsertUserdata(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id str

logger.Sugar().Info("Starting userdata upsert for uuid: ", id)

return doUpsert(ctx, db, logger, id, ipAddresses, userdataUpserter)
return doUpsertWithRetries(ctx, db, logger, id, ipAddresses, userdataUpserter)
}

// doUpsertWithRetries wraps doUpsert() with retry logic.
//
// doUpsert is attempted once and then retried up to "crdb.max_retries" more
// times while it keeps returning an error. Between attempts the function
// sleeps for a random duration in [0, "crdb.retry_interval"). Both settings
// are read from viper on every call.
//
// It returns nil as soon as an attempt succeeds. Otherwise it returns the
// error from the last attempt, or ctx.Err() if the context is cancelled while
// waiting to retry.
func doUpsertWithRetries(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id string, ipAddresses []string, upsertRecordFunc RecordUpserter) error {
	maxUpsertRetries := viper.GetInt("crdb.max_retries")
	dbRetryInterval := viper.GetDuration("crdb.retry_interval")

	// A negative retry count would skip the loop entirely and report a
	// "failure" with a nil error without ever attempting the upsert. Always
	// make at least one attempt.
	if maxUpsertRetries < 0 {
		maxUpsertRetries = 0
	}

	var err error

	for i := 0; i <= maxUpsertRetries; i++ {
		err = doUpsert(ctx, db, logger, id, ipAddresses, upsertRecordFunc)
		if err == nil {
			if i > 0 {
				logger.Sugar().Info("Upsert operation for instance: ", id, " successful on retry attempt #", i)
			}

			return nil
		}

		// No point in sleeping after the final attempt; nothing follows it.
		if i == maxUpsertRetries {
			break
		}

		// Exponential backoff would be overkill here, but adding a bit of jitter
		// to sleep a short time is reasonable. rand.Int63n panics when its
		// argument is <= 0, so an unset or non-positive interval means "retry
		// immediately".
		if dbRetryInterval > 0 {
			jitter := time.Duration(rand.Int63n(int64(dbRetryInterval)))

			// Stop waiting as soon as the caller gives up; any further attempt
			// would fail on the cancelled context anyway.
			select {
			case <-ctx.Done():
				logger.Sugar().Error("Upsert operation aborted for instance: ", id, " - context done: ", ctx.Err())
				return ctx.Err()
			case <-time.After(jitter):
			}
		}
	}

	// The loop makes the initial attempt plus maxUpsertRetries retries.
	logger.Sugar().Error("Upsert operation failed for instance: ", id, " even after ", maxUpsertRetries+1, " attempts")

	return err
}

// doUpsert handles the functionality common to inserting or updating both
// metadata and userdata records. Namely, handling conflicting or stale
// (in the case of an update) IP address associations.
func doUpsert(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id string, ipAddresses []string, upsertRecordFunc RecordUpserter) error {
logger.Sugar().Info("doUpsert starting for id: ", id, " - upserting IPs ", ipAddresses)

ctx = boil.WithDebug(ctx, true)

// Start a DB transaction
txErr := false

tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}

// If there's an error, we'll want to roll back the transaction.
defer func() {
if txErr {
logger.Sugar().Warn("Rolling back doUpsert transaction for instance: ", id, " with ipAddresses: ", ipAddresses)

err := tx.Rollback()
if err != nil {
logger.Sugar().Error("Could not roll back doUpsert transaction for instance: ", id, "Error: ", err)
}
}
}()

// Step 1
// Look for any conflicting IP addresses (IPs already present and associated
// with a *different* Instance ID)
conflictIPs, err := models.InstanceIPAddresses(models.InstanceIPAddressWhere.Address.IN(ipAddresses), models.InstanceIPAddressWhere.InstanceID.NEQ(id)).All(ctx, db)
// Select and lock the ip address rows that may be updated or deleted by this operation, to prevent race conditions
// This includes:
// * ip addresses that already exist for this instance id (instanceIPAddresses)
// * ip addresses included in this update request, but are associated with a different instance id (conflictIPs)
instanceIPAddresses, err := models.InstanceIPAddresses(models.InstanceIPAddressWhere.InstanceID.EQ(id), qm.For("UPDATE")).All(ctx, db)
if err != nil {
return err
}

// Step 2
// Look up any existing instance_ip_addresses rows for the provided instance ID
instanceIPAddresses, err := models.InstanceIPAddresses(models.InstanceIPAddressWhere.InstanceID.EQ(id)).All(ctx, db)
conflictIPs, err := models.InstanceIPAddresses(models.InstanceIPAddressWhere.Address.IN(ipAddresses), models.InstanceIPAddressWhere.InstanceID.NEQ(id), qm.For("UPDATE")).All(ctx, db)
if err != nil {
return err
}

// Step 2.5.a
// Step 2.a
// Find "stale" InstanceIPAddress rows for this instance. That is, select
// rows from the instanceIPAddresses result which don't have a corresponding
// entry in the list of IP Addresses supplied in the call.
Expand All @@ -89,7 +146,7 @@ func doUpsert(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id string, i
}
}

// Step 2.5.b
// Step 2.b
// Find new IP Addresses that were specified in the call that aren't
// currently associated to the instance.
var newInstanceIPAddresses models.InstanceIPAddressSlice
Expand All @@ -113,57 +170,7 @@ func doUpsert(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id string, i
}
}

upsertSuccess := false
maxUpsertRetries := viper.GetInt("crdb.max_retries")
dbRetryInterval := viper.GetDuration("crdb.retry_interval")

for i := 0; i <= maxUpsertRetries && !upsertSuccess; i++ {
err = performUpsert(ctx, db, logger, id, upsertRecordFunc, conflictIPs, staleInstanceIPAddresses, newInstanceIPAddresses)
if err == nil {
upsertSuccess = true

if i > 0 {
logger.Sugar().Info("DB upsert transaction for instance: ", id, " successful on retry attempt #", i)
}
} else {
// Exponential backoff would be overkill here, but adding a bit of jitter
// to sleep a short time is reasonable
jitter := time.Duration(rand.Int63n(int64(dbRetryInterval)))
time.Sleep(jitter)
}
}

if !upsertSuccess {
logger.Sugar().Error("Upsert operation failed for instance: ", id, " even after ", maxUpsertRetries, " attempts")
return err
}

return nil
}

func performUpsert(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id string, upsertRecordFunc RecordUpserter, conflictIPs models.InstanceIPAddressSlice, staleInstanceIPAddresses models.InstanceIPAddressSlice, newInstanceIPAddresses models.InstanceIPAddressSlice) error {
// Step 3
// Kick off the DB transaction
txErr := false

tx, err := db.BeginTx(ctx, nil)
if err != nil {
return err
}

// If there's an error, we'll want to roll back the transaction.
defer func() {
if txErr {
logger.Sugar().Warn("Rolling back upserter transaction for instance: ", id)

err := tx.Rollback()
if err != nil {
logger.Sugar().Error("Could not roll back upserter transaction for instance: ", id, "Error: ", err)
}
}
}()

// Step 4
// Remove any instance_ip_address rows for the specified IP addresses that
// are currently associated to a *different* instance ID
for _, conflictingIP := range conflictIPs {
Expand All @@ -178,7 +185,7 @@ func performUpsert(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id stri
}
}

// Step 5
// Step 4
// Remove any "stale" instance_ip_addresses rows associated to the provided
// instance_id but were not specified in the call.
for _, staleIP := range staleInstanceIPAddresses {
Expand All @@ -190,7 +197,7 @@ func performUpsert(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id stri
}
}

// Step 6
// Step 5
// Create instance_ip_addresses rows for any IP addresses specified in the
// call that aren't already associated to the provided instance_id
for _, newInstanceIP := range newInstanceIPAddresses {
Expand All @@ -202,7 +209,7 @@ func performUpsert(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id stri
}
}

// Step 7
// Step 6
// Upsert the instance_metadata or instance_userdata table. This will create
// a new row with the provided instance ID and metadata or userdata if there
// is no current row for instance_id. If there is an existing row matching on
Expand All @@ -214,7 +221,7 @@ func performUpsert(ctx context.Context, db *sqlx.DB, logger *zap.Logger, id stri
return err
}

// Step 8
// Step 7
// Commit our transaction
err = tx.Commit()
if err != nil {
Expand Down

0 comments on commit aca8032

Please sign in to comment.