Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add support for service $23 - read memory by address #114

Merged
merged 5 commits into from
Jul 15, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ Discovers and utilizes various ISO 14229-1 services.
- ecu_reset - Reset an ECU
- testerpresent - Force an elevated session against an ECU to stay active
- dump_dids - Dumps values of Dynamic Data Identifiers (DIDs)
- read_mem - Read memory from an ECU
- auto - Fully automated diagnostics scan, by using the already existing UDS submodules

Details here: [uds module](documentation/uds.md)
Expand Down
151 changes: 150 additions & 1 deletion caringcaribou/modules/uds.py
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@
DUMP_DID_MAX = 0xFFFF
DUMP_DID_TIMEOUT = 0.2

MEM_START_ADDR = 0
MEM_LEN = 0x100
MEM_SIZE = 0x10
ADDR_BYTE_SIZE = 4
MEM_LEN_BYTE_SIZE = 2



def uds_discovery(min_id, max_id, blacklist_args, auto_blacklist_duration,
delay, verify, print_results=True):
Expand Down Expand Up @@ -1103,6 +1110,109 @@ def dump_dids(arb_id_request, arb_id_response, timeout,
print("Done!")
return responses

def __read_mem_wrapper(args):
"""Wrapper used to initiate memory read"""
arb_id_request = args.src
arb_id_response = args.dst
timeout = args.timeout
start_addr = args.start_addr
mem_length = args.mem_length
mem_size = args.mem_size
address_byte_size = args.address_byte_size
memory_length_byte_size = args.memory_length_byte_size
print_results = True
outfile = args.outfile

results = read_memory(arb_id_request, arb_id_response, timeout, start_addr, mem_length, mem_size, address_byte_size,
memory_length_byte_size, print_results)
if outfile:
with open(outfile, 'w') as f:
for addr,data in results:
f.write(f'{addr:08x} {bytes(data[1:]).hex()}\n')


def read_memory(arb_id_request, arb_id_response, timeout,
start_addr=MEM_START_ADDR, mem_length=MEM_LEN, mem_size=MEM_SIZE, address_byte_size=ADDR_BYTE_SIZE,
memory_length_byte_size=MEM_LEN_BYTE_SIZE, print_results=True):
"""
Sends read memory messages to 'arb_id_request'.
Returns a list of positive responses received from 'arb_id_response' within
'timeout' seconds or an empty list if no positive responses were received.
:param arb_id_request: arbitration ID for requests
:param arb_id_response: arbitration ID for responses
:param timeout: seconds to wait for response before timeout, or None
for default UDS timeout
:param start_addr: starting address to read
:param mem_length: maximum device identifier to read
:param mem_size: number of bytes to read from the controller
:param address_byte_size: number of bytes of the memory address parameter
:param memory_length_byte_size: number of bytes of the memory length parameter
:param print_results: whether progress should be printed to stdout
:type address_byte_size: int
:type memory_length_byte_size: int
:type arb_id_request: int
:type arb_id_response: int
:type timeout: float or None
:type start_addr: int
:type mem_length: int
:type mem_size: int
:type print_results: bool
:return: list of tuples containing memory address and response bytes on success,
empty list if no responses
:rtype [(int, [int])] or []
"""
_max_memory_space = (2 ** (8 * address_byte_size) - 1)
# Sanity checks
if isinstance(timeout, float) and timeout < 0.0:
raise ValueError("Timeout value ({0}) cannot be negative"
.format(timeout))
if start_addr < 0:
raise ValueError("Start Address '{:x}' must be a positive integer".format(start_addr))
if start_addr + mem_length > _max_memory_space:
raise OverflowError("Start Address (0x{:x}) plus Memory Length (0x{:x}) "
"will exceed the maximum memory address space (0x{:x})"
.format(start_addr, mem_length, _max_memory_space))

responses = []
with IsoTp(arb_id_request=arb_id_request,
arb_id_response=arb_id_response) as tp:

# Setup filter for incoming messages
tp.set_filter_single_arbitration_id(arb_id_response)
with Iso14229_1(tp) as uds:
# Set timeout
if timeout is not None:
uds.P3_CLIENT = timeout
if print_results:
print('Dumping Memory in range 0x{:08x}-0x{:08x}\n'.format(
start_addr, start_addr + mem_length-1))
print('Identified Addresses:')
print('Address Value (hex)')
for identifier in range(start_addr, start_addr + mem_length, mem_size):
response = uds.read_memory_by_address(memory_address=identifier, memory_size=mem_size,
address_and_length_format=(memory_length_byte_size << 4) + address_byte_size)

if response and Iso14229_1.is_positive_response(response):
# filter extraneous results ie keep only $23 responses
if response[0] == 0x63:
responses.append((identifier, response))
# response [0] = positive response SID (0x63)
# response [1:] = data returned from memory read
if print_results and len(response) >= 2:
print('0x{:08x}'.format(identifier), list_to_hex_str(response[1:]))
# got a response but it's negative
# TODO - respond differently based on response
# e.g. service not supported in active session vs service not supported vs authorization required
# right now user has to decode the NRC which is not great but we need to write code to translate raw NRC to human redable
elif response:
print(f"Could not dump 0x{mem_size:04x} bytes of memory from address 0x{identifier:08x} - received response: {bytes(response).hex(' ')}")
# this would be a good place to add code to unlock the ECU (if you know how and have the key)
# but to keep this general, we'll just notify user

if print_results:
print("\nDone!")
return responses


def __parse_args(args):
"""Parser for module arguments"""
Expand All @@ -1120,7 +1230,8 @@ def __parse_args(args):
caringcaribou uds testerpresent 0x733
caringcaribou uds security_seed 0x3 0x1 0x733 0x633 -r 1 -d 0.5
caringcaribou uds dump_dids 0x733 0x633
caringcaribou uds dump_dids 0x733 0x633 --min_did 0x6300 --max_did 0x6fff -t 0.1""")
caringcaribou uds dump_dids 0x733 0x633 --min_did 0x6300 --max_did 0x6fff -t 0.1
caringcaribou uds read_mem 0x733 0x633 --start_addr 0x0200 --mem_length 0x10000""")
subparsers = parser.add_subparsers(dest="module_function")
subparsers.required = True

Expand Down Expand Up @@ -1295,6 +1406,44 @@ def __parse_args(args):
help="maximum device identifier (DID) to read (default: 0xFFFF)")
parser_did.set_defaults(func=__dump_dids_wrapper)

# Parser for read_mem
parser_mem = subparsers.add_parser("read_mem")
parser_mem.add_argument("src",
type=parse_int_dec_or_hex,
help="arbitration ID to transmit to")
parser_mem.add_argument("dst",
type=parse_int_dec_or_hex,
help="arbitration ID to listen to")
parser_mem.add_argument("-t", "--timeout",
type=float, metavar="T",
default=DUMP_DID_TIMEOUT,
help="wait T seconds for response before "
"timeout")
parser_mem.add_argument("--start_addr",
type=parse_int_dec_or_hex,
default=MEM_START_ADDR,
help=f"starting address (default: {MEM_START_ADDR})")
parser_mem.add_argument("--mem_length",
type=parse_int_dec_or_hex,
default=MEM_LEN,
help=f"number of bytes to read (default: {MEM_LEN})")
parser_mem.add_argument("--mem_size",
type=parse_int_dec_or_hex,
default=MEM_SIZE,
help=f"numbers of bytes to return per request (default: {MEM_SIZE})")
parser_mem.add_argument("--address_byte_size",
type=parse_int_dec_or_hex,
default=ADDR_BYTE_SIZE,
help=f"numbers of bytes of the address (default: {ADDR_BYTE_SIZE})")
parser_mem.add_argument("--memory_length_byte_size",
type=parse_int_dec_or_hex,
default=MEM_LEN_BYTE_SIZE,
help=f"numbers of bytes of the memory length parameter (default: {MEM_LEN_BYTE_SIZE})")
parser_mem.add_argument("--outfile",
help="filename to write output to")
parser_mem.set_defaults(func=__read_mem_wrapper)

# Parser for auto
parser_auto = subparsers.add_parser("auto")
parser_auto.add_argument("-min",
type=parse_int_dec_or_hex, default=None,
Expand Down
25 changes: 25 additions & 0 deletions caringcaribou/tests/test_module_uds.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for adding a relevant test along with the code - this is very appreciated!

Original file line number Diff line number Diff line change
Expand Up @@ -221,4 +221,29 @@ def test_dump_dids(self):
# since we don't keep responses that don't align with the requested DID
self.assertEqual(expected_response_cnt, len(responses))

def test_read_mem(self):
timeout = None
start_addr = 1
mem_length = 0x10
mem_size = 0x10
address_byte_size = 2
memory_length_byte_size = 2
print_results = False

expected_response = [0x63]
expected_response.extend(list(range(1,0x11)))
responses = uds.read_memory(arb_id_request=self.ARB_ID_REQUEST,
arb_id_response=self.ARB_ID_RESPONSE,
timeout=timeout,
start_addr=start_addr,
mem_length=mem_length,
mem_size=mem_size,
address_byte_size=address_byte_size,
memory_length_byte_size=memory_length_byte_size,
print_results=print_results)
# make sure we got a response
# response looks like ((1, [0x63, 1, 2...]))
# so we need the second element of the first element
expected_response_cnt = 1
self.assertEqual(expected_response_cnt, len(responses))
self.assertListEqual(responses[0][1], expected_response)
4 changes: 2 additions & 2 deletions caringcaribou/utils/iso14229_1.py
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Very happy to see this bug caught and fixed 😅 Excellent!

Original file line number Diff line number Diff line change
Expand Up @@ -346,8 +346,8 @@ def read_memory_by_address(self, address_and_length_format,
:return: Response data if successful,
None otherwise
"""
addr_sz_fmt = (address_and_length_format >> 4) & 0xF
data_sz_fmt = (address_and_length_format & 0xF)
addr_sz_fmt = address_and_length_format & 0xF
data_sz_fmt = (address_and_length_format >> 4) & 0xF

request = [0] * (1 + 1 + addr_sz_fmt + data_sz_fmt)
request[0] = ServiceID.READ_MEMORY_BY_ADDRESS
Expand Down
8 changes: 5 additions & 3 deletions documentation/uds.md
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,7 @@ Supported modes:
* ecu_reset - Reset an ECU
* testerpresent - Force an elevated diagnostics session against an ECU to stay active
* dump_dids - Dumps values of Dynamic Data Identifiers (DIDs)
* read_mem - Read memory from an ECU
* auto - Fully automated diagnostics scan, by using the already existing UDS submodules

As always, module help can be shown by adding the `-h` flag (as shown below). You can also show help for a specific mode by specifying the mode followed by `-h`, e.g. `caringcaribou uds discovery -h` or `caringcaribou uds testerpresent -h`
Expand All @@ -25,13 +26,13 @@ CARING CARIBOU v0.x
Loaded module 'uds'

usage: caringcaribou uds [-h]
{discovery,services,ecu_reset,testerpresent,security_seed,dump_dids}
{discovery,services,ecu_reset,testerpresent,security_seed,dump_dids,read_mem}
...

Universal Diagnostic Services module for CaringCaribou

positional arguments:
{discovery,services,ecu_reset,testerpresent,security_seed,dump_dids}
{discovery,services,ecu_reset,testerpresent,security_seed,dump_dids,read_mem}

optional arguments:
-h, --help show this help message and exit
Expand All @@ -46,6 +47,7 @@ Example usage:
caringcaribou uds security_seed 0x3 0x1 0x733 0x633 -r 1 -d 0.5
caringcaribou uds dump_dids 0x733 0x633
caringcaribou uds dump_dids 0x733 0x633 --min_did 0x6300 --max_did 0x6fff -t 0.1
caringcaribou uds read_mem 0x733 0x633 --start_addr 0x0 --mem_length 0x1000 --mem_size 0x100 --outfile memory_0_1000_100
```

## Discovery
Expand Down Expand Up @@ -235,4 +237,4 @@ options:
-t T, --timeout T wait T seconds for response before timeout (default: 0.2)
--min_did MIN_DID minimum device identifier (DID) to read (default: 0x0000)
--max_did MAX_DID maximum device identifier (DID) to read (default: 0xFFFF)
```
```