Skip to content

Commit

Permalink
fix: concurrent map writes when replicating namespaced objects (#1264)
Browse files Browse the repository at this point in the history
Signed-off-by: Dario Tranchitella <[email protected]>
  • Loading branch information
prometherion authored Dec 4, 2024
1 parent da66f40 commit 8d498bb
Showing 1 changed file with 17 additions and 6 deletions.
23 changes: 17 additions & 6 deletions controllers/resources/processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -79,6 +79,7 @@ func (r *Processor) HandlePruning(ctx context.Context, current, desired sets.Set
return updateStatus
}

//nolint:gocognit
func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant, allowCrossNamespaceSelection bool, tenantLabel string, resourceIndex int, spec capsulev1beta2.ResourceSpec) ([]string, error) {
log := ctrllog.FromContext(ctx)

Expand Down Expand Up @@ -125,7 +126,6 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant

objLabels[Label] = fmt.Sprintf("%d", resourceIndex)
objLabels[tenantLabel] = tnt.GetName()

// processed will contain the sets of resources replicated, both for the raw and the Namespaced ones:
// these are required to perform a final pruning once the replication has been occurred.
processed := sets.NewString()
Expand Down Expand Up @@ -173,17 +173,20 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant
var wg sync.WaitGroup

errorsChan := make(chan error, len(objs.Items))

// processedRaw is used to avoid concurrent map writes during iteration of namespaced items:
// the objects will be then added to processed variable if the resulting string is not empty,
// meaning it has been processed correctly.
processedRaw := make([]string, len(objs.Items))
// Iterating over all the retrieved objects from the resource spec to get replicated in all the selected Namespaces:
// in case of error during the create or update function, this will be appended to the list of errors.
for _, o := range objs.Items {
for i, o := range objs.Items {
obj := o
obj.SetNamespace(ns.Name)
obj.SetOwnerReferences(nil)

wg.Add(1)

go func(obj unstructured.Unstructured) {
go func(index int, obj unstructured.Unstructured) {
defer wg.Done()

kv := keysAndValues
Expand All @@ -204,8 +207,8 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant
replicatedItem.Namespace = ns.Name
replicatedItem.APIVersion = obj.GetAPIVersion()

processed.Insert(replicatedItem.String())
}(obj)
processedRaw[index] = replicatedItem.String()
}(i, obj)
}

wg.Wait()
Expand All @@ -216,6 +219,14 @@ func (r *Processor) HandleSection(ctx context.Context, tnt capsulev1beta2.Tenant
syncErr = errors.Join(syncErr, err)
}
}

for _, p := range processedRaw {
if p == "" {
continue
}

processed.Insert(p)
}
}

for rawIndex, item := range spec.RawItems {
Expand Down

0 comments on commit 8d498bb

Please sign in to comment.