Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Distribution of detector blocks across MPI processes #334

Open
wants to merge 11 commits into
base: master
Choose a base branch
from
Open
7 changes: 5 additions & 2 deletions litebird_sim/detectors.py
anand-avinash marked this conversation as resolved.
Show resolved Hide resolved
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,9 @@ class DetectorInfo:

- channel (Union[str, None]): The channel. The default is None

- squid (Union[int, None]): The squid number of the detector.
The default value is None.

- sampling_rate_hz (float): The sampling rate of the ADC
associated with this detector. The default is 0.0

Expand Down Expand Up @@ -136,6 +139,7 @@ class DetectorInfo:
pixel: Union[int, None] = None
pixtype: Union[str, None] = None
channel: Union[str, None] = None
squid: Union[int, None] = None
sampling_rate_hz: float = 0.0
fwhm_arcmin: float = 0.0
ellipticity: float = 0.0
Expand All @@ -148,8 +152,6 @@ class DetectorInfo:
fknee_mhz: float = 0.0
fmin_hz: float = 0.0
alpha: float = 0.0
bandcenter_ghz: float = 0.0
bandwidth_ghz: float = 0.0
pol: Union[str, None] = None
orient: Union[str, None] = None
quat: Any = None
Expand All @@ -175,6 +177,7 @@ def from_dict(dictionary: Dict[str, Any]):
- ``pixel``
- ``pixtype``
- ``channel``
- ``squid``
- ``bandcenter_ghz``
- ``bandwidth_ghz``
- ``band_freqs_ghz``
Expand Down
39 changes: 39 additions & 0 deletions litebird_sim/distribute.py
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,45 @@ def distribute_evenly(num_of_elements, num_of_groups):
return result


def distribute_detector_blocks(detector_blocks):
"""Similar to the function :func:`distribute_evenly()`, this function returns the named-tuples of the starting index of the detectors in a group
with respect to the global list of detectors and the number of detectors
in the group. Unlike the :func:`distribute_evenly()`, this function simply
uses the detector groups given in `detector_blocks` attribute.
anand-avinash marked this conversation as resolved.
Show resolved Hide resolved

Example:
Following the example given in
:meth:`litebird_sim.Observation._make_detector_blocks()`,
`distribute_detector_blocks()` will return

```
[
Span(start_idx=0, num_of_elements=2),
Span(start_idx=2, num_of_elements=2),
Span(start_idx=4, num_of_elements=1),
]
```

Args:
detector_blocks (dict): The detector block object. See :meth:`litebird_sim.Observation._make_detector_blocks()`.

Returns:
A list of 2-elements named-tuples containing (1) the starting index of
the detectors of the block with respect to the flatten list of entire
detector blocks and (2) the number of elements in the detector block.
"""
cur_position = 0
prev_length = 0
result = []
for key in detector_blocks:
cur_length = len(detector_blocks[key])
cur_position += prev_length
prev_length = cur_length
result.append(Span(start_idx=cur_position, num_of_elements=cur_length))

return result


# The following implementation of the painter's partition problem is
# heavily inspired by the code at
# https://www.geeksforgeeks.org/painters-partition-problem-set-2/?ref=rp
Expand Down
144 changes: 130 additions & 14 deletions litebird_sim/observations.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,8 +7,10 @@
import numpy as np
import numpy.typing as npt

from collections import defaultdict

from .coordinates import DEFAULT_TIME_SCALE
from .distribute import distribute_evenly
from .distribute import distribute_evenly, distribute_detector_blocks


@dataclass
Expand Down Expand Up @@ -80,11 +82,19 @@ class Observation:

sampling_rate_hz (float): The sampling frequency, in Hertz.

det_blocks_attributes (list of strings): The list of detector attributes that
will be used to divide detector axis of the tod (and all their attributes).
For example, with ``det_blocks_attributes = ["wafer", "pixel"]``, the
detectors will be divided into the blocks such that all detectors in a
block will have same ``wafer`` and ``pixel`` attribute.
anand-avinash marked this conversation as resolved.
Show resolved Hide resolved

n_blocks_det (int): divide the detector axis of the tod (and all the
arrays of detector attributes) in `n_blocks_det` blocks
arrays of detector attributes) in `n_blocks_det` blocks. Will be ignored
if ``det_blocks_attributes`` is not `None`.
anand-avinash marked this conversation as resolved.
Show resolved Hide resolved

n_blocks_time (int): divide the time axis of the tod in
`n_blocks_time` blocks
`n_blocks_time` blocks. Will be ignored if ``det_blocks_attributes``
is not `None`.
anand-avinash marked this conversation as resolved.
Show resolved Hide resolved

comm: either `None` (do not use MPI) or a MPI communicator
object, like `mpi4py.MPI.COMM_WORLD`. Its size is required to be at
Expand All @@ -103,6 +113,7 @@ def __init__(
sampling_rate_hz: float,
allocate_tod=True,
tods=None,
det_blocks_attributes: Union[List[str], None] = None,
n_blocks_det=1,
n_blocks_time=1,
comm=None,
Expand All @@ -123,6 +134,10 @@ def __init__(
delta = 1.0 / sampling_rate_hz
self.end_time_global = start_time_global + n_samples_global * delta

self._sampling_rate_hz = sampling_rate_hz
self._det_blocks_attributes = det_blocks_attributes
self.detector_blocks = None

if isinstance(detectors, int):
self._n_detectors_global = detectors
else:
Expand All @@ -131,9 +146,12 @@ def __init__(
else:
self._n_detectors_global = len(detectors)

self._sampling_rate_hz = sampling_rate_hz
if self._det_blocks_attributes is not None and comm.size > 1:
n_blocks_det, n_blocks_time = self._make_detector_blocks(
detectors, comm
)

# Neme of the attributes that store an array with the value of a
# Name of the attributes that store an array with the value of a
# property for each of the (local) detectors
self._attr_det_names = []
self._check_blocks(n_blocks_det, n_blocks_time)
Expand All @@ -159,8 +177,17 @@ def __init__(
setattr(self, cur_tod.name, None)

self.setattr_det_global("det_idx", np.arange(self._n_detectors_global), root)

self.detectors_global = []

if self.detector_blocks is not None:
for key in self.detector_blocks:
self.detectors_global += self.detector_blocks[key]
else:
self.detectors_global = detectors

if not isinstance(detectors, int):
self._set_attributes_from_list_of_dict(detectors, root)
self._set_attributes_from_list_of_dict(self.detectors_global, root)

(
self.start_time,
Expand Down Expand Up @@ -203,7 +230,7 @@ def _get_local_start_time_start_and_n_samples(self):
return self.start_time_global + start * delta, start, num

def _set_attributes_from_list_of_dict(self, list_of_dict, root):
assert len(list_of_dict) == self.n_detectors_global
np.testing.assert_equal(len(list_of_dict), self.n_detectors_global)

# Turn list of dict into dict of arrays
if not self.comm or self.comm.rank == root:
Expand Down Expand Up @@ -273,10 +300,86 @@ def n_blocks_time(self):
def n_blocks_det(self):
return self._n_blocks_det

def _make_detector_blocks(self, detectors, comm):
"""This function distributes the detectors in groups such that each
group has same set of attributes specified by the strings in
`self._det_block_attributes`. Once the groups are made, number of
detector blocks is set to be the total number of detector groups
whereas the number of time blocks is computed using the number of
detector blocks and the size of `comm` communicator.

The blocks of detectors are stored in `self.detector_blocks`. It is a
dictionary object with the tuple of `self._det_blocks_attributes` values
as dictionary keys and the list of detectors corresponding to the key
as the dictionary value. This dictionary is sorted so that that the
group with largest number of detectors comes first and the one with
the least number of detectors, comes last.
anand-avinash marked this conversation as resolved.
Show resolved Hide resolved

Example:
For

```
detectors = [
"000_002_123_xx_140_x",
"000_005_321_xx_140_x",
"000_004_456_xx_119_x",
"000_002_654_xx_140_x",
"000_004_789_xx_119_x",
]
```

and `self._det_blocks_attributes = ["channel", "wafer"]`,
`_make_detector_blocks()` will set

```
self.detector_blocks = {
("140", "L02"): ["000_002_123_xx_140_x", "000_002_654_xx_140_x"],
("119", "L04"): ["000_004_456_xx_119_x", "000_004_789_xx_119_x"],
("140", "L05"): ["000_005_321_xx_140_x"],
}
```

and return `n_blocks_det = 3`

Args:
detectors (List[dict]): List of detectors

comm: The MPI communicator

Returns:
n_blocks_det (int): Number of detector blocks

n_blocks_time (int): Number of time blocks

"""
self.detector_blocks = defaultdict(list)
for det in detectors:
key = tuple(det[attribute] for attribute in self._det_blocks_attributes)
self.detector_blocks[key].append(det)

self.detector_blocks = dict(
sorted(
self.detector_blocks.items(),
key=lambda item: len(item[1]),
reverse=True,
)
)
n_blocks_det = len(self.detector_blocks)
n_blocks_time = comm.size // n_blocks_det

return n_blocks_det, n_blocks_time

def _check_blocks(self, n_blocks_det, n_blocks_time):
if self.comm is None:
if n_blocks_det != 1 or n_blocks_time != 1:
raise ValueError("Only one block allowed without an MPI comm")
elif n_blocks_det == 0 or n_blocks_time == 0:
raise ValueError(
"The number of detector blocks and the number of time blocks "
"must be must be non-zero\n"
f"n_blocks_det = {n_blocks_det}, "
f"n_blocks_time = {n_blocks_time}"
)
elif n_blocks_det > self.n_detectors_global:
raise ValueError(
"You can not have more detector blocks than detectors "
Expand All @@ -296,21 +399,33 @@ def _check_blocks(self, n_blocks_det, n_blocks_time):

def _get_start_and_num(self, n_blocks_det, n_blocks_time):
"""For both detectors and time, returns the starting (global)
index and lenght of each block if the number of blocks is changed to the
index and length of each block if the number of blocks is changed to the
values passed as arguments
"""
det_start, det_n = np.array(
[
[span.start_idx, span.num_of_elements]
for span in distribute_evenly(self._n_detectors_global, n_blocks_det)
]
).T
if self._det_blocks_attributes is None or self.comm.size == 1:
det_start, det_n = np.array(
[
[span.start_idx, span.num_of_elements]
for span in distribute_evenly(
self._n_detectors_global, n_blocks_det
)
]
).T
else:
det_start, det_n = np.array(
[
[span.start_idx, span.num_of_elements]
for span in distribute_detector_blocks(self.detector_blocks)
]
).T

time_start, time_n = np.array(
[
[span.start_idx, span.num_of_elements]
for span in distribute_evenly(self._n_samples_global, n_blocks_time)
]
).T

return (
np.array(det_start),
np.array(det_n),
Expand All @@ -327,6 +442,7 @@ def _get_tod_shape(self, n_blocks_det, n_blocks_time):
return (self._n_detectors_global, self._n_samples_global)

_, det_n, _, time_n = self._get_start_and_num(n_blocks_det, n_blocks_time)

try:
return (
det_n[self.comm.rank // n_blocks_time],
Expand Down
9 changes: 8 additions & 1 deletion litebird_sim/simulations.py
Original file line number Diff line number Diff line change
Expand Up @@ -889,6 +889,7 @@ def create_observations(
detectors: List[DetectorInfo],
num_of_obs_per_detector: int = 1,
split_list_over_processes=True,
det_blocks_attributes: Union[List[str], None] = None,
n_blocks_det=1,
n_blocks_time=1,
root=0,
Expand Down Expand Up @@ -922,7 +923,12 @@ def create_observations(
simulating 10 detectors and you specify ``n_blocks_det=5``,
this means that each observation will handle ``10 / 5 = 2``
detectors. The default is that *all* the detectors be kept
together (``n_blocks_det=1``).
together (``n_blocks_det=1``). On the other hand, the parameter
`det_blocks_attributes` specifies the list of detector attributes
to be used to create the groups of detectors. For example, with
``det_blocks_attributes = ["wafer", "pixel"]``, the detectors will
be divided into the groups such that all detectors in a group will
have same ``wafer`` and ``pixel`` attribute.
anand-avinash marked this conversation as resolved.
Show resolved Hide resolved

The parameter `n_blocks_time` specifies the number of time
splits of the observations. In the case of a 3-month-long
Expand Down Expand Up @@ -1013,6 +1019,7 @@ def create_observations(
start_time_global=cur_time,
sampling_rate_hz=sampfreq_hz,
n_samples_global=nsamples,
det_blocks_attributes=det_blocks_attributes,
n_blocks_det=n_blocks_det,
n_blocks_time=n_blocks_time,
comm=(None if split_list_over_processes else self.mpi_comm),
Expand Down
Loading