# -*- coding: utf-8 -*-
"""
This module defines both exponential and FIR based bunch by bunch damper
feedback for tracking.
"""
import matplotlib.pyplot as plt
import numpy as np
from numpy.typing import NDArray
from mbtrack2.tracking.element import Element
from mbtrack2.tracking.particles import Beam, Bunch
from mbtrack2.utilities.synchrotron import Synchrotron
[docs]
class TransverseExponentialDamper(Element):
"""
Simple bunch by bunch damper feedback system based on exponential damping.
Parameters
----------
ring : Synchrotron object
Synchrotron to use.
damping_time : float array of shape (2,)
Damping time in [turn].
If 0, the damper is not used in the considered plane.
phase_diff : float array of shape (2,)
Phase setting of the feedback in [deg]:
- 90 corresponds to a pure resistive damper
- 0 corresponds to a pure reactive damper.
local_beta : float array of shape (2,), optional
Beta function at the bpm/kicker location in [m].
The default is ring.optics.local_beta.
"""
[docs]
def __init__(self,
ring: Synchrotron,
damping_time: NDArray,
phase_diff: NDArray,
local_beta: NDArray | None = None):
self.ring = ring
self.damping_time = np.array(damping_time)
self.phase_diff = np.deg2rad(np.array(phase_diff))
if local_beta:
self.beta_bpm = local_beta
self.beta_kicker = local_beta
else:
self.beta_bpm = ring.optics.local_beta
self.beta_kicker = ring.optics.local_beta
[docs]
@Element.parallel
def track(self, bunch: Bunch | Beam):
"""
Tracking method for the feedback system
No bunch to bunch interaction, so written for Bunch object and
@Element.parallel is used to handle Beam object.
Parameters
----------
bunch : Bunch or Beam object
"""
if self.damping_time[0] != 0:
mean_x = bunch['x'].mean()
mean_xp = bunch['xp'].mean()
bunch['xp'] -= (
2 / self.damping_time[0] * np.sin(self.phase_diff[0]) *
mean_xp) * np.sqrt(self.beta_bpm[0] / self.beta_kicker[0]) + (
2 / self.damping_time[0] * np.cos(self.phase_diff[0]) *
mean_x) / np.sqrt(self.beta_bpm[0] * self.beta_kicker[0])
if self.damping_time[1] != 0:
mean_y = bunch['y'].mean()
mean_yp = bunch['yp'].mean()
bunch['yp'] -= (
2 / self.damping_time[1] * np.sin(self.phase_diff[1]) *
mean_yp) * np.sqrt(self.beta_bpm[1] / self.beta_kicker[1]) + (
2 / self.damping_time[1] * np.cos(self.phase_diff[1]) *
mean_y) / np.sqrt(self.beta_bpm[1] * self.beta_kicker[1])
[docs]
class LongitudinalExponentialDamper(Element):
"""
Simple bunch by bunch damper feedback system based on exponential damping.
Parameters
----------
ring : Synchrotron object
Synchrotron to use.
damping_time : float
Damping time in [turn].
If 0, the damper is not used.
phase_diff : float
Phase setting of the feedback in [deg]:
- 90 corresponds to a pure resistive damper
- 0 corresponds to a pure reactive damper.
long_tune : float,
Longitudinal (synchrotron) tune of the ring.
In the presence of multi-harmonic RF systems, this tune should be
computed from the complete RF configuration (voltages and phases).
The recommended way to obtain this value is via
`ring.get_longitudinal_twiss(...)`.
For a single-RF system, the synchrotron tune provided by
`ring.synchrotron_tune(Vrf)` may be used.
"""
[docs]
def __init__(self, ring: Synchrotron, damping_time: float,
phase_diff: float, long_tune: float):
self.ring = ring
self.damping_time = damping_time
self.phase_diff = np.deg2rad(phase_diff)
self.eta = ring.eta(0)
self.long_tune = long_tune
[docs]
@Element.parallel
def track(self, bunch: Bunch | Beam):
"""
Tracking method for the feedback system
No bunch to bunch interaction, so written for Bunch object and
@Element.parallel is used to handle Beam object.
Parameters
----------
bunch : Bunch or Beam object
"""
if self.damping_time != 0:
mean_tau = bunch['tau'].mean()
mean_delta = bunch['delta'].mean()
bunch['delta'] -= ((2 / self.damping_time) *
(np.sin(self.phase_diff) * mean_delta +
np.cos(self.phase_diff) * mean_tau *
self.long_tune * self.ring.omega0 / self.eta))
[docs]
class FIRDamper(Element):
"""
Bunch by bunch damper feedback system based on FIR filters.
FIR computation is based on [1].
Parameters
----------
ring : Synchrotron object
Synchrotron to use.
plane : {"x","y","s"}
Allow to choose on which plane the damper is active.
tune : float
Reference (betatron or synchrotron) tune for which the damper system
is set.
turn_delay : int
Number of turn delay before the kick is applied.
tap_number : int
Number of tap for the FIR filter.
gain : float
Gain of the FIR filter.
phase : float
Phase of the FIR filter in [degree].
local_beta : float, optional
Beta function at the bpm/kicker location in [m].
The default is ring.optics.local_beta of the considered plane.
bpm_error : float, optional
RMS measurement error applied to the computed mean position.
Unit is [m] if the plane is "x" or "y" and [s] if the plane is "s".
The default is None.
max_kick : float, optional
Maximum kick strength limitation.
Unit is [rad] if the plane is "x" or "y" and no unit (delta) if the
plane is "s".
The default is None.
Attributes
----------
pos : array
Stored beam postions.
kick : array
Stored damper kicks.
coef : array
Coefficients of the FIR filter.
Methods
-------
get_fir(tap_number, tune, phase, turn_delay, gain)
Initialize the FIR filter and return an array containing the FIR
coefficients.
plot_fir()
Plot the gain and the phase of the FIR filter.
track(beam_or_bunch)
Tracking method.
get_damping_time()
Return damping time in [turn].
get_tune_shift()
Return tune shit (in tune units).
References
----------
[1] T.Nakamura, S.Daté, K. Kobayashi, T. Ohshima. Proceedings of EPAC
2004. Transverse bunch by bunch feedback system for the Spring-8
storage ring.
"""
[docs]
def __init__(self,
ring: Synchrotron,
plane: str,
turn_delay: int,
tune: float,
tap_number: int,
gain: float,
phase: float,
local_beta: NDArray | None = None,
bpm_error: float | None = None,
max_kick: float | None = None):
self.ring = ring
self.tune = tune
self.turn_delay = turn_delay
self.tap_number = tap_number
self.phase = -phase
self.bpm_error = bpm_error
self.max_kick = max_kick
self.plane = plane
if local_beta:
self.beta_bpm = self.beta_kicker = local_beta
else:
self.beta_bpm = self.beta_kicker = ring.optics.local_beta[
0] if plane == 'x' else ring.optics.local_beta[1]
self.gain = gain
self.action, self.damp_idx, self.mean_idx = self._set_plane_indices(
plane)
self.beam_no_mpi = False
self.pos = np.zeros((self.tap_number, 1))
self.kick = np.zeros((self.turn_delay + 1, 1))
self.coef = self.get_fir(self.tap_number, self.tune, self.phase,
self.turn_delay, self.gain)
[docs]
def _set_plane_indices(self, plane: str) -> tuple[str, int, int]:
if plane == "x":
return "xp", 0, 0
if plane == "y":
return "yp", 1, 2
if plane == "s":
return "delta", 2, 4
raise ValueError(f"Invalid plane: {plane}")
[docs]
def get_damping_time(self) -> float:
"""Return damping time in [turn]."""
l = np.arange(0, self.tap_number)
return 2 / np.sum(
self.coef * np.sin(-2 * np.pi * (l + self.turn_delay) * self.tune))
[docs]
def get_tune_shift(self) -> float:
"""Return tune shit (in tune units)."""
l = np.arange(0, self.tap_number)
return 1 / (2 * np.pi) * 1 / 2 * np.sum(
self.coef * np.cos(-2 * np.pi * (l + self.turn_delay) * self.tune))
[docs]
def get_fir(self, tap_number: int, tune: float, phase: float,
turn_delay: int, gain: float) -> list[float]:
"""
Compute the FIR coefficients.
FIR computation is based on [1].
Parameters
----------
tap_number : int
Number of tap for the FIR filter.
tune : float
Reference (betatron or synchrotron) tune for which the damper system
is set.
phase : float
Phase of the FIR filter in [degree].
turn_delay : int
Number of turn delay before the kick is applied.
gain : float
Gain of the FIR filter.
Returns
-------
FIR_coef : array
Array containing the FIR coefficients.
"""
it = np.arange(-turn_delay, -turn_delay - tap_number, -1)
zeta = np.deg2rad(phase)
phi = 2 * np.pi * tune
cs = np.cos(phi * it)
sn = np.sin(phi * it)
cc = np.vstack([np.ones(tap_number), cs, sn, it * sn, it * cs])
W = np.linalg.inv(cc @ cc.T)
D = W @ cc
fir_coef = gain * (D[1] * np.cos(zeta) + D[2] * np.sin(zeta))
return fir_coef
[docs]
def plot_fir(
self,
axs: list[plt.Axes] | None = None,
) -> list[plt.Axes]:
"""
Plot the gain and the phase of the FIR filter.
Parameters
----------
axs : list of 2 axes, optional
Axes for gain and phase plots.
Returns
-------
axes : list of 2 axes.
Axes for gain and phase plots.
"""
tune = np.arange(0, 1, 0.0001)
H_FIR = np.sum([
coef * np.exp(-1j * 2 * np.pi * k * tune)
for k, coef in enumerate(self.coef)
],
axis=0)
latency = np.exp(-1j * 2 * np.pi * tune * self.turn_delay)
H_tot = H_FIR * latency
gain = np.abs(H_tot)
phase = np.angle(H_tot, deg=True)
if axs is None:
_, [ax1, ax2] = plt.subplots(2, 1)
else:
ax1, ax2 = axs
ax1.plot(tune, gain)
ax1.set_ylabel("Gain")
ax2.plot(tune, phase)
ax2.set_xlabel("Tune")
ax2.set_ylabel("Phase in degree")
return [ax1, ax2]
[docs]
def track(self, beam_or_bunch: Beam | Bunch):
"""
Tracking method.
Parameters
----------
beam_or_bunch : Beam or Bunch
Data to track.
"""
if isinstance(beam_or_bunch, Bunch):
self._track_sb(beam_or_bunch)
elif isinstance(beam_or_bunch, Beam):
beam = beam_or_bunch
if beam.mpi_switch is True:
self._track_sb(beam[beam.mpi.bunch_num])
else:
if self.beam_no_mpi is False:
self._init_beam_no_mpi(beam)
for i, bunch in enumerate(beam.not_empty):
self._track_sb(bunch, i)
else:
TypeError("beam_or_bunch must be a Beam or Bunch")
[docs]
def _init_beam_no_mpi(self, beam: Beam):
"""
Change array sizes if Beam is used without mpi.
Parameters
----------
beam : Beam
Beam to track.
"""
n_bunch = len(beam)
self.pos = np.zeros((self.tap_number, n_bunch))
self.kick = np.zeros((self.turn_delay + 1, n_bunch))
self.beam_no_mpi = True
[docs]
def _track_sb(self, bunch: Bunch, bunch_number: int = 0):
"""
Core of the tracking method.
Parameters
----------
bunch : Bunch
Bunch to track.
bunch_number : int, optional
Number of bunch in beam.not_empty.
The default is 0.
"""
self.pos[0, bunch_number] = bunch.mean[self.mean_idx]
if self.bpm_error is not None:
self.pos[0, bunch_number] += np.random.normal(0, self.bpm_error)
kick = 0
for k in range(self.tap_number):
kick += self.coef[k] * self.pos[k, bunch_number]
if self.max_kick is not None:
if kick > self.max_kick:
kick = self.max_kick
elif kick < -1 * self.max_kick:
kick = -1 * self.max_kick
self.kick[
-1,
bunch_number] = kick # / np.sqrt(self.beta_bpm * self.beta_kicker)
bunch[self.action] += self.kick[0, bunch_number]
self.pos[:, bunch_number] = np.roll(self.pos[:, bunch_number], 1)
self.kick[:, bunch_number] = np.roll(self.kick[:, bunch_number], -1)