From 4c59544c21e5fd734ef5f1d37725757418ce948a Mon Sep 17 00:00:00 2001 From: Nicholas Molnar <65710+neekolas@users.noreply.github.com> Date: Tue, 30 Apr 2024 07:36:12 -0700 Subject: [PATCH] Add retry support for serializable transactions --- pkg/mls/store/store.go | 19 ++++++++++++++++++- 1 file changed, 18 insertions(+), 1 deletion(-) diff --git a/pkg/mls/store/store.go b/pkg/mls/store/store.go index bf202994..b94f9d4f 100644 --- a/pkg/mls/store/store.go +++ b/pkg/mls/store/store.go @@ -106,7 +106,7 @@ func (s *Store) PublishIdentityUpdate(ctx context.Context, req *identity.Publish return nil, errors.New("IdentityUpdate is required") } - if err := s.RunInTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}, func(ctx context.Context, txQueries *queries.Queries) error { + if err := s.RunInSerializableTx(ctx, 3, func(ctx context.Context, txQueries *queries.Queries) error { inboxLogEntries, err := txQueries.GetAllInboxLogs(ctx, new_update.GetInboxId()) if err != nil { return err @@ -605,3 +605,20 @@ func (s *Store) RunInTx( done = true return tx.Commit() } + +func (s *Store) RunInSerializableTx(ctx context.Context, numRetries int, fn func(ctx context.Context, txQueries *queries.Queries) error) error { + var err error + for i := 0; i < numRetries; i++ { + select { + case <-ctx.Done(): + return ctx.Err() + default: + err = s.RunInTx(ctx, &sql.TxOptions{Isolation: sql.LevelSerializable}, fn) + if err == nil { + return nil + } + s.log.Warn("Error in serializable tx", zap.Error(err)) + } + } + return err +}