Source code for smudgy.utils

"""Utility functions for SPH operations."""

from __future__ import annotations

from collections.abc import Sequence

import numpy as np
import numpy.typing as npt
from scipy import spatial

FloatArray = npt.NDArray[np.floating]
IntArray = npt.NDArray[np.int_]
BoxInput = float | Sequence[float] | npt.ArrayLike


[docs] def build_kdtree( points: npt.ArrayLike, boxsize: BoxInput | None = None ) -> spatial.cKDTree: """Construct a cKDTree with optional periodic box sizes. If ``boxsize`` is provided, the tree will use periodic boundary conditions. Parameters ---------- points Array of shape ``(N, D)`` with particle coordinates, where ``N`` is the number of particles and ``D`` the dimension. boxsize Scalar or ``(D,)`` array defining periodic box lengths, or ``None``. Returns ------- scipy.spatial.cKDTree Tree built from ``points``. """ return spatial.cKDTree(points, boxsize=boxsize)
[docs] def query_kdtree( tree: spatial.cKDTree, points: npt.ArrayLike, k: int ) -> tuple[FloatArray, IntArray]: """Query a cKDTree for the k nearest neighbors of given points. Parameters ---------- tree cKDTree instance to query. points Array of shape ``(M, D)`` with query coordinates, where ``M`` is the number of query positions. k Number of nearest neighbors to return. Returns ------- Tuple[numpy.ndarray, numpy.ndarray] Tuple of ``(distances, indices)`` from ``cKDTree.query``. """ return tree.query(points, k=k, workers=-1)
[docs] def shift_coordinates(coordinates: npt.ArrayLike) -> FloatArray: """Shift coordinates so the minimum per-axis value is at zero. Parameters ---------- coordinates Array of shape ``(N, D)`` with particle coordinates. Returns ------- numpy.ndarray Shifted coordinates with the same shape as ``coordinates``. Raises ------ AssertionError If the input array is not 2D. """ coordinates_array = np.asarray(coordinates) assert ( coordinates_array.ndim == 2 ), "Input coordinates must be a 2D array of shape (N, D)" return coordinates_array - coordinates_array.min(axis=0)
[docs] def coordinate_difference_with_pbc( x: npt.ArrayLike, y: npt.ArrayLike, boxsize: BoxInput ) -> FloatArray: """Compute coordinate differences with periodic boundary conditions. Parameters ---------- x Array-like coordinates. y Array-like coordinates to subtract from ``x``. boxsize Scalar or ``(D,)`` array defining periodic box lengths. Returns ------- numpy.ndarray Coordinate differences wrapped into the range $[-0.5 * boxsize, 0.5 * boxsize)$ per dimension. Raises ------ AssertionError If the input coordinate arrays do not have the same spatial dimension. If ``boxsize`` is an array, if it is not 1D or if its length does not match the coordinate dimension. """ shape_x = x.shape shape_y = y.shape assert ( shape_x[-1] == shape_y[-1] ), "Input coordinate arrays must have the same spatial dimension" # compute differences coordinate_differences = np.asarray(x) - np.asarray(y) # early exit if boxsize is None (non-periodic case) if boxsize is None: return coordinate_differences # prepare boxsize array for broadcasting if np.isscalar(boxsize): box_arr = np.array([boxsize] * shape_x[-1]) else: assert np.asarray(boxsize).ndim == 1, "'boxsize' must be a scalar or 1D array" assert ( len(boxsize) == shape_x[-1] ), "Length of 'boxsize' must match coordinate dimension" box_arr = np.asarray(boxsize) half_box = 0.5 * box_arr return (coordinate_differences + half_box) % box_arr - half_box
[docs] def compute_smoLens( tree: spatial.cKDTree, num_neighbors: int, query_positions: npt.ArrayLike | None = None, ) -> tuple[FloatArray, IntArray, FloatArray]: """Estimate smoothing length as half the distance to the Nth neighbor. Parameters ---------- tree cKDTree built from particle positions. num_neighbors Number of neighbors used for the estimate. query_positions Array of shape ``(M, D)`` with positions where smoothing length is evaluated. If ``None``, uses particle positions from the tree. Returns ------- Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] Tuple of ``(hsm, nn_inds, nn_dists)`` where ``hsm`` has shape ``(M,)``, ``nn_dists`` has shape ``(M, num_neighbors)``, and ``nn_inds`` has shape ``(M, num_neighbors)``. """ if query_positions is None: query_positions = tree.data nn_dists, nn_inds = query_kdtree(tree, query_positions, k=num_neighbors) hsm = nn_dists[:, -1] # * 0.5 return hsm, nn_inds, nn_dists
[docs] def compute_smoTens( tree: spatial.cKDTree, weights: npt.ArrayLike, num_neighbors: int, query_positions: npt.ArrayLike | None = None, ) -> tuple[FloatArray, FloatArray, FloatArray, IntArray, FloatArray]: """Compute anisotropic smoothing tensor using a covariance-based method. Implements the method from Marinho (2021), generalized for 2D and 3D. Parameters ---------- tree cKDTree built from particle positions. weights Array of shape ``(N,)`` with particle weights. num_neighbors Number of neighbors used for covariance estimation. query_positions Array of shape ``(M, D)`` where the tensor is evaluated. If ``None``, uses particle positions from the tree. Returns ------- Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray, numpy.ndarray] Tuple ``(H, eigvals, eigvecs, nn_inds, nn_dists, rel_coords)`` where ``H`` has shape ``(M, D, D)``, ``eigvals`` has shape ``(M, D)``, ``eigvecs`` has shape ``(M, D, D)``, ``nn_inds`` has shape ``(M, num_neighbors)``, ``nn_dists`` has shape ``(M, num_neighbors)``, and ``rel_coords`` has shape ``(M, num_neighbors, D)``. """ pos = tree.data boxsize = tree.boxsize spatial_dim = pos.shape[-1] if spatial_dim not in (2, 3): raise ValueError( "[smudgy] Only 2D and 3D positions are supported for anisotropic smoothing tensors." ) if query_positions is None: query_positions = pos weights = weights.flatten() if weights.ndim > 1 else weights # Find nearest neighbors nn_dists, nn_inds = query_kdtree(tree, query_positions, k=num_neighbors) neighbor_coords = pos[nn_inds] neighbor_weights = weights[nn_inds] # Account for periodic boundary conditions rel_coords = coordinate_difference_with_pbc( neighbor_coords, query_positions[:, np.newaxis, :], boxsize ) # Compute mass-weighted covariance matrix outer = np.einsum("...i, ...j -> ...ij", rel_coords, rel_coords) outer = outer * neighbor_weights[..., np.newaxis, np.newaxis] Sigma = ( np.sum(outer, axis=1) / np.sum(neighbor_weights, axis=1)[..., np.newaxis, np.newaxis] ) # Compute eigendecomposition and construct smoothing tensor H = VΛV^T # Relative regularization of the Sigma matrix, to make sure H, eigvals, and vecs are all finite eps = 1e-6 trace = np.trace(Sigma, axis1=-2, axis2=-1)[..., None, None] Sigma_reg = Sigma + eps * trace * np.eye(spatial_dim)[None, :, :] eigvals, eigvecs = np.linalg.eigh(Sigma_reg) eigvals = np.sqrt(np.clip(eigvals, 0, None)) Λ = eigvals[..., np.newaxis] * np.eye(spatial_dim) H = np.matmul(np.matmul(eigvecs, Λ), np.transpose(eigvecs, axes=(0, 2, 1))) return H, eigvals, eigvecs, nn_inds, nn_dists, rel_coords
[docs] def project_smoTens_to_2d( h_tensor: npt.ArrayLike, plane: list[int] | None = None, basis: list[Sequence[float], Sequence[float]] | None = None, ) -> tuple[FloatArray, FloatArray, FloatArray]: """Project 3D smoothing tensors onto a 2D plane. Parameters ---------- h_tensor Array of shape ``(N, 3, 3)`` with 3D smoothing tensors. plane Indices of the axes to project onto for 3D to 2D deposition. Mutually exclusive with ``basis``. basis List of basis vectors ``(e1, e2)`` spanning the projection plane. Each vector should be array-like of length 3. Mutually exclusive with ``plane``. Returns ------- Tuple[numpy.ndarray, numpy.ndarray, numpy.ndarray] Tuple ``(h_tensor_2d, eigvals, eigvecs)`` where ``h_tensor_2d`` has shape ``(N, 2, 2)``, ``eigvals`` has shape ``(N, 2)``, and ``eigvecs`` has shape ``(N, 2, 2)``. Raises ------ ValueError If neither or both of ``plane`` and ``basis`` are provided. If ``plane`` is not one of the allowed values or types or does not have length 2. If ``basis`` is not a 2-tuple of 3D vectors. """ # Validate inputs if plane is None and basis is None: raise ValueError("Either 'plane' or 'basis' must be provided") if plane is not None and basis is not None: raise ValueError( "'plane' and 'basis' are mutually exclusive, only provide one of the two" ) # Validate that plane is either list or array of length 2 with valid indices if plane is not None: if not (isinstance(plane, (list, np.ndarray)) and len(plane) == 2): raise ValueError("'plane' must be a list or array of length 2") if any(p not in [0, 1, 2] for p in plane): raise ValueError("'plane' indices must be in the range [0, 2]") # Define projection basis vectors if basis is not None: if len(basis) != 2: raise ValueError("'basis' must be a 2-tuple of vectors") e1, e2 = basis else: unit_vectors = [[1, 0, 0], [0, 1, 0], [0, 0, 1]] e1 = unit_vectors[plane[0]] e2 = unit_vectors[plane[1]] # Compute projected tensors: (P @ H^-1 @ P^T)^-1 projection_matrix = np.array([e1, e2], dtype="float32") # (2, 3) h_tensor_inv = np.linalg.inv(h_tensor) # (N, 3, 3) # Vectorized computation: P @ H_inv @ P^T for all particles temp = np.einsum( "ij,njk,lk->nil", projection_matrix, h_tensor_inv, projection_matrix ) h_tensor_2d = np.linalg.inv(temp) # (N, 2, 2) # Compute eigendecomposition of 2D tensors eigvals, eigvecs = np.linalg.eigh(h_tensor_2d) return h_tensor_2d, eigvals, eigvecs