diff --git a/docs/releases/changelog-dev.md b/docs/releases/changelog-dev.md index 4864edb25..b5dfd90f2 100644 --- a/docs/releases/changelog-dev.md +++ b/docs/releases/changelog-dev.md @@ -233,6 +233,10 @@ - Added a `save_plot=True` parameter to the `plotS21()` method of `ExperimentResults`. When enabled (default: True), the plot is automatically saved in the same directory as the experiment results. [#819](https://github.com/qilimanjaro-tech/qililab/pull/819) +- Improved the transpiler, by making it more modular, and adding a `gate_cancellation()` stage before the transpilation to natives, this stage can be skipped, together with the old `optimize_transpilation()`, if the flag `optimize=False` is passed. + +[#823](https://github.com/qilimanjaro-tech/qililab/pull/823) + ### Breaking changes - Renamed the platform's `execute_anneal_program()` method to `execute_annealing_program()` and updated its parameters. The method now expects `preparation_block` and `measurement_block`, which are strings used to retrieve blocks from the `Calibration`. These blocks are inserted before and after the annealing schedule, respectively. diff --git a/src/qililab/digital/circuit_optimizer.py b/src/qililab/digital/circuit_optimizer.py new file mode 100644 index 000000000..38a44535c --- /dev/null +++ b/src/qililab/digital/circuit_optimizer.py @@ -0,0 +1,263 @@ +# Copyright 2023 Qilimanjaro Quantum Tech +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CircuitOptimizer class""" + +from copy import deepcopy + +from qibo import Circuit, gates + +from qililab import digital +from qililab.settings.digital.digital_compilation_settings import DigitalCompilationSettings + +from .native_gates import Drag + + +class CircuitOptimizer: + """Optimizes a circuit, cancelling redundant gates.""" + + def __init__(self, digital_compilation_settings: DigitalCompilationSettings): # type: ignore # ignore typing to avoid importing platform and causing circular imports + self.digital_compilation_settings = digital_compilation_settings + + @classmethod + def run_gate_cancellations(cls, circuit: Circuit) -> Circuit: + """Main method to run the gate cancellations. Currently only consists of cancelling pairs of hermitian gates. + + Can/Might be extended in the future to include more complex gate cancellations. + + Args: + circuit (Circuit): circuit to optimize. + + Returns: + Circuit: optimized circuit. + """ + return cls.cancel_pairs_of_hermitian_gates(circuit) + + @classmethod + def cancel_pairs_of_hermitian_gates(cls, circuit: Circuit) -> Circuit: + """Optimizes circuit by cancelling adjacent hermitian gates. + + Args: + circuit (Circuit): circuit to optimize. + + Returns: + Circuit: optimized circuit. + """ + # Initial and final circuit gates lists, from which to, one by one, after checks, pass non-cancelled gates: + circ_list: list[tuple] = cls._get_circuit_gates(circuit) + + # We want to do the sweep circuit cancelling gates least once always: + previous_circ_list = deepcopy(circ_list) + output_circ_list = cls._sweep_circuit_cancelling_pairs_of_hermitian_gates(circ_list) + + # And then keep iterating, sweeping over the circuit (cancelling gates) each time, until there is full sweep without any cancellations: + while output_circ_list != previous_circ_list: + previous_circ_list = deepcopy(output_circ_list) + output_circ_list = cls._sweep_circuit_cancelling_pairs_of_hermitian_gates(output_circ_list) + + # Create optimized circuit, from the obtained non-cancelled list: + return cls._create_circuit(output_circ_list, circuit.nqubits) + + def optimize_transpilation(self, circuit: Circuit) -> list[gates.Gate]: + """Optimizes transpiled circuit by applying virtual Z gates. + + This is done by moving all RZ to the left of all operators as a single RZ. The corresponding cumulative rotation + from each RZ is carried on as phase in all drag pulses left of the RZ operator. + + Virtual Z gates are also applied to correct phase errors from CZ gates. + + The final RZ operator left to be applied as the last operator in the circuit can afterwards be removed since the last + operation is going to be a measurement, which is performed on the Z basis and is therefore invariant under rotations + around the Z axis. + + This last step can also be seen from the fact that an RZ operator applied on a single qubit, with no operations carried + on afterwards induces a phase rotation. Since phase is an imaginary unitary component, its absolute value will be 1 + independent on any (unitary) operations carried on it. + + Mind that moving an operator to the left is equivalent to applying this operator last so + it is actually moved to the _right_ of ``Circuit.queue`` (last element of list). + + For more information on virtual Z gates, see https://arxiv.org/abs/1612.00858 + + Args: + circuit (Circuit): circuit with native gates, to optimize. + + Returns: + list[gates.Gate] : list of re-ordered gates + """ + nqubits: int = circuit.nqubits + ngates: list[gates.Gate] = circuit.queue + + supported_gates = ["rz", "drag", "cz", "wait", "measure"] + new_gates = [] + shift = dict.fromkeys(range(nqubits), 0) + for gate in ngates: + if gate.name not in supported_gates: + raise NotImplementedError(f"{gate.name} not part of native supported gates {supported_gates}") + if isinstance(gate, gates.RZ): + shift[gate.qubits[0]] += gate.parameters[0] + # add CZ phase correction + elif isinstance(gate, gates.CZ): + gate_settings = self.digital_compilation_settings.get_gate( + name=gate.__class__.__name__, qubits=gate.qubits + ) + control_qubit, target_qubit = gate.qubits + corrections = next( + ( + event.pulse.options + for event in gate_settings + if ( + event.pulse.options is not None + and f"q{control_qubit}_phase_correction" in event.pulse.options + ) + ), + None, + ) + if corrections is not None: + shift[control_qubit] += corrections[f"q{control_qubit}_phase_correction"] + shift[target_qubit] += corrections[f"q{target_qubit}_phase_correction"] + new_gates.append(gate) + else: + # if gate is drag pulse, shift parameters by accumulated Zs + if isinstance(gate, Drag): + # create new drag pulse rather than modify parameters of the old one + gate = Drag(gate.qubits[0], gate.parameters[0], gate.parameters[1] - shift[gate.qubits[0]]) + + # append gate to optimized list + new_gates.append(gate) + + return new_gates + + @staticmethod + def _get_circuit_gates(circuit: Circuit) -> list[tuple]: + """Get the gates of the circuit. + + Args: + circuit (qibo.models.Circuit): Circuit to get the gates from. + + Returns: + list[tuple]: List of gates in the circuit. Where each gate is a tuple of ('name', 'init_args', 'initi_kwargs'). + """ + return [(type(gate).__name__, gate.init_args, gate.init_kwargs) for gate in circuit.queue] + + @staticmethod + def _create_gate(gate_class: str, gate_args: list | int, gate_kwargs: dict) -> gates.Gate: + """Converts a tuple representation of qibo gate (name, qubits) into a Gate object. + + Args: + gate_class (str): The class name of the gate. Can be any Qibo or Qililab supported class. + gate_args (list | int): The qubits the gate acts on. + gate_kwargs (dict): The kwargs of the gate. + + Returns: + gates.Gate: The qibo Gate object. + """ + # Solve Identity gate, argument int issue: + gate_args = [gate_args] if isinstance(gate_args, int) else gate_args + + return ( + getattr(digital, gate_class)(*gate_args, **gate_kwargs) + if gate_class in {"Drag", "Wait"} + else getattr(gates, gate_class)(*gate_args, **gate_kwargs) + ) + + @classmethod + def _create_circuit(cls, gates_list: list[tuple], nqubits: int) -> Circuit: + """Converts a list of gates (name, qubits) into a qibo Circuit object. + + Args: + gates_list (list[tuple]): List of gates in the circuit. Where each gate is a tuple of ('name', 'init_args', 'initi_kwargs') + nqubits (int): Number of qubits in the circuit. + + Returns: + Circuit: The qibo Circuit object. + """ + # Create optimized circuit, from the obtained non-cancelled list: + output_circuit = Circuit(nqubits) + for gate, gate_args, gate_kwargs in gates_list: + qibo_gate = cls._create_gate(gate, gate_args, gate_kwargs) + output_circuit.add(qibo_gate) + + return output_circuit + + @classmethod + def _sweep_circuit_cancelling_pairs_of_hermitian_gates(cls, circ_list: list[tuple]) -> list[tuple]: + """Cancels adjacent gates in a circuit. + + Args: + circ_list (list[tuple]): List of gates in the circuit. Where each gate is a tuple of ('name', 'init_args', 'initi_kwargs') + + Returns: + list[tuple]: List of gates in the circuit, after cancelling adjacent gates. Where each gate is a tuple of ('name', 'init_args', 'initi_kwargs') + """ + # List of gates, that are available for cancellation: + hermitian_gates: list = ["H", "X", "Y", "Z", "CNOT", "CZ", "SWAP"] + + output_circ_list: list[tuple] = [] + + while circ_list: # If original circuit list, is empty or has one gate remaining, we are done: + if len(circ_list) == 1: + output_circ_list.append(circ_list[0]) + break + + # Gate of the original circuit, to find a match for: + gate, gate_args, gate_kwargs = circ_list.pop(0) + gate_qubits = cls._extract_qubits(gate_args) # Assuming qubits are the first two args + + # If gate is not hermitian (can't be cancelled), add it to the output circuit and continue: + if gate not in hermitian_gates: + output_circ_list.append((gate, gate_args, gate_kwargs)) + continue + + subend = False + for i in range(len(circ_list)): + # Next gates, to compare the original with: + comp_gate, comp_args, comp_kwargs = circ_list[i] + comp_qubits = cls._extract_qubits(comp_args) # Assuming qubits are the first two args + + # Simplify duplication, if same gate and qubits found, without any other in between: + if gate == comp_gate and gate_args == comp_args and gate_kwargs == comp_kwargs: + circ_list.pop(i) + break + + # Add gate, if there is no other gate that acts on the same qubits: + if i == len(circ_list) - 1: + output_circ_list.append((gate, gate_args, gate_kwargs)) + break + + # Add gate and leave comparison_gate loop, if we find a gate in common qubit, that prevents contraction: + for gate_qubit in gate_qubits: + if gate_qubit in comp_qubits: + output_circ_list.append((gate, gate_args, gate_kwargs)) + subend = True + break + if subend: + break + + return output_circ_list + + @staticmethod + def _extract_qubits(gate_args: list | int) -> list: + """Extract qubits from gate_args. + + Args: + gate_args (list | int): The arguments of the gate. + + Returns: + list: The qubits of the gate in an iterable. + """ + # Assuming qubits are the first one or two args: + if isinstance(gate_args, int): + return [gate_args] + return gate_args if len(gate_args) <= 2 else gate_args[:2] diff --git a/src/qililab/digital/circuit_to_pulses.py b/src/qililab/digital/circuit_to_pulses.py new file mode 100644 index 000000000..813cb2a48 --- /dev/null +++ b/src/qililab/digital/circuit_to_pulses.py @@ -0,0 +1,275 @@ +# Copyright 2023 Qilimanjaro Quantum Tech +# +# Licensed under the Apache License, Version 2.0 (the "License"); +# you may not use this file except in compliance with the License. +# You may obtain a copy of the License at +# +# http://www.apache.org/licenses/LICENSE-2.0 +# +# Unless required by applicable law or agreed to in writing, software +# distributed under the License is distributed on an "AS IS" BASIS, +# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +# See the License for the specific language governing permissions and +# limitations under the License. + +"""CircuitToPulses class""" + +from dataclasses import asdict + +import numpy as np +from qibo import gates +from qibo.models import Circuit + +from qililab.constants import RUNCARD +from qililab.pulse.pulse import Pulse +from qililab.pulse.pulse_event import PulseEvent +from qililab.pulse.pulse_schedule import PulseSchedule +from qililab.settings.digital.digital_compilation_settings import DigitalCompilationSettings +from qililab.settings.digital.gate_event_settings import GateEventSettings +from qililab.typings.enums import Line +from qililab.utils import Factory + +from .native_gates import Drag, Wait + + +class CircuitToPulses: + """Translates circuits into pulse sequences.""" + + def __init__(self, digital_compilation_settings: DigitalCompilationSettings): # type: ignore # ignore typing to avoid importing platform and causing circular imports + self.digital_compilation_settings = digital_compilation_settings + + def run(self, circuit: Circuit) -> PulseSchedule: + """Translates a circuit into a pulse sequences. + + For each circuit gate we look up for its corresponding gates settings in the runcard (the name of the class of the circuit + gate and the name of the gate in the runcard should match) and load its schedule of GateEvents. + + Each gate event corresponds to a concrete pulse applied at a certain time w.r.t the gate's start time and through a specific bus + (see gates settings docstrings for more details). + + Measurement gates are handled in a slightly different manner. For a circuit gate M(0,1,2) the settings for each M(0), M(1), M(2) + will be looked up and will be applied in sync. Note that thus a circuit gate for M(0,1,2) is different from the circuit sequence + M(0)M(1)M(2) since the later will not be necessarily applied at the same time for all the qubits involved. + + Times for each qubit are kept track of with the dictionary `time`. + + The times at which each pulse is applied are padded if they are not multiples of the minimum clock time. This means that if min clock + time is 4 and a pulse applied to qubit k lasts 17ns, the next pulse at qubit k will be at t=20ns + + Args: + circuits (List[Circuit]): List of Qibo Circuit classes. + + Returns: + list[PulseSequences]: List of :class:`PulseSequences` classes. + """ + + pulse_schedule: PulseSchedule = PulseSchedule() + time: dict[int, int] = {} # init/restart time + for gate in circuit.queue: + # handle wait gates + if isinstance(gate, Wait): + self._update_time(time=time, qubit=gate.qubits[0], gate_time=gate.parameters[0]) + continue + + # Measurement gates need to be handled on their own because qibo allows to define + # an M gate as eg. gates.M(*range(5)) + if isinstance(gate, gates.M): + gate_schedule = [] + gate_qubits = gate.qubits + for qubit in gate_qubits: + gate_schedule += self._gate_schedule_from_settings(gates.M(qubit)) + + # handle control gates + else: + # extract gate schedule + gate_schedule = self._gate_schedule_from_settings(gate) + gate_qubits = self._get_gate_qubits(gate, gate_schedule) + + # process gate_schedule to pulses for both M and control gates + # get total duration for the gate + gate_time = self._get_total_schedule_duration(gate_schedule) + # update time, start time is that of the qubit most ahead in time + start_time = 0 + for qubit in gate_qubits: + start_time = max(self._update_time(time=time, qubit=qubit, gate_time=gate_time), start_time) + # sync gate end time + self._sync_qubit_times(gate_qubits, time=time) + # apply gate schedule + for gate_event in gate_schedule: + # add control gate schedule + pulse_event = self._gate_element_to_pulse_event(time=start_time, gate=gate, gate_event=gate_event) + # pop first qubit from gate if it is measurement + # this is so that the target qubit for multiM gates is every qubit in the M gate + if isinstance(gate, gates.M): + gate = gates.M(*gate.qubits[1:]) + # add event + delay = self.digital_compilation_settings.buses[gate_event.bus].delay + pulse_schedule.add_event(pulse_event=pulse_event, bus_alias=gate_event.bus, delay=delay) # type: ignore + + for bus_alias in self.digital_compilation_settings.buses: + # If we find a flux port, create empty schedule for that port. + # This is needed because for Qblox instrument working in flux buses as DC sources, if we don't + # add an empty schedule its offsets won't be activated and the results will be misleading. + if self.digital_compilation_settings.buses[bus_alias].line == Line.FLUX: + pulse_schedule.create_schedule(bus_alias=bus_alias) + + return pulse_schedule + + def _gate_schedule_from_settings(self, gate: gates.Gate) -> list[GateEventSettings]: + """Gets the gate schedule. The gate schedule is the list of pulses to apply + to a given bus for a given gate + + Args: + gate (Gate): Qibo gate + + Returns: + list[GateEventSettings]: schedule list with each of the pulses settings + """ + + gate_schedule = self.digital_compilation_settings.get_gate(name=gate.__class__.__name__, qubits=gate.qubits) + + if not isinstance(gate, Drag): + return gate_schedule + + # drag gates are currently the only parametric gates we are handling and they are handled here + if len(gate_schedule) > 1: + raise ValueError( + f"Schedule for the drag gate is expected to have only 1 pulse but instead found {len(gate_schedule)} pulses" + ) + drag_schedule = GateEventSettings( + **asdict(gate_schedule[0]) + ) # make new object so that gate_schedule is not overwritten + theta = self._normalize_angle(angle=gate.parameters[0]) + amplitude = drag_schedule.pulse.amplitude * theta / np.pi + phase = self._normalize_angle(angle=gate.parameters[1]) + if amplitude < 0: + amplitude = -amplitude + phase = self._normalize_angle(angle=gate.parameters[1] + np.pi) + drag_schedule.pulse.amplitude = amplitude + drag_schedule.pulse.phase = phase + return [drag_schedule] + + @staticmethod + def _normalize_angle(angle: float): + """Normalizes angle in range [-pi, pi]. + + Args: + angle (float): Normalized angle. + """ + angle %= 2 * np.pi + if angle > np.pi: + angle -= 2 * np.pi + return angle + + @staticmethod + def _get_total_schedule_duration(schedule: list[GateEventSettings]) -> int: + """Returns total time for a gate schedule. This is done by taking the max of (init + duration) + for all the elements in the schedule + + Args: + schedule (list[CircuitPulseSettings]): Schedule of pulses to apply + + Returns: + int: Total gate time + """ + time = 0 + for schedule_element in schedule: + time = max(time, schedule_element.pulse.duration + schedule_element.wait_time) + return time + + def _get_gate_qubits(self, gate: gates.Gate, schedule: list[GateEventSettings] | None = None) -> tuple[int, ...]: + """Gets qubits involved in gate. This includes gate.qubits but also qubits which are targets of + buses in the gate schedule + + Args: + schedule (list[CircuitPulseSettings]): Gate schedule + + Returns: + list[int]: list of qubits + """ + + schedule_qubits = ( + [ + qubit + for schedule_element in schedule + for qubit in self.digital_compilation_settings.buses[schedule_element.bus].qubits + if schedule_element.bus in self.digital_compilation_settings.buses + ] + if schedule is not None + else [] + ) + + gate_qubits = list(gate.qubits) + + return tuple(set(schedule_qubits + gate_qubits)) # convert to set and back to list to remove repeated items + + def _gate_element_to_pulse_event(self, time: int, gate: gates.Gate, gate_event: GateEventSettings) -> PulseEvent: + """Translates a gate element into a pulse. + + Args: + time (dict[int, int]): dictionary containing qubit indices as keys and current time (ns) as values + gate (gate): circuit gate. This is used only to know the qubit target of measurement gates + gate_event (GateEventSettings): gate event, a single element of a gate schedule containing information + about the pulse to be applied + bus (bus): bus through which the pulse is sent + + Returns: + PulseEvent: pulse event corresponding to the input gate event + """ + + # copy to avoid modifying runcard settings + pulse = gate_event.pulse + pulse_shape_copy = pulse.shape.copy() + pulse_shape = Factory.get(pulse_shape_copy.pop(RUNCARD.NAME))(**pulse_shape_copy) + + # handle measurement gates and target qubits for control gates which might have multi-qubit schedules + bus = self.digital_compilation_settings.buses[gate_event.bus] + qubit = ( + gate.qubits[0] + if isinstance(gate, gates.M) + else next((qubit for qubit in bus.qubits), None) + if bus is not None + else None + ) + + return PulseEvent( + pulse=Pulse( + amplitude=pulse.amplitude, + phase=pulse.phase, + duration=pulse.duration, + frequency=0, + pulse_shape=pulse_shape, + ), + start_time=time + gate_event.wait_time + self.digital_compilation_settings.delay_before_readout, + pulse_distortions=bus.distortions, + qubit=qubit, + ) + + def _update_time(self, time: dict[int, int], qubit: int, gate_time: int): + """Creates new timeline if not already created and update time. + + Args: + time (Dict[int, int]): Dictionary with the time of each qubit. + qubit_idx (int): qubit index + gate_time (int): total duration of the gate + """ + if qubit not in time: + time[qubit] = 0 + old_time = time[qubit] + residue = (gate_time) % self.digital_compilation_settings.minimum_clock_time + if residue != 0: + gate_time += self.digital_compilation_settings.minimum_clock_time - residue + time[qubit] += gate_time + return old_time + + @staticmethod + def _sync_qubit_times(qubits: list[int], time: dict[int, int]): + """Syncs the time of the given qubit list + + Args: + qubits (list[int]): qubits to sync + time (dict[int, int]): time dictionary + """ + max_time = max((time[qubit] for qubit in qubits if qubit in time), default=0) + for qubit in qubits: + time[qubit] = max_time diff --git a/src/qililab/digital/circuit_transpiler.py b/src/qililab/digital/circuit_transpiler.py index 7249767c5..b3b379877 100644 --- a/src/qililab/digital/circuit_transpiler.py +++ b/src/qililab/digital/circuit_transpiler.py @@ -14,28 +14,19 @@ """Circuit Transpiler class""" -from dataclasses import asdict - import networkx as nx -import numpy as np -from qibo import gates from qibo.models import Circuit from qibo.transpiler.placer import Placer from qibo.transpiler.router import Router from qililab.config import logger -from qililab.constants import RUNCARD +from qililab.digital.circuit_optimizer import CircuitOptimizer from qililab.digital.circuit_router import CircuitRouter -from qililab.pulse.pulse import Pulse -from qililab.pulse.pulse_event import PulseEvent +from qililab.digital.circuit_to_pulses import CircuitToPulses from qililab.pulse.pulse_schedule import PulseSchedule from qililab.settings.digital.digital_compilation_settings import DigitalCompilationSettings -from qililab.settings.digital.gate_event_settings import GateEventSettings -from qililab.typings.enums import Line -from qililab.utils import Factory from .gate_decompositions import translate_gates -from .native_gates import Drag, Wait class CircuitTranspiler: @@ -58,6 +49,7 @@ def transpile_circuits( placer: Placer | type[Placer] | tuple[type[Placer], dict] | None = None, router: Router | type[Router] | tuple[type[Router], dict] | None = None, routing_iterations: int = 10, + optimize: bool = True, ) -> tuple[list[PulseSchedule], list[dict]]: """Transpiles a list of ``qibo.models.Circuit`` to a list of pulse schedules. @@ -90,15 +82,18 @@ def transpile_circuits( # Create transpiler: transpiler = CircuitTranspiler(platform) - Now we can transpile like: + Now we can transpile like, in the following examples: .. code-block:: python - # Default Transpile: - pulse_schedule, final_layouts = transpiler.transpile_circuit([c]) # Defaults to ReverseTraversal, Sabre + # Default Transpilation (with ReverseTraversal, Sabre, platform's connectivity and optimize = True): + routed_circuit, final_layouts = transpiler.transpile_circuits([c]) + + # Or another case, not doing optimization for some reason, and with Non-Default placer and router: + routed_circuit, final_layout = transpiler.transpile_circuits([c], placer=Trivial, router=Sabre, optimize=False) - # Non-Default Trivial placer, and Default Router, but with its kwargs specified: - pulse_sched, final_layouts = transpiler.transpile_circuit([c], placer=Trivial, router=(Sabre, {"lookahead": 2})) + # Or also specifying the `router` with kwargs: + routed_circuit, final_layouts = transpiler.transpile_circuits([c], router=(Sabre, {"lookahead": 2})) Args: circuits (list[Circuit]): list of qibo circuits. @@ -107,17 +102,33 @@ def transpile_circuits( router (Router | type[Router] | tuple[type[Router], dict], optional): `Router` instance, or subclass `type[Router]` to use, with optionally, its kwargs dict (other than connectivity), both in a tuple. Defaults to `Sabre`. routing_iterations (int, optional): Number of times to repeat the routing pipeline, to get the best stochastic result. Defaults to 10. + optimize (bool, optional): whether to optimize the circuit and/or transpilation. Defaults to True. Returns: tuple[list[PulseSchedule],list[dict[str, int]]]: list of pulse schedules and list of the final layouts of the qubits, in each circuit {"qI": J}. """ + + # Routing stage; routed_circuits, final_layouts = zip( *(self.route_circuit(circuit, placer, router, iterations=routing_iterations) for circuit in circuits) ) logger.info(f"Circuits final layouts: {final_layouts}") + # Optimze qibo gates, cancellating redundant gates, stage: + if optimize: + routed_circuits = tuple(self.optimize_circuit(circuit) for circuit in routed_circuits) + + # Unroll to Natives stage: native_circuits = (self.circuit_to_native(circuit) for circuit in routed_circuits) - return self.circuit_to_pulses(list(native_circuits)), list(final_layouts) + + # Optimize native gates, optimize transpilation stage: + if optimize: + native_circuits = (self.optimize_transpilation(circuit) for circuit in native_circuits) + + # Pulse schedule stage: + pulse_schedules = self.circuit_to_pulses(list(native_circuits)) + + return pulse_schedules, list(final_layouts) def route_circuit( self, @@ -192,28 +203,34 @@ def route_circuit( return circuit_router.route(circuit, iterations) - def circuit_to_native(self, circuit: Circuit, optimize: bool = True) -> Circuit: + def optimize_circuit(self, circuit: Circuit) -> Circuit: + """Main function to optimize circuits with. Currently works by cancelling adjacent hermitian gates. + + The total optimization can/might be expanded in the future. + + Args: + circuit (Circuit): circuit to optimize. + + Returns: + Circuit: optimized circuit. + """ + return CircuitOptimizer.run_gate_cancellations(circuit) + + def circuit_to_native(self, circuit: Circuit) -> Circuit: """Converts circuit with qibo gates to circuit with native gates Args: - circuit (Circuit): circuit with qibo gates - optimize (bool): optimize the transpiled circuit using otpimize_transpilation + circuit (Circuit): circuit with qibo gate. Returns: new_circuit (Circuit): circuit with transpiled gates """ - # init new circuit new_circuit = Circuit(circuit.nqubits) - # add transpiled gates to new circuit, optimize if needed - if optimize: - gates_to_optimize = translate_gates(circuit.queue) - new_circuit.add(self.optimize_transpilation(circuit.nqubits, ngates=gates_to_optimize)) - else: - new_circuit.add(translate_gates(circuit.queue)) + new_circuit.add(translate_gates(circuit.queue)) return new_circuit - def optimize_transpilation(self, nqubits: int, ngates: list[gates.Gate]) -> list[gates.Gate]: + def optimize_transpilation(self, circuit: Circuit) -> Circuit: """Optimizes transpiled circuit by applying virtual Z gates. This is done by moving all RZ to the left of all operators as a single RZ. The corresponding cumulative rotation @@ -235,56 +252,23 @@ def optimize_transpilation(self, nqubits: int, ngates: list[gates.Gate]) -> list For more information on virtual Z gates, see https://arxiv.org/abs/1612.00858 Args: - nqubits (int) : number of qubits in the circuit - ngates (list[gates.Gate]) : list of gates in the circuit + circuit (Circuit): circuit with native gates, to optimize. Returns: - list[gates.Gate] : list of re-ordered gates + Circuit: Circuit with optimized transpiled gates. """ - supported_gates = ["rz", "drag", "cz", "wait", "measure"] - new_gates = [] - shift = dict.fromkeys(range(nqubits), 0) - for gate in ngates: - if gate.name not in supported_gates: - raise NotImplementedError(f"{gate.name} not part of native supported gates {supported_gates}") - if isinstance(gate, gates.RZ): - shift[gate.qubits[0]] += gate.parameters[0] - # add CZ phase correction - elif isinstance(gate, gates.CZ): - gate_settings = self.digital_compilation_settings.get_gate( - name=gate.__class__.__name__, qubits=gate.qubits - ) - control_qubit, target_qubit = gate.qubits - corrections = next( - ( - event.pulse.options - for event in gate_settings - if ( - event.pulse.options is not None - and f"q{control_qubit}_phase_correction" in event.pulse.options - ) - ), - None, - ) - if corrections is not None: - shift[control_qubit] += corrections[f"q{control_qubit}_phase_correction"] - shift[target_qubit] += corrections[f"q{target_qubit}_phase_correction"] - new_gates.append(gate) - else: - # if gate is drag pulse, shift parameters by accumulated Zs - if isinstance(gate, Drag): - # create new drag pulse rather than modify parameters of the old one - gate = Drag(gate.qubits[0], gate.parameters[0], gate.parameters[1] - shift[gate.qubits[0]]) - - # append gate to optimized list - new_gates.append(gate) - - return new_gates + optimizer = CircuitOptimizer(self.digital_compilation_settings) + + output_circuit = Circuit(circuit.nqubits) + output_circuit.add(optimizer.optimize_transpilation(circuit)) + return output_circuit def circuit_to_pulses(self, circuits: list[Circuit]) -> list[PulseSchedule]: - """Translates a list of circuits into a list of pulse sequences (each circuit to an independent pulse sequence) + """Translates a list of circuits into a list of pulse sequences (each circuit to an independent pulse sequence). + For each circuit gate we look up for its corresponding gates settings in the runcard (the name of the class of the circuit gate and the name of the gate in the runcard should match) and load its schedule of GateEvents. + Each gate event corresponds to a concrete pulse applied at a certain time w.r.t the gate's start time and through a specific bus (see gates settings docstrings for more details). @@ -293,6 +277,7 @@ def circuit_to_pulses(self, circuits: list[Circuit]) -> list[PulseSchedule]: M(0)M(1)M(2) since the later will not be necessarily applied at the same time for all the qubits involved. Times for each qubit are kept track of with the dictionary `time`. + The times at which each pulse is applied are padded if they are not multiples of the minimum clock time. This means that if min clock time is 4 and a pulse applied to qubit k lasts 17ns, the next pulse at qubit k will be at t=20ns @@ -302,215 +287,5 @@ def circuit_to_pulses(self, circuits: list[Circuit]) -> list[PulseSchedule]: Returns: list[PulseSequences]: List of :class:`PulseSequences` classes. """ - - pulse_schedule_list: list[PulseSchedule] = [] - for circuit in circuits: - pulse_schedule = PulseSchedule() - time: dict[int, int] = {} # init/restart time - for gate in circuit.queue: - # handle wait gates - if isinstance(gate, Wait): - self._update_time(time=time, qubit=gate.qubits[0], gate_time=gate.parameters[0]) - continue - - # Measurement gates need to be handled on their own because qibo allows to define - # an M gate as eg. gates.M(*range(5)) - if isinstance(gate, gates.M): - gate_schedule = [] - gate_qubits = gate.qubits - for qubit in gate_qubits: - gate_schedule += self._gate_schedule_from_settings(gates.M(qubit)) - - # handle control gates - else: - # extract gate schedule - gate_schedule = self._gate_schedule_from_settings(gate) - gate_qubits = self._get_gate_qubits(gate, gate_schedule) - - # process gate_schedule to pulses for both M and control gates - # get total duration for the gate - gate_time = self._get_total_schedule_duration(gate_schedule) - # update time, start time is that of the qubit most ahead in time - start_time = 0 - for qubit in gate_qubits: - start_time = max(self._update_time(time=time, qubit=qubit, gate_time=gate_time), start_time) - # sync gate end time - self._sync_qubit_times(gate_qubits, time=time) - # apply gate schedule - for gate_event in gate_schedule: - # add control gate schedule - pulse_event = self._gate_element_to_pulse_event(time=start_time, gate=gate, gate_event=gate_event) - # pop first qubit from gate if it is measurement - # this is so that the target qubit for multiM gates is every qubit in the M gate - if isinstance(gate, gates.M): - gate = gates.M(*gate.qubits[1:]) - # add event - delay = self.digital_compilation_settings.buses[gate_event.bus].delay - pulse_schedule.add_event(pulse_event=pulse_event, bus_alias=gate_event.bus, delay=delay) # type: ignore - - for bus_alias in self.digital_compilation_settings.buses: - # If we find a flux port, create empty schedule for that port. - # This is needed because for Qblox instrument working in flux buses as DC sources, if we don't - # add an empty schedule its offsets won't be activated and the results will be misleading. - if self.digital_compilation_settings.buses[bus_alias].line == Line.FLUX: - pulse_schedule.create_schedule(bus_alias=bus_alias) - - pulse_schedule_list.append(pulse_schedule) - - return pulse_schedule_list - - def _gate_schedule_from_settings(self, gate: gates.Gate) -> list[GateEventSettings]: - """Gets the gate schedule. The gate schedule is the list of pulses to apply - to a given bus for a given gate - - Args: - gate (Gate): Qibo gate - - Returns: - list[GateEventSettings]: schedule list with each of the pulses settings - """ - - gate_schedule = self.digital_compilation_settings.get_gate(name=gate.__class__.__name__, qubits=gate.qubits) - - if not isinstance(gate, Drag): - return gate_schedule - - # drag gates are currently the only parametric gates we are handling and they are handled here - if len(gate_schedule) > 1: - raise ValueError( - f"Schedule for the drag gate is expected to have only 1 pulse but instead found {len(gate_schedule)} pulses" - ) - drag_schedule = GateEventSettings( - **asdict(gate_schedule[0]) - ) # make new object so that gate_schedule is not overwritten - theta = self._normalize_angle(angle=gate.parameters[0]) - amplitude = drag_schedule.pulse.amplitude * theta / np.pi - phase = self._normalize_angle(angle=gate.parameters[1]) - if amplitude < 0: - amplitude = -amplitude - phase = self._normalize_angle(angle=gate.parameters[1] + np.pi) - drag_schedule.pulse.amplitude = amplitude - drag_schedule.pulse.phase = phase - return [drag_schedule] - - def _normalize_angle(self, angle: float): - """Normalizes angle in range [-pi, pi]. - - Args: - angle (float): Normalized angle. - """ - angle %= 2 * np.pi - if angle > np.pi: - angle -= 2 * np.pi - return angle - - def _get_total_schedule_duration(self, schedule: list[GateEventSettings]) -> int: - """Returns total time for a gate schedule. This is done by taking the max of (init + duration) - for all the elements in the schedule - - Args: - schedule (list[CircuitPulseSettings]): Schedule of pulses to apply - - Returns: - int: Total gate time - """ - time = 0 - for schedule_element in schedule: - time = max(time, schedule_element.pulse.duration + schedule_element.wait_time) - return time - - def _get_gate_qubits(self, gate: gates.Gate, schedule: list[GateEventSettings] | None = None) -> tuple[int, ...]: - """Gets qubits involved in gate. This includes gate.qubits but also qubits which are targets of - buses in the gate schedule - - Args: - schedule (list[CircuitPulseSettings]): Gate schedule - - Returns: - list[int]: list of qubits - """ - - schedule_qubits = ( - [ - qubit - for schedule_element in schedule - for qubit in self.digital_compilation_settings.buses[schedule_element.bus].qubits - if schedule_element.bus in self.digital_compilation_settings.buses - ] - if schedule is not None - else [] - ) - - gate_qubits = list(gate.qubits) - - return tuple(set(schedule_qubits + gate_qubits)) # convert to set and back to list to remove repeated items - - def _gate_element_to_pulse_event(self, time: int, gate: gates.Gate, gate_event: GateEventSettings) -> PulseEvent: - """Translates a gate element into a pulse. - - Args: - time (dict[int, int]): dictionary containing qubit indices as keys and current time (ns) as values - gate (gate): circuit gate. This is used only to know the qubit target of measurement gates - gate_event (GateEventSettings): gate event, a single element of a gate schedule containing information - about the pulse to be applied - bus (bus): bus through which the pulse is sent - - Returns: - PulseEvent: pulse event corresponding to the input gate event - """ - - # copy to avoid modifying runcard settings - pulse = gate_event.pulse - pulse_shape_copy = pulse.shape.copy() - pulse_shape = Factory.get(pulse_shape_copy.pop(RUNCARD.NAME))(**pulse_shape_copy) - - # handle measurement gates and target qubits for control gates which might have multi-qubit schedules - bus = self.digital_compilation_settings.buses[gate_event.bus] - qubit = ( - gate.qubits[0] - if isinstance(gate, gates.M) - else next((qubit for qubit in bus.qubits), None) - if bus is not None - else None - ) - - return PulseEvent( - pulse=Pulse( - amplitude=pulse.amplitude, - phase=pulse.phase, - duration=pulse.duration, - frequency=0, - pulse_shape=pulse_shape, - ), - start_time=time + gate_event.wait_time + self.digital_compilation_settings.delay_before_readout, - pulse_distortions=bus.distortions, - qubit=qubit, - ) - - def _update_time(self, time: dict[int, int], qubit: int, gate_time: int): - """Creates new timeline if not already created and update time. - - Args: - time (Dict[int, int]): Dictionary with the time of each qubit. - qubit_idx (int): qubit index - gate_time (int): total duration of the gate - """ - if qubit not in time: - time[qubit] = 0 - old_time = time[qubit] - residue = (gate_time) % self.digital_compilation_settings.minimum_clock_time - if residue != 0: - gate_time += self.digital_compilation_settings.minimum_clock_time - residue - time[qubit] += gate_time - return old_time - - def _sync_qubit_times(self, qubits: list[int], time: dict[int, int]): - """Syncs the time of the given qubit list - - Args: - qubits (list[int]): qubits to sync - time (dict[int, int]): time dictionary - """ - max_time = max((time[qubit] for qubit in qubits if qubit in time), default=0) - for qubit in qubits: - time[qubit] = max_time + circuit_to_pulses = CircuitToPulses(self.digital_compilation_settings) + return [circuit_to_pulses.run(circuit) for circuit in circuits] diff --git a/src/qililab/execute_circuit.py b/src/qililab/execute_circuit.py index f050826f7..cd28855b2 100644 --- a/src/qililab/execute_circuit.py +++ b/src/qililab/execute_circuit.py @@ -31,6 +31,7 @@ def execute( placer: Placer | type[Placer] | tuple[type[Placer], dict] | None = None, router: Router | type[Router] | tuple[type[Router], dict] | None = None, routing_iterations: int = 10, + optimize: bool = True, ) -> Result | list[Result]: """Executes a Qibo circuit (or a list of circuits) with qililab and returns the results. @@ -47,6 +48,7 @@ def execute( router (Router | type[Router] | tuple[type[Router], dict], optional): `Router` instance, or subclass `type[Router]` to use,` with optionally, its kwargs dict (other than connectivity), both in a tuple. Defaults to `Sabre`. routing_iterations (int, optional): Number of times to repeat the routing pipeline, to keep the best stochastic result. Defaults to 10. + optimize (bool, optional): whether to optimize the circuit and/or transpilation. Defaults to True. Returns: Result | list[Result]: :class:`Result` class (or list of :class:`Result` classes) containing the results of the @@ -96,6 +98,7 @@ def execute( placer=placer, router=router, routing_iterations=routing_iterations, + optimize=optimize, ) for circuit in tqdm(program, total=len(program)) ] diff --git a/src/qililab/platform/platform.py b/src/qililab/platform/platform.py index 9d7b5d0d4..92f513bd3 100644 --- a/src/qililab/platform/platform.py +++ b/src/qililab/platform/platform.py @@ -905,6 +905,7 @@ def execute( placer: Placer | type[Placer] | tuple[type[Placer], dict] | None = None, router: Router | type[Router] | tuple[type[Router], dict] | None = None, routing_iterations: int = 10, + optimize: bool = True, ) -> Result | QbloxResult: """Compiles and executes a circuit or a pulse schedule, using the platform instruments. @@ -924,6 +925,7 @@ def execute( router (Router | type[Router] | tuple[type[Router], dict], optional): `Router` instance, or subclass `type[Router]` to use, with optionally, its kwargs dict (other than connectivity), both in a tuple. Defaults to `Sabre`. routing_iterations (int, optional): Number of times to repeat the routing pipeline, to keep the best stochastic result. Defaults to 10. + optimize (bool, optional): whether to optimize the circuit and/or transpilation. Defaults to True. Returns: Result: Result obtained from the execution. This corresponds to a numpy array that depending on the @@ -936,7 +938,7 @@ def execute( """ # Compile pulse schedule programs, final_layout = self.compile( - program, num_avg, repetition_duration, num_bins, placer, router, routing_iterations + program, num_avg, repetition_duration, num_bins, placer, router, routing_iterations, optimize ) # Upload pulse schedule @@ -1032,6 +1034,7 @@ def compile( placer: Placer | type[Placer] | tuple[type[Placer], dict] | None = None, router: Router | type[Router] | tuple[type[Router], dict] | None = None, routing_iterations: int = 10, + optimize: bool = True, ) -> tuple[dict[str, list[QpySequence]], dict[str, int] | None]: """Compiles the circuit / pulse schedule into a set of assembly programs, to be uploaded into the awg buses. @@ -1050,6 +1053,7 @@ def compile( router (Router | type[Router] | tuple[type[Router], dict], optional): `Router` instance, or subclass `type[Router]` to use, with optionally, its kwargs dict (other than connectivity), both in a tuple. Defaults to `Sabre`. routing_iterations (int, optional): Number of times to repeat the routing pipeline, to keep the best stochastic result. Defaults to 10. + optimize (bool, optional): whether to optimize the circuit and/or transpilation. Defaults to True. Returns: tuple[dict, dict[str, int]]: Tuple containing the dictionary of compiled assembly programs (The key is the bus alias (``str``), and the value is the assembly compilation (``list``)) and the final layout of the qubits in the circuit {"qX":Y}. @@ -1065,7 +1069,7 @@ def compile( transpiler = CircuitTranspiler(digital_compilation_settings=self.digital_compilation_settings) transpiled_circuits, final_layouts = transpiler.transpile_circuits( - [program], placer, router, routing_iterations + [program], placer, router, routing_iterations, optimize ) pulse_schedule, final_layout = transpiled_circuits[0], final_layouts[0] diff --git a/tests/digital/test_circuit_optimizer.py b/tests/digital/test_circuit_optimizer.py new file mode 100644 index 000000000..c01c3a53a --- /dev/null +++ b/tests/digital/test_circuit_optimizer.py @@ -0,0 +1,129 @@ +from unittest.mock import patch +import numpy as np +from qibo import Circuit, gates + +from qililab.digital.circuit_optimizer import CircuitOptimizer +from qililab.digital.native_gates import Drag +from qililab.settings.digital.digital_compilation_settings import DigitalCompilationSettings + + +class TestCircuitOptimizerIntegration: + """Tests for the circuit optimizer class, with integration tests.""" + + def test_run_gate_cancelation(self): + """Test run gate cancelation.""" + # Create a circuit with two gates that cancel each other. + circuit = Circuit(5) + + # pairs that cancels: + circuit.add(gates.H(0)) + circuit.add(gates.H(0)) + + # From here only the X(4) will cancel with the X(4) at the end. + circuit.add(gates.CNOT(2,3)) # 1 + circuit.add(gates.X(4)) + circuit.add(gates.H(3)) # 2 + + # The 0-1 and 1-4 CNOTs shold cancel each other. + circuit.add(gates.CNOT(1,4)) + circuit.add(gates.CNOT(0,1)) + circuit.add(Drag(3, theta=2*np.pi, phase=np.pi)) # 3 + circuit.add(gates.CNOT(0,1)) + circuit.add(gates.CNOT(1,4)) + + circuit.add(gates.H(3)) # 4 + circuit.add(gates.X(4)) + circuit.add(gates.CNOT(2,3)) # 5 + + + # Optimize the circuit. + optimizer = CircuitOptimizer(None) + optimized_circuit = optimizer.run_gate_cancellations(circuit) + + # Check that the circuit is optimized + assert len(optimized_circuit.queue) == 5 + # Check name attribute: + assert [gate.name for gate in optimized_circuit.queue] == ["cx", "h", "drag", "h", "cx"] + # CHeck the type of the gates: + assert [type(gate).__name__ for gate in optimized_circuit.queue] == ["CNOT", "H", "Drag", "H", "CNOT"] + # Assert the initial arguments: + assert [gate.init_args for gate in optimized_circuit.queue] == [[2,3], [3], [3], [3], [2,3]] + assert [gate.init_kwargs for gate in optimized_circuit.queue] == [{}, {}, {"theta": 2*np.pi, "phase": np.pi, "trainable": True}, {}, {}] + + +class TestCircuitOptimizerUnit: + """Tests for the circuit optimizer class, with Unit test.""" + + @patch("qililab.digital.circuit_optimizer.CircuitOptimizer.cancel_pairs_of_hermitian_gates", return_value=[gates.CZ(0, 1), Drag(0, theta=np.pi, phase=np.pi / 2)]) + def test_run_gate_cancellations(self, mock_cancelation): + """Test optimize transpilation.""" + circuit = Circuit(2) + circuit.add(gates.RZ(0, theta=np.pi / 2)) + circuit.add(gates.CZ(0, 1)) + circuit.add(Drag(0, theta=np.pi, phase=np.pi / 2)) + + optimizer = CircuitOptimizer(None) + optimized_gates = optimizer.run_gate_cancellations(circuit) + + mock_cancelation.assert_called_once_with(circuit) + assert len(optimized_gates) == 2 + assert [gate.name for gate in optimized_gates] == ["cz", "drag"] + assert [type(gate).__name__ for gate in optimized_gates] == ["CZ", "Drag"] + + + @patch("qililab.digital.circuit_optimizer.CircuitOptimizer._create_circuit", return_value=Circuit(5)) + @patch("qililab.digital.circuit_optimizer.CircuitOptimizer._sweep_circuit_cancelling_pairs_of_hermitian_gates", return_value=[("CZ", [0, 1], {}), ("Drag", [0], {"theta": np.pi, "phase": np.pi / 2})]) + @patch("qililab.digital.circuit_optimizer.CircuitOptimizer._get_circuit_gates", return_value=[("CZ", [0, 1], {}), ("Drag", [0], {"theta": np.pi, "phase": np.pi / 2})]) + def test_cancel_pairs_of_hermitian_gates(self, mock_get_circuit_gates, mock_sweep_circuit, mock_create_circuit): + """Test run gate cancellations with mocks.""" + circuit = Circuit(2) + circuit.add(gates.RZ(0, theta=np.pi / 2)) + circuit.add(gates.CZ(0, 1)) + circuit.add(Drag(0, theta=np.pi, phase=np.pi / 2)) + + optimizer = CircuitOptimizer(None) + _ = optimizer.cancel_pairs_of_hermitian_gates(circuit) + + mock_get_circuit_gates.assert_called_once_with(circuit) + mock_sweep_circuit.assert_called_once_with([("CZ", [0, 1], {}), ("Drag", [0], {"theta": np.pi, "phase": np.pi / 2})]) + mock_create_circuit.assert_called_once_with([("CZ", [0, 1], {}), ("Drag", [0], {"theta": np.pi, "phase": np.pi / 2})], circuit.nqubits) + + + def test_get_circuit_gates(self): + """Test get circuit gates.""" + circuit = Circuit(2) + circuit.add(gates.X(0)) + circuit.add(gates.H(1)) + + gates_list = CircuitOptimizer._get_circuit_gates(circuit) + + assert gates_list == [("X", [0], {}), ("H", [1], {})] + + def test_create_gate(self): + """Test create gate.""" + gate = CircuitOptimizer._create_gate("X", [0], {}) + assert isinstance(gate, gates.X) + assert gate.init_args == [0] + + def test_create_circuit(self): + """Test create circuit.""" + gates_list = [("X", [0], {}), ("H", [1], {})] + circuit = CircuitOptimizer._create_circuit(gates_list, 2) + + assert len(circuit.queue) == 2 + assert [gate.name for gate in circuit.queue] == ["x", "h"] + + def test_sweep_circuit_cancelling_pairs_of_hermitian_gates(self): + """Test sweep circuit cancelling pairs of hermitian gates.""" + circ_list = [("X", [0], {}), ("X", [0], {}), ("H", [1], {}), ("H", [1], {})] + output_circ_list = CircuitOptimizer._sweep_circuit_cancelling_pairs_of_hermitian_gates(circ_list) + + assert output_circ_list == [] + + def test_extract_qubits(self): + """Test extract qubits.""" + qubits = CircuitOptimizer._extract_qubits([0, 1]) + assert qubits == [0, 1] + + qubits = CircuitOptimizer._extract_qubits(0) + assert qubits == [0] diff --git a/tests/digital/test_circuit_transpiler.py b/tests/digital/test_circuit_transpiler.py index b079877a3..d9fe68ac6 100644 --- a/tests/digital/test_circuit_transpiler.py +++ b/tests/digital/test_circuit_transpiler.py @@ -501,7 +501,7 @@ def test_circuit_to_native(self): exhaustive=True, ) - c2 = transpiler.circuit_to_native(c1, optimize=False) + c2 = transpiler.circuit_to_native(c1) # check that both c1, c2 are qibo.Circuit assert isinstance(c1, Circuit) @@ -559,8 +559,13 @@ def test_optimize_transpilation(self, digital_settings): Drag(1, 2, -2), ] + # create circuit to test function with + circuit = Circuit(3) + circuit.add(test_gates) + # check that lists are the same - optimized_gates = transpiler.optimize_transpilation(3, test_gates) + circuit = transpiler.optimize_transpilation(circuit) + optimized_gates = list(circuit.queue) for gate_r, gate_opt in zip(result_gates, optimized_gates): assert gate_r.name == gate_opt.name assert gate_r.parameters == gate_opt.parameters @@ -640,10 +645,14 @@ def test_drag_schedule_error(self, digital_settings): with pytest.raises(ValueError, match=error_string): transpiler.circuit_to_pulses(circuits=[circuit]) + + @pytest.mark.parametrize("optimize", [True, False]) + @patch("qililab.digital.circuit_transpiler.CircuitTranspiler.optimize_circuit") + @patch("qililab.digital.circuit_transpiler.CircuitTranspiler.optimize_transpilation") @patch("qililab.digital.circuit_transpiler.CircuitTranspiler.route_circuit") @patch("qililab.digital.circuit_transpiler.CircuitTranspiler.circuit_to_native") @patch("qililab.digital.circuit_transpiler.CircuitTranspiler.circuit_to_pulses") - def test_transpile_circuits(self, mock_to_pulses, mock_to_native, mock_route, digital_settings): + def test_transpile_circuits(self, mock_to_pulses, mock_to_native, mock_route, mock_opt_trans, mock_opt_circuit, optimize, digital_settings): """Test transpile_circuits method""" transpiler = CircuitTranspiler(digital_compilation_settings=digital_settings) placer = MagicMock() @@ -653,7 +662,7 @@ def test_transpile_circuits(self, mock_to_pulses, mock_to_native, mock_route, di # Mock circuit for return values mock_circuit = Circuit(5) - mock_circuit.add(X(0)) + mock_circuit.add(Drag(0, 2*np.pi, np.pi)) # Mock layout for return values mock_layout = {"q0": 0, "q1": 2, "q2": 1, "q3": 3, "q4": 4} @@ -663,21 +672,33 @@ def test_transpile_circuits(self, mock_to_pulses, mock_to_native, mock_route, di # Mock the return values mock_route.return_value = mock_circuit, mock_layout + mock_opt_circuit.return_value = mock_circuit mock_to_native.return_value = mock_circuit + mock_opt_trans.return_value = mock_circuit mock_to_pulses.return_value = [mock_schedule] circuit = random_circuit(5, 10, np.random.default_rng()) - list_schedules, list_layouts = transpiler.transpile_circuits([circuit]*list_size, placer, router, routing_iterations) + list_schedules, list_layouts = transpiler.transpile_circuits([circuit]*list_size, placer, router, routing_iterations, optimize=optimize) # Asserts: + # The next two functions get called for individual circuits: mock_route.assert_called_with(circuit, placer, router, iterations=routing_iterations) - assert mock_route.call_count == list_size mock_to_native.assert_called_with(mock_circuit) - assert mock_to_native.call_count == list_size + assert mock_route.call_count == mock_to_native.call_count == list_size + # The last one instead gets called for the whole list: mock_to_pulses.assert_called_once_with([mock_circuit]*list_size) assert list_schedules, list_layouts == ([mock_schedule]*list_size, [mock_layout]*list_size) + # Asserts in optimizeL, which is called for individual circuits: + if optimize: + mock_opt_circuit.assert_called_with(mock_circuit) + mock_opt_trans.assert_called_with(mock_circuit) + assert mock_opt_circuit.call_count == mock_opt_trans.call_count == list_size + else: + mock_opt_circuit.assert_not_called() + mock_opt_trans.assert_not_called() + @patch("qililab.digital.circuit_router.CircuitRouter.route") def test_route_circuit(self, mock_route, digital_settings): """Test route_circuit method"""