# Source code for optframework.utils.func.func_read_exp

# -*- coding: utf-8 -*-
"""
Created on Thu Nov  9 14:59:35 2023

Read or interpolate experimental data based on the time step of the simulated data

@author: px2030
"""
import os
import pandas as pd
from datetime import datetime, timedelta
from scipy.interpolate import interp1d

class write_read_exp():
    """Load a table of experimental data and interpolate it in time.

    The table is expected to hold one row per value of the
    ``'Circular Equivalent Diameter'`` column and one column per measurement
    time. Time headers are converted to seconds, columns that share the same
    time are averaged, and the columns are sorted chronologically.

    Attributes (only set when ``read=True``)
    ----------------------------------------
    file_name : str
        Base name of the file that was read (used in error messages).
    exp_data : pandas.DataFrame
        Measurement table indexed by 'Circular Equivalent Diameter' with the
        measurement time in seconds as column labels.
    """

    def __init__(self, path_exp_data=None, read=False, sheet_name=None, exp_data=False):
        """Optionally read the experimental data from an Excel file.

        Parameters
        ----------
        path_exp_data : str or path-like, optional
            Location of the Excel file. Required when ``read`` is True.
        read : bool, optional
            When True the file is read and pre-processed immediately; when
            False nothing is loaded and no attributes are created.
        sheet_name : str or int, optional
            Sheet to read. ``None`` selects the first sheet.
        exp_data : bool, optional
            Not used by this constructor; kept so existing callers that pass
            it keep working.
        """
        if read:
            self.file_name = os.path.basename(path_exp_data)

            # read_excel(sheet_name=None) would return a dict of *all* sheets,
            # so None is mapped to the first sheet (pandas' own default, 0).
            sheet = 0 if sheet_name is None else sheet_name
            raw_table = pd.read_excel(path_exp_data, sheet_name=sheet)
            self.exp_data = raw_table.set_index('Circular Equivalent Diameter')

            # Column headers are time strings; convert them to seconds.
            # NOTE(review): pandas renames repeated header labels on read
            # ("X", "X.1", "X.2", ...). Such a suffix would be parsed below as
            # a fraction of a second instead of a duplicate - verify against
            # the real data files.
            self.exp_data.columns = [self.convert_time_to_seconds(col)
                                     for col in self.exp_data.columns]

            # Average columns that ended up with the same time stamp.
            self.merge_duplicate_time_columns()

    def convert_time_to_seconds(self, time_str):
        """Convert a time string to the number of seconds since 00:00:00.

        Parameters
        ----------
        time_str : str
            Time formatted as ``%H:%M:%S`` or ``%H:%M:%S.%f``.

        Returns
        -------
        float
            Hours, minutes, seconds and microseconds summed up in seconds.

        Raises
        ------
        ValueError
            If ``time_str`` matches neither supported format.
        """
        # Try the format without a fractional part first, then with one.
        for time_format in ('%H:%M:%S', '%H:%M:%S.%f'):
            try:
                time_obj = datetime.strptime(time_str, time_format)
            except ValueError:
                continue
            return (time_obj.hour * 3600
                    + time_obj.minute * 60
                    + time_obj.second
                    + time_obj.microsecond / 1e6)
        raise ValueError(f"Time string format not recognized: {time_str}")

    @staticmethod
    def convert_seconds_to_time(t_vec):
        """Format times given in seconds as ``H:MM:SS.ffffff`` strings.

        Declared as a static method so it can be called both on the class
        and on an instance.

        Parameters
        ----------
        t_vec : iterable of float
            Times in seconds.

        Returns
        -------
        list of str
            One string per entry of ``t_vec``, always with six fractional
            digits.
        """
        formatted_times = []
        for t in t_vec:
            delta = timedelta(seconds=float(t))
            # str(timedelta) only prints the fraction when it is non-zero, so
            # strip it and append the microseconds explicitly. Taking both
            # parts from the same timedelta keeps them consistent when the
            # fraction rounds up to a full second.
            whole_part = str(delta).split(".")[0]
            formatted_times.append(f"{whole_part}.{delta.microseconds:06d}")
        return formatted_times

    def merge_duplicate_time_columns(self):
        """Average columns sharing a time label and sort columns by time.

        Modifies ``self.exp_data`` in place: every group of columns with the
        same label is replaced by a single column holding their row-wise
        mean, then all columns are put in ascending order.
        """
        columns = self.exp_data.columns
        # Each repeated label exactly once, however often it occurs.
        repeated_times = columns[columns.duplicated()].unique()

        for time in repeated_times:
            same_time = self.exp_data.loc[:, self.exp_data.columns == time]
            mean_data = same_time.mean(axis=1)
            # Dropping by label removes every column carrying that label.
            self.exp_data = self.exp_data.drop(columns=[time])
            self.exp_data[time] = mean_data

        # Rearrange the columns so they are in chronological order.
        self.exp_data = self.exp_data.reindex(sorted(self.exp_data.columns), axis=1)

    def get_exp_data(self, t_vec):
        """Linearly interpolate the experimental data at the given times.

        Parameters
        ----------
        t_vec : iterable of float
            Times in seconds at which the data is requested. Repeated values
            yield a single column.

        Returns
        -------
        pandas.DataFrame
            One row per entry of ``self.exp_data.index`` and one column per
            distinct requested time, in order of first appearance. An empty
            DataFrame is returned for an empty ``t_vec``.

        Raises
        ------
        ValueError
            If a requested time is negative or later than the last measured
            time. Times before the first measured time are rejected by the
            interpolator itself, also with a ValueError.
        """
        # Drop repeated query times while keeping first-occurrence order.
        query_times = list(dict.fromkeys(t_vec))
        if not query_times:
            return pd.DataFrame()

        time_points = self.exp_data.columns
        t_max = max(time_points)
        for t_exp in query_times:
            # if t_exp < min(time_points) or t_exp > max(time_points):
            if t_exp < 0 or t_exp > t_max:
                raise ValueError(f"The experimental time is out of the range of the data file {getattr(self, 'file_name', None)}.")

        # Build one interpolant over the whole table (time runs along axis 1)
        # instead of one per diameter and query time.
        interp_func = interp1d(time_points,
                               self.exp_data.to_numpy(dtype=float),
                               kind='linear',
                               axis=1)
        interpolated = interp_func(query_times)

        # The index is passed as plain values so the result index is unnamed.
        result_df = pd.DataFrame(interpolated,
                                 index=self.exp_data.index.to_numpy(),
                                 columns=query_times)
        return result_df