From a48624c8f987b5c828a380f5c2ae489b0c7e6482 Mon Sep 17 00:00:00 2001 From: Brendan <2bndy5@gmail.com> Date: Thu, 24 Oct 2024 15:58:05 -0700 Subject: [PATCH] add examples/node/streamingData improve python streaming data example --- examples/node/ts/acknowledgementPayloads.ts | 50 ++--- examples/node/ts/gettingStarted.ts | 34 ++-- examples/node/ts/scanner.ts | 44 ++--- examples/node/ts/streamingData.ts | 201 ++++++++++++++++++++ examples/python/streaming_data.py | 42 ++-- library/src/radio/rf24/radio.rs | 4 + 6 files changed, 292 insertions(+), 83 deletions(-) create mode 100644 examples/node/ts/streamingData.ts diff --git a/examples/node/ts/acknowledgementPayloads.ts b/examples/node/ts/acknowledgementPayloads.ts index b7fd848..d10b196 100644 --- a/examples/node/ts/acknowledgementPayloads.ts +++ b/examples/node/ts/acknowledgementPayloads.ts @@ -72,24 +72,24 @@ export async function setup(): Promise { * The transmitting node's behavior. * @param count The number of payloads to send */ -export async function master(example: AppState, count: number | null) { - example.radio.stopListening(); +export async function master(app: AppState, count: number | null) { + app.radio.stopListening(); // we'll use a DataView object to store our string and number into a bytearray buffer const outgoing = Buffer.from("Hello \0."); for (let i = 0; i < (count || 5); i++) { - outgoing.writeUint8(example.counter, 7); + outgoing.writeUint8(app.counter, 7); const start = process.hrtime.bigint(); - const result = example.radio.send(outgoing); + const result = app.radio.send(outgoing); const end = process.hrtime.bigint(); if (result) { const elapsed = (end - start) / BigInt(1000); process.stdout.write( `Transmission successful! Time to Transmit: ${elapsed} us. Sent: ` + - `${outgoing.subarray(0, 6).toString()}${example.counter} `, + `${outgoing.subarray(0, 6).toString()}${app.counter} `, ); - example.counter += 1; - if (example.radio.available()) { - const incoming = example.radio.read(); + app.counter += 1; + if (app.radio.available()) { + const incoming = app.radio.read(); const counter = incoming.readUint8(7); console.log( ` Received: ${incoming.subarray(0, 6).toString()}${counter}`, @@ -108,36 +108,36 @@ export async function master(example: AppState, count: number | null) { * The receiving node's behavior. * @param duration The timeout duration (in seconds) to listen after receiving a payload. */ -export function slave(example: AppState, duration: number | null) { - example.radio.startListening(); +export function slave(app: AppState, duration: number | null) { + app.radio.startListening(); // we'll use a DataView object to store our string and number into a bytearray buffer const outgoing = Buffer.from("World \0."); - outgoing.writeUint8(example.counter, 7); - example.radio.writeAckPayload(1, outgoing); + outgoing.writeUint8(app.counter, 7); + app.radio.writeAckPayload(1, outgoing); let timeout = Date.now() + (duration || 6) * 1000; while (Date.now() < timeout) { - const hasRx = example.radio.availablePipe(); + const hasRx = app.radio.availablePipe(); if (hasRx.available) { - const incoming = example.radio.read(); + const incoming = app.radio.read(); const counter = incoming.readUint8(7); console.log( `Received ${incoming.length} bytes on pipe ${hasRx.pipe}: ` + `${incoming.subarray(0, 6).toString()}${counter} Sent: ` + - `${outgoing.subarray(0, 6).toString()}${example.counter}`, + `${outgoing.subarray(0, 6).toString()}${app.counter}`, ); - example.counter = counter; + app.counter = counter; outgoing.writeUint8(counter + 1, 7); - example.radio.writeAckPayload(1, outgoing); + app.radio.writeAckPayload(1, outgoing); timeout = Date.now() + (duration || 6) * 1000; } } - example.radio.stopListening(); // flushes TX FIFO when ACK payloads are enabled + app.radio.stopListening(); // flushes TX FIFO when ACK payloads are enabled } /** * This function prompts the user and performs the specified role for the radio. */ -export async function setRole(example: AppState): Promise { +export async function setRole(app: AppState): Promise { const prompt = "*** Enter 'T' to transmit\n" + "*** Enter 'R' to receive\n" + @@ -149,25 +149,25 @@ export async function setRole(example: AppState): Promise { } switch (input[0].charAt(0).toLowerCase()) { case "t": - await master(example, param); + await master(app, param); return true; case "r": - slave(example, param); + slave(app, param); return true; default: console.log(`'${input[0].charAt(0)}' is an unrecognized input`); return true; case "q": - example.radio.powerDown(); + app.radio.powerDown(); return false; } } export async function main() { - const example = await setup(); - while (await setRole(example)); + const app = await setup(); + while (await setRole(app)); io.close(); - example.radio.powerDown(); + app.radio.powerDown(); } main(); diff --git a/examples/node/ts/gettingStarted.ts b/examples/node/ts/gettingStarted.ts index 2042059..0f9fa83 100644 --- a/examples/node/ts/gettingStarted.ts +++ b/examples/node/ts/gettingStarted.ts @@ -77,16 +77,16 @@ export async function setup(): Promise { * The transmitting node's behavior. * @param count The number of payloads to send */ -export async function master(example: AppState, count: number | null) { - example.radio.stopListening(); +export async function master(app: AppState, count: number | null) { + app.radio.stopListening(); for (let i = 0; i < (count || 5); i++) { const start = process.hrtime.bigint(); - const result = example.radio.send(example.payload); + const result = app.radio.send(app.payload); const end = process.hrtime.bigint(); if (result) { const elapsed = (end - start) / BigInt(1000); console.log(`Transmission successful! Time to Transmit: ${elapsed} us`); - example.payload.writeFloatLE(example.payload.readFloatLE(0) + 0.01, 0); + app.payload.writeFloatLE(app.payload.readFloatLE(0) + 0.01, 0); } else { console.log("Transmission failed or timed out!"); } @@ -98,14 +98,14 @@ export async function master(example: AppState, count: number | null) { * The receiving node's behavior. * @param duration The timeout duration (in seconds) to listen after receiving a payload. */ -export function slave(example: AppState, duration: number | null) { - example.radio.startListening(); +export function slave(app: AppState, duration: number | null) { + app.radio.startListening(); let timeout = Date.now() + (duration || 6) * 1000; while (Date.now() < timeout) { - const hasRx = example.radio.availablePipe(); + const hasRx = app.radio.availablePipe(); if (hasRx.available) { - const incoming = example.radio.read(); - example.payload = incoming; + const incoming = app.radio.read(); + app.payload = incoming; const data = incoming.readFloatLE(0); console.log( `Received ${incoming.length} bytes on pipe ${hasRx.pipe}: ${data}`, @@ -113,13 +113,13 @@ export function slave(example: AppState, duration: number | null) { timeout = Date.now() + (duration || 6) * 1000; } } - example.radio.stopListening(); + app.radio.stopListening(); } /** * This function prompts the user and performs the specified role for the radio. */ -export async function setRole(example: AppState): Promise { +export async function setRole(app: AppState): Promise { const prompt = "*** Enter 'T' to transmit\n" + "*** Enter 'R' to receive\n" + @@ -131,25 +131,25 @@ export async function setRole(example: AppState): Promise { } switch (input[0].charAt(0).toLowerCase()) { case "t": - await master(example, param); + await master(app, param); return true; case "r": - slave(example, param); + slave(app, param); return true; default: console.log(`'${input[0].charAt(0)}' is an unrecognized input`); return true; case "q": - example.radio.powerDown(); + app.radio.powerDown(); return false; } } export async function main() { - const example = await setup(); - while (await setRole(example)); + const app = await setup(); + while (await setRole(app)); io.close(); - example.radio.powerDown(); + app.radio.powerDown(); } main(); diff --git a/examples/node/ts/scanner.ts b/examples/node/ts/scanner.ts index fb144af..c466b27 100644 --- a/examples/node/ts/scanner.ts +++ b/examples/node/ts/scanner.ts @@ -109,7 +109,7 @@ export function printHeader() { /** * The scanner behavior. */ -export async function scan(example: AppState, duration: number | null) { +export async function scan(app: AppState, duration: number | null) { printHeader(); const caches = []; for (let i = 0; i < CHANNELS; i++) { @@ -120,17 +120,17 @@ export async function scan(example: AppState, duration: number | null) { const timeout = Date.now() + (duration || 30) * 1000; while (Date.now() < timeout) { - example.radio.setChannel(channel); - example.radio.startListening(); + app.radio.setChannel(channel); + app.radio.startListening(); await timer.setTimeout(0.13); // needs to be at least 130 microseconds - const rpd = example.radio.rpd; - example.radio.stopListening(); - const foundSignal = example.radio.available(); + const rpd = app.radio.rpd; + app.radio.stopListening(); + const foundSignal = app.radio.available(); - caches[channel] += Number(foundSignal || rpd || example.radio.rpd); + caches[channel] += Number(foundSignal || rpd || app.radio.rpd); if (foundSignal) { - example.radio.flushRx(); // discard any packets (noise) saved in RX FIFO + app.radio.flushRx(); // discard any packets (noise) saved in RX FIFO } const total = caches[channel]; process.stdout.write(total > 0 ? total.toString(16) : "-"); @@ -164,21 +164,21 @@ export async function scan(example: AppState, duration: number | null) { /** * Sniff ambient noise and print it out as hexadecimal string. */ -export function noise(example: AppState, duration: number | null) { +export function noise(app: AppState, duration: number | null) { const timeout = Date.now() + (duration || 10) * 1000; - example.radio.startListening(); + app.radio.startListening(); while ( - example.radio.isListening || - example.radio.getFifoState(false) != FifoState.Empty + app.radio.isListening || + app.radio.getFifoState(false) != FifoState.Empty ) { - const payload = example.radio.read(); + const payload = app.radio.read(); const hexArray = []; for (let i = 0; i < payload.length; i++) { hexArray.push(payload[i].toString(16).padStart(2, "0")); } console.log(hexArray.join(" ")); - if (Date.now() > timeout && example.radio.isListening) { - example.radio.stopListening(); + if (Date.now() > timeout && app.radio.isListening) { + app.radio.stopListening(); } } } @@ -186,7 +186,7 @@ export function noise(example: AppState, duration: number | null) { /** * This function prompts the user and performs the specified role for the radio. */ -export async function setRole(example: AppState): Promise { +export async function setRole(app: AppState): Promise { const prompt = "*** Enter 'S' to scan\n" + "*** Enter 'N' to print noise\n" + @@ -198,25 +198,25 @@ export async function setRole(example: AppState): Promise { } switch (input[0].charAt(0).toLowerCase()) { case "s": - await scan(example, param); + await scan(app, param); return true; case "n": - noise(example, param); + noise(app, param); return true; default: console.log(`'${input[0].charAt(0)}' is an unrecognized input`); return true; case "q": - example.radio.powerDown(); + app.radio.powerDown(); return false; } } export async function main() { - const example = await setup(); - while (await setRole(example)); + const app = await setup(); + while (await setRole(app)); io.close(); - example.radio.powerDown(); + app.radio.powerDown(); } main(); diff --git a/examples/node/ts/streamingData.ts b/examples/node/ts/streamingData.ts new file mode 100644 index 0000000..d823a19 --- /dev/null +++ b/examples/node/ts/streamingData.ts @@ -0,0 +1,201 @@ +import * as readline from "readline/promises"; +import * as fs from "fs"; +import { RF24, PaLevel } from "@rf24/rf24"; + +const io = readline.createInterface({ + input: process.stdin, + output: process.stdout, +}); + +type AppState = { + radio: RF24; +}; + +console.log(module.filename); + +export async function setup(): Promise { + // The radio's CE Pin uses a GPIO number. + // On Linux, consider the device path `/dev/gpiochip`: + // - `` is the gpio chip's identifying number. + // Using RPi4 (or earlier), this number is `0` (the default). + // Using the RPi5, this number is actually `4`. + // The radio's CE pin must connected to a pin exposed on the specified chip. + const CE_PIN = 22; // for GPIO22 + // try detecting RPi5 first; fall back to default + const DEV_GPIO_CHIP = fs.existsSync("/dev/gpiochip4") ? 4 : 0; + + // The radio's CSN Pin corresponds the SPI bus's CS pin (aka CE pin). + // On Linux, consider the device path `/dev/spidev.`: + // - `` is the SPI bus number (defaults to `0`) + // - `` is the CSN pin (must be unique for each device on the same SPI bus) + const CSN_PIN = 0; // aka CE0 for SPI bus 0 (/dev/spidev0.0) + + // create a radio object for the specified hard ware config: + const radio = new RF24(CE_PIN, CSN_PIN, { + devGpioChip: DEV_GPIO_CHIP, + }); + + // initialize the nRF24L01 on the spi bus + radio.begin(); + + // For this example, we will use different addresses + // An address needs to be a buffer object (bytearray) + const address = [Buffer.from("1Node"), Buffer.from("2Node")]; + + // to use different addresses on a pair of radios, we need a variable to + // uniquely identify which address this radio will use to transmit + // 0 uses address[0] to transmit, 1 uses address[1] to transmit + const radioNumber = Number( + (await io.question( + "Which radio is this? Enter '1' or '0' (default is '0') ", + )) == "1", + ); + console.log(`radioNumber is ${radioNumber}`); + //set TX address of RX node into the TX pipe + radio.openTxPipe(address[radioNumber]); // always uses pipe 0 + // set RX address of TX node into an RX pipe + radio.openRxPipe(1, address[1 - radioNumber]); // using pipe 1 + + // set the Power Amplifier level to -12 dBm since this test example is + // usually run with nRF24L01 transceivers in close proximity of each other + radio.setPaLevel(PaLevel.Low); // PaLevel.Max is default + + return { radio: radio }; +} + +export function makePayloads(size: number): Array { + const arr = Array(); + for (let i = 0; i < size; i++) { + // prefix each payload with a letter to indicate which payloads were lost (if any) + const prefix = i + (i < 26 ? 65 : 71); + let payload = String.fromCharCode(prefix); + const middleByte = Math.abs((size - 1) / 2 - i); + for (let j = 0; j < size - 1; j++) { + const byte = + Boolean(j >= (size - 1) / 2 + middleByte) || + Boolean(j < (size - 1) / 2 - middleByte); + payload += String.fromCharCode(Number(byte) + 48); + } + arr.push(Buffer.from(payload)); + } + return arr; +} + +/** + * The transmitting node's behavior. + * @param count The number of streams to send + * @param size The number of payloads (and the payloads' size) in each stream. + */ +export async function master( + app: AppState, + count: number | null, + size: number | null, +) { + // minimum stream size should be at least 6 payloads for this example. + const payloadSize = Math.max(Math.min(size || 32, 32), 6); + const payloads = makePayloads(payloadSize); + // save on transmission time by setting the radio to only transmit the + // number of bytes we need to transmit + app.radio.setPayloadLength(payloadSize); // default is the maximum 32 bytes + + app.radio.stopListening(); // put radio into TX mode + for (let cnt = 0; cnt < (count || 1); cnt++) { + // for each stream + + let failures = 0; + const start = Date.now(); + for (let bufIndex = 0; bufIndex < payloadSize; bufIndex++) { + // for each payload in stream + while (!app.radio.write(payloads[bufIndex])) { + // upload to TX FIFO failed because TX FIFO is full. + // check status flags + app.radio.update(); + const flags = app.radio.getStatusFlags(); + if (flags.txDf) { + // transmission failed + app.radio.rewrite(); // resets txDf flag and reuses top level of TX FIFO + failures += 1; // increment manual retry count + if (failures > 99) { + // too many failures detected + break; // prevent infinite loop + } + } + } + if (failures > 99 && bufIndex < 7 && cnt < 2) { + app.radio.flushTx(); + break; // receiver radio seems unresponsive + } + } + const end = Date.now(); + console.log( + `Transmission took ${end - start} ms with ${failures} failures detected`, + ); + } +} + +/** + * The receiving node's behavior. + * @param duration The timeout duration (in seconds) to listen after receiving a payload. + * @param size The number of bytes in each payload + */ +export function slave( + app: AppState, + duration: number | null, + size: number | null, +) { + app.radio.setPayloadLength(Math.max(Math.min(size || 32, 32), 6)); + let count = 0; + app.radio.startListening(); + let timeout = Date.now() + (duration || 6) * 1000; + while (Date.now() < timeout) { + if (app.radio.available()) { + const incoming = app.radio.read(); + count += 1; + console.log(`Received: ${incoming.toString()} = ${count}`); + timeout = Date.now() + (duration || 6) * 1000; + } + } + app.radio.stopListening(); +} + +/** + * This function prompts the user and performs the specified role for the radio. + */ +export async function setRole(app: AppState): Promise { + const prompt = + "*** Enter 'T' to transmit\n" + + "*** Enter 'R' to receive\n" + + "*** Enter 'Q' to quit\n"; + const input = (await io.question(prompt)).split(" "); + let param: number | null = null; + if (input.length > 1) { + param = Number(input[1]); + } + let size = null; + if (input.length > 2) { + size = Number(input[2]); + } + switch (input[0].charAt(0).toLowerCase()) { + case "t": + await master(app, param, size); + return true; + case "r": + slave(app, param, size); + return true; + default: + console.log(`'${input[0].charAt(0)}' is an unrecognized input`); + return true; + case "q": + app.radio.powerDown(); + return false; + } +} + +export async function main() { + const app = await setup(); + while (await setRole(app)); + io.close(); + app.radio.powerDown(); +} + +main(); diff --git a/examples/python/streaming_data.py b/examples/python/streaming_data.py index cf02d34..337dc9a 100644 --- a/examples/python/streaming_data.py +++ b/examples/python/streaming_data.py @@ -59,30 +59,34 @@ # radio.print_details() -def make_buffer(buf_iter: int, size: int = 32): +def make_payloads(size: int = 32) -> list[bytes]: """return a list of payloads""" # we'll use `size` for the number of payloads in the list and the # payloads' length - # prefix payload with a sequential letter to indicate which - # payloads were lost (if any) - buff = bytes([buf_iter + (65 if 0 <= buf_iter < 26 else 71)]) - for j in range(size - 1): - char = bool(j >= (size - 1) / 2 + abs((size - 1) / 2 - buf_iter)) - char |= bool(j < (size - 1) / 2 - abs((size - 1) / 2 - buf_iter)) - buff += bytes([char + 48]) - return buff + stream = [] + for i in range(size): + # prefix payload with a sequential letter to indicate which + # payloads were lost (if any) + buff = bytes([i + (65 if 0 <= i < 26 else 71)]) + for j in range(size - 1): + char = bool(j >= (size - 1) / 2 + abs((size - 1) / 2 - i)) + char |= bool(j < (size - 1) / 2 - abs((size - 1) / 2 - i)) + buff += bytes([char + 48]) + stream.append(buff) + return stream def master(count: int = 1, size: int = 32): """Uses all 3 levels of the TX FIFO `RF24.writeFast()`""" - if size < 6: - print("setting size to 6;", size, "is not allowed for this test.") - size = 6 + # minimum number of payloads in stream should be at least 6 for this example + size = max(min(size, 32), 6) # save on transmission time by setting the radio to only transmit the - # number of bytes we need to transmit + # number of bytes we need to transmit radio.payload_length = size # the default is the maximum 32 bytes + # create a stream + stream = make_payloads(size) radio.listen = False # ensures the nRF24L01 is in TX mode for cnt in range(count): # transmit the same payloads this many times radio.flush_tx() # clear the TX FIFO so we can use all 3 levels @@ -90,16 +94,16 @@ def master(count: int = 1, size: int = 32): buf_iter = 0 # iterator of payloads for the while loop failures = 0 # keep track of manual retries start_timer = time.monotonic() * 1000 # start timer - while buf_iter < size: # cycle through all the payloads - buf = make_buffer(buf_iter, size) # make a payload - while not radio.write(buf): + for buf_index in range(size): # cycle through all payloads in stream + while not radio.write(stream[buf_iter]): # upload to TX FIFO failed because TX FIFO is full. # check for transmission errors radio.update() flags: StatusFlags = radio.get_status_flags() - if flags.tx_df: # reception failed - failures += 1 # increment manual retries - radio.rewrite() # resets the tx_df flag and reuses payload in TX FIFO + if flags.tx_df: # transmission failed + failures += 1 # increment manual retry count + # rewrite() resets the tx_df flag and reuses top level of TX FIFO + radio.rewrite() if failures > 99: break if failures > 99 and buf_iter < 7 and cnt < 2: diff --git a/library/src/radio/rf24/radio.rs b/library/src/radio/rf24/radio.rs index d15cfe3..e01f86a 100644 --- a/library/src/radio/rf24/radio.rs +++ b/library/src/radio/rf24/radio.rs @@ -240,6 +240,10 @@ where } fn resend(&mut self) -> Result { + if self.is_listening() { + // if in RX mode, prevent infinite loop below + return Ok(false); + } self.rewrite()?; self._delay_impl.delay_ns(10000); // now block until a tx_ds or tx_df event occurs