10.2.7. Spawning remote tasks and collecting results
This example spawns remote tasks and then waits for the results. This demo requires at least 5 slots in the cluster (one for the spawner and 4 for the spawned tasks).
File 06-spawn-result.py in folder demo/parallel/vm/
# Spawning multiple tasks and collecting results
import sys
from pyopus.parallel.mpi import MPI as VM
from pyopus.parallel.base import MsgTaskExit, MsgTaskResult
import funclib
import os
if __name__=='__main__':
    # Create the virtual machine object; startupDir is set to the current working directory.
    vm=VM(startupDir=os.getcwd(), debug=0)

    # Prepare expressions ("bla*bla" is expected to fail to evaluate).
    exprList=["1+1", "5*5", "bla*bla", "2**7"]

    # Create expression to taskID map, initialize values to None.
    # dict.fromkeys() returns a new dictionary, so its result must be assigned.
    expr2taskID=dict.fromkeys(exprList)

    # Spawn evaluators that send MsgTaskResult messages with return value (sendBack=True).
    taskIDList=[]   # One entry per expression, None where spawning failed
    running=set()   # IDs of spawned tasks that have not reported their exit yet
    taskCount=0     # Number of successfully spawned evaluators
    for expr in exprList:
        print("Spawning evaluator for: "+expr)
        taskIDs=vm.spawnFunction(funclib.pyEvaluator, kwargs={'vm': vm, 'expr': expr}, count=1, sendBack=True)
        if len(taskIDs)>0:
            # Spawn OK. Store exactly one ID per expression so that
            # taskIDList stays aligned with exprList.
            taskIDList.append(taskIDs[0])
            running.update(taskIDs)
            expr2taskID[expr]=taskIDs[0]
            taskCount+=1
            print(" Task ID: %s" % str(taskIDs[0]))
        else:
            taskIDList.append(None)
            print(" Not spawned")
    print()

    # Collect results from successfully spawned workers and wait for them to exit.
    # The loop ends as soon as every result has arrived or no spawned task is left running.
    results={}
    while len(running)>0 and len(results)<taskCount:
        # Receive message, block
        recv=vm.receiveMessage(-1)
        # Process it
        if recv is not None and len(recv)==2:
            (srcID, msg)=recv
            # Process received result
            if isinstance(msg, MsgTaskExit):
                # discard() tolerates an exit message from a task that is not tracked
                running.discard(srcID)
            elif isinstance(msg, MsgTaskResult):
                results[srcID]=msg.returnValue

    # Print results
    for expr, taskID in zip(exprList, taskIDList):
        if taskID is None:
            print("%s=[NOT SPAWNED]" % expr)
        elif taskID not in results:
            # The task exited without delivering a MsgTaskResult message
            print("%s=[NO RESULT]" % expr)
        elif results[taskID] is None:
            print("%s=[EVAL ERROR]" % expr)
        else:
            print("%s=%s" % (expr, str(results[taskID])))

    vm.finalize()
The pyEvaluator() function is defined in the funclib module (file funclib.py in folder demo/parallel/vm/).
The function evaluates the expression that is passed to it as a string and returns the result.
def pyEvaluator(vm=None, expr=None):
    """
    Evaluates the Python expression given by the string *expr* and
    returns the result. Returns ``None`` if the evaluation raises an
    error (syntax error, undefined name, *expr* is not a string, ...).

    *vm* is accepted but not used by this function.

    NOTE: *expr* is passed to :func:`eval`. Pass only trusted expressions.
    """
    try:
        result=eval(expr)
    except Exception:
        # Catch ordinary errors only. SystemExit and KeyboardInterrupt
        # are not evaluation errors and must propagate to the caller.
        result=None
    return result