#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps. If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#
import numpy as np
from TransportMaps.Misc import counted, cached
from TransportMaps.Maps import TransportMap
from TransportMaps.Maps import ListCompositeTransportMap
__all__ = ['LiftedTransportMap', 'SequentialMarkovChainTransportMap']
[docs]class LiftedTransportMap(TransportMap):
r""" Given a map :math:`T` of dimension :math:`d_\theta + 2 d_{\bf x}`, where :math:`d_\theta` is the number of hyper-parameters and :math:`d_{\bf x}` is the state dimension, lift it to a :math:`\hat{d}` state dimensional map.
Let
.. math::
T(\Theta, {\bf x}) = \begin{bmatrix}
T^{(0)}(\Theta) \\
T^{(1)}(\Theta, {\bf x}) \\
\end{bmatrix}
be the map to be lifted at index :math:`i` into a :math:`\hat{d}` dimensional map.
.. math::
T_{\rm lift}(\Theta, {\bf x}) = \left[ \begin{array}{c}
T^{(0)}(\theta) \\
x_{1} \\
\; \vdots \\
x_{i-1} \\
T^{(1)}(\theta, x_{i}, \ldots, x_{i+2 d_{\bf x}}) \\
x_{i+2d_{\bf x}+1} \\
\; \vdots \\
x_{\hat{d}}
\end{array} \right]
Args:
idx (int): index where to lift :math:`T`
tm (:class:`TransportMap<TransportMap>`): transport map :math:`T`
dim (int): total dimension :math:`\hat{d}`
hyper_dim (int): number of hyper-parameters :math:`d_\theta`
"""
def __init__(self, idx, tm, dim, hyper_dim):
super(LiftedTransportMap, self).__init__(dim_in=dim, dim_out=dim)
if idx >= 0:
self.hyper_dim = hyper_dim
self.state_dim = (tm.dim-hyper_dim)//2
self.tm = tm
# Instantiate index mappings
self.hyper_idxs = np.arange(hyper_dim)
self.state_idxs = np.arange(
hyper_dim + idx * self.state_dim,
hyper_dim + (idx+2) * self.state_dim)
self.idxs = np.hstack( (self.hyper_idxs, self.state_idxs) )
elif idx == -1: # Filtering map at first step
self.hyper_dim = hyper_dim
self.state_dim = tm.dim-hyper_dim
self.tm = tm
# Instantiate index mappings
self.hyper_idxs = np.arange(hyper_dim)
self.state_idxs = np.arange(
hyper_dim + (idx+1) * self.state_dim,
hyper_dim + (idx+2) * self.state_dim)
self.idxs = np.hstack( (self.hyper_idxs, self.state_idxs) )
else:
raise ValueError("idx must be >= -1")
[docs] def get_ncalls_tree(self, indent=""):
out = super(LiftedTransportMap, self).get_ncalls_tree(indent)
out += self.tm.get_ncalls_tree(indent + " ")
return out
[docs] def get_nevals_tree(self, indent=""):
out = super(LiftedTransportMap, self).get_nevals_tree(indent)
out += self.tm.get_nevals_tree(indent + " ")
return out
[docs] def get_teval_tree(self, indent=""):
out = super(LiftedTransportMap, self).get_teval_tree(indent)
out += self.tm.get_teval_tree(indent + " ")
return out
[docs] def update_ncalls_tree(self, obj):
super(LiftedTransportMap, self).update_ncalls_tree(obj)
self.tm.update_ncalls_tree(obj.tm)
[docs] def update_nevals_tree(self, obj):
super(LiftedTransportMap, self).update_nevals_tree(obj)
self.tm.update_nevals_tree(obj.tm)
[docs] def update_teval_tree(self, obj):
super(LiftedTransportMap, self).update_teval_tree(obj)
self.tm.update_teval_tree(obj.tm)
[docs] def reset_counters(self):
super(LiftedTransportMap, self).reset_counters()
self.tm.reset_counters()
@property
[docs] def n_coeffs(self):
r""" Returns the total number of coefficients.
Returns:
(:class:`int`) -- total number :math:`N` of
coefficients characterizing the map.
"""
return self.tm.n_coeffs
@property
[docs] def coeffs(self):
r""" Returns the actual value of the coefficients.
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`N`]) -- coefficients.
"""
return self.tm.coeffs
@coeffs.setter
def coeffs(self, coeffs):
r""" Set the coefficients.
Args:
coeffs (:class:`ndarray<numpy.ndarray>` [:math:`N`]):
coefficients for the various maps
"""
self.tm.coeffs = coeffs
@cached([('tm',None)])
@counted
[docs] def evaluate(self, x, precomp=None, idxs_slice=slice(None), cache=None):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
try:
tm_cache = cache['tm_cache']
except TypeError:
tm_cache = None
out = x.copy()
out[:,self.idxs] = self.tm.evaluate(x[:,self.idxs], precomp=precomp,
idxs_slice=idxs_slice, cache=tm_cache)
return out
@counted
[docs] def inverse(self, x, *args, **kwargs):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
out = x.copy()
out[:,self.idxs] = self.tm.inverse(x[:,self.idxs], *args, **kwargs)
return out
@cached([('tm',None)])
@counted
[docs] def grad_x(self, x, precomp=None, idxs_slice=slice(None), cache=None):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
try:
tm_cache = cache['tm_cache']
except TypeError:
tm_cache = None
m = x.shape[0]
out = np.zeros((m, self.dim, self.dim))
out[:,range(self.dim),range(self.dim)] = 1.
out[np.ix_(range(m),self.idxs, self.idxs)] = self.tm.grad_x(
x[:,self.idxs], precomp=precomp,
idxs_slice=idxs_slice, cache=tm_cache)
return out
@cached([('tm',None)],False)
@counted
[docs] def hess_x(self, x, precomp=None, idxs_slice=slice(None), cache=None):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
try:
tm_cache = cache['tm_cache']
except TypeError:
tm_cache = None
m = x.shape[0]
out = np.zeros((m, self.dim, self.dim, self.dim))
out[np.ix_(range(m),self.idxs, self.idxs, self.idxs)] = \
self.tm.hess_x(x[:,self.idxs], precomp=precomp,
idxs_slice=idxs_slice, cache=tm_cache)
return out
@counted
[docs] def grad_x_inverse(self, x, *args, **kwargs):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
m = x.shape[0]
out = np.zeros((m, self.dim, self.dim))
out[:,range(self.dim),range(self.dim)] = 1.
out[np.ix_(range(m),self.idxs, self.idxs)] = self.tm.grad_x_inverse(
x[:,self.idxs], *args, **kwargs)
return out
@counted
[docs] def hess_x_inverse(self, x, *args, **kwargs):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
m = x.shape[0]
out = np.zeros((m, self.dim, self.dim, self.dim))
out[np.ix_(range(m),self.idxs, self.idxs,self.idxs)] = \
self.tm.hess_x_inverse(
x[:,self.idxs], *args, **kwargs)
return out
@cached([('tm',None)])
@counted
[docs] def log_det_grad_x(self, x, precomp=None, idxs_slice=slice(None), cache=None):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
try:
tm_cache = cache['tm_cache']
except TypeError:
tm_cache = None
return self.tm.log_det_grad_x(x[:,self.idxs], precomp=precomp,
idxs_slice=idxs_slice, cache=tm_cache)
@cached([('tm',None)])
@counted
[docs] def grad_x_log_det_grad_x(self, x, precomp=None, idxs_slice=slice(None), cache=None):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
try:
tm_cache = cache['tm_cache']
except TypeError:
tm_cache = None
out = np.zeros((x.shape[0], self.dim))
out[:,self.idxs] = self.tm.grad_x_log_det_grad_x(
x[:,self.idxs], precomp=precomp,
idxs_slice=idxs_slice, cache=tm_cache)
return out
@cached([('tm',None)],False)
@counted
[docs] def hess_x_log_det_grad_x(self, x, precomp=None, idxs_slice=slice(None), cache=None):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
try:
tm_cache = cache['tm_cache']
except TypeError:
tm_cache = None
m = x.shape[0]
out = np.zeros((m, self.dim, self.dim))
out[np.ix_(range(m),self.idxs, self.idxs)] = \
self.tm.hess_x_log_det_grad_x(
x[:,self.idxs], precomp=precomp,
idxs_slice=idxs_slice, cache=tm_cache)
return out
@counted
[docs] def log_det_grad_x_inverse(self, x, *args, **kwargs):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
return self.tm.log_det_grad_x_inverse(x[:,self.idxs], *args, **kwargs)
@counted
[docs] def grad_x_log_det_grad_x_inverse(self, x, *args, **kwargs):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
out = np.zeros((x.shape[0], self.dim))
out[:,self.idxs] = self.tm.grad_x_log_det_grad_x_inverse(
x[:,self.idxs], *args, **kwargs)
return out
@counted
[docs] def hess_x_log_det_grad_x_inverse(self, x, *args, **kwargs):
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
m = x.shape[0]
out = np.zeros((m, self.dim, self.dim))
out[np.ix_(range(m),self.idxs, self.idxs)] = \
self.tm.hess_x_log_det_grad_x_inverse(
x[:,self.idxs], *args, **kwargs)
return out
[docs]class SequentialMarkovChainTransportMap(ListCompositeTransportMap):
r""" Compose the lower triangular 1-lag smoothing maps into the smoothing map
Args:
tm_list (list): list of 1-lag smoothing lower triangular transport maps
hyper_dim (int): number of hyper-parameters
.. warning:: this works only for one dimensional states!
It will be extended for higher dimensional states in the future.
"""
def __init__(self, tm_list, hyper_dim):
self.hyper_dim = hyper_dim
if len(tm_list) == 0:
self.state_dim = 0
else:
self.state_dim = (tm_list[0].dim-hyper_dim) // 2
lifted_dim = self.hyper_dim + (len(tm_list)+1) * self.state_dim
self.comp_list = tm_list
lifted_maps = [LiftedTransportMap(d, tm, lifted_dim, self.hyper_dim) for
d, tm in enumerate(tm_list)]
super(SequentialMarkovChainTransportMap, self).__init__(map_list=lifted_maps)