#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps. If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#
import copy as cp
import numpy as np
import scipy.stats as stats
from TransportMaps.Misc import counted, cached
from TransportMaps.Distributions.ConditionalDistributionBase import ConditionalDistribution
from TransportMaps.Distributions.FrozenDistributions import NormalDistribution
from TransportMaps.Distributions.Decomposable.SequentialInferenceDistributions import HiddenMarkovChainDistribution, \
MarkovChainDistribution
from TransportMaps.Maps.Functionals import Functional
from TransportMaps.Likelihoods.LikelihoodBase import LogLikelihood as TMLL
__all__ = [
'F_phi', 'F_sigma', 'IdentityFunction', 'ConstantFunction',
'PriorHyperParameters',
'PriorDynamicsInitialConditions',
'PriorDynamicsTransition',
'LogLikelihood',
'StocVolHyperDistribution',
'generate_data', 'trim_distribution']
nax = np.newaxis
############################
# Transformation functions #
############################
[docs]class F_phi(object):
def __init__(self,mean,std):
self.mean = mean
self.std = std
[docs] def evaluate(self, x):
Xh = self.mean+self.std*x
return 2*( np.exp(Xh) ) / (1+np.exp(Xh)) - 1.
[docs] def grad_x(self,x):
Xh = self.mean+self.std*x
return 2*self.std*np.exp(Xh) / ( np.exp(Xh) + 1. )**2
[docs] def hess_x(self,x):
Xh = self.mean+self.std*x
return -2*self.std**2*np.exp(Xh) * \
( np.exp(Xh) - 1. ) / ( np.exp(Xh) + 1. )**3
[docs]class F_sigma(object): #Square root of an inverse gamma
def __init__(self,k,theta):
self.k = k
self.theta = theta
self.F_Gauss2InvGamma=F_Gauss2InvGamma(k,theta)
[docs] def evaluate(self, x):
return np.sqrt( self.F_Gauss2InvGamma.evaluate(x) )
[docs] def grad_x(self,x):
return .5*(self.F_Gauss2InvGamma.evaluate(x)**-0.5) * \
self.F_Gauss2InvGamma.grad_x(x)
[docs] def hess_x(self,x):
FX=self.F_Gauss2InvGamma.evaluate(x)
return -.25*(FX**-1.5)*(self.F_Gauss2InvGamma.grad_x(x)**2) + \
.5*(FX**-0.5)*self.F_Gauss2InvGamma.hess_x(x)
class F_Gauss2InvGamma(object):
def __init__(self,k,theta):
self.k = k
self.theta = theta
self.std = stats.norm()
self.invGamma = stats.invgamma(k, scale = theta)
def evaluate(self, X):
return self.invGamma.ppf( self.std.cdf(X) )
def grad_x(self,X):
FX = self.evaluate(X)
return self.std.pdf(X)/self.invGamma.pdf(FX)
def hess_x(self,X):
FX = self.evaluate(X)
grad_FX = self.grad_x(X)
a = self.k
b = self.theta
gamma = (b - (a+1)*FX )/FX**2
return - grad_FX*( X + grad_FX*gamma )
[docs]class ConstantFunction(Functional):
def __init__(self,constant):
if not isinstance(constant, float) and not isinstance(constant, int):
raise ValueError("Constant must be float or int")
self.constant = float(constant)
super(ConstantFunction,self).__init__(1)
[docs] def evaluate(self, x):
return self.constant
[docs] def grad_x(self, x):
return 0.
[docs] def hess_x(self, x):
return 0.
[docs]class IdentityFunction(Functional):
def __init__(self):
super(IdentityFunction, self).__init__(1)
[docs] def evaluate(self, x):
return x
[docs] def grad_x(self, x):
return 1.
[docs] def hess_x(self, x):
return 0.
################
# Priors #
################
[docs]class PriorHyperParameters(NormalDistribution):
r""" Distribution :math:`\pi(\mu, \sigma, \phi)`
Here :math:`\mu \sim \mathcal{N}(0,\sigma^2_\mu)`,
:math:`\phi \sim \mathcal{N}(0,1)` and
:math:`\sigma \sim \mathcal{N}(0,1)`, if they are to be considered
hyper-parameters.
Args:
is_mu_hyper (bool): whether :math:`\mu` is an hyper-parameter
is_sigma_hyper (bool): whether :math:`\sigma` is an hyper-parameter
is_phi_hyper (bool): whether :math:`\phi` is an hyper-parameter
mu_sigma (float): parameter :math:`\sigma_\mu`
"""
def __init__(self, is_mu_hyper=False, is_sigma_hyper=False, is_phi_hyper=False,
sigma_mu=None):
dim = is_mu_hyper + is_sigma_hyper + is_phi_hyper
if dim == 0:
raise ValueError("Avoid defining densities of dimension 0")
self.is_mu_hyper = is_mu_hyper
self.is_sigma_hyper = is_sigma_hyper
self.is_phi_hyper = is_phi_hyper
self.sigma_mu = sigma_mu
# Define mean and covariance matrix
mu = np.zeros(dim)
cov = np.eye(dim)
if self.is_mu_hyper:
cov[0,0] = self.sigma_mu**2.
# Initialize Gaussian distribution
super(PriorHyperParameters, self).__init__(mu, covariance=cov)
[docs]class PriorDynamicsInitialConditions(ConditionalDistribution):
r""" Conditional distribution :math:`\pi({\bf X}_{t_0}\vert \mu, \sigma, \phi)`
Args:
is_mu_hyper (bool): whether :math:`\mu` is an hyper-parameter
is_sigma_hyper (bool): whether :math:`\sigma` is an hyper-parameter
is_phi_hyper (bool): whether :math:`\phi` is an hyper-parameter
"""
def __init__(self,
is_mu_hyper=False, mu=None,
is_sigma_hyper=False, sigma=None,
is_phi_hyper=False, phi=None):
#Local ordering of the input function: Xmu, Xsigma, Xphi, X0
self.is_mu_hyper = is_mu_hyper
self.is_sigma_hyper = is_sigma_hyper
self.is_phi_hyper = is_phi_hyper
self.hyper_dim = self.is_mu_hyper + self.is_sigma_hyper + self.is_phi_hyper
self.state_dim = 1
super(PriorDynamicsInitialConditions, self).__init__(
self.state_dim, self.hyper_dim)
# Init hyper-parameters transformations
self.mu = mu
self.phi = phi
self.sigma = sigma
counter = 0
if is_mu_hyper:
self.index_mu = counter
counter += 1
if is_sigma_hyper:
self.index_sigma = counter
counter += 1
if is_phi_hyper:
self.index_phi = counter
counter += 1
@cached()
@counted
[docs] def log_pdf(self, x, y, cache=None, **kwargs):
X0 = x[:,0]
Xmu, Xsigma, Xphi = self.extract_variables(y)
# Evaluate transform functions
f_Xmu = self.mu.evaluate(Xmu)
f_Xphi = self.phi.evaluate(Xphi)
f_Xsigma = self.sigma.evaluate(Xsigma)
# Evaluate
return -.5*(1-f_Xphi**2)/f_Xsigma**2*(X0-f_Xmu)**2 + \
- np.log(f_Xsigma) + 0.5 * np.log(1-f_Xphi**2)
@cached()
@counted
[docs] def grad_x_log_pdf(self, x, y, cache=None, **kwargs):
X0 = x[:,0]
Xmu, Xsigma, Xphi = self.extract_variables(y)
# Evaluate transform functions
f_Xmu = self.mu.evaluate(Xmu)
f_Xphi = self.phi.evaluate(Xphi)
f_Xsigma = self.sigma.evaluate(Xsigma)
# Evaluate gradients of transform functions
if self.is_phi_hyper:
grad_f_Xphi = self.phi.grad_x(Xphi)
if self.is_sigma_hyper:
grad_f_Xsigma = self.sigma.grad_x(Xsigma)
# Evaluate
sdim = self.state_dim
hdim = self.hyper_dim
grad = np.zeros( (x.shape[0], sdim + hdim) )
grad[:,0] = - (1-f_Xphi**2)/f_Xsigma**2*(X0-f_Xmu)
if self.is_mu_hyper:
grad[:,sdim+self.index_mu] = (1-f_Xphi**2)/f_Xsigma**2*(X0-f_Xmu)
if self.is_phi_hyper:
grad[:,sdim+self.index_phi] = f_Xphi*grad_f_Xphi/f_Xsigma**2*(X0-f_Xmu)**2 + \
- f_Xphi / (1-f_Xphi**2)*grad_f_Xphi
if self.is_sigma_hyper:
grad[:,sdim+self.index_sigma] = grad_f_Xsigma / f_Xsigma * \
( (1-f_Xphi**2)/f_Xsigma**2*(X0-f_Xmu)**2 - 1 )
return grad
@counted
[docs] def hess_x_log_pdf(self, x, y, cache=None, **kwargs):
X0 = x[:,0]
Xmu, Xsigma, Xphi = self.extract_variables(y)
# Evaluate transform functions
f_Xmu = self.mu.evaluate(Xmu)
f_Xphi = self.phi.evaluate(Xphi)
f_Xsigma = self.sigma.evaluate(Xsigma)
# Evaluate gradients and Hessians of transform functions
if self.is_phi_hyper:
grad_f_Xphi = self.phi.grad_x(Xphi)
hess_f_Xphi = self.phi.hess_x(Xphi)
if self.is_sigma_hyper:
grad_f_Xsigma = self.sigma.grad_x(Xsigma)
hess_f_Xsigma = self.sigma.hess_x(Xsigma)
# Evaluate
sdim = self.state_dim
hdim = self.hyper_dim
hess = np.zeros( (x.shape[0], sdim+hdim, sdim+hdim) )
hess[:,0,0] = -(1-f_Xphi**2)/f_Xsigma**2
if self.is_mu_hyper:
hess[:,sdim+self.index_mu, sdim+self.index_mu] = -(1-f_Xphi**2)/f_Xsigma**2
hess[:,0,sdim+self.index_mu] = (1-f_Xphi**2)/f_Xsigma**2
hess[:,sdim+self.index_mu,0] = hess[:,0,sdim+self.index_mu]
if self.is_sigma_hyper:
hess[:,sdim+self.index_mu,sdim+self.index_sigma] = \
-2*grad_f_Xsigma/f_Xsigma**3*(1-f_Xphi**2)*(X0-f_Xmu)
hess[:,sdim+self.index_sigma,sdim+self.index_mu] = \
hess[:,sdim+self.index_mu,sdim+self.index_sigma]
if self.is_phi_hyper:
hess[:,sdim+self.index_mu ,sdim+self.index_phi] = \
-2*f_Xphi/f_Xsigma**2*grad_f_Xphi*(X0-f_Xmu)
hess[:,sdim+self.index_phi,sdim+self.index_mu] = \
hess[:,sdim+self.index_mu,sdim+self.index_phi]
if self.is_sigma_hyper:
hess[:,sdim+self.index_sigma,sdim+self.index_sigma] = \
(1-f_Xphi**2)/f_Xsigma**3*(X0-f_Xmu)**2 * \
(hess_f_Xsigma-3*grad_f_Xsigma**2/f_Xsigma) + \
grad_f_Xsigma**2/f_Xsigma**2 - hess_f_Xsigma/f_Xsigma
hess[:,0,sdim+self.index_sigma] = \
2*grad_f_Xsigma/f_Xsigma**3*(1-f_Xphi**2)*(X0-f_Xmu)
hess[:,sdim+self.index_sigma,0] = hess[:,0,sdim+self.index_sigma]
if self.is_phi_hyper:
hess[:,sdim+self.index_sigma,sdim+self.index_phi]= \
-2*f_Xphi/f_Xsigma**3 * grad_f_Xphi * \
grad_f_Xsigma*(X0-f_Xmu)**2
hess[:,sdim+self.index_phi,sdim+self.index_sigma] = \
hess[:,sdim+self.index_sigma,sdim+self.index_phi]
if self.is_phi_hyper:
hess[:,sdim+self.index_phi,sdim+self.index_phi] = \
(grad_f_Xphi**2 + f_Xphi*hess_f_Xphi) * \
( (X0-f_Xmu)**2/f_Xsigma**2-1./(1.-f_Xphi**2)) - \
2*f_Xphi**2*grad_f_Xphi**2/(1.-f_Xphi**2)**2
hess[:,0,sdim+self.index_phi] = 2*f_Xphi/f_Xsigma**2 * \
grad_f_Xphi*(X0-f_Xmu)
hess[:,sdim+self.index_phi,0] = hess[:,0,sdim+self.index_phi]
return hess
[docs]class PriorDynamicsTransition(ConditionalDistribution):
r""" Transition distribution :math:`\pi({\bf X}_{t_{k+1}}\vert {\bf X}_{t_{k}}, \mu, \sigma, \phi)`
Args:
is_mu_hyper (bool): whether :math:`\mu` is an hyper-parameter
is_sigma_hyper (bool): whether :math:`\sigma` is an hyper-parameter
is_phi_hyper (bool): whether :math:`\phi` is an hyper-parameter
"""
def __init__(self,
is_mu_hyper=False, mu=None,
is_sigma_hyper=False, sigma=None,
is_phi_hyper=False, phi=None):
#Local ordering of the input function: Xmu, Xsigma, Xphi, X0
self.is_mu_hyper = is_mu_hyper
self.is_phi_hyper = is_phi_hyper
self.is_sigma_hyper = is_sigma_hyper
self.hyper_dim = self.is_mu_hyper + self.is_sigma_hyper + self.is_phi_hyper
self.state_dim = 1
super(PriorDynamicsTransition, self).__init__(
self.state_dim, self.state_dim + self.hyper_dim)
# Init hyper-parameters transformations
self.mu = mu
self.phi = phi
self.sigma = sigma
counter = 1
if is_mu_hyper:
self.index_mu = counter
counter += 1
if is_sigma_hyper:
self.index_sigma = counter
counter += 1
if is_phi_hyper:
self.index_phi = counter
counter += 1
@cached()
@counted
[docs] def log_pdf(self, x, y, cache=None, **kwargs):
Xtp1 = x[:,0]
(Xt, Xmu, Xsigma, Xphi) = self.extract_variables(y)
# Evaluate transform functions
f_Xmu = self.mu.evaluate(Xmu)
f_Xphi = self.phi.evaluate(Xphi)
f_Xsigma = self.sigma.evaluate(Xsigma)
# Evaluate
return -.5*1/f_Xsigma**2 * \
( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )**2 - np.log(f_Xsigma)
@cached()
@counted
[docs] def grad_x_log_pdf(self, x, y, cache=None, **kwargs):
Xtp1 = x[:,0]
(Xt, Xmu, Xsigma, Xphi) = self.extract_variables(y)
# Evaluate transform functions
f_Xmu = self.mu.evaluate(Xmu)
f_Xphi = self.phi.evaluate(Xphi)
f_Xsigma = self.sigma.evaluate(Xsigma)
# Evaluate gradients of transform functions
if self.is_phi_hyper:
grad_f_Xphi = self.phi.grad_x(Xphi)
if self.is_sigma_hyper:
grad_f_Xsigma = self.sigma.grad_x(Xsigma)
# Evaluate
sdim = self.state_dim
hdim = self.hyper_dim
grad = np.zeros( (x.shape[0], 2*sdim + hdim) )
grad[:,1] = f_Xphi/f_Xsigma**2*( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )
grad[:,0] = -1/f_Xsigma**2*( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )
if self.is_mu_hyper:
grad[:,sdim+self.index_mu] = - (f_Xphi-1)/f_Xsigma**2 * \
( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )
if self.is_sigma_hyper:
grad[:,sdim+self.index_sigma] = grad_f_Xsigma/f_Xsigma * \
(1/f_Xsigma**2 * (Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu))**2-1)
if self.is_phi_hyper:
grad[:,sdim+self.index_phi] = grad_f_Xphi/f_Xsigma**2 * \
( Xt - f_Xmu ) * ( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )
return grad
@counted
[docs] def hess_x_log_pdf(self, x, y, cache=None, **kwargs):
Xtp1 = x[:,0]
(Xt, Xmu, Xsigma, Xphi) = self.extract_variables(y)
# Evaluate transform functions
f_Xmu = self.mu.evaluate(Xmu)
f_Xphi = self.phi.evaluate(Xphi)
f_Xsigma = self.sigma.evaluate(Xsigma)
# Evaluate gradients and Hessians of transform functions
if self.is_phi_hyper:
grad_f_Xphi = self.phi.grad_x(Xphi)
hess_f_Xphi = self.phi.hess_x(Xphi)
if self.is_sigma_hyper:
grad_f_Xsigma = self.sigma.grad_x(Xsigma)
hess_f_Xsigma = self.sigma.hess_x(Xsigma)
# Evaluate
sdim = self.state_dim
hdim = self.hyper_dim
hess = np.zeros( (x.shape[0], 2*sdim+hdim, 2*sdim+hdim) )
hess[:,1,1] = -(f_Xphi/f_Xsigma)**2
hess[:,0,0] = -1/f_Xsigma**2
hess[:,1,0] = f_Xphi/f_Xsigma**2
hess[:,0,1] = hess[:,1,0]
if self.is_mu_hyper:
hess[:,sdim+self.index_mu,sdim+self.index_mu] = - ( (f_Xphi-1)/f_Xsigma )**2
hess[:,1,sdim+self.index_mu] = (f_Xphi*(f_Xphi-1))/f_Xsigma**2
hess[:,sdim+self.index_mu,1] = hess[:,1,sdim+self.index_mu]
hess[:,0,sdim+self.index_mu] = -(f_Xphi-1)/f_Xsigma**2
hess[:,sdim+self.index_mu,0] = hess[:,0,sdim+self.index_mu]
if self.is_sigma_hyper:
hess[:,sdim+self.index_mu,sdim+self.index_sigma] = \
2*(f_Xphi-1)*grad_f_Xsigma/f_Xsigma**3 * \
( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )
hess[:,sdim+self.index_sigma,sdim+self.index_mu] = \
hess[:,sdim+self.index_mu,sdim+self.index_sigma]
if self.is_phi_hyper:
hess[:,sdim+self.index_mu,sdim+self.index_phi] = \
-grad_f_Xphi/f_Xsigma**2 * \
( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) ) + \
(f_Xphi - 1)/f_Xsigma**2*grad_f_Xphi*(Xt - f_Xmu)
hess[:,sdim+self.index_phi,sdim+self.index_mu] = \
hess[:,sdim+self.index_mu,sdim+self.index_phi]
if self.is_sigma_hyper:
hess[:,sdim+self.index_sigma,sdim+self.index_sigma] = \
(hess_f_Xsigma/f_Xsigma - (grad_f_Xsigma/f_Xsigma)**2) * \
(1/f_Xsigma**2*(Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu))**2-1) - \
2*grad_f_Xsigma**2/f_Xsigma**4*(Xtp1-f_Xmu-f_Xphi*(Xt-f_Xmu))**2
hess[:,1,sdim+self.index_sigma] = \
-2*f_Xphi/f_Xsigma**3*grad_f_Xsigma * \
( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )
hess[:,sdim+self.index_sigma,1] = hess[:,1,sdim+self.index_sigma]
hess[:,0,sdim+self.index_sigma] = \
2*1/f_Xsigma**3*grad_f_Xsigma*( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )
hess[:,sdim+self.index_sigma,0] = hess[:,0,sdim+self.index_sigma]
if self.is_phi_hyper:
hess[:,sdim+self.index_sigma,sdim+self.index_phi]= \
-2*grad_f_Xsigma*grad_f_Xphi/f_Xsigma**3*(Xt-f_Xmu) * \
( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) )
hess[:,sdim+self.index_phi,sdim+self.index_sigma] = \
hess[:,sdim+self.index_sigma,sdim+self.index_phi]
if self.is_phi_hyper:
hess[:,sdim+self.index_phi,sdim+self.index_phi] = \
hess_f_Xphi/f_Xsigma**2*(Xt-f_Xmu) * \
( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) ) - \
(grad_f_Xphi/f_Xsigma)**2*(Xt - f_Xmu)**2
hess[:,1,sdim+self.index_phi] = \
grad_f_Xphi/f_Xsigma**2*( Xtp1 - f_Xmu - f_Xphi*(Xt - f_Xmu) ) \
- f_Xphi * grad_f_Xphi / f_Xsigma**2*(Xt - f_Xmu)
hess[:,sdim+self.index_phi,1] = hess[:,1,sdim+self.index_phi]
hess[:,0,sdim+self.index_phi] = grad_f_Xphi/f_Xsigma**2*(Xt - f_Xmu)
hess[:,sdim+self.index_phi,0] = hess[:,0,sdim+self.index_phi]
return hess
##################
# Likelihood #
##################
[docs]class LogLikelihood(TMLL):
r""" Abstract class for log-likelihood :math:`\log \pi({\bf y}_t \vert {\bf X}_t), \mu, \phi, \sigma`
where, up to integration constants,
.. math::
\log\pi({\bf y}_t \vert {\bf X}_t, \mu, \phi, \sigma) = - \frac{1}{2}({\bf y}_t^2 \exp(-{\bf X}_t))
Args:
y (:class:`ndarray<numpy.ndarray>`): data
is_mu_hyper (bool): whether :math:`\mu` is an hyper-parameter
is_phi_hyper (bool): whether :math:`\phi` is an hyper-parameter
is_sigma_hyper (bool): whether :math:`\sigma` is an hyper-parameter
"""
def __init__(self, y,
is_mu_hyper=False, is_sigma_hyper=False, is_phi_hyper=False):
#Local ordering of the input function: X0, Xmu, Xsigma, Xphi
self.is_mu_hyper = is_mu_hyper
self.is_sigma_hyper = is_phi_hyper
self.is_phi_hyper = is_sigma_hyper
self.hyper_dim = self.is_mu_hyper + self.is_sigma_hyper + self.is_phi_hyper
self.state_dim = 1
dim = self.state_dim + self.hyper_dim
super(LogLikelihood, self).__init__(y, dim)
@cached()
@counted
[docs] def evaluate(self, x, cache=None, **kwargs):
Xt = self.extract_variables(x)
return (-.5 * self.y**2. * np.exp(-Xt) -.5 * Xt)[:,nax]
@cached()
@counted
[docs] def grad_x(self, x, cache=None, **kwargs):
Xt = self.extract_variables(x)
out = np.zeros((x.shape[0], 1, self.dim_in))
out[:,0,0] = .5 * self.y**2. * np.exp(-Xt) - .5
return out
@counted
[docs] def hess_x(self, x, cache=None, **kwargs):
Xt = self.extract_variables(x)
out = np.zeros((x.shape[0], 1, self.dim_in, self.dim_in))
out[:,0,0,0] = - .5 * self.y**2. * np.exp(-Xt)
return out
##################
# Full Posterior #
##################
[docs]class StocVolHyperDistribution(HiddenMarkovChainDistribution):
def __init__(self,
is_mu_hyper=False, is_sigma_hyper=False, is_phi_hyper=False,
mu=None, sigma=None, phi=None,
mu_sigma=1.,
sigma_k=1., sigma_theta=.1,
phi_mean=3., phi_std=1.):
# Initialize true dynamic (optionally provided)
self.Xt = None
# Build prior on hyper parameters if necessary
self.is_mu_hyper = is_mu_hyper
self.is_phi_hyper = is_phi_hyper
self.is_sigma_hyper = is_sigma_hyper
hyper_dim = is_mu_hyper + is_phi_hyper + is_sigma_hyper
if hyper_dim > 0:
pi_hyper = PriorHyperParameters(is_mu_hyper, is_sigma_hyper, is_phi_hyper,
mu_sigma)
else:
pi_hyper = None
# Store transformations
if is_mu_hyper:
self.mu = IdentityFunction()
else:
self.mu = ConstantFunction(mu)
if is_phi_hyper:
self.phi = F_phi(phi_mean, phi_std)
else:
self.phi = ConstantFunction(phi)
if is_sigma_hyper:
self.sigma = F_sigma(sigma_k, sigma_theta)
else:
self.sigma = ConstantFunction(sigma)
# Initialize distribution (no transitions defined yet)
super(StocVolHyperDistribution, self).__init__(
MarkovChainDistribution([], pi_hyper),
[]
)
@counted
[docs] def action_hess_x_log_pdf(self, x, dx, cache=None, **kwargs):
H = self.hess_x_log_pdf(x, cache=cache)
return np.einsum('...ij,...j->...i', H, dx)
@property
[docs] def index_mu(self):
if self.is_mu_hyper:
return 0
else:
return None
@property
[docs] def index_sigma(self):
if self.is_sigma_hyper:
counter = len([o for o in [self.index_mu] if o is not None])
return counter
else:
return None
@property
[docs] def index_phi(self):
if self.is_phi_hyper:
counter = len([o for o in [self.index_mu, self.index_sigma]
if o is not None])
return counter
else:
return None
[docs] def get_tvec(self):
return np.arange(self.get_nsteps())
[docs] def get_nsteps(self):
return len(self.pi_list)
[docs] def assimilate(self, y=None, Xt=None):
r""" Assimilate one piece of data.
Args:
y (:class:`ndarray<numpy.ndarray>`): data. ``y==None`` stands for missing data.
Xt (:class:`ndarray<numpy.ndarray>`): true dynamics
"""
ll = None
if y is not None:
ll = LogLikelihood(y, self.is_mu_hyper,
self.is_sigma_hyper, self.is_phi_hyper)
if len(self.pi_list) == 0: # Initial conditions
pi = PriorDynamicsInitialConditions(
self.is_mu_hyper, self.mu,
self.is_sigma_hyper, self.sigma,
self.is_phi_hyper, self.phi)
if Xt is not None:
self.Xt = [Xt]
else: # Transitions
pi = PriorDynamicsTransition(
self.is_mu_hyper, self.mu,
self.is_sigma_hyper, self.sigma,
self.is_phi_hyper, self.phi)
if Xt is not None:
if self.Xt is None or len(self.Xt) != len(self.pi_list):
raise ValueError("The true dynamics must be provided " + \
"at all steps, if one wish to privide them")
self.Xt.append(Xt)
self.append(pi, ll)
[docs]def generate_data(nsteps, mu = None, sigma = None, phi = None):
#Random initial conditions
x0 = sigma/np.sqrt(1.-phi**2)*np.random.randn(1) + mu
#Noise observations
noise_obs = np.random.randn(nsteps)
#Noise dynamics
noise_dyn = np.random.randn(nsteps)
# Allocate variables
Xt = np.zeros(nsteps)
Xt[0] = x0
# Simulate
for ii in range(nsteps-1):
Xt[ii+1] = mu+phi*(Xt[ii]-mu) + sigma*noise_dyn[ii]
#Collect observations
dataObs = list(noise_obs*np.exp(.5*Xt))
return (dataObs, Xt)
[docs]def trim_distribution(old_dens, nsteps):
old_nsteps = old_dens.get_nsteps()
if nsteps > old_nsteps:
raise ValueError("The number of steps can only be decreased with this function")
new_dens = cp.deepcopy(old_dens)
# re-init distribution
new_dens.dim = new_dens.hyper_dim
new_pi_list = new_dens.pi_list[:nsteps]
new_ll_list = new_dens.ll_list[:nsteps]
new_dens.pi_list = []
new_dens.ll_list = []
for pi, ll in zip(new_pi_list, new_ll_list):
new_dens.append(pi, ll)
return new_dens