Source code for smudgy.pointcloud

"""Core SPH operations and PointCloud class for particle-based computations."""

from collections.abc import Sequence
from typing import Any, Literal

import numpy as np
import numpy.typing as npt

from .core.backend import _call_backend
from .core.kernels import get_kernel
from .smooth import SmoothingInfo
from .utils import (
    build_kdtree,
    compute_smoLens,
    compute_smoTens,
    coordinate_difference_with_pbc,
    project_smoTens_to_2d,
    query_kdtree,
)

STRUCTURES = ("separable", "isotropic", "anisotropic")
Structure = Literal["separable", "isotropic", "anisotropic"]


[docs] class PointCloud: """Represent a collection of particles for operations.""" def __init__( self, positions: npt.NDArray[np.floating], weights: npt.NDArray[np.floating] | None = None, boxsize: float | Sequence[float] | None = None, verbose: bool = True, ) -> None: """Initialize a PointCloud container for particle-based SPH computations. Parameters ---------- positions : npt.NDArray[np.floating] Particle positions, shape (N, D). weights : npt.NDArray[np.floating] | None Particle weights (e.g. masses), shape (N,). If None, uniform weights are used. boxsize : float or Sequence[float], optional Periodic box size(s). If None, no periodicity is used. verbose : bool, default True Verbosity flag. """ self.dim = positions.shape[-1] assert ( self.dim == 2 or self.dim == 3 ), f"Particle positions must be of shape (N, 2) or (N, 3) but found {positions.shape}" self.positions = positions.astype(np.float32) self.weights = ( np.ones(self.positions.shape[0], dtype=np.float32) if weights is None else weights.astype(np.float32) ) assert ( self.weights.shape[0] == self.positions.shape[0] ), f"Shape mismatch: length of weights and positions must be the same but found: {self.weights.shape} and {self.positions.shape}" self.verbose = verbose if boxsize is None: self.periodic = False self.boxsize = None else: self.periodic = True boxsize_arr = np.asarray(boxsize) if boxsize_arr.ndim == 0: self.boxsize = np.repeat(boxsize_arr, self.dim) else: assert boxsize_arr.shape == ( self.dim, ), f"'boxsize' must be a scalar or have shape ({self.dim},), got {boxsize_arr.shape}" self.boxsize = boxsize_arr # initialize empty attributes for smoothing info and density self.supported_structures = STRUCTURES self.smoothing = SmoothingInfo() if self.verbose: periodic_str = ( f"in periodic box of size={self.boxsize}" if self.periodic else "without periodicity" ) print( f"[smudgy] Initialized {self.dim}d PointCloud with {self.positions.shape[0]} particles {periodic_str}" ) def _set_structure(self, structure: Structure) -> None: """Set the smoothing structure (isotropic, anisotropic, or separable).""" assert isinstance( structure, str ), f"'structure' must be a string but found {type(structure)}" if structure not in self.supported_structures: raise ValueError( f"'structure' must be one of {self.supported_structures}, but found '{structure}'" ) self.structure = structure def _set_kernel_name(self, kernel_name: str) -> None: """Set the name of the SPH kernel to use.""" assert isinstance( kernel_name, str ), f"'kernel_name' must be a string but found {type(kernel_name)}" self.kernel_name = kernel_name def _set_num_neighbors(self, num_neighbors: int) -> None: """Set the number of neighbors for smoothing computations.""" self.num_neighbors = num_neighbors def _check_property(self, property: str | None = None) -> None: """Ensure a global property is set before use.""" if not hasattr(self, property): raise AttributeError( f"'{property}' has not been set: either set it via 'global_setup' method or provide it as a function argument" ) def _validate_neighbors(self, nn_inds: npt.NDArray[np.integer]) -> None: """Verify that neighbor indices are within valid bounds.""" max_idx = np.max(nn_inds) if max_idx >= self.positions.shape[0]: raise IndexError( f"Neighbor index {max_idx} is out of bounds for {self.positions.shape[0]} particles. This indicates a bug in the neighbor search or input setup." ) def _check_density_computed(self, structure: Structure) -> None: """Verify that density has been computed for the requested structure.""" field = ( self.smoothing.density_iso if structure == "isotropic" else self.smoothing.density_aniso ) if field is None: raise AttributeError( f"Particle density has not been computed yet for structure '{structure}'; call 'compute_density' with 'structure={structure}' first." ) def _check_smoothing_computed(self, structure: Structure) -> None: """Verify that smoothing lengths/tensors have been computed for the structure.""" attr_map = { "separable": "smoLens", "isotropic": "smoLens", "anisotropic": "smoTens", } if getattr(self.smoothing, attr_map[structure]) is None: raise AttributeError( f"Smoothing {attr_map[structure]} has not been computed yet for structure '{structure}'; call 'compute_smoothing' with 'structure={structure}' first." ) def _resolve_structure(self, structure: Structure | None) -> Structure: """Resolve the smoothing structure from argument or global state.""" if structure is None: self._check_property("structure") return self.structure if structure not in self.supported_structures: raise ValueError( f"'structure' must be one of {self.supported_structures}, but found '{structure}'" ) return structure def _resolve_kernel_name(self, kernel_name: str | None = None) -> str: """Resolve the kernel name from argument or global state.""" if kernel_name is None: self._check_property("kernel_name") return self.kernel_name assert isinstance( kernel_name, str ), f"'kernel_name' must be a string but found {type(kernel_name)}" return kernel_name def _resolve_num_neighbors(self, num_neighbors: int | None = None) -> int: """Resolve the number of neighbors from argument or global state.""" if num_neighbors is None: self._check_property("num_neighbors") return self.num_neighbors assert ( isinstance(num_neighbors, int) and num_neighbors > 0 ), f"'num_neighbors' must be a positive integer but found {num_neighbors}" return num_neighbors def _resolve(self, val: Any, name: str) -> Any: """Resolve global properties (kernel, structure, etc.).""" if val is not None: return val if not hasattr(self, name): raise AttributeError( f"'{name}' not set; use 'global_setup' or provide as argument" ) return getattr(self, name) def _resolve_fields( self, fields: npt.ArrayLike | str | list[str] ) -> tuple[npt.NDArray[np.floating], list[int]]: """Resolve fields to a single array and return component counts.""" if isinstance(fields, (str, np.ndarray)): fields = [fields] elif not isinstance(fields, list): raise ValueError( "Invalid 'fields' argument: must be a string, list, or numpy array" ) arrays = [] field_sizes = [] for f in fields: arr = getattr(self, f) if isinstance(f, str) else np.asarray(f) arr = np.atleast_2d(arr).T if arr.ndim == 1 else np.atleast_2d(arr) arrays.append(arr) field_sizes.append(arr.shape[1]) for i, arr in enumerate(arrays): self._validate_shape(arr, f"field {i}") return np.concatenate(arrays, axis=-1), field_sizes def _validate_shape(self, arr: npt.NDArray[Any], name: str) -> None: """Ensure the first dimension of an array matches the number of particles.""" if arr.shape[0] != self.positions.shape[0]: raise ValueError( f"Length of '{name}' ({arr.shape[0]}) must match number of particles ({self.positions.shape[0]})" )
[docs] def global_setup( self, structure: Structure | None = None, kernel_name: str | None = None, num_neighbors: int | None = None, ) -> "PointCloud": """Set global parameters for SPH computations. Parameters ---------- structure : Structure, optional Smoothing structure ('separable', 'isotropic', or 'anisotropic'). kernel_name : str, optional Name of the SPH kernel to use globally. num_neighbors : int, optional Number of neighbors for smoothing length computation. Returns ------- PointCloud The current instance for method chaining. """ if structure: self._set_structure(structure) if self.verbose: print(f"[smudgy] Set structure globally to '{self.structure}'") if kernel_name: self._set_kernel_name(kernel_name) if self.verbose: print(f"[smudgy] Set kernel globally to '{self.kernel_name}'") if num_neighbors: self._set_num_neighbors(num_neighbors) if self.verbose: print( f"[smudgy] Set number of neighbors globally to {self.num_neighbors}" ) return self
def _build_tree( self, positions: npt.NDArray[np.floating], boxsize: float | Sequence[float] | None = None, ) -> None: """Build a kd-tree for neighbor searches. Parameters ---------- positions : npt.NDArray[np.floating] Particle positions, shape (N, D). boxsize : float or array-like, optional Periodic box size for the tree construction. """ tree = build_kdtree(positions, boxsize=boxsize) self.smoothing.tree = tree def _ensure_tree(self) -> Any: """Ensure a kd-tree exists for neighbor searches.""" if self.smoothing.tree is None: if self.verbose: print("[smudgy] Building kd-tree from particle positions") self._build_tree(self.positions, boxsize=self.boxsize) return self.smoothing.tree
[docs] def compute_smoothing( self, query_positions: npt.ArrayLike | None = None, num_neighbors: int | None = None, structure: Structure | None = None, ) -> None: """Compute smoothing lengths for SPH calculations. Parameters ---------- query_positions : npt.ArrayLike, optional Positions where smoothing is evaluated. If None, uses particle positions. num_neighbors : int, optional Number of neighbors for smoothing length computation. structure : Structure, optional Smoothing structure for computation. Returns ------- None Notes ----- Results are stored in the ``smoothing`` attribute. """ num_neighbors_temp = self._resolve_num_neighbors(num_neighbors) structure_temp = self._resolve_structure(structure) tree = self._ensure_tree() if self.verbose: info_str = "tensors" if structure_temp == "anisotropic" else "lengths" print( f"[smudgy] Computing smoothing {info_str} from {num_neighbors_temp} neighbors" ) kwargs = { "tree": tree, "num_neighbors": num_neighbors_temp, "query_positions": query_positions, } if structure_temp in ["separable", "isotropic"]: smoLens, nn_inds, nn_dists = compute_smoLens(**kwargs) self.smoothing.smoLens = smoLens else: # anisotropic ( smoTens, smoTens_eigvals, smoTens_eigvecs, nn_inds, nn_dists, nn_dists_vec, ) = compute_smoTens(**kwargs, weights=self.weights) self.smoothing.smoTens, self.smoothing.nn_dists_vec = smoTens, nn_dists_vec self.smoothing.smoTens_eigvals, self.smoothing.smoTens_eigvecs = ( smoTens_eigvals, smoTens_eigvecs, ) self.smoothing.nn_inds, self.smoothing.nn_dists = nn_inds, nn_dists self.smoothing.num_neighbors = num_neighbors_temp self._validate_neighbors(nn_inds)
def _get_rel_coords( self, query_positions: npt.NDArray[np.floating], positions: npt.NDArray[np.floating], ) -> npt.NDArray[np.floating]: """Compute relative coordinates between particles and query positions, respecting PBC.""" if self.periodic: return coordinate_difference_with_pbc( query_positions[:, np.newaxis, :], positions, self.boxsize, ) return query_positions[:, np.newaxis, :] - positions def _get_active_smoothing( self, structure: Structure, mask: npt.NDArray[np.bool_] | None = None ) -> npt.NDArray[np.floating]: """Retrieve the relevant smoothing data (lengths or tensors) for a given structure.""" if structure in ["separable", "isotropic"]: data = self.smoothing.smoLens else: data = self.smoothing.smoTens return data[mask] if mask is not None else data
[docs] def set_smoothing( self, structure: Structure | None = None, smoLens: npt.ArrayLike | None = None, smoTens: npt.ArrayLike | None = None, smoTens_eigvals: npt.ArrayLike | None = None, smoTens_eigvecs: npt.ArrayLike | None = None, ) -> None: """Manually assign smoothing lengths or tensors to particles. Parameters ---------- structure : Structure, optional Smoothing structure. Required if setting `smoLens` or `smoTens`. smoLens : npt.ArrayLike, optional Isotropic smoothing lengths, shape (N,). smoTens : npt.ArrayLike, optional Anisotropic smoothing tensors, shape (N, D, D). smoTens_eigvals : npt.ArrayLike, optional Eigenvalues of the smoothing tensors, shape (N, D). smoTens_eigvecs : npt.ArrayLike, optional Eigenvectors of the smoothing tensors, shape (N, D, D). """ # for smoLens, structure must be set and either 'separable' or 'isotropic' if smoLens: assert ( structure == "isotropic" ), "Structure must be specified when providing 'smoLens'" self._validate_shape(np.asarray(smoLens), "smoLens") self.smoothing.smoLens = np.asarray(smoLens, dtype=np.float32) if smoTens: assert ( structure == "anisotropic" ), "Structure must be specified when providing 'smoTens'" self._validate_shape(np.asarray(smoTens), "smoTens") self.smoothing.smoTens = np.asarray(smoTens, dtype=np.float32) if smoTens_eigvals: self._validate_shape(np.asarray(smoTens_eigvals), "smoTens_eigvals") self.smoothing.smoTens_eigvals = np.asarray( smoTens_eigvals, dtype=np.float32 ) if smoTens_eigvecs: self._validate_shape(np.asarray(smoTens_eigvecs), "smoTens_eigvecs") self.smoothing.smoTens_eigvecs = np.asarray( smoTens_eigvecs, dtype=np.float32 )
[docs] def add_fields( self, names: str | list[str], values: npt.ArrayLike | list[npt.ArrayLike] ) -> None: """Add one or multiple custom fields to the PointCloud instance. Parameters ---------- names : str or list of str Name(s) of the field(s) to add. values : array_like or list of array_like Field values. Each array must have shape (N,) or (N, num_components). """ # --- Case 1: multiple fields --- if isinstance(names, (list, tuple)) or isinstance(values, (list, tuple)): if not ( isinstance(names, (list, tuple)) and isinstance(values, (list, tuple)) ): raise ValueError( "If passing multiple fields, both 'names' and 'values' must be lists/tuples." ) if len(names) != len(values): raise ValueError("'names' and 'values' must have the same length.") for name, val in zip(names, values): self.add_fields(name, val) # recursive call return # --- Case 2: single field --- name = names values_arr = np.asarray(values, dtype=np.float32) if hasattr(self, name): print(f"Overwriting existing attribute '{name}' on PointCloud instance.") self._validate_shape(values_arr, name) setattr(self, name, values_arr)
[docs] def delete_fields(self, names: str | list[str]) -> None: """Delete one or multiple custom fields from the PointCloud instance. Parameters ---------- names : str or list of str Name(s) of the field(s) to delete. """ # --- Case 1: multiple fields --- if isinstance(names, (list, tuple)): for name in names: self.delete_fields(name) # recursive call return # --- Case 2: single field --- name = names if hasattr(self, name): delattr(self, name) else: print( f"No attribute named '{name}' found on PointCloud instance to delete." )
[docs] def compute_density( self, kernel_name: str | None = None, structure: Structure | None = None, ) -> None: """Compute particle densities using SPH kernels. Parameters ---------- kernel_name : str, optional Name of the SPH kernel to use for density computation. structure : Structure, optional Smoothing structure specifier. Notes ----- Results are stored in the `smoothing` attribute. """ st = self._resolve(structure, "structure") kn = self._resolve(kernel_name, "kernel_name") kernel = get_kernel(kn, dim=self.dim) self._check_smoothing_computed(st) nn_inds = self.smoothing.nn_inds kwargs = { "r_ij": ( self.smoothing.nn_dists_vec if st == "anisotropic" else self.smoothing.nn_dists ), "h": ( self.smoothing.smoTens if st == "anisotropic" else self.smoothing.smoLens ), "mode": st, } # r_ij = (N, 8) # h = (N,) if self.verbose: print(f"[smudgy] Computing density using {st} '{kernel.name}' kernel") density = np.sum(self.weights[nn_inds] * kernel.evaluate(**kwargs), axis=1) self.smoothing.kernel_name = kn if st == "anisotropic": self.smoothing.density_aniso = density else: self.smoothing.density_iso = density
[docs] def interpolate_fields( self, fields: npt.ArrayLike | str | list[str], query_positions: npt.ArrayLike = None, compute_gradients: bool = False, structure: Structure | None = None, ) -> npt.NDArray[np.floating]: """Interpolate particle fields to query positions using SPH. Parameters ---------- fields : Union[npt.ArrayLike, str, List[str]] Field data to interpolate. Can be an array, string name, or list of both. query_positions : npt.ArrayLike, optional Array of shape (M, D) with positions where fields are interpolated. compute_gradients : bool, default False Whether to compute field gradients instead of values. structure : Structure, optional Smoothing structure for interpolation. Returns ------- npt.NDArray[np.floating] Interpolated field values (shape M, F) or gradients (shape M, F, D). """ # check that structure is set either globally or via argument structure_temp = self._resolve_structure(structure) # check that density has been computed for the chosen structure self._check_density_computed(structure_temp) # load kernel from the smoothing class, since it has been used to compute density already and we want to ensure consistency between density computation and interpolation kernel_temp = get_kernel(self.smoothing.kernel_name, dim=self.dim) # cast fields to correct input format fields, fields_sizes = self._resolve_fields(fields) # if query_positions is None, use particle positions for interpolation (i.e. return smoothed fields at particle positions) # for compute_gradients = True, this allows to compute smoothed gradients at particle positions if query_positions is None: query_positions = self.positions # can re-use the neighbor indices and distances from the smoothing computation nn_inds = self.smoothing.nn_inds nn_dists = self.smoothing.nn_dists else: # for new query positions, need to perform a new neighbor search tree = self._ensure_tree() nn_dists, nn_inds = query_kdtree( tree, query_positions, k=self.num_neighbors, ) # prepare interpolation weights and fields fields_ = fields[nn_inds] if structure_temp == "anisotropic": density_temp = self.smoothing.density_aniso else: density_temp = self.smoothing.density_iso weights = self.weights[nn_inds] / (density_temp[nn_inds] + 1e-8) # select kernel arguments kwargs = dict(mode=structure_temp) if structure_temp == "anisotropic" or compute_gradients: rel_coords = self._get_rel_coords( query_positions, self.positions[nn_inds], ) kwargs["r_ij"] = ( rel_coords if (structure_temp == "anisotropic" or compute_gradients) else nn_dists ) kwargs["h"] = ( self.smoothing.smoTens[nn_inds] if structure_temp == "anisotropic" else self.smoothing.smoLens[nn_inds] ) if self.verbose: grad_str = "gradients of " if compute_gradients else "" print( f"[smudgy] Interpolating {grad_str}fields at query positions using {structure_temp} '{kernel_temp.name}' kernel" ) w = ( kernel_temp.evaluate_gradient(**kwargs) if compute_gradients else kernel_temp.evaluate(**kwargs) ) einsum_str = ( "...kf,...kd,...k->...fd" if compute_gradients else "...kf,...k,...k->...f" ) return np.einsum(einsum_str, fields_, w, weights)
[docs] def interpolate_gradient_fields( self, fields: npt.ArrayLike, query_positions: npt.ArrayLike, structure: Structure | None = None, ) -> npt.NDArray[np.floating]: """Compute gradients of particle fields at query positions using SPH. Parameters ---------- fields : npt.ArrayLike Field data to differentiate. query_positions : npt.ArrayLike Positions where gradients are evaluated. structure : Structure, optional Smoothing structure for interpolation. Returns ------- npt.NDArray[np.floating] Interpolated field gradients, shape (M, F, D). """ return self.interpolate_fields( fields=fields, query_positions=query_positions, compute_gradients=True, structure=structure, )
def _resolve_deposition_method( self, kernel_name: str | None, structure: Structure | None ) -> tuple[str, str]: """Map kernel and structure to a specific deposition method and backend kernel name.""" kn = self._resolve_kernel_name(kernel_name) if kn == "ngp": return "ngp", kn st = self._resolve_structure(structure) dual_kernels = ("tophat", "tsc", "gaussian") is_dual = any(k in kn for k in dual_kernels) is_separable = st == "separable" if is_separable and not is_dual: raise ValueError( f"Structure 'separable' is incompatible with spherically symmetric kernel '{kn}'" ) method = st if is_dual and is_separable: kn += "_separable" return method, kn def _resolve_averaged( self, averaged: bool | Sequence[bool], field_sizes: Sequence[int], ) -> npt.NDArray[np.bool_]: """Resolve averaged flags to one bool per scalar field component.""" total_components = sum(field_sizes) # Single bool -> broadcast everywhere if isinstance(averaged, bool): return np.full(total_components, averaged, dtype=np.bool_) if not isinstance(averaged, (list, tuple)): raise TypeError( f"'averaged' must be a bool or sequence of bools, found {type(averaged)}" ) # Case 1: # User provided one bool per scalar component already if len(averaged) == total_components: return np.asarray(averaged, dtype=np.bool_) # Case 2: # User provided one bool per field -> broadcast components if len(averaged) == len(field_sizes): resolved = [] for avg, size in zip(averaged, field_sizes): resolved.extend([avg] * size) return np.asarray(resolved, dtype=np.bool_) raise ValueError( f"Length of 'averaged' ({len(averaged)}) must match either " f"the number of fields ({len(field_sizes)}) or the total number " f"of scalar components ({total_components})" ) def _prepare_deposition_smoothing( self, method: str, adaptive: bool, num_p: int, gn: npt.NDArray[np.int32], d_lens: npt.NDArray[np.float32], mask: npt.NDArray[np.bool_], dep_dim: int, plane_projection: list[int] | None = None, plane_projection_basis: npt.NDArray[np.float32] | None = None, ) -> tuple[ npt.NDArray[np.float32] | None, npt.NDArray[np.float32] | None, npt.NDArray[np.float32] | None, ]: """Prepare smoothing data (fixed or adaptive) for the deposition call.""" # First two are NGP and uniform methods like Cloud-in-Cell and TSC where no smoothing info is needed if method == "ngp": return None, None, None if not adaptive: spacing = d_lens / gn if method == "separable": return ( np.repeat((spacing * 1.0)[np.newaxis, :], num_p, axis=0).astype( np.float32 ), None, None, ) # for isotropic we average the spacing across all dimensions # TODO: can do better here... else: return ( np.full(num_p, np.mean(spacing), dtype=np.float32), None, None, ) # Otherwise we gather smoothing info based on selected structure self._check_smoothing_computed(method if method != "separable" else "isotropic") if method == "separable": return ( np.repeat(self.smoothing.smoLens[mask][:, np.newaxis], dep_dim, axis=1), None, None, ) if method == "isotropic": return (self.smoothing.smoLens[mask], None, None) if method == "anisotropic": if plane_projection is not None or plane_projection_basis is not None: _, vals, vecs = project_smoTens_to_2d( self.smoothing.smoTens[mask], plane=plane_projection, basis=plane_projection_basis, ) return None, vals, vecs else: return ( None, self.smoothing.smoTens_eigvals[mask], self.smoothing.smoTens_eigvecs[mask], ) else: raise ValueError(f"Unsupported deposition method '{method}'")
[docs] def deposit_to_grid( self, fields: npt.ArrayLike | str | list[str], averaged: bool | Sequence[bool], gridnums: int | Sequence[int], extent: Sequence[Sequence[float]] | None = None, kernel_name: str | None = None, structure: Structure | None = None, adaptive: bool = True, plane_projection: list[int] | None = None, plane_projection_basis: ( list[Sequence[float] | npt.NDArray[np.float32]] | None ) = None, integration_method: str = "midpoint", num_kernel_evaluations_per_axis: int = 4, eta_crit: float = 1.0, return_weights: bool = False, use_python: bool = False, use_openmp: bool = True, omp_threads: int | None = None, ) -> ( npt.NDArray[np.floating] | tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]] ): """Deposit particle fields onto a structured grid using SPH. Parameters ---------- fields : Union[npt.ArrayLike, str, List[str]] Field data to deposit. averaged : Union[bool, Sequence[bool]] Whether to divide the result by weights for each field component. gridnums : Union[int, Sequence[int]] Number of cells along each axis. extent : Optional[Sequence[Sequence[float]]], optional Domain bounds [[xmin, xmax], [ymin, ymax], ...]. If None, uses `boxsize`. kernel_name : str, optional SPH kernel name. structure : Structure, optional Smoothing structure for deposition. adaptive : bool, default False Whether to use adaptive smoothing from the instance. plane_projection : List[int], optional Indices of the axes to project onto for 3D to 2D deposition. plane_projection_basis : List[Sequence[float] | npt.NDArray[np.float32]], optional This feature is currently not supported. Placeholder for future functionality to specify a custom projection basis. integration_method : str, default 'midpoint' Kernel integration method. num_kernel_evaluations_per_axis : int, default 4 Resolution for kernel integration. eta_crit : float, default 1.0 Anti-aliasing threshold to switch from sampled to full numerical quadrature. return_weights : bool, default False If True, returns the weights (density) grid as well. use_python : bool, default False Whether to force the Python instead of the C++ backend. use_openmp : bool, default True Whether to use multi-threading in the C++ backend. omp_threads : int, optional Number of threads for OpenMP. Returns ------- Union[npt.NDArray[np.floating], tuple[npt.NDArray[np.floating], npt.NDArray[np.floating]]] Deposited field grid, and optionally the weights grid. """ method, kn_res = self._resolve_deposition_method(kernel_name, structure) fields, fields_sizes = self._resolve_fields(fields) # adaptive = False is only supported for structure = separable or isotropic but not anisotropic if not adaptive and method == "anisotropic": raise ValueError( "Adaptive smoothing must be enabled for 'anisotropic' deposition, but found `adaptive`=False" ) if omp_threads is not None and omp_threads < 1: raise ValueError(f"omp_threads must be >= 1, got {omp_threads}") # validate mutual exclusivity of projection arguments if plane_projection_basis is not None: raise ValueError( "Specifying 'plane_projection_basis' is currently not supported." ) # resolve the averaged argument to a list of bools matching the number of field components averaged = self._resolve_averaged(averaged, fields_sizes) # construct the mask of particles falling into extent if extent is None: if self.boxsize is None: raise ValueError("Either 'boxsize' must be set or 'extent' provided") box = np.atleast_1d(self.boxsize).astype(np.float32) if box.size == 1: box = np.repeat(box, self.dim) domain_min, domain_max, periodic = ( np.zeros(self.dim, dtype=np.float32), box, bool(self.periodic), ) else: ext = np.asarray(extent, dtype=np.float32) domain_min, domain_max, periodic = ext[:, 0], ext[:, 1], False d_lens = domain_max - domain_min mask = np.all( (self.positions >= domain_min) & (self.positions <= domain_max), axis=1, ) pos_temp = self.positions[mask] - domain_min weights_temp = self.weights[mask] fields_temp = fields[mask] # if plane_projection is set, collect relevant axes if plane_projection: # cast to array and ensure it's 1D plane_projection = np.atleast_1d(plane_projection).astype(int) # assert that given axes are all different assert ( plane_projection[0] != plane_projection[1] ), "Plane projection axes must be unique, but found duplicates in {plane_projection}" # assert that given axes are valid indices for the position dimension assert np.all(plane_projection < self.dim) and np.all( plane_projection >= 0 ), f"Plane projection axes must be between 0 and {self.dim - 1}, but found {plane_projection}" if self.dim != 3: raise ValueError( f"Plane projection requires 3D positions, but positions are {self.dim}d" ) # fancy indexing does not work here -> use np.take to select and stack relevant axes pos_temp = np.take(pos_temp, plane_projection, axis=1) d_lens = np.take(d_lens, plane_projection).astype(np.float32) # resolve gridnums and ensure it matches deposition dimension dep_dim = pos_temp.shape[1] gn = np.atleast_1d(gridnums).astype(np.int32) if gn.size == 1: gn = np.repeat(gn, dep_dim) if gn.size != dep_dim: raise ValueError( f"Length of 'gridnums' ({gn.size}) must match deposition dimension ({dep_dim})" ) h, h_vals, h_vecs = self._prepare_deposition_smoothing( method, adaptive, pos_temp.shape[0], gn, d_lens, mask, dep_dim, plane_projection, ) # Backend execution threads = 0 if omp_threads is None else int(omp_threads) func_name = f"{method}_{dep_dim}d" if self.verbose: print( f"[smudgy] Using {'python' if use_python else 'c++'} backend for {method} deposition ({func_name})" ) fields_grid, weights_grid = _call_backend( func_name=func_name, use_python=use_python, use_openmp=use_openmp, omp_threads=threads, positions=pos_temp, quantities=fields_temp, particle_weights=weights_temp, smoothing_lengths=h, h_vals=h_vals, h_vecs=h_vecs, boxsizes=d_lens, gridnums=gn, periodic=periodic, kernel_name=kn_res, integration_method=integration_method, num_kernel_evaluations_per_axis=num_kernel_evaluations_per_axis, eta_crit=eta_crit, ) # Post-processing for i, avg in enumerate(averaged): if i < fields_grid.shape[-1] and avg: fields_grid[..., i] /= weights_grid + 1e-10 return (fields_grid, weights_grid) if return_weights else fields_grid