From 04c58267180011516b5a5f78e765c6dae679b6bc Mon Sep 17 00:00:00 2001 From: Sven Urbanski Date: Fri, 16 Aug 2024 17:57:35 +0200 Subject: [PATCH 1/3] fix(db): write to cutoff table if writeEslTableOnly=true If we were previously running with `db.writeEslTableOnly=true`, but now are running with `db.writeEslTableOnly=false` (which is the recommended way to enable the database), then we would have many events in the event_sourcing_light table that have not been processed - possibly over days or weeks. We must not apply events that are that outdated! So, if there is no cutoff yet, we write the current transformer to the cutoff, so it doesn't get processed. Ref: SRX-KNBOC7 --- .../pkg => pkg}/db/cutoff.go | 11 +++--- .../pkg => pkg}/db/cutoff_test.go | 39 +++---------------- .../cd-service/pkg/repository/repository.go | 27 +++++++++++++ .../pkg/cmd/server.go | 7 ++-- 4 files changed, 41 insertions(+), 43 deletions(-) rename {services/manifest-repo-export-service/pkg => pkg}/db/cutoff.go (86%) rename {services/manifest-repo-export-service/pkg => pkg}/db/cutoff_test.go (72%) diff --git a/services/manifest-repo-export-service/pkg/db/cutoff.go b/pkg/db/cutoff.go similarity index 86% rename from services/manifest-repo-export-service/pkg/db/cutoff.go rename to pkg/db/cutoff.go index ee999e8c6..804b61da3 100644 --- a/services/manifest-repo-export-service/pkg/db/cutoff.go +++ b/pkg/db/cutoff.go @@ -14,7 +14,7 @@ along with kuberpult. If not, see Copyright freiheit.com*/ -package cutoff +package db import ( "context" @@ -23,12 +23,11 @@ import ( "fmt" "time" - "github.com/freiheit-com/kuberpult/pkg/db" "github.com/freiheit-com/kuberpult/pkg/logger" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) -func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVersion, error) { +func DBReadCutoff(h *DBHandler, ctx context.Context, tx *sql.Tx) (*EslVersion, error) { span, _ := tracer.StartSpanFromContext(ctx, "DBReadCutoff") defer span.Finish() @@ -48,8 +47,8 @@ func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVers } }(rows) - var eslVersion db.EslVersion - var eslVersionPtr *db.EslVersion = nil + var eslVersion EslVersion + var eslVersionPtr *EslVersion = nil if rows.Next() { err := rows.Scan(&eslVersion) if err != nil { @@ -71,7 +70,7 @@ func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVers return eslVersionPtr, nil } -func DBWriteCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx, eslVersion db.EslVersion) error { +func DBWriteCutoff(h *DBHandler, ctx context.Context, tx *sql.Tx, eslVersion EslVersion) error { span, _ := tracer.StartSpanFromContext(ctx, "DBWriteCutoff") defer span.Finish() diff --git a/services/manifest-repo-export-service/pkg/db/cutoff_test.go b/pkg/db/cutoff_test.go similarity index 72% rename from services/manifest-repo-export-service/pkg/db/cutoff_test.go rename to pkg/db/cutoff_test.go index ac3d47704..1f19cbc10 100644 --- a/services/manifest-repo-export-service/pkg/db/cutoff_test.go +++ b/pkg/db/cutoff_test.go @@ -14,7 +14,7 @@ along with kuberpult. If not, see Copyright freiheit.com*/ -package cutoff +package db import ( "context" @@ -23,7 +23,6 @@ import ( "fmt" "testing" - "github.com/freiheit-com/kuberpult/pkg/db" "github.com/freiheit-com/kuberpult/pkg/testutil" "github.com/google/go-cmp/cmp" ) @@ -34,17 +33,17 @@ type EmptyTransformer struct{} func TestTransformerWritesEslDataRoundTrip(t *testing.T) { tcs := []struct { Name string - eslVersion []db.EslVersion - ExpectedEslVersion db.EslVersion + eslVersion []EslVersion + ExpectedEslVersion EslVersion }{ { Name: "test with one write operation", - eslVersion: []db.EslVersion{1}, + eslVersion: []EslVersion{1}, ExpectedEslVersion: 1, }, { Name: "test with multiple write operations", - eslVersion: []db.EslVersion{1, 2, 3, 4, 5}, + eslVersion: []EslVersion{1, 2, 3, 4, 5}, ExpectedEslVersion: 5, }, } @@ -63,7 +62,7 @@ func TestTransformerWritesEslDataRoundTrip(t *testing.T) { i := 0 for i < len(tc.eslVersion) { //Write bogus transformer for FK reasons - err := dbHandler.DBWriteEslEventInternal(ctx, "empty", transaction, interface{}(tf), db.ESLMetadata{}) + err := dbHandler.DBWriteEslEventInternal(ctx, "empty", transaction, interface{}(tf), ESLMetadata{}) if err != nil { return err } @@ -100,29 +99,3 @@ func TestTransformerWritesEslDataRoundTrip(t *testing.T) { }) } } - -// setupDB returns a new DBHandler with a tmp directory every time, so tests can are completely independent -func setupDB(t *testing.T) *db.DBHandler { - ctx := context.Background() - dir, err := testutil.CreateMigrationsPath(4) - tmpDir := t.TempDir() - t.Logf("directory for DB migrations: %s", dir) - t.Logf("tmp dir for DB data: %s", tmpDir) - cfg := db.DBConfig{ - MigrationsPath: dir, - DriverName: "sqlite3", - DbHost: tmpDir, - } - - migErr := db.RunDBMigrations(ctx, cfg) - if migErr != nil { - t.Fatal(migErr) - } - - dbHandler, err := db.Connect(ctx, cfg) - if err != nil { - t.Fatal(err) - } - - return dbHandler -} diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index c3d97cc64..b4a5ce52e 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -854,6 +854,33 @@ func (r *repository) ApplyTransformersInternal(ctx context.Context, transaction } } t.SetEslVersion(db.TransformerID(internal.EslVersion)) + + if r.DB.ShouldUseOtherTables() { + // if we were previously running with `db.writeEslTableOnly=true`, but now are running with + // `db.writeEslTableOnly=false` (which is the recommended way to enable the database), + // then we would have many events in the event_sourcing_light table that have not been processed. + // So, if there is no cutoff yet, we write the current transformer to the cutoff, so it doesn't get processed: + + eslVersion, err := db.DBReadCutoff(r.DB, ctx, transaction) + if err != nil { + applyErr := TransformerBatchApplyError{ + TransformerError: err, + Index: i, + } + return nil, nil, nil, &applyErr + } + // only write if there is no cutoff yet: + if eslVersion == nil { + err = db.DBWriteCutoff(r.DB, ctx, transaction, internal.EslVersion) + if err != nil { + applyErr := TransformerBatchApplyError{ + TransformerError: err, + Index: i, + } + return nil, nil, nil, &applyErr + } + } + } } if msg, subChanges, err := RunTransformer(ctxWithTime, t, state, transaction); err != nil { diff --git a/services/manifest-repo-export-service/pkg/cmd/server.go b/services/manifest-repo-export-service/pkg/cmd/server.go index 1755b6ab0..e8dce2d95 100755 --- a/services/manifest-repo-export-service/pkg/cmd/server.go +++ b/services/manifest-repo-export-service/pkg/cmd/server.go @@ -20,7 +20,6 @@ import ( "context" "database/sql" "github.com/cenkalti/backoff/v4" - cutoff "github.com/freiheit-com/kuberpult/services/manifest-repo-export-service/pkg/db" "strconv" "time" @@ -297,7 +296,7 @@ func processEsls(ctx context.Context, repo repository.Repository, dbHandler *db. if err3 != nil { return err3 } - return cutoff.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion) + return db.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion) }) if err2 != nil { return fmt.Errorf("error in DBWriteFailedEslEvent %v", err2) @@ -313,7 +312,7 @@ func processEsls(ctx context.Context, repo repository.Repository, dbHandler *db. } log.Infof("event processed successfully, now writing to cutoff and pushing...") err = dbHandler.WithTransactionR(ctx, 2, false, func(ctx context.Context, transaction *sql.Tx) error { - err2 := cutoff.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion) + err2 := db.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion) if err2 != nil { return err2 } @@ -354,7 +353,7 @@ func measurePushes(ddMetrics statsd.ClientInterface, log *zap.SugaredLogger, fai func handleOneEvent(ctx context.Context, transaction *sql.Tx, dbHandler *db.DBHandler, ddMetrics statsd.ClientInterface, repo repository.Repository) (repository.Transformer, *db.EslEventRow, error) { log := logger.FromContext(ctx).Sugar() - eslVersion, err := cutoff.DBReadCutoff(dbHandler, ctx, transaction) + eslVersion, err := db.DBReadCutoff(dbHandler, ctx, transaction) if err != nil { return nil, nil, fmt.Errorf("error in DBReadCutoff %v", err) } From ea393c20459ad4ca7ac6762e62fa9ca624cd7ea1 Mon Sep 17 00:00:00 2001 From: Miguel Crespo Date: Wed, 28 Aug 2024 12:56:23 +0100 Subject: [PATCH 2/3] Fix cutoff --- services/cd-service/pkg/repository/repository.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index b4a5ce52e..43feb0e43 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -855,7 +855,7 @@ func (r *repository) ApplyTransformersInternal(ctx context.Context, transaction } t.SetEslVersion(db.TransformerID(internal.EslVersion)) - if r.DB.ShouldUseOtherTables() { + if r.DB.WriteEslOnly { // if we were previously running with `db.writeEslTableOnly=true`, but now are running with // `db.writeEslTableOnly=false` (which is the recommended way to enable the database), // then we would have many events in the event_sourcing_light table that have not been processed. From 10c16c07657599f8a8843f65c79853bc5102cdf2 Mon Sep 17 00:00:00 2001 From: Miguel Crespo Date: Fri, 30 Aug 2024 11:34:16 +0100 Subject: [PATCH 3/3] fix cutoff and update --- .../cd-service/pkg/repository/repository.go | 20 ++++++------------- 1 file changed, 6 insertions(+), 14 deletions(-) diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index 43feb0e43..e969240d2 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -855,13 +855,15 @@ func (r *repository) ApplyTransformersInternal(ctx context.Context, transaction } t.SetEslVersion(db.TransformerID(internal.EslVersion)) - if r.DB.WriteEslOnly { + if r.DB != nil && r.DB.WriteEslOnly { // if we were previously running with `db.writeEslTableOnly=true`, but now are running with // `db.writeEslTableOnly=false` (which is the recommended way to enable the database), // then we would have many events in the event_sourcing_light table that have not been processed. - // So, if there is no cutoff yet, we write the current transformer to the cutoff, so it doesn't get processed: + // So, we write the cutoff if we are only writing to the esl table. Then, when the database is fully + // enabled, the cutoff is found and determined to be the latest transformer. When this happens, + // the export service takes over the duties of writing the cutoff - eslVersion, err := db.DBReadCutoff(r.DB, ctx, transaction) + err = db.DBWriteCutoff(r.DB, ctx, transaction, internal.EslVersion) if err != nil { applyErr := TransformerBatchApplyError{ TransformerError: err, @@ -869,17 +871,7 @@ func (r *repository) ApplyTransformersInternal(ctx context.Context, transaction } return nil, nil, nil, &applyErr } - // only write if there is no cutoff yet: - if eslVersion == nil { - err = db.DBWriteCutoff(r.DB, ctx, transaction, internal.EslVersion) - if err != nil { - applyErr := TransformerBatchApplyError{ - TransformerError: err, - Index: i, - } - return nil, nil, nil, &applyErr - } - } + } }