Skip to content

Commit

Permalink
added sweep axes
Browse files Browse the repository at this point in the history
  • Loading branch information
nulinspiratie committed Oct 7, 2024
1 parent 8bf8bec commit f915a21
Show file tree
Hide file tree
Showing 7 changed files with 140 additions and 149 deletions.
3 changes: 2 additions & 1 deletion qualang_tools/control_panel/video_mode/dash_tools.py
Original file line number Diff line number Diff line change
@@ -1,3 +1,4 @@
from typing import Literal, Optional
from dash import html
from dash_extensions.enrich import dcc
import plotly.graph_objects as go
Expand Down Expand Up @@ -72,7 +73,7 @@ def create_input_field(id, label, value, debounce=True, input_style=None, div_st
return html.Div(elements, style=div_style)


def create_axis_layout(axis, span, points, min_span, max_span):
def create_axis_layout(axis: Literal["x", "y"], span: float, points: int, min_span: float, max_span: Optional[float] = None):
return html.Div(
[
html.Label(axis.upper(), style={"text-align": "left"}),
Expand Down
129 changes: 40 additions & 89 deletions qualang_tools/control_panel/video_mode/data_acquirers.py
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@
from qm.jobs.running_qm_job import RunningQmJob
from qm.qua import *
from qualang_tools.control_panel.video_mode.scan_modes import ScanMode
from qualang_tools.control_panel.video_mode.sweep_axis import SweepAxis


__all__ = ["BaseDataAcquirer", "RandomDataAcquirer", "OPXDataAcquirer"]
Expand All @@ -17,91 +18,50 @@ class BaseDataAcquirer(ABC):
def __init__(
self,
*,
x_offset_parameter,
y_offset_parameter,
x_span,
y_span,
x_attenuation: float = 0.0,
y_attenuation: float = 0.0,
x_points=101,
y_points=101,
x_axis: SweepAxis,
y_axis: SweepAxis,
num_averages=1,
**kwargs,
):
assert not kwargs

self.x_offset_parameter = x_offset_parameter
self.y_offset_parameter = y_offset_parameter
self.x_span = x_span
self.y_span = y_span
self.x_axis = x_axis
self.y_axis = y_axis
self.num_averages = num_averages
self.x_points = x_points
self.y_points = y_points
self.x_attenuation = x_attenuation
self.y_attenuation = y_attenuation
self.data_history = []
logging.debug("Initializing DataGenerator")

self.num_acquisitions = 0

self.data_array = xr.DataArray(
np.zeros((self.x_points, self.y_points)),
coords=[("x", self.x_vals_offset), ("y", self.y_vals_offset)],
np.zeros((self.x_axis.points, self.y_axis.points)),
coords=[
(self.x_axis.name, self.x_axis.sweep_values_with_offset),
(self.y_axis.name, self.y_axis.sweep_values_with_offset),
],
attrs={"units": "V", "long_name": "Signal"},
)
for axis, param in {"x": self.x_offset_parameter, "y": self.y_offset_parameter}.items():
if getattr(param, "label", None):
param_name = param.label
elif getattr(param, "name", None):
param_name = param.name
else:
param_name = "X_axis"
self.data_array.coords[axis].attrs.update({"units": "V", "long_name": param_name})
for axis in [self.x_axis, self.y_axis]:
label = axis.label or axis.name
self.data_array.coords[axis.name].attrs.update({"units": "V", "long_name": label})
logging.debug("DataGenerator initialized with initial data")

@property
def x_vals(self):
return np.linspace(-self.x_span / 2, self.x_span / 2, self.x_points)

@property
def y_vals(self):
return np.linspace(-self.y_span / 2, self.y_span / 2, self.y_points)

@property
def x_vals_unattenuated(self):
x_attenuation_factor = 10 ** (self.x_attenuation / 20) # Convert dB to voltage scale
return self.x_vals * x_attenuation_factor

@property
def y_vals_unattenuated(self):
y_attenuation_factor = 10 ** (self.y_attenuation / 20) # Convert dB to voltage scale
return self.y_vals * y_attenuation_factor

@property
def x_vals_offset(self):
return self.x_vals + self.x_offset_parameter.get_latest()

@property
def y_vals_offset(self):
return self.y_vals + self.y_offset_parameter.get_latest()

def update_voltage_ranges(self):
self.data_array = self.data_array.assign_coords(x=self.x_vals, y=self.y_vals)
self.data_array = self.data_array.assign_coords(
{axis.name: axis.sweep_values for axis in [self.x_axis, self.y_axis]}
)

x_vals = self.x_vals
y_vals = self.y_vals
x_vals = self.x_axis.sweep_values
y_vals = self.y_axis.sweep_values
logging.debug(
f"Updated voltage ranges: "
f"x_vals=[{x_vals[0]}, {x_vals[1]}, ..., {x_vals[-1]}], "
f"y_vals=[{y_vals[0]}, {y_vals[1]}, ..., {y_vals[-1]}]"
)

def update_attrs(self, attrs):
for attr_name, attr in attrs.items():
if attr_name in ["x_offset", "y_offset"]:
attr["obj"].set(attr["new"])
else:
setattr(attr["obj"], attr_name, attr["new"])
for attr in attrs:
setattr(attr["obj"], attr["key"], attr["new"])
pass

@abstractmethod
Expand All @@ -124,12 +84,16 @@ def update_data(self):

self.data_array = xr.DataArray(
averaged_data,
coords=[("x", self.x_vals_offset), ("y", self.y_vals_offset)],
coords=[("x", self.x_axis.sweep_values_with_offset), ("y", self.y_axis.sweep_values_with_offset)],
attrs=self.data_array.attrs, # Preserve original attributes like units
)

self.data_array.coords["x"].attrs.update({"units": "V", "long_name": self.x_offset_parameter.name})
self.data_array.coords["y"].attrs.update({"units": "V", "long_name": self.y_offset_parameter.name})
self.data_array.coords[self.x_axis.name].attrs.update(
{"units": "V", "long_name": self.x_axis.label or self.x_axis.name}
)
self.data_array.coords[self.y_axis.name].attrs.update(
{"units": "V", "long_name": self.y_axis.label or self.y_axis.name}
)
mean_abs_data = np.mean(np.abs(averaged_data))
logging.debug(f"Data acquired with shape: {self.data_array.shape}, mean(abs(data)) = {mean_abs_data}")
return self.data_array
Expand All @@ -139,8 +103,7 @@ class RandomDataAcquirer(BaseDataAcquirer):

def acquire_data(self):
sleep(1)
results = np.random.rand(len(self.y_vals), len(self.x_vals))

results = np.random.rand(self.x_axis.points, self.y_axis.points)
return results


Expand All @@ -154,14 +117,8 @@ def __init__(
qm: QuantumMachine,
qua_inner_loop_action: Callable,
scan_mode: ScanMode,
x_offset_parameter,
y_offset_parameter,
x_span,
y_span,
x_attenuation: float = 0.0,
y_attenuation: float = 0.0,
x_points=101,
y_points=101,
x_axis: SweepAxis,
y_axis: SweepAxis,
num_averages=1,
result_type: Literal["I", "Q", "abs", "phase"] = "I",
initial_delay: Optional[float] = None,
Expand All @@ -177,31 +134,25 @@ def __init__(
self.results: Dict[str, Any] = {}

super().__init__(
x_offset_parameter=x_offset_parameter,
y_offset_parameter=y_offset_parameter,
x_span=x_span,
y_span=y_span,
x_attenuation=x_attenuation,
y_attenuation=y_attenuation,
x_axis=x_axis,
y_axis=y_axis,
num_averages=num_averages,
x_points=x_points,
y_points=y_points,
**kwargs,
)

def update_attrs(self, attrs):
super().update_attrs(attrs)
logging.info(f"Updated attrs: {attrs}")

requires_regeneration = ["x_span", "y_span", "x_points", "y_points"]
if any(attr in requires_regeneration for attr in attrs):
requires_regeneration = ["span", "points"]
if any(attr["key"] in requires_regeneration for attr in attrs):
logging.info("Regenerating QUA program due to new parameters")
self.program = None
self.run_program()

def generate_program(self) -> Program:
x_vals = self.x_vals_unattenuated
y_vals = self.y_vals_unattenuated
x_vals = self.x_axis.sweep_values_unattenuated
y_vals = self.y_axis.sweep_values_unattenuated

with program() as prog:
IQ_streams = {"I": declare_stream(), "Q": declare_stream()}
Expand All @@ -218,8 +169,8 @@ def generate_program(self) -> Program:

with stream_processing():
streams = {
"I": IQ_streams["I"].buffer(self.x_points * self.y_points),
"Q": IQ_streams["Q"].buffer(self.x_points * self.y_points),
"I": IQ_streams["I"].buffer(self.x_axis.points * self.y_axis.points),
"Q": IQ_streams["Q"].buffer(self.x_axis.points * self.y_axis.points),
}
combined_stream = None
for var in self.stream_vars:
Expand All @@ -240,8 +191,8 @@ def process_results(self, results: Dict[str, Any]) -> np.ndarray:
else:
raise ValueError(f"Invalid result type: {self.result_type}")

x_idxs, y_idxs = self.scan_mode.get_idxs(x_points=self.x_points, y_points=self.y_points)
results_2D = np.zeros((self.y_points, self.x_points), dtype=float)
x_idxs, y_idxs = self.scan_mode.get_idxs(x_points=self.x_axis.points, y_points=self.y_axis.points)
results_2D = np.zeros((self.y_axis.points, self.x_axis.points), dtype=float)
results_2D[y_idxs, x_idxs] = result

return results_2D
Expand Down
12 changes: 7 additions & 5 deletions qualang_tools/control_panel/video_mode/scan_modes.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
from abc import ABC, abstractmethod
from contextlib import contextmanager
from typing import Dict, Iterator, Sequence, Callable, Tuple, Generator
from typing import Any, Dict, Iterator, Sequence, Callable, Tuple, Generator
from matplotlib import pyplot as plt
from matplotlib.ticker import MultipleLocator

Expand All @@ -14,7 +14,7 @@ class ScanMode(ABC):
def get_idxs(self, x_points: int, y_points: int) -> Tuple[np.ndarray, np.ndarray]:
pass

def plot_scan(self, x_points: int, y_points: int):
def plot_scan(self, x_points: int, y_points: int) -> Tuple[plt.Figure, plt.Axes]:
idxs_x, idxs_y = self.get_idxs(x_points, y_points)

u = np.diff(idxs_x)
Expand All @@ -33,9 +33,11 @@ def plot_scan(self, x_points: int, y_points: int):
ax.yaxis.set_minor_locator(MultipleLocator(abs(np.max(v))))
plt.show()

return fig, ax

@abstractmethod
def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]) -> Iterator[None]:
yield
pass


class RasterScan(ScanMode):
Expand All @@ -44,7 +46,7 @@ def get_idxs(self, x_points: int, y_points: int) -> Tuple[np.ndarray, np.ndarray
y_idxs = np.repeat(np.arange(y_points), x_points)
return x_idxs, y_idxs

def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]):
def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]) -> Iterator[None]:
voltages = {"x": declare(fixed), "y": declare(fixed)}

with for_(*from_array(voltages["y"], y_vals)): # type: ignore
Expand Down Expand Up @@ -74,7 +76,7 @@ def get_idxs(self, x_points: int, y_points: int) -> Tuple[np.ndarray, np.ndarray
y_idxs = np.repeat(y_idxs, x_points)
return x_idxs, y_idxs

def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]):
def scan(self, x_vals: Sequence[float], y_vals: Sequence[float]) -> Generator[Dict[str, Any], None, None]:
voltages = {"x": declare(fixed), "y": declare(fixed)}

# with for_(*from_array(voltages["y"], self.interleave_arr(y_vals))): # type: ignore
Expand Down
37 changes: 37 additions & 0 deletions qualang_tools/control_panel/video_mode/sweep_axis.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,37 @@
from dataclasses import dataclass
from typing import Literal, Optional

import numpy as np

from qualang_tools.control_panel.video_mode.voltage_parameters import VoltageParameter


__all__ = ["SweepAxis"]


@dataclass
class SweepAxis:
name: str
span: float
points: int
label: Optional[str] = None
offset_parameter: Optional[VoltageParameter] = None
attenuation: float = 0

@property
def sweep_values(self):
return np.linspace(-self.span / 2, self.span / 2, self.points)

@property
def sweep_values_unattenuated(self):
return self.sweep_values * 10 ** (self.attenuation / 20)

@property
def sweep_values_with_offset(self):
if self.offset_parameter is None:
return self.sweep_values_unattenuated
return self.sweep_values_unattenuated + self.offset_parameter.get_latest()

@property
def amplitude_scale(self):
return 10 ** (-self.attenuation / 20)
Loading

0 comments on commit f915a21

Please sign in to comment.