sharedmem package

Easier parallel programming on shared memory computers.

The source code is at http://github.com/rainwoodman/sharedmem .

Programming Model

MapReduce provides an equivalent of multiprocessing.Pool, with the following differences:

  • MapReduce does not require the work function to be picklable.
  • MapReduce adds a reduction step that is guaranteed to run on the master process’s scope.
  • MapReduce allows the use of critical sections and ordered execution in the work function.

Shared memory segments can be accessed as numpy arrays, allocated via sharedmem.empty and its relatives (empty_like, full, full_like, copy).

Shared memory segments are visible to the master process and the slave processes in MapReduce. Skillful use of shared memory segments can prevent Python pickling from becoming a bottleneck in the scalability of your code.
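
As a minimal sketch of this, a shared array allocated with sharedmem.empty can be filled by the slaves and read back by the master (the shape and values below are arbitrary):

>>> import sharedmem
>>> data = sharedmem.empty(8, dtype='f8')
>>> with sharedmem.MapReduce() as pool:
>>>    def work(i):
>>>        data[i] = i * i # writes by slaves land in the shared segment
>>>    pool.map(work, range(len(data)))
>>> print(data) # the master sees the values written by the slaves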

Usage

The package can be installed via easy_install sharedmem. Alternatively, the file sharedmem.py can be directly embedded into other projects.

The only external dependency is numpy. sharedmem was designed to work with large shared memory chunks via numpy.ndarray.

The environment variable OMP_NUM_THREADS determines the default number of slaves. On PBS/Torque systems, PBS_NUM_PPN is used if OMP_NUM_THREADS is not defined.
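
For example, the default can be inspected with cpu_count() and overridden per pool with the np argument (a sketch; the printed value depends on your machine and environment):

>>> import sharedmem
>>> print(sharedmem.cpu_count()) # default slave count, from OMP_NUM_THREADS or the core count
>>> with sharedmem.MapReduce(np=2) as pool: # explicitly use 2 slaves for this pool
>>>    results = pool.map(lambda i: i * i, range(8))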

Attention

This module depends on the fork system call and is therefore available only on POSIX systems (not Windows).

Examples

Sum up a large array, printing the progress.

>>> import numpy
>>> import sharedmem
>>> input = numpy.arange(1024 * 1024 * 128, dtype='f8')
>>> output = sharedmem.empty(1024 * 1024 * 128, dtype='f8')
>>> with sharedmem.MapReduce() as pool:
>>>    chunksize = 1024 * 1024
>>>    def work(i):
>>>        s = slice(i, i + chunksize)
>>>        output[s] = input[s]
>>>        return i, sum(input[s]) # deliberately use the slower Python built-in sum
>>>    def reduce(i, r):
>>>        print('chunk', i, 'done')
>>>        return r
>>>    r = pool.map(work, range(0, len(input), chunksize), reduce=reduce)
>>> print(numpy.sum(r))

Count the total number of occurrences of 'bacon' and 'eggs' in a directory of text files.

>>> import glob
>>> import sharedmem
>>> files = glob.glob('mydata/*.txt')
>>> word_count = {'bacon': 0, 'eggs': 0 }
>>> with sharedmem.MapReduce() as pool:
>>>    def work(filename):
>>>        f = open(filename, 'r').read()
>>>        wc = dict(word_count) # copy the word_count dict
>>>        for word in f.split():
>>>            if word in wc:
>>>                wc[word] += 1
>>>        return filename, wc
>>>    def reduce(filename, wc):
>>>        print(filename, 'done')
>>>        for key in word_count:
>>>            word_count[key] += wc[key]
>>>    pool.map(work, files, reduce=reduce)
>>> print(word_count)

pool.ordered can be used to require a block of code to be executed in order.

>>> with sharedmem.MapReduce() as pool:
>>>    def work(i):
>>>        with pool.ordered:
>>>            print('Hello World from rank', i, '/', pool.np)
>>>    pool.map(work, range(pool.np))

pool.critical can be used to protect a block of code, ensuring no two workers enter the code block at the same time.

>>> counter = sharedmem.empty(1)
>>> counter[:] = 0
>>> with sharedmem.MapReduce() as pool:
>>>    def work(i):
>>>        with pool.critical:
>>>            counter[:] += i
>>>    pool.map(work, range(10))
>>> print(counter)

API References

sharedmem.set_debug(flag)[source]

Set the debug mode.

In debug mode (flag==True), the MapReduce pool will run the work function on the master thread / process. This ensures all exceptions can be properly inspected by a debugger, e.g. pdb.

Parameters:

flag : boolean

True for debug mode, False for production mode.
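
A sketch of toggling debug mode around a problematic map call (the work function below is a placeholder):

>>> import sharedmem
>>> sharedmem.set_debug(True) # run work on the master so pdb can inspect failures
>>> with sharedmem.MapReduce() as pool:
>>>    def work(i):
>>>        return i * 2 # placeholder for the code being debugged
>>>    pool.map(work, range(4))
>>> sharedmem.set_debug(False) # back to production mode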

sharedmem.get_debug()[source]

Get the debug mode.

Returns:

The debug mode. True if currently in debugging mode.

sharedmem.total_memory()[source]

Returns the amount of memory available for use.

The value is obtained from the MemTotal entry in /proc/meminfo.

Notes

This function is not very useful and not very portable.
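
A sketch of one possible use, scaling a shared buffer with the machine's memory (the one-eighth fraction is arbitrary, and the return value is assumed here to be in bytes):

>>> import sharedmem
>>> budget = sharedmem.total_memory() // 8 # spend at most 1/8 of the machine's memory
>>> scratch = sharedmem.empty(budget // 8, dtype='f8') # 8 bytes per float64 element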

sharedmem.cpu_count()[source]

Returns the default number of slave processes to be spawned.

The default value is the number of physical CPU cores seen by Python. The OMP_NUM_THREADS environment variable overrides it.

On PBS/Torque systems, if OMP_NUM_THREADS is not set, the value of the PBS_NUM_PPN variable is used.

Notes

On some machines, such as PSC Blacklight, the number of physical cores does not equal the number of CPUs that should be used.

exception sharedmem.SlaveException(reason, traceback)[source]

Bases: exceptions.Exception

Represents an exception that has occurred in a slave process.

Attributes

reason (Exception, or a subclass of Exception.) The underlying reason for the exception. If the original exception can be pickled, its type is preserved. Otherwise, a LostExceptionType warning is issued, and reason is of type Exception.
traceback (str) The string version of the traceback that can be used to inspect the error.
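
A sketch of catching it around a map() call and inspecting the documented attributes (the failing work function is artificial):

>>> import sharedmem
>>> with sharedmem.MapReduce() as pool:
>>>    def work(i):
>>>        return 1 // i # fails with ZeroDivisionError when i == 0
>>>    try:
>>>        pool.map(work, range(4))
>>>    except sharedmem.SlaveException as e:
>>>        print(type(e.reason)) # the original exception type, if it could be pickled
>>>        print(e.traceback) # the string traceback from the slave
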
exception sharedmem.StopProcessGroup[source]

Bases: exceptions.Exception

A special type of Exception. StopProcessGroup will terminate the slave process/thread.

class sharedmem.background(function, *args, **kwargs)[source]

Bases: object

Asynchronous function call via a background process.

Parameters:

function : callable

The function to call.

*args : positional arguments

**kwargs : keyword arguments

Examples

>>> def function(*args, **kwargs):
>>>    pass
>>> bg = background(function, *args, **kwargs)
>>> rt = bg.wait()

Methods

wait() Wait and join the child process.
wait()[source]

Wait and join the child process. The return value of the function call is returned. If any exception occurred it is wrapped and raised.
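
A sketch of a concrete use, overlapping a slow call with other work on the master (the slow function here is a stand-in):

>>> import time
>>> import sharedmem
>>> def slow_sum(n):
>>>    time.sleep(1) # stand-in for expensive work, e.g. reading a large file
>>>    return sum(range(n))
>>> bg = sharedmem.background(slow_sum, 1000)
>>> # ... the master is free to do other work here ...
>>> result = bg.wait() # blocks until slow_sum finishes and returns its value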

class sharedmem.MapReduce(backend=sharedmem.ProcessBackend, np=None)[source]

Bases: object

A pool of slave processes for a Map-Reduce operation

Parameters:

backend : ProcessBackend or ThreadBackend

ProcessBackend is preferred. ThreadBackend can be used in cases where process creation is not allowed.

np : int or None

Number of processes to use. Default (None) is from OMP_NUM_THREADS or the number of available cores on the computer. If np is 0, all operations are performed on the master process – no child processes are created.

Notes

Always wrap the call to map() in a context manager (‘with’) block.
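
A sketch of constructing the pool with an explicit np inside the required with block (np=0 is handy for serial debugging, since everything then runs on the master):

>>> import sharedmem
>>> with sharedmem.MapReduce(np=0) as pool: # np=0: no child processes are created
>>>    results = pool.map(lambda i: i + 1, range(4))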

Methods

map(func, sequence[, reduce, star]) Map-reduce with multiple processes.
map(func, sequence, reduce=None, star=False)[source]

Map-reduce with multiple processes.

Apply func to each item in the sequence, in parallel. As the results are collected, reduce is called on each result. The reduced results are returned as a list.

Parameters:

func : callable

The function to call. It must accept the same number of arguments as the length of an item in the sequence.

Warning

func is not supposed to use exceptions for flow control. In non-debug mode all exceptions will be wrapped into a SlaveException.

sequence : list or array_like

The sequence of arguments to be applied to func.

reduce : callable, optional

Apply a reduction operation to the return values of func. If func returns a tuple, the elements are treated as positional arguments of reduce.

star : boolean

If True, the items in sequence are treated as positional arguments of func (see the sketch at the end of this entry).

Returns:

results : list

The list of reduced results from the map operation, in the order of the items in sequence.

Raises:

SlaveException

If any of the slave processes encounters an exception. Inspect SlaveException.reason for the underlying exception.
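
A sketch of the star argument: with star=True, each item in sequence is unpacked into the arguments of func (the toy data below is arbitrary):

>>> import sharedmem
>>> pairs = [(1, 2), (3, 4), (5, 6)]
>>> with sharedmem.MapReduce() as pool:
>>>    def work(x, y): # receives the two elements of each pair
>>>        return x + y
>>>    results = pool.map(work, pairs, star=True)
>>> print(results) # expected: [3, 7, 11]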

sharedmem.MapReduceByThread(np=None)[source]

Creates a MapReduce object but with the Thread backend.

The process backend is usually preferred.

sharedmem.empty(shape, dtype='f8')[source]

Create an empty shared memory array.

sharedmem.empty_like(array, dtype=None)[source]

Create a shared memory array from the shape of array.

sharedmem.full(shape, value, dtype='f8')[source]

Create a shared memory array of given shape and type, filled with value.

sharedmem.full_like(array, value, dtype=None)[source]

Create a shared memory array with the same shape and type as a given array, filled with value.
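
A brief sketch of these allocators side by side (shapes and fill values are arbitrary):

>>> import sharedmem
>>> a = sharedmem.empty((4, 4), dtype='f8') # uninitialized shared array
>>> b = sharedmem.empty_like(a) # same shape and dtype as a, in shared memory
>>> c = sharedmem.full(16, 3.0) # shared array of 16 float64 values, all 3.0
>>> d = sharedmem.full_like(a, 0.5) # shaped like a, filled with 0.5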

sharedmem.copy(a)[source]

Copy an array to the shared memory.

Notes

copy is not always necessary because the private memory is always copy-on-write.

Use a = copy(a) to immediately dereference the old 'a' in private memory.
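
A sketch of the pattern mentioned above (the array contents are arbitrary):

>>> import numpy
>>> import sharedmem
>>> a = numpy.arange(10) # an ordinary, private array
>>> a = sharedmem.copy(a) # now a shared copy; the old private 'a' is dereferenced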
