# Source code for pmesh.domain

"""
Domain Decomposition in Gaepsi

currently we have a GridND decomposition algorithm.

A few concepts:

- `local` and `ghost`. A data entry is local if
it is stored on the current rank. A data entry is ghost
if it is stored on another rank.

- `primary` and `padding`. A spatial position is
primary if it is within the spatial boundary of the current rank.
If it is not, then it is in the `padding` region, which is
determined by the `smoothing` parameter of decompose. If it
is further out, there must have been an error.

`local` and `primary` are not necessarily the same set of entries.

"""
from mpi4py import MPI
import numpy
import heapq

def bincountv(x, weights, minlength=None, dtype=None, out=None):
    """ bincount with vector weights.

    Like :func:`numpy.bincount`, but ``weights`` may carry extra trailing
    dimensions; counts are accumulated independently per component.

    Parameters
    ----------
    x : array_like of int
        Bin index for each row of ``weights``.
    weights : array_like, shape (len(x), ...)
        Weights to accumulate; trailing dimensions are preserved in the output.
    minlength : int, optional
        Minimum number of bins. Defaults to ``x.max() + 1``
        (0 when ``x`` is empty).
    dtype : dtype, optional
        Output dtype; defaults to ``weights.dtype``.
    out : array_like, optional
        Pre-allocated output of shape ``(minlength,) + weights.shape[1:]``.

    Returns
    -------
    out : array_like, shape (minlength, ...)
        Per-bin sums of the weights.
    """
    weights = numpy.array(weights)
    # use `is None` -- `== None` was an identity-vs-equality slip
    if minlength is None:
        if len(x) == 0:
            # empty input has no maximum index; produce zero bins
            minlength = 0
        else:
            minlength = x.max() + 1
    if dtype is None:
        dtype = weights.dtype

    # normalize to a real numpy.dtype instance
    dtype = numpy.dtype(dtype)

    shape = [minlength] + list(weights.shape[1:])

    if out is None:
        out = numpy.empty(shape, dtype=dtype)

    # run one scalar bincount per trailing component
    for index in numpy.ndindex(*shape[1:]):
        ind = tuple([Ellipsis] + list(index))
        out[ind] = numpy.bincount(x, weights[ind], minlength=minlength)

    return out
def promote(data, comm):
    """ Promote ``data`` to a dtype common to all ranks.

    Gathers the dtype string of ``data`` from every rank. If any rank holds
    a structured (void) dtype, all ranks must hold the same dtype and the
    data is returned unchanged; otherwise the data is cast to the common
    numeric dtype. The trailing shape must agree across ranks.

    Parameters
    ----------
    data : array_like
        Local data; converted with :func:`numpy.asarray`.
    comm : MPI communicator
        Only ``allgather`` is used.

    Returns
    -------
    data : numpy.ndarray
        The input cast to the dtype common to all ranks.

    Raises
    ------
    TypeError
        If some ranks hold structured dtypes and others do not.
    ValueError
        If the trailing shape of ``data`` differs between ranks.
    """
    data = numpy.asarray(data)
    dtypes = comm.allgather(data.dtype.str)
    if any('V' in dtype for dtype in dtypes):
        # structs? common type doesn't work here.
        # BUGFIX: the original compared the whole `dtypes` list against each
        # entry (`dtypes == dtype`), which is never True; compare against the
        # first entry to assert uniformity.
        if not all(dtypes[0] == dtype for dtype in dtypes):
            raise TypeError("type of data is incompatible -- some are structs, some are not")
        # refuse to promote the type of structs
        return data
    else:
        # numpy.find_common_type was removed in numpy 2.0;
        # result_type implements the same promotion for plain dtype strings.
        dtype = numpy.result_type(*set(dtypes))
        data = numpy.asarray(data, dtype)

    allshape = comm.allgather(data.shape[1:])
    if any(shape != data.shape[1:] for shape in allshape):
        raise ValueError('the shape of the data does not match accross ranks.')
    return data
class Layout(object):
    """ The communication layout of a domain decomposition.

        Given a set of particles, a Layout object knows how to move
        particles around. Do not create a Layout object directly;
        always use :py:meth:`GridND.decompose`.

        Useful methods are :py:meth:`exchange` and :py:meth:`gather`.
    """
    def __init__(self, comm, oldlength, sendcounts, indices, recvcounts=None):
        """
        Parameters
        ----------
        comm : MPI communicator
        oldlength : int
            length of the local data before exchange.
        sendcounts : array_like (comm.size,)
            the number of items to send to each rank.
        indices : array_like
            the indices of the items in the data array, ordered by
            destination rank.
        recvcounts : array_like (comm.size,), optional
            the number of items received from each rank; computed with
            an Alltoall when omitted.
        """
        self.comm = comm
        # one send count per rank.
        # BUGFIX: the original asserted `comm.size == sendcounts.shape`,
        # comparing an int to a tuple -- always False, so the assert
        # always fired; compare against the leading dimension instead.
        assert self.comm.size == sendcounts.shape[0]

        self.sendcounts = numpy.array(sendcounts, order='C')
        self.recvcounts = numpy.empty_like(self.sendcounts, order='C')
        self.sendoffsets = numpy.zeros_like(self.sendcounts, order='C')
        self.recvoffsets = numpy.zeros_like(self.recvcounts, order='C')

        if recvcounts is None:
            # calculate the recv counts array
            # ! Alltoall
            self.comm.Barrier()
            self.comm.Alltoall(self.sendcounts, self.recvcounts)
            self.comm.Barrier()
        else:
            self.recvcounts = recvcounts

        # displacement arrays for Alltoallv: exclusive prefix sums
        self.sendoffsets[1:] = self.sendcounts.cumsum()[:-1]
        self.recvoffsets[1:] = self.recvcounts.cumsum()[:-1]

        self.oldlength = oldlength
        self.newlength = self.recvcounts.sum()

        self.indices = indices
[docs] def exchange(self, data): """ Deliever data to the intersecting domains. Parameters ---------- data : array_like (, extra_dimensions) The data to be delievered. It shall be of the same length and of the same ordering of the input position that builds the layout. Each element is a matching element of the position used in the call to :py:meth:`GridND.decompose` Returns ------- newdata : array_like The data delievered to the domain. Ghosts are created if a particle intersects multiple domains. Refer to :py:meth:`gather` for collecting data of ghosts. """ # first convert to array data = promote(data, self.comm) if any(self.comm.allgather(len(data) != self.oldlength)): raise ValueError( 'the length of data does not match that used to build the layout') #build buffer # Watch out: # take produces C-contiguous array, # friendly to alltoallv. # fancy indexing does not always return C_contiguous # array (2 days to realize this!) buffer = data.take(self.indices, axis=0) # build a dtype for communication # this is to avoid 2GB limit from bytes. duplicity = numpy.product(numpy.array(data.shape[1:], 'intp')) itemsize = duplicity * data.dtype.itemsize dt = MPI.BYTE.Create_contiguous(itemsize) dt.Commit() dtype = numpy.dtype((data.dtype, data.shape[1:])) recvbuffer = numpy.empty(self.newlength, dtype=dtype, order='C') self.comm.Barrier() # now fire rt = self.comm.Alltoallv((buffer, (self.sendcounts, self.sendoffsets), dt), (recvbuffer, (self.recvcounts, self.recvoffsets), dt)) dt.Free() self.comm.Barrier() return recvbuffer
[docs] def gather(self, data, mode='sum', out=None): """ Pull the data from other ranks back to its original hosting rank. Attributes ---------- data : array_like data for each received particles. mode : string 'sum', 'any', 'mean', 'all', 'local', or any numpy ufunc. :code:`'all'` is to return all results, local and ghosts, without any reduction :code:`'sum'` is to reduce the ghosts to the local with sum. :code:`'local'` is to remove ghosts, keeping only local :code:`'any'` is to reduce the ghosts to the local, by using any one local or ghost :code:`'mean'` is to reduce the ghosts, to use the mean (sum divided by the total number) any :code:`numpy.ufunc` is to reduce the ghosts, with the ufunc. `'sum'` is equivalent to `numpy.ufunc.add` out : array_like or None stores the result of the gather operation. Returns ------- gathered : array_like gathered data. It is of the same length and ordering of the original positions used in :py:meth:`domain.decompose`. When mode is 'all', all gathered particles (corresponding to self.indices) are returned. """ # lets check the data type first data = promote(data, self.comm) if any(self.comm.allgather(len(data) != self.newlength)): raise ValueError( 'the length of data does not match result of a domain.exchange') dtype = numpy.dtype((data.dtype, data.shape[1:])) if mode == 'local': self.comm.Barrier() # drop all ghosts communication is not needed if out is None: out = numpy.empty(self.oldlength, dtype=dtype) # indices uses send offsets start2 = self.sendoffsets[self.comm.rank] size2 = self.sendcounts[self.comm.rank] end2 = start2 + size2 ind = self.indices[start2:end2] # data uses send offsets start1 = self.recvoffsets[self.comm.rank] size1 = self.recvcounts[self.comm.rank] end1 = start1 + size1 out[ind] = data[start1:end1] return out # build a dtype for communication # this is to avoid 2GB limit from bytes. 
duplicity = numpy.product(numpy.array(data.shape[1:], 'intp')) itemsize = duplicity * data.dtype.itemsize dt = MPI.BYTE.Create_contiguous(itemsize) dt.Commit() recvbuffer = numpy.empty(len(self.indices), dtype=dtype, order='C') self.comm.Barrier() # now fire rt = self.comm.Alltoallv((data, (self.recvcounts, self.recvoffsets), dt), (recvbuffer, (self.sendcounts, self.sendoffsets), dt)) dt.Free() self.comm.Barrier() if self.oldlength == 0: if out is None: out = numpy.empty(self.oldlength, dtype=dtype) return out if mode == 'all': if out is None: out = recvbuffer else: out[...] = recvbuffer return out if mode == 'sum': return bincountv(self.indices, recvbuffer, minlength=self.oldlength, out=out) if isinstance(mode, numpy.ufunc): arg = self.indices.argsort() recvbuffer = recvbuffer[arg] N = numpy.bincount(self.indices, minlength=self.oldlength) offset = numpy.zeros(self.oldlength, 'intp') offset[1:] = numpy.cumsum(N)[:-1] return mode.reduceat(recvbuffer, offset, out=out) if mode == 'mean': N = numpy.bincount(self.indices, minlength=self.oldlength) s = [self.oldlength] +  * (len(recvbuffer.shape) - 1) N = N.reshape(s) out = bincountv(self.indices, recvbuffer, minlength=self.oldlength, out=out) out[...] /= N return out if mode == 'any': if out is None: out = numpy.zeros(self.oldlength, dtype=dtype) out[self.indices] = recvbuffer return out raise NotImplementedError
class GridND(object):
    """ GridND is domain decomposition on a uniform grid of N dimensions.

        The total number of domains is prod([len(dir) - 1 for dir in edges]).

        Attributes
        ----------
        edges : list (Ndim)
            A list of edges in each dimension. edges[i] is the edges on
            direction i; edges[i] includes 0 and BoxSize.
        comm : :py:class:`MPI.Comm`
            MPI Communicator, default is :code:`MPI.COMM_WORLD`
        periodic : boolean
            Is the domain decomposition periodic? If so, edges[i][-1]
            is the period.
    """
    from ._domain import gridnd_fill as _fill
    _fill = staticmethod(_fill)

    @staticmethod
    def _digitize(data, bins, right=False):
        # numpy.digitize that tolerates zero-length input
        if len(data):
            return numpy.digitize(data, bins, right)
        return numpy.empty((0), dtype='intp')
[docs] @classmethod def uniform(cls, BoxSize, comm=MPI.COMM_WORLD, periodic=True): ndim = len(BoxSize) # compute a optimal shape where each domain is as cubical as possible r = (1.0 * comm.size / numpy.prod(BoxSize) * min(BoxSize)) ** (1.0 / ndim) shape = [ r * (BoxSize[i] / min(BoxSize)) for i in range(ndim)] shape = numpy.array(shape) imax = shape.argmax() shape = numpy.int32(shape) shape[shape < 1] = 1 shape[imax] = 1 shape[imax] = comm.size // numpy.prod(shape) assert numpy.prod(shape) <= comm.size edges = [] for i in range(ndim): edges.append(numpy.linspace(0, BoxSize[i], shape[i] + 1, endpoint=True)) return cls(edges, comm, periodic)
def __init__(self, edges, comm=MPI.COMM_WORLD, periodic=True, DomainAssign=None): """ DomainAssign records each domain is assigned to which rank """ self.shape = numpy.array([len(g) - 1 for g in edges], dtype='int32') self.ndim = len(self.shape) self.edges = numpy.asarray(edges) self.periodic = periodic self.comm = comm self.size = numpy.product(self.shape) if DomainAssign is None: if comm.size >= self.size: DomainAssign = numpy.array(range(self.size), dtype='int32') else: DomainAssign = numpy.empty(self.size, dtype='int32') for i in range(comm.size): start = i * self.size // comm.size end = (i + 1) * self.size // comm.size DomainAssign[start:end] = i self.DomainAssign = DomainAssign dd = numpy.zeros(self.shape, dtype='int16') for i, edge in enumerate(edges): edge = numpy.array(edge) dd1 = edge[1:] == edge[:-1] dd1 = dd1.reshape([-1 if ii == i else 1 for ii in range(self.ndim)]) dd[...] |= dd1 self.DomainDegenerate = dd.ravel() self._update_primary_regions()
[docs] def load(self, pos, transform=None, gamma=2): """ Returns the load of each domain, assuming that the load is a power-law N^gamma, where N is the number of particles within it. Parameters ---------- pos : array_like (, ndim) position of particles, ndim can be more than the dimenions of the domains, in which case only the first few directions are used. Returns ------- domainload : array_like The load of each domain. """ # FIXME: this function looks like decompose; might be able to merge them into one? pos = numpy.asarray(pos) assert pos.shape >= self.ndim if transform is None: transform = lambda x: x Npoint = len(pos) periodic = self.periodic if Npoint != 0: sil = numpy.empty((self.ndim, Npoint), dtype='i2', order='C') chunksize = 1024 * 48 for i in range(0, Npoint, chunksize): s = slice(i, i + chunksize) chunk = transform(pos[s]) for j in range(self.ndim): if periodic: tmp = numpy.remainder(chunk[:, j], self.edges[j][-1]) else: tmp = chunk[:, j] sil[j, s] = self._digitize(tmp, self.edges[j]) - 1 if periodic: mode = 'raise' # periodic box, must be in 0 ~ self.shape due to remainder else: mode = 'clip' # non periodic box, assign particles outside edges to the edge domains. # FIXME: perhaps better to raise here? it may be very unbalanced if # the edge is very far off! particle_domain = numpy.ravel_multi_index(sil, self.shape, mode=mode) tmp = numpy.bincount(particle_domain, minlength=self.size) else: tmp = numpy.zeros(self.size) domainload = self.comm.allreduce(tmp, op=MPI.SUM) domainload = domainload ** gamma return domainload