10.3.6. Performing many differential evolution runs on a cluster of workstations¶
Suppose we want to evaluate the performance of differential evolution for all combinations of n population sizes and m test functions. Every function/population size combination must be run k times. This gives us k * n * m differential evolution runs, which is quite a bit of work. Suppose we want to distribute this work to a bunch of computers.
First we must define a function that runs differential evolution for a given problem with a given population size. When a run is finished the function returns the best function value found and the function evaluation history.
File funclib.py in folder demo/parallel/desweep/
# A function that executes a run of given function with given population size.
# Must be in a module so it can be pickled and sent to a task.
from pyopus.optimizer.de import DifferentialEvolution
from pyopus.optimizer.base import CostCollector
from numpy import array, zeros, random
from pyopus.parallel.mpi import MPI
def deRun(prob, popSize, runIndex, maxiter=75000, maxGen=1500, w=0.5, pc=0.3, seed=None):
    """
    Perform one differential evolution run on problem *prob*.

    :param prob: problem object providing ``name``, ``xl`` and ``xh``
        (lower/upper bounds); callable as the cost function.
    :param popSize: population size passed to the optimizer.
    :param runIndex: zero-based index of this run (used only for the
        progress printout).
    :param maxiter: maximal number of cost function evaluations.
    :param maxGen: maximal number of generations.
    :param w: differential weight.
    :param pc: crossover probability.
    :param seed: random seed; ``None`` (the default) makes every run use
        a different random number sequence.
    :return: tuple ``(bestF, fHistory)`` with the best cost function value
        found and the cost function evaluation history collected by the
        :class:`CostCollector` plugin.
    """
    hostID = MPI.hostID()
    taskID = MPI.taskID()
    print(str(hostID)+" "+str(taskID)+(" evaluating %s, run=%2d, popsize=%3d" % (prob.name, runIndex+1, popSize)))
    # Bug fix: forward the caller-supplied seed instead of hard-coding None.
    # The default (seed=None) still gives every run its own random sequence,
    # so existing callers are unaffected.
    opt = DifferentialEvolution(
        prob, prob.xl, prob.xh, debug=0, maxiter=maxiter,
        maxGen=maxGen, populationSize=popSize, w=w, pc=pc, seed=seed
    )
    cc = CostCollector()
    opt.installPlugin(cc)
    # Start the search from the origin (dimension taken from the bounds).
    opt.reset(zeros(len(prob.xl)))
    opt.run()
    cc.finalize()
    return (opt.f, cc.fval)
Now let’s take a look at the main program in file depop.py in folder demo/parallel/desweep/. First we import some things
import os
from numpy import array, zeros, arange
from pickle import dump
from pyopus.problems import glbc
from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
import funclib
Next we define the parameters of the run.
# Parameters of the parameter sweep (read by jobGenerator/resultsCollector).
# First three problems of GlobalBCsuite
funcIndicesList=[0,1,2]
# For the whole GlobalBCsuite uncomment this
# range(len(glbc.GlobalBCsuite))
# Population sizes for which we want to evaluate the performance of DE
popSizeList=[10, 20] # range(10, 101, 40)
# Number of runs per (function, population size) combination
nRun=3
# Number of generations (unlimited)
nGen=None # or a number to limit the number of generations
# Maximal number of function evaluations
maxIter=75000 # or None for no limit
# Do we want to write function evaluation history to files (one file per run)
writeFhist=True
The job generator produces tuples of the form (function, args, kwargs, misc). The last entry of the tuple contains miscellaneous data that helps the results collector to organize the collected results. This entry does not affect job evaluation.
def jobGenerator():
    """
    Yield one job tuple ``(function, args, kwargs, misc)`` per DE run.

    Jobs cover every combination of test function (``funcIndicesList``),
    population size (``popSizeList``), and run index (``nRun`` runs each).
    The ``misc`` entry ``(atFunc, atPopSize, atRun)`` is not passed to the
    job function; it lets the results collector index the result into the
    ``finalF`` storage array.
    """
    # enumerate() replaces the range(len(...)) anti-pattern; both the
    # positional index (for misc) and the value are needed.
    for atFunc, funcIndex in enumerate(funcIndicesList):
        for atPopSize, popSize in enumerate(popSizeList):
            for atRun in range(nRun):
                yield (
                    funclib.deRun,
                    [],
                    {
                        'prob': glbc.GlobalBCsuite[funcIndex](),
                        'popSize': popSize,
                        'runIndex': atRun,
                        'maxiter': maxIter,
                        'maxGen': nGen,
                    },
                    # Extra data not passed to deRun
                    (atFunc, atPopSize, atRun)
                )
The results collector is an unprimed coroutine. It writes the function evaluation history to a file named fhist_f&lt;index&gt;_p&lt;popSize&gt;_r&lt;runIndex&gt;.pck. It also stores the lowest function value in the finalF array.
def resultsCollector(finalF):
    """
    Unprimed coroutine that receives ``(index, job, result)`` tuples.

    For every finished run it prints a progress line, optionally dumps the
    function evaluation history to ``fhist_f<i>_p<popSize>_r<run>.pck``
    (when ``writeFhist`` is set), and stores the best function value in
    *finalF* at the position given by the job's misc entry.
    """
    try:
        while True:
            index, job, result = yield
            # The misc entry (job[3]) identifies which run this result is for.
            iFunc, iPop, iRun = job[3]
            fBest, fHistory = result
            funcIndex = funcIndicesList[iFunc]
            popSize = popSizeList[iPop]
            print("Received results for %s, run=%2d, popsize=%3d " % (
                glbc.GlobalBCsuite[funcIndex].name,
                iRun+1,
                popSize
            ))
            if writeFhist:
                fileName = "fhist_f%d_p%d_r%d.pck" % (funcIndex, popSize, iRun)
                with open(fileName, "wb") as fp:
                    dump(fHistory, fp, protocol=-1)
            finalF[iFunc][iPop][iRun] = fBest
    except GeneratorExit:
        # dispatch() closes the coroutine when all jobs are done.
        print("No more results to collect.")
Finally the main part of the program dispatches the generated jobs and writes some summary
information to a file named fsummary.pck
.
if __name__=='__main__':
    # Attach the MPI virtual machine so jobs can be dispatched to remote tasks.
    cOS.setVM(MPI(startupDir=os.getcwd()))
    # Prepare results storage: one slot per (function, popSize, run).
    finalF=zeros((len(funcIndicesList), len(popSizeList), nRun))
    # Dispatch jobs; resultsCollector fills finalF as results arrive.
    cOS.dispatch(
        jobList=jobGenerator(),
        collector=resultsCollector(finalF),
        remote=True
    )
    # Prepare function names and dimensions for the summary.
    names=[]
    dims=[]
    for i in funcIndicesList:
        prob=glbc.GlobalBCsuite[i]()
        names.append(prob.name)
        dims.append(prob.n)
    # Store summary in a pickle file for later analysis (see an.py).
    summary={
        'funcIndices': funcIndicesList,
        'names': names,
        'dims': dims,
        'populationSizes': popSizeList,
        'finalF': finalF
    }
    with open("fsummary.pck", "wb") as fp:
        # protocol=-1 selects the highest available pickle protocol.
        dump(summary, fp, protocol=-1)
    # Shut down the virtual machine cleanly.
    cOS.finalize()
You can run the example on a local computer in parallel using the available processors with
mpirun -n <nproc> python depop.py
See Using MPI with PyOPUS on how to run a program across multiple machines.
After the run is finished you end up with a bunch of .pck files. To display the stored function evaluation history for a particular run, you can use the program in File an.py in folder demo/parallel/desweep/
# Load results summary, plot function value history for given function,
# population size, and run.
# Run this by typing
# python3 an.py
from numpy import array, zeros, arange
from pickle import dump, load
from pyopus.plotter import interface as pyopl
from pprint import pprint
if __name__=='__main__':
    # Which history to load
    findex=0 # Function index
    popsize=10 # Population size
    runindex=0 # Run index (starting with 0)
    # Load the summary written by the sweep run and print it
    with open("fsummary.pck", "rb") as f:
        summary=load(f)
    pprint(summary)
    # Read the function evaluation history of the selected run from its file
    with open("fhist_f%d_p%d_r%d.pck" % (findex, popsize, runindex), "rb") as f:
        fhist=load(f)
    # Create a figure
    f1=pyopl.figure()
    pyopl.lock(True)
    if pyopl.alive(f1):
        # Draw the history; x axis is expressed in generations
        # (popsize evaluations per generation).
        ax=f1.add_subplot(1,1,1)
        ax.semilogy(arange(len(fhist))/popsize, fhist)
        ax.set_xlabel('generation')
        ax.set_ylabel('f')
        ax.set_title('Progress of differential evolution')
        ax.grid()
        pyopl.draw(f1)
    pyopl.lock(False)
    # Wait for the plot window to close
    pyopl.join()