10.3.4. Dispatching an unknown number of tasks, collecting results with a results collector

Sometimes the number of tasks to dispatch is not known in advance, because the decision to stop generating tasks depends on the task results. This example demonstrates how to generate tasks until a certain condition is met.

A job is evaluated by the jobProcessor() function defined in file funclib.py in folder demo/parallel/cooperative/.

from pyopus.parallel.mpi import MPI

def jobProcessor(value):
	# Process a job (value), return result (multiply value by 2)
	hostID=MPI.hostID()
	taskID=MPI.taskID()
	
	print("Processing "+str(value)+ " on "+ str(hostID)+" "+str(taskID))
	return 2*value

This time the generator is more complex, so we define it as a function. It generates jobs that multiply values by 2. The values form a sequence that begins at start and increases by step. The generator stops when the global variable stopFlag is set to True.

def dynJobGenerator(start, step=1):
	global stopFlag
	
	# Reset stop flag
	stopFlag=False
	
	ii=start
	while not stopFlag:
		yield (jobProcessor, [ii])
		ii+=step
	
	print("Generator finished.")
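The stopping mechanism can be tried out without MPI. The following is a minimal sketch, not part of the demo files: double() is a local stand-in for jobProcessor(), requestStop() is a hypothetical helper that sets the flag, and a plain for loop plays the role of the dispatcher. Note that the flag is reset when the generator is first advanced, not when dynJobGenerator() is called.

```python
stopFlag = False

def requestStop():
    # Hypothetical helper: sets the flag that the generator polls
    global stopFlag
    stopFlag = True

def double(value):
    # Local stand-in for jobProcessor(), no MPI involved
    return 2 * value

def dynJobGenerator(start, step=1):
    global stopFlag
    # This line runs on the first next(), not when the function is called
    stopFlag = False
    ii = start
    while not stopFlag:
        yield (double, [ii])
        ii += step

collected = []
for func, args in dynJobGenerator(start=0, step=5):
    # Evaluate the job locally, in the order it was generated
    result = func(*args)
    collected.append(result)
    if result >= 40:
        # The generator checks the flag before yielding the next job
        requestStop()

print(collected)
```

Because the flag is checked only when the next job is requested, the job that produced the stopping result is still part of the output.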

By default cOS.dispatch() collects the results and puts them in a list in the order in which the jobs were generated. This time we use a more complex mechanism for collecting the results: a results collector. A results collector is an unprimed coroutine that receives results and organizes them in some data structure. This enables us to set the stopFlag depending on the received result.

def resultsCollector(resultStorage, stopAtResult=None):
	# Collect results
	global stopFlag
	try:
		while True:
			(index, value, result)=yield
			print("Result for value="+str(value)+" is "+str(result))
			
			# Make space
			if len(resultStorage)<=index:
				resultStorage.extend([None]*(index+1-len(resultStorage)))
			
			resultStorage[index]=result
			
			# This is used only in example 04
			# Set stop flag if stopAtResult is specified and result reaches stopAtResult
			# This stops the job generator
			if stopAtResult is not None and result>=stopAtResult and stopFlag is False:
				print("Result", stopAtResult, "reached, stopping generator.")
				stopFlag=True
			
	except GeneratorExit:
		print("Collector finished")

The stopFlag is set when a received result is greater than or equal to stopAtResult. In our example the results are stored in a list passed by the resultStorage argument. A results collector can detect when there are no more results to collect by catching the GeneratorExit exception.
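The coroutine protocol behind a results collector can be illustrated in plain Python. The sketch below is an assumption-laden stand-in, not the way cOS.dispatch() is implemented: simpleCollector() and the log list are hypothetical names, and the collector is primed, fed, and closed by hand. It shows that results may arrive in any order and that close() raises GeneratorExit inside the coroutine.

```python
log = []

def simpleCollector(storage):
    # Receives (index, value, result) tuples, the same shape that
    # resultsCollector() unpacks
    try:
        while True:
            index, value, result = yield
            # Grow the list so that storage[index] exists
            if len(storage) <= index:
                storage.extend([None] * (index + 1 - len(storage)))
            storage[index] = result
    except GeneratorExit:
        # Raised inside the coroutine when close() is called
        log.append("finished")

storage = []
collector = simpleCollector(storage)

next(collector)              # prime: advance to the first yield
collector.send((2, 2, 4))    # results may arrive out of order
collector.send((0, 0, 0))
collector.close()            # no more results

print(storage, log)
```

The slot for index 1 stays None because no result was sent for it. Storing by index is what keeps the list ordered by job generation order regardless of arrival order.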

Of course one must also define the stopFlag:

stopFlag=False

Finally we put it all together.

File 04-dyndispatch.py in folder demo/parallel/cooperative/

# Dispatches tasks to computational nodes until specified result is reached or exceeded
# This example also demonstrates the use of a collector. 
#   mpirun -n 4 python3 04-dyndispatch.py
#
# Under Windows you should use Microsoft MPI. mpiexec and python should be in 
# the system path. 
#
#   mpiexec /np <number of processes> python 04-dyndispatch.py

from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
from funclib import dynJobGenerator, resultsCollector

if __name__=='__main__':
	# Set up MPI
	cOS.setVM(MPI())

	# This list will be filled with results
	results=[]

	# Dispatch the jobs
	# Note that when a results collector is specified, dispatch does not return a list
	# with job results unless you pass the buildResultList option set to True.
	cOS.dispatch(
		jobList=dynJobGenerator(start=0, step=1), # Start at 0, increase by one
		collector=resultsCollector(results, stopAtResult=150), 
		remote=True
	)

	print("Results: "+str(results))

	# Finish, need to do this if MPI is used
	cOS.finalize()
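When several workers are busy, jobs that were generated before the stop flag was set still deliver their results. The following toy model illustrates this effect in a single process. It is only a sketch under stated assumptions: toyDispatch(), jobs(), and collect() are hypothetical names, every job simply doubles its value, and the scheduling is a simple queue that does not reproduce what cOS.dispatch() does. How many extra results appear in a real run depends on the actual scheduling.

```python
from collections import deque

stopFlag = False

def jobs(start, step=1):
    # Simplified generator: yields values until the flag is set
    global stopFlag
    stopFlag = False
    ii = start
    while not stopFlag:
        yield ii
        ii += step

def collect(storage, stopAtResult):
    # Simplified collector: stores results and sets the flag
    global stopFlag
    while True:
        index, value, result = yield
        if len(storage) <= index:
            storage.extend([None] * (index + 1 - len(storage)))
        storage[index] = result
        if result >= stopAtResult:
            stopFlag = True

def toyDispatch(jobIter, collector, workers):
    # Toy scheduler (assumption): keeps up to `workers` jobs outstanding
    # and completes them in the order they were generated
    next(collector)  # prime the collector
    pending = deque()
    index = 0
    exhausted = False
    while True:
        # Hand out jobs until all workers are busy or no jobs remain
        while not exhausted and len(pending) < workers:
            try:
                value = next(jobIter)
            except StopIteration:
                exhausted = True
                break
            pending.append((index, value))
            index += 1
        if not pending:
            break
        # Complete the oldest outstanding job and report its result
        i, v = pending.popleft()
        collector.send((i, v, 2 * v))
    collector.close()

serial = []
toyDispatch(jobs(0), collect(serial, stopAtResult=10), workers=1)

parallel = []
toyDispatch(jobs(0), collect(parallel, stopAtResult=10), workers=3)

print(serial)
print(parallel)
```

With one worker the list ends at the first result that reaches the threshold. With three workers it also contains the results of the two jobs that were already outstanding when the flag was set.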