Source code for optframework.utils.func.RK_Radau

import numpy as np
import scipy.integrate as integrate
from scipy.linalg import lu_factor, lu_solve

#%% Constant
S6 = 6 ** 0.5
# Butcher tableau. A_ij is not used directly by the solver; only its
# eigendecomposition (below) enters the algorithm.
C = np.array([(4 - S6) / 10, (4 + S6) / 10, 1])
A_ij = np.array([
    [11/45-7*S6/360, 37/225-169*S6/1800, -2/225+S6/75],
    [37/225+169*S6/1800, 11/45 + 7*S6/360, -2/225-S6/75],
    [4/9-S6/36, 4/9+S6/36, 1/9]
    ])
E = np.array([-13 - 7 * S6, -13 + 7 * S6, -1], dtype=float) / 3.0  # Error-estimate coefficients.

# Eigendecomposition of A is done: A = T L T**-1. There is one real eigenvalue
# and a complex conjugate pair. MU_REAL and MU_COMPLEX below are the
# corresponding eigenvalues of inv(A) (only one member of the complex pair is
# needed); these are the values that enter the decoupled Newton systems.
MU_REAL = 3 + 3 ** (2 / 3) - 3 ** (1 / 3)
MU_COMPLEX = (3 + 0.5 * (3 ** (1 / 3) - 3 ** (2 / 3))
              - 0.5j * (3 ** (5 / 6) + 3 ** (7 / 6)))
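
# Illustrative check (a minimal sketch, not used by the solver): MU_REAL and
# the MU_COMPLEX pair are the eigenvalues of inv(A_ij), i.e. the reciprocals
# of the eigenvalues of A_ij.
def _check_inverse_eigenvalues():
    eig = np.sort_complex(np.linalg.eigvals(np.linalg.inv(A_ij)))
    expected = np.sort_complex(np.array([MU_REAL, MU_COMPLEX, np.conj(MU_COMPLEX)]))
    assert np.allclose(eig, expected)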

# These are transformation matrices.
T = np.array([
    [0.09443876248897524, -0.14125529502095421, 0.03002919410514742],
    [0.25021312296533332, 0.20412935229379994, -0.38294211275726192],
    [1, 1, 0]])
TI = np.array([
    [4.17871859155190428, 0.32768282076106237, 0.52337644549944951],
    [-4.17871859155190428, -0.32768282076106237, 0.47662355450055044],
    [0.50287263494578682, -2.57192694985560522, 0.59603920482822492]])
# These linear combinations are used in the algorithm.
TI_REAL = TI[0]
TI_COMPLEX = TI[1] + 1j * TI[2]
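
# Illustrative check (a minimal sketch, not used by the solver): the Newton
# iteration below relies on W = TI.dot(Z) and Z = T.dot(W) being consistent,
# i.e. TI must be the inverse of T.
def _check_transformation_matrices():
    assert np.allclose(TI.dot(T), np.eye(3))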

# Interpolator coefficients.
P = np.array([
[13/3 + 7*S6/3, -23/3 - 22*S6/3, 10/3 + 5 * S6],
[13/3 - 7*S6/3, -23/3 + 22*S6/3, 10/3 - 5 * S6],
[1/3, -8/3, 10/3]])
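
# Illustrative check (a minimal sketch, not used by the solver): P is built so
# that the dense-output polynomial y_old + Q.dot([x, x**2, x**3]) with
# Q = Z.T.dot(P) equals y_old at x = 0 and y_old + Z[i] at the collocation
# nodes x = C[i].
def _check_interpolator_coefficients():
    V = np.vander(C, 4, increasing=True)[:, 1:]   # columns: C, C**2, C**3
    assert np.allclose(V.dot(P.T), np.eye(3))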

EPS = np.finfo(float).eps
NEWTON_MAXITER = 10  # Maximum number of Newton iterations.
MIN_FACTOR = 0.2  # Minimum allowed decrease in a step size.
MAX_FACTOR = 10  # Maximum allowed increase in a step size.

NUM_JAC_DIFF_REJECT = EPS ** 0.875
NUM_JAC_DIFF_SMALL = EPS ** 0.75
NUM_JAC_DIFF_BIG = EPS ** 0.25
NUM_JAC_MIN_FACTOR = 1e3 * EPS
NUM_JAC_FACTOR_INCREASE = 10
NUM_JAC_FACTOR_DECREASE = 0.1
#%% FUNCTION
def func(t, y):
    return 300*y**0.9
def analytic_sol(t, y0):
    # Exact solution of the linear problem y' = 3*y (only used by the
    # commented-out comparison in the __main__ block below).
    return y0 * np.exp(3*t)
def num_jac(fun, t, y, f, threshold, factor):
    # y = np.asarray(y)
    n = y.shape[0]
    if n == 0:
        return np.empty((0, 0)), factor

    if factor is None:
        factor = np.full(n, EPS ** 0.5)
    else:
        factor = factor.copy()

    # Direct the step as ODE dictates, hoping that such a step won't lead to
    # a problematic region. For complex ODEs it makes sense to use the real
    # part of f as we use steps along real axis.
    f_sign = 2 * (np.real(f) >= 0).astype(np.float64) - 1
    y_scale = f_sign * np.maximum(threshold, np.abs(y))
    h = (y + factor * y_scale) - y

    # Make sure that the step is not 0 to start with. Not likely it will be
    # executed often.
    for i in range(n):
        if h[i] == 0:
            while h[i] == 0:
                factor[i] *= 10
                h[i] = (y[i] + factor[i] * y_scale[i]) - y[i]

    return dense_num_jac(fun, t, y, f, h, factor, y_scale)
def dense_num_jac(fun, t, y, f, h, factor, y_scale):
    n = y.shape[0]
    h_vecs = np.diag(h)
    f_new = fun(t, y[:, None] + h_vecs)
    diff = f_new - f[:, None]
    max_ind = np.argmax(np.abs(diff), axis=0)
    r = np.arange(n)
    max_diff = np.abs(diff[max_ind, r])
    scale = np.maximum(np.abs(f[max_ind]), np.abs(f_new[max_ind, r]))

    diff_too_small = max_diff < NUM_JAC_DIFF_REJECT * scale
    if np.any(diff_too_small):
        ind, = np.nonzero(diff_too_small)
        new_factor = NUM_JAC_FACTOR_INCREASE * factor[ind]
        h_new = (y[ind] + new_factor * y_scale[ind]) - y[ind]
        h_vecs[ind, ind] = h_new
        f_new = fun(t, y[:, None] + h_vecs[:, ind])
        diff_new = f_new - f[:, None]
        max_ind = np.argmax(np.abs(diff_new), axis=0)
        r = np.arange(ind.shape[0])
        max_diff_new = np.abs(diff_new[max_ind, r])
        scale_new = np.maximum(np.abs(f[max_ind]), np.abs(f_new[max_ind, r]))

        update = max_diff[ind] * scale_new < max_diff_new * scale[ind]
        if np.any(update):
            update, = np.nonzero(update)
            update_ind = ind[update]
            factor[update_ind] = new_factor[update]
            h[update_ind] = h_new[update]
            diff[:, update_ind] = diff_new[:, update]
            scale[update_ind] = scale_new[update]
            max_diff[update_ind] = max_diff_new[update]

    diff /= h

    factor[max_diff < NUM_JAC_DIFF_SMALL * scale] *= NUM_JAC_FACTOR_INCREASE
    factor[max_diff > NUM_JAC_DIFF_BIG * scale] *= NUM_JAC_FACTOR_DECREASE
    factor = np.maximum(factor, NUM_JAC_MIN_FACTOR)

    return diff, factor
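
# Usage sketch (illustrative helper, not called by the solver): for the test
# problem func above the exact Jacobian is diagonal with entries
# d/dy (300*y**0.9) = 270*y**(-0.1), so the finite-difference estimate returned
# by num_jac can be checked directly. func is vectorized element-wise, which is
# all num_jac requires.
def _demo_num_jac():
    y = np.array([1e10, 1e13, 1e15])
    f = func(0.0, y)
    J, fac = num_jac(func, 0.0, y, f, threshold=1e-6, factor=None)
    J_exact = np.diag(270.0 * y ** (-0.1))
    print("max abs. deviation from exact Jacobian:", np.max(np.abs(J - J_exact)))
    return J, fac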
def predict_factor(dt, dt_old, error_norm, error_norm_old):
    # E. Hairer, S. P. Norsett, G. Wanner, "Solving Ordinary Differential
    # Equations II: Stiff and Differential-Algebraic Problems", Sec. IV.8.
    if error_norm_old < 0 or dt_old < 0 or error_norm == 0:
        multiplier = 1
    else:
        multiplier = dt / dt_old * (error_norm_old / error_norm) ** 0.25

    with np.errstate(divide='ignore'):
        factor = min(1, multiplier) * error_norm ** -0.25

    return factor
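
# Example (illustrative helper, not called by the solver): before the first
# accepted step the old quantities are the sentinels -1, so predict_factor
# reduces to the basic controller error_norm ** -0.25.
def _demo_predict_factor():
    assert np.isclose(predict_factor(0.1, -1, 0.5, -1), 0.5 ** -0.25)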
def norm(x):
    # Compute RMS norm.
    return np.linalg.norm(x) / x.size ** 0.5
def interpolation_radau(t_old, t_current, t_eval, y_old, Q):
    x = (t_eval - t_old) / (t_current - t_old)
    order = Q.shape[1] - 1
    if t_eval.ndim == 0:
        p = np.tile(x, order + 1)
        p = np.cumprod(p)
    else:
        p = np.tile(x, (order + 1, 1))
        p = np.cumprod(p, axis=0)
    y = np.dot(Q, p)
    if y.ndim == 2:
        y += y_old[:, None]
    else:
        y += y_old
    return y
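
# Usage sketch (illustrative helper, not called by the solver): the dense
# output is the polynomial y(t) = y_old + Q[:, 0]*x + Q[:, 1]*x**2 + Q[:, 2]*x**3
# in the normalized time x = (t - t_old) / (t_current - t_old), so a synthetic
# Q reproduces a known cubic exactly.
def _demo_interpolation():
    t_old, t_current = 0.0, 2.0
    y_old = np.array([1.0, -1.0])
    Q = np.array([[1.0, 0.5, 0.25],
                  [2.0, 0.0, -1.0]])
    t_probe = np.array([0.5, 1.0, 2.0])
    x = (t_probe - t_old) / (t_current - t_old)
    expected = y_old[:, None] + Q[:, [0]]*x + Q[:, [1]]*x**2 + Q[:, [2]]*x**3
    assert np.allclose(interpolation_radau(t_old, t_current, t_probe, y_old, Q),
                       expected)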
def solve_collocation_system(fun, t, y, h, Z0, scale, tol, LU_real, LU_complex):
    ## see Hairer 1998 - Radau
    n = y.shape[0]
    M_real = MU_REAL / h
    M_complex = MU_COMPLEX / h

    W = TI.dot(Z0)
    Z = Z0

    F = np.empty((3, n))
    ch = h * C

    dW_norm_old = None
    dW = np.empty_like(W)
    converged = False
    rate = None
    for k in range(NEWTON_MAXITER):
        for i in range(3):
            F[i] = fun(t + ch[i], y + Z[i])

        if not np.all(np.isfinite(F)):
            break

        with np.errstate(divide='raise', over='raise', invalid='raise'):
            try:
                f_real = F.T.dot(TI_REAL) - M_real * W[0]
                f_complex = F.T.dot(TI_COMPLEX) - M_complex * (W[1] + 1j * W[2])
            except FloatingPointError:
                break

        dW_real = lu_solve(LU_real, f_real, overwrite_b=True)
        dW_complex = lu_solve(LU_complex, f_complex, overwrite_b=True)

        dW[0] = dW_real
        dW[1] = dW_complex.real
        dW[2] = dW_complex.imag

        dW_norm = norm(dW / scale)
        if dW_norm_old is not None:
            rate = dW_norm / dW_norm_old

        if (rate is not None and (rate >= 1 or
                rate ** (NEWTON_MAXITER - k) / (1 - rate) * dW_norm > tol)):
            break

        W += dW
        Z = T.dot(W)
        # Note: the quantity we ultimately need is y, but the Newton iteration
        # solves for the transformed variable W, so Z has to be recovered from
        # W here. The first dimension of Z indexes the stages; outside this
        # routine the new y is built from the final stage only, i.e. Z[-1].

        if (dW_norm == 0 or
                rate is not None and rate / (1 - rate) * dW_norm < tol):
            converged = True
            break

        dW_norm_old = dW_norm

    return converged, k + 1, Z, rate
#%% RADAU
## Radau IIA (order 5) implicit solver with adaptive step-size control.
class radau_ii_a():
    def __init__(self, func, y0, t_eval, args=(), dt_first=0.01):
        self.ns = len(y0)
        self.y0 = y0
        self.y_current = y0
        self.y_old = np.zeros_like(y0)
        # self.y_new = np.
        self.t_eval = t_eval
        self.t_current = t_eval[0]
        self.t_old = t_eval[0]
        self.dt_current = dt_first
        self.dt_old = -1
        self.error_norm_old = -1
        self.Q = np.empty((self.ns, 3), dtype=np.float64)
        self.Q_valid = False
        self.I = np.identity(self.ns)
        LU_real_array = np.empty((self.ns, self.ns), dtype=np.float64)
        LU_complex_array = np.empty((self.ns, self.ns), dtype=np.complex128)
        LU_pivots = np.empty(self.ns, dtype=np.int32)
        self.LU_real = (LU_real_array, LU_pivots)
        self.LU_real_valid = False
        self.LU_complex = (LU_complex_array, LU_pivots)
        self.LU_complex_valid = False
        self.jac_factor = np.full(self.ns, EPS ** 0.5)
        self.dt_crit = 1e-6
        self.dt_is_too_small = False

        ## Wrap the right-hand side so the extra arguments do not have to be
        ## passed in again later.
        def func(t, y, func=func):
            return func(t, y, *args)

        def func_vectorized(t, y):
            f = np.empty_like(y)
            for i, yi in enumerate(y.T):
                f[:, i] = func(t, yi)
            return f

        self.func = func
        ## Used for computing the Jacobian.
        self.func_vectorized = func_vectorized
    def solve_ode(self, re_tol=1e-1, abs_tol=1e-6):
        # Containers for the raw (internal) solution; they are used afterwards
        # to interpolate onto the points in t_eval.
        y_res_tem_list = []
        t_res_tem_list = []
        error_list = []
        rate_list = []
        Q_list = []
        newton_tol = max(10 * EPS / re_tol, min(0.03, re_tol ** 0.5))
        self.is_new_jac = True
        f_current = self.func(self.t_current, self.y_current)
        self.J, self.jac_factor = num_jac(self.func_vectorized, self.t_current,
                                          self.y_current, f_current, abs_tol,
                                          self.jac_factor)
        y_res_tem_list.append(self.y_current)
        t_res_tem_list.append(self.t_current)

        while self.t_current < self.t_eval[-1]:
            self.radau_ii_a_step(re_tol, abs_tol, newton_tol)
            if self.dt_is_too_small:
                break
            y_res_tem_list.append(self.y_current)
            t_res_tem_list.append(self.t_current)
            error_list.append(self.error_norm_old)
            rate_list.append(self.rate)
            Q_list.append(self.Q)

        y_res_tem = np.array(y_res_tem_list).T
        rate_res_tem = np.array(rate_list)
        error_res_tem = np.array(error_list)
        t_res_tem = np.array(t_res_tem_list)
        Q_res_tem = np.array(Q_list)
        # Preallocate the array of interpolated results.
        y_evaluated = np.zeros((len(self.y0), len(self.t_eval)))
        if self.dt_is_too_small:
            y_evaluated[:] = -1
            return y_evaluated, y_res_tem, t_res_tem, rate_res_tem, error_res_tem
        # Use searchsorted to find, for every point in t_eval, the index of the
        # internal time interval that contains it.
        indexes = np.searchsorted(t_res_tem, self.t_eval) - 1

        y_evaluated[:, 0] = self.y0
        for i, index in enumerate(indexes):
            if i == 0:
                continue
            t_old = t_res_tem[index]
            t_current = t_res_tem[index + 1]
            y_old = y_res_tem[:, index]
            # Note that Q (i.e. Z) is a per-step quantity, so it has one entry
            # fewer than t and y; the missing entry is at the final time point,
            # and the pairing of the remaining entries is unchanged.
            Q = Q_res_tem[index]
            # Interpolate onto the evaluation point.
            y_evaluated[:, i] = interpolation_radau(t_old, t_current,
                                                    self.t_eval[i], y_old, Q)

        return y_evaluated, y_res_tem, t_res_tem, rate_res_tem, error_res_tem
    def radau_ii_a_step(self, re_tol, abs_tol, newton_tol):
        dt_current = self.dt_current
        f_current = self.func(self.t_current, self.y_current)
        rejected = False
        step_accepted = False
        while not step_accepted:
            t_new = self.t_current + dt_current
            if t_new - self.t_eval[-1] > 0:
                t_new = self.t_eval[-1]
            dt_current = t_new - self.t_current
            # The first dimension of Z0 is 3 because this Radau method is of
            # order 5 and has 3 sets of coefficients, i.e. it is solved in 3
            # stages; each slot along the first dimension of Z holds one stage.
            if not self.Q_valid:
                Z0 = np.zeros((3, self.ns))
            else:
                t_tem = self.t_current + dt_current * C
                Z0 = interpolation_radau(self.t_old, self.t_current, t_tem,
                                         self.y_old, self.Q).T - self.y_current
                # Z0 = self.Z_old
            scale = abs_tol + np.abs(self.y_current) * re_tol

            converged = False
            while not converged:
                if dt_current <= self.dt_crit:
                    self.dt_is_too_small = True
                    return
                if not self.LU_real_valid or not self.LU_complex_valid:
                    self.LU_real = lu_factor(MU_REAL / dt_current * self.I - self.J,
                                             overwrite_a=True)
                    self.LU_complex = lu_factor(MU_COMPLEX / dt_current * self.I - self.J,
                                                overwrite_a=True)
                    self.LU_real_valid = True
                    self.LU_complex_valid = True
                converged, n_iter, Z, rate = solve_collocation_system(
                    self.func, self.t_current, self.y_current, dt_current, Z0,
                    scale, newton_tol, self.LU_real, self.LU_complex)
                if not converged:
                    if self.is_new_jac:
                        break
                    # In some cases the code below decides that J does not need
                    # to be updated, so the iteration above was carried out with
                    # the J of the previous step and may diverge. In that case
                    # recompute J once; if the iteration still diverges (or J
                    # was already fresh), dt is reduced below.
                    self.J, self.jac_factor = num_jac(self.func_vectorized,
                                                      self.t_current,
                                                      self.y_current, f_current,
                                                      abs_tol, self.jac_factor)
                    self.is_new_jac = True
                    self.LU_real_valid = False
                    self.LU_complex_valid = False

            # The collocation system did not converge: try a smaller dt. A
            # minimum allowed time step has to be enforced here (dt_crit above),
            # otherwise this could loop forever.
            if not converged:
                dt_current *= 0.5
                self.LU_real_valid = False
                self.LU_complex_valid = False
                print(f"at the time point t_current = {self.t_current}, not converged. dt_current = {dt_current}")
                # print(f"n_iter of newton solver is {n_iter}, convergenz rate is {rate}")
                continue

            # Adjust the time step once more based on the error estimate, but
            # only once (why only once?).
            y_new = self.y_current + Z[-1]
            ZE = Z.T.dot(E) / dt_current
            error = lu_solve(self.LU_real, f_current + ZE, overwrite_b=True)
            scale = abs_tol + np.maximum(np.abs(self.y_current), np.abs(y_new)) * re_tol
            error_norm = norm(error / scale)
            # print(f"error is {error}, error_norm is {error_norm}")
            safety = 0.9 * (2 * NEWTON_MAXITER + 1) / (2 * NEWTON_MAXITER + n_iter)

            if rejected and error_norm > 1:
                error = lu_solve(self.LU_real,
                                 self.func(self.t_current, self.y_current + error) + ZE,
                                 overwrite_b=True)
                error_norm = norm(error / scale)

            if error_norm > 1:
                factor = predict_factor(dt_current, self.dt_old,
                                        error_norm, self.error_norm_old)
                dt_current *= max(MIN_FACTOR, safety * factor)
                self.LU_real_valid = False
                self.LU_complex_valid = False
                rejected = True
                # print(f"at the time point t_current = {self.t_current}, rejected. dt_current = {dt_current}")
                # print(f"error_norm is {error_norm}")
            else:
                step_accepted = True

        # Decide from the convergence rate and the number of Newton iterations
        # whether the Jacobian has to be recomputed for the next time step.
        recompute_jac = n_iter > 2 and rate > 1e-3
        # This only scales the initial dt of the next step; it does not affect
        # t_new of the current step.
        factor = predict_factor(dt_current, self.dt_old, error_norm, self.error_norm_old)
        # print(f"factor is {factor}")
        factor = min(MAX_FACTOR, safety * factor)
        # Apply a threshold to factor: below it dt is left unchanged, so that
        # small fluctuations do not trigger frequent step-size changes.
        if not recompute_jac and factor < 1.2:
            factor = 1
        else:
            self.LU_real_valid = False
            self.LU_complex_valid = False

        f_new = self.func(t_new, y_new)
        # If required, recompute the Jacobian for use in the next step.
        if recompute_jac:
            self.J, self.jac_factor = num_jac(self.func_vectorized, t_new, y_new,
                                              f_new, abs_tol, self.jac_factor)
            self.is_new_jac = True
        else:
            self.is_new_jac = False

        self.y_old = self.y_current
        self.t_old = self.t_current
        self.dt_old = self.dt_current
        self.error_norm_old = error_norm
        self.rate = rate
        self.y_current = y_new
        self.t_current = t_new
        self.dt_current = dt_current * factor
        self.Q = np.dot(Z.T, P)
        self.Q_valid = True
        # self.Z_old = Z
        return
#%% MAIN
if __name__ == "__main__":
    y0 = np.array([1e10, 1e13, 1e12, 1e10, 1e15])
    t_eval = np.arange(0, 100, 10, dtype=float)

    ode_sys = radau_ii_a(func, y0, t_eval, dt_first=0.1)
    y_evaluated, y_res_tem, t_res_tem, rate_res_tem, error_res_tem = ode_sys.solve_ode()
    # y_analytic = np.zeros((len(y0), len(t_eval)))
    # for idt, t_val in enumerate(t_eval):
    #     y_analytic[:, idt] = analytic_sol(t_val, y0)

    RES = integrate.solve_ivp(func, [0, max(t_eval)], y0,  # t_eval=t_eval,
                              method='Radau', first_step=0.1, rtol=1e-1)
    # Reshape and save result to N and t_vec
    y_ivp = RES.y
    t_ivp = RES.t
    # y_e_ivp = abs(y_analytic - y_ivp)
    # y_e_eval = abs(y_analytic - y_evaluated)
    # y_e = abs(y_evaluated - y_ivp)
    # y_e.mean(axis=0)
    # print(y_e.mean(axis=0))
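
    # Additional sanity check (a minimal illustrative sketch): integrate the
    # linear test problem y' = 3*y, for which analytic_sol above is the exact
    # solution, and compare the interpolated output against it. func_lin,
    # y0_lin and t_eval_lin are hypothetical names introduced only for this
    # example.
    def func_lin(t, y):
        return 3.0 * y

    y0_lin = np.array([1.0, 2.0])
    t_eval_lin = np.linspace(0, 1, 11)
    ode_lin = radau_ii_a(func_lin, y0_lin, t_eval_lin, dt_first=0.01)
    y_lin, *_ = ode_lin.solve_ode(re_tol=1e-6, abs_tol=1e-9)
    y_lin_exact = np.array([analytic_sol(t, y0_lin) for t in t_eval_lin]).T
    print("max relative error vs. analytic solution:",
          np.max(np.abs(y_lin - y_lin_exact) / y_lin_exact))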