# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Qiskit runtime service."""
import json
import logging
import traceback
import warnings
from datetime import datetime
from collections import OrderedDict
from typing import Dict, Callable, Optional, Union, List, Any, Type, Sequence
from qiskit.providers.backend import BackendV1 as Backend
from qiskit.providers.provider import ProviderV1 as Provider
from qiskit.providers.exceptions import QiskitBackendNotFoundError
from qiskit.providers.providerutils import filter_backends
from qiskit.providers.models import (
PulseBackendConfiguration,
QasmBackendConfiguration,
)
from qiskit_ibm_provider.proxies import ProxyConfiguration
from qiskit_ibm_provider.utils.hgp import to_instance_format, from_instance_format
from qiskit_ibm_provider.utils.backend_decoder import configuration_from_server_data
from qiskit_ibm_runtime import ibm_backend
from .accounts import AccountManager, Account, ChannelType
from .api.clients import AuthClient, VersionClient
from .api.clients.runtime import RuntimeClient
from .api.exceptions import RequestsApiError
from .constants import QISKIT_IBM_RUNTIME_API_URL
from .exceptions import IBMNotAuthorizedError, IBMInputValueError, IBMAccountError
from .exceptions import (
IBMRuntimeError,
RuntimeDuplicateProgramError,
RuntimeProgramNotFound,
RuntimeJobNotFound,
)
from .hub_group_project import HubGroupProject # pylint: disable=cyclic-import
from .program.result_decoder import ResultDecoder
from .runtime_job import RuntimeJob
from .runtime_program import RuntimeProgram, ParameterNamespace
from .utils import RuntimeDecoder, to_base64_string, to_python_identifier
from .api.client_parameters import ClientParameters
from .runtime_options import RuntimeOptions
from .ibm_backend import IBMBackend
logger = logging.getLogger(__name__)
SERVICE_NAME = "runtime"
[docs]
class QiskitRuntimeService(Provider):
"""Class for interacting with the Qiskit Runtime service.
Qiskit Runtime is a new architecture offered by IBM Quantum that
streamlines computations requiring many iterations. These experiments will
execute significantly faster within its improved hybrid quantum/classical
process.
A sample workflow of using the runtime service::
from qiskit_ibm_runtime import QiskitRuntimeService, Session, Sampler, Estimator, Options
from qiskit.test.reference_circuits import ReferenceCircuits
from qiskit.circuit.library import RealAmplitudes
from qiskit.quantum_info import SparsePauliOp
# Initialize account.
service = QiskitRuntimeService()
# Set options, which can be overwritten at job level.
options = Options(optimization_level=1)
# Prepare inputs.
bell = ReferenceCircuits.bell()
psi = RealAmplitudes(num_qubits=2, reps=2)
H1 = SparsePauliOp.from_list([("II", 1), ("IZ", 2), ("XI", 3)])
theta = [0, 1, 1, 2, 3, 5]
with Session(service=service, backend="ibmq_qasm_simulator") as session:
# Submit a request to the Sampler primitive within the session.
sampler = Sampler(session=session, options=options)
job = sampler.run(circuits=bell)
print(f"Sampler results: {job.result()}")
# Submit a request to the Estimator primitive within the session.
estimator = Estimator(session=session, options=options)
job = estimator.run(
circuits=[psi], observables=[H1], parameter_values=[theta]
)
print(f"Estimator results: {job.result()}")
# Close the session only if all jobs are finished
# and you don't need to run more in the session.
session.close()
The example above uses the dedicated :class:`~qiskit_ibm_runtime.Sampler`
and :class:`~qiskit_ibm_runtime.Estimator` classes. You can also
use the :meth:`run` method directly to invoke a Qiskit Runtime program.
If the program has any interim results, you can use the ``callback``
parameter of the :meth:`run` method to stream the interim results.
Alternatively, you can use the :meth:`RuntimeJob.stream_results` method to stream
the results at a later time, but before the job finishes.
The :meth:`run` method returns a
:class:`RuntimeJob` object. You can use its
methods to perform tasks like checking job status, getting job result, and
canceling job.
"""
global_service = None
def __init__(
self,
channel: Optional[ChannelType] = None,
token: Optional[str] = None,
url: Optional[str] = None,
filename: Optional[str] = None,
name: Optional[str] = None,
instance: Optional[str] = None,
proxies: Optional[dict] = None,
verify: Optional[bool] = None,
channel_strategy: Optional[str] = None,
) -> None:
"""QiskitRuntimeService constructor
An account is selected in the following order:
- Account with the input `name`, if specified.
- Default account for the `channel` type, if `channel` is specified but `token` is not.
- Account defined by the input `channel` and `token`, if specified.
- Account defined by the `default_channel` if defined in filename
- Account defined by the environment variables, if defined.
- Default account for the ``ibm_cloud`` account, if one is available.
- Default account for the ``ibm_quantum`` account, if one is available.
`instance`, `proxies`, and `verify` can be used to overwrite corresponding
values in the loaded account.
Args:
channel: Channel type. ``ibm_cloud`` or ``ibm_quantum``.
token: IBM Cloud API key or IBM Quantum API token.
url: The API URL.
Defaults to https://cloud.ibm.com (ibm_cloud) or
https://auth.quantum-computing.ibm.com/api (ibm_quantum).
filename: Full path of the file where the account is created.
Default: _DEFAULT_ACCOUNT_CONFIG_JSON_FILE
name: Name of the account to load.
instance: The service instance to use.
For ``ibm_cloud`` runtime, this is the Cloud Resource Name (CRN) or the service name.
For ``ibm_quantum`` runtime, this is the hub/group/project in that format.
proxies: Proxy configuration. Supported optional keys are
``urls`` (a dictionary mapping protocol or protocol and host to the URL of the proxy,
documented at https://docs.python-requests.org/en/latest/api/#requests.Session.proxies),
``username_ntlm``, ``password_ntlm`` (username and password to enable NTLM user
authentication)
verify: Whether to verify the server's TLS certificate.
channel_strategy: Error mitigation strategy.
Returns:
An instance of QiskitRuntimeService.
Raises:
IBMInputValueError: If an input is invalid.
"""
super().__init__()
self._account = self._discover_account(
token=token,
url=url,
instance=instance,
channel=channel,
filename=filename,
name=name,
proxies=ProxyConfiguration(**proxies) if proxies else None,
verify=verify,
channel_strategy=channel_strategy,
)
self._client_params = ClientParameters(
channel=self._account.channel,
token=self._account.token,
url=self._account.url,
instance=self._account.instance,
proxies=self._account.proxies,
verify=self._account.verify,
)
self._channel_strategy = channel_strategy or self._account.channel_strategy
self._channel = self._account.channel
self._programs: Dict[str, RuntimeProgram] = {}
self._backends: Dict[str, "ibm_backend.IBMBackend"] = {}
self._backend_configs: Dict[str, Any] = {}
if self._channel == "ibm_cloud":
self._api_client = RuntimeClient(self._client_params)
# TODO: We can make the backend discovery lazy
self._backends = self._discover_cloud_backends()
QiskitRuntimeService.global_service = self
return
else:
auth_client = self._authenticate_ibm_quantum_account(self._client_params)
# Update client parameters to use authenticated values.
self._client_params.url = auth_client.current_service_urls()["services"]["runtime"]
self._client_params.token = auth_client.current_access_token()
self._api_client = RuntimeClient(self._client_params)
self._hgps = self._initialize_hgps(auth_client)
for hgp in self._hgps.values():
for backend_name in hgp.backends:
if backend_name not in self._backends:
self._backends[backend_name] = None
QiskitRuntimeService.global_service = self
# TODO - it'd be nice to allow some kind of autocomplete, but `service.ibmq_foo`
# just seems wrong since backends are not runtime service instances.
# self._discover_backends()
def _discover_account(
self,
token: Optional[str] = None,
url: Optional[str] = None,
instance: Optional[str] = None,
channel: Optional[ChannelType] = None,
filename: Optional[str] = None,
name: Optional[str] = None,
proxies: Optional[ProxyConfiguration] = None,
verify: Optional[bool] = None,
channel_strategy: Optional[str] = None,
) -> Account:
"""Discover account."""
account = None
verify_ = verify or True
if channel_strategy:
if channel_strategy not in ["q-ctrl", "default"]:
raise ValueError(f"{channel_strategy} is not a valid channel strategy.")
if channel and channel != "ibm_cloud":
raise ValueError(
f"The channel strategy {channel_strategy} is "
"only supported on the ibm_cloud channel."
)
if name:
if filename:
if any([channel, token, url]):
logger.warning(
"Loading account from file %s with name %s. Any input "
"'channel', 'token' or 'url' are ignored.",
filename,
name,
)
else:
if any([channel, token, url]):
logger.warning(
"Loading account with name %s. Any input "
"'channel', 'token' or 'url' are ignored.",
name,
)
account = AccountManager.get(filename=filename, name=name)
elif channel:
if channel and channel not in ["ibm_cloud", "ibm_quantum"]:
raise ValueError("'channel' can only be 'ibm_cloud' or 'ibm_quantum'")
if token:
account = Account.create_account(
channel=channel,
token=token,
url=url,
instance=instance,
proxies=proxies,
verify=verify_,
channel_strategy=channel_strategy,
)
else:
if url:
logger.warning("Loading default %s account. Input 'url' is ignored.", channel)
account = AccountManager.get(filename=filename, name=name, channel=channel)
elif any([token, url]):
# Let's not infer based on these attributes as they may change in the future.
raise ValueError(
"'channel' is required if 'token', or 'url' is specified but 'name' is not."
)
# channel is not defined yet, get it from the AccountManager
if account is None:
account = AccountManager.get(filename=filename)
if instance:
account.instance = instance
if proxies:
account.proxies = proxies
if verify is not None:
account.verify = verify
# resolve CRN if needed
self._resolve_crn(account)
# ensure account is valid, fail early if not
account.validate()
return account
def _discover_cloud_backends(self) -> Dict[str, "ibm_backend.IBMBackend"]:
"""Return the remote backends available for this service instance.
Returns:
A dict of the remote backend instances, keyed by backend name.
"""
ret = OrderedDict() # type: ignore[var-annotated]
backends_list = self._api_client.list_backends(channel_strategy=self._channel_strategy)
for backend_name in backends_list:
raw_config = self._api_client.backend_configuration(backend_name=backend_name)
config = configuration_from_server_data(
raw_config=raw_config, instance=self._account.instance
)
if not config:
continue
ret[config.backend_name] = ibm_backend.IBMBackend(
configuration=config,
service=self,
api_client=self._api_client,
)
return ret
def _resolve_crn(self, account: Account) -> None:
account.resolve_crn()
def _authenticate_ibm_quantum_account(self, client_params: ClientParameters) -> AuthClient:
"""Authenticate against IBM Quantum and populate the hub/group/projects.
Args:
client_params: Parameters used for server connection.
Raises:
IBMInputValueError: If the URL specified is not a valid IBM Quantum authentication URL.
IBMNotAuthorizedError: If the account is not authorized to use runtime.
Returns:
Authentication client.
"""
version_info = self._check_api_version(client_params)
# Check the URL is a valid authentication URL.
if not version_info["new_api"] or "api-auth" not in version_info:
raise IBMInputValueError(
"The URL specified ({}) is not an IBM Quantum authentication URL. "
"Valid authentication URL: {}.".format(
client_params.url, QISKIT_IBM_RUNTIME_API_URL
)
)
auth_client = AuthClient(client_params)
service_urls = auth_client.current_service_urls()
if not service_urls.get("services", {}).get(SERVICE_NAME):
raise IBMNotAuthorizedError(
"This account is not authorized to use ``ibm_quantum`` runtime service."
)
return auth_client
def _initialize_hgps(
self,
auth_client: AuthClient,
) -> Dict:
"""Authenticate against IBM Quantum and populate the hub/group/projects.
Args:
auth_client: Authentication data.
Raises:
IBMInputValueError: If the URL specified is not a valid IBM Quantum authentication URL.
IBMAccountError: If no hub/group/project could be found for this account.
IBMInputValueError: If instance parameter is not found in hgps.
Returns:
The hub/group/projects for this account.
"""
# pylint: disable=unsubscriptable-object
hgps: OrderedDict[str, HubGroupProject] = OrderedDict()
service_urls = auth_client.current_service_urls()
user_hubs = auth_client.user_hubs()
for hub_info in user_hubs:
# Build credentials.
hgp_params = ClientParameters(
channel=self._account.channel,
token=auth_client.current_access_token(),
url=service_urls["services"]["runtime"],
instance=to_instance_format(
hub_info["hub"], hub_info["group"], hub_info["project"]
),
proxies=self._account.proxies,
verify=self._account.verify,
)
# Build the hgp.
try:
hgp = HubGroupProject(
client_params=hgp_params, instance=hgp_params.instance, service=self
)
hgps[hgp.name] = hgp
except Exception: # pylint: disable=broad-except
# Catch-all for errors instantiating the hgp.
logger.warning(
"Unable to instantiate hub/group/project for %s: %s",
hub_info,
traceback.format_exc(),
)
if not hgps:
raise IBMAccountError(
"No hub/group/project that supports Qiskit Runtime could "
"be found for this account."
)
# Move open hgp to end of the list
if len(hgps) > 1:
open_key, open_val = hgps.popitem(last=False)
hgps[open_key] = open_val
default_hgp = self._account.instance
if default_hgp:
if default_hgp in hgps:
# Move user selected hgp to front of the list
hgps.move_to_end(default_hgp, last=False)
else:
raise IBMInputValueError(
f"Hub/group/project {default_hgp} could not be found for this account."
)
return hgps
@staticmethod
def _check_api_version(params: ClientParameters) -> Dict[str, Union[bool, str]]:
"""Check the version of the remote server in a set of client parameters.
Args:
params: Parameters used for server connection.
Returns:
A dictionary with version information.
"""
version_finder = VersionClient(url=params.url, **params.connection_parameters())
return version_finder.version()
def _get_hgp(
self,
instance: Optional[str] = None,
backend_name: Optional[Any] = None,
) -> HubGroupProject:
"""Return an instance of `HubGroupProject`.
This function also allows to find the `HubGroupProject` that contains a backend
`backend_name`.
Args:
instance: The hub/group/project to use.
backend_name: Name of the IBM Quantum backend.
Returns:
An instance of `HubGroupProject` that matches the specified criteria or the default.
Raises:
IBMInputValueError: If no hub/group/project matches the specified criteria,
or if the input value is in an incorrect format.
QiskitBackendNotFoundError: If backend cannot be found.
"""
if instance:
_ = from_instance_format(instance) # Verify format
if instance not in self._hgps:
raise IBMInputValueError(
f"Hub/group/project {instance} " "could not be found for this account."
)
if backend_name and not self._hgps[instance].has_backend(backend_name):
raise QiskitBackendNotFoundError(
f"Backend {backend_name} cannot be found in " f"hub/group/project {instance}"
)
return self._hgps[instance]
if not backend_name:
return list(self._hgps.values())[0]
for hgp in self._hgps.values():
if hgp.has_backend(backend_name):
return hgp
error_message = (
f"Backend {backend_name} cannot be found in any " f"hub/group/project for this account."
)
if not isinstance(backend_name, str):
error_message += (
f" {backend_name} is of type {type(backend_name)} but should "
f"instead be initialized through the {self}."
)
raise QiskitBackendNotFoundError(error_message)
def _discover_backends(self) -> None:
"""Discovers the remote backends for this account, if not already known."""
for backend in self._backends.values():
backend_name = to_python_identifier(backend.name)
# Append _ if duplicate
while backend_name in self.__dict__:
backend_name += "_"
setattr(self, backend_name, backend)
# pylint: disable=arguments-differ
[docs]
def backends(
self,
name: Optional[str] = None,
min_num_qubits: Optional[int] = None,
instance: Optional[str] = None,
filters: Optional[Callable[[List["ibm_backend.IBMBackend"]], bool]] = None,
**kwargs: Any,
) -> List["ibm_backend.IBMBackend"]:
"""Return all backends accessible via this account, subject to optional filtering.
Args:
name: Backend name to filter by.
min_num_qubits: Minimum number of qubits the backend has to have.
instance: This is only supported for ``ibm_quantum`` runtime and is in the
hub/group/project format.
filters: More complex filters, such as lambda functions.
For example::
QiskitRuntimeService.backends(
filters=lambda b: b.max_shots > 50000)
QiskitRuntimeService.backends(
filters=lambda x: ("rz" in x.basis_gates )
**kwargs: Simple filters that require a specific value for an attribute in
backend configuration or status.
Examples::
# Get the operational real backends
QiskitRuntimeService.backends(simulator=False, operational=True)
# Get the backends with at least 127 qubits
QiskitRuntimeService.backends(min_num_qubits=127)
# Get the backends that support OpenPulse
QiskitRuntimeService.backends(open_pulse=True)
For the full list of backend attributes, see the `IBMBackend` class documentation
<https://qiskit.org/documentation/apidoc/providers_models.html>
Returns:
The list of available backends that match the filter.
Raises:
IBMInputValueError: If an input is invalid.
QiskitBackendNotFoundError: If the backend is not in any instance.
"""
# TODO filter out input_allowed not having runtime
backends: List[IBMBackend] = []
instance_filter = instance if instance else self._account.instance
if self._channel == "ibm_quantum":
if name:
if name not in self._backends:
raise QiskitBackendNotFoundError("No backend matches the criteria.")
if not self._backends[name] or instance != self._backends[name]._instance:
self._set_backend_config(name)
self._backends[name] = self._create_backend_obj(
self._backend_configs[name],
instance,
)
if self._backends[name]:
backends.append(self._backends[name])
elif instance_filter:
hgp = self._get_hgp(instance=instance_filter)
for backend_name in hgp.backends:
if (
not self._backends[backend_name]
or instance_filter != self._backends[backend_name]._instance
):
self._set_backend_config(backend_name, instance_filter)
self._backends[backend_name] = self._create_backend_obj(
self._backend_configs[backend_name], instance_filter
)
if self._backends[backend_name]:
backends.append(self._backends[backend_name])
else:
for backend_name, backend_config in self._backends.items():
if not backend_config:
self._set_backend_config(backend_name)
self._backends[backend_name] = self._create_backend_obj(
self._backend_configs[backend_name]
)
if self._backends[backend_name]:
backends.append(self._backends[backend_name])
else:
if instance:
raise IBMInputValueError(
"The 'instance' keyword is only supported for ``ibm_quantum`` runtime."
)
backends = list(self._backends.values())
if name:
kwargs["backend_name"] = name
if min_num_qubits:
backends = list(
filter(lambda b: b.configuration().n_qubits >= min_num_qubits, backends)
)
return filter_backends(backends, filters=filters, **kwargs)
def _set_backend_config(self, backend_name: str, instance: Optional[str] = None) -> None:
"""Retrieve backend configuration and add to backend_configs.
Args:
backend_name: backend name that will be returned.
instance: the current h/g/p.
"""
if backend_name not in self._backend_configs:
raw_config = self._api_client.backend_configuration(backend_name)
config = configuration_from_server_data(raw_config=raw_config, instance=instance)
self._backend_configs[backend_name] = config
def _create_backend_obj(
self,
config: Union[QasmBackendConfiguration, PulseBackendConfiguration],
instance: Optional[str] = None,
) -> IBMBackend:
"""Given a backend configuration return the backend object.
Args:
config: backend configuration.
instance: the current h/g/p.
Returns:
A backend object.
Raises:
QiskitBackendNotFoundError: if the backend is not in the hgp passed in.
"""
if config:
if not instance:
for hgp in list(self._hgps.values()):
if config.backend_name in hgp.backends:
instance = to_instance_format(hgp._hub, hgp._group, hgp._project)
break
elif config.backend_name not in self._get_hgp(instance=instance).backends:
raise QiskitBackendNotFoundError(
f"Backend {config.backend_name} is not in "
f"{instance}: please try a different hub/group/project."
)
return ibm_backend.IBMBackend(
instance=instance,
configuration=config,
service=self,
api_client=self._api_client,
)
return None
[docs]
def active_account(self) -> Optional[Dict[str, str]]:
"""Return the IBM Quantum account currently in use for the session.
Returns:
A dictionary with information about the account currently in the session.
"""
return self._account.to_saved_format()
[docs]
@staticmethod
def delete_account(
filename: Optional[str] = None,
name: Optional[str] = None,
channel: Optional[ChannelType] = None,
) -> bool:
"""Delete a saved account from disk.
Args:
filename: Name of file from which to delete the account.
name: Name of the saved account to delete.
channel: Channel type of the default account to delete.
Ignored if account name is provided.
Returns:
True if the account was deleted.
False if no account was found.
"""
return AccountManager.delete(filename=filename, name=name, channel=channel)
[docs]
@staticmethod
def save_account(
token: Optional[str] = None,
url: Optional[str] = None,
instance: Optional[str] = None,
channel: Optional[ChannelType] = None,
filename: Optional[str] = None,
name: Optional[str] = None,
proxies: Optional[dict] = None,
verify: Optional[bool] = None,
overwrite: Optional[bool] = False,
channel_strategy: Optional[str] = None,
set_as_default: Optional[bool] = None,
) -> None:
"""Save the account to disk for future use.
Args:
token: IBM Cloud API key or IBM Quantum API token.
url: The API URL.
Defaults to https://cloud.ibm.com (ibm_cloud) or
https://auth.quantum-computing.ibm.com/api (ibm_quantum).
instance: The CRN (ibm_cloud) or hub/group/project (ibm_quantum).
channel: Channel type. `ibm_cloud` or `ibm_quantum`.
filename: Full path of the file where the account is saved.
name: Name of the account to save.
proxies: Proxy configuration. Supported optional keys are
``urls`` (a dictionary mapping protocol or protocol and host to the URL of the proxy,
documented at https://docs.python-requests.org/en/latest/api/#requests.Session.proxies),
``username_ntlm``, ``password_ntlm`` (username and password to enable NTLM user
authentication)
verify: Verify the server's TLS certificate.
overwrite: ``True`` if the existing account is to be overwritten.
channel_strategy: Error mitigation strategy.
set_as_default: If ``True``, the account is saved in filename,
as the default account.
"""
AccountManager.save(
token=token,
url=url,
instance=instance,
channel=channel,
filename=filename,
name=name,
proxies=ProxyConfiguration(**proxies) if proxies else None,
verify=verify,
overwrite=overwrite,
channel_strategy=channel_strategy,
set_as_default=set_as_default,
)
[docs]
@staticmethod
def saved_accounts(
default: Optional[bool] = None,
channel: Optional[ChannelType] = None,
filename: Optional[str] = None,
name: Optional[str] = None,
) -> dict:
"""List the accounts saved on disk.
Args:
default: If set to True, only default accounts are returned.
channel: Channel type. `ibm_cloud` or `ibm_quantum`.
filename: Name of file whose accounts are returned.
name: If set, only accounts with the given name are returned.
Returns:
A dictionary with information about the accounts saved on disk.
Raises:
ValueError: If an invalid account is found on disk.
"""
return dict(
map(
lambda kv: (kv[0], Account.to_saved_format(kv[1])),
AccountManager.list(
default=default, channel=channel, filename=filename, name=name
).items(),
),
)
[docs]
def backend(
self,
name: str = None,
instance: Optional[str] = None,
) -> Backend:
"""Return a single backend matching the specified filtering.
Args:
name: Name of the backend.
instance: This is only supported for ``ibm_quantum`` runtime and is in the
hub/group/project format. If an instance is not given, among the providers
with access to the backend, a premium provider will be prioritized.
For users without access to a premium provider, the default open provider will be used.
Returns:
Backend: A backend matching the filtering.
Raises:
QiskitBackendNotFoundError: if no backend could be found.
"""
# pylint: disable=arguments-differ, line-too-long
backends = self.backends(name, instance=instance)
if not backends:
cloud_msg_url = ""
if self._channel == "ibm_cloud":
cloud_msg_url = (
" Learn more about available backends here "
"https://cloud.ibm.com/docs/quantum-computing?topic=quantum-computing-choose-backend "
)
raise QiskitBackendNotFoundError("No backend matches the criteria." + cloud_msg_url)
return backends[0]
[docs]
def get_backend(self, name: str = None, **kwargs: Any) -> Backend:
return self.backend(name, **kwargs)
[docs]
def pprint_programs(
self,
refresh: bool = False,
detailed: bool = False,
limit: int = 20,
skip: int = 0,
) -> None:
"""Pretty print information about available runtime programs.
Args:
refresh: If ``True``, re-query the server for the programs. Otherwise
return the cached value.
detailed: If ``True`` print all details about available runtime programs.
limit: The number of programs returned at a time. Default and maximum
value of 20.
skip: The number of programs to skip.
"""
programs = self.programs(refresh, limit, skip)
for prog in programs:
print("=" * 50)
if detailed:
print(str(prog))
else:
print(
f"{prog.program_id}:",
)
print(f" Name: {prog.name}")
print(f" Description: {prog.description}")
[docs]
def programs(
self, refresh: bool = False, limit: int = 20, skip: int = 0
) -> List[RuntimeProgram]:
"""Return available runtime programs.
Currently only program metadata is returned.
Args:
refresh: If ``True``, re-query the server for the programs. Otherwise
return the cached value.
limit: The number of programs returned at a time. ``None`` means no limit.
skip: The number of programs to skip.
Returns:
A list of runtime programs.
"""
if skip is None:
skip = 0
if not self._programs or refresh:
self._programs = {}
current_page_limit = 20
offset = 0
while True:
response = self._api_client.list_programs(limit=current_page_limit, skip=offset)
program_page = response.get("programs", [])
# count is the total number of programs that would be returned if
# there was no limit or skip
count = response.get("count", 0)
if limit is None:
limit = count
for prog_dict in program_page:
program = self._to_program(prog_dict)
self._programs[program.program_id] = program
num_cached_programs = len(self._programs)
if num_cached_programs == count or num_cached_programs >= (limit + skip):
# Stop if there are no more programs returned by the server or
# if the number of cached programs is greater than the sum of limit and skip
break
offset += len(program_page)
if limit is None:
limit = len(self._programs)
return list(self._programs.values())[skip : limit + skip]
[docs]
def program(self, program_id: str, refresh: bool = False) -> RuntimeProgram:
"""Retrieve a runtime program.
Currently only program metadata is returned.
Args:
program_id: Program ID.
refresh: If ``True``, re-query the server for the program. Otherwise
return the cached value.
Returns:
Runtime program.
Raises:
RuntimeProgramNotFound: If the program does not exist.
IBMRuntimeError: If the request failed.
"""
if program_id not in self._programs or refresh:
try:
response = self._api_client.program_get(program_id)
except RequestsApiError as ex:
if ex.status_code == 404:
raise RuntimeProgramNotFound(f"Program not found: {ex.message}") from None
raise IBMRuntimeError(f"Failed to get program: {ex}") from None
self._programs[program_id] = self._to_program(response)
return self._programs[program_id]
def _to_program(self, response: Dict) -> RuntimeProgram:
"""Convert server response to ``RuntimeProgram`` instances.
Args:
response: Server response.
Returns:
A ``RuntimeProgram`` instance.
"""
backend_requirements = {}
parameters = {}
return_values = {}
interim_results = {}
if "spec" in response:
backend_requirements = response["spec"].get("backend_requirements", {})
parameters = response["spec"].get("parameters", {})
return_values = response["spec"].get("return_values", {})
interim_results = response["spec"].get("interim_results", {})
return RuntimeProgram(
program_name=response["name"],
program_id=response["id"],
description=response.get("description", ""),
parameters=parameters,
return_values=return_values,
interim_results=interim_results,
max_execution_time=response.get("cost", 0),
creation_date=response.get("creation_date", ""),
update_date=response.get("update_date", ""),
backend_requirements=backend_requirements,
is_public=response.get("is_public", False),
data=response.get("data", ""),
api_client=self._api_client,
)
[docs]
def run(
self,
program_id: str,
inputs: Union[Dict, ParameterNamespace],
options: Optional[Union[RuntimeOptions, Dict]] = None,
callback: Optional[Callable] = None,
result_decoder: Optional[Union[Type[ResultDecoder], Sequence[Type[ResultDecoder]]]] = None,
session_id: Optional[str] = None,
start_session: Optional[bool] = False,
) -> RuntimeJob:
"""Execute the runtime program.
Args:
program_id: Program ID.
inputs: Program input parameters. These input values are passed
to the runtime program.
options: Runtime options that control the execution environment.
See :class:`RuntimeOptions` for all available options.
callback: Callback function to be invoked for any interim results and final result.
The callback function will receive 2 positional parameters:
1. Job ID
2. Job result.
result_decoder: A :class:`ResultDecoder` subclass used to decode job results.
If more than one decoder is specified, the first is used for interim results and
the second final results. If not specified, a program-specific decoder or the default
``ResultDecoder`` is used.
session_id: Job ID of the first job in a runtime session.
start_session: Set to True to explicitly start a runtime session. Defaults to False.
Returns:
A ``RuntimeJob`` instance representing the execution.
Raises:
IBMInputValueError: If input is invalid.
RuntimeProgramNotFound: If the program cannot be found.
IBMRuntimeError: An error occurred running the program.
"""
qrt_options: RuntimeOptions = options
if options is None:
qrt_options = RuntimeOptions()
elif isinstance(options, Dict):
qrt_options = RuntimeOptions(**options)
# If using params object, extract as dictionary.
if isinstance(inputs, ParameterNamespace):
inputs.validate()
inputs = vars(inputs)
qrt_options.validate(channel=self.channel)
hgp_name = None
if self._channel == "ibm_quantum":
# Find the right hgp
hgp = self._get_hgp(instance=qrt_options.instance, backend_name=qrt_options.backend)
hgp_name = hgp.name
backend = self.backend(name=qrt_options.backend, instance=hgp_name)
status = backend.status()
if status.operational is True and status.status_msg != "active":
warnings.warn(
f"The backend {backend.name} currently has a status of {status.status_msg}."
)
try:
response = self._api_client.program_run(
program_id=program_id,
backend_name=qrt_options.backend,
params=inputs,
image=qrt_options.image,
hgp=hgp_name,
log_level=qrt_options.log_level,
session_id=session_id,
job_tags=qrt_options.job_tags,
max_execution_time=qrt_options.max_execution_time,
start_session=start_session,
session_time=qrt_options.session_time,
channel_strategy=None
if self._channel_strategy == "default"
else self._channel_strategy,
)
if self._channel == "ibm_quantum":
messages = response.get("messages")
if messages:
warning_message = messages[0].get("data")
warnings.warn(warning_message)
except RequestsApiError as ex:
if ex.status_code == 404:
raise RuntimeProgramNotFound(f"Program not found: {ex.message}") from None
raise IBMRuntimeError(f"Failed to run program: {ex}") from None
backend = (
self.backend(name=response["backend"], instance=hgp_name)
if response["backend"]
else qrt_options.backend
)
job = RuntimeJob(
backend=backend,
api_client=self._api_client,
client_params=self._client_params,
job_id=response["id"],
program_id=program_id,
user_callback=callback,
result_decoder=result_decoder,
image=qrt_options.image,
service=self,
)
return job
[docs]
def upload_program(self, data: str, metadata: Optional[Union[Dict, str]] = None) -> str:
"""Upload a runtime program.
In addition to program data, the following program metadata is also
required:
- name
- max_execution_time
Program metadata can be specified using the `metadata` parameter or
individual parameter (for example, `name` and `description`). If the
same metadata field is specified in both places, the individual parameter
takes precedence. For example, if you specify::
upload_program(metadata={"name": "name1"}, name="name2")
``name2`` will be used as the program name.
Args:
data: Program data or path of the file containing program data to upload.
metadata: Name of the program metadata file or metadata dictionary.
A metadata file needs to be in the JSON format. The ``parameters``,
``return_values``, and ``interim_results`` should be defined as JSON Schema.
See :file:`program/program_metadata_sample.json` for an example. The
fields in metadata are explained below.
* name: Name of the program. Required.
* max_execution_time: Maximum execution time in seconds. Required.
* description: Program description.
* is_public: Whether the runtime program should be visible to the public.
The default is ``False``.
* spec: Specifications for backend characteristics and input parameters
required to run the program, interim results and final result.
* backend_requirements: Backend requirements.
* parameters: Program input parameters in JSON schema format.
* return_values: Program return values in JSON schema format.
* interim_results: Program interim results in JSON schema format.
Returns:
Program ID.
Raises:
IBMInputValueError: If required metadata is missing.
RuntimeDuplicateProgramError: If a program with the same name already exists.
IBMNotAuthorizedError: If you are not authorized to upload programs.
IBMRuntimeError: If the upload failed.
"""
program_metadata = self._read_metadata(metadata=metadata)
for req in ["name", "max_execution_time"]:
if req not in program_metadata or not program_metadata[req]:
raise IBMInputValueError(f"{req} is a required metadata field.")
if "def main(" not in data:
# This is the program file
with open(data, "r", encoding="utf-8") as file:
data = file.read()
try:
program_data = to_base64_string(data)
response = self._api_client.program_create(
program_data=program_data, **program_metadata
)
except RequestsApiError as ex:
if ex.status_code == 409:
raise RuntimeDuplicateProgramError(
"Program with the same name already exists."
) from None
if ex.status_code == 403:
raise IBMNotAuthorizedError("You are not authorized to upload programs.") from None
raise IBMRuntimeError(f"Failed to create program: {ex}") from None
return response["id"]
def _read_metadata(self, metadata: Optional[Union[Dict, str]] = None) -> Dict:
"""Read metadata.
Args:
metadata: Name of the program metadata file or metadata dictionary.
Returns:
Return metadata.
"""
upd_metadata: dict = {}
if metadata is not None:
if isinstance(metadata, str):
with open(metadata, "r", encoding="utf-8") as file:
upd_metadata = json.load(file)
else:
upd_metadata = metadata
# TODO validate metadata format
metadata_keys = [
"name",
"max_execution_time",
"description",
"spec",
"is_public",
]
return {key: val for key, val in upd_metadata.items() if key in metadata_keys}
[docs]
def update_program(
self,
program_id: str,
data: str = None,
metadata: Optional[Union[Dict, str]] = None,
name: str = None,
description: str = None,
max_execution_time: int = None,
spec: Optional[Dict] = None,
) -> None:
"""Update a runtime program.
Program metadata can be specified using the `metadata` parameter or
individual parameters, such as `name` and `description`. If the
same metadata field is specified in both places, the individual parameter
takes precedence.
Args:
program_id: Program ID.
data: Program data or path of the file containing program data to upload.
metadata: Name of the program metadata file or metadata dictionary.
name: New program name.
description: New program description.
max_execution_time: New maximum execution time.
spec: New specifications for backend characteristics, input parameters,
interim results and final result.
Raises:
RuntimeProgramNotFound: If the program doesn't exist.
IBMRuntimeError: If the request failed.
"""
if not any([data, metadata, name, description, max_execution_time, spec]):
warnings.warn(
"None of the 'data', 'metadata', 'name', 'description', "
"'max_execution_time', or 'spec' parameters is specified. "
"No update is made."
)
return
if data:
if "def main(" not in data:
# This is the program file
with open(data, "r", encoding="utf-8") as file:
data = file.read()
data = to_base64_string(data)
if metadata:
metadata = self._read_metadata(metadata=metadata)
combined_metadata = self._merge_metadata(
metadata=metadata,
name=name,
description=description,
max_execution_time=max_execution_time,
spec=spec,
)
try:
self._api_client.program_update(program_id, program_data=data, **combined_metadata)
except RequestsApiError as ex:
if ex.status_code == 404:
raise RuntimeProgramNotFound(f"Program not found: {ex.message}") from None
raise IBMRuntimeError(f"Failed to update program: {ex}") from None
if program_id in self._programs:
program = self._programs[program_id]
program._refresh()
def _merge_metadata(self, metadata: Optional[Dict] = None, **kwargs: Any) -> Dict:
"""Merge multiple copies of metadata.
Args:
metadata: Program metadata.
**kwargs: Additional metadata fields to overwrite.
Returns:
Merged metadata.
"""
merged = {}
metadata = metadata or {}
metadata_keys = ["name", "max_execution_time", "description", "spec"]
for key in metadata_keys:
if kwargs.get(key, None) is not None:
merged[key] = kwargs[key]
elif key in metadata.keys():
merged[key] = metadata[key]
return merged
[docs]
def delete_program(self, program_id: str) -> None:
"""Delete a runtime program.
Args:
program_id: Program ID.
Raises:
RuntimeProgramNotFound: If the program doesn't exist.
IBMRuntimeError: If the request failed.
"""
try:
self._api_client.program_delete(program_id=program_id)
except RequestsApiError as ex:
if ex.status_code == 404:
raise RuntimeProgramNotFound(f"Program not found: {ex.message}") from None
raise IBMRuntimeError(f"Failed to delete program: {ex}") from None
if program_id in self._programs:
del self._programs[program_id]
[docs]
def set_program_visibility(self, program_id: str, public: bool) -> None:
"""Sets a program's visibility.
Args:
program_id: Program ID.
public: If ``True``, make the program visible to all.
If ``False``, make the program visible to just your account.
Raises:
RuntimeProgramNotFound: if program not found (404)
IBMRuntimeError: if update failed (401, 403)
"""
try:
self._api_client.set_program_visibility(program_id, public)
except RequestsApiError as ex:
if ex.status_code == 404:
raise RuntimeProgramNotFound(f"Program not found: {ex.message}") from None
raise IBMRuntimeError(f"Failed to set program visibility: {ex}") from None
if program_id in self._programs:
program = self._programs[program_id]
program._is_public = public
[docs]
def job(self, job_id: str) -> RuntimeJob:
"""Retrieve a runtime job.
Args:
job_id: Job ID.
Returns:
Runtime job retrieved.
Raises:
RuntimeJobNotFound: If the job doesn't exist.
IBMRuntimeError: If the request failed.
"""
try:
response = self._api_client.job_get(job_id, exclude_params=True)
except RequestsApiError as ex:
if ex.status_code == 404:
raise RuntimeJobNotFound(f"Job not found: {ex.message}") from None
raise IBMRuntimeError(f"Failed to delete job: {ex}") from None
return self._decode_job(response)
[docs]
def jobs(
self,
limit: Optional[int] = 10,
skip: int = 0,
backend_name: Optional[str] = None,
pending: bool = None,
program_id: str = None,
instance: Optional[str] = None,
job_tags: Optional[List[str]] = None,
session_id: Optional[str] = None,
created_after: Optional[datetime] = None,
created_before: Optional[datetime] = None,
descending: bool = True,
) -> List[RuntimeJob]:
"""Retrieve all runtime jobs, subject to optional filtering.
Args:
limit: Number of jobs to retrieve. ``None`` means no limit.
skip: Starting index for the job retrieval.
backend_name: Name of the backend to retrieve jobs from.
pending: Filter by job pending state. If ``True``, 'QUEUED' and 'RUNNING'
jobs are included. If ``False``, 'DONE', 'CANCELLED' and 'ERROR' jobs
are included.
program_id: Filter by Program ID.
instance: This is only supported for ``ibm_quantum`` runtime and is in the
hub/group/project format.
job_tags: Filter by tags assigned to jobs. Matched jobs are associated with all tags.
session_id: Job ID of the first job in a runtime session.
created_after: Filter by the given start date, in local time. This is used to
find jobs whose creation dates are after (greater than or equal to) this
local date/time.
created_before: Filter by the given end date, in local time. This is used to
find jobs whose creation dates are before (less than or equal to) this
local date/time.
descending: If ``True``, return the jobs in descending order of the job
creation date (i.e. newest first) until the limit is reached.
Returns:
A list of runtime jobs.
Raises:
IBMInputValueError: If an input value is invalid.
"""
hub = group = project = None
if instance:
if self._channel == "ibm_cloud":
raise IBMInputValueError(
"The 'instance' keyword is only supported for ``ibm_quantum`` runtime."
)
hub, group, project = from_instance_format(instance)
job_responses = [] # type: List[Dict[str, Any]]
current_page_limit = limit or 20
offset = skip
while True:
jobs_response = self._api_client.jobs_get(
limit=current_page_limit,
skip=offset,
backend_name=backend_name,
pending=pending,
program_id=program_id,
hub=hub,
group=group,
project=project,
job_tags=job_tags,
session_id=session_id,
created_after=created_after,
created_before=created_before,
descending=descending,
)
job_page = jobs_response["jobs"]
# count is the total number of jobs that would be returned if
# there was no limit or skip
count = jobs_response["count"]
job_responses += job_page
if len(job_responses) == count - skip:
# Stop if there are no more jobs returned by the server.
break
if limit:
if len(job_responses) >= limit:
# Stop if we have reached the limit.
break
current_page_limit = limit - len(job_responses)
else:
current_page_limit = 20
offset += len(job_page)
return [self._decode_job(job) for job in job_responses]
[docs]
def delete_job(self, job_id: str) -> None:
"""Delete a runtime job.
Note that this operation cannot be reversed.
Args:
job_id: ID of the job to delete.
Raises:
RuntimeJobNotFound: If the job doesn't exist.
IBMRuntimeError: If the request failed.
"""
try:
self._api_client.job_delete(job_id)
except RequestsApiError as ex:
if ex.status_code == 404:
raise RuntimeJobNotFound(f"Job not found: {ex.message}") from None
raise IBMRuntimeError(f"Failed to delete job: {ex}") from None
def _decode_job(self, raw_data: Dict) -> RuntimeJob:
"""Decode job data received from the server.
Args:
raw_data: Raw job data received from the server.
Returns:
Decoded job data.
"""
instance = None
if self._channel == "ibm_quantum":
hub = raw_data.get("hub")
group = raw_data.get("group")
project = raw_data.get("project")
if all([hub, group, project]):
instance = to_instance_format(hub, group, project)
# Try to find the right backend
try:
if "backend" in raw_data:
backend = self.backend(raw_data["backend"], instance=instance)
else:
backend = None
except QiskitBackendNotFoundError:
backend = ibm_backend.IBMRetiredBackend.from_name(
backend_name=raw_data["backend"],
api=None,
)
params = raw_data.get("params", {})
if isinstance(params, list):
if len(params) > 0:
params = params[0]
else:
params = {}
if not isinstance(params, str):
params = json.dumps(params)
decoded = json.loads(params, cls=RuntimeDecoder)
return RuntimeJob(
backend=backend,
api_client=self._api_client,
client_params=self._client_params,
service=self,
job_id=raw_data["id"],
program_id=raw_data.get("program", {}).get("id", ""),
params=decoded,
creation_date=raw_data.get("created", None),
session_id=raw_data.get("session_id"),
tags=raw_data.get("tags"),
)
[docs]
def least_busy(
self,
min_num_qubits: Optional[int] = None,
instance: Optional[str] = None,
filters: Optional[Callable[[List["ibm_backend.IBMBackend"]], bool]] = None,
**kwargs: Any,
) -> ibm_backend.IBMBackend:
"""Return the least busy available backend.
Args:
min_num_qubits: Minimum number of qubits the backend has to have.
instance: This is only supported for ``ibm_quantum`` runtime and is in the
hub/group/project format.
filters: Filters can be defined as for the :meth:`backends` method.
An example to get the operational backends with 5 qubits::
QiskitRuntimeService.least_busy(n_qubits=5, operational=True)
Returns:
The backend with the fewest number of pending jobs.
Raises:
QiskitBackendNotFoundError: If no backend matches the criteria.
"""
backends = self.backends(
min_num_qubits=min_num_qubits, instance=instance, filters=filters, **kwargs
)
candidates = []
for back in backends:
backend_status = back.status()
if not backend_status.operational or backend_status.status_msg != "active":
continue
candidates.append(back)
if not candidates:
raise QiskitBackendNotFoundError("No backend matches the criteria.")
return min(candidates, key=lambda b: b.status().pending_jobs)
[docs]
def instances(self) -> List[str]:
"""Return the IBM Quantum instances list currently in use for the session.
Returns:
A list with instances currently in the session.
"""
if self._channel == "ibm_quantum":
return list(self._hgps.keys())
return []
@property
def channel(self) -> str:
"""Return the channel type used.
Returns:
The channel type used.
"""
return self._channel
@property
def runtime(self): # type:ignore
"""Return self for compatibility with IBMQ provider.
Returns:
self
"""
return self
def __repr__(self) -> str:
return "<{}>".format(self.__class__.__name__)