"""SPH kernel functions and their gradients for isotropic and anisotropic smoothing."""
import math
import numpy as np
import numpy.typing as npt
[docs]
class BaseKernelClass:
"""Base class for SPH kernels."""
def __init__(self, dim: int, support: float) -> None:
"""Initialize the kernel with spatial dimension and support radius."""
if not isinstance(dim, int) or dim not in (1, 2, 3):
raise ValueError(
f"`dim` must be an integer and one of 1, 2, or 3, but found {dim} of type {type(dim)}"
)
self.dim = dim
self.support = support
self.eps = 1e-7
self.name = "base_kernel"
[docs]
def evaluate(self, r_ij, h, mode):
"""Evaluate the kernel function for given relative positions, smoothing lengths, and mode."""
# assert that r_ij is a numpy array of floats with shape (..., dim)
if not isinstance(r_ij, np.ndarray) or r_ij.dtype.kind not in "fiu":
raise ValueError(
f"`r_ij` must be a numpy array of floats, ints, or unsigned ints, but found type {type(r_ij)} with dtype {getattr(r_ij, 'dtype', None)}"
)
# assert that h is a numpy array of floats
if not isinstance(h, np.ndarray) or h.dtype.kind not in "fiu":
raise ValueError(
f"`h` must be a numpy array of floats, ints, or unsigned ints, but found type {type(h)} with dtype {getattr(h, 'dtype', None)}"
)
if mode in ["separable", "isotropic"]:
res = self._evaluate_isotropic(r_ij, h)
elif mode == "anisotropic":
res = self._evaluate_anisotropic(r_ij, h)
else:
raise ValueError(f"Invalid mode '{mode}'")
# Cast back to input dtype if it is a floating point type to avoid
# promotion to float64 caused by internal constants or sigma values.
if r_ij.dtype.kind == "f":
return res.astype(r_ij.dtype, copy=False)
return res
[docs]
def evaluate_gradient(self, r_ij, h, mode):
"""Evaluate the gradient of the kernel function for given relative positions, smoothing lengths, and mode."""
# assert that r_ij is a numpy array of floats with shape (..., dim)
if not isinstance(r_ij, np.ndarray) or r_ij.dtype.kind not in "fiu":
raise ValueError(
f"`r_ij` must be a numpy array of floats, ints, or unsigned ints, but found type {type(r_ij)} with dtype {getattr(r_ij, 'dtype', None)}"
)
# assert that h is a numpy array of floats
if not isinstance(h, np.ndarray) or h.dtype.kind not in "fiu":
raise ValueError(
f"`h` must be a numpy array of floats, ints, or unsigned ints, but found type {type(h)} with dtype {getattr(h, 'dtype', None)}"
)
if mode in ["separable", "isotropic"]:
res = self._evaluate_gradient_isotropic(r_ij, h)
elif mode == "anisotropic":
res = self._evaluate_gradient_anisotropic(r_ij, h)
else:
raise ValueError(f"Invalid mode '{mode}'")
# Cast back to input dtype if it is a floating point type to avoid
# promotion to float64 caused by internal constants or sigma values.
if r_ij.dtype.kind == "f":
return res.astype(r_ij.dtype, copy=False)
return res
# =========================================================
# isotropic
# =========================================================
def _evaluate_isotropic(self, r_ij, h):
if h.ndim == r_ij.ndim - 1:
h = h[..., None]
q = np.abs(r_ij) / (h + self.eps)
norm = h**self.dim
return self._kernel_sigma() / norm * self._kernel_values(q)
def _evaluate_gradient_isotropic(self, r_ij, h):
h = h[..., None]
r_mag = np.linalg.norm(
r_ij,
axis=-1,
keepdims=True,
)
q = r_mag / (h + self.eps)
dW_dq = self._kernel_gradient_values(q)
dW_dr = dW_dq / (h + self.eps)
er = np.zeros_like(r_ij)
np.divide(
r_ij,
r_mag,
out=er,
where=r_mag > 0,
)
norm = h**self.dim
return self._kernel_sigma() / norm * dW_dr * er
# =========================================================
# anisotropic
# =========================================================
def _evaluate_anisotropic(self, r_ij, H):
H_inv = np.linalg.inv(H)
det_H = np.linalg.det(H)
xi = np.einsum(
"...ij,...j->...i",
H_inv,
r_ij,
)
q = np.linalg.norm(xi, axis=-1)
return self._kernel_sigma() / det_H * self._kernel_values(q)
def _evaluate_gradient_anisotropic(self, r_ij, H):
H_inv = np.linalg.inv(H)
H_inv_T = np.swapaxes(H_inv, -1, -2)
det_H = np.linalg.det(H)
xi = np.einsum(
"...ij,...j->...i",
H_inv,
r_ij,
)
q = np.linalg.norm(
xi,
axis=-1,
keepdims=True,
)
dK_dq = self._kernel_gradient_values(q)
grad_q = np.einsum(
"...ij,...j->...i",
H_inv_T,
xi,
) / (q + self.eps)
return self._kernel_sigma() / det_H[..., None] * dK_dq * grad_q
def _kernel_sigma(self):
raise NotImplementedError("Must implement _kernel_sigma in subclass")
def _kernel_values(self):
raise NotImplementedError("Must implement _kernel_values in subclass")
def _kernel_gradient_values(self):
raise NotImplementedError("Must implement _kernel_gradient_values in subclass")
[docs]
class TophatKernel(BaseKernelClass):
"""Spherically symmetric Tophat kernel."""
def __init__(self, dim):
"""Initialize the Tophat kernel for the given spatial dimension."""
super().__init__(dim, support=0.5)
self.name = "tophat"
def _kernel_sigma(self) -> float:
if self.dim == 1:
return 1.0
elif self.dim == 2:
return 4.0 / math.pi
elif self.dim == 3:
return 6.0 / math.pi
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
mask = q <= self.support
return np.where(mask, 1.0, 0.0)
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
# Gradient of Tophat is a delta function at q=1, numerically problematic.
return np.zeros_like(q)
[docs]
class TophatSepKernel(BaseKernelClass):
"""Rectangular Tophat kernel (Product of 1D Tophats)."""
def __init__(self, dim):
"""Initialize the Tophat separable kernel for the given spatial dimension."""
super().__init__(dim, support=0.5)
self.name = "tophat_separable"
def _kernel_sigma(self) -> float:
return 1.0
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
# For TophatRect, support is 0.5 in each dimension.
q = np.abs(q)
mask = q <= self.support
return np.where(mask, 1.0, 0.0)
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
return np.zeros_like(q)
[docs]
class TSCKernel(BaseKernelClass):
"""Spherically symmetric triangular (tent) kernel in radius."""
def __init__(self, dim):
"""Initialize the TSC kernel for the given spatial dimension."""
super().__init__(dim, support=1.5)
self.name = "tsc"
def _kernel_sigma(self) -> float:
# normalize so integral over d-dim space = 1
if self.dim == 1:
return 1.0 / self.support
elif self.dim == 2:
return 3.0 / (np.pi * self.support**2)
elif self.dim == 3:
return 3.0 / (np.pi * self.support**3)
def _kernel_values(self, q):
q = np.abs(q)
val = np.zeros_like(q)
mask = q <= self.support
val[mask] = 1.0 - q[mask] / self.support
return val
def _kernel_gradient_values(self, q):
q = np.abs(q)
grad = np.zeros_like(q)
mask = q <= self.support
grad[mask] = -(1.0 / self.support)
return grad
[docs]
class TSCSepKernel(BaseKernelClass):
"""Separable triangle (tent) kernel with support 1.5 (not B-spline TSC)."""
def __init__(self, dim):
"""Initialize the TSC separable kernel for the given spatial dimension."""
super().__init__(dim, support=1.5)
self.name = "tsc_separable"
def _kernel_sigma(self) -> float:
# normalization so that integral over 1D = 1
# ∫_{-h}^{h} (1 - |q|/h) dq = h
# => need 1/h scaling
return 1.0 / self.support**self.dim
def _kernel_values(self, q):
q_abs = np.abs(q)
val = np.zeros_like(q_abs)
mask = q_abs <= self.support
val[mask] = 1.0 - q_abs[mask] / self.support
return val
def _kernel_gradient_values(self, q):
q_abs = np.abs(q)
sign = np.sign(q)
grad = np.zeros_like(q)
mask = q_abs <= self.support
grad[mask] = -(1.0 / self.support) * sign[mask]
return grad
[docs]
class GaussianKernel(BaseKernelClass):
"""Gaussian kernel implementation for SPH."""
def __init__(self, dim: int) -> None:
"""Initialize the Gaussian kernel for the given spatial dimension."""
super().__init__(dim=dim, support=3.0)
self.name = "gaussian"
def _kernel_sigma(self) -> float:
if self.dim == 1:
return 1.0 / math.pi**0.5
elif self.dim == 2:
return 1.0 / math.pi
elif self.dim == 3:
return 1.0 / math.pi**1.5
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
mask = q <= self.support
return np.where(mask, np.exp(-(q**2)), 0.0)
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
mask = q <= self.support
return np.where(mask, -2 * q * np.exp(-(q**2)), 0.0)
[docs]
class GaussianSepKernel(BaseKernelClass):
"""Gaussian kernel implementation for SPH."""
def __init__(self, dim: int) -> None:
"""Initialize the Gaussian separable kernel for the given spatial dimension."""
super().__init__(dim=dim, support=3.0)
self.name = "gaussian_separable"
def _kernel_sigma(self) -> float:
if self.dim == 1:
return 1.0 / math.pi**0.5
elif self.dim == 2:
return 1.0 / math.pi
elif self.dim == 3:
return 1.0 / math.pi**1.5
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
q = np.abs(q)
mask = q <= self.support
return np.where(mask, np.exp(-(q**2)), 0.0)
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
q = np.abs(q)
mask = q <= self.support
return np.where(mask, -2 * q * np.exp(-(q**2)), 0.0)
[docs]
class LucyKernel(BaseKernelClass):
"""Lucy kernel implementation for SPH."""
def __init__(self, dim: int) -> None:
"""Initialize the Lucy kernel for the given spatial dimension."""
super().__init__(dim=dim, support=1.0)
self.name = "lucy"
def _kernel_sigma(self) -> float:
"""Compute the normalization constant for the Lucy kernel."""
if self.dim == 1:
return 5.0 / 4.0
elif self.dim == 2:
return 5.0 / math.pi
elif self.dim == 3:
return 105.0 / (16.0 * math.pi)
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
mask = q <= self.support
return np.where(mask, (1 + 3 * q) * (1 - q) ** 3, 0.0)
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
mask = q <= self.support
return np.where(mask, -12 * q * (1 - q) ** 2, 0.0)
[docs]
class CubicSplineKernel(BaseKernelClass):
"""Cubic spline kernel implementation for SPH."""
def __init__(self, dim: int) -> None:
"""Initialize the Cubic spline kernel for the given spatial dimension."""
super().__init__(dim=dim, support=1.0)
self.name = "cubic_spline"
def _kernel_sigma(self) -> float:
if self.dim == 1:
return 4.0 / 3.0
elif self.dim == 2:
return 40.0 / (7.0 * math.pi)
elif self.dim == 3:
return 8.0 / math.pi
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
mask1 = q <= 0.5
mask2 = (q > 0.5) & (q <= 1)
W = np.where(mask1, 1 - 6 * q**2 + 6 * q**3, 0.0)
W = np.where(mask2, 2 * (1 - q) ** 3, W)
return W
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
mask1 = q <= 0.5
mask2 = (q > 0.5) & (q <= 1)
dW_dq = np.where(mask1, -6 * q * (2 - 3 * q), 0.0)
dW_dq = np.where(mask2, -6 * (1.0 - q) ** 2, dW_dq)
return dW_dq
[docs]
class QuinticSplineKernel(BaseKernelClass):
"""Quintic spline kernel implementation for SPH."""
def __init__(self, dim: int) -> None:
"""Initialize the Quintic spline kernel for the given spatial dimension."""
super().__init__(dim=dim, support=3.0)
self.name = "quintic_spline"
def _kernel_sigma(self) -> float:
if self.dim == 1:
return 1.0 / 120.0
elif self.dim == 2:
return 7.0 / (478.0 * math.pi)
elif self.dim == 3:
return 1.0 / (120.0 * math.pi)
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
mask1 = (q >= 0) & (q <= 1)
mask2 = (q > 1) & (q <= 2)
mask3 = (q > 2) & (q <= 3)
W = np.where(mask1, (3 - q) ** 5 - 6 * (2 - q) ** 5 + 15 * (1 - q) ** 5, 0.0)
W = np.where(mask2, (3 - q) ** 5 - 6 * (2 - q) ** 5, W)
W = np.where(mask3, (3 - q) ** 5, W)
return W
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
mask1 = q <= 1
mask2 = (q > 1) & (q <= 2)
mask3 = (q > 2) & (q <= 3)
dW_dq = np.where(
mask1, -5 * (3 - q) ** 4 + 30 * (2 - q) ** 4 - 75 * (1 - q) ** 4, 0.0
)
dW_dq = np.where(mask2, -5 * (3 - q) ** 4 + 30 * (2 - q) ** 4, dW_dq)
dW_dq = np.where(mask3, -5 * (3 - q) ** 4, dW_dq)
return dW_dq
[docs]
class WendlandC2Kernel(BaseKernelClass):
"""Wendland C2 kernel implementation for SPH."""
def __init__(self, dim: int) -> None:
"""Initialize the Wendland C2 kernel for the given spatial dimension."""
super().__init__(dim=dim, support=2.0)
self.name = "wendland_c2"
def _kernel_sigma(self) -> float:
if self.dim == 1:
return 5.0 / 8.0
elif self.dim == 2:
return 7.0 / (4.0 * math.pi)
elif self.dim == 3:
return 21.0 / (16.0 * math.pi)
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
mask = q <= 2
if self.dim == 1:
return np.where(mask, (1 - q / 2.0) ** 3 * (1.5 * q + 1), 0.0)
else:
return np.where(mask, (1 - q / 2.0) ** 4 * (2 * q + 1), 0.0)
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
mask = q <= 2
z = 1 - 0.5 * q
if self.dim == 1:
return np.where(mask, -1.5 * z**2 * (1.5 * q + 1) + 1.5 * z**3, 0.0)
else:
return np.where(mask, -2 * z**3 * (2 * q + 1) + 2 * z**4, 0.0)
[docs]
class WendlandC4Kernel(BaseKernelClass):
"""Wendland C4 kernel implementation for SPH."""
def __init__(self, dim: int) -> None:
"""Initialize the Wendland C4 kernel for the given spatial dimension."""
super().__init__(dim=dim, support=2.0)
self.name = "wendland_c4"
def _kernel_sigma(self) -> float:
if self.dim == 1:
return 3.0 / 4.0
elif self.dim == 2:
return 9.0 / (4.0 * math.pi)
elif self.dim == 3:
return 495.0 / (256.0 * math.pi)
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
mask = q <= 2
if self.dim == 1:
return np.where(mask, (1 - q / 2.0) ** 5 * (2 * q**2 + 2.5 * q + 1), 0.0)
else:
return np.where(
mask, (1 - q / 2.0) ** 6 * (35 / 12 * q**2 + 3 * q + 1), 0.0
)
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
mask = q <= 2
z = 1 - 0.5 * q
if self.dim == 1:
f, df = z**5, -2.5 * z**4
g, dg = 2 * q**2 + 2.5 * q + 1, 4 * q + 2.5
else:
f, df = z**6, -3 * z**5
g, dg = (35.0 / 12.0) * q**2 + 3 * q + 1, (35.0 / 6.0) * q + 3
return np.where(mask, df * g + f * dg, 0.0)
[docs]
class WendlandC6Kernel(BaseKernelClass):
"""Wendland C6 kernel implementation for SPH."""
def __init__(self, dim: int) -> None:
"""Initialize the Wendland C6 kernel for the given spatial dimension."""
super().__init__(dim=dim, support=2.0)
self.name = "wendland_c6"
def _kernel_sigma(self) -> float:
if self.dim == 1:
return 55.0 / 64.0
elif self.dim == 2:
return 39.0 / (14.0 * math.pi)
elif self.dim == 3:
return 1365.0 / (512.0 * math.pi)
def _kernel_values(self, q: npt.NDArray[np.floating]) -> npt.NDArray[np.floating]:
mask = q <= 2
if self.dim == 1:
return np.where(
mask,
(1 - q / 2.0) ** 7
* (21.0 / 8.0 * q**3 + 19.0 / 4.0 * q**2 + 3.5 * q + 1),
0.0,
)
else:
return np.where(
mask,
(1 - q / 2.0) ** 8 * (4.0 * q**3 + 6.25 * q**2 + 4.0 * q + 1),
0.0,
)
def _kernel_gradient_values(
self, q: npt.NDArray[np.floating]
) -> npt.NDArray[np.floating]:
mask = q <= 2
z = 1 - 0.5 * q
if self.dim == 1:
f, df = z**7, -3.5 * z**6
g, dg = (21.0 / 8.0 * q**3 + 19.0 / 4.0 * q**2 + 3.5 * q + 1), (
63.0 / 8.0 * q**2 + 19.0 / 2.0 * q + 3.5
)
else:
f, df = z**8, -4 * z**7
g, dg = (4.0 * q**3 + 6.25 * q**2 + 4.0 * q + 1), (
12.0 * q**2 + 12.5 * q + 4.0
)
return np.where(mask, df * g + f * dg, 0.0)
KERNEL_CLASSES = {
"tophat_separable": TophatSepKernel,
"tsc_separable": TSCSepKernel,
"gaussian_separable": GaussianSepKernel,
"tophat": TophatKernel,
"tsc": TSCKernel,
"gaussian": GaussianKernel,
"lucy": LucyKernel,
"cubic_spline": CubicSplineKernel,
"quintic_spline": QuinticSplineKernel,
"wendland_c2": WendlandC2Kernel,
"wendland_c4": WendlandC4Kernel,
"wendland_c6": WendlandC6Kernel,
}
[docs]
def get_kernel(kernel_name: str, dim: int) -> BaseKernelClass:
"""Return a kernel instance for the given kernel name and spatial dimension.
Parameters
----------
kernel_name : str
Name of the kernel to create.
dim : int
Spatial dimension (1, 2, or 3).
Returns
-------
BaseKernelClass
An instance of the specified kernel class.
Raises
------
If ``kernel_name`` is not recognized or if ``dim`` is not valid.
"""
if kernel_name not in KERNEL_CLASSES:
raise ValueError(
f"Invalid kernel_name '{kernel_name}'. Must be one of {list(KERNEL_CLASSES.keys())}."
)
if dim not in (1, 2, 3):
raise ValueError(f"Invalid dim '{dim}'. Must be 1, 2, or 3.")
return KERNEL_CLASSES[kernel_name](dim=dim)