Source code for optframework.dpbe.dpbe_post

# -*- coding: utf-8 -*-
"""
Created on Thu Jul 18 11:11:33 2024

@author: Administrator
"""
import numpy as np
import math
from optframework.utils.func.func_math import float_in_list, float_equal, isZero

[docs]class DPBEPost: def __init__(self, base): self.base = base
[docs] def init_post_params(self): pass
[docs] def calc_v_uni(self, unit_trans=True): """ Calculate unique volume values for a given DPBESolver. """ if unit_trans: ## Converting cubic meters to cubic micrometers unit_scale = 1e18 else: unit_scale = 1 return np.setdiff1d(self.base.V, [-1])*unit_scale
[docs] def calc_x_uni(self, unit_trans=True): """ Calculate unique particle diameters from volume values for a given DPBESolver. """ v_uni = self.calc_v_uni(unit_trans) # Because the length unit in the experimental data is millimeters # and in the simulation it is meters, so it needs to be converted # before use. # x_uni = np.zeros(len(v_uni)) x_uni = (6*v_uni/np.pi)**(1/3) return x_uni
[docs] def calc_Qx(self, x_uni, qx=None, sum_uni=None): """ Calculate the cumulative distribution Qx from qx or sum_uni distribution data. """ Qx = np.zeros_like(qx) if qx is not None else np.zeros_like(sum_uni) if qx is None: Qx = np.cumsum(sum_uni)/sum_uni.sum() else: for i in range(1, len(Qx)): # Qx[i] = np.trapz(qx[:i+1], x_uni[:i+1]) ## Euler backward Qx[i] = Qx[i-1] + qx[i] * (x_uni[i] - x_uni[i-1]) return Qx
[docs] def calc_sum_uni(self, Qx, sum_total): """ Calculate the sum_uni distribution from the Qx cumulative distribution and total sum. """ sum_uni = np.zeros_like(Qx) # sum_uni[0] = sum_total * Qx[0] for i in range(1, len(Qx)): sum_uni[i] = sum_total * max((Qx[i] -Qx[i-1] ), 0) return sum_uni
[docs] def calc_qx(self, Qx, x_uni): """ Calculate the qx distribution from the Qx cumulative distribution. """ qx = np.zeros_like(Qx) qx[1:] = np.diff(Qx) / np.diff(x_uni) return qx
[docs] def re_calc_distribution(self, x_uni, qx=None, Qx=None, sum_uni=None, flag='all'): """ Recalculate distribution metrics for a given DPBESolver and distribution data. Can operate on either qx or sum_uni distribution data to calculate Qx, qx, and particle diameters corresponding to specific percentiles (x_10, x_50, x_90). Parameters ---------- x_uni : `array` Unique particle diameters. qx : `array`, optional qx distribution data. sum_uni : `array`, optional sum_uni distribution data. flag : `str`, optional Specifies which metrics to return. Defaults to 'all', can be a comma-separated list of 'qx', 'Qx', 'x_10', 'x_50', 'x_90'. Returns ------- `tuple` Selected distribution metrics based on the `flag`. Can include qx, Qx, x_10, x_50, and x_90 values. """ if qx is not None: qx_new = qx Qx_new = np.apply_along_axis(lambda qx_slice: self.calc_Qx(x_uni, qx=qx_slice), 0, qx) else: if Qx is None: Qx_new = np.apply_along_axis(lambda sum_uni_slice: self.calc_Qx(x_uni, sum_uni=sum_uni_slice), 0, sum_uni) else: Qx_new = Qx qx_new = np.apply_along_axis(lambda Qx_slice: self.calc_qx(Qx_slice, x_uni), 0, Qx_new) x_weibull, y_weibull = np.apply_along_axis(lambda Qx_slice: self.calc_weibull(Qx_slice, x_uni), 0, Qx_new) dim = qx_new.shape[1] x_10_new = np.zeros(dim) x_50_new = np.zeros(dim) x_90_new = np.zeros(dim) for idx in range(dim): x_10_new[idx] = np.interp(0.1, Qx_new[:, idx], x_uni) x_50_new[idx] = np.interp(0.5, Qx_new[:, idx], x_uni) x_90_new[idx] = np.interp(0.9, Qx_new[:, idx], x_uni) outputs = { 'qx': qx_new, 'Qx': Qx_new, 'x_10': x_10_new, 'x_50': x_50_new, 'x_90': x_90_new, 'x_weibull': x_weibull, 'y_weibull': y_weibull, } if flag == 'all': return outputs.values() else: flags = flag.split(',') return tuple(outputs[f.strip()] for f in flags if f.strip() in outputs)
[docs] def return_distribution(self, comp='all', t=0, N=None, flag='all', rel_q=False, q_type='q3'): """ Returns the particle size distribution (PSD) on a fixed grid. This function computes the PSD based on a chosen quantity: - 'q0': Number-based PSD (weight = N, i.e., V^0 × N) - 'q3': Volume-based PSD (weight = V * N, i.e., V^1 × N) - 'q6': Square-volume PSD (weight = V^2 * N) The returned values include: - 'x_uni': Unique particle diameters (in µm) - 'qx': The density distribution corresponding to the chosen q_type - 'Qx': Cumulative distribution (0–1) - 'x_10', 'x_50', 'x_90': Particle diameters corresponding to 10%, 50%, and 90% cumulative distribution, respectively (in µm) - 'sum_uni': The cumulative weight at each particle size class (number, volume, or squared-volume) Note on unit conversion: - For q3, original volumes (in m³) are converted to µm³ by multiplying by 1e18. - For q6, squared volumes (in m⁶) are converted to µm⁶ by multiplying by 1e36. - For q0, no conversion is necessary. Parameters ---------- comp : str, optional Which particles are counted. Currently, only 'all' is implemented. t : int, optional Time step index to return. N : array_like, optional The particle number distribution. If not provided, the class instance self.N is used. flag : str, optional Specifies which data to return. Options include: - 'x_uni': Unique particle diameters - 'qx': Density distribution (according to q_type) - 'Qx': Cumulative distribution - 'x_10': Particle diameter for 10% cumulative distribution - 'x_50': Particle diameter for 50% cumulative distribution - 'x_90': Particle diameter for 90% cumulative distribution - 'sum_uni': Cumulative weight in each particle size class - 'all': Returns all of the above data. rel_q : bool, optional If True, the density distribution qx will be normalized by its maximum value. q_type : str, optional The type of distribution to compute. Should be one of: - 'q0' for number-based distribution, - 'q3' for volume-based distribution, - 'q6' for square-volume distribution. The default is 'q3'. Returns ------- A tuple containing the requested arrays. If flag == 'all' then the order of return is: (x_uni, qx, Qx, x_10, x_50, x_90, sum_uni) Otherwise, only the items corresponding to the keys specified in flag (comma-separated) are returned. """ base = self.base def unique_with_tolerance(V, tol=1e-3): # Sort the flattened array and remove duplicate values within a tolerance, # using np.isclose to compare floating point numbers. V_sorted = np.sort(V.flatten()) V_unique = [V_sorted[0]] for V_val in V_sorted[1:]: if not np.isclose(V_val, V_unique[-1], atol=tol * V_sorted[0], rtol=0): V_unique.append(V_val) return np.array(V_unique) # If no N is provided, use the one from the class instance. if N is None: N = base.N # Extract unique V values from base.V (ignoring boundary value -1) if base.disc == 'geo': v_uni = np.setdiff1d(base.V.flatten(), np.array([-1.0])) else: v_uni = unique_with_tolerance(base.V) num_classes = len(v_uni) qx = np.zeros(num_classes) x_uni = np.zeros(num_classes) sum_uni = np.zeros(num_classes) ATOL = v_uni[1] * 1e-3 # Determine the exponent for V based on q_type. # q0: exponent 0; q3: exponent 1; q6: exponent 2. if q_type not in ['q0', 'q3', 'q6']: raise ValueError("Unsupported q_type. Options are 'q0', 'q3', or 'q6'.") exp = {'q0': 0, 'q3': 1, 'q6': 2}[q_type] # Precompute V_power = base.V**exp so that each weight is given by V_power * N. V_power = base.V ** exp # Loop through all grid cells and accumulate weight if comp == 'all': if base.dim == 1: for i in range(base.NS): # For each cell in 1D, weight = V_power[i] * N[i,t] weight = V_power[i] * N[i, t] # Find the index in unique v_uni (based on base.V) idx = np.where(np.isclose(v_uni, base.V[i], atol=ATOL))[0] if idx.size > 0: sum_uni[idx[0]] += weight elif base.dim == 2: for i in range(base.NS): for j in range(base.NS): weight = V_power[i, j] * N[i, j, t] idx = np.where(np.isclose(v_uni, base.V[i, j], atol=ATOL))[0] if idx.size > 0: sum_uni[idx[0]] += weight elif base.dim == 3: for i in range(base.NS): for j in range(base.NS): for k in range(base.NS): weight = V_power[i, j, k] * N[i, j, k, t] idx = np.where(np.isclose(v_uni, base.V[i, j, k], atol=ATOL))[0] if idx.size > 0: sum_uni[idx[0]] += weight else: raise ValueError("Unsupported dimension: {}".format(base.dim)) else: print('Case for comp not coded yet. Exiting') return # Unit conversion: For q3 and q6, convert V units accordingly. if q_type == 'q3': # Prevent extremely small (or negative) values, assign a small positive value based on v_uni[1] if num_classes > 1: sum_uni[sum_uni < 0] = v_uni[1] * 1e-3 sum_uni *= 1e18 # Convert from m³ to µm³. elif q_type == 'q6': if num_classes > 1: sum_uni[sum_uni < 0] = (v_uni[1] ** 2) * 1e-3 sum_uni *= 1e36 # Convert from m⁶ to µm⁶. # For q0, no conversion is needed. sum_uni[0] = 0.0 total_weight = np.sum(sum_uni) Qx = np.cumsum(sum_uni) / total_weight if total_weight != 0 else np.zeros(num_classes) # Calculate the diameter array based on particle volume. # Using the spherical volume formula: d = (6V/π)^(1/3), converting meters to micrometers. x_uni[0] = 0 if num_classes > 1: x_uni[1:] = (6 * v_uni[1:] / np.pi) ** (1/3) * 1e6 # Compute the density distribution from the cumulative distribution. if num_classes > 1: qx[1:] = np.diff(Qx) / np.diff(x_uni) # Obtain the x10, x50, and x90 values by interpolation. x_10 = np.interp(0.1, Qx, x_uni) x_50 = np.interp(0.5, Qx, x_uni) x_90 = np.interp(0.9, Qx, x_uni) x_weibull, y_weibull = self.calc_weibull(Qx, x_uni) if rel_q: max_qx = np.max(qx) if max_qx != 0: qx = qx / max_qx outputs = { 'x_uni': x_uni, 'qx': qx, 'Qx': Qx, 'x_10': x_10, 'x_50': x_50, 'x_90': x_90, 'sum_uni': sum_uni, 'x_weibull': x_weibull, 'y_weibull': y_weibull } if flag == 'all': return outputs.values() else: flags = flag.split(',') return tuple(outputs[f.strip()] for f in flags if f.strip() in outputs)
## Return total number. For t=None return full array, else return total number at time index t
[docs] def return_N_t(self,t=None): base = self.base # 1-D case if base.dim == 1: base.N[0,:] = 0.0 if t is None: return np.sum(base.N,axis=0) else: return np.sum(base.N[:,t]) # 2-D case elif base.dim == 2: base.N[0,0,:] = 0.0 if t is None: return np.sum(base.N,axis=(0,1)) else: return np.sum(base.N[:,:,t]) # 3-D case elif base.dim == 3: base.N[0,0,0,:] = 0.0 if t is None: return np.sum(base.N,axis=(0,1,2)) else: return np.sum(base.N[:,:,:,t])
# Calculate distribution moments related to particle volume mu(i,j,t)
[docs] def calc_mom_t(self): base = self.base mu = np.zeros((3,3,len(base.t_vec))) # Time loop for t in range(len(base.t_vec)): for i in range(3): if base.dim == 1: base.N[0,:] = 0.0 mu[i,0,t] = np.sum(base.V**i*base.N[:,t]) # The following only applies for 2D and 3D case # Moment between component 1 and 3 else: for j in range(3): if base.dim == 2: base.N[0,0,:] = 0.0 mu[i,j,t] = np.sum((base.X1_vol*base.V)**i*(base.X3_vol*base.V)**j*base.N[:,:,t]) if base.dim == 3: base.N[0,0,0,:] = 0.0 mu[i,j,t] = np.sum((base.X1_vol*base.V)**i*(base.X3_vol*base.V)**j*base.N[:,:,:,t]) return mu
# Calculate distribution moments related to particle radii mu_r(i,j,t)
[docs] def calc_mom_r_t(self): base = self.base mu_r = np.zeros((3,3,len(base.t_vec))) # Time loop for t in range(len(base.t_vec)): for i in range(3): if base.dim == 1: base.N[0,:] = 0.0 mu_r[i,0,t] = np.sum(base.R**i*base.N[:,t]) # The following only applies for 2D and 3D case # Moment between component 1 and 3 else: R1 = (base.X1_vol*base.V(4*math.pi))**(1/3) R3 = (base.X3_vol*base.V(4*math.pi))**(1/3) for j in range(3): if base.dim == 2: base.N[0,0,:] = 0.0 mu_r[i,j,t] = np.sum((R1)**i*(R3)**j*base.N[:,:,t]) if base.dim == 3: base.N[0,0,0,:] = 0.0 mu_r[i,j,t] = np.sum((R1)**i*(R3)**j*base.N[:,:,:,t]) return mu_r
## Save Variables to file:
[docs] def save_vars(self,file): np.save(file,vars(self.base))
## Caculate Weibull distribution
[docs] def calc_weibull(self, Q=None, x=None, ATOL=1e-10): x_ = y_ = None if Q is not None: y_ = np.zeros_like(Q) valid_Q = (Q > ATOL) & (Q < 1 - ATOL) y_[valid_Q] = np.log(-np.log(1 - Q[valid_Q])) if x is not None: x_ = np.zeros_like(x) valid_x = x > 0 x_[valid_x] = np.log(x[valid_x]) if x is not None and Q is not None: valid = (x > 0) & (Q > ATOL) & (Q < 1 - ATOL) x_ = np.zeros_like(x) y_ = np.zeros_like(Q) x_[valid] = np.log(x[valid]) y_[valid] = np.log(-np.log(1 - Q[valid])) return x_, y_