Posts for the month of July 2014

Implementation of the Iterative Generalized Polynomial Chaos (i-gPC)

Hi,

Here is an OpenTurns-based implementation of the so-called i-gPC method designed by Poette & Lucor (2012). It allows the approximation of strongly nonlinear responses of models depending on random parameters. The strategy was originally applied in the field of fluid dynamics, precisely to stochastic partial differential equations modelling shocks, e.g. Burgers equations.

The i-gPC procedure consists in first approximating the input (X)-output (Y) relationship by a classical chaos expansion (called Y0), and then to construct a new chaos (Y1) to approximate the mapping between the residuals (Y-Y0) and the response Y using an orthonormal basis with respect to the measure of (Y-Y0). Then a new chaos is sought to mimick the relationship between (Y-Y1) and (Y), and so on.

The implementation is successful in some cases, as for the 1D Heaviside function example provided with the OT script when some "good" algorithm parameters are selected. Nonetheless it may suffer severe numerical instability in some cases. This problem arises when creating at eash igPC step an ad-hoc orthonormal basis from a kernel estimate of the density functions of the current residuals.

This drawback should be overcome in a future version. Any suggestion/comment is welcome!

Main script "igPC.py"

# -*- coding: utf-8 -*-

""" 
Performs iterative generalized polynomial chaos approximation.

This is an implementation of the approach designed by [Poette, 
Lucor (2012)]. This approach was shown to be more accurate than 
ordinary chaos expansion without requiring additional computer 
experiments. It makes it possible to approximate strongly nonlinear 
model responses and to tackle the Gibbs phenomenon in the presence 
of discontinuities.

Reference
---------
G. Poette, D. Lucor, Non intrusive iterative stochastic spectral
representation with application to compressible gas dynamics,
Journal of Computational Physics 231 (2012), pp. 3587-3609
"""

import matplotlib.pyplot as plt
from numpy import loadtxt, c_, linspace, array
from openturns import *


class IterativeGPC(object):
    """ 
    Iterative generalized polynomial chaos study
    
    Parameters
    ----------
    in_filename: string
        Name of the file containing the input sample.
        
    out_filename: string
        Name of the file containing the output sample.
    
    myDistribution: *ComposedDistribution* openturns object.
        Joint distribution from which the input sample was 
        generated.
    
    pc_degree: integer
        Total degree of the polynomial chaos expansions.
    
    n_iter: integer, optional
        Number of igPC iterations. If `n_iter` is not given 
        (default 4).
    
    n_kernel_pdf: integer, optional
        Size of the sample that is generated from the 
        several chaos approximations in order to estimate the 
        density using kernel smoothing (default 5,000).
    
    Attributes
    ----------
    data: dict
        Dictionary that contains the input and the output samples 
        (*NumericalSample* objects) as well as their (common) 
        size and their respective dimensions.
    
    alg_opts: dict
        Dictionary of the parameters to pass to the `perform_igpc` 
        method.
        
    metamodels: dict
        List of metamodels (*NumericalMathFunction* objects) 
        obtained at each iteration of igPC.
    """
    
    def __init__(self, in_filename, out_filename, myDistribution,
                 pc_degree, n_iter=4, n_kernel_pdf=5000):
        
        # Import the input and the output data
        xSample, ySample = ( NumericalSample( c_[ loadtxt(fic) ] )
                             for fic in(in_filename, out_filename) )
        (n_sample_in, dim_in), (n_sample_out, dim_out) = ((sample.getSize(), 
                     sample.getDimension()) for sample in (xSample, ySample))
        if n_sample_in != n_sample_out:
            raise ValueError("The sizes of the input and the output "
                             + "samples must be equal.")
        else:
            self.data = {"input": xSample, "output": ySample, 
                         "size": n_sample_in, "dim_in": dim_in, 
                         "dim_out": dim_out}
        
        # Get the input joint distribution
        if dim_in != myDistribution.getDimension():
            raise ValueError("The dimensions of the input sample and "
                             + "the input joint distribution must be "
                             + "equal.")
        else:
            self.distribution = myDistribution
            
        # Get the igPC options
        self.alg_opts = {"pc_degree": pc_degree, "n_iter": n_iter,
                         "n_kernel_pdf": n_kernel_pdf}
        
        

    def perform_igpc(self):
        """
        Performs iterative generalized polynomial chaos approximation.
        """
        
        # Extract attributes
        myDistribution = self.distribution
        dim = self.data["dim_in"]
        n_sample = self.data["size"]
        xSample, ySample = self.data["input"], self.data["output"]
        pc_degree, n_iter, n_kernel_pdf = (self.alg_opts[key] for key 
                           in ("pc_degree", "n_iter", "n_kernel_pdf"))
        
        # Specify a multivariate polynomial orthonormal basis
        univariatePolyList = []
        for i in range(dim):
            dist = myDistribution.getMarginal(i)
            univariatePolyList.append( StandardDistributionPolynomialFactory(dist) )
        polyColl = PolynomialFamilyCollection(univariatePolyList)
        enumerateFunction = EnumerateFunction(dim)
        multivariateBasis = OrthogonalProductPolynomialFactory(polyColl, enumerateFunction)
        
        # Define the LAR strategy
        basisSequenceFactory = LAR()
        fittingAlgorithm = CorrectedLeaveOneOut()
        approximationAlgorithm = LeastSquaresMetaModelSelectionFactory(basisSequenceFactory, fittingAlgorithm)
        
        # Define the truncation strategy
        P = enumerateFunction.getStrataCumulatedCardinal(pc_degree)
        truncatureBasisStrategy = FixedStrategy(OrthogonalBasis(multivariateBasis), P)
        evalStrategy = LeastSquaresStrategy(myDistribution, approximationAlgorithm)
        weights = [1]*n_sample
        
        # Construct the chaos approximation
        polynomialChaosAlgorithm = FunctionalChaosAlgorithm(xSample, weights, ySample, 
                                                            myDistribution, truncatureBasisStrategy, 
                                                            evalStrategy)
        polynomialChaosAlgorithm.run()
        
        # Get the chaos results
        result = polynomialChaosAlgorithm.getResult()
        metamodel = result.getMetaModel()
        chaosRV = FunctionalChaosRandomVector(result)
        zSample = metamodel(xSample)
        
        # Carry out the i-gPC procedure
        #
        metamodel_list = [metamodel]
        for k in range(n_iter):
            #
            # KLUDGE: Approximate the chaos output distribution using a kernel PDF
            # estimator.
            # This may be a crude approach possibly leading to numerical instabilities. 
            # This should be improved in a future version.
            zBigSample = chaosRV.getSample(n_kernel_pdf)
            kernel = KernelSmoothing()
            myBandwith = kernel.computeSilvermanBandwidth(zBigSample)
            dist_Z = kernel.build(zBigSample, myBandwith)
            myDistribution_Z = ComposedDistribution([dist_Z])
            #
            # Construct an ad-hoc orthonormal polynomial basis of the chaos measure
            polyColl_Z = PolynomialFamilyCollection([StandardDistributionPolynomialFactory(dist_Z)])
            enumerateFunction = EnumerateFunction(1)
            multivariateBasis_Z = OrthogonalProductPolynomialFactory(polyColl_Z, enumerateFunction)
            truncatureBasisStrategy_Z = FixedStrategy(OrthogonalBasis(multivariateBasis_Z), pc_degree+1)
            evalStrategy_Z = LeastSquaresStrategy(dist_Z, approximationAlgorithm)
            #
            # Compute a chaos approximation of the relationship between the previous chaos output and 
            # the acutal output.
            polynomialChaosAlgorithm = FunctionalChaosAlgorithm(zSample, weights, ySample, 
                                                                myDistribution_Z, truncatureBasisStrategy_Z, 
                                                                evalStrategy_Z)
            polynomialChaosAlgorithm.run()
            #
            # Get results
            result = polynomialChaosAlgorithm.getResult()
            metamodel = result.getMetaModel()
            chaosRV = FunctionalChaosRandomVector(result)
            zSample = metamodel(zSample)
            metamodel_list.append(metamodel)
        
        self.metamodels = metamodel_list
    
    
    def __call__(self, xSample):
        pass
    
    
    def plot_results(self, i_input, i_output, model=None):
        """
        Plot the chaos approximations obtained at the several 
        iterations of IgPC.
        """
        
        xSample = self.data["input"].getMarginal(i_input)
        ySample = self.data["output"].getMarginal(i_output)
        x = linspace(xSample.getMin()[0], xSample.getMax()[0], 80)
        kernel = KernelSmoothing()
        
        fig1, fig2 = plt.figure(1), plt.figure(2)
        ax1, ax2 = (fig.add_subplot(111) for fig in (fig1, fig2))
        
        xBigSample = self.distribution.getSample(10000)
        
        if model != None:
            ax1.plot(x, model(c_[x]), '--b',
                    label="Actual response")
            yBigSample = model(xBigSample)
            kernelDist = kernel.build(yBigSample)
#            ax2.plot(x, kernelDist.computePDF(NumericalSample(c_[x])),
#                     '--b', label="Actual response")
            ax2.hist(array(yBigSample).ravel(), bins=100,
                     label="Actual response", normed=True)
        
        zSample = NumericalSample(c_[x])
        zBigSample = xBigSample
        for i, metamodel in enumerate(self.metamodels):
            zSample  = metamodel(zSample)
            zBigSample = metamodel(zBigSample)
            kernelDist = kernel.build(zBigSample)
            ax1.plot(x, array(zSample).ravel(), 
                     label=("i-gPC - Iter %d" %i))
            ax2.plot(x, kernelDist.computePDF(NumericalSample(c_[x])), 
                     label=("i-gPC - Iter %d" %i))
            
        [[ax.legend(loc=0), ax.set_xlabel("x")] for ax in (ax1, ax2)]
        ax1.set_ylabel("y=f(x)")
        ax2.set_ylabel("Probability density function")

        return fig1, fig2
        
        
    
    
if __name__ == "__main__":
    
    
    import numpy as np
    import pylab as plt
    from get_sample import get_sample
    
    plt.close("all")
    RandomGenerator_SetSeed(63)
    
    # Create a data set based on the Heaviside function
    
    class modelePYTHON(OpenTURNSPythonFunction):
        def __init__(self) :
            OpenTURNSPythonFunction.__init__(self,1,1)
        # that following method gives the implementation of modelePYTHON
        def _exec(self,x) :
            """ Univariate Heaviside function """
            if x[0] <= 0.2:
                return [1.]
            else:
                return [0.]

    model = NumericalMathFunction(modelePYTHON())

    dist_X = Uniform(-1,1)
    myDistribution = ComposedDistribution([dist_X])
    myRandomVector = RandomVector(myDistribution)
    
    n_sample = 100
    xSample = get_sample(myRandomVector, n_sample, "QMC") # generate a quasi-Monte Carlo sample
    ySample = model(xSample)    
    np.savetxt("input.txt", np.array(xSample))
    np.savetxt("output.txt", np.array(ySample))
    
    n_kernel_pdf = 5000
    n_iter = 6
    pc_degree = 8
    
    study = IterativeGPC("input.txt", "output.txt", myDistribution,
                    pc_degree, n_iter=n_iter, n_kernel_pdf=10000)
                    
    study.perform_igpc()
    map(plt.show, study.plot_results(0, 0, model=model))

Auxiliary function "get_sample.py"

from numpy import array
from openturns import LowDiscrepancyExperiment, \
                      SobolSequence

def get_sample(vectX, N, method="MC"):
    """
    Generate a sample according to a given 
    distribution.
    """
    if method == "MC":
        X = vectX.getSample(N)
    elif method == "QMC":
        dim_in = vectX.getDimension()
        sequence = LowDiscrepancyExperiment(SobolSequence(dim_in), 
                   vectX.getDistribution(), N)
        X = array(sequence.generate())
    return X
  • Posted: 2014-07-15 17:10 (Updated: 2014-07-29 13:23)
  • Author: blatman
  • Categories: (none)
  • Comments (0)