diff --git a/rdkafka-sys/src/types.rs b/rdkafka-sys/src/types.rs index 97b77b312..8b4d5a9df 100644 --- a/rdkafka-sys/src/types.rs +++ b/rdkafka-sys/src/types.rs @@ -72,6 +72,10 @@ pub type RDKafkaNewTopic = bindings::rd_kafka_NewTopic_t; /// Native rdkafka delete topic object. pub type RDKafkaDeleteTopic = bindings::rd_kafka_DeleteTopic_t; +/// Native rdkafka delete records object. +pub type RDKafkaDeleteRecords = bindings::rd_kafka_DeleteRecords_t; + + /// Native rdkafka delete group object. pub type RDKafkaDeleteGroup = bindings::rd_kafka_DeleteGroup_t; diff --git a/src/admin.rs b/src/admin.rs index 69dba537b..b10aa0656 100644 --- a/src/admin.rs +++ b/src/admin.rs @@ -25,6 +25,7 @@ use crate::client::{Client, ClientContext, DefaultClientContext, NativeQueue}; use crate::config::{ClientConfig, FromClientConfig, FromClientConfigAndContext}; use crate::error::{IsError, KafkaError, KafkaResult}; use crate::log::{trace, warn}; +use crate::TopicPartitionList; use crate::util::{cstr_to_owned, AsCArray, ErrBuf, IntoOpaque, KafkaDrop, NativePtr, Timeout}; // @@ -131,6 +132,44 @@ impl AdminClient { Ok(rx) } + /// Deletes the records in topics. + /// + /// Note that while the API supports deleting records from multiple topics at once, it is + /// not transactional. Deletion of some records in topics may succeed while others + /// fail. Be sure to check the result of each individual operation. + pub fn delete_records( + &self, + offsets: &TopicPartitionList, + opts: &AdminOptions, + ) -> impl Future>> { + match self.delete_records_inner(offsets, opts) { + Ok(rx) => Either::Left(DeleteRecordsFuture { rx }), + Err(err) => Either::Right(future::err(err)), + } + } + + fn delete_records_inner( + &self, + offsets: &TopicPartitionList, + opts: &AdminOptions, + ) -> KafkaResult> { + let mut err_buf = ErrBuf::new(); + let delete_records = unsafe { + NativeDeleteRecords::from_ptr(rdsys::rd_kafka_DeleteRecords_new(offsets.ptr())).unwrap() + }; + let (native_opts, rx) = opts.to_native(self.client.native_ptr(), &mut err_buf)?; + unsafe { + rdsys::rd_kafka_DeleteRecords( + self.client.native_ptr(), + &mut delete_records.ptr(), + 1, + native_opts.ptr(), + self.queue.ptr(), + ); + } + Ok(rx) + } + /// Deletes the named groups. pub fn delete_groups( &self, @@ -601,6 +640,26 @@ fn build_topic_results(topics: *const *const RDKafkaTopicResult, n: usize) -> Ve out } +/// The result of a DeleteRecords operation. +pub type RecordsResult = Result; + +fn build_records_results(offsets: *const RDKafkaTopicPartitionList) -> Vec { + let count = unsafe { (*offsets).cnt as usize }; + let mut out = Vec::with_capacity(count); + for i in 0..count { + let item = unsafe { (*offsets).elems.add(i) }; + let kafka_err = unsafe { (*item).err }; + let name = unsafe { cstr_to_owned((*item).topic) }; + if kafka_err.is_error() { + out.push(Err((name, kafka_err.into()))); + } else { + out.push(Ok(name)); + } + } + out +} + + /// The result of a DeleteGroup operation. pub type GroupResult = Result; @@ -801,6 +860,40 @@ impl Future for DeleteTopicsFuture { } } +// +// Delete records handling +// + +type NativeDeleteRecords = NativePtr; + +unsafe impl KafkaDrop for RDKafkaDeleteRecords { + const TYPE: &'static str = "delete records"; + const DROP: unsafe extern "C" fn(*mut Self) = rdsys::rd_kafka_DeleteRecords_destroy; +} + +struct DeleteRecordsFuture { + rx: oneshot::Receiver, +} + +impl Future for DeleteRecordsFuture { + type Output = KafkaResult>; + + fn poll(mut self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll { + let event = ready!(self.rx.poll_unpin(cx)).map_err(|_| KafkaError::Canceled)?; + event.check_error()?; + let res = unsafe { rdsys::rd_kafka_event_DeleteRecords_result(event.ptr()) }; + if res.is_null() { + let typ = unsafe { rdsys::rd_kafka_event_type(event.ptr()) }; + return Poll::Ready(Err(KafkaError::AdminOpCreation(format!( + "delete records request received response of incorrect type ({})", + typ + )))); + } + let offsets = unsafe { rdsys::rd_kafka_DeleteRecords_result_offsets(res) }; + Poll::Ready(Ok(build_records_results(offsets))) + } +} + // // Delete group handling // diff --git a/src/config.rs b/src/config.rs index 296d9f867..58e062a5b 100644 --- a/src/config.rs +++ b/src/config.rs @@ -150,7 +150,7 @@ impl NativeClientConfig { } // Convert the C string to a Rust string. - Ok(String::from_utf8_lossy(&buf).to_string()) + Ok(String::from_utf8_lossy(&buf).trim_matches(char::from(0)).to_string()) } } diff --git a/tests/test_admin.rs b/tests/test_admin.rs index 846a96c2a..fe543dfdc 100644 --- a/tests/test_admin.rs +++ b/tests/test_admin.rs @@ -12,7 +12,7 @@ use rdkafka::client::DefaultClientContext; use rdkafka::consumer::{BaseConsumer, CommitMode, Consumer, DefaultConsumerContext}; use rdkafka::error::{KafkaError, RDKafkaErrorCode}; use rdkafka::metadata::Metadata; -use rdkafka::{ClientConfig, TopicPartitionList}; +use rdkafka::{ClientConfig, Offset, TopicPartitionList}; use crate::utils::*; @@ -154,6 +154,173 @@ async fn test_topics() { assert_eq!(1, metadata_topic1.partitions().len()); assert_eq!(3, metadata_topic2.partitions().len()); + // Verify that records can be deleted from an empty topic + let mut offsets = TopicPartitionList::new(); + offsets + .add_partition_offset(&name1, 0, Offset::Offset(0)) + .expect("add partition offset failed"); + let res = admin_client + .delete_records(&offsets, &opts) + .await + .expect("delete records failed"); + assert_eq!(res, &[Ok(name1.clone())]); + + // Verify that records can be deleted from a non-empty topic + let partition = 0; + let message_count = 10; + populate_topic( + &name1, + message_count, + &value_fn, + &key_fn, + Some(partition), + None, + ) + .await; + + let mut offsets = TopicPartitionList::new(); + offsets + .add_partition_offset(&name1, partition, Offset::Offset(message_count.into())) + .expect("add partition offset failed"); + let res = admin_client + .delete_records(&offsets, &opts) + .await + .expect("delete records failed"); + assert_eq!(res, &[Ok(name1.clone())]); + + // Verify that records can be deleted from multiple topics + populate_topic( + &name1, + message_count, + &value_fn, + &key_fn, + Some(partition), + None, + ) + .await; + populate_topic( + &name2, + message_count, + &value_fn, + &key_fn, + Some(partition), + None, + ) + .await; + + let mut offsets = TopicPartitionList::new(); + offsets + .add_partition_offset(&name1, partition, Offset::Offset(message_count.into())) + .expect("add partition offset1 failed"); + offsets + .add_partition_offset(&name2, partition, Offset::Offset(message_count.into())) + .expect("add partition offset2 failed"); + let results = admin_client + .delete_records(&offsets, &opts) + .await + .expect("delete records failed"); + + // Results can be in any order that is why we can't just say the following: + // assert_eq!(res, &[Ok(name1.clone()), Ok(name2.clone())]); + + let mut found = false; + for result in &results { + if let Ok(name) = result { + if *name == name1 { + found = true; + break; + } + } + } + assert!(found); + + let mut found = false; + for result in &results { + if let Ok(name) = result { + if *name == name2 { + found = true; + break; + } + } + } + assert!(found); + + // Verify that mixed-success operations properly report the successful and + // failing operations. + populate_topic( + &name1, + message_count, + &value_fn, + &key_fn, + Some(partition), + None, + ) + .await; + populate_topic( + &name2, + message_count, + &value_fn, + &key_fn, + Some(partition), + None, + ) + .await; + + let mut offsets = TopicPartitionList::new(); + offsets + .add_partition_offset(&name1, partition, Offset::Offset(message_count.into())) + .expect("add partition offset1 failed"); + let unknown_partition = 42; + offsets + .add_partition_offset( + &name2, + unknown_partition, + Offset::Offset(message_count.into()), + ) + .expect("add partition offset2 failed"); + let results = admin_client + .delete_records(&offsets, &opts) + .await + .expect("delete records failed"); + + // Results can be in any order that is why we can't just say the following: + // assert_eq!(res, &[Ok(name1.clone()), Err((name2.clone(), RDKafkaErrorCode::UnknownPartition))]); + + let mut found = false; + for result in &results { + if let Ok(name) = result { + if *name == name1 { + found = true; + break; + } + } + } + assert!(found); + + let mut found = false; + for result in &results { + if let Err((name, err)) = result { + if *name == name2 { + assert_eq!(err, &RDKafkaErrorCode::UnknownPartition); + found = true; + break; + } + } + } + assert!(found); + + // Verify that deleting records from a non-existent topic fails. + { + let nonexistent_topic_name = rand_test_topic("test_topics"); + let mut offsets = TopicPartitionList::new(); + offsets + .add_partition_offset(&nonexistent_topic_name, 0, Offset::Offset(0)) + .expect("add partition offset failed"); + let res = admin_client.delete_records(&offsets, &opts).await; + assert_eq!(res, Err(KafkaError::AdminOp(RDKafkaErrorCode::NoEnt))); + } + + let res = admin_client .describe_configs( &[