# -*- coding: utf-8 -*-
"""
Data-processing-related calculations used during optimization.
"""
import numpy as np
from sklearn.neighbors import KernelDensity
from scipy.interpolate import interp1d
from optframework.utils.func.func_read_exp import write_read_exp
class OptData:
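"""
Helper class bundling the data-processing routines used by the optimization core:
reading experimental PSD data, adding synthetic noise, KDE-based smoothing, and
sample-path handling.
"""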
def __init__(self, base):
self.base = base
def read_exp(self, exp_data_path, t_vec, sheet_name=None):
"""
Reads experimental data from a specified path and processes it for use in the optimization.
Parameters
----------
exp_data_path : str
Path to the experimental data file.
t_vec : array-like
The time vector corresponding to the desired time points for the experimental data.
sheet_name : str, optional
Name of the worksheet to read. If None, ``self.base.sheet_name`` is used.
Returns
-------
tuple of arrays
- x_uni_exp: An array of unique particle sizes from the experimental data.
- raw_data_exp: An array of the sum of number concentrations for the unique particle sizes.
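Examples
--------
Minimal sketch; ``opt_data`` and the file name are placeholders for a configured
``OptData`` instance and a real experiment file, not objects defined in this module:
>>> t_vec = [0.0, 60.0, 120.0]
>>> x_uni_exp, raw_data_exp = opt_data.read_exp("exp_data.xlsx", t_vec)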
"""
if sheet_name is None:
sheet_name = self.base.sheet_name
# Instantiate the write_read_exp class to handle reading and writing PSD data,
# and initialize the time format for the experimental data
exp_data = write_read_exp(exp_data_path, read=True, sheet_name=sheet_name, exp_data=self.base.exp_data)
# Extract the experimental data corresponding to the given time vector
df = exp_data.get_exp_data(t_vec)
# Get the particle sizes (x_uni_exp) and corresponding data (raw_data_exp)
x_uni_exp = df.index.to_numpy()
raw_data_exp = df.to_numpy()
return x_uni_exp, raw_data_exp
# def get_all_synth_data(self, exp_data_path):
# """
# Process synthetic data by reading the data and converting it to volume-based PSD.
# This method processes synthetic experimental data for one or multiple samples. For each sample,
# it reads the experimental data, converts the number-based particle size distribution (PSD) into
# a volume-based distribution, and then recalculates the final distribution (including q3, Q3, and
# x_50) based on the specified flags. The results are stored in `data_exp` for use in subsequent
# optimization steps.
# Parameters
# ----------
# exp_data_path : str
# Path to the experimental data file.
# Returns
# -------
# tuple of arrays
# - x_uni_exp: List of arrays containing unique particle sizes for each sample.
# - data_exp: List of arrays containing processed experimental PSD data for each sample.
# """
# base = self.base
# # If only one sample exists, read and process the experimental data
# if base.sample_num == 1:
# x_uni_exp, sumN_uni_exp = self.read_exp(exp_data_path, base.t_vec[base.delta_t_start_step:])
# x_uni_exp = np.insert(x_uni_exp, 0, 0.0)
# sumN_uni_exp = np.insert(sumN_uni_exp, 0, 0.0, axis=0)
# # Convert number-based PSD to volume-based PSD
# vol_uni = np.tile((1/6)*np.pi*x_uni_exp**3, (base.num_t_steps-base.delta_t_start_step, 1)).T
# sumvol_uni_exp = sumN_uni_exp * vol_uni
# # Recalculate the distribution
# for flag, _ in base.delta_flag:
# data_exp = base.opt_pbe.re_calc_distribution(x_uni_exp, sum_uni=sumvol_uni_exp, flag=flag)[0]
# # If multiple samples exist, process each one
# else:
# x_uni_exp = []
# data_exp = []
# for i in range (0, base.sample_num):
# # Read and process experimental data for each sample
# exp_data_path = self.traverse_path(i, exp_data_path)
# x_uni_exp_tem, sumN_uni_exp = self.read_exp(exp_data_path, base.t_vec[base.delta_t_start_step:])
# x_uni_exp_tem = np.insert(x_uni_exp_tem, 0, 0.0)
# sumN_uni_exp = np.insert(sumN_uni_exp, 0, 0.0, axis=0)
# # Convert number-based PSD to volume-based PSD
# vol_uni = np.tile((1/6)*np.pi*x_uni_exp_tem**3, (base.num_t_steps-base.delta_t_start_step, 1)).T
# sumvol_uni_exp = sumN_uni_exp * vol_uni
# # Recalculate the distribution
# for flag, _ in base.delta_flag:
# data_exp_tem = base.opt_pbe.re_calc_distribution(x_uni_exp_tem, sum_uni=sumvol_uni_exp, flag=flag)[0]
# x_uni_exp.append(x_uni_exp_tem)
# data_exp.append(data_exp_tem)
# return x_uni_exp, data_exp
## test only for 1d batch exp data
# def get_all_exp_data(self, exp_data_path):
# base = self.base
# (flag, cost_func_type) = base.delta_flag[0]
# if base.sample_num == 1:
# x_uni_exp, data_exp = self.read_exp(exp_data_path, base.t_vec[base.delta_t_start_step:])
# x_uni_exp = np.insert(x_uni_exp, 0, 0.0)
# zero_row = np.zeros((1, data_exp.shape[1]))
# data_exp = np.insert(data_exp, 0, zero_row, axis=0)
# if flag == 'x_50' or flag == 'y_weibull':
# if base.sheet_name != 'Q_x_int':
# ## Input is qx
# data_exp = base.opt_pbe.re_calc_distribution(x_uni_exp, qx=data_exp, flag=flag)[0]
# else:
# ## Input is Qx
# data_exp = base.opt_pbe.re_calc_distribution(x_uni_exp, Qx=data_exp, flag=flag)[0]
# else:
# x_uni_exp = []
# data_exp = []
# for i in range (0, base.sample_num):
# # Read and process experimental data for each sample
# exp_data_path = self.traverse_path(i, exp_data_path)
# x_uni_exp_tem, data_exp_tem = self.read_exp(exp_data_path, base.t_vec[base.delta_t_start_step:])
# x_uni_exp_tem = np.insert(x_uni_exp_tem, 0, 0.0)
# zero_row = np.zeros((1, data_exp_tem.shape[1]))
# data_exp_tem = np.insert(data_exp_tem, 0, zero_row, axis=0)
# if flag == 'x_50' or flag == 'y_weibull':
# if base.sheet_name != 'Q_x_int':
# ## Input is qx
# data_exp_tem_raw = base.opt_pbe.re_calc_distribution(x_uni_exp_tem, qx=data_exp_tem, flag=flag)[0]
# else:
# ## Input is Qx
# data_exp_tem_raw = base.opt_pbe.re_calc_distribution(x_uni_exp_tem, Qx=data_exp_tem, flag=flag)[0]
# data_exp_tem = data_exp_tem_raw
# x_uni_exp.append(x_uni_exp_tem)
# data_exp.append(data_exp_tem)
# return x_uni_exp, data_exp
def function_noise(self, ori_data):
"""
Adds noise to the original data based on the
`noise_type` and `noise_strength` attributes. Supported noise types include
Gaussian ('Gaus'), Uniform ('Uni'), Poisson ('Po'), and Multiplicative ('Mul').
The resulting noisy data is clipped to be non-negative.
Parameters
----------
ori_data : array-like
The original data to which noise will be added.
Returns
-------
array-like
The noised data.
Notes
-----
The noise types behave as follows:
- Gaussian ('Gaus'): Adds noise with mean 0 and standard deviation `noise_strength`.
- Uniform ('Uni'): Adds noise uniformly distributed over [-`noise_strength`/2, `noise_strength`/2).
- Poisson ('Po'): Adds Poisson-distributed noise where `noise_strength` serves as lambda.
- Multiplicative ('Mul'): Applies Gaussian multiplicative noise with mean 1 and standard deviation
`noise_strength`, multiplying the original data by the generated noise.
The resulting noised data is clipped to ensure no negative values.
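Examples
--------
Minimal sketch; ``opt_data`` is a placeholder for an ``OptData`` instance whose base
exposes ``noise_type`` and ``noise_strength``:
>>> import numpy as np
>>> opt_data.base.noise_type = 'Gaus'
>>> opt_data.base.noise_strength = 0.01
>>> noised = opt_data.function_noise(np.ones((5, 3)))
>>> noised.shape
(5, 3)
>>> bool((noised >= 0).all())
True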
"""
base = self.base
# Get the shape of the original data and initialize the noise array
rows, cols = ori_data.shape
noise = np.zeros((rows, cols))
if base.noise_type == 'Gaus':
# Add Gaussian noise with mean 0 and standard deviation `noise_strength`
for i in range(cols):
noise[:, i] = np.random.normal(0, base.noise_strength, rows)
noised_data = ori_data + noise
elif base.noise_type == 'Uni':
# Uniform noise over the interval [-`noise_strength`/2, `noise_strength`/2)
for i in range(cols):
noise[:, i] = np.random.uniform(low=-base.noise_strength/2, high=base.noise_strength/2, size=rows)
noised_data = ori_data + noise
elif base.noise_type == 'Po':
# Poisson noise with `noise_strength` as lambda
for i in range(cols):
noise[:, i] = np.random.poisson(base.noise_strength, rows)
noised_data = ori_data + noise
elif base.noise_type == 'Mul':
# Multiplicative Gaussian noise with mean 1 and standard deviation `noise_strength`
for i in range(cols):
noise[:, i] = np.random.normal(1, base.noise_strength, rows)
noised_data = ori_data * noise
else:
# Guard against silently returning undefined data for unsupported noise types
raise ValueError(f"Unsupported noise_type: {base.noise_type}")
# Ensure the data remains non-negative by clipping
noised_data = np.clip(noised_data, 0, np.inf)
return noised_data
def KDE_fit(self, x_uni_ori, data_ori, bandwidth='scott', kernel_func='epanechnikov'):
"""
Fit a Kernel Density Estimation (KDE) model to the original data using the
specified kernel function and bandwidth.
Parameters
----------
x_uni_ori : array-like
The unique values of the data variable. Must be a one-dimensional array.
data_ori : array-like
The original data corresponding to `x_uni_ori`. Should be absolute values, not relative.
bandwidth : float or {'scott', 'silverman'}, optional
The bandwidth of the kernel. If a float is provided, it defines the bandwidth directly.
If a string ('scott' or 'silverman') is provided, the bandwidth is estimated using one
of these methods. Defaults to 'scott'.
kernel_func : {'gaussian', 'tophat', 'epanechnikov', 'exponential', 'linear', 'cosine'}, optional
The kernel to use for the density estimation. Defaults to 'epanechnikov'.
Returns
-------
sklearn.neighbors.KernelDensity
The fitted KDE model.
Notes
-----
- `x_uni_ori` must be reshaped into a column vector for compatibility with the KernelDensity class.
- Any values in `data_ori` that are zero or less are adjusted to a small positive value (1e-20) to
avoid numerical issues during KDE fitting.
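Examples
--------
Minimal sketch with synthetic data; ``opt_data`` is a placeholder for a configured ``OptData`` instance:
>>> import numpy as np
>>> x_uni = np.linspace(1e-6, 1e-4, 50)
>>> q_x = np.exp(-((x_uni - 5e-5) / 1e-5) ** 2)
>>> kde = opt_data.KDE_fit(x_uni, q_x)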
"""
# Reshape the input data to be compatible with KernelDensity
x_uni_ori_re = x_uni_ori.reshape(-1, 1)
# Avoid numerical issues from zero or negative sample weights by clamping them to a small positive value
data_ori_adjusted = np.where(data_ori <= 0, 1e-20, data_ori)
# Create and fit the KDE model with the specified kernel and bandwidth
kde = KernelDensity(kernel=kernel_func, bandwidth=bandwidth)
kde.fit(x_uni_ori_re, sample_weight=data_ori_adjusted)
return kde
def KDE_score(self, kde, x_uni_new):
"""
Evaluate and normalize the KDE model on new data points based on the
cumulative distribution function (Q3).
Parameters
----------
kde : sklearn.neighbors.KernelDensity
The fitted KDE model, e.g. as returned by `KDE_fit`.
x_uni_new : array-like
New unique data points where the KDE model will be evaluated.
Returns
-------
array-like
The smoothed and normalized data based on the KDE model.
Notes
-----
- The KDE model is evaluated on the new data points by calculating the log density, which is
then exponentiated to get the actual density values.
- The smoothed data is normalized by dividing by the last value of the cumulative distribution (Q3).
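Examples
--------
Minimal sketch, reusing a ``kde`` model fitted with ``KDE_fit``; ``opt_data`` must wrap
a base object providing ``base.p.post.calc_Qx``:
>>> import numpy as np
>>> x_uni_new = np.linspace(1e-6, 1e-4, 100)
>>> q_smooth = opt_data.KDE_score(kde, x_uni_new)
>>> q_smooth.shape
(100,)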
"""
# Reshape the new data points to match the input format expected by the KDE model
x_uni_new_re = x_uni_new.reshape(-1, 1)
# Evaluate the KDE model to get the smoothed density values
data_smoothing = np.exp(kde.score_samples(x_uni_new_re))
# Ensure the density values form a one-dimensional array
data_smoothing = data_smoothing.ravel()
# Normalize the smoothed data using the cumulative distribution function (Q3)
Qx = self.base.p.post.calc_Qx(x_uni_new, data_smoothing)
data_smoothing = data_smoothing / Qx[-1]
return data_smoothing
def traverse_path(self, label, path_ori):
"""
Update the file path or list of file paths based on the given label.
This method modifies the provided file path or a list of file paths by appending
or updating a numerical label (e.g., '_0', '_1') to distinguish different samples
of the same test.
Parameters
----------
label : int
The label to update or append to the file path(s). The label corresponds to the
current sample or iteration number.
path_ori : str or list of str
The original file path or list of file paths to be updated.
Returns
-------
str or list of str
The updated file path(s) with the new label.
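Examples
--------
The file names below are illustrative; only the trailing label before ``.xlsx`` is rewritten:
>>> opt_data.traverse_path(0, "exp_data.xlsx")
'exp_data_0.xlsx'
>>> opt_data.traverse_path(2, ["exp_data_1.xlsx", "exp_run2_1.xlsx"])
['exp_data_2.xlsx', 'exp_run2_2.xlsx']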
"""
def update_path(path, label):
# For label 0, append '_0' to the file name before the extension
if label == 0:
return path.replace(".xlsx", f"_{label}.xlsx")
# For other labels, replace the previous label with the current label
else:
return path.replace(f"_{label-1}.xlsx", f"_{label}.xlsx")
# If the input is a list of paths, update each path in the list
if isinstance(path_ori, list):
return [update_path(path, label) for path in path_ori]
else:
return update_path(path_ori, label)