Source code for pyopus.design.sensitivity

"""
.. inheritance-diagram:: pyopus.design.sensitivity
    :parts: 1

**Sensitivity analysis (PyOPUS subsystem name: SENS)**

Uses finite differences to estimate the sensitivities of performance 
measures to input parameters. 

Parameter screening is possible based on sensitivity results. 
""" 

from ..misc.debug import DbgMsgOut, DbgMsg
from ..evaluator.performance import PerformanceEvaluator, updateAnalysisCount
from ..evaluator.auxfunc import listParamDesc, paramDict
from ..parallel.cooperative import cOS
from .. import PyOpusError
import numpy as np
from pprint import pprint


__all__ = [ 'Sensitivity', 'screen' ] 

[docs]class Sensitivity(object): """ Callable object that computes sensitivities. See :class:`~pyopus.evaluator.performance.PerformanceEvaluator` for details on *evaluator*. The parameters are given by a dictionary *paramDesc* where parameter name is the key. One entry in this dictionary corresponds to one parameter. Every entry is a dictionary with the following members * lo .. lower bound (no lower bound if omitted). * hi .. upper bound * init .. parameter value where the sensitivity will be computed (optional) *paramDesc* can be a list in which case the complete set of parameters is the union of parameters specified by list memebers. *paramNames* specifies the order of parameters in the sensitivity vector. If not specified a default parameter ordering is constructed and stored in the ``paramNames`` member. Setting *debug* to a value greater than 0 turns on debug messages. The verbosity is proportional to the specified number. *diffType* is the finite difference type (``central``, ``forward``, or ``backward``) *absPerturb* specifies the absolute perturbation. If it is a single value it applies to all parameters. If it is a dictionary it applies to parameters listed in the dictionary. *relPerturbHiLo* specifies the relative perturbation with respect to the allowed parameter span given by ``lo`` and ``hi`` members of the *paramDesc* dictionary. If it is a single value it applies to all parameters for which a finite ``lo`` and ``hi`` value are specified. If it is a dictionary it applies only to parameters listed in the dictionary (provided they have finite ``lo`` and ``hi`` specified). *relPerturbValue* specified teh relative perturbation with respect to the parameter value. If it is a single value it applies to all parameters with nonzero value. If it is a dictionary it applies to parameters listed in the dictionary (provided they have nonzero value). If *clipping* is set to ``True`` the perturbed values are verified against ``lo`` and ``hi``. If a bound is violated the finite difference type for that parameter is change so that the violation is remedied. If *spawnerLevel* is not greater than 1 the evaluations of perturbed points are dispatched to available computing nodes. The number of circuit evaluations is stored in the ``neval`` member. Calling convention: obj(x0) where *x0* is a dictionary of parameter values specifying the point where the sensitivity should be computed. If omitted the values in the ``init`` member of the *paramDesc* are used. Returns a tuple with performance deltas as the first member, the parameter perturbations as the second member, and the *analysisCount* dictionary as the third member. The performance deltas are returned in a double dictionary where the first index is the performance measure name and the second index is the corner name. Every member is a list of values coresponding to parameters ordered as specified by *paramNames*. If the computation of a performance delta for some parameter perturbation fails (i.e. any of the points required for computing the performance delta fail to produce a valid result) the corresponding entry in the list is ``None``. Dividing performance deltas by parameter perturbations results in the sesitivities. This holds regardless of the type of perturbation specified with *diffType*. The performance deltas are stored in the *results* member. The parameter perturbations are stored in the *delta* member. Objects of this type store the number of analyses performed during the last call to the object in the *analysisCount* member. """ def __init__(self, evaluator, paramDesc, paramNames=None, debug=0, absPerturb=None, relPerturbHiLo=None, relPerturbValue=None, clipping=True, diffType="central", spawnerLevel=1 ): if diffType=='central': self.diffType=0 elif diffType=='forward': self.diffType=1 elif diffType=='backward': self.diffType=2 else: raise PyOpusError(DbgMsg("SENS", "Unknown finite differentiation type '%s'." % (diffType))) self.absPerturb=absPerturb self.relPerturbHiLo=relPerturbHiLo self.relPerturbValue=relPerturbValue self.evaluator=evaluator self.paramNames=paramNames self.debug=debug self.spawnerLevel=spawnerLevel self.clipping=clipping # Build the list of parameters by merging dictionaries if type(paramDesc) is list or type(paramDesc) is tuple: self.paramDesc={} for entry in paramDesc: self.paramDesc.update(entry) else: self.paramDesc=paramDesc if self.paramNames is None: self.paramNames=list(self.paramDesc.keys()) def preparePerturbations(self, x0): self.val=[] self.delta=[] self.ptype=[] allCentral=True for name in self.paramNames: par=self.paramDesc[name] val=x0[name] lo=par['lo'] if 'lo' in par else -np.inf hi=par['hi'] if 'hi' in par else np.inf self.val.append(val) # Check wrt range if val<lo or val>hi: raise PyOpusError(DbgMsg("SENS", "Parameter value '%s' outside bounds." % (name))) # Compute perturbation hlrange=hi-lo if self.absPerturb is not None and type(self.absPerturb) is not dict: # use abs perturb, global delta=self.absPerturb elif self.absPerturb is not None and name in self.absPerturb: # use abs perturb, local delta=self.absPerturb[name] elif ( self.relPerturbHiLo is not None and np.isfinite(hlrange) and type(self.relPerturbHiLo) is not dict ): delta=hlrange*self.relPerturbHiLo elif ( self.relPerturbHiLo is not None and np.isfinite(hlrange) and name in self.relPerturbHiLo ): delta=hlrange*self.relPerturbHiLo[name] elif ( self.relPerturbValue is not None and val!=0 and type(self.relPerturbValue) is not dict ): delta=val*self.relPerturbValue elif ( self.relPerturbValue is not None and val!=0 and name in self.relPerturbValue ): delta=val*self.relPerturbValue[name] else: raise PyOpusError(DbgMsg("SENS", "Cannot determine perturbation for parameter '%s'." % (name))) self.delta.append(delta) ptype=self.diffType # Clipping if self.clipping: if ptype==0 and val+delta/2>hi: # Switch to backward ptype=2 if self.debug: DbgMsgOut("SENS", "Switching from central to backward difference for parameter '%s'." % (name)) if ptype==0 and val-delta/2<lo: # Switch to forward ptype=1 if self.debug: DbgMsgOut("SENS", "Switching from central to forward difference for parameter '%s'." % (name)) if ptype==1 and val+delta>hi: # Switch to backward ptype=2 if self.debug: DbgMsgOut("SENS", "Switching from forward to backward difference for parameter '%s'." % (name)) if ptype==2 and val-delta<lo: # Switch to forward ptype=1 if self.debug: DbgMsgOut("SENS", "Switching from backward to forward difference for parameter '%s'." % (name)) if ptype==1 and val+delta>hi: # Give up raise PyOpusError(DbgMsg("SENS", "All perturbations of parameter '%s' are outside bounds." % (name))) if ptype==2 and val-delta<lo: # Give up raise PyOpusError(DbgMsg("SENS", "All perturbations of parameter '%s' are outside bounds." % (name))) # All parameters central difference? if ptype!=0: allCentral=False self.ptype.append(ptype) # Compute perturbations self.ndxp=[] self.ndxn=[] self.pval=[] self.pndx=[] self.psign=[] atndx=0 if not allCentral: # Need origin self.pval.append(None) self.psign.append(0) self.pndx.append(None) atndx+=1 for ii in range(len(self.paramNames)): ptype=self.ptype[ii] if ptype==0: self.pval.append(self.val[ii]+self.delta[ii]/2) self.pval.append(self.val[ii]-self.delta[ii]/2) self.psign.extend([1,-1]) self.pndx.extend([ii,ii]) self.ndxp.append(atndx) self.ndxn.append(atndx+1) atndx+=2 elif ptype==1: self.pval.append(self.val[ii]+self.delta[ii]) self.psign.append(1) self.pndx.append(ii) self.ndxp.append(atndx) self.ndxn.append(0) atndx+=1 else: self.pval.append(self.val[ii]-self.delta[ii]) self.psign.append(-1) self.pndx.append(ii) self.ndxn.append(atndx) self.ndxp.append(0) atndx+=1 def jobGenerator(self, x0): for ii in range(len(self.pval)): pndx=self.pndx[ii] if pndx is None: yield (self.jobProcessor, [self.evaluator, x0, {}]) else: yield (self.jobProcessor, [self.evaluator, x0, {self.paramNames[pndx]: self.pval[ii]}]) @staticmethod def jobProcessor(ev, x0, dx): x={} x.update(x0) x.update(dx) result, anCount = ev(x) return result, anCount def jobCollector(self, results, analysisCount): while True: index, job, retval = (yield) result, anCount = retval if self.debug>1: if self.psign[index]==0: DbgMsgOut("SENS", "Computed performace at origin") elif self.psign[index]>0: DbgMsgOut("SENS", "Computed positive perturbation of parameter '%s'." % (self.paramNames[self.pndx[index]])) else: DbgMsgOut("SENS", "Computed negative perturbation of parameter '%s'." % (self.paramNames[self.pndx[index]])) updateAnalysisCount(analysisCount, anCount) results[index]=result def __call__(self, x0=None): if x0 is None: x0={ name: desc['init'] for name, desc in self.paramDesc.items() } # Reset analysis count analysisCount={} self.preparePerturbations(x0) evresults=[None]*len(self.pval) # Evaluate cOS.dispatch( jobList=self.jobGenerator(x0), collector=self.jobCollector(evresults, analysisCount), remote=self.spawnerLevel<=1 ) # Blank results, indexed by measure, corner, parameter index diffs={} # Compute deltas for ii in range(len(self.paramNames)): ndxp=self.ndxp[ii] ndxn=self.ndxn[ii] resp=evresults[ndxp] resn=evresults[ndxn] # All measures are always present for measureName, dat in resp.items(): if measureName not in diffs: diffs[measureName]={} # All corners are always present for cornerName, valp in dat.items(): if cornerName not in diffs[measureName]: diffs[measureName][cornerName]=[] valn=resn[measureName][cornerName] # Do we have both? if valp is not None and valn is not None: diffs[measureName][cornerName].append(valp-valn) else: diffs[measureName][cornerName].append(None) self.results=diffs self.analysisCount=analysisCount self.neval=len(self.pval) self.delta=np.array(self.delta) return diffs, self.delta, self.analysisCount
[docs] def diffVector(self, measureName, cornerName): """ Returns the vector of performance differences in the order specified by *parameterNames* for the given *measureName* and *cornerName*. If the performance difference is an array (vector measures) the first index in the returned array is the parameter index while the remaining indices correspond to performance measure's indices. If some performance difference is not computed due to errors the corresponding components of the returned array are set to ``np.nan``. If all performance measures fail to be computed for all parameters the size of the array is unknown. In that case an empty array is returned. """ diffs=self.results[measureName][cornerName] # Scan all deltas, get vector length shp=None for diff in diffs: if diff is not None: shp=diff.shape # Failed for all deltas if shp is None: return np.array([]) # Collect vector vec=[] for diff in diffs: if diff is None: # Failed for this delta blank=np.zeros(shp) blank.fill(np.nan) vec.append(blank) else: vec.append(diff) # Array, first index is parameter, the rest are performance measure components return np.array(vec)
[docs]def screen(diffVector, delta, paramNames=None, contribThreshold=0.01, cumulativeThreshold=0.25, useSens=False, squared=True, debug=0): """ Performs parameter screening. Returns two lists of inidices corresponding to parameters whose influence is below threshold and parameters whose influsence is above threshold. The indices are ordered by absolute sensitivity from the smallest to the largest. *diffVector* is the vector of performance differences corresponding to perturbations of parameters specified by *paramNames*. Only scalar performances can be screened (i.e. this must be a 1-D vector). If *paramNames* is not specified parameters indices starting with 0 are used in the debug output. *delta* is the vector of parameter perturbations. A parameter is considered to have low influence if its relative contribution is below *contribThreshold* and the relative cumulative contribution of the set of all parameters with low influence does not exceed *cumulativeThreshold*. If *useSens* is set to ``True`` the performance measure sensitivities are used instead of deltas. This makes sense only if all parameters are expressed in the same units. If *squared* is set to ``False`` the absolute sensitivities/deltas are used instead of squared sensitivities/deltas in the computation of the relative influence. """ # Default parameter names if paramNames is None: paramNames=[ ("%d" % ii) for ii in range(delta.size) ] if diffVector is None or diffVector.size==0 or diffVector.size!=delta.size: inIndex=np.array(range(len(paramNames))) outIndex=np.array([]) if debug: DbgMsgOut("SENS", "Bad sensitivity data, all parameters are significant.") return outIndex, inIndex # Collect sensitivity data evSens=diffVector if useSens: evSens=evSens/delta # Use absolute values pSens=np.abs(evSens) # Are all sensitivities finite? if not np.isfinite(pSens).all(): # No, no parameter is marked as insignificant if debug: DbgMsgOut("SENS", "Infinite or NaN sensitivity, all parameters are significant.") outIndex=np.array([]) inIndex=range(len(paramNames)) return outIndex, inIndex # Order sensitivities from smallest to largest ii=np.argsort(pSens) pSensOrd=pSens[ii] # Is largest absolute sensitivity zero? if pSensOrd.max()<=0.0: if debug: DbgMsgOut("SENS", "Zero sensitivity, all parameters are significant.") inIndex=np.array(range(pSensOrd.size)) outIndex=np.array([]) return outIndex, inIndex # Square if squared: pSensOrd=pSensOrd**2 # Compute normalized cumulative sum (relative) pSensCS=pSensOrd.cumsum() pSensCS/=pSensCS[-1] # Compute relative contributions pSensRel=pSensOrd/pSensOrd.sum() # Find threshold thri=np.where((pSensCS>cumulativeThreshold)|(pSensRel>contribThreshold))[0] # Extract indices if len(thri)<=0: # Crossing not found outIndex=np.array([]) inIndex=ii else: # Crossing found outIndex=ii[:thri[0]] inIndex=ii[thri[0]:] if debug: DbgMsgOut("SENS", "Screening report") DbgMsgOut("SENS", "Parameter with lowest influence above screening threshold: "+str(paramNames[ii[thri[0]]])) DbgMsgOut("SENS", "%30s %12s %12s %12s" % ("name", "sens" if useSens else "diff", "rel", "cum")) for jj in range(len(paramNames)): DbgMsgOut("SENS", "%30s %12.2e %12.2e %12.2e" % ( paramNames[ii[jj]], evSens[ii[jj]], pSensRel[jj], pSensCS[jj] ) ) return outIndex, inIndex