Source code for optframework.base.dpbe_adapter

# -*- coding: utf-8 -*-
"""
Created on Tue Aug 26 12:54:06 2025

@author: px2030
"""
import os
from typing import Any, Literal
import numpy as np
import pandas as pd
from scipy.interpolate import interp1d
from .adapters_api_basics import WriteThroughAdapter
from optframework.dpbe.dpbe_base import DPBESolver
from optframework.utils.func.func_read_exp import write_read_exp

[docs]class DPBEAdapter(WriteThroughAdapter): """ Adapter for DPBESolver with role-aware handling. Roles: - "main": 2D main solver; alpha_prim [a0, a1, a2] -> [a0, a1, a1, a2] - "NM" : 1D auxiliary solver; alpha_prim takes a0 - "M" : 1D auxiliary solver; alpha_prim takes a2 For dim==2: * pl_P3 -> impl.pl_P1 (written immediately) * pl_P4 -> impl.pl_P2 (written immediately) """ def __init__(self, *, opt, role: Literal["main","NM","M"]="main", **kw: Any): impl = DPBESolver(**kw) super().__init__(impl) # Optional: name mappings # self._map.update({ # "grid_x": "V1", # "grid_y": "V3", # }) # Optional: Adapter-only field (won't write-through) self._skip.update({"role", "opt"}) self.role = role self.opt = opt # Write-through attributes self.calc_status = True # ---------- alpha_prim ---------- def set_alpha_prim(impl, value): arr = np.asarray(value) dim = impl.dim if dim is None: raise ValueError("DPBEAdapter: 'dim' must be set on impl before alpha_prim.") r = self.role if dim == 1: if arr.ndim == 0: impl.alpha_prim = float(arr) else: if arr.size < 1: raise ValueError("alpha_prim must contain at least one value for dim=1.") impl.alpha_prim = float(arr.ravel()[0]) elif dim == 2: flat = arr.ravel() if r == "main": if flat.size == 3: a0, a1, a2 = map(float, flat.tolist()) impl.alpha_prim = np.array([a0, a1, a1, a2], dtype=float) elif flat.size == 4: impl.alpha_prim = np.array(flat, dtype=float).reshape(4,) else: raise ValueError( f"alpha_prim for main 2D should have length 3 (a0,a1,a2)" f" or 4 (a0,a1,a1,a2); got {flat.size}." ) elif r == "NM": if flat.size < 1: raise ValueError("alpha_prim must contain at least a0 for NM in 2D.") impl.alpha_prim = float(flat[0]) elif r == "M": if flat.size < 3: raise ValueError("alpha_prim must contain at least a2 (index 2) for M in 2D.") impl.alpha_prim = float(flat[2]) else: raise ValueError(f"Unknown role '{r}'.") else: raise ValueError(f"Unsupported dim={dim} for DPBEAdapter alpha_prim handling.") self._setters["alpha_prim"] = set_alpha_prim if self.role == "M" and impl.dim == 2: def set_pl_P3(impl, value): impl.pl_P1 = value def set_pl_P4(impl, value): impl.pl_P2 = value self._setters["pl_P3"] = set_pl_P3 self._setters["pl_P4"] = set_pl_P4 # %% ESSENTIAL METHOD INTERFACE
[docs] def set_comp_para(self, data_path: str) -> None: """ Wrap component-parameter setup and write-through into solvers (opt.p, opt.p_NM, opt.p_M). Parameters ---------- opt : object A coordinator object that holds optimizer-wide parameters and the solver instances: - opt.p : the main solver (may be a DPBEAdapter or a raw solver) - opt.p_NM : auxiliary solver for NM (dim==1) when opt.dim > 1 - opt.p_M : auxiliary solver for M (dim==1) when opt.dim > 1 - opt.dim : problem dimension (1 or 2) - opt.USE_PSD_R, opt.R01_0, opt.R03_0, opt.R01_0_scl, opt.R03_0_scl, etc. data_path : str opt path where PSD data folder resides (expects a subfolder 'PSD_data'). Returns ------- bool True on success. Also sets `opt.set_comp_para_flag = True`. """ opt = self.opt if self.role != "main": raise TypeError(f"set_comp_para is only available on main solver adapters, not role={self.role!r}") # Always define PSD root paths so later usage (dim>1) won't hit UnboundLocalError DIST1_path = os.path.join(data_path, "PSD_data") DIST3_path = os.path.join(data_path, "PSD_data") opt.p.DIST1_path = DIST1_path opt.p.DIST3_path = DIST3_path opt.p.DIST1_name = opt.DIST1_name opt.p.DIST3_name = opt.DIST3_name # Optionally load PSD dicts if the main solver (opt.p) uses PSD files psd_dict_R01 = None psd_dict_R03 = None if opt.p.USE_PSD: dist_path_R01 = os.path.join(DIST1_path, opt.DIST1_name) dist_path_R03 = os.path.join(DIST3_path, opt.DIST3_name) if not os.path.exists(dist_path_R01): raise Exception(f"initial PSD data in path: {dist_path_R01} not found!") if not os.path.exists(dist_path_R03): raise Exception(f"initial PSD data in path: {dist_path_R03} not found!") # Load PSD dicts (expects numpy .npy with dict payload) psd_dict_R01 = np.load(dist_path_R01, allow_pickle=True).item() psd_dict_R03 = np.load(dist_path_R03, allow_pickle=True).item() # Decide radii for the main solver (opt.p) if opt.USE_PSD_R: if psd_dict_R01 is None or psd_dict_R03 is None: raise Exception("USE_PSD_R is True but PSD dictionaries were not loaded. " "Ensure opt.p.USE_PSD is True and files exist.") opt.p.R01 = psd_dict_R01[opt.R01_0] * opt.R01_0_scl opt.p.R03 = psd_dict_R03[opt.R03_0] * opt.R03_0_scl else: # Manual radii opt.p.R01 = opt.R_01 * opt.R01_0_scl opt.p.R03 = opt.R_03 * opt.R03_0_scl # If 2D, also configure the auxiliary 1D solvers (NM and M) if opt.dim > 1: # Component 1 - NM opt.p_NM.R01 = opt.p.R01 opt.p_NM.DIST1_path = DIST1_path opt.p_NM.DIST1_name = opt.p.DIST1_name # Component 2 - M opt.p_M.R01 = opt.p.R03 opt.p_M.DIST1_path = DIST3_path # Mirror the original behavior: use DIST3_name as DIST1_name for M opt.p_M.DIST1_name = opt.p.DIST3_name opt.set_comp_para_flag = True
[docs] def reset_params(self) -> None: self.impl._reset_params()
[docs] def calc_init_from_data(self, exp_data_paths, init_flag) -> None: """ Initialize the number concentration N for PBE(s) based on experimental data. This method initializes the N for both 1D and 2D PBE instances. For 2D PBE systems, the initialization assumes that the system initially contains only pure materials (i.e., no mixed particles have formed yet). As a result, the initialization of 2D PBE is effectively equivalent to performing two 1D initializations: one for the NM particles and one for the M particles. Parameters ---------- sample_num : `int` The number of sets of experimental data used for initialization. exp_data_paths : `list of str` Paths to the experimental data for initialization. init_flag : `str` The method to use for initialization: 'int' for interpolation or 'mean' for averaging the initial sets. """ opt = self.opt if opt.dim ==1: opt.p.core.calc_R() opt.p.N = np.zeros((opt.p.NS, len(opt.p.t_vec))) opt.init_N = self._set_init_N_1D(opt, opt.p, exp_data_paths, init_flag) elif opt.dim == 2: opt.p.core.calc_R() opt.p_NM.core.calc_R() opt.p_M.core.calc_R() opt.init_N_NM = self._set_init_N_1D(opt, opt.p_NM, exp_data_paths[1], init_flag) opt.init_N_M = self._set_init_N_1D(opt, opt.p_M, exp_data_paths[2], init_flag) opt.p.N = np.zeros((opt.p.NS, opt.p.NS, len(opt.p.t_vec))) opt.p_NM.N = np.zeros((opt.p.NS, len(opt.p.t_vec))) opt.p_M.N = np.zeros((opt.p.NS, len(opt.p.t_vec))) # Set the number concentration for NM and M populations at the initial time step # This assumes the system initially contains only pure materials, so no mixed particles exist opt.p.N[1:, 1, 0] = opt.p_NM.N[1:, 0] opt.p.N[1, 1:, 0] = opt.p_M.N[1:, 0] opt.init_N_2D = opt.p.N.copy()
[docs] def calc_matrix(self, init_N) -> None: if not self.opt.calc_init_N: self.impl.core.full_init(calc_alpha=False) else: if init_N is None: raise Exception("initial N is not provided") self.impl.core.calc_R() self.impl.N = init_N self.impl.core.calc_F_M() self.impl.core.calc_B_R() self.impl.core.calc_int_B_F()
[docs] def solve(self, t_vec) -> None: self.impl.core.solve_PBE(t_vec=t_vec)
[docs] def get_all_data(self, exp_data_path) -> tuple[np.ndarray, np.ndarray]: if not self.opt.exp_data: x_uni, data_exp = self._get_all_synth_data(exp_data_path) else: x_uni, data_exp = self._get_all_exp_data(exp_data_path) return x_uni, data_exp
[docs] def calc_delta_pop(self, x_uni_exp, data_exp) -> float: """ Calculate the average differences (delta) between experimental and simulated PSDs. This method loops through the experimental data and computes the average difference (delta) between the experimental PSD and the simulated PSD. It supports both single and multiple sample sets and handles optional smoothing via KDE (Kernel Density Estimation). Parameters ---------- x_uni_exp : array-like The unique particle diameters in experimental data. data_exp : array-like The experimental PSD data that will be compared with the simulated PSD. pop : :class: An instance of the PBE solver, which generates the simulated PSD. Returns ------- float The average difference (delta) between the experimental PSD and the simulated PSD, normalized by the number of particle sizes in the experimental data (`x_uni_exp`). """ opt = self.opt # If smoothing is enabled, initialize a list to store KDE objects if opt.smoothing: kde_list = [] # Get the unique particle size in PBE x_uni = self.impl.post.calc_x_uni() # Loop through time steps to collect the simulation results and convert to PSD for idt in range(opt.delta_t_start_step, opt.num_t_steps): if opt.smoothing: sum_uni = self.impl.post.return_distribution(t=idt, flag='sum_uni', q_type=opt.dist_type)[0] # Volume of particles with index=0 is 0; in theory, such particles do not exist kde = opt.opt_data.KDE_fit(x_uni[1:], sum_uni[1:]) # The qx distribution measured by the Lumisizer is typically matched using the # average values of two measurement nodes, so the corresponding conversion has also been performed here. # x_uni_m = (x_uni[:-1]+x_uni[1:]) / 2 # kde = opt.KDE_fit(x_uni_m, sum_uni[1:]) kde_list.append(kde) delta_sum = 0 # Single sample case if opt.sample_num == 1: qx_mod = np.zeros((len(x_uni_exp), opt.num_t_steps-opt.delta_t_start_step)) for idt in range(opt.num_t_steps-opt.delta_t_start_step): if opt.smoothing: qx_mod_tem = opt.opt_data.KDE_score(kde_list[idt], x_uni_exp[1:]) qx_mod[1:, idt] = qx_mod_tem else: qx_mod[:, idt] = self.impl.post.return_distribution(t=idt+opt.delta_t_start_step, flag='qx', q_type=opt.dist_type)[0] Qx = self.impl.post.calc_Qx(x_uni_exp, qx_mod[:, idt]) qx_mod[:, idt] = qx_mod[:, idt] / Qx.max() # Calculate the delta for each cost function type, if is defined. for flag, cost_func_type in opt.delta_flag: data_mod = self.impl.post.re_calc_distribution(x_uni_exp, qx=qx_mod, flag=flag)[0] delta = opt.cost_fun(data_exp, data_mod, cost_func_type, flag) delta_sum += delta return delta # Multiple sample case else: for i in range (0, opt.sample_num): qx_mod = np.zeros((len(x_uni_exp[i]), opt.num_t_steps-opt.delta_t_start_step)) for idt in range(opt.num_t_steps-opt.delta_t_start_step): if opt.smoothing: qx_mod_tem = opt.opt_data.KDE_score(kde_list[idt], x_uni_exp[i][1:]) qx_mod[1:, idt] = qx_mod_tem else: qx_mod[:, idt] = self.impl.post.return_distribution(t=idt+opt.delta_t_start_step, flag='qx')[0] Qx = self.impl.post.calc_Qx(x_uni_exp[i], qx_mod[:, idt]) qx_mod[:, idt] = qx_mod[:, idt] / Qx.max() # Calculate delta for each cost function type, if is defined. for flag, cost_func_type in opt.delta_flag: data_mod = self.impl.post.re_calc_distribution(x_uni_exp[i], qx=qx_mod, flag=flag)[0] delta = opt.cost_fun(data_exp[i], data_mod, cost_func_type, flag) delta_sum += delta delta_sum /= opt.sample_num return delta_sum
[docs] def close(self) -> None: self.impl.core._close()
# %% OPTIONAL METHOD INTERFACE
[docs] def generate_data(self, data_path=None, multi_flag=False, pop_params=None, add_info=""): """ Generates synthetic data based on simulation results, with optional noise. This method generates synthetic data based on the population parameters and simulation results. If noise is enabled, it modifies the file name to reflect the noise type and strength. The data is saved to an Excel file. For multi-dimensional optimizations, separate files for different dimensions are created. Parameters ---------- pop_params : dict, optional Parameters for the population model. If not provided, uses `self.pop_params`. add_info : str, optional Additional information to append to the file name. Default is an empty string. Returns ------- None """ if self.role != "main": raise TypeError(f"set_comp_para is only available on main solver adapters, not role={self.role!r}") opt = self.opt # Modify the file name if noise is enabled if opt.add_noise: # Modify the file name to include noise type and strength filename = f"Sim_{opt.noise_type}_{opt.noise_strength}"+add_info+".xlsx" else: filename = "Sim"+add_info+".xlsx" # Construct the full path for the output data file exp_data_path = os.path.join(data_path, filename) # Check if multi_flag is False, indicating no auxiliary 1D data if not multi_flag: # Calculate the population data based on the given parameters opt.calc_pop(opt.p, pop_params, opt.t_all) # Only proceed if the calculation status is valid if opt.core.p.calc_status: for i in range(0, opt.sample_num): # Traverse the file path if multiple samples are generated if opt.sample_num != 1: exp_data_path=opt.traverse_path(i, exp_data_path) # print(opt.exp_data_path) ## Write new data to file self._write_new_data(opt.p, exp_data_path) else: return else: # For multi-dimensional data, create multiple file paths for different datasets exp_data_paths = [ exp_data_path, exp_data_path.replace(".xlsx", "_NM.xlsx"), exp_data_path.replace(".xlsx", "_M.xlsx") ] # Calculate the population data for all dimensions opt.calc_all_pop(pop_params, opt.t_all) if opt.p.calc_status and opt.p_NM.calc_status and opt.p_M.calc_status: for i in range(0, opt.sample_num): if opt.sample_num != 1: # Traverse the file paths for multiple samples exp_data_paths = opt.core.opt_data.traverse_path(i, exp_data_paths) # Write data for each dimension to separate files self._write_new_data(opt.p, exp_data_paths[0]) self._write_new_data(opt.p_NM, exp_data_paths[1]) self._write_new_data(opt.p_M, exp_data_paths[2]) else: return
# %% INTERNAL METHOD INTERFACE def _set_init_N_1D(self, pop, exp_data_path, init_flag) -> np.ndarray: """ Initialize the number concentration N for 1D PBE based on experimental data. This method initializes the number concentration (N) for 1D PBE using experimental data. It supports two initialization methods: interpolation of the initial time points ('int') or averaging the initial data sets ('mean'). Parameters ---------- pop : object The population instance (PBE solver) for which the number concentration is being initialized. exp_data_path : str Path to the experimental data file for initialization. init_flag : str The initialization method. Options are: - 'int': Use interpolation for initialization. - 'mean': Use the mean of the initial data sets for initialization. Returns ------- None """ opt = self.opt x_uni = pop.post.calc_x_uni() if not opt.exp_data: # If only one sample exists, initialize N based on the first few time points if opt.sample_num == 1: # Exclude the zero point and extrapolate the initial conditions x_uni_exp, sumN_uni_init_sets = opt.opt_data.read_exp(exp_data_path, opt.t_init[1:]) else: # For multiple samples, average the initial data values exp_data_path=opt.opt_data.traverse_path(0, exp_data_path) x_uni_exp, sumN_uni_tem = opt.opt_data.read_exp(exp_data_path, opt.t_init[1:]) sumN_uni_all_samples = np.zeros((len(x_uni_exp), len(opt.t_init[1:]), opt.sample_num)) sumN_uni_all_samples[:, :, 0] = sumN_uni_tem # Loop through remaining samples and average the data sets for i in range(1, opt.sample_num): exp_data_path=opt.opt_data.traverse_path(i, exp_data_path) _, sumN_uni_tem = opt.opt_data.read_exp(exp_data_path, opt.t_init[1:]) sumN_uni_all_samples[:, :, i] = sumN_uni_tem sumN_uni_init_sets = sumN_uni_all_samples.mean(axis=2) sumN_uni_init = np.zeros(len(x_uni)) # Initialize based on interpolation of the time points if init_flag == 'int': for idx in range(len(x_uni_exp)): interp_time = interp1d(opt.t_init[1:], sumN_uni_init_sets[idx, :], kind='linear', fill_value="extrapolate") sumN_uni_init[idx] = interp_time(0.0) # Initialize based on the mean of the initial data sets elif init_flag == 'mean': sumN_uni_init = sumN_uni_init_sets.mean(axis=1) # Interpolate the experimental data onto the dPBE grid inter_grid = interp1d(x_uni_exp, sumN_uni_init, kind='linear', fill_value="extrapolate") sumN_uni_init = inter_grid(x_uni) else: Q3_init_exp, x_uni_exp = opt.opt_data.read_exp(exp_data_path, opt.t_init[1:]) inter_grid = interp1d(x_uni_exp.flatten(), Q3_init_exp, kind='linear', fill_value="extrapolate") Q3_init_mod = inter_grid(x_uni) Q3_init_mod[np.where(Q3_init_mod < 0)] = 0.0 Q3_init_mod = Q3_init_mod / Q3_init_mod.max() sumN_uni_init = np.zeros(x_uni.shape) sumN_uni_init[1:] = np.diff(Q3_init_mod) * opt.p.V01 *1e18 / (np.pi * x_uni[1:]**3 /6) N_init = np.zeros((pop.NS, len(pop.t_vec))) N_init[:, 0]= sumN_uni_init # Set very small N values to zero thr = 1e-5 N_init[N_init < (thr * N_init[1:, 0].max())]=0 return N_init def _write_new_data(self, pop, exp_data_path): """ Saves the calculation results in the format of experimental data. This method saves the population distribution data into an Excel file in the format used for experimental data. It supports both smoothed and non-smoothed distributions, and can apply noise to the data if required. Parameters ---------- pop : :class:`optframework.dpbe` The population instance for which data is being generated. exp_data_path : str The file path where the experimental data will be saved. Returns ------- None """ opt = self.opt # Return if the population data is invalid (calculation status is False) if not pop.calc_status: return # Get particle size and volume in the dPBE grid x_uni = pop.post.calc_x_uni() v_uni = pop.post.calc_v_uni() # Format the simulation times for the output file formatted_times = write_read_exp.convert_seconds_to_time(opt.t_all) # Initialize the sumN_uni array to store particle count distributions sumN_uni = np.zeros((len(x_uni)-1, len(opt.t_all))) idt_vec = [np.where(opt.t_all == t_time)[0][0] for t_time in opt.t_vec] for idt in idt_vec[1:]: if opt.smoothing: # Get the volume distribution at the current time step sumvol_uni = pop.post.return_distribution(t=idt, flag='sum_uni')[0] # Skip index=0, as particles with volume 0 theoretically do not exist kde = opt.opt_data.KDE_fit(x_uni[1:], sumvol_uni[1:]) # Smooth the distribution using KDE and insert a zero for the first entry q3 = opt.opt_data.KDE_score(kde, x_uni[1:]) q3 = np.insert(q3, 0, 0.0) # Calculate and normalize Q3 values Q3 = self.impl.post.calc_Qx(x_uni, q3) Q3 = Q3 / Q3.max() # Calculate the final smoothed particle volume distribution sumvol_uni = self.impl.post.calc_sum_uni(Q3, sumvol_uni.sum()) # Store the particle count distribution sumN_uni[:, idt] = sumvol_uni[1:] / v_uni[1:] else: # Use the unsmoothed distribution for this time step sumN_uni[:, idt] = pop.post.return_distribution(t=idt, flag='sum_uni', q_type='q0')[0][1:] # For initialization data, do not apply smoothing for idt in opt.idt_init: sumN_uni[:, idt] = pop.post.return_distribution(t=idt, flag='sum_uni', q_type='q0')[0][1:] # Apply noise to the data if noise is enabled if opt.add_noise: sumN_uni = opt.opt_data.function_noise(sumN_uni) # Create a DataFrame for the distribution data and set the index name df = pd.DataFrame(data=sumN_uni, index=x_uni[1:], columns=formatted_times) df.index.name = 'Circular Equivalent Diameter' # Save the DataFrame as an Excel file at the specified path df.to_excel(exp_data_path) return def _get_all_synth_data(self, exp_data_path): """ Process synthetic data by reading the data and converting it to volume-based PSD. This method processes synthetic experimental data for one or multiple samples. For each sample, it reads the experimental data, converts the number-based particle size distribution (PSD) into a volume-based distribution, and then recalculates the final distribution (including q3, Q3, and x_50) based on the specified flags. The results are stored in `data_exp` for use in subsequent optimization steps. Parameters ---------- exp_data_path : str Path to the experimental data file. Returns ------- tuple of arrays - x_uni_exp: List of arrays containing unique particle sizes for each sample. - data_exp: List of arrays containing processed experimental PSD data for each sample. """ opt = self.opt # If only one sample exists, read and process the experimental data if opt.sample_num == 1: x_uni_exp, sumN_uni_exp = opt.opt_data.read_exp(exp_data_path, opt.t_vec[opt.delta_t_start_step:]) x_uni_exp = np.insert(x_uni_exp, 0, 0.0) sumN_uni_exp = np.insert(sumN_uni_exp, 0, 0.0, axis=0) # Convert number-based PSD to volume-based PSD vol_uni = np.tile((1/6)*np.pi*x_uni_exp**3, (opt.num_t_steps-opt.delta_t_start_step, 1)).T sumvol_uni_exp = sumN_uni_exp * vol_uni # Recalculate the distribution for flag, _ in opt.delta_flag: data_exp = self.impl.post.re_calc_distribution(x_uni_exp, sum_uni=sumvol_uni_exp, flag=flag)[0] # If multiple samples exist, process each one else: x_uni_exp = [] data_exp = [] for i in range (0, opt.sample_num): # Read and process experimental data for each sample exp_data_path = opt.opt_data.traverse_path(i, exp_data_path) x_uni_exp_tem, sumN_uni_exp = opt.opt_data.read_exp(exp_data_path, opt.t_vec[opt.delta_t_start_step:]) x_uni_exp_tem = np.insert(x_uni_exp_tem, 0, 0.0) sumN_uni_exp = np.insert(sumN_uni_exp, 0, 0.0, axis=0) # Convert number-based PSD to volume-based PSD vol_uni = np.tile((1/6)*np.pi*x_uni_exp_tem**3, (opt.num_t_steps-opt.delta_t_start_step, 1)).T sumvol_uni_exp = sumN_uni_exp * vol_uni # Recalculate the distribution for flag, _ in opt.delta_flag: data_exp_tem = self.impl.post.re_calc_distribution(x_uni_exp_tem, sum_uni=sumvol_uni_exp, flag=flag)[0] x_uni_exp.append(x_uni_exp_tem) data_exp.append(data_exp_tem) return x_uni_exp, data_exp ## test only for 1d batch exp data def _get_all_exp_data(self, exp_data_path): opt = self.opt (flag, cost_func_type) = opt.delta_flag[0] if opt.sample_num == 1: x_uni_exp, data_exp = opt.opt_data.read_exp(exp_data_path, opt.t_vec[opt.delta_t_start_step:]) x_uni_exp = np.insert(x_uni_exp, 0, 0.0) zero_row = np.zeros((1, data_exp.shape[1])) data_exp = np.insert(data_exp, 0, zero_row, axis=0) if flag == 'x_50' or flag == 'y_weibull': if opt.sheet_name != 'Q_x_int': ## Input is qx data_exp = self.impl.post.re_calc_distribution(x_uni_exp, qx=data_exp, flag=flag)[0] else: ## Input is Qx data_exp = self.impl.post.re_calc_distribution(x_uni_exp, Qx=data_exp, flag=flag)[0] else: x_uni_exp = [] data_exp = [] zero_row = np.zeros((1, data_exp.shape[1])) for i in range (0, opt.sample_num): # Read and process experimental data for each sample exp_data_path = opt.opt_data.traverse_path(i, exp_data_path) x_uni_exp_tem, data_exp_tem = opt.opt_data.read_exp(exp_data_path, opt.t_vec[opt.delta_t_start_step:]) x_uni_exp_tem = np.insert(x_uni_exp_tem, 0, 0.0) data_exp_tem = np.insert(data_exp_tem, 0, zero_row, axis=0) if flag == 'x_50' or flag == 'y_weibull': if opt.sheet_name != 'Q_x_int': ## Input is qx data_exp_tem_raw = self.impl.post.re_calc_distribution(x_uni_exp, qx=data_exp_tem, flag=flag)[0] else: ## Input is Qx data_exp_tem_raw = self.impl.post.re_calc_distribution(x_uni_exp, Qx=data_exp_tem, flag=flag)[0] data_exp_tem = data_exp_tem_raw x_uni_exp.append(x_uni_exp_tem) data_exp.append(data_exp_tem) return x_uni_exp, data_exp