diff --git a/controllers/resources/processor.go b/controllers/resources/processor.go index e5fdcc1b..10726f69 100644 --- a/controllers/resources/processor.go +++ b/controllers/resources/processor.go @@ -79,6 +79,7 @@ func (r *Processor) HandlePruning(ctx context.Context, current, desired sets.Set return updateStatus } +//nolint:gocognit func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant, allowCrossNamespaceSelection bool, tenantLabel string, resourceIndex int, spec capsulev1beta2.ResourceSpec) ([]string, error) { log := ctrllog.FromContext(ctx) @@ -125,7 +126,6 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant objLabels[Label] = fmt.Sprintf("%d", resourceIndex) objLabels[tenantLabel] = tnt.GetName() - // processed will contain the sets of resources replicated, both for the raw and the Namespaced ones: // these are required to perform a final pruning once the replication has been occurred. processed := sets.NewString() @@ -173,17 +173,20 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant var wg sync.WaitGroup errorsChan := make(chan error, len(objs.Items)) - + // processedRaw is used to avoid concurrent map writes during iteration of namespaced items: + // the objects will be then added to processed variable if the resulting string is not empty, + // meaning it has been processed correctly. + processedRaw := make([]string, len(objs.Items)) // Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces: // in case of error during the create or update function, this will be appended to the list of errors. - for _, o := range objs.Items { + for i, o := range objs.Items { obj := o obj.SetNamespace(ns.Name) obj.SetOwnerReferences(nil) wg.Add(1) - go func(obj unstructured.Unstructured) { + go func(index int, obj unstructured.Unstructured) { defer wg.Done() kv := keysAndValues @@ -204,8 +207,8 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant replicatedItem.Namespace = ns.Name replicatedItem.APIVersion = obj.GetAPIVersion() - processed.Insert(replicatedItem.String()) - }(obj) + processedRaw[index] = replicatedItem.String() + }(i, obj) } wg.Wait() @@ -216,6 +219,14 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant syncErr = errors.Join(syncErr, err) } } + + for _, p := range processedRaw { + if p == "" { + continue + } + + processed.Insert(p) + } } for rawIndex, item := range spec.RawItems {