Skip to content

Commit

Permalink
Merge branch 'main' into dn/react_warnings_ci
Browse files Browse the repository at this point in the history
  • Loading branch information
diogo-nogueira-freiheit authored Oct 10, 2024
2 parents 15cb04a + 701af31 commit 055e3c5
Show file tree
Hide file tree
Showing 6 changed files with 165 additions and 32 deletions.
100 changes: 87 additions & 13 deletions pkg/db/db.go
Original file line number Diff line number Diff line change
Expand Up @@ -555,7 +555,7 @@ type DBAllReleasesWithMetaData struct {

func (h *DBHandler) DBSelectAnyRelease(ctx context.Context, tx *sql.Tx, ignorePrepublishes bool) (*DBReleaseWithMetaData, error) {
selectQuery := h.AdaptQuery(fmt.Sprintf(
"SELECT eslVersion, created, appName, metadata, manifests, releaseVersion, deleted, environments " +
"SELECT eslVersion, created, appName, metadata, releaseVersion, deleted, environments " +
" FROM releases " +
" LIMIT 1;"))
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectAnyRelease")
Expand All @@ -566,7 +566,7 @@ func (h *DBHandler) DBSelectAnyRelease(ctx context.Context, tx *sql.Tx, ignorePr
ctx,
selectQuery,
)
processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrepublishes)
processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrepublishes, false)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -608,13 +608,79 @@ func (h *DBHandler) DBSelectReleasesWithoutEnvironments(ctx context.Context, tx
LIMIT 100;
`)
rows, err := tx.QueryContext(ctx, selectQuery)
processedRows, err := h.processReleaseRows(ctx, err, rows, true)
processedRows, err := h.processReleaseRows(ctx, err, rows, true, true)
if err != nil {
return nil, err
}
return processedRows, nil
}

func (h *DBHandler) DBSelectReleasesByVersions(ctx context.Context, tx *sql.Tx, app string, releaseVersions []uint64, ignorePrePublishes bool) ([]*DBReleaseWithMetaData, error) {
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectReleasesByVersions")
defer span.Finish()

if len(releaseVersions) == 0 {
return []*DBReleaseWithMetaData{}, nil
}
repeatedQuestionMarks := strings.Repeat(",?", len(releaseVersions)-1)
selectQuery := h.AdaptQuery(`
SELECT DISTINCT
releases.eslVersion,
releases.created,
releases.appName,
releases.metadata,
releases.releaseVersion,
releases.deleted,
releases.environments
FROM
( SELECT
MAX(eslVersion) AS latestEslVersion,
appname,
releaseversion
FROM
"releases"
WHERE
appname=?
AND
releaseversion IN (?` + repeatedQuestionMarks + `)
GROUP BY
appname, releaseversion
) AS latest
JOIN
releases AS releases
ON
latest.latestEslVersion=releases.eslVersion
AND latest.releaseVersion=releases.releaseVersion
AND latest.appname=releases.appname
LIMIT ?
`)

span.SetTag("query", selectQuery)

args := []any{}
args = append(args, app)
for _, version := range releaseVersions {
args = append(args, version)
}
args = append(args, uint64(len(releaseVersions)))

rows, err := tx.QueryContext(
ctx,
selectQuery,
args...,
)

processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrePublishes, false)
if err != nil {
return nil, err
}
if len(processedRows) == 0 {
return nil, nil
}
return processedRows, nil
}

func (h *DBHandler) DBSelectReleaseByVersion(ctx context.Context, tx *sql.Tx, app string, releaseVersion uint64, ignorePrepublishes bool) (*DBReleaseWithMetaData, error) {
selectQuery := h.AdaptQuery(fmt.Sprintf(
"SELECT eslVersion, created, appName, metadata, manifests, releaseVersion, deleted, environments " +
Expand All @@ -632,7 +698,7 @@ func (h *DBHandler) DBSelectReleaseByVersion(ctx context.Context, tx *sql.Tx, ap
releaseVersion,
)

processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrepublishes)
processedRows, err := h.processReleaseRows(ctx, err, rows, ignorePrepublishes, true)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -737,14 +803,14 @@ func (h *DBHandler) DBSelectReleasesByApp(ctx context.Context, tx *sql.Tx, app s
deleted,
)

return h.processReleaseRows(ctx, err, rows, ignorePrepublishes)
return h.processReleaseRows(ctx, err, rows, ignorePrepublishes, true)
}

func (h *DBHandler) DBSelectReleasesByAppLatestEslVersion(ctx context.Context, tx *sql.Tx, app string, deleted bool, ignorePrepublishes bool) ([]*DBReleaseWithMetaData, error) {
span, ctx := tracer.StartSpanFromContext(ctx, "DBSelectReleasesByApp")
defer span.Finish()
selectQuery := h.AdaptQuery(fmt.Sprintf(
"SELECT eslVersion, created, appName, metadata, manifests, releaseVersion, deleted, environments " +
"SELECT eslVersion, created, appName, metadata, releaseVersion, deleted, environments " +
" FROM releases " +
" WHERE appName=? AND deleted=?" +
" ORDER BY eslVersion DESC, releaseVersion DESC, created DESC;"))
Expand All @@ -756,7 +822,7 @@ func (h *DBHandler) DBSelectReleasesByAppLatestEslVersion(ctx context.Context, t
deleted,
)

return h.processReleaseRows(ctx, err, rows, ignorePrepublishes)
return h.processReleaseRows(ctx, err, rows, ignorePrepublishes, false)
}

func (h *DBHandler) DBSelectAllReleasesOfApp(ctx context.Context, tx *sql.Tx, app string) (*DBAllReleasesWithMetaData, error) {
Expand Down Expand Up @@ -887,7 +953,8 @@ func (h *DBHandler) processAllReleasesRow(ctx context.Context, err error, rows *
}
return row, nil
}
func (h *DBHandler) processReleaseRows(ctx context.Context, err error, rows *sql.Rows, ignorePrepublishes bool) ([]*DBReleaseWithMetaData, error) {

func (h *DBHandler) processReleaseRows(ctx context.Context, err error, rows *sql.Rows, ignorePrepublishes bool, withManifests bool) ([]*DBReleaseWithMetaData, error) {
span, ctx := tracer.StartSpanFromContext(ctx, "processReleaseRows")
defer span.Finish()

Expand All @@ -909,12 +976,17 @@ func (h *DBHandler) processReleaseRows(ctx context.Context, err error, rows *sql
var metadataStr string
var manifestStr string
var environmentsStr sql.NullString
err := rows.Scan(&row.EslVersion, &row.Created, &row.App, &metadataStr, &manifestStr, &row.ReleaseNumber, &row.Deleted, &environmentsStr)
var err error
if withManifests {
err = rows.Scan(&row.EslVersion, &row.Created, &row.App, &metadataStr, &manifestStr, &row.ReleaseNumber, &row.Deleted, &environmentsStr)
} else {
err = rows.Scan(&row.EslVersion, &row.Created, &row.App, &metadataStr /*manifests*/, &row.ReleaseNumber, &row.Deleted, &environmentsStr)
}
if err != nil {
if errors.Is(err, sql.ErrNoRows) {
return nil, nil
}
return nil, fmt.Errorf("Error scanning releases row from DB. Error: %w\n", err)
return nil, fmt.Errorf("Error scanning releases row from DB withManifests=%v. Error: %w\n", withManifests, err)
}
if row.ReleaseNumber != lastSeenRelease {
lastSeenRelease = row.ReleaseNumber
Expand Down Expand Up @@ -942,9 +1014,11 @@ func (h *DBHandler) processReleaseRows(ctx context.Context, err error, rows *sql
var manifestData = DBReleaseManifests{
Manifests: map[string]string{},
}
err = json.Unmarshal(([]byte)(manifestStr), &manifestData)
if err != nil {
return nil, fmt.Errorf("Error during json unmarshal of manifests for releases. Error: %w. Data: %s\n", err, metadataStr)
if withManifests {
err = json.Unmarshal(([]byte)(manifestStr), &manifestData)
if err != nil {
return nil, fmt.Errorf("Error during json unmarshal of manifests for releases. Error: %w. Data: %s\n", err, metadataStr)
}
}
row.Manifests = manifestData
environments := make([]string, 0)
Expand Down
47 changes: 41 additions & 6 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -2512,16 +2512,15 @@ func (s *State) UpdateTopLevelAppInOverview(ctx context.Context, transaction *sq
}
rels = retrievedReleasesOfApp
}
for _, id := range rels {
if rel, err := s.GetApplicationRelease(ctx, transaction, appName, id); err != nil {
return err
} else {

if releasesInDb, err := s.GetApplicationReleasesDB(ctx, transaction, appName, rels); err != nil {
return err
} else {
for _, rel := range releasesInDb {
if rel == nil {
// ignore
} else {
release := rel.ToProto()
release.Version = id
release.UndeployVersion = rel.UndeployVersion
app.Releases = append(app.Releases, release)
}
}
Expand Down Expand Up @@ -3119,6 +3118,42 @@ func (s *State) IsUndeployVersionFromManifest(application string, version uint64
return true, nil
}

func (s *State) GetApplicationReleasesDB(ctx context.Context, transaction *sql.Tx, application string, versions []uint64) ([]*Release, error) {
var result []*Release
if s.DBHandler.ShouldUseOtherTables() {
rels, err := s.DBHandler.DBSelectReleasesByVersions(ctx, transaction, application, versions, true)
if err != nil {
return nil, fmt.Errorf("could not get release of app %s: %v", application, err)
}
if rels == nil {
return nil, nil
}
for _, rel := range rels {
r := &Release{
Version: rel.ReleaseNumber,
UndeployVersion: rel.Metadata.UndeployVersion,
SourceAuthor: rel.Metadata.SourceAuthor,
SourceCommitId: rel.Metadata.SourceCommitId,
SourceMessage: rel.Metadata.SourceMessage,
CreatedAt: rel.Created,
DisplayVersion: rel.Metadata.DisplayVersion,
IsMinor: rel.Metadata.IsMinor,
IsPrepublish: rel.Metadata.IsPrepublish,
}
result = append(result, r)
}
} else {
for i, v := range versions {
rel, err := s.GetApplicationRelease(ctx, transaction, application, v)
if err != nil {
return nil, fmt.Errorf("could not get release of app %s at index %d for version %v: %v", application, i, v, err)
}
result = append(result, rel)
}
}
return result, nil
}

func (s *State) GetApplicationRelease(ctx context.Context, transaction *sql.Tx, application string, version uint64) (*Release, error) {
if s.DBHandler.ShouldUseOtherTables() {
env, err := s.DBHandler.DBSelectReleaseByVersion(ctx, transaction, application, version, true)
Expand Down
24 changes: 15 additions & 9 deletions services/cd-service/pkg/repository/transformer.go
Original file line number Diff line number Diff line change
Expand Up @@ -3181,15 +3181,21 @@ func (c *DeployApplicationVersion) Transform(
} else {
ev := createReplacedByEvent(c.Application, c.Environment, newReleaseCommitId)
if s.DBHandler.ShouldUseOtherTables() {
gen := getGenerator(ctx)
eventUuid := gen.Generate()
oldReleaseCommitId, err := getCommitID(ctx, transaction, state, fs, uint64(*oldVersion), oldReleaseDir, c.Application)
if err != nil {
return "", GetCreateReleaseGeneralFailure(err)
}
err = state.DBHandler.DBWriteReplacedByEvent(ctx, transaction, c.TransformerEslVersion, eventUuid, oldReleaseCommitId, ev)
if err != nil {
return "", err
if oldVersion == nil {
logger.FromContext(ctx).Sugar().Errorf("did not find old version of app %s - skipping replaced-by event", c.Application)
} else {
gen := getGenerator(ctx)
eventUuid := gen.Generate()
v := uint64(*oldVersion)
oldReleaseCommitId, err := getCommitID(ctx, transaction, state, fs, v, oldReleaseDir, c.Application)
if err != nil {
logger.FromContext(ctx).Sugar().Warnf("could not find commit for release %d of app %s - skipping replaced-by event", v, c.Application)
} else {
err = state.DBHandler.DBWriteReplacedByEvent(ctx, transaction, c.TransformerEslVersion, eventUuid, oldReleaseCommitId, ev)
if err != nil {
return "", err
}
}
}
} else {
if err := addEventForRelease(ctx, fs, oldReleaseDir, ev); err != nil {
Expand Down
5 changes: 3 additions & 2 deletions services/cd-service/pkg/service/git_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ Copyright freiheit.com*/
package service

import (
"context"
"fmt"
"sort"
"testing"
Expand Down Expand Up @@ -237,7 +238,7 @@ func TestGetProductOverview(t *testing.T) {
if err != nil {
t.Fatalf("error setting up repository test: %v", err)
}
sv := &GitServer{OverviewService: &OverviewServiceServer{Repository: repo, Shutdown: shutdown}}
sv := &GitServer{OverviewService: &OverviewServiceServer{Repository: repo, Shutdown: shutdown, Context: context.Background()}}

for _, transformer := range tc.Setup {
repo.Apply(testutil.MakeTestContext(), transformer)
Expand Down Expand Up @@ -831,7 +832,7 @@ func TestGetCommitInfo(t *testing.T) {
DBHandler: repo.State().DBHandler,
}
sv := &GitServer{
OverviewService: &OverviewServiceServer{Repository: repo, Shutdown: shutdown},
OverviewService: &OverviewServiceServer{Repository: repo, Shutdown: shutdown, Context: context.Background()},
Config: config,
PageSize: uint64(pageSize),
}
Expand Down
2 changes: 2 additions & 0 deletions services/cd-service/pkg/service/overview_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -640,6 +640,7 @@ func TestOverviewService(t *testing.T) {
svc := &OverviewServiceServer{
Repository: repo,
Shutdown: shutdown,
Context: context.Background(),
}
tc.Test(t, svc)
if tc.DB {
Expand Down Expand Up @@ -972,6 +973,7 @@ func TestOverviewServiceFromCommit(t *testing.T) {
svc := &OverviewServiceServer{
Repository: repo,
Shutdown: shutdown,
Context: context.Background(),
}

ov, err := svc.GetOverview(testutil.MakeTestContext(), &api.GetOverviewRequest{})
Expand Down
19 changes: 17 additions & 2 deletions services/frontend-service/pkg/service/batch.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,12 @@ package service

import (
"context"
"errors"
"time"

api "github.com/freiheit-com/kuberpult/pkg/api/v1"
"github.com/freiheit-com/kuberpult/pkg/logger"
"go.uber.org/zap"
"google.golang.org/grpc"
)

Expand All @@ -32,9 +35,21 @@ type BatchServiceWithDefaultTimeout struct {
func (b *BatchServiceWithDefaultTimeout) ProcessBatch(ctx context.Context, req *api.BatchRequest, options ...grpc.CallOption) (*api.BatchResponse, error) {
var cancel context.CancelFunc
_, hasDeadline := ctx.Deadline()
kuberpultTimeoutError := errors.New("kuberpult batch client timeout exceeded")
if !hasDeadline {
ctx, cancel = context.WithTimeout(ctx, b.DefaultTimeout)
ctx, cancel = context.WithTimeoutCause(ctx, b.DefaultTimeout, kuberpultTimeoutError)
defer cancel()
}
return b.Inner.ProcessBatch(ctx, req, options...)

response, err := b.Inner.ProcessBatch(ctx, req, options...)

if ctx.Err() != nil {
if context.Cause(ctx) == kuberpultTimeoutError {
logger.FromContext(ctx).Warn("Context cancelled due to kuberpult timeout")
} else {
logger.FromContext(ctx).Warn("Context cancelled due %v", zap.Error(context.Cause(ctx)))
}
}

return response, err
}

0 comments on commit 055e3c5

Please sign in to comment.