"""
symbolic qgs tensor module
==========================
This module computes and holds the symbolic representation of the tensors representing the tendencies of the model's
equations.
"""
from qgs.functions.symbolic_mul import symbolic_tensordot
from qgs.functions.util import add_to_dict
from qgs.params.params import Parameter, ScalingParameter, ParametersArray, Params
import numpy as np
from sympy import simplify
import pickle
from sympy.matrices.immutable import ImmutableSparseMatrix
from sympy.tensor.array import ImmutableSparseNDimArray
# TODO: Check non stored IP version of this
[docs]
class SymbolicQgsTensor(object):
"""Symbolic qgs tendencies tensor class.
Parameters
----------
params: None or QgParams, optional
The models parameters to configure the tensor. `None` to initialize an empty tensor. Default to `None`.
atmospheric_inner_products: None or AtmosphericInnerProducts, optional
The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are
projected.
If `None`, disable the atmospheric tendencies. Default to `None`.
The inner product is returned in symbolic or numeric form.
oceanic_inner_products: None or OceanicInnerProducts, optional
The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected.
If `None`, disable the oceanic tendencies. Default to `None`.
The inner product is returned in symbolic or numeric form.
ground_inner_products: None or GroundInnerProducts, optional
The inner products of the ground basis functions on which the model's PDE ground equations are projected.
If `None`, disable the ground tendencies. Default to `None`.
The inner product is returned in symbolic or numeric form.
Attributes
----------
params: None or QgParams
The models parameters used to configure the tensor. `None` for an empty tensor.
atmospheric_inner_products: None or AtmosphericInnerProducts
The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are
projected.
If `None`, disable the atmospheric tendencies. Default to `None`.
oceanic_inner_products: None or OceanicInnerProducts
The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected.
If `None`, disable the oceanic tendencies. Default to `None`.
ground_inner_products: None or GroundInnerProducts
The inner products of the ground basis functions on which the model's PDE ground equations are projected.
If `None`, disable the ground tendencies. Default to `None`.
tensor: ~sympy.tensor.array.ImmutableSparseNDimArray
The tensor :math:`\\mathcal{T}_{i,j,k}` :math:`i`-th components.
jacobian_tensor: ~sympy.tensor.array.ImmutableSparseNDimArray
The jacobian tensor :math:`\\mathcal{T}_{i,j,k} + \\mathcal{T}_{i,k,j}` :math:`i`-th components.
"""
def __init__(self, params=None, atmospheric_inner_products=None, oceanic_inner_products=None,
ground_inner_products=None):
self.atmospheric_inner_products = atmospheric_inner_products
self.oceanic_inner_products = oceanic_inner_products
self.ground_inner_products = ground_inner_products
self.params = params
self.tensor = None
self.jacobian_tensor = None
if not self.params.dynamic_T:
self.compute_tensor()
def _psi_a(self, i):
"""Transform the :math:`\\psi_{\\mathrm a}` :math:`i`-th coefficient into the effective model's variable.
Parameters
----------
i: int
The :math:`i`-th coefficients of :math:`\\psi_{\\mathrm a}`
Returns
-------
int
The effective model's variable.
"""
return i + 1
def _theta_a(self, i):
"""Transform the :math:`\\theta_{\\mathrm a}` :math:`i`-th coefficient into the effective model's variable.
Parameters
----------
i: int
The :math:`i`-th coefficients of :math:`\\theta_{\\mathrm a}`
Returns
-------
int
The effective model's variable.
"""
return i + self.params.variables_range[0] + 1
def _psi_o(self, i):
"""Transform the :math:`\\psi_{\\mathrm o}` :math:`i`-th coefficient into the effective model's variable.
Parameters
----------
i: int
The :math:`i`-th coefficients of :math:`\\psi_{\\mathrm o}`
Returns
-------
int
The effective model's variable.
"""
return i + self.params.variables_range[1] + 1
def _deltaT_o(self, i):
"""Transform the :math:`\\delta T_{\\mathrm o}` :math:`i`-th coefficient into the effective model's variable.
Parameters
----------
i: int
The :math:`i`-th coefficients of :math:`\\delta T_{\\mathrm o}`
Returns
-------
int
The effective model's variable.
"""
return i + self.params.variables_range[2] + 1
def _deltaT_g(self, i):
"""Transform the :math:`\\delta T_{\\mathrm o}` :math:`i`-th coefficient into the effective model's variable.
Parameters
----------
i: int
The :math:`i`-th coefficients of :math:`\\delta T_{\\mathrm o}`
Returns
-------
int
The effective model's variable.
"""
return i + self.params.variables_range[1] + 1
def _compute_tensor_dicts(self):
if self.params is None:
return None
if self.atmospheric_inner_products is None and self.oceanic_inner_products is None \
and self.ground_inner_products is None:
return None
aips = self.atmospheric_inner_products
par = self.params
atp = par.atemperature_params
gp = par.ground_params
ap = par.atmospheric_params
nvar = par.number_of_variables
bips = None
if self.oceanic_inner_products is not None:
bips = self.oceanic_inner_products
ocean = True
else:
ocean = False
if self.ground_inner_products is not None:
bips = self.ground_inner_products
ground_temp = True
else:
ground_temp = False
if self.params.dynamic_T:
offset = 1
else:
offset = 0
# constructing some derived matrices
if aips is not None:
a_inv = dict()
for i in range(offset, nvar[1]):
for j in range(offset, nvar[1]):
a_inv[(i - offset, j - offset)] = aips.a(i, j)
a_inv = ImmutableSparseMatrix(nvar[0], nvar[0], a_inv)
a_inv = a_inv.inverse()
a_inv = a_inv.simplify()
a_theta = dict()
for i in range(nvar[1]):
for j in range(nvar[1]):
a_theta[(i, j)] = ap.sig0.symbol * aips.a(i, j) - aips.u(i, j)
a_theta = ImmutableSparseMatrix(nvar[1], nvar[1], a_theta)
a_theta = a_theta.inverse()
a_theta = a_theta.simplify()
if bips is not None:
if ocean:
U_inv = dict()
for i in range(nvar[3]):
for j in range(nvar[3]):
U_inv[(i, j)] = bips.U(i, j)
U_inv = ImmutableSparseMatrix(nvar[3], nvar[3], U_inv)
U_inv = U_inv.inverse()
U_inv = U_inv.simplify()
M_psio = dict()
for i in range(offset, nvar[3]):
for j in range(offset, nvar[3]):
M_psio[(i - offset, j - offset)] = bips.M(i, j) + self.params.G.symbolic_expression * bips.U(i, j)
M_psio = ImmutableSparseMatrix(nvar[2], nvar[2], M_psio)
M_psio = M_psio.inverse()
M_psio = M_psio.simplify()
else:
U_inv = dict()
for i in range(nvar[2]):
for j in range(nvar[2]):
U_inv[(i, j)] = bips.U(i, j)
U_inv = ImmutableSparseMatrix(nvar[2], nvar[2], U_inv)
U_inv = U_inv.inverse()
U_inv = U_inv.simplify()
################
if bips is not None:
go = bips.stored
else:
go = True
sy_arr_dic = dict()
if aips.stored and go:
# psi_a part
a_inv_mult_c = a_inv @ aips._c[offset:, offset:]
if gp is not None:
hk_sym_arr = ImmutableSparseMatrix(gp.hk.symbols)
if gp.orographic_basis == "atmospheric":
a_inv_mult_g = symbolic_tensordot(a_inv, aips._g[offset:, offset:, offset:], axes=1)
oro = symbolic_tensordot(a_inv_mult_g, hk_sym_arr, axes=1)[:, :, 0]
else:
a_inv_mult_gh = symbolic_tensordot(a_inv, aips._gh[offset:, offset:, offset:], axes=1)
oro = symbolic_tensordot(a_inv_mult_gh, hk_sym_arr, axes=1)[:, :, 0]
a_inv_mult_b = symbolic_tensordot(a_inv, aips._b[offset:, offset:, offset:], axes=1)
if ocean:
a_inv_mult_d = a_inv @ aips._d[offset:, offset:]
for i in range(nvar[0]):
for j in range(nvar[0]):
jo = j + offset # skipping the theta 0 variable if it exists
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_a(j), 0), -a_inv_mult_c[i, j] * par.scale_params.beta.symbol)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_a(j), 0), -(ap.kd.symbol * _kronecker_delta(i, j)) / 2)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._theta_a(jo), 0), (ap.kd.symbol * _kronecker_delta(i, j)) / 2)
if gp is not None:
# convert
if gp.hk is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_a(j), 0), -oro[i, j] / 2)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._theta_a(jo), 0), oro[i, j] / 2)
for k in range(nvar[0]):
ko = k + offset # skipping the theta 0 variable if it exists
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_a(j), self._psi_a(k)), -a_inv_mult_b[i, j, k])
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._theta_a(jo), self._theta_a(ko)), -a_inv_mult_b[i, j, k])
if ocean:
for j in range(nvar[2]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_o(j), 0), a_inv_mult_d[i, j] * ap.kd.symbol / 2)
# theta_a part
a_theta_mult_u = a_theta @ aips._u
if self.params.Cpa is not None:
val_Cpa = a_theta_mult_u @ ImmutableSparseMatrix(self.params.Cpa.symbolic_expressions)
if atp.hd is not None and atp.thetas is not None:
thetas_sym_arr = ImmutableSparseMatrix(atp.thetas.symbols)
val_thetas = a_theta_mult_u @ thetas_sym_arr
a_theta_mult_a = a_theta @ aips._a[:, offset:]
a_theta_mult_c = a_theta @ aips._c[:, offset:]
a_theta_mult_g = symbolic_tensordot(a_theta, aips._g[:, offset:, offset:], axes=1)
if gp is not None:
if gp.orographic_basis == "atmospheric":
oro = symbolic_tensordot(a_theta_mult_g, hk_sym_arr, axes=1)[:, :, 0]
else:
a_theta_mult_gh = symbolic_tensordot(a_theta, aips._gh[:, offset:, offset:], axes=1)
oro = symbolic_tensordot(a_theta_mult_gh, hk_sym_arr, axes=1)[:, :, 0]
a_theta_mult_b = symbolic_tensordot(a_theta, aips._b[:, offset:, offset:], axes=1)
if ocean:
a_theta_mult_d = a_theta @ aips._d[:, offset:]
a_theta_mult_s = a_theta @ aips._s
if ground_temp:
a_theta_mult_s = a_theta @ aips._s
for i in range(nvar[1]):
if self.params.Cpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), 0, 0), -val_Cpa[i])
if atp.hd is not None and atp.thetas is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), 0, 0), -val_thetas[i] * atp.hd.symbol)
for j in range(nvar[0]):
jo = j + offset # skipping the theta 0 variable if it exists
val_2 = a_theta_mult_a[i, j] * ap.kd.symbol * ap.sig0.symbol / 2
val_3 = a_theta_mult_a[i, j] * (ap.kd.symbol / 2 + 2 * ap.kdp.symbol) * ap.sig0.symbol
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_a(j), 0), val_2)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(jo), 0), -val_3)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(jo), 0), -a_theta_mult_c[i, j] * par.scale_params.beta.symbol * ap.sig0.symbol)
if gp is not None:
if gp.hk is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(jo), 0), -ap.sig0.symbol * oro[i, j] / 2)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_a(j), 0), ap.sig0.symbol * oro[i, j] / 2)
for k in range(nvar[0]):
ko = k + offset # skipping the theta 0 variable if it exists
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_a(j), self._theta_a(ko)), - a_theta_mult_b[i, j, k] * ap.sig0.symbol)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(jo), self._psi_a(k)), - a_theta_mult_b[i, j, k] * ap.sig0.symbol)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_a(j), self._theta_a(ko)), a_theta_mult_g[i, j, k])
for j in range(nvar[1]):
if self.params.Lpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(j), 0), a_theta_mult_u[i, j] * atp.sc.symbol * self.params.Lpa.symbolic_expression)
if self.params.LSBpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(j), 0), a_theta_mult_u[i, j] * self.params.LSBpa.symbolic_expression)
if atp.hd is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(j), 0), a_theta_mult_u[i, j] * atp.hd)
if ocean:
for j in range(nvar[2]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_o(j), 0), -a_theta_mult_d[i, j] * ap.sig0.symbol * ap.kd.symbol / 2)
if self.params.Lpa is not None:
for j in range(nvar[3]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_o(j), 0), -a_theta_mult_s[i, j] * self.params.Lpa.symbolic_expression / 2)
if self.params.LSBpgo is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_o(j), 0), -a_theta_mult_s[i, j] * self.params.LSBpgo.symbolic_expression)
if ground_temp:
if self.params.Lpa is not None:
for j in range(nvar[2]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_g(j), 0), -a_theta_mult_s[i, j] * self.params.Lpa.symbolic_expression / 2)
if self.params.LSBpgo is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_g(j), 0), -a_theta_mult_s[i, j] * self.params.LSBpgo.symbolic_expression)
if ocean:
# psi_o part
M_psio_mult_K = M_psio @ bips._K[offset:, offset:]
M_psio_mult_N = M_psio @ bips._N[offset:, offset:]
M_psio_mult_M = M_psio @ bips._M[offset:, offset:]
M_psio_mult_C = symbolic_tensordot(M_psio, bips._C[offset:, offset:, offset:], axes=1)
for i in range(nvar[2]):
for j in range(nvar[0]):
jo = j + offset # skipping the theta 0 variable if it exists
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._psi_a(j), 0), M_psio_mult_K[i, j] * par.oceanic_params.d.symbol)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._theta_a(jo), 0), -M_psio_mult_K[i, j] * par.oceanic_params.d.symbol)
for j in range(nvar[2]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._psi_o(j), 0), -M_psio_mult_N[i, j] * par.scale_params.beta.symbol)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._psi_o(j), 0), -M_psio_mult_M[i, j] * (par.oceanic_params.r.symbol + par.oceanic_params.d.symbol))
for k in range(nvar[2]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._psi_o(j), self._psi_o(k)), - M_psio_mult_C[i, j, k])
# deltaT_o part
U_inv_mult_W = U_inv @ bips._W
Cpgo_sym_arr = ImmutableSparseMatrix(self.params.Cpgo.symbolic_expressions)
U_inv_mult_W_Cpgo = U_inv_mult_W @ Cpgo_sym_arr
U_inv_mult_O = symbolic_tensordot(U_inv, bips._O[:, offset:, offset:], axes=1)
for i in range(nvar[3]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), 0, 0), U_inv_mult_W_Cpgo[i])
for j in range(nvar[1]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._theta_a(j), 0), U_inv_mult_W[i, j] * 2 * atp.sc.symbol * self.params.Lpgo.symbolic_expression)
if self.params.sbpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._theta_a(j), 0), U_inv_mult_W[i, j] * self.params.sbpa.symbolic_expression)
for j in range(nvar[3]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._deltaT_o(j), 0), - self.params.Lpgo.symbolic_expression * _kronecker_delta(i, j))
if self.params.sbpgo is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._deltaT_o(j), 0), - self.params.sbpgo.symbolic_expression * _kronecker_delta(i, j))
for j in range(nvar[2]):
for k in range(nvar[2]):
ko = k + offset # skipping the T 0 variable if it exists
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._psi_o(j), self._deltaT_o(ko)), -U_inv_mult_O[i, j, k])
# deltaT_g part
if ground_temp:
U_inv_mult_W = U_inv @ bips._W
Cpgo_sym_arr = ImmutableSparseMatrix(self.params.Cpgo.symbolic_expressions)
U_inv_mult_W_Cpgo = U_inv_mult_W @ Cpgo_sym_arr
for i in range(nvar[2]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), 0, 0), U_inv_mult_W_Cpgo[i])
for j in range(nvar[1]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._theta_a(j), 0), U_inv_mult_W[i, j] * 2 * atp.sc.symbol * self.params.Lpgo.symbolic_expression)
if self.params.sbpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._theta_a(j), 0), U_inv_mult_W[i, j] * self.params.sbpa.symbolic_expression)
for j in range(nvar[2]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._deltaT_g(j), 0), - self.params.Lpgo.symbolic_expression * _kronecker_delta(i, j))
if self.params.sbpgo is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._deltaT_g(j), 0), - self.params.sbpgo.symbolic_expression * _kronecker_delta(i, j))
else:
# psi_a part
for i in range(nvar[0]):
for j in range(nvar[0]):
jo = j + offset
val = 0
for jj in range(nvar[0]):
val += a_inv[i, jj] * aips.c(offset + jj, jo)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_a(j), 0), - val * par.scale_params.beta.symbol)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_a(j), 0), - (ap.kd.symbol * _kronecker_delta(i, j)) / 2)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._theta_a(jo), 0), (ap.kd.symbol * _kronecker_delta(i, j)) / 2)
if gp is not None:
hk_sym_arr = ImmutableSparseNDimArray(gp.hk.symbols)
if gp.hk is not None:
oro = 0
if gp.orographic_basis == "atmospheric":
for jj in range(nvar[0]):
for kk in range(nvar[0]):
oro += a_inv[i, jj] * aips.g(offset + jj, j, offset + kk) * hk_sym_arr[kk]
else:
for jj in range(nvar[0]):
for kk in range(nvar[0]):
oro += a_inv[i, jj] * aips.gh(offset + jj, j, offset + kk) * hk_sym_arr[kk]
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_a(j), 0), - oro / 2)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._theta_a(jo), 0), oro / 2)
for k in range(nvar[0]):
ko = k + offset # skipping the theta 0 variable if it exists
val = 0
for jj in range(nvar[0]):
val += a_inv[i, jj] * aips.b(offset + jj, jo, ko)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_a(j), self._psi_a(k)), - val)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._theta_a(jo), self._theta_a(ko)), - val)
if ocean:
for j in range(nvar[2]):
jo = j + offset
val = 0
for jj in range(nvar[0]):
val += a_inv[i, jj] * aips.d(offset + jj, jo)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_a(i), self._psi_o(j), 0), val * ap.kd.symbol / 2)
# theta_a part
for i in range(nvar[1]):
if self.params.Cpa is not None:
val = 0
for jj in range(nvar[1]):
for kk in range(nvar[1]):
val -= a_theta[i, jj] * aips.u(jj, kk) * self.params.Cpa.symbolic_expressions[kk]
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), 0, 0), val)
if atp.hd is not None and atp.thetas is not None:
thetas_sym_arr = ImmutableSparseNDimArray(atp.thetas.symbols)
val = 0
for jj in range(nvar[1]):
for kk in range(nvar[1]):
val -= a_theta[i, jj] * aips.u(jj, kk) * thetas_sym_arr[kk]
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), 0, 0), val * atp.hd.symbol)
for j in range(nvar[0]):
jo = j + offset # skipping the theta 0 variable if it exists
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips.a(jj, jo)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_a(j), 0), val * ap.kd.symbol * ap.sig0.symbol / 2)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(jo), 0), - val * (ap.kd.symbol / 2 - 2 * ap.kdp.symbol) * ap.sig0.symbol)
val = 0
for jj in range(nvar[1]):
val -= a_theta[i, jj] * aips.c(jj, jo)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(jo), 0), val * par.scale_params.beta.symbol * ap.sig0.symbol)
if gp is not None:
if gp.hk is not None:
oro = 0
if gp.orographic_basis == "atmospheric":
for jj in range(nvar[1]):
for kk in range(nvar[0]):
oro += a_theta[i, jj] * aips.g(jj, jo, offset + kk) * gp.hk[kk].symbol
else:
for jj in range(nvar[1]):
for kk in range(nvar[0]):
oro += a_theta[i, jj] * aips.gh(jj, jo, offset + kk) * gp.hk[kk].symbol
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(jo), 0), - ap.sig0.symbol * oro / 2)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_a(j), 0), ap.sig0.symbol * oro / 2)
for k in range(nvar[0]):
ko = k + offset # skipping the theta 0 variable if it exists
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips.b(jj, jo, ko)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_a(j), self._theta_a(ko)), - val * ap.sig0.symbol)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(jo), self._psi_a(k)), - val * ap.sig0.symbol)
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips.g(jj, jo, ko)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_a(j), self._theta_a(ko)), val)
for j in range(nvar[1]):
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips.u(jj, j)
if self.params.Lpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(j), 0), val * atp.sc.symbol * self.params.Lpa.symbolic_expression)
if self.params.LSBpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(j), 0), val * self.params.LSBpa.symbolic_expression)
if atp.hd is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(j), 0), val * atp.hd.symbol)
if ocean:
for j in range(nvar[2]):
jo = j + offset # skipping the theta 0 variable if it exists
val = 0
for jj in range(nvar[1]):
val -= a_theta[i, jj] * aips.d(jj, jo)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._psi_o(j), 0), val * ap.sig0.symbol * ap.kd.symbol / 2)
if self.params.Lpa is not None:
for j in range(nvar[3]):
val = 0
for jj in range(nvar[1]):
val -= a_theta[i, jj] * aips.s(jj, j)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_o(j), 0), val * self.params.Lpa.symbolic_expression / 2)
if self.params.LSBpgo is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_o(j), 0), val * self.params.LSBpgo.symbolic_expression)
if ground_temp:
if self.params.Lpa is not None:
for j in range(nvar[2]):
val = 0
for jj in range(nvar[1]):
val -= a_theta[i, jj] * aips.s(jj, j)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_g(j), 0), val * self.params.Lpa.symbolic_expression / 2)
if self.params.LSBpgo is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_g(j), 0), val * self.params.LSBpgo.symbolic_expression)
if ocean:
# psi_o part
for i in range(nvar[2]):
for j in range(nvar[0]):
jo = j + offset # skipping the theta 0 variable if it exists
for jj in range(nvar[2]):
val = M_psio[i, jj] * bips.K(offset + jj, jo) * par.oceanic_params.d.symbol
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._psi_a(j), 0), val)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._theta_a(jo), 0), - val)
for j in range(nvar[2]):
jo = j + offset # skipping the T 0 variable if it exists
val = 0
for jj in range(nvar[2]):
val -= M_psio[i, jj] * bips.N(offset + jj, jo)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._psi_o(j), 0), val * par.scale_params.beta.symbol)
val = 0
for jj in range(nvar[2]):
val -= M_psio[i, jj] * bips.M(offset + jj, jo)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._psi_o(j), 0), val * (par.oceanic_params.r.symbol + par.oceanic_params.d.symbol))
for k in range(nvar[2]):
ko = k + offset # skipping the T 0 variable if it exists
val = 0
for jj in range(nvar[2]):
val -= M_psio[i, jj] * bips.C(offset + jj, jo, ko)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._psi_o(i), self._psi_o(j), self._psi_o(k)), val)
# deltaT_o part
for i in range(nvar[3]):
val = 0
for jj in range(nvar[1]):
for kk in range(nvar[3]):
val += U_inv[i, kk] * bips.W(kk, jj) * self.params.Cpgo.symbolic_expressions[jj]
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), 0, 0), val)
for j in range(nvar[1]):
val = 0
for jj in range(nvar[3]):
val += U_inv[i, jj] * bips.W(jj, j)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._theta_a(j), 0), val * 2 * atp.sc.symbol * self.params.Lpgo.symbolic_expression)
if self.params.sbpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._theta_a(j), 0), val * self.params.sbpa.symbolic_expression)
for j in range(nvar[3]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._deltaT_o(j), 0), - self.params.Lpgo.symbolic_expression * _kronecker_delta(i, j))
if self.params.sbpgo is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._deltaT_o(j), 0), - self.params.sbpgo.symbolic_expression * _kronecker_delta(i, j))
for j in range(nvar[2]):
for k in range(offset, nvar[3]):
jo = j + offset # skipping the T 0 variable if it exists
val = 0
for jj in range(nvar[3]):
val -= U_inv[i, jj] * bips.O(jj, jo, k)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._psi_o(j), self._deltaT_o(k)), val)
# deltaT_g part
if ground_temp:
for i in range(nvar[2]):
val = 0
for jj in range(nvar[1]):
for kk in range(nvar[2]):
val += U_inv[i, kk] * bips.W(kk, jj) * self.params.Cpgo.symbolic_expressions[jj]
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), 0, 0), val)
for j in range(nvar[1]):
val = 0
for jj in range(nvar[2]):
val += U_inv[i, jj] * bips.W(jj, j)
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._theta_a(j), 0), val * 2 * atp.sc.symbol * self.params.Lpgo.symbolic_expression)
if self.params.sbpa is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._theta_a(j), 0), val * self.params.sbpa.symbolic_expression)
for j in range(nvar[2]):
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._deltaT_g(j), 0), - self.params.Lpgo.symbolic_expression * _kronecker_delta(i, j))
if self.params.sbpgo is not None:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._deltaT_g(j), 0), - self.params.sbpgo.symbolic_expression * _kronecker_delta(i, j))
return sy_arr_dic
[docs]
def compute_tensor(self):
"""Routine to compute the tensor."""
sy_arr_dic = self._compute_tensor_dicts()
sy_arr_dic = self.remove_dic_zeros(sy_arr_dic)
if sy_arr_dic is not None:
self._set_tensor(sy_arr_dic)
def _set_tensor(self, dic, set_symbolic=False):
self.jac_dic = self.remove_dic_zeros(self.jacobian_from_dict(dic))
self.tensor_dic = self.remove_dic_zeros(self.simplify_dict(dic))
if set_symbolic:
self._set_symbolic_tensor()
def _set_symbolic_tensor(self):
ndim = self.params.ndim
if self.params.dynamic_T:
if self.params.T4:
raise ValueError("Symbolic tensor output not configured for T4 version, use Dynamic T version")
else:
dims = (ndim + 1, ndim + 1, ndim + 1, ndim + 1, ndim + 1)
else:
dims = (ndim + 1, ndim + 1, ndim + 1)
jacobian_tensor = ImmutableSparseNDimArray(self.jac_dic.copy(), dims)
tensor = ImmutableSparseNDimArray(self.tensor_dic.copy(), dims)
self.jacobian_tensor = jacobian_tensor.applyfunc(simplify)
self.tensor = tensor.applyfunc(simplify)
[docs]
@staticmethod
def remove_dic_zeros(dic):
"""Removes zero values from dictionary
Parameters
----------
tensor: dict
dictionary which could include 0 in values
Returns
-------
ten_out: dict
dictionary with same keys and values as input, but keys with value of 0 are removed
"""
non_zero_dic = dict()
for key in dic.keys():
if dic[key] != 0:
non_zero_dic[key] = dic[key]
return non_zero_dic
[docs]
@staticmethod
def jacobian_from_dict(dic):
"""Calculates the Jacobian from the qgs tensor
Parameters
----------
dic: dict
dictionary of tendencies of the model
Returns
-------
dic_jac: dict
Jacobian tensor stored in a dictionary
"""
rank = max([len(i) for i in dic.keys()])
n_perm = rank - 2
orig_order = [i for i in range(rank)]
keys = dic.keys()
dic_jac = dic.copy()
for i in range(1, n_perm+1):
new_pos = orig_order.copy()
new_pos[1] = orig_order[i+1]
new_pos[i+1] = orig_order[1]
for key in keys:
dic_jac = add_to_dict(dic_jac, tuple(key[i] for i in new_pos), dic[key])
return dic_jac
[docs]
@staticmethod
def simplify_dict(dic):
"""calculates the upper triangular tensor of a given tensor, stored in dictionary
Parameters
----------
dic: dict
dictionary of tendencies of the model
Returns
-------
dic_upp: dict
Upper triangular tensor, stored as a tensor where the keys are the coordinates of the corresponding value.
"""
keys = dic.keys()
dic_upp = dict()
for key in keys:
new_key = tuple([key[0]] + sorted(key[1:]))
dic_upp = add_to_dict(dic_upp, new_key, dic[key])
return dic_upp
[docs]
def save_to_file(self, filename, **kwargs):
"""Function to save the tensor object to a file with the :mod:`pickle` module.
Parameters
----------
filename: str
The file name where to save the tensor object.
kwargs: dict
Keyword arguments to pass to the :mod:`pickle` module method.
"""
f = open(filename, 'wb')
pickle.dump(self.__dict__, f, **kwargs)
f.close()
[docs]
def sub_tensor(self, tensor=None, continuation_variables=None):
"""Uses sympy substitution to convert the symbolic tensor or a symbolic dictionary to a numerical one.
Parameters
----------
tensor: dict(~sympy.core.expr.Expr or float) or ~sympy.tensor.array.ImmutableSparseNDimArray
Tensor of model tendencies, either as
- a dictionary with keys of non-zero coordinates, and values of Sympy expressions or floats
- or a sparse Sympy tensor
continuation_variables: list(Parameter, ScalingParameter or ParametersArray) or `None`
Variables which remain symbolic, all other variables are substituted with numerical values.
If `None` all variables are substituted.
Returns
-------
ten_out: dict(float)
Dictionary of the substituted tensor of the model tendencies, with coordinates and numerical values
"""
if continuation_variables is None:
continuation_variables = list()
param_subs = _parameter_substitutions(self.params, continuation_variables)
if tensor is None:
ten = self.tensor_dic
else:
ten = tensor
if isinstance(ten, dict):
ten_out = dict()
for key in ten.keys():
val = ten[key].subs(param_subs)
try:
ten_out[key] = float(val)
except:
ten_out[key] = val
else:
# Assuming the tensor is a sympy tensor
ten_out = ten.subs(param_subs)
return ten_out
[docs]
def print_tensor(self, tensor=None, dict_opp=True, tol=1e-10):
"""Print the non-zero coordinates of values of the tensor of the model tendencies
Parameters
----------
tensor: dict(~sympy.core.expr.Expr or float) or ~sympy.tensor.array.ImmutableSparseNDimArray or `None`
Tensor of model tendencies, either as
- a dictionary with keys of non-zero coordinates, and values of Sympy expressions or floats
- or a sparse Sympy tensor
If `None`, defaults to the stored tensor.
Defaults to `None`.
dict_opp: bool
If `True`, returns the unsimplified symbolic expressions, if `False` the simplified expressions are returned.
tol: float
The tolerance to allow for numerical errors when finding non-zero values.
"""
if tensor is None:
if dict_opp:
temp_ten = self.tensor_dic
else:
temp_ten = self.tensor
else:
temp_ten = tensor
if isinstance(temp_ten, dict):
val_list = [(key, temp_ten[key]) for key in temp_ten.keys()]
else:
val_list = np.ndenumerate(temp_ten)
for ix, v in val_list:
if isinstance(v, float):
bool_test = (abs(v) > tol)
else:
bool_test = (v != 0)
if bool_test:
try:
output_val = float(v)
except:
try:
output_val = v.simplify().evalf()
except:
output_val = v
print(str(ix) + ": " + str(output_val))
[docs]
class SymbolicQgsTensorDynamicT(SymbolicQgsTensor):
"""qgs dynamical temperature first order (linear) symbolic tendencies tensor class.
Parameters
----------
params: None or QgParams, optional
The models parameters to configure the tensor. `None` to initialize an empty tensor. Default to `None`.
atmospheric_inner_products: None or AtmosphericInnerProducts, optional
The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected.
If `None`, disable the atmospheric tendencies. Default to `None`.
oceanic_inner_products: None or OceanicInnerProducts, optional
The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected.
If `None`, disable the oceanic tendencies. Default to `None`.
ground_inner_products: None or GroundInnerProducts, optional
The inner products of the ground basis functions on which the model's PDE ground equations are projected.
If `None`, disable the ground tendencies. Default to `None`.
Attributes
----------
params: None or QgParams
The models parameters used to configure the tensor. `None` for an empty tensor.
atmospheric_inner_products: None or AtmosphericInnerProducts
The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected.
If `None`, disable the atmospheric tendencies. Default to `None`.
oceanic_inner_products: None or OceanicInnerProducts
The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected.
If `None`, disable the oceanic tendencies. Default to `None`.
ground_inner_products: None or GroundInnerProducts
The inner products of the ground basis functions on which the model's PDE ground equations are projected.
If `None`, disable the ground tendencies. Default to `None`.
tensor: ~sympy.tensor.array.ImmutableSparseNDimArray
The tensor :math:`\\mathcal{T}_{i,j,k}` :math:`i`-th components.
jacobian_tensor: ~sympy.tensor.array.ImmutableSparseNDimArray
The jacobian tensor :math:`\\mathcal{T}_{i,j,k} + \\mathcal{T}_{i,k,j}` :math:`i`-th components.
"""
def __init__(self, params=None, atmospheric_inner_products=None, oceanic_inner_products=None, ground_inner_products=None):
SymbolicQgsTensor.__init__(self, params, atmospheric_inner_products, oceanic_inner_products, ground_inner_products)
if params.dynamic_T:
self.compute_tensor()
def _compute_tensor_dicts(self):
if self.params is None:
return None
if self.atmospheric_inner_products is None and self.oceanic_inner_products is None \
and self.ground_inner_products is None:
return None
aips = self.atmospheric_inner_products
bips = None
if self.oceanic_inner_products is not None:
bips = self.oceanic_inner_products
elif self.ground_inner_products is not None:
bips = self.ground_inner_products
if bips is not None:
go = bips.stored
else:
go = True
if aips.stored and go:
symbolic_array_full_dict = self._compute_stored_full_dict()
else:
symbolic_array_full_dict = self._compute_non_stored_full_dict()
return symbolic_array_full_dict
def _compute_stored_full_dict(self):
par = self.params
nvar = par.number_of_variables
aips = self.atmospheric_inner_products
bips = None
if self.oceanic_inner_products is not None:
bips = self.oceanic_inner_products
ocean = True
else:
ocean = False
if self.ground_inner_products is not None:
bips = self.ground_inner_products
ground_temp = True
else:
ground_temp = False
# constructing some derived matrices
if aips is not None:
a_theta = dict()
for i in range(nvar[1]):
for j in range(nvar[1]):
a_theta[(i, j)] = par.atmospheric_params.sig0.symbol * aips.a(i, j) - aips.u(i, j)
a_theta = ImmutableSparseMatrix(nvar[1], nvar[1], a_theta)
a_theta = a_theta.inverse()
if bips is not None:
U_inv = dict()
if ocean:
for i in range(nvar[3]):
for j in range(nvar[3]):
U_inv[(i, j)] = bips.U(i, j)
U_inv = ImmutableSparseMatrix(nvar[3], nvar[3], U_inv)
U_inv = U_inv.inverse()
else:
for i in range(nvar[2]):
for j in range(nvar[2]):
U_inv[i, j] = bips.U(i, j)
U_inv = ImmutableSparseMatrix(nvar[2], nvar[2], U_inv)
U_inv = U_inv.inverse()
#################
sy_arr_dic = dict()
# theta_a part
for i in range(nvar[1]):
if self.params.T4LSBpa is not None:
j = k = ell = 0
for m in range(nvar[1]):
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips._z[jj, j, k, ell, m]
if m == 0:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)), self.params.T4LSBpa.symbolic_expression * val)
else:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)), 4 * self.params.T4LSBpa.symbolic_expression * val)
if ocean:
if self.params.T4LSBpgo is not None:
j = k = ell = 0
for m in range(nvar[3]):
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips._v[jj, j, k, ell, m]
if m == 0:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)), -self.params.T4LSBpgo.symbolic_expression * val)
else:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)), -4 * self.params.T4LSBpgo.symbolic_expression * val)
if ground_temp:
if self.params.T4LSBpgo is not None:
j = k = ell = 0
for m in range(nvar[2]):
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips._v[jj, j, k, ell, m]
if m == 0:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)), -self.params.T4LSBpgo.symbolic_expression * val)
else:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._theta_a(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)), -4 * self.params.T4LSBpgo.symbolic_expression * val)
if ocean:
# delta_T part
for i in range(nvar[3]):
j = k = ell = 0
for m in range(nvar[1]):
val = 0
for jj in range(nvar[3]):
val += U_inv[i, jj] * bips._Z[jj, j, k, ell, m]
if m == 0:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)), self.params.T4sbpa.symbolic_expression * val)
else:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)), 4 * self.params.T4sbpa.symbolic_expression * val)
for m in range(nvar[3]):
val = 0
for jj in range(nvar[3]):
val += U_inv[i, jj] * bips._V[jj, j, k, ell, m]
if m == 0:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)), - self.params.T4sbpgo.symbolic_expression * val)
else:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_o(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m)), -4 * self.params.T4sbpgo.symbolic_expression * val)
if ground_temp:
# deltaT_g part
for i in range(nvar[2]):
j = k = ell = 0
for m in range(nvar[1]):
val = 0
for jj in range(nvar[2]):
val += U_inv[i, jj] * bips._Z[jj, j, k, ell, m]
if m == 0:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)), self.params.T4sbpa.symbolic_expression * val)
else:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m)), 4 * self.params.T4sbpa.symbolic_expression * val)
for m in range(nvar[2]):
val = 0
for jj in range(nvar[2]):
val += U_inv[i, jj] * bips._V[jj, j, k, ell, m]
if m == 0:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)), -self.params.T4sbpgo.symbolic_expression * val)
else:
sy_arr_dic = add_to_dict(sy_arr_dic, (self._deltaT_g(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m)), -4 * self.params.T4sbpgo.symbolic_expression * val)
return sy_arr_dic
def _compute_non_stored_full_dict(self):
par = self.params
nvar = par.number_of_variables
aips = self.atmospheric_inner_products
bips = None
if self.oceanic_inner_products is not None:
bips = self.oceanic_inner_products
ocean = True
else:
ocean = False
if self.ground_inner_products is not None:
bips = self.ground_inner_products
ground_temp = True
else:
ground_temp = False
# constructing some derived matrices
if aips is not None:
a_theta = dict()
for i in range(nvar[1]):
for j in range(nvar[1]):
a_theta[(i, j)] = par.atmospheric_params.sig0.symbol * aips.a(i, j) - aips.u(i, j)
a_theta = ImmutableSparseMatrix(nvar[1], nvar[1], a_theta)
a_theta = a_theta.inverse()
if bips is not None:
if ocean:
U_inv = dict()
for i in range(nvar[3]):
for j in range(nvar[3]):
U_inv[i, j] = bips.U(i, j)
U_inv = ImmutableSparseMatrix(nvar[3], nvar[3], U_inv)
else:
U_inv = dict()
for i in range(nvar[2]):
for j in range(nvar[2]):
U_inv[(i, j)] = bips.U(i, j)
U_inv = ImmutableSparseMatrix(nvar[2], nvar[2], U_inv)
U_inv = U_inv.inverse()
#################
sy_arr_dic = dict()
# theta_a part
for i in range(nvar[1]):
if self.params.T4LSBpa is not None:
j = k = ell = 0
for m in range(nvar[1]):
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips.z(jj, j, k, ell, m)
if m == 0:
sy_arr_dic[(self._theta_a(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = self.params.T4LSBpa.symbolic_expression * val
else:
sy_arr_dic[(self._theta_a(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = 4 * self.params.T4LSBpa.symbolic_expression * val
if ocean:
if self.params.T4LSBpgo is not None:
j = k = ell = 0
for m in range(nvar[3]):
val = 0
for jj in range(nvar[1]):
val -= a_theta[i, jj] * aips.v(jj, j, k, ell, m)
if m == 0:
sy_arr_dic[(self._theta_a(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m))] = self.params.T4LSBpgo.symbolic_expression * val
else:
sy_arr_dic[(self._theta_a(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m))] = 4 * self.params.T4LSBpgo.symbolic_expression * val
if ground_temp:
if self.params.T4LSBpgo is not None:
j = k = ell = 0
for m in range(nvar[2]):
val = 0
for jj in range(nvar[1]):
val -= a_theta[i, jj] * aips.v(jj, j, k, ell, m)
if m == 0:
sy_arr_dic[(self._theta_a(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m))] = self.params.T4LSBpgo.symbolic_expression * val
else:
sy_arr_dic[(self._theta_a(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m))] = 4 * self.params.T4LSBpgo.symbolic_expression * val
if ocean:
# deltaT_o part
for i in range(nvar[3]):
j = k = ell = 0
for m in range(nvar[1]):
val = 0
for jj in range(nvar[3]):
val += U_inv[i, jj] * bips.Z(jj, j, k, ell, m)
if m == 0:
sy_arr_dic[(self._deltaT_o(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = self.params.T4sbpa.symbolic_expression * val
else:
sy_arr_dic[(self._deltaT_o(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = 4 * self.params.T4sbpa.symbolic_expression * val
for m in range(nvar[3]):
val = 0
for jj in range(nvar[3]):
val -= U_inv[i, jj] * bips.V(jj, j, k, ell, m)
if m == 0:
sy_arr_dic[(self._deltaT_o(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m))] = self.params.T4sbpgo.symbolic_expression * val
else:
sy_arr_dic[(self._deltaT_o(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m))] = 4 * self.params.T4sbpgo.symbolic_expression * val
# deltaT_g part
if ground_temp:
for i in range(nvar[2]):
j = k = ell = 0
for m in range(nvar[1]):
val = 0
for jj in range(nvar[2]):
val += U_inv[i, jj] * bips._Z[jj, j, k, ell, m]
if m == 0:
sy_arr_dic[(self._deltaT_g(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = self.params.T4sbpa.symbolic_expression * val
else:
sy_arr_dic[(self._deltaT_g(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = 4 * self.params.T4sbpa.symbolic_expression * val
for m in range(nvar[2]):
val = 0
for jj in range(nvar[2]):
val -= U_inv[i, jj] * bips._V[jj, j, k, ell, m]
if m == 0:
sy_arr_dic[(self._deltaT_g(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m))] = self.params.T4sbpgo.symbolic_expression * val
else:
sy_arr_dic[(self._deltaT_g(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m))] = 4 * self.params.T4sbpgo.symbolic_expression * val
return sy_arr_dic
[docs]
def compute_tensor(self):
"""Routine to compute the tensor."""
if self.params.T4:
# TODO: Make a proper error message for here
raise ValueError("Parameters are set for T4 version, set dynamic_T=True")
symbolic_dict_linear = SymbolicQgsTensor._compute_tensor_dicts(self)
symbolic_dict_linear = _shift_dict_keys(symbolic_dict_linear, (0, 0))
symbolic_dict_dynT = self._compute_tensor_dicts()
if symbolic_dict_linear is not None:
symbolic_dict_dynT = {**symbolic_dict_linear, **symbolic_dict_dynT}
if symbolic_dict_dynT is not None:
self._set_tensor(symbolic_dict_dynT)
[docs]
class SymbolicQgsTensorT4(SymbolicQgsTensor):
"""qgs dynamical temperature first order (linear) symbolic tendencies tensor class.
Parameters
----------
params: None or QgParams, optional
The models parameters to configure the tensor. `None` to initialize an empty tensor. Default to `None`.
atmospheric_inner_products: None or AtmosphericInnerProducts, optional
The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected.
If `None`, disable the atmospheric tendencies. Default to `None`.
oceanic_inner_products: None or OceanicInnerProducts, optional
The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected.
If `None`, disable the oceanic tendencies. Default to `None`.
ground_inner_products: None or GroundInnerProducts, optional
The inner products of the ground basis functions on which the model's PDE ground equations are projected.
If `None`, disable the ground tendencies. Default to `None`.
Attributes
----------
params: None or QgParams
The models parameters used to configure the tensor. `None` for an empty tensor.
atmospheric_inner_products: None or AtmosphericInnerProducts
The inner products of the atmospheric basis functions on which the model's PDE atmospheric equations are projected.
If `None`, disable the atmospheric tendencies. Default to `None`.
oceanic_inner_products: None or OceanicInnerProducts
The inner products of the oceanic basis functions on which the model's PDE oceanic equations are projected.
If `None`, disable the oceanic tendencies. Default to `None`.
ground_inner_products: None or GroundInnerProducts
The inner products of the ground basis functions on which the model's PDE ground equations are projected.
If `None`, disable the ground tendencies. Default to `None`.
tensor: ~sympy.tensor.array.ImmutableSparseNDimArray
The tensor :math:`\\mathcal{T}_{i,j,k}` :math:`i`-th components.
jacobian_tensor: ~sympy.tensor.array.ImmutableSparseNDimArray
The jacobian tensor :math:`\\mathcal{T}_{i,j,k} + \\mathcal{T}_{i,k,j}` :math:`i`-th components.
"""
def __init__(self, params=None, atmospheric_inner_products=None, oceanic_inner_products=None, ground_inner_products=None):
SymbolicQgsTensor.__init__(self, params, atmospheric_inner_products, oceanic_inner_products, ground_inner_products)
if params.T4:
self.compute_tensor()
def _compute_non_stored_full_dict(self):
par = self.params
nvar = par.number_of_variables
aips = self.atmospheric_inner_products
bips = None
if self.oceanic_inner_products is not None:
bips = self.oceanic_inner_products
ocean = True
else:
ocean = False
if self.ground_inner_products is not None:
bips = self.ground_inner_products
ground_temp = True
else:
ground_temp = False
# constructing some derived matrices
if aips is not None:
a_theta = dict()
for i in range(nvar[1]):
for j in range(nvar[1]):
a_theta[(i, j)] = par.atmospheric_params.sig0.symbol * aips.a(i, j) - aips.u(i, j)
a_theta = ImmutableSparseMatrix(nvar[1], nvar[1], a_theta)
a_theta = a_theta.inverse()
if bips is not None:
if ocean:
U_inv = dict()
for i in range(nvar[3]):
for j in range(nvar[3]):
U_inv[i, j] = bips.U(i, j)
U_inv = ImmutableSparseMatrix(nvar[3], nvar[3], U_inv)
else:
U_inv = dict()
for i in range(nvar[2]):
for j in range(nvar[2]):
U_inv[(i, j)] = bips.U(i, j)
U_inv = ImmutableSparseMatrix(nvar[2], nvar[2], U_inv)
U_inv = U_inv.inverse()
#################
sy_arr_dic = dict()
# theta_a part
for i in range(nvar[1]):
if self.params.T4LSBpa is not None:
for j in range(nvar[1]):
for k in range(nvar[1]):
for ell in range(nvar[1]):
for m in range(nvar[1]):
val = 0
for jj in range(nvar[1]):
val += a_theta[i, jj] * aips.z(jj, j, k, ell, m)
sy_arr_dic[(self._theta_a(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = self.params.T4LSBpa.symbolic_expression * val
if ocean:
if self.params.T4LSBpgo is not None:
for j in range(nvar[3]):
for k in range(nvar[3]):
for ell in range(nvar[3]):
for m in range(nvar[3]):
val = 0
for jj in range(nvar[1]):
val -= a_theta[i, jj] * aips.v(jj, j, k, ell, m)
sy_arr_dic[(self._theta_a(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m))] = self.params.T4LSBpgo.symbolic_expression * val
if ground_temp:
if self.params.T4LSBpgo is not None:
for j in range(nvar[2]):
for k in range(nvar[2]):
for ell in range(nvar[2]):
for m in range(nvar[2]):
val = 0
for jj in range(nvar[1]):
val -= a_theta[i, jj] * aips.v(jj, j, k, ell, m)
sy_arr_dic[(self._theta_a(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m))] = self.params.T4LSBpgo.symbolic_expression * val
if ocean:
# deltaT_o part
for i in range(nvar[3]):
for j in range(nvar[1]):
for k in range(nvar[1]):
for ell in range(nvar[1]):
for m in range(nvar[1]):
val = 0
for jj in range(nvar[3]):
val += U_inv[i, jj] * bips.Z(jj, j, k, ell, m)
sy_arr_dic[(self._deltaT_o(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = self.params.T4LSBpa.symbolic_expression * val
for j in range(nvar[3]):
for k in range(nvar[3]):
for ell in range(nvar[3]):
for m in range(nvar[3]):
val = 0
for jj in range(nvar[3]):
val -= U_inv[i, jj] * bips.V(jj, j, k, ell, m)
sy_arr_dic[(self._deltaT_o(i), self._deltaT_o(j), self._deltaT_o(k), self._deltaT_o(ell), self._deltaT_o(m))] = self.params.T4LSBpgo.symbolic_expression * val
# deltaT_g part
if ground_temp:
for i in range(nvar[2]):
for j in range(nvar[1]):
for k in range(nvar[1]):
for ell in range(nvar[1]):
for m in range(nvar[1]):
val = 0
for jj in range(nvar[2]):
val += U_inv[i, jj] * bips._Z[jj, j, k, ell, m]
sy_arr_dic[(self._deltaT_g(i), self._theta_a(j), self._theta_a(k), self._theta_a(ell), self._theta_a(m))] = self.params.T4LSBpa.symbolic_expression * val
for j in range(nvar[2]):
for k in range(nvar[2]):
for ell in range(nvar[2]):
for m in range(nvar[2]):
val = 0
for jj in range(nvar[2]):
val -= U_inv[i, jj] * bips._V[jj, j, k, ell, m]
sy_arr_dic[(self._deltaT_g(i), self._deltaT_g(j), self._deltaT_g(k), self._deltaT_g(ell), self._deltaT_g(m))] = self.params.T4LSBpgo.symbolic_expression * val
return sy_arr_dic
[docs]
def compute_tensor(self):
"""Routine to compute the tensor."""
if not self.params.T4:
raise ValueError("Parameters are not set for T4 version")
symbolic_dict_linear = SymbolicQgsTensor._compute_tensor_dicts(self)
symbolic_dict_linear = _shift_dict_keys(symbolic_dict_linear, (0, 0))
symbolic_dict_T4 = self._compute_non_stored_full_dict()
if symbolic_dict_linear is not None:
symbolic_dict_T4 = {**symbolic_dict_linear, **symbolic_dict_T4}
if symbolic_dict_T4 is not None:
self._set_tensor(symbolic_dict_T4)
def _kronecker_delta(i, j):
if i == j:
return 1
else:
return 0
def _shift_dict_keys(dic, shift):
"""
Keys of given dictionary are altered to add values in the given indicies
Parameters
----------
dic: dict
Dictionary representing a tensor.
shift: tuple
A tuple that represents the shift in the tensor indices.
"""
shifted_dic = dict()
for key in dic.keys():
new_key = key + shift
shifted_dic[new_key] = dic[key]
return shifted_dic
def _parameter_substitutions(params, continuation_variables):
"""
Returns a dict of parameters values that are to be substituted,
removing the parameters given in `continuation_variables`.
"""
subs = params._all_items
if continuation_variables is None:
continuation_variables = list()
# Remove variables in continuation variables
for cv in continuation_variables:
if isinstance(cv, ParametersArray):
for cv_i in cv:
subs.remove(cv_i)
elif isinstance(cv, Parameter):
subs.remove(cv)
else: # Try ... who knows...
subs.remove(cv)
# make the remaining items into a dict to pass to sympy subs function
sub_dic = {}
for p in subs:
if p.symbol is not None:
sub_dic[p.symbol] = float(p)
return sub_dic
if __name__ == "__main__":
dic = dict()
dic = add_to_dict(dic, (0, 0), 1)
dic = add_to_dict(dic, (0, 0), 2)
print(dic)
from qgs.params.params import QgParams
from qgs.inner_products import symbolic
params = QgParams({'rr': 287.e0, 'sb': 5.6e-8})
params.set_atmospheric_channel_fourier_modes(6, 6, mode="symbolic")
params.atmospheric_params.set_params({'sigma': 0.2, 'kd': 0.1, 'kdp': 0.01})
params.ground_params.set_orography(0.2, 1)
params.atemperature_params.set_thetas(0.1, 0)
aip = symbolic.AtmosphericSymbolicInnerProducts(params, return_symbolic=True, make_substitution=True)
# sym_aotensor = SymbolicQgsTensor(params=params, atmospheric_inner_products=aip)