Source code for TransportMaps.Distributions.Examples.LogGaussianCoxProcess.LogGaussianCoxProcessDistributions

#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps.  If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Author: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#

import numpy as np
import scipy.linalg as scila
import scipy.stats as stats

from ....Misc import counted
from .... import Distributions, Maps

__all__ = [
    'StationaryKernel',
    'IsotropicStationaryKernel',
    'OrnsteinUhlenbeck',
    'SquaredExponentialKernel',
    'GaussianProcess',
    'PoissonPointProcessDistribution',
    'PoissonPointProcessLogLikelihood',
    'LogGaussianCoxProcessPosterior'
]

class Kernel(object):
    pass

[docs]class StationaryKernel(Kernel):
[docs] def distance(self, x1, x2): return x1 - x2
[docs]class IsotropicStationaryKernel(StationaryKernel):
[docs] def distance(self, x1, x2): return np.sqrt(np.sum((x1 - x2)**2., axis=1))
[docs]class OrnsteinUhlenbeck(IsotropicStationaryKernel): def __init__(self, l=1.): self.l = l
[docs] def __call__(self, x1, x2): d = self.distance(x1, x2) return np.exp( - d / self.l )
[docs]class SquaredExponentialKernel(IsotropicStationaryKernel): def __init__(self, l=1.): self.l = l
[docs] def __call__(self, x1, x2): d = self.distance(x1, x2) return np.exp( - d**2. / (2. * self.l**2.) )
[docs]class GaussianProcess(Distributions.PushForwardTransportMapDistribution): def __init__(self, pts, kernel, mu=None): self.proc_dim = pts.shape[1] self.pts = pts self.kernel = kernel if mu is None: mu = np.zeros(self.pts.shape[0]) # Evaluate covariance npts = self.pts.shape[0] x1 = np.zeros((npts**2, self.proc_dim)) x2 = np.zeros((npts**2, self.proc_dim)) for i in range(pts.shape[1]): p1, p2 = np.meshgrid( self.pts[:,i], self.pts[:,i] ) x1[:,i] = p1.flatten() x2[:,i] = p2.flatten() sigma = self.kernel(x1, x2).reshape((npts,npts)) chol = scila.cholesky(sigma, lower=True) tm = Maps.AffineTransportMap(c=mu, L=chol) base = Distributions.StandardNormalDistribution(mu.shape[0]) # Set the distribution super(GaussianProcess, self).__init__(tm, base)
[docs]class PoissonPointProcessDistribution(Distributions.Distribution): def __init__(self, lmb): self.lmb = lmb self.dim = len(lmb)
[docs] def rvs(self, n): out = np.zeros((n, self.dim),dtype=int) for i in range(self.dim): rv = stats.poisson(mu=self.lmb[i]) out[:,i] = rv.rvs(n) return out
[docs] def pdf(self, x, *args, **kwargs): r""" Evaluate the pdf. ``x`` is integer """ return np.exp( self.log_pdf(x) )
[docs] def log_pdf(self, x, *args, **kwargs): r""" Evaluate the log_pdf. ``x`` is integer """ if x.shape[1] != self.dim: raise ValueError("Incompatible dimensions") out = x * np.log(self.lmb) # Add the log(x!) oshape = out.shape out = out.reshape(out.size) xmax = np.max(x) idxs = np.arange(out.size) for i in range(2,xmax+1): (ii,) = np.where(out[idxs] >= i) idxs = idxs[ii] out[idxs] += np.log(i) out = out.reshape(oshape) # Add last term out -= self.lmb return out
[docs]class PoissonPointProcessLogLikelihood(Maps.Map): def __init__(self, obs, dim_in): super(PoissonPointProcessLogLikelihood, self).__init__( dim_in=dim_in, dim_out=1) self.obs = obs self.nobs = len(obs) # Pre-compute the log(obs!) term self.log_obs_fact = np.zeros( len(obs) ) omax = np.max(self.obs) idxs = np.arange(len(self.obs)) for i in range(2,omax+1): (ii,) = np.where(self.log_obs_fact[idxs] >= i) idxs = idxs[ii] self.log_obs_fact[idxs] += np.log(i)
[docs] def evaluate(self, x, *args, **kwargs): r""" Evaluate the log_pdf. ``x`` is integer """ if x.shape[1] != self.dim_in: raise ValueError("Incompatible dimensions") out = np.zeros((x.shape[0], 1, self.dim_in)) out[:,0,:self.nobs] = self.obs * np.log(x[:,:self.nobs]) # Add log(obs!) term out[:,0,:self.nobs] -= self.log_obs_fact # Add last term out[:,0,:self.nobs] -= x[:,:self.nobs] return np.sum(out, axis=2) # Returns m x 1
[docs] def grad_x(self, x, *args, **kwargs): if x.shape[1] != self.dim_in: raise ValueError("Incompatible dimensions") out = np.zeros((x.shape[0], 1, self.dim_in)) out[:,0,:self.nobs] = self.obs / x[:,:self.nobs] - 1. return out # Returns m x 1 x d
[docs] def hess_x(self, x, *args, **kwargs): if x.shape[1] != self.dim_in: raise ValueError("Incompatible dimensions") out = np.zeros( (x.shape[0], 1, self.dim_in, self.dim_in) ) diag_out = np.einsum('...ii->...i', out) # Read/write from Numpy 1.10 diag_out[:,0,:self.nobs] = - self.obs / x[:,:self.nobs]**2. return out
[docs]class LogGaussianCoxProcessPosterior(Distributions.Distribution): # This is the posterior restricted to the observation positions def __init__(self, reduced_gp, obs, full_N, full_obs_idxs, full_gp, full_lmb): self.obs = obs # ndarray of size (1,nobs) self.nobs = len(obs) self.reduced_gp = reduced_gp self.dim = reduced_gp.dim self.full_N = full_N self.full_obs_idxs = full_obs_idxs self.full_gp = full_gp self.full_lmb = full_lmb ##### PRIOR ###### self.prior = Distributions.StandardNormalDistribution(self.dim) ##### LIKELIHOOD ##### pppll = PoissonPointProcessLogLikelihood(self.obs, self.dim) exp_map = Maps.FrozenExponentialDiagonalTransportMap(self.dim) self.log_likelihood = Maps.ListCompositeMap( map_list=[pppll, exp_map, reduced_gp.transport_map] ) @counted
[docs] def pdf(self, x, *args, **kwargs): return np.exp(self.log_pdf(x))
@counted
[docs] def log_pdf(self, x, *args, **kwargs): out = self.prior.log_pdf(x) out += self.log_likelihood.evaluate(x)[:,0] return out
@counted
[docs] def grad_x_log_pdf(self, x, *args, **kwargs): out = self.prior.grad_x_log_pdf(x) out += self.log_likelihood.grad_x(x)[:,0,:] return out
@counted
[docs] def hess_x_log_pdf(self, x, *args, **kwargs): out = self.prior.hess_x_log_pdf(x) out += self.log_likelihood.hess_x(x)[:,0,:,:] return out