Skip to content

Commit

Permalink
pw_log_rpc: Invoke pw.log.Logs.Listen() to restore prior behavior
Browse files Browse the repository at this point in the history
Prior to http://pwrev.dev/169174, the pw_rpc "open" feature was
improperly implemented in the Python client. As a result,
LogStreamHandler.listen_to_logs() used to actually call the
pw.rpc.Logs.Listen RPC instead of simply listening for packets. When the
fix landed, this behavior changed.

To maintain prior behavior, rename listen_to_logs to start_logging and
have it invoke the pw.log.Logs.Listen RPC.

Bug: b/364421706
Change-Id: I30fdd416190ead9adbfe85ec3bea9bfeed62b81f
Reviewed-on: https://pigweed-review.googlesource.com/c/pigweed/pigweed/+/233991
Commit-Queue: Wyatt Hepler <[email protected]>
Pigweed-Auto-Submit: Wyatt Hepler <[email protected]>
Reviewed-by: Carlos Chinchilla <[email protected]>
Lint: Lint 🤖 <[email protected]>
  • Loading branch information
255 authored and CQ Bot Account committed Sep 4, 2024
1 parent f177db3 commit 5bb6580
Show file tree
Hide file tree
Showing 4 changed files with 39 additions and 36 deletions.
2 changes: 1 addition & 1 deletion pw_log_rpc/docs.rst
Original file line number Diff line number Diff line change
Expand Up @@ -489,7 +489,7 @@ pw_log_rpc in Python
--------------------
``pw_log_rpc`` provides client utilities for dealing with RPC logging.

The ``LogStreamHandler`` offers APIs to start a log stream:``listen_to_logs``,
The ``LogStreamHandler`` offers APIs to start a log stream: ``start_logs``,
to handle RPC stream errors: ``handle_log_stream_error``, and RPC stream
completed events: ``handle_log_stream_completed``. It uses a provided
``LogStreamDecoder`` to delegate log parsing to.
Expand Down
27 changes: 15 additions & 12 deletions pw_log_rpc/py/pw_log_rpc/rpc_log_stream.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
"""Utils to decode logs."""

import logging
import warnings

from pw_log.log_decoder import LogStreamDecoder
from pw_log.proto import log_pb2
Expand All @@ -39,23 +40,25 @@ def __init__(
self._decoder = decoder

def listen_to_logs(self) -> None:
"""Requests Logs streamed over RPC.
The RPCs remain open until the server cancels or closes them, either
with a response or error packet.
"""

def on_log_entries(_, log_entries_proto: log_pb2.LogEntries) -> None:
self._decoder.parse_log_entries_proto(log_entries_proto)
warnings.warn(
'listen_to_logs is deprecated; call start_logging() instead',
DeprecationWarning,
)
self.start_logging()

self.rpcs.pw.log.Logs.Listen.open(
on_next=on_log_entries,
def start_logging(self) -> None:
"""Requests logs to be streamed over the pw.log.Logs.Listen RPC."""
self.rpcs.pw.log.Logs.Listen.invoke(
on_next=self._on_log_entries,
on_completed=lambda _, status: self.handle_log_stream_completed(
status
),
on_error=lambda _, error: self.handle_log_stream_error(error),
)

def _on_log_entries(self, _, log_entries_proto: log_pb2.LogEntries) -> None:
self._decoder.parse_log_entries_proto(log_entries_proto)

def handle_log_stream_error(self, error: pw_status.Status) -> None:
"""Resets the log stream RPC on error to avoid losing logs.
Expand All @@ -68,7 +71,7 @@ def handle_log_stream_error(self, error: pw_status.Status) -> None:
)
# Only re-request logs if the RPC was not cancelled by the client.
if error != pw_status.Status.CANCELLED:
self.listen_to_logs()
self.start_logging()

def handle_log_stream_completed(self, status: pw_status.Status) -> None:
"""Resets the log stream RPC on completed to avoid losing logs.
Expand All @@ -80,7 +83,7 @@ def handle_log_stream_completed(self, status: pw_status.Status) -> None:
status,
self.source_name,
)
self.listen_to_logs()
self.start_logging()

@property
def source_name(self) -> str:
Expand Down
44 changes: 22 additions & 22 deletions pw_log_rpc/py/rpc_log_stream_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -82,11 +82,11 @@ def _get_rpc_ids(self) -> RpcIds:
self._channel_id, service.id, method.id, client.OPEN_CALL_ID
)

def test_listen_to_logs_subsequent_calls(self):
def test_start_logging_subsequent_calls(self):
"""Test a stream of RPC Logs."""
self.log_stream_handler.handle_log_stream_error = mock.Mock()
self.log_stream_handler.handle_log_stream_completed = mock.Mock()
self.log_stream_handler.listen_to_logs()
self.log_stream_handler.start_logging()

self.assertIs(
self.client.process_packet(
Expand Down Expand Up @@ -136,11 +136,11 @@ def test_log_stream_cancelled(self):
self.log_stream_handler.handle_log_stream_error = mock.Mock()
self.log_stream_handler.handle_log_stream_completed = mock.Mock()

listen_function = _CallableWithCounter(
self.log_stream_handler.listen_to_logs
start_function = _CallableWithCounter(
self.log_stream_handler.start_logging
)
self.log_stream_handler.listen_to_logs = listen_function
self.log_stream_handler.listen_to_logs()
self.log_stream_handler.start_logging = start_function
self.log_stream_handler.start_logging()

# Send logs prior to cancellation.
self.assertIs(
Expand Down Expand Up @@ -173,7 +173,7 @@ def test_log_stream_cancelled(self):
self.log_stream_handler.handle_log_stream_completed.called
)
self.assertEqual(len(self.captured_logs), 2)
self.assertEqual(listen_function.call_count(), 1)
self.assertEqual(start_function.call_count(), 1)

def test_log_stream_error_stream_restarted(self):
"""Tests that an error on the log stream restarts the stream."""
Expand All @@ -184,11 +184,11 @@ def test_log_stream_error_stream_restarted(self):
)
self.log_stream_handler.handle_log_stream_error = error_handler

listen_function = _CallableWithCounter(
self.log_stream_handler.listen_to_logs
start_function = _CallableWithCounter(
self.log_stream_handler.start_logging
)
self.log_stream_handler.listen_to_logs = listen_function
self.log_stream_handler.listen_to_logs()
self.log_stream_handler.start_logging = start_function
self.log_stream_handler.start_logging()

# Send logs prior to cancellation.
self.assertIs(
Expand Down Expand Up @@ -217,7 +217,7 @@ def test_log_stream_error_stream_restarted(self):
self.log_stream_handler.handle_log_stream_completed.called
)
self.assertEqual(len(self.captured_logs), 2)
self.assertEqual(listen_function.call_count(), 2)
self.assertEqual(start_function.call_count(), 2)
self.assertEqual(error_handler.call_count(), 1)
self.assertEqual(error_handler.calls[0].args, (Status.UNKNOWN,))

Expand All @@ -230,11 +230,11 @@ def test_log_stream_completed_ok_stream_restarted(self):
)
self.log_stream_handler.handle_log_stream_completed = completion_handler

listen_function = _CallableWithCounter(
self.log_stream_handler.listen_to_logs
start_function = _CallableWithCounter(
self.log_stream_handler.start_logging
)
self.log_stream_handler.listen_to_logs = listen_function
self.log_stream_handler.listen_to_logs()
self.log_stream_handler.start_logging = start_function
self.log_stream_handler.start_logging()

# Send logs prior to cancellation.
self.assertIs(
Expand All @@ -261,7 +261,7 @@ def test_log_stream_completed_ok_stream_restarted(self):

self.assertFalse(self.log_stream_handler.handle_log_stream_error.called)
self.assertEqual(len(self.captured_logs), 2)
self.assertEqual(listen_function.call_count(), 2)
self.assertEqual(start_function.call_count(), 2)
self.assertEqual(completion_handler.call_count(), 1)
self.assertEqual(completion_handler.calls[0].args, (Status.OK,))

Expand All @@ -274,11 +274,11 @@ def test_log_stream_completed_with_error_stream_restarted(self):
)
self.log_stream_handler.handle_log_stream_completed = completion_handler

listen_function = _CallableWithCounter(
self.log_stream_handler.listen_to_logs
start_function = _CallableWithCounter(
self.log_stream_handler.start_logging
)
self.log_stream_handler.listen_to_logs = listen_function
self.log_stream_handler.listen_to_logs()
self.log_stream_handler.start_logging = start_function
self.log_stream_handler.start_logging()

# Send logs prior to cancellation.
self.assertIs(
Expand Down Expand Up @@ -307,7 +307,7 @@ def test_log_stream_completed_with_error_stream_restarted(self):

self.assertFalse(self.log_stream_handler.handle_log_stream_error.called)
self.assertEqual(len(self.captured_logs), 2)
self.assertEqual(listen_function.call_count(), 2)
self.assertEqual(start_function.call_count(), 2)
self.assertEqual(completion_handler.call_count(), 1)
self.assertEqual(completion_handler.calls[0].args, (Status.UNKNOWN,))

Expand Down
2 changes: 1 addition & 1 deletion pw_system/py/pw_system/device.py
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ def decoded_log_handler(log: log_decoder.Log) -> None:
self.log_stream_handler = rpc_log_stream.LogStreamHandler(
self.rpcs, self._log_decoder
)
self.log_stream_handler.listen_to_logs()
self.log_stream_handler.start_logging()

# Create the transfer manager
self.transfer_service = self.rpcs.pw.transfer.Transfer
Expand Down

0 comments on commit 5bb6580

Please sign in to comment.