Source code for modopt.core.optimizer

import numpy as np
import scipy as sp
from typing import Union
import os, shutil, copy
from datetime import datetime
import contextlib

from modopt.utils.options_dictionary import OptionsDictionary
from modopt.utils.general_utils import pad_name
# from io import StringIO
from modopt.core.problem import Problem
from modopt.core.problem_lite import ProblemLite
from modopt.core.visualization import Visualizer
import warnings

try:
    import h5py
except ImportError:
    warnings.warn("h5py not found, recording disabled")

from abc import ABC, abstractmethod

[docs]class Optimizer(ABC): ''' Base class for all optimization algorithms in modOpt. This class provides the common functionalities for all the optimization algorithms, such as checking the correctness of first derivatives, recording the outputs, hot-starting the optimization, and visualizing the scalar variables. The user-defined optimization algorithms should inherit from this class. Attributes ---------- problem : Problem or ProblemLite The problem to be solved. Needs to be a Problem() or ProblemLite() object. problem_name : str The name of the problem. solver_name : str The name of the solver. options : OptionsDictionary The options dictionary for the optimizer. record : h5py.File The record file to store the outputs from all iterations of the optimization. hot_start_record : h5py.File The record file from which to hot-start the optimization. modOpt loads and reuses the outputs stored in this file from a previous optimization. out_dir : str The directory to store all the output files generated from the optimization. modopt_output_files : list The list of all files generated by modOpt during the optimization. visualizer : Visualizer The visualizer object to plot the scalar variables during the optimization. timestamp : str The timestamp for the optimization. scalar_outputs : list The list of scalar outputs provided by the optimizer after each iteration. available_outputs : dict The dictionary of all available outputs from the optimizer after each iteration. results : dict The dictionary containing the results of the optimization. update_outputs_count : int Number of times the ``update_outputs()`` method is called. Only relevant for development and debugging purposes. '''
[docs] def __init__(self, problem:Union[Problem, ProblemLite], recording:bool = False, out_dir:str = None, hot_start_from:str = None, hot_start_atol:float = 0., hot_start_rtol:float = 0., visualize:list = [], keep_viz_open:bool = False, turn_off_outputs:bool = False, **kwargs): ''' Initialize the Optimizer object. Sets up recording, hot-starting, and visualization. Parameters ---------- problem : Problem or ProblemLite The problem to be solved. Required argument for all optimizers. recording : bool, default=False If ``True``, record all outputs from the optimization. This needs to be enabled for hot-starting the same problem later, if the optimization is interrupted. out_dir : str, optional The directory to store all the output files generated from the optimization. hot_start_from : str, optional The record file from which to hot-start the optimization. hot_start_atol : float, default=0. The absolute tolerance check for the inputs when reusing outputs from the hot-start record. hot_start_rtol : float, default=0. The relative tolerance check for the inputs when reusing outputs from the hot-start record. visualize : list, default=[] The list of scalar variables to be visualized during the optimization. keep_viz_open : bool, default=False If ``True``, keep the visualization window open after the optimization is complete. turn_off_outputs : bool, default=False If ``True``, prevent modOpt from generating any output files. **kwargs Additional optimizer-specific keyword arguments. ''' # if type(self) is Optimizer: # raise TypeError("Optimizer cannot be instantiated directly.") now = datetime.now() self.timestamp = now.strftime("%Y-%m-%d_%H.%M.%S.%f") self.options = OptionsDictionary() self.problem = problem self.problem_name = problem.problem_name self.solver_name = 'unnamed_solver' self.options.declare('recording', default=False, types=bool) self.options.declare('out_dir', default=None, types=(type(None), str)) self.options.declare('hot_start_from', default=None, types=(type(None), str)) self.options.declare('hot_start_atol', default=0., types=float) self.options.declare('hot_start_rtol', default=0., types=float) self.options.declare('visualize', default=[], types=list) self.options.declare('keep_viz_open', default=False, types=bool) self.options.declare('turn_off_outputs', default=False, types=bool) self.update_outputs_count = 0 self.options.declare('formulation', default='rs', types=str) self.initialize() self.options.update({'recording': recording, 'out_dir': out_dir, 'hot_start_from': hot_start_from, 'hot_start_atol': hot_start_atol, 'hot_start_rtol': hot_start_rtol, 'visualize': visualize, 'keep_viz_open': keep_viz_open, 'turn_off_outputs': turn_off_outputs}) self.options.update(kwargs) # compute the scalar outputs from the optimizer after initialization a_outs = self.available_outputs self.scalar_outputs = [out for out in a_outs.keys() if not isinstance(a_outs[out], tuple)] # Create the outputs directory if not self.options['turn_off_outputs']: default_out_dir = f"{self.problem_name}_outputs/{self.timestamp}" user_out_dir = self.options['out_dir'] self.out_dir = f"{user_out_dir}/{self.timestamp}" if user_out_dir else default_out_dir self.modopt_output_files = [f"directory: {self.out_dir}", 'modopt_results.out'] os.makedirs(self.out_dir) # recursively create the directory else: if self.options['recording']: raise ValueError("Cannot record with 'turn_off_outputs=True'.") if self.options['out_dir']: raise ValueError("Cannot specify 'out_dir' with 'turn_off_outputs=True'.") if self.options['readable_outputs'] != []: raise ValueError("Cannot write 'readable_outputs' with 'turn_off_outputs=True'.") if self.options['visualize'] != []: raise ValueError("Cannot visualize with 'turn_off_outputs=True'.") # Hot starting and recording should start even before setup() is called # since there might be callbacks in the setup() function self.record = self.problem._record = None # Reset if using the same problem object again self.problem._callback_count = 0 # Reset if using the same problem object again self.problem._obj_count = 0 # Reset if using the same problem object again self.problem._grad_count = 0 # Reset if using the same problem object again self.problem._hess_count = 0 # Reset if using the same problem object again self.problem._con_count = 0 # Reset if using the same problem object again self.problem._jac_count = 0 # Reset if using the same problem object again self.problem._reused_callback_count = 0 # Reset if using the same problem object again self.problem._hot_start_mode = False # Reset if using the same problem object again self.problem._hot_start_record = None # Reset if using the same problem object again self.problem._num_callbacks_found = 0 # Reset if using the same problem object again self.problem._hot_start_tol = None # Reset if using the same problem object again self.problem._visualizer = None # Reset if using the same problem object again if self.options['recording']: self.record = self.problem._record = h5py.File(f'{self.out_dir}/record.hdf5', 'a') if self.options['hot_start_from'] is not None: self.setup_hot_start() if self.options['visualize'] != []: # NOTE: This will neglect 'obj_hess', 'lag_hess' active_callbacks for IPOPT # and 'obj_hess', 'lag_hess', 'obj_hvp' for TrustConstr # since these get added in the setup() function. self.setup_visualization() self._setup()
def _setup(self): # User defined optimizer-specific setup self.setup() # Setup outputs to be written to file if not self.options['turn_off_outputs']: self.setup_outputs() def setup_outputs(self): ''' Set up the directory and open files to write the outputs of the optimization problem. Four different types of outputs are written: 1. Summary table: Single file with the scalar outputs of the optimization problem. 2. Readable outputs: A file for each readable_output declared. 3. Recorder: Contains all the outputs of the optimization problem, if recording is enabled. 4. Results: Single file with the readable print_results() string (no setup needed). ''' dir = self.out_dir a_outs = self.available_outputs # Available outputs dictionary d_outs = self.options['readable_outputs'] # Declared outputs list s_outs = self.scalar_outputs # Scalar outputs list # 1. Write the header of the summary_table file if len(s_outs) > 0: header = "%10s " % '#' for key in s_outs: if a_outs[key] in (int, np.int_, np.int32, np.int64): header += "%10s " % key elif a_outs[key] in (float, np.float32, np.float64): header += "%16s " % key with open(f"{dir}/modopt_summary.out", 'w') as f: f.write(header) self.modopt_output_files += ["modopt_summary.out"] # 2. Create the readable output files for key in d_outs: if key not in a_outs: raise ValueError(f'Invalid readable output "{key}" is declared.' \ f'Available outputs are {list(a_outs.keys())}.') with open(f"{dir}/{key}.out", 'w') as f: pass self.modopt_output_files += [f"{key}.out"] # 3. Create the recorder output file and write the attributes if self.options['recording']: constrained = self.problem.constrained rec = self.record self.modopt_output_files += ['record.hdf5'] rec.attrs['problem_name'] = self.problem_name rec.attrs['solver_name'] = self.solver_name rec.attrs['modopt_output_files'] = self.modopt_output_files if hasattr(self, 'default_solver_options'): solver_opts = self.solver_options.get_pure_dict() for key, value in solver_opts.items(): value = 'None' if value is None else value if isinstance(value, (int, float, bool, str)): rec.attrs[f'solver_options-{key}'] = value elif self.solver_name == 'ipopt': # ipopt-specific for key, value in self.nlp_options['ipopt'].items(): if isinstance(value, (int, float, bool, str)): rec.attrs[f'solver_options-{key}'] = value elif self.solver_name.startswith('convex_qpsolvers'): # convex_qpsolvers-specific for key, value in self.options['solver_options'].items(): if isinstance(value, (int, float, bool, str)): rec.attrs[f'solver_options-{key}'] = value else: # for inbuilt solvers opts = self.options.get_pure_dict() for key, value in opts.items(): value = 'None' if value is None else value if isinstance(value, (int, float, bool, str)): rec.attrs[f'options-{key}'] = value rec.attrs['readable_outputs'] = d_outs rec.attrs['recording'] = str(self.options['recording']) rec.attrs['hot_start_from'] = str(self.options['hot_start_from']) rec.attrs['visualize'] = self.options['visualize'] rec.attrs['timestamp'] = self.timestamp rec.attrs['constrained'] = constrained rec.attrs['nx'] = self.problem.nx rec.attrs['nc'] = self.problem.nc rec.attrs['x0'] = self.problem.x0 / self.problem.x_scaler rec.attrs['x_scaler'] = self.problem.x_scaler rec.attrs['o_scaler'] = self.problem.o_scaler # Only for single-objective problems rec.attrs['x_lower'] = self.problem.x_lower / self.problem.x_scaler rec.attrs['x_upper'] = self.problem.x_upper / self.problem.x_scaler rec.attrs['c_scaler'] = self.problem.c_scaler if constrained else 'None' rec.attrs['c_lower'] = self.problem.c_lower / self.problem.c_scaler if constrained else 'None' rec.attrs['c_upper'] = self.problem.c_upper / self.problem.c_scaler if constrained else 'None' def setup_hot_start(self): ''' Open the hot-start record file, compute the number of callbacks found in it, and pass both to the problem object. ''' self.hot_start_record = h5py.File(self.options['hot_start_from'], 'r') num_callbacks_found = len([key for key in list(self.hot_start_record.keys()) if key.startswith('callback_')]) self.problem._hot_start_mode = True self.problem._hot_start_record = self.hot_start_record self.problem._num_callbacks_found = num_callbacks_found self.problem._hot_start_tol = (self.options['hot_start_rtol'], self.options['hot_start_atol']) def setup_visualization(self,): ''' Setup the visualization for scalar variables of the optimization problem. Variables can be either optimizer outputs or callback inputs/outputs. ''' visualize_vars = [] available_vars = sorted(list(set(list(self.available_outputs.keys()) + self.active_callbacks + ['x']))) for s_var in self.options['visualize']: # scalar variables var = s_var.split('[')[0] if var not in available_vars: raise ValueError(f"Unavailable variable '{var}' is declared for visualization. " \ f"Available variables for visualization are {available_vars}.") if var in self.scalar_outputs + ['obj', 'lag']: if var != s_var: raise ValueError(f"Scalar variable '{var}' is indexed for visualization.") else: if var == s_var: raise ValueError(f"Non-scalar variable '{var}' is not indexed for visualization. " \ f"Provide an index to a scalar entry in '{var}' for visualization.") if var in self.available_outputs.keys(): visualize_vars.append(s_var) if var in self.active_callbacks + ['x']: visualize_vars.append('callback_' + s_var) visualize_callbacks = True # # No need to visualize callbacks if all variables are optimizer outputs # if all(s_var.split('[')[0] in self.available_outputs.keys() for s_var in self.options['visualize']): # visualize_callbacks = False self.visualizer = Visualizer(self.problem_name, visualize_vars, self.out_dir) if visualize_callbacks: self.problem._visualizer = self.visualizer
[docs] @abstractmethod def initialize(self): ''' Set solver name and any solver-specific options. ''' # raise NotImplementedError("Subclasses must implement this method.") pass
[docs] @abstractmethod def setup(self): ''' Set up any solver-specific attributes or modules (e.g., merit function or Hessian approximation) required by the optimization algorithm, during optimizer instantiation. This method is called after initialize(). ''' # raise NotImplementedError("Subclasses must implement this method.") pass
[docs] @abstractmethod # Ensure that the subclasses cannot be instantiated unless the method is implemented def solve(self): ''' Run the optimization algorithm to solve the problem. ''' # raise NotImplementedError("Subclasses must implement this method.") pass
def run_post_processing(self): ''' Run the post-processing functions of the optimizer. 1. Write the print_results() output to the the results.out file 2. Write self.results to the record file 3. Save and close the visualization plot ''' self.results['total_callbacks'] = self.problem._callback_count self.results['obj_evals'] = self.problem._obj_count self.results['grad_evals'] = self.problem._grad_count self.results['hess_evals'] = self.problem._hess_count self.results['con_evals'] = self.problem._con_count self.results['jac_evals'] = self.problem._jac_count self.results['reused_callbacks'] = self.problem._reused_callback_count if self.options['turn_off_outputs']: return if self.options['visualize'] != []: if self.options['keep_viz_open']: self.visualizer.keep_plot() # vis_wait = self.visualizer.wait_time else: self.visualizer.close_plot() self.results['vis_time'] = self.visualizer.vis_time self.results['out_dir'] = self.out_dir with open(f"{self.out_dir}/modopt_results.out", 'w') as f: with contextlib.redirect_stdout(f): self.print_results(all=True) if self.options['recording']: group = self.record.create_group('results') for key, value in self.results.items(): if self.solver_name.startswith('convex_qpsolvers') and key in ['problem', 'extras']: continue if isinstance(value, dict): for k, v in value.items(): group[f"{key}-{k}"] = v else: group[key] = value def update_outputs(self, **kwargs): ''' Update and write the outputs of the optimization problem to the corresponding files. Three different types of outputs are written: 1. Summary table: Contains the scalar outputs of the optimization problem 2. Readable outputs: Contains the declared readable outputs 3. Recorder outputs: Contains all the outputs of the optimization problem, if recording is enabled ''' self.out_dict = out_dict = copy.deepcopy(kwargs) if self.options['turn_off_outputs']: return if self.options['visualize'] != []: self.visualizer.update_plot(out_dict) dir = self.out_dir a_outs = self.available_outputs # Available outputs dictionary d_outs = self.options['readable_outputs'] # Declared outputs list if set(kwargs.keys()) != set(a_outs): raise ValueError(f'Output(s) passed in to be updated {list(kwargs.keys())} ' \ f'do not match the available outputs {list(a_outs.keys())}.') # 1. Write the scalar outputs to the summary file if len(self.scalar_outputs) > 0: # Print summary_table row new_row ='\n' + "%10i " % self.update_outputs_count for key in self.scalar_outputs: if a_outs[key] in (int, np.int_, np.int32, np.int64): new_row += "%10i " % kwargs[key] elif a_outs[key] in (float, np.float32, np.float64): new_row += "%16.6E " % kwargs[key] with open(f"{dir}/modopt_summary.out", 'a') as f: f.write(new_row) # 2. Write the declared readable outputs to the corresponding files for key in d_outs: value = kwargs[key] if key in self.scalar_outputs: if np.isscalar(value) and np.isreal(value): with open(f"{dir}/{key}.out", 'a') as f: np.savetxt(f, [value]) else: raise ValueError(f'Value of "{key}" is not a real-valued scalar.') else: # Multidim. arrays will be flattened (c-major/row major) before writing to a file with open(f"{dir}/{key}.out", 'a') as f: np.savetxt(f, value.reshape(1, value.size)) # 3. Write the outputs to the recording files if self.options['recording']: group_name = 'iteration_' + str(self.update_outputs_count) group = self.record.create_group(group_name) for var, value in out_dict.items(): group[var] = value self.update_outputs_count += 1 def check_if_callbacks_are_declared(self, cb, cb_str, solver_str): if cb not in self.problem.user_defined_callbacks: if isinstance(self.problem, Problem): raise ValueError(f"{cb_str} function is not declared in the Problem() subclass but is needed for {solver_str}.") elif isinstance(self.problem, ProblemLite): warnings.warn(f"{cb_str} function is not provided in the ProblemLite() container but is needed for {solver_str}. "\ f"The optimizer will use finite differences to compute the {cb_str}.")
[docs] def print_results(self, summary_table=False, all=False): ''' Print the results of the optimization problem to the terminal. Parameters ---------- summary_table : bool, default=False If ``True``, print the summary table for the optimization. all : bool, default=False If ``False``, print only the scalar outputs of the optimization problem. Otherwise, print all the outputs of the optimization problem. ''' # TODO: Testing to verify the design variable data # print( # np.loadtxt(self.problem_name + '_outputs/x.out')) output = "\n\tSolution from modOpt:" output += "\n\t"+"-" * 100 output += f"\n\t{'Problem':25}: {self.problem_name}" output += f"\n\t{'Solver':25}: {self.solver_name}" for key, value in self.results.items(): if np.isscalar(value): output += f"\n\t{key:25}: {value}" elif all: output += f"\n\t{key:25}: {value}" output += '\n\t' + '-'*100 print(output) if summary_table: with open(f"{self.out_dir}/modopt_summary.out", 'r') as f: # lines = f.readlines() lines = f.read().splitlines() title = "modOpt summary table:" line_length = max(len(lines[0]), len(title)) # Print header output = "\n" + "=" * line_length output += f"\n{title.center(line_length)}" output += "\n" + "=" * line_length # Print all iterations output += "\n" + "\n".join(lines) output += "\n" + "=" * line_length print(output)
def print_callback_counts(self): print('\n') print(f"{'Problem':20}: {self.problem_name}") print('-'*100) print(f"{'total_callbacks':20}: {self.problem._callback_count}") print(f"{'reused_callbacks':20}: {self.problem._reused_callback_count}") for key in ['obj', 'grad', 'hess', 'con', 'jac']: count = getattr(self.problem, f"_{key}_count") name = f"{key}_callbacks" print(f"{name:20}: {count}") print('-'*100) def get_callback_counts_string(self, length): output = f"\n\t{'Total callbacks':{length}}: {self.problem._callback_count}" output += f"\n\t{'Reused callbacks':{length}}: {self.problem._reused_callback_count}" for key in ['obj', 'grad', 'hess', 'con', 'jac']: count = getattr(self.problem, f"_{key}_count") name = f"{key} callbacks" output += f"\n\t{name:{length}}: {count}" return output
[docs] def check_first_derivatives(self, x=None, step=1e-6, formulation='rs'): ''' Check the first derivatives of the optimization problem using finite differences. Parameters ---------- x : np.ndarray, optional The design variables at which the derivatives are to be checked. If not provided, the initial design variables are used. step : float, default=1e-6 The step size for the finite differences. ''' obj = self.obj grad = self.grad if x is None: x = self.problem.x0 if self.problem.ny == 0: nx = self.problem.nx nc = self.problem.nc ############################### # Only for the SURF algorithm # ############################### else: nx = self.problem.nx + self.problem.ny nc = self.problem.nc + self.problem.nr ############################### constrained = False if nc != 0: constrained = True con = self.problem._compute_constraints jac = self.problem._compute_constraint_jacobian ############################### # Only for the SURF algorithm # ############################### if formulation in ('cfs', 'surf'): print("INSIDE cfs or surf") y = self.problem.solve_residual_equations( x[:self.problem.nx]) x[self.problem.nx:] = y self.problem.formulation = 'fs' ############################### grad_exact = grad(x) if constrained: jac_exact = jac(x) h = step grad_fd = np.full((nx, ), -obj(x), dtype=float) if constrained: jac_fd = np.outer(-con(x), np.ones((nx, ), dtype=float)) for i in range(nx): e = h * np.identity(nx)[i] grad_fd[i] += obj(x + e) if constrained: jac_fd[:, i] += con(x + e) grad_fd /= h if constrained: jac_fd /= h EPSILON = 1e-10 # print('grad_exact:', grad_exact) # print('grad_fd:', grad_fd) # print('jac_exact:', jac_exact) # print('jac_fd:', jac_fd) grad_abs_error = np.absolute(grad_fd - grad_exact) # FD is assumed to give the actual gradient grad_rel_error = grad_abs_error / (np.linalg.norm(grad_fd, 2) + EPSILON) # grad_rel_error = grad_abs_error / (np.absolute(grad_fd) + EPSILON) if constrained: jac_abs_error = np.absolute(jac_fd - jac_exact) jac_rel_error = jac_abs_error / (np.linalg.norm(jac_fd, 'fro') + EPSILON) # jac_rel_error = jac_abs_error / (np.absolute(jac_fd) + EPSILON) # jac_rel_error = jac_abs_error / (np.absolute(jac_exact.toarray()) + EPSILON) # out_buffer = StringIO() header = "{0} | {1} | {2} | {3} | {4} "\ .format( pad_name('Derivative type', 8, quotes=False), pad_name('Calc norm', 10), pad_name('FD norm', 10), pad_name('Abs error norm', 10), pad_name('Rel error norm', 10), ) # out_buffer.write('\n' + header + '\n') print('\n' + '-' * len(header)) print(header) # out_buffer.write('-' * len(header) + '\n' + '\n') print('-' * len(header) + '\n') deriv_line = "{0} | {1:.4e} | {2:.4e} | {3:.4e} | {4:.4e} " grad_line = deriv_line.format( pad_name('Gradient', 15, quotes=False), np.linalg.norm(grad_exact), np.linalg.norm(grad_fd), np.linalg.norm(grad_abs_error), np.linalg.norm(grad_rel_error), ) # out_buffer.write(grad_line + '\n') print(grad_line) if constrained: if isinstance(jac_exact, np.ndarray): jac_exact_norm = np.linalg.norm(jac_exact) else: jac_exact_norm = sp.sparse.linalg.norm(jac_exact) jac_line = deriv_line.format( pad_name('Jacobian', 15, quotes=False), jac_exact_norm, # np.linalg.norm(jac_exact), # sp.sparse.linalg.norm(jac_exact), np.linalg.norm(jac_fd), np.linalg.norm(jac_abs_error), np.linalg.norm(jac_rel_error), ) # out_buffer.write(jac_line + '\n') print(jac_line) print('-' * len(header) + '\n')