Source code for optframework.pbm.pbm_core

# -*- coding: utf-8 -*-
"""
Created on Fri Jan 10 15:13:27 2025

"""

import numpy as np
import scipy.integrate as integrate
import optframework.utils.func.jit_pbm_rhs as jit_pbm_rhs
from optframework.utils.func.static_method import interpolate_psd

[docs]class PBMCore: """ Core computation module for Population Balance Model (PBM) solver. This class handles the core computational tasks for the PBM solver, including moment initialization and PBE solving. It provides methods for setting up initial conditions and integrating the moment equations over time. """ def __init__(self, solver): """Initialize PBMCore with reference to parent solver.""" self.solver = solver
[docs] def init_moments(self, x=None, NDF=None, NDF_shape="normal", N0=1.0, N01=1.0, N02=1.0, V0=None, x_range=(0,1), mean=0.5, std_dev=0.1, shape=2, scale=1, sigma=1, a=2, b=2, size=0.5): """ Initialize moments for the PBM solver (supports both 1D and 2D). Parameters ---------- x : numpy.ndarray, optional x-coordinates for the distribution function (1D only) NDF : numpy.ndarray, optional (Normalized) Distribution function (1D only) NDF_shape : str, optional Shape of the distribution ("normal", "gamma", "lognormal", "beta", "mono") (default: "normal") N0 : float, optional Initial total number concentration (1D only) (default: 1.0) N01 : float, optional Initial number concentration for component 1 (2D only) (default: 1.0) N02 : float, optional Initial number concentration for component 2 (2D only) (default: 1.0) V0 : float, optional Total volume of particles (1D only) x_range : tuple, optional Range of x values (default: (0,1)) mean : float, optional Mean value for normal/lognormal distribution (default: 0.5) std_dev : float, optional Standard deviation for normal distribution (default: 0.1) shape : float, optional Shape parameter for gamma distribution (default: 2) scale : float, optional Scale parameter for gamma distribution (default: 1) sigma : float, optional Standard deviation for lognormal distribution (default: 1) a : float, optional Alpha parameter for beta distribution (default: 2) b : float, optional Beta parameter for beta distribution (default: 2) size : float, optional Size parameter for mono distribution (default: 0.5) """ solver = self.solver if solver.dim == 1: self._init_moments_1d(x, NDF, NDF_shape, N0, V0, x_range, mean, std_dev, shape, scale, sigma, a, b, size) elif solver.dim == 2: self._init_moments_2d(N01, N02) else: raise ValueError(f"Unsupported dimension: {solver.dim}. Only 1D and 2D are supported.")
def _init_moments_1d(self, x, NDF, NDF_shape, N0, V0, x_range, mean, std_dev, shape, scale, sigma, a, b, size): """ Initialize 1D moments for the PBM solver. Creates distribution function and calculates initial moments for 1D systems. Handles both PSD file input and analytical distributions. """ solver = self.solver # Generate NDF if not provided if x is None or NDF is None: if solver.USE_PSD: x = np.linspace(0.0, x_range[1], 10000) if np.any(x < 0): raise ValueError("Error: Volume (x) cannot be negative!") d = np.where(x > 0, (6 * x / np.pi) ** (1/3), 0) if V0 is None: raise ValueError("Total volume of particle must be given to get PSD") NDF = np.zeros_like(d) NDF[1:] = interpolate_psd(d[1:], solver.DIST1, V0) NDF /= x[1] - x[0] else: x, NDF = self._create_distribution(NDF_shape, x_range, mean, std_dev, shape, scale, sigma, a, b, size) # Normalize the distribution moment0 = np.trapz(NDF, x) NDF /= moment0 # Set maximum x value and initialize moments array solver.x_max = 1.0 solver.moments = np.zeros((solver.n_order * 2, solver.t_num)) # Scale NDF by initial concentration and calculate moments NDF *= N0 solver.moments[:, 0] = np.array([np.trapz(NDF * (x ** k), x) for k in range(2 * solver.n_order)]) * solver.V_unit # Normalize moments and set tolerance solver.normalize_mom() solver.set_tol(solver.moments_norm[:, 0]) def _init_moments_2d(self, N01, N02): """ Initialize 2D moments for the PBM solver. Creates distribution functions for both components and calculates initial 2D moments using trapezoidal integration. """ solver = self.solver # Create normalized distribution functions for both components x1, NDF1 = solver.create_ndf(distribution="normal", x_range=(1e-2, 1e-1), mean=6e-2, std_dev=2e-2) x2, NDF2 = solver.create_ndf(distribution="normal", x_range=(1e-2, 1e-1), mean=6e-2, std_dev=2e-2) # Scale by initial concentrations NDF1 *= N01 NDF2 *= N02 # Generate 2D moment indices and initialize moments array solver.moment_2d_indices_c() mu_num = len(solver.indices) solver.moments = np.zeros((mu_num, solver.t_num)) # Calculate initial moments using 2D integration for idx in range(mu_num): k, l = solver.indices[idx, 0], solver.indices[idx, 1] solver.moments[idx, 0] = solver.trapz_2d(NDF1, NDF2, x1, x2, k, l) * solver.V_unit # Set tolerance solver.set_tol(solver.moments[:, 0]) def _create_distribution(self, NDF_shape, x_range, mean, std_dev, shape, scale, sigma, a, b, size): """ Create distribution based on shape parameter. Helper method that delegates to solver's create_ndf method with appropriate parameters for different distribution types. Returns ------- tuple (x, NDF) coordinate array and distribution values """ solver = self.solver if NDF_shape == "normal": return solver.create_ndf(distribution="normal", x_range=x_range, mean=mean, std_dev=std_dev) elif NDF_shape == "gamma": return solver.create_ndf(distribution="gamma", x_range=x_range, shape=shape, scale=scale) elif NDF_shape == "lognormal": return solver.create_ndf(distribution="lognormal", x_range=x_range, mean=mean, sigma=sigma) elif NDF_shape == "beta": return solver.create_ndf(distribution="beta", x_range=x_range, a=a, b=b) elif NDF_shape == "mono": return solver.create_ndf(distribution="mono", x_range=x_range, size=size) else: raise ValueError(f"Unsupported distribution shape: {NDF_shape}")
[docs] def init_moments_2d(self, N01=1.0, N02=1.0): """ Initialize the 2D moments for the PBM solver. .. deprecated:: This method is deprecated. Use `init_moments()` instead, which automatically handles both 1D and 2D cases based on solver.dim. Parameters ---------- N01 : float, optional Initial number concentration for component 1 (default: 1.0) N02 : float, optional Initial number concentration for component 2 (default: 1.0) """ import warnings warnings.warn("init_moments_2d() is deprecated. Use init_moments() instead.", DeprecationWarning, stacklevel=2) self.init_moments(N01=N01, N02=N02)
[docs] def solve_PBM(self, t_vec=None): """ Solve the Population Balance Model using moment equations. Integrates the moment ODEs over time using scipy.integrate.solve_ivp with RK45 method. Handles both 1D and 2D systems with appropriate right-hand-side functions. Parameters ---------- t_vec : numpy.ndarray, optional Time vector for the simulation. If None, uses solver.t_vec """ solver = self.solver if t_vec is None: t_vec = solver.t_vec t_max = solver.t_vec[-1] else: t_max = t_vec[-1] if solver.dim == 1: solver.alpha_prim = solver.alpha_prim.item() if isinstance(solver.alpha_prim, np.ndarray) else solver.alpha_prim rhs = jit_pbm_rhs.get_dMdt_1d args = (solver.x_max, solver.GQMOM, solver.GQMOM_method, solver.moments_norm_factor, solver.n_add, solver.nu, solver.COLEVAL, solver.CORR_BETA, solver.G, solver.alpha_prim, solver.SIZEEVAL, solver.V_unit, solver.X_SEL, solver.Y_SEL, solver.pl_P1, solver.pl_P2, solver.BREAKRVAL, solver.pl_v, solver.pl_q, solver.BREAKFVAL, solver.process_type) with np.errstate(divide='raise', over='raise',invalid='raise'): try: solver.RES = integrate.solve_ivp(rhs, [0, t_max], solver.moments_norm[:,0], t_eval=t_vec, args=args, method='RK45',first_step=None, atol=solver.atolarray, rtol=solver.rtolarray) t_vec = solver.RES.t y_evaluated = solver.RES.y status = True if solver.RES.status == 0 else False except (FloatingPointError, ValueError) as e: print(f"Exception encountered: {e}") y_evaluated = -np.ones((2*solver.n_order,len(t_vec))) status = False if solver.dim == 2: rhs = jit_pbm_rhs.get_dMdt_2d args = (solver.n_order, solver.indices, solver.COLEVAL, solver.CORR_BETA, solver.G, solver.alpha_prim, solver.SIZEEVAL, solver.V_unit, solver.X_SEL, solver.Y_SEL, solver.pl_P1, solver.pl_P2, solver.pl_P3, solver.pl_P4, solver.BREAKRVAL, solver.pl_v, solver.pl_q, solver.BREAKFVAL, solver.process_type) with np.errstate(divide='raise', over='raise',invalid='raise'): try: solver.RES = integrate.solve_ivp(rhs, [0, t_max], solver.moments[:,0], t_eval=t_vec, args=args, method='RK45',first_step=None,max_step=np.inf, atol=solver.atolarray, rtol=solver.rtolarray) t_vec = solver.RES.t y_evaluated = solver.RES.y status = True if solver.RES.status == 0 else False except (FloatingPointError, ValueError) as e: print(f"Exception encountered: {e}") y_evaluated = -np.ones((2*solver.n_order,len(t_vec))) status = False solver.t_vec = t_vec if hasattr(solver, "moments_norm_factor") and solver.moments_norm_factor is not None: solver.moments = y_evaluated * solver.moments_norm_factor[:, np.newaxis] / solver.V_unit else: solver.moments = y_evaluated / solver.V_unit solver.calc_status = status if not solver.calc_status: print('Warning: The integral failed to converge!')