# -*- coding: utf-8 -*-
"""
Module to handle MPI parallel computation via the mpi4py module [1].
"""
import numpy as np
from numpy.typing import NDArray
from mbtrack2.tracking.particles import Beam
[docs]
class Mpi:
"""
Class which handle parallel computation via the mpi4py module [1].
Non-empty bunches are distributed across MPI ranks as contiguous blocks:
rank r owns bunches bunch_index[displacements[r]:displacements[r] + counts[r]],
where bunch_index = np.where(filling_pattern)[0]. The number of ranks must
not exceed the number of non-empty bunches, but any smaller rank count is
allowed (a single rank can own several bunches).
Outputs of share_* methods are dicts keyed by bunch number (absolute index
into filling_pattern). Every key is an element of bunch_index, so callers
write beam.mpi.tau_center[bunch_num].
Parameters
----------
filling_pattern : bool array of shape (h,)
Filling pattern of the beam, like Beam.filling_pattern
Attributes
----------
comm : MPI.Intracomm object
MPI intra-communicator of the processor group.
rank : int
Rank of the current processor.
size : int
Number of processors in the communicator.
bunch_index : int array of shape (n_bunches,)
Non-empty bunch numbers.
counts : int array of shape (size,)
Number of bunches owned by each rank.
displacements : int array of shape (size,)
Starting position (in bunch_index) of each rank's block.
bunch_to_rank_arr : int array of shape (n_bunches,)
Rank that owns each non-empty bunch.
rank_to_bunches : dict[int, np.ndarray]
Bunch numbers owned by each rank.
bunch_num_to_rank : dict[int, int]
Map from bunch number to the rank that owns it.
bunch_numbers : list of int
Bunch numbers owned by the current processor.
bunch_numbers_set : set of int
Set version of bunch_numbers for fast membership tests.
local_count : int
Number of bunches owned by the current processor.
local_displacement : int
Offset of the current rank's block inside bunch_index.
Methods
-------
write_table(filling_pattern)
Build the rank-bunch distribution tables.
rank_to_bunch(rank)
Return the bunch numbers owned by rank.
bunch_to_rank(bunch_num)
Return the rank that owns bunch_num.
share_distributions(beam)
Compute and share bunch profiles across all ranks.
share_means(beam)
Compute and share bunch means across all ranks.
share_stds(beam)
Compute and share bunch standard deviations across all ranks.
share_scalar(beam, attr)
Gather a scalar per-bunch attribute across all ranks.
References
----------
[1] L. Dalcin, P. Kler, R. Paz, and A. Cosimo, Parallel Distributed
Computing using Python, Advances in Water Resources, 34(9):1124-1139, 2011.
"""
[docs]
def __init__(self, filling_pattern: NDArray):
from mpi4py import MPI
self.MPI = MPI
self.comm = MPI.COMM_WORLD
self.rank = self.comm.Get_rank()
self.size = self.comm.Get_size()
self.write_table(filling_pattern)
[docs]
def write_table(self, filling_pattern: NDArray):
"""
Build the rank-bunch distribution tables.
Non-empty bunches are split into contiguous blocks. The first
n_bunches % size ranks receive one extra bunch so the partition is as
balanced as possible.
Parameters
----------
filling_pattern : bool array of shape (h,)
Filling pattern of the beam, like Beam.filling_pattern
"""
bunch_index = np.where(filling_pattern)[0].astype(int)
n_bunches = bunch_index.size
if self.size > n_bunches:
raise ValueError(
f"The number of processors ({self.size}) must not exceed the "
f"number of non-empty bunches ({n_bunches}).")
base, extra = divmod(n_bunches, self.size)
counts = np.full(self.size, base, dtype=int)
counts[:extra] += 1
displacements = np.concatenate(
([0], np.cumsum(counts[:-1]))).astype(int)
bunch_to_rank_arr = np.empty(n_bunches, dtype=int)
rank_to_bunches = {}
bunch_num_to_rank = {}
for r in range(self.size):
start = displacements[r]
stop = start + counts[r]
bunch_to_rank_arr[start:stop] = r
rank_to_bunches[r] = bunch_index[start:stop].copy()
for b in bunch_index[start:stop]:
bunch_num_to_rank[int(b)] = r
self.bunch_index = bunch_index
self.counts = counts
self.displacements = displacements
self.bunch_to_rank_arr = bunch_to_rank_arr
self.rank_to_bunches = rank_to_bunches
self.bunch_num_to_rank = bunch_num_to_rank
self.bunch_numbers = [int(b) for b in rank_to_bunches[self.rank]]
self.bunch_numbers_set = set(self.bunch_numbers)
self.local_count = int(counts[self.rank])
self.local_displacement = int(displacements[self.rank])
[docs]
def rank_to_bunch(self, rank: int) -> NDArray:
"""
Return the bunch numbers owned by rank.
Parameters
----------
rank : int
Returns
-------
bunch_numbers : np.ndarray of int
Bunch numbers (absolute indices into filling_pattern) owned by rank.
"""
return self.rank_to_bunches[rank]
[docs]
def bunch_to_rank(self, bunch_num: int) -> int | None:
"""
Return the rank that owns bunch_num.
Parameters
----------
bunch_num : int
Absolute bunch number (index into filling_pattern).
Returns
-------
rank : int or None
Rank that tracks bunch_num, or None if bunch_num is empty.
"""
rank = self.bunch_num_to_rank.get(bunch_num)
if rank is None:
print(f"The bunch {bunch_num} is not tracked on any processor.")
return rank
[docs]
def _allgatherv(self, local: NDArray, inner_size: int, dtype,
mpi_dtype) -> NDArray:
"""
Gather per-bunch arrays from all ranks into a single shared array.
Each rank contributes local_count rows. The result concatenates all
ranks' contributions in bunch order, producing one row per non-empty
bunch aligned with self.bunch_index.
Parameters
----------
local : np.ndarray of shape (local_count, inner_size)
Local data array for the bunches owned by the current rank.
inner_size : int
Number of elements per bunch (columns in the result).
dtype : numpy dtype
NumPy data type for local and the returned array.
mpi_dtype : MPI.Datatype
Matching MPI datatype (e.g. MPI.DOUBLE for np.float64).
Returns
-------
result : np.ndarray of shape (n_bunches, inner_size)
Row j contains data for bunch self.bunch_index[j].
"""
n_bunches = self.bunch_index.size
result = np.empty((n_bunches, inner_size), dtype=dtype)
local = np.ascontiguousarray(local, dtype=dtype)
sendbuf = [local, self.local_count * inner_size, mpi_dtype]
recvbuf = [
result,
(self.counts * inner_size).astype(int),
(self.displacements * inner_size).astype(int),
mpi_dtype,
]
self.comm.Allgatherv(sendbuf, recvbuf)
return result
[docs]
def share_scalar(self, beam: Beam, attr: str) -> dict[int, float]:
"""
Gather a scalar per-bunch attribute across all ranks.
Parameters
----------
beam : Beam object
attr : str
Name of the Bunch attribute to gather. Must be a scalar float.
Returns
-------
result : dict[int, float]
Maps bunch number to the gathered attribute value.
"""
local_vals = np.array(
[getattr(beam[bn], attr) for bn in self.bunch_numbers],
dtype=np.float64).reshape(self.local_count, 1)
arr = self._allgatherv(local_vals, 1, np.float64,
self.MPI.DOUBLE).reshape(-1)
return {
int(self.bunch_index[j]): float(arr[j])
for j in range(self.bunch_index.size)
}
[docs]
def share_distributions(self,
beam: Beam,
dimensions: str | list[str] = "tau",
n_bin: int = 75):
"""
Compute the bunch profiles and share them across all processors.
Produces the following attributes keyed by bunch number (int):
- self.<dim>_center : dict[int, np.ndarray of shape (n_bin,)]
- self.<dim>_profile : dict[int, np.ndarray of shape (n_bin,)]
- self.<dim>_bin_length : dict[int, float]
- self.<dim>_sorted_index : dict[int, np.ndarray | None],
locally-owned bunches only.
- self.charge_per_mp_all : dict[int, float]
Parameters
----------
beam : Beam object
dimensions : str or list of str, optional
n_bin : int or list of int, optional
"""
if (beam.mpi_switch == False):
print("Error, mpi is not initialised.")
if isinstance(dimensions, str):
dimensions = [dimensions]
if isinstance(n_bin, int):
n_bin = np.ones((len(dimensions), ), dtype=int) * n_bin
self.charge_per_mp_all = self.share_scalar(beam, 'charge_per_mp')
local_bunches = self.bunch_numbers
local_count = self.local_count
for i in range(len(dimensions)):
dim = dimensions[i]
n = int(n_bin[i])
local_profile = np.zeros((local_count, n), dtype=np.int64)
local_center = np.zeros((local_count, n), dtype=np.float64)
local_bin_length = np.zeros((local_count, 1), dtype=np.float64)
sorted_index_map = {}
for k, bn in enumerate(local_bunches):
bunch = beam[bn]
if not bunch.is_empty:
bins, sorted_index, profile, center = bunch.binning(
dimension=dim, n_bin=n)
local_profile[k, :] = profile
local_center[k, :] = center
local_bin_length[k, 0] = bins[1] - bins[0]
sorted_index_map[bn] = sorted_index
else:
sorted_index_map[bn] = None
if beam.filling_pattern[bn] is True:
beam.update_filling_pattern()
beam.update_distance_between_bunches()
center_arr = self._allgatherv(local_center, n, np.float64,
self.MPI.DOUBLE)
profile_arr = self._allgatherv(local_profile, n, np.int64,
self.MPI.INT64_T)
bin_len_arr = self._allgatherv(local_bin_length, 1, np.float64,
self.MPI.DOUBLE).reshape(-1)
self.__setattr__(
dim + "_center", {
int(self.bunch_index[j]): center_arr[j]
for j in range(self.bunch_index.size)
})
self.__setattr__(
dim + "_profile", {
int(self.bunch_index[j]): profile_arr[j]
for j in range(self.bunch_index.size)
})
self.__setattr__(
dim + "_bin_length", {
int(self.bunch_index[j]): float(bin_len_arr[j])
for j in range(self.bunch_index.size)
})
self.__setattr__(dim + "_sorted_index", sorted_index_map)
[docs]
def share_means(self, beam: Beam):
"""
Compute and share bunch means across all processors.
Produces:
- self.mean_all : dict[int, np.ndarray of shape (6,)], keyed by bunch number.
- self.charge_all : dict[int, float], keyed by bunch number.
Parameters
----------
beam : Beam object
"""
if (beam.mpi_switch == False):
print("Error, mpi is not initialised.")
self.charge_all = self.share_scalar(beam, 'charge')
local_mean = np.zeros((self.local_count, 6), dtype=np.float64)
for k, bn in enumerate(self.bunch_numbers):
bunch = beam[bn]
if not bunch.is_empty:
local_mean[k, :] = bunch.mean
mean_arr = self._allgatherv(local_mean, 6, np.float64, self.MPI.DOUBLE)
self.mean_all = {
int(self.bunch_index[j]): mean_arr[j]
for j in range(self.bunch_index.size)
}
[docs]
def share_stds(self, beam: Beam):
"""
Compute and share bunch standard deviations across all processors.
Produces:
- self.std_all : dict[int, np.ndarray of shape (6,)], keyed by bunch number.
- self.charge_all : dict[int, float], keyed by bunch number.
Parameters
----------
beam : Beam object
"""
if (beam.mpi_switch == False):
print("Error, mpi is not initialised.")
self.charge_all = self.share_scalar(beam, 'charge')
local_std = np.zeros((self.local_count, 6), dtype=np.float64)
for k, bn in enumerate(self.bunch_numbers):
bunch = beam[bn]
if not bunch.is_empty:
local_std[k, :] = bunch.std
std_arr = self._allgatherv(local_std, 6, np.float64, self.MPI.DOUBLE)
self.std_all = {
int(self.bunch_index[j]): std_arr[j]
for j in range(self.bunch_index.size)
}