10.3.2. Remote tasks (task outsourcing)
Suppose you want to outsource tasks to other processors in the system (either on the same physical machine or on a remote machine). First, the task must be defined as a function that returns the task result. The function must be defined in a module so that it can be pickled and sent to a remote processor.
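The module requirement follows from how Python's pickle handles functions: it stores only the module and function name, and the receiving process imports the function again under that name. The following is a minimal sketch using only the standard library. The helper can_pickle() is a hypothetical name introduced for illustration, and json.dumps merely stands in for a function defined in an importable module.

```python
import json
import pickle


def can_pickle(obj):
    """Return True if obj survives pickle.dumps(), False otherwise."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError):
        # Functions that cannot be looked up by name (for example lambdas)
        # end up here; the exact exception type depends on the Python version.
        return False


if __name__ == "__main__":
    # A function defined in an importable module is pickled by reference
    print(can_pickle(json.dumps))

    # Unpickling gives back the very same function object
    print(pickle.loads(pickle.dumps(json.dumps)) is json.dumps)

    # A lambda has no importable name, so it cannot be pickled
    print(can_pickle(lambda x: x))
```

A function defined in the main script is pickled under the module name __main__, which is a different module in every process. Keeping task functions in a separate module avoids that problem.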
In this example the printMsgMPI() function is defined in file funclib.py in folder demo/parallel/cooperative/. It prints message msg n times along with the host and task ID assigned by the MPI subsystem.
def printMsgMPI(msg, n):
    # Same as the previous function, except that it prints the host and the task ID before the message
    hostID=MPI.hostID()
    taskID=MPI.taskID()
    for ii in range(n):
        print("h="+str(hostID)+" t="+str(taskID)+": "+msg+" : "+str(ii))
        cOS.Yield()
    return n
The following Python program spawns two concurrent local tasks and two remote tasks. The actual CPU where a remote task will run is assigned by MPI.
File 02-remote.py in folder demo/parallel/cooperative/
# Outsources tasks, run this example as
#  mpirun -n 4 python3 02-remote.py
#
# Under Windows you should use Microsoft MPI. mpiexec and python should be in
# the system path.
#
#   mpiexec /np <number of processes> python 02-remote.py

from pyopus.parallel.cooperative import cOS
from pyopus.parallel.mpi import MPI
from funclib import printMsgMPI

if __name__=='__main__':
    # Set up MPI
    cOS.setVM(MPI())

    # Spawn two tasks (locally)
    tidA=cOS.Spawn(printMsgMPI, kwargs={'msg': 'Hello A', 'n': 10})
    tidB=cOS.Spawn(printMsgMPI, kwargs={'msg': 'Hello B', 'n': 20})

    # Spawn two remote tasks
    tidC=cOS.Spawn(printMsgMPI, kwargs={'msg': 'Hello C', 'n': 15}, remote=True)
    tidD=cOS.Spawn(printMsgMPI, kwargs={'msg': 'Hello D', 'n': 18}, remote=True)

    # IDs of running tasks
    running=set([tidA,tidB,tidC,tidD])

    # Wait for all tasks to finish
    while len(running)>0:
        # Wait for any task
        retval=cOS.Join()
        # Wait for tasks with specified IDs
        # retval=cOS.Join(running)

        # Remove IDs of finished tasks
        for tid in retval.keys():
            print("Task: "+str(tid)+" finished, return value: "+str(retval[tid]))
            running.remove(tid)

    # Cleanup and exit MPI, need to do this if MPI is used
    cOS.finalize()
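The loop above waits for any task, records its return value, and repeats until no task is left running. The same pattern can be tried without an MPI installation using the standard library's concurrent.futures module. This is only an analogy and not part of the PyOPUS API: threads replace MPI processes, and the names count_messages() and run_all() are hypothetical stand-ins introduced for illustration.

```python
from concurrent.futures import ThreadPoolExecutor, wait, FIRST_COMPLETED


def count_messages(msg, n):
    # Stand-in for printMsgMPI(): loops n times and returns n
    for _ in range(n):
        pass
    return n


def run_all(jobs):
    """Submit every (msg, n) job and collect return values as tasks finish."""
    results = {}
    with ThreadPoolExecutor(max_workers=4) as pool:
        # Map each future to its message; the message plays the role of a task ID
        pending = {pool.submit(count_messages, msg, n): msg for msg, n in jobs}
        while pending:
            # Block until at least one task has finished
            done, _ = wait(pending, return_when=FIRST_COMPLETED)
            for future in done:
                # Record the result and forget the finished task
                results[pending.pop(future)] = future.result()
    return results


if __name__ == "__main__":
    jobs = [("Hello A", 10), ("Hello B", 20), ("Hello C", 15), ("Hello D", 18)]
    for msg, value in run_all(jobs).items():
        print("Task: " + msg + " finished, return value: " + str(value))
```

As in the cOS example, the order in which tasks finish is not fixed, so results are stored under the task identifier instead of relying on arrival order.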