Source code for qgs.tensors.qgtensor

"""
    qgs tensor module
    =================

    This module computes and holds the tensors representing the tendencies of the model's equations.

    TODO: Add a list of the different tensor available

"""
from contextlib import redirect_stdout

import numpy as np
import sparse as sp
import pickle

real_eps = np.finfo(np.float64).eps


[docs]class QgsTensor(object): """qgs tendencies tensor class. Parameters ---------- params: None or QgParams, optional The models parameters to configure the tensor. `None` to initialize an empty tensor. Default to `None`. atmospheric_inner_products: None or AtmosphericInnerProducts, optional The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected. If None, disable the atmospheric tendencies. Default to `None`. oceanic_inner_products: None or OceanicInnerProducts, optional The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected. If None, disable the oceanic tendencies. Default to `None`. ground_inner_products: None or GroundInnerProducts, optional The inner products of the ground basis functions on which the model's PDE ground equations are projected. If None, disable the ground tendencies. Default to `None`. Attributes ---------- params: None or QgParams The models parameters used to configure the tensor. `None` for an empty tensor. atmospheric_inner_products: None or AtmosphericInnerProducts The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected. If None, disable the atmospheric tendencies. Default to `None`. oceanic_inner_products: None or OceanicInnerProducts The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected. If None, disable the oceanic tendencies. Default to `None`. ground_inner_products: None or GroundInnerProducts The inner products of the ground basis functions on which the model's PDE ground equations are projected. If None, disable the ground tendencies. Default to `None`. tensor: sparse.COO(float) The tensor :math:`\\mathcal{T}_{i,j,k}` :math:`i`-th components. jacobian_tensor: sparse.COO(float) The jacobian tensor :math:`\\mathcal{T}_{i,j,k} + \\mathcal{T}_{i,k,j}` :math:`i`-th components. """ def __init__(self, params=None, atmospheric_inner_products=None, oceanic_inner_products=None, ground_inner_products=None): self.atmospheric_inner_products = atmospheric_inner_products self.oceanic_inner_products = oceanic_inner_products self.ground_inner_products = ground_inner_products self.params = params self.tensor = None self.jacobian_tensor = None self.compute_tensor() def _psi_a(self, i): """Transform the :math:`\\psi_{\\mathrm a}` :math:`i`-th coefficient into the effective model's variable. Parameters ---------- i: int The :math:`i`-th coefficients of :math:`\\psi_{\\mathrm a}` Returns ------- int The effective model's variable. """ return i + 1 def _theta_a(self, i): """Transform the :math:`\\theta_{\\mathrm a}` :math:`i`-th coefficient into the effective model's variable. Parameters ---------- i: int The :math:`i`-th coefficients of :math:`\\theta_{\\mathrm a}` Returns ------- int The effective model's variable. """ return i + self.params.variables_range[0] + 1 def _psi_o(self, i): """Transform the :math:`\\psi_{\\mathrm o}` :math:`i`-th coefficient into the effective model's variable. Parameters ---------- i: int The :math:`i`-th coefficients of :math:`\\psi_{\\mathrm o}` Returns ------- int The effective model's variable. """ return i + self.params.variables_range[1] + 1 def _deltaT_o(self, i): """Transform the :math:`\\delta T_{\\mathrm o}` :math:`i`-th coefficient into the effective model's variable. Parameters ---------- i: int The :math:`i`-th coefficients of :math:`\\delta T_{\\mathrm o}` Returns ------- int The effective model's variable. """ return i + self.params.variables_range[2] + 1 def _deltaT_g(self, i): """Transform the :math:`\\delta T_{\\mathrm o}` :math:`i`-th coefficient into the effective model's variable. Parameters ---------- i: int The :math:`i`-th coefficients of :math:`\\delta T_{\\mathrm o}` Returns ------- int The effective model's variable. """ return i + self.params.variables_range[1] + 1 def _compute_tensor_dicts(self): if self.params is None: return None if self.atmospheric_inner_products is None and self.oceanic_inner_products is None \ and self.ground_inner_products is None: return None aips = self.atmospheric_inner_products par = self.params atp = par.atemperature_params ap = par.atmospheric_params op = par.oceanic_params scp = par.scale_params gp = par.ground_params nvar = par.number_of_variables ndim = par.ndim bips = None if self.oceanic_inner_products is not None: bips = self.oceanic_inner_products ocean = True else: ocean = False if self.ground_inner_products is not None: bips = self.ground_inner_products ground_temp = True else: ground_temp = False if self.params.dynamic_T: offset = 1 else: offset = 0 # constructing some derived matrices if aips is not None: a_inv = np.zeros((nvar[0], nvar[0])) for i in range(offset, nvar[1]): for j in range(offset, nvar[1]): a_inv[i - offset, j - offset] = aips.a(i, j) a_inv = np.linalg.inv(a_inv) a_inv = sp.COO(a_inv) a_theta = np.zeros((nvar[1], nvar[1])) for i in range(nvar[1]): for j in range(nvar[1]): a_theta[i, j] = ap.sig0 * aips.a(i, j) - aips.u(i, j) a_theta = np.linalg.inv(a_theta) a_theta = sp.COO(a_theta) if bips is not None: if ocean: U_inv = np.zeros((nvar[3], nvar[3])) for i in range(nvar[3]): for j in range(nvar[3]): U_inv[i, j] = bips.U(i, j) U_inv = np.linalg.inv(U_inv) U_inv = sp.COO(U_inv) M_psio = np.zeros((nvar[2], nvar[2])) for i in range(offset, nvar[3]): for j in range(offset, nvar[3]): M_psio[i - offset, j - offset] = bips.M(i, j) + par.G * bips.U(i, j) M_psio = np.linalg.inv(M_psio) M_psio = sp.COO(M_psio) else: U_inv = np.zeros((nvar[2], nvar[2])) for i in range(nvar[2]): for j in range(nvar[2]): U_inv[i, j] = bips.U(i, j) U_inv = np.linalg.inv(U_inv) U_inv = sp.COO(U_inv) ################# if bips is not None: go = bips.stored else: go = True sparse_arrays_dict = dict() if aips.stored and go: # psi_a part for i in range(nvar[0]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') for j in range(nvar[0]): jo = j + offset # skipping the theta 0 variable if it exists val = a_inv[i, :] @ aips._c[offset:, jo] t[self._psi_a(j), 0] -= val * scp.beta t[self._psi_a(j), 0] -= (ap.kd * _kronecker_delta(i, j)) / 2 t[self._theta_a(jo), 0] = (ap.kd * _kronecker_delta(i, j)) / 2 if gp is not None: if gp.hk is not None: if gp.orographic_basis == "atmospheric": oro = a_inv[i, :] @ aips._g[offset:, jo, offset:] @ sp.COO(gp.hk.astype(float)) # not perfect else: oro = a_inv[i, :] @ aips._gh[offset:, jo, offset:] @ sp.COO(gp.hk.astype(float)) # not perfect t[self._psi_a(j), 0] -= oro / 2 t[self._theta_a(jo), 0] += oro / 2 for k in range(nvar[0]): ko = k + offset # skipping the theta 0 variable if it exists val = a_inv[i, :] @ aips._b[offset:, jo, ko] t[self._psi_a(j), self._psi_a(k)] = - val t[self._theta_a(jo), self._theta_a(ko)] = - val if ocean: for j in range(nvar[2]): jo = j + offset # skipping the theta 0 variable if it exists val = a_inv[i, :] @ aips._d[offset:, jo] t[self._psi_o(j), 0] += val * ap.kd / 2 sparse_arrays_dict[self._psi_a(i)] = t.to_coo() # theta_a part for i in range(nvar[1]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') if par.Cpa is not None: t[0, 0] -= a_theta[i, :] @ aips._u @ sp.COO(par.Cpa.astype(float)) # not perfect if atp.hd is not None and atp.thetas is not None: val = - a_theta[i, :] @ aips._u @ sp.COO(atp.thetas.astype(float)) # not perfect t[0, 0] += val * atp.hd for j in range(nvar[0]): jo = j + offset # skipping the theta 0 variable if it exists val = a_theta[i, :] @ aips._a[:, jo] t[self._psi_a(j), 0] += val * ap.kd * ap.sig0 / 2 t[self._theta_a(jo), 0] -= val * (ap.kd / 2 + 2 * ap.kdp) * ap.sig0 val = - a_theta[i, :] @ aips._c[:, jo] t[self._theta_a(jo), 0] += val * scp.beta * ap.sig0 if gp is not None: if gp.hk is not None: if gp.orographic_basis == "atmospheric": oro = a_theta[i, :] @ aips._g[:, jo, offset:] @ sp.COO(gp.hk.astype(float)) # not perfect else: oro = a_theta[i, :] @ aips._gh[:, jo, offset:] @ sp.COO(gp.hk.astype(float)) # not perfect t[self._theta_a(jo), 0] -= ap.sig0 * oro / 2 t[self._psi_a(j), 0] += ap.sig0 * oro / 2 for k in range(nvar[0]): ko = k + offset # skipping the theta 0 variable if it exists val = a_theta[i, :] @ aips._b[:, jo, ko] t[self._psi_a(j), self._theta_a(ko)] = - val * ap.sig0 t[self._theta_a(jo), self._psi_a(k)] = - val * ap.sig0 val = a_theta[i, :] @ aips._g[:, jo, ko] t[self._psi_a(j), self._theta_a(ko)] += val for j in range(nvar[1]): val = a_theta[i, :] @ aips._u[:, j] if par.Lpa is not None: t[self._theta_a(j), 0] += val * atp.sc * par.Lpa if par.LSBpa is not None: t[self._theta_a(j), 0] += val * par.LSBpa if atp.hd is not None: t[self._theta_a(j), 0] += val * atp.hd if ocean: for j in range(nvar[2]): jo = j + offset # skipping the theta 0 variable if it exists val = - a_theta[i, :] @ aips._d[:, jo] t[self._psi_o(j), 0] += val * ap.sig0 * ap.kd / 2 if par.Lpa is not None: for j in range(nvar[3]): val = - a_theta[i, :] @ aips._s[:, j] t[self._deltaT_o(j), 0] += val * par.Lpa / 2 if par.LSBpgo is not None: t[self._deltaT_o(j), 0] += val * par.LSBpgo if ground_temp: if par.Lpa is not None: for j in range(nvar[2]): val = - a_theta[i, :] @ aips._s[:, j] t[self._deltaT_g(j), 0] += val * par.Lpa / 2 if par.LSBpgo is not None: t[self._deltaT_g(j), 0] += val * par.LSBpgo sparse_arrays_dict[self._theta_a(i)] = t.to_coo() if ocean: # psi_o part for i in range(nvar[2]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') for j in range(nvar[0]): jo = j + offset # skipping the theta 0 variable if it exists val = M_psio[i, :] @ bips._K[offset:, jo] * op.d t[self._psi_a(j), 0] += val t[self._theta_a(jo), 0] -= val for j in range(nvar[2]): jo = j + offset # skipping the T 0 variable if it exists val = - M_psio[i, :] @ bips._N[offset:, jo] t[self._psi_o(j), 0] += val * scp.beta val = - M_psio[i, :] @ bips._M[offset:, jo] t[self._psi_o(j), 0] += val * (op.r + op.d) for k in range(nvar[2]): ko = k + offset # skipping the T 0 variable if it exists t[self._psi_o(j), self._psi_o(k)] -= M_psio[i, :] @ bips._C[offset:, jo, ko] sparse_arrays_dict[self._psi_o(i)] = t.to_coo() # deltaT_o part for i in range(nvar[3]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') t[0, 0] += U_inv[i, :] @ bips._W @ sp.COO(par.Cpgo.astype(float)) for j in range(nvar[1]): val = U_inv[i, :] @ bips._W[:, j] t[self._theta_a(j), 0] += val * 2 * atp.sc * par.Lpgo if par.sbpa is not None: t[self._theta_a(j), 0] += val * par.sbpa for j in range(nvar[3]): t[self._deltaT_o(j), 0] = - par.Lpgo * _kronecker_delta(i, j) if par.sbpgo is not None: t[self._deltaT_o(j), 0] += - par.sbpgo * _kronecker_delta(i, j) for j in range(nvar[2]): for k in range(nvar[2]): jo = j + offset # skipping the T 0 variable if it exists ko = k + offset # skipping the T 0 variable if it exists t[self._psi_o(j), self._deltaT_o(ko)] -= U_inv[i, :] @ bips._O[:, jo, ko] sparse_arrays_dict[self._deltaT_o(i)] = t.to_coo() # deltaT_g part if ground_temp: for i in range(nvar[2]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') t[0, 0] += U_inv[i, :] @ bips._W @ sp.COO(par.Cpgo.astype(float)) # not perfect for j in range(nvar[1]): val = U_inv[i, :] @ bips._W[:, j] t[self._theta_a(j), 0] += val * 2 * atp.sc * par.Lpgo if par.sbpa is not None: t[self._theta_a(j), 0] += val * par.sbpa for j in range(nvar[2]): t[self._deltaT_g(j), 0] = - par.Lpgo * _kronecker_delta(i, j) if par.sbpgo is not None: t[self._deltaT_g(j), 0] += - par.sbpgo * _kronecker_delta(i, j) sparse_arrays_dict[self._deltaT_g(i)] = t.to_coo() else: # psi_a part for i in range(nvar[0]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') for j in range(nvar[0]): jo = j + offset val = 0 for jj in range(nvar[0]): val += a_inv[i, jj] * aips.c(offset + jj, jo) t[self._psi_a(j), 0] -= val * scp.beta t[self._psi_a(j), 0] -= (ap.kd * _kronecker_delta(i, j)) / 2 t[self._theta_a(jo), 0] = (ap.kd * _kronecker_delta(i, j)) / 2 if gp is not None: if gp.hk is not None: oro = 0 if gp.orographic_basis == "atmospheric": for jj in range(nvar[0]): for kk in range(nvar[0]): oro += a_inv[i, jj] * aips.g(offset + jj, j, offset + kk) * gp.hk[kk] else: for jj in range(nvar[0]): for kk in range(nvar[0]): oro += a_inv[i, jj] * aips.gh(offset + jj, j, offset + kk) * gp.hk[kk] t[self._psi_a(j), 0] -= oro / 2 t[self._theta_a(jo), 0] += oro / 2 for k in range(nvar[0]): ko = k + offset # skipping the theta 0 variable if it exists val = 0 for jj in range(nvar[0]): val += a_inv[i, jj] * aips.b(offset + jj, jo, ko) t[self._psi_a(j), self._psi_a(k)] = - val t[self._theta_a(jo), self._theta_a(ko)] = - val if ocean: for j in range(nvar[2]): jo = j + offset val = 0 for jj in range(nvar[0]): val += a_inv[i, jj] * aips.d(offset + jj, jo) t[self._psi_o(j), 0] += val * ap.kd / 2 sparse_arrays_dict[self._psi_a(i)] = t.to_coo() # theta_a part for i in range(nvar[1]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') if par.Cpa is not None: for jj in range(nvar[1]): for kk in range(nvar[1]): t[0, 0] -= a_theta[i, jj] * aips.u(jj, kk) * par.Cpa[kk] if atp.hd is not None and atp.thetas is not None: val = 0 for jj in range(nvar[1]): for kk in range(nvar[1]): val -= a_theta[i, jj] * aips.u(jj, kk) * atp.thetas[kk] t[0, 0] += val * atp.hd for j in range(nvar[0]): jo = j + offset # skipping the theta 0 variable if it exists val = 0 for jj in range(nvar[1]): val += a_theta[i, jj] * aips.a(jj, jo) t[self._psi_a(j), 0] += val * ap.kd * ap.sig0 / 2 t[self._theta_a(jo), 0] -= val * (ap.kd / 2 + 2 * ap.kdp) * ap.sig0 val = 0 for jj in range(nvar[1]): val -= a_theta[i, jj] * aips.c(jj, jo) t[self._theta_a(jo), 0] += val * scp.beta * ap.sig0 if gp is not None: if gp.hk is not None: oro = 0 if gp.orographic_basis == "atmospheric": for jj in range(nvar[1]): for kk in range(nvar[0]): oro += a_theta[i, jj] * aips.g(jj, jo, offset + kk) * gp.hk[kk] else: for jj in range(nvar[1]): for kk in range(nvar[0]): oro += a_theta[i, jj] * aips.gh(jj, jo, offset + kk) * gp.hk[kk] t[self._theta_a(jo), 0] -= ap.sig0 * oro / 2 t[self._psi_a(j), 0] += ap.sig0 * oro / 2 for k in range(nvar[0]): ko = k + offset # skipping the theta 0 variable if it exists val = 0 for jj in range(nvar[1]): val += a_theta[i, jj] * aips.b(jj, jo, ko) t[self._psi_a(j), self._theta_a(ko)] = - val * ap.sig0 t[self._theta_a(jo), self._psi_a(k)] = - val * ap.sig0 val = 0 for jj in range(nvar[1]): val += a_theta[i, jj] * aips.g(jj, jo, ko) t[self._psi_a(j), self._theta_a(ko)] += val for j in range(nvar[1]): val = 0 for jj in range(nvar[1]): val += a_theta[i, jj] * aips.u(jj, j) if par.Lpa is not None: t[self._theta_a(j), 0] += val * atp.sc * par.Lpa if par.LSBpa is not None: t[self._theta_a(j), 0] += val * par.LSBpa if atp.hd is not None: t[self._theta_a(j), 0] += val * atp.hd if ocean: for j in range(nvar[2]): jo = j + offset # skipping the theta 0 variable if it exists val = 0 for jj in range(nvar[1]): val -= a_theta[i, jj] * aips.d(jj, jo) t[self._psi_o(j), 0] += val * ap.sig0 * ap.kd / 2 if par.Lpa is not None: for j in range(nvar[3]): val = 0 for jj in range(nvar[1]): val -= a_theta[i, jj] * aips.s(jj, j) t[self._deltaT_o(j), 0] += val * par.Lpa / 2 if par.LSBpgo is not None: t[self._deltaT_o(j), 0] += val * par.LSBpgo if ground_temp: if par.Lpa is not None: for j in range(nvar[2]): val = 0 for jj in range(nvar[1]): val -= a_theta[i, jj] * aips.s(jj, j) t[self._deltaT_g(j), 0] += val * par.Lpa / 2 if par.LSBpgo is not None: t[self._deltaT_g(j), 0] += val * par.LSBpgo sparse_arrays_dict[self._theta_a(i)] = t.to_coo() if ocean: # psi_o part for i in range(nvar[2]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') for j in range(nvar[0]): jo = j + offset # skipping the theta 0 variable if it exists for jj in range(nvar[2]): val = M_psio[i, jj] * bips.K(offset + jj, jo) * op.d t[self._psi_a(j), 0] += val t[self._theta_a(jo), 0] -= val for j in range(nvar[2]): jo = j + offset # skipping the T 0 variable if it exists val = 0 for jj in range(nvar[2]): val -= M_psio[i, jj] * bips.N(offset + jj, jo) t[self._psi_o(j), 0] += val * scp.beta val = 0 for jj in range(nvar[2]): val -= M_psio[i, jj] * bips.M(offset + jj, jo) t[self._psi_o(j), 0] += val * (op.r + op.d) for k in range(nvar[2]): ko = k + offset # skipping the T 0 variable if it exists for jj in range(nvar[2]): t[self._psi_o(j), self._psi_o(k)] -= M_psio[i, jj] * bips.C(offset + jj, jo, ko) sparse_arrays_dict[self._psi_o(i)] = t.to_coo() # deltaT_o part for i in range(nvar[3]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') for jj in range(nvar[1]): for kk in range(nvar[3]): t[0, 0] += U_inv[i, kk] * bips.W(kk, jj) * par.Cpgo[jj] for j in range(nvar[1]): val = 0 for jj in range(nvar[3]): val += U_inv[i, jj] * bips.W(jj, j) t[self._theta_a(j), 0] += val * 2 * atp.sc * par.Lpgo if par.sbpa is not None: t[self._theta_a(j), 0] += val * par.sbpa for j in range(nvar[3]): t[self._deltaT_o(j), 0] = - par.Lpgo * _kronecker_delta(i, j) if par.sbpgo is not None: t[self._deltaT_o(j), 0] += - par.sbpgo * _kronecker_delta(i, j) for j in range(nvar[2]): for k in range(offset, nvar[3]): jo = j + offset # skipping the T 0 variable if it exists for jj in range(nvar[3]): t[self._psi_o(j), self._deltaT_o(k)] -= U_inv[i, jj] * bips.O(jj, jo, k) sparse_arrays_dict[self._deltaT_o(i)] = t.to_coo() # deltaT_g part if ground_temp: for i in range(nvar[2]): t = sp.zeros((ndim + 1, ndim + 1), dtype=np.float64, format='dok') for jj in range(nvar[1]): for kk in range(nvar[2]): t[0, 0] += U_inv[i, kk] * bips.W(kk, jj) * par.Cpgo[jj] for j in range(nvar[1]): val = 0 for jj in range(nvar[2]): val += U_inv[i, jj] * bips.W(jj, j) t[self._theta_a(j), 0] += val * 2 * atp.sc * par.Lpgo if par.sbpa is not None: t[self._theta_a(j), 0] += val * par.sbpa for j in range(nvar[2]): t[self._deltaT_g(j), 0] = - par.Lpgo * _kronecker_delta(i, j) if par.sbpgo is not None: t[self._deltaT_g(j), 0] += - par.sbpgo * _kronecker_delta(i, j) sparse_arrays_dict[self._deltaT_g(i)] = t.to_coo() return sparse_arrays_dict
[docs] def compute_tensor(self): """Routine to compute the tensor.""" # gathering par = self.params ndim = par.ndim sparse_arrays_dict = self._compute_tensor_dicts() tensor = sp.zeros((ndim + 1, ndim + 1, ndim + 1), dtype=np.float64, format='coo') if sparse_arrays_dict is not None: tensor = self._add_dict_to_tensor(sparse_arrays_dict, tensor) self._set_tensor(tensor)
def _set_tensor(self, tensor): if not isinstance(tensor, sp.COO): tensor = tensor.to_coo() self.jacobian_tensor = self.jacobian_from_tensor(tensor) self.tensor = self.simplify_tensor(tensor) @staticmethod def _add_dict_to_tensor(dic, tensor): shape = tensor.shape rank = len(shape) if tensor.coords.size == 0: coords = np.array(rank * [[0]], dtype=int) data = np.array([0.]) else: coords = tensor.coords.copy() data = tensor.data.copy() for i in dic: if not isinstance(dic[i], (list, tuple)): dl = [dic[i]] else: dl = dic[i] for d in dl: t_coords = d.coords t_rank = t_coords.shape[0] values = d.data new_coords = np.concatenate((np.full((1, len(values)), i), t_coords), axis=0) for _ in range(t_rank+1, rank): new_coords = np.concatenate((new_coords, np.zeros((1, len(values)), dtype=int))) data = np.concatenate((data, values)) coords = np.concatenate((coords, new_coords), axis=1) return sp.COO(coords, data, shape=shape) @staticmethod def _shift_tensor_coordinates(tensor, shift): new_coords = tensor.coords.copy() + shift return sp.COO(new_coords.astype(int), tensor.data)
[docs] @staticmethod def jacobian_from_tensor(tensor): """Function to compute the Jacobian tensor. Parameters ---------- tensor: ~sparse.C00 The qgs tensor. Returns ------- ~sparse.COO The Jacobian tensor. """ n_perm = len(tensor.shape) - 2 jacobian_tensor = tensor.copy() for i in range(1, n_perm+1): jacobian_tensor += tensor.swapaxes(1, i+1) return jacobian_tensor
[docs] @staticmethod def simplify_tensor(tensor): """Routine that simplifies the component of a tensor :math:`\\mathcal{T}`. For each index :math:`i`, it upper-triangularizes the tensor :math:`\\mathcal{T}_{i,\\ldots}` for all the subsequent indices. Parameters ---------- tensor: ~sparse.COO The tensor to simplify. Returns ------- ~sparse.COO The upper-triangularized tensor. """ coords = tensor.coords.copy() sorted_indices = np.sort(coords[1:, :], axis=0) coords[1:, :] = sorted_indices upp_tensor = sp.COO(coords, tensor.data.copy(), shape=tensor.shape, prune=True) return upp_tensor
[docs] def save_to_file(self, filename, **kwargs): """Function to save the tensor object to a file with the :mod:`pickle` module. Parameters ---------- filename: str The file name where to save the tensor object. kwargs: dict Keyword arguments to pass to the :mod:`pickle` module method. """ f = open(filename, 'wb') pickle.dump(self.__dict__, f, **kwargs) f.close()
[docs] def load_from_file(self, filename, **kwargs): """Function to load previously a saved tensor object with the method :meth:`save_to_file`. Parameters ---------- filename: str The file name where the tensor object was saved. kwargs: dict Keyword arguments to pass to the :mod:`pickle` module method. """ f = open(filename, 'rb') tmp_dict = pickle.load(f, **kwargs) f.close() self.__dict__.clear() self.__dict__.update(tmp_dict)
[docs] def print_tensor(self, tensor_name=""): """Routine to print the tensor. Parameters ---------- tensor_name: str, optional Specify the name to print beside the values of the tensor. Default to `QgsTensor`. """ if not tensor_name: tensor_name = 'QgsTensor' for coo, val in zip(self.tensor.coords.T, self.tensor.data): self._string_format(print, tensor_name, coo, val)
[docs] def print_tensor_to_file(self, filename, tensor_name=""): """Routine to print the tensor to a file. Parameters ---------- filename: str The filename where to print the tensor. tensor_name: str, optional Specify the name to print beside the values of the tensor. Default to `QgsTensor`. """ with open(filename, 'w') as f: with redirect_stdout(f): self.print_tensor(tensor_name)
[docs] def print_jacobian_tensor(self, tensor_name=""): """Routine to print the Jacobian tensor. Parameters ---------- tensor_name: str, optional Specify the name to print beside the values of the tensor. Default to `QgsTensorJacobian`. """ if not tensor_name: tensor_name = 'QgsTensorJacobian' for coo, val in zip(self.jacobian_tensor.coords.T, self.jacobian_tensor.data): self._string_format(print, tensor_name, coo, val)
[docs] def print_jacobian_tensor_to_file(self, filename, tensor_name=""): """Routine to print the Jacobian tensor to a file. Parameters ---------- filename: str The filename where to print the tensor. tensor_name: str, optional Specify the name to print beside the values of the tensor. Default to `QgsTensorJacobian`. """ with open(filename, 'w') as f: with redirect_stdout(f): self.print_jacobian_tensor(tensor_name)
@staticmethod def _string_format(func, symbol, indices, value): if abs(value) >= real_eps: s = symbol for i in indices: s += "["+str(i)+"]" s += " = % .5E" % value func(s)
[docs]class QgsTensorDynamicT(QgsTensor): """qgs dynamical temperature first order (linear) tendencies tensor class. Parameters ---------- params: None or QgParams, optional The models parameters to configure the tensor. `None` to initialize an empty tensor. Default to `None`. atmospheric_inner_products: None or AtmosphericInnerProducts, optional The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected. If None, disable the atmospheric tendencies. Default to `None`. oceanic_inner_products: None or OceanicInnerProducts, optional The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected. If None, disable the oceanic tendencies. Default to `None`. ground_inner_products: None or GroundInnerProducts, optional The inner products of the ground basis functions on which the model's PDE ground equations are projected. If None, disable the ground tendencies. Default to `None`. Attributes ---------- params: None or QgParams The models parameters used to configure the tensor. `None` for an empty tensor. atmospheric_inner_products: None or AtmosphericInnerProducts The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected. If None, disable the atmospheric tendencies. Default to `None`. oceanic_inner_products: None or OceanicInnerProducts The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected. If None, disable the oceanic tendencies. Default to `None`. ground_inner_products: None or GroundInnerProducts The inner products of the ground basis functions on which the model's PDE ground equations are projected. If None, disable the ground tendencies. Default to `None`. tensor: sparse.COO(float) The tensor :math:`\\mathcal{T}_{i,j,k}` :math:`i`-th components. jacobian_tensor: sparse.COO(float) The jacobian tensor :math:`\\mathcal{T}_{i,j,k} + \\mathcal{T}_{i,k,j}` :math:`i`-th components. """ def __init__(self, params=None, atmospheric_inner_products=None, oceanic_inner_products=None, ground_inner_products=None): QgsTensor.__init__(self, params, atmospheric_inner_products, oceanic_inner_products, ground_inner_products) def _compute_tensor_dicts(self): if self.params is None: return None if self.atmospheric_inner_products is None and self.oceanic_inner_products is None \ and self.ground_inner_products is None: return None aips = self.atmospheric_inner_products bips = None if self.oceanic_inner_products is not None: bips = self.oceanic_inner_products elif self.ground_inner_products is not None: bips = self.ground_inner_products if bips is not None: go = bips.stored else: go = True if aips.stored and go: sparse_arrays_full_dict = self._compute_stored_full_dict() else: sparse_arrays_full_dict = self._compute_non_stored_full_dict() return sparse_arrays_full_dict def _compute_stored_full_dict(self): par = self.params ap = par.atmospheric_params nvar = par.number_of_variables aips = self.atmospheric_inner_products bips = None if self.oceanic_inner_products is not None: bips = self.oceanic_inner_products ocean = True else: ocean = False if self.ground_inner_products is not None: bips = self.ground_inner_products ground_temp = True else: ground_temp = False # constructing some derived matrices if aips is not None: a_theta = np.zeros((nvar[1], nvar[1])) for i in range(nvar[1]): for j in range(nvar[1]): a_theta[i, j] = ap.sig0 * aips.a(i, j) - aips.u(i, j) a_theta = np.linalg.inv(a_theta) a_theta = sp.COO(a_theta) if bips is not None: if ocean: U_inv = np.zeros((nvar[3], nvar[3])) for i in range(nvar[3]): for j in range(nvar[3]): U_inv[i, j] = bips.U(i, j) U_inv = np.linalg.inv(U_inv) U_inv = sp.COO(U_inv) else: U_inv = np.zeros((nvar[2], nvar[2])) for i in range(nvar[2]): for j in range(nvar[2]): U_inv[i, j] = bips.U(i, j) U_inv = np.linalg.inv(U_inv) U_inv = sp.COO(U_inv) ################# sparse_arrays_full_dict = dict() # theta_a part for i in range(nvar[1]): sparse_arrays_full_dict[self._theta_a(i)] = list() if par.T4LSBpa is not None: val = sp.tensordot(a_theta[i], aips._z, axes=1) if val.nnz > 0: sparse_arrays_full_dict[self._theta_a(i)].append(self._shift_tensor_coordinates(par.T4LSBpa * val, self._theta_a(0))) if ocean: if par.T4LSBpgo is not None: val = sp.tensordot(a_theta[i], aips._v, axes=1) if val.nnz > 0: sparse_arrays_full_dict[self._theta_a(i)].append(self._shift_tensor_coordinates(- par.T4LSBpgo * val, self._deltaT_o(0))) if ground_temp: if par.T4LSBpgo is not None: val = sp.tensordot(a_theta[i], aips._v, axes=1) if val.nnz > 0: sparse_arrays_full_dict[self._theta_a(i)].append(self._shift_tensor_coordinates(- par.T4LSBpgo * val, self._deltaT_g(0))) if ocean: # deltaT_o part for i in range(nvar[3]): sparse_arrays_full_dict[self._deltaT_o(i)] = list() val = sp.tensordot(U_inv[i], bips._Z, axes=1) if val.nnz > 0: sparse_arrays_full_dict[self._deltaT_o(i)].append(self._shift_tensor_coordinates(par.T4sbpa * val, self._theta_a(0))) val = sp.tensordot(U_inv[i], bips._V, axes=1) if val.nnz > 0: sparse_arrays_full_dict[self._deltaT_o(i)].append(self._shift_tensor_coordinates(- par.T4sbpgo * val, self._deltaT_o(0))) if ground_temp: # deltaT_g part for i in range(nvar[2]): sparse_arrays_full_dict[self._deltaT_g(i)] = list() val = sp.tensordot(U_inv[i], bips._Z, axes=1) if val.nnz > 0: sparse_arrays_full_dict[self._deltaT_g(i)].append(self._shift_tensor_coordinates(par.T4sbpa * val, self._theta_a(0))) val = sp.tensordot(U_inv[i], bips._V, axes=1) if val.nnz > 0: sparse_arrays_full_dict[self._deltaT_g(i)].append(self._shift_tensor_coordinates(- par.T4sbpgo * val, self._deltaT_g(0))) return sparse_arrays_full_dict def _compute_non_stored_full_dict(self): par = self.params ap = par.atmospheric_params nvar = par.number_of_variables ndim = par.ndim aips = self.atmospheric_inner_products bips = None if self.oceanic_inner_products is not None: bips = self.oceanic_inner_products ocean = True else: ocean = False if self.ground_inner_products is not None: bips = self.ground_inner_products ground_temp = True else: ground_temp = False # constructing some derived matrices if aips is not None: a_theta = np.zeros((nvar[1], nvar[1])) for i in range(nvar[1]): for j in range(nvar[1]): a_theta[i, j] = ap.sig0 * aips.a(i, j) - aips.u(i, j) a_theta = np.linalg.inv(a_theta) a_theta = sp.COO(a_theta) if bips is not None: if ocean: U_inv = np.zeros((nvar[3], nvar[3])) for i in range(nvar[3]): for j in range(nvar[3]): U_inv[i, j] = bips.U(i, j) U_inv = np.linalg.inv(U_inv) U_inv = sp.COO(U_inv) else: U_inv = np.zeros((nvar[2], nvar[2])) for i in range(nvar[2]): for j in range(nvar[2]): U_inv[i, j] = bips.U(i, j) U_inv = np.linalg.inv(U_inv) U_inv = sp.COO(U_inv) ################# sparse_arrays_full_dict = dict() # theta_a part for i in range(nvar[1]): t_full = sp.zeros((ndim + 1, ndim + 1, ndim + 1, ndim + 1), dtype=np.float64, format='dok') if par.T4LSBpa is not None: j = k = ell = 0 for m in range(nvar[1]): val = 0 for jj in range(nvar[1]): val += a_theta[i, jj] * aips.z(jj, j, k, ell, m) if m == 0: t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = par.T4LSBpa * val else: t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = 4 * par.T4LSBpa * val if ocean: if par.T4LSBpgo is not None: j = k = ell = 0 for m in range(nvar[3]): val = 0 for jj in range(nvar[1]): val -= a_theta[i, jj] * aips.v(jj, j, k, ell, m) if m == 0: t_full[self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)] = par.T4LSBpgo * val else: t_full[self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)] = 4 * par.T4LSBpgo * val if ground_temp: if par.T4LSBpgo is not None: j = k = ell = 0 for m in range(nvar[2]): val = 0 for jj in range(nvar[1]): val -= a_theta[i, jj] * aips.v(jj, j, k, ell, m) if m == 0: t_full[self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)] = par.T4LSBpgo * val else: t_full[self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)] = 4 * par.T4LSBpgo * val sparse_arrays_full_dict[self._theta_a(i)] = t_full.to_coo() if ocean: # deltaT_o part for i in range(nvar[3]): t_full = sp.zeros((ndim + 1, ndim + 1, ndim + 1, ndim + 1), dtype=np.float64, format='dok') j = k = ell = 0 for m in range(nvar[1]): val = 0 for jj in range(nvar[3]): val += U_inv[i, jj] * bips.Z(jj, j, k, ell, m) if m == 0: t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = par.T4sbpa * val else: t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = 4 * par.T4sbpa * val for m in range(nvar[3]): val = 0 for jj in range(nvar[3]): val -= U_inv[i, jj] * bips.V(jj, j, k, ell, m) if m == 0: t_full[self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)] = par.T4sbpgo * val else: t_full[self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)] = 4 * par.T4sbpgo * val sparse_arrays_full_dict[self._deltaT_o(i)] = t_full.to_coo() # deltaT_g part if ground_temp: for i in range(nvar[2]): t_full = sp.zeros((ndim + 1, ndim + 1, ndim + 1, ndim + 1), dtype=np.float64, format='dok') j = k = ell = 0 for m in range(nvar[1]): val = 0 for jj in range(nvar[2]): val += U_inv[i, jj] * bips._Z[jj, j, k, ell, m] if m == 0: t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = par.T4sbpa * val else: t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = 4 * par.T4sbpa * val for m in range(nvar[2]): val = 0 for jj in range(nvar[2]): val -= U_inv[i, jj] * bips._V[jj, j, k, ell, m] if m == 0: t_full[self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)] = par.T4sbpgo * val else: t_full[self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)] = 4 * par.T4sbpgo * val sparse_arrays_full_dict[self._deltaT_g(i)] = t_full.to_coo() return sparse_arrays_full_dict
[docs] def compute_tensor(self): """Routine to compute the tensor.""" # gathering par = self.params ndim = par.ndim sparse_arrays_dict = QgsTensor._compute_tensor_dicts(self) sparse_arrays_full_dict = self._compute_tensor_dicts() tensor = sp.zeros((ndim + 1, ndim + 1, ndim + 1, ndim + 1, ndim + 1), dtype=np.float64, format='coo') if sparse_arrays_dict is not None: tensor = self._add_dict_to_tensor(sparse_arrays_dict, tensor) if sparse_arrays_full_dict is not None: tensor = self._add_dict_to_tensor(sparse_arrays_full_dict, tensor) self._set_tensor(tensor)
[docs]class QgsTensorT4(QgsTensorDynamicT): """qgs :math:`T^4` tendencies tensor class. Implies dynamical zeroth-order temperature. Parameters ---------- params: None or QgParams, optional The models parameters to configure the tensor. `None` to initialize an empty tensor. Default to `None`. atmospheric_inner_products: None or AtmosphericInnerProducts, optional The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected. If None, disable the atmospheric tendencies. Default to `None`. oceanic_inner_products: None or OceanicInnerProducts, optional The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected. If None, disable the oceanic tendencies. Default to `None`. ground_inner_products: None or GroundInnerProducts, optional The inner products of the ground basis functions on which the model's PDE ground equations are projected. If None, disable the ground tendencies. Default to `None`. Attributes ---------- params: None or QgParams The models parameters used to configure the tensor. `None` for an empty tensor. atmospheric_inner_products: None or AtmosphericInnerProducts The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected. If None, disable the atmospheric tendencies. Default to `None`. oceanic_inner_products: None or OceanicInnerProducts The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected. If None, disable the oceanic tendencies. Default to `None`. ground_inner_products: None or GroundInnerProducts The inner products of the ground basis functions on which the model's PDE ground equations are projected. If None, disable the ground tendencies. Default to `None`. tensor: sparse.COO(float) The tensor :math:`\\mathcal{T}_{i,j,k}` :math:`i`-th components. jacobian_tensor: sparse.COO(float) The jacobian tensor :math:`\\mathcal{T}_{i,j,k} + \\mathcal{T}_{i,k,j}` :math:`i`-th components. """ def __init__(self, params=None, atmospheric_inner_products=None, oceanic_inner_products=None, ground_inner_products=None): QgsTensorDynamicT.__init__(self, params, atmospheric_inner_products, oceanic_inner_products, ground_inner_products) def _compute_non_stored_full_dict(self): if self.params is None: return None if self.atmospheric_inner_products is None and self.oceanic_inner_products is None \ and self.ground_inner_products is None: return None aips = self.atmospheric_inner_products par = self.params ap = par.atmospheric_params nvar = par.number_of_variables ndim = par.ndim bips = None if self.oceanic_inner_products is not None: bips = self.oceanic_inner_products ocean = True else: ocean = False if self.ground_inner_products is not None: bips = self.ground_inner_products ground_temp = True else: ground_temp = False # constructing some derived matrices if aips is not None: a_theta = np.zeros((nvar[1], nvar[1])) for i in range(nvar[1]): for j in range(nvar[1]): a_theta[i, j] = ap.sig0 * aips.a(i, j) - aips.u(i, j) a_theta = np.linalg.inv(a_theta) a_theta = sp.COO(a_theta) if bips is not None: if ocean: U_inv = np.zeros((nvar[3], nvar[3])) for i in range(nvar[3]): for j in range(nvar[3]): U_inv[i, j] = bips.U(i, j) U_inv = np.linalg.inv(U_inv) U_inv = sp.COO(U_inv) else: U_inv = np.zeros((nvar[2], nvar[2])) for i in range(nvar[2]): for j in range(nvar[2]): U_inv[i, j] = bips.U(i, j) U_inv = np.linalg.inv(U_inv) U_inv = sp.COO(U_inv) ################# sparse_arrays_full_dict = dict() # theta_a part for i in range(nvar[1]): t_full = sp.zeros((ndim + 1, ndim + 1, ndim + 1, ndim + 1), dtype=np.float64, format='dok') if par.T4LSBpa is not None: for j in range(nvar[1]): for k in range(nvar[1]): for ell in range(nvar[1]): for m in range(nvar[1]): val = 0 for jj in range(nvar[1]): val += a_theta[i, jj] * aips.z(jj, j, k, ell, m) t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = par.T4LSBpa * val if ocean: if par.T4LSBpgo is not None: for j in range(nvar[3]): for k in range(nvar[3]): for ell in range(nvar[3]): for m in range(nvar[3]): val = 0 for jj in range(nvar[1]): val -= a_theta[i, jj] * aips.v(jj, j, k, ell, m) t_full[self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)] = par.T4LSBpgo * val if ground_temp: if par.T4LSBpgo is not None: for j in range(nvar[2]): for k in range(nvar[2]): for ell in range(nvar[2]): for m in range(nvar[2]): val = 0 for jj in range(nvar[1]): val -= a_theta[i, jj] * aips.v(jj, j, k, ell, m) t_full[self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)] = par.T4LSBpgo * val sparse_arrays_full_dict[self._theta_a(i)] = t_full.to_coo() if ocean: # deltaT_o part for i in range(nvar[3]): t_full = sp.zeros((ndim + 1, ndim + 1, ndim + 1, ndim + 1), dtype=np.float64, format='dok') for j in range(nvar[1]): for k in range(nvar[1]): for ell in range(nvar[1]): for m in range(nvar[1]): val = 0 for jj in range(nvar[3]): val += U_inv[i, jj] * bips.Z(jj, j, k, ell, m) t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = par.T4sbpa * val for j in range(nvar[3]): for k in range(nvar[3]): for ell in range(nvar[3]): for m in range(nvar[3]): val = 0 for jj in range(nvar[3]): val -= U_inv[i, jj] * bips.V(jj, j, k, ell, m) t_full[self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)] = par.T4sbpgo * val sparse_arrays_full_dict[self._deltaT_o(i)] = t_full.to_coo() # deltaT_g part if ground_temp: for i in range(nvar[2]): t_full = sp.zeros((ndim + 1, ndim + 1, ndim + 1, ndim + 1), dtype=np.float64, format='dok') for j in range(nvar[1]): for k in range(nvar[1]): for ell in range(nvar[1]): for m in range(nvar[1]): val = 0 for jj in range(nvar[2]): val += U_inv[i, jj] * bips._Z[jj, j, k, ell, m] t_full[self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)] = par.T4sbpa * val for j in range(nvar[2]): for k in range(nvar[2]): for ell in range(nvar[2]): for m in range(nvar[2]): val = 0 for jj in range(nvar[2]): val -= U_inv[i, jj] * bips._V[jj, j, k, ell, m] t_full[self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)] = par.T4sbpgo * val sparse_arrays_full_dict[self._deltaT_g(i)] = t_full.to_coo() return sparse_arrays_full_dict
def _kronecker_delta(i, j): if i == j: return 1 else: return 0 if __name__ == '__main__': from qgs.params.params import QgParams from qgs.inner_products.analytic import AtmosphericAnalyticInnerProducts, OceanicAnalyticInnerProducts from qgs.inner_products.symbolic import AtmosphericSymbolicInnerProducts, OceanicSymbolicInnerProducts # Analytic test params = QgParams({'rr': 287.e0, 'sb': 5.6e-8}) params.set_params({'kd': 0.04, 'kdp': 0.04, 'n': 1.5}) params.set_atmospheric_channel_fourier_modes(2, 2) params.set_oceanic_basin_fourier_modes(2, 4) aip = AtmosphericAnalyticInnerProducts(params) oip = OceanicAnalyticInnerProducts(params) aip.connect_to_ocean(oip) agotensor = QgsTensor(params, aip, oip) # Symbolic dynamic T test params_t = QgParams({'rr': 287.e0, 'sb': 5.6e-8}, dynamic_T=True) params_t.set_params({'kd': 0.04, 'kdp': 0.04, 'n': 1.5}) params_t.set_atmospheric_channel_fourier_modes(2, 2, mode='symbolic') params_t.set_oceanic_basin_fourier_modes(2, 4, mode='symbolic') aip = AtmosphericSymbolicInnerProducts(params_t, quadrature=True) # , stored=False) oip = OceanicSymbolicInnerProducts(params_t, quadrature=True) # , stored=False) agotensor_t = QgsTensorDynamicT(params_t, aip, oip) # Symbolic dynamic T4 test params_t4 = QgParams({'rr': 287.e0, 'sb': 5.6e-8}, T4=True) params_t4.set_params({'kd': 0.04, 'kdp': 0.04, 'n': 1.5}) params_t4.set_atmospheric_channel_fourier_modes(2, 2, mode='symbolic') params_t4.set_oceanic_basin_fourier_modes(2, 4, mode='symbolic') aip = AtmosphericSymbolicInnerProducts(params_t4, quadrature=True) # , stored=False) oip = OceanicSymbolicInnerProducts(params_t4, quadrature=True) # , stored=False) aip.save_to_file("aip.ip") oip.save_to_file("oip.ip") aip.load_from_file("aip.ip") oip.load_from_file("oip.ip") agotensor_t4 = QgsTensorT4(params_t4, aip, oip)