10.3.3. Dispatching a set of tasks and collecting the resultsΒΆ

Outsourcing a set of tasks and collecting the results is simple. You don’t have to do any calls to cOS.Spawn() or cOS.Yield(). Instead you can use cOS.dispatch().

A job is evaluated by the jobProcessor() function defined in file funclib.py in folder demo/parallel/cooperative/.

def jobProcessor(value):
	# Process a job (value), return result (multiply value by 2)
	hostID=MPI.hostID()
	taskID=MPI.taskID()
	
	print("Processing "+str(value)+ " on "+ str(hostID)+" "+str(taskID))
	return 2*value

Every job is specified in the form of a tuple where the first entry is the function and the second entry is the list of the positional arguments. The jobs are generated by a generator that is defined with a simple generator expression.

Dispatching is asynchronous. This means that if you have less processors than jobs a processor will receive a new job as soon as the previous one is finished. This is repeated until all jobs are processed.

File 03-dispatch.py in folder demo/parallel/cooperative/

# Dispatches a fixed number of tasks to computational nodes
# Run it by typing
#  mpirun -n 4 python3 03-dispatch.py
# If you run it with 
#  python3 03-dispatch.py
# only the local processor will be used. 
#
# Under Windows you should use Microsoft MPI. mpiexec and python should be in 
# the system path. 
#
#   mpiexec /np <number of processes> python 03-dispatch.py

from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
from funclib import jobProcessor

if __name__=='__main__':
	# Set up MPI
	cOS.setVM(MPI())
	
	# This generator produces 100 jobs which are tuples of the form
	#   (function, args)
	jobGen=((jobProcessor, [value]) for value in range(100))
	
	# Dispatch jobs and collect results
	results=cOS.dispatch(jobList=jobGen, remote=True)

	# Results are put in the list in the ame order as the jobs are generated by jobGen
	print("Results: "+str(results))

	# Finish, need to do this if MPI is used
	cOS.finalize()