3.1. pyopus.parallel.base: Base classes for virtual machines
Base classes for virtual machines (PyOPUS subsystem name: VM)
A spawner task is a task that can spawn new tasks on hosts in the virtual machine. All other tasks are worker tasks.
Mirroring and local storage
Tasks in the virtual machine often require additional storage (like a hard disk) for the parallel algorithm’s input files, which are identical for all hosts. These files must therefore be distributed to all hosts in a virtual machine before the computation can begin.
One way to make all hosts see the same directory structure is to use a network filesystem like NFS or SMBFS. In this approach the storage physically resides on one computer in the network (the file server). This storage is exported and mounted by all hosts in the virtual machine.
Suppose, for instance, that the folder /shareme/foo on the file server is exported. Hosts in the virtual machine mount this folder under /home. This way all hosts see the /shareme/foo folder located on the physical storage of the server as their own folder /home.
This approach is very simple and ensures that all hosts see the same input files in the same place. Unfortunately, as the number of hosts and the number of read operations on the shared folder grow, the network quickly becomes saturated and the computational performance of the virtual machine decreases dramatically because tasks must wait on slow data read operations.
A more scalable solution is to use the local storage of every host to store the algorithm’s input files. This has the downside that these files must be distributed to all hosts before the computation begins.
Parallel algorithms also often require storage for various intermediate files. If this storage is a shared folder, an additional problem occurs when multiple hosts try to write files with the same name but different content to the same physical location.
A solution to the intermediate file problem is to use every host’s physically local storage (e.g. its local hard disk) for storing the intermediate files.
The solution to these problems in PyOPUS is to use mirroring. Mirroring is configured through two environmental variables. PARALLEL_LOCAL_STORAGE specifies the path to the folder where the folders for the local storage of input and intermediate files will be created. If this environmental variable is not specified a reasonable default is used (specified by the tempfile module).
The PARALLEL_MIRRORED_STORAGE environmental variable specifies a colon (:) separated list of paths to the directories that are mounted from a common file server. The first such path in PARALLEL_MIRRORED_STORAGE on all hosts corresponds to the same directory on the file server. The same goes for the second, the third, etc. paths listed in PARALLEL_MIRRORED_STORAGE. Under Windows the list separator is a semicolon (;).
The PARALLEL_MIRRORED_STORAGE environmental variable must be set on all hosts in a virtual machine if mirroring is used. It is usually set to /localhome/USERNAME which must physically reside on the local machine. PARALLEL_MIRRORED_STORAGE should be writeable by the user that is running the spawned processes.
If PARALLEL_MIRRORED_STORAGE is not set the default under Linux is to use the user’s home folder. Under Windows the current folder is used by default.
Both PARALLEL_LOCAL_STORAGE and PARALLEL_MIRRORED_STORAGE can use UNIX style user home directory expansion (e.g. ~user expands to /home/user).
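As a minimal sketch of how these two variables could be read, the following splits PARALLEL_MIRRORED_STORAGE on the platform's list separator and expands ~ entries. The helper names mirrored_storage_paths and local_storage_root are hypothetical and not part of PyOPUS; the fallbacks simply follow the defaults described above.

```python
import os
import tempfile


def mirrored_storage_paths(environ=None):
    """Return the list of mirrored storage directories (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    raw = environ.get("PARALLEL_MIRRORED_STORAGE")
    if not raw:
        # Documented default: home folder under Linux, current folder under Windows.
        return [os.getcwd() if os.name == "nt" else os.path.expanduser("~")]
    # os.pathsep is ':' on POSIX systems and ';' on Windows.
    return [os.path.expanduser(p) for p in raw.split(os.pathsep) if p]


def local_storage_root(environ=None):
    """Return the root folder for local storage (hypothetical helper)."""
    environ = os.environ if environ is None else environ
    raw = environ.get("PARALLEL_LOCAL_STORAGE")
    # Documented default: a location chosen by the tempfile module.
    return os.path.expanduser(raw) if raw else tempfile.gettempdir()
```

Passing a plain dictionary instead of os.environ makes it easy to try out a configuration without changing the process environment.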
Path translation in mirroring operations
Suppose the PARALLEL_MIRRORED_STORAGE environmental variable is set to /foo:/bar on host1 and /d1:/d2 on host2. This means that the following directories are equivalent (mounted from the same exported folder on a file server):
host1    host2
/foo     /d1
/bar     /d2
So /foo on host1 represents the same physical storage as /d1 on host2. Similarly /bar on host1 represents the same physical storage as /d2 on host2. Usually only /home is common to all hosts in a virtual machine so PARALLEL_MIRRORED_STORAGE is set to /home on all hosts.
Path translation converts a path on host1 into a path to the physically same object (mounted from the same exported directory on the file server) on host2. The following is an example of path translation.
Path on host1    Path on host2
/foo/a           /d1/a
/bar/b           /d2/b
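The translation in the example above can be sketched in two steps: a path is first reduced to an (index, suffix) pair relative to the sender's mirrored directories, and the pair is then expanded against the receiver's list. The functions to_abstract and to_actual below are hypothetical stand-ins written for illustration, not the PyOPUS implementation. The sketch assumes POSIX style paths and strips the leading slash from the suffix.

```python
import posixpath


def to_abstract(path, mirrored):
    """Map a path to (index, suffix) relative to an entry of `mirrored`."""
    for index, root in enumerate(mirrored):
        root = root.rstrip("/")
        if path == root or path.startswith(root + "/"):
            return index, path[len(root):].lstrip("/")
    raise ValueError(f"{path} is not under any mirrored directory")


def to_actual(index, suffix, mirrored):
    """Map (index, suffix) back to a concrete path on a host."""
    return posixpath.join(mirrored[index], suffix) if suffix else mirrored[index]


host1 = ["/foo", "/bar"]  # PARALLEL_MIRRORED_STORAGE entries on host1
host2 = ["/d1", "/d2"]    # PARALLEL_MIRRORED_STORAGE entries on host2

index, suffix = to_abstract("/bar/b", host1)
print(to_actual(index, suffix, host2))  # /d2/b
```

Only the (index, suffix) pair has to travel between hosts, so neither host needs to know the other's mount points.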
- class pyopus.parallel.base.HostID[source]
Basic host identifier class that is used for identifying a host in a virtual machine.
HostID objects must support comparison (__eq__() and __ne__()), hashing (__hash__()), and conversion to a string (__str__()).
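A minimal sketch of an identifier that meets these requirements is given below. NamedHostID is a hypothetical class keyed by a host name. To keep the example self-contained it does not derive from pyopus.parallel.base.HostID, which a real implementation would presumably do.

```python
class NamedHostID:
    """Hypothetical host identifier keyed by a host name (not a PyOPUS class)."""

    def __init__(self, name):
        self.name = name

    def __eq__(self, other):
        return isinstance(other, NamedHostID) and self.name == other.name

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Equal identifiers must hash equally so they work as dict keys and set members.
        return hash(self.name)

    def __str__(self):
        return self.name


free_slots = {NamedHostID("node1"): 4}
print(free_slots[NamedHostID("node1")])  # lookup succeeds with an equal, distinct object
```

The same four operations are required of TaskID objects, so the pattern carries over unchanged.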
- class pyopus.parallel.base.Msg[source]
Base class for a message used in task-to-task communication.
- class pyopus.parallel.base.MsgHostAdd(hostIDs)[source]
This is a message that signals that new hosts were added to the virtual machine.
The list of HostID objects corresponding to the added hosts is stored in the hostIDs member.
- class pyopus.parallel.base.MsgHostDelete(hostID)[source]
This is a message that signals that a host has exited from the virtual machine.
The HostID object corresponding to the host is stored in the hostID member.
- class pyopus.parallel.base.MsgTaskExit(taskID)[source]
This is a message that signals that a task has exited.
The TaskID object corresponding to the task is stored in the taskID member.
- class pyopus.parallel.base.MsgTaskResult(success, returnValue)[source]
This is a message that is sent to the process that spawned a Python function in a virtual machine. The message holds a boolean flag that tells whether the function ran successfully and the return value of the function.
The boolean flag and the return value can be found in the success and returnValue members.
- class pyopus.parallel.base.TaskID[source]
Basic task identifier class that is used for identifying a task in a virtual machine.
TaskID objects must support comparison (__eq__() and __ne__()), hashing (__hash__()), and conversion to a string (__str__()).
- class pyopus.parallel.base.VirtualMachine(debug=0, startupDir=None, translateStartupDir=True, mirrorMap=None, persistentStorage=False)[source]
The base class for accessing hosts working in parallel.
debug specifies the debug level. If it is greater than 0 debug messages are printed on the standard output.
startupDir specifies the working directory where the spawned functions will start. If set to None, the underlying virtual machine default is used.
If translateStartupDir is True path translation is applied to startupDir just as it is to mirrorMap. If mirroring is defined (mirrorMap is given) then no translation is performed on startupDir. In this case startupDir is treated as a path relative to the local storage.
mirrorMap is a dictionary specifying filesystem objects (files and directories) on the spawner which are to be mirrored (copied) to local storage on the host where the task is spawned. The keys represent paths on the spawner while the values represent the corresponding paths (relative to the local storage directory) on the host where the task will be spawned. Keys can use UNIX style globbing (anything the glob.glob() function can handle is OK). If mirrorMap is set to None, no mirroring is performed.
For the mirroring to work the filesystem objects on the spawner must be in the folders specified in the PARALLEL_MIRRORED_STORAGE environmental variable. This is because mirroring is performed by local copy operations which require the source to be on a mounted network filesystem.
If persistentStorage is enabled the local storage folders are not deleted after a spawned task is completed. Furthermore, if a local storage folder already exists for the given PID/VM index combination no mirroring is performed and that folder is used as local storage.
Persistent local storage makes spawning a task much faster as mirroring takes place only when the first task is spawned under some PID/VM index.
If you enable persistentStorage, make sure you clear local storage at system startup so that old local storage folders from previous runs are not mistakenly used in subsequent runs. PID wraparound can also be a problem, albeit this is very unlikely to occur.
To find out more about setting the working directory and mirroring see the prepareEnvironment() method.
- cleanupEnvironment(taskStorage, forceCleanup=False)[source]
Removes the working environment prepared for a remote task.
taskStorage is the path to the local storage directory. This directory is removed.
If taskStorage is None nothing is removed.
- clearLocalStorage(timeout=-1.0)[source]
This function spawns the localStorageCleaner() function on all slots in the virtual machine and on the master. The spawned instances remove the local storage that was created for the slot.
timeout is applied where needed. Negative values stand for infinite timeout.
This function should never be called if there are tasks running in the virtual machine because it will remove their local storage.
This function should be called only by the spawner.
- createLocalStorage()[source]
Creates a local storage directory subtree given by subpath under PARALLEL_LOCAL_STORAGE/pyopus-<username>.
If a local storage directory with the same name already exists it is suffixed by an underscore and a hexadecimal number.
Returns the path to the created local storage directory and a flag that is True if the storage folder was created.
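The naming rule for clashing directories can be illustrated with a short sketch. The function create_local_storage below is hypothetical. It assumes the hexadecimal suffix is a simple counter starting at 1, which the description above does not specify, and it returns only the path rather than the documented (path, flag) pair.

```python
import os
import tempfile


def create_local_storage(root, subpath):
    """Create root/subpath; on a name clash append '_<hex counter>' (assumed scheme)."""
    base = os.path.join(root, subpath)
    candidate, counter = base, 0
    while True:
        try:
            # makedirs raises FileExistsError when the leaf directory already exists.
            os.makedirs(candidate)
            return candidate
        except FileExistsError:
            counter += 1
            candidate = f"{base}_{counter:x}"


root = tempfile.mkdtemp()                      # stands in for PARALLEL_LOCAL_STORAGE/pyopus-<username>
first = create_local_storage(root, "1a2b_vm0")
second = create_local_storage(root, "1a2b_vm0")
print(os.path.basename(first), os.path.basename(second))
```

Relying on the exception instead of checking for existence first avoids a race when several tasks on one host create their storage at the same time.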
- static formatSpawnerConfig()[source]
Formats the configuration information gathered by a spawner task as a string. Works only if called by a spawner task.
- static hostID(task=None)[source]
Returns the HostID object corresponding to task.
If task is not specified returns the HostID of the host on which the caller task runs.
- static hosts()[source]
Returns the list of HostID objects representing the hosts in the virtual machine. Works only for hosts that are spawners.
- static hostsWithFreeSlots()[source]
Returns a set of HostID objects corresponding to hosts with at least one free slot.
- markPersistentObject(obj)[source]
Marks an object as persistent. Returns its ID which is valid across the whole virtual machine. Adding the same object again has no effect unless removePersistentObject() is called first.
- static parentTaskID()[source]
Returns the TaskID object corresponding to the task that spawned the caller task.
- prepareEnvironment()[source]
Prepares the working environment (working directory and local storage) for a spawned function. This method is called by a spawned task. The mirroring information is received from the spawner (spawner’s mirrorList) at function spawn time, because the spawned task’s virtual machine object is the spawner’s virtual machine object sent to the spawned task.
If mirroring is not configured with the setSpawn() method, the working directory is the one specified as startupDir with an appropriate path translation applied to it. If it is None the working directory is determined by the underlying virtual machine library (e.g. MPI).
If mirroring is configured with setSpawn() (mirrorList is not None), a local storage directory is created by calling createLocalStorage() with subpath set to <PID>_vm<index>, where PID is the PID of the calling process in hexadecimal notation and index is the spawner VM object index.
Next, mirroring is performed by traversing all members of the processed mirrorList received from the spawner, which is a list of tuples of the form (index, suffix, target). Here index and suffix specify the source filesystem object to mirror (see the translateToAbstractPath() function for the explanation of index and suffix) while target is the destination where the object will be copied.
The source can be specified with globbing characters (anything the glob.glob() function can handle is OK).
target is the path relative to the local storage directory where the source will be copied. Renaming of the source is not possible. target always specifies the destination directory.
If source is a directory, symbolic links within it are copied as symbolic links which means that they should be relative and point to mirrored filesystem objects in order to remain valid after mirroring.
If startupDir was given and it is not None the working directory is set to the path given by startupDir, which is relative to the local storage directory.
If startupDir is None the working directory is set to the local storage directory.
Returns the path to the local storage directory.
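The copy step described above can be sketched as follows. This is a simplified illustration, not the PyOPUS code: the hypothetical function mirror works directly on a dictionary of source patterns and target directories, skipping the (index, suffix) path translation, and it assumes a directory source is copied under its own name inside the target directory.

```python
import glob
import os
import shutil
import tempfile


def mirror(mirror_map, local_storage):
    """Copy every object matched by the keys of mirror_map into local_storage.

    Keys are source paths (glob patterns allowed); values are destination
    directories relative to local_storage.
    """
    for pattern, target in mirror_map.items():
        dest_dir = os.path.join(local_storage, target)
        os.makedirs(dest_dir, exist_ok=True)
        for source in glob.glob(pattern):
            # normpath drops a trailing separator so basename is never empty.
            name = os.path.basename(os.path.normpath(source))
            dest = os.path.join(dest_dir, name)
            if os.path.isdir(source):
                # symlinks=True copies links as links instead of following them.
                shutil.copytree(source, dest, symlinks=True)
            else:
                shutil.copy2(source, dest)


# Demonstration with temporary folders standing in for mirrored and local storage.
src = tempfile.mkdtemp()
dst = tempfile.mkdtemp()
for name in ("a.txt", "b.txt", "skip.dat"):
    with open(os.path.join(src, name), "w") as f:
        f.write(name)

mirror({os.path.join(src, "*.txt"): "inputs"}, dst)
print(sorted(os.listdir(os.path.join(dst, "inputs"))))
```

Because the target always names a directory, every matched object keeps its original name, which matches the statement that renaming the source is not possible.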
- receiveMessage(timeout=-1.0)[source]
Receives a message (a Python object) and returns a tuple (senderTaskId, message).
The sender of the message can be identified through the senderTaskId object of class TaskID.
If timeout is negative the function waits (blocks) until some message arrives. If timeout is positive and that many seconds pass without receiving a message, an empty tuple is returned. Zero timeout performs a nonblocking receive which returns an empty tuple if no message is received.
In case of an error the return value is None.
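Since the return value has three distinct shapes, callers have to test for None before unpacking. The hypothetical helper below (not part of PyOPUS) sketches one way to tell the cases apart; it operates on plain values so it can be tried without a running virtual machine.

```python
def describe_receive_result(result):
    """Classify the value returned by receiveMessage() (hypothetical helper)."""
    if result is None:
        return "error"
    if len(result) == 0:
        # An empty tuple means that nothing arrived before the timeout expired.
        return "timeout"
    sender, message = result
    return f"message from {sender}: {message!r}"


for value in (None, (), ("task-3", {"x": 1})):
    print(describe_receive_result(value))
```

The None check must come first because calling len() on None would raise a TypeError.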
- sendMessage(destination, message)[source]
Sends message (a Python object) to a task with TaskID destination. Returns True on success.
- static slots()[source]
Returns the number of slots for tasks in a virtual machine. Every processor represents a slot for one task.
- spawnFunction(function, args=(), kwargs={}, count=-1, targetList=None, sendBack=True)[source]
Spawns count instances of a Python function on remote hosts and passes args and kwargs to the function. Spawning a function actually means starting a Python interpreter, importing the function, and calling it with args and kwargs.
If count is None the number of tasks is selected in such a way that all available slots are filled.
function, args, and kwargs must be pickleable.
targetList specifies a list of hosts on which the function instances will be spawned. If it is None all hosts in the virtual machine are candidates for the spawned instances of the function.
If sendBack is True the spawned tasks return the status and the return value of the function back to the spawner after the function exits. The return value must be pickleable.
Returns a list of TaskID objects representing the spawned tasks.
Works only if called by a spawner task.
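The pickleability requirement can be checked before spawning. The helper is_pickleable below is a hypothetical pre-flight check built on the standard pickle module. It is not part of PyOPUS, and the library may serialize objects differently.

```python
import pickle


def is_pickleable(obj):
    """Return True if obj survives pickle.dumps (hypothetical pre-flight check)."""
    try:
        pickle.dumps(obj)
        return True
    except (pickle.PicklingError, AttributeError, TypeError):
        return False


print(is_pickleable(len))               # built-in functions are pickled by reference
print(is_pickleable((1, 2.0, "abc")))   # plain data
print(is_pickleable(lambda x: x + 1))   # lambdas are rejected by the standard pickle module
```

Functions are pickled by their qualified name, so a function meant for spawning should be defined at module level in a module that the remote interpreter can import.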
- spawnerCall(function, srgs=(), kwargs={}, asynchr=False)[source]
Calls function with args and kwargs in the spawner task. If asynchr is False waits for the call to finish and returns the function’s return value.
If called in a worker task delegates the call to the spawner.
If called in a spawner task, calls function locally.
This function is useful for sending data to a central point (spawner) where it is processed (e.g. for database writes, logging, etc.).
- unmarkPersistentObject(obj)[source]
Unmarks an object as persistent. Raises an exception if the object is not persistent.
- vm_counter = 0
VM object counter used for assigning numbers to VM objects. These numbers are used for labelling local storage folders.
- pyopus.parallel.base.getNumberOfCores()[source]
Returns the number of available CPU cores.
Works for Linux, Unix, MacOS, and Windows.
Uses code from Parallel Python (http://www.parallelpython.com).
- pyopus.parallel.base.translateToAbstractPath(path)[source]
Translates a path on the local machine to a tuple (index, suffix) where index denotes the index of the path entry in the PARALLEL_MIRRORED_STORAGE environmental variable and suffix is the path relative to that entry.
Note that suffix is a relative path even if it starts with /.
- pyopus.parallel.base.translateToActualPath(index, relPath)[source]
Translates an index and relPath to an actual path on the local machine.
This is the inverse of the translateToAbstractPath() function.