Skip to content

Commit

Permalink
add delete records api to admin client
Browse files Browse the repository at this point in the history
Delete records help truncate a topic
  • Loading branch information
Hoang Phan committed Jul 12, 2024
1 parent e69c2aa commit 9d89318
Show file tree
Hide file tree
Showing 4 changed files with 266 additions and 2 deletions.
4 changes: 4 additions & 0 deletions rdkafka-sys/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
/// Native rdkafka delete topic object.
pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;

/// Native rdkafka delete records object.
pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t;


/// Native rdkafka delete group object.
pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;

Expand Down
93 changes: 93 additions & 0 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{trace, warn};
use crate::TopicPartitionList;
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};

//
Expand Down Expand Up @@ -131,6 +132,44 @@ impl<C: ClientContext> AdminClient<C> {
Ok(rx)
}

/// Deletes the records in topics.
///
/// Note that while the API supports deleting records from multiple topics at once, it is
/// not transactional. Deletion of some records in topics may succeed while others
/// fail. Be sure to check the result of each individual operation.
pub fn delete_records(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<Vec<RecordsResult>>> {
match self.delete_records_inner(offsets, opts) {
Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}

fn delete_records_inner(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
let mut err_buf = ErrBuf::new();
let delete_records = unsafe {
NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr())).unwrap()
};
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
unsafe {
rdsys::rd_kafka_DeleteRecords(
self.client.native_ptr(),
&mut delete_records.ptr(),
1,
native_opts.ptr(),
self.queue.ptr(),
);
}
Ok(rx)
}

/// Deletes the named groups.
pub fn delete_groups(
&self,
Expand Down Expand Up @@ -601,6 +640,26 @@ fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Ve
out
}

/// The result of a DeleteRecords operation.
pub type RecordsResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_records_results(offsets: *const RDKafkaTopicPartitionList) -> Vec<RecordsResult> {
let count = unsafe { (*offsets).cnt as usize };
let mut out = Vec::with_capacity(count);
for i in 0..count {
let item = unsafe { (*offsets).elems.add(i) };
let kafka_err = unsafe { (*item).err };
let name = unsafe { cstr_to_owned((*item).topic) };
if kafka_err.is_error() {
out.push(Err((name, kafka_err.into())));
} else {
out.push(Ok(name));
}
}
out
}


/// The result of a DeleteGroup operation.
pub type GroupResult = Result<String, (String, RDKafkaErrorCode)>;

Expand Down Expand Up @@ -801,6 +860,40 @@ impl Future for DeleteTopicsFuture {
}
}

//
// Delete records handling
//

type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;

unsafe impl KafkaDrop for RDKafkaDeleteRecords {
const TYPE: &'static str = "delete records";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
}

struct DeleteRecordsFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteRecordsFuture {
type Output = KafkaResult<Vec<RecordsResult>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;
let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"delete records request received response of incorrect type ({})",
typ
))));
}
let offsets = unsafe { rdsys::rd_kafka_DeleteRecords_result_offsets(res) };
Poll::Ready(Ok(build_records_results(offsets)))
}
}

//
// Delete group handling
//
Expand Down
2 changes: 1 addition & 1 deletion src/config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -150,7 +150,7 @@ impl NativeClientConfig {
}

// Convert the C string to a Rust string.
Ok(String::from_utf8_lossy(&buf).to_string())
Ok(String::from_utf8_lossy(&buf).trim_matches(char::from(0)).to_string())
}
}

Expand Down
169 changes: 168 additions & 1 deletion tests/test_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::metadata::Metadata;
use rdkafka::{ClientConfig, TopicPartitionList};
use rdkafka::{ClientConfig, Offset, TopicPartitionList};

use crate::utils::*;

Expand Down Expand Up @@ -154,6 +154,173 @@ async fn test_topics() {
assert_eq!(1, metadata_topic1.partitions().len());
assert_eq!(3, metadata_topic2.partitions().len());

// Verify that records can be deleted from an empty topic
let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&name1, 0, Offset::Offset(0))
.expect("add partition offset failed");
let res = admin_client
.delete_records(&offsets, &opts)
.await
.expect("delete records failed");
assert_eq!(res, &[Ok(name1.clone())]);

// Verify that records can be deleted from a non-empty topic
let partition = 0;
let message_count = 10;
populate_topic(
&name1,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;

let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&name1, partition, Offset::Offset(message_count.into()))
.expect("add partition offset failed");
let res = admin_client
.delete_records(&offsets, &opts)
.await
.expect("delete records failed");
assert_eq!(res, &[Ok(name1.clone())]);

// Verify that records can be deleted from multiple topics
populate_topic(
&name1,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;
populate_topic(
&name2,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;

let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&name1, partition, Offset::Offset(message_count.into()))
.expect("add partition offset1 failed");
offsets
.add_partition_offset(&name2, partition, Offset::Offset(message_count.into()))
.expect("add partition offset2 failed");
let results = admin_client
.delete_records(&offsets, &opts)
.await
.expect("delete records failed");

// Results can be in any order that is why we can't just say the following:
// assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]);

let mut found = false;
for result in &results {
if let Ok(name) = result {
if *name == name1 {
found = true;
break;
}
}
}
assert!(found);

let mut found = false;
for result in &results {
if let Ok(name) = result {
if *name == name2 {
found = true;
break;
}
}
}
assert!(found);

// Verify that mixed-success operations properly report the successful and
// failing operations.
populate_topic(
&name1,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;
populate_topic(
&name2,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;

let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&name1, partition, Offset::Offset(message_count.into()))
.expect("add partition offset1 failed");
let unknown_partition = 42;
offsets
.add_partition_offset(
&name2,
unknown_partition,
Offset::Offset(message_count.into()),
)
.expect("add partition offset2 failed");
let results = admin_client
.delete_records(&offsets, &opts)
.await
.expect("delete records failed");

// Results can be in any order that is why we can't just say the following:
// assert_eq!(res, &[Ok(name1.clone()), Err((name2.clone(), RDKafkaErrorCode::UnknownPartition))]);

let mut found = false;
for result in &results {
if let Ok(name) = result {
if *name == name1 {
found = true;
break;
}
}
}
assert!(found);

let mut found = false;
for result in &results {
if let Err((name, err)) = result {
if *name == name2 {
assert_eq!(err, &RDKafkaErrorCode::UnknownPartition);
found = true;
break;
}
}
}
assert!(found);

// Verify that deleting records from a non-existent topic fails.
{
let nonexistent_topic_name = rand_test_topic("test_topics");
let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&nonexistent_topic_name, 0, Offset::Offset(0))
.expect("add partition offset failed");
let res = admin_client.delete_records(&offsets, &opts).await;
assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaErrorCode::NoEnt)));
}


let res = admin_client
.describe_configs(
&[
Expand Down

0 comments on commit 9d89318

Please sign in to comment.