# -*- coding: utf-8 -*-
"""
Optimization algorithms based on Ray Tune.
"""
# import uuid
import os
import sqlite3
from filelock import FileLock
import json
# import numpy as np
# import math
# import ray
from ray import tune
# from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
# from ray.util.placement_group import placement_group, placement_group_table
import ray.util.multiprocessing as mp
from ray.tune.search.optuna import OptunaSearch
from ray.tune.search.hebo import HEBOSearch
from optuna.samplers import GPSampler, CmaEsSampler, TPESampler, NSGAIIISampler, QMCSampler
from ray.tune.search import ConcurrencyLimiter
from .opt_core_ray import OptCoreRay
from .opt_core_multi_ray import OptCoreMultiRay
from ray._private import state
class OptBaseRay():
def __init__(self, base):
self.base = base
def print_current_actors(self):
"""
Print the current number of active Ray actors.
Returns
-------
None
"""
actors = state.actors()
print(f"Number of actors: {len(actors)}")
for actor_id, actor_info in actors.items():
print(f"Actor ID: {actor_id}, State: {actor_info['State']}")
def multi_optimierer_ray(self, opt_params_space, exp_data_paths=None, known_params=None):
"""
Optimize PBE parameters using multiple Ray Tuners, managed via Ray multiprocessing.
This method enables multiple instances of Ray Tune to run concurrently, each tuning
with different experimental data sets or parameters. The job queue is split into
roughly equal slices, one per worker process (at most ``num_jobs``), and the slices
are dispatched through Ray's multiprocessing pool (see the Examples section below).
Parameters
----------
opt_params_space : dict
A dictionary of optimization parameters, where each key corresponds to a parameter name
and its value either defines the bounds and scaling (logarithmic or linear) or a fixed value.
exp_data_paths : list of str, optional
Paths to the experimental data for each tuning instance.
known_params : list of dict, optional
Known parameters to be passed to the optimization process.
Returns
-------
list
A list of dictionaries, where each dictionary contains the optimized parameters and
the corresponding objective score (delta).
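Examples
--------
Illustrative call on a configured ``OptBaseRay`` instance ``opt``; the paths,
parameter names and values below are hypothetical and only sketch the expected
argument shapes::

    results = opt.multi_optimierer_ray(
        opt_params_space={"corr_beta": {"bounds": (-2.0, 2.0), "log_scale": True}},
        exp_data_paths=["data_run_1.xlsx", "data_run_2.xlsx"],
        known_params=[{"G": 1.0}, {"G": 1.5}],
    )
    # results is a list with one dict per data set, each holding
    # "opt_score", "opt_params" and "file_path".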
"""
base = self.base
# --- build the Ray Tune search space once ---
base.RT_space = {}
for name, info in opt_params_space.items():
if "fixed" in info:
# just hand back the literal value
base.RT_space[name] = info["fixed"]
else:
lo, hi = info["bounds"]
if info.get("log_scale", False):
base.RT_space[name] = tune.loguniform(10**lo, 10**hi)
else:
base.RT_space[name] = tune.uniform(lo, hi)
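# Illustrative mapping (parameter name and bounds are hypothetical): an entry
# {"corr_beta": {"bounds": (-2.0, 2.0), "log_scale": True}} becomes
# tune.loguniform(1e-2, 1e2), i.e. log-scaled bounds are interpreted as
# exponents, while {"bounds": (0.0, 1.0)} becomes tune.uniform(0.0, 1.0).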
# Build the job queue; when known_params is omitted, pair each path with None
exp_data_paths = exp_data_paths or []
known_params = known_params if known_params is not None else [None] * len(exp_data_paths)
job_queue = list(zip(exp_data_paths, known_params))
queue_length = len(job_queue)
# Ensure num_jobs does not exceed queue length
self.check_num_jobs(queue_length)
num_workers = base.core.num_jobs
# The worker will pull its slice of the queue, look up warm-start params,
# and pass them into optimierer_ray
def worker(job_queue_slice):
job_results = []
for paths, params in job_queue_slice:
# call optimierer_ray with warm start
result = self.optimierer_ray(
exp_data_paths=paths,
known_params=params,
)
job_results.append(result)
return job_results
# split into roughly equal slices
job_slices = [job_queue[i::num_workers] for i in range(num_workers)]
# dispatch with multiprocessing
results = []
with mp.Pool(num_workers) as pool:
batches = pool.map(worker, job_slices)
for batch in batches:
results.extend(batch)
return results
def check_num_jobs(self, queue_length):
"""
Adjust the number of concurrent Tuners to the available job queue length.
Parameters
----------
queue_length : int
The length of the job queue, representing the number of data sets available for tuning.
Returns
-------
None
"""
base = self.base
if queue_length < base.core.num_jobs:
base.core.num_jobs = queue_length
def _save_warm_params(self, db_path, data_name, all_params, all_score):
"""
Persist evaluated parameter sets and their scores to a SQLite file so that
later runs can warm-start from them. Existing rows for the same data_name are replaced.
"""
assert len(all_params) == len(all_score)
lock_path = db_path + ".lock"
with FileLock(lock_path): # Ensure concurrent safety
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
# Create a table(if not exist)
cursor.execute("""
CREATE TABLE IF NOT EXISTS warm_params (
id INTEGER PRIMARY KEY AUTOINCREMENT,
data_name TEXT,
param_json TEXT,
score REAL
)
""")
# delete the old data_name (if exist)
cursor.execute("DELETE FROM warm_params WHERE data_name = ?", (data_name,))
# insert new values
records = [(data_name, json.dumps(p), s) for p, s in zip(all_params, all_score)]
cursor.executemany("INSERT INTO warm_params (data_name, param_json, score) VALUES (?, ?, ?)", records)
conn.commit()
conn.close()
def _load_warm_params(self, db_path, data_name):
"""
Load previously stored parameter sets and scores for data_name from the
SQLite file; returns two empty lists if the file does not exist.
"""
if not os.path.exists(db_path):
return [], []
conn = sqlite3.connect(db_path)
cursor = conn.cursor()
cursor.execute("SELECT param_json, score FROM warm_params WHERE data_name = ?", (data_name,))
rows = cursor.fetchall()
conn.close()
all_params = [json.loads(p) for p, _ in rows]
all_score = [s for _, s in rows]
return all_params, all_score
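# Illustrative round trip for the two helpers above (file name, data name and
# parameter values are hypothetical):
#     self._save_warm_params("800.sqlite", "run_1", [{"corr_beta": 1.0}], [0.05])
#     params, scores = self._load_warm_params("800.sqlite", "run_1")
#     # -> [{"corr_beta": 1.0}], [0.05]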
def optimierer_ray(self, opt_params_space=None, exp_data_paths=None, known_params=None, evaluated_params=None):
"""
Optimize PBE parameters using Ray's Tune module, based on the delta calculated by the method `calc_delta_agg`.
This method utilizes Ray Tune for hyperparameter optimization. It sets up the search
space and runs the optimization process, either for single or multi-dimensional
PBEs. The results are saved and returned in a dictionary
that contains the optimized parameters and the corresponding objective score.
Parameters
----------
opt_params_space : dict, optional
A dictionary of optimization parameters, where each key corresponds to a parameter name
and its value either defines the bounds and scaling (logarithmic or linear) or a fixed value.
exp_data_paths : list of str, optional
Paths to the experimental or synthetic data to be used for optimization. For multi-case,
this should be a list containing the paths for 1D and 2D data.
known_params : dict, optional
Known parameters to be passed to the optimization process.
evaluated_params : list of dict, optional
Previously evaluated parameter sets used to warm-start the search algorithm.
Returns
-------
dict
A dictionary containing:
- "opt_score": The optimized objective score (delta value).
- "opt_params": The optimized parameters found in the search space.
- "file_path": The path(s) to the experimental data used for optimization.
"""
base = self.base
# evaluated_params = getattr(self.core, 'evaluated_params', None)
# evaluated_rewards = getattr(self.core, 'evaluated_rewards', None)
# Keep any evaluated_params passed in by the caller. evaluated_rewards is
# currently always None, so warm-start points are re-evaluated by the sampler.
evaluated_rewards = None
# Prepare experimental data (either for 1D or 2D)
if isinstance(exp_data_paths, list):
# In the multi case, exp_data_paths is a list containing one 2D data path and two 1D data paths.
x_uni_exp = []
data_exp = []
for exp_data_paths_tem in exp_data_paths:
x_uni_exp_tem, data_exp_tem = base.core.p.get_all_data(exp_data_paths_tem)
x_uni_exp.append(x_uni_exp_tem)
data_exp.append(data_exp_tem)
data_name = getattr(base.core, 'data_name_tune', os.path.basename(exp_data_paths[0]))
else:
# Otherwise (single 1D or 2D data set), exp_data_paths is the path of that data set.
x_uni_exp, data_exp = base.core.p.get_all_data(exp_data_paths)
data_name = os.path.basename(exp_data_paths)
# Reuse the previous parameters as warm-up for new optimization
resume_unfinished = getattr(base.core, 'resume_unfinished', False)
result_dir = getattr(base.core, 'result_dir', base.core.tune_storage_path)
if resume_unfinished:
n_prev = getattr(base.core, 'n_iter_prev', 0)
warm_params_path = os.path.join(result_dir, f"{n_prev}.sqlite")
evaluated_params, evaluated_rewards = self._load_warm_params(warm_params_path, data_name)
# Discard the stored rewards so the loaded parameter sets are only used as
# points_to_evaluate and are re-evaluated by the new run.
evaluated_rewards = None
# Set up the Ray Tune search space
if opt_params_space is not None:
base.RT_space = {}
for name, info in opt_params_space.items():
if "fixed" in info:
# just hand back the literal value
base.RT_space[name] = info["fixed"]
else:
lo, hi = info["bounds"]
if info.get("log_scale", False):
base.RT_space[name] = tune.loguniform(10**lo, 10**hi)
else:
base.RT_space[name] = tune.uniform(lo, hi)
# Create the search algorithm
algo = self.create_algo(evaluated_params=evaluated_params, evaluated_rewards=evaluated_rewards)
# Clean up the data name for output storage
if data_name.startswith("Sim_"):
data_name = data_name[len("Sim_"):]
if data_name.endswith(".xlsx"):
data_name = data_name[:-len(".xlsx")]
# Define the directory name creator for each trial
def trial_dirname_creator(trial):
return f"trial_{trial.trial_id}"
# Set up the trainable function based on the multi_flag
if not base.multi_flag:
trainable = tune.with_parameters(OptCoreRay, core_params=base.core_params, pop_params=base.pop_params,
data_path=base.data_path, exp_data_paths=exp_data_paths,
x_uni_exp=x_uni_exp, data_exp=data_exp, known_params=known_params,
exp_case=base.core.exp_data)
else:
trainable = tune.with_parameters(OptCoreMultiRay, core_params=base.core_params, pop_params=base.pop_params,
data_path=base.data_path, exp_data_paths=exp_data_paths,
x_uni_exp=x_uni_exp, data_exp=data_exp, known_params=known_params,
exp_case=base.core.exp_data)
# Define the resources used for each trial using PlacementGroupFactory
trainable_with_resources = tune.with_resources(trainable,
resources=tune.PlacementGroupFactory([{"CPU": base.core.cpus_per_trail}]),
)
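# Each trial reserves one placement-group bundle with ``cpus_per_trail`` CPUs;
# trials wait in the scheduling queue until such a bundle becomes available.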
# trainable_with_resources = tune.with_resources(trainable,
# {"cpu": base.core.cpus_per_trail},
# )
# checkpoint_path_save = os.path.join(base.core.tune_storage_path, f"{data_name}_checkpoint_{n_save}.pkl")
# if resume_unfinished:
# # checkpoint_path_re = os.path.join(base.core.tune_storage_path, f"{data_name}_checkpoint_{n_prev}.pkl")
# # Resume tuning
# # algo.restore_from_dir(
# # os.path.join(base.core.tune_storage_path, data_name))
# # algo.restore(checkpoint_path_re)
# # Set up a new Ray Tune Tuner
# tuner = tune.Tuner(
# trainable_with_resources,
# param_space=base.RT_space,
# tune_config=tune.TuneConfig(
# num_samples=base.core.n_iter,
# search_alg=algo,
# reuse_actors=True,
# trial_dirname_creator=trial_dirname_creator,
# ),
# run_config=tune.RunConfig(
# storage_path =base.core.tune_storage_path,
# name = data_name,
# verbose = base.core.verbose, # verbose=0: no trial info, 1: basic info, 2: detailed info
# stop={"training_iteration": 1},
# )
# )
# else:
# Set up a new Ray Tune Tuner
tuner = tune.Tuner(
trainable_with_resources,
param_space=base.RT_space,
tune_config=tune.TuneConfig(
num_samples=base.core.n_iter,
search_alg=algo,
reuse_actors=True,
trial_dirname_creator=trial_dirname_creator,
),
run_config=tune.RunConfig(
storage_path=base.core.tune_storage_path,
name=data_name,
verbose=base.core.verbose,  # verbose=0: no trial info, 1: basic info, 2: detailed info
stop={"training_iteration": 1},
)
)
# Run the optimization process
results = tuner.fit()
# algo.save(checkpoint_path_save)
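# Collect the configurations and scores of all finished trials so they can be
# stored as warm-start data for follow-up runs.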
all_params = []
all_score = []
for trial in results:
config = trial.config
score = trial.metrics.get("loss", None)
if score is not None:
all_params.append(config)
all_score.append(score)
warm_params_path = os.path.join(result_dir, f"{base.core.n_iter}.sqlite")
self._save_warm_params(warm_params_path, data_name, all_params, all_score)
# Get the best result from the optimization
opt_result = results.get_best_result(metric="loss", mode="min")
opt_params = opt_result.config
opt_exp_data_paths = opt_result.metrics["exp_paths"]
opt_score = opt_result.metrics["loss"]
result_dict = {
"opt_score": opt_score,
"opt_params": opt_params,
"file_path": opt_exp_data_paths,
}
return result_dict
def create_algo(self, batch=False, evaluated_params=None, evaluated_rewards=None):
"""
Create and return the search algorithm to be used for hyperparameter optimization.
This method creates a search algorithm based on the `method` attribute of the core object.
It supports a variety of samplers from the Optuna library, including Gaussian-process-based Bayesian
optimization (`GP`), tree-structured Parzen estimators (`TPE`), covariance matrix adaptation
evolution strategy (`Cmaes`), NSGA-III (`NSGA`), and quasi-Monte Carlo sampling (`QMC`).
Optionally, it can also limit the number of concurrent trials.
The number of concurrent trials controls the parallelism of the optimization process. In theory,
increasing the number of concurrent trials speeds up the calculation, but it may reduce the
convergence rate due to less frequent information sharing between trials. Empirically, a range of
4-12 concurrent trials tends to work well.
The `batch` parameter controls whether a new batch of trials is submitted only after the current
batch finishes all trials. Note that for some algorithms, the batch setting is fixed; for example,
the HEBO algorithm always uses batching.
Parameters
----------
batch : bool, optional
Whether to use batch mode for the concurrency limiter. Default is False.
evaluated_params : list of dict, optional
Parameter sets to evaluate first (warm-start points), passed as `points_to_evaluate`.
evaluated_rewards : list of float, optional
Objective values belonging to `evaluated_params`; if given, these points are
registered as already evaluated instead of being re-run.
Returns
-------
search_alg : object
The search algorithm instance to be used for optimization.
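Examples
--------
Illustrative configuration on an ``OptBaseRay`` instance ``opt``; the attribute
values are hypothetical::

    opt.base.core.method = 'TPE'
    opt.base.core.max_concurrent = 8
    algo = opt.create_algo(batch=True)
    # -> a ConcurrencyLimiter wrapping an OptunaSearch with a TPESampler,
    #    allowing at most 8 trials to run concurrently per batch.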
"""
base = self.base
# if base.core.method == 'HEBO':
# search_alg = HEBOSearch(metric="loss", mode="min", random_state_seed=base.core.random_seed)
if base.core.method == 'GP':
search_alg = OptunaSearch(metric="loss", mode="min", sampler=GPSampler(seed=base.core.random_seed),
points_to_evaluate=evaluated_params, evaluated_rewards=evaluated_rewards)
elif base.core.method == 'TPE':
search_alg = OptunaSearch(metric="loss", mode="min", sampler=TPESampler(seed=base.core.random_seed),
points_to_evaluate=evaluated_params, evaluated_rewards=evaluated_rewards)
elif base.core.method == 'Cmaes':
search_alg = OptunaSearch(metric="loss", mode="min", sampler=CmaEsSampler(seed=base.core.random_seed),
points_to_evaluate=evaluated_params, evaluated_rewards=evaluated_rewards)
elif base.core.method == 'NSGA':
search_alg = OptunaSearch(metric="loss", mode="min", sampler=NSGAIIISampler(seed=base.core.random_seed),
points_to_evaluate=evaluated_params, evaluated_rewards=evaluated_rewards)
elif base.core.method == 'QMC':
search_alg = OptunaSearch(metric="loss", mode="min", sampler=QMCSampler(scramble=True, seed=base.core.random_seed),
points_to_evaluate=evaluated_params, evaluated_rewards=evaluated_rewards)
else:
raise ValueError(f"Unsupported optimization method: {base.core.method}")
# If no concurrency limit is set, return the search algorithm directly
if base.core.max_concurrent is None:
return search_alg
else:
# Limit the number of concurrent trials using ConcurrencyLimiter
return ConcurrencyLimiter(search_alg, max_concurrent=base.core.max_concurrent, batch=batch)