# -*- coding: utf-8 -*-
"""
PBE-related calculations during optimization
"""
# import os
import gc
import numpy as np
# from scipy.interpolate import interp1d
# from optframework.dpbe import DPBESolver
from optframework.base.adapters_api import make_solver
# from optframework.dpbe.dpbe_post import DPBEPost
from optframework.utils.func.print import print_highlighted
[docs]class OptPBE():
def __init__(self, base):
self.base = base
[docs] def close_pbe(self):
base = self.base
base.p.close()
del base.p
if base.dim == 2:
base.p_NM.close()
base.p_M.close()
del base.p_NM
del base.p_M
gc.collect()
[docs] def calc_pop(self, pop, params=None, t_vec=None, init_N=None):
"""
Configure and calculate the PBE.
If `calc_init_N` is set to False, the full initialization is performed
without calculating alpha. Otherwise, it calculates various terms such
as F_M, B_R, and int_B_F before solving the PBE.
Parameters
----------
pop : object
The population instance for which the PBE will be calculated.
params : dict, optional
The population parameters. If not provided, it uses the existing parameters of the population.
t_vec : array-like, optional
The time vector for which the PBE will be solved. If not provided, the default time vector is used.
Returns
-------
None
"""
self.set_pop_para(pop, params)
pop.calc_matrix(init_N)
pop.solve(t_vec)
[docs] def set_init_pop_para(self,pop_params):
"""
Initialize population parameters for all PBEs.
This method sets the population parameters for the main population (`p`)
as well as for the auxiliary populations (`p_NM` and `p_M`, if they exist).
Parameters
----------
pop_params : dict
The parameters to be applied to the populations.
Returns
-------
None
"""
self.set_pop_para(self.base.p, pop_params)
if hasattr(self, 'p_NM'):
self.set_pop_para(self.base.p_NM, pop_params)
if hasattr(self, 'p_M'):
self.set_pop_para(self.base.p_M, pop_params)
self.base.set_init_pop_para_flag = True
[docs] def set_pop_para(self, pop, params_in):
"""
Set the population parameters for a given population instance.
This method configures the population attributes based on the provided parameters.
It handles both 1D and 2D populations and adjusts specific parameters such as
`alpha_prim` and `CORR_BETA` depending on the dimensionality of the population.
Parameters
----------
pop : object
The population instance whose parameters are being set.
params_in : dict
The dictionary of population parameters to be applied.
Returns
-------
None
"""
base = self.base
params = params_in.copy()
if params is None:
return
# Set population attributes based on the parameters
params = base.check_corr_agg(params)
self.set_pop_attributes(pop, params)
# if 'corr_agg' in params:
# print_highlighted("Debug in opt_pbe.py, corr_agg still exits in params!", title="DEBUG", color="yellow")
# else:
# print_highlighted("Debug in opt_pbe.py, corr_agg does not exit in params!", title="DEBUG", color="green")
# Handle alpha_prim separately for 1D populations
# if base.dim == 1:
# if 'alpha_prim' in params:
# # Set alpha_prim based on its dimensions
# if params['alpha_prim'].ndim != 0:
# pop.alpha_prim = params['alpha_prim'][0]
# else:
# pop.alpha_prim = params['alpha_prim']
# # Handle alpha_prim and corr_agg for 2D populations
# elif base.dim == 2:
# if 'alpha_prim' in params:
# alpha_prim_value = params['alpha_prim']
# # Set alpha_prim for main, NM, and M populations
# if pop is base.p:
# alpha_prim_temp = np.zeros(4)
# alpha_prim_temp[0] = alpha_prim_value[0]
# alpha_prim_temp[1] = alpha_prim_temp[2] = alpha_prim_value[1]
# alpha_prim_temp[3] = alpha_prim_value[2]
# pop.alpha_prim = alpha_prim_temp
# elif pop is base.p_NM:
# pop.alpha_prim = alpha_prim_value[0]
# elif pop is base.p_M:
# pop.alpha_prim = alpha_prim_value[2]
# if 'pl_P3' in params and 'pl_P4' in params:
# if pop is base.p_M:
# pop.pl_P1 = params['pl_P3']
# pop.pl_P2 = params['pl_P4']
[docs] def set_pop_attributes(self, pop, params):
"""
Set attributes for a population instance from the provided parameters.
Parameters
----------
pop : object
The population instance whose attributes are being set.
params : dict
A dictionary containing the population parameters. Each key-value pair
corresponds to an attribute name and its value.
Returns
-------
None
"""
for key, value in params.items():
setattr(pop, key, value)
# def set_comp_para(self, data_path):
# """
# Set component parameters for two types of particles (labeled as NM and M).
# This method configures the particle size distribution (PSD) parameters for both particles.
# The PSD can either be loaded from specified file paths or
# default values can be used if the files are not available.
# Parameters
# ----------
# data_path : str
# The base path where PSD data files are located.
# Raises
# ------
# Exception
# If the PSD data file for NM or M particles is not found.
# Returns
# -------
# None
# """
# base = self.base
# # If PSD is used, load PSD data from the provided file paths
# if base.p.USE_PSD:
# DIST1_path = os.path.join(data_path, 'PSD_data')
# DIST3_path = os.path.join(data_path, 'PSD_data')
# dist_path_R01 = os.path.join(DIST1_path, base.p.DIST1_name)
# dist_path_R03 = os.path.join(DIST3_path, base.p.DIST3_name)
# # Raise exceptions if the PSD data files for NM or M particles are missing
# if not os.path.exists(dist_path_R01):
# raise Exception(f"initial PSD data in path: {dist_path_R01} not found!")
# if not os.path.exists(dist_path_R03):
# raise Exception(f"initial PSD data in path: {dist_path_R03} not found!")
# # Load PSD data for NM and M particles
# psd_dict_R01 = np.load(dist_path_R01,allow_pickle=True).item()
# psd_dict_R03 = np.load(dist_path_R03,allow_pickle=True).item()
# if base.USE_PSD_R:
# base.p.R01 = psd_dict_R01[base.R01_0] * base.R01_0_scl
# base.p.R03 = psd_dict_R03[base.R03_0] * base.R03_0_scl
# else:
# # If PSD is not used, set radii for NM and M particles manually
# base.p.R01 = base.R_01 * base.R01_0_scl
# base.p.R03 = base.R_03 * base.R03_0_scl
# if base.dim > 1:
# # parameter for particle component 1 - NM
# base.p_NM.R01 = base.p.R01
# base.p_NM.DIST1_path = DIST1_path
# # parameter for particle component 2 - M
# base.p_M.R01 = base.p.R03
# base.p_M.DIST1_path = DIST3_path
# base.p_M.DIST1_name = base.p_M.DIST3_name
# base.set_comp_para_flag = True
# def calc_all_R(self):
# """
# Calculate the radius for particles in all PBEs (Population Balance Equations).
# Returns
# -------
# None
# """
# self.base.p.core.calc_R()
# self.base.p_NM.core.calc_R()
# self.base.p_M.core.calc_R()
# def calc_init_from_data(self, exp_data_paths, init_flag):
# """
# Initialize the number concentration N for PBE(s) based on experimental data.
# This method initializes the N for both 1D and 2D PBE instances. For 2D PBE systems, the initialization
# assumes that the system initially contains only pure materials (i.e., no mixed particles have formed yet).
# As a result, the initialization of 2D PBE is effectively equivalent to performing two 1D initializations:
# one for the NM particles and one for the M particles.
# Parameters
# ----------
# sample_num : `int`
# The number of sets of experimental data used for initialization.
# exp_data_paths : `list of str`
# Paths to the experimental data for initialization.
# init_flag : `str`
# The method to use for initialization: 'int' for interpolation or 'mean' for averaging
# the initial sets.
# """
# base = self.base
# if base.dim ==1:
# base.p.calc_R()
# base.p.N = np.zeros((base.p.NS, len(base.p.t_vec)))
# base.init_N = self.set_init_N_1D(base.p, exp_data_paths, init_flag)
# elif base.dim == 2:
# self.calc_all_R()
# base.init_N_NM = self.set_init_N_1D(base.p_NM, exp_data_paths[1], init_flag)
# base.init_N_M = self.set_init_N_1D(base.p_M, exp_data_paths[2], init_flag)
# base.p.N = np.zeros((base.p.NS, base.p.NS, len(base.p.t_vec)))
# base.p_NM.N = np.zeros((base.p.NS, len(base.p.t_vec)))
# base.p_M.N = np.zeros((base.p.NS, len(base.p.t_vec)))
# # Set the number concentration for NM and M populations at the initial time step
# # This assumes the system initially contains only pure materials, so no mixed particles exist
# base.p.N[1:, 1, 0] = base.p_NM.N[1:, 0]
# base.p.N[1, 1:, 0] = base.p_M.N[1:, 0]
# base.init_N_2D = base.p.N.copy()
# def set_init_N_1D(self, pop, exp_data_path, init_flag):
# """
# Initialize the number concentration N for 1D PBE based on experimental data.
# This method initializes the number concentration (N) for 1D PBE using experimental data.
# It supports two initialization methods: interpolation of the initial time points ('int') or
# averaging the initial data sets ('mean').
# Parameters
# ----------
# pop : object
# The population instance (PBE solver) for which the number concentration is being initialized.
# exp_data_path : str
# Path to the experimental data file for initialization.
# init_flag : str
# The initialization method. Options are:
# - 'int': Use interpolation for initialization.
# - 'mean': Use the mean of the initial data sets for initialization.
# Returns
# -------
# None
# """
# base = self.base
# x_uni = pop.post.calc_x_uni()
# if not base.exp_data:
# # If only one sample exists, initialize N based on the first few time points
# if base.sample_num == 1:
# # Exclude the zero point and extrapolate the initial conditions
# x_uni_exp, sumN_uni_init_sets = base.opt_data.read_exp(exp_data_path, base.t_init[1:])
# else:
# # For multiple samples, average the initial data values
# exp_data_path=base.opt_data.traverse_path(0, exp_data_path)
# x_uni_exp, sumN_uni_tem = base.opt_data.read_exp(exp_data_path, base.t_init[1:])
# sumN_uni_all_samples = np.zeros((len(x_uni_exp), len(base.t_init[1:]), base.sample_num))
# sumN_uni_all_samples[:, :, 0] = sumN_uni_tem
# # Loop through remaining samples and average the data sets
# for i in range(1, base.sample_num):
# exp_data_path=base.opt_data.traverse_path(i, exp_data_path)
# _, sumN_uni_tem = base.opt_data.read_exp(exp_data_path, base.t_init[1:])
# sumN_uni_all_samples[:, :, i] = sumN_uni_tem
# sumN_uni_init_sets = sumN_uni_all_samples.mean(axis=2)
# sumN_uni_init = np.zeros(len(x_uni))
# # Initialize based on interpolation of the time points
# if init_flag == 'int':
# for idx in range(len(x_uni_exp)):
# interp_time = interp1d(base.t_init[1:], sumN_uni_init_sets[idx, :], kind='linear', fill_value="extrapolate")
# sumN_uni_init[idx] = interp_time(0.0)
# # Initialize based on the mean of the initial data sets
# elif init_flag == 'mean':
# sumN_uni_init = sumN_uni_init_sets.mean(axis=1)
# # Interpolate the experimental data onto the dPBE grid
# inter_grid = interp1d(x_uni_exp, sumN_uni_init, kind='linear', fill_value="extrapolate")
# sumN_uni_init = inter_grid(x_uni)
# else:
# Q3_init_exp, x_uni_exp = base.opt_data.read_exp(exp_data_path, base.t_init[1:])
# inter_grid = interp1d(x_uni_exp.flatten(), Q3_init_exp, kind='linear', fill_value="extrapolate")
# Q3_init_mod = inter_grid(x_uni)
# Q3_init_mod[np.where(Q3_init_mod < 0)] = 0.0
# Q3_init_mod = Q3_init_mod / Q3_init_mod.max()
# sumN_uni_init = np.zeros(x_uni.shape)
# sumN_uni_init[1:] = np.diff(Q3_init_mod) * base.p.V01 *1e18 / (np.pi * x_uni[1:]**3 /6)
# N_init = np.zeros((pop.NS, len(pop.t_vec)))
# N_init[:, 0]= sumN_uni_init
# # Set very small N values to zero
# thr = 1e-5
# N_init[N_init < (thr * N_init[1:, 0].max())]=0
# return N_init