"""
Integrator module
=================
Module with the classes of integrators to multi-thread the integration of
an ordinary differential equations
.. math:: \dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})
of the model and its linearized version.
Module classes
--------------
* :class:`RungeKuttaIntegrator`
* :class:`RungeKuttaTglsIntegrator`
"""
import multiprocessing
import numpy as np
from numba import njit
from qgs.integrators.integrate import _integrate_runge_kutta_jit, _integrate_runge_kutta_tgls_jit, _zeros_func
from qgs.functions.util import reverse
[docs]class RungeKuttaIntegrator(object):
"""Class to integrate the ordinary differential equations (ODEs)
.. math:: \dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})
with a set of :class:`TrajectoryProcess` and a specified `Runge-Kutta method`_.
.. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
Parameters
----------
num_threads: None or int, optional
Number of :class:`TrajectoryProcess` workers (threads) to use. If `None`, use the number of machine's
cores available. Default to `None`.
b: None or ~numpy.ndarray, optional
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
If `None`, use the classic RK4 method coefficients. Default to `None`.
c: None or ~numpy.ndarray, optional
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
If `None`, use the classic RK4 method coefficients. Default to `None`.
a: None or ~numpy.ndarray, optional
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
If `None`, use the classic RK4 method coefficients. Default to `None`.
number_of_dimensions: None or int, optional
Allow to hardcode the dynamical system dimension. If `None`, evaluate the dimension from the
callable :attr:`func`. Default to `None`.
Attributes
----------
num_threads: int
Number of :class:`TrajectoryProcess` workers (threads) to use.
b: ~numpy.ndarray
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
c: ~numpy.ndarray
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
a: ~numpy.ndarray
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
n_dim: int
Dynamical system dimension.
n_traj: int
The number of trajectories (initial conditions) computed at the last integration
performed by the integrator.
n_records: int
The number of saved states of the last integration performed by the integrator.
ic: ~numpy.ndarray
Store the integrator initial conditions.
func: callable
Last function :math:`\\boldsymbol{f}` used by the integrator to integrate.
"""
def __init__(self, num_threads=None, b=None, c=None, a=None, number_of_dimensions=None):
if num_threads is None:
self.num_threads = multiprocessing.cpu_count()
else:
self.num_threads = num_threads
# Default is RK4
if a is None and b is None and c is None:
self.c = np.array([0., 0.5, 0.5, 1.])
self.b = np.array([1./6, 1./3, 1./3, 1./6])
self.a = np.zeros((len(self.c), len(self.b)))
self.a[1, 0] = 0.5
self.a[2, 1] = 0.5
self.a[3, 2] = 1.
else:
self.a = a
self.b = b
self.c = c
self.ic = None
self._time = None
self._recorded_traj = None
self.n_traj = 0
self.n_dim = number_of_dimensions
self.n_records = 0
self._write_steps = 0
self._time_direction = 1
self.func = None
self._ics_queue = None
self._traj_queue = None
self._processes_list = list()
[docs] def terminate(self):
"""Stop the workers (threads) and release the resources of the integrator."""
for process in self._processes_list:
process.terminate()
process.join()
[docs] def start(self):
"""Start or restart the workers (threads) of the integrator.
Warnings
--------
If the integrator was not previously terminated, it will be terminated first in the case
of a restart.
"""
self.terminate()
self._processes_list = list()
self._ics_queue = multiprocessing.JoinableQueue()
self._traj_queue = multiprocessing.Queue()
for i in range(self.num_threads):
self._processes_list.append(TrajectoryProcess(i, self.func, self.b, self.c, self.a,
self._ics_queue, self._traj_queue))
for process in self._processes_list:
process.daemon = True
process.start()
[docs] def set_func(self, f, ic_init=True):
"""Set the `Numba`_-jitted function :math:`\\boldsymbol{f}` to integrate.
.. _Numba: https://numba.pydata.org/
Parameters
----------
f: callable
The `Numba`_-jitted function :math:`\\boldsymbol{f}`.
Should have the signature ``f(t, x)`` where ``x`` is the state value and ``t`` is the time.
ic_init: bool, optional
Re-initialize or not the initial conditions of the integrator. Default to `True`.
Warnings
--------
This function restarts the integrator!
"""
self.func = f
if ic_init:
self.ic = None
self.start()
[docs] def set_bca(self, b=None, c=None, a=None, ic_init=True):
"""Set the coefficients of the `Runge-Kutta method`_ and restart the integrator. s
.. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
Parameters
----------
b: None or ~numpy.ndarray, optional
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
If `None`, does not reinitialize these coefficients.
c: None or ~numpy.ndarray, optional
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
If `None`, does not reinitialize these coefficients.
a: None or ~numpy.ndarray, optional
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
If `None`, does not reinitialize these coefficients.
ic_init: bool, optional
Re-initialize or not the initial conditions of the integrator. Default to `True`.
"""
if a is not None:
self.a = a
if b is not None:
self.b = b
if c is not None:
self.c = c
if ic_init:
self.ic = None
self.start()
[docs] def initialize(self, convergence_time, dt, pert_size=0.01, reconvergence_time=None, forward=True,
number_of_trajectories=1, ic=None, reconverge=False):
"""Initialize the integration on an attractor by running it for a transient time,
For an ensemble of initial conditions, can do the same transient time for each, or the
`convergence_time` for the first one, and a smaller `reconvergence_time` for the subsequent ones.
This results into initial conditions on the attractor, stored in :attr:`ic`.
Parameters
----------
convergence_time: float
Transient time needed to converge to the attractor.
dt: float
Timestep of the transient integration.
pert_size:float, optional
If the reconvergence is activated, size of the perturbation to add to the previous ic to find
the next one. Default to 0.01 .
reconvergence_time: None or float, optional
Transient time for the subsequent trajectories after the first long `transient_time`.
forward: bool, optional
Whether to integrate the ODEs forward or backward in time. In case of backward integration, the
initial condition `ic` becomes a final condition. Default to forward integration.
number_of_trajectories: int
Number of initial conditions to find. Default to 1. Inactive if `ic` is provided.
ic: None or ~numpy.ndarray(float), optional
Initial (or final) conditions of the system. Can be a 1D or a 2D array:
* 1D: Provide a single initial condition.
Should be of shape (`n_dim`,) where `n_dim` = :math:`\mathrm{dim}(\\boldsymbol{x})`.
* 2D: Provide an ensemble of initial condition.
Should be of shape (`n_traj`, `n_dim`) where `n_dim` = :math:`\mathrm{dim}(\\boldsymbol{x})`,
and where `n_traj` is the number of initial conditions.
If `None`, use `number_trajectories` random initial conditions. Default to `None`.
If the `forward` argument is `False`, it specifies final conditions.
reconverge: bool
Use or not the smaller transient time reconvergence with a perturbation
after the first initial conditions have been computed. If activated, only use the :attr:`num_threads`
first initial conditions of the `ic` arguments. Default to `False`.
"""
if reconverge is None:
reconverge = False
if ic is None:
if self.n_dim is not None:
i = self.n_dim
else:
i = 1
while True:
self.ic = np.zeros(i)
try:
x = self.func(0., self.ic)
except:
i += 1
else:
break
i = len(self.func(0., self.ic))
if number_of_trajectories > self.num_threads:
reconverge = True
tmp_ic = np.zeros((number_of_trajectories, i))
tmp_ic[:self.num_threads] = np.random.randn(self.num_threads, i)
else:
tmp_ic = np.random.randn(number_of_trajectories, i)
else:
tmp_ic = ic.copy()
if len(tmp_ic.shape) > 1:
number_of_trajectories = tmp_ic.shape[0]
if reconverge and reconvergence_time is not None:
self.integrate(0., convergence_time, dt, ic=tmp_ic[:self.num_threads], write_steps=0, forward=forward)
t, x = self.get_trajectories()
tmp_ic[:self.num_threads] = x
if number_of_trajectories - self.num_threads > self.num_threads:
next_len = self.num_threads
else:
next_len = number_of_trajectories - self.num_threads
index = self.num_threads
while True:
perturbation = pert_size * np.random.randn(next_len, x.shape[1])
self.integrate(0., reconvergence_time, dt, ic=x[:next_len]+perturbation, write_steps=0, forward=forward)
t, x = self.get_trajectories()
tmp_ic[index:index+next_len] = x
index += next_len
if number_of_trajectories - index > self.num_threads:
next_len = self.num_threads
else:
next_len = number_of_trajectories - index
if next_len <= 0:
break
self.ic = tmp_ic
else:
self.integrate(0., convergence_time, dt, ic=tmp_ic, write_steps=0, forward=forward)
t, x = self.get_trajectories()
self.ic = x
[docs] def integrate(self, t0, t, dt, ic=None, forward=True, write_steps=1):
"""Integrate the ordinary differential equations (ODEs)
.. math:: \\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})
with a specified `Runge-Kutta method`_ and workers. The function :math:`\\boldsymbol{f}` is the `Numba`_ jitted
function stored in :attr:`func`. The result of the integration can be obtained afterward by calling
:meth:`get_trajectories`.
.. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
.. _Numba: https://numba.pydata.org/
Parameters
----------
t0: float
Initial time of the time integration. Corresponds to the initial condition's `ic` time.
Important if the ODEs are non-autonomous.
t: float
Final time of the time integration. Corresponds to the final condition.
Important if the ODEs are non-autonomous.
dt: float
Timestep of the integration.
ic: None or ~numpy.ndarray(float), optional
Initial (or final) conditions of the system. Can be a 1D or a 2D array:
* 1D: Provide a single initial condition.
Should be of shape (`n_dim`,) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`.
* 2D: Provide an ensemble of initial condition.
Should be of shape (`n_traj`, `n_dim`) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`,
and where `n_traj` is the number of initial conditions.
If `None`, use the initial conditions stored in :attr:`ic`.
If then :attr:`ic` is `None`, use a zero initial condition.
Default to `None`.
If the `forward` argument is `False`, it specifies final conditions.
forward: bool, optional
Whether to integrate the ODEs forward or backward in time. In case of backward integration, the
initial condition `ic` becomes a final condition. Default to forward integration.
write_steps: int, optional
Save the state of the integration in memory every `write_steps` steps. The other intermediary
steps are lost. It determines the size of the returned objects. Default is 1.
Set to 0 to return only the final state.
"""
if self.func is None:
print('No function to integrate defined!')
return 0
if ic is None:
if self.ic is None:
if self.n_dim is not None:
i = self.n_dim
else:
i = 1
while True:
self.ic = np.zeros(i)
try:
x = self.func(0., self.ic)
except:
i += 1
else:
break
i = len(self.func(0., self.ic))
self.ic = np.zeros(i)
else:
self.ic = ic
if len(self.ic.shape) == 1:
self.ic = self.ic.reshape((1, -1))
self.n_traj = self.ic.shape[0]
self.n_dim = self.ic.shape[1]
self._time = np.concatenate((np.arange(t0, t, dt), np.full((1,), t)))
self._write_steps = write_steps
if forward:
self._time_direction = 1
else:
self._time_direction = -1
if write_steps == 0:
self.n_records = 1
else:
tot = self._time[::self._write_steps]
self.n_records = len(tot)
if tot[-1] != self._time[-1]:
self.n_records += 1
self._recorded_traj = np.zeros((self.n_traj, self.n_dim, self.n_records))
for i in range(self.n_traj):
self._ics_queue.put((i, self._time, self.ic[i], self._time_direction, self._write_steps))
self._ics_queue.join()
for i in range(self.n_traj):
args = self._traj_queue.get()
self._recorded_traj[args[0]] = args[1]
[docs] def get_trajectories(self):
"""Returns the result of the previous integrator integration.
Returns
-------
time, traj: ~numpy.ndarray
The result of the integration:
* **time:** Time at which the state of the system was saved. Array of shape (:attr:`n_records`,).
* **traj:** Saved dynamical system states. 3D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_records`).
If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_dim`, :attr:`n_records`) is returned instead.
"""
if self._write_steps > 0:
if self._time_direction == 1:
if self._time[::self._write_steps][-1] == self._time[-1]:
return self._time[::self._write_steps], np.squeeze(self._recorded_traj)
else:
return np.concatenate((self._time[::self._write_steps], np.full((1,), self._time[-1]))), \
np.squeeze(self._recorded_traj)
else:
rtime = reverse(self._time[::-self._write_steps])
if rtime[0] == self._time[0]:
return rtime, np.squeeze(self._recorded_traj)
else:
return np.concatenate((np.full((1,), self._time[0]), rtime)), np.squeeze(self._recorded_traj)
else:
return self._time[-1], np.squeeze(self._recorded_traj)
[docs] def get_ic(self):
"""Returns the initial conditions stored in the integrator.
Returns
-------
~numpy.ndarray
The initial conditions.
"""
return self.ic
[docs] def set_ic(self, ic):
"""Direct setter for the integrator's initial conditions
Parameters
----------
ic: ~numpy.ndarray(float)
Initial condition of the system. Can be a 1D or a 2D array:
* 1D: Provide a single initial condition.
Should be of shape (`n_dim`,) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`.
* 2D: Provide an ensemble of initial condition.
Should be of shape (`n_traj`, `n_dim`) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`,
and where `n_traj` is the number of initial conditions.
"""
self.ic = ic
[docs]class TrajectoryProcess(multiprocessing.Process):
""":class:`RungeKuttaIntegrator`'s workers class. Allows to multi-thread time integration.
.. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
.. _Numba: https://numba.pydata.org/
Parameters
----------
processID: int
Number identifying the worker.
func: callable
`Numba`_-jitted function to integrate assigned to the worker.
b: ~numpy.ndarray, optional
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
c: ~numpy.ndarray, optional
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
a: ~numpy.ndarray, optional
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
ics_queue: multiprocessing.JoinableQueue
Queue to which the worker ask for initial conditions and parameters input.
traj_queue: multiprocessing.Queue
Queue to which the worker returns the integration results.
Attributes
----------
processID: int
Number identifying the worker.
func: callable
`Numba`_-jitted function to integrate assigned to the worker.
b: ~numpy.ndarray
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
c: ~numpy.ndarray
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
a: ~numpy.ndarray
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
"""
def __init__(self, processID, func, b, c, a, ics_queue, traj_queue):
super().__init__()
self.processID = processID
self._ics_queue = ics_queue
self._traj_queue = traj_queue
self.func = func
self.a = a
self.b = b
self.c = c
[docs] def run(self):
"""Main worker computing routine. Perform the time integration with the fetched initial conditions and parameters."""
while True:
args = self._ics_queue.get()
recorded_traj = _integrate_runge_kutta_jit(self.func, args[1], args[2][np.newaxis, :], args[3], args[4],
self.b, self.c, self.a)
self._traj_queue.put((args[0], recorded_traj))
self._ics_queue.task_done()
[docs]class RungeKuttaTglsIntegrator(object):
"""Class to integrate simultaneously the ordinary differential equations (ODEs)
.. math:: \dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})
and its tangent linear model, i.e. the linearized ODEs
.. math :: \dot{\\boldsymbol{\delta x}} = \\boldsymbol{\mathrm{J}}(t, \\boldsymbol{x}) \cdot \\boldsymbol{\delta x}
where :math:`\\boldsymbol{\mathrm{J}} = \\frac{\partial \\boldsymbol{f}}{\partial \\boldsymbol{x}}` is the
Jacobian matrix of :math:`\\boldsymbol{f}`, with a specified `Runge-Kutta method`_.
To solve this equation, one has to integrate simultaneously both ODEs. This class does so with a set
of :class:`TglsTrajectoryProcess` workers.
The function :math:`\\boldsymbol{f}` and :math:`\\boldsymbol{J}` should
be `Numba`_ jitted functions. These functions must have a signature ``f(t, x)`` and ``J(t, x)`` where ``x`` is
the state value and ``t`` is the time.
.. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
.. _Numba: https://numba.pydata.org/
.. _fundamental matrix of solutions: https://en.wikipedia.org/wiki/Fundamental_matrix_(linear_differential_equation)
Parameters
----------
num_threads: None or int, optional
Number of :class:`TrajectoryProcess` workers (threads) to use. If `None`, use the number of machine's
cores available. Default to `None`.
b: None or ~numpy.ndarray, optional
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
If `None`, use the classic RK4 method coefficients. Default to `None`.
c: None or ~numpy.ndarray, optional
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
If `None`, use the classic RK4 method coefficients. Default to `None`.
a: None or ~numpy.ndarray, optional
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
If `None`, use the classic RK4 method coefficients. Default to `None`.
number_of_dimensions: None or int, optional
Allow to hardcode the dynamical system dimension. If `None`, evaluate the dimension from the
callable :attr:`func`. Default to `None`.
Attributes
----------
num_threads: int
Number of :class:`TrajectoryProcess` workers (threads) to use.
b: ~numpy.ndarray
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
c: ~numpy.ndarray
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
a: ~numpy.ndarray
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
n_dim: int
Dynamical system dimension.
n_traj: int
The number of trajectories (initial conditions) of the non-linear ODEs computed at the last integration
performed by the integrator.
n_tg_traj: int
The number of trajectories (initial conditions) the linear ODEs computed at the last integration
performed by the integrator.
n_records: int
The number of saved states of the last integration performed by the integrator.
ic: ~numpy.ndarray
Store the integrator non-linear ODEs initial conditions.
tg_ic: ~numpy.ndarray
Store the integrator linear ODEs initial conditions.
func: callable
Last function :math:`\\boldsymbol{f}` used by the integrator to integrate.
func_jac: callable
Last Jacobian matrix function :math:`\\boldsymbol{J}` used by the integrator to integrate.
"""
def __init__(self, num_threads=None, b=None, c=None, a=None, number_of_dimensions=None):
if num_threads is None:
self.num_threads = multiprocessing.cpu_count()
else:
self.num_threads = num_threads
# Default is RK4
if a is None and b is None and c is None:
self.c = np.array([0., 0.5, 0.5, 1.])
self.b = np.array([1./6, 1./3, 1./3, 1./6])
self.a = np.zeros((len(self.c), len(self.b)))
self.a[1, 0] = 0.5
self.a[2, 1] = 0.5
self.a[3, 2] = 1.
else:
self.a = a
self.b = b
self.c = c
self.ic = None
self.tg_ic = None
self._time = None
self._recorded_traj = None
self._recorded_fmatrix = None
self.n_traj = 0
self.n_tg_traj = 0
self.n_dim = number_of_dimensions
self.n_records = 0
self._write_steps = 0
self._time_direction = 1
self._adjoint = False
self._boundary = None
self._inverse = 1.
self.func = None
self.func_jac = None
self._ics_queue = None
self._traj_queue = None
self._processes_list = list()
[docs] def terminate(self):
"""Stop the workers (threads) and release the resources of the integrator."""
for process in self._processes_list:
process.terminate()
process.join()
[docs] def start(self):
"""Start or restart the workers (threads) of the integrator.
Warnings
--------
If the integrator was not previously terminated, it will be terminated first in the case
of a restart.
"""
self.terminate()
self._processes_list = list()
self._ics_queue = multiprocessing.JoinableQueue()
self._traj_queue = multiprocessing.Queue()
for i in range(self.num_threads):
self._processes_list.append(TglsTrajectoryProcess(i, self.func, self.func_jac, self.b, self.c, self.a,
self._ics_queue, self._traj_queue))
for process in self._processes_list:
process.daemon = True
process.start()
[docs] def set_func(self, f, fjac, ic_init=True):
"""Set the `Numba`_-jitted function :math:`\\boldsymbol{f}` and Jacobian matrix function
:math:`\\boldsymbol{\\mathrm{J}}` to integrate.
.. _Numba: https://numba.pydata.org/
Parameters
----------
f: callable
The `Numba`_-jitted function :math:`\\boldsymbol{f}`.
Should have the signature ``f(t, x)`` where ``x`` is the state value and ``t`` is the time.
fjac: callable
The `Numba`_-jitted Jacobian matrix function :math:`\\boldsymbol{J}`.
Should have the signature ``J(t, x)`` where ``x`` is the state value and ``t`` is the time.
ic_init: bool, optional
Re-initialize or not the initial conditions of the integrator. Default to `True`.
Warnings
--------
This function restarts the integrator!
"""
self.func = f
self.func_jac = fjac
if ic_init:
self.ic = None
self.start()
[docs] def set_bca(self, b=None, c=None, a=None, ic_init=True):
"""Set the coefficients of the `Runge-Kutta method`_ and restart the integrator. s
.. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
Parameters
----------
b: None or ~numpy.ndarray, optional
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
If `None`, does not reinitialize these coefficients.
c: None or ~numpy.ndarray, optional
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
If `None`, does not reinitialize these coefficients.
a: None or ~numpy.ndarray, optional
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
If `None`, does not reinitialize these coefficients.
ic_init: bool, optional
Re-initialize or not the initial conditions of the integrator. Default to `True`.
"""
if a is not None:
self.a = a
if b is not None:
self.b = b
if c is not None:
self.c = c
if ic_init:
self.ic = None
self.start()
[docs] def initialize(self, convergence_time, dt, pert_size=0.01, reconvergence_time=None, forward=True,
number_of_trajectories=1, ic=None, reconverge=None):
"""Initialize the integration on an attractor by running it for a transient time,
For an ensemble of initial conditions, can do the same transient time for each, or the
`convergence_time` for the first one, and a smaller `reconvergence_time` for the subsequent ones.
This results into initial conditions on the attractor, stored in :attr:`ic`.
Parameters
----------
convergence_time: float
Transient time needed to converge to the attractor.
dt: float
Timestep of the transient integration.
pert_size:float, optional
If the reconvergence is activated, size of the perturbation to add to the previous ic to find
the next one. Default to 0.01 .
reconvergence_time: None or float, optional
Transient time for the subsequent trajectories after the first long `transient_time`.
forward: bool, optional
If true, integrate the ODEs forward in time, else, integrate backward in time. In case of backward integration, the
initial condition `ic` becomes a final condition. Default to forward integration.
number_of_trajectories: int
Number of initial conditions to find. Default to 1. Inactive if `ic` is provided.
ic: None or ~numpy.ndarray(float), optional
Initial condition of the system. Can be a 1D or a 2D array:
* 1D: Provide a single initial condition.
Should be of shape (`n_dim`,) where `n_dim` = :math:`\mathrm{dim}(\\boldsymbol{x})`.
* 2D: Provide an ensemble of initial condition.
Should be of shape (`n_traj`, `n_dim`) where `n_dim` = :math:`\mathrm{dim}(\\boldsymbol{x})`,
and where `n_traj` is the number of initial conditions.
If `None`, use `number_trajectories` random initial conditions. Default to `None`.
reconverge: bool
Use or not the smaller transient time reconvergence with a perturbation
after the first initial conditions have been computed. If activated, only use the :attr:`num_threads`
first initial conditions of the `ic` arguments. Default to `False`.
"""
if reconverge is None:
reconverge = False
if ic is None:
if self.n_dim is not None:
i = self.n_dim
else:
i = 1
while True:
self.ic = np.zeros(i)
try:
x = self.func(0., self.ic)
except:
i += 1
else:
break
i = len(self.func(0., self.ic))
if number_of_trajectories > self.num_threads:
reconverge = True
tmp_ic = np.zeros((number_of_trajectories, i))
tmp_ic[:self.num_threads] = np.random.randn(self.num_threads, i)
else:
tmp_ic = np.random.randn(number_of_trajectories, i)
else:
tmp_ic = ic.copy()
if len(tmp_ic.shape) > 1:
number_of_trajectories = tmp_ic.shape[0]
if reconverge and reconvergence_time is not None:
self.integrate(0., convergence_time, dt, ic=tmp_ic[:self.num_threads], write_steps=0, forward=forward)
t, x, fm = self.get_trajectories()
tmp_ic[:self.num_threads] = x
if number_of_trajectories - self.num_threads > self.num_threads:
next_len = self.num_threads
else:
next_len = number_of_trajectories - self.num_threads
index = self.num_threads
while True:
perturbation = pert_size * np.random.randn(next_len, x.shape[1])
self.integrate(0., reconvergence_time, dt, ic=x[:next_len]+perturbation, write_steps=0, forward=forward)
t, x, fm = self.get_trajectories()
tmp_ic[index:index+next_len] = x
index += next_len
if number_of_trajectories - index > self.num_threads:
next_len = self.num_threads
else:
next_len = number_of_trajectories - index
if next_len <= 0:
break
self.ic = tmp_ic
else:
self.integrate(0., convergence_time, dt, ic=tmp_ic, write_steps=0, forward=forward)
t, x, fm = self.get_trajectories()
self.ic = x
[docs] def integrate(self, t0, t, dt, ic=None, tg_ic=None, forward=True, adjoint=False, inverse=False, boundary=None,
write_steps=1):
"""Integrate simultaneously the non-linear and linearized ordinary differential equations (ODEs)
.. math:: \\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})
and
.. math :: \\dot{\\boldsymbol{\\delta x}} = \\boldsymbol{\\mathrm{J}}(t, \\boldsymbol{x}) \cdot \\boldsymbol{\\delta x}
with a specified `Runge-Kutta method`_ and workers.
The function :math:`\\boldsymbol{f}` is the `Numba`_ jitted function stored in :attr:`func`.
The function :math:`\\boldsymbol{J}` is the `Numba`_ jitted function stored in :attr:`func_jac`.
The result of the integration can be obtained afterward by calling :meth:`get_trajectories`.
.. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
.. _Numba: https://numba.pydata.org/
.. _fundamental matrix of solutions: https://en.wikipedia.org/wiki/Fundamental_matrix_(linear_differential_equation)
Parameters
----------
t0: float
Initial time of the time integration. Corresponds to the initial condition.
t: float
Final time of the time integration. Corresponds to the final condition.
dt: float
Timestep of the integration.
ic: None or ~numpy.ndarray(float), optional
Initial (or final) conditions of the system. Can be a 1D or a 2D array:
* 1D: Provide a single initial condition.
Should be of shape (`n_dim`,) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`.
* 2D: Provide an ensemble of initial condition.
Should be of shape (`n_traj`, `n_dim`) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`,
and where `n_traj` is the number of initial conditions.
If `None`, use the initial conditions stored in :attr:`ic`.
If then :attr:`ic` is `None`, use a zero initial condition.
Default to `None`.
If the `forward` argument is `False`, it specifies final conditions.
tg_ic: None or ~numpy.ndarray(float), optional
Initial (or final) conditions of the linear ODEs
:math:`\\dot{\\boldsymbol{\\delta x}} = \\boldsymbol{\\mathrm{J}}(t, \\boldsymbol{x}) \\cdot \\boldsymbol{\\delta x}`. \n
Can be a 1D, a 2D or a 3D array:
* 1D: Provide a single initial condition. This initial condition of the linear ODEs will be the same used for each
initial condition `ic` of the ODEs :math:`\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})`
Should be of shape (`n_dim`,) where `n_dim` = :math:`\mathrm{dim}(\\boldsymbol{x})`.
* 2D: Two sub-cases:
+ If `tg_ic.shape[0]`=`ic.shape[0]`, assumes that each initial condition `ic[i]` of :math:`\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})`,
correspond to a different initial condition `tg_ic[i]`.
+ Else, assumes and integrate an ensemble of `n_tg_traj` initial condition of the linear ODEs for each
initial condition of :math:`\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})`.
* 3D: An array of shape (`n_traj`, `n_tg_traj`, `n_dim`) which provide an ensemble of `n_tg_ic` initial conditions
specific to each of the `n_traj` initial conditions of :math:`\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})`.
If `None`, use the identity matrix as initial condition, returning the `fundamental matrix of solutions`_ of the
linear ODEs.
Default to `None`.
If the `forward` argument is `False`, it specifies final conditions.
forward: bool, optional
If true, integrate the ODEs forward in time, else, integrate backward in time. In case of backward integration, the
initial condition `ic` becomes a final condition. Default to forward integration.
adjoint: bool, optional
If true, integrate the tangent :math:`\dot{\\boldsymbol{\delta x}} = \\boldsymbol{\mathrm{J}}(t, \\boldsymbol{x}) \cdot \\boldsymbol{\delta x}` ,
else, integrate the adjoint linear model :math:`\dot{\\boldsymbol{\delta x}} = \\boldsymbol{\mathrm{J}}^T(t, \\boldsymbol{x}) \cdot \\boldsymbol{\delta x}`.
Integrate the tangent model by default.
inverse: bool, optional
Wheter or not to invert the Jacobian matrix
:math:`\\boldsymbol{\mathrm{J}}(t, \\boldsymbol{x}) \\rightarrow \\boldsymbol{\mathrm{J}}^{-1}(t, \\boldsymbol{x})`.
`False` by default.
boundary: None or callable, optional
Allow to add a inhomogeneous term to linear ODEs:
:math:`\dot{\\boldsymbol{\delta x}} = \\boldsymbol{\mathrm{J}}(t, \\boldsymbol{x}) \cdot \\boldsymbol{\delta x} + \Psi(t, \\boldsymbol{x})`.
The boundary :math:`\Psi` should have the same signature as :math:`\\boldsymbol{\mathrm{J}}`, i.e. ``func(t, x)``.
If `None`, don't add anything (homogeneous case). `None` by default.
write_steps: int, optional
Save the state of the integration in memory every `write_steps` steps. The other intermediary
steps are lost. It determines the size of the returned objects. Default is 1.
Set to 0 to return only the final state.
"""
if self.func is None or self.func_jac is None:
print('No function to integrate defined!')
return 0
if ic is None:
if self.ic is None:
if self.n_dim is not None:
i = self.n_dim
else:
i = 1
while True:
self.ic = np.zeros(i)
try:
x = self.func(0., self.ic)
except:
i += 1
else:
break
i = len(self.func(0., self.ic))
self.ic = np.zeros(i)
else:
self.ic = ic
if len(self.ic.shape) == 1:
self.ic = self.ic.reshape((1, -1))
self.n_traj = self.ic.shape[0]
self.n_dim = self.ic.shape[1]
self._time = np.concatenate((np.arange(t0, t, dt), np.full((1,), t)))
self._write_steps = write_steps
if tg_ic is None:
tg_ic = np.eye(self.ic.shape[1])
tg_ic_sav = tg_ic.copy()
if len(tg_ic.shape) == 1:
tg_ic = tg_ic.reshape((1, -1, 1))
ict = tg_ic.copy()
for i in range(self.n_traj-1):
ict = np.concatenate((ict, tg_ic))
self.tg_ic = ict
elif len(tg_ic.shape) == 2:
if tg_ic.shape[0] == self.n_traj:
self.tg_ic = tg_ic[..., np.newaxis]
else:
tg_ic = tg_ic[np.newaxis, ...]
tg_ic = np.swapaxes(tg_ic, 1, 2)
ict = tg_ic.copy()
for i in range(self.n_traj-1):
ict = np.concatenate((ict, tg_ic))
self.tg_ic = ict
elif len(tg_ic.shape) == 3:
if tg_ic.shape[1] != self.n_dim:
self.tg_ic = np.swapaxes(tg_ic, 1, 2)
self.n_tg_traj = self.tg_ic.shape[1]
if forward:
self._time_direction = 1
else:
self._time_direction = -1
self._adjoint = adjoint
if boundary is None:
self._boundary = _zeros_func
else:
self._boundary = boundary
self._inverse = 1.
if inverse:
self._inverse *= -1.
if write_steps == 0:
self.n_records = 1
else:
tot = self._time[::self._write_steps]
self.n_records = len(tot)
if tot[-1] != self._time[-1]:
self.n_records += 1
self._recorded_traj = np.zeros((self.n_traj, self.n_dim, self.n_records))
self._recorded_fmatrix = np.zeros((self.n_traj, self.tg_ic.shape[1], self.tg_ic.shape[2], self.n_records))
for i in range(self.n_traj):
self._ics_queue.put((i, self._time, self.ic[i], self.tg_ic[i], self._time_direction, self._write_steps,
self._adjoint, self._inverse, self._boundary))
self._ics_queue.join()
for i in range(self.n_traj):
args = self._traj_queue.get()
self._recorded_traj[args[0]] = args[1]
self._recorded_fmatrix[args[0]] = args[2]
if len(tg_ic_sav.shape) == 2:
if self._recorded_fmatrix.shape[1:3] != tg_ic_sav.shape:
self._recorded_fmatrix = np.swapaxes(self._recorded_fmatrix, 1, 2)
elif len(tg_ic_sav.shape) == 3:
if tg_ic_sav.shape[1] != self.n_dim:
if self._recorded_fmatrix.shape[:3] != tg_ic_sav.shape:
self._recorded_fmatrix = np.swapaxes(self._recorded_fmatrix, 1, 2)
[docs] def get_trajectories(self):
"""Returns the result of the previous integrator integration.
Returns
-------
time, traj, tg_traj: ~numpy.ndarray
The result of the integration:
* **time:** time at which the state of the system was saved. Array of shape (:attr:`n_records`,).
* **traj:** Saved dynamical system states. 3D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_records`).
If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_dim`, :attr:`n_records`) is returned instead.
* **tg_traj:** Saved states of the linear ODEs.
Depending on the input initial conditions of both ODEs,
it is at maximum a 4D array of shape
(:attr:`n_traj`, :attr:`n_tg_traj`, :attr:`n_dim`, :attr:`n_records`).
If one of the dimension is 1, it is squeezed.
"""
if self._write_steps > 0:
if self._time_direction == 1:
if self._time[::self._write_steps][-1] == self._time[-1]:
return self._time[::self._write_steps], np.squeeze(self._recorded_traj), \
np.squeeze(self._recorded_fmatrix)
else:
return np.concatenate((self._time[::self._write_steps], np.full((1,), self._time[-1]))), \
np.squeeze(self._recorded_traj), np.squeeze(self._recorded_fmatrix)
else:
rtime = reverse(self._time[::-self._write_steps])
if rtime[0] == self._time[0]:
return rtime, np.squeeze(self._recorded_traj), np.squeeze(self._recorded_fmatrix)
else:
return np.concatenate((np.full((1,), self._time[0]), rtime)), np.squeeze(self._recorded_traj), \
np.squeeze(self._recorded_fmatrix)
else:
return self._time[-1], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_fmatrix)
[docs] def get_ic(self):
"""Returns the initial conditions of the non-linear ODEs stored in the integrator.
Returns
-------
~numpy.ndarray
The initial conditions.
"""
return self.ic
[docs] def set_ic(self, ic):
"""Direct setter for the integrator's non-linear ODEs initial conditions
Parameters
----------
ic: ~numpy.ndarray(float)
Initial condition of the system. Can be a 1D or a 2D array:
* 1D: Provide a single initial condition.
Should be of shape (`n_dim`,) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`.
* 2D: Provide an ensemble of initial condition.
Should be of shape (`n_traj`, `n_dim`) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`,
and where `n_traj` is the number of initial conditions.
"""
self.ic = ic
[docs] def get_tg_ic(self):
"""Returns the initial conditions of the linear ODEs stored in the integrator.
Returns
-------
~numpy.ndarray
The initial conditions.
"""
return self.tg_ic
[docs] def set_tg_ic(self, tg_ic):
"""Direct setter for the integrator's linear ODEs initial conditions
Parameters
----------
tg_ic: ~numpy.ndarray(float)
Initial condition of the linear ODEs
:math:`\dot{\\boldsymbol{\delta x}} = \\boldsymbol{\mathrm{J}}(t, \\boldsymbol{x}) \cdot \\boldsymbol{\delta x}`. \n
Can be a 1D, a 2D or a 3D array:
* 1D: Provide a single initial condition. This initial condition of the linear ODEs will be the same used for each
initial condition `ic` of the ODEs :math:`\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})`
Should be of shape (`n_dim`,) where `n_dim` = :math:`\mathrm{dim}(\\boldsymbol{x})`.
* 2D: Two sub-cases:
+ If `tg_ic.shape[0]`=`ic.shape[0]`, assumes that each initial condition `ic[i]` of :math:`\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})`,
correspond to a different initial condition `tg_ic[i]`.
+ Else, assumes and integrate an ensemble of `n_tg_traj` initial condition of the linear ODEs for each
initial condition of :math:`\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})`.
* 3D: An array of shape (`n_traj`, `n_tg_traj`, `n_dim`) which provide an ensemble of `n_tg_ic` initial conditions
specific to each of the `n_traj` initial conditions of :math:`\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x})`.
"""
self.tg_ic = tg_ic
[docs]class TglsTrajectoryProcess(multiprocessing.Process):
""":class:`RungeKuttaTglsIntegrator`'s workers class. Allows to multi-thread time integration.
.. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
.. _Numba: https://numba.pydata.org/
Parameters
----------
processID: int
Number identifying the worker.
func: callable
`Numba`_-jitted function to integrate assigned to the worker.
func_jac: callable
`Numba`_-jitted Jacobian matrix function to integrate assigned to the worker.
b: ~numpy.ndarray, optional
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
c: ~numpy.ndarray, optional
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
a: ~numpy.ndarray, optional
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
ics_queue: multiprocessing.JoinableQueue
Queue to which the worker ask for initial conditions input.
traj_queue: multiprocessing.Queue
Queue to which the worker returns the integration results.
Attributes
----------
processID: int
Number identifying the worker.
func: callable
`Numba`_-jitted function to integrate assigned to the worker.
func_jac: callable
`Numba`_-jitted Jacobian matrix function to integrate assigned to the worker.
b: ~numpy.ndarray
Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ .
c: ~numpy.ndarray
Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ .
a: ~numpy.ndarray
Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ .
"""
def __init__(self, processID, func, func_jac, b, c, a, ics_queue, traj_queue):
super().__init__()
self.processID = processID
self._ics_queue = ics_queue
self._traj_queue = traj_queue
self.func = func
self.func_jac = func_jac
self.a = a
self.b = b
self.c = c
[docs] def run(self):
"""Main worker computing routine. Perform the time integration with the fetched initial conditions and parameters."""
while True:
args = self._ics_queue.get()
recorded_traj, recorded_fmatrix = _integrate_runge_kutta_tgls_jit(self.func, self.func_jac, args[1], args[2][np.newaxis, ...],
args[3][np.newaxis, ...], args[4], args[5],
self.b, self.c, self.a,
args[6], args[7], args[8])
self._traj_queue.put((args[0], recorded_traj, recorded_fmatrix))
self._ics_queue.task_done()
if __name__ == "__main__":
import matplotlib.pyplot as plt
from scipy.integrate import odeint
@njit
def f(t, x):
return - np.array([1., 2., 3.]) * x
def fr(x, t):
return f(t, x)
ic = np.random.randn(6).reshape(2, 3)
integrator = RungeKuttaIntegrator()
integrator.set_func(f)
integrator.integrate(0., 10., 0.01, ic=ic, write_steps=1)
time, r = integrator.get_trajectories()
t = np.arange(0., 10., 0.1)
t = np.concatenate((t[::3], np.full((1,), 10.)))
rl = list()
for i in range(ic.shape[0]):
rl.append(odeint(fr, ic[i], t).T)
plt.figure()
for i in range(ic.shape[0]):
p, = plt.plot(time, r[i, 0])
c = p.get_color()
plt.plot(t, rl[i][0], color=c, ls='--')
for j in range(1, ic.shape[1]):
p, = plt.plot(time, r[i, j], color=c)
plt.plot(t, rl[i][j], color=c, ls='--')
plt.title('Forward')
integrator.integrate(0., 10., 0.01, ic=ic, forward=False, write_steps=1)
timet, rt = integrator.get_trajectories()
rlt = list()
for i in range(ic.shape[0]):
rlt.append(odeint(fr, ic[i], reverse(t)).T)
plt.figure()
for i in range(ic.shape[0]):
p, = plt.plot(timet, rt[i, 0])
c = p.get_color()
plt.plot(t, reverse(rlt[i][0]), color=c, ls='--')
for j in range(1, ic.shape[1]):
p, = plt.plot(timet, rt[i, j], color=c)
plt.plot(t, reverse(rlt[i][j]), color=c, ls='--')
plt.title('Backward')
integrator.integrate(0., 10., 0.01, ic=ic, write_steps=0)
tt, re = integrator.get_trajectories()
print(tt)
print(r[0, :, -1], re[0])
plt.show(block=False)
a = 0.25
F = 16.
G = 3.
b = 6.
@njit
def fL84(t, x):
xx = -x[1] ** 2 - x[2] ** 2 - a * x[0] + a * F
yy = x[0] * x[1] - b * x[0] * x[2] - x[1] + G
zz = b * x[0] * x[1] + x[0] * x[2] - x[2]
return np.array([xx, yy, zz])
@njit
def DfL84(t, x):
return np.array([[ -a , -2. * x[1], -2. * x[2]],
[x[1] - b * x[2], -1. + x[0], -b * x[0]],
[b * x[1] + x[2], b * x[0], -1. + x[0]]])
integrator.set_func(fL84)
integrator.integrate(0., 10000., 0.01, write_steps=0)
tt, traj = integrator.get_trajectories()
integrator.integrate(0.,20.,0.01,ic=traj, write_steps=10)
tt, traj = integrator.get_trajectories()
integrator.integrate(0.,20.,0.01,ic=traj[:,-1],write_steps=10, forward=False)
ttb, trajb = integrator.get_trajectories()
plt.figure()
plt.plot(tt, traj[0])
plt.plot(ttb, trajb[0])
plt.show(block=False)
plt.title('Lorenz 63 - Forward then backward')
integrator.integrate(0., 10000., 0.01, write_steps=0)
tt, traj = integrator.get_trajectories()
integrator.integrate(0.,20.,0.01,ic=traj,write_steps=10, forward=False)
tt, trajb = integrator.get_trajectories()
integrator.integrate(0.,20.,0.01,ic=trajb[:,0],write_steps=10)
ttb, traj = integrator.get_trajectories()
plt.figure()
plt.plot(tt, traj[0])
plt.plot(ttb, trajb[0])
plt.show(block=False)
plt.title('Lorenz 63 - Backward then forward')
integrator.terminate()
tgls_integrator = RungeKuttaTglsIntegrator()
tgls_integrator.set_func(fL84, DfL84)
@njit
def tboundary(t, x):
return np.array([0.,x[1],0.])
ic = np.random.randn(4, 3)
tgls_integrator.initialize(10., 0.01, ic=ic)
tgls_integrator.integrate(0., 20., 0.01, write_steps=10, tg_ic=np.zeros(3), boundary=tboundary)
# x, fm = _integrate_runge_kutta_tgls_jit(fL84, DfL84, tgls_integrator.time, tgls_integrator.ic[0][np .newaxis,...], np.zeros((1,3,1)), 1, 1, tgls_integrator.b, tgls_integrator.c, tgls_integrator.a, False, 1., tboundary)
t, x, fm = tgls_integrator.get_trajectories()
tgls_integrator.terminate()