Skip to content

Commit

Permalink
WIP: Attempting to no-copy from ByteSlice to TinyBytes
Browse files Browse the repository at this point in the history
  • Loading branch information
ekump committed Nov 22, 2024
1 parent 415c43f commit e9f9a11
Show file tree
Hide file tree
Showing 7 changed files with 32 additions and 11 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions data-pipeline-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -26,3 +26,4 @@ data-pipeline = { path = "../data-pipeline" }
ddcommon-ffi = { path = "../ddcommon-ffi", default-features = false }
bytes = "1.4"
libc = "0.2.153"
tinybytes = { path = "../tinybytes" }
5 changes: 4 additions & 1 deletion data-pipeline-ffi/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -111,8 +111,11 @@ pub unsafe extern "C" fn ddog_trace_exporter_send(
trace_count: usize,
) -> MaybeError {
// TODO - handle errors - https://datadoghq.atlassian.net/browse/APMSP-1095
let static_trace: ByteSlice<'static> = std::mem::transmute(trace);
let tinybytes_trace = tinybytes::Bytes::from(static_trace);

handle
.send(trace.as_bytes(), trace_count)
.send(tinybytes_trace, trace_count)
.unwrap_or(String::from(""));
MaybeError::None
}
2 changes: 1 addition & 1 deletion data-pipeline/examples/send-traces-with-stats.rs
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,6 @@ fn main() {
traces.push(trace);
}
let data = rmp_serde::to_vec_named(&traces).unwrap();
exporter.send(&data, 100).unwrap();
exporter.send(tinybytes::Bytes::from(data), 100).unwrap();
exporter.shutdown(None).unwrap();
}
20 changes: 11 additions & 9 deletions data-pipeline/src/trace_exporter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -250,12 +250,12 @@ impl TraceExporter {

/// Send msgpack serialized traces to the agent
#[allow(missing_docs)]
pub fn send(&self, data: &[u8], trace_count: usize) -> Result<String, String> {
pub fn send(&self, data: tinybytes::Bytes, trace_count: usize) -> Result<String, String> {
self.check_agent_info();
match self.input_format {
TraceExporterInputFormat::Proxy => self.send_proxy(data, trace_count),
TraceExporterInputFormat::Proxy => self.send_proxy(data.as_ref(), trace_count),
TraceExporterInputFormat::V04 => {
self.send_deser_ser(tinybytes::Bytes::copy_from_slice(data))
self.send_deser_ser(data)
// TODO: APMSP-1582 - Refactor data-pipeline-ffi so we can leverage a type that
// implements tinybytes::UnderlyingBytes trait to avoid copying
// self.send_deser_ser(tinybytes::Bytes::copy_from_slice(data))
Expand Down Expand Up @@ -1184,15 +1184,15 @@ mod tests {
}];

let data = rmp_serde::to_vec_named(&vec![trace_chunk]).unwrap();

// Wait for the info fetcher to get the config
while mock_info.hits() == 0 {
exporter.runtime.block_on(async {
sleep(Duration::from_millis(100)).await;
})
}

exporter.send(data.as_slice(), 1).unwrap();
exporter.send(tinybytes::Bytes::from(data), 1).unwrap();
exporter.shutdown(None).unwrap();

mock_traces.assert();
Expand Down Expand Up @@ -1311,7 +1311,7 @@ mod tests {
}],
];
let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
let _result = exporter.send(&bytes, 1).expect("failed to send trace");
let _result = exporter.send(tinybytes::Bytes::from(bytes), 1).expect("failed to send trace");

assert_eq!(
&format!(
Expand Down Expand Up @@ -1341,9 +1341,11 @@ mod tests {
fake_agent.url("/v0.4/traces"),
stats_socket.local_addr().unwrap().to_string(),
);


let bad_payload = tinybytes::Bytes::copy_from_slice(b"some_bad_payload".as_ref());

let _result = exporter
.send(b"some_bad_payload", 1)
.send(bad_payload, 1)
.expect("failed to send trace");

assert_eq!(
Expand Down Expand Up @@ -1378,7 +1380,7 @@ mod tests {
..Default::default()
}]];
let bytes = rmp_serde::to_vec_named(&traces).expect("failed to serialize static trace");
let _result = exporter.send(&bytes, 1).expect("failed to send trace");
let _result = exporter.send(tinybytes::Bytes::from(bytes), 1).expect("failed to send trace");

assert_eq!(
&format!(
Expand Down
1 change: 1 addition & 0 deletions ddcommon-ffi/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ crossbeam-queue = "0.3.11"
ddcommon = { path = "../ddcommon" }
hyper = {version = "0.14", features = ["backports", "deprecated"], default-features = false}
serde = "1.0"
tinybytes = { path = "../tinybytes" }

[dev-dependencies]
bolero = "0.10.1"
12 changes: 12 additions & 0 deletions ddcommon-ffi/src/slice.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ use std::hash::{Hash, Hasher};
use std::marker::PhantomData;
use std::os::raw::c_char;
use std::str::Utf8Error;
use tinybytes::UnderlyingBytes;

#[repr(C)]
#[derive(Copy, Clone)]
Expand Down Expand Up @@ -52,6 +53,17 @@ pub type CharSlice<'a> = Slice<'a, c_char>;

/// Use to represent bytes -- does not need to be valid UTF-8.
pub type ByteSlice<'a> = Slice<'a, u8>;
unsafe impl Send for ByteSlice<'static> {}
unsafe impl Sync for ByteSlice<'static> {}
impl UnderlyingBytes for ByteSlice<'static> {}


impl AsRef<[u8]> for ByteSlice<'static> {
#[inline]
fn as_ref(&self) -> &[u8] {
self.as_slice()
}
}

/// This exists as an intrinsic, but it is private.
pub fn is_aligned_and_not_null<T>(ptr: *const T) -> bool {
Expand Down

0 comments on commit e9f9a11

Please sign in to comment.