
Commit

use mixin class for osu
Samuel Moors committed Dec 30, 2024
1 parent 82bd926 commit 40dae87
Showing 2 changed files with 69 additions and 172 deletions.
6 changes: 4 additions & 2 deletions eessi/testsuite/eessi_mixin.py
@@ -43,6 +43,7 @@ class EESSI_Mixin(RegressionMixin):
scale = parameter(SCALES.keys())
bench_name = None
bench_name_ci = None
num_tasks_per_compute_unit = 1

# Create ReFrame variables for logging runtime environment information
cvmfs_repo_name = variable(str, value='None')
@@ -118,7 +119,7 @@ def run_after_init(self):
# Set scales as tags
hooks.set_tag_scale(self)

@run_after('init')
@run_before('setup', always_last=True)
def measure_mem_usage(self):
if self.measure_memory_usage:
hooks.measure_memory_usage(self)
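
Aside (not part of the commit): a minimal sketch of the `always_last=True` hook ordering this hunk relies on; the `HookOrderExample` class, its attributes, and the hook name are hypothetical, but `run_before` is the ReFrame builtin used above.

```python
import reframe as rfm
from reframe.core.builtins import run_before


@rfm.simple_test
class HookOrderExample(rfm.RunOnlyRegressionTest):
    valid_systems = ['*']
    valid_prog_environs = ['*']
    executable = 'true'

    @run_before('setup', always_last=True)
    def late_hook(self):
        # always_last=True defers this hook to the end of the pre-setup hook
        # chain, so pre-setup hooks defined in subclasses still run before it.
        pass
```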
@@ -163,7 +164,8 @@ def validate_setup(self):
@run_after('setup')
def assign_tasks_per_compute_unit(self):
"""Call hooks to assign tasks per compute unit, set OMP_NUM_THREADS, and set compact process binding"""
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.compute_unit)
hooks.assign_tasks_per_compute_unit(test=self, compute_unit=self.compute_unit,
num_per=self.num_tasks_per_compute_unit)

# Set OMP_NUM_THREADS environment variable
hooks.set_omp_num_threads(self)
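For orientation, a sketch (not part of this commit) of how a concrete test could use the new `num_tasks_per_compute_unit` attribute. The `MyApp` class, its executable, and the module filter are hypothetical; the required attributes are inferred from the mixin code above.

```python
import reframe as rfm
from reframe.core.builtins import parameter

from eessi.testsuite.constants import COMPUTE_UNIT, CPU, DEVICE_TYPES, NODE
from eessi.testsuite.eessi_mixin import EESSI_Mixin
from eessi.testsuite.utils import find_modules


@rfm.simple_test
class MyApp(rfm.RunOnlyRegressionTest, EESSI_Mixin):
    device_type = DEVICE_TYPES[CPU]
    compute_unit = COMPUTE_UNIT[NODE]
    # New in this commit: request two tasks per compute unit (here: per node)
    # instead of the default of 1; the mixin forwards this value to
    # hooks.assign_tasks_per_compute_unit() in its post-setup hook.
    num_tasks_per_compute_unit = 2
    module_name = parameter(find_modules('OSU-Micro-Benchmarks'))  # example filter
    executable = 'osu_hello'  # hypothetical

    def required_mem_per_node(self):
        return 1024  # per-node memory request (unit as expected by the hooks)
```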
235 changes: 65 additions & 170 deletions eessi/testsuite/tests/apps/osu.py
@@ -6,19 +6,20 @@
non-GPU nodes. Otherwise those tests will FAIL.
"""
import reframe as rfm
from reframe.core.builtins import parameter, run_after # added only to make the linter happy
from reframe.core.builtins import parameter, run_after
from reframe.utility import reframe

from hpctestlib.microbenchmarks.mpi.osu import osu_benchmark

from eessi.testsuite import hooks, utils
from eessi.testsuite.constants import *
from eessi.testsuite.constants import COMPUTE_UNIT, CPU, DEVICE_TYPES, INVALID_SYSTEM, GPU, NODE, SCALES
from eessi.testsuite.eessi_mixin import EESSI_Mixin
from eessi.testsuite.utils import find_modules, log


def filter_scales_pt2pt():
"""
Filtering function for filtering scales for the pt2pt OSU test
returns all scales with either 2 cores, 1 full node, or 2 full nodes
"""
return [
k for (k, v) in SCALES.items()
@@ -30,7 +31,8 @@ def filter_scales_pt2pt():

def filter_scales_coll():
"""
Filtering function for filtering scales for collective the OSU test
Filtering function for filtering scales for the collective OSU test
returns all scales with at least 2 cores
"""
return [
k for (k, v) in SCALES.items()
@@ -40,24 +42,23 @@


@rfm.simple_test
class EESSI_OSU_Micro_Benchmarks_pt2pt(osu_benchmark):
''' Run-only OSU test '''
scale = parameter(filter_scales_pt2pt())
valid_prog_environs = ['default']
valid_systems = ['*']
class EESSI_OSU_Base(osu_benchmark, EESSI_Mixin):
""" base class for OSU tests """
time_limit = '30m'
module_name = parameter(find_modules('OSU-Micro-Benchmarks'))
# Device type for non-cuda OSU-Micro-Benchmarks should run on hosts of both node types. To do this the default
# device type is set to GPU.
device_type = parameter([DEVICE_TYPES[CPU], DEVICE_TYPES[GPU]])
# unset num_tasks_per_node from the hpctestlib.

# reset num_tasks_per_node from the hpctestlib: we handle it ourselves
num_tasks_per_node = None

# Set num_warmup_iters to 5 to reduce execution time, especially on slower interconnects
num_warmup_iters = 5
# Set num_iters to 10 to reduce execution time, especially on slower interconnects
num_iters = 10

def required_mem_per_node(self):
return 1024

@run_after('init')
def filter_scales_2gpus(self):
"""Filter out scales with < 2 GPUs if running on GPUs"""
@@ -69,26 +70,6 @@ def filter_scales_2gpus(self):
self.valid_systems = [INVALID_SYSTEM]
log(f'valid_systems set to {self.valid_systems} for scale {self.scale} and device_type {self.device_type}')

@run_after('init')
def filter_benchmark_pt2pt(self):
""" Filter out all non-mpi.pt2pt benchmarks """
if not self.benchmark_info[0].startswith('mpi.pt2pt'):
self.valid_systems = [INVALID_SYSTEM]

@run_after('init')
def run_after_init(self):
"""hooks to run after init phase"""

# Filter on which scales are supported by the partitions defined in the ReFrame configuration
hooks.filter_supported_scales(self)

hooks.filter_valid_systems_by_device_type(self, required_device_type=self.device_type)

hooks.set_modules(self)

# Set scales as tags
hooks.set_tag_scale(self)

@run_after('init')
def set_device_buffers(self):
"""
@@ -98,32 +79,46 @@ def set_device_buffers(self):
"""
if self.device_type == DEVICE_TYPES[GPU]:
self.device_buffers = 'cuda'

else:
# If the device_type is CPU then device_buffers should always be CPU.
self.device_buffers = 'cpu'

@run_after('init')
def set_tag_ci(self):
""" Setting tests under CI tag. """
if (self.benchmark_info[0] in ['mpi.pt2pt.osu_latency', 'mpi.pt2pt.osu_bw']):
self.tags.add('CI')
log(f'tags set to {self.tags}')
def set_tags(self):
""" Setting custom tags """
self.bench_name = self.benchmark_info[0]
self.tags.add(self.bench_name.split('.')[-1])

@run_after('setup', always_last=True)
def skip_test_1gpu(self):
if self.device_type == DEVICE_TYPES[GPU]:
num_gpus = self.num_gpus_per_node * self.num_nodes
self.skip_if(num_gpus < 2, "Skipping GPU test : only 1 GPU available for this test case")

if (self.benchmark_info[0] == 'mpi.pt2pt.osu_bw'):
self.tags.add('osu_bw')

if (self.benchmark_info[0] == 'mpi.pt2pt.osu_latency'):
self.tags.add('osu_latency')
@rfm.simple_test
class EESSI_OSU_Micro_Benchmarks_pt2pt(EESSI_OSU_Base):
''' point-to-point OSU test '''
scale = parameter(filter_scales_pt2pt())
compute_unit = COMPUTE_UNIT[NODE]

@run_after('init')
def set_mem(self):
""" Setting an extra job option of memory. This test has only 4 possibilities: 1_node, 2_nodes, 2_cores and
1cpn_2nodes. This is implemented for all cases including full node cases. The requested memory may seem large
and the test requires at least 4.5 GB per core at the minimum for the full test when run with validation (-c
option for osu_bw or osu_latency). We run till message size 8 (-m 8) which significantly reduces memory
requirement."""
self.extra_resources = {'memory': {'size': '12GB'}}
def filter_benchmark_pt2pt(self):
""" Filter out all non-mpi.pt2pt benchmarks """
if not self.benchmark_info[0].startswith('mpi.pt2pt'):
self.valid_systems = [INVALID_SYSTEM]

@run_after('init')
def select_ci(self):
" Select the CI variants "
if (self.bench_name in ['mpi.pt2pt.osu_latency', 'mpi.pt2pt.osu_bw']):
self.bench_name_ci = self.bench_name

@run_after('init')
def set_num_tasks_per_compute_unit(self):
""" Setting number of tasks per compute unit and cpus per task. This sets num_cpus_per_task
for 1 node and 2 node options where the request is for full nodes."""
if SCALES.get(self.scale).get('num_nodes') == 1:
self.num_tasks_per_compute_unit = 2

@run_after('setup')
def adjust_executable_opts(self):
@@ -132,134 +127,34 @@ def adjust_executable_opts(self):
Therefore we must override it *after* the 'setup' phase
"""
if self.device_type == DEVICE_TYPES[CPU]:
self.executable_opts = [ele for ele in self.executable_opts if ele != 'D']

@run_after('setup')
def set_num_tasks_per_node(self):
""" Setting number of tasks per node and cpus per task in this function. This function sets num_cpus_per_task
for 1 node and 2 node options where the request is for full nodes."""
if SCALES.get(self.scale).get('num_nodes') == 1:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE], 2)
else:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE])

@run_after('setup')
def set_num_gpus_per_node(self):
"""
Set number of GPUs per node for GPU-to-GPU tests
"""
if self.device_type == DEVICE_TYPES[GPU]:
# Skip single-node tests with less than 2 GPU devices in the node
self.skip_if(
SCALES[self.scale]['num_nodes'] == 1 and self.default_num_gpus_per_node < 2,
"There are < 2 GPU devices present in the node."
f" Skipping tests with device_type={DEVICE_TYPES[GPU]} involving < 2 GPUs and 1 node."
)
if not self.num_gpus_per_node:
self.num_gpus_per_node = self.default_num_gpus_per_node
log(f'num_gpus_per_node set to {self.num_gpus_per_node} for partition {self.current_partition.name}')
self.executable_opts = [x for x in self.executable_opts if x != 'D']
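
To make the scale handling above concrete, a standalone sketch (not part of the commit) of the rank layout the pt2pt test ends up with. The SCALES shapes below are assumptions modelled on the scale names referenced in this file; the real definitions live in eessi.testsuite.constants.

```python
# Hypothetical SCALES shapes, modelled on the scale names used in this file.
SCALES = {
    '2_cores':     {'num_nodes': 1, 'num_cpus_per_node': 2},
    '1cpn_2nodes': {'num_nodes': 2, 'num_cpus_per_node': 1},
    '1_node':      {'num_nodes': 1, 'node_part': 1},
    '2_nodes':     {'num_nodes': 2, 'node_part': 1},
}

for scale, spec in SCALES.items():
    # compute_unit is COMPUTE_UNIT[NODE]: single-node scales place both ranks
    # on the one node (num_tasks_per_compute_unit = 2), while multi-node
    # scales keep the default of one rank per node.
    tasks_per_node = 2 if spec['num_nodes'] == 1 else 1
    print(f"{scale}: {spec['num_nodes']} node(s), {tasks_per_node} task(s) per node")
```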


@rfm.simple_test
class EESSI_OSU_Micro_Benchmarks_coll(osu_benchmark):
''' Run-only OSU test '''
class EESSI_OSU_Micro_Benchmarks_coll(EESSI_OSU_Base):
''' collective OSU test '''
scale = parameter(filter_scales_coll())
valid_prog_environs = ['default']
valid_systems = ['*']
time_limit = '30m'
module_name = parameter(utils.find_modules('OSU-Micro-Benchmarks'))
# Device type for non-cuda OSU-Micro-Benchmarks should run on hosts of both node types. To do this the default
# device type is set to GPU.
device_type = parameter([DEVICE_TYPES[CPU], DEVICE_TYPES[GPU]])
# Unset num_tasks_per_node from hpctestlib
num_tasks_per_node = None

# Set num_warmup_iters to 5 to reduce execution time, especially on slower interconnects
num_warmup_iters = 5
# Set num_iters to 10 to reduce execution time, especially on slower interconnects
num_iters = 10

@run_after('init')
def run_after_init(self):
"""hooks to run after init phase"""
# Note: device_buffers variable is inherited from the hpctestlib class and adds options to the launcher
# commands based on what device is set.
self.device_buffers = 'cpu'
# Filter on which scales are supported by the partitions defined in the ReFrame configuration
hooks.filter_supported_scales(self)
hooks.filter_valid_systems_by_device_type(self, required_device_type=self.device_type)
is_cuda_module = utils.is_cuda_required_module(self.module_name)
if is_cuda_module and self.device_type == DEVICE_TYPES[GPU]:
self.device_buffers = 'cuda'

# If the device_type is CPU then device buffer should always be CPU.
if self.device_type == DEVICE_TYPES[CPU]:
self.device_buffers = 'cpu'
# This part of the code removes the collective communication calls out of the run list since this test is only
# meant for collective.
def filter_benchmark_coll(self):
""" Filter out all non-mpi.collective benchmarks """
if not self.benchmark_info[0].startswith('mpi.collective'):
self.valid_systems = []
hooks.set_modules(self)

@run_after('init')
def set_tag_ci(self):
if (self.benchmark_info[0] == 'mpi.collective.osu_allreduce'
or self.benchmark_info[0] == 'mpi.collective.osu_alltoall'):
self.tags.add('CI')
if (self.benchmark_info[0] == 'mpi.collective.osu_allreduce'):
self.tags.add('osu_allreduce')
if (self.benchmark_info[0] == 'mpi.collective.osu_alltoall'):
self.tags.add('osu_alltoall')
self.valid_systems = [INVALID_SYSTEM]

@run_after('init')
def set_mem(self):
""" Setting an extra job option of memory. The alltoall operation takes maximum memory of 0.1 GB per core for a
message size of 8 and almost 0.5 GB per core for the maximum message size the test allows. But we limit the
message sizes to 8 and for a safety net we take 64 GB assuming dense nodes works for all the tests and node
types."""
self.extra_resources = {'memory': {'size': '64GB'}}
def select_ci(self):
" Select the CI variants "
if (self.bench_name in ['mpi.collective.osu_allreduce', 'mpi.collective.osu_alltoall']):
self.bench_name_ci = self.bench_name

@run_after('init')
def set_num_tasks(self):
hooks.set_tag_scale(self)

@run_after('setup')
def set_num_tasks_per_node(self):
""" Setting number of tasks per node, cpus per task and gpus per node in this function. This function sets
num_cpus_per_task for 1 node and 2 node options where the request is for full nodes."""
max_avail_cpus_per_node = self.current_partition.processor.num_cpus
if self.device_buffers == 'cpu':
# Setting num_tasks and num_tasks_per_node for the CPU tests
if SCALES.get(self.scale).get('num_cpus_per_node', 0):
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE],
self.default_num_cpus_per_node)
elif SCALES.get(self.scale).get('node_part', 0):
pass_num_per = int(max_avail_cpus_per_node / SCALES.get(self.scale).get('node_part', 0))
if pass_num_per > 1:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT[NODE], pass_num_per)
else:
self.skip(msg="Too few cores available for a collective operation.")

if FEATURES[GPU] in self.current_partition.features:
max_avail_gpus_per_node = utils.get_max_avail_gpus_per_node(self)
# Setting number of GPU for a cpu test on a GPU node.
if SCALES.get(self.scale).get('num_nodes') == 1:
self.num_gpus_per_node = 1
else:
self.num_gpus_per_node = max_avail_gpus_per_node
elif self.device_buffers == 'cuda':
max_avail_gpus_per_node = utils.get_max_avail_gpus_per_node(self)
# Setting num_tasks and num_tasks_per_node for the GPU tests
if max_avail_gpus_per_node == 1 and SCALES.get(self.scale).get('num_nodes') == 1:
self.skip(msg="There is only 1 device in the node. Skipping collective tests involving only 1 node.")
else:
if SCALES.get(self.scale).get('num_gpus_per_node', 0) * SCALES.get(self.scale).get('num_nodes', 0) > 1:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT.get(GPU, FEATURES[GPU]))
elif SCALES.get(self.scale).get('node_part', 0):
pass_num_per = int(max_avail_gpus_per_node / SCALES.get(self.scale).get('node_part', 0))
if pass_num_per > 1:
hooks.assign_tasks_per_compute_unit(self, COMPUTE_UNIT.get(GPU, FEATURES[GPU]))
else:
self.skip(msg="Total GPUs (max_avail_gpus_per_node / node_part) is 1 less.")
else:
self.skip(msg="Total GPUs (num_nodes * num_gpus_per_node) = 1")
def set_compute_unit(self):
"""
Set the compute unit to which tasks will be assigned:
one task per core for CPU runs, and one task per GPU for GPU runs.
"""
device_to_compute_unit = {
DEVICE_TYPES[CPU]: COMPUTE_UNIT[CPU],
DEVICE_TYPES[GPU]: COMPUTE_UNIT[GPU],
}
self.compute_unit = device_to_compute_unit.get(self.device_type)
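
Finally, a small sketch of the CI-selection convention shared by both `select_ci` hooks above. The check itself is an assumption about how EESSI_Mixin consumes `bench_name` and `bench_name_ci`; the non-CI benchmark name is just an example.

```python
def is_ci_variant(bench_name, bench_name_ci):
    # Presumed mixin logic: a test instance is a CI variant when select_ci()
    # has copied its benchmark name into bench_name_ci.
    return bench_name_ci is not None and bench_name == bench_name_ci


# A pt2pt CI benchmark from select_ci() above:
assert is_ci_variant('mpi.pt2pt.osu_latency', 'mpi.pt2pt.osu_latency')
# A benchmark outside the CI lists keeps the default bench_name_ci = None:
assert not is_ci_variant('mpi.collective.osu_bcast', None)
print('CI selection convention holds for the sampled cases')
```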
