#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps. If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#
import unittest
import numpy as np
import numpy.random as npr
import numpy.linalg as npla
from TransportMaps import Maps
from TransportMaps.Distributions.Examples import InertialNavigationSystem as INS
from TransportMaps.Distributions.Examples import StochasticVolatility as SV
from TransportMaps.Algorithms import SequentialInference as ALGSI
import TransportMaps.Distributions as DIST
import TransportMaps.Diagnostics as DIAG
import TransportMaps.Builders as BUILD
import TransportMaps as TM
try:
import mpi_map
except:
MPI_SUPPORT = False
[docs]class LinearGaussianSequentialInferenceTest(unittest.TestCase):
[docs] def setUp(self):
npr.seed(1)
[docs] def test(self):
nsteps = 96
# Generate data
T,Z,Y = INS.generate_data(nsteps)
for n in range(20,29):
Y[n] = None
# Run smoother
pi_prior = INS.Prior()
pi_trans = INS.Transition()
SMT = ALGSI.LinearSmoother(lag=None)
for n in range(nsteps+1):
# Define log-likelihood
if Y[n] is None: # Missing data
ll = None
else:
ll = INS.LogLikelihood(Y[n])
# Define transition / prior
if n > 0:
pin = pi_trans
else:
pin = pi_prior
# Assimilation
SMT.assimilate(pin, ll)
M = SMT.smoothing_map
pi = SMT.pi
cov_list = SMT.smoothing_covariance_list
# Test variance diagnostic
pull_M_pi = DIST.PullBackTransportMapDistribution(M, pi)
rho = DIST.StandardNormalDistribution(pi.dim)
var_seq = DIAG.variance_approx_kl(rho, pull_M_pi, qtype=0, qparams=1000)
self.assertTrue( var_seq < 1e-14 )
# Compare full covariances
pi_hx = npla.inv(-pi.hess_x_log_pdf(np.zeros((1,pi.dim)))[0,:,:])
gx_M = M.grad_x(np.zeros((1,pi.dim)))[0,:,:]
M_hx = np.dot(gx_M, gx_M.T)
self.assertTrue( np.allclose(pi_hx, M_hx) )
# Compare smoothing covariances
for i, cov in enumerate(cov_list):
cx = pi_hx[i*6:(i+1)*6,i*6:(i+1)*6]
self.assertTrue( np.allclose(cx, cov) )
[docs]class NonLinearStocVolSequentialInferenceTest(unittest.TestCase):
[docs] def setUp(self):
npr.seed(1)
[docs] def test(self):
nsteps = 10
# Generate data
mu = -.5
phi = .95
sigma = 0.25
yB, ZA = SV.generate_data(nsteps, mu, sigma, phi)
# Set up model
is_mu_h = True
is_sigma_h = False
is_phi_h = True
pi_hyper = SV.PriorHyperParameters(is_mu_h, is_sigma_h, is_phi_h, 1)
mu_h = SV.IdentityFunction()
phi_h = SV.F_phi(3.,1.)
sigma_h = SV.ConstantFunction(0.25)
pi_ic = SV.PriorDynamicsInitialConditions(
is_mu_h, mu_h, is_sigma_h, sigma_h, is_phi_h, phi_h)
pi_trans = SV.PriorDynamicsTransition(
is_mu_h, mu_h, is_sigma_h, sigma_h, is_phi_h, phi_h)
# Run smoother
INT = ALGSI.TransportMapsSmoother(pi_hyper)
for n, yt in enumerate(yB):
if yt is None: ll = None # Missing data
else: ll = SV.LogLikelihood(yt, is_mu_h, is_sigma_h, is_phi_h)
if n == 0: pin = pi_ic # Prior initial conditions
else: pin = pi_trans # Prior transition dynamics
# Transport map specification
if n == 0: dim_tm = 3
else: dim_tm = 4
tm = Maps.assemble_IsotropicIntegratedSquaredTriangularTransportMap(
dim=dim_tm, order=3)
# Solution builder (link to standard adaptivity algorithm, i.e. no adaptivity)
builder_class = BUILD.KullbackLeiblerBuilder
# Solution parameters
solve_params = {'qtype': 3, 'qparams': [7]*dim_tm,
'tol': 1e-3}
# Regression map specification
hyper_tm = Maps.assemble_IsotropicIntegratedSquaredTriangularTransportMap(
dim=pi_hyper.dim, order=3)
# Regression parameters
rho_hyper = DIST.StandardNormalDistribution(pi_hyper.dim)
reg_params = {'d': rho_hyper, 'qtype': 3,
'qparams': [7]*pi_hyper.dim,
'tol': 1e-4}
# Assimilation
INT.assimilate(pin, ll,
tm=tm, solve_params=solve_params,
builder_class=builder_class,
hyper_tm=hyper_tm, regression_params=reg_params)
# Diagnostic: compare to Laplace approximation
rho = DIST.StandardNormalDistribution(INT.pi.dim)
lpl = TM.laplace_approximation(INT.pi)
lpl_tm = Maps.LinearTransportMap.build_from_Normal(lpl)
pull_lpl_pi = DIST.PullBackTransportMapDistribution(lpl_tm, INT.pi)
var_lpl = DIAG.variance_approx_kl(rho, pull_lpl_pi, qtype=0, qparams=1000)
pull_T_pi = DIST.PullBackTransportMapDistribution(INT.smoothing_map, INT.pi)
var_seq = DIAG.variance_approx_kl(rho, pull_T_pi, qtype=0, qparams=1000)
assert(var_seq < var_lpl)
[docs]def build_suite(ttype='all'):
suite_lgsi = unittest.TestLoader().loadTestsFromTestCase(
LinearGaussianSequentialInferenceTest)
suite_nlsvsi = unittest.TestLoader().loadTestsFromTestCase(
NonLinearStocVolSequentialInferenceTest)
# Group suites
suites_list = []
if ttype in ['all', 'serial']:
suites_list += [
suite_lgsi,
suite_nlsvsi
]
all_suites = unittest.TestSuite( suites_list )
return all_suites
[docs]def run_tests(
ttype='serial',
failfast=False
):
all_suites = build_suite(ttype)
# RUN
unittest.TextTestRunner(
verbosity=2,
failfast=failfast
).run(all_suites)
if __name__ == '__main__':
run_tests()