10.3.4. Dispatching an unknown number of tasks, collecting results with a results collector
Sometimes the number of tasks to dispatch is not known in advance. When to stop generating tasks may depend on the results of tasks that have already finished. This example demonstrates how to generate tasks on the fly and stop when a certain condition is met.
A job is evaluated by the
jobProcessor() function defined in file
funclib.py in folder
```python
from pyopus.parallel.mpi import MPI

def jobProcessor(value):
    # Process a job (value), return result (multiply value by 2)
    hostID=MPI.hostID()
    taskID=MPI.taskID()
    print("Processing "+str(value)+ " on "+ str(hostID)+" "+str(taskID))
    return 2*value
```
This time the generator is more complex, so we define it as a generator function. The stopping
condition is met when the global variable stopFlag is set to
True. The generator
produces jobs that multiply values by 2; each job is a tuple holding the function to call and the list of its arguments. The values form a sequence that starts at start
and advances by step.
```python
def dynJobGenerator(start, step=1):
    global stopFlag
    # Reset stop flag
    stopFlag=False
    ii=start
    while not stopFlag:
        yield (jobProcessor, [ii])
        ii+=step
    print("Generator finished.")
```
By default, cOS.dispatch() collects the results and returns them in a list, in the order
in which the jobs were generated. This time we use a more flexible mechanism for collecting
the results: a results collector. A results collector is an unprimed coroutine (a generator object that has not yet been advanced to its first yield) that
receives results and stores them in some data structure. Because the collector sees each result as it arrives, it can set
stopFlag depending on the received result.
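What "unprimed" means can be shown with plain Python. A generator function that uses `yield` as an expression returns a coroutine object whose body has not started yet; it must be advanced to its first `yield` with next() before values can be sent to it. The collector below, `simpleCollector`, is a hypothetical, simplified one used only for illustration. Since cOS.dispatch() expects an unprimed collector, presumably it does the priming itself, so you should not call next() on the collector you pass to it.

```python
# Sketch of an unprimed coroutine used as a collector (hypothetical, MPI-free).

def simpleCollector(storage):
    # Receives (index, result) pairs and stores each result at its index
    while True:
        index, result = yield
        if len(storage) <= index:
            storage.extend([None] * (index + 1 - len(storage)))
        storage[index] = result

store = []
coll = simpleCollector(store)   # unprimed: the body has not started yet

try:
    coll.send((0, 10))          # sending a non-None value to an unprimed coroutine fails
except TypeError as e:
    print("TypeError:", e)

next(coll)                      # prime: run the body up to the first yield
coll.send((2, 30))              # results may arrive out of order
coll.send((0, 10))
print(store)                    # prints [10, None, 30]
coll.close()
```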
```python
def resultsCollector(resultStorage, stopAtResult=None):
    # Collect results
    global stopFlag
    try:
        while True:
            (index, value, result)=yield
            print("Result for value="+str(value)+" is "+str(result))
            # Make space
            if len(resultStorage)<=index:
                resultStorage.extend([None]*(index+1-len(resultStorage)))
            resultStorage[index]=result
            # This is used only in example 04
            # Set stop flag if stopAtResult is specified and result reaches stopAtResult
            # This stops the job generator
            if stopAtResult is not None and result>=stopAtResult and stopFlag is False:
                print("Result", stopAtResult, "reached, stopping generator.")
                stopFlag=True
    except GeneratorExit:
        print("Collector finished")
```
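The stopFlag is set when a received result is greater than or equal to stopAtResult. In our example the results are stored in a list passed via the resultStorage argument, at the position given by the job index; the list is extended with None placeholders as needed. A results collector can detect that there are no more results to collect by catching the GeneratorExit exception, which Python raises inside a generator when its close() method is called.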
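The end-of-results detection relies on standard Python generator behaviour: calling close() on a paused coroutine raises GeneratorExit at the `yield` where it is waiting. The sketch below uses a hypothetical `countingCollector` and assumes, as the text above implies, that whoever drives the collector closes it once all results are in.

```python
# Sketch: a collector detects the end of the result stream via GeneratorExit.

def countingCollector(log):
    count = 0
    total = 0
    try:
        while True:
            result = yield
            count += 1
            total += result
    except GeneratorExit:
        # close() was called: no more results will arrive.
        # Do not yield again here, otherwise close() raises RuntimeError.
        log.append("collected %d results, sum %d" % (count, total))

log = []
coll = countingCollector(log)
next(coll)              # prime the coroutine
for r in (2, 4, 6):
    coll.send(r)
coll.close()            # raises GeneratorExit inside the coroutine
print(log)              # prints ['collected 3 results, sum 12']
```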
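Of course, stopFlag must also be defined as a module-level (global) variable in funclib.py, the module that contains both dynJobGenerator() and resultsCollector().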
Finally, we put it all together.
File 04-dyndispatch.py in folder demo/parallel/cooperative/
```python
# Dispatches tasks to computational nodes until specified result is reached or exceeded
# This example also demonstrates the use of a collector.
#   mpirun -n 4 python3 04-dyndispatch.py
#
# Under Windows you should use Microsoft MPI. mpiexec and python should be in
# the system path.
#
#   mpiexec /np <number of processes> python 04-dyndispatch.py

from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
from funclib import dynJobGenerator, resultsCollector

if __name__=='__main__':
    # Set up MPI
    cOS.setVM(MPI())

    # This list will be filled with results
    results=[]

    # Dispatch the jobs
    # Note that when a results collector is specified a dispatch does not return a list
    # with job results, unless you pass the buildResultList option and set it to True.
    cOS.dispatch(
        jobList=dynJobGenerator(start=0, step=1), # Start at 0, increase by one
        collector=resultsCollector(results, stopAtResult=150),
        remote=True
    )

    print("Results: "+str(results))

    # Finish, need to do this if MPI is used
    cOS.finalize()
```
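To picture the control flow without MPI, here is a sequential simulation. It is a hypothetical sketch, not the cOS.dispatch() implementation: `simulatedDispatch`, `double` and the `slots` parameter are invented for illustration, and the generator and collector are simplified copies of the ones above (no MPI, no printing). The simulation keeps up to `slots` jobs pending to mimic several workers, assumes the collector receives (index, argument, result) tuples, and closes the collector at the end.

```python
# Hypothetical, MPI-free simulation of dispatching with a bounded number of
# pending jobs. It is NOT how cOS.dispatch() is implemented.
from collections import deque

stopFlag = False

def double(value):
    # Stand-in for jobProcessor(), without the MPI calls
    return 2 * value

def dynJobGenerator(start, step=1):
    global stopFlag
    stopFlag = False
    ii = start
    while not stopFlag:
        yield (double, [ii])
        ii += step

def resultsCollector(resultStorage, stopAtResult=None):
    global stopFlag
    try:
        while True:
            index, value, result = yield
            if len(resultStorage) <= index:
                resultStorage.extend([None] * (index + 1 - len(resultStorage)))
            resultStorage[index] = result
            if stopAtResult is not None and result >= stopAtResult:
                stopFlag = True
    except GeneratorExit:
        pass

def simulatedDispatch(jobList, collector, slots=3):
    next(collector)                  # prime the collector
    pending = deque()
    jobs = enumerate(jobList)        # attach a job index to every job
    exhausted = False
    while True:
        # Request new jobs while a slot is free and the generator is alive
        while not exhausted and len(pending) < slots:
            try:
                pending.append(next(jobs))
            except StopIteration:
                exhausted = True
        if not pending:
            break
        # "Finish" the oldest pending job and pass its result to the collector
        index, (func, args) = pending.popleft()
        collector.send((index, args[0], func(*args)))
    collector.close()

results = []
simulatedDispatch(dynJobGenerator(0), resultsCollector(results, stopAtResult=10), slots=3)
print(results)    # prints [0, 2, 4, 6, 8, 10, 12, 14]
```

With slots=3 the two jobs that were already pending when the result 10 arrived still finish, so the list runs past the limit; with slots=1 it stops at exactly [0, 2, 4, 6, 8, 10]. A parallel run can behave similarly, so the results printed by 04-dyndispatch.py may include values beyond stopAtResult=150.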