10.3.4. Dispatching an unknown number of tasks, collecting results with a results collector
Sometimes the number of tasks to dispatch is not known in advance, and the point at which to stop generating tasks may depend on the task results. This example demonstrates how to keep generating tasks until a certain condition is met.
A job is evaluated by the jobProcessor() function defined in file funclib.py in folder demo/parallel/cooperative/.
# MPI provides the host and task IDs of the process running the job
from pyopus.parallel.mpi import MPI

def jobProcessor(value):
    # Process a job (value), return result (multiply value by 2)
    hostID=MPI.hostID()
    taskID=MPI.taskID()
    print("Processing "+str(value)+ " on "+ str(hostID)+" "+str(taskID))
    return 2*value
This time the generator is more complex, so we define it as a function. It keeps generating jobs until the global variable stopFlag is set to True; the flag is checked each time the next job is requested. Each job calls jobProcessor(), which multiplies its argument by 2. The arguments form a sequence that begins at start and advances by step.
def dynJobGenerator(start, step=1):
    global stopFlag
    # Reset stop flag
    stopFlag=False
    ii=start
    while not stopFlag:
        yield (jobProcessor, [ii])
        ii+=step
    print("Generator finished.")
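The stopping mechanism does not depend on MPI, so it can be tried in a single process. The following is a minimal sketch, assuming a hypothetical stub_job() in place of jobProcessor() and a plain for loop in place of the dispatcher. The loop plays the role of the results collector: once a result reaches 40 it sets the flag, and the generator ends the next time a job is requested.

```python
# Minimal, MPI-free sketch of a generator controlled by a global flag.
# stub_job and dyn_job_generator are hypothetical stand-ins, not PyOPUS code.

stopFlag = False

def stub_job(value):
    # Same computation as jobProcessor(), without the MPI calls
    return 2 * value

def dyn_job_generator(start, step=1):
    global stopFlag
    stopFlag = False          # reset when the generator is first advanced
    ii = start
    while not stopFlag:       # checked whenever the consumer asks for another job
        yield (stub_job, [ii])
        ii += step

jobs = []
for func, args in dyn_job_generator(start=10, step=5):
    jobs.append(args[0])
    if func(*args) >= 40:     # the consumer decides when to stop
        stopFlag = True       # module-level assignment, so no 'global' is needed here

print(jobs)  # [10, 15, 20]
```

Note that the job whose result triggered the stop (20) has already been generated; the flag only prevents further jobs.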
By default cOS.dispatch() collects the results and puts them in a list in the order in which the jobs were generated. This time we use a more flexible mechanism for collecting the results: a results collector. A results collector is an unprimed coroutine (a generator that has not yet been advanced to its first yield) that receives results and stores them in a data structure of its choice. Because the collector sees each result as it is received, it can set stopFlag depending on that result.
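To make the coroutine protocol concrete, here is a hypothetical sketch of how a dispatcher might drive an unprimed collector. It is not the PyOPUS implementation; drive_collector() and the (index, value, result) tuples are assumptions chosen to mirror the collector below. The dispatcher primes the coroutine with next(), delivers each result with send(), and calls close() when there are no more results.

```python
# Hypothetical sketch of driving an unprimed results collector.
# drive_collector is an illustrative name, not part of PyOPUS.

def collector(storage):
    # Unprimed: the body does not run until the first next() call
    while True:
        index, value, result = yield      # wait for the next (index, value, result) tuple
        storage[index] = result

def drive_collector(coro, finished):
    next(coro)                 # prime: run the body up to the first 'yield'
    for item in finished:
        coro.send(item)        # deliver one finished result
    coro.close()               # no more results: raises GeneratorExit inside coro

store = {}
# Results may arrive out of order when jobs run in parallel
drive_collector(collector(store), [(1, 1, 2), (0, 0, 0), (2, 2, 4)])
print(store)  # {1: 2, 0: 0, 2: 4}
```

Sending a value to a coroutine that has not been primed raises TypeError, which is why the priming step comes first.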
def resultsCollector(resultStorage, stopAtResult=None):
    # Collect results
    global stopFlag
    try:
        while True:
            (index, value, result)=yield
            print("Result for value="+str(value)+" is "+str(result))
            # Make space
            if len(resultStorage)<=index:
                resultStorage.extend([None]*(index+1-len(resultStorage)))
            resultStorage[index]=result
            # This is used only in example 04
            # Set stop flag if stopAtResult is specified and result reaches stopAtResult
            # This stops the job generator
            if stopAtResult is not None and result>=stopAtResult and stopFlag is False:
                print("Result", stopAtResult, "reached, stopping generator.")
                stopFlag=True
    except GeneratorExit:
        print("Collector finished")
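The stopFlag is set when a received result is greater than or equal to stopAtResult. In our example the results are stored in a list passed via the resultStorage argument. Because results may arrive in any order, the list is padded with None whenever an index beyond its current end is received. A results collector can detect that there are no more results to collect by catching the GeneratorExit exception, which Python raises inside a coroutine when it is closed.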
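The padding and the GeneratorExit handler can be observed without MPI. This sketch reuses the storage logic of resultsCollector() in a hypothetical list_collector() (the print calls and the stop flag are omitted, and a log list is added so the handler's effect is visible). It sends a result for job 3 before job 0, then closes the coroutine.

```python
# Sketch of the list-growing collector logic and its GeneratorExit handler.
# list_collector and the log list are illustrative additions.

def list_collector(storage, log):
    try:
        while True:
            index, value, result = yield
            if len(storage) <= index:                       # make space, pad with None
                storage.extend([None] * (index + 1 - len(storage)))
            storage[index] = result
    except GeneratorExit:
        log.append("collector finished")                    # runs when close() is called

results, log = [], []
coro = list_collector(results, log)
next(coro)                      # prime the coroutine
coro.send((3, 3, 6))            # job 3 finishes first
print(results)                  # [None, None, None, 6]
coro.send((0, 0, 0))            # job 0 fills its slot
coro.close()
print(results, log)             # [0, None, None, 6] ['collector finished']
```

The handler must return (or re-raise) rather than yield again; yielding after GeneratorExit makes close() raise RuntimeError.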
Of course, the stopFlag global variable must also be defined at module level:
stopFlag=False
Finally, we put it all together.
File 04-dyndispatch.py in folder demo/parallel/cooperative/
# Dispatches tasks to computational nodes until specified result is reached or exceeded
# This example also demonstrates the use of a collector.
# mpirun -n 4 python3 04-dyndispatch.py
#
# Under Windows you should use Microsoft MPI. mpiexec and python should be in
# the system path.
#
# mpiexec /np <number of processes> python 04-dyndispatch.py

from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
from funclib import dynJobGenerator, resultsCollector

if __name__=='__main__':
    # Set up MPI
    cOS.setVM(MPI())

    # This list will be filled with results
    results=[]

    # Dispatch the jobs
    # Note that when a results collector is specified a dispatch does not return a list
    # with job results, unless you pass the buildResultList option and set it to True.
    cOS.dispatch(
        jobList=dynJobGenerator(start=0, step=1), # Start at 0, increase by one
        collector=resultsCollector(results, stopAtResult=150),
        remote=True
    )

    print("Results: "+str(results))

    # Finish, need to do this if MPI is used
    cOS.finalize()
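To see which results to expect, the whole pipeline can be emulated in a single process. The sketch below is a hypothetical serial stand-in for cOS.dispatch(), not its implementation: serial_dispatch() runs each job immediately and, as an assumption, fills the second tuple element with the job's argument. Because no job is in flight when the flag is set, it stops exactly at value 75 (result 150). With several MPI processes, other jobs may already be running at that moment, so a real run may collect a few results beyond 150.

```python
# Hypothetical single-process emulation of the dynamic dispatch example.
# serial_dispatch and the snake_case names are illustrative, not PyOPUS API.

stopFlag = False

def job_processor(value):
    return 2 * value

def dyn_job_generator(start, step=1):
    global stopFlag
    stopFlag = False
    ii = start
    while not stopFlag:
        yield (job_processor, [ii])
        ii += step

def results_collector(storage, stop_at_result=None):
    global stopFlag
    try:
        while True:
            index, value, result = yield
            if len(storage) <= index:
                storage.extend([None] * (index + 1 - len(storage)))
            storage[index] = result
            if stop_at_result is not None and result >= stop_at_result and not stopFlag:
                stopFlag = True                      # ends the generator on its next step
    except GeneratorExit:
        pass

def serial_dispatch(job_list, collector):
    next(collector)                                  # prime the collector
    for index, (func, args) in enumerate(job_list):  # jobs are pulled one at a time
        collector.send((index, args[0], func(*args)))
    collector.close()                                # signal that no results remain

results = []
serial_dispatch(dyn_job_generator(start=0, step=1),
                results_collector(results, stop_at_result=150))
print(len(results), results[-1])  # 76 150
```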