#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps. If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#
import numpy as np
import scipy.linalg as scila
from ..Misc import \
required_kwargs, \
deprecate, \
counted, cached
from .TriangularComponentwiseMapBase import TriangularComponentwiseMap
from .ComponentwiseTransportMapBase import ComponentwiseTransportMap
__all__ = [
'TriangularComponentwiseTransportMap',
# Deprecated
'TriangularTransportMap',
'MonotonicTriangularTransportMap'
]
nax = np.newaxis
[docs]class TriangularComponentwiseTransportMap(
TriangularComponentwiseMap,
ComponentwiseTransportMap
):
r""" Triangular transport map :math:`T({\bf x})=[T_1,T_2,\ldots,T_{d_x}]^\top`, where :math:`T_i(x_{1:i}):\mathbb{R}^i\rightarrow\mathbb{R}`.
"""
@required_kwargs('active_vars', 'approx_list')
def __init__(self, **kwargs):
r"""
Kwargs:
active_vars (:class:`list<list>` [:math:`d_x`] of :class:`list<list>`): for
each dimension lists the active variables.
approx_list (:class:`list<list>` [:math:`d_x`] of :class:`MonotoneFunctional<TransportMaps.Maps.Functionals.MonotoneFunctional>`):
list of monotone functionals for each dimension
"""
super(TriangularComponentwiseTransportMap,self).__init__(**kwargs)
@cached([('components','dim_out')])
@counted
[docs] def log_det_grad_x(self, x, precomp=None, idxs_slice=slice(None), cache=None):
r""" Compute: :math:`\log \det \nabla_{\bf x} T({\bf x})`.
Since the map is lower triangular,
.. math::
\log \det \nabla_{\bf x} T({\bf x}) = \sum_{k=1}^d \log \partial_{{\bf x}_k} T_k({\bf x}_{1:k})
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
precomp (:class:`dict<dict>`): dictionary of precomputed values
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
cache (:class:`dict<dict>`): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m`]) --
:math:`\log \det \nabla_{\bf x} T({\bf x})` at every
evaluation point
Raises:
ValueError: if :math:`d` does not match the dimension of the transport map.
"""
if precomp is None:
idxs_slice = slice(None)
precomp = {'components': [{} for i in range(self.dim_out)]}
self.precomp_partial_xd(x, precomp)
if x.shape[1] != self.dim_in:
raise ValueError("dimension mismatch")
pxd = self.partial_xd(x, precomp=precomp, idxs_slice=idxs_slice, cache=cache)
out = np.sum(np.log(pxd),axis=1)
return out
@counted
[docs] def log_det_grad_x_inverse(self, x, precomp=None, idxs_slice=slice(None)):
r""" Compute: :math:`\log \det \nabla_{\bf x} T^{-1}({\bf x})`.
Since the map is lower triangular,
.. math::
\log \det \nabla_{\bf y} T^{-1}({\bf x}) = \sum_{k=1}^d \log \partial_{{\bf x}_k} T^{-1}_k({\bf y}_{1:k})
For :math:`{\bf x} = T^{-1}({\bf y})`,
.. math::
\log \det \nabla_{\bf y} T^{-1}({\bf x}) = - \sum_{k=1}^d \log \partial_{{\bf x}_k} T_k({\bf x}_{1:k})
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
precomp (:class:`dict<dict>`): dictionary of precomputed values
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m`]) --
:math:`\log \det \nabla_{\bf x} T({\bf x})` at every
evaluation point
Raises:
ValueError: if :math:`d` does not match the dimension of the transport map.
"""
try:
xinv = precomp['xinv']
except (TypeError, KeyError):
xinv = self.inverse(x, precomp)
return - self.log_det_grad_x( xinv )
@counted
[docs] def grad_x_log_det_grad_x(self, x, precomp=None, idxs_slice=slice(None),
*args, **kwargs):
r""" Compute: :math:`\nabla_{\bf x} \log \det \nabla_{\bf x} T({\bf x})`
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
precomp (:class:`dict<dict>`): dictionary of precomputed values
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,d`]) --
:math:`\nabla_{\bf x} \log \det \nabla_{\bf x} T({\bf x})`
at every evaluation point
Raises:
ValueError: if :math:`d` does not match the dimension of the transport map.
.. seealso:: :func:`log_det_grad_x`.
"""
if precomp is None:
idxs_slice = slice(None)
precomp = {'components': [{} for i in range(self.dim)]}
self.precomp_grad_x_partial_xd(x, precomp)
if x.shape[1] != self.dim:
raise ValueError("dimension mismatch")
out = np.zeros((x.shape[0], self.dim))
for k,(a,avar,p) in enumerate(zip(self.approx_list,self.active_vars,
precomp['components'])):
out[:,avar] += a.grad_x_partial_xd(x[:,avar], p)[:,0,:] / \
a.partial_xd(x[:,avar], p)[:,0,nax]
return out
@counted
[docs] def hess_x_log_det_grad_x(self, x, precomp=None, idxs_slice=slice(None),
*args, **kwargs):
r""" Compute: :math:`\nabla^2_{\bf x} \log \det \nabla_{\bf x} T({\bf x})`
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
precomp (:class:`dict<dict>`): dictionary of precomputed values
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,d,d`]) --
:math:`\nabla^2_{\bf x} \log \det \nabla_{\bf x} T({\bf x})`
at every evaluation point
Raises:
ValueError: if :math:`d` does not match the dimension of the transport map.
.. seealso:: :func:`log_det_grad_x` and :func:`grad_x_log_det_grad_x`.
"""
if precomp is None:
idxs_slice = slice(None)
precomp = {'components': [{} for i in range(self.dim)]}
self.precomp_hess_x_partial_xd(x, precomp)
out = np.zeros((x.shape[0], self.dim, self.dim))
for k,(a,avar,p) in enumerate(zip(self.approx_list, self.active_vars,
precomp['components'])):
# 2d numpy advanced indexing
nvar = len(avar)
rr,cc = np.meshgrid(avar,avar)
rr = list( rr.flatten() )
cc = list( cc.flatten() )
idxs = (slice(None), rr, cc)
# Compute hess_x_partial_xd
dxk = a.partial_xd(x[:,avar], p)[:,0]
out[idxs] += (
a.hess_x_partial_xd(x[:,avar], p)[:,0,:,:] / \
dxk[:,nax,nax]
).reshape((x.shape[0],nvar**2))
dxdxkT = a.grad_x_partial_xd(x[:,avar], p)[:,0,:]
dxdxkT2 = dxdxkT[:,:,nax] * dxdxkT[:,nax,:]
out[idxs] -= (dxdxkT2 / (dxk**2.)[:,nax,nax]).reshape((x.shape[0],nvar**2))
return out
@counted
[docs] def action_hess_x_log_det_grad_x(self, x, dx, precomp=None, idxs_slice=slice(None),
*args, **kwargs):
r""" Compute: :math:`\langle\nabla^2_{\bf x} \log \det \nabla_{\bf x} T({\bf x}), \delta{\bf x}\rangle`
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
dx (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): direction
on which to evaluate the Hessian
precomp (:class:`dict<dict>`): dictionary of precomputed values
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,d`]) --
:math:`\langle\nabla^2_{\bf x} \log \det \nabla_{\bf x} T({\bf x}), \delta{\bf x}\rangle`
at every evaluation point
Raises:
ValueError: if :math:`d` does not match the dimension of the transport map.
.. seealso:: :func:`log_det_grad_x` and :func:`grad_x_log_det_grad_x`.
"""
if precomp is None:
idxs_slice = slice(None)
precomp = {'components': [{} for i in range(self.dim)]}
self.precomp_hess_x_partial_xd(x, precomp)
m = x.shape[0]
out = np.zeros((m, self.dim))
for k,(a,avar,p) in enumerate(zip(self.approx_list, self.active_vars,
precomp['components'])):
dxk = a.partial_xd(x[:,avar], p)[:,0] # m
out[:,avar] += np.einsum(
'...ij,...j->...i',
a.hess_x_partial_xd(x[:,avar], p)[:,0,:,:],
dx[:,avar]
) / dxk[:,nax]
dxdxkT = a.grad_x_partial_xd(x[:,avar], p)[:,0,:] # m x navar
tmp = np.einsum('ij,ij->i', dxdxkT, dx[:,avar])
out[:,avar] -= dxdxkT * tmp[:,nax] / (dxk**2.)[:,nax]
return out
@counted
[docs] def inverse(self, x, precomp=None, idxs_slice=slice(None), *args, **kwargs):
r""" Compute: :math:`T^{-1}({\bf y})`
If the map has more input than outputs :math:`d_{\rm in} > d_{\rm out}`,
it consider the first :math:`d_{\rm in} - d_{\rm out}` values in ``x``
to be already inverted values and feed them to the following approximations
to find the inverse.
If ``x`` has :math:`d < d_{\rm in}`, performs the inversion only on
the :math:`d` dimensional head of the map.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
precomp (:class:`dict<dict>`): dictionary of precomputed values
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,d`]) --
:math:`T^{-1}({\bf y})` for every evaluation point
Raises:
ValueError: if :math:`d_{\rm in} < d_{\rm out}`
"""
if precomp is None:
idxs_slice = slice(None)
precomp = {}
# Evaluation
d = x.shape[1]
if d > self.dim_in:
raise ValueError("dimension mismatch")
xout = np.zeros(x.shape)
skip_dim = self.dim_in - self.dim_out
if skip_dim < 0:
raise ValueError("The map has more output than inputs")
xout[:,:skip_dim] = x[:,:skip_dim]
for i in range(x.shape[0]):
for k, (a,avar) in enumerate(zip(self.approx_list,self.active_vars)):
if avar[-1] == d:
break # Terminate once d is reached
xout[i,skip_dim+k] = a.inverse(xout[i,avar[:-1]], x[i,skip_dim+k])
return xout
@counted
[docs] def grad_x_inverse(self, x, precomp=None, idxs_slice=slice(None), *args, **kwargs):
r""" Compute :math:`\nabla_{\bf x} T^{-1}({\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
precomp (:class:`dict<dict>`): dictionary of precomputed values
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,d,d`]) --
gradient matrices for every evaluation point.
Raises:
NotImplementedError: to be implemented in subclasses
"""
try:
xinv = precomp['xinv']
except (TypeError, KeyError):
xinv = self.inverse(x, precomp)
gx = self.grad_x(xinv)
gx_inv = np.zeros((xinv.shape[0], self.dim, self.dim))
for i in range(xinv.shape[0]):
gx_inv[i,:,:] = scila.solve_triangular(gx[i,:,:], np.eye(self.dim), lower=True)
return gx_inv
##############
# DEPRECATED #
##############
[docs]class TriangularTransportMap(
TriangularComponentwiseTransportMap
):
@deprecate(
'TriangularTransportMap',
'3.0',
'Use Maps.TriangularComponentwiseTransportMap instead'
)
def __init__(self, active_vars, approx_list):
super(TriangularTransportMap, self).__init__(
active_vars=active_vars,
approx_list=approx_list
)
[docs]class MonotonicTriangularTransportMap(
TriangularComponentwiseTransportMap
):
@deprecate(
'MonotonicTriangularTransportMap',
'3.0',
'Use Maps.TriangularComponentwiseTransportMap instead'
)
def __init__(self, active_vars, approx_list):
super(MonotonicTriangularTransportMap, self).__init__(
active_vars=active_vars,
approx_list=approx_list
)