Source code for qgs.toolbox.lyapunov


"""
    Lyapunov module
    =================

    Module with the classes of multi-thread the computation of the various
    `Lyapunov vectors`_ and `exponents`_. Integrate using the `Runge-Kutta method`_
    defined in the :mod:`~.integrators.integrate` module.
    See :cite:`lyap-KP2012` for more details on the Lyapunov vectors theoretical framework.

    Module classes
    --------------

    * :class:`LyapunovsEstimator` to estimate the Backward and Forward Lyapunov Vectors (BLVs and FLVs) along a trajectory
    * :class:`CovariantLyapunovsEstimator` to estimate the Covariant Lyapunov Vectors (CLVs) along a trajectory
    
    .. _Lyapunov vectors: https://en.wikipedia.org/wiki/Lyapunov_vector
    .. _exponents: https://en.wikipedia.org/wiki/Lyapunov_exponent
    .. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods
    .. _Numba: https://numba.pydata.org/


    References
    ----------

    .. bibliography:: ../model/ref.bib
        :labelprefix: LYAP-
        :keyprefix: lyap-
"""

from numba import njit
import numpy as np
import qgs.integrators.integrate as integrate
from qgs.functions.util import normalize_matrix_columns, solve_triangular_matrix, reverse

import multiprocessing


# TODO: change the usage of np.squeeze in the return of the estimators. Use specific shape descriptors instead.

[docs]class LyapunovsEstimator(object): """Class to compute the Forward and Backward `Lyapunov vectors`_ and `exponents`_ along a trajectory of a dynamical system .. math:: \\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x}) with a set of :class:`LyapProcess` and a specified `Runge-Kutta method`_. The tangent linear model must also be provided. I.e. one must provide the linearized ODEs .. math :: \\dot{\\boldsymbol{\\delta x}} = \\boldsymbol{\\mathrm{J}}(t, \\boldsymbol{x}) \\cdot \\boldsymbol{\\delta x} where :math:`\\boldsymbol{\\mathrm{J}} = \\frac{\\partial \\boldsymbol{f}}{\\partial \\boldsymbol{x}}` is the Jacobian matrix of :math:`\\boldsymbol{f}`. The method used to compute the Lyapunov vectors is the one introduced by Benettin et al. :cite:`lyap-BGGS1980`. Parameters ---------- num_threads: None or int, optional Number of :class:`LyapProcess` workers (threads) to use. If `None`, use the number of machine's cores available. Default to `None`. b: None or ~numpy.ndarray, optional Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . If `None`, use the classic RK4 method coefficients. Default to `None`. c: None or ~numpy.ndarray, optional Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . If `None`, use the classic RK4 method coefficients. Default to `None`. a: None or ~numpy.ndarray, optional Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . If `None`, use the classic RK4 method coefficients. Default to `None`. number_of_dimensions: None or int, optional Allow to hardcode the dynamical system dimension. If `None`, evaluate the dimension from the callable :attr:`func`. Default to `None`. Attributes ---------- num_threads: int Number of :class:`LyapProcess` workers (threads) to use. b: ~numpy.ndarray Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . c: ~numpy.ndarray Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . a: ~numpy.ndarray Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . n_dim: int Dynamical system dimension. n_vec: int The number of Lyapunov vectors to compute. n_traj: int The number of trajectories (initial conditions) computed at the last estimation performed by the estimator. n_records: int The number of saved states of the last estimation performed by the estimator. ic: ~numpy.ndarray Store the estimator initial conditions. func: callable Last function :math:`\\boldsymbol{f}` used by the estimator. func_jac: callable Last Jacobian matrix function :math:`\\boldsymbol{J}` used by the estimator. """ def __init__(self, num_threads=None, b=None, c=None, a=None, number_of_dimensions=None): if num_threads is None: self.num_threads = multiprocessing.cpu_count() else: self.num_threads = num_threads # Default is RK4 if a is None and b is None and c is None: self.c = np.array([0., 0.5, 0.5, 1.]) self.b = np.array([1./6, 1./3, 1./3, 1./6]) self.a = np.zeros((len(self.c), len(self.b))) self.a[1, 0] = 0.5 self.a[2, 1] = 0.5 self.a[3, 2] = 1. else: self.a = a self.b = b self.c = c self.ic = None self._time = None self._pretime = None self._recorded_traj = None self._recorded_exp = None self._recorded_vec = None self.n_traj = 0 self.n_dim = number_of_dimensions self.n_records = 0 self.n_vec = 0 self.write_steps = 0 self._adjoint = False self._forward = -1 self._inverse = 1. self.func = None self.func_jac = None self._ics_queue = None self._lyap_queue = None self._processes_list = list()
[docs] def terminate(self): """Stop the workers (threads) and release the resources of the estimator.""" for process in self._processes_list: process.terminate() process.join()
[docs] def start(self): """Start or restart the workers (threads) of the estimator. Warnings -------- If the estimator was not previously terminated, it will be terminated first in the case of a restart. """ self.terminate() self._processes_list = list() self._ics_queue = multiprocessing.JoinableQueue() self._lyap_queue = multiprocessing.Queue() for i in range(self.num_threads): self._processes_list.append(LyapProcess(i, self.func, self.func_jac, self.b, self.c, self.a, self._ics_queue, self._lyap_queue)) for process in self._processes_list: process.daemon = True process.start()
[docs] def set_bca(self, b=None, c=None, a=None, ic_init=True): """Set the coefficients of the `Runge-Kutta method`_ and restart the estimator. .. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods Parameters ---------- b: None or ~numpy.ndarray, optional Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . If `None`, does not reinitialize these coefficients. c: None or ~numpy.ndarray, optional Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . If `None`, does not reinitialize these coefficients. a: None or ~numpy.ndarray, optional Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . If `None`, does not reinitialize these coefficients. ic_init: bool, optional Re-initialize or not the initial conditions of the estimator. Default to `True`. """ if a is not None: self.a = a if b is not None: self.b = b if c is not None: self.c = c if ic_init: self.ic = None self.start()
[docs] def set_func(self, f, fjac): """Set the `Numba`_-jitted function :math:`\\boldsymbol{f}` and Jacobian matrix function :math:`\\boldsymbol{\\mathrm{J}}` to integrate. .. _Numba: https://numba.pydata.org/ Parameters ---------- f: callable The `Numba`_-jitted function :math:`\\boldsymbol{f}`. Should have the signature ``f(t, x)`` where ``x`` is the state value and ``t`` is the time. fjac: callable The `Numba`_-jitted Jacobian matrix function :math:`\\boldsymbol{J}`. Should have the signature ``J(t, x)`` where ``x`` is the state value and ``t`` is the time. Warnings -------- This function restarts the estimator! """ self.func = f self.func_jac = fjac self.start()
[docs] def compute_lyapunovs(self, t0, tw, t, dt, mdt, ic=None, write_steps=1, n_vec=None, forward=False, adjoint=False, inverse=False): """Estimate the Lyapunov vectors using the Benettin algorithm along a given trajectory, always integrating the said trajectory forward in time from `ic` at `t0` to time `t`. The result of the estimation can be obtained afterward by calling :meth:`get_lyapunovs`. If `forward` is `True`, it yields the Forward Lyapunov Vectors (FLVs) between `t0` and `tw`, otherwise, returns the Backward Lyapunov Vectors (BLVs) between `tw` and `t`. Parameters ---------- t0: float Initial time of the time integration. Corresponds to the initial condition's `ic` time. tw: float Time at which the algorithm start to store the Lyapunov vectors. Define thus also the transient before the which the Lyapunov vectors are considered as having not yet converged. Must be between `t0` and `t`. t: float Final time of the time integration. Corresponds to the final condition. dt: float Timestep of the integration. mdt: float Micro-timestep to integrate the tangent linear equation between the nonlinear system `dt` timesteps. Should be smaller or equal to `dt`. ic: None or ~numpy.ndarray(float), optional Initial conditions of the system. Can be a 1D or a 2D array: * 1D: Provide a single initial condition. Should be of shape (`n_dim`,) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`. * 2D: Provide an ensemble of initial condition. Should be of shape (`n_traj`, `n_dim`) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`, and where `n_traj` is the number of initial conditions. If `None`, use the initial conditions stored in :attr:`ic`. If then :attr:`ic` is `None`, use a zero initial condition. Default to `None`. forward: bool, optional If `True`, yield the `Forward Lyapunov Vectors` (FLVs) between `t0` and `tw`. If `False`, yield the `Backward Lyapunov Vectors` (BLVs) between `tw` and `t`. Default to `False`, i.e. Backward Lyapunov Vectors estimation. adjoint: bool, optional If true, integrate the tangent :math:`\\dot{\\boldsymbol{\\delta x}} = \\boldsymbol{\\mathrm{J}}(t, \\boldsymbol{x}) \\cdot \\boldsymbol{\\delta x}` , else, integrate the adjoint linear model :math:`\\dot{\\boldsymbol{\\delta x}} = \\boldsymbol{\\mathrm{J}}^T(t, \\boldsymbol{x}) \\cdot \\boldsymbol{\\delta x}`. Integrate the tangent model by default. inverse: bool, optional Whether or not to invert the Jacobian matrix :math:`\\boldsymbol{\\mathrm{J}}(t, \\boldsymbol{x}) \\rightarrow \\boldsymbol{\\mathrm{J}}^{-1}(t, \\boldsymbol{x})`. `False` by default. write_steps: int, optional Save the state of the integration in memory every `write_steps` steps. The other intermediary steps are lost. It determines the size of the returned objects. Default is 1. Set to 0 to return only the final state. n_vec: int, optional The number of Lyapunov vectors to compute. Should be smaller or equal to :attr:`n_dim`. """ if self.func is None or self.func_jac is None: print('No function to integrate defined!') return 0 if ic is None: i = 1 while True: self.ic = np.zeros(i) try: x = self.func(0., self.ic) except: i += 1 else: break i = len(self.func(0., self.ic)) self.ic = np.zeros(i) else: self.ic = ic if len(self.ic.shape) == 1: self.ic = self.ic.reshape((1, -1)) self.n_traj = self.ic.shape[0] self.n_dim = self.ic.shape[1] if n_vec is not None: self.n_vec = n_vec else: self.n_vec = self.n_dim self._pretime = np.concatenate((np.arange(t0, tw, dt), np.full((1,), tw))) self._time = np.concatenate((np.arange(tw, t, dt), np.full((1,), t))) self.write_steps = write_steps if forward: self._forward = 1 else: self._forward = -1 self._adjoint = adjoint self._inverse = 1. if inverse: self._inverse *= -1. if write_steps == 0: self.n_records = 1 else: if not forward: tot = self._time[::self.write_steps] self.n_records = len(tot) if tot[-1] != self._time[-1]: self.n_records += 1 else: tot = self._pretime[::self.write_steps] self.n_records = len(tot) if tot[-1] != self._pretime[-1]: self.n_records += 1 self._recorded_traj = np.zeros((self.n_traj, self.n_dim, self.n_records)) self._recorded_vec = np.zeros((self.n_traj, self.n_dim, self.n_vec, self.n_records)) self._recorded_exp = np.zeros((self.n_traj, self.n_vec, self.n_records)) for i in range(self.n_traj): self._ics_queue.put((i, self._pretime, self._time, mdt, self.ic[i], self.n_vec, self.write_steps, self._forward, self._adjoint, self._inverse)) self._ics_queue.join() for i in range(self.n_traj): args = self._lyap_queue.get() self._recorded_traj[args[0]] = args[1] self._recorded_exp[args[0]] = args[2] self._recorded_vec[args[0]] = args[3]
[docs] def get_lyapunovs(self): """Returns the result of the previous Lyapunov vectors estimation. Returns ------- time, traj, exponents, vectors: ~numpy.ndarray The result of the estimation: * **time:** Time at which the state of the system was saved. Array of shape (:attr:`n_records`,). * **traj:** Saved dynamical system states. 3D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_records`). If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_dim`, :attr:`n_records`) is returned instead. * **exponents:** Saved estimates of the local Lyapunov exponents along the trajectory. 3D array of shape (:attr:`n_traj`, :attr:`n_vec`, :attr:`n_records`). If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_vec`, :attr:`n_records`) is returned instead. * **vectors:** Saved estimates of the local Lyapunov vectors along the trajectory. Depending on the input initial conditions, it is maximum a 4D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_vec`, :attr:`n_records`). If one of the dimension is 1, it is squeezed. """ if self._forward == -1: tt = self._time else: tt = self._pretime if self.write_steps > 0: if tt[::self.write_steps][-1] == tt[-1]: return tt[::self.write_steps], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), \ np.squeeze(self._recorded_vec) else: return np.concatenate((tt[::self.write_steps], np.full((1,), tt[-1]))), \ np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), np.squeeze(self._recorded_vec) else: return tt[-1], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), \ np.squeeze(self._recorded_vec)
[docs]class LyapProcess(multiprocessing.Process): """:class:`LyapunovsEstimator`'s workers class. Allows to multi-thread Lyapunov vectors estimation. Parameters ---------- processID: int Number identifying the worker. func: callable `Numba`_-jitted function to integrate assigned to the worker. func_jac: callable `Numba`_-jitted Jacobian matrix function to integrate assigned to the worker. b: ~numpy.ndarray, optional Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . c: ~numpy.ndarray, optional Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . a: ~numpy.ndarray, optional Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . ics_queue: multiprocessing.JoinableQueue Queue to which the worker ask for initial conditions and parameters input. lyap_queue: multiprocessing.Queue Queue to which the worker returns the estimation results. Attributes ---------- processID: int Number identifying the worker. func: callable `Numba`_-jitted function to integrate assigned to the worker. func_jac: callable `Numba`_-jitted Jacobian matrix function to integrate assigned to the worker. b: ~numpy.ndarray Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . c: ~numpy.ndarray Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . a: ~numpy.ndarray Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . """ def __init__(self, processID, func, func_jac, b, c, a, ics_queue, lyap_queue): super().__init__() self.processID = processID self._ics_queue = ics_queue self._lyap_queue = lyap_queue self.func = func self.func_jac = func_jac self.a = a self.b = b self.c = c
[docs] def run(self): """Main worker computing routine. Perform the estimation with the fetched initial conditions and parameters.""" while True: args = self._ics_queue.get() if args[7] == -1: recorded_traj, recorded_exp, recorded_vec = _compute_backward_lyap_jit(self.func, self.func_jac, args[1], args[2], args[3], args[4][np.newaxis, :], args[5], args[6], args[8], args[9], self.b, self.c, self.a) else: recorded_traj, recorded_exp, recorded_vec = _compute_forward_lyap_jit(self.func, self.func_jac, args[1], args[2], args[3], args[4][np.newaxis, :], args[5], args[6], args[8], args[9], self.b, self.c, self.a) self._lyap_queue.put((args[0], np.squeeze(recorded_traj), np.squeeze(recorded_exp), np.squeeze(recorded_vec))) self._ics_queue.task_done()
@njit def _compute_forward_lyap_jit(f, fjac, time, posttime, mdt, ic, n_vec, write_steps, adjoint, inverse, b, c, a): ttraj = integrate._integrate_runge_kutta_jit(f, np.concatenate((time[:-1], posttime)), ic, 1, 1, b, c, a) recorded_traj, recorded_exp, recorded_vec = _compute_forward_lyap_traj_jit(f, fjac, time, posttime, ttraj, mdt, n_vec, write_steps, adjoint, inverse, b, c, a) return recorded_traj, recorded_exp, recorded_vec @njit def _compute_forward_lyap_traj_jit(f, fjac, time, posttime, ttraj, mdt, n_vec, write_steps, adjoint, inverse, b, c, a): traj = ttraj[:, :, :len(time)] posttraj = ttraj[:, :, len(time)-1:] n_traj = ttraj.shape[0] n_dim = ttraj.shape[1] Id = np.zeros((1, n_dim, n_dim)) Id[0] = np.eye(n_dim) if write_steps == 0: n_records = 1 else: tot = time[::write_steps] n_records = len(tot) if tot[-1] != time[-1]: n_records += 1 recorded_vec = np.zeros((n_traj, n_dim, n_vec, n_records)) recorded_traj = np.zeros((n_traj, n_dim, n_records)) recorded_exp = np.zeros((n_traj, n_vec, n_records)) rposttime = reverse(posttime) rtime = reverse(time) for i_traj in range(n_traj): y = np.zeros((1, n_dim)) qr = np.linalg.qr(np.random.random((n_dim, n_vec))) q = qr[0] m_exp = np.zeros((n_dim)) for ti, (tt, dt) in enumerate(zip(rposttime[:-1], np.diff(rposttime))): y[0] = posttraj[i_traj, :, -1-ti] subtime = np.concatenate((np.arange(tt + dt, tt, mdt), np.full((1,), tt))) y_new, prop = integrate._integrate_runge_kutta_tgls_jit(f, fjac, subtime, y, Id, -1, 0, b, c, a, adjoint, inverse, integrate._zeros_func) q_new = prop[0, :, :, 0] @ q qr = np.linalg.qr(q_new) q = qr[0] r = qr[1] iw = -1 for ti, (tt, dt) in enumerate(zip(rtime[:-1], np.diff(rtime))): y[0] = traj[i_traj, :, -1-ti] m_exp = np.log(np.abs(np.diag(r)))/dt if write_steps > 0 and np.mod(ti, write_steps) == 0: recorded_exp[i_traj, :, iw] = m_exp recorded_traj[i_traj, :, iw] = y[0] recorded_vec[i_traj, :, :, iw] = q iw -= 1 subtime = np.concatenate((np.arange(tt + dt, tt, mdt), np.full((1,), tt))) y_new, prop = integrate._integrate_runge_kutta_tgls_jit(f, fjac, subtime, y, Id, -1, 0, b, c, a, adjoint, inverse, integrate._zeros_func) q_new = prop[0, :, :, 0] @ q qr = np.linalg.qr(q_new) q = qr[0] r = qr[1] recorded_exp[i_traj, :, 0] = m_exp recorded_traj[i_traj, :, 0] = y[0] recorded_vec[i_traj, :, :, 0] = q return recorded_traj, recorded_exp, recorded_vec @njit def _compute_backward_lyap_jit(f, fjac, pretime, time, mdt, ic, n_vec, write_steps, adjoint, inverse, b, c, a): ttraj = integrate._integrate_runge_kutta_jit(f, np.concatenate((pretime[:-1], time)), ic, 1, 1, b, c, a) recorded_traj, recorded_exp, recorded_vec = _compute_backward_lyap_traj_jit(f, fjac, pretime, time, ttraj, mdt, n_vec, write_steps, adjoint, inverse, b, c, a) return recorded_traj, recorded_exp, recorded_vec @njit def _compute_backward_lyap_traj_jit(f, fjac, pretime, time, ttraj, mdt, n_vec, write_steps, adjoint, inverse, b, c, a): pretraj = ttraj[:, :, :len(pretime)] traj = ttraj[:, :, (len(pretime)-1):] n_traj = ttraj.shape[0] n_dim = ttraj.shape[1] Id = np.zeros((1, n_dim, n_dim)) Id[0] = np.eye(n_dim) if write_steps == 0: n_records = 1 else: tot = time[::write_steps] n_records = len(tot) if tot[-1] != time[-1]: n_records += 1 recorded_vec = np.zeros((n_traj, n_dim, n_vec, n_records)) recorded_traj = np.zeros((n_traj, n_dim, n_records)) recorded_exp = np.zeros((n_traj, n_vec, n_records)) for i_traj in range(n_traj): y = np.zeros((1, n_dim)) y[0] = pretraj[i_traj, :, 0] qr = np.linalg.qr(np.random.random((n_dim, n_vec))) q = qr[0] m_exp = np.zeros((n_dim)) for ti, (tt, dt) in enumerate(zip(pretime[:-1], np.diff(pretime))): subtime = np.concatenate((np.arange(tt, tt + dt, mdt), np.full((1,), tt + dt))) y_new, prop = integrate._integrate_runge_kutta_tgls_jit(f, fjac, subtime, y, Id, 1, 0, b, c, a, adjoint, inverse, integrate._zeros_func) y[0] = pretraj[i_traj, :, ti+1] q_new = prop[0, :, :, 0] @ q qr = np.linalg.qr(q_new) q = qr[0] r = qr[1] iw = 0 for ti, (tt, dt) in enumerate(zip(time[:-1], np.diff(time))): m_exp = np.log(np.abs(np.diag(r)))/dt if write_steps > 0 and np.mod(ti, write_steps) == 0: recorded_exp[i_traj, :, iw] = m_exp recorded_traj[i_traj, :, iw] = y[0] recorded_vec[i_traj, :, :, iw] = q iw += 1 subtime = np.concatenate((np.arange(tt, tt + dt, mdt), np.full((1,), tt + dt))) y_new, prop = integrate._integrate_runge_kutta_tgls_jit(f, fjac, subtime, y, Id, 1, 0, b, c, a, adjoint, inverse, integrate._zeros_func) y[0] = traj[i_traj, :, ti+1] q_new = prop[0, :, :, 0] @ q qr = np.linalg.qr(q_new) q = qr[0] r = qr[1] recorded_exp[i_traj, :, -1] = m_exp recorded_traj[i_traj, :, -1] = y[0] recorded_vec[i_traj, :, :, -1] = q return recorded_traj, recorded_exp, recorded_vec
[docs]class CovariantLyapunovsEstimator(object): """Class to compute the Covariant `Lyapunov vectors`_ (CLVs) and `exponents`_ along a trajectory of a dynamical system .. math:: \\dot{\\boldsymbol{x}} = \\boldsymbol{f}(t, \\boldsymbol{x}) with a set of :class:`LyapProcess` and a specified `Runge-Kutta method`_. The tangent linear model must also be provided. I.e. one must provide the linearized ODEs .. math :: \\dot{\\boldsymbol{\\delta x}} = \\boldsymbol{\\mathrm{J}}(t, \\boldsymbol{x}) \\cdot \\boldsymbol{\\delta x} where :math:`\\boldsymbol{\\mathrm{J}} = \\frac{\\partial \\boldsymbol{f}}{\\partial \\boldsymbol{x}}` is the Jacobian matrix of :math:`\\boldsymbol{f}`. Parameters ---------- num_threads: None or int, optional Number of :class:`LyapProcess` workers (threads) to use. If `None`, use the number of machine's cores available. Default to `None`. b: None or ~numpy.ndarray, optional Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . If `None`, use the classic RK4 method coefficients. Default to `None`. c: None or ~numpy.ndarray, optional Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . If `None`, use the classic RK4 method coefficients. Default to `None`. a: None or ~numpy.ndarray, optional Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . If `None`, use the classic RK4 method coefficients. Default to `None`. number_of_dimensions: None or int, optional Allow to hardcode the dynamical system dimension. If `None`, evaluate the dimension from the callable :attr:`func`. Default to `None`. method: int, optional Allow to select the method used to compute the CLVs. Presently can be `0` or `1`: * `0`: Uses the method of Ginelli et al. :cite:`lyap-GPTCLP2007`. Suitable for a trajectory not too long (depends on the memory available). * `1`: Uses the method of the intersection of the subspace spanned by the BLVs and FLVs described in :cite:`lyap-ER1985` and :cite:`lyap-KP2012` (see also :cite:`lyap-DPV2021`, Appendix A). Suitable for longer trajectories (uses less memory). Default to `0`, i.e. Ginelli et al. algorithm. noise_pert: float, optional Noise perturbation amplitude parameter of the diagonal of the R matrix in the QR decomposition during the Ginelli step. Mainly done to avoid ill-conditioned matrices near tangencies (see :cite:`lyap-KP2012`). Default to 0 (no perturbation). Only apply if using the Ginelli et al. algorithm, i.e. if ``method=0``. Attributes ---------- num_threads: int Number of :class:`LyapProcess` workers (threads) to use. b: ~numpy.ndarray Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . c: ~numpy.ndarray Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . a: ~numpy.ndarray Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . n_dim: int Dynamical system dimension. n_vec: int The number of Lyapunov vectors to compute. n_traj: int The number of trajectories (initial conditions) computed at the last estimation performed by the estimator. n_records: int The number of saved states of the last estimation performed by the estimator. ic: ~numpy.ndarray Store the estimator initial conditions. func: callable Last function :math:`\\boldsymbol{f}` used by the estimator. func_jac: callable Last Jacobian matrix function :math:`\\boldsymbol{J}` used by the estimator. method: int Select the method used to compute the CLVs: * `0`: Uses the method of Ginelli et al. :cite:`lyap-GPTCLP2007`. Suitable for a trajectory not too long (depends on the memory available). * `1`: Uses the method of the intersection of the subspaces spanned by the BLVs and FLVs described in :cite:`lyap-ER1985` and :cite:`lyap-KP2012` (see also :cite:`lyap-DPV2021`, Appendix A). Suitable for longer trajectories (uses less memory). noise_pert: float Noise perturbation parameter of the diagonal of the matrix resulting from the backpropagation during the Ginelli step. Mainly done to avoid ill-conditioned matrices near tangencies (see :cite:`lyap-KP2012`). Only apply if using the Ginelli et al. algorithm, i.e. if ``method=0``. """ def __init__(self, num_threads=None, b=None, c=None, a=None, number_of_dimensions=None, noise_pert=0., method=0): if num_threads is None: self.num_threads = multiprocessing.cpu_count() else: self.num_threads = num_threads # Default is RK4 if a is None and b is None and c is None: self.c = np.array([0., 0.5, 0.5, 1.]) self.b = np.array([1./6, 1./3, 1./3, 1./6]) self.a = np.zeros((len(self.c), len(self.b))) self.a[1, 0] = 0.5 self.a[2, 1] = 0.5 self.a[3, 2] = 1. else: self.a = a self.b = b self.c = c self.noise_pert = noise_pert self.ic = None self._time = None self._pretime = None self._aftertime = None self._recorded_traj = None self._recorded_exp = None self._recorded_vec = None self._recorded_bvec = None self._recorded_fvec = None self.n_traj = 0 self.n_dim = number_of_dimensions self.n_records = 0 self.n_vec = 0 self.write_steps = 0 self.method = method self.func = None self.func_jac = None self._ics_queue = None self._clv_queue = None self._processes_list = list()
[docs] def terminate(self): """Stop the workers (threads) and release the resources of the estimator.""" for process in self._processes_list: process.terminate() process.join()
[docs] def set_noise_pert(self, noise_pert): """Set the noise perturbation :attr:`noise_pert` parameter. Parameters ---------- noise_pert: float, optional Noise perturbation amplitude parameter of the diagonal of the R matrix in the QR decomposition during the Ginelli step. Mainly done to avoid ill-conditioned matrices near tangencies (see :cite:`lyap-KP2012`). Only apply if using the Ginelli et al. algorithm, i.e. if :attr:`method` is 0. """ self.noise_pert = noise_pert self.start()
[docs] def set_bca(self, b=None, c=None, a=None, ic_init=True): """Set the coefficients of the `Runge-Kutta method`_ and restart the estimator. .. _Runge-Kutta method: https://en.wikipedia.org/wiki/Runge%E2%80%93Kutta_methods Parameters ---------- b: None or ~numpy.ndarray, optional Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . If `None`, does not reinitialize these coefficients. c: None or ~numpy.ndarray, optional Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . If `None`, does not reinitialize these coefficients. a: None or ~numpy.ndarray, optional Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . If `None`, does not reinitialize these coefficients. ic_init: bool, optional Re-initialize or not the initial conditions of the estimator. Default to `True`. """ if a is not None: self.a = a if b is not None: self.b = b if c is not None: self.c = c if ic_init: self.ic = None self.start()
[docs] def start(self): """Start or restart the workers (threads) of the estimator. Warnings -------- If the estimator was not previously terminated, it will be terminated first in the case of a restart. """ self.terminate() self._processes_list = list() self._ics_queue = multiprocessing.JoinableQueue() self._clv_queue = multiprocessing.Queue() for i in range(self.num_threads): self._processes_list.append(ClvProcess(i, self.func, self.func_jac, self.b, self.c, self.a, self._ics_queue, self._clv_queue, self.noise_pert)) for process in self._processes_list: process.daemon = True process.start()
[docs] def set_func(self, f, fjac): """Set the `Numba`_-jitted function :math:`\\boldsymbol{f}` and Jacobian matrix function :math:`\\boldsymbol{\\mathrm{J}}` to integrate. .. _Numba: https://numba.pydata.org/ Parameters ---------- f: callable The `Numba`_-jitted function :math:`\\boldsymbol{f}`. Should have the signature ``f(t, x)`` where ``x`` is the state value and ``t`` is the time. fjac: callable The `Numba`_-jitted Jacobian matrix function :math:`\\boldsymbol{J}`. Should have the signature ``J(t, x)`` where ``x`` is the state value and ``t`` is the time. Warnings -------- This function restarts the estimator! """ self.func = f self.func_jac = fjac self.start()
[docs] def compute_clvs(self, t0, ta, tb, tc, dt, mdt, ic=None, write_steps=1, n_vec=None, method=None, backward_vectors=False, forward_vectors=False): """Estimate the Covariant Lyapunov Vectors (CLVs) along a given trajectory, always integrating the said trajectory forward in time from `ic` at `t0` to time `tc`. Return the CLVs between `ta` and `tb`. The result of the estimation can be obtained afterward by calling :meth:`get_clvs`. Parameters ---------- t0: float Initial time of the time integration. Corresponds to the initial condition's `ic` time. ta: float Define the time span between `t0` and `ta` of the first part of the algorithm, which obtain the convergence to the Backward Lyapunov vectors (initialization of the Benettin algorithm). tb: float Define the time span between `ta` and `tb` where the Covariant Lyapunov Vectors are computed. tc: float Final time of the time integration algorithm. Define the time span between `tb` and `tc` where, depending on the value of :attr:`method`, the convergence to the Forward Lyapunov Vectors or to the Covariant Lyapunov Vectors (thanks to the Ginelli steps) is obtained. dt: float Timestep of the integration. mdt: float Micro-timestep to integrate the tangent linear equation between the nonlinear system `dt` timesteps. Should be smaller or equal to `dt`. ic: None or ~numpy.ndarray(float), optional Initial conditions of the system. Can be a 1D or a 2D array: * 1D: Provide a single initial condition. Should be of shape (`n_dim`,) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`. * 2D: Provide an ensemble of initial condition. Should be of shape (`n_traj`, `n_dim`) where `n_dim` = :math:`\\mathrm{dim}(\\boldsymbol{x})`, and where `n_traj` is the number of initial conditions. If `None`, use the initial conditions stored in :attr:`ic`. If then :attr:`ic` is `None`, use a zero initial condition. Default to `None`. write_steps: int, optional Save the state of the integration in memory every `write_steps` steps. The other intermediary steps are lost. It determines the size of the returned objects. Default is 1. Set to 0 to return only the final state. n_vec: int, optional The number of Lyapunov vectors to compute. Should be smaller or equal to :attr:`n_dim`. method: int, optional Allow to select the method used to compute the CLVs. Presently can be `0` or `1`: * `0`: Uses the method of Ginelli et al. :cite:`lyap-GPTCLP2007`. Suitable for a trajectory not too long (depends on the memory available). * `1`: Uses the method of the intersection of the subspace spanned by the BLVs and FLVs described in :cite:`lyap-ER1985` and :cite:`lyap-KP2012` (see also :cite:`lyap-DPV2021`, Appendix A). Suitable for longer trajectories (uses less memory). Use the Ginelli et al. algorithm if not provided. backward_vectors: bool, optional Store also the computed Backward Lyapunov vectors between `ta` and `tb`. Only applies if ``method=1``. Does not store the BLVs if not provided. forward_vectors: bool, optional Store also the computed Forward Lyapunov vectors between `ta` and `tb`. Only applies if ``method=1``. Does not store the FLVs if not provided. """ if self.func is None or self.func_jac is None: print('No function to integrate defined!') return 0 if ic is None: i = 1 while True: self.ic = np.zeros(i) try: x = self.func(0., self.ic) except: i += 1 else: break i = len(self.func(0., self.ic)) self.ic = np.zeros(i) else: self.ic = ic if len(self.ic.shape) == 1: self.ic = self.ic.reshape((1, -1)) self.n_traj = self.ic.shape[0] self.n_dim = self.ic.shape[1] if n_vec is not None: self.n_vec = n_vec else: self.n_vec = self.n_dim if method is not None: self.method = method self._pretime = np.concatenate((np.arange(t0, ta, dt), np.full((1,), ta))) self._time = np.concatenate((np.arange(ta, tb, dt), np.full((1,), tb))) self._aftertime = np.concatenate((np.arange(tb, tc, dt), np.full((1,), tc))) self.write_steps = write_steps if write_steps == 0: self.n_records = 1 else: tot = self._time[::self.write_steps] self.n_records = len(tot) if tot[-1] != self._time[-1]: self.n_records += 1 self._recorded_traj = np.zeros((self.n_traj, self.n_dim, self.n_records)) self._recorded_vec = np.zeros((self.n_traj, self.n_dim, self.n_vec, self.n_records)) self._recorded_exp = np.zeros((self.n_traj, self.n_vec, self.n_records)) if self.method == 1: if forward_vectors: self._recorded_fvec = np.zeros((self.n_traj, self.n_dim, self.n_vec, self.n_records)) if backward_vectors: self._recorded_bvec = np.zeros((self.n_traj, self.n_dim, self.n_vec, self.n_records)) for i in range(self.n_traj): self._ics_queue.put((i, self._pretime, self._time, self._aftertime, mdt, self.ic[i], self.n_vec, self.write_steps, self.method)) self._ics_queue.join() for i in range(self.n_traj): args = self._clv_queue.get() self._recorded_traj[args[0]] = args[1] self._recorded_exp[args[0]] = args[2] self._recorded_vec[args[0]] = args[3] if self.method == 1: if forward_vectors: self._recorded_fvec[args[0]] = args[5] if backward_vectors: self._recorded_bvec[args[0]] = args[4]
[docs] def get_clvs(self): """Returns the result of the previous CLVs estimation. Returns ------- time, traj, exponents, vectors: ~numpy.ndarray The result of the estimation: * **time:** Time at which the state of the system was saved. Array of shape (:attr:`n_records`,). * **traj:** Saved dynamical system states. 3D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_records`). If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_dim`, :attr:`n_records`) is returned instead. * **exponents:** Saved estimates of the local Lyapunov exponents along the trajectory. 3D array of shape (:attr:`n_traj`, :attr:`n_vec`, :attr:`n_records`). If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_vec`, :attr:`n_records`) is returned instead. * **vectors:** Saved estimates of the local Lyapunov vectors along the trajectory. Depending on the input initial conditions, it is maximum a 4D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_vec`, :attr:`n_records`). If one of the dimension is 1, it is squeezed. """ if self.write_steps > 0: if self._time[::self.write_steps][-1] == self._time[-1]: return self._time[::self.write_steps], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), \ np.squeeze(self._recorded_vec) else: return np.concatenate((self._time[::self.write_steps], np.full((1,), self._time[-1]))), \ np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), np.squeeze(self._recorded_vec) else: return self._time[-1], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), \ np.squeeze(self._recorded_vec)
[docs] def get_blvs(self): """Returns the BLVs obtained during the previous CLVs estimation. Returns ------- time, traj, exponents, vectors: ~numpy.ndarray The result of the estimation: * **time:** Time at which the state of the system was saved. Array of shape (:attr:`n_records`,). * **traj:** Saved dynamical system states. 3D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_records`). If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_dim`, :attr:`n_records`) is returned instead. * **exponents:** Saved estimates of the local Lyapunov exponents along the trajectory. 3D array of shape (:attr:`n_traj`, :attr:`n_vec`, :attr:`n_records`). If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_vec`, :attr:`n_records`) is returned instead. * **vectors:** Saved estimates of the local Lyapunov vectors along the trajectory. Depending on the input initial conditions, it is maximum a 4D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_vec`, :attr:`n_records`). If one of the dimension is 1, it is squeezed. Warnings -------- The BLVs are only available if :attr:`method` is set to 1. """ if self._recorded_bvec is None: return None if self.write_steps > 0: if self._time[::self.write_steps][-1] == self._time[-1]: return self._time[::self.write_steps], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), \ np.squeeze(self._recorded_bvec) else: return np.concatenate((self._time[::self.write_steps], np.full((1,), self._time[-1]))), \ np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), np.squeeze(self._recorded_bvec) else: return self._time[-1], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), \ np.squeeze(self._recorded_bvec)
[docs] def get_flvs(self): """Returns the FLVs obtained during the previous CLVs estimation. Returns ------- time, traj, exponents, vectors: ~numpy.ndarray The result of the estimation: * **time:** Time at which the state of the system was saved. Array of shape (:attr:`n_records`,). * **traj:** Saved dynamical system states. 3D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_records`). If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_dim`, :attr:`n_records`) is returned instead. * **exponents:** Saved estimates of the local Lyapunov exponents along the trajectory. 3D array of shape (:attr:`n_traj`, :attr:`n_vec`, :attr:`n_records`). If :attr:`n_traj` = 1, a 2D array of shape (:attr:`n_vec`, :attr:`n_records`) is returned instead. * **vectors:** Saved estimates of the local Lyapunov vectors along the trajectory. Depending on the input initial conditions, it is maximum a 4D array of shape (:attr:`n_traj`, :attr:`n_dim`, :attr:`n_vec`, :attr:`n_records`). If one of the dimension is 1, it is squeezed. Warnings -------- The FLVs are only available if :attr:`method` is set to 1. """ if self._recorded_fvec is None: return None if self.write_steps > 0: if self._time[::self.write_steps][-1] == self._time[-1]: return self._time[::self.write_steps], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), \ np.squeeze(self._recorded_fvec) else: return np.concatenate((self._time[::self.write_steps], np.full((1,), self._time[-1]))), \ np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), np.squeeze(self._recorded_fvec) else: return self._time[-1], np.squeeze(self._recorded_traj), np.squeeze(self._recorded_exp), \ np.squeeze(self._recorded_fvec)
[docs]class ClvProcess(multiprocessing.Process): """:class:`CovariantLyapunovsEstimator`'s workers class. Allows to multi-thread Lyapunov vectors estimation. Parameters ---------- processID: int Number identifying the worker. func: callable `Numba`_-jitted function to integrate assigned to the worker. func_jac: callable `Numba`_-jitted Jacobian matrix function to integrate assigned to the worker. b: ~numpy.ndarray, optional Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . c: ~numpy.ndarray, optional Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . a: ~numpy.ndarray, optional Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . ics_queue: multiprocessing.JoinableQueue Queue to which the worker ask for initial conditions and parameters input. clv_queue: multiprocessing.Queue Queue to which the worker returns the estimation results. Attributes ---------- processID: int Number identifying the worker. func: callable `Numba`_-jitted function to integrate assigned to the worker. func_jac: callable `Numba`_-jitted Jacobian matrix function to integrate assigned to the worker. b: ~numpy.ndarray Vector of coefficients :math:`b_i` of the `Runge-Kutta method`_ . c: ~numpy.ndarray Matrix of coefficients :math:`c_{i,j}` of the `Runge-Kutta method`_ . a: ~numpy.ndarray Vector of coefficients :math:`a_i` of the `Runge-Kutta method`_ . """ def __init__(self, processID, func, func_jac, b, c, a, ics_queue, clv_queue, noise_pert): super().__init__() self.processID = processID self._ics_queue = ics_queue self._clv_queue = clv_queue self.func = func self.func_jac = func_jac self.a = a self.b = b self.c = c self.noise_pert = noise_pert
[docs] def run(self): """Main worker computing routine. Perform the estimation with the fetched initial conditions and parameters.""" while True: args = self._ics_queue.get() method = args[8] if method == 0: recorded_traj, recorded_exp, recorded_vec = _compute_clv_gin_jit(self.func, self.func_jac, args[1], args[2], args[3], args[4], args[5][np.newaxis, :], args[6], args[7], self.b, self.c, self.a, self.noise_pert) self._clv_queue.put((args[0], np.squeeze(recorded_traj), np.squeeze(recorded_exp), np.squeeze(recorded_vec))) else: recorded_traj, recorded_exp, recorded_vec, backward_vec, forward_vec = _compute_clv_sub_jit(self.func, self.func_jac, args[1], args[2], args[3], args[4], args[5][np.newaxis, :], args[7], self.b, self.c, self.a) self._clv_queue.put((args[0], np.squeeze(recorded_traj), np.squeeze(recorded_exp), np.squeeze(recorded_vec), np.squeeze(backward_vec), np.squeeze(forward_vec))) self._ics_queue.task_done()
# Ginelli et al. method @njit def _compute_clv_gin_jit(f, fjac, pretime, time, aftertime, mdt, ic, n_vec, write_steps, b, c, a, noise_pert): n_traj = ic.shape[0] n_dim = ic.shape[1] Id = np.zeros((1, n_dim, n_dim)) Id[0] = np.eye(n_dim) if write_steps == 0: n_records = 1 else: tot = time[::write_steps] n_records = len(tot) if tot[-1] != time[-1]: n_records += 1 recorded_vec = np.zeros((n_traj, n_dim, n_vec, n_records)) recorded_traj = np.zeros((n_traj, n_dim, n_records)) recorded_exp = np.zeros((n_traj, n_vec, n_records)) for i_traj in range(n_traj): # first part, making the backward vectors converge (initialization of the Benettin algorithm) y = np.zeros((1, n_dim)) y[0] = ic[i_traj] qr = np.linalg.qr(np.random.randn(n_dim, n_vec)) q = qr[0] for tt, dt in zip(pretime[:-1], np.diff(pretime)): subtime = np.concatenate((np.arange(tt, tt + dt, mdt), np.full((1,), tt + dt))) y_new, prop = integrate._integrate_runge_kutta_tgls_jit(f, fjac, subtime, y, Id, 1, 0, b, c, a, False, 1, integrate._zeros_func) y[0] = y_new[0, :, 0] q_new = prop[0, :, :, 0] @ q qr = np.linalg.qr(q_new) q = qr[0] # second part, stores the backward vectors and the r matrix (Benettin steps) # save the trajectories tw = len(time)-1 tew = len(time)+len(aftertime)-2 tmp_traj = np.zeros((tw+1, n_dim)) tmp_vec = np.zeros((tw+1, n_dim, n_vec)) tmp_R = np.zeros((tew, n_vec, n_vec)) for ti, (tt, dt) in enumerate(zip(time[:-1], np.diff(time))): tmp_vec[ti] = q.copy() tmp_traj[ti] = y[0].copy() subtime = np.concatenate((np.arange(tt, tt + dt, mdt), np.full((1,), tt + dt))) y_new, prop = integrate._integrate_runge_kutta_tgls_jit(f, fjac, subtime, y, Id, 1, 0, b, c, a, False, 1, integrate._zeros_func) y[0] = y_new[0, :, 0] q_new = prop[0, :, :, 0] @ q qr = np.linalg.qr(q_new) q = qr[0] tmp_R[ti] = qr[1].copy() tmp_vec[-1] = q.copy() tmp_traj[-1] = y[0].copy() # third part, stores the r matrix (Benettin steps) for ti, (tt, dt) in enumerate(zip(aftertime[:-1], np.diff(aftertime))): subtime = np.concatenate((np.arange(tt, tt + dt, mdt), np.full((1,), tt + dt))) y_new, prop = integrate._integrate_runge_kutta_tgls_jit(f, fjac, subtime, y, Id, 1, 0, b, c, a, False, 1, integrate._zeros_func) y[0] = y_new[0, :, 0] q_new = prop[0, :, :, 0] @ q qr = np.linalg.qr(q_new) q = qr[0] tmp_R[ti+tw] = qr[1].copy() # fourth part, going backward until tb (Ginelli steps) qr = np.linalg.qr(np.random.randn(n_dim, n_vec)) am, norm = normalize_matrix_columns(qr[1]) for ti in range(tew-1, tw, -1): am_new = solve_triangular_matrix(tmp_R[ti], am) noise = np.random.randn(n_dim) for i in range(n_vec): am_new[i, i] += noise[i] * noise_pert am, norm = normalize_matrix_columns(am_new) # fifth and last part, going backward from tb to ta (Ginelli steps) # save the data dte = np.concatenate((np.diff(time), np.full((1,), aftertime[1] - aftertime[0]))) iw = 1 for ti in range(tw, -1, -1): am_new = solve_triangular_matrix(tmp_R[ti], am) noise = np.random.randn(n_vec) for i in range(n_dim): am_new[i, i] += noise[i] * noise_pert am, mloc_exp = normalize_matrix_columns(am_new) if write_steps > 0 and np.mod(tw-ti, write_steps) == 0: recorded_traj[i_traj, :, -iw] = tmp_traj[ti] recorded_exp[i_traj, :, -iw] = -np.log(np.abs(mloc_exp))/dte[ti] recorded_vec[i_traj, :, :, -iw] = tmp_vec[ti] @ am iw += 1 recorded_traj[i_traj, :, 0] = tmp_traj[0] recorded_exp[i_traj, :, 0] = -np.log(np.abs(mloc_exp))/dte[0] recorded_vec[i_traj, :, :, 0] = tmp_vec[0] @ am return recorded_traj, recorded_exp, recorded_vec # Subspace intersection method @njit def _compute_clv_sub_jit(f, fjac, pretime, time, aftertime, mdt, ic, write_steps, b, c, a): n_traj = ic.shape[0] n_dim = ic.shape[1] lp = len(pretime) la = len(aftertime) ttraj = integrate._integrate_runge_kutta_jit(f, np.concatenate((pretime[:-1], time[:-1], aftertime)), ic, 1, 1, b, c, a) traj, exp, fvec = _compute_forward_lyap_traj_jit(f, fjac, time, aftertime, ttraj[:, :, lp-1:], mdt, n_dim, write_steps, False, 1, b, c, a) traj, exp, bvec = _compute_backward_lyap_traj_jit(f, fjac, pretime, time, ttraj[:, :, :-la+1], mdt, n_dim, write_steps, False, 1, b, c, a) recorded_traj = traj recorded_exp = np.zeros_like(traj) n_records = traj.shape[-1] recorded_vec = np.zeros((n_traj, n_dim, n_dim, n_records)) subtime = np.array([0., mdt]) y = np.zeros((1, n_dim)) vec = np.zeros((1, n_dim, n_dim)) for i_traj in range(n_traj): for ti in range(n_records): for j in range(n_dim): u, z, w = np.linalg.svd(bvec[i_traj, :, :j+1, ti].T @ fvec[i_traj, :, :n_dim-j, ti]) basis = bvec[i_traj, :, :j+1, ti] @ u recorded_vec[i_traj, :, j, ti] = basis[:, 0] y[0] = recorded_traj[i_traj, :, ti] vec[0] = recorded_vec[i_traj, :, :, ti] y_new, sol = integrate._integrate_runge_kutta_tgls_jit(f, fjac, subtime, y, vec, 1, 0, b, c, a, False, 1, integrate._zeros_func) soln, mloc_exp = normalize_matrix_columns(sol[0, :, :, 0]) recorded_exp[i_traj, :, ti] = np.log(np.abs(mloc_exp))/mdt return recorded_traj, recorded_exp, recorded_vec, bvec, fvec if __name__ == "__main__": a = 0.25 F = 8. G = 1. b = 4. @njit def fL84(t, x): xx = -x[1] ** 2 - x[2] ** 2 - a * x[0] + a * F yy = x[0] * x[1] - b * x[0] * x[2] - x[1] + G zz = b * x[0] * x[1] + x[0] * x[2] - x[2] return np.array([xx, yy, zz]) @njit def DfL84(t, x): return np.array([[ -a , -2. * x[1], -2. * x[2]], [x[1] - b * x[2], -1. + x[0], -b * x[0]], [b * x[1] + x[2], b * x[0], -1. + x[0]]]) sigma = 10. r = 28. bb = 8. / 3. @njit def fL63(t, x): xx = sigma * (x[1] - x[0]) yy = r * x[0] - x[1] - x[0] * x[2] zz = x[0] * x[1] - bb * x[2] return np.array([xx, yy, zz]) @njit def DfL63(t, x): return np.array([[-sigma, sigma, 0.], [r - x[2], -1., - x[0]], [x[1], x[0], -bb]]) ic = np.random.random(3) # tt, ic_L84 = integrate.integrate_runge_kutta(fL84, 0., 10000., 0.01, ic=ic, write_steps=0) tt, ic = integrate.integrate_runge_kutta(fL63, 0., 10000., 0.01, ic=ic, write_steps=0) print('Computing Backward Lyapunovs') lyapint = LyapunovsEstimator() # lyapint.set_func(fL84, DfL84) lyapint.set_func(fL63, DfL63) lyapint.compute_lyapunovs(0., 10000., 30000., 0.01, 0.01, ic, write_steps=1) #, n_vec=2) btl, btraj, bexp, bvec = lyapint.get_lyapunovs() print('Computing Forward Lyapunovs') # lyapint.set_func(fL84, DfL84) lyapint.set_func(fL63, DfL63) lyapint.compute_lyapunovs(0., 20000., 30000., 0.01, 0.01, ic, write_steps=1, forward=True, adjoint=False, inverse=False) #, n_vec=2) ftl, ftraj, fexp, fvec = lyapint.get_lyapunovs() print('Computing Covariant Lyapunovs') clvint = CovariantLyapunovsEstimator() # clvint.set_func(fL84, DfL84) clvint.set_func(fL63, DfL63) clvint.compute_clvs(0., 10000., 20000., 30000., 0.01, 0.01, ic, write_steps=1) #, n_vec=2) ctl, ctraj, cexp, cvec = clvint.get_clvs() clvint.compute_clvs(0., 10000., 20000., 30000., 0.01, 0.01, ic, write_steps=10, method=1, backward_vectors=True) #, n_vec=2) ctl2, ctraj2, cexp2, cvec2 = clvint.get_clvs() lyapint.terminate() clvint.terminate()