# -*- coding: utf-8 -*-
"""
Created on Wed Jun 4 09:57:59 2025
@author: Haoran Ji
e-mail: haoran.ji@kit.edu
"""
import os
import numpy as np
from optframework.kernel_opt.opt_base import OptBase
import matplotlib.pyplot as plt
import optframework.utils.plotter.plotter as pt
from optframework.utils.general_scripts.convert_exp_data_Batch import interpolate_Qx
import pandas as pd
# Modified load function to include G_flag as parameter
def load_cross_validation_results(result_dir, n_iter, data_dir, G_flag):
    """
    Load and assemble cross-validation results for one (n_iter, data_dir, G_flag).

    Scans ``result_dir`` for files named
    ``cv_iter_{n_iter}_<fold>_{data_dir}_{G_flag}.npz`` and collects their
    contents.

    Parameters
    ----------
    result_dir : str
        Directory containing the ``.npz`` result files.
    n_iter : int
        Optimizer iteration count encoded in the filename.
    data_dir : str
        Data-set tag encoded in the filename (e.g. ``"int1d"``).
    G_flag : str
        Shear-rate definition tag encoded in the filename.

    Returns
    -------
    tuple
        ``(opt_params_list, opt_scores, losses_all_list, evaluated_params,
        evaluated_rewards)``.  The first three aggregate over all matching
        files; the last two hold the evaluation history of the *last*
        matching file only (earlier files are intentionally overwritten).
    """
    opt_params_list = []
    opt_scores = []
    losses_all_list = []
    # Initialize here so the function returns empty lists instead of raising
    # UnboundLocalError when no file matches the pattern.
    evaluated_params = []
    evaluated_rewards = []
    # Build filename patterns
    prefix = f"cv_iter_{n_iter}_"
    suffix = f"_{data_dir}_{G_flag}.npz"
    files = sorted(
        f for f in os.listdir(result_dir)
        if f.startswith(prefix) and f.endswith(suffix)
    )
    # Load each file
    for fname in files:
        path = os.path.join(result_dir, fname)
        data = np.load(path, allow_pickle=True)
        opt_params_list.append(data['opt_params'].item())
        opt_scores.append(data['opt_score_train'].item())
        losses_all_list.append(data['losses_all'])
        # NOTE: deliberately keeps only the history of the last file.
        evaluated_params = list(data['all_params'])
        evaluated_rewards = list(data['all_score'])
    return opt_params_list, opt_scores, losses_all_list, evaluated_params, evaluated_rewards
def load_all_cv_results(result_dir, n_iter_list, data_dir, G_flag_list):
    """
    Collect cross-validation results for every (G_flag, n_iter) combination.

    Returns a nested dict ``results[G_flag][n_iter]`` whose values bundle
    the per-combination output of :func:`load_cross_validation_results`.
    """
    results = {}
    for flag in G_flag_list:
        per_iter = {}
        for n_iter in n_iter_list:
            (opt_params, scores, losses,
             evaluated_params, evaluated_rewards) = load_cross_validation_results(
                result_dir, n_iter, data_dir, flag
            )
            per_iter[n_iter] = {
                'opt_params_list': opt_params,
                'opt_scores': scores,
                'losses_all_list': losses,
                'evaluated_params': evaluated_params,
                'evaluated_rewards': evaluated_rewards,
            }
        results[flag] = per_iter
    return results
def add_opt_params_mean(results):
    """
    Attach fold-averaged optimized parameters to every results entry.

    For each ``results[G_flag][n_iter]`` the mean of every parameter across
    the dicts in ``opt_params_list`` is stored under ``"opt_params_mean"``.
    ``results`` is modified in place and returned for convenience.
    """
    for flag_entry in results.values():
        for entry in flag_entry.values():
            fold_params = entry["opt_params_list"]
            # Key set is taken from the first fold; all folds share it.
            entry["opt_params_mean"] = {
                key: np.mean([fold[key] for fold in fold_params])
                for key in fold_params[0].keys()
            }
    return results
def plot_summary_two_charts(results, n_iter_list, G_flag_list, result_dir,
                            marker_styles=('o', 's', 'D', '^', 'v', 'P', '*', 'X')):
    """
    Plot summary charts for a single G_flag and export the statistics.

    Writes two figures into ``result_dir`` — train/test mean & std versus
    ``n_iter`` (twin y-axes) and the normalized parameter std versus
    ``n_iter`` — plus an Excel table with the underlying numbers.

    Parameters
    ----------
    results : dict
        Nested dict from :func:`load_all_cv_results`.
    n_iter_list : list of int
        Iteration counts to evaluate.
    G_flag_list : list of str
        G_flag strings (only the first entry is plotted).
    result_dir : str
        Directory the plots and the Excel file are written to.
    marker_styles : sequence of str, optional
        Matplotlib marker cycle for the parameter curves.  Previously read
        from a module-level global; the default reproduces that global.
    """
    os.makedirs(result_dir, exist_ok=True)
    G_flag = G_flag_list[0]
    # Chart 1: Training/Test mean and std over n_iter (twin y-axes)
    train_means, train_stds = [], []
    test_means, test_stds = [], []
    for n_iter in n_iter_list:
        train_scores = results[G_flag][n_iter]['opt_scores']
        # Test loss of fold i is the loss on held-out data set i
        # (diagonal of the losses matrix).
        test_losses = [results[G_flag][n_iter]['losses_all_list'][i][i]
                       for i in range(len(train_scores))]
        train_means.append(np.mean(train_scores))
        train_stds.append(np.std(train_scores))
        test_means.append(np.mean(test_losses))
        test_stds.append(np.std(test_losses))
    fig = plt.figure(figsize=(10, 6), dpi=300)
    ax1 = fig.subplots()
    ax2 = ax1.twinx()
    ax1.plot(n_iter_list, train_means, 'o-', markersize=8, linewidth=2, label='Train Mean', color='tab:blue')
    ax1.plot(n_iter_list, test_means, 's-', markersize=8, linewidth=2, label='Test Mean', color='tab:green')
    ax2.plot(n_iter_list, train_stds, 'o--', markersize=8, linewidth=2, label='Train Std', color='tab:blue')
    ax2.plot(n_iter_list, test_stds, 's--', markersize=8, linewidth=2, label='Test Std', color='tab:green')
    ax1.set_xlabel("n_iter")
    ax1.set_ylabel("Mean Score", color='black')
    ax2.set_ylabel("Std of Score", color='black')
    ax1.set_xscale('log')
    ax1.set_yscale('log')
    ax1.grid(True)
    # Merge the legends of both axes into a single box.
    lines, labels = ax1.get_legend_handles_labels()
    lines2, labels2 = ax2.get_legend_handles_labels()
    ax1.legend(lines + lines2, labels + labels2, loc='upper right', ncol=2)
    plt.title("Train/Test Mean & Std over Iterations")
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, "train_test_mean_std_combined.jpg"), dpi=300, bbox_inches='tight')
    plt.close()
    # Chart 2: Normalized parameter std
    param_names = list(results[G_flag][n_iter_list[0]]['opt_params_list'][0].keys())
    param_names = [p for p in param_names if p != 'G']  # exclude G: known input, not optimized
    plt.figure(figsize=(10, 6), dpi=300)
    for i, param in enumerate(param_names):
        raw_means = []
        raw_stds = []
        for n_iter in n_iter_list:
            vals = [p[param] for p in results[G_flag][n_iter]['opt_params_list']]
            raw_means.append(np.mean(vals))
            raw_stds.append(np.std(vals))
        # Coefficient of variation; guard against division by zero.
        norm_stds = [s / m if m != 0 else 0.0 for s, m in zip(raw_stds, raw_means)]
        marker = marker_styles[i % len(marker_styles)]
        plt.plot(n_iter_list, norm_stds, marker=marker, markersize=8, linewidth=2, label=param)
    plt.xscale('log')
    plt.yscale('log')
    plt.xlabel("n_iter")
    plt.ylabel("Normalized Std of Parameters")
    plt.title("Normalized Parameter Std vs n_iter")
    plt.legend()
    plt.grid(True)
    plt.tight_layout()
    plt.savefig(os.path.join(result_dir, "params_normalized_std_combined.jpg"), dpi=300, bbox_inches='tight')
    plt.close()
    # Save the means and stds as an Excel file
    rows = []
    for idx, n_iter in enumerate(n_iter_list):
        row = {
            "n_iter": n_iter,
            "train_mean": train_means[idx],
            "train_std": train_stds[idx],
            "test_mean": test_means[idx],
            "test_std": test_stds[idx],
        }
        for param in param_names:
            vals = [p[param] for p in results[G_flag][n_iter]['opt_params_list']]
            param_mean = np.mean(vals)
            param_std = np.std(vals)
            norm_std = param_std / param_mean if param_mean != 0 else 0.0
            row[f"{param}_mean"] = param_mean
            row[f"{param}_norm_std"] = norm_std
        rows.append(row)
    df = pd.DataFrame(rows)
    excel_path = os.path.join(result_dir, "summary_data.xlsx")
    df.to_excel(excel_path, index=False)
    print(f"[Saved] Summary data table to: {excel_path}")
def plot_x_mean(results, G_flag):
    """
    Compare experimental and simulated mean diameters over time for every
    sample and export the curves to ``x_mean.xlsx``.

    Relies on the module-level globals ``result_dir``, ``x_exp_mean`` and
    ``data_names_list`` defined in the ``__main__`` section.

    Parameters
    ----------
    results : dict
        Nested results dict from :func:`load_all_cv_results`.
    G_flag : str
        Shear-rate definition; selects the G values used as known parameters.

    Raises
    ------
    ValueError
        If ``G_flag`` is not one of the four supported definitions.
    """
    # Shear rates G (one per stirrer speed) for each shear-rate definition.
    G_table = {
        "Median_Integral": [32.0404, 39.1135, 41.4924, 44.7977, 45.6443],
        "Median_LocalStirrer": [104.014, 258.081, 450.862, 623.357, 647.442],
        "Mean_Integral": [87.2642, 132.668, 143.68, 183.396, 185.225],
        "Mean_LocalStirrer": [297.136, 594.268, 890.721, 1167.74, 1284.46],
    }
    if G_flag not in G_table:
        raise ValueError(f"Unknown G_flag: {G_flag}")
    known_params_list = [{'G': G_val} for G_val in G_table[G_flag]]
    n_iter_list = [200, 800, 6400]
    time_points = [0, 5, 10, 45]
    linestyles = {200: '-', 800: '--', 6400: ':'}
    sample_names = ["n=600", "n=900", "n=1200", "n=1500", "n=1800"]
    excel_path = os.path.join(result_dir, "x_mean.xlsx")
    # Context manager guarantees the workbook is closed even on error.
    with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
        for i in range(len(known_params_list)):
            fig, ax = plt.subplots(figsize=(8, 6))
            ax.plot(time_points, x_exp_mean[i], 'x', color='black', label="Exp")
            data_dict = {'Time': time_points, 'Exp': x_exp_mean[i]}
            for n_iter in n_iter_list:
                opt_params = results[G_flag][n_iter]['opt_params_list'][i]
                _, x_mod_mean = calc_delta_test(
                    [known_params_list[i]],
                    [data_names_list[i]],
                    init_core=True,
                    opt_params=opt_params
                )
                x_mod_mean = np.array(x_mod_mean)
                if isinstance(x_mod_mean[0], list):
                    x_mod_mean = x_mod_mean[0]
                ax.plot(time_points, x_mod_mean, linestyle=linestyles[n_iter],
                        label=f"Sim ({n_iter})")
                data_dict[f"Sim_{n_iter}"] = x_mod_mean
            ax.set_xlabel("Time (min)")
            ax.set_ylabel("Mean Diameter (m)")
            ax.set_yscale('log')
            ax.set_title(f"Experimental vs Simulated Mean Volume - {G_flag} - Sample {i+1}")
            ax.grid(True, which='both')
            ax.legend()
            plt.tight_layout()
            plt.show()
            df = pd.DataFrame(data_dict)
            df.to_excel(writer, index=False, sheet_name=sample_names[i])
    print(f"[Saved] x_mean comparison to: {excel_path}")
def calc_delta_test(known_params_list, exp_data_paths, init_core=True, opt_params=None,
                    ax=None, fig=None, index=''):
    """
    Re-evaluate the optimization loss for a list of experiments.

    For every entry in ``known_params_list`` the known parameters (e.g. the
    shear rate ``G``) are merged into the population parameters and the loss
    against the matching experimental data set is computed.  Afterwards the
    mean diameter trajectory of the *last* simulated run is derived from the
    model moments.  Uses the module-level ``opt`` (an ``OptBase`` instance).

    Parameters
    ----------
    known_params_list : list of dict
        Per-experiment fixed parameters merged into the model parameters.
    exp_data_paths : list of str
        Experimental data file names, relative to ``opt.data_path``.
    init_core : bool, optional
        Re-initialize the optimization core and PBE solver before evaluating.
    opt_params : dict or None, optional
        Optimized parameters; a hard-coded default set is used when None.
    ax, fig, index :
        Unused; kept for signature compatibility.

    Returns
    -------
    tuple
        ``(losses, x_mod_mean)`` — one loss per experiment and the simulated
        mean diameter over time of the last evaluated experiment.
    """
    if opt_params is None:
        # Fallback parameter set used when no optimization result is given.
        pop_params = {
            'pl_v': 1.5,
            'pl_P1': 1e-4,
            'pl_P2': 1,
            'corr_agg': np.array([1e-3])
        }
    else:
        # Flattened corr_agg_0/1/... entries are folded back into an array.
        if 'corr_agg_0' in opt_params:
            opt_params = opt.core.array_dict_transform(opt_params)
        pop_params = opt_params.copy()
    full_paths = [os.path.join(opt.data_path, name) for name in exp_data_paths]
    if init_core:
        opt.core.init_attr(opt.core_params)
        opt.core.init_pbe(opt.pop_params, opt.data_path)
    x_uni_exp = []
    data_exp = []
    for path in full_paths:
        x_uni_tmp, data_tmp = opt.core.p.get_all_data(path)
        x_uni_exp.append(x_uni_tmp)
        data_exp.append(data_tmp)
    losses = []
    for known_i, x_i, data_i in zip(known_params_list, x_uni_exp, data_exp):
        pop_params.update(known_i)
        losses.append(opt.core.calc_delta(pop_params, x_i, data_i))
    # Mean diameter from the first two moments of the last simulated PSD:
    # (m1/m0 * 6/pi)^(1/3) converts the mean volume to an equivalent diameter.
    mom = opt.core.p.calc_mom_t()
    x_mod_mean = (mom[1, 0, :] / mom[0, 0, :] * 6 / np.pi) ** (1 / 3)
    return losses, x_mod_mean
def plot_x_50(results, G_flag):
    """
    Compare experimental and simulated median diameters (x_50) over time for
    every sample and export the curves to ``x_50.xlsx``.

    Relies on the module-level globals ``result_dir`` and ``data_names_list``
    defined in the ``__main__`` section.

    Parameters
    ----------
    results : dict
        Nested results dict from :func:`load_all_cv_results`.
    G_flag : str
        Shear-rate definition; selects the G values used as known parameters.

    Raises
    ------
    ValueError
        If ``G_flag`` is not one of the four supported definitions.
    """
    # Shear rates G (one per stirrer speed) for each shear-rate definition.
    G_table = {
        "Median_Integral": [32.0404, 39.1135, 41.4924, 44.7977, 45.6443],
        "Median_LocalStirrer": [104.014, 258.081, 450.862, 623.357, 647.442],
        "Mean_Integral": [87.2642, 132.668, 143.68, 183.396, 185.225],
        "Mean_LocalStirrer": [297.136, 594.268, 890.721, 1167.74, 1284.46],
    }
    if G_flag not in G_table:
        raise ValueError(f"Unknown G_flag: {G_flag}")
    known_params_list = [{'G': G_val} for G_val in G_table[G_flag]]
    n_iter_list = [200, 800, 6400]
    time_points = [0, 5, 10, 45]
    linestyles = {200: '-', 800: '--', 6400: ':'}
    sample_names = ["n=600", "n=900", "n=1200", "n=1500", "n=1800"]
    excel_path = os.path.join(result_dir, "x_50.xlsx")
    # Context manager guarantees the workbook is closed even on error.
    with pd.ExcelWriter(excel_path, engine='openpyxl') as writer:
        for i in range(len(known_params_list)):
            fig, ax = plt.subplots(figsize=(8, 6))
            opt_params_dict = {n_iter: results[G_flag][n_iter]['opt_params_list'][i]
                               for n_iter in n_iter_list}
            # The experimental x_50 does not depend on the parameter set, so
            # any opt_params entry works for extracting it.
            exp_x50, _ = calc_x50([known_params_list[i]], [data_names_list[i]],
                                  init_core=True, opt_params=opt_params_dict[200])
            ax.plot(time_points, exp_x50, 'x', color='black', label='Exp')
            data_dict = {'Time': time_points, 'Exp': exp_x50}
            for n_iter in n_iter_list:
                _, mod_x50 = calc_x50([known_params_list[i]], [data_names_list[i]],
                                      init_core=True, opt_params=opt_params_dict[n_iter])
                ax.plot(time_points, mod_x50, linestyle=linestyles[n_iter],
                        label=f"Sim ({n_iter})")
                data_dict[f"Sim_{n_iter}"] = mod_x50
            ax.set_xlabel("Time (min)")
            ax.set_ylabel("Median Diameter (m)")
            ax.set_yscale('log')
            ax.set_title(f"Exp vs Sim Median Volume - {G_flag} - Sample {i+1}")
            ax.grid(True, which='both')
            ax.legend()
            plt.tight_layout()
            plt.show()
            df = pd.DataFrame(data_dict)
            df.to_excel(writer, index=False, sheet_name=sample_names[i])
    print(f"[Saved] x_50 comparison to: {excel_path}")
def calc_x50(known_params_list, exp_data_paths, init_core=True, opt_params=None,
             ax=None, fig=None, index=''):
    """
    Compute experimental and simulated median particle diameters (x_50).

    Runs the PBE model with the merged parameters for every experiment and
    extracts the simulated median of the number distribution at every time
    step of the last evaluated run.  Uses the module-level ``opt``.

    Parameters
    ----------
    known_params_list : list of dict
        Per-experiment fixed parameters merged into the model parameters.
    exp_data_paths : list of str
        Experimental data file names, relative to ``opt.data_path``.
    init_core : bool, optional
        Re-initialize the optimization core and PBE solver before evaluating.
    opt_params : dict or None, optional
        Optimized parameters; a hard-coded default set is used when None.
    ax, fig, index :
        Unused; kept for signature compatibility.

    Returns
    -------
    tuple
        ``(x_50_exp, x_50_mod)`` — experimental medians flattened over all
        experiments, and simulated medians per time step of the last run.
    """
    if opt_params is None:
        pop_params = {
            'pl_v': 1.5,
            'pl_P1': 1e-4,
            'pl_P2': 1,
            'corr_agg': np.array([1e-3])
        }
    else:
        if 'corr_agg_0' in opt_params:
            opt_params = opt.core.array_dict_transform(opt_params)
        pop_params = opt_params.copy()
    exp_data_paths = [os.path.join(opt.data_path, name) for name in exp_data_paths]
    if init_core:
        opt.core.init_attr(opt.core_params)
        opt.core.init_pbe(opt.pop_params, opt.data_path)
    # Read experimental medians from the 'Q_x_int' sheet and compare via the
    # x_50 MSE metric.
    opt.core.sheet_name = 'Q_x_int'
    opt.core.delta_flag = [('x_50','MSE')]
    x_uni_exp = []
    data_exp = []
    for exp_data_path_tem in exp_data_paths:
        x_uni_exp_tem, data_exp_tem = opt.core.p.get_all_data(exp_data_path_tem)
        # BUGFIX: x_uni_exp was never filled here, so x_uni_exp[i] below
        # raised IndexError on every call (cf. calc_delta_test).
        x_uni_exp.append(x_uni_exp_tem)
        data_exp.append(data_exp_tem)
    x_50_exp = np.array(data_exp).reshape(-1)
    for i in range(len(known_params_list)):
        known_i = known_params_list[i]
        for key, value in known_i.items():
            pop_params[key] = value
        x_i = x_uni_exp[i]
        data_i = data_exp[i]
        # calc_delta runs the simulation as a side effect; the loss value
        # itself is not needed here.
        _ = opt.core.calc_delta(pop_params, x_i, data_i)
    x_50_mod = np.zeros(len(opt.core.p.t_vec))
    for t_frame in range(len(opt.core.p.t_vec)):
        x_50_mod[t_frame] = opt.core.p.return_distribution(t=t_frame, flag='x_50', q_type='q0')[0]
    return x_50_exp, x_50_mod
def re_calc_lognormal_results():
    """
    Recompute losses and training scores for stored CV result files and
    overwrite the files in place.

    For every (n_iter, G_flag) combination the matching ``.npz`` files are
    reloaded, the losses are re-evaluated with :func:`calc_delta_test`, and
    ``losses_all`` / ``opt_score_train`` are replaced.  Relies on the
    module-level globals ``data_names_list``, ``n_iter_list``,
    ``G_flag_list``, ``data_dir`` and ``result_dir``.
    """
    num_folds = len(data_names_list)
    # Shear rates G (one per stirrer speed) for each shear-rate definition.
    G_table = {
        "Median_Integral": [32.0404, 39.1135, 41.4924, 44.7977, 45.6443],
        "Median_LocalStirrer": [104.014, 258.081, 450.862, 623.357, 647.442],
        "Mean_Integral": [87.2642, 132.668, 143.68, 183.396, 185.225],
        "Mean_LocalStirrer": [297.136, 594.268, 890.721, 1167.74, 1284.46],
    }
    for n_iter in n_iter_list:
        for G_flag in G_flag_list:
            if G_flag not in G_table:
                raise ValueError(f"Unknown G_flag: {G_flag}")
            known_params_list = [{'G': g} for g in G_table[G_flag]]
            # Filename pattern: cv_iter_{n_iter}_<fold>_{data_dir}_{G_flag}.npz
            prefix = f"cv_iter_{n_iter}_"
            suffix = f"_{data_dir}_{G_flag}.npz"
            fold_files = sorted(
                f for f in os.listdir(result_dir)
                if f.startswith(prefix) and f.endswith(suffix)
            )
            for fold_idx, fname in enumerate(fold_files):
                path = os.path.join(result_dir, fname)
                with np.load(path, allow_pickle=True) as data:
                    payload = dict(data)
                opt_params = payload['opt_params'].item()
                losses_all, _ = calc_delta_test(known_params_list, data_names_list,
                                                False, opt_params)
                # Training score = mean loss over all folds except the
                # held-out fold of this file.
                train_losses = [loss for j, loss in enumerate(losses_all[:num_folds])
                                if j != fold_idx]
                payload['losses_all'] = losses_all
                payload['opt_score_train'] = sum(train_losses) / len(train_losses)
                np.savez(path, **payload)
def visualize_opt_distribution(t_frame=-1, x_uni_exp=None, data_exp=None,
                               ax=None, fig=None, index=0):
    """
    Plot the optimized (simulated) and experimental number distributions.

    Parameters
    ----------
    t_frame : int, optional
        Time index of the distribution to plot (-1 = last frame).
    x_uni_exp : array-like
        Experimental size grid; also used as the KDE evaluation grid when
        smoothing is enabled.
    data_exp : ndarray
        Experimental distributions, indexed ``data_exp[:, t_frame]``.
    ax, fig : optional
        Existing axes/figure to draw into; a new figure is created when
        either is None.
    index : int, optional
        Unused; kept for signature compatibility.

    Returns
    -------
    tuple
        ``(ax, fig)`` of the plot.
    """
    x_uni, q0, Q0, sum_uni = opt.core.p.return_distribution(
        t=t_frame, flag='x_uni, qx, Qx,sum_uni', q_type='q0')
    if opt.core.smoothing:
        # Smooth the simulated number density with a KDE evaluated on the
        # experimental size grid, then renormalize by the cumulative maximum.
        kde = opt.core.KDE_fit(x_uni[1:], sum_uni[1:])
        q0 = opt.core.KDE_score(kde, x_uni_exp[1:])
        q0 = np.insert(q0, 0, 0.0)
        Q0 = opt.core.calc_Qx(x_uni_exp, q0)
        q0 = q0 / Q0.max()
        x_uni = x_uni_exp
    # BUGFIX: the passed-in ax/fig used to be ignored and always replaced by
    # a new figure; reuse them when both are provided.
    if ax is None or fig is None:
        fig, ax = plt.subplots()
    ax, fig = pt.plot_data(x_uni, Q0, fig=fig, ax=ax,
                           xlbl=r'Agglomeration size $x_\mathrm{A}$ / $-$',
                           ylbl='number distribution of agglomerates $q0$ / $-$',
                           lbl='opt',clr='b',mrk='o')
    ax, fig = pt.plot_data(x_uni_exp, data_exp[:, t_frame], fig=fig, ax=ax,
                           xlbl=r'Agglomeration size $x_\mathrm{A}$ / $-$',
                           ylbl='number distribution of agglomerates $q0$ / $-$',
                           lbl='exp',clr='r',mrk='^')
    ax.grid('minor')
    plt.tight_layout()
    plt.show()
    return ax, fig
def main():
    """Run the full CV post-processing pipeline and return the results dict."""
    cv_results = load_all_cv_results(result_dir, n_iter_list, data_dir, G_flag_list)
    add_opt_params_mean(cv_results)
    plot_summary_two_charts(cv_results, n_iter_list, G_flag_list, result_dir)
    # Detailed per-sample comparisons use only the first G_flag.
    plot_x_mean(cv_results, G_flag_list[0])
    plot_x_50(cv_results, G_flag_list[0])
    return cv_results
if __name__ == '__main__':
    # --- Paths and optimizer configuration ---------------------------------
    base_path = r"C:\Users\px2030\Code\PSD_opt\tests"
    config_path = os.path.join(base_path, "config", "opt_Batch_config.py")
    data_dir = "int1d"  # choices: "int1d", "lognormal_curvefit"
    data_path = os.path.join(base_path, "data", data_dir)
    opt = OptBase(config_path=config_path, data_path=data_path)
    # Experimental data files, one per stirrer speed (600...1800 rpm).
    data_names_list = [
        "Batch_600_Q0_post.xlsx",
        "Batch_900_Q0_post.xlsx",
        "Batch_1200_Q0_post.xlsx",
        "Batch_1500_Q0_post.xlsx",
        "Batch_1800_Q0_post.xlsx",
    ]
    n_iter_list = [200, 400, 800, 1600, 2400, 4000, 6400]
    marker_styles = ['o', 's', 'D', '^', 'v', 'P', '*', 'X']
    result_dir = os.path.join(
        r"C:\Users\px2030\Code\Ergebnisse\Batch_opt\opt_results",
        "cv_results_group38",
    )
    # Experimental mean diameters in meters: rows = samples (stirrer speeds),
    # columns = time points [0, 5, 10, 45] min.
    if data_dir == "lognormal_curvefit":
        x_exp_mean = np.array([
            [0.08523817, 0.08326844, 0.08579775, 0.07277097],
            [0.08523817, 0.07365923, 0.07167854, 0.07170555],
            [0.08523817, 0.07324782, 0.07380572, 0.06823556],
            [0.08523817, 0.07450765, 0.07019528, 0.06936281],
            [0.08523817, 0.07652142, 0.0740287 , 0.06236208]]) * 1e-6
        G_flag_list = ["Mean_Integral"]
    elif data_dir == "int1d":
        x_exp_mean = np.array([
            [0.11826945, 0.12083028, 0.11184293, 0.21994005],
            [0.11826945, 0.12277321, 0.09504571, 0.08820781],
            [0.11826945, 0.09741461, 0.09254452, 0.28119003],
            [0.11826945, 0.0992418 , 0.09677272, 0.09395102],
            [0.11826945, 0.09177996, 0.08894885, 0.08688184]
        ]) * 1e-6
        G_flag_list = ["Median_LocalStirrer"]
    # NOTE: this unconditionally overrides the G_flag_list chosen above;
    # uncomment entries to analyze additional shear-rate definitions.
    G_flag_list = [
        "Median_Integral",
        # "Median_LocalStirrer",
        # "Mean_Integral",
        # "Mean_LocalStirrer"
    ]
    # re_calc_lognormal_results()
    results = main()