#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps. If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#
import numpy as np
import scipy.stats as stats
import scipy.linalg as scila
from TransportMaps.Misc import counted, cached
from .ConditionalDistributionBase import ConditionalDistribution
from .FrozenDistributions import NormalDistribution
__all__ = [
'ConditionallyNormalDistribution',
'MeanConditionallyNormalDistribution',
# Deprecated
'ConditionallyGaussianDistribution',
'MeanConditionallyGaussianDistribution',
]
nax = np.newaxis
class ConditionallyNormalDistribution(ConditionalDistribution):
r""" Multivariate Gaussian distribution :math:`\pi({\bf x}\vert{\bf y}) \sim \mathcal{N}(\mu({\bf y}), \Sigma({\bf y}))`
Args:
mu (:class:`Map<TransportMaps.Maps.Map>`): mean vector map
sigma (:class:`Map<TransportMaps.Maps.Map>`): covariance matrix map
precision (:class:`Map<TransportMaps.Maps.Map>`): precision matrix map
coeffs (:class:`ndarray<numpy.ndarray>`): fix the coefficients :math:`{\bf y}`
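
    Example:
        A minimal usage sketch. The ``MuMap`` and ``SigmaMap`` helpers below are
        hypothetical stand-ins for proper :class:`Map<TransportMaps.Maps.Map>`
        objects: any object exposing ``dim_in``, ``dim_out``, ``evaluate``,
        ``grad_x`` and ``hess_x`` with the shapes used in this class should work.

        .. code-block:: python

            import numpy as np

            class MuMap(object):
                # Hypothetical affine mean map y -> mu(y) = A y
                dim_in, dim_out = 1, 2
                A = np.array([[1.], [2.]])
                def evaluate(self, y, *args, **kwargs):
                    return y.dot(self.A.T)                       # (m, 2)
                def grad_x(self, y, *args, **kwargs):
                    return np.tile(self.A, (y.shape[0], 1, 1))   # (m, 2, 1)
                def hess_x(self, y, *args, **kwargs):
                    raise NotImplementedError

            class SigmaMap(object):
                # Hypothetical covariance map y -> Sigma(y) = (1 + y^2) I
                dim_in, dim_out = 1, 2
                def evaluate(self, y, *args, **kwargs):
                    return (1. + y[:, 0]**2)[:, None, None] * np.eye(2)[None, :, :]
                def grad_x(self, y, *args, **kwargs):
                    raise NotImplementedError

            pi = ConditionallyNormalDistribution(MuMap(), sigma=SigmaMap())
            pi.coeffs = np.array([0.5])   # condition on y = 0.5
            s = pi.rvs(10)                # 10 samples from N(mu(0.5), Sigma(0.5))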
"""
def __init__(self, mu, sigma=None, precision=None, coeffs=None):
if (sigma is not None) and (precision is not None):
raise ValueError("The fields sigma and precision are mutually " +
"exclusive")
if sigma is not None and mu.dim_in != sigma.dim_in:
raise ValueError("The number of parameters must be the same for both " + \
"the map mu and the map sigma")
if precision is not None and mu.dim_in != precision.dim_in:
raise ValueError("The number of parameters must be the same for both " + \
"the map mu and the map precision")
self._muMap = mu
self._sigmaMap = sigma
self._precisionMap = precision
if sigma is not None:
self._isSigmaOn = True
else:
self._isSigmaOn = False
self._mu = None
self._sigma = None
self._precision = None
self._pi = NormalDistribution(np.zeros(mu.dim_out), np.eye(mu.dim_out))
super(ConditionallyNormalDistribution,self).__init__(mu.dim_out, mu.dim_in)
        # Gradients (initialized before the coefficients so that the
        # coeffs setter can populate them)
        self._grad_a_mu = None
        self._grad_a_T = None
        self._grad_a_precision = None
        self._grad_a_sigma = None
        self._grad_a_c = None
        self._coeffs = None
        self.coeffs = coeffs
    def get_ncalls_tree(self, indent=""):
out = super(ConditionallyNormalDistribution, self).get_ncalls_tree(indent)
out += self._pi.get_ncalls_tree(indent + ' ')
out += self._muMap.get_ncalls_tree(indent + ' ')
if self._sigmaMap is not None:
out += self._sigmaMap.get_ncalls_tree(indent + ' ')
if self._precisionMap is not None:
out += self._precisionMap.get_ncalls_tree(indent + ' ')
return out
    def get_nevals_tree(self, indent=""):
out = super(ConditionallyNormalDistribution, self).get_nevals_tree(indent)
out += self._pi.get_nevals_tree(indent + ' ')
out += self._muMap.get_nevals_tree(indent + ' ')
if self._sigmaMap is not None:
out += self._sigmaMap.get_nevals_tree(indent + ' ')
if self._precisionMap is not None:
out += self._precisionMap.get_nevals_tree(indent + ' ')
return out
    def get_teval_tree(self, indent=""):
out = super(ConditionallyNormalDistribution, self).get_teval_tree(indent)
out += self._pi.get_teval_tree(indent + ' ')
out += self._muMap.get_teval_tree(indent + ' ')
if self._sigmaMap is not None:
out += self._sigmaMap.get_teval_tree(indent + ' ')
if self._precisionMap is not None:
out += self._precisionMap.get_teval_tree(indent + ' ')
return out
    def update_ncalls_tree(self, obj):
super(ConditionallyNormalDistribution, self).update_ncalls_tree(obj)
self._pi.update_ncalls_tree(obj._pi)
self._muMap.update_ncalls_tree(obj._muMap)
if self._sigmaMap is not None:
self._sigmaMap.update_ncalls_tree(obj._sigmaMap)
if self._precisionMap is not None:
self._precisionMap.update_ncalls_tree(obj._precisionMap)
    def update_nevals_tree(self, obj):
super(ConditionallyNormalDistribution, self).update_nevals_tree(obj)
self._pi.update_nevals_tree(obj._pi)
self._muMap.update_nevals_tree(obj._muMap)
if self._sigmaMap is not None:
self._sigmaMap.update_nevals_tree(obj._sigmaMap)
if self._precisionMap is not None:
self._precisionMap.update_nevals_tree(obj._precisionMap)
    def update_teval_tree(self, obj):
super(ConditionallyNormalDistribution, self).update_teval_tree(obj)
self._pi.update_teval_tree(obj._pi)
self._muMap.update_teval_tree(obj._muMap)
if self._sigmaMap is not None:
self._sigmaMap.update_teval_tree(obj._sigmaMap)
if self._precisionMap is not None:
self._precisionMap.update_teval_tree(obj._precisionMap)
    def reset_counters(self):
super(ConditionallyNormalDistribution, self).reset_counters()
self._pi.reset_counters()
self._muMap.reset_counters()
if self._sigmaMap is not None:
self._sigmaMap.reset_counters()
if self._precisionMap is not None:
self._precisionMap.reset_counters()
@property
    def pi(self):
        return self._pi
    @property
    def mu(self):
        return self._pi.mu
    @property
    def sigma(self):
        return self._pi.sigma
    @property
    def precision(self):
        return self._pi.precision
    @property
    def muMap(self):
        return self._muMap
    @property
    def sigmaMap(self):
        return self._sigmaMap
    @property
    def precisionMap(self):
        return self._precisionMap
    @property
    def coeffs(self):
return self._coeffs
@coeffs.setter
def coeffs(self, coeffs):
if coeffs is None:
self._coeffs = None
elif self._coeffs is None or np.any(self._coeffs != coeffs):
# Set up Gaussian distribution
mu = self._muMap.evaluate(coeffs[nax,:])[0,:]
sigma = None
precision = None
if self._isSigmaOn:
sigma = self._sigmaMap.evaluate(
coeffs[nax,:])[0,:,:]
self._precision = None
else:
precision = self._precisionMap.evaluate(
coeffs[nax,:])[0,:,:]
self._sigma = None
# Set up gradients
try:
self._grad_a_mu = self._muMap.grad_x(coeffs[nax,:])[0,:,:]
if self._isSigmaOn:
self._grad_a_sigma = \
self._sigmaMap.grad_x(coeffs[nax,:])[0,:,:,:]
self._grad_a_precision = None
else:
self._grad_a_precision = \
self._precisionMap.grad_x(coeffs[nax,:])[0,:,:,:]
self._grad_a_sigma = None
            except NotImplementedError:
                self._grad_a_mu = None
                self._grad_a_sigma = None
                self._grad_a_precision = None
try:
self._hess_a_mu = self._muMap.hess_x(
coeffs[nax,:])[0,:,:,:]
if self._isSigmaOn:
self._hess_a_sigma = \
self._sigmaMap.hess_x(coeffs[nax,:])[0,:,:,:,:]
self._hess_a_precision = None
else:
self._hess_a_precision = \
self._precisionMap.hess_x(coeffs[nax,:])[0,:,:,:,:]
self._hess_a_sigma = None
            except NotImplementedError:
                self._hess_a_mu = None
                self._hess_a_sigma = None
                self._hess_a_precision = None
if self._pi is None:
self._pi = NormalDistribution(mu, sigma=sigma, precision=precision)
else:
self._pi.mu = mu
if self._isSigmaOn:
self._pi.sigma = sigma
else:
self._pi.precision = precision
self._coeffs = coeffs
@property
    def grad_a_mu(self):
        return self._grad_a_mu
    @property
    def grad_a_sigma(self):
        return self._grad_a_sigma
    @property
    def grad_a_precision(self):
        return self._grad_a_precision
    def rvs(self, m, y=None, **kwargs):
r""" Generate :math:`m` samples from the distribution.
Args:
m (int): number of samples to generate
y (:class:`ndarray<numpy.ndarray>` [:math:`d_y`]): conditioning values
:math:`{\bf Y}={\bf y}`
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,d`]) -- :math:`m`
:math:`d`-dimensional samples
"""
if self._coeffs is None:
self.pi.mu = self._muMap.evaluate(y[nax,:])[0,:]
if self._isSigmaOn:
self.pi.sigma = self._sigmaMap.evaluate(y[nax,:])[0,:,:]
else:
self.pi.precision = self._precisionMap.evaluate(y[nax,:])[0,:,:]
return self.pi.rvs(m)
@cached([("pi",None),("mu",None),("sigma",None),("precision",None)])
@counted
    def log_pdf(self, x, y, params=None, idxs_slice=slice(None,None,None), cache=None):
r""" Evaluate :math:`\log \pi({\bf x}\vert{\bf y})`
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
y (:class:`ndarray<numpy.ndarray>` [:math:`m,d_y`] or :class:`ndarray<numpy.ndarray>` [:math:`d_y`]):
conditioning values :math:`{\bf Y}={\bf y}`. In the second case one
conditioning value is used for all the :math:`m` points :math:`{\bf x}`
params (dict): parameters
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m`]) -- values of :math:`\log\pi`
at the ``x`` points.
"""
if self._coeffs is None:
m = x.shape[0]
try:
pi_cache = cache['pi_cache']
mu_cache = cache['mu_cache']
sigma_cache = cache['sigma_cache']
precision_cache = cache['precision_cache']
except TypeError:
pi_cache = None
mu_cache = None
sigma_cache = None
precision_cache = None
mu = self._muMap.evaluate(y, cache=mu_cache)
if self._isSigmaOn:
sigma = self._sigmaMap.evaluate(y, cache=sigma_cache)
else:
                precision = self._precisionMap.evaluate(y, cache=precision_cache)
out = np.zeros(m)
for i in range(m):
self.pi.mu = mu[i]
if self._isSigmaOn:
self.pi.sigma = sigma[i]
else:
self.pi.precision = precision[i]
out[i] = self.pi.log_pdf(
x[[i],:], params=params, idxs_slice=idxs_slice,
cache=pi_cache)
else:
try:
pi_cache = cache['pi_cache']
except TypeError:
pi_cache = None
out = self.pi.log_pdf(x, params=params, idxs_slice=idxs_slice,
cache=pi_cache)
return out
@cached([("pi",None),("mu",None),("sigma",None),("precision",None)])
@counted
    def grad_x_log_pdf(self, x, y, params=None, idxs_slice=slice(None,None,None),
cache=None):
r""" Evaluate :math:`\nabla_{\bf x,y} \log \pi({\bf x}\vert{\bf y})`
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
y (:class:`ndarray<numpy.ndarray>` [:math:`m,d_y`]): conditioning values
:math:`{\bf Y}={\bf y}`
params (dict): parameters
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
cache (dict): cache
Returns:
          (:class:`ndarray<numpy.ndarray>` [:math:`m,d+d_y`]) -- values of
          :math:`\nabla_{\bf x,y}\log\pi` at the ``x`` points.
"""
if self._coeffs is None:
try:
pi_cache = cache['pi_cache']
mu_cache = cache['mu_cache']
sigma_cache = cache['sigma_cache']
precision_cache = cache['precision_cache']
except TypeError:
pi_cache = None
mu_cache = None
sigma_cache = None
precision_cache = None
m = x.shape[0]
mu_all = self._muMap.evaluate(y, cache=mu_cache)
ga_mu_all = self._muMap.grad_x(y, cache=mu_cache)
            if self._isSigmaOn:
                sigma_all = self._sigmaMap.evaluate(y, cache=sigma_cache)
                ga_sigma_all = self._sigmaMap.grad_x(y, cache=sigma_cache)
            else:
                precision_all = self._precisionMap.evaluate(y, cache=precision_cache)
                raise NotImplementedError("There are some parts missing for this gradient")
out = np.zeros((m, x.shape[1]+y.shape[1]))
for i in range(m):
mu = mu_all[i]
ga_mu = ga_mu_all[i]
sigma = sigma_all[i]
ga_sigma = ga_sigma_all[i]
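                # For pi(x|y) = N(mu(y), Sigma(y)), with P = Sigma^{-1}:
                #   grad_y log pi = (grad_y mu)^T P (x-mu)
                #                   - .5 (x-mu)^T (grad_y P) (x-mu)
                #                   - .5 tr( P grad_y Sigma ),
                # where grad_y P = - P (grad_y Sigma) P.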
                # Cholesky of sigma
                chol = scila.cho_factor(sigma)
                precision = scila.cho_solve(chol, np.eye(mu.shape[0]))
                ga_precision = - np.einsum('ij,jkl,km->iml', precision, ga_sigma, precision)
                xmu = x[[i],:]-mu
                # Sigma^{-1} grad_y Sigma, used for the log-determinant term
                isgas = np.einsum('ij,jkl->ikl', precision, ga_sigma)
                self.pi.mu = mu
                self.pi.sigma = sigma
                out[i,:x.shape[1]] = self.pi.grad_x_log_pdf(
                    x[[i],:], params=params, idxs_slice=idxs_slice,
                    cache=pi_cache)
out[i,x.shape[1]:] = \
-.5 * ( np.einsum('ij,ik,...k->...j', -ga_mu, precision, xmu) + \
np.einsum('...i,ikl,...k->...l', xmu, ga_precision, xmu) + \
np.einsum('...i,ij,jk->...k', xmu, precision, -ga_mu) ) \
- .5 * np.einsum('iik->k', isgas) # Log determinant part
else:
raise NotImplementedError("To be done.")
return out
ConditionallyGaussianDistribution = ConditionallyNormalDistribution
class MeanConditionallyNormalDistribution(ConditionalDistribution):
r""" Multivariate Gaussian distribution :math:`\pi({\bf x}\vert{\bf y}) \sim \mathcal{N}(\mu({\bf y}), \Sigma)`
Args:
mu (:class:`Map<TransportMaps.Maps.Map>`): mean vector map
        sigma (:class:`ndarray<numpy.ndarray>`): covariance matrix
        precision (:class:`ndarray<numpy.ndarray>`): precision matrix
coeffs (:class:`ndarray<numpy.ndarray>`): fix the coefficients :math:`{\bf y}`
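
    Example:
        A minimal usage sketch. The ``MuMap`` helper below is a hypothetical
        stand-in for a proper :class:`Map<TransportMaps.Maps.Map>` object: any
        object exposing ``dim_in``, ``dim_out``, ``evaluate`` and ``grad_x``
        with the shapes used in this class should work.

        .. code-block:: python

            import numpy as np

            class MuMap(object):
                # Hypothetical affine mean map y -> mu(y) = A y
                dim_in, dim_out = 1, 2
                A = np.array([[1.], [2.]])
                def evaluate(self, y, *args, **kwargs):
                    return y.dot(self.A.T)                       # (m, 2)
                def grad_x(self, y, *args, **kwargs):
                    return np.tile(self.A, (y.shape[0], 1, 1))   # (m, 2, 1)

            pi = MeanConditionallyNormalDistribution(MuMap(), sigma=np.eye(2))
            pi.coeffs = np.array([0.5])   # condition on y = 0.5
            s = pi.rvs(10)                # 10 samples from N(mu(0.5), I)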
"""
def __init__(self, mu, sigma=None, precision=None, coeffs=None):
if (sigma is not None) and (precision is not None):
raise ValueError("The fields sigma and precision are mutually " +
"exclusive")
        self._muMap = mu
        self._mu = None
        self._grad_a_mu = None
        self._pi = NormalDistribution(np.zeros(mu.dim_out), sigma=sigma, precision=precision)
        super(MeanConditionallyNormalDistribution,self).__init__(mu.dim_out, mu.dim_in)
        self._coeffs = None
        self.coeffs = coeffs
    def get_ncalls_tree(self, indent=""):
out = super(MeanConditionallyNormalDistribution, self).get_ncalls_tree(indent)
out += self._pi.get_ncalls_tree(indent + ' ')
out += self._muMap.get_ncalls_tree(indent + ' ')
return out
    def get_nevals_tree(self, indent=""):
out = super(MeanConditionallyNormalDistribution, self).get_nevals_tree(indent)
out += self._pi.get_nevals_tree(indent + ' ')
out += self._muMap.get_nevals_tree(indent + ' ')
return out
    def get_teval_tree(self, indent=""):
out = super(MeanConditionallyNormalDistribution, self).get_teval_tree(indent)
out += self._pi.get_teval_tree(indent + ' ')
out += self._muMap.get_teval_tree(indent + ' ')
return out
    def update_ncalls_tree(self, obj):
super(MeanConditionallyNormalDistribution, self).update_ncalls_tree(obj)
self._pi.update_ncalls_tree(obj._pi)
self._muMap.update_ncalls_tree(obj._muMap)
    def update_nevals_tree(self, obj):
super(MeanConditionallyNormalDistribution, self).update_nevals_tree(obj)
self._pi.update_nevals_tree(obj._pi)
self._muMap.update_nevals_tree(obj._muMap)
    def update_teval_tree(self, obj):
super(MeanConditionallyNormalDistribution, self).update_teval_tree(obj)
self._pi.update_teval_tree(obj._pi)
self._muMap.update_teval_tree(obj._muMap)
    def reset_counters(self):
super(MeanConditionallyNormalDistribution, self).reset_counters()
self._pi.reset_counters()
self._muMap.reset_counters()
@property
    def pi(self):
        return self._pi
    @property
    def mu(self):
        return self._pi.mu
    @property
    def sigma(self):
        return self._pi.sigma
    @property
    def precision(self):
        return self._pi.precision
    @property
    def muMap(self):
        return self._muMap
    @property
    def coeffs(self):
return self._coeffs
@coeffs.setter
def coeffs(self, coeffs):
if coeffs is None:
self._coeffs = None
        elif self._coeffs is None or np.any(self._coeffs != coeffs):
            # Set up Gaussian distribution
            mu = self._muMap.evaluate(coeffs[nax,:])[0,:]
            self._pi.mu = mu
            # Set up gradient of the mean with respect to the conditioning values
            try:
                self._grad_a_mu = self._muMap.grad_x(coeffs[nax,:])[0,:,:]
            except NotImplementedError:
                self._grad_a_mu = None
            self._coeffs = coeffs
@property
    def grad_a_mu(self):
        return self._grad_a_mu
    def rvs(self, m, y=None, **kwargs):
r""" Generate :math:`m` samples from the distribution.
Args:
m (int): number of samples to generate
y (:class:`ndarray<numpy.ndarray>` [:math:`d_y`]): conditioning values
:math:`{\bf Y}={\bf y}`
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,d`]) -- :math:`m`
:math:`d`-dimensional samples
"""
if self._coeffs is None:
if y.ndim == 1:
self.pi.mu = self._muMap.evaluate(y[nax,:])[0,:]
out = self.pi.rvs(m)
else:
mu = self._muMap.evaluate(y)
z = stats.norm().rvs(m*self.dim).reshape((m,self.dim))
out = mu + np.dot( self.pi.sampling_mat, z.T ).T
else:
out = self.pi.rvs(m)
return out
@cached([("pi",None),("mu",None)])
@counted
    def log_pdf(self, x, y, params=None, idxs_slice=slice(None,None,None), cache=None):
r""" Evaluate :math:`\log \pi({\bf x}\vert{\bf y})`
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
y (:class:`ndarray<numpy.ndarray>` [:math:`m,d_y`] or :class:`ndarray<numpy.ndarray>` [:math:`d_y`]):
conditioning values :math:`{\bf Y}={\bf y}`. In the second case one
conditioning value is used for all the :math:`m` points :math:`{\bf x}`
params (dict): parameters
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m`]) -- values of :math:`\log\pi`
at the ``x`` points.
"""
if self._coeffs is None:
try:
pi_cache = cache['pi_cache']
mu_cache = cache['mu_cache']
except TypeError:
pi_cache = None
mu_cache = None
m = x.shape[0]
mu = self._muMap.evaluate(y, idxs_slice=idxs_slice, cache=mu_cache)
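            # Multivariate normal log-density with row-wise mean mu(y_i):
            #   log pi = -.5 (x-mu)^T Sigma^{-1} (x-mu) - .5 log det Sigma - d/2 log(2 pi)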
b = x - mu
sol = self.pi.solve_covariance( b.T ).T
out = - .5 * np.einsum('...i,...i->...', b, sol) \
- self.pi.dim * .5 * np.log(2.*np.pi) \
- .5 * self.pi.log_det_covariance
else:
try:
pi_cache = cache['pi_cache']
except TypeError:
pi_cache = None
out = self.pi.log_pdf(x, params=params, idxs_slice=idxs_slice,
cache=pi_cache)
return out
@cached([("pi",None),("mu",None)])
@counted
    def grad_x_log_pdf(self, x, y, params=None, idxs_slice=slice(None,None,None),
cache=None):
r""" Evaluate :math:`\nabla_{\bf x,y} \log \pi({\bf x}\vert{\bf y})`
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
y (:class:`ndarray<numpy.ndarray>` [:math:`m,d_y`]): conditioning values
:math:`{\bf Y}={\bf y}`
params (dict): parameters
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
cache (dict): cache
Returns:
          (:class:`ndarray<numpy.ndarray>` [:math:`m,d+d_y`]) -- values of
          :math:`\nabla_{\bf x,y}\log\pi` at the ``x`` points.
"""
if self._coeffs is None:
try:
pi_cache = cache['pi_cache']
mu_cache = cache['mu_cache']
except TypeError:
pi_cache = None
mu_cache = None
m = x.shape[0]
mu = self._muMap.evaluate(y, idxs_slice=idxs_slice, cache=mu_cache)
gx_mu = self._muMap.grad_x(y, idxs_slice=idxs_slice, cache=mu_cache)
out = np.zeros((m, self.dim + self.dim_y))
b = x - mu
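            # grad_x log pi = - Sigma^{-1} (x - mu(y))
            # grad_y log pi = (grad_y mu)^T Sigma^{-1} (x - mu(y))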
out[:,:self.pi.dim] = - np.dot( self.pi.precision, b.T ).T
out[:,self.pi.dim:] = np.einsum(
'...i,...ij->...j', - out[:,:self.pi.dim], gx_mu)
else:
try:
pi_cache = cache['pi_cache']
mu_cache = cache['mu_cache']
except TypeError:
pi_cache = None
mu_cache = None
m = x.shape[0]
mu = self._muMap.evaluate(y, cache=mu_cache)
gx_mu = self._muMap.grad_x(y, cache=mu_cache)
out = np.zeros((m, self.dim + self.dim_y))
b = x - mu
out[:,:self.pi.dim] = - np.dot( self.pi.precision, b.T ).T
out[:,self.pi.dim:] = np.einsum(
'...i,ij->...j', - out[:,:self.pi.dim], gx_mu)
return out
@cached([("pi",None),("mu",None)], False)
@counted
    def hess_x_log_pdf(self, x, y, params=None, idxs_slice=slice(None,None,None),
cache=None):
r""" Evaluate :math:`\nabla^2_{\bf x,y} \log \pi({\bf x}\vert{\bf y})`
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
y (:class:`ndarray<numpy.ndarray>` [:math:`m,d_y`]): conditioning values
:math:`{\bf Y}={\bf y}`
params (dict): parameters
idxs_slice (slice): if precomputed values are present, this parameter
indicates at which of the points to evaluate. The number of indices
represented by ``idxs_slice`` must match ``x.shape[0]``.
cache (dict): cache
Returns:
          (:class:`ndarray<numpy.ndarray>` [:math:`m,d+d_y,d+d_y`]) -- values of
          :math:`\nabla^2_{\bf x,y}\log\pi` at the ``x`` points.
"""
if self._coeffs is None:
try:
pi_cache = cache['pi_cache']
mu_cache = cache['mu_cache']
except TypeError:
pi_cache = None
mu_cache = None
m = x.shape[0]
mu = self._muMap.evaluate(y, idxs_slice=idxs_slice, cache=mu_cache)
gx_mu = self._muMap.grad_x(y, idxs_slice=idxs_slice, cache=mu_cache)
hx_mu = self._muMap.hess_x(y, idxs_slice=idxs_slice, cache=mu_cache)
out = np.zeros((m, self.dim + self.dim_y, self.dim + self.dim_y))
b = x - mu
dylpdf = np.dot( self.pi.precision, b.T ).T
dy2lpdf = - self.pi.precision
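            # Hessian block structure of log N(x; mu(y), Sigma):
            #   d2/dx2  = - Sigma^{-1}
            #   d2/dxdy =   Sigma^{-1} (grad_y mu)
            #   d2/dy2  = - (grad_y mu)^T Sigma^{-1} (grad_y mu)
            #             + sum_i [Sigma^{-1}(x-mu)]_i hess_y mu_i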
out[:,:self.pi.dim,:self.pi.dim] = - self.pi.precision[nax,:,:]
out[:,:self.pi.dim,self.pi.dim:] = \
np.einsum('ij,...jk->...ik', self.pi.precision, gx_mu)
out[:,self.pi.dim:,:self.pi.dim] = \
out[:,:self.pi.dim,self.pi.dim:].transpose((0,2,1))
out[:,self.pi.dim:,self.pi.dim:] = \
np.einsum('...ji,jk,...kl->...il',
gx_mu, dy2lpdf, gx_mu) + \
np.einsum('...i,...ijk->...jk', dylpdf, hx_mu)
else:
try:
pi_cache = cache['pi_cache']
mu_cache = cache['mu_cache']
except TypeError:
pi_cache = None
mu_cache = None
m = x.shape[0]
mu = self._muMap.evaluate(y, cache=mu_cache)
gx_mu = self._muMap.grad_x(y, cache=mu_cache)
hx_mu = self._muMap.hess_x(y, cache=mu_cache)
out = np.zeros((m, self.dim + self.dim_y, self.dim + self.dim_y))
b = x - mu
dylpdf = np.dot( self.pi.precision, b.T ).T
dy2lpdf = - self.pi.precision
out[:,:self.pi.dim,:self.pi.dim] = - self.pi.precision[nax,:,:]
out[:,:self.pi.dim,self.pi.dim:] = \
np.einsum('ij,jk->ik', self.pi.precision, gx_mu)[nax,:,:]
out[:,self.pi.dim:,:self.pi.dim] = \
out[:,:self.pi.dim,self.pi.dim:].transpose((0,2,1))
out[:,self.pi.dim:,self.pi.dim:] = \
np.einsum('ji,jk,kl->il',
gx_mu, dy2lpdf, gx_mu)[nax,:,:] + \
np.einsum('...i,ijk->...jk', dylpdf, hx_mu)
return out
MeanConditionallyGaussianDistribution = MeanConditionallyNormalDistribution