Source code for optframework.mcpbe.mcpbe_jit

# -*- coding: utf-8 -*-
"""
Solving multi-component population balance equations for agglomerating systems via MONTE CARLO method.
@author: Frank Rhein, frank.rhein@kit.edu, Luzia Gramling, Institute of Mechanical Process Engineering and Mechanics
"""
### ------ IMPORTS ------ ###
## General
import os
import runpy
import numpy as np
import math
import sys
from numba import jit
from scipy.stats import norm, weibull_min
import time
import copy
from pathlib import Path
from scipy.integrate import quad, dblquad
from scipy.interpolate import interp1d, RegularGridInterpolator

from optframework.base.base_solver import BaseSolver
import optframework.utils.func.jit_kernel_agg as kernel_agg
import optframework.utils.func.jit_kernel_break as kernel_break

## For plots
import matplotlib.pyplot as plt
from ..utils.plotter import plotter as pt          
from ..utils.plotter.KIT_cmap import c_KIT_green, c_KIT_red, c_KIT_blue

### ------ POPULATION CLASS DEFINITION ------ ###
[docs]class MCPBESolver(BaseSolver): def __init__(self, dim=2, t_total=601, t_write=10, t_vec= None, verbose=False, load_attr=True, config_path=None, init=True): self._init_base_parameters(dim, t_total, t_write, t_vec) ## Simulation parameters self.c = np.full(dim,0.1e-2) # Concentration array of components self.x = np.full(dim,1e-6) # (Mean) equivalent diameter of primary particles for each component self.x2 = np.full(dim,1e-6) # (Mean) equivalent diameter of primary particles for each component (bi-modal case) self.a0 = 1e3 # Total amount of particles in control volume (initially) [-] self.CDF_method = "disc" # Which method to use for calculating and representing the fragment distribution function. # "disc" uses 1000 (default) discrete points to describe the distribution function; # the mass depends on the number of discrete points, and too many points may affect computational efficiency. # "conti" uses continuous functions and integration to describe the distribution function, # providing higher accuracy but limited to one dimension. self.VERBOSE = verbose # Whether to print detailed information of the calculation. ## Initial conditions # PGV defines which initial particle size distribution is assumed for each component # 'mono': Monodisperse at x = x[i] # 'norm': Normal distribution at x_mean = x[i] with sigma defined in SIG self.PGV = np.full(dim, 'mono') self.SIG = np.full(dim,0.1) # (relative) STD of normal distribution STD = SIG*v # PGV2 and SIG2 can be used to initialize bi-modal distributions. Use None to stay mono-modal self.PGV2 = None self.SIG2 = None ## Print more information if VERBOSE is True self.V_flat = None # Load the configuration file, if available if config_path is None and load_attr: config_path = os.path.join(self.work_dir,"config","MCPBE_config.py") if load_attr: self._load_attributes(config_path) # self._reset_params() if init: self.init_calc()
[docs] def init_calc(self, init_Vc=True): # Check dimension of all relevant parameters: if not self.check_dim_consistency(): print('Provided inputs are not consistend in their dimensions. Exiting..') sys.exit() if init_Vc: # # Extract number of components from concentration array c # self.dim = len(self.c) self.v = self.x**3*math.pi/6 # Array of (mean) volume primary particles (each component) self.v2 = self.x2**3*math.pi/6 # Array of (mean) volume primary particles (bi-modal case) # The following part changes whether mono-or bi-modal is used ## Mono-modal if self.PGV2 is None: self.n = np.round(self.c/(self.v)) # Array of number concentration (each component) self.n2 = 0*self.n ## Bi-modal else: self.n = np.round(self.c/(2*self.v)) # Array of number concentration (each component) self.n2 = np.round(self.c/(2*self.v2)) # Array of number concentration (each component) self.n0 = np.sum([self.n,self.n2]) # Total number of primary particles self.Vc = self.a0/self.n0 # Control volume so that a0 is realized with respect to n0 self.a = np.round(self.n*self.Vc).astype(int) # Array total number of primary particles self.a2 = np.round(self.n2*self.Vc).astype(int) # Array total number of primary particles if self.V is None: ## Calculate volume matrix # Each COLUMN = one individual particle / agglomerate # LINE no. i = partial volume component i # Final LINE = total volume of particle / agglomerate if self.dim == 2 and self.process_type == "breakage" and self.PGV[0] == 'mono': ## The initial mono-disperse condition for 2D breakage is an aggregate consisting ## of two main particles (here the largest particles). a_tem = (np.sum(self.a)/2).astype(int) self.V = np.zeros((self.dim+1,a_tem)) for i in range(self.dim): self.V[i, :] = np.full(a_tem, self.v[i]) # Initialize V for pure agglomeration or mix case or (breakage in 1d): else: # self.V = np.zeros((self.dim+1,np.sum(self.a))) self.V = np.zeros((self.dim+1,np.sum([self.a,self.a2]))) cnt = 0 # Loop through all components for i in range (self.dim): ## Monodisperse if self.PGV[i] == 'mono': self.V[i,cnt:cnt+self.a[i]] = np.full(self.a[i],self.v[i]) ## Normal Distribution elif self.PGV[i] == 'norm': self.V[i,cnt:cnt+self.a[i]] = norm.rvs(loc=self.v[i], scale=self.SIG[i]*self.v[i], size=self.a[i]) ## Weibull Distribution elif self.PGV[i] == 'weibull': self.V[i,cnt:cnt+self.a[i]] = weibull_min.rvs(2, loc=self.SIG[i]*self.v[i], scale=self.v[i], size=self.a[i]) else: print(f'Provided PGV "{self.PGV[i]}" is invalid') cnt += self.a[i] ## Bi-Modal if self.PGV2 is not None: if self.PGV2[i] == 'mono': self.V[i,cnt:cnt+self.a2[i]] = np.full(self.a2[i],self.v2[i]) ## Normal Distribution elif self.PGV2[i] == 'norm': self.V[i,cnt:cnt+self.a2[i]] = norm.rvs(loc=self.v2[i], scale=self.SIG2[i]*self.v2[i], size=self.a2[i]) ## Weibull Distribution elif self.PGV2[i] == 'weibull': self.V[i,cnt:cnt+self.a2[i]] = weibull_min.rvs(2, loc=self.SIG2[i]*self.v2[i], scale=self.v2[i], size=self.a2[i]) else: print(f'Provided PGV "{self.PGV[i]}" is invalid') cnt += self.a2[i] # Last row: total volume self.V[-1,:] = np.sum(self.V[:self.dim,:], axis=0) # Delete empty entries (may occur to round-off error) self.V = np.delete(self.V, self.V[-1,:]==0, axis=1) self.a_tot = len(self.V[-1,:]) # Final total number of primary particles in control volume # IDX contains indices of initially present (primary) particles # Can be used to retrace which agglomerates contain which particles #self.IDX = [np.array([i],dtype=object) for i in range(len(self.V[-1,:]))] # self.IDX = [[i] for i in range(len(self.V[-1,:]))] #self.IDX = np.array(self.IDX,dtype=object) # Calculate equivalent diameter from total volume self.X=(6*(self.V[-1,:])/math.pi)**(1/3) # Initialize time array self.t=[0] # Initialize beta array if self.COLEVAL != 3: self.betaarray = calc_betaarray_jit(self.COLEVAL, self.a_tot, self.G, self.X, self.CORR_BETA, self.V, self.Vc) ## Calculate the expected number of particles generated based on the break function if self.BREAKFVAL == 1: self.frag_num = 4 elif self.BREAKFVAL == 2: self.frag_num = 2 elif self.BREAKFVAL == 3: self.frag_num = self.pl_v elif self.BREAKFVAL == 4: self.frag_num = (self.pl_v + 1) / self.pl_v if self.BREAKRVAL != 1: self.break_rate = np.zeros(self.a_tot) self.calc_break_rate() if self.BREAKFVAL != 1 or self.BREAKFVAL != 2: self.calc_break_func() if self.process_type == "agglomeration": self.process_agg = True self.process_break = False elif self.process_type == "breakage": self.process_agg = False self.process_break = True elif self.process_type == "mix": self.process_agg = True self.process_break = True # Save arrays self.t_vec = np.linspace(0,self.t_total,self.savesteps) self.V_save = [self.V] self.Vc_save = [self.Vc] self.V0 = self.V self.X0 = self.X self.V0_save = [self.V0] # self.IDX_save = [copy.deepcopy(self.IDX)] self.step=1
# Solve MC N times
[docs] def solve_MC_N(self, N=5, maxiter=1e8): # Copy base class instance (save all parameters) base = copy.deepcopy(self) if self.VERBOSE: cnt = 1 print(f'### Calculating MC iteration no. {cnt}/{N}.. ###') # Solve for the first time self.solve_MC(maxiter) # Loop through all iterations (-1, first calculation already done) for n in range(N-1): if self.VERBOSE: cnt += 1 print(f'### Calculating MC iteration no. {cnt}/{N}.. ###') # Set up temporary class instance, initialize it again (randomness in PSD) and solve it temp = copy.deepcopy(base) temp.init_calc() temp.solve_MC(maxiter) # Integrate temporary instance in main class instance self.combine_MC(temp)
# Solve MC 1 time
[docs] def solve_MC(self, maxiter=1e8): # Iteration counter and initial time count = 0 t0 = time.time() elapsed_time = 0.0 if self.process_agg: dtd_agg = self.calc_inter_event_time_agg() timer_agg = dtd_agg if self.process_break: dtd_break = self.calc_inter_event_time_break() timer_break = dtd_break if self.process_agg and dtd_agg >= self.t_total: print ("------------------ \n" f"Warning: The initial time step of agglomeration dt_agg = {dtd_agg} s " f"is longer than the total simulation duration t_total = {self.t_total}. " "Please try extending the total simulation duration or adjusting the parameters." "\n------------------") raise ValueError() if self.process_break and dtd_break >= self.t_total: print ("------------------ \n" f"Warning: The initial time step of breakage dt_break = {dtd_break} s is longer" f"than the total simulation duration t_total = {self.t_total}. " "Please try extending the total simulation duration or adjusting the parameters." "\n------------------") while self.t[-1] <= self.t_total and count < maxiter: if self.process_type == "agglomeration": self.calc_one_agg() ## Calculation of inter event time elapsed_time = timer_agg dtd_agg = self.calc_inter_event_time_agg() # Class method timer_agg += dtd_agg #dt = calc_inter_event_time_array(self.Vc,self.a_tot,self.betaarray) # JIT-compiled elif self.process_type == "breakage": self.calc_one_break() elapsed_time = timer_break dtd_break = self.calc_inter_event_time_break() timer_break += dtd_break elif self.process_type == "mix": if timer_agg < timer_break: self.calc_one_agg() ## Calculation of inter event time elapsed_time = timer_agg dtd_agg = self.calc_inter_event_time_agg() # Class method timer_agg += dtd_agg #dt = calc_inter_event_time_array(self.Vc,self.a_tot,self.betaarray) # JIT-compiled else: self.calc_one_break() elapsed_time = timer_break dtd_break = self.calc_inter_event_time_break() timer_break += dtd_break ## Add current timestep self.t=np.append(self.t,elapsed_time) ## When total amount of particles is less than half of its original value ## --> duplicate all agglomerates and double control volume if self.a_tot <= self.a0/2: self.Vc = self.Vc*2 self.a_tot = self.a_tot*2 self.V = np.append(self.V, self.V, axis=1) self.X = np.append(self.X, self.X) # Double indices, Add total number of initially present particles # No value appears double and references to [V_save[0], V_save[0], ...] # tmp = copy.deepcopy(self.IDX) # for i in range(len(tmp)): # for j in range(len(tmp[i])): # tmp[i][j]+=len(self.V0[0,:]) # self.IDX = self.IDX + tmp # self.IDX = np.append(self.IDX, self.IDX + len(self.V0[0,:])) self.V0 = np.append(self.V0, self.V0, axis=1) if self.VERBOSE: print(f'## Doubled control volume {int(np.log2(self.Vc*self.n0/self.a0))}-time(s). Current time: {int(self.t[-1])}s/{self.t_total}s ##') ## new calculation of kernel arrays if self.COLEVAL != 3 and self.process_type != "breakage": self.betaarray = calc_betaarray_jit(self.COLEVAL, self.a_tot, self.G, self.X, self.CORR_BETA, self.V, self.Vc) if self.BREAKRVAL != 1 and self.process_type != "agglomeration": self.break_rate = np.zeros_like(self.a_tot) self.calc_break_rate() ## Save at specific times if self.t_vec[self.step] <= self.t[-1]: self.t_vec[self.step] = self.t[-1] self.V_save.append(self.V) self.Vc_save.append(self.Vc) self.V0_save.append(self.V0) # self.IDX_save = self.IDX_save + [copy.deepcopy(self.IDX)] self.step+=1 count += 1 # Calculation (machine) time self.MACHINE_TIME = time.time()-t0 if self.VERBOSE: # Print why calculations stopped if count == maxiter: print('XX Maximum number of iterations reached XX') print(f'XX Final calculation time is {int(self.t[-1])}s/{self.t_total}s XX') else: print(f'## Agglomeration time reached after {count} iterations ##') print(f'## The calculation took {int(self.MACHINE_TIME)}s ##')
# Random choice of two agglomeration partners. Return indices.
[docs] def select_two_random(self): idx1 = self.select_one_random() idx2 = self.select_one_random() while idx1 == idx2: idx2 = self.select_one_random() return idx1, idx2
[docs] def select_one_random(self): idx = np.random.randint(self.a_tot) return idx
# Beta-weighted selection (requires beta array)
[docs] def select_size(self): return np.random.choice(np.arange(self.betaarray.shape[1]), p=self.betaarray[0,:]/np.sum(self.betaarray[0,:]))
# Calculation of inter-event-time
[docs] def calc_inter_event_time_agg(self): #Berechnung der inter-event-time nach Briesen (2008) mit konstantem/berechneten/ausgewähtem beta if self.COLEVAL == 3 : dtd=2*self.Vc/(self.CORR_BETA*self.a_tot**2) else: # Use mean value of all betas dtd=2*self.Vc/(self.a_tot**2*np.mean(self.betaarray[0,:])) return dtd
# Calculation of inter-event-time
[docs] def calc_inter_event_time_break(self): #Berechnung der inter-event-time nach Briesen (2008) mit konstantem/berechneten/ausgewähtem beta if self.BREAKRVAL == 1 : dtd=1.0/(self.a_tot) else: # Use mean value of all betas dtd=1.0/(self.a_tot*np.mean(self.break_rate)) return dtd
# Calculate alpha base on collision case model
[docs] def calc_alpha_ccm(self, idx1, idx2): P = np.array([self.V_flat[0,idx1]/self.V_flat[-1,idx1]*self.V_flat[0,idx2]/self.V_flat[-1,idx2],\ self.V_flat[0,idx1]/self.V_flat[-1,idx1]*self.V_flat[1,idx2]/self.V_flat[-1,idx2],\ self.V_flat[1,idx1]/self.V_flat[-1,idx1]*self.V_flat[0,idx2]/self.V_flat[-1,idx2],\ self.V_flat[1,idx1]/self.V_flat[-1,idx1]*self.V_flat[1,idx2]/self.V_flat[-1,idx2]]) return np.sum(P*self.alpha_prim)
# Calculate distribution moments mu(i,j,t)
[docs] def calc_mom_t(self): mu = np.zeros((3,3,len(self.t_vec))) # Time loop for t in range(len(self.t_vec)): for i in range(3): if self.dim == 1: mu[i,0,t] = np.sum(self.V_save[t][0,:]**i/self.Vc_save[t]) # The following only applies for more than 1 component else: for j in range(3): mu[i,j,t] = np.sum(self.V_save[t][0,:]**i*self.V_save[t][1,:]**j/self.Vc_save[t]) self.mu = mu return mu
## Combine multiple class instances (if t-grid is identical / for repeated calculations)
[docs] def combine_MC(self,m): # Check dimensions if len(self.Vc_save) != len(m.Vc_save): print("Cannot combine two instances as t arrays don't align. Discarding iteration..") return # Combine data for each timestep for t in range(len(self.t_vec)): self.t_vec[t] = np.mean([self.t_vec[t],m.t_vec[t]]) # Control volumina add up #print(len(self.Vc_save[t]),len(m.Vc_save[t])) self.Vc_save[t] += m.Vc_save[t] # Generating new idices and appending them (IDX of m needs to be raised) tmp = copy.deepcopy(m.IDX_save[t]) for i in range(len(tmp)): for j in range(len(tmp[i])): tmp[i][j]+=len(self.V0_save[t][0,:]) self.IDX_save[t] = self.IDX_save[t] + tmp # Combining initial state V0 self.V0_save[t] = np.append(self.V0_save[t], m.V0_save[t], axis=1) self.V_save[t] = np.append(self.V_save[t], m.V_save[t], axis=1)
## Visualize / plot density and sum distribution:
[docs] def visualize_qQ_t(self,t_plot=None,t_pause=0.5,close_all=False,scl_a4=1,figsze=[12.8,6.4*1.5], show_x10=False, show_x50=True, show_x90=False): # Definition of t_plot: # None: Plot all available times # Numpy array in range [0,1] --> Relative values of time indices # E.g. t_plot=np.array([0,0.5,1]) plots start, half-time and end import seaborn as sns import pandas as pd # Initialize plot pt.plot_init(scl_a4=scl_a4,figsze=figsze,lnewdth=0.8,mrksze=5,use_locale=True,scl=1.2) if close_all: plt.close('all') fig=plt.figure() ax1=fig.add_subplot(1,2,1) ax2=fig.add_subplot(1,2,2) if t_plot is None: t_plot = np.arange(len(self.t_vec)) else: t_plot = np.round(t_plot*(len(self.t_vec)-1)).astype(int) xmin = min(self.return_distribution(t=t_plot[0])[0])*1e6 xmax = max(self.return_distribution(t=t_plot[-1])[0])*1e6 for t in t_plot: # Calculate distribution x_uni, q3, Q3, x_10, x_50, x_90 = self.return_distribution(t=t) ax1.cla() ax2.cla() ax1, fig = pt.plot_data(x_uni*1e6, q3, ax=ax1, fig=fig, plt_type='scatter', xlbl='Particle Diameter $x$ / $\mu$m', ylbl='Volume density distribution $q_3$ / $-$', clr='k',mrk='o',leg=False,zorder=2) if len(x_uni) > 1: sns.histplot(data=pd.DataFrame({'x':x_uni[q3>=0]*1e6,'q':q3[q3>=0]}), x='x',weights='q', log_scale=True, bins=15, ax=ax1, kde=True, color=c_KIT_green) ax2, fig = pt.plot_data(x_uni*1e6, Q3, ax=ax2, fig=fig, xlbl='Particle Diameter $x$ / $\mu$m', ylbl='Volume sum distribution $Q_3$ / $-$', clr='k',mrk='o',leg=False) #ax1.set_ylim([0,0.25]) ax1.grid('minor') ax2.grid('minor') ax1.set_xscale('log') ax2.set_xscale('log') ax1.set_xlim([xmin,xmax]) ax2.set_xlim([xmin,xmax]) ax1.set_ylim([0,1]) ax2.set_ylim([0,1]) if show_x10: ax1.axvline(x_10*1e6, color=c_KIT_green) if show_x10: ax2.axvline(x_10*1e6, color=c_KIT_green) if show_x50: ax1.axvline(x_50*1e6, color=c_KIT_red) if show_x50: ax2.axvline(x_50*1e6, color=c_KIT_red) if show_x90: ax1.axvline(x_90*1e6, color=c_KIT_blue) if show_x90: ax2.axvline(x_90*1e6, color=c_KIT_blue) plt.tight_layout() plt.pause(t_pause) plt.show() return ax1, ax2, fig
## Visualize Moments
[docs] def visualize_mom_t(self, i=0, j=0, fig=None, ax=None, clr=c_KIT_green, lbl='MC'): if fig is None or ax is None: fig=plt.figure() ax=fig.add_subplot(1,1,1) ax, fig = pt.plot_data(self.t_vec,self.mu[i,j,:], fig=fig, ax=ax, xlbl='Agglomeration time $t_\mathrm{A}$ / $s$', ylbl=f'Moment $\mu ({i}{j})$ / '+'$m^{3\cdot'+str(i+j)+'}$', lbl=lbl,clr=clr,mrk='o') # Adjust y scale in case of first moment if i+j == 1: ax.set_ylim([np.min(self.mu[i,j,:])*0.9,np.max(self.mu[i,j,:])*1.1]) ax.grid('minor') plt.tight_layout() return ax, fig
## Return particle size distribution
[docs] def return_distribution(self, comp='all', t=0, Q3_grid=None): v_uni = np.array([]) sumvol_uni = np.array([]) if comp == 'all': # Extract unique values of V and save corresponding volume if not empty for i in range(len(self.V_save[t][-1,:])): if not self.V_save[t][-1,i] in v_uni: v_uni = np.append(v_uni,self.V_save[t][-1,i]) sumvol_uni = np.append(sumvol_uni,self.V_save[t][-1,i]) else: sumvol_uni[v_uni == self.V_save[t][-1,i]] += self.V_save[t][-1,i] # Sort v_uni in ascending order and keep track in sumvol_uni v_uni = v_uni[np.argsort(v_uni)] sumvol_uni = sumvol_uni[np.argsort(v_uni)] # Calculate diameter array x_uni=(6*v_uni/np.pi)**(1/3) # Calculate sum and density distribution Q3 = np.zeros(len(v_uni)) Q3[1:] = np.cumsum(sumvol_uni[1:])/np.sum(sumvol_uni[1:]) q3 = sumvol_uni/np.sum(sumvol_uni) # Retrieve x10, x50 and x90 through interpolation x_10=np.interp(0.1, Q3, x_uni) x_50=np.interp(0.5, Q3, x_uni) x_90=np.interp(0.9, Q3, x_uni) else: print('Case for comp not coded yet. Exiting') return # Q3_grid: 1D np.array to define extrapolation steps if Q3_grid is not None: x_uni_grid = np.interp(Q3_grid, Q3, x_uni) q3_grid = np.interp(x_uni_grid, x_uni, q3) return x_uni_grid, q3_grid, Q3_grid, x_10, x_50, x_90 else: return x_uni, q3, Q3, x_10, x_50, x_90
[docs] def calc_one_agg(self): ## Simplified case for random choice of collision partners (constant kernel) if self.COLEVAL == 3: self.beta = self.CORR_BETA idx1, idx2 = self.select_two_random() self.betaarray=0 ## Calculation of beta array containing all collisions ## Probability-weighted choice of two collision partners else: # select = self.select_size() # Class method select = select_size_jit(self.betaarray[0,:]) # JIT-compiled select self.beta = self.betaarray[0,select] idx1 = int(self.betaarray[1,select]) idx2 = int(self.betaarray[2,select]) ## Calculation of alpha if self.dim == 1: self.alpha = self.alpha_prim elif self.dim == 2: self.alpha = calc_alpha_ccm_jit(idx1,idx2) ## Size-correction if self.SIZEEVAL == 2: lam = min([self.X[idx1]/self.X[idx2],self.X[idx2]/self.X[idx1]]) alpha_corr = np.exp(-self.X_SEL*(1-lam)**2) / ((self.V[-1,idx1]*self.V[-1,idx2]/(np.mean(self.V0[-1,:])**2))**self.Y_SEL) self.alpha *= alpha_corr # Check if agglomeration occurs (if random number e[0,1] is smaller than alpha) if self.alpha > np.random.rand(): ## Modification of V, X and IDX at idx1. self.V[:,idx1] = self.V[:,idx1] + self.V[:,idx2] self.X[idx1] = (6*(self.V[self.dim,idx1])/math.pi)**(1/3) #self.IDX[idx1] = np.append(self.IDX[idx1],self.IDX[idx2]) # self.IDX[idx1] += self.IDX[idx2] ## Deletion of agglomerate at idx 2 self.V = np.delete(self.V, idx2, axis=1) self.X = np.delete(self.X, idx2) #self.IDX = np.delete(self.IDX, idx2) # del self.IDX[idx2] self.a_tot -= 1
[docs] def calc_one_break(self): ## Simplified case for random choice of collision partners (constant kernel) if self.BREAKRVAL == 1: # self.break_rate_select = 1.0 idx = self.select_one_random() else: idx = select_size_jit(self.break_rate) # self.break_rate_select = self.break_rate[idx] particle_to_break = self.V[:, idx].reshape(-1,1) ## Delete broken particles self.V = np.delete(self.V, idx, axis=1) self.X = np.delete(self.X, idx) # Replace the placeholder with the function call self.V, self.X, self.a_tot = handle_fragments(self.V, self.X, self.a_tot, particle_to_break, self.frag_num, self.BREAKFVAL, self.dim, self.break_func, self.rel_frag)
[docs] def calc_break_rate(self): if self.dim == 1: self.break_rate = kernel_break.calc_B_R_1d(self.V[-1,:], self.self.pl_P1, self.pl_P2, self.G, self.BREAKRVAL) ## The 2D fragmentation functionality is currently experimental ## and in some cases the number of fragments generated is different. if self.BREAKFVAL == 5: self.frag_num = (self.pl_v + 2) / self.pl_v elif self.dim == 2: self.break_rate = kernel_break.calc_B_R_2d_flat(self.V[-1,:], self.V[0,:], self.V[1,:], self.G, self.pl_P1, self.pl_P2, self.pl_P3, self.pl_P4, self.BREAKRVAL, self.BREAKFVAL) if self.BREAKFVAL == 5: self.frag_num = (2*self.pl_v+1)*(self.pl_v+2)/(2*self.pl_v*(self.pl_v+1))
[docs] def calc_break_func(self, num_points=1000): if self.dim == 1: self.rel_frag = np.linspace(0, 1, num_points) self.break_func = np.zeros(num_points) if self.CDF_method == "disc": for i in range (num_points): self.break_func[i] = kernel_break.breakage_func_1d(self.rel_frag[i], 1.0, self.pl_v, self.pl_q, self.BREAKFVAL) elif self.CDF_method == "conti": args = (1.0, self.pl_v, self.pl_q, self.BREAKFVAL) func = kernel_break.breakage_func_1d cdf_values = np.array([quad(func, 0.0, x, args=args)[0] for x in self.rel_frag]) ## normalize the break function cdf_values /= cdf_values[-1] self.cdf_interp = interp1d(cdf_values, self.rel_frag, kind="linear", bounds_error=False, fill_value=(0.0, 1.0)) elif self.dim == 2: rel_frag1_tem = np.linspace(0, 1, num_points) rel_frag3_tem = np.linspace(0, 1, num_points) self.rel_frag1, self.rel_frag3 = np.meshgrid(rel_frag1_tem, rel_frag3_tem) self.break_func = np.zeros((num_points,num_points)) if self.CDF_method == "disc": for i in range(num_points): for j in range(num_points): self.break_func[i, j] = kernel_break.breakage_func_2d(self.rel_frag3[i,j], self.rel_frag1[i,j], 1.0, 1.0, self.pl_v, self.pl_q, self.BREAKFVAL) self.break_func = self.break_func.ravel() elif self.CDF_method == "conti": raise ValueError("The continuous CDF method for the two-dimensional case has not yet been coded!")
# args = (1.0, 1.0, self.v, self.q, self.BREAKFVAL) # func = kernel_break.breakage_func_1d # cdf = np.zeros_like(self.rel_frag1) # for i in range(len(rel_frag1_tem)): # for j in range(len(rel_frag3_tem)): # cdf[i, j] = dblquad(func, 0.0, rel_frag1_tem[i], 0.0, rel_frag3_tem[j])[0] # cdf /= cdf[-1, -1] # self.cdf_interp = RegularGridInterpolator((rel_frag1_tem, rel_frag3_tem), # cdf, bounds_error=False, fill_value=None)
[docs] def check_dim_consistency(self): check = np.array([len(self.x),len(self.PGV),len(self.SIG), self.alpha_mmc.shape[0],self.alpha_mmc.shape[1]]) return len(check[check==len(self.c)]) == len(check)
## JIT-compiled calculation of beta array
[docs]@jit(nopython=True) def calc_betaarray_jit(COLEVAL, a, G, X, beta0, V, V_c): """ Calculate the beta array for Monte Carlo using calc_beta. Parameters ---------- COLEVAL : int Type of beta calculation model. a : int Total number of particles. G : float Shear rate or other related parameter. X : array Particle size array. beta0 : float Base collision frequency constant. V : array Particle volume array. Returns ------- beta : array Beta array formatted for Monte Carlo. """ num = int(a * (a - 1) / 2) # Total number of unique collision pairs. beta = np.zeros((3, num)) cnt = 0 for i in range(a): for j in range(i): # Calculate beta using calc_beta beta_ai = kernel_agg.calc_beta(COLEVAL, beta0, G, X/2, i, j) # Store results in beta array beta[0, cnt] = beta_ai beta[1, cnt] = i beta[2, cnt] = j cnt += 1 return beta
## JIT-compiled calculation of beta array
[docs]@jit(nopython=True) def calc_b_r_jit_1d(BREAKRVAL, a, G, V, pl_P1, pl_P2): pass
[docs]@jit(nopython=True) def calc_alpha_ccm_jit(V, alpha_prim, idx1, idx2): P = np.array([V[0,idx1]/V[-1,idx1]*V[0,idx2]/V[-1,idx2],\ V[0,idx1]/V[-1,idx1]*V[1,idx2]/V[-1,idx2],\ V[1,idx1]/V[-1,idx1]*V[0,idx2]/V[-1,idx2],\ V[1,idx1]/V[-1,idx1]*V[1,idx2]/V[-1,idx2]]) return np.sum(P*alpha_prim)
## JIT-compiled calculation of inter-event-time
[docs]@jit(nopython=True) def calc_inter_event_time_array(Vc,a_tot,betaarray): return 2*Vc/(a_tot**2*np.mean(betaarray[0,:]))
[docs]@jit(nopython=True) def select_size_jit(array): return np.arange(len(array))[np.searchsorted(np.cumsum(array/np.sum(array)), np.random.random(), side="right")]
[docs]@jit(nopython=True) def handle_fragments(V, X, a_tot, particle_to_break, frag_num, BREAKFVAL, dim, break_func, rel_frag): # Ensure frag_num is an integer frag_num_floor = int(np.floor(frag_num)) frag_num_ceil = int(np.ceil(frag_num)) if frag_num_floor <= 1: raise ValueError("The expected number of fragments is less than or equal to 1. \ \n Please check the parameters or calculation of the fragment equation.") if frag_num_floor == frag_num_ceil: frag_num_int = frag_num_floor else: ## Guaranteed mathematical expectation of particles generated by ## all breakage events is frag_num delta = frag_num - frag_num_floor if np.random.random() < delta: frag_num_int = frag_num_ceil else: frag_num_int = frag_num_floor ## To generate n fragments, you need to break them n-1 times, ## and the last fragment left is the last one. for i in range(frag_num_int - 1): frag, X_frag = produce_one_frag(particle_to_break, BREAKFVAL, dim, break_func, rel_frag) V = np.concatenate((V, frag), axis=1) X = np.concatenate((X, X_frag), axis=0) particle_to_break -= frag a_tot += 1 V = np.concatenate((V, particle_to_break), axis=1) X_final = (6*(particle_to_break[-1])/math.pi)**(1/3) X = np.concatenate((X, X_final), axis=0) return V, X, a_tot
[docs]@jit(nopython=True) def produce_one_frag(particle_to_break, BREAKFVAL, dim, break_func, rel_frag): if BREAKFVAL == 1 or BREAKFVAL == 2: if dim == 1: random_value = np.random.random() frag = random_value * particle_to_break # elif dim == 2: # random_value1 = np.random.random() # random_value3 = np.random.random() # frag_x1 = random_value1 * particle_to_break[0] # frag_x3 = random_value3 * particle_to_break[1] # frag = np.array([frag_x1, frag_x3, frag_x1 + frag_x3]) else: if dim == 1: # if self.CDF_method == "disc": frag_idx = select_size_jit(break_func) frag = rel_frag[frag_idx] * particle_to_break # elif self.CDF_method == "conti": # random_value = np.random.random() # rel_frag = self.cdf_interp(random_value) # frag = rel_frag * particle_to_break # elif dim == 2: # frag_idx = select_size_jit(break_func) # i, j = np.unravel_index(frag_idx, rel_frag1.shape) # frag_x1 = particle_to_break[0] * rel_frag1[i, j] # frag_x3 = particle_to_break[1] * rel_frag3[i, j] # frag = np.array([frag_x1, frag_x3, frag_x1 + frag_x3]) X = (6 * (frag[-1]) / math.pi) ** (1 / 3) return frag, X