#!/usr/bin/env python
#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps. If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Author: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#
import numpy as np
import numpy.linalg as npla
from sklearn.gaussian_process import GaussianProcessRegressor
from sklearn.gaussian_process.kernels import RBF
from ....External import DOLFIN_SUPPORT
from ....Misc import counted
from .... import Distributions as DIST
from ....Distributions import Inference as DISTINF
from .... import Likelihoods as LKL
from .... import Maps
__all__ = [
"ViscosityInitialConditionsBurgersPosteriorDistribution"
]
[docs]class ViscosityInitialConditionsBurgersPosteriorDistribution(
DISTINF.BayesPosteriorDistribution
):
def __init__(
self,
# Physics
solver,
# Prior settings
u0_length_scale = 1.,
nu_mean = -3,
nu_std = 3,
# Likelihood setting
obs_sigma = 1e-2,
# Observations (must be defined on the FunctionSpace of the solver)
obs = None
):
if not DOLFIN_SUPPORT:
raise ImportError("Please install FENICS (dolfin) in order to use this class")
self.solver = solver
self.u0_length_scale = u0_length_scale
self.nu_mean = nu_mean
self.nu_std = nu_std
self.obs_sigma = obs_sigma
self.obs = obs
# List of attributes to be excluded in get/set params
self.exclude_state_param_list = set()
###################
# Building priors
# The prior will be built as the push-forward of a
# Standard normal through maps delivering the right
# marginals. We'll do it this way so that it will be
# easy to whiten the prior.
# Build the prior for the viscosity:
# log \nu \sim N(\mu_\nu, \sigma_\nu)
L = Maps.FrozenLinearDiagonalTransportMap([nu_mean], [nu_std])
E = Maps.FrozenExponentialDiagonalTransportMap(1)
nu_map = Maps.CompositeTransportMap(E, L)
# prior_nu = DIST.PushForwardTransportMapDistribution(
# nu_map,
# DIST.StandardNormalDistribution(1)
# )
# Build the GP prior for the initial conditions
# To this end fit a GP with RBF covariance to the Boundary
# conditions of the problem, then extract its mean and covariance
# at the coordinates of the discretization.
# The prior is defined for the interior of the interval,
# as on the boundaries the values are fixed.
a = (solver.ul * solver.xr - solver.ur * solver.xl) / (solver.xr - solver.xl)
b = (solver.ur - solver.ul) / (solver.xr - solver.xl)
gpr = GaussianProcessRegressor(
# kernel = Matern(nu=self.u0_matern_smoothness)
kernel = RBF(length_scale=1.)
)
gpr.fit( np.array([[solver.xl],[solver.xr]]), np.array([0.,0.]) )
_, cov = gpr.predict( solver.coord.reshape((solver.ndofs, 1)), return_cov=True )
mean = a + b * solver.coord
self.mean_boundaries = [mean[0], mean[-1]]
mean = mean[1:-1]
cov = cov[1:-1,1:-1]
u, s, v = npla.svd(cov)
if not np.allclose(np.dot(v.T * s, v), cov, rtol=1e-8, atol=1e-8):
raise ValueError("Covariance matrix is not symmetric positive definite")
u0_map = Maps.AffineTransportMap(c=mean, L=np.dot(v.T * np.sqrt(s), v))
# prior_u0 = DIST.PushForwardTransportMapDistribution(
# u0_map, DIST.StandardNormalDistribution(solver.ndofs-2))
# Assemble the prior map (keep it around for whitening)
self.prior_map = Maps.TriangularListStackedTransportMap(
map_list = [nu_map, u0_map],
active_vars = [[0], list(range(1,solver.ndofs-1))]
)
prior = DIST.PushForwardTransportMapDistribution(
self.prior_map,
DIST.StandardNormalDistribution(solver.ndofs-1)
)
# # Assemble the prior distribution as a factorized one (nu,u0)
# prior = DIST.FactorizedDistribution([
# (prior_nu, [0], []),
# (prior_u0, list(range(1,solver.ndofs-1)), [])
# ])
# Define the log-likelihood
lkl = ViscosityInitialConditionsBurgersLogLikelihood(
solver = solver,
bvs = self.mean_boundaries,
obs = obs,
obs_sigma = obs_sigma
)
# Call super
super(ViscosityInitialConditionsBurgersPosteriorDistribution, self).__init__(lkl, prior)
class ViscosityInitialConditionsBurgersMismatchMap(Maps.Map):
def __init__(self, solver, bvs):
self.solver = solver
self.bvs = bvs
super(ViscosityInitialConditionsBurgersMismatchMap, self).__init__(
dim_in=solver.ndofs-1, dim_out=1)
@counted
def evaluate(self, x, *args, **kwargs):
r"""
Args:
x (array): the first column contains samples for :math:`\nu`,
while the remaining columns contain values of the field :math:`u_0`.
"""
f = np.zeros((x.shape[0],1))
u0 = np.zeros(self.solver.ndofs)
u0[0] = self.bvs[0]
u0[-1] = self.bvs[1]
for i in range(x.shape[0]):
nu = x[i,0]
u0[1:-1] = x[i,1:]
f[i,0] = self.solver.evaluate(nu, u0)
return f
@counted
def grad_x(self, x, *args, **kwargs):
u0 = np.zeros(self.solver.ndofs)
u0[0] = self.bvs[0]
u0[-1] = self.bvs[1]
gx = np.zeros((x.shape[0],1,x.shape[1]))
for i in range(x.shape[0]):
nu = x[i,0]
u0[1:-1] = x[i,1:]
dJdnu, dJdu = self.solver.grad_x(nu, u0)
gx[i,0,0] = dJdnu
gx[i,0,1:] = dJdu[1:-1]
return gx
@counted
def tuple_grad_x(self, x, *args, **kwargs):
u0 = np.zeros(self.solver.ndofs)
u0[0] = self.bvs[0]
u0[-1] = self.bvs[1]
f = np.zeros((x.shape[0],1))
gx = np.zeros((x.shape[0],1,x.shape[1]))
for i in range(x.shape[0]):
nu = x[i,0]
u0[1:-1] = x[i,1:]
J, dJdnu, dJdu = self.solver.tuple_grad_x(nu, u0)
f[i,0] = J
gx[i,0,0] = dJdnu
gx[i,0,1:] = dJdu[1:-1]
return f, gx
class ViscosityInitialConditionsBurgersLogLikelihood( LKL.LogLikelihood ):
r"""
Given the ``solver`` :math:`J:(u_0,\nu) \mapsto \int_a^b (u(x,T) - u_d)^2 dx`,
where the mapping :math:`F:(u_0,\nu) \mapsto u(x,T)` is defined by the solution
of the Burger's equation
.. math::
\begin{cases}
\partial_t u(x,t) = \nu \partial_{xx} u - u \partial_x u & x \in [x_l, x_r] \\
u(x,0) = u_0(x) & \\
u(x_l,t) = u_l \quad u(x_r,t) = u_r &
\end{cases} \;,
the observation :math:`u_d := F_h(\hat{u}_0,\hat{\nu})` (where :math:`F_h` is the
data generating model for the coefficients :math:`\hat{u}_0,\hat{\nu}`) and
the observation noise :math:`sigma`, assembles the likelihood
.. math::
\mathcal{L}_{u_d}(u_0,\nu) := - \frac{1}{2\sigma^2} J(u_0,\nu)
"""
def __init__(self, solver, bvs, obs, obs_sigma):
r"""
Args:
solver (FinalL2MismatchBurgersProblem): solver providing function evaluation
and function/gradient evaluation
bvs (list): values to be appended to the boundaries of the initial conditions
obs (:class:`ndarray<numpy.ndarray>`): coefficients representing the final condition
in the :class:`FunctionSpace<dolfin.FunctionSpace>` used in the solver
obs_sigma (float): observation noise :math:`\sigma`
"""
if not DOLFIN_SUPPORT:
raise ImportError("Please install FENICS (dolfin) in order to use this class")
self.mismatch_map = ViscosityInitialConditionsBurgersMismatchMap(
solver, bvs
)
self.T = self.mismatch_map
self.obs_sigma = obs_sigma
super(ViscosityInitialConditionsBurgersLogLikelihood, self).__init__(obs, solver.ndofs-1)
def _get_y(self):
return super(ViscosityInitialConditionsBurgersLogLikelihood, self)._get_y()
def _set_y(self, y):
super(ViscosityInitialConditionsBurgersLogLikelihood, self)._set_y(y)
self.set_up()
y = property(_get_y, _set_y)
def set_up(self):
self.mismatch_map.solver.ud = self.y
def __setstate__(self, state):
super(ViscosityInitialConditionsBurgersLogLikelihood, self).__setstate__(state)
self.set_up()
@counted
def evaluate(self, x, *args, **kwargs):
r"""
Args:
x (array): the first column contains samples for :math:`\nu`,
while the remaining columns contain values of the field :math:`u_0`.
"""
f = self.T.evaluate(x, *args, **kwargs)
return - .5 * f[:,0] / self.obs_sigma**2
@counted
def grad_x(self, x, *args, **kwargs):
s = self.obs_sigma
gx = self.T.grad_x(x, *args, **kwargs)
return - .5 * gx[:,0,:] / s**2
@counted
def tuple_grad_x(self, x, *args, **kwargs):
s = self.obs_sigma
f, gx = self.T.tuple_grad_x(x, *args, **kwargs)
return - .5 * f[:,0] / s**2, - .5 * gx[:,0,:] / s**2