diff --git a/.github/workflows/test.yml b/.github/workflows/test.yml new file mode 100644 index 0000000..7fffb8c --- /dev/null +++ b/.github/workflows/test.yml @@ -0,0 +1,47 @@ +name: Test and Build + +on: [push, pull_request] + +env: + CARGO_TERM_COLOR: always + +jobs: + test: + runs-on: ${{ matrix.os }} + strategy: + fail-fast: false + matrix: + include: + - os: ubuntu-latest + TARGET: x86_64-unknown-linux-musl + + - os: macos-latest + TARGET: x86_64-apple-darwin + + - os: windows-latest + TARGET: x86_64-pc-windows-msvc + + steps: + + - name: Checkout + uses: actions/checkout@v4 + + - name: Use cached dependencies + uses: Swatinem/rust-cache@v2 + with: + key: "${{ matrix.os }}-${{ matrix.TARGET }}-${{ hashFiles('**/Cargo.lock') }}" + shared-key: "shared" + + - name: Install build dependencies - Rustup + run: | + curl --proto '=https' --tlsv1.2 -sSf https://sh.rustup.rs | sh -s -- --default-toolchain stable --profile minimal --target ${{ matrix.TARGET }} -y + echo "$HOME/.cargo/bin" >> $GITHUB_PATH + + - name: Building lib for ${{ matrix.os }} ${{ matrix.target }} + run: cargo build --release --locked --target ${{matrix.target}} --verbose + + - name: Building tests for ${{ matrix.os }} ${{ matrix.target }} + run: cargo test --no-run --locked --target ${{matrix.target}} --verbose + + - name: Running tests for ${{ matrix.os }} ${{ matrix.target }} + run: cargo test --locked --target ${{matrix.target}} --verbose diff --git a/.gitignore b/.gitignore new file mode 100644 index 0000000..ea8c4bf --- /dev/null +++ b/.gitignore @@ -0,0 +1 @@ +/target diff --git a/Cargo.lock b/Cargo.lock new file mode 100644 index 0000000..172bf55 --- /dev/null +++ b/Cargo.lock @@ -0,0 +1,1169 @@ +# This file is automatically @generated by Cargo. +# It is not intended for manual editing. +version = 3 + +[[package]] +name = "addr2line" +version = "0.24.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "dfbe277e56a376000877090da837660b4427aad530e3028d44e0bffe4f89a1c1" +dependencies = [ + "gimli", +] + +[[package]] +name = "adler2" +version = "2.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "512761e0bb2578dd7380c6baaa0f4ce03e84f95e960231d1dec8bf4d7d6e2627" + +[[package]] +name = "aho-corasick" +version = "1.1.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e60d3430d3a69478ad0993f19238d2df97c507009a52b3c10addcd7f6bcb916" +dependencies = [ + "memchr", +] + +[[package]] +name = "anes" +version = "0.1.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4b46cbb362ab8752921c97e041f5e366ee6297bd428a31275b9fcf1e380f7299" + +[[package]] +name = "anstyle" +version = "1.0.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1bec1de6f59aedf83baf9ff929c98f2ad654b97c9510f4e70cf6f661d49fd5b1" + +[[package]] +name = "anyhow" +version = "1.0.89" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "86fdf8605db99b54d3cd748a44c6d04df638eb5dafb219b135d0149bd0db01f6" + +[[package]] +name = "async-trait" +version = "0.1.83" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "721cae7de5c34fbb2acd27e21e6d2cf7b886dce0c27388d46c4e6c47ea4318dd" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "autocfg" +version = "1.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ace50bade8e6234aa140d9a2f552bbee1db4d353f69b8217bc503490fc1a9f26" + +[[package]] +name = "backtrace" +version = "0.3.74" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8d82cb332cdfaed17ae235a638438ac4d4839913cc2af585c3c6746e8f8bee1a" +dependencies = [ + "addr2line", + "cfg-if", + "libc", + "miniz_oxide", + "object", + "rustc-demangle", + "windows-targets", +] + +[[package]] +name = "bitflags" +version = "1.3.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bef38d45163c2f1dde094a7dfd33ccf595c92905c8f8f4fdc18d06fb1037718a" + +[[package]] +name = "bitflags" +version = "2.6.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b048fb63fd8b5923fc5aa7b340d8e156aec7ec02f0c78fa8a6ddc2613f6f71de" + +[[package]] +name = "bumpalo" +version = "3.16.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "79296716171880943b8470b5f8d03aa55eb2e645a4874bdbb28adb49162e012c" + +[[package]] +name = "byteorder" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1fd0f2584146f6f2ef48085050886acf353beff7305ebd1ae69500e27c67f64b" + +[[package]] +name = "bytes" +version = "1.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "428d9aa8fbc0670b7b8d6030a7fadd0f86151cae55e4dbbece15f3780a3dfaf3" + +[[package]] +name = "cast" +version = "0.3.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "37b2a672a2cb129a2e41c10b1224bb368f9f37a2b16b612598138befd7b37eb5" + +[[package]] +name = "cfg-if" +version = "1.0.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "baf1de4339761588bc0619e3cbc0120ee582ebb74b53b4efbf79117bd2da40fd" + +[[package]] +name = "ciborium" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "42e69ffd6f0917f5c029256a24d0161db17cea3997d185db0d35926308770f0e" +dependencies = [ + "ciborium-io", + "ciborium-ll", + "serde", +] + +[[package]] +name = "ciborium-io" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05afea1e0a06c9be33d539b876f1ce3692f4afea2cb41f740e7743225ed1c757" + +[[package]] +name = "ciborium-ll" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "57663b653d948a338bfb3eeba9bb2fd5fcfaecb9e199e87e1eda4d9e8b240fd9" +dependencies = [ + "ciborium-io", + "half", +] + +[[package]] +name = "clap" +version = "4.5.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b97f376d85a664d5837dbae44bf546e6477a679ff6610010f17276f686d867e8" +dependencies = [ + "clap_builder", +] + +[[package]] +name = "clap_builder" +version = "4.5.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19bc80abd44e4bed93ca373a0704ccbd1b710dc5749406201bb018272808dc54" +dependencies = [ + "anstyle", + "clap_lex", +] + +[[package]] +name = "clap_lex" +version = "0.7.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1462739cb27611015575c0c11df5df7601141071f07518d56fcc1be504cbec97" + +[[package]] +name = "crc-any" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a62ec9ff5f7965e4d7280bd5482acd20aadb50d632cf6c1d74493856b011fa73" + +[[package]] +name = "criterion" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f2b12d017a929603d80db1831cd3a24082f8137ce19c69e6447f54f5fc8d692f" +dependencies = [ + "anes", + "cast", + "ciborium", + "clap", + "criterion-plot", + "futures", + "is-terminal", + "itertools", + "num-traits", + "once_cell", + "oorandom", + "plotters", + "rayon", + "regex", + "serde", + "serde_derive", + "serde_json", + "tinytemplate", + "tokio", + "walkdir", +] + +[[package]] +name = "criterion-plot" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6b50826342786a51a89e2da3a28f1c32b06e387201bc2d19791f622c673706b1" +dependencies = [ + "cast", + "itertools", +] + +[[package]] +name = "crossbeam-deque" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "613f8cc01fe9cf1a3eb3d7f488fd2fa8388403e97039e2f73692932e291a770d" +dependencies = [ + "crossbeam-epoch", + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-epoch" +version = "0.9.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5b82ac4a3c2ca9c3460964f020e1402edd5753411d7737aa39c3714ad1b5420e" +dependencies = [ + "crossbeam-utils", +] + +[[package]] +name = "crossbeam-utils" +version = "0.8.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "22ec99545bb0ed0ea7bb9b8e1e9122ea386ff8a48c0922e43f36d45ab09e0e80" + +[[package]] +name = "crunchy" +version = "0.2.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "7a81dae078cea95a014a339291cec439d2f232ebe854a9d672b796c6afafa9b7" + +[[package]] +name = "dev-utils" +version = "0.1.0" +dependencies = [ + "mavlink", + "rand", +] + +[[package]] +name = "either" +version = "1.13.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "60b1af1c220855b6ceac025d3f6ecdd2b7c4894bfe9cd9bda4fbb4bc7c0d4cf0" + +[[package]] +name = "futures" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65bc07b1a8bc7c85c5f2e110c476c7389b4554ba72af57d8445ea63a576b0876" +dependencies = [ + "futures-channel", + "futures-core", + "futures-executor", + "futures-io", + "futures-sink", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-channel" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2dff15bf788c671c1934e366d07e30c1814a8ef514e1af724a602e8a2fbe1b10" +dependencies = [ + "futures-core", + "futures-sink", +] + +[[package]] +name = "futures-core" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "05f29059c0c2090612e8d742178b0580d2dc940c837851ad723096f87af6663e" + +[[package]] +name = "futures-executor" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e28d1d997f585e54aebc3f97d39e72338912123a67330d723fdbb564d646c9f" +dependencies = [ + "futures-core", + "futures-task", + "futures-util", +] + +[[package]] +name = "futures-io" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9e5c1b78ca4aae1ac06c48a526a655760685149f0d465d21f37abfe57ce075c6" + +[[package]] +name = "futures-macro" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "162ee34ebcb7c64a8abebc059ce0fee27c2262618d7b60ed8faf72fef13c3650" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "futures-sink" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e575fab7d1e0dcb8d0c7bcf9a63ee213816ab51902e6d244a95819acacf1d4f7" + +[[package]] +name = "futures-task" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f90f7dce0722e95104fcb095585910c0977252f286e354b5e3bd38902cd99988" + +[[package]] +name = "futures-util" +version = "0.3.31" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9fa08315bb612088cc391249efdc3bc77536f16c91f6cf495e6fbe85b20a4a81" +dependencies = [ + "futures-channel", + "futures-core", + "futures-io", + "futures-macro", + "futures-sink", + "futures-task", + "memchr", + "pin-project-lite", + "pin-utils", + "slab", +] + +[[package]] +name = "getrandom" +version = "0.2.15" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c4567c8db10ae91089c99af84c68c38da3ec2f087c3f82960bcdbf3656b6f4d7" +dependencies = [ + "cfg-if", + "libc", + "wasi", +] + +[[package]] +name = "gimli" +version = "0.31.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07e28edb80900c19c28f1072f2e8aeca7fa06b23cd4169cefe1af5aa3260783f" + +[[package]] +name = "half" +version = "2.4.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6dd08c532ae367adf81c312a4580bc67f1d0fe8bc9c460520283f4c0ff277888" +dependencies = [ + "cfg-if", + "crunchy", +] + +[[package]] +name = "hermit-abi" +version = "0.3.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d231dfb89cfffdbc30e7fc41579ed6066ad03abda9e567ccafae602b97ec5024" + +[[package]] +name = "hermit-abi" +version = "0.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fbf6a919d6cf397374f7dfeeea91d974c7c0a7221d0d0f4f20d859d329e53fcc" + +[[package]] +name = "is-terminal" +version = "0.4.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "261f68e344040fbd0edea105bef17c66edf46f984ddb1115b775ce31be948f4b" +dependencies = [ + "hermit-abi 0.4.0", + "libc", + "windows-sys", +] + +[[package]] +name = "itertools" +version = "0.10.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b0fd2260e829bddf4cb6ea802289de2f86d6a7a690192fbe91b3f46e0f2c8473" +dependencies = [ + "either", +] + +[[package]] +name = "itoa" +version = "1.0.11" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "49f1f14873335454500d59611f1cf4a4b0f786f9ac11f4312a78e4cf2566695b" + +[[package]] +name = "js-sys" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6a88f1bda2bd75b0452a14784937d796722fdebfe50df998aeb3f0b7603019a9" +dependencies = [ + "wasm-bindgen", +] + +[[package]] +name = "lazy_static" +version = "1.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bbd2bcb4c963f2ddae06a2efc7e9f3591312473c50c6685e1f298068316e66fe" + +[[package]] +name = "libc" +version = "0.2.159" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "561d97a539a36e26a9a5fad1ea11a3039a67714694aaa379433e580854bc3dc5" + +[[package]] +name = "lock_api" +version = "0.4.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "07af8b9cdd281b7915f413fa73f29ebd5d55d0d3f0155584dade1ff18cea1b17" +dependencies = [ + "autocfg", + "scopeguard", +] + +[[package]] +name = "log" +version = "0.4.22" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a7a70ba024b9dc04c27ea2f0c0548feb474ec5c54bba33a7f72f873a39d07b24" + +[[package]] +name = "mavlink" +version = "0.13.2" +source = "git+https://github.com/mavlink/rust-mavlink#5f2ecbe856a02e7bd17f43de63c8136db7947715" +dependencies = [ + "bitflags 1.3.2", + "mavlink-bindgen", + "mavlink-core", + "num-derive", + "num-traits", +] + +[[package]] +name = "mavlink-bindgen" +version = "0.13.2" +source = "git+https://github.com/mavlink/rust-mavlink#5f2ecbe856a02e7bd17f43de63c8136db7947715" +dependencies = [ + "crc-any", + "lazy_static", + "proc-macro2", + "quick-xml", + "quote", + "thiserror", +] + +[[package]] +name = "mavlink-codec" +version = "0.1.0" +dependencies = [ + "anyhow", + "bytes", + "criterion", + "dev-utils", + "futures", + "log", + "mavlink", + "rand", + "thiserror", + "tokio", + "tokio-stream", + "tokio-util", + "tracing", +] + +[[package]] +name = "mavlink-core" +version = "0.13.2" +source = "git+https://github.com/mavlink/rust-mavlink#5f2ecbe856a02e7bd17f43de63c8136db7947715" +dependencies = [ + "async-trait", + "byteorder", + "crc-any", + "tokio", +] + +[[package]] +name = "memchr" +version = "2.7.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "78ca9ab1a0babb1e7d5695e3530886289c18cf2f87ec19a575a0abdce112e3a3" + +[[package]] +name = "miniz_oxide" +version = "0.8.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2d80299ef12ff69b16a84bb182e3b9df68b5a91574d3d4fa6e41b65deec4df1" +dependencies = [ + "adler2", +] + +[[package]] +name = "mio" +version = "1.0.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "80e04d1dcff3aae0704555fe5fee3bcfaf3d1fdf8a7e521d5b9d2b42acb52cec" +dependencies = [ + "hermit-abi 0.3.9", + "libc", + "wasi", + "windows-sys", +] + +[[package]] +name = "num-derive" +version = "0.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ed3955f1a9c7c0c15e092f9c887db08b1fc683305fdf6eb6684f22555355e202" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "num-traits" +version = "0.2.19" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "071dfc062690e90b734c0b2273ce72ad0ffa95f0c74596bc250dcfd960262841" +dependencies = [ + "autocfg", +] + +[[package]] +name = "object" +version = "0.36.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "aedf0a2d09c573ed1d8d85b30c119153926a2b36dce0ab28322c09a117a4683e" +dependencies = [ + "memchr", +] + +[[package]] +name = "once_cell" +version = "1.20.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1261fe7e33c73b354eab43b1273a57c8f967d0391e80353e51f764ac02cf6775" + +[[package]] +name = "oorandom" +version = "11.1.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b410bbe7e14ab526a0e86877eb47c6996a2bd7746f027ba551028c925390e4e9" + +[[package]] +name = "parking_lot" +version = "0.12.3" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f1bf18183cf54e8d6059647fc3063646a1801cf30896933ec2311622cc4b9a27" +dependencies = [ + "lock_api", + "parking_lot_core", +] + +[[package]] +name = "parking_lot_core" +version = "0.9.10" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1e401f977ab385c9e4e3ab30627d6f26d00e2c73eef317493c4ec6d468726cf8" +dependencies = [ + "cfg-if", + "libc", + "redox_syscall", + "smallvec", + "windows-targets", +] + +[[package]] +name = "pin-project-lite" +version = "0.2.14" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "bda66fc9667c18cb2758a2ac84d1167245054bcf85d5d1aaa6923f45801bdd02" + +[[package]] +name = "pin-utils" +version = "0.1.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8b870d8c151b6f2fb93e84a13146138f05d02ed11c7e7c54f8826aaaf7c9f184" + +[[package]] +name = "plotters" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "5aeb6f403d7a4911efb1e33402027fc44f29b5bf6def3effcc22d7bb75f2b747" +dependencies = [ + "num-traits", + "plotters-backend", + "plotters-svg", + "wasm-bindgen", + "web-sys", +] + +[[package]] +name = "plotters-backend" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "df42e13c12958a16b3f7f4386b9ab1f3e7933914ecea48da7139435263a4172a" + +[[package]] +name = "plotters-svg" +version = "0.3.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "51bae2ac328883f7acdfea3d66a7c35751187f870bc81f94563733a154d7a670" +dependencies = [ + "plotters-backend", +] + +[[package]] +name = "ppv-lite86" +version = "0.2.20" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "77957b295656769bb8ad2b6a6b09d897d94f05c41b069aede1fcdaa675eaea04" +dependencies = [ + "zerocopy", +] + +[[package]] +name = "proc-macro2" +version = "1.0.87" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b3e4daa0dcf6feba26f985457cdf104d4b4256fc5a09547140f3631bb076b19a" +dependencies = [ + "unicode-ident", +] + +[[package]] +name = "quick-xml" +version = "0.36.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f7649a7b4df05aed9ea7ec6f628c67c9953a43869b8bc50929569b2999d443fe" +dependencies = [ + "memchr", +] + +[[package]] +name = "quote" +version = "1.0.37" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b5b9d34b8991d19d98081b46eacdd8eb58c6f2b201139f7c5f643cc155a633af" +dependencies = [ + "proc-macro2", +] + +[[package]] +name = "rand" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34af8d1a0e25924bc5b7c43c079c942339d8f0a8b57c39049bef581b46327404" +dependencies = [ + "libc", + "rand_chacha", + "rand_core", +] + +[[package]] +name = "rand_chacha" +version = "0.3.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e6c10a63a0fa32252be49d21e7709d4d4baf8d231c2dbce1eaa8141b9b127d88" +dependencies = [ + "ppv-lite86", + "rand_core", +] + +[[package]] +name = "rand_core" +version = "0.6.4" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ec0be4795e2f6a28069bec0b5ff3e2ac9bafc99e6a9a7dc3547996c5c816922c" +dependencies = [ + "getrandom", +] + +[[package]] +name = "rayon" +version = "1.10.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "b418a60154510ca1a002a752ca9714984e21e4241e804d32555251faf8b78ffa" +dependencies = [ + "either", + "rayon-core", +] + +[[package]] +name = "rayon-core" +version = "1.12.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1465873a3dfdaa8ae7cb14b4383657caab0b3e8a0aa9ae8e04b044854c8dfce2" +dependencies = [ + "crossbeam-deque", + "crossbeam-utils", +] + +[[package]] +name = "redox_syscall" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b6dfecf2c74bce2466cabf93f6664d6998a69eb21e39f4207930065b27b771f" +dependencies = [ + "bitflags 2.6.0", +] + +[[package]] +name = "regex" +version = "1.11.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "38200e5ee88914975b69f657f0801b6f6dccafd44fd9326302a4aaeecfacb1d8" +dependencies = [ + "aho-corasick", + "memchr", + "regex-automata", + "regex-syntax", +] + +[[package]] +name = "regex-automata" +version = "0.4.8" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "368758f23274712b504848e9d5a6f010445cc8b87a7cdb4d7cbee666c1288da3" +dependencies = [ + "aho-corasick", + "memchr", + "regex-syntax", +] + +[[package]] +name = "regex-syntax" +version = "0.8.5" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "2b15c43186be67a4fd63bee50d0303afffcef381492ebe2c5d87f324e1b8815c" + +[[package]] +name = "rustc-demangle" +version = "0.1.24" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "719b953e2095829ee67db738b3bfa9fa368c94900df327b3f07fe6e794d2fe1f" + +[[package]] +name = "ryu" +version = "1.0.18" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f3cb5ba0dc43242ce17de99c180e96db90b235b8a9fdc9543c96d2209116bd9f" + +[[package]] +name = "same-file" +version = "1.0.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "93fc1dc3aaa9bfed95e02e6eadabb4baf7e3078b0bd1b4d7b6b0b68378900502" +dependencies = [ + "winapi-util", +] + +[[package]] +name = "scopeguard" +version = "1.2.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "94143f37725109f92c262ed2cf5e59bce7498c01bcc1502d7b9afe439a4e9f49" + +[[package]] +name = "serde" +version = "1.0.210" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c8e3592472072e6e22e0a54d5904d9febf8508f65fb8552499a1abc7d1078c3a" +dependencies = [ + "serde_derive", +] + +[[package]] +name = "serde_derive" +version = "1.0.210" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "243902eda00fad750862fc144cea25caca5e20d615af0a81bee94ca738f1df1f" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "serde_json" +version = "1.0.128" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "6ff5456707a1de34e7e37f2a6fd3d3f808c318259cbd01ab6377795054b483d8" +dependencies = [ + "itoa", + "memchr", + "ryu", + "serde", +] + +[[package]] +name = "signal-hook-registry" +version = "1.4.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a9e9e0b4211b72e7b8b6e85c807d36c212bdb33ea8587f7569562a84df5465b1" +dependencies = [ + "libc", +] + +[[package]] +name = "slab" +version = "0.4.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8f92a496fb766b417c996b9c5e57daf2f7ad3b0bebe1ccfca4856390e3d3bb67" +dependencies = [ + "autocfg", +] + +[[package]] +name = "smallvec" +version = "1.13.2" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "3c5e1a9a646d36c3599cd173a41282daf47c44583ad367b8e6837255952e5c67" + +[[package]] +name = "socket2" +version = "0.5.7" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ce305eb0b4296696835b71df73eb912e0f1ffd2556a501fcede6e0c50349191c" +dependencies = [ + "libc", + "windows-sys", +] + +[[package]] +name = "syn" +version = "2.0.79" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "89132cd0bf050864e1d38dc3bbc07a0eb8e7530af26344d3d2bbbef83499f590" +dependencies = [ + "proc-macro2", + "quote", + "unicode-ident", +] + +[[package]] +name = "thiserror" +version = "1.0.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "d50af8abc119fb8bb6dbabcfa89656f46f84aa0ac7688088608076ad2b459a84" +dependencies = [ + "thiserror-impl", +] + +[[package]] +name = "thiserror-impl" +version = "1.0.64" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "08904e7672f5eb876eaaf87e0ce17857500934f4981c4a0ab2b4aa98baac7fc3" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tinytemplate" +version = "1.2.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "be4d6b5f19ff7664e8c98d03e2139cb510db9b0a60b55f8e8709b689d939b6bc" +dependencies = [ + "serde", + "serde_json", +] + +[[package]] +name = "tokio" +version = "1.40.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e2b070231665d27ad9ec9b8df639893f46727666c6767db40317fbe920a5d998" +dependencies = [ + "backtrace", + "bytes", + "libc", + "mio", + "parking_lot", + "pin-project-lite", + "signal-hook-registry", + "socket2", + "tokio-macros", + "windows-sys", +] + +[[package]] +name = "tokio-macros" +version = "2.4.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "693d596312e88961bc67d7f1f97af8a70227d9f90c31bba5806eec004978d752" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tokio-stream" +version = "0.1.16" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "4f4e6ce100d0eb49a2734f8c0812bcd324cf357d21810932c5df6b96ef2b86f1" +dependencies = [ + "futures-core", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tokio-util" +version = "0.7.12" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "61e7c3654c13bcd040d4a03abee2c75b1d14a37b423cf5a813ceae1cc903ec6a" +dependencies = [ + "bytes", + "futures-core", + "futures-sink", + "pin-project-lite", + "tokio", +] + +[[package]] +name = "tracing" +version = "0.1.40" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c3523ab5a71916ccf420eebdf5521fcef02141234bbc0b8a49f2fdc4544364ef" +dependencies = [ + "pin-project-lite", + "tracing-attributes", + "tracing-core", +] + +[[package]] +name = "tracing-attributes" +version = "0.1.27" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "34704c8d6ebcbc939824180af020566b01a7c01f80641264eba0999f6c2b6be7" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] + +[[package]] +name = "tracing-core" +version = "0.1.32" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "c06d3da6113f116aaee68e4d601191614c9053067f9ab7f6edbcb161237daa54" +dependencies = [ + "once_cell", +] + +[[package]] +name = "unicode-ident" +version = "1.0.13" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e91b56cd4cadaeb79bbf1a5645f6b4f8dc5bde8834ad5894a8db35fda9efa1fe" + +[[package]] +name = "walkdir" +version = "2.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "29790946404f91d9c5d06f9874efddea1dc06c5efe94541a7d6863108e3a5e4b" +dependencies = [ + "same-file", + "winapi-util", +] + +[[package]] +name = "wasi" +version = "0.11.0+wasi-snapshot-preview1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9c8d87e72b64a3b4db28d11ce29237c246188f4f51057d65a7eab63b7987e423" + +[[package]] +name = "wasm-bindgen" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "128d1e363af62632b8eb57219c8fd7877144af57558fb2ef0368d0087bddeb2e" +dependencies = [ + "cfg-if", + "once_cell", + "wasm-bindgen-macro", +] + +[[package]] +name = "wasm-bindgen-backend" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cb6dd4d3ca0ddffd1dd1c9c04f94b868c37ff5fac97c30b97cff2d74fce3a358" +dependencies = [ + "bumpalo", + "log", + "once_cell", + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-macro" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "e79384be7f8f5a9dd5d7167216f022090cf1f9ec128e6e6a482a2cb5c5422c56" +dependencies = [ + "quote", + "wasm-bindgen-macro-support", +] + +[[package]] +name = "wasm-bindgen-macro-support" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "26c6ab57572f7a24a4985830b120de1594465e5d500f24afe89e16b4e833ef68" +dependencies = [ + "proc-macro2", + "quote", + "syn", + "wasm-bindgen-backend", + "wasm-bindgen-shared", +] + +[[package]] +name = "wasm-bindgen-shared" +version = "0.2.95" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "65fc09f10666a9f147042251e0dda9c18f166ff7de300607007e96bdebc1068d" + +[[package]] +name = "web-sys" +version = "0.3.72" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "f6488b90108c040df0fe62fa815cbdee25124641df01814dd7282749234c6112" +dependencies = [ + "js-sys", + "wasm-bindgen", +] + +[[package]] +name = "winapi-util" +version = "0.1.9" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "cf221c93e13a30d793f7645a0e7762c55d169dbb0a49671918a2319d289b10bb" +dependencies = [ + "windows-sys", +] + +[[package]] +name = "windows-sys" +version = "0.52.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "282be5f36a8ce781fad8c8ae18fa3f9beff57ec1b52cb3de0789201425d9a33d" +dependencies = [ + "windows-targets", +] + +[[package]] +name = "windows-targets" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "9b724f72796e036ab90c1021d4780d4d3d648aca59e491e6b98e725b84e99973" +dependencies = [ + "windows_aarch64_gnullvm", + "windows_aarch64_msvc", + "windows_i686_gnu", + "windows_i686_gnullvm", + "windows_i686_msvc", + "windows_x86_64_gnu", + "windows_x86_64_gnullvm", + "windows_x86_64_msvc", +] + +[[package]] +name = "windows_aarch64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "32a4622180e7a0ec044bb555404c800bc9fd9ec262ec147edd5989ccd0c02cd3" + +[[package]] +name = "windows_aarch64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "09ec2a7bb152e2252b53fa7803150007879548bc709c039df7627cabbd05d469" + +[[package]] +name = "windows_i686_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "8e9b5ad5ab802e97eb8e295ac6720e509ee4c243f69d781394014ebfe8bbfa0b" + +[[package]] +name = "windows_i686_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "0eee52d38c090b3caa76c563b86c3a4bd71ef1a819287c19d586d7334ae8ed66" + +[[package]] +name = "windows_i686_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "240948bc05c5e7c6dabba28bf89d89ffce3e303022809e73deaefe4f6ec56c66" + +[[package]] +name = "windows_x86_64_gnu" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "147a5c80aabfbf0c7d901cb5895d1de30ef2907eb21fbbab29ca94c5b08b1a78" + +[[package]] +name = "windows_x86_64_gnullvm" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "24d5b23dc417412679681396f2b49f3de8c1473deb516bd34410872eff51ed0d" + +[[package]] +name = "windows_x86_64_msvc" +version = "0.52.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "589f6da84c646204747d1270a2a5661ea66ed1cced2631d546fdfb155959f9ec" + +[[package]] +name = "zerocopy" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "1b9b4fd18abc82b8136838da5d50bae7bdea537c574d8dc1a34ed098d6c166f0" +dependencies = [ + "byteorder", + "zerocopy-derive", +] + +[[package]] +name = "zerocopy-derive" +version = "0.7.35" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "fa4f8080344d4671fb4e831a13ad1e68092748387dfc4f55e356242fae12ce3e" +dependencies = [ + "proc-macro2", + "quote", + "syn", +] diff --git a/Cargo.toml b/Cargo.toml new file mode 100644 index 0000000..e8c57cd --- /dev/null +++ b/Cargo.toml @@ -0,0 +1,44 @@ +[package] +name = "mavlink-codec" +authors = ["João Antônio Cardoso "] +license = "MIT" +version = "0.1.0" +edition = "2021" + +[lib] +name = "mavlink_codec" +path = "src/lib.rs" +bench = false + +# [profile.release] +# codegen-units = 1 +# lto = "fat" +# panic = "abort" +# opt-level = 3 + +[dependencies] +bytes = "1.7" +log = "0.4" +mavlink = { default-features = false, features = ["std", "ardupilotmega", "tokio-1"], git = "https://github.com/mavlink/rust-mavlink", hash = "5f2ecbe8" } +thiserror = "1.0" +tokio-util = { version = "0.7", features = ["codec"] } + +[features] +default = ["std"] +std = [] + +[dev-dependencies] +anyhow = "1.0" +criterion = { version = "0.5", features = ["async_tokio"] } +dev-utils = { path = "dev_utils" } +futures = "0.3" +mavlink = { default-features = false, features = ["std", "ardupilotmega", "tokio-1"], git = "https://github.com/mavlink/rust-mavlink", hash = "5f2ecbe8" } +rand = "0.8" +tokio = { version = "1", features = ["full"] } +tokio-stream = "0.1" +tokio-util = "0.7" +tracing = "0.1" + +[[bench]] +name = "bench" +harness = false diff --git a/LICENSE b/LICENSE new file mode 100644 index 0000000..5851ca4 --- /dev/null +++ b/LICENSE @@ -0,0 +1,21 @@ +MIT License + +Copyright (c) 2024 Blue Robotics + +Permission is hereby granted, free of charge, to any person obtaining a copy +of this software and associated documentation files (the "Software"), to deal +in the Software without restriction, including without limitation the rights +to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +copies of the Software, and to permit persons to whom the Software is +furnished to do so, subject to the following conditions: + +The above copyright notice and this permission notice shall be included in all +copies or substantial portions of the Software. + +THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +SOFTWARE. diff --git a/benches/bench.rs b/benches/bench.rs new file mode 100644 index 0000000..b2b0c79 --- /dev/null +++ b/benches/bench.rs @@ -0,0 +1,141 @@ +use criterion::{ + black_box, criterion_group, criterion_main, AxisScale, BenchmarkId, Criterion, + PlotConfiguration, Throughput, +}; +use mavlink::Message; +use mavlink_codec::{codec::MavlinkCodec, v2::V2Packet}; +use rand::{prelude::StdRng, SeedableRng}; +use tokio_stream::StreamExt; +use tokio_util::codec::{Decoder, FramedRead}; + +fn add_random_v2_message(buf: &mut Vec, rng: &mut StdRng) { + use rand::Rng; + + use mavlink::ardupilotmega::*; + + let header = mavlink::MavHeader { + system_id: rng.gen_range(1..255), + component_id: rng.gen_range(1..255), + sequence: rng.gen_range(0..255), + }; + + loop { + let message_id = rng.gen_range(0..2 ^ 24); + if let Ok(data) = MavMessage::default_message_from_id(message_id) { + if mavlink::write_v2_msg(buf, header, &data).is_ok() { + break; + } + }; + } +} + +fn benchmark_decode(c: &mut Criterion) { + let seed = 42; + println!("Using seed {seed:?}"); + let mut rng: StdRng = SeedableRng::seed_from_u64(seed); + + let mut group = c.benchmark_group("decode"); + group.confidence_level(0.95).sample_size(100); + + let plot_config = PlotConfiguration::default().summary_scale(AxisScale::Logarithmic); + + group.plot_config(plot_config); + + let messages_counts = vec![1, 5, 10, 50, 100, 500, 1000, 5000, 10000, 50000, 100000]; + + let rt = tokio::runtime::Runtime::new().unwrap(); + + for messages_count in &messages_counts { + group.throughput(Throughput::Elements(*messages_count)); + + let mut buf: Vec = + Vec::with_capacity(V2Packet::MAX_PACKET_SIZE * *messages_count as usize); + for _ in 0..*messages_count { + add_random_v2_message(&mut buf, &mut rng); + } + + group.bench_with_input( + BenchmarkId::new("rust-mavlink", messages_count), + messages_count, + |b, &messages_count| { + let buf = buf.clone(); + + b.to_async(&rt).iter(|| async { + let mut reader = mavlink::peek_reader::PeekReader::new(&buf[..]); + + for _ in 0..messages_count { + let _msg = black_box( + mavlink::read_v2_raw_message::( + &mut reader, + ) + .unwrap(), + ); + } + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new("rust-mavlink-async", messages_count), + messages_count, + |b, &messages_count| { + let buf = buf.clone(); + + b.to_async(&rt).iter(|| async { + let mut reader = mavlink::async_peek_reader::AsyncPeekReader::new(&buf[..]); + + for _ in 0..messages_count { + let _msg = black_box( + mavlink::read_v2_raw_message_async::< + mavlink::ardupilotmega::MavMessage, + _, + >(&mut reader) + .await + .unwrap(), + ); + } + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new("decoder-decode", messages_count), + messages_count, + |b, &messages_count| { + let buf = buf.clone(); // Reset buffer each time + + b.to_async(&rt).iter(|| async { + let mut buf = bytes::BytesMut::from(buf.as_slice()); + let mut codec = + MavlinkCodec::::default(); + + for _ in 0..messages_count { + let _msg = black_box(codec.decode(&mut buf).unwrap().unwrap()); + } + }) + }, + ); + + group.bench_with_input( + BenchmarkId::new("decoder-framed.next", messages_count), + messages_count, + |b, &messages_count| { + let buf = buf.clone(); + + b.to_async(&rt).iter(|| async { + let codec = MavlinkCodec::::default(); + let mut framed = FramedRead::new(buf.as_slice(), codec); + + for _ in 0..messages_count { + let _msg = black_box(framed.next().await.unwrap().unwrap()); + } + }) + }, + ); + } + + group.finish(); +} + +criterion_group!(benches, benchmark_decode); +criterion_main!(benches); diff --git a/dev_utils/Cargo.toml b/dev_utils/Cargo.toml new file mode 100644 index 0000000..e381f5d --- /dev/null +++ b/dev_utils/Cargo.toml @@ -0,0 +1,10 @@ +[package] +name = "dev-utils" +authors = ["João Antônio Cardoso "] +license = "MIT" +version = "0.1.0" +edition = "2021" + +[dependencies] +mavlink = { default-features = false, features = ["std", "ardupilotmega", "tokio-1"], git = "https://github.com/mavlink/rust-mavlink", hash = "5f2ecbe8" } +rand = "0.8" diff --git a/dev_utils/src/lib.rs b/dev_utils/src/lib.rs new file mode 100644 index 0000000..7c7c14b --- /dev/null +++ b/dev_utils/src/lib.rs @@ -0,0 +1,84 @@ +use std::io::Write; + +use mavlink::{MAVLinkV1MessageRaw, MAVLinkV2MessageRaw}; +use rand::{prelude::StdRng, Rng}; + +pub fn add_random_v1_message(buf: &mut Vec, rng: &mut StdRng) { + let raw_v1_message = create_random_v1_raw_message(rng); + + let bytes = raw_v1_message.raw_bytes(); + + let max_packet_size = 263; + let checksum_size = std::mem::size_of::(); + let size_without_checksum = bytes.len() - checksum_size; + + buf.write(&bytes[..size_without_checksum]).unwrap(); + for _ in 0..max_packet_size - checksum_size - size_without_checksum { + buf.push(0); + } + buf.write(&bytes[bytes.len() - checksum_size..]).unwrap(); +} + +pub fn create_random_v1_raw_message(rng: &mut StdRng) -> MAVLinkV1MessageRaw { + use mavlink::{ardupilotmega::*, Message}; + + let header = mavlink::MavHeader { + system_id: rng.gen_range(1..255), + component_id: rng.gen_range(1..255), + sequence: rng.gen_range(0..255), + }; + + loop { + let message_id = rng.gen_range(0..2 ^ 24); + if let Ok(message_data) = MavMessage::default_message_from_id(message_id) { + let mut raw_v1_message = MAVLinkV1MessageRaw::new(); + + raw_v1_message.serialize_message(header, &message_data); + + return raw_v1_message; + }; + } +} + +pub fn add_random_v2_message(buf: &mut Vec, rng: &mut StdRng) { + let raw_v2_message = create_random_v2_raw_message(rng); + + buf.write(raw_v2_message.raw_bytes()).unwrap(); +} + +pub fn create_random_v2_raw_message(rng: &mut StdRng) -> MAVLinkV2MessageRaw { + use mavlink::{ardupilotmega::*, Message}; + + let header = mavlink::MavHeader { + system_id: rng.gen_range(1..255), + component_id: rng.gen_range(1..255), + sequence: rng.gen_range(0..255), + }; + + loop { + let message_id = rng.gen_range(0..2 ^ 24); + if let Ok(message_data) = MavMessage::default_message_from_id(message_id) { + let mut raw_v2_message = MAVLinkV2MessageRaw::new(); + + raw_v2_message.serialize_message(header, &message_data); + + return raw_v2_message; + }; + } +} + +pub fn chunk_buffer_randomly(buf: &[u8], rng: &mut StdRng, min: usize, max: usize) -> Vec> { + let mut chunks = Vec::new(); + let mut remaining = buf.len(); + let mut start = 0; + + while remaining > 0 { + let chunk_size = rng.gen_range(min..=max).min(remaining); + let end = start + chunk_size; + chunks.push(buf[start..end].to_vec()); + start = end; + remaining -= chunk_size; + } + + chunks +} diff --git a/src/codec.rs b/src/codec.rs new file mode 100644 index 0000000..19d5ad3 --- /dev/null +++ b/src/codec.rs @@ -0,0 +1,556 @@ +use bytes::{Buf, BufMut, BytesMut}; +use log::trace; +use mavlink::calculate_crc; +use tokio_util::codec::{Decoder, Encoder}; + +use crate::{ + error::DecoderError, + v1::{self, V1Packet, V1_STX}, + v2::{self, V2Packet, MAVLINK_SUPPORTED_IFLAGS, V2_STX}, + Packet, +}; + +#[derive(Debug, Default)] +pub struct MavlinkCodec< + const ACCEPT_V1: bool, + const ACCEPT_V2: bool, + const DROP_INVALID_SYSID: bool, + const DROP_INVALID_COMPID: bool, + const SKIP_CRC_VALIDATION: bool, + const DROP_INCOMPATIBLE: bool, +> { + pub state: CodecState, +} + +#[derive(Debug, Default)] +pub enum CodecState { + #[default] + WaitingForStx, + WaitingV1Packet, + WaitingV2Packet, + ValidatingV1Packet, + ValidatingV2Packet, + CopyV1Packet, + CopyV2Packet, +} + +impl< + const ACCEPT_V1: bool, + const ACCEPT_V2: bool, + const DROP_INVALID_SYSID: bool, + const DROP_INVALID_COMPID: bool, + const SKIP_CRC_VALIDATION: bool, + const DROP_INCOMPATIBLE: bool, + > Decoder + for MavlinkCodec< + ACCEPT_V1, + ACCEPT_V2, + DROP_INVALID_SYSID, + DROP_INVALID_COMPID, + SKIP_CRC_VALIDATION, + DROP_INCOMPATIBLE, + > +{ + type Item = Result; + type Error = std::io::Error; + + fn decode(&mut self, buf: &mut BytesMut) -> Result, Self::Error> { + trace!("Decoding: {:?}", &buf[..]); + + loop { + match self.state { + CodecState::WaitingForStx => { + if buf.is_empty() { + if ACCEPT_V2 { + buf.reserve(V2Packet::MAX_PACKET_SIZE); + } else { + buf.reserve(V1Packet::MAX_PACKET_SIZE); + } + + trace!( + "Not enough data, buf.len: {:?}, buf.capacity: {:?}", + buf.len(), + buf.capacity() + ); + return Ok(None); + } + + match buf[0] { + V1_STX if ACCEPT_V1 => self.state = CodecState::WaitingV1Packet, + V2_STX if ACCEPT_V2 => self.state = CodecState::WaitingV2Packet, + _ => { + trace!("Invalid STX byte: {}", buf[0]); + buf.advance(V1Packet::STX_SIZE); + continue; + } + } + } + // V1 Codec + CodecState::WaitingV1Packet if ACCEPT_V1 => { + if buf.len() < V1Packet::MAX_PACKET_SIZE { + buf.reserve(V1Packet::MAX_PACKET_SIZE); + + trace!( + "Not enough data, buf.len: {:?}, buf.capacity: {:?}", + buf.len(), + buf.capacity() + ); + return Ok(None); + } + + trace!("Packet is read: {:?}", &buf[..]); + self.state = CodecState::ValidatingV1Packet; + } + CodecState::ValidatingV1Packet if ACCEPT_V1 => { + if buf.len() < v1::packet_size(buf) { + buf.reserve(V1Packet::MAX_PACKET_SIZE); + + trace!( + "Not enough data, buf.len: {:?}, buf.capacity: {:?}", + buf.len(), + buf.capacity() + ); + return Ok(None); + } + + // System ID validation + if DROP_INVALID_SYSID { + let sysid = *v1::sysid(buf); + if sysid == 0 { + trace!("Invalid SystemID: {sysid:?}. Data: {:?}", &buf[..]); + + buf.advance(V1Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::InvalidSystemID { sysid }))); + } + } + + // Component ID validation + if DROP_INVALID_COMPID { + let compid = *v1::compid(buf); + if compid == 0 { + trace!("Invalid SystemID: {compid:?}. Data: {:?}", &buf[..]); + + buf.advance(V1Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::InvalidComponentID { compid }))); + } + } + + // CRC Validation + if SKIP_CRC_VALIDATION { + trace!("CRC Validation skipped."); + self.state = CodecState::CopyV1Packet; + continue; + } + + let msgid = *v1::msgid(buf) as u32; + let Some(extra_crc) = get_extra_crc(msgid) else { + trace!("Unknown message ID {msgid:?}. Data: {:?}", &buf[..]); + + buf.advance(V1Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::UnknownMessageID { msgid }))); + }; + let checksum_data = v1::checksum_data(buf); + let calculated_crc = calculate_crc(checksum_data, extra_crc); + + let expected_crc = v1::checksum(buf); + if calculated_crc.ne(&expected_crc) { + trace!( + "Invalid CRC: expected: {expected_crc:?}, calculated: {calculated_crc:?}. checksum_data: {checksum_data:?}" + ); + + buf.advance(V1Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::InvalidCRC { + expected_crc, + calculated_crc, + }))); + } + + self.state = CodecState::CopyV1Packet; + } + CodecState::CopyV1Packet if ACCEPT_V1 => { + if buf.len() < V1Packet::MAX_PACKET_SIZE { + buf.reserve(V1Packet::MAX_PACKET_SIZE); + + trace!( + "Not enough data, buf.len: {:?}, buf.capacity: {:?}", + buf.len(), + buf.capacity() + ); + return Ok(None); + } + + let buf_packet = if SKIP_CRC_VALIDATION { + let mut buf_packet = BytesMut::with_capacity(V1Packet::MAX_PACKET_SIZE); + + // Because V1 packets have zero-filled payloads, and we want to avoid copying the whole data as once, we do the copy in two steps: + // 1. Copy header and payload + let checksum_start = v1::packet_size(&buf) - V1Packet::CHECKSUM_SIZE; + buf_packet[..checksum_start].copy_from_slice(&buf[..checksum_start]); + + // 2. Copy the checksum + let checksum_end = checksum_start + V1Packet::CHECKSUM_SIZE; + buf_packet[checksum_start..] + .copy_from_slice(&buf[checksum_start..checksum_end]); + + // Since it is a non validated packet, there might be other packets within this buffer, so we can only discard this STX + buf.advance(V1Packet::STX_SIZE); + + buf_packet + } else { + let buf_packet = buf.split_to(V1Packet::MAX_PACKET_SIZE); + buf.reserve(V1Packet::MAX_PACKET_SIZE); + + buf_packet + }; + + let packet = V1Packet { + buffer: buf_packet.freeze(), + }; + + return Ok(Some(Ok(Packet::V1(packet)))); + } + // V2 Codec + CodecState::WaitingV2Packet if ACCEPT_V2 => { + if buf.len() < V2Packet::HEADER_SIZE { + buf.reserve(V2Packet::HEADER_SIZE); + + trace!( + "Not enough data, buf.len: {:?}, buf.capacity: {:?}", + buf.len(), + buf.capacity() + ); + return Ok(None); + } + + if DROP_INCOMPATIBLE { + let incompat_flags = *v2::incompat_flags(buf); + if incompat_flags & !MAVLINK_SUPPORTED_IFLAGS > 0 { + buf.advance(V1Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::Incompatible { incompat_flags }))); + } + } + + let packet_size = v2::packet_size(buf); + if buf.len() < packet_size { + buf.reserve(packet_size); + + trace!( + "Not enough data, buf.len: {:?}, buf.capacity: {:?}", + buf.len(), + buf.capacity() + ); + return Ok(None); + } + + trace!("Packet is read: {:?}", &buf[..]); + self.state = CodecState::ValidatingV2Packet; + } + CodecState::ValidatingV2Packet if ACCEPT_V2 => { + if buf.len() < v2::packet_size(buf) { + buf.reserve(V2Packet::MAX_PACKET_SIZE); + + trace!( + "Not enough data, buf.len: {:?}, buf.capacity: {:?}", + buf.len(), + buf.capacity() + ); + return Ok(None); + } + + // System ID validation + if DROP_INVALID_SYSID { + let sysid = *v2::sysid(buf); + if sysid == 0 { + trace!("Invalid SystemID: {sysid:?}. Data: {:?}", &buf[..]); + + buf.advance(V2Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::InvalidSystemID { sysid }))); + } + } + + // Component ID validation + if DROP_INVALID_COMPID { + let compid = *v2::compid(buf); + if compid == 0 { + trace!("Invalid SystemID: {compid:?}. Data: {:?}", &buf[..]); + + buf.advance(V2Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::InvalidComponentID { compid }))); + } + } + + // CRC Validation + if SKIP_CRC_VALIDATION { + trace!("Validation skipped."); + self.state = CodecState::CopyV2Packet; + continue; + } + + let msgid = v2::msgid(buf); + let Some(extra_crc) = get_extra_crc(msgid) else { + trace!("Unknown message ID {msgid:?}. Data: {:?}", &buf[..]); + + buf.advance(V2Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::UnknownMessageID { msgid }))); + }; + let checksum_data = v2::checksum_data(buf); + let calculated_crc = calculate_crc(checksum_data, extra_crc); + + let expected_crc = v2::checksum(buf); + if calculated_crc.ne(&expected_crc) { + trace!("Invalid CRC: expected {expected_crc}, calculated {calculated_crc}"); + + buf.advance(V2Packet::STX_SIZE); // Discard this STX + self.state = CodecState::WaitingForStx; + + return Ok(Some(Err(DecoderError::InvalidCRC { + expected_crc, + calculated_crc, + }))); + } + + self.state = CodecState::CopyV2Packet; + } + CodecState::CopyV2Packet if ACCEPT_V2 => { + let packet_size = v2::packet_size(buf); + if buf.len() < packet_size { + buf.reserve(V2Packet::MAX_PACKET_SIZE); + + trace!( + "Not enough data, buf.len: {:?}, buf.capacity: {:?}", + buf.len(), + buf.capacity() + ); + return Ok(None); + } + + let buf_packet = if SKIP_CRC_VALIDATION { + // Copy the entire packet consuming the source buffer + let mut buf_packet = BytesMut::with_capacity(packet_size); + buf_packet[..packet_size].copy_from_slice(&buf[..packet_size]); + + // Since it is a non validated packet, there might be other packets within this buffer, so we can only discard this STX + buf.advance(V2Packet::STX_SIZE); + + buf_packet + } else { + let buf_packet = buf.split_to(packet_size); + buf.reserve(V2Packet::MAX_PACKET_SIZE); + + buf_packet + }; + + let packet = V2Packet { + buffer: buf_packet.freeze(), + }; + + self.state = CodecState::WaitingForStx; + return Ok(Some(Ok(Packet::V2(packet)))); + } + _ => { + unreachable!() + } + } + } + } +} + +impl< + const ACCEPT_V1: bool, + const ACCEPT_V2: bool, + const DROP_INVALID_SYSID: bool, + const DROP_INVALID_COMPID: bool, + const SKIP_CRC_VALIDATION: bool, + const DROP_INCOMPATIBLE: bool, + > Encoder + for MavlinkCodec< + ACCEPT_V1, + ACCEPT_V2, + DROP_INVALID_SYSID, + DROP_INVALID_COMPID, + SKIP_CRC_VALIDATION, + DROP_INCOMPATIBLE, + > +{ + type Error = std::io::Error; + + fn encode(&mut self, packet: Packet, buf: &mut BytesMut) -> Result<(), Self::Error> { + trace!("encoding..."); + match packet { + Packet::V1(v1_packet) if ACCEPT_V1 => { + trace!("v1 package written"); + buf.put(&v1_packet.buffer[..]); + } + Packet::V2(v2_packet) if ACCEPT_V2 => { + trace!("v2 package written"); + buf.put(&v2_packet.buffer[..]); + } + _ => { + trace!("unsupported package version"); + return Err(std::io::Error::new( + std::io::ErrorKind::InvalidData, + "Unsupported packet version", + )); + } + } + + Ok(()) + } +} + +#[inline(always)] +pub fn get_extra_crc(msgid: u32) -> Option { + use mavlink::Message; + + Some(mavlink::ardupilotmega::MavMessage::extra_crc(msgid)) +} + +#[cfg(test)] +mod test_encode { + use super::*; + use mavlink::{ + ardupilotmega::MavMessage, MAVLinkV1MessageRaw, MAVLinkV2MessageRaw, MavHeader, Message, + }; + + #[test] + fn test_encode_v1() { + let mut codec = MavlinkCodec::::default(); + + let v1_packet = { + let header = MavHeader { + system_id: 1, + component_id: 1, + sequence: 0, + }; + + let message_data = MavMessage::default_message_from_id(0).unwrap(); // Heartbeat message + let mut raw_v1_message = MAVLinkV1MessageRaw::new(); + raw_v1_message.serialize_message(header, &message_data); + V1Packet::from(raw_v1_message) + }; + + let mut buf = BytesMut::with_capacity(V1Packet::MAX_PACKET_SIZE); + + codec + .encode(Packet::V1(v1_packet.clone()), &mut buf) + .unwrap(); + + assert_eq!(&buf[..V1Packet::MAX_PACKET_SIZE], v1_packet.as_slice()) + } + + #[test] + fn test_encode_v2() { + let mut codec = MavlinkCodec::::default(); + + let v2_packet = { + let header = MavHeader { + system_id: 1, + component_id: 1, + sequence: 0, + }; + + let message_data = MavMessage::default_message_from_id(0).unwrap(); // Heartbeat message + let mut raw_v2_message = MAVLinkV2MessageRaw::new(); + raw_v2_message.serialize_message(header, &message_data); + V2Packet::from(raw_v2_message) + }; + + let mut buf = BytesMut::with_capacity(V1Packet::MAX_PACKET_SIZE); + + codec + .encode(Packet::V2(v2_packet.clone()), &mut buf) + .unwrap(); + + assert_eq!(&buf[..v2_packet.packet_size()], v2_packet.as_slice()) + } +} + +#[cfg(test)] +mod test_decode { + use super::*; + use mavlink::{ + ardupilotmega::MavMessage, MAVLinkV1MessageRaw, MAVLinkV2MessageRaw, MavHeader, Message, + }; + + #[test] + fn test_decode_v1() { + let mut codec = MavlinkCodec::::default(); + + let mut buf = BytesMut::with_capacity(V1Packet::MAX_PACKET_SIZE); + + let expected_packet = { + let header = MavHeader { + system_id: 1, + component_id: 1, + sequence: 0, + }; + + let message_data = MavMessage::default_message_from_id(0).unwrap(); // Heartbeat message + let mut raw_v1_message = MAVLinkV1MessageRaw::new(); + raw_v1_message.serialize_message(header, &message_data); + + let bytes = raw_v1_message.raw_bytes(); + + let size_without_checksum = bytes.len() - V1Packet::CHECKSUM_SIZE; + buf.put(&bytes[..size_without_checksum]); + buf.put_bytes( + 0, + V1Packet::MAX_PACKET_SIZE - V1Packet::CHECKSUM_SIZE - size_without_checksum, + ); + buf.put(&bytes[bytes.len() - V1Packet::CHECKSUM_SIZE..]); + + Packet::V1(V1Packet::from(raw_v1_message)) + }; + assert!(!buf.is_empty()); + + let packet = codec.decode(&mut buf).unwrap().unwrap().unwrap(); + + assert_eq!(packet, expected_packet); + } + + #[test] + fn test_decode_v2() { + let mut codec = MavlinkCodec::::default(); + + let mut buf = BytesMut::with_capacity(V1Packet::MAX_PACKET_SIZE); + + let expected_packet = { + let header = MavHeader { + system_id: 1, + component_id: 1, + sequence: 0, + }; + + let message_data = MavMessage::default_message_from_id(0).unwrap(); // Heartbeat message + let mut raw_v2_message = MAVLinkV2MessageRaw::new(); + raw_v2_message.serialize_message(header, &message_data); + + let bytes = raw_v2_message.raw_bytes(); + + buf.put(&bytes[..bytes.len()]); + + Packet::V2(V2Packet::from(raw_v2_message)) + }; + assert!(!buf.is_empty()); + + let packet = codec.decode(&mut buf).unwrap().unwrap().unwrap(); + + assert_eq!(packet, expected_packet); + } +} diff --git a/src/error.rs b/src/error.rs new file mode 100644 index 0000000..420c3a2 --- /dev/null +++ b/src/error.rs @@ -0,0 +1,30 @@ +use thiserror::Error; + +use std::io; + +#[derive(Error, Debug)] +pub enum DecoderError { + #[error("invalid System ID: {sysid}")] + InvalidSystemID { sysid: u8 }, + + #[error("invalid Component ID: {compid}")] + InvalidComponentID { compid: u8 }, + + #[error("found incompatible flags in {incompat_flags}")] + Incompatible { incompat_flags: u8 }, + + #[error("unknown Message ID")] + UnknownMessageID { msgid: u32 }, + + #[error("invalid CRC: expected {expected_crc}, calculated {calculated_crc}")] + InvalidCRC { + expected_crc: u16, + calculated_crc: u16, + }, + + #[error("io error")] + Io(#[from] io::Error), + + #[error("unknown error")] + Unknown, +} diff --git a/src/lib.rs b/src/lib.rs new file mode 100644 index 0000000..c4c68fe --- /dev/null +++ b/src/lib.rs @@ -0,0 +1,180 @@ +pub mod codec; +pub mod error; +pub mod rust_mavlink_compatibility; +pub mod v1; +pub mod v2; + +use bytes::Bytes; + +use v1::{V1Packet, V1_STX}; +use v2::{V2Packet, V2_STX}; + +#[derive(Debug, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +#[repr(u8)] +pub enum Packet { + V1(V1Packet) = V1_STX, + V2(V2Packet) = V2_STX, +} + +impl Packet { + #[inline(always)] + pub fn bytes(&self) -> &Bytes { + match self { + Packet::V1(v1_packet) => v1_packet.bytes(), + Packet::V2(v2_packet) => v2_packet.bytes(), + } + } + + #[inline(always)] + pub fn as_slice(&self) -> &[u8] { + match self { + Packet::V1(v1_packet) => v1_packet.as_slice(), + Packet::V2(v2_packet) => v2_packet.as_slice(), + } + } + + #[inline(always)] + pub fn header(&self) -> &[u8] { + match self { + Packet::V1(v1_packet) => v1_packet.header(), + Packet::V2(v2_packet) => v2_packet.header(), + } + } + + #[inline(always)] + pub fn payload(&self) -> &[u8] { + match self { + Packet::V1(v1_packet) => v1_packet.payload(), + Packet::V2(v2_packet) => v2_packet.payload(), + } + } + + #[inline(always)] + pub fn checksum(&self) -> u16 { + match self { + Packet::V1(v1_packet) => v1_packet.checksum(), + Packet::V2(v2_packet) => v2_packet.checksum(), + } + } + + #[inline(always)] + pub fn checksum_data(&self) -> &[u8] { + match self { + Packet::V1(v1_packet) => v1_packet.checksum_data(), + Packet::V2(v2_packet) => v2_packet.checksum_data(), + } + } + + #[inline(always)] + pub fn packet_size(&self) -> usize { + match self { + Packet::V1(v1_packet) => v1_packet.packet_size(), + Packet::V2(v2_packet) => v2_packet.packet_size(), + } + } + + #[inline(always)] + pub fn stx(&self) -> &u8 { + match self { + Packet::V1(v1_packet) => v1_packet.stx(), + Packet::V2(v2_packet) => v2_packet.stx(), + } + } + + #[inline(always)] + pub fn payload_length(&self) -> &u8 { + match self { + Packet::V1(v1_packet) => v1_packet.payload_length(), + Packet::V2(v2_packet) => v2_packet.payload_length(), + } + } + + #[inline(always)] + pub fn sequence(&self) -> &u8 { + match self { + Packet::V1(v1_packet) => v1_packet.sequence(), + Packet::V2(v2_packet) => v2_packet.sequence(), + } + } + + #[inline(always)] + pub fn system_id(&self) -> &u8 { + match self { + Packet::V1(v1_packet) => v1_packet.system_id(), + Packet::V2(v2_packet) => v2_packet.system_id(), + } + } + + #[inline(always)] + pub fn component_id(&self) -> &u8 { + match self { + Packet::V1(v1_packet) => v1_packet.component_id(), + Packet::V2(v2_packet) => v2_packet.component_id(), + } + } + + #[inline(always)] + pub fn message_id(&self) -> u32 { + match self { + Packet::V1(v1_packet) => *v1_packet.message_id() as u32, + Packet::V2(v2_packet) => v2_packet.message_id(), + } + } +} + +/// Creates a `MavlinkCodec` with compile-time configuration. +/// +/// # Parameters +/// +/// - `accept_v1`: Whether to accept MAVLink V1 messages. +/// - `accept_v2`: Whether to accept MAVLink V2 messages. +/// - `drop_invalid_sysid`: Whether to drop messages with zeroed System ID +/// - `drop_invalid_compid`: Whether to drop messages with zeroed Component ID +/// - `skip_crc_validation`: Whether to skip the CRC validation +/// - `drop_incompatible`: Whether to drop messages with unknown Incompatibility Flags +/// +/// # Example +/// +/// ``` +/// use mavlink_codec::{mavlink_codec, codec::MavlinkCodec}; +/// +/// let codec = mavlink_codec! { +/// accept_v1: true, +/// accept_v2: true, +/// drop_invalid_sysid: false, +/// drop_invalid_compid: false, +/// skip_crc_validation: false, +/// drop_incompatible: false, +/// }; +/// +/// // Which is equivallent to: +/// let codec = MavlinkCodec::::default(); +/// ``` +#[macro_export] +macro_rules! mavlink_codec { + ( + // Whether to accept MAVLink V1 messages + accept_v1: $accept_v1:expr, + // Whether to accept MAVLink V2 messages + accept_v2: $accept_v2:expr, + /// Whether to drop messages with zeroed System ID + drop_invalid_sysid: $drop_invalid_sysid:expr, + /// Whether to drop messages with zeroed Component ID + drop_invalid_compid: $drop_invalid_compid:expr, + /// Whether to skip the CRC validation + skip_crc_validation: $skip_crc_validation:expr, + /// Whether to drop messages with unknown Incompatibility Flags + drop_incompatible: $drop_incompatible:expr, + ) => { + $crate::codec::MavlinkCodec::< + { $accept_v1 }, + { $accept_v2 }, + { $drop_invalid_sysid }, + { $drop_invalid_compid }, + { $skip_crc_validation }, + { $drop_incompatible }, + > { + state: $crate::codec::CodecState::default(), + } + }; +} diff --git a/src/rust_mavlink_compatibility.rs b/src/rust_mavlink_compatibility.rs new file mode 100644 index 0000000..93ad0d8 --- /dev/null +++ b/src/rust_mavlink_compatibility.rs @@ -0,0 +1,107 @@ +use bytes::{BufMut, BytesMut}; + +use crate::{v1::V1Packet, v2::V2Packet, Packet}; + +impl From for Packet { + fn from(value: mavlink::MAVLinkV1MessageRaw) -> Self { + Self::V1(V1Packet::from(value)) + } +} + +impl From for Packet { + fn from(value: mavlink::MAVLinkV2MessageRaw) -> Self { + Self::V2(V2Packet::from(value)) + } +} + +impl TryFrom for mavlink::MAVLinkV1MessageRaw { + type Error = mavlink::error::MessageReadError; + + /// A convenient rust-mavlink compatibility layer + /// warning: this has a bad performance because we don't have access to the mutable internal buffer of rust-mavlink's raw messages fn try_from(value: Packet) -> Result { + fn try_from(value: Packet) -> Result { + match value { + Packet::V1(v1_packet) => mavlink::MAVLinkV1MessageRaw::try_from(v1_packet), + Packet::V2(_) => Err(mavlink::error::MessageReadError::Io(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "Expected V1 Message", + ))), + } + } +} + +impl TryFrom for mavlink::MAVLinkV2MessageRaw { + type Error = mavlink::error::MessageReadError; + + /// A convenient rust-mavlink compatibility layer + /// warning: this has a bad performance because we don't have access to the mutable internal buffer of rust-mavlink's raw messages fn try_from(value: Packet) -> Result { + fn try_from(value: Packet) -> Result { + match value { + Packet::V1(_) => Err(mavlink::error::MessageReadError::Io(std::io::Error::new( + std::io::ErrorKind::Unsupported, + "Expected V2 Message", + ))), + Packet::V2(v2_packet) => mavlink::MAVLinkV2MessageRaw::try_from(v2_packet), + } + } +} + +impl From for V1Packet { + fn from(value: mavlink::MAVLinkV1MessageRaw) -> Self { + let bytes = value.raw_bytes(); + + let mut buf = BytesMut::with_capacity(Self::MAX_PACKET_SIZE); + + let size_without_checksum = bytes.len() - V1Packet::CHECKSUM_SIZE; + buf.put(&bytes[..size_without_checksum]); + buf.put_bytes( + 0, + V1Packet::MAX_PACKET_SIZE - V1Packet::CHECKSUM_SIZE - size_without_checksum, + ); + buf.put(&bytes[bytes.len() - V1Packet::CHECKSUM_SIZE..]); + + Self { + buffer: buf.freeze(), + } + } +} + +impl TryFrom for mavlink::MAVLinkV1MessageRaw { + type Error = mavlink::error::MessageReadError; + + /// A convenient rust-mavlink compatibility layer + /// warning: this has a bad performance because we don't have access to the mutable internal buffer of rust-mavlink's raw messages + fn try_from(value: V1Packet) -> Result { + use mavlink::ardupilotmega::MavMessage; + + let mut reader = mavlink::peek_reader::PeekReader::new(value.as_slice()); + mavlink::read_v1_raw_message::(&mut reader) + } +} + +impl From for V2Packet { + fn from(value: mavlink::MAVLinkV2MessageRaw) -> Self { + let bytes = value.raw_bytes(); + + let mut buf = BytesMut::with_capacity(bytes.len()); + + buf.put(bytes); + + Self { + buffer: buf.freeze(), + } + } +} + +impl TryFrom for mavlink::MAVLinkV2MessageRaw { + type Error = mavlink::error::MessageReadError; + + /// A convenient rust-mavlink compatibility layer + /// warning: this has a bad performance because we don't have access to the mutable internal buffer of rust-mavlink's raw messages + fn try_from(value: V2Packet) -> Result { + use mavlink::ardupilotmega::MavMessage; + + let mut reader = mavlink::peek_reader::PeekReader::new(value.as_slice()); + mavlink::read_v2_raw_message::(&mut reader) + } +} diff --git a/src/v1.rs b/src/v1.rs new file mode 100644 index 0000000..433791a --- /dev/null +++ b/src/v1.rs @@ -0,0 +1,274 @@ +use bytes::Bytes; + +pub const V1_STX: u8 = 0xFE; + +#[derive(Default, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct V1Packet { + pub(crate) buffer: Bytes, +} + +impl std::fmt::Debug for V1Packet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("V1Packet") + .field("buffer", &&self.buffer[..]) + .finish() + } +} + +impl V1Packet { + pub const STX_SIZE: usize = 1; + pub const HEADER_SIZE: usize = 5; + pub const MAX_PAYLOAD_SIZE: usize = 255; + pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); + pub const MAX_PACKET_SIZE: usize = V1Packet::STX_SIZE + + V1Packet::HEADER_SIZE + + V1Packet::MAX_PAYLOAD_SIZE + + V1Packet::CHECKSUM_SIZE; + + #[inline(always)] + pub fn new(bytes: Bytes) -> Self { + Self { buffer: bytes } + } + + #[inline(always)] + pub fn bytes(&self) -> &Bytes { + &self.buffer + } + + #[inline(always)] + pub fn as_slice(&self) -> &[u8] { + &self.buffer[..] + } + + #[inline(always)] + pub fn header(&self) -> &[u8] { + header(&self.buffer) + } + + #[inline(always)] + pub fn payload(&self) -> &[u8] { + payload(&self.buffer) + } + + #[inline(always)] + pub fn checksum(&self) -> u16 { + checksum(&self.buffer) + } + + #[inline(always)] + pub fn checksum_data(&self) -> &[u8] { + checksum_data(&self.buffer) + } + + #[inline(always)] + pub fn packet_size(&self) -> usize { + packet_size(&self.buffer) + } + + #[inline(always)] + pub fn stx(&self) -> &u8 { + stx(&self.buffer) + } + + #[inline(always)] + pub fn payload_length(&self) -> &u8 { + len(&self.buffer) + } + + #[inline(always)] + pub fn sequence(&self) -> &u8 { + seq(&self.buffer) + } + + #[inline(always)] + pub fn system_id(&self) -> &u8 { + sysid(&self.buffer) + } + + #[inline(always)] + pub fn component_id(&self) -> &u8 { + compid(&self.buffer) + } + + #[inline(always)] + pub fn message_id(&self) -> &u8 { + msgid(&self.buffer) + } +} + +#[inline(always)] +pub(crate) fn header>(buf: &T) -> &[u8] { + let header_start = V1Packet::STX_SIZE; + let header_end = header_start + V1Packet::HEADER_SIZE; + + &buf.as_ref()[header_start..header_end] +} + +#[inline(always)] +pub(crate) fn payload>(buf: &T) -> &[u8] { + let payload_start = V1Packet::STX_SIZE + V1Packet::HEADER_SIZE; + let payload_size = *len(buf) as usize; + let payload_end = payload_start + payload_size; + + &buf.as_ref()[payload_start..payload_end] +} + +#[inline(always)] +pub(crate) fn checksum>(buf: &T) -> u16 { + let checksum_end = packet_size(buf); + let checksum_start = checksum_end - V1Packet::CHECKSUM_SIZE; + + let buf = buf.as_ref(); + u16::from_le_bytes([buf[checksum_start], buf[checksum_end - 1]]) +} + +#[inline(always)] +pub(crate) fn checksum_data>(buf: &T) -> &[u8] { + let checksum_data_start = V1Packet::STX_SIZE; + let payload_size = *len(buf) as usize; + let checksum_data_end = V1Packet::STX_SIZE + V1Packet::HEADER_SIZE + payload_size; + + &buf.as_ref()[checksum_data_start..checksum_data_end] +} + +#[inline(always)] +pub(crate) fn packet_size>(_buf: &T) -> usize { + let stx = V1Packet::STX_SIZE; + let header = V1Packet::HEADER_SIZE; + let payload = V1Packet::MAX_PAYLOAD_SIZE; // Payload always includes zero + let checksum = V1Packet::CHECKSUM_SIZE; + + stx + header + payload + checksum +} + +#[inline(always)] +pub(crate) fn stx>(buf: &T) -> &u8 { + &buf.as_ref()[0] +} + +#[inline(always)] +pub(crate) fn len>(buf: &T) -> &u8 { + &buf.as_ref()[1] +} + +#[inline(always)] +pub(crate) fn seq>(buf: &T) -> &u8 { + &buf.as_ref()[2] +} + +#[inline(always)] +pub(crate) fn sysid>(buf: &T) -> &u8 { + &buf.as_ref()[3] +} + +#[inline(always)] +pub(crate) fn compid>(buf: &T) -> &u8 { + &buf.as_ref()[4] +} + +#[inline(always)] +pub(crate) fn msgid>(buf: &T) -> &u8 { + &buf.as_ref()[5] +} + +#[cfg(test)] +mod test { + use super::*; + + pub const HEARTBEAT: &[u8; V1Packet::MAX_PACKET_SIZE] = &[ + 254, // stx + 9, // payload len + 239, 1, 2, // header + // payload: + 0, 5, 0, 0, 0, 2, 3, 89, 3, 3, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, // + 31, 80, // crc + ]; + + #[test] + fn test_stx() { + assert_eq!(*stx(&HEARTBEAT), V1_STX); + } + + #[test] + fn test_len() { + assert_eq!(*len(&HEARTBEAT), 9); + } + + #[test] + fn test_seq() { + assert_eq!(*seq(&HEARTBEAT), 239); + } + + #[test] + fn test_sysid() { + assert_eq!(*sysid(&HEARTBEAT), 1); + } + + #[test] + fn test_compid() { + assert_eq!(*compid(&HEARTBEAT), 2); + } + + #[test] + fn test_msgid() { + assert_eq!(*msgid(&HEARTBEAT), 0); + } + + #[test] + fn test_header() { + assert_eq!(header(&HEARTBEAT), &HEARTBEAT[1..6]); + } + + #[test] + fn test_payload() { + assert_eq!(payload(&HEARTBEAT), &HEARTBEAT[6..15]); + } + + #[test] + fn test_checksum() { + assert_eq!(checksum(&HEARTBEAT), u16::from_le_bytes([31, 80])); + } + + #[test] + fn test_checksum_data() { + assert_eq!(checksum_data(&HEARTBEAT), &HEARTBEAT[1..15]); + } + + #[test] + fn test_v1packet_from_raw_v1_message() { + use mavlink::{common::MavMessage, MAVLinkV1MessageRaw, MavHeader, Message}; + + let raw_v1_message = { + let header = MavHeader { + system_id: 1, + component_id: 1, + sequence: 0, + }; + + let message_data = MavMessage::default_message_from_id(0).unwrap(); // Heartbeat message + let mut raw_v1_message = MAVLinkV1MessageRaw::new(); + raw_v1_message.serialize_message(header, &message_data); + raw_v1_message + }; + + let v1_packet = V1Packet::from(raw_v1_message); + + assert_eq!(v1_packet.header(), raw_v1_message.clone().header()); // Todo: remote this clone once [this PR](https://github.com/mavlink/rust-mavlink/pull/288) get merged upstream + assert_eq!(*v1_packet.stx(), raw_v1_message.raw_bytes()[0]); + assert_eq!(*v1_packet.payload_length(), raw_v1_message.payload_length()); + assert_eq!(*v1_packet.sequence(), raw_v1_message.sequence()); + assert_eq!(*v1_packet.system_id(), raw_v1_message.system_id()); + assert_eq!(*v1_packet.component_id(), raw_v1_message.component_id()); + assert_eq!(*v1_packet.message_id(), raw_v1_message.message_id()); + assert_eq!(v1_packet.payload(), raw_v1_message.payload()); + assert_eq!(v1_packet.checksum(), raw_v1_message.checksum()); + } +} diff --git a/src/v2.rs b/src/v2.rs new file mode 100644 index 0000000..d0e3377 --- /dev/null +++ b/src/v2.rs @@ -0,0 +1,372 @@ +use bytes::Bytes; + +pub const V2_STX: u8 = 0xFD; +pub const MAVLINK_IFLAG_SIGNED: u8 = 0x01; +pub const MAVLINK_SUPPORTED_IFLAGS: u8 = MAVLINK_IFLAG_SIGNED; + +#[derive(Default, Clone, PartialEq, Eq, PartialOrd, Ord, Hash)] +pub struct V2Packet { + pub(crate) buffer: Bytes, +} + +impl std::fmt::Debug for V2Packet { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("V2Packet") + .field("buffer", &&self.buffer[..]) + .finish() + } +} + +impl V2Packet { + pub const STX_SIZE: usize = 1; + pub const HEADER_SIZE: usize = 9; + pub const MAX_PAYLOAD_SIZE: usize = 255; + pub const CHECKSUM_SIZE: usize = std::mem::size_of::(); + pub const SIGNATURE_SIZE: usize = 13; + pub const MAX_PACKET_SIZE: usize = V2Packet::STX_SIZE + + V2Packet::HEADER_SIZE + + V2Packet::MAX_PAYLOAD_SIZE + + V2Packet::CHECKSUM_SIZE + + V2Packet::SIGNATURE_SIZE; + + #[inline(always)] + pub fn new(bytes: Bytes) -> Self { + Self { buffer: bytes } + } + + #[inline(always)] + pub fn bytes(&self) -> &Bytes { + &self.buffer + } + + #[inline(always)] + pub fn as_slice(&self) -> &[u8] { + &self.buffer[..] + } + + #[inline(always)] + pub fn header(&self) -> &[u8] { + header(&self.buffer) + } + + #[inline(always)] + pub fn payload(&self) -> &[u8] { + payload(&self.buffer) + } + + #[inline(always)] + pub fn checksum(&self) -> u16 { + checksum(&self.buffer) + } + + #[inline(always)] + pub fn signature(&self) -> Option<&[u8]> { + signature(&self.buffer) + } + + #[inline(always)] + pub fn checksum_data(&self) -> &[u8] { + checksum_data(&self.buffer) + } + + #[inline(always)] + pub fn packet_size(&self) -> usize { + packet_size(&self.buffer) + } + + #[inline(always)] + pub fn has_signature(&self) -> bool { + has_signature(&self.buffer) + } + + #[inline(always)] + pub fn stx(&self) -> &u8 { + stx(&self.buffer) + } + + #[inline(always)] + pub fn payload_length(&self) -> &u8 { + len(&self.buffer) + } + + #[inline(always)] + pub fn incompatibility_flags(&self) -> &u8 { + incompat_flags(&self.buffer) + } + + #[inline(always)] + pub fn compatibility_flags(&self) -> &u8 { + compat_flags(&self.buffer) + } + + #[inline(always)] + pub fn sequence(&self) -> &u8 { + seq(&self.buffer) + } + + #[inline(always)] + pub fn system_id(&self) -> &u8 { + sysid(&self.buffer) + } + + #[inline(always)] + pub fn component_id(&self) -> &u8 { + compid(&self.buffer) + } + + #[inline(always)] + pub fn message_id(&self) -> u32 { + msgid(&self.buffer) + } +} + +#[inline(always)] +pub(crate) fn header>(buf: &T) -> &[u8] { + let header_start = 1; + let header_end = header_start + V2Packet::HEADER_SIZE; + + &buf.as_ref()[header_start..header_end] +} + +#[inline(always)] +pub(crate) fn payload>(buf: &T) -> &[u8] { + let payload_start = V2Packet::STX_SIZE + V2Packet::HEADER_SIZE; + let payload_size = *len(buf) as usize; + let payload_end = payload_start + payload_size; + + &buf.as_ref()[payload_start..payload_end] +} + +#[inline(always)] +pub(crate) fn checksum>(buf: &T) -> u16 { + let checksum_end = packet_size(buf); + let checksum_start = checksum_end - V2Packet::CHECKSUM_SIZE; + + let buf = buf.as_ref(); + u16::from_le_bytes([buf[checksum_start], buf[checksum_end - 1]]) +} + +#[inline(always)] +pub(crate) fn signature>(buf: &T) -> Option<&[u8]> { + if !has_signature(buf) { + return None; + } + + let payload_size = *len(buf) as usize; + let signature_start = + V2Packet::STX_SIZE + V2Packet::HEADER_SIZE + payload_size + V2Packet::CHECKSUM_SIZE; + let signature_end = signature_start + V2Packet::SIGNATURE_SIZE; + + Some(&buf.as_ref()[signature_start..signature_end]) +} + +#[inline(always)] +pub(crate) fn checksum_data>(buf: &T) -> &[u8] { + let checksum_data_start = V2Packet::STX_SIZE; + let payload_size = *len(buf) as usize; + let checksum_data_end = V2Packet::STX_SIZE + V2Packet::HEADER_SIZE + payload_size; + + &buf.as_ref()[checksum_data_start..checksum_data_end] +} + +#[inline(always)] +pub(crate) fn packet_size>(buf: &T) -> usize { + let stx = V2Packet::STX_SIZE; + let header = V2Packet::HEADER_SIZE; + let payload = *len(buf) as usize; + let checksum = V2Packet::CHECKSUM_SIZE; + let signature = has_signature(buf) + .then_some(V2Packet::SIGNATURE_SIZE) + .unwrap_or_default(); + + stx + header + payload + checksum + signature +} + +#[inline(always)] +pub(crate) fn has_signature>(buf: &T) -> bool { + incompat_flags(buf) & (IncompatibilityFlags::Signed as u8) == 1 +} + +#[inline(always)] +pub(crate) fn stx>(buf: &T) -> &u8 { + &buf.as_ref()[0] +} + +#[inline(always)] +pub(crate) fn len>(buf: &T) -> &u8 { + &buf.as_ref()[1] +} + +#[inline(always)] +pub(crate) fn incompat_flags>(buf: &T) -> &u8 { + &buf.as_ref()[2] +} + +#[inline(always)] +pub(crate) fn compat_flags>(buf: &T) -> &u8 { + &buf.as_ref()[3] +} + +#[inline(always)] +pub(crate) fn seq>(buf: &T) -> &u8 { + &buf.as_ref()[4] +} + +#[inline(always)] +pub(crate) fn sysid>(buf: &T) -> &u8 { + &buf.as_ref()[5] +} + +#[inline(always)] +pub(crate) fn compid>(buf: &T) -> &u8 { + &buf.as_ref()[6] +} + +#[inline(always)] +pub(crate) fn msgid>(buf: &T) -> u32 { + let buf = buf.as_ref(); + u32::from_le_bytes([buf[7], buf[8], buf[9], 0]) +} + +#[derive(Copy, Clone, Debug, PartialEq)] +#[repr(u8)] +enum IncompatibilityFlags { + Signed = 0x01, +} + +#[cfg(test)] +mod test { + + use super::*; + const COMMAND_LONG: &[u8] = &[ + 253, // stx + 30, // payload len + 0, // incompat flags + 0, // compat flags + 0, 0, 50, // header + 76, 0, 0, // msg ID + // payload: + 0, 0, 230, 66, 0, 64, 156, 69, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, + 255, 1, // + 188, 195, // crc + ]; + + #[test] + fn test_msgid() { + assert_eq!(msgid(&COMMAND_LONG), 76); + } + + #[test] + fn test_stx() { + assert_eq!(*stx(&COMMAND_LONG), V2_STX); + } + + #[test] + fn test_len() { + assert_eq!(*len(&COMMAND_LONG), 30); + } + + #[test] + fn test_incompat_flags() { + assert_eq!(*incompat_flags(&COMMAND_LONG), 0); + } + + #[test] + fn test_compat_flags() { + assert_eq!(*compat_flags(&COMMAND_LONG), 0); + } + + #[test] + fn test_seq() { + assert_eq!(*seq(&COMMAND_LONG), 0); + } + + #[test] + fn test_sysid() { + assert_eq!(*sysid(&COMMAND_LONG), 0); + } + + #[test] + fn test_compid() { + assert_eq!(*compid(&COMMAND_LONG), 50); + } + + #[test] + fn test_header() { + assert_eq!(header(&COMMAND_LONG), &COMMAND_LONG[1..10]); + } + + #[test] + fn test_payload() { + assert_eq!( + payload(&COMMAND_LONG), + &COMMAND_LONG[10..(10 + 30)] // HEADER_SIZE + payload length + ); + } + + #[test] + fn test_checksum() { + assert_eq!(checksum(&COMMAND_LONG), 0xC3BC); // crc in little endian + } + + #[test] + fn test_signature_none() { + assert!(signature(&COMMAND_LONG).is_none()); + } + + #[test] + fn test_has_signature_false() { + assert!(!has_signature(&COMMAND_LONG)); + } + + #[test] + fn test_checksum_data() { + assert_eq!( + checksum_data(&COMMAND_LONG), + &COMMAND_LONG[1..(10 + 30)] // from len (1) to payload end + ); + } + + #[test] + fn test_packet_size_no_signature() { + assert_eq!(packet_size(&COMMAND_LONG), 10 + 30 + 2); // HEADER_SIZE + payload + crc + } + + #[test] + fn test_v2packet_from_raw_v2_message() { + use mavlink::{ardupilotmega::MavMessage, MAVLinkV2MessageRaw, MavHeader, Message}; + + let raw_v2_message = { + let header = MavHeader { + system_id: 1, + component_id: 1, + sequence: 0, + }; + + let message_data = MavMessage::default_message_from_id(0).unwrap(); // Heartbeat message + let mut raw_v2_message = MAVLinkV2MessageRaw::new(); + raw_v2_message.serialize_message(header, &message_data); + raw_v2_message + }; + + let v2_packet = V2Packet::from(raw_v2_message); + + assert_eq!(v2_packet.header(), raw_v2_message.header()); // Todo: remote this clone once [this PR](https://github.com/mavlink/rust-mavlink/pull/288) get merged upstream + assert_eq!(*v2_packet.stx(), raw_v2_message.raw_bytes()[0]); + assert_eq!(*v2_packet.payload_length(), raw_v2_message.payload_length()); + assert_eq!( + *v2_packet.incompatibility_flags(), + raw_v2_message.incompatibility_flags() + ); + assert_eq!( + *v2_packet.compatibility_flags(), + raw_v2_message.compatibility_flags() + ); + assert_eq!(*v2_packet.sequence(), raw_v2_message.sequence()); + assert_eq!(*v2_packet.system_id(), raw_v2_message.system_id()); + assert_eq!(*v2_packet.component_id(), raw_v2_message.component_id()); + assert_eq!(v2_packet.message_id(), raw_v2_message.message_id()); + assert_eq!(v2_packet.payload(), raw_v2_message.payload()); + assert_eq!(v2_packet.checksum(), raw_v2_message.checksum()); + } +} diff --git a/tests/chuncked_decode_test.rs b/tests/chuncked_decode_test.rs new file mode 100644 index 0000000..b1fb1ff --- /dev/null +++ b/tests/chuncked_decode_test.rs @@ -0,0 +1,161 @@ +use rand::{prelude::StdRng, Rng, SeedableRng}; +use tokio::io::AsyncWriteExt; +use tokio_stream::StreamExt; +use tokio_util::codec::FramedRead; + +use dev_utils::{add_random_v1_message, add_random_v2_message, chunk_buffer_randomly}; +use mavlink_codec::{codec::MavlinkCodec, v1::V1Packet, v2::V2Packet}; + +#[tokio::test] +async fn chuncked_decode_v1() { + let seed = 42; + println!("Using seed {seed:?}"); + let mut rng: StdRng = SeedableRng::seed_from_u64(seed); + + let messages_count = 1000; + let mut messages = Vec::with_capacity(messages_count); + for _ in 0..messages_count { + let mut message = Vec::with_capacity(V1Packet::MAX_PACKET_SIZE); + add_random_v1_message(&mut message, &mut rng); + messages.push(message); + } + println!("Generated {} messages", messages.len()); + + let messages_cloned = messages.clone(); + let mut messages = messages.concat(); + println!("Total concatenated message size: {}", messages.len()); + + // Add some trash in the beginning + for _ in 0..100 { + messages.insert(0, rng.gen_range(0..255)); + } + println!("Added trash to the beginning of the message"); + + let (reader, mut writer) = tokio::io::simplex(4096); + let chunked_buf = chunk_buffer_randomly(&messages, &mut rng, 1, 128); + println!("Chunked buffer into {} chunks", chunked_buf.len()); + + // Inject the chunked data into the writer asynchronously + let writer_task = tokio::spawn(async move { + for (idx, chunk) in chunked_buf.iter().enumerate() { + writer.write_all(chunk).await.unwrap(); + println!("Sent chunk {idx}, size: {:?}", chunk.len()); + // Simulate network delay + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + } + println!("All messages sent"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + writer.shutdown().await.unwrap(); + }); + + let codec = MavlinkCodec::::default(); + let mut framed = FramedRead::new(reader, codec); + + let mut i = 0; + while let Some(Ok(result)) = framed.next().await { + match result { + Ok(packet) => { + let buffer = match &packet { + mavlink_codec::Packet::V1(_v1_packet) => _v1_packet.as_slice(), + mavlink_codec::Packet::V2(_v2_packet) => panic!("Got a wrong package"), + }; + + assert_eq!(buffer, messages_cloned[i].as_slice()); + // println!("Successfully decoded message {i}"); + + i += 1; + if i == messages_count { + break; + } + } + Err(error) => { + eprintln!( + "Error while decoding packet at message {i}: {error:?}. framed buffer: {:?}", + framed.read_buffer().to_vec() + ); + } + } + } + + assert_eq!(i, messages_count); + println!("All {i} messages successfully decoded!"); + + writer_task.abort(); +} + +#[tokio::test] +async fn chuncked_decode_v2() { + let seed = 42; + println!("Using seed {seed:?}"); + let mut rng: StdRng = SeedableRng::seed_from_u64(seed); + + let messages_count = 1000; + let mut messages = Vec::with_capacity(messages_count); + for _ in 0..messages_count { + let mut message = Vec::with_capacity(V2Packet::MAX_PACKET_SIZE); + add_random_v2_message(&mut message, &mut rng); + messages.push(message); + } + println!("Generated {} messages", messages.len()); + + let messages_cloned = messages.clone(); + let mut messages = messages.concat(); + println!("Total concatenated message size: {}", messages.len()); + + // Add some trash in the beginning + for _ in 0..100 { + messages.insert(0, rng.gen_range(0..255)); + } + println!("Added trash to the beginning of the message"); + + let (reader, mut writer) = tokio::io::simplex(4096); + let chunked_buf = chunk_buffer_randomly(&messages, &mut rng, 1, 128); + println!("Chunked buffer into {} chunks", chunked_buf.len()); + + // Inject the chunked data into the writer asynchronously + let writer_task = tokio::spawn(async move { + for (idx, chunk) in chunked_buf.iter().enumerate() { + writer.write_all(chunk).await.unwrap(); + println!("Sent chunk {idx}, size: {:?}", chunk.len()); + // Simulate network delay + tokio::time::sleep(tokio::time::Duration::from_micros(1)).await; + } + println!("All messages sent"); + tokio::time::sleep(std::time::Duration::from_secs(5)).await; + writer.shutdown().await.unwrap(); + }); + + let codec = MavlinkCodec::::default(); + let mut framed = FramedRead::new(reader, codec); + + let mut i = 0; + while let Some(Ok(result)) = framed.next().await { + match result { + Ok(packet) => { + let buffer = match &packet { + mavlink_codec::Packet::V1(_v1_packet) => panic!("Got a wrong package"), + mavlink_codec::Packet::V2(_v2_packet) => _v2_packet.as_slice(), + }; + + assert_eq!(buffer, messages_cloned[i]); + // println!("Successfully decoded message {i}"); + + i += 1; + if i == messages_count { + break; + } + } + Err(error) => { + eprintln!( + "Error while decoding packet at message {i}: {error:?}. framed buffer: {:?}", + framed.read_buffer().to_vec() + ); + } + } + } + + assert_eq!(i, messages_count); + println!("All {i} messages successfully decoded!"); + + writer_task.abort(); +} diff --git a/tests/framed_send_recv.rs b/tests/framed_send_recv.rs new file mode 100644 index 0000000..51f3ff4 --- /dev/null +++ b/tests/framed_send_recv.rs @@ -0,0 +1,167 @@ +use futures::SinkExt; +use rand::{prelude::StdRng, SeedableRng}; +use tokio_stream::StreamExt; +use tokio_util::codec::Framed; + +use dev_utils::{create_random_v1_raw_message, create_random_v2_raw_message}; +use mavlink_codec::{codec::MavlinkCodec, Packet}; + +#[tokio::test] +async fn send_recv_v1() { + let seed = 42; + println!("Using seed {seed:?}"); + let mut rng: StdRng = SeedableRng::seed_from_u64(seed); + + let packets_count = 100000; + let mut packets = Vec::with_capacity(packets_count); + for _ in 0..packets_count { + let mavlink_v1_message_raw = create_random_v1_raw_message(&mut rng); + let packet = Packet::from(mavlink_v1_message_raw); + packets.push(packet); + } + + let codec = MavlinkCodec::::default(); + let simplex = tokio::io::SimplexStream::new_unsplit(4096); + let framed = Framed::new(simplex, codec); + let (mut writer, mut reader) = futures::StreamExt::split(framed); + + // Send and receive each packet sequentially + for (idx, packet) in packets.iter().enumerate() { + println!("Sending packet {idx}"); + // Send the packet + writer.send(packet.clone()).await.unwrap(); + + // Wait for the response + if let Some(Ok(Ok(received_packet))) = reader.next().await { + println!("Received packet {idx}"); + assert_eq!(received_packet, *packet); + } else { + panic!("Failed to receive packet {idx}"); + } + } + + println!("All packets sent and received successfully!"); +} + +#[tokio::test] +async fn send_recv_v1_concurrent() { + let seed = 42; + println!("Using seed {seed:?}"); + let mut rng: StdRng = SeedableRng::seed_from_u64(seed); + + let packets_count = 100000; + let mut packets = Vec::with_capacity(packets_count); + for _ in 0..packets_count { + let mavlink_v1_message_raw = create_random_v1_raw_message(&mut rng); + let packet = Packet::from(mavlink_v1_message_raw); + packets.push(packet); + } + + let codec = MavlinkCodec::::default(); + let simplex = tokio::io::SimplexStream::new_unsplit(4096); + let framed = Framed::new(simplex, codec); + let (mut writer, mut reader) = futures::StreamExt::split(framed); + + let packets_cloned = packets.clone(); + let writer_task = tokio::spawn(async move { + for (idx, packet) in packets_cloned.iter().enumerate() { + println!("Sending packet {idx}"); + writer.send(packet.clone()).await.unwrap(); + } + }); + + let packets_cloned = packets.clone(); + let reader_task = tokio::spawn(async move { + let mut received_count = 0; + while received_count < packets_count { + if let Some(Ok(Ok(received_packet))) = reader.next().await { + println!("Received packet {received_count}"); + assert_eq!(received_packet, packets_cloned[received_count]); + received_count += 1; + } + } + }); + + let _ = tokio::join!(writer_task, reader_task); + println!("All packets sent and received successfully!"); +} + +#[tokio::test] +async fn send_recv_v2() { + let seed = 42; + println!("Using seed {seed:?}"); + let mut rng: StdRng = SeedableRng::seed_from_u64(seed); + + let packets_count = 100000; + let mut packets = Vec::with_capacity(packets_count); + for _ in 0..packets_count { + let mavlink_v2_message_raw = create_random_v2_raw_message(&mut rng); + let packet = Packet::from(mavlink_v2_message_raw); + packets.push(packet); + } + + let codec = MavlinkCodec::::default(); + let simplex = tokio::io::SimplexStream::new_unsplit(4096); + let framed = Framed::new(simplex, codec); + let (mut writer, mut reader) = futures::StreamExt::split(framed); + + // Send and receive each packet sequentially + for (idx, packet) in packets.iter().enumerate() { + println!("Sending packet {idx}"); + // Send the packet + writer.send(packet.clone()).await.unwrap(); + + // Wait for the response + if let Some(Ok(Ok(received_packet))) = reader.next().await { + println!("Received packet {idx}"); + assert_eq!(received_packet, *packet); + } else { + panic!("Failed to receive packet {idx}"); + } + } + + println!("All packets sent and received successfully!"); +} + +#[tokio::test] +async fn send_recv_v2_concurrent() { + let seed = 42; + println!("Using seed {seed:?}"); + let mut rng: StdRng = SeedableRng::seed_from_u64(seed); + + let packets_count = 100000; + let mut packets = Vec::with_capacity(packets_count); + for _ in 0..packets_count { + let mavlink_v2_message_raw = create_random_v2_raw_message(&mut rng); + let packet = Packet::from(mavlink_v2_message_raw); + packets.push(packet); + } + + let codec = MavlinkCodec::::default(); + let simplex = tokio::io::SimplexStream::new_unsplit(4096); + let framed = Framed::new(simplex, codec); + let (mut writer, mut reader) = futures::StreamExt::split(framed); + + let packets_cloned = packets.clone(); + let writer_task = tokio::spawn(async move { + for (idx, packet) in packets_cloned.iter().enumerate() { + println!("Sending packet {idx}"); + writer.send(packet.clone()).await.unwrap(); + } + }); + + let packets_cloned = packets.clone(); + let reader_task = tokio::spawn(async move { + let mut received_count = 0; + while received_count < packets_count { + if let Some(Ok(Ok(received_packet))) = reader.next().await { + println!("Received packet {received_count}"); + assert_eq!(received_packet, packets_cloned[received_count]); + received_count += 1; + } + } + }); + + let _ = tokio::join!(writer_task, reader_task); + println!("All packets sent and received successfully!"); +}