"""
.. inheritance-diagram:: pyopus.design.sensitivity
:parts: 1
**Sensitivity analysis (PyOPUS subsystem name: SENS)**
Uses finite differences to estimate the sensitivities of performance
measures to input parameters.
Parameter screening is possible based on sensitivity results.
"""
from ..misc.debug import DbgMsgOut, DbgMsg
from ..evaluator.performance import PerformanceEvaluator, updateAnalysisCount
from ..evaluator.auxfunc import listParamDesc, paramDict
from ..parallel.cooperative import cOS
from .. import PyOpusError
import numpy as np
from pprint import pprint
__all__ = [ 'Sensitivity', 'screen' ]
[docs]class Sensitivity(object):
"""
Callable object that computes sensitivities.
See :class:`~pyopus.evaluator.performance.PerformanceEvaluator`
for details on *evaluator*.
The parameters are given by a dictionary *paramDesc* where
parameter name is the key. One entry in this dictionary
corresponds to one parameter. Every entry is a dictionary
with the following members
* lo .. lower bound (no lower bound if omitted).
* hi .. upper bound
* init .. parameter value where the sensitivity will be
computed (optional)
*paramDesc* can be a list in which case the complete set of
parameters is the union of parameters specified by list
memebers.
*paramNames* specifies the order of parameters in the
sensitivity vector. If not specified a default parameter ordering
is constructed and stored in the ``paramNames`` member.
Setting *debug* to a value greater than 0 turns on debug messages.
The verbosity is proportional to the specified number.
*diffType* is the finite difference type
(``central``, ``forward``, or ``backward``)
*absPerturb* specifies the absolute perturbation. If it is a single
value it applies to all parameters. If it is a dictionary it applies
to parameters listed in the dictionary.
*relPerturbHiLo* specifies the relative perturbation with respect
to the allowed parameter span given by ``lo`` and ``hi`` members of
the *paramDesc* dictionary. If it is a single value it applies to all
parameters for which a finite ``lo`` and ``hi`` value are specified.
If it is a dictionary it applies only to parameters listed in the
dictionary (provided they have finite ``lo`` and ``hi`` specified).
*relPerturbValue* specified teh relative perturbation with respect
to the parameter value. If it is a single value it applies to all
parameters with nonzero value. If it is a dictionary it applies to
parameters listed in the dictionary (provided they have nonzero
value).
If *clipping* is set to ``True`` the perturbed values are verified
against ``lo`` and ``hi``. If a bound is violated the finite difference
type for that parameter is change so that the violation is remedied.
If *spawnerLevel* is not greater than 1 the evaluations of
perturbed points are dispatched to available computing nodes.
The number of circuit evaluations is stored in the ``neval`` member.
Calling convention: obj(x0) where *x0* is a dictionary of parameter
values specifying the point where the sensitivity should be computed.
If omitted the values in the ``init`` member of the *paramDesc* are
used.
Returns a tuple with performance deltas as the first member,
the parameter perturbations as the second member, and the
*analysisCount* dictionary as the third member.
The performance deltas are returned in a double dictionary
where the first index is the performance measure name and the
second index is the corner name. Every member is a list of
values coresponding to parameters ordered as specified by
*paramNames*.
If the computation of a performance delta for some parameter
perturbation fails (i.e. any of the points required for computing the
performance delta fail to produce a valid result) the corresponding
entry in the list is ``None``.
Dividing performance deltas by parameter perturbations
results in the sesitivities. This holds regardless of the type of
perturbation specified with *diffType*.
The performance deltas are stored in the *results* member.
The parameter perturbations are stored in the *delta* member.
Objects of this type store the number of analyses performed
during the last call to the object in the *analysisCount*
member.
"""
def __init__(self, evaluator, paramDesc, paramNames=None, debug=0,
absPerturb=None, relPerturbHiLo=None, relPerturbValue=None,
clipping=True, diffType="central",
spawnerLevel=1
):
if diffType=='central':
self.diffType=0
elif diffType=='forward':
self.diffType=1
elif diffType=='backward':
self.diffType=2
else:
raise PyOpusError(DbgMsg("SENS", "Unknown finite differentiation type '%s'." % (diffType)))
self.absPerturb=absPerturb
self.relPerturbHiLo=relPerturbHiLo
self.relPerturbValue=relPerturbValue
self.evaluator=evaluator
self.paramNames=paramNames
self.debug=debug
self.spawnerLevel=spawnerLevel
self.clipping=clipping
# Build the list of parameters by merging dictionaries
if type(paramDesc) is list or type(paramDesc) is tuple:
self.paramDesc={}
for entry in paramDesc:
self.paramDesc.update(entry)
else:
self.paramDesc=paramDesc
if self.paramNames is None:
self.paramNames=list(self.paramDesc.keys())
def preparePerturbations(self, x0):
self.val=[]
self.delta=[]
self.ptype=[]
allCentral=True
for name in self.paramNames:
par=self.paramDesc[name]
val=x0[name]
lo=par['lo'] if 'lo' in par else -np.inf
hi=par['hi'] if 'hi' in par else np.inf
self.val.append(val)
# Check wrt range
if val<lo or val>hi:
raise PyOpusError(DbgMsg("SENS", "Parameter value '%s' outside bounds." % (name)))
# Compute perturbation
hlrange=hi-lo
if self.absPerturb is not None and type(self.absPerturb) is not dict:
# use abs perturb, global
delta=self.absPerturb
elif self.absPerturb is not None and name in self.absPerturb:
# use abs perturb, local
delta=self.absPerturb[name]
elif (
self.relPerturbHiLo is not None and np.isfinite(hlrange) and
type(self.relPerturbHiLo) is not dict
):
delta=hlrange*self.relPerturbHiLo
elif (
self.relPerturbHiLo is not None and np.isfinite(hlrange) and
name in self.relPerturbHiLo
):
delta=hlrange*self.relPerturbHiLo[name]
elif (
self.relPerturbValue is not None and val!=0 and
type(self.relPerturbValue) is not dict
):
delta=val*self.relPerturbValue
elif (
self.relPerturbValue is not None and val!=0 and
name in self.relPerturbValue
):
delta=val*self.relPerturbValue[name]
else:
raise PyOpusError(DbgMsg("SENS", "Cannot determine perturbation for parameter '%s'." % (name)))
self.delta.append(delta)
ptype=self.diffType
# Clipping
if self.clipping:
if ptype==0 and val+delta/2>hi:
# Switch to backward
ptype=2
if self.debug:
DbgMsgOut("SENS", "Switching from central to backward difference for parameter '%s'." % (name))
if ptype==0 and val-delta/2<lo:
# Switch to forward
ptype=1
if self.debug:
DbgMsgOut("SENS", "Switching from central to forward difference for parameter '%s'." % (name))
if ptype==1 and val+delta>hi:
# Switch to backward
ptype=2
if self.debug:
DbgMsgOut("SENS", "Switching from forward to backward difference for parameter '%s'." % (name))
if ptype==2 and val-delta<lo:
# Switch to forward
ptype=1
if self.debug:
DbgMsgOut("SENS", "Switching from backward to forward difference for parameter '%s'." % (name))
if ptype==1 and val+delta>hi:
# Give up
raise PyOpusError(DbgMsg("SENS", "All perturbations of parameter '%s' are outside bounds." % (name)))
if ptype==2 and val-delta<lo:
# Give up
raise PyOpusError(DbgMsg("SENS", "All perturbations of parameter '%s' are outside bounds." % (name)))
# All parameters central difference?
if ptype!=0:
allCentral=False
self.ptype.append(ptype)
# Compute perturbations
self.ndxp=[]
self.ndxn=[]
self.pval=[]
self.pndx=[]
self.psign=[]
atndx=0
if not allCentral:
# Need origin
self.pval.append(None)
self.psign.append(0)
self.pndx.append(None)
atndx+=1
for ii in range(len(self.paramNames)):
ptype=self.ptype[ii]
if ptype==0:
self.pval.append(self.val[ii]+self.delta[ii]/2)
self.pval.append(self.val[ii]-self.delta[ii]/2)
self.psign.extend([1,-1])
self.pndx.extend([ii,ii])
self.ndxp.append(atndx)
self.ndxn.append(atndx+1)
atndx+=2
elif ptype==1:
self.pval.append(self.val[ii]+self.delta[ii])
self.psign.append(1)
self.pndx.append(ii)
self.ndxp.append(atndx)
self.ndxn.append(0)
atndx+=1
else:
self.pval.append(self.val[ii]-self.delta[ii])
self.psign.append(-1)
self.pndx.append(ii)
self.ndxn.append(atndx)
self.ndxp.append(0)
atndx+=1
def jobGenerator(self, x0):
for ii in range(len(self.pval)):
pndx=self.pndx[ii]
if pndx is None:
yield (self.jobProcessor, [self.evaluator, x0, {}])
else:
yield (self.jobProcessor, [self.evaluator, x0, {self.paramNames[pndx]: self.pval[ii]}])
@staticmethod
def jobProcessor(ev, x0, dx):
x={}
x.update(x0)
x.update(dx)
result, anCount = ev(x)
return result, anCount
def jobCollector(self, results, analysisCount):
while True:
index, job, retval = (yield)
result, anCount = retval
if self.debug>1:
if self.psign[index]==0:
DbgMsgOut("SENS", "Computed performace at origin")
elif self.psign[index]>0:
DbgMsgOut("SENS", "Computed positive perturbation of parameter '%s'." % (self.paramNames[self.pndx[index]]))
else:
DbgMsgOut("SENS", "Computed negative perturbation of parameter '%s'." % (self.paramNames[self.pndx[index]]))
updateAnalysisCount(analysisCount, anCount)
results[index]=result
def __call__(self, x0=None):
if x0 is None:
x0={ name: desc['init'] for name, desc in self.paramDesc.items() }
# Reset analysis count
analysisCount={}
self.preparePerturbations(x0)
evresults=[None]*len(self.pval)
# Evaluate
cOS.dispatch(
jobList=self.jobGenerator(x0),
collector=self.jobCollector(evresults, analysisCount),
remote=self.spawnerLevel<=1
)
# Blank results, indexed by measure, corner, parameter index
diffs={}
# Compute deltas
for ii in range(len(self.paramNames)):
ndxp=self.ndxp[ii]
ndxn=self.ndxn[ii]
resp=evresults[ndxp]
resn=evresults[ndxn]
# All measures are always present
for measureName, dat in resp.items():
if measureName not in diffs:
diffs[measureName]={}
# All corners are always present
for cornerName, valp in dat.items():
if cornerName not in diffs[measureName]:
diffs[measureName][cornerName]=[]
valn=resn[measureName][cornerName]
# Do we have both?
if valp is not None and valn is not None:
diffs[measureName][cornerName].append(valp-valn)
else:
diffs[measureName][cornerName].append(None)
self.results=diffs
self.analysisCount=analysisCount
self.neval=len(self.pval)
self.delta=np.array(self.delta)
return diffs, self.delta, self.analysisCount
[docs] def diffVector(self, measureName, cornerName):
"""
Returns the vector of performance differences in the order
specified by *parameterNames* for the given *measureName*
and *cornerName*. If the performance difference is an array
(vector measures) the first index in the returned array is
the parameter index while the remaining indices correspond
to performance measure's indices.
If some performance difference is not computed due to errors
the corresponding components of the returned array are set to
``np.nan``.
If all performance measures fail to be computed for all
parameters the size of the array is unknown. In that case an
empty array is returned.
"""
diffs=self.results[measureName][cornerName]
# Scan all deltas, get vector length
shp=None
for diff in diffs:
if diff is not None:
shp=diff.shape
# Failed for all deltas
if shp is None:
return np.array([])
# Collect vector
vec=[]
for diff in diffs:
if diff is None:
# Failed for this delta
blank=np.zeros(shp)
blank.fill(np.nan)
vec.append(blank)
else:
vec.append(diff)
# Array, first index is parameter, the rest are performance measure components
return np.array(vec)
[docs]def screen(diffVector, delta, paramNames=None, contribThreshold=0.01, cumulativeThreshold=0.25, useSens=False, squared=True, debug=0):
"""
Performs parameter screening. Returns two lists of inidices
corresponding to parameters whose influence is below threshold
and parameters whose influsence is above threshold.
The indices are ordered by absolute sensitivity from the smallest
to the largest.
*diffVector* is the vector of performance differences corresponding
to perturbations of parameters specified by *paramNames*. Only scalar
performances can be screened (i.e. this must be a 1-D vector).
If *paramNames* is not specified parameters indices starting with 0
are used in the debug output.
*delta* is the vector of parameter perturbations.
A parameter is considered to have low influence if its relative
contribution is below *contribThreshold* and the relative cumulative
contribution of the set of all parameters with low influence does
not exceed *cumulativeThreshold*.
If *useSens* is set to ``True`` the performance measure
sensitivities are used instead of deltas. This makes sense only
if all parameters are expressed in the same units.
If *squared* is set to ``False`` the absolute sensitivities/deltas
are used instead of squared sensitivities/deltas in the computation
of the relative influence.
"""
# Default parameter names
if paramNames is None:
paramNames=[ ("%d" % ii) for ii in range(delta.size) ]
if diffVector is None or diffVector.size==0 or diffVector.size!=delta.size:
inIndex=np.array(range(len(paramNames)))
outIndex=np.array([])
if debug:
DbgMsgOut("SENS", "Bad sensitivity data, all parameters are significant.")
return outIndex, inIndex
# Collect sensitivity data
evSens=diffVector
if useSens:
evSens=evSens/delta
# Use absolute values
pSens=np.abs(evSens)
# Are all sensitivities finite?
if not np.isfinite(pSens).all():
# No, no parameter is marked as insignificant
if debug:
DbgMsgOut("SENS", "Infinite or NaN sensitivity, all parameters are significant.")
outIndex=np.array([])
inIndex=range(len(paramNames))
return outIndex, inIndex
# Order sensitivities from smallest to largest
ii=np.argsort(pSens)
pSensOrd=pSens[ii]
# Is largest absolute sensitivity zero?
if pSensOrd.max()<=0.0:
if debug:
DbgMsgOut("SENS", "Zero sensitivity, all parameters are significant.")
inIndex=np.array(range(pSensOrd.size))
outIndex=np.array([])
return outIndex, inIndex
# Square
if squared:
pSensOrd=pSensOrd**2
# Compute normalized cumulative sum (relative)
pSensCS=pSensOrd.cumsum()
pSensCS/=pSensCS[-1]
# Compute relative contributions
pSensRel=pSensOrd/pSensOrd.sum()
# Find threshold
thri=np.where((pSensCS>cumulativeThreshold)|(pSensRel>contribThreshold))[0]
# Extract indices
if len(thri)<=0:
# Crossing not found
outIndex=np.array([])
inIndex=ii
else:
# Crossing found
outIndex=ii[:thri[0]]
inIndex=ii[thri[0]:]
if debug:
DbgMsgOut("SENS", "Screening report")
DbgMsgOut("SENS", "Parameter with lowest influence above screening threshold: "+str(paramNames[ii[thri[0]]]))
DbgMsgOut("SENS", "%30s %12s %12s %12s" % ("name", "sens" if useSens else "diff", "rel", "cum"))
for jj in range(len(paramNames)):
DbgMsgOut("SENS", "%30s %12.2e %12.2e %12.2e" % (
paramNames[ii[jj]],
evSens[ii[jj]],
pSensRel[jj],
pSensCS[jj]
)
)
return outIndex, inIndex