# Copyright (c) 2024-2026 ETH Zurich and the authors of the qttools package.
import warnings
import numpy as np
from qttools import NDArray, xp
from qttools.kernels import linalg
from qttools.kernels.operator import operator_inverse
from qttools.nevp.nevp import NEVP
from qttools.utils.mpi_utils import get_section_sizes
class Beyn(NEVP):
"""Beyn's integral method for solving NEVP.[^1]
This is implemented along the lines of what is described in [^2].
[^1]: W.-J. Beyn, An integral method for solving nonlinear
eigenvalue problems, Linear Algebra and its Applications, 2012.
[^2]: S. Brück, Ab-initio Quantum Transport Simulations for
Nanoelectronic Devices, ETH Zurich, 2017.
Parameters
----------
r_o : float
The outer radius of the annulus for the contour integration.
r_i : float
The inner radius of the annulus for the contour integration.
m_0 : int
Guess for the number of eigenvalues that lie in the subspace.
num_quad_points : int
The number of quadrature points to use for the contour
integration.
num_threads_contour : int, optional
The number of cuda threads to use for the contour integration kernel.
Only relevant for GPU computations.
eig_compute_location : str, optional
The location where to compute the eigenvalues and eigenvectors.
Can be either "numpy" or "cupy" or "nvmath".
project_compute_location : str, optional
The location where to compute the singular value
or qr decomposition for the projector.
Can be either "numpy" or "cupy".
use_qr : bool, optional
Whether to use QR decomposition for the projector instead of SVD.
Default is `False`.
contour_batch_size : int, optional
The batch size for the contour integration kernel. If `None`,
the batch size is set to `num_quad_points`.
use_pinned_memory : bool, optional
Whether to use pinnend memory if cupy is used.
Default is `True`.
"""
def __init__(
self,
r_o: float,
r_i: float,
m_0: int,
num_quad_points: int,
num_threads_contour: int = 1024,
eig_compute_location: str = "numpy",
project_compute_location: str = "numpy",
use_qr: bool = False,
contour_batch_size: int | None = None,
use_pinned_memory: bool = True,
):
"""Initializes the Beyn NEVP solver."""
self.r_o = r_o
self.r_i = r_i
self.m_0 = m_0
self.num_quad_points = num_quad_points
self.num_threads_contour = num_threads_contour
self.eig_compute_location = eig_compute_location
self.project_compute_location = project_compute_location
self.use_qr = use_qr
if contour_batch_size is None:
contour_batch_size = num_quad_points
self.contour_batch_size = contour_batch_size
contour_counts, _ = get_section_sizes(
num_quad_points, int(np.ceil(num_quad_points / contour_batch_size))
)
self.contour_displacements = np.cumsum(
np.concatenate(([0], np.array(contour_counts)))
)
self.use_pinned_memory = use_pinned_memory
self.rng = xp.random.default_rng(42)
def _project_svd(
self, P_0: NDArray, P_1: NDArray
) -> tuple[list[NDArray], list[NDArray]]:
"""Projects the systems onto the linear subspace with an SVD.
Parameters
----------
P_0 : NDArray
The first moment of the system.
P_1 : NDArray
The second moment of the system.
Returns
-------
a : list[NDArray]
The projected systems.
u_out : list[NDArray]
The projectors.
"""
batchsize = P_0.shape[0]
d = P_0.shape[-1]
# Perform an SVD on the linear subspace projector.
u, s, vh = linalg.svd(
P_0,
full_matrices=False,
compute_module=self.project_compute_location,
use_pinned_memory=self.use_pinned_memory,
)
a = []
u_out = []
v_out = []
for i in range(batchsize):
ui, si, vhi = u[i], s[i], vh[i]
# Remove the zero singular values (within numerical tolerance).
eps_svd = si.max() * d * xp.finfo(P_0.dtype).eps
inds = xp.where(si > eps_svd)[0]
ui, si, vhi = ui[:, inds], si[inds], vhi[inds, :]
u_out.append(ui)
v_out.append(vhi)
# Probe second moment.
a.append(ui.conj().T @ P_1[i] @ vhi.conj().T / si)
return a, u_out
def _project_qr(self, P_0: NDArray, P_1: NDArray) -> tuple[NDArray, NDArray]:
"""Projects the systems onto the linear subspace with a QR.
Parameters
----------
P_0 : NDArray
The first moment of the system.
P_1 : NDArray
The second moment of the system.
Returns
-------
a : NDArray
The projected system.
q : NDArray
The projectors.
"""
# Perform an QR on the linear subspace projector.
q, r = linalg.qr(
P_0,
compute_module=self.project_compute_location,
use_pinned_memory=self.use_pinned_memory,
)
a = q.conj().swapaxes(-2, -1) @ P_1 @ linalg.inv(r)
return a, q
def _contour_integrate(self, a_xx: tuple[NDArray, ...]) -> tuple[NDArray, NDArray]:
"""Computes the contour integral of the operator inverse.
Parameters
----------
a_xx : tuple[NDArray, ...]
The coefficient blocks of the non-linear eigenvalue problem
from lowest to highest order.
Returns
-------
q0 : NDArray
The first moment of the contour integral.
q1 : NDArray
The second moment of the contour integral.
"""
in_type = a_xx[0].dtype
# Determine quadrature points and weights.
zeta = xp.arange(self.num_quad_points) + 1
phase = xp.exp(2j * xp.pi * (zeta + 0.5) / self.num_quad_points)
z_o = self.r_o * phase
z_i = self.r_i * phase
w = xp.ones(self.num_quad_points) / self.num_quad_points
# Reshape to broadcast over batch dimension.
z_o = z_o.reshape(1, -1, 1, 1)
z_i = z_i.reshape(1, -1, 1, 1)
w = w.reshape(1, -1, 1, 1)
q0 = xp.zeros_like(a_xx[0])
q1 = xp.zeros_like(a_xx[0])
a_xx = [a_x[:, xp.newaxis, :, :] for a_x in a_xx]
for j in range(len(self.contour_displacements) - 1):
z_oj = z_o[
:, self.contour_displacements[j] : self.contour_displacements[j + 1]
]
z_ij = z_i[
:, self.contour_displacements[j] : self.contour_displacements[j + 1]
]
w_j = w[
:, self.contour_displacements[j] : self.contour_displacements[j + 1]
]
inv_Tz_o = operator_inverse(
a_xx, z_oj, in_type, in_type, self.num_threads_contour
)
inv_Tz_i = operator_inverse(
a_xx, z_ij, in_type, in_type, self.num_threads_contour
)
q0 += xp.sum(w_j * z_oj * inv_Tz_o - w_j * z_ij * inv_Tz_i, axis=1)
q1 += xp.sum(w_j * z_oj**2 * inv_Tz_o - w_j * z_ij**2 * inv_Tz_i, axis=1)
return q0, q1
def _solve_reduced_system(
self, a: list[NDArray] | NDArray, Y: NDArray, p_back: list[NDArray] | NDArray
) -> tuple[NDArray, NDArray]:
"""Solve the reduced system
Parameters
----------
a : list[NDArray] | NDArray
The projected system.
Y : NDArray
The initial subspace guess.
p_back : list[NDArray] | NDArray
The projectors.
Returns
-------
ws : NDArray
The eigenvalues.
vs : NDArray
The eigenvectors.
"""
in_type = a[0].dtype
if isinstance(a, list):
batchsize = len(a)
else:
batchsize = a.shape[0]
# solve the reduced system
w, v = linalg.eig(
a,
compute_module=self.eig_compute_location,
use_pinned_memory=self.use_pinned_memory,
)
# Get the eigenvalues and eigenvectors.
if isinstance(a, list):
ws = xp.zeros((batchsize, self.m_0), dtype=in_type)
vs = Y.copy()
for i in range(batchsize):
len_w = len(w[i])
# Recover the full eigenvectors from the subspace.
ws[i, :len_w] = w[i]
vs[i, :, :len_w] = p_back[i] @ v[i]
else:
ws = w
vs = p_back @ v
return ws, vs
def _one_sided(self, a_xx: tuple[NDArray, ...]) -> tuple[NDArray, NDArray]:
"""Solves the polynomial eigenvalue problem through contour integration.
This method solves the non-linear eigenvalue problem defined by
the coefficient blocks `a_xx` from lowest to highest order.
Parameters
----------
a_xx : tuple[NDArray, ...]
The coefficient blocks of the non-linear eigenvalue problem
from lowest to highest order.
Returns
-------
ws : NDArray
The eigenvalues.
vs : NDArray
The eigenvectors.
"""
d = a_xx[0].shape[-1]
batchsize = a_xx[0].shape[0]
# NOTE: Here we could also use a good initial guess.
Y = self.rng.random((batchsize, d, self.m_0)) + 1j * self.rng.random(
(batchsize, d, self.m_0)
)
# Compute the contour integral.
q0, q1 = self._contour_integrate(a_xx)
# Compute first and second moment.
P_0 = q0 @ Y
P_1 = q1 @ Y
# project the system
if self.use_qr:
a, p_back = self._project_qr(P_0, P_1)
else:
a, p_back = self._project_svd(P_0, P_1)
return self._solve_reduced_system(a, Y, p_back)
def __call__(self, a_xx: tuple[NDArray, ...]) -> tuple[NDArray, NDArray]:
"""Solves the polynomial eigenvalue problem through contour integration.
This method solves the non-linear eigenvalue problem defined by
the coefficient blocks `a_xx` from lowest to highest order.
Parameters
----------
a_xx : tuple[NDArray, ...]
The coefficient blocks of the non-linear eigenvalue problem
from lowest to highest order.
Returns
-------
ws : NDArray
The eigenvalues.
vs : NDArray
The right eigenvectors.
"""
# Allow for batched input.
if a_xx[0].ndim == 2:
a_xx = tuple(a_x[xp.newaxis, :, :] for a_x in a_xx)
batch_shape = a_xx[0].shape[:-2]
d = a_xx[0].shape[-1]
# allow for higher dimensional inputs
a_xx = tuple(a_x.reshape(-1, d, d) for a_x in a_xx)
if d < self.m_0 and self.use_qr:
warnings.warn(
f"Subspace guess {self.m_0} is larger than the "
f"dimension of the system {a_xx[0].shape[-1]}. "
f"Setting subspace guess to {a_xx[0].shape[-1]}."
)
self.old_m_0 = self.m_0
self.m_0 = a_xx[0].shape[-1]
ws, vs = self._one_sided(a_xx)
ws = ws.reshape(*batch_shape, self.m_0)
vs = vs.reshape(*batch_shape, d, self.m_0)
# reset subspace guess
if hasattr(self, "old_m_0"):
self.m_0 = self.old_m_0
return ws, vs