
#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps.  If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#

import collections
import numpy as np
import scipy.linalg as scila

from ...Maps import AffineTransportMap
from ...Maps.Decomposable import LiftedTransportMap
from ...Distributions import GaussianDistribution

from .SequentialInferenceBase import *

__all__ = ['LinearFilter',
           'LinearSmoother']

class LinearFilter(Filter):
    r""" Perform the on-line filtering of a sequential linear Gaussian Hidden Markov chain.

    Also known as the Kalman filter. If the linear state-space model is parametric, i.e.

    .. math::

       {\bf Z}_{k+1} = {\bf c}_k(\theta) + {\bf F}_k(\theta){\bf Z}_k + {\bf w}_k(\theta) \\
       {\bf Y}_{k} = {\bf H}_k(\theta){\bf Z}_k + {\bf v}_k(\theta)

    then one can optionally compute the gradient (with respect to the
    parameters) of the filter.

    Args:
      ders (int): ``0`` no gradient is computed, ``1`` compute gradient
      pi_hyper (:class:`Distribution<TransportMaps.Distributions.Distribution>`):
        prior distribution on the hyper-parameters :math:`\pi(\Theta)`

    .. todo:: Square-root filter
    """
    def __init__(self, ders=0, pi_hyper=None):
        super(LinearFilter, self).__init__(pi_hyper)
        self._marg_log_lklhood = 0.
        self._filt_mean_list = []
        self._filt_cov_list = []
        self._ders = ders
        if ders > 0:
            self._filt_grad_mean_list = []
            self._filt_grad_cov_list = []
            self._grad_marg_log_lklhood = 0.

    @property
    def marginal_log_likelihood(self):
        r""" Returns the marginal log-likelihood :math:`\log\pi\left({\bf Y}_{\Xi\leq k}\right)`

        Returns:
          (:class:`float`) -- current marginal log-likelihood
        """
        return self._marg_log_lklhood

    @property
    def filtering_mean_list(self):
        r""" Returns the means of all the filtering distributions

        Returns:
          (:class:`list` of :class:`ndarray<numpy.ndarray>`) -- means of
          :math:`\pi\left({\bf Z}_k\middle\vert{\bf Y}_{\Xi\leq k}\right)`
          for :math:`k\in \Lambda=0,\ldots,n`.
        """
        return self._filt_mean_list

    @property
    def filtering_covariance_list(self):
        r""" Returns the covariances of all the filtering distributions

        Returns:
          (:class:`list` of :class:`ndarray<numpy.ndarray>`) -- covariances of
          :math:`\pi\left({\bf Z}_k\middle\vert{\bf Y}_{\Xi\leq k}\right)`
          for :math:`k\in \Lambda=0,\ldots,n`.
        """
        return self._filt_cov_list

    @property
    def grad_marginal_log_likelihood(self):
        r""" Returns the gradient of the marginal log-likelihood :math:`\nabla_\theta\log\pi\left({\bf Y}_{\Xi\leq k}\right)`

        Returns:
          (:class:`ndarray<numpy.ndarray>`) -- gradient of the current marginal log-likelihood
        """
        return self._grad_marg_log_lklhood

    @property
    def filtering_grad_mean_list(self):
        r""" Returns the gradients of the means of all the filtering distributions

        Returns:
          (:class:`list` of :class:`ndarray<numpy.ndarray>`) -- gradients of the means of
          :math:`\pi\left({\bf Z}_k\middle\vert{\bf Y}_{\Xi\leq k}\right)`
          for :math:`k\in \Lambda=0,\ldots,n`.
        """
        return self._filt_grad_mean_list

    @property
    def filtering_grad_covariance_list(self):
        r""" Returns the gradients of the covariances of all the filtering distributions

        Returns:
          (:class:`list` of :class:`ndarray<numpy.ndarray>`) -- gradients of the covariances of
          :math:`\pi\left({\bf Z}_k\middle\vert{\bf Y}_{\Xi\leq k}\right)`
          for :math:`k\in \Lambda=0,\ldots,n`.
        """
        return self._filt_grad_cov_list

    def _assimilation_step(self):
        r""" Assimilate one piece of Gaussian data :math:`\left( \pi\left({\bf Z}_{k+1} \middle\vert {\bf Z}_k \right), \log \mathcal{L}\left({\bf y}_{k+1}\middle\vert {\bf Z}_{k+1}\right) \right)`.
        """
        pi = self.pi.pi_list[-1]
        ll = self.pi.ll_list[-1]
        # PREDICTION STEP
        if self.nsteps == 0:
            # Load initial conditions x_{0|-1} and P_{0|-1}
            pred_x = pi.mu
            pred_P = pi.covariance
        else:
            # Load former state and covariance
            x = self._filt_mean_list[-1]
            P = self._filt_cov_list[-1]
            # Load transition system
            c = pi.T.c
            F = pi.T.L
            b = pi.pi.mu
            Q = pi.pi.covariance
            # Predict
            pred_x = c + F.dot(x) + b
            pred_P = F.dot(P.dot(F.T)) + Q
        # Gradient recursion
        if self._ders > 0:
            # Prediction step
            if self.nsteps == 0:
                pred_ga_x = pi.grad_a_mu
                pred_ga_P = pi.grad_a_sigma
            else:
                # Load former gradient state and gradient covariance
                ga_x = self._filt_grad_mean_list[-1]
                ga_P = self._filt_grad_cov_list[-1]
                # Load transition derivatives
                ga_c = pi.T.grad_a_c
                ga_F = pi.T.grad_a_T
                ga_b = pi.pi.grad_a_mu
                ga_Q = pi.pi.grad_a_sigma
                # Predict
                pred_ga_x = ga_c + np.einsum('ijk,j->ik', ga_F, x) + \
                            np.einsum('ij,jk->ik', F, ga_x) + ga_b
                # gF P F^T + F gP F^T + F P gF^T + gQ
                pred_ga_P = np.einsum('ijk,jl,ml->imk', ga_F, P, F) + \
                            np.einsum('ij,jkl,mk->iml', F, ga_P, F) + \
                            np.einsum('ij,jk,lkm->ilm', F, P, ga_F) + \
                            ga_Q
        # State and covariance after prediction
        x = pred_x
        P = pred_P
        if self._ders > 0:
            ga_x = pred_ga_x
            ga_P = pred_ga_P
        # Perform update if necessary
        up_x = x
        up_P = P
        if ll is not None:
            # Load observation system
            z = ll.y
            a = ll.T.c
            H = ll.T.L
            b = ll.pi.mu
            R = ll.pi.covariance
            # Compute update recursions
            y = z - a - b - np.dot(H, x)       # Measurement residual
            S = R + np.dot(H, np.dot(P, H.T))  # Innovation covariance
            S12_factors = scila.cho_factor(S, True)
            S1y = scila.cho_solve(S12_factors, y)
            S1H = scila.cho_solve(S12_factors, H)
            KS = np.dot(P, H.T)
            Ky = np.dot(KS, S1y)
            KH = np.dot(KS, S1H)
            ImKH = np.eye(pi.dim) - KH
            # Updates (state, covariance, marginal log-likelihood)
            up_x = x + Ky
            up_P = np.dot(ImKH, P)
            self._marg_log_lklhood -= .5 * (
                np.dot(y, S1y) +
                2 * np.sum(np.log(np.diag(S12_factors[0]))) +
                y.shape[0] * np.log(2*np.pi))
        # Gradient recursion
        if ll is not None and self._ders > 0:
            # Load gradients of observation system
            ga_a = ll.T.grad_a_c
            ga_H = ll.T.grad_a_T
            ga_b = ll.pi.grad_a_mu
            ga_R = ll.pi.grad_a_sigma
            ncoeffs = ga_a.shape[1]
            # Compute update recursions
            ga_y = - (ga_a + ga_b + np.einsum('ijk,j->ik', ga_H, x) +
                      np.einsum('ij,jk->ik', H, ga_x))
            ga_S = ga_R + \
                   np.einsum('ijk,jl,ml->imk', ga_H, P, H) + \
                   np.einsum('ij,jkl,mk->iml', H, ga_P, H) + \
                   np.einsum('ij,jk,lkm->ilm', H, P, ga_H)
            S1gaS = np.zeros( ga_S.shape )
            for nc in range(ncoeffs):
                S1gaS[:,:,nc] = scila.cho_solve(S12_factors, ga_S[:,:,nc])
            ga_KS = np.einsum('ijk,lj->ilk', ga_P, H) + \
                    np.einsum('ij,kjl->ikl', P, ga_H) + \
                    np.einsum('ij,kj,klm->ilm', P, H, - S1gaS)
            S1gay = np.zeros( ga_y.shape )
            for nc in range(ncoeffs):
                S1gay[:,nc] = scila.cho_solve(S12_factors, ga_y[:,nc])
            S1gaH = np.zeros( ga_H.shape )
            for nc in range(ncoeffs):
                S1gaH[:,:,nc] = scila.cho_solve(S12_factors, ga_H[:,:,nc])
            Kgay = np.dot(KS, S1gay)
            gaKy = np.einsum('ijk,j->ik', ga_KS, S1y)
            gaKH = np.einsum('ijk,jl->ilk', ga_KS, S1H)
            KgaH = np.einsum('ij,jkl->ikl', KS, S1gaH)
            # Updates (state, covariance)
            up_ga_x = ga_x + gaKy + Kgay
            up_ga_P = - np.einsum('ijk,jl->ilk', gaKH + KgaH, P) \
                      + np.einsum('ij,jkl->ikl', ImKH, ga_P)
            # Update the gradient of the marginal log-likelihood
            tr_S1gaS = np.einsum('iik->k', S1gaS)  # Trace for each parameter
            ga_mll = .5 * (np.dot(ga_y.T, S1y) +
                           np.einsum('i,ijk,j->k', y, -S1gaS, S1y) +
                           np.dot(y, S1gay) +
                           tr_S1gaS)
            self._grad_marg_log_lklhood -= ga_mll
        # State and covariance after update
        x = up_x
        P = up_P
        if self._ders > 0:
            ga_x = up_ga_x
            ga_P = up_ga_P
        # Append new filtering map
        try:
            L = scila.cholesky(P, True)
        except scila.LinAlgError:
            U, S, V = scila.svd(P)
            L = U * np.sqrt(S)
        M = AffineTransportMap(c=x, L=L)
        self.F_list.append(M)
        # Update filtering mean and covariance
        self._filt_mean_list.append(x)
        self._filt_cov_list.append( np.dot(L, L.T) )
        if self._ders > 0:
            self._filt_grad_mean_list.append( ga_x )
            self._filt_grad_cov_list.append( ga_P )
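
# Illustrative sketch (not part of the library API): the recursion implemented
# in LinearFilter._assimilation_step is the textbook Kalman filter. The
# self-contained function below, whose name and arguments are hypothetical,
# reproduces a single predict/update step on plain NumPy arrays so that the
# algebra above can be checked independently; c and b play the roles of
# pi.T.c and pi.pi.mu, while a and d play the roles of ll.T.c and ll.pi.mu.
def _kalman_step_sketch(x, P, F, Q, H, R, z, c=0., b=0., a=0., d=0.):
    # Prediction: x_{k|k-1} = c + F x_{k-1} + b,  P_{k|k-1} = F P F^T + Q
    pred_x = c + F.dot(x) + b
    pred_P = F.dot(P).dot(F.T) + Q
    # Measurement residual and innovation covariance
    y = z - a - d - H.dot(pred_x)
    S = R + H.dot(pred_P).dot(H.T)
    # Kalman gain K = P H^T S^{-1}, computed via a Cholesky solve as above
    S12_factors = scila.cho_factor(S, True)
    K = np.dot(pred_P, scila.cho_solve(S12_factors, H).T)
    # Update
    up_x = pred_x + K.dot(y)
    up_P = np.dot(np.eye(P.shape[0]) - K.dot(H), pred_P)
    return up_x, up_P
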
class LinearSmoother(LinearFilter, Smoother):
    r""" Perform the on-line assimilation of a sequential linear Gaussian Hidden Markov chain.

    Args:
      lag (:class:`int`): lag to be used in the backward updates of smoothing
        means and covariances. The default value ``None`` indicates infinite lag.

    .. todo:: no hyper-parameters admitted right now.
    """
    def __init__(self, lag=None):
        super(LinearSmoother, self).__init__()
        self._lag = lag
        self._smooth_mean_list = []
        self._smooth_cov_list = []
        # Data structures necessary for fast smoothing updates
        self._CB_queue = collections.deque(maxlen=lag)
        self._CB2_queue = collections.deque(maxlen=lag)

    @property
    def lag(self):
        return self._lag

    @property
    def smoothing_mean_list(self):
        return self._smooth_mean_list

    @property
    def smoothing_covariance_list(self):
        return self._smooth_cov_list

    def _assimilation_step(self):
        r""" Assimilate one piece of Gaussian data :math:`\left( \pi\left({\bf Z}_{k+1} \middle\vert {\bf Z}_k \right), \log \mathcal{L}\left({\bf y}_{k+1}\middle\vert {\bf Z}_{k+1}\right) \right)`.
        """
        # Run the filter to get (c, L)
        LinearFilter._assimilation_step(self)
        c = self.F_list[-1].c
        C = self.F_list[-1].L
        a = None
        A = None
        B = None
        # Apply recursions
        if self.nsteps == 0:
            # At the first step smoothing == filtering
            M = AffineTransportMap(c=c, L=C)
            L = LiftedTransportMap(-1, M, self.pi.dim, 0)
            self.L_list.append( L )
        else:
            cm1 = self.F_list[-2].c
            Cm1 = self.F_list[-2].L
            # Load transition system
            pi = self.pi.pi_list[-1]
            sdim = pi.dim
            F = pi.T.L
            # Recursions
            if isinstance(pi.pi, GaussianDistribution):
                Q12 = pi.pi.square_root
                Q12FCm1 = scila.solve_triangular(Q12, np.dot(F, Cm1), lower=True)
                Q1FCm1 = scila.solve_triangular(Q12, Q12FCm1, lower=True, trans='T')
            else:
                Q1FCm1 = pi.pi.solve_covariance( np.dot(F, Cm1) )
            J = np.eye(pi.dim) + np.dot(Cm1.T, np.dot(F.T, Q1FCm1))
            J12 = scila.cholesky(J, lower=True)
            Cm1TFT = np.dot(Cm1.T, F.T)
            # A
            A = scila.solve_triangular(J12, np.eye(pi.dim), lower=True, trans='T')
            # B
            if isinstance(pi.pi, GaussianDistribution):
                Q12C = scila.solve_triangular(Q12, C, lower=True)
                Q1C = scila.solve_triangular(Q12, Q12C, lower=True, trans='T')
            else:
                Q1C = pi.pi.solve_covariance( C )
            PC = - np.dot(Cm1TFT, Q1C)
            J12PC = scila.solve_triangular(J12, PC, lower=True)
            B = - scila.solve_triangular(J12, J12PC, lower=True, trans='T')
            # a
            Fcm1c = np.dot(F, cm1) - c
            if isinstance(pi.pi, GaussianDistribution):
                Q12Fcm1c = scila.solve_triangular(Q12, Fcm1c, lower=True)
                Q1Fcm1c = scila.solve_triangular(Q12, Q12Fcm1c, lower=True, trans='T')
            else:
                Q1Fcm1c = pi.pi.solve_covariance( Fcm1c )
            PFcm1c = - np.dot(Cm1TFT, Q1Fcm1c)
            J12PFcm1c = scila.solve_triangular(J12, PFcm1c, lower=True)
            a = scila.solve_triangular(J12, J12PFcm1c, lower=True, trans='T')
            # Assemble smoothing map
            ac = np.hstack((a, c))
            ABC = np.zeros((2*sdim, 2*sdim))  # Matrix [[A,B],[0,C]]
            ABC[:sdim,:sdim] = A
            ABC[:sdim,sdim:] = B
            ABC[sdim:,sdim:] = C
            M = AffineTransportMap(c=ac, L=ABC)
            # Update the dimensions of all previous maps
            for L in self.L_list:
                L.dim_in = L.dim_out = self.pi.dim
            # Append new map
            L = LiftedTransportMap(self.nsteps-1, M, self.pi.dim, 0)
            self.L_list.append(L)
        # Update smoothing means and covariances
        self._update_smoothing_mean_covariance_lists(
            self.nsteps, self.lag,
            self._smooth_mean_list, self._smooth_cov_list,
            self._CB_queue, self._CB2_queue,
            c, C, a, A, B)

    def _update_smoothing_mean_covariance_lists(
            self, nsteps, lag,
            smooth_mean_list, smooth_cov_list,
            CB_queue, CB2_queue,
            c, C, a=None, A=None, B=None):
        # (the lag is controlled by the length of CB_queue and CB2_queue)
        for i, (CB, CB2) in enumerate(zip(CB_queue, CB2_queue)):
            if lag is None:
                lag = nsteps
            idx = max(nsteps - lag, 0) + i
            # MEAN: Insert new term <CB,a>
            smooth_mean_list[idx] += np.dot(CB, a)
            # COV: Remove the last <CB,CB.T> term introduced
            smooth_cov_list[idx] -= CB2
            # COV: Insert new term <CBA,CBA.T>
            CBA = np.dot(CB, A)
            CBA2 = np.dot(CBA, CBA.T)
            smooth_cov_list[idx] += CBA2
            # Update CB and CB2
            CBB = np.dot(CB, B)
            CB_queue[i] = CBB
            CBB2 = np.dot(CBB, CBB.T)
            CB2_queue[i] = CBB2
            # COV: Insert new term <CBB,CBB.T>
            smooth_cov_list[idx] += CBB2
        # Append new mean
        smooth_mean_list.append( c.copy() )
        # Append new covariance
        CB_queue.append( C )
        C2 = np.dot(C, C.T)
        CB2_queue.append( C2 )
        smooth_cov_list.append( C2.copy() )

    def offline_smoothing_mean_covariance_lists(self, lag=None):
        r""" Compute the means and covariances with a fixed lag for a pre-assimilated density
        """
        sdim = self.pi.state_dim
        smooth_mean_list = []
        smooth_cov_list = []
        # Data structures necessary for fast smoothing updates
        CB_queue = collections.deque(maxlen=lag)
        CB2_queue = collections.deque(maxlen=lag)
        # Iterate over the available maps
        for nsteps, L in enumerate(self.L_list):
            M = L.tm
            if nsteps > 0:
                c = M.c[sdim:]
                C = M.L[sdim:,sdim:]
                a = M.c[:sdim]
                A = M.L[:sdim,:sdim]
                B = M.L[:sdim,sdim:]
            else:
                c = M.c[:sdim]
                C = M.L[:sdim,:sdim]
                a = None
                A = None
                B = None
            self._update_smoothing_mean_covariance_lists(
                nsteps, lag,
                smooth_mean_list, smooth_cov_list,
                CB_queue, CB2_queue,
                c, C, a, A, B)
        return (smooth_mean_list, smooth_cov_list)
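
# Illustrative sketch (not part of the library API): the fixed-lag updates in
# _update_smoothing_mean_covariance_lists rely on deques created with
# maxlen=lag, so that once a step falls outside the lag window its smoothing
# statistics are frozen. The hypothetical scalar function below mimics the
# mean recursion only: the backward terms <CB,a> correct the means still
# inside the window, the sensitivities CB are propagated through B, and
# appending to a full deque silently drops the oldest step.
def _fixed_lag_mean_sketch(steps, lag=2):
    # steps: list of (c, a, C, B) scalars; a and B are unused at step 0
    means = []
    CB_queue = collections.deque(maxlen=lag)
    for nsteps, (c, a, C, B) in enumerate(steps):
        for i, CB in enumerate(CB_queue):
            idx = max(nsteps - lag, 0) + i
            means[idx] += CB * a   # backward correction <CB, a>
            CB_queue[i] = CB * B   # propagate the sensitivity CB -> CB B
        means.append(c)
        CB_queue.append(C)         # steps older than `lag` stop being updated
    return means

# For instance, with lag=2 the mean of step 0 receives corrections only from
# steps 1 and 2, and is frozen once step 3 is assimilated:
# _fixed_lag_mean_sketch([(0.0, None, 1.0, None),
#                         (0.1, 0.05, 0.9, 0.2),
#                         (0.2, 0.03, 0.8, 0.1),
#                         (0.3, 0.02, 0.7, 0.1)], lag=2)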