Skip to content

Commit

Permalink
Merge pull request #5 from zdomke/archiver_time_plot
Browse files Browse the repository at this point in the history
ENH Archiver Time Plot insert live data
  • Loading branch information
YektaY authored Apr 12, 2024
2 parents 46dc86d + a88ac93 commit 77d2f59
Show file tree
Hide file tree
Showing 3 changed files with 84 additions and 9 deletions.
23 changes: 18 additions & 5 deletions examples/archiver_time_plot/archiver_time_plot_example.py
Original file line number Diff line number Diff line change
@@ -1,7 +1,7 @@
from pydm import Display

from qtpy import QtCore
from qtpy.QtWidgets import QHBoxLayout, QApplication
from qtpy.QtWidgets import QHBoxLayout, QApplication, QCheckBox
from pydm.widgets import PyDMArchiverTimePlot


Expand All @@ -22,23 +22,36 @@ def setup_ui(self):
self.setLayout(self.main_layout)
self.plot_live = PyDMArchiverTimePlot(background=[255, 255, 255, 255])
self.plot_archived = PyDMArchiverTimePlot(background=[255, 255, 255, 255])
self.chkbx_live = QCheckBox()
self.chkbx_live.setChecked(True)
self.chkbx_archived = QCheckBox()
self.chkbx_archived.setChecked(True)
self.main_layout.addWidget(self.chkbx_live)
self.main_layout.addWidget(self.plot_live)
self.main_layout.addWidget(self.plot_archived)
self.main_layout.addWidget(self.chkbx_archived)

self.plot_live.addYChannel(
y_channel="CA://XCOR:LI29:302:IACT",
curve_live = self.plot_live.addYChannel(
y_channel="ca://XCOR:LI29:302:IACT",
name="name",
color="red",
yAxisName="Axis",
useArchiveData=True,
liveData=True,
)

self.plot_archived.addYChannel(
y_channel="CA://XCOR:LI29:302:IACT",
curve_archived = self.plot_archived.addYChannel(
y_channel="ca://XCOR:LI28:302:IACT",
name="name",
color="blue",
yAxisName="Axis",
useArchiveData=True,
liveData=False,
)

self.chkbx_live.stateChanged.connect(lambda x: self.set_live(curve_live, x))
self.chkbx_archived.stateChanged.connect(lambda x: self.set_live(curve_archived, x))

@staticmethod
def set_live(curve, live):
curve.liveData = live
35 changes: 31 additions & 4 deletions pydm/widgets/archiver_time_plot.py
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@ def __init__(
self.archive_points_accumulated = 0
self._archiveBufferSize = DEFAULT_ARCHIVE_BUFFER_SIZE
self.archive_data_buffer = np.zeros((2, self._archiveBufferSize), order="f", dtype=float)
self.liveData = liveData
self._liveData = liveData

# When optimized or mean value data is requested, we can display error bars representing
# the full range of values retrieved
Expand All @@ -63,11 +63,30 @@ def to_dict(self) -> OrderedDict:
dic_ = OrderedDict(
[
("useArchiveData", self.use_archive_data),
("liveData", self.liveData)
]
)
dic_.update(super(ArchivePlotCurveItem, self).to_dict())
return dic_

@property
def liveData(self):
return self._liveData

@liveData.setter
def liveData(self, get_live: bool):
if not get_live:
self._liveData = False
return

min_x = self.data_buffer[0, self._bufferSize - 1]
max_x = time.time()

# Avoids noisy requests when first rendering the plot
if max_x - min_x > 5:
self.archive_data_request_signal.emit(min_x, max_x - 1, "")


def setArchiveChannel(self, address: str) -> None:
"""Creates the channel for the input address for communicating with the archiver appliance plugin."""
archiver_prefix = "archiver://pv="
Expand Down Expand Up @@ -98,6 +117,14 @@ def receiveArchiveData(self, data: np.ndarray) -> None:
archive_data_length = len(data[0])
max_x = data[0][archive_data_length - 1]

# Filling live buffer if data is more recent than Archive Data Buffer
last_ts = self.archive_data_buffer[0][-1]
if self.archive_data_buffer.any() and (int(last_ts) <= data[0][0]):
self.insert_live_data(data)
self._liveData = True
self.data_changed.emit()
return

if self.points_accumulated != 0:
while max_x > self.data_buffer[0][-self.points_accumulated]:
# Sometimes optimized queries return data past the current timestamp, this will delete those data points
Expand Down Expand Up @@ -218,7 +245,7 @@ def channels(self) -> List[PyDMChannel]:

def receiveNewValue(self, new_value):
""" """
if self.liveData:
if self._liveData:
super().receiveNewValue(new_value)


Expand Down Expand Up @@ -429,11 +456,11 @@ def addYChannel(
yAxisName=None,
useArchiveData=False,
liveData=True,
):
) -> ArchivePlotCurveItem:
"""
Overrides timeplot addYChannel method to be able to pass the liveData flag.
"""
super().addYChannel(
return super().addYChannel(
y_channel=y_channel,
plot_style=plot_style,
name=name,
Expand Down
35 changes: 35 additions & 0 deletions pydm/widgets/timeplot.py
Original file line number Diff line number Diff line change
Expand Up @@ -257,6 +257,41 @@ def resetBufferSize(self):
self._bufferSize = DEFAULT_BUFFER_SIZE
self.initialize_buffer()

def insert_live_data(self, data: np.ndarray) -> None:
"""
Inserts data directly into the live buffer.
Example use case would be pausing the gathering of data and
filling the buffer with missed data.
Parameters
----------
data : np.ndarray
A numpy array of shape (2, length_of_data). Index 0 contains
timestamps and index 1 contains the data observations.
"""
live_data_length = len(data[0])
min_x = data[0][0]
max_x = data[0][live_data_length - 1]
# Get the indices between which we want to insert the data
min_insertion_index = np.searchsorted(self.data_buffer[0], min_x)
max_insertion_index = np.searchsorted(self.data_buffer[0], max_x)
# Delete any non-raw data between the indices so we don't have multiple data points for the same timestamp
self.data_buffer = np.delete(
self.data_buffer, slice(min_insertion_index, max_insertion_index), axis=1
)
num_points_deleted = max_insertion_index - min_insertion_index
delta_points = live_data_length - num_points_deleted
if live_data_length > num_points_deleted:
# If the insertion will overflow the data buffer, need to delete the oldest points
self.data_buffer = np.delete(self.data_buffer, slice(0, delta_points), axis=1)
else:
self.data_buffer = np.insert(self.data_buffer, [0], np.zeros((2, delta_points)), axis=1)
min_insertion_index = np.searchsorted(self.data_buffer[0], min_x)
self.data_buffer = np.insert(self.data_buffer, [min_insertion_index], data[0:2], axis=1)

self.points_accumulated += live_data_length - num_points_deleted

@Slot()
def redrawCurve(self, min_x: Optional[float] = None, max_x: Optional[float] = None):
"""
Expand Down

0 comments on commit 77d2f59

Please sign in to comment.