Source code for mbtrack2.tracking.parallel

# -*- coding: utf-8 -*-
"""
Module to handle MPI parallel computation via the mpi4py module [1].
"""
import numpy as np
from numpy.typing import NDArray

from mbtrack2.tracking.particles import Beam


[docs] class Mpi: """ Class which handle parallel computation via the mpi4py module [1]. Non-empty bunches are distributed across MPI ranks as contiguous blocks: rank r owns bunches bunch_index[displacements[r]:displacements[r] + counts[r]], where bunch_index = np.where(filling_pattern)[0]. The number of ranks must not exceed the number of non-empty bunches, but any smaller rank count is allowed (a single rank can own several bunches). Outputs of share_* methods are dicts keyed by bunch number (absolute index into filling_pattern). Every key is an element of bunch_index, so callers write beam.mpi.tau_center[bunch_num]. Parameters ---------- filling_pattern : bool array of shape (h,) Filling pattern of the beam, like Beam.filling_pattern Attributes ---------- comm : MPI.Intracomm object MPI intra-communicator of the processor group. rank : int Rank of the current processor. size : int Number of processors in the communicator. bunch_index : int array of shape (n_bunches,) Non-empty bunch numbers. counts : int array of shape (size,) Number of bunches owned by each rank. displacements : int array of shape (size,) Starting position (in bunch_index) of each rank's block. bunch_to_rank_arr : int array of shape (n_bunches,) Rank that owns each non-empty bunch. rank_to_bunches : dict[int, np.ndarray] Bunch numbers owned by each rank. bunch_num_to_rank : dict[int, int] Map from bunch number to the rank that owns it. bunch_numbers : list of int Bunch numbers owned by the current processor. bunch_numbers_set : set of int Set version of bunch_numbers for fast membership tests. local_count : int Number of bunches owned by the current processor. local_displacement : int Offset of the current rank's block inside bunch_index. Methods ------- write_table(filling_pattern) Build the rank-bunch distribution tables. rank_to_bunch(rank) Return the bunch numbers owned by rank. bunch_to_rank(bunch_num) Return the rank that owns bunch_num. share_distributions(beam) Compute and share bunch profiles across all ranks. share_means(beam) Compute and share bunch means across all ranks. share_stds(beam) Compute and share bunch standard deviations across all ranks. share_scalar(beam, attr) Gather a scalar per-bunch attribute across all ranks. References ---------- [1] L. Dalcin, P. Kler, R. Paz, and A. Cosimo, Parallel Distributed Computing using Python, Advances in Water Resources, 34(9):1124-1139, 2011. """
[docs] def __init__(self, filling_pattern: NDArray): from mpi4py import MPI self.MPI = MPI self.comm = MPI.COMM_WORLD self.rank = self.comm.Get_rank() self.size = self.comm.Get_size() self.write_table(filling_pattern)
[docs] def write_table(self, filling_pattern: NDArray): """ Build the rank-bunch distribution tables. Non-empty bunches are split into contiguous blocks. The first n_bunches % size ranks receive one extra bunch so the partition is as balanced as possible. Parameters ---------- filling_pattern : bool array of shape (h,) Filling pattern of the beam, like Beam.filling_pattern """ bunch_index = np.where(filling_pattern)[0].astype(int) n_bunches = bunch_index.size if self.size > n_bunches: raise ValueError( f"The number of processors ({self.size}) must not exceed the " f"number of non-empty bunches ({n_bunches}).") base, extra = divmod(n_bunches, self.size) counts = np.full(self.size, base, dtype=int) counts[:extra] += 1 displacements = np.concatenate( ([0], np.cumsum(counts[:-1]))).astype(int) bunch_to_rank_arr = np.empty(n_bunches, dtype=int) rank_to_bunches = {} bunch_num_to_rank = {} for r in range(self.size): start = displacements[r] stop = start + counts[r] bunch_to_rank_arr[start:stop] = r rank_to_bunches[r] = bunch_index[start:stop].copy() for b in bunch_index[start:stop]: bunch_num_to_rank[int(b)] = r self.bunch_index = bunch_index self.counts = counts self.displacements = displacements self.bunch_to_rank_arr = bunch_to_rank_arr self.rank_to_bunches = rank_to_bunches self.bunch_num_to_rank = bunch_num_to_rank self.bunch_numbers = [int(b) for b in rank_to_bunches[self.rank]] self.bunch_numbers_set = set(self.bunch_numbers) self.local_count = int(counts[self.rank]) self.local_displacement = int(displacements[self.rank])
[docs] def rank_to_bunch(self, rank: int) -> NDArray: """ Return the bunch numbers owned by rank. Parameters ---------- rank : int Returns ------- bunch_numbers : np.ndarray of int Bunch numbers (absolute indices into filling_pattern) owned by rank. """ return self.rank_to_bunches[rank]
[docs] def bunch_to_rank(self, bunch_num: int) -> int | None: """ Return the rank that owns bunch_num. Parameters ---------- bunch_num : int Absolute bunch number (index into filling_pattern). Returns ------- rank : int or None Rank that tracks bunch_num, or None if bunch_num is empty. """ rank = self.bunch_num_to_rank.get(bunch_num) if rank is None: print(f"The bunch {bunch_num} is not tracked on any processor.") return rank
[docs] def _allgatherv(self, local: NDArray, inner_size: int, dtype, mpi_dtype) -> NDArray: """ Gather per-bunch arrays from all ranks into a single shared array. Each rank contributes local_count rows. The result concatenates all ranks' contributions in bunch order, producing one row per non-empty bunch aligned with self.bunch_index. Parameters ---------- local : np.ndarray of shape (local_count, inner_size) Local data array for the bunches owned by the current rank. inner_size : int Number of elements per bunch (columns in the result). dtype : numpy dtype NumPy data type for local and the returned array. mpi_dtype : MPI.Datatype Matching MPI datatype (e.g. MPI.DOUBLE for np.float64). Returns ------- result : np.ndarray of shape (n_bunches, inner_size) Row j contains data for bunch self.bunch_index[j]. """ n_bunches = self.bunch_index.size result = np.empty((n_bunches, inner_size), dtype=dtype) local = np.ascontiguousarray(local, dtype=dtype) sendbuf = [local, self.local_count * inner_size, mpi_dtype] recvbuf = [ result, (self.counts * inner_size).astype(int), (self.displacements * inner_size).astype(int), mpi_dtype, ] self.comm.Allgatherv(sendbuf, recvbuf) return result
[docs] def share_scalar(self, beam: Beam, attr: str) -> dict[int, float]: """ Gather a scalar per-bunch attribute across all ranks. Parameters ---------- beam : Beam object attr : str Name of the Bunch attribute to gather. Must be a scalar float. Returns ------- result : dict[int, float] Maps bunch number to the gathered attribute value. """ local_vals = np.array( [getattr(beam[bn], attr) for bn in self.bunch_numbers], dtype=np.float64).reshape(self.local_count, 1) arr = self._allgatherv(local_vals, 1, np.float64, self.MPI.DOUBLE).reshape(-1) return { int(self.bunch_index[j]): float(arr[j]) for j in range(self.bunch_index.size) }
[docs] def share_distributions(self, beam: Beam, dimensions: str | list[str] = "tau", n_bin: int = 75): """ Compute the bunch profiles and share them across all processors. Produces the following attributes keyed by bunch number (int): - self.<dim>_center : dict[int, np.ndarray of shape (n_bin,)] - self.<dim>_profile : dict[int, np.ndarray of shape (n_bin,)] - self.<dim>_bin_length : dict[int, float] - self.<dim>_sorted_index : dict[int, np.ndarray | None], locally-owned bunches only. - self.charge_per_mp_all : dict[int, float] Parameters ---------- beam : Beam object dimensions : str or list of str, optional n_bin : int or list of int, optional """ if (beam.mpi_switch == False): print("Error, mpi is not initialised.") if isinstance(dimensions, str): dimensions = [dimensions] if isinstance(n_bin, int): n_bin = np.ones((len(dimensions), ), dtype=int) * n_bin self.charge_per_mp_all = self.share_scalar(beam, 'charge_per_mp') local_bunches = self.bunch_numbers local_count = self.local_count for i in range(len(dimensions)): dim = dimensions[i] n = int(n_bin[i]) local_profile = np.zeros((local_count, n), dtype=np.int64) local_center = np.zeros((local_count, n), dtype=np.float64) local_bin_length = np.zeros((local_count, 1), dtype=np.float64) sorted_index_map = {} for k, bn in enumerate(local_bunches): bunch = beam[bn] if not bunch.is_empty: bins, sorted_index, profile, center = bunch.binning( dimension=dim, n_bin=n) local_profile[k, :] = profile local_center[k, :] = center local_bin_length[k, 0] = bins[1] - bins[0] sorted_index_map[bn] = sorted_index else: sorted_index_map[bn] = None if beam.filling_pattern[bn] is True: beam.update_filling_pattern() beam.update_distance_between_bunches() center_arr = self._allgatherv(local_center, n, np.float64, self.MPI.DOUBLE) profile_arr = self._allgatherv(local_profile, n, np.int64, self.MPI.INT64_T) bin_len_arr = self._allgatherv(local_bin_length, 1, np.float64, self.MPI.DOUBLE).reshape(-1) self.__setattr__( dim + "_center", { int(self.bunch_index[j]): center_arr[j] for j in range(self.bunch_index.size) }) self.__setattr__( dim + "_profile", { int(self.bunch_index[j]): profile_arr[j] for j in range(self.bunch_index.size) }) self.__setattr__( dim + "_bin_length", { int(self.bunch_index[j]): float(bin_len_arr[j]) for j in range(self.bunch_index.size) }) self.__setattr__(dim + "_sorted_index", sorted_index_map)
[docs] def share_means(self, beam: Beam): """ Compute and share bunch means across all processors. Produces: - self.mean_all : dict[int, np.ndarray of shape (6,)], keyed by bunch number. - self.charge_all : dict[int, float], keyed by bunch number. Parameters ---------- beam : Beam object """ if (beam.mpi_switch == False): print("Error, mpi is not initialised.") self.charge_all = self.share_scalar(beam, 'charge') local_mean = np.zeros((self.local_count, 6), dtype=np.float64) for k, bn in enumerate(self.bunch_numbers): bunch = beam[bn] if not bunch.is_empty: local_mean[k, :] = bunch.mean mean_arr = self._allgatherv(local_mean, 6, np.float64, self.MPI.DOUBLE) self.mean_all = { int(self.bunch_index[j]): mean_arr[j] for j in range(self.bunch_index.size) }
[docs] def share_stds(self, beam: Beam): """ Compute and share bunch standard deviations across all processors. Produces: - self.std_all : dict[int, np.ndarray of shape (6,)], keyed by bunch number. - self.charge_all : dict[int, float], keyed by bunch number. Parameters ---------- beam : Beam object """ if (beam.mpi_switch == False): print("Error, mpi is not initialised.") self.charge_all = self.share_scalar(beam, 'charge') local_std = np.zeros((self.local_count, 6), dtype=np.float64) for k, bn in enumerate(self.bunch_numbers): bunch = beam[bn] if not bunch.is_empty: local_std[k, :] = bunch.std std_arr = self._allgatherv(local_std, 6, np.float64, self.MPI.DOUBLE) self.std_all = { int(self.bunch_index[j]): std_arr[j] for j in range(self.bunch_index.size) }