Source code for TransportMaps.Distributions.Examples.Lorenz96.Lorenz96Distributions

#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps.  If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#

import numpy as np
from TransportMaps.Maps.ODEs import AutonomousForwardEulerMap

from TransportMaps.Misc import counted, cached, get_sub_cache
from TransportMaps.Distributions.Decomposable.SequentialInferenceDistributions import \
    Lag1TransitionDistribution, \
    HiddenLag1TransitionTimeHomogeneousMarkovChainDistribution
from TransportMaps.Maps import \
    Map, \
    ListCompositeMap, \
    InverseMap

from TransportMaps.Likelihoods.LikelihoodBase import AdditiveLogLikelihood

try:
    from TransportMaps.Distributions.Examples.Lorenz96.fast_eval import \
        l96mII_evaluate, l96mII_grad_x, l96mII_tuple_grad_x, \
        l96mII_hess_x, l96mII_action_hess_x
    L96MII_FAST_EVAL_FLAG = True
except ImportError:
    L96MII_FAST_EVAL_FLAG = False

__all__ = [
    'Lorenz96DynamicsMap',
    'Lorenz96ForwardEulerMap',
    'Lorenz96MultipleStepsForwardEulerMap',
    'Lorenz96ForwardEulerDistribution',
    'Lorenz96ModalForwardEulerDistribution'
]


[docs]class Lorenz96DynamicsMap(Map): r""" Defines the evolution of the dynamics of the Lorenz-96 (model II) system. Evaluates the right hand side the Lorenz-96 system: .. math:: \dot{x}_n = \sum_{j=-J}^J \sum_{i=-J}^J (-x_{n-2K-i} x_{n-K-j} + x_{n-K+j-i} x_{n+K+j})/K^2 - x_{n} + F where :math:`J=(K-1)/2` and :math:`K` is odd. The model is presented in Lorenz, E. N. (2005). Designing Chaotic Models. Journal of the Atmospheric Sciences, 62(5), 1574–1587. https://doi.org/10.1175/JAS3430.1 For the parameter :math:`K=1`, this reverts to the original Lorenz-96 model Lorenz, E. (1963). Deterministic nonperiodic flow. Journal of the Atmospheric Sciences. https://doi.org/10.1175/1520-0469(1963)020<0130%3ADNF>2.0.CO%3B2 .. document private functions .. automethod:: __init__ """
[docs] def __init__( self, d = 40, K = 1, F = 8. ): r""" Args: d (int): dimension of the state :math:`{\bf x}` K (int): smoothing term :math:`K` (must be odd) F (float): forcing term :math:`F` """ if not L96MII_FAST_EVAL_FLAG: raise ImportError( "TransportMaps was installed without cython support. " + \ "This class requires it." ) if not isinstance(K, int) or K % 2 == 0: raise ValueError( "The smoothing factor K must be an odd integer.") self._K = K self._F = F super(Lorenz96DynamicsMap, self).__init__( dim_in = d, dim_out = d )
@property
[docs] def F(self): return self._F
@counted
[docs] def evaluate( self, x, *args, **kwargs ): return l96mII_evaluate(x, self._K, self._F)
@counted
[docs] def grad_x( self, x, *args, **kwargs ): return l96mII_grad_x(x, self._K, self._F)
@counted
[docs] def tuple_grad_x( self, x, *args, **kwargs ): return l96mII_tuple_grad_x(x, self._K, self._F)
@counted
[docs] def hess_x( self, x, *args, **kwargs ): return l96mII_hess_x(x, self._K, self._F)
@counted
[docs] def action_hess_x( self, x, dx, *args, **kwargs ): return l96mII_action_hess_x(x, dx, self._K, self._F)
# out = np.zeros( (x.shape[0], self.dim_out, self.dim_in) ) # d = self.dim # for i in range(d): # out[:,i,(i-2)%d] += - dx[:,(i-1)%d] / self._K**2 # out[:,i,(i-1)%d] += - dx[:,(i-2)%d] / self._K**2 # out[:,i,(i-1)%d] += dx[:,(i+1)%d] / self._K**2 # out[:,i,(i+1)%d] += dx[:,(i-1)%d] / self._K**2 # return out
[docs]class Lorenz96ForwardEulerMap( AutonomousForwardEulerMap ): r""" Defines the evolution of the Lorenz-96 dynamics for one forward Euler step. Evaluates the Euler step of the Lorenz-96 system: .. math:: \dot{x}_i^{(n+1)} = x_i^{(n)} + \Delta t \cdot (x_{i+1}^{(n)} - x_{i-2}^{(n)}) x_{i-1}^{(n)} / K^2 - x_{i}^{(n)} + F .. document private functions .. automethod:: __init__ """
[docs] def __init__( self, dt, d = 40, K = 1, F = 8. ): r""" Args: dt (float): step site :math:`\Delta t` d (int): dimension of the state :math:`{\bf x}` K (int): smoothing term :math:`K` (must be odd) F (float): forcing term :math:`F` """ super(Lorenz96ForwardEulerMap, self).__init__( dt, Lorenz96DynamicsMap(d, K, F) )
@property
[docs] def K(self): return self._rhs.K
@property
[docs] def F(self): return self._rhs.F
[docs]class Lorenz96MultipleStepsForwardEulerMap( ListCompositeMap ): r""" Defines the evolution of the Lorenz-96 dynamics for :math:`n` forward Euler steps. Evaluates :math:`n` times the Euler step of the Lorenz-96 system: .. math:: \dot{x}_i^{(k)} \mapsto x_i^{(k)} + \Delta t \cdot (x_{i+1}^{(k)} - x_{i-2}^{(k)}) x_{i-1}^{(k)} / K^2 - x_{i}^{(k)} + F .. document private functions .. automethod:: __init__ """
[docs] def __init__( self, n, dt, d = 40, K = 1, F = 8. ): r""" Args: n (int): number :math:`n` of Euler steps. dt (float): step size :math:`\Delta t` d (int): dimension of the state :math:`{\bf x}` K (int): smoothing term :math:`K` (must be odd) F (float): forcing term :math:`F` """ self._l96femap = Lorenz96ForwardEulerMap( dt, d=d, K=K, F=F) super(Lorenz96MultipleStepsForwardEulerMap, self).__init__( [self._l96femap] * n)
@property
[docs] def n(self): return self.n_maps
@property
[docs] def dt(self): return self._l96femap.dt
@property
[docs] def K(self): return self._l96femap.K
@property
[docs] def F(self): return self._l96femap.F
@property
[docs] def l96femap(self): return self._l96femap
[docs]class Lorenz96ForwardEulerDistribution( HiddenLag1TransitionTimeHomogeneousMarkovChainDistribution ): r""" Defines the Hidden Markov Chain distribution defined by the Lorenz-96 dynamics. For the index sets :math:`\Lambda={in : i=0,\ldots}` and :math:`\Xi \subset \Lambda` the model is defined by .. math:: \begin{cases} {\bf x}_{k+n} = g({\bf x}_k) + \varepsilon_{\text{dyn}} & \text{for } k \in \Lambda \\ {\bf y}_{k+n} &= h({\bf x}_{k+n}) + \varepsilon_{\text{noise}} & \text{for } k+n \in \Xi \;, \end{cases} where :math:`g:{\bf x}_k \mapsto {\bf x}_{k+n}` is the map corresponding to :math:`n` Euler steps of the discretized dynamics with time step :math:`\Delta t`, :math:`h` is the observation operator, :math:`\varepsilon_{\text{dyn}} \sim \pi_{\text{dyn}}` is the dynamics noise, :math:`\varepsilon_{\text{noise}} \sim \pi_{\text{noise}}` is the observational noise and the initial conditions :math:`{\bf x}_0` are distributed according to :math:`\pi_{\text{init}}`. """ def __init__( self, # Lorenz parameters n, dt, d, K, F, # Initial conditions pi_init, # Transitions pi_dyn, # Observations obs_map, pi_noise, ): r""" Args: n (int): number :math:`n` of Euler steps defining the map :math:`g` dt (float): step size :math:`\Delta t` d (int): dimension of the state :math:`{\bf x}` K (int): smoothing term :math:`K` (must be odd) F (float): forcing term :math:`F` pi_init (:class:`Distribution<TransportMaps.Distributions.Distribution>`): distribution :math:`\pi_{\text{init}}` of the initial conditions pi_dyn (:class:`Distribution<TransportMaps.Distributions.Distribution>`): distribution :math:`\pi_{\text{dyn}}` of the dynamics noise obs_map (:class:`Map<TransportMaps.Maps.Map>`): observation operator :math:`h` pi_noise (:class:`Distribution<TransportMaps.Distributions.Distribution>`): distribution :math:`\pi_{\text{noise}}` of the observations noise """ # Set up the transition distribution self._l96map = Lorenz96MultipleStepsForwardEulerMap( n=n, dt=dt, d=d, K=K, F=F) # Set up the components of the likelihoods self._obs_map = obs_map self._pi_noise = pi_noise # Tracking time and states (optional) self._t = [] self._xs = [] # Call constructor super(Lorenz96ForwardEulerDistribution, self).__init__( pi_init, self._l96map, pi_dyn, pi_list=[], pi_hyper=None, ll_list=[]) @property
[docs] def n(self): return self._l96map.n
@property
[docs] def dt(self): return self._l96map.dt
@property
[docs] def K(self): return self._l96map.K
@property
[docs] def F(self): return self._l96map.F
@property
[docs] def t(self): return self._t
@property
[docs] def t_obs(self): return [ t for t, ll in zip(self.t, self.ll_list) if ll is not None ]
@property
[docs] def xs(self): return self._xs
@property
[docs] def l96map(self): return self._l96map
@property
[docs] def obs_map(self): return self._obs_map
[docs] def step(self, y, x=None): r""" Add a step to the hidden Markov Chain. Args: y: observation. ``None`` if missing. x: state if known (for diagnostic) """ if y is not None: ll = AdditiveLogLikelihood( y, self._pi_noise, self._obs_map) else: ll = None # Update time and (optional) state self._t.append( self._t[-1] + self.n * self.dt if self.nsteps > 0 else 0. ) self._xs.append( x ) # Append self.append( ll )
[docs]class Lorenz96ModalForwardEulerDistribution( Lorenz96ForwardEulerDistribution ): r""" Defines the Hidden Markov Chain distribution defined by the Lorenz-96 dynamics in modal form. For the index sets :math:`\Lambda={in : i=0,\ldots}` and :math:`\Xi \subset \Lambda` the model is defined by .. math:: \begin{cases} {\bf b}_{k+n} = P_r^{-1} \circ g \circ P_r({\bf b}_k) + \varepsilon^r_{\text{dyn}} & \text{for } k \in \Lambda \\ {\bf y}_{k+n} &= h\circ P_r({\bf b}_{k+n}) + \varepsilon_{\text{noise}} & \text{for } k+n \in \Xi \;, \end{cases} where :math:`g:{\bf x}_k \mapsto {\bf x}_{k+n}` is the map corresponding to :math:`n` Euler steps of the discretized dynamics with time step :math:`\Delta t`, :math:`P_r:\mathbb{R}^d\rightarrow\mathbb{R}^r` is a projection operator onto the modal subspace (:math:`r\leq d`), :math:`h` is the observation operator, :math:`\varepsilon^r_{\text{dyn}} \sim \pi^r_{\text{dyn}}` is the noise of the modal dynamics, :math:`\varepsilon_{\text{noise}} \sim \pi_{\text{noise}}` is the observational noise and the initial conditions :math:`{\bf b}_0` are distributed according to :math:`\pi^r_{\text{init}}`. """ def __init__( self, # Lorenz parameters n, dt, d, K, F, # Modal parameters modal_map, # Initial conditions pi_init, # Transitions pi_dyn, # Observations obs_map, pi_noise, ): r""" Args: n (int): number :math:`n` of Euler steps defining the map :math:`g` dt (float): step size :math:`\Delta t` d (int): dimension of the state :math:`{\bf x}` K (int): smoothing term :math:`K` (must be odd) F (float): forcing term :math:`F` modal_map (:class:`Map<TransportMaps.Maps.Map>`): projection operator :math:`P_r` pi_init (:class:`Distribution<TransportMaps.Distributions.Distribution>`): distribution :math:`\pi_{\text{init}}` of the initial conditions pi_dyn (:class:`Distribution<TransportMaps.Distributions.Distribution>`): distribution :math:`\pi_{\text{dyn}}` of the dynamics noise obs_map (:class:`Map<TransportMaps.Maps.Map>`): observation operator :math:`h` pi_noise (:class:`Distribution<TransportMaps.Distributions.Distribution>`): distribution :math:`\pi_{\text{noise}}` of the observations noise """ # Set up the transition distribution self._l96map = Lorenz96MultipleStepsForwardEulerMap( n=n, dt=dt, d=d, K=K, F=F) self._modal_map = modal_map F = ListCompositeMap( [ InverseMap( modal_map ), self._l96map, modal_map ] ) # Set up the distribution of the initial conditions pi_init = pi_init # Set up the components of the likelihoods self._obs_map = ListCompositeMap( [obs_map, modal_map] ) self._pi_noise = pi_noise # Tracking time and states (optional) self._t = [] self._xs = [] self._bs = [] # Call constructor HiddenLag1TransitionTimeHomogeneousMarkovChainDistribution.__init__( self, pi_init, F, pi_dyn, pi_list=[], pi_hyper=None, ll_list=[]) @property
[docs] def modal_map(self): return self._modal_map
@property
[docs] def obs_map(self): return self._obs_map.tm_list[0]
@property
[docs] def bs(self): return self._bs
[docs] def step(self, y, x=None): r""" Add a step to the hidden Markov Chain. Args: y: observation. ``None`` if missing. x: state if known (for diagnostic) """ super(Lorenz96ModalForwardEulerDistribution, self).step(y, x=x) # Update modal truth if any b = self._modal_map.inverse(x[np.newaxis,:])[0,:] if x is not None else None self._bs.append( b )