Easier parallel programming on shared memory computers.
The source code is at http://github.com/rainwoodman/sharedmem .
MapReduce provides an equivalent of multiprocessing.Pool, with the following differences:
Shared memory segments can be accessed as numpy arrays, allocated via sharedmem.empty.
Shared memory segments are visible to both the master process and the slave processes in MapReduce. Skillful use of shared memory segments can keep Python pickling from becoming a bottleneck in the scalability of your code.
The package can be installed via easy_install sharedmem. Alternatively, the file sharedmem.py can be directly embedded into other projects.
The only external dependency is numpy. sharedmem was designed to work with large shared memory chunks via numpy.ndarray.
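To illustrate the idea behind these shared numpy arrays, here is a minimal stdlib sketch, not sharedmem's actual implementation: an anonymous memory map, which stays visible to forked child processes, viewed as a numpy array.

```python
import mmap

import numpy

# Illustrative sketch only: allocate an anonymous, shared memory map and
# view it as a numpy array. sharedmem.empty wraps a similar mechanism so
# that forked slave processes see the same buffer.
n = 1024
buf = mmap.mmap(-1, n * 8)              # anonymous map, shared across fork()
arr = numpy.frombuffer(buf, dtype='f8')

arr[:] = numpy.arange(n)                # writes land in the shared map
print(arr.sum())                        # -> 523776.0
```

Because the buffer is a shared mapping rather than ordinary heap memory, no pickling is needed to make the data visible to children forked after the allocation.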
The environment variable OMP_NUM_THREADS determines the default number of slaves. On PBS/Torque systems, PBS_NUM_PPN is used if OMP_NUM_THREADS is not defined.
Attention
This module depends on the fork system call and is therefore available only on posix systems (not Windows).
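A fail-fast guard for this posix requirement can be written with the stdlib; `os.fork` exists only on posix platforms:

```python
import os

# The pools in this module rely on os.fork, which exists only on posix
# platforms; checking up front fails fast on unsupported systems.
HAS_FORK = hasattr(os, 'fork')
if not HAS_FORK:
    raise RuntimeError("this module requires a posix system with fork()")
```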
Sum up a large array, printing the progress:
>>> import numpy
>>> import sharedmem
>>> input = numpy.arange(1024 * 1024 * 128, dtype='f8')
>>> output = sharedmem.empty(1024 * 1024 * 128, dtype='f8')
>>> with sharedmem.MapReduce() as pool:
...     chunksize = 1024 * 1024
...     def work(i):
...         s = slice(i, i + chunksize)
...         output[s] = input[s]
...         return i, sum(input[s])  # we use the slower python sum operator
...     def reduce(i, r):
...         print('chunk', i, 'done')
...         return r
...     r = pool.map(work, range(0, len(input), chunksize), reduce=reduce)
>>> print(numpy.sum(r))
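The same chunk-and-reduce pattern can be sketched with only the stdlib (threads shown here for brevity, and a smaller array); unlike sharedmem, the partial results travel back through the executor rather than a shared segment:

```python
from concurrent.futures import ThreadPoolExecutor

import numpy

# Chunked map-reduce sum, stdlib version: each work item sums one slice,
# and the partial sums are reduced into a total afterwards.
data = numpy.arange(1024 * 128, dtype='f8')
chunksize = 1024

def work(i):
    s = slice(i, i + chunksize)
    return i, numpy.sum(data[s])

with ThreadPoolExecutor() as pool:
    partials = list(pool.map(work, range(0, len(data), chunksize)))

total = sum(r for i, r in partials)
print(total == numpy.sum(data))  # -> True
```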
Count the total number of occurrences of 'bacon' and 'eggs' in the text files of a directory:
>>> import glob
>>> import sharedmem
>>> files = glob.glob('mydata/*.txt')
>>> word_count = {'bacon': 0, 'eggs': 0}
>>> with sharedmem.MapReduce() as pool:
...     def work(filename):
...         text = open(filename, 'r').read()
...         wc = dict(word_count)  # copy the word_count dict
...         for word in text.split():
...             if word in wc:
...                 wc[word] += 1
...         return filename, wc
...     def reduce(filename, wc):
...         print(filename, 'done')
...         for key in word_count:
...             word_count[key] += wc[key]
...     pool.map(work, files, reduce=reduce)
>>> print(word_count)
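The reduce step above is a plain dictionary merge; a serial sketch with collections.Counter makes that explicit. The sample texts here are hypothetical stand-ins for the contents of the files:

```python
from collections import Counter

# Serial sketch of the word-count reduce: count the interesting words in
# each text, then merge the per-text counts into the running total.
texts = ["bacon and eggs", "eggs eggs bacon", "no breakfast here"]

word_count = Counter()
for text in texts:
    wc = Counter(w for w in text.split() if w in ('bacon', 'eggs'))
    word_count += wc  # this merge is what the reduce callback performs

print(dict(word_count))  # -> {'bacon': 2, 'eggs': 3}
```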
pool.ordered can be used to require a block of code to be executed in order:
>>> with sharedmem.MapReduce() as pool:
...     def work(i):
...         with pool.ordered:
...             print('Hello World from rank', i, '/', pool.np)
...     pool.map(work, range(pool.np))
pool.critical can be used to protect a block of code, ensuring that no two workers enter the block at the same time:
>>> counter = sharedmem.empty(1)
>>> counter[:] = 0
>>> with sharedmem.MapReduce() as pool:
...     def work(i):
...         with pool.critical:
...             counter[:] += i
...     pool.map(work, range(10))
>>> print(counter)
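The same critical-section pattern can be shown with stdlib threads, where a Lock plays the role of pool.critical, serializing the read-modify-write on the shared counter:

```python
import threading

# Unsynchronized "counter += i" is a read-modify-write and could race;
# the lock serializes it, exactly what a critical section provides.
counter = [0]
lock = threading.Lock()

def work(i):
    with lock:
        counter[0] += i

threads = [threading.Thread(target=work, args=(i,)) for i in range(10)]
for t in threads:
    t.start()
for t in threads:
    t.join()

print(counter[0])  # -> 45
```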
Set the debug mode.
In debug mode (flag == True), the MapReduce pool runs the work function on the master thread / process, so that all exceptions can be properly inspected by a debugger, e.g. pdb.
Parameters:
    flag : boolean
        True to enable debug mode; False to disable it.
Get the debug mode.
Returns:
    The debug mode. True if currently in debugging mode.
Returns the amount of memory available for use.
The value is obtained from the MemTotal entry in /proc/meminfo.
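A rough sketch of this lookup, not sharedmem's exact code: /proc/meminfo exists only on Linux, hence the guard, matching the portability caveat below.

```python
import os

# Read MemTotal from /proc/meminfo on Linux; return None elsewhere.
def total_memory():
    if not os.path.exists('/proc/meminfo'):
        return None
    with open('/proc/meminfo') as f:
        for line in f:
            if line.startswith('MemTotal:'):
                return int(line.split()[1]) * 1024  # kB -> bytes
    return None
```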
Notes
This function is not very useful and not very portable.
Returns the default number of slave processes to be spawned.
The default value is the number of physical cpu cores seen by python. The OMP_NUM_THREADS environment variable overrides it.
On PBS/Torque systems, if OMP_NUM_THREADS is empty, the value of the PBS_NUM_PPN variable is used instead.
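This resolution order can be sketched as follows (illustrative only, not sharedmem's exact code): OMP_NUM_THREADS wins, then PBS_NUM_PPN, then the detected cpu count.

```python
import multiprocessing
import os

# Resolve the default number of slaves from the environment, falling
# back to the number of cpus detected by the stdlib.
def default_np():
    for var in ('OMP_NUM_THREADS', 'PBS_NUM_PPN'):
        value = os.environ.get(var)
        if value:
            return int(value)
    return multiprocessing.cpu_count()
```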
Notes
On some machines the number of physical cores does not equal the number of cpus that should be used; PSC Blacklight, for example.
Bases: exceptions.Exception
Represents an exception that has occurred in a slave process.
Attributes
reason : Exception, or a subclass of Exception
    The underlying reason for the exception. If the original exception can be pickled, its type is preserved. Otherwise, a LostExceptionType warning is issued, and reason is of type Exception.
traceback : str
    The string version of the traceback, which can be used to inspect the error.
Bases: exceptions.Exception
A special type of Exception. StopProcessGroup terminates the slave process/thread.
Bases: object
Asynchronous function call via a background process.
Parameters:
    function : callable
    *args : positional arguments
    **kwargs : keyword arguments
Examples
>>> def function(*args, **kwargs):
...     pass
>>> bg = background(function, *args, **kwargs)
>>> rt = bg.wait()
Methods
wait()
    Wait and join the child process.
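The submit-then-wait pattern above has a stdlib analogue in concurrent.futures: submit the call, keep working, then block for the result. background forks a process; a thread is used here for brevity, and the function and its arguments are hypothetical examples.

```python
from concurrent.futures import ThreadPoolExecutor

# Stdlib analogue of background(...).wait().
def function(x, scale=2):
    return x * scale

with ThreadPoolExecutor(max_workers=1) as ex:
    future = ex.submit(function, 21, scale=2)
    # ... the parent can do other work here ...
    result = future.result()  # like bg.wait()

print(result)  # -> 42
```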
Bases: object
A pool of slave processes for a Map-Reduce operation.
Parameters:
    backend : ProcessBackend or ThreadBackend
    np : int or None
        Number of slave processes.
Notes
Always wrap the call to map() in a context manager (‘with’) block.
Methods
map(func, sequence[, reduce, star])
    Map-reduce with multiple processes.
Map-reduce with multiple processes.
Apply func to each item of the sequence, in parallel. As the results are collected, reduce is called on each result. The reduced results are returned as a list.
Parameters:
    func : callable
    sequence : list or array_like
    reduce : callable, optional
    star : boolean

Returns:
    results : list

Raises:
    SlaveException
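Assuming star=True means each item of the sequence is unpacked into the positional arguments of func, the behavior matches what itertools.starmap does serially; the work function here is a hypothetical example:

```python
from itertools import starmap

# Each (a, b) pair is unpacked into work's positional arguments.
def work(a, b):
    return a + b

pairs = [(1, 2), (3, 4)]
print(list(starmap(work, pairs)))  # -> [3, 7]
```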
Creates a MapReduce object but with the Thread backend.
The process backend is usually preferred.
Create a shared memory array from the shape of array.
Create a shared memory array of given shape and type, filled with value.