Skip to content

Commit

Permalink
fix: introduce TransformerBatchApplyError ... (#1402)
Browse files Browse the repository at this point in the history
- to allow forwarding which transformer of a batch failed
- use it in ApplyTransformers and ApplyTransformersInternal
- remove gRPC errors from the low-level repository: gRPC errors should only be
  created at a high level, at the endpoint
- try to remove the (now?) rather pointless internal error type
- close the result channel and ensure only one error or nil is sent
- rename the too-generic type element to transformerBatch, and
applyElements to applyTransformerBatches
  • Loading branch information
bjoern-michaelsen-freiheit authored Mar 7, 2024
1 parent 7208187 commit 7d5332c
Show file tree
Hide file tree
Showing 8 changed files with 190 additions and 174 deletions.
18 changes: 0 additions & 18 deletions services/cd-service/pkg/repository/error.go
Original file line number Diff line number Diff line change
Expand Up @@ -98,24 +98,6 @@ func GetCreateReleaseAppNameTooLong(appName string, regExp string, maxLen uint32
}
}

// InternalError wraps an error that originated inside the repository layer.
// The wrapped error stays reachable through Unwrap.
type InternalError struct {
	inner error
}

// Compile-time check that *InternalError satisfies the error interface.
var _ error = (*InternalError)(nil)

// Error renders the wrapped error prefixed with "repository internal: ".
func (e *InternalError) Error() string {
	const format = "repository internal: %s"
	return fmt.Sprintf(format, e.inner)
}

// String returns the same text as Error.
func (e *InternalError) String() string {
	return e.Error()
}

// Unwrap returns the wrapped error.
func (e *InternalError) Unwrap() error {
	wrapped := e.inner
	return wrapped
}

type LockedError struct {
EnvironmentApplicationLocks map[string]Lock
EnvironmentLocks map[string]Lock
Expand Down
12 changes: 6 additions & 6 deletions services/cd-service/pkg/repository/queue.go
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ package repository
/**
This queue contains transformers. Do not confuse with the "queuedVersion" field in protobuf (api.proto).
The queue here is used because applying a change to git (pushing) takes some time.
Still, every request waits for the transformer AND push to finish (that's what the `result` channel is for in the "element struct" below).
Still, every request waits for the transformer AND push to finish (that's what the `result` channel is for in the "transformerBatch struct" below).
This queue improves the throughput when there are many parallel requests, because the "push" operation is done only once for multiple requests (a request here is essentially the same as a transformer).
Many parallel requests can happen in a CI with many microservices that all call the "release" endpoint almost at the same time.
This queue does not improve the latency, because each request still waits for the push to finish.
Expand All @@ -30,24 +30,24 @@ import (
)

type queue struct {
elements chan element
transformerBatches chan transformerBatch
}

type element struct {
type transformerBatch struct {
ctx context.Context
transformers []Transformer
result chan error
}

func (q *queue) add(ctx context.Context, transformers []Transformer) <-chan error {
resultChannel := make(chan error, 1)
e := element{
e := transformerBatch{
ctx: ctx,
transformers: transformers,
result: resultChannel,
}
select {
case q.elements <- e:
case q.transformerBatches <- e:
return resultChannel
case <-ctx.Done():
resultChannel <- ctx.Err()
Expand All @@ -57,6 +57,6 @@ func (q *queue) add(ctx context.Context, transformers []Transformer) <-chan erro

func makeQueue() queue {
return queue{
elements: make(chan element, 5),
transformerBatches: make(chan transformerBatch, 5),
}
}
134 changes: 82 additions & 52 deletions services/cd-service/pkg/repository/repository.go
Original file line number Diff line number Diff line change
Expand Up @@ -64,12 +64,27 @@ import (
type Repository interface {
Apply(ctx context.Context, transformers ...Transformer) error
Push(ctx context.Context, pushAction func() error) error
ApplyTransformersInternal(ctx context.Context, transformers ...Transformer) ([]string, *State, []*TransformerResult, error)
ApplyTransformersInternal(ctx context.Context, transformers ...Transformer) ([]string, *State, []*TransformerResult, *TransformerBatchApplyError)
State() *State
StateAt(oid *git.Oid) (*State, error)
Notify() *notify.Notify
}

// TransformerBatchApplyError reports why applying a batch of transformers
// failed and, where known, which transformer of the batch was responsible.
type TransformerBatchApplyError struct {
	TransformerError error // the error that caused the batch to fail. nil if no error happened
	Index            int   // the index of the transformer that caused the batch to fail or -1 if the error happened outside one specific transformer
}

// Error implements the error interface.
//
// A nil receiver yields a fixed placeholder message. A negative Index is
// reported as a failure not tied to a single transformer; otherwise the
// message names the index of the failing transformer.
func (err *TransformerBatchApplyError) Error() string {
	if err == nil {
		return "no transformer error!"
	}
	// Format the cause with %v rather than calling TransformerError.Error()
	// directly: TransformerError is documented as possibly nil, and a direct
	// method call on a nil interface would panic. For a non-nil error %v
	// prints exactly what Error() returns, so existing messages are unchanged.
	if err.Index < 0 {
		return fmt.Sprintf("error not specific to one transformer of this batch: %v", err.TransformerError)
	}
	return fmt.Sprintf("error at index %d of transformer batch: %v", err.Index, err.TransformerError)
}

func defaultBackOffProvider() backoff.BackOff {
eb := backoff.NewExponentialBackOff()
eb.MaxElapsedTime = 7 * time.Second
Expand Down Expand Up @@ -415,9 +430,10 @@ func New2(ctx context.Context, cfg RepositoryConfig) (Repository, setup.Backgrou

func (r *repository) ProcessQueue(ctx context.Context, health *setup.HealthReporter) error {
defer func() {
close(r.queue.elements)
for e := range r.queue.elements {
close(r.queue.transformerBatches)
for e := range r.queue.transformerBatches {
e.result <- ctx.Err()
close(e.result)
}
}()
tick := time.Tick(r.config.NetworkTimeout)
Expand All @@ -439,38 +455,39 @@ func (r *repository) ProcessQueue(ctx context.Context, health *setup.HealthRepor
// this triggers a for loop every `NetworkTimeout` to refresh the readiness
case <-ctx.Done():
return nil
case e := <-r.queue.elements:
case e := <-r.queue.transformerBatches:
r.ProcessQueueOnce(ctx, e, defaultPushUpdate, DefaultPushActionCallback)
}
}
}

func (r *repository) applyElements(elements []element, allowFetchAndReset bool) ([]element, error, *TransformerResult) {
func (r *repository) applyTransformerBatches(transformerBatches []transformerBatch, allowFetchAndReset bool) ([]transformerBatch, error, *TransformerResult) {
//exhaustruct:ignore
var changes = &TransformerResult{}
for i := 0; i < len(elements); {
e := elements[i]
for i := 0; i < len(transformerBatches); {
e := transformerBatches[i]
subChanges, applyErr := r.ApplyTransformers(e.ctx, e.transformers...)
changes.Combine(subChanges)
if applyErr != nil {
if errors.Is(applyErr, InvalidJson) && allowFetchAndReset {
if errors.Is(applyErr.TransformerError, InvalidJson) && allowFetchAndReset {
// Invalid state. fetch and reset and redo
err := r.FetchAndReset(e.ctx)
if err != nil {
return elements, err, nil
return transformerBatches, err, nil
}
return r.applyElements(elements, false)
return r.applyTransformerBatches(transformerBatches, false)
} else {
e.result <- applyErr
// here, we keep all elements "behind i".
// these are the elements that have not been applied yet
elements = append(elements[:i], elements[i+1:]...)
close(e.result)
// here, we keep all transformerBatches "behind i".
// these are the transformerBatches that have not been applied yet
transformerBatches = append(transformerBatches[:i], transformerBatches[i+1:]...)
}
} else {
i++
}
}
return elements, nil, changes
return transformerBatches, nil, changes
}

var panicError = errors.New("Panic")
Expand Down Expand Up @@ -498,20 +515,21 @@ func (r *repository) useRemote(ctx context.Context, callback func(*git.Remote) e
}
}

func (r *repository) drainQueue() []element {
elements := []element{}
func (r *repository) drainQueue() []transformerBatch {
transformerBatches := []transformerBatch{}
for {
select {
case f := <-r.queue.elements:
case f := <-r.queue.transformerBatches:
// Check that the item is not already cancelled
select {
case <-f.ctx.Done():
f.result <- f.ctx.Err()
close(f.result)
default:
elements = append(elements, f)
transformerBatches = append(transformerBatches, f)
}
default:
return elements
return transformerBatches
}
}
}
Expand Down Expand Up @@ -541,25 +559,27 @@ func DefaultPushActionCallback(pushOptions git.PushOptions, r *repository) PushA

type PushUpdateFunc func(string, *bool) git.PushUpdateReferenceCallback

func (r *repository) ProcessQueueOnce(ctx context.Context, e element, callback PushUpdateFunc, pushAction PushActionCallbackFunc) {
func (r *repository) ProcessQueueOnce(ctx context.Context, e transformerBatch, callback PushUpdateFunc, pushAction PushActionCallbackFunc) {
logger := logger.FromContext(ctx)
var err error = panicError
elements := []element{e}
defer func() {
for _, el := range elements {
el.result <- err
}
}()
// Check that the first element is not already canceled
transformerBatches := []transformerBatch{e}
// Check that the first transformerBatch is not already canceled
select {
case <-e.ctx.Done():
e.result <- e.ctx.Err()
close(e.result)
return
default:
}
defer func() {
for _, el := range transformerBatches {
el.result <- err
close(el.result)
}
}()

// Try to fetch more items from the queue in order to push more things together
elements = append(elements, r.drainQueue()...)
transformerBatches = append(transformerBatches, r.drainQueue()...)

var pushSuccess = true

Expand All @@ -580,12 +600,12 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e element, callback P
}

// Apply the items
elements, err, changes := r.applyElements(elements, true)
transformerBatches, err, changes := r.applyTransformerBatches(transformerBatches, true)
if err != nil {
return
}

if len(elements) == 0 {
if len(transformerBatches) == 0 {
return
}

Expand All @@ -600,12 +620,12 @@ func (r *repository) ProcessQueueOnce(ctx context.Context, e element, callback P
return
}
// Apply the items
elements, err, changes = r.applyElements(elements, false)
if err != nil || len(elements) == 0 {
transformerBatches, err, changes = r.applyTransformerBatches(transformerBatches, false)
if err != nil || len(transformerBatches) == 0 {
return
}
if pushErr := r.Push(e.ctx, pushAction(pushOptions, r)); pushErr != nil {
err = &InternalError{inner: pushErr}
err = pushErr
}
} else if errors.Is(err, context.DeadlineExceeded) || errors.Is(err, context.Canceled) {
err = grpc.CanceledError(ctx, err)
Expand Down Expand Up @@ -835,16 +855,20 @@ type ArgoWebhookData struct {
Commits []commit
}

func (r *repository) ApplyTransformersInternal(ctx context.Context, transformers ...Transformer) ([]string, *State, []*TransformerResult, error) {
func (r *repository) ApplyTransformersInternal(ctx context.Context, transformers ...Transformer) ([]string, *State, []*TransformerResult, *TransformerBatchApplyError) {
if state, err := r.StateAt(nil); err != nil {
return nil, nil, nil, grpc.InternalError(ctx, fmt.Errorf("%s: %w", "failure in StateAt", err))
return nil, nil, nil, &TransformerBatchApplyError{TransformerError: fmt.Errorf("%s: %w", "failure in StateAt", err), Index: -1}
} else {
var changes []*TransformerResult = nil
commitMsg := []string{}
ctxWithTime := WithTimeNow(ctx, time.Now())
for _, t := range transformers {
for i, t := range transformers {
if msg, subChanges, err := RunTransformer(ctxWithTime, t, state); err != nil {
return nil, nil, nil, err
applyErr := TransformerBatchApplyError{
TransformerError: err,
Index: i,
}
return nil, nil, nil, &applyErr
} else {
commitMsg = append(commitMsg, msg)
changes = append(changes, subChanges)
Expand Down Expand Up @@ -916,29 +940,32 @@ func CombineArray(others []*TransformerResult) *TransformerResult {
return r
}

func (r *repository) ApplyTransformers(ctx context.Context, transformers ...Transformer) (*TransformerResult, error) {
commitMsg, state, changes, err := r.ApplyTransformersInternal(ctx, transformers...)
if err != nil {
return nil, err
func (r *repository) ApplyTransformers(ctx context.Context, transformers ...Transformer) (*TransformerResult, *TransformerBatchApplyError) {
commitMsg, state, changes, applyErr := r.ApplyTransformersInternal(ctx, transformers...)
if applyErr != nil {
return nil, applyErr
}
if err := r.afterTransform(ctx, *state); err != nil {
return nil, grpc.InternalError(ctx, fmt.Errorf("%s: %w", "failure in afterTransform", err))
return nil, &TransformerBatchApplyError{TransformerError: fmt.Errorf("%s: %w", "failure in afterTransform", err), Index: -1}
}

treeId, err := state.Filesystem.(*fs.TreeBuilderFS).Insert()
if err != nil {
return nil, &InternalError{inner: err}
treeId, insertError := state.Filesystem.(*fs.TreeBuilderFS).Insert()
if insertError != nil {
return nil, &TransformerBatchApplyError{TransformerError: insertError, Index: -1}
}
committer := &git.Signature{
Name: r.config.CommitterName,
Email: r.config.CommitterEmail,
When: time.Now(),
}

user, err := auth.ReadUserFromContext(ctx)
user, readUserErr := auth.ReadUserFromContext(ctx)

if err != nil {
return nil, err
if readUserErr != nil {
return nil, &TransformerBatchApplyError{
TransformerError: readUserErr,
Index: -1,
}
}

author := &git.Signature{
Expand All @@ -954,16 +981,19 @@ func (r *repository) ApplyTransformers(ctx context.Context, transformers ...Tran
}
oldCommitId := rev

newCommitId, err := r.repository.CreateCommitFromIds(
newCommitId, createErr := r.repository.CreateCommitFromIds(
fmt.Sprintf("refs/heads/%s", r.config.Branch),
author,
committer,
strings.Join(commitMsg, "\n"),
treeId,
rev,
)
if err != nil {
return nil, grpc.InternalError(ctx, fmt.Errorf("%s: %w", "createCommitFromIds failed", err))
if createErr != nil {
return nil, &TransformerBatchApplyError{
TransformerError: fmt.Errorf("%s: %w", "createCommitFromIds failed", createErr),
Index: -1,
}
}
result := CombineArray(changes)
result.Commits = &CommitIds{
Expand Down Expand Up @@ -1006,7 +1036,7 @@ func (r *repository) FetchAndReset(ctx context.Context) error {
return remote.Fetch([]string{fetchSpec}, &fetchOptions, "fetching")
})
if err != nil {
return &InternalError{inner: err}
return err
}
var zero git.Oid
var rev *git.Oid = &zero
Expand Down
Loading

0 comments on commit 7d5332c

Please sign in to comment.