From fd88f0d1b63c745b23162d8b030e5648d1d246e7 Mon Sep 17 00:00:00 2001 From: Edmund Kump Date: Fri, 22 Nov 2024 15:47:02 -0500 Subject: [PATCH] WIP: Attempting to no-copy from ByteSlice to TinyBytes --- Cargo.lock | 2 ++ data-pipeline-ffi/Cargo.toml | 1 + data-pipeline-ffi/src/trace_exporter.rs | 5 ++++- .../examples/send-traces-with-stats.rs | 2 +- data-pipeline/src/trace_exporter.rs | 20 ++++++++++--------- ddcommon-ffi/Cargo.toml | 1 + ddcommon-ffi/src/slice.rs | 12 +++++++++++ 7 files changed, 32 insertions(+), 11 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index 2a021886d..df5c75a0a 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1328,6 +1328,7 @@ dependencies = [ "data-pipeline", "ddcommon-ffi", "libc", + "tinybytes", ] [[package]] @@ -1835,6 +1836,7 @@ dependencies = [ "ddcommon", "hyper 0.14.31", "serde", + "tinybytes", ] [[package]] diff --git a/data-pipeline-ffi/Cargo.toml b/data-pipeline-ffi/Cargo.toml index 8d5f4b6a0..de29d1b42 100644 --- a/data-pipeline-ffi/Cargo.toml +++ b/data-pipeline-ffi/Cargo.toml @@ -26,3 +26,4 @@ data-pipeline = { path = "../data-pipeline" } ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false } bytes = "1.4" libc = "0.2.153" +tinybytes = { path = "../tinybytes" } diff --git a/data-pipeline-ffi/src/trace_exporter.rs b/data-pipeline-ffi/src/trace_exporter.rs index b3cdbe365..9b16c90c1 100644 --- a/data-pipeline-ffi/src/trace_exporter.rs +++ b/data-pipeline-ffi/src/trace_exporter.rs @@ -111,8 +111,11 @@ pub unsafe extern "C" fn ddog_trace_exporter_send( trace_count: usize, ) -> MaybeError { // TODO - handle errors - https://datadoghq.atlassian.net/browse/APMSP-1095 + let static_trace: ByteSlice<'static> = std::mem::transmute(trace); + let tinybytes_trace = tinybytes::Bytes::from(static_trace); + handle - .send(trace.as_bytes(), trace_count) + .send(tinybytes_trace, trace_count) .unwrap_or(String::from("")); MaybeError::None } diff --git a/data-pipeline/examples/send-traces-with-stats.rs b/data-pipeline/examples/send-traces-with-stats.rs index be58c1712..06722f319 100644 --- a/data-pipeline/examples/send-traces-with-stats.rs +++ b/data-pipeline/examples/send-traces-with-stats.rs @@ -55,6 +55,6 @@ fn main() { traces.push(trace); } let data = rmp_serde::to_vec_named(&traces).unwrap(); - exporter.send(&data, 100).unwrap(); + exporter.send(tinybytes::Bytes::from(data), 100).unwrap(); exporter.shutdown(None).unwrap(); } diff --git a/data-pipeline/src/trace_exporter.rs b/data-pipeline/src/trace_exporter.rs index efa3ce909..dc5df4c59 100644 --- a/data-pipeline/src/trace_exporter.rs +++ b/data-pipeline/src/trace_exporter.rs @@ -250,12 +250,12 @@ impl TraceExporter { /// Send msgpack serialized traces to the agent #[allow(missing_docs)] - pub fn send(&self, data: &[u8], trace_count: usize) -> Result { + pub fn send(&self, data: tinybytes::Bytes, trace_count: usize) -> Result { self.check_agent_info(); match self.input_format { - TraceExporterInputFormat::Proxy => self.send_proxy(data, trace_count), + TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count), TraceExporterInputFormat::V04 => { - self.send_deser_ser(tinybytes::Bytes::copy_from_slice(data)) + self.send_deser_ser(data) // TODO: APMSP-1582 - Refactor data-pipeline-ffi so we can leverage a type that // implements tinybytes::UnderlyingBytes trait to avoid copying // self.send_deser_ser(tinybytes::Bytes::copy_from_slice(data)) @@ -1184,7 +1184,7 @@ mod tests { }]; let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap(); - + // Wait for the info fetcher to get the config while mock_info.hits() == 0 { exporter.runtime.block_on(async { @@ -1192,7 +1192,7 @@ mod tests { }) } - exporter.send(data.as_slice(), 1).unwrap(); + exporter.send(tinybytes::Bytes::from(data), 1).unwrap(); exporter.shutdown(None).unwrap(); mock_traces.assert(); @@ -1311,7 +1311,7 @@ mod tests { }], ]; let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); - let _result = exporter.send(&bytes, 1).expect("failed to send trace"); + let _result = exporter.send(tinybytes::Bytes::from(bytes), 1).expect("failed to send trace"); assert_eq!( &format!( @@ -1341,9 +1341,11 @@ mod tests { fake_agent.url("/v0.4/traces"), stats_socket.local_addr().unwrap().to_string(), ); - + + let bad_payload = tinybytes::Bytes::copy_from_slice(b"some_bad_payload".as_ref()); + let _result = exporter - .send(b"some_bad_payload", 1) + .send(bad_payload, 1) .expect("failed to send trace"); assert_eq!( @@ -1378,7 +1380,7 @@ mod tests { ..Default::default() }]]; let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace"); - let _result = exporter.send(&bytes, 1).expect("failed to send trace"); + let _result = exporter.send(tinybytes::Bytes::from(bytes), 1).expect("failed to send trace"); assert_eq!( &format!( diff --git a/ddcommon-ffi/Cargo.toml b/ddcommon-ffi/Cargo.toml index e92eb51e1..6b0755262 100644 --- a/ddcommon-ffi/Cargo.toml +++ b/ddcommon-ffi/Cargo.toml @@ -25,6 +25,7 @@ crossbeam-queue = "0.3.11" ddcommon = { path = "../ddcommon" } hyper = {version = "0.14", features = ["backports", "deprecated"], default-features = false} serde = "1.0" +tinybytes = { path = "../tinybytes" } [dev-dependencies] bolero = "0.10.1" diff --git a/ddcommon-ffi/src/slice.rs b/ddcommon-ffi/src/slice.rs index 4cc3ff417..e03e5faba 100644 --- a/ddcommon-ffi/src/slice.rs +++ b/ddcommon-ffi/src/slice.rs @@ -10,6 +10,7 @@ use std::hash::{Hash, Hasher}; use std::marker::PhantomData; use std::os::raw::c_char; use std::str::Utf8Error; +use tinybytes::UnderlyingBytes; #[repr(C)] #[derive(Copy, Clone)] @@ -52,6 +53,17 @@ pub type CharSlice<'a> = Slice<'a, c_char>; /// Use to represent bytes -- does not need to be valid UTF-8. pub type ByteSlice<'a> = Slice<'a, u8>; +unsafe impl Send for ByteSlice<'static> {} +unsafe impl Sync for ByteSlice<'static> {} +impl UnderlyingBytes for ByteSlice<'static> {} + + +impl AsRef<[u8]> for ByteSlice<'static> { + #[inline] + fn as_ref(&self) -> &[u8] { + self.as_slice() + } +} /// This exists as an intrinsic, but it is private. pub fn is_aligned_and_not_null(ptr: *const T) -> bool {