# Source code for qiskit_ibm_provider.transpiler.passes.scheduling.scheduler

# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.

"""Scheduler for dynamic circuit backends."""

from abc import abstractmethod
from typing import Dict, List, Optional, Union, Set, Tuple
import itertools

import qiskit
from qiskit.circuit.parameterexpression import ParameterExpression
from qiskit.converters import circuit_to_dag
from qiskit.transpiler.basepasses import TransformationPass
from qiskit.transpiler.passes.scheduling.time_unit_conversion import TimeUnitConversion

from qiskit.circuit import Barrier, Clbit, ControlFlowOp, Measure, Qubit, Reset
from qiskit.circuit.bit import Bit
from qiskit.dagcircuit import DAGCircuit, DAGNode
from qiskit.transpiler.exceptions import TranspilerError

from .utils import block_order_op_nodes


class BaseDynamicCircuitAnalysis(TransformationPass):
    """Base class for scheduling analysis

    This is a scheduler designed to work for the unique scheduling constraints of the dynamic circuits
    backends due to the limitations imposed by hardware. This is expected to evolve over time as the
    dynamic circuit backends also change.

    The primary differences are that:

    * Resets and control-flow currently trigger the end of a "quantum block". The period between the end
        of the block and the next is *nondeterministic*,
        i.e., we do not know when the next block will begin (as we could be evaluating a classical
        function of nondeterministic length), and therefore the
        next block starts at a *relative* t=0.
    * During a measurement it is possible to apply gates in parallel on disjoint qubits.
    * Measurements and resets on disjoint qubits happen simultaneously and are part of the same block.
    """

    def __init__(
        self, durations: qiskit.transpiler.instruction_durations.InstructionDurations
    ) -> None:
        """Scheduler for dynamic circuit backends.

        Only allocates the bookkeeping attributes used while scheduling;
        ``_init_run`` resets them for a concrete DAG.

        Args:
            durations: Durations of instructions to be used in scheduling.
        """
        self._durations = durations

        # DAG passed to ``_init_run`` and the DAG of the block currently visited.
        self._dag: Optional[DAGCircuit] = None
        self._block_dag: Optional[DAGCircuit] = None
        # Wire map of the block currently visited (consumed by ``_map_wires``).
        self._wire_map: Optional[Dict[Bit, Bit]] = None
        # Per-node cache of the wires produced by ``_map_wires``.
        self._node_mapped_wires: Optional[Dict[DAGNode, List[Bit]]] = None
        # Mapping of control-flow nodes to the DAGs built from their blocks
        # (one DAG per block, in block order).
        self._node_block_dags: Dict[DAGNode, List[DAGCircuit]] = {}
        # Mapping of block indices to the respective DAGCircuit
        self._block_idx_dag_map: Dict[int, DAGCircuit] = {}

        self._current_block_idx = 0
        # Largest stop time seen per block. Tracked as we build to avoid an
        # extra pass.
        self._max_block_t1: Optional[Dict[int, int]] = None
        self._control_flow_block = False
        # Node -> (block index, time) for the start/stop of each scheduled node.
        self._node_start_time: Optional[Dict[DAGNode, Tuple[int, int]]] = None
        self._node_stop_time: Optional[Dict[DAGNode, Tuple[int, int]]] = None
        # Dictionary of blocks each containing a dictionary with the key for each bit
        # in the block and its value being the final time of the bit within the block.
        self._bit_stop_times: Optional[Dict[int, Dict[Union[Qubit, Clbit], int]]] = None
        # Measurements accumulated in the current group, and whether the
        # group contains a reset.
        self._current_block_measures: Set[DAGNode] = set()
        self._current_block_measures_has_reset: bool = False
        # Nodes that the scheduling of this node is tied to.
        self._node_tied_to: Optional[Dict[DAGNode, Set[DAGNode]]] = None
        # Index of each qubit of the DAG passed to ``_init_run``.
        self._bit_indices: Optional[Dict[Qubit, int]] = None

        self._time_unit_converter = TimeUnitConversion(durations)

        super().__init__()

    @property
    def _current_block_bit_times(self) -> Dict[Union[Qubit, Clbit], int]:
        return self._bit_stop_times[self._current_block_idx]

    def _visit_block(self, block: DAGCircuit, wire_map: Dict[Qubit, Qubit]) -> None:
        """Schedule all operation nodes of ``block`` inside a new circuit block.

        Args:
            block: DAG of the block to visit.
            wire_map: Wire map installed as ``self._wire_map`` while the
                block is being visited.
        """
        # Remember the enclosing block context so it can be restored on exit.
        outer_block_dag, outer_wire_map = self._block_dag, self._wire_map
        self._block_dag, self._wire_map = block, wire_map

        # The time unit conversion must be run on each individual block
        # as its current implementation does not recurse into the
        # circuit structure.
        self._time_unit_converter.run(block)
        self._begin_new_circuit_block()

        for op_node in block_order_op_nodes(block):
            self._visit_node(op_node)

        # Final flush of any measurements still being accumulated.
        self._flush_measures()

        # Restore the enclosing block context.
        self._block_dag, self._wire_map = outer_block_dag, outer_wire_map

    def _visit_node(self, node: DAGNode) -> None:
        """Dispatch ``node`` to the visitor matching its operation type.

        Raises:
            TranspilerError: if a non control-flow operation carries
                condition bits.
        """
        if isinstance(node.op, ControlFlowOp):
            self._visit_control_flow_op(node)
            return

        if node.op.condition_bits:
            raise TranspilerError(
                "c_if control-flow is not supported by this pass. "
                'Please apply "ConvertConditionsToIfOps" to convert these '
                "conditional operations to new-style Qiskit control-flow."
            )

        if isinstance(node.op, Measure):
            visitor = self._visit_measure
        elif isinstance(node.op, Reset):
            visitor = self._visit_reset
        else:
            visitor = self._visit_generic
        visitor(node)

    def _visit_control_flow_op(self, node: DAGNode) -> None:
        """Schedule a control-flow node and recurse into each of its blocks.

        The node itself is recorded with zero duration at the latest stop
        time of its wires. Every block is converted to a DAG, cached under
        ``self._node_block_dags[node]`` and visited, after which a new
        circuit block is started.
        """
        # TODO: This is a hack required to tie nodes of control-flow
        # blocks across the scheduler and block_base_padder. This is
        # because the current control flow nodes store the block as a
        # circuit which is not hashable. For processing we are currently
        # required to convert each circuit block to a dag which is inefficient
        # and causes node relationships stored in analysis to be lost between
        # passes as we are constantly recreating the block dags.
        # We resolve this here by caching these dags in the property set.
        block_dags = []
        self._node_block_dags[node] = block_dags

        outer_wires = self._map_wires(node)
        start = max(self._current_block_bit_times[bit] for bit in outer_wires)

        # Duration is 0 as we do not schedule across terminator
        self._update_bit_times(node, start, start)

        for circuit_block in node.op.blocks:
            self._control_flow_block = True

            block_dag = circuit_to_dag(circuit_block)
            # Pair the block's own bits positionally with the node's mapped wires.
            inner_to_outer = dict(
                zip(block_dag.qubits + block_dag.clbits, outer_wires)
            )
            block_dags.append(block_dag)
            self._visit_block(block_dag, inner_to_outer)

        # Begin new block for exit to "then" block.
        self._begin_new_circuit_block()

    @abstractmethod
    def _visit_measure(self, node: DAGNode) -> None:
        """Schedule a measurement node.

        Called by ``_visit_node`` for nodes without condition bits whose
        operation is a ``Measure``. Concrete passes must override this.
        """
        raise NotImplementedError

    @abstractmethod
    def _visit_reset(self, node: DAGNode) -> None:
        """Schedule a reset node.

        Called by ``_visit_node`` for nodes without condition bits whose
        operation is a ``Reset``. Concrete passes must override this.
        """
        raise NotImplementedError

    @abstractmethod
    def _visit_generic(self, node: DAGNode) -> None:
        """Schedule any other node.

        Called by ``_visit_node`` for nodes without condition bits whose
        operation is neither control-flow, ``Measure`` nor ``Reset``.
        Concrete passes must override this.
        """
        raise NotImplementedError

    def _init_run(self, dag: DAGCircuit) -> None:
        """Setup for initial run."""

        self._dag = dag
        self._block_dag = None
        self._wire_map = {wire: wire for wire in dag.wires}
        self._node_mapped_wires = {}
        self._node_block_dags = {}
        self._block_idx_dag_map = {}

        self._current_block_idx = 0
        self._max_block_t1 = {}
        self._control_flow_block = False

        if len(dag.qregs) != 1 or dag.qregs.get("q", None) is None:
            raise TranspilerError("ASAP schedule runs on physical circuits only")

        self._node_start_time = {}
        self._node_stop_time = {}
        self._bit_stop_times = {0: {q: 0 for q in dag.qubits + dag.clbits}}
        self._current_block_measures = set()
        self._current_block_measures_has_reset = False
        self._node_tied_to = {}
        self._bit_indices = {q: index for index, q in enumerate(dag.qubits)}

    def _get_duration(self, node: DAGNode, dag: Optional[DAGCircuit] = None) -> int:
        """Return the duration of ``node``'s operation.

        Args:
            node: Node to look up.
            dag: DAG whose calibrations are consulted; falls back to the
                block DAG currently being visited.

        Returns:
            ``0`` for control-flow or conditioned operations, otherwise the
            calibration duration when ``dag`` has a calibration for the
            node, else the operation's own ``duration``.

        Raises:
            TranspilerError: if the duration is an unbound parameter
                expression or is ``None``.
        """
        if node.op.condition_bits or isinstance(node.op, ControlFlowOp):
            # As we cannot currently schedule through conditionals model
            # as zero duration to avoid padding.
            return 0

        qubit_indices = [self._bit_indices[qarg] for qarg in self._map_qubits(node)]

        # Fall back to current block dag if not specified.
        dag = dag or self._block_dag

        if not dag.has_calibration_for(node):
            duration = node.op.duration
        else:
            # If node has calibration, this value should be the highest priority
            params_key = tuple(float(param) for param in node.op.params)
            cal_key = (tuple(qubit_indices), params_key)
            duration = dag.calibrations[node.op.name][cal_key].duration
            # The calibration duration is also written back onto the operation.
            node.op.duration = duration

        if isinstance(duration, ParameterExpression):
            raise TranspilerError(
                f"Parameterized duration ({duration}) "
                f"of {node.op.name} on qubits {qubit_indices} is not bounded."
            )
        if duration is None:
            raise TranspilerError(
                f"Duration of {node.op.name} on qubits {qubit_indices} is not found."
            )

        return duration

    def _update_bit_times(  # pylint: disable=invalid-name
        self, node: DAGNode, t0: int, t1: int, update_cargs: bool = True
    ) -> None:
        self._max_block_t1[self._current_block_idx] = max(
            self._max_block_t1.get(self._current_block_idx, 0), t1
        )

        update_bits = self._map_wires(node) if update_cargs else self._map_qubits(node)
        for bit in update_bits:
            self._current_block_bit_times[bit] = t1

        self._node_start_time[node] = (self._current_block_idx, t0)
        self._node_stop_time[node] = (self._current_block_idx, t1)

    def _begin_new_circuit_block(self) -> None:
        """Create a new timed circuit block completing the previous block."""
        self._current_block_idx += 1
        self._block_idx_dag_map[self._current_block_idx] = self._block_dag
        self._control_flow_block = False
        self._bit_stop_times[self._current_block_idx] = {
            self._wire_map[wire]: 0 for wire in self._block_dag.wires
        }
        self._flush_measures()

    def _flush_measures(self) -> None:
        """Flush currently accumulated measurements by resetting block measures."""
        for node in self._current_block_measures:
            self._node_tied_to[node] = self._current_block_measures.copy()

        self._current_block_measures = set()
        self._current_block_measures_has_reset = False

    def _current_block_measure_qargs(self) -> Set[Qubit]:
        return set(
            qarg
            for measure in self._current_block_measures
            for qarg in self._map_qubits(measure)
        )

    def _check_flush_measures(self, node: DAGNode) -> None:
        if self._current_block_measure_qargs() & set(self._map_qubits(node)):
            if self._current_block_measures_has_reset:
                # If a reset is included we must trigger the end of a block.
                self._begin_new_circuit_block()
            else:
                # Otherwise just trigger a measurement flush
                self._flush_measures()

    def _map_wires(self, node: DAGNode) -> List[Qubit]:
        """Map the wires from the current node to the top-level block's wires.

        TODO: We should have an easier approach to wire mapping from the transpiler.
        """
        if node not in self._node_mapped_wires:
            self._node_mapped_wires[node] = wire_map = [
                self._wire_map[q] for q in node.qargs + node.cargs
            ]
            return wire_map

        return self._node_mapped_wires[node]

    def _map_qubits(self, node: DAGNode) -> List[Qubit]:
        """Map the qubits from the current node to the top-level block's qubits.

        This is the result of ``_map_wires`` restricted to ``Qubit`` instances.

        TODO: We should have an easier approach to wire mapping from the transpiler.
        """
        mapped_qubits = []
        for wire in self._map_wires(node):
            if isinstance(wire, Qubit):
                mapped_qubits.append(wire)
        return mapped_qubits


class ASAPScheduleAnalysis(BaseDynamicCircuitAnalysis):
    """Dynamic circuits as-soon-as-possible (ASAP) scheduling analysis pass.

    This is a scheduler designed to work for the unique scheduling constraints of the dynamic circuits
    backends due to the limitations imposed by hardware. This is expected to evolve over time as the
    dynamic circuit backends also change.

    In its current form this is similar to Qiskit's ASAP scheduler in which instructions
    start as early as possible.

    The primary differences are that:

    * Resets and control-flow currently trigger the end of a "quantum block". The period between the end
        of the block and the next is *nondeterministic*,
        i.e., we do not know when the next block will begin (as we could be evaluating a classical
        function of nondeterministic length), and therefore the
        next block starts at a *relative* t=0.
    * During a measurement it is possible to apply gates in parallel on disjoint qubits.
    * Measurements and resets on disjoint qubits happen simultaneously and are part of the same block.
    """

    def run(self, dag: DAGCircuit) -> DAGCircuit:
        """Run the ASAPSchedule pass on `dag`.

        The computed start times are stored in
        ``property_set["node_start_time"]`` and the DAGs built for
        control-flow blocks in ``property_set["node_block_dags"]``.

        Args:
            dag (DAGCircuit): DAG to schedule.

        Raises:
            TranspilerError: if the circuit is not mapped on physical qubits.
            TranspilerError: if conditional bit is added to non-supported instruction.

        Returns:
            The input DAGCircuit; the schedule itself lives in the property set.
        """
        self._init_run(dag)

        # Trivial wire map at the top-level
        wire_map = {wire: wire for wire in dag.wires}
        # Top-level dag is the entry block
        self._visit_block(dag, wire_map)

        self.property_set["node_start_time"] = self._node_start_time
        self.property_set["node_block_dags"] = self._node_block_dags
        return dag

    def _visit_measure(self, node: DAGNode) -> None:
        """Visit a measurement node.

        Measurement currently triggers the end of a deterministically scheduled block
        of instructions in IBM dynamic circuits hardware.
        This means that it is possible to schedule *up to* a measurement (and during its pulses)
        but the measurement will be followed by a period of indeterminism.
        All measurements on disjoint qubits that topologically follow another
        measurement will be collected and performed in parallel. A measurement on a qubit
        intersecting with the set of qubits to be measured in parallel will trigger the
        end of a scheduling block with said measurement occurring in a following block
        which begins another grouping sequence. This behavior will change in future
        backend software updates.
        """
        current_block_measure_qargs = self._current_block_measure_qargs()
        # We handle a set of qubits here as _visit_reset currently calls
        # this method and a reset may have multiple qubits.
        measure_qargs = set(self._map_qubits(node))

        t0q = max(  # pylint: disable=invalid-name
            self._current_block_bit_times[q] for q in measure_qargs
        )

        # If the measurement qubits overlap, we need to flush measurements and start a
        # new scheduling block.
        if current_block_measure_qargs & measure_qargs:
            if self._current_block_measures_has_reset:
                # If a reset is included we must trigger the end of a block.
                self._begin_new_circuit_block()
                t0q = 0  # pylint: disable=invalid-name
            else:
                # Otherwise just trigger a measurement flush
                self._flush_measures()
        else:
            # Otherwise we need to increment all measurements to start at the
            # same time within the block.
            t0q = max(  # pylint: disable=invalid-name
                itertools.chain(
                    [t0q],
                    (
                        self._node_start_time[measure][1]
                        for measure in self._current_block_measures
                    ),
                )
            )

        # Insert this measure into the block
        self._current_block_measures.add(node)

        # Re-time every measurement of the group so that they all start together.
        for measure in self._current_block_measures:
            # ``_map_qubits`` yields top-level qubits, so they are indexed with
            # the top-level ``self._bit_indices`` (as ``_get_duration`` does)
            # rather than with the positions inside the current block DAG.
            measure_duration = self._durations.get(
                Measure(),
                [self._bit_indices[qarg] for qarg in self._map_qubits(measure)],
                unit="dt",
            )
            self._update_bit_times(measure, t0q, t0q + measure_duration)

    def _visit_reset(self, node: DAGNode) -> None:
        """Visit a reset node.

        Reset currently triggers the end of a pulse block in IBM dynamic circuits hardware
        as conditional reset is performed internally using a c_if. This means that it is
        possible to schedule *up to* a reset (and during its measurement pulses)
        but the reset will be followed by a period of conditional indeterminism.
        All resets on disjoint qubits will be collected on the same qubits to be run simultaneously.
        """
        # Process as measurement
        self._current_block_measures_has_reset = True
        self._visit_measure(node)
        # Then set that we are now a conditional node.
        self._control_flow_block = True

    def _visit_generic(self, node: DAGNode) -> None:
        """Visit a generic node such as a gate or barrier."""
        op_duration = self._get_duration(node)

        # If the measurement qubits overlap, we need to flush the measurement group
        self._check_flush_measures(node)

        # Start as soon as every wire of the node is free.
        t0 = max(  # pylint: disable=invalid-name
            self._current_block_bit_times[bit] for bit in self._map_wires(node)
        )

        t1 = t0 + op_duration  # pylint: disable=invalid-name
        self._update_bit_times(node, t0, t1)
class ALAPScheduleAnalysis(BaseDynamicCircuitAnalysis):
    """Dynamic circuits as-late-as-possible (ALAP) scheduling analysis pass.

    This is a scheduler designed to work for the unique scheduling constraints of the dynamic circuits
    backends due to the limitations imposed by hardware. This is expected to evolve over time as the
    dynamic circuit backends also change.

    In its current form this is similar to Qiskit's ALAP scheduler in which instructions
    start as late as possible.

    The primary differences are that:

    * Resets and control-flow currently trigger the end of a "quantum block". The period between the end
        of the block and the next is *nondeterministic*,
        i.e., we do not know when the next block will begin (as we could be evaluating a classical
        function of nondeterministic length), and therefore the
        next block starts at a *relative* t=0.
    * During a measurement it is possible to apply gates in parallel on disjoint qubits.
    * Measurements and resets on disjoint qubits happen simultaneously and are part of the same block.
    """

    def run(self, dag: DAGCircuit) -> DAGCircuit:
        """Run the ALAPSchedule pass on `dag`.

        Nodes are first scheduled block by block and then pushed as late as
        possible by ``_push_block_durations``. The resulting start times are
        stored in ``property_set["node_start_time"]`` and the DAGs built for
        control-flow blocks in ``property_set["node_block_dags"]``.

        Args:
            dag (DAGCircuit): DAG to schedule.

        Raises:
            TranspilerError: if the circuit is not mapped on physical qubits.
            TranspilerError: if conditional bit is added to non-supported instruction.

        Returns:
            The input DAGCircuit; the schedule itself lives in the property set.
        """
        self._init_run(dag)

        # Trivial wire map at the top-level
        wire_map = {wire: wire for wire in dag.wires}
        # Top-level dag is the entry block
        self._visit_block(dag, wire_map)
        self._push_block_durations()

        self.property_set["node_start_time"] = self._node_start_time
        self.property_set["node_block_dags"] = self._node_block_dags
        return dag

    def _visit_measure(self, node: DAGNode) -> None:
        """Visit a measurement node.

        Measurement currently triggers the end of a deterministically scheduled block
        of instructions in IBM dynamic circuits hardware.
        This means that it is possible to schedule *up to* a measurement (and during its pulses)
        but the measurement will be followed by a period of indeterminism.
        All measurements on disjoint qubits that topologically follow another
        measurement will be collected and performed in parallel. A measurement on a qubit
        intersecting with the set of qubits to be measured in parallel will trigger the
        end of a scheduling block with said measurement occurring in a following block
        which begins another grouping sequence. This behavior will change in future
        backend software updates.
        """
        current_block_measure_qargs = self._current_block_measure_qargs()
        # We handle a set of qubits here as _visit_reset currently calls
        # this method and a reset may have multiple qubits.
        measure_qargs = set(self._map_qubits(node))

        t0q = max(  # pylint: disable=invalid-name
            self._current_block_bit_times[q] for q in measure_qargs
        )

        # If the measurement qubits overlap, we need to flush measurements and start a
        # new scheduling block.
        if current_block_measure_qargs & measure_qargs:
            if self._current_block_measures_has_reset:
                # If a reset is included we must trigger the end of a block.
                self._begin_new_circuit_block()
                t0q = 0  # pylint: disable=invalid-name
            else:
                # Otherwise just trigger a measurement flush
                self._flush_measures()
        else:
            # Otherwise we need to increment all measurements to start at the
            # same time within the block.
            t0q = max(  # pylint: disable=invalid-name
                itertools.chain(
                    [t0q],
                    (
                        self._node_start_time[measure][1]
                        for measure in self._current_block_measures
                    ),
                )
            )

        # Insert this measure into the block
        self._current_block_measures.add(node)

        # Re-time every measurement of the group so that they all start together.
        for measure in self._current_block_measures:
            # ``_map_qubits`` yields top-level qubits, so they are indexed with
            # the top-level ``self._bit_indices`` (as ``_get_duration`` does)
            # rather than with the positions inside the current block DAG.
            measure_duration = self._durations.get(
                Measure(),
                [self._bit_indices[qarg] for qarg in self._map_qubits(measure)],
                unit="dt",
            )
            self._update_bit_times(measure, t0q, t0q + measure_duration)

    def _visit_reset(self, node: DAGNode) -> None:
        """Visit a reset node.

        Reset currently triggers the end of a pulse block in IBM dynamic circuits hardware
        as conditional reset is performed internally using a c_if. This means that it is
        possible to schedule *up to* a reset (and during its measurement pulses)
        but the reset will be followed by a period of conditional indeterminism.
        All resets on disjoint qubits will be collected on the same qubits to be run simultaneously.
        """
        # Process as measurement
        self._current_block_measures_has_reset = True
        self._visit_measure(node)
        # Then set that we are now a conditional node.
        self._control_flow_block = True

    def _visit_generic(self, node: DAGNode) -> None:
        """Visit a generic node such as a gate or barrier."""
        # If True we are coming from a conditional block.
        # start a new block for the unconditional operations.
        if self._control_flow_block:
            self._begin_new_circuit_block()

        op_duration = self._get_duration(node)

        # If the measurement qubits overlap, we need to flush the measurement group
        self._check_flush_measures(node)

        t0 = max(  # pylint: disable=invalid-name
            self._current_block_bit_times[bit] for bit in self._map_wires(node)
        )

        t1 = t0 + op_duration  # pylint: disable=invalid-name
        self._update_bit_times(node, t0, t1)

    def _push_block_durations(self) -> None:
        """After scheduling of each block, pass over and push the times of all nodes.

        Within every block the nodes are processed from the latest stop time
        to the earliest. Each node is shifted so that it stops at the earliest
        "next available" time among its qubits, which starts out as the
        block's maximum stop time. Nodes tied together by
        ``self._node_tied_to`` are shifted to one common start time.
        ``self._node_start_time`` and ``self._node_stop_time`` are replaced by
        the shifted times.
        """
        # Store the next available time to push to for the block by bit
        block_bit_times = {}

        def order_ops(
            item: Tuple[DAGNode, Tuple[int, int]]
        ) -> Tuple[int, int, bool, int]:
            """Iterated nodes ordering by channel, time and preferring that
            barriers are processed first."""
            node, (block, stop_time) = item
            return (
                block,
                -stop_time,
                not isinstance(node.op, Barrier),
                self._get_duration(node, dag=self._block_idx_dag_map[block]),
            )

        # Iterate nodes starting at the first block, from the node with the
        # last time, preferring barriers over non-barriers
        iterate_nodes = sorted(self._node_stop_time.items(), key=order_ops)

        new_node_start_time = {}
        new_node_stop_time = {}
        scheduled = set()

        def _calculate_new_times(
            block: int, node: DAGNode, bit_times: Dict[int, Dict[Qubit, int]]
        ) -> int:
            """Return the start time of ``node`` once pushed as late as possible."""
            max_block_time = min(
                bit_times[block][bit] for bit in self._map_qubits(node)
            )

            t0 = self._node_start_time[node][1]  # pylint: disable=invalid-name
            t1 = self._node_stop_time[node][1]  # pylint: disable=invalid-name
            # Determine how much to shift by
            node_offset = max_block_time - t1
            return t0 + node_offset

        def _update_time(
            block: int,
            node: DAGNode,
            new_time: int,
            bit_times: Dict[int, Dict[Qubit, int]],
        ) -> None:
            """Record the pushed times of ``node`` and reserve its qubits."""
            scheduled.add(node)

            new_node_start_time[node] = (block, new_time)
            new_node_stop_time[node] = (
                block,
                new_time + self._get_duration(node, dag=self._block_idx_dag_map[block]),
            )

            # Update available times by bit
            for bit in self._map_qubits(node):
                bit_times[block][bit] = new_time

        for node, (block, _) in iterate_nodes:
            # skip already scheduled
            if node in scheduled:
                continue

            # Start with last time as the time to push to
            if block not in block_bit_times:
                block_bit_times[block] = {
                    q: self._max_block_t1[block] for q in self._dag.wires
                }

            # Calculate the latest available time to push to collectively for tied nodes
            tied_nodes = self._node_tied_to.get(node, None)
            if tied_nodes is not None:
                # Take the minimum time that will be schedulable
                # self._node_tied_to includes the node itself.
                new_time = min(
                    _calculate_new_times(block, tied_node, block_bit_times)
                    for tied_node in tied_nodes
                )
                for tied_node in tied_nodes:
                    _update_time(block, tied_node, new_time, block_bit_times)
            else:
                new_t0 = _calculate_new_times(block, node, block_bit_times)
                _update_time(block, node, new_t0, block_bit_times)

        self._node_start_time = new_node_start_time
        self._node_stop_time = new_node_stop_time