Skip to content

Commit

Permalink
Emit one kafka message per row in a recordbatch
Browse files Browse the repository at this point in the history
  • Loading branch information
emgeee committed Aug 2, 2024
1 parent 92b0920 commit cd72f0c
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 15 deletions.
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ authors = [
"Amey Chaugule <[email protected]>",
]
edition = "2021"
homepage = "https://github.com/probably-nothing-labs/df-streams"
homepage = "https://github.com/probably-nothing-labs/denormalized"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/probably-nothing-labs/df-streams"
repository = "https://github.com/probably-nothing-labs/denormalized"
version = "0.1.0"
description = "Embeddable stream processing engine"

Expand Down
21 changes: 8 additions & 13 deletions crates/core/src/datasource/kafka/topic_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::fmt::{self, Debug};
use std::time::Duration;
use std::{any::Any, sync::Arc};

use arrow::json::LineDelimitedWriter;
use arrow_schema::SchemaRef;

use datafusion::catalog::Session;
Expand All @@ -22,6 +21,7 @@ use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;

use super::KafkaWriteConfig;
use crate::utils::row_encoder::{JsonRowEncoder, RowEncoder};

// Used to createa kafka source
pub struct TopicWriter(pub Arc<KafkaWriteConfig>);
Expand Down Expand Up @@ -110,21 +110,16 @@ impl DataSink for KafkaSink {
while let Some(batch) = data.next().await.transpose()? {
row_count += batch.num_rows();

if batch.num_rows() > 0 {
let buf = Vec::new();
let mut writer = LineDelimitedWriter::new(buf);
writer.write_batches(&[&batch])?;
writer.finish()?;
let buf = writer.into_inner();
let encoder = JsonRowEncoder {};
let rows = encoder.encode(&batch)?;

let record = FutureRecord::<[u8], _>::to(topic).payload(&buf);
for row in rows {
let record = FutureRecord::<[u8], _>::to(topic).payload(&row);
// .key(key.as_str()),

let _delivery_status = self
.producer
.send(record, Duration::from_secs(0))
.await
.expect("Message not delivered");
if let Err(msg) = self.producer.send(record, Duration::from_secs(0)).await {
tracing::error!("{}", msg.0);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/utils/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,5 +2,6 @@
pub mod arrow_helpers;
mod default_optimizer_rules;
pub mod serialize;
pub mod row_encoder;

pub use default_optimizer_rules::get_default_optimizer_rules;
90 changes: 90 additions & 0 deletions crates/core/src/utils/row_encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use arrow::json::writer::{JsonFormat, Writer};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_common::Result;

pub trait RowEncoder {
fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>>;
}

#[derive(Debug, Default)]
// Formats json without any characting separating items.
pub struct NoDelimiter {}
impl JsonFormat for NoDelimiter {}
// writes rows as json without any character separating them
type JsonWriter<W> = Writer<W, NoDelimiter>;

pub struct JsonRowEncoder {}

impl JsonRowEncoder {
pub fn batch_to_json(&self, batch: &RecordBatch) -> Result<Vec<u8>> {
let buf = Vec::new();
let mut writer = JsonWriter::new(buf);
writer.write(batch)?;
writer.finish()?;
let buf = writer.into_inner();

Ok(buf)
}
}

impl RowEncoder for JsonRowEncoder {
fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>> {
if batch.num_rows() == 0 {
return Ok(vec![]);
}

// BufWriter uses a buffer size of 8KB
// We therefore double this and flush once we have more than 8KB
let mut buffer = Vec::with_capacity(batch.num_rows());

for i in 0..batch.num_rows() {
let row = batch.slice(i, 1);
buffer.push(self.batch_to_json(&row)?);
}

Ok(buffer)
}
}

#[cfg(test)]
mod tests {
use super::{JsonRowEncoder, RowEncoder};

use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[test]
fn serialize_record_batch_to_json() {
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("col1", DataType::Utf8, false),
Field::new("col2", DataType::Int32, false),
]));

let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int32Array::from(vec![1, 10, 20, 100])),
],
)
.unwrap();

let encoder = JsonRowEncoder {};
let buf = encoder.encode(&batch).unwrap();

let res: Vec<&[u8]> = vec![
"{\"col1\":\"a\",\"col2\":1}",
"{\"col1\":\"b\",\"col2\":10}",
"{\"col1\":\"c\",\"col2\":20}",
"{\"col1\":\"d\",\"col2\":100}",
]
.iter()
.map(|v| v.as_bytes())
.collect::<_>();

assert_eq!(buf, res);
}
}

0 comments on commit cd72f0c

Please sign in to comment.