Skip to content

Commit

Permalink
chore(db): queue application version transformer migration to database (
Browse files Browse the repository at this point in the history
  • Loading branch information
miguel-crespo-fdc authored Jun 18, 2024
1 parent 93002a4 commit d8edd5a
Show file tree
Hide file tree
Showing 10 changed files with 757 additions and 97 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,9 @@
CREATE TABLE IF NOT EXISTS deployment_attempts
(
eslVersion INTEGER, -- internal ID for ESL
created TIMESTAMP,
envName VARCHAR,
appName VARCHAR,
queuedReleaseVersion BIGINT NULL,
PRIMARY KEY(eslVersion, appName, envName)
);
266 changes: 265 additions & 1 deletion pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -901,10 +901,12 @@ type GetAllAppLocksFun = func(ctx context.Context) (AllAppLocks, error)

type AllAppLocks map[string]map[string][]ApplicationLock // EnvName-> AppName -> []Locks
type AllTeamLocks map[string]map[string][]TeamLock // EnvName-> Team -> []Locks
type AllQueuedVersions map[string]map[string]*int64 // EnvName-> AppName -> queuedVersion

type GetAllEnvLocksFun = func(ctx context.Context) (AllEnvLocks, error)
type GetAllTeamLocksFun = func(ctx context.Context) (AllTeamLocks, error)
type GetAllReleasesFun = func(ctx context.Context, app string) (AllReleases, error)
type GetAllQueuedVersionsFun = func(ctx context.Context) (AllQueuedVersions, error)

// GetAllAppsFun returns a map where the Key is an app name, and the value is a team name of that app
type GetAllAppsFun = func() (map[string]string, error)
Expand All @@ -917,6 +919,7 @@ func (h *DBHandler) RunCustomMigrations(
getAllEnvLocksFun GetAllEnvLocksFun,
getAllAppLocksFun GetAllAppLocksFun,
getAllTeamLocksFun GetAllTeamLocksFun,
getAllQueuedVersionsFun GetAllQueuedVersionsFun,
) error {
span, ctx := tracer.StartSpanFromContext(ctx, "RunCustomMigrations")
defer span.Finish()
Expand Down Expand Up @@ -948,6 +951,10 @@ func (h *DBHandler) RunCustomMigrations(
if err != nil {
return err
}
err = h.RunCustomMigrationQueuedApplicationVersions(ctx, getAllQueuedVersionsFun)
if err != nil {
return err
}
return nil
}

Expand Down Expand Up @@ -1495,6 +1502,37 @@ func (h *DBHandler) RunCustomMigrationTeamLocks(ctx context.Context, getAllTeamL
})
}

func (h *DBHandler) RunCustomMigrationQueuedApplicationVersions(ctx context.Context, getAllQueuedVersionsFun GetAllQueuedVersionsFun) error {
return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error {
l := logger.FromContext(ctx).Sugar()
allTeamLocksDb, err := h.DBSelectAnyDeploymentAttempt(ctx, transaction)
if err != nil {
l.Infof("could not get queued deployments friom database - assuming the manifest repo is correct: %v", err)
allTeamLocksDb = nil
}
if allTeamLocksDb != nil {
l.Infof("There are already queued deployments in the DB - skipping migrations")
return nil
}

allQueuedVersionsInRepo, err := getAllQueuedVersionsFun(ctx)
if err != nil {
return fmt.Errorf("could not get current queued versions to run custom migrations: %v", err)
}

for envName, apps := range allQueuedVersionsInRepo {
for appName, v := range apps {
err := h.DBWriteDeploymentAttempt(ctx, transaction, envName, appName, v)
if err != nil {
return fmt.Errorf("error writing existing queued application version '%d' to DB for app '%s' on environment '%s': %v",
*v, appName, envName, err)
}
}
}
return nil
})
}

func (h *DBHandler) RunCustomMigrationAllAppsTable(ctx context.Context, getAllAppsFun GetAllAppsFun) error {
return h.WithTransaction(ctx, func(ctx context.Context, transaction *sql.Tx) error {
l := logger.FromContext(ctx).Sugar()
Expand Down Expand Up @@ -2817,7 +2855,6 @@ func (h *DBHandler) DBSelectTeamLock(ctx context.Context, tx *sql.Tx, environmen
}
return nil, nil // no rows, but also no error
}

func (h *DBHandler) DBSelectAllTeamLocks(ctx context.Context, tx *sql.Tx, environment, teamName string) (*AllTeamLocksGo, error) {
if h == nil {
return nil, nil
Expand Down Expand Up @@ -3081,3 +3118,230 @@ func (h *DBHandler) processAllTeamLocksRow(ctx context.Context, err error, rows
}
return result, nil
}

type QueuedDeployment struct {
EslVersion EslId
Created time.Time
Env string
App string
Version *int64
}

func (h *DBHandler) DBSelectAnyDeploymentAttempt(ctx context.Context, tx *sql.Tx) (*QueuedDeployment, error) {
if h == nil {
return nil, nil
}
if tx == nil {
return nil, fmt.Errorf("DBSelectAnyDeploymentAttempt: no transaction provided")
}
span, _ := tracer.StartSpanFromContext(ctx, "DBSelectAnyDeploymentAttempt")
defer span.Finish()

insertQuery := h.AdaptQuery(
"SELECT eslVersion, created, envName, appName, queuedReleaseVersion FROM deployment_attempts ORDER BY eslVersion DESC LIMIT 1;")

span.SetTag("query", insertQuery)
rows, err := tx.QueryContext(
ctx,
insertQuery)
return h.processDeploymentAttemptsRow(ctx, rows, err)
}

func (h *DBHandler) DBSelectDeploymentAttemptHistory(ctx context.Context, tx *sql.Tx, environmentName, appName string, limit int) ([]QueuedDeployment, error) {
if h == nil {
return nil, nil
}
if tx == nil {
return nil, fmt.Errorf("DBSelectDeploymentAttemptHistory: no transaction provided")
}
span, _ := tracer.StartSpanFromContext(ctx, "DBSelectDeploymentAttemptHistory")
defer span.Finish()

insertQuery := h.AdaptQuery(
"SELECT eslVersion, created, envName, appName, queuedReleaseVersion FROM deployment_attempts WHERE envName=? AND appName=? ORDER BY eslVersion DESC LIMIT ?;")

span.SetTag("query", insertQuery)
rows, err := tx.QueryContext(
ctx,
insertQuery,
environmentName,
appName, limit)

if err != nil {
return nil, fmt.Errorf("could not query deployment attempts table from DB. Error: %w\n", err)
}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
logger.FromContext(ctx).Sugar().Warnf("row closing error: %v", err)
}
}(rows)

queuedDeployments := make([]QueuedDeployment, 0)
for rows.Next() {
row, err := h.processSingleDeploymentAttemptsRow(rows)
if err != nil {
return nil, err
}
queuedDeployments = append(queuedDeployments, QueuedDeployment{
EslVersion: row.EslVersion,
Created: row.Created,
Env: row.Env,
App: row.App,
Version: row.Version,
})
}
err = closeRows(rows)
if err != nil {
return nil, err
}
return queuedDeployments, nil
}

func (h *DBHandler) DBSelectLatestDeploymentAttempt(ctx context.Context, tx *sql.Tx, environmentName, appName string) (*QueuedDeployment, error) {
if h == nil {
return nil, nil
}
if tx == nil {
return nil, fmt.Errorf("DBSelectLatestDeploymentAttempt: no transaction provided")
}
span, _ := tracer.StartSpanFromContext(ctx, "DBSelectLatestDeploymentAttempt")
defer span.Finish()

insertQuery := h.AdaptQuery(
"SELECT eslVersion, created, envName, appName, queuedReleaseVersion FROM deployment_attempts WHERE envName=? AND appName=? ORDER BY eslVersion DESC LIMIT 1;")

span.SetTag("query", insertQuery)
rows, err := tx.QueryContext(
ctx,
insertQuery,
environmentName,
appName)
return h.processDeploymentAttemptsRow(ctx, rows, err)
}

func (h *DBHandler) DBWriteDeploymentAttempt(ctx context.Context, tx *sql.Tx, envName, appName string, version *int64) error {
if h == nil {
return nil
}
if tx == nil {
return fmt.Errorf("DBWriteDeploymentAttempt: no transaction provided")
}
span, _ := tracer.StartSpanFromContext(ctx, "DBWriteDeploymentAttempt")
defer span.Finish()
return h.dbWriteDeploymentAttemptInternal(ctx, tx, &QueuedDeployment{
EslVersion: 0,
Created: time.Time{},
Env: envName,
App: appName,
Version: version,
})
}

func (h *DBHandler) DBDeleteDeploymentAttempt(ctx context.Context, tx *sql.Tx, envName, appName string) error {
if h == nil {
return nil
}
if tx == nil {
return fmt.Errorf("DBDeleteDeploymentAttempt: no transaction provided")
}
span, _ := tracer.StartSpanFromContext(ctx, "DBWriteDeploymentAttempt")
defer span.Finish()

return h.dbWriteDeploymentAttemptInternal(ctx, tx, &QueuedDeployment{
EslVersion: 0,
Created: time.Time{},
Env: envName,
App: appName,
Version: nil,
})
}

func (h *DBHandler) dbWriteDeploymentAttemptInternal(ctx context.Context, tx *sql.Tx, deployment *QueuedDeployment) error {
if h == nil {
return nil
}
if tx == nil {
return fmt.Errorf("dbWriteDeploymentAttemptInternal: no transaction provided")
}
span, _ := tracer.StartSpanFromContext(ctx, "dbWriteDeploymentAttemptInternal")
defer span.Finish()

latestDeployment, err := h.DBSelectLatestDeploymentAttempt(ctx, tx, deployment.Env, deployment.App)

if err != nil {
return fmt.Errorf("Could not get latest deployment attempt from deployments table")
}
var previousEslVersion EslId

if latestDeployment == nil {
previousEslVersion = 0
} else {
previousEslVersion = latestDeployment.EslVersion
}
nullVersion := NewNullInt(deployment.Version)

insertQuery := h.AdaptQuery(
"INSERT INTO deployment_attempts (eslVersion, created, envName, appName, queuedReleaseVersion) VALUES (?, ?, ?, ?, ?);")

span.SetTag("query", insertQuery)
_, err = tx.Exec(
insertQuery,
previousEslVersion+1,
time.Now(),
deployment.Env,
deployment.App,
nullVersion)

if err != nil {
return fmt.Errorf("could not write deployment attempts table in DB. Error: %w\n", err)
}
return nil
}

func (h *DBHandler) processDeploymentAttemptsRow(ctx context.Context, rows *sql.Rows, err error) (*QueuedDeployment, error) {
if err != nil {
return nil, fmt.Errorf("could not query deployment attempts table from DB. Error: %w\n", err)

}
defer func(rows *sql.Rows) {
err := rows.Close()
if err != nil {
logger.FromContext(ctx).Sugar().Warnf("row closing error: %v", err)
}
}(rows)
var row *QueuedDeployment
if rows.Next() {
row, err = h.processSingleDeploymentAttemptsRow(rows)
if err != nil {
return nil, err
}
}
err = closeRows(rows)
if err != nil {
return nil, err
}
return row, nil
}

// processSingleDeploymentAttemptsRow only processes the row. It assumes that there is an element ready to be processed in rows.
func (h *DBHandler) processSingleDeploymentAttemptsRow(rows *sql.Rows) (*QueuedDeployment, error) {
//exhaustruct:ignore

var row = QueuedDeployment{}
var releaseVersion sql.NullInt64

err := rows.Scan(&row.EslVersion, &row.Created, &row.Env, &row.App, &releaseVersion)
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("Error scanning deployment attempts row from DB. Error: %w\n", err)
}

if releaseVersion.Valid {
row.Version = &releaseVersion.Int64
}
return &row, nil

}
Loading

0 comments on commit d8edd5a

Please sign in to comment.