Source code for qiskit_algorithms.optimizers.p_bfgs

# This code is part of a Qiskit project.
#
# (C) Copyright IBM 2018, 2023.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Parallelized Limited-memory BFGS optimizer"""
from __future__ import annotations

import logging
import multiprocessing
import platform
from collections.abc import Callable
from typing import SupportsFloat

import numpy as np

from qiskit_algorithms.utils import algorithm_globals
from qiskit_algorithms.utils.validation import validate_min

from .optimizer import OptimizerResult, POINT
from .scipy_optimizer import SciPyOptimizer

logger = logging.getLogger(__name__)


[docs]class P_BFGS(SciPyOptimizer): # pylint: disable=invalid-name """ Parallelized Limited-memory BFGS optimizer. P-BFGS is a parallelized version of :class:`L_BFGS_B` with which it shares the same parameters. P-BFGS can be useful when the target hardware is a quantum simulator running on a classical machine. This allows the multiple processes to use simulation to potentially reach a minimum faster. The parallelization may also help the optimizer avoid getting stuck at local optima. Uses scipy.optimize.fmin_l_bfgs_b. For further detail, please refer to https://docs.scipy.org/doc/scipy/reference/generated/scipy.optimize.fmin_l_bfgs_b.html .. note:: This component has some function that is normally random. If you want to reproduce behavior then you should set the random number generator seed in the algorithm_globals (``qiskit_algorithms.utils.algorithm_globals.random_seed = seed``). """ _OPTIONS = ["maxfun", "ftol", "iprint"] # pylint: disable=unused-argument def __init__( self, maxfun: int = 1000, ftol: SupportsFloat = 10 * np.finfo(float).eps, iprint: int = -1, max_processes: int | None = None, options: dict | None = None, max_evals_grouped: int = 1, **kwargs, ) -> None: r""" Args: maxfun: Maximum number of function evaluations. ftol: The iteration stops when (f\^k - f\^{k+1})/max{\|f\^k\|,\|f\^{k+1}\|,1} <= ftol. iprint: Controls the frequency of output. iprint < 0 means no output; iprint = 0 print only one line at the last iteration; 0 < iprint < 99 print also f and \|proj g\| every iprint iterations; iprint = 99 print details of every iteration except n-vectors; iprint = 100 print also the changes of active set and final x; iprint > 100 print details of every iteration including x and g. max_processes: maximum number of processes allowed, has a min. value of 1 if not None. options: A dictionary of solver options. max_evals_grouped: Max number of default gradient evaluations performed simultaneously. kwargs: additional kwargs for scipy.optimize.minimize. """ if max_processes: validate_min("max_processes", max_processes, 1) if options is None: options = {} for k, v in list(locals().items()): if k in self._OPTIONS: options[k] = v super().__init__( method="L-BFGS-B", options=options, max_evals_grouped=max_evals_grouped, **kwargs, ) self._max_processes = max_processes
[docs] def minimize( self, fun: Callable[[POINT], float], x0: POINT, jac: Callable[[POINT], POINT] | None = None, bounds: list[tuple[float, float]] | None = None, ) -> OptimizerResult: x0 = np.asarray(x0) num_procs = multiprocessing.cpu_count() - 1 num_procs = ( num_procs if self._max_processes is None else min(num_procs, self._max_processes) ) num_procs = num_procs if num_procs >= 0 else 0 if platform.system() == "Darwin": # Changed in version 3.8: On macOS, the spawn start method is now the # default. The fork start method should be considered unsafe as it can # lead to crashes. # However P_BFGS doesn't support spawn, so we revert to single process. num_procs = 0 logger.warning( "For MacOS, python >= 3.8, using only current process. " "Multiple core use not supported." ) elif platform.system() == "Windows": num_procs = 0 logger.warning( "For Windows, using only current process. Multiple core use not supported." ) queue: multiprocessing.queues.Queue[tuple[POINT, float, int]] = multiprocessing.Queue() # TODO: are automatic bounds a good idea? What if the circuit parameters are not # just from plain Pauli rotations but have a coefficient? # bounds for additional initial points in case bounds has any None values threshold = 2 * np.pi if bounds is None: bounds = [(-threshold, threshold)] * x0.size low = [(l if l is not None else -threshold) for (l, u) in bounds] high = [(u if u is not None else threshold) for (l, u) in bounds] def optimize_runner(_queue, _i_pt): # Multi-process sampling _sol, _opt, _nfev = self._optimize(fun, _i_pt, jac, bounds) _queue.put((_sol, _opt, _nfev)) # Start off as many other processes running the optimize (can be 0) processes = [] for _ in range(num_procs): i_pt = algorithm_globals.random.uniform(low, high) # Another random point in bounds proc = multiprocessing.Process(target=optimize_runner, args=(queue, i_pt)) processes.append(proc) proc.start() # While the one optimize in this process below runs the other processes will # be running too. This one runs # with the supplied initial point. The process ones have their own random one sol, opt, nfev = self._optimize(fun, x0, jac, bounds) for proc in processes: # For each other process we wait now for it to finish and see if it has # a better result than above proc.join() p_sol, p_opt, p_nfev = queue.get() if p_opt < opt: sol, opt = p_sol, p_opt nfev += p_nfev result = OptimizerResult() result.x = sol result.fun = opt result.nfev = nfev return result
def _optimize( self, objective_function, initial_point, gradient_function=None, variable_bounds=None, ) -> tuple[POINT, float, int]: result = super().minimize( objective_function, initial_point, gradient_function, variable_bounds ) return result.x, result.fun, result.nfev