diff --git a/notecard/notecard.py b/notecard/notecard.py index 15fdeb9..1b15f3e 100644 --- a/notecard/notecard.py +++ b/notecard/notecard.py @@ -572,7 +572,12 @@ def __init__(self, uart_id, debug=False): if sys.implementation.name == 'micropython': self._available = self._available_micropython else: - self._available = self._available_default + if hasattr(self.uart, 'in_waiting'): + self._available = self._available_default + else: + raise NotImplementedError(('Serial communications with the ' + 'Notecard are not supported for this' + ' platform.')) self.Reset() @@ -604,7 +609,12 @@ def _read(self, length): # The rest is the data. data = read_buf[2:] - return (available, data_len, data) + if len(data) != data_len: + raise Exception(('Serial-over-I2C error: reported data length ' + f'({data_len}) differs from actual data length' + f' ({len(data)}).')) + + return available, data def _write(self, data): """Perform a serial-over-I2C write.""" @@ -620,8 +630,8 @@ def receive(self, timeout_secs=CARD_INTRA_TRANSACTION_TIMEOUT_SEC, received_data = bytearray() while True: - available, data_len, data = self._read(read_len) - if data_len > 0: + available, data = self._read(read_len) + if len(data) > 0: received_data += data timeout_secs = CARD_INTRA_TRANSACTION_TIMEOUT_SEC @@ -670,6 +680,8 @@ def transmit(self, data, delay=True): data_left -= chunk_len sent_in_seg += chunk_len + # We delay for CARD_REQUEST_SEGMENT_DELAY_MS ms every time a full + # "segment" of data has been transmitted. if sent_in_seg > CARD_REQUEST_SEGMENT_MAX_LEN: sent_in_seg -= CARD_REQUEST_SEGMENT_MAX_LEN @@ -693,7 +705,7 @@ def _transact(self, req_bytes, rsp_expected, start = start_timeout() available = 0 while available == 0: - available, _, _ = self._read(0) + available, _ = self._read(0) if timeout_secs != 0 and has_timed_out(start, timeout_secs): raise Exception(('Timed out while querying Notecard for ' @@ -730,14 +742,14 @@ def Reset(self): read_len = 0 while not has_timed_out(start, CARD_RESET_DRAIN_MS / 1000): try: - available, data_len, data = self._read(read_len) + available, data = self._read(read_len) except Exception as e: if self._debug: print(e) time.sleep(CARD_REQUEST_SEGMENT_DELAY_MS / 1000) continue - if data_len > 0: + if len(data) > 0: something_found = True # The Notecard responds to a bare `\n` with `\r\n`. If # we get any other characters back, it means the host diff --git a/test/test_i2c.py b/test/test_i2c.py index 144fc95..acbd46f 100644 --- a/test/test_i2c.py +++ b/test/test_i2c.py @@ -1,6 +1,7 @@ import os import sys import pytest +import re from unittest.mock import MagicMock, patch from .unit_test_utils import TrueOnNthIteration, BooleanToggle @@ -59,12 +60,29 @@ def _arrange_transact_test(): yield _arrange_transact_test +@pytest.fixture +def arrange_read_test(arrange_test): + def _arrange_read_test(available, data_len, data): + def _platform_read_side_effect(initiate_read_msg, read_buf): + read_buf[0] = available + read_buf[1] = data_len + read_buf[2:] = data + + card = arrange_test() + card._platform_read = MagicMock( + side_effect=_platform_read_side_effect) + + return card + + yield _arrange_read_test + + class TestI2C: # Reset tests. def test_reset_succeeds_on_good_notecard_response( self, arrange_reset_test): card = arrange_reset_test() - card._read.return_value = (0, 2, b'\r\n') + card._read.return_value = (0, b'\r\n') with patch('notecard.notecard.has_timed_out', side_effect=TrueOnNthIteration(2)): @@ -75,7 +93,7 @@ def test_reset_succeeds_on_good_notecard_response( def test_reset_sends_a_newline_to_clear_stale_response( self, arrange_reset_test): card = arrange_reset_test() - card._read.return_value = (0, 2, b'\r\n') + card._read.return_value = (0, b'\r\n') with patch('notecard.notecard.has_timed_out', side_effect=TrueOnNthIteration(2)): @@ -85,7 +103,7 @@ def test_reset_sends_a_newline_to_clear_stale_response( def test_reset_locks_and_unlocks(self, arrange_reset_test): card = arrange_reset_test() - card._read.return_value = (0, 2, b'\r\n') + card._read.return_value = (0, b'\r\n') with patch('notecard.notecard.has_timed_out', side_effect=TrueOnNthIteration(2)): @@ -220,10 +238,10 @@ def test_receive_returns_all_data_bytes_from_read(self, arrange_test): card._read.side_effect = [ # There are 4 bytes available to read, and there are no more bytes # to read in this packet. - (4, 0, None), + (4, bytearray()), # 0 bytes available to read after this packet. 4 coming in this # packet, and they are {}\r\n. - (0, 4, payload) + (0, payload) ] rx_data = card.receive() @@ -239,13 +257,13 @@ def test_receive_keeps_reading_if_data_available_after_newline( card._read.side_effect = [ # There are 4 bytes available to read, and there are no more bytes # to read in this packet. - (4, 0, None), + (4, bytearray()), # 2 bytes available to read after this packet. 4 coming in this # packet, and they are {}\r\n. - (2, 4, payload), + (2, payload), # 0 bytes after this packet. 2 coming in this packet, and they are # io. - (0, 2, excess_data) + (0, excess_data) ] rx_data = card.receive() @@ -259,11 +277,11 @@ def test_receive_raises_exception_on_timeout(self, arrange_test): card._read.side_effect = [ # There are 3 bytes available to read, and there are no more bytes # to read in this packet. - (4, 0, None), + (3, bytearray()), # 0 bytes available to read after this packet. 3 coming in this # packet, and they are {}\r. The lack of a newline at the end will # cause this test to hit the timeout. - (0, 3, payload) + (0, payload) ] with patch('notecard.notecard.has_timed_out', return_value=True): @@ -330,7 +348,7 @@ def test_transact_returns_none_if_rsp_not_expected( def test_transact_returns_not_none_if_rsp_expected( self, arrange_transact_test): card, req_bytes = arrange_transact_test() - card._read = MagicMock(return_value=(1, None, None)) + card._read = MagicMock(return_value=(1, bytearray())) rsp = card._transact(req_bytes, rsp_expected=True) @@ -339,7 +357,7 @@ def test_transact_returns_not_none_if_rsp_expected( def test_transact_calls_receive_if_rsp_expected( self, arrange_transact_test): card, req_bytes = arrange_transact_test() - card._read = MagicMock(return_value=(1, None, None)) + card._read = MagicMock(return_value=(1, bytearray())) card._transact(req_bytes, rsp_expected=True) @@ -347,7 +365,7 @@ def test_transact_calls_receive_if_rsp_expected( def test_transact_raises_exception_on_timeout(self, arrange_transact_test): card, req_bytes = arrange_transact_test() - card._read = MagicMock(return_value=(0, None, None)) + card._read = MagicMock(return_value=(0, bytearray())) # Force a timeout. with patch('notecard.notecard.has_timed_out', @@ -357,55 +375,64 @@ def test_transact_raises_exception_on_timeout(self, arrange_transact_test): 'available data.')): card._transact(req_bytes, rsp_expected=True) - # _read tests - def test_read_sends_the_initial_read_packet_correctly(self, arrange_test): - card = arrange_test() - card._platform_read = MagicMock() - length = 12 + # _read tests. + def test_read_sends_the_initial_read_packet_correctly( + self, arrange_read_test): + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(0, data_len, data) # To start a read from the Notecard using serial-over-I2C, the host # should send a 0 byte followed by a byte with the requested read # length. expected_packet = bytearray(2) expected_packet[0] = 0 - expected_packet[1] = length + expected_packet[1] = data_len - card._read(length) + card._read(data_len) card._platform_read.assert_called_once() assert card._platform_read.call_args[0][0] == expected_packet - def test_read_sizes_read_buf_correctly(self, arrange_test): - card = arrange_test() - card._platform_read = MagicMock() - header_length = 2 - data_length = 12 - expected_read_buffer_length = header_length + data_length + def test_read_sizes_read_buf_correctly(self, arrange_read_test): + data_len = 4 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(0, data_len, data) + header_len = 2 + expected_read_buffer_len = header_len + data_len - card._read(data_length) + card._read(data_len) card._platform_read.assert_called_once() - assert card._platform_read.call_args[0][1] == bytearray(expected_read_buffer_length) + assert len(card._platform_read.call_args[0][1]) == \ + expected_read_buffer_len - def test_read_parses_data_correctly(self, arrange_test): + def test_read_parses_data_correctly(self, arrange_read_test): available = 8 data_len = 4 data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(available, data_len, data) - def platform_read_side_effect(initiate_read_msg, read_buf): - read_buf[0] = available - read_buf[1] = data_len - read_buf[2:] = data - - card = arrange_test() - card._platform_read = MagicMock(side_effect=platform_read_side_effect) - - actual_available, actual_data_len, actual_data = card._read(len(data)) + actual_available, actual_data = card._read(len(data)) card._platform_read.assert_called_once() assert actual_available == available - assert actual_data_len == data_len assert actual_data == data + def test_read_raises_exception_if_data_length_does_not_match_data( + self, arrange_read_test): + available = 8 + # The reported length is 5, but the actual length is 4. + data_len = 5 + data = b'\xDE\xAD\xBE\xEF' + card = arrange_read_test(available, data_len, data) + + exception_msg = re.escape(('Serial-over-I2C error: reported data length' + f' ({data_len}) differs from actual data ' + f'length ({len(data)}).')) + with pytest.raises(Exception, match=exception_msg): + card._read(len(data)) + + # _write tests. def test_write_calls_platform_write_correctly(self, arrange_test): card = arrange_test() card._platform_write = MagicMock() diff --git a/test/test_notecard.py b/test/test_notecard.py index fcf8416..4cdbb8d 100644 --- a/test/test_notecard.py +++ b/test/test_notecard.py @@ -368,7 +368,7 @@ def test_command_returns_none(self): card = notecard.Notecard() card.Transaction = MagicMock() - rsp = card.Command({'cmd': 'card.sleep'}) + rsp = card.Command({'cmd': 'hub.set'}) # A command generates no response, by definition. assert rsp is None diff --git a/test/test_serial.py b/test/test_serial.py index 5f6b1e1..198e47e 100644 --- a/test/test_serial.py +++ b/test/test_serial.py @@ -155,6 +155,16 @@ def test_init_creates_appropriate_lock_type( assert isinstance(card.lock_handle, lock_type) + def test_init_fails_if_not_micropython_and_uart_has_no_in_waiting_attr( + self): + exception_msg = ('Serial communications with the Notecard are not ' + 'supported for this platform.') + + with patch('notecard.notecard.sys.implementation.name', new='cpython'): + with patch('notecard.notecard.OpenSerial.Reset'): + with pytest.raises(Exception, match=exception_msg): + notecard.OpenSerial(42) + @pytest.mark.parametrize( 'platform,available_method', [