diff --git a/services/manifest-repo-export-service/pkg/db/cutoff.go b/pkg/db/cutoff.go similarity index 86% rename from services/manifest-repo-export-service/pkg/db/cutoff.go rename to pkg/db/cutoff.go index ee999e8c6..804b61da3 100644 --- a/services/manifest-repo-export-service/pkg/db/cutoff.go +++ b/pkg/db/cutoff.go @@ -14,7 +14,7 @@ along with kuberpult. If not, see Copyright freiheit.com*/ -package cutoff +package db import ( "context" @@ -23,12 +23,11 @@ import ( "fmt" "time" - "github.com/freiheit-com/kuberpult/pkg/db" "github.com/freiheit-com/kuberpult/pkg/logger" "gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer" ) -func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVersion, error) { +func DBReadCutoff(h *DBHandler, ctx context.Context, tx *sql.Tx) (*EslVersion, error) { span, _ := tracer.StartSpanFromContext(ctx, "DBReadCutoff") defer span.Finish() @@ -48,8 +47,8 @@ func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVers } }(rows) - var eslVersion db.EslVersion - var eslVersionPtr *db.EslVersion = nil + var eslVersion EslVersion + var eslVersionPtr *EslVersion = nil if rows.Next() { err := rows.Scan(&eslVersion) if err != nil { @@ -71,7 +70,7 @@ func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVers return eslVersionPtr, nil } -func DBWriteCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx, eslVersion db.EslVersion) error { +func DBWriteCutoff(h *DBHandler, ctx context.Context, tx *sql.Tx, eslVersion EslVersion) error { span, _ := tracer.StartSpanFromContext(ctx, "DBWriteCutoff") defer span.Finish() diff --git a/services/manifest-repo-export-service/pkg/db/cutoff_test.go b/pkg/db/cutoff_test.go similarity index 72% rename from services/manifest-repo-export-service/pkg/db/cutoff_test.go rename to pkg/db/cutoff_test.go index ac3d47704..1f19cbc10 100644 --- a/services/manifest-repo-export-service/pkg/db/cutoff_test.go +++ b/pkg/db/cutoff_test.go @@ -14,7 +14,7 @@ along with kuberpult. If not, see Copyright freiheit.com*/ -package cutoff +package db import ( "context" @@ -23,7 +23,6 @@ import ( "fmt" "testing" - "github.com/freiheit-com/kuberpult/pkg/db" "github.com/freiheit-com/kuberpult/pkg/testutil" "github.com/google/go-cmp/cmp" ) @@ -34,17 +33,17 @@ type EmptyTransformer struct{} func TestTransformerWritesEslDataRoundTrip(t *testing.T) { tcs := []struct { Name string - eslVersion []db.EslVersion - ExpectedEslVersion db.EslVersion + eslVersion []EslVersion + ExpectedEslVersion EslVersion }{ { Name: "test with one write operation", - eslVersion: []db.EslVersion{1}, + eslVersion: []EslVersion{1}, ExpectedEslVersion: 1, }, { Name: "test with multiple write operations", - eslVersion: []db.EslVersion{1, 2, 3, 4, 5}, + eslVersion: []EslVersion{1, 2, 3, 4, 5}, ExpectedEslVersion: 5, }, } @@ -63,7 +62,7 @@ func TestTransformerWritesEslDataRoundTrip(t *testing.T) { i := 0 for i < len(tc.eslVersion) { //Write bogus transformer for FK reasons - err := dbHandler.DBWriteEslEventInternal(ctx, "empty", transaction, interface{}(tf), db.ESLMetadata{}) + err := dbHandler.DBWriteEslEventInternal(ctx, "empty", transaction, interface{}(tf), ESLMetadata{}) if err != nil { return err } @@ -100,29 +99,3 @@ func TestTransformerWritesEslDataRoundTrip(t *testing.T) { }) } } - -// setupDB returns a new DBHandler with a tmp directory every time, so tests can are completely independent -func setupDB(t *testing.T) *db.DBHandler { - ctx := context.Background() - dir, err := testutil.CreateMigrationsPath(4) - tmpDir := t.TempDir() - t.Logf("directory for DB migrations: %s", dir) - t.Logf("tmp dir for DB data: %s", tmpDir) - cfg := db.DBConfig{ - MigrationsPath: dir, - DriverName: "sqlite3", - DbHost: tmpDir, - } - - migErr := db.RunDBMigrations(ctx, cfg) - if migErr != nil { - t.Fatal(migErr) - } - - dbHandler, err := db.Connect(ctx, cfg) - if err != nil { - t.Fatal(err) - } - - return dbHandler -} diff --git a/services/cd-service/pkg/repository/repository.go b/services/cd-service/pkg/repository/repository.go index c3d97cc64..e969240d2 100644 --- a/services/cd-service/pkg/repository/repository.go +++ b/services/cd-service/pkg/repository/repository.go @@ -854,6 +854,25 @@ func (r *repository) ApplyTransformersInternal(ctx context.Context, transaction } } t.SetEslVersion(db.TransformerID(internal.EslVersion)) + + if r.DB != nil && r.DB.WriteEslOnly { + // if we were previously running with `db.writeEslTableOnly=true`, but now are running with + // `db.writeEslTableOnly=false` (which is the recommended way to enable the database), + // then we would have many events in the event_sourcing_light table that have not been processed. + // So, we write the cutoff if we are only writing to the esl table. Then, when the database is fully + // enabled, the cutoff is found and determined to be the latest transformer. When this happens, + // the export service takes over the duties of writing the cutoff + + err = db.DBWriteCutoff(r.DB, ctx, transaction, internal.EslVersion) + if err != nil { + applyErr := TransformerBatchApplyError{ + TransformerError: err, + Index: i, + } + return nil, nil, nil, &applyErr + } + + } } if msg, subChanges, err := RunTransformer(ctxWithTime, t, state, transaction); err != nil { diff --git a/services/manifest-repo-export-service/pkg/cmd/server.go b/services/manifest-repo-export-service/pkg/cmd/server.go index 1755b6ab0..e8dce2d95 100755 --- a/services/manifest-repo-export-service/pkg/cmd/server.go +++ b/services/manifest-repo-export-service/pkg/cmd/server.go @@ -20,7 +20,6 @@ import ( "context" "database/sql" "github.com/cenkalti/backoff/v4" - cutoff "github.com/freiheit-com/kuberpult/services/manifest-repo-export-service/pkg/db" "strconv" "time" @@ -297,7 +296,7 @@ func processEsls(ctx context.Context, repo repository.Repository, dbHandler *db. if err3 != nil { return err3 } - return cutoff.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion) + return db.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion) }) if err2 != nil { return fmt.Errorf("error in DBWriteFailedEslEvent %v", err2) @@ -313,7 +312,7 @@ func processEsls(ctx context.Context, repo repository.Repository, dbHandler *db. } log.Infof("event processed successfully, now writing to cutoff and pushing...") err = dbHandler.WithTransactionR(ctx, 2, false, func(ctx context.Context, transaction *sql.Tx) error { - err2 := cutoff.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion) + err2 := db.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion) if err2 != nil { return err2 } @@ -354,7 +353,7 @@ func measurePushes(ddMetrics statsd.ClientInterface, log *zap.SugaredLogger, fai func handleOneEvent(ctx context.Context, transaction *sql.Tx, dbHandler *db.DBHandler, ddMetrics statsd.ClientInterface, repo repository.Repository) (repository.Transformer, *db.EslEventRow, error) { log := logger.FromContext(ctx).Sugar() - eslVersion, err := cutoff.DBReadCutoff(dbHandler, ctx, transaction) + eslVersion, err := db.DBReadCutoff(dbHandler, ctx, transaction) if err != nil { return nil, nil, fmt.Errorf("error in DBReadCutoff %v", err) }