diff --git a/pkg/apis/workflow/v1alpha1/workflow_types.go b/pkg/apis/workflow/v1alpha1/workflow_types.go index 07f132fcfca6..0ec8817c1d6f 100644 --- a/pkg/apis/workflow/v1alpha1/workflow_types.go +++ b/pkg/apis/workflow/v1alpha1/workflow_types.go @@ -133,6 +133,13 @@ const ( VolumeClaimGCOnSuccess VolumeClaimGCStrategy = "OnWorkflowSuccess" ) +type HoldingNameVersion int + +const ( + HoldingNameV1 HoldingNameVersion = 1 + HoldingNameV2 HoldingNameVersion = 2 +) + // Workflow is the definition of a workflow resource // +genclient // +genclient:noStatus @@ -3782,11 +3789,7 @@ func (ss *SemaphoreStatus) LockWaiting(holderKey, lockKey string, currentHolders func (ss *SemaphoreStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool { i, semaphoreHolding := ss.GetHolding(lockKey) - items := strings.Split(holderKey, "/") - if len(items) == 0 { - return false - } - holdingName := items[len(items)-1] + holdingName := holderKey if i < 0 { ss.Holding = append(ss.Holding, SemaphoreHolding{Semaphore: lockKey, Holders: []string{holdingName}}) return true @@ -3800,11 +3803,8 @@ func (ss *SemaphoreStatus) LockAcquired(holderKey, lockKey string, currentHolder func (ss *SemaphoreStatus) LockReleased(holderKey, lockKey string) bool { i, semaphoreHolding := ss.GetHolding(lockKey) - items := strings.Split(holderKey, "/") - if len(items) == 0 { - return false - } - holdingName := items[len(items)-1] + holdingName := holderKey + if i >= 0 { semaphoreHolding.Holders = slice.RemoveString(semaphoreHolding.Holders, holdingName) ss.Holding[i] = semaphoreHolding @@ -3875,13 +3875,17 @@ func (ms *MutexStatus) LockWaiting(holderKey, lockKey string, currentHolders []s return false } -func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool { - i, mutexHolding := ms.GetHolding(lockKey) +func CheckHolderKeyVersion(holderKey string) HoldingNameVersion { items := strings.Split(holderKey, "/") - if len(items) == 0 { - return false + if len(items) == 2 || len(items) == 3 { + return HoldingNameV2 } - holdingName := items[len(items)-1] + return HoldingNameV1 +} + +func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders []string) bool { + i, mutexHolding := ms.GetHolding(lockKey) + holdingName := holderKey if i < 0 { ms.Holding = append(ms.Holding, MutexHolding{Mutex: lockKey, Holder: holdingName}) return true @@ -3895,11 +3899,7 @@ func (ms *MutexStatus) LockAcquired(holderKey, lockKey string, currentHolders [] func (ms *MutexStatus) LockReleased(holderKey, lockKey string) bool { i, holder := ms.GetHolding(lockKey) - items := strings.Split(holderKey, "/") - if len(items) == 0 { - return false - } - holdingName := items[len(items)-1] + holdingName := holderKey if i >= 0 && holder.Holder == holdingName { ms.Holding = append(ms.Holding[:i], ms.Holding[i+1:]...) return true diff --git a/workflow/controller/operator_concurrency_test.go b/workflow/controller/operator_concurrency_test.go index a7d52a0218c5..d73461dc77fa 100644 --- a/workflow/controller/operator_concurrency_test.go +++ b/workflow/controller/operator_concurrency_test.go @@ -1092,17 +1092,17 @@ spec: configMap: name: cache-example-steps-simple `) + wf.Name = "example-steps-simple-gas12" cancel, controller := newController(wf) defer cancel() ctx := context.Background() - woc := newWorkflowOperationCtx(wf, controller) woc.operate(ctx) holdingJobs := make(map[string]string) for _, node := range woc.wf.Status.Nodes { - holdingJobs[node.ID] = node.DisplayName + holdingJobs[fmt.Sprintf("%s/%s/%s", wf.Namespace, wf.Name, node.ID)] = node.DisplayName } // Check initial status: job-1 acquired the lock diff --git a/workflow/sync/mutex_test.go b/workflow/sync/mutex_test.go index 8551a72841f5..33be3daf52ac 100644 --- a/workflow/sync/mutex_test.go +++ b/workflow/sync/mutex_test.go @@ -107,7 +107,7 @@ status: mutex: holding: - holder: synchronization-wf-level-xxs94 - mutex: default/mutex/test + mutex: default/Mutex/test ` func TestMutexLock(t *testing.T) { @@ -142,7 +142,7 @@ func TestMutexLock(t *testing.T) { assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Mutex) assert.NotNil(t, wf.Status.Synchronization.Mutex.Holding) - assert.Equal(t, wf.Name, wf.Status.Synchronization.Mutex.Holding[0].Holder) + assert.Equal(t, getHolderKey(wf, ""), wf.Status.Synchronization.Mutex.Holding[0].Holder) // Try to acquire again status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "", wf.Spec.Synchronization) @@ -194,7 +194,7 @@ func TestMutexLock(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf2.Status.Synchronization) assert.NotNil(t, wf2.Status.Synchronization.Mutex) - assert.Equal(t, wf2.Name, wf2.Status.Synchronization.Mutex.Holding[0].Holder) + assert.Equal(t, getHolderKey(wf2, ""), wf2.Status.Synchronization.Mutex.Holding[0].Holder) concurrenyMgr.ReleaseAll(wf2) assert.Nil(t, wf2.Status.Synchronization) }) @@ -216,7 +216,8 @@ func TestMutexLock(t *testing.T) { assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Mutex) assert.NotNil(t, wf.Status.Synchronization.Mutex.Holding) - assert.Equal(t, wf.Name, wf.Status.Synchronization.Mutex.Holding[0].Holder) + expected := getHolderKey(wf, "") + assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder) // Try to acquire again status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "", wf.Spec.Synchronization) @@ -271,7 +272,8 @@ func TestMutexLock(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf2.Status.Synchronization) assert.NotNil(t, wf2.Status.Synchronization.Mutex) - assert.Equal(t, wf2.Name, wf2.Status.Synchronization.Mutex.Holding[0].Holder) + expected = getHolderKey(wf2, "") + assert.Equal(t, expected, wf2.Status.Synchronization.Mutex.Holding[0].Holder) concurrenyMgr.ReleaseAll(wf2) assert.Nil(t, wf2.Status.Synchronization) }) @@ -395,7 +397,8 @@ func TestMutexTmplLevel(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Mutex) - assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder) + expected := getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474") + assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder) // Try to acquire again status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482", tmpl.Synchronization) @@ -410,7 +413,8 @@ func TestMutexTmplLevel(t *testing.T) { assert.False(t, wfUpdate) assert.False(t, status) - assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder) + expected = getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474") + assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder) concurrenyMgr.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization) assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Mutex) @@ -423,7 +427,8 @@ func TestMutexTmplLevel(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Mutex) - assert.Equal(t, "synchronization-tmpl-level-mutex-vjcdk-2216915482", wf.Status.Synchronization.Mutex.Holding[0].Holder) + expected = getHolderKey(wf, "synchronization-tmpl-level-mutex-vjcdk-2216915482") + assert.Equal(t, expected, wf.Status.Synchronization.Mutex.Holding[0].Holder) assert.NotEqual(t, "synchronization-tmpl-level-mutex-vjcdk-3941195474", wf.Status.Synchronization.Mutex.Holding[0].Holder) concurrenyMgr.Release(wf, "synchronization-tmpl-level-mutex-vjcdk-3941195474", tmpl.Synchronization) diff --git a/workflow/sync/sync_manager.go b/workflow/sync/sync_manager.go index 6f4c92b87aac..40f874c6b390 100644 --- a/workflow/sync/sync_manager.go +++ b/workflow/sync/sync_manager.go @@ -66,6 +66,77 @@ func (cm *Manager) CheckWorkflowExistence() { } } +func getUpgradedKey(wf *wfv1.Workflow, key string, level SyncLevelType) string { + if wfv1.CheckHolderKeyVersion(key) == wfv1.HoldingNameV1 { + if level == WorkflowLevel { + return getHolderKey(wf, "") + } + return getHolderKey(wf, key) + } + return key +} + +type SyncLevelType int + +const ( + WorkflowLevel SyncLevelType = 1 + TemplateLevel SyncLevelType = 2 + ErrorLevel SyncLevelType = 3 +) + +// HoldingNameV1 keys can be of the form +// x where x is a workflow name +// unfortunately this doesn't differentiate between workflow level keys +// and template level keys. So upgrading is a bit tricky here. + +// given a legacy holding name x, namespace y and workflow name z. +// in the case of a workflow level +// if x != z +// upgradedKey := y/z +// elseif x == z +// upgradedKey := y/z +// in the case of a template level +// if x != z +// upgradedKey := y/z/x +// elif x == z +// upgradedKey := y/z/x + +// there is a possibility that +// a synchronization exists both at the template level +// and at the workflow level -> impossible to upgrade correctly +// due to ambiguity. Currently we just assume workflow level. +func getWorkflowSyncLevelByName(wf *wfv1.Workflow, lockName string) (SyncLevelType, error) { + if wf.Spec.Synchronization != nil { + syncLockName, err := GetLockName(wf.Spec.Synchronization, wf.Namespace) + if err != nil { + return ErrorLevel, err + } + checkName := syncLockName.EncodeName() + if lockName == checkName { + return WorkflowLevel, nil + } + } + + var lastErr error + for _, template := range wf.Spec.Templates { + if template.Synchronization != nil { + syncLockName, err := GetLockName(template.Synchronization, wf.Namespace) + if err != nil { + lastErr = err + continue + } + checkName := syncLockName.EncodeName() + if lockName == checkName { + return TemplateLevel, nil + } + } + } + if lastErr == nil { + lastErr = fmt.Errorf("was unable to determine level for %s", lockName) + } + return ErrorLevel, lastErr +} + func (cm *Manager) Initialize(wfs []wfv1.Workflow) { for _, wf := range wfs { if wf.Status.Synchronization == nil { @@ -86,11 +157,17 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) { } for _, holders := range holding.Holders { - resourceKey := getResourceKey(wf.Namespace, wf.Name, holders) - if semaphore != nil && semaphore.acquire(resourceKey) { - log.Infof("Lock acquired by %s from %s", resourceKey, holding.Semaphore) + level, err := getWorkflowSyncLevelByName(&wf, holding.Semaphore) + if err != nil { + log.Warnf("cannot obtain lock level for '%s' : %v", holding.Semaphore, err) + continue + } + key := getUpgradedKey(&wf, holders, level) + if semaphore != nil && semaphore.acquire(key) { + log.Infof("Lock acquired by %s from %s", key, holding.Semaphore) } } + } } @@ -101,8 +178,13 @@ func (cm *Manager) Initialize(wfs []wfv1.Workflow) { if mutex == nil { mutex := cm.initializeMutex(holding.Mutex) if holding.Holder != "" { - resourceKey := getResourceKey(wf.Namespace, wf.Name, holding.Holder) - mutex.acquire(resourceKey) + level, err := getWorkflowSyncLevelByName(&wf, holding.Mutex) + if err != nil { + log.Warnf("cannot obtain lock level for '%s' : %v", holding.Mutex, err) + continue + } + key := getUpgradedKey(&wf, holding.Holder, level) + mutex.acquire(key) } cm.syncLockMap[holding.Mutex] = mutex } @@ -214,10 +296,9 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool { } for _, holderKey := range holding.Holders { - resourceKey := getResourceKey(wf.Namespace, wf.Name, holderKey) - syncLockHolder.release(resourceKey) + syncLockHolder.release(holderKey) wf.Status.Synchronization.Semaphore.LockReleased(holderKey, holding.Semaphore) - log.Infof("%s released a lock from %s", resourceKey, holding.Semaphore) + log.Infof("%s released a lock from %s", holderKey, holding.Semaphore) } } @@ -227,8 +308,8 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool { if syncLockHolder == nil { continue } - resourceKey := getResourceKey(wf.Namespace, wf.Name, wf.Name) - syncLockHolder.removeFromQueue(resourceKey) + key := getHolderKey(wf, "") + syncLockHolder.removeFromQueue(key) } wf.Status.Synchronization.Semaphore = nil } @@ -240,10 +321,9 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool { continue } - resourceKey := getResourceKey(wf.Namespace, wf.Name, holding.Holder) - syncLockHolder.release(resourceKey) + syncLockHolder.release(holding.Holder) wf.Status.Synchronization.Mutex.LockReleased(holding.Holder, holding.Mutex) - log.Infof("%s released a lock from %s", resourceKey, holding.Mutex) + log.Infof("%s released a lock from %s", holding.Holder, holding.Mutex) } // Remove the pending Workflow level mutex keys @@ -252,8 +332,8 @@ func (cm *Manager) ReleaseAll(wf *wfv1.Workflow) bool { if syncLockHolder == nil { continue } - resourceKey := getResourceKey(wf.Namespace, wf.Name, wf.Name) - syncLockHolder.removeFromQueue(resourceKey) + key := getHolderKey(wf, "") + syncLockHolder.removeFromQueue(key) } wf.Status.Synchronization.Mutex = nil } @@ -296,14 +376,6 @@ func getHolderKey(wf *wfv1.Workflow, nodeName string) string { return key } -func getResourceKey(namespace, wfName, resourceName string) string { - resourceKey := fmt.Sprintf("%s/%s", namespace, wfName) - if resourceName != wfName { - resourceKey = fmt.Sprintf("%s/%s", resourceKey, resourceName) - } - return resourceKey -} - func (cm *Manager) getCurrentLockHolders(lockName string) []string { if concurrency, ok := cm.syncLockMap[lockName]; ok { return concurrency.getCurrentHolders() diff --git a/workflow/sync/sync_manager_test.go b/workflow/sync/sync_manager_test.go index f3abdca94101..10aa74642f0a 100644 --- a/workflow/sync/sync_manager_test.go +++ b/workflow/sync/sync_manager_test.go @@ -378,7 +378,8 @@ func TestSemaphoreWfLevel(t *testing.T) { assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Semaphore) assert.NotNil(t, wf.Status.Synchronization.Semaphore.Holding) - assert.Equal(t, wf.Name, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) + key := getHolderKey(wf, "") + assert.Equal(t, key, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) // Try to acquire again status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "", wf.Spec.Synchronization) @@ -430,7 +431,8 @@ func TestSemaphoreWfLevel(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf2.Status.Synchronization) assert.NotNil(t, wf2.Status.Synchronization.Semaphore) - assert.Equal(t, wf2.Name, wf2.Status.Synchronization.Semaphore.Holding[0].Holders[0]) + key = getHolderKey(wf2, "") + assert.Equal(t, key, wf2.Status.Synchronization.Semaphore.Holding[0].Holders[0]) concurrenyMgr.ReleaseAll(wf2) assert.Nil(t, wf2.Status.Synchronization) @@ -469,7 +471,8 @@ func TestResizeSemaphoreSize(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Semaphore) - assert.Equal(t, wf.Name, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) + key := getHolderKey(wf, "") + assert.Equal(t, key, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) wf1.Name = "two" status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf1, "", wf1.Spec.Synchronization) @@ -499,7 +502,8 @@ func TestResizeSemaphoreSize(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf1.Status.Synchronization) assert.NotNil(t, wf1.Status.Synchronization.Semaphore) - assert.Equal(t, wf1.Name, wf1.Status.Synchronization.Semaphore.Holding[0].Holders[0]) + key = getHolderKey(wf1, "") + assert.Equal(t, key, wf1.Status.Synchronization.Semaphore.Holding[0].Holders[0]) status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf2, "", wf2.Spec.Synchronization) require.NoError(t, err) @@ -508,7 +512,8 @@ func TestResizeSemaphoreSize(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf2.Status.Synchronization) assert.NotNil(t, wf2.Status.Synchronization.Semaphore) - assert.Equal(t, wf2.Name, wf2.Status.Synchronization.Semaphore.Holding[0].Holders[0]) + key = getHolderKey(wf2, "") + assert.Equal(t, key, wf2.Status.Synchronization.Semaphore.Holding[0].Holders[0]) }) } @@ -537,7 +542,8 @@ func TestSemaphoreTmplLevel(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Semaphore) - assert.Equal(t, "semaphore-tmpl-level-xjvln-3448864205", wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) + key := getHolderKey(wf, "semaphore-tmpl-level-xjvln-3448864205") + assert.Equal(t, key, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) // Try to acquire again status, wfUpdate, msg, err = concurrenyMgr.TryAcquire(wf, "semaphore-tmpl-level-xjvln-3448864205", tmpl.Synchronization) @@ -564,7 +570,8 @@ func TestSemaphoreTmplLevel(t *testing.T) { assert.True(t, wfUpdate) assert.NotNil(t, wf.Status.Synchronization) assert.NotNil(t, wf.Status.Synchronization.Semaphore) - assert.Equal(t, "semaphore-tmpl-level-xjvln-1607747183", wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) + key = getHolderKey(wf, "semaphore-tmpl-level-xjvln-1607747183") + assert.Equal(t, key, wf.Status.Synchronization.Semaphore.Holding[0].Holders[0]) }) } @@ -865,3 +872,358 @@ status: }) } + +const wfV2MutexMigrationWorkflowLevel = `apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + creationTimestamp: null + name: test1 + namespace: default +spec: + arguments: {} + entrypoint: whalesay + synchronization: + mutex: + name: my-mutex + templates: + - container: + args: + - hello world + command: + - cowsay + image: docker/whalesay:latest + name: "" + resources: {} + inputs: {} + metadata: {} + name: whalesay + outputs: {} +status: + finishedAt: null + startedAt: null + synchronization: + mutex: + holding: + - holder: test1 + mutex: default/Mutex/my-mutex + +` + +const wfV2MutexMigrationTemplateLevel = `apiVersion: argoproj.io/v1alpha1 +kind: Workflow +metadata: + annotations: + workflows.argoproj.io/pod-name-format: v2 + creationTimestamp: "2024-09-17T00:11:53Z" + generateName: synchronization-tmpl-level- + generation: 5 + labels: + workflows.argoproj.io/completed: "false" + workflows.argoproj.io/phase: Running + name: synchronization-tmpl-level-xvzpt + namespace: argo + resourceVersion: "10182" + uid: f2d4ac34-1495-48ba-8aab-25239880fef3 +spec: + activeDeadlineSeconds: 300 + arguments: {} + entrypoint: synchronization-tmpl-level-example + podSpecPatch: | + terminationGracePeriodSeconds: 3 + templates: + - inputs: {} + metadata: {} + name: synchronization-tmpl-level-example + outputs: {} + steps: + - - arguments: + parameters: + - name: seconds + value: '{{item}}' + name: synchronization-acquire-lock + template: acquire-lock + withParam: '["1","2","3","4","5"]' + - container: + args: + - sleep 60; echo acquired lock + command: + - sh + - -c + image: alpine:latest + name: "" + resources: {} + inputs: {} + metadata: {} + name: acquire-lock + outputs: {} + synchronization: + mutex: + name: workflow +status: + artifactGCStatus: + notSpecified: true + artifactRepositoryRef: + artifactRepository: + archiveLogs: true + s3: + accessKeySecret: + key: accesskey + name: my-minio-cred + bucket: my-bucket + endpoint: minio:9000 + insecure: true + secretKeySecret: + key: secretkey + name: my-minio-cred + configMap: artifact-repositories + key: default-v1 + namespace: argo + conditions: + - status: "True" + type: PodRunning + finishedAt: null + nodes: + synchronization-tmpl-level-xvzpt: + children: + - synchronization-tmpl-level-xvzpt-2018718843 + displayName: synchronization-tmpl-level-xvzpt + finishedAt: null + id: synchronization-tmpl-level-xvzpt + name: synchronization-tmpl-level-xvzpt + phase: Running + progress: 0/5 + startedAt: "2024-09-17T00:11:53Z" + templateName: synchronization-tmpl-level-example + templateScope: local/synchronization-tmpl-level-xvzpt + type: Steps + synchronization-tmpl-level-xvzpt-755731602: + boundaryID: synchronization-tmpl-level-xvzpt + displayName: synchronization-acquire-lock(1:2) + finishedAt: null + id: synchronization-tmpl-level-xvzpt-755731602 + message: 'Waiting for argo/Mutex/workflow lock. Lock status: 0/1' + name: synchronization-tmpl-level-xvzpt[0].synchronization-acquire-lock(1:2) + phase: Pending + progress: 0/1 + startedAt: "2024-09-17T00:11:53Z" + synchronizationStatus: + waiting: argo/Mutex/workflow + templateName: acquire-lock + templateScope: local/synchronization-tmpl-level-xvzpt + type: Pod + synchronization-tmpl-level-xvzpt-928517240: + boundaryID: synchronization-tmpl-level-xvzpt + displayName: synchronization-acquire-lock(0:1) + finishedAt: null + hostNodeName: k3d-k3s-default-server-0 + id: synchronization-tmpl-level-xvzpt-928517240 + name: synchronization-tmpl-level-xvzpt[0].synchronization-acquire-lock(0:1) + phase: Running + progress: 0/1 + startedAt: "2024-09-17T00:11:53Z" + templateName: acquire-lock + templateScope: local/synchronization-tmpl-level-xvzpt + type: Pod + synchronization-tmpl-level-xvzpt-1018728496: + boundaryID: synchronization-tmpl-level-xvzpt + displayName: synchronization-acquire-lock(4:5) + finishedAt: null + id: synchronization-tmpl-level-xvzpt-1018728496 + message: 'Waiting for argo/Mutex/workflow lock. Lock status: 0/1' + name: synchronization-tmpl-level-xvzpt[0].synchronization-acquire-lock(4:5) + phase: Pending + progress: 0/1 + startedAt: "2024-09-17T00:11:53Z" + synchronizationStatus: + waiting: argo/Mutex/workflow + templateName: acquire-lock + templateScope: local/synchronization-tmpl-level-xvzpt + type: Pod + synchronization-tmpl-level-xvzpt-2018718843: + boundaryID: synchronization-tmpl-level-xvzpt + children: + - synchronization-tmpl-level-xvzpt-928517240 + - synchronization-tmpl-level-xvzpt-755731602 + - synchronization-tmpl-level-xvzpt-4037094368 + - synchronization-tmpl-level-xvzpt-3632956078 + - synchronization-tmpl-level-xvzpt-1018728496 + displayName: '[0]' + finishedAt: null + id: synchronization-tmpl-level-xvzpt-2018718843 + name: synchronization-tmpl-level-xvzpt[0] + nodeFlag: {} + phase: Running + progress: 0/5 + startedAt: "2024-09-17T00:11:53Z" + templateScope: local/synchronization-tmpl-level-xvzpt + type: StepGroup + synchronization-tmpl-level-xvzpt-3632956078: + boundaryID: synchronization-tmpl-level-xvzpt + displayName: synchronization-acquire-lock(3:4) + finishedAt: null + id: synchronization-tmpl-level-xvzpt-3632956078 + message: 'Waiting for argo/Mutex/workflow lock. Lock status: 0/1' + name: synchronization-tmpl-level-xvzpt[0].synchronization-acquire-lock(3:4) + phase: Pending + progress: 0/1 + startedAt: "2024-09-17T00:11:53Z" + synchronizationStatus: + waiting: argo/Mutex/workflow + templateName: acquire-lock + templateScope: local/synchronization-tmpl-level-xvzpt + type: Pod + synchronization-tmpl-level-xvzpt-4037094368: + boundaryID: synchronization-tmpl-level-xvzpt + displayName: synchronization-acquire-lock(2:3) + finishedAt: null + id: synchronization-tmpl-level-xvzpt-4037094368 + message: 'Waiting for argo/Mutex/workflow lock. Lock status: 0/1' + name: synchronization-tmpl-level-xvzpt[0].synchronization-acquire-lock(2:3) + phase: Pending + progress: 0/1 + startedAt: "2024-09-17T00:11:53Z" + synchronizationStatus: + waiting: argo/Mutex/workflow + templateName: acquire-lock + templateScope: local/synchronization-tmpl-level-xvzpt + type: Pod + phase: Running + progress: 0/5 + startedAt: "2024-09-17T00:11:53Z" + synchronization: + mutex: + holding: + - holder: synchronization-tmpl-level-xvzpt-928517240 + mutex: argo/Mutex/workflow + waiting: + - holder: argo/synchronization-tmpl-level-xvzpt/synchronization-tmpl-level-xvzpt-928517240 + mutex: argo/Mutex/workflow + taskResultsCompletionStatus: + synchronization-tmpl-level-xvzpt-928517240: false +` + +func TestMutexMigration(t *testing.T) { + assert := assert.New(t) + require := require.New(t) + kube := fake.NewSimpleClientset() + + syncLimitFunc := GetSyncLimitFunc(kube) + + concurrenyMgr := NewLockManager(syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc) + + wfMutex := wfv1.MustUnmarshalWorkflow(wfWithMutex) + + t.Run("RunMigrationWorkflowLevel", func(t *testing.T) { + concurrenyMgr.syncLockMap = make(map[string]Semaphore) + wfMutex2 := wfv1.MustUnmarshalWorkflow(wfV2MutexMigrationWorkflowLevel) + + require.Len(wfMutex2.Status.Synchronization.Mutex.Holding, 1) + holderKey := getHolderKey(wfMutex2, "") + items := strings.Split(holderKey, "/") + holdingName := items[len(items)-1] + assert.Equal(wfMutex2.Status.Synchronization.Mutex.Holding[0].Holder, holdingName) + + concurrenyMgr.syncLockMap = make(map[string]Semaphore) + wfs := []wfv1.Workflow{*wfMutex2.DeepCopy()} + concurrenyMgr.Initialize(wfs) + + lockName, err := GetLockName(wfMutex2.Spec.Synchronization, wfMutex2.Namespace) + require.NoError(err) + + sem, found := concurrenyMgr.syncLockMap[lockName.EncodeName()] + require.True(found) + + holders := sem.getCurrentHolders() + require.Len(holders, 1) + + // PROVE: bug absent + assert.Equal(holderKey, holders[0]) + + // We should already have this lock since we acquired it above + status, _, _, err := concurrenyMgr.TryAcquire(wfMutex2, "", wfMutex.Spec.Synchronization) + require.NoError(err) + // BUG NOT PRESENT: https://github.com/argoproj/argo-workflows/issues/8684 + assert.True(status) + }) + + concurrenyMgr = NewLockManager(syncLimitFunc, func(key string) { + }, WorkflowExistenceFunc) + + t.Run("RunMigrationTemplateLevel", func(t *testing.T) { + concurrenyMgr.syncLockMap = make(map[string]Semaphore) + wfMutex3 := wfv1.MustUnmarshalWorkflow(wfV2MutexMigrationTemplateLevel) + require.Len(wfMutex3.Status.Synchronization.Mutex.Holding, 1) + + numFound := 0 + foundNodeID := "" + for nodeID := range wfMutex3.Status.Nodes { + holder := getHolderKey(wfMutex3, nodeID) + if holder == getUpgradedKey(wfMutex3, wfMutex3.Status.Synchronization.Mutex.Holding[0].Holder, TemplateLevel) { + foundNodeID = nodeID + numFound++ + } + } + assert.Equal(1, numFound) + + wfs := []wfv1.Workflow{*wfMutex3.DeepCopy()} + concurrenyMgr.Initialize(wfs) + + lockName, err := GetLockName(wfMutex3.Spec.Templates[1].Synchronization, wfMutex3.Namespace) + require.NoError(err) + + sem, found := concurrenyMgr.syncLockMap[lockName.EncodeName()] + require.True(found) + + holders := sem.getCurrentHolders() + require.Len(holders, 1) + + holderKey := getHolderKey(wfMutex3, foundNodeID) + + // PROVE: bug absent + assert.Equal(holderKey, holders[0]) + + status, _, _, err := concurrenyMgr.TryAcquire(wfMutex3, foundNodeID, wfMutex.Spec.Synchronization) + require.NoError(err) + // BUG NOT PRESENT: https://github.com/argoproj/argo-workflows/issues/8684 + assert.True(status) + }) +} + +// getHoldingNameV1 legacy code to get holding name. +func getHoldingNameV1(holderKey string) string { + items := strings.Split(holderKey, "/") + return items[len(items)-1] +} + +func TestCheckHolderVersion(t *testing.T) { + + t.Run("CheckHolderKeyWithNodeName", func(t *testing.T) { + assert := assert.New(t) + wfMutex := wfv1.MustUnmarshalWorkflow(wfWithMutex) + key := getHolderKey(wfMutex, wfMutex.Name) + + keyv2 := key + version := wfv1.CheckHolderKeyVersion(keyv2) + assert.Equal(wfv1.HoldingNameV2, version) + + keyv1 := getHoldingNameV1(key) + version = wfv1.CheckHolderKeyVersion(keyv1) + assert.Equal(wfv1.HoldingNameV1, version) + + }) + + t.Run("CheckHolderKeyWithoutNodeName", func(t *testing.T) { + assert := assert.New(t) + wfMutex := wfv1.MustUnmarshalWorkflow(wfWithMutex) + + key := getHolderKey(wfMutex, "") + keyv2 := key + version := wfv1.CheckHolderKeyVersion(keyv2) + assert.Equal(wfv1.HoldingNameV2, version) + + keyv1 := getHoldingNameV1(key) + version = wfv1.CheckHolderKeyVersion(keyv1) + assert.Equal(wfv1.HoldingNameV1, version) + }) +}