diff --git a/qualang_tools/control_panel/video_mode/dash_tools.py b/qualang_tools/control_panel/video_mode/dash_tools.py index 52c50244..b6190e50 100644 --- a/qualang_tools/control_panel/video_mode/dash_tools.py +++ b/qualang_tools/control_panel/video_mode/dash_tools.py @@ -1,3 +1,4 @@ +from typing import Literal, Optional from dash import html from dash_extensions.enrich import dcc import plotly.graph_objects as go @@ -72,7 +73,7 @@ def create_input_field(id, label, value, debounce=True, input_style=None, div_st return html.Div(elements, style=div_style) -def create_axis_layout(axis, span, points, min_span, max_span): +def create_axis_layout(axis: Literal["x", "y"], span: float, points: int, min_span: float, max_span: Optional[float] = None): return html.Div( [ html.Label(axis.upper(), style={"text-align": "left"}), diff --git a/qualang_tools/control_panel/video_mode/data_acquirers.py b/qualang_tools/control_panel/video_mode/data_acquirers.py index 8c9629b6..aee8673c 100644 --- a/qualang_tools/control_panel/video_mode/data_acquirers.py +++ b/qualang_tools/control_panel/video_mode/data_acquirers.py @@ -8,6 +8,7 @@ from qm.jobs.running_qm_job import RunningQmJob from qm.qua import * from qualang_tools.control_panel.video_mode.scan_modes import ScanMode +from qualang_tools.control_panel.video_mode.sweep_axis import SweepAxis __all__ = ["BaseDataAcquirer", "RandomDataAcquirer", "OPXDataAcquirer"] @@ -17,79 +18,41 @@ class BaseDataAcquirer(ABC): def __init__( self, *, - x_offset_parameter, - y_offset_parameter, - x_span, - y_span, - x_attenuation: float = 0.0, - y_attenuation: float = 0.0, - x_points=101, - y_points=101, + x_axis: SweepAxis, + y_axis: SweepAxis, num_averages=1, **kwargs, ): assert not kwargs - self.x_offset_parameter = x_offset_parameter - self.y_offset_parameter = y_offset_parameter - self.x_span = x_span - self.y_span = y_span + self.x_axis = x_axis + self.y_axis = y_axis self.num_averages = num_averages - self.x_points = x_points - self.y_points = y_points - self.x_attenuation = x_attenuation - self.y_attenuation = y_attenuation self.data_history = [] logging.debug("Initializing DataGenerator") self.num_acquisitions = 0 self.data_array = xr.DataArray( - np.zeros((self.x_points, self.y_points)), - coords=[("x", self.x_vals_offset), ("y", self.y_vals_offset)], + np.zeros((self.x_axis.points, self.y_axis.points)), + coords=[ + (self.x_axis.name, self.x_axis.sweep_values_with_offset), + (self.y_axis.name, self.y_axis.sweep_values_with_offset), + ], attrs={"units": "V", "long_name": "Signal"}, ) - for axis, param in {"x": self.x_offset_parameter, "y": self.y_offset_parameter}.items(): - if getattr(param, "label", None): - param_name = param.label - elif getattr(param, "name", None): - param_name = param.name - else: - param_name = "X_axis" - self.data_array.coords[axis].attrs.update({"units": "V", "long_name": param_name}) + for axis in [self.x_axis, self.y_axis]: + label = axis.label or axis.name + self.data_array.coords[axis.name].attrs.update({"units": "V", "long_name": label}) logging.debug("DataGenerator initialized with initial data") - @property - def x_vals(self): - return np.linspace(-self.x_span / 2, self.x_span / 2, self.x_points) - - @property - def y_vals(self): - return np.linspace(-self.y_span / 2, self.y_span / 2, self.y_points) - - @property - def x_vals_unattenuated(self): - x_attenuation_factor = 10 ** (self.x_attenuation / 20) # Convert dB to voltage scale - return self.x_vals * x_attenuation_factor - - @property - def y_vals_unattenuated(self): - y_attenuation_factor = 10 ** (self.y_attenuation / 20) # Convert dB to voltage scale - return self.y_vals * y_attenuation_factor - - @property - def x_vals_offset(self): - return self.x_vals + self.x_offset_parameter.get_latest() - - @property - def y_vals_offset(self): - return self.y_vals + self.y_offset_parameter.get_latest() - def update_voltage_ranges(self): - self.data_array = self.data_array.assign_coords(x=self.x_vals, y=self.y_vals) + self.data_array = self.data_array.assign_coords( + {axis.name: axis.sweep_values for axis in [self.x_axis, self.y_axis]} + ) - x_vals = self.x_vals - y_vals = self.y_vals + x_vals = self.x_axis.sweep_values + y_vals = self.y_axis.sweep_values logging.debug( f"Updated voltage ranges: " f"x_vals=[{x_vals[0]}, {x_vals[1]}, ..., {x_vals[-1]}], " @@ -97,11 +60,8 @@ def update_voltage_ranges(self): ) def update_attrs(self, attrs): - for attr_name, attr in attrs.items(): - if attr_name in ["x_offset", "y_offset"]: - attr["obj"].set(attr["new"]) - else: - setattr(attr["obj"], attr_name, attr["new"]) + for attr in attrs: + setattr(attr["obj"], attr["key"], attr["new"]) pass @abstractmethod @@ -124,12 +84,16 @@ def update_data(self): self.data_array = xr.DataArray( averaged_data, - coords=[("x", self.x_vals_offset), ("y", self.y_vals_offset)], + coords=[("x", self.x_axis.sweep_values_with_offset), ("y", self.y_axis.sweep_values_with_offset)], attrs=self.data_array.attrs, # Preserve original attributes like units ) - self.data_array.coords["x"].attrs.update({"units": "V", "long_name": self.x_offset_parameter.name}) - self.data_array.coords["y"].attrs.update({"units": "V", "long_name": self.y_offset_parameter.name}) + self.data_array.coords[self.x_axis.name].attrs.update( + {"units": "V", "long_name": self.x_axis.label or self.x_axis.name} + ) + self.data_array.coords[self.y_axis.name].attrs.update( + {"units": "V", "long_name": self.y_axis.label or self.y_axis.name} + ) mean_abs_data = np.mean(np.abs(averaged_data)) logging.debug(f"Data acquired with shape: {self.data_array.shape}, mean(abs(data)) = {mean_abs_data}") return self.data_array @@ -139,8 +103,7 @@ class RandomDataAcquirer(BaseDataAcquirer): def acquire_data(self): sleep(1) - results = np.random.rand(len(self.y_vals), len(self.x_vals)) - + results = np.random.rand(self.x_axis.points, self.y_axis.points) return results @@ -154,14 +117,8 @@ def __init__( qm: QuantumMachine, qua_inner_loop_action: Callable, scan_mode: ScanMode, - x_offset_parameter, - y_offset_parameter, - x_span, - y_span, - x_attenuation: float = 0.0, - y_attenuation: float = 0.0, - x_points=101, - y_points=101, + x_axis: SweepAxis, + y_axis: SweepAxis, num_averages=1, result_type: Literal["I", "Q", "abs", "phase"] = "I", initial_delay: Optional[float] = None, @@ -177,15 +134,9 @@ def __init__( self.results: Dict[str, Any] = {} super().__init__( - x_offset_parameter=x_offset_parameter, - y_offset_parameter=y_offset_parameter, - x_span=x_span, - y_span=y_span, - x_attenuation=x_attenuation, - y_attenuation=y_attenuation, + x_axis=x_axis, + y_axis=y_axis, num_averages=num_averages, - x_points=x_points, - y_points=y_points, **kwargs, ) @@ -193,15 +144,15 @@ def update_attrs(self, attrs): super().update_attrs(attrs) logging.info(f"Updated attrs: {attrs}") - requires_regeneration = ["x_span", "y_span", "x_points", "y_points"] - if any(attr in requires_regeneration for attr in attrs): + requires_regeneration = ["span", "points"] + if any(attr["key"] in requires_regeneration for attr in attrs): logging.info("Regenerating QUA program due to new parameters") self.program = None self.run_program() def generate_program(self) -> Program: - x_vals = self.x_vals_unattenuated - y_vals = self.y_vals_unattenuated + x_vals = self.x_axis.sweep_values_unattenuated + y_vals = self.y_axis.sweep_values_unattenuated with program() as prog: IQ_streams = {"I": declare_stream(), "Q": declare_stream()} @@ -218,8 +169,8 @@ def generate_program(self) -> Program: with stream_processing(): streams = { - "I": IQ_streams["I"].buffer(self.x_points * self.y_points), - "Q": IQ_streams["Q"].buffer(self.x_points * self.y_points), + "I": IQ_streams["I"].buffer(self.x_axis.points * self.y_axis.points), + "Q": IQ_streams["Q"].buffer(self.x_axis.points * self.y_axis.points), } combined_stream = None for var in self.stream_vars: @@ -240,8 +191,8 @@ def process_results(self, results: Dict[str, Any]) -> np.ndarray: else: raise ValueError(f"Invalid result type: {self.result_type}") - x_idxs, y_idxs = self.scan_mode.get_idxs(x_points=self.x_points, y_points=self.y_points) - results_2D = np.zeros((self.y_points, self.x_points), dtype=float) + x_idxs, y_idxs = self.scan_mode.get_idxs(x_points=self.x_axis.points, y_points=self.y_axis.points) + results_2D = np.zeros((self.y_axis.points, self.x_axis.points), dtype=float) results_2D[y_idxs, x_idxs] = result return results_2D diff --git a/qualang_tools/control_panel/video_mode/scan_modes.py b/qualang_tools/control_panel/video_mode/scan_modes.py index 22385cb9..8e267ee5 100644 --- a/qualang_tools/control_panel/video_mode/scan_modes.py +++ b/qualang_tools/control_panel/video_mode/scan_modes.py @@ -1,6 +1,6 @@ from abc import ABC, abstractmethod from contextlib import contextmanager -from typing import Dict, Iterator, Sequence, Callable, Tuple, Generator +from typing import Any, Dict, Iterator, Sequence, Callable, Tuple, Generator from matplotlib import pyplot as plt from matplotlib.ticker import MultipleLocator @@ -14,7 +14,7 @@ class ScanMode(ABC): def get_idxs(self, x_points: int, y_points: int) -> Tuple[np.ndarray, np.ndarray]: pass - def plot_scan(self, x_points: int, y_points: int): + def plot_scan(self, x_points: int, y_points: int) -> Tuple[plt.Figure, plt.Axes]: idxs_x, idxs_y = self.get_idxs(x_points, y_points) u = np.diff(idxs_x) @@ -33,9 +33,11 @@ def plot_scan(self, x_points: int, y_points: int): ax.yaxis.set_minor_locator(MultipleLocator(abs(np.max(v)))) plt.show() + return fig, ax + @abstractmethod def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]) -> Iterator[None]: - yield + pass class RasterScan(ScanMode): @@ -44,7 +46,7 @@ def get_idxs(self, x_points: int, y_points: int) -> Tuple[np.ndarray, np.ndarray y_idxs = np.repeat(np.arange(y_points), x_points) return x_idxs, y_idxs - def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]): + def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]) -> Iterator[None]: voltages = {"x": declare(fixed), "y": declare(fixed)} with for_(*from_array(voltages["y"], y_vals)): # type: ignore @@ -74,7 +76,7 @@ def get_idxs(self, x_points: int, y_points: int) -> Tuple[np.ndarray, np.ndarray y_idxs = np.repeat(y_idxs, x_points) return x_idxs, y_idxs - def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]): + def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]) -> Generator[Dict[str, Any], None, None]: voltages = {"x": declare(fixed), "y": declare(fixed)} # with for_(*from_array(voltages["y"], self.interleave_arr(y_vals))): # type: ignore diff --git a/qualang_tools/control_panel/video_mode/sweep_axis.py b/qualang_tools/control_panel/video_mode/sweep_axis.py new file mode 100644 index 00000000..4a8cd30c --- /dev/null +++ b/qualang_tools/control_panel/video_mode/sweep_axis.py @@ -0,0 +1,37 @@ +from dataclasses import dataclass +from typing import Literal, Optional + +import numpy as np + +from qualang_tools.control_panel.video_mode.voltage_parameters import VoltageParameter + + +__all__ = ["SweepAxis"] + + +@dataclass +class SweepAxis: + name: str + span: float + points: int + label: Optional[str] = None + offset_parameter: Optional[VoltageParameter] = None + attenuation: float = 0 + + @property + def sweep_values(self): + return np.linspace(-self.span / 2, self.span / 2, self.points) + + @property + def sweep_values_unattenuated(self): + return self.sweep_values * 10 ** (self.attenuation / 20) + + @property + def sweep_values_with_offset(self): + if self.offset_parameter is None: + return self.sweep_values_unattenuated + return self.sweep_values_unattenuated + self.offset_parameter.get_latest() + + @property + def amplitude_scale(self): + return 10 ** (-self.attenuation / 20) diff --git a/qualang_tools/control_panel/video_mode/test_video_mode.py b/qualang_tools/control_panel/video_mode/test_video_mode.py index d9bae899..6bc306bb 100644 --- a/qualang_tools/control_panel/video_mode/test_video_mode.py +++ b/qualang_tools/control_panel/video_mode/test_video_mode.py @@ -3,6 +3,7 @@ import numpy as np from matplotlib import pyplot as plt from qualang_tools.control_panel.video_mode.voltage_parameters import * +from qualang_tools.control_panel.video_mode.sweep_axis import * from qualang_tools.control_panel.video_mode.data_acquirers import * from qualang_tools.control_panel.video_mode.video_mode import * @@ -49,21 +50,14 @@ y_element=machine.channels["ch2"], readout_pulse=readout_pulse, ) -# scan_mode = RasterScan() + scan_mode = SpiralScan() data_acquirer = OPXDataAcquirer( qm=qm, qua_inner_loop_action=inner_loop_action, scan_mode=scan_mode, - x_offset_parameter=x_offset, - y_offset_parameter=y_offset, - x_span=0.02, - y_span=0.02, - x_attenuation=0, - y_attenuation=0, - num_averages=5, - x_points=11, - y_points=11, + x_axis=SweepAxis("x", span=0.03, points=11, offset_parameter=x_offset), + y_axis=SweepAxis("y", span=0.03, points=11, offset_parameter=y_offset), result_type="abs", ) # %% Run program @@ -78,32 +72,35 @@ live_plotter = VideoMode(data_acquirer=data_acquirer, update_interval=1) live_plotter.run(use_reloader=False) -# # %% -# scan_mode.plot_scan(11, 11) +# %% +scan_mode.plot_scan(11, 11) + +# %% Generate QUA script +from qm import generate_qua_script -# # %% Generate QUA script -# from qm import generate_qua_script +qua_script = generate_qua_script(data_acquirer.generate_program(), config) +print(qua_script) -# qua_script = generate_qua_script(data_acquirer.generate_program(), config) -# print(qua_script) +# %% Simulate results +from qm import SimulationConfig -# # %% Simulate results -# from qm import SimulationConfig +prog = data_acquirer.generate_program() +simulation_config = SimulationConfig(duration=100000) # In clock cycles = 4ns +job = qmm.simulate(config, prog, simulation_config) +con1 = job.get_simulated_samples().con1 -# prog = data_acquirer.generate_program() -# simulation_config = SimulationConfig(duration=100000) # In clock cycles = 4ns -# job = qmm.simulate(config, prog, simulation_config) -# con1 = job.get_simulated_samples().con1 +con1.plot(analog_ports=["1", "2"]) -# con1.plot(analog_ports=["1", "2"]) +plt.figure() +plt.plot(con1.analog["1"], con1.analog["2"]) -# plt.figure() -# plt.plot(con1.analog["1"], con1.analog["2"]) +plt.figure() +data_acquirer.scan_mode.plot_scan(data_acquirer.x_axis.points, data_acquirer.y_axis.points) -# # %% -# from qualang_tools.control_panel.video_mode.scan_modes import SwitchRasterScan -# import numpy as np -# scan_mode = SwitchRasterScan() -# print(scan_mode.interleave_arr(np.arange(10))) -# scan_mode.plot_scan(10, 10) -# # %% +# %% +from qualang_tools.control_panel.video_mode.scan_modes import SwitchRasterScan +import numpy as np +scan_mode = SwitchRasterScan() +print(scan_mode.interleave_arr(np.arange(10))) +scan_mode.plot_scan(10, 10) +# %% diff --git a/qualang_tools/control_panel/video_mode/video_mode.py b/qualang_tools/control_panel/video_mode/video_mode.py index 9388c892..030afd67 100644 --- a/qualang_tools/control_panel/video_mode/video_mode.py +++ b/qualang_tools/control_panel/video_mode/video_mode.py @@ -108,17 +108,17 @@ def create_layout(self): ), create_axis_layout( "x", - span=self.data_acquirer.x_span, - points=self.data_acquirer.x_points, + span=self.data_acquirer.x_axis.span, + points=self.data_acquirer.x_axis.points, min_span=0.01, - max_span=self.data_acquirer.x_span * 2, + max_span=None, ), create_axis_layout( "y", - span=self.data_acquirer.y_span, - points=self.data_acquirer.y_points, + span=self.data_acquirer.y_axis.span, + points=self.data_acquirer.y_axis.points, min_span=0.01, - max_span=self.data_acquirer.y_span * 2, + max_span=None, ), html.Div( # Update and Save buttons [ @@ -138,11 +138,11 @@ def create_layout(self): dcc.Graph( id="live-heatmap", figure=self.fig, style={"width": "55%", "height": "100%", "min-width": "500px"} ), - dcc.Interval(id="interval-component", interval=self.update_interval, n_intervals=0), + dcc.Interval(id="interval-component", interval=self.update_interval * 1000, n_intervals=0), ], style={"display": "flex", "flex-direction": "row", "height": "100%", "flex-wrap": "wrap"}, ) - logging.debug(f"Dash layout created, update interval: {self.update_interval} ms") + logging.debug(f"Dash layout created, update interval: {self.update_interval*1000} ms") self.add_callbacks() def clear_data(self): @@ -188,28 +188,30 @@ def update_heatmap( y_points, ): logging.debug(f"*** Dash callback {n_intervals} called at {datetime.now().strftime('%H:%M:%S.%f')[:-3]}") - attrs = { - "num_averages": {"obj": self.data_acquirer, "new": num_averages}, - "x_span": {"obj": self.data_acquirer, "new": x_span}, - "y_span": {"obj": self.data_acquirer, "new": y_span}, - "x_points": {"obj": self.data_acquirer, "new": x_points}, - "y_points": {"obj": self.data_acquirer, "new": y_points}, - } - updated_attrs = {} + attrs = [ + dict(obj=self.data_acquirer, key="num_averages", new=num_averages), + dict(obj=self.data_acquirer.x_axis, key="span", new=x_span), + dict(obj=self.data_acquirer.y_axis, key="span", new=y_span), + dict(obj=self.data_acquirer.x_axis, key="points", new=x_points), + dict(obj=self.data_acquirer.y_axis, key="points", new=y_points), + ] + updated_attrs = [] if n_update_clicks > self._last_update_clicks: self._last_update_clicks = n_update_clicks - for attr_name, attr in attrs.items(): - attr["old"] = getattr(attr["obj"], attr_name) - + for attr in attrs: + attr["old"] = getattr(attr["obj"], attr["key"]) attr["changed"] = attr["old"] != attr["new"] if not attr["changed"]: continue - updated_attrs[attr_name] = attr + if attr["new"] is None: + continue + + updated_attrs.append(attr) - logging.debug(f"Updating {attr_name} from {attr['old']} to {attr['new']}") + logging.debug(f"Updating {attr['key']} from {attr['old']} to {attr['new']}") if updated_attrs: self.clear_data() diff --git a/qualang_tools/control_panel/video_mode/voltage_parameters.py b/qualang_tools/control_panel/video_mode/voltage_parameters.py index 330e59a6..b11a06b0 100644 --- a/qualang_tools/control_panel/video_mode/voltage_parameters.py +++ b/qualang_tools/control_panel/video_mode/voltage_parameters.py @@ -7,8 +7,9 @@ # VoltageParameter Class remains unchanged class VoltageParameter: - def __init__(self, name, initial_value=0.0, units="V"): + def __init__(self, name, label=None, initial_value=0.0, units="V"): self.name = name + self.label = label self.latest_value = initial_value self._value = initial_value self.units = units