Source code for torchmdo.optimize.optimizer

import torch
from numpy import ndarray
from scipy.optimize import (
    minimize,
    NonlinearConstraint,
    OptimizeResult,
    check_grad,
)
from scipy.sparse import csr_matrix, eye
from logging import getLogger
from typing import Union, Optional, List, Tuple, cast, Dict, Callable
from warnings import warn
from pdb import set_trace
from functools import cached_property
from .input_output import (
    DesignVariable,
    Output,
    Constraint,
    Minimize,
    Maximize,
    NearestFeasible,
)
from ..model import Model

logger = getLogger(__name__)
Tensor = torch.Tensor
as_tensor = torch.as_tensor


[docs]class Optimizer: """ Object used to perform non-linear constrained optimization. Args: initial_design_variables: list of design variables to be used in the optimization problem. objective: objective to be used in the optimization problem. constraints: list of constraints to be used in the optimization problem. model: object that computes all outputs when ``model.compute()`` is called. vectorize_constraint_jac: if true then the computation of the constraint jacobian will be vectorized which may help a lot when many constraints are present. Note, however, that this is an experimental feature. Default: `False`. """ def __init__( self, initial_design_variables: List[DesignVariable], objective: Union[Minimize, Maximize, NearestFeasible], constraints: List[Constraint], model: Model, vectorize_constraint_jac: bool = False, ): self.vectorize_constraint_jac = bool(vectorize_constraint_jac) # check model assert isinstance(model, Model) self.model = model # check outputs assert isinstance(objective, (Minimize, Maximize, NearestFeasible)) for out in constraints: assert isinstance(out, Constraint) self.outputs = cast(List[Output], [objective, *constraints]) self.objective_index = 0 # is always the first one # check to make sure that the model has all of the design variables attributes for idv in initial_design_variables: assert isinstance(idv, DesignVariable) if not hasattr(self.model, idv.name): raise KeyError( "Name '%s' of the design variable does not match an attribute in the Model" % idv.name ) # if the initial value of the design variable is not set then extract it # from the model if idv.value is None: assert getattr(self.model, idv.name) is not None, ( "No initial value found for design variable '%s'" % idv.name ) idv.extract_val(model=self.model) self.initial_design_variables = initial_design_variables # set parameters in the model to the initial design variables self.variables_object = self.initial_design_variables # do a sanity check to make sure the reconstructed desgin variables are the same for (original, reconstructed) in zip( self.initial_design_variables, self.variables_object ): try: assert original.name == reconstructed.name assert torch.all(original.value_tensor == reconstructed.value_tensor) except: logger.error( "design variable internalization failed for %s." % original.name ) raise # initialize an tensor to save the variable values from the last iteration so we # can check whether or not they have changed self._last_variables = None @property def variables_object(self) -> List[DesignVariable]: """ create a copy of the initial design variables with a change made to the value """ design_variables = [ idv.replace(value=as_tensor(getattr(self.model, idv.name))) for idv in self.initial_design_variables ] return design_variables @variables_object.setter def variables_object(self, design_variables: List[DesignVariable]) -> None: """ Set the design variables to the specified values in the Model. """ for dv in design_variables: # make sure design variables are of the correct type assert isinstance(dv, DesignVariable) setattr(self.model, dv.name, dv.value_tensor) # reset state since variables changed self.reset_state() @property def variables_tensor(self) -> Tensor: """ returns the variables as a 1d tensor (ignoring the fixed variables and scaling appropriately) """ return torch.cat( [ torch.ravel(dv.value_tensor) / dv.scale for dv in self.variables_object if not dv.fixed ] ) @variables_tensor.setter def variables_tensor(self, value: Tensor): """ setter for variables_tensor property (assuming fixed variables are ignored and un-scaling appropriately) """ if ( self._last_variables is None or torch.any(value != self._last_variables) or (not self._last_variables.requires_grad and value.requires_grad) ): # if any value changed, or if the previous computation was done without # `requires_grad` and it is now required, # then update the model with the new variables i_cur = 0 # current variable index for idv in self.initial_design_variables: if not idv.fixed: setattr( self.model, idv.name, torch.reshape(value[i_cur : (i_cur + idv.numel)], idv.shape) * idv.scale, ) i_cur += idv.numel # update the position of the index assert ( i_cur == value.numel() ), "sanity check: did not use all design variables" # reset the state since the variables have changed self.reset_state() # save parameters for next iteration # NOTE: we do not detach this attribute since we will need to gradients to # flow through. self._last_variables = value else: pass # nothing changed so pass @property def variable_bounds_tensor(self) -> Tuple[Tensor, Tensor]: """ get bounds for all optimization variables that are not fixed, with appropriate scaling. """ lower = torch.cat( [ idv.lower_tensor.reshape(-1) / idv.scale for idv in self.initial_design_variables if not idv.fixed ] ) upper = torch.cat( [ idv.upper_tensor.reshape(-1) / idv.scale for idv in self.initial_design_variables if not idv.fixed ] ) return lower, upper @property def output_bounds_tensor(self) -> Tuple[Tensor, Tensor]: """ get the lower and upper bounds for all the ouputs """ lower = torch.cat( [ out.lower_tensor.reshape(-1) / out.scale if not isinstance(out, NearestFeasible) else -Tensor([float("inf")]) for out in self.outputs ] ) upper = torch.cat( [ out.upper_tensor.reshape(-1) / out.scale if not isinstance(out, NearestFeasible) else Tensor([float("inf")]) for out in self.outputs ] ) return lower, upper @property def constraint_bounds_tensor(self) -> Tuple[Tensor, Tensor]: """ get bounds for all constrained outputs """ lower, upper = self.output_bounds_tensor return lower[self.constrained_output_mask], upper[self.constrained_output_mask] @cached_property def constrained_output_mask(self) -> Tensor: """ Get a boolean tensor indicating which outputs are constrained. It is cached since this will never change throughout an optimization. """ lower, upper = self.output_bounds_tensor # determine which are constrained (has a finite lower and/or upper bound) return torch.logical_or(torch.isfinite(lower), torch.isfinite(upper)) @cached_property def num_constraints(self) -> int: """ Returns the number of constraints present. """ return int(torch.count_nonzero(self.constrained_output_mask)) @cached_property def constraints_are_all_linear(self) -> bool: """ Returns true if all the constraints are linear. """ return all(out.linear for out in self.outputs if out.is_constrained) def reset_state(self): """ resets outputs and internal state """ # remove all output values for out in self.outputs: out.value = None # reset the _last_variables tensor self._last_variables = None def compute(self, **kwargs) -> None: """ run compute to get all outputs (objective and constraints) for the current design variable setting. """ # check if the computation has already been completed if self.outputs[self.objective_index].value is not None: # if the objective is already computed then don't need to recompute anything return else: # run the objective and constraint functions self.model.compute(**kwargs) # extract the outputs for out in self.outputs: if isinstance(out, NearestFeasible): # special case if we are using the nearest feasible objective if hasattr(self, "x0"): out.value = 0.5 * torch.sum( torch.square(self.variables_tensor - self.x0) ) else: out.extract_val(model=self.model) def objective_fun(self, x: Union[ndarray, Tensor]) -> Tensor: """ Return the objective. Note that we never do any scaling for the objective. """ x = as_tensor(x) if isinstance(self.outputs[self.objective_index], NearestFeasible): # then just compute the distance from the nearest point return 0.5 * torch.sum(torch.square(x - self.x0)) x.requires_grad = True self.variables_tensor = x self.compute() objective = self.outputs[self.objective_index].value_tensor # run a couple of quick checks assert objective.numel() == 1, "objective must be a scalar" assert torch.all( torch.isfinite(objective) ), "Non-finite value encountered in the objective = %s" % str( objective.detach() ) # determine if we need to negate the objective if isinstance(self.outputs[self.objective_index], Maximize): objective = -objective # negate # return the objective and detach it so we can convert it to numpy return objective.reshape(()).detach() def objective_grad(self, x: Union[ndarray, Tensor]) -> Tensor: """ Compute the gradient of the objective with respect to the input x. Note that we never do any scaling for the objective. """ x = as_tensor(x) if isinstance(self.outputs[self.objective_index], NearestFeasible): # then just compute the gradient of the distance from the nearest point return x - self.x0 x.requires_grad = True self.variables_tensor = x self.compute() # extract the objective objective = self.outputs[self.objective_index].value_tensor # compute the objective gradient # NOTE: the gradient is actually with respect to _last_variables which may # or may not be the same tensor as x depending on whether `objective_fun` # was called first at the same point. objective.backward() assert self._last_variables is not None, "sanity check" objective_grad = self._last_variables.grad assert ( objective_grad is not None ), "objective gradient failed, something wrong with comptuational graph." assert torch.all(torch.isfinite(objective_grad)), ( "Non-finite value encountered in the objective gradient = %s" % str(objective_grad.detach()) ) # determine if we need to negate the objective if isinstance(self.outputs[self.objective_index], Maximize): objective_grad = -objective_grad # negate return objective_grad def constraint_fun( self, x: Union[ndarray, Tensor], jacobian_computation=False ) -> Tensor: """ return the constraints, with appropriate scaling """ x = as_tensor(x) if jacobian_computation: # explicitly reset the state whether we have computed at this x or not # this ensures that all computations are performed self.reset_state() else: x.requires_grad = True # set the values internally to x self.variables_tensor = x self.compute() # get all the outputs as a concatenated 1d vector all_outputs = torch.cat( [out.value_tensor.reshape(-1) / out.scale for out in self.outputs] ) # extract any constrained outputs constraints = all_outputs[self.constrained_output_mask] # run a quick check assert torch.all(torch.isfinite(constraints)), ( "Non-finite value encountered in the constraints:\n%s" % "\n".join([str(output) for output in self.outputs]) ) # return, determining whether to detach or not based on whether the jacobian # is being computed return constraints if jacobian_computation else constraints.detach() def constraint_jac(self, x: Union[ndarray, Tensor]) -> Tensor: """ Compute the jacobian of the constraints with respect to the design variables. Note that whereas the objective gradient is computed by pytorch's standard implicit approach the constraint jacobian is computed by pytorch's functional API. """ x = as_tensor(x) # compute the jacobian which is shape (num. outputs, num. design variables) # NOTE: we need to manually reset the state for each forward call to ensure all computations # are actually being performed. Also resetting of this state doesn't seem to # effect the number of evaluations because of the order in which the optimizer # calls the method which appears to be: # - objective_grad # - objective_fun - this re-uses the previous computation from objective_grad # - constraint_jac # - constraint_fun - this re-uses the previous computation from constraint_jac constraint_jac = cast( Tensor, torch.autograd.functional.jacobian( func=lambda xx: self.constraint_fun(x=xx, jacobian_computation=True), inputs=x, create_graph=False, strict=False, vectorize=self.vectorize_constraint_jac, ), ) # run a quick check assert torch.all(torch.isfinite(constraint_jac)), ( "Non-finite value encountered in the constraint jacobian:\n%s" % repr(constraint_jac.detach()) ) return constraint_jac
[docs] def optimize( self, maxiter: int = 1000, display_step: int = 50, keep_feasible: bool = False, use_finite_diff: bool = False, callback: Optional[Callable[[ndarray, OptimizeResult], bool]] = None, **optimizer_options ) -> Optional[OptimizeResult]: """ Optimize the objective, subject to constraints Args: maxiter: maximum number of optimization iterations display_step: number of optimization iterations between printing a logging message to monitor the procedure. keep_feasible: whether the optimizer should attempt to keep the optimization trajectory in a feasible region. Default: `False`. use_finite_diff: if True, will approximate gradients using finite differences rather than using pytorch's automatic differentiation. This will result in less accurate gradients and slower computation but can be useful for debugging. callback: callback function which must match the function signature of :func:`Optimizer.callback` which is the default callback function. Note that if specified then `display_step` will be ignored unless the user provided callback makes a call to :func:`Optimizer.callback`. optimizer_options: additional optimization options from scipy's `trust-constr` implementation. See `here <https://docs.scipy.org/doc/scipy/reference/optimize.minimize-trustconstr.html>`_. """ self.display_step = display_step # compute the current iterate first. This is nessessary since we need to know the dimensions of all the outputs self.compute() logger.info( "Initial objective: %.3f." % ( self.outputs[self.objective_index].value_tensor if not isinstance(self.outputs[self.objective_index], NearestFeasible) else 0.0 ) ) logger.info("Beginning optimization.") self.x0 = self.variables_tensor.detach() # setup constraints if self.num_constraints > 0: constraint_lb, constraint_ub = self.constraint_bounds_tensor constraints = NonlinearConstraint( fun=self.constraint_fun, jac=self.constraint_jac if not use_finite_diff else "2-point", # type:ignore lb=constraint_lb, ub=constraint_ub, keep_feasible=keep_feasible, **( # specify a Hessian of zero if all the constraints are linear dict(hess=lambda x, v: csr_matrix((self.x0.numel(),) * 2)) if self.constraints_are_all_linear else {} ) ) else: # there are no constraints constraints = None # run the optimization try: res = minimize( fun=self.objective_fun, jac=self.objective_grad if not use_finite_diff else None, hess=(lambda x, *args: csr_matrix((self.x0.numel(),) * 2)) if self.outputs[self.objective_index].linear else ( (lambda x, *args: eye(self.x0.numel())) if isinstance(self.outputs[self.objective_index], NearestFeasible) else None ), x0=self.x0, bounds=[bound for bound in zip(*self.variable_bounds_tensor)], method="trust-constr", constraints=constraints, options=dict(maxiter=maxiter, **optimizer_options), callback=callback if callback is not None else (self.callback if display_step < maxiter else None), ) except (KeyboardInterrupt): logger.info("Keyboard interrupt raised. Cleaning up...") return except: logger.error("Error during optimization!") raise else: logger.info("Optimization completed.") logger.info( "Function Evals: %d. Exit status: %s. Objective: %.4g. Constraint Violation: %.3g" % (res["nfev"], res["status"], res["fun"], res["constr_violation"]) ) logger.info("Execution time: %.1fs" % res["execution_time"]) print(res["message"]) # set the parameters internally self.variables_tensor = as_tensor(res["x"]) return res
[docs] def callback(self, xk: ndarray, res: OptimizeResult) -> bool: """ Callback function for scipy minimize. Args: xk: current design variable setting. res: current optimization result and details. Returns: Flag indicating whether to terminate optimization. """ if res["niter"] == 1 or res["niter"] % self.display_step == 0: logger.info( "niter: %04d, fun: %.4f, constr_violation: %.3g, execution_time: %.1fs" % ( res["niter"], -res["fun"] if isinstance(self.outputs[self.objective_index], Maximize) else res["fun"], res["constr_violation"], res["execution_time"], ) ) return False
[docs] def check_grad(self) -> Tuple[Tensor, Tensor]: """ verifies that the objective gradient and constraint jacobian are correct at the current setting of the design variables. Returns: A scalar tensor that gives the error in the objective gradient and a 1D tensor that gives the error in each row of the the constraint jacobian, respectively. """ # check the objective gradient objective_grad_error = check_grad( func=lambda x: self.objective_fun(x).numpy(), grad=lambda x: self.objective_grad(x).numpy(), x0=self.variables_tensor.detach().numpy(), ) # check the constraint gradients # do this by looping through each constraint and the respective row of the jacobian constraints_jac_error = [ check_grad( func=lambda x: self.constraint_fun(x)[i].numpy(), grad=lambda x: self.constraint_jac(x)[i, :].numpy(), x0=self.variables_tensor.detach().numpy(), ) for i in range(self.num_constraints) ] return as_tensor(objective_grad_error), as_tensor(constraints_jac_error)
def reset_design_variables(self) -> None: """ reset the design variables to their initial value. """ self.variables_object = self.initial_design_variables def __str__(self): with torch.no_grad(): string = "" # print all design variables string += "*" * 80 + "\n" string += "DESIGN VARIABLES: (value, lower, upper, active)\n" string += "*" * 80 + "\n" design_variables = self.variables_object for dv in design_variables: string += dv.__str__() + "\n" # print all outputs string += "*" * 80 + "\n" string += "OUTPUTS: (value, lower, upper, active)\n" string += "*" * 80 + "\n" for out in self.outputs: if not isinstance(out, NearestFeasible): string += out.__str__() + "\n" return string