Skip to content

Commit

Permalink
ENH: Make the ARMCPT swapping function a dedicated parameter
Browse files Browse the repository at this point in the history
  • Loading branch information
Bas van Beek committed Nov 12, 2020
1 parent bf10b0d commit fc7845e
Show file tree
Hide file tree
Showing 2 changed files with 83 additions and 63 deletions.
145 changes: 82 additions & 63 deletions FOX/armc/armc_pt.py
Original file line number Diff line number Diff line change
Expand Up @@ -13,8 +13,9 @@
""" # noqa: E501

from logging import Logger
from typing import (
Tuple, Dict, Mapping, Iterable, List, Sequence, overload, Any, TYPE_CHECKING
Tuple, Dict, Mapping, Iterable, List, Sequence, overload, Any, TYPE_CHECKING, Callable
)

import numpy as np
Expand All @@ -37,22 +38,89 @@
MolIter = Iterable[MultiMolecule]

Key = Tuple[float, ...]
SwapFunc = Callable[[np.ndarray, "ARMCPT"], Iterable[Tuple[int, int]]]


def swap_random(acceptance: np.ndarray, armcpt: "ARMCPT",
weighted: bool = False) -> List[Tuple[int, int]]:
r"""Swap the :math:`\phi` and move range of two between forcefield parameter sets.
The two main parameters are the acceptance rate
:math:`\boldsymbol{\alpha} \in \mathbb{R}^{(n,)}` and target acceptance rate
:math:`\boldsymbol{\alpha^{t}} \in \mathbb{R}^{(n,)}`,
both vector's elements being the :math:`[0,1]` interval.
The (weighted) probability distribution :math:`\boldsymbol{p} \in \mathbb{R}^{(n,)}` is
consequently used for identifying which two forcefield parameter sets
are swapped.
.. math::
\hat{\boldsymbol{p}} = |\boldsymbol{\alpha} - \boldsymbol{\alpha^{t}}|^{-1}
\boldsymbol{p} = \frac{\hat{\boldsymbol{p}}}{\sum^n_{i} {\hat{\boldsymbol{p}}}_{i}}
Parameters
----------
acceptance : :class:`numpy.ndarray` [:class:`bool`], shape :math:`(n, m)`
A 2D boolean array with acceptance rates over
the course of the last super-iteration.
armc : :class:`FOX.armc.ARMCPT`
An ARMCPT instance.
weighted : :class:`bool`
If ``True``, identify the to-be swapped parameters via a
weighted probability distribution.
Returns
-------
:data:`List[Tuple[int, int]]<typing.List>`
A list of 2-tuples with to-be swapped indices.
"""
self = armcpt
if weighted:
_p = acceptance.mean(axis=0) - self.phi.a_target
if 0 in _p:
p = np.zeros_like(_p)
p[_p == 0] = True
else:
p = _p**-1
else:
p = np.ones(acceptance.shape[1])
p /= p.sum() # normalize

idx_range = np.arange(len(p))
idx1 = np.random.choice(idx_range, p=p)
idx2 = np.random.choice(idx_range, p=p)

if idx1 != idx2:
self.logger.info(f"Swapping parameter sets {idx1} and {idx2}")
return [(idx1, idx2)]
else:
self.logger.info("Preserving all parameter sets")
return []


class ARMCPT(ARMC):
"""An :class:`ARMC` subclass implementing a parallel tempering procedure."""

def __init__(self, **kwargs: Any) -> None:
swap_phi: SwapFunc

def __init__(self, swapper: SwapFunc = swap_random, **kwargs: Any) -> None:
r"""Initialize an :class:`ARMCPT` instance.
Parameters
----------
swapper : :data:`Callable[[ndarray,ARMCPT], Iterable[Tuple[int, int]]<typing.Callable>`
A callable for identifying the to-be swapped parameter.
Should return an iterable with 2-tuples of to-be swapped indices.
\**kwargs : :data:`~typing.Any`
Further keyword arguments for the :class:`ARMC` and
:class:`MonteCarloABC` superclasses.
"""
super().__init__(**kwargs)
self.swap_phi = swapper
if len(self.phi.phi) <= 1:
raise ValueError("{self.__class__.__name__!r} requires 'phi.phi' to "
"contain more than 1 element")
Expand All @@ -65,9 +133,7 @@ def acceptance(self) -> np.ndarray:
return np.zeros(shape, dtype=bool)

@overload
def _parse_call(self) -> List[Key]: ...
@overload
def _parse_call(self, start: None, key_new: None) -> List[Key]: ...
def _parse_call(self, start: None = ..., key_new: None = ...) -> List[Key]: ...
@overload
def _parse_call(self, start: int, key_new: Iterable[Key]) -> List[Key]: ...
def _parse_call(self, start=None, key_new=None): # noqa: E301
Expand All @@ -81,9 +147,7 @@ def _parse_call(self, start=None, key_new=None): # noqa: E301
return list(ret)

@overload
def __call__(self) -> None: ...
@overload
def __call__(self, start: None, key_new: None) -> None: ...
def __call__(self, start: None = ..., key_new: None = ...) -> None: ...
@overload
def __call__(self, start: int, key_new: Key) -> None: ...
def __call__(self, start=None, key_new=None): # noqa: E301
Expand All @@ -100,7 +164,7 @@ def __call__(self, start=None, key_new=None): # noqa: E301
for omega in range(self.sub_iter_len):
key_new = self.do_inner(kappa, omega, acceptance, key_new)
self.phi.update(acceptance)
self.swap_phi(acceptance)
self.swap_phi(acceptance, self)

def do_inner(self, kappa: int, omega: int, acceptance: np.ndarray,
key_old: Sequence[Key]) -> List[Key]:
Expand Down Expand Up @@ -179,53 +243,7 @@ def _do_inner4(self, accept: Iterable[bool], error_change: Iterable[bool],

return ret

def swap_phi(self, acceptance: np.ndarray, weighted: bool = False) -> None:
r"""Swap the :math:`\phi` and move range of two between forcefield parameter sets.
The two main parameters are the acceptance rate
:math:`\boldsymbol{\alpha} \in \mathbb{R}^{(n,)}` and target acceptance rate
:math:`\boldsymbol{\alpha^{t}} \in \mathbb{R}^{(n,)}`,
both vector's elements being the :math:`[0,1]` interval.
The (weighted) probability distribution :math:`\boldsymbol{p} \in \mathbb{R}^{(n,)}` is
consequently used for identifying which two forcefield parameter sets
are swapped.
.. math::
\hat{\boldsymbol{p}} = |\boldsymbol{\alpha} - \boldsymbol{\alpha^{t}}|^{-1}
\boldsymbol{p} = \frac{\hat{\boldsymbol{p}}}{\sum^n_{i} {\hat{\boldsymbol{p}}}_{i}}
Parameters
----------
acceptance : :class:`numpy.ndarray` [:class:`bool`], shape :math:`(n, m)`
A 2D boolean array with acceptance rates over
the course of the last super-iteration.
"""
if weighted:
_p = acceptance.mean(axis=0) - self.phi.a_target
if 0 in _p:
p = np.zeros_like(_p)
p[_p == 0] = True
else:
p = _p**-1
else:
p = np.ones(acceptance.shape[1])
p /= p.sum() # normalize

idx_range = np.arange(len(p))
idx1 = np.random.choice(idx_range, p=p)
idx2 = np.random.choice(idx_range, p=p)

if idx1 != idx2:
self.logger.info(f"Swapping parameter sets {idx1} and {idx2}")
self._swap_phi(idx1, idx2)
else:
self.logger.info("Preserving all parameter sets")

def _swap_phi(self, idx1: int, idx2: int) -> None:
def _swap_phi(self, idx_iter: Iterable[Tuple[int, int]]) -> None:
"""Swap the array-elements **idx1** and **idx2** of four :class:`ARMCPT` attributes.
Affects the following attributes:
Expand All @@ -236,10 +254,11 @@ def _swap_phi(self, idx1: int, idx2: int) -> None:
* :attr:`ARMCPT.param.move_range<FOX.armc.ParamMapping.move_range>`
"""
i = [idx1, idx2]
j = [idx2, idx1]

self.phi.phi[i] = self.phi.phi[j]
self.phi.a_target[i] = self.phi.a_target[j]
self.phi.gamma[i] = self.phi.gamma[j]
self.param.move_range[i] = self.param.move_range[j]
for idx1, idx2 in idx_iter:
i = [idx1, idx2]
j = [idx2, idx1]

self.phi.phi[i] = self.phi.phi[j]
self.phi.a_target[i] = self.phi.a_target[j]
self.phi.gamma[i] = self.phi.gamma[j]
self.param.move_range[i] = self.param.move_range[j]
1 change: 1 addition & 0 deletions FOX/armc/monte_carlo.py
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ class MonteCarloABC(AbstractDataClass, ABC, Mapping[Key, np.ndarray]):
keep_files: bool
hdf5_file: Union[str, PathLike]
pes: Dict[str, GetPesDescriptor]
swap_phi: Optional[Callable[..., Any]]

@property
def molecule(self) -> Tuple[MultiMolecule, ...]:
Expand Down

0 comments on commit fc7845e

Please sign in to comment.