Add tron topology_spread_constraints support to PaaSTA #3983

Merged · 6 commits · Nov 11, 2024
53 changes: 49 additions & 4 deletions paasta_tools/tron_tools.py
@@ -62,7 +62,9 @@
from paasta_tools import spark_tools

from paasta_tools.kubernetes_tools import (
NodeSelectorConfig,
allowlist_denylist_to_requirements,
contains_zone_label,
get_service_account_name,
limit_size_with_hash,
raw_selectors_to_requirements,
@@ -248,6 +250,7 @@ class TronActionConfigDict(InstanceConfigDict, total=False):
# maneuvering to unify
command: str
service_account_name: str
node_selectors: Dict[str, NodeSelectorConfig]

# the values for this dict can be anything since it's whatever
# spark accepts
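
For context, a hedged sketch of what the new node_selectors key might hold once an action's config is parsed into this TypedDict. The label key and value below are illustrative assumptions, not taken from this PR; the full set of accepted shapes is whatever NodeSelectorConfig in kubernetes_tools allows.

# Illustrative only: an action config_dict carrying the new node_selectors key.
# The "yelp.com/pool" label and its value are assumptions for the example.
action_config_dict = {
    "command": "python -m batches.nightly_rollup",
    "service_account_name": "nightly-batch",
    "node_selectors": {
        "yelp.com/pool": "default",  # simple string-valued selector
    },
}
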
@@ -594,18 +597,39 @@ def get_node_selectors(self) -> Dict[str, str]:
def get_node_affinities(self) -> Optional[List[Dict[str, Union[str, List[str]]]]]:
"""Converts deploy_whitelist and deploy_blacklist in node affinities.

note: At the time of writing, `kubectl describe` does not show affinities,
NOTE: At the time of writing, `kubectl describe` does not show affinities,
only selectors. To see affinities, use `kubectl get pod -o json` instead.

WARNING: At the time of writing, we only use requiredDuringSchedulingIgnoredDuringExecution node affinities in Tron as we currently have
no use case for preferredDuringSchedulingIgnoredDuringExecution node affinities.
"""
requirements = allowlist_denylist_to_requirements(
allowlist=self.get_deploy_whitelist(),
denylist=self.get_deploy_blacklist(),
)
node_selectors = self.config_dict.get("node_selectors", {})
requirements.extend(
raw_selectors_to_requirements(
raw_selectors=self.config_dict.get("node_selectors", {}), # type: ignore
raw_selectors=node_selectors,
)
)

system_paasta_config = load_system_paasta_config()
if system_paasta_config.get_enable_tron_tsc():
# PAASTA-18198: To improve AZ balance with Karpenter, we temporarily allow specifying zone affinities per pool
pool_node_affinities = system_paasta_config.get_pool_node_affinities()
if pool_node_affinities and self.get_pool() in pool_node_affinities:
current_pool_node_affinities = pool_node_affinities[self.get_pool()]
# If the service already has a node selector for a zone, we don't want to override it
if current_pool_node_affinities and not contains_zone_label(
node_selectors
):
requirements.extend(
raw_selectors_to_requirements(
raw_selectors=current_pool_node_affinities,
)
)

if not requirements:
return None
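
A minimal, self-contained sketch of the guard added above, assuming a plausible shape for pool_node_affinities (pool name mapped to zone-label selectors) and a simplified stand-in for contains_zone_label. The pool name, zone names, and the (label, operator, values) requirement shape are assumptions for illustration, not PaaSTA's actual definitions.

# Self-contained sketch (not PaaSTA's real helpers): per-pool zone affinities
# are appended only when the action's own node_selectors don't already pin a zone.
ZONE_LABEL = "topology.kubernetes.io/zone"

def _has_zone_selector(node_selectors: dict) -> bool:
    # simplified stand-in for contains_zone_label()
    return ZONE_LABEL in node_selectors

node_selectors = {"yelp.com/pool": "default"}  # hypothetical action config
pool_node_affinities = {                       # hypothetical system config
    "default": {ZONE_LABEL: ["us-west-1a", "us-west-1b"]},
}

requirements = []
zone_affinities = pool_node_affinities.get("default")
if zone_affinities and not _has_zone_selector(node_selectors):
    # assumed (label, operator, values) shape for a scheduling requirement
    requirements.extend(
        (label, "In", values) for label, values in zone_affinities.items()
    )

print(requirements)  # [('topology.kubernetes.io/zone', 'In', ['us-west-1a', 'us-west-1b'])]
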

@@ -963,6 +987,9 @@ def format_tron_action_dict(action_config: TronActionConfig):
"service_account_name": action_config.get_service_account_name(),
}

# we need this loaded in several branches, so we'll load it once at the start to simplify things
system_paasta_config = load_system_paasta_config()

if executor in KUBERNETES_EXECUTOR_NAMES:
# we'd like Tron to be able to distinguish between spark and normal actions
# even though they both run on k8s
@@ -984,6 +1011,26 @@ def format_tron_action_dict(action_config: TronActionConfig):
result["node_selectors"] = action_config.get_node_selectors()
result["node_affinities"] = action_config.get_node_affinities()

if system_paasta_config.get_enable_tron_tsc():
# XXX: this is currently hardcoded since we should only really need TSC for zone-aware scheduling
result["topology_spread_constraints"] = [
{
# try to evenly spread pods across specified topology
"max_skew": 1,
# narrow down what pods to consider when spreading
"label_selector": {
# only consider pods that are managed by tron
"app.kubernetes.io/managed-by": "tron",
# and in the same pool
"paasta.yelp.com/pool": action_config.get_pool(),
},
# now, spread across AZs
"topology_key": "topology.kubernetes.io/zone",
# but if not possible, schedule even with a zonal imbalance
"when_unsatisfiable": "ScheduleAnyway",
},
]

# XXX: once we're off mesos we can make get_cap_* return just the cap names as a list
result["cap_add"] = [cap["value"] for cap in action_config.get_cap_add()]
result["cap_drop"] = [cap["value"] for cap in action_config.get_cap_drop()]
@@ -1029,14 +1076,12 @@ def format_tron_action_dict(action_config: TronActionConfig):

# XXX: now that we're actually passing through extra_volumes correctly (e.g., using get_volumes()),
# we can get rid of the default_volumes from the Tron master config
system_paasta_config = load_system_paasta_config()
extra_volumes = action_config.get_volumes(
system_paasta_config.get_volumes(),
uses_bulkdata_default=system_paasta_config.get_uses_bulkdata_default(),
)
if executor == "spark":
is_mrjob = action_config.config_dict.get("mrjob", False)
system_paasta_config = load_system_paasta_config()
# inject additional Spark configs in case of Spark commands
result["command"] = spark_tools.build_spark_command(
result["command"],
4 changes: 4 additions & 0 deletions paasta_tools/utils.py
@@ -2053,6 +2053,7 @@ class SystemPaastaConfigDict(TypedDict, total=False):
vitess_throttling_config: Dict
uses_bulkdata_default: bool
enable_automated_redeploys_default: bool
enable_tron_tsc: bool


def load_system_paasta_config(
@@ -2821,6 +2822,9 @@ def get_uses_bulkdata_default(self) -> bool:
def get_enable_automated_redeploys_default(self) -> bool:
return self.config_dict.get("enable_automated_redeploys_default", False)

def get_enable_tron_tsc(self) -> bool:
return self.config_dict.get("enable_tron_tsc", False)


def _run(
command: Union[str, List[str]],