diff --git a/Cargo.toml b/Cargo.toml
index 2c4d3e0..1e83231 100644
--- a/Cargo.toml
+++ b/Cargo.toml
@@ -8,10 +8,10 @@ authors = [
     "Amey Chaugule ",
 ]
 edition = "2021"
-homepage = "https://github.com/probably-nothing-labs/df-streams"
+homepage = "https://github.com/probably-nothing-labs/denormalized"
 license = "Apache-2.0"
 readme = "README.md"
-repository = "https://github.com/probably-nothing-labs/df-streams"
+repository = "https://github.com/probably-nothing-labs/denormalized"
 version = "0.1.0"
 description = "Embeddable stream processing engine"
diff --git a/crates/core/src/datasource/kafka/topic_writer.rs b/crates/core/src/datasource/kafka/topic_writer.rs
index 9d330da..70a8f6c 100644
--- a/crates/core/src/datasource/kafka/topic_writer.rs
+++ b/crates/core/src/datasource/kafka/topic_writer.rs
@@ -4,7 +4,6 @@ use std::fmt::{self, Debug};
 use std::time::Duration;
 use std::{any::Any, sync::Arc};
 
-use arrow::json::LineDelimitedWriter;
 use arrow_schema::SchemaRef;
 
 use datafusion::catalog::Session;
@@ -22,6 +21,7 @@ use rdkafka::producer::FutureProducer;
 use rdkafka::producer::FutureRecord;
 
 use super::KafkaWriteConfig;
+use crate::utils::row_encoder::{JsonRowEncoder, RowEncoder};
 
 // Used to create a kafka source
 pub struct TopicWriter(pub Arc<FutureProducer>);
@@ -110,21 +110,16 @@ impl DataSink for KafkaSink {
         while let Some(batch) = data.next().await.transpose()? {
             row_count += batch.num_rows();
 
-            if batch.num_rows() > 0 {
-                let buf = Vec::new();
-                let mut writer = LineDelimitedWriter::new(buf);
-                writer.write_batches(&[&batch])?;
-                writer.finish()?;
-                let buf = writer.into_inner();
+            let encoder = JsonRowEncoder {};
+            let rows = encoder.encode(&batch)?;
 
-                let record = FutureRecord::<[u8], _>::to(topic).payload(&buf);
+            for row in rows {
+                let record = FutureRecord::<[u8], _>::to(topic).payload(&row);
                 // .key(key.as_str()),
-                let _delivery_status = self
-                    .producer
-                    .send(record, Duration::from_secs(0))
-                    .await
-                    .expect("Message not delivered");
+                if let Err(msg) = self.producer.send(record, Duration::from_secs(0)).await {
+                    tracing::error!("{}", msg.0);
+                }
             }
         }
diff --git a/crates/core/src/utils/mod.rs b/crates/core/src/utils/mod.rs
index f77961d..73cef59 100644
--- a/crates/core/src/utils/mod.rs
+++ b/crates/core/src/utils/mod.rs
@@ -1,6 +1,7 @@
 #[allow(dead_code)]
 pub mod arrow_helpers;
 mod default_optimizer_rules;
+pub mod row_encoder;
 pub mod serialize;
 
 pub use default_optimizer_rules::get_default_optimizer_rules;
diff --git a/crates/core/src/utils/row_encoder.rs b/crates/core/src/utils/row_encoder.rs
new file mode 100644
index 0000000..fea5a8f
--- /dev/null
+++ b/crates/core/src/utils/row_encoder.rs
@@ -0,0 +1,87 @@
+use arrow::json::writer::{JsonFormat, Writer};
+use datafusion::arrow::record_batch::RecordBatch;
+use datafusion_common::Result;
+
+pub trait RowEncoder {
+    fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>>;
+}
+
+#[derive(Debug, Default)]
+// Formats json without any character separating items.
+pub struct NoDelimiter {}
+impl JsonFormat for NoDelimiter {}
+// writes rows as json without any character separating them
+type JsonWriter<W> = Writer<W, NoDelimiter>;
+
+pub struct JsonRowEncoder {}
+
+impl JsonRowEncoder {
+    pub fn batch_to_json(&self, batch: &RecordBatch) -> Result<Vec<u8>> {
+        let buf = Vec::new();
+        let mut writer = JsonWriter::new(buf);
+        writer.write(batch)?;
+        writer.finish()?;
+        let buf = writer.into_inner();
+
+        Ok(buf)
+    }
+}
+
+impl RowEncoder for JsonRowEncoder {
+    fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>> {
+        if batch.num_rows() == 0 {
+            return Ok(vec![]);
+        }
+
+        let mut buffer = Vec::with_capacity(batch.num_rows());
+        for i in 0..batch.num_rows() {
+            let row = batch.slice(i, 1);
+            buffer.push(self.batch_to_json(&row)?);
+        }
+
+        Ok(buffer)
+    }
+}
+
+#[cfg(test)]
+mod tests {
+    use super::{JsonRowEncoder, RowEncoder};
+
+    use datafusion::arrow::array::{Int32Array, StringArray};
+    use datafusion::arrow::datatypes::{DataType, Field, Schema};
+    use datafusion::arrow::record_batch::RecordBatch;
+    use std::sync::Arc;
+
+    #[test]
+    fn serialize_record_batch_to_json() {
+        // define a schema.
+        let schema = Arc::new(Schema::new(vec![
+            Field::new("col1", DataType::Utf8, false),
+            Field::new("col2", DataType::Int32, false),
+        ]));
+
+        let batch = RecordBatch::try_new(
+            schema,
+            vec![
+                Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
+                Arc::new(Int32Array::from(vec![1, 10, 20, 100])),
+            ],
+        )
+        .unwrap();
+
+        let encoder = JsonRowEncoder {};
+        let buf = encoder.encode(&batch).unwrap();
+
+        let res: Vec<&[u8]> = [
+            "{\"col1\":\"a\",\"col2\":1}",
+            "{\"col1\":\"b\",\"col2\":10}",
+            "{\"col1\":\"c\",\"col2\":20}",
+            "{\"col1\":\"d\",\"col2\":100}",
+        ]
+        .iter()
+        .map(|v| v.as_bytes())
+        .collect::<_>();
+
+        assert_eq!(buf, res);
+    }
+}
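
The `RowEncoder` trait introduced above is the extension point this change creates: the Kafka sink only depends on `encode(&RecordBatch) -> Result<Vec<Vec<u8>>>`, so other wire formats can slot in later. As a rough sketch of what a second implementation could look like (not part of this patch; `CsvRowEncoder` is a hypothetical name, and it assumes arrow's `csv::WriterBuilder` API plus the module path added above), each row is sliced into a single-row batch and serialized, mirroring `JsonRowEncoder`:

use arrow::csv::WriterBuilder;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_common::Result;

use crate::utils::row_encoder::RowEncoder;

// Hypothetical alternative encoder: one headerless CSV line per row.
pub struct CsvRowEncoder {}

impl RowEncoder for CsvRowEncoder {
    fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>> {
        let mut rows = Vec::with_capacity(batch.num_rows());
        for i in 0..batch.num_rows() {
            // Same single-row-slice approach as JsonRowEncoder.
            let row = batch.slice(i, 1);
            let mut writer = WriterBuilder::new().with_header(false).build(Vec::new());
            writer.write(&row)?;
            // Each payload keeps the CSV writer's trailing newline.
            rows.push(writer.into_inner());
        }
        Ok(rows)
    }
}

Swapping the encoder inside `KafkaSink` would then be a one-line change where `JsonRowEncoder {}` is constructed today.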