Source code for pyopus.parallel.cooperative

"""
.. inheritance-diagram:: pyopus.parallel.cooperative
    :parts: 1
	
**Cooperative multitasking OS with task outsourcing (PyOPUS subsystem name: COS)**

This module is based on the :mod:`greenlet` module. Concurrent tasks can be created 
in a UNIX-like fashion (with the :meth:`Spawn` method). The return value of a task 
or multiple tasks is collected with the :meth:`Join` method. Joins can be blocking 
or nonblocking. 

The cooperative multitasking OS takes care of outsourcing the tasks to computing 
nodes if it is permitted to do this and there are computing nodes available. 
Outsourcing uses the virtual machine specified by calling the :meth:`setVM` method. 
If no virtual machine is specified outsourcing is not possible. 

COS makes it possible to implement multilevel parallelism and asynchronous 
algorithms in a simple manner. Parallel algorithms can be run on a single CPU 
by taking advantage of the :mod:`greenlet` module for providing the microthread 
functionality. Every local task is a microthread that runs concurrently with 
other microthreads. The microthreads are cooperatively scheduled by COS. 
Of course such a run is slower than a real parallel run involving multiple 
tasks across multiple processors. 
"""

from greenlet import greenlet
from pyopus.parallel.base import MsgTaskResult, MsgTaskExit, translateToAbstractPath, translateToActualPath
from ..misc import identify
from ..misc.debug import DbgMsg, DbgMsgOut
from .. import PyOpusError

# Names exported by ``from pyopus.parallel.cooperative import *``.
__all__ = [ 'cOS', 'Task', 'SysCall', 'Spawn', 'Yield', 'Join', 'Scheduler', 'OpusOS' ]

# System calls 

# Base class
class SysCall(object):
    """
    Base class for COS system calls.

    A system call object is handed to the scheduler by a task. The
    scheduler then invokes :meth:`handle` on it.
    """

    def handle(self, sched, task):
        """
        Handle the system call.

        * *sched* - scheduler that received the system call
        * *task* - task that issued the system call

        Must be overridden. The base implementation always raises
        :class:`PyOpusError`.
        """
        message = "COS: This is an abstract class."
        raise PyOpusError(message)
# Spawn a child task
class Spawn(SysCall):
    """
    System call for spawning a child task.

    * *func* - function defining the task
    * *args* - positional arguments to the task's function
    * *kwargs* - keyword arguments to the task's function
    * *remote* - ``True`` if a remote spawn is requested. Spawns a local
      task if the scheduler has no VM object or *remote* is ``False``.
    * *targetHostID* - specifies the :class:`HostID` of the host where the
      remote task should be started. ``None`` corresponds to any host.
    * *block* - ``True`` if a remote spawn should block until a slot is
      free. Has no effect for local spawns. A nonblocking remote spawn
      returns tid=-1 on failure.
    * *enqueue* - ``True`` if a remote spawn should be queued if no slot
      is available. Setting *block* to ``True`` implies *enqueue* is also
      ``True`` regardless of the value passed as *enqueue*.
    * *extendedReturnValue* - when set to ``True`` the return value is a
      tuple of the form ``(tid, taskID, hostID)``. For local tasks
      ``taskID`` and ``hostID`` are always ``None``. For remote tasks they
      contain the :class:`TaskID` and :class:`HostID` objects
      corresponding to the remote task. If a task is enqueued for remote
      execution with a nonblocking spawn ``taskID`` is ``None`` while
      ``hostID`` is equal to *targetHostID*.

    Returns the tid of a spawned task. A nonblocking spawn that was only
    enqueued has no task yet, so its tid is reported as ``None``.
    """

    # The default list/dict are only stored and passed on by this class,
    # never modified in place here.
    def __init__(self, func, args=[], kwargs={}, remote=False, targetHostID=None, block=True, enqueue=True, extendedReturnValue=False):
        self.func=func
        self.remote=remote
        self.targetHostID=targetHostID
        self.block=block
        # A blocking spawn is always enqueued when no slot is available
        self.enqueue=enqueue or block is True
        self.args=args
        self.kwargs=kwargs
        self.extendedReturnValue=extendedReturnValue

    def _reply(self, sched, task, tid, taskID=None, hostID=None):
        """
        Store the return value of the system call in the spawner *task*
        and put the spawner back in the scheduler's ready queue.
        """
        if self.extendedReturnValue:
            task.sendval=tid, taskID, hostID
        else:
            task.sendval=tid
        sched.schedule(task)

    def handle(self, sched, task):
        """
        Handle the spawn request issued by *task* on scheduler *sched*.

        The spawner is rescheduled with the return value of the system
        call in all cases except a blocking remote spawn that finds no
        free slot. In that case the call is enqueued in the scheduler and
        the spawner is not rescheduled by this call.
        """
        # Check if we can hope to spawn remotely, if not now then at
        # least sometime in the future
        remotePossible=(
            self.remote and                                # Remote spawn requested
            sched.vm is not None and                       # Have a VM
            sched.vm.freeSlots()+len(sched.remoteTasks)>0  # Have at least one slot
        )

        if not remotePossible:
            # Local spawn
            newTask=sched.new(
                parent=task, func=self.func, remote=False,
                args=self.args, kwargs=self.kwargs
            )
            self._reply(sched, task, newTask.tid)
            return

        # Remote spawn
        newTask=sched.new(
            parent=task, func=self.func, remote=True,
            targetHostID=self.targetHostID,
            args=self.args, kwargs=self.kwargs
        )

        if newTask is not None:
            # Success, return tid
            self._reply(sched, task, newTask.tid, newTask.remoteTaskID, newTask.remoteHostID)
            return

        # Failed to spawn, no slots for a remote task
        if not self.enqueue:
            # Do not enqueue, report failure
            self._reply(sched, task, -1)
            return

        # Enqueue this spawn, it is handled again when a slot is freed
        sched.enqueueSpawn(self, task, self.targetHostID)

        if not self.block:
            # Nonblocking enqueued spawn. No task object exists at this
            # point, so there is no tid to report.
            # NOTE(review): when the queued call is handled later, the
            # spawner is replied to (and scheduled) a second time - confirm
            # that callers of nonblocking enqueued spawns expect this.
            self._reply(sched, task, None, None, self.targetHostID)
        # Blocking spawn: the spawner is not rescheduled here
# Yield control to the scheduler
class Yield(SysCall):
    """
    A system call for yielding the control to the scheduler.

    The scheduler switches the context to the next scheduled microthread.
    """

    def handle(self, sched, task):
        """
        Put the yielding *task* back in the ready queue of *sched*.
        """
        sched.schedule(task)
# Join a child task
class Join(SysCall):
    """
    A system call for joining a child task. The oldest child is joined
    (the one with the lowest tid). Return the task's tid and its return
    value.

    * *tidList* - list of task IDs that we are waiting on.
      Empty list waits on all child tasks.
    * *block* - ``True`` if the call should block until a task can be joined

    Returns a dictionary with tid for key holding the return value of the
    task that was joined. The dictionary has a single entry.

    Returns an empty dictionary if there are no children.

    Failed nonblocking join returns an empty dictionary.
    """

    def __init__(self, tidList=[], block=True):
        self.block=block
        self.tidList=tidList

    def handle(self, sched, task):
        """
        Handle the join request issued by *task* on scheduler *sched*.

        If a finished child is available it is removed from the finished
        children of *task* and the task is rescheduled with the result.
        Otherwise a nonblocking call reschedules the task with an empty
        dictionary, while a blocking call marks the task as waiting.
        """
        # The syscall returns an empty dictionary unless a child is joined
        task.sendval={}

        # Finished child with the lowest tid found so far
        oldest=None

        if len(self.tidList)>0:
            # Verify if listed children exist and if they are really
            # children of this task
            for tid in self.tidList:
                # Waiting on tid 0 (main task) is not allowed
                if tid==0:
                    task.localThrow(Exception, "COS: Waiting on task 0 (main) is not allowed.")

                if tid in sched.tasks:
                    # Child is running
                    child=sched.tasks[tid]
                elif tid in task.finishedChildren:
                    # Child is finished, remember it if it is the oldest
                    child=task.finishedChildren[tid]
                    if oldest is None or oldest.tid>child.tid:
                        oldest=child
                else:
                    task.localThrow(Exception, "COS: Child %d does not exist." % tid)

                # NOTE(review): if the localThrow() above returns, child is
                # unbound (or left over from the previous tid) at this point
                # - confirm the intended behaviour.
                if child.ptid!=task.tid:
                    task.localThrow(
                        Exception,
                        "COS: Task %d is not a child of %d." % (tid, task.tid)
                    )
        else:
            # Empty tid list, any child will do
            if task.nchildren+len(task.finishedChildren)==0:
                # No children at all, resume the task with an empty result
                sched.schedule(task)
                return

            if len(task.finishedChildren)>0:
                # A finished child exists, take the oldest one
                lowestTid=min(task.finishedChildren.keys())
                oldest=task.finishedChildren[lowestTid]

        if oldest is None:
            if self.block:
                # Nothing to join yet, stop the task until a child finishes
                task.waitingOn=self.tidList
                task.status=Task.Swaiting
            else:
                # Nonblocking call, resume the task with an empty result
                sched.schedule(task)
            return

        # Finished child found, remove it and hand its result to the task
        del task.finishedChildren[oldest.tid]
        task.sendval[oldest.tid]=oldest.retval
        sched.schedule(task)
# Task wrapper
class Task(object):
    """
    Task wrapper object. Wraps one microthread or one remote task.

    Arguments:

    * *parent* - parent task. Should be ``None`` for the main task.
    * *greenlet* - greenlet of a local task
    * *remoteTaskID* - :class:`TaskID` object of a remote task
    * *remoteHostID* - :class:`HostID` object of a remote task
    * *name* - task name. If ``None`` a name is generated
      from *func*, *greenlet*, or *remoteTaskID* (in this order).
    * *func* - function defining the task
    * *args* - positional arguments to the greenlet passed at startup
    * *kwargs* - keyword arguments to the greenlet passed at startup

    If *args* and *kwargs* are ``None`` the greenlet is assumed to be
    already running.

    Members:

    * ``tid`` - task id
    * ``ptid`` - parent task id
    * ``remoteTaskID`` - :class:`TaskID` object of a remote task
    * ``remoteHostID`` - :class:`HostID` object of a remote task
    * ``nchildren`` - number of children
    * ``finishedChildren`` - dictionary of finished children waiting to
      be joined. The key is the tid of a child.
    * ``sendval`` - return value passed to the task at switch
    * ``retval`` - value returned by the finished task
    * ``waitingOn`` - list of child task ids the task is waiting on
    * ``status`` - 0 (created), 1 (running), 2 (waiting), 3 (finished)
    * ``scheduled`` - number of times a local task is enqueued for
      execution. Always 0 for remote tasks.
    """

    # Task status codes
    Screated=0
    Srunning=1
    Swaiting=2
    Sfinished=3

    # Next task id to hand out
    # 0 is the scheduler, 1 is the main task (created at scheduler startup)
    taskid=1

    def __init__(self, parent, greenlet=None, remoteTaskID=None, remoteHostID=None, name=None, func=None, args=[], kwargs={}):
        if greenlet is None:
            # Remote task, assume it is already running
            self.greenlet=None
            self.remoteTaskID=remoteTaskID
            self.remoteHostID=remoteHostID
            self.status=Task.Srunning
        else:
            # Local task, remote identifiers do not apply
            self.greenlet=greenlet
            self.remoteTaskID=None
            self.remoteHostID=None
            # No startup arguments means the greenlet is already running
            alreadyRunning=args is None and kwargs is None
            self.status=Task.Srunning if alreadyRunning else Task.Screated

        self.func=func
        self.args=args
        self.kwargs=kwargs

        # Pick the task name: explicit name first, then func, then
        # greenlet, then the remote task id
        if name is not None:
            label=name
        elif func is not None:
            label=str(func)
        elif greenlet is not None:
            label=str(greenlet)
        else:
            label=str(remoteTaskID)
        self.name=label

        # Take the next free task id
        self.tid=Task.taskid
        Task.taskid+=1

        # Register with the parent
        if parent is None:
            self.ptid=None
        else:
            self.ptid=parent.tid
            parent.nchildren+=1

        self.scheduled=0
        self.nchildren=0
        self.finishedChildren={}
        self.sendval=None
        self.retval=None
        self.waitingOn=None

    def switchToLocal(self):
        """
        Switch control to the local task (microthread) represented by
        this object.

        Returns the value obtained when control is switched back.
        Raises :class:`PyOpusError` for a remote task.
        """
        if self.greenlet is None:
            raise PyOpusError("COS: Cannot switch to remote task tid=%d." % self.tid)

        startingUp=self.status==Task.Screated
        if startingUp:
            # Task startup, send the arguments
            self.status=Task.Srunning
            received=self.greenlet.switch(*self.args, **self.kwargs)
        else:
            # Task is already running, send the pending value
            received=self.greenlet.switch(self.sendval)

        # Check if the task is finished
        if self.greenlet.dead:
            self.status=Task.Sfinished

        # Reset value that will be sent at next switch
        self.sendval=None

        return received

    def localThrow(self, exception, value):
        """
        Throws an *exception* with value *value* in the local task
        represented by this object.

        Raises :class:`PyOpusError` for a remote task.
        """
        if self.greenlet is None:
            raise PyOpusError("COS: Cannot switch to a remote task.")

        self.greenlet.throw(exception, value)
# Scheduler
class Scheduler(object):
    """
    Cooperative multitasking scheduler based on greenlets.

    * *vm* - virtual machine abstraction for spawning remote tasks.
    * *debug* - debug message level. 0 turns off debugging.

    The main loop of the scheduler is entered by calling the scheduler
    object.
    """

    def __init__(self, vm=None, debug=0):
        # Task ID of scheduler
        self.tid=0

        # The scheduler is the active task
        self.activeTask=self.tid

        # Local task queue
        self.ready=[]

        # All tasks that are running (i.e. not finished), tid for key
        self.tasks={}

        # Remote tasks that are not finished, remote task ID for key
        self.remoteTasks={}

        # A queue for spawn system calls waiting on a free remote slot
        self.waitingOnSlot=[]

        # Queues for spawn system calls waiting on a free remote slot
        # on a given HostID, HostID for key
        self.waitingOnSlotOnHost={}

        # Set VM (must come after remoteTasks is created)
        self.setVM(vm)

        # Debug level
        self.debug=debug

    def setDebug(self, debug=0):
        """
        Set the debug message level to *debug*.

        Setting this value to 0 turns off debugging.
        """
        self.debug=debug

    def countTasks(self):
        """
        Returns the number of tasks (local and remote) that are not
        finished. The main task is counted once the scheduler is started.
        """
        return len(self.tasks)

    def countRemoteTasks(self):
        """
        Returns the number of running remote tasks.
        """
        return len(self.remoteTasks)

    def countLocalTasks(self):
        """
        Returns the number of running local tasks, i.e. the number of
        running tasks minus the number of running remote tasks.
        """
        return len(self.tasks)-len(self.remoteTasks)

    def setVM(self, vm):
        """
        Sets the VM abstraction object used for spawning remote tasks.

        Allowed only when no remote tasks are running. Raises
        :class:`PyOpusError` otherwise.

        The VM object is ignored (remote spawning is disabled) when
        *vm* reports a valid parent task, i.e. this process is a slave,
        or when *vm* has less than 2 slots.
        """
        if len(self.remoteTasks)>0:
            raise PyOpusError("COS: Remote tasks running. Cannot replace VM.")

        if vm is not None and vm.parentTaskID().valid():
            # Valid parent task, this is a slave
            # Do not allow spawning remote tasks
            self.vm=None
        elif vm is not None and vm.slots()<2:
            # Need at least 2 slots for remote spawning
            # (1 for master and 1 for worker)
            self.vm=None
        else:
            self.vm=vm

    def new(self, parent, func, args=[], kwargs={}, remote=False, targetHostID=None):
        """
        Create a new task.

        Arguments:

        * *parent* - parent task object
        * *func* - function defining the task
        * *args* - positional arguments to the task's function
        * *kwargs* - keyword arguments to the task's function
        * *remote* - ``True`` for a remote task
        * *targetHostID* - :class:`HostID` object corresponding to the
          host where a remote task should be started.
          ``None`` corresponds to any host

        A local task is created (and scheduled) if *remote* is ``False``
        or there is no VM object.

        Returns a :class:`Task` object or ``None`` on failure (no free
        slot, or the VM did not spawn a task).
        """
        if not remote or self.vm is None:
            # Create a greenlet and schedule the local task
            g=greenlet(func)
            task=Task(parent, greenlet=g, func=func, args=args, kwargs=kwargs)
            self.tasks[task.tid]=task
            self.schedule(task)
            return task

        # Spawn a remote task
        if self.vm.freeSlots()<=0:
            return None

        taskIDs=self.vm.spawnFunction(
            func, args=args, kwargs=kwargs, count=1,
            targetList=[targetHostID] if targetHostID is not None else None,
            sendBack=True
        )
        if len(taskIDs)<=0:
            return None

        remoteTaskID=taskIDs[0]
        remoteHostID=self.vm.hostID(remoteTaskID)
        task=Task(
            parent, remoteTaskID=remoteTaskID, remoteHostID=remoteHostID,
            func=func, args=args, kwargs=kwargs
        )
        self.tasks[task.tid]=task
        self.remoteTasks[remoteTaskID]=task
        # Do not schedule a remote task

        return task

    def schedule(self, task):
        """
        Schedules a local *task* for execution.

        Does nothing if the task is already in the ready queue. Raises
        :class:`PyOpusError` for a remote task and for a task that is
        neither created nor running.
        """
        if task.greenlet is None:
            raise PyOpusError("Trying to schedule remote task tid=%d." % task.tid)

        if task.status not in (Task.Srunning, Task.Screated):
            raise PyOpusError("Trying to schedule task tid=%d that is not running." % task.tid)

        # Avoid scheduling a task twice
        if task.scheduled>0:
            return

        self.ready.append(task)
        task.scheduled+=1

        if self.debug>0:
            DbgMsgOut("COS", "Task tid=%d scheduled." % (task.tid))

    def enqueueSpawn(self, spawnSyscall, spawnerTask, targetHostID=None):
        """
        Enqueues a spawn system call waiting on a free slot.

        If *targetHostID* is specified a spawn system call is enqueued
        for handling when a slot is available on host given by
        *targetHostID* which is an object of class :class:`HostID`.
        """
        entry=(spawnSyscall, spawnerTask)
        if targetHostID is None:
            self.waitingOnSlot.append(entry)
        else:
            if targetHostID not in self.waitingOnSlotOnHost:
                self.waitingOnSlotOnHost[targetHostID]=[]
            self.waitingOnSlotOnHost[targetHostID].append(entry)

    def _retireTask(self, task):
        """
        Removes the finished *task* from the running tasks and passes it
        to its parent: a parent waiting on it in a Join is rescheduled
        with the result, otherwise the task is stored among the parent's
        finished children.
        """
        # Get parent
        parent=self.tasks[task.ptid]

        # Remove task from the list of running tasks
        del self.tasks[task.tid]

        # Update children count of parent task
        parent.nchildren-=1

        # Is the parent waiting on the task (Join system call)?
        # An empty waitingOn list means waiting on any child.
        if (
            parent.status==Task.Swaiting and
            (len(parent.waitingOn)==0 or task.tid in parent.waitingOn)
        ):
            if self.debug>0:
                DbgMsgOut("COS", "Parent task tid=%d of task tid=%d waiting on results (Join)." % (parent.tid, task.tid))

            # Set return value for parent
            parent.sendval={ task.tid: task.retval }

            # Schedule parent
            parent.status=Task.Srunning
            self.schedule(parent)
        else:
            # Parent is not waiting on the task, add to parent's
            # finished children dictionary
            parent.finishedChildren[task.tid]=task

    def _serveWaitingSpawn(self):
        """
        Handles at most one enqueued spawn system call after a remote
        slot was freed. Calls waiting on a specific host that has a free
        slot take precedence over calls waiting on any host. Both kinds
        of queues are served oldest entry first.
        """
        # Assume we have no spawn call waiting
        spawnSyscall=None

        # Get spawn calls waiting on a specific host
        needHosts=set(self.waitingOnSlotOnHost.keys())
        if len(needHosts)>0:
            # Get hosts with free slots
            freeHosts=self.vm.hostsWithFreeSlots()

            # Find first match
            candidateHosts=freeHosts.intersection(needHosts)
            if len(candidateHosts)>0:
                candidateHost=candidateHosts.pop()
                # Oldest request first, same as for the any-host queue
                spawnSyscall, spawnerTask = self.waitingOnSlotOnHost[candidateHost].pop(0)
                if len(self.waitingOnSlotOnHost[candidateHost])<=0:
                    del self.waitingOnSlotOnHost[candidateHost]

        # No syscall yet? Look in syscalls waiting on any host
        if spawnSyscall is None and len(self.waitingOnSlot)>0:
            spawnSyscall, spawnerTask = self.waitingOnSlot.pop(0)

        # Do we have a syscall now?
        if spawnSyscall is None:
            return

        # Handle it only if the spawner task is still running
        if spawnerTask.tid in self.tasks:
            if self.debug>0:
                DbgMsgOut("COS", "Task tid=%d is waiting on a free slot. Handling Spawn." % (spawnerTask.tid))
            spawnSyscall.handle(self, spawnerTask)

    def _handleRemoteMessage(self, srcID, msg):
        """
        Processes message *msg* received from remote task *srcID*.

        A result message stores the return value of the remote task. A
        task exit message retires the remote task (if it is known) and
        then serves one enqueued spawn system call. Other messages are
        ignored.
        """
        if type(msg) is MsgTaskResult and srcID in self.remoteTasks:
            # Result message
            # Find task, set its return value
            remoteTask=self.remoteTasks[srcID]
            remoteTask.retval=msg.returnValue

            if self.debug>0:
                DbgMsgOut("COS", "Result message received from %s on %s, tid=%d." % (str(srcID), str(self.vm.hostID(srcID)), remoteTask.tid))
        elif type(msg) is MsgTaskExit:
            # Task exit message
            if srcID in self.remoteTasks:
                # Get remote task and remove it from list of remote tasks
                remoteTask=self.remoteTasks.pop(srcID)

                # Mark it as finished
                remoteTask.status=Task.Sfinished

                if self.debug>0:
                    DbgMsgOut("COS", "Task exit message received from %s on %s, tid=%d, task finished." % (str(srcID), str(self.vm.hostID(srcID)), remoteTask.tid))

                self._retireTask(remoteTask)

            # We have a free slot
            self._serveWaitingSpawn()

    def _receiveRemoteMessages(self):
        """
        Receives and handles messages from remote tasks. Does nothing
        if there is no VM object.

        The receive blocks only while no local task is ready. The
        method returns when a nonblocking receive yields an empty tuple
        (no more pending messages).
        """
        while self.vm is not None:
            if len(self.ready)>0:
                # There are tasks scheduled locally, do not block
                if self.debug>1:
                    DbgMsgOut("COS", "Nonblocking receive in scheduler.")
                recv=self.vm.receiveMessage(0)
            else:
                # Block because there are no local tasks scheduled
                if self.debug>1:
                    DbgMsgOut("COS", "Blocking receive in scheduler.")
                recv=self.vm.receiveMessage(-1)

            if recv is None:
                continue

            if len(recv)==2:
                # Valid message, handle it
                (srcID, msg)=recv
                self._handleRemoteMessage(srcID, msg)
            elif len(recv)==0:
                # Empty tuple received (timeout in nonblocking receive)
                # No more pending messages
                if self.debug>1:
                    DbgMsgOut("COS", "No more pending messages.")
                break

    def _runNextLocalTask(self):
        """
        Switches to the first task in the ready queue and processes
        what it hands back: a finished task is retired, a system call
        is handled.
        """
        # Get next local task
        localTask=self.ready.pop(0)
        localTask.scheduled-=1

        if localTask.status not in (Task.Srunning, Task.Screated):
            raise PyOpusError(DbgMsg("COS", "A local task that is not running was scheduled."))

        # Switch to local task
        identify.tid=localTask.tid
        self.activeTask=localTask.tid
        retval=localTask.switchToLocal()
        self.activeTask=self.tid
        identify.tid=self.tid

        if localTask.status==Task.Sfinished:
            # Task is finished, store return value
            localTask.retval=retval

            if self.debug>0:
                DbgMsgOut("COS", "Local task tid=%d finished." % (localTask.tid))

            self._retireTask(localTask)
        elif retval is not None and isinstance(retval, SysCall):
            # Local task performed a system call, handle it
            if self.debug>0:
                DbgMsgOut("COS", "Handling %s system call from task tid=%d" % (str(type(retval)), localTask.tid))
            retval.handle(self, localTask)
            # Do not schedule the task, leave that to the syscall handler

    def __call__(self):
        """
        Main loop of the scheduler. Must run in its own greenlet; the
        parent greenlet becomes the main task. Raises
        :class:`PyOpusError` if there is no parent greenlet.

        The loop alternates between handling messages from remote tasks
        and running one ready local task. It exits when no tasks are left.
        """
        # Enqueue main task (receives tid=1 if it is the first task created)
        mainTaskGreenlet=greenlet.getcurrent().parent
        if not mainTaskGreenlet:
            raise PyOpusError("COS: Scheduler must run in a separate greenlet.")

        # args=None and kwargs=None mark the main task as already running
        mainTask=Task(parent=None, greenlet=mainTaskGreenlet, name="_main", args=None, kwargs=None)
        self.tasks[mainTask.tid]=mainTask
        self.schedule(mainTask)

        # Main loop, exit when there are no tasks left
        while self.tasks:
            # Handle messages from remote tasks
            self._receiveRemoteMessages()

            # Handle local tasks
            if len(self.ready)>0:
                self._runNextLocalTask()

# Cooperative OS wrapper
class OpusOS(object):
    """
    Cooperative multitasking OS class.

    The user should import the only instance of this class represented
    by the :data:`cOS` variable.

    *vm* - virtual machine abstraction for spawning remote tasks.

    If *vm* is ``None`` remote spawning is disabled.
    """

    # The one scheduler shared by all objects of this class
    scheduler=None
    # Debug message level
    debug=0

    def __init__(self, vm=None):
        self._createScheduler(vm)

    def setVM(self, vm=None):
        """
        Sets the virtual machine object. Allowed only when there are no
        remote tasks running.

        This is not a system call and does not yield execution to the
        scheduler.
        """
        OpusOS.scheduler.setVM(vm)

    @staticmethod
    def setDebug(debug=0):
        """
        Set the debug message level to *debug*.

        Setting this value to 0 turns off debugging.
        """
        OpusOS.debug=debug
        OpusOS.scheduler.setDebug(debug)

    def _createScheduler(self, vm):
        """
        Creates the scheduler and its greenlet, then switches to the
        scheduler to start it. Raises :class:`PyOpusError` if a
        scheduler already exists.
        """
        if OpusOS.scheduler is not None:
            raise PyOpusError("COS: There can be only one OpusOS object.")

        # Create scheduler
        OpusOS.scheduler=Scheduler(vm)

        # Create scheduler greenlet
        OpusOS.schedulerGreenlet=greenlet(OpusOS.scheduler)

        # Switch to scheduler to start it
        OpusOS.schedulerGreenlet.switch()

    # For pickling at remote spawn
    def __getstate__(self):
        # Pack the state and the vm object
        state=self.__dict__.copy()
        return state, OpusOS.scheduler.vm

    # For unpickling at remote spawn
    def __setstate__(self, stateIn):
        # Unpack the state and the vm object
        state, vm = stateIn

        # Set state
        self.__dict__.update(state)

        # Create scheduler if this is the first object of this type
        # in this process
        if OpusOS.scheduler is None:
            self._createScheduler(vm)

    # Functions

    @staticmethod
    def freeSlots():
        """
        Returns the number of free slots in a vm. If there is no vm,
        returns -1.

        A slot is used as soon as a task is spawned. A slot is freed as
        soon as a task is finished. The task does not have to be joined
        to free a slot.

        This is not a system call and does not yield execution to the
        scheduler.
        """
        if OpusOS.scheduler.vm is not None:
            return OpusOS.scheduler.vm.freeSlots()

        return -1

    @staticmethod
    def finishedTasks():
        """
        Returns the number of children of the active task that are
        finished but not joined. If there is no vm, returns -1.

        This is not a system call and does not yield execution to the
        scheduler.
        """
        if OpusOS.scheduler.vm is not None:
            tid=OpusOS.scheduler.activeTask
            return len(OpusOS.scheduler.tasks[tid].finishedChildren)

        return -1

    @staticmethod
    def slots():
        """
        Returns the number of slots in a vm. If there is no vm,
        returns -1.

        This is not a system call and does not yield execution to the
        scheduler.
        """
        if OpusOS.scheduler.vm is not None:
            return OpusOS.scheduler.vm.slots()

        return -1

    @staticmethod
    def getTid():
        """
        Returns the task id of the running microthread.

        This is not a system call and does not yield execution to the
        scheduler.
        """
        return OpusOS.scheduler.activeTask

    @staticmethod
    def toAbstractPath(p):
        """
        Translates path *p* to abstract path that can be sent to a
        remote process and decoded there. Takes into account parallel
        mirrored storage.

        An abstract path comprises a parallel mirrored storage entry
        index and a relative path suffix.

        See :func:`pyopus.parallel.base.translateToAbstractPath` for
        more information.
        """
        return translateToAbstractPath(p)

    @staticmethod
    def toActualPath(abstractPath):
        """
        Translates *abstractPath* comprising a parallel mirrored storage
        entry index and a relative path suffix to actual local path.

        If entry index is ``None`` no translation takes place and the
        relative path suffix is returned.

        See :func:`pyopus.parallel.base.translateToActualPath` for more
        information.
        """
        index, relPath = abstractPath
        if index is None:
            return relPath

        return translateToActualPath(index, relPath)

    @staticmethod
    def spawnerCall(function, args=(), kwargs={}, asynchr=False):
        """
        Calls *function* with *args* and *kwargs* in the spawner task.
        If no VM object is set or this is the spawner task the
        *function* is called locally.

        If *asynchr* is ``False`` waits for the remote call to finish
        and returns its return value.
        """
        vm=OpusOS.scheduler.vm
        if vm is None:
            # No vm, call locally
            return function(*args, **kwargs)

        return vm.spawnerCall(function, args, kwargs, asynchr)

    # System calls. These functions yield execution to the scheduler.

    @staticmethod
    def Yield():
        """
        Invokes the *Yield* system call.

        See :class:`Yield`.
        """
        return OpusOS.schedulerGreenlet.switch(Yield())

    @staticmethod
    def Spawn(*args, **kwargs):
        """
        Invokes the *Spawn* system call.

        See :class:`Spawn`.
        """
        return OpusOS.schedulerGreenlet.switch(Spawn(*args, **kwargs))

    @staticmethod
    def Join(*args, **kwargs):
        """
        Invokes the *Join* system call.

        See :class:`Join`.
        """
        return OpusOS.schedulerGreenlet.switch(Join(*args, **kwargs))

    # Asynchronous dispatch

    @staticmethod
    def dispatch(jobList, collector=None, remote=True, buildResultList=None, collectHostIDs=False):
        """
        Dispatches multiple jobs and collects the results. If *remote*
        is ``True`` the jobs are dispatched asynchronously across the
        available computing nodes.

        A job is a tuple of the form ``(callable, args, kwargs, extra)``.
        A job is evaluated by invoking the callable with given ``args``
        and ``kwargs``. If ``kwargs`` is omitted only positional
        arguments are passed. If ``args`` is also omitted the
        ``callable`` is invoked without arguments. The return value of
        the ``callable`` is the job result. Extra data (``extra``) can
        be stored in the optional entries after ``kwargs``. This data
        is not passed to the callable.

        *jobList* is an iterable (e.g. a list) that holds the jobs. It
        can also be an iterator or a generator that produces jobs. A job
        may not be ``None``.

        *collector* is an optional unprimed coroutine. When a job is
        finished its result is sent to the *collector* in the form of a
        tuple ``(index, job, result)`` where ``index`` is the index of
        the ``job``. Collector's task is to collect the results in a
        user-defined data structure.

        If *collectHostIDs* is ``True`` the collector receives tuples
        of the form ``(index, job, result, hostID)`` where ``hostID`` is
        an object of the class :class:`HostID` that corresponds to the
        host where the job was run. For jobs that were not spawned as
        remote jobs ``hostID`` is ``None``.

        By catching the ``GeneratorExit`` exception in the *collector*
        a postprocessing step can be performed on the collected results.

        Returns the list of results (in the order of generated jobs).
        When a *collector* is specified the results are not collected
        in a list unless *buildResultList* is set to ``True``. In that
        case ``None`` is returned.
        """
        # Prime the collector
        if collector is not None:
            next(collector)

        # Collect results in a list?
        keepResults=collector is None or buildResultList is True
        if keepResults:
            resList=[]

        # Accept plain iterables (lists, tuples) as well as iterators
        # and generators
        jobIter=iter(jobList)

        # Spawned jobs that are not joined yet, tid for key
        jobs={}
        hostIDs={}

        ii=0
        while True:
            # Get next job. Only next() is guarded so that a
            # StopIteration raised elsewhere is not taken for the
            # end of jobs.
            try:
                job=next(jobIter)
            except StopIteration:
                job=None
            else:
                # Get parts
                f=job[0]
                args=job[1] if len(job)>1 else []
                kwargs=job[2] if len(job)>2 else {}

                # Spawn
                tid, taskID, hostID = OpusOS.Spawn(
                    f, args=args, kwargs=kwargs,
                    remote=remote, block=True, extendedReturnValue=True
                )
                hostIDs[tid]=hostID
                jobs[tid]=ii,job
                if OpusOS.debug>0:
                    DbgMsgOut("COS", "Spawned task tid=%d, running tasks: %s" % (tid, str(jobs.keys())))
                ii+=1

            # Join a job, block if one of the following holds
            # - no more jobs to spawn
            # - there are less than 2 slots in the VM
            # - there are no free slots (with joined tasks) in the VM
            block=(
                (job is None) or
                (OpusOS.slots()<2) or
                (OpusOS.freeSlots()-OpusOS.finishedTasks()<=0)
            )
            jr=OpusOS.Join(block=block)
            for tid, retval in jr.items():
                # Extract job and release its bookkeeping entries
                jj,jjob=jobs.pop(tid)
                hostID=hostIDs.pop(tid)
                if OpusOS.debug>0:
                    DbgMsgOut("COS", "Joined task tid=%d, running tasks: %s" % (tid, str(jobs.keys())))

                # Send result to the collector
                if collector is not None:
                    if collectHostIDs:
                        collector.send((jj, jjob, retval, hostID))
                    else:
                        collector.send((jj, jjob, retval))

                # Collect results in a list
                if keepResults:
                    # Make space, jobs can finish out of order
                    if len(resList)<=jj:
                        resList.extend([None]*(jj-len(resList)+1))
                    # Store
                    resList[jj]=retval

            # Dispatched all jobs and nothing left to join
            if not jr and job is None:
                break

        # Shut down the collector
        if collector is not None:
            collector.close()

        # Return result list
        if keepResults:
            return resList

        return None

    @staticmethod
    def dispatchSingle(function, args=[], kwargs={}, remote=True):
        """
        Dispatches a single task defined by *function*, *args*, and
        *kwargs*. If *remote* is ``True`` the task is dispatched to a
        remote computing node.

        This function is used for moving a task to a remote processor
        and waits for the results. It is not very useful in the sense
        that it does not introduce any parallelism.

        Returns the return value of the function.
        """
        tid=OpusOS.Spawn(function, args, kwargs, remote=remote, block=True)
        # Join the spawned task explicitly. Joining any child could
        # return an older finished child of the calling task instead.
        return OpusOS.Join(tidList=[tid])[tid]

    @staticmethod
    def finalize():
        """
        Performs cleanup. Calls the :meth:`finalize` method of the vm.
        """
        if OpusOS.scheduler.vm is not None:
            OpusOS.scheduler.vm.finalize()
# OS object cOS=OpusOS() "Cooperative multitasking OS instance for accessing its functionality."