# -*- coding: utf-8 -*-
# =============================================================================
# qopt
# Copyright (C) 2020 Julian Teske, Forschungszentrum Juelich
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
#
# Contact email: j.teske@fz-juelich.de
# =============================================================================
"""
This module contains interfaces to optimization algorithms or the algorithms
themselves.
Currently it supports interfacing to least squares and minimization algorithms
of the scipy package. Conditionally, a simulated annealing is supported, if the
simanneal package is included in the environment. (See installation
instructions on https://github.com/qutech/qopt) The classes for simulated
annealing are in an experimental state and not well tested!
The `Optimizer` class uses the `Simulator` as interface to the cost functions
evaluated after the simulation.
Classes
-------
:class:`Optimizer`
Base class optimizer.
:class:`LeastSquaresOptimizer`
An interface to scipy's least squares optimizer.
:class:`ScalarMinimizingOptimizer`
An interface to scipy's minimize functions.
:class:`WallTimeExceeded`
Exception for exceeding the optimization's time limit.
:class:`PulseAnnealer`
Helper class for `SimulatedAnnealing`.
:class:`SimulatedAnnealing`
Simulated annealing as optimization.
Notes
-----
The implementation was inspired by the optimal control package of QuTiP [1]_
(Quantum Toolbox in Python)
References
----------
.. [1] J. R. Johansson, P. D. Nation, and F. Nori: "QuTiP 2: A Python framework
for the dynamics of open quantum systems.", Comp. Phys. Comm. 184, 1234
(2013) [DOI: 10.1016/j.cpc.2012.11.019].
"""
import copy
import numpy as np
import scipy
import scipy.optimize
import time
from abc import ABC, abstractmethod
from typing import Dict, Optional, Callable, List, Union, Sequence
from unittest import mock
from warnings import warn
from qopt import optimization_data, simulator, performance_statistics
import time
import sys
try:
import simanneal
except ImportError:
warn('simanneal not installed. '
'SimulatedAnnealing is not available')
simanneal = mock.Mock()
default_termination_conditions = {
"min_gradient_norm": 1e-7,
"min_cost_gain": 1e-7,
"max_wall_time": 10 * 60.0,
"max_cost_func_calls": 1e6,
"max_iterations": 10000,
"min_amplitude_change": 1e-8
}
[docs]class WallTimeExceeded(Exception):
"""Raised when the time limit for the optimization is exceeded. """
pass
[docs]class Optimizer(ABC):
""" Abstract base class for the optimizer.
Parameters
----------
system_simulator : Simulator
The simulator is the interface to the simulation.
termination_cond: dict, optional
The termination conditions of the optimization.
save_intermediary_steps: bool, optional
If True, then the results from intermediary steps are stored. Defaults
to True.
cost_func_weights: list of float, optional
The cost functions are multiplied with these weights during the
optimisation.
use_jacobian_function: bool, optional
If set to true, then the jacobians are calculated analytically.
Defaults to True.
store_optimizer: bool, optional
If True, then the optimizer stores itself in the result class.
Defaults to False
Attributes
----------
system_simulator : Simulator
The simulator is the interface to the simulation.
pulse_shape : Tuple of int
The shape of the control amplitudes is saved and used for the
cost functions while the optimization function might need them flatted.
cost_func_weights: list of float, optional
The cost functions are multiplied with these weights during the
optimisation.
use_jacobian_function: bool, optional
If set to true, then the jacobians are calculated analytically.
store_optimizer: bool, optional
If True, then the optimizer stores itself in the result class.
Defaults to False
"""
def __init__(
self,
system_simulator: Optional[simulator.Simulator] = None,
termination_cond: Optional[Dict] = None,
save_intermediary_steps: bool = True,
cost_func_weights: Optional[Sequence[float]] = None,
use_jacobian_function=True,
store_optimizer: bool = False
):
self.system_simulator = system_simulator
self.use_jacobian_function = use_jacobian_function
self.termination_conditions = default_termination_conditions
if termination_cond is not None:
self.termination_conditions.update(**termination_cond)
self.optim_iter_summary = None
self.pulse_shape = ()
self._opt_start_time = 0
self._min_costs = np.inf
self._min_costs_par = None
self._n_cost_fkt_eval = 0
self._n_jac_fkt_eval = 0
# flags:
self.save_intermediary_steps = save_intermediary_steps
self.store_optimizer = store_optimizer
self.cost_func_weights = cost_func_weights
if self.cost_func_weights is not None:
self.cost_func_weights = np.asarray(
self.cost_func_weights).flatten()
if len(self.cost_func_weights) == 0:
self.cost_func_weights = None
[docs] def cost_func_wrapper(self, optimization_parameters):
"""Wraps the cost function given by the simulator class.
The relevant information for the analysis is saved.
Parameters
----------
optimization_parameters: np.array
Raw optimization parameters in a linear array.
Returns
-------
costs: np.array, shape (n_fun)
Cost values.
"""
if (time.time() - self._opt_start_time) \
> self.termination_conditions['max_wall_time']:
raise WallTimeExceeded
costs = self.system_simulator.wrapped_cost_functions(
optimization_parameters.reshape(self.pulse_shape[::-1]).T)
if self.save_intermediary_steps:
self.optim_iter_summary.iter_num += 1
self.optim_iter_summary.costs.append(costs)
self.optim_iter_summary.parameters.append(
optimization_parameters.reshape(self.pulse_shape[::-1]).T
)
if np.linalg.norm(costs) < np.linalg.norm(self._min_costs):
self._min_costs = costs
self._min_costs_par = optimization_parameters.reshape(
self.pulse_shape[::-1]).T
# apply the cost function weights after saving the values.
if self.cost_func_weights is not None:
costs *= self.cost_func_weights
self._n_cost_fkt_eval += 1
return costs
[docs] def cost_jacobian_wrapper(self, optimization_parameters):
"""Wraps the cost Jacobian function given by the simulator class.
The relevant information for the analysis is saved.
Parameters
----------
optimization_parameters: np.array
Raw optimization parameters in a linear array.
Returns
-------
jacobian: np.array, shape (num_func, num_t * num_amp)
Jacobian of the cost functions.
"""
jacobian = self.system_simulator.wrapped_jac_function(
optimization_parameters.reshape(self.pulse_shape[::-1]).T)
if self.save_intermediary_steps:
self.optim_iter_summary.gradients.append(jacobian)
# jacobian shape (num_t, num_f, num_ctrl) -> (num_f, num_t * num_ctrl)
jacobian = jacobian.transpose([1, 2, 0])
jacobian = jacobian.reshape(
(jacobian.shape[0], jacobian.shape[1] * jacobian.shape[2]))
# apply the cost function weights after saving the values.
if self.cost_func_weights is not None:
jacobian = np.einsum('ab, a -> ab', jacobian,
self.cost_func_weights)
self._n_jac_fkt_eval += 1
return jacobian
[docs] @abstractmethod
def run_optimization(self, initial_control_amplitudes: np.ndarray,
verbose) \
-> optimization_data.OptimizationResult:
"""Runs the optimization of the control amplitudes.
Parameters
----------
initial_control_amplitudes : array
shape (num_t, num_ctrl)
verbose
Verbosity of the run. Depends on which optimizer is used.
Returns
-------
optimization_result : `OptimizationResult`
The resulting data of the simulation.
"""
pass
[docs] def prepare_optimization(self,
initial_optimization_parameters: np.ndarray):
"""Prepare for the next optimization.
Parameters
----------
initial_optimization_parameters : array
shape (num_t, num_ctrl)
Data stored in this class might be overwritten.
"""
self._min_costs = np.inf
self._min_costs_par = None
self._n_cost_fkt_eval = 0
self._n_jac_fkt_eval = 0
self.pulse_shape = initial_optimization_parameters.shape
if self.save_intermediary_steps:
self.optim_iter_summary = \
optimization_data.OptimizationSummary(
indices=self.system_simulator.cost_indices
)
self._opt_start_time = time.time()
if self.system_simulator.stats is not None:
# If the system simulator wants to write down statistics, then
# initialise a fresh instance
self.system_simulator.stats = \
performance_statistics.PerformanceStatistics()
self.system_simulator.stats.start_t_opt = float(
self._opt_start_time)
self.system_simulator.stats.indices = \
self.system_simulator.cost_indices
[docs] def write_state_to_result(self):
""" Writes the current state into an instance of 'OptimizationResult'.
Intended for saving progress when terminating the optimization in an
unexpected way.
Returns
-------
result: optimization_data.OptimizationResult
The current result of the optimization.
"""
if self.system_simulator.stats is not None:
self.system_simulator.stats.end_t_opt = time.time()
if self.use_jacobian_function:
jac_norm = np.linalg.norm(
self.cost_jacobian_wrapper(self._min_costs_par))
else:
jac_norm = 0
if self.store_optimizer:
storage_opt = self
else:
storage_opt = None
optim_result = optimization_data.OptimizationResult(
final_cost=self._min_costs,
indices=self.system_simulator.cost_indices,
final_parameters=self._min_costs_par,
final_grad_norm=jac_norm,
num_iter=self._n_cost_fkt_eval,
termination_reason='Maximum Wall Time Exceeded',
status=5,
optimizer=storage_opt,
optim_summary=self.optim_iter_summary,
optimization_stats=self.system_simulator.stats
)
return optim_result
[docs]class LeastSquaresOptimizer(Optimizer):
"""
Uses the scipy least squares method for optimization.
Parameters
----------
system_simulator: `Simulator`
The systems simulator.
termination_cond: dict
Termination conditions.
save_intermediary_steps: bool, optional
If False, only the simulation result is stored. Defaults to False.
method: str, optional
The optimization method used. Currently implemented are:
- 'trf': A trust region optimization algorithm. This is the default.
bounds: list of array-like, optional
Attention: The boundary format can vary between optimizers!
The boundary conditions for the pulse optimizations. If none are given
then the pulse is assumed to take any real value. The boundaries are
given as a list of two arrays. The first array specifies the upper
and the second array specifies the lower bounds. Single parameters
can be excepted by using np.inf with appropriate sign.
"""
def __init__(
self,
system_simulator: Optional[simulator.Simulator] = None,
termination_cond: Optional[Dict] = None,
save_intermediary_steps: bool = True,
method: str = 'trf',
bounds: Union[np.ndarray, List, None] = None,
use_jacobian_function=True,
cost_func_weights: Optional[Sequence[float]] = None,
store_optimizer: bool = False):
super().__init__(system_simulator=system_simulator,
termination_cond=termination_cond,
save_intermediary_steps=save_intermediary_steps,
cost_func_weights=cost_func_weights,
use_jacobian_function=use_jacobian_function,
store_optimizer=store_optimizer)
self.method = method
self.bounds = bounds
[docs] def run_optimization(self, initial_control_amplitudes: np.array,
verbose: int = 0) -> optimization_data.OptimizationResult:
"""See base class. """
super().prepare_optimization(
initial_optimization_parameters=initial_control_amplitudes)
if self.use_jacobian_function:
jac = super().cost_jacobian_wrapper
else:
jac = '2-point'
try:
result = scipy.optimize.least_squares(
fun=super().cost_func_wrapper,
x0=initial_control_amplitudes.T.flatten(),
jac=jac,
bounds=self.bounds,
method=self.method,
ftol=self.termination_conditions["min_cost_gain"],
xtol=self.termination_conditions["min_amplitude_change"],
gtol=self.termination_conditions["min_gradient_norm"],
max_nfev=self.termination_conditions["max_iterations"],
verbose=verbose
)
if self.system_simulator.stats is not None:
self.system_simulator.stats.end_t_opt = time.time()
if self.store_optimizer:
storage_opt = self
else:
storage_opt = None
optim_result = optimization_data.OptimizationResult(
final_cost=result.fun,
indices=self.system_simulator.cost_indices,
final_parameters=result.x.reshape(
self.pulse_shape[::-1]).T,
final_grad_norm=np.linalg.norm(result.grad),
num_iter=result.nfev,
termination_reason=result.message,
status=result.status,
optimizer=storage_opt,
optim_summary=self.optim_iter_summary,
optimization_stats=self.system_simulator.stats
)
except WallTimeExceeded:
optim_result = self.write_state_to_result()
return optim_result
[docs]class ScalarMinimizingOptimizer(Optimizer):
""" Interfaces to the minimize functions of the optimization package in
scipy.
Parameters
----------
method: string
Takes methods implemented by scipy.optimize.minimize.
bounds: sequence, optional
Attention: The boundary format can vary between optimizers!
The boundary conditions for the pulse optimizations. If none are given
then the pulse is assumed to take any real value. The boundaries are
given as a sequence of (min, max) pairs for each element. Defaults to
None.
"""
def __init__(
self,
system_simulator: Optional[simulator.Simulator] = None,
termination_cond: Optional[Dict] = None,
save_intermediary_steps: bool = True,
method: str = 'L-BFGS-B',
bounds: Union[np.ndarray, List, None] = None,
use_jacobian_function=True,
cost_func_weights: Optional[Sequence[float]] = None,
store_optimizer: bool = False
):
super().__init__(system_simulator=system_simulator,
termination_cond=termination_cond,
save_intermediary_steps=save_intermediary_steps,
cost_func_weights=cost_func_weights,
use_jacobian_function=use_jacobian_function,
store_optimizer=store_optimizer)
self.method = method
self.bounds = bounds
[docs] def cost_func_wrapper(self, optimization_parameters):
""" Evalutes the cost function.
The total cost function is defined as the sum of cost functions.
"""
costs = super().cost_func_wrapper(optimization_parameters)
scalar_costs = np.sum(costs)
return scalar_costs
[docs] def cost_jacobian_wrapper(self, optimization_parameters):
""" The Jacobian reduced to the gradient.
The gradient is calculated by summation over the Jacobian along the
function axis, because the total cost function is defined as the sum
of cost functions.
Returns
-------
gradient: numpy array, shape (num_t * num_amp)
The gradient of the costs in the 2 norm.
"""
jac = super().cost_jacobian_wrapper(optimization_parameters)
grad = (np.sum(jac, axis=0))
return grad
[docs] def run_optimization(self, initial_control_amplitudes: np.array,
verbose: bool = False) -> optimization_data.OptimizationResult:
super().prepare_optimization(
initial_optimization_parameters=initial_control_amplitudes)
if self.use_jacobian_function:
jac = self.cost_jacobian_wrapper
else:
jac = None
if self.method == 'L-BFGS-B':
try:
result = scipy.optimize.minimize(
fun=self.cost_func_wrapper,
x0=initial_control_amplitudes.T.flatten(),
jac=jac,
bounds=self.bounds,
method=self.method,
options={
'ftol': self.termination_conditions["min_cost_gain"],
'gtol': self.termination_conditions["min_gradient_norm"],
'maxiter': self.termination_conditions["max_iterations"],
'disp': verbose
}
)
if self.store_optimizer:
storage_opt = self
else:
storage_opt = None
optim_result = optimization_data.OptimizationResult(
final_cost=result.fun,
indices=self.system_simulator.cost_indices,
final_parameters=result.x.reshape(
self.pulse_shape[::-1]).T,
final_grad_norm=np.linalg.norm(result.jac),
num_iter=result.nfev,
termination_reason=result.message,
status=result.status,
optimizer=storage_opt,
optim_summary=self.optim_iter_summary,
optimization_stats=self.system_simulator.stats
)
except WallTimeExceeded:
optim_result = self.write_state_to_result()
elif self.method == 'Nelder-Mead':
try:
result = scipy.optimize.minimize(
fun=self.cost_func_wrapper,
x0=initial_control_amplitudes.T.flatten(),
bounds=self.bounds,
method=self.method,
options={
'maxiter': self.termination_conditions[
"max_iterations"]},
)
if self.store_optimizer:
storage_opt = self
else:
storage_opt = None
optim_result = optimization_data.OptimizationResult(
final_cost=result.fun,
indices=self.system_simulator.cost_indices,
final_parameters=result.x.reshape(
self.pulse_shape[::-1]).T,
num_iter=result.nfev,
termination_reason=result.message,
status=result.status,
optimizer=storage_opt,
optim_summary=self.optim_iter_summary,
optimization_stats=self.system_simulator.stats
)
except WallTimeExceeded:
optim_result = self.write_state_to_result()
else:
try:
result = scipy.optimize.minimize(
fun=self.cost_func_wrapper,
x0=initial_control_amplitudes.T.flatten(),
bounds=self.bounds,
method=self.method
)
optim_result = optimization_data.OptimizationResult(
final_cost=result.fun,
indices=self.system_simulator.cost_indices,
final_parameters=result.x.reshape(
self.pulse_shape[::-1]).T,
num_iter=result.nfev,
termination_reason=result.message,
status=result.status,
optimizer=self,
optim_summary=self.optim_iter_summary,
optimization_stats=self.system_simulator.stats
)
except WallTimeExceeded:
optim_result = self.write_state_to_result()
if self.system_simulator.stats is not None:
self.system_simulator.stats.end_t_opt = time.time()
return optim_result
[docs]def time_string(seconds):
"""Returns time in seconds as a string formatted HHHH:MM:SS."""
s = int(round(seconds)) # round to nearest second
h, s = divmod(s, 3600) # get hours and remainder
m, s = divmod(s, 60) # split remainder into minutes and seconds
return '%4i:%02i:%02i' % (h, m, s)
class PulseAnnealer(simanneal.Annealer):
"""
Simulated annealer for the discrete optimization of pulses.
This function implements an annealer class for the optimization of pulses.
The virtual methods move and energy are reimplemented.
The state is the pulse.
Parameters
----------
initial_state: numpy array, shape = (n_times, n_ctrl)
The initial state or initial pulse.
bounds: numpy array, shape = (2, n_times, n_ctrl)
Boundaries for the pulse.
energy_function: callable
Energy or Cost function.
step_size: int
The annealer will optimize taking steps of the size of all integers,
whose absolute values are smaller or equal to step_size in any
direction. Defaults to 1 for the optimization of integers.
step_ratio: float
The step ratio determines how many of the time segments are optimized
at once.
Tmax: float
Initial or maximum temperature for the annealing algorithm. The initial
temperature should be about as large as the initial infidelity. If
pulses with random initial values are optimized, I suggest an initial
temperature of 1.
Tmin: float
Final or minimal temperature. The final temperature should be at least
2 orders of magnitude smaller than the final fidelity you expect, such
that the annealer does not jump close to the completion.
steps: int
Number of optimization steps before the pulse terminates.
updates: int
Number of updates. At each update the program calls the update
function to document how many steps were accepted since the last update
and how many of them improved the energy or cost function.
"""
def __init__(
self,
initial_state,
bounds,
energy_function: Callable,
step_size: int,
step_ratio: float,
Tmax: float,
Tmin: float,
steps: int,
updates: int
):
super().__init__(initial_state=initial_state)
self.Tmax = Tmax
self.Tmin = Tmin
self.steps = steps
self.updates = updates
self.bounds = bounds
self.step_size = step_size
self.step_ratio = step_ratio
self.energy_function = energy_function
def move(self):
""" Moving into a random direction.
This function applies a random variation to the pulse. The value of
each time segment is either incremented by step_size, reduced by
step_size or kept constant. """
pulse = self.state
if type(self.step_size) != int:
raise ValueError("The step size must be integer! But it is: "
+ str(self.step_size))
if self.step_size == 0:
raise ValueError("The step size has been set to 0.")
# Create random integers between -self.step_size and self.step_size
random_step = np.random.randint(
low=-1 * self.step_size,
high=self.step_size + 1,
size=pulse.shape
)
# The update mask decides randomly which directions are neglected
update_mask = np.random.rand(*random_step.shape)
random_step[update_mask > self.step_ratio] = 0
new_pulse = pulse + random_step
# if a limit is exceeded, set the value to the limit
lower_limit_exceeded = new_pulse < self.bounds[0]
upper_limit_exceeded = new_pulse > self.bounds[1]
new_pulse[lower_limit_exceeded] = self.bounds[0][lower_limit_exceeded]
new_pulse[upper_limit_exceeded] = self.bounds[1][upper_limit_exceeded]
self.state = new_pulse
return
def energy(self):
"""The energy or cost function of the annealer. """
e = np.linalg.norm(self.energy_function(self.state.T.flatten()))
return e
[docs]class SimulatedAnnealing(Optimizer):
"""
This class uses simulated annealing for discrete optimization.
The use of this class requires the installation of the simanneal package
from pypi.
Parameters
----------
bounds: array of boundaries, shape: (2, num_t, num_ctrl)
The boundary conditions for the pulse optimizations. bounds[0] should
be the lower bounds, and bounds[1] the upper ones.
initial_temperature: float
Initial or maximum temperature for the annealing algorithm. The initial
temperature should be about as large as the initial infidelity. If
pulses with random initial values are optimized, I suggest an initial
temperature of 1. Defaults to 1.
final_temperature: float
Final or minimal temperature. The final temperature should be at least
2 orders of magnitude smaller than the final fidelity you expect, such
that the annealer does not jump close to the completion.
step_size: int
The annealer will optimize taking steps of the size of all integers,
whose absolute values are smaller or equal to step_size in any
direction. Defaults to 1 for the optimization of integers. Defaults to
1.
step_ratio: float
The step ratio determines how many of the time segments are optimized
at once. Defaults to 1.
steps: int
Number of optimization steps before the pulse terminates. Defaults to
1000.
updates: int
Number of updates. At each update the program calls the update
function to document how many steps were accepted since the last update
and how many of them improved the energy or cost function. Defaults to
100.
"""
def __init__(
self,
system_simulator: Optional[simulator.Simulator],
bounds: Optional[np.ndarray],
store_optimizer: bool = False,
initial_temperature: float = 1.,
final_temperature: float = 1e-6,
step_size: int = 1,
step_ratio: float = 1.,
steps: Optional[int] = 1000,
updates: int = 100
):
try:
import simanneal
except ImportError as err:
raise RuntimeError(
'Requirements not fulfilled. Please install simanneal'
) from err
termination_conditions = copy.deepcopy(default_termination_conditions)
termination_conditions['max_wall_time'] = 1e8
super().__init__(
system_simulator=system_simulator,
termination_cond=termination_conditions,
save_intermediary_steps=False,
store_optimizer=store_optimizer
)
self.annealer = PulseAnnealer(
initial_state=0,
bounds=bounds,
energy_function=self.cost_func_wrapper,
step_size=step_size,
step_ratio=step_ratio,
Tmax=initial_temperature,
Tmin=final_temperature,
steps=steps,
updates=updates
)
[docs] def run_optimization(self, initial_control_amplitudes: np.ndarray,
verbose=None):
"""See base class. """
self.prepare_optimization(
initial_optimization_parameters=initial_control_amplitudes)
pulse, costs = self.annealer.anneal()
if self.system_simulator.stats is not None:
self.system_simulator.stats.end_t_opt = time.time()
if self.store_optimizer:
storage_opt = self
else:
storage_opt = None
optim_result = optimization_data.OptimizationResult(
final_cost=costs,
indices=self.system_simulator.cost_indices,
final_parameters=pulse,
optimizer=storage_opt,
optim_summary=self.optim_iter_summary,
optimization_stats=self.system_simulator.stats
)
return optim_result
[docs] def prepare_optimization(self,
initial_optimization_parameters: np.ndarray):
""" See base class. """
super().prepare_optimization(
initial_optimization_parameters=initial_optimization_parameters)
self.annealer.state = initial_optimization_parameters