10.2.7. Spawning remote tasks and collecting results

This example spawns remote tasks and then waits for the results. This demo requires at least 5 slots in the cluster (one for the spawner and 4 for the spawned tasks).

File 06-spawn-result.py in folder demo/parallel/vm/

# Spawning multiple tasks and collecting results

import sys
from pyopus.parallel.mpi import MPI as VM
	
from pyopus.parallel.base import MsgTaskExit, MsgTaskResult

import funclib
import os

if __name__=='__main__':
	# Demo driver: spawn one remote evaluator per expression, gather the
	# MsgTaskResult/MsgTaskExit messages they send back, and print a summary.
	vm=VM(startupDir=os.getcwd(), debug=0)
	
	# Prepare expressions
	exprList=["1+1", "5*5", "bla*bla", "2**7"]
	
	# Create expression to taskID map, initialize values to None.
	# dict.fromkeys() builds and returns a new dictionary, so the result has
	# to be assigned; calling it on an empty dict and discarding the return
	# value leaves the map empty.
	expr2taskID=dict.fromkeys(exprList)
	
	# Spawn evaluators that send MsgTaskResult messages with return value (sendBack=True). 
	taskIDList=[]   # one entry per expression; None marks a failed spawn
	running=[]      # IDs of the tasks that were actually started
	taskCount=0     # number of successfully spawned tasks
	for expr in exprList:
		print("Spawning evaluator for: "+expr)
		taskIDs=vm.spawnFunction(funclib.pyEvaluator, kwargs={'vm': vm, 'expr': expr}, count=1, sendBack=True)
		if taskIDs:
			# Spawn OK
			# Exactly one slot per expression keeps taskIDList aligned with exprList.
			taskIDList.append(taskIDs[0])
			running.extend(taskIDs)
			expr2taskID[expr]=taskIDs[0]
			taskCount+=1
			print("  Task ID: %s" % str(taskIDs[0]))
		else:
			taskIDList.append(None)
			print("  Not spawned")
	
	# Blank separator line (a bare `print` without parentheses prints nothing).
	print()
	
	# Collect results from successfully spawned workers. The loop ends as soon
	# as every result has arrived or every spawned task has exited.
	running=set(running)
	results={}
	while len(running)>0 and len(results)<taskCount:
		# Receive message, block
		recv=vm.receiveMessage(-1)
		# Process it
		if recv is not None and len(recv)==2:
			(srcID, msg)=recv
			# Process received result
			if type(msg) is MsgTaskExit:
				# Diagnostic output: compare every running ID with the sender.
				for idd in list(running):
					print("R", idd, idd==srcID)
				print("src", srcID)
				# discard() tolerates an exit notice from a task that is not
				# (or no longer) in the running set; remove() would raise.
				running.discard(srcID)
			elif type(msg) is MsgTaskResult:
				results[srcID]=msg.returnValue
	
	# Print results
	for expr, taskID in zip(exprList, taskIDList):
		if taskID is None:
			print("%s=[NOT SPAWNED]" % expr)
		elif taskID not in results:
			# The task exited without delivering a result message.
			print("%s=[NO RESULT]" % expr)
		else:
			result=results[taskID]
			if result is None:
				print("%s=[EVAL ERROR]" % expr)
			else:
				print("%s=%s" % (expr, str(result)))
	
	vm.finalize()
	

The pyEvaluator() function is defined in the funclib module (file funclib.py in folder demo/parallel/vm/). The function evaluates the expression that is passed to it as a string and returns the result.

def pyEvaluator(vm=None, expr=None):
	"""Evaluate the Python expression given as a string and return its value.

	vm   -- accepted for the caller's convenience; not used by this function.
	expr -- string holding the expression to evaluate.

	Returns the value of the expression, or None if evaluating it raises
	an exception (syntax error, undefined name, wrong argument type, ...).
	"""
	# NOTE(review): eval() executes arbitrary code. This is fine for a demo
	# with hard-coded expressions, but must not be fed untrusted input.
	try:
		result=eval(expr)
	except Exception:
		# Catch ordinary evaluation errors only; KeyboardInterrupt and
		# SystemExit are not subclasses of Exception and still propagate.
		result=None
	
	return result