10.2.3. Spawning a task on a (remote) hostΒΆ

File 02-spawn.py in folder demo/parallel/vm/

# Spawns a remote task, handles messages (collects results, detects exit)

import sys
from pyopus.parallel.mpi import MPI as VM
# By default stdout is forwarded to the mpirun/mpiexec terminal

import funclib 
from pyopus.parallel.base import MsgTaskExit, MsgTaskResult
import os, time

if __name__=='__main__':
	# Startup dir must be the same as the one where funclib is located 
	# so we can import it (funclib is not in PYTHONPATH). 
	# MPI guarantees this by default, while PVM does not. 
	vm=VM(startupDir=os.getcwd(), debug=2)
	
	# Get host list. 
	hostIDs=vm.hosts()
	initialFreeSlots=vm.freeSlots()
	print("Hosts: ")
	for hostID in hostIDs:
		print("  "+str(hostID))
	print("Free slots: "+str(initialFreeSlots))
	
	# Spawn 2 tasks anywhere, send vm as argument with name 'vm'.  
	# The spawned function must be defined in an importable module outside main .py file. 
	print("\nSpawning 2 tasks, anywhere.")
	taskIDs=vm.spawnFunction(funclib.hello, kwargs={'vm': vm}, count=2)
	print("Spawned: ")
	for task in taskIDs:
		print("  ", str(task))
	print("Free slots: "+str(vm.freeSlots())+"\n")
	
	print("----\n"+vm.formatSpawnerConfig()+"----")
	
	# Blocking receive 4 messages (2 return values and 2 exit)
	while vm.freeSlots()!=initialFreeSlots:
		received=vm.receiveMessage()
		
		# Handle error (None) and timeout (empty tuple)
		if received is None or len(received)==0:
			continue
		
		# Unpack
		(fromId, msg)=received
		
		# Note that the received message may be comming from a dead worker. 
		# Verify that the message is comming from one of our workers. 
		if fromId not in taskIDs:
			continue
		
		# Handle message
		if type(msg) is MsgTaskResult:
			print("Received from "+str(fromId)+" TaskResult success="+str(msg.success)+"\n  "+str(msg.returnValue))
		elif type(msg) is MsgTaskExit:
			print("Received from "+str(fromId)+" TaskExit")
		
		print("Free slots: "+str(vm.freeSlots())+"\n")
	
	print("----\n"+vm.formatSpawnerConfig()+"----")
	
	vm.finalize()

The hello() function is defined in the funclib module (file funclib.py in folder demo/parallel/vm/) as

def hello(vm):
	# Print some output
	print("Worker "+str(vm.taskID())+" at "+str(vm.hostID())+".")
	sys.stdout.flush()
	
	# Return a message with my task ID and host
	return "Hello, I am worker "+str(vm.taskID())+" on "+str(vm.hostID())+" in "+str(os.getcwd())+"."