Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix(db): write to cutoff table if writeEslTableOnly=true #1893

Merged
merged 3 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ along with kuberpult. If not, see <https://directory.fsf.org/wiki/License:Expat>

Copyright freiheit.com*/

package cutoff
package db

import (
"context"
Expand All @@ -23,12 +23,11 @@ import (
"fmt"
"time"

"github.com/freiheit-com/kuberpult/pkg/db"
"github.com/freiheit-com/kuberpult/pkg/logger"
"gopkg.in/DataDog/dd-trace-go.v1/ddtrace/tracer"
)

func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVersion, error) {
func DBReadCutoff(h *DBHandler, ctx context.Context, tx *sql.Tx) (*EslVersion, error) {
span, _ := tracer.StartSpanFromContext(ctx, "DBReadCutoff")
defer span.Finish()

Expand All @@ -48,8 +47,8 @@ func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVers
}
}(rows)

var eslVersion db.EslVersion
var eslVersionPtr *db.EslVersion = nil
var eslVersion EslVersion
var eslVersionPtr *EslVersion = nil
if rows.Next() {
err := rows.Scan(&eslVersion)
if err != nil {
Expand All @@ -71,7 +70,7 @@ func DBReadCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx) (*db.EslVers
return eslVersionPtr, nil
}

func DBWriteCutoff(h *db.DBHandler, ctx context.Context, tx *sql.Tx, eslVersion db.EslVersion) error {
func DBWriteCutoff(h *DBHandler, ctx context.Context, tx *sql.Tx, eslVersion EslVersion) error {
span, _ := tracer.StartSpanFromContext(ctx, "DBWriteCutoff")
defer span.Finish()

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@ along with kuberpult. If not, see <https://directory.fsf.org/wiki/License:Expat>

Copyright freiheit.com*/

package cutoff
package db

import (
"context"
Expand All @@ -23,7 +23,6 @@ import (
"fmt"
"testing"

"github.com/freiheit-com/kuberpult/pkg/db"
"github.com/freiheit-com/kuberpult/pkg/testutil"
"github.com/google/go-cmp/cmp"
)
Expand All @@ -34,17 +33,17 @@ type EmptyTransformer struct{}
func TestTransformerWritesEslDataRoundTrip(t *testing.T) {
tcs := []struct {
Name string
eslVersion []db.EslVersion
ExpectedEslVersion db.EslVersion
eslVersion []EslVersion
ExpectedEslVersion EslVersion
}{
{
Name: "test with one write operation",
eslVersion: []db.EslVersion{1},
eslVersion: []EslVersion{1},
ExpectedEslVersion: 1,
},
{
Name: "test with multiple write operations",
eslVersion: []db.EslVersion{1, 2, 3, 4, 5},
eslVersion: []EslVersion{1, 2, 3, 4, 5},
ExpectedEslVersion: 5,
},
}
Expand All @@ -63,7 +62,7 @@ func TestTransformerWritesEslDataRoundTrip(t *testing.T) {
i := 0
for i < len(tc.eslVersion) {
//Write bogus transformer for FK reasons
err := dbHandler.DBWriteEslEventInternal(ctx, "empty", transaction, interface{}(tf), db.ESLMetadata{})
err := dbHandler.DBWriteEslEventInternal(ctx, "empty", transaction, interface{}(tf), ESLMetadata{})
if err != nil {
return err
}
Expand Down Expand Up @@ -100,29 +99,3 @@ func TestTransformerWritesEslDataRoundTrip(t *testing.T) {
})
}
}

// setupDB returns a new DBHandler with a tmp directory every time, so tests can are completely independent
func setupDB(t *testing.T) *db.DBHandler {
ctx := context.Background()
dir, err := testutil.CreateMigrationsPath(4)
tmpDir := t.TempDir()
t.Logf("directory for DB migrations: %s", dir)
t.Logf("tmp dir for DB data: %s", tmpDir)
cfg := db.DBConfig{
MigrationsPath: dir,
DriverName: "sqlite3",
DbHost: tmpDir,
}

migErr := db.RunDBMigrations(ctx, cfg)
if migErr != nil {
t.Fatal(migErr)
}

dbHandler, err := db.Connect(ctx, cfg)
if err != nil {
t.Fatal(err)
}

return dbHandler
}
19 changes: 19 additions & 0 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -854,6 +854,25 @@ func (r *repository) ApplyTransformersInternal(ctx context.Context, transaction
}
}
t.SetEslVersion(db.TransformerID(internal.EslVersion))

if r.DB != nil && r.DB.WriteEslOnly {
// if we were previously running with `db.writeEslTableOnly=true`, but now are running with
// `db.writeEslTableOnly=false` (which is the recommended way to enable the database),
// then we would have many events in the event_sourcing_light table that have not been processed.
// So, we write the cutoff if we are only writing to the esl table. Then, when the database is fully
// enabled, the cutoff is found and determined to be the latest transformer. When this happens,
// the export service takes over the duties of writing the cutoff

err = db.DBWriteCutoff(r.DB, ctx, transaction, internal.EslVersion)
if err != nil {
applyErr := TransformerBatchApplyError{
TransformerError: err,
Index: i,
}
return nil, nil, nil, &applyErr
}

}
}

if msg, subChanges, err := RunTransformer(ctxWithTime, t, state, transaction); err != nil {
Expand Down
7 changes: 3 additions & 4 deletions services/manifest-repo-export-service/pkg/cmd/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,6 @@ import (
"context"
"database/sql"
"github.com/cenkalti/backoff/v4"
cutoff "github.com/freiheit-com/kuberpult/services/manifest-repo-export-service/pkg/db"
"strconv"
"time"

Expand Down Expand Up @@ -297,7 +296,7 @@ func processEsls(ctx context.Context, repo repository.Repository, dbHandler *db.
if err3 != nil {
return err3
}
return cutoff.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion)
return db.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion)
})
if err2 != nil {
return fmt.Errorf("error in DBWriteFailedEslEvent %v", err2)
Expand All @@ -313,7 +312,7 @@ func processEsls(ctx context.Context, repo repository.Repository, dbHandler *db.
}
log.Infof("event processed successfully, now writing to cutoff and pushing...")
err = dbHandler.WithTransactionR(ctx, 2, false, func(ctx context.Context, transaction *sql.Tx) error {
err2 := cutoff.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion)
err2 := db.DBWriteCutoff(dbHandler, ctx, transaction, esl.EslVersion)
if err2 != nil {
return err2
}
Expand Down Expand Up @@ -354,7 +353,7 @@ func measurePushes(ddMetrics statsd.ClientInterface, log *zap.SugaredLogger, fai

func handleOneEvent(ctx context.Context, transaction *sql.Tx, dbHandler *db.DBHandler, ddMetrics statsd.ClientInterface, repo repository.Repository) (repository.Transformer, *db.EslEventRow, error) {
log := logger.FromContext(ctx).Sugar()
eslVersion, err := cutoff.DBReadCutoff(dbHandler, ctx, transaction)
eslVersion, err := db.DBReadCutoff(dbHandler, ctx, transaction)
if err != nil {
return nil, nil, fmt.Errorf("error in DBReadCutoff %v", err)
}
Expand Down
Loading