#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps. If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#
import numpy as np
from TransportMaps.Misc import counted, cached, cached_tuple, get_sub_cache
from TransportMaps.Maps.Functionals import \
Functional
from TransportMaps.Maps import \
ConditionallyLinearMap, AffineMap
from TransportMaps.Distributions import \
ConditionalDistribution, \
NormalDistribution, \
ConditionallyNormalDistribution
__all__ = ['LogLikelihood',
'AdditiveLogLikelihood',
'AdditiveNormalLogLikelihood',
'AdditiveLinearNormalLogLikelihood',
'AdditiveLinearGaussianLogLikelihood',
'AdditiveConditionallyLinearNormalLogLikelihood',
'AdditiveConditionallyLinearGaussianLogLikelihood',
'LogisticLogLikelihood',
'TwoParametersLogisticLogLikelihood',
'IndependentLogLikelihood']
nax = np.newaxis
class LogLikelihood(Functional):
r""" Abstract class for log-likelihood :math:`\log \pi({\bf y} \vert {\bf x})`
Note that :math:`\log\pi:\mathbb{R}^d \rightarrow \mathbb{R}`
is considered a function of :math:`{\bf x}`, while the
data :math:`{\bf y}` is fixed.
Args:
y (:class:`ndarray<numpy.ndarray>`): data
        dim (int): input dimension :math:`d`
"""
def __init__(self, y, dim):
super(LogLikelihood, self).__init__(dim)
self.y = y
    def _get_y(self):
        return self._y
    def _set_y(self, y):
        self._y = y
    y = property(_get_y, _set_y)
@cached()
@counted
    def evaluate(self, x, *args, **kwargs):
r""" [Abstract] Evaluate :math:`\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1`]) -- function evaluations
"""
raise NotImplementedError("To be implemented in sub-classes")
@cached()
@counted
    def grad_x(self, x, *args, **kwargs):
r""" [Abstract] Evaluate :math:`\nabla_{\bf x}\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d`]) -- gradient evaluations
"""
raise NotImplementedError("To be implemented in sub-classes")
@cached_tuple(['evaluate', 'grad_x'])
@counted
    def tuple_grad_x(self, x, cache=None, **kwargs):
r""" Evaluate :math:`\left(\log\pi({\bf y} \vert {\bf x}),\nabla_{\bf x}\log\pi({\bf y} \vert {\bf x})\right)`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
Returns:
(:class:`tuple`) --
:math:`\left(\log\pi({\bf y} \vert {\bf x}),\nabla_{\bf x}\log\pi({\bf y} \vert {\bf x})\right)`
"""
return ( self.evaluate(x, cache=cache, **kwargs),
self.grad_x(x, cache=cache, **kwargs) )
@cached(caching=False)
@counted
    def hess_x(self, x, *args, **kwargs):
r""" [Abstract] Evaluate :math:`\nabla^2_{\bf x}\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d,d`]) -- Hessian evaluations
"""
raise NotImplementedError("To be implemented in sub-classes")
@cached(caching=False)
@counted
    def action_hess_x(self, x, dx, *args, **kwargs):
r""" [Abstract] Evaluate :math:`\langle \nabla^2_{\bf x}\log\pi({\bf y} \vert {\bf x}),\delta{\bf x}\rangle`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
dx (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): direction
on which to evaluate the Hessian
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d`]) -- Hessian evaluations
"""
raise NotImplementedError("To be implemented in sub-classes")
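# A minimal sketch of a concrete subclass, assuming a scalar observation
# y = x_0 + eps with eps ~ N(0, sigma2). The class below is illustrative
# only (it is not part of the library); it shows how `evaluate` and
# `grad_x` of the abstract interface above are meant to be implemented,
# including the (m,1) and (m,1,d) output shapes.
class _ExampleScalarGaussianLogLikelihood(LogLikelihood):
    def __init__(self, y, sigma2):
        super(_ExampleScalarGaussianLogLikelihood, self).__init__(y, 1)
        self.sigma2 = sigma2
    def evaluate(self, x, *args, **kwargs):
        # log N(y | x_0, sigma2), up to the additive normalization constant
        return (-0.5 * (self.y - x[:, 0])**2 / self.sigma2)[:, nax]
    def grad_x(self, x, *args, **kwargs):
        return ((self.y - x[:, 0]) / self.sigma2)[:, nax, nax]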
class AdditiveLogLikelihood(LogLikelihood):
r""" Log-likelihood :math:`\log \pi({\bf y} \vert {\bf x})=\log\pi({\bf y} - T({\bf x}))`
Args:
        y (:class:`ndarray<numpy.ndarray>` [:math:`d_y`]): data
pi (:class:`Distribution<TransportMaps.Distributions.Distribution>`):
distribution :math:`\nu_\pi`
T (:class:`Map<TransportMaps.Maps.Map>`): map :math:`T:\mathbb{R}^d\rightarrow\mathbb{R}^{d_y}`
"""
def __init__(self, y, pi, T):
if len(y) != pi.dim:
raise ValueError("The dimension of the data must match the " + \
"dimension of the distribution pi")
if len(y) != T.dim_out:
raise ValueError("The dimension of the data must match the " + \
"dimension of the output of T")
super(AdditiveLogLikelihood, self).__init__(y, T.dim_in)
self.pi = pi
self.T = T
        self._isPiCond = isinstance(pi, ConditionalDistribution)
    def get_ncalls_tree(self, indent=""):
out = super(AdditiveLogLikelihood, self).get_ncalls_tree(indent)
out += self.pi.get_ncalls_tree(indent + ' ')
out += self.T.get_ncalls_tree(indent + ' ')
return out
    def get_nevals_tree(self, indent=""):
out = super(AdditiveLogLikelihood, self).get_nevals_tree(indent)
out += self.pi.get_nevals_tree(indent + ' ')
out += self.T.get_nevals_tree(indent + ' ')
return out
    def get_teval_tree(self, indent=""):
out = super(AdditiveLogLikelihood, self).get_teval_tree(indent)
out += self.pi.get_teval_tree(indent + ' ')
out += self.T.get_teval_tree(indent + ' ')
return out
    def update_ncalls_tree(self, obj):
super(AdditiveLogLikelihood, self).update_ncalls_tree(obj)
self.pi.update_ncalls_tree(obj.pi)
self.T.update_ncalls_tree(obj.T)
    def update_nevals_tree(self, obj):
super(AdditiveLogLikelihood, self).update_nevals_tree(obj)
self.pi.update_nevals_tree(obj.pi)
self.T.update_nevals_tree(obj.T)
    def update_teval_tree(self, obj):
super(AdditiveLogLikelihood, self).update_teval_tree(obj)
self.pi.update_teval_tree(obj.pi)
self.T.update_teval_tree(obj.T)
    def reset_counters(self):
super(AdditiveLogLikelihood, self).reset_counters()
self.pi.reset_counters()
self.T.reset_counters()
@property
    def isPiCond(self):
return self._isPiCond
@cached([('pi',None),('T',None)])
@counted
    def evaluate(self, x, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1`]) -- function evaluations
"""
pi_cache, T_cache = get_sub_cache(cache, ('pi',None), ('T',None))
frw = self.T.evaluate(x, idxs_slice=idxs_slice, cache=T_cache)
return self.pi.log_pdf(
self.y - frw, idxs_slice=idxs_slice, cache=pi_cache )[:,nax]
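    # With the residual r(x) = y - T(x), the chain rule gives
    #   grad_x log pi(r(x)) = - (grad_x T(x))^T grad log pi(r(x)),
    # which is the einsum contraction implemented in grad_x and
    # tuple_grad_x below.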
@cached([('pi',None),('T',None)])
@counted
    def grad_x(self, x, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\nabla_{\bf x}\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d`]) -- gradient evaluations
.. todo:: caching is not implemented
"""
pi_cache, T_cache = get_sub_cache(cache, ('pi',None), ('T',None))
frw = self.T.evaluate(x, idxs_slice=idxs_slice, cache=T_cache)
gx_frw = self.T.grad_x(x, idxs_slice=idxs_slice, cache=T_cache)
gx_lpdf = self.pi.grad_x_log_pdf(
self.y - frw, idxs_slice=idxs_slice, cache=pi_cache )
out = - np.einsum('...i,...ij->...j',gx_lpdf, gx_frw)
return out[:,nax,:]
@cached_tuple(['evaluate','grad_x'], [('pi',None),('T',None)])
@counted
    def tuple_grad_x(self, x, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\log\pi({\bf y} \vert {\bf x}), \nabla_{\bf x}\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
cache (dict): cache
Returns:
(:class:`tuple`) -- function and gradient evaluations
.. todo:: caching is not implemented
"""
pi_cache, T_cache = get_sub_cache(cache, ('pi',None), ('T',None))
frw, gx_frw = self.T.tuple_grad_x(x, idxs_slice=idxs_slice, cache=T_cache)
lpdf, gx_lpdf = self.pi.tuple_grad_x_log_pdf(
self.y - frw, idxs_slice=idxs_slice, cache=pi_cache )
gx = - np.einsum('...i,...ij->...j',gx_lpdf, gx_frw)
return lpdf[:,nax], gx[:,nax,:]
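    # Differentiating once more,
    #   hess_x log pi(r(x)) = (grad T)^T [hess log pi] (grad T)
    #                         - < grad log pi, hess_x T >,
    # where both derivatives of log pi are evaluated at r(x) = y - T(x).
    # hess_x below assembles the two terms with einsum contractions, and
    # action_hess_x applies the same identity to a single direction dx
    # without ever forming the full (d x d) Hessian.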
@cached([('pi',None),('T',None)],False)
@counted
    def hess_x(self, x, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\nabla^2_{\bf x}\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d,d`]) -- Hessian evaluations
.. todo:: caching is not implemented
"""
pi_cache, T_cache = get_sub_cache(cache, ('pi',None), ('T',None))
frw = self.T.evaluate(x, idxs_slice=idxs_slice, cache=T_cache) # m x d_y
gx_frw = self.T.grad_x(x, idxs_slice=idxs_slice, cache=T_cache) # m x d_y x d_x
hx_frw = self.T.hess_x(x, idxs_slice=idxs_slice, cache=T_cache) # m x d_y x d_x x d_x
gx_lpdf = self.pi.grad_x_log_pdf(
self.y - frw, idxs_slice=idxs_slice, cache=pi_cache ) # m x d_y
hx_lpdf = self.pi.hess_x_log_pdf(
self.y - frw, idxs_slice=idxs_slice, cache=pi_cache ) # m x d_y x d_y
out = np.einsum('...ij,...ik,...kl->...jl', gx_frw, hx_lpdf, gx_frw)
out -= np.einsum('...i,...ijk->...jk', gx_lpdf, hx_frw) # m x d_x x d_x
return out[:,nax,:,:]
@cached([('pi',None),('T',None)],False)
@counted
    def action_hess_x(self, x, dx, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\langle\nabla^2_{\bf x}\log\pi({\bf y} \vert {\bf x}),\delta{\bf x}\rangle`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
dx (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): direction
on which to evaluate the Hessian
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d`]) -- action of Hessian evaluations
.. todo:: caching is not implemented
"""
pi_cache, T_cache = get_sub_cache(cache, ('pi',None), ('T',None))
frw = self.T.evaluate(x, idxs_slice=idxs_slice, cache=T_cache) # m x d_y
gx_frw = self.T.grad_x(x, idxs_slice=idxs_slice, cache=T_cache) # m x d_y x d_x
gx_lpdf = self.pi.grad_x_log_pdf(
self.y - frw, idxs_slice=idxs_slice, cache=pi_cache ) # m x d_y
A = np.einsum('...ij,...j->...i', -gx_frw, dx) # m x d_y
A = self.pi.action_hess_x_log_pdf(
self.y - frw, A, idxs_slice=idxs_slice, cache=pi_cache) # m x d_y
A = np.einsum('...ij,...i->...j', -gx_frw, A) # m x d_x
B = -self.T.action_hess_x(
x, dx, idxs_slice=idxs_slice, cache=T_cache) # m x d_y x d_x
B = np.einsum('...i,...ij->...j', gx_lpdf, B) # m x d_x
return (A + B)[:,nax,:]
class AdditiveNormalLogLikelihood(AdditiveLogLikelihood):
r""" Define the log-likelihood for the additive Gaussian model
The model is
.. math::
{\bf y} = {\bf T}({\bf x}) + \varepsilon \;, \quad
\varepsilon \sim \mathcal{N}(\mu, \Sigma)
    where :math:`T:\mathbb{R}^{d_x}\rightarrow\mathbb{R}^{d_y}`, :math:`\mu \in \mathbb{R}^{d_y}`
    and :math:`\Sigma \in \mathbb{R}^{d_y \times d_y}` is symmetric positive
    definite.
.. document private functions
.. automethod:: __init__
"""
    def __init__(self, y, T, mu, covariance=None, precision=None):
r"""
Args:
y (:class:`ndarray<numpy.ndarray>` [:math:`d_y`]): data
T (:class:`Map<TransportMaps.Maps.Map>`): map
:math:`T:\mathbb{R}^d\rightarrow\mathbb{R}^{d_y}`
mu (:class:`ndarray<numpy.ndarray>` [:math:`d_y`] or :class:`Map<TransportMaps.Maps.Map>`):
noise mean or parametric map returning the mean
covariance (:class:`ndarray<numpy.ndarray>` [:math:`d_y,d_y`] or :class:`Map<TransportMaps.Maps.Map>`):
noise covariance or parametric map returning the covariance
precision (:class:`ndarray<numpy.ndarray>` [:math:`d_y,d_y`] or :class:`Map<TransportMaps.Maps.Map>`):
noise precision matrix or parametric map returning the precision matrix
"""
pi = NormalDistribution(mu, covariance=covariance, precision=precision)
super(AdditiveNormalLogLikelihood, self).__init__(y, pi, T)
class AdditiveLinearNormalLogLikelihood(AdditiveNormalLogLikelihood):
r""" Define the log-likelihood for the additive linear Gaussian model
The model is
.. math::
{\bf y} = {\bf c} + {\bf T}{\bf x} + \varepsilon \;, \quad
\varepsilon \sim \mathcal{N}(\mu, \Sigma)
where :math:`T \in \mathbb{R}^{d_y \times d_x}`, :math:`\mu \in \mathbb{R}^{d_y}`
    and :math:`\Sigma \in \mathbb{R}^{d_y \times d_y}` is symmetric positive
    definite.
Args:
y (:class:`ndarray<numpy.ndarray>` [:math:`d_y`]): data
c (:class:`ndarray<numpy.ndarray>` [:math:`d_y`] or :class:`Map<TransportMaps.Maps.Map>`): system constant or parametric map returning the constant
T (:class:`ndarray<numpy.ndarray>` [:math:`d_y,d_x`] or :class:`Map<TransportMaps.Maps.Map>`): system matrix or parametric map returning the system matrix
mu (:class:`ndarray<numpy.ndarray>` [:math:`d_y`] or :class:`Map<TransportMaps.Maps.Map>`): noise mean or parametric map returning the mean
covariance (:class:`ndarray<numpy.ndarray>` [:math:`d_y,d_y`] or :class:`Map<TransportMaps.Maps.Map>`):
noise covariance or parametric map returning the covariance
precision (:class:`ndarray<numpy.ndarray>` [:math:`d_y,d_y`] or :class:`Map<TransportMaps.Maps.Map>`):
noise precision matrix or parametric map returning the precision matrix
"""
def __init__(self, y, c, T, mu, covariance=None, precision=None):
# INIT MAP AND DISTRIBUTION
linmap = AffineMap(c=c, L=T)
super(AdditiveLinearNormalLogLikelihood, self).__init__(
y, linmap, mu, covariance=covariance, precision=precision)
AdditiveLinearGaussianLogLikelihood = AdditiveLinearNormalLogLikelihood
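# A small usage sketch for the linear Gaussian likelihood above. The data
# and dimensions are illustrative; the constructor signatures used here
# (AffineMap(c=..., L=...), NormalDistribution(mu, covariance=...)) are the
# ones appearing earlier in this file.
def _example_additive_linear_normal():
    d_x, d_y = 2, 3
    c = np.zeros(d_y)                        # system constant
    T = np.ones((d_y, d_x))                  # system matrix
    mu = np.zeros(d_y)                       # noise mean
    sigma = np.eye(d_y)                      # noise covariance
    y = np.array([0.5, -0.3, 1.2])           # observed data
    ll = AdditiveLinearGaussianLogLikelihood(y, c, T, mu, covariance=sigma)
    x = np.random.randn(10, d_x)             # 10 evaluation points
    return ll.evaluate(x), ll.grad_x(x)      # shapes (10,1) and (10,1,d_x)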
class AdditiveConditionallyLinearNormalLogLikelihood(AdditiveLogLikelihood):
    r""" Define the log-likelihood for the additive conditionally linear Gaussian model
The model is
.. math::
{\bf y} = {\bf c}(\theta) + {\bf T}(\theta){\bf x} + \varepsilon \;, \quad
\varepsilon \sim \mathcal{N}(\mu(\theta), \Sigma(\theta))
    where :math:`T(\theta) \in \mathbb{R}^{d_y \times d_x}`, :math:`\mu(\theta) \in \mathbb{R}^{d_y}`
    and :math:`\Sigma(\theta) \in \mathbb{R}^{d_y \times d_y}` is symmetric positive
    definite.
Args:
y (:class:`ndarray<numpy.ndarray>` [:math:`d_y`]): data
c (:class:`ndarray<numpy.ndarray>` [:math:`d_y`] or :class:`Map<TransportMaps.Maps.Map>`): system constant or parametric map returning the constant
T (:class:`ndarray<numpy.ndarray>` [:math:`d_y,d_x`] or :class:`Map<TransportMaps.Maps.Map>`): system matrix or parametric map returning the system matrix
mu (:class:`ndarray<numpy.ndarray>` [:math:`d_y`] or :class:`Map<TransportMaps.Maps.Map>`): noise mean or parametric map returning the mean
covariance (:class:`ndarray<numpy.ndarray>` [:math:`d_y,d_y`] or :class:`Map<TransportMaps.Maps.Map>`):
noise covariance or parametric map returning the covariance
precision (:class:`ndarray<numpy.ndarray>` [:math:`d_y,d_y`] or :class:`Map<TransportMaps.Maps.Map>`):
noise precision matrix or parametric map returning the precision matrix
        active_vars_system (:class:`list` of :class:`int`): active variables
            identifying the parameters for :math:`c(\theta), T(\theta)`.
        active_vars_distribution (:class:`list` of :class:`int`): active variables
            identifying the parameters for :math:`\mu(\theta), \Sigma(\theta)`.
coeffs (:class:`ndarray<numpy.ndarray>`): initialization coefficients
"""
def __init__(self, y, c, T, mu, covariance=None, precision=None,
active_vars_system=[], active_vars_distribution=[],
coeffs=None):
# SYSTEM
if c.dim_in != T.dim_in:
raise ValueError("Input dimension c and T don't match")
# DISTRIBUTION
if isinstance(mu, np.ndarray) and (
(covariance is not None and isinstance(covariance, np.ndarray)) or
(precision is not None and isinstance(precision, np.ndarray)) ):
pi = NormalDistribution(mu, covariance=covariance, precision=precision)
else:
            if mu.dim_in != c.dim_in:
                raise ValueError("Input dimensions of c and mu don't match")
            if covariance is not None and mu.dim_in != covariance.dim_in:
                raise ValueError("Input dimensions of mu and covariance don't match")
            if precision is not None and mu.dim_in != precision.dim_in:
                raise ValueError("Input dimensions of mu and precision don't match")
pi = ConditionallyNormalDistribution(mu, covariance=covariance, precision=precision)
# SETUP AUXILIARY VARIABLES
self._n_coeffs = c.dim_in
# INIT MAP AND DISTRIBUTION
linmap = ConditionallyLinearMap(c, T)
super(AdditiveConditionallyLinearNormalLogLikelihood, self).__init__(y, pi, linmap)
self.coeffs = coeffs
@property
    def n_coeffs(self):
return self._n_coeffs
@property
    def coeffs(self):
return self._coeffs
@coeffs.setter
def coeffs(self, coeffs):
if coeffs is not None:
self.T.coeffs = coeffs
if self.isPiCond:
self.pi.coeffs = coeffs
self._coeffs = coeffs
AdditiveConditionallyLinearGaussianLogLikelihood = AdditiveConditionallyLinearNormalLogLikelihood
class LogisticLogLikelihood(LogLikelihood):
    r""" Class for the log-likelihood of :math:`{\bf y}`, where :math:`y_i \vert {\bf x}; {\bf f}_i \sim \mathrm{Bernoulli}(\mathrm{logit}^{-1}({\bf f}_i^\top {\bf x}))`
    This implements the vectorized log-likelihood:
    .. math::
       \log\pi({\bf y}\vert{\bf x}; {\bf f}_{1:N}) =
       \sum_{i=1}^N y_i \log\frac{1}{1+\exp(-{\bf f}_i^\top {\bf x})} +
       (1 - y_i) \log\left[
       1 - \frac{1}{1+\exp(-{\bf f}_i^\top {\bf x})}
       \right]
Args:
y (:class:`ndarray<numpy.ndarray>` of :class:`int` [N]): binary data
F (:class:`ndarray<numpy.ndarray>` [N,d-1]): factors :math:`{\bf f}_{1:N}`
"""
def __init__(self, y, F):
        self._F = np.hstack( (np.ones((F.shape[0],1)), F) )  # prepend intercept column
super(LogisticLogLikelihood, self).__init__(y, self._F.shape[1])
@property
    def F(self):
return self._F
    def logit(self, x, *args, **kwargs):
return np.log( x / (1 - x) )
    def inv_logit(self, x, *args, **kwargs):
return (1. + np.exp( - x ))**-1 # m x N
@counted
    def evaluate(self, x, *args, **kwargs):
r"""
Args:
x (:class:`ndarray<numpy.ndarray>` [m,d]): evaluation points
"""
m = x.shape[0]
xf = np.einsum('ij,kj->ik', x, self._F) # m x N
stbl = (xf <= 20) # values for which log/exp are stable
ovfl = (xf > 20) # values for which log/exp overflow
tmp = np.zeros(xf.shape)
exp = np.exp(-xf)
yy = np.tile(self.y,(m, 1)) # m x N
# Stable locations
log = np.log(1 + exp[stbl])
tmp[stbl] = - yy[stbl] * log - (1-yy[stbl]) * (xf[stbl] + log)
# Overflow locations
tmp[ovfl] = - yy[ovfl] * exp[ovfl] - (1-yy[ovfl]) * (xf[ovfl] + exp[ovfl])
return np.sum(tmp, axis=1)[:,nax]
@counted
    def grad_x(self, x, *args, **kwargs):
m = x.shape[0]
xf = np.einsum('ij,kj->ik', x, self._F) # m x N
stbl = (xf <= 20) # values for which log/exp are stable
ovfl = (xf > 20) # values for which log/exp overflow
tmp = np.zeros(xf.shape) # m x N
exp = np.exp(-xf)
yy = np.tile(self.y,(m, 1)) # m x N
# Stable locations
exp1exp = exp[stbl] / (1+exp[stbl])
tmp[stbl] = yy[stbl] * exp1exp + (1-yy[stbl]) * (exp1exp-1)
        # Overflow locations
tmp[ovfl] = yy[ovfl] * exp[ovfl] + (1-yy[ovfl]) * (exp[ovfl]-1)
# Contract and multiply by F
out = np.dot(tmp, self._F)
return out[:,nax,:]
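    # The commented-out implementations below are direct transcriptions of
    # the log-likelihood and its gradient; they are kept for reference but
    # are not numerically stable in the tails, which is why the
    # overflow-safe versions above are used instead.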
# @counted
# def evaluate(self, x, *args, **kwargs):
# r"""
# Args:
# x (:class:`ndarray<numpy.ndarray>` [m,d]): evaluation points
# """
# xx = np.einsum('ij,kj->ik', x, self._F)
# p = self.inv_logit(xx)
# out = np.sum(
# self.y * np.log(p) + (1 - self.y) * np.log(1 - p), axis=1
# )
# return out[:,nax]
# @counted
# def grad_x(self, x, *args, **kwargs):
# xx = np.einsum('ij,kj->ik', x, self._F) # m x N
# p = (1 + np.exp(-xx))**-1 # m x N
# gx_p = ((1 + np.exp(-xx))**-2 * np.exp(-xx))[:,:,nax] * self._F[nax,:,:] # m x N x d
# gx_p1 = gx_p / p[:,:,nax] # m x N x d
# gx_p2 = gx_p / (1 - p)[:,:,nax] # m x N x d
# out = np.einsum('...i,...ij->...j', self.y, gx_p1) - \
# np.einsum('...i,...ij->...j', 1-self.y, gx_p2)
# return out[:,nax,:]
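# A quick numerical check (illustrative, not part of the library): for
# moderate arguments, where no overflow occurs, the stabilized `evaluate`
# above must agree with the naive formula
# sum_i y_i*log(p_i) + (1-y_i)*log(1-p_i); `dim_in` is the input-dimension
# attribute used elsewhere in this module.
def _check_logistic_stability():
    rng = np.random.RandomState(0)
    N = 5
    F = rng.randn(N, 2)                           # N observations, 2 factors
    y = rng.randint(0, 2, size=N).astype(float)   # binary responses
    ll = LogisticLogLikelihood(y, F)
    x = 0.5 * rng.randn(4, ll.dim_in)             # moderate evaluation points
    p = ll.inv_logit(np.einsum('ij,kj->ik', x, ll.F))
    naive = np.sum(y * np.log(p) + (1 - y) * np.log(1 - p), axis=1)[:, nax]
    assert np.allclose(ll.evaluate(x), naive)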
class TwoParametersLogisticLogLikelihood(LogLikelihood):
    r""" Two-parameter logistic item response model (see the corresponding Stan example)
    :math:`I`: number of words, :math:`J`: number of people
Args:
y (:class:`ndarray<numpy.ndarray>` of :class:`int` [I,J]): binary data
"""
def __init__(self, y):
super(TwoParametersLogisticLogLikelihood, self).__init__(
y, 2 * y.shape[0] + y.shape[1])
    def inv_logit(self, alpha, beta, theta, *args, **kwargs):
return (1. + np.exp( - alpha[:,:,nax] * (theta[:,nax,:] - beta[:,:,nax]) ))**-1
@counted
    def evaluate(self, x, *args, **kwargs):
I = self.y.shape[0]
J = self.y.shape[1]
# Extract alpha, beta and theta
alpha = x[:,:I]
beta = x[:,I:2*I]
theta = x[:,2*I:2*I+J]
# Compute inv logit
z = self.inv_logit(alpha, beta, theta)
# Compute log-likelihood
out = np.sum(
self.y[nax,:,:] * z + (1 - self.y[nax,:,:]) * (1-z),
axis=(1,2)
)
return out[:,nax]
@counted
    def grad_x(self, x, *args, **kwargs):
I = self.y.shape[0]
J = self.y.shape[1]
# Extract alpha, beta and theta
alpha = x[:,:I]
beta = x[:,I:2*I]
theta = x[:,2*I:]
# Compute
yy = 2 * self.y - 1 # I x J
z = np.exp( - alpha[:,:,nax] * (theta[:,nax,:] - beta[:,:,nax]) )
zz = (1 + z)**-2 # m x I x J
out = np.zeros(x.shape)
# Partial alpha, beta and theta
pa = zz * z * (theta[:,nax,:] - beta[:,:,nax]) * yy[nax,:,:]
pb = - zz * z * alpha[:,:,nax] * yy[nax,:,:]
pt = - pb
# Assemble
out[:,:I] = np.sum(pa, axis=2)
out[:,I:2*I] = np.sum(pb, axis=2)
out[:,2*I:] = np.sum(pt, axis=1)
return out[:,nax,:]
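# The input packing used by TwoParametersLogisticLogLikelihood: x
# concatenates the I discrimination parameters alpha, the I difficulty
# parameters beta and the J ability parameters theta, so the input
# dimension is 2*I + J. An illustrative sketch (data made up):
def _example_two_parameters_logistic():
    I, J = 3, 4                                    # words and people
    y = np.random.randint(0, 2, size=(I, J))       # binary response table
    ll = TwoParametersLogisticLogLikelihood(y)     # input dimension 2*I + J
    alpha = np.ones((1, I))
    beta = np.zeros((1, I))
    theta = np.zeros((1, J))
    x = np.hstack((alpha, beta, theta))            # shape (1, 2*I + J)
    return ll.evaluate(x)                          # shape (1, 1)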
class IndependentLogLikelihood(Functional):
r""" Handling likelihoods in the form :math:`\pi({\bf y}\vert{\bf x}) = \prod_{i=1}^{n}\pi_i({\bf y}_i\vert{\bf x}_i)`
Args:
factors (:class:`list` of :class:`tuple`): each tuple contains a
log-likelihood (:class:`LogLikelihood`) and the sub-set of variables
of :math:`{\bf x}` on which it acts.
Example
-------
    Let :math:`\pi(y_0,y_2\vert x_0,x_1,x_2)= \pi_0(y_0\vert x_0) \pi_2(y_2\vert x_2)`.
    >>> factors = [(ll0, [0]),
    ...            (ll2, [2])]
    >>> ll = IndependentLogLikelihood(factors)
"""
def __init__(self, factors):
self.factors = []
self.input_dim = set()
for i, (ll, xidxs) in enumerate(factors):
# Check right number of inputs
            if ll is not None and len(set(xidxs)) != ll.dim_in:
                raise TypeError("The dimension of log-likelihood %d " % i + \
                                "is not consistent with " + \
                                "the number of variables.")
self.factors.append( (ll, xidxs) )
self.input_dim |= set(xidxs)
dim = 0 if len(self.input_dim) == 0 else max(self.input_dim)+1
super(IndependentLogLikelihood, self).__init__(dim)
    def get_ncalls_tree(self, indent=""):
out = super(IndependentLogLikelihood, self).get_ncalls_tree(indent)
for ll,_ in self.factors:
if ll is not None:
out += ll.get_ncalls_tree(indent + ' ')
return out
    def get_nevals_tree(self, indent=""):
out = super(IndependentLogLikelihood, self).get_nevals_tree(indent)
for ll,_ in self.factors:
if ll is not None:
out += ll.get_nevals_tree(indent + ' ')
return out
    def get_teval_tree(self, indent=""):
out = super(IndependentLogLikelihood, self).get_teval_tree(indent)
for ll,_ in self.factors:
if ll is not None:
out += ll.get_teval_tree(indent + ' ')
return out
    def update_ncalls_tree(self, obj):
super(IndependentLogLikelihood, self).update_ncalls_tree(obj)
for (ll,_),(obj_ll,_) in zip(self.factors,obj.factors):
if ll is not None:
ll.update_ncalls_tree(obj_ll)
    def update_nevals_tree(self, obj):
super(IndependentLogLikelihood, self).update_nevals_tree(obj)
for (ll,_),(obj_ll,_) in zip(self.factors,obj.factors):
if ll is not None:
ll.update_nevals_tree(obj_ll)
    def update_teval_tree(self, obj):
super(IndependentLogLikelihood, self).update_teval_tree(obj)
for (ll,_),(obj_ll,_) in zip(self.factors,obj.factors):
if ll is not None:
ll.update_teval_tree(obj_ll)
    def reset_counters(self):
super(IndependentLogLikelihood, self).reset_counters()
for ll,_ in self.factors:
if ll is not None:
ll.reset_counters()
@property
    def y(self):
return [ ll.y for ll,_ in self.factors ]
@property
    def n_factors(self):
return len(self.factors)
    def append(self, factor):
r""" Add a new factor to the likelihood
Args:
            factor (:class:`tuple`): tuple containing a
log-likelihood (:class:`LogLikelihood`) and the sub-set of variables
of :math:`{\bf x}` on which it acts.
Example
-------
        Let :math:`\pi(y_0,y_2\vert x_0,x_1,x_2)= \pi_0(y_0\vert x_0) \pi_2(y_2\vert x_2)` and
let's add the factor :math:`\pi_1(y_1\vert x_1)`, obtaining:
.. math::
            \pi(y_0,y_1,y_2\vert x_0,x_1,x_2)= \pi_0(y_0\vert x_0) \pi_1(y_1\vert x_1) \pi_2(y_2\vert x_2)
>>> factor = (ll1, [1])
>>> ll.append(factor)
"""
ll, xidxs = factor
if ll is not None and len(set(xidxs)) != ll.dim_in:
raise TypeError("The dimension of the " + \
"log-likelihood is not consistent with " + \
"the number of variables.")
self.factors.append( (ll, xidxs) )
self.input_dim |= set(xidxs)
self.dim_in = max(self.input_dim)+1
@cached([("ll_list","n_factors")])
@counted
    def evaluate(self, x, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1`]) -- function evaluations
"""
if x.shape[1] != self.dim_in:
raise ValueError("The dimension of the input does not match the dimension " + \
"of the log-likelihood")
ll_list_cache = get_sub_cache(cache, ("ll_list",self.n_factors))
out = np.zeros((x.shape[0],1))
for (ll, xidxs), ll_cache in zip(self.factors, ll_list_cache):
if ll is not None:
out += ll.evaluate(
x[:,xidxs], idxs_slice=idxs_slice, cache=ll_cache, **kwargs)
return out
@cached([("ll_list","n_factors")])
@counted
    def grad_x(self, x, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\nabla_{\bf x}\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d`]) -- gradient evaluations
"""
if x.shape[1] != self.dim_in:
raise ValueError("The dimension of the input does not match the dimension " + \
"of the log-likelihood")
ll_list_cache = get_sub_cache(cache, ("ll_list",self.n_factors))
out = np.zeros((x.shape[0],1,self.dim_in))
for (ll, xidxs), ll_cache in zip(self.factors, ll_list_cache):
if ll is not None:
out[:,:,xidxs] += ll.grad_x(
x[:,xidxs], idxs_slice=idxs_slice, cache=ll_cache, **kwargs)
return out
@cached_tuple(['evaluate', 'grad_x'],[("ll_list","n_factors")])
@counted
    def tuple_grad_x(self, x, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\log\pi({\bf y} \vert {\bf x}), \nabla_{\bf x}\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
cache (dict): cache
Returns:
(:class:`tuple`) -- function and gradient evaluations
"""
if x.shape[1] != self.dim_in:
raise ValueError("The dimension of the input does not match the dimension " + \
"of the log-likelihood")
ll_list_cache = get_sub_cache(cache, ("ll_list",self.n_factors))
fx_out = np.zeros((x.shape[0],1))
gx_out = np.zeros((x.shape[0],1,self.dim_in))
for (ll, xidxs), ll_cache in zip(self.factors, ll_list_cache):
if ll is not None:
fx, gfx = ll.tuple_grad_x(
x[:,xidxs], idxs_slice=idxs_slice, cache=ll_cache, **kwargs)
fx_out += fx
gx_out[:,:,xidxs] += gfx
return fx_out, gx_out
@cached([("ll_list","n_factors")],False)
@counted
    def hess_x(self, x, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\nabla^2_{\bf x}\log\pi({\bf y} \vert {\bf x})`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d,d`]) -- Hessian evaluations
"""
if x.shape[1] != self.dim_in:
raise ValueError("The dimension of the input does not match the dimension " + \
"of the log-likelihood")
ll_list_cache = get_sub_cache(cache, ("ll_list",self.n_factors))
m = x.shape[0]
out = np.zeros( (m, 1, self.dim_in, self.dim_in) )
for (ll, xidxs), ll_cache in zip(self.factors, ll_list_cache):
if ll is not None:
out[np.ix_(range(m),[0],xidxs,xidxs)] += \
ll.hess_x(
x[:,xidxs], idxs_slice=idxs_slice, cache=ll_cache, **kwargs)
return out
@cached([("ll_list","n_factors")],False)
@counted
    def action_hess_x(self, x, dx, idxs_slice=slice(None,None,None), cache=None, **kwargs):
r""" Evaluate :math:`\langle\nabla^2_{\bf x}\log\pi({\bf y} \vert {\bf x}),\delta{\bf x}\rangle`.
Args:
x (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): evaluation points
dx (:class:`ndarray<numpy.ndarray>` [:math:`m,d`]): direction
on which to evaluate the Hessian
cache (dict): cache
Returns:
(:class:`ndarray<numpy.ndarray>` [:math:`m,1,d`]) -- Hessian evaluations
"""
if x.shape[1] != self.dim_in:
raise ValueError("The dimension of the input does not match the dimension " + \
"of the log-likelihood")
ll_list_cache = get_sub_cache(cache, ("ll_list",self.n_factors))
m = x.shape[0]
out = np.zeros( (m, 1, self.dim_in) )
for (ll, xidxs), ll_cache in zip(self.factors, ll_list_cache):
if ll is not None:
out[np.ix_(range(m),[0],xidxs)] += \
ll.action_hess_x(
x[:,xidxs], dx[:,xidxs],
idxs_slice=idxs_slice, cache=ll_cache, **kwargs)
return out
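# A usage sketch mirroring the class docstring example: two independent
# scalar Gaussian factors acting on x_0 and x_2 of a three-dimensional
# input. Data and dimensions are illustrative; the factor likelihoods reuse
# AdditiveLinearGaussianLogLikelihood defined above.
def _example_independent_loglikelihood():
    c = np.zeros(1)
    T = np.eye(1)
    mu = np.zeros(1)
    sigma = np.eye(1)
    ll0 = AdditiveLinearGaussianLogLikelihood(
        np.array([0.1]), c, T, mu, covariance=sigma)
    ll2 = AdditiveLinearGaussianLogLikelihood(
        np.array([-0.7]), c, T, mu, covariance=sigma)
    ll = IndependentLogLikelihood([(ll0, [0]), (ll2, [2])])
    x = np.random.randn(10, 3)
    return ll.evaluate(x)          # (10, 1): the sum of the two factor terms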