Job level scaling with compute resources and batch launches aggregation
Signed-off-by: Luca Carrogu <[email protected]>
lukeseawalker committed Jun 27, 2023
1 parent ddf0e0c commit 2397fd2
Showing 15 changed files with 4,255 additions and 2,238 deletions.
CHANGELOG.md (2 changes: 1 addition & 1 deletion)
@@ -9,7 +9,7 @@ This file is used to list changes made in each version of the aws-parallelcluste
**ENHANCEMENTS**

**CHANGES**
- - Perform job level scaling for exclusive jobs, by reading job information from `SLURM_RESUME_FILE`.
+ - Perform all-or-nothing / job-level scaling for exclusive jobs, by reading job information from `SLURM_RESUME_FILE`.

**BUG FIXES**

src/common/utils.py (27 changes: 27 additions & 0 deletions)
@@ -320,3 +320,30 @@ def validate_absolute_path(path):
    if not os.path.isabs(path):
        raise ValueError(f"The path {path} is not a valid absolute path")
    return True


def setup_logging_filter(logger: logging.Logger, custom_field: str):
    """Set up a custom logging filter."""

    class CustomFilter(logging.Filter):
        def __init__(self, custom_field: str):
            super().__init__()
            self.field = custom_field
            self.value = None

        def set_custom_value(self, custom_value: str):
            self.value = custom_value

        def filter(self, record: logging.LogRecord) -> bool:
            # Prefix every log message with "<field> <value> - " once a value has been set
            if self.value:
                record.msg = f"{self.field} {self.value} - {record.msg}"
            return True

    custom_filter = CustomFilter(custom_field)
    logger.addFilter(custom_filter)
    return custom_filter


def remove_logging_filter(logger: logging.Logger, custom_filter):
    """Remove a custom log filter."""
    logger.removeFilter(custom_filter)
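
For context, a minimal usage sketch of these new helpers (the logger name and job id value below are illustrative): once a value is set on the returned filter, every record logged through that logger is prefixed with the field and value, and removing the filter restores plain messages.

import logging

from common.utils import remove_logging_filter, setup_logging_filter

logging.basicConfig(level=logging.INFO, format="%(message)s")
logger = logging.getLogger("launch-demo")  # illustrative logger name

job_filter = setup_logging_filter(logger, "JobID")
job_filter.set_custom_value("42")
logger.info("launching instances")  # printed as "JobID 42 - launching instances"

remove_logging_filter(logger, job_filter)
logger.info("launch complete")  # printed without the prefix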
src/slurm_plugin/clustermgtd.py (4 changes: 2 additions & 2 deletions)
@@ -42,7 +42,7 @@
from slurm_plugin.cluster_event_publisher import ClusterEventPublisher
from slurm_plugin.common import TIMESTAMP_FORMAT, log_exception, print_with_count
from slurm_plugin.console_logger import ConsoleLogger
-from slurm_plugin.instance_manager import InstanceManager
+from slurm_plugin.instance_manager import InstanceManagerFactory
from slurm_plugin.slurm_resources import (
    CONFIG_FILE_DIR,
    ComputeResourceFailureEvent,
@@ -413,7 +413,7 @@ def shutdown(self):
    @staticmethod
    def _initialize_instance_manager(config):
        """Initialize instance manager class that will be used to launch/terminate/describe instances."""
-        return InstanceManager(
+        return InstanceManagerFactory.get_manager(
            config.region,
            config.cluster_name,
            config.boto3_config,
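A rough sketch of the dispatch such a factory enables follows; the concrete manager class names and the job_level_scaling flag are assumptions for illustration, not necessarily the committed API. The point is that call sites like the one above stay unchanged while the factory decides which implementation to hand back.

class NodeListScalingInstanceManager:
    """Illustrative stand-in for a manager that scales plain node lists."""

    def __init__(self, region, cluster_name, boto3_config, **kwargs):
        self.region = region
        self.cluster_name = cluster_name
        self.boto3_config = boto3_config


class JobLevelScalingInstanceManager(NodeListScalingInstanceManager):
    """Illustrative stand-in for a manager that scales per job (all-or-nothing)."""


class InstanceManagerFactory:
    @staticmethod
    def get_manager(region, cluster_name, boto3_config, job_level_scaling=False, **kwargs):
        # Decide once which implementation to return, so callers such as
        # clustermgtd and fleet_status_manager stay agnostic of the concrete type.
        if job_level_scaling:
            return JobLevelScalingInstanceManager(region, cluster_name, boto3_config, **kwargs)
        return NodeListScalingInstanceManager(region, cluster_name, boto3_config, **kwargs)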
src/slurm_plugin/common.py (5 changes: 4 additions & 1 deletion)
@@ -73,7 +73,10 @@ def wrapper(*args, **kwargs):
                logger.log(log_level, "Failed when %s with exception %s, message: %s", action_desc, type(e).__name__, e)
                if raise_on_error:
                    if exception_to_raise:
-                        raise exception_to_raise
+                        if isinstance(e, exception_to_raise):
+                            raise e  # preserve the exception message if the exception to raise is the same type as the actual exception
+                        else:
+                            raise exception_to_raise
                    else:
                        raise

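The intent of this change, shown as a standalone sketch (the function and exception names here are illustrative, not the plugin's API): when the caught error is already an instance of the exception type the caller asked for, re-raising the original keeps its message instead of raising a fresh, message-less instance.

class NodeTerminationError(Exception):
    """Illustrative exception type."""


def call_guarded(func, exception_to_raise):
    # Standalone sketch of the decorator's new re-raise rule
    try:
        return func()
    except Exception as e:
        if isinstance(e, exception_to_raise):
            raise  # re-raise the original error so its message survives
        raise exception_to_raise


def failing_action():
    raise NodeTerminationError("insufficient capacity")  # informative original message


try:
    call_guarded(failing_action, NodeTerminationError)
except NodeTerminationError as err:
    print(err)  # prints "insufficient capacity" instead of an empty message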
src/slurm_plugin/fleet_manager.py (14 changes: 13 additions & 1 deletion)
@@ -16,6 +16,7 @@
import boto3
from botocore.exceptions import ClientError
from common.ec2_utils import get_private_ip_address
+from common.utils import remove_logging_filter, setup_logging_filter

logger = logging.getLogger(__name__)

@@ -154,16 +155,27 @@ def _evaluate_launch_params(self, count):
    def _launch_instances(self, launch_params):
        pass

-    def launch_ec2_instances(self, count):
+    def launch_ec2_instances(self, count, job_id=None):
        """
        Launch EC2 instances.
        :raises ClientError in case of failures with Boto3 calls (run_instances, create_fleet, describe_instances)
        :raises FleetManagerException in case of missing required instance type info (e.g. private-ip) after 3 retries.
        """
+        job_id_logging_filter = None
+        if job_id:
+            # Set up a custom logging filter so launch log messages are prefixed with the job id
+            job_id_logging_filter = setup_logging_filter(logger, "JobID")
+            job_id_logging_filter.set_custom_value(job_id)
+
        launch_params = self._evaluate_launch_params(count)
        assigned_nodes = self._launch_instances(launch_params)
        logger.debug("Launched the following instances: %s", assigned_nodes.get("Instances"))
+
+        if job_id_logging_filter:
+            # Tear down the custom logging filter
+            remove_logging_filter(logger, job_id_logging_filter)
+
        return [EC2Instance.from_describe_instance_data(instance_info) for instance_info in assigned_nodes["Instances"]]


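A note on the design: setting the filter up at the start of the launch and tearing it down at the end keeps the JobID prefix scoped to a single job's launch batch, so later launches through the same module-level logger do not inherit a stale job id.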
src/slurm_plugin/fleet_status_manager.py (4 changes: 2 additions & 2 deletions)
@@ -22,7 +22,7 @@
from common.schedulers.slurm_commands import resume_powering_down_nodes, update_all_partitions
from slurm_plugin.clustermgtd import ComputeFleetStatus, ComputeFleetStatusManager
from slurm_plugin.common import log_exception
-from slurm_plugin.instance_manager import InstanceManager
+from slurm_plugin.instance_manager import InstanceManagerFactory
from slurm_plugin.slurm_resources import CONFIG_FILE_DIR, PartitionStatus

log = logging.getLogger(__name__)
@@ -98,7 +98,7 @@ def _start_partitions():
def _stop_partitions(config):
    log.info("Setting slurm partitions to INACTIVE and terminating all compute nodes...")
    update_all_partitions(PartitionStatus.INACTIVE, reset_node_addrs_hostname=True)
-    instance_manager = InstanceManager(
+    instance_manager = InstanceManagerFactory.get_manager(
        config.region,
        config.cluster_name,
        config.boto3_config,