"""
.. inheritance-diagram:: pyopus.parallel.base
    :parts: 1

**Base classes for virtual machines (PyOPUS subsystem name: VM)**
A **spawner task** is a task that can spawn new tasks on hosts in the virtual
machine. All other tasks are **worker tasks**.

**Mirroring and local storage**

Often tasks in the virtual machine require the use of additional storage (like
a harddisk) for storing the parallel algorithm's input files which are
identical for all hosts. These files must therefore be distributed to all
hosts in a virtual machine before the computation can begin.
One way to make all hosts see the same directory structure is to
use a network filesystem like NFS or SMBFS. In this approach the storage
physically resides on a computer in the network (the file server). This storage
is exported and mounted by all hosts in the virtual machine.
Suppose, for instance, that the folder ``/shareme/foo`` on the file server is
exported and hosts in the virtual machine mount it under ``/home``. This way all
hosts see the ``/shareme/foo`` folder located on the physical storage of the
server as their own folder ``/home``.
This approach is very simple and ensures that all hosts see the same input
files in the same place. Unfortunately, as the number of hosts and the
amount of data read operations on the shared folder grow, the network quickly
becomes saturated and the computational performance of the virtual machine
decreases dramatically because tasks must wait on slow data read operations.
A more scalable solution is to use the local storage of every host to store
the algorithm's input files. This has the downside that these files must be
distributed to all hosts before the computation begins.
On the other hand, parallel algorithms often require some local storage for
storing various intermediate files. If this local storage is in the form of a
shared folder, an additional problem occurs when multiple hosts try to write a
file with the same name to the physically same place, but with different content.
A solution to the intermediate file problem is to use every host's physically
local storage (e.g. its local harddisk) for storing the intermediate files.
The solution to these problems in PyOPUS is to use mirroring. Mirroring is
configured through two environmental variables. ``PARALLEL_LOCAL_STORAGE``
specifies the path to the folder where the folders for the local storage of
input and intermediate files will be created. If this environmental variable is
not specified, a reasonable default is used (provided by the :mod:`tempfile`
module).
The ``PARALLEL_MIRRORED_STORAGE`` environmental variable specifies a
colon (``:``) separated list of paths to the directories that are mounted from
a common file server. The first such path in ``PARALLEL_MIRRORED_STORAGE`` on
all hosts corresponds to the same directory on the file server. The same goes
for the second, the third, etc. paths listed in ``PARALLEL_MIRRORED_STORAGE``.
Under Windows the list separator is semicolon (``;``).
The ``PARALLEL_MIRRORED_STORAGE`` environmental variable must be set on all
hosts in a virtual machine if mirroring is used. ``PARALLEL_LOCAL_STORAGE``
is usually set to ``/localhome/USERNAME``, which must physically reside on the
local machine and should be writeable by the user that is running the spawned
processes.
If ``PARALLEL_MIRRORED_STORAGE`` is not set, the default under Linux is the
user's home folder; under Windows the current folder is used by default.
Both ``PARALLEL_LOCAL_STORAGE`` and ``PARALLEL_MIRRORED_STORAGE`` can use UNIX
style user home directory expansion (e.g. ``~user`` expands to ``/home/user``).
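
As a rough sketch (the values below are assumptions for illustration, not
PyOPUS defaults), the two variables are interpreted like this:

```python
# Minimal sketch of how the two variables are interpreted; the values in
# `env` are made up for this example.
import os, tempfile

env = {
    "PARALLEL_LOCAL_STORAGE": "~/localscratch",
    "PARALLEL_MIRRORED_STORAGE": "/home:/shared/tools",
}

# Local storage: expand ~ and fall back to tempfile's default when unset.
local = env.get("PARALLEL_LOCAL_STORAGE")
local = os.path.realpath(os.path.expanduser(local)) if local else tempfile.gettempdir()

# Mirrored storage: colon-separated list on Linux, semicolon-separated on Windows.
sep = ";" if os.name == "nt" else ":"
mirrored = [os.path.realpath(os.path.expanduser(p))
            for p in env["PARALLEL_MIRRORED_STORAGE"].split(sep)]
```
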

**Path translation in mirroring operations**

Suppose the ``PARALLEL_MIRRORED_STORAGE`` environmental variable is set to
``/foo:/bar`` on host1 and ``/d1:/d2`` on host2. This means that the following
directories are equivalent (mounted from the same exported folder on a file
server):

======= =======
host1   host2
======= =======
/foo    /d1
/bar    /d2
======= =======

So ``/foo`` on host1 represents the same physical storage as ``/d1`` on host2.
Similarly ``/bar`` on host1 represents the same physical storage as ``/d2`` on
host2. Usually only ``/home`` is common to all hosts in a virtual machine so
``PARALLEL_MIRRORED_STORAGE`` is set to ``/home`` on all hosts.
Path translation converts a path on host1 into a path to the physically
same object (mounted from the same exported directory on the file
server) on host2. The following is an example of path translation:

============= =============
Path on host1 Path on host2
============= =============
/foo/a        /d1/a
/bar/b        /d2/b
============= =============
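
The translation above can be sketched in a few lines (a simplified
stand-in for :func:`translateToAbstractPath` and
:func:`translateToActualPath`, with POSIX-style paths and the mirrored
lists hard-coded for the host1/host2 example):

```python
# Simplified sketch of the two translation helpers; MIRRORED plays the
# role of the parsed PARALLEL_MIRRORED_STORAGE list on host1.
import posixpath

MIRRORED = ["/foo", "/bar"]

def to_abstract(path):
    # Find the mirrored entry the path lives under; return (index, suffix).
    for index, prefix in enumerate(MIRRORED):
        if path == prefix or path.startswith(prefix + "/"):
            return index, posixpath.relpath(path, prefix)
    raise ValueError(path + " is not under any mirrored directory")

def to_actual(index, suffix, mirrored):
    # Inverse mapping, evaluated against the receiving host's own list.
    return posixpath.join(mirrored[index], suffix)

# /foo/a on host1 -> abstract (0, 'a') -> /d1/a on host2
index, suffix = to_abstract("/foo/a")
print(to_actual(index, suffix, ["/d1", "/d2"]))   # prints /d1/a
```
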
"""
from inspect import ismethod, isclass
from ..misc.debug import DbgMsg, DbgMsgOut
from ..misc.env import environ
from ..misc.username import username
from .. import PyOpusError
import platform
import pickle, io
from glob import iglob
import os, sys, shutil, time, tempfile
__all__ = [ 'TaskID', 'HostID', 'Msg', 'MsgTaskExit', 'MsgHostDelete', 'MsgHostAdd',
'MsgTaskResult', 'VirtualMachine', 'getNumberOfCores',
'translateToAbstractPath', 'translateToActualPath' ]
# Persistent objects table, global object id for key
persistentObjects={}
# Translator from local object id to global object id
persistentGlobalId={}
# This variable holds a reference to the first imported VM module - for resolving conflicts
# (e.g. MPI and PVM don't work together)
firstVM=None
# All derivative classes must be picklable
# Task identifier definition
class TaskID(object):
"""
Basic task identifier class that is used for identifying a task in a
virtual machine. :class:`TaskID` objects must support comparison
(:func:`__eq__` and :func:`__ne__`), hashing (:func:`__hash__`), and
conversion to a string (:func:`__str__`).
"""
@staticmethod
def bad():
"""
A static member function. Called with ``TaskID.bad()``.
Returns an invalid task ID.
"""
return TaskID()
def __init__(self):
pass
def __hash__(self):
return 0
def __eq__(self, other):
return type(self)==type(other)
def __ne__(self, other):
return type(self)!=type(other)
def __str__(self):
return "NOTASK"
def valid(self):
"""
Returns ``True`` if this :class:`TaskID` object is valid.
"""
return False
# Host identifier definition
class HostID(object):
"""
Basic host identifier class that is used for identifying a host in a
virtual machine. :class:`HostID` objects must support comparison
(:func:`__eq__` and :func:`__ne__`), hashing (:func:`__hash__`), and
conversion to a string (:func:`__str__`).
"""
@staticmethod
def bad():
"""
A static member function. Called with ``HostID.bad()``.
Returns an invalid host ID.
"""
return HostID()
def __init__(self):
pass
def __hash__(self):
return 0
def __eq__(self, other):
return type(self)==type(other)
def __ne__(self, other):
return type(self)!=type(other)
def __str__(self):
return "NOHOST"
def valid(self):
"""
Returns ``True`` if this :class:`HostID` object is valid.
"""
return False
# Message on virtual machine level of abstraction
class Msg(object):
"""
Base class for a message used in task-to-task communication.
"""
pass
# Define some common messages
class MsgTaskExit(Msg):
"""
This message signals that a task has exited.
The :class:`TaskID` object corresponding to the task is stored in the
:attr:`taskID` member.
"""
def __init__(self, taskID):
Msg.__init__(self)
self.taskID=taskID
class MsgHostDelete(Msg):
"""
This message signals that a host has left the virtual machine.
The :class:`HostID` object corresponding to the host is stored in the
:attr:`hostID` member.
"""
def __init__(self, hostID):
Msg.__init__(self)
self.hostID=hostID
class MsgHostAdd(Msg):
"""
This message signals that new hosts were added to the virtual machine.
The list of :class:`HostID` objects corresponding to the added hosts is
stored in the :attr:`hostIDs` member.
"""
def __init__(self, hostIDs):
Msg.__init__(self)
self.hostIDs=hostIDs
class MsgTaskResult(Msg):
"""
This message is sent to the process that spawned a Python function
in a virtual machine. It holds a boolean flag that tells whether the
function ran successfully, and the return value of the function.
The boolean flag and the return value can be found in :attr:`success` and
:attr:`returnValue` members.
"""
def __init__(self, success, returnValue):
Msg.__init__(self)
self.success=success
self.returnValue=returnValue
# These are helper methods
def translateToAbstractPath(path):
"""
Translates a *path* on the local machine to a tuple (*index*, *suffix*)
where *index* denotes the index of the path entry in the
``PARALLEL_MIRRORED_STORAGE`` environmental variable and *suffix* is
the path relative to that entry.
Note that *suffix* is a relative path even if it starts with ``/``.
"""
# Make path canonical
canonical=os.path.realpath(path)
# Search the PARALLEL_MIRRORED_STORAGE entries for a matching prefix
for index in range(len(ParallelMirroredStorage)):
masterMirroredDir=ParallelMirroredStorage[index]
if canonical.find(masterMirroredDir)==0:
# We have a mirrored directory prefix, get the relative path.
suffix=os.path.relpath(canonical, masterMirroredDir)
return (index, suffix)
# Failed to translate
raise PyOpusError(DbgMsg("VM", "'"+path+"' not in PARALLEL_MIRRORED_STORAGE."))
def translateToActualPath(index, relPath):
"""
Translates *index* and *relPath* into an actual path on the local
machine.
This is the inverse of the :func:`translateToAbstractPath` function.
"""
if index>=len(ParallelMirroredStorage):
raise PyOpusError(DbgMsg("VM", "PARALLEL_MIRRORED_STORAGE should have at least "+str(index+1)+" members."))
return os.path.join(ParallelMirroredStorage[index], relPath)
# Virtual machine
class VirtualMachine(object):
"""
The base class for accessing hosts working in parallel.
*debug* specifies the debug level. If it is greater than 0 debug messages
are printed on the standard output.
*startupDir* specifies the working directory where the spawned
functions will wake up. If set to ``None``, the underlying virtual machine
default is used.
If *translateStartupDir* is ``True``, path translation is applied to
*startupDir* just as it is to *mirrorMap*. If mirroring is defined
(*mirrorMap* is given), no translation is performed on *startupDir*.
In this case *startupDir* is treated as a path relative to
the local storage.
*mirrorMap* is a dictionary specifying filesystem objects (files and
directories) on the spawner which are to be mirrored (copied) to local
storage on the host where the task is spawned. The keys represent
paths on the spawner while the values represent the corresponding
paths (relative to the local storage directory) on the host where the
task will be spawned. Keys can use UNIX style globbing (anything the
:func:`glob.glob` function can handle is OK). If *mirrorMap* is set to
``None``, no mirroring is performed.
For the mirroring to work the filesystem objects on the spawner must be
in the folders specified in the ``PARALLEL_MIRRORED_STORAGE``
environmental variable. This is because mirroring is performed by local
copy operations which require the source to be on a mounted network
filesystem.
If *persistentStorage* is enabled the local storage folders are
not deleted after a spawned task is completed. Furthermore, if a local
storage folder already exists for the given PID/VM index combination
no mirroring is performed and that folder is used as local storage.
Persistent local storage makes spawning a task much faster as mirroring
takes place only when the first task is spawned under some PID/VM index.
If you enable *persistentStorage*, make sure you clear local storage
at system startup so that old local storage folders from previous runs
are not mistakenly used in subsequent runs. PID wraparound can also be
a problem, albeit this is very unlikely to occur.
To find out more about setting the working directory and mirroring see
the :meth:`prepareEnvironment` method.
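
As an illustration, the processing of *mirrorMap* into the internal mirror
list can be sketched as follows (a simplified stand-in, with the mirrored
storage list hard-coded; the paths are made up for this example):

```python
# Sketch of the mirrorMap -> mirrorList processing done at construction
# time, assuming PARALLEL_MIRRORED_STORAGE resolved to ['/home/user'].
import posixpath

mirrored = ["/home/user"]
mirror_map = {"/home/user/proj/input": "input"}   # spawner path -> target in local storage

mirror_list = []
for master, target in mirror_map.items():
    for index, prefix in enumerate(mirrored):
        if master == prefix or master.startswith(prefix + "/"):
            # Store (index, suffix, target) just like the real mirrorList.
            mirror_list.append((index, posixpath.relpath(master, prefix), target))
            break

print(mirror_list)   # [(0, 'proj/input', 'input')]
```
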
"""
vm_counter=0
"""
VM object counter used for assigning numbers to VM objects.
These numbers are used for labelling local storage folders.
"""
def __init__(self, debug=0, startupDir=None, translateStartupDir=True, mirrorMap=None, persistentStorage=False):
# Debug level
self.debug=debug
# Enumerate VM objects
self.vmIndex=VirtualMachine.vm_counter
VirtualMachine.vm_counter+=1
self.persistentStorage=persistentStorage
# Process startupDir
if translateStartupDir and mirrorMap is not None and startupDir is not None:
(self.startupDirIndex, self.startupDirSuffix)=translateToAbstractPath(startupDir)
else:
self.startupDirIndex=None
self.startupDirSuffix=startupDir
# Process local storage map
if mirrorMap is None:
self.mirrorList=None
else:
# Build mirror list
# Start with empty list
self.mirrorList=[]
# Go through all entries in mirrorMap
for (masterObject, target) in mirrorMap.items():
(index, suffix)=translateToAbstractPath(masterObject)
# Append to self.mirrorList
self.mirrorList.append((index, suffix, target))
if self.debug:
DbgMsgOut("VM", "Mirroring set for '"+suffix+"' in storage "+str(index)+" to '"+target+"' in local storage.")
@staticmethod
def slots():
"""
Returns the number of slots for tasks in a virtual machine.
Every processor represents a slot for one task.
"""
return 0
@staticmethod
def freeSlots():
"""
Returns the number of free slots for tasks in the virtual machine.
"""
return 0
@staticmethod
def freeSlotsSet():
"""
Returns the set of free slots.
"""
return set()
@staticmethod
def hostsWithFreeSlots():
"""
Returns set of objects of class :class:`HostID` corresponding
to hosts with at least one free slot.
"""
return set([])
@staticmethod
def hosts():
"""
Returns the list of :class:`HostID` objects representing the hosts in
the virtual machine. Works only for hosts that are spawners.
"""
return []
@staticmethod
def taskID():
"""
Returns the :class:`TaskID` object corresponding to the calling task.
"""
return None
@staticmethod
def hostID(task=None):
"""
Returns the :class:`HostID` object corresponding to *task*.
If *task* is not specified returns the :class:`HostID` of the
host on which the caller task runs.
"""
return None
@staticmethod
def parentTaskID():
"""
Returns the :class:`TaskID` object corresponding to the task that
spawned the caller task.
"""
return None
@classmethod
def dummy(cls):
pass
# This function prepares a function descriptor for spawning so that member functions can also be spawned
@classmethod
def func2desc(cls, func):
if ismethod(func):
# Handles instance methods and classmethods
# Get self, pack it together with method name
return (func.__self__, func.__name__)
else:
# Plain function, just return the function
return func
# This function reconstructs a function from a function descriptor
@classmethod
def desc2func(cls, desc):
if type(desc) is tuple:
# Instance methods and classmethods
s,n=desc
if isclass(s):
# Class, look in __dict__
return getattr(s, n)
else:
# Instance, use __getattribute__
return s.__getattribute__(n)
else:
# Plain function
return desc
# This must be overriden by every derived class.
def spawnFunction(self, function, args=(), kwargs={}, count=-1, targetList=None, sendBack=True):
"""
Spawns *count* instances of a Python *function* on remote hosts and
passes *args* and *kwargs* to the function. Spawning a function
actually means to start a Python interpreter, import the function, and
call it with *args* and *kwargs*.
If *count* is ``None`` the number of tasks is selected so that
all available slots are filled.
*function*, *args*, and *kwargs* must be pickleable.
*targetList* specifies a list of hosts on which the function instances
will be spawned. If it is ``None`` all hosts in the virtual machine are
candidates for the spawned instances of the function.
If *sendBack* is ``True`` the spawned tasks return the status and the
return value of the function back to the spawner after the function
exits. The return value must be pickleable.
Returns a list of :class:`TaskID` objects representing the spawned tasks.
Works only if called by a spawner task.
"""
raise PyOpusError(DbgMsg("VM", "Spawning not implemented."))
# This must be overriden by every derived class.
def spawnerCall(self, function, args=(), kwargs={}, asynchr=False):
"""
Calls *function* with *args* and *kwargs* in the spawner task.
If *asynchr* is ``False`` waits for the call to finish and returns
the function's return value.
If called in a worker task delegates the call to the spawner.
If called in a spawner task, calls *function* locally.
This function is useful for sending data to a central point (spawner)
where it is processed (e.g. for database writes, logging, etc.).
"""
raise PyOpusError(DbgMsg("VM", "Spawner calls not implemented."))
def checkForIncoming(self):
"""
Returns ``True`` if there is a message waiting to be received.
"""
raise PyOpusError(DbgMsg("VM", "Message check not implemented."))
def receiveMessage(self, timeout=-1.0):
"""
Receives a *message* (a Python object) and returns a tuple
(*senderTaskId*, *message*).
The sender of the *message* can be identified through the
*senderTaskId* object of class :class:`TaskID`.
If *timeout* is negative the function waits (blocks) until some message
arrives. If *timeout*>0 seconds pass without receiving a message, an
empty tuple is returned. Zero *timeout* performs a nonblocking receive
which returns an empty tuple if no message is received.
In case of an error the return value is ``None``.
"""
raise PyOpusError(DbgMsg("VM", "Reception of messages not implemented."))
def sendMessage(self, destination, message):
"""
Sends *message* (a Python object) to a task with :class:`TaskID`
*destination*. Returns ``True`` on success.
"""
raise PyOpusError(DbgMsg("VM", "Sending of messages not implemented."))
@staticmethod
def finalize():
"""
Cleans up after a parallel program.
Should be called before exit.
"""
pass
def clearLocalStorage(self, timeout=-1.0):
"""
This function spawns the :func:`localStorageCleaner` function on all
slots in the virtual machine and on the master. The spawned
instances remove the local storage that was created for the slot.
*timeout* is applied where needed. Negative values stand for infinite
*timeout*.
This function should never be called if there are tasks running in the
virtual machine because it will remove their local storage.
This function should be called only by the spawner.
"""
# Get hosts
hostIDs=self.hosts()
# Run the cleaner locally
localStorageCleaner(self)
# Spawn a cleaner on every host
taskIDs=[]
for hostID in hostIDs:
taskID=self.spawnFunction(localStorageCleaner, kwargs={'vm': self}, count=1, targetList=[hostID])
taskIDs.extend(taskID)
# Collect return values and task exit messages from all hosts confirming that cleanup is finished.
taskIDs=set(taskIDs)
mark=time.time()
while len(taskIDs)>0:
remains=timeout
if timeout>=0:
remains=timeout-(time.time()-mark)
if remains<=0:
break
recv=self.receiveMessage(remains)
if recv is not None and len(recv)==2:
(sourceID, msg)=recv
if type(msg) is MsgTaskExit:
# Remove taskID from set of spawned task IDs.
if sourceID in taskIDs:
taskIDs.remove(sourceID)
else:
# Throw away other messages.
pass
def localStoragePath(self):
if ParallelLocalStorage is None:
raise PyOpusError(DbgMsg("VM", "PARALLEL_LOCAL_STORAGE is not set."))
# Construct subpath
subpath="%x_vm%x" % (os.getpid(), self.vmIndex)
# Build storage directory name
taskStorage=os.path.join(ParallelLocalStorage, "pyopus-"+username, subpath)
# The folder must not exist before creation.
# If it exists, modify the folder name,
# but do this only if persistent storage is not enabled.
if not self.persistentStorage:
# Change name if it already exists, add numeric suffix (hex).
counter=0
taskStorageRenamed=taskStorage
while os.path.lexists(taskStorageRenamed):
# Directory exists, try another one (add numeric suffix)
counter+=1
taskStorageRenamed=taskStorage+("_%x" % counter)
if counter>0:
taskStorage=taskStorageRenamed
if self.debug:
DbgMsgOut("VM", ("Local storage directory already exists, adding suffix _%x." % (counter)))
return taskStorage
def createLocalStorage(self):
"""
Creates a local storage directory under
``PARALLEL_LOCAL_STORAGE/pyopus-<username>``.
If a local storage directory with the same name already exists it is
suffixed by an underscore and a hexadecimal number.
Returns the path to the created local storage directory and a flag that
is ``True`` if the storage folder was created.
"""
# Build storage directory name
# taskStorage=os.path.join(ParallelLocalStorage, "pyopus-"+username, subpath)
taskStorage=self.localStoragePath()
created=False
try:
# Check if it exists
if not os.path.isdir(taskStorage):
# Create only if it is not there yet
os.makedirs(taskStorage)
created=True
except:
raise PyOpusError(DbgMsg("VM", "Failed to create local storage in '"+taskStorage+"'"))
return taskStorage, created
# Create local storage, mirror, and set working directory.
# Return local storage path. If no mirroring was performed, return None.
def prepareEnvironment(self):
"""
Prepares the working environment (working directory and local storage)
for a spawned function. This method is called by a spawned task. The
mirroring information is received from the spawner (spawner's
*mirrorList*) at function spawn time. The spawned task's virtual machine
object is in fact the spawner's virtual machine object, sent to the
spawned task.
* If mirroring is not configured with the :meth:`setSpawn` method, the
working directory is the one specified as *startupDir* with an
appropriate path translation applied to it. If it is ``None`` the
working directory is determined by the underlying virtual machine
library (e.g. MPI).
* If mirroring is configured with :meth:`setSpawn` (*mirrorList* is not
``None``), a local storage directory named ``<PID>_vm<index>`` is created
by calling :meth:`createLocalStorage`, where ``PID`` is the PID of the
calling process in hexadecimal notation and ``index`` is the spawner VM
object index.
Next, mirroring is performed by traversing the processed *mirrorList*
received from the spawner, which is
a list of tuples of the form (*index*, *suffix*, *target*) where
*index* and *suffix* specify the source filesystem object to mirror
(see the :func:`translateToAbstractPath` function for the explanation
of *index* and *suffix*) while *target* is the destination where the
object will be copied.
The source can be specified with globbing characters (anything the
:func:`glob.glob` function can handle is OK).
*target* is the path relative to the local storage directory where
the source will be copied. Renaming of the source is not possible.
*target* always specifies the destination directory.
If source is a directory, symbolic links within it are copied as
symbolic links which means that they should be relative and point to
mirrored filesystem objects in order to remain valid after mirroring.
If *startupDir* was given and it is not ``None`` the working
directory is set to path given by *startupDir* that is relative to
the local storage directory.
If *startupDir* is ``None`` the working directory is set to the
local storage directory.
Returns the path to the local storage directory.
"""
if self.mirrorList is None:
# No mirroring. Change working directory and return.
if self.startupDirSuffix is not None:
if self.startupDirIndex is None:
startupPath=self.startupDirSuffix
else:
startupPath=translateToActualPath(self.startupDirIndex, self.startupDirSuffix)
if self.debug:
DbgMsgOut("VM", "Changing working directory to '"+startupPath+"'.")
os.chdir(startupPath)
return None
# Have mirroring.
# Create local storage directory
if self.debug:
DbgMsgOut("VM", "Creating local storage.")
taskStorage, created = self.createLocalStorage()
if self.debug:
DbgMsgOut("VM", "Using local storage in '"+taskStorage+"'.")
# Change working directory to local storage
os.chdir(taskStorage)
# Skip mirroring if local storage was not created and persistentStorage is True
if created or not self.persistentStorage:
# Copy local storage subdirs (master must do some preparation first)
# Worker instructions for directory copying are in the vm object received from parent
for mirrorDirective in self.mirrorList:
(index, suffix, target)=mirrorDirective
# Translate
sourcePath=translateToActualPath(index, suffix)
# Debug
if self.debug:
DbgMsgOut("VM", "Mirroring '"+sourcePath+"' to '"+target+"'.")
sys.stdout.flush()
# Do the globbing
for source in iglob(sourcePath):
if os.path.isdir(source):
# Copying a directory. Destination is the directory where the copy of the tree will be created.
# Renaming is not possible. Copy symlinks as symlinks.
# Get the name of the copied object (copytree wants a destination object name).
(srcPath, srcName)=os.path.split(source)
# Copy tree, create destination directory if it does not exist.
shutil.copytree(source, os.path.join(target, srcName), symlinks=True)
else:
# Copying a file. Destination is the directory where the copy of the file will be created.
# Renaming is not possible. Copy file itself, not symlink
# Get the name of the copied object.
(srcPath, srcName)=os.path.split(source)
# Create destination directory if it does not exist.
if not os.path.exists(target):
os.makedirs(target)
if not os.path.isdir(target):
raise PyOpusError(DbgMsg("VM", "Mirroring destination exists, but is not a directory."))
# Copy file.
shutil.copy(source, target)
# See if workDir is given
if self.startupDirSuffix is not None:
# startupDir is relative to local storage directory.
tmpDir=os.path.join(taskStorage, self.startupDirSuffix)
if self.debug:
DbgMsgOut("VM", "Changing working directory to '"+tmpDir+"'.")
os.chdir(tmpDir)
return taskStorage
def cleanupEnvironment(self, taskStorage, forceCleanup=False):
"""
Removes the working environment prepared for a remote task.
*taskStorage* is the path to the local storage directory.
This directory is removed.
If *taskStorage* is ``None`` nothing is removed.
"""
if taskStorage is None:
return
# Do nothing if local storage is persistent
if self.persistentStorage and not forceCleanup:
return
if self.debug:
if forceCleanup:
DbgMsgOut("VM", "Forcing removal of '"+taskStorage+"'.")
else:
DbgMsgOut("VM", "Removing '"+taskStorage+"'.")
# Go to ParallelLocalStorage so we are not in our own way.
os.chdir(ParallelLocalStorage)
# Remove task storage folder
shutil.rmtree(taskStorage, True)
def markPersistentObject(self, obj):
"""
Marks an object as persistent. Returns its ID which is valid
across the whole virtual machine. Adding the same object
again has no effect unless :meth:`removePersistentObject` is
called first.
"""
localId=id(obj)
oid=localId
# Do nothing if it is already there
if oid in persistentObjects:
return oid
# Broadcast
self._broadcastNewPersistentObject(obj, oid)
self._addPersistentObject(obj, oid)
return oid
def unmarkPersistentObject(self, obj):
"""
Unmarks an object as persistent. Raises an exception if the
object is not persistent.
"""
localId=id(obj)
if localId not in persistentGlobalId:
raise PyOpusError(DbgMsg("VM", ("Object id=%d is not persistent." % (localId))))
oid=persistentGlobalId[localId]
self._removePersistentObject(oid)
# Broadcast
self._broadcastPersistentObjectDeletion(oid)
@staticmethod
def _addPersistentObject(obj, oid):
"""
Add object *obj* to the persistent objects table with global
object id *oid*.
"""
persistentObjects[oid]=obj
persistentGlobalId[id(obj)]=oid
@staticmethod
def _removePersistentObject(oid):
"""
Remove persistent object with global object id *oid* from
persistent objects table.
"""
if oid in persistentObjects:
localId=id(persistentObjects[oid])
del persistentObjects[oid]
del persistentGlobalId[localId]
def _broadcastNewPersistentObject(self, obj, oid):
"""
Broadcasts object *obj* associated with persistent object id
*oid* to all workers.
This must be implemented in derived classes.
"""
raise PyOpusError(DbgMsg("VM", "Persistent objects not implemented."))
def _broadcastPersistentObjectDeletion(self, oid):
"""
Broadcasts a message to all workers that the persistent object
with id *oid* should be removed from the table of persistent
objects.
This must be implemented in derived classes.
"""
raise PyOpusError(DbgMsg("VM", "Persistent objects not implemented."))
# Persistence aware custom pickler
class _PersistencePickler(pickle.Pickler):
def persistent_id(self, obj):
localId=id(obj)
if localId in persistentGlobalId:
oid=persistentGlobalId[localId]
return ("PI", oid)
else:
return None
class _PersistenceUnpickler(pickle.Unpickler):
def persistent_load(self, spec):
tag, oid = spec
if tag=="PI":
if oid in persistentObjects:
return persistentObjects[oid]
else:
raise pickle.UnpicklingError("Persistent id=%d object not found." % (oid))
else:
raise pickle.UnpicklingError("Unsupported persistent object type.")
bytes_types = (bytes, bytearray)
# Convenience functions that use our persistence aware custom pickler
def _dumps(obj, protocol=None, *, fix_imports=True):
f = io.BytesIO()
_PersistencePickler(f, protocol, fix_imports=fix_imports).dump(obj)
res = f.getvalue()
assert isinstance(res, bytes_types)
return res
def _loads(s, *, fix_imports=True, encoding="ASCII", errors="strict"):
if isinstance(s, str):
raise TypeError("Can't load pickle from unicode string")
file = io.BytesIO(s)
return _PersistenceUnpickler(file, fix_imports=fix_imports, encoding=encoding, errors=errors).load()
#
# Helper functions
#
def localStorageCleaner(vm=None):
"""
This is the function spawned by the
:meth:`VirtualMachine.clearLocalStorage` method.
The function looks in the ``pyopus-<username>`` subdirectory of the directory
given by the ``PARALLEL_LOCAL_STORAGE`` environmental variable and removes
everything in it.
"""
# Prepare base path ParallelLocalStorage/pyopus/<username>
basePath=os.path.join(ParallelLocalStorage, "pyopus-"+username)
# Traverse storageRoot entries.
entries=os.listdir(basePath)
# Go to ParallelLocalStorage/pyopus so we are not in our own way.
os.chdir(os.path.join(ParallelLocalStorage))
# Go through entries.
for entry in entries:
completeEntry=os.path.join(basePath, entry)
# Ignore errors (delete everything we can delete).
try:
if os.path.isdir(completeEntry):
shutil.rmtree(completeEntry, True)
else:
os.remove(completeEntry)
if vm.debug:
DbgMsgOut("VM", "Removing '"+completeEntry+"'.")
except:
if vm.debug:
DbgMsgOut("VM", "Failed to remove '"+completeEntry+"'.")
if vm.debug:
DbgMsgOut("VM", "Finished local storage cleanup.")
def getNumberOfCores():
"""
Returns the number of available CPU cores.
Works for Linux, Unix, MacOS, and Windows.
Uses code from Parallel Python (http://www.parallelpython.com).
"""
# Taken from Parallel Python. Thanks.
# For Linux, Unix and MacOS
if hasattr(os, "sysconf"):
if "SC_NPROCESSORS_ONLN" in os.sysconf_names:
# Linux and Unix
ncpus = os.sysconf("SC_NPROCESSORS_ONLN")
if isinstance(ncpus, int) and ncpus > 0:
return ncpus
else:
# MacOS X
return int(os.popen("sysctl -n hw.ncpu").read())
# For Windows
if "NUMBER_OF_PROCESSORS" in environ:
ncpus = int(environ["NUMBER_OF_PROCESSORS"])
if ncpus > 0:
return ncpus
# Default
return 1
#
# Initialization - runs once at first module import
#
# Get local storage from environment - this is the directory residing on a physically
# local medium where runs local to the host are performed. The user who owns the
# Python process must have read/write permission to this directory.
# If it does not exist yet, we try to create it.
if 'PARALLEL_LOCAL_STORAGE' in environ:
ParallelLocalStorage=environ['PARALLEL_LOCAL_STORAGE']
try:
# Expand user home (~, ~name (unix only))
ParallelLocalStorage=os.path.expanduser(ParallelLocalStorage)
# Normalize path, get canonical path (full path, eliminate symlinks)
ParallelLocalStorage=os.path.realpath(ParallelLocalStorage)
# Create if not there yet
if not os.path.exists(ParallelLocalStorage):
# Not there, create it (along with all missing directories in the path)
os.makedirs(ParallelLocalStorage)
except KeyboardInterrupt:
DbgMsgOut("VM", "Keyboard interrupt.")
raise
except:
DbgMsgOut("VM", "Failed to process local storage dir "+ParallelLocalStorage+".")
raise
else:
# Reasonable default for Linux and Windows
ParallelLocalStorage=tempfile.gettempdir()
# Get mirrored storage from environment - a directory which is common to all hosts
# in the virtual machine. This is a directory that is mounted from a common
# location (nfs exported by a server). Usually this is the /home directory.
# This is where you start your parallel runs and keep your Python scripts.
# Can be a colon separated list of directories. If one host specifies a
# colon separated list, all must specify a list of same length with
# directories in the same order as they appear across hosts.
if 'PARALLEL_MIRRORED_STORAGE' in environ:
ParallelMirroredStorage=environ['PARALLEL_MIRRORED_STORAGE']
if platform.platform().startswith('Windows'):
# Split list in directories, separator is semicolon
ParallelMirroredStorage=ParallelMirroredStorage.split(';')
else:
# Split list in directories, separator is colon
ParallelMirroredStorage=ParallelMirroredStorage.split(':')
# Process list
for i in range(0, len(ParallelMirroredStorage)):
mirrored=ParallelMirroredStorage[i]
try:
# Expand user home (~, ~name (unix only))
mirrored=os.path.expanduser(mirrored)
# Normalize path, get canonical path (full path, eliminate symlinks)
mirrored=os.path.realpath(mirrored)
except KeyboardInterrupt:
DbgMsgOut("VM", "keyboard interrupt")
raise
except:
DbgMsgOut("VM", "Failed to process mirrored storage dir %s." % (mirrored))
ParallelMirroredStorage[i]=mirrored
del mirrored
else:
if platform.platform().startswith('Windows'):
# Reasonable default for Windows is the current working directory
ParallelMirroredStorage=[os.path.realpath(os.getcwd())]
else:
# Reasonable default for Linux is the user's home
ParallelMirroredStorage=[os.path.realpath(os.path.expanduser("~"))]