Source code for qiskit_ibm_provider.job.job_monitor

# This code is part of Qiskit.
# (C) Copyright IBM 2021.
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""A module for monitoring jobs."""

import sys
import time
from typing import TextIO, Optional

from .ibm_job import IBMJob
from ..utils.converters import duration_difference

def _text_checker(
    job: IBMJob,
    interval: float,
    _interval_set: bool = False,
    output: TextIO = sys.stdout,
) -> None:
    """A text-based job status checker.

        job: The job to check.
        interval: The interval at which to check.
        _interval_set: Was interval time set by user?
        output: The file like object to write status messages to.
            By default this is sys.stdout.

    status = job.status()
    msg = status.value
    prev_msg = msg
    msg_len = len(msg)
    prev_time_str = ""

    print("\r%s: %s" % ("Job Status", msg), end="", file=output)
    while not in ["DONE", "CANCELLED", "ERROR"]:
        status = job.status()
        msg = status.value

        if == "QUEUED":
            queue_info = job.queue_info()

            if queue_info:
                if queue_info.estimated_start_time:
                    est_time = queue_info.estimated_start_time
                    time_str = duration_difference(est_time)
                    prev_time_str = time_str

                time_str = prev_time_str

            msg += " ({queue}) [Est. wait time: {time}]".format(
                queue=job.queue_position(), time=time_str

            if job.queue_position() is None:
                if not _interval_set:
                    interval = 2
            elif not _interval_set:
                interval = max(job.queue_position(), 2)
            if not _interval_set:
                interval = 2

        if == "RUNNING":
            msg = "RUNNING"
            job_mode = job.scheduling_mode()
            if job_mode:
                msg += " - {}".format(job_mode)

        elif == "ERROR":
            msg = "ERROR - {}".format(job.error_message())

        # Adjust length of message so there are no artifacts
        if len(msg) < msg_len:
            msg += " " * (msg_len - len(msg))
        elif len(msg) > msg_len:
            msg_len = len(msg)

        if msg != prev_msg:
            print("\r%s: %s" % ("Job Status", msg), end="", file=output)
            prev_msg = msg

    print("", file=output)

[docs]def job_monitor( job: IBMJob, interval: Optional[float] = None, output: TextIO = sys.stdout ) -> None: """Monitor the status of an ``IBMJob`` instance. Args: job: Job to monitor. interval: Time interval between status queries. output: The file like object to write status messages to. By default this is sys.stdout. """ if interval is None: _interval_set = False interval = 5 else: _interval_set = True _text_checker(job, interval, _interval_set, output=output)