**Base classes for optimization algorithms and plugins 
(PyOPUS subsystem name: OPT)**

Every optimization algorthm should be used in the following way. 

1. Create the optimizer object. 
2. Call the :meth:`reset` method of the object to set the initial point. 
3. Optionally call the :meth:`check` method to check the consistency of 
4. Run the algorithm by calling the :meth:`run` method. 

The same object can be reused by calling the :meth:`reset` method followed by 
an optional call to :meth:`check` and a call to :meth:`run`. 

from ..misc.debug import DbgMsgOut, DbgMsg
from .cache import Cache

import numpy as np
from numpy import sqrt, isinf, Inf, where, zeros, array, concatenate
from numpy import random
from time import sleep

import sys

__all__ = [ 'RandomDelay', 'Plugin', 'Reporter', 'Stopper', 'Annotator', 'AnnotatorGroup', 
		'Optimizer', 'BoxConstrainedOptimizer', 'ConstrainedOptimizer',
		'UCEvaluator', 'BCEvaluator', 'CEvaluator', 
		'normalizer', 'denormalizer', 'CostCollector',  ]

[docs]class RandomDelay(object): """ A wrapper class for introducing a random delay into function evaluation Objects of this class are callable. On call they evaluate the callable object given by *obj* with the given args. A random delay with uniform distribution specified by the *delayInterval* list is generated and applied before the return value from *obj* is returned. The two members of *delayInterval* list specify the lower and the upper bound on the delay. f Example:: from pyopus.optimizer.base import RandomDelay def f(x): return 2*x # Delayed evaluator with a random delay between 1 and 5 seconds fprime=RandomDelay(f, [1.0, 5.0]) # Return f(10) without delay print f(x) # Return f(10) with a random delay between 1 and 5 seconds print fprime(10) """ def __init__(self, obj, delayInterval): self.obj=obj self.delayInterval=delayInterval self.start=delayInterval[0] self.range=delayInterval[1]-delayInterval[0] # Seed our own random generator, remember its state and restore original state state=random.get_state() random.seed() self.state=random.get_state() random.set_state(state) def __call__(self, *args): # Switch to our own random generator state=random.get_state() random.set_state(self.state) # Generate delay s=self.start+self.range*random.rand(1)[0] # Remember our state and switch back random generator self.state=random.get_state() random.set_state(state) # Sleep sleep(s) # Evaluate return self.obj(*args)
[docs]class Plugin(object): """ Base class for optimization algorithm plugins. A plugin is a callable object with the following calling convention ``plugin_object(x, ft, opt)`` where *x* is an array representing a point in the search space and *ft* is the corresponding cost function value. If *ft* is a tuple the first member is the cost function value, the second member is an array of nonlinear constraint function values, and the third member is the vector of constraint violations. A violation of 0 means that none of the nonlinear constraints is violated. Positive/negative violations indicate a upper/lower bound on the constraint function is violated. *opt* is a reference to the optimization algorithm where the plugin is installed. The plugin's job is to produce output or update some internal structures with the data collected from *x*, *ft*, and *opt*. If *quiet* is ``True`` the plugin supresses its output. This is useful on remote workers where the output of a plugin is often uneccessary. """ def __init__(self, quiet=False): # Stop flag self.stop=False # Should the reporter be silent self.quiet=quiet pass
[docs] def reset(self): """ Resets the plugin to its initial state. """ pass
[docs] def setQuiet(self, quiet): """ If *quiet* is ``True`` all further output is supressed by the plugin. To reenable it, call this method with *quiet* set to ``False``. """ self.quiet=quiet
def __call__(self, x, ft, opt): pass
[docs]class Reporter(Plugin): """ A base class for plugins used for reporting the cost function value and its details. If *onImprovement* is ``True`` the cost function value is reported only when it improves on the best-yet (lowest) cost function value. *onIterStep* specifies the number of calls to the reporter after which the cost function value is reported regardless of the *onImprovement* setting. Setting *onIterStep* to 0 disables this feature. The value of the cost function at the first call to the reporter (after the last call to the :meth:`reset` method) is always reported. The mechanism for detecting an improvement is simple. It compares the :attr:`niter` and the :attr:`bestIter` attributes of *opt*. The latter becomes equal to the former whenever a new better point is found. See the :meth:`updateBest` method of the optimization algorithm for more details. *ft* is a tuple for nonlinearly constrained optimizers. More details can be found in the documantation of the :class:`Plugin` class. The reporter prints the function value and the cumulative constraint violation (h). The iteration number is obtained from the :attr:`niter` member of the *opt* object passed when the reporter is called. The best-yet value of the cost function is obtained from the :attr:`f` member of the *opt* object. This member is ``None`` at the first iteration. """ def __init__(self, onImprovement=True, onIterStep=1): Plugin.__init__(self) # onIterStep=0 turns of regular reporting self.onImprovement=onImprovement self.onIterStep=onIterStep def __call__(self, x, ft, opt): # We have improvement at first iteration or whenever optimizer says there is improvement if opt.f is None or opt.niter==opt.bestIter: improved=True else: improved=False # Report if not self.quiet: if (self.onImprovement and improved) or (self.onIterStep!=0 and opt.niter%self.onIterStep==0): # Is ft a tuple if type(ft) is tuple: # Nonlinear constraint violations given, print cumulative squared violation (h) print("iter="+str(opt.niter)+" f="+str(ft[0])+" h="+str((ft[2]**2).sum())+" fbest="+str(opt.f)) else: # No nonlinear constraints, just f print("iter="+str(opt.niter)+" f="+str(ft)+" fbest="+str(opt.f)) sys.stdout.flush()
[docs]class Stopper(Plugin): """ Stopper plugins are used for stopping the optimization algorithm when a particular condition is satisfied. The actuall stopping is achieved by setting the :attr:`stop` member of the *opt* object to ``True``. """ def __call__(self, x, ft, opt): pass
[docs]class Annotator(object): """ Annotators produce annotations of the function value. These annotations can be used for restoring the state of local objects from remotely computed results. Usually the annotation is produced on remote workers when the cost function is evaluated. It is the job of the optimization algorithm to send the value of *ft* along with the corresponding annotations from worker (where the evaluation took place) to the master where the annotation is consumed (by a call to the :meth:`newResult` method of the optimization algorithm). This way the master can access all the auxiliary data which is normally produced only on the machine where the evaluation of the cost function took place. """
[docs] def produce(self): """ Produce an annotation. """ return None
[docs] def consume(self, annotation): """ Consume an annotation. """ return
[docs]class AnnotatorGroup(object): """ This object is a container holding annotators. """ def __init__(self): self.annotators=[]
[docs] def add(self, annotator, index=-1): """ Adds an annotator at position *index*. If no *index* is given the annotator is appended at the end. The index of the annotator is returned. """ if index<0: self.annotators.append(annotator) return len(self.annotators)-1 else: self.annotators[index]=annotator return index
[docs] def produce(self): """ Produces a list of annotations corresponding to annotators. """ annot=[] for a in self.annotators: annot.append(a.produce()) return annot
[docs] def consume(self, annotations): """ Consumes a list of annotations. """ for ii in range(len(self.annotators)): self.annotators[ii].consume(annotations[ii])
[docs]def UCEvaluator(x, f, annGrp, nanBarrier): """ Evaluator for unconstrained optimization. Returns the function value and the annotations. This function should be used on a remote computing node for evaluating the function. Its return value should be sent to the master that invoked the evaluation. The arguments for this function are generated by the optimizer. See the :meth:`getEvaluator` method. The evaluator and its arguments are the minimum of things needed for a remote evaluation. By using this function one can avoid pickling and sending the whole optimizer object with all of its auxiliary data. Instead one can just pickle and send the bare minimum needed for a remote evaluation. """ fval=f(x) if nanBarrier and np.isnan(fval): fval=np.Inf return np.array(fval), annGrp.produce()
[docs]def BCEvaluator(x, xlo, xhi, f, annGrp, extremeBarrierBox, nanBarrier): """ Evaluator for box-constrained optimization. Returns the function value and the annotations. The arguments are generated by the optimizer. See the :meth:`getEvaluator` method. The evaluator and its arguments are the minimum of things needed for a remote evaluation. See the :func:`UCEvaluator` function for more information. """ # Check box constraint violations violatedLo=(x<xlo).any() violatedHi=(x>xhi).any() # Enforce box constraints if extreme barrier approach is used if extremeBarrierBox: if violatedLo or violatedHi: return Inf else: if violatedLo: raise Exception(DbgMsg("BCOPT", "Point violates lower bound.")) if violatedHi: raise Exception(DbgMsg("BCOPT", "Point violates upper bound.")) fval=f(x) if nanBarrier and np.isnan(fval): fval=np.Inf return np.array(fval), annGrp.produce()
[docs]def CEvaluator(x, xlo, xhi, f, fc, c, annGrp, extremeBarrierBox, nanBarrier): """ Evaluator for constrained optimization. Returns the function value, the constraints values, and the annotations. The arguments are generated by the optimizer. See the :meth:`getEvaluator` method. The evaluator and its arguments are the minimum of things needed for remote evaluation. See the :func:`UCEvaluator` function for more information. """ # Check box constraint violations violatedLo=(x<xlo).any() violatedHi=(x>xhi).any() # No function value # Enforce box constraints with extreme barrier approach if extremeBarrierBox: if violatedLo or violatedHi: fval=Inf cval=np.array([]) return fval, cval, None else: if violatedLo: raise Exception(DbgMsg("COPT", "Point violates lower bound.")) if violatedHi: raise Exception(DbgMsg("COPT", "Point violates upper bound.")) # Do we have fc if fc is not None: # Yes # Evaluate f and c (fval,cval)=fc(x) else: # No # Evaluate constraints if c is not None: cval=c(x) else: cval=np.array([]) # Evaluate function (always) fval=f(x) # Nan barrier if nanBarrier and np.isnan(fval): fval=np.Inf return np.array(fval), np.array(cval), annGrp.produce()
[docs]class Optimizer(object): """ Base class for unconstrained optimization algorithms. *function* is the cost function (Python function or any other callable object) which should be minimzied. If *debug* is greater than 0, debug messages are printed at standard output. *fstop* specifies the cost function value at which the algorithm is stopped. If it is ``None`` the algorithm is never stopped regardless of how low the cost function's value gets. *maxiter* specifies the number of cost function evaluations after which the algorithm is stopped. If it is ``None`` the number of cost function evaluations is unlimited. *nanBarrier* specifies if NaN function values should be treated as infinite thus resulting in an extreme barrier. *cache* turns on local point caching. Currently works only for algorithms that do not use remote evaluations. See the :mod:`~pyopus.optimizer.cache` module for more information. The following members are available in every object of the :class:`Optimizer` class * :attr:`ndim` - dimension of the problem. Updated when the :meth:`reset` method is called. * :attr:`niter` - the consecutive number of cost function evaluation. * :attr:`x` - the argument to the cost function resulting in the best-yet (lowest) value. * :attr:`f` - the best-yet value of the cost function. * :attr:`bestIter` - the iteration in which the best-yet value of the cost function was found. * :attr:`bestAnnotations` - a list of annotations produced by the installed annotators for the best-yet value of the cost function. * :attr:`stop` - boolean flag indicating that the algorithm should stop. * :attr:`annotations` - a list of annotations produced by the installed annotators for the last evaluated cost function value * :attr:`annGrp` - :class:`AnnotatorGroup` object that holds the installed annotators * :attr:`plugins` - a list of installed plugin objects Plugin objects are called at every cost function evaluation or whenever a remotely evaluated cost function value is registered by the :meth:`newResult` method. Values of *x* and related members are arrays. """ def __init__(self, function, debug=0, fstop=None, maxiter=None, nanBarrier=False, cache=False): # Function subject to optimization, must be picklable for parallel optimization methods. self.function=function # Debug mode flag self.debug=debug # Problem dimension self.ndim=None # Stopping conditions self.fstop=fstop self.maxiter=maxiter # NaN barrier self.nanBarrier=nanBarrier # Cache if cache: self.cache=Cache() else: self.cache=None # Iteration counter self.niter=0 # Best-yet point self.x=None self.f=None self.bestIter=None self.bestAnnotations=None # Plugins self.plugins=[] # Annotator group self.annGrp=AnnotatorGroup() # Stop flag self.stop=False # Annotations produced at last cost function evaluation self.annotations=None
[docs] def check(self): """ Checks the optimization algorithm's settings and raises an exception if something is wrong. """ if ( is None): raise Exception(DbgMsg("OPT", "Cost function not defined.")) if self.maxiter is not None and self.maxiter<1: raise Exception(DbgMsg("OPT", "Maximum number of iterations must be at least 1."))
[docs] def installPlugin(self, plugin): """ Installs a plugin object or an annotator in the plugins list and/or annotators list Returns two indices. The first is the index of the installed annotator while the second is the index of the instaleld plugin. If teh object is not an annotator, the first index is ``None``. Similarly, if the object is not a plugin the second index is ``None``. """ i1=None if issubclass(type(plugin), Annotator): i1=self.annGrp.add(plugin) i2=None if issubclass(type(plugin), Plugin): self.plugins.append(plugin) i2=len(self.plugins)-1 return (i1, i2)
[docs] def getEvaluator(self, x): """ Returns a tuple holding the evaluator function and its positional arguments that evaluate the problem at *x*. This tuple can be sent to a remote computing node and evaluation is invoked using:: # Tuple t holds the function and its arguments func,args=t retval=func(*args) # Send retval back to the master The first positional argument is the point to be evaluated. The evaluator returns a tuple of the form (f,annotations). """ return UCEvaluator, [x, self.function, self.annGrp, self.nanBarrier]
[docs] def fun(self, x, count=True): """ Evaluates the cost function at *x* (array). If *count* is ``True`` the :meth:`newResult` method is invoked with *x*, the obtained cost function value, and *annotations* argument set to ``None``. This means that the result is registered (best-yet point information are updated), the plugins are calls and the annotators are invoked to produce annotations. Use ``False`` for *count* if you need to evaluate the cost function for debugging purposes. Returns the value of the cost function at *x*. """ data=None if self.cache is not None: data=self.cache.lookup(x) if data is not None: f,annot,it=data else: # Evaluate evf, args = self.getEvaluator(x) f,annot=evf(*args) # Do the things that need to be done with a new result # No annotation is provided telling the newResult() method that the # function evaluation actually happened in this process. if count: self.newResult(x, f, annot) return np.array(f)
[docs] def updateBest(self, x, f): """ Updates best yet function value. Returns ``True`` if an update takes place. """ if (self.f is None) or (self.f>f): self.f=f self.x=x self.bestIter=self.niter return True return False
[docs] def newResult(self, x, f, annotations=None): """ Registers the cost function value *f* obtained at point *x* with annotations list given by *annotations*. Increases the :attr:`niter` member to reflect the iteration number of the point being registered and updates the :attr:`f`, :attr:`x`, and :attr:`bestIter` members. If the *annotations* argument is given, it must be a list with as many members as there are annotator objects installed in the optimizer. The annotations list is stored in the :attr:`annotations` member. If *f* improves the best-yet value annotations are also stored in the :attr:`bestAnnotations` member. The *annotations* are consumed by calling the :meth:`consume` method of the annotators. Finally it is checked if the best-yet value of cost function is below :attr:`fstop` or the number of iterations exceeded :attr:`maxiter`. If any of these two conditions is satisfied, the algorithm is stopped by setting the :attr:`stop` member to ``True``. """ # Increase evaluation counter self.niter+=1 # Store in cache if self.cache and self.cache.lookup(x) is None: self.cache.insert(x, (f, annotations, self.niter)) # Update best-yet updated=self.updateBest(x, f) # If no annotation are given, function evaluation happened in this process if annotations is not None: # Put annotations in annotations list self.annotations=annotations # Consume annotations self.annGrp.consume(annotations) # Update best-yet annotations if updated: self.bestAnnotations=self.annotations # Annotations are set up. Call plugins. nplugins=len(self.plugins) for index in range(0,nplugins): plugin=self.plugins[index] if plugin is not None: stopBefore=self.stop plugin(x, f, self) if self.debug and self.stop and not stopBefore: DbgMsgOut("OPT", "Run stopped by plugin object.") # Force stop condition on f<=fstop if (self.fstop is not None) and (self.f<=self.fstop): self.stop=True if self.debug: DbgMsgOut("OPT", "Function fell below desired value. Stopping.") # Force stop condition on niter>maxiter if self.maxiter is not None and self.niter>=self.maxiter: self.stop=True if self.debug: DbgMsgOut("OPT", "Maximal number of iterations exceeded. Stopping.")
[docs] def reset(self, x0): """ Puts the optimizer in its initial state and sets the initial point to be the 1-dimensional array or list *x0*. The length of the array becomes the dimension of the optimization problem (:attr:`ndim` member). """ # Debug message if self.debug: DbgMsgOut("OPT", "Resetting.") # Determine dimension of the problem from initial point x0=array(x0) self.ndim=x0.shape[0] if x0.ndim!=1: raise Exception(DbgMsg("OPT", "Initial point must be a vector.")) # Store initial point self.x=x0.copy() self.f=None # Reset iteration counter self.niter=0 # Reset plugins for plugin in self.plugins: if plugin is not None: plugin.reset()
[docs] def run(self): """ Runs the optimization algorithm. """ # Does nothing, reimplement this in a derived class. pass
[docs]def normalizer(x, origin, scale): """ Normalizer for bound constrained optimization. """ return (x-origin)/scale
[docs]def denormalizer(y, origin, scale): """ Denormalizer for bound constrained optimization. """ return y*scale+origin
[docs]class BoxConstrainedOptimizer(Optimizer): """ Box-constrained optimizer class *xlo* and *xhi* are 1-dimensional arrays or lists holding the lower and the upper bounds on the components of *x*. Some algorithms allow the components of *xlo* to be :math:`- \infty` and the components of *xhi* to be :math:`+ \infty`. If *extremeBarrierBox* is set to ``True`` the :meth:`fun` method returns :math:`\infty` if the supplied point violates the box constraints. See the :class:`Optimizer` class for more information. """ def __init__(self, function, xlo=None, xhi=None, debug=0, fstop=None, maxiter=None, nanBarrier=False, extremeBarrierBox=False, cache=False): Optimizer.__init__(self, function, debug, fstop, maxiter, nanBarrier, cache) # Constraints self.xlo=xlo self.xhi=xhi self.extremeBarrierBox=False
[docs] def check(self): """ Checks the optimization algorithm's settings and raises an exception if something is wrong. """ # Check bounds if self.xlo is not None: if (self.xlo.ndim!=1 or self.xhi.ndim!=1): raise Exception(DbgMsg("OPT", "Bounds must be one-dimensional vectors.")) if self.xhi is not None: if (self.xlo.shape[0]!=self.xhi.shape[0]): raise Exception(DbgMsg("OPT", "Bounds must match in length.")) if (self.xlo is not None) and (self.xhi is not None): if (self.xlo>=self.xhi).any(): raise Exception(DbgMsg("OPT", "Lower bound must be below upper bound.")) # Unconstraint checks Optimizer.check(self)
[docs] def reset(self, x0): """ Puts the optimizer in its initial state and sets the initial point to be the 1-dimensional array or list *x0*. The length of the array becomes the dimension of the optimization problem (:attr:`ndim` member). The shape of *x* must match that of *xlo* and *xhi*. Normalization origin :math:`n_o` and scaling :math:`n_s` are calculated from the values of *xlo*, *xhi*, and intial point *x0*: * If a lower bound :math:`x_{lo}^i` is :math:`- \infty` .. math:: n_s^i &= 2 (x_{hi}^i - x_0^i) \\\\ n_o^i &= x_0^i - n_s^i / 2 * If an upper bound :math:`x_{hi}^i` is :math:`+ \infty` .. math:: n_s^i &= 2 (x_0^i - x_{lo}^i) \\\\ n_o^i &= x_0^i - n_s^i / 2 * If both lower and upper bound are infinite .. math:: n_s^i &= 2 \\\\ n_o^i &= x_0^i * If bouth bounds are finite .. math:: n_s^i &= x_{hi}^i - x_{lo}^i \\\\ n_o^i &= x_{lo} """ x0=array(x0) Optimizer.reset(self, x0) if self.debug: DbgMsgOut("BCOPT", "Resetting.") # The dimension is now known # Set default bounds to Inf, -Inf if self.xlo is None: self.xlo=zeros(self.ndim) self.xlo.fill(-Inf) else: self.xlo=array(self.xlo) if self.xhi is None: self.xhi=zeros(self.ndim) self.xhi.fill(Inf) else: self.xhi=array(self.xhi) self.hasBounds=False if np.isfinite(self.xlo).any() or np.isfinite(self.xhi).any(): self.hasBounds=True self.allBounds=False if np.isfinite(self.xlo).all() and np.isfinite(self.xhi).all(): self.allBounds=True # Check initial point against bounds if (x0<self.xlo).any(): raise Exception(DbgMsg("BCOPT", "Initial point violates lower bound.")) if (x0>self.xhi).any(): raise Exception(DbgMsg("BCOPT", "Initial point violates upper bound.")) # Normalization (defaults to low-high range) self.normScale=self.xhi-self.xlo self.normOrigin=self.xlo.copy() # Update normalization origin and scaling for unbound coordinates based on initial point. ndx=where(isinf(self.xlo) & ~isinf(self.xhi)) if len(ndx[0])>0: self.normScale[ndx]=2*(self.xhi[ndx]-x0[ndx]) self.normOrigin[ndx]=x0[ndx]-self.normScale[ndx]/2.0 ndx=where(~isinf(self.xlo) & isinf(self.xhi)) if len(ndx[0])>0: self.normScale[ndx]=2*(x0[ndx]-self.xlo[ndx]) self.normOrigin[ndx]=self.xlo[ndx] ndx=where(isinf(self.xlo) & isinf(self.xhi)) if len(ndx[0])>0: self.normScale[ndx]=x0[ndx]*0.0+2.0 self.normOrigin[ndx]=x0[ndx]
[docs] def bound(self, x): """ Fixes components of *x* so that the bounds are enforced. If a component is below lower bound it is set to the lower bound. If a component is above upper bound it is set to the upper bound. """ pos=where(x<self.xlo)[0] x[pos]=self.xlo[pos] pos=where(x>self.xhi)[0] x[pos]=self.xhi[pos]
[docs] def normDist(self, x, y): """ Calculates normalized distance between *x* and *y*. Normalized distance is calculated as .. math:: \\sqrt{\\sum_{i=1}^{n} (\\frac{x^i - y^i}{n_s^i})^2} """ d=(x-y) return sqrt(sum((d/self.normScale)**2))
[docs] def normalize(self, x): """ Returnes a normalized point *y* corresponding to *x*. Components of *y* are .. math:: y^i = \\frac{x^i - n_o^i}{n_s^i} If both bounds are finite, the result is within the :math:`[0,1]` interval. """ return normalizer(x, self.normOrigin, self.normScale)
[docs] def denormalize(self, y): """ Returns a denormalized point *x* corresponding to *y*. Components of *x* are .. math:: x^i = y^i n_s^i + n_o^i """ return denormalizer(y, self.normOrigin, self.normScale)
[docs] def getEvaluator(self, x): """ Returns the evaluator function and its positional arguments that evaluate the problem at *x*. The first positional argument is the point to be evaluated. The evaluator returns a tuple of the form (f,annotations). """ return BCEvaluator, [x, self.xlo, self.xhi, self.function, self.annGrp, self.extremeBarrierBox, self.nanBarrier]
[docs] def fun(self, x, count=True): """ Evaluates the cost function at *x* (array). If *count* is ``True`` the :meth:`newResult` method is invoked with *x*, the obtained cost function value, and *annotations* argument set to ``None``. This means that the result is registered (best-yet point information are updated) and the plugins are called to produce annotations. Use ``False`` for *count* if you need to evaluate the cost function for debugging purposes. Returns the value of the cost function at *x*. """ data=None if self.cache is not None: data=self.cache.lookup(x) if data is not None: f,annot,it=data else: # Evaluate evf, args = self.getEvaluator(x) f,annot=evf(*args) # Do the things that need to be done with a new result # No annotation is provided telling the newResult() method that the # function evaluation actually happened in this process. if count: self.newResult(x, f) return np.array(f)
[docs]class ConstrainedOptimizer(BoxConstrainedOptimizer): """ Constrained optimizer class *xlo* and *xhi* are 1-dimensional arrays or lists holding the lower and upper bounds on the components of *x*. Some algorithms allow the components of *xlo* to be :math:`- \infty` and the components of *xhi* to be :math:`+ \infty`. *constraints* is a function that returns an array holding the values of the general nonlinear constraints. *clo* and *chi* are vectors of lower and upper bounds on the constraint functions in *constraints*. Nonlinear constraints are of the form :math:`\leq c_{lo} \leq f(x) \leq c_{hi}`. *fc* is a function that simultaneously evaluates the function and the constraints. When it is given, *function* and *constraints* must be ``None``. See the :class:`BoxConstrainedOptimizer` class for more information. """ def __init__(self, function, xlo=None, xhi=None, constraints=None, clo=None, chi=None, fc=None, debug=0, fstop=None, maxiter=None, nanBarrier=False, extremeBarrierBox=False, cache=False): BoxConstrainedOptimizer.__init__(self, function, xlo, xhi, debug, fstop, maxiter, nanBarrier, extremeBarrierBox, cache) # Constraints self.constraints=constraints # Function and constraints self.fc=fc if fc is not None and (function is not None or constraints is not None): raise Exception(DbgMsg("COPT", "When fc is given, function and constraints must be None.")) # Constraint bounds self.clo=clo self.chi=chi
[docs] def check(self): """ Checks the optimization algorithm's settings and raises an exception if something is wrong. """ # Unconstrained and box constrained checks BoxConstrainedOptimizer.check(self) # Nonlinear constraint checks if self.constraints is not None: if (self.clo.ndim!=1 or self.chi.ndim!=1): raise Exception(DbgMsg("COPT", "Constraint bounds must be one-dimensional vectors.")) if (self.clo.shape[0]!=self.chi.shape[0]): raise Exception(DbgMsg("COPT", "Constraint bounds must match in length.")) if (self.clo>self.chi).any(): raise Exception(DbgMsg("COPT", "Lower constraint bound must not be greater than upper constraint bound."))
[docs] def reset(self, x0): """ Puts the optimizer in its initial state and sets the initial point to be the 1-dimensional array or list *x0*. The length of the array becomes the dimension of the optimization problem (:attr:`ndim` member). The shape of *x* must match that of *xlo* and *xhi* and *x* must be within the bounds specified by *xlo* and *xhi*. See the :meth:`reset` method of :class:`BoxConstrainedOptimizer` class for more information. """ x0=array(x0) BoxConstrainedOptimizer.reset(self, x0) if self.debug: DbgMsgOut("COPT", "Resetting.") # Default constraint bounds - don't known number of constraints, error if ( (self.constraints is not None or self.fc is not None) and self.clo is None and self.chi is None ): raise Exception(DbgMsg("COPT", "Cannot deduce the number of constraints")) if (self.constraints is not None or self.fc is not None): if self.clo is None: self.clo=zeros(self.chi.shape[0]) self.clo.fill(-Inf) else: self.clo=array(self.clo) if self.chi is None: self.chi=zeros(self.clo.shape[0]) self.chi.fill(Inf) else: self.chi=array(self.chi) # Count nonlinear constraints[0] else: # Reset nonlinear constraint values at best point self.c=None
[docs] def fun(self, x, count=True): """ The use of this function is not allowed in constrained optimization. """ raise Exception(DbgMsg("COPT", "fun() is not allowed. Use funcon()."))
[docs] def getEvaluator(self, x): """ Returns the evaluator function and its positional arguments that evaluate the problem at *x*. The first positional argument is the point to be evaluated. The evaluator returns a tuple of the form (f,c,annotations). """ return CEvaluator, [x, self.xlo, self.xhi, self.function, self.fc, self.constraints, self.annGrp, self.extremeBarrierBox, self.nanBarrier]
[docs] def funcon(self, x, count=True): """ Evaluates the cost function and the nonlinear constraints at *x*. If *count* is ``True`` the :meth:`newResult` method is invoked with *x*, the obtained cost function value, the nonlinear constraint values, and the *annotations* argument set to ``None``. This means that the result is registered (best-yet point information are updated) and the plugins are called to produce annotations. Use ``False`` for *count* if you need to evaluate the cost function and the constraints for debugging purposes. Returns the value of the cost function at *x* adjusted for extreme barrier (i.e. ``Inf`` when box constraints are violated or the function value is ``Nan``) and a vector of constraint function values. """ data=None if self.cache is not None: data=self.cache.lookup(x) if data is not None: f,c,annot,it=data else: evf, args = self.getEvaluator(x) f,c,annot=evf(*args) # Register result if count: self.newResult(x, f, c, annot) return np.array(f), np.array(c)
[docs] def constraintViolation(self, c): """ Returns constraint violation vector. Negative values correspond to lower bound violation while positive values correspond to upper bound violation. Returns 0 if *c* is zero-length. """ if c.size>0: vlo=c-self.clo vlo=np.where(vlo>=0.0, 0.0, vlo) vhi=c-self.chi vhi=np.where(vhi<=0.0, 0.0, vhi) return vlo+vhi else: return np.array([])
[docs] def aggregateConstraintViolation(self, c, useL2squared=False): """ Computes the aggregate constraint violation. If no nonlinear constraints are violated this value is 0. Otherwise it is greater than zero. *c* is the vector of constraint violations returned by the :meth:`constraintViolation` method. if *useL2squared* is ``True`` the L2 norm is used for computing the aggregate violation. Otherwise L1 norm is used. """ h=self.constraintViolation(c) if h.size==0: return 0.0 else: if useL2squared: return (h**2).sum() # L2 else: return np.abs(h).sum() # L1
[docs] def updateBest(self, x, f, c): """ Updates best yet function and constraint values. Calculates cumulative squared constraint violation (h). It is 0 if there are no constraints. If h<hbest, replace best point. If h==hbest and f<fbest, replace best point. Returns ``True`` if an update takes place. """ if self.f is None: self.f=f self.x=x self.c=c self.bestIter=self.niter return True else: h=self.aggregateConstraintViolation(c) hbest=self.aggregateConstraintViolation(self.c) if (h<hbest) or (h==hbest and f<self.f): self.f=f self.x=x self.c=c self.bestIter=self.niter return True return False
[docs] def newResult(self, x, f, c, annotations=None): """ Registers the cost function value *f* and constraints values *c* obtained at point *x* with annotations list given by *annotations*. Increases the :attr:`niter` member to reflect the iteration number of the point being registered and updates the :attr:`f`, :attr:`x`, :attr:`c`, and :attr:`bestIter` members. See the :meth:`newResult` method of the :class:`Optimizer` class for more information. """ # Increase evaluation counter self.niter+=1 # Store in cache if self.cache and self.cache.lookup(x) is None: self.cache.insert(x, (f,c,annotations,self.niter)) # Update best-yet updated=self.updateBest(x, f, c) # If no annotation are given, function evaluation happened in this process if annotations is not None: # Put annotations in annotations list self.annotations=annotations # Consume annotations self.annGrp.consume(annotations) # Update best-yet annotations if updated: self.bestAnnotations=self.annotations # Annotations are set up. Call plugins. nplugins=len(self.plugins) for index in range(0,nplugins): plugin=self.plugins[index] if plugin is not None: stopBefore=self.stop plugin(x, (f,c,self.constraintViolation(c)), self) if self.debug and self.stop and not stopBefore: DbgMsgOut("OPT", "Run stopped by plugin object.") # Force stop condition on f<=fstop if (self.fstop is not None) and (self.f<=self.fstop): self.stop=True if self.debug: DbgMsgOut("OPT", "Function fell below desired value. Stopping.") # Force stop condition on niter>maxiter if self.maxiter is not None and self.niter>=self.maxiter: self.stop=True if self.debug: DbgMsgOut("OPT", "Maximal number of iterations exceeded. Stopping.")
# Cost function and input parameter collector
[docs]class CostCollector(Plugin): """ A subclass of the :class:`~pyopus.optimizer.base.Plugin` iterative algorithm plugin class. This is a callable object invoked at every iteration of the algorithm. It collects the input parameter vector (n components) and the aggregate function value. Let niter denote the number of stored iterations. The input parameter values are stored in the :attr:`xval` member which is an array of shape (niter, n) while the aggregate function values are stored in the :attr:`fval` member (array with shape (niter)). If the algorithm supplies constraint violations they are stored in the :attr:`hval` member. Otherwise this member is set to ``None``. Some iterative algorithms do not evaluate iterations sequentially. Such algorithms denote the iteration number with the :attr:`index` member. If the :attr:`index` member is not present in the iterative algorithm object the internal iteration counter of the :class:`CostCollector` is used. The first index in the *xval*, *fval*, and *hval* arrays is the iteration index. If iterations are not performed sequentially these two arrays may contain gaps where no valid input parameter or aggregate function value is found. The gaps are denoted by the *valid* array (of shape (niter)) where zeros denote a gap. *xval*, *fval*, *hval*, and *valid* arrays are resized in chunks of size *chunkSize*. """ def __init__(self, chunkSize=100): Plugin.__init__(self) self.xval=None self.fval=None self.hval=None self.valid=None self.n = chunkSize self.memLen = chunkSize def __call__(self, x, ft, opt): if self.xval is None: #allocate space in memory self.xval = zeros([self.n,len(x)]) self.fval = zeros([self.n]) if type(ft) is tuple: self.hval = zeros([self.n]) self.valid = zeros([self.n]) self.localindex = 0 if 'index' in opt.__dict__: index = opt.index else: index = self.localindex self.localindex += 1 #check if the index is inside the already allocated space -> if not allocate new space in memory while index >= self.memLen: self.xval = concatenate((self.xval, zeros([self.n,len(x)])), axis=0) self.fval = concatenate((self.fval, zeros([self.n])), axis=0) if type(ft) is tuple: self.hval = concatenate((self.hval, zeros([self.n])), axis=0) self.valid = concatenate((self.valid, zeros([self.n])), axis=0) self.memLen += self.n #write data self.xval[index] = x if type(ft) is tuple: self.fval[index] = ft[0] self.hval[index] = ft[2].sum() else: self.fval[index] = ft self.valid[index] += 1 return None
[docs] def finalize(self): """ Removes the space beyond the recorded iteration with highest iteration number. This space was reserved for the last chunk, but the highest recorded iteration may not be the one recorded at the end of the chunk. Must be called after the iterative algorithm is stopped. """ if self.valid is None: return nonZeroIndex = self.valid.nonzero() lastIndex=nonZeroIndex[0].max()+1 self.xval = self.xval[0:lastIndex][:] self.fval = self.fval[0:lastIndex] if self.hval is not None: self.hval = self.hval[0:lastIndex] self.valid = self.valid[0:lastIndex]
[docs] def reset(self): """ Clears the :attr:`xval`, :attr:`fval`, :attr:`hval`, and :attr:`valid` members. """ self.xval = None self.fval = None self.hval = None self.valid = None self.memLen = self.n