Source code for qgs.functions.tendencies


"""
    Tendencies definition module
    ============================

    This module provides functions to create the tendencies functions of the model, based on
    its parameters.

"""
import numpy as np
from numba import njit

from qgs.inner_products.analytic import AtmosphericAnalyticInnerProducts, OceanicAnalyticInnerProducts, GroundAnalyticInnerProducts
from qgs.inner_products.symbolic import AtmosphericSymbolicInnerProducts, OceanicSymbolicInnerProducts, GroundSymbolicInnerProducts
from qgs.tensors.qgtensor import QgsTensor, QgsTensorDynamicT, QgsTensorT4
from qgs.tensors.atmo_thermo_tensor import AtmoThermoTensor, AtmoThermoTensorDynamicT, AtmoThermoTensorT4
from qgs.functions.sparse_mul import sparse_mul5, sparse_mul4, sparse_mul3, sparse_mul2


[docs]def create_tendencies(params, return_inner_products=False, return_qgtensor=False): """Function to handle the inner products and tendencies tensors construction. Returns the tendencies function :math:`\\boldsymbol{f}` determining the model's ordinary differential equations: .. math:: \dot{\\boldsymbol{x}} = \\boldsymbol{f}(\\boldsymbol{x}) which is for the model's integration. It returns also the linearized tendencies :math:`\\boldsymbol{\mathrm{J}} \equiv \\boldsymbol{\mathrm{D}f} = \\frac{\partial \\boldsymbol{f}}{\partial \\boldsymbol{x}}` (Jacobian matrix) which are used by the tangent linear model: .. math :: \dot{\\boldsymbol{\delta x}} = \\boldsymbol{\mathrm{J}}(\\boldsymbol{x}) \cdot \\boldsymbol{\delta x} Parameters ---------- params: QgParams The parameters fully specifying the model configuration. return_inner_products: bool If True, return the inner products of the model. Default to False. return_qgtensor: bool If True, return the tendencies tensor of the model. Default to False. Returns ------- f: callable The numba-jitted tendencies function. Df: callable The numba-jitted linearized tendencies function. inner_products: (AtmosphericInnerProducts, OceanicInnerProducts) If `return_inner_products` is True, the inner products of the system. qgtensor: QgsTensor If `return_qgtensor` is True, the tendencies tensor of the system. """ if params.ablocks is not None: aip = AtmosphericAnalyticInnerProducts(params) elif params.atmospheric_basis is not None: aip = AtmosphericSymbolicInnerProducts(params) else: aip = None if params.oblocks is not None: oip = OceanicAnalyticInnerProducts(params) elif params.oceanic_basis is not None: oip = OceanicSymbolicInnerProducts(params) else: oip = None if params.gblocks is not None: gip = GroundAnalyticInnerProducts(params) elif params.ground_basis is not None: gip = GroundSymbolicInnerProducts(params) else: gip = None if aip is not None and oip is not None: if not aip.connected_to_ocean: aip.connect_to_ocean(oip) elif aip is not None and gip is not None: if not aip.connected_to_ground: aip.connect_to_ground(gip) if params.T4: agotensor = QgsTensorT4(params, aip, oip, gip) elif params.dynamic_T: agotensor = QgsTensorDynamicT(params, aip, oip, gip) else: agotensor = QgsTensor(params, aip, oip, gip) coo = agotensor.tensor.coords.T val = agotensor.tensor.data jcoo = agotensor.jacobian_tensor.coords.T jval = agotensor.jacobian_tensor.data if params.T4 or params.dynamic_T: @njit def f(t, x): xx = np.concatenate((np.full((1,), 1.), x)) xr = sparse_mul5(coo, val, xx, xx, xx, xx) return xr[1:] @njit def Df(t, x): xx = np.concatenate((np.full((1,), 1.), x)) mul_jac = sparse_mul4(jcoo, jval, xx, xx, xx) return mul_jac[1:, 1:] else: @njit def f(t, x): xx = np.concatenate((np.full((1,), 1.), x)) xr = sparse_mul3(coo, val, xx, xx) return xr[1:] @njit def Df(t, x): xx = np.concatenate((np.full((1,), 1.), x)) mul_jac = sparse_mul2(jcoo, jval, xx) return mul_jac[1:, 1:] ret = list() ret.append(f) ret.append(Df) if return_inner_products: ret.append((aip, oip, gip)) if return_qgtensor: ret.append(agotensor) return ret
[docs]def create_atmo_thermo_tendencies(params, return_atmo_thermo_tensor=False): """Returns a function which return a part of the atmospheric thermodynamic tendencies useful for computing the vertical wind. Parameters ---------- params: QgParams The parameters fully specifying the model configuration. return_atmo_thermo_tensor: bool If True, return the tendencies tensor of these tencencies. Default to False. Returns ------- f: callable The numba-jitted tendencies function. atmo_thermo_tensor: AtmoThermoTensor If `return_atmo_thermo_tensor` is True, the tendencies tensor of the system. """ if params.ablocks is not None: aip = AtmosphericAnalyticInnerProducts(params) elif params.atmospheric_basis is not None: aip = AtmosphericSymbolicInnerProducts(params) else: aip = None if params.oblocks is not None: oip = OceanicAnalyticInnerProducts(params) elif params.oceanic_basis is not None: oip = OceanicSymbolicInnerProducts(params) else: oip = None if params.gblocks is not None: gip = GroundAnalyticInnerProducts(params) elif params.ground_basis is not None: gip = GroundSymbolicInnerProducts(params) else: gip = None if aip is not None and oip is not None: if not aip.connected_to_ocean: aip.connect_to_ocean(oip) elif aip is not None and gip is not None: if not aip.connected_to_ground: aip.connect_to_ground(gip) if params.T4: agotensor = AtmoThermoTensorT4(params, aip, oip, gip) elif params.dynamic_T: agotensor = AtmoThermoTensorDynamicT(params, aip, oip, gip) else: agotensor = AtmoThermoTensor(params, aip, oip, gip) coo = agotensor.tensor.coords.T val = agotensor.tensor.data if params.T4 or params.dynamic_T: @njit def f(t, x): xx = np.concatenate((np.full((1,), 1.), x)) xr = sparse_mul5(coo, val, xx, xx, xx, xx) return xr[1:] else: @njit def f(t, x): xx = np.concatenate((np.full((1,), 1.), x)) xr = sparse_mul3(coo, val, xx, xx) return xr[1:] if return_atmo_thermo_tensor: ret = list() ret.append(f) ret.append(agotensor) else: ret = f return ret
if __name__ == '__main__': from qgs.params.params import QgParams params = QgParams() params.set_atmospheric_channel_fourier_modes(2, 2, mode='symbolic') params.set_oceanic_basin_fourier_modes(2, 4, mode='symbolic') f, Df = create_tendencies(params)