import numpy as np
import time
import warnings
from typing import Dict
from modopt import Optimizer
from modopt.line_search_algorithms import Minpack2LS
from modopt.merit_functions import AugmentedLagrangian
from modopt.approximate_hessians import BFGSScipy
from modopt import CSDLAlphaProblem
import scipy
# from packaging.version import Version
from scipy._lib._pep440 import Version
[docs]class OpenSQP(Optimizer):
"""
A reconfigurable open-source Sequential Quadratic Programming (SQP) optimizer
developed in modOpt for constrained nonlinear optimization.
Parameters
----------
problem : Problem or ProblemLite
Object containing the problem to be solved.
recording : bool, default=False
If ``True``, record all outputs from the optimization.
This needs to be enabled for hot-starting the same problem later,
if the optimization is interrupted.
out_dir : str, optional
The directory to store all the output files generated from the optimization.
hot_start_from : str, optional
The record file from which to hot-start the optimization.
hot_start_atol : float, default=0.
The absolute tolerance check for the inputs
when reusing outputs from the hot-start record.
hot_start_rtol : float, default=0.
The relative tolerance check for the inputs
when reusing outputs from the hot-start record.
visualize : list, default=[]
The list of scalar variables to visualize during the optimization.
keep_viz_open : bool, default=False
If ``True``, keeps the visualization window open after the optimization is complete.
turn_off_outputs : bool, default=False
If ``True``, prevent modOpt from generating any output files.
verbosity : {0, 1}, default=0
If ``0``, suppresses all console outputs.
If ``1``, prints various convergence measures to monitor optimization progress
and diagnostic messages for debugging purposes.
maxiter : int, default=1000
Maximum number of major iterations.
opt_tol : float, default=1e-7
Optimality tolerance.
feas_tol : float, default=1e-7
Feasibility tolerance.
Certifies convergence when the "scaled" maximum constraint violation
is less than this value.
aqp_primal_feas_tol : float, default=1e-8
Tolerance for the primal feasibility of the augmented QP subproblem.
aqp_dual_feas_tol : float, default=1e-8
Tolerance for the dual feasibility of the augmented QP subproblem.
aqp_time_limit : float, default=5.0
Time limit for the augmented QP solution in seconds.
ls_min_step : float, default=1e-12
Minimum step size for the line search.
ls_max_step : float, default=1.
Maximum step size for the line search.
ls_maxiter : int, default=10
Maximum number of iterations for the line search.
ls_eta_a : float, default=1e-4
Armijo (sufficient decrease condition) parameter for the line search.
ls_eta_w : float, default=0.9
Wolfe (curvature condition) parameter for the line search.
ls_alpha_tol : float, default=1e-14
Relative tolerance for an acceptable step in the line search.
bfgs_init_lag_hess : np.ndarray, default=None
Initial symmetric and positive definite approximation of
the Lagrangian Hessian of shape (nx,nx) for the BFGS update.
Available only with SciPy >= 1.14.0.
init_multipliers : dict, default=None
Initial Lagrange multiplier values for the bounds and constraints.
Should be a dict with keys 'cl', 'cu', 'xl', 'xu' corresponding to
the lower and upper bounds of the constraints and variables, and
values being arrays of the same length as the number of constraints/variables.
readable_outputs : list, default=[]
List of outputs to be written to readable text output files.
Available outputs are: 'major', 'obj', 'x', 'lag_mult', 'slacks', 'constraints', 'opt',
'feas', 'sum_viol', 'max_viol', 'time', 'nfev', 'ngev', 'step', 'rho',
'merit', 'elastic', 'gamma', 'low_curvature'.
"""
# qp_tol : float, default=1e-4
# Tolerance for the QP subproblem.
# qp_maxiter : int, default=5000
# Maximum number of iterations for the QP subproblem.
def initialize(self):
self.solver_name = 'opensqp'
self.nx = self.problem.nx
self.obj = self.problem._compute_objective
self.grad = self.problem._compute_objective_gradient
self.active_callbacks = ['obj', 'grad']
if self.problem.constrained:
self.con_in = self.problem._compute_constraints
self.jac_in = self.problem._compute_constraint_jacobian
self.active_callbacks += ['con', 'jac']
##################################################################
# TEMPORARY: for CSDLAlphaProblem
if type(self.problem) is CSDLAlphaProblem:
self.obj = lambda x: self.problem._compute_objective(x, check_failure=True)
self.grad = lambda x: self.problem._compute_objective_gradient(x, check_failure=True)
if self.problem.constrained:
self.con_in = lambda x: self.problem._compute_constraints(x, check_failure=True)
self.jac_in = lambda x: self.problem._compute_constraint_jacobian(x, check_failure=True)
##################################################################
self.options.declare('verbosity', default=0, values=[0, 1])
self.options.declare('maxiter', default=1000, types=int)
self.options.declare('opt_tol', default=1e-7, types=float)
self.options.declare('feas_tol', default=1e-7, types=float)
self.options.declare('readable_outputs', types=list, default=[])
self.options.declare('aqp_primal_feas_tol', default=1e-8, types=float)
self.options.declare('aqp_dual_feas_tol', default=1e-8, types=float)
self.options.declare('aqp_time_limit', default=5.0, types=float)
# self.options.declare('qp_maxiter', default=5000, types=int)
self.options.declare('ls_min_step', default=1e-12, types=float)
self.options.declare('ls_max_step', default=1.0, types=float)
self.options.declare('ls_maxiter', default=10, types=int)
self.options.declare('ls_eta_a', default=1e-4, types=float)
self.options.declare('ls_eta_w', default=0.95, types=float)
self.options.declare('ls_alpha_tol', default=1e-14, types=float)
self.options.declare('init_multipliers', default=None, types=(type(None), Dict))
if Version(scipy.__version__) >= Version("1.14.0"):
self.options.declare('bfgs_init_lag_hess', default=None, types=(type(None), np.ndarray))
else:
warnings.warn("SciPy version is less than 1.14.0, 'bfgs_init_lag_hess' option is not available.")
self.available_outputs = {
'major': int,
'obj': float,
# for arrays from each iteration, shapes need to be declared
'x': (float, (self.problem.nx, )),
# Note that the number of constraints will
# be updated after constraints are setup
'lag_mult': (float, (self.problem.nc, )),
'slacks': (float, (self.problem.nc, )),
'constraints': (float, (self.problem.nc, )),
'opt': float,
'feas': float,
'sum_viol': float,
'max_viol': float,
'time': float,
'nfev': int,
'ngev': int,
'step': float,
'rho': float,
'merit': float,
'elastic': int,
'gamma': float,
'low_curvature': int,
}
def setup(self):
self.setup_constraints()
self.setup_initial_multipliers()
nx = self.nx
nc = self.nc
nc_e = self.nc_e
self.available_outputs['lag_mult'] = (float, (nc, ))
self.available_outputs['slacks'] = (float, (nc, ))
self.available_outputs['constraints'] = (float, (nc, ))
# Damping parameters
self.delta_rho = 1.
self.num_rho_changes = 0
self.num_rho_increases = 0
self.num_rho_decreases = 0
self.successive_undefined_iterations = 0
init_scale = 1.0
if Version(scipy.__version__) >= Version("1.14.0") and self.options['bfgs_init_lag_hess'] is not None:
init_scale = self.options['bfgs_init_lag_hess'] * 1.0
self.QN = BFGSScipy(nx=nx,
exception_strategy='damp_update',
init_scale=init_scale)
self.MF = AugmentedLagrangian(nx=nx,
nc=nc,
nc_e=nc_e,
f=self.obj,
c=self.con,
g=self.grad,
j=self.jac,
non_bound_indices=self.non_bound_indices)
self.LSS = Minpack2LS(f=self.MF.compute_function,
g=self.MF.compute_gradient,
min_step=self.options['ls_min_step'],
max_step=self.options['ls_max_step'],
maxiter=self.options['ls_maxiter'],
eta_a=self.options['ls_eta_a'],
eta_w=self.options['ls_eta_w'],
alpha_tol=self.options['ls_alpha_tol'],
)
# Adapt constraints to C_e(x)=0, and C_i(x) >= 0
def setup_constraints(self, ):
xl = self.problem.x_lower
xu = self.problem.x_upper
ebi = self.eq_bound_indices = np.array([])
lbi = self.lower_bound_indices = np.where((xl != -np.inf))[0]
ubi = self.upper_bound_indices = np.where((xu != np.inf))[0]
self.eq_bounded = True if len(ebi) > 0 else False
self.lower_bounded = True if len(lbi) > 0 else False
self.upper_bounded = True if len(ubi) > 0 else False
if self.problem.constrained:
cl = self.problem.c_lower
cu = self.problem.c_upper
eci = self.eq_constraint_indices = np.where(cl == cu)[0]
lci = self.lower_constraint_indices = np.where((cl != -np.inf) & (cl != cu))[0]
uci = self.upper_constraint_indices = np.where((cu != np.inf) & (cl != cu))[0]
else:
eci = np.array([])
lci = np.array([])
uci = np.array([])
self.eq_constrained = True if len(eci) > 0 else False
self.lower_constrained = True if len(lci) > 0 else False
self.upper_constrained = True if len(uci) > 0 else False
self.nc_e = len(ebi) + len(eci)
self.nc_b = len(lbi) + len(ubi)
self.nc_i = len(lbi) + len(ubi) + len(lci) + len(uci)
self.nc = self.nc_e + self.nc_i
self.non_bound_indices = np.concatenate([np.arange(self.nc_e), np.arange(self.nc_e+self.nc_b, self.nc)])
def setup_initial_multipliers(self, ):
pi_ = self.options['init_multipliers']
if pi_ is None:
self.pi_0 = None
return
if self.nc_b > 0 and not {'xl', 'xu'}.issubset(pi_.keys()):
raise ValueError("init_multipliers must contain keys 'xl'and 'xu' for problems with variable bounds. \
If unsure about some of the multipliers, set them as zeros.")
lbi = self.lower_bound_indices
ubi = self.upper_bound_indices
if self.problem.constrained:
if not {'cl', 'cu'}.issubset(pi_.keys()):
raise ValueError("init_multipliers must contain keys 'cl' and 'cu' for problems with constraints. \
If unsure about some of the multipliers, set them as zeros.")
eci = self.eq_constraint_indices
lci = self.lower_constraint_indices
uci = self.upper_constraint_indices
pi_0 = np.array([])
if self.eq_constrained:
pi_0 = np.append(pi_0, pi_['cl'][eci])
if self.lower_bounded:
pi_0 = np.append(pi_0, pi_['xl'][lbi])
if self.upper_bounded:
pi_0 = np.append(pi_0, pi_['xu'][ubi])
if self.lower_constrained:
pi_0 = np.append(pi_0, pi_['cl'][lci])
if self.upper_constrained:
pi_0 = np.append(pi_0, pi_['cu'][uci])
self.pi_0 = pi_0
def con(self, x):
ebi = self.eq_bound_indices
lbi = self.lower_bound_indices
ubi = self.upper_bound_indices
if self.problem.constrained:
eci = self.eq_constraint_indices
lci = self.lower_constraint_indices
uci = self.upper_constraint_indices
# Compute problem constraints
c_in = self.con_in(x)
c_out = np.array([])
if self.eq_bounded:
c_out = np.append(c_out, x[ebi] - self.problem.x_lower[ebi])
if self.eq_constrained:
c_out = np.append(c_out, c_in[eci] - self.problem.c_lower[eci])
if self.lower_bounded:
c_out = np.append(c_out, x[lbi] - self.problem.x_lower[lbi])
if self.upper_bounded:
c_out = np.append(c_out, self.problem.x_upper[ubi] - x[ubi])
if self.lower_constrained:
c_out = np.append(c_out, c_in[lci] - self.problem.c_lower[lci])
if self.upper_constrained:
c_out = np.append(c_out, self.problem.c_upper[uci] - c_in[uci])
return c_out
def jac(self, x):
nx = self.nx
# ebi = self.eq_bound_indices
lbi = self.lower_bound_indices
ubi = self.upper_bound_indices
if self.problem.constrained:
eci = self.eq_constraint_indices
lci = self.lower_constraint_indices
uci = self.upper_constraint_indices
# Compute problem constraint Jacobian
j_in = self.jac_in(x)
j_out = np.empty((0, nx), dtype=float)
# if self.eq_bounded:
# j_out = np.append(j_out, np.identity(nx)[ebi], axis=0)
if self.eq_constrained:
j_out = np.append(j_out, j_in[eci], axis=0)
if self.lower_bounded:
j_out = np.append(j_out, np.identity(nx)[lbi], axis=0)
if self.upper_bounded:
j_out = np.append(j_out, -np.identity(nx)[ubi], axis=0)
if self.lower_constrained:
j_out = np.append(j_out, j_in[lci], axis=0)
if self.upper_constrained:
j_out = np.append(j_out, -j_in[uci], axis=0)
return j_out
# Minimize A.L. w.r.t. slacks s.t. s >= 0
def reset_slacks(self, c, pi, rho):
rho_zero_indices = np.where(rho == 0)[0]
rho_nonzero_indices = np.where(rho > 0)[0]
s = np.zeros((len(rho), ))
if len(rho_zero_indices) > 0:
# TODO: test over a range of problems to see which one is better
s[rho_zero_indices] = 0.
# s[rho_zero_indices] = np.maximum(0, c[rho_zero_indices])
if len(rho_nonzero_indices) > 0:
c_rnz = c[rho_nonzero_indices]
pi_rnz = pi[rho_nonzero_indices]
rho_rnz = rho[rho_nonzero_indices]
s[rho_nonzero_indices] = np.maximum(0, (c_rnz - pi_rnz / rho_rnz))
return s
def update_scalar_rho(self, rho_k, dir_deriv_al, pTHp, p_pi, c_k, s_k):
# Damping for scalar rho as in SNOPT
nc = self.nc
delta_rho = self.delta_rho
nbi = self.non_bound_indices
rho_ref = rho_k[0] * 1.
# note: scalar rho_min here
if np.linalg.norm((c_k - s_k)) == 0:
rho_min = 0. # See lemma 4.3 in the paper
else:
rho_min = 2 * np.linalg.norm(p_pi) / np.linalg.norm((c_k - s_k))
rho_computed = rho_min * 1.0
# Damping for rho (Note: vector rho is still not updated)
if rho_k[0] < 4 * (rho_computed + delta_rho):
rho_damped = rho_k[0]
else:
rho_damped = np.sqrt(rho_k[0] * (rho_computed + delta_rho))
rho_new = max(rho_computed, rho_damped)
rho_k[:] = rho_new
# Increasing rho
if rho_k[0] > rho_ref:
if self.num_rho_changes >= 0:
self.num_rho_changes += 1
else:
self.delta_rho *= 2.
self.num_rho_changes = 0
# Decreasing rho
elif rho_k[0] < rho_ref:
if self.num_rho_changes <= 0:
self.num_rho_changes -= 1
else:
self.delta_rho *= 2.
self.num_rho_changes = 0
return rho_k
def update_vector_rho(self, rho_k, dir_deriv_al, pTHp, p_pi, c_k, s_k):
delta_rho = self.delta_rho
num_rho_changes = self.num_rho_changes
nbi = self.non_bound_indices
rho_ref = np.linalg.norm(rho_k)
a = (c_k - s_k) ** 2
b = 0.5 * pTHp + dir_deriv_al + np.inner(a, rho_k)
if b > 0:
a_norm = np.linalg.norm(a)
if a_norm > 0: # Just to be extra cautious (elastic iterations)
rho_computed = (b / (a_norm ** 2)) * a
else:
rho_computed = np.zeros((len(rho_k), ))
else:
rho_computed = np.zeros((len(rho_k), ))
# Damping for rho (Note: vector rho is still not updated)
rho_damped = np.where(
rho_k < 4 * (rho_computed + delta_rho), rho_k,
np.sqrt(rho_k * (rho_computed + delta_rho)))
rho_new = np.maximum(rho_computed, rho_damped)
rho_k[:] = rho_new
rho_norm = np.linalg.norm(rho_k)
# Increasing rho
if rho_norm > rho_ref:
if self.num_rho_decreases >= 2:
self.delta_rho *= 2.
self.num_rho_increases += 1
self.num_rho_decreases = 0
# Decreasing rho
elif rho_norm < rho_ref:
if self.num_rho_increases >= 2:
self.delta_rho *= 2.
self.num_rho_decreases += 1
self.num_rho_increases = 0
# Constant rho
else:
self.num_rho_increases = 0
self.num_rho_decreases = 0
return rho_k
def l1_penalty_line_search(self, x_k, eta, x_qp, p_k, rho_k_Powell, f_k, c_k, g_k):
nc_e = self.nc_e
nx = self.nx
new_f_evals = 0
converged = False
mu = rho_k_Powell[self.non_bound_indices]
c_viol = c_k[self.non_bound_indices]
c_viol[:nc_e] = np.abs(c_viol[:nc_e])
c_viol[nc_e:] = np.maximum(0, -c_viol[nc_e:])
# Penalty merit function value at alpha = alpha
mf0 = f_k + np.dot(mu, c_viol)
exred = g_k.T @ x_qp - np.dot(mu, c_viol) * (1-eta)
ls_iter = 0
step = 1.0
alpha = 1.0
alpha_min = 1e-1
d = p_k[:nx] * 1.0
while True:
ls_iter += 1
exred = alpha*exred # expected reduction (<0) in the merit function
# Note that d needs to recursively scaled down: d != alpha * x_qp
d = alpha * d
x = x_k + d
self.MF.update_functions_in_cache(['f', 'c'], x)
f = self.MF.cache['f'][1]
c = self.MF.cache['c'][1]
new_f_evals += 1
c_viol = c[self.non_bound_indices]
c_viol[:nc_e] = np.abs(c_viol[:nc_e])
c_viol[nc_e:] = np.maximum(0, -c_viol[nc_e:])
# Penalty merit function value at alpha = alpha
mf = f + np.dot(mu, c_viol)
# Actual reduction in the merit function
acred = mf - mf0
# Break out of line search if the change (-ve) in the merit function
# is at least one-tenth of the expected change exred (always negative)
# i.e., the obtained change in the merit function must be more negative
# than one-tenth of the expected reduction
if (acred<=exred/10.0 or ls_iter > 10):
new_g_evals = 0
alpha = step * 1.0
if acred <= exred:
converged = True
break
# Otherwise,
alpha = np.maximum(exred/(2*(exred-acred)), alpha_min)
step *= alpha
return new_f_evals, converged, step
def opt_check(self, pi, c, g, j):
opt_tol = self.options['opt_tol']
if self.nc == 0:
opt_tol_factor = 1.
nonneg_violation = 0.
compl_violation = 0.
stat_violation = np.linalg.norm(g, np.inf)
else:
opt_tol_factor = (1 + np.linalg.norm(pi, np.inf))
if pi[self.nc_e:].size == 0:
nonneg_violation = 0.
else:
nonneg_violation = max(0., -np.min(pi[self.nc_e:]))
# Note: compl_violation can be negative(Question on SNOPT);
# Other 2 violations are always nonnegative
compl_violation = np.max(np.abs(c * pi))
stat_violation = np.linalg.norm(g - j.T @ pi, np.inf)
scaled_opt_tol = opt_tol * opt_tol_factor
opt_check1 = (nonneg_violation <= scaled_opt_tol)
opt_check2 = (compl_violation <= scaled_opt_tol)
opt_check3 = (stat_violation <= scaled_opt_tol)
# opt is always nonnegative
if self.options['verbosity'] == 1:
print('opt_tol_factor:', opt_tol_factor)
print('nonneg_violation:', nonneg_violation)
print('compl_violation:', compl_violation)
print('stat_violation:', stat_violation)
opt = max(nonneg_violation, compl_violation,
stat_violation) / opt_tol_factor
opt_satisfied = (opt_check1 and opt_check2 and opt_check3)
return opt_satisfied, opt
def feas_check(self, x, c):
feas_tol = self.options['feas_tol']
feas_tol_factor = (1 + np.linalg.norm(x, np.inf))
scaled_feas_tol = feas_tol * feas_tol_factor
# Violation is positive
max_con_violation_eq = np.max(np.abs(c[:self.nc_e])) if self.nc_e > 0 else 0.
max_con_violation_ineq = max(0., -np.min(c[self.nc_e:])) if self.nc_i > 0 else 0.
max_con_violation = max(max_con_violation_eq, max_con_violation_ineq)
feas_check = (max_con_violation <= scaled_feas_tol)
sum_con_violation_eq = np.sum(np.abs(c[:self.nc_e]))
sum_con_violation_ineq = np.sum(np.maximum(0., -c[self.nc_e:]))
sum_con_violation = sum_con_violation_eq + sum_con_violation_ineq
feas = max_con_violation / feas_tol_factor
feas_satisfied = feas_check
return feas_satisfied, feas, sum_con_violation, max_con_violation
def exit_early(self, x_k, f_k, c_k, pi_k, opt, feas, nfev, ngev, niter, time, success, exit_msg):
if self.options['verbosity'] == 1:
print(f'{exit_msg} Exiting ...')
self.results = {'x': x_k,
'objective': f_k,
'c': c_k,
'pi': pi_k,
'optimality': opt,
'feasibility': feas,
'nfev': nfev,
'ngev': ngev,
'niter': niter,
'time': time,
'success': success,
'exit_msg': exit_msg}
return self.results
[docs] def solve(self):
try:
import qpsolvers
except ImportError:
raise ImportError("qpsolvers cannot be imported for the QP solver. Install it with 'pip install qpsolvers'.")
try:
from quadprog import solve_qp
except ImportError:
raise ImportError("quadprog cannot be imported for the QP solver. Install it with 'pip install quadprog'.")
try:
import highspy
except ImportError:
raise ImportError("HiGHS cannot be imported for the QP solver. Install it with 'pip install highspy'.")
# Assign shorter names to variables and methods
nx = self.nx
nc = self.nc
nc_e = self.nc_e
nc_i = self.nc_i
x0 = self.problem.x0
verbosity = self.options['verbosity']
maxiter = self.options['maxiter']
aqp_primal_tol = self.options['aqp_primal_feas_tol']
aqp_dual_tol = self.options['aqp_dual_feas_tol']
aqp_time_limit = self.options['aqp_time_limit']
# qp_maxiter = self.options['qp_maxiter']
LSS = self.LSS
QN = self.QN
MF = self.MF
eps = 2.22e-16
start_time = time.time()
# Proximal point initialization for a feasible start wrt bounds
x_k = np.clip(x0, self.problem.x_lower, self.problem.x_upper)
self.MF.update_functions_in_cache(['f', 'c', 'g', 'j'], x_k)
f_k = self.MF.cache['f'][1]
g_k = self.MF.cache['g'][1]
c_k = self.MF.cache['c'][1]
J_k = self.MF.cache['j'][1]
undefined_proximal_point = False
if np.isnan(f_k) or np.isinf(f_k):
warnings.warn('Objective value at the computed proximal initial point is NaN or Inf. Trying given initial point ...')
undefined_proximal_point = True
elif np.any(np.isnan(c_k)) or np.any(np.isinf(c_k)):
warnings.warn('Constraint values at the computed proximal initial point contains NaN or Inf. Trying given initial point ...')
undefined_proximal_point = True
elif np.any(np.isnan(g_k)) or np.any(np.isinf(g_k)):
warnings.warn('Objective gradient at the computed proximal initial point contains NaN or Inf. Trying given initial point ...')
undefined_proximal_point = True
elif np.any(np.isnan(J_k)) or np.any(np.isinf(J_k)):
warnings.warn('Constraint Jacobian at the computed proximal initial point contains NaN or Inf. Trying given initial point ...')
undefined_proximal_point = True
elif verbosity == 1:
print('Proximal point initialization is well-defined and good to go.')
if undefined_proximal_point:
if np.all(x0 == x_k):
exit_msg = 'Initial point provided and proximal point computed were the same and is undefined.'
return self.exit_early(x_k, f_k, c_k, None, None, None, 1, 1, 0, time.time() - start_time, False, exit_msg)
x_k = x0 * 1.
self.MF.update_functions_in_cache(['f', 'c', 'g', 'j'], x_k)
f_k = self.MF.cache['f'][1]
g_k = self.MF.cache['g'][1]
c_k = self.MF.cache['c'][1]
J_k = self.MF.cache['j'][1]
if np.isnan(f_k) or np.isinf(f_k):
exit_msg = 'Objective value at given initial point and computed proximal point is NaN or Inf.'
return self.exit_early(x_k, f_k, c_k, None, None, None, 2, 2, 0, time.time() - start_time, False, exit_msg)
elif np.any(np.isnan(c_k)) or np.any(np.isinf(c_k)):
exit_msg = 'Constraint values at given initial point and computed proximal point contains NaN or Inf.'
return self.exit_early(x_k, f_k, c_k, None, None, None, 2, 2, 0, time.time() - start_time, False, exit_msg)
elif np.any(np.isnan(g_k)) or np.any(np.isinf(g_k)):
exit_msg = 'Gradient at given initial point and computed proximal point contains NaN or Inf.'
return self.exit_early(x_k, f_k, c_k, None, None, None, 2, 2, 0, time.time() - start_time, False, exit_msg)
elif np.any(np.isnan(J_k)) or np.any(np.isinf(J_k)):
exit_msg = 'Constraint Jacobian at given initial point and computed proximal point contains NaN or Inf.'
return self.exit_early(x_k, f_k, c_k, None, None, None, 2, 2, 0, time.time() - start_time, False, exit_msg)
# Set initial values for multipliers and slacks
# pi_k = np.full((nc, ), 1.)
pi_k = self.pi_0 * 1. if self.pi_0 is not None else np.full((nc, ), 0.)
nfev = 1
ngev = 1
# Vector of penalty parameters
rho_k = np.full((nc, ), 0.)
rho_k_Powell = np.full((nc, ), 0.)
# Compute slacks by minimizing A.L. s.t. s_k >= 0
s_k = self.reset_slacks(c_k[nc_e:], pi_k[nc_e:], rho_k[nc_e:])
# s_k = np.maximum(0, c_k) # Use this if rho_k = 0. at start
# Vector of design vars., lag. mults., and slack vars.
v_k = np.concatenate((x_k, pi_k, s_k))
x_k = v_k[:nx]
pi_k = v_k[nx:(nx + nc)]
s_k = v_k[(nx + nc):]
p_k = np.zeros((len(v_k), ))
p_x = p_k[:nx]
p_pi = p_k[nx:(nx + nc)]
p_s = p_k[(nx + nc):]
# Iteration counter
itr = 0
opt_satisfied, opt = self.opt_check(pi_k, c_k, g_k, J_k)
if self.nc == 0:
feas_satisfied, feas, sviol, mviol = True, 0., 0., 0.
else:
feas_satisfied, feas, sviol, mviol = self.feas_check(x_k, c_k)
tol_satisfied = (opt_satisfied and feas_satisfied)
# Evaluate merit function value
MF.set_rho(rho_k)
mf_k = MF.evaluate_function(x_k, pi_k, s_k, f_k, c_k)
mfg_k = MF.evaluate_gradient(x_k, pi_k, s_k, f_k, c_k, g_k, J_k)
# Initializing declared outputs
self.update_outputs(major=0,
x=x_k,
lag_mult=pi_k,
slacks=s_k,
obj=f_k,
constraints=c_k,
opt=opt,
feas=feas,
sum_viol= sviol,
max_viol=mviol,
time=time.time() - start_time,
nfev=nfev,
ngev=ngev,
step=0.,
# rho=rho_k[0],
rho=np.linalg.norm(rho_k),
merit=mf_k,
elastic=0,
gamma=0.,
low_curvature=0)
# merit=penalty_k)
elastic_itr = 0
gamma_0 = 1e6
max_gamma = 1e12
gamma = gamma_0 * 1.
el_wt_check_freq = 25
el_feas_arr = np.zeros((el_wt_check_freq, ))
while itr < maxiter:
itr_start = time.time()
itr += 1
# ALGORITHM STARTS HERE
# >>>>>>>>>>>>>>>>>>>>>
# Compute the search direction toward the next iterate
# Solve a strictly convex quadratic program
# Minimize 1/2 x^T G x - a^T x
# Subject to C.T x >= b
# def solve_qp(double[:, :] G, double[:] a, double[:, :] C=None, double[:] b=None, int meq=0, factorized=False):
# First meq constraints are treated as equality constraints
try:
if c_k.size == 0:
x_qp, f_qp, xu_qp, iter_qp, lag_qp, iact_qp = solve_qp(QN.B_k, -g_k)
lag_qp = np.array([])
else:
x_qp, f_qp, xu_qp, iter_qp, lag_qp, iact_qp = solve_qp(QN.B_k, -g_k, C=J_k.T, b=-c_k, meq=nc_e)
gamma = gamma_0 * 1.
p_k[:nx] = x_qp
p_k[nx:(nx + nc)] = lag_qp - v_k[nx:nx + nc]
elastic_itr = 0 # reset elastic_itr if QP is successful
elastic_mode = False # reset elastic_mode if QP is successful
eta = 0.
except Exception as e:
if verbosity == 1:
print('QP solver failed at major iteration:', itr)
print(e)
if "matrix G is not positive definite" in str(e):
if verbosity == 1:
print('Matrix G is not positive definite. Resetting Hessian.')
# Reset Hessian
wTw = np.dot(w_k, w_k)
wTd = np.dot(w_k, d_k[:nx])
init_scale = wTw / (wTd+1e-16) if wTd > 0 else 1.
self.QN = QN = BFGSScipy(nx=nx,
exception_strategy='damp_update',
init_scale=init_scale)
# init_scale=np.linalg.norm(np.diag(QN.B_k)))
continue # Skip the rest of the iteration and solve QP with new reset Hessian
# break
if 'constraints are inconsistent' in str(e) or \
"unsupported operand type(s) for -: 'NoneType' and 'float'" in str(e) or \
"bad operand type for unary -: 'NoneType'" in str(e) or \
"x_qp is zero" in str(e) or \
"Rank(A) < p or Rank([P; A; G]) < n" in str(e):
if verbosity == 1:
print('Infeasible QP problem. Entering elastic mode.')
elastic_mode = True
while True:
el_feas_arr[elastic_itr % el_wt_check_freq] = feas
elastic_itr += 1
if verbosity == 1:
print('Elastic programmming with weight:', gamma)
# METHOD: Modified Powell's treatment of infeasible QP problems - eq ->ineq
# TODO: only for violation of inequality constraints
# minimize 1/2 x^T B_k x + g_k^T x + gamma/2 * \eta^2
# subject to J_k^T x + c_k(1-\eta) = 0
# J_k^T x + c_k(1-\eta) >= 0 # only for violated constraints
G_elastic = np.block([[QN.B_k, np.zeros((nx, 1))],
[np.zeros((1, nx)), gamma]])
a_elastic = -np.concatenate((g_k, [0.]))
el_c_k = np.concatenate((c_k, -c_k[:nc_e]))
el_c_k = -np.maximum(0, -el_c_k)
J_elastic = np.block([[J_k, -el_c_k[:nc].reshape(nc, 1)],
[-J_k[:nc_e], -el_c_k[nc:].reshape(nc_e, 1)],
[np.zeros((2, nx)), np.array([[1.],[-1.]])]])
b_elastic = np.concatenate((-c_k, c_k[:nc_e], [0., -1.]))
if elastic_itr > el_wt_check_freq:
if gamma < max_gamma:
gamma *= 10.
elastic_itr = 0
qp_prob = qpsolvers.Problem(G_elastic, -a_elastic,
-J_elastic, -b_elastic)
qp_solver = 'highs'
qp_initvals = np.zeros((nx+1, ))
qp_initvals[-1] = 1.
if verbosity == 1:
print('Using elastic QP solver', qp_solver, 'with weight', gamma)
qp_sol = qpsolvers.solve_problem(qp_prob,
qp_solver,
# initvals=qp_initvals,
# verbose=True,
# highs
dual_feasibility_tolerance=aqp_dual_tol,
primal_feasibility_tolerance=aqp_primal_tol,
time_limit=aqp_time_limit,
)
x_qp = qp_sol.x[:nx] * 1.
lag_qp = qp_sol.z[:nc] * 1.
if nc_e > 0:
lag_qp[:nc_e] -= qp_sol.z[nc:nc+nc_e]
p_k[:nx] = x_qp[:nx]
p_k[nx:(nx + nc)] = lag_qp - v_k[nx:nx + nc]
eta = qp_sol.x[nx] * 1.
break
if np.linalg.norm(qp_sol.x[:nx]) == 0 and qp_sol.x[nx] == 1.:
exit_msg = 'Augmented QP is also infeasible'
return self.exit_early(x_k, f_k, c_k, pi_k, opt, feas, nfev, ngev, itr, time.time() - start_time, False, exit_msg)
# Clip the step length such that the design variables remain within bounds
p_k[:nx] = np.clip(p_x, self.problem.x_lower - x_k, self.problem.x_upper - x_k)
# Search direction for s_k :
# (c_k + J_k @ p_x) is the new estimate for s for iniequality constraints
p_k[(nx + nc):] = c_k[nc_e:] + J_k[nc_e:] @ p_k[:nx] - v_k[nx + nc:]
if verbosity == 1:
print('Major iteration:', itr)
print("=====================================")
if self.nc > 0:
dir_deriv_al = np.dot(mfg_k, p_k)
pTHp = p_x.T @ (QN.B_k @ p_x)
# Update penalty parameters
rho_k_Powell = np.maximum(np.abs(lag_qp), 0.5*(rho_k_Powell + np.abs(lag_qp)))
rho_k = self.update_vector_rho(rho_k, dir_deriv_al, pTHp,
p_pi, c_k, np.concatenate([np.zeros((nc_e,)), s_k]))
MF.set_rho(rho_k)
mf_k = MF.evaluate_function(x_k, pi_k, s_k, f_k, c_k)
mfg_k = MF.evaluate_gradient(x_k, pi_k, s_k, f_k, c_k, g_k, J_k)
dir_deriv_al_0 = np.dot(mfg_k, p_k)
# Compute the step length along the search direction via a line search
p_k_temp = p_k * 1.
v_k_temp = v_k * 1.
if dir_deriv_al_0 > -2.22e-16 or np.linalg.norm(p_k[:nx]) <= 2.22e-16:
alpha = 1.0
mf_new, mfg_new, mf_slope_new, new_f_evals, new_g_evals, converged = mf_k, mfg_k, dir_deriv_al_0, 1, 1, True
else:
alpha, mf_new, mfg_new, mf_slope_new, new_f_evals, new_g_evals, converged = LSS.search(
x=v_k_temp, p=p_k_temp, f0=mf_k, g0=mfg_k)
undefined_direction = False
if np.isnan(mf_new) or np.isinf(mf_new) or np.isnan(mfg_new).any() or np.isinf(mfg_new).any():
undefined_direction = True
if verbosity == 1:
print('Merit function or its gradient at the new point has NaN or inf.')
else:
nfev += new_f_evals
ngev += new_g_evals
alpha_golden = 0.061803398875
if (not converged) and (not undefined_direction):
# Use an inexact LS with l1-penalty and only function evaluations
new_f_evals, converged, alpha = self.l1_penalty_line_search(x_k, eta, x_qp, p_k, rho_k_Powell, f_k, c_k, g_k)
nfev += new_f_evals
if not undefined_direction:
if verbosity == 1:
print("###### SUCCESSFUL LINE SEARCH #######")
d_k = alpha * p_k
d_k[nx:(nx + nc)] = d_k[nx:(nx + nc)] / np.sqrt(alpha)
d_k_temp = d_k * 1.
g_old = g_k * 1.
J_old = J_k * 1.
# If the line search failed with nans at alpha=1 earlier, find a smaller alpha where the function is well-defined
undefined_new_point = False
if undefined_direction:
x_k_new = x_k + p_k[:nx]
self.MF.update_functions_in_cache(['f', 'g', 'c', 'j'], x_k_new)
f_new = self.MF.cache['f'][1]
g_new = self.MF.cache['g'][1]
c_new = self.MF.cache['c'][1]
J_new = self.MF.cache['j'][1]
alpha = 1.0
new_f_evals = 0
new_g_evals = 0
while np.isnan(f_new) or np.isinf(f_new):
if verbosity == 1:
print('Objective function is NaN or Inf. Stepping back x_k_new.')
alpha *= 0.1
d_k_temp = alpha * p_k
x_k_new = x_k + d_k_temp[:nx]
self.MF.update_functions_in_cache(['f'], x_k_new)
f_new = self.MF.cache['f'][1]
new_f_evals += 1
if alpha < 1e-12:
undefined_new_point = True
break
if not undefined_new_point:
while np.any(np.isnan(c_new)) or np.any(np.isinf(c_new)):
if verbosity == 1:
print('Constraint values contain NaN or Inf. Stepping back x_k_new.')
alpha *= 0.1
d_k_temp = alpha * p_k
x_k_new = x_k + d_k_temp[:nx]
self.MF.update_functions_in_cache(['c'], x_k_new)
c_new = self.MF.cache['c'][1]
new_f_evals += 1
if alpha < 1e-12:
undefined_new_point = True
break
if not undefined_new_point:
while np.any(np.isnan(g_new)) or np.any(np.isinf(g_new)):
if verbosity == 1:
print('Objective gradient contains NaN or Inf. Stepping back x_k_new.')
alpha *= 0.1
d_k_temp = alpha * p_k
x_k_new = x_k + d_k_temp[:nx]
self.MF.update_functions_in_cache(['g'], x_k_new)
g_new = self.MF.cache['g'][1]
new_g_evals += 1
if alpha < 1e-12:
undefined_new_point = True
break
if not undefined_new_point:
while np.any(np.isnan(J_new)) or np.any(np.isinf(J_new)):
if verbosity == 1:
print('Constraint Jacobian contains NaN or Inf. Stepping back x_k_new.')
alpha *= 0.1
d_k_temp = alpha * p_k
x_k_new = x_k + d_k_temp[:nx]
self.MF.update_functions_in_cache(['j'], x_k_new)
J_new = self.MF.cache['j'][1]
new_g_evals += 1
if alpha < 1e-12:
undefined_new_point = True
break
if self.problem.constrained:
nfev += int(new_f_evals/2)
ngev += int(new_g_evals/2)
else:
nfev += new_f_evals
ngev += new_g_evals
if undefined_new_point:
self.successive_undefined_iterations += 1
if self.successive_undefined_iterations == 1:
if verbosity == 1:
print('No points along the search direction is well-defined. Resetting Hessian.')
self.QN = QN = BFGSScipy(nx=nx,
exception_strategy='damp_update',
init_scale=1.)
continue
if self.successive_undefined_iterations == 2:
exit_msg = 'Two successive iterations with unsuccessful search along predicted direction for well-defined points.'
return self.exit_early(x_k, f_k, c_k, pi_k, opt, feas, nfev, ngev, itr, time.time() - start_time, False, exit_msg)
elif undefined_direction:
self.successive_undefined_iterations = 0
if verbosity == 1:
print('alpha successful without nan:', alpha)
# If the line search failed with nans at alpha=1 earlier, reperform line search with a smaller alpha found above
alpha_new, mf_new, mfg_new, mf_slope_new, new_f_evals, new_g_evals, converged = LSS.search(
x=v_k_temp, p=alpha*p_k, f0=mf_k, g0=mfg_k)
nfev += new_f_evals
ngev += new_g_evals
# If the line search failed to find a point satisfying the Wolfe conditions, try backtracking line search
if not converged:
if verbosity == 1:
print('Inside SLSQP line search: Reperforming backtracking line search with a smaller alpha found above.')
new_f_evals, converged, alpha_new = self.l1_penalty_line_search(x_k, eta, x_qp, alpha*p_k, rho_k_Powell, f_k, c_k, g_k)
nfev += new_f_evals
alpha *= alpha_new
d_k = alpha * p_k
d_k[nx:(nx + nc)] = d_k[nx:(nx + nc)] / np.sqrt(alpha)
d_k_temp = d_k * 1.
v_k += d_k_temp
x_k = v_k[:nx]
pi_k = v_k[nx:(nx + nc)]
g_old = g_k * 1.
J_old = J_k * 1.
self.MF.update_functions_in_cache(['f', 'c', 'g', 'j'], x_k)
f_k = self.MF.cache['f'][1]
c_k = self.MF.cache['c'][1]
g_k = self.MF.cache['g'][1]
J_k = self.MF.cache['j'][1]
v_k[(nx + nc):] = self.reset_slacks(c_k[nc_e:], pi_k[nc_e:], rho_k[nc_e:])
s_k = v_k[(nx + nc):]
# Note: MF changes (decreases) after slack reset
mf_k = MF.evaluate_function(x_k, pi_k, s_k, f_k, c_k)
mfg_k = MF.evaluate_gradient(x_k, pi_k, s_k, f_k, c_k, g_k, J_k)
w_k = (g_k - g_old) - (J_k - J_old).T @ pi_k
dTd = np.dot(d_k[:nx], d_k[:nx])
wTw = np.dot(w_k, w_k)
wTd = np.dot(w_k, d_k[:nx])
B_scaler = wTw / (wTd + 2.22e-16)
dBd = np.dot(d_k[:nx], QN.B_k @ d_k[:nx])
low_curvature = 1 if (wTd > 0.2*dBd) else 0
QN_d_k = d_k[:nx]
QN.update(QN_d_k, w_k)
# # <<<<<<<<<<<<<<<<<<<
# # ALGORITHM ENDS HERE
opt_satisfied, opt = self.opt_check(pi_k, c_k, g_k, J_k)
if self.nc == 0:
feas_satisfied, feas, sviol, mviol = True, 0., 0., 0.
else:
feas_satisfied, feas, sviol, mviol = self.feas_check(x_k, c_k)
tol_satisfied = (opt_satisfied and feas_satisfied)
# Update arrays inside outputs dict with new values from the current iteration
self.update_outputs(
major=itr,
x=x_k,
lag_mult=pi_k,
slacks=s_k,
obj=f_k,
constraints=c_k,
opt=opt,
feas=feas,
sum_viol=sviol,
max_viol=mviol,
time=time.time() - start_time,
nfev=nfev,
ngev=ngev,
# rho=rho_k[0],
rho=np.linalg.norm(rho_k),
step=alpha,
# step=0.5**ls_count,
merit=mf_k,
elastic=(elastic_itr>0),
gamma=gamma,
low_curvature=low_curvature,)
# merit=penalty_k)
if tol_satisfied:
exit_msg = 'Specified convergence tolerances achieved.'
break
if not tol_satisfied:
exit_msg = 'Maximum iterations reached without convergence.'
if verbosity == 1:
print(f'{exit_msg} Exiting ...')
self.total_time = time.time() - start_time
self.results = {
'x': x_k,
'objective': f_k,
'c': c_k,
'pi': pi_k,
'optimality': opt,
'feasibility': feas,
'nfev': nfev,
'ngev': ngev,
'niter': itr,
'time': self.total_time,
'success': tol_satisfied,
'exit_msg': exit_msg
}
# Run post-processing for the Optimizer() base class
self.run_post_processing()
return self.results