Source code for qiskit.providers.ibmq.runtime.runtime_job

# This code is part of Qiskit.
# (C) Copyright IBM 2021.
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Qiskit runtime job."""

from typing import Any, Optional, Callable, Dict, Type
import time
import logging
from concurrent import futures
import traceback
import queue
from datetime import datetime

from qiskit.providers.exceptions import JobTimeoutError
from qiskit.providers.backend import Backend
from qiskit.providers.jobstatus import JobStatus, JOB_FINAL_STATES

from .exceptions import RuntimeJobFailureError, RuntimeInvalidStateError, QiskitRuntimeError
from .program.result_decoder import ResultDecoder
from ..api.clients import RuntimeClient, RuntimeWebsocketClient, WebsocketClientCloseCode
from ..exceptions import IBMQError
from ..api.exceptions import RequestsApiError
from ..utils.converters import utc_to_local
from ..credentials import Credentials

logger = logging.getLogger(__name__)

[docs]class RuntimeJob: """Representation of a runtime program execution. A new ``RuntimeJob`` instance is returned when you call :meth:`IBMRuntimeService.run<qiskit.providers.ibmq.runtime.IBMRuntimeService.run>` to execute a runtime program, or :meth:`IBMRuntimeService.job<qiskit.providers.ibmq.runtime.IBMRuntimeService.job>` to retrieve a previously executed job. If the program execution is successful, you can inspect the job's status by calling :meth:`status()`. Job status can be one of the :class:`~qiskit.providers.JobStatus` members. Some of the methods in this class are blocking, which means control may not be returned immediately. :meth:`result()` is an example of a blocking method:: job = provider.runtime.run(...) try: job_result = job.result() # It will block until the job finishes. print("The job finished with result {}".format(job_result)) except RuntimeJobFailureError as ex: print("Job failed!: {}".format(ex)) If the program has any interim results, you can use the ``callback`` parameter of the :meth:`~qiskit.providers.ibmq.runtime.IBMRuntimeService.run` method to stream the interim results. Alternatively, you can use the :meth:`stream_results` method to stream the results at a later time, but before the job finishes. """ _POISON_PILL = "_poison_pill" """Used to inform streaming to stop.""" _executor = futures.ThreadPoolExecutor(thread_name_prefix="runtime_job") def __init__( self, backend: Backend, api_client: RuntimeClient, credentials: Credentials, job_id: str, program_id: str, params: Optional[Dict] = None, creation_date: Optional[str] = None, user_callback: Optional[Callable] = None, result_decoder: Type[ResultDecoder] = ResultDecoder, image: Optional[str] = "" ) -> None: """RuntimeJob constructor. Args: backend: The backend instance used to run this job. api_client: Object for connecting to the server. credentials: Account credentials. job_id: Job ID. program_id: ID of the program this job is for. params: Job parameters. creation_date: Job creation date, in UTC. user_callback: User callback function. result_decoder: A :class:`ResultDecoder` subclass used to decode job results. image: Runtime image used for this job: image_name:tag. """ self._job_id = job_id self._backend = backend self._api_client = api_client self._results: Optional[Any] = None self._params = params or {} self._creation_date = creation_date self._program_id = program_id self._status = JobStatus.INITIALIZING self._error_message = None # type: Optional[str] self._result_decoder = result_decoder self._image = image # Used for streaming self._ws_client_future = None # type: Optional[futures.Future] self._result_queue = queue.Queue() # type: queue.Queue self._ws_client = RuntimeWebsocketClient( websocket_url=credentials.runtime_url.replace('https', 'wss'), credentials=credentials, job_id=job_id, message_queue=self._result_queue) if user_callback is not None: self.stream_results(user_callback)
[docs] def result( self, timeout: Optional[float] = None, wait: float = 5, decoder: Optional[Type[ResultDecoder]] = None ) -> Any: """Return the results of the job. Args: timeout: Number of seconds to wait for job. wait: Seconds between queries. decoder: A :class:`ResultDecoder` subclass used to decode job results. Returns: Runtime job result. Raises: RuntimeJobFailureError: If the job failed. """ _decoder = decoder or self._result_decoder if self._results is None or (_decoder != self._result_decoder): self.wait_for_final_state(timeout=timeout, wait=wait) if self._status == JobStatus.ERROR: raise RuntimeJobFailureError(f"Unable to retrieve job result. " f"{self.error_message()}") result_raw = self._api_client.job_results(job_id=self.job_id()) self._results = _decoder.decode(result_raw) return self._results
[docs] def cancel(self) -> None: """Cancel the job. Raises: RuntimeInvalidStateError: If the job is in a state that cannot be cancelled. QiskitRuntimeError: If unable to cancel job. """ try: self._api_client.job_cancel(self.job_id()) except RequestsApiError as ex: if ex.status_code == 409: raise RuntimeInvalidStateError(f"Job cannot be cancelled: {ex}") from None raise QiskitRuntimeError(f"Failed to cancel job: {ex}") from None self.cancel_result_streaming() self._status = JobStatus.CANCELLED
[docs] def status(self) -> JobStatus: """Return the status of the job. Returns: Status of this job. """ self._set_status_and_error_message() return self._status
[docs] def error_message(self) -> Optional[str]: """Returns the reason if the job failed. Returns: Error message string or ``None``. """ self._set_status_and_error_message() return self._error_message
[docs] def wait_for_final_state( self, timeout: Optional[float] = None, wait: float = 5 ) -> None: """Poll the job status until it progresses to a final state such as ``DONE`` or ``ERROR``. Args: timeout: Seconds to wait for the job. If ``None``, wait indefinitely. wait: Seconds between queries. Raises: JobTimeoutError: If the job does not reach a final state before the specified timeout. """ start_time = time.time() status = self.status() while status not in JOB_FINAL_STATES: elapsed_time = time.time() - start_time if timeout is not None and elapsed_time >= timeout: raise JobTimeoutError( 'Timeout while waiting for job {}.'.format(self.job_id())) time.sleep(wait) status = self.status()
[docs] def stream_results( self, callback: Callable, decoder: Optional[Type[ResultDecoder]] = None ) -> None: """Start streaming job results. Args: callback: Callback function to be invoked for any interim results. The callback function will receive 2 positional parameters: 1. Job ID 2. Job interim result. decoder: A :class:`ResultDecoder` subclass used to decode job results. Raises: RuntimeInvalidStateError: If a callback function is already streaming results or if the job already finished. """ if self._is_streaming(): raise RuntimeInvalidStateError("A callback function is already streaming results.") if self._status in JOB_FINAL_STATES: raise RuntimeInvalidStateError("Job already finished.") self._ws_client_future = self._executor.submit(self._start_websocket_client) self._executor.submit(self._stream_results, result_queue=self._result_queue, user_callback=callback, decoder=decoder)
[docs] def cancel_result_streaming(self) -> None: """Cancel result streaming.""" if not self._is_streaming(): return self._ws_client.disconnect(WebsocketClientCloseCode.CANCEL)
[docs] def logs(self) -> str: """Return job logs. Note: Job logs are only available after the job finishes. Returns: Job logs, including standard output and error. Raises: QiskitRuntimeError: If a network error occurred. """ if self.status() not in JOB_FINAL_STATES: logger.warning("Job logs are only available after the job finishes.") try: return self._api_client.job_logs(self.job_id()) except RequestsApiError as err: if err.status_code == 404: return "" raise QiskitRuntimeError(f"Failed to get job logs: {err}") from None
def _set_status_and_error_message(self) -> None: """Fetch and set status and error message.""" if self._status not in JOB_FINAL_STATES: response = self._api_client.job_get(job_id=self.job_id()) self._set_status(response) self._set_error_message(response) def _set_status(self, job_response: Dict) -> None: """Set status. Args: job_response: Job response from runtime API. Raises: IBMQError: If an unknown status is returned from the server. """ try: self._status = API_TO_JOB_STATUS[job_response['status'].upper()] except KeyError: raise IBMQError(f"Unknown status: {job_response['status']}") def _set_error_message(self, job_response: Dict) -> None: """Set error message if the job failed. Args: job_response: Job response from runtime API. """ if self._status == JobStatus.ERROR: job_result_raw = self._api_client.job_results(job_id=self.job_id()) self._error_message = API_TO_JOB_ERROR_MESSAGE[job_response['status'].upper()].format( self.job_id(), job_result_raw) else: self._error_message = None def _is_streaming(self) -> bool: """Return whether job results are being streamed. Returns: Whether job results are being streamed. """ if self._ws_client_future is None: return False if self._ws_client_future.done(): return False return True def _start_websocket_client( self ) -> None: """Start websocket client to stream results.""" try: logger.debug("Start websocket client for job %s", self.job_id()) self._ws_client.job_results() except Exception: # pylint: disable=broad-except logger.warning( "An error occurred while streaming results " "from the server for job %s:\n%s", self.job_id(), traceback.format_exc()) finally: self._result_queue.put_nowait(self._POISON_PILL) def _stream_results( self, result_queue: queue.Queue, user_callback: Callable, decoder: Optional[Type[ResultDecoder]] = None ) -> None: """Stream interim results. Args: result_queue: Queue used to pass websocket messages. user_callback: User callback function. decoder: A :class:`ResultDecoder` (sub)class used to decode job results. """ logger.debug("Start result streaming for job %s", self.job_id()) _decoder = decoder or self._result_decoder while True: try: response = result_queue.get() if response == self._POISON_PILL: self._empty_result_queue(result_queue) return user_callback(self.job_id(), _decoder.decode(response)) except Exception: # pylint: disable=broad-except logger.warning( "An error occurred while streaming results " "for job %s:\n%s", self.job_id(), traceback.format_exc()) def _empty_result_queue(self, result_queue: queue.Queue) -> None: """Empty the result queue. Args: result_queue: Result queue to empty. """ try: while True: result_queue.get_nowait() except queue.Empty: pass
[docs] def job_id(self) -> str: """Return a unique ID identifying the job. Returns: Job ID. """ return self._job_id
[docs] def backend(self) -> Backend: """Return the backend where this job was executed. Returns: Backend used for the job. """ return self._backend
@property def image(self) -> str: """Return the runtime image used for the job. Returns: Runtime image: image_name:tag or "" if the default image is used. """ return self._image @property def inputs(self) -> Dict: """Job input parameters. Returns: Input parameters used in this job. """ return self._params @property def program_id(self) -> str: """Program ID. Returns: ID of the program this job is for. """ return self._program_id @property def creation_date(self) -> Optional[datetime]: """Job creation date in local time. Returns: The job creation date as a datetime object, in local time, or ``None`` if creation date is not available. """ if not self._creation_date: response = self._api_client.job_get(job_id=self.job_id()) self._creation_date = response.get('created', None) if not self._creation_date: return None creation_date_local_dt = utc_to_local(self._creation_date) return creation_date_local_dt