Skip to content

Commit

Permalink
Address review feedback.
Browse files Browse the repository at this point in the history
  • Loading branch information
haydenroche5 committed Oct 4, 2023
1 parent 0a89ebe commit ab20d7e
Show file tree
Hide file tree
Showing 4 changed files with 96 additions and 47 deletions.
26 changes: 19 additions & 7 deletions notecard/notecard.py
Original file line number Diff line number Diff line change
Expand Up @@ -572,7 +572,12 @@ def __init__(self, uart_id, debug=False):
if sys.implementation.name == 'micropython':
self._available = self._available_micropython
else:
self._available = self._available_default
if hasattr(self.uart, 'in_waiting'):
self._available = self._available_default
else:
raise NotImplementedError(('Serial communications with the '
'Notecard are not supported for this'
' platform.'))

self.Reset()

Expand Down Expand Up @@ -604,7 +609,12 @@ def _read(self, length):
# The rest is the data.
data = read_buf[2:]

return (available, data_len, data)
if len(data) != data_len:
raise Exception(('Serial-over-I2C error: reported data length '
f'({data_len}) differs from actual data length'
f' ({len(data)}).'))

return available, data

def _write(self, data):
"""Perform a serial-over-I2C write."""
Expand All @@ -620,8 +630,8 @@ def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC,
received_data = bytearray()

while True:
available, data_len, data = self._read(read_len)
if data_len > 0:
available, data = self._read(read_len)
if len(data) > 0:
received_data += data

timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC
Expand Down Expand Up @@ -670,6 +680,8 @@ def transmit(self, data, delay=True):
data_left -= chunk_len
sent_in_seg += chunk_len

# We delay for CARD_REQUEST_SEGMENT_DELAY_MS ms every time a full
# "segment" of data has been transmitted.
if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN:
sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN

Expand All @@ -693,7 +705,7 @@ def _transact(self, req_bytes, rsp_expected,
start = start_timeout()
available = 0
while available == 0:
available, _, _ = self._read(0)
available, _ = self._read(0)

if timeout_secs != 0 and has_timed_out(start, timeout_secs):
raise Exception(('Timed out while querying Notecard for '
Expand Down Expand Up @@ -730,14 +742,14 @@ def Reset(self):
read_len = 0
while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000):
try:
available, data_len, data = self._read(read_len)
available, data = self._read(read_len)
except Exception as e:
if self._debug:
print(e)
time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000)
continue

if data_len > 0:
if len(data) > 0:
something_found = True
# The Notecard responds to a bare `\n` with `\r\n`. If
# we get any other characters back, it means the host
Expand Down
105 changes: 66 additions & 39 deletions test/test_i2c.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
import os
import sys
import pytest
import re
from unittest.mock import MagicMock, patch
from .unit_test_utils import TrueOnNthIteration, BooleanToggle

Expand Down Expand Up @@ -59,12 +60,29 @@ def _arrange_transact_test():
yield _arrange_transact_test


@pytest.fixture
def arrange_read_test(arrange_test):
def _arrange_read_test(available, data_len, data):
def _platform_read_side_effect(initiate_read_msg, read_buf):
read_buf[0] = available
read_buf[1] = data_len
read_buf[2:] = data

card = arrange_test()
card._platform_read = MagicMock(
side_effect=_platform_read_side_effect)

return card

yield _arrange_read_test


class TestI2C:
# Reset tests.
def test_reset_succeeds_on_good_notecard_response(
self, arrange_reset_test):
card = arrange_reset_test()
card._read.return_value = (0, 2, b'\r\n')
card._read.return_value = (0, b'\r\n')

with patch('notecard.notecard.has_timed_out',
side_effect=TrueOnNthIteration(2)):
Expand All @@ -75,7 +93,7 @@ def test_reset_succeeds_on_good_notecard_response(
def test_reset_sends_a_newline_to_clear_stale_response(
self, arrange_reset_test):
card = arrange_reset_test()
card._read.return_value = (0, 2, b'\r\n')
card._read.return_value = (0, b'\r\n')

with patch('notecard.notecard.has_timed_out',
side_effect=TrueOnNthIteration(2)):
Expand All @@ -85,7 +103,7 @@ def test_reset_sends_a_newline_to_clear_stale_response(

def test_reset_locks_and_unlocks(self, arrange_reset_test):
card = arrange_reset_test()
card._read.return_value = (0, 2, b'\r\n')
card._read.return_value = (0, b'\r\n')

with patch('notecard.notecard.has_timed_out',
side_effect=TrueOnNthIteration(2)):
Expand Down Expand Up @@ -220,10 +238,10 @@ def test_receive_returns_all_data_bytes_from_read(self, arrange_test):
card._read.side_effect = [
# There are 4 bytes available to read, and there are no more bytes
# to read in this packet.
(4, 0, None),
(4, bytearray()),
# 0 bytes available to read after this packet. 4 coming in this
# packet, and they are {}\r\n.
(0, 4, payload)
(0, payload)
]

rx_data = card.receive()
Expand All @@ -239,13 +257,13 @@ def test_receive_keeps_reading_if_data_available_after_newline(
card._read.side_effect = [
# There are 4 bytes available to read, and there are no more bytes
# to read in this packet.
(4, 0, None),
(4, bytearray()),
# 2 bytes available to read after this packet. 4 coming in this
# packet, and they are {}\r\n.
(2, 4, payload),
(2, payload),
# 0 bytes after this packet. 2 coming in this packet, and they are
# io.
(0, 2, excess_data)
(0, excess_data)
]

rx_data = card.receive()
Expand All @@ -259,11 +277,11 @@ def test_receive_raises_exception_on_timeout(self, arrange_test):
card._read.side_effect = [
# There are 3 bytes available to read, and there are no more bytes
# to read in this packet.
(4, 0, None),
(3, bytearray()),
# 0 bytes available to read after this packet. 3 coming in this
# packet, and they are {}\r. The lack of a newline at the end will
# cause this test to hit the timeout.
(0, 3, payload)
(0, payload)
]

with patch('notecard.notecard.has_timed_out', return_value=True):
Expand Down Expand Up @@ -330,7 +348,7 @@ def test_transact_returns_none_if_rsp_not_expected(
def test_transact_returns_not_none_if_rsp_expected(
self, arrange_transact_test):
card, req_bytes = arrange_transact_test()
card._read = MagicMock(return_value=(1, None, None))
card._read = MagicMock(return_value=(1, bytearray()))

rsp = card._transact(req_bytes, rsp_expected=True)

Expand All @@ -339,15 +357,15 @@ def test_transact_returns_not_none_if_rsp_expected(
def test_transact_calls_receive_if_rsp_expected(
self, arrange_transact_test):
card, req_bytes = arrange_transact_test()
card._read = MagicMock(return_value=(1, None, None))
card._read = MagicMock(return_value=(1, bytearray()))

card._transact(req_bytes, rsp_expected=True)

card.receive.assert_called_once()

def test_transact_raises_exception_on_timeout(self, arrange_transact_test):
card, req_bytes = arrange_transact_test()
card._read = MagicMock(return_value=(0, None, None))
card._read = MagicMock(return_value=(0, bytearray()))

# Force a timeout.
with patch('notecard.notecard.has_timed_out',
Expand All @@ -357,55 +375,64 @@ def test_transact_raises_exception_on_timeout(self, arrange_transact_test):
'available data.')):
card._transact(req_bytes, rsp_expected=True)

# _read tests
def test_read_sends_the_initial_read_packet_correctly(self, arrange_test):
card = arrange_test()
card._platform_read = MagicMock()
length = 12
# _read tests.
def test_read_sends_the_initial_read_packet_correctly(
self, arrange_read_test):
data_len = 4
data = b'\xDE\xAD\xBE\xEF'
card = arrange_read_test(0, data_len, data)
# To start a read from the Notecard using serial-over-I2C, the host
# should send a 0 byte followed by a byte with the requested read
# length.
expected_packet = bytearray(2)
expected_packet[0] = 0
expected_packet[1] = length
expected_packet[1] = data_len

card._read(length)
card._read(data_len)

card._platform_read.assert_called_once()
assert card._platform_read.call_args[0][0] == expected_packet

def test_read_sizes_read_buf_correctly(self, arrange_test):
card = arrange_test()
card._platform_read = MagicMock()
header_length = 2
data_length = 12
expected_read_buffer_length = header_length + data_length
def test_read_sizes_read_buf_correctly(self, arrange_read_test):
data_len = 4
data = b'\xDE\xAD\xBE\xEF'
card = arrange_read_test(0, data_len, data)
header_len = 2
expected_read_buffer_len = header_len + data_len

card._read(data_length)
card._read(data_len)

card._platform_read.assert_called_once()
assert card._platform_read.call_args[0][1] == bytearray(expected_read_buffer_length)
assert len(card._platform_read.call_args[0][1]) == \
expected_read_buffer_len

def test_read_parses_data_correctly(self, arrange_test):
def test_read_parses_data_correctly(self, arrange_read_test):
available = 8
data_len = 4
data = b'\xDE\xAD\xBE\xEF'
card = arrange_read_test(available, data_len, data)

def platform_read_side_effect(initiate_read_msg, read_buf):
read_buf[0] = available
read_buf[1] = data_len
read_buf[2:] = data

card = arrange_test()
card._platform_read = MagicMock(side_effect=platform_read_side_effect)

actual_available, actual_data_len, actual_data = card._read(len(data))
actual_available, actual_data = card._read(len(data))

card._platform_read.assert_called_once()
assert actual_available == available
assert actual_data_len == data_len
assert actual_data == data

def test_read_raises_exception_if_data_length_does_not_match_data(
self, arrange_read_test):
available = 8
# The reported length is 5, but the actual length is 4.
data_len = 5
data = b'\xDE\xAD\xBE\xEF'
card = arrange_read_test(available, data_len, data)

exception_msg = re.escape(('Serial-over-I2C error: reported data length'
f' ({data_len}) differs from actual data '
f'length ({len(data)}).'))
with pytest.raises(Exception, match=exception_msg):
card._read(len(data))

# _write tests.
def test_write_calls_platform_write_correctly(self, arrange_test):
card = arrange_test()
card._platform_write = MagicMock()
Expand Down
2 changes: 1 addition & 1 deletion test/test_notecard.py
Original file line number Diff line number Diff line change
Expand Up @@ -368,7 +368,7 @@ def test_command_returns_none(self):
card = notecard.Notecard()
card.Transaction = MagicMock()

rsp = card.Command({'cmd': 'card.sleep'})
rsp = card.Command({'cmd': 'hub.set'})

# A command generates no response, by definition.
assert rsp is None
Expand Down
10 changes: 10 additions & 0 deletions test/test_serial.py
Original file line number Diff line number Diff line change
Expand Up @@ -155,6 +155,16 @@ def test_init_creates_appropriate_lock_type(

assert isinstance(card.lock_handle, lock_type)

def test_init_fails_if_not_micropython_and_uart_has_no_in_waiting_attr(
self):
exception_msg = ('Serial communications with the Notecard are not '
'supported for this platform.')

with patch('notecard.notecard.sys.implementation.name', new='cpython'):
with patch('notecard.notecard.OpenSerial.Reset'):
with pytest.raises(Exception, match=exception_msg):
notecard.OpenSerial(42)

@pytest.mark.parametrize(
'platform,available_method',
[
Expand Down

0 comments on commit ab20d7e

Please sign in to comment.