# This code is part of Qiskit.
#
# (C) Copyright IBM 2022.
#
# This code is licensed under the Apache License, Version 2.0. You may
# obtain a copy of this license in the LICENSE.txt file in the root directory
# of this source tree or at http://www.apache.org/licenses/LICENSE-2.0.
#
# Any modifications or derivative works of this code must retain this
# copyright notice, and modified files need to carry a notice indicating
# that they have been altered from the originals.
"""Padding pass to fill timeslots for IBM (dynamic circuit) backends."""
from typing import Dict, Iterable, List, Optional, Union, Set
from qiskit.circuit import (
Qubit,
Clbit,
ControlFlowOp,
Gate,
IfElseOp,
Instruction,
Measure,
)
from qiskit.circuit.bit import Bit
from qiskit.circuit.library import Barrier
from qiskit.circuit.delay import Delay
from qiskit.circuit.parameterexpression import ParameterExpression
from qiskit.converters import dag_to_circuit
from qiskit.dagcircuit import DAGCircuit, DAGNode
from qiskit.transpiler.basepasses import TransformationPass
from qiskit.transpiler.exceptions import TranspilerError
from .utils import block_order_op_nodes
class BlockBasePadder(TransformationPass):
    """The base class of padding pass.

    This pass requires one of the scheduling passes to be executed before itself.
    Since there are multiple scheduling strategies, the selection of the scheduling
    pass is left in the hands of the pass manager designer.
    Once a scheduling analysis pass is run, ``node_start_time`` is generated
    in the :attr:`property_set`. This information is represented by a python dictionary of
    the expected instruction execution times keyed on the node instances.
    The padding pass expects all ``DAGOpNode`` in the circuit to be scheduled.

    This base class doesn't define any sequence to interleave, but it manages
    the location where the sequence is inserted, and provides a set of information necessary
    to construct the proper sequence. Thus, a subclass of this pass just needs to implement
    the :meth:`_pad` method, in which the subclass constructs a circuit block to insert.
    This mechanism removes lots of boilerplate logic to manage whole DAG circuits.

    Note that padding pass subclasses should define interleaving sequences satisfying:

        - Interleaved sequence does not change start time of other nodes
        - Interleaved sequence should have total duration of the provided ``time_interval``.

    Any manipulation violating these constraints may prevent this base pass from correctly
    tracking the start time of each instruction,
    which may result in violation of hardware alignment constraints.
    """

    def __init__(self, schedule_idle_qubits: bool = False) -> None:
        """Create the pass with empty per-run state.

        Args:
            schedule_idle_qubits: Stored on the instance. When it is false,
                :meth:`run` records the idle qubit wires of the input DAG and
                those qubits are skipped when padding and when inserting
                block-terminating barriers.
        """
        # Scheduling results; both are (re)loaded from the property set in ``_init_run``.
        # ``_node_start_time`` is read as ``(block_idx, t0)`` per node.
        self._node_start_time = None
        # Indexed by control-flow node, then by the position of the block in the node.
        self._node_block_dags = None

        # Per qubit, the time after which the qubit is idle in the current block.
        self._idle_after: Optional[Dict[Qubit, int]] = None

        # DAG given to ``run``, the padded top-level DAG and the DAG of the block
        # that is currently being built.
        self._root_dag = None
        self._dag = None
        self._block_dag = None

        # Last node visited in the block being built.
        self._prev_node: Optional[DAGNode] = None
        # Maps the wires of the block being visited to the wires used in the padded DAG.
        self._wire_map: Optional[Dict[Bit, Bit]] = None

        # Largest end time seen so far in the current block, and that block's index.
        self._block_duration = 0
        self._current_block_idx = 0
        # Whether the most recently visited delay/generic node carried condition bits.
        self._conditional_block = False

        # Position of each qubit in the top-level DAG.
        self._bit_indices: Optional[Dict[Qubit, int]] = None

        # Last node to touch a bit: maps each qubit/clbit to a
        # ``(node, block_dag)`` pair, as written by ``_visit_generic``.
        self._last_node_to_touch: Optional[Dict[Bit, tuple]] = None

        # ``IfElseOp`` nodes for which ``_will_use_fast_path`` returned true.
        self._fast_path_nodes: Set[DAGNode] = set()

        # Qubits that are dirty in the circuit, i.e. touched by an operation
        # that is neither a barrier nor a delay.
        self._dirty_qubits: Set[Qubit] = set()

        self._schedule_idle_qubits = schedule_idle_qubits
        # Idle qubit wires of the input DAG; only filled by ``run`` when
        # ``schedule_idle_qubits`` is false.
        self._idle_qubits: Set[Qubit] = set()

        super().__init__()
def run(self, dag: DAGCircuit) -> DAGCircuit:
    """Run the padding pass on ``dag``.

    Args:
        dag: Scheduled DAG whose idle time is to be filled.

    Returns:
        DAGCircuit: DAG with idle time filled with instructions.

    Raises:
        TranspilerError: When the circuit has not been scheduled, or when a
            particular node is not scheduled (likely because a transform pass
            was inserted after scheduling and before this pass).
    """
    if not self._schedule_idle_qubits:
        # Remember the qubit wires that carry no operation so they are left untouched.
        self._idle_qubits = {
            idle_wire for idle_wire in dag.idle_wires() if isinstance(idle_wire, Qubit)
        }

    self._pre_runhook(dag)
    self._init_run(dag)

    # At the top level every wire maps onto itself.
    identity_wire_map = {top_wire: top_wire for top_wire in dag.wires}

    # The top-level DAG is visited as the entry block.
    return self._visit_block(dag, identity_wire_map)
def _init_run(self, dag: DAGCircuit) -> None:
    """Reset the per-run state of the pass for the top-level ``dag``.

    The scheduling results are copied out of the property set, and the
    ``node_start_time`` entry is then emptied so that it can be repopulated
    for the nodes of the padded DAG.
    """
    scheduled_times = self.property_set["node_start_time"]
    self._node_start_time = scheduled_times.copy()
    self._node_block_dags = self.property_set["node_block_dags"]

    # Block tracking starts from scratch.
    self._block_duration = 0
    self._current_block_idx = 0
    self._conditional_block = False
    self._idle_after = {qubit: 0 for qubit in dag.qubits}

    # Prepare the DAG to pad; ``_root_dag`` must be set before the empty copy is built.
    self._root_dag = dag
    padded_dag = self._empty_dag_like(dag)
    self._dag = padded_dag
    self._block_dag = padded_dag

    self._bit_indices = {qubit: position for position, qubit in enumerate(dag.qubits)}
    self._last_node_to_touch = {}
    self._fast_path_nodes = set()
    self._dirty_qubits = set()

    # Start times are re-registered for every node applied to the padded DAG.
    self.property_set["node_start_time"].clear()

    self._prev_node = None
    self._wire_map = {}
def _empty_dag_like(
    self,
    dag: DAGCircuit,
    pad_wires: bool = True,
    wire_map: Optional[Dict[Qubit, Qubit]] = None,
    ignore_idle: bool = False,
) -> DAGCircuit:
    """Create an empty dag like the input dag.

    Args:
        dag: DAG whose classical bits, name, metadata, calibrations and
            global phase are carried over.
        pad_wires: When true the qubits are taken from the top-level DAG
            instead of ``dag``, and a trivial wire map is used.
        wire_map: Mapping applied to the qubits that are added bit by bit.
            Replaced by a trivial map when empty or when ``pad_wires`` is true.
        ignore_idle: When true, qubits recorded as idle are left out if the
            qubits are added bit by bit.

    Returns:
        The new DAG without any operation.

    Raises:
        TranspilerError: If the ``time_unit`` in the property set is set to
            something other than ``"dt"``.
    """
    empty_dag = DAGCircuit()

    # The top-level circuit has its full registers available, whereas
    # control-flow blocks only receive bits. Register information is taken
    # into account when circuits are compared for equivalence, so registers
    # are added where possible and bare bits are added otherwise. Without
    # this the padded circuit would not compare equal to a hand-written one.
    qubit_source = self._root_dag if pad_wires else dag

    # Trivial wire map if none is provided, or if the top-level dag is used.
    if pad_wires or not wire_map:
        wire_map = {wire: wire for wire in qubit_source.wires}

    add_full_registers = (not ignore_idle) or (
        dag.qregs and self._schedule_idle_qubits
    )
    if add_full_registers:
        for quantum_register in qubit_source.qregs.values():
            empty_dag.add_qreg(quantum_register)
    else:
        # ``ignore_idle`` is necessarily true on this branch, so idle qubits are dropped.
        kept_qubits = [
            wire_map[qubit]
            for qubit in qubit_source.qubits
            if qubit not in self._idle_qubits
        ]
        empty_dag.add_qubits(kept_qubits)

    # Classical bits are not padded, so only those of the current block are added
    # (never the ones of the root dag).
    if dag.cregs:
        for classical_register in dag.cregs.values():
            empty_dag.add_creg(classical_register)
    else:
        empty_dag.add_clbits(dag.clbits)

    empty_dag.name = dag.name
    empty_dag.metadata = dag.metadata

    empty_dag.unit = self.property_set["time_unit"] or "dt"
    if empty_dag.unit != "dt":
        raise TranspilerError(
            'All blocks must have time units of "dt". '
            "Please run TimeUnitConversion pass prior to padding."
        )

    empty_dag.calibrations = dag.calibrations
    empty_dag.global_phase = dag.global_phase
    return empty_dag
def _pre_runhook(self, dag: DAGCircuit) -> None:
    """Check preconditions before the padding pass runs.

    Args:
        dag: DAG circuit on which the sequence is applied.

    Raises:
        TranspilerError: If ``node_start_time`` is missing from the property
            set, i.e. the circuit has not been scheduled.
    """
    if "node_start_time" in self.property_set:
        return

    raise TranspilerError(
        f"The input circuit {dag.name} is not scheduled. Call one of scheduling passes "
        f"before running the {self.__class__.__name__} pass."
    )
def _pad(
    self,
    block_idx: int,
    qubit: Qubit,
    t_start: int,
    t_end: int,
    next_node: DAGNode,
    prev_node: DAGNode,
) -> None:
    """Interleave an instruction sequence between two nodes.

    Subclasses must override this hook; the base implementation only raises.

    .. note::

        Every DAGOpNode added here must also be registered in the
        ``node_start_time`` property so that the added node is scheduled too.
        Adding operations through :meth:`_apply_scheduled_op` takes care of this.

    .. note::

        The total duration of the nodes added here is not checked against the
        interval (``t_end - t_start``); the developer of the pass must
        guarantee that they match. If the duration is greater than the
        interval, the circuit may be compiled down to target code with extra
        duration by the backend compiler and then played without error, but
        the outcome might be unexpected due to erroneous scheduling.

    Args:
        block_idx: Execution block index for this node.
        qubit: The wire that the sequence is applied on.
        t_start: Absolute start time of this interval.
        t_end: Absolute end time of this interval.
        next_node: Node that follows the sequence.
        prev_node: Node ahead of the sequence.

    Raises:
        NotImplementedError: Always, in this base class.
    """
    raise NotImplementedError
def _get_node_duration(self, node: DAGNode) -> int:
    """Return the duration of ``node``.

    Conditional operations and control-flow operations are modelled with zero
    duration. Otherwise a calibration on the current block DAG takes priority
    over the duration stored on the operation.

    Raises:
        TranspilerError: If the duration is an unbound parameter expression
            or is missing.
    """
    operation = node.op

    if operation.condition_bits or isinstance(operation, ControlFlowOp):
        # Scheduling through conditionals is not possible at present, so the
        # node is given zero duration to avoid padding.
        return 0

    qubit_indices = [
        self._bit_indices[qubit] for qubit in self._map_wires(node.qargs)
    ]

    if not self._block_dag.has_calibration_for(node):
        duration = operation.duration
    else:
        # A calibration, when present, has the highest priority.
        calibration_key = (
            tuple(qubit_indices),
            tuple(float(param) for param in operation.params),
        )
        calibration = self._block_dag.calibrations[operation.name][calibration_key]
        duration = calibration.duration

    if isinstance(duration, ParameterExpression):
        raise TranspilerError(
            f"Parameterized duration ({duration}) "
            f"of {operation.name} on qubits {qubit_indices} is not bounded."
        )
    if duration is None:
        raise TranspilerError(
            f"Duration of {operation.name} on qubits {qubit_indices} is not found."
        )

    return duration
def _needs_block_terminating_barrier(
    self, prev_node: DAGNode, curr_node: DAGNode
) -> bool:
    """Tell whether a barrier must be inserted between the two nodes.

    No barrier is needed when there is no previous node, when the current
    node is a fast-path node, when both nodes are control-flow operations,
    or when either node is already a barrier or control-flow operation
    spanning every qubit of the current block DAG.
    """

    def _covers_whole_block(node: DAGNode) -> bool:
        if not isinstance(node.op, (Barrier, ControlFlowOp)):
            return False
        return len(node.qargs) == self._block_dag.num_qubits()

    # Only barrier if not in fast-path nodes
    if prev_node is None or curr_node in self._fast_path_nodes:
        return False

    if isinstance(prev_node.op, ControlFlowOp) and isinstance(
        curr_node.op, ControlFlowOp
    ):
        return False

    if _covers_whole_block(prev_node) or _covers_whole_block(curr_node):
        return False

    return True
def _add_block_terminating_barrier(
    self, block_idx: int, time: int, current_node: DAGNode, force: bool = False
) -> None:
    """Add a block terminating barrier to prevent topological ordering slide by.

    TODO: Fix by ensuring control-flow is a block terminator in the core circuit IR.

    Args:
        block_idx: Execution block index recorded for the barrier.
        time: Start time recorded for the barrier.
        current_node: Node about to be visited; used together with the
            previously visited node to decide whether a barrier is required.
        force: When true the barrier is added without consulting
            :meth:`_needs_block_terminating_barrier`.
    """
    # Only add a barrier if a viable barrier is not already present on all qubits
    # and the current node is not a fast-path node (unless forced).
    if not force and not self._needs_block_terminating_barrier(
        self._prev_node, current_node
    ):
        return

    # Terminate with a barrier to ensure topological ordering does not slide past.
    if self._schedule_idle_qubits:
        qubits = self._block_dag.qubits
    else:
        qubits = [
            qubit for qubit in self._block_dag.qubits if qubit not in self._idle_qubits
        ]

    # Size the barrier from the qubits it is applied on. Subtracting the number of
    # idle qubits from the block width is only correct when every idle qubit is
    # part of the current block DAG, which does not hold for blocks that were
    # built without their idle qubits.
    barrier = Barrier(len(qubits))

    barrier_node = self._apply_scheduled_op(
        block_idx,
        time,
        barrier,
        qubits,
        [],
    )
    barrier_node.op.duration = 0
def _visit_block(
    self,
    block: DAGCircuit,
    wire_map: Dict[Qubit, Qubit],
    pad_wires: bool = True,
    ignore_idle: bool = False,
) -> DAGCircuit:
    """Pad a single block and return the newly built DAG for it.

    The state describing the enclosing block (previous node, wire map and
    block DAG) is saved on entry and restored before returning.

    Args:
        block: DAG of the block to pad.
        wire_map: Mapping applied to the wires of ``block`` while it is visited.
        pad_wires: Forwarded to :meth:`_empty_dag_like`.
        ignore_idle: Forwarded to :meth:`_empty_dag_like`.

    Returns:
        The padded DAG of the block.
    """
    # Push the previous block dag onto the stack
    prev_node = self._prev_node
    self._prev_node = None
    prev_wire_map, self._wire_map = self._wire_map, wire_map

    prev_block_dag = self._block_dag
    self._block_dag = new_block_dag = self._empty_dag_like(
        block, pad_wires, wire_map=wire_map, ignore_idle=ignore_idle
    )

    self._block_duration = 0
    self._conditional_block = False

    for node in block_order_op_nodes(block):
        self._visit_node(node)

    # Terminate the block to pad it after scheduling. The duration and index are
    # captured first because terminating the block resets the duration.
    prev_block_duration = self._block_duration
    prev_block_idx = self._current_block_idx
    self._terminate_block(self._block_duration, self._current_block_idx)

    # Edge-case: Add a barrier if the final node is a fast-path
    if self._prev_node in self._fast_path_nodes:
        # The callee expects (block_idx, time, ...): the barrier belongs to the
        # block that just ended and starts at the end of that block.
        self._add_block_terminating_barrier(
            prev_block_idx, prev_block_duration, self._prev_node, force=True
        )

    # Pop the previous block dag off the stack restoring it
    self._block_dag = prev_block_dag
    self._prev_node = prev_node
    self._wire_map = prev_wire_map

    return new_block_dag
def _visit_node(self, node: DAGNode) -> None:
    """Dispatch ``node`` to the visitor matching its operation.

    Control-flow operations are handled without a start-time lookup here;
    every other node must have an entry in the scheduling results.

    Raises:
        TranspilerError: If a non-control-flow node has no scheduled start time.
    """
    operation = node.op

    if isinstance(operation, ControlFlowOp):
        if isinstance(operation, IfElseOp):
            visitor = self._visit_if_else_op
        else:
            visitor = self._visit_control_flow_op
    elif node not in self._node_start_time:
        raise TranspilerError(
            f"Operation {repr(node)} is likely added after the circuit is scheduled. "
            "Schedule the circuit again if you transformed it."
        )
    elif isinstance(operation, Delay):
        visitor = self._visit_delay
    else:
        visitor = self._visit_generic

    visitor(node)
    self._prev_node = node
def _visit_if_else_op(self, node: DAGNode) -> None:
    """Visit an ``IfElseOp`` node.

    The node is recorded as a fast-path node when it is eligible; in every
    case it is then processed by the standard control-flow handling.
    """
    eligible_for_fast_path = self._will_use_fast_path(node)
    if eligible_for_fast_path:
        self._fast_path_nodes.add(node)

    self._visit_control_flow_op(node)
def _will_use_fast_path(self, node: DAGNode) -> bool:
    """Check if this conditional operation will be scheduled on the fast path.

    This is the case when all of the following hold:

    1. The operation is conditioned on exactly one bit.
    2. The last node that touched this bit is a measurement located in the
       block DAG currently being built.
    3. The operation acts on exactly the qubits of that measurement.
    4. Every block of the operation contains only gates and delays.
    """
    condition_bits = node.op.condition_bits

    # Fast-path valid only with a single bit.
    if not condition_bits:
        return False
    if len(condition_bits) > 1:
        return False

    # Verify IfElseOp has a direct measurement predecessor in the current block.
    bit = condition_bits[0]
    last_node, last_node_dag = self._last_node_to_touch.get(bit, (None, None))
    if last_node_dag is not self._block_dag:
        return False
    if not isinstance(last_node.op, Measure):
        return False

    conditioned_qubits = set(self._map_wires(node.qargs))
    measured_qubits = set(self._map_wires(last_node.qargs))
    if conditioned_qubits != measured_qubits:
        return False

    # Fast path contents are limited to gates and delays
    return all(
        isinstance(inst.operation, (Gate, Delay))
        for block in node.op.blocks
        for inst in block.data
    )
def _visit_control_flow_op(self, node: DAGNode) -> None:
    """Visit a control-flow node to pad.

    The block that is currently being scheduled is terminated, each block of
    the control-flow operation is padded through :meth:`_visit_block`, and a
    new control-flow operation holding the padded blocks is applied to the
    current block DAG at the start time the node was scheduled with.
    """
    # Control-flow terminator ends scheduling of block currently
    block_idx, t0 = self._node_start_time[node]  # pylint: disable=invalid-name
    self._terminate_block(t0, block_idx)
    self._add_block_terminating_barrier(block_idx, t0, node)

    # Only pad non-fast path nodes
    fast_path_node = node in self._fast_path_nodes

    # Wires of the node expressed in terms of the wires of the padded DAG.
    mapped_qargs = self._map_wires(node.qargs)
    mapped_cargs = self._map_wires(node.cargs)
    outer_wires = mapped_qargs + mapped_cargs

    # TODO: This is a hack required to tie nodes of control-flow
    # blocks across the scheduler and block_base_padder. This is
    # because the current control flow nodes store the block as a
    # circuit which is not hashable. For processing we are currently
    # required to convert each circuit block to a dag which is inefficient
    # and causes node relationships stored in analysis to be lost between
    # passes as we are constantly recreating the block dags.
    # We resolve this here by extracting the cached dag blocks that were
    # stored by the scheduling pass.
    cached_block_dags = self._node_block_dags[node]
    new_node_block_dags = []
    # The loop index deliberately has its own name: reusing ``block_idx`` would
    # overwrite the scheduled block index that the rebuilt operation is
    # registered with below.
    for inner_block_idx, _ in enumerate(node.op.blocks):
        block_dag = cached_block_dags[inner_block_idx]
        inner_wire_map = {
            inner: outer
            for outer, inner in zip(
                outer_wires,
                block_dag.qubits + block_dag.clbits,
            )
        }
        new_node_block_dags.append(
            self._visit_block(
                block_dag,
                pad_wires=not fast_path_node,
                wire_map=inner_wire_map,
                ignore_idle=True,
            )
        )

    # Build new control-flow operation containing scheduled blocks
    # and apply to the DAG.
    new_control_flow_op = node.op.replace_blocks(
        dag_to_circuit(block) for block in new_node_block_dags
    )

    # Enforce that this control-flow operation contains all wires since it has now
    # been padded such that each qubit is scheduled within each block. Not all
    # clbits are added as these will not be padded.
    if fast_path_node:
        # Fast-path blocks are not padded and keep the node's own qubits, mapped
        # like the clbits so that nested blocks use the wires of the padded DAG.
        padded_qubits = mapped_qargs
    elif not self._schedule_idle_qubits:
        padded_qubits = [
            q for q in self._block_dag.qubits if q not in self._idle_qubits
        ]
    else:
        padded_qubits = self._block_dag.qubits

    self._apply_scheduled_op(
        block_idx,
        t0,
        new_control_flow_op,
        padded_qubits,
        mapped_cargs,
    )
def _visit_delay(self, node: DAGNode) -> None:
    """Visit a delay node.

    The padding class considers a delay instruction as idle time rather than
    an instruction. The delay node is not added to the padded DAG, so that
    non-delay predecessors can be extracted; only the block bookkeeping
    (block index, conditional flag and block duration) is updated.
    """
    block_idx, start_time = self._node_start_time[node]

    # A larger block index triggers the end of the block in progress.
    entering_new_block = block_idx > self._current_block_idx
    if entering_new_block:
        self._terminate_block(self._block_duration, self._current_block_idx)
        self._add_block_terminating_barrier(block_idx, start_time, node)

    self._conditional_block = bool(node.op.condition_bits)
    self._current_block_idx = block_idx

    end_time = start_time + self._get_node_duration(node)
    if end_time > self._block_duration:
        self._block_duration = end_time
def _visit_generic(self, node: DAGNode) -> None:
    """Visit a generic node to pad.

    Idle time preceding the node on each of its non-idle qubits is handed to
    :meth:`_pad`; the node's operation is then applied to the current block
    DAG and recorded as the last node touching each of its bits.
    """
    # Note: t0 is the relative time with respect to the current block specified
    # by block_idx.
    block_idx, t0 = self._node_start_time[node]  # pylint: disable=invalid-name

    # Trigger the end of a block
    if block_idx > self._current_block_idx:
        self._terminate_block(self._block_duration, self._current_block_idx)
        self._add_block_terminating_barrier(block_idx, t0, node)

    # Track whether the node is conditional.
    self._conditional_block = bool(node.op.condition_bits)

    # Now set the current block index.
    self._current_block_idx = block_idx

    t1 = t0 + self._get_node_duration(node)  # pylint: disable=invalid-name
    self._block_duration = max(self._block_duration, t1)

    # The mapping is needed for padding, dirty tracking and applying the node.
    mapped_qargs = self._map_wires(node.qargs)
    mapped_cargs = self._map_wires(node.cargs)

    for bit in mapped_qargs:
        if bit in self._idle_qubits:
            continue

        # Read the idle time once with a default, so that a qubit without an
        # entry is treated as idle from time 0 both in the check and in the
        # interval passed to ``_pad`` (an index lookup would raise KeyError).
        idle_after = self._idle_after.get(bit, 0)

        # Fill idle time with some sequence
        if t0 - idle_after > 0:
            # Find previous node on the wire, i.e. always the latest node on the wire
            prev_node = next(
                self._block_dag.predecessors(self._block_dag.output_map[bit])
            )
            self._pad(
                block_idx=block_idx,
                qubit=bit,
                t_start=idle_after,
                t_end=t0,
                next_node=node,
                prev_node=prev_node,
            )

        self._idle_after[bit] = t1

    # Barriers and delays do not make a qubit dirty.
    if not isinstance(node.op, (Barrier, Delay)):
        self._dirty_qubits.update(mapped_qargs)

    new_node = self._apply_scheduled_op(
        block_idx,
        t0,
        node.op,
        mapped_qargs,
        mapped_cargs,
    )
    self._last_node_to_touch.update(
        {
            bit: (new_node, self._block_dag)
            for bit in new_node.qargs + new_node.cargs
        }
    )
def _terminate_block(self, block_duration: int, block_idx: int) -> None:
    """Terminate the end of a block scheduling region.

    The tracked block duration is reset, the qubits of the current block DAG
    are padded up to ``block_duration``, and the idle times are reset to 0
    for every qubit of the current block DAG.
    """
    self._block_duration = 0
    self._pad_until_block_end(block_duration, block_idx)

    # Start the next region with every qubit idle from time 0, so that the
    # padding that follows is computed relative to the start of that region.
    reset_idle_after = {}
    for qubit in self._block_dag.qubits:
        reset_idle_after[qubit] = 0
    self._idle_after = reset_idle_after
def _pad_until_block_end(self, block_duration: int, block_idx: int) -> None:
    """Pad every non-idle qubit of the current block DAG up to ``block_duration``.

    Args:
        block_duration: End time up to which the qubits are padded.
        block_idx: Execution block index handed to :meth:`_pad`.
    """
    for qubit in self._block_dag.qubits:
        if qubit in self._idle_qubits:
            continue

        busy_until = self._idle_after.get(qubit, 0)
        if not block_duration - busy_until > 0:
            # Nothing left to fill on this wire.
            continue

        # The padding goes between the latest node on the wire and its output node.
        output_node = self._block_dag.output_map[qubit]
        latest_node = next(self._block_dag.predecessors(output_node))
        self._pad(
            block_idx=block_idx,
            qubit=qubit,
            t_start=busy_until,
            t_end=block_duration,
            next_node=output_node,
            prev_node=latest_node,
        )
def _apply_scheduled_op(
    self,
    block_idx: int,
    t_start: int,
    oper: Instruction,
    qubits: Union[Qubit, Iterable[Qubit]],
    clbits: Union[Clbit, Iterable[Clbit]] = (),
) -> DAGNode:
    """Add a new operation to the current block DAG with scheduled information.

    This is identical to ``apply_operation_back`` followed by an update of
    the ``node_start_time`` property.

    Args:
        block_idx: Execution block index for this node.
        t_start: Start time of new node.
        oper: New operation that is added to the DAG circuit.
        qubits: The qubit, or the list of qubits, that the operation acts on.
        clbits: The clbit, or the list of clbits, that the operation acts on.

    Returns:
        The DAGNode created for the operation.
    """
    # A single bit is accepted as a convenience and wrapped into a list.
    qargs = [qubits] if isinstance(qubits, Qubit) else qubits
    cargs = [clbits] if isinstance(clbits, Clbit) else clbits

    scheduled_node = self._block_dag.apply_operation_back(oper, qargs, cargs)
    start_times = self.property_set["node_start_time"]
    start_times[scheduled_node] = (block_idx, t_start)
    return scheduled_node
def _map_wires(self, wires: Iterable[Bit]) -> List[Bit]:
    """Translate ``wires`` through the wire map of the block being visited.

    TODO: We should have an easier approach to wire mapping from the transpiler.

    Returns:
        The mapped wires, in the order of ``wires``.
    """
    current_map = self._wire_map
    mapped = []
    for wire in wires:
        mapped.append(current_map[wire])
    return mapped