3.3. pyopus.parallel.cooperative — Cooperative multitasking OS with task outsourcing

Inheritance diagram of pyopus.parallel.cooperative

Cooperative multitasking OS with task outsourcing (PyOPUS subsystem name: COS)

This module is based on the greenlet module. Concurrent tasks can be created in a UNIX-like fashion (with the Spawn() method). The return value of a task or multiple tasks is collected with the Join() method. Joins can be blocking or nonblocking.

The cooperative multitasking OS takes care of outsourcing the tasks to computing nodes if it is permitted to do this and there are computing nodes available. Outsourcing uses the virtual machine specified by calling the setVM() method. If no virtual machine is specified outsourcing is not possible.

COS makes it possible to implement multilevel parallelism and asynchronous algorithms in a simple manner. Parallel algorithms can be run on a single CPU by taking advantage of the greenlet module for providing the microthread functionality. Every local task is a microthread that runs concurrently with other microthreads. The microthreads are cooperatively scheduled by COS. Of course such a run is slower than a real parallel run involving multiple tasks across multiple processors.

class pyopus.parallel.cooperative.Task(parent, greenlet=None, remoteTaskID=None, remoteHostID=None, name=None, func=None, args=[], kwargs={})

Task wrapper object. Wraps one microthread or one remote task.

Arguments:

  • parent - parent task. Should be None for teh main task.
  • greenlet - greenlet of a local task
  • remoteTaskID - TaskID object of a remote task
  • remoteHostID - HostID object of a remote task
  • name - task name. If None a name is generated. from greenlet or remoteTaskID
  • args - positional arguments to the greenlet passed at startup
  • kwargs - keyword arguments to the greenlet passed at startup

If args and kwargs are None the greenlet is assumed to be already running.

Members:

  • tid - task id
  • ptid - parent task id
  • remoteTaskID - TaskID object of a remote task
  • remoteHostID - HostID object of a renmote task
  • nchildren - number of children
  • finishedChildren - dictionary of finished children waiting to be joined. The key is the tid of a child.
  • sendval - return value passed to the task at switch
  • retval - value returned by the finished task
  • waitingOn - list of child task ids the task is waiting on
  • status - 0 (created), 1 (running), 2 (stopped), 3 (finished)
  • scheduled - number of times a local task is enqueued for execution. Always 0 for remote tasks.
localThrow(exception, value)

Throws an exception with value value in the local task represented by this object.

switchToLocal()

Switch control to the local task (microthread) represented by this object.

class pyopus.parallel.cooperative.SysCall

Base class for COS system calls.

class pyopus.parallel.cooperative.Spawn(func, args=[], kwargs={}, remote=False, targetHostID=None, block=True, enqueue=True, extendedReturnValue=False)

System call for spawning a child task.

  • func - function defining the task
  • args - positional arguments to the task’s function
  • kwargs - keyword arguments to the task’s function
  • remote - True if a remote spawn is requested. Spawns a local task if the scheduler has no VM object or remote is False.
  • targetHostID - specifies the HostID of the host where the remote task should be started. None corresponds to any host.
  • block - True if a remote spawn should block until a slot is free. Has no effect for local spawns. A nonblocking remote spawn returns tid=-1 on failure.
  • enqueue - True if a remote spawn should be queued if no slot is available. Setting block to True implies enqueue is also True regardless of the value passed as enqueue.
  • extendedReturnValue - when set to True the return value is a tuple of the form (tid, taskID, hostID). For local tasks taskID and hostII are always None. For remote tasks they contain the TaskID and HostID objects corresponding to the remote task. If a task is enqueued for remote execution with a nonblocking spawn taskID is None while hostID is equal to targetHostID.

Returns the tid of a spawned task.

class pyopus.parallel.cooperative.Yield

A system call for yielding the control to the scheduler.

The scheduler switches the context to the next scheduled microthread.

class pyopus.parallel.cooperative.Join(tidList=[], block=True)

A system call for joining a child task. The oldest chid is joined (the one with the lowest tid). Return the task’s tid and its return value.

  • tidlist - list of task IDs that we are waiting on. Empty list waits on all child tasks.
  • block - True if the call should block until a task can be joined

Returns a dictionary with tid for key holding the return value of the task that was joined. The dictionary has a single entry.

Returns an empty dictionary if there are no children.

Failed nonblocking join returns an empty dictionary.

class pyopus.parallel.cooperative.Scheduler(vm=None, debug=0)

Cooperative multitasking scheduler based on greenlets.

vm - virtual machine abstraction for spawning remote tasks.

The main loop of the scheduler is entered by calling the scheduler object.

countLocalTasks()

Returns the number of running local tasks including the scheduler and the main task.

countRemoteTasks()

Returns the number of running remote tasks.

countTasks()

Returns the number of running tasks including the scheduler and the main task.

enqueueSpawn(spawnSyscall, spawnerTask, targetHostID=None)

Equeues a spawn system call waiting on a free slot.

If targetHostID is specified a spawn system call is enqueued for handling when a slot is available on host given by targetHostID which is an object of class HostID.

new(parent, func, args=[], kwargs={}, remote=False, targetHostID=None)

Create a new task.

Arguments:

  • parent - parent task object
  • func - function defining the task
  • args - positional arguments to the task’s function
  • kwargs - keyword arguments to the task’s function
  • remote - True for a remote task
  • targetHostID - HostID object corresponding to the host where a remote task should be started. None corresponds to any host

Returns a Task object or None on failure.

schedule(task)

Schedules a local task for execution.

setDebug(debug=0)

Set the debug message level to debug.

Setting this value to 0 turns off debugging.

setVM(vm)

Sets the VM abstraction object used for spawning remote tasks.

Allowed only when no remote tasks are running. Setting a VM object on remote task has no effect.

class pyopus.parallel.cooperative.OpusOS(vm=None)

Cooperative multitasking OS class.

The user should import the only instance of this class represented by the cOS variable.

vm - virtual machine abstraction for spawning remote tasks.

If vm is None remote spawning is disabled.

static Join(*args, **kwargs)

Invokes the Join system call.

See Join.

static Spawn(*args, **kwargs)

Invokes the Spawn system call.

See Spawn.

static Yield()

Invokes the Yield system call.

See Yield.

static dispatch(jobList, collector=None, remote=True, buildResultList=None, collectHostIDs=False)

Dispatches multiple jobs and collects the results. If remote is True the jobs are dispatched asynchronously across the available computing nodes.

A job is a tuple of the form (callable, args, kwargs, extra). A job is evaluated by invoking the callable with given args and kwargs. If kwargs is omitted only positional arguments are passed. If args is also omitted the callable is invoked without arguments. The return value of the callable is the job result. Extra data (extra) can be stored in the optional entries after kwargs. This data is not passed to the callable.

jobList is an iterator object (i,e, list) that holds the jobs. It can also be a generator that produces jobs. A job may not be None.

collector is an optional unprimed coroutine. When a job is finished its result is sent to the collector in the form of a tuple (index, job, result) where index is the index of the job. Collector’s task is to collect the results in a user-defined data structure.

If collectHostIDs is True the collector receives tuples of the form (index, job, result, hostID) where hostID is an object of the class HostID that corresponds to the host where the job was run. For jobs that were not spawned as remote jobs hostID is None.

By catching the GeneratorExit exception in the collector a postprocessing step can be performed on the collected results.

Returns the list of results (in the order of generated jobs). When a collector is specified the results are not collected in a list unless buildResultList is set to True.

static dispatchSingle(function, args=[], kwargs={}, remote=True)

Dispatches a single task defined by function, args, and kwargs. If remote is True the task is dispatched to a remote computing node.

This function is used for moving a task to a remote processor and waits for the results. It is not very useful in the sense that it does not introduce any parallelism.

Returns the return value of the function.

static finalize()

Performs cleanup. Calls the finalize() method of the vm.

static finishedTasks()

Returns the number of tasks that are finished but not joined.

If there is no vm, returns -1.

This is not a system call and does not yield execution the the scheduler.

static freeSlots()

Returns the number of free slots in a vm. If there is no vm, returns -1.

A slot is used as soon as a task is spawned. A slot is freed as soon as a task is finished, The task does not have to be joind to free a slot.

This is not a system call and does not yield execution the the scheduler.

static getTid()

Returns the task id of the running microthread.

This is not a system call and does not yield execution the the scheduler.

static setDebug(debug=0)

Set the debug message level to debug.

Setting this value to 0 turns off debugging.

setVM(vm=None)

Sets the virtual machine object.

Allowed only when there are no remote tasks running.

This is not a system call and does not yield execution the the scheduler.

static slots()

Returns the number of slots in a vm. If there is no vm, returns -1.

This is not a system call and does not yield execution the the scheduler.

static toAbstractPath(p)

Translates path p to abstract path that can be sent to a remote process and decoded there. Takes into account parallel mirrored storage.

An abstract path comprises a parallel mirrored storage entry index and a relative path suffix.

See pyopus.parallel.base.translateToAbstractPath() for more information.

static toActualPath(abstractPath)

Translates abstractPath comprising a parallel mirrored storage entry index and a relative path suffix to actual local path.

If entry index is None no translation takes place and the relative path suffix is returned.

See pyopus.parallel.base.translateToActualPath() for more information.

pyopus.parallel.cooperative.cOS

Cooperative multitasking OS instance for accessing its functionality.