From db030c458e832ea20fb6143c13a2e9b5643da7bf Mon Sep 17 00:00:00 2001 From: DuncDennis <90915296+DuncDennis@users.noreply.github.com> Date: Thu, 20 Apr 2023 09:49:15 +0200 Subject: [PATCH] WIP: New simulation backend with better structure. --- src/lorenzpy/simulations.py | 308 ------------------ src/lorenzpy/simulations/__init__.py | 62 ++++ src/lorenzpy/simulations/_autonomous_flows.py | 76 +++++ src/lorenzpy/simulations/_base.py | 218 +++++++++++++ src/lorenzpy/simulations/_discrete_maps.py | 19 ++ src/lorenzpy/simulations/_driven_systems.py | 16 + src/lorenzpy/simulations/_others.py | 234 +++++++++++++ tests/test_simulations.py | 7 +- 8 files changed, 631 insertions(+), 309 deletions(-) delete mode 100644 src/lorenzpy/simulations.py create mode 100644 src/lorenzpy/simulations/__init__.py create mode 100644 src/lorenzpy/simulations/_autonomous_flows.py create mode 100644 src/lorenzpy/simulations/_base.py create mode 100644 src/lorenzpy/simulations/_discrete_maps.py create mode 100644 src/lorenzpy/simulations/_driven_systems.py create mode 100644 src/lorenzpy/simulations/_others.py diff --git a/src/lorenzpy/simulations.py b/src/lorenzpy/simulations.py deleted file mode 100644 index 6abcd88..0000000 --- a/src/lorenzpy/simulations.py +++ /dev/null @@ -1,308 +0,0 @@ -"""Simulate various continuous and discrete chaotic dynamical system. - -Every dynamical system is represented as a class. - -The available classes are: -- Lorenz63 -- MackeyGlass - -The system's parameters are introduced in the class's constructor. - -For example when creating a system object of the Lorenz63, the Lorenz parameters, -sigma, rho, beta, and the timestep dt are parsed as: - -sys_obj = Lorenz63(sigma=10, rho=10, beta=5, dt=1) - -Each sys_obj contains a "simulate" function. -To simulate 1000 time-steps of the Lorenz63 system call: - -sys_obj.simulate(1000). - -The general syntax to create a trajectory of a System is given as: - -trajectory = (=). -simulate(time_steps, starting_point=) - -Examples: - >>> import lorenzpy.simulations as sims - >>> data = sims.Lorenz63().simulate(1000) - >>> data.shape - (1000, 3) -""" - -from __future__ import annotations - -from abc import ABC, abstractmethod -from typing import Callable - -import numpy as np - - -def _runge_kutta( - f: Callable[[np.ndarray], np.ndarray], dt: float, x: np.ndarray -) -> np.ndarray: - """Simulate one step for ODEs of the form dx/dt = f(x(t)) using Runge-Kutta. - - Args: - f: function used to calculate the time derivative at point x. - dt: time step size. - x: d-dim position at time t. - - Returns: - d-dim position at time t+dt. - - """ - k1: np.ndarray = dt * f(x) - k2: np.ndarray = dt * f(x + k1 / 2) - k3: np.ndarray = dt * f(x + k2 / 2) - k4: np.ndarray = dt * f(x + k3) - next_step: np.ndarray = x + 1.0 / 6 * (k1 + 2 * k2 + 2 * k3 + k4) - return next_step - - -def _timestep_iterator( - f: Callable[[np.ndarray], np.ndarray], time_steps: int, starting_point: np.ndarray -) -> np.ndarray: - """Iterate a function f: x(i+1) = f(x(i)) multiple times to obtain a full traj. - - Args: - f: The iterator function x(i+1) = f(x(i)). - time_steps: The number of time_steps of the output trajectory. - The starting_point is included as the 0th element in the trajectory. - starting_point: Starting point of the trajectory. - - Returns: - trajectory: system-state at every simulated timestep. - - """ - starting_point = np.array(starting_point) - traj_size = (time_steps, starting_point.shape[0]) - traj = np.zeros(traj_size) - traj[0, :] = starting_point - for t in range(1, traj_size[0]): - traj[t] = f(traj[t - 1]) - return traj - - -class _SimBase(ABC): - """A base class for all the simulation classes.""" - - default_starting_point: np.ndarray - sys_dim: int - - @abstractmethod - def iterate(self, x: np.ndarray) -> np.ndarray: - """Generate the next time step when the previous one is given.""" - - def simulate( - self, time_steps: int, starting_point: np.ndarray | None = None - ) -> np.ndarray: - """Simulate the trajectory. - - Args: - time_steps: Number of time steps t to simulate. - starting_point: Starting point of the trajectory shape (sys_dim,). - If None, take the default starting point. - - Returns: - Trajectory of shape (t, sys_dim). - - """ - if starting_point is None: - starting_point = self.default_starting_point - else: - if starting_point.size != self.sys_dim: - raise ValueError( - "Provided starting_point has the wrong dimension. " - f"{self.sys_dim} was expected and {starting_point.size} " - "was given" - ) - return _timestep_iterator(self.iterate, time_steps, starting_point) - - -class _SimBaseRungeKutta(_SimBase): - dt: float - - @abstractmethod - def flow(self, x: np.ndarray) -> np.ndarray: - """Return the flow of the continuous dynamical system.""" - - def iterate(self, x: np.ndarray) -> np.ndarray: - """Calculate the next timestep x(t+dt) with given x(t) using runge kutta. - - Args: - x: the previous point x(t). - - Returns: - : x(t+dt) corresponding to input x(t). - - """ - return _runge_kutta(self.flow, self.dt, x) - - -class Lorenz63(_SimBaseRungeKutta): - """Simulate the 3-dimensional autonomous flow: Lorenz-63 attractor. - - Literature values (Sprott, Julien Clinton, and Julien C. Sprott. Chaos and - time-series analysis. Vol. 69. Oxford: Oxford university press, 2003.) for default - parameters and starting_point: - - Lyapunov exponents: (0.9059, 0.0, -14.5723) - - Kaplan-Yorke dimension: 2.06215 - - Correlation dimension: 2.068 +- 0.086 - They refer to: - - Parameters: {"sigma": 10.0, "rho": 28.0, "beta": 8 / 3} - - Starting point: [0.0, -0.01, 9.0] - """ - - default_parameters = {"sigma": 10.0, "rho": 28.0, "beta": 8 / 3, "dt": 0.05} - default_starting_point = np.array([0.0, -0.01, 9.0]) - sys_dim = 3 - - def __init__( - self, - sigma: float | None = None, - rho: float | None = None, - beta: float | None = None, - dt: float | None = None, - ) -> None: - """Define the system parameters. - - Args: - sigma: 'sigma' parameter in the Lorenz 63 equations. - rho: 'rho' parameter in the Lorenz 63 equations. - beta: 'beta' parameter in the Lorenz 63 equations. - dt: Size of time steps. - """ - self.sigma = self.default_parameters["sigma"] if sigma is None else sigma - self.rho = self.default_parameters["rho"] if rho is None else rho - self.beta = self.default_parameters["beta"] if beta is None else beta - self.dt = self.default_parameters["dt"] if dt is None else dt - - def flow(self, x: np.ndarray) -> np.ndarray: - """Calculate (dx/dt, dy/dt, dz/dt) with given (x,y,z) for RK4. - - Args: - x: (x,y,z) coordinates. Needs to have shape (3,). - - Returns: - : (dx/dt, dy/dt, dz/dt) corresponding to input x. - - """ - return np.array( - [ - self.sigma * (x[1] - x[0]), - x[0] * (self.rho - x[2]) - x[1], - x[0] * x[1] - self.beta * x[2], - ] - ) - - -class MackeyGlass: - """Simulate the Mackey-Glass delay differential system. - - TODO: Add literature values for Lyapunov etc. - TODO: Hint the differences between this class and the other Sim classes (delay). - TODO: Check if the structure is really good? - TODO: Add Proper Tests. - TODO: Decide whether to use the simple forward-euler or RK4-style update. - - Note: As the Mackey-Glass system is a delay-differential equation, the class does - not contain a simple iterate function. - """ - - default_parameters = {"a": 0.2, "b": 0.1, "c": 10, "tau": 23.0, "dt": 0.1} - default_starting_point = np.array([0.9]) - sys_dim = 1 - - def __init__( - self, - a: float | None = None, - b: float | None = None, - c: int | None = None, - tau: float | None = None, - dt: float | None = None, - ) -> None: - """Define the Mackey-Glass system parameters. - - Args: - a: "a" parameter of the Mackey-Glass equation. - b: "b" parameter of the Mackey-Glass equation. - c: "c" parameter of the Mackey-Glass equation. - tau: Time delay of Mackey-Glass equation. - dt: Size of time steps. - """ - self.a = self.default_parameters["a"] if a is None else a - self.b = self.default_parameters["b"] if b is None else b - self.c = self.default_parameters["c"] if c is None else c - self.tau = self.default_parameters["tau"] if tau is None else tau - self.dt = self.default_parameters["dt"] if dt is None else dt - - # The number of time steps between t=0 and t=-tau: - self.history_steps = int(self.tau / self.dt) - - def flow_mg(self, x: np.ndarray, x_past: np.ndarray) -> np.ndarray: - """Calculate the flow of the Mackey-Glass equation. - - Args: - x: The immediate value of the system. Needs to have shape (1,). - x_past: The delayed value of the system. Needs to have shape (1,). - - Returns: - : The flow corresponding to x and x_past. - """ - return np.array( - [self.a * x_past[0] / (1 + x_past[0] ** self.c) - self.b * x[0]] - ) - - def iterate_mg(self, x: np.ndarray, x_past: np.ndarray) -> np.ndarray: - """Calculate the next time step in the Mackey-Glass equation. - - Args: - x: The immediate value of the system. Needs to have shape (1,). - x_past: The delayed value of the system. Needs to have shape (1,). - - Returns: - : The next value given the immediate and delayed values. - """ - return x + self.dt * self.flow_mg(x, x_past) - # f = lambda x_use: self.flow_mg(x_use, x_past) - # return _runge_kutta(f, self.dt, x) - - def simulate( - self, time_steps: int, starting_point: np.ndarray | None = None - ) -> np.ndarray: - """Simulate the Mackey-Glass trajectory. - - Args: - time_steps: Number of time steps t to simulate. - starting_point: Starting point of the trajectory shape (sys_dim,). - If None, take the default starting point. - - Returns: - Trajectory of shape (t, sys_dim). - - """ - if starting_point is None: - starting_point = self.default_starting_point - else: - if starting_point.size != self.sys_dim: - raise ValueError( - "Provided starting_point has the wrong dimension. " - f"{self.sys_dim} was expected and {starting_point.size} " - "was given" - ) - - traj_w_hist = np.zeros((self.history_steps + time_steps, self.sys_dim)) - traj_w_hist[: self.history_steps, :] = np.repeat( - starting_point, self.history_steps - )[:, np.newaxis] - traj_w_hist[self.history_steps, :] = starting_point - - for t in range(1, time_steps): - t_shifted = t + self.history_steps - traj_w_hist[t_shifted] = self.iterate_mg( - traj_w_hist[t_shifted - 1], - traj_w_hist[t_shifted - 1 - self.history_steps], - ) - - return traj_w_hist[self.history_steps :, :] diff --git a/src/lorenzpy/simulations/__init__.py b/src/lorenzpy/simulations/__init__.py new file mode 100644 index 0000000..fbc82c5 --- /dev/null +++ b/src/lorenzpy/simulations/__init__.py @@ -0,0 +1,62 @@ +"""Simulate various continuous and discrete chaotic dynamical system. + +Every dynamical system is represented as a class. + +The available classes are: +- Lorenz63 +- MackeyGlass + +The system's parameters are introduced in the class's constructor. + +For example when creating a system object of the Lorenz63, the Lorenz parameters, +sigma, rho, beta, and the timestep dt are parsed as: + +sys_obj = Lorenz63(sigma=10, rho=10, beta=5, dt=1) + +Each sys_obj contains a "simulate" function. +To simulate 1000 time-steps of the Lorenz63 system call: + +sys_obj.simulate(1000). + +The general syntax to create a trajectory of a System is given as: + +trajectory = (=). +simulate(time_steps, starting_point=) + +Examples: + >>> import lorenzpy.simulations as sims + >>> data = sims.Lorenz63().simulate(1000) + >>> data.shape + (1000, 3) + + +TODO + - Probably for each concrete simulation class + public methods. Compare with sklearn + - Find out which functionality is missing. E.g. Raising error when wrong values are + parsed. + - Check where to add proper tests and how to add them efficiently. Fixtures? + Parametrization? + - Implement all the other dynamical systems. + - Check if the names of files and functions make sense? + - Add functionality to add your own dynamical system? As my base-classes are + protected this is maybe not so easy? -> Make ABC public? + - Think about adding NARMA? Maybe I need a random number generator framework. + - Check if I can further reduce code duplication. Maybe regarding solvers. + - Check for proper doc-generation. It seems that the methods of inhereted members + is not implemented yet. See: + https://github.com/mkdocstrings/mkdocstrings/issues/78 +""" + +from ._autonomous_flows import Lorenz63, Lorenz96 +from ._discrete_maps import Logistic +from ._driven_systems import SimplestDrivenChaotic +from ._others import KuramotoSivashinsky, MackeyGlass + +__all__ = [ + "Lorenz63", + "Lorenz96", + "Logistic", + "SimplestDrivenChaotic", + "KuramotoSivashinsky", + "MackeyGlass", +] diff --git a/src/lorenzpy/simulations/_autonomous_flows.py b/src/lorenzpy/simulations/_autonomous_flows.py new file mode 100644 index 0000000..835e5f2 --- /dev/null +++ b/src/lorenzpy/simulations/_autonomous_flows.py @@ -0,0 +1,76 @@ +"""Autonomous flows.""" +import numpy as np + +from ._base import _BaseSimFlow + + +class Lorenz63(_BaseSimFlow): + """Simulation class for the Lorenz63 system. + + This function is able to simulate the chaotic dynamical system originally + introduced by Lorenz. + + Attributes: + sigma: Sigma parameter. + rho: rho parameter. + """ + + def __init__(self, sigma=10.0, rho=28.0, beta=8 / 3, dt=0.1, solver="rk4"): + """Initialize the Lorenz63. + + Args: + sigma: a. + rho: b. + beta: c. + dt: d. + solver: e. + """ + super().__init__(dt, solver) + self.sigma = sigma + self.rho = rho + self.beta = beta + + def flow(self, x: np.ndarray) -> np.ndarray: + return np.array( + [ + self.sigma * (x[1] - x[0]), + x[0] * (self.rho - x[2]) - x[1], + x[0] * x[1] - self.beta * x[2], + ] + ) + + def get_default_starting_pnt(self) -> np.ndarray: + return np.array([0.0, -0.01, 9.0]) + + +class Lorenz96(_BaseSimFlow): + """Simulate the n-dimensional Lorenz 96 model.""" + + def __init__( + self, sys_dim: int = 30, force: float = 8.0, dt: float = 0.05, solver="rk4" + ) -> None: + super().__init__(dt=dt, solver=solver) + self.sys_dim = sys_dim + self.force = force + + def get_default_starting_pnt(self) -> np.ndarray: + return np.sin(np.arange(self.sys_dim)) + + def flow(self, x: np.ndarray) -> np.ndarray: + system_dimension = x.shape[0] + derivative = np.zeros(system_dimension) + # Periodic Boundary Conditions for the 3 edge cases i=1,2,system_dimension + derivative[0] = (x[1] - x[system_dimension - 2]) * x[system_dimension - 1] - x[ + 0 + ] + derivative[1] = (x[2] - x[system_dimension - 1]) * x[0] - x[1] + derivative[system_dimension - 1] = (x[0] - x[system_dimension - 3]) * x[ + system_dimension - 2 + ] - x[system_dimension - 1] + + # TODO: Rewrite using numpy vectorization to make faster + for i in range(2, system_dimension - 1): + derivative[i] = (x[i + 1] - x[i - 2]) * x[i - 1] - x[i] + + derivative = derivative + self.force + return derivative diff --git a/src/lorenzpy/simulations/_base.py b/src/lorenzpy/simulations/_base.py new file mode 100644 index 0000000..f3db174 --- /dev/null +++ b/src/lorenzpy/simulations/_base.py @@ -0,0 +1,218 @@ +"""Base code for simulations.""" + +from __future__ import annotations + +import inspect +from abc import ABC, abstractmethod +from typing import Any, Callable + +import numpy as np + + +def _forward_euler( + f: Callable[[np.ndarray], np.ndarray], dt: float, x: np.ndarray +) -> np.ndarray: + """Simulate one step for ODEs of the form dx/dt = f(x(t)) using the forward euler. + + Args: + f: function used to calculate the time derivative at point x. + dt: time step size. + x: d-dim position at time t. + + Returns: + d-dim position at time t+dt. + + """ + return x + dt * f(x) + + +def _runge_kutta( + f: Callable[[np.ndarray], np.ndarray], dt: float, x: np.ndarray +) -> np.ndarray: + """Simulate one step for ODEs of the form dx/dt = f(x(t)) using Runge-Kutta. + + Args: + f: function used to calculate the time derivative at point x. + dt: time step size. + x: d-dim position at time t. + + Returns: + d-dim position at time t+dt. + + """ + k1: np.ndarray = dt * f(x) + k2: np.ndarray = dt * f(x + k1 / 2) + k3: np.ndarray = dt * f(x + k2 / 2) + k4: np.ndarray = dt * f(x + k3) + next_step: np.ndarray = x + 1.0 / 6 * (k1 + 2 * k2 + 2 * k3 + k4) + return next_step + + +def _timestep_iterator( + f: Callable[[np.ndarray], np.ndarray], time_steps: int, starting_point: np.ndarray +) -> np.ndarray: + """Iterate an iterator-function f: x(i+1) = f(x(i)) multiple times.""" + starting_point = np.array(starting_point) + traj_size = (time_steps, starting_point.shape[0]) + traj = np.zeros(traj_size) + traj[0, :] = starting_point + for t in range(1, traj_size[0]): + traj[t] = f(traj[t - 1]) + return traj + + +class _BaseSim(ABC): + """Base class for all simulation classes.""" + + @classmethod + def _get_init_params(cls) -> list[inspect.Parameter]: + """Get the init parameter names and default values.""" + init = cls.__init__ + + # introspect the constructor arguments to find the model parameters + # to represent + init_signature = inspect.signature(init) + + # Consider the constructor parameters excluding 'self' + parameters = [p for p in init_signature.parameters.values() if p.name != "self"] + return parameters + + @classmethod + def get_default_params(cls) -> dict[str, Any]: + """Get the default parameters.""" + parameters = cls._get_init_params() + out = dict() + for param in parameters: + out[param.name] = param.default + return out + + def get_params(self) -> dict[str, Any]: + """Get the current parameters.""" + out = dict() + parameters = self._get_init_params() + for param in parameters: + key = param.name + value = getattr(self, key) + out[key] = value + return out + + @abstractmethod + def simulate( + self, + time_steps: int, + starting_point: np.ndarray | None = None, + transient: int = 0, + ) -> np.ndarray: + """Simulate the trajectory of the Dynamical System. + + Args: + time_steps: Number of time steps in returned trajectory. + starting_point: The starting point of the trajectory of shape (sim_dim,). + If None is provided, the default value will be used. + If transient is 0, the first point of the returned + trajectory will be the starting_point. + transient: Optional number of transient points to discard before tracking + the returned trajectory. + + Returns: + The trajectory of shape (time_steps, sim_dim). + """ + + @abstractmethod + def get_default_starting_pnt(self) -> np.ndarray: + """Get the default starting point to be used in the simulate function.""" + + +class _BaseSimIterate(_BaseSim): + """Base class for Simulation classes using an iterate function.""" + + @abstractmethod + def iterate(self, x: np.ndarray) -> np.ndarray: + """Iterate the system from one time step to the next.""" + + def simulate( + self, + time_steps: int, + starting_point: np.ndarray | None = None, + transient: int = 0, + ) -> np.ndarray: + """Simulate the trajectory. + + Args: + time_steps: Number of time steps t to simulate. + starting_point: Starting point of the trajectory shape (sys_dim,). + If None, take the default starting point. + transient: TBD + + Returns: + Trajectory of shape (t, sys_dim). + + """ + if starting_point is None: + starting_point = self.get_default_starting_pnt() + + return _timestep_iterator(self.iterate, time_steps + transient, starting_point)[ + transient:, : + ] + + +class _BaseSimFlow(_BaseSimIterate): + """Base class for continuous-time Simulation classes defined by a flow.""" + + @abstractmethod + def __init__( + self, + dt: float, + solver: str | Callable[[Callable, float, np.ndarray], np.ndarray], + ): + """Initialize the time step dt and the solver.""" + self.dt = dt + self.solver = solver + + @abstractmethod + def flow(self, x: np.ndarray) -> np.ndarray: + """Return the flow of the continuous dynamical system.""" + + def iterate(self, x: np.ndarray) -> np.ndarray: + """Calculate the next timestep x(t+dt) with given x(t) using runge kutta. + + Args: + x: the previous point x(t). + + Returns: + : x(t+dt) corresponding to input x(t). + + """ + if isinstance(self.solver, str): + if self.solver == "rk4": + x_next = _runge_kutta(self.flow, self.dt, x) + elif self.solver == "forward_euler": + x_next = _forward_euler(self.flow, self.dt, x) + else: + raise ValueError(f"Unsupported solver: {self.solver}") + else: + x_next = self.solver(self.flow, self.dt, x) + + return x_next + + +class _BaseSimFlowDriven(_BaseSimFlow, ABC): + """Base class for driven continuous time dynamical systems.""" + + def simulate( + self, + time_steps: int, + starting_point: np.ndarray | None = None, + transient: int = 0, + time_included: bool = False, + ) -> np.ndarray: + if starting_point is not None: + if not time_included: + starting_point = np.append(starting_point, 0) + traj = super().simulate( + time_steps, starting_point=starting_point, transient=transient + ) + if not time_included: + return traj[:, :-1] + else: + return traj diff --git a/src/lorenzpy/simulations/_discrete_maps.py b/src/lorenzpy/simulations/_discrete_maps.py new file mode 100644 index 0000000..01713b2 --- /dev/null +++ b/src/lorenzpy/simulations/_discrete_maps.py @@ -0,0 +1,19 @@ +"""Discrete maps.""" +import numpy as np + +from ._base import _BaseSimIterate + + +class Logistic(_BaseSimIterate): + def __init__(self, r: float = 4.0) -> None: + self.r = r + + def iterate(self, x: np.ndarray) -> np.ndarray: + return np.array( + [ + self.r * x[0] * (1 - x[0]), + ] + ) + + def get_default_starting_pnt(self) -> np.ndarray: + return np.array([0.1]) diff --git a/src/lorenzpy/simulations/_driven_systems.py b/src/lorenzpy/simulations/_driven_systems.py new file mode 100644 index 0000000..f19dce5 --- /dev/null +++ b/src/lorenzpy/simulations/_driven_systems.py @@ -0,0 +1,16 @@ +"""Driven Systems.""" +import numpy as np + +from ._base import _BaseSimFlowDriven + + +class SimplestDrivenChaotic(_BaseSimFlowDriven): + def __init__(self, omega: float = 1.88, dt: float = 0.1, solver="rk4") -> None: + super().__init__(dt, solver) + self.omega = omega + + def flow(self, x: np.ndarray) -> np.ndarray: + return np.array([x[1], -(x[0] ** 3) + np.sin(self.omega * x[2]), 1]) + + def get_default_starting_pnt(self) -> np.ndarray: + return np.array([0.0, 0.0, 0.0]) diff --git a/src/lorenzpy/simulations/_others.py b/src/lorenzpy/simulations/_others.py new file mode 100644 index 0000000..41e90f3 --- /dev/null +++ b/src/lorenzpy/simulations/_others.py @@ -0,0 +1,234 @@ +"""Dynamical systems implemented with other algorithms.""" +from __future__ import annotations + +from typing import Callable + +import numpy as np + +from ._base import _BaseSim, _BaseSimIterate, _forward_euler, _runge_kutta + + +class KuramotoSivashinsky(_BaseSimIterate): + """Simulate the n-dimensional Kuramoto-Sivashinsky PDE. + + Note: dimension must be an even number. + + PDE: y_t = -y*y_x - (1+eps)*y_xx - y_xxxx. + + Reference for the numerical integration: + "fourth order time stepping for stiff pde-kassam trefethen 2005" at + https://people.maths.ox.ac.uk/trefethen/publication/PDF/2005_111.pdf + + Python implementation at: https://github.com/E-Renshaw/kuramoto-sivashinsky + + Literature values (doi:10.1017/S1446181119000105) for Lyapunov Exponents: + - lyapunov exponents: (0.080, 0.056, 0.014, 0.003, -0.003 ...) + They refer to: + - Parameters: {"sys_length": 36.0, "eps": 0.0} + """ + + def __init__( + self, + sys_dim: int = 50, + sys_length: float = 36.0, + eps: float = 0.0, + dt: float = 0.1, + ) -> None: + self.sys_dim = sys_dim + self.sys_length = sys_length + self.eps = eps + self.dt = dt + + self._prepare() + + def get_default_starting_pnt(self) -> np.ndarray: + x = ( + self.sys_length + * np.transpose(np.conj(np.arange(1, self.sys_dim + 1))) + / self.sys_dim + ) + return np.array( + np.cos(2 * np.pi * x / self.sys_length) + * (1 + np.sin(2 * np.pi * x / self.sys_length)) + ) + + def _prepare(self) -> None: + """Calculate internal attributes. + + TODO: make auxiliary variables protected. + """ + k = ( + np.transpose( + np.conj( + np.concatenate( + ( + np.arange(0, self.sys_dim / 2), + np.array([0]), + np.arange(-self.sys_dim / 2 + 1, 0), + ) + ) + ) + ) + * 2 + * np.pi + / self.sys_length + ) + + L = (1 + self.eps) * k**2 - k**4 + + self.E = np.exp(self.dt * L) + self.E_2 = np.exp(self.dt * L / 2) + M = 64 + r = np.exp(1j * np.pi * (np.arange(1, M + 1) - 0.5) / M) + LR = self.dt * np.transpose(np.repeat([L], M, axis=0)) + np.repeat( + [r], self.sys_dim, axis=0 + ) + self.Q = self.dt * np.real(np.mean((np.exp(LR / 2) - 1) / LR, axis=1)) + self.f1 = self.dt * np.real( + np.mean((-4 - LR + np.exp(LR) * (4 - 3 * LR + LR**2)) / LR**3, axis=1) + ) + self.f2 = self.dt * np.real( + np.mean((2 + LR + np.exp(LR) * (-2 + LR)) / LR**3, axis=1) + ) + self.f3 = self.dt * np.real( + np.mean((-4 - 3 * LR - LR**2 + np.exp(LR) * (4 - LR)) / LR**3, axis=1) + ) + + self.g = -0.5j * k + + def iterate(self, x: np.ndarray) -> np.ndarray: + """Calculate next timestep x(t+1) with given x(t). + + Args: + x: (x_0(i),x_1(i),..) coordinates. Needs to have shape (self.sys_dim,). + + Returns: + : (x_0(i+1),x_1(i+1),..) corresponding to input x. + + """ + v = np.fft.fft(x) + Nv = self.g * np.fft.fft(np.real(np.fft.ifft(v)) ** 2) + a = self.E_2 * v + self.Q * Nv + Na = self.g * np.fft.fft(np.real(np.fft.ifft(a)) ** 2) + b = self.E_2 * v + self.Q * Na + Nb = self.g * np.fft.fft(np.real(np.fft.ifft(b)) ** 2) + c = self.E_2 * a + self.Q * (2 * Nb - Nv) + Nc = self.g * np.fft.fft(np.real(np.fft.ifft(c)) ** 2) + v = self.E * v + Nv * self.f1 + 2 * (Na + Nb) * self.f2 + Nc * self.f3 + return np.real(np.fft.ifft(v)) + + +class MackeyGlass(_BaseSim): + """Simulate the Mackey-Glass delay differential system. + + TODO: Add literature values for Lyapunov etc. + TODO: Hint the differences between this class and the other Sim classes (delay). + TODO: Check if the structure is really good? + TODO: Add Proper Tests. + TODO: Decide whether to use the simple forward-euler or RK4-style update. + + Note: As the Mackey-Glass system is a delay-differential equation, the class does + not contain a simple iterate function. + """ + + def __init__( + self, + a: float = 0.2, + b: float = 0.1, + c: int = 10, + tau: float = 23.0, + dt: float = 0.1, + solver: str | Callable = "rk4", + ) -> None: + self.a = a + self.b = b + self.c = c + self.tau = tau + self.dt = dt + self.solver = solver + + # The number of time steps between t=0 and t=-tau: + self.history_steps = int(self.tau / self.dt) + + def get_default_starting_pnt(self) -> np.ndarray: + return np.array([0.9]) + + def flow_mg(self, x: np.ndarray, x_past: np.ndarray) -> np.ndarray: + """Calculate the flow of the Mackey-Glass equation. + + Args: + x: The immediate value of the system. Needs to have shape (1,). + x_past: The delayed value of the system. Needs to have shape (1,). + + Returns: + : The flow corresponding to x and x_past. + """ + return np.array( + [self.a * x_past[0] / (1 + x_past[0] ** self.c) - self.b * x[0]] + ) + + def iterate_mg(self, x: np.ndarray, x_past: np.ndarray) -> np.ndarray: + """Calculate the next time step in the Mackey-Glass equation. + + Args: + x: The immediate value of the system. Needs to have shape (1,). + x_past: The delayed value of the system. Needs to have shape (1,). + + Returns: + : The next value given the immediate and delayed values. + """ + + def flow_like(x_use: np.ndarray) -> np.ndarray: + return self.flow_mg(x_use, x_past) + + if isinstance(self.solver, str): + if self.solver == "rk4": + x_next = _runge_kutta(flow_like, self.dt, x) + elif self.solver == "forward_euler": + x_next = _forward_euler(flow_like, self.dt, x) + else: + raise ValueError(f"Unsupported solver: {self.solver}") + else: + x_next = self.solver(flow_like, self.dt, x) + + return x_next + + def simulate( + self, + time_steps: int, + starting_point: np.ndarray | None = None, + transient: int = 0, + ) -> np.ndarray: + """Simulate the Mackey-Glass trajectory. + + Args: + time_steps: Number of time steps t to simulate. + starting_point: Starting point of the trajectory shape (sys_dim,). + If None, take the default starting point. + transient: Washout before storing the trajectory. + + Returns: + Trajectory of shape (t, sys_dim). + """ + if starting_point is None: + starting_point = self.get_default_starting_pnt() + + if starting_point.size == self.history_steps + 1: + initial_history = starting_point + elif starting_point.size == 1: + initial_history = np.repeat(starting_point, self.history_steps) + else: + raise ValueError("Wrong size of starting point.") + + traj_w_hist = np.zeros((self.history_steps + time_steps, 1)) + traj_w_hist[: self.history_steps, :] = initial_history[:, np.newaxis] + traj_w_hist[self.history_steps, :] = starting_point + + for t in range(1, time_steps + transient): + t_shifted = t + self.history_steps + traj_w_hist[t_shifted] = self.iterate_mg( + traj_w_hist[t_shifted - 1], + traj_w_hist[t_shifted - 1 - self.history_steps], + ) + + return traj_w_hist[self.history_steps + transient :, :] diff --git a/tests/test_simulations.py b/tests/test_simulations.py index 78bffe0..40602e9 100644 --- a/tests/test_simulations.py +++ b/tests/test_simulations.py @@ -3,9 +3,10 @@ import pytest from lorenzpy import simulations +from lorenzpy.simulations._base import _BaseSimIterate -class DemoSim(simulations._SimBase): +class DemoSim(_BaseSimIterate): """A simple simulation class subclassing simulations._SimBase.""" sys_dim = 3 @@ -18,6 +19,10 @@ def iterate(self, x: np.ndarray) -> np.ndarray: """Just return the same value again.""" return x + def get_default_starting_pnt(self) -> np.ndarray: + """Return some default starting point.""" + return np.array([0, 0, 0]) + def test_lorenz63_simulation_shape(): """Testing lorenz63 simulation."""