Skip to content
2 changes: 1 addition & 1 deletion devito/ir/iet/visitors.py
Original file line number Diff line number Diff line change
Expand Up @@ -1494,7 +1494,7 @@ def visit_ParallelTree(self, o):
def visit_HaloSpot(self, o):
hs = o.halo_scheme
fmapper = {self.mapper.get(k, k): v for k, v in hs.fmapper.items()}
halo_scheme = hs.build(fmapper, hs.honored)
halo_scheme = hs._rebuild(fmapper=fmapper)
body = self._visit(o.body)
return o._rebuild(halo_scheme=halo_scheme, body=body)

Expand Down
16 changes: 11 additions & 5 deletions devito/mpi/distributed.py
Original file line number Diff line number Diff line change
Expand Up @@ -261,7 +261,7 @@ def nprocs_local(self):

@property
def topology(self):
return DimensionTuple(*self._topology, getters=self.dimensions)
return self._topology

@property
def topology_logical(self):
Expand Down Expand Up @@ -353,7 +353,9 @@ def __init__(self, shape, dimensions, input_comm=None, topology=None):
self._topology = compute_dims(self._input_comm.size, len(shape))
else:
# A custom topology may contain integers or the wildcard '*'
self._topology = CustomTopology(topology, self._input_comm)
self._topology = CustomTopology(
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Might want to have compute_dims return a DimensionTuple as well

topology, self._input_comm, getters=dimensions
)

if self._input_comm is not input_comm:
# By default, Devito arranges processes into a cartesian topology.
Expand Down Expand Up @@ -896,7 +898,7 @@ def _arg_values(self, *args, **kwargs):
return self._arg_defaults()


class CustomTopology(tuple):
class CustomTopology(DimensionTuple):

"""
The CustomTopology class provides a mechanism to describe parametric domain
Expand Down Expand Up @@ -954,7 +956,7 @@ class CustomTopology(tuple):
'xy': ('*', '*', 1),
}

def __new__(cls, items, input_comm):
def __new__(cls, items, input_comm, **kwargs):
# Keep track of nstars and already defined decompositions
nstars = items.count('*')

Expand Down Expand Up @@ -992,11 +994,15 @@ def __new__(cls, items, input_comm):
# Final check that topology matches the communicator size
assert np.prod(processed) == input_comm.size

obj = super().__new__(cls, processed)
obj = super().__new__(cls, *processed, **kwargs)
obj.logical = items

return obj

def __repr__(self):
    """
    Show both the user-provided logical topology (which may contain the
    '*' wildcard) and the concrete physical topology derived from it.
    """
    return (f"CustomTopology(logical={self.logical}, "
            f"physical={super().__repr__()})")


def compute_dims(nprocs, ndim):
# We don't do anything clever here. In fact, we do something very basic --
Expand Down
125 changes: 85 additions & 40 deletions devito/mpi/halo_scheme.py
Original file line number Diff line number Diff line change
Expand Up @@ -7,9 +7,9 @@
import sympy
from sympy import Max, Min

from devito import configuration
from devito.data import CENTER, CORE, LEFT, OWNED, RIGHT
from devito.ir.support import Forward, Scope
from devito.symbolics import IntDiv
from devito.symbolics.manipulation import _uxreplace_registry
from devito.tools import (
EnrichedTuple, Reconstructable, Tag, as_tuple, filter_ordered, filter_sorted, flatten,
Expand Down Expand Up @@ -137,11 +137,9 @@ def __init__(self, exprs, ispace):
# Derive the halo exchanges
self._mapper = frozendict(classify(exprs, ispace))

# Track the IterationSpace offsets induced by SubDomains/SubDimensions.
# These should be honored in the derivation of the `omapper`
# Track the IterationSpace offsets induced by SubDomains/SubDimensions,
# which are honored in the derivation of the `omapper`
self._honored = {}
# SubDimensions are not necessarily included directly in
# ispace.dimensions and hence we need to first utilize the `_defines` method
dims = set().union(*[d._defines for d in ispace.dimensions
if d._defines & self.dimensions])
subdims = [d for d in dims if d.is_Sub and not d.local]
Expand All @@ -150,6 +148,12 @@ def __init__(self, exprs, ispace):
self._honored[i.root] = frozenset([(ltk, rtk)])
self._honored = frozendict(self._honored)

# Further constraints on the `omapper` derivation. At construction time
# there's none, but lowering passes may change this
# * `_alignment` may be a positive integer representing the alignment
# requirement, in number of *elements*, of the underlying expressions
self._alignment = None

def __repr__(self):
    """Short summary naming the mapped Functions, e.g. `HaloScheme<u,v>`."""
    names = [f.name for f in set(self._mapper)]
    return "HaloScheme<%s>" % ",".join(names)
Expand All @@ -165,11 +169,22 @@ def __len__(self):
def __hash__(self):
return hash((self._mapper.__hash__(), self.honored.__hash__()))

@classmethod
def build(cls, fmapper, honored):
def _rebuild(self, fmapper=None, honored=None, alignment=None):
    """
    Rebuild a HaloScheme, reusing `self`'s values for any argument that
    is not explicitly provided.

    Parameters
    ----------
    fmapper : dict, optional
        The new `Function -> HaloSchemeEntry` mapper. Defaults to
        `self._mapper`.
    honored : dict, optional
        The new honored SubDimension offsets. Defaults to `self._honored`.
    alignment : int, optional
        The new alignment requirement, in number of elements. Defaults to
        `self._alignment`.
    """
    # Bypass __init__, which would re-derive the halo exchanges from scratch
    obj = object.__new__(HaloScheme)

    if fmapper is None:
        fmapper = self._mapper
    if honored is None:
        honored = self._honored
    if alignment is None:
        # NOTE: an explicit `is None` check, rather than the fragile
        # `alignment or self._alignment`, so that a falsy-but-explicit
        # value can never be silently discarded
        alignment = self._alignment

    obj._mapper = frozendict(fmapper)
    obj._honored = frozendict(honored)
    obj._alignment = alignment

    return obj

@classmethod
Expand Down Expand Up @@ -223,7 +238,7 @@ def union(self, halo_schemes):
for d, v in i.honored.items():
honored[d] = honored.get(d, frozenset()) | v

return HaloScheme.build(fmapper, honored)
return i._rebuild(fmapper=fmapper, honored=honored)

@property
def honored(self):
Expand All @@ -241,10 +256,14 @@ def is_void(self):
@cached_property
def omapper(self):
"""
Logical decomposition of the DOMAIN region into OWNED and CORE sub-regions.
Logical decomposition of the DOMAIN region into OWNED and CORE sub-regions,
"cumulative" over all DiscreteFunctions in the HaloScheme.

This is "cumulative" over all DiscreteFunctions in the HaloScheme; it also
takes into account IterationSpace offsets induced by SubDomains/SubDimensions.
The computed OMapper takes into account:

* The offsets induced by SubDomains/SubDimensions ("thickness");
* Any data alignment requirement of the underlying expressions
(`_alignment` attribute).

Examples
--------
Expand Down Expand Up @@ -366,28 +385,62 @@ def omapper(self):

if s is CENTER:
where.append((d, CORE, s))
mapper[d] = (d.symbolic_min + osl,
d.symbolic_max - osr)

mapper[d] = (
d.symbolic_min + osl,
d.symbolic_max - osr
)

if nl != 0:
mapper[nl] = (Max(nl - osl, 0),)
if nr != 0:
mapper[nr] = (Max(nr - osr, 0),)
else:
where.append((d, OWNED, s))

if s is LEFT:
mapper[d] = (d.symbolic_min,
Min(d.symbolic_min + osl - 1, d.symbolic_max - nr))
mapper[d] = (
d.symbolic_min,
Min(d.symbolic_min + osl - 1, d.symbolic_max - nr)
)

if nl != 0:
mapper[nl] = (nl,)
mapper[nr] = (0,)
else:
mapper[d] = (Max(d.symbolic_max - osr + 1, d.symbolic_min + nl),
d.symbolic_max)
mapper[d] = (
Max(d.symbolic_max - osr + 1, d.symbolic_min + nl),
d.symbolic_max
)

if nr != 0:
mapper[nl] = (0,)
mapper[nr] = (nr,)

processed.append((tuple(where), frozendict(mapper)))

# Apply the alignment constraints, if any
# First, get the fastest varying (contiguous) Dimension, which is the
# one that matters for alignment
if self._alignment:
fvds = {f.dimensions[-1] for f in self.fmapper}
if len(fvds) != 1:
raise HaloSchemeException(
"Unexpected contiguous Dimensions found while computing the "
f"`omapper`: {fvds}"
)
fvd = fvds.pop()

for i, (where, mapper) in enumerate(list(processed)):
try:
m, M = mapper[fvd]
except KeyError:
continue

aligned_m = IntDiv(m, self._alignment) * self._alignment

processed[i] = (where, frozendict({**mapper, fvd: (aligned_m, M)}))

_, core = processed.pop(0)
owned = processed

Expand Down Expand Up @@ -483,15 +536,15 @@ def project(self, functions):
to the provided `functions`.
"""
fmapper = {f: v for f, v in self.fmapper.items() if f in as_tuple(functions)}
return HaloScheme.build(fmapper, self.honored)
return self._rebuild(fmapper=fmapper)

def drop(self, functions):
    """
    Create a new HaloScheme that contains all entries in `self` except those
    corresponding to the provided `functions`.

    Parameters
    ----------
    functions : Function or iterable of Function
        The Functions whose entries are dropped from the new HaloScheme.
    """
    excluded = as_tuple(functions)
    fmapper = {f: v for f, v in self.fmapper.items() if f not in excluded}
    # Rebuild via `_rebuild`, consistently with `project`, `add` and `merge`;
    # `self`'s `honored` (and alignment) are carried over automatically
    return self._rebuild(fmapper=fmapper)

def add(self, f, hse):
"""
Expand All @@ -503,7 +556,7 @@ def add(self, f, hse):
if f in fmapper:
hse = fmapper[f].union(hse)
fmapper[f] = hse
return HaloScheme.build(fmapper, self.honored)
return self._rebuild(fmapper=fmapper)

def merge(self, hs):
"""
Expand All @@ -512,20 +565,14 @@ def merge(self, hs):
fmapper = dict(self.fmapper)
for f, hse in hs.fmapper.items():
fmapper[f] = fmapper.get(f, hse).merge(hse)
return HaloScheme.build(fmapper, self.honored)
return self._rebuild(fmapper=fmapper)


def classify(exprs, ispace):
"""
Produce the mapper `Function -> HaloSchemeEntry`, which describes the necessary
halo exchanges in the given Scope.
"""

# Some MPI modes require pulling the `loc_indices` from the reads, others
# from the writes. It essentially depends on whether the halo exchange is
# performed before (reads) or after (writes) the OWNED region is computed
loc_indices_from_reads = configuration['mpi'] not in ('dual',)

scope = Scope(exprs)

mapper = {}
Expand Down Expand Up @@ -565,15 +612,17 @@ def classify(exprs, ispace):
else:
v[(d, LEFT)] = STENCIL
v[(d, RIGHT)] = STENCIL
elif loc_indices_from_reads:
else:
v[(d, i[d])] = NONE

# Does `i` actually require a halo exchange?
if not any(hl is STENCIL for hl in v.values()):
continue

# Derive diagonal halo exchanges from the previous analysis
combs = list(product([LEFT, CENTER, RIGHT], repeat=len(f._dist_dimensions)))
combs = list(
product([LEFT, CENTER, RIGHT], repeat=len(f._dist_dimensions))
)
combs.remove((CENTER,)*len(f._dist_dimensions))
for c in combs:
key = (f._dist_dimensions, c)
Expand All @@ -598,13 +647,6 @@ def classify(exprs, ispace):
if not halo_labels:
continue

# Augment `halo_labels` with `loc_indices`-related information if necessary
if not loc_indices_from_reads:
for i in scope.writes.get(f, []):
for d in i.findices:
if not f.grid.is_distributed(d):
halo_labels[(d, i[d])].add(NONE)

# Separate halo-exchange Dimensions from `loc_indices`
raw_loc_indices, halos = defaultdict(list), []
for (d, s), hl in halo_labels.items():
Expand All @@ -613,15 +655,18 @@ def classify(exprs, ispace):
if not hl:
continue
elif len(hl) > 1:
raise HaloSchemeException("Inconsistency found while building a halo "
f"scheme for `{f}` along Dimension `{d}`")
raise HaloSchemeException(
"Inconsistency found while building a halo scheme for "
f"`{f}` along Dimension `{d}`")
elif hl.pop() is STENCIL:
halos.append(Halo(d, s))
elif d._defines & set(ispace.itdims):
raw_loc_indices[d].append(s)

loc_indices, loc_dirs = process_loc_indices(raw_loc_indices,
ispace.directions)
loc_indices, loc_dirs = process_loc_indices(
raw_loc_indices, ispace.directions
)

mapper[f] = HaloSchemeEntry(loc_indices, loc_dirs, halos, dims)

return mapper
Expand Down
43 changes: 6 additions & 37 deletions devito/mpi/routines.py
Original file line number Diff line number Diff line change
Expand Up @@ -957,36 +957,6 @@ def _call_remainder(self, remainder):
return remainder


class DualHaloExchangeBuilder(Overlap2HaloExchangeBuilder):

    """
    "Dual" of Overlap2HaloExchangeBuilder, as the "remainder" is now the first
    thing getting computed.

    Generates:

        remainder()
        haloupdate()
        compute_core()
        halowait()
    """

    def _make_body(self, callcompute, remainder, haloupdates, halowaits):
        # Assemble the IET body in the "dual" order documented above
        body = []

        # Unlike the parent builder, the remainder is computed up front
        assert remainder is not None
        body.append(self._call_remainder(remainder))

        # Kick off the halo updates...
        body.append(HaloUpdateList(body=haloupdates))

        # ...then run the core computation while they are in flight
        assert callcompute is not None
        body.append(callcompute)

        # Finally, wait for the halo exchanges to complete
        body.append(HaloWaitList(body=halowaits))

        return List(body=body)


class FullHaloExchangeBuilder(Overlap2HaloExchangeBuilder):

"""
Expand Down Expand Up @@ -1058,7 +1028,6 @@ def _call_poke(self, poke):
'overlap': OverlapHaloExchangeBuilder,
'overlap2': Overlap2HaloExchangeBuilder,
'full': FullHaloExchangeBuilder,
'dual': DualHaloExchangeBuilder
}


Expand Down Expand Up @@ -1117,6 +1086,12 @@ def __init__(self, arguments, **kwargs):
super().__init__('MPI_Irecv', arguments)


class AllreduceCall(Call):

    """A Call with the callee name hardwired to 'MPI_Allreduce'."""

    def __init__(self, arguments, **kwargs):
        super().__init__('MPI_Allreduce', arguments, **kwargs)


class MPICall(Call):

@property
Expand Down Expand Up @@ -1426,12 +1401,6 @@ def _arg_values(self, args=None, **kwargs):
return values


class AllreduceCall(Call):

def __init__(self, arguments, **kwargs):
super().__init__('MPI_Allreduce', arguments, **kwargs)


class ReductionBuilder:

"""
Expand Down
Loading
Loading