Skip to content

Commit

Permalink
HPCC-31248 k8s multi-thor queue target
Browse files Browse the repository at this point in the history
Add ability for k8s Thor definitions to specify additional queues,
such that multiple disparate Thor's can [also] listen to the same
named queue.
This allows, for example, separate Thor's that are bound to
different availability zone node pools, to be configured to
listen to a single queue.

Signed-off-by: Jake Smith <[email protected]>
  • Loading branch information
jakesmith committed Feb 15, 2024
1 parent 0e38230 commit 5afc6ad
Show file tree
Hide file tree
Showing 6 changed files with 102 additions and 9 deletions.
9 changes: 9 additions & 0 deletions ecl/agentexec/agentexec.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,16 @@ int CEclAgentExecutionServer::run()
}
}
else
{
getClusterEclAgentQueueName(queueNames, agentName);
Owned<IPropertyTreeIterator> auxQueueIter = config->getElements("auxQueues");
ForEach(*auxQueueIter)
{
queueNames.append(',');
const char *auxQueueName = auxQueueIter->query().queryProp(nullptr);
getClusterEclAgentQueueName(queueNames, auxQueueName);
}
}
#else
getAgentQueueNames(queueNames, agentName);
#endif
Expand Down
63 changes: 55 additions & 8 deletions helm/hpcc/templates/_helpers.tpl
Original file line number Diff line number Diff line change
Expand Up @@ -1091,10 +1091,27 @@ securityContext:
runAsGroup: {{ $user.gid | default 10001 }}
{{ end -}}

{{/*
Validate the Thors associated with two aux queues match
*/}}
{{- define "hpcc.validateAuxQueueMatch" -}}
{{- $root := .root -}}
{{- $current := .current -}}
{{- $incoming := .incoming -}}
{{- $currentPrefix := get $current "prefix" -}}
{{- $incomingPrefix := get $incoming "prefix" -}}
{{- if not (eq $currentPrefix $incomingPrefix) -}}
{{- $_ := fail (printf "Thor '%s' defines additional queue '%s' with different prefix to existing Thor using same aux queue" $incoming.name $current.name) -}}
{{- else if not (eq $current.width $incoming.width) -}}
{{- $_ := fail (printf "Thor '%s' defines additional queue '%s' with different width to existing Thor using same aux queue" $incoming.name $current.name) -}}
{{- end -}}
{{- end -}}

{{/*
Generate instance queue names
*/}}
{{- define "hpcc.generateConfigMapQueues" -}}
{{- $root := . -}}
{{- range $.Values.eclagent -}}
{{- if not .disabled -}}
- name: {{ .name }}
Expand All @@ -1119,16 +1136,46 @@ Generate instance queue names
{{- end }}
{{- end }}
{{ end -}}
{{- $stdThorQueues := dict -}}
{{- $stdThorQueuesWithAux := dict -}}
{{- range $.Values.thor -}}
{{- if not .disabled -}}
- name: {{ .name }}
type: thor
{{- if hasKey . "prefix" }}
prefix: {{ .prefix }}
{{- end }}
width: {{ mul (.numWorkers | default 1) ( .channelsPerWorker | default 1) }}
{{- end }}
{{ end -}}
{{- $queueItem := dict "name" .name "type" "thor" -}}
{{- if hasKey . "prefix" -}}
{{- $_ := set $queueItem "prefix" .prefix -}}
{{- end -}}
{{- $_ := set $queueItem "width" (mul (.numWorkers | default 1) ( .channelsPerWorker | default 1)) -}}
{{- if hasKey . "auxQueues" -}}
{{- $_ := set $stdThorQueuesWithAux .name .auxQueues -}}
{{- end -}}
{{- $_ := set $stdThorQueues .name $queueItem -}}
{{- end -}}
{{- end -}}
{{- $auxThorQueues := dict -}}
{{- range $thorNameWithAuxQueues, $auxQueues := $stdThorQueuesWithAux -}}
{{ $queueItem := get $stdThorQueues $thorNameWithAuxQueues -}}
{{- range $auxQueueName := $auxQueues -}}
{{- if (hasKey $stdThorQueues $auxQueueName) -}}
{{- $_ := fail (printf "Thor '%s' defines aux queue '%s' that clashes with existing Thor name" $queueItem.name $auxQueueName) -}}
{{- end -}}
{{- if (hasKey $auxThorQueues $auxQueueName) -}}
{{- $existingAuxQueueItem := get $auxThorQueues $auxQueueName -}}
{{- include "hpcc.validateAuxQueueMatch" (dict "root" $root "current" $existingAuxQueueItem "incoming" $queueItem) -}}
{{- else -}}
{{- $newQueueItem := deepCopy $queueItem -}}
{{- $_ := set $newQueueItem "name" $auxQueueName -}}
{{- $_ := set $auxThorQueues $auxQueueName $newQueueItem -}}
{{- end -}}
{{- end -}}
{{- end -}}
{{- $combinedList := list -}}
{{- range $stdQueue := $stdThorQueues }}
{{- $combinedList = append $combinedList $stdQueue -}}
{{- end -}}
{{- range $auxQueue := $auxThorQueues }}
{{- $combinedList = append $combinedList $auxQueue -}}
{{- end -}}
{{- toYaml $combinedList -}}
{{- end -}}

{{- define "hpcc.usesRemoteIssuer" -}}
Expand Down
2 changes: 1 addition & 1 deletion helm/hpcc/templates/thor.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@ Pass in dict with root and me
{{- define "hpcc.thorConfigMap" -}}
{{- $eclAgentType := .me.eclAgentType | default "hthor" }}
{{- $hthorName := printf "%s-%s" .me.name $eclAgentType }}
{{- $eclAgentScope := dict "name" .eclAgentName "type" $eclAgentType "useChildProcesses" .eclAgentUseChildProcesses "replicas" .eclAgentReplicas "maxActive" .me.maxJobs | merge (pick .me "keepJobs" "logging" "tracing") }}
{{- $eclAgentScope := dict "name" .eclAgentName "type" $eclAgentType "useChildProcesses" .eclAgentUseChildProcesses "replicas" .eclAgentReplicas "maxActive" .me.maxJobs | merge (pick .me "keepJobs" "logging" "tracing" "auxQueues") }}
{{- $thorAgentScope := dict "name" .thorAgentName "replicas" .thorAgentReplicas "maxActive" .me.maxGraphs | merge (pick .me "keepJobs" "logging" "tracing") }}
{{- $eclAgentResources := .me.eclAgentResources | default dict -}}
{{- $hthorScope := dict "name" $hthorName "jobMemorySectionName" "eclAgentMemory" | merge (pick .me "multiJobLinger" "maxGraphStartupTime" "logging" "tracing") | merge (dict "resources" (deepCopy $eclAgentResources)) }}
Expand Down
5 changes: 5 additions & 0 deletions helm/hpcc/values.schema.json
Original file line number Diff line number Diff line change
Expand Up @@ -2522,6 +2522,11 @@
"description": "[Optional] The time (seconds) for the job to wait for a Thor instance to start",
"default": 600
},
"auxQueues": {
"type": "array",
"items": { "type": "string" },
"description": "[Optional] Auxiliary queue names to listen to. Multiple Thor's can listen to the same queue using this mechanism"
},
"image": {
"$ref": "#/definitions/image"
},
Expand Down
16 changes: 16 additions & 0 deletions testing/helm/errtests/mismatched-multithor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
thor:
- name: thor1
auxQueues: [ thor ]
numWorkers: 1
maxJobs: 1
maxGraphs: 1
- name: thor2
auxQueues: [ thor ]
numWorkers: 2
maxJobs: 1
maxGraphs: 1
- name: thor3
auxQueues: [ thor ]
numWorkers: 3
maxJobs: 1
maxGraphs: 1
16 changes: 16 additions & 0 deletions testing/helm/tests/multithor.yaml
Original file line number Diff line number Diff line change
@@ -0,0 +1,16 @@
thor:
- name: thor1
auxQueues: [ thor ]
numWorkers: 2
maxJobs: 1
maxGraphs: 1
- name: thor2
auxQueues: [ thor ]
numWorkers: 2
maxJobs: 1
maxGraphs: 1
- name: thor3
auxQueues: [ thor ]
numWorkers: 2
maxJobs: 1
maxGraphs: 1

0 comments on commit 5afc6ad

Please sign in to comment.