3.1. pyopus.parallel.base — Base classes for virtual machines

Inheritance diagram of pyopus.parallel.base

Base classes for virtual machines (PyOPUS subsystem name: VM)

A spawner task is a task that can spawn new tasks on hosts in the virtual machine. All other tasks are worker tasks.

Mirroring and local storage

Tasks in the virtual machine often require additional storage (like a hard disk) for the parallel algorithm’s input files, which are identical for all hosts. These files must therefore be distributed to all hosts in a virtual machine before the computation can begin.

One way to make all hosts in a virtual machine see the same directory structure is to use network filesystems like NFS or SMBFS. In this approach the storage physically resides on one computer in the network (the file server). This storage is exported and mounted by all hosts in the virtual machine.

Suppose, for instance, that the folder /shareme/foo on the file server is exported and that the hosts in the virtual machine mount it under /home. All hosts then see the /shareme/foo folder, located on the physical storage of the server, as their own folder /home.

This approach is very simple and ensures that all hosts see the same input files in the same place. Unfortunately, as the number of hosts and the number of read operations on the shared folder grow, the network quickly becomes saturated and the computational performance of the virtual machine decreases dramatically because tasks must wait on slow read operations.

A more scalable solution is to use the local storage of every host to store the algorithm’s input files. This has the downside that these files must be distributed to all hosts before the computation begins.

Parallel algorithms also often require some storage for various intermediate files. If this storage is a shared folder, an additional problem occurs when multiple hosts try to write files with the same name but different content to the same physical location.

A solution to the intermediate file problem is to use every host’s physically local storage (e.g. its local hard disk) for storing the intermediate files.

The solution to these problems in PyOPUS is to use mirroring. Mirroring is configured through two environmental variables. PARALLEL_LOCAL_STORAGE specifies the path to the folder where the folders for the local storage of input and intermediate files will be created. If this environmental variable is not specified, a reasonable default is used (specified by the tempfile module).

The PARALLEL_MIRRORED_STORAGE environmental variable specifies a colon (:) separated list of paths to the directories that are mounted from a common file server. The first such path in PARALLEL_MIRRORED_STORAGE on all hosts corresponds to the same directory on the file server. The same goes for the second, the third, etc. paths listed in PARALLEL_MIRRORED_STORAGE. Under Windows the list separator is semicolon (;).

If mirroring is used, the PARALLEL_LOCAL_STORAGE environmental variable should be set on all hosts in the virtual machine. It is usually set to /localhome/USERNAME, which must physically reside on the local machine. The folder should be writeable by the user that is running the spawned processes.

If PARALLEL_MIRRORED_STORAGE is not set the default under Linux is to use the user’s home folder. Under Windows the current folder is used by default.

Both PARALLEL_LOCAL_STORAGE and PARALLEL_MIRRORED_STORAGE can use UNIX style user home directory expansion (e.g. ~user expands to /home/user).
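The conventions above can be illustrated with a short sketch. It is not the PyOPUS implementation: the helper names local_storage_root() and mirrored_storage_roots() are hypothetical, the environment is passed in as a dictionary so that the example does not depend on the machine it runs on, and the fallbacks are assumptions based on the description above (tempfile.gettempdir() for local storage, the user's home folder for mirrored storage under Linux).

```python
import os
import tempfile


def local_storage_root(env):
    """Return the local storage root, falling back to the tempfile default."""
    value = env.get("PARALLEL_LOCAL_STORAGE")
    if not value:
        # Assumption: the "reasonable default" is tempfile.gettempdir().
        return tempfile.gettempdir()
    return os.path.expanduser(value)


def mirrored_storage_roots(env, separator=os.pathsep):
    """Split PARALLEL_MIRRORED_STORAGE into a list of expanded paths."""
    value = env.get("PARALLEL_MIRRORED_STORAGE")
    if not value:
        # Assumption: the Linux default (the user's home folder) is used.
        return [os.path.expanduser("~")]
    # os.pathsep is ':' on UNIX-like systems and ';' on Windows.
    return [os.path.expanduser(p) for p in value.split(separator) if p]


roots = mirrored_storage_roots(
    {"PARALLEL_MIRRORED_STORAGE": "/foo:/bar"}, separator=":"
)
print(roots)
```

The order of the entries matters, because the position of a path in the list is what identifies the same exported directory on every host.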

Path translation in mirroring operations

Suppose the PARALLEL_MIRRORED_STORAGE environmental variable is set to /foo:/bar on host1 and /d1:/d2 on host2. This means that the following directories are equivalent (mounted from the same exported folder on a file server)

host1    host2
/foo     /d1
/bar     /d2

So /foo on host1 represents the same physical storage as /d1 on host2. Similarly /bar on host1 represents the same physical storage as /d2 on host2. Usually only /home is common to all hosts in a virtual machine so PARALLEL_MIRRORED_STORAGE is set to /home on all hosts.

Path translation converts a path on host1 into a path to the physically same object (mounted from the same exported directory on the file server) on host2. The following is an example of path translation.

Path on host1    Path on host2
/foo/a           /d1/a
/bar/b           /d2/b
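The translation in the table can be reproduced with a few lines of Python. This is only a sketch under the assumption that translation is a prefix replacement between entries at the same position in the two lists; the function name translate_between_hosts() is hypothetical and is not part of PyOPUS.

```python
import posixpath


def translate_between_hosts(path, source_roots, target_roots):
    """Map a path under one of source_roots to the matching target root."""
    path = posixpath.normpath(path)
    for src, dst in zip(source_roots, target_roots):
        src = posixpath.normpath(src)
        # Match whole path components only, so /foo does not match /foobar.
        if path == src or path.startswith(src.rstrip("/") + "/"):
            relative = posixpath.relpath(path, src)
            return posixpath.normpath(posixpath.join(dst, relative))
    raise ValueError("%s is not inside mirrored storage" % path)


host1_roots = ["/foo", "/bar"]  # PARALLEL_MIRRORED_STORAGE on host1
host2_roots = ["/d1", "/d2"]    # PARALLEL_MIRRORED_STORAGE on host2

print(translate_between_hosts("/foo/a", host1_roots, host2_roots))  # /d1/a
print(translate_between_hosts("/bar/b", host1_roots, host2_roots))  # /d2/b
```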

class pyopus.parallel.base.HostID[source]

Basic host identifier class that is used for identifying a host in a virtual machine. HostID objects must support comparison (__eq__() and __ne__()), hashing (__hash__()), and conversion to a string (__str__()).

static bad()[source]

A static member function. Called with HostID.bad(). Returns an invalid host ID.

valid()[source]

Returns True if this HostID object is valid.
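A minimal sketch of an identifier that satisfies these requirements is given below. NameHostID is a hypothetical stand-alone class written for illustration; it does not derive from the real HostID and the choice of a host name as the underlying key is an assumption.

```python
class NameHostID:
    """Hypothetical host identifier keyed by host name (not a PyOPUS class)."""

    def __init__(self, name=None):
        self.name = name  # None marks an invalid identifier

    @staticmethod
    def bad():
        """Return an invalid identifier."""
        return NameHostID(None)

    def valid(self):
        return self.name is not None

    def __eq__(self, other):
        return isinstance(other, NameHostID) and self.name == other.name

    def __ne__(self, other):
        return not self.__eq__(other)

    def __hash__(self):
        # Equal identifiers must hash equally to work as dictionary keys.
        return hash(self.name)

    def __str__(self):
        if self.valid():
            return "host:%s" % self.name
        return "host:INVALID"


free_slots = {NameHostID("node1"): 4}
print(free_slots[NameHostID("node1")])  # 4
```

Comparison and hashing together are what allow identifiers to be used as dictionary keys and set members, for example when keeping track of free slots per host.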

class pyopus.parallel.base.Msg[source]

Base class for a message used in task-to-task communication.

class pyopus.parallel.base.MsgHostAdd(hostIDs)[source]

This message signals that new hosts were added to the virtual machine.

The list of HostID objects corresponding to the added hosts is stored in the hostIDs member.

class pyopus.parallel.base.MsgHostDelete(hostID)[source]

This message signals that a host has exited from the virtual machine.

The HostID object corresponding to the host is stored in the hostID member.

class pyopus.parallel.base.MsgTaskExit(taskID)[source]

This message signals that a task has exited.

The TaskID object corresponding to the task is stored in the taskID member.

class pyopus.parallel.base.MsgTaskResult(success, returnValue)[source]

This message is sent to the process that spawned a Python function in a virtual machine. It holds a boolean flag that tells whether the function ran successfully, and the return value of the function.

The boolean flag and the return value can be found in success and returnValue members.
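The sketch below shows one way a spawner might dispatch on the message classes described above. To keep the example runnable without PyOPUS, it defines local stand-ins that reuse the documented class names and members; the handle() function and the way results are collected are assumptions made for illustration.

```python
class Msg:
    """Local stand-in for the message base class."""


class MsgTaskExit(Msg):
    def __init__(self, taskID):
        self.taskID = taskID


class MsgTaskResult(Msg):
    def __init__(self, success, returnValue):
        self.success = success
        self.returnValue = returnValue


def handle(message, results, finished):
    """Record results and exits; return True if the message was recognised."""
    if isinstance(message, MsgTaskResult):
        # Keep the return value only if the function ran successfully.
        results.append(message.returnValue if message.success else None)
        return True
    if isinstance(message, MsgTaskExit):
        finished.add(message.taskID)
        return True
    return False


results, finished = [], set()
handle(MsgTaskResult(True, 42), results, finished)
handle(MsgTaskExit("task-1"), results, finished)
print(results, finished)
```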

class pyopus.parallel.base.TaskID[source]

Basic task identifier class that is used for identifying a task in a virtual machine. TaskID objects must support comparison (__eq__() and __ne__()), hashing (__hash__()), and conversion to a string (__str__()).

static bad()[source]

A static member function. Called with TaskID.bad(). Returns an invalid task ID.

valid()[source]

Returns True if this TaskID object is valid.

class pyopus.parallel.base.VirtualMachine(debug=0, startupDir=None, translateStartupDir=True, mirrorMap=None, persistentStorage=False)[source]

The base class for accessing hosts working in parallel.

debug specifies the debug level. If it is greater than 0 debug messages are printed on the standard output.

startupDir specifies the working directory where the spawned functions will wake up. If set to None, the underlying virtual machine default is used.

If translateStartupDir is True, path translation is applied to startupDir just as it is to mirrorMap. If mirroring is defined (mirrorMap is given), no translation is performed on startupDir. In this case startupDir is treated as a path relative to the local storage directory.

mirrorMap is a dictionary specifying filesystem objects (files and directories) on the spawner which are to be mirrored (copied) to local storage on the host where the task is spawned. The keys represent paths on the spawner while the values represent the corresponding paths (relative to the local storage directory) on the host where the task will be spawned. Keys can use UNIX style globbing (anything the glob.glob() function can handle is OK). If mirrorMap is set to None, no mirroring is performed.

For the mirroring to work the filesystem objects on the spawner must be in the folders specified in the PARALLEL_MIRRORED_STORAGE environmental variable. This is because mirroring is performed by local copy operations which require the source to be on a mounted network filesystem.
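The two constraints on mirrorMap (sources inside mirrored storage, targets relative to local storage) can be checked with a small helper. This is a hedged sketch: check_mirror_map() is a hypothetical function, the paths are invented for illustration, and the list of mirrored roots is passed explicitly instead of being read from PARALLEL_MIRRORED_STORAGE.

```python
import posixpath


def check_mirror_map(mirror_map, mirrored_roots):
    """Return a list of problems found in a mirrorMap-style dictionary."""
    problems = []
    for source, target in mirror_map.items():
        # The source (possibly a glob pattern) must lie under a mirrored root.
        inside = any(
            source == root or source.startswith(root.rstrip("/") + "/")
            for root in mirrored_roots
        )
        if not inside:
            problems.append("source outside mirrored storage: " + source)
        # The target is interpreted relative to the local storage directory.
        if posixpath.isabs(target):
            problems.append("target must be relative: " + target)
    return problems


mirror_map = {
    "/home/user/project/models/*.inc": "models",  # glob pattern in the key
    "/home/user/project/netlist.cir": ".",        # copy to the storage root
}
print(check_mirror_map(mirror_map, ["/home"]))  # []
```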

If persistentStorage is enabled the local storage folders are not deleted after a spawned task is completed. Furthermore, if a local storage folder already exists for the given PID/VM index combination no mirroring is performed and that folder is used as local storage.

Persistent local storage makes spawning a task much faster because mirroring takes place only when the first task is spawned under some PID/VM index.

If you enable persistentStorage, make sure you clear local storage at system startup so that old local storage folders from previous runs are not mistakenly used in subsequent runs. PID wraparound can also be a problem, albeit this is very unlikely to occur.

To find out more about setting the working directory and mirroring see the prepareEnvironment() method.

checkForIncoming()[source]

Returns True if there is a message waiting to be received.

cleanupEnvironment(taskStorage, forceCleanup=False)[source]

Removes the working environment prepared for a remote task.

taskStorage is the path to the local storage directory. This directory is removed.

If taskStorage is None nothing is removed.

clearLocalStorage(timeout=-1.0)[source]

This function spawns the localStorageCleaner() function on all slots in the virtual machine and on the master. The spawned instances remove the local storage that was created for the slot.

timeout is applied where needed. Negative values stand for infinite timeout.

This function should never be called if there are tasks running in the virtual machine because it will remove their local storage.

This function should be called only by the spawner.

createLocalStorage()[source]

Creates a local storage directory subtree given by subpath under PARALLEL_LOCAL_STORAGE/pyopus-<username>.

If a local storage directory with the same name already exists it is suffixed by an underscore and a hexadecimal number.

Returns the path to the created local storage directory and a flag that is True if the storage folder was created.
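The naming rule for colliding directories can be sketched as follows. The function create_local_storage() is a hypothetical simplification: it takes the root folder as an argument, returns only the path (the flag is omitted), and the numbering of the hexadecimal suffix starting at 0 is an assumption.

```python
import os
import tempfile


def create_local_storage(root, subpath):
    """Create root/subpath; if it already exists, try root/subpath_<hex>."""
    candidate = os.path.join(root, subpath)
    counter = 0
    while True:
        try:
            os.makedirs(candidate)
            return candidate
        except FileExistsError:
            # Name taken: append an underscore and a hexadecimal number.
            candidate = os.path.join(root, "%s_%x" % (subpath, counter))
            counter += 1


with tempfile.TemporaryDirectory() as root:
    first = create_local_storage(root, "1a2b_vm0")
    second = create_local_storage(root, "1a2b_vm0")
    names = (os.path.basename(first), os.path.basename(second))

print(names)
```

Relying on the exception raised by os.makedirs() instead of checking for existence first avoids a race between two processes that try to create the same folder.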

static finalize()[source]

Cleans up after a parallel program. Should be called before exit.

static formatSpawnerConfig()[source]

Formats the configuration information gathered by a spawner task as a string. Works only if called by a spawner task.

static freeSlots()[source]

Returns the number of free slots for tasks in the virtual machine.

static freeSlotsSet()[source]

Returns the set of free slots.

static hostID(task=None)[source]

Returns the HostID object corresponding to task.

If task is not specified returns the HostID of the host on which the caller task runs.

static hosts()[source]

Returns the list of HostID objects representing the hosts in the virtual machine. Works only for hosts that are spawners.

static hostsWithFreeSlots()[source]

Returns the set of HostID objects corresponding to hosts with at least one free slot.

markPersistentObject(obj)[source]

Marks an object as persistent. Returns its ID, which is valid across the whole virtual machine. Adding the same object again has no effect unless unmarkPersistentObject() is called first.

static parentTaskID()[source]

Returns the TaskID object corresponding to the task that spawned the caller task.

prepareEnvironment()[source]

Prepares the working environment (working directory and local storage) for a spawned function. This method is called by a spawned task. The mirroring information (the spawner’s mirrorList) is received from the spawner at function spawn time, because the spawned task’s virtual machine object is the spawner’s virtual machine object sent to the spawned task.

  • If mirroring is not configured with the setSpawn() method, the working directory is the one specified as startupDir with an appropriate path translation applied to it. If it is None, the working directory is determined by the underlying virtual machine library (e.g. MPI).

  • If mirroring is configured with setSpawn() (mirrorList is not None), a local storage directory is created by calling createLocalStorage() with subpath set to <PID>_vm<index>, where PID is the PID of the calling process in hexadecimal notation and index is the spawner VM object index.

    Next, mirroring is performed by traversing all members of the processed mirrorList received from the spawner. It is a list of tuples of the form (index, suffix, target), where index and suffix specify the source filesystem object to mirror (see the translateToAbstractPath() function for an explanation of index and suffix) and target is the destination where the object will be copied.

    The source can be specified with globbing characters (anything the glob.glob() function can handle is OK).

    target is the path relative to the local storage directory where the source will be copied. Renaming of the source is not possible. target always specifies the destination directory.

    If source is a directory, symbolic links within it are copied as symbolic links which means that they should be relative and point to mirrored filesystem objects in order to remain valid after mirroring.

    If startupDir was given and is not None, the working directory is set to the path given by startupDir, interpreted relative to the local storage directory.

    If startupDir is None the working directory is set to the local storage directory.

Returns the path to the local storage directory.
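The mirroring step described above can be approximated with the standard library. The sketch below is not the PyOPUS code: storage_subpath() and mirror_entry() are hypothetical helpers, the source is given directly as a glob pattern instead of an (index, suffix) pair, and the use of shutil.copytree() with symlinks=True is an assumption about how symbolic links could be preserved.

```python
import glob
import os
import shutil
import tempfile


def storage_subpath(pid, vm_index):
    """Build the <PID>_vm<index> name with the PID in hexadecimal."""
    return "%x_vm%d" % (pid, vm_index)


def mirror_entry(source_pattern, local_storage, target):
    """Copy everything matching source_pattern into local_storage/target."""
    destination = os.path.join(local_storage, target)
    os.makedirs(destination, exist_ok=True)
    copied = []
    for source in sorted(glob.glob(source_pattern)):
        # The source keeps its name; target only names the directory.
        final = os.path.join(destination, os.path.basename(source))
        if os.path.isdir(source):
            # symlinks=True copies links as links instead of following them.
            shutil.copytree(source, final, symlinks=True)
        else:
            shutil.copy2(source, final)
        copied.append(final)
    return copied


with tempfile.TemporaryDirectory() as work:
    shared = os.path.join(work, "shared")
    os.makedirs(shared)
    for name in ("a.inc", "b.inc", "notes.txt"):
        with open(os.path.join(shared, name), "w") as handle:
            handle.write(name)
    local = os.path.join(work, storage_subpath(0x1A2B, 0))
    pattern = os.path.join(shared, "*.inc")
    copied_names = [
        os.path.basename(p) for p in mirror_entry(pattern, local, "models")
    ]

print(copied_names)
```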

receiveMessage(timeout=-1.0)[source]

Receives a message (a Python object) and returns a tuple (senderTaskId, message).

The sender of the message can be identified through the senderTaskId object of class TaskID.

If timeout is negative the function waits (blocks) until a message arrives. If timeout is positive and that many seconds pass without receiving a message, an empty tuple is returned. A zero timeout performs a nonblocking receive, which returns an empty tuple if no message is waiting.

In case of an error the return value is None.
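Because the three outcomes (error, timeout, message) are encoded in the return value, callers need to distinguish None from an empty tuple. The helper below is a hypothetical sketch of that distinction and operates on plain values so that it can be run without a virtual machine.

```python
def classify_received(received):
    """Classify a receiveMessage()-style return value."""
    if received is None:
        # None signals an error.
        return ("error", None, None)
    if len(received) == 0:
        # An empty tuple signals that no message arrived in time.
        return ("timeout", None, None)
    sender, message = received
    return ("message", sender, message)


print(classify_received(None))
print(classify_received(()))
print(classify_received(("task-7", {"value": 1.5})))
```

Testing with `if not received:` alone would not be enough, since both None and an empty tuple are false in a boolean context.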

sendMessage(destination, message)[source]

Sends message (a Python object) to a task with TaskID destination. Returns True on success.

static slots()[source]

Returns the number of slots for tasks in a virtual machine. Every processor represents a slot for one task.

spawnFunction(function, args=(), kwargs={}, count=-1, targetList=None, sendBack=True)[source]

Spawns count instances of a Python function on remote hosts and passes args and kwargs to the function. Spawning a function means starting a Python interpreter, importing the function, and calling it with args and kwargs.

If count is None the number of tasks is selected in such a way that all available slots are filled.

function, args, and kwargs must be pickleable.

targetList specifies a list of hosts on which the function instances will be spawned. If it is None all hosts in the virtual machine are candidates for the spawned instances of the function.

If sendBack is True the spawned tasks return the status and the return value of the function back to the spawner after the function exits. The return value must be pickleable.

Returns a list of TaskID objects representing the spawned tasks.

Works only if called by a spawner task.
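Since function, args, and kwargs must be pickleable, it can be useful to check them before spawning. The helper below is a hypothetical sketch that assumes the standard pickle module is representative of the serialization used; it is not part of PyOPUS.

```python
import pickle


def unpicklable_parts(function, args=(), kwargs=None):
    """Return the names of the spawn arguments that cannot be pickled."""
    parts = {"function": function, "args": args, "kwargs": kwargs or {}}
    failed = []
    for name, value in parts.items():
        try:
            pickle.dumps(value)
        except Exception:
            # pickle raises different exception types for different objects.
            failed.append(name)
    return failed


print(unpicklable_parts(len, (1, 2), {"a": 1}))
print(unpicklable_parts(lambda x: x))
```

Functions are pickled by reference, which is why a function defined at module level works while a lambda does not.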

spawnerCall(function, srgs=(), kwargs={}, asynchr=False)[source]

Calls function with the given positional and keyword arguments in the spawner task. If asynchr is False, waits for the call to finish and returns the function’s return value.

If called in a worker task delegates the call to the spawner.

If called in a spawner task, calls function locally.

This function is useful for sending data to a central point (spawner) where it is processed (e.g. for database writes, logging, etc.).

static taskID()[source]

Returns the TaskID object corresponding to the calling task.

unmarkPersistentObject(obj)[source]

Unmarks an object as persistent. Raises an exception if the object is not persistent.

vm_counter = 0

VM object counter used for assigning numbers to VM objects. These numbers are used for labelling local storage folders.

pyopus.parallel.base.getNumberOfCores()[source]

Returns the number of available CPU cores.

Works for Linux, Unix, MacOS, and Windows.

Uses code from Parallel Python (http://www.parallelpython.com).

pyopus.parallel.base.translateToAbstractPath(path)[source]

Translates a path on the local machine to a tuple (index, suffix) where index denotes the index of the path entry in the PARALLEL_MIRRORED_STORAGE environmental variable and suffix is the path relative to that entry.

Note that suffix is a relative path even if it starts with /.

pyopus.parallel.base.translateToActualPath(index, relPath)[source]

Translates an index and relPath to an actual path on the local machine.

This is the inverse of the translateToAbstractPath() function.
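The relationship between the two functions can be sketched in plain Python. The functions to_abstract_path() and to_actual_path() below are hypothetical stand-ins: unlike the real functions, they take the list of mirrored storage paths as an explicit argument instead of reading PARALLEL_MIRRORED_STORAGE, and the prefix matching rule is an assumption.

```python
import posixpath


def to_abstract_path(path, mirrored_roots):
    """Return (index, suffix) for a path inside one of mirrored_roots."""
    path = posixpath.normpath(path)
    for index, root in enumerate(mirrored_roots):
        root = posixpath.normpath(root)
        if path == root:
            return index, ""
        prefix = root.rstrip("/") + "/"
        if path.startswith(prefix):
            return index, path[len(prefix):]
    raise ValueError("%s is not inside mirrored storage" % path)


def to_actual_path(index, suffix, mirrored_roots):
    """Inverse of to_abstract_path() for the local list of roots."""
    # Strip leading separators so the suffix is always treated as relative.
    joined = posixpath.join(mirrored_roots[index], suffix.lstrip("/"))
    return posixpath.normpath(joined)


abstract = to_abstract_path("/bar/b", ["/foo", "/bar"])  # on host1
print(abstract)                                          # (1, 'b')
print(to_actual_path(*abstract, ["/d1", "/d2"]))         # on host2: /d2/b
```

The abstract form is what makes a path portable: the index and suffix mean the same thing on every host, while the roots they are resolved against differ.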