Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

add delete records api to admin client #1

Open
wants to merge 1 commit into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions rdkafka-sys/src/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,6 +72,10 @@ pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t;
/// Native rdkafka delete topic object.
pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t;

/// Native rdkafka delete records object.
pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t;


/// Native rdkafka delete group object.
pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t;

Expand Down
93 changes: 93 additions & 0 deletions src/admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue};
use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext};
use crate::error::{IsError, KafkaError, KafkaResult};
use crate::log::{trace, warn};
use crate::TopicPartitionList;
use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout};

//
Expand Down Expand Up @@ -131,6 +132,44 @@ impl<C: ClientContext> AdminClient<C> {
Ok(rx)
}

/// Deletes the records in topics.
///
/// Note that while the API supports deleting records from multiple topics at once, it is
/// not transactional. Deletion of some records in topics may succeed while others
/// fail. Be sure to check the result of each individual operation.
pub fn delete_records(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> impl Future<Output = KafkaResult<Vec<RecordsResult>>> {
match self.delete_records_inner(offsets, opts) {
Ok(rx) => Either::Left(DeleteRecordsFuture { rx }),
Err(err) => Either::Right(future::err(err)),
}
}

fn delete_records_inner(
&self,
offsets: &TopicPartitionList,
opts: &AdminOptions,
) -> KafkaResult<oneshot::Receiver<NativeEvent>> {
let mut err_buf = ErrBuf::new();
let delete_records = unsafe {
NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr())).unwrap()
};
let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?;
unsafe {
rdsys::rd_kafka_DeleteRecords(
self.client.native_ptr(),
&mut delete_records.ptr(),
1,
native_opts.ptr(),
self.queue.ptr(),
);
}
Ok(rx)
}

/// Deletes the named groups.
pub fn delete_groups(
&self,
Expand Down Expand Up @@ -601,6 +640,26 @@ fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Ve
out
}

/// The result of a DeleteRecords operation.
pub type RecordsResult = Result<String, (String, RDKafkaErrorCode)>;

fn build_records_results(offsets: *const RDKafkaTopicPartitionList) -> Vec<RecordsResult> {
let count = unsafe { (*offsets).cnt as usize };
let mut out = Vec::with_capacity(count);
for i in 0..count {
let item = unsafe { (*offsets).elems.add(i) };
let kafka_err = unsafe { (*item).err };
let name = unsafe { cstr_to_owned((*item).topic) };
if kafka_err.is_error() {
out.push(Err((name, kafka_err.into())));
} else {
out.push(Ok(name));
}
}
out
}


/// The result of a DeleteGroup operation.
pub type GroupResult = Result<String, (String, RDKafkaErrorCode)>;

Expand Down Expand Up @@ -801,6 +860,40 @@ impl Future for DeleteTopicsFuture {
}
}

//
// Delete records handling
//

type NativeDeleteRecords = NativePtr<RDKafkaDeleteRecords>;

unsafe impl KafkaDrop for RDKafkaDeleteRecords {
const TYPE: &'static str = "delete records";
const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy;
}

struct DeleteRecordsFuture {
rx: oneshot::Receiver<NativeEvent>,
}

impl Future for DeleteRecordsFuture {
type Output = KafkaResult<Vec<RecordsResult>>;

fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?;
event.check_error()?;
let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) };
if res.is_null() {
let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) };
return Poll::Ready(Err(KafkaError::AdminOpCreation(format!(
"delete records request received response of incorrect type ({})",
typ
))));
}
let offsets = unsafe { rdsys::rd_kafka_DeleteRecords_result_offsets(res) };
Poll::Ready(Ok(build_records_results(offsets)))
}
}

//
// Delete group handling
//
Expand Down
169 changes: 168 additions & 1 deletion tests/test_admin.rs
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ use rdkafka::client::DefaultClientContext;
use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext};
use rdkafka::error::{KafkaError, RDKafkaErrorCode};
use rdkafka::metadata::Metadata;
use rdkafka::{ClientConfig, TopicPartitionList};
use rdkafka::{ClientConfig, Offset, TopicPartitionList};

use crate::utils::*;

Expand Down Expand Up @@ -154,6 +154,173 @@ async fn test_topics() {
assert_eq!(1, metadata_topic1.partitions().len());
assert_eq!(3, metadata_topic2.partitions().len());

// Verify that records can be deleted from an empty topic
let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&name1, 0, Offset::Offset(0))
.expect("add partition offset failed");
let res = admin_client
.delete_records(&offsets, &opts)
.await
.expect("delete records failed");
assert_eq!(res, &[Ok(name1.clone())]);

// Verify that records can be deleted from a non-empty topic
let partition = 0;
let message_count = 10;
populate_topic(
&name1,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;

let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&name1, partition, Offset::Offset(message_count.into()))
.expect("add partition offset failed");
let res = admin_client
.delete_records(&offsets, &opts)
.await
.expect("delete records failed");
assert_eq!(res, &[Ok(name1.clone())]);

// Verify that records can be deleted from multiple topics
populate_topic(
&name1,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;
populate_topic(
&name2,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;

let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&name1, partition, Offset::Offset(message_count.into()))
.expect("add partition offset1 failed");
offsets
.add_partition_offset(&name2, partition, Offset::Offset(message_count.into()))
.expect("add partition offset2 failed");
let results = admin_client
.delete_records(&offsets, &opts)
.await
.expect("delete records failed");

// Results can be in any order that is why we can't just say the following:
// assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]);

let mut found = false;
for result in &results {
if let Ok(name) = result {
if *name == name1 {
found = true;
break;
}
}
}
assert!(found);

let mut found = false;
for result in &results {
if let Ok(name) = result {
if *name == name2 {
found = true;
break;
}
}
}
assert!(found);

// Verify that mixed-success operations properly report the successful and
// failing operations.
populate_topic(
&name1,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;
populate_topic(
&name2,
message_count,
&value_fn,
&key_fn,
Some(partition),
None,
)
.await;

let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&name1, partition, Offset::Offset(message_count.into()))
.expect("add partition offset1 failed");
let unknown_partition = 42;
offsets
.add_partition_offset(
&name2,
unknown_partition,
Offset::Offset(message_count.into()),
)
.expect("add partition offset2 failed");
let results = admin_client
.delete_records(&offsets, &opts)
.await
.expect("delete records failed");

// Results can be in any order that is why we can't just say the following:
// assert_eq!(res, &[Ok(name1.clone()), Err((name2.clone(), RDKafkaErrorCode::UnknownPartition))]);

let mut found = false;
for result in &results {
if let Ok(name) = result {
if *name == name1 {
found = true;
break;
}
}
}
assert!(found);

let mut found = false;
for result in &results {
if let Err((name, err)) = result {
if *name == name2 {
assert_eq!(err, &RDKafkaErrorCode::UnknownPartition);
found = true;
break;
}
}
}
assert!(found);

// Verify that deleting records from a non-existent topic fails.
{
let nonexistent_topic_name = rand_test_topic("test_topics");
let mut offsets = TopicPartitionList::new();
offsets
.add_partition_offset(&nonexistent_topic_name, 0, Offset::Offset(0))
.expect("add partition offset failed");
let res = admin_client.delete_records(&offsets, &opts).await;
assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaErrorCode::NoEnt)));
}


let res = admin_client
.describe_configs(
&[
Expand Down