
# Radar-shaped statistic to check the uniformity of a design of experiments

Here is a Python translation of an R function for checking the uniformity of a design of experiments built with OpenTURNS (as a NumericalSample object). It is an implementation of the "radar-shaped statistic" method detailed in the PhD thesis of J. Franco (2008).

The program requires a C shared library 'unif_radar_lib.so', which can be compiled from the source file 'C_rss.c' displayed below. For example, the following commands can be used in a Linux console with the gcc compiler:

gcc -c -Wall -Werror -fpic -std=c99 C_rss.c
gcc -shared -o unif_radar_lib.so C_rss.o


The 'unif_radar.py' file can then be executed directly or called from another user-defined script. Running the enclosed version directly will execute an example with a design based on a low-discrepancy sequence.
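
For instance, once the shared library has been built, a call from another script could look like the following sketch (the design type and the sizes are purely illustrative):

import openturns as ot
from unif_radar import check_design

# Illustrative design: Latin hypercube sample of size 50 in dimension 3
distribution = ot.ComposedDistribution([ot.Uniform(0., 1.)] * 3)
design = ot.LHSExperiment(distribution, 50).generate()

# Run the radar-shaped statistic test and display the diagnostic plots
check_design(design, n_theta=180, alpha=0.95)

Python script "unif_radar.py"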

#!/usr/bin/env python
# -*- coding: utf-8 -*-

"""
Check the uniformity of a design of experiments using the radar-shaped
statistic.

The uniformity of a multidimensional design is checked in each plane defined
by a couple (Xi, Xj) of input parameters. In every plane, the distribution of
the orthogonal projections of the design points onto an axis with an angle
'theta' with respect to the Xi and Xj axes is studied. The projection
distribution is compared to the uniform distribution by means of a given
distance, e.g. the Kolmogorov-Smirnov one. The greatest discrepancy is
retained and a statistical test is performed to challenge the uniformity
assumption. Moreover, the design is shown in the "worst" plane (Xi, Xj).

The method was designed by J. Franco (2008). This code is a Python translation
of an R function from the R package DiceDesign (Franco et al., 2015).

References
----------
Franco J. (2008). Planification d'experiences numeriques en phase exploratoire
pour la simulation des phenomenes complexes, PhD Thesis, Ecole Nationale
Superieure des Mines de Saint-Etienne.

Franco J., Dupuy D., Roustant O. (2015). DiceDesign: Designs of Computer
Experiments. R package version 1.7,
URL http://CRAN.R-project.org/package=DiceDesign.
"""

import numpy as np
import ctypes
import matplotlib.pyplot as plt
import openturns as ot

def scale_data(x):
    """Transform the data into uniformly distributed values over [-1,1]"""
    x = ot.NumericalSample(x)
    xs = []
    for i in range(x.getDimension()):
        xi = x[:, i]
        xs.append([2.*xi.computeEmpiricalCDF(xij) - 1. for xij in xi])
    return ot.NumericalSample(np.array(xs).T)

def ks_stat(x):
    """Evaluate the Kolmogorov-Smirnov statistic"""
    n = float(len(x))
    ordered_sample = np.sort(x)
    Dplus = np.max((1. + np.arange(n))/n - ordered_sample)
    Dminus = np.max(ordered_sample - np.arange(n)/n)
    stat = np.max([Dplus, Dminus])
    return stat

def ks_quantile(n, alpha=0.05, eps=np.finfo(float).eps):
    """Evaluate the Kolmogorov-Smirnov alpha-quantile"""
    index, = np.where(np.abs(np.array([0.1, 0.05, 0.025, 0.01]) - alpha) < eps)
    q = [1.224, 1.358, 1.48, 1.628][int(index[0])] \
        / (np.sqrt(n) + 0.12 + 0.11/np.sqrt(n))
    return q

def check_design(x, n_theta=360, alpha=0.95, c_lib='./unif_radar_lib.so',
                 eps=np.finfo(float).eps):
    """
    Check the uniformity of the data using the radar-shaped statistic.

    Plots showing the greatest distance to the uniformity in the "worst" plane
    of inputs (Xi, Xj) are created. The results of the statistical test are
    printed.

    Parameters
    ----------
    x: OpenTURNS NumericalSample
        Design of experiments.

    n_theta: integer, optional
        Number of angular positions for the projection axis (default is 360).

    alpha: float, optional
        Level of the statistical uniformity test (default is 0.95).

    c_lib: string, optional
        Path to the C shared library used to speed up the iterations through
        the pairs of input variables (default is './unif_radar_lib.so', i.e.
        the library is sought in the current directory).

    eps: float, optional
        Tolerance for the small axis orientation angle (default is the numpy
        value 'np.finfo(float).eps').
    """

print("")
print("2D radial scanning statistic (RSS) using Kolmogorov-Smirnov statistic")
print("Discretization step (in degree): %.1f" %(180./n_theta))
print("Maximum of RSS values (global statistic) per pair of dimensions\n")

design = np.array(scale_data(x))
n, d = design.shape

# Discretization of the angular position
theta_degree = np.linspace(0., 180., n_theta+1)[0:-1]
theta = theta_degree*np.pi/180. # deg -> rad
cos_theta, sin_theta = np.cos(theta), np.sin(theta)
subdiv_halfcircle = np.vstack((cos_theta, sin_theta)).T

# Specify the C code library and types
my_library = ctypes.CDLL(c_lib)
[ctypes.POINTER(ctypes.c_double)]*4 + [ctypes.c_int]*2
ctypes.POINTER(ctypes.c_double)

global_stat = np.zeros((d, d))
global_stat_max = 0

    # Iterate through pairs of input parameters

    for i in range(d-1):
        x1 = design[:, i]

        for j in range(i+1, d):

            x2 = design[:, j]

            # Copy the data and the projection axes into ctypes arrays
            x1_c = (ctypes.c_double * n)()
            for index, value in enumerate(x1):
                x1_c[index] = value
            x2_c = (ctypes.c_double * n)()
            for index, value in enumerate(x2):
                x2_c[index] = value
            ax_c = (ctypes.c_double * n_theta)()
            for index, value in enumerate(cos_theta):
                ax_c[index] = value
            ay_c = (ctypes.c_double * n_theta)()
            for index, value in enumerate(sin_theta):
                ay_c[index] = value
            n_c = ctypes.c_int(n)
            n_theta_c = ctypes.c_int(n_theta)
            out_c = (ctypes.c_double * (n*n_theta))()

            my_library.C_rss2Dloop(x1_c, x2_c, ax_c, ay_c,
                                   ctypes.byref(n_c), ctypes.byref(n_theta_c),
                                   out_c)

            projections2 = np.array(out_c[:n*n_theta])
            projections2 = projections2.reshape((n, n_theta), order="F")

            # Iterate through the angle positions

            anglewise_stat_ij = np.zeros(n_theta)
            for i_theta in range(n_theta):
                anglewise_stat_ij[i_theta] = ks_stat(projections2[:, i_theta])

            # Store the greatest statistic found for the pair (Xi, Xj)
            global_stat_ij = np.max(anglewise_stat_ij)
            global_stat[i, j] = global_stat_ij
            global_stat[j, i] = global_stat_ij

            # Update the greatest statistic found over all pairs of parameters
            if global_stat_ij > global_stat_max:
                global_stat_max = global_stat_ij
                pair_worst = [i, j]
                anglewise_stat = anglewise_stat_ij.copy()

            print("Plane (%d,%d)  - Greatest observed Kolmogorov distance: %.4f"
                  % (i+1, j+1, global_stat_ij))

    # Plot results

    gof_test_stat = ks_quantile(n, 1.-alpha)

    design_worst = design[:, pair_worst]
    anglewise_stat_max = np.max(anglewise_stat)
    index_max = np.argmax(anglewise_stat)
    cos_theta_max = subdiv_halfcircle[index_max, 0]
    sin_theta_max = subdiv_halfcircle[index_max, 1]
    dir_max = subdiv_halfcircle[index_max, :]

    plt.figure()
    plt.subplot(111, aspect='equal')
    plt.plot(design_worst[:, 0], design_worst[:, 1], 'ob')
    plt.xlabel(r"$X_{"+str(pair_worst[0]+1)+"}$", size=16)
    plt.ylabel(r"$X_{"+str(pair_worst[1]+1)+"}$", size=16)
    plt.title("Worst plane", size=16)

    # Radar plot of the anglewise KS statistics (the statistic is the same
    # for theta and theta + 180 degrees, hence the symmetrized curve)
    rx = anglewise_stat*cos_theta
    ry = anglewise_stat*sin_theta

    plt.figure()
    plt.subplot(111, aspect='equal')
    plt.plot(rx, ry, '-b', label="KS statistic")
    plt.plot(-rx, -ry, '-b')
    theta_aux = np.linspace(0., 2.*np.pi+0.1, 80)
    plt.plot(gof_test_stat*np.cos(theta_aux), gof_test_stat*np.sin(theta_aux),
             '-k', label="KS rejection threshold")
    plt.legend(loc="best")
    plt.title(r"Plane (X%d , X%d) - Worst angle: %d$^{\circ}$"
              % (pair_worst[0]+1, pair_worst[1]+1, theta_degree[index_max]),
              size=16)

    plt.figure()
    plt.subplot(111, aspect='equal')
    plt.plot(design_worst[:, 0], design_worst[:, 1], 'ob')
    projections = np.dot(design_worst, dir_max)
    plt.plot(projections*cos_theta_max, projections*sin_theta_max, 'or')
    if np.abs(cos_theta_max) < eps:
        plt.plot([0, 0], [-1, 1], "-r")
    else:
        plt.plot([-1, 1], np.array([-1, 1])*sin_theta_max/cos_theta_max, "-r")
    for i in range(n):
        plt.plot([design_worst[i, 0], projections[i]*cos_theta_max],
                 [design_worst[i, 1], projections[i]*sin_theta_max], 'r:')
    plt.xlabel(r"$X_{"+str(pair_worst[0]+1)+"}$", size=16)
    plt.ylabel(r"$X_{"+str(pair_worst[1]+1)+"}$", size=16)
    plt.xlim([-1, 1])
    plt.ylim([-1, 1])
    plt.title(r"$\theta=%d^{\circ}$" % (theta_degree[index_max]), size=20)

    print("\n\nGreatest Kolmogorov-Smirnov statistic: %.4f\n" % global_stat_max)
    print("Kolmogorov-Smirnov threshold: %.2f\n" % gof_test_stat)
    if global_stat_max < gof_test_stat:
        print("Uniformity hypothesis NOT REJECTED at level %.2f\n" % alpha)
    else:
        print("Uniformity hypothesis REJECTED at level %.2f\n" % alpha)

if __name__ == "__main__":

"""Test with a low discrepancy sequence"""

plt.ion() # interactive mode ON

n = 60
d = 20

mySobolSeq = ot.SobolSequence(d)
design = 2*np.array( mySobolSeq.generate(n) )-1.

check_design(design)

C source file "C_rss.c"


#include <math.h>
#include <stdio.h>
#include <string.h>
#include <stdlib.h>

#define precision 0.000000000001

#define SQUARE(x) ((x)*(x))
#define CUBE(x) ((x)*(x)*(x))
#define MAX(a,b) (a<b ? b:a)

double C_uniform_cdf(const double *p) {

    return(0.5 * (MAX(*p + 1., 0.) - MAX(*p - 1., 0.)));

}

double C_sumof2uniforms_cdf(const double *p, const double *ax, const double *ay, const int *k) {

    double s = 0.;

    if ((fabs(ax[*k]) > precision) && (fabs(ay[*k]) > precision)) {
        s += SQUARE(MAX(*p - ax[*k] - ay[*k], 0.));
        s += SQUARE(MAX(*p + ax[*k] + ay[*k], 0.));
        s -= SQUARE(MAX(*p - ax[*k] + ay[*k], 0.));
        s -= SQUARE(MAX(*p + ax[*k] - ay[*k], 0.));
        return(s / (8 * ax[*k] * ay[*k]));
    }
    else return(C_uniform_cdf(p));

}

double C_sumof3uniforms_cdf(const double *p, const double *ax, const double *ay, const double *az, const int *k) {

    double s = 0.;

    if ((fabs(ax[*k]) > precision) && (fabs(ay[*k]) > precision) && (fabs(az[*k]) > precision)) {
        s += CUBE(MAX(*p + ax[*k] + ay[*k] + az[*k], 0.));
        s += CUBE(MAX(*p + ax[*k] - ay[*k] - az[*k], 0.));
        s += CUBE(MAX(*p - ax[*k] + ay[*k] - az[*k], 0.));
        s += CUBE(MAX(*p - ax[*k] - ay[*k] + az[*k], 0.));
        s -= CUBE(MAX(*p - ax[*k] - ay[*k] - az[*k], 0.));
        s -= CUBE(MAX(*p - ax[*k] + ay[*k] + az[*k], 0.));
        s -= CUBE(MAX(*p + ax[*k] - ay[*k] + az[*k], 0.));
        s -= CUBE(MAX(*p + ax[*k] + ay[*k] - az[*k], 0.));
        return(s / (48 * ax[*k] * ay[*k] * az[*k]));
    }
    else if ((fabs(ax[*k]) <= precision)) return(C_sumof2uniforms_cdf(p, ay, az, k));
    else if ((fabs(ay[*k]) <= precision)) return(C_sumof2uniforms_cdf(p, ax, az, k));
    else return(C_sumof2uniforms_cdf(p, ax, ay, k));
}

void C_rss2Dloop(const double *x, const double *y, const double *ax, const double *ay, const int *n, const int *n_theta, double *ans) {

    double p;

    for (int i_theta=0; i_theta < *n_theta; i_theta++) {

        for (int i=0; i < *n; i++) {
            p = x[i]*ax[i_theta] + y[i]*ay[i_theta];
            ans[i + *n * i_theta] = C_sumof2uniforms_cdf(&p, ax, ay, &i_theta);
        }

    }

}

void C_rss3Dloop(const double *x, const double *y, const double *z, const double *ax, const double *ay, const double *az, const int *n, const int *n_theta, const int *n_phi, const int *i_phi, double *ans) {

    double p;
    int k;

    for (int i_theta=0; i_theta < *n_theta; i_theta++) {

        for (int i=0; i < *n; i++) {
            k = i_theta + *n_theta * *i_phi;
            p = x[i]*ax[k] + y[i]*ay[k] + z[i]*az[k];
            ans[i + *n * i_theta] = C_sumof3uniforms_cdf(&p, ax, ay, az, &k);
        }

    }
}



# Implementation of the Iterative Generalized Polynomial Chaos (i-gPC)

Hi,

Here is an OpenTURNS-based implementation of the so-called i-gPC method designed by Poette & Lucor (2012). It allows the approximation of strongly nonlinear responses of models depending on random parameters. The strategy was originally applied in the field of fluid dynamics, more precisely to stochastic partial differential equations modelling shocks, e.g. the Burgers equation.

The i-gPC procedure consists in first approximating the input (X)-output (Y) relationship by a classical chaos expansion (called Y0), then constructing a new chaos expansion (Y1) to approximate the mapping between the residuals (Y-Y0) and the response Y, using an orthonormal basis with respect to the measure of (Y-Y0). A new chaos expansion is then sought to mimic the relationship between (Y-Y1) and Y, and so on.
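
In practice the final surrogate is the composition of all the expansions built along the iterations, so evaluating it at new input points amounts to chaining the metamodels. As a sketch (assuming a fitted `study` object of the IterativeGPC class defined below and a NumericalSample `xNew` of new input points, both hypothetical names):

z = xNew
for metamodel in study.metamodels:
    z = metamodel(z)  # Y0(x), then the next expansion applied to Y0(x), and so on
# z now approximates the model response at the points of xNew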

The implementation is successful in some cases, such as the 1D Heaviside function example provided with the OT script, when some "good" algorithm parameters are selected. Nonetheless it may suffer from severe numerical instabilities in some cases. This problem arises when creating, at each i-gPC step, an ad-hoc orthonormal basis from a kernel estimate of the density function of the current residuals.

This drawback should be overcome in a future version. Any suggestion/comment is welcome!

Main script "igPC.py"

# -*- coding: utf-8 -*-

"""
Performs iterative generalized polynomial chaos approximation.

This is an implementation of the approach designed by [Poette,
Lucor (2012)]. This approach was shown to be more accurate than
ordinary chaos expansion without requiring additional computer
experiments. It makes it possible to approximate strongly nonlinear
model responses and to tackle the Gibbs phenomenon in the presence
of discontinuities.

Reference
---------
G. Poette, D. Lucor, Non intrusive iterative stochastic spectral
representation with application to compressible gas dynamics,
Journal of Computational Physics 231 (2012), pp. 3587-3609
"""

import matplotlib.pyplot as plt
from numpy import loadtxt, c_, linspace, array
from openturns import *

class IterativeGPC(object):
    """
    Iterative generalized polynomial chaos study

    Parameters
    ----------
    in_filename: string
        Name of the file containing the input sample.

    out_filename: string
        Name of the file containing the output sample.

    myDistribution: *ComposedDistribution* openturns object
        Joint distribution from which the input sample was
        generated.

    pc_degree: integer
        Total degree of the polynomial chaos expansions.

    n_iter: integer, optional
        Number of i-gPC iterations (default is 4).

    n_kernel_pdf: integer, optional
        Size of the sample that is generated from the
        several chaos approximations in order to estimate the
        density using kernel smoothing (default is 5,000).

    Attributes
    ----------
    data: dict
        Dictionary that contains the input and the output samples
        (*NumericalSample* objects) as well as their (common)
        size and their respective dimensions.

    alg_opts: dict
        Dictionary of the parameters to pass to the perform_igpc
        method.

    metamodels: list
        List of metamodels (*NumericalMathFunction* objects)
        obtained at each iteration of i-gPC.
    """

    def __init__(self, in_filename, out_filename, myDistribution,
                 pc_degree, n_iter=4, n_kernel_pdf=5000):

        # Import the input and the output data
        xSample, ySample = (NumericalSample(c_[loadtxt(fic)])
                            for fic in (in_filename, out_filename))
        (n_sample_in, dim_in), (n_sample_out, dim_out) = ((sample.getSize(),
            sample.getDimension()) for sample in (xSample, ySample))
        if n_sample_in != n_sample_out:
            raise ValueError("The sizes of the input and the output "
                             + "samples must be equal.")
        else:
            self.data = {"input": xSample, "output": ySample,
                         "size": n_sample_in, "dim_in": dim_in,
                         "dim_out": dim_out}

        # Get the input joint distribution
        if dim_in != myDistribution.getDimension():
            raise ValueError("The dimensions of the input sample and "
                             + "the input joint distribution must be "
                             + "equal.")
        else:
            self.distribution = myDistribution

        # Get the igPC options
        self.alg_opts = {"pc_degree": pc_degree, "n_iter": n_iter,
                         "n_kernel_pdf": n_kernel_pdf}

    def perform_igpc(self):
        """
        Performs iterative generalized polynomial chaos approximation.
        """

        # Extract attributes
        myDistribution = self.distribution
        dim = self.data["dim_in"]
        n_sample = self.data["size"]
        xSample, ySample = self.data["input"], self.data["output"]
        pc_degree, n_iter, n_kernel_pdf = (self.alg_opts[key] for key
            in ("pc_degree", "n_iter", "n_kernel_pdf"))

        # Specify a multivariate polynomial orthonormal basis
        univariatePolyList = []
        for i in range(dim):
            dist = myDistribution.getMarginal(i)
            univariatePolyList.append(StandardDistributionPolynomialFactory(dist))
        polyColl = PolynomialFamilyCollection(univariatePolyList)
        enumerateFunction = EnumerateFunction(dim)
        multivariateBasis = OrthogonalProductPolynomialFactory(polyColl, enumerateFunction)

        # Define the LAR strategy
        basisSequenceFactory = LAR()
        fittingAlgorithm = CorrectedLeaveOneOut()
        approximationAlgorithm = LeastSquaresMetaModelSelectionFactory(basisSequenceFactory, fittingAlgorithm)

        # Define the truncation strategy
        P = enumerateFunction.getStrataCumulatedCardinal(pc_degree)
        truncatureBasisStrategy = FixedStrategy(OrthogonalBasis(multivariateBasis), P)
        evalStrategy = LeastSquaresStrategy(myDistribution, approximationAlgorithm)
        weights = [1.]*n_sample

        # Construct the chaos approximation
        polynomialChaosAlgorithm = FunctionalChaosAlgorithm(xSample, weights, ySample,
                                        myDistribution, truncatureBasisStrategy,
                                        evalStrategy)
        polynomialChaosAlgorithm.run()

        # Get the chaos results
        result = polynomialChaosAlgorithm.getResult()
        metamodel = result.getMetaModel()
        chaosRV = FunctionalChaosRandomVector(result)
        zSample = metamodel(xSample)

        # Carry out the i-gPC procedure
        #
        metamodel_list = [metamodel]
        for k in range(n_iter):
            #
            # KLUDGE: Approximate the chaos output distribution using a kernel PDF
            # estimator.
            # This may be a crude approach possibly leading to numerical instabilities.
            # This should be improved in a future version.
            zBigSample = chaosRV.getSample(n_kernel_pdf)
            kernel = KernelSmoothing()
            myBandwith = kernel.computeSilvermanBandwidth(zBigSample)
            dist_Z = kernel.build(zBigSample, myBandwith)
            myDistribution_Z = ComposedDistribution([dist_Z])
            #
            # Construct an ad-hoc orthonormal polynomial basis of the chaos measure
            polyColl_Z = PolynomialFamilyCollection([StandardDistributionPolynomialFactory(dist_Z)])
            enumerateFunction = EnumerateFunction(1)
            multivariateBasis_Z = OrthogonalProductPolynomialFactory(polyColl_Z, enumerateFunction)
            truncatureBasisStrategy_Z = FixedStrategy(OrthogonalBasis(multivariateBasis_Z), pc_degree+1)
            evalStrategy_Z = LeastSquaresStrategy(dist_Z, approximationAlgorithm)
            #
            # Compute a chaos approximation of the relationship between the previous
            # chaos output and the actual output.
            polynomialChaosAlgorithm = FunctionalChaosAlgorithm(zSample, weights, ySample,
                                            myDistribution_Z, truncatureBasisStrategy_Z,
                                            evalStrategy_Z)
            polynomialChaosAlgorithm.run()
            #
            # Get results
            result = polynomialChaosAlgorithm.getResult()
            metamodel = result.getMetaModel()
            chaosRV = FunctionalChaosRandomVector(result)
            zSample = metamodel(zSample)
            metamodel_list.append(metamodel)

        self.metamodels = metamodel_list

    def __call__(self, xSample):
        pass

    def plot_results(self, i_input, i_output, model=None):
        """
        Plot the chaos approximations obtained at the several
        iterations of i-gPC.
        """

        xSample = self.data["input"].getMarginal(i_input)
        ySample = self.data["output"].getMarginal(i_output)
        x = linspace(xSample.getMin()[0], xSample.getMax()[0], 80)
        kernel = KernelSmoothing()

        fig1, fig2 = plt.figure(1), plt.figure(2)
        ax1, ax2 = (fig.add_subplot(111) for fig in (fig1, fig2))

        xBigSample = self.distribution.getSample(10000)

        if model is not None:
            ax1.plot(x, model(c_[x]), '--b',
                     label="Actual response")
            yBigSample = model(xBigSample)
            kernelDist = kernel.build(yBigSample)
#            ax2.plot(x, kernelDist.computePDF(NumericalSample(c_[x])),
#                     '--b', label="Actual response")
            ax2.hist(array(yBigSample).ravel(), bins=100,
                     label="Actual response", normed=True)

        zSample = NumericalSample(c_[x])
        zBigSample = xBigSample
        for i, metamodel in enumerate(self.metamodels):
            zSample = metamodel(zSample)
            zBigSample = metamodel(zBigSample)
            kernelDist = kernel.build(zBigSample)
            ax1.plot(x, array(zSample).ravel(),
                     label=("i-gPC - Iter %d" % i))
            ax2.plot(x, kernelDist.computePDF(NumericalSample(c_[x])),
                     label=("i-gPC - Iter %d" % i))

        [[ax.legend(loc=0), ax.set_xlabel("x")] for ax in (ax1, ax2)]
        ax1.set_ylabel("y=f(x)")
        ax2.set_ylabel("Probability density function")

        return fig1, fig2

if __name__ == "__main__":

import numpy as np
import pylab as plt
from get_sample import get_sample

plt.close("all")
RandomGenerator_SetSeed(63)

# Create a data set based on the Heaviside function

class modelePYTHON(OpenTURNSPythonFunction):
def __init__(self) :
OpenTURNSPythonFunction.__init__(self,1,1)
# that following method gives the implementation of modelePYTHON
def _exec(self,x) :
""" Univariate Heaviside function """
if x <= 0.2:
return [1.]
else:
return [0.]

model = NumericalMathFunction(modelePYTHON())

dist_X = Uniform(-1,1)
myDistribution = ComposedDistribution([dist_X])
myRandomVector = RandomVector(myDistribution)

n_sample = 100
xSample = get_sample(myRandomVector, n_sample, "QMC") # generate a quasi-Monte Carlo sample
ySample = model(xSample)
np.savetxt("input.txt", np.array(xSample))
np.savetxt("output.txt", np.array(ySample))

n_kernel_pdf = 5000
n_iter = 6
pc_degree = 8

study = IterativeGPC("input.txt", "output.txt", myDistribution,
pc_degree, n_iter=n_iter, n_kernel_pdf=10000)

study.perform_igpc()
map(plt.show, study.plot_results(0, 0, model=model))


Auxiliary function "get_sample.py"

from numpy import array
from openturns import LowDiscrepancyExperiment, \
                      SobolSequence

def get_sample(vectX, N, method="MC"):
    """
    Generate a sample according to a given
    distribution.
    """
    if method == "MC":
        X = vectX.getSample(N)
    elif method == "QMC":
        dim_in = vectX.getDimension()
        sequence = LowDiscrepancyExperiment(SobolSequence(dim_in),
                                            vectX.getDistribution(), N)
        X = array(sequence.generate())
    return X

• Posted: 2014-07-15 17:10 (Updated: 2014-07-29 13:23)
• Author: blatman
• Categories: (none)