10.2.4. Measuring the message delay and throughput
File 03-messaging.py in folder demo/parallel/vm/
# Measures the message delay and average transfer speed for messages of various sizes
import sys
from pyopus.parallel.mpi import MPI as VM
from pyopus.parallel.base import MsgTaskExit, MsgTaskResult
import funclib
import os, time, sys
import numpy as np
if __name__=='__main__':
# Set work direcotry on worker to be the same as on the spawner.
vm=VM(startupDir=os.getcwd(), debug=1)
# Get hosts, find a non-local host
myHostID=vm.hostID()
# Find a remote host
for hostID in vm.hosts():
if hostID!=myHostID:
break
# See if we have at least one remote host.
if hostID==myHostID:
print("\nWarning. Measuring local communication speed.")
# Prepare data sizes
dataSizes=[0, 1, 10, 100, 1000, 10000, 100000, 1000000, 10000000]
# Spawn bounceBack()
taskIDs=vm.spawnFunction(funclib.bounceBack, kwargs={'vm': vm}, targetList=[hostID], count=1)
# Check if it succeeded
if len(taskIDs)<1:
print("Failed to spawn bounceBack().")
exit(-1)
taskID=taskIDs[0]
print("Task layout:")
print(vm.formatSpawnerConfig())
print("Measuring message delivery time and data throughput to "+str(hostID)+".")
print("Bounce back task: "+str(taskID))
# Go through data sizes
total_time=0
for dataSize in dataSizes:
# Create data
data=np.random.randint(0, 256, size=dataSize).astype(np.uint8)
# How many times do we need to cycle send/receive for runtime=1s?
if total_time>0 and oldDataSize>0:
# Calculate new repeats for 1 secons run
repeats=int(repeats/total_time*2.0*oldDataSize/dataSize)
if repeats==0:
repeats=1
else:
# Initial repeats
repeats=1000
# Warm up
for count in range(repeats):
vm.sendMessage(taskID, data)
dummy=vm.receiveMessage()
# Time
mark=time.time()
for count in range(repeats):
vm.sendMessage(taskID, data)
dummy=vm.receiveMessage()
total_time=time.time()-mark
# Evaluate
dt=total_time/2.0/repeats
oldDataSize=dataSize
tp=dataSize*8/dt
# Print result
print("Data size %9.3fkB, iterations=%6d, time=%7.0fus, speed=%5.3fMb/s" % (
dataSize/1000.0, repeats, dt*1e6, tp/1e6
))
sys.stdout.flush()
# Send None (will make bounceBack() exit.
vm.sendMessage(taskID, None)
# Wait for MsgTaskExit message
while True:
(src, msg)=vm.receiveMessage()
if type(msg) is MsgTaskExit:
break
vm.finalize()
The bounceBack()
function is defined in the funclib
module (file funclib.py
in folder demo/parallel/vm/).
This function bounces back every received message to its source.
def bounceBack(vm=None):
# Enter a loop, receive messages and send them back.
# If a None is received, exit.
# Loop
while True:
recv=vm.receiveMessage()
# Not an error and not timeout
if recv is not None and len(recv)==2:
# Get source and message
(sourceID, msg)=recv
# Check if we must exit
if msg is None:
break
# Send back
vm.sendMessage(sourceID, msg)
The output for communication between processes on the same host
Warning. Measuring local communication speed.
Task layout:
localhost ncpu=8 free slots: []
slot= 0 task= 0
slot= 1 task= 1
Measuring message delivery time and data throughput to localhost.
Bounce back task: 1:1
localhost_2538_0 VM: Changing working directory to '/mnt/data/Data/pytest/demo/parallel/vm'.
Data size 0.000kB, iterations= 10000, time= 37us, speed=0.000Mb/s
Data size 0.001kB, iterations= 10000, time= 37us, speed=0.216Mb/s
Data size 0.010kB, iterations= 2701, time= 38us, speed=2.118Mb/s
Data size 0.100kB, iterations= 2647, time= 38us, speed=21.308Mb/s
Data size 1.000kB, iterations= 2663, time= 40us, speed=199.249Mb/s
Data size 10.000kB, iterations= 2490, time= 48us, speed=1661.765Mb/s
Data size 100.000kB, iterations= 2077, time= 100us, speed=8023.758Mb/s
Data size 1000.000kB, iterations= 1002, time= 755us, speed=10601.659Mb/s
Data size 10000.000kB, iterations= 132, time= 12370us, speed=6467.453Mb/s
calypso_2537_0 MPI: Task 1:1 exit detected.