diff --git a/pkg/db/db.go b/pkg/db/db.go index 507e44217..81f5d98af 100644 --- a/pkg/db/db.go +++ b/pkg/db/db.go @@ -1579,7 +1579,7 @@ func (h *DBHandler) RunCustomMigrationReleases(ctx context.Context, getAllAppsFu span, _ := tracer.StartSpanFromContext(ctx, "RunCustomMigrationReleases") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { l := logger.FromContext(ctx).Sugar() allReleasesDb, err := h.DBSelectAnyRelease(ctx, transaction) if err != nil { @@ -1641,7 +1641,7 @@ func (h *DBHandler) RunCustomMigrationDeployments(ctx context.Context, getAllDep span, _ := tracer.StartSpanFromContext(ctx, "RunCustomMigrationDeployments") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { l := logger.FromContext(ctx).Sugar() allAppsDb, err := h.DBSelectAnyDeployment(ctx, transaction) if err != nil { @@ -1699,7 +1699,7 @@ func (h *DBHandler) RunCustomMigrationEnvLocks(ctx context.Context, getAllEnvLoc span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrationEnvLocks") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { l := logger.FromContext(ctx).Sugar() allEnvLocksDb, err := h.DBSelectAnyActiveEnvLocks(ctx, transaction) if err != nil { @@ -1746,7 +1746,7 @@ func (h *DBHandler) RunCustomMigrationAppLocks(ctx context.Context, getAllAppLoc span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrationAppLocks") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { l := logger.FromContext(ctx).Sugar() allAppLocksDb, err := h.DBSelectAnyActiveAppLock(ctx, transaction) if err != nil { @@ -1793,7 +1793,7 @@ func (h *DBHandler) RunCustomMigrationTeamLocks(ctx context.Context, getAllTeamL span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrationTeamLocks") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { l := logger.FromContext(ctx).Sugar() allTeamLocksDb, err := h.DBSelectAnyActiveTeamLock(ctx, transaction) if err != nil { @@ -1836,7 +1836,7 @@ func (h *DBHandler) RunCustomMigrationTeamLocks(ctx context.Context, getAllTeamL } func (h *DBHandler) RunCustomMigrationsCommitEvents(ctx context.Context, getAllEvents GetAllEventsFun) error { - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { l := logger.FromContext(ctx).Sugar() ev, err := h.DBSelectAnyEvent(ctx, transaction) if err != nil { @@ -1872,7 +1872,7 @@ func (h *DBHandler) RunCustomMigrationQueuedApplicationVersions(ctx context.Cont span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrationQueuedApplicationVersions") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { l := logger.FromContext(ctx).Sugar() allTeamLocksDb, err := h.DBSelectAnyDeploymentAttempt(ctx, transaction) if err != nil { @@ -1904,7 +1904,7 @@ func (h *DBHandler) RunCustomMigrationQueuedApplicationVersions(ctx context.Cont // For commit_events migrations, we need some transformer to be on the database before we run their migrations. func (h *DBHandler) RunCustomMigrationsEventSourcingLight(ctx context.Context) error { - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { span, _ := tracer.StartSpanFromContext(ctx, "RunCustomMigrationsEventSourcingLight") defer span.Finish() l := logger.FromContext(ctx).Sugar() @@ -1965,7 +1965,7 @@ func (h *DBHandler) RunCustomMigrationAllAppsTable(ctx context.Context, getAllAp span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrationAllAppsTable") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { l := logger.FromContext(ctx).Sugar() allAppsDb, err := h.DBSelectAllApplications(ctx, transaction) if err != nil { @@ -2001,7 +2001,7 @@ func (h *DBHandler) RunCustomMigrationApps(ctx context.Context, getAllAppsFun Ge span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrationApps") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { dbApp, err := h.DBSelectAnyApp(ctx, transaction) if err != nil { return fmt.Errorf("could not get dbApp from database - assuming the manifest repo is correct: %v", err) @@ -4150,7 +4150,7 @@ func (h *DBHandler) RunCustomMigrationEnvironments(ctx context.Context, getAllEn span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrationEnvironments") defer span.Finish() - return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + return h.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { log := logger.FromContext(ctx).Sugar() arbitraryAllEnvsRow, err := h.DBSelectAnyEnvironment(ctx, transaction) diff --git a/pkg/db/db_test.go b/pkg/db/db_test.go index a5c756004..f41601e47 100644 --- a/pkg/db/db_test.go +++ b/pkg/db/db_test.go @@ -133,7 +133,7 @@ INSERT INTO all_apps (version , created , json) VALUES (1, '1713218400', '{"ap if err != nil { t.Fatal("Error establishing DB connection: ", zap.Error(err)) } - tx, err := db.DB.BeginTx(ctx, nil) + tx, err := db.BeginTransaction(ctx, false) if err != nil { t.Fatalf("Error creating transaction. Error: %v\n", err) } @@ -238,7 +238,7 @@ func TestCustomMigrationReleases(t *testing.T) { ctx := context.Background() dbHandler := SetupRepositoryTestWithDB(t) - err3 := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err3 := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { err2 := dbHandler.RunCustomMigrationReleases(ctx, getAllApps, getAllReleases) if err2 != nil { return fmt.Errorf("error: %v", err2) @@ -354,7 +354,7 @@ func TestCommitEvents(t *testing.T) { t.Fatalf("Error running custom migrations for esl table. Error: %v\n", err) } - err = db.WithTransaction(ctx, func(ctx context.Context, tx *sql.Tx) error { + err = db.WithTransaction(ctx, false, func(ctx context.Context, tx *sql.Tx) error { if err != nil { t.Fatalf("Error creating transaction. Error: %v\n", err) } @@ -533,7 +533,7 @@ func TestReadWriteDeployment(t *testing.T) { dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { deployment, err2 := dbHandler.DBSelectAnyDeployment(ctx, transaction) if err2 != nil { return err2 @@ -618,7 +618,7 @@ func TestDeleteEnvironmentLock(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { envLock, err2 := dbHandler.DBSelectEnvironmentLock(ctx, transaction, tc.Env, tc.LockID) if err2 != nil { return err2 @@ -695,7 +695,7 @@ func TestReadWriteEnvironmentLock(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { envLock, err2 := dbHandler.DBSelectEnvironmentLock(ctx, transaction, tc.Env, tc.LockID) if err2 != nil { return err2 @@ -770,7 +770,7 @@ func TestReadWriteApplicationLock(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { envLock, err2 := dbHandler.DBSelectAppLock(ctx, transaction, tc.Env, tc.AppName, tc.LockID) if err2 != nil { return err2 @@ -860,7 +860,7 @@ func TestDeleteApplicationLock(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { envLock, err2 := dbHandler.DBSelectEnvironmentLock(ctx, transaction, tc.Env, tc.LockID) if err2 != nil { return err2 @@ -967,7 +967,7 @@ func TestQueueApplicationVersion(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { for _, deployments := range tc.Deployments { err := dbHandler.DBWriteDeploymentAttempt(ctx, transaction, deployments.Env, deployments.App, deployments.Version) if err != nil { @@ -1030,7 +1030,7 @@ func TestQueueApplicationVersionDelete(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { err := dbHandler.DBWriteDeploymentAttempt(ctx, transaction, tc.Env, tc.AppName, tc.Version) if err != nil { return err @@ -1098,7 +1098,7 @@ func TestReadWriteTeamLock(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { envLock, err2 := dbHandler.DBSelectTeamLock(ctx, transaction, tc.Env, tc.TeamName, tc.LockID) if err2 != nil { return err2 @@ -1188,7 +1188,7 @@ func TestDeleteTeamLock(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { envLock, err2 := dbHandler.DBSelectTeamLock(ctx, transaction, tc.Env, tc.TeamName, tc.LockID) if err2 != nil { return err2 @@ -1277,7 +1277,7 @@ func TestDeleteRelease(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { err2 := dbHandler.DBInsertRelease(ctx, transaction, tc.toInsert, tc.toInsert.EslId-1) if err2 != nil { return err2 @@ -1453,7 +1453,7 @@ func TestReadWriteEnvironment(t *testing.T) { dbHandler := setupDB(t) for _, envToWrite := range tc.EnvsToWrite { - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { err := dbHandler.DBWriteEnvironment(ctx, transaction, envToWrite.EnvironmentName, envToWrite.EnvironmentConfig) if err != nil { return fmt.Errorf("error while writing environment, error: %w", err) @@ -1465,7 +1465,7 @@ func TestReadWriteEnvironment(t *testing.T) { } } - envEntry, err := WithTransactionT(dbHandler, ctx, func(ctx context.Context, transaction *sql.Tx) (*DBEnvironment, error) { + envEntry, err := WithTransactionT(dbHandler, ctx, true, func(ctx context.Context, transaction *sql.Tx) (*DBEnvironment, error) { envEntry, err := dbHandler.DBSelectEnvironment(ctx, transaction, tc.EnvToQuery) if err != nil { return nil, fmt.Errorf("error while selecting environment entry, error: %w", err) @@ -1557,7 +1557,7 @@ func TestReadWriteEslEvent(t *testing.T) { t.Parallel() ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, true, func(ctx context.Context, transaction *sql.Tx) error { err := dbHandler.DBWriteEslEventInternal(ctx, tc.EventType, transaction, tc.EventData, tc.EventMetadata) if err != nil { return err @@ -1621,7 +1621,7 @@ func TestReadWriteAllEnvironments(t *testing.T) { dbHandler := setupDB(t) for _, allEnvs := range tc.AllEnvsToWrite { - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { err := dbHandler.DBWriteAllEnvironments(ctx, transaction, allEnvs) if err != nil { return fmt.Errorf("error while writing environment, error: %w", err) @@ -1633,7 +1633,7 @@ func TestReadWriteAllEnvironments(t *testing.T) { } } - allEnvsEntry, err := WithTransactionT(dbHandler, ctx, func(ctx context.Context, transaction *sql.Tx) (*DBAllEnvironments, error) { + allEnvsEntry, err := WithTransactionT(dbHandler, ctx, true, func(ctx context.Context, transaction *sql.Tx) (*DBAllEnvironments, error) { allEnvsEntry, err := dbHandler.DBSelectAllEnvironments(ctx, transaction) if err != nil { return nil, fmt.Errorf("error while selecting environment entry, error: %w", err) @@ -1790,7 +1790,7 @@ func TestReadReleasesByApp(t *testing.T) { ctx := testutil.MakeTestContext() dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { for _, release := range tc.Releases { err := dbHandler.DBInsertRelease(ctx, transaction, release, release.EslId-1) if err != nil { diff --git a/pkg/db/transactions.go b/pkg/db/transactions.go index e36c0ee22..a8331c87c 100644 --- a/pkg/db/transactions.go +++ b/pkg/db/transactions.go @@ -28,8 +28,8 @@ type DBFunctionMultipleEntriesT[T any] func(ctx context.Context, transaction *sq // WithTransaction opens a transaction, runs `f` and then calls either Commit or Rollback. // Use this if the only thing to return from `f` is an error. -func (h *DBHandler) WithTransaction(ctx context.Context, f DBFunction) error { - _, err := WithTransactionT(h, ctx, func(ctx context.Context, transaction *sql.Tx) (*interface{}, error) { +func (h *DBHandler) WithTransaction(ctx context.Context, readonly bool, f DBFunction) error { + _, err := WithTransactionT(h, ctx, readonly, func(ctx context.Context, transaction *sql.Tx) (*interface{}, error) { err2 := f(ctx, transaction) if err2 != nil { return nil, err2 @@ -43,8 +43,8 @@ func (h *DBHandler) WithTransaction(ctx context.Context, f DBFunction) error { } // WithTransactionT is the same as WithTransaction, but you can also return data, not just the error. -func WithTransactionT[T any](h *DBHandler, ctx context.Context, f DBFunctionT[T]) (*T, error) { - res, err := WithTransactionMultipleEntriesT(h, ctx, func(ctx context.Context, transaction *sql.Tx) ([]T, error) { +func WithTransactionT[T any](h *DBHandler, ctx context.Context, readonly bool, f DBFunctionT[T]) (*T, error) { + res, err := WithTransactionMultipleEntriesT(h, ctx, readonly, func(ctx context.Context, transaction *sql.Tx) ([]T, error) { fRes, err2 := f(ctx, transaction) if err2 != nil { return nil, err2 @@ -61,17 +61,17 @@ func WithTransactionT[T any](h *DBHandler, ctx context.Context, f DBFunctionT[T] } // WithTransactionMultipleEntriesT is the same as WithTransaction, but you can also return and array of data, not just the error. -func WithTransactionMultipleEntriesT[T any](h *DBHandler, ctx context.Context, f DBFunctionMultipleEntriesT[T]) ([]T, error) { +func WithTransactionMultipleEntriesT[T any](h *DBHandler, ctx context.Context, readonly bool, f DBFunctionMultipleEntriesT[T]) ([]T, error) { span, ctx := tracer.StartSpanFromContext(ctx, "DBTransaction") defer span.Finish() - onError := func(e error) { + onError := func(e error) ([]T, error) { span.Finish(tracer.WithError(e)) + return nil, e } - tx, err := h.DB.BeginTx(ctx, nil) + tx, err := h.BeginTransaction(ctx, readonly) if err != nil { - onError(err) - return nil, err + return onError(err) } defer func(tx *sql.Tx) { _ = tx.Rollback() @@ -81,13 +81,18 @@ func WithTransactionMultipleEntriesT[T any](h *DBHandler, ctx context.Context, f result, err := f(ctx, tx) if err != nil { - onError(err) - return nil, err + return onError(err) } err = tx.Commit() if err != nil { - onError(err) - return nil, err + return onError(err) } return result, nil } + +func (h *DBHandler) BeginTransaction(ctx context.Context, readonly bool) (*sql.Tx, error) { + return h.DB.BeginTx(ctx, &sql.TxOptions{ + Isolation: sql.LevelLinearizable, + ReadOnly: readonly, + }) +} diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index a7fc5fa9c..60c090bcb 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -483,7 +483,7 @@ func New2(ctx context.Context, cfg RepositoryConfig) (Repository, setup.Backgrou // Check configuration for errors and abort early if any: if state.DBHandler.ShouldUseOtherTables() { - _, err = db.WithTransactionT(state.DBHandler, ctx, func(ctx context.Context, transaction *sql.Tx) (*map[string]config.EnvironmentConfig, error) { + _, err = db.WithTransactionT(state.DBHandler, ctx, false, func(ctx context.Context, transaction *sql.Tx) (*map[string]config.EnvironmentConfig, error) { ret, err := state.GetEnvironmentConfigsAndValidate(ctx, transaction) return &ret, err }) @@ -541,7 +541,7 @@ func (r *repository) applyTransformerBatches(transformerBatches []transformerBat var txErr error e := transformerBatches[i] if r.DB.ShouldUseEslTable() { - transaction, txErr = r.DB.DB.BeginTx(e.ctx, nil) + transaction, txErr = r.DB.BeginTransaction(e.ctx, false) if txErr != nil { e.finish(txErr) transformerBatches = append(transformerBatches[:i], transformerBatches[i+1:]...) @@ -781,23 +781,14 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e transformerBatch, c } func UpdateDatadogMetricsDB(ctx context.Context, state *State, r Repository, changes *TransformerResult, now time.Time) error { - var transaction *sql.Tx - var txErr error repo := r.(*repository) - transaction, txErr = repo.DB.DB.BeginTx(ctx, nil) - if txErr != nil { - return txErr - } - defer func(tx *sql.Tx) { - _ = tx.Rollback() - }(transaction) - - ddError := UpdateDatadogMetrics(ctx, transaction, state, r, changes, now) - if ddError != nil { - return ddError - } - - err := transaction.Commit() + err := repo.DB.WithTransaction(ctx, true, func(ctx context.Context, transaction *sql.Tx) error { + ddError := UpdateDatadogMetrics(ctx, transaction, state, r, changes, now) + if ddError != nil { + return ddError + } + return nil + }) if err != nil { return err } diff --git a/services/cd-service/pkg/repository/transformer.go b/services/cd-service/pkg/repository/transformer.go index f84728c8c..13082f69f 100644 --- a/services/cd-service/pkg/repository/transformer.go +++ b/services/cd-service/pkg/repository/transformer.go @@ -260,7 +260,7 @@ func RegularlySendDatadogMetrics(repo Repository, interval time.Duration, callBa func GetRepositoryStateAndUpdateMetrics(ctx context.Context, repo Repository) { s := repo.State() if s.DBHandler.ShouldUseOtherTables() { - err := s.DBHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := s.DBHandler.WithTransaction(ctx, true, func(ctx context.Context, transaction *sql.Tx) error { if err := UpdateDatadogMetrics(ctx, transaction, s, repo, nil, time.Now()); err != nil { return err } diff --git a/services/cd-service/pkg/repository/transformer_db_test.go b/services/cd-service/pkg/repository/transformer_db_test.go index 3b1fc0f3d..fe1055390 100644 --- a/services/cd-service/pkg/repository/transformer_db_test.go +++ b/services/cd-service/pkg/repository/transformer_db_test.go @@ -297,7 +297,7 @@ func TestTransformerWritesEslDataRoundTrip(t *testing.T) { t.Errorf("setup error could not set up transformers \n%v", err) } - err = r.DB.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err = r.DB.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { _, _, _, err2 := repo.ApplyTransformersInternal(testutil.MakeTestContext(), transaction, tc.Transformer) if err2 != nil { return err2 @@ -431,7 +431,7 @@ func TestEnvLockTransformersWithDB(t *testing.T) { var err error = nil repo = SetupRepositoryTestWithDB(t) r := repo.(*repository) - err = r.DB.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err = r.DB.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { var batchError *TransformerBatchApplyError = nil _, _, _, batchError = r.ApplyTransformersInternal(testutil.MakeTestContext(), transaction, tc.Transformers...) if batchError != nil { @@ -448,7 +448,7 @@ func TestEnvLockTransformersWithDB(t *testing.T) { } } - locks, err := db.WithTransactionT(repo.State().DBHandler, ctx, func(ctx context.Context, transaction *sql.Tx) (*db.AllEnvLocksGo, error) { + locks, err := db.WithTransactionT(repo.State().DBHandler, ctx, false, func(ctx context.Context, transaction *sql.Tx) (*db.AllEnvLocksGo, error) { return repo.State().DBHandler.DBSelectAllEnvironmentLocks(ctx, transaction, envProduction) }) @@ -591,7 +591,7 @@ func TestTeamLockTransformersWithDB(t *testing.T) { var err error = nil repo = SetupRepositoryTestWithDB(t) r := repo.(*repository) - err = r.DB.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err = r.DB.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { var batchError *TransformerBatchApplyError = nil _, _, _, batchError = r.ApplyTransformersInternal(testutil.MakeTestContext(), transaction, tc.Transformers...) if batchError != nil { @@ -608,7 +608,7 @@ func TestTeamLockTransformersWithDB(t *testing.T) { } } - locks, err := db.WithTransactionT(repo.State().DBHandler, ctx, func(ctx context.Context, transaction *sql.Tx) (*db.AllTeamLocksGo, error) { + locks, err := db.WithTransactionT(repo.State().DBHandler, ctx, true, func(ctx context.Context, transaction *sql.Tx) (*db.AllTeamLocksGo, error) { return repo.State().DBHandler.DBSelectAllTeamLocks(ctx, transaction, envAcceptance, team) }) @@ -753,7 +753,7 @@ func TestCreateApplicationVersionDB(t *testing.T) { ctxWithTime := time.WithTimeNow(testutil.MakeTestContext(), timeNowOld) repo := SetupRepositoryTestWithDB(t) - err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, func(ctx context.Context, transaction *sql.Tx) error { + err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, false, func(ctx context.Context, transaction *sql.Tx) error { _, state, _, err := repo.ApplyTransformersInternal(ctx, transaction, tc.Transformers...) if err != nil { t.Fatalf("expected no error, got %v", err) @@ -831,7 +831,7 @@ func TestDeleteQueueApplicationVersion(t *testing.T) { t.Parallel() ctxWithTime := time.WithTimeNow(testutil.MakeTestContext(), timeNowOld) repo := SetupRepositoryTestWithDB(t) - err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, func(ctx context.Context, transaction *sql.Tx) error { + err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, false, func(ctx context.Context, transaction *sql.Tx) error { _, state, _, err := repo.ApplyTransformersInternal(ctx, transaction, tc.Transformers...) if err != nil { t.Fatalf("expected no error, got %v", err) @@ -899,7 +899,7 @@ func TestQueueDeploymentTransformer(t *testing.T) { t.Parallel() ctxWithTime := time.WithTimeNow(testutil.MakeTestContext(), timeNowOld) repo := SetupRepositoryTestWithDB(t) - err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, func(ctx context.Context, transaction *sql.Tx) error { + err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, false, func(ctx context.Context, transaction *sql.Tx) error { _, state, _, err := repo.ApplyTransformersInternal(ctx, transaction, tc.Transformers...) if err != nil { t.Fatalf("expected no error, got %v", err) @@ -1029,7 +1029,7 @@ func TestCleanupOldVersionDB(t *testing.T) { ctxWithTime := time.WithTimeNow(testutil.MakeTestContext(), timeNowOld) repo := SetupRepositoryTestWithDB(t) repo.(*repository).config.ReleaseVersionsLimit = tc.ReleaseVersionLimit - err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, func(ctx context.Context, transaction *sql.Tx) error { + err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, false, func(ctx context.Context, transaction *sql.Tx) error { _, state, _, err := repo.ApplyTransformersInternal(ctx, transaction, tc.Transformers...) if err != nil { t.Fatalf("expected no error, got %v", err) @@ -1111,7 +1111,7 @@ func TestCreateEnvironmentTransformer(t *testing.T) { t.Parallel() ctxWithTime := time.WithTimeNow(testutil.MakeTestContext(), timeNowOld) repo := SetupRepositoryTestWithDB(t) - err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, func(ctx context.Context, transaction *sql.Tx) error { + err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, false, func(ctx context.Context, transaction *sql.Tx) error { _, state, _, err := repo.ApplyTransformersInternal(ctx, transaction, tc.Transformers...) if err != nil { t.Fatalf("expected no error, got %v", err) @@ -1193,7 +1193,7 @@ func TestEventGenerationFromTransformers(t *testing.T) { t.Parallel() ctxWithTime := time.WithTimeNow(testutil.MakeTestContext(), timeNowOld) repo := SetupRepositoryTestWithDB(t) - err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, func(ctx context.Context, transaction *sql.Tx) error { + err3 := repo.State().DBHandler.WithTransaction(ctxWithTime, false, func(ctx context.Context, transaction *sql.Tx) error { _, state, _, err := repo.ApplyTransformersInternal(ctx, transaction, tc.Transformers...) if err != nil { t.Fatalf("expected no error, got %v", err) @@ -1346,7 +1346,7 @@ func TestEvents(t *testing.T) { var err error = nil repo = SetupRepositoryTestWithDB(t) r := repo.(*repository) - err = r.DB.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err = r.DB.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { var batchError *TransformerBatchApplyError = nil _, _, _, batchError = r.ApplyTransformersInternal(testutil.MakeTestContext(), transaction, tc.Transformers...) if batchError != nil { @@ -1471,7 +1471,7 @@ func TestDeleteEnvFromAppWithDB(t *testing.T) { t.Parallel() ctxWithTime := time.WithTimeNow(testutil.MakeTestContext(), timeNowOld) repo := SetupRepositoryTestWithDB(t) - err := repo.State().DBHandler.WithTransaction(ctxWithTime, func(ctx context.Context, transaction *sql.Tx) error { + err := repo.State().DBHandler.WithTransaction(ctxWithTime, false, func(ctx context.Context, transaction *sql.Tx) error { for _, release := range tc.PrevReleases { repo.State().DBHandler.DBInsertRelease(ctx, transaction, release, 0) } diff --git a/services/cd-service/pkg/repository/transformer_test.go b/services/cd-service/pkg/repository/transformer_test.go index 6fc8926e9..4dfd37cf4 100644 --- a/services/cd-service/pkg/repository/transformer_test.go +++ b/services/cd-service/pkg/repository/transformer_test.go @@ -1870,7 +1870,6 @@ func TestApplicationDeploymentEvent(t *testing.T) { if batchError != nil { t.Fatalf("2 encountered error but no error is expected here: '%v'", batchError) } - if err != nil { t.Fatalf("encountered error but no error is expected here: '%v'", err) } diff --git a/services/cd-service/pkg/service/batch_test.go b/services/cd-service/pkg/service/batch_test.go index 4d84b4efb..e1f92c3eb 100644 --- a/services/cd-service/pkg/service/batch_test.go +++ b/services/cd-service/pkg/service/batch_test.go @@ -1042,7 +1042,7 @@ func TestCreateEnvironmentTrain(t *testing.T) { var envs map[string]config.EnvironmentConfig if repo.State().DBHandler.ShouldUseOtherTables() { var envsPtr *map[string]config.EnvironmentConfig - envsPtr, err = db.WithTransactionT(repo.State().DBHandler, ctx, func(ctx context.Context, transaction *sql.Tx) (*map[string]config.EnvironmentConfig, error) { + envsPtr, err = db.WithTransactionT(repo.State().DBHandler, ctx, true, func(ctx context.Context, transaction *sql.Tx) (*map[string]config.EnvironmentConfig, error) { envs, err := repo.State().GetAllEnvironmentConfigs(ctx, transaction) return &envs, err }) diff --git a/services/cd-service/pkg/service/environment.go b/services/cd-service/pkg/service/environment.go index 3f8ca4483..f75e8e6e7 100644 --- a/services/cd-service/pkg/service/environment.go +++ b/services/cd-service/pkg/service/environment.go @@ -36,7 +36,7 @@ func (o *EnvironmentServiceServer) GetEnvironmentConfig( in *api.GetEnvironmentConfigRequest) (*api.GetEnvironmentConfigResponse, error) { state := o.Repository.State() - config, err := db.WithTransactionT(state.DBHandler, ctx, func(ctx context.Context, transaction *sql.Tx) (*config.EnvironmentConfig, error) { + config, err := db.WithTransactionT(state.DBHandler, ctx, true, func(ctx context.Context, transaction *sql.Tx) (*config.EnvironmentConfig, error) { return state.GetEnvironmentConfig(ctx, transaction, in.Environment) }) diff --git a/services/cd-service/pkg/service/git.go b/services/cd-service/pkg/service/git.go index b18518089..bf159aea5 100644 --- a/services/cd-service/pkg/service/git.go +++ b/services/cd-service/pkg/service/git.go @@ -202,7 +202,7 @@ func (s *GitServer) GetCommitInfo(ctx context.Context, in *api.GetCommitInfoRequ sort.Strings(touchedApps) var events []*api.Event if s.OverviewService.Repository.State().DBHandler.ShouldUseOtherTables() { - events, err = db.WithTransactionMultipleEntriesT(s.OverviewService.Repository.State().DBHandler, ctx, func(ctx context.Context, transaction *sql.Tx) ([]*api.Event, error) { + events, err = db.WithTransactionMultipleEntriesT(s.OverviewService.Repository.State().DBHandler, ctx, true, func(ctx context.Context, transaction *sql.Tx) ([]*api.Event, error) { return s.GetEvents(ctx, transaction, fs, commitPath) }) } else { diff --git a/services/cd-service/pkg/service/overview.go b/services/cd-service/pkg/service/overview.go index b98460679..eeb359f4e 100644 --- a/services/cd-service/pkg/service/overview.go +++ b/services/cd-service/pkg/service/overview.go @@ -81,7 +81,7 @@ func (o *OverviewServiceServer) getOverviewDB( var response *api.GetOverviewResponse if s.DBHandler.ShouldUseOtherTables() { - err := s.DBHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := s.DBHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { var err2 error response, err2 = o.getOverview(ctx, s, transaction) return err2 diff --git a/services/cd-service/pkg/service/version.go b/services/cd-service/pkg/service/version.go index 7a2237c83..3be94f7e2 100644 --- a/services/cd-service/pkg/service/version.go +++ b/services/cd-service/pkg/service/version.go @@ -100,7 +100,7 @@ func (o *VersionServiceServer) GetManifests(ctx context.Context, req *api.GetMan } if state.DBHandler.ShouldUseOtherTables() { - result, err := db.WithTransactionT(state.DBHandler, ctx, func(ctx context.Context, transaction *sql.Tx) (*api.GetManifestsResponse, error) { + result, err := db.WithTransactionT(state.DBHandler, ctx, false, func(ctx context.Context, transaction *sql.Tx) (*api.GetManifestsResponse, error) { var ( err error release uint64 diff --git a/services/manifest-repo-export-service/pkg/cmd/server.go b/services/manifest-repo-export-service/pkg/cmd/server.go index 71e66e64e..b68eb8298 100755 --- a/services/manifest-repo-export-service/pkg/cmd/server.go +++ b/services/manifest-repo-export-service/pkg/cmd/server.go @@ -219,7 +219,9 @@ func Run(ctx context.Context) error { eslTableEmpty := false eslEventSkipped := false - err = dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + // most of what happens here is indeed "read only", however we have to write to the cutoff table in the end + const readonly = false + err = dbHandler.WithTransaction(ctx, readonly, func(ctx context.Context, transaction *sql.Tx) error { eslId, err := cutoff.DBReadCutoff(dbHandler, ctx, transaction) if err != nil { return fmt.Errorf("error in DBReadCutoff %v", err) diff --git a/services/manifest-repo-export-service/pkg/db/cutoff_test.go b/services/manifest-repo-export-service/pkg/db/cutoff_test.go index 3feed2b02..364353089 100644 --- a/services/manifest-repo-export-service/pkg/db/cutoff_test.go +++ b/services/manifest-repo-export-service/pkg/db/cutoff_test.go @@ -56,7 +56,7 @@ func TestTransformerWritesEslDataRoundTrip(t *testing.T) { dbHandler := setupDB(t) - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { //We need to add transformers for these eslids beforehand (FK) tf := EmptyTransformer{} i := 0 diff --git a/services/manifest-repo-export-service/pkg/repository/transformer_test.go b/services/manifest-repo-export-service/pkg/repository/transformer_test.go index 5a93537d3..b8efb760c 100644 --- a/services/manifest-repo-export-service/pkg/repository/transformer_test.go +++ b/services/manifest-repo-export-service/pkg/repository/transformer_test.go @@ -412,7 +412,7 @@ func TestTransformerWorksWithDb(t *testing.T) { ctx := context.Background() dbHandler := repo.State().DBHandler - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { // setup: // this 'INSERT INTO' would be done one the cd-server side, so we emulate it here: err := dbHandler.DBWriteMigrationsTransformer(ctx, transaction) @@ -604,7 +604,7 @@ func TestDeploymentEvent(t *testing.T) { ctx := AddGeneratorToContext(testutil.MakeTestContext(), testutil.NewIncrementalUUIDGenerator()) dbHandler := repo.State().DBHandler - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { // setup: // this 'INSERT INTO' would be done one the cd-server side, so we emulate it here: err := dbHandler.DBWriteMigrationsTransformer(ctx, transaction) @@ -794,7 +794,7 @@ func TestCleanupOldApplicationVersions(t *testing.T) { ctx := AddGeneratorToContext(testutil.MakeTestContext(), testutil.NewIncrementalUUIDGenerator()) dbHandler := repo.State().DBHandler - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { // setup: // this 'INSERT INTO' would be done one the cd-server side, so we emulate it here: err := dbHandler.DBWriteMigrationsTransformer(ctx, transaction) @@ -987,7 +987,7 @@ func TestReplacedByEvents(t *testing.T) { ctx := AddGeneratorToContext(testutil.MakeTestContext(), testutil.NewIncrementalUUIDGenerator()) dbHandler := repo.State().DBHandler - err := dbHandler.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error { + err := dbHandler.WithTransaction(ctx, false, func(ctx context.Context, transaction *sql.Tx) error { // setup: // this 'INSERT INTO' would be done one the cd-server side, so we emulate it here: err := dbHandler.DBWriteMigrationsTransformer(ctx, transaction)