Source code for TransportMaps.Diagnostics.Validators

#
# This file is part of TransportMaps.
#
# TransportMaps is free software: you can redistribute it and/or modify
# it under the terms of the GNU Lesser General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# TransportMaps is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU Lesser General Public License for more details.
#
# You should have received a copy of the GNU Lesser General Public License
# along with TransportMaps.  If not, see <http://www.gnu.org/licenses/>.
#
# Transport Maps Library
# Copyright (C) 2015-2018 Massachusetts Institute of Technology
# Uncertainty Quantification group
# Department of Aeronautics and Astronautics
#
# Authors: Transport Map Team
# Website: transportmaps.mit.edu
# Support: transportmaps.mit.edu/qa/
#

import numpy as np
import numpy.linalg as npla
import numpy.random as npr
import scipy.stats as stats
import scipy.optimize as sciopt
from TransportMaps.KL import minimize_kl_divergence

from TransportMaps.Misc import read_and_cast_input
from TransportMaps.External import DARKSPARK_SUPPORT
from TransportMaps import \
    mpi_map, mpi_bcast_dmem, \
    ExpectationReduce, \
    distributed_sampling, \
    no_cost_function
from TransportMaps.ObjectBase import TMO
from TransportMaps.Distributions import \
    PullBackParametricTransportMapDistribution

if DARKSPARK_SUPPORT:
    from DARKSparK.multiDimensionalFunction import MultiDimensionalFunction, Reals64

__all__ = ['KLMinimizationValidator',
           'SampleAverageApproximationKLMinimizationValidator',
           # 'GradientChi2KLMinimizationValidator',
           # 'GradientToleranceRegionKLMinimizationValidator',
           'GradientBootstrapKLMinimizationValidator',
           'DimensionAdaptiveSparseGridKLMinimizationValidator']

class KLMinimizationValidator(TMO):
    def __init__(
            self, eps,
            cost_function=no_cost_function, max_cost=np.inf,
            max_nsamps=np.inf, stop_on_fcast=False):
        super(KLMinimizationValidator, self).__init__()
        self.eps = eps
        self.cost_function = cost_function
        self.max_cost = max_cost
        self.max_nsamps = max_nsamps
        self.stop_on_fcast = stop_on_fcast
    def pre_solve(self, base_distribution, target_distribution,
                  transport_map, validation_params):
        r""" Implements a routine to initialize variables before the solving stage.

        This function should initialize quantities that are needed by a
        particular validator.
        """
        pass
    def post_solve(self, base_distribution, target_distribution,
                   transport_map, validation_params):
        r""" Implements a routine to make sure the variables set in :func:`pre_solve`
        are in the correct state after exiting the solving stage.

        Since, in principle, algorithms can be stopped and re-started, this
        allows variables to be cleaned up when they are not needed.
        """
        pass
    def solve_to_tolerance(
            self, transport_map,
            base_distribution, target_distribution,
            solve_params, mpi_pool=None):
        validation_params = {}
        validation_params['ref_params'] = {}
        validation_params['solve_params'] = solve_params
        self.pre_solve(base_distribution, target_distribution,
                       transport_map, validation_params)
        log = {}
        max_nsamps_flag = solve_params.get(
            'x', np.zeros((0))).shape[0] >= self.max_nsamps
        cost = self.cost_function(
            ncalls=getattr(target_distribution, 'ncalls', {}),
            nevals=getattr(target_distribution, 'nevals', {}),
            teval=getattr(target_distribution, 'teval', {}))
        cost_flag = cost > self.max_cost
        fcast_cost = 0.
        fcast_cost_flag = False
        err = np.inf
        target_err = 0.
        prune_params = None
        pull_tar = PullBackParametricTransportMapDistribution(
            transport_map, target_distribution)
        ncalls_x_solve = getattr(target_distribution, 'ncalls', {}).copy()
        lst_ncalls = getattr(target_distribution, 'ncalls', {}).copy()
        it = 1
        while err > target_err and not cost_flag:
            self.logger.info("Starting solve_to_tolerance iteration " + str(it))
            if it > 1:
                if max_nsamps_flag:
                    # The number of samples is already exceeding or at max.
                    # No refinement possible.
                    break
            # Compute refinement and forecast cost flag
            nsamps = self.refinement(
                base_distribution, pull_tar,
                validation_params=validation_params)
            self.logger.info("Refinement - nsamps: %d" % nsamps)
            if nsamps >= self.max_nsamps:
                self.logger.warning("Maximum number of samples reached.")
                max_nsamps_flag = True
                # If the maximum number of samples is just met,
                # let it run one more iteration.
                if nsamps > self.max_nsamps:
                    break
            if self.stop_on_fcast:
                fcast_cost = self.cost_function(
                    ncalls=target_distribution.ncalls,
                    nevals=target_distribution.nevals,
                    teval=target_distribution.teval,
                    ncalls_x_solve=ncalls_x_solve,
                    new_nx=nsamps)
                fcast_cost_flag = fcast_cost > self.max_cost
                if fcast_cost_flag:  # Stop
                    self.logger.warning("Predicted cost exceeds maximum cost allowed.")
                    break
            solve_params['x0'] = transport_map.coeffs  # Warm start
            # Solve
            log = minimize_kl_divergence(
                base_distribution, pull_tar, mpi_pool=mpi_pool, **solve_params)
            if not log['success']:
                cost = self.cost_function(
                    ncalls=target_distribution.ncalls,
                    nevals=target_distribution.nevals,
                    teval=target_distribution.teval)
                break
            solve_params['x0'] = transport_map.coeffs  # Warm start
            validation_params['solve_log'] = log
            err, target_err, prune_params = self.error_estimation(
                base_distribution, pull_tar, validation_params,
                full_output=True, mpi_pool=mpi_pool)
            ncalls_x_solve = {
                key: value - lst_ncalls.get(key, 0)
                for key, value in pull_tar.ncalls.items()}
            lst_ncalls = pull_tar.ncalls.copy()
            cost = self.cost_function(
                ncalls=target_distribution.ncalls,
                nevals=target_distribution.nevals,
                teval=target_distribution.teval)
            cost_flag = cost > self.max_cost
            self.print_info(err, target_err, cost, validation_params, prune_params)
            it += 1
        self.post_solve(base_distribution, target_distribution,
                        transport_map, validation_params)
        log['validator_max_nsamps_exceeded'] = max_nsamps_flag
        log['validator_cost'] = cost
        log['validator_cost_exceeded'] = cost_flag
        log['validator_fcast_cost'] = fcast_cost
        log['validator_fcast_cost_exceeded'] = fcast_cost_flag
        log['validator_error'] = err
        log['validator_prune_params'] = prune_params
        log['validator_target_error'] = target_err
        return log
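    # Note on the control flow above: `solve_to_tolerance` alternates
    # (i) `refinement`, which enlarges/refreshes the quadrature stored in
    # `solve_params`, (ii) a warm-started call to `minimize_kl_divergence`,
    # and (iii) `error_estimation`, which returns the current error, the
    # target error and optional pruning weights. The loop exits when the
    # error drops below the target, the cost model exceeds `max_cost`, the
    # sample budget `max_nsamps` is hit, or the inner optimization fails.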
    def print_info(self, err, target_err, cost, validation_params, prune_params):
        solve_params = validation_params['solve_params']
        self.logger.info(
            "nsamps: %d - " % solve_params['qparams'] + \
            "err: %.3e (target: %.3e)" % (err, target_err) + \
            " - cost: %.2e" % cost)
    def stopping_criterion(self, *args, **kwargs):
        r""" Implements the stopping criterion for the particular validator.

        This function should return a target error value (e.g. the target
        error taking into account the magnitude of the objective and the
        absolute/relative tolerances).

        In theory :func:`error_estimation<KLMinimizationValidator.error_estimation>`
        could handle everything by itself. This abstract method aims to
        provide more structure to the code. See the implementation of
        :func:`SampleAverageApproximationKLMinimizationValidator.stopping_criterion`
        for an example.
        """
        raise NotImplementedError("To be implemented in subclasses if needed")
    def error_estimation(self, base_distribution, pull_tar,
                         error_estimation_params, *args, **kwargs):
        raise NotImplementedError("To be implemented in subclasses")
    def refinement(self, base_distribution, pull_tar,
                   qtype, qparams, ref_params, *args, **kwargs):
        r"""
        Returns:
          (:class:`tuple`) -- containing the new ``qparams`` and the number
            of points corresponding to it.
        """
        raise NotImplementedError("To be implemented in subclasses")
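
# A minimal usage sketch (illustrative only, not part of the library): a
# concrete subclass is driven through `solve_to_tolerance`. The names `tm`,
# `rho` and `pi` below are assumed to be a parametric transport map, a base
# distribution and a target distribution built elsewhere with this package;
# the `solve_params` keys mirror the ones referenced by the validators in
# this module (Monte Carlo quadrature: qtype=0).
#
#     validator = SampleAverageApproximationKLMinimizationValidator(
#         eps=1e-2, max_nsamps=10000)
#     solve_params = {'qtype': 0, 'qparams': 100, 'tol': 1e-4}
#     log = validator.solve_to_tolerance(tm, rho, pi, solve_params)
#     print(log['validator_error'], log['validator_target_error'])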

class SampleAverageApproximationKLMinimizationValidator(KLMinimizationValidator):
    def __init__(self, eps, eps_abs=1e-13,
                 cost_function=no_cost_function, max_cost=np.inf,
                 max_nsamps=np.inf, stop_on_fcast=False,
                 upper_mult=10, lower_n=2,
                 alpha=0.05, lmb_def=2, lmb_max=10):
        if upper_mult < 1:
            raise AttributeError("The upper_mult argument must be a float >= 1")
        if lower_n < 2:
            raise AttributeError("The lower_n argument must be an integer >= 2")
        self.eps_abs = eps_abs
        self.upper_mult = upper_mult
        self.lower_n = lower_n
        self.alpha = alpha
        self.lmb_def = lmb_def
        self.lmb_max = lmb_max
        super(SampleAverageApproximationKLMinimizationValidator, self).__init__(
            eps, cost_function=cost_function, max_cost=max_cost,
            max_nsamps=max_nsamps, stop_on_fcast=stop_on_fcast)
    def print_info(self, err, target_err, cost, validation_params, prune_params):
        solve_params = validation_params['solve_params']
        ref_params = validation_params['ref_params']
        self.logger.info(
            "nsamps: %d" % solve_params['qparams'] + \
            " - err: %.3e [L: %.3e, U: %.3e] (target: %.3e)" % (
                err, ref_params['lower_interval'],
                ref_params['upper_interval'], target_err) + \
            " - cost: %.2e" % cost)
    def stopping_criterion(self, err_mag=None):
        if err_mag is not None:
            return err_mag * self.eps + self.eps_abs
        else:
            return 0.
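    # Worked example of the criterion above (values are illustrative): with
    # eps=1e-2, eps_abs=1e-13 and an estimated objective magnitude
    # err_mag=3.2, the target error is 3.2 * 1e-2 + 1e-13 ~= 3.2e-2, i.e. a
    # relative tolerance with a tiny absolute floor.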
    def error_estimation(
            self, base_distribution, pull_tar, validation_params,
            full_output=False, mpi_pool=None):
        ref_params = validation_params['ref_params']
        solve_params = validation_params['solve_params']
        if solve_params['qtype'] != 0:
            raise AttributeError(
                "The Sample Average Approximation validator is defined only for " + \
                "Monte Carlo quadrature rules")
        tm = pull_tar.transport_map
        # Compute upper bound (distributed sampling)
        upper_nsamps = int(np.ceil(self.upper_mult * solve_params['qparams']))
        (x, w) = distributed_sampling(
            base_distribution, 0, upper_nsamps, mpi_pool=mpi_pool)
        v2 = - mpi_map(
            "log_pdf", obj=pull_tar,
            dmem_key_in_list=['x'],
            dmem_arg_in_list=['x'],
            dmem_val_in_list=[x],
            mpi_pool=mpi_pool)
        if solve_params.get('regularization') is not None:
            if solve_params['regularization']['type'] == 'L2':
                v2 += solve_params['regularization']['alpha'] * \
                    npla.norm(tm.coeffs - tm.get_identity_coeffs(), 2)**2.
        upper_mean = np.sum( v2 ) / float(upper_nsamps)
        upper_var = np.sum( (v2 - upper_mean)**2 ) / float(upper_nsamps-1)
        upper_interval = stats.t.ppf(1-self.alpha, upper_nsamps) * \
            np.sqrt(upper_var/float(upper_nsamps))
        upper_bound = upper_mean + upper_interval
        # Compute lower bound
        res_x = solve_params.pop('x')
        res_w = solve_params.pop('w')
        res_coeffs = pull_tar.coeffs[:]
        lower_nsamps = self.lower_n
        v2 = np.zeros(lower_nsamps)
        coeffs = np.zeros((lower_nsamps, tm.n_coeffs))
        for i in range(lower_nsamps):
            log = minimize_kl_divergence(
                base_distribution, pull_tar, mpi_pool=mpi_pool, **solve_params)
            v2[i] = log['fval']
            coeffs[i,:] = tm.coeffs
        lower_mean = np.sum(v2) / float(lower_nsamps)
        lower_var = np.sum((v2 - lower_mean)**2) / float(lower_nsamps-1)
        lower_interval = stats.t.ppf(1-self.alpha, lower_nsamps) * \
            np.sqrt(lower_var/float(lower_nsamps))
        lower_bound = lower_mean - lower_interval
        pull_tar.coeffs = res_coeffs  # Restore coefficients
        solve_params['x'] = res_x
        solve_params['w'] = res_w
        # Error as the gap between the bounds
        err = max(0., upper_bound - lower_bound)
        if full_output:
            # Refinement parameters
            ref_params['upper_nsamps'] = upper_nsamps
            ref_params['upper_mean'] = upper_mean
            ref_params['upper_var'] = upper_var
            ref_params['upper_interval'] = upper_interval
            ref_params['lower_nsamps'] = lower_nsamps
            ref_params['lower_mean'] = lower_mean
            ref_params['lower_var'] = lower_var
            ref_params['lower_interval'] = lower_interval
            # Pruning parameter as 1/stand.dev. of coefficients scaled to (0,1)
            std_coeffs = np.std(coeffs, axis=0)
            prune_params = (1/std_coeffs)/max(1/std_coeffs)
            target_err = self.stopping_criterion(upper_mean)
            return err, target_err, prune_params
        else:
            return err
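    # The SAA error above is the gap between a one-sided upper confidence
    # bound on the objective at the current coefficients (computed on
    # upper_mult * qparams fresh samples) and a one-sided lower confidence
    # bound obtained by re-minimizing on lower_n fresh sample sets. The half
    # widths follow the one-sided Student-t construction used above; a
    # standalone sketch of that bound (illustrative, not library code):
    #
    #     import numpy as np
    #     import scipy.stats as stats
    #     v = np.random.randn(200)               # objective evaluations
    #     mean, var, n = v.mean(), v.var(ddof=1), v.size
    #     half_width = stats.t.ppf(1 - 0.05, n) * np.sqrt(var / n)
    #     upper_bound = mean + half_width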
    def refinement(self, base_distribution, pull_tar, validation_params):
        ref_params = validation_params['ref_params']
        solve_params = validation_params['solve_params']
        qparams = solve_params['qparams']
        if solve_params['qtype'] != 0:
            raise AttributeError(
                "The Sample Average Approximation validator is defined only for " + \
                "Monte Carlo quadrature rules")
        if 'upper_mean' not in ref_params:
            # If this is the first iteration
            q = solve_params['qparams']
        else:
            # If one error estimation has already been done
            X = self.stopping_criterion(ref_params['upper_mean']) - \
                (ref_params['upper_mean'] - ref_params['lower_mean']) - \
                stats.t.ppf(1-self.alpha, ref_params['upper_nsamps']) * \
                np.sqrt(ref_params['upper_var']/ref_params['upper_nsamps'])
            if X > 0:
                def f(m, alpha, X, lv):
                    return stats.t.ppf(1-alpha, m) / m * np.sqrt(lv) - X
                if np.sign( f(ref_params['lower_nsamps'], self.alpha, X,
                              ref_params['lower_var']) ) * \
                   np.sign( f(self.lmb_max, self.alpha, X,
                              ref_params['lower_var']) ) == -1:
                    m = sciopt.bisect(
                        f, ref_params['lower_nsamps'], self.lmb_max,
                        args=(self.alpha, X, ref_params['lower_var']))
                    q = int(np.ceil( m/ref_params['lower_nsamps'] * qparams ))
                else:
                    q = int( np.ceil(self.lmb_def * qparams) )
            else:
                q = int( np.ceil(self.lmb_def * qparams) )
        q = min(q, self.max_nsamps)
        solve_params['qparams'] = q
        x, w = base_distribution.quadrature(0, q)
        solve_params['x'] = x
        solve_params['w'] = w
        return q

# class GradientChi2KLMinimizationValidator(KLMinimizationValidator):
#     def __init__(self, eps,
#                  cost_function=no_cost_function, max_cost=np.inf,
#                  max_nsamps=np.inf, stop_on_fcast=False,
#                  n_grad_samps=10, n_bootstrap=None,
#                  alpha=0.95, lmb_def=2, lmb_max=10,
#                  fungrad=False):
#         self.n_grad_samps = n_grad_samps
#         self.n_bootstrap = n_bootstrap
#         self.alpha = alpha
#         self.lmb_def = lmb_def
#         self.lmb_max = lmb_max
#         self.fungrad = fungrad
#         super(GradientChi2KLMinimizationValidator,
#               self).__init__(eps, cost_function, max_cost, max_nsamps, stop_on_fcast)
#     def error_estimation(
#             self, base_distribution, pull_tar, solve_params, full_output=False):
#         if solve_params['qtype'] != 0:
#             raise AttributeError(
#                 "The Gradient Stability validator is defined only for " + \
#                 "Monte Carlo quadrature rules")
#         tm = pull_tar.transport_map
#         # Compute mean and variance of the gradient
#         nsamps = int(np.ceil( self.n_grad_samps * solve_params['qparams'] ))
#         x, w = base_distribution.quadrature(
#             qtype=solve_params['qtype'], qparams=nsamps)
#         scatter_tuple = (['x'], [x])
#         if not self.fungrad:
#             grad = - mpi_map(
#                 "grad_a_log_pdf", obj=pull_tar, scatter_tuple=scatter_tuple,
#                 mpi_pool=solve_params.get('mpi_pool'))
#         else:
#             _, grad = - mpi_map(
#                 "tuple_grad_a_log_pdf", obj=pull_tar, scatter_tuple=scatter_tuple,
#                 mpi_pool=solve_params.get('mpi_pool'))
#         if solve_params.get('regularization') is not None:
#             if solve_params['regularization']['type'] == 'L2':
#                 grad += solve_params['regularization']['alpha'] * 2. * \
#                     (tm.coeffs - tm.get_identity_coeffs())
#         # Use randomized resampling to avoid being forced to use n_grad_samps > tm.n_coeffs
#         if self.n_bootstrap is None:
#             avg_grads = np.mean( grad.reshape((
#                 solve_params['qparams'], self.n_grad_samps, tm.n_coeffs)), axis=0 )
#         else:
#             avg_grads = np.zeros((self.n_bootstrap, tm.n_coeffs))
#             for i in range(self.n_bootstrap):
#                 idxs = npr.choice(nsamps, size=solve_params['qparams'], replace=True)
#                 avg_grads[i,:] = np.mean( grad[idxs,:], axis=0 )
#         mean = np.mean(avg_grads, axis=0)
#         cntr_avg_grads = avg_grads - mean
#         cov = np.dot(cntr_avg_grads.T, cntr_avg_grads) / float(solve_params['qparams']-1)
#         L = scila.cholesky(cov, lower=True)
#         cstar = stats.chi2(tm.n_coeffs).ppf(self.alpha)
#         # Solve maximization problem
#         A = - cstar/solve_params['qparams'] * np.dot(L.T,L)
#         b = - 2. * np.sqrt(cstar/solve_params['qparams']) * np.dot(mean, L)
#         c = - np.dot(mean, mean)
#         def f(x, A, b, c):
#             return np.dot(x,np.dot(A,x)) + np.dot(b, x) + c
#         def jac(x, A, b, c):
#             return 2. * np.dot(A,x) + b
#         def c1(x):
#             return 1 - np.dot(x,x)
#         def c1jac(x):
#             return - 2 * x
#         cons = ({'type': 'ineq', 'fun': c1, 'jac': c1jac})
#         res = sciopt.minimize(
#             f, np.zeros(tm.n_coeffs), method='SLSQP', jac=jac,
#             args=(A, b, c), constraints=cons, tol=1e-12)
#         xstar = res['x']
#         # Compute err as sqrt(f(x))
#         err = np.sqrt(- f(xstar, A, b, c))
#         if full_output:
#             # Refinement parameters
#             ref_params = {
#                 'mean': mean,
#                 'L': L,
#                 'cstar': cstar,
#                 'xstar': xstar
#             }
#             # Prune parameter as 1/variance of directional gradient
#             var_coeffs = np.diag(cov)
#             prune_params = (1/var_coeffs)/max(1/var_coeffs)
#             return err, ref_params, prune_params
#         else:
#             return err
#     def refinement(self, base_distribution, pull_tar,
#                    qtype, qparams, ref_params):
#         if qtype != 0:
#             raise AttributeError(
#                 "The Gradient Stability validator is defined only for " + \
#                 "Monte Carlo quadrature rules")
#         # Load parameters
#         mu = ref_params['mean']
#         L = ref_params['L']
#         cstar = ref_params['cstar']
#         xstar = ref_params['xstar']
#         # Compute refinement
#         a = np.dot(mu,mu) - self.eps**2
#         if a <= 0:
#             Lx = np.dot(L, xstar)
#             b = 2. * np.sqrt(cstar) * np.dot(mu, np.dot(L, xstar))
#             c = cstar * np.dot(Lx.T, Lx)
#             z = (-b + np.sqrt(b**2-4*a*c))/ 2. / a
#             q = min( int(np.ceil(z**2)),
#                      self.lmb_max * qparams )
#         else:
#             q = int( np.ceil(self.lmb_def * qparams) )
#         return q, q

# class GradientToleranceRegionKLMinimizationValidator(KLMinimizationValidator):
#     def __init__(self, eps,
#                  cost_function=no_cost_function, max_cost=np.inf,
#                  max_nsamps=np.inf, stop_on_fcast=False,
#                  n_grad_samps=10, n_gap_resampling=10,
#                  beta=0.95, gamma=0.05, lmb_def=2, lmb_max=10,
#                  fungrad=False):
#         self.n_grad_samps = n_grad_samps
#         self.n_gap_resampling = n_gap_resampling
#         self.beta = beta
#         self.gamma = gamma
#         self.lmb_def = lmb_def
#         self.lmb_max = lmb_max
#         self.fungrad = fungrad
#         super(GradientStabilityKLMinimizationValidator,
#               self).__init__(eps, cost_function, max_cost, max_nsamps, stop_on_fcast)
#     def _cv(self, N, n, b, g):
#         n = self.n_grad_samps
#         b = self.beta
#         g = self.gamma
#         return (n-1) * stats.chi2(N, loc=float(N)/float(n)).ppf(b) / \
#             stats.chi2(n-N).ppf(1-g)
#     def error_estimation(
#             self, base_distribution, pull_tar, solve_params, full_output=False):
#         if solve_params['qtype'] != 0:
#             raise AttributeError(
#                 "The Gradient Stability validator is defined only for " + \
#                 "Monte Carlo quadrature rules")
#         tm = pull_tar.transport_map
#         # Compute mean and variance of the gradient
#         nsamps = int(np.ceil( self.n_grad_samps * solve_params['qparams'] ))
#         x, w = base_distribution.quadrature(
#             qtype=solve_params['qtype'], qparams=nsamps)
#         scatter_tuple = (['x'], [x])
#         if not self.fungrad:
#             grad = - mpi_map(
#                 "grad_a_log_pdf", obj=pull_tar, scatter_tuple=scatter_tuple,
#                 mpi_pool=solve_params.get('mpi_pool'))
#         else:
#             _, grad = - mpi_map(
#                 "tuple_grad_a_log_pdf", obj=pull_tar, scatter_tuple=scatter_tuple,
#                 mpi_pool=solve_params.get('mpi_pool'))
#         if solve_params.get('regularization') is not None:
#             if solve_params['regularization']['type'] == 'L2':
#                 grad += solve_params['regularization']['alpha'] * 2. * \
#                     (tm.coeffs - tm.get_identity_coeffs())
#         # Use randomized resampling to avoid being forced to use n_grad_samps > tm.n_coeffs
#         n = tm.n_coeffs + self.n_gap_resampling
#         avg_grads = np.zeros((n, tm.n_coeffs))
#         for i in range(n):
#             idxs = npr.choice(nsamps, size=solve_params['qparams'], replace=False)
#             avg_grads[i,:] = np.mean( grad[idxs,:], axis=0 )
#         mean = np.mean(avg_grads, axis=0)
#         cntr_avg_grads = avg_grads - mean
#         cov = np.dot(cntr_avg_grads.T, cntr_avg_grads) / float(n-1)
#         L = scila.cholesky(cov, lower=True)
#         cstar = self._cv(tm.n_coeffs, n, self.beta, self.gamma)
#         # Solve maximization problem
#         A = - cstar/solve_params['qparams'] * np.dot(L.T,L)
#         b = - 2. * np.sqrt(cstar/solve_params['qparams']) * np.dot(mean, L)
#         c = - np.dot(mean, mean)
#         def f(x, A, b, c):
#             return np.dot(x,np.dot(A,x)) + np.dot(b, x) + c
#         def jac(x, A, b, c):
#             return 2. * np.dot(A,x) + b
#         def c1(x):
#             return 1 - np.dot(x,x)
#         def c1jac(x):
#             return - 2 * x
#         cons = ({'type': 'ineq', 'fun': c1, 'jac': c1jac})
#         res = sciopt.minimize(
#             f, np.zeros(tm.n_coeffs), method='SLSQP', jac=jac,
#             args=(A, b, c), constraints=cons, tol=1e-12)
#         xstar = res['x']
#         # Compute err as sqrt(f(x))
#         err = np.sqrt(f(xstar, A, b, c))
#         if full_output:
#             # Refinement parameters
#             ref_params = {
#                 'mean': mean,
#                 'L': L,
#                 'cstar': cstar,
#                 'xstar': xstar
#             }
#             # Compute prune parameters as 1/ variance of gradient
#             var_coeffs = np.diag(cov)
#             prune_params = (1/var_coeffs)/max(1/var_coeffs)
#             return err, ref_params, prune_params
#         else:
#             return err
#     def refinement(self, base_distribution, pull_tar,
#                    qtype, qparams, ref_params):
#         if qtype != 0:
#             raise AttributeError(
#                 "The Gradient Stability validator is defined only for " + \
#                 "Monte Carlo quadrature rules")
#         # Load parameters
#         mu = ref_params['mean']
#         L = ref_params['L']
#         cstar = ref_params['cstar']
#         xstar = ref_params['xstar']
#         # Compute refinement
#         a = np.dot(mu,mu) - self.eps**2
#         if a <= 0:
#             Lx = np.dot(L, xstar)
#             b = 2. * np.sqrt(cstar) * np.dot(mu, np.dot(L, xstar))
#             c = cstar * np.dot(Lx.T, Lx)
#             z = (-b + np.sqrt(b**2-4*a*c))/ 2. / a
#             q = int(np.ceil(z**2))
#         else:
#             q = int( np.ceil(self.lmb_def * qparams) )
#         return q, q

class GradientBootstrapKLMinimizationValidator(KLMinimizationValidator):
    r""" Bootstrap the gradient to check whether the cost function is flat,
    independently of the particular sample.

    For the current solution :math:`{\bf a}`, minimizing the cost
    :math:`\mathcal{Q}_n[\mathcal{J}[{\bf a}]({\bf x})]`
    for the sample :math:`{\bf x}^\star`, check that

    .. math::

       \mathbb{P}\left[
          \left\Vert\mathcal{Q}_n[\nabla_{\bf a}\mathcal{J}[{\bf a}]({\bf x})]\right\Vert_2
          \leq \delta
          \left\Vert\mathcal{Q}_n[\nabla_{\bf a}\mathcal{J}[{\bf a}]({\bf x}^\star)]\right\Vert_2
          + \varepsilon
       \right] \geq \alpha

    Args:
      delta (float): multiplicative factor :math:`\delta`
    """
    def __init__(self, eps, delta=5,
                 cost_function=no_cost_function, max_cost=np.inf,
                 max_nsamps=np.inf, stop_on_fcast=False,
                 n_grad_samps=1, n_bootstrap=2000,
                 alpha=0.95, lmb_min=2, lmb_max=10):
        self.delta = delta
        self.n_grad_samps = n_grad_samps
        self.n_bootstrap = n_bootstrap
        self.alpha = alpha
        self.lmb_min = lmb_min
        self.lmb_max = lmb_max
        super(GradientBootstrapKLMinimizationValidator,
              self).__init__(eps, cost_function, max_cost, max_nsamps, stop_on_fcast)
    def update_tolerances(self):
        print("The target error is computed as: " + \
              r"delta * ||\nabla_a J[a]||_2 + eps")
        val, flag = read_and_cast_input("absolute tolerance (eps)", float, self.eps)
        if not flag:
            return flag
        self.eps = val
        val, flag = read_and_cast_input("gradient tolerance (delta)", float, self.delta)
        if not flag:
            return flag
        self.delta = val
        return flag
    def pre_solve(self, base_distribution, target_distribution,
                  transport_map, validation_params):
        validation_params['ref_params']['is_first_iter'] = True
    def stopping_criterion(self, obj=None, grad=None, validation_params=None):
        if validation_params['solve_params'].get('ders') == 1:
            # Use the target tolerance value. Since ders is 1, the tolerance
            # refers to the l2 norm of the gradient.
            return validation_params['solve_params'].get('tol') * self.delta + self.eps
        elif obj is not None and grad is not None:
            return npla.norm(grad, ord=2) * self.delta + self.eps
        else:
            return 0.
    def error_estimation(
            self, base_distribution, pull_tar, validation_params,
            full_output=False, mpi_pool=None):
        ref_params = validation_params['ref_params']
        solve_params = validation_params['solve_params']
        if solve_params['qtype'] != 0:
            raise AttributeError(
                "The Bootstrap Gradient validator is defined only for " + \
                "Monte Carlo quadrature rules.")
        tm = pull_tar.transport_map
        # Prepare the batch
        nsamps = int(np.ceil( self.n_grad_samps * solve_params['qparams'] ))
        x, w = base_distribution.quadrature(
            qtype=solve_params['qtype'], qparams=nsamps)
        grad = np.zeros((x.shape[0], tm.n_coeffs))
        # Prepare batching sizes
        bsize = solve_params.get('batch_size')
        if bsize is not None:
            bsize = bsize[1] if bsize[1] is not None else x.shape[0]
        else:
            bsize = x.shape[0]
        # Distribute the object
        mpi_bcast_dmem(d=pull_tar, mpi_pool=mpi_pool)
        for n in range(0, x.shape[0], bsize):
            nend = min(x.shape[0], n+bsize)
            scatter_tuple = (['x'], [x[n:nend,:]])
            if not solve_params.get('fungrad', False):
                grad[n:nend,:] = mpi_map(
                    "grad_a_log_pdf", scatter_tuple=scatter_tuple,
                    obj='d', obj_val=pull_tar, mpi_pool=mpi_pool)
            else:
                _, grad[n:nend,:] = mpi_map(
                    "tuple_grad_a_log_pdf", scatter_tuple=scatter_tuple,
                    obj='d', obj_val=pull_tar, mpi_pool=mpi_pool)
        grad = - grad
        if solve_params.get('regularization') is not None:
            if solve_params['regularization']['type'] == 'L2':
                grad += solve_params['regularization']['alpha'] * 2. * \
                    (tm.coeffs - tm.get_identity_coeffs())
        # Update counters
        if mpi_pool is not None:
            d_child_list = mpi_pool.get_dmem('d')
            pull_tar.update_ncalls_tree( d_child_list[0][0] )
            for (d_child,) in d_child_list:
                pull_tar.update_nevals_tree(d_child)
                pull_tar.update_teval_tree(d_child)
        # Clear mpi_pool
        if mpi_pool is not None:
            mpi_pool.clear_dmem()
        # Compute the n_bootstrap norms of the gradient
        nrm_grad = np.zeros(self.n_bootstrap)
        avg_grads = np.zeros((self.n_bootstrap, tm.n_coeffs))
        for i in range(self.n_bootstrap):
            idxs = npr.choice(nsamps, size=nsamps, replace=True)
            avg_grads[i,:] = np.mean(grad[idxs,:], axis=0)
            nrm_grad[i] = npla.norm(avg_grads[i,:], ord=2)
        # Compute the 100*alpha% empirical percentile over the n_bootstrap re-samples
        # (the norms are sorted so that prc_idx indexes the empirical percentile)
        prc_idx = int(np.floor( self.alpha * self.n_bootstrap ))
        prc = np.sort(nrm_grad)[prc_idx]
        # The error is the 100*alpha% empirical percentile of the gradient norm
        err = prc
        ref_params['error'] = err
        if full_output:
            # Prune parameters
            mean = np.mean(avg_grads, axis=0)
            cntr_avg_grads = avg_grads - mean
            var_coeffs = self.n_bootstrap / (self.n_bootstrap - 1) \
                * np.mean(cntr_avg_grads**2, axis=0)
            prune_params = (1/var_coeffs)/max(1/var_coeffs)
            # Compute the target error
            solve_log = validation_params['solve_log']
            target_err = self.stopping_criterion(
                obj = solve_log['fval'],
                grad = solve_log['jac'],
                validation_params = validation_params )
            ref_params['target_error'] = target_err
            return err, target_err, prune_params
        else:
            return err
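    # The quantity bootstrapped above is the l2 norm of the Monte Carlo
    # gradient estimate under resampling (with replacement) of the same
    # sample; the error is its empirical alpha-percentile. A standalone
    # sketch of that resampling step (illustrative only, not library code):
    #
    #     import numpy as np
    #     grad = np.random.randn(500, 8)          # per-sample gradients
    #     n, nb, alpha = grad.shape[0], 2000, 0.95
    #     norms = np.empty(nb)
    #     for i in range(nb):
    #         idxs = np.random.choice(n, size=n, replace=True)
    #         norms[i] = np.linalg.norm(grad[idxs].mean(axis=0))
    #     err = np.sort(norms)[int(np.floor(alpha * nb))]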
    def refinement(self, base_distribution, pull_tar, validation_params):
        ref_params = validation_params['ref_params']
        solve_params = validation_params['solve_params']
        if ref_params['is_first_iter']:
            ref_params['is_first_iter'] = False
            try:
                x = solve_params['x']
                w = solve_params['w']
            except KeyError:
                x, w = base_distribution.quadrature(0, solve_params['qparams'])
        else:
            err = ref_params['error']
            target_err = ref_params['target_error']
            missing_digits = np.log10( err / target_err )
            q = 10**(2 * missing_digits)
            nsamps = solve_params['qparams'] * max( self.lmb_min, min(self.lmb_max, q) )
            solve_params['qparams'] = int(np.ceil(nsamps))
            x, w = base_distribution.quadrature(0, solve_params['qparams'])
        solve_params['x'] = x
        solve_params['w'] = w
        return solve_params['qparams']
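    # Worked example of the scaling above (illustrative numbers): if the
    # bootstrapped error is 10x the target, missing_digits = log10(10) = 1
    # and q = 10**2 = 100, which is then clipped into [lmb_min, lmb_max]
    # (here to 10), so the sample size grows by at most a factor lmb_max and
    # by at least a factor lmb_min per iteration.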

class DimensionAdaptiveSparseGridKLMinimizationValidator(KLMinimizationValidator):
    def __init__(self, *args, **kwargs):
        if not DARKSPARK_SUPPORT:
            raise RuntimeError("Sparse Grid is not supported (install DARKSparK).")
        super(DimensionAdaptiveSparseGridKLMinimizationValidator,
              self).__init__(*args, **kwargs)
    def grad_l2_reg(self, alpha, pull_tar):
        return alpha * 2. * (pull_tar.transport_map.coeffs -
                             pull_tar.transport_map.get_identity_coeffs())
    def grad_a_log_pdf_call(self, pull_tar, solve_params):
        return lambda x: -mpi_map(
            "grad_a_log_pdf", obj=pull_tar,
            dmem_key_in_list=['x'],
            dmem_arg_in_list=['x'],
            dmem_val_in_list=[x],
            mpi_pool=solve_params.get('mpi_pool')).T
    def pre_solve(self, base_distribution, target_distribution,
                  transport_map, validation_params):
        solve_params = validation_params['solve_params']
        ref_params = validation_params['ref_params']
        solve_params['tol'] = self.eps
        ref_params['abs_tol'] = solve_params['tol'] / 10.
        ref_params['solve_params'] = solve_params
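    # Worked example (illustrative): with eps = 1e-3 the solver tolerance is
    # set to 1e-3, the initial sparse-grid abs_tol to 1e-4, and
    # error_estimation below never tightens abs_tol past tol/100 = 1e-5.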
    def post_solve(self, base_distribution, target_distribution,
                   transport_map, validation_params):
        solve_params = validation_params['solve_params']
        ref_params = validation_params['ref_params']
        solve_params['x'] = ref_params['x_err_est']
        solve_params['w'] = ref_params['w_err_est']
        del ref_params['x_err_est']
        del ref_params['w_err_est']
    def print_info(self, err, target_err, cost, validation_params, prune_params):
        solve_params = validation_params['solve_params']
        ref_params = validation_params['ref_params']
        self.logger.info(
            "Dimension Adaptive Sparse Grid num pts: %d - " % \
            ref_params.get('x_err_est').shape[0] + \
            "err: %.3e (target grad inf-norm: %.3e)" % (err, target_err) + \
            " - cost: %.2e" % cost)
    def stopping_criterion(self, solve_params):
        return solve_params['tol']
    def error_estimation(self, base_distribution, pull_tar,
                         validation_params, full_output=False):
        self.logger.info("Starting Error Estimation")
        ref_params = validation_params['ref_params']
        solve_params = validation_params['solve_params']
        qparams = {'abs_tol': ref_params['abs_tol']}
        x, w = base_distribution.quadrature(
            qtype=4, qparams=qparams,
            f=MultiDimensionalFunction(
                self.grad_a_log_pdf_call(pull_tar, solve_params),
                Reals64(pull_tar.dim), Reals64(pull_tar.n_coeffs)))
        # The error is the inf-norm of the (vector valued) integral of the gradient
        # of the negative log-pullback of the target, plus an optional L2
        # regularization term.
        error_term = qparams['adapter'].vector_valued_integrals[-1]
        if solve_params.get('regularization') is not None:
            if solve_params['regularization']['type'] == 'L2':
                error_term += self.grad_l2_reg(
                    solve_params['regularization']['alpha'], pull_tar)
        err = np.linalg.norm(error_term, np.inf)
        target_err = self.stopping_criterion(solve_params)
        if not err < target_err:
            # Halve abs_tol at each failed check, but do not refine past two
            # orders of magnitude below the solve tolerance.
            ref_params['abs_tol'] /= 2.0
            ref_params['abs_tol'] = np.fmax(ref_params['abs_tol'],
                                            solve_params['tol'] / 100.0)
            del solve_params['x']
            del solve_params['w']
        # We want the gradient norm on the new quadrature to be small,
        # below the optimization tolerance.
        ref_params['x_err_est'] = x
        ref_params['w_err_est'] = w
        # Future: prune params based on bias^2 instead of variance (sparse grid variance)?
        self.logger.info("Finished Error Estimation")
        return err, target_err, None  # No prune params for now
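    # The check above is a first-order optimality test: the dimension-adaptive
    # sparse grid (qtype=4) integrates the coefficient gradient of the negative
    # log-pullback, and the solution is accepted when the inf-norm of that
    # integral (plus the optional L2 regularization gradient) falls below the
    # solver tolerance; otherwise abs_tol is tightened and the stored
    # quadrature is invalidated so that the next solve uses the finer grid.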
    def refinement(self, base_distribution, pull_tar, validation_params):
        r"""
        Returns:
          (:class:`tuple`) -- containing the new ``qparams`` and the number
            of points corresponding to it.
        """
        self.logger.info("Starting Refinement")
        solve_params = validation_params['solve_params']
        ref_params = validation_params['ref_params']
        if 'x' not in solve_params:
            # This is usually the case, except at the start of a new "validation
            # stage", where x, w were set to x_err_est and w_err_est in post_solve;
            # in that case we reuse x (no need to build a new quadrature before
            # starting the optimization).
            if 'x_err_est' in ref_params:
                # Use the quadrature from error_estimation for the next optimization
                self.logger.info("Reusing quadrature points from error estimation")
                solve_params['x'] = ref_params['x_err_est']
                solve_params['w'] = ref_params['w_err_est']
                del ref_params['x_err_est']
                del ref_params['w_err_est']
            else:
                # Only reached on the first iteration of the first optimization:
                # default level 1 grid.
                x, w = base_distribution.quadrature(qtype=4, qparams=dict())
                solve_params['x'] = x
                solve_params['w'] = w
        else:
            self.logger.info("Reusing quadrature points from previous optimization")
        self.logger.info("Finished Refinement")
        # # Plotting utilities to inspect the quadrature
        # # integrals = []
        # # for i in range(1, x.shape[0], round(x.shape[0]/100.)):
        # #     x_mc, w_mc = base_distribution.quadrature(qtype=0, qparams=i)
        # #     scatter_tuple = (['x'], [x_mc])
        # #     reduce_tuple = (['w'], [w_mc])
        # #     reduce_obj = ExpectationReduce()
        # #     integrals.append(mpi_map("grad_a_log_pdf", scatter_tuple=scatter_tuple,
        # #                              obj=pull_tar, reduce_obj=reduce_obj,
        # #                              reduce_tuple=reduce_tuple,
        # #                              mpi_pool=solve_params.get('mpi_pool')))
        # import matplotlib
        # matplotlib.use('Qt5Agg')
        # import matplotlib.pyplot as plt
        # # plt.plot(np.arange(1, x.shape[0], round(x.shape[0]/100.)),
        # #          integrals, label="MC, by # pts", c='orange')
        # plt.plot(adapter.num_grid_pts_history,
        #          adapter.vector_valued_integrals, label="SG, by # pts", c='blue')
        # plt.title("Refinement: Vector integral, E_rho[-grad log T^* pi]")
        # plt.xlabel("# samples")
        # plt.ylabel("integral")
        # plt.legend()
        # plt.show()
        return solve_params['x'].shape[0]