3.3. pyopus.parallel.cooperative: Cooperative multitasking OS with task outsourcing
Cooperative multitasking OS with task outsourcing (PyOPUS subsystem name: COS)
This module is based on the greenlet module. Concurrent tasks can be created
in a UNIX-like fashion (with the Spawn() method). The return value of a task
or multiple tasks is collected with the Join() method. Joins can be blocking
or nonblocking.
The cooperative multitasking OS takes care of outsourcing the tasks to computing
nodes if it is permitted to do this and there are computing nodes available.
Outsourcing uses the virtual machine specified by calling the setVM() method.
If no virtual machine is specified, outsourcing is not possible.
COS makes it possible to implement multilevel parallelism and asynchronous
algorithms in a simple manner. Parallel algorithms can be run on a single CPU
by taking advantage of the greenlet module for providing the microthread
functionality. Every local task is a microthread that runs concurrently with
other microthreads. The microthreads are cooperatively scheduled by COS.
Of course such a run is slower than a real parallel run involving multiple
tasks across multiple processors.
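The cooperative scheduling described above can be illustrated with a small sketch. It is not the COS implementation: it uses plain Python generators from the standard library as a stand-in for greenlets, and the names MiniScheduler, spawn, run, and worker are hypothetical. Each task gives up control explicitly, and the scheduler resumes the tasks in round-robin order until all of them finish.

```python
from collections import deque

class MiniScheduler:
    """Toy round-robin scheduler (hypothetical stand-in, not COS)."""
    def __init__(self):
        self.ready = deque()   # (tid, generator) pairs waiting to run
        self.results = {}      # tid -> return value of a finished task
        self.nextTid = 0

    def spawn(self, func, args=()):
        # Create the task, but do not run it yet
        tid = self.nextTid
        self.nextTid += 1
        self.ready.append((tid, func(*args)))
        return tid

    def run(self):
        # Resume each task in turn until all of them have finished
        while self.ready:
            tid, task = self.ready.popleft()
            try:
                next(task)                       # run until the task yields
                self.ready.append((tid, task))   # still alive, reschedule it
            except StopIteration as stop:
                self.results[tid] = stop.value   # task finished
        return self.results

trace = []

def worker(name, steps):
    total = 0
    for i in range(steps):
        total += i
        trace.append((name, i))
        yield   # cooperative switch back to the scheduler
    return total

sched = MiniScheduler()
sched.spawn(worker, ("a", 2))
sched.spawn(worker, ("b", 3))
results = sched.run()
```

The trace list shows the two workers interleaving step by step on a single CPU, which is the behaviour the paragraph above attributes to local microthreads.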
- class pyopus.parallel.cooperative.Join(tidList=[], block=True)
A system call for joining a child task. The oldest child is joined (the one with the lowest tid). Returns the task’s tid and its return value.
tidList - list of task IDs that we are waiting on. An empty list waits on all child tasks.
block - True if the call should block until a task can be joined
Returns a dictionary with tid for key holding the return value of the task that was joined. The dictionary has a single entry.
Returns an empty dictionary if there are no children.
A failed nonblocking join returns an empty dictionary.
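The return convention of a nonblocking join can be mimicked in a few lines. The following is only a sketch of the rules stated above (lowest tid first, single-entry dictionary, empty dictionary on failure). The function nonblockingJoin and the sample data are hypothetical and do not call into PyOPUS.

```python
def nonblockingJoin(finishedChildren, tidList=()):
    # Candidates: every finished child, or only those named in tidList
    candidates = [tid for tid in finishedChildren
                  if not tidList or tid in tidList]
    if not candidates:
        return {}                  # nothing to join: empty dictionary
    tid = min(candidates)          # the oldest child has the lowest tid
    return {tid: finishedChildren.pop(tid)}

# Finished children waiting to be joined, keyed by tid
finished = {7: "c", 3: "a", 5: "b"}

first = nonblockingJoin(finished)         # joins the oldest child
second = nonblockingJoin(finished, [7])   # waits only on tid 7
third = nonblockingJoin(finished)         # joins the remaining child
fourth = nonblockingJoin(finished)        # no children left
```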
- class pyopus.parallel.cooperative.OpusOS(vm=None)
Cooperative multitasking OS class.
The user should import the only instance of this class represented by the cOS variable.
vm - virtual machine abstraction for spawning remote tasks.
If vm is None remote spawning is disabled.
- static dispatch(jobList, collector=None, remote=True, buildResultList=None, collectHostIDs=False)
Dispatches multiple jobs and collects the results. If remote is True the jobs are dispatched asynchronously across the available computing nodes.
A job is a tuple of the form (callable, args, kwargs, extra). A job is evaluated by invoking the callable with the given args and kwargs. If kwargs is omitted only positional arguments are passed. If args is also omitted the callable is invoked without arguments. The return value of the callable is the job result. Extra data (extra) can be stored in the optional entries after kwargs. This data is not passed to the callable.
jobList is an iterable object (e.g. a list) that holds the jobs. It can also be a generator that produces jobs. A job may not be None.
collector is an optional unprimed coroutine. When a job is finished its result is sent to the collector in the form of a tuple (index, job, result) where index is the index of the job. Collector’s task is to collect the results in a user-defined data structure.
If collectHostIDs is True the collector receives tuples of the form (index, job, result, hostID) where hostID is an object of the class HostID that corresponds to the host where the job was run. For jobs that were not spawned as remote jobs hostID is None.
By catching the GeneratorExit exception in the collector a postprocessing step can be performed on the collected results.
Returns the list of results (in the order of generated jobs). When a collector is specified the results are not collected in a list unless buildResultList is set to True.
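The job tuple format and the collector protocol described for dispatch() can be sketched without any computing nodes. The code below is a serial stand-in and not the PyOPUS implementation. The names evaluateJob, localDispatch, and sumCollector are hypothetical. Two points are assumptions: that the dispatcher primes the collector, and that nothing is returned when a collector is given.

```python
def evaluateJob(job):
    # Unpack (callable, args, kwargs, extra...); args and kwargs are optional
    func = job[0]
    args = job[1] if len(job) > 1 else ()
    kwargs = job[2] if len(job) > 2 else {}
    return func(*args, **kwargs)       # extra entries are never passed on

def localDispatch(jobList, collector=None):
    # Hypothetical serial stand-in for dispatch(); no parallelism involved
    if collector is not None:
        next(collector)                # prime the unprimed coroutine (assumed)
    results = []
    for index, job in enumerate(jobList):
        result = evaluateJob(job)
        if collector is not None:
            collector.send((index, job, result))
        else:
            results.append(result)
    if collector is not None:
        collector.close()              # raises GeneratorExit in the collector
        return None                    # assumption: no list with a collector
    return results

def sumCollector(summary):
    collected = {}
    try:
        while True:
            index, job, result = yield
            collected[index] = result
    except GeneratorExit:
        # Postprocessing step once all results have arrived
        summary["count"] = len(collected)
        summary["total"] = sum(collected.values())

# Jobs with args only, with args, kwargs and extra data, and with no arguments
jobs = [(pow, (2, 5)), (int, ("ff",), {"base": 16}, "extra"), (list,)]
plain = localDispatch(jobs)

# A generator of jobs whose results go to a collector
summary = {}
localDispatch(((pow, (2, i)) for i in range(4)), collector=sumCollector(summary))
```

The collector receives one (index, job, result) tuple per job and performs its summary only when the coroutine is closed, which mirrors the GeneratorExit hook mentioned above.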
- static dispatchSingle(function, args=[], kwargs={}, remote=True)
Dispatches a single task defined by function, args, and kwargs. If remote is True the task is dispatched to a remote computing node.
This function moves a task to a remote processor and waits for the result. It is not very useful in the sense that it does not introduce any parallelism.
Returns the return value of the function.
- static finalize()
Performs cleanup. Calls the finalize() method of the vm.
- static finishedTasks()
Returns the number of tasks that are finished but not joined.
If there is no vm, returns -1.
This is not a system call and does not yield execution to the scheduler.
- static freeSlots()
Returns the number of free slots in a vm. If there is no vm, returns -1.
A slot is used as soon as a task is spawned. A slot is freed as soon as a task is finished. The task does not have to be joined to free a slot.
This is not a system call and does not yield execution to the scheduler.
- static getTid()
Returns the task id of the running microthread.
This is not a system call and does not yield execution to the scheduler.
- static setDebug(debug=0)
Set the debug message level to debug.
Setting this value to 0 turns off debugging.
- setVM(vm=None)
Sets the virtual machine object.
Allowed only when there are no remote tasks running.
This is not a system call and does not yield execution to the scheduler.
- static slots()
Returns the number of slots in a vm. If there is no vm, returns -1.
This is not a system call and does not yield execution to the scheduler.
- static spawnerCall(function, args=(), kwargs={}, asynchr=False)
Calls function with args and kwargs in the spawner task. If no VM object is set or this is the spawner task the function is called locally.
If asynchr is False, waits for the remote call to finish and returns its return value.
- static toAbstractPath(p)
Translates path p to an abstract path that can be sent to a remote process and decoded there. Takes into account parallel mirrored storage.
An abstract path comprises a parallel mirrored storage entry index and a relative path suffix.
See pyopus.parallel.base.translateToAbstractPath() for more information.
- static toActualPath(abstractPath)
Translates abstractPath comprising a parallel mirrored storage entry index and a relative path suffix to an actual local path.
If the entry index is None no translation takes place and the relative path suffix is returned.
See pyopus.parallel.base.translateToActualPath() for more information.
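The idea behind abstract paths can be sketched as follows. This is an illustration only, and the actual translation lives in pyopus.parallel.base and may differ in detail. The functions toAbstract and toActual, the (index, suffix) tuple layout, and the example directories are assumptions made for this sketch. Each host keeps its own list of local directories that mirror the same storage, so an index plus a relative suffix identifies the same file on every host.

```python
import posixpath

def toAbstract(path, localRoots):
    # Find the first mirrored root that contains the path
    for index, root in enumerate(localRoots):
        if path == root or path.startswith(root.rstrip("/") + "/"):
            return (index, posixpath.relpath(path, root))
    return (None, path)    # not under mirrored storage: no translation

def toActual(abstractPath, localRoots):
    index, suffix = abstractPath
    if index is None:
        return suffix      # no translation takes place
    return posixpath.join(localRoots[index], suffix)

# Hypothetical mirrored storage entries on two hosts
senderRoots = ["/home/alice/mirror"]
receiverRoots = ["/scratch/node7/mirror"]

abstract = toAbstract("/home/alice/mirror/run/input.txt", senderRoots)
actual = toActual(abstract, receiverRoots)

# A path outside mirrored storage passes through unchanged
outside = toActual(toAbstract("/tmp/x", senderRoots), receiverRoots)
```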
- class pyopus.parallel.cooperative.Scheduler(vm=None, debug=0)
Cooperative multitasking scheduler based on greenlets.
vm - virtual machine abstraction for spawning remote tasks.
The main loop of the scheduler is entered by calling the scheduler object.
- countLocalTasks()
Returns the number of running local tasks including the scheduler and the main task.
- countTasks()
Returns the number of running tasks including the scheduler and the main task.
- enqueueSpawn(spawnSyscall, spawnerTask, targetHostID=None)
Enqueues a spawn system call waiting on a free slot.
If targetHostID is specified, the spawn system call is enqueued for handling when a slot becomes available on the host given by targetHostID, which is an object of class HostID.
- new(parent, func, args=[], kwargs={}, remote=False, targetHostID=None)
Create a new task.
Arguments:
parent - parent task object
func - function defining the task
args - positional arguments to the task’s function
kwargs - keyword arguments to the task’s function
remote - True for a remote task
targetHostID - HostID object corresponding to the host where a remote task should be started. None corresponds to any host
Returns a Task object or None on failure.
- class pyopus.parallel.cooperative.Spawn(func, args=[], kwargs={}, remote=False, targetHostID=None, block=True, enqueue=True, extendedReturnValue=False)
System call for spawning a child task.
func - function defining the task
args - positional arguments to the task’s function
kwargs - keyword arguments to the task’s function
remote - True if a remote spawn is requested. Spawns a local task if the scheduler has no VM object or remote is False.
targetHostID - specifies the HostID of the host where the remote task should be started. None corresponds to any host.
block - True if a remote spawn should block until a slot is free. Has no effect for local spawns. A nonblocking remote spawn returns tid=-1 on failure.
enqueue - True if a remote spawn should be queued if no slot is available. Setting block to True implies enqueue is also True regardless of the value passed as enqueue.
extendedReturnValue - when set to True the return value is a tuple of the form (tid, taskID, hostID). For local tasks taskID and hostID are always None. For remote tasks they contain the TaskID and HostID objects corresponding to the remote task. If a task is enqueued for remote execution with a nonblocking spawn, taskID is None while hostID is equal to targetHostID.
Returns the tid of a spawned task.
- class pyopus.parallel.cooperative.Task(parent, greenlet=None, remoteTaskID=None, remoteHostID=None, name=None, func=None, args=[], kwargs={})
Task wrapper object. Wraps one microthread or one remote task.
Arguments:
parent - parent task. Should be None for the main task.
greenlet - greenlet of a local task
remoteTaskID - TaskID object of a remote task
remoteHostID - HostID object of a remote task
name - task name. If None a name is generated from greenlet or remoteTaskID
args - positional arguments to the greenlet passed at startup
kwargs - keyword arguments to the greenlet passed at startup
If args and kwargs are None the greenlet is assumed to be already running.
Members:
tid - task id
ptid - parent task id
remoteTaskID - TaskID object of a remote task
remoteHostID - HostID object of a remote task
nchildren - number of children
finishedChildren - dictionary of finished children waiting to be joined. The key is the tid of a child.
sendval - return value passed to the task at switch
retval - value returned by the finished task
waitingOn - list of child task ids the task is waiting on
status - 0 (created), 1 (running), 2 (stopped), 3 (finished)
scheduled - number of times a local task is enqueued for execution. Always 0 for remote tasks.
- class pyopus.parallel.cooperative.Yield
A system call for yielding the control to the scheduler.
The scheduler switches the context to the next scheduled microthread.
- pyopus.parallel.cooperative.cOS
Cooperative multitasking OS instance for accessing its functionality.