WIP: Add nodegroup annotation to the pod #1152

Open
wants to merge 1 commit into base: master
5 changes: 5 additions & 0 deletions docs/CHANGELOG.rst
@@ -10,6 +10,11 @@ This project adheres to `Semantic Versioning <http://semver.org/>`_.
Unreleased
==========

Added
-----
- Add node affinity for instance-category using the ratio between instance and
process resource requirements.

Changed
-------
- Change fallback for storage resource from 200GB to 10GB
136 changes: 136 additions & 0 deletions resolwe/flow/managers/workload_connectors/kubernetes.py
@@ -106,6 +106,29 @@ def sanitize_kubernetes_label(label: str, trim_end: bool = True) -> str:
return sanitized_label


def memory_to_mb(memory_str):
"""Convert a memory specification to megabytes.

Integers and unit-less strings are interpreted as gigabytes; strings
ending in 'M' or 'G' (case-insensitive) are interpreted as megabytes
or gigabytes, respectively.
"""
# If the input is an integer, assume it is given in gigabytes (GB).
if isinstance(memory_str, int):
return memory_str * 1024 # Convert GB to MB

# If no unit is provided, assume it's in gigabytes (GB)
if memory_str[-1].isdigit():
return int(memory_str) * 1024 # Default to gigabytes

# Check the last character for the unit (M for MB, G for GB)
unit = memory_str[-1].upper()
value = int(memory_str[:-1]) # The numerical part

if unit == "G":
return value * 1024 # Convert GB to MB
elif unit == "M":
return value # It's already in MB
else:
raise ValueError(
"Invalid memory unit. Use 'M' for megabytes or 'G' for gigabytes."
)
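# Usage sketch for memory_to_mb (illustrative values; plain integers and
# unit-less strings are treated as gigabytes):
#   memory_to_mb(32)     -> 32768
#   memory_to_mb("32G")  -> 32768
#   memory_to_mb("512M") -> 512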


class ConfigLocation(Enum):
"""The enum specifying where to read the configuration from."""

@@ -610,6 +633,104 @@ def optimize_job_scheduling(self, data: Data, job_description: dict):
}
}

def append_node_affinity(self, job_description: dict, process_requirements: dict):
"""Append nodeAffinity based on the ratio between instance and process resources.

The affinity is appended to the existing job description only if
instance_types are defined within the FLOW_KUBERNETES_AFFINITY setting.

:param job_description: dict containing the job spec to which the affinity is appended.
:param process_requirements: dict containing the 'cpu' and 'memory' requirements of the process.
:return: None (job_description is modified in place).
"""

# Extract kubernetes_affinity and instance_types from settings
if kubernetes_affinity := getattr(settings, "FLOW_KUBERNETES_AFFINITY", None):
instance_types = kubernetes_affinity.get("instance_types", {})
else:
instance_types = {}
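# Assumed shape of the FLOW_KUBERNETES_AFFINITY setting consumed here;
# the instance type name and all values are illustrative only:
#
# FLOW_KUBERNETES_AFFINITY = {
#     "instance_types": {
#         "general-8-32": {
#             "cpu": 8,
#             "memory": "32G",
#             "instance-category": "general-purpose",
#             "hourly_rate": 0.40,
#         },
#     },
# }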

# Proceed only if instance_types is not empty
if not instance_types:
return

best_match_category = None
lowest_process_hourly_rate = float("inf")

process_cpu = process_requirements["cpu"]
process_memory = process_requirements["memory"]

# Sort instance types by hourly_rate to prefer cheaper options. This
# should avoid spawning a 128GB instance type for a single 32GB process
# while still supporting larger instances when needed. It is not fully
# optimal when smaller instance types are available, since that would
# create a lot of instance fragmentation. Smaller nodes should have at
# least 8 CPUs in this configuration.
sorted_instance_types = sorted(
instance_types.items(), key=lambda x: x[1].get("hourly_rate", float("inf"))
)

for instance_type, attributes in sorted_instance_types:
instance_cpu = int(attributes["cpu"])
instance_memory = memory_to_mb(attributes["memory"])
instance_category = attributes["instance-category"]
instance_hourly_rate = float(attributes["hourly_rate"])

# Skip instance types that don't meet the process requirements
# This needs to be here, since some instance category may not have
# a sufficiently large instance type available.
if instance_cpu < process_cpu or instance_memory < process_memory:
continue

# In practice this should be equivalent to:
# compute-optimized for ratio <~ 2.25
# general purpose for 2.25 <~ ratio <= 5.25
# memory-optimized for ratio > 5.25
process_instance_density = min(
instance_cpu / process_cpu, instance_memory / process_memory
)
process_hourly_rate = instance_hourly_rate / process_instance_density
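# Worked example with illustrative numbers: a process needing 4 CPUs and
# 16384 MB on an instance with 8 CPUs, "32G" of memory and an hourly rate
# of 0.40 yields a density of min(8 / 4, 32768 / 16384) = 2.0, so the
# effective process_hourly_rate is 0.40 / 2.0 = 0.20.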

if process_hourly_rate < lowest_process_hourly_rate:
lowest_process_hourly_rate = process_hourly_rate
best_match_category = instance_category

if not best_match_category:
raise ValueError(
"No suitable instance type found for the given process requirements."
)

# Create the new match expression
new_match_expression = {
"key": "instance-category",
"operator": "In",
"values": [best_match_category],
}

job_spec = job_description["spec"]["template"]["spec"]

# Ensure nodeAffinity structure exists in job_description
if "affinity" not in job_spec:
job_spec["affinity"] = {}

if "nodeAffinity" not in job_spec["affinity"]:
job_spec["affinity"]["nodeAffinity"] = {
"requiredDuringSchedulingIgnoredDuringExecution": {
"nodeSelectorTerms": []
}
}

node_affinity = job_spec["affinity"]["nodeAffinity"]
node_selector_terms = node_affinity[
"requiredDuringSchedulingIgnoredDuringExecution"
]["nodeSelectorTerms"]

# If there are no existing terms, create one
if not node_selector_terms:
node_selector_terms.append({"matchExpressions": []})

# Append the new match expression to the first nodeSelectorTerm's matchExpressions
node_selector_terms[0]["matchExpressions"].append(new_match_expression)
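# Illustrative result (assuming the "general-purpose" category was
# selected and no affinity was set before), as it ends up in the pod spec:
#
# "affinity": {
#     "nodeAffinity": {
#         "requiredDuringSchedulingIgnoredDuringExecution": {
#             "nodeSelectorTerms": [
#                 {"matchExpressions": [
#                     {
#                         "key": "instance-category",
#                         "operator": "In",
#                         "values": ["general-purpose"],
#                     }
#                 ]}
#             ]
#         }
#     }
# }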

def _get_overcommit_factors(self, data: Data) -> dict:
"""Get the overcommit settings for CPU and memory.

@@ -670,6 +791,8 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
requests["cpu"] = limits["cores"] * overcommit_factors["cpu"]
limits["cpu"] = limits.pop("cores") + 1

process_requirements = {"cpu": limits["cpu"], "memory": limits["memory"]}

# The memory in the database is stored in megabytes, but Kubernetes
# requires memory in bytes.
# We request less memory than stored in the database and set limit at 10% more
@@ -852,6 +975,19 @@ def start(self, data: Data, listener_connection: Tuple[str, str, str]):
},
}
self.optimize_job_scheduling(data, job_description)
try:
self.append_node_affinity(job_description, process_requirements)
except Exception as error:
# This operation is not critical for the job to run, so we log the
# error and continue. The job will still run, but the node affinity
# may be sub-optimal.
logger.exception(
__(
"Kubernetes instance category affinity assignment failed: '{}'.",
error,
)
)

start_time = time.time()

processing_name = constants.PROCESSING_VOLUME_NAME