Source code for mbtrack2.impedance.impedance_model

# -*- coding: utf-8 -*-
"""
Module where the ImpedanceModel class is defined.
"""
import pickle
from copy import deepcopy

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib import colormaps
from matplotlib.axes import Axes
from mpl_toolkits.axes_grid1.inset_locator import inset_axes
from numpy.typing import NDArray
from scipy.integrate import trapezoid
from scipy.interpolate import interp1d

from mbtrack2.impedance.wakefield import WakeField
from mbtrack2.utilities.misc import (
    Synchrotron,
    beam_loss_factor,
    double_sided_impedance,
    effective_impedance,
)
from mbtrack2.utilities.spectrum import (
    beam_spectrum,
    gaussian_bunch_spectrum,
    spectral_density,
)


[docs] class ImpedanceModel(): """ Define the impedance model of the machine. The model must be completed with successive add(...) and add_global(...) calls, then compute_sum() must be run. The transverse impedance and wake functions are beta weighted and divided by the beta at the tracking location (ring.optics.local_beta). Parameters ---------- ring : Synchrotron object average_beta : array-like of shape (2,), optional Average beta function used for global wakes normalization in [m]. The global wakes are normalized by average_beta / local_beta. If None and an AT lattice is loaded, average_beta is computed from the lattice. If None and an AT lattice is not loaded, average_beta is taken to be equal to local_beta, i.e. no normalization. The default is None. Attributes ---------- wakefields : list of WakeField objects WakeFields in the model. positions : list of arrays Positions corresponding the different WakeField objects in the model. names : list of str Names of the WakeField objects. sum : WakeField Sum of every WakeField in the model weigthed by beta functions. sum_"name" : WakeField Sum of the "name" Wakefield weigthed by beta functions. sum_names : array Names of attributes where the WakeFields are summed by name. globals : list of WakeField objects Globals WakeFields in the model. globals_names : list of str Names of the global WakeFields objects. Methods ------- add(wakefield, positions, name) Add the same WakeField object at different locations to the model. add_global(wakefield, name) Add a "global" WakeField object which will added to the sum WakeField but weighted only by the average beta functions. sum_beta(wake, beta) Weight a WakeField object by an array of beta functions. compute_sum_names() Compute the weighted WakeField for each WakeField object type. compute_sum() Compute the sum of all weighted WakeField into self.sum. plot_area(Z_type="Zlong", component="real", sigma=None, attr_list=None) Plot the contributions of different kind of WakeFields. save(file) Save impedance model to file. load(file) Load impedance model from file. """
[docs] def __init__(self, ring: Synchrotron, average_beta: NDArray | None = None): self.ring = ring self.optics = self.ring.optics self.wakefields = [] self.positions = [] self.names = [] self.globals = [] self.globals_names = [] self.sum_names = [] # global wake beta normalization if average_beta is None: average_beta = self.ring.optics.average_beta self.average_beta = np.expand_dims(np.array(average_beta), 1)
[docs] def add(self, wakefield: WakeField, positions: NDArray | float | int, name: str | None = None): """ Add the same WakeField object at different locations to the model. Parameters ---------- wakefield : WakeField WakeField object to add to the model. positions : array, float or int Array of longitudinal positions where the elements are loacted. name : str, optional Name of the element type. If None, the name of the WakeField object is used. The default is None. Returns ------- None. """ if name is None: name = wakefield.name if name is None: raise ValueError("Please give a valid name.") if name not in self.names: self.names.append(name) else: raise ValueError("This name is already taken.") self.wakefields.append(wakefield) self.positions.append(positions)
[docs] def add_global(self, wakefield: WakeField, name: str | None = None): """ Add a "global" WakeField object which will added to the sum WakeField but weighted only by the average beta functions. To use with "distributed" elements, for example a resistive wall wakefield computed from an effective radius (so which has already been weighted by beta functions). Parameters ---------- wakefield : WakeField WakeField object to add to the model. name : str, optional Name of the element type. If None, the name of the WakeField object is used. The default is None. Returns ------- None. """ self.globals.append(wakefield) if name is None: name = wakefield.name if name is None: raise ValueError("Please give a valid name.") if name not in self.globals_names: self.globals_names.append(name) else: raise ValueError("This name is already taken.")
# setattr(self, name, wakefield)
[docs] def sum_beta(self, wake: WakeField, beta: NDArray) -> WakeField: """ Weight a WakeField object by an array of beta functions. Parameters ---------- wake : WakeField WakeField element object. beta : array of shape (2, N) Beta function at the locations of the elements. Returns ------- wake_sum : WakeField WakeField object weighted by beta functions. """ wake_sum = deepcopy(wake) local_beta = self.ring.optics.local_beta for component_name in wake_sum.components: comp = getattr(wake_sum, component_name) weight = (beta[0, :]**comp.power_x) * (beta[1, :]**comp.power_y) if comp.plane == "x": weight = weight.sum() / local_beta[0] if comp.plane == "y": weight = weight.sum() / local_beta[1] else: weight = weight.sum() setattr(wake_sum, component_name, weight * comp) return wake_sum
[docs] def compute_sum_names(self): """ Compute the weighted WakeField for each WakeField object type. The new summed WakeField object are set to into self.sum_name. """ for idx, wake in enumerate(self.wakefields): attribute_name = "sum_" + self.names[idx] beta = self.optics.beta(self.positions[idx]) wake_sum = self.sum_beta(wake, beta) wake_sum.name = attribute_name setattr(self, attribute_name, wake_sum) self.sum_names.append(attribute_name)
[docs] def compute_sum(self): """ Compute the sum of all weighted WakeField into self.sum. """ self.compute_sum_names() for i, name in enumerate(self.sum_names): if i == 0: self.sum = deepcopy(getattr(self, name)) self.sum.name = "sum" else: wake2 = getattr(self, name) for component_name2 in wake2.components: comp2 = getattr(wake2, component_name2) try: comp1 = getattr(self.sum, component_name2) setattr(self.sum, component_name2, comp1 + comp2) except AttributeError: setattr(self.sum, component_name2, comp2) for i, wake2 in enumerate(self.globals): wake2 = self.sum_beta(wake2, self.average_beta) name = self.globals_names[i] setattr(self, name, wake2) self.sum_names.append(name) if not hasattr(self, "sum"): self.sum = deepcopy(wake2) self.sum.name = "sum" else: for component_name2 in wake2.components: comp2 = getattr(wake2, component_name2) try: comp1 = getattr(self.sum, component_name2) setattr(self.sum, component_name2, comp1 + comp2) except AttributeError: setattr(self.sum, component_name2, comp2)
[docs] def group_attributes(self, string_in_name: str, names_to_group: list[str] | None = None): """ Group attributes in the ImpedanceModel based on a string pattern or explicit list. Parameters ---------- string_in_name : str The string pattern used to match attribute names for grouping. If `names_to_group` is provided, this is the name of the new grouped attribute. names_to_group : list of str, optional list of attribute names to be explicitly grouped. If not provided, attributes matching `string_in_name` are automatically selected. Defaults to None. """ # detection of names to group attrs = self.sum_names if names_to_group is None: names_to_group = [] for attr in attrs: if string_in_name in attr: names_to_group.append(attr) # new WakeField with all everything summed up wakefields = [] for name in names_to_group: wakefields.append(getattr(self, name)) beta = np.ones((len(names_to_group), 2)) new_group = WakeField.add_several_wakefields(wakefields, beta) setattr(self, string_in_name, new_group) self.sum_names.append(string_in_name) # remove grouped elements for name in names_to_group: self.sum_names.remove(name) delattr(self, name)
[docs] def rename_attribute(self, old_name: str, new_name: str): """ Rename an attribute in the ImpedanceModel. Parameters ---------- old_name : str The current name of the attribute to be renamed. new_name : str The new name for the attribute. """ self.__dict__[new_name] = self.__dict__.pop(old_name)
[docs] def plot_area(self, Z_type: str = "Zlong", component: str = "real", sigma: float | None = None, attr_list: list[str] | None = None, zoom: bool = False, ax: Axes | None = None) -> Axes: """ Plot the contributions of different kind of WakeFields. Parameters ---------- Z_type : str, optional Type of impedance to plot. component : str, optional Component to plot, can be "real" or "imag". sigma : float, optional RMS bunch length in [s] to use for the spectral density. If equal to None, the spectral density is not plotted. attr_list : list or array of str, optional Attributes to plot. zoom : bool If True, add a zoomed plot on top right corner. ax : Axes, optional Axes where the plot is displayed. If None, a new figure is created. Return ------ ax : Axes Axes with the plot on it. """ if attr_list is None: attr_list = self.sum_names # manage legend Ztype_dict = { "Zlong": 0, "Zxdip": 1, "Zydip": 2, "Zxquad": 3, "Zyquad": 4 } scale = [1e-3, 1e-6, 1e-6, 1e-6, 1e-6] label_list = [ r"$Z_\mathrm{long} \; (\mathrm{k}\Omega)$", r"$\frac{1}{\beta_0} \sum_{j} \beta_{x,j} Z_{x,j}^\mathrm{Dip} \; (\mathrm{M}\Omega/\mathrm{m})$", r"$\frac{1}{\beta_0} \sum_{j} \beta_{y,j} Z_{y,j}^\mathrm{Dip} \; (\mathrm{M}\Omega/\mathrm{m})$", r"$\frac{1}{\beta_0} \sum_{j} \beta_{x,j} Z_{x,j}^\mathrm{Quad} \; (\mathrm{M}\Omega/\mathrm{m})$", r"$\frac{1}{\beta_0} \sum_{j} \beta_{y,j} Z_{y,j}^\mathrm{Quad} \; (\mathrm{M}\Omega/\mathrm{m})$" ] leg = Ztype_dict[Z_type] # sort plot by decresing area size area = np.zeros((len(attr_list), )) for index, attr in enumerate(attr_list): try: sum_imp = getattr(getattr(self, attr), Z_type) area[index] = trapezoid(sum_imp.data[component], sum_imp.data.index) except AttributeError: pass sorted_index = area.argsort()[::-1] # Init fig if ax is None: fig = plt.figure() ax = fig.gca() zero_impedance = getattr(self.sum, Z_type) * 0 total_imp = 0 legend = [] if sigma is not None: legend.append("Spectral density for sigma = " + str(sigma) + " s") # Main plot colorblind = colormaps['tab10'].colors for index in sorted_index: attr = attr_list[index] # Set all impedances with common indexes using + zero_impedance try: sum_imp = getattr(getattr(self, attr), Z_type) + zero_impedance ax.fill_between(sum_imp.data.index * 1e-9, total_imp, total_imp + sum_imp.data[component] * scale[leg], edgecolor=colorblind[index % 10], color=colorblind[index % 10]) total_imp += sum_imp.data[component] * scale[leg] if attr[:4] == "sum_": legend.append(attr[4:]) else: legend.append(attr) except AttributeError: pass if sigma is not None: spect = spectral_density(zero_impedance.data.index, sigma) spect = spect / spect.max() * max(total_imp) ax.plot(sum_imp.data.index * 1e-9, spect, 'r', linewidth=2.5) ax.legend(legend, loc="upper left", ncol=2) ax.set_xlabel("Frequency (GHz)") ax.set_ylabel(label_list[leg] + " - " + component + " part") ax.set_title(label_list[leg] + " - " + component + " part") if zoom is True: in_ax = inset_axes( ax, width="30%", # width = 30% of parent_bbox height=1.5, # height : 1 inch loc='upper right') total_imp = 0 for index in sorted_index: attr = attr_list[index] # Set all impedances with common indexes using + zero_impedance try: sum_imp = getattr(getattr(self, attr), Z_type) + zero_impedance in_ax.fill_between(sum_imp.data.index * 1e-3, total_imp, total_imp + sum_imp.data[component] * 1e-9, edgecolor=colorblind[index % 10], color=colorblind[index % 10]) total_imp += sum_imp.data[component] * 1e-9 except AttributeError: pass in_ax.set_xlim([0, 200]) in_ax.set_xlabel("Frequency (kHz)") in_ax.set_ylabel(r"$[\mathrm{G}\Omega]$") return ax
[docs] def effective_impedance(self, m: int, mu: int, sigma: float, M: int, tuneS: float, xi: float | None = None, mode: str = "Hermite") -> pd.DataFrame: """ Compute the longitudinal and transverse effective impedance. Parameters ---------- mu : int coupled bunch mode number, goes from 0 to (M-1) where M is the number of bunches m : int head-tail (or azimutal/synchrotron) mode number sigma : float RMS bunch length in [s] M : int Number of bunches. tuneS : float Synchrotron tune. xi : float, optional (non-normalized) chromaticity mode: str, optional type of the mode taken into account for the computation: -"Hermite" modes for Gaussian bunches Returns ------- summary : DataFrame Longitudinal and transverse effective impedance. """ attr_list = self.sum_names eff_array = np.zeros((len(attr_list), 3), dtype=complex) for i, attr in enumerate(attr_list): try: impedance = getattr(getattr(self, attr), "Zlong") eff_array[i, 0] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass try: impedance = getattr(getattr(self, attr), "Zxdip") eff_array[i, 1] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass try: impedance = getattr(getattr(self, attr), "Zydip") eff_array[i, 2] = effective_impedance(self.ring, impedance, m, mu, sigma, M, tuneS, xi, mode) except AttributeError: pass eff_array[:, 0] = eff_array[:, 0] * self.ring.omega0 * 1e3 eff_array[:, 1] = eff_array[:, 1] * 1e-3 eff_array[:, 2] = eff_array[:, 2] * 1e-3 summary = pd.DataFrame(eff_array, index=attr_list, columns=[ "Z/n [mOhm]", "sum betax x Zxeff [kOhm]", "sum betay x Zyeff [kOhm]" ]) return summary
[docs] def energy_loss(self, M: int, bunch_spacing: float, I: float, sigma: float | None = None, bunch_spectrum: NDArray | None = None, freq_spectrum: NDArray | None = None, n_points: float = 10e6) -> pd.DataFrame: """ Compute the beam and bunch loss factor and energy losses for each type of element in the model. Assumtions: - Constant spacing between bunches. - All bunches have the same bunch distribution. - Gaussian bunches if sigma is given. Parameters ---------- M : int Number of bunches in the beam. bunch_spacing : float Time between two bunches in [s]. I : float Total beam current in [A]. sigma : float, optional RMS bunch length in [s]. If None, freq_spectrum and bunch_spectrum must be given. Default is None. bunch_spectrum : array, optional Bunch spectrum to consider (i.e. FT of bunch profile). Not used if sigma is not None. Default is None. freq_spectrum : array, optional Frequency points corresponding to bunch_spectrum in [Hz]. Not used if sigma is not None. Default is None. n_points : float, optional Number of points used in the frequency spectrums. Default is 10e6. Returns ------- summary : Dataframe Contains the beam and bunch loss factor and energy loss for the full model and for each type of different component. """ fmax = self.sum.Zlong.data.index.max() fmin = self.sum.Zlong.data.index.min() Q = I * self.ring.T0 / M if fmin >= 0: fmin = -1 * fmax f = np.linspace(fmin, fmax, int(n_points)) if sigma is None: if bunch_spectrum is None: raise ValueError( "Either sigma or bunch_spectrum and freq_spectrum" + "must be specified.") if freq_spectrum is None: raise ValueError( "freq_spectrum must be given if bunch_spectrum" + " is specified.") if freq_spectrum[0] >= 0: freq_spectrum = np.concatenate( (-1 * freq_spectrum[:0:-1], freq_spectrum)) bunch_spectrum = np.concatenate( (bunch_spectrum[:0:-1], bunch_spectrum)) bunch_spectrum_interp = interp1d(freq_spectrum, bunch_spectrum, bounds_error=False, fill_value=0) bunch_spect = bunch_spectrum_interp(f) beam_spect = beam_spectrum(f, M, bunch_spacing, bunch_spectrum=bunch_spect, sigma=None) else: beam_spect = beam_spectrum(f, M, bunch_spacing, sigma=sigma) bunch_spect = gaussian_bunch_spectrum(f, sigma) attr_list = self.sum_names loss_array = np.zeros((len(attr_list), 2)) for i, attr in enumerate(attr_list): try: impedance = getattr(getattr(self, attr), "Zlong") loss_array[i, 0] = beam_loss_factor(impedance, f, beam_spect, self.ring) loss_array[i, 1] = beam_loss_factor(impedance, f, bunch_spect, self.ring) except AttributeError: pass loss_array = loss_array * 1e-12 summary = pd.DataFrame(loss_array, index=attr_list, columns=[ "loss factor (beam) [V/pC]", "loss factor (bunch) [V/pC]" ]) summary["P (beam) [W]"] = summary["loss factor (beam) [V/pC]"] * \ 1e12*Q**2/(self.ring.T0) summary["P (bunch) [W]"] = summary["loss factor (bunch) [V/pC]"] * \ 1e12*Q**2/(self.ring.T0)*M return summary
[docs] def power_loss_spectrum(self, M: int, bunch_spacing: float, I: float, sigma: float | None = None, bunch_spectrum: NDArray | None = None, freq_spectrum: NDArray | None = None, n_points: float = 10e6, max_overlap: bool = False, plot: bool = False) -> tuple[NDArray, NDArray]: """ Compute the power loss spectrum of the summed longitudinal impedance as in Eq. (4) of [1]. Assumtions: - Constant spacing between bunches. - All bunches have the same bunch distribution. - Gaussian bunches if sigma is given. Parameters ---------- M : int Number of bunches in the beam. bunch_spacing : float Time between two bunches in [s]. I : float Total beam current in [A]. sigma : float, optional RMS bunch length in [s]. If None, freq_spectrum and bunch_spectrum must be given. Default is None. bunch_spectrum : array Bunch spectrum to consider (i.e. FT of bunch profile). Not used if sigma is not None. Default is None. freq_spectrum : array Frequency points corresponding to bunch_spectrum in [Hz]. Not used if sigma is not None. Default is None. n_points : float, optional Number of points used in the frequency spectrum. Default is 10e6. max_overlap : bool, optional If True, the bunch spectrum (scaled to the number of bunches) is used instead of the beam spectrum to compute the maximum value of the power loss spectrum at each frequency. Should only be used to maximise the power loss at a given frequency (e.g. for HOMs) and not for the full spectrum. Default is False. plot : bool, optional If True, plots: - the overlap between the real part of the longitudinal impedance and the beam spectrum. - the power loss spectrum. Default is False. Returns ------- pf0 : array Frequency points. power_loss : array Power loss spectrum in [W]. References ---------- [1] : L. Teofili, et al. "A Multi-Physics Approach to Simulate the RF Heating 3D Power Map Induced by the Proton Beam in a Beam Intercepting Device", in IPAC'18, 2018, doi:10.18429/JACoW-IPAC2018-THPAK093 """ impedance = self.sum.Zlong fmax = impedance.data.index.max() fmin = impedance.data.index.min() Q = I * self.ring.T0 / M if fmin >= 0: fmin = -1 * fmax double_sided_impedance(impedance) frequency = np.linspace(fmin, fmax, int(n_points)) if sigma is None: if bunch_spectrum is None: raise ValueError( "Either sigma or bunch_spectrum and freq_spectrum" + "must be specified.") if freq_spectrum is None: raise ValueError( "freq_spectrum must be given if bunch_spectrum" + " is specified.") if freq_spectrum[0] >= 0: freq_spectrum = np.concatenate( (-1 * freq_spectrum[:0:-1], freq_spectrum)) bunch_spectrum = np.concatenate( (bunch_spectrum[:0:-1], bunch_spectrum)) bunch_spectrum_interp = interp1d(freq_spectrum, bunch_spectrum, bounds_error=False, fill_value=0) bunch_spectrum = bunch_spectrum_interp(frequency) else: bunch_spectrum = None if max_overlap is False: spectrum = beam_spectrum(frequency, M, bunch_spacing, sigma, bunch_spectrum) else: if bunch_spectrum is None: spectrum = gaussian_bunch_spectrum(frequency, sigma) * M else: spectrum = bunch_spectrum * M pmax = np.floor(fmax / self.ring.f0) pmin = np.floor(fmin / self.ring.f0) p = np.arange(pmin + 1, pmax) pf0 = p * self.ring.f0 ReZ = np.real(impedance(pf0)) spectral_density = np.abs(spectrum)**2 # interpolation of the spectrum is needed to avoid problems liked to # division by 0 # computing the spectrum directly to the frequency points gives # wrong results spect = interp1d(frequency, spectral_density) power_loss = (self.ring.f0 * Q)**2 * ReZ * spect(pf0) if plot is True: _, ax = plt.subplots() twin = ax.twinx() p1, = ax.plot(pf0, ReZ, color="r", label="Re[Z] [Ohm]") p2, = twin.plot(pf0, spect(pf0) * (self.ring.f0 * Q)**2, color="b", label="Beam spectrum [a.u.]") ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Re[Z] [Ohm]") twin.set_ylabel("Beam spectrum [a.u.]") plots = [p1, p2] ax.legend(handles=plots, loc="best") _, ax = plt.subplots() ax.plot(pf0, power_loss) ax.set_xlabel("Frequency [Hz]") ax.set_ylabel("Power loss [W]") return pf0, power_loss
[docs] def save(self, file: str): """ Save impedance model to file. The same pandas version is needed on both saving and loading computer for the pickle to work. Parameters ---------- file : str File where the impedance model is saved. Returns ------- None. """ to_save = { "wakefields": self.wakefields, "positions": self.positions, "names": self.names, "globals": self.globals, "globals_names": self.globals_names, "average_beta": self.average_beta } with open(file, "wb") as f: pickle.dump(to_save, f)
[docs] def load(self, file: str): """ Load impedance model from file. The same pandas version is needed on both saving and loading computer for the pickle to work. Parameters ---------- file : str File where the impedance model is saved. Returns ------- None. """ with open(file, 'rb') as f: to_load = pickle.load(f) self.wakefields = to_load["wakefields"] self.positions = to_load["positions"] self.names = to_load["names"] self.globals = to_load["globals"] self.globals_names = to_load["globals_names"] self.average_beta = to_load["average_beta"] self.compute_sum()