Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Emit one kafka message per row in a recordbatch #11

Merged
merged 3 commits into from
Aug 5, 2024
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -8,10 +8,10 @@ authors = [
"Amey Chaugule <[email protected]>",
]
edition = "2021"
homepage = "https://github.com/probably-nothing-labs/df-streams"
homepage = "https://github.com/probably-nothing-labs/denormalized"
license = "Apache-2.0"
readme = "README.md"
repository = "https://github.com/probably-nothing-labs/df-streams"
repository = "https://github.com/probably-nothing-labs/denormalized"
version = "0.1.0"
description = "Embeddable stream processing engine"

Expand Down
21 changes: 8 additions & 13 deletions crates/core/src/datasource/kafka/topic_writer.rs
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ use std::fmt::{self, Debug};
use std::time::Duration;
use std::{any::Any, sync::Arc};

use arrow::json::LineDelimitedWriter;
use arrow_schema::SchemaRef;

use datafusion::catalog::Session;
Expand All @@ -22,6 +21,7 @@ use rdkafka::producer::FutureProducer;
use rdkafka::producer::FutureRecord;

use super::KafkaWriteConfig;
use crate::utils::row_encoder::{JsonRowEncoder, RowEncoder};

// Used to createa kafka source
pub struct TopicWriter(pub Arc<KafkaWriteConfig>);
Expand Down Expand Up @@ -110,21 +110,16 @@ impl DataSink for KafkaSink {
while let Some(batch) = data.next().await.transpose()? {
row_count += batch.num_rows();

if batch.num_rows() > 0 {
let buf = Vec::new();
let mut writer = LineDelimitedWriter::new(buf);
writer.write_batches(&[&batch])?;
writer.finish()?;
let buf = writer.into_inner();
let encoder = JsonRowEncoder {};
let rows = encoder.encode(&batch)?;

let record = FutureRecord::<[u8], _>::to(topic).payload(&buf);
for row in rows {
let record = FutureRecord::<[u8], _>::to(topic).payload(&row);
// .key(key.as_str()),

let _delivery_status = self
.producer
.send(record, Duration::from_secs(0))
.await
.expect("Message not delivered");
if let Err(msg) = self.producer.send(record, Duration::from_secs(0)).await {
tracing::error!("{}", msg.0);
}
}
}

Expand Down
1 change: 1 addition & 0 deletions crates/core/src/utils/mod.rs
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#[allow(dead_code)]
pub mod arrow_helpers;
mod default_optimizer_rules;
pub mod row_encoder;
pub mod serialize;

pub use default_optimizer_rules::get_default_optimizer_rules;
90 changes: 90 additions & 0 deletions crates/core/src/utils/row_encoder.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,90 @@
use arrow::json::writer::{JsonFormat, Writer};
use datafusion::arrow::record_batch::RecordBatch;
use datafusion_common::Result;

pub trait RowEncoder {
fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>>;
}

#[derive(Debug, Default)]
// Formats json without any characting separating items.
pub struct NoDelimiter {}
impl JsonFormat for NoDelimiter {}
// writes rows as json without any character separating them
type JsonWriter<W> = Writer<W, NoDelimiter>;

pub struct JsonRowEncoder {}

impl JsonRowEncoder {
pub fn batch_to_json(&self, batch: &RecordBatch) -> Result<Vec<u8>> {
let buf = Vec::new();
let mut writer = JsonWriter::new(buf);
writer.write(batch)?;
writer.finish()?;
let buf = writer.into_inner();

Ok(buf)
}
}

impl RowEncoder for JsonRowEncoder {
fn encode(&self, batch: &RecordBatch) -> Result<Vec<Vec<u8>>> {
if batch.num_rows() == 0 {
return Ok(vec![]);
}

// BufWriter uses a buffer size of 8KB
// We therefore double this and flush once we have more than 8KB
let mut buffer = Vec::with_capacity(batch.num_rows());

for i in 0..batch.num_rows() {
let row = batch.slice(i, 1);
buffer.push(self.batch_to_json(&row)?);
}

Ok(buffer)
}
}

#[cfg(test)]
mod tests {
use super::{JsonRowEncoder, RowEncoder};

use datafusion::arrow::array::{Int32Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::record_batch::RecordBatch;
use std::sync::Arc;

#[test]
fn serialize_record_batch_to_json() {
// define a schema.
let schema = Arc::new(Schema::new(vec![
Field::new("col1", DataType::Utf8, false),
Field::new("col2", DataType::Int32, false),
]));

let batch = RecordBatch::try_new(
schema,
vec![
Arc::new(StringArray::from(vec!["a", "b", "c", "d"])),
Arc::new(Int32Array::from(vec![1, 10, 20, 100])),
],
)
.unwrap();

let encoder = JsonRowEncoder {};
let buf = encoder.encode(&batch).unwrap();

let res: Vec<&[u8]> = [
"{\"col1\":\"a\",\"col2\":1}",
"{\"col1\":\"b\",\"col2\":10}",
"{\"col1\":\"c\",\"col2\":20}",
"{\"col1\":\"d\",\"col2\":100}",
]
.iter()
.map(|v| v.as_bytes())
.collect::<_>();

assert_eq!(buf, res);
}
}