Skip to content

Commit

Permalink
Merge pull request #46 from carsdotcom/forge_patches
Browse files Browse the repository at this point in the history
Forge 1.1.0 Updates
  • Loading branch information
Macr0Nerd authored Apr 8, 2024
2 parents da71ef4 + f503e30 commit 72b69d8
Show file tree
Hide file tree
Showing 15 changed files with 109 additions and 89 deletions.
2 changes: 1 addition & 1 deletion .bumpversion.cfg
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
[bumpversion]
current_version = 1.0.2
current_version = 1.1.0
commit = True
tag = False
parse = (?P<major>\d+)\.(?P<minor>\d+)\.(?P<patch>\d+)
Expand Down
15 changes: 15 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -6,6 +6,21 @@ The format is based on [Keep a Changelog](https://keepachangelog.com/en/1.0.0/),

## [Unreleased]

## [1.1.0] - 2024-02-26
### Added
- **Create** - Added `destroy_on_create`
- **Create** - Added `create_timeout` option
- **Common** - Moved all `n_list` functions to `get_nlist()`
- **Dependencies** - Updated dependencies and tested on latest versions
- **Create** - Set default boto3 session at beginning of create to resolve region bug
- **Create**
  - Multi-AZ functionality
  - Spot retries
  - On-demand Failover

### Changed
- **Create** - Configurable spot strategy
- **Documentation** - Updated with new changes

## [1.0.2] - 2022-10-27

Expand Down
1 change: 0 additions & 1 deletion README.md
Original file line number Diff line number Diff line change
Expand Up @@ -2,7 +2,6 @@

[![GitHub license](https://img.shields.io/github/license/carsdotcom/cars-forge?color=navy&label=License&logo=License&style=flat-square)](https://github.com/carsdotcom/cars-forge/blob/main/LICENSE)
[![PyPI](https://img.shields.io/pypi/v/cars-forge?color=navy&style=flat-square)](https://pypi.org/project/cars-forge/)
![hacktoberfest](https://img.shields.io/github/issues/carsdotcom/cars-forge?color=orange&label=Hacktoberfest%202022&style=flat-square&?labelColor=black)
![PyPI - Downloads](https://img.shields.io/pypi/dm/cars-forge?color=navy&style=flat-square)
![GitHub Workflow Status (branch)](https://img.shields.io/github/workflow/status/carsdotcom/cars-forge/Publish%20Package/main?color=navy&style=flat-square)
![GitHub contributors](https://img.shields.io/github/contributors/carsdotcom/cars-forge?color=navy&style=flat-square)
Expand Down
25 changes: 19 additions & 6 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,8 +8,17 @@ readme = "README.md"
requires-python = ">=3.7"
license = "Apache-2.0"
authors = [
{name = "Nikhil Patel", email = "[email protected]"}
{name = "Nikhil Patel", email = "[email protected]"},
{name = "Gabriele Ron", email = "[email protected]"},
{name = "Joao Moreira", email = "[email protected]"}
]

maintainers = [
{name = "Nikhil Patel", email = "[email protected]"},
{name = "Gabriele Ron", email = "[email protected]"},
{name = "Joao Moreira", email = "[email protected]"}
]

keywords = [
"AWS",
"EC2",
Expand All @@ -19,6 +28,7 @@ keywords = [
"Cluster",
"Jupyter"
]

classifiers = [
"Development Status :: 5 - Production/Stable",
"Environment :: Console",
Expand All @@ -34,18 +44,21 @@ classifiers = [
"Programming Language :: Python :: 3.10",
"Programming Language :: Python :: 3.11",
]

dynamic = ["version"]

dependencies = [
"boto3~=1.19.0",
"pyyaml~=5.3.0",
"schema~=0.7.0",
"boto3",
"pyyaml",
"schema",
]

[project.optional-dependencies]
test = [
"pytest~=7.1.0",
"pytest-cov~=4.0"
"pytest",
"pytest-cov"
]

dev = [
"bump2version~=1.0",
]
Expand Down
2 changes: 1 addition & 1 deletion src/forge/__init__.py
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
__version__ = "1.0.2"
__version__ = "1.1.0"

# Default values for forge's essential arguments
DEFAULT_ARG_VALS = {
Expand Down
29 changes: 29 additions & 0 deletions src/forge/common.py
Original file line number Diff line number Diff line change
Expand Up @@ -144,6 +144,35 @@ def get_ip(details, states):
return [(i['ip'], i['id']) for i in list(filter(lambda x: x['state'] in states, details))]


def get_nlist(config):
    """Build the instance names that belong to a Forge job.

    A ``cluster`` service always yields the master name and, when
    ``rr_all`` is truthy in the config, the worker name as well. A
    ``single`` service yields exactly one name. Any other service value
    yields an empty list.

    Parameters
    ----------
    config : dict
        Forge configuration data

    Returns
    -------
    list
        List of instance names
    """
    run_date = config.get('date', '')
    markets = config.get('market', DEFAULT_ARG_VALS['market'])
    job_name = config['name']
    service = config['service']

    if service == "single":
        return [f'{job_name}-{markets[0]}-{service}-{run_date}']

    if service != "cluster":
        return []

    # The master uses the first market entry; the worker uses the last one.
    names = [f'{job_name}-{markets[0]}-{service}-master-{run_date}']
    if config.get('rr_all'):
        names.append(f'{job_name}-{markets[-1]}-{service}-worker-{run_date}')
    return names


@contextlib.contextmanager
def key_file(secret_id, region, profile):
"""Safely retrieve a secret file from AWS for temporary use.
Expand Down
1 change: 1 addition & 0 deletions src/forge/configure.py
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,7 @@ def check_env_yaml(env_yaml):
error='Invalid spot allocation strategy'),
Optional('on_demand_failover'): And(bool),
Optional('spot_retries'): And(Use(int), lambda x: x > 0),
Optional('create_timeout'): And(Use(int), lambda x: x > 0),
})
try:
validated = schema.validate(env_yaml)
Expand Down
21 changes: 17 additions & 4 deletions src/forge/create.py
Original file line number Diff line number Diff line change
Expand Up @@ -167,6 +167,12 @@ def create_status(n, request, config):
create_time = fleet_details.get('CreateTime')
time_without_spot = 0
while current_status != 'fulfilled':
if config.get('create_timeout') and t > config['create_timeout']:
logger.error('Timeout of %s seconds hit for instance fulfillment; Aborting.', config['create_timeout'])
if destroy_flag:
destroy(config)
exit_callback(config, exit=True)

if current_status == 'pending_fulfillment':
time.sleep(10)
t += 10
Expand Down Expand Up @@ -664,6 +670,12 @@ def search_and_create(config, task, instance_details):
e = detail[0]
if e['state'] in ['running', 'stopped', 'stopping', 'pending']:
logger.info('%s is %s, the IP is %s', task, e['state'], e['ip'])

if config.get('destroy_on_create'):
logger.info('destroy_on_create true, destroying fleet.')
destroy(config)
create_template(n, config, task)
create_fleet(n, config, task, instance_details)
else:
if len(e['fleet_id']) != 0:
logger.info('Fleet is running without EC2, will recreate it.')
Expand Down Expand Up @@ -703,11 +715,12 @@ def _check(x, i):
return x[i] if x and x[i:i + 1] else None

for task in task_list:
task_worker_count = worker_count
if 'cluster-master' in task or 'single' in task:
task_ram, task_cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 0), cpu=_check(cpu, 0), ratio=_check(ratio, 0))
worker_count = 1
task_worker_count = 1
elif 'cluster-worker' in task:
task_ram, task_cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 1), cpu=_check(cpu, 1), ratio=_check(ratio, 1), workers=worker_count)
task_ram, task_cpu, total_ram, ram2cpu_ratio = calc_machine_ranges(ram=_check(ram, 1), cpu=_check(cpu, 1), ratio=_check(ratio, 1), workers=task_worker_count)
else:
logger.error("'%s' does not seem to be a valid cluster or single job.", task)
if destroy_flag:
Expand All @@ -717,8 +730,8 @@ def _check(x, i):
logger.debug('%s OVERRIDE DETAILS | RAM: %s out of %s | CPU: %s with ratio of %s', task, ram, total_ram, cpu, ram2cpu_ratio)

instance_details[task] = {
'total_capacity': worker_count or total_ram,
'capacity_unit': 'units' if worker_count else 'memory-mib',
'total_capacity': task_worker_count or total_ram,
'capacity_unit': 'units' if task_worker_count else 'memory-mib',
'override_instance_stats': {
'MemoryMiB': {'Min': task_ram[0], 'Max': task_ram[1]},
'VCpuCount': {'Min': task_cpu[0], 'Max': task_cpu[1]},
Expand Down
1 change: 1 addition & 0 deletions src/forge/parser.py
Original file line number Diff line number Diff line change
Expand Up @@ -127,6 +127,7 @@ def add_job_args(parser):
common_grp.add_argument('--valid_time', '--valid-time', type=positive_int_arg)
common_grp.add_argument('--user_data', '--user-data', nargs='*')
common_grp.add_argument('--gpu', action='store_true', dest='gpu_flag')
common_grp.add_argument('--destroy_on_create', '--destroy-on-create', action='store_true', default=None)


def add_action_args(parser):
Expand Down
15 changes: 2 additions & 13 deletions src/forge/rsync.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import DEFAULT_ARG_VALS, REQUIRED_ARGS
from .parser import add_basic_args, add_general_args, add_env_args, add_action_args
from .common import ec2_ip, key_file, get_ip, exit_callback
from .common import ec2_ip, key_file, get_ip, get_nlist, exit_callback

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -40,11 +40,6 @@ def rsync(config):
config : dict
Forge configuration data
"""
name = config.get('name')
date = config.get('date', '')
market = config.get('market', DEFAULT_ARG_VALS['market'])
service = config.get('service')
rr_all = config.get('rr_all')

def _rsync(config, ip):
"""performs the rsync to a given ip
Expand Down Expand Up @@ -84,13 +79,7 @@ def _rsync(config, ip):
else:
logger.info('Rsync successful:\n%s', output)

n_list = []
if service == "cluster":
n_list.append(f'{name}-{market[0]}-{service}-master-{date}')
if rr_all:
n_list.append(f'{name}-{market[-1]}-{service}-worker-{date}')
elif service == "single":
n_list.append(f'{name}-{market[0]}-{service}-{date}')
n_list = get_nlist(config)

for n in n_list:
try:
Expand Down
11 changes: 2 additions & 9 deletions src/forge/run.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@
from . import DEFAULT_ARG_VALS, REQUIRED_ARGS
from .exceptions import ExitHandlerException
from .parser import add_basic_args, add_general_args, add_env_args, add_action_args
from .common import ec2_ip, key_file, get_ip, destroy_hook, user_accessible_vars, FormatEmpty, exit_callback
from .common import ec2_ip, key_file, get_ip, destroy_hook, user_accessible_vars, FormatEmpty, exit_callback, get_nlist
from .destroy import destroy

logger = logging.getLogger(__name__)
Expand Down Expand Up @@ -77,7 +77,6 @@ def _run(config, ip):
pem_secret = config['forge_pem_secret']
region = config['region']
profile = config.get('aws_profile')
env = config['forge_env']

with key_file(pem_secret, region, profile) as pem_path:
fmt = FormatEmpty()
Expand All @@ -94,13 +93,7 @@ def _run(config, ip):
)
return exc.returncode

n_list = []
if service == "cluster":
n_list.append(f'{name}-{market[0]}-{service}-master-{date}')
if rr_all:
n_list.append(f'{name}-{market[-1]}-{service}-worker-{date}')
elif service == "single":
n_list.append(f'{name}-{market[0]}-{service}-{date}')
n_list = get_nlist(config)

for n in n_list:
try:
Expand Down
26 changes: 6 additions & 20 deletions src/forge/start.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import REQUIRED_ARGS
from .parser import add_basic_args, add_general_args, add_env_args
from .common import ec2_ip, get_ip, set_boto_session
from .common import ec2_ip, get_ip, set_boto_session, get_nlist

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -75,25 +75,11 @@ def start(config):
config : dict
Forge configuration data
"""
name = config['name']
date = config.get('date', '')
service = config['service']
market = config.get('market')

n_list = []
if service == "cluster":
if market[0] == 'spot':
logger.error('Master is a spot instance; you cannot start a spot instance')
elif market[0] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-master-{date}')

if market[-1] == 'spot':
logger.error('Worker is a spot fleet; you cannot start a spot fleet')
elif market[-1] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-worker-{date}')
elif service == "single":
if market[0] == 'spot':
logger.error('The instance is a spot instance; you cannot start a spot instance')
elif market[0] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-{date}')
if 'spot' in market:
logger.error('Master or worker is a spot instance; you cannot start a spot instance')
# sys.exit(1) # ToDo: Should we change the tests to reflect an exit or allow it to continue?

n_list = get_nlist({**config, 'rr_all': True})
start_fleet(n_list, config)
27 changes: 7 additions & 20 deletions src/forge/stop.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@

from . import REQUIRED_ARGS
from .parser import add_basic_args, add_general_args, add_env_args
from .common import ec2_ip, get_ip, set_boto_session
from .common import ec2_ip, get_ip, set_boto_session, get_nlist

logger = logging.getLogger(__name__)

Expand Down Expand Up @@ -70,25 +70,12 @@ def stop(config):
config : dict
Forge configuration data
"""
name = config['name']
date = config.get('date', '')
service = config['service']
market = config.get('market')

n_list = []
if service == "cluster":
if market[0] == 'spot':
logger.error('Master is a spot instance; you cannot stop a spot instance')
elif market[0] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-master-{date}')

if market[-1] == 'spot':
logger.error('Worker is a spot fleet; you cannot stop a spot fleet')
elif market[-1] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-worker-{date}')
elif service == "single":
if market[0] == 'spot':
logger.error('The instance is a spot instance; you cannot stop a spot instance')
elif market[0] == 'on-demand':
n_list.append(f'{name}-{market[0]}-{service}-{date}')
if 'spot' in market:
logger.error('Master or worker is a spot instance; you cannot stop a spot instance')
# sys.exit(1) # ToDo: Should we change the tests to reflect an exit or allow it to continue?

n_list = get_nlist({**config, 'rr_all': True})

stop_fleet(n_list, config)
12 changes: 6 additions & 6 deletions src/forge/yaml_loader.py
Original file line number Diff line number Diff line change
Expand Up @@ -135,6 +135,7 @@ def _get_type(x):
Optional('cpu'): And(list, error='Invalid CPU cores'),
Optional('destroy_after_success'): And(bool),
Optional('destroy_after_failure'): And(bool),
Optional('destroy_on_create'): And(bool),
Optional('disk'): And(Use(int), positive_int),
Optional('disk_device_name'): And(str, len, error='Invalid Device Name'),
Optional('forge_env'): And(str, len, error='Invalid Environment'),
Expand Down Expand Up @@ -191,7 +192,7 @@ def load_config(args):

logger.info('Checking config file: %s', args['yaml'])
logger.debug('Required User config is %s', user_config)
env = args['forge_env'] or user_config.get('forge_env')
env = args.get('forge_env') or user_config.get('forge_env')

if env is None:
logger.error("'forge_env' variable required.")
Expand All @@ -218,12 +219,11 @@ def load_config(args):
env_config.update(normalize_config(env_config))
check_keys(env_config['region'], env_config.get('aws_profile'))

additional_config_data = env_config.pop('additional_config', None)
additional_config_data = env_config.pop('additional_config', [])
additional_config = []
if additional_config_data:
for i in additional_config_data:
ADDITIONAL_KEYS.append(i['name'])
additional_config.append(i)
for i in additional_config_data:
ADDITIONAL_KEYS.append(i['name'])
additional_config.append(i)

logger.debug('Additional config options: %s', additional_config)

Expand Down
Loading

0 comments on commit 72b69d8

Please sign in to comment.