# -*- coding: utf-8 -*-
"""
This module defines the WakePotential and LongRangeResistiveWall classes which
deal with the single bunch and multi-bunch wakes.
"""
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from matplotlib.axes import Axes
from numpy.typing import NDArray
from scipy import signal
from scipy.constants import c, mu_0, pi
from mbtrack2.impedance.wakefield import WakeField
from mbtrack2.tracking.element import Element
from mbtrack2.tracking.particles import Beam, Bunch
from mbtrack2.utilities.spectrum import gaussian_bunch
from mbtrack2.utilities.synchrotron import Synchrotron
[docs]
class WakePotential(Element):
"""
Compute a wake potential from uniformly sampled wake functions by
performing a convolution with a bunch charge profile.
Two different time bases are used. The first one is controled by the n_bin
parameter and is used to compute the bunch profile. Then the bunch profile
is interpolated on the wake function time base which is used to perform the
convolution to get the wake potential.
Parameters
----------
ring : Synchrotron object
wakefield : Wakefield object
Wakefield object which contains the wake functions to be used. The wake
functions must be uniformly sampled!
n_bin : int, optional
Number of bins for constructing the longitudinal bunch profile.
interp_on_position : bool, optional
If True, the computed wake potential is interpolated on the exact
particle location. If False, the wake potential is interpolated on the
bin center and each particle of the bin get the same value.
Default is True.
Attributes
----------
rho : array of shape (n_bin, )
Bunch charge density profile in the unit [1/s].
Wp : array
Wake potential profile.
Wp_interp : array of shape (mp_number, )
Wake potential, obtained from interpolating Wp, exerted on each macro-particle.
Methods
-------
charge_density(bunch)
Compute bunch charge density profile in [1/s].
dipole_moment(bunch, plane, tau0)
Return the dipole moment of the bunch computed on the same time array
as the wake function.
prepare_wakefunction(wake_type)
Prepare the wake function of a given wake_type to be used for the wake
potential computation.
get_wakepotential(bunch, wake_type)
Return the wake potential computed on the wake function time array
limited to the bunch profile.
track(bunch)
Tracking method for the element.
plot_last_wake(wake_type)
Plot the last wake potential of a given type computed during the last
call of the track method.
reference_loss(bunch)
Calculate the loss factor and kick factor from the wake potential and
compare it to a reference value assuming a Gaussian bunch computed in
the frequency domain.
check_sampling()
Check if the wake function sampling is uniform.
reduce_sampling(factor)
Reduce wake function samping by an integer factor.
"""
[docs]
def __init__(self,
ring: Synchrotron,
wakefield: WakeField,
n_bin: int = 80,
interp_on_position: bool = True):
self.wakefield = wakefield
self.types = self.wakefield.wake_components
self.n_types = len(self.wakefield.wake_components)
self.ring = ring
self.n_bin = n_bin
if self.n_bin < 2:
raise ValueError("n_bin must be >= 2.")
self.check_sampling()
self._set_wakefunction_max_frequency()
self.interp_on_position = interp_on_position
# Suppress numpy warning for floating-point operations.
np.seterr(invalid='ignore')
[docs]
def _interp_regular_numpy(self, x_new: NDArray, x_min: float, dx: float,
y: NDArray) -> NDArray:
"""
Linear interpolation funtion for uniformly sampled input data.
Parameters
----------
x_new : array
New interpolation base.
x_min : float
Minimum value of the original interpolant base.
dx : float
Step of the original interpolant base.
y : array
Data to interpolate.
Returns
-------
result : array of len(x_new)
Data interpolated on x_new coordinates.
"""
x_max = x_min + (len(y) - 1) * dx # Compute the maximum valid x value
result = np.zeros_like(x_new, dtype=y.dtype)
valid_mask = (x_new >= x_min) & (x_new <= x_max)
indices = ((x_new[valid_mask] - x_min) / dx).astype(int)
indices = np.clip(indices, 0, len(y) - 2) # Ensure within valid range
weights = (x_new[valid_mask] - (x_min + indices*dx)) / dx
result[valid_mask] = y[indices] + weights * (y[indices + 1] -
y[indices])
return result
[docs]
def charge_density(self, bunch: Bunch):
"""
Compute bunch charge density profile in [1/s].
Parameters
----------
bunch : Bunch object
"""
# Get binning data
a, b, c, d = bunch.binning(n_bin=self.n_bin)
self.bins = a
self.sorted_index = b
self.profile = c
self.center = d
self.bin_size = self.bins[1] - self.bins[0]
# Compute charge density
self.rho = bunch.charge_per_mp * self.profile / (self.bin_size *
bunch.charge)
self.rho = np.array(self.rho)
# Compute time array
self.tau = np.array(self.center)
self.dtau = self.tau[1] - self.tau[0]
# Add N values before and after rho and tau
N = self.n_bin // 2
m = self.n_bin % 2
self.tau = np.arange(self.tau[0] - self.dtau * N,
self.tau[-1] + self.dtau * (N+m), self.dtau)
self.rho = np.pad(self.rho, (N, N + m),
mode="constant",
constant_values=(0, 0))
if len(self.tau) != len(self.rho):
self.tau = np.append(self.tau, self.tau[-1] + self.dtau)
self.tau_mean = np.mean(self.tau)
self.tau -= self.tau_mean
[docs]
def dipole_moment(self, bunch: Bunch, plane: str,
tau0: NDArray) -> NDArray:
"""
Return the dipole moment of the bunch computed on the same time array
as the wake function.
Parameters
----------
bunch : Bunch object
plane : str
Plane on which the dipole moment is computed, "x" or "y".
tau0 : array
Time array on which the dipole moment will be interpolated, in [s].
Returns
-------
dipole : array
Dipole moment of the bunch.
"""
dipole = np.bincount(self.sorted_index,
weights=bunch[plane],
minlength=self.n_bin + 1)[:-1]
dipole = np.divide(dipole,
self.profile,
where=self.profile != 0,
out=np.zeros_like(dipole))
dipole = np.nan_to_num(dipole)
# Add N values to get same size as tau/profile
N = self.n_bin // 2
m = self.n_bin % 2
dipole = np.concatenate((np.zeros(N + m), dipole, np.zeros(N)))
# Interpole on tau0 to get the same size as W0
dipole0 = self._interp_regular_numpy(tau0, np.min(self.tau), self.dtau,
dipole)
setattr(self, "dipole_" + plane, dipole0)
return dipole0
[docs]
def prepare_wakefunction(
self,
wake_type: str,
tau: NDArray,
save_data: bool = True) -> tuple[NDArray, float, NDArray]:
"""
Prepare the wake function of a given wake_type to be used for the wake
potential computation.
The new time array keeps the same sampling time as given in the
WakeFunction definition but is restricted to the bunch profile time
array.
Parameters
----------
wake_type : str
Type of the wake function to prepare: "Wlong", "Wxdip", ...
tau : array
Time domain array of the bunch profile in [s].
save_data : bool, optional
If True, the results are saved as atributes.
Returns
-------
tau0 : array
Time base of the wake function in [s].
dtau0 : float
Difference between two points of the wake function time base in
[s].
W0 : array
Wake function array in [V/C] or [V/C/m].
"""
wake_data = getattr(self.wakefield, wake_type).data
tau0 = np.array(wake_data.index)
dtau0 = tau0[1] - tau0[0]
W0 = np.array(wake_data["real"])
# Keep only the wake function on the rho window
ind = (tau0 > min(tau[0], 0)) & (tau0 < max(tau[-1], 0))
tau0 = tau0[ind]
W0 = W0[ind]
# Check the wake function window for assymetry
assym = (np.abs(tau0[-1]) - np.abs(tau0[0])) / dtau0
n_assym = int(np.floor(assym))
if np.floor(assym) > 1:
# add at head
if np.abs(tau0[-1]) > np.abs(tau0[0]):
tau0 = np.arange(tau0[0] - dtau0*n_assym, tau0[-1] + dtau0,
dtau0)
n_to_add = len(tau0) - len(W0)
W0 = np.concatenate((np.zeros(n_to_add), W0))
# add at tail
elif np.abs(tau0[0]) > np.abs(tau0[-1]):
tau0 = np.arange(tau0[0], tau0[-1] + dtau0 * (n_assym+1),
dtau0)
n_to_add = len(tau0) - len(W0)
W0 = np.concatenate((np.zeros(n_to_add), W0))
# Check is the wf is shorter than rho then add zeros
if (tau0[0] > tau[0]) or (tau0[-1] < tau[-1]):
n = max(int(np.ceil((tau0[0] - tau[0]) / dtau0)),
int(np.ceil((tau[-1] - tau0[-1]) / dtau0)))
tau0 = np.arange(tau0[0] - dtau0*n, tau0[-1] + dtau0 * (n+1),
dtau0)
W0 = np.insert(W0, 0, np.zeros(n))
n_to_add = len(tau0) - len(W0)
W0 = np.pad(W0, (0, n_to_add),
mode="constant",
constant_values=(0, 0))
if save_data:
setattr(self, "tau0_" + wake_type, tau0)
setattr(self, "dtau0_" + wake_type, dtau0)
setattr(self, "W0_" + wake_type, W0)
return (tau0, dtau0, W0)
[docs]
def get_wakepotential(self, bunch: Bunch,
wake_type: str) -> tuple[NDArray, NDArray]:
"""
Return the wake potential computed on the wake function time array
limited to the bunch profile.
Parameters
----------
bunch : Bunch object
wake_type : str
Wake function type: "Wlong", "Wxdip", ...
Returns
-------
tau0 : array
Time base.
Wp : array
Wake potential.
"""
(tau0, dtau0, W0) = self.prepare_wakefunction(wake_type, self.tau)
profile0 = self._interp_regular_numpy(tau0, np.min(self.tau),
self.dtau, self.rho)
if wake_type in ["Wlong", "Wxcst", "Wycst"]:
Wp = signal.convolve(profile0, W0 * -1, mode='same') * dtau0
elif wake_type == "Wxdip":
dipole0 = self.dipole_moment(bunch, "x", tau0)
Wp = signal.convolve(profile0 * dipole0, W0, mode='same') * dtau0
elif wake_type == "Wydip":
dipole0 = self.dipole_moment(bunch, "y", tau0)
Wp = signal.convolve(profile0 * dipole0, W0, mode='same') * dtau0
elif wake_type in ["Wxquad", "Wyquad"]:
Wp = signal.convolve(profile0, W0, mode='same') * dtau0
else:
raise ValueError("This type of wake is not taken into account.")
setattr(self, "profile0_" + wake_type, profile0)
setattr(self, wake_type, Wp)
return tau0, Wp
[docs]
@Element.parallel
@Element.track_bunch_if_non_empty
def track(self, bunch: Bunch | Beam):
"""
Tracking method for the element.
No bunch to bunch interaction, so written for Bunch objects and
@Element.parallel is used to handle Beam objects.
Parameters
----------
bunch : Bunch or Beam object.
"""
self.charge_density(bunch)
for wake_type in self.types:
tau0, Wp = self.get_wakepotential(bunch, wake_type)
if self.interp_on_position:
Wp_interp = self._interp_regular_numpy(
bunch["tau"], np.min(tau0 + self.tau_mean),
(tau0[1] - tau0[0]), Wp)
else:
Wp_interp = np.interp(self.center, tau0 + self.tau_mean, Wp, 0,
0)
Wp_interp = Wp_interp[self.sorted_index]
if wake_type == "Wlong":
bunch["delta"] += Wp_interp * bunch.charge / self.ring.E0
elif wake_type == "Wxdip":
bunch["xp"] += Wp_interp * bunch.charge / self.ring.E0
elif wake_type == "Wydip":
bunch["yp"] += Wp_interp * bunch.charge / self.ring.E0
elif wake_type == "Wxquad":
bunch["xp"] += (bunch["x"] * Wp_interp * bunch.charge /
self.ring.E0)
elif wake_type == "Wyquad":
bunch["yp"] += (bunch["y"] * Wp_interp * bunch.charge /
self.ring.E0)
elif wake_type == "Wxcst":
bunch["xp"] += Wp_interp * bunch.charge / self.ring.E0
elif wake_type == "Wycst":
bunch["yp"] += Wp_interp * bunch.charge / self.ring.E0
else:
raise ValueError("Incorrect wake_type. Must be [Wlong, Wxdip,\
Wydip, Wxquad, Wyquad, Wxcst, Wycst].")
[docs]
def plot_last_wake(
self,
wake_type: str,
plot_rho: bool = True,
plot_dipole: bool = False,
plot_wake_function: bool = True,
ax: Axes | None = None,
) -> Axes:
"""
Plot the last wake potential of a given type computed during the last
call of the track method.
Parameters
----------
wake_type : str
Type of the wake to plot: "Wlong", "Wxdip", ...
plot_rho : bool, optional
Plot the normalised bunch profile. The default is True.
plot_dipole : bool, optional
Plot the normalised dipole moment. The default is False.
plot_wake_function : bool, optional
Plot the normalised wake function. The default is True.
ax : Axes, optional
Axes where the plot is displayed. If None, a new figure is created.
Returns
-------
ax : Axes
"""
labels = {
"Wlong": r"$W_{p,long}$ (V/pC)",
"Wxdip": r"$W_{p,x}^{D} (V/pC)$",
"Wydip": r"$W_{p,y}^{D} (V/pC)$",
"Wxquad": r"$W_{p,x}^{Q} (V/pC/m)$",
"Wyquad": r"$W_{p,y}^{Q} (V/pC/m)$",
"Wxcst": r"$W_{p,x}^{M}$ (V/pC)",
"Wycst": r"$W_{p,y}^{M}$ (V/pC)",
}
Wp = getattr(self, wake_type)
tau0 = getattr(self, "tau0_" + wake_type)
if ax is None:
fig, ax = plt.subplots()
ax.plot(tau0 * 1e12, Wp * 1e-12, label=labels[wake_type])
ax.set_xlabel("$\\tau$ (ps)")
ax.set_ylabel(labels[wake_type])
if plot_rho is True:
profile0 = getattr(self, "profile0_" + wake_type)
profile_rescaled = profile0 / max(profile0) * max(np.abs(Wp))
rho_rescaled = self.rho / max(self.rho) * max(np.abs(Wp))
ax.plot(tau0 * 1e12,
profile_rescaled * 1e-12,
label=r"$\rho$ interpolated (a.u.)")
ax.plot((self.tau + self.tau_mean) * 1e12,
rho_rescaled * 1e-12,
label=r"$\rho$ (a.u.)",
linestyle='dashed')
ax.legend()
if plot_wake_function is True:
W0 = getattr(self, "W0_" + wake_type)
W0_rescaled = W0 / max(np.abs(W0)) * max(np.abs(Wp))
ax.plot(tau0 * 1e12,
W0_rescaled * 1e-12,
label=r"$W_{function}$ (a.u.)")
ax.legend()
if plot_dipole is True:
dipole = getattr(self, "dipole_" + wake_type[1])
dipole_rescaled = dipole / max(dipole) * max(np.abs(Wp))
ax.plot(tau0 * 1e12,
dipole_rescaled * 1e-12,
label=r"Dipole moment (a.u.)")
ax.legend()
return ax
[docs]
def get_gaussian_wakepotential(
self,
sigma: float,
wake_type: str,
dipole: float = 1e-3,
) -> tuple[NDArray, NDArray, NDArray, NDArray, NDArray]:
"""
Return the wake potential computed using a perfect gaussian profile.
Parameters
----------
sigma : float
RMS bunch length in [s].
wake_type : str
Wake function type: "Wlong", "Wxdip", ...
dipole : float, optional
Dipole moment to consider in [m], (uniform dipole moment).
Returns
-------
tau0 : array
Time base in [s].
W0 : array
Wake function.
Wp : array
Wake potential.
profile0 : array
Gaussian bunch profile.
dipole0 : array
Dipole moment.
"""
tau = np.linspace(-10 * sigma, 10 * sigma, int(1e3))
(tau0, dtau0, W0) = self.prepare_wakefunction(wake_type, tau, False)
profile0 = gaussian_bunch(tau0, sigma)
dipole0 = np.ones_like(profile0) * dipole
if wake_type in ["Wlong", "Wxcst", "Wycst"]:
Wp = signal.convolve(profile0, W0 * -1, mode='same') * dtau0
elif wake_type in ["Wxdip", "Wydip"]:
Wp = signal.convolve(profile0 * dipole0, W0, mode='same') * dtau0
elif wake_type in ["Wxquad", "Wyquad"]:
Wp = signal.convolve(profile0, W0, mode='same') * dtau0
else:
raise ValueError("This type of wake is not taken into account.")
return tau0, W0, Wp, profile0, dipole0
[docs]
def plot_gaussian_wake(
self,
sigma: float,
wake_type: str,
dipole: float = 1e-3,
plot_rho: bool = True,
plot_dipole: bool = False,
plot_wake_function: bool = True,
ax: Axes | None = None,
) -> Axes:
"""
Plot the wake potential of a given type for a perfect gaussian bunch.
Parameters
----------
sigma : float
RMS bunch length in [s].
wake_type : str
Type of the wake to plot: "Wlong", "Wxdip", ...
dipole : float, optional
Dipole moment to consider in [m], (uniform dipole moment).
plot_rho : bool, optional
Plot the normalised bunch profile. The default is True.
plot_dipole : bool, optional
Plot the normalised dipole moment. The default is False.
plot_wake_function : bool, optional
Plot the normalised wake function. The default is True.
ax : Axes, optional
Axes where the plot is displayed. If None, a new figure is created.
Returns
-------
ax : Axes
"""
labels = {
"Wlong": r"$W_{p,long}$ (V/pC)",
"Wxdip": r"$W_{p,x}^{D}$ (V/pC)",
"Wydip": r"$W_{p,y}^{D}$ (V/pC)",
"Wxquad": r"$W_{p,x}^{Q}$ (V/pC/m)",
"Wyquad": r"$W_{p,y}^{Q}$ (V/pC/m)",
"Wxcst": r"$W_{p,x}^{M}$ (V/pC)",
"Wycst": r"$W_{p,y}^{M}$ (V/pC)"
}
tau0, W0, Wp, profile0, dipole0 = self.get_gaussian_wakepotential(
sigma, wake_type, dipole)
if ax is None:
fig, ax = plt.subplots()
ax.plot(tau0 * 1e12, Wp * 1e-12, label=labels[wake_type])
ax.set_xlabel(r"$\tau$ (ps)")
ax.set_ylabel(labels[wake_type])
if plot_rho is True:
profile_rescaled = profile0 / max(profile0) * max(np.abs(Wp))
ax.plot(tau0 * 1e12,
profile_rescaled * 1e-12,
label=r"$\rho$ (a.u.)")
ax.legend()
if plot_wake_function is True:
W0_rescaled = W0 / max(W0) * max(np.abs(Wp))
ax.plot(tau0 * 1e12,
W0_rescaled * 1e-12,
label=r"$W_{function}$ (a.u.)")
ax.legend()
if plot_dipole is True:
dipole_rescaled = dipole0 / max(dipole0) * max(np.abs(Wp))
ax.plot(tau0 * 1e12,
dipole_rescaled * 1e-12,
label=r"Dipole moment (a.u.)")
ax.legend()
return ax
[docs]
def reference_loss(self, bunch: Bunch) -> pd.DataFrame:
"""
Calculate the loss factor and kick factor from the wake potential and
compare it to a reference value assuming a Gaussian bunch computed in
the frequency domain.
Parameters
----------
bunch : Bunch object
Returns
-------
loss_data : DataFrame
An output showing the loss/kick factors compared to the reference
values.
"""
loss = []
loss_0 = []
delta_loss = []
index = []
for wake_type in self.types:
tau0, Wp = self.get_wakepotential(bunch, wake_type)
profile0 = getattr(self, "profile0_" + wake_type)
factorTD = np.trapezoid(Wp * profile0, tau0)
if wake_type == "Wlong":
factorTD *= -1
if wake_type == "Wxdip":
factorTD /= bunch["x"].mean()
if wake_type == "Wydip":
factorTD /= bunch["y"].mean()
Z = getattr(self.wakefield, "Z" + wake_type[1:])
sigma = bunch['tau'].std()
factorFD = Z.loss_factor(sigma)
loss.append(factorTD)
loss_0.append(factorFD)
delta_loss.append((factorTD-factorFD) / factorFD * 100)
if wake_type == "Wlong":
index.append("Wlong [V/C]")
else:
index.append(wake_type + " [V/C/m]")
column = ['TD factor', 'FD factor', 'Relative error [%]']
loss_data = pd.DataFrame(np.array([loss, loss_0, delta_loss]).T,
columns=column,
index=index)
return loss_data
[docs]
def check_sampling(self):
"""
Check if the wake function sampling is uniform.
Raises
------
ValueError
"""
for wake_type in self.types:
idx = getattr(self.wakefield, wake_type).data.index
diff = idx[1:] - idx[:-1]
result = np.all(np.isclose(diff, diff[0], atol=1e-15))
if not result:
raise ValueError(
"The wake function must be uniformly sampled.")
[docs]
def reduce_sampling(self, factor: int):
"""
Reduce wake function samping by an integer factor.
Used to reduce computation time for long bunches.
Parameters
----------
factor : int
"""
for wake_type in self.types:
idx = getattr(self.wakefield, wake_type).data.index[::factor]
getattr(self.wakefield,
wake_type).data = getattr(self.wakefield,
wake_type).data.loc[idx]
self.check_sampling()
[docs]
def _set_wakefunction_max_frequency(self):
for j, wake_type in enumerate(self.types):
idx = getattr(self.wakefield, wake_type).data.index
deltaT = idx[1] - idx[0]
if j == 0:
self._wakefunction_max_frequency = 0.5 / deltaT
else:
self._wakefunction_max_frequency = min(
0.5 / deltaT, self._wakefunction_max_frequency)
@property
def wakefunction_max_frequency(self) -> float:
"""Return the wake function maximum frequency in [Hz]."""
return self._wakefunction_max_frequency
@property
def binning_max_frequency(self) -> float:
"""Return the binning maximum frequency in [Hz]."""
return 0.5 / self.dtau
@property
def wakepotential_max_frequency(self) -> float:
"""Return the wake potential maximum frequency in [Hz]."""
return min(self.wakefunction_max_frequency, self.binning_max_frequency)
[docs]
class LongRangeResistiveWall(Element):
"""
Element to deal with multi-bunch and multi-turn wakes from resistive wall
using the algorithm defined in [1].
Main approximations:
- Bunches are treated as point charge.
- Assymptotic expression for the resistive wall wake functions are
used.
- Multi-turn wakes are limited to nt turns.
Self-bunch interaction is not included in this element and should be dealed
with the WakePotential class.
Parameters
----------
ring : Synchrotron object
beam : Beam object
length : float
Length of the resistive pipe to consider in [m].
rho : float
Effective resistivity to consider in [ohm.m] as in [1].
radius : float
Beam pipe radius to consider in [m].
types : str or list, optional
Wake types to consider.
Available types are: "Wlong","Wxdip","Wydip","Wxquad","Wyquad".
The default is ["Wlong","Wxdip","Wydip"].
nt : int or float, optional
Number of turns to consider for the long range wakes.
The default is 50.
x3 : float, optional
Horizontal effective radius of the 3rd power in [m], as Eq.27 in [1].
The default is radius.
y3 : float, optional
Vertical effective radius of the 3rd power in [m], as Eq.27 in [1].
The default is radius.
x3_quad : float, optional
Quadrupolar wake horizontal effective radius of the 3rd power in [m].
As given by ResistiveWallModel.resistive_wall_effective_radius_yokoya.
The quadrupolar radius can be either be positive (defocusing) or
negative (focusing).
The default is None.
y3_quad : float, optional
Quadrupolar wake vertical effective radius of the 3rd power in [m].
As given by ResistiveWallModel.resistive_wall_effective_radius_yokoya.
The quadrupolar radius can be either be positive (defocusing) or
negative (focusing).
The default is None.
average_beta : array-like of shape (2,), optional
Average beta function used for kick normalization in [m].
The transverse kick is normalized by average_beta / local_beta.
If None and an AT lattice is loaded, average_beta is computed from
the lattice.
If None and an AT lattice is not loaded, average_beta is taken to be
equal to local_beta, i.e. no normalization.
The default is None.
References
----------
[1] : Skripka, Galina, et al. "Simultaneous computation of intrabunch and
interbunch collective beam motions in storage rings." NIM.A (2016).
"""
[docs]
def __init__(self,
ring: Synchrotron,
beam: Beam,
length: float,
rho: float,
radius: float,
types: list[str] = ["Wlong", "Wxdip", "Wydip"],
nt: int = 50,
x3: float | None = None,
y3: float | None = None,
x3_quad: float | None = None,
y3_quad: float | None = None,
average_beta: NDArray | None = None):
# parameters
self.ring = ring
self.length = length
self.rho = rho
self.nt = int(nt)
self.nb = len(beam)
if isinstance(types, str):
self.types = [types]
elif isinstance(types, list):
self.types = types
# effective radius for RW
self.radius = radius
if x3 is not None:
self.x3 = x3
else:
self.x3 = radius
if y3 is not None:
self.y3 = y3
else:
self.y3 = radius
if ("Wxquad" in self.types):
if x3_quad is None:
raise ValueError("x3_quad must be given to use Wxquad type.")
self.x_sign = np.sign(x3_quad)
self.x3_quad = np.abs(x3_quad)
if ("Wyquad" in self.types):
if y3_quad is None:
raise ValueError("y3_quad must be given to use Wyquad type.")
self.y_sign = np.sign(y3_quad)
self.y3_quad = np.abs(y3_quad)
# kick beta normalization
if average_beta is None:
average_beta = self.ring.optics.average_beta
self.norm_x = average_beta[0] / self.ring.optics.local_beta[0]
self.norm_y = average_beta[1] / self.ring.optics.local_beta[1]
# constants
self.Z0 = mu_0 * c
self.t0 = (2 * self.rho * self.radius**2 / self.Z0)**(1 / 3) / c
# check approximation
if 20 * self.t0 > ring.T1:
raise ValueError("The approximated wake functions are not valid.")
# init tables
self.tau = np.ones((self.nb, self.nt)) * 1e100
self.x = np.zeros((self.nb, self.nt))
self.y = np.zeros((self.nb, self.nt))
self.charge = np.zeros((self.nb, self.nt))
[docs]
def Wlong(self, t: float) -> float:
"""
Approxmiate expression for the longitudinal resistive wall wake
function - Eq.24 of [1].
Parameters
----------
t : float
Time in [s].
Returns
-------
wl : float
Wake function in [V/C].
"""
wl = (1 / (4 * pi * self.radius) *
np.sqrt(self.Z0 * self.rho / (c * pi * t**3)) * self.length) * -1
return wl
[docs]
def Wdip(self, t: float, plane: str) -> float:
"""
Approxmiate expression for the transverse resistive wall wake
function - Eq.26 of [1].
Parameters
----------
t : float
Time in [s].
plane : str
"x" or "y".
Returns
-------
wdip : float
Wake function in [V/C/m].
"""
if plane == "x":
r3 = self.x3
elif plane == "y":
r3 = self.y3
elif plane == "x_quad":
r3 = self.x3_quad
elif plane == "y_quad":
r3 = self.y3_quad
else:
raise ValueError()
wdip = (1 / (pi * r3**3) * np.sqrt(self.Z0 * c * self.rho / (pi*t)) *
self.length)
return wdip
[docs]
def update_tables(self, beam: Beam):
"""
Update tables.
Table tau[i,j] is defined as the time difference of the bunch i center
of mass with respect to center of the RF bucket number 0 at turn j.
Turn 0 corresponds to the tracked turn.
Positive time corresponds to past events and negative time to future
events.
Parameters
----------
beam : Beam object
Returns
-------
None.
"""
# shift tables to next turn
self.tau += self.ring.T0
self.tau = np.roll(self.tau, shift=1, axis=1)
self.x = np.roll(self.x, shift=1, axis=1)
self.y = np.roll(self.y, shift=1, axis=1)
self.charge = np.roll(self.charge, shift=1, axis=1)
# update tables
if beam.mpi_switch:
beam.mpi.share_means(beam)
# negative sign as when bunch 0 is tracked, the others are not yet passed
self.tau[:,
0] = beam.mpi.mean_all[:,
4] - beam.bunch_index * self.ring.T1
self.x[:, 0] = beam.mpi.mean_all[:, 0]
self.y[:, 0] = beam.mpi.mean_all[:, 2]
self.charge[:, 0] = beam.mpi.charge_all
else:
mean_all = beam.bunch_mean
charge_all = beam.bunch_charge
# negative sign as when bunch 0 is tracked, the others are not yet passed
self.tau[:, 0] = mean_all[
4, beam.filling_pattern] - beam.bunch_index * self.ring.T1
self.x[:, 0] = mean_all[0, beam.filling_pattern]
self.y[:, 0] = mean_all[2, beam.filling_pattern]
self.charge[:, 0] = charge_all[beam.filling_pattern]
[docs]
def get_kick(self, rank: int, wake_type: float) -> float:
"""
Compute the wake kick to apply.
Parameters
----------
rank : int
Rank of the bunch, as defined in Mpi class.
wake_type : float
Type of the wake to compute.
Returns
-------
sum_kick : float
Sum of the kicks from the different bunches at different turns.
"""
sum_kick = 0
for j in range(self.nt):
for i in range(self.nb):
if (j == 0) and (rank <= i):
continue
deltaT = self.tau[i, j] - self.tau[rank, 0]
if wake_type == "Wlong":
sum_kick += self.Wlong(deltaT) * self.charge[i, j]
elif wake_type == "Wxdip":
sum_kick += self.Wdip(
deltaT, "x") * self.charge[i, j] * self.x[i, j]
elif wake_type == "Wydip":
sum_kick += self.Wdip(
deltaT, "y") * self.charge[i, j] * self.y[i, j]
elif wake_type == "Wxquad":
sum_kick += self.x_sign * self.Wdip(
deltaT, "x_quad") * self.charge[i, j]
elif wake_type == "Wyquad":
sum_kick += self.y_sign * self.Wdip(
deltaT, "y_quad") * self.charge[i, j]
return sum_kick
[docs]
def track_bunch(self, bunch: Bunch, rank: int):
"""
Track a bunch.
Should only be used within the track method and not standalone.
Parameters
----------
bunch : Bunch object
rank : int
Rank of the bunch, as defined in Mpi class.
Returns
-------
None.
"""
for wake_type in self.types:
kick = self.get_kick(rank, wake_type)
if wake_type == "Wlong":
bunch["delta"] -= kick / self.ring.E0
elif wake_type == "Wxdip":
bunch["xp"] += kick / self.ring.E0 * self.norm_x
elif wake_type == "Wydip":
bunch["yp"] += kick / self.ring.E0 * self.norm_y
elif wake_type == "Wxquad":
bunch["xp"] += (bunch["x"] * kick / self.ring.E0) * self.norm_x
elif wake_type == "Wyquad":
bunch["yp"] += (bunch["y"] * kick / self.ring.E0) * self.norm_y
[docs]
def track(self, beam: Beam):
"""
Track a beam.
Parameters
----------
beam : Beam object
Returns
-------
None.
"""
self.update_tables(beam)
if beam.mpi_switch:
rank = beam.mpi.rank
bunch_index = beam.mpi.bunch_num # Number of the tracked bunch in this processor
bunch = beam[bunch_index]
self.track_bunch(bunch, rank)
else:
for rank, bunch in enumerate(beam.not_empty):
self.track_bunch(bunch, rank)